Getting loads of processors implemented

This commit is contained in:
Ryan Ward 2023-05-28 00:37:17 -04:00
parent 06c31bee85
commit 3effcb7384

View File

@ -79,25 +79,27 @@ function multi:newProxy(list)
thread:newThread(function() thread:newThread(function()
while true do while true do
local data = thread.hold(check) local data = thread.hold(check)
local func = table.remove(data, 1) if data then
local sref = table.remove(data, 1) local func = table.remove(data, 1)
local ret local sref = table.remove(data, 1)
if sref then local ret
ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} if sref then
else ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))}
ret = {_G[list[0]][func](multi.unpack(data))} else
end ret = {_G[list[0]][func](multi.unpack(data))}
for i = 1,#ret do
if type(ret[i]) == "table" and getmetatable(ret[i]) then
setmetatable(ret[i],{}) -- remove that metatable, we do not need it on the other side!
end end
if ret[i] == _G[list[0]] then for i = 1,#ret do
-- We cannot return itself, that return can contain bad values. if type(ret[i]) == "table" and getmetatable(ret[i]) then
ret[i] = {_self_ref_ = true} setmetatable(ret[i],{}) -- remove that metatable, we do not need it on the other side!
end
if ret[i] == _G[list[0]] then
-- We cannot return itself, that return can contain bad values.
ret[i] = {_self_ref_ = true}
end
end end
table.insert(ret, 1, func)
self.recv:push(ret)
end end
table.insert(ret, 1, func)
self.recv:push(ret)
end end
end).OnError(print) end).OnError(print)
return self return self
@ -221,11 +223,11 @@ function multi:newSystemThreadedProcessor(cores)
end) end)
c.jobqueue:registerFunction("STP_GetThreadCount",function() c.jobqueue:registerFunction("STP_GetThreadCount",function()
return {"t_thread", _G["__THREADS"]} return _G["__THREADS"]
end) end)
c.jobqueue:registerFunction("STP_GetTaskCount",function() c.jobqueue:registerFunction("STP_GetTaskCount",function()
return {"t_task", _G["__TASKS"]} return _G["__TASKS"]
end) end)
function c:pushJob(ID, name, ...) function c:pushJob(ID, name, ...)
@ -372,18 +374,40 @@ function multi:newSystemThreadedProcessor(cores)
end end
-- Special functions -- Special functions
c.getLeastLoaded = thread:newFunction(function(self) c.getLeastLoaded = thread:newFunction(function(self, tp)
local loads = {} local loads = {}
local jid = {} local func
for i,v in pairs(self.proc_list) do
table.insert(jid, self:pushJob(v, "STP_GetThreadCount"))
table.insert(jid, self:pushJob(v, "STP_GetTaskCount"))
end
end)
c.jobqueue.OnJobCompleted(function(id, ...) if tp then
-- func = "STP_GetThreadCount"
else
func = "STP_GetTaskCount"
end
for i,v in pairs(self.proc_list) do
local conn
local jid = self:pushJob(v, func)
conn = c.jobqueue.OnJobCompleted(function(id, data)
if id == jid then
table.insert(loads, {v, data})
multi:newAlarm(1):OnRing(function(alarm)
c.jobqueue.OnJobCompleted:Unconnect(conn)
alarm:Destroy()
end)
end
end)
end
thread.hold(function() return #loads == c.cores end)
if tp then
multi.print("Threads\n-------")
else
multi.print("Tasks\n-----")
end
for i,v in pairs(loads) do
print(v[1], v[2])
end
end) end)
return c return c
@ -408,6 +432,7 @@ function thread.hold(n, opt)
end end
return unpack(rets) return unpack(rets)
end) end)
local conn local conn
local handle = func(name) local handle = func(name)
conn = handle.OnReturn(function(...) conn = handle.OnReturn(function(...)
@ -421,11 +446,13 @@ function thread.hold(n, opt)
return multi.unpack(args) or multi.NIL return multi.unpack(args) or multi.NIL
end end
end, opt)} end, opt)}
for i,v in pairs(ret) do for i,v in pairs(ret) do
if type(v) == "table" and v._self_ref_ == "parent" then if type(v) == "table" and v._self_ref_ == "parent" then
ret[i] = n.Parent ret[i] = n.Parent
end end
end end
return unpack(ret) return unpack(ret)
else else
return thread_ref(n, opt) return thread_ref(n, opt)