Clean up connection events when holding, working on scheduling tasks/threads to system threaded processors

This commit is contained in:
Ryan Ward 2023-05-27 23:57:58 -04:00
parent 5c03b34290
commit 06c31bee85
3 changed files with 51 additions and 16 deletions

View File

@ -1261,9 +1261,11 @@ local function conn_test(conn)
ready = true ready = true
args = {...} args = {...}
end end
conn(func)
local ref = conn(func)
return function() return function()
if ready then if ready then
conn:Unconnect(ref)
return multi.unpack(args) or multi.NIL return multi.unpack(args) or multi.NIL
end end
end end

View File

@ -231,7 +231,7 @@ function multi:newSystemThreadedConnection(name)
function self:Fire(...) function self:Fire(...)
local args = {...} local args = {...}
if self.CID == THREAD.getID() then -- Host Call if self.CID == THREAD_ID then -- Host Call
for _, link in pairs(self.links) do for _, link in pairs(self.links) do
love.thread.getChannel(link):push{self.TRIG, args} love.thread.getChannel(link):push{self.TRIG, args}
end end
@ -246,7 +246,7 @@ function multi:newSystemThreadedConnection(name)
self.proxy_conn = multi:newConnection() self.proxy_conn = multi:newConnection()
local mt = getmetatable(self.proxy_conn) local mt = getmetatable(self.proxy_conn)
setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add})
if self.CID == THREAD.getID() then return self end if self.CID == THREAD_ID then return self end
thread:newThread("STC_CONN_MAN" .. self.Name,function() thread:newThread("STC_CONN_MAN" .. self.Name,function()
local item local item
local string_self_ref = "LSF_" .. multi.randomString(16) local string_self_ref = "LSF_" .. multi.randomString(16)
@ -284,7 +284,7 @@ function multi:newSystemThreadedConnection(name)
end end
return r return r
end end
c.CID = THREAD.getID() c.CID = THREAD_ID
c.Name = name c.Name = name
c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out.
-- Locals will only live in the thread that creates the original object -- Locals will only live in the thread that creates the original object

View File

@ -141,7 +141,6 @@ function multi:newProxy(list)
return self return self
end end
end end
return c return c
end end
@ -185,6 +184,7 @@ function multi:newSystemThreadedProcessor(cores)
setmetatable(c,{__index = multi}) setmetatable(c,{__index = multi})
c.threads = {}
c.cores = cores or 8 c.cores = cores or 8
c.Name = name c.Name = name
c.Mainloop = {} c.Mainloop = {}
@ -196,7 +196,7 @@ function multi:newSystemThreadedProcessor(cores)
c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) c.jobqueue = multi:newSystemThreadedJobQueue(c.cores)
c.targetedQueue = multi:newSystemThreadedQueue(name.."_target"):init() c.targetedQueue = multi:newSystemThreadedQueue(name.."_target"):init()
c.jobqueue:registerFunction("enable_targets",function(name) c.jobqueue:registerFunction("STP_enable_targets",function(name)
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
local qname = THREAD_NAME .. "_t_queue" local qname = THREAD_NAME .. "_t_queue"
local targetedQueue = THREAD.waitFor(name):init() local targetedQueue = THREAD.waitFor(name):init()
@ -220,6 +220,14 @@ function multi:newSystemThreadedProcessor(cores)
end).OnError(multi.error) end).OnError(multi.error)
end) end)
c.jobqueue:registerFunction("STP_GetThreadCount",function()
return {"t_thread", _G["__THREADS"]}
end)
c.jobqueue:registerFunction("STP_GetTaskCount",function()
return {"t_task", _G["__TASKS"]}
end)
function c:pushJob(ID, name, ...) function c:pushJob(ID, name, ...)
targets[ID]:push{name, jid, {...}} targets[ID]:push{name, jid, {...}}
jid = jid - 1 jid = jid - 1
@ -227,7 +235,9 @@ function multi:newSystemThreadedProcessor(cores)
end end
c.jobqueue:doToAll(function(name) c.jobqueue:doToAll(function(name)
enable_targets(name) STP_enable_targets(name)
_G["__THREADS"] = 0
_G["__TASKS"] = 0
end, name.."_target") end, name.."_target")
local count = 0 local count = 0
@ -235,13 +245,14 @@ function multi:newSystemThreadedProcessor(cores)
local dat = c.targetedQueue:pop() local dat = c.targetedQueue:pop()
if dat then if dat then
targets[dat[1]] = multi.integration.THREAD.waitFor(dat[2]):init() targets[dat[1]] = multi.integration.THREAD.waitFor(dat[2]):init()
table.insert(c.proc_list, dat[1]) -- Add thread_id to proc list
count = count + 1 count = count + 1
end end
end end
c.jobqueue:registerFunction("packObj",function(obj) c.jobqueue:registerFunction("packObj",function(obj)
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
obj.getThreadID = function() -- Special function we are adding obj.getThreadID = function() -- Special functions we are adding
return THREAD_ID return THREAD_ID
end end
@ -260,12 +271,14 @@ function multi:newSystemThreadedProcessor(cores)
c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...) c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...)
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
local obj = thread:newThread(name, func, ...) local obj = thread:newThread(name, func, ...)
_G["__THREADS"] = _G["__THREADS"] + 1
return packObj(obj) return packObj(obj)
end, true) end, true)
c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...) c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...)
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
local obj = multi[obj](multi, func, ...) local obj = multi[obj](multi, func, ...)
_G["__TASKS"] = _G["__TASKS"] + 1
return packObj(obj) return packObj(obj)
end, true) end, true)
@ -312,11 +325,11 @@ function multi:newSystemThreadedProcessor(cores)
end end
function c:getHandler() function c:getHandler()
-- Not needed return function() end -- return empty function
end end
function c:getThreads() function c:getThreads()
-- We might want to keep track of the number of threads we have return self.threads
end end
function c:getFullName() function c:getFullName()
@ -330,6 +343,7 @@ function multi:newSystemThreadedProcessor(cores)
function c:newThread(name, func, ...) function c:newThread(name, func, ...)
proxy = self.spawnThread(name, func, ...):init() proxy = self.spawnThread(name, func, ...):init()
proxy.__proc = self proxy.__proc = self
table.insert(self.threads, proxy)
return proxy return proxy
end end
@ -338,25 +352,40 @@ function multi:newSystemThreadedProcessor(cores)
end end
function c.run() function c.run()
-- Not needed return self
end end
function c.isActive() function c.isActive()
-- return true
end end
function c.Start() function c.Start()
-- return self
end end
function c.Stop() function c.Stop()
-- return self
end end
function c:Destroy() function c:Destroy()
-- return false
end end
-- Special functions
c.getLeastLoaded = thread:newFunction(function(self)
local loads = {}
local jid = {}
for i,v in pairs(self.proc_list) do
table.insert(jid, self:pushJob(v, "STP_GetThreadCount"))
table.insert(jid, self:pushJob(v, "STP_GetTaskCount"))
end
end)
c.jobqueue.OnJobCompleted(function(id, ...)
--
end)
return c return c
end end
@ -379,10 +408,14 @@ function thread.hold(n, opt)
end end
return unpack(rets) return unpack(rets)
end) end)
func(name).OnReturn(function(...) local conn
local handle = func(name)
conn = handle.OnReturn(function(...)
ready = true ready = true
args = {...} args = {...}
handle.OnReturn:Unconnect(conn)
end) end)
local ret = {thread_ref(function() local ret = {thread_ref(function()
if ready then if ready then
return multi.unpack(args) or multi.NIL return multi.unpack(args) or multi.NIL