From b2569118a2df18a2c2d97f2f564efea496b40d32 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Fri, 30 Jun 2023 16:14:21 -0400 Subject: [PATCH] Started work on the debugManager --- init.lua | 98 +++++++++++++++++++------ integration/debugManager/init.lua | 37 ++++++++++ integration/lanesManager/extensions.lua | 1 + integration/lanesManager/init.lua | 4 +- integration/sharedExtensions/init.lua | 28 ------- 5 files changed, 115 insertions(+), 53 deletions(-) create mode 100644 integration/debugManager/init.lua diff --git a/init.lua b/init.lua index 7f3d0ec..e26d4f7 100644 --- a/init.lua +++ b/init.lua @@ -217,7 +217,7 @@ function multi:newConnection(protect,func,kill) end end, __mod = function(obj1, obj2) -- % - local cn = multi:newConnection() + local cn = self:newConnection() if type(obj1) == "function" and type(obj2) == "table" then obj2(function(...) cn:Fire(obj1(...)) @@ -228,7 +228,7 @@ function multi:newConnection(protect,func,kill) return cn end, __concat = function(obj1, obj2) -- .. - local cn = multi:newConnection() + local cn = self:newConnection() local ref if type(obj1) == "function" and type(obj2) == "table" then cn(function(...) @@ -260,7 +260,7 @@ function multi:newConnection(protect,func,kill) return cn end, __add = function(c1,c2) -- Or - local cn = multi:newConnection() + local cn = self:newConnection() c1(function(...) cn:Fire(...) end) @@ -270,7 +270,7 @@ function multi:newConnection(protect,func,kill) return cn end, __mul = function(c1,c2) -- And - local cn = multi:newConnection() + local cn = self:newConnection() local ref1, ref2 if c1.__hasInstances == nil then cn.__hasInstances = {2} @@ -531,8 +531,8 @@ function multi:SetTime(n) c.timer:Start() c.set=n c.link=self - c.OnTimedOut = multi:newConnection() - c.OnTimerResolved = multi:newConnection() + c.OnTimedOut = self:newConnection() + c.OnTimerResolved = self:newConnection() self._timer=c.timer function c:Act() if self.timer:Get()>=self.set then @@ -647,8 +647,8 @@ function multi:newBase(ins) c.func={} c.funcTM={} c.funcTMR={} - c.OnBreak = multi:newConnection() - c.OnPriorityChanged = multi:newConnection() + c.OnBreak = self:newConnection() + c.OnPriorityChanged = self:newConnection() c.TID = _tid c.Act=function() end c.Parent=self @@ -1050,7 +1050,7 @@ end local sandcount = 1 -function multi:newProcessor(name, nothread) +function multi:newProcessor(name, nothread, priority) local c = {} setmetatable(c,{__index = multi}) local name = name or "Processor_" .. sandcount @@ -1062,9 +1062,14 @@ function multi:newProcessor(name, nothread) c.threads = {} c.startme = {} c.parent = self - c.OnObjectCreated = multi:newConnection() + c.OnObjectCreated = self:newConnection() - local handler = c:createHandler(c) + local handler + if priority then + handler = c:createPriorityHandler(c) + else + handler = c:createHandler(c) + end if not nothread then -- Don't create a loop if we are triggering this manually c.process = self:newLoop(function() @@ -1078,7 +1083,7 @@ function multi:newProcessor(name, nothread) c.process.PID = sandcount c.OnError = c.process.OnError else - c.OnError = multi:newConnection() + c.OnError = self:newConnection() end c.OnError(multi.error) @@ -1422,16 +1427,16 @@ function thread:newFunctionBase(generator, holdme) return wait() end local temp = { - OnStatus = multi:newConnection(true), - OnError = multi:newConnection(true), - OnReturn = multi:newConnection(true), + OnStatus = multi:getCurrentProcess():newConnection(true), + OnError = multi:getCurrentProcess():newConnection(true), + OnReturn = multi:getCurrentProcess():newConnection(true), isTFunc = true, wait = wait, getReturns = function() return multi.unpack(rets) end, connect = function(f) - local tempConn = multi:newConnection(true) + local tempConn = multi:getCurrentProcess():newConnection(true) t.OnDeath(function(...) if f then f(...) else tempConn:Fire(...) end end) t.OnError(function(self,err) if f then f(nil,err) else tempConn:Fire(nil,err) end end) return tempConn @@ -1455,13 +1460,18 @@ function thread:newFunction(func, holdme) end, holdme)() end -function thread:newProcessor(name) +function thread:newProcessor(name, nothread, priority) -- Inactive proxy proc local proc = multi:getCurrentProcess():newProcessor(name, true) local thread_proc = multi:getCurrentProcess():newProcessor(name).Start() local Active = true - local handler = thread_proc:getHandler() + local handler + if priority then + handler = thread_proc:createPriorityHandler(c) + else + handler = thread_proc:createHandler(c) + end function proc:getThreads() return thread_proc.threads @@ -1545,9 +1555,16 @@ function thread:newThread(name, func, ...) c.firstRunDone=false c._isPaused = false c.returns = {} - c.isError = false - c.OnError = multi:newConnection(true,nil,true) - c.OnDeath = multi:newConnection(true,nil,true) + c.isError = false + + if self.Type == multi.PROCESS then + c.OnError = self:newConnection(true,nil,true) + c.OnDeath = self:newConnection(true,nil,true) + else + c.OnError = threadManager:newConnection(true,nil,true) + c.OnDeath = threadManager:newConnection(true,nil,true) + end + c.OnError(multi.error) function c:getName() @@ -1619,7 +1636,11 @@ function thread:newThread(name, func, ...) globalThreads[c] = multi threadid = threadid + 1 - multi:getCurrentProcess():create(c) + if self.Type == multi.PROCESS then + self:create(c) + else + threadManager:create(c) + end c.creationTime = clock() return c end @@ -1820,6 +1841,31 @@ function multi:createHandler() end) end +function multi:createPriorityHandler() + local threads, startme = self.threads, self.startme + return coroutine.wrap(function() + local temp_start + while true do + while #startme>0 do + temp_start = table.remove(startme) + _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) + co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads) + table.insert(threads, temp_start) + end + for i=#threads,1,-1 do + ref = threads[i] + if ref then + task = ref.task + thd = ref.thread + ready = ref.__ready + co_status[status(thd)](thd, ref, task, i, threads) + end + end + yield() + end + end) +end + function multi:newService(func) -- Priority managed threads local c = {} c.Type = multi.SERVICE @@ -2247,7 +2293,7 @@ function multi:benchMark(sec,p,pt) c=c+1 end end) - temp.OnBench = multi:newConnection() + temp.OnBench = self:newConnection() temp:setPriority(p or 1) return temp end @@ -2412,7 +2458,11 @@ else getmetatable(multi.m.sentinel).__gc = multi.m.onexit end -threadManager = multi:newProcessor("Global_Thread_Manager").Start() +threadManager = multi:newProcessor("Global_Thread_Manager", nil, true).Start() + +function multi:getThreadManagerProcess() + return threadManager +end function multi:getHandler() return threadManager:getHandler() diff --git a/integration/debugManager/init.lua b/integration/debugManager/init.lua new file mode 100644 index 0000000..eae78be --- /dev/null +++ b/integration/debugManager/init.lua @@ -0,0 +1,37 @@ +local multi, thread = require("multi"):init() + +local dbg = {} + +local creation_hook + +creation_hook = function(obj, process) + print("Created: ",obj.Type, "in", process.Type, process:getFullName()) + if obj.Type == multi.PROCESS then + obj.OnObjectCreated(creation_hook) + end +end + +local tmulti = multi:getThreadManagerProcess() +multi.OnObjectCreated(creation_hook) +tmulti.OnObjectCreated(creation_hook) + +--[[ + multi.ROOTPROCESS = "rootprocess" + multi.CONNECTOR = "connector" + multi.TIMEMASTER = "timemaster" + multi.PROCESS = "process" + multi.TIMER = "timer" + multi.EVENT = "event" + multi.UPDATER = "updater" + multi.ALARM = "alarm" + multi.LOOP = "loop" + multi.TLOOP = "tloop" + multi.STEP = "step" + multi.TSTEP = "tstep" + multi.THREAD = "thread" + multi.SERVICE = "service" + multi.PROXY = "proxy" + multi.THREADEDFUNCTION = "threaded_function" +]] + +return dbg \ No newline at end of file diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index f76378e..26cc459 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -133,6 +133,7 @@ function multi:newSystemThreadedJobQueue(n) link = c.OnJobCompleted(function(jid,...) if id==jid then rets = multi.pack(...) + c.OnJobCompleted:Unconnect(link) end end) return thread.hold(function() diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index 8a8318c..0892741 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -32,7 +32,9 @@ if multi.integration then -- This allows us to call the lanes manager from suppo } end -- Step 1 get lanes -lanes = require("lanes").configure() +lanes = require("lanes").configure{ + nb_keepers = 4, +} multi.SystemThreads = {} multi.isMainThread = true diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index ba5f00b..806ce69 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -329,33 +329,5 @@ function multi:newSystemThreadedProcessor(cores) return false end - c.getLoad = thread:newFunction(function(self, tp) - local loads = {} - local func - - if tp then - func = "STP_GetThreadCount" - else - func = "STP_GetTaskCount" - end - - for i,v in pairs(self.proc_list) do - local conn - local jid = self:pushJob(v, func) - - conn = self.jobqueue.OnJobCompleted(function(id, data) - if id == jid then - table.insert(loads, {v, data}) - multi:newTask(function() - self.jobqueue.OnJobCompleted:Unconnect(conn) - end) - end - end) - end - - thread.hold(function() return #loads == c.cores end) - return loads - end, true) - return c end