1.8.4 Update! #3

Merged
rayaman merged 6 commits from Testing into master 2017-07-03 14:01:10 -04:00
10 changed files with 220 additions and 25 deletions
Showing only changes of commit fe123f879e - Show all commits

View File

@ -0,0 +1,26 @@
package.path="?/init.lua;"..package.path
local GLOBAL,sThread=require("multi.integration.lanesManager").init()
jQueue=multi:newSystemThreadedJobQueue() -- this internally creates System threads, We told it to use a maximum of 3 cores at any given time
jQueue:registerJob("TEST_JOB",function(a,s)
math.randomseed(s)
-- We will push a random #
TEST_JOB2() -- You can call other registered functions as well!
return math.random(0,255) -- send the result to the main thread
end)
jQueue:registerJob("TEST_JOB2",function()
print("Test Works!")
end)
tableOfOrder={}
jQueue.OnJobCompleted(function(JOBID,n)
-- JOBID is the completed job, starts at 1 and counts up by 1.
-- Threads finish at different times so jobids may be returned out of order! Be sure to have a way to order them
tableOfOrder[JOBID]=n -- we order ours by putting them into a table
if #tableOfOrder==10 then
print("We got all of the pieces!")
end
end)
for i=1,10 do -- Job Name of registered function, ... varargs
jQueue:pushJob("TEST_JOB","This is a test!",math.random(1,1000000))
end
print("I pushed all of the jobs :)")
multi:mainloop()

View File

@ -0,0 +1,52 @@
--~ require("core.Library")
--~ GLOBAL,sThread=require("multi.integration.loveManager").init() -- load the love2d version of the lanesManager and requires the entire multi library
--~ require("core.GuiManager")
--~ gui.ff.Color=Color.Black
--~ test=multi:newSystemThreadedTable("YO"):init()
--~ test["test1"]="lol"
--~ multi:newSystemThread("test",function()
--~ tab=sThread.waitFor("YO"):init()
--~ print(tab["test1"])
--~ sThread.sleep(3)
--~ tab["test2"]="Whats so funny?"
--~ end)
--~ multi:newThread("test2",function()
--~ thread.sleep(1)
--~ print(test:waitFor("test2"))
--~ t.text="DONE!"
--~ end)
--~ t=gui:newTextLabel("no done yet!",0,0,300,100)
--~ t:centerX()
--~ t:centerY()
require("core.Library")
GLOBAL,sThread=require("multi.integration.loveManager").init() -- load the love2d version of the lanesManager and requires the entire multi library
require("core.GuiManager")
gui.ff.Color=Color.Black
jQueue=multi:newSystemThreadedJobQueue() -- this internally creates System threads, We told it to use a maximum of 3 cores at any given time
jQueue:registerJob("TEST_JOB",function(a,s)
math.randomseed(s)
print("testing...")
-- We will push a random #
TEST_JOB2() -- You can call other registered functions as well!
return math.random(0,255) -- send the result to the main thread
end)
jQueue:registerJob("TEST_JOB2",function(a,s)
print("Test Works!")
end)
tableOfOrder={}
jQueue.OnJobCompleted(function(JOBID,n)
-- JOBID is the completed job, starts at 1 and counts up by 1.
-- Threads finish at different times so jobids may be returned out of order! Be sure to have a way to order them
tableOfOrder[JOBID]=n -- we order ours by putting them into a table
if #tableOfOrder==10 then
print("We got all of the pieces!")
end
end)
for i=1,10 do -- Job Name of registered function, ... varargs
jQueue:pushJob("TEST_JOB","This is a test!",math.random(1,1000000))
end
print("I pushed all of the jobs :)")
t=gui:newTextLabel("no done yet!",0,0,300,100)
t:centerX()
t:centerY()

View File

@ -29,10 +29,16 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann
function c:init() -- create an init function so we can mimic on bith love2d and lanes function c:init() -- create an init function so we can mimic on bith love2d and lanes
self.chan=love.thread.getChannel(self.name) -- create channel by the name self.name self.chan=love.thread.getChannel(self.name) -- create channel by the name self.name
function self:push(v) -- push to the channel function self:push(v) -- push to the channel
self.chan:push(v) self.chan:push({type(v),resolveData(v)})
end end
function self:pop() -- pop from the channel function self:pop() -- pop from the channel
return self.chan:pop() local tab=self.chan:pop()
if not tab then return end
return resolveType(tab[1],tab[2])
end
function self:peek()
local tp,d=unpack{self.chan:peek()}
return resolveType(tp,d)
end end
GLOBAL[self.name]=self -- send the object to the thread through the global interface GLOBAL[self.name]=self -- send the object to the thread through the global interface
return self -- return the object return self -- return the object
@ -49,20 +55,23 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann
function c:pop() -- pop the queue function c:pop() -- pop the queue
return ({self.linda:receive(0,"Q")})[2] return ({self.linda:receive(0,"Q")})[2]
end end
function c:peek()
return self.linda:get("Q")
end
function c:init() -- mimic the feature that love2d requires, so code can be consistent function c:init() -- mimic the feature that love2d requires, so code can be consistent
return self return self
end end
multi.intergration.GLOBAL[name]=c -- send the object to the thread through the global interface multi.integration.GLOBAL[name]=c -- send the object to the thread through the global interface
end end
return c return c
end end
function multi:systemThreadedBenchmark(n,p) function multi:systemThreadedBenchmark(n,p)
n=n or 1 n=n or 1
local cores=multi.intergration.THREAD.getCores() local cores=multi.integration.THREAD.getCores()
local queue=multi:newSystemThreadedQueue("QUEUE") local queue=multi:newSystemThreadedQueue("QUEUE")
multi.intergration.GLOBAL["__SYSTEMBENCHMARK__"]=n multi.integration.GLOBAL["__SYSTEMBENCHMARK__"]=n
local sThread=multi.intergration.THREAD local sThread=multi.integration.THREAD
local GLOBAL=multi.intergration.GLOBAL local GLOBAL=multi.integration.GLOBAL
for i=1,cores do for i=1,cores do
multi:newSystemThread("STHREAD_BENCH",function() multi:newSystemThread("STHREAD_BENCH",function()
require("multi") require("multi")
@ -170,7 +179,130 @@ function multi:newSystemThreadedTable(name)
}) })
return self return self
end end
multi.intergration.GLOBAL[name]=c -- send the object to the thread through the global interface multi.integration.GLOBAL[name]=c -- send the object to the thread through the global interface
end end
return c return c
end end
function multi:newSystemThreadedJobQueue(numOfCores)
local c={}
c.jobnum=1
c.cores=numOfCores or multi.integration.THREAD.getCores()
c.queueIN=multi:newSystemThreadedQueue("THREADED_JQ"):init()
c.queueOUT=multi:newSystemThreadedQueue("THREADED_JQO"):init()
c.REG=multi:newSystemThreadedTable("THREADED_JQ_F_REG"):init()
-- registerJob(name,func)
-- pushJob(...)
function c:registerJob(name,func)
self.REG[name]=func
end
function c:pushJob(name,...)
self.queueOUT:push({self.jobnum,name,...})
self.jobnum=self.jobnum+1
end
local GLOBAL=multi.integration.GLOBAL -- set up locals incase we are using lanes
local sThread=multi.integration.THREAD -- set up locals incase we are using lanes
GLOBAL["__JQ_COUNT__"]=c.cores
for i=1,c.cores do
multi:newSystemThread("System Threaded Job Queue Worker Thread #"..i,function()
require("multi")
__sleep__=.001
if love then -- lets make sure we don't reference upvalues if using love2d
GLOBAL=_G.GLOBAL
sThread=_G.sThread
__sleep__=.1
end
JQI=sThread.waitFor("THREADED_JQO"):init() -- Grab it
JQO=sThread.waitFor("THREADED_JQ"):init() -- Grab it
FGLOBAL=sThread.waitFor("THREADED_JQ_F_REG"):init() -- Grab it
sThread.sleep(.1) -- lets wait for things to work out
setmetatable(_G,{
__index=FGLOBAL
})
GLOBAL["THREADED_JQ"]=nil -- remove it
GLOBAL["THREADED_JQO"]=nil -- remove it
GLOBAL["THREADED_JQ_F_REG"]=nil -- remove it
multi:newLoop(function()
sThread.sleep(__sleep__) -- lets allow cpu time for other processes on our system!
local job=JQI:pop()
if job then
local ID=table.remove(job,1) -- return and remove
local name=table.remove(job,1) -- return and remove
local ret={FGLOBAL:waitFor(name)(unpack(job))} -- unpack the rest
JQO:push({ID,ret})
end
end)
multi:mainloop()
end)
end
c.OnJobCompleted=multi:newConnection()
c.updater=multi:newLoop(function(self)
local data=self.link.queueIN:pop()
while data do
if data then
self.link.OnJobCompleted:Fire(unpack(data))
end
data=self.link.queueIN:pop()
end
end)
c.updater.link=c
return c
end
if love then
if love.thread then
function multi:newSystemThreadedJobQueue(numOfCores)
local c={}
c.jobnum=1
c.cores=numOfCores or multi.integration.THREAD.getCores()
c.queueIN=multi:newSystemThreadedQueue("THREADED_JQ"):init()
c.queueOUT=multi:newSystemThreadedQueue("THREADED_JQO"):init()
function c:registerJob(name,func)
GLOBAL["__TJQ__"..name.."__"]=func
end
function c:pushJob(name,...)
self.queueOUT:push({self.jobnum,name,...})
self.jobnum=self.jobnum+1
end
local GLOBAL=multi.integration.GLOBAL -- set up locals incase we are using lanes
local sThread=multi.integration.THREAD -- set up locals incase we are using lanes
GLOBAL["__JQ_COUNT__"]=c.cores
for i=1,c.cores do
multi:newSystemThread("System Threaded Job Queue Worker Thread #"..i,function()
GLOBAL=_G.GLOBAL
sThread=_G.sThread
local JQI=sThread.waitFor("THREADED_JQO"):init() -- Grab it
local JQO=sThread.waitFor("THREADED_JQ"):init() -- Grab it
sThread.sleep(.1) -- lets wait for things to work out
setmetatable(_G,{
__index=function(t,k,v)
return GLOBAL["__TJQ__"..k.."__"]
end
})
GLOBAL["THREADED_JQ"]=nil -- remove it
GLOBAL["THREADED_JQO"]=nil -- remove it
multi:newLoop(function()
sThread.sleep(.001) -- lets allow cpu time for other processes on our system!
local job=JQI:pop()
if job then
local ID=table.remove(job,1) -- return and remove
local name=table.remove(job,1) -- return and remove
local ret={sThread.waitFor("__TJQ__"..name.."__")(unpack(job))} -- unpack the rest
JQO:push({ID,ret})
end
end)
end)
end
c.OnJobCompleted=multi:newConnection()
c.updater=multi:newLoop(function(self)
local data=self.link.queueIN:pop()
while data do
if data then
self.link.OnJobCompleted:Fire(unpack(data))
end
data=self.link.queueIN:pop()
end
end)
c.updater.link=c
return c
end
end
end

View File

@ -29,6 +29,7 @@ setmetatable(GLOBAL,{
function __sync__() function __sync__()
local data=__mythread__:pop() local data=__mythread__:pop()
while data do while data do
love.timer.sleep(.001)
if type(data)=="string" then if type(data)=="string" then
local cmd,tp,name,d=data:match("(%S-) (%S-) (%S-) (.+)") local cmd,tp,name,d=data:match("(%S-) (%S-) (%S-) (.+)")
if name=="__DIEPLZ"..__THREADNAME__.."__" then if name=="__DIEPLZ"..__THREADNAME__.."__" then
@ -153,23 +154,7 @@ function sThread.hold(n)
repeat __sync__() until n() repeat __sync__() until n()
end end
updater=multi:newUpdater() updater=multi:newUpdater()
updater:OnUpdate(function(self) updater:OnUpdate(__sync__)
local data=__mythread__:pop()
while data do
if type(data)=="string" then
local cmd,tp,name,d=data:match("(%S-) (%S-) (%S-) (.+)")
if name=="__DIEPLZ"..__THREADNAME__.."__" then
error("Thread: "..__THREADNAME__.." has been stopped!")
end
if cmd=="SYNC" then
__proxy__[name]=resolveType(tp,d)
end
else
__proxy__[name]=data
end
data=__mythread__:pop()
end
end)
func=loadDump([=[INSERT_USER_CODE]=])() func=loadDump([=[INSERT_USER_CODE]=])()
multi:mainloop() multi:mainloop()
]] ]]