From a660b63581bac987765ea13b2adf92a97b5aad58 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sun, 6 Aug 2023 00:38:22 -0400 Subject: [PATCH] Closer to getting things working... --- docs/changes.md | 7 +- init.lua | 6 +- integration/loveManager/extensions.lua | 2 + integration/loveManager/threads.lua | 4 +- integration/loveManagerold/threads.lua | 1 - integration/lovrManager/init.lua | 4 +- integration/pseudoManager/extensions.lua | 186 ++++++++++++++++------- integration/sharedExtensions/init.lua | 4 +- tests/threadtests.lua | 12 +- 9 files changed, 147 insertions(+), 79 deletions(-) diff --git a/docs/changes.md b/docs/changes.md index e2c9f88..91ef5c8 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -62,17 +62,12 @@ Table of contents Full Update Showcase --- + ```lua multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.lanesManager"):init() ``` -## Added New Integration: **effilManager** - -Another option for multithreading support, works just like all the other threading integrations, but uses the internals of effil and it's unique features. -- Refer to this [doc](https://www.lua.org/wshop18/Kupriyanov.pdf) to read more about it. -- Project github [page](https://github.com/effil/effil/tree/master). - ```lua package.path = "?/init.lua;?.lua;"..package.path diff --git a/init.lua b/init.lua index 3d713e8..5392cda 100644 --- a/init.lua +++ b/init.lua @@ -482,7 +482,7 @@ function multi:newConnection(protect,func,kill) return temp end - function c:Hold(self) + function c:Hold() return multi.hold(self) end @@ -1276,7 +1276,6 @@ function thread.hold(n, opt) return yield(CMD, t_skip, opt.skip or 1, nil, interval) end end - if type(n) == "number" then thread.getRunningThread().lastSleep = clock() return yield(CMD, t_sleep, n or 0, nil, interval) @@ -2392,7 +2391,8 @@ function multi.error(self, err) io.write("\x1b[91mERROR:\x1b[0m " .. err .. " ?\n") end if multi.defaultSettings.error then - error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type .. "\n" .. debug.traceback().."\n") + error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type .. "\n" .. + ((coroutine.running()) and debug.traceback((coroutine.running())) or debug.traceback()) .. "\n") os.exit(1) end end diff --git a/integration/loveManager/extensions.lua b/integration/loveManager/extensions.lua index 163ccf6..358cf6e 100644 --- a/integration/loveManager/extensions.lua +++ b/integration/loveManager/extensions.lua @@ -70,6 +70,8 @@ function multi:newSystemThreadedTable(name) return self end + c.__init = c.init + function c:Hold(opt) if opt.key then return thread.hold(function() diff --git a/integration/loveManager/threads.lua b/integration/loveManager/threads.lua index 83ec3da..d3cec2f 100644 --- a/integration/loveManager/threads.lua +++ b/integration/loveManager/threads.lua @@ -92,8 +92,8 @@ function INIT() repeat wait() until GLOBAL[name] ~= nil - if type(GLOBAL[name].init) == "function" then - return GLOBAL[name]:init() + if type(GLOBAL[name].__init) == "function" then + return GLOBAL[name]:__init() else return GLOBAL[name] end diff --git a/integration/loveManagerold/threads.lua b/integration/loveManagerold/threads.lua index 6a2f25a..7749c98 100644 --- a/integration/loveManagerold/threads.lua +++ b/integration/loveManagerold/threads.lua @@ -48,7 +48,6 @@ end local fRef = {"func",nil} local function manage(channel, value) channel:clear() - print("pushing",value) if type(value) == "table" then channel:push{"DATA",threads.packTable(value)} else diff --git a/integration/lovrManager/init.lua b/integration/lovrManager/init.lua index bb86a8a..3d6943d 100644 --- a/integration/lovrManager/init.lua +++ b/integration/lovrManager/init.lua @@ -87,12 +87,12 @@ function multi:newSystemThread(name,func,...) end THREAD.newSystemThread = multi.newSystemThread function lovr.threaderror(thread, errorstr) - print("Thread error!\n"..errorstr) + multi.print("Thread error!\n"..errorstr) end multi.integration.GLOBAL = GLOBAL multi.integration.THREAD = THREAD require("multi.integration.lovrManager.extensions") -print("Integrated lovr Threading!") +multi.print("Integrated lovr Threading!") return {init=function() return GLOBAL,THREAD end} \ No newline at end of file diff --git a/integration/pseudoManager/extensions.lua b/integration/pseudoManager/extensions.lua index b6a4884..03550b6 100644 --- a/integration/pseudoManager/extensions.lua +++ b/integration/pseudoManager/extensions.lua @@ -47,6 +47,17 @@ function multi:newSystemThreadedQueue(name) function c:init() return self end + function c:Hold(opt) + if opt.peek then + return thread.hold(function() + return self:peek() + end) + else + return thread.hold(function() + return self:pop() + end) + end + end GLOBAL[name or "_"] = c return c end @@ -56,58 +67,59 @@ function multi:newSystemThreadedTable(name) function c:init() return self end + function c:Hold(opt) + if opt.key then + return thread.hold(function() + return self.tab[opt.key] + end) + else + multi.error("Must provide a key to check opt.key = 'key'") + end + end GLOBAL[name or "_"] = c return c end -local setfenv = setfenv -if not setfenv then - if not debug then - multi.print("Unable to implement setfenv in lua 5.2+ the debug module is not available!") - else - setfenv = function(f, env) - return load(string.dump(f), nil, nil, env) - end - end -end +local setfenv = multi.isolateFunction +local jqc = 1 function multi:newSystemThreadedJobQueue(n) - local c = {} - c.cores = n or THREAD.getCores()*2 - c.OnJobCompleted = multi:newConnection() - local jobs = {} - local ID=1 - local jid = 1 - local env = {} + local c = {} - setmetatable(env,{ - __index = _G - }) + c.cores = n or THREAD.getCores() + c.registerQueue = {} + c.Type = multi.SJOBQUEUE + c.funcs = multi:newSystemThreadedTable("__JobQueue_"..jqc.."_table") + c.queue = multi:newSystemThreadedQueue("__JobQueue_"..jqc.."_queue") + c.queueReturn = multi:newSystemThreadedQueue("__JobQueue_"..jqc.."_queueReturn") + c.queueAll = multi:newSystemThreadedQueue("__JobQueue_"..jqc.."_queueAll") + c.id = 0 + c.OnJobCompleted = multi:newConnection() - local funcs = {} - function c:doToAll(func) - setfenv(func,env)() - return self + local allfunc = 0 + + function c:doToAll(func) + for i = 1, self.cores do + self.queueAll:push({allfunc, func}) + end + allfunc = allfunc + 1 + end + function c:registerFunction(name, func) + if self.funcs[name] then + multi.error("A function by the name "..name.." has already been registered!") + end + self.funcs[name] = func + end + function c:pushJob(name,...) + self.id = self.id + 1 + self.queue:push{name,self.id,...} + return self.id + end + function c:isEmpty() + return queueJob:peek()==nil end - - function c:registerFunction(name,func) - funcs[name] = setfenv(func,env) - return self - end - - function c:pushJob(name,...) - table.insert(jobs,{name,jid,multi.pack(...)}) - jid = jid + 1 - return jid-1 - end - - function c:isEmpty() - return #jobs == 0 - end - - local nFunc = 0 + local nFunc = 0 function c:newFunction(name,func,holup) -- This registers with the queue - local func = stripUpValues(func) if type(name)=="function" then holup = func func = name @@ -129,22 +141,84 @@ function multi:newSystemThreadedJobQueue(n) return multi.unpack(rets) or multi.NIL end end) - end, holup), name + end,holup),name end - for i=1,c.cores do - thread:newThread("PesudoThreadedJobQueue_"..i,function() - while true do - thread.yield() - if #jobs>0 then - local j = table.remove(jobs,1) - c.OnJobCompleted:Fire(j[2],funcs[j[1]](multi.unpack(j[3]))) - else - thread.sleep(.05) - end - end - end).OnError(multi.error) + thread:newThread("jobManager",function() + while true do + thread.yield() + local dat = c.queueReturn:pop() + if dat then + c.OnJobCompleted:Fire(multi.unpack(dat)) + end + end + end) + for i=1,c.cores do + multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc) + local multi, thread = require("multi"):init() + local clock = os.clock + local funcs = THREAD.waitFor("__JobQueue_"..jqc.."_table") + local queue = THREAD.waitFor("__JobQueue_"..jqc.."_queue") + local queueReturn = THREAD.waitFor("__JobQueue_"..jqc.."_queueReturn") + local lastProc = clock() + local queueAll = THREAD.waitFor("__JobQueue_"..jqc.."_queueAll") + local registry = {} + _G["__QR"] = queueReturn + setmetatable(_G,{__index = funcs}) + thread:newThread("startUp",function() + while true do + thread.yield() + local all = queueAll:peek() + if all and not registry[all[1]] then + lastProc = os.clock() + queueAll:pop()[2]() + end + end + end) + thread:newThread("runner",function() + thread.sleep(.1) + while true do + thread.yield() + local all = queueAll:peek() + if all and not registry[all[1]] then + lastProc = os.clock() + queueAll:pop()[2]() + end + local dat = thread.hold(queue) + if dat then + multi:newThread("Test",function() + lastProc = os.clock() + local name = table.remove(dat,1) + local id = table.remove(dat,1) + local tab = {multi.isolateFunction(funcs[name],_G)(multi.unpack(dat))} + table.insert(tab,1,id) + queueReturn:push(tab) + end) + end + end + end).OnError(multi.error) + thread:newThread("Idler",function() + while true do + thread.yield() + if clock()-lastProc> 2 then + THREAD.sleep(.05) + else + THREAD.sleep(.001) + end + end + end) + multi:mainloop() + end,jqc) + end + + function c:Hold(opt) + return thread.hold(self.OnJobCompleted) end - return c + + jqc = jqc + 1 + + self:create(c) + + return c end function multi:newSystemThreadedConnection(name) diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 9b29a83..c916b0f 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -189,7 +189,6 @@ function multi:newProxy(list) THREAD = multi.integration.THREAD end local proxy = THREAD.waitFor(self.proxy_link) - print("Got:",proxy) proxy.funcs = self.funcs return proxy:init() end @@ -267,7 +266,8 @@ function multi:newSystemThreadedProcessor(cores) for _, method in pairs(implement) do c[method] = function(self, ...) - proxy = self.spawnTask(method, ...):init() + proxy = self.spawnTask(method, ...) + proxy:init() references[proxy] = self return proxy end diff --git a/tests/threadtests.lua b/tests/threadtests.lua index 269b671..d2bb646 100644 --- a/tests/threadtests.lua +++ b/tests/threadtests.lua @@ -77,7 +77,6 @@ multi:newThread("Scheduler Thread",function() end, true) -- Hold this a, b, c, d = func(3,2,1) - print(a, b, c, d) assert(a == 1, "First return was not '1'!") assert(b == 2, "Second return was not '2'!") assert(c == 3, "Third return was not '3'!") @@ -115,8 +114,11 @@ multi:newThread("Scheduler Thread",function() local ready = false jq = multi:newSystemThreadedJobQueue(5) -- Job queue with 4 worker threads - func = jq:newFunction("test-thread",function(a,b) + func2 = jq:newFunction("sleep",function(a,b) THREAD.sleep(.2) + end) + func = jq:newFunction("test-thread",function(a,b) + sleep() return a+b end) local count = 0 @@ -194,9 +196,6 @@ multi:newThread("Scheduler Thread",function() multi:newSystemThread("Testing proxy copy THREAD",function(tloop) local multi, thread = require("multi"):init() - for i,v in pairs(tloop.funcs) do - print(i,v) - end tloop = tloop:init() multi.print("tloop type:",tloop.Type) multi.print("Testing proxies on other threads") @@ -223,7 +222,6 @@ multi:newThread("Scheduler Thread",function() multi.print("Held on proxy connection... twice") proxy_test = true end).OnError(multi.error) - thread:newThread(function() while true do @@ -256,7 +254,7 @@ end).OnError(multi.error) multi.OnExit(function(err_or_errorcode) print("Error Code: ", err_or_errorcode) if not we_good then - multi.info("There was an error running some tests!") + multi.print("There was an error running some tests!") return else multi.success("Tests complete!")