From 47178dd3b385c162490171529a7776f8e9b48451 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Mon, 24 Jan 2022 09:00:51 -0500 Subject: [PATCH] Started to work on the scheduler rework --- .gitignore | 3 - multi/init.lua | 549 +++++++++++++++++++++++++-------------------- test4.lua | 30 +++ tests/runtests.lua | 6 +- 4 files changed, 339 insertions(+), 249 deletions(-) create mode 100644 test4.lua diff --git a/.gitignore b/.gitignore index 73824b6..c9636a2 100644 --- a/.gitignore +++ b/.gitignore @@ -3,8 +3,5 @@ *lua5.3 *lua5.4 *luajit -test2.lua -test.lua -test3.lua *.code-workspace *.dat \ No newline at end of file diff --git a/multi/init.lua b/multi/init.lua index 08350fa..9b07bff 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -32,23 +32,14 @@ if not _G["$multi"] then _G["$multi"] = {multi=multi,thread=thread} end -multi.Version = "15.1.0" -multi.stage = "stable" +multi.Version = "15.2.0" multi.Name = "multi.root" multi.NIL = {Type="NIL"} multi.Mainloop = {} -multi.Garbage = {} -multi.ender = {} multi.Children = {} multi.Active = true multi.Type = "rootprocess" -multi.Rest = 0 -multi._type = type -multi.queue = {} -multi.clock = os.clock -multi.time = os.time multi.LinkedPath = multi -multi.lastTime = clock() multi.TIMEOUT = "TIMEOUT" multi.TID = 0 multi.defaultSettings = {} @@ -83,13 +74,15 @@ multi.threstimed=.001 -- System function multi.Stop() - mainloopActive=false + isRunning = false + mainloopActive = false end --Processor local priorityTable = {[0]="Round-Robin",[1]="Balanced",[2]="Top-Down",[3]="Timed-Based-Balancer"} local ProcessName = {"SubProcessor","MainProcessor"} local globalThreads = {} + function multi:getTasksDetails(t) if not(t) then str = { @@ -562,9 +555,9 @@ end --Constructors [CORE] local _tid = 0 function multi:newBase(ins) - if not(self.Type=='rootprocess' or self.Type=='process' or self.Type=='queue' or self.Type == 'sandbox') then error('Can only create an object on multi or an interface obj') return false end + if not(self.Type=='rootprocess' or self.Type=='process') then error('Can only create an object on multi or an interface obj') return false end local c = {} - if self.Type=='process' or self.Type=='queue' or self.Type=='sandbox' then + if self.Type=='process' then setmetatable(c, {__index = multi}) else setmetatable(c, {__index = multi}) @@ -876,6 +869,7 @@ end local scheduledjobs = {} local sthread + function multi:scheduleJob(time,func) if not sthread then sthread = multi:newThread("JobScheduler",function() @@ -906,6 +900,7 @@ end local __CurrentProcess = multi local __CurrentTask + function multi.getCurrentProcess() return __CurrentProcess end @@ -959,146 +954,6 @@ function multi:newProcessor(name,nothread) return c end --- Threading stuff -local initT = false -local threadCount = 0 -local threadid = 0 -thread.__threads = {} -local threads = thread.__threads -local Gref = _G -multi.GlobalVariables={} -local dFunc = function() return true end -local dRef = {nil,nil,nil,nil,nil} -thread.requests = {} - -function thread.request(t,cmd,...) - thread.requests[t.thread] = {cmd,{...}} -end - -function thread.getRunningThread() - local threads = globalThreads - local t = coroutine.running() - if t then - for th,process in pairs(threads) do - if t==th.thread then - return th - end - end - end -end - -function thread._Requests() - local t = thread.requests[coroutine.running()] - if t then - thread.requests[coroutine.running()] = nil - local cmd,args = t[1],t[2] - thread[cmd](unpack(args)) - end -end - -function thread.sleep(n) - thread._Requests() - thread.getRunningThread().lastSleep = clock() - dRef[1] = "_sleep_" - dRef[2] = n or 0 - return coroutine.yield(dRef) -end - -function thread.hold(n,opt) - thread._Requests() - if opt and type(opt)=="table" then - if opt.interval then - dRef[4] = opt.interval - end - if opt.cycles then - dRef[1] = "_holdW_" - dRef[2] = opt.cycles or 1 - dRef[3] = n or dFunc - return coroutine.yield(dRef) - elseif opt.sleep then - dRef[1] = "_holdF_" - dRef[2] = opt.sleep - dRef[3] = n or dFunc - return coroutine.yield(dRef) - elseif opt.skip then - dRef[1] = "_skip_" - dRef[2] = opt.skip or 1 - return coroutine.yield(dRef) - end - end - if type(n) == "number" then - thread.getRunningThread().lastSleep = clock() - dRef[1] = "_sleep_" - dRef[2] = n or 0 - return coroutine.yield(dRef) - else - dRef[1] = "_hold_" - dRef[2] = n or dFunc - return coroutine.yield(dRef) - end -end - -function thread.holdFor(sec,n) - thread._Requests() - dRef[1] = "_holdF_" - dRef[2] = sec - dRef[3] = n or dFunc - return coroutine.yield(dRef) -end - -function thread.holdWithin(skip,n) - thread._Requests() - dRef[1] = "_holdW_" - dRef[2] = skip or 1 - dRef[3] = n or dFunc - return coroutine.yield(dRef) -end - -function thread.skip(n) - thread._Requests() - dRef[1] = "_skip_" - dRef[2] = n or 1 - return coroutine.yield(dRef) -end - -function thread.kill() - dRef[1] = "_kill_" - dRef[2] = "T_T" - return coroutine.yield(dRef) -end - -function thread.yield() - thread._Requests() - return thread.sleep(0) -end - -function thread.isThread() - if _VERSION~="Lua 5.1" then - local a,b = coroutine.running() - return not(b) - else - return coroutine.running()~=nil - end -end - -function thread.getCores() - return thread.__CORES -end - -function thread.set(name,val) - multi.GlobalVariables[name]=val - return true -end - -function thread.get(name) - return multi.GlobalVariables[name] -end - -function thread.waitFor(name) - thread.hold(function() return thread.get(name)~=nil end) - return thread.get(name) -end - function multi.hold(func,opt) if thread.isThread() then if type(func) == "function" or type(func) == "table" then @@ -1132,6 +987,122 @@ function multi.hold(func,opt) end end +-- Threading stuff +local initT = false +local threadCount = 0 +local threadid = 0 +thread.__threads = {} +local threads = thread.__threads +multi.GlobalVariables={} +local dFunc = function() return true end +thread.requests = {} + +function thread.request(t,cmd,...) + thread.requests[t.thread] = {cmd,{...}} +end + +function thread.getRunningThread() + local threads = globalThreads + local t = coroutine.running() + if t then + for th,process in pairs(threads) do + if t==th.thread then + return th + end + end + end +end + +function thread._Requests() + local t = thread.requests[coroutine.running()] + if t then + thread.requests[coroutine.running()] = nil + local cmd,args = t[1],t[2] + thread[cmd](unpack(args)) + end +end + +function thread.sleep(n) + thread._Requests() + thread.getRunningThread().lastSleep = clock() + return coroutine.yield(CMD, t_sleep, n or 1) +end + +local CMD = {} -- We will compare this special local +local interval +local t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none = 1, 2, 3, 4, 5, 6, 7 + +function thread.hold(n,opt) + thread._Requests() + if type(opt)=="table" then + interval = opt.interval + if opt.cycles then + return coroutine.yield(CMD, t_holdW, opt.cycles or 1, n or dFunc, interval) + elseif opt.sleep then + return coroutine.yield(CMD, t_holdF, opt.sleep, n or dFunc, interval) + elseif opt.skip then + return coroutine.yield(CMD, t_skip, opt.skip or 1, nil, interval) + end + end + if type(n) == "number" then + thread.getRunningThread().lastSleep = clock() + return coroutine.yield(CMD, t_sleep, n or 0) + else + return coroutine.yield(CMD, t_hold, n or dFunc) + end +end + +function thread.holdFor(sec,n) + thread._Requests() + return coroutine.yield(CMD, t_holdF, sec, n or dFunc) +end + +function thread.holdWithin(skip,n) + thread._Requests() + return coroutine.yield(CMD, t_holdW, skip or 1, n or dFunc) +end + +function thread.skip(n) + thread._Requests() + return coroutine.yield(CMD, t_skip, n or 1) +end + +function thread.kill() + error("thread killed!") +end + +function thread.yield() + thread._Requests() + return coroutine.yield(CMD, t_yield) +end + +function thread.isThread() + if _VERSION~="Lua 5.1" then + local a,b = coroutine.running() + return not(b) + else + return coroutine.running()~=nil + end +end + +function thread.getCores() + return thread.__CORES +end + +function thread.set(name,val) + multi.GlobalVariables[name]=val + return true +end + +function thread.get(name) + return multi.GlobalVariables[name] +end + +function thread.waitFor(name) + thread.hold(function() return thread.get(name)~=nil end) + return thread.get(name) +end + local function cleanReturns(...) local returns = {...} local rets = {} @@ -1250,7 +1221,7 @@ function multi:attachScheduler() self.threadsRef = threads function self:newThread(name,func,...) - self.OnLoad:Fire() + self.OnLoad:Fire() -- This was made incase a threaded function was called before mainloop/uManager was called local func = func or name if type(name) == "function" then name = "Thread#"..threadCount @@ -1301,48 +1272,33 @@ function multi:attachScheduler() thread.request(self,"kill") return self end + + function c:Sleep(n) + thread.request(self,"exec",function() + thread.sleep(n) + resumed = false + end) + return self + end + + function c:Hold(n,opt) + thread.request(self,"exec",function() + thread.hold(n,opt) + resumed = false + end) + return self + end + c.Destroy = c.Kill - c.kill = c.Kill - - function c.ref:send(name,val) - ret=coroutine.yield({Name=name,Value=val}) - end - - function c.ref:get(name) - return self.Globals[name] - end - - function c.ref:kill() - dRef[1] = "_kill_" - dRef[2] = "I Was killed by You!" - err = coroutine.yield(dRef) - if err then - error("Failed to kill a thread! Exiting...") - end - end - - function c.ref:sleep(n) - if type(n)=="function" then - ret=thread.hold(n) - elseif type(n)=="number" then - ret=thread.sleep(tonumber(n) or 0) - else - error("Invalid Type for sleep!") - end - end - - function c.ref:syncGlobals(v) - self.Globals=v - end table.insert(threads,c) globalThreads[c] = self if initT==false then self.initThreads() end - c.creationTime = os.clock() threadid = threadid + 1 multi:create(c) + c.creationTime = os.clock() return c end @@ -1372,7 +1328,7 @@ function multi:attachScheduler() end self.scheduler.skip=0 local t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 - local r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 + local r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 local ret,_ local function CheckRets(i) if threads[i] and not(threads[i].isError) then @@ -1413,7 +1369,7 @@ function multi:attachScheduler() end end end - local function helper(i) + local function helper(CMD,arg1,arg2,arg3,arg4) if type(ret)=="table" then if ret[1]=="_kill_" then threads[i].OnDeath:Fire(threads[i],"killed",ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) @@ -1465,8 +1421,160 @@ function multi:attachScheduler() end CheckRets(i) end + local task, thd, ref + --[[ + if coroutine.running() ~= threads[i].thread then + _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(threads[i].thread,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15) + CheckRets(i) + end + ]] + -- ipart: t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none <-- Order + local switch = { + function(th,co,ind,arg1,arg2,arg3,arg4)--hold + if clock() - th.intervalR>=th.interval then + t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = th.func() + if t0 then + if t0==self.NIL then t0 = nil end + th.task = t_none + th.__ready = true + end + th.intervalR = clock() + end + end, + function(th,co,ind,arg1,arg2,arg3,arg4)--sleep + if clock() - th.time>=th.sec then + th.task = t_none + th.__ready = true + end + end, + function(th,co,ind,arg1,arg2,arg3,arg4)--holdf + if clock() - th.intervalR>=th.interval then + t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = th.func() + if t0 then + if t0 then + if t0==self.NIL then t0 = nil end + th.task = t_none + th.__ready = true + end + th.task = t_none + th.__ready = true + elseif clock() - th.time>=th.sec then + th.task = t_none + th.__ready = true + t0 = nil + t1 = multi.TIMEOUT + end + th.intervalR = clock() + end + end, + function(th,co,ind,arg1,arg2,arg3,arg4)--skip + th.pos = th.pos + 1 + if th.count==th.pos then + th.task = t_none + th.__ready = true + end + end, + function(th,co,ind,arg1,arg2,arg3,arg4)--holdw + if clock() - th.intervalR>=th.interval then + th.pos = th.pos + 1 + t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = th.func() + if t0 then + if t0 then + if t0==self.NIL then t0 = nil end + th.task = t_none + th.__ready = true + end + th.task = "" + th.__ready = true + elseif th.count==th.pos then + th.task = "" + th.__ready = true + t0 = nil + t1 = multi.TIMEOUT + end + th.intervalR = clock() + end + end, + function(th,co,ind,arg1,arg2,arg3,arg4)--yield + -- + end, + function() end--none + } + setmetatable(switch,{__index=function() return function() end end}) + local cmds = {-- ipart: t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none <-- Order + function(th,arg1,arg2,arg3,arg4) + threads[i].OnDeath:Fire(threads[i],"killed",ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) + self.setType(threads[i],self.DestroyedObj) + table.remove(threads,i) + ret = nil + end, + function(th,arg1,arg2,arg3,arg4) + threads[i].sec = ret[2] + threads[i].time = clock() + threads[i].task = "sleep" + threads[i].__ready = false + ret = nil + end, + function(th,arg1,arg2,arg3,arg4) + threads[i].count = ret[2] + threads[i].pos = 0 + threads[i].task = "skip" + threads[i].__ready = false + ret = nil + end, + function(th,arg1,arg2,arg3,arg4) + holdconn(2) + threads[i].func = ret[2] + threads[i].task = "hold" + threads[i].__ready = false + threads[i].interval = ret[4] or 0 + threads[i].intervalR = clock() + ret = nil + end, + function(th,arg1,arg2,arg3,arg4) + holdconn(3) + threads[i].sec = ret[2] + threads[i].func = ret[3] + threads[i].task = "holdF" + threads[i].time = clock() + threads[i].__ready = false + threads[i].interval = ret[4] or 0 + threads[i].intervalR = clock() + ret = nil + end, + function(th,arg1,arg2,arg3,arg4) + holdconn(3) + threads[i].count = ret[2] + threads[i].pos = 0 + threads[i].func = ret[3] + threads[i].task = "holdW" + threads[i].time = clock() + threads[i].__ready = false + threads[i].interval = ret[4] or 0 + threads[i].intervalR = clock() + ret = nil + end, + function() end + } + local status = { + ["suspended"] = function(thd,ref) + if not ref.started then + _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(thd,unpack(ref.startArgs)) + cmds[_](ret,r1,r2,r3) + return + end + end, + ["normal"] = true, + ["running"] = true, + ["dead"] = true + } + self.scheduler:OnLoop(function(self) for i=#threads,1,-1 do + ref = threads[i] + task = ref.task + thd = ref.thread + if threads[i].isError then if coroutine.status(threads[i].thread)=="dead" then threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets)) @@ -1474,74 +1582,27 @@ function multi:attachScheduler() table.remove(threads,i) end end - if threads[i] and not threads[i].__started then - if coroutine.running() ~= threads[i].thread then - _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(threads[i].thread,unpack(threads[i].startArgs)) - end - threads[i].__started = true - helper(i) - end + status[coroutine.status(thd)](thd,ref) if threads[i] and not _ then threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets)) threads[i].isError = true end + switch(task)(ref,thd,i) if threads[i] and coroutine.status(threads[i].thread)=="dead" then local t = threads[i].TempRets or {} threads[i].OnDeath:Fire(threads[i],"ended",t[1],t[2],t[3],t[4],t[5],t[6],t[7],t[8],t[9],t[10],t[11],t[12],t[13],t[14],t[15],t[16]) self.setType(threads[i],self.DestroyedObj) table.remove(threads,i) elseif threads[i] and threads[i].task == "skip" then - threads[i].pos = threads[i].pos + 1 - if threads[i].count==threads[i].pos then - threads[i].task = "" - threads[i].__ready = true - end + elseif threads[i] and threads[i].task == "hold" then - if clock() - threads[i].intervalR>=threads[i].interval then - t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = threads[i].func() - if t0 then - if t0==self.NIL then - t0 = nil - end - threads[i].task = "" - threads[i].__ready = true - end - threads[i].intervalR = clock() - end + elseif threads[i] and threads[i].task == "sleep" then - if clock() - threads[i].time>=threads[i].sec then - threads[i].task = "" - threads[i].__ready = true - end + elseif threads[i] and threads[i].task == "holdF" then - if clock() - threads[i].intervalR>=threads[i].interval then - t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = threads[i].func() - if t0 then - threads[i].task = "" - threads[i].__ready = true - elseif clock() - threads[i].time>=threads[i].sec then - threads[i].task = "" - threads[i].__ready = true - t0 = nil - t1 = multi.TIMEOUT - end - threads[i].intervalR = clock() - end + elseif threads[i] and threads[i].task == "holdW" then - if clock() - threads[i].intervalR>=threads[i].interval then - threads[i].pos = threads[i].pos + 1 - t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = threads[i].func() - if t0 then - threads[i].task = "" - threads[i].__ready = true - elseif threads[i].count==threads[i].pos then - threads[i].task = "" - threads[i].__ready = true - t0 = nil - t1 = multi.TIMEOUT - end - threads[i].intervalR = clock() - end + end if threads[i] and threads[i].__ready then threads[i].__ready = false @@ -1661,7 +1722,7 @@ local function mainloop(self) multi.OnPreLoad:Fire() self.uManager = self.uManagerRef if not isRunning then - isRunning=true + isRunning = true mainloopActive = true local Loop=self.Mainloop local ctask diff --git a/test4.lua b/test4.lua new file mode 100644 index 0000000..8ebd3be --- /dev/null +++ b/test4.lua @@ -0,0 +1,30 @@ +package.path = "./?.lua;?/init.lua;"..package.path +local multi,thread = require("multi"):init() +--[[ Testing... +Before AVG: 522386 +Test 1 AVG: +]] +local sleep_for = 1 + +local function bench(_,steps) + print("Steps/5s: "..steps) + os.exit() +end +multi:benchMark(sleep_for,multi.Priority_Core,"Core:"):OnBench(bench) +multi:newThread(function() + while true do + thread.sleep(1) -- We just need to run things + end +end) + +multi:newThread(function() + while true do + thread.sleep(1) -- We just need to run things + end +end) + +-- multi.OnExit(function() +-- print("Total: ".. a) +-- end) + +multi:mainloop() \ No newline at end of file diff --git a/tests/runtests.lua b/tests/runtests.lua index d650347..52f27d6 100644 --- a/tests/runtests.lua +++ b/tests/runtests.lua @@ -15,7 +15,7 @@ package.path="./?.lua;../?.lua;../?/init.lua;../?.lua;../?/?/init.lua;"..package The expected and actual should "match" (Might be impossible when playing with threads) This will be pushed directly to the master as tests start existing. ]] -local multi, thread = require("multi"):init() +local multi, thread = require("multi"):init{priority=true} local good = false runTest = thread:newFunction(function() local objects = multi:newProcessor("Basic Object Tests") @@ -29,5 +29,7 @@ runTest = thread:newFunction(function() print(multi:getTasksDetails()) os.exit() end) -runTest().OnError(print) +runTest().OnError(function(...) + print("Error:",...) +end) multi:mainloop() \ No newline at end of file