From 660c10ec3b3ba175794fe8d7a3d052b4bc618472 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sun, 25 Jun 2023 21:46:37 -0400 Subject: [PATCH] Removed extra bloat, proxies are portable now! --- docs/changes.md | 125 +++++---- init.lua | 155 ++++++----- integration/lanesManager/extensions.lua | 16 +- integration/lanesManager/init.lua | 20 ++ integration/lanesManager/threads.lua | 4 +- integration/loveManager/extensions.lua | 6 +- integration/loveManager/init.lua | 7 + integration/loveManager/threads.lua | 4 +- integration/lovrManager/extensions.lua | 2 +- integration/lovrManager/init.lua | 7 + integration/lovrManager/threads.lua | 2 +- integration/pseudoManager/extensions.lua | 4 +- integration/sharedExtensions/init.lua | 315 +++++------------------ 13 files changed, 293 insertions(+), 374 deletions(-) diff --git a/docs/changes.md b/docs/changes.md index f27f5fe..9bb4824 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -74,6 +74,7 @@ Allows the user to have multi auto set priorities (Requires chronos). Also adds Added --- +- thread.hold will now use a custom hold method for objects with a `Hold` method. This is called like `obj:Hold(opt)`. The only argument passed is the optional options table that thread.hold can pass. There is an exception for connection objects. While they do contain a Hold method, the Hold method isn't used and is there for proxy objects, though they can be used in non proxy/thread situations. Hold returns all the arguments that the connection object was fired with. - shared_table = STP:newSharedTable(tbl_name) -- Allows you to create a shared table that all system threads in a process have access to. Returns a reference to that table for use on the main thread. Sets `_G[tbl_name]` on the system threads so you can access it there. ```lua package.path = "?/init.lua;?.lua;"..package.path @@ -119,10 +120,6 @@ Added STJQ_cPXT8GOx We work! We work!!! ``` -- STP:getLoad(type) -- returns a table where the index is the threadID and the value is the number of objects[type] running on that thread. `type`: "threads" for coroutines running or nil for all other objects running. -- multi:newTargetedFunction(ID, proc, name, func, holup) -- This is used internally to handle thread.hold(proxy.conn) -- proxy.getThreadID() -- Returns the threadID of the thread that the proxy is running in -- proxy:getUniqueName() -- Gets the special name that identifies the object on the thread the proxy refers to - multi:chop(obj) -- We cannot directly interact with a local object on lanes, so we chop the object and set some globals on the thread side. Should use like: `mulit:newProxy(multi:chop(multi:newThread(function() ... end)))` - multi:newProxy(ChoppedObject) -- Creates a proxy object that allows you to interact with an object on a thread @@ -148,67 +145,105 @@ Added ``` Internally the SystemThreadedProcessor uses a JobQueue to handle things. The proxy function allows you to interact with these objects as if they were on the main thread, though there actions are carried out on the main thread. - Connection proxies break the rules a bit. Normally methods should always work on the thread side, however for connections in order to have actions work on the thread side you would call the connection using `obj._connName` instead of calling `obj.connName`. This allows you to have more control over connection events. See example below: + Proxies can also be shared between threads, just remember to use proxy:getTransferable() before transferring and proxy:init() on the other end. (We need to avoid copying over coroutines) + + The work done with proxies negates the usage of multi:newSystemThreadedConnection(), the only difference is you lose the metatables from connections. + + You cannot connect directly to a proxy connection on the non proxy thread, you can however use proxy_conn:Hold() or thread.hold(proxy_conn) to emulate this, see below. + ```lua package.path = "?/init.lua;?.lua;"..package.path - multi, thread = require("multi"):init({print=true}) + multi, thread = require("multi"):init({print=true, warn=true, error=true}) THREAD, GLOBAL = require("multi.integration.lanesManager"):init() stp = multi:newSystemThreadedProcessor(8) - alarm = stp:newAlarm(3) + tloop = stp:newTLoop(nil, 1) - -- This doesn't work since this event has already been subscribed to internally on the thread to get thread.hold(alarm.OnRing) to work. But as many events to alarm.OnRing can be made! - thread:newThread(function() - print("Hold on proxied connection", thread.hold(alarm._OnRing)) - end) + multi:newSystemThread("Testing proxy copy",function(tloop) + local function tprint (tbl, indent) + if not indent then indent = 0 end + for k, v in pairs(tbl) do + formatting = string.rep(" ", indent) .. k .. ": " + if type(v) == "table" then + print(formatting) + tprint(v, indent+1) + else + print(formatting .. tostring(v)) + end + end + end + local multi, thread = require("multi"):init() + tloop = tloop:init() + print("tloop type:",tloop.Type) + print("Testing proxies on other threads") + thread:newThread(function() + while true do + thread.hold(tloop.OnLoop) + print(THREAD_NAME,"Loopy") + end + end) + tloop.OnLoop(function(a) + print(THREAD_NAME, "Got loop...") + end) + multi:mainloop() + end, tloop:getTransferable()).OnError(multi.error) - alarm.OnRing(function(a) - print("OnRing",a, THREAD_NAME, THREAD_ID) - end) - - print("alarm.OnRing", alarm.OnRing.Type) - print("alarm._OnRing", alarm._OnRing.Type) + print("tloop", tloop.Type) thread:newThread(function() - print("Hold on proxied no proxy connection", thread.hold(alarm.OnRing)) + print("Holding...") + thread.hold(tloop.OnLoop) + print("Held on proxied no proxy connection 1") + end).OnError(print) + + thread:newThread(function() + tloop.OnLoop:Hold() + print("held on proxied no proxy connection 2") + end) + + tloop.OnLoop(function() + print("OnLoop",THREAD_NAME) end) thread:newThread(function() - print("Hold on proxied no proxy connection", thread.hold(alarm.OnRing)) - end) - - -- This doesn't work since this event has already been subscribed to internally on the thread to get thread.hold(alarm.OnRing) to work. But as many events to alarm.OnRing can be made! - thread:newThread(function() - print("Hold on proxied connection", thread.hold(alarm._OnRing)) - end) - - alarm._OnRing(function(a) - print("_OnRing",a, THREAD_NAME, THREAD_ID) - a:Reset(1) - end) + while true do + tloop.OnLoop:Hold() + print("OnLoop",THREAD_NAME) + end + end).OnError(multi.error) multi:mainloop() ``` Output: ``` - INFO: Integrated Lanes Threading! - alarm.OnRing connector - alarm._OnRing proxy - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 <-- This can change each time you run this example! - OnRing table: 018BC0C0 MAIN_THREAD 0 - Hold on proxied no proxy connection table: 018BC0C0 nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil - Hold on proxied no proxy connection table: 018BC0C0 nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 - OnRing table: 018BC0C0 MAIN_THREAD 0 - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 - OnRing table: 018BC0C0 MAIN_THREAD 0 + INFO: Integrated Lanes Threading! 1 + tloop proxy + Holding... + tloop type: proxy + Testing proxies on other threads + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... + OnLoop MAIN_THREAD + Testing proxy copy Loopy + Held on proxied no proxy connection 1 + held on proxied no proxy connection 2 + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... + Testing proxy copy Loopy + OnLoop MAIN_THREAD + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... - ... (Will repeat ever second now) - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 + ... (Will repeat every second) - OnRing table: 018BC0C0 MAIN_THREAD 0 + Testing proxy copy Loopy + OnLoop MAIN_THREAD + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... + + ... ``` The proxy version can only subscribe to events on the proxy thread, which means that connection metamethods will not work with the proxy version (`_OnRing` on the non proxy thread side), but the (`OnRing`) version will work. Cleverly handling the proxy thread and the non proxy thread will allow powerful connection logic. Also this is not a full system threaded connection. **Proxies should only be used between 2 threads!** To keep things fast I'm using simple queues to transfer data. There is no guarantee that things will work! @@ -222,6 +257,7 @@ Added - proxyStep = STP:newStep(...) - proxyTStep = STP:newTStep(...) - proxyThread = STP:newThread(...) + - proxyService = STP:newService(...) - threadedFunction = STP:newFunction(...) Unique: @@ -449,6 +485,7 @@ Removed Fixed --- +- Issue with luajit w/5.2 compat breaking with coroutine.running(), fixed the script to properly handle so thread.isThread() returns as expected! - Issue with coroutine based threads where they weren't all being scheduled due to a bad for loop. Replaced with a while to ensure all threads are consumed properly. If a thread created a thread that created a thread that may or may not be on the same process, things got messed up due to the original function not being built with these abstractions in mind. - Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function. - Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things. diff --git a/init.lua b/init.lua index d67cdb2..7f3d0ec 100644 --- a/init.lua +++ b/init.lua @@ -33,6 +33,7 @@ local threadManager local __CurrentConnectionThread multi.unpack = table.unpack or unpack +multi.pack = table.pack or function(...) return {...} end if table.unpack then unpack = table.unpack end @@ -81,6 +82,7 @@ multi.TSTEP = "tstep" multi.THREAD = "thread" multi.SERVICE = "service" multi.PROXY = "proxy" +multi.THREADEDFUNCTION = "threaded_function" if not _G["$multi"] then _G["$multi"] = {multi = multi, thread = thread} @@ -133,9 +135,7 @@ function multi.Stop() mainloopActive = false end -local function pack(...) - return {...} -end +local pack = multi.pack --Processor local priorityTable = {[false]="Disabled",[true]="Enabled"} @@ -470,6 +470,10 @@ function multi:newConnection(protect,func,kill) return temp end + c.Hold = thread:newFunction(function(self) + return thread.hold(self) + end, true) + c.connect=c.Connect c.GetConnection=c.getConnection c.HasConnections = c.hasConnections @@ -487,24 +491,6 @@ function multi:newConnection(protect,func,kill) return c end -multi.enableOptimization = multi:newConnection() -multi.optConn = multi:newConnection(true) -multi.optConn(function(msg) - table.insert(optimization_stats, msg) -end) - -function multi:getOptimizationConnection() - return multi.optConn -end - -function multi:getOptimizationStats() - return optimization_stats -end - -function multi:isFindingOptimizing() - return find_optimization -end - -- Used with ISO Threads local function isolateFunction(func, env) if setfenv then @@ -637,6 +623,7 @@ function multi:isDone() end function multi:create(ref) + ref.UID = "U"..multi.randomString(12) self.OnObjectCreated:Fire(ref, self) return self end @@ -686,10 +673,6 @@ function multi:newBase(ins) return c end -multi.OnObjectCreated=multi:newConnection() -multi.OnObjectDestroyed=multi:newConnection() -multi.OnLoad = multi:newConnection(nil,nil,true) -ignoreconn = false function multi:newTimer() local c={} c.Type=multi.TIMER @@ -740,7 +723,7 @@ function multi:newEvent(task, func) task=func return self end - c.OnEvent = self:newConnection():fastMode() + c.OnEvent = self:newConnection() if func then c.OnEvent(func) end @@ -767,7 +750,7 @@ function multi:newUpdater(skip, func) skip=n return self end - c.OnUpdate = self:newConnection():fastMode() + c.OnUpdate = self:newConnection() c:setName(c.Type) if func then c.OnUpdate(func) @@ -803,7 +786,7 @@ function multi:newAlarm(set, func) t = clock() return self end - c.OnRing = self:newConnection():fastMode() + c.OnRing = self:newConnection() function c:Pause() count = clock() self.Parent.Pause(self) @@ -833,7 +816,7 @@ function multi:newLoop(func, notime) end end - c.OnLoop = self:newConnection():fastMode() + c.OnLoop = self:newConnection() if func then c.OnLoop(func) @@ -881,9 +864,9 @@ function multi:newStep(start,reset,count,skip) return true end c.Reset=c.Resume - c.OnStart = self:newConnection():fastMode() - c.OnStep = self:newConnection():fastMode() - c.OnEnd = self:newConnection():fastMode() + c.OnStart = self:newConnection() + c.OnStep = self:newConnection() + c.OnEnd = self:newConnection() function c:Break() self.Active=nil return self @@ -904,40 +887,49 @@ function multi:newStep(start,reset,count,skip) return c end -function multi:newTLoop(func,set) +function multi:newTLoop(func, set) local c=self:newBase() c.Type=multi.TLOOP c.set=set or 0 c.timer=self:newTimer() c.life=0 c:setPriority("Low") + function c:Act() - if self.timer:Get()>=self.set then + if self.timer:Get() >= self.set then self.life=self.life+1 self.timer:Reset() - self.OnLoop:Fire(self,self.life) + self.OnLoop:Fire(self, self.life) return true end end + function c:Set(set) self.set = set end + function c:Resume() self.Parent.Resume(self) self.timer:Resume() return self end + function c:Pause() self.timer:Pause() self.Parent.Pause(self) return self end - c.OnLoop = self:newConnection():fastMode() + + c.OnLoop = self:newConnection() + if func then c.OnLoop(func) end + c:setName(c.Type) + self:create(c) + return c end @@ -1144,6 +1136,7 @@ function multi:newProcessor(name, nothread) end table.insert(processes,c) + self:create(c) return c end @@ -1209,7 +1202,7 @@ function multi:getTasks() end function thread.request(t,cmd,...) - thread.requests[t.thread] = {cmd,{...}} + thread.requests[t.thread] = {cmd, multi.pack(...)} end function thread.getRunningThread() @@ -1248,14 +1241,18 @@ local function conn_test(conn) local args local func = function(...) ready = true - args = {...} + args = multi.pack(...) end local ref = conn(func) return function() if ready then conn:Unconnect(ref) - return multi.unpack(args) or multi.NIL + if #args==0 then + return multi.NIL + else + return multi.unpack(args) + end end end end @@ -1267,7 +1264,7 @@ function thread.chain(...) end end -function thread.hold(n,opt) +function thread.hold(n, opt) thread._Requests() local opt = opt or {} if type(opt)=="table" then @@ -1286,8 +1283,10 @@ function thread.hold(n,opt) return yield(CMD, t_sleep, n or 0, nil, interval) elseif type(n) == "table" and n.Type == multi.CONNECTOR then return yield(CMD, t_hold, conn_test(n), nil, interval) + elseif type(n) == "table" and n.Hold ~= nil then + return n:Hold(opt) elseif type(n) == "function" then - return yield(CMD, t_hold, n or dFunc, nil, interval) + return yield(CMD, t_hold, n, nil, interval) else multi.error("Invalid argument passed to thread.hold(...)!") end @@ -1318,11 +1317,12 @@ function thread.yield() end function thread.isThread() - if _VERSION~="Lua 5.1" then - local a,b = running() + local a,b = running() + if b then + -- We are dealing with luajit compat or 5.2+ return not(b) else - return running()~=nil + return a~=nil end end @@ -1345,7 +1345,7 @@ function thread.waitFor(name) end local function cleanReturns(...) - local returns = {...} + local returns = multi.pack(...) local rets = {} local ind = 0 for i=#returns,1,-1 do @@ -1392,7 +1392,7 @@ function thread:newFunctionBase(generator, holdme) end) else while not rets and not err do - multi:getCurrentProcess():getHandler()() + multi:uManager() end local g = rets rets = nil @@ -1416,7 +1416,7 @@ function thread:newFunctionBase(generator, holdme) } end local t = generator(...) - t.OnDeath(function(...) rets = {...} end) + t.OnDeath(function(...) rets = multi.pack(...) end) t.OnError(function(self,e) err = e end) if holdme then return wait() @@ -1444,15 +1444,14 @@ function thread:newFunctionBase(generator, holdme) return temp end setmetatable(tfunc, tfunc) + tfunc.Type = multi.THREADEDFUNCTION return tfunc end end function thread:newFunction(func, holdme) return thread:newFunctionBase(function(...) - local th = thread:newThread("Free Threaded Function Handler", func, ...) - th.creator = debug.getinfo(2).name - return th + return thread:newThread("Free Threaded Function Handler", func, ...) end, holdme)() end @@ -1492,12 +1491,12 @@ function thread:newProcessor(name) function proc.Start() Active = true - return c + return proc end function proc.Stop() Active = false - return c + return proc end function proc:Destroy() @@ -1515,6 +1514,8 @@ function thread:newProcessor(name) end end) end) + + self:create(proc) return proc end @@ -1534,7 +1535,7 @@ function thread:newThread(name, func, ...) end local c={nil,nil,nil,nil,nil,nil,nil} c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} - c.startArgs = {...} + c.startArgs = multi.pack(...) c.ref={} c.Name=name c.thread=create(func) @@ -1797,14 +1798,6 @@ function multi:createHandler() return coroutine.wrap(function() local temp_start while true do - -- for start = #startme, 1, -1 do - -- temp_start = startme[start] - -- table.remove(startme) - -- _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) - -- co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads) - -- table.insert(threads, temp_start) - -- yield() - -- end while #startme>0 do temp_start = table.remove(startme) _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) @@ -1929,7 +1922,7 @@ function multi:newService(func) -- Priority managed threads return c end - multi.create(multi,c) + self:create(c) return c end @@ -2338,7 +2331,7 @@ end function multi.print(...) if multi.defaultSettings.print then local t = {} - for i,v in pairs({...}) do t[#t+1] = tostring(v) end + for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end io.write("\x1b[94mINFO:\x1b[0m " .. table.concat(t," ") .. "\n") end end @@ -2346,7 +2339,7 @@ end function multi.warn(...) if multi.defaultSettings.warn then local t = {} - for i,v in pairs({...}) do t[#t+1] = tostring(v) end + for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end io.write("\x1b[93mWARNING:\x1b[0m " .. table.concat(t," ") .. "\n") end end @@ -2363,7 +2356,7 @@ end function multi.success(...) local t = {} - for i,v in pairs({...}) do t[#t+1] = tostring(v) end + for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end io.write("\x1b[92mSUCCESS:\x1b[0m " .. table.concat(t," ") .. "\n") end @@ -2384,6 +2377,29 @@ function os.exit(n) _os(n) end +multi.OnObjectCreated=multi:newConnection() +ignoreconn = false +multi.OnObjectDestroyed=multi:newConnection() +multi.OnLoad = multi:newConnection(nil,nil,true) + +multi.enableOptimization = multi:newConnection() +multi.optConn = multi:newConnection(true) +multi.optConn(function(msg) + table.insert(optimization_stats, msg) +end) + +function multi:getOptimizationConnection() + return multi.optConn +end + +function multi:getOptimizationStats() + return optimization_stats +end + +function multi:isFindingOptimizing() + return find_optimization +end + multi.OnError=multi:newConnection() multi.OnPreLoad = multi:newConnection() multi.OnExit = multi:newConnection(nil,nil,true) @@ -2403,11 +2419,12 @@ function multi:getHandler() end multi:newThread("Task Handler", function() - local check = function() - return table.remove(tasks) - end while true do - thread.hold(check)() + if #tasks > 0 then + table.remove(tasks)() + else + thread.yield() + end end end).OnError(multi.error) diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index 94b98c1..f76378e 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -113,7 +113,7 @@ function multi:newSystemThreadedJobQueue(n) return self end function c:pushJob(name,...) - queueJob:push{name,jid,{...}} + queueJob:push{name,jid,multi.pack(...)} jid = jid + 1 return jid-1 end @@ -132,12 +132,16 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) end end) return thread.hold(function() if rets then - return multi.unpack(rets) or multi.NIL + if #rets == 0 then + return multi.NIL + else + return multi.unpack(rets) + end end end) end, holup), name @@ -171,7 +175,7 @@ function multi:newSystemThreadedJobQueue(n) local name = table.remove(dat, 1) local jid = table.remove(dat, 1) local args = table.remove(dat, 1) - queueReturn:push{jid, funcs[name](multi.unpack(args)), queue} + queueReturn:push{jid, funcs[name](args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]), queue} end).OnError(multi.error) end end).OnError(multi.error) @@ -258,7 +262,7 @@ function multi:newSystemThreadedConnection(name) local function fire(...) for _, link in pairs(c.links) do - link:push {c.TRIG, {...}} + link:push {c.TRIG, multi.pack(...)} end end @@ -286,7 +290,7 @@ function multi:newSystemThreadedConnection(name) --- ^^^ This will only exist in the init thread function c:Fire(...) - local args = {...} + local args = multi.pack(...) if self.CID == THREAD_ID then -- Host Call for _, link in pairs(self.links) do link:push {self.TRIG, args} diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index 147488a..8a8318c 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -98,6 +98,13 @@ function multi:newSystemThread(name, func, ...) globals = globe, priority = c.priority },function(...) + local profi + + if multi_settings.debug then + profi = require("proFI") + profi:start() + end + multi, thread = require("multi"):init(multi_settings) require("multi.integration.lanesManager.extensions") require("multi.integration.sharedExtensions") @@ -105,6 +112,12 @@ function multi:newSystemThread(name, func, ...) returns = {pcall(func, ...)} return_linda:set("returns", returns) has_error = false + if profi then + multi.OnExit(function(...) + profi:stop() + profi:writeReport("Profiling Report [".. THREAD_NAME .."].txt") + end) + end end)(...) count = count + 1 function c:getName() @@ -118,6 +131,13 @@ function multi:newSystemThread(name, func, ...) c.OnDeath = multi:newConnection() c.OnError = multi:newConnection() GLOBAL["__THREADS__"] = livingThreads + + if self.isActor then + self:create(c) + else + multi.create(multi, c) + end + return c end diff --git a/integration/lanesManager/threads.lua b/integration/lanesManager/threads.lua index d8a3838..3d05ebf 100644 --- a/integration/lanesManager/threads.lua +++ b/integration/lanesManager/threads.lua @@ -66,7 +66,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console) local c = {} c.queue = __Console function c.print(...) - c.queue:send("Q", {...}) + c.queue:send("Q", multi.pack(...)) end function c.error(err) c.queue:push("Q",{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}) @@ -90,7 +90,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console) end function THREAD.pushStatus(...) - local args = {...} + local args = multi.pack(...) __StatusLinda:send(nil,THREAD_ID, args) end diff --git a/integration/loveManager/extensions.lua b/integration/loveManager/extensions.lua index 992a10b..738f18f 100644 --- a/integration/loveManager/extensions.lua +++ b/integration/loveManager/extensions.lua @@ -129,7 +129,7 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) end end) return thread.hold(function() @@ -230,7 +230,7 @@ function multi:newSystemThreadedConnection(name) self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name) function self:Fire(...) - local args = {...} + local args = multi.pack(...) if self.CID == THREAD_ID then -- Host Call for _, link in pairs(self.links) do love.thread.getChannel(link):push{self.TRIG, args} @@ -321,7 +321,7 @@ function multi:newSystemThreadedConnection(name) local function fire(...) for _, link in pairs(c.links) do - love.thread.getChannel(link):push {c.TRIG, {...}} + love.thread.getChannel(link):push {c.TRIG, multi.pack(...)} end end diff --git a/integration/loveManager/init.lua b/integration/loveManager/init.lua index 7c5289f..0c23b6b 100644 --- a/integration/loveManager/init.lua +++ b/integration/loveManager/init.lua @@ -103,6 +103,13 @@ function multi:newSystemThread(name, func, ...) c.stab.returns = nil end end) + + if self.isActor then + self:create(c) + else + multi.create(multi, c) + end + return c end diff --git a/integration/loveManager/threads.lua b/integration/loveManager/threads.lua index 12f5471..777f811 100644 --- a/integration/loveManager/threads.lua +++ b/integration/loveManager/threads.lua @@ -98,7 +98,7 @@ end function threads.pushStatus(...) local status_channel = love.thread.getChannel("STATCHAN_" ..__THREADID__) - local args = {...} + local args = multi.pack(...) status_channel:push(args) end @@ -209,7 +209,7 @@ function threads.getConsole() local c = {} c.queue = love.thread.getChannel("__CONSOLE__") function c.print(...) - c.queue:push{...} + c.queue:push(multi.pack(...)) end function c.error(err) c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} diff --git a/integration/lovrManager/extensions.lua b/integration/lovrManager/extensions.lua index a4f231a..ddc66c6 100644 --- a/integration/lovrManager/extensions.lua +++ b/integration/lovrManager/extensions.lua @@ -118,7 +118,7 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) link:Destroy() end end) diff --git a/integration/lovrManager/init.lua b/integration/lovrManager/init.lua index 6a64c98..bb86a8a 100644 --- a/integration/lovrManager/init.lua +++ b/integration/lovrManager/init.lua @@ -76,6 +76,13 @@ function multi:newSystemThread(name,func,...) GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread} GLOBAL["__THREAD_COUNT"] = THREAD_ID THREAD_ID=THREAD_ID+1 + + if self.isActor then + self:create(c) + else + multi.create(multi, c) + end + return c end THREAD.newSystemThread = multi.newSystemThread diff --git a/integration/lovrManager/threads.lua b/integration/lovrManager/threads.lua index 12429c4..dc919ab 100644 --- a/integration/lovrManager/threads.lua +++ b/integration/lovrManager/threads.lua @@ -156,7 +156,7 @@ function threads.getConsole() local c = {} c.queue = lovr.thread.getChannel("__CONSOLE__") function c.print(...) - c.queue:push{...} + c.queue:push(multi.pack(...)) end function c.error(err) c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} diff --git a/integration/pseudoManager/extensions.lua b/integration/pseudoManager/extensions.lua index bfdd982..7ad8d6c 100644 --- a/integration/pseudoManager/extensions.lua +++ b/integration/pseudoManager/extensions.lua @@ -96,7 +96,7 @@ function multi:newSystemThreadedJobQueue(n) end function c:pushJob(name,...) - table.insert(jobs,{name,jid,{...}}) + table.insert(jobs,{name,jid,multi.pack(...)}) jid = jid + 1 return jid-1 end @@ -121,7 +121,7 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) end end) return thread.hold(function() diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index f5bb84b..ba5f00b 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -22,8 +22,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] -function copy(obj) - if type(obj) ~= 'table' then return obj end +local function copy(obj) + if type(obj) ~= 'table' then return obj end local res = {} for k, v in pairs(obj) do res[copy(k)] = copy(v) end return res @@ -48,34 +48,19 @@ local multi, thread = require("multi"):init() -- Create on the thread that you want to interact with, send over the handle function multi:chop(obj) + if not _G["UIDS"] then + _G["UIDS"] = {} + end local multi, thread = require("multi"):init() local list = {[0] = multi.randomString(12)} _G[list[0]] = obj for i,v in pairs(obj) do - if type(v) == "function" then + if type(v) == "function" or type(v) == "table" and v.Type == multi.THREADEDFUNCTION then table.insert(list, i) - elseif type(v) == "table" and v.Type == multi.CONNECTOR then - v.getThreadID = function() -- Special function we are adding - return THREAD_ID - end - - v.getUniqueName = function(self) - return self.__link_name - end - - local l = multi:chop(v) - v.__link_name = l[0] - v.__name = i - - table.insert(list, {i, multi:newProxy(l):init()}) + elseif type(v) == "table" and v.Type == multi.CONNECTOR then + table.insert(list, {i, multi:newProxy(multi:chop(v)):init()}) end end - table.insert(list, "isConnection") - if obj.Type == multi.CONNECTOR then - obj.isConnection = function() return true end - else - obj.isConnection = function() return false end - end return list end @@ -85,44 +70,63 @@ function multi:newProxy(list) c.name = multi.randomString(12) c.is_init = false - - function c:init(proc_name) + local multi, thread = nil, nil + function c:init() local multi, thread = nil, nil if not(c.is_init) then c.is_init = true local multi, thread = require("multi"):init() + c.proxy_link = "PL" .. multi.randomString(12) + + if multi.integration then + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD + end + + GLOBAL[c.proxy_link] = c + local function check() return self.send:pop() end + self.send = multi:newSystemThreadedQueue(self.name.."_S"):init() self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init() self.funcs = list self._funcs = copy(list) self.Type = multi.PROXY self.TID = THREAD_ID - thread:newThread(function() + + thread:newThread("Proxy_Handler_" .. multi.randomString(4), function() while true do local data = thread.hold(check) if data then - local func = table.remove(data, 1) - local sref = table.remove(data, 1) - local ret - if sref then - ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} - else - ret = {_G[list[0]][func](multi.unpack(data))} - end - for i = 1,#ret do - if type(ret[i]) == "table" and getmetatable(ret[i]) then - setmetatable(ret[i],{}) -- remove that metatable, we do not need it on the other side! + -- Let's not hold the main threadloop + thread:newThread("Temp_Thread", function() + local func = table.remove(data, 1) + local sref = table.remove(data, 1) + local ret + + if sref then + ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} + else + ret = {_G[list[0]][func](multi.unpack(data))} end - if ret[i] == _G[list[0]] then - -- We cannot return itself, that return can contain bad values. - ret[i] = {_self_ref_ = true} + + for i = 1,#ret do + if type(ret[i]) == "table" and ret[i].Type ~= nil and ret[i].Type ~= multi.PROXY then + ret[i] = "\1PARENT_REF" + end + if type(ret[i]) == "table" and getmetatable(ret[i]) then + setmetatable(ret[i],nil) -- remove that metatable, we do not need it on the other side! + end + if ret[i] == _G[list[0]] then + -- We cannot return itself, that return can contain bad values. + ret[i] = "\1SELF_REF" + end end - end - table.insert(ret, 1, func) - self.recv:push(ret) + table.insert(ret, 1, func) + self.recv:push(ret) + end) end end end).OnError(multi.error) @@ -130,7 +134,7 @@ function multi:newProxy(list) else local multi, thread = require("multi"):init() local me = self - self.proc_name = proc_name + local funcs = copy(self.funcs) if multi.integration then GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD @@ -138,21 +142,13 @@ function multi:newProxy(list) self.send = THREAD.waitFor(self.name.."_S"):init() self.recv = THREAD.waitFor(self.name.."_R"):init() self.Type = multi.PROXY - for _,v in pairs(self.funcs) do + for _,v in pairs(funcs) do if type(v) == "table" then -- We have a connection v[2]:init(proc_name) - self["_"..v[1]] = v[2] + self[v[1]] = v[2] v[2].Parent = self setmetatable(v[2],getmetatable(multi:newConnection())) - self[v[1]] = multi:newConnection() - - thread:newThread(function() - while true do - local data = thread.hold(self["_"..v[1]]) - self[v[1]]:Fire(data) - end - end).OnError(multi.error) else self[v] = thread:newFunction(function(self,...) if self == me then @@ -166,8 +162,10 @@ function multi:newProxy(list) me.recv:pop() table.remove(data, 1) for i=1,#data do - if type(data[i]) == "table" and data[i]._self_ref_ then + if data[i] == "\1SELF_REF" then data[i] = me + elseif data[i] == "\1PARENT_REF" then + data[i] = me.Parent end end return multi.unpack(data) @@ -180,85 +178,31 @@ function multi:newProxy(list) end end function c:getTransferable() - local multi, thread = nil, nil local cp = {} + local multi, thread = require("multi"):init() + cp.is_init = true + cp.proxy_link = self.proxy_link cp.name = self.name cp.funcs = copy(self._funcs) - cp._funcs = copy(self._funcs) - cp.Type = self.Type - cp.init = self.init + cp.init = function(self) + local multi, thread = require("multi"):init() + if multi.integration then + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD + end + local proxy = THREAD.waitFor(self.proxy_link) + proxy.funcs = self.funcs + return proxy:init() + end return cp end + self:create(c) return c end local targets = {} local references = {} -local nFunc = 0 -function multi:newTargetedFunction(ID, proxy, name, func, holup) -- This registers with the queue - if type(name)=="function" then - holup = func - func = name - name = "JQ_TFunc_"..nFunc - end - nFunc = nFunc + 1 - - multi:executeOnProcess(proxy.proc_name, function(proc, name, func) - proc.jobqueue:registerFunction(name, func) - end, name, func) - - return thread:newFunction(function(...) - return multi:executeOnProcess(proxy.proc_name, function(proc, name, ID, ...) - local multi, thread = require("multi"):init() - local id = proc:pushJob(ID, name, ...) - local rets - local tjq = THREAD.get(proc.Name .. "_target_rtq_" .. ID):init() - return thread.hold(function() - local data = tjq:peek() - if data and data[1] == id then - tjq:pop() - table.remove(data, 1) - return multi.unpack(data) or multi.NIL - end - end) - -- proc.jobqueue.OnJobCompleted(function(jid, ...) - -- if id==jid then - -- rets = {...} - -- print("Got!") - -- end - -- end) - -- return thread.hold(function() - -- if rets then - -- return multi.unpack(rets) or multi.NIL - -- end - -- end) - end, name, ID, ...) - end, holup), name -end - -multi.executeOnProcess = thread:newFunction(function(self, name, func, ...) - local queue = THREAD.get(name .. "_local_proc") - local queueR = THREAD.get(name .. "_local_return") - if queue and queueR then - local multi, thread = require("multi"):init() - local id = multi.randomString(8) - queue = queue:init() - queueR = queueR:init() - queue:push({func, id, ...}) - return thread.hold(function() - local data = queueR:peek() - if data and data[1] == id then - queueR:pop() - table.remove(data, 1) - return multi.unpack(data) or multi.NIL - end - end) - else - return nil, "Unable to find a process queue with name: '" .. name .. "'" - end -end, true) - local jid = -1 function multi:newSystemThreadedProcessor(cores) @@ -279,69 +223,16 @@ function multi:newSystemThreadedProcessor(cores) c.OnObjectCreated = multi:newConnection() c.parent = self c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) - c.local_cmd = multi:newSystemThreadedQueue(name .. "_local_proc"):init() - c.local_cmd_return = multi:newSystemThreadedQueue(name .. "_local_return"):init() - - c.jobqueue:registerFunction("STP_enable_targets",function(name) - local multi, thread = require("multi"):init() - local qname = name .. "_tq_" .. THREAD_ID - local rqname = name .. "_rtq_" .. THREAD_ID - - local tjq = multi:newSystemThreadedQueue(qname):init() - local trq = multi:newSystemThreadedQueue(rqname):init() - multi:newThread("TargetedJobHandler", function() - local th - while true do - local dat = thread.hold(function() - return tjq:pop() - end) - if dat then - th = thread:newThread("JQ-TargetThread",function() - local name = table.remove(dat, 1) - local jid = table.remove(dat, 1) - local func = _G[name] - local args = table.remove(dat, 1) - th.OnError(function(self,err) - -- We want to pass this to the other calling thread incase - trq:push{jid, err} - end) - trq:push{jid, func(multi.unpack(args))} - end) - end - end - end).OnError(multi.error) - end) - - c.jobqueue:registerFunction("STP_GetThreadCount",function() - return _G["__THREADS"] - end) - - c.jobqueue:registerFunction("STP_GetTaskCount",function() - return _G["__TASKS"] - end) function c:pushJob(ID, name, ...) local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init() - tq:push{name, jid, {...}} + tq:push{name, jid, multi.pack(...)} jid = jid - 1 return jid + 1 end - c.jobqueue:doToAll(function(name) - STP_enable_targets(name) - _G["__THREADS"] = 0 - _G["__TASKS"] = 0 - end, name.."_target") - c.jobqueue:registerFunction("packObj",function(obj) local multi, thread = require("multi"):init() - obj.getThreadID = function() -- Special functions we are adding - return THREAD_ID - end - - obj.getUniqueName = function(self) - return self.__link_name - end local list = multi:chop(obj) obj.__link_name = list[0] @@ -354,14 +245,12 @@ function multi:newSystemThreadedProcessor(cores) c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...) local multi, thread = require("multi"):init() local obj = thread:newThread(name, func, ...) - _G["__THREADS"] = _G["__THREADS"] + 1 return packObj(obj) end, true) c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...) local multi, thread = require("multi"):init() local obj = multi[obj](multi, func, ...) - _G["__TASKS"] = _G["__TASKS"] + 1 return packObj(obj) end, true) @@ -372,12 +261,13 @@ function multi:newSystemThreadedProcessor(cores) "newEvent", "newAlarm", "newStep", - "newTStep" + "newTStep", + "newService" } for _, method in pairs(implement) do c[method] = function(self, ...) - proxy = self.spawnTask(method, ...):init(self.Name) + proxy = self.spawnTask(method, ...):init() references[proxy] = self return proxy end @@ -467,68 +357,5 @@ function multi:newSystemThreadedProcessor(cores) return loads end, true) - local check = function() - return c.local_cmd:pop() - end - thread:newThread(function() - while true do - local data = thread.hold(check) - if data then - thread:newThread(function() - local func = table.remove(data, 1) - local id = table.remove(data, 1) - local ret = {id, func(c, multi.unpack(data))} - c.local_cmd_return:push(ret) - end).OnError(multi.error) - end - end - end).OnError(multi.error) return c end - --- Modify thread.hold to handle proxies -local thread_ref = thread.hold -function thread.hold(n, opt) - if type(n) == "table" and n.Type == multi.PROXY and n.isConnection() then - local ready = false - local args - local id = n.getThreadID() - local name = n:getUniqueName() - local func = multi:newTargetedFunction(id, n, "conn_"..multi.randomString(8), function(_name) - local multi, thread = require("multi"):init() - local obj = _G[_name] - local rets = {thread.hold(obj)} - for i,v in pairs(rets) do - if v.Type then - rets[i] = {_self_ref_ = "parent"} - end - end - return multi.unpack(rets) - end) - - local conn - local args - handle = func(name) - conn = handle.OnReturn(function(...) - ready = true - args = {...} - handle.OnReturn:Unconnect(conn) - end) - - local ret = {thread_ref(function() - if ready then - return multi.unpack(args) or multi.NIL - end - end, opt)} - - for i,v in pairs(ret) do - if type(v) == "table" and v._self_ref_ == "parent" then - ret[i] = n.Parent - end - end - return multi.unpack(ret) - else - return thread_ref(n, opt) - end -end -