Started to work on the scheduler rework

This commit is contained in:
Ryan Ward 2022-01-24 09:00:51 -05:00
parent 3fcba8825b
commit 47178dd3b3
4 changed files with 339 additions and 249 deletions

3
.gitignore vendored
View File

@ -3,8 +3,5 @@
*lua5.3
*lua5.4
*luajit
test2.lua
test.lua
test3.lua
*.code-workspace
*.dat

View File

@ -32,23 +32,14 @@ if not _G["$multi"] then
_G["$multi"] = {multi=multi,thread=thread}
end
multi.Version = "15.1.0"
multi.stage = "stable"
multi.Version = "15.2.0"
multi.Name = "multi.root"
multi.NIL = {Type="NIL"}
multi.Mainloop = {}
multi.Garbage = {}
multi.ender = {}
multi.Children = {}
multi.Active = true
multi.Type = "rootprocess"
multi.Rest = 0
multi._type = type
multi.queue = {}
multi.clock = os.clock
multi.time = os.time
multi.LinkedPath = multi
multi.lastTime = clock()
multi.TIMEOUT = "TIMEOUT"
multi.TID = 0
multi.defaultSettings = {}
@ -83,6 +74,7 @@ multi.threstimed=.001
-- System
function multi.Stop()
isRunning = false
mainloopActive = false
end
@ -90,6 +82,7 @@ end
local priorityTable = {[0]="Round-Robin",[1]="Balanced",[2]="Top-Down",[3]="Timed-Based-Balancer"}
local ProcessName = {"SubProcessor","MainProcessor"}
local globalThreads = {}
function multi:getTasksDetails(t)
if not(t) then
str = {
@ -562,9 +555,9 @@ end
--Constructors [CORE]
local _tid = 0
function multi:newBase(ins)
if not(self.Type=='rootprocess' or self.Type=='process' or self.Type=='queue' or self.Type == 'sandbox') then error('Can only create an object on multi or an interface obj') return false end
if not(self.Type=='rootprocess' or self.Type=='process') then error('Can only create an object on multi or an interface obj') return false end
local c = {}
if self.Type=='process' or self.Type=='queue' or self.Type=='sandbox' then
if self.Type=='process' then
setmetatable(c, {__index = multi})
else
setmetatable(c, {__index = multi})
@ -876,6 +869,7 @@ end
local scheduledjobs = {}
local sthread
function multi:scheduleJob(time,func)
if not sthread then
sthread = multi:newThread("JobScheduler",function()
@ -906,6 +900,7 @@ end
local __CurrentProcess = multi
local __CurrentTask
function multi.getCurrentProcess()
return __CurrentProcess
end
@ -959,146 +954,6 @@ function multi:newProcessor(name,nothread)
return c
end
-- Threading stuff
local initT = false
local threadCount = 0
local threadid = 0
thread.__threads = {}
local threads = thread.__threads
local Gref = _G
multi.GlobalVariables={}
local dFunc = function() return true end
local dRef = {nil,nil,nil,nil,nil}
thread.requests = {}
function thread.request(t,cmd,...)
thread.requests[t.thread] = {cmd,{...}}
end
function thread.getRunningThread()
local threads = globalThreads
local t = coroutine.running()
if t then
for th,process in pairs(threads) do
if t==th.thread then
return th
end
end
end
end
function thread._Requests()
local t = thread.requests[coroutine.running()]
if t then
thread.requests[coroutine.running()] = nil
local cmd,args = t[1],t[2]
thread[cmd](unpack(args))
end
end
function thread.sleep(n)
thread._Requests()
thread.getRunningThread().lastSleep = clock()
dRef[1] = "_sleep_"
dRef[2] = n or 0
return coroutine.yield(dRef)
end
function thread.hold(n,opt)
thread._Requests()
if opt and type(opt)=="table" then
if opt.interval then
dRef[4] = opt.interval
end
if opt.cycles then
dRef[1] = "_holdW_"
dRef[2] = opt.cycles or 1
dRef[3] = n or dFunc
return coroutine.yield(dRef)
elseif opt.sleep then
dRef[1] = "_holdF_"
dRef[2] = opt.sleep
dRef[3] = n or dFunc
return coroutine.yield(dRef)
elseif opt.skip then
dRef[1] = "_skip_"
dRef[2] = opt.skip or 1
return coroutine.yield(dRef)
end
end
if type(n) == "number" then
thread.getRunningThread().lastSleep = clock()
dRef[1] = "_sleep_"
dRef[2] = n or 0
return coroutine.yield(dRef)
else
dRef[1] = "_hold_"
dRef[2] = n or dFunc
return coroutine.yield(dRef)
end
end
function thread.holdFor(sec,n)
thread._Requests()
dRef[1] = "_holdF_"
dRef[2] = sec
dRef[3] = n or dFunc
return coroutine.yield(dRef)
end
function thread.holdWithin(skip,n)
thread._Requests()
dRef[1] = "_holdW_"
dRef[2] = skip or 1
dRef[3] = n or dFunc
return coroutine.yield(dRef)
end
function thread.skip(n)
thread._Requests()
dRef[1] = "_skip_"
dRef[2] = n or 1
return coroutine.yield(dRef)
end
function thread.kill()
dRef[1] = "_kill_"
dRef[2] = "T_T"
return coroutine.yield(dRef)
end
function thread.yield()
thread._Requests()
return thread.sleep(0)
end
function thread.isThread()
if _VERSION~="Lua 5.1" then
local a,b = coroutine.running()
return not(b)
else
return coroutine.running()~=nil
end
end
function thread.getCores()
return thread.__CORES
end
function thread.set(name,val)
multi.GlobalVariables[name]=val
return true
end
function thread.get(name)
return multi.GlobalVariables[name]
end
function thread.waitFor(name)
thread.hold(function() return thread.get(name)~=nil end)
return thread.get(name)
end
function multi.hold(func,opt)
if thread.isThread() then
if type(func) == "function" or type(func) == "table" then
@ -1132,6 +987,122 @@ function multi.hold(func,opt)
end
end
-- Threading stuff
local initT = false
local threadCount = 0
local threadid = 0
thread.__threads = {}
local threads = thread.__threads
multi.GlobalVariables={}
local dFunc = function() return true end
thread.requests = {}
function thread.request(t,cmd,...)
thread.requests[t.thread] = {cmd,{...}}
end
function thread.getRunningThread()
local threads = globalThreads
local t = coroutine.running()
if t then
for th,process in pairs(threads) do
if t==th.thread then
return th
end
end
end
end
function thread._Requests()
local t = thread.requests[coroutine.running()]
if t then
thread.requests[coroutine.running()] = nil
local cmd,args = t[1],t[2]
thread[cmd](unpack(args))
end
end
function thread.sleep(n)
thread._Requests()
thread.getRunningThread().lastSleep = clock()
return coroutine.yield(CMD, t_sleep, n or 1)
end
local CMD = {} -- We will compare this special local
local interval
local t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none = 1, 2, 3, 4, 5, 6, 7
function thread.hold(n,opt)
thread._Requests()
if type(opt)=="table" then
interval = opt.interval
if opt.cycles then
return coroutine.yield(CMD, t_holdW, opt.cycles or 1, n or dFunc, interval)
elseif opt.sleep then
return coroutine.yield(CMD, t_holdF, opt.sleep, n or dFunc, interval)
elseif opt.skip then
return coroutine.yield(CMD, t_skip, opt.skip or 1, nil, interval)
end
end
if type(n) == "number" then
thread.getRunningThread().lastSleep = clock()
return coroutine.yield(CMD, t_sleep, n or 0)
else
return coroutine.yield(CMD, t_hold, n or dFunc)
end
end
function thread.holdFor(sec,n)
thread._Requests()
return coroutine.yield(CMD, t_holdF, sec, n or dFunc)
end
function thread.holdWithin(skip,n)
thread._Requests()
return coroutine.yield(CMD, t_holdW, skip or 1, n or dFunc)
end
function thread.skip(n)
thread._Requests()
return coroutine.yield(CMD, t_skip, n or 1)
end
function thread.kill()
error("thread killed!")
end
function thread.yield()
thread._Requests()
return coroutine.yield(CMD, t_yield)
end
function thread.isThread()
if _VERSION~="Lua 5.1" then
local a,b = coroutine.running()
return not(b)
else
return coroutine.running()~=nil
end
end
function thread.getCores()
return thread.__CORES
end
function thread.set(name,val)
multi.GlobalVariables[name]=val
return true
end
function thread.get(name)
return multi.GlobalVariables[name]
end
function thread.waitFor(name)
thread.hold(function() return thread.get(name)~=nil end)
return thread.get(name)
end
local function cleanReturns(...)
local returns = {...}
local rets = {}
@ -1250,7 +1221,7 @@ function multi:attachScheduler()
self.threadsRef = threads
function self:newThread(name,func,...)
self.OnLoad:Fire()
self.OnLoad:Fire() -- This was made incase a threaded function was called before mainloop/uManager was called
local func = func or name
if type(name) == "function" then
name = "Thread#"..threadCount
@ -1301,48 +1272,33 @@ function multi:attachScheduler()
thread.request(self,"kill")
return self
end
function c:Sleep(n)
thread.request(self,"exec",function()
thread.sleep(n)
resumed = false
end)
return self
end
function c:Hold(n,opt)
thread.request(self,"exec",function()
thread.hold(n,opt)
resumed = false
end)
return self
end
c.Destroy = c.Kill
c.kill = c.Kill
function c.ref:send(name,val)
ret=coroutine.yield({Name=name,Value=val})
end
function c.ref:get(name)
return self.Globals[name]
end
function c.ref:kill()
dRef[1] = "_kill_"
dRef[2] = "I Was killed by You!"
err = coroutine.yield(dRef)
if err then
error("Failed to kill a thread! Exiting...")
end
end
function c.ref:sleep(n)
if type(n)=="function" then
ret=thread.hold(n)
elseif type(n)=="number" then
ret=thread.sleep(tonumber(n) or 0)
else
error("Invalid Type for sleep!")
end
end
function c.ref:syncGlobals(v)
self.Globals=v
end
table.insert(threads,c)
globalThreads[c] = self
if initT==false then
self.initThreads()
end
c.creationTime = os.clock()
threadid = threadid + 1
multi:create(c)
c.creationTime = os.clock()
return c
end
@ -1413,7 +1369,7 @@ function multi:attachScheduler()
end
end
end
local function helper(i)
local function helper(CMD,arg1,arg2,arg3,arg4)
if type(ret)=="table" then
if ret[1]=="_kill_" then
threads[i].OnDeath:Fire(threads[i],"killed",ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16)
@ -1465,8 +1421,160 @@ function multi:attachScheduler()
end
CheckRets(i)
end
local task, thd, ref
--[[
if coroutine.running() ~= threads[i].thread then
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(threads[i].thread,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
CheckRets(i)
end
]]
-- ipart: t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none <-- Order
local switch = {
function(th,co,ind,arg1,arg2,arg3,arg4)--hold
if clock() - th.intervalR>=th.interval then
t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = th.func()
if t0 then
if t0==self.NIL then t0 = nil end
th.task = t_none
th.__ready = true
end
th.intervalR = clock()
end
end,
function(th,co,ind,arg1,arg2,arg3,arg4)--sleep
if clock() - th.time>=th.sec then
th.task = t_none
th.__ready = true
end
end,
function(th,co,ind,arg1,arg2,arg3,arg4)--holdf
if clock() - th.intervalR>=th.interval then
t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = th.func()
if t0 then
if t0 then
if t0==self.NIL then t0 = nil end
th.task = t_none
th.__ready = true
end
th.task = t_none
th.__ready = true
elseif clock() - th.time>=th.sec then
th.task = t_none
th.__ready = true
t0 = nil
t1 = multi.TIMEOUT
end
th.intervalR = clock()
end
end,
function(th,co,ind,arg1,arg2,arg3,arg4)--skip
th.pos = th.pos + 1
if th.count==th.pos then
th.task = t_none
th.__ready = true
end
end,
function(th,co,ind,arg1,arg2,arg3,arg4)--holdw
if clock() - th.intervalR>=th.interval then
th.pos = th.pos + 1
t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = th.func()
if t0 then
if t0 then
if t0==self.NIL then t0 = nil end
th.task = t_none
th.__ready = true
end
th.task = ""
th.__ready = true
elseif th.count==th.pos then
th.task = ""
th.__ready = true
t0 = nil
t1 = multi.TIMEOUT
end
th.intervalR = clock()
end
end,
function(th,co,ind,arg1,arg2,arg3,arg4)--yield
--
end,
function() end--none
}
setmetatable(switch,{__index=function() return function() end end})
local cmds = {-- ipart: t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none <-- Order
function(th,arg1,arg2,arg3,arg4)
threads[i].OnDeath:Fire(threads[i],"killed",ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16)
self.setType(threads[i],self.DestroyedObj)
table.remove(threads,i)
ret = nil
end,
function(th,arg1,arg2,arg3,arg4)
threads[i].sec = ret[2]
threads[i].time = clock()
threads[i].task = "sleep"
threads[i].__ready = false
ret = nil
end,
function(th,arg1,arg2,arg3,arg4)
threads[i].count = ret[2]
threads[i].pos = 0
threads[i].task = "skip"
threads[i].__ready = false
ret = nil
end,
function(th,arg1,arg2,arg3,arg4)
holdconn(2)
threads[i].func = ret[2]
threads[i].task = "hold"
threads[i].__ready = false
threads[i].interval = ret[4] or 0
threads[i].intervalR = clock()
ret = nil
end,
function(th,arg1,arg2,arg3,arg4)
holdconn(3)
threads[i].sec = ret[2]
threads[i].func = ret[3]
threads[i].task = "holdF"
threads[i].time = clock()
threads[i].__ready = false
threads[i].interval = ret[4] or 0
threads[i].intervalR = clock()
ret = nil
end,
function(th,arg1,arg2,arg3,arg4)
holdconn(3)
threads[i].count = ret[2]
threads[i].pos = 0
threads[i].func = ret[3]
threads[i].task = "holdW"
threads[i].time = clock()
threads[i].__ready = false
threads[i].interval = ret[4] or 0
threads[i].intervalR = clock()
ret = nil
end,
function() end
}
local status = {
["suspended"] = function(thd,ref)
if not ref.started then
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(thd,unpack(ref.startArgs))
cmds[_](ret,r1,r2,r3)
return
end
end,
["normal"] = true,
["running"] = true,
["dead"] = true
}
self.scheduler:OnLoop(function(self)
for i=#threads,1,-1 do
ref = threads[i]
task = ref.task
thd = ref.thread
if threads[i].isError then
if coroutine.status(threads[i].thread)=="dead" then
threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets))
@ -1474,74 +1582,27 @@ function multi:attachScheduler()
table.remove(threads,i)
end
end
if threads[i] and not threads[i].__started then
if coroutine.running() ~= threads[i].thread then
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(threads[i].thread,unpack(threads[i].startArgs))
end
threads[i].__started = true
helper(i)
end
status[coroutine.status(thd)](thd,ref)
if threads[i] and not _ then
threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets))
threads[i].isError = true
end
switch(task)(ref,thd,i)
if threads[i] and coroutine.status(threads[i].thread)=="dead" then
local t = threads[i].TempRets or {}
threads[i].OnDeath:Fire(threads[i],"ended",t[1],t[2],t[3],t[4],t[5],t[6],t[7],t[8],t[9],t[10],t[11],t[12],t[13],t[14],t[15],t[16])
self.setType(threads[i],self.DestroyedObj)
table.remove(threads,i)
elseif threads[i] and threads[i].task == "skip" then
threads[i].pos = threads[i].pos + 1
if threads[i].count==threads[i].pos then
threads[i].task = ""
threads[i].__ready = true
end
elseif threads[i] and threads[i].task == "hold" then
if clock() - threads[i].intervalR>=threads[i].interval then
t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = threads[i].func()
if t0 then
if t0==self.NIL then
t0 = nil
end
threads[i].task = ""
threads[i].__ready = true
end
threads[i].intervalR = clock()
end
elseif threads[i] and threads[i].task == "sleep" then
if clock() - threads[i].time>=threads[i].sec then
threads[i].task = ""
threads[i].__ready = true
end
elseif threads[i] and threads[i].task == "holdF" then
if clock() - threads[i].intervalR>=threads[i].interval then
t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = threads[i].func()
if t0 then
threads[i].task = ""
threads[i].__ready = true
elseif clock() - threads[i].time>=threads[i].sec then
threads[i].task = ""
threads[i].__ready = true
t0 = nil
t1 = multi.TIMEOUT
end
threads[i].intervalR = clock()
end
elseif threads[i] and threads[i].task == "holdW" then
if clock() - threads[i].intervalR>=threads[i].interval then
threads[i].pos = threads[i].pos + 1
t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = threads[i].func()
if t0 then
threads[i].task = ""
threads[i].__ready = true
elseif threads[i].count==threads[i].pos then
threads[i].task = ""
threads[i].__ready = true
t0 = nil
t1 = multi.TIMEOUT
end
threads[i].intervalR = clock()
end
end
if threads[i] and threads[i].__ready then
threads[i].__ready = false

30
test4.lua Normal file
View File

@ -0,0 +1,30 @@
package.path = "./?.lua;?/init.lua;"..package.path
local multi,thread = require("multi"):init()
--[[ Testing...
Before AVG: 522386
Test 1 AVG:
]]
local sleep_for = 1
local function bench(_,steps)
print("Steps/5s: "..steps)
os.exit()
end
multi:benchMark(sleep_for,multi.Priority_Core,"Core:"):OnBench(bench)
multi:newThread(function()
while true do
thread.sleep(1) -- We just need to run things
end
end)
multi:newThread(function()
while true do
thread.sleep(1) -- We just need to run things
end
end)
-- multi.OnExit(function()
-- print("Total: ".. a)
-- end)
multi:mainloop()

View File

@ -15,7 +15,7 @@ package.path="./?.lua;../?.lua;../?/init.lua;../?.lua;../?/?/init.lua;"..package
The expected and actual should "match" (Might be impossible when playing with threads)
This will be pushed directly to the master as tests start existing.
]]
local multi, thread = require("multi"):init()
local multi, thread = require("multi"):init{priority=true}
local good = false
runTest = thread:newFunction(function()
local objects = multi:newProcessor("Basic Object Tests")
@ -29,5 +29,7 @@ runTest = thread:newFunction(function()
print(multi:getTasksDetails())
os.exit()
end)
runTest().OnError(print)
runTest().OnError(function(...)
print("Error:",...)
end)
multi:mainloop()