diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 0e7ba6e..188efcd 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -79,25 +79,27 @@ function multi:newProxy(list) thread:newThread(function() while true do local data = thread.hold(check) - local func = table.remove(data, 1) - local sref = table.remove(data, 1) - local ret - if sref then - ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} - else - ret = {_G[list[0]][func](multi.unpack(data))} - end - for i = 1,#ret do - if type(ret[i]) == "table" and getmetatable(ret[i]) then - setmetatable(ret[i],{}) -- remove that metatable, we do not need it on the other side! + if data then + local func = table.remove(data, 1) + local sref = table.remove(data, 1) + local ret + if sref then + ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} + else + ret = {_G[list[0]][func](multi.unpack(data))} end - if ret[i] == _G[list[0]] then - -- We cannot return itself, that return can contain bad values. - ret[i] = {_self_ref_ = true} + for i = 1,#ret do + if type(ret[i]) == "table" and getmetatable(ret[i]) then + setmetatable(ret[i],{}) -- remove that metatable, we do not need it on the other side! + end + if ret[i] == _G[list[0]] then + -- We cannot return itself, that return can contain bad values. + ret[i] = {_self_ref_ = true} + end end + table.insert(ret, 1, func) + self.recv:push(ret) end - table.insert(ret, 1, func) - self.recv:push(ret) end end).OnError(print) return self @@ -221,11 +223,11 @@ function multi:newSystemThreadedProcessor(cores) end) c.jobqueue:registerFunction("STP_GetThreadCount",function() - return {"t_thread", _G["__THREADS"]} + return _G["__THREADS"] end) c.jobqueue:registerFunction("STP_GetTaskCount",function() - return {"t_task", _G["__TASKS"]} + return _G["__TASKS"] end) function c:pushJob(ID, name, ...) @@ -372,18 +374,40 @@ function multi:newSystemThreadedProcessor(cores) end -- Special functions - c.getLeastLoaded = thread:newFunction(function(self) + c.getLeastLoaded = thread:newFunction(function(self, tp) local loads = {} - local jid = {} - for i,v in pairs(self.proc_list) do - table.insert(jid, self:pushJob(v, "STP_GetThreadCount")) - table.insert(jid, self:pushJob(v, "STP_GetTaskCount")) - end - - end) + local func - c.jobqueue.OnJobCompleted(function(id, ...) - -- + if tp then + func = "STP_GetThreadCount" + else + func = "STP_GetTaskCount" + end + + for i,v in pairs(self.proc_list) do + local conn + local jid = self:pushJob(v, func) + + conn = c.jobqueue.OnJobCompleted(function(id, data) + if id == jid then + table.insert(loads, {v, data}) + multi:newAlarm(1):OnRing(function(alarm) + c.jobqueue.OnJobCompleted:Unconnect(conn) + alarm:Destroy() + end) + end + end) + end + + thread.hold(function() return #loads == c.cores end) + if tp then + multi.print("Threads\n-------") + else + multi.print("Tasks\n-----") + end + for i,v in pairs(loads) do + print(v[1], v[2]) + end end) return c @@ -408,6 +432,7 @@ function thread.hold(n, opt) end return unpack(rets) end) + local conn local handle = func(name) conn = handle.OnReturn(function(...) @@ -421,11 +446,13 @@ function thread.hold(n, opt) return multi.unpack(args) or multi.NIL end end, opt)} + for i,v in pairs(ret) do if type(v) == "table" and v._self_ref_ == "parent" then ret[i] = n.Parent end end + return unpack(ret) else return thread_ref(n, opt)