From fe123f879ed84822c2270a0998d298259f23c1b8 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 2 Jul 2017 22:55:46 -0400 Subject: [PATCH] Still fixing bugs --- ...ergratetest.lua => lanesintegratetest.lua} | 0 ...gratetest2.lua => lanesintegratetest2.lua} | 0 ...gratetest3.lua => lanesintegratetest3.lua} | 0 ...gratetest4.lua => lanesintegratetest4.lua} | 0 ...gratetest5.lua => lanesintegratetest5.lua} | 0 ...gratetest6.lua => lanesintegratetest6.lua} | 0 examples/lanesintegratetest7.lua | 26 +++ examples/love2d Threading Example/main5.lua | 52 ++++++ .../multi/integration/shared/shared.lua | 148 +++++++++++++++++- multi/integration/loveManager.lua | 19 +-- 10 files changed, 220 insertions(+), 25 deletions(-) rename examples/{lanesintergratetest.lua => lanesintegratetest.lua} (100%) rename examples/{lanesintergratetest2.lua => lanesintegratetest2.lua} (100%) rename examples/{lanesintergratetest3.lua => lanesintegratetest3.lua} (100%) rename examples/{lanesintergratetest4.lua => lanesintegratetest4.lua} (100%) rename examples/{lanesintergratetest5.lua => lanesintegratetest5.lua} (100%) rename examples/{lanesintergratetest6.lua => lanesintegratetest6.lua} (100%) create mode 100644 examples/lanesintegratetest7.lua create mode 100644 examples/love2d Threading Example/main5.lua diff --git a/examples/lanesintergratetest.lua b/examples/lanesintegratetest.lua similarity index 100% rename from examples/lanesintergratetest.lua rename to examples/lanesintegratetest.lua diff --git a/examples/lanesintergratetest2.lua b/examples/lanesintegratetest2.lua similarity index 100% rename from examples/lanesintergratetest2.lua rename to examples/lanesintegratetest2.lua diff --git a/examples/lanesintergratetest3.lua b/examples/lanesintegratetest3.lua similarity index 100% rename from examples/lanesintergratetest3.lua rename to examples/lanesintegratetest3.lua diff --git a/examples/lanesintergratetest4.lua b/examples/lanesintegratetest4.lua similarity index 100% rename from examples/lanesintergratetest4.lua rename to examples/lanesintegratetest4.lua diff --git a/examples/lanesintergratetest5.lua b/examples/lanesintegratetest5.lua similarity index 100% rename from examples/lanesintergratetest5.lua rename to examples/lanesintegratetest5.lua diff --git a/examples/lanesintergratetest6.lua b/examples/lanesintegratetest6.lua similarity index 100% rename from examples/lanesintergratetest6.lua rename to examples/lanesintegratetest6.lua diff --git a/examples/lanesintegratetest7.lua b/examples/lanesintegratetest7.lua new file mode 100644 index 0000000..7a9f2df --- /dev/null +++ b/examples/lanesintegratetest7.lua @@ -0,0 +1,26 @@ +package.path="?/init.lua;"..package.path +local GLOBAL,sThread=require("multi.integration.lanesManager").init() +jQueue=multi:newSystemThreadedJobQueue() -- this internally creates System threads, We told it to use a maximum of 3 cores at any given time +jQueue:registerJob("TEST_JOB",function(a,s) + math.randomseed(s) + -- We will push a random # + TEST_JOB2() -- You can call other registered functions as well! + return math.random(0,255) -- send the result to the main thread +end) +jQueue:registerJob("TEST_JOB2",function() + print("Test Works!") +end) +tableOfOrder={} +jQueue.OnJobCompleted(function(JOBID,n) + -- JOBID is the completed job, starts at 1 and counts up by 1. + -- Threads finish at different times so jobids may be returned out of order! Be sure to have a way to order them + tableOfOrder[JOBID]=n -- we order ours by putting them into a table + if #tableOfOrder==10 then + print("We got all of the pieces!") + end +end) +for i=1,10 do -- Job Name of registered function, ... varargs + jQueue:pushJob("TEST_JOB","This is a test!",math.random(1,1000000)) +end +print("I pushed all of the jobs :)") +multi:mainloop() diff --git a/examples/love2d Threading Example/main5.lua b/examples/love2d Threading Example/main5.lua new file mode 100644 index 0000000..3c50850 --- /dev/null +++ b/examples/love2d Threading Example/main5.lua @@ -0,0 +1,52 @@ +--~ require("core.Library") +--~ GLOBAL,sThread=require("multi.integration.loveManager").init() -- load the love2d version of the lanesManager and requires the entire multi library +--~ require("core.GuiManager") +--~ gui.ff.Color=Color.Black +--~ test=multi:newSystemThreadedTable("YO"):init() +--~ test["test1"]="lol" +--~ multi:newSystemThread("test",function() +--~ tab=sThread.waitFor("YO"):init() +--~ print(tab["test1"]) +--~ sThread.sleep(3) +--~ tab["test2"]="Whats so funny?" +--~ end) +--~ multi:newThread("test2",function() +--~ thread.sleep(1) +--~ print(test:waitFor("test2")) +--~ t.text="DONE!" +--~ end) +--~ t=gui:newTextLabel("no done yet!",0,0,300,100) +--~ t:centerX() +--~ t:centerY() + +require("core.Library") +GLOBAL,sThread=require("multi.integration.loveManager").init() -- load the love2d version of the lanesManager and requires the entire multi library +require("core.GuiManager") +gui.ff.Color=Color.Black +jQueue=multi:newSystemThreadedJobQueue() -- this internally creates System threads, We told it to use a maximum of 3 cores at any given time +jQueue:registerJob("TEST_JOB",function(a,s) + math.randomseed(s) + print("testing...") + -- We will push a random # + TEST_JOB2() -- You can call other registered functions as well! + return math.random(0,255) -- send the result to the main thread +end) +jQueue:registerJob("TEST_JOB2",function(a,s) + print("Test Works!") +end) +tableOfOrder={} +jQueue.OnJobCompleted(function(JOBID,n) + -- JOBID is the completed job, starts at 1 and counts up by 1. + -- Threads finish at different times so jobids may be returned out of order! Be sure to have a way to order them + tableOfOrder[JOBID]=n -- we order ours by putting them into a table + if #tableOfOrder==10 then + print("We got all of the pieces!") + end +end) +for i=1,10 do -- Job Name of registered function, ... varargs + jQueue:pushJob("TEST_JOB","This is a test!",math.random(1,1000000)) +end +print("I pushed all of the jobs :)") +t=gui:newTextLabel("no done yet!",0,0,300,100) +t:centerX() +t:centerY() diff --git a/examples/love2d Threading Example/multi/integration/shared/shared.lua b/examples/love2d Threading Example/multi/integration/shared/shared.lua index 023b38f..06bb8cf 100644 --- a/examples/love2d Threading Example/multi/integration/shared/shared.lua +++ b/examples/love2d Threading Example/multi/integration/shared/shared.lua @@ -29,10 +29,16 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann function c:init() -- create an init function so we can mimic on bith love2d and lanes self.chan=love.thread.getChannel(self.name) -- create channel by the name self.name function self:push(v) -- push to the channel - self.chan:push(v) + self.chan:push({type(v),resolveData(v)}) end function self:pop() -- pop from the channel - return self.chan:pop() + local tab=self.chan:pop() + if not tab then return end + return resolveType(tab[1],tab[2]) + end + function self:peek() + local tp,d=unpack{self.chan:peek()} + return resolveType(tp,d) end GLOBAL[self.name]=self -- send the object to the thread through the global interface return self -- return the object @@ -49,20 +55,23 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann function c:pop() -- pop the queue return ({self.linda:receive(0,"Q")})[2] end + function c:peek() + return self.linda:get("Q") + end function c:init() -- mimic the feature that love2d requires, so code can be consistent return self end - multi.intergration.GLOBAL[name]=c -- send the object to the thread through the global interface + multi.integration.GLOBAL[name]=c -- send the object to the thread through the global interface end return c end function multi:systemThreadedBenchmark(n,p) n=n or 1 - local cores=multi.intergration.THREAD.getCores() + local cores=multi.integration.THREAD.getCores() local queue=multi:newSystemThreadedQueue("QUEUE") - multi.intergration.GLOBAL["__SYSTEMBENCHMARK__"]=n - local sThread=multi.intergration.THREAD - local GLOBAL=multi.intergration.GLOBAL + multi.integration.GLOBAL["__SYSTEMBENCHMARK__"]=n + local sThread=multi.integration.THREAD + local GLOBAL=multi.integration.GLOBAL for i=1,cores do multi:newSystemThread("STHREAD_BENCH",function() require("multi") @@ -170,7 +179,130 @@ function multi:newSystemThreadedTable(name) }) return self end - multi.intergration.GLOBAL[name]=c -- send the object to the thread through the global interface + multi.integration.GLOBAL[name]=c -- send the object to the thread through the global interface end return c end +function multi:newSystemThreadedJobQueue(numOfCores) + local c={} + c.jobnum=1 + c.cores=numOfCores or multi.integration.THREAD.getCores() + c.queueIN=multi:newSystemThreadedQueue("THREADED_JQ"):init() + c.queueOUT=multi:newSystemThreadedQueue("THREADED_JQO"):init() + c.REG=multi:newSystemThreadedTable("THREADED_JQ_F_REG"):init() + -- registerJob(name,func) + -- pushJob(...) + function c:registerJob(name,func) + self.REG[name]=func + end + function c:pushJob(name,...) + self.queueOUT:push({self.jobnum,name,...}) + self.jobnum=self.jobnum+1 + end + local GLOBAL=multi.integration.GLOBAL -- set up locals incase we are using lanes + local sThread=multi.integration.THREAD -- set up locals incase we are using lanes + GLOBAL["__JQ_COUNT__"]=c.cores + for i=1,c.cores do + multi:newSystemThread("System Threaded Job Queue Worker Thread #"..i,function() + require("multi") + __sleep__=.001 + if love then -- lets make sure we don't reference upvalues if using love2d + GLOBAL=_G.GLOBAL + sThread=_G.sThread + __sleep__=.1 + end + JQI=sThread.waitFor("THREADED_JQO"):init() -- Grab it + JQO=sThread.waitFor("THREADED_JQ"):init() -- Grab it + FGLOBAL=sThread.waitFor("THREADED_JQ_F_REG"):init() -- Grab it + sThread.sleep(.1) -- lets wait for things to work out + setmetatable(_G,{ + __index=FGLOBAL + }) + GLOBAL["THREADED_JQ"]=nil -- remove it + GLOBAL["THREADED_JQO"]=nil -- remove it + GLOBAL["THREADED_JQ_F_REG"]=nil -- remove it + multi:newLoop(function() + sThread.sleep(__sleep__) -- lets allow cpu time for other processes on our system! + local job=JQI:pop() + if job then + local ID=table.remove(job,1) -- return and remove + local name=table.remove(job,1) -- return and remove + local ret={FGLOBAL:waitFor(name)(unpack(job))} -- unpack the rest + JQO:push({ID,ret}) + end + end) + multi:mainloop() + end) + end + c.OnJobCompleted=multi:newConnection() + c.updater=multi:newLoop(function(self) + local data=self.link.queueIN:pop() + while data do + if data then + self.link.OnJobCompleted:Fire(unpack(data)) + end + data=self.link.queueIN:pop() + end + end) + c.updater.link=c + return c +end +if love then + if love.thread then + function multi:newSystemThreadedJobQueue(numOfCores) + local c={} + c.jobnum=1 + c.cores=numOfCores or multi.integration.THREAD.getCores() + c.queueIN=multi:newSystemThreadedQueue("THREADED_JQ"):init() + c.queueOUT=multi:newSystemThreadedQueue("THREADED_JQO"):init() + function c:registerJob(name,func) + GLOBAL["__TJQ__"..name.."__"]=func + end + function c:pushJob(name,...) + self.queueOUT:push({self.jobnum,name,...}) + self.jobnum=self.jobnum+1 + end + local GLOBAL=multi.integration.GLOBAL -- set up locals incase we are using lanes + local sThread=multi.integration.THREAD -- set up locals incase we are using lanes + GLOBAL["__JQ_COUNT__"]=c.cores + for i=1,c.cores do + multi:newSystemThread("System Threaded Job Queue Worker Thread #"..i,function() + GLOBAL=_G.GLOBAL + sThread=_G.sThread + local JQI=sThread.waitFor("THREADED_JQO"):init() -- Grab it + local JQO=sThread.waitFor("THREADED_JQ"):init() -- Grab it + sThread.sleep(.1) -- lets wait for things to work out + setmetatable(_G,{ + __index=function(t,k,v) + return GLOBAL["__TJQ__"..k.."__"] + end + }) + GLOBAL["THREADED_JQ"]=nil -- remove it + GLOBAL["THREADED_JQO"]=nil -- remove it + multi:newLoop(function() + sThread.sleep(.001) -- lets allow cpu time for other processes on our system! + local job=JQI:pop() + if job then + local ID=table.remove(job,1) -- return and remove + local name=table.remove(job,1) -- return and remove + local ret={sThread.waitFor("__TJQ__"..name.."__")(unpack(job))} -- unpack the rest + JQO:push({ID,ret}) + end + end) + end) + end + c.OnJobCompleted=multi:newConnection() + c.updater=multi:newLoop(function(self) + local data=self.link.queueIN:pop() + while data do + if data then + self.link.OnJobCompleted:Fire(unpack(data)) + end + data=self.link.queueIN:pop() + end + end) + c.updater.link=c + return c + end + end +end diff --git a/multi/integration/loveManager.lua b/multi/integration/loveManager.lua index e357732..0a144d9 100644 --- a/multi/integration/loveManager.lua +++ b/multi/integration/loveManager.lua @@ -29,6 +29,7 @@ setmetatable(GLOBAL,{ function __sync__() local data=__mythread__:pop() while data do + love.timer.sleep(.001) if type(data)=="string" then local cmd,tp,name,d=data:match("(%S-) (%S-) (%S-) (.+)") if name=="__DIEPLZ"..__THREADNAME__.."__" then @@ -153,23 +154,7 @@ function sThread.hold(n) repeat __sync__() until n() end updater=multi:newUpdater() -updater:OnUpdate(function(self) - local data=__mythread__:pop() - while data do - if type(data)=="string" then - local cmd,tp,name,d=data:match("(%S-) (%S-) (%S-) (.+)") - if name=="__DIEPLZ"..__THREADNAME__.."__" then - error("Thread: "..__THREADNAME__.." has been stopped!") - end - if cmd=="SYNC" then - __proxy__[name]=resolveType(tp,d) - end - else - __proxy__[name]=data - end - data=__mythread__:pop() - end -end) +updater:OnUpdate(__sync__) func=loadDump([=[INSERT_USER_CODE]=])() multi:mainloop() ]]