From 49c0bd393052ac2942307473448d54c271329ffb Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Mon, 31 Jan 2022 08:31:38 -0500 Subject: [PATCH] Fixing issues with the new thread scheduler, nested yields need handling --- multi/init.lua | 61 +++++++++++++++++++++++++--------------- test4.lua | 2 +- tests/connectionTest.lua | 21 ++++++++++++-- tests/objectTests.lua | 7 +++-- tests/runtests.lua | 17 +++++++---- 5 files changed, 73 insertions(+), 35 deletions(-) diff --git a/multi/init.lua b/multi/init.lua index 2119993..3a6dc71 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -79,7 +79,7 @@ function multi.Stop() end --Processor -local priorityTable = {[0]="Round-Robin",[1]="Balanced",[2]="Top-Down",[3]="Timed-Based-Balancer"} +local priorityTable = {[false]="Disabled",[true]="Enabled"} local ProcessName = {"SubProcessor","MainProcessor"} local globalThreads = {} @@ -132,7 +132,7 @@ function multi:getTasksDetails(t) if tostring(th.isProcessThread) == "destroyed" then globalThreads[th] = nil elseif th.isProcessThread then - local load, steps = process:getLoad() + load, steps = process:getLoad() process_count = process_count + 1 table.insert(proc_tab,{th.Name,os.clock()-th.creationTime,(th.PID or "-1"),load,steps}) else @@ -142,16 +142,16 @@ function multi:getTasksDetails(t) end dat = multi.AlignTable(proc_tab).. "\n" dat = dat .. "\n" .. multi.AlignTable(th_tab) - return "Load on "..ProcessName[(self.Type=="process" and 1 or 2)].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: "..thread_count.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..dat..dat2.."\n\n"..s + return "Load on "..ProcessName[(self.Type=="process" and 1 or 2)].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: "..thread_count.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or false].."\n\n"..dat..dat2.."\n\n"..s else - return "Load on "..ProcessName[(self.Type=="process" and 1 or 2)].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..dat2.."\n\n"..s + return "Load on "..ProcessName[(self.Type=="process" and 1 or 2)].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or false].."\n\n"..dat2.."\n\n"..s end else local load,steps = self:getLoad() str = { ProcessName = (self.Name or "Unnamed"), MemoryUsage = math.ceil(collectgarbage("count")), - PriorityScheme = priorityTable[multi.defaultSettings.priority or 0], + PriorityScheme = priorityTable[multi.defaultSettings.priority or false], SystemLoad = multi.Round(load,2), CyclesPerSecondPerTask = steps, SystemThreadCount = multi.SystemThreads and #multi.SystemThreads or 0 @@ -286,6 +286,9 @@ function multi:newConnection(protect,func,kill) else function c:Fire(...) for i=#call_funcs,1,-1 do + if type(call_funcs[i])=="table" then + print(call_funcs[i].Parent.Type) + end call_funcs[i](...) if kill then table.remove(call_funcs,i) @@ -304,6 +307,9 @@ function multi:newConnection(protect,func,kill) end end function self:connect(func) + if type(func) == "table" then + print(debug.traceback()) + end table.insert(fast,func) end end @@ -382,6 +388,9 @@ function multi:newConnection(protect,func,kill) function c:connect(...)--func,name,num local tab = {...} local funcs={} + if type(tab[1])=="table" then + print(debug.traceback()) + end for i=1,#tab do if type(tab[i])=="function" then funcs[#funcs+1] = tab[i] @@ -520,7 +529,7 @@ function multi:Destroy() globalThreads = new multi.setType(self,multi.DestroyedObj) else - for i=1,#self.Parent.Mainloop do + for i=#self.Parent.Mainloop,1,-1 do if self.Parent.Mainloop[i]==self then self.Parent.OnObjectDestroyed:Fire(self) table.remove(self.Parent.Mainloop,i) @@ -528,7 +537,7 @@ function multi:Destroy() break end end - multi.setType(self,multi.DestroyedObj) + self.Act = function() end end return self end @@ -726,6 +735,7 @@ function multi:newLoop(func) end function multi:newStep(start,reset,count,skip) + print(self.Type) local c=self:newBase() think=1 c.Type='step' @@ -920,11 +930,9 @@ function multi:newProcessor(name,nothread) c.Type = "process" c.Active = false or nothread c.Name = name or "" - c.process = self:newThread(c.Name,function() + c.process = self:newThread(function() while true do - thread.hold(function() - return c.Active and not(nothread) - end) + thread.hold(function() return c.Active end) __CurrentProcess = c c:uManager() __CurrentProcess = self @@ -1044,6 +1052,7 @@ function thread.hold(n,opt) return coroutine.yield(CMD, t_skip, opt.skip or 1, nil, interval) end end + if type(n) == "number" then thread.getRunningThread().lastSleep = clock() return coroutine.yield(CMD, t_sleep, n or 0, nil, interval) @@ -1055,8 +1064,10 @@ function thread.hold(n,opt) return coroutine.yield(CMD, t_hold, function() return ready end) - else + elseif type(n) == "function" then return coroutine.yield(CMD, t_hold, n or dFunc, nil, interval) + else + error("Invalid argument passed to thread.hold(...)!") end end @@ -1126,6 +1137,7 @@ end function thread.pushStatus(...) local t = thread.getRunningThread() + print("Pushing",t) t.statusconnector:Fire(...) end @@ -1223,10 +1235,10 @@ function multi.setEnv(func,env) local chunk = load(f,"env","bt",env) return chunk end -local resume, status, create = coroutine.resume, coroutine.status, coroutine.create + function multi:attachScheduler() + local resume, status, create = coroutine.resume, coroutine.status, coroutine.create local threads = {} - self.threadsRef = threads local startme = {} local startme_len = 0 function self:newThread(name,func,...) @@ -1246,7 +1258,6 @@ function multi:attachScheduler() c.Type="thread" c.TID = threadid c.firstRunDone=false - c.timer=self:newTimer() c._isPaused = false c.returns = {} c.isError = false @@ -1471,9 +1482,11 @@ function multi:attachScheduler() ["running"] = function(thd,ref) end, ["dead"] = function(thd,ref,task,i) if _ then + print("Death") ref.OnDeath:Fire(ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) else - ref.OnError:Fire(ref,ret,r1) + print("Error",ref,ret) + ref.OnError:Fire(ref,ret) end if i then table.remove(threads,i) @@ -1486,15 +1499,11 @@ function multi:attachScheduler() end end _=nil r1=nil r2=nil r3=nil r4=nil r5=nil - self.setType(ref,self.DestroyedObj) + --self.setType(ref,self.DestroyedObj) end, } self.scheduler:OnLoop(function(self) for i=#threads,1,-1 do - ref = threads[i] - task = ref.task - thd = ref.thread - ready = ref.__ready -- First time setup for threads for start = startme_len,1,-1 do _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(startme[start].thread,unpack(startme[start].startArgs)) @@ -1502,7 +1511,13 @@ function multi:attachScheduler() startme[startme_len] = nil startme_len = startme_len - 1 end - co_status[status(thd)](thd,ref,task,i) + if threads[i] then + ref = threads[i] + task = ref.task + thd = ref.thread + ready = ref.__ready + co_status[status(thd)](thd,ref,task,i) + end end end) end @@ -1790,7 +1805,7 @@ function multi:getLoad() _,timeout = multi.hold(function() return bench end,{sleep=.012}) - if timeout then + if timeout or not bench then bench = 0 bb = 0 end diff --git a/test4.lua b/test4.lua index 1150f14..2fcffe3 100644 --- a/test4.lua +++ b/test4.lua @@ -19,10 +19,10 @@ multi:benchMark(sleep_for,multi.Priority_Core,"Core:"):OnBench(bench) multi:newThread("Thread 1",function() while true do thread.sleep(1) - error("hi") print("Test 1") thread.hold(conn) print("Conn sleep test") + error("hi") end end).OnError(print) diff --git a/tests/connectionTest.lua b/tests/connectionTest.lua index 98221b4..06f3dc8 100644 --- a/tests/connectionTest.lua +++ b/tests/connectionTest.lua @@ -1,6 +1,9 @@ function connectionThreadTests(multi,thread) print("Starting Connection and Thread tests!") + print("Current Thread:",thread.getRunningThread()) func = thread:newFunction(function(count) + print("Starting Status test: ",count) + print("Current Thread:",thread.getRunningThread().thread) local a = 0 while true do a = a + 1 @@ -14,8 +17,12 @@ function connectionThreadTests(multi,thread) local ret2 = func(15) local ret3 = func(20) local s1,s2,s3 = 0,0,0 + ret.OnError(function(...) + print("Error:",...) + end) ret.OnStatus(function(part,whole) s1 = math.ceil((part/whole)*1000)/10 + print(s1) end) ret2.OnStatus(function(part,whole) s2 = math.ceil((part/whole)*1000)/10 @@ -23,14 +30,22 @@ function connectionThreadTests(multi,thread) ret3.OnStatus(function(part,whole) s3 = math.ceil((part/whole)*1000)/10 end) - local err, timeout = thread.hold(ret2.OnReturn + ret.OnReturn + ret3.OnReturn,{sleep=3}) + ret.OnReturn(function() + print("Done") + end) + local err, timeout = thread.hold(ret.OnReturn + ret2.OnReturn + ret3.OnReturn,{sleep=3}) + print(err,timeout) + for i,v in pairs(err) do + print(i,v) + end + os.exit() if s1 == 100 and s2 == 100 and s3 == 100 then print("Threads: Ok") else - print("Threads on status error") + print("Threads OnStatus or thread.hold(conn) Error!") end if timeout then - print("Threads or Connection error!") + print("Threads or Connection Error!") else print("Connection Test 1: Ok") end diff --git a/tests/objectTests.lua b/tests/objectTests.lua index c72624d..b94cdb9 100644 --- a/tests/objectTests.lua +++ b/tests/objectTests.lua @@ -7,12 +7,12 @@ function objectTests(multi,thread) end) multi:newTStep(1,10,1,.1):OnStep(function(t) tsteps = tsteps + 1 - end):OnEnd(function(step) + end).OnEnd(function(step) step:Destroy() end) multi:newStep(1,10):OnStep(function(s) steps = steps + 1 - end):OnEnd(function(step) + end).OnEnd(function(step) step:Destroy() end) local loop = multi:newLoop(function(l) @@ -28,6 +28,7 @@ function objectTests(multi,thread) return alarms end) event.OnEvent(function(evnt) + evnt:Destroy() events = true print("Alarms: Ok") print("Events: Ok") @@ -37,6 +38,6 @@ function objectTests(multi,thread) if tloops > 10 then print("TLoops: Ok") else print("TLoops: Bad!") end if updaters > 100 then print("Updaters: Ok") else print("Updaters: Bad!") end end) - thread.hold(event.OnEvent) + thread.hold(event.OnEvent) end return objectTests \ No newline at end of file diff --git a/tests/runtests.lua b/tests/runtests.lua index 52f27d6..d0a6807 100644 --- a/tests/runtests.lua +++ b/tests/runtests.lua @@ -18,15 +18,22 @@ package.path="./?.lua;../?.lua;../?/init.lua;../?.lua;../?/?/init.lua;"..package local multi, thread = require("multi"):init{priority=true} local good = false runTest = thread:newFunction(function() - local objects = multi:newProcessor("Basic Object Tests") - objects.Start() - require("tests/objectTests")(objects,thread) - objects.Stop() + -- thread.sleep(1) + -- local objects = multi:newProcessor("Basic Object Tests") + -- objects.OnError(function(...) + -- print("Error: ",...) + -- end) + -- objects.Start() + -- require("tests/objectTests")(objects,thread) + -- objects.Stop() local conn_thread = multi:newProcessor("Connection/Thread Tests") + conn_thread.OnError(function(...) + print("Error: ",...) + end) conn_thread.Start() require("tests/connectionTest")(conn_thread,thread) conn_thread.Stop() - print(multi:getTasksDetails()) + --print(multi:getTasksDetails()) os.exit() end) runTest().OnError(function(...)