From 5c03b342904334d348c2b67a876719f224724481 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sat, 27 May 2023 00:10:21 -0400 Subject: [PATCH] thread.hold(proxy.conn) --- docs/changes.md | 29 +++- integration/lanesManager/extensions.lua | 43 +++-- integration/lanesManager/init.lua | 3 + integration/loveManager/extensions.lua | 1 + integration/loveManager/init.lua | 1 + integration/lovrManager/extensions.lua | 1 + integration/luvitManager.lua | 2 +- integration/pseudoManager/init.lua | 1 + integration/sharedExtensions/init.lua | 218 ++++++++++++++++++++---- 9 files changed, 249 insertions(+), 50 deletions(-) diff --git a/docs/changes.md b/docs/changes.md index d8939f4..be715a8 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -74,6 +74,9 @@ Allows the user to have multi auto set priorities (Requires chronos). Also adds Added --- +- multi:newTargetedFunction(ID, proc, name, func, holup) -- This is used internally to handle thread.hold(proxy.conn) +- proxy.getThreadID() -- Returns the threadID of the thread that the proxy is running in +- proxy:getUniqueName() -- Gets the special name that identifies the object on the thread the proxy refers to - multi:chop(obj) -- We cannot directly interact with a local object on lanes, so we chop the object and set some globals on the thread side. Should use like: `mulit:newProxy(multi:chop(multi:newThread(function() ... end)))` - multi:newProxy(ChoppedObject) -- Creates a proxy object that allows you to interact with an object on a thread @@ -99,7 +102,7 @@ Added ``` Internally the SystemThreadedProcessor uses a JobQueue to handle things. The proxy function allows you to interact with these objects as if they were on the main thread, though there actions are carried out on the main thread. - There are currently limitations to proxies. Connection proxy do not receive events on the non thread side. So connection metamethods do not work! Also you cannot use the proxy holds. For full features develop using a systemThreadedConnection() which does support all connection features. I planned on using STCs originally, but decided not to because I didn't want proxy objects to affect the non thread side of things! Subscribing to an event that isn't on the thread being proxied would cause the object to no longer be a proxy. + There are currently limitations to proxies. Connection proxy do not receive events on the non thread side. So connection metamethods do not work! thread.hold(proxy.conn) does work! The backend to get this to work was annoying :P This event is subscribed to on the proxy threads side of things! @@ -114,7 +117,28 @@ Added - STP:newThread(...) - STP:newFunction(...) - If you would like to connect to a "STP Connection" object you can do so in a STP Function using hold and connect to the function OnReturn event or have the function wait (When in a coroutine it will only pause execution for that coroutine(multi:newThread(...))). The function is still runs on the Thread that the STP is running on. There is no guarantee that the function will run on the same thread each time, unlike with the multi objects/cothreads. Those stay on the systhread they are created on. + ```lua + package.path = "?/init.lua;?.lua;"..package.path + + multi, thread = require("multi"):init({print=true}) + THREAD, GLOBAL = require("multi.integration.lanesManager"):init() + + stp = multi:newSystemThreadedProcessor() + + alarm = stp:newAlarm(3) + + alarm.OnRing:Connect(function(alarm) + print("Hmm...", THREAD_NAME) + end) + + thread:newThread(function() + print("Holding...") + local a = thread.hold(alarm.OnRing) -- it works :D + print("We work!") + end) + + multi:mainloop() + ``` - thread:newProcessor(name) -- works mostly like a normal process, but all objects are wrapped within a thread. So if you create a few loops, you can use thread.hold() call threaded functions and wait and use all features that using coroutines provide. - multi.Processors:getHandler() -- returns the thread handler for a process @@ -312,6 +336,7 @@ Removed Fixed --- +- multi.isMainThread was not properly handled in each integration. This has been resolved. - Issue with pseudo threading env's being messed up. Required removal of getName and getID! - connections being multiplied together would block the entire connection object from pushing events! This is not the desired effect I wanted. Now only the connection reference involved in the multiplication is locked! - multi:reallocate(processor, index) has been fixed to work with the current changes of the library. diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index af9221b..596f5f6 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -24,7 +24,7 @@ SOFTWARE. local multi, thread = require("multi"):init() if not (GLOBAL and THREAD) then - GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD + GLOBAL, THREAD = multi.integration.GLOBAL, multi.integration.THREAD else lanes = require("lanes") end @@ -34,19 +34,29 @@ function multi:newSystemThreadedQueue(name) local c = {} c.Name = name c.linda = lanes.linda() + function c:push(v) self.linda:send("Q", v) end + function c:pop() return ({self.linda:receive(0, "Q")})[2] end + function c:peek() return self.linda:get("Q") end + function c:init() return self end - GLOBAL[name or "_"] = c + + if multi.isMainThread then + multi.integration.GLOBAL[name] = c + else + GLOBAL[name] = c + end + return c end @@ -56,10 +66,6 @@ function multi:newSystemThreadedTable(name) c.link = lanes.linda() c.Name = name - -- function c:getIndex() - -- return c.link:dump() - -- end - function c:init() return self end @@ -73,7 +79,12 @@ function multi:newSystemThreadedTable(name) end }) - GLOBAL[name or "_"] = c + if multi.isMainThread then + multi.integration.GLOBAL[name] = c + else + GLOBAL[name] = c + end + return c end @@ -90,9 +101,9 @@ function multi:newSystemThreadedJobQueue(n) function c:isEmpty() return queueJob:peek()==nil end - function c:doToAll(func) + function c:doToAll(func,...) for i=1,c.cores do - doAll:push{ID,func} + doAll:push{ID,func,...} end ID = ID + 1 return self @@ -143,11 +154,12 @@ function multi:newSystemThreadedJobQueue(n) end end) for i=1,c.cores do - multi:newSystemThread("SystemThreadedJobQueue_"..multi.randomString(4),function(queue) + multi:newSystemThread("STJQ_"..multi.randomString(8),function(queue) local multi, thread = require("multi"):init() local idle = os.clock() local clock = os.clock local ref = 0 + _G["__QR"] = queueReturn setmetatable(_G,{__index = funcs}) thread:newThread("JobHandler",function() while true do @@ -170,9 +182,10 @@ function multi:newSystemThreadedJobQueue(n) end) if dat then if dat[1]>ref then + ref = table.remove(dat, 1) + func = table.remove(dat, 1) idle = clock() - ref = dat[1] - dat[2]() + func(unpack(dat)) doAll:pop() end end @@ -324,7 +337,11 @@ function multi:newSystemThreadedConnection(name) return self end - GLOBAL[name] = c + if multi.isMainThread then + multi.integration.GLOBAL[name] = c + else + GLOBAL[name] = c + end return c end \ No newline at end of file diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index ebfb569..e4df31a 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -131,6 +131,9 @@ function multi.InitSystemThreadErrorHandler() while true do thread.yield() _,data = __ConsoleLinda:receive(0, "Q") + if data then + print(data[1]) + end for i = #threads, 1, -1 do temp = threads[i] status = temp.thread.status diff --git a/integration/loveManager/extensions.lua b/integration/loveManager/extensions.lua index 10e8e17..70bad7a 100644 --- a/integration/loveManager/extensions.lua +++ b/integration/loveManager/extensions.lua @@ -162,6 +162,7 @@ function multi:newSystemThreadedJobQueue(n) local lastProc = clock() local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") local registry = {} + _G["__QR"] = queueReturn setmetatable(_G,{__index = funcs}) thread:newThread("startUp",function() while true do diff --git a/integration/loveManager/init.lua b/integration/loveManager/init.lua index 2bb596b..5300a92 100644 --- a/integration/loveManager/init.lua +++ b/integration/loveManager/init.lua @@ -66,6 +66,7 @@ multi.integration = {} local THREAD = require("multi.integration.loveManager.threads") local GLOBAL = THREAD.getGlobal() local THREAD_ID = 1 +multi.isMainThread = true function multi:newSystemThread(name, func, ...) local c = {} diff --git a/integration/lovrManager/extensions.lua b/integration/lovrManager/extensions.lua index 232183a..a4f231a 100644 --- a/integration/lovrManager/extensions.lua +++ b/integration/lovrManager/extensions.lua @@ -152,6 +152,7 @@ function multi:newSystemThreadedJobQueue(n) local lastProc = clock() local queueAll = lovr.thread.getChannel("__JobQueue_"..jqc.."_queueAll") local registry = {} + _G["__QR"] = queueReturn setmetatable(_G,{__index = funcs}) thread:newThread("startUp",function() while true do diff --git a/integration/luvitManager.lua b/integration/luvitManager.lua index 0f53cb9..c1e8dcb 100644 --- a/integration/luvitManager.lua +++ b/integration/luvitManager.lua @@ -35,7 +35,7 @@ local function _INIT(luvitThread, timer) end -- Step 1 get setup threads on luvit... Sigh how do i even... local multi, thread = require("multi").init() - isMainThread = true + multi.isMainThread = true function multi:canSystemThread() return true end diff --git a/integration/pseudoManager/init.lua b/integration/pseudoManager/init.lua index 7e41344..30879f3 100644 --- a/integration/pseudoManager/init.lua +++ b/integration/pseudoManager/init.lua @@ -31,6 +31,7 @@ if multi.integration then end } end +multi.isMainThread = true local activator = require("multi.integration.pseudoManager.threads") local GLOBAL, THREAD = activator.init(thread) diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 8bdda94..86f292c 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -35,16 +35,27 @@ function multi:chop(obj) if type(v) == "function" then table.insert(list, i) elseif type(v) == "table" and v.Type == multi.CONNECTOR then - table.insert(list, {i, multi:newProxy(multi:chop(v)):init()}) - -- local stc = "stc_"..list[0].."_"..i - -- list[-1][#list[-1] + 1] = {i, stc} - -- list[#list+1] = i - -- obj[stc] = multi:newSystemThreadedConnection(stc):init() - -- obj["_"..i.."_"] = function(...) - -- return obj[stc](...) - -- end + v.getThreadID = function() -- Special function we are adding + return THREAD_ID + end + + v.getUniqueName = function(self) + return self.__link_name + end + + local l = multi:chop(v) + v.__link_name = l[0] + v.__name = i + + table.insert(list, {i, multi:newProxy(l):init()}) end end + table.insert(list, "isConnection") + if obj.Type == multi.CONNECTOR then + obj.isConnection = function() return true end + else + obj.isConnection = function() return false end + end return list end @@ -97,15 +108,14 @@ function multi:newProxy(list) THREAD = multi.integration.THREAD self.send = THREAD.waitFor(self.name.."_S") self.recv = THREAD.waitFor(self.name.."_R") + self.Type = multi.PROXY for _,v in pairs(self.funcs) do if type(v) == "table" then - -- We got a connection v[2]:init() - - --setmetatable(v[2],getmetatable(multi:newConnection())) - self[v[1]] = v[2] + v[2].Parent = self else + lastObj = self self[v] = thread:newFunction(function(self,...) if self == me then me.send:push({v, true, ...}) @@ -119,7 +129,6 @@ function multi:newProxy(list) table.remove(data, 1) for i=1,#data do if type(data[i]) == "table" and data[i]._self_ref_ then - -- So if we get a self return as a return, we should return the proxy! data[i] = me end end @@ -136,9 +145,40 @@ function multi:newProxy(list) return c end -function multi:newSystemThreadedProcessor(name, cores) +multi.PROXY = "proxy" - local name = name or "STP_"..multi.randomString(4) -- set a random name if none was given. +local targets = {} + +local nFunc = 0 +function multi:newTargetedFunction(ID, proc, name, func, holup) -- This registers with the queue + if type(name)=="function" then + holup = func + func = name + name = "JQ_TFunc_"..nFunc + end + nFunc = nFunc + 1 + proc.jobqueue:registerFunction(name, func) + return thread:newFunction(function(...) + local id = proc:pushJob(ID, name, ...) + local link + local rets + link = proc.jobqueue.OnJobCompleted(function(jid,...) + if id==jid then + rets = {...} + end + end) + return thread.hold(function() + if rets then + return multi.unpack(rets) or multi.NIL + end + end) + end, holup), name +end + +local jid = -1 +function multi:newSystemThreadedProcessor(cores) + + local name = "STP_"..multi.randomString(4) -- set a random name if none was given. local autoscale = autoscale or false -- Will scale up the number of cores that the process uses. local c = {} @@ -154,54 +194,123 @@ function multi:newSystemThreadedProcessor(name, cores) c.OnObjectCreated = multi:newConnection() c.parent = self c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) - + c.targetedQueue = multi:newSystemThreadedQueue(name.."_target"):init() + + c.jobqueue:registerFunction("enable_targets",function(name) + local multi, thread = require("multi"):init() + local qname = THREAD_NAME .. "_t_queue" + local targetedQueue = THREAD.waitFor(name):init() + local tjq = multi:newSystemThreadedQueue(qname):init() + targetedQueue:push({tonumber(THREAD_ID), qname}) + multi:newThread("TargetedJobHandler", function() + local queueReturn = _G["__QR"] + while true do + local dat = thread.hold(function() + return tjq:pop() + end) + if dat then + thread:newThread("test",function() + local name = table.remove(dat, 1) + local jid = table.remove(dat, 1) + local args = table.remove(dat, 1) + queueReturn:push{jid, _G[name](multi.unpack(args)), queue} + end).OnError(multi.error) + end + end + end).OnError(multi.error) + end) + + function c:pushJob(ID, name, ...) + targets[ID]:push{name, jid, {...}} + jid = jid - 1 + return jid + 1 + end + + c.jobqueue:doToAll(function(name) + enable_targets(name) + end, name.."_target") + + local count = 0 + while count < c.cores do + local dat = c.targetedQueue:pop() + if dat then + targets[dat[1]] = multi.integration.THREAD.waitFor(dat[2]):init() + count = count + 1 + end + end + + c.jobqueue:registerFunction("packObj",function(obj) + local multi, thread = require("multi"):init() + obj.getThreadID = function() -- Special function we are adding + return THREAD_ID + end + + obj.getUniqueName = function(self) + return self.__link_name + end + + local list = multi:chop(obj) + obj.__link_name = list[0] + + local proxy = multi:newProxy(list):init() + + return proxy + end) + c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...) local multi, thread = require("multi"):init() - local proxy = multi:newProxy(multi:chop(thread:newThread(name, func, ...))):init() - return proxy + local obj = thread:newThread(name, func, ...) + return packObj(obj) end, true) c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...) local multi, thread = require("multi"):init() local obj = multi[obj](multi, func, ...) - local proxy = multi:newProxy(multi:chop(obj)):init() - return proxy + return packObj(obj) end, true) function c:newLoop(func, notime) - return self.spawnTask("newLoop", func, notime):init() + proxy = self.spawnTask("newLoop", func, notime):init() + proxy.__proc = self + return proxy end function c:newTLoop(func, time) - return self.spawnTask("newTLoop", func, time):init() + proxy = self.spawnTask("newTLoop", func, time):init() + proxy.__proc = self + return proxy end function c:newUpdater(skip, func) - return self.spawnTask("newUpdater", func, notime):init() + proxy = self.spawnTask("newUpdater", func, notime):init() + proxy.__proc = self + return proxy end function c:newEvent(task, func) - return self.spawnTask("newEvent", task, func):init() + proxy = self.spawnTask("newEvent", task, func):init() + proxy.__proc = self + return proxy end function c:newAlarm(set, func) - return self.spawnTask("newAlarm", set, func):init() + proxy = self.spawnTask("newAlarm", set, func):init() + proxy.__proc = self + return proxy end function c:newStep(start, reset, count, skip) - return self.spawnTask("newStep", start, reset, count, skip):init() + proxy = self.spawnTask("newStep", start, reset, count, skip):init() + proxy.__proc = self + return proxy end function c:newTStep(start ,reset, count, set) - return self.spawnTask("newTStep", start, reset, count, set):init() + proxy = self.spawnTask("newTStep", start, reset, count, set):init() + proxy.__proc = self + return proxy end - c.OnObjectCreated(function(proc, obj) - if not(obj.Type == multi.UPDATER or obj.Type == multi.LOOP) then - return multi.error("Invalid type!") - end - end) - function c:getHandler() -- Not needed end @@ -219,7 +328,9 @@ function multi:newSystemThreadedProcessor(name, cores) end function c:newThread(name, func, ...) - return self.spawnThread(name, func, ...):init() + proxy = self.spawnThread(name, func, ...):init() + proxy.__proc = self + return proxy end function c:newFunction(func, holdme) @@ -249,3 +360,42 @@ function multi:newSystemThreadedProcessor(name, cores) return c end +-- Modify thread.hold to handle proxies +local thread_ref = thread.hold +function thread.hold(n, opt) + if type(n) == "table" and n.Type == multi.PROXY and n.isConnection() then + local ready = false + local args + local id = n.getThreadID() + local name = n:getUniqueName() + local func = multi:newTargetedFunction(id, n.Parent.__proc, "conn_"..multi.randomString(8), function(_name) + local multi, thread = require("multi"):init() + local obj = _G[_name] + local rets = {thread.hold(obj)} + for i,v in pairs(rets) do + if v.Type then + rets[i] = {_self_ref_ = "parent"} + end + end + return unpack(rets) + end) + func(name).OnReturn(function(...) + ready = true + args = {...} + end) + local ret = {thread_ref(function() + if ready then + return multi.unpack(args) or multi.NIL + end + end, opt)} + for i,v in pairs(ret) do + if type(v) == "table" and v._self_ref_ == "parent" then + ret[i] = n.Parent + end + end + return unpack(ret) + else + return thread_ref(n, opt) + end +end +