diff --git a/docs/changes.md b/docs/changes.md index afbc542..3b45ee4 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -337,6 +337,8 @@ Removed Fixed --- +- Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function. +- Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things. - multi.isMainThread was not properly handled in each integration. This has been resolved. - Issue with pseudo threading env's being messed up. Required removal of getName and getID! - connections being multiplied together would block the entire connection object from pushing events! This is not the desired effect I wanted. Now only the connection reference involved in the multiplication is locked! diff --git a/init.lua b/init.lua index 1d45f14..6f9755f 100644 --- a/init.lua +++ b/init.lua @@ -728,7 +728,7 @@ function multi:newEvent(task, func) if t then self:Pause() self.returns = t - c.OnEvent:Fire(self) + self.OnEvent:Fire(self) return true end end @@ -989,24 +989,9 @@ function multi:newTStep(start,reset,count,set) end local tasks = {} -local _tasks = 0 - -local function _task_handler(self, func) - tasks[#tasks + 1] = func - _tasks = _tasks + 1 -end function multi:newTask(func) - multi:newLoop(function(loop) - for i=1,_tasks do - tasks[i]() - end - _tasks = 0 - end):setName("Task Handler") - -- Re bind this method to use the one that doesn't init a thread! - multi.newTask = _task_handler tasks[#tasks + 1] = func - _tasks = _tasks + 1 end local scheduledjobs = {} @@ -1396,17 +1381,22 @@ function thread:newFunctionBase(generator, holdme) if err then return multi.NIL, err elseif rets then - return cleanReturns((rets[1] or multi.NIL),rets[2],rets[3],rets[4],rets[5],rets[6],rets[7],rets[8],rets[9],rets[10],rets[11],rets[12],rets[13],rets[14],rets[15],rets[16]) + local g = rets + rets = nil + return cleanReturns((g[1] or multi.NIL),g[2],g[3],g[4],g[5],g[6],g[7],g[8],g[9],g[10],g[11],g[12],g[13],g[14],g[15],g[16]) end end) else while not rets and not err do multi:getCurrentProcess():getHandler()() + multi:getHandler()() end + local g = rets + rets = nil if err then return nil,err end - return cleanReturns(rets[1],rets[2],rets[3],rets[4],rets[5],rets[6],rets[7],rets[8],rets[9],rets[10],rets[11],rets[12],rets[13],rets[14],rets[15],rets[16]) + return cleanReturns(g[1],g[2],g[3],g[4],g[5],g[6],g[7],g[8],g[9],g[10],g[11],g[12],g[13],g[14],g[15],g[16]) end end tfunc.__call = function(th,...) @@ -1776,7 +1766,7 @@ co_status = { ["normal"] = function(thd,ref) end, ["running"] = function(thd,ref) end, ["dead"] = function(thd,ref,task,i,th) - if ref.__processed then return end + if ref.__processed then table.remove(th,i) return end if _ then ref.OnDeath:Fire(ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) else @@ -1792,7 +1782,7 @@ co_status = { end end end - _=nil r1=nil r2=nil r3=nil r4=nil r5=nil + _=nil r1=nil r2=nil r3=nil r4=nil r5=nil r6=nil r7=nil r8=nil r9=nil r10=nil r11=nil r12=nil r13=nil r14=nil r15=nil r16=nil ref.__processed = true end, } @@ -2400,4 +2390,13 @@ function multi:getHandler() return threadManager:getHandler() end +multi:newThread("Task Handler", function() + local check = function() + return table.remove(tasks) + end + while true do + thread.hold(check)() + end +end).OnError(multi.error) + return multi \ No newline at end of file diff --git a/integration/lanesManager/extensions.lua b/integration/lanesManager/extensions.lua index 596f5f6..6a98121 100644 --- a/integration/lanesManager/extensions.lua +++ b/integration/lanesManager/extensions.lua @@ -167,12 +167,12 @@ function multi:newSystemThreadedJobQueue(n) return queueJob:pop() end) idle = clock() - thread:newThread("test",function() + thread:newThread("JobQueue-Spawn",function() local name = table.remove(dat, 1) local jid = table.remove(dat, 1) local args = table.remove(dat, 1) queueReturn:push{jid, funcs[name](multi.unpack(args)), queue} - end) + end).OnError(multi.error) end end).OnError(print) thread:newThread("DoAllHandler",function() diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index e4df31a..8e9fe59 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -132,7 +132,7 @@ function multi.InitSystemThreadErrorHandler() thread.yield() _,data = __ConsoleLinda:receive(0, "Q") if data then - print(data[1]) + --print(data[1]) end for i = #threads, 1, -1 do temp = threads[i] diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 209cd36..893ebdc 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -211,7 +211,7 @@ function multi:newSystemThreadedProcessor(cores) return tjq:pop() end) if dat then - thread:newThread("test",function() + thread:newThread("JQ-TargetThread",function() local name = table.remove(dat, 1) local jid = table.remove(dat, 1) local args = table.remove(dat, 1) @@ -387,22 +387,20 @@ function multi:newSystemThreadedProcessor(cores) for i,v in pairs(self.proc_list) do local conn local jid = self:pushJob(v, func) - - conn = c.jobqueue.OnJobCompleted(function(id, data) + + conn = self.jobqueue.OnJobCompleted(function(id, data) if id == jid then - loads[v] = data - multi:newAlarm(1):OnRing(function(alarm) - c.jobqueue.OnJobCompleted:Unconnect(conn) - alarm:Destroy() + table.insert(loads, {v, data}) + multi:newTask(function() + self.jobqueue.OnJobCompleted:Unconnect(conn) end) end end) end thread.hold(function() return #loads == c.cores end) - return loads - end) + end, true) return c end diff --git a/tests/threadtests.lua b/tests/threadtests.lua index a3024fc..b67a7e1 100644 --- a/tests/threadtests.lua +++ b/tests/threadtests.lua @@ -110,7 +110,7 @@ multi:newThread("Scheduler Thread",function() jq = multi:newSystemThreadedJobQueue(5) -- Job queue with 4 worker threads - func = jq:newFunction("test",function(a,b) + func = jq:newFunction("test-thread",function(a,b) THREAD.sleep(.2) return a+b end)