diff --git a/changes.md b/changes.md index 4206721..97cba6e 100644 --- a/changes.md +++ b/changes.md @@ -50,7 +50,7 @@ These didn't have much use in their previous form, but with the addition of hype Fixed: - There were some bugs in the networkmanager.lua file. Desrtoy -> Destroy some misspellings. -- Massive object management bugs which caused performance to drop like a rock. Remember to Destroy objects when no longer using them. I should probably start working on a garbage collector for these objects! +- Massive object management bugs which caused performance to drop like a rock. - Found a bug with processors not having the Destroy() function implemented properly. - Found an issue with the rockspec which is due to the networkManager additon. The net Library and the multi Library are now codependent if using that feature. Going forward you will have to now install the network library separately - Insane proformance bug found in the networkManager file, where each connection to a node created a new thread (VERY BAD) If say you connected to 100s of threads, you would lose a lot of processing power due to a bad implementation of this feature. But it goes futhur than this, the net library also creates a new thread for each connection made, so times that initial 100 by about 3, you end up with a system that quickly eats itself. I have to do tons of rewriting of everything. Yet another setback for the 13.0.0 release @@ -58,16 +58,21 @@ Fixed: - Fixed an issue with processors not properly destroying objects within them and not being destroyable themselves - Fixed a bug where pause and resume would duplicate objects! Not good - Noticed that the switching of lua states, corutine based threading, is slow when done often. Code your threads to have an idler of .5 seconds between sleep states. After doing this to a few built in threads I've seen a nice drop in performance. 68%-100% to 0%-40% when using the jobqueue. If you dont need the hold feature then use a multi object! Sleeping can be done in a multi object using timers and alarms. Though if your aim is speed timers are a bit faster than alarms, if you really want to boost speed then local clock = os.clock and use the clock function to do your timings yourself +- multi:newSystemThreadedConnection(name,protect) -- I did it! It works and I believe all the gotchas are fixed as well. +-- Issue one, if a thread died that was connected to that connection all connections would stop since the queue would get clogged! FIXED +-- There is one thing, the connection does have some handshakes that need to be done before it functions as normal! Added: -- Documentation, the purpose of 13.0.0, orginally going to be 12.2.3, but due to the amount of bugs and features I added couldn't become that. I actually still did my tests in the 12.2.3 branch in github. +- Documentation, the purpose of 13.0.0, orginally going to be 12.2.3, but due to the amount of bugs and features added it couldn't be a simple bug fix update. - multi:newHyperThreadedProcess(STRING name) -- This is a version of the threaded process that gives each object created its own coroutine based thread which means you can use thread.* without affecting other objects created within the hyper threaded processes. Though, creating a self contained single thread is a better idea which when I eventually create the wiki page I'll discuss - multi:newConnector() -- A simple object that allows you to use the new connection Fire syntax without using a multi obj or the standard object format that I follow. - multi:purge() -- Removes all references to objects that are contained withing the processes list of tasks to do. Doing this will stop all objects from functioning. Calling Resume on an object should make it work again. - multi:getTasksDetails(STRING format) -- Simple function, will get massive updates in the future, as of right now It will print out the current processes that are running; listing their type, uptime, and priority. More useful additions will be added in due time. Format can be either a string "s" or "t" see below for the table format - multi:endTask(TID) -- Use multi:getTasksDetails("t") to get the tid of a task - multi:enableLoadDetection() -- Reworked how load detection works. It gives better values now, but it still needs some work before I am happy with it -- THREAD.getID() -- returns a unique ID for the current thread. This varaiable is visible to the main thread as well by accessing it through the returned thread object. OBJ.Id Do not confuse this with thread.* this refers to the system threading interface +- THREAD.getID() -- returns a unique ID for the current thread. This varaiable is visible to the main thread as well by accessing it through the returned thread object. OBJ.Id Do not confuse this with thread.* this refers to the system threading interface. Each thread, including the main thread has a threadID the main thread has an ID of 0! +- multi.print(...) works like normal print, but only prints if the setting print is set to true +- setting: `print` enables multi.print() to work ```lua package.path="?/init.lua;?.lua;"..package.path diff --git a/multi/init.lua b/multi/init.lua index b2dabb3..3167e23 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -110,18 +110,6 @@ function table.merge(t1, t2) end return t1 end -_print=print -function print(...) - if not __SUPPRESSPRINTS then - _print(...) - end -end -_write=io.write -function io.write(...) - if not __SUPPRESSWRITES then - _write(...) - end -end function multi:setThrestimed(n) self.deltaTarget=n or .1 end @@ -251,7 +239,7 @@ function multi.executeFunction(name,...) if type(_G[name])=='function' then _G[name](...) else - print('Error: Not a function') + multi.print('Error: Not a function') end end function multi:getChildren() @@ -283,7 +271,7 @@ function multi:benchMark(sec,p,pt) local temp=self:newLoop(function(self,t) if t>sec then if pt then - print(pt.." "..c.." Steps in "..sec.." second(s)!") + multi.print(pt.." "..c.." Steps in "..sec.." second(s)!") end self.tt(sec,c) self:Destroy() @@ -458,7 +446,7 @@ function multi:connectFinal(func) elseif self.Type=='step' or self.Type=='tstep' then self:OnEnd(func) else - print("Warning!!! "..self.Type.." doesn't contain a Final Connection State! Use "..self.Type..":Break(func) to trigger it's final event!") + multi.print("Warning!!! "..self.Type.." doesn't contain a Final Connection State! Use "..self.Type..":Break(func) to trigger it's final event!") self:OnBreak(func) end end @@ -528,7 +516,7 @@ end -- Timer stuff done function multi:Pause() if self.Type=='mainprocess' then - print("You cannot pause the main process. Doing so will stop all methods and freeze your program! However if you still want to use multi:_Pause()") + multi.print("You cannot pause the main process. Doing so will stop all methods and freeze your program! However if you still want to use multi:_Pause()") else self.Active=false local loop = self.Parent.Mainloop @@ -795,7 +783,7 @@ function multi:newConnection(protect,func) table.remove(temp,1) table.insert(ret,temp) else - print(temp[2]) + multi.print(temp[2]) end else table.insert(ret,{self.func[i][1](...)}) @@ -1395,7 +1383,7 @@ end function multi:newWatcher(namespace,name) local function WatcherObj(ns,n) if self.Type=='queue' then - print("Cannot create a watcher on a queue! Creating on 'multi' instead!") + multi.print("Cannot create a watcher on a queue! Creating on 'multi' instead!") self=multi end local c=self:newBase() @@ -1423,7 +1411,7 @@ function multi:newWatcher(namespace,name) elseif type(namespace)=='table' and (type(name)=='string' or 'number') then return WatcherObj(namespace,name) else - print('Warning, invalid arguments! Nothing returned!') + multi.print('Warning, invalid arguments! Nothing returned!') end end -- Threading stuff @@ -1485,6 +1473,11 @@ function thread.testFor(name,_val,sym) end) return thread.get(name) end +function multi.print(...) + if multi.defaultSettings.print then + print(...) + end +end multi:setDomainName("Threads") multi:setDomainName("Globals") local initT = false @@ -1571,7 +1564,7 @@ function multi.initThreads() self.Parent.OnError:Fire(self.Threads[i],"Error in thread: <"..self.Threads[i].Name.."> "..ret) end if ret==true or ret==false then - print("Thread Ended!!!") + multi.print("Thread Ended!!!") ret={} end end @@ -2264,7 +2257,7 @@ function multi:ToString() if self.Ingore then return end local t=self.Type local data; - print(t) + multi.print(t) if t:sub(-6)=="Thread" then data={ Type=t, @@ -2341,7 +2334,7 @@ function multi:ToString() set=self.set, }) elseif t=="watcher" then - print("Currently cannot sterilize a watcher object!") + multi.print("Currently cannot sterilize a watcher object!") -- needs testing -- table.merge(data,{ -- ns=self.ns, diff --git a/multi/integration/lanesManager.lua b/multi/integration/lanesManager.lua index 6395b1e..e96368a 100644 --- a/multi/integration/lanesManager.lua +++ b/multi/integration/lanesManager.lua @@ -41,10 +41,10 @@ end function multi:getPlatform() return "lanes" end --- Step 2 set up the linda objects +-- Step 2 set up the Linda objects local __GlobalLinda = lanes.linda() -- handles global stuff local __SleepingLinda = lanes.linda() -- handles sleeping stuff --- For convience a GLOBAL table will be constructed to handle requests +-- For convenience a GLOBAL table will be constructed to handle requests local GLOBAL={} setmetatable(GLOBAL,{ __index=function(t,k) @@ -54,7 +54,7 @@ setmetatable(GLOBAL,{ __GlobalLinda:set(k,v) end, }) --- Step 3 rewrite the thread methods to use lindas +-- Step 3 rewrite the thread methods to use Lindas local THREAD={} function THREAD.set(name,val) __GlobalLinda:set(name,val) @@ -84,6 +84,9 @@ end function THREAD.getCores() return THREAD.__CORES end +function THREAD.getThreads() + return GLOBAL.__THREADS__ +end if os.getOS()=="windows" then THREAD.__CORES=tonumber(os.getenv("NUMBER_OF_PROCESSORS")) else @@ -98,6 +101,7 @@ end function THREAD.getID() return THREAD_ID end +_G.THREAD_ID = 0 --[[ Step 4 We need to get sleeping working to handle timing... We want idle wait, not busy wait Idle wait keeps the CPU running better where busy wait wastes CPU cycles... Lanes does not have a sleep method however, a linda recieve will in fact be a idle wait! So we use that and wrap it in a nice package]] @@ -114,9 +118,10 @@ function THREAD.hold(n) end local rand = math.random(1,10000000) -- Step 5 Basic Threads! --- local threads = {} -local count = 0 +local threads = {} +local count = 1 local started = false +local livingThreads = {} function multi:newSystemThread(name,func,...) multi.InitSystemThreadErrorHandler() rand = math.random(1,10000000) @@ -125,6 +130,7 @@ function multi:newSystemThread(name,func,...) c.name=name c.Name = name c.Id = count + livingThreads[count] = {true,name} local THREAD_ID = count count = count + 1 c.Type="sthread" @@ -143,19 +149,19 @@ function multi:newSystemThread(name,func,...) end c.thread=lanes.gen("*", func2)(...) function c:kill() - --self.status:Destroy() self.thread:cancel() - print("Thread: '"..self.name.."' has been stopped!") + multi.print("Thread: '"..self.name.."' has been stopped!") end table.insert(multi.SystemThreads,c) c.OnError = multi:newConnection() + GLOBAL["__THREADS__"]=livingThreads return c end +multi.OnSystemThreadDied = multi:newConnection() function multi.InitSystemThreadErrorHandler() if started==true then return end started = true multi:newThread("ThreadErrorHandler",function() - local deadThreads = {} local threads = multi.SystemThreads while true do thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough. @@ -163,19 +169,23 @@ function multi.InitSystemThreadErrorHandler() local v,err,t=threads[i].thread:join(.001) if err then if err:find("Thread was killed!") then + livingThreads[threads[i].Id] = {false,threads[i].Name} + multi.OnSystemThreadDied:Fire(threads[i].Id) + GLOBAL["__THREADS__"]=livingThreads table.remove(threads,i) else threads[i].OnError:Fire(threads[i],err,"Error in systemThread: '"..threads[i].name.."' <"..err..">") + livingThreads[threads[i].Id] = {false,threads[i].Name} + multi.OnSystemThreadDied:Fire(threads[i].Id) + GLOBAL["__THREADS__"]=livingThreads table.remove(threads,i) - table.insert(deadThreads,threads[i].Id) - GLOBAL["__DEAD_THREADS__"]=deadThreads end end end end end) end -print("Integrated Lanes!") +multi.print("Integrated Lanes!") multi.integration={} -- for module creators multi.integration.GLOBAL=GLOBAL multi.integration.THREAD=THREAD diff --git a/multi/integration/loveManager.lua b/multi/integration/loveManager.lua index 50c4e6c..9b19dc2 100644 --- a/multi/integration/loveManager.lua +++ b/multi/integration/loveManager.lua @@ -316,7 +316,7 @@ function multi:newSystemThread(name,func,...) -- the main method end function love.threaderror( thread, errorstr ) multi.OnError:Fire(thread,errorstr) - print("Error in systemThread: "..tostring(thread)..": "..errorstr) + multi.print("Error in systemThread: "..tostring(thread)..": "..errorstr) end local THREAD={} function THREAD.set(name,val) @@ -374,7 +374,7 @@ updater:OnUpdate(function(self) end end) require("multi.integration.shared") -print("Integrated Love2d!") +multi.print("Integrated Love2d!") return { init=function(t) if t then diff --git a/multi/integration/luvitManager.lua b/multi/integration/luvitManager.lua index 401501c..6e7fba4 100644 --- a/multi/integration/luvitManager.lua +++ b/multi/integration/luvitManager.lua @@ -114,7 +114,7 @@ local function _INIT(luvitThread,timer) luvitThread.start(entry,package.path,name,c.func,...) return c end - print("Integrated Luvit!") + multi.print("Integrated Luvit!") multi.integration={} -- for module creators multi.integration.GLOBAL=GLOBAL multi.integration.THREAD=THREAD diff --git a/multi/integration/networkManager.lua b/multi/integration/networkManager.lua index 10ea9a8..21c847a 100644 --- a/multi/integration/networkManager.lua +++ b/multi/integration/networkManager.lua @@ -194,21 +194,21 @@ function multi:newNode(settings) node.hasFuncs = {} node.OnError = multi:newConnection() node.OnError(function(node,err,master) - print("ERROR",err,node.name) + multi.print("ERROR",err,node.name) local temp = bin.new() temp:addBlock(#node.name,2) temp:addBlock(node.name) temp:addBlock(#err,2) temp:addBlock(err) for i,v in pairs(node.connections) do - print(i) + multi.print(i) v[1]:send(v[2],char(CMD_ERROR)..temp.data,v[3]) end end) if settings.managerDetails then local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) if not c then - print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false + multi.print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false else c.OnDataRecieved(function(self,data) if data == "ping" then @@ -220,7 +220,7 @@ function multi:newNode(settings) end if not settings.preload then if node.functions:getSize()~=0 then - print("We have function(s) to preload!") + multi.print("We have function(s) to preload!") local len = node.functions:getBlock("n",1) local name,func while len do @@ -270,14 +270,14 @@ function multi:newNode(settings) node.queue:push(resolveData(dat)) elseif cmd == CMD_REG then if not settings.allowRemoteRegistering then - print(ip..": has attempted to register a function when it is currently not allowed!") + multi.print(ip..": has attempted to register a function when it is currently not allowed!") return end local temp = bin.new(dat) local len = temp:getBlock("n",1) local name = temp:getBlock("s",len) if node.hasFuncs[name] then - print("Function already preloaded onto the node!") + multi.print("Function already preloaded onto the node!") return end len = temp:getBlock("n",2) @@ -304,13 +304,13 @@ function multi:newNode(settings) node.OnError:Fire(node,err,server) end elseif cmd == CMD_INITNODE then - print("Connected with another node!") + multi.print("Connected with another node!") node.connections[dat]={server,ip,port} multi.OnGUpdate(function(k,v) server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port) end)-- set this up elseif cmd == CMD_INITMASTER then - print("Connected to the master!",dat) + multi.print("Connected to the master!",dat) node.connections[dat]={server,ip,port} multi.OnGUpdate(function(k,v) server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port) @@ -357,7 +357,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas if settings.managerDetails then local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) if not client then - print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false + multi.print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false else client.OnDataRecieved(function(client,data) local cmd = data:sub(1,1) @@ -550,7 +550,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas return master end -- The init function that gets returned -print("Integrated Network Parallelism") +multi.print("Integrated Network Parallelism") return {init = function() return GLOBAL end} diff --git a/multi/integration/shared.lua b/multi/integration/shared.lua index 2a3cb61..fe6a00b 100644 --- a/multi/integration/shared.lua +++ b/multi/integration/shared.lua @@ -113,88 +113,88 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann return c end -- NEEDS WORK -function multi:newSystemThreadedConnection(name,protect) - local c={} - local sThread=multi.integration.THREAD - local GLOBAL=multi.integration.GLOBAL - c.name = name or error("You must supply a name for this object!") - c.protect = protect or false - c.count = 0 - multi:newSystemThreadedQueue(name.."THREADED_CALLFIRE"):init() - local qsm = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNCM"):init() - local qs = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNC"):init() - function c:init() - _G.__Needs_Multi = true - local multi = require("multi") - if multi:getPlatform()=="love2d" then - GLOBAL=_G.GLOBAL - sThread=_G.sThread - end - local conns = 0 - local qF = sThread.waitFor(self.name.."THREADED_CALLFIRE"):init() - local qSM = sThread.waitFor(self.name.."THREADED_CALLSYNCM"):init() - local qS = sThread.waitFor(self.name.."THREADED_CALLSYNC"):init() - qSM:push("OK") - local conn = {} - conn.obj = multi:newConnection(self.protect) - setmetatable(conn,{__call=function(self,...) return self:connect(...) end}) - function conn:connect(func) - return self.obj(func) - end - function conn:holdUT(n) - self.obj:holdUT(n) - end - function conn:Remove() - self.obj:Remove() - end - function conn:Fire(...) - local args = {multi.randomString(8),...} - for i = 1, conns do - qF:push(args) - end - end - local lastID = "" - local lastCount = 0 - multi:newThread("syncer",function() - while true do - thread.skip(1) - local fire = qF:peek() - local count = qS:peek() - if fire and fire[1]~=lastID then - lastID = fire[1] - qF:pop() - table.remove(fire,1) - conn.obj:Fire(unpack(fire)) - end - if count and count[1]~=lastCount then - conns = count[2] - lastCount = count[1] - qs:pop() - end - end - end) - return conn - end - multi:newThread("connSync",function() - while true do - thread.skip(1) - local syncIN = qsm:pop() - if syncIN then - if syncIN=="OK" then - c.count = c.count + 1 - else - c.count = c.count - 1 - end - local rand = math.random(1,1000000) - for i = 1, c.count do - qs:push({rand,c.count}) - end - end - end - end) - GLOBAL[name]=c - return c -end +-- function multi:newSystemThreadedConnection(name,protect) + -- local c={} + -- local sThread=multi.integration.THREAD + -- local GLOBAL=multi.integration.GLOBAL + -- c.name = name or error("You must supply a name for this object!") + -- c.protect = protect or false + -- c.count = 0 + -- multi:newSystemThreadedQueue(name.."THREADED_CALLFIRE"):init() + -- local qsm = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNCM"):init() + -- local qs = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNC"):init() + -- function c:init() + -- _G.__Needs_Multi = true + -- local multi = require("multi") + -- if multi:getPlatform()=="love2d" then + -- GLOBAL=_G.GLOBAL + -- sThread=_G.sThread + -- end + -- local conns = 0 + -- local qF = sThread.waitFor(self.name.."THREADED_CALLFIRE"):init() + -- local qSM = sThread.waitFor(self.name.."THREADED_CALLSYNCM"):init() + -- local qS = sThread.waitFor(self.name.."THREADED_CALLSYNC"):init() + -- qSM:push("OK") + -- local conn = {} + -- conn.obj = multi:newConnection(self.protect) + -- setmetatable(conn,{__call=function(self,...) return self:connect(...) end}) + -- function conn:connect(func) + -- return self.obj(func) + -- end + -- function conn:holdUT(n) + -- self.obj:holdUT(n) + -- end + -- function conn:Remove() + -- self.obj:Remove() + -- end + -- function conn:Fire(...) + -- local args = {multi.randomString(8),...} + -- for i = 1, conns do + -- qF:push(args) + -- end + -- end + -- local lastID = "" + -- local lastCount = 0 + -- multi:newThread("syncer",function() + -- while true do + -- thread.skip(1) + -- local fire = qF:peek() + -- local count = qS:peek() + -- if fire and fire[1]~=lastID then + -- lastID = fire[1] + -- qF:pop() + -- table.remove(fire,1) + -- conn.obj:Fire(unpack(fire)) + -- end + -- if count and count[1]~=lastCount then + -- conns = count[2] + -- lastCount = count[1] + -- qs:pop() + -- end + -- end + -- end) + -- return conn + -- end + -- multi:newThread("connSync",function() + -- while true do + -- thread.skip(1) + -- local syncIN = qsm:pop() + -- if syncIN then + -- if syncIN=="OK" then + -- c.count = c.count + 1 + -- else + -- c.count = c.count - 1 + -- end + -- local rand = math.random(1,1000000) + -- for i = 1, c.count do + -- qs:push({rand,c.count}) + -- end + -- end + -- end + -- end) + -- GLOBAL[name]=c + -- return c +-- end function multi:SystemThreadedBenchmark(n) n=n or 1 local cores=multi.integration.THREAD.getCores() @@ -265,7 +265,7 @@ function multi:newSystemThreadedConsole(name) print(unpack(data)) end end - end) + end):setName("ST.consoleSyncer") end else cc.stream = sThread.waitFor("__SYSTEM_CONSOLE__"):init() @@ -330,6 +330,7 @@ function multi:newSystemThreadedTable(name) return c end local jobqueuecount = 0 +local jqueues = {} function multi:newSystemThreadedJobQueue(a,b) jobqueuecount=jobqueuecount+1 local GLOBAL=multi.integration.GLOBAL @@ -350,6 +351,10 @@ function multi:newSystemThreadedJobQueue(a,b) c.name = b c.numberofcores = a end + if jqueues[c.name] then + error("A job queue by the name: "..c.name.." already exists!") + end + jqueues[c.name] = true c.isReady = false c.jobnum=1 c.OnJobCompleted = multi:newConnection() @@ -378,8 +383,9 @@ function multi:newSystemThreadedJobQueue(a,b) end end function c:doToAll(func) + local r = multi.randomString(12) for i = 1, self.numberofcores do - queueDA:push{multi.randomString(12),func} + queueDA:push{r,func} end end for i=1,c.numberofcores do @@ -450,7 +456,6 @@ function multi:newSystemThreadedJobQueue(a,b) end local clock = os.clock multi:newThread("JQ-"..c.name.." Manager",function() - print("thread started") local _count = 0 while _count= 15 then c.idle = nil end + thread.skip() end dat = queueJD:pop() if dat then c.idle = clock() c.OnJobCompleted:Fire(unpack(dat)) end - thread.skip() end end) return c diff --git a/test.lua b/test.lua index 2264108..1054c42 100644 --- a/test.lua +++ b/test.lua @@ -31,18 +31,155 @@ end) multi.OnError(function(...) print(...) end) -multi:newSystemThreadedConsole("console"):init() -jQueue = multi:newSystemThreadedJobQueue("MainJobQueue") -jQueue.OnReady:holdUT() -jQueue:doToAll(function() - console = THREAD.waitFor("console"):init() + + +local conncount = 0 +function multi:newSystemThreadedConnection(name,protect) + conncount = conncount + 1 + local c={} + c.name = name or error("You must provide a name for the connection object!") + c.protect = protect or false + c.idle = nil + local sThread=multi.integration.THREAD + local GLOBAL=multi.integration.GLOBAL + local connSync = multi:newSystemThreadedQueue(c.name.."_CONN_SYNC") + local connFire = multi:newSystemThreadedQueue(c.name.."_CONN_FIRE") + function c:init() + local multi = require("multi") + if love then -- lets make sure we don't reference up-values if using love2d + GLOBAL=_G.GLOBAL + sThread=_G.sThread + end + local conn = {} + conn.obj = multi:newConnection() + setmetatable(conn,{ + __call=function(self,...) + return self:connect(...) + end + }) + local ID = sThread.getID() + local sync = sThread.waitFor(self.name.."_CONN_SYNC"):init() + local fire = sThread.waitFor(self.name.."_CONN_FIRE"):init() + local connections = {} + if not multi.isMainThread then + connections = {0} + end + sync:push{"INIT",ID} -- Register this as an active connection! + function conn:connect(func) + return self.obj(func) + end + function conn:holdUT(n) + self.obj:holdUT(n) + end + function conn:Remove() + self.obj:Remove() + end + function conn:Fire(...) + for i = 1,#connections do + fire:push{connections[i],ID,{...}} + end + end + -- FIRE {TO,FROM,{ARGS}} + local data + multi:newLoop(function() + data = fire:peek() + if type(data)=="table" and data[1]==ID then + if data[2]==ID and conn.IgnoreSelf then + fire:pop() + return + end + fire:pop() + conn.obj:Fire(unpack(data[3])) + end + -- We need to hangle syncs here as well + data = sync:peek() + if data~=nil and data[1]=="SYNCA" and data[2]==ID then + sync:pop() + table.insert(connections,data[3]) + end + if type(data)=="table" and data[1]=="SYNCR" and data[2]==ID then + sync:pop() + for i=1,#connections do + if connections[i] == data[3] then + table.remove(connections,i) + end + end + end + end) + return conn + end + local cleanUp = {} + multi.OnSystemThreadDied(function(ThreadID) + for i=1,#syncs do + connSync:push{"SYNCR",syncs[i],ThreadID} + end + cleanUp[ThreadID] = true + end) + multi:newThread(c.name.." Connection Handler",function() + local data + local clock = os.clock + local syncs = {} + while true do + if not c.idle then + thread.sleep(.1) + else + if clock() - c.idle >= 15 then + c.idle = nil + end + thread.skip() + end + data = connSync:peek() + if data~= nil and data[1]=="INIT" then + connSync:pop() + c.idle = clock() + table.insert(syncs,data[2]) + for i=1,#syncs do + connSync:push{"SYNCA",syncs[i],data[2]} + end + end + data = connFire:peek() + if data~=nil and cleanUp[data[1]] then + local meh = data[1] + connFire:pop() -- lets remove dead thread stuff + multi:newAlarm(15):OnRing(function(a) + cleanUp[meh] = nil + end) + end + end + end) + GLOBAL[c.name]=c + return c +end + + +local conn = multi:newSystemThreadedConnection("conn"):init() +conn(function(...) + print("MAIN",...) end) -jQueue:registerJob("TEST",function(a,b,c) - console:print(a,b,c) - return "This is a test" +conn.IgnoreSelf = true +multi:newSystemThread("meh",function() + local multi = require("multi") + local conn = THREAD.waitFor("conn"):init() + conn.IgnoreSelf = true + conn(function(...) + print("THREAD:",...) + end) + multi:newAlarm(1):OnRing(function(a) + conn:Fire("Does this work?") + a:Destroy() + end) + multi.OnError(function(...) + print(...) + end) + multi:mainloop() +end).OnError(function(...) + print(...) +end) +multi:newAlarm(2):OnRing(function(a) + conn:Fire("What about this one?") + a:Destroy() end) -jQueue:pushJob("TEST",123,"Hello",false) ---~ multi:benchMark(1,nil,"Bench:") multi:mainloop{ - protect = false + protect = false, +--~ print = true }