From ab9e949b682da0eb3350db4999db2e7513a3c092 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sun, 21 May 2023 09:43:44 -0400 Subject: [PATCH] newProxy implemented --- init.lua | 35 +++++--- integration/lanesManager/extensions.lua | 19 +++-- integration/sharedExtensions/init.lua | 106 +++++++++++++++--------- 3 files changed, 100 insertions(+), 60 deletions(-) diff --git a/init.lua b/init.lua index 14c538c..26fbed6 100644 --- a/init.lua +++ b/init.lua @@ -568,14 +568,8 @@ function multi:Pause() multi.print("You cannot pause the main process. Doing so will stop all methods and freeze your program! However if you still want to use multi:_Pause()") else self.Active=false - local loop = self.Parent.Mainloop - for i=1,#loop do - if loop[i] == self then - multi.PausedObjects[self] = true - table.remove(loop,i) - break - end - end + self._Act = self.Act + self.Act = empty_func end return self end @@ -589,8 +583,7 @@ function multi:Resume() end else if self.Active==false then - table.insert(self.Parent.Mainloop,self) - multi.PausedObjects[self] = nil + self.Act = self._Act self.Active=true end end @@ -668,6 +661,17 @@ function multi:newBase(ins) c.Act=function() end c.Parent=self c.creationTime = clock() + + function c:Pause() + c.Parent.Pause(self) + return self + end + + function c:Resume() + c.Parent.Resume(self) + return self + end + if ins then table.insert(self.Mainloop,ins,c) else @@ -817,6 +821,7 @@ function multi:newLoop(func, notime) return true end end + c.OnLoop = self:newConnection():fastMode() if func then @@ -1232,6 +1237,10 @@ function thread._Requests() end end +function thread.exec(func) + func() +end + function thread.sleep(n) thread._Requests() thread.getRunningThread().lastSleep = clock() @@ -1755,8 +1764,8 @@ co_status = { end r1=nil r2=nil r3=nil r4=nil r5=nil end, - ["normal"] = function(thd,ref) end, - ["running"] = function(thd,ref) end, + ["normal"] = function(thd,ref) end, + ["running"] = function(thd,ref) end, ["dead"] = function(thd,ref,task,i,th) if ref.__processed then return end if _ then @@ -1929,7 +1938,7 @@ function multi:mainloopRef() for _D=#Loop,1,-1 do __CurrentTask = Loop[_D] ctask = __CurrentTask - ctask:Act() + if ctask then ctask:Act() end __CurrentProcess = self end end diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index cd0a73a..af9221b 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -107,7 +107,7 @@ function multi:newSystemThreadedJobQueue(n) return jid-1 end local nFunc = 0 - function c:newFunction(name,func,holup) -- This registers with the queue + function c:newFunction(name, func, holup) -- This registers with the queue if type(name)=="function" then holup = func func = name @@ -116,7 +116,6 @@ function multi:newSystemThreadedJobQueue(n) nFunc = nFunc + 1 c:registerFunction(name,func) return thread:newFunction(function(...) - print("Called!") local id = c:pushJob(name,...) local link local rets @@ -130,19 +129,21 @@ function multi:newSystemThreadedJobQueue(n) return multi.unpack(rets) or multi.NIL end end) - end,holup), name + end, holup), name end thread:newThread("JobQueueManager",function() while true do local job = thread.hold(function() return queueReturn:pop() end) - local id = table.remove(job,1) - c.OnJobCompleted:Fire(id,multi.unpack(job)) + if job then + local id = table.remove(job,1) + c.OnJobCompleted:Fire(id,multi.unpack(job)) + end end end) for i=1,c.cores do - multi:newSystemThread("SystemThreadedJobQueue",function(queue) + multi:newSystemThread("SystemThreadedJobQueue_"..multi.randomString(4),function(queue) local multi, thread = require("multi"):init() local idle = os.clock() local clock = os.clock @@ -176,7 +177,7 @@ function multi:newSystemThreadedJobQueue(n) end end end - end) + end).OnError(print) thread:newThread("IdleHandler",function() while true do thread.hold(function() @@ -184,9 +185,9 @@ function multi:newSystemThreadedJobQueue(n) end) THREAD.sleep(.01) end - end) + end).OnError(print) multi:mainloop() - end,i).priority = thread.Priority_Core + end,i).OnError(print) end return c end diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 368cd39..30f6e37 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -26,52 +26,84 @@ local multi, thread = require("multi"):init() -- Returns a handler that allows a user to interact with an object on another thread! -- Create on the thread that you want to interact with, send over the handle -function multi:newProxy(obj) + +function multi:chop(obj) + local multi, thread = require("multi"):init() + list = {[0] = multi.randomString(12)} + _G[list[0]] = obj + for i,v in pairs(obj) do + if type(v) == "function" then + list[#list+1] = i + end + end + return list +end + +function multi:newProxy(list) + local c = {} + c.name = multi.randomString(12) function c:init() - if not multi.isMainThread then + local multi, thread = nil, nil + if THREAD_NAME then local multi, thread = require("multi"):init() local function check() return self.send:pop() end self.send = multi:newSystemThreadedQueue(self.name.."_S"):init() self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init() - self.ref = obj - self.funcs = {} - for i, v in pairs(obj) do - if type(v) == "function" then - self.funcs[#self.funcs] = i - end - end + self.funcs = list thread:newThread(function() while true do local data = thread.hold(check) local func = table.remove(data, 1) - local ret = {self.ref[func](multi.unpack(data))} + local sref = table.remove(data, 1) + local ret + if sref then + ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} + else + ret = {_G[list[0]][func](multi.unpack(data))} + end + if ret[1] == _G[list[0]] then + -- We cannot return itself, that return can contain bad values. + ret[1] = {_self_ref_ = true} + end table.insert(ret, 1, func) self.recv:push(ret) end - end) + end).OnError(print) + return self else + local multi, thread = require("multi"):init() + local me = self GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD self.send = THREAD.waitFor(self.name.."_S") self.recv = THREAD.waitFor(self.name.."_R") for _,v in pairs(self.funcs) do - self[v] = thread:newFunction(function(...) - self.send:push({v, ...}) + self[v] = thread:newFunction(function(self,...) + if self == me then + me.send:push({v, true, ...}) + else + me.send:push({v, false, ...}) + end return thread.hold(function() - local data = self.recv:peek() - if data[1] == v then - self.recv:pop() - thread.remove(data, 1) + local data = me.recv:peek() + if data and data[1] == v then + me.recv:pop() + table.remove(data, 1) + if type(data[1]) == "table" and data[1]._self_ref_ then + -- So if we get a self return as a return, we should return the proxy! + data[1] = me + end return multi.unpack(data) end end) end, true) end + return self end end @@ -80,7 +112,7 @@ end function multi:newSystemThreadedProcessor(name, cores) - local name = name or "STP_"multi.randomString(4) -- set a random name if none was given. + local name = name or "STP_"..multi.randomString(4) -- set a random name if none was given. local autoscale = autoscale or false -- Will scale up the number of cores that the process uses. local c = {} @@ -97,32 +129,30 @@ function multi:newSystemThreadedProcessor(name, cores) c.parent = self c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) - local spawnThread = c.jobqueue:newFunction(function(name, func, ...) + c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...) local multi, thread = require("multi"):init() - print("hmm") - local proxy = multi:newProxy(thread:newThread(name, func, ...)) - multi:newTask(function() - proxy:init() - end) + local proxy = multi:newProxy(multi:chop(thread:newThread(name, func, ...))):init() return proxy end, true) - local spawnTask = c.jobqueue:newFunction(function(name, func, ...) + c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...) local multi, thread = require("multi"):init() - local proxy = multi:newProxy(multi[obj](multi, func)) - multi:newTask(function() - proxy:init() - end) + local obj = multi[obj](multi, func, ...) + local proxy = multi:newProxy(multi:chop(obj)):init() return proxy end, true) - c.newLoop = thread:newFunction(function(self, func, notime) - return spawnTask("newLoop", func, notime):init() - end, true) + function c:newLoop(func, notime) + return self.spawnTask("newLoop", func, notime):init() + end - c.newUpdater = thread:newFunction(function(self, skip, func) - return spawnTask("newUpdater", func, notime):init() - end, true) + function c:newTLoop(func, time) + return self.spawnTask("newTLoop", func, time):init() + end + + function c:newUpdater(skip, func) + return self.spawnTask("newUpdater", func, notime):init() + end c.OnObjectCreated(function(proc, obj) if not(obj.Type == multi.UPDATER or obj.Type == multi.LOOP) then @@ -146,9 +176,9 @@ function multi:newSystemThreadedProcessor(name, cores) return self.Name end - c.newThread = thread:newFunction(function(self, name, func, ...) - return spawnThread(name, func, ...):init() - end, true) + function c:newThread(name, func, ...) + return self.spawnThread(name, func, ...):init() + end function c:newFunction(func, holdme) return c.jobqueue:newFunction(func, holdme)