From cc20914391bce23d2039456690c51effbbf212fc Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sat, 18 Dec 2021 12:42:14 -0500 Subject: [PATCH] working on 1.16 --- client.lua | 22 +++ multi/init.lua | 90 +++++++++--- multi/integration/lanesManager/init.lua | 161 ++++++++++++++++----- multi/integration/lanesManager/threads.lua | 2 +- multi/integration/loveManager/init.lua | 93 ++++++++++-- multi/integration/loveManager/threads.lua | 2 +- server.lua | 11 ++ 7 files changed, 307 insertions(+), 74 deletions(-) create mode 100644 client.lua create mode 100644 server.lua diff --git a/client.lua b/client.lua new file mode 100644 index 0000000..135e331 --- /dev/null +++ b/client.lua @@ -0,0 +1,22 @@ +package.path = "?/init.lua;?.lua;"..package.path + +local multi, thread = require("multi"):init() +net = require("lnet.tcp") + +client = net.newTCPClient("localhost",12345) +multi:newThread(function() + while true do + thread.sleep(1) + client:send(multi:getTasksDetails()) + end +end) + +multi:newThread(function() + while true do + thread.sleep(.01) + multi:newLoop() + end +end) + + +multi:mainloop() \ No newline at end of file diff --git a/multi/init.lua b/multi/init.lua index 63a5ced..9a3dd80 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -50,6 +50,7 @@ multi.time = os.time multi.LinkedPath = multi multi.lastTime = clock() multi.TIMEOUT = "TIMEOUT" +multi.TID = 0 multi.Priority_Core = 1 multi.Priority_Very_High = 4 @@ -92,8 +93,9 @@ end --Processor local priorityTable = {[0]="Round-Robin",[1]="Balanced",[2]="Top-Down",[3]="Timed-Based-Balancer"} local ProcessName = {[true]="SubProcessor",[false]="MainProcessor"} +local globalThreads = {} function multi:getTasksDetails(t) - if t == "string" or not t then + if not(t) then str = { {"Type ","Uptime","Priority","TID"} } @@ -126,19 +128,37 @@ function multi:getTasksDetails(t) end end local load, steps = self:getLoad() - if thread.__threads then - for i=1,#thread.__threads do - dat = dat .. "\n" + local thread_count = 0 + local process_count = 0 + if globalThreads then + local th_tab = { + {"Thread Name","Uptime","TID","Attached To"} + } + local proc_tab = { + {"Process Name", "Uptime", "PID", "Load", "Cycles per Second per task"} + } + for th,process in pairs(globalThreads) do + if tostring(th.isProcessThread) == "destroyed" then + globalThreads[th] = nil + elseif th.isProcessThread then + local load, steps = process:getLoad() + process_count = process_count + 1 + table.insert(proc_tab,{th.Name,os.clock()-th.creationTime,(th.PID or "-1"),load,steps}) + else + thread_count = thread_count + 1 + table.insert(th_tab,{th.Name,os.clock()-th.creationTime,(th.TID or "-1"),process.Name}) + end end - return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: "..#thread.__threads.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s.."\n\n"..dat..dat2 + dat = multi.AlignTable(proc_tab).. "\n" + dat = dat .. "\n" .. multi.AlignTable(th_tab) + return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: "..thread_count.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..dat..dat2.."\n\n"..s else - return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s..dat2 + return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..dat2.."\n\n"..s end - elseif t == "t" or t == "table" then + else local load,steps = self:getLoad() str = { ProcessName = (self.Name or "Unnamed"), - ThreadCount = #thread.__threads, MemoryUsage = math.ceil(collectgarbage("count")), PriorityScheme = priorityTable[multi.defaultSettings.priority or 0], SystemLoad = multi.Round(load,2), @@ -148,6 +168,7 @@ function multi:getTasksDetails(t) str.Tasks = {} str.PausedTasks = {} str.Threads = {} + str.Processes = {} str.Systemthreads = {} for i,v in pairs(self.Mainloop) do table.insert(str.Tasks,{Link = v, Type=v.Type,Name=v.Name,Uptime=os.clock()-v.creationTime,Priority=self.PriorityResolve[v.Priority],TID = v.TID}) @@ -155,9 +176,18 @@ function multi:getTasksDetails(t) for v,i in pairs(multi.PausedObjects) do table.insert(str.Tasks,{Link = v, Type=v.Type,Name=v.Name,Uptime=os.clock()-v.creationTime,Priority=self.PriorityResolve[v.Priority],TID = v.TID}) end - for i=1,#thread.__threads do - table.insert(str.Threads,{Uptime = os.clock()-thread.__threads[i].creationTime,Name = thread.__threads[i].Name,Link = thread.__threads[i],TID = thread.__threads[i].TID}) + for th,process in pairs(globalThreads) do + if tostring(th.isProcessThread) == "destroyed" then + globalThreads[th] = nil + elseif th.isProcessThread then + local load, steps = process:getLoad() + table.insert(str.Processes,{Uptime = os.clock()-th.creationTime, Name = th.Name, Link = th, TID = th.TID,Load = load,Steps = steps}) + else + table.insert(str.Threads,{Uptime = os.clock()-th.creationTime,Name = th.Name,Link = th,TID = th.TID,Attached_to = process}) + end end + str.ThreadCount = #str.Threads + str.ProcessCount = #str.Processes if multi.SystemThreads then for i=1,#multi.SystemThreads do table.insert(str.Systemthreads,{Uptime = os.clock()-multi.SystemThreads[i].creationTime,Name = multi.SystemThreads[i].Name,Link = multi.SystemThreads[i],TID = multi.SystemThreads[i].count}) @@ -293,6 +323,17 @@ function multi:Destroy() self.OnObjectDestroyed:Fire(c[i]) c[i]:Destroy() end + local new = {} + for th,proc in pairs(globalThreads) do + if proc == self then + th:Destroy() + table.remove(globalThreads,th) + else + new[th]=proc + end + end + globalThreads = new + multi.setType(self,multi.DestroyedObj) else for i=1,#self.Parent.Mainloop do if self.Parent.Mainloop[i]==self then @@ -806,11 +847,12 @@ function multi:newTLoop(func,set) end function c:Act() if self.timer:Get()>=self.set then + print("Acting...") self.life=self.life+1 + self.timer:Reset() for i=1,#self.func do self.func[i](self,self.life) end - self.timer:Reset() end end function c:Resume() @@ -939,11 +981,10 @@ function multi.getCurrentProcess() return __CurrentProcess end -local globalThreads = {} - -local sandcount = 0 +local sandcount = 1 function multi:newProcessor(name) local c = {} + print("Proc Created:",sandcount) setmetatable(c,{__index = self}) local multi,thread = require("multi"):init() -- We need to capture the t in thread local name = name or "Processor_"..sandcount @@ -951,7 +992,7 @@ function multi:newProcessor(name) c.Mainloop = {} c.Type = "process" c.Active = false - c.Name = "multi.process<".. (name or "") .. ">" + c.Name = name or "" c.process = self:newThread(c.Name,function() while true do thread.hold(function() @@ -962,6 +1003,8 @@ function multi:newProcessor(name) __CurrentProcess = self end end) + c.process.isProcessThread = true + c.process.PID = sandcount c.OnError = c.process.OnError function c.Start() c.Active = true @@ -971,6 +1014,9 @@ function multi:newProcessor(name) c.Active = false return self end + function c:Destroy() + self.OnObjectDestroyed:Fire(c) + end c:attachScheduler() c.initThreads() return c @@ -995,7 +1041,7 @@ function thread.getRunningThread() local t = coroutine.running() if t then for i,v in pairs(threads) do - if t==v.thread then + if t==i.thread then return v end end @@ -1180,7 +1226,7 @@ function thread:newFunction(func,holdme) return nil, "Function is paused" end local rets, err - local function wait(no) + local function wait(no) if thread.isThread() and not (no) then return multi.hold(function() if err then @@ -1300,6 +1346,7 @@ function multi:attachScheduler() return self end c.Destroy = c.Kill + c.kill = c.Kill function c.ref:send(name,val) ret=coroutine.yield({Name=name,Value=val}) end @@ -1327,7 +1374,7 @@ function multi:attachScheduler() self.Globals=v end table.insert(threads,c) - table.insert(globalThreads,c) + globalThreads[c] = self if initT==false then self.initThreads() end @@ -1519,7 +1566,6 @@ function multi:attachScheduler() elseif threads[i] and threads[i].task == "holdW" then if clock() - threads[i].intervalR>=threads[i].interval then threads[i].pos = threads[i].pos + 1 - print(threads[i].pos,threads[i].count) t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = threads[i].func() if t0 then threads[i].task = "" @@ -2155,11 +2201,12 @@ end local busy = false local lastVal = 0 +local last_step = 0 local bb = 0 function multi:getLoad() if not multi.maxSpd then self:enableLoadDetection() end - if busy then return lastVal end + if busy then return lastVal,last_step end local val = nil if thread.isThread() then local bench @@ -2189,7 +2236,8 @@ function multi:getLoad() if val<0 then val = 0 end if val > 100 then val = 100 end lastVal = val - return val,bb*100 + last_step = bb*100 + return val,last_step end function multi:setPriority(s) diff --git a/multi/integration/lanesManager/init.lua b/multi/integration/lanesManager/init.lua index db771d4..16bdcf4 100644 --- a/multi/integration/lanesManager/init.lua +++ b/multi/integration/lanesManager/init.lua @@ -55,19 +55,84 @@ local threads = {} local count = 1 local started = false local livingThreads = {} -function THREAD:newFunction(func,holup) - return function(...) - local t = multi:newSystemThread("SystemThreadedFunction",function(...) - return func(...) - end,...) - return thread:newFunction(function() - thread.hold(function() return t.thread end) - return thread.hold(function() - return t.thread:join(.001) - end) - end,holup)() + +function THREAD:newFunction(func,holdme) + local tfunc = {} + tfunc.Active = true + function tfunc:Pause() + self.Active = false end + function tfunc:Resume() + self.Active = true + end + function tfunc:holdMe(b) + holdme = b + end + local function noWait() + return nil, "Function is paused" + end + local rets, err + local function wait(no) + if thread.isThread() and not (no) then + return multi.hold(function() + if err then + return nil, err + elseif rets then + return unpack(rets) + end + end) + else + while not rets and not err do + multi.scheduler:Act() + end + if err then + return nil,err + end + return unpack(rets) + end + end + tfunc.__call = function(t,...) + if not t.Active then + if holdme then + return nil, "Function is paused" + end + return { + isTFunc = true, + wait = noWait, + connect = function(f) + f(nil,"Function is paused") + end + } + end + local t = multi:newSystemThread("SystemThreadedFunction",func,...) + t.OnDeath(function(self,...) rets = {...} end) + t.OnError(function(self,e) err = e end) + if holdme then + return wait() + end + local temp = { + OnStatus = multi:newConnection(), + OnError = multi:newConnection(), + OnReturn = multi:newConnection(), + isTFunc = true, + wait = wait, + connect = function(f) + local tempConn = multi:newConnection() + t.OnDeath(function(self,...) if f then f(...) else tempConn:Fire(...) end end) + t.OnError(function(self,err) if f then f(nil,err) else tempConn:Fire(nil,err) end end) + return tempConn + end + } + t.OnDeath(function(self,...) temp.OnReturn:Fire(...) end) + t.OnError(function(self,err) temp.OnError:Fire(err) end) + t.linkedFunction = temp + t.statusconnector = temp.OnStatus + return temp + end + setmetatable(tfunc,tfunc) + return tfunc end + function multi:newSystemThread(name, func, ...) multi.InitSystemThreadErrorHandler() rand = math.random(1, 10000000) @@ -104,48 +169,66 @@ function multi:newSystemThread(name, func, ...) self.alive = false end table.insert(multi.SystemThreads, c) + c.OnDeath = multi:newConnection() c.OnError = multi:newConnection() GLOBAL["__THREADS__"] = livingThreads return c end -multi.OnSystemThreadDied = multi:newConnection() + +local function detectLuaError(str) + return type(str)=="string" and str:match("%.lua:%d*:") +end + +local function tableLen(tab) + local len = 0 + for i,v in pairs(tab) do + len = len + 1 + end + return len +end + function multi.InitSystemThreadErrorHandler() if started == true then return end started = true - multi:newThread( -"ThreadErrorHandler", - function() - local threads = multi.SystemThreads - while true do - thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough. - for i = #threads, 1, -1 do - local v, err, t = threads[i].thread:join(.001) - if err then - if err:find("Thread was killed!") then - print(err) - livingThreads[threads[i].Id] = {false, threads[i].Name} - threads[i].alive = false - multi.OnSystemThreadDied:Fire(threads[i].Id) - GLOBAL["__THREADS__"] = livingThreads - table.remove(threads, i) - elseif err:find("stack traceback") then - print(err) - threads[i].OnError:Fire(threads[i], err, "Error in systemThread: '" .. threads[i].name .. "' <" .. err .. ">") - threads[i].alive = false - livingThreads[threads[i].Id] = {false, threads[i].Name} - multi.OnSystemThreadDied:Fire(threads[i].Id) - GLOBAL["__THREADS__"] = livingThreads - table.remove(threads, i) - end + multi:newThread("ThreadErrorHandler",function() + local threads = multi.SystemThreads + while true do + thread.sleep(.1) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough. + for i = #threads, 1, -1 do + local _,data = pcall(function() + return {threads[i].thread:join(1)} + end) + local v, err, t = data[1],data[2],data[3] + if detectLuaError(err) then + if err:find("Thread was killed!\1") then + livingThreads[threads[i].Id] = {false, threads[i].Name} + threads[i].alive = false + threads[i].OnDeath:Fire(threads[i],nil,"Thread was killed!") + GLOBAL["__THREADS__"] = livingThreads + table.remove(threads, i) + else + threads[i].OnError:Fire(threads[i], err, "Error in systemThread: '" .. threads[i].name .. "' <" .. err .. ">") + threads[i].alive = false + livingThreads[threads[i].Id] = {false, threads[i].Name} + GLOBAL["__THREADS__"] = livingThreads + table.remove(threads, i) end + elseif tableLen(data)>0 then + livingThreads[threads[i].Id] = {false, threads[i].Name} + threads[i].alive = false + threads[i].OnDeath:Fire(threads[i],unpack(data)) + GLOBAL["__THREADS__"] = livingThreads + table.remove(threads, i) end end end - ) + end).OnError(function(...) + print("Error!",...) + end) end -print("Integrated Lanes!") +multi.print("Integrated Lanes!") multi.integration = {} -- for module creators multi.integration.GLOBAL = GLOBAL multi.integration.THREAD = THREAD diff --git a/multi/integration/lanesManager/threads.lua b/multi/integration/lanesManager/threads.lua index d038f1e..6d9d13d 100644 --- a/multi/integration/lanesManager/threads.lua +++ b/multi/integration/lanesManager/threads.lua @@ -82,7 +82,7 @@ local function INIT(__GlobalLinda,__SleepingLinda) THREAD.__CORES = tonumber(io.popen("nproc --all"):read("*n")) end function THREAD.kill() -- trigger the lane destruction - error("Thread was killed!") + error("Thread was killed!\1") end function THREAD.getName() return THREAD_NAME diff --git a/multi/integration/loveManager/init.lua b/multi/integration/loveManager/init.lua index b88ffd3..16f9b07 100644 --- a/multi/integration/loveManager/init.lua +++ b/multi/integration/loveManager/init.lua @@ -49,19 +49,68 @@ local THREAD_ID = 1 local OBJECT_ID = 0 local stf = 0 function THREAD:newFunction(func,holup) - stf = stf + 1 - return function(...) - local t = multi:newSystemThread("STF"..stf,func,...) - return thread:newFunction(function() - return thread.hold(function() - if t.stab["returns"] then - local dat = t.stab.returns - t.stab.returns = nil - return unpack(dat) - end - end) - end,holup)() + local tfunc = {} + tfunc.Active = true + function tfunc:Pause() + self.Active = false end + function tfunc:Resume() + self.Active = true + end + function tfunc:holdMe(b) + holdme = b + end + local function noWait() + return nil, "Function is paused" + end + local rets, err + local function wait(no) + if thread.isThread() and not (no) then + -- In a thread + else + -- Not in a thread + end + end + tfunc.__call = function(t,...) + if not t.Active then + if holdme then + return nil, "Function is paused" + end + return { + isTFunc = true, + wait = noWait, + connect = function(f) + f(nil,"Function is paused") + end + } + end + local t = multi:newSystemThread("SystemThreadedFunction",func,...) + t.OnDeath(function(self,...) rets = {...} end) + t.OnError(function(self,e) err = e end) + if holdme then + return wait() + end + local temp = { + OnStatus = multi:newConnection(), + OnError = multi:newConnection(), + OnReturn = multi:newConnection(), + isTFunc = true, + wait = wait, + connect = function(f) + local tempConn = multi:newConnection() + t.OnDeath(function(self,...) if f then f(...) else tempConn:Fire(...) end end) + t.OnError(function(self,err) if f then f(nil,err) else tempConn:Fire(nil,err) end end) + return tempConn + end + } + t.OnDeath(function(self,...) temp.OnReturn:Fire(...) end) + t.OnError(function(self,err) temp.OnError:Fire(err) end) + t.linkedFunction = temp + t.statusconnector = temp.OnStatus + return temp + end + setmetatable(tfunc,tfunc) + return tfunc end function multi:newSystemThread(name,func,...) local c = {} @@ -73,6 +122,26 @@ function multi:newSystemThread(name,func,...) GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread} GLOBAL["__THREAD_COUNT"] = THREAD_ID THREAD_ID=THREAD_ID+1 + multi:newThread(function() + while true do + thread.yield() + if c.stab["returns"] then + c.OnDeath:Fire(c,unpack(t.stab.returns)) + t.stab.returns = nil + thread.kill() + end + local error = c.thread:getError() + if error then + if error:find("Thread Killed!\1") then + c.OnDeath:Fire(c,"Thread Killed!") + thread.kill() + else + c.OnError:Fire(c,error) + thread.kill() + end + end + end + end) return c end function love.threaderror(thread, errorstr) diff --git a/multi/integration/loveManager/threads.lua b/multi/integration/loveManager/threads.lua index be04fa2..a4ee947 100644 --- a/multi/integration/loveManager/threads.lua +++ b/multi/integration/loveManager/threads.lua @@ -92,7 +92,7 @@ function threads.getCores() return love.system.getProcessorCount() end function threads.kill() - error("Thread Killed!") + error("Thread Killed!\1") end function threads.getThreads() local t = {} diff --git a/server.lua b/server.lua new file mode 100644 index 0000000..d18f851 --- /dev/null +++ b/server.lua @@ -0,0 +1,11 @@ +package.path = "?/init.lua;?.lua;"..package.path + +local multi, thread = require("multi"):init() +net = require("lnet.tcp") + +server = net.newTCPServer(12345) +server.OnDataRecieved(function(self,data) + print(data) +end) + +multi:mainloop() \ No newline at end of file