Added test cases for threading, fixed issues. Todo test love2d

This commit is contained in:
Ryan Ward 2023-04-25 00:13:47 -04:00
parent d2cfdfa8e8
commit 61b5ea9d14
7 changed files with 149 additions and 42 deletions

View File

@ -204,6 +204,7 @@ Removed
Fixed
---
- Issue with lanes not handling errors properly. This is now resolved
- Oversight with how pushStatus worked with nesting threaded functions, connections and forwarding events
```lua
func = thread:newFunction(function()

View File

@ -114,7 +114,6 @@ function multi:newSystemThreadedJobQueue(n)
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
link:Destroy()
end
end)
return thread.hold(function()
@ -278,7 +277,13 @@ function multi:newSystemThreadedConnection(name)
self.links = {}
self.proxy_conn = multi:newConnection()
local mt = getmetatable(self.proxy_conn)
setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add})
local tempMT = {}
for i,v in pairs(mt) do
tempMT[i] = v
end
tempMT.__index = self.proxy_conn
tempMT.__call = function(t,func) self.proxy_conn(func) end
setmetatable(self, tempMT)
if self.CID == THREAD.getID() then return self end
thread:newThread("STC_CONN_MAN"..name,function()
local item

View File

@ -95,10 +95,11 @@ function multi:newSystemThread(name, func, ...)
globals = globe,
priority = c.priority
},function(...)
require("multi"):init(multi_settings)
multi, thread = require("multi"):init(multi_settings)
require("multi.integration.lanesManager.extensions")
local has_error = true
return_linda:set("returns",{func(...)})
returns = {pcall(func, ...)}
return_linda:set("returns", returns)
has_error = false
end)(...)
count = count + 1
@ -138,9 +139,15 @@ function multi.InitSystemThreadErrorHandler()
temp.statusconnector:Fire(unpack(({__StatusLinda:receive(nil, temp.Id)})[2]))
end
if status == "done" or temp.returns:get("returns") then
returns = ({temp.returns:receive(0, "returns")})[2]
livingThreads[temp.Id] = {false, temp.Name}
temp.alive = false
temp.OnDeath:Fire(unpack(({temp.returns:receive(0, "returns")})[2]))
if returns[1] == false then
temp.OnError:Fire(temp, returns[2])
else
table.remove(returns,1)
temp.OnDeath:Fire(unpack(returns))
end
GLOBAL["__THREADS__"] = livingThreads
table.remove(threads, i)
elseif status == "running" then
@ -148,11 +155,7 @@ function multi.InitSystemThreadErrorHandler()
elseif status == "waiting" then
--
elseif status == "error" then
livingThreads[temp.Id] = {false, temp.Name}
temp.alive = false
temp.OnError:Fire(temp,unpack(temp.returns:receive(0,"returns") or {"Thread Killed!"}))
GLOBAL["__THREADS__"] = livingThreads
table.remove(threads, i)
-- The thread never really errors, we handle this through our linda object
elseif status == "cancelled" then
livingThreads[temp.Id] = {false, temp.Name}
temp.alive = false
@ -168,7 +171,7 @@ function multi.InitSystemThreadErrorHandler()
end
end
end
end)
end).OnError(print)
end
multi.print("Integrated Lanes Threading!")

View File

@ -50,7 +50,6 @@ function multi:newSystemThreadedQueue(name)
GLOBAL[name or "_"] = c
return c
end
function multi:newSystemThreadedTable(name)
local c = {}
function c:init()
@ -69,7 +68,6 @@ if not setfenv then
end
end
end
function multi:newSystemThreadedJobQueue(n)
local c = {}
c.cores = n or THREAD.getCores()*2
@ -96,7 +94,6 @@ function multi:newSystemThreadedJobQueue(n)
return jid-1
end
function c:isEmpty()
print(#jobs)
return #jobs == 0
end
local nFunc = 0
@ -116,7 +113,6 @@ function multi:newSystemThreadedJobQueue(n)
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
link:Destroy()
end
end)
return thread.hold(function()
@ -127,7 +123,7 @@ function multi:newSystemThreadedJobQueue(n)
end,holup),name
end
for i=1,c.cores do
thread:newthread("PesudoThreadedJobQueue_"..i,function()
thread:newThread("PesudoThreadedJobQueue_"..i,function()
while true do
thread.yield()
if #jobs>0 then
@ -137,7 +133,14 @@ function multi:newSystemThreadedJobQueue(n)
thread.sleep(.05)
end
end
end)
end).OnError(print)
end
return c
end
function multi:newSystemThreadedConnection(name)
local conn = multi.newConnection()
conn.init = function(self) return self end
GLOBAL[name or "_"] = conn
return conn
end

View File

@ -68,6 +68,12 @@ function multi:newSystemThread(name,func,...)
thread = thread
}
if GLOBAL["__env"] then
for i,v in pairs(GLOBAL["__env"]) do
env[i] = v
end
end
for i = 1,#tab do
env[tab[i]] = _G[tab[i]]
end

View File

@ -102,7 +102,7 @@ local function INIT(thread)
THREAD.hold = thread.hold
function THREAD.setENV(env)
function THREAD.setENV(env)
GLOBAL["__env"] = env
end

View File

@ -1,6 +1,6 @@
package.path = "../?/init.lua;../?.lua;"..package.path
local multi, thread = require("multi"):init{print=true}--{priority=true}
local proc = multi:newProcessor("Test",true)
multi, thread = require("multi"):init{}--{priority=true}
proc = multi:newProcessor("Thread Test",true)
local LANES, LOVE, PSEUDO = 1, 2, 3
local env
@ -21,9 +21,9 @@ else
end
end
print("Testing THREAD.setENV() if the multi_assert is not found then there is a problem")
THREAD.setENV({
multi_assert = function(expected, actual, s)
print("Testing")
if expected ~= actual then
error(s .. " Expected: '".. expected .."' Actual: '".. actual .."'")
end
@ -31,20 +31,19 @@ THREAD.setENV({
})
multi:newThread("Scheduler Thread",function()
print("Test 1: Thread Spawning, THREAD namaspace in threads, global's working, and queues for passing data.")
queue = multi:newSystemThreadedQueue("Test_Queue"):init()
th1 = multi:newSystemThread("Test_Thread_2", function(a,b,c,d,e,f)
th1 = multi:newSystemThread("Test_Thread_1", function(a,b,c,d,e,f)
queue = THREAD.waitFor("Test_Queue"):init()
print("!")
multi_assert("Test_Thread_1", THREAD.getName(), "Thread name does not match!")
print("!")
multi_assert("Passing some args", a, "First argument is not as expected 'Passing some args'")
multi_assert(true, e, "Argument e is not true!")
multi_assert("table", type(f), "Argument f is not a table!")
queue:push("done")
end,"Passing some args", 1, 2, 3, true, {"Table"}).OnError(print)
end,"Passing some args", 1, 2, 3, true, {"Table"}).OnError(function(self,err)
print("Error:", err)
os.exit()
end)
if thread.hold(function()
return queue:pop() == "done"
@ -52,9 +51,7 @@ multi:newThread("Scheduler Thread",function()
thread.kill()
end
print("Test 1: Ok")
print("Test 2: Threaded Functions, arg passing, return passing, holding.")
print("Thread Spawning, THREAD namaspace in threads, global's working, and queues for passing data: Ok")
func = THREAD:newFunction(function(a,b,c)
assert(a == 3, "First argument expected '3' got '".. a .."'!")
@ -65,24 +62,116 @@ multi:newThread("Scheduler Thread",function()
a, b, c, d = func(3,2,1)
print("Returns passed from function", a, b, c, d)
if not a then print(b) end
assert(a == 1, "First return was not '1'!")
assert(b == 2, "Second return was not '2'!")
assert(c == 3, "Third return was not '3'!")
assert(d[1] == "a table", "Fourth return is not table, or doesn't contain 'a table'!")
print("Test 2: Ok")
print("Threaded Functions, arg passing, return passing, holding: Ok")
print("Test 3: SystemThreadedTables")
test=multi:newSystemThreadedTable("YO"):init()
test["test1"]="tabletest"
local worked = false
multi:newSystemThread("testing tables",function()
tab=THREAD.waitFor("YO"):init()
THREAD.hold(function() return tab["test1"] end)
THREAD.sleep(.1)
tab["test2"] = "Whats so funny?"
end).OnError(print)
multi:newThread("test2",function()
thread.hold(function() return test["test2"] end)
worked = true
end)
t, val = thread.hold(function()
return worked
end,{sleep=1})
if val == multi.TIMEOUT then
print("SystemThreadedTables: Failed")
os.exit()
end
print("SystemThreadedTables: Ok")
local ready = false
jq = multi:newSystemThreadedJobQueue(5) -- Job queue with 4 worker threads
func = jq:newFunction("test",function(a,b)
THREAD.sleep(.2)
return a+b
end)
local count = 0
for i = 1,10 do
func(i, i*3).OnReturn(function(data)
count = count + 1
end)
end
t, val = thread.hold(function()
return count == 10
end,{sleep=2})
if val == multi.TIMEOUT then
print("SystemThreadedJobQueues: Failed")
os.exit()
end
print("SystemThreadedJobQueues: Ok")
queue2 = multi:newSystemThreadedQueue("Test_Queue2"):init()
multi:newSystemThread("Test_Thread_2",function()
queue2 = THREAD.waitFor("Test_Queue2"):init()
connOut = THREAD.waitFor("ConnectionNAMEHERE"):init()
connOut(function(arg)
queue2:push("Test_Thread_2")
end)
multi:mainloop()
end).OnError(print)
multi:newSystemThread("Test_Thread_3",function()
queue2 = THREAD.waitFor("Test_Queue2"):init()
connOut = THREAD.waitFor("ConnectionNAMEHERE"):init()
connOut(function(arg)
queue2:push("Test_Thread_3")
end)
multi:mainloop()
end).OnError(print)
connOut = multi:newSystemThreadedConnection("ConnectionNAMEHERE"):init()
a=0
connOut(function(arg)
queue2:push("Main")
end)
for i=1,3 do
thread.sleep(.1)
connOut:Fire("Test From Main Thread: "..i.."\n")
end
thread.sleep(2)
local count = 0
multi:newThread(function()
while count < 9 do
if queue2:pop() then
count = count + 1
end
end
end).OnError(print)
_, err = thread.hold(function() return count == 9 end,{sleep=.3})
if err == multi.TIMEOUT then
print("SystemThreadedConnections: Failed")
os.exit()
end
print("SystemThreadedConnections: Ok")
print("Tests complete!")
os.exit()
end).OnError(function(self, err)
print(err)
os.exit()
end)
end).OnError(print)