V15.3.0 #46

Merged
rayaman merged 85 commits from v15.3.0 into network_parallelism_test_branch 2022-06-11 23:41:07 -04:00
12 changed files with 319 additions and 371 deletions
Showing only changes of commit 593bfd0d8c - Show all commits

View File

@ -35,6 +35,7 @@ end
multi.Version = "15.2.0"
multi.Name = "multi.root"
multi.NIL = {Type="NIL"}
local NIL = multi.NIL
multi.Mainloop = {}
multi.Children = {}
multi.Active = true
@ -286,9 +287,6 @@ function multi:newConnection(protect,func,kill)
else
function c:Fire(...)
for i=#call_funcs,1,-1 do
if type(call_funcs[i])=="table" then
print(call_funcs[i].Parent.Type)
end
call_funcs[i](...)
if kill then
table.remove(call_funcs,i)
@ -307,9 +305,6 @@ function multi:newConnection(protect,func,kill)
end
end
function self:connect(func)
if type(func) == "table" then
print(debug.traceback())
end
table.insert(fast,func)
end
end
@ -388,9 +383,6 @@ function multi:newConnection(protect,func,kill)
function c:connect(...)--func,name,num
local tab = {...}
local funcs={}
if type(tab[1])=="table" then
print(debug.traceback())
end
for i=1,#tab do
if type(tab[i])=="function" then
funcs[#funcs+1] = tab[i]
@ -714,13 +706,19 @@ function multi:newAlarm(set)
return c
end
function multi:newLoop(func)
function multi:newLoop(func,notime)
local c=self:newBase()
c.Type='loop'
local start=clock()
if notime then
function c:Act()
self.OnLoop:Fire(self)
end
else
function c:Act()
self.OnLoop:Fire(self,clock()-start)
end
end
c.OnLoop = self:newConnection()
function c:fastMode()
self.OnLoop:fastMode()
@ -735,7 +733,6 @@ function multi:newLoop(func)
end
function multi:newStep(start,reset,count,skip)
print(self.Type)
local c=self:newBase()
think=1
c.Type='step'
@ -829,7 +826,7 @@ function multi:newTLoop(func,set)
end
function multi:setTimeout(func,t)
multi:newThread(function() thread.sleep(t) func() end)
thread:newThread(function() thread.sleep(t) func() end)
end
function multi:newTStep(start,reset,count,set)
@ -882,7 +879,7 @@ local sthread
function multi:scheduleJob(time,func)
if not sthread then
sthread = multi:newThread("JobScheduler",function()
sthread = thread:newThread("JobScheduler",function()
local time = os.date("*t", os.time())
local ready = false
while true do
@ -930,7 +927,7 @@ function multi:newProcessor(name,nothread)
c.Type = "process"
c.Active = false or nothread
c.Name = name or ""
c.process = self:newThread(function()
c.process = thread:newThread(function()
while true do
thread.hold(function() return c.Active end)
__CurrentProcess = c
@ -957,8 +954,6 @@ function multi:newProcessor(name,nothread)
function c:Destroy()
self.OnObjectDestroyed:Fire(c)
end
c:attachScheduler()
c.initThreads()
return c
end
@ -973,7 +968,7 @@ function multi.hold(func,opt)
local proc = multi.getCurrentTask()
proc:Pause()
if type(func)=="number" then
self:newThread("Hold_func",function()
thread:newThread("Hold_func",function()
thread.hold(func)
death = true
end)
@ -983,7 +978,7 @@ function multi.hold(func,opt)
proc:Resume()
else
local rets
self:newThread("Hold_func",function()
thread:newThread("Hold_func",function()
rets = {thread.hold(func,opt)}
death = true
end)
@ -996,7 +991,6 @@ function multi.hold(func,opt)
end
-- Threading stuff
local initT = false
local threadCount = 0
local threadid = 0
thread.__threads = {}
@ -1006,6 +1000,7 @@ local dFunc = function() return true end
thread.requests = {}
local CMD = {} -- We will compare this special local
local interval
local resume, status, create, yield, running = coroutine.resume, coroutine.status, coroutine.create, coroutine.yield, coroutine.running
local t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none = 1, 2, 3, 4, 5, 6, 7
function thread.request(t,cmd,...)
@ -1025,9 +1020,9 @@ function thread.getRunningThread()
end
function thread._Requests()
local t = thread.requests[coroutine.running()]
local t = thread.requests[running()]
if t then
thread.requests[coroutine.running()] = nil
thread.requests[running()] = nil
local cmd,args = t[1],t[2]
thread[cmd](unpack(args))
end
@ -1036,7 +1031,7 @@ end
function thread.sleep(n)
thread._Requests()
thread.getRunningThread().lastSleep = clock()
return coroutine.yield(CMD, t_sleep, n or 1)
return yield(CMD, t_sleep, n or 1)
end
function thread.hold(n,opt)
@ -1045,27 +1040,27 @@ function thread.hold(n,opt)
if type(opt)=="table" then
interval = opt.interval
if opt.cycles then
return coroutine.yield(CMD, t_holdW, opt.cycles or 1, n or dFunc, interval)
return yield(CMD, t_holdW, opt.cycles or 1, n or dFunc, interval)
elseif opt.sleep then
return coroutine.yield(CMD, t_holdF, opt.sleep, n or dFunc, interval)
return yield(CMD, t_holdF, opt.sleep, n or dFunc, interval)
elseif opt.skip then
return coroutine.yield(CMD, t_skip, opt.skip or 1, nil, interval)
return yield(CMD, t_skip, opt.skip or 1, nil, interval)
end
end
if type(n) == "number" then
thread.getRunningThread().lastSleep = clock()
return coroutine.yield(CMD, t_sleep, n or 0, nil, interval)
return yield(CMD, t_sleep, n or 0, nil, interval)
elseif type(n) == "table" and n.Type == "connector" then
local ready = false
n(function()
ready = true
end)
return coroutine.yield(CMD, t_hold, function()
return yield(CMD, t_hold, function()
return ready
end)
elseif type(n) == "function" then
return coroutine.yield(CMD, t_hold, n or dFunc, nil, interval)
return yield(CMD, t_hold, n or dFunc, nil, interval)
else
error("Invalid argument passed to thread.hold(...)!")
end
@ -1073,17 +1068,17 @@ end
function thread.holdFor(sec,n)
thread._Requests()
return coroutine.yield(CMD, t_holdF, sec, n or dFunc)
return yield(CMD, t_holdF, sec, n or dFunc)
end
function thread.holdWithin(skip,n)
thread._Requests()
return coroutine.yield(CMD, t_holdW, skip or 1, n or dFunc)
return yield(CMD, t_holdW, skip or 1, n or dFunc)
end
function thread.skip(n)
thread._Requests()
return coroutine.yield(CMD, t_skip, n or 1)
return yield(CMD, t_skip, n or 1)
end
function thread.kill()
@ -1092,15 +1087,15 @@ end
function thread.yield()
thread._Requests()
return coroutine.yield(CMD, t_yield)
return yield(CMD, t_yield)
end
function thread.isThread()
if _VERSION~="Lua 5.1" then
local a,b = coroutine.running()
local a,b = running()
return not(b)
else
return coroutine.running()~=nil
return running()~=nil
end
end
@ -1224,7 +1219,7 @@ end
function thread:newFunction(func,holdme)
return thread:newFunctionBase(function(...)
return multi.getCurrentProcess():newThread("TempThread",func,...)
return thread:newThread("TempThread",func,...)
end,holdme)()
end
@ -1235,13 +1230,11 @@ function multi.setEnv(func,env)
return chunk
end
function multi:attachScheduler()
local resume, status, create = coroutine.resume, coroutine.status, coroutine.create
local threads = {}
local startme = {}
local startme_len = 0
function self:newThread(name,func,...)
self.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called
function thread:newThread(name,func,...)
multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called
local func = func or name
if type(name) == "function" then
name = "Thread#"..threadCount
@ -1260,8 +1253,8 @@ function multi:attachScheduler()
c._isPaused = false
c.returns = {}
c.isError = false
c.OnError = self:newConnection(true,nil,true)
c.OnDeath = self:newConnection(true,nil,true)
c.OnError = multi:newConnection(true,nil,true)
c.OnDeath = multi:newConnection(true,nil,true)
function c:isPaused()
return self._isPaused
@ -1313,59 +1306,42 @@ function multi:attachScheduler()
table.insert(threads,c)
table.insert(startme,c)
startme_len = #startme
print("startme:",startme_len)
globalThreads[c] = self
if initT==false then
self.initThreads()
end
globalThreads[c] = multi
threadid = threadid + 1
multi:create(c)
c.creationTime = os.clock()
return c
end
function self:newISOThread(name,func,_env,...)
self.OnLoad:Fire()
function thread:newISOThread(name,func,_env,...)
local func = func or name
local env = _env or {}
if not env.thread then
env.thread = thread
end
if not env.multi then
env.multi = self
env.multi = multi
end
if type(name) == "function" then
name = "Thread#"..threadCount
end
local func = isolateFunction(func,env)
return self:newThread(name,func)
return thread:newThread(name,func)
end
function self.initThreads(justThreads)
initT = true
self.scheduler=self:newLoop():setName("multi.thread")
self.scheduler.Type="scheduler"
function self.scheduler:setStep(n)
self.skip=tonumber(n) or 24
end
self.scheduler.skip=0
multi.newThread = thread.newThread
multi.newISOThread = thread.newISOThread
local t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15
local r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16
local ret,_
local task, thd, ref, ready
--[[
if coroutine.running() ~= threads[i].thread then
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(coreads[i].thread,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
CheckRets(i)
end
]]
-- ipart: t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_conn, t_none <-- Order
local switch = {
function(th,co)--hold
if clock() - th.intervalR>=th.interval then
t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = th.func()
if t0 then
if t0==self.NIL then t0 = nil end
if t0==NIL then t0 = nil end
th.task = t_none
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=resume(co,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15)
end
@ -1383,7 +1359,7 @@ function multi:attachScheduler()
t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = th.func()
if t0 then
if t0 then
if t0==self.NIL then t0 = nil end
if t0==NIL then t0 = nil end
th.task = t_none
end
th.task = t_none
@ -1410,7 +1386,7 @@ function multi:attachScheduler()
t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = th.func()
if t0 then
if t0 then
if t0==self.NIL then t0 = nil end
if t0==NIL then t0 = nil end
th.task = t_none
end
th.task = ""
@ -1432,21 +1408,17 @@ function multi:attachScheduler()
setmetatable(switch,{__index=function() return function() end end})
local cmds = {-- ipart: t_hold, t_sleep, t_holdF, t_skip, t_holdW, t_yield, t_none <-- Order
function(th,arg1,arg2,arg3)
--print("hold",arg1,arg2,arg3)
--print(_,ret,r1,r2,r3,r4,r5)
th.func = arg1
th.task = t_hold
th.interval = arg3 or 0
th.intervalR = clock()
end,
function(th,arg1,arg2,arg3)
--print("sleep",arg1,arg2,arg3)
th.sec = arg1
th.time = clock()
th.task = t_sleep
end,
function(th,arg1,arg2,arg3)
--print("holdf",arg1,arg2,arg3)
th.sec = arg1
th.func = arg2
th.task = t_holdF
@ -1455,13 +1427,11 @@ function multi:attachScheduler()
th.intervalR = clock()
end,
function(th,arg1,arg2,arg3)
--print("skip",arg1,arg2,arg3)
th.count = arg1
th.pos = 0
th.task = t_skip
end,
function(th,arg1,arg2,arg3)
--print("holdw",arg1,arg2,arg3)
th.count = arg1
th.pos = 0
th.func = arg2
@ -1483,10 +1453,8 @@ function multi:attachScheduler()
["running"] = function(thd,ref) print("Running Status") io.read() end,
["dead"] = function(thd,ref,task,i)
if _ then
print("Death")
ref.OnDeath:Fire(ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16)
else
print("Error",ref,ret)
ref.OnError:Fire(ref,ret)
end
if i then
@ -1503,15 +1471,17 @@ function multi:attachScheduler()
--self.setType(ref,self.DestroyedObj)
end,
}
self.scheduler:OnLoop(function(self)
for i=#threads,1,-1 do
-- First time setup for threads
local count = 0
local handler = coroutine.wrap(function(self)
while true do
for start = startme_len,1,-1 do
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(startme[start].thread,unpack(startme[start].startArgs))
co_status[status(startme[startme_len].thread)](startme[startme_len].thread,startme[startme_len],t_none) --cmds[r1](startme[start],r2,r3,r4,r5)
co_status[status(startme[startme_len].thread)](startme[startme_len].thread,startme[startme_len],t_none) -- Make sure there was no error
startme[startme_len] = nil
startme_len = #startme
yield()
end
for i=#threads,1,-1 do
if threads[i] then
ref = threads[i]
task = ref.task
@ -1519,10 +1489,11 @@ function multi:attachScheduler()
ready = ref.__ready
co_status[status(thd)](thd,ref,task,i)
end
yield()
end
end
end)
end
end
function multi:newService(func) -- Priority managed threads
local c = {}
@ -1553,7 +1524,7 @@ function multi:newService(func) -- Priority managed threads
task(ap)
return c
end
local th = self:newThread(function()
local th = thread:newThread(function()
while true do
process()
end
@ -1636,6 +1607,7 @@ local function mainloop(self)
ctask:Act()
__CurrentProcess = self
end
handler()
end
else
return nil, "Already Running!"
@ -1665,6 +1637,7 @@ local function p_mainloop(self)
end
end
end
handler()
end
else
return nil, "Already Running!"
@ -1693,6 +1666,7 @@ function multi:uManager()
multi.OnPreLoad:Fire()
self.uManager=self.uManagerRef
multi.OnLoad:Fire()
handler()
end
end
@ -1709,6 +1683,7 @@ function multi:uManagerRefP1()
end
end
end
handler()
end
end
@ -1721,6 +1696,7 @@ function multi:uManagerRef()
__CurrentTask:Act()
__CurrentProcess = self
end
handler()
end
end
@ -2026,5 +2002,4 @@ else
multi.m.sentinel = newproxy(true)
getmetatable(multi.m.sentinel).__gc = multi.m.onexit
end
multi:attachScheduler()
return multi

View File

@ -113,7 +113,7 @@ function multi:newSystemThreadedJobQueue(n)
end)
end,holup),name
end
multi:newThread("JobQueueManager",function()
thread:newthread("JobQueueManager",function()
while true do
local job = thread.hold(function()
return queueReturn:pop()
@ -129,7 +129,7 @@ function multi:newSystemThreadedJobQueue(n)
local clock = os.clock
local ref = 0
setmetatable(_G,{__index = funcs})
multi:newThread("JobHandler",function()
thread:newthread("JobHandler",function()
while true do
local dat = thread.hold(function()
return queueJob:pop()
@ -141,7 +141,7 @@ function multi:newSystemThreadedJobQueue(n)
queueReturn:push{jid, funcs[name](unpack(args)),queue}
end
end)
multi:newThread("DoAllHandler",function()
thread:newthread("DoAllHandler",function()
while true do
local dat = thread.hold(function()
return doAll:peek()
@ -156,7 +156,7 @@ function multi:newSystemThreadedJobQueue(n)
end
end
end)
multi:newThread("IdleHandler",function()
thread:newthread("IdleHandler",function()
while true do
thread.hold(function()
return clock()-idle>3

View File

@ -105,14 +105,11 @@ function multi.InitSystemThreadErrorHandler()
return
end
started = true
multi:newThread("SystemThreadScheduler",function()
thread:newthread("SystemThreadScheduler",function()
local threads = multi.SystemThreads
while true do
thread.sleep(.005) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough.
local _,data = __ConsoleLinda:receive(0, "Q")
if data then
print(unpack(data))
end
for i = #threads, 1, -1 do
local status = threads[i].thread.status
local temp = threads[i]
@ -147,8 +144,6 @@ function multi.InitSystemThreadErrorHandler()
end
end
end
end).OnError(function(...)
print(...)
end)
end

View File

@ -131,7 +131,7 @@ function multi:newSystemThreadedJobQueue(n)
end)
end,holup),name
end
multi:newThread("jobManager",function()
thread:newthread("jobManager",function()
while true do
thread.yield()
local dat = c.queueReturn:pop()
@ -155,7 +155,7 @@ function multi:newSystemThreadedJobQueue(n)
local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
local registry = {}
setmetatable(_G,{__index = funcs})
multi:newThread("startUp",function()
thread:newthread("startUp",function()
while true do
thread.yield()
local all = queueAll:peek()
@ -165,7 +165,7 @@ function multi:newSystemThreadedJobQueue(n)
end
end
end)
multi:newThread("runner",function()
thread:newthread("runner",function()
thread.sleep(.1)
while true do
thread.yield()
@ -187,7 +187,7 @@ function multi:newSystemThreadedJobQueue(n)
end):OnError(function(...)
error(...)
end)
multi:newThread("Idler",function()
thread:newthread("Idler",function()
while true do
thread.yield()
if clock()-lastProc> 2 then

View File

@ -122,7 +122,7 @@ function multi:newSystemThread(name,func,...)
GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread}
GLOBAL["__THREAD_COUNT"] = THREAD_ID
THREAD_ID=THREAD_ID+1
multi:newThread(function()
thread:newthread(function()
while true do
thread.yield()
if c.stab["returns"] then

View File

@ -167,7 +167,7 @@ if not ISTHREAD then
local clock = os.clock
local lastproc = clock()
local queue = love.thread.getChannel("__CONSOLE__")
multi:newThread("consoleManager",function()
thread:newthread("consoleManager",function()
while true do
thread.yield()
dat = queue:pop()

View File

@ -129,7 +129,7 @@ function multi:newSystemThreadedJobQueue(n)
end)
end,holup),name
end
multi:newThread("jobManager",function()
thread:newthread("jobManager",function()
while true do
thread.yield()
local dat = c.queueReturn:pop()
@ -153,7 +153,7 @@ function multi:newSystemThreadedJobQueue(n)
local queueAll = lovr.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
local registry = {}
setmetatable(_G,{__index = funcs})
multi:newThread("startUp",function()
thread:newthread("startUp",function()
while true do
thread.yield()
local all = queueAll:peek()
@ -163,7 +163,7 @@ function multi:newSystemThreadedJobQueue(n)
end
end
end)
multi:newThread("runner",function()
thread:newthread("runner",function()
thread.sleep(.1)
while true do
thread.yield()
@ -185,7 +185,7 @@ function multi:newSystemThreadedJobQueue(n)
end):OnError(function(...)
error(...)
end)
multi:newThread("Idler",function()
thread:newthread("Idler",function()
while true do
thread.yield()
if clock()-lastProc> 2 then

View File

@ -168,7 +168,7 @@ if not ISTHREAD then
local clock = os.clock
local lastproc = clock()
local queue = lovr.thread.getChannel("__CONSOLE__")
multi:newThread("consoleManager",function()
thread:newthread("consoleManager",function()
while true do
thread.yield()
dat = queue:pop()

View File

@ -72,7 +72,7 @@ function master:newNetworkThread(nodeName,func,...)
local ret
local nID = netID
local conn = multi:newConnection()
multi:newThread(function()
thread:newthread(function()
dat:addBlock{
args = args,
func = func,
@ -143,7 +143,7 @@ function multi:newMasterNode(cd)
else
c:getNodesFromBroadcast()
end
multi:newThread("CMDQueueProcessor",function()
thread:newthread("CMDQueueProcessor",function()
while true do
thread.skip(128)
local data = table.remove(c._queue,1)

View File

@ -125,7 +125,7 @@ function multi:newSystemThreadedJobQueue(n)
end,holup),name
end
for i=1,c.cores do
multi:newThread("PesudoThreadedJobQueue_"..i,function()
thread:newthread("PesudoThreadedJobQueue_"..i,function()
while true do
thread.yield()
if #jobs>0 then

View File

@ -70,7 +70,7 @@ function multi:newSystemThread(name,func,...)
env[tab[i]] = _G[tab[i]]
end
--setmetatable(env,{__index=env})
multi:newISOThread(name,func,env,...).OnError(function(self,msg)
thread:newISOThread(name,func,env,...).OnError(function(self,msg)
print("ERROR:",msg)
end)
id = id + 1

View File

@ -4,42 +4,20 @@ local multi,thread = require("multi"):init()
Before AVG: 522386
Test 1 AVG:
]]
local sleep_for = 5
local sleep_for = 1
local conn = multi:newConnection()
local test = {}
local function bench(_,steps)
print("Steps/5s: "..steps)
print("Steps/1s: "..steps)
os.exit()
end
local ready = false
multi:newAlarm(3):OnRing(function()
conn:Fire()
ready = true
end)
multi:benchMark(sleep_for,multi.Priority_Core,"Core:"):OnBench(bench)
multi:newThread("Thread 1",function()
for i = 1,400 do
thread:newThread(function()
while true do
thread.sleep(1)
print("Test 1")
thread.hold(conn)
print("Conn sleep test")
error("hi")
thread.sleep(.1)
end
end).OnError(print)
multi:newThread("Thread 2",function()
print("Thread 2")
return "it worked"
end):OnDeath(print):OnError(error)
multi:newThread("Thread 3",function()
thread.hold(function()
return ready
end)
print("Function test")
return "Yay we did it"
end).OnDeath(print)
-- multi.OnExit(function()
-- print("Total: ".. a)
-- end)
end
multi:benchMark(sleep_for,multi.Priority_Core,"Core:"):OnBench(bench)
multi:mainloop()