From 6fe10b22ab995edacd7e5d3adae3430374d4c128 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sat, 17 Jun 2023 21:33:34 -0400 Subject: [PATCH] Testing --- init.lua | 2 + integration/lanesManager/extensions.lua | 3 +- integration/lanesManager/init.lua | 1 - integration/sharedExtensions/init.lua | 190 +++++++++++++----------- 4 files changed, 107 insertions(+), 89 deletions(-) diff --git a/init.lua b/init.lua index 881f832..e434b3c 100644 --- a/init.lua +++ b/init.lua @@ -516,6 +516,8 @@ local function isolateFunction(func, env) end end +multi.isolateFunction = isolateFunction + function multi:Break() self:Pause() self.Active=nil diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index 6a98121..073fe6d 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -344,4 +344,5 @@ function multi:newSystemThreadedConnection(name) end return c -end \ No newline at end of file +end +require("multi.integration.sharedExtensions") \ No newline at end of file diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index 547c7c6..e22431e 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -185,7 +185,6 @@ multi.integration = {} -- for module creators multi.integration.GLOBAL = GLOBAL multi.integration.THREAD = THREAD require("multi.integration.lanesManager.extensions") -require("multi.integration.sharedExtensions") return { init = function() return GLOBAL, THREAD diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 33ab91c..8fc99a3 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -1,4 +1,4 @@ ---[[ +--[[ todo finish the targeted job! MIT License Copyright (c) 2023 Ryan Ward @@ -22,6 +22,26 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] +function copy(obj) + if type(obj) ~= 'table' then return obj end + local res = {} + for k, v in pairs(obj) do res[copy(k)] = copy(v) end + return res +end + +function tprint (tbl, indent) + if not indent then indent = 0 end + for k, v in pairs(tbl) do + formatting = string.rep(" ", indent) .. k .. ": " + if type(v) == "table" then + print(formatting) + tprint(v, indent+1) + else + print(formatting .. tostring(v)) + end + end + end + local multi, thread = require("multi"):init() -- Returns a handler that allows a user to interact with an object on another thread! @@ -64,10 +84,12 @@ function multi:newProxy(list) local c = {} c.name = multi.randomString(12) + c.is_init = false - function c:init() + function c:init(proc_name) local multi, thread = nil, nil - if THREAD_ID>0 then + if not(c.is_init) then + c.is_init = true local multi, thread = require("multi"):init() local function check() return self.send:pop() @@ -75,7 +97,9 @@ function multi:newProxy(list) self.send = multi:newSystemThreadedQueue(self.name.."_S"):init() self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init() self.funcs = list - self.conns = list[-1] + self._funcs = copy(list) + self.Type = multi.PROXY + self.TID = THREAD_ID thread:newThread(function() while true do local data = thread.hold(check) @@ -104,25 +128,31 @@ function multi:newProxy(list) end).OnError(print) return self else + print("INIT IN",THREAD_NAME) local multi, thread = require("multi"):init() local me = self - GLOBAL = multi.integration.GLOBAL - THREAD = multi.integration.THREAD - self.send = THREAD.waitFor(self.name.."_S") - self.recv = THREAD.waitFor(self.name.."_R") + self.proc_name = proc_name + if multi.integration then + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD + end + self.send = THREAD.waitFor(self.name.."_S"):init() + self.recv = THREAD.waitFor(self.name.."_R"):init() self.Type = multi.PROXY for _,v in pairs(self.funcs) do if type(v) == "table" then -- We have a connection - v[2]:init() + print("Init Conn",v[1],THREAD_NAME) + v[2]:init(proc_name) self["_"..v[1]] = v[2] v[2].Parent = self setmetatable(v[2],getmetatable(multi:newConnection())) self[v[1]] = multi:newConnection() thread:newThread(function() + print("HOLD:","_"..v[1],self["_"..v[1]].Type) while true do - self[v[1]]:Fire(thread.hold(alarm["_"..v[1]])) + self[v[1]]:Fire(thread.hold(self["_"..v[1]])) end end) else @@ -151,13 +181,24 @@ function multi:newProxy(list) return self end end + function c:getTransferable() + local multi, thread = require("multi"):init() + local cp = {} + cp.name = self.name + cp.funcs = copy(self._funcs) + cp._funcs = copy(self._funcs) + cp.Type = self.Type + cp.init = self.init + return cp + end return c end local targets = {} +local references = {} local nFunc = 0 -function multi:newTargetedFunction(ID, proc, name, func, holup) -- This registers with the queue +function multi:newTargetedFunction(ID, name, func, holup) -- This registers with the queue if type(name)=="function" then holup = func func = name @@ -181,6 +222,12 @@ function multi:newTargetedFunction(ID, proc, name, func, holup) -- This register end) end, holup), name end +-- local qname = name .. "_tq_" .. THREAD_ID +-- local rqname = name .. "_rtq_" .. THREAD_ID + +local function getQueue(name) + return THREAD.waitFor(name):init() +end local jid = -1 function multi:newSystemThreadedProcessor(cores) @@ -202,27 +249,30 @@ function multi:newSystemThreadedProcessor(cores) c.OnObjectCreated = multi:newConnection() c.parent = self c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) - c.targetedQueue = multi:newSystemThreadedQueue(name.."_target"):init() c.jobqueue:registerFunction("STP_enable_targets",function(name) local multi, thread = require("multi"):init() - local qname = THREAD_NAME .. "_t_queue" - local targetedQueue = THREAD.waitFor(name):init() + local qname = name .. "_tq_" .. THREAD_ID + local rqname = name .. "_rtq_" .. THREAD_ID local tjq = multi:newSystemThreadedQueue(qname):init() - targetedQueue:push({tonumber(THREAD_ID), qname}) multi:newThread("TargetedJobHandler", function() - local queueReturn = _G["__QR"] + local th while true do local dat = thread.hold(function() return tjq:pop() end) if dat then - thread:newThread("JQ-TargetThread",function() + th = thread:newThread("JQ-TargetThread",function() local name = table.remove(dat, 1) local jid = table.remove(dat, 1) + local func = table.remove(dat, 1) local args = table.remove(dat, 1) - queueReturn:push{jid, _G[name](multi.unpack(args)), queue} - end).OnError(multi.error) + th.OnError(function(self,err) + -- We want to pass this to the other calling thread incase + rqname:push{jid, err} + end) + rqname:push{jid, func(multi.unpack(args))} + end) end end end).OnError(multi.error) @@ -248,16 +298,6 @@ function multi:newSystemThreadedProcessor(cores) _G["__TASKS"] = 0 end, name.."_target") - local count = 0 - while count < c.cores do - local dat = c.targetedQueue:pop() - if dat then - targets[dat[1]] = multi.integration.THREAD.waitFor(dat[2]):init() - table.insert(c.proc_list, dat[1]) -- Add thread_id to proc list - count = count + 1 - end - end - c.jobqueue:registerFunction("packObj",function(obj) local multi, thread = require("multi"):init() obj.getThreadID = function() -- Special functions we are adding @@ -290,46 +330,42 @@ function multi:newSystemThreadedProcessor(cores) return packObj(obj) end, true) - function c:newLoop(func, notime) - proxy = self.spawnTask("newLoop", func, notime):init() - proxy.__proc = self + local implement = { + "newLoop", + "newTLoop", + "newUpdater", + "newEvent", + "newAlarm", + "newStep", + "newTStep" + } + + for _, method in pairs(implement) do + c[method] = function(self, ...) + proxy = self.spawnTask(method, ...):init(self.Name) + references[proxy] = self + return proxy + end + end + + function c:newThread(name, func, ...) + proxy = self.spawnThread(name, func, ...):init(self.Name) + references[proxy] = self + table.insert(self.threads, proxy) return proxy end - function c:newTLoop(func, time) - proxy = self.spawnTask("newTLoop", func, time):init() - proxy.__proc = self - return proxy + function c:newFunction(func, holdme) + return c.jobqueue:newFunction(func, holdme) end - function c:newUpdater(skip, func) - proxy = self.spawnTask("newUpdater", func, notime):init() - proxy.__proc = self - return proxy - end - - function c:newEvent(task, func) - proxy = self.spawnTask("newEvent", task, func):init() - proxy.__proc = self - return proxy - end - - function c:newAlarm(set, func) - proxy = self.spawnTask("newAlarm", set, func):init() - proxy.__proc = self - return proxy - end - - function c:newStep(start, reset, count, skip) - proxy = self.spawnTask("newStep", start, reset, count, skip):init() - proxy.__proc = self - return proxy - end - - function c:newTStep(start ,reset, count, set) - proxy = self.spawnTask("newTStep", start, reset, count, set):init() - proxy.__proc = self - return proxy + function c:newSharedTable(name) + if not name then multi.error("You must provide a name when creating a table!") end + local tbl_name = "TABLE_"..multi.randomString(8) + c.jobqueue:doToAll(function(tbl_name, interaction) + _G[interaction] = THREAD.waitFor(tbl_name):init() + end, tbl_name, name) + return multi:newSystemThreadedTable(tbl_name):init() end function c:getHandler() @@ -348,26 +384,6 @@ function multi:newSystemThreadedProcessor(cores) return self.Name end - function c:newThread(name, func, ...) - proxy = self.spawnThread(name, func, ...):init() - proxy.__proc = self - table.insert(self.threads, proxy) - return proxy - end - - function c:newFunction(func, holdme) - return c.jobqueue:newFunction(func, holdme) - end - - function c:newSharedTable(name) - if not name then multi.error("You must provide a name when creating a table!") end - local tbl_name = "TABLE_"..multi.randomString(8) - c.jobqueue:doToAll(function(tbl_name, interaction) - _G[interaction] = THREAD.waitFor(tbl_name):init() - end, tbl_name, name) - return multi:newSystemThreadedTable(tbl_name):init() - end - function c.run() return self end @@ -428,7 +444,7 @@ function thread.hold(n, opt) local args local id = n.getThreadID() local name = n:getUniqueName() - local func = multi:newTargetedFunction(id, n.Parent.__proc, "conn_"..multi.randomString(8), function(_name) + local func = multi:newTargetedFunction(id, references[n.Parent], "conn_"..multi.randomString(8), function(_name) local multi, thread = require("multi"):init() local obj = _G[_name] local rets = {thread.hold(obj)} @@ -437,7 +453,7 @@ function thread.hold(n, opt) rets[i] = {_self_ref_ = "parent"} end end - return unpack(rets) + return multi.unpack(rets) end) local conn @@ -460,7 +476,7 @@ function thread.hold(n, opt) end end - return unpack(ret) + return multi.unpack(ret) else return thread_ref(n, opt) end