From 61b5ea9d14b6510ae273126b2bdc94241a71318f Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Tue, 25 Apr 2023 00:13:47 -0400 Subject: [PATCH] Added test cases for threading, fixed issues. Todo test love2d --- docs/changes.md | 1 + integration/lanesManager/extensions.lua | 9 +- integration/lanesManager/init.lua | 21 ++-- integration/pseudoManager/extensions.lua | 17 +-- integration/pseudoManager/init.lua | 6 + integration/pseudoManager/threads.lua | 4 +- tests/threadtests.lua | 133 +++++++++++++++++++---- 7 files changed, 149 insertions(+), 42 deletions(-) diff --git a/docs/changes.md b/docs/changes.md index f900ae9..98d1588 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -204,6 +204,7 @@ Removed Fixed --- +- Issue with lanes not handling errors properly. This is now resolved - Oversight with how pushStatus worked with nesting threaded functions, connections and forwarding events ```lua func = thread:newFunction(function() diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index e9461d2..42eee9f 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -114,7 +114,6 @@ function multi:newSystemThreadedJobQueue(n) link = c.OnJobCompleted(function(jid,...) if id==jid then rets = {...} - link:Destroy() end end) return thread.hold(function() @@ -278,7 +277,13 @@ function multi:newSystemThreadedConnection(name) self.links = {} self.proxy_conn = multi:newConnection() local mt = getmetatable(self.proxy_conn) - setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) + local tempMT = {} + for i,v in pairs(mt) do + tempMT[i] = v + end + tempMT.__index = self.proxy_conn + tempMT.__call = function(t,func) self.proxy_conn(func) end + setmetatable(self, tempMT) if self.CID == THREAD.getID() then return self end thread:newThread("STC_CONN_MAN"..name,function() local item diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index e9178b9..398dc0c 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -95,10 +95,11 @@ function multi:newSystemThread(name, func, ...) globals = globe, priority = c.priority },function(...) - require("multi"):init(multi_settings) + multi, thread = require("multi"):init(multi_settings) require("multi.integration.lanesManager.extensions") local has_error = true - return_linda:set("returns",{func(...)}) + returns = {pcall(func, ...)} + return_linda:set("returns", returns) has_error = false end)(...) count = count + 1 @@ -138,9 +139,15 @@ function multi.InitSystemThreadErrorHandler() temp.statusconnector:Fire(unpack(({__StatusLinda:receive(nil, temp.Id)})[2])) end if status == "done" or temp.returns:get("returns") then + returns = ({temp.returns:receive(0, "returns")})[2] livingThreads[temp.Id] = {false, temp.Name} temp.alive = false - temp.OnDeath:Fire(unpack(({temp.returns:receive(0, "returns")})[2])) + if returns[1] == false then + temp.OnError:Fire(temp, returns[2]) + else + table.remove(returns,1) + temp.OnDeath:Fire(unpack(returns)) + end GLOBAL["__THREADS__"] = livingThreads table.remove(threads, i) elseif status == "running" then @@ -148,11 +155,7 @@ function multi.InitSystemThreadErrorHandler() elseif status == "waiting" then -- elseif status == "error" then - livingThreads[temp.Id] = {false, temp.Name} - temp.alive = false - temp.OnError:Fire(temp,unpack(temp.returns:receive(0,"returns") or {"Thread Killed!"})) - GLOBAL["__THREADS__"] = livingThreads - table.remove(threads, i) + -- The thread never really errors, we handle this through our linda object elseif status == "cancelled" then livingThreads[temp.Id] = {false, temp.Name} temp.alive = false @@ -168,7 +171,7 @@ function multi.InitSystemThreadErrorHandler() end end end - end) + end).OnError(print) end multi.print("Integrated Lanes Threading!") diff --git a/integration/pseudoManager/extensions.lua b/integration/pseudoManager/extensions.lua index f40ccc2..4ae0d72 100644 --- a/integration/pseudoManager/extensions.lua +++ b/integration/pseudoManager/extensions.lua @@ -50,7 +50,6 @@ function multi:newSystemThreadedQueue(name) GLOBAL[name or "_"] = c return c end - function multi:newSystemThreadedTable(name) local c = {} function c:init() @@ -69,7 +68,6 @@ if not setfenv then end end end - function multi:newSystemThreadedJobQueue(n) local c = {} c.cores = n or THREAD.getCores()*2 @@ -96,7 +94,6 @@ function multi:newSystemThreadedJobQueue(n) return jid-1 end function c:isEmpty() - print(#jobs) return #jobs == 0 end local nFunc = 0 @@ -116,7 +113,6 @@ function multi:newSystemThreadedJobQueue(n) link = c.OnJobCompleted(function(jid,...) if id==jid then rets = {...} - link:Destroy() end end) return thread.hold(function() @@ -127,7 +123,7 @@ function multi:newSystemThreadedJobQueue(n) end,holup),name end for i=1,c.cores do - thread:newthread("PesudoThreadedJobQueue_"..i,function() + thread:newThread("PesudoThreadedJobQueue_"..i,function() while true do thread.yield() if #jobs>0 then @@ -137,7 +133,14 @@ function multi:newSystemThreadedJobQueue(n) thread.sleep(.05) end end - end) + end).OnError(print) end return c -end \ No newline at end of file +end + +function multi:newSystemThreadedConnection(name) + local conn = multi.newConnection() + conn.init = function(self) return self end + GLOBAL[name or "_"] = conn + return conn +end \ No newline at end of file diff --git a/integration/pseudoManager/init.lua b/integration/pseudoManager/init.lua index 84bdca3..26a7144 100644 --- a/integration/pseudoManager/init.lua +++ b/integration/pseudoManager/init.lua @@ -67,6 +67,12 @@ function multi:newSystemThread(name,func,...) THREAD_ID = id, thread = thread } + + if GLOBAL["__env"] then + for i,v in pairs(GLOBAL["__env"]) do + env[i] = v + end + end for i = 1,#tab do env[tab[i]] = _G[tab[i]] diff --git a/integration/pseudoManager/threads.lua b/integration/pseudoManager/threads.lua index 55e1313..51e060a 100644 --- a/integration/pseudoManager/threads.lua +++ b/integration/pseudoManager/threads.lua @@ -101,8 +101,8 @@ local function INIT(thread) THREAD.sleep = thread.sleep THREAD.hold = thread.hold - - function THREAD.setENV(env) + + function THREAD.setENV(env) GLOBAL["__env"] = env end diff --git a/tests/threadtests.lua b/tests/threadtests.lua index a04c367..52dd112 100644 --- a/tests/threadtests.lua +++ b/tests/threadtests.lua @@ -1,6 +1,6 @@ package.path = "../?/init.lua;../?.lua;"..package.path -local multi, thread = require("multi"):init{print=true}--{priority=true} -local proc = multi:newProcessor("Test",true) +multi, thread = require("multi"):init{}--{priority=true} +proc = multi:newProcessor("Thread Test",true) local LANES, LOVE, PSEUDO = 1, 2, 3 local env @@ -21,9 +21,9 @@ else end end +print("Testing THREAD.setENV() if the multi_assert is not found then there is a problem") THREAD.setENV({ multi_assert = function(expected, actual, s) - print("Testing") if expected ~= actual then error(s .. " Expected: '".. expected .."' Actual: '".. actual .."'") end @@ -31,20 +31,19 @@ THREAD.setENV({ }) multi:newThread("Scheduler Thread",function() - print("Test 1: Thread Spawning, THREAD namaspace in threads, global's working, and queues for passing data.") - queue = multi:newSystemThreadedQueue("Test_Queue"):init() - th1 = multi:newSystemThread("Test_Thread_2", function(a,b,c,d,e,f) + th1 = multi:newSystemThread("Test_Thread_1", function(a,b,c,d,e,f) queue = THREAD.waitFor("Test_Queue"):init() - print("!") multi_assert("Test_Thread_1", THREAD.getName(), "Thread name does not match!") - print("!") multi_assert("Passing some args", a, "First argument is not as expected 'Passing some args'") multi_assert(true, e, "Argument e is not true!") multi_assert("table", type(f), "Argument f is not a table!") queue:push("done") - end,"Passing some args", 1, 2, 3, true, {"Table"}).OnError(print) + end,"Passing some args", 1, 2, 3, true, {"Table"}).OnError(function(self,err) + print("Error:", err) + os.exit() + end) if thread.hold(function() return queue:pop() == "done" @@ -52,9 +51,7 @@ multi:newThread("Scheduler Thread",function() thread.kill() end - print("Test 1: Ok") - - print("Test 2: Threaded Functions, arg passing, return passing, holding.") + print("Thread Spawning, THREAD namaspace in threads, global's working, and queues for passing data: Ok") func = THREAD:newFunction(function(a,b,c) assert(a == 3, "First argument expected '3' got '".. a .."'!") @@ -65,24 +62,116 @@ multi:newThread("Scheduler Thread",function() a, b, c, d = func(3,2,1) - print("Returns passed from function", a, b, c, d) - - if not a then print(b) end - assert(a == 1, "First return was not '1'!") assert(b == 2, "Second return was not '2'!") assert(c == 3, "Third return was not '3'!") assert(d[1] == "a table", "Fourth return is not table, or doesn't contain 'a table'!") - print("Test 2: Ok") + print("Threaded Functions, arg passing, return passing, holding: Ok") - print("Test 3: SystemThreadedTables") + test=multi:newSystemThreadedTable("YO"):init() + test["test1"]="tabletest" + local worked = false + multi:newSystemThread("testing tables",function() + tab=THREAD.waitFor("YO"):init() + THREAD.hold(function() return tab["test1"] end) + THREAD.sleep(.1) + tab["test2"] = "Whats so funny?" + end).OnError(print) + + multi:newThread("test2",function() + thread.hold(function() return test["test2"] end) + worked = true + end) + + t, val = thread.hold(function() + return worked + end,{sleep=1}) + + if val == multi.TIMEOUT then + print("SystemThreadedTables: Failed") + os.exit() + end + + print("SystemThreadedTables: Ok") + + local ready = false + + jq = multi:newSystemThreadedJobQueue(5) -- Job queue with 4 worker threads + + func = jq:newFunction("test",function(a,b) + THREAD.sleep(.2) + return a+b + end) + + local count = 0 + for i = 1,10 do + func(i, i*3).OnReturn(function(data) + count = count + 1 + end) + end + + t, val = thread.hold(function() + return count == 10 + end,{sleep=2}) + + if val == multi.TIMEOUT then + print("SystemThreadedJobQueues: Failed") + os.exit() + end + + print("SystemThreadedJobQueues: Ok") + + queue2 = multi:newSystemThreadedQueue("Test_Queue2"):init() + multi:newSystemThread("Test_Thread_2",function() + queue2 = THREAD.waitFor("Test_Queue2"):init() + connOut = THREAD.waitFor("ConnectionNAMEHERE"):init() + connOut(function(arg) + queue2:push("Test_Thread_2") + end) + multi:mainloop() + end).OnError(print) + + multi:newSystemThread("Test_Thread_3",function() + queue2 = THREAD.waitFor("Test_Queue2"):init() + connOut = THREAD.waitFor("ConnectionNAMEHERE"):init() + connOut(function(arg) + queue2:push("Test_Thread_3") + end) + multi:mainloop() + end).OnError(print) + connOut = multi:newSystemThreadedConnection("ConnectionNAMEHERE"):init() + a=0 + connOut(function(arg) + queue2:push("Main") + end) + for i=1,3 do + thread.sleep(.1) + connOut:Fire("Test From Main Thread: "..i.."\n") + end + thread.sleep(2) + local count = 0 + multi:newThread(function() + while count < 9 do + if queue2:pop() then + count = count + 1 + end + end + end).OnError(print) + + _, err = thread.hold(function() return count == 9 end,{sleep=.3}) + + if err == multi.TIMEOUT then + print("SystemThreadedConnections: Failed") + os.exit() + end + + print("SystemThreadedConnections: Ok") + + print("Tests complete!") os.exit() -end).OnError(function(self, err) - print(err) - os.exit() -end) +end).OnError(print)