diff --git a/docs/changes.md b/docs/changes.md index 82e882a..3b49ce0 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -237,6 +237,7 @@ Added Changed --- +- changed how multi adds unpack to the global namespace. Instead we capture that value into multi.unpack. - multi:newUpdater(skip, func) -- Now accepts func as the second argument. So you don't need to call OnUpdate(func) after creation. - multi errors now internally call `multi.error` instead of `multi.print` - Actors Act() method now returns true when the main event is fired. Steps/Loops always return true. Nil is returned otherwise. @@ -260,6 +261,8 @@ Changed Removed --- +- multi.CONNECTOR_LINK -- No longer used +- multi:newConnector() -- No longer used - THREAD.getName() use THREAD_NAME instead - THREAD.getID() use THREAD_ID instead - conn:SetHelper(func) -- With the removal of old Connect this function is no longer needed diff --git a/init.lua b/init.lua index 10202d9..14c538c 100644 --- a/init.lua +++ b/init.lua @@ -1,7 +1,7 @@ --[[ MIT License -Copyright (c) 2022 Ryan Ward +Copyright (c) 2023 Ryan Ward Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -32,6 +32,10 @@ local find_optimization = false local threadManager local __CurrentConnectionThread +multi.unpack = table.unpack or unpack + +if table.unpack then unpack = table.unpack end + -- Types multi.DestroyedObj = { @@ -1171,7 +1175,7 @@ function multi.hold(func,opt) multi:uManager() end proc:Resume() - return unpack(rets) + return multi.unpack(rets) end end @@ -1224,7 +1228,7 @@ function thread._Requests() if t then thread.requests[running()] = nil local cmd,args = t[1],t[2] - thread[cmd](unpack(args)) + thread[cmd](multi.unpack(args)) end end @@ -1244,7 +1248,7 @@ local function conn_test(conn) conn(func) return function() if ready then - return unpack(args) or multi.NIL + return multi.unpack(args) or multi.NIL end end end @@ -1343,7 +1347,7 @@ local function cleanReturns(...) break end end - return unpack(returns,1,ind) + return multi.unpack(returns,1,ind) end function thread.pushStatus(...) @@ -1351,8 +1355,6 @@ function thread.pushStatus(...) t.statusconnector:Fire(...) end -local handler - function thread:newFunctionBase(generator, holdme) return function() local tfunc = {} @@ -1381,7 +1383,7 @@ function thread:newFunctionBase(generator, holdme) end) else while not rets and not err do - handler() + multi:getCurrentProcess():getHandler()() end if err then return nil,err @@ -1415,7 +1417,7 @@ function thread:newFunctionBase(generator, holdme) isTFunc = true, wait = wait, getReturns = function() - return unpack(rets) + return multi.unpack(rets) end, connect = function(f) local tempConn = multi:newConnection(true) @@ -1785,7 +1787,7 @@ function multi:createHandler() for start = #startme, 1, -1 do temp_start = startme[start] table.remove(startme) - _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, unpack(temp_start.startArgs)) + _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads) table.insert(threads, temp_start) yield() @@ -2095,10 +2097,6 @@ function table.merge(t1, t2) return t1 end -if table.unpack and not unpack then - unpack=table.unpack -end - math.randomseed(os.time()) function multi:enableLoadDetection() @@ -2308,7 +2306,7 @@ function multi.timer(func,...) args={func(...)} local t = timer:Get() timer = nil - return t,unpack(args) + return t,multi.unpack(args) end if os.getOS()=="windows" then @@ -2380,4 +2378,8 @@ end threadManager = multi:newProcessor("Global_Thread_Manager").Start() +function multi:getHandler() + return threadManager:getHandler() +end + return multi \ No newline at end of file diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index fd687d2..cd0a73a 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -116,6 +116,7 @@ function multi:newSystemThreadedJobQueue(n) nFunc = nFunc + 1 c:registerFunction(name,func) return thread:newFunction(function(...) + print("Called!") local id = c:pushJob(name,...) local link local rets @@ -126,10 +127,10 @@ function multi:newSystemThreadedJobQueue(n) end) return thread.hold(function() if rets then - return unpack(rets) or multi.NIL + return multi.unpack(rets) or multi.NIL end end) - end,holup),name + end,holup), name end thread:newThread("JobQueueManager",function() while true do @@ -137,7 +138,7 @@ function multi:newSystemThreadedJobQueue(n) return queueReturn:pop() end) local id = table.remove(job,1) - c.OnJobCompleted:Fire(id,unpack(job)) + c.OnJobCompleted:Fire(id,multi.unpack(job)) end end) for i=1,c.cores do @@ -157,10 +158,10 @@ function multi:newSystemThreadedJobQueue(n) local name = table.remove(dat, 1) local jid = table.remove(dat, 1) local args = table.remove(dat, 1) - queueReturn:push{jid, funcs[name](unpack(args)), queue} + queueReturn:push{jid, funcs[name](multi.unpack(args)), queue} end) end - end).OnError(multi.error) + end).OnError(print) thread:newThread("DoAllHandler",function() while true do local dat = thread.hold(function() @@ -263,8 +264,8 @@ function multi:newSystemThreadedConnection(name) end) c.links[#c.links+1] = item[2] elseif item[1] == c.TRIG then - fire(unpack(item[2])) - c.proxy_conn:Fire(unpack(item[2])) + fire(multi.unpack(item[2])) + c.proxy_conn:Fire(multi.unpack(item[2])) end end end) @@ -312,7 +313,7 @@ function multi:newSystemThreadedConnection(name) end link_self_ref:pop() elseif item[1] == self.TRIG then - self.proxy_conn:Fire(unpack(item[2])) + self.proxy_conn:Fire(multi.unpack(item[2])) link_self_ref:pop() else -- This shouldn't be the case diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index 3cb85a4..ebfb569 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -131,13 +131,12 @@ function multi.InitSystemThreadErrorHandler() while true do thread.yield() _,data = __ConsoleLinda:receive(0, "Q") - if data then print(unpack(data)) end for i = #threads, 1, -1 do temp = threads[i] status = temp.thread.status push = __StatusLinda:get(temp.Id) if push then - temp.statusconnector:Fire(unpack(({__StatusLinda:receive(nil, temp.Id)})[2])) + temp.statusconnector:Fire(multi.unpack(({__StatusLinda:receive(nil, temp.Id)})[2])) end if status == "done" or temp.returns:get("returns") then returns = ({temp.returns:receive(0, "returns")})[2] @@ -147,7 +146,7 @@ function multi.InitSystemThreadErrorHandler() temp.OnError:Fire(temp, returns[2]) else table.remove(returns,1) - temp.OnDeath:Fire(unpack(returns)) + temp.OnDeath:Fire(multi.unpack(returns)) end GLOBAL["__THREADS__"] = livingThreads table.remove(threads, i) diff --git a/integration/loveManager/extensions.lua b/integration/loveManager/extensions.lua index dfa9688..10e8e17 100644 --- a/integration/loveManager/extensions.lua +++ b/integration/loveManager/extensions.lua @@ -134,7 +134,7 @@ function multi:newSystemThreadedJobQueue(n) end) return thread.hold(function() if rets then - return unpack(rets) or multi.NIL + return multi.unpack(rets) or multi.NIL end end) end,holup),name @@ -144,7 +144,7 @@ function multi:newSystemThreadedJobQueue(n) thread.yield() local dat = c.queueReturn:pop() if dat then - c.OnJobCompleted:Fire(unpack(dat)) + c.OnJobCompleted:Fire(multi.unpack(dat)) end end end) @@ -188,7 +188,7 @@ function multi:newSystemThreadedJobQueue(n) lastProc = os.clock() local name = table.remove(dat,1) local id = table.remove(dat,1) - local tab = {funcs[name](unpack(dat))} + local tab = {funcs[name](multi.unpack(dat))} table.insert(tab,1,id) queueReturn:push(tab) end) @@ -264,7 +264,7 @@ function multi:newSystemThreadedConnection(name) end link_self_ref:pop() elseif item[1] == self.TRIG then - self.proxy_conn:Fire(unpack(item[2])) + self.proxy_conn:Fire(multi.unpack(item[2])) link_self_ref:pop() else -- This shouldn't be the case @@ -341,8 +341,8 @@ function multi:newSystemThreadedConnection(name) c.links[#c.links+1] = item[2] elseif item[1] == c.TRIG then - fire(unpack(item[2])) - c.proxy_conn:Fire(unpack(item[2])) + fire(multi.unpack(item[2])) + c.proxy_conn:Fire(multi.unpack(item[2])) end end end).OnError(print) diff --git a/integration/loveManager/init.lua b/integration/loveManager/init.lua index 2b5245c..2bb596b 100644 --- a/integration/loveManager/init.lua +++ b/integration/loveManager/init.lua @@ -54,7 +54,7 @@ multi.integration.GLOBAL = GLOBAL multi.integration.THREAD = THREAD pcall(require,"multi.integration.loveManager.extensions") pcall(require,"multi.integration.sharedExtensions") -stab["returns"] = {THREAD.loadDump(__FUNC__)(unpack(__IMPORTS))} +stab["returns"] = {THREAD.loadDump(__FUNC__)(multi.unpack(__IMPORTS))} ]] local multi, thread = require("multi"):init() @@ -86,7 +86,7 @@ function multi:newSystemThread(name, func, ...) thread.hold(function() -- While the thread is running we might as well do something in the loop if status_channel:peek() ~= nil then - c.statusconnector:Fire(unpack(status_channel:pop())) + c.statusconnector:Fire(multi.unpack(status_channel:pop())) end return not c.thread:isRunning() end) @@ -100,7 +100,7 @@ function multi:newSystemThread(name, func, ...) elseif thread_err then c.OnError:Fire(c, thread_err) elseif c.stab.returns then - c.OnDeath:Fire(unpack(c.stab.returns)) + c.OnDeath:Fire(multi.unpack(c.stab.returns)) c.stab.returns = nil end end) diff --git a/integration/loveManager/threads.lua b/integration/loveManager/threads.lua index 25008fe..12f5471 100644 --- a/integration/loveManager/threads.lua +++ b/integration/loveManager/threads.lua @@ -223,7 +223,7 @@ if not ISTHREAD then multi:newLoop(function(loop) dat = queue:pop() if dat then - print(unpack(dat)) + print(multi.unpack(dat)) end end) end diff --git a/integration/lovrManager/extensions.lua b/integration/lovrManager/extensions.lua index 7032b1d..232183a 100644 --- a/integration/lovrManager/extensions.lua +++ b/integration/lovrManager/extensions.lua @@ -124,7 +124,7 @@ function multi:newSystemThreadedJobQueue(n) end) return thread.hold(function() if rets then - return unpack(rets) or multi.NIL + return multi.unpack(rets) or multi.NIL end end) end,holup),name @@ -134,7 +134,7 @@ function multi:newSystemThreadedJobQueue(n) thread.yield() local dat = c.queueReturn:pop() if dat then - c.OnJobCompleted:Fire(unpack(dat)) + c.OnJobCompleted:Fire(multi.unpack(dat)) end end end) @@ -177,7 +177,7 @@ function multi:newSystemThreadedJobQueue(n) lastProc = os.clock() local name = table.remove(dat,1) local id = table.remove(dat,1) - local tab = {funcs[name](unpack(dat))} + local tab = {funcs[name](multi.unpack(dat))} table.insert(tab,1,id) queueReturn:push(tab) end diff --git a/integration/lovrManager/init.lua b/integration/lovrManager/init.lua index 53cafa3..d09b2d9 100644 --- a/integration/lovrManager/init.lua +++ b/integration/lovrManager/init.lua @@ -36,7 +36,7 @@ __THREADNAME__=table.remove(__IMPORTS,1) stab = THREAD.createStaticTable(__THREADNAME__) GLOBAL = THREAD.getGlobal() multi, thread = require("multi").init() -stab["returns"] = {THREAD.loadDump(__FUNC__)(unpack(__IMPORTS))} +stab["returns"] = {THREAD.loadDump(__FUNC__)(multi.unpack(__IMPORTS))} ]] local multi, thread = require("multi.compat.lovr2d"):init() local THREAD = {} @@ -58,7 +58,7 @@ function THREAD:newFunction(func,holup) if t.stab["returns"] then local dat = t.stab.returns t.stab.returns = nil - return unpack(dat) + return multi.unpack(dat) end end) end,holup)() diff --git a/integration/lovrManager/threads.lua b/integration/lovrManager/threads.lua index 6a95a1e..12429c4 100644 --- a/integration/lovrManager/threads.lua +++ b/integration/lovrManager/threads.lua @@ -174,7 +174,7 @@ if not ISTHREAD then dat = queue:pop() if dat then lastproc = clock() - print(unpack(dat)) + print(multi.unpack(dat)) end if clock()-lastproc>2 then thread.sleep(.1) diff --git a/integration/pseudoManager/extensions.lua b/integration/pseudoManager/extensions.lua index 5227573..bfdd982 100644 --- a/integration/pseudoManager/extensions.lua +++ b/integration/pseudoManager/extensions.lua @@ -126,7 +126,7 @@ function multi:newSystemThreadedJobQueue(n) end) return thread.hold(function() if rets then - return unpack(rets) or multi.NIL + return multi.unpack(rets) or multi.NIL end end) end, holup), name @@ -137,7 +137,7 @@ function multi:newSystemThreadedJobQueue(n) thread.yield() if #jobs>0 then local j = table.remove(jobs,1) - c.OnJobCompleted:Fire(j[2],funcs[j[1]](unpack(j[3]))) + c.OnJobCompleted:Fire(j[2],funcs[j[1]](multi.unpack(j[3]))) else thread.sleep(.05) end diff --git a/integration/pseudoManager/init.lua b/integration/pseudoManager/init.lua index 6c8d187..7e41344 100644 --- a/integration/pseudoManager/init.lua +++ b/integration/pseudoManager/init.lua @@ -50,7 +50,7 @@ local function split(str) return tab end -local tab = [[_VERSION,io,os,require,load,debug,assert,collectgarbage,error,getfenv,getmetatable,ipairs,loadstring,module,next,pairs,pcall,print,rawequal,rawget,rawset,select,setfenv,setmetatable,tonumber,tostring,type,unpack,xpcall,math,coroutine,string,table]] +local tab = [[_VERSION,io,os,require,load,debug,assert,collectgarbage,error,getfenv,getmetatable,ipairs,loadstring,module,next,pairs,pcall,print,rawequal,rawget,rawset,select,setfenv,setmetatable,tonumber,tostring,type,xpcall,math,coroutine,string,table]] tab = split(tab) local id = 0 diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index cbb81a5..368cd39 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -1,34 +1,77 @@ -local multi, thread = require("multi"):init() +--[[ +MIT License +Copyright (c) 2023 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] + +local multi, thread = require("multi"):init() -- Returns a handler that allows a user to interact with an object on another thread! -- Create on the thread that you want to interact with, send over the handle function multi:newProxy(obj) - local c = { - __index = function() - -- - end, - __newindex = function() - -- - end, - __call = function() - -- - end - } - c.name = multi.randomString(12) function c:init() if not multi.isMainThread then - c.send = multi:newSystemThreadedQueue(self.name.."_S"):init() - c.recv = multi:newSystemThreadedQueue(self.name.."_R"):init() - c.ref = obj + local multi, thread = require("multi"):init() + local function check() + return self.send:pop() + end + self.send = multi:newSystemThreadedQueue(self.name.."_S"):init() + self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init() + self.ref = obj + self.funcs = {} + for i, v in pairs(obj) do + if type(v) == "function" then + self.funcs[#self.funcs] = i + end + end + thread:newThread(function() + while true do + local data = thread.hold(check) + local func = table.remove(data, 1) + local ret = {self.ref[func](multi.unpack(data))} + table.insert(ret, 1, func) + self.recv:push(ret) + end + end) else GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD - c.send = THREAD.waitFor(self.name.."_S") - c.recv = THREAD.waitFor(self.name.."_R") + self.send = THREAD.waitFor(self.name.."_S") + self.recv = THREAD.waitFor(self.name.."_R") + for _,v in pairs(self.funcs) do + self[v] = thread:newFunction(function(...) + self.send:push({v, ...}) + return thread.hold(function() + local data = self.recv:peek() + if data[1] == v then + self.recv:pop() + thread.remove(data, 1) + return multi.unpack(data) + end + end) + end, true) + end end end @@ -54,26 +97,35 @@ function multi:newSystemThreadedProcessor(name, cores) c.parent = self c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) - c.jobqueue:registerFunction("__spawnThread__", function(name, func, ...) + local spawnThread = c.jobqueue:newFunction(function(name, func, ...) local multi, thread = require("multi"):init() - thread:newThread(name, func, ...) - return true - end) + print("hmm") + local proxy = multi:newProxy(thread:newThread(name, func, ...)) + multi:newTask(function() + proxy:init() + end) + return proxy + end, true) - c.jobqueue:registerFunction("__spawnTask__", function(obj, ...) + local spawnTask = c.jobqueue:newFunction(function(name, func, ...) local multi, thread = require("multi"):init() - multi[obj](multi, func) - return true - end) + local proxy = multi:newProxy(multi[obj](multi, func)) + multi:newTask(function() + proxy:init() + end) + return proxy + end, true) + + c.newLoop = thread:newFunction(function(self, func, notime) + return spawnTask("newLoop", func, notime):init() + end, true) + + c.newUpdater = thread:newFunction(function(self, skip, func) + return spawnTask("newUpdater", func, notime):init() + end, true) c.OnObjectCreated(function(proc, obj) - if obj.Type == multi.UPDATER then - local func = obj.OnUpdate:Remove()[1] - c.jobqueue:pushJob("__spawnTask__", "newUpdater", func) - elseif obj.Type == multi.LOOP then - local func = obj.OnLoop:Remove()[1] - c.jobqueue:pushJob("__spawnTask__", "newLoop", func) - else + if not(obj.Type == multi.UPDATER or obj.Type == multi.LOOP) then return multi.error("Invalid type!") end end) @@ -94,9 +146,9 @@ function multi:newSystemThreadedProcessor(name, cores) return self.Name end - function c:newThread(name, func, ...) - c.jobqueue:pushJob("__spawnThread__", name, func, ...) - end + c.newThread = thread:newFunction(function(self, name, func, ...) + return spawnThread(name, func, ...):init() + end, true) function c:newFunction(func, holdme) return c.jobqueue:newFunction(func, holdme)