Working on 16.0.0 #53

Merged
rayaman merged 120 commits from v16.0.0 into master 2024-02-25 00:00:51 -05:00
5 changed files with 115 additions and 53 deletions
Showing only changes of commit b2569118a2 - Show all commits

View File

@ -217,7 +217,7 @@ function multi:newConnection(protect,func,kill)
end end
end, end,
__mod = function(obj1, obj2) -- % __mod = function(obj1, obj2) -- %
local cn = multi:newConnection() local cn = self:newConnection()
if type(obj1) == "function" and type(obj2) == "table" then if type(obj1) == "function" and type(obj2) == "table" then
obj2(function(...) obj2(function(...)
cn:Fire(obj1(...)) cn:Fire(obj1(...))
@ -228,7 +228,7 @@ function multi:newConnection(protect,func,kill)
return cn return cn
end, end,
__concat = function(obj1, obj2) -- .. __concat = function(obj1, obj2) -- ..
local cn = multi:newConnection() local cn = self:newConnection()
local ref local ref
if type(obj1) == "function" and type(obj2) == "table" then if type(obj1) == "function" and type(obj2) == "table" then
cn(function(...) cn(function(...)
@ -260,7 +260,7 @@ function multi:newConnection(protect,func,kill)
return cn return cn
end, end,
__add = function(c1,c2) -- Or __add = function(c1,c2) -- Or
local cn = multi:newConnection() local cn = self:newConnection()
c1(function(...) c1(function(...)
cn:Fire(...) cn:Fire(...)
end) end)
@ -270,7 +270,7 @@ function multi:newConnection(protect,func,kill)
return cn return cn
end, end,
__mul = function(c1,c2) -- And __mul = function(c1,c2) -- And
local cn = multi:newConnection() local cn = self:newConnection()
local ref1, ref2 local ref1, ref2
if c1.__hasInstances == nil then if c1.__hasInstances == nil then
cn.__hasInstances = {2} cn.__hasInstances = {2}
@ -531,8 +531,8 @@ function multi:SetTime(n)
c.timer:Start() c.timer:Start()
c.set=n c.set=n
c.link=self c.link=self
c.OnTimedOut = multi:newConnection() c.OnTimedOut = self:newConnection()
c.OnTimerResolved = multi:newConnection() c.OnTimerResolved = self:newConnection()
self._timer=c.timer self._timer=c.timer
function c:Act() function c:Act()
if self.timer:Get()>=self.set then if self.timer:Get()>=self.set then
@ -647,8 +647,8 @@ function multi:newBase(ins)
c.func={} c.func={}
c.funcTM={} c.funcTM={}
c.funcTMR={} c.funcTMR={}
c.OnBreak = multi:newConnection() c.OnBreak = self:newConnection()
c.OnPriorityChanged = multi:newConnection() c.OnPriorityChanged = self:newConnection()
c.TID = _tid c.TID = _tid
c.Act=function() end c.Act=function() end
c.Parent=self c.Parent=self
@ -1050,7 +1050,7 @@ end
local sandcount = 1 local sandcount = 1
function multi:newProcessor(name, nothread) function multi:newProcessor(name, nothread, priority)
local c = {} local c = {}
setmetatable(c,{__index = multi}) setmetatable(c,{__index = multi})
local name = name or "Processor_" .. sandcount local name = name or "Processor_" .. sandcount
@ -1062,9 +1062,14 @@ function multi:newProcessor(name, nothread)
c.threads = {} c.threads = {}
c.startme = {} c.startme = {}
c.parent = self c.parent = self
c.OnObjectCreated = multi:newConnection() c.OnObjectCreated = self:newConnection()
local handler = c:createHandler(c) local handler
if priority then
handler = c:createPriorityHandler(c)
else
handler = c:createHandler(c)
end
if not nothread then -- Don't create a loop if we are triggering this manually if not nothread then -- Don't create a loop if we are triggering this manually
c.process = self:newLoop(function() c.process = self:newLoop(function()
@ -1078,7 +1083,7 @@ function multi:newProcessor(name, nothread)
c.process.PID = sandcount c.process.PID = sandcount
c.OnError = c.process.OnError c.OnError = c.process.OnError
else else
c.OnError = multi:newConnection() c.OnError = self:newConnection()
end end
c.OnError(multi.error) c.OnError(multi.error)
@ -1422,16 +1427,16 @@ function thread:newFunctionBase(generator, holdme)
return wait() return wait()
end end
local temp = { local temp = {
OnStatus = multi:newConnection(true), OnStatus = multi:getCurrentProcess():newConnection(true),
OnError = multi:newConnection(true), OnError = multi:getCurrentProcess():newConnection(true),
OnReturn = multi:newConnection(true), OnReturn = multi:getCurrentProcess():newConnection(true),
isTFunc = true, isTFunc = true,
wait = wait, wait = wait,
getReturns = function() getReturns = function()
return multi.unpack(rets) return multi.unpack(rets)
end, end,
connect = function(f) connect = function(f)
local tempConn = multi:newConnection(true) local tempConn = multi:getCurrentProcess():newConnection(true)
t.OnDeath(function(...) if f then f(...) else tempConn:Fire(...) end end) t.OnDeath(function(...) if f then f(...) else tempConn:Fire(...) end end)
t.OnError(function(self,err) if f then f(nil,err) else tempConn:Fire(nil,err) end end) t.OnError(function(self,err) if f then f(nil,err) else tempConn:Fire(nil,err) end end)
return tempConn return tempConn
@ -1455,13 +1460,18 @@ function thread:newFunction(func, holdme)
end, holdme)() end, holdme)()
end end
function thread:newProcessor(name) function thread:newProcessor(name, nothread, priority)
-- Inactive proxy proc -- Inactive proxy proc
local proc = multi:getCurrentProcess():newProcessor(name, true) local proc = multi:getCurrentProcess():newProcessor(name, true)
local thread_proc = multi:getCurrentProcess():newProcessor(name).Start() local thread_proc = multi:getCurrentProcess():newProcessor(name).Start()
local Active = true local Active = true
local handler = thread_proc:getHandler() local handler
if priority then
handler = thread_proc:createPriorityHandler(c)
else
handler = thread_proc:createHandler(c)
end
function proc:getThreads() function proc:getThreads()
return thread_proc.threads return thread_proc.threads
@ -1545,9 +1555,16 @@ function thread:newThread(name, func, ...)
c.firstRunDone=false c.firstRunDone=false
c._isPaused = false c._isPaused = false
c.returns = {} c.returns = {}
c.isError = false c.isError = false
c.OnError = multi:newConnection(true,nil,true)
c.OnDeath = multi:newConnection(true,nil,true) if self.Type == multi.PROCESS then
c.OnError = self:newConnection(true,nil,true)
c.OnDeath = self:newConnection(true,nil,true)
else
c.OnError = threadManager:newConnection(true,nil,true)
c.OnDeath = threadManager:newConnection(true,nil,true)
end
c.OnError(multi.error) c.OnError(multi.error)
function c:getName() function c:getName()
@ -1619,7 +1636,11 @@ function thread:newThread(name, func, ...)
globalThreads[c] = multi globalThreads[c] = multi
threadid = threadid + 1 threadid = threadid + 1
multi:getCurrentProcess():create(c) if self.Type == multi.PROCESS then
self:create(c)
else
threadManager:create(c)
end
c.creationTime = clock() c.creationTime = clock()
return c return c
end end
@ -1820,6 +1841,31 @@ function multi:createHandler()
end) end)
end end
function multi:createPriorityHandler()
local threads, startme = self.threads, self.startme
return coroutine.wrap(function()
local temp_start
while true do
while #startme>0 do
temp_start = table.remove(startme)
_, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs))
co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads)
table.insert(threads, temp_start)
end
for i=#threads,1,-1 do
ref = threads[i]
if ref then
task = ref.task
thd = ref.thread
ready = ref.__ready
co_status[status(thd)](thd, ref, task, i, threads)
end
end
yield()
end
end)
end
function multi:newService(func) -- Priority managed threads function multi:newService(func) -- Priority managed threads
local c = {} local c = {}
c.Type = multi.SERVICE c.Type = multi.SERVICE
@ -2247,7 +2293,7 @@ function multi:benchMark(sec,p,pt)
c=c+1 c=c+1
end end
end) end)
temp.OnBench = multi:newConnection() temp.OnBench = self:newConnection()
temp:setPriority(p or 1) temp:setPriority(p or 1)
return temp return temp
end end
@ -2412,7 +2458,11 @@ else
getmetatable(multi.m.sentinel).__gc = multi.m.onexit getmetatable(multi.m.sentinel).__gc = multi.m.onexit
end end
threadManager = multi:newProcessor("Global_Thread_Manager").Start() threadManager = multi:newProcessor("Global_Thread_Manager", nil, true).Start()
function multi:getThreadManagerProcess()
return threadManager
end
function multi:getHandler() function multi:getHandler()
return threadManager:getHandler() return threadManager:getHandler()

View File

@ -0,0 +1,37 @@
local multi, thread = require("multi"):init()
local dbg = {}
local creation_hook
creation_hook = function(obj, process)
print("Created: ",obj.Type, "in", process.Type, process:getFullName())
if obj.Type == multi.PROCESS then
obj.OnObjectCreated(creation_hook)
end
end
local tmulti = multi:getThreadManagerProcess()
multi.OnObjectCreated(creation_hook)
tmulti.OnObjectCreated(creation_hook)
--[[
multi.ROOTPROCESS = "rootprocess"
multi.CONNECTOR = "connector"
multi.TIMEMASTER = "timemaster"
multi.PROCESS = "process"
multi.TIMER = "timer"
multi.EVENT = "event"
multi.UPDATER = "updater"
multi.ALARM = "alarm"
multi.LOOP = "loop"
multi.TLOOP = "tloop"
multi.STEP = "step"
multi.TSTEP = "tstep"
multi.THREAD = "thread"
multi.SERVICE = "service"
multi.PROXY = "proxy"
multi.THREADEDFUNCTION = "threaded_function"
]]
return dbg

View File

@ -133,6 +133,7 @@ function multi:newSystemThreadedJobQueue(n)
link = c.OnJobCompleted(function(jid,...) link = c.OnJobCompleted(function(jid,...)
if id==jid then if id==jid then
rets = multi.pack(...) rets = multi.pack(...)
c.OnJobCompleted:Unconnect(link)
end end
end) end)
return thread.hold(function() return thread.hold(function()

View File

@ -32,7 +32,9 @@ if multi.integration then -- This allows us to call the lanes manager from suppo
} }
end end
-- Step 1 get lanes -- Step 1 get lanes
lanes = require("lanes").configure() lanes = require("lanes").configure{
nb_keepers = 4,
}
multi.SystemThreads = {} multi.SystemThreads = {}
multi.isMainThread = true multi.isMainThread = true

View File

@ -329,33 +329,5 @@ function multi:newSystemThreadedProcessor(cores)
return false return false
end end
c.getLoad = thread:newFunction(function(self, tp)
local loads = {}
local func
if tp then
func = "STP_GetThreadCount"
else
func = "STP_GetTaskCount"
end
for i,v in pairs(self.proc_list) do
local conn
local jid = self:pushJob(v, func)
conn = self.jobqueue.OnJobCompleted(function(id, data)
if id == jid then
table.insert(loads, {v, data})
multi:newTask(function()
self.jobqueue.OnJobCompleted:Unconnect(conn)
end)
end
end)
end
thread.hold(function() return #loads == c.cores end)
return loads
end, true)
return c return c
end end