diff --git a/docs/changes.md b/docs/changes.md index 44a30ca..9bb4824 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -74,6 +74,7 @@ Allows the user to have multi auto set priorities (Requires chronos). Also adds Added --- +- thread.hold will now use a custom hold method for objects with a `Hold` method. This is called like `obj:Hold(opt)`. The only argument passed is the optional options table that thread.hold can pass. There is an exception for connection objects. While they do contain a Hold method, the Hold method isn't used and is there for proxy objects, though they can be used in non proxy/thread situations. Hold returns all the arguments that the connection object was fired with. - shared_table = STP:newSharedTable(tbl_name) -- Allows you to create a shared table that all system threads in a process have access to. Returns a reference to that table for use on the main thread. Sets `_G[tbl_name]` on the system threads so you can access it there. ```lua package.path = "?/init.lua;?.lua;"..package.path @@ -119,10 +120,6 @@ Added STJQ_cPXT8GOx We work! We work!!! ``` -- STP:getLoad(type) -- returns a table where the index is the threadID and the value is the number of objects[type] running on that thread. `type`: "threads" for coroutines running or nil for all other objects running. -- multi:newTargetedFunction(ID, proc, name, func, holup) -- This is used internally to handle thread.hold(proxy.conn) -- proxy.getThreadID() -- Returns the threadID of the thread that the proxy is running in -- proxy:getUniqueName() -- Gets the special name that identifies the object on the thread the proxy refers to - multi:chop(obj) -- We cannot directly interact with a local object on lanes, so we chop the object and set some globals on the thread side. Should use like: `mulit:newProxy(multi:chop(multi:newThread(function() ... end)))` - multi:newProxy(ChoppedObject) -- Creates a proxy object that allows you to interact with an object on a thread @@ -148,67 +145,105 @@ Added ``` Internally the SystemThreadedProcessor uses a JobQueue to handle things. The proxy function allows you to interact with these objects as if they were on the main thread, though there actions are carried out on the main thread. - Connection proxies break the rules a bit. Normally methods should always work on the thread side, however for connections in order to have actions work on the thread side you would call the connection using `obj._connName` instead of calling `obj.connName`. This allows you to have more control over connection events. See example below: + Proxies can also be shared between threads, just remember to use proxy:getTransferable() before transferring and proxy:init() on the other end. (We need to avoid copying over coroutines) + + The work done with proxies negates the usage of multi:newSystemThreadedConnection(), the only difference is you lose the metatables from connections. + + You cannot connect directly to a proxy connection on the non proxy thread, you can however use proxy_conn:Hold() or thread.hold(proxy_conn) to emulate this, see below. + ```lua package.path = "?/init.lua;?.lua;"..package.path - multi, thread = require("multi"):init({print=true}) + multi, thread = require("multi"):init({print=true, warn=true, error=true}) THREAD, GLOBAL = require("multi.integration.lanesManager"):init() stp = multi:newSystemThreadedProcessor(8) - alarm = stp:newAlarm(3) + tloop = stp:newTLoop(nil, 1) - -- This doesn't work since this event has already been subscribed to internally on the thread to get thread.hold(alarm.OnRing) to work. But as many events to alarm.OnRing can be made! - thread:newThread(function() - print("Hold on proxied connection", thread.hold(alarm._OnRing)) - end) + multi:newSystemThread("Testing proxy copy",function(tloop) + local function tprint (tbl, indent) + if not indent then indent = 0 end + for k, v in pairs(tbl) do + formatting = string.rep(" ", indent) .. k .. ": " + if type(v) == "table" then + print(formatting) + tprint(v, indent+1) + else + print(formatting .. tostring(v)) + end + end + end + local multi, thread = require("multi"):init() + tloop = tloop:init() + print("tloop type:",tloop.Type) + print("Testing proxies on other threads") + thread:newThread(function() + while true do + thread.hold(tloop.OnLoop) + print(THREAD_NAME,"Loopy") + end + end) + tloop.OnLoop(function(a) + print(THREAD_NAME, "Got loop...") + end) + multi:mainloop() + end, tloop:getTransferable()).OnError(multi.error) - alarm.OnRing(function(a) - print("OnRing",a, THREAD_NAME, THREAD_ID) - end) - - print("alarm.OnRing", alarm.OnRing.Type) - print("alarm._OnRing", alarm._OnRing.Type) + print("tloop", tloop.Type) thread:newThread(function() - print("Hold on proxied no proxy connection", thread.hold(alarm.OnRing)) + print("Holding...") + thread.hold(tloop.OnLoop) + print("Held on proxied no proxy connection 1") + end).OnError(print) + + thread:newThread(function() + tloop.OnLoop:Hold() + print("held on proxied no proxy connection 2") + end) + + tloop.OnLoop(function() + print("OnLoop",THREAD_NAME) end) thread:newThread(function() - print("Hold on proxied no proxy connection", thread.hold(alarm.OnRing)) - end) - - -- This doesn't work since this event has already been subscribed to internally on the thread to get thread.hold(alarm.OnRing) to work. But as many events to alarm.OnRing can be made! - thread:newThread(function() - print("Hold on proxied connection", thread.hold(alarm._OnRing)) - end) - - alarm._OnRing(function(a) - print("_OnRing",a, THREAD_NAME, THREAD_ID) - a:Reset(1) - end) + while true do + tloop.OnLoop:Hold() + print("OnLoop",THREAD_NAME) + end + end).OnError(multi.error) multi:mainloop() ``` Output: ``` - INFO: Integrated Lanes Threading! - alarm.OnRing connector - alarm._OnRing proxy - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 <-- This can change each time you run this example! - OnRing table: 018BC0C0 MAIN_THREAD 0 - Hold on proxied no proxy connection table: 018BC0C0 nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil - Hold on proxied no proxy connection table: 018BC0C0 nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 - OnRing table: 018BC0C0 MAIN_THREAD 0 - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 - OnRing table: 018BC0C0 MAIN_THREAD 0 + INFO: Integrated Lanes Threading! 1 + tloop proxy + Holding... + tloop type: proxy + Testing proxies on other threads + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... + OnLoop MAIN_THREAD + Testing proxy copy Loopy + Held on proxied no proxy connection 1 + held on proxied no proxy connection 2 + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... + Testing proxy copy Loopy + OnLoop MAIN_THREAD + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... - ... (Will repeat ever second now) - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 + ... (Will repeat every second) - OnRing table: 018BC0C0 MAIN_THREAD 0 + Testing proxy copy Loopy + OnLoop MAIN_THREAD + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... + + ... ``` The proxy version can only subscribe to events on the proxy thread, which means that connection metamethods will not work with the proxy version (`_OnRing` on the non proxy thread side), but the (`OnRing`) version will work. Cleverly handling the proxy thread and the non proxy thread will allow powerful connection logic. Also this is not a full system threaded connection. **Proxies should only be used between 2 threads!** To keep things fast I'm using simple queues to transfer data. There is no guarantee that things will work! @@ -222,6 +257,7 @@ Added - proxyStep = STP:newStep(...) - proxyTStep = STP:newTStep(...) - proxyThread = STP:newThread(...) + - proxyService = STP:newService(...) - threadedFunction = STP:newFunction(...) Unique: @@ -449,6 +485,8 @@ Removed Fixed --- +- Issue with luajit w/5.2 compat breaking with coroutine.running(), fixed the script to properly handle so thread.isThread() returns as expected! +- Issue with coroutine based threads where they weren't all being scheduled due to a bad for loop. Replaced with a while to ensure all threads are consumed properly. If a thread created a thread that created a thread that may or may not be on the same process, things got messed up due to the original function not being built with these abstractions in mind. - Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function. - Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things. - multi.isMainThread was not properly handled in each integration. This has been resolved. diff --git a/init.lua b/init.lua index 881f832..7f3d0ec 100644 --- a/init.lua +++ b/init.lua @@ -33,6 +33,7 @@ local threadManager local __CurrentConnectionThread multi.unpack = table.unpack or unpack +multi.pack = table.pack or function(...) return {...} end if table.unpack then unpack = table.unpack end @@ -81,6 +82,7 @@ multi.TSTEP = "tstep" multi.THREAD = "thread" multi.SERVICE = "service" multi.PROXY = "proxy" +multi.THREADEDFUNCTION = "threaded_function" if not _G["$multi"] then _G["$multi"] = {multi = multi, thread = thread} @@ -133,9 +135,7 @@ function multi.Stop() mainloopActive = false end -local function pack(...) - return {...} -end +local pack = multi.pack --Processor local priorityTable = {[false]="Disabled",[true]="Enabled"} @@ -352,7 +352,7 @@ function multi:newConnection(protect,func,kill) for i=1,#fast do local suc, err = pcall(fast[i], ...) if not suc then - print(err) + multi.error(err) end if kill then table.insert(kills,i) @@ -470,6 +470,10 @@ function multi:newConnection(protect,func,kill) return temp end + c.Hold = thread:newFunction(function(self) + return thread.hold(self) + end, true) + c.connect=c.Connect c.GetConnection=c.getConnection c.HasConnections = c.hasConnections @@ -487,24 +491,6 @@ function multi:newConnection(protect,func,kill) return c end -multi.enableOptimization = multi:newConnection() -multi.optConn = multi:newConnection(true) -multi.optConn(function(msg) - table.insert(optimization_stats, msg) -end) - -function multi:getOptimizationConnection() - return multi.optConn -end - -function multi:getOptimizationStats() - return optimization_stats -end - -function multi:isFindingOptimizing() - return find_optimization -end - -- Used with ISO Threads local function isolateFunction(func, env) if setfenv then @@ -516,6 +502,8 @@ local function isolateFunction(func, env) end end +multi.isolateFunction = isolateFunction + function multi:Break() self:Pause() self.Active=nil @@ -635,6 +623,7 @@ function multi:isDone() end function multi:create(ref) + ref.UID = "U"..multi.randomString(12) self.OnObjectCreated:Fire(ref, self) return self end @@ -684,10 +673,6 @@ function multi:newBase(ins) return c end -multi.OnObjectCreated=multi:newConnection() -multi.OnObjectDestroyed=multi:newConnection() -multi.OnLoad = multi:newConnection(nil,nil,true) -ignoreconn = false function multi:newTimer() local c={} c.Type=multi.TIMER @@ -738,7 +723,7 @@ function multi:newEvent(task, func) task=func return self end - c.OnEvent = self:newConnection():fastMode() + c.OnEvent = self:newConnection() if func then c.OnEvent(func) end @@ -765,7 +750,7 @@ function multi:newUpdater(skip, func) skip=n return self end - c.OnUpdate = self:newConnection():fastMode() + c.OnUpdate = self:newConnection() c:setName(c.Type) if func then c.OnUpdate(func) @@ -801,7 +786,7 @@ function multi:newAlarm(set, func) t = clock() return self end - c.OnRing = self:newConnection():fastMode() + c.OnRing = self:newConnection() function c:Pause() count = clock() self.Parent.Pause(self) @@ -831,7 +816,7 @@ function multi:newLoop(func, notime) end end - c.OnLoop = self:newConnection():fastMode() + c.OnLoop = self:newConnection() if func then c.OnLoop(func) @@ -879,9 +864,9 @@ function multi:newStep(start,reset,count,skip) return true end c.Reset=c.Resume - c.OnStart = self:newConnection():fastMode() - c.OnStep = self:newConnection():fastMode() - c.OnEnd = self:newConnection():fastMode() + c.OnStart = self:newConnection() + c.OnStep = self:newConnection() + c.OnEnd = self:newConnection() function c:Break() self.Active=nil return self @@ -902,40 +887,49 @@ function multi:newStep(start,reset,count,skip) return c end -function multi:newTLoop(func,set) +function multi:newTLoop(func, set) local c=self:newBase() c.Type=multi.TLOOP c.set=set or 0 c.timer=self:newTimer() c.life=0 c:setPriority("Low") + function c:Act() - if self.timer:Get()>=self.set then + if self.timer:Get() >= self.set then self.life=self.life+1 self.timer:Reset() - self.OnLoop:Fire(self,self.life) + self.OnLoop:Fire(self, self.life) return true end end + function c:Set(set) self.set = set end + function c:Resume() self.Parent.Resume(self) self.timer:Resume() return self end + function c:Pause() self.timer:Pause() self.Parent.Pause(self) return self end - c.OnLoop = self:newConnection():fastMode() + + c.OnLoop = self:newConnection() + if func then c.OnLoop(func) end + c:setName(c.Type) + self:create(c) + return c end @@ -1111,7 +1105,7 @@ function multi:newProcessor(name, nothread) function c:newFunction(func, holdme) return thread:newFunctionBase(function(...) - return c:newThread("Threaded Function Handler", func, ...) + return c:newThread("Process Threaded Function Handler", func, ...) end, holdme)() end @@ -1142,6 +1136,7 @@ function multi:newProcessor(name, nothread) end table.insert(processes,c) + self:create(c) return c end @@ -1207,7 +1202,7 @@ function multi:getTasks() end function thread.request(t,cmd,...) - thread.requests[t.thread] = {cmd,{...}} + thread.requests[t.thread] = {cmd, multi.pack(...)} end function thread.getRunningThread() @@ -1246,14 +1241,18 @@ local function conn_test(conn) local args local func = function(...) ready = true - args = {...} + args = multi.pack(...) end local ref = conn(func) return function() if ready then conn:Unconnect(ref) - return multi.unpack(args) or multi.NIL + if #args==0 then + return multi.NIL + else + return multi.unpack(args) + end end end end @@ -1265,7 +1264,7 @@ function thread.chain(...) end end -function thread.hold(n,opt) +function thread.hold(n, opt) thread._Requests() local opt = opt or {} if type(opt)=="table" then @@ -1284,8 +1283,10 @@ function thread.hold(n,opt) return yield(CMD, t_sleep, n or 0, nil, interval) elseif type(n) == "table" and n.Type == multi.CONNECTOR then return yield(CMD, t_hold, conn_test(n), nil, interval) + elseif type(n) == "table" and n.Hold ~= nil then + return n:Hold(opt) elseif type(n) == "function" then - return yield(CMD, t_hold, n or dFunc, nil, interval) + return yield(CMD, t_hold, n, nil, interval) else multi.error("Invalid argument passed to thread.hold(...)!") end @@ -1316,11 +1317,12 @@ function thread.yield() end function thread.isThread() - if _VERSION~="Lua 5.1" then - local a,b = running() + local a,b = running() + if b then + -- We are dealing with luajit compat or 5.2+ return not(b) else - return running()~=nil + return a~=nil end end @@ -1343,7 +1345,7 @@ function thread.waitFor(name) end local function cleanReturns(...) - local returns = {...} + local returns = multi.pack(...) local rets = {} local ind = 0 for i=#returns,1,-1 do @@ -1390,8 +1392,7 @@ function thread:newFunctionBase(generator, holdme) end) else while not rets and not err do - multi:getCurrentProcess():getHandler()() - multi:getHandler()() + multi:uManager() end local g = rets rets = nil @@ -1415,7 +1416,7 @@ function thread:newFunctionBase(generator, holdme) } end local t = generator(...) - t.OnDeath(function(...) rets = {...} end) + t.OnDeath(function(...) rets = multi.pack(...) end) t.OnError(function(self,e) err = e end) if holdme then return wait() @@ -1443,13 +1444,14 @@ function thread:newFunctionBase(generator, holdme) return temp end setmetatable(tfunc, tfunc) + tfunc.Type = multi.THREADEDFUNCTION return tfunc end end function thread:newFunction(func, holdme) return thread:newFunctionBase(function(...) - return thread:newThread("Threaded Function Handler", func, ...) + return thread:newThread("Free Threaded Function Handler", func, ...) end, holdme)() end @@ -1483,18 +1485,18 @@ function thread:newProcessor(name) function proc:newFunction(func, holdme) return thread:newFunctionBase(function(...) - return thread_proc:newThread("Threaded Function Handler", func, ...) + return thread_proc:newThread("TProc Threaded Function Handler", func, ...) end, holdme)() end function proc.Start() Active = true - return c + return proc end function proc.Stop() Active = false - return c + return proc end function proc:Destroy() @@ -1512,6 +1514,8 @@ function thread:newProcessor(name) end end) end) + + self:create(proc) return proc end @@ -1525,13 +1529,13 @@ end function thread:newThread(name, func, ...) multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called - local func = func or name - if func == name then - name = name or multi.randomString(16) + if type(name) == "function" then + func = name + name = "UnnamedThread_"..multi.randomString(16) end local c={nil,nil,nil,nil,nil,nil,nil} c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} - c.startArgs = {...} + c.startArgs = multi.pack(...) c.ref={} c.Name=name c.thread=create(func) @@ -1794,9 +1798,8 @@ function multi:createHandler() return coroutine.wrap(function() local temp_start while true do - for start = #startme, 1, -1 do - temp_start = startme[start] - table.remove(startme) + while #startme>0 do + temp_start = table.remove(startme) _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads) table.insert(threads, temp_start) @@ -1919,7 +1922,7 @@ function multi:newService(func) -- Priority managed threads return c end - multi.create(multi,c) + self:create(c) return c end @@ -2328,7 +2331,7 @@ end function multi.print(...) if multi.defaultSettings.print then local t = {} - for i,v in pairs({...}) do t[#t+1] = tostring(v) end + for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end io.write("\x1b[94mINFO:\x1b[0m " .. table.concat(t," ") .. "\n") end end @@ -2336,7 +2339,7 @@ end function multi.warn(...) if multi.defaultSettings.warn then local t = {} - for i,v in pairs({...}) do t[#t+1] = tostring(v) end + for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end io.write("\x1b[93mWARNING:\x1b[0m " .. table.concat(t," ") .. "\n") end end @@ -2344,8 +2347,8 @@ end function multi.error(self, err) if type(err) == "bool" then crash = err end if type(self) == "string" then err = self end - io.write("\x1b[91mERROR:\x1b[0m " .. err .. "\n") - error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type) + io.write("\x1b[91mERROR:\x1b[0m " .. err .. " " .. debug.getinfo(2).name .."\n") + error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type .. "\n" .. debug.traceback().."\n") if multi.defaultSettings.error then os.exit(1) end @@ -2353,7 +2356,7 @@ end function multi.success(...) local t = {} - for i,v in pairs({...}) do t[#t+1] = tostring(v) end + for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end io.write("\x1b[92mSUCCESS:\x1b[0m " .. table.concat(t," ") .. "\n") end @@ -2374,6 +2377,29 @@ function os.exit(n) _os(n) end +multi.OnObjectCreated=multi:newConnection() +ignoreconn = false +multi.OnObjectDestroyed=multi:newConnection() +multi.OnLoad = multi:newConnection(nil,nil,true) + +multi.enableOptimization = multi:newConnection() +multi.optConn = multi:newConnection(true) +multi.optConn(function(msg) + table.insert(optimization_stats, msg) +end) + +function multi:getOptimizationConnection() + return multi.optConn +end + +function multi:getOptimizationStats() + return optimization_stats +end + +function multi:isFindingOptimizing() + return find_optimization +end + multi.OnError=multi:newConnection() multi.OnPreLoad = multi:newConnection() multi.OnExit = multi:newConnection(nil,nil,true) @@ -2393,11 +2419,12 @@ function multi:getHandler() end multi:newThread("Task Handler", function() - local check = function() - return table.remove(tasks) - end while true do - thread.hold(check)() + if #tasks > 0 then + table.remove(tasks)() + else + thread.yield() + end end end).OnError(multi.error) diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index 6a98121..f76378e 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -113,7 +113,7 @@ function multi:newSystemThreadedJobQueue(n) return self end function c:pushJob(name,...) - queueJob:push{name,jid,{...}} + queueJob:push{name,jid,multi.pack(...)} jid = jid + 1 return jid-1 end @@ -132,12 +132,16 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) end end) return thread.hold(function() if rets then - return multi.unpack(rets) or multi.NIL + if #rets == 0 then + return multi.NIL + else + return multi.unpack(rets) + end end end) end, holup), name @@ -171,10 +175,10 @@ function multi:newSystemThreadedJobQueue(n) local name = table.remove(dat, 1) local jid = table.remove(dat, 1) local args = table.remove(dat, 1) - queueReturn:push{jid, funcs[name](multi.unpack(args)), queue} + queueReturn:push{jid, funcs[name](args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]), queue} end).OnError(multi.error) end - end).OnError(print) + end).OnError(multi.error) thread:newThread("DoAllHandler",function() while true do local dat = thread.hold(function() @@ -190,7 +194,7 @@ function multi:newSystemThreadedJobQueue(n) end end end - end).OnError(print) + end).OnError(multi.error) thread:newThread("IdleHandler",function() while true do thread.hold(function() @@ -198,9 +202,9 @@ function multi:newSystemThreadedJobQueue(n) end) THREAD.sleep(.01) end - end).OnError(print) + end).OnError(multi.error) multi:mainloop() - end,i).OnError(print) + end,i).OnError(multi.error) end return c end @@ -258,7 +262,7 @@ function multi:newSystemThreadedConnection(name) local function fire(...) for _, link in pairs(c.links) do - link:push {c.TRIG, {...}} + link:push {c.TRIG, multi.pack(...)} end end @@ -286,7 +290,7 @@ function multi:newSystemThreadedConnection(name) --- ^^^ This will only exist in the init thread function c:Fire(...) - local args = {...} + local args = multi.pack(...) if self.CID == THREAD_ID then -- Host Call for _, link in pairs(self.links) do link:push {self.TRIG, args} @@ -344,4 +348,5 @@ function multi:newSystemThreadedConnection(name) end return c -end \ No newline at end of file +end +require("multi.integration.sharedExtensions") \ No newline at end of file diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index 547c7c6..8a8318c 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -98,6 +98,13 @@ function multi:newSystemThread(name, func, ...) globals = globe, priority = c.priority },function(...) + local profi + + if multi_settings.debug then + profi = require("proFI") + profi:start() + end + multi, thread = require("multi"):init(multi_settings) require("multi.integration.lanesManager.extensions") require("multi.integration.sharedExtensions") @@ -105,6 +112,12 @@ function multi:newSystemThread(name, func, ...) returns = {pcall(func, ...)} return_linda:set("returns", returns) has_error = false + if profi then + multi.OnExit(function(...) + profi:stop() + profi:writeReport("Profiling Report [".. THREAD_NAME .."].txt") + end) + end end)(...) count = count + 1 function c:getName() @@ -118,6 +131,13 @@ function multi:newSystemThread(name, func, ...) c.OnDeath = multi:newConnection() c.OnError = multi:newConnection() GLOBAL["__THREADS__"] = livingThreads + + if self.isActor then + self:create(c) + else + multi.create(multi, c) + end + return c end @@ -177,7 +197,7 @@ function multi.InitSystemThreadErrorHandler() end end end - end).OnError(print) + end).OnError(multi.error) end multi.print("Integrated Lanes Threading!") @@ -185,7 +205,6 @@ multi.integration = {} -- for module creators multi.integration.GLOBAL = GLOBAL multi.integration.THREAD = THREAD require("multi.integration.lanesManager.extensions") -require("multi.integration.sharedExtensions") return { init = function() return GLOBAL, THREAD diff --git a/integration/lanesManager/threads.lua b/integration/lanesManager/threads.lua index d8a3838..3d05ebf 100644 --- a/integration/lanesManager/threads.lua +++ b/integration/lanesManager/threads.lua @@ -66,7 +66,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console) local c = {} c.queue = __Console function c.print(...) - c.queue:send("Q", {...}) + c.queue:send("Q", multi.pack(...)) end function c.error(err) c.queue:push("Q",{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}) @@ -90,7 +90,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console) end function THREAD.pushStatus(...) - local args = {...} + local args = multi.pack(...) __StatusLinda:send(nil,THREAD_ID, args) end diff --git a/integration/loveManager/extensions.lua b/integration/loveManager/extensions.lua index 8740018..738f18f 100644 --- a/integration/loveManager/extensions.lua +++ b/integration/loveManager/extensions.lua @@ -129,7 +129,7 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) end end) return thread.hold(function() @@ -230,7 +230,7 @@ function multi:newSystemThreadedConnection(name) self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name) function self:Fire(...) - local args = {...} + local args = multi.pack(...) if self.CID == THREAD_ID then -- Host Call for _, link in pairs(self.links) do love.thread.getChannel(link):push{self.TRIG, args} @@ -271,7 +271,7 @@ function multi:newSystemThreadedConnection(name) -- This shouldn't be the case end end - end).OnError(print) + end).OnError(multi.error) return self end @@ -321,7 +321,7 @@ function multi:newSystemThreadedConnection(name) local function fire(...) for _, link in pairs(c.links) do - love.thread.getChannel(link):push {c.TRIG, {...}} + love.thread.getChannel(link):push {c.TRIG, multi.pack(...)} end end @@ -346,7 +346,7 @@ function multi:newSystemThreadedConnection(name) c.proxy_conn:Fire(multi.unpack(item[2])) end end - end).OnError(print) + end).OnError(multi.error) --- ^^^ This will only exist in the init thread THREAD.package(name,c) diff --git a/integration/loveManager/init.lua b/integration/loveManager/init.lua index 7c5289f..0c23b6b 100644 --- a/integration/loveManager/init.lua +++ b/integration/loveManager/init.lua @@ -103,6 +103,13 @@ function multi:newSystemThread(name, func, ...) c.stab.returns = nil end end) + + if self.isActor then + self:create(c) + else + multi.create(multi, c) + end + return c end diff --git a/integration/loveManager/threads.lua b/integration/loveManager/threads.lua index 12f5471..777f811 100644 --- a/integration/loveManager/threads.lua +++ b/integration/loveManager/threads.lua @@ -98,7 +98,7 @@ end function threads.pushStatus(...) local status_channel = love.thread.getChannel("STATCHAN_" ..__THREADID__) - local args = {...} + local args = multi.pack(...) status_channel:push(args) end @@ -209,7 +209,7 @@ function threads.getConsole() local c = {} c.queue = love.thread.getChannel("__CONSOLE__") function c.print(...) - c.queue:push{...} + c.queue:push(multi.pack(...)) end function c.error(err) c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} diff --git a/integration/lovrManager/extensions.lua b/integration/lovrManager/extensions.lua index a4f231a..ddc66c6 100644 --- a/integration/lovrManager/extensions.lua +++ b/integration/lovrManager/extensions.lua @@ -118,7 +118,7 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) link:Destroy() end end) diff --git a/integration/lovrManager/init.lua b/integration/lovrManager/init.lua index 6a64c98..bb86a8a 100644 --- a/integration/lovrManager/init.lua +++ b/integration/lovrManager/init.lua @@ -76,6 +76,13 @@ function multi:newSystemThread(name,func,...) GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread} GLOBAL["__THREAD_COUNT"] = THREAD_ID THREAD_ID=THREAD_ID+1 + + if self.isActor then + self:create(c) + else + multi.create(multi, c) + end + return c end THREAD.newSystemThread = multi.newSystemThread diff --git a/integration/lovrManager/threads.lua b/integration/lovrManager/threads.lua index 12429c4..dc919ab 100644 --- a/integration/lovrManager/threads.lua +++ b/integration/lovrManager/threads.lua @@ -156,7 +156,7 @@ function threads.getConsole() local c = {} c.queue = lovr.thread.getChannel("__CONSOLE__") function c.print(...) - c.queue:push{...} + c.queue:push(multi.pack(...)) end function c.error(err) c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} diff --git a/integration/pseudoManager/extensions.lua b/integration/pseudoManager/extensions.lua index bfdd982..7ad8d6c 100644 --- a/integration/pseudoManager/extensions.lua +++ b/integration/pseudoManager/extensions.lua @@ -96,7 +96,7 @@ function multi:newSystemThreadedJobQueue(n) end function c:pushJob(name,...) - table.insert(jobs,{name,jid,{...}}) + table.insert(jobs,{name,jid,multi.pack(...)}) jid = jid + 1 return jid-1 end @@ -121,7 +121,7 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) end end) return thread.hold(function() diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 33ab91c..ba5f00b 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -1,4 +1,4 @@ ---[[ +--[[ todo finish the targeted job! MIT License Copyright (c) 2023 Ryan Ward @@ -22,40 +22,45 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] +local function copy(obj) + if type(obj) ~= 'table' then return obj end + local res = {} + for k, v in pairs(obj) do res[copy(k)] = copy(v) end + return res +end + +function tprint (tbl, indent) + if not indent then indent = 0 end + for k, v in pairs(tbl) do + formatting = string.rep(" ", indent) .. k .. ": " + if type(v) == "table" then + print(formatting) + tprint(v, indent+1) + else + print(formatting .. tostring(v)) + end + end + end + local multi, thread = require("multi"):init() -- Returns a handler that allows a user to interact with an object on another thread! -- Create on the thread that you want to interact with, send over the handle function multi:chop(obj) + if not _G["UIDS"] then + _G["UIDS"] = {} + end local multi, thread = require("multi"):init() local list = {[0] = multi.randomString(12)} _G[list[0]] = obj for i,v in pairs(obj) do - if type(v) == "function" then + if type(v) == "function" or type(v) == "table" and v.Type == multi.THREADEDFUNCTION then table.insert(list, i) - elseif type(v) == "table" and v.Type == multi.CONNECTOR then - v.getThreadID = function() -- Special function we are adding - return THREAD_ID - end - - v.getUniqueName = function(self) - return self.__link_name - end - - local l = multi:chop(v) - v.__link_name = l[0] - v.__name = i - - table.insert(list, {i, multi:newProxy(l):init()}) + elseif type(v) == "table" and v.Type == multi.CONNECTOR then + table.insert(list, {i, multi:newProxy(multi:chop(v)):init()}) end end - table.insert(list, "isConnection") - if obj.Type == multi.CONNECTOR then - obj.isConnection = function() return true end - else - obj.isConnection = function() return false end - end return list end @@ -64,67 +69,86 @@ function multi:newProxy(list) local c = {} c.name = multi.randomString(12) - + c.is_init = false + local multi, thread = nil, nil function c:init() local multi, thread = nil, nil - if THREAD_ID>0 then + if not(c.is_init) then + c.is_init = true local multi, thread = require("multi"):init() + c.proxy_link = "PL" .. multi.randomString(12) + + if multi.integration then + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD + end + + GLOBAL[c.proxy_link] = c + local function check() return self.send:pop() end + self.send = multi:newSystemThreadedQueue(self.name.."_S"):init() self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init() self.funcs = list - self.conns = list[-1] - thread:newThread(function() + self._funcs = copy(list) + self.Type = multi.PROXY + self.TID = THREAD_ID + + thread:newThread("Proxy_Handler_" .. multi.randomString(4), function() while true do local data = thread.hold(check) if data then - local func = table.remove(data, 1) - local sref = table.remove(data, 1) - local ret - if sref then - ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} - else - ret = {_G[list[0]][func](multi.unpack(data))} - end - for i = 1,#ret do - if type(ret[i]) == "table" and getmetatable(ret[i]) then - setmetatable(ret[i],{}) -- remove that metatable, we do not need it on the other side! + -- Let's not hold the main threadloop + thread:newThread("Temp_Thread", function() + local func = table.remove(data, 1) + local sref = table.remove(data, 1) + local ret + + if sref then + ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} + else + ret = {_G[list[0]][func](multi.unpack(data))} end - if ret[i] == _G[list[0]] then - -- We cannot return itself, that return can contain bad values. - ret[i] = {_self_ref_ = true} + + for i = 1,#ret do + if type(ret[i]) == "table" and ret[i].Type ~= nil and ret[i].Type ~= multi.PROXY then + ret[i] = "\1PARENT_REF" + end + if type(ret[i]) == "table" and getmetatable(ret[i]) then + setmetatable(ret[i],nil) -- remove that metatable, we do not need it on the other side! + end + if ret[i] == _G[list[0]] then + -- We cannot return itself, that return can contain bad values. + ret[i] = "\1SELF_REF" + end end - end - table.insert(ret, 1, func) - self.recv:push(ret) + table.insert(ret, 1, func) + self.recv:push(ret) + end) end end - end).OnError(print) + end).OnError(multi.error) return self else local multi, thread = require("multi"):init() local me = self - GLOBAL = multi.integration.GLOBAL - THREAD = multi.integration.THREAD - self.send = THREAD.waitFor(self.name.."_S") - self.recv = THREAD.waitFor(self.name.."_R") + local funcs = copy(self.funcs) + if multi.integration then + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD + end + self.send = THREAD.waitFor(self.name.."_S"):init() + self.recv = THREAD.waitFor(self.name.."_R"):init() self.Type = multi.PROXY - for _,v in pairs(self.funcs) do + for _,v in pairs(funcs) do if type(v) == "table" then -- We have a connection - v[2]:init() - self["_"..v[1]] = v[2] + v[2]:init(proc_name) + self[v[1]] = v[2] v[2].Parent = self setmetatable(v[2],getmetatable(multi:newConnection())) - self[v[1]] = multi:newConnection() - - thread:newThread(function() - while true do - self[v[1]]:Fire(thread.hold(alarm["_"..v[1]])) - end - end) else self[v] = thread:newFunction(function(self,...) if self == me then @@ -138,8 +162,10 @@ function multi:newProxy(list) me.recv:pop() table.remove(data, 1) for i=1,#data do - if type(data[i]) == "table" and data[i]._self_ref_ then + if data[i] == "\1SELF_REF" then data[i] = me + elseif data[i] == "\1PARENT_REF" then + data[i] = me.Parent end end return multi.unpack(data) @@ -151,36 +177,31 @@ function multi:newProxy(list) return self end end + function c:getTransferable() + local cp = {} + local multi, thread = require("multi"):init() + cp.is_init = true + cp.proxy_link = self.proxy_link + cp.name = self.name + cp.funcs = copy(self._funcs) + cp.init = function(self) + local multi, thread = require("multi"):init() + if multi.integration then + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD + end + local proxy = THREAD.waitFor(self.proxy_link) + proxy.funcs = self.funcs + return proxy:init() + end + return cp + end + self:create(c) return c end local targets = {} - -local nFunc = 0 -function multi:newTargetedFunction(ID, proc, name, func, holup) -- This registers with the queue - if type(name)=="function" then - holup = func - func = name - name = "JQ_TFunc_"..nFunc - end - nFunc = nFunc + 1 - proc.jobqueue:registerFunction(name, func) - return thread:newFunction(function(...) - local id = proc:pushJob(ID, name, ...) - local link - local rets - link = proc.jobqueue.OnJobCompleted(function(jid,...) - if id==jid then - rets = {...} - end - end) - return thread.hold(function() - if rets then - return multi.unpack(rets) or multi.NIL - end - end) - end, holup), name -end +local references = {} local jid = -1 function multi:newSystemThreadedProcessor(cores) @@ -202,71 +223,16 @@ function multi:newSystemThreadedProcessor(cores) c.OnObjectCreated = multi:newConnection() c.parent = self c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) - c.targetedQueue = multi:newSystemThreadedQueue(name.."_target"):init() - - c.jobqueue:registerFunction("STP_enable_targets",function(name) - local multi, thread = require("multi"):init() - local qname = THREAD_NAME .. "_t_queue" - local targetedQueue = THREAD.waitFor(name):init() - local tjq = multi:newSystemThreadedQueue(qname):init() - targetedQueue:push({tonumber(THREAD_ID), qname}) - multi:newThread("TargetedJobHandler", function() - local queueReturn = _G["__QR"] - while true do - local dat = thread.hold(function() - return tjq:pop() - end) - if dat then - thread:newThread("JQ-TargetThread",function() - local name = table.remove(dat, 1) - local jid = table.remove(dat, 1) - local args = table.remove(dat, 1) - queueReturn:push{jid, _G[name](multi.unpack(args)), queue} - end).OnError(multi.error) - end - end - end).OnError(multi.error) - end) - - c.jobqueue:registerFunction("STP_GetThreadCount",function() - return _G["__THREADS"] - end) - - c.jobqueue:registerFunction("STP_GetTaskCount",function() - return _G["__TASKS"] - end) function c:pushJob(ID, name, ...) - targets[ID]:push{name, jid, {...}} + local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init() + tq:push{name, jid, multi.pack(...)} jid = jid - 1 return jid + 1 end - c.jobqueue:doToAll(function(name) - STP_enable_targets(name) - _G["__THREADS"] = 0 - _G["__TASKS"] = 0 - end, name.."_target") - - local count = 0 - while count < c.cores do - local dat = c.targetedQueue:pop() - if dat then - targets[dat[1]] = multi.integration.THREAD.waitFor(dat[2]):init() - table.insert(c.proc_list, dat[1]) -- Add thread_id to proc list - count = count + 1 - end - end - c.jobqueue:registerFunction("packObj",function(obj) local multi, thread = require("multi"):init() - obj.getThreadID = function() -- Special functions we are adding - return THREAD_ID - end - - obj.getUniqueName = function(self) - return self.__link_name - end local list = multi:chop(obj) obj.__link_name = list[0] @@ -279,57 +245,52 @@ function multi:newSystemThreadedProcessor(cores) c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...) local multi, thread = require("multi"):init() local obj = thread:newThread(name, func, ...) - _G["__THREADS"] = _G["__THREADS"] + 1 return packObj(obj) end, true) c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...) local multi, thread = require("multi"):init() local obj = multi[obj](multi, func, ...) - _G["__TASKS"] = _G["__TASKS"] + 1 return packObj(obj) end, true) - function c:newLoop(func, notime) - proxy = self.spawnTask("newLoop", func, notime):init() - proxy.__proc = self + local implement = { + "newLoop", + "newTLoop", + "newUpdater", + "newEvent", + "newAlarm", + "newStep", + "newTStep", + "newService" + } + + for _, method in pairs(implement) do + c[method] = function(self, ...) + proxy = self.spawnTask(method, ...):init() + references[proxy] = self + return proxy + end + end + + function c:newThread(name, func, ...) + proxy = self.spawnThread(name, func, ...):init(self.Name) + references[proxy] = self + table.insert(self.threads, proxy) return proxy end - function c:newTLoop(func, time) - proxy = self.spawnTask("newTLoop", func, time):init() - proxy.__proc = self - return proxy + function c:newFunction(func, holdme) + return c.jobqueue:newFunction(func, holdme) end - function c:newUpdater(skip, func) - proxy = self.spawnTask("newUpdater", func, notime):init() - proxy.__proc = self - return proxy - end - - function c:newEvent(task, func) - proxy = self.spawnTask("newEvent", task, func):init() - proxy.__proc = self - return proxy - end - - function c:newAlarm(set, func) - proxy = self.spawnTask("newAlarm", set, func):init() - proxy.__proc = self - return proxy - end - - function c:newStep(start, reset, count, skip) - proxy = self.spawnTask("newStep", start, reset, count, skip):init() - proxy.__proc = self - return proxy - end - - function c:newTStep(start ,reset, count, set) - proxy = self.spawnTask("newTStep", start, reset, count, set):init() - proxy.__proc = self - return proxy + function c:newSharedTable(name) + if not name then multi.error("You must provide a name when creating a table!") end + local tbl_name = "TABLE_"..multi.randomString(8) + c.jobqueue:doToAll(function(tbl_name, interaction) + _G[interaction] = THREAD.waitFor(tbl_name):init() + end, tbl_name, name) + return multi:newSystemThreadedTable(tbl_name):init() end function c:getHandler() @@ -348,26 +309,6 @@ function multi:newSystemThreadedProcessor(cores) return self.Name end - function c:newThread(name, func, ...) - proxy = self.spawnThread(name, func, ...):init() - proxy.__proc = self - table.insert(self.threads, proxy) - return proxy - end - - function c:newFunction(func, holdme) - return c.jobqueue:newFunction(func, holdme) - end - - function c:newSharedTable(name) - if not name then multi.error("You must provide a name when creating a table!") end - local tbl_name = "TABLE_"..multi.randomString(8) - c.jobqueue:doToAll(function(tbl_name, interaction) - _G[interaction] = THREAD.waitFor(tbl_name):init() - end, tbl_name, name) - return multi:newSystemThreadedTable(tbl_name):init() - end - function c.run() return self end @@ -418,51 +359,3 @@ function multi:newSystemThreadedProcessor(cores) return c end - --- Modify thread.hold to handle proxies -local thread_ref = thread.hold -function thread.hold(n, opt) - --if type(n) == "table" then print(n.Type, n.isConnection()) end - if type(n) == "table" and n.Type == multi.PROXY and n.isConnection() then - local ready = false - local args - local id = n.getThreadID() - local name = n:getUniqueName() - local func = multi:newTargetedFunction(id, n.Parent.__proc, "conn_"..multi.randomString(8), function(_name) - local multi, thread = require("multi"):init() - local obj = _G[_name] - local rets = {thread.hold(obj)} - for i,v in pairs(rets) do - if v.Type then - rets[i] = {_self_ref_ = "parent"} - end - end - return unpack(rets) - end) - - local conn - local handle = func(name) - conn = handle.OnReturn(function(...) - ready = true - args = {...} - handle.OnReturn:Unconnect(conn) - end) - - local ret = {thread_ref(function() - if ready then - return multi.unpack(args) or multi.NIL - end - end, opt)} - - for i,v in pairs(ret) do - if type(v) == "table" and v._self_ref_ == "parent" then - ret[i] = n.Parent - end - end - - return unpack(ret) - else - return thread_ref(n, opt) - end -end -