From 6fe10b22ab995edacd7e5d3adae3430374d4c128 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sat, 17 Jun 2023 21:33:34 -0400 Subject: [PATCH 1/5] Testing --- init.lua | 2 + integration/lanesManager/extensions.lua | 3 +- integration/lanesManager/init.lua | 1 - integration/sharedExtensions/init.lua | 190 +++++++++++++----------- 4 files changed, 107 insertions(+), 89 deletions(-) diff --git a/init.lua b/init.lua index 881f832..e434b3c 100644 --- a/init.lua +++ b/init.lua @@ -516,6 +516,8 @@ local function isolateFunction(func, env) end end +multi.isolateFunction = isolateFunction + function multi:Break() self:Pause() self.Active=nil diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index 6a98121..073fe6d 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -344,4 +344,5 @@ function multi:newSystemThreadedConnection(name) end return c -end \ No newline at end of file +end +require("multi.integration.sharedExtensions") \ No newline at end of file diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index 547c7c6..e22431e 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -185,7 +185,6 @@ multi.integration = {} -- for module creators multi.integration.GLOBAL = GLOBAL multi.integration.THREAD = THREAD require("multi.integration.lanesManager.extensions") -require("multi.integration.sharedExtensions") return { init = function() return GLOBAL, THREAD diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 33ab91c..8fc99a3 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -1,4 +1,4 @@ ---[[ +--[[ todo finish the targeted job! MIT License Copyright (c) 2023 Ryan Ward @@ -22,6 +22,26 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] +function copy(obj) + if type(obj) ~= 'table' then return obj end + local res = {} + for k, v in pairs(obj) do res[copy(k)] = copy(v) end + return res +end + +function tprint (tbl, indent) + if not indent then indent = 0 end + for k, v in pairs(tbl) do + formatting = string.rep(" ", indent) .. k .. ": " + if type(v) == "table" then + print(formatting) + tprint(v, indent+1) + else + print(formatting .. tostring(v)) + end + end + end + local multi, thread = require("multi"):init() -- Returns a handler that allows a user to interact with an object on another thread! @@ -64,10 +84,12 @@ function multi:newProxy(list) local c = {} c.name = multi.randomString(12) + c.is_init = false - function c:init() + function c:init(proc_name) local multi, thread = nil, nil - if THREAD_ID>0 then + if not(c.is_init) then + c.is_init = true local multi, thread = require("multi"):init() local function check() return self.send:pop() @@ -75,7 +97,9 @@ function multi:newProxy(list) self.send = multi:newSystemThreadedQueue(self.name.."_S"):init() self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init() self.funcs = list - self.conns = list[-1] + self._funcs = copy(list) + self.Type = multi.PROXY + self.TID = THREAD_ID thread:newThread(function() while true do local data = thread.hold(check) @@ -104,25 +128,31 @@ function multi:newProxy(list) end).OnError(print) return self else + print("INIT IN",THREAD_NAME) local multi, thread = require("multi"):init() local me = self - GLOBAL = multi.integration.GLOBAL - THREAD = multi.integration.THREAD - self.send = THREAD.waitFor(self.name.."_S") - self.recv = THREAD.waitFor(self.name.."_R") + self.proc_name = proc_name + if multi.integration then + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD + end + self.send = THREAD.waitFor(self.name.."_S"):init() + self.recv = THREAD.waitFor(self.name.."_R"):init() self.Type = multi.PROXY for _,v in pairs(self.funcs) do if type(v) == "table" then -- We have a connection - v[2]:init() + print("Init Conn",v[1],THREAD_NAME) + v[2]:init(proc_name) self["_"..v[1]] = v[2] v[2].Parent = self setmetatable(v[2],getmetatable(multi:newConnection())) self[v[1]] = multi:newConnection() thread:newThread(function() + print("HOLD:","_"..v[1],self["_"..v[1]].Type) while true do - self[v[1]]:Fire(thread.hold(alarm["_"..v[1]])) + self[v[1]]:Fire(thread.hold(self["_"..v[1]])) end end) else @@ -151,13 +181,24 @@ function multi:newProxy(list) return self end end + function c:getTransferable() + local multi, thread = require("multi"):init() + local cp = {} + cp.name = self.name + cp.funcs = copy(self._funcs) + cp._funcs = copy(self._funcs) + cp.Type = self.Type + cp.init = self.init + return cp + end return c end local targets = {} +local references = {} local nFunc = 0 -function multi:newTargetedFunction(ID, proc, name, func, holup) -- This registers with the queue +function multi:newTargetedFunction(ID, name, func, holup) -- This registers with the queue if type(name)=="function" then holup = func func = name @@ -181,6 +222,12 @@ function multi:newTargetedFunction(ID, proc, name, func, holup) -- This register end) end, holup), name end +-- local qname = name .. "_tq_" .. THREAD_ID +-- local rqname = name .. "_rtq_" .. THREAD_ID + +local function getQueue(name) + return THREAD.waitFor(name):init() +end local jid = -1 function multi:newSystemThreadedProcessor(cores) @@ -202,27 +249,30 @@ function multi:newSystemThreadedProcessor(cores) c.OnObjectCreated = multi:newConnection() c.parent = self c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) - c.targetedQueue = multi:newSystemThreadedQueue(name.."_target"):init() c.jobqueue:registerFunction("STP_enable_targets",function(name) local multi, thread = require("multi"):init() - local qname = THREAD_NAME .. "_t_queue" - local targetedQueue = THREAD.waitFor(name):init() + local qname = name .. "_tq_" .. THREAD_ID + local rqname = name .. "_rtq_" .. THREAD_ID local tjq = multi:newSystemThreadedQueue(qname):init() - targetedQueue:push({tonumber(THREAD_ID), qname}) multi:newThread("TargetedJobHandler", function() - local queueReturn = _G["__QR"] + local th while true do local dat = thread.hold(function() return tjq:pop() end) if dat then - thread:newThread("JQ-TargetThread",function() + th = thread:newThread("JQ-TargetThread",function() local name = table.remove(dat, 1) local jid = table.remove(dat, 1) + local func = table.remove(dat, 1) local args = table.remove(dat, 1) - queueReturn:push{jid, _G[name](multi.unpack(args)), queue} - end).OnError(multi.error) + th.OnError(function(self,err) + -- We want to pass this to the other calling thread incase + rqname:push{jid, err} + end) + rqname:push{jid, func(multi.unpack(args))} + end) end end end).OnError(multi.error) @@ -248,16 +298,6 @@ function multi:newSystemThreadedProcessor(cores) _G["__TASKS"] = 0 end, name.."_target") - local count = 0 - while count < c.cores do - local dat = c.targetedQueue:pop() - if dat then - targets[dat[1]] = multi.integration.THREAD.waitFor(dat[2]):init() - table.insert(c.proc_list, dat[1]) -- Add thread_id to proc list - count = count + 1 - end - end - c.jobqueue:registerFunction("packObj",function(obj) local multi, thread = require("multi"):init() obj.getThreadID = function() -- Special functions we are adding @@ -290,46 +330,42 @@ function multi:newSystemThreadedProcessor(cores) return packObj(obj) end, true) - function c:newLoop(func, notime) - proxy = self.spawnTask("newLoop", func, notime):init() - proxy.__proc = self + local implement = { + "newLoop", + "newTLoop", + "newUpdater", + "newEvent", + "newAlarm", + "newStep", + "newTStep" + } + + for _, method in pairs(implement) do + c[method] = function(self, ...) + proxy = self.spawnTask(method, ...):init(self.Name) + references[proxy] = self + return proxy + end + end + + function c:newThread(name, func, ...) + proxy = self.spawnThread(name, func, ...):init(self.Name) + references[proxy] = self + table.insert(self.threads, proxy) return proxy end - function c:newTLoop(func, time) - proxy = self.spawnTask("newTLoop", func, time):init() - proxy.__proc = self - return proxy + function c:newFunction(func, holdme) + return c.jobqueue:newFunction(func, holdme) end - function c:newUpdater(skip, func) - proxy = self.spawnTask("newUpdater", func, notime):init() - proxy.__proc = self - return proxy - end - - function c:newEvent(task, func) - proxy = self.spawnTask("newEvent", task, func):init() - proxy.__proc = self - return proxy - end - - function c:newAlarm(set, func) - proxy = self.spawnTask("newAlarm", set, func):init() - proxy.__proc = self - return proxy - end - - function c:newStep(start, reset, count, skip) - proxy = self.spawnTask("newStep", start, reset, count, skip):init() - proxy.__proc = self - return proxy - end - - function c:newTStep(start ,reset, count, set) - proxy = self.spawnTask("newTStep", start, reset, count, set):init() - proxy.__proc = self - return proxy + function c:newSharedTable(name) + if not name then multi.error("You must provide a name when creating a table!") end + local tbl_name = "TABLE_"..multi.randomString(8) + c.jobqueue:doToAll(function(tbl_name, interaction) + _G[interaction] = THREAD.waitFor(tbl_name):init() + end, tbl_name, name) + return multi:newSystemThreadedTable(tbl_name):init() end function c:getHandler() @@ -348,26 +384,6 @@ function multi:newSystemThreadedProcessor(cores) return self.Name end - function c:newThread(name, func, ...) - proxy = self.spawnThread(name, func, ...):init() - proxy.__proc = self - table.insert(self.threads, proxy) - return proxy - end - - function c:newFunction(func, holdme) - return c.jobqueue:newFunction(func, holdme) - end - - function c:newSharedTable(name) - if not name then multi.error("You must provide a name when creating a table!") end - local tbl_name = "TABLE_"..multi.randomString(8) - c.jobqueue:doToAll(function(tbl_name, interaction) - _G[interaction] = THREAD.waitFor(tbl_name):init() - end, tbl_name, name) - return multi:newSystemThreadedTable(tbl_name):init() - end - function c.run() return self end @@ -428,7 +444,7 @@ function thread.hold(n, opt) local args local id = n.getThreadID() local name = n:getUniqueName() - local func = multi:newTargetedFunction(id, n.Parent.__proc, "conn_"..multi.randomString(8), function(_name) + local func = multi:newTargetedFunction(id, references[n.Parent], "conn_"..multi.randomString(8), function(_name) local multi, thread = require("multi"):init() local obj = _G[_name] local rets = {thread.hold(obj)} @@ -437,7 +453,7 @@ function thread.hold(n, opt) rets[i] = {_self_ref_ = "parent"} end end - return unpack(rets) + return multi.unpack(rets) end) local conn @@ -460,7 +476,7 @@ function thread.hold(n, opt) end end - return unpack(ret) + return multi.unpack(ret) else return thread_ref(n, opt) end -- 2.43.0 From 1b3e3303e9095345169a91d59f5239544ab2d48e Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sun, 18 Jun 2023 00:08:51 -0400 Subject: [PATCH 2/5] Really close to portable proxies, currently extreamly unstable! --- integration/sharedExtensions/init.lua | 141 +++++++++++++++++++------- 1 file changed, 103 insertions(+), 38 deletions(-) diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 8fc99a3..bd623d5 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -128,7 +128,6 @@ function multi:newProxy(list) end).OnError(print) return self else - print("INIT IN",THREAD_NAME) local multi, thread = require("multi"):init() local me = self self.proc_name = proc_name @@ -142,7 +141,6 @@ function multi:newProxy(list) for _,v in pairs(self.funcs) do if type(v) == "table" then -- We have a connection - print("Init Conn",v[1],THREAD_NAME) v[2]:init(proc_name) self["_"..v[1]] = v[2] v[2].Parent = self @@ -150,11 +148,11 @@ function multi:newProxy(list) self[v[1]] = multi:newConnection() thread:newThread(function() - print("HOLD:","_"..v[1],self["_"..v[1]].Type) while true do - self[v[1]]:Fire(thread.hold(self["_"..v[1]])) + local data = thread.hold(self["_"..v[1]]) + self[v[1]]:Fire(data) end - end) + end).OnError(multi.error) else self[v] = thread:newFunction(function(self,...) if self == me then @@ -198,36 +196,72 @@ local targets = {} local references = {} local nFunc = 0 -function multi:newTargetedFunction(ID, name, func, holup) -- This registers with the queue +function multi:newTargetedFunction(ID, proxy, name, func, holup) -- This registers with the queue if type(name)=="function" then holup = func func = name name = "JQ_TFunc_"..nFunc end nFunc = nFunc + 1 - proc.jobqueue:registerFunction(name, func) + + multi:executeOnProcess(proxy.proc_name, function(proc, name, func) + proc.jobqueue:registerFunction(name, func) + end, name, func) + return thread:newFunction(function(...) - local id = proc:pushJob(ID, name, ...) - local link - local rets - link = proc.jobqueue.OnJobCompleted(function(jid,...) - if id==jid then - rets = {...} - end - end) - return thread.hold(function() - if rets then - return multi.unpack(rets) or multi.NIL - end - end) + return multi:executeOnProcess(proxy.proc_name, function(proc, name, ID, ...) + local multi, thread = require("multi"):init() + local id = proc:pushJob(ID, name, ...) + local rets + local tjq = THREAD.get(proc.Name .. "_target_rtq_" .. ID):init() + return thread.hold(function() + local data = tjq:peek() + if data then + print(data) + end + if data and data[1] == id then + print("Got it sigh") + tjq:pop() + table.remove(data, 1) + return multi.unpack(data) or multi.NIL + end + end) + -- proc.jobqueue.OnJobCompleted(function(jid, ...) + -- if id==jid then + -- rets = {...} + -- print("Got!") + -- end + -- end) + -- return thread.hold(function() + -- if rets then + -- return multi.unpack(rets) or multi.NIL + -- end + -- end) + end, name, ID, ...) end, holup), name end --- local qname = name .. "_tq_" .. THREAD_ID --- local rqname = name .. "_rtq_" .. THREAD_ID -local function getQueue(name) - return THREAD.waitFor(name):init() -end +multi.executeOnProcess = thread:newFunction(function(self, name, func, ...) + local queue = THREAD.get(name .. "_local_proc") + local queueR = THREAD.get(name .. "_local_return") + if queue and queueR then + local multi, thread = require("multi"):init() + local id = multi.randomString(8) + queue = queue:init() + queueR = queueR:init() + queue:push({func, id, ...}) + return thread.hold(function() + local data = queueR:peek() + if data and data[1] == id then + queueR:pop() + table.remove(data, 1) + return multi.unpack(data) or multi.NIL + end + end) + else + return nil, "Unable to find a process queue with name: '" .. name .. "'" + end +end, true) local jid = -1 function multi:newSystemThreadedProcessor(cores) @@ -249,12 +283,16 @@ function multi:newSystemThreadedProcessor(cores) c.OnObjectCreated = multi:newConnection() c.parent = self c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) + c.local_cmd = multi:newSystemThreadedQueue(name .. "_local_proc"):init() + c.local_cmd_return = multi:newSystemThreadedQueue(name .. "_local_return"):init() c.jobqueue:registerFunction("STP_enable_targets",function(name) local multi, thread = require("multi"):init() local qname = name .. "_tq_" .. THREAD_ID local rqname = name .. "_rtq_" .. THREAD_ID + local tjq = multi:newSystemThreadedQueue(qname):init() + local trq = multi:newSystemThreadedQueue(rqname):init() multi:newThread("TargetedJobHandler", function() local th while true do @@ -269,13 +307,13 @@ function multi:newSystemThreadedProcessor(cores) local args = table.remove(dat, 1) th.OnError(function(self,err) -- We want to pass this to the other calling thread incase - rqname:push{jid, err} + trq:push{jid, err} end) - rqname:push{jid, func(multi.unpack(args))} + trq:push{jid, func(multi.unpack(args))} end) end end - end).OnError(multi.error) + end).OnError(print) end) c.jobqueue:registerFunction("STP_GetThreadCount",function() @@ -287,7 +325,10 @@ function multi:newSystemThreadedProcessor(cores) end) function c:pushJob(ID, name, ...) - targets[ID]:push{name, jid, {...}} + print("pushing") + local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init() + --targets[ID]:push{name, jid, {...}} + tq:push{name, jid, {...}} jid = jid - 1 return jid + 1 end @@ -432,37 +473,59 @@ function multi:newSystemThreadedProcessor(cores) return loads end, true) + local check = function() + return c.local_cmd:pop() + end + thread:newThread(function() + while true do + local data = thread.hold(check) + if data then + thread:newThread(function() + local func = table.remove(data, 1) + local id = table.remove(data, 1) + local ret = {id, func(c, multi.unpack(data))} + c.local_cmd_return:push(ret) + end).OnError(multi.error) + end + end + end).OnError(multi.error) + return c end -- Modify thread.hold to handle proxies local thread_ref = thread.hold function thread.hold(n, opt) - --if type(n) == "table" then print(n.Type, n.isConnection()) end if type(n) == "table" and n.Type == multi.PROXY and n.isConnection() then local ready = false local args local id = n.getThreadID() local name = n:getUniqueName() - local func = multi:newTargetedFunction(id, references[n.Parent], "conn_"..multi.randomString(8), function(_name) + print(id, name) + local func = multi:newTargetedFunction(id, n, "conn_"..multi.randomString(8), function(_name) local multi, thread = require("multi"):init() local obj = _G[_name] + print("Start") local rets = {thread.hold(obj)} + print("Ring ;)") for i,v in pairs(rets) do if v.Type then rets[i] = {_self_ref_ = "parent"} end end return multi.unpack(rets) - end) + end, true) local conn - local handle = func(name) - conn = handle.OnReturn(function(...) - ready = true - args = {...} - handle.OnReturn:Unconnect(conn) - end) + local args = {func(name)} + -- conn = handle.OnReturn(function(...) + -- ready = true + -- args = {...} + -- for i,v in pairs(args) do + -- print("DATA",i,v) + -- end + -- handle.OnReturn:Unconnect(conn) + -- end) local ret = {thread_ref(function() if ready then @@ -471,7 +534,9 @@ function thread.hold(n, opt) end, opt)} for i,v in pairs(ret) do + print("OBJECT",v.Type) if type(v) == "table" and v._self_ref_ == "parent" then + print("assign") ret[i] = n.Parent end end -- 2.43.0 From c80f44c68ec10cce64941a6c23bc68b4fd9927fc Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Mon, 19 Jun 2023 00:24:07 -0400 Subject: [PATCH 3/5] Debugging what is going on... --- init.lua | 13 ++++++++----- integration/lanesManager/extensions.lua | 8 ++++---- integration/lanesManager/init.lua | 2 +- integration/loveManager/extensions.lua | 4 ++-- integration/sharedExtensions/init.lua | 9 ++++++--- 5 files changed, 21 insertions(+), 15 deletions(-) diff --git a/init.lua b/init.lua index e434b3c..1667819 100644 --- a/init.lua +++ b/init.lua @@ -1113,7 +1113,7 @@ function multi:newProcessor(name, nothread) function c:newFunction(func, holdme) return thread:newFunctionBase(function(...) - return c:newThread("Threaded Function Handler", func, ...) + return c:newThread("Process Threaded Function Handler", func, ...) end, holdme)() end @@ -1451,7 +1451,9 @@ end function thread:newFunction(func, holdme) return thread:newFunctionBase(function(...) - return thread:newThread("Threaded Function Handler", func, ...) + local th = thread:newThread("Free Threaded Function Handler", func, ...) + th.creator = debug.getinfo(2).name + return th end, holdme)() end @@ -1485,7 +1487,7 @@ function thread:newProcessor(name) function proc:newFunction(func, holdme) return thread:newFunctionBase(function(...) - return thread_proc:newThread("Threaded Function Handler", func, ...) + return thread_proc:newThread("TProc Threaded Function Handler", func, ...) end, holdme)() end @@ -1774,6 +1776,7 @@ co_status = { if _ then ref.OnDeath:Fire(ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) else + print("Thread: ", ref.Name, ref.creator, THREAD_NAME) ref.OnError:Fire(ref,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) end if i then @@ -2346,8 +2349,8 @@ end function multi.error(self, err) if type(err) == "bool" then crash = err end if type(self) == "string" then err = self end - io.write("\x1b[91mERROR:\x1b[0m " .. err .. "\n") - error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type) + io.write("\x1b[91mERROR:\x1b[0m " .. err .. " " .. debug.getinfo(2).name .."\n") + error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type .. "\n" .. debug.traceback().."\n") if multi.defaultSettings.error then os.exit(1) end diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index 073fe6d..94b98c1 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -174,7 +174,7 @@ function multi:newSystemThreadedJobQueue(n) queueReturn:push{jid, funcs[name](multi.unpack(args)), queue} end).OnError(multi.error) end - end).OnError(print) + end).OnError(multi.error) thread:newThread("DoAllHandler",function() while true do local dat = thread.hold(function() @@ -190,7 +190,7 @@ function multi:newSystemThreadedJobQueue(n) end end end - end).OnError(print) + end).OnError(multi.error) thread:newThread("IdleHandler",function() while true do thread.hold(function() @@ -198,9 +198,9 @@ function multi:newSystemThreadedJobQueue(n) end) THREAD.sleep(.01) end - end).OnError(print) + end).OnError(multi.error) multi:mainloop() - end,i).OnError(print) + end,i).OnError(multi.error) end return c end diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index e22431e..147488a 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -177,7 +177,7 @@ function multi.InitSystemThreadErrorHandler() end end end - end).OnError(print) + end).OnError(multi.error) end multi.print("Integrated Lanes Threading!") diff --git a/integration/loveManager/extensions.lua b/integration/loveManager/extensions.lua index 8740018..992a10b 100644 --- a/integration/loveManager/extensions.lua +++ b/integration/loveManager/extensions.lua @@ -271,7 +271,7 @@ function multi:newSystemThreadedConnection(name) -- This shouldn't be the case end end - end).OnError(print) + end).OnError(multi.error) return self end @@ -346,7 +346,7 @@ function multi:newSystemThreadedConnection(name) c.proxy_conn:Fire(multi.unpack(item[2])) end end - end).OnError(print) + end).OnError(multi.error) --- ^^^ This will only exist in the init thread THREAD.package(name,c) diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index bd623d5..48800f3 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -125,7 +125,7 @@ function multi:newProxy(list) self.recv:push(ret) end end - end).OnError(print) + end).OnError(multi.error) return self else local multi, thread = require("multi"):init() @@ -300,10 +300,13 @@ function multi:newSystemThreadedProcessor(cores) return tjq:pop() end) if dat then + for i,v in pairs(dat) do + print(i,v) + end th = thread:newThread("JQ-TargetThread",function() local name = table.remove(dat, 1) local jid = table.remove(dat, 1) - local func = table.remove(dat, 1) + local func = _G[name] local args = table.remove(dat, 1) th.OnError(function(self,err) -- We want to pass this to the other calling thread incase @@ -313,7 +316,7 @@ function multi:newSystemThreadedProcessor(cores) end) end end - end).OnError(print) + end).OnError(multi.error) end) c.jobqueue:registerFunction("STP_GetThreadCount",function() -- 2.43.0 From c39aa229f83616c2ea439974906bfffcb1190423 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Tue, 20 Jun 2023 00:05:12 -0400 Subject: [PATCH 4/5] Fixed critical issue with coroutine based threads --- docs/changes.md | 1 + init.lua | 23 ++++++++++------- integration/sharedExtensions/init.lua | 36 +++++++-------------------- 3 files changed, 24 insertions(+), 36 deletions(-) diff --git a/docs/changes.md b/docs/changes.md index 44a30ca..f27f5fe 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -449,6 +449,7 @@ Removed Fixed --- +- Issue with coroutine based threads where they weren't all being scheduled due to a bad for loop. Replaced with a while to ensure all threads are consumed properly. If a thread created a thread that created a thread that may or may not be on the same process, things got messed up due to the original function not being built with these abstractions in mind. - Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function. - Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things. - multi.isMainThread was not properly handled in each integration. This has been resolved. diff --git a/init.lua b/init.lua index 1667819..d67cdb2 100644 --- a/init.lua +++ b/init.lua @@ -352,7 +352,7 @@ function multi:newConnection(protect,func,kill) for i=1,#fast do local suc, err = pcall(fast[i], ...) if not suc then - print(err) + multi.error(err) end if kill then table.insert(kills,i) @@ -1393,7 +1393,6 @@ function thread:newFunctionBase(generator, holdme) else while not rets and not err do multi:getCurrentProcess():getHandler()() - multi:getHandler()() end local g = rets rets = nil @@ -1529,9 +1528,9 @@ end function thread:newThread(name, func, ...) multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called - local func = func or name - if func == name then - name = name or multi.randomString(16) + if type(name) == "function" then + func = name + name = "UnnamedThread_"..multi.randomString(16) end local c={nil,nil,nil,nil,nil,nil,nil} c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} @@ -1776,7 +1775,6 @@ co_status = { if _ then ref.OnDeath:Fire(ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) else - print("Thread: ", ref.Name, ref.creator, THREAD_NAME) ref.OnError:Fire(ref,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) end if i then @@ -1799,9 +1797,16 @@ function multi:createHandler() return coroutine.wrap(function() local temp_start while true do - for start = #startme, 1, -1 do - temp_start = startme[start] - table.remove(startme) + -- for start = #startme, 1, -1 do + -- temp_start = startme[start] + -- table.remove(startme) + -- _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) + -- co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads) + -- table.insert(threads, temp_start) + -- yield() + -- end + while #startme>0 do + temp_start = table.remove(startme) _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads) table.insert(threads, temp_start) diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 48800f3..f5bb84b 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -180,7 +180,7 @@ function multi:newProxy(list) end end function c:getTransferable() - local multi, thread = require("multi"):init() + local multi, thread = nil, nil local cp = {} cp.name = self.name cp.funcs = copy(self._funcs) @@ -216,11 +216,7 @@ function multi:newTargetedFunction(ID, proxy, name, func, holup) -- This registe local tjq = THREAD.get(proc.Name .. "_target_rtq_" .. ID):init() return thread.hold(function() local data = tjq:peek() - if data then - print(data) - end if data and data[1] == id then - print("Got it sigh") tjq:pop() table.remove(data, 1) return multi.unpack(data) or multi.NIL @@ -300,9 +296,6 @@ function multi:newSystemThreadedProcessor(cores) return tjq:pop() end) if dat then - for i,v in pairs(dat) do - print(i,v) - end th = thread:newThread("JQ-TargetThread",function() local name = table.remove(dat, 1) local jid = table.remove(dat, 1) @@ -328,9 +321,7 @@ function multi:newSystemThreadedProcessor(cores) end) function c:pushJob(ID, name, ...) - print("pushing") local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init() - --targets[ID]:push{name, jid, {...}} tq:push{name, jid, {...}} jid = jid - 1 return jid + 1 @@ -492,7 +483,6 @@ function multi:newSystemThreadedProcessor(cores) end end end).OnError(multi.error) - return c end @@ -504,31 +494,26 @@ function thread.hold(n, opt) local args local id = n.getThreadID() local name = n:getUniqueName() - print(id, name) local func = multi:newTargetedFunction(id, n, "conn_"..multi.randomString(8), function(_name) local multi, thread = require("multi"):init() local obj = _G[_name] - print("Start") local rets = {thread.hold(obj)} - print("Ring ;)") for i,v in pairs(rets) do if v.Type then rets[i] = {_self_ref_ = "parent"} end end return multi.unpack(rets) - end, true) + end) local conn - local args = {func(name)} - -- conn = handle.OnReturn(function(...) - -- ready = true - -- args = {...} - -- for i,v in pairs(args) do - -- print("DATA",i,v) - -- end - -- handle.OnReturn:Unconnect(conn) - -- end) + local args + handle = func(name) + conn = handle.OnReturn(function(...) + ready = true + args = {...} + handle.OnReturn:Unconnect(conn) + end) local ret = {thread_ref(function() if ready then @@ -537,13 +522,10 @@ function thread.hold(n, opt) end, opt)} for i,v in pairs(ret) do - print("OBJECT",v.Type) if type(v) == "table" and v._self_ref_ == "parent" then - print("assign") ret[i] = n.Parent end end - return multi.unpack(ret) else return thread_ref(n, opt) -- 2.43.0 From 660c10ec3b3ba175794fe8d7a3d052b4bc618472 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sun, 25 Jun 2023 21:46:37 -0400 Subject: [PATCH 5/5] Removed extra bloat, proxies are portable now! --- docs/changes.md | 125 +++++---- init.lua | 155 ++++++----- integration/lanesManager/extensions.lua | 16 +- integration/lanesManager/init.lua | 20 ++ integration/lanesManager/threads.lua | 4 +- integration/loveManager/extensions.lua | 6 +- integration/loveManager/init.lua | 7 + integration/loveManager/threads.lua | 4 +- integration/lovrManager/extensions.lua | 2 +- integration/lovrManager/init.lua | 7 + integration/lovrManager/threads.lua | 2 +- integration/pseudoManager/extensions.lua | 4 +- integration/sharedExtensions/init.lua | 315 +++++------------------ 13 files changed, 293 insertions(+), 374 deletions(-) diff --git a/docs/changes.md b/docs/changes.md index f27f5fe..9bb4824 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -74,6 +74,7 @@ Allows the user to have multi auto set priorities (Requires chronos). Also adds Added --- +- thread.hold will now use a custom hold method for objects with a `Hold` method. This is called like `obj:Hold(opt)`. The only argument passed is the optional options table that thread.hold can pass. There is an exception for connection objects. While they do contain a Hold method, the Hold method isn't used and is there for proxy objects, though they can be used in non proxy/thread situations. Hold returns all the arguments that the connection object was fired with. - shared_table = STP:newSharedTable(tbl_name) -- Allows you to create a shared table that all system threads in a process have access to. Returns a reference to that table for use on the main thread. Sets `_G[tbl_name]` on the system threads so you can access it there. ```lua package.path = "?/init.lua;?.lua;"..package.path @@ -119,10 +120,6 @@ Added STJQ_cPXT8GOx We work! We work!!! ``` -- STP:getLoad(type) -- returns a table where the index is the threadID and the value is the number of objects[type] running on that thread. `type`: "threads" for coroutines running or nil for all other objects running. -- multi:newTargetedFunction(ID, proc, name, func, holup) -- This is used internally to handle thread.hold(proxy.conn) -- proxy.getThreadID() -- Returns the threadID of the thread that the proxy is running in -- proxy:getUniqueName() -- Gets the special name that identifies the object on the thread the proxy refers to - multi:chop(obj) -- We cannot directly interact with a local object on lanes, so we chop the object and set some globals on the thread side. Should use like: `mulit:newProxy(multi:chop(multi:newThread(function() ... end)))` - multi:newProxy(ChoppedObject) -- Creates a proxy object that allows you to interact with an object on a thread @@ -148,67 +145,105 @@ Added ``` Internally the SystemThreadedProcessor uses a JobQueue to handle things. The proxy function allows you to interact with these objects as if they were on the main thread, though there actions are carried out on the main thread. - Connection proxies break the rules a bit. Normally methods should always work on the thread side, however for connections in order to have actions work on the thread side you would call the connection using `obj._connName` instead of calling `obj.connName`. This allows you to have more control over connection events. See example below: + Proxies can also be shared between threads, just remember to use proxy:getTransferable() before transferring and proxy:init() on the other end. (We need to avoid copying over coroutines) + + The work done with proxies negates the usage of multi:newSystemThreadedConnection(), the only difference is you lose the metatables from connections. + + You cannot connect directly to a proxy connection on the non proxy thread, you can however use proxy_conn:Hold() or thread.hold(proxy_conn) to emulate this, see below. + ```lua package.path = "?/init.lua;?.lua;"..package.path - multi, thread = require("multi"):init({print=true}) + multi, thread = require("multi"):init({print=true, warn=true, error=true}) THREAD, GLOBAL = require("multi.integration.lanesManager"):init() stp = multi:newSystemThreadedProcessor(8) - alarm = stp:newAlarm(3) + tloop = stp:newTLoop(nil, 1) - -- This doesn't work since this event has already been subscribed to internally on the thread to get thread.hold(alarm.OnRing) to work. But as many events to alarm.OnRing can be made! - thread:newThread(function() - print("Hold on proxied connection", thread.hold(alarm._OnRing)) - end) + multi:newSystemThread("Testing proxy copy",function(tloop) + local function tprint (tbl, indent) + if not indent then indent = 0 end + for k, v in pairs(tbl) do + formatting = string.rep(" ", indent) .. k .. ": " + if type(v) == "table" then + print(formatting) + tprint(v, indent+1) + else + print(formatting .. tostring(v)) + end + end + end + local multi, thread = require("multi"):init() + tloop = tloop:init() + print("tloop type:",tloop.Type) + print("Testing proxies on other threads") + thread:newThread(function() + while true do + thread.hold(tloop.OnLoop) + print(THREAD_NAME,"Loopy") + end + end) + tloop.OnLoop(function(a) + print(THREAD_NAME, "Got loop...") + end) + multi:mainloop() + end, tloop:getTransferable()).OnError(multi.error) - alarm.OnRing(function(a) - print("OnRing",a, THREAD_NAME, THREAD_ID) - end) - - print("alarm.OnRing", alarm.OnRing.Type) - print("alarm._OnRing", alarm._OnRing.Type) + print("tloop", tloop.Type) thread:newThread(function() - print("Hold on proxied no proxy connection", thread.hold(alarm.OnRing)) + print("Holding...") + thread.hold(tloop.OnLoop) + print("Held on proxied no proxy connection 1") + end).OnError(print) + + thread:newThread(function() + tloop.OnLoop:Hold() + print("held on proxied no proxy connection 2") + end) + + tloop.OnLoop(function() + print("OnLoop",THREAD_NAME) end) thread:newThread(function() - print("Hold on proxied no proxy connection", thread.hold(alarm.OnRing)) - end) - - -- This doesn't work since this event has already been subscribed to internally on the thread to get thread.hold(alarm.OnRing) to work. But as many events to alarm.OnRing can be made! - thread:newThread(function() - print("Hold on proxied connection", thread.hold(alarm._OnRing)) - end) - - alarm._OnRing(function(a) - print("_OnRing",a, THREAD_NAME, THREAD_ID) - a:Reset(1) - end) + while true do + tloop.OnLoop:Hold() + print("OnLoop",THREAD_NAME) + end + end).OnError(multi.error) multi:mainloop() ``` Output: ``` - INFO: Integrated Lanes Threading! - alarm.OnRing connector - alarm._OnRing proxy - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 <-- This can change each time you run this example! - OnRing table: 018BC0C0 MAIN_THREAD 0 - Hold on proxied no proxy connection table: 018BC0C0 nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil - Hold on proxied no proxy connection table: 018BC0C0 nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 - OnRing table: 018BC0C0 MAIN_THREAD 0 - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 - OnRing table: 018BC0C0 MAIN_THREAD 0 + INFO: Integrated Lanes Threading! 1 + tloop proxy + Holding... + tloop type: proxy + Testing proxies on other threads + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... + OnLoop MAIN_THREAD + Testing proxy copy Loopy + Held on proxied no proxy connection 1 + held on proxied no proxy connection 2 + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... + Testing proxy copy Loopy + OnLoop MAIN_THREAD + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... - ... (Will repeat ever second now) - _OnRing table: 025EB128 STJQ_cjKsEZHg 1 + ... (Will repeat every second) - OnRing table: 018BC0C0 MAIN_THREAD 0 + Testing proxy copy Loopy + OnLoop MAIN_THREAD + OnLoop STJQ_W9SZGB6Y + STJQ_W9SZGB6Y Got loop... + + ... ``` The proxy version can only subscribe to events on the proxy thread, which means that connection metamethods will not work with the proxy version (`_OnRing` on the non proxy thread side), but the (`OnRing`) version will work. Cleverly handling the proxy thread and the non proxy thread will allow powerful connection logic. Also this is not a full system threaded connection. **Proxies should only be used between 2 threads!** To keep things fast I'm using simple queues to transfer data. There is no guarantee that things will work! @@ -222,6 +257,7 @@ Added - proxyStep = STP:newStep(...) - proxyTStep = STP:newTStep(...) - proxyThread = STP:newThread(...) + - proxyService = STP:newService(...) - threadedFunction = STP:newFunction(...) Unique: @@ -449,6 +485,7 @@ Removed Fixed --- +- Issue with luajit w/5.2 compat breaking with coroutine.running(), fixed the script to properly handle so thread.isThread() returns as expected! - Issue with coroutine based threads where they weren't all being scheduled due to a bad for loop. Replaced with a while to ensure all threads are consumed properly. If a thread created a thread that created a thread that may or may not be on the same process, things got messed up due to the original function not being built with these abstractions in mind. - Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function. - Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things. diff --git a/init.lua b/init.lua index d67cdb2..7f3d0ec 100644 --- a/init.lua +++ b/init.lua @@ -33,6 +33,7 @@ local threadManager local __CurrentConnectionThread multi.unpack = table.unpack or unpack +multi.pack = table.pack or function(...) return {...} end if table.unpack then unpack = table.unpack end @@ -81,6 +82,7 @@ multi.TSTEP = "tstep" multi.THREAD = "thread" multi.SERVICE = "service" multi.PROXY = "proxy" +multi.THREADEDFUNCTION = "threaded_function" if not _G["$multi"] then _G["$multi"] = {multi = multi, thread = thread} @@ -133,9 +135,7 @@ function multi.Stop() mainloopActive = false end -local function pack(...) - return {...} -end +local pack = multi.pack --Processor local priorityTable = {[false]="Disabled",[true]="Enabled"} @@ -470,6 +470,10 @@ function multi:newConnection(protect,func,kill) return temp end + c.Hold = thread:newFunction(function(self) + return thread.hold(self) + end, true) + c.connect=c.Connect c.GetConnection=c.getConnection c.HasConnections = c.hasConnections @@ -487,24 +491,6 @@ function multi:newConnection(protect,func,kill) return c end -multi.enableOptimization = multi:newConnection() -multi.optConn = multi:newConnection(true) -multi.optConn(function(msg) - table.insert(optimization_stats, msg) -end) - -function multi:getOptimizationConnection() - return multi.optConn -end - -function multi:getOptimizationStats() - return optimization_stats -end - -function multi:isFindingOptimizing() - return find_optimization -end - -- Used with ISO Threads local function isolateFunction(func, env) if setfenv then @@ -637,6 +623,7 @@ function multi:isDone() end function multi:create(ref) + ref.UID = "U"..multi.randomString(12) self.OnObjectCreated:Fire(ref, self) return self end @@ -686,10 +673,6 @@ function multi:newBase(ins) return c end -multi.OnObjectCreated=multi:newConnection() -multi.OnObjectDestroyed=multi:newConnection() -multi.OnLoad = multi:newConnection(nil,nil,true) -ignoreconn = false function multi:newTimer() local c={} c.Type=multi.TIMER @@ -740,7 +723,7 @@ function multi:newEvent(task, func) task=func return self end - c.OnEvent = self:newConnection():fastMode() + c.OnEvent = self:newConnection() if func then c.OnEvent(func) end @@ -767,7 +750,7 @@ function multi:newUpdater(skip, func) skip=n return self end - c.OnUpdate = self:newConnection():fastMode() + c.OnUpdate = self:newConnection() c:setName(c.Type) if func then c.OnUpdate(func) @@ -803,7 +786,7 @@ function multi:newAlarm(set, func) t = clock() return self end - c.OnRing = self:newConnection():fastMode() + c.OnRing = self:newConnection() function c:Pause() count = clock() self.Parent.Pause(self) @@ -833,7 +816,7 @@ function multi:newLoop(func, notime) end end - c.OnLoop = self:newConnection():fastMode() + c.OnLoop = self:newConnection() if func then c.OnLoop(func) @@ -881,9 +864,9 @@ function multi:newStep(start,reset,count,skip) return true end c.Reset=c.Resume - c.OnStart = self:newConnection():fastMode() - c.OnStep = self:newConnection():fastMode() - c.OnEnd = self:newConnection():fastMode() + c.OnStart = self:newConnection() + c.OnStep = self:newConnection() + c.OnEnd = self:newConnection() function c:Break() self.Active=nil return self @@ -904,40 +887,49 @@ function multi:newStep(start,reset,count,skip) return c end -function multi:newTLoop(func,set) +function multi:newTLoop(func, set) local c=self:newBase() c.Type=multi.TLOOP c.set=set or 0 c.timer=self:newTimer() c.life=0 c:setPriority("Low") + function c:Act() - if self.timer:Get()>=self.set then + if self.timer:Get() >= self.set then self.life=self.life+1 self.timer:Reset() - self.OnLoop:Fire(self,self.life) + self.OnLoop:Fire(self, self.life) return true end end + function c:Set(set) self.set = set end + function c:Resume() self.Parent.Resume(self) self.timer:Resume() return self end + function c:Pause() self.timer:Pause() self.Parent.Pause(self) return self end - c.OnLoop = self:newConnection():fastMode() + + c.OnLoop = self:newConnection() + if func then c.OnLoop(func) end + c:setName(c.Type) + self:create(c) + return c end @@ -1144,6 +1136,7 @@ function multi:newProcessor(name, nothread) end table.insert(processes,c) + self:create(c) return c end @@ -1209,7 +1202,7 @@ function multi:getTasks() end function thread.request(t,cmd,...) - thread.requests[t.thread] = {cmd,{...}} + thread.requests[t.thread] = {cmd, multi.pack(...)} end function thread.getRunningThread() @@ -1248,14 +1241,18 @@ local function conn_test(conn) local args local func = function(...) ready = true - args = {...} + args = multi.pack(...) end local ref = conn(func) return function() if ready then conn:Unconnect(ref) - return multi.unpack(args) or multi.NIL + if #args==0 then + return multi.NIL + else + return multi.unpack(args) + end end end end @@ -1267,7 +1264,7 @@ function thread.chain(...) end end -function thread.hold(n,opt) +function thread.hold(n, opt) thread._Requests() local opt = opt or {} if type(opt)=="table" then @@ -1286,8 +1283,10 @@ function thread.hold(n,opt) return yield(CMD, t_sleep, n or 0, nil, interval) elseif type(n) == "table" and n.Type == multi.CONNECTOR then return yield(CMD, t_hold, conn_test(n), nil, interval) + elseif type(n) == "table" and n.Hold ~= nil then + return n:Hold(opt) elseif type(n) == "function" then - return yield(CMD, t_hold, n or dFunc, nil, interval) + return yield(CMD, t_hold, n, nil, interval) else multi.error("Invalid argument passed to thread.hold(...)!") end @@ -1318,11 +1317,12 @@ function thread.yield() end function thread.isThread() - if _VERSION~="Lua 5.1" then - local a,b = running() + local a,b = running() + if b then + -- We are dealing with luajit compat or 5.2+ return not(b) else - return running()~=nil + return a~=nil end end @@ -1345,7 +1345,7 @@ function thread.waitFor(name) end local function cleanReturns(...) - local returns = {...} + local returns = multi.pack(...) local rets = {} local ind = 0 for i=#returns,1,-1 do @@ -1392,7 +1392,7 @@ function thread:newFunctionBase(generator, holdme) end) else while not rets and not err do - multi:getCurrentProcess():getHandler()() + multi:uManager() end local g = rets rets = nil @@ -1416,7 +1416,7 @@ function thread:newFunctionBase(generator, holdme) } end local t = generator(...) - t.OnDeath(function(...) rets = {...} end) + t.OnDeath(function(...) rets = multi.pack(...) end) t.OnError(function(self,e) err = e end) if holdme then return wait() @@ -1444,15 +1444,14 @@ function thread:newFunctionBase(generator, holdme) return temp end setmetatable(tfunc, tfunc) + tfunc.Type = multi.THREADEDFUNCTION return tfunc end end function thread:newFunction(func, holdme) return thread:newFunctionBase(function(...) - local th = thread:newThread("Free Threaded Function Handler", func, ...) - th.creator = debug.getinfo(2).name - return th + return thread:newThread("Free Threaded Function Handler", func, ...) end, holdme)() end @@ -1492,12 +1491,12 @@ function thread:newProcessor(name) function proc.Start() Active = true - return c + return proc end function proc.Stop() Active = false - return c + return proc end function proc:Destroy() @@ -1515,6 +1514,8 @@ function thread:newProcessor(name) end end) end) + + self:create(proc) return proc end @@ -1534,7 +1535,7 @@ function thread:newThread(name, func, ...) end local c={nil,nil,nil,nil,nil,nil,nil} c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} - c.startArgs = {...} + c.startArgs = multi.pack(...) c.ref={} c.Name=name c.thread=create(func) @@ -1797,14 +1798,6 @@ function multi:createHandler() return coroutine.wrap(function() local temp_start while true do - -- for start = #startme, 1, -1 do - -- temp_start = startme[start] - -- table.remove(startme) - -- _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) - -- co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads) - -- table.insert(threads, temp_start) - -- yield() - -- end while #startme>0 do temp_start = table.remove(startme) _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) @@ -1929,7 +1922,7 @@ function multi:newService(func) -- Priority managed threads return c end - multi.create(multi,c) + self:create(c) return c end @@ -2338,7 +2331,7 @@ end function multi.print(...) if multi.defaultSettings.print then local t = {} - for i,v in pairs({...}) do t[#t+1] = tostring(v) end + for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end io.write("\x1b[94mINFO:\x1b[0m " .. table.concat(t," ") .. "\n") end end @@ -2346,7 +2339,7 @@ end function multi.warn(...) if multi.defaultSettings.warn then local t = {} - for i,v in pairs({...}) do t[#t+1] = tostring(v) end + for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end io.write("\x1b[93mWARNING:\x1b[0m " .. table.concat(t," ") .. "\n") end end @@ -2363,7 +2356,7 @@ end function multi.success(...) local t = {} - for i,v in pairs({...}) do t[#t+1] = tostring(v) end + for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end io.write("\x1b[92mSUCCESS:\x1b[0m " .. table.concat(t," ") .. "\n") end @@ -2384,6 +2377,29 @@ function os.exit(n) _os(n) end +multi.OnObjectCreated=multi:newConnection() +ignoreconn = false +multi.OnObjectDestroyed=multi:newConnection() +multi.OnLoad = multi:newConnection(nil,nil,true) + +multi.enableOptimization = multi:newConnection() +multi.optConn = multi:newConnection(true) +multi.optConn(function(msg) + table.insert(optimization_stats, msg) +end) + +function multi:getOptimizationConnection() + return multi.optConn +end + +function multi:getOptimizationStats() + return optimization_stats +end + +function multi:isFindingOptimizing() + return find_optimization +end + multi.OnError=multi:newConnection() multi.OnPreLoad = multi:newConnection() multi.OnExit = multi:newConnection(nil,nil,true) @@ -2403,11 +2419,12 @@ function multi:getHandler() end multi:newThread("Task Handler", function() - local check = function() - return table.remove(tasks) - end while true do - thread.hold(check)() + if #tasks > 0 then + table.remove(tasks)() + else + thread.yield() + end end end).OnError(multi.error) diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index 94b98c1..f76378e 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -113,7 +113,7 @@ function multi:newSystemThreadedJobQueue(n) return self end function c:pushJob(name,...) - queueJob:push{name,jid,{...}} + queueJob:push{name,jid,multi.pack(...)} jid = jid + 1 return jid-1 end @@ -132,12 +132,16 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) end end) return thread.hold(function() if rets then - return multi.unpack(rets) or multi.NIL + if #rets == 0 then + return multi.NIL + else + return multi.unpack(rets) + end end end) end, holup), name @@ -171,7 +175,7 @@ function multi:newSystemThreadedJobQueue(n) local name = table.remove(dat, 1) local jid = table.remove(dat, 1) local args = table.remove(dat, 1) - queueReturn:push{jid, funcs[name](multi.unpack(args)), queue} + queueReturn:push{jid, funcs[name](args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]), queue} end).OnError(multi.error) end end).OnError(multi.error) @@ -258,7 +262,7 @@ function multi:newSystemThreadedConnection(name) local function fire(...) for _, link in pairs(c.links) do - link:push {c.TRIG, {...}} + link:push {c.TRIG, multi.pack(...)} end end @@ -286,7 +290,7 @@ function multi:newSystemThreadedConnection(name) --- ^^^ This will only exist in the init thread function c:Fire(...) - local args = {...} + local args = multi.pack(...) if self.CID == THREAD_ID then -- Host Call for _, link in pairs(self.links) do link:push {self.TRIG, args} diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index 147488a..8a8318c 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -98,6 +98,13 @@ function multi:newSystemThread(name, func, ...) globals = globe, priority = c.priority },function(...) + local profi + + if multi_settings.debug then + profi = require("proFI") + profi:start() + end + multi, thread = require("multi"):init(multi_settings) require("multi.integration.lanesManager.extensions") require("multi.integration.sharedExtensions") @@ -105,6 +112,12 @@ function multi:newSystemThread(name, func, ...) returns = {pcall(func, ...)} return_linda:set("returns", returns) has_error = false + if profi then + multi.OnExit(function(...) + profi:stop() + profi:writeReport("Profiling Report [".. THREAD_NAME .."].txt") + end) + end end)(...) count = count + 1 function c:getName() @@ -118,6 +131,13 @@ function multi:newSystemThread(name, func, ...) c.OnDeath = multi:newConnection() c.OnError = multi:newConnection() GLOBAL["__THREADS__"] = livingThreads + + if self.isActor then + self:create(c) + else + multi.create(multi, c) + end + return c end diff --git a/integration/lanesManager/threads.lua b/integration/lanesManager/threads.lua index d8a3838..3d05ebf 100644 --- a/integration/lanesManager/threads.lua +++ b/integration/lanesManager/threads.lua @@ -66,7 +66,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console) local c = {} c.queue = __Console function c.print(...) - c.queue:send("Q", {...}) + c.queue:send("Q", multi.pack(...)) end function c.error(err) c.queue:push("Q",{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}) @@ -90,7 +90,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console) end function THREAD.pushStatus(...) - local args = {...} + local args = multi.pack(...) __StatusLinda:send(nil,THREAD_ID, args) end diff --git a/integration/loveManager/extensions.lua b/integration/loveManager/extensions.lua index 992a10b..738f18f 100644 --- a/integration/loveManager/extensions.lua +++ b/integration/loveManager/extensions.lua @@ -129,7 +129,7 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) end end) return thread.hold(function() @@ -230,7 +230,7 @@ function multi:newSystemThreadedConnection(name) self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name) function self:Fire(...) - local args = {...} + local args = multi.pack(...) if self.CID == THREAD_ID then -- Host Call for _, link in pairs(self.links) do love.thread.getChannel(link):push{self.TRIG, args} @@ -321,7 +321,7 @@ function multi:newSystemThreadedConnection(name) local function fire(...) for _, link in pairs(c.links) do - love.thread.getChannel(link):push {c.TRIG, {...}} + love.thread.getChannel(link):push {c.TRIG, multi.pack(...)} end end diff --git a/integration/loveManager/init.lua b/integration/loveManager/init.lua index 7c5289f..0c23b6b 100644 --- a/integration/loveManager/init.lua +++ b/integration/loveManager/init.lua @@ -103,6 +103,13 @@ function multi:newSystemThread(name, func, ...) c.stab.returns = nil end end) + + if self.isActor then + self:create(c) + else + multi.create(multi, c) + end + return c end diff --git a/integration/loveManager/threads.lua b/integration/loveManager/threads.lua index 12f5471..777f811 100644 --- a/integration/loveManager/threads.lua +++ b/integration/loveManager/threads.lua @@ -98,7 +98,7 @@ end function threads.pushStatus(...) local status_channel = love.thread.getChannel("STATCHAN_" ..__THREADID__) - local args = {...} + local args = multi.pack(...) status_channel:push(args) end @@ -209,7 +209,7 @@ function threads.getConsole() local c = {} c.queue = love.thread.getChannel("__CONSOLE__") function c.print(...) - c.queue:push{...} + c.queue:push(multi.pack(...)) end function c.error(err) c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} diff --git a/integration/lovrManager/extensions.lua b/integration/lovrManager/extensions.lua index a4f231a..ddc66c6 100644 --- a/integration/lovrManager/extensions.lua +++ b/integration/lovrManager/extensions.lua @@ -118,7 +118,7 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) link:Destroy() end end) diff --git a/integration/lovrManager/init.lua b/integration/lovrManager/init.lua index 6a64c98..bb86a8a 100644 --- a/integration/lovrManager/init.lua +++ b/integration/lovrManager/init.lua @@ -76,6 +76,13 @@ function multi:newSystemThread(name,func,...) GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread} GLOBAL["__THREAD_COUNT"] = THREAD_ID THREAD_ID=THREAD_ID+1 + + if self.isActor then + self:create(c) + else + multi.create(multi, c) + end + return c end THREAD.newSystemThread = multi.newSystemThread diff --git a/integration/lovrManager/threads.lua b/integration/lovrManager/threads.lua index 12429c4..dc919ab 100644 --- a/integration/lovrManager/threads.lua +++ b/integration/lovrManager/threads.lua @@ -156,7 +156,7 @@ function threads.getConsole() local c = {} c.queue = lovr.thread.getChannel("__CONSOLE__") function c.print(...) - c.queue:push{...} + c.queue:push(multi.pack(...)) end function c.error(err) c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} diff --git a/integration/pseudoManager/extensions.lua b/integration/pseudoManager/extensions.lua index bfdd982..7ad8d6c 100644 --- a/integration/pseudoManager/extensions.lua +++ b/integration/pseudoManager/extensions.lua @@ -96,7 +96,7 @@ function multi:newSystemThreadedJobQueue(n) end function c:pushJob(name,...) - table.insert(jobs,{name,jid,{...}}) + table.insert(jobs,{name,jid,multi.pack(...)}) jid = jid + 1 return jid-1 end @@ -121,7 +121,7 @@ function multi:newSystemThreadedJobQueue(n) local rets link = c.OnJobCompleted(function(jid,...) if id==jid then - rets = {...} + rets = multi.pack(...) end end) return thread.hold(function() diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index f5bb84b..ba5f00b 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -22,8 +22,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] -function copy(obj) - if type(obj) ~= 'table' then return obj end +local function copy(obj) + if type(obj) ~= 'table' then return obj end local res = {} for k, v in pairs(obj) do res[copy(k)] = copy(v) end return res @@ -48,34 +48,19 @@ local multi, thread = require("multi"):init() -- Create on the thread that you want to interact with, send over the handle function multi:chop(obj) + if not _G["UIDS"] then + _G["UIDS"] = {} + end local multi, thread = require("multi"):init() local list = {[0] = multi.randomString(12)} _G[list[0]] = obj for i,v in pairs(obj) do - if type(v) == "function" then + if type(v) == "function" or type(v) == "table" and v.Type == multi.THREADEDFUNCTION then table.insert(list, i) - elseif type(v) == "table" and v.Type == multi.CONNECTOR then - v.getThreadID = function() -- Special function we are adding - return THREAD_ID - end - - v.getUniqueName = function(self) - return self.__link_name - end - - local l = multi:chop(v) - v.__link_name = l[0] - v.__name = i - - table.insert(list, {i, multi:newProxy(l):init()}) + elseif type(v) == "table" and v.Type == multi.CONNECTOR then + table.insert(list, {i, multi:newProxy(multi:chop(v)):init()}) end end - table.insert(list, "isConnection") - if obj.Type == multi.CONNECTOR then - obj.isConnection = function() return true end - else - obj.isConnection = function() return false end - end return list end @@ -85,44 +70,63 @@ function multi:newProxy(list) c.name = multi.randomString(12) c.is_init = false - - function c:init(proc_name) + local multi, thread = nil, nil + function c:init() local multi, thread = nil, nil if not(c.is_init) then c.is_init = true local multi, thread = require("multi"):init() + c.proxy_link = "PL" .. multi.randomString(12) + + if multi.integration then + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD + end + + GLOBAL[c.proxy_link] = c + local function check() return self.send:pop() end + self.send = multi:newSystemThreadedQueue(self.name.."_S"):init() self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init() self.funcs = list self._funcs = copy(list) self.Type = multi.PROXY self.TID = THREAD_ID - thread:newThread(function() + + thread:newThread("Proxy_Handler_" .. multi.randomString(4), function() while true do local data = thread.hold(check) if data then - local func = table.remove(data, 1) - local sref = table.remove(data, 1) - local ret - if sref then - ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} - else - ret = {_G[list[0]][func](multi.unpack(data))} - end - for i = 1,#ret do - if type(ret[i]) == "table" and getmetatable(ret[i]) then - setmetatable(ret[i],{}) -- remove that metatable, we do not need it on the other side! + -- Let's not hold the main threadloop + thread:newThread("Temp_Thread", function() + local func = table.remove(data, 1) + local sref = table.remove(data, 1) + local ret + + if sref then + ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} + else + ret = {_G[list[0]][func](multi.unpack(data))} end - if ret[i] == _G[list[0]] then - -- We cannot return itself, that return can contain bad values. - ret[i] = {_self_ref_ = true} + + for i = 1,#ret do + if type(ret[i]) == "table" and ret[i].Type ~= nil and ret[i].Type ~= multi.PROXY then + ret[i] = "\1PARENT_REF" + end + if type(ret[i]) == "table" and getmetatable(ret[i]) then + setmetatable(ret[i],nil) -- remove that metatable, we do not need it on the other side! + end + if ret[i] == _G[list[0]] then + -- We cannot return itself, that return can contain bad values. + ret[i] = "\1SELF_REF" + end end - end - table.insert(ret, 1, func) - self.recv:push(ret) + table.insert(ret, 1, func) + self.recv:push(ret) + end) end end end).OnError(multi.error) @@ -130,7 +134,7 @@ function multi:newProxy(list) else local multi, thread = require("multi"):init() local me = self - self.proc_name = proc_name + local funcs = copy(self.funcs) if multi.integration then GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD @@ -138,21 +142,13 @@ function multi:newProxy(list) self.send = THREAD.waitFor(self.name.."_S"):init() self.recv = THREAD.waitFor(self.name.."_R"):init() self.Type = multi.PROXY - for _,v in pairs(self.funcs) do + for _,v in pairs(funcs) do if type(v) == "table" then -- We have a connection v[2]:init(proc_name) - self["_"..v[1]] = v[2] + self[v[1]] = v[2] v[2].Parent = self setmetatable(v[2],getmetatable(multi:newConnection())) - self[v[1]] = multi:newConnection() - - thread:newThread(function() - while true do - local data = thread.hold(self["_"..v[1]]) - self[v[1]]:Fire(data) - end - end).OnError(multi.error) else self[v] = thread:newFunction(function(self,...) if self == me then @@ -166,8 +162,10 @@ function multi:newProxy(list) me.recv:pop() table.remove(data, 1) for i=1,#data do - if type(data[i]) == "table" and data[i]._self_ref_ then + if data[i] == "\1SELF_REF" then data[i] = me + elseif data[i] == "\1PARENT_REF" then + data[i] = me.Parent end end return multi.unpack(data) @@ -180,85 +178,31 @@ function multi:newProxy(list) end end function c:getTransferable() - local multi, thread = nil, nil local cp = {} + local multi, thread = require("multi"):init() + cp.is_init = true + cp.proxy_link = self.proxy_link cp.name = self.name cp.funcs = copy(self._funcs) - cp._funcs = copy(self._funcs) - cp.Type = self.Type - cp.init = self.init + cp.init = function(self) + local multi, thread = require("multi"):init() + if multi.integration then + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD + end + local proxy = THREAD.waitFor(self.proxy_link) + proxy.funcs = self.funcs + return proxy:init() + end return cp end + self:create(c) return c end local targets = {} local references = {} -local nFunc = 0 -function multi:newTargetedFunction(ID, proxy, name, func, holup) -- This registers with the queue - if type(name)=="function" then - holup = func - func = name - name = "JQ_TFunc_"..nFunc - end - nFunc = nFunc + 1 - - multi:executeOnProcess(proxy.proc_name, function(proc, name, func) - proc.jobqueue:registerFunction(name, func) - end, name, func) - - return thread:newFunction(function(...) - return multi:executeOnProcess(proxy.proc_name, function(proc, name, ID, ...) - local multi, thread = require("multi"):init() - local id = proc:pushJob(ID, name, ...) - local rets - local tjq = THREAD.get(proc.Name .. "_target_rtq_" .. ID):init() - return thread.hold(function() - local data = tjq:peek() - if data and data[1] == id then - tjq:pop() - table.remove(data, 1) - return multi.unpack(data) or multi.NIL - end - end) - -- proc.jobqueue.OnJobCompleted(function(jid, ...) - -- if id==jid then - -- rets = {...} - -- print("Got!") - -- end - -- end) - -- return thread.hold(function() - -- if rets then - -- return multi.unpack(rets) or multi.NIL - -- end - -- end) - end, name, ID, ...) - end, holup), name -end - -multi.executeOnProcess = thread:newFunction(function(self, name, func, ...) - local queue = THREAD.get(name .. "_local_proc") - local queueR = THREAD.get(name .. "_local_return") - if queue and queueR then - local multi, thread = require("multi"):init() - local id = multi.randomString(8) - queue = queue:init() - queueR = queueR:init() - queue:push({func, id, ...}) - return thread.hold(function() - local data = queueR:peek() - if data and data[1] == id then - queueR:pop() - table.remove(data, 1) - return multi.unpack(data) or multi.NIL - end - end) - else - return nil, "Unable to find a process queue with name: '" .. name .. "'" - end -end, true) - local jid = -1 function multi:newSystemThreadedProcessor(cores) @@ -279,69 +223,16 @@ function multi:newSystemThreadedProcessor(cores) c.OnObjectCreated = multi:newConnection() c.parent = self c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) - c.local_cmd = multi:newSystemThreadedQueue(name .. "_local_proc"):init() - c.local_cmd_return = multi:newSystemThreadedQueue(name .. "_local_return"):init() - - c.jobqueue:registerFunction("STP_enable_targets",function(name) - local multi, thread = require("multi"):init() - local qname = name .. "_tq_" .. THREAD_ID - local rqname = name .. "_rtq_" .. THREAD_ID - - local tjq = multi:newSystemThreadedQueue(qname):init() - local trq = multi:newSystemThreadedQueue(rqname):init() - multi:newThread("TargetedJobHandler", function() - local th - while true do - local dat = thread.hold(function() - return tjq:pop() - end) - if dat then - th = thread:newThread("JQ-TargetThread",function() - local name = table.remove(dat, 1) - local jid = table.remove(dat, 1) - local func = _G[name] - local args = table.remove(dat, 1) - th.OnError(function(self,err) - -- We want to pass this to the other calling thread incase - trq:push{jid, err} - end) - trq:push{jid, func(multi.unpack(args))} - end) - end - end - end).OnError(multi.error) - end) - - c.jobqueue:registerFunction("STP_GetThreadCount",function() - return _G["__THREADS"] - end) - - c.jobqueue:registerFunction("STP_GetTaskCount",function() - return _G["__TASKS"] - end) function c:pushJob(ID, name, ...) local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init() - tq:push{name, jid, {...}} + tq:push{name, jid, multi.pack(...)} jid = jid - 1 return jid + 1 end - c.jobqueue:doToAll(function(name) - STP_enable_targets(name) - _G["__THREADS"] = 0 - _G["__TASKS"] = 0 - end, name.."_target") - c.jobqueue:registerFunction("packObj",function(obj) local multi, thread = require("multi"):init() - obj.getThreadID = function() -- Special functions we are adding - return THREAD_ID - end - - obj.getUniqueName = function(self) - return self.__link_name - end local list = multi:chop(obj) obj.__link_name = list[0] @@ -354,14 +245,12 @@ function multi:newSystemThreadedProcessor(cores) c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...) local multi, thread = require("multi"):init() local obj = thread:newThread(name, func, ...) - _G["__THREADS"] = _G["__THREADS"] + 1 return packObj(obj) end, true) c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...) local multi, thread = require("multi"):init() local obj = multi[obj](multi, func, ...) - _G["__TASKS"] = _G["__TASKS"] + 1 return packObj(obj) end, true) @@ -372,12 +261,13 @@ function multi:newSystemThreadedProcessor(cores) "newEvent", "newAlarm", "newStep", - "newTStep" + "newTStep", + "newService" } for _, method in pairs(implement) do c[method] = function(self, ...) - proxy = self.spawnTask(method, ...):init(self.Name) + proxy = self.spawnTask(method, ...):init() references[proxy] = self return proxy end @@ -467,68 +357,5 @@ function multi:newSystemThreadedProcessor(cores) return loads end, true) - local check = function() - return c.local_cmd:pop() - end - thread:newThread(function() - while true do - local data = thread.hold(check) - if data then - thread:newThread(function() - local func = table.remove(data, 1) - local id = table.remove(data, 1) - local ret = {id, func(c, multi.unpack(data))} - c.local_cmd_return:push(ret) - end).OnError(multi.error) - end - end - end).OnError(multi.error) return c end - --- Modify thread.hold to handle proxies -local thread_ref = thread.hold -function thread.hold(n, opt) - if type(n) == "table" and n.Type == multi.PROXY and n.isConnection() then - local ready = false - local args - local id = n.getThreadID() - local name = n:getUniqueName() - local func = multi:newTargetedFunction(id, n, "conn_"..multi.randomString(8), function(_name) - local multi, thread = require("multi"):init() - local obj = _G[_name] - local rets = {thread.hold(obj)} - for i,v in pairs(rets) do - if v.Type then - rets[i] = {_self_ref_ = "parent"} - end - end - return multi.unpack(rets) - end) - - local conn - local args - handle = func(name) - conn = handle.OnReturn(function(...) - ready = true - args = {...} - handle.OnReturn:Unconnect(conn) - end) - - local ret = {thread_ref(function() - if ready then - return multi.unpack(args) or multi.NIL - end - end, opt)} - - for i,v in pairs(ret) do - if type(v) == "table" and v._self_ref_ == "parent" then - ret[i] = n.Parent - end - end - return multi.unpack(ret) - else - return thread_ref(n, opt) - end -end - -- 2.43.0