From c39aa229f83616c2ea439974906bfffcb1190423 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Tue, 20 Jun 2023 00:05:12 -0400 Subject: [PATCH] Fixed critical issue with coroutine based threads --- docs/changes.md | 1 + init.lua | 23 ++++++++++------- integration/sharedExtensions/init.lua | 36 +++++++-------------------- 3 files changed, 24 insertions(+), 36 deletions(-) diff --git a/docs/changes.md b/docs/changes.md index 44a30ca..f27f5fe 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -449,6 +449,7 @@ Removed Fixed --- +- Issue with coroutine based threads where they weren't all being scheduled due to a bad for loop. Replaced with a while to ensure all threads are consumed properly. If a thread created a thread that created a thread that may or may not be on the same process, things got messed up due to the original function not being built with these abstractions in mind. - Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function. - Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things. - multi.isMainThread was not properly handled in each integration. This has been resolved. diff --git a/init.lua b/init.lua index 1667819..d67cdb2 100644 --- a/init.lua +++ b/init.lua @@ -352,7 +352,7 @@ function multi:newConnection(protect,func,kill) for i=1,#fast do local suc, err = pcall(fast[i], ...) if not suc then - print(err) + multi.error(err) end if kill then table.insert(kills,i) @@ -1393,7 +1393,6 @@ function thread:newFunctionBase(generator, holdme) else while not rets and not err do multi:getCurrentProcess():getHandler()() - multi:getHandler()() end local g = rets rets = nil @@ -1529,9 +1528,9 @@ end function thread:newThread(name, func, ...) multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called - local func = func or name - if func == name then - name = name or multi.randomString(16) + if type(name) == "function" then + func = name + name = "UnnamedThread_"..multi.randomString(16) end local c={nil,nil,nil,nil,nil,nil,nil} c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} @@ -1776,7 +1775,6 @@ co_status = { if _ then ref.OnDeath:Fire(ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) else - print("Thread: ", ref.Name, ref.creator, THREAD_NAME) ref.OnError:Fire(ref,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) end if i then @@ -1799,9 +1797,16 @@ function multi:createHandler() return coroutine.wrap(function() local temp_start while true do - for start = #startme, 1, -1 do - temp_start = startme[start] - table.remove(startme) + -- for start = #startme, 1, -1 do + -- temp_start = startme[start] + -- table.remove(startme) + -- _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) + -- co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads) + -- table.insert(threads, temp_start) + -- yield() + -- end + while #startme>0 do + temp_start = table.remove(startme) _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads) table.insert(threads, temp_start) diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 48800f3..f5bb84b 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -180,7 +180,7 @@ function multi:newProxy(list) end end function c:getTransferable() - local multi, thread = require("multi"):init() + local multi, thread = nil, nil local cp = {} cp.name = self.name cp.funcs = copy(self._funcs) @@ -216,11 +216,7 @@ function multi:newTargetedFunction(ID, proxy, name, func, holup) -- This registe local tjq = THREAD.get(proc.Name .. "_target_rtq_" .. ID):init() return thread.hold(function() local data = tjq:peek() - if data then - print(data) - end if data and data[1] == id then - print("Got it sigh") tjq:pop() table.remove(data, 1) return multi.unpack(data) or multi.NIL @@ -300,9 +296,6 @@ function multi:newSystemThreadedProcessor(cores) return tjq:pop() end) if dat then - for i,v in pairs(dat) do - print(i,v) - end th = thread:newThread("JQ-TargetThread",function() local name = table.remove(dat, 1) local jid = table.remove(dat, 1) @@ -328,9 +321,7 @@ function multi:newSystemThreadedProcessor(cores) end) function c:pushJob(ID, name, ...) - print("pushing") local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init() - --targets[ID]:push{name, jid, {...}} tq:push{name, jid, {...}} jid = jid - 1 return jid + 1 @@ -492,7 +483,6 @@ function multi:newSystemThreadedProcessor(cores) end end end).OnError(multi.error) - return c end @@ -504,31 +494,26 @@ function thread.hold(n, opt) local args local id = n.getThreadID() local name = n:getUniqueName() - print(id, name) local func = multi:newTargetedFunction(id, n, "conn_"..multi.randomString(8), function(_name) local multi, thread = require("multi"):init() local obj = _G[_name] - print("Start") local rets = {thread.hold(obj)} - print("Ring ;)") for i,v in pairs(rets) do if v.Type then rets[i] = {_self_ref_ = "parent"} end end return multi.unpack(rets) - end, true) + end) local conn - local args = {func(name)} - -- conn = handle.OnReturn(function(...) - -- ready = true - -- args = {...} - -- for i,v in pairs(args) do - -- print("DATA",i,v) - -- end - -- handle.OnReturn:Unconnect(conn) - -- end) + local args + handle = func(name) + conn = handle.OnReturn(function(...) + ready = true + args = {...} + handle.OnReturn:Unconnect(conn) + end) local ret = {thread_ref(function() if ready then @@ -537,13 +522,10 @@ function thread.hold(n, opt) end, opt)} for i,v in pairs(ret) do - print("OBJECT",v.Type) if type(v) == "table" and v._self_ref_ == "parent" then - print("assign") ret[i] = n.Parent end end - return multi.unpack(ret) else return thread_ref(n, opt)