Reworked coroutine based threading scheduler

This commit is contained in:
Ryan Ward 2019-08-04 22:54:52 -04:00
parent 498aa1f9aa
commit 29c8282efb
8 changed files with 167 additions and 190 deletions

View File

@ -1,4 +1,4 @@
# multi Version: 13.0.1 Bug fixes # multi Version: 13.1.0 Bug fixes and a few features added (See changes.md)
Found an issue? Please submit it and ill look into it! Found an issue? Please submit it and ill look into it!

View File

@ -1,14 +1,14 @@
# Changes # Changes
[TOC] [TOC]
Update 13.1.0 Bug fixes and some new features Update 13.1.0 Bug fixes and features added
------------- -------------
Added: Added:
- Connections:Lock() -- Prevents a connection object form being fired - Connections:Lock() -- Prevents a connection object form being fired
- Connections:Unlock() -- Removes the restriction imposed by conn:Lock() - Connections:Unlock() -- Removes the restriction imposed by conn:Lock()
- new fucntion added to the thread namespace - new fucntions added to the thread namespace
-- thread.request(THREAD handle,STRING cmd,VARARGS args) -- allows you to push thread requests from outside the running thread! Extremely powerful. -- thread.request(THREAD handle,STRING cmd,VARARGS args) -- allows you to push thread requests from outside the running thread! Extremely powerful.
-- thread.exec(FUNCTION func) -- Allows you to push code to run within the thread execution block! -- thread.exec(FUNCTION func) -- Allows you to push code to run within the thread execution block!
- handle = multi:newThread() now returns a thread handle to interact with the object - handle = multi:newThread() -- now returns a thread handle to interact with the object outside fo the thread
-- handle:Pause() -- handle:Pause()
-- handle:Resume() -- handle:Resume()
-- handle:Kill() -- handle:Kill()
@ -18,16 +18,16 @@ Fixed:
- Major bug with the system thread handler. Saw healthy threads as dead ones - Major bug with the system thread handler. Saw healthy threads as dead ones
- Major bug the thread scheduler was seen creating a massive amount of 'event' causing memory leaks and hard crashes! This has been fixed by changing how the scheduler opperates. - Major bug the thread scheduler was seen creating a massive amount of 'event' causing memory leaks and hard crashes! This has been fixed by changing how the scheduler opperates.
- newSystemThread()'s returned object now matches both the lanes and love2d in terms of methods that are usable. Error handling of System threads now behave the same across both love and lanes implementations. - newSystemThread()'s returned object now matches both the lanes and love2d in terms of methods that are usable. Error handling of System threads now behave the same across both love and lanes implementations.
- looks like I found a typo, thread.yeild -> thread.yield
Changed: Changed:
- getTasksDetails("t"), the table varaiant, formats threads, and system threads in the same way that tasks are formatted. Please see below for the format of the task details - getTasksDetails("t"), the table varaiant, formats threads, and system threads in the same way that tasks are formatted. Please see below for the format of the task details
- TID has been added to multi objects. They count up from 0 and no 2 objects will have the same number - TID has been added to multi objects. They count up from 0 and no 2 objects will have the same number
- thread.hold() -- As part of the memory leaks that I had to fix thread.hold() is slightly different. This change shouldn't impact previous code at all, but thread.hold() can not only return at most 7 arguments! - thread.hold() -- As part of the memory leaks that I had to fix thread.hold() is slightly different. This change shouldn't impact previous code at all, but thread.hold() can not only return at most 7 arguments!
- You should notice some faster code execution from threads, the changes improve preformance of threads greatly. They are now much faster than before! - You should notice some faster code execution from threads, the changes improve preformance of threads greatly. They are now much faster than before!
- multi:threadloop() -- No longer runs normal multi objects at all! The new change completely allows the multi objects to be seperated from the thread objects!
- local multi, thread = require("multi") -- Since coroutine based threading has seen a change to how it works, requring the multi library now returns the namespace for the threading interface as well. For now I will still inject into global the thread namespace, but in release 13.2.0 or 14.0.0 It will be removed!
Removed:
- multi:threadloop()
-- After some reworking to how threads are managed, this feature is no longer needed
# Tasks Details Table format # Tasks Details Table format
``` ```

View File

@ -24,6 +24,7 @@ SOFTWARE.
local bin = pcall(require,"bin") local bin = pcall(require,"bin")
local multi = {} local multi = {}
local clock = os.clock local clock = os.clock
local thread = {}
multi.Version = "13.1.0" multi.Version = "13.1.0"
multi._VERSION = "13.1.0" multi._VERSION = "13.1.0"
multi.stage = "stable" multi.stage = "stable"
@ -364,11 +365,11 @@ function multi:getTasksDetails(t)
end end
end end
local load, steps = multi:getLoad() local load, steps = multi:getLoad()
if multi.scheduler then if thread.__threads then
for i=1,#multi.scheduler.Threads do for i=1,#thread.__threads do
dat = dat .. "<THREAD: "..multi.scheduler.Threads[i].Name.." | "..os.clock()-multi.scheduler.Threads[i].creationTime..">\n" dat = dat .. "<THREAD: "..thread.__threads[i].Name.." | "..os.clock()-thread.__threads[i].creationTime..">\n"
end end
return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: "..#multi.scheduler.Threads.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s.."\n\n"..dat..dat2 return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: "..#thread.__threads.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s.."\n\n"..dat..dat2
else else
return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s..dat2 return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s..dat2
end end
@ -376,7 +377,7 @@ function multi:getTasksDetails(t)
local load,steps = multi:getLoad() local load,steps = multi:getLoad()
str = { str = {
ProcessName = (self.Name or "Unnamed"), ProcessName = (self.Name or "Unnamed"),
ThreadCount = #multi.scheduler.Threads, ThreadCount = #thread.__threads,
MemoryUsage = math.ceil(collectgarbage("count")), MemoryUsage = math.ceil(collectgarbage("count")),
PriorityScheme = priorityTable[multi.defaultSettings.priority or 0], PriorityScheme = priorityTable[multi.defaultSettings.priority or 0],
SystemLoad = multi.Round(load,2), SystemLoad = multi.Round(load,2),
@ -393,8 +394,8 @@ function multi:getTasksDetails(t)
for v,i in pairs(multi.PausedObjects) do for v,i in pairs(multi.PausedObjects) do
table.insert(str.Tasks,{Link = v, Type=v.Type,Name=v.Name,Uptime=os.clock()-v.creationTime,Priority=self.PriorityResolve[v.Priority],TID = v.TID}) table.insert(str.Tasks,{Link = v, Type=v.Type,Name=v.Name,Uptime=os.clock()-v.creationTime,Priority=self.PriorityResolve[v.Priority],TID = v.TID})
end end
for i=1,#multi.scheduler.Threads do for i=1,#thread.__threads do
table.insert(str.Threads,{Uptime = os.clock()-multi.scheduler.Threads[i].creationTime,Name = multi.scheduler.Threads[i].Name,Link = multi.scheduler.Threads[i],TID = multi.scheduler.Threads[i].TID}) table.insert(str.Threads,{Uptime = os.clock()-thread.__threads[i].creationTime,Name = thread.__threads[i].Name,Link = thread.__threads[i],TID = thread.__threads[i].TID})
end end
if multi.SystemThreads then if multi.SystemThreads then
for i=1,#multi.SystemThreads do for i=1,#multi.SystemThreads do
@ -1459,7 +1460,6 @@ function multi:newWatcher(namespace,name)
end end
end end
-- Threading stuff -- Threading stuff
thread={}
multi.GlobalVariables={} multi.GlobalVariables={}
if os.getOS()=="windows" then if os.getOS()=="windows" then
thread.__CORES=tonumber(os.getenv("NUMBER_OF_PROCESSORS")) thread.__CORES=tonumber(os.getenv("NUMBER_OF_PROCESSORS"))
@ -1491,12 +1491,13 @@ function thread.hold(n)
end end
function thread.skip(n) function thread.skip(n)
thread._Requests() thread._Requests()
coroutine.yield({"_skip_",n or 0}) if not n then n = 1 elseif n<1 then n = 1 end
coroutine.yield({"_skip_",n})
end end
function thread.kill() function thread.kill()
coroutine.yield({"_kill_",":)"}) coroutine.yield({"_kill_",":)"})
end end
function thread.yeild() function thread.yield()
thread._Requests() thread._Requests()
coroutine.yield({"_sleep_",0}) coroutine.yield({"_sleep_",0})
end end
@ -1541,11 +1542,11 @@ function multi.print(...)
print(...) print(...)
end end
end end
multi:setDomainName("Threads")
multi:setDomainName("Globals")
local initT = false local initT = false
local threadCount = 0 local threadCount = 0
local threadid = 0 local threadid = 0
thread.__threads = {}
local threads = thread.__threads
function multi:newThread(name,func) function multi:newThread(name,func)
local func = func or name local func = func or name
if type(name) == "function" then if type(name) == "function" then
@ -1560,7 +1561,6 @@ function multi:newThread(name,func)
c.TID = threadid c.TID = threadid
c.firstRunDone=false c.firstRunDone=false
c.timer=multi:newTimer() c.timer=multi:newTimer()
c.ref.Globals=self:linkDomain("Globals")
c._isPaused = false c._isPaused = false
function c:isPaused() function c:isPaused()
return self._isPaused return self._isPaused
@ -1613,7 +1613,7 @@ function multi:newThread(name,func)
function c.ref:syncGlobals(v) function c.ref:syncGlobals(v)
self.Globals=v self.Globals=v
end end
table.insert(self:linkDomain("Threads"),c) table.insert(threads,c)
if initT==false then if initT==false then
multi.initThreads() multi.initThreads()
end end
@ -1629,87 +1629,139 @@ function multi.initThreads()
self.skip=tonumber(n) or 24 self.skip=tonumber(n) or 24
end end
multi.scheduler.skip=0 multi.scheduler.skip=0
multi.scheduler.Threads=multi:linkDomain("Threads")
multi.scheduler.Globals=multi:linkDomain("Globals")
local holds = {}
local skips = {}
local t0,t1,t2,t3,t4,t5,t6 local t0,t1,t2,t3,t4,t5,t6
local ret local ret,_
multi.scheduler:OnLoop(function(self) local function helper(i)
for i=#skips,1,-1 do if ret then
skips[i].pos = skips[i].pos + 1 if ret[1]=="_kill_" then
if skips[i].count==skips[i].pos then table.remove(threads,i)
skips[i].Link.sleep=0 elseif ret[1]=="_sleep_" then
skips[i]=nil threads[i].sec = ret[2]
end threads[i].time = clock()
end threads[i].task = "sleep"
for i=#holds,1,-1 do threads[i].__ready = false
if holds[i] then ret = nil
t0,t1,t2,t3,t4,t5,t6 = holds[i].func() elseif ret[1]=="_skip_" then
if t0 then threads[i].count = ret[2]
holds[i].Link.sleep = 0 threads[i].pos = 0
holds[i].Link.returns = {t0,t1,t2,t3,t4,t5,t6} threads[i].task = "skip"
holds[i]=nil threads[i].__ready = false
end ret = nil
end elseif ret[1]=="_hold_" then
end threads[i].func = ret[2]
for i=#self.Threads,1,-1 do threads[i].task = "hold"
if coroutine.status(self.Threads[i].thread)=="dead" then threads[i].__ready = false
table.remove(self.Threads,i)
else
if self.Threads[i].timer:Get()>=self.Threads[i].sleep then
if self.Threads[i].firstRunDone==false then
self.Threads[i].firstRunDone=true
self.Threads[i].timer:Start()
if unpack(self.Threads[i].returns or {}) then
_,ret=coroutine.resume(self.Threads[i].thread,unpack(self.Threads[i].returns))
else
_,ret=coroutine.resume(self.Threads[i].thread,self.Threads[i].ref)
end
else
if unpack(self.Threads[i].returns or {}) then
_,ret=coroutine.resume(self.Threads[i].thread,unpack(self.Threads[i].returns))
else
_,ret=coroutine.resume(self.Threads[i].thread,self.Globals)
end
end
if _==false then
self.Parent.OnError:Fire(self.Threads[i],"Error in thread: <"..self.Threads[i].Name.."> "..ret)
end
if ret==true or ret==false then
multi.print("Thread Ended!!!")
ret = nil ret = nil
end end
end end
if ret then end
if ret[1]=="_kill_" then multi.scheduler:OnLoop(function(self)
table.remove(self.Threads,i) for i=#threads,1,-1 do
elseif ret[1]=="_sleep_" then if not threads[i].__started then
self.Threads[i].timer:Reset() _,ret=coroutine.resume(threads[i].thread)
self.Threads[i].sleep=ret[2] threads[i].__started = true
elseif ret[1]=="_skip_" then helper(i)
self.Threads[i].timer:Reset() end
self.Threads[i].sleep=math.huge if not _ then
table.insert(skips,{ multi:OnError("Error in thread <"..threads[i].Name..">", ret)
Link = self.Threads[i], end
count = ret[2], if coroutine.status(threads[i].thread)=="dead" then
pos = 0, table.remove(threads,i)
}) elseif threads[i].task == "skip" then
elseif ret[1]=="_hold_" then threads[i].pos = threads[i].pos + 1
self.Threads[i].timer:Reset() if threads[i].count==threads[i].pos then
self.Threads[i].sleep=math.huge threads[i].task = ""
table.insert(holds,{ threads[i].__ready = true
Link = self.Threads[i], end
func = ret[2] elseif threads[i].task == "hold" then
}) t0,t1,t2,t3,t4,t5,t6 = threads[i].func()
elseif ret.Name then if t0 then
self.Globals[ret.Name]=ret.Value threads[i].task = ""
threads[i].__ready = true
end
elseif threads[i].task == "sleep" then
if clock() - threads[i].time>=threads[i].sec then
threads[i].task = ""
threads[i].__ready = true
end end
end end
if threads[i].__ready then
threads[i].__ready = false
_,ret=coroutine.resume(threads[i].thread,t0,t1,t2,t3,t4,t5,t6)
end end
helper(i)
end end
end) end)
end end
function multi:threadloop()
initT = true
multi.scheduler=multi:newLoop():setName("multi.thread")
multi.scheduler.Type="scheduler"
function multi.scheduler:setStep(n)
self.skip=tonumber(n) or 24
end
multi.scheduler.skip=0
local t0,t1,t2,t3,t4,t5,t6
local ret,_
local function helper(i)
if ret then
if ret[1]=="_kill_" then
table.remove(threads,i)
elseif ret[1]=="_sleep_" then
threads[i].sec = ret[2]
threads[i].time = clock()
threads[i].task = "sleep"
threads[i].__ready = false
ret = nil
elseif ret[1]=="_skip_" then
threads[i].count = ret[2]
threads[i].pos = 0
threads[i].task = "skip"
threads[i].__ready = false
ret = nil
elseif ret[1]=="_hold_" then
threads[i].func = ret[2]
threads[i].task = "hold"
threads[i].__ready = false
ret = nil
end
end
end
while true do
for i=#threads,1,-1 do
if not threads[i].__started then
_,ret=coroutine.resume(threads[i].thread)
threads[i].__started = true
helper(i)
end
if coroutine.status(threads[i].thread)=="dead" then
table.remove(threads,i)
elseif threads[i].task == "skip" then
threads[i].pos = threads[i].pos + 1
if threads[i].count==threads[i].pos then
threads[i].task = ""
threads[i].__ready = true
end
elseif threads[i].task == "hold" then
t0,t1,t2,t3,t4,t5,t6 = threads[i].func()
if t0 then
threads[i].task = ""
threads[i].__ready = true
end
elseif threads[i].task == "sleep" then
if clock() - threads[i].time>=threads[i].sec then
threads[i].task = ""
threads[i].__ready = true
end
end
if threads[i].__ready then
threads[i].__ready = false
_,ret=coroutine.resume(threads[i].thread,t0,t1,t2,t3,t4,t5,t6)
end
helper(i)
end
end
end
multi.OnError=multi:newConnection() multi.OnError=multi:newConnection()
function multi:newThreadedProcess(name) function multi:newThreadedProcess(name)
local c = {} local c = {}
@ -2498,4 +2550,7 @@ end
function multi:setDefualtStateFlag(opt) function multi:setDefualtStateFlag(opt)
-- --
end end
return multi if not(multi.Version == "13.2.0" or multi.Version == "14.0.0") then
_G.thread = thread
end
return multi, thread

View File

@ -31,7 +31,7 @@ function os.getOS()
end end
-- Step 1 get lanes -- Step 1 get lanes
lanes=require("lanes").configure() lanes=require("lanes").configure()
local multi = require("multi") -- get it all and have it on all lanes local multi, thread = require("multi") -- get it all and have it on all lanes
multi.SystemThreads = {} multi.SystemThreads = {}
local thread = thread local thread = thread
multi.isMainThread=true multi.isMainThread=true

View File

@ -35,7 +35,7 @@ local function _INIT(luvitThread,timer)
end end
end end
-- Step 1 get setup threads on luvit... Sigh how do i even... -- Step 1 get setup threads on luvit... Sigh how do i even...
local multi = require("multi") local multi, thread = require("multi")
isMainThread=true isMainThread=true
function multi:canSystemThread() function multi:canSystemThread()
return true return true

View File

@ -21,7 +21,7 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
]] ]]
local multi = require("multi") local multi, thread = require("multi")
local net = require("net") local net = require("net")
local bin = require("bin") local bin = require("bin")
bin.setBitsInterface(infinabits) -- the bits interface does not work so well, another bug to fix bin.setBitsInterface(infinabits) -- the bits interface does not work so well, another bug to fix

View File

@ -21,7 +21,7 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
]] ]]
multi = require("multi") multi, thread = require("multi")
function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a channel on both ends function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a channel on both ends
local c={} -- where we will store our object local c={} -- where we will store our object
c.name=name -- set the name this is important for the love2d side c.name=name -- set the name this is important for the love2d side

108
test.lua
View File

@ -1,99 +1,21 @@
package.path="?/init.lua;?.lua;"..package.path package.path="?/init.lua;?.lua;"..package.path
multi = require("multi") multi = require("multi")
--~ local GLOBAL,THREAD = require("multi.integration.lanesManager").init() local a=0
--~ nGLOBAL = require("multi.integration.networkManager").init()
--~ function table.print(tbl, indent)
--~ if type(tbl)~="table" then return end
--~ if not indent then indent = 0 end
--~ for k, v in pairs(tbl) do
--~ formatting = string.rep(' ', indent) .. k .. ': '
--~ if type(v) == 'table' then
--~ print(formatting)
--~ table.print(v, indent+1)
--~ else
--~ print(formatting .. tostring(v))
--~ end
--~ end
--~ end
--~ print(#multi.SystemThreads)
--~ multi:newThread("Detail Updater",function()
--~ while true do
--~ thread.sleep(1)
--~ print(multi:getTasksDetails())
--~ print("-----")
--~ table.print(multi:getTasksDetails("t"))
--~ io.read()
--~ end
--~ end)
--~ multi.OnSystemThreadDied(function(...)
--~ print("why you say dead?",...)
--~ end)
--~ multi.OnError(function(...)
--~ print(...)
--~ end)
--~ multi:newSystemThread("TestSystem",function()
--~ while true do
--~ THREAD.sleep(1)
--~ print("I'm alive")
--~ end
--~ end)
--~ print(#multi.SystemThreads)
--~ multi:mainloop{
--~ protect = false,
--~ print = true
--~ }
--~ function tprint (tbl, indent)
--~ if not indent then indent = 0 end
--~ for k, v in pairs(tbl) do
--~ formatting = string.rep(" ", indent) .. k .. ": "
--~ if type(v) == "table" then
--~ print(formatting)
--~ tprint(v, indent+1)
--~ elseif type(v) == 'boolean' then
--~ print(formatting .. tostring(v))
--~ else
--~ print(formatting .. tostring(v))
--~ end
--~ end
--~ end
--~ t = multi:newThread("test",function()
--~ while true do
--~ thread.sleep(.5)
--~ print("A test!")
--~ end
--~ end)
--~ multi:newAlarm(3):OnRing(function()
--~ multi:newAlarm(3):OnRing(function()
--~ t:Resume()
--~ end)
--~ t:Pause()
--~ end)
--~ multi.OnError(function(...)
--~ print(...)
--~ end)
--~ function test()
--~ while true do
--~ a=a+1
--~ end
--~ end
--~ g=string.dump(test)
--~ print(g)
--~ if g:find(" thread") then
--~ print("Valid Thread!")
--~ elseif (g:find("K")) and not g:find(" thread") then
--~ print("Invalid Thread!")
--~ else
--~ print("Should be safe")
--~ end
a=0
multi:newTLoop(function()
a=a+1
end,1)
multi:newThread("Test",function() multi:newThread("Test",function()
while true do while true do
-- thread.hold(function()
return a%4000000==0 and a~=0
end)
print(thread.getCores(),a)
end end
end) end)
multi:mainloop() multi:newThread("Test2",function()
while true do
thread.yield()
a=a+1
end
end)
multi.OnError(function(...)
print(...)
end)
multi:threadloop()