From a14cbb45d3971a42ea734c16bed82b67cf263434 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Thu, 22 Sep 2022 10:29:11 -0400 Subject: [PATCH] working on STC --- multi/init.lua | 5 + multi/integration/lanesManager/extensions.lua | 146 +++++++- multi/integration/lanesManager/init.lua | 2 + multi/integration/lanesManager/threads.lua | 2 +- multi/integration/loveManager/extensions.lua | 325 ++++++++++++++++++ test.lua | 84 ++++- 6 files changed, 543 insertions(+), 21 deletions(-) diff --git a/multi/init.lua b/multi/init.lua index 9625e4c..976170a 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -109,6 +109,11 @@ function multi:getStats() end --Helpers + +function multi.ForEach(tab,func) + for i=1,#tab do func(tab[i]) end +end + local ignoreconn = true function multi:newConnection(protect,func,kill) local c={} diff --git a/multi/integration/lanesManager/extensions.lua b/multi/integration/lanesManager/extensions.lua index 198e551..ad6c5cf 100644 --- a/multi/integration/lanesManager/extensions.lua +++ b/multi/integration/lanesManager/extensions.lua @@ -22,9 +22,15 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] local multi, thread = require("multi"):init() -local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD +if not (GLOBAL and THREAD) then + local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD +else + lanes = require("lanes") +end function multi:newSystemThreadedQueue(name) + local name = name or multi.randomString(16) local c = {} + c.Name = name c.linda = lanes.linda() function c:push(v) self.linda:send("Q", v) @@ -41,9 +47,12 @@ function multi:newSystemThreadedQueue(name) GLOBAL[name or "_"] = c return c end + function multi:newSystemThreadedTable(name) + local name = name or multi.randomString(16) local c = {} c.link = lanes.linda() + c.Name = name setmetatable(c,{ __index = function(t,k) return c.link:get(k) @@ -58,14 +67,15 @@ function multi:newSystemThreadedTable(name) GLOBAL[name or "_"] = c return c end + function multi:newSystemThreadedJobQueue(n) local c = {} c.cores = n or THREAD.getCores()*2 c.OnJobCompleted = multi:newConnection() - local funcs = multi:newSystemThreadedTable() - local queueJob = multi:newSystemThreadedQueue() - local queueReturn = multi:newSystemThreadedQueue() - local doAll = multi:newSystemThreadedQueue() + local funcs = multi:newSystemThreadedTable():init() + local queueJob = multi:newSystemThreadedQueue():init() + local queueReturn = multi:newSystemThreadedQueue():init() + local doAll = multi:newSystemThreadedQueue():init() local ID=1 local jid = 1 function c:isEmpty() @@ -168,4 +178,128 @@ function multi:newSystemThreadedJobQueue(n) end,i).priority = thread.Priority_Core end return c -end \ No newline at end of file +end + +function multi:newSystemThreadedConnection(name) + local name = name or multi.randomString(16) + local c = {} + c.CONN = 0x00 + c.TRIG = 0x01 + c.PING = 0x02 + c.PONG = 0x03 + local function remove(a, b) + local ai = {} + local r = {} + for k,v in pairs(a) do ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(r,a[k]) end + end + return r + end + c.CID = THREAD.getID() + c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init() + c.Name = name + c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. + -- Locals will only live in the thread that creates the original object + local ping + local pong = function(link, links) + local res = thread.hold(function() + return link:peek()[1] == c.PONG + end,{sleep=3}) + + if not res then + for i=1,#links do + if links[i] == link then + table.remove(links,i,link) + break + end + end + else + link:pop() + end + end + + ping = thread:newFunction(function(self) + ping:Pause() + multi.ForEach(self.links, function(link) -- Sync new connections + link:push{self.PING} + multi:newThread("pong Thread", pong, link, links) + end) + + thread.sleep(3) + + ping:Resume() + end,false) + + thread:newThread("STC_SUB_MAN"..name,function() + local item + while true do + thread.yield() + -- We need to check on broken connections + ping(c) -- Should return instantlly and process this in another thread + item = thread.hold(function() -- This will keep things held up until there is something to process + return c.subscribe:pop() + end) + if item[1] == c.CONN then + multi.ForEach(c.links, function(link) -- Sync new connections + item[2]:push{c.CONN, link} + end) + c.links[#c.links+1] = item[2] + elseif item[1] == c.TRIG then + c:Fire(unpack(item[2])) + c.proxy_conn:Fire(unpack(item[2])) + end + end + end).OnError(print) + --- ^^^ This will only exist in the init thread + + function c:Fire(...) + local args = {...} + if self.CID == THREAD.getID() then -- Host Call + for _, link in pairs(self.links) do + link:push {self.TRIG, args} + end + --c.proxy_conn:Fire(...) + else + self.subscribe:push {self.TRIG, args} + end + end + + function c:init() + local multi, thread = require("multi"):init() + self.links = {} + self.proxy_conn = multi:newConnection() + local mt = getmetatable(self.proxy_conn) + setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) + if self.CID == THREAD.getID() then return self end + thread:newThread("STC_CONN_MAN"..name,function() + local item + local link_self_ref = multi:newSystemThreadedQueue() + self.subscribe:push{self.CONN, link_self_ref} + while true do + item = thread.hold(function() + return link_self_ref:peek() + end) + if item[1] == self.PING then + link_self_ref:push{self.PONG} + link_self_ref:pop() + elseif item[1] == self.CONN then + if item[2].Name ~= link_self_ref.Name then + table.insert(self.links, item[2]) + end + link_self_ref:pop() + elseif item[1] == self.TRIG then + self.proxy_conn:Fire(unpack(item[2])) + link_self_ref:pop() + else + -- This shouldn't be the case + end + end + end).OnError(print) + return self + end + + GLOBAL[name] = c + + return c +end \ No newline at end of file diff --git a/multi/integration/lanesManager/init.lua b/multi/integration/lanesManager/init.lua index 4bada5a..57f507e 100644 --- a/multi/integration/lanesManager/init.lua +++ b/multi/integration/lanesManager/init.lua @@ -87,6 +87,7 @@ function multi:newSystemThread(name, func, ...) }, priority=c.priority },function(...) + require("multi.integration.lanesManager.extensions") local has_error = true return_linda:set("returns",{func(...)}) has_error = false @@ -119,6 +120,7 @@ function multi.InitSystemThreadErrorHandler() while true do thread.yield() _,data = __ConsoleLinda:receive(0, "Q") + if data then print(unpack(data)) end for i = #threads, 1, -1 do temp = threads[i] status = temp.thread.status diff --git a/multi/integration/lanesManager/threads.lua b/multi/integration/lanesManager/threads.lua index 68cb733..56b2340 100644 --- a/multi/integration/lanesManager/threads.lua +++ b/multi/integration/lanesManager/threads.lua @@ -75,7 +75,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda) c.queue:send("Q", {...}) end function c.error(err) - c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} + c.queue:push("Q",{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}) error(err) end return c diff --git a/multi/integration/loveManager/extensions.lua b/multi/integration/loveManager/extensions.lua index cf6beb2..ad8339e 100644 --- a/multi/integration/loveManager/extensions.lua +++ b/multi/integration/loveManager/extensions.lua @@ -201,5 +201,330 @@ function multi:newSystemThreadedJobQueue(n) end,jqc) end jqc = jqc + 1 + return c +end + +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] + +-- TODO make compatible with lovr +local multi, thread = require("multi").init() +GLOBAL = multi.integration.GLOBAL +THREAD = multi.integration.THREAD +function multi:newSystemThreadedQueue(name) + local name = name or multi.randomString(16) + local c = {} + c.Name = name + local fRef = {"func",nil} + function c:init() + local q = {} + q.chan = love.thread.getChannel(self.Name) + function q:push(dat) + if type(dat) == "function" then + fRef[2] = THREAD.dump(dat) + self.chan:push(fRef) + return + else + self.chan:push(dat) + end + end + function q:pop() + local dat = self.chan:pop() + if type(dat)=="table" and dat[1]=="func" then + return THREAD.loadDump(dat[2]) + else + return dat + end + end + function q:peek() + local dat = self.chan:peek() + if type(dat)=="table" and dat[1]=="func" then + return THREAD.loadDump(dat[2]) + else + return dat + end + end + return q + end + THREAD.package(name,c) + return c +end +function multi:newSystemThreadedTable(name) + local name = name or multi.randomString(16) + local c = {} + c.Name = name + function c:init() + return THREAD.createTable(self.Name) + end + THREAD.package(name,c) + return c +end +local jqc = 1 +function multi:newSystemThreadedJobQueue(n) + local c = {} + c.cores = n or THREAD.getCores() + c.registerQueue = {} + c.funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") + c.queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue") + c.queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") + c.queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") + c.id = 0 + c.OnJobCompleted = multi:newConnection() + local allfunc = 0 + function c:doToAll(func) + local f = THREAD.dump(func) + for i = 1, self.cores do + self.queueAll:push({allfunc,f}) + end + allfunc = allfunc + 1 + end + function c:registerFunction(name,func) + if self.funcs[name] then + error("A function by the name "..name.." has already been registered!") + end + self.funcs[name] = func + end + function c:pushJob(name,...) + self.id = self.id + 1 + self.queue:push{name,self.id,...} + return self.id + end + function c:isEmpty() + return queueJob:peek()==nil + end + local nFunc = 0 + function c:newFunction(name,func,holup) -- This registers with the queue + if type(name)=="function" then + holup = func + func = name + name = "JQ_Function_"..nFunc + end + nFunc = nFunc + 1 + c:registerFunction(name,func) + return thread:newFunction(function(...) + local id = c:pushJob(name,...) + local link + local rets + link = c.OnJobCompleted(function(jid,...) + if id==jid then + rets = {...} + link:Destroy() + end + end) + return thread.hold(function() + if rets then + return unpack(rets) or multi.NIL + end + end) + end,holup),name + end + thread:newThread("jobManager",function() + while true do + thread.yield() + local dat = c.queueReturn:pop() + if dat then + c.OnJobCompleted:Fire(unpack(dat)) + end + end + end) + for i=1,c.cores do + multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc) + local multi, thread = require("multi"):init() + require("love.timer") + local function atomic(channel) + return channel:pop() + end + local clock = os.clock + local funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") + local queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue") + local queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") + local lastProc = clock() + local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") + local registry = {} + setmetatable(_G,{__index = funcs}) + thread:newThread("startUp",function() + while true do + thread.yield() + local all = queueAll:peek() + if all and not registry[all[1]] then + lastProc = os.clock() + THREAD.loadDump(queueAll:pop()[2])() + end + end + end) + thread:newThread("runner",function() + thread.sleep(.1) + while true do + thread.yield() + local all = queueAll:peek() + if all and not registry[all[1]] then + lastProc = os.clock() + THREAD.loadDump(queueAll:pop()[2])() + end + local dat = queue:performAtomic(atomic) + if dat then + lastProc = os.clock() + local name = table.remove(dat,1) + local id = table.remove(dat,1) + local tab = {funcs[name](unpack(dat))} + table.insert(tab,1,id) + queueReturn:push(tab) + end + end + end):OnError(function(...) + error(...) + end) + thread:newThread("Idler",function() + while true do + thread.yield() + if clock()-lastProc> 2 then + THREAD.sleep(.05) + else + THREAD.sleep(.001) + end + end + end) + multi:mainloop() + end,jqc) + end + jqc = jqc + 1 + return c +end + +function multi:newSystemThreadedConnection(name) + local name = name or multi.randomString(16) + local c = {} + c.CONN = 0x00 + c.TRIG = 0x01 + c.PING = 0x02 + c.PONG = 0x03 + local function remove(a, b) + local ai = {} + local r = {} + for k,v in pairs(a) do ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(r,a[k]) end + end + return r + end + c.CID = THREAD.getID() + c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init() + c.Name = name + c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. + -- Locals will only live in the thread that creates the original object + local ping + local pong = function(link, links) + local res = thread.hold(function() + return link:peek()[1] == c.PONG + end,{sleep=3}) + + if not res then + self.links = remove(self.links, pings) + else + link:pop() + end + end + + ping = thread:newFunction(function(self) + ping:Pause() + multi.ForEach(self.links, function(link) -- Sync new connections + link:push{self.PING} + multi:newThread("pong Thread", pong, link, links) + end) + + thread.sleep(3) + + ping:Resume() + end,false) + + thread:newThread("STC_SUB_MAN"..name,function() + local item + while true do + thread.yield() + -- We need to check on broken connections + ping(c) -- Should return instantlly and process this in another thread + item = thread.hold(function() -- This will keep things held up until there is something to process + return c.subscribe:pop() + end) + if item[1] == c.CONN then + multi.ForEach(c.links, function(link) -- Sync new connections + item[2]:push{c.CONN, link} + end) + c.links[#c.links+1] = item[2] + elseif item[1] == c.TRIG then + c:Fire(unpack(item[2])) + c.proxy_conn:Fire(unpack(item[2])) + end + end + end) + --- ^^^ This will only exist in the init thread + + function c:Fire(...) + local args = {...} + if self.CID == THREAD.getID() then -- Host Call + for _, link in pairs(self.links) do + link:push {self.TRIG, args} + end + else + self.subscribe:push {self.TRIG, args} + end + end + + function c:init() + local multi, thread = require("multi"):init() + self.links = {} + self.proxy_conn = multi:newConnection() + local mt = getmetatable(self.proxy_conn) + setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) + thread:newThread("STC_CONN_MAN"..name,function() + local item + local link_self_ref = multi:newSystemThreadedQueue() + self.subscribe:push{self.CONN, link_self_ref} + while true do + item = thread.hold(function() + return link_self_ref:peek() + end) + if item[1] == self.PING then + link_self_ref:push{self.PONG} + link_self_ref:pop() + elseif item[1] == self.CONN then + if item[2].Name ~= link_self_ref.Name then + table.insert(self.links, item[2]) + end + link_self_ref:pop() + elseif item[1] == self.TRIG then + self.proxy_conn:Fire(unpack(item[2])) + link_self_ref:pop() + else + -- This shouldn't be the case + end + end + end) + return self + end + + GLOBAL[name] = c + return c end \ No newline at end of file diff --git a/test.lua b/test.lua index fc484f9..b77857a 100644 --- a/test.lua +++ b/test.lua @@ -1,20 +1,76 @@ -package.path = "./?/init.lua;?.lua;lua5.4/share/lua/?/init.lua;lua5.4/share/lua/?.lua;"..package.path -package.cpath = "lua5.4/lib/lua/?/core.dll;"..package.cpath +package.path = "./?/init.lua;?.lua;lua5.4/share/lua/5.4/?/init.lua;lua5.4/share/lua/5.4/?.lua;"--..package.path +package.cpath = "lua5.4/lib/lua/5.4/?/core.dll;"--..package.cpath multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.lanesManager"):init() -test = THREAD:newFunction(function() - PNT() - return 1,2 -end,true) -multi:newThread(function() - while true do - print("...") - thread.sleep(1) - end +local conn = multi:newSystemThreadedConnection("conn"):init() + +multi:newSystemThread("Thread_Test_1",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + local console = THREAD.getConsole() + conn(function() + console.print(THREAD:getName().." was triggered!") + end) + multi:mainloop() +end) + +multi:newSystemThread("Thread_Test_2",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + local console = THREAD.getConsole() + conn(function(a,b,c) + console.print(THREAD:getName().." was triggered!",a,b,c) + end) + multi:newAlarm(3):OnRing(function() + console.print("Fire 2!!!") + conn:Fire(4,5,6) + --THREAD.kill() + end) + + multi:mainloop() +end) + +conn(function(a,b,c) + print("Mainloop conn got triggered!",a,b,c) +end) + +alarm = multi:newAlarm(1) +alarm:OnRing(function() + print("Fire 1!!!") + conn:Fire(1,2,3) +end) + +alarm = multi:newAlarm(3):OnRing(function() + multi:newSystemThread("Thread_Test_3",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + local console = THREAD.getConsole() + conn(function(a,b,c) + console.print(THREAD:getName().." was triggered!",a,b,c) + end) + multi:newAlarm(4):OnRing(function() + console.print("Fire 3!!!") + conn:Fire(7,8,9) + end) + multi:mainloop() + end) +end) + +multi:newSystemThread("Thread_Test_4",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + local conn2 = multi:newConnection() + local console = THREAD.getConsole() + multi:newAlarm(2):OnRing(function() + conn2:Fire() + end) + multi:newThread(function() + console.print("Conn Test!") + thread.hold(conn + conn2) + console.print("It held!") + end) + multi:mainloop() end) -multi:newAlarm(.1):OnRing(function() os.exit() end) -print(test()) -print("Hi!") multi:mainloop() \ No newline at end of file