diff --git a/changes.md b/changes.md index ff8c81f..4ed1957 100644 --- a/changes.md +++ b/changes.md @@ -31,6 +31,50 @@ Added: Changed: --- +- `thread.hold(connectionObj)` now passes the returns of that connection to `thread.hold()`! See Exampe below: + ```lua + multi, thread = require("multi"):init() + + func = thread:newFunction(function(count) + local a = 0 + while true do + a = a + 1 + thread.sleep(.1) + thread.pushStatus(a,count) + if a == count then break end + end + return "Done", 1, 2, 3 + end) + + thread:newThread("test",function() + local ret = func(10) + ret.OnStatus(function(part,whole) + print("Ret1: ",math.ceil((part/whole)*1000)/10 .."%") + end) + print("Status:",thread.hold(ret.OnReturn)) + print("Function Done!") + os.exit() + end).OnError(function(...) + print("Error:",...) + end) + + multi:mainloop() + ``` + Output: + ``` + Ret1: 10% + Ret1: 20% + Ret1: 30% + Ret1: 40% + Ret1: 50% + Ret1: 60% + Ret1: 70% + Ret1: 80% + Ret1: 90% + Ret1: 100% + Status: Done 1 2 3 nil nil nil nil nil nil nil nil nil nil nil nil + Function Done! + ``` - Modified how threads are handled internally. This changes makes it so threads "regardless of amount" should not impact performance. What you do in the threads might. This change was made by internally only processing one thread per step per processor. If you have 10 processors that are all active expect one step to process 10 threads. However if one processor has 10 threads each step will only process one thread. Simply put each addition of a thread shouldn't impact performance as it did before. - Moved `multi:newThread(...)` into the thread interface (`thread:newThread(...)`), code using `multi:newThread(...)` will still work. Also using `process:newThread(...)` binds the thread to the process, meaning if the process the thread is bound to is paused so is the thread. diff --git a/multi/init.lua b/multi/init.lua index d234e52..d6c301e 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -966,12 +966,10 @@ function multi:newProcessor(name,nothread) return Active end function c.Start() - print("Proc Start",mainloopActive) Active = true return c end function c.Stop() - print("Proc Stop") Active = false return c end @@ -1026,6 +1024,7 @@ thread.requests = {} local CMD = {} -- We will compare this special local local interval local resume, status, create, yield, running = coroutine.resume, coroutine.status, coroutine.create, coroutine.yield, coroutine.running + local t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none = 1, 2, 3, 4, 5, 6, 7 function thread.request(t,cmd,...) @@ -1077,13 +1076,20 @@ function thread.hold(n,opt) thread.getRunningThread().lastSleep = clock() return yield(CMD, t_sleep, n or 0, nil, interval) elseif type(n) == "table" and n.Type == "connector" then - local ready = false - n(function() - ready = true + local rdy = function() + return false + end + n(function(a1,a2,a3,a4,a5,a6) + rdy = function() + if a1==nil then + return NIL,a2,a3,a4,a5,a6 + end + return a1,a2,a3,a4,a5,a6 + end end) return yield(CMD, t_hold, function() - return ready - end) + return rdy() + end, nil, interval) elseif type(n) == "function" then return yield(CMD, t_hold, n or dFunc, nil, interval) else @@ -1231,7 +1237,7 @@ function thread:newFunctionBase(generator,holdme) return tempConn end } - t.OnDeath(function(self,status,...) temp.OnReturn:Fire(...) end) + t.OnDeath(function(...) temp.OnReturn:Fire(...) end) t.OnError(function(self,err) temp.OnError:Fire(err) end) t.linkedFunction = temp t.statusconnector = temp.OnStatus @@ -1261,10 +1267,10 @@ local startme_len = 0 function thread:newThread(name,func,...) multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called local func = func or name + if type(name) == "function" then name = "Thread#"..threadCount end - local c={nil,nil,nil,nil,nil,nil,nil} local env = {self=c} c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} @@ -1330,10 +1336,8 @@ function thread:newThread(name,func,...) c.Destroy = c.Kill if self.Type=="process" then - table.insert(self.threads,c) table.insert(self.startme,c) else - table.insert(threads,c) table.insert(startme,c) end @@ -1478,10 +1482,14 @@ local cmds = {-- ipart: t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_no function() end } setmetatable(cmds,{__index=function() return function() end end}) -local co_status = { - ["suspended"] = function(thd,ref,task) +local co_status +co_status = { + ["suspended"] = function(thd,ref,task,i,th) switch[task](ref,thd) cmds[r1](ref,r2,r3,r4,r5) + if ret~=CMD then -- The rework makes this necessary + co_status["dead"](thd,ref,task,i,th) + end r1=nil r2=nil r3=nil r4=nil r5=nil end, ["normal"] = function(thd,ref) end, @@ -1506,12 +1514,14 @@ local co_status = { end, } local handler = coroutine.wrap(function(self) + local temp_start while true do - for start = startme_len,1,-1 do - _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(startme[start].thread,unpack(startme[start].startArgs)) - co_status[status(startme[startme_len].thread)](startme[startme_len].thread,startme[startme_len],t_none,nil,threads) -- Make sure there was no error - startme[startme_len] = nil - startme_len = #startme + for start = 1, #startme do + temp_start = startme[start] + table.remove(startme) + _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(temp_start.thread,unpack(temp_start.startArgs)) + co_status[status(temp_start.thread)](temp_start.thread,temp_start,t_none,nil,threads) -- Make sure there was no error + table.insert(threads,temp_start) yield() end for i=#threads,1,-1 do @@ -1531,10 +1541,12 @@ end) function multi:createHandler(threads,startme) return coroutine.wrap(function(self) while true do - for start = #startme,1,-1 do - _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(startme[start].thread,unpack(startme[start].startArgs)) - co_status[status(startme[start].thread)](startme[start].thread,startme[start],t_none,nil,threads) -- Make sure there was no error - table.remove(startme) + for start = #startme, 1, -1 do + temp_start = startme[start] + table.remove(startme[start]) + _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(temp_start.thread,unpack(temp_start.startArgs)) + co_status[status(temp_start.thread)](temp_start.thread,temp_start,t_none,nil,threads) -- Make sure there was no error + table.insert(threads,temp_start) yield() end for i=#threads,1,-1 do diff --git a/test.lua b/test.lua index 21fd9e2..5ed2e58 100644 --- a/test.lua +++ b/test.lua @@ -1,113 +1,27 @@ ---package.path = "./?/init.lua;"..package.path -multi,thread = require("multi"):init() +package.path = "./?/init.lua;"..package.path +multi, thread = require("multi"):init() func = thread:newFunction(function(count) local a = 0 while true do a = a + 1 - thread.sleep(.5) + thread.sleep(.1) thread.pushStatus(a,count) if a == count then break end end - return "Done" + return "Done", 1, 2, 3 end) -multi:newThread("test",function() +thread:newThread("test",function() local ret = func(10) - local ret2 = func(15) - local ret3 = func(20) ret.OnStatus(function(part,whole) print("Ret1: ",math.ceil((part/whole)*1000)/10 .."%") end) - ret2.OnStatus(function(part,whole) - print("Ret2: ",math.ceil((part/whole)*1000)/10 .."%") - end) - ret3.OnStatus(function(part,whole) - print("Ret3: ",math.ceil((part/whole)*1000)/10 .."%") - end) - thread.hold(ret2.OnReturn + ret.OnReturn + ret3.OnReturn) + print("Status:",thread.hold(ret.OnReturn)) print("Function Done!") os.exit() -end) - ---GLOBAL,THREAD = require("multi.integration.threading"):init() -- Auto detects your environment and uses what's available - -func2 = thread:newFunction(function() - thread.sleep(3) - print("Hello World!") - return true -end,true) -- set holdme to true - -func2:holdMe(false) -- reset holdme to false -print("Calling func...") -print(func2()) - -test = thread:newFunction(function(a,b) - thread.sleep(1) - return a,b -end) -print(test(1,2).connect(function(...) - print(...) -end)) -test:Pause() -print(test(1,2).connect(function(...) - print(...) -end)) -test:Resume() -print(test(1,2).connect(function(...) - print(...) -end)) - -test = thread:newFunction(function() - return 1,2,nil,3,4,5,6,7,8,9 -end,true) -print(test()) -multi:newThread("testing",function() - print("#Test = ",test()) - print(thread.hold(function() - print("Hello!") - return false - end,{ - interval = 2, - cycles = 3 - })) -- End result, 3 attempts within 6 seconds. If still false then timeout - print("held") end).OnError(function(...) - print(...) + print("Error:",...) end) -sandbox = multi:newProcessor("Test Processor") -sandbox:newTLoop(function() - print("testing...") -end,1) - -test2 = multi:newTLoop(function() - print("testing2...") -end,1) - -sandbox:newThread("Test Thread",function() - local a = 0 - while true do - thread.sleep(1) - a = a + 1 - print("Thread Test: ".. multi.getCurrentProcess().Name) - if a == 10 then - sandbox.Stop() - end - end -end).OnError(function(...) - print(...) -end) - -multi:newThread("Test Thread",function() - while true do - thread.sleep(1) - print("Thread Test: ".. multi.getCurrentProcess().Name) - end -end).OnError(function(...) - print(...) -end) - -sandbox.Start() - multi:mainloop() \ No newline at end of file