diff --git a/Documentation.md b/Documentation.md
new file mode 100644
index 0000000..eec84c2
--- /dev/null
+++ b/Documentation.md
@@ -0,0 +1,4 @@
+
+ Q1: What is the best Language in the World?
+ A1: Lua
+
\ No newline at end of file
diff --git a/changes.md b/changes.md
index 13edb12..c687a88 100644
--- a/changes.md
+++ b/changes.md
@@ -1,5 +1,41 @@
#Changes
[TOC]
+Update 12.2.1 Time for some bug fixes!
+-------------
+Fixed: SystemThreadedJobQueues
+- You can now make as many job queues as you want! Just a warning when using a large amount of cores for the queue it takes a second or 2 to set up the jobqueues for data transfer. I am unsure if this is a lanes thing or not, but love2d has no such delay when setting up the jobqueue!
+- You now connect to the OnReady in the jobqueue object. No more holding everything else as you wait for a job queue to be ready
+- Jobqueues:doToAll now passes the queues multi interface as the first and currently only argument
+- No longer need to use jobqueue.OnReady() The code is smarter and will send the pushed jobs automatically when the threads are ready
+
+Fixed: SystemThreadedConnection
+- They work the exact same way as before, but actually work as expected now. The issue before was how i implemented it. Now each connection knows the number of instances of that object that ecist. This way I no longer have to do fancy timings that may or may not work. I can send exactly enough info for each connection to consume from the queue.
+
+Removed: multi:newQueuer
+- This feature has no real use after corutine based threads were introduced. You can use those to get the same effect as the queuer and do it better too.
+
+Going forward:
+- Will I ever finish steralization? Who knows, but being able to save state would be nice. The main issue is there is no simple way to save state. While I can provide methods to allow one to turn the objects into strings and back, there is no way for me to make your code work with it in a simple way. For now only the basic functions will be here.
+- I need to make better documentation for this library as well. In its current state, all I have are examples and not a list of what is what.
+
+# Example
+```lua
+package.path="?/init.lua;?.lua;"..package.path
+multi = require("multi")
+GLOBAL, THREAD = require("multi.integration.lanesManager").init()
+jq = multi:newSystemThreadedJobQueue()
+jq:registerJob("test",function(a)
+ return "Hello",a
+end)
+jq.OnJobCompleted(function(ID,...)
+ print(ID,...)
+end)
+for i=1,16 do
+ jq:pushJob("test",5)
+end
+multi:mainloop()
+```
+
Update 12.2.0
-------------
**Added:**
diff --git a/multi/compat/love2d.lua b/multi/compat/love2d.lua
index 4ee68d3..fe77db1 100644
--- a/multi/compat/love2d.lua
+++ b/multi/compat/love2d.lua
@@ -1,3 +1,26 @@
+--[[
+MIT License
+
+Copyright (c) 2018 Ryan Ward
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+]]
local multi = require("multi")
os.sleep=love.timer.sleep
multi.drawF={}
diff --git a/multi/init.lua b/multi/init.lua
index 9d55029..c35f9d2 100644
--- a/multi/init.lua
+++ b/multi/init.lua
@@ -1,7 +1,7 @@
--[[
MIT License
-Copyright (c) 2017 Ryan Ward
+Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -24,8 +24,8 @@ SOFTWARE.
local bin = pcall(require,"bin")
local multi = {}
local clock = os.clock
-multi.Version = "12.2.0"
-multi._VERSION = "12.2.0"
+multi.Version = "12.2.1"
+multi._VERSION = "12.2.1"
multi.stage = "stable"
multi.__index = multi
multi.Mainloop = {}
@@ -522,40 +522,6 @@ function multi:newProcess(file)
--~ c:IngoreObject()
return c
end
-function multi:newQueuer(file)
- local c=self:newProcess()
- c.Type='queue'
- c.last={}
- c.funcE={}
- function c:OnQueueCompleted(func)
- table.insert(self.funcE,func)
- return self
- end
- if file then
- self.Cself=c
- loadstring('local queue=multi.Cself '..io.open(file,'rb'):read('*all'))()
- end
- self:create(c)
- multi.OnObjectCreated(function(self)
- if self.Parent then
- if self.Parent.Type=="queue" then
- if self:isAnActor() then
- if self.Type=="alarm" then
- self.Active=false
- end
- self:Pause()
- self:connectFinal(multi.queuefinal)
- end
- end
- end
- end)
- function c:Start()
- self.Mainloop[#self.Mainloop]:Resume()
- self.l:Resume()
- return self
- end
- return c
-end
function multi:newTimer()
local c={}
c.Type='timer'
@@ -1078,7 +1044,10 @@ function multi:uManager(settings)
if settings.stopOnError then
stopOnError = settings.stopOnError
end
- multi.defaultSettings.p_i = settings.auto_stretch*self.Priority_Idle or self.Priority_Idle
+ multi.defaultSettings.p_i = self.Priority_Idle
+ if settings.auto_stretch then
+ multi.defaultSettings.p_i = settings.auto_stretch*self.Priority_Idle
+ end
multi.defaultSettings.delay = settings.auto_delay or 3
multi.defaultSettings.auto_lowerbound = settings.auto_lowerbound or self.Priority_Idle
protect = settings.protect
diff --git a/multi/integration/lanesManager.lua b/multi/integration/lanesManager.lua
index 68d1b47..ea201f4 100644
--- a/multi/integration/lanesManager.lua
+++ b/multi/integration/lanesManager.lua
@@ -1,12 +1,12 @@
--[[
MIT License
-Copyright (c) 2017 Ryan Ward
+Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
@@ -31,9 +31,8 @@ function os.getOS()
end
-- Step 1 get lanes
lanes=require("lanes").configure()
---~ package.path="lua/?/init.lua;lua/?.lua;"..package.path
local multi = require("multi") -- get it all and have it on all lanes
-isMainThread=true
+multi.isMainThread=true
function multi:canSystemThread()
return true
end
@@ -108,8 +107,10 @@ function THREAD.hold(n)
end
repeat wait() until n()
end
+local rand = math.random(1,10000000)
-- Step 5 Basic Threads!
function multi:newSystemThread(name,func,...)
+ rand = math.random(1,10000000)
local c={}
local __self=c
c.name=name
@@ -117,6 +118,7 @@ function multi:newSystemThread(name,func,...)
local THREAD_NAME=name
local function func2(...)
_G["THREAD_NAME"]=THREAD_NAME
+ math.randomseed(rand)
func(...)
end
c.thread=lanes.gen("*", func2)(...)
@@ -130,8 +132,7 @@ function multi:newSystemThread(name,func,...)
c.status:OnUpdate(function(self)
local v,err,t=self.link.thread:join(.001)
if err then
- multi.OnError:Fire(self.link,err)
- print("Error in systemThread: '"..self.link.name.."' <"..err..">")
+ multi.OnError:Fire(self.link,err,"Error in systemThread: '"..self.link.name.."' <"..err..">")
self:Destroy()
end
end)
diff --git a/multi/integration/loveManager.lua b/multi/integration/loveManager.lua
index e5d8121..bc886eb 100644
--- a/multi/integration/loveManager.lua
+++ b/multi/integration/loveManager.lua
@@ -1,3 +1,26 @@
+--[[
+MIT License
+
+Copyright (c) 2018 Ryan Ward
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+]]
local multi = require("multi.compat.love2d")
function multi:canSystemThread()
return true
diff --git a/multi/integration/luvitManager.lua b/multi/integration/luvitManager.lua
index 5dded04..401501c 100644
--- a/multi/integration/luvitManager.lua
+++ b/multi/integration/luvitManager.lua
@@ -1,12 +1,12 @@
--[[
MIT License
-Copyright (c) 2017 Ryan Ward
+Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
diff --git a/multi/integration/networkManager.lua b/multi/integration/networkManager.lua
index 3316358..ca620de 100644
--- a/multi/integration/networkManager.lua
+++ b/multi/integration/networkManager.lua
@@ -1,5 +1,26 @@
--- CURRENT TASK:
+--[[
+MIT License
+Copyright (c) 2018 Ryan Ward
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+]]
local multi = require("multi")
local net = require("net")
require("bin")
diff --git a/multi/integration/shared.lua b/multi/integration/shared.lua
index d812dbd..1d16d2a 100644
--- a/multi/integration/shared.lua
+++ b/multi/integration/shared.lua
@@ -1,12 +1,12 @@
--[[
MIT License
-Copyright (c) 2017 Ryan Ward
+Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
@@ -114,141 +114,124 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann
end
function multi:newSystemThreadedConnection(name,protect)
local c={}
- c.name = name
- c.protect=protect
local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL
+ c.name = name or error("You must supply a name for this object!")
+ c.protect = protect or false
+ c.count = 0
+ multi:newSystemThreadedQueue(name.."THREADED_CALLFIRE"):init()
+ local qsm = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNCM"):init()
+ local qs = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNC"):init()
function c:init()
local multi = require("multi")
if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL
sThread=_G.sThread
end
+ local conns = 0
+ local qF = sThread.waitFor(self.name.."THREADED_CALLFIRE"):init()
+ local qSM = sThread.waitFor(self.name.."THREADED_CALLSYNCM"):init()
+ local qS = sThread.waitFor(self.name.."THREADED_CALLSYNC"):init()
+ qSM:push("OK")
local conn = {}
- conn.name = self.name
- conn.count = 0
- if isMainThread then
- if GLOBAL[self.name.."THREADED_CONNQ"] then -- if this thing exists then lets grab it, we are doing something different here. instead of cleaning things up, we will gave a dedicated queue to manage things
- conn.queueCall = sThread.waitFor(self.name.."THREADED_CALLQ"):init()
- else
- conn.queueCall = multi:newSystemThreadedQueue(self.name.."THREADED_CALLQ"):init()
- end
- else
- local multi = require("multi") -- so things don't break, but also allows bi-directional connections to work
- conn.queueCall = sThread.waitFor(self.name.."THREADED_CALLQ"):init()
- end
+ conn.obj = multi:newConnection(self.protect)
setmetatable(conn,{__call=function(self,...) return self:connect(...) end})
- conn.obj=multi:newConnection(self.protect)
function conn:connect(func)
return self.obj(func)
end
- function conn:fConnect(func)
- return self.obj:fConnect(func)
- end
function conn:holdUT(n)
self.obj:holdUT(n)
end
- function conn:Bind(t)
- self.obj:Bind(t)
- end
function conn:Remove()
self.obj:Remove()
end
- function conn:getConnection(name,ingore)
- return self.obj:getConnection(name,ingore)
- end
function conn:Fire(...)
- local args = {...}
- table.insert(args,1,multi.randomString(8))
- table.insert(args,1,self.name)
- table.insert(args,1,"F")
- self.queueCall:push(args)
- if self.trigger_self then
- self.obj:Fire(...)
+ local args = {multi.randomString(8),...}
+ for i = 1, conns do
+ qF:push(args)
end
end
- self.cleanup = .01
- function conn:SetCleanUpRate(n)
- self.cleanup=n or .01
- end
- conn.lastid=""
- conn.looper = multi:newLoop(function(self)
- local con = self.link
- local data = con.queueCall:peek()
- if not data then return end
- local id = data[3]
- if data[1]=="F" and data[2]==con.name and con.lastid~=id then
- con.lastid=id
- table.remove(data,1)-- Remove the first 3 elements
- table.remove(data,1)-- Remove the first 3 elements
- table.remove(data,1)-- Remove the first 3 elements
- con.obj:Fire(unpack(data))
- local alarm = multi:newAlarm(con.cleanup)
- alarm:OnRing(function()
- alarm:Destroy()
- local dat = con.queueCall:peek()
- if not dat then return end
- table.remove(data,1)-- Remove the first 3 elements
- table.remove(data,1)-- Remove the first 3 elements
- table.remove(data,1)-- Remove the first 3 elements
- if dat[3]==id then
- con.queueCall:pop()
- end
- end)
+ local lastID = ""
+ local lastCount = 0
+ multi:newThread("syncer",function()
+ while true do
+ thread.skip(1)
+ local fire = qF:peek()
+ local count = qS:peek()
+ if fire and fire[1]~=lastID then
+ lastID = fire[1]
+ qF:pop()
+ table.remove(fire,1)
+ conn.obj:Fire(unpack(fire))
+ end
+ if count and count[1]~=lastCount then
+ conns = count[2]
+ lastCount = count[1]
+ qs:pop()
+ end
end
end)
- conn.HoldUT=conn.holdUT
- conn.looper.link=conn
- conn.Connect=conn.connect
- conn.FConnect=conn.fConnect
- conn.GetConnection=conn.getConnection
return conn
end
+ multi:newThread("connSync",function()
+ while true do
+ thread.skip(1)
+ local syncIN = qsm:pop()
+ if syncIN then
+ if syncIN=="OK" then
+ c.count = c.count + 1
+ else
+ c.count = c.count - 1
+ end
+ local rand = math.random(1,1000000)
+ for i = 1, c.count do
+ qs:push({rand,c.count})
+ end
+ end
+ end
+ end)
GLOBAL[name]=c
return c
end
-function multi:systemThreadedBenchmark(n,p)
+function multi:systemThreadedBenchmark(n)
n=n or 1
local cores=multi.integration.THREAD.getCores()
- local queue=multi:newSystemThreadedQueue("QUEUE")
- multi.integration.GLOBAL["__SYSTEMBENCHMARK__"]=n
+ local queue=multi:newSystemThreadedQueue("THREAD_BENCH_QUEUE"):init()
local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL
+ local c = {}
for i=1,cores do
- multi:newSystemThread("STHREAD_BENCH",function()
+ multi:newSystemThread("STHREAD_BENCH",function(n)
local multi = require("multi")
if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL
sThread=_G.sThread
end -- we cannot have upvalues... in love2d globals, not locals must be used
- queue=sThread.waitFor("QUEUE"):init() -- always wait for when looking for a variable at the start of the thread!
- multi:benchMark(sThread.waitFor("__SYSTEMBENCHMARK__")):OnBench(function(self,count)
+ queue=sThread.waitFor("THREAD_BENCH_QUEUE"):init() -- always wait for when looking for a variable at the start of the thread!
+ multi:benchMark(n):OnBench(function(self,count)
queue:push(count)
- multi:Stop()
+ sThread.kill()
end)
multi:mainloop()
- end)
- end
- local c={}
- c.tt=function() end
- c.p=p
- function c:OnBench(func)
- self.tt=func
+ end,n)
end
multi:newThread("THREAD_BENCH",function()
- thread.sleep(n+.1)
- GLOBAL["QUEUE"]=nil -- time to clean up
- local num=0
- data=queue:pop()
- while data do
- num=num+data
- data=queue:pop()
+ local count = 0
+ local cc = 0
+ while true do
+ thread.skip(1)
+ local dat = queue:pop()
+ if dat then
+ cc=cc+1
+ count = count + dat
+ if cc == cores then
+ c.OnBench:Fire(count)
+ thread.kill()
+ end
+ end
end
- if p then
- print(tostring(p)..num)
- end
- c.tt(c,num)
end)
+ c.OnBench = multi:newConnection()
return c
end
function multi:newSystemThreadedConsole(name)
@@ -263,7 +246,7 @@ function multi:newSystemThreadedConsole(name)
sThread=_G.sThread
end
local cc={}
- if isMainThread then
+ if multi.isMainThread then
if GLOBAL["__SYSTEM_CONSLOE__"] then
cc.stream = sThread.waitFor("__SYSTEM_CONSLOE__"):init()
else
@@ -304,14 +287,17 @@ function multi:newSystemThreadedTable(name)
local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL
function c:init() -- create an init function so we can mimic on both love2d and lanes
+ local multi = require("multi")
if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL
sThread=_G.sThread
end
local cc={}
cc.tab={}
- if isMainThread then
- cc.conn = multi:newSystemThreadedConnection(self.name.."_Tabled_Connection"):init()
+ if multi.isMainThread then
+ if not GLOBAL[self.name.."_Tabled_Connection"] then
+ cc.conn = multi:newSystemThreadedConnection(self.name.."_Tabled_Connection"):init()
+ end
else
cc.conn = sThread.waitFor(self.name.."_Tabled_Connection"):init()
end
@@ -330,125 +316,112 @@ function multi:newSystemThreadedTable(name)
__newindex=function(t,k,v)
t.tab[k]=v
t.conn:Fire(k,v)
- end,
+ end
})
return cc
end
GLOBAL[c.name]=c
return c
end
-function multi:newSystemThreadedJobQueue(numOfCores)
- local c={}
+local jobqueuecount = 0
+function multi:newSystemThreadedJobQueue(a,b)
+ jobqueuecount=jobqueuecount+1
+ local GLOBAL=multi.integration.GLOBAL
+ local sThread=multi.integration.THREAD
+ local c = {}
+ c.numberofcores = 4
+ c.name = "SYSTEM_THREADED_JOBQUEUE_"..jobqueuecount
+ -- This is done to keep backwards compatability for older code
+ if type(a)=="string" and not(b) then
+ c.name = a
+ elseif type(a)=="number" and not (b) then
+ c.numberofcores = a
+ elseif type(a)=="string" and type(b)=="number" then
+ c.name = a
+ c.numberofcores = b
+ elseif type(a)=="number" and type(b)=="string" then
+ c.name = b
+ c.numberofcores = a
+ end
+ c.isReady = false
c.jobnum=1
- c.cores=numOfCores or multi.integration.THREAD.getCores()
- c.queueIN=multi:newSystemThreadedQueue("THREADED_JQ"):init()
- c.queueOUT=multi:newSystemThreadedQueue("THREADED_JQO"):init()
- c.queueALL=multi:newSystemThreadedQueue("THREADED_QALL"):init()
- c.REG=multi:newSystemThreadedQueue("THREADED_JQ_F_REG"):init()
- c.OnReady=multi:newConnection()
+ c.OnJobCompleted = multi:newConnection()
+ local queueIN = self:newSystemThreadedQueue("QUEUE_IN_"..c.name):init()
+ local queueCC = self:newSystemThreadedQueue("QUEUE_CC_"..c.name):init()
+ local queueREG = self:newSystemThreadedQueue("QUEUE_REG_"..c.name):init()
+ local queueJD = self:newSystemThreadedQueue("QUEUE_JD_"..c.name):init()
+ local queueDA = self:newSystemThreadedQueue("QUEUE_DA_"..c.name):init()
+ c.OnReady = multi:newConnection()
function c:registerJob(name,func)
- for i=1,self.cores do
- self.REG:push({name,func})
+ for i = 1, self.numberofcores do
+ queueREG:push({name,func})
end
end
+ c.tempQueue = {}
function c:pushJob(name,...)
- self.queueOUT:push({self.jobnum,name,...})
- self.jobnum=self.jobnum+1
- return self.jobnum-1
- end
- local GLOBAL=multi.integration.GLOBAL -- set up locals in case we are using lanes
- local sThread=multi.integration.THREAD -- set up locals in case we are using lanes
- function c:doToAll(func)
- local TaskName=multi.randomString(16)
- for i=1,self.cores do
- self.queueALL:push({TaskName,func})
+ if not self.isReady then
+ table.insert(c.tempQueue,{self.jobnum,name,...})
+ self.jobnum=self.jobnum+1
+ return self.jobnum-1
+ else
+ queueIN:push{self.jobnum,name,...}
+ self.jobnum=self.jobnum+1
+ return self.jobnum-1
end
end
- function c:start()
- multi:newEvent(function()
- return self.ThreadsLoaded==true
- end):OnEvent(function(evnt)
- GLOBAL["THREADED_JQ"]=nil -- remove it
- GLOBAL["THREADED_JQO"]=nil -- remove it
- GLOBAL["THREADED_JQ_F_REG"]=nil -- remove it
- self:doToAll(function()
- _G["__started__"]=true
- SFunc()
- end)
- evnt:Destroy()
- end)
+ function c:doToAll(func)
+ for i = 1, self.numberofcores do
+ queueDA:push{multi.randomString(12),func}
+ end
end
- GLOBAL["__JQ_COUNT__"]=c.cores
- for i=1,c.cores do
- multi:newSystemThread("System Threaded Job Queue Worker Thread #"..i,function(name,ind)
+ for i=1,c.numberofcores do
+ multi:newSystemThread(c.name.." Worker Thread #"..i,function(name)
local multi = require("multi")
- ThreadName=name
- __sleep__=.001
if love then -- lets make sure we don't reference up-values if using love2d
GLOBAL=_G.GLOBAL
sThread=_G.sThread
- __sleep__=.1
end
- JQI=sThread.waitFor("THREADED_JQO"):init() -- Grab it
- JQO=sThread.waitFor("THREADED_JQ"):init() -- Grab it
- REG=sThread.waitFor("THREADED_JQ_F_REG"):init() -- Grab it
- QALL=sThread.waitFor("THREADED_QALL"):init() -- Grab it
- QALLT={}
- FUNCS={}
- SFunc=multi:newFunction(function(self)
- MainLoop:Pause()
- multi:newAlarm(.1):OnRing(function(alarm)
- alarm:Destroy()
- MainLoop:Resume()
- end)
- end)
+ local CC = sThread.waitFor("QUEUE_CC_"..name):init()
+ CC:push("ready")
+ local FUNCS={}
+ local ids = {}
+ local JQI = sThread.waitFor("QUEUE_IN_"..name):init()
+ local JD = sThread.waitFor("QUEUE_JD_"..name):init()
+ local REG = sThread.waitFor("QUEUE_REG_"..name):init()
+ local DA = sThread.waitFor("QUEUE_DA_"..name):init()
+ local lastjob = os.clock()
multi:newLoop(function()
+ local job=JQI:pop()
local rd=REG:peek()
+ local da=DA:peek()
if rd then
if not FUNCS[rd[1]] then
FUNCS[rd[1]]=rd[2]
- rd=nil -- lets clean up
+ rd=nil
REG:pop()
end
end
- local d=QALL:peek()
- if d then
- if not QALLT[d[1]] then
- QALLT[d[1]]=true
- d[2]()
- d=nil -- lets clean up
- QALL:pop()
+ if da then
+ if not ids[da[1]] then
+ local meh = da[1]
+ ids[da[1]]=true
+ da[2](multi)
+ da=nil
+ DA:pop()
+ multi:newAlarm(60):OnRing(function(a)
+ ids[meh] = nil
+ a:Destroy()
+ end)
end
end
- end)
- setmetatable(_G,{
- __index=function(t,k)
- return FUNCS[k]
- end
- })
- lastjob=os.clock()
- MainLoop=multi:newLoop(function(self)
- if __started__ then
- local job=JQI:pop()
- if job then
- lastjob=os.clock()
- local d=QALL:peek()
- if d then
- if not QALLT[d[1]] then
- QALLT[d[1]]=true
- d[2]()
- d=nil -- lets clean up
- QALL:pop()
- end
- end
- local ID=table.remove(job,1) -- return and remove
- local name=table.remove(job,1) -- return and remove
- if FUNCS[name] then
- JQO:push({ID,FUNCS[name](unpack(job))})
- else
- self:hold(function() return FUNCS[name] end)
- JQO:push({ID,FUNCS[name](unpack(job))})
- end
+ if job then
+ lastjob = os.clock()
+ local ID=table.remove(job,1) -- return and remove
+ local _name=table.remove(job,1) -- return and remove
+ if FUNCS[_name] then
+ JD:push({ID,FUNCS[_name](unpack(job))})
+ else -- making use of that new holding feature
+ JD:push({ID,FUNCS:waitFor(_name)(unpack(job))})
end
end
end)
@@ -460,33 +433,40 @@ function multi:newSystemThreadedJobQueue(numOfCores)
thread.sleep(.001)
end
end)
- JQO:push({"_THREADINIT_"})
+ setmetatable(_G,{
+ __index=function(t,k)
+ return FUNCS[k]
+ end
+ })
if not love then
multi:mainloop()
end
- end,"Thread<"..i..">",i)
+ end,c.name)
end
- c.OnJobCompleted=multi:newConnection()
- c.threadsResponded = 0
- c.updater=multi:newLoop(function(self)
- local data=self.link.queueIN:pop()
- while data do
- if data then
- local a=unpack(data)
- if a=="_THREADINIT_" then
- self.link.threadsResponded=self.link.threadsResponded+1
- if self.link.threadsResponded==self.link.cores then
- self.link.ThreadsLoaded=true
- self.link.OnReady:Fire()
- end
- else
- self.link.OnJobCompleted:Fire(unpack(data))
- end
+ multi:newThread("counter",function()
+ print("thread started")
+ local _count = 0
+ while _count",...)
+end)
+
+--~ jq = multi:newSystemThreadedJobQueue()
+--~ jq:registerJob("test",function(a)
+--~ return "Hello",a
+--~ end)
+--~ jq.OnJobCompleted(function(ID,...)
+--~ print(ID,...)
+--~ end)
+--~ for i=1,16 do
+--~ jq:pushJob("test",5)
+--~ end
+multi:mainloop()