From 7eb9cd37e9621933d9c35d877daed27bb9073ca6 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Mon, 17 Sep 2018 11:33:23 -0400 Subject: [PATCH] 12.2.1 is out Fixed some systemthreaded objects. More tests are needed though. About to start making real documentation soon! --- Documentation.md | 4 + changes.md | 36 +++ multi/compat/love2d.lua | 23 ++ multi/init.lua | 45 +--- multi/integration/lanesManager.lua | 13 +- multi/integration/loveManager.lua | 23 ++ multi/integration/luvitManager.lua | 4 +- multi/integration/networkManager.lua | 23 +- multi/integration/shared.lua | 388 +++++++++++++-------------- test.lua | 82 +++--- 10 files changed, 340 insertions(+), 301 deletions(-) create mode 100644 Documentation.md diff --git a/Documentation.md b/Documentation.md new file mode 100644 index 0000000..eec84c2 --- /dev/null +++ b/Documentation.md @@ -0,0 +1,4 @@ +
+ Q1: What is the best Language in the World? + A1: Lua +
\ No newline at end of file diff --git a/changes.md b/changes.md index 13edb12..c687a88 100644 --- a/changes.md +++ b/changes.md @@ -1,5 +1,41 @@ #Changes [TOC] +Update 12.2.1 Time for some bug fixes! +------------- +Fixed: SystemThreadedJobQueues +- You can now make as many job queues as you want! Just a warning when using a large amount of cores for the queue it takes a second or 2 to set up the jobqueues for data transfer. I am unsure if this is a lanes thing or not, but love2d has no such delay when setting up the jobqueue! +- You now connect to the OnReady in the jobqueue object. No more holding everything else as you wait for a job queue to be ready +- Jobqueues:doToAll now passes the queues multi interface as the first and currently only argument +- No longer need to use jobqueue.OnReady() The code is smarter and will send the pushed jobs automatically when the threads are ready + +Fixed: SystemThreadedConnection +- They work the exact same way as before, but actually work as expected now. The issue before was how i implemented it. Now each connection knows the number of instances of that object that ecist. This way I no longer have to do fancy timings that may or may not work. I can send exactly enough info for each connection to consume from the queue. + +Removed: multi:newQueuer +- This feature has no real use after corutine based threads were introduced. You can use those to get the same effect as the queuer and do it better too. + +Going forward: +- Will I ever finish steralization? Who knows, but being able to save state would be nice. The main issue is there is no simple way to save state. While I can provide methods to allow one to turn the objects into strings and back, there is no way for me to make your code work with it in a simple way. For now only the basic functions will be here. +- I need to make better documentation for this library as well. In its current state, all I have are examples and not a list of what is what. + +# Example +```lua +package.path="?/init.lua;?.lua;"..package.path +multi = require("multi") +GLOBAL, THREAD = require("multi.integration.lanesManager").init() +jq = multi:newSystemThreadedJobQueue() +jq:registerJob("test",function(a) + return "Hello",a +end) +jq.OnJobCompleted(function(ID,...) + print(ID,...) +end) +for i=1,16 do + jq:pushJob("test",5) +end +multi:mainloop() +``` + Update 12.2.0 ------------- **Added:** diff --git a/multi/compat/love2d.lua b/multi/compat/love2d.lua index 4ee68d3..fe77db1 100644 --- a/multi/compat/love2d.lua +++ b/multi/compat/love2d.lua @@ -1,3 +1,26 @@ +--[[ +MIT License + +Copyright (c) 2018 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] local multi = require("multi") os.sleep=love.timer.sleep multi.drawF={} diff --git a/multi/init.lua b/multi/init.lua index 9d55029..c35f9d2 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -1,7 +1,7 @@ --[[ MIT License -Copyright (c) 2017 Ryan Ward +Copyright (c) 2018 Ryan Ward Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -24,8 +24,8 @@ SOFTWARE. local bin = pcall(require,"bin") local multi = {} local clock = os.clock -multi.Version = "12.2.0" -multi._VERSION = "12.2.0" +multi.Version = "12.2.1" +multi._VERSION = "12.2.1" multi.stage = "stable" multi.__index = multi multi.Mainloop = {} @@ -522,40 +522,6 @@ function multi:newProcess(file) --~ c:IngoreObject() return c end -function multi:newQueuer(file) - local c=self:newProcess() - c.Type='queue' - c.last={} - c.funcE={} - function c:OnQueueCompleted(func) - table.insert(self.funcE,func) - return self - end - if file then - self.Cself=c - loadstring('local queue=multi.Cself '..io.open(file,'rb'):read('*all'))() - end - self:create(c) - multi.OnObjectCreated(function(self) - if self.Parent then - if self.Parent.Type=="queue" then - if self:isAnActor() then - if self.Type=="alarm" then - self.Active=false - end - self:Pause() - self:connectFinal(multi.queuefinal) - end - end - end - end) - function c:Start() - self.Mainloop[#self.Mainloop]:Resume() - self.l:Resume() - return self - end - return c -end function multi:newTimer() local c={} c.Type='timer' @@ -1078,7 +1044,10 @@ function multi:uManager(settings) if settings.stopOnError then stopOnError = settings.stopOnError end - multi.defaultSettings.p_i = settings.auto_stretch*self.Priority_Idle or self.Priority_Idle + multi.defaultSettings.p_i = self.Priority_Idle + if settings.auto_stretch then + multi.defaultSettings.p_i = settings.auto_stretch*self.Priority_Idle + end multi.defaultSettings.delay = settings.auto_delay or 3 multi.defaultSettings.auto_lowerbound = settings.auto_lowerbound or self.Priority_Idle protect = settings.protect diff --git a/multi/integration/lanesManager.lua b/multi/integration/lanesManager.lua index 68d1b47..ea201f4 100644 --- a/multi/integration/lanesManager.lua +++ b/multi/integration/lanesManager.lua @@ -1,12 +1,12 @@ --[[ MIT License -Copyright (c) 2017 Ryan Ward +Copyright (c) 2018 Ryan Ward Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: @@ -31,9 +31,8 @@ function os.getOS() end -- Step 1 get lanes lanes=require("lanes").configure() ---~ package.path="lua/?/init.lua;lua/?.lua;"..package.path local multi = require("multi") -- get it all and have it on all lanes -isMainThread=true +multi.isMainThread=true function multi:canSystemThread() return true end @@ -108,8 +107,10 @@ function THREAD.hold(n) end repeat wait() until n() end +local rand = math.random(1,10000000) -- Step 5 Basic Threads! function multi:newSystemThread(name,func,...) + rand = math.random(1,10000000) local c={} local __self=c c.name=name @@ -117,6 +118,7 @@ function multi:newSystemThread(name,func,...) local THREAD_NAME=name local function func2(...) _G["THREAD_NAME"]=THREAD_NAME + math.randomseed(rand) func(...) end c.thread=lanes.gen("*", func2)(...) @@ -130,8 +132,7 @@ function multi:newSystemThread(name,func,...) c.status:OnUpdate(function(self) local v,err,t=self.link.thread:join(.001) if err then - multi.OnError:Fire(self.link,err) - print("Error in systemThread: '"..self.link.name.."' <"..err..">") + multi.OnError:Fire(self.link,err,"Error in systemThread: '"..self.link.name.."' <"..err..">") self:Destroy() end end) diff --git a/multi/integration/loveManager.lua b/multi/integration/loveManager.lua index e5d8121..bc886eb 100644 --- a/multi/integration/loveManager.lua +++ b/multi/integration/loveManager.lua @@ -1,3 +1,26 @@ +--[[ +MIT License + +Copyright (c) 2018 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] local multi = require("multi.compat.love2d") function multi:canSystemThread() return true diff --git a/multi/integration/luvitManager.lua b/multi/integration/luvitManager.lua index 5dded04..401501c 100644 --- a/multi/integration/luvitManager.lua +++ b/multi/integration/luvitManager.lua @@ -1,12 +1,12 @@ --[[ MIT License -Copyright (c) 2017 Ryan Ward +Copyright (c) 2018 Ryan Ward Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/multi/integration/networkManager.lua b/multi/integration/networkManager.lua index 3316358..ca620de 100644 --- a/multi/integration/networkManager.lua +++ b/multi/integration/networkManager.lua @@ -1,5 +1,26 @@ --- CURRENT TASK: +--[[ +MIT License +Copyright (c) 2018 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] local multi = require("multi") local net = require("net") require("bin") diff --git a/multi/integration/shared.lua b/multi/integration/shared.lua index d812dbd..1d16d2a 100644 --- a/multi/integration/shared.lua +++ b/multi/integration/shared.lua @@ -1,12 +1,12 @@ --[[ MIT License -Copyright (c) 2017 Ryan Ward +Copyright (c) 2018 Ryan Ward Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: @@ -114,141 +114,124 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann end function multi:newSystemThreadedConnection(name,protect) local c={} - c.name = name - c.protect=protect local sThread=multi.integration.THREAD local GLOBAL=multi.integration.GLOBAL + c.name = name or error("You must supply a name for this object!") + c.protect = protect or false + c.count = 0 + multi:newSystemThreadedQueue(name.."THREADED_CALLFIRE"):init() + local qsm = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNCM"):init() + local qs = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNC"):init() function c:init() local multi = require("multi") if multi:getPlatform()=="love2d" then GLOBAL=_G.GLOBAL sThread=_G.sThread end + local conns = 0 + local qF = sThread.waitFor(self.name.."THREADED_CALLFIRE"):init() + local qSM = sThread.waitFor(self.name.."THREADED_CALLSYNCM"):init() + local qS = sThread.waitFor(self.name.."THREADED_CALLSYNC"):init() + qSM:push("OK") local conn = {} - conn.name = self.name - conn.count = 0 - if isMainThread then - if GLOBAL[self.name.."THREADED_CONNQ"] then -- if this thing exists then lets grab it, we are doing something different here. instead of cleaning things up, we will gave a dedicated queue to manage things - conn.queueCall = sThread.waitFor(self.name.."THREADED_CALLQ"):init() - else - conn.queueCall = multi:newSystemThreadedQueue(self.name.."THREADED_CALLQ"):init() - end - else - local multi = require("multi") -- so things don't break, but also allows bi-directional connections to work - conn.queueCall = sThread.waitFor(self.name.."THREADED_CALLQ"):init() - end + conn.obj = multi:newConnection(self.protect) setmetatable(conn,{__call=function(self,...) return self:connect(...) end}) - conn.obj=multi:newConnection(self.protect) function conn:connect(func) return self.obj(func) end - function conn:fConnect(func) - return self.obj:fConnect(func) - end function conn:holdUT(n) self.obj:holdUT(n) end - function conn:Bind(t) - self.obj:Bind(t) - end function conn:Remove() self.obj:Remove() end - function conn:getConnection(name,ingore) - return self.obj:getConnection(name,ingore) - end function conn:Fire(...) - local args = {...} - table.insert(args,1,multi.randomString(8)) - table.insert(args,1,self.name) - table.insert(args,1,"F") - self.queueCall:push(args) - if self.trigger_self then - self.obj:Fire(...) + local args = {multi.randomString(8),...} + for i = 1, conns do + qF:push(args) end end - self.cleanup = .01 - function conn:SetCleanUpRate(n) - self.cleanup=n or .01 - end - conn.lastid="" - conn.looper = multi:newLoop(function(self) - local con = self.link - local data = con.queueCall:peek() - if not data then return end - local id = data[3] - if data[1]=="F" and data[2]==con.name and con.lastid~=id then - con.lastid=id - table.remove(data,1)-- Remove the first 3 elements - table.remove(data,1)-- Remove the first 3 elements - table.remove(data,1)-- Remove the first 3 elements - con.obj:Fire(unpack(data)) - local alarm = multi:newAlarm(con.cleanup) - alarm:OnRing(function() - alarm:Destroy() - local dat = con.queueCall:peek() - if not dat then return end - table.remove(data,1)-- Remove the first 3 elements - table.remove(data,1)-- Remove the first 3 elements - table.remove(data,1)-- Remove the first 3 elements - if dat[3]==id then - con.queueCall:pop() - end - end) + local lastID = "" + local lastCount = 0 + multi:newThread("syncer",function() + while true do + thread.skip(1) + local fire = qF:peek() + local count = qS:peek() + if fire and fire[1]~=lastID then + lastID = fire[1] + qF:pop() + table.remove(fire,1) + conn.obj:Fire(unpack(fire)) + end + if count and count[1]~=lastCount then + conns = count[2] + lastCount = count[1] + qs:pop() + end end end) - conn.HoldUT=conn.holdUT - conn.looper.link=conn - conn.Connect=conn.connect - conn.FConnect=conn.fConnect - conn.GetConnection=conn.getConnection return conn end + multi:newThread("connSync",function() + while true do + thread.skip(1) + local syncIN = qsm:pop() + if syncIN then + if syncIN=="OK" then + c.count = c.count + 1 + else + c.count = c.count - 1 + end + local rand = math.random(1,1000000) + for i = 1, c.count do + qs:push({rand,c.count}) + end + end + end + end) GLOBAL[name]=c return c end -function multi:systemThreadedBenchmark(n,p) +function multi:systemThreadedBenchmark(n) n=n or 1 local cores=multi.integration.THREAD.getCores() - local queue=multi:newSystemThreadedQueue("QUEUE") - multi.integration.GLOBAL["__SYSTEMBENCHMARK__"]=n + local queue=multi:newSystemThreadedQueue("THREAD_BENCH_QUEUE"):init() local sThread=multi.integration.THREAD local GLOBAL=multi.integration.GLOBAL + local c = {} for i=1,cores do - multi:newSystemThread("STHREAD_BENCH",function() + multi:newSystemThread("STHREAD_BENCH",function(n) local multi = require("multi") if multi:getPlatform()=="love2d" then GLOBAL=_G.GLOBAL sThread=_G.sThread end -- we cannot have upvalues... in love2d globals, not locals must be used - queue=sThread.waitFor("QUEUE"):init() -- always wait for when looking for a variable at the start of the thread! - multi:benchMark(sThread.waitFor("__SYSTEMBENCHMARK__")):OnBench(function(self,count) + queue=sThread.waitFor("THREAD_BENCH_QUEUE"):init() -- always wait for when looking for a variable at the start of the thread! + multi:benchMark(n):OnBench(function(self,count) queue:push(count) - multi:Stop() + sThread.kill() end) multi:mainloop() - end) - end - local c={} - c.tt=function() end - c.p=p - function c:OnBench(func) - self.tt=func + end,n) end multi:newThread("THREAD_BENCH",function() - thread.sleep(n+.1) - GLOBAL["QUEUE"]=nil -- time to clean up - local num=0 - data=queue:pop() - while data do - num=num+data - data=queue:pop() + local count = 0 + local cc = 0 + while true do + thread.skip(1) + local dat = queue:pop() + if dat then + cc=cc+1 + count = count + dat + if cc == cores then + c.OnBench:Fire(count) + thread.kill() + end + end end - if p then - print(tostring(p)..num) - end - c.tt(c,num) end) + c.OnBench = multi:newConnection() return c end function multi:newSystemThreadedConsole(name) @@ -263,7 +246,7 @@ function multi:newSystemThreadedConsole(name) sThread=_G.sThread end local cc={} - if isMainThread then + if multi.isMainThread then if GLOBAL["__SYSTEM_CONSLOE__"] then cc.stream = sThread.waitFor("__SYSTEM_CONSLOE__"):init() else @@ -304,14 +287,17 @@ function multi:newSystemThreadedTable(name) local sThread=multi.integration.THREAD local GLOBAL=multi.integration.GLOBAL function c:init() -- create an init function so we can mimic on both love2d and lanes + local multi = require("multi") if multi:getPlatform()=="love2d" then GLOBAL=_G.GLOBAL sThread=_G.sThread end local cc={} cc.tab={} - if isMainThread then - cc.conn = multi:newSystemThreadedConnection(self.name.."_Tabled_Connection"):init() + if multi.isMainThread then + if not GLOBAL[self.name.."_Tabled_Connection"] then + cc.conn = multi:newSystemThreadedConnection(self.name.."_Tabled_Connection"):init() + end else cc.conn = sThread.waitFor(self.name.."_Tabled_Connection"):init() end @@ -330,125 +316,112 @@ function multi:newSystemThreadedTable(name) __newindex=function(t,k,v) t.tab[k]=v t.conn:Fire(k,v) - end, + end }) return cc end GLOBAL[c.name]=c return c end -function multi:newSystemThreadedJobQueue(numOfCores) - local c={} +local jobqueuecount = 0 +function multi:newSystemThreadedJobQueue(a,b) + jobqueuecount=jobqueuecount+1 + local GLOBAL=multi.integration.GLOBAL + local sThread=multi.integration.THREAD + local c = {} + c.numberofcores = 4 + c.name = "SYSTEM_THREADED_JOBQUEUE_"..jobqueuecount + -- This is done to keep backwards compatability for older code + if type(a)=="string" and not(b) then + c.name = a + elseif type(a)=="number" and not (b) then + c.numberofcores = a + elseif type(a)=="string" and type(b)=="number" then + c.name = a + c.numberofcores = b + elseif type(a)=="number" and type(b)=="string" then + c.name = b + c.numberofcores = a + end + c.isReady = false c.jobnum=1 - c.cores=numOfCores or multi.integration.THREAD.getCores() - c.queueIN=multi:newSystemThreadedQueue("THREADED_JQ"):init() - c.queueOUT=multi:newSystemThreadedQueue("THREADED_JQO"):init() - c.queueALL=multi:newSystemThreadedQueue("THREADED_QALL"):init() - c.REG=multi:newSystemThreadedQueue("THREADED_JQ_F_REG"):init() - c.OnReady=multi:newConnection() + c.OnJobCompleted = multi:newConnection() + local queueIN = self:newSystemThreadedQueue("QUEUE_IN_"..c.name):init() + local queueCC = self:newSystemThreadedQueue("QUEUE_CC_"..c.name):init() + local queueREG = self:newSystemThreadedQueue("QUEUE_REG_"..c.name):init() + local queueJD = self:newSystemThreadedQueue("QUEUE_JD_"..c.name):init() + local queueDA = self:newSystemThreadedQueue("QUEUE_DA_"..c.name):init() + c.OnReady = multi:newConnection() function c:registerJob(name,func) - for i=1,self.cores do - self.REG:push({name,func}) + for i = 1, self.numberofcores do + queueREG:push({name,func}) end end + c.tempQueue = {} function c:pushJob(name,...) - self.queueOUT:push({self.jobnum,name,...}) - self.jobnum=self.jobnum+1 - return self.jobnum-1 - end - local GLOBAL=multi.integration.GLOBAL -- set up locals in case we are using lanes - local sThread=multi.integration.THREAD -- set up locals in case we are using lanes - function c:doToAll(func) - local TaskName=multi.randomString(16) - for i=1,self.cores do - self.queueALL:push({TaskName,func}) + if not self.isReady then + table.insert(c.tempQueue,{self.jobnum,name,...}) + self.jobnum=self.jobnum+1 + return self.jobnum-1 + else + queueIN:push{self.jobnum,name,...} + self.jobnum=self.jobnum+1 + return self.jobnum-1 end end - function c:start() - multi:newEvent(function() - return self.ThreadsLoaded==true - end):OnEvent(function(evnt) - GLOBAL["THREADED_JQ"]=nil -- remove it - GLOBAL["THREADED_JQO"]=nil -- remove it - GLOBAL["THREADED_JQ_F_REG"]=nil -- remove it - self:doToAll(function() - _G["__started__"]=true - SFunc() - end) - evnt:Destroy() - end) + function c:doToAll(func) + for i = 1, self.numberofcores do + queueDA:push{multi.randomString(12),func} + end end - GLOBAL["__JQ_COUNT__"]=c.cores - for i=1,c.cores do - multi:newSystemThread("System Threaded Job Queue Worker Thread #"..i,function(name,ind) + for i=1,c.numberofcores do + multi:newSystemThread(c.name.." Worker Thread #"..i,function(name) local multi = require("multi") - ThreadName=name - __sleep__=.001 if love then -- lets make sure we don't reference up-values if using love2d GLOBAL=_G.GLOBAL sThread=_G.sThread - __sleep__=.1 end - JQI=sThread.waitFor("THREADED_JQO"):init() -- Grab it - JQO=sThread.waitFor("THREADED_JQ"):init() -- Grab it - REG=sThread.waitFor("THREADED_JQ_F_REG"):init() -- Grab it - QALL=sThread.waitFor("THREADED_QALL"):init() -- Grab it - QALLT={} - FUNCS={} - SFunc=multi:newFunction(function(self) - MainLoop:Pause() - multi:newAlarm(.1):OnRing(function(alarm) - alarm:Destroy() - MainLoop:Resume() - end) - end) + local CC = sThread.waitFor("QUEUE_CC_"..name):init() + CC:push("ready") + local FUNCS={} + local ids = {} + local JQI = sThread.waitFor("QUEUE_IN_"..name):init() + local JD = sThread.waitFor("QUEUE_JD_"..name):init() + local REG = sThread.waitFor("QUEUE_REG_"..name):init() + local DA = sThread.waitFor("QUEUE_DA_"..name):init() + local lastjob = os.clock() multi:newLoop(function() + local job=JQI:pop() local rd=REG:peek() + local da=DA:peek() if rd then if not FUNCS[rd[1]] then FUNCS[rd[1]]=rd[2] - rd=nil -- lets clean up + rd=nil REG:pop() end end - local d=QALL:peek() - if d then - if not QALLT[d[1]] then - QALLT[d[1]]=true - d[2]() - d=nil -- lets clean up - QALL:pop() + if da then + if not ids[da[1]] then + local meh = da[1] + ids[da[1]]=true + da[2](multi) + da=nil + DA:pop() + multi:newAlarm(60):OnRing(function(a) + ids[meh] = nil + a:Destroy() + end) end end - end) - setmetatable(_G,{ - __index=function(t,k) - return FUNCS[k] - end - }) - lastjob=os.clock() - MainLoop=multi:newLoop(function(self) - if __started__ then - local job=JQI:pop() - if job then - lastjob=os.clock() - local d=QALL:peek() - if d then - if not QALLT[d[1]] then - QALLT[d[1]]=true - d[2]() - d=nil -- lets clean up - QALL:pop() - end - end - local ID=table.remove(job,1) -- return and remove - local name=table.remove(job,1) -- return and remove - if FUNCS[name] then - JQO:push({ID,FUNCS[name](unpack(job))}) - else - self:hold(function() return FUNCS[name] end) - JQO:push({ID,FUNCS[name](unpack(job))}) - end + if job then + lastjob = os.clock() + local ID=table.remove(job,1) -- return and remove + local _name=table.remove(job,1) -- return and remove + if FUNCS[_name] then + JD:push({ID,FUNCS[_name](unpack(job))}) + else -- making use of that new holding feature + JD:push({ID,FUNCS:waitFor(_name)(unpack(job))}) end end end) @@ -460,33 +433,40 @@ function multi:newSystemThreadedJobQueue(numOfCores) thread.sleep(.001) end end) - JQO:push({"_THREADINIT_"}) + setmetatable(_G,{ + __index=function(t,k) + return FUNCS[k] + end + }) if not love then multi:mainloop() end - end,"Thread<"..i..">",i) + end,c.name) end - c.OnJobCompleted=multi:newConnection() - c.threadsResponded = 0 - c.updater=multi:newLoop(function(self) - local data=self.link.queueIN:pop() - while data do - if data then - local a=unpack(data) - if a=="_THREADINIT_" then - self.link.threadsResponded=self.link.threadsResponded+1 - if self.link.threadsResponded==self.link.cores then - self.link.ThreadsLoaded=true - self.link.OnReady:Fire() - end - else - self.link.OnJobCompleted:Fire(unpack(data)) - end + multi:newThread("counter",function() + print("thread started") + local _count = 0 + while _count",...) +end) + +--~ jq = multi:newSystemThreadedJobQueue() +--~ jq:registerJob("test",function(a) +--~ return "Hello",a +--~ end) +--~ jq.OnJobCompleted(function(ID,...) +--~ print(ID,...) +--~ end) +--~ for i=1,16 do +--~ jq:pushJob("test",5) +--~ end +multi:mainloop()