diff --git a/changes.md b/changes.md index 97cba6e..1c103c7 100644 --- a/changes.md +++ b/changes.md @@ -73,6 +73,9 @@ Added: - THREAD.getID() -- returns a unique ID for the current thread. This varaiable is visible to the main thread as well by accessing it through the returned thread object. OBJ.Id Do not confuse this with thread.* this refers to the system threading interface. Each thread, including the main thread has a threadID the main thread has an ID of 0! - multi.print(...) works like normal print, but only prints if the setting print is set to true - setting: `print` enables multi.print() to work +- STC: IgnoreSelf defaults to false, if true a Fire command will not be sent to the self +- STC: OnConnectionAdded(function(connID)) -- Is fired when a connection is added you can use STC:FireTo(id,...) to trigger a specific connection. Works like the named non threaded connections, only the id's are genereated for you. +- STC: FireTo(id,...) -- Described above. ```lua package.path="?/init.lua;?.lua;"..package.path diff --git a/multi/init.lua b/multi/init.lua index 3167e23..b0a7a10 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -28,7 +28,7 @@ multi.Version = "13.0.0" multi._VERSION = "13.0.0" multi.stage = "stable" multi.__index = multi -multi.Name = "multi.Root" +multi.Name = "multi.root" multi.Mainloop = {} multi.Garbage = {} multi.ender = {} @@ -121,7 +121,7 @@ function multi:enableLoadDetection() local t = os.clock() local stop = false temp:benchMark(.01):OnBench(function(time,steps) - stop = steps*1.1 + stop = steps end) while not stop do temp:uManager() @@ -135,6 +135,7 @@ function multi:setLoad(n) end local busy = false local lastVal = 0 +local bb = 0 function multi:getLoad() if not multi.maxSpd then multi:enableLoadDetection() end if busy then return lastVal end @@ -143,6 +144,7 @@ function multi:getLoad() local bench multi:benchMark(.01):OnBench(function(time,steps) bench = steps + bb = steps end) thread.hold(function() return bench @@ -154,6 +156,7 @@ function multi:getLoad() local bench multi:benchMark(.01):OnBench(function(time,steps) bench = steps + bb = steps end) while not bench do multi:uManager() @@ -165,7 +168,7 @@ function multi:getLoad() if val<0 then val = 0 end if val > 100 then val = 100 end lastVal = val - return val + return val,bb*100 end function multi:setDomainName(name) self[name]={} @@ -197,6 +200,13 @@ function multi:setPriority(s) end self.solid = true end + if not self.PrioritySet then + self.defPriority = self.Priority + self.PrioritySet = true + end +end +function multi:ResetPriority() + self.Priority = self.defPriority end -- System function os.getOS() @@ -345,20 +355,24 @@ function multi:getTasksDetails(t) dat2 = dat2.."\n" end end + local load,steps = multi:getLoad() if multi.scheduler then for i=1,#multi.scheduler.Threads do dat = dat .. "\n" end - return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(multi:getLoad(),2).."%\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: "..#multi.scheduler.Threads.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s.."\n\n"..dat..dat2 + return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: "..#multi.scheduler.Threads.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s.."\n\n"..dat..dat2 else - return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(multi:getLoad(),2).."%\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s..dat2 + return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s..dat2 end elseif t == "t" or t == "table" then + local load,steps = multi:getLoad() str = { + ProcessName = (self.Name or "Unnamed"), ThreadCount = #multi.scheduler.Threads, MemoryUsage = math.ceil(collectgarbage("count")).." KB", PriorityScheme = priorityTable[multi.defaultSettings.priority or 0], - SystemLoad = multi.Round(multi:getLoad(),2) + SystemLoad = multi.Round(load,2), + CyclesPerSecondPerTask = steps, } str.threads = {} str.systemthreads = {} @@ -970,7 +984,7 @@ end function multi:newAlarm(set) local c=self:newBase() c.Type='alarm' - c.Priority=self.Priority_Low + c:setPriority("Low") c.set=set or 0 local count = 0 local t = clock() @@ -1174,7 +1188,7 @@ function multi:newTStep(start,reset,count,set) local c=self:newBase() think=1 c.Type='tstep' - c.Priority=self.Priority_Low + c:setPriority("Low") c.start=start or 1 local reset = reset or math.huge c.endAt=reset @@ -1266,7 +1280,7 @@ function multi:newTimeStamper() ["12"] = 31, } c.Type='timestamper' - c.Priority=self.Priority_Idle + c:setPriority("Idle") c.hour = {} c.minute = {} c.second = {} @@ -1886,8 +1900,8 @@ function multi:mainloop(settings) local PS=self local PStep = 1 local autoP = 0 - local solid - local sRef + local solid,sRef + local cc=0 while mainloopActive do if next then local DD = table.remove(next,1) @@ -1947,8 +1961,12 @@ function multi:mainloop(settings) PStep=0 end elseif priority == 3 then - tt = clock()-t - t = clock() + cc=cc+1 + if cc == 1000 then + tt = clock()-t + t = clock() + cc=0 + end for _D=#Loop,1,-1 do if Loop[_D] then if Loop[_D].Priority == p_c or (Loop[_D].Priority == p_h and tt<.5) or (Loop[_D].Priority == p_an and tt<.125) or (Loop[_D].Priority == p_n and tt<.063) or (Loop[_D].Priority == p_bn and tt<.016) or (Loop[_D].Priority == p_l and tt<.003) or (Loop[_D].Priority == p_i and tt<.001) then diff --git a/multi/integration/loveManager.lua b/multi/integration/loveManager.lua index 9b19dc2..19bc2b4 100644 --- a/multi/integration/loveManager.lua +++ b/multi/integration/loveManager.lua @@ -168,6 +168,9 @@ end function sThread.getName() return __THREADNAME__ end +function sThread.getID() + return THREAD_ID +end function sThread.kill() error("Thread was killed!") end @@ -196,6 +199,7 @@ func=loadDump([=[INSERT_USER_CODE]=])(unpack(tab)) multi:mainloop() ]] GLOBAL={} -- Allow main thread to interact with these objects as well +_G.THREAD_ID = 0 __proxy__={} setmetatable(GLOBAL,{ __index=function(t,k) @@ -215,6 +219,7 @@ setmetatable(GLOBAL,{ THREAD={} -- Allow main thread to interact with these objects as well multi.integration.love2d.mainChannel=love.thread.getChannel("__MainChan__") isMainThread=true +multi.SystemThreads = {} function THREAD.getName() return __THREADNAME__ end @@ -299,16 +304,19 @@ local function randomString(n) end return str end -local count = 0 +local count = 1 +local livingThreads = {} function multi:newSystemThread(name,func,...) -- the main method + multi.InitSystemThreadErrorHandler() local c={} c.name=name c.Name = name c.ID=c.name.."" c.Id=count count = count + 1 + livingThreads[count] = {true,name} c.thread=love.thread.newThread(multi.integration.love2d.ThreadBase:gsub("INSERT_USER_CODE",dump(func))) - c.thread:start(c.ID,c.name,,...) + c.thread:start(c.ID,c.name,THREAD_ID,...) function c:kill() multi.integration.GLOBAL["__DIEPLZ"..self.ID.."__"]="__DIEPLZ"..self.ID.."__" end @@ -341,8 +349,7 @@ end __channels__={} multi.integration.GLOBAL=GLOBAL multi.integration.THREAD=THREAD -updater=multi:newUpdater() -updater:OnUpdate(function(self) +updater=multi:newLoop(function(self) local data=multi.integration.love2d.mainChannel:pop() while data do if type(data)=="string" then @@ -373,6 +380,35 @@ updater:OnUpdate(function(self) data=multi.integration.love2d.mainChannel:pop() end end) +multi.OnSystemThreadDied = multi:newConnection() +local started = false +function multi.InitSystemThreadErrorHandler() + if started==true then return end + started = true + multi:newThread("ThreadErrorHandler",function() + local threads = multi.SystemThreads + while true do + thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough. + for i=#threads,1,-1 do + local v,err,t=threads[i].thread:join(.001) + if err then + if err:find("Thread was killed!") then + livingThreads[threads[i].Id] = {false,threads[i].Name} + multi.OnSystemThreadDied:Fire(threads[i].Id) + GLOBAL["__THREADS__"]=livingThreads + table.remove(threads,i) + else + threads[i].OnError:Fire(threads[i],err,"Error in systemThread: '"..threads[i].name.."' <"..err..">") + livingThreads[threads[i].Id] = {false,threads[i].Name} + multi.OnSystemThreadDied:Fire(threads[i].Id) + GLOBAL["__THREADS__"]=livingThreads + table.remove(threads,i) + end + end + end + end + end) +end require("multi.integration.shared") multi.print("Integrated Love2d!") return { diff --git a/multi/integration/shared.lua b/multi/integration/shared.lua index fe6a00b..e1fd6d1 100644 --- a/multi/integration/shared.lua +++ b/multi/integration/shared.lua @@ -112,89 +112,138 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann end return c end --- NEEDS WORK --- function multi:newSystemThreadedConnection(name,protect) - -- local c={} - -- local sThread=multi.integration.THREAD - -- local GLOBAL=multi.integration.GLOBAL - -- c.name = name or error("You must supply a name for this object!") - -- c.protect = protect or false - -- c.count = 0 - -- multi:newSystemThreadedQueue(name.."THREADED_CALLFIRE"):init() - -- local qsm = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNCM"):init() - -- local qs = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNC"):init() - -- function c:init() - -- _G.__Needs_Multi = true - -- local multi = require("multi") - -- if multi:getPlatform()=="love2d" then - -- GLOBAL=_G.GLOBAL - -- sThread=_G.sThread - -- end - -- local conns = 0 - -- local qF = sThread.waitFor(self.name.."THREADED_CALLFIRE"):init() - -- local qSM = sThread.waitFor(self.name.."THREADED_CALLSYNCM"):init() - -- local qS = sThread.waitFor(self.name.."THREADED_CALLSYNC"):init() - -- qSM:push("OK") - -- local conn = {} - -- conn.obj = multi:newConnection(self.protect) - -- setmetatable(conn,{__call=function(self,...) return self:connect(...) end}) - -- function conn:connect(func) - -- return self.obj(func) - -- end - -- function conn:holdUT(n) - -- self.obj:holdUT(n) - -- end - -- function conn:Remove() - -- self.obj:Remove() - -- end - -- function conn:Fire(...) - -- local args = {multi.randomString(8),...} - -- for i = 1, conns do - -- qF:push(args) - -- end - -- end - -- local lastID = "" - -- local lastCount = 0 - -- multi:newThread("syncer",function() - -- while true do - -- thread.skip(1) - -- local fire = qF:peek() - -- local count = qS:peek() - -- if fire and fire[1]~=lastID then - -- lastID = fire[1] - -- qF:pop() - -- table.remove(fire,1) - -- conn.obj:Fire(unpack(fire)) - -- end - -- if count and count[1]~=lastCount then - -- conns = count[2] - -- lastCount = count[1] - -- qs:pop() - -- end - -- end - -- end) - -- return conn - -- end - -- multi:newThread("connSync",function() - -- while true do - -- thread.skip(1) - -- local syncIN = qsm:pop() - -- if syncIN then - -- if syncIN=="OK" then - -- c.count = c.count + 1 - -- else - -- c.count = c.count - 1 - -- end - -- local rand = math.random(1,1000000) - -- for i = 1, c.count do - -- qs:push({rand,c.count}) - -- end - -- end - -- end - -- end) - -- GLOBAL[name]=c - -- return c --- end + +function multi:newSystemThreadedConnection(name,protect) + local c={} + c.name = name or error("You must provide a name for the connection object!") + c.protect = protect or false + c.idle = nil + local sThread=multi.integration.THREAD + local GLOBAL=multi.integration.GLOBAL + local connSync = multi:newSystemThreadedQueue(c.name.."_CONN_SYNC") + local connFire = multi:newSystemThreadedQueue(c.name.."_CONN_FIRE") + function c:init() + local multi = require("multi") + if love then -- lets make sure we don't reference up-values if using love2d + GLOBAL=_G.GLOBAL + sThread=_G.sThread + end + local conn = {} + conn.obj = multi:newConnection() + setmetatable(conn,{ + __call=function(self,...) + return self:connect(...) + end + }) + local ID = sThread.getID() + local sync = sThread.waitFor(self.name.."_CONN_SYNC"):init() + local fire = sThread.waitFor(self.name.."_CONN_FIRE"):init() + local connections = {} + if not multi.isMainThread then + connections = {0} + end + sync:push{"INIT",ID} -- Register this as an active connection! + function conn:connect(func) + return self.obj(func) + end + function conn:holdUT(n) + self.obj:holdUT(n) + end + function conn:Remove() + self.obj:Remove() + end + function conn:Fire(...) + for i = 1,#connections do + fire:push{connections[i],ID,{...}} + end + end + function conn:FireTo(to,...) + local good = false + for i = 1,#connections do + if connections[i]==to then + good = true + break + end + end + if not good then return multi.print("NonExisting Connection!") end + fire:push{to,ID,{...}} + end + -- FIRE {TO,FROM,{ARGS}} + local data + local clock = os.clock + conn.OnConnectionAdded = multi:newConnection() + multi:newLoop(function() + data = fire:peek() + if type(data)=="table" and data[1]==ID then + if data[2]==ID and conn.IgnoreSelf then + fire:pop() + return + end + fire:pop() + conn.obj:Fire(unpack(data[3])) + end + data = sync:peek() + if data~=nil and data[1]=="SYNCA" and data[2]==ID then + sync:pop() + multi.nextStep(function() + conn.OnConnectionAdded:Fire(data[3]) + end) + table.insert(connections,data[3]) + end + if type(data)=="table" and data[1]=="SYNCR" and data[2]==ID then + sync:pop() + for i=1,#connections do + if connections[i] == data[3] then + table.remove(connections,i) + end + end + end + end):setName("STConn.syncer") + return conn + end + local cleanUp = {} + multi.OnSystemThreadDied(function(ThreadID) + for i=1,#syncs do + connSync:push{"SYNCR",syncs[i],ThreadID} + end + cleanUp[ThreadID] = true + end) + multi:newThread(c.name.." Connection-Handler",function() + local data + local clock = os.clock + local syncs = {} + while true do + if not c.idle then + thread.sleep(.5) + else + if clock() - c.idle >= 15 then + c.idle = nil + end + thread.skip() + end + data = connSync:peek() + if data~= nil and data[1]=="INIT" then + connSync:pop() + c.idle = clock() + table.insert(syncs,data[2]) + for i=1,#syncs do + connSync:push{"SYNCA",syncs[i],data[2]} + end + end + data = connFire:peek() + if data~=nil and cleanUp[data[1]] then + local meh = data[1] + connFire:pop() -- lets remove dead thread stuff + multi:newAlarm(15):OnRing(function(a) + cleanUp[meh] = nil + end) + end + end + end) + GLOBAL[c.name]=c + return c +end + function multi:SystemThreadedBenchmark(n) n=n or 1 local cores=multi.integration.THREAD.getCores() diff --git a/sample-nodeManager.lua b/sample-nodeManager.lua new file mode 100644 index 0000000..7d57596 --- /dev/null +++ b/sample-nodeManager.lua @@ -0,0 +1,12 @@ +package.path="?/init.lua;?.lua;"..package.path +multi = require("multi") +local GLOBAL, THREAD = require("multi.integration.lanesManager").init() +nGLOBAL = require("multi.integration.networkManager").init() +multi:nodeManager(12345) -- Host a node manager on port: 12345 +print("Node Manager Running...") +settings = { + priority = 0, -- 1 or 2 + protect = false, +} +multi:mainloop(settings) +-- Thats all you need to run the node manager, everything else is done automatically diff --git a/test.lua b/test.lua index 1054c42..bca0dc5 100644 --- a/test.lua +++ b/test.lua @@ -11,7 +11,7 @@ end master = multi:newMaster{ name = "Main", -- the name of the master noBroadCast = true, -- if using the node manager, set this to true to avoid double connections - managerDetails = {"192.168.1.4",12345}, -- the details to connect to the node manager (ip,port) + managerDetails = {"localhost",12345}, -- the details to connect to the node manager (ip,port) } master.OnError(function(name,err) print(name.." has encountered an error: "..err) @@ -21,165 +21,18 @@ multi:newThread("NodeUpdater",function() while true do thread.sleep(1) for i=1,#connlist do - conn = master:execute("TASK_MAN",connlist[i], multi:getTasksDetails()) + master:execute("TASK_MAN",connlist[i], multi:getTasksDetails()) end end end) master.OnNodeConnected(function(name) + print("Connected to the node") table.insert(connlist,name) end) multi.OnError(function(...) print(...) end) - - -local conncount = 0 -function multi:newSystemThreadedConnection(name,protect) - conncount = conncount + 1 - local c={} - c.name = name or error("You must provide a name for the connection object!") - c.protect = protect or false - c.idle = nil - local sThread=multi.integration.THREAD - local GLOBAL=multi.integration.GLOBAL - local connSync = multi:newSystemThreadedQueue(c.name.."_CONN_SYNC") - local connFire = multi:newSystemThreadedQueue(c.name.."_CONN_FIRE") - function c:init() - local multi = require("multi") - if love then -- lets make sure we don't reference up-values if using love2d - GLOBAL=_G.GLOBAL - sThread=_G.sThread - end - local conn = {} - conn.obj = multi:newConnection() - setmetatable(conn,{ - __call=function(self,...) - return self:connect(...) - end - }) - local ID = sThread.getID() - local sync = sThread.waitFor(self.name.."_CONN_SYNC"):init() - local fire = sThread.waitFor(self.name.."_CONN_FIRE"):init() - local connections = {} - if not multi.isMainThread then - connections = {0} - end - sync:push{"INIT",ID} -- Register this as an active connection! - function conn:connect(func) - return self.obj(func) - end - function conn:holdUT(n) - self.obj:holdUT(n) - end - function conn:Remove() - self.obj:Remove() - end - function conn:Fire(...) - for i = 1,#connections do - fire:push{connections[i],ID,{...}} - end - end - -- FIRE {TO,FROM,{ARGS}} - local data - multi:newLoop(function() - data = fire:peek() - if type(data)=="table" and data[1]==ID then - if data[2]==ID and conn.IgnoreSelf then - fire:pop() - return - end - fire:pop() - conn.obj:Fire(unpack(data[3])) - end - -- We need to hangle syncs here as well - data = sync:peek() - if data~=nil and data[1]=="SYNCA" and data[2]==ID then - sync:pop() - table.insert(connections,data[3]) - end - if type(data)=="table" and data[1]=="SYNCR" and data[2]==ID then - sync:pop() - for i=1,#connections do - if connections[i] == data[3] then - table.remove(connections,i) - end - end - end - end) - return conn - end - local cleanUp = {} - multi.OnSystemThreadDied(function(ThreadID) - for i=1,#syncs do - connSync:push{"SYNCR",syncs[i],ThreadID} - end - cleanUp[ThreadID] = true - end) - multi:newThread(c.name.." Connection Handler",function() - local data - local clock = os.clock - local syncs = {} - while true do - if not c.idle then - thread.sleep(.1) - else - if clock() - c.idle >= 15 then - c.idle = nil - end - thread.skip() - end - data = connSync:peek() - if data~= nil and data[1]=="INIT" then - connSync:pop() - c.idle = clock() - table.insert(syncs,data[2]) - for i=1,#syncs do - connSync:push{"SYNCA",syncs[i],data[2]} - end - end - data = connFire:peek() - if data~=nil and cleanUp[data[1]] then - local meh = data[1] - connFire:pop() -- lets remove dead thread stuff - multi:newAlarm(15):OnRing(function(a) - cleanUp[meh] = nil - end) - end - end - end) - GLOBAL[c.name]=c - return c -end - - -local conn = multi:newSystemThreadedConnection("conn"):init() -conn(function(...) - print("MAIN",...) -end) -conn.IgnoreSelf = true -multi:newSystemThread("meh",function() - local multi = require("multi") - local conn = THREAD.waitFor("conn"):init() - conn.IgnoreSelf = true - conn(function(...) - print("THREAD:",...) - end) - multi:newAlarm(1):OnRing(function(a) - conn:Fire("Does this work?") - a:Destroy() - end) - multi.OnError(function(...) - print(...) - end) - multi:mainloop() -end).OnError(function(...) - print(...) -end) -multi:newAlarm(2):OnRing(function(a) - conn:Fire("What about this one?") - a:Destroy() -end) multi:mainloop{ protect = false, ---~ print = true + print = true }