Fixed issue with the new thread scheduler(again) connections and hold get a buff

This commit is contained in:
Ryan Ward 2022-02-15 15:11:28 -05:00
parent 72e24e8a9b
commit 04d5500374
3 changed files with 85 additions and 115 deletions

View File

@ -31,6 +31,50 @@ Added:
Changed:
---
- `thread.hold(connectionObj)` now passes the returns of that connection to `thread.hold()`! See Exampe below:
```lua
multi, thread = require("multi"):init()
func = thread:newFunction(function(count)
local a = 0
while true do
a = a + 1
thread.sleep(.1)
thread.pushStatus(a,count)
if a == count then break end
end
return "Done", 1, 2, 3
end)
thread:newThread("test",function()
local ret = func(10)
ret.OnStatus(function(part,whole)
print("Ret1: ",math.ceil((part/whole)*1000)/10 .."%")
end)
print("Status:",thread.hold(ret.OnReturn))
print("Function Done!")
os.exit()
end).OnError(function(...)
print("Error:",...)
end)
multi:mainloop()
```
Output:
```
Ret1: 10%
Ret1: 20%
Ret1: 30%
Ret1: 40%
Ret1: 50%
Ret1: 60%
Ret1: 70%
Ret1: 80%
Ret1: 90%
Ret1: 100%
Status: Done 1 2 3 nil nil nil nil nil nil nil nil nil nil nil nil
Function Done!
```
- Modified how threads are handled internally. This changes makes it so threads "regardless of amount" should not impact performance. What you do in the threads might. This change was made by internally only processing one thread per step per processor. If you have 10 processors that are all active expect one step to process 10 threads. However if one processor has 10 threads each step will only process one thread. Simply put each addition of a thread shouldn't impact performance as it did before.
- Moved `multi:newThread(...)` into the thread interface (`thread:newThread(...)`), code using `multi:newThread(...)` will still work. Also using `process:newThread(...)` binds the thread to the process, meaning if the process the thread is bound to is paused so is the thread.

View File

@ -966,12 +966,10 @@ function multi:newProcessor(name,nothread)
return Active
end
function c.Start()
print("Proc Start",mainloopActive)
Active = true
return c
end
function c.Stop()
print("Proc Stop")
Active = false
return c
end
@ -1026,6 +1024,7 @@ thread.requests = {}
local CMD = {} -- We will compare this special local
local interval
local resume, status, create, yield, running = coroutine.resume, coroutine.status, coroutine.create, coroutine.yield, coroutine.running
local t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none = 1, 2, 3, 4, 5, 6, 7
function thread.request(t,cmd,...)
@ -1077,13 +1076,20 @@ function thread.hold(n,opt)
thread.getRunningThread().lastSleep = clock()
return yield(CMD, t_sleep, n or 0, nil, interval)
elseif type(n) == "table" and n.Type == "connector" then
local ready = false
n(function()
ready = true
local rdy = function()
return false
end
n(function(a1,a2,a3,a4,a5,a6)
rdy = function()
if a1==nil then
return NIL,a2,a3,a4,a5,a6
end
return a1,a2,a3,a4,a5,a6
end
end)
return yield(CMD, t_hold, function()
return ready
end)
return rdy()
end, nil, interval)
elseif type(n) == "function" then
return yield(CMD, t_hold, n or dFunc, nil, interval)
else
@ -1231,7 +1237,7 @@ function thread:newFunctionBase(generator,holdme)
return tempConn
end
}
t.OnDeath(function(self,status,...) temp.OnReturn:Fire(...) end)
t.OnDeath(function(...) temp.OnReturn:Fire(...) end)
t.OnError(function(self,err) temp.OnError:Fire(err) end)
t.linkedFunction = temp
t.statusconnector = temp.OnStatus
@ -1261,10 +1267,10 @@ local startme_len = 0
function thread:newThread(name,func,...)
multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called
local func = func or name
if type(name) == "function" then
name = "Thread#"..threadCount
end
local c={nil,nil,nil,nil,nil,nil,nil}
local env = {self=c}
c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil}
@ -1330,10 +1336,8 @@ function thread:newThread(name,func,...)
c.Destroy = c.Kill
if self.Type=="process" then
table.insert(self.threads,c)
table.insert(self.startme,c)
else
table.insert(threads,c)
table.insert(startme,c)
end
@ -1478,10 +1482,14 @@ local cmds = {-- ipart: t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_no
function() end
}
setmetatable(cmds,{__index=function() return function() end end})
local co_status = {
["suspended"] = function(thd,ref,task)
local co_status
co_status = {
["suspended"] = function(thd,ref,task,i,th)
switch[task](ref,thd)
cmds[r1](ref,r2,r3,r4,r5)
if ret~=CMD then -- The rework makes this necessary
co_status["dead"](thd,ref,task,i,th)
end
r1=nil r2=nil r3=nil r4=nil r5=nil
end,
["normal"] = function(thd,ref) end,
@ -1506,12 +1514,14 @@ local co_status = {
end,
}
local handler = coroutine.wrap(function(self)
local temp_start
while true do
for start = startme_len,1,-1 do
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(startme[start].thread,unpack(startme[start].startArgs))
co_status[status(startme[startme_len].thread)](startme[startme_len].thread,startme[startme_len],t_none,nil,threads) -- Make sure there was no error
startme[startme_len] = nil
startme_len = #startme
for start = 1, #startme do
temp_start = startme[start]
table.remove(startme)
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(temp_start.thread,unpack(temp_start.startArgs))
co_status[status(temp_start.thread)](temp_start.thread,temp_start,t_none,nil,threads) -- Make sure there was no error
table.insert(threads,temp_start)
yield()
end
for i=#threads,1,-1 do
@ -1531,10 +1541,12 @@ end)
function multi:createHandler(threads,startme)
return coroutine.wrap(function(self)
while true do
for start = #startme,1,-1 do
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(startme[start].thread,unpack(startme[start].startArgs))
co_status[status(startme[start].thread)](startme[start].thread,startme[start],t_none,nil,threads) -- Make sure there was no error
table.remove(startme)
for start = #startme, 1, -1 do
temp_start = startme[start]
table.remove(startme[start])
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(temp_start.thread,unpack(temp_start.startArgs))
co_status[status(temp_start.thread)](temp_start.thread,temp_start,t_none,nil,threads) -- Make sure there was no error
table.insert(threads,temp_start)
yield()
end
for i=#threads,1,-1 do

100
test.lua
View File

@ -1,113 +1,27 @@
--package.path = "./?/init.lua;"..package.path
multi,thread = require("multi"):init()
package.path = "./?/init.lua;"..package.path
multi, thread = require("multi"):init()
func = thread:newFunction(function(count)
local a = 0
while true do
a = a + 1
thread.sleep(.5)
thread.sleep(.1)
thread.pushStatus(a,count)
if a == count then break end
end
return "Done"
return "Done", 1, 2, 3
end)
multi:newThread("test",function()
thread:newThread("test",function()
local ret = func(10)
local ret2 = func(15)
local ret3 = func(20)
ret.OnStatus(function(part,whole)
print("Ret1: ",math.ceil((part/whole)*1000)/10 .."%")
end)
ret2.OnStatus(function(part,whole)
print("Ret2: ",math.ceil((part/whole)*1000)/10 .."%")
end)
ret3.OnStatus(function(part,whole)
print("Ret3: ",math.ceil((part/whole)*1000)/10 .."%")
end)
thread.hold(ret2.OnReturn + ret.OnReturn + ret3.OnReturn)
print("Status:",thread.hold(ret.OnReturn))
print("Function Done!")
os.exit()
end)
--GLOBAL,THREAD = require("multi.integration.threading"):init() -- Auto detects your environment and uses what's available
func2 = thread:newFunction(function()
thread.sleep(3)
print("Hello World!")
return true
end,true) -- set holdme to true
func2:holdMe(false) -- reset holdme to false
print("Calling func...")
print(func2())
test = thread:newFunction(function(a,b)
thread.sleep(1)
return a,b
end)
print(test(1,2).connect(function(...)
print(...)
end))
test:Pause()
print(test(1,2).connect(function(...)
print(...)
end))
test:Resume()
print(test(1,2).connect(function(...)
print(...)
end))
test = thread:newFunction(function()
return 1,2,nil,3,4,5,6,7,8,9
end,true)
print(test())
multi:newThread("testing",function()
print("#Test = ",test())
print(thread.hold(function()
print("Hello!")
return false
end,{
interval = 2,
cycles = 3
})) -- End result, 3 attempts within 6 seconds. If still false then timeout
print("held")
end).OnError(function(...)
print(...)
print("Error:",...)
end)
sandbox = multi:newProcessor("Test Processor")
sandbox:newTLoop(function()
print("testing...")
end,1)
test2 = multi:newTLoop(function()
print("testing2...")
end,1)
sandbox:newThread("Test Thread",function()
local a = 0
while true do
thread.sleep(1)
a = a + 1
print("Thread Test: ".. multi.getCurrentProcess().Name)
if a == 10 then
sandbox.Stop()
end
end
end).OnError(function(...)
print(...)
end)
multi:newThread("Test Thread",function()
while true do
thread.sleep(1)
print("Thread Test: ".. multi.getCurrentProcess().Name)
end
end).OnError(function(...)
print(...)
end)
sandbox.Start()
multi:mainloop()