Fixed critical issue with coroutine based threads

This commit is contained in:
Ryan Ward 2023-06-20 00:05:12 -04:00
parent c80f44c68e
commit c39aa229f8
3 changed files with 24 additions and 36 deletions

View File

@ -449,6 +449,7 @@ Removed
Fixed Fixed
--- ---
- Issue with coroutine based threads where they weren't all being scheduled due to a bad for loop. Replaced with a while to ensure all threads are consumed properly. If a thread created a thread that created a thread that may or may not be on the same process, things got messed up due to the original function not being built with these abstractions in mind.
- Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function. - Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function.
- Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things. - Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things.
- multi.isMainThread was not properly handled in each integration. This has been resolved. - multi.isMainThread was not properly handled in each integration. This has been resolved.

View File

@ -352,7 +352,7 @@ function multi:newConnection(protect,func,kill)
for i=1,#fast do for i=1,#fast do
local suc, err = pcall(fast[i], ...) local suc, err = pcall(fast[i], ...)
if not suc then if not suc then
print(err) multi.error(err)
end end
if kill then if kill then
table.insert(kills,i) table.insert(kills,i)
@ -1393,7 +1393,6 @@ function thread:newFunctionBase(generator, holdme)
else else
while not rets and not err do while not rets and not err do
multi:getCurrentProcess():getHandler()() multi:getCurrentProcess():getHandler()()
multi:getHandler()()
end end
local g = rets local g = rets
rets = nil rets = nil
@ -1529,9 +1528,9 @@ end
function thread:newThread(name, func, ...) function thread:newThread(name, func, ...)
multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called
local func = func or name if type(name) == "function" then
if func == name then func = name
name = name or multi.randomString(16) name = "UnnamedThread_"..multi.randomString(16)
end end
local c={nil,nil,nil,nil,nil,nil,nil} local c={nil,nil,nil,nil,nil,nil,nil}
c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil}
@ -1776,7 +1775,6 @@ co_status = {
if _ then if _ then
ref.OnDeath:Fire(ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) ref.OnDeath:Fire(ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16)
else else
print("Thread: ", ref.Name, ref.creator, THREAD_NAME)
ref.OnError:Fire(ref,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) ref.OnError:Fire(ref,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16)
end end
if i then if i then
@ -1799,9 +1797,16 @@ function multi:createHandler()
return coroutine.wrap(function() return coroutine.wrap(function()
local temp_start local temp_start
while true do while true do
for start = #startme, 1, -1 do -- for start = #startme, 1, -1 do
temp_start = startme[start] -- temp_start = startme[start]
table.remove(startme) -- table.remove(startme)
-- _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs))
-- co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads)
-- table.insert(threads, temp_start)
-- yield()
-- end
while #startme>0 do
temp_start = table.remove(startme)
_, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs))
co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads) co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads)
table.insert(threads, temp_start) table.insert(threads, temp_start)

View File

@ -180,7 +180,7 @@ function multi:newProxy(list)
end end
end end
function c:getTransferable() function c:getTransferable()
local multi, thread = require("multi"):init() local multi, thread = nil, nil
local cp = {} local cp = {}
cp.name = self.name cp.name = self.name
cp.funcs = copy(self._funcs) cp.funcs = copy(self._funcs)
@ -216,11 +216,7 @@ function multi:newTargetedFunction(ID, proxy, name, func, holup) -- This registe
local tjq = THREAD.get(proc.Name .. "_target_rtq_" .. ID):init() local tjq = THREAD.get(proc.Name .. "_target_rtq_" .. ID):init()
return thread.hold(function() return thread.hold(function()
local data = tjq:peek() local data = tjq:peek()
if data then
print(data)
end
if data and data[1] == id then if data and data[1] == id then
print("Got it sigh")
tjq:pop() tjq:pop()
table.remove(data, 1) table.remove(data, 1)
return multi.unpack(data) or multi.NIL return multi.unpack(data) or multi.NIL
@ -300,9 +296,6 @@ function multi:newSystemThreadedProcessor(cores)
return tjq:pop() return tjq:pop()
end) end)
if dat then if dat then
for i,v in pairs(dat) do
print(i,v)
end
th = thread:newThread("JQ-TargetThread",function() th = thread:newThread("JQ-TargetThread",function()
local name = table.remove(dat, 1) local name = table.remove(dat, 1)
local jid = table.remove(dat, 1) local jid = table.remove(dat, 1)
@ -328,9 +321,7 @@ function multi:newSystemThreadedProcessor(cores)
end) end)
function c:pushJob(ID, name, ...) function c:pushJob(ID, name, ...)
print("pushing")
local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init() local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init()
--targets[ID]:push{name, jid, {...}}
tq:push{name, jid, {...}} tq:push{name, jid, {...}}
jid = jid - 1 jid = jid - 1
return jid + 1 return jid + 1
@ -492,7 +483,6 @@ function multi:newSystemThreadedProcessor(cores)
end end
end end
end).OnError(multi.error) end).OnError(multi.error)
return c return c
end end
@ -504,31 +494,26 @@ function thread.hold(n, opt)
local args local args
local id = n.getThreadID() local id = n.getThreadID()
local name = n:getUniqueName() local name = n:getUniqueName()
print(id, name)
local func = multi:newTargetedFunction(id, n, "conn_"..multi.randomString(8), function(_name) local func = multi:newTargetedFunction(id, n, "conn_"..multi.randomString(8), function(_name)
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
local obj = _G[_name] local obj = _G[_name]
print("Start")
local rets = {thread.hold(obj)} local rets = {thread.hold(obj)}
print("Ring ;)")
for i,v in pairs(rets) do for i,v in pairs(rets) do
if v.Type then if v.Type then
rets[i] = {_self_ref_ = "parent"} rets[i] = {_self_ref_ = "parent"}
end end
end end
return multi.unpack(rets) return multi.unpack(rets)
end, true) end)
local conn local conn
local args = {func(name)} local args
-- conn = handle.OnReturn(function(...) handle = func(name)
-- ready = true conn = handle.OnReturn(function(...)
-- args = {...} ready = true
-- for i,v in pairs(args) do args = {...}
-- print("DATA",i,v) handle.OnReturn:Unconnect(conn)
-- end end)
-- handle.OnReturn:Unconnect(conn)
-- end)
local ret = {thread_ref(function() local ret = {thread_ref(function()
if ready then if ready then
@ -537,13 +522,10 @@ function thread.hold(n, opt)
end, opt)} end, opt)}
for i,v in pairs(ret) do for i,v in pairs(ret) do
print("OBJECT",v.Type)
if type(v) == "table" and v._self_ref_ == "parent" then if type(v) == "table" and v._self_ref_ == "parent" then
print("assign")
ret[i] = n.Parent ret[i] = n.Parent
end end
end end
return multi.unpack(ret) return multi.unpack(ret)
else else
return thread_ref(n, opt) return thread_ref(n, opt)