From 06c31bee85873a143b38937738a1d8a831d1052a Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sat, 27 May 2023 23:57:58 -0400 Subject: [PATCH] Clean up connection events when holding, working on scheduling tasks/threads to system threaded processors --- init.lua | 4 +- integration/loveManager/extensions.lua | 6 +-- integration/sharedExtensions/init.lua | 57 ++++++++++++++++++++------ 3 files changed, 51 insertions(+), 16 deletions(-) diff --git a/init.lua b/init.lua index 2c38a88..1d45f14 100644 --- a/init.lua +++ b/init.lua @@ -1261,9 +1261,11 @@ local function conn_test(conn) ready = true args = {...} end - conn(func) + + local ref = conn(func) return function() if ready then + conn:Unconnect(ref) return multi.unpack(args) or multi.NIL end end diff --git a/integration/loveManager/extensions.lua b/integration/loveManager/extensions.lua index 70bad7a..8740018 100644 --- a/integration/loveManager/extensions.lua +++ b/integration/loveManager/extensions.lua @@ -231,7 +231,7 @@ function multi:newSystemThreadedConnection(name) function self:Fire(...) local args = {...} - if self.CID == THREAD.getID() then -- Host Call + if self.CID == THREAD_ID then -- Host Call for _, link in pairs(self.links) do love.thread.getChannel(link):push{self.TRIG, args} end @@ -246,7 +246,7 @@ function multi:newSystemThreadedConnection(name) self.proxy_conn = multi:newConnection() local mt = getmetatable(self.proxy_conn) setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) - if self.CID == THREAD.getID() then return self end + if self.CID == THREAD_ID then return self end thread:newThread("STC_CONN_MAN" .. self.Name,function() local item local string_self_ref = "LSF_" .. multi.randomString(16) @@ -284,7 +284,7 @@ function multi:newSystemThreadedConnection(name) end return r end - c.CID = THREAD.getID() + c.CID = THREAD_ID c.Name = name c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. -- Locals will only live in the thread that creates the original object diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 86f292c..0e7ba6e 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -141,7 +141,6 @@ function multi:newProxy(list) return self end end - return c end @@ -185,6 +184,7 @@ function multi:newSystemThreadedProcessor(cores) setmetatable(c,{__index = multi}) + c.threads = {} c.cores = cores or 8 c.Name = name c.Mainloop = {} @@ -196,7 +196,7 @@ function multi:newSystemThreadedProcessor(cores) c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) c.targetedQueue = multi:newSystemThreadedQueue(name.."_target"):init() - c.jobqueue:registerFunction("enable_targets",function(name) + c.jobqueue:registerFunction("STP_enable_targets",function(name) local multi, thread = require("multi"):init() local qname = THREAD_NAME .. "_t_queue" local targetedQueue = THREAD.waitFor(name):init() @@ -220,6 +220,14 @@ function multi:newSystemThreadedProcessor(cores) end).OnError(multi.error) end) + c.jobqueue:registerFunction("STP_GetThreadCount",function() + return {"t_thread", _G["__THREADS"]} + end) + + c.jobqueue:registerFunction("STP_GetTaskCount",function() + return {"t_task", _G["__TASKS"]} + end) + function c:pushJob(ID, name, ...) targets[ID]:push{name, jid, {...}} jid = jid - 1 @@ -227,7 +235,9 @@ function multi:newSystemThreadedProcessor(cores) end c.jobqueue:doToAll(function(name) - enable_targets(name) + STP_enable_targets(name) + _G["__THREADS"] = 0 + _G["__TASKS"] = 0 end, name.."_target") local count = 0 @@ -235,13 +245,14 @@ function multi:newSystemThreadedProcessor(cores) local dat = c.targetedQueue:pop() if dat then targets[dat[1]] = multi.integration.THREAD.waitFor(dat[2]):init() + table.insert(c.proc_list, dat[1]) -- Add thread_id to proc list count = count + 1 end end c.jobqueue:registerFunction("packObj",function(obj) local multi, thread = require("multi"):init() - obj.getThreadID = function() -- Special function we are adding + obj.getThreadID = function() -- Special functions we are adding return THREAD_ID end @@ -260,12 +271,14 @@ function multi:newSystemThreadedProcessor(cores) c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...) local multi, thread = require("multi"):init() local obj = thread:newThread(name, func, ...) + _G["__THREADS"] = _G["__THREADS"] + 1 return packObj(obj) end, true) c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...) local multi, thread = require("multi"):init() local obj = multi[obj](multi, func, ...) + _G["__TASKS"] = _G["__TASKS"] + 1 return packObj(obj) end, true) @@ -312,11 +325,11 @@ function multi:newSystemThreadedProcessor(cores) end function c:getHandler() - -- Not needed + return function() end -- return empty function end function c:getThreads() - -- We might want to keep track of the number of threads we have + return self.threads end function c:getFullName() @@ -330,6 +343,7 @@ function multi:newSystemThreadedProcessor(cores) function c:newThread(name, func, ...) proxy = self.spawnThread(name, func, ...):init() proxy.__proc = self + table.insert(self.threads, proxy) return proxy end @@ -338,25 +352,40 @@ function multi:newSystemThreadedProcessor(cores) end function c.run() - -- Not needed + return self end function c.isActive() - -- + return true end function c.Start() - -- + return self end function c.Stop() - -- + return self end function c:Destroy() - -- + return false end + -- Special functions + c.getLeastLoaded = thread:newFunction(function(self) + local loads = {} + local jid = {} + for i,v in pairs(self.proc_list) do + table.insert(jid, self:pushJob(v, "STP_GetThreadCount")) + table.insert(jid, self:pushJob(v, "STP_GetTaskCount")) + end + + end) + + c.jobqueue.OnJobCompleted(function(id, ...) + -- + end) + return c end @@ -379,10 +408,14 @@ function thread.hold(n, opt) end return unpack(rets) end) - func(name).OnReturn(function(...) + local conn + local handle = func(name) + conn = handle.OnReturn(function(...) ready = true args = {...} + handle.OnReturn:Unconnect(conn) end) + local ret = {thread_ref(function() if ready then return multi.unpack(args) or multi.NIL