Working on thread scheduler rework

This commit is contained in:
Ryan Ward 2022-01-24 14:25:23 -05:00
parent 47178dd3b3
commit 472d1748ee
2 changed files with 76 additions and 81 deletions

View File

@ -996,6 +996,9 @@ local threads = thread.__threads
multi.GlobalVariables={} multi.GlobalVariables={}
local dFunc = function() return true end local dFunc = function() return true end
thread.requests = {} thread.requests = {}
local CMD = {} -- We will compare this special local
local interval
local t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none = 1, 2, 3, 4, 5, 6, 7
function thread.request(t,cmd,...) function thread.request(t,cmd,...)
thread.requests[t.thread] = {cmd,{...}} thread.requests[t.thread] = {cmd,{...}}
@ -1028,10 +1031,6 @@ function thread.sleep(n)
return coroutine.yield(CMD, t_sleep, n or 1) return coroutine.yield(CMD, t_sleep, n or 1)
end end
local CMD = {} -- We will compare this special local
local interval
local t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none = 1, 2, 3, 4, 5, 6, 7
function thread.hold(n,opt) function thread.hold(n,opt)
thread._Requests() thread._Requests()
if type(opt)=="table" then if type(opt)=="table" then
@ -1215,13 +1214,14 @@ function multi.setEnv(func,env)
local chunk = load(f,"env","bt",env) local chunk = load(f,"env","bt",env)
return chunk return chunk
end end
local resume, status, create = coroutine.resume, coroutine.status, coroutine.create
function multi:attachScheduler() function multi:attachScheduler()
local threads = {} local threads = {}
self.threadsRef = threads self.threadsRef = threads
local startme = {}
local startme_len = 0
function self:newThread(name,func,...) function self:newThread(name,func,...)
self.OnLoad:Fire() -- This was made incase a threaded function was called before mainloop/uManager was called self.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called
local func = func or name local func = func or name
if type(name) == "function" then if type(name) == "function" then
name = "Thread#"..threadCount name = "Thread#"..threadCount
@ -1232,7 +1232,7 @@ function multi:attachScheduler()
c.startArgs = {...} c.startArgs = {...}
c.ref={} c.ref={}
c.Name=name c.Name=name
c.thread=coroutine.create(func) c.thread=create(func)
c.sleep=1 c.sleep=1
c.Type="thread" c.Type="thread"
c.TID = threadid c.TID = threadid
@ -1292,6 +1292,8 @@ function multi:attachScheduler()
c.Destroy = c.Kill c.Destroy = c.Kill
table.insert(threads,c) table.insert(threads,c)
table.insert(startme,c)
startme_len = #startme
globalThreads[c] = self globalThreads[c] = self
if initT==false then if initT==false then
self.initThreads() self.initThreads()
@ -1421,60 +1423,59 @@ function multi:attachScheduler()
end end
CheckRets(i) CheckRets(i)
end end
local task, thd, ref local task, thd, ref, ready
--[[ --[[
if coroutine.running() ~= threads[i].thread then if coroutine.running() ~= threads[i].thread then
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(threads[i].thread,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15) _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(coreads[i].thread,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
CheckRets(i) CheckRets(i)
end end
]] ]]
-- ipart: t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none <-- Order -- ipart: t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none <-- Order
local switch = { local switch = {
function(th,co,ind,arg1,arg2,arg3,arg4)--hold function(th,co,arg1,arg2,arg3,arg4)--hold
if clock() - th.intervalR>=th.interval then if clock() - th.intervalR>=th.interval then
t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = th.func() t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = th.func()
if t0 then if t0 then
if t0==self.NIL then t0 = nil end if t0==self.NIL then t0 = nil end
th.task = t_none th.task = t_none
th.__ready = true _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(co,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
end end
th.intervalR = clock() th.intervalR = clock()
end end
end, end,
function(th,co,ind,arg1,arg2,arg3,arg4)--sleep function(th,co,arg1,arg2,arg3,arg4)--sleep
if clock() - th.time>=th.sec then if clock() - th.time>=th.sec then
th.task = t_none th.task = t_none
th.__ready = true _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(co,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
end end
end, end,
function(th,co,ind,arg1,arg2,arg3,arg4)--holdf function(th,co,arg1,arg2,arg3,arg4)--holdf
if clock() - th.intervalR>=th.interval then if clock() - th.intervalR>=th.interval then
t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = th.func() t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = th.func()
if t0 then if t0 then
if t0 then if t0 then
if t0==self.NIL then t0 = nil end if t0==self.NIL then t0 = nil end
th.task = t_none th.task = t_none
th.__ready = true
end end
th.task = t_none th.task = t_none
th.__ready = true _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(co,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
elseif clock() - th.time>=th.sec then elseif clock() - th.time>=th.sec then
th.task = t_none th.task = t_none
th.__ready = true
t0 = nil t0 = nil
t1 = multi.TIMEOUT t1 = multi.TIMEOUT
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(co,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
end end
th.intervalR = clock() th.intervalR = clock()
end end
end, end,
function(th,co,ind,arg1,arg2,arg3,arg4)--skip function(th,co,arg1,arg2,arg3,arg4)--skip
th.pos = th.pos + 1 th.pos = th.pos + 1
if th.count==th.pos then if th.count==th.pos then
th.task = t_none th.task = t_none
th.__ready = true _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(co,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
end end
end, end,
function(th,co,ind,arg1,arg2,arg3,arg4)--holdw function(th,co,arg1,arg2,arg3,arg4)--holdw
if clock() - th.intervalR>=th.interval then if clock() - th.intervalR>=th.interval then
th.pos = th.pos + 1 th.pos = th.pos + 1
t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = th.func() t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = th.func()
@ -1482,21 +1483,20 @@ function multi:attachScheduler()
if t0 then if t0 then
if t0==self.NIL then t0 = nil end if t0==self.NIL then t0 = nil end
th.task = t_none th.task = t_none
th.__ready = true
end end
th.task = "" th.task = ""
th.__ready = true _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(co,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
elseif th.count==th.pos then elseif th.count==th.pos then
th.task = "" th.task = t_none
th.__ready = true
t0 = nil t0 = nil
t1 = multi.TIMEOUT t1 = multi.TIMEOUT
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(co,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
end end
th.intervalR = clock() th.intervalR = clock()
end end
end, end,
function(th,co,ind,arg1,arg2,arg3,arg4)--yield function(th,co,arg1,arg2,arg3,arg4)--yield
-- _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(co,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
end, end,
function() end--none function() end--none
} }
@ -1509,23 +1509,23 @@ function multi:attachScheduler()
ret = nil ret = nil
end, end,
function(th,arg1,arg2,arg3,arg4) function(th,arg1,arg2,arg3,arg4)
threads[i].sec = ret[2] print("SLEEP:",th,arg1,arg2,arg3,arg4)
threads[i].time = clock() th.sec = arg1
threads[i].task = "sleep" th.time = clock()
threads[i].__ready = false th.task = t_sleep
ret = nil ret = nil
end, end,
function(th,arg1,arg2,arg3,arg4) function(th,arg1,arg2,arg3,arg4)
threads[i].count = ret[2] threads[i].count = ret[2]
threads[i].pos = 0 threads[i].pos = 0
threads[i].task = "skip" threads[i].task = t_skip
threads[i].__ready = false threads[i].__ready = false
ret = nil ret = nil
end, end,
function(th,arg1,arg2,arg3,arg4) function(th,arg1,arg2,arg3,arg4)
holdconn(2) holdconn(2)
threads[i].func = ret[2] threads[i].func = ret[2]
threads[i].task = "hold" threads[i].task = t_hold
threads[i].__ready = false threads[i].__ready = false
threads[i].interval = ret[4] or 0 threads[i].interval = ret[4] or 0
threads[i].intervalR = clock() threads[i].intervalR = clock()
@ -1535,7 +1535,7 @@ function multi:attachScheduler()
holdconn(3) holdconn(3)
threads[i].sec = ret[2] threads[i].sec = ret[2]
threads[i].func = ret[3] threads[i].func = ret[3]
threads[i].task = "holdF" threads[i].task = t_holdF
threads[i].time = clock() threads[i].time = clock()
threads[i].__ready = false threads[i].__ready = false
threads[i].interval = ret[4] or 0 threads[i].interval = ret[4] or 0
@ -1547,7 +1547,7 @@ function multi:attachScheduler()
threads[i].count = ret[2] threads[i].count = ret[2]
threads[i].pos = 0 threads[i].pos = 0
threads[i].func = ret[3] threads[i].func = ret[3]
threads[i].task = "holdW" threads[i].task = t_holdW
threads[i].time = clock() threads[i].time = clock()
threads[i].__ready = false threads[i].__ready = false
threads[i].interval = ret[4] or 0 threads[i].interval = ret[4] or 0
@ -1556,58 +1556,47 @@ function multi:attachScheduler()
end, end,
function() end function() end
} }
local status = { setmetatable(cmds,{__index=function() return function() end end})
["suspended"] = function(thd,ref) local co_status = {
if not ref.started then ["suspended"] = function(thd,ref,task)
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(thd,unpack(ref.startArgs)) print(ref,r2,r3,r4,r5)
cmds[_](ret,r1,r2,r3) switch[task](ref,thd)
return --cmds[r1](ref,r2,r3,r4,r5)
end end,
["normal"] = function(thd,ref) end, -- Not sure if I will handle this
["running"] = function(thd,ref) end,
["dead"] = function(thd,ref)
local t = ref.TempRets or {}
ref.OnDeath:Fire(ref,"ended",t[1],t[2],t[3],t[4],t[5],t[6],t[7],t[8],t[9],t[10],t[11],t[12],t[13],t[14],t[15],t[16])
self.setType(ref,self.DestroyedObj)
table.remove(threads,i)
end, end,
["normal"] = true,
["running"] = true,
["dead"] = true
} }
self.scheduler:OnLoop(function(self) self.scheduler:OnLoop(function(self)
for i=#threads,1,-1 do for i=#threads,1,-1 do
ref = threads[i] ref = threads[i]
task = ref.task task = ref.task
thd = ref.thread thd = ref.thread
ready = ref.__ready
if threads[i].isError then
if coroutine.status(threads[i].thread)=="dead" then for start = 1, startme_len do
threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets)) _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(startme[start].thread,unpack(startme[start].startArgs))
self.setType(threads[i],self.DestroyedObj) cmds[r1](ref,r2,r3,r4,r5)
table.remove(threads,i) startme_len = startme_len - 1
end
end
status[coroutine.status(thd)](thd,ref)
if threads[i] and not _ then
threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets))
threads[i].isError = true
end
switch(task)(ref,thd,i)
if threads[i] and coroutine.status(threads[i].thread)=="dead" then
local t = threads[i].TempRets or {}
threads[i].OnDeath:Fire(threads[i],"ended",t[1],t[2],t[3],t[4],t[5],t[6],t[7],t[8],t[9],t[10],t[11],t[12],t[13],t[14],t[15],t[16])
self.setType(threads[i],self.DestroyedObj)
table.remove(threads,i)
elseif threads[i] and threads[i].task == "skip" then
elseif threads[i] and threads[i].task == "hold" then
elseif threads[i] and threads[i].task == "sleep" then
elseif threads[i] and threads[i].task == "holdF" then
elseif threads[i] and threads[i].task == "holdW" then
end end
-- if threads[i].isError then
-- if status(threads[i].thread)=="dead" then
-- threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets))
-- self.setType(threads[i],self.DestroyedObj)
-- table.remove(threads,i)
-- end
-- end
co_status[status(thd)](thd,ref,task)
if threads[i] and threads[i].__ready then if threads[i] and threads[i].__ready then
threads[i].__ready = false threads[i].__ready = false
if coroutine.running() ~= threads[i].thread then if coroutine.running() ~= threads[i].thread then
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(threads[i].thread,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
CheckRets(i) CheckRets(i)
end end
end end

View File

@ -4,24 +4,30 @@ local multi,thread = require("multi"):init()
Before AVG: 522386 Before AVG: 522386
Test 1 AVG: Test 1 AVG:
]] ]]
local sleep_for = 1 local sleep_for = 5
local function bench(_,steps) local function bench(_,steps)
print("Steps/5s: "..steps) print("Steps/5s: "..steps)
os.exit() os.exit()
end end
multi:benchMark(sleep_for,multi.Priority_Core,"Core:"):OnBench(bench) multi:benchMark(sleep_for,multi.Priority_Core,"Core:"):OnBench(bench)
multi:newThread(function() multi:newThread("Thread 1",function(a,b,c)
print(a,b,c)
while true do while true do
print(1)
thread.sleep(1) -- We just need to run things thread.sleep(1) -- We just need to run things
print("1 ...")
end end
end) end,1,2,3)
multi:newThread(function() multi:newThread("Thread 2",function(a,b,c)
print(a,b,c)
while true do while true do
print(2)
thread.sleep(1) -- We just need to run things thread.sleep(1) -- We just need to run things
print("2 ...")
end end
end) end,4,5,6)
-- multi.OnExit(function() -- multi.OnExit(function()
-- print("Total: ".. a) -- print("Total: ".. a)