From 0d6a9f92ada2e61c64f0f972f06ce075d0c01649 Mon Sep 17 00:00:00 2001 From: = <=> Date: Tue, 6 Dec 2022 00:53:03 -0500 Subject: [PATCH] Working on getting STC working in love2d --- integration/loveManager/extensions.lua | 324 +++++-------------------- integration/loveManager/init.lua | 3 +- 2 files changed, 66 insertions(+), 261 deletions(-) diff --git a/integration/loveManager/extensions.lua b/integration/loveManager/extensions.lua index 3eb1a7a..965915a 100644 --- a/integration/loveManager/extensions.lua +++ b/integration/loveManager/extensions.lua @@ -22,218 +22,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] --- TODO make compatible with lovr -local multi, thread = require("multi").init() -GLOBAL = multi.integration.GLOBAL -THREAD = multi.integration.THREAD -function multi:newSystemThreadedQueue(name) - local name = name or multi.randomString(16) - local c = {} - c.Name = name - local fRef = {"func",nil} - function c:init() - local q = {} - q.chan = love.thread.getChannel(self.Name) - function q:push(dat) - if type(dat) == "function" then - fRef[2] = THREAD.dump(dat) - self.chan:push(fRef) - return - else - self.chan:push(dat) - end - end - function q:pop() - local dat = self.chan:pop() - if type(dat)=="table" and dat[1]=="func" then - return THREAD.loadDump(dat[2]) - else - return dat - end - end - function q:peek() - local dat = self.chan:peek() - if type(dat)=="table" and dat[1]=="func" then - return THREAD.loadDump(dat[2]) - else - return dat - end - end - return q - end - THREAD.package(name,c) - return c -end -function multi:newSystemThreadedTable(name) - local name = name or multi.randomString(16) - local c = {} - c.Name = name - function c:init() - return THREAD.createTable(self.Name) - end - THREAD.package(name,c) - return c -end -local jqc = 1 -function multi:newSystemThreadedJobQueue(n) - local c = {} - c.cores = n or THREAD.getCores() - c.registerQueue = {} - c.funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") - c.queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue") - c.queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") - c.queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") - c.id = 0 - c.OnJobCompleted = multi:newConnection() - local allfunc = 0 - function c:doToAll(func) - local f = THREAD.dump(func) - for i = 1, self.cores do - self.queueAll:push({allfunc,f}) - end - allfunc = allfunc + 1 - end - function c:registerFunction(name,func) - if self.funcs[name] then - error("A function by the name "..name.." has already been registered!") - end - self.funcs[name] = func - end - function c:pushJob(name,...) - self.id = self.id + 1 - self.queue:push{name,self.id,...} - return self.id - end - function c:isEmpty() - return queueJob:peek()==nil - end - local nFunc = 0 - function c:newFunction(name,func,holup) -- This registers with the queue - if type(name)=="function" then - holup = func - func = name - name = "JQ_Function_"..nFunc - end - nFunc = nFunc + 1 - c:registerFunction(name,func) - return thread:newFunction(function(...) - local id = c:pushJob(name,...) - local link - local rets - link = c.OnJobCompleted(function(jid,...) - if id==jid then - rets = {...} - link:Destroy() - end - end) - return thread.hold(function() - if rets then - return unpack(rets) or multi.NIL - end - end) - end,holup),name - end - thread:newThread("jobManager",function() - while true do - thread.yield() - local dat = c.queueReturn:pop() - if dat then - c.OnJobCompleted:Fire(unpack(dat)) - end - end - end) - for i=1,c.cores do - multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc) - local multi, thread = require("multi"):init() - require("love.timer") - local function atomic(channel) - return channel:pop() - end - local clock = os.clock - local funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") - local queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue") - local queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") - local lastProc = clock() - local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") - local registry = {} - setmetatable(_G,{__index = funcs}) - thread:newThread("startUp",function() - while true do - thread.yield() - local all = queueAll:peek() - if all and not registry[all[1]] then - lastProc = os.clock() - THREAD.loadDump(queueAll:pop()[2])() - end - end - end) - thread:newThread("runner",function() - thread.sleep(.1) - while true do - thread.yield() - local all = queueAll:peek() - if all and not registry[all[1]] then - lastProc = os.clock() - THREAD.loadDump(queueAll:pop()[2])() - end - local dat = queue:performAtomic(atomic) - if dat then - lastProc = os.clock() - local name = table.remove(dat,1) - local id = table.remove(dat,1) - local tab = {funcs[name](unpack(dat))} - table.insert(tab,1,id) - queueReturn:push(tab) - end - end - end):OnError(function(...) - error(...) - end) - thread:newThread("Idler",function() - while true do - thread.yield() - if clock()-lastProc> 2 then - THREAD.sleep(.05) - else - THREAD.sleep(.001) - end - end - end) - multi:mainloop() - end,jqc) - end - jqc = jqc + 1 - return c +if not ISTHREAD then + multi, thread = require("multi").init() + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD +else + end ---[[ -MIT License - -Copyright (c) 2022 Ryan Ward - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sub-license, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -]] - --- TODO make compatible with lovr -local multi, thread = require("multi").init() -GLOBAL = multi.integration.GLOBAL -THREAD = multi.integration.THREAD function multi:newSystemThreadedQueue(name) local name = name or multi.randomString(16) local c = {} @@ -421,6 +217,59 @@ function multi:newSystemThreadedConnection(name) c.TRIG = 0x01 c.PING = 0x02 c.PONG = 0x03 + + local subscribe = multi:newSystemThreadedQueue("SUB_STC_" .. name):init() + + function c:init() + + self.subscribe = THREAD.waitFor("SUB_STC_" .. self.Name):init() + + function self:Fire(...) + local args = {...} + if self.CID == THREAD.getID() then -- Host Call + for _, link in pairs(self.links) do + link:push{self.TRIG, args} + end + self.proxy_conn:Fire(...) + else + self.subscribe:push{self.TRIG, args} + end + end + + local multi, thread = require("multi"):init() + self.links = {} + self.proxy_conn = multi:newConnection() + local mt = getmetatable(self.proxy_conn) + setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) + if self.CID == THREAD.getID() then return self end + thread:newThread("STC_CONN_MAN" .. self.Name,function() + local item + local string_self_ref = "LSF_" .. multi.randomString(16) + local link_self_ref = multi:newSystemThreadedQueue():init() + self.subscribe:push{self.CONN, string_self_ref} + while true do + item = thread.hold(function() + return link_self_ref:peek() + end) + if item[1] == self.PING then + link_self_ref:push{self.PONG} + link_self_ref:pop() + elseif item[1] == self.CONN then + if string_self_ref ~= item[2] then + table.insert(self.links, THREAD.waitFor(item[2]):init()) + end + link_self_ref:pop() + elseif item[1] == self.TRIG then + self.proxy_conn:Fire(unpack(item[2])) + link_self_ref:pop() + else + -- This shouldn't be the case + end + end + end).OnError(print) + return self + end + local function remove(a, b) local ai = {} local r = {} @@ -431,7 +280,6 @@ function multi:newSystemThreadedConnection(name) return r end c.CID = THREAD.getID() - c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init() c.Name = name c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. -- Locals will only live in the thread that creates the original object @@ -455,6 +303,7 @@ function multi:newSystemThreadedConnection(name) ping = thread:newFunction(function(self) ping:Pause() + multi.ForEach(self.links, function(link) -- Sync new connections link:push{self.PING} multi:newThread("pong Thread", pong, link, links) @@ -482,63 +331,18 @@ function multi:newSystemThreadedConnection(name) end) if item[1] == c.CONN then multi.ForEach(c.links, function(link) -- Sync new connections - item[2]:push{c.CONN, link} + THREAD.waitFor(item[2]):init():push{c.CONN, link.Name} end) - c.links[#c.links+1] = item[2] + print("Adding link") + c.links[#c.links+1] = THREAD.waitFor(item[2]):init() elseif item[1] == c.TRIG then fire(unpack(item[2])) c.proxy_conn:Fire(unpack(item[2])) end end - end) + end).OnError(print) --- ^^^ This will only exist in the init thread - function c:Fire(...) - local args = {...} - if self.CID == THREAD.getID() then -- Host Call - for _, link in pairs(self.links) do - link:push {self.TRIG, args} - end - self.proxy_conn:Fire(...) - else - self.subscribe:push {self.TRIG, args} - end - end - - function c:init() - local multi, thread = require("multi"):init() - self.links = {} - self.proxy_conn = multi:newConnection() - local mt = getmetatable(self.proxy_conn) - setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) - if self.CID == THREAD.getID() then return self end - thread:newThread("STC_CONN_MAN"..name,function() - local item - local link_self_ref = multi:newSystemThreadedQueue() - self.subscribe:push{self.CONN, link_self_ref} - while true do - item = thread.hold(function() - return link_self_ref:peek() - end) - if item[1] == self.PING then - link_self_ref:push{self.PONG} - link_self_ref:pop() - elseif item[1] == self.CONN then - if item[2].Name ~= link_self_ref.Name then - table.insert(self.links, item[2]) - end - link_self_ref:pop() - elseif item[1] == self.TRIG then - self.proxy_conn:Fire(unpack(item[2])) - link_self_ref:pop() - else - -- This shouldn't be the case - end - end - end) - return self - end - THREAD.package(name,c) return c diff --git a/integration/loveManager/init.lua b/integration/loveManager/init.lua index f002d86..dd2d402 100644 --- a/integration/loveManager/init.lua +++ b/integration/loveManager/init.lua @@ -35,6 +35,7 @@ __THREADNAME__=table.remove(__IMPORTS,1) stab = THREAD.createStaticTable(__THREADNAME__) GLOBAL = THREAD.getGlobal() multi, thread = require("multi").init() +print(pcall(require,"multi.integration.loveManager.extensions")) stab["returns"] = {THREAD.loadDump(__FUNC__)(unpack(__IMPORTS))} ]] local multi, thread = require("multi"):init() @@ -103,7 +104,7 @@ end THREAD.newSystemThread = multi.newSystemThread function love.threaderror(thread, errorstr) - mulit.print("Thread error!\n"..errorstr) + multi.print("Thread error!\n"..errorstr) end multi.integration.GLOBAL = GLOBAL