Fixing issues with the new thread scheduler, nested yields need handling

This commit is contained in:
Ryan Ward 2022-01-31 08:31:38 -05:00
parent cdb4bfda11
commit 49c0bd3930
5 changed files with 73 additions and 35 deletions

View File

@ -79,7 +79,7 @@ function multi.Stop()
end end
--Processor --Processor
local priorityTable = {[0]="Round-Robin",[1]="Balanced",[2]="Top-Down",[3]="Timed-Based-Balancer"} local priorityTable = {[false]="Disabled",[true]="Enabled"}
local ProcessName = {"SubProcessor","MainProcessor"} local ProcessName = {"SubProcessor","MainProcessor"}
local globalThreads = {} local globalThreads = {}
@ -132,7 +132,7 @@ function multi:getTasksDetails(t)
if tostring(th.isProcessThread) == "destroyed" then if tostring(th.isProcessThread) == "destroyed" then
globalThreads[th] = nil globalThreads[th] = nil
elseif th.isProcessThread then elseif th.isProcessThread then
local load, steps = process:getLoad() load, steps = process:getLoad()
process_count = process_count + 1 process_count = process_count + 1
table.insert(proc_tab,{th.Name,os.clock()-th.creationTime,(th.PID or "-1"),load,steps}) table.insert(proc_tab,{th.Name,os.clock()-th.creationTime,(th.PID or "-1"),load,steps})
else else
@ -142,16 +142,16 @@ function multi:getTasksDetails(t)
end end
dat = multi.AlignTable(proc_tab).. "\n" dat = multi.AlignTable(proc_tab).. "\n"
dat = dat .. "\n" .. multi.AlignTable(th_tab) dat = dat .. "\n" .. multi.AlignTable(th_tab)
return "Load on "..ProcessName[(self.Type=="process" and 1 or 2)].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: "..thread_count.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..dat..dat2.."\n\n"..s return "Load on "..ProcessName[(self.Type=="process" and 1 or 2)].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: "..thread_count.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or false].."\n\n"..dat..dat2.."\n\n"..s
else else
return "Load on "..ProcessName[(self.Type=="process" and 1 or 2)].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..dat2.."\n\n"..s return "Load on "..ProcessName[(self.Type=="process" and 1 or 2)].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or false].."\n\n"..dat2.."\n\n"..s
end end
else else
local load,steps = self:getLoad() local load,steps = self:getLoad()
str = { str = {
ProcessName = (self.Name or "Unnamed"), ProcessName = (self.Name or "Unnamed"),
MemoryUsage = math.ceil(collectgarbage("count")), MemoryUsage = math.ceil(collectgarbage("count")),
PriorityScheme = priorityTable[multi.defaultSettings.priority or 0], PriorityScheme = priorityTable[multi.defaultSettings.priority or false],
SystemLoad = multi.Round(load,2), SystemLoad = multi.Round(load,2),
CyclesPerSecondPerTask = steps, CyclesPerSecondPerTask = steps,
SystemThreadCount = multi.SystemThreads and #multi.SystemThreads or 0 SystemThreadCount = multi.SystemThreads and #multi.SystemThreads or 0
@ -286,6 +286,9 @@ function multi:newConnection(protect,func,kill)
else else
function c:Fire(...) function c:Fire(...)
for i=#call_funcs,1,-1 do for i=#call_funcs,1,-1 do
if type(call_funcs[i])=="table" then
print(call_funcs[i].Parent.Type)
end
call_funcs[i](...) call_funcs[i](...)
if kill then if kill then
table.remove(call_funcs,i) table.remove(call_funcs,i)
@ -304,6 +307,9 @@ function multi:newConnection(protect,func,kill)
end end
end end
function self:connect(func) function self:connect(func)
if type(func) == "table" then
print(debug.traceback())
end
table.insert(fast,func) table.insert(fast,func)
end end
end end
@ -382,6 +388,9 @@ function multi:newConnection(protect,func,kill)
function c:connect(...)--func,name,num function c:connect(...)--func,name,num
local tab = {...} local tab = {...}
local funcs={} local funcs={}
if type(tab[1])=="table" then
print(debug.traceback())
end
for i=1,#tab do for i=1,#tab do
if type(tab[i])=="function" then if type(tab[i])=="function" then
funcs[#funcs+1] = tab[i] funcs[#funcs+1] = tab[i]
@ -520,7 +529,7 @@ function multi:Destroy()
globalThreads = new globalThreads = new
multi.setType(self,multi.DestroyedObj) multi.setType(self,multi.DestroyedObj)
else else
for i=1,#self.Parent.Mainloop do for i=#self.Parent.Mainloop,1,-1 do
if self.Parent.Mainloop[i]==self then if self.Parent.Mainloop[i]==self then
self.Parent.OnObjectDestroyed:Fire(self) self.Parent.OnObjectDestroyed:Fire(self)
table.remove(self.Parent.Mainloop,i) table.remove(self.Parent.Mainloop,i)
@ -528,7 +537,7 @@ function multi:Destroy()
break break
end end
end end
multi.setType(self,multi.DestroyedObj) self.Act = function() end
end end
return self return self
end end
@ -726,6 +735,7 @@ function multi:newLoop(func)
end end
function multi:newStep(start,reset,count,skip) function multi:newStep(start,reset,count,skip)
print(self.Type)
local c=self:newBase() local c=self:newBase()
think=1 think=1
c.Type='step' c.Type='step'
@ -920,11 +930,9 @@ function multi:newProcessor(name,nothread)
c.Type = "process" c.Type = "process"
c.Active = false or nothread c.Active = false or nothread
c.Name = name or "" c.Name = name or ""
c.process = self:newThread(c.Name,function() c.process = self:newThread(function()
while true do while true do
thread.hold(function() thread.hold(function() return c.Active end)
return c.Active and not(nothread)
end)
__CurrentProcess = c __CurrentProcess = c
c:uManager() c:uManager()
__CurrentProcess = self __CurrentProcess = self
@ -1044,6 +1052,7 @@ function thread.hold(n,opt)
return coroutine.yield(CMD, t_skip, opt.skip or 1, nil, interval) return coroutine.yield(CMD, t_skip, opt.skip or 1, nil, interval)
end end
end end
if type(n) == "number" then if type(n) == "number" then
thread.getRunningThread().lastSleep = clock() thread.getRunningThread().lastSleep = clock()
return coroutine.yield(CMD, t_sleep, n or 0, nil, interval) return coroutine.yield(CMD, t_sleep, n or 0, nil, interval)
@ -1055,8 +1064,10 @@ function thread.hold(n,opt)
return coroutine.yield(CMD, t_hold, function() return coroutine.yield(CMD, t_hold, function()
return ready return ready
end) end)
else elseif type(n) == "function" then
return coroutine.yield(CMD, t_hold, n or dFunc, nil, interval) return coroutine.yield(CMD, t_hold, n or dFunc, nil, interval)
else
error("Invalid argument passed to thread.hold(...)!")
end end
end end
@ -1126,6 +1137,7 @@ end
function thread.pushStatus(...) function thread.pushStatus(...)
local t = thread.getRunningThread() local t = thread.getRunningThread()
print("Pushing",t)
t.statusconnector:Fire(...) t.statusconnector:Fire(...)
end end
@ -1223,10 +1235,10 @@ function multi.setEnv(func,env)
local chunk = load(f,"env","bt",env) local chunk = load(f,"env","bt",env)
return chunk return chunk
end end
local resume, status, create = coroutine.resume, coroutine.status, coroutine.create
function multi:attachScheduler() function multi:attachScheduler()
local resume, status, create = coroutine.resume, coroutine.status, coroutine.create
local threads = {} local threads = {}
self.threadsRef = threads
local startme = {} local startme = {}
local startme_len = 0 local startme_len = 0
function self:newThread(name,func,...) function self:newThread(name,func,...)
@ -1246,7 +1258,6 @@ function multi:attachScheduler()
c.Type="thread" c.Type="thread"
c.TID = threadid c.TID = threadid
c.firstRunDone=false c.firstRunDone=false
c.timer=self:newTimer()
c._isPaused = false c._isPaused = false
c.returns = {} c.returns = {}
c.isError = false c.isError = false
@ -1471,9 +1482,11 @@ function multi:attachScheduler()
["running"] = function(thd,ref) end, ["running"] = function(thd,ref) end,
["dead"] = function(thd,ref,task,i) ["dead"] = function(thd,ref,task,i)
if _ then if _ then
print("Death")
ref.OnDeath:Fire(ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) ref.OnDeath:Fire(ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16)
else else
ref.OnError:Fire(ref,ret,r1) print("Error",ref,ret)
ref.OnError:Fire(ref,ret)
end end
if i then if i then
table.remove(threads,i) table.remove(threads,i)
@ -1486,15 +1499,11 @@ function multi:attachScheduler()
end end
end end
_=nil r1=nil r2=nil r3=nil r4=nil r5=nil _=nil r1=nil r2=nil r3=nil r4=nil r5=nil
self.setType(ref,self.DestroyedObj) --self.setType(ref,self.DestroyedObj)
end, end,
} }
self.scheduler:OnLoop(function(self) self.scheduler:OnLoop(function(self)
for i=#threads,1,-1 do for i=#threads,1,-1 do
ref = threads[i]
task = ref.task
thd = ref.thread
ready = ref.__ready
-- First time setup for threads -- First time setup for threads
for start = startme_len,1,-1 do for start = startme_len,1,-1 do
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(startme[start].thread,unpack(startme[start].startArgs)) _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(startme[start].thread,unpack(startme[start].startArgs))
@ -1502,7 +1511,13 @@ function multi:attachScheduler()
startme[startme_len] = nil startme[startme_len] = nil
startme_len = startme_len - 1 startme_len = startme_len - 1
end end
co_status[status(thd)](thd,ref,task,i) if threads[i] then
ref = threads[i]
task = ref.task
thd = ref.thread
ready = ref.__ready
co_status[status(thd)](thd,ref,task,i)
end
end end
end) end)
end end
@ -1790,7 +1805,7 @@ function multi:getLoad()
_,timeout = multi.hold(function() _,timeout = multi.hold(function()
return bench return bench
end,{sleep=.012}) end,{sleep=.012})
if timeout then if timeout or not bench then
bench = 0 bench = 0
bb = 0 bb = 0
end end

View File

@ -19,10 +19,10 @@ multi:benchMark(sleep_for,multi.Priority_Core,"Core:"):OnBench(bench)
multi:newThread("Thread 1",function() multi:newThread("Thread 1",function()
while true do while true do
thread.sleep(1) thread.sleep(1)
error("hi")
print("Test 1") print("Test 1")
thread.hold(conn) thread.hold(conn)
print("Conn sleep test") print("Conn sleep test")
error("hi")
end end
end).OnError(print) end).OnError(print)

View File

@ -1,6 +1,9 @@
function connectionThreadTests(multi,thread) function connectionThreadTests(multi,thread)
print("Starting Connection and Thread tests!") print("Starting Connection and Thread tests!")
print("Current Thread:",thread.getRunningThread())
func = thread:newFunction(function(count) func = thread:newFunction(function(count)
print("Starting Status test: ",count)
print("Current Thread:",thread.getRunningThread().thread)
local a = 0 local a = 0
while true do while true do
a = a + 1 a = a + 1
@ -14,8 +17,12 @@ function connectionThreadTests(multi,thread)
local ret2 = func(15) local ret2 = func(15)
local ret3 = func(20) local ret3 = func(20)
local s1,s2,s3 = 0,0,0 local s1,s2,s3 = 0,0,0
ret.OnError(function(...)
print("Error:",...)
end)
ret.OnStatus(function(part,whole) ret.OnStatus(function(part,whole)
s1 = math.ceil((part/whole)*1000)/10 s1 = math.ceil((part/whole)*1000)/10
print(s1)
end) end)
ret2.OnStatus(function(part,whole) ret2.OnStatus(function(part,whole)
s2 = math.ceil((part/whole)*1000)/10 s2 = math.ceil((part/whole)*1000)/10
@ -23,14 +30,22 @@ function connectionThreadTests(multi,thread)
ret3.OnStatus(function(part,whole) ret3.OnStatus(function(part,whole)
s3 = math.ceil((part/whole)*1000)/10 s3 = math.ceil((part/whole)*1000)/10
end) end)
local err, timeout = thread.hold(ret2.OnReturn + ret.OnReturn + ret3.OnReturn,{sleep=3}) ret.OnReturn(function()
print("Done")
end)
local err, timeout = thread.hold(ret.OnReturn + ret2.OnReturn + ret3.OnReturn,{sleep=3})
print(err,timeout)
for i,v in pairs(err) do
print(i,v)
end
os.exit()
if s1 == 100 and s2 == 100 and s3 == 100 then if s1 == 100 and s2 == 100 and s3 == 100 then
print("Threads: Ok") print("Threads: Ok")
else else
print("Threads on status error") print("Threads OnStatus or thread.hold(conn) Error!")
end end
if timeout then if timeout then
print("Threads or Connection error!") print("Threads or Connection Error!")
else else
print("Connection Test 1: Ok") print("Connection Test 1: Ok")
end end

View File

@ -7,12 +7,12 @@ function objectTests(multi,thread)
end) end)
multi:newTStep(1,10,1,.1):OnStep(function(t) multi:newTStep(1,10,1,.1):OnStep(function(t)
tsteps = tsteps + 1 tsteps = tsteps + 1
end):OnEnd(function(step) end).OnEnd(function(step)
step:Destroy() step:Destroy()
end) end)
multi:newStep(1,10):OnStep(function(s) multi:newStep(1,10):OnStep(function(s)
steps = steps + 1 steps = steps + 1
end):OnEnd(function(step) end).OnEnd(function(step)
step:Destroy() step:Destroy()
end) end)
local loop = multi:newLoop(function(l) local loop = multi:newLoop(function(l)
@ -28,6 +28,7 @@ function objectTests(multi,thread)
return alarms return alarms
end) end)
event.OnEvent(function(evnt) event.OnEvent(function(evnt)
evnt:Destroy()
events = true events = true
print("Alarms: Ok") print("Alarms: Ok")
print("Events: Ok") print("Events: Ok")
@ -37,6 +38,6 @@ function objectTests(multi,thread)
if tloops > 10 then print("TLoops: Ok") else print("TLoops: Bad!") end if tloops > 10 then print("TLoops: Ok") else print("TLoops: Bad!") end
if updaters > 100 then print("Updaters: Ok") else print("Updaters: Bad!") end if updaters > 100 then print("Updaters: Ok") else print("Updaters: Bad!") end
end) end)
thread.hold(event.OnEvent) thread.hold(event.OnEvent)
end end
return objectTests return objectTests

View File

@ -18,15 +18,22 @@ package.path="./?.lua;../?.lua;../?/init.lua;../?.lua;../?/?/init.lua;"..package
local multi, thread = require("multi"):init{priority=true} local multi, thread = require("multi"):init{priority=true}
local good = false local good = false
runTest = thread:newFunction(function() runTest = thread:newFunction(function()
local objects = multi:newProcessor("Basic Object Tests") -- thread.sleep(1)
objects.Start() -- local objects = multi:newProcessor("Basic Object Tests")
require("tests/objectTests")(objects,thread) -- objects.OnError(function(...)
objects.Stop() -- print("Error: ",...)
-- end)
-- objects.Start()
-- require("tests/objectTests")(objects,thread)
-- objects.Stop()
local conn_thread = multi:newProcessor("Connection/Thread Tests") local conn_thread = multi:newProcessor("Connection/Thread Tests")
conn_thread.OnError(function(...)
print("Error: ",...)
end)
conn_thread.Start() conn_thread.Start()
require("tests/connectionTest")(conn_thread,thread) require("tests/connectionTest")(conn_thread,thread)
conn_thread.Stop() conn_thread.Stop()
print(multi:getTasksDetails()) --print(multi:getTasksDetails())
os.exit() os.exit()
end) end)
runTest().OnError(function(...) runTest().OnError(function(...)