12.2.1 is out

Fixed some systemthreaded objects. More tests are needed though.

About to start making real documentation soon!
This commit is contained in:
Ryan Ward 2018-09-17 11:33:23 -04:00
parent eebd942555
commit 7eb9cd37e9
10 changed files with 340 additions and 301 deletions

4
Documentation.md Normal file
View File

@ -0,0 +1,4 @@
<details>
<summary>Q1: What is the best Language in the World? </summary>
A1: Lua
</details>

View File

@ -1,5 +1,41 @@
#Changes #Changes
[TOC] [TOC]
Update 12.2.1 Time for some bug fixes!
-------------
Fixed: SystemThreadedJobQueues
- You can now make as many job queues as you want! Just a warning when using a large amount of cores for the queue it takes a second or 2 to set up the jobqueues for data transfer. I am unsure if this is a lanes thing or not, but love2d has no such delay when setting up the jobqueue!
- You now connect to the OnReady in the jobqueue object. No more holding everything else as you wait for a job queue to be ready
- Jobqueues:doToAll now passes the queues multi interface as the first and currently only argument
- No longer need to use jobqueue.OnReady() The code is smarter and will send the pushed jobs automatically when the threads are ready
Fixed: SystemThreadedConnection
- They work the exact same way as before, but actually work as expected now. The issue before was how i implemented it. Now each connection knows the number of instances of that object that ecist. This way I no longer have to do fancy timings that may or may not work. I can send exactly enough info for each connection to consume from the queue.
Removed: multi:newQueuer
- This feature has no real use after corutine based threads were introduced. You can use those to get the same effect as the queuer and do it better too.
Going forward:
- Will I ever finish steralization? Who knows, but being able to save state would be nice. The main issue is there is no simple way to save state. While I can provide methods to allow one to turn the objects into strings and back, there is no way for me to make your code work with it in a simple way. For now only the basic functions will be here.
- I need to make better documentation for this library as well. In its current state, all I have are examples and not a list of what is what.
# Example
```lua
package.path="?/init.lua;?.lua;"..package.path
multi = require("multi")
GLOBAL, THREAD = require("multi.integration.lanesManager").init()
jq = multi:newSystemThreadedJobQueue()
jq:registerJob("test",function(a)
return "Hello",a
end)
jq.OnJobCompleted(function(ID,...)
print(ID,...)
end)
for i=1,16 do
jq:pushJob("test",5)
end
multi:mainloop()
```
Update 12.2.0 Update 12.2.0
------------- -------------
**Added:** **Added:**

View File

@ -1,3 +1,26 @@
--[[
MIT License
Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
local multi = require("multi") local multi = require("multi")
os.sleep=love.timer.sleep os.sleep=love.timer.sleep
multi.drawF={} multi.drawF={}

View File

@ -1,7 +1,7 @@
--[[ --[[
MIT License MIT License
Copyright (c) 2017 Ryan Ward Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
@ -24,8 +24,8 @@ SOFTWARE.
local bin = pcall(require,"bin") local bin = pcall(require,"bin")
local multi = {} local multi = {}
local clock = os.clock local clock = os.clock
multi.Version = "12.2.0" multi.Version = "12.2.1"
multi._VERSION = "12.2.0" multi._VERSION = "12.2.1"
multi.stage = "stable" multi.stage = "stable"
multi.__index = multi multi.__index = multi
multi.Mainloop = {} multi.Mainloop = {}
@ -522,40 +522,6 @@ function multi:newProcess(file)
--~ c:IngoreObject() --~ c:IngoreObject()
return c return c
end end
function multi:newQueuer(file)
local c=self:newProcess()
c.Type='queue'
c.last={}
c.funcE={}
function c:OnQueueCompleted(func)
table.insert(self.funcE,func)
return self
end
if file then
self.Cself=c
loadstring('local queue=multi.Cself '..io.open(file,'rb'):read('*all'))()
end
self:create(c)
multi.OnObjectCreated(function(self)
if self.Parent then
if self.Parent.Type=="queue" then
if self:isAnActor() then
if self.Type=="alarm" then
self.Active=false
end
self:Pause()
self:connectFinal(multi.queuefinal)
end
end
end
end)
function c:Start()
self.Mainloop[#self.Mainloop]:Resume()
self.l:Resume()
return self
end
return c
end
function multi:newTimer() function multi:newTimer()
local c={} local c={}
c.Type='timer' c.Type='timer'
@ -1078,7 +1044,10 @@ function multi:uManager(settings)
if settings.stopOnError then if settings.stopOnError then
stopOnError = settings.stopOnError stopOnError = settings.stopOnError
end end
multi.defaultSettings.p_i = settings.auto_stretch*self.Priority_Idle or self.Priority_Idle multi.defaultSettings.p_i = self.Priority_Idle
if settings.auto_stretch then
multi.defaultSettings.p_i = settings.auto_stretch*self.Priority_Idle
end
multi.defaultSettings.delay = settings.auto_delay or 3 multi.defaultSettings.delay = settings.auto_delay or 3
multi.defaultSettings.auto_lowerbound = settings.auto_lowerbound or self.Priority_Idle multi.defaultSettings.auto_lowerbound = settings.auto_lowerbound or self.Priority_Idle
protect = settings.protect protect = settings.protect

View File

@ -1,12 +1,12 @@
--[[ --[[
MIT License MIT License
Copyright (c) 2017 Ryan Ward Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: furnished to do so, subject to the following conditions:
@ -31,9 +31,8 @@ function os.getOS()
end end
-- Step 1 get lanes -- Step 1 get lanes
lanes=require("lanes").configure() lanes=require("lanes").configure()
--~ package.path="lua/?/init.lua;lua/?.lua;"..package.path
local multi = require("multi") -- get it all and have it on all lanes local multi = require("multi") -- get it all and have it on all lanes
isMainThread=true multi.isMainThread=true
function multi:canSystemThread() function multi:canSystemThread()
return true return true
end end
@ -108,8 +107,10 @@ function THREAD.hold(n)
end end
repeat wait() until n() repeat wait() until n()
end end
local rand = math.random(1,10000000)
-- Step 5 Basic Threads! -- Step 5 Basic Threads!
function multi:newSystemThread(name,func,...) function multi:newSystemThread(name,func,...)
rand = math.random(1,10000000)
local c={} local c={}
local __self=c local __self=c
c.name=name c.name=name
@ -117,6 +118,7 @@ function multi:newSystemThread(name,func,...)
local THREAD_NAME=name local THREAD_NAME=name
local function func2(...) local function func2(...)
_G["THREAD_NAME"]=THREAD_NAME _G["THREAD_NAME"]=THREAD_NAME
math.randomseed(rand)
func(...) func(...)
end end
c.thread=lanes.gen("*", func2)(...) c.thread=lanes.gen("*", func2)(...)
@ -130,8 +132,7 @@ function multi:newSystemThread(name,func,...)
c.status:OnUpdate(function(self) c.status:OnUpdate(function(self)
local v,err,t=self.link.thread:join(.001) local v,err,t=self.link.thread:join(.001)
if err then if err then
multi.OnError:Fire(self.link,err) multi.OnError:Fire(self.link,err,"Error in systemThread: '"..self.link.name.."' <"..err..">")
print("Error in systemThread: '"..self.link.name.."' <"..err..">")
self:Destroy() self:Destroy()
end end
end) end)

View File

@ -1,3 +1,26 @@
--[[
MIT License
Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
local multi = require("multi.compat.love2d") local multi = require("multi.compat.love2d")
function multi:canSystemThread() function multi:canSystemThread()
return true return true

View File

@ -1,12 +1,12 @@
--[[ --[[
MIT License MIT License
Copyright (c) 2017 Ryan Ward Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: furnished to do so, subject to the following conditions:

View File

@ -1,5 +1,26 @@
-- CURRENT TASK: --[[
MIT License
Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
local multi = require("multi") local multi = require("multi")
local net = require("net") local net = require("net")
require("bin") require("bin")

View File

@ -1,12 +1,12 @@
--[[ --[[
MIT License MIT License
Copyright (c) 2017 Ryan Ward Copyright (c) 2018 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: furnished to do so, subject to the following conditions:
@ -114,141 +114,124 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann
end end
function multi:newSystemThreadedConnection(name,protect) function multi:newSystemThreadedConnection(name,protect)
local c={} local c={}
c.name = name
c.protect=protect
local sThread=multi.integration.THREAD local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL local GLOBAL=multi.integration.GLOBAL
c.name = name or error("You must supply a name for this object!")
c.protect = protect or false
c.count = 0
multi:newSystemThreadedQueue(name.."THREADED_CALLFIRE"):init()
local qsm = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNCM"):init()
local qs = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNC"):init()
function c:init() function c:init()
local multi = require("multi") local multi = require("multi")
if multi:getPlatform()=="love2d" then if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL GLOBAL=_G.GLOBAL
sThread=_G.sThread sThread=_G.sThread
end end
local conns = 0
local qF = sThread.waitFor(self.name.."THREADED_CALLFIRE"):init()
local qSM = sThread.waitFor(self.name.."THREADED_CALLSYNCM"):init()
local qS = sThread.waitFor(self.name.."THREADED_CALLSYNC"):init()
qSM:push("OK")
local conn = {} local conn = {}
conn.name = self.name conn.obj = multi:newConnection(self.protect)
conn.count = 0
if isMainThread then
if GLOBAL[self.name.."THREADED_CONNQ"] then -- if this thing exists then lets grab it, we are doing something different here. instead of cleaning things up, we will gave a dedicated queue to manage things
conn.queueCall = sThread.waitFor(self.name.."THREADED_CALLQ"):init()
else
conn.queueCall = multi:newSystemThreadedQueue(self.name.."THREADED_CALLQ"):init()
end
else
local multi = require("multi") -- so things don't break, but also allows bi-directional connections to work
conn.queueCall = sThread.waitFor(self.name.."THREADED_CALLQ"):init()
end
setmetatable(conn,{__call=function(self,...) return self:connect(...) end}) setmetatable(conn,{__call=function(self,...) return self:connect(...) end})
conn.obj=multi:newConnection(self.protect)
function conn:connect(func) function conn:connect(func)
return self.obj(func) return self.obj(func)
end end
function conn:fConnect(func)
return self.obj:fConnect(func)
end
function conn:holdUT(n) function conn:holdUT(n)
self.obj:holdUT(n) self.obj:holdUT(n)
end end
function conn:Bind(t)
self.obj:Bind(t)
end
function conn:Remove() function conn:Remove()
self.obj:Remove() self.obj:Remove()
end end
function conn:getConnection(name,ingore)
return self.obj:getConnection(name,ingore)
end
function conn:Fire(...) function conn:Fire(...)
local args = {...} local args = {multi.randomString(8),...}
table.insert(args,1,multi.randomString(8)) for i = 1, conns do
table.insert(args,1,self.name) qF:push(args)
table.insert(args,1,"F")
self.queueCall:push(args)
if self.trigger_self then
self.obj:Fire(...)
end end
end end
self.cleanup = .01 local lastID = ""
function conn:SetCleanUpRate(n) local lastCount = 0
self.cleanup=n or .01 multi:newThread("syncer",function()
while true do
thread.skip(1)
local fire = qF:peek()
local count = qS:peek()
if fire and fire[1]~=lastID then
lastID = fire[1]
qF:pop()
table.remove(fire,1)
conn.obj:Fire(unpack(fire))
end
if count and count[1]~=lastCount then
conns = count[2]
lastCount = count[1]
qs:pop()
end end
conn.lastid=""
conn.looper = multi:newLoop(function(self)
local con = self.link
local data = con.queueCall:peek()
if not data then return end
local id = data[3]
if data[1]=="F" and data[2]==con.name and con.lastid~=id then
con.lastid=id
table.remove(data,1)-- Remove the first 3 elements
table.remove(data,1)-- Remove the first 3 elements
table.remove(data,1)-- Remove the first 3 elements
con.obj:Fire(unpack(data))
local alarm = multi:newAlarm(con.cleanup)
alarm:OnRing(function()
alarm:Destroy()
local dat = con.queueCall:peek()
if not dat then return end
table.remove(data,1)-- Remove the first 3 elements
table.remove(data,1)-- Remove the first 3 elements
table.remove(data,1)-- Remove the first 3 elements
if dat[3]==id then
con.queueCall:pop()
end end
end) end)
end
end)
conn.HoldUT=conn.holdUT
conn.looper.link=conn
conn.Connect=conn.connect
conn.FConnect=conn.fConnect
conn.GetConnection=conn.getConnection
return conn return conn
end end
multi:newThread("connSync",function()
while true do
thread.skip(1)
local syncIN = qsm:pop()
if syncIN then
if syncIN=="OK" then
c.count = c.count + 1
else
c.count = c.count - 1
end
local rand = math.random(1,1000000)
for i = 1, c.count do
qs:push({rand,c.count})
end
end
end
end)
GLOBAL[name]=c GLOBAL[name]=c
return c return c
end end
function multi:systemThreadedBenchmark(n,p) function multi:systemThreadedBenchmark(n)
n=n or 1 n=n or 1
local cores=multi.integration.THREAD.getCores() local cores=multi.integration.THREAD.getCores()
local queue=multi:newSystemThreadedQueue("QUEUE") local queue=multi:newSystemThreadedQueue("THREAD_BENCH_QUEUE"):init()
multi.integration.GLOBAL["__SYSTEMBENCHMARK__"]=n
local sThread=multi.integration.THREAD local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL local GLOBAL=multi.integration.GLOBAL
local c = {}
for i=1,cores do for i=1,cores do
multi:newSystemThread("STHREAD_BENCH",function() multi:newSystemThread("STHREAD_BENCH",function(n)
local multi = require("multi") local multi = require("multi")
if multi:getPlatform()=="love2d" then if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL GLOBAL=_G.GLOBAL
sThread=_G.sThread sThread=_G.sThread
end -- we cannot have upvalues... in love2d globals, not locals must be used end -- we cannot have upvalues... in love2d globals, not locals must be used
queue=sThread.waitFor("QUEUE"):init() -- always wait for when looking for a variable at the start of the thread! queue=sThread.waitFor("THREAD_BENCH_QUEUE"):init() -- always wait for when looking for a variable at the start of the thread!
multi:benchMark(sThread.waitFor("__SYSTEMBENCHMARK__")):OnBench(function(self,count) multi:benchMark(n):OnBench(function(self,count)
queue:push(count) queue:push(count)
multi:Stop() sThread.kill()
end) end)
multi:mainloop() multi:mainloop()
end) end,n)
end
local c={}
c.tt=function() end
c.p=p
function c:OnBench(func)
self.tt=func
end end
multi:newThread("THREAD_BENCH",function() multi:newThread("THREAD_BENCH",function()
thread.sleep(n+.1) local count = 0
GLOBAL["QUEUE"]=nil -- time to clean up local cc = 0
local num=0 while true do
data=queue:pop() thread.skip(1)
while data do local dat = queue:pop()
num=num+data if dat then
data=queue:pop() cc=cc+1
count = count + dat
if cc == cores then
c.OnBench:Fire(count)
thread.kill()
end
end end
if p then
print(tostring(p)..num)
end end
c.tt(c,num)
end) end)
c.OnBench = multi:newConnection()
return c return c
end end
function multi:newSystemThreadedConsole(name) function multi:newSystemThreadedConsole(name)
@ -263,7 +246,7 @@ function multi:newSystemThreadedConsole(name)
sThread=_G.sThread sThread=_G.sThread
end end
local cc={} local cc={}
if isMainThread then if multi.isMainThread then
if GLOBAL["__SYSTEM_CONSLOE__"] then if GLOBAL["__SYSTEM_CONSLOE__"] then
cc.stream = sThread.waitFor("__SYSTEM_CONSLOE__"):init() cc.stream = sThread.waitFor("__SYSTEM_CONSLOE__"):init()
else else
@ -304,14 +287,17 @@ function multi:newSystemThreadedTable(name)
local sThread=multi.integration.THREAD local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL local GLOBAL=multi.integration.GLOBAL
function c:init() -- create an init function so we can mimic on both love2d and lanes function c:init() -- create an init function so we can mimic on both love2d and lanes
local multi = require("multi")
if multi:getPlatform()=="love2d" then if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL GLOBAL=_G.GLOBAL
sThread=_G.sThread sThread=_G.sThread
end end
local cc={} local cc={}
cc.tab={} cc.tab={}
if isMainThread then if multi.isMainThread then
if not GLOBAL[self.name.."_Tabled_Connection"] then
cc.conn = multi:newSystemThreadedConnection(self.name.."_Tabled_Connection"):init() cc.conn = multi:newSystemThreadedConnection(self.name.."_Tabled_Connection"):init()
end
else else
cc.conn = sThread.waitFor(self.name.."_Tabled_Connection"):init() cc.conn = sThread.waitFor(self.name.."_Tabled_Connection"):init()
end end
@ -330,125 +316,112 @@ function multi:newSystemThreadedTable(name)
__newindex=function(t,k,v) __newindex=function(t,k,v)
t.tab[k]=v t.tab[k]=v
t.conn:Fire(k,v) t.conn:Fire(k,v)
end, end
}) })
return cc return cc
end end
GLOBAL[c.name]=c GLOBAL[c.name]=c
return c return c
end end
function multi:newSystemThreadedJobQueue(numOfCores) local jobqueuecount = 0
local c={} function multi:newSystemThreadedJobQueue(a,b)
jobqueuecount=jobqueuecount+1
local GLOBAL=multi.integration.GLOBAL
local sThread=multi.integration.THREAD
local c = {}
c.numberofcores = 4
c.name = "SYSTEM_THREADED_JOBQUEUE_"..jobqueuecount
-- This is done to keep backwards compatability for older code
if type(a)=="string" and not(b) then
c.name = a
elseif type(a)=="number" and not (b) then
c.numberofcores = a
elseif type(a)=="string" and type(b)=="number" then
c.name = a
c.numberofcores = b
elseif type(a)=="number" and type(b)=="string" then
c.name = b
c.numberofcores = a
end
c.isReady = false
c.jobnum=1 c.jobnum=1
c.cores=numOfCores or multi.integration.THREAD.getCores() c.OnJobCompleted = multi:newConnection()
c.queueIN=multi:newSystemThreadedQueue("THREADED_JQ"):init() local queueIN = self:newSystemThreadedQueue("QUEUE_IN_"..c.name):init()
c.queueOUT=multi:newSystemThreadedQueue("THREADED_JQO"):init() local queueCC = self:newSystemThreadedQueue("QUEUE_CC_"..c.name):init()
c.queueALL=multi:newSystemThreadedQueue("THREADED_QALL"):init() local queueREG = self:newSystemThreadedQueue("QUEUE_REG_"..c.name):init()
c.REG=multi:newSystemThreadedQueue("THREADED_JQ_F_REG"):init() local queueJD = self:newSystemThreadedQueue("QUEUE_JD_"..c.name):init()
c.OnReady=multi:newConnection() local queueDA = self:newSystemThreadedQueue("QUEUE_DA_"..c.name):init()
c.OnReady = multi:newConnection()
function c:registerJob(name,func) function c:registerJob(name,func)
for i=1,self.cores do for i = 1, self.numberofcores do
self.REG:push({name,func}) queueREG:push({name,func})
end end
end end
c.tempQueue = {}
function c:pushJob(name,...) function c:pushJob(name,...)
self.queueOUT:push({self.jobnum,name,...}) if not self.isReady then
table.insert(c.tempQueue,{self.jobnum,name,...})
self.jobnum=self.jobnum+1
return self.jobnum-1
else
queueIN:push{self.jobnum,name,...}
self.jobnum=self.jobnum+1 self.jobnum=self.jobnum+1
return self.jobnum-1 return self.jobnum-1
end end
local GLOBAL=multi.integration.GLOBAL -- set up locals in case we are using lanes end
local sThread=multi.integration.THREAD -- set up locals in case we are using lanes
function c:doToAll(func) function c:doToAll(func)
local TaskName=multi.randomString(16) for i = 1, self.numberofcores do
for i=1,self.cores do queueDA:push{multi.randomString(12),func}
self.queueALL:push({TaskName,func})
end end
end end
function c:start() for i=1,c.numberofcores do
multi:newEvent(function() multi:newSystemThread(c.name.." Worker Thread #"..i,function(name)
return self.ThreadsLoaded==true
end):OnEvent(function(evnt)
GLOBAL["THREADED_JQ"]=nil -- remove it
GLOBAL["THREADED_JQO"]=nil -- remove it
GLOBAL["THREADED_JQ_F_REG"]=nil -- remove it
self:doToAll(function()
_G["__started__"]=true
SFunc()
end)
evnt:Destroy()
end)
end
GLOBAL["__JQ_COUNT__"]=c.cores
for i=1,c.cores do
multi:newSystemThread("System Threaded Job Queue Worker Thread #"..i,function(name,ind)
local multi = require("multi") local multi = require("multi")
ThreadName=name
__sleep__=.001
if love then -- lets make sure we don't reference up-values if using love2d if love then -- lets make sure we don't reference up-values if using love2d
GLOBAL=_G.GLOBAL GLOBAL=_G.GLOBAL
sThread=_G.sThread sThread=_G.sThread
__sleep__=.1
end end
JQI=sThread.waitFor("THREADED_JQO"):init() -- Grab it local CC = sThread.waitFor("QUEUE_CC_"..name):init()
JQO=sThread.waitFor("THREADED_JQ"):init() -- Grab it CC:push("ready")
REG=sThread.waitFor("THREADED_JQ_F_REG"):init() -- Grab it local FUNCS={}
QALL=sThread.waitFor("THREADED_QALL"):init() -- Grab it local ids = {}
QALLT={} local JQI = sThread.waitFor("QUEUE_IN_"..name):init()
FUNCS={} local JD = sThread.waitFor("QUEUE_JD_"..name):init()
SFunc=multi:newFunction(function(self) local REG = sThread.waitFor("QUEUE_REG_"..name):init()
MainLoop:Pause() local DA = sThread.waitFor("QUEUE_DA_"..name):init()
multi:newAlarm(.1):OnRing(function(alarm) local lastjob = os.clock()
alarm:Destroy()
MainLoop:Resume()
end)
end)
multi:newLoop(function() multi:newLoop(function()
local job=JQI:pop()
local rd=REG:peek() local rd=REG:peek()
local da=DA:peek()
if rd then if rd then
if not FUNCS[rd[1]] then if not FUNCS[rd[1]] then
FUNCS[rd[1]]=rd[2] FUNCS[rd[1]]=rd[2]
rd=nil -- lets clean up rd=nil
REG:pop() REG:pop()
end end
end end
local d=QALL:peek() if da then
if d then if not ids[da[1]] then
if not QALLT[d[1]] then local meh = da[1]
QALLT[d[1]]=true ids[da[1]]=true
d[2]() da[2](multi)
d=nil -- lets clean up da=nil
QALL:pop() DA:pop()
end multi:newAlarm(60):OnRing(function(a)
end ids[meh] = nil
a:Destroy()
end) end)
setmetatable(_G,{
__index=function(t,k)
return FUNCS[k]
end end
}) end
lastjob=os.clock()
MainLoop=multi:newLoop(function(self)
if __started__ then
local job=JQI:pop()
if job then if job then
lastjob=os.clock() lastjob = os.clock()
local d=QALL:peek()
if d then
if not QALLT[d[1]] then
QALLT[d[1]]=true
d[2]()
d=nil -- lets clean up
QALL:pop()
end
end
local ID=table.remove(job,1) -- return and remove local ID=table.remove(job,1) -- return and remove
local name=table.remove(job,1) -- return and remove local _name=table.remove(job,1) -- return and remove
if FUNCS[name] then if FUNCS[_name] then
JQO:push({ID,FUNCS[name](unpack(job))}) JD:push({ID,FUNCS[_name](unpack(job))})
else else -- making use of that new holding feature
self:hold(function() return FUNCS[name] end) JD:push({ID,FUNCS:waitFor(_name)(unpack(job))})
JQO:push({ID,FUNCS[name](unpack(job))})
end
end end
end end
end) end)
@ -460,33 +433,40 @@ function multi:newSystemThreadedJobQueue(numOfCores)
thread.sleep(.001) thread.sleep(.001)
end end
end) end)
JQO:push({"_THREADINIT_"}) setmetatable(_G,{
__index=function(t,k)
return FUNCS[k]
end
})
if not love then if not love then
multi:mainloop() multi:mainloop()
end end
end,"Thread<"..i..">",i) end,c.name)
end end
c.OnJobCompleted=multi:newConnection() multi:newThread("counter",function()
c.threadsResponded = 0 print("thread started")
c.updater=multi:newLoop(function(self) local _count = 0
local data=self.link.queueIN:pop() while _count<c.numberofcores do
while data do thread.skip(1)
if data then if queueCC:pop() then
local a=unpack(data) _count = _count + 1
if a=="_THREADINIT_" then
self.link.threadsResponded=self.link.threadsResponded+1
if self.link.threadsResponded==self.link.cores then
self.link.ThreadsLoaded=true
self.link.OnReady:Fire()
end
else
self.link.OnJobCompleted:Fire(unpack(data))
end end
end end
data=self.link.queueIN:pop() c.isReady = true
for i=1,#c.tempQueue do
queueIN:push(c.tempQueue[i])
end
c.tempQueue = nil
c.OnReady:Fire(c)
local dat
while true do
thread.skip(1)
dat = queueJD:pop()
if dat then
c.OnJobCompleted:Fire(unpack(dat))
end
end end
end) end)
c.updater.link=c
return c return c
end end
function multi:newSystemThreadedExecute(cmd) function multi:newSystemThreadedExecute(cmd)

View File

@ -1,54 +1,36 @@
package.path="?/init.lua;?.lua;"..package.path package.path="?/init.lua;?.lua;"..package.path
--~ package.cpath="./?.dll;"..package.cpath
--~ time = require("time")
--~ local d1 = time.date(2012, 4, 30)
--~ a=time.nowlocal()
--~ while true do
--~ print(time.nowlocal():ticks())
--~ end
multi = require("multi") multi = require("multi")
--~ multi:newTLoop(function(self) local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
--~ a = 0 conn = multi:newSystemThreadedConnection("test"):init()
--~ end,.001) multi:newSystemThread("Work",function()
--~ multi:benchMark(1,nil,"Steps/s:"):OnBench(function() local multi = require("multi")
--~ os.exit() conn = THREAD.waitFor("test"):init()
--~ end) conn(function(...)
function multi:ResetPriority() print(...)
self.solid = false
end
local clock = os.clock
function sleep(n) -- seconds
local t0 = clock()
while clock() - t0 <= n do end
end
local a=0
local b=0
local c=0
multi:benchMark(1,multi.Priority_Core,"Regular Bench:"):OnBench(function() -- the onbench() allows us to do each bench after each other!
print("AutoP\n---------------")
multi:newLoop(function()
a=a+1
end)
t=multi:newLoop(function()
c=c+1
sleep(.001)
end)
multi:newLoop(function()
b=b+1
end)
multi:benchMark(1,multi.Priority_Core,"Hmm:"):OnBench(function()
multi.nextStep(function()
print(a,b,c)
--~ os.exit()
end)
end) end)
multi:newTLoop(function()
conn:Fire("meh2")
end,1)
multi:mainloop()
end) end)
settings = { multi.OnError(function(a,b,c)
--~ priority = 3, -- this is overwritten while auto_priority is being used! You can also use -1 for this setting as well print(c)
auto_priority = true, end)
auto_stretch = 1000, multi:newTLoop(function()
auto_lowerbound = multi.Priority_Idle conn:Fire("meh")
} end,1)
while true do conn(function(...)
multi:uManager(settings) print(">",...)
end end)
--~ jq = multi:newSystemThreadedJobQueue()
--~ jq:registerJob("test",function(a)
--~ return "Hello",a
--~ end)
--~ jq.OnJobCompleted(function(ID,...)
--~ print(ID,...)
--~ end)
--~ for i=1,16 do
--~ jq:pushJob("test",5)
--~ end
multi:mainloop()