--[[ MIT License Copyright (c) 2022 Ryan Ward Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sub-license, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] if not ISTHREAD then multi, thread = require("multi").init() GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD else GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD end function multi:newSystemThreadedQueue(name) local name = name or multi.randomString(16) local c = {} c.Name = name c.Type = multi.SQUEUE local fRef = {"\2",nil} function c:init() local q = {} q.chan = love.thread.getChannel(self.Name) function q:push(dat) if type(dat) == "table" then self.chan:push{"DATA",THREAD.packTable(dat)} else self.chan:push(dat) end -- if type(dat) == "function" then -- fRef[2] = THREAD.dump(dat) -- self.chan:push(fRef) -- return -- else -- self.chan:push(dat) -- end end function q:pop() local dat = self.chan:pop() if type(dat)=="table" and dat[1]=="DATA" then return THREAD.unpackTable(dat[2])--THREAD.loadDump(dat[2]) else return dat end end function q:peek() local dat = self.chan:peek() if type(dat)=="table" and dat[1]=="DATA" then return THREAD.unpackTable(dat[2])--THREAD.loadDump(dat[2]) else return dat end end return q end THREAD.package(name,c) self:create(c) return c end function multi:newSystemThreadedTable(name) local name = name or multi.randomString(16) local c = {} c.Name = name c.Type = multi.STABLE function c:init() return THREAD.createTable(self.Name) end THREAD.package(name,c) self:create(c) return c end local jqc = 1 function multi:newSystemThreadedJobQueue(n) local c = {} c.cores = n or THREAD.getCores() c.registerQueue = {} c.Type = multi.SJOBQUEUE c.funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") c.queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue") c.queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") c.queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") c.id = 0 c.OnJobCompleted = multi:newConnection() local allfunc = 0 function c:doToAll(func) local f = THREAD.dump(func) for i = 1, self.cores do self.queueAll:push({allfunc,f}) end allfunc = allfunc + 1 end function c:registerFunction(name,func) if self.funcs[name] then error("A function by the name "..name.." has already been registered!") end self.funcs[name] = func end function c:pushJob(name,...) self.id = self.id + 1 self.queue:push{name,self.id,...} return self.id end function c:isEmpty() return queueJob:peek()==nil end local nFunc = 0 function c:newFunction(name,func,holup) -- This registers with the queue if type(name)=="function" then holup = func func = name name = "JQ_Function_"..nFunc end nFunc = nFunc + 1 c:registerFunction(name,func) return thread:newFunction(function(...) local id = c:pushJob(name,...) local link local rets link = c.OnJobCompleted(function(jid,...) if id==jid then rets = multi.pack(...) end end) return thread.hold(function() if rets then return multi.unpack(rets) or multi.NIL end end) end,holup),name end thread:newThread("jobManager",function() while true do thread.yield() local dat = c.queueReturn:pop() if dat then c.OnJobCompleted:Fire(multi.unpack(dat)) end end end) for i=1,c.cores do multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc) local multi, thread = require("multi"):init() require("love.timer") local function atomic(channel) return channel:pop() end local clock = os.clock local funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") local queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue") local queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") local lastProc = clock() local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") local registry = {} _G["__QR"] = queueReturn setmetatable(_G,{__index = funcs}) thread:newThread("startUp",function() while true do thread.yield() local all = queueAll:peek() if all and not registry[all[1]] then lastProc = os.clock() THREAD.loadDump(queueAll:pop()[2])() end end end) thread:newThread("runner",function() thread.sleep(.1) while true do thread.yield() local all = queueAll:peek() if all and not registry[all[1]] then lastProc = os.clock() THREAD.loadDump(queueAll:pop()[2])() end local dat = queue:performAtomic(atomic) if dat then multi:newThread("Test",function() lastProc = os.clock() local name = table.remove(dat,1) local id = table.remove(dat,1) local tab = {funcs[name](multi.unpack(dat))} table.insert(tab,1,id) queueReturn:push(tab) end) end end end):OnError(function(...) error(...) end) thread:newThread("Idler",function() while true do thread.yield() if clock()-lastProc> 2 then THREAD.sleep(.05) else THREAD.sleep(.001) end end end) multi:mainloop() end,jqc) end jqc = jqc + 1 self:create(c) return c end function multi:newSystemThreadedConnection(name) local name = name or multi.randomString(16) local c = {} c.Type = multi.SCONNECTION c.CONN = 0x00 c.TRIG = 0x01 c.PING = 0x02 c.PONG = 0x03 local subscribe = love.thread.getChannel("SUB_STC_" .. name) function c:init() self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name) function self:Fire(...) local args = multi.pack(...) if self.CID == THREAD_ID then -- Host Call for _, link in pairs(self.links) do love.thread.getChannel(link):push{self.TRIG, args} end self.proxy_conn:Fire(...) else self.subscribe:push{self.TRIG, args} end end local multi, thread = require("multi"):init() self.links = {} self.proxy_conn = multi:newConnection() local mt = getmetatable(self.proxy_conn) setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) if self.CID == THREAD_ID then return self end thread:newThread("STC_CONN_MAN" .. self.Name,function() local item local string_self_ref = "LSF_" .. multi.randomString(16) local link_self_ref = love.thread.getChannel(string_self_ref) self.subscribe:push{self.CONN, string_self_ref} while true do item = thread.hold(function() return link_self_ref:peek() end) if item[1] == self.PING then link_self_ref:push{self.PONG} link_self_ref:pop() elseif item[1] == self.CONN then if string_self_ref ~= item[2] then table.insert(self.links, love.thread.getChannel(item[2])) end link_self_ref:pop() elseif item[1] == self.TRIG then self.proxy_conn:Fire(multi.unpack(item[2])) link_self_ref:pop() else -- This shouldn't be the case end end end).OnError(multi.error) return self end local function remove(a, b) local ai = {} local r = {} for k,v in pairs(a) do ai[v]=true end for k,v in pairs(b) do if ai[v]==nil then table.insert(r,a[k]) end end return r end c.CID = THREAD_ID c.Name = name c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. -- Locals will only live in the thread that creates the original object local ping local pong = function(link, links) local res = thread.hold(function() return love.thread.getChannel(link):peek()[1] == c.PONG end,{sleep=3}) if not res then for i=1,#links do if links[i] == link then table.remove(links,i,link) break end end else love.thread.getChannel(link):pop() end end ping = thread:newFunction(function(self) ping:Pause() multi.ForEach(self.links, function(link) -- Sync new connections love.thread.getChannel(link):push{self.PING} multi:newThread("pong Thread", pong, link, self.links) end) thread.sleep(3) ping:Resume() end, false) local function fire(...) for _, link in pairs(c.links) do love.thread.getChannel(link):push {c.TRIG, multi.pack(...)} end end thread:newThread("STC_SUB_MAN"..name,function() local item while true do thread.yield() -- We need to check on broken connections ping(c) -- Should return instantlly and process this in another thread item = thread.hold(function() -- This will keep things held up until there is something to process return c.subscribe:pop() end) if item[1] == c.CONN then multi.ForEach(c.links, function(link) -- Sync new connections love.thread.getChannel(item[2]):push{c.CONN, link} end) c.links[#c.links+1] = item[2] elseif item[1] == c.TRIG then fire(multi.unpack(item[2])) c.proxy_conn:Fire(multi.unpack(item[2])) end end end).OnError(multi.error) --- ^^^ This will only exist in the init thread THREAD.package(name,c) self:create(c) return c end require("multi.integration.sharedExtensions")