From e43fe34a7cb4a60651dfc31c5881da0b6b24202d Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Mon, 10 Jul 2023 15:54:36 -0400 Subject: [PATCH] Still implementing new love2d threading code --- init.lua | 7 +- integration/lanesManager/init.lua | 2 +- integration/lanesManager/threads.lua | 6 +- integration/loveManager/extensions.lua | 384 --------------------- integration/loveManager/init.lua | 137 +------- integration/loveManager/serpent.lua | 176 ++++++++++ integration/loveManager/threads.lua | 344 ++++++++----------- integration/loveManagerold/extensions.lua | 389 ++++++++++++++++++++++ integration/loveManagerold/init.lua | 136 ++++++++ integration/loveManagerold/threads.lua | 258 ++++++++++++++ integration/luvitManager.lua | 2 +- tests/threadtests.lua | 26 +- 12 files changed, 1126 insertions(+), 741 deletions(-) create mode 100644 integration/loveManager/serpent.lua create mode 100644 integration/loveManagerold/extensions.lua create mode 100644 integration/loveManagerold/init.lua create mode 100644 integration/loveManagerold/threads.lua diff --git a/init.lua b/init.lua index cb24772..7c79a4a 100644 --- a/init.lua +++ b/init.lua @@ -2405,7 +2405,12 @@ end function multi.error(self, err) if type(err) == "bool" then crash = err end if type(self) == "string" then err = self end - io.write("\x1b[91mERROR:\x1b[0m " .. err .. " " .. debug.getinfo(2).name .."\n") + local name = debug.getinfo(2).name + if name then + io.write("\x1b[91mERROR:\x1b[0m " .. err .. " " .. name .."\n") + else + io.write("\x1b[91mERROR:\x1b[0m " .. err .. " ?\n") + end error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type .. "\n" .. debug.traceback().."\n") if multi.defaultSettings.error then os.exit(1) diff --git a/integration/lanesManager/init.lua b/integration/lanesManager/init.lua index b33b7ba..014294e 100644 --- a/integration/lanesManager/init.lua +++ b/integration/lanesManager/init.lua @@ -76,7 +76,7 @@ function multi:newSystemThread(name, func, ...) c.loadString = {"base","package","os","io","math","table","string","coroutine"} livingThreads[count] = {true, name} c.returns = return_linda - c.Type = "sthread" + c.Type = multi.STHREAD c.creationTime = os.clock() c.alive = true c.priority = THREAD.Priority_Normal diff --git a/integration/lanesManager/threads.lua b/integration/lanesManager/threads.lua index 3d05ebf..6db9b79 100644 --- a/integration/lanesManager/threads.lua +++ b/integration/lanesManager/threads.lua @@ -66,11 +66,11 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console) local c = {} c.queue = __Console function c.print(...) - c.queue:send("Q", multi.pack(...)) + c.queue:push("Q", table.concat(multi.pack(...), "\t")) end function c.error(err) - c.queue:push("Q",{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}) - error(err) + c.queue:push("Q", "Error in <"..THREAD_NAME..":" .. THREAD_ID .. ">: ".. err) + multi.error(err) end return c end diff --git a/integration/loveManager/extensions.lua b/integration/loveManager/extensions.lua index 32a1215..e69de29 100644 --- a/integration/loveManager/extensions.lua +++ b/integration/loveManager/extensions.lua @@ -1,384 +0,0 @@ ---[[ -MIT License - -Copyright (c) 2022 Ryan Ward - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sub-license, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -]] - -if not ISTHREAD then - multi, thread = require("multi").init() - GLOBAL = multi.integration.GLOBAL - THREAD = multi.integration.THREAD -else - GLOBAL = multi.integration.GLOBAL - THREAD = multi.integration.THREAD -end - -function multi:newSystemThreadedQueue(name) - local name = name or multi.randomString(16) - local c = {} - c.Name = name - c.Type = multi.SQUEUE - local fRef = {"func",nil} - function c:init() - local q = {} - q.chan = love.thread.getChannel(self.Name) - function q:push(dat) - if type(dat) == "function" then - fRef[2] = THREAD.dump(dat) - self.chan:push(fRef) - return - else - self.chan:push(dat) - end - end - function q:pop() - local dat = self.chan:pop() - if type(dat)=="table" and dat[1]=="func" then - return THREAD.loadDump(dat[2]) - else - return dat - end - end - function q:peek() - local dat = self.chan:peek() - if type(dat)=="table" and dat[1]=="func" then - return THREAD.loadDump(dat[2]) - else - return dat - end - end - return q - end - - THREAD.package(name,c) - - self:create(c) - - return c -end - -function multi:newSystemThreadedTable(name) - local name = name or multi.randomString(16) - - local c = {} - - c.Name = name - c.Type = multi.STABLE - - function c:init() - return THREAD.createTable(self.Name) - end - - THREAD.package(name,c) - - self:create(c) - - return c -end - -local jqc = 1 -function multi:newSystemThreadedJobQueue(n) - local c = {} - - c.cores = n or THREAD.getCores() - c.registerQueue = {} - c.Type = multi.SJOBQUEUE - c.funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") - c.queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue") - c.queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") - c.queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") - c.id = 0 - c.OnJobCompleted = multi:newConnection() - - local allfunc = 0 - - function c:doToAll(func) - local f = THREAD.dump(func) - for i = 1, self.cores do - self.queueAll:push({allfunc,f}) - end - allfunc = allfunc + 1 - end - function c:registerFunction(name,func) - if self.funcs[name] then - error("A function by the name "..name.." has already been registered!") - end - self.funcs[name] = func - end - function c:pushJob(name,...) - self.id = self.id + 1 - self.queue:push{name,self.id,...} - return self.id - end - function c:isEmpty() - return queueJob:peek()==nil - end - local nFunc = 0 - function c:newFunction(name,func,holup) -- This registers with the queue - if type(name)=="function" then - holup = func - func = name - name = "JQ_Function_"..nFunc - end - nFunc = nFunc + 1 - c:registerFunction(name,func) - return thread:newFunction(function(...) - local id = c:pushJob(name,...) - local link - local rets - link = c.OnJobCompleted(function(jid,...) - if id==jid then - rets = multi.pack(...) - end - end) - return thread.hold(function() - if rets then - return multi.unpack(rets) or multi.NIL - end - end) - end,holup),name - end - thread:newThread("jobManager",function() - while true do - thread.yield() - local dat = c.queueReturn:pop() - if dat then - c.OnJobCompleted:Fire(multi.unpack(dat)) - end - end - end) - for i=1,c.cores do - multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc) - local multi, thread = require("multi"):init() - require("love.timer") - local function atomic(channel) - return channel:pop() - end - local clock = os.clock - local funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") - local queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue") - local queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") - local lastProc = clock() - local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") - local registry = {} - _G["__QR"] = queueReturn - setmetatable(_G,{__index = funcs}) - thread:newThread("startUp",function() - while true do - thread.yield() - local all = queueAll:peek() - if all and not registry[all[1]] then - lastProc = os.clock() - THREAD.loadDump(queueAll:pop()[2])() - end - end - end) - thread:newThread("runner",function() - thread.sleep(.1) - while true do - thread.yield() - local all = queueAll:peek() - if all and not registry[all[1]] then - lastProc = os.clock() - THREAD.loadDump(queueAll:pop()[2])() - end - local dat = queue:performAtomic(atomic) - if dat then - multi:newThread("Test",function() - lastProc = os.clock() - local name = table.remove(dat,1) - local id = table.remove(dat,1) - local tab = {funcs[name](multi.unpack(dat))} - table.insert(tab,1,id) - queueReturn:push(tab) - end) - end - end - end):OnError(function(...) - error(...) - end) - thread:newThread("Idler",function() - while true do - thread.yield() - if clock()-lastProc> 2 then - THREAD.sleep(.05) - else - THREAD.sleep(.001) - end - end - end) - multi:mainloop() - end,jqc) - end - - jqc = jqc + 1 - - self:create(c) - - return c -end - -function multi:newSystemThreadedConnection(name) - local name = name or multi.randomString(16) - - local c = {} - - c.Type = multi.SCONNECTION - c.CONN = 0x00 - c.TRIG = 0x01 - c.PING = 0x02 - c.PONG = 0x03 - - local subscribe = love.thread.getChannel("SUB_STC_" .. name) - - function c:init() - - self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name) - - function self:Fire(...) - local args = multi.pack(...) - if self.CID == THREAD_ID then -- Host Call - for _, link in pairs(self.links) do - love.thread.getChannel(link):push{self.TRIG, args} - end - self.proxy_conn:Fire(...) - else - self.subscribe:push{self.TRIG, args} - end - end - - local multi, thread = require("multi"):init() - self.links = {} - self.proxy_conn = multi:newConnection() - local mt = getmetatable(self.proxy_conn) - setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) - if self.CID == THREAD_ID then return self end - thread:newThread("STC_CONN_MAN" .. self.Name,function() - local item - local string_self_ref = "LSF_" .. multi.randomString(16) - local link_self_ref = love.thread.getChannel(string_self_ref) - self.subscribe:push{self.CONN, string_self_ref} - while true do - item = thread.hold(function() - return link_self_ref:peek() - end) - if item[1] == self.PING then - link_self_ref:push{self.PONG} - link_self_ref:pop() - elseif item[1] == self.CONN then - if string_self_ref ~= item[2] then - table.insert(self.links, love.thread.getChannel(item[2])) - end - link_self_ref:pop() - elseif item[1] == self.TRIG then - self.proxy_conn:Fire(multi.unpack(item[2])) - link_self_ref:pop() - else - -- This shouldn't be the case - end - end - end).OnError(multi.error) - return self - end - - local function remove(a, b) - local ai = {} - local r = {} - for k,v in pairs(a) do ai[v]=true end - for k,v in pairs(b) do - if ai[v]==nil then table.insert(r,a[k]) end - end - return r - end - - c.CID = THREAD_ID - c.Name = name - c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. - - -- Locals will only live in the thread that creates the original object - local ping - local pong = function(link, links) - local res = thread.hold(function() - return love.thread.getChannel(link):peek()[1] == c.PONG - end,{sleep=3}) - - if not res then - for i=1,#links do - if links[i] == link then - table.remove(links,i,link) - break - end - end - else - love.thread.getChannel(link):pop() - end - end - - ping = thread:newFunction(function(self) - ping:Pause() - - multi.ForEach(self.links, function(link) -- Sync new connections - love.thread.getChannel(link):push{self.PING} - multi:newThread("pong Thread", pong, link, self.links) - end) - - thread.sleep(3) - - ping:Resume() - end, false) - - local function fire(...) - for _, link in pairs(c.links) do - love.thread.getChannel(link):push {c.TRIG, multi.pack(...)} - end - end - - thread:newThread("STC_SUB_MAN"..name,function() - local item - while true do - thread.yield() - -- We need to check on broken connections - ping(c) -- Should return instantlly and process this in another thread - item = thread.hold(function() -- This will keep things held up until there is something to process - return c.subscribe:pop() - end) - if item[1] == c.CONN then - - multi.ForEach(c.links, function(link) -- Sync new connections - love.thread.getChannel(item[2]):push{c.CONN, link} - end) - c.links[#c.links+1] = item[2] - - elseif item[1] == c.TRIG then - fire(multi.unpack(item[2])) - c.proxy_conn:Fire(multi.unpack(item[2])) - end - end - end).OnError(multi.error) - --- ^^^ This will only exist in the init thread - - THREAD.package(name,c) - - self:create(c) - - return c -end -require("multi.integration.sharedExtensions") \ No newline at end of file diff --git a/integration/loveManager/init.lua b/integration/loveManager/init.lua index a2a5d5f..3177adc 100644 --- a/integration/loveManager/init.lua +++ b/integration/loveManager/init.lua @@ -1,135 +1,8 @@ ---[[ -MIT License +local GLOBAL, THREAD = require("multi.integration.loveManager.threads"):init() -Copyright (c) 2022 Ryan Ward -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sub-license, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -]] - -if ISTHREAD then - error("You cannot require the loveManager from within a thread!") -end - -local ThreadFileData = [[ -ISTHREAD = true -THREAD = require("multi.integration.loveManager.threads") -sThread = THREAD -__IMPORTS = {...} -__FUNC__=table.remove(__IMPORTS,1) -THREAD_ID=table.remove(__IMPORTS,1) -THREAD_NAME=table.remove(__IMPORTS,1) -math.randomseed(THREAD_ID) -math.random() -math.random() -math.random() -stab = THREAD.createStaticTable(THREAD_NAME .. THREAD_ID) -GLOBAL = THREAD.getGlobal() -if GLOBAL["__env"] then - local env = THREAD.unpackENV(GLOBAL["__env"]) - for i,v in pairs(env) do - _G[i] = v +return { + init = function(global_channel, console_channel, status_channel) + return GLOBAL, THREAD end -end -multi, thread = require("multi").init() -multi.integration={} -multi.integration.GLOBAL = GLOBAL -multi.integration.THREAD = THREAD -pcall(require,"multi.integration.loveManager.extensions") -pcall(require,"multi.integration.sharedExtensions") -stab["returns"] = {THREAD.loadDump(__FUNC__)(multi.unpack(__IMPORTS))} -]] - -local multi, thread = require("multi"):init() - -local THREAD = {} -_G.THREAD_NAME = "MAIN_THREAD" -_G.THREAD_ID = 0 -multi.integration = {} -local THREAD = require("multi.integration.loveManager.threads") -local GLOBAL = THREAD.getGlobal() -multi.isMainThread = true - -function multi:newSystemThread(name, func, ...) - THREAD_ID = THREAD_ID + 1 - local c = {} - c.name = name - c.ID = THREAD_ID - c.thread = love.thread.newThread(ThreadFileData) - c.thread:start(THREAD.dump(func), c.ID, c.name, ...) - c.stab = THREAD.createStaticTable(name .. c.ID) - c.OnDeath = multi:newConnection() - c.OnError = multi:newConnection() - GLOBAL["__THREAD_" .. c.ID] = {ID = c.ID, Name = c.name, Thread = c.thread} - GLOBAL["__THREAD_COUNT"] = THREAD_ID - - function c:getName() return c.name end - thread:newThread(name .. "_System_Thread_Handler",function() - if name == "SystemThreaded Function Handler" then - local status_channel = love.thread.getChannel("STATCHAN_" .. c.ID) - thread.hold(function() - -- While the thread is running we might as well do something in the loop - if status_channel:peek() ~= nil then - c.statusconnector:Fire(multi.unpack(status_channel:pop())) - end - return not c.thread:isRunning() - end) - else - thread.hold(function() return not c.thread:isRunning() end) - end - -- If the thread is not running let's handle that. - local thread_err = c.thread:getError() - if thread_err == "Thread Killed!\1" then - c.OnDeath:Fire("Thread Killed!") - elseif thread_err then - c.OnError:Fire(c, thread_err) - elseif c.stab.returns then - c.OnDeath:Fire(multi.unpack(c.stab.returns)) - c.stab.returns = nil - end - end) - - if self.isActor then - self:create(c) - else - multi.create(multi, c) - end - - return c -end - -function THREAD:newFunction(func, holdme) - return thread:newFunctionBase(function(...) - return multi:newSystemThread("SystemThreaded Function Handler", func, ...) - end, holdme, multi.SFUNCTION)() -end - -THREAD.newSystemThread = function(...) - multi:newSystemThread(...) -end - -function love.threaderror(thread, errorstr) - multi.print("Thread error!\n" .. errorstr) -end - -multi.integration.GLOBAL = GLOBAL -multi.integration.THREAD = THREAD -require("multi.integration.loveManager.extensions") -require("multi.integration.sharedExtensions") -multi.print("Integrated Love Threading!") -return {init = function() return GLOBAL, THREAD end} +} diff --git a/integration/loveManager/serpent.lua b/integration/loveManager/serpent.lua new file mode 100644 index 0000000..5063f3a --- /dev/null +++ b/integration/loveManager/serpent.lua @@ -0,0 +1,176 @@ +--[[ +Serpent source is released under the MIT License + +Copyright (c) 2012-2018 Paul Kulchenko (paul@kulchenko.com) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +]] + +local n, v = "serpent", "0.303" -- (C) 2012-18 Paul Kulchenko; MIT License +local c, d = "Paul Kulchenko", "Lua serializer and pretty printer" +local snum = {[tostring(1/0)]='1/0 --[[math.huge]]',[tostring(-1/0)]='-1/0 --[[-math.huge]]',[tostring(0/0)]='0/0'} +local badtype = {thread = true, userdata = true, cdata = true} +local getmetatable = debug and debug.getmetatable or getmetatable +local pairs = function(t) return next, t end -- avoid using __pairs in Lua 5.2+ +local keyword, globals, G = {}, {}, (_G or _ENV) +for _,k in ipairs({'and', 'break', 'do', 'else', 'elseif', 'end', 'false', + 'for', 'function', 'goto', 'if', 'in', 'local', 'nil', 'not', 'or', 'repeat', + 'return', 'then', 'true', 'until', 'while'}) do keyword[k] = true end +for k,v in pairs(G) do globals[v] = k end -- build func to name mapping +for _,g in ipairs({'coroutine', 'debug', 'io', 'math', 'string', 'table', 'os'}) do + for k,v in pairs(type(G[g]) == 'table' and G[g] or {}) do globals[v] = g..'.'..k end end + +local function s(t, opts) + local name, indent, fatal, maxnum = opts.name, opts.indent, opts.fatal, opts.maxnum + local sparse, custom, huge = opts.sparse, opts.custom, not opts.nohuge + local space, maxl = (opts.compact and '' or ' '), (opts.maxlevel or math.huge) + local maxlen, metatostring = tonumber(opts.maxlength), opts.metatostring + local iname, comm = '_'..(name or ''), opts.comment and (tonumber(opts.comment) or math.huge) + local numformat = opts.numformat or "%.17g" + local seen, sref, syms, symn = {}, {'local '..iname..'={}'}, {}, 0 + local function gensym(val) return '_'..(tostring(tostring(val)):gsub("[^%w]",""):gsub("(%d%w+)", + -- tostring(val) is needed because __tostring may return a non-string value + function(s) if not syms[s] then symn = symn+1; syms[s] = symn end return tostring(syms[s]) end)) end + local function safestr(s) return type(s) == "number" and (huge and snum[tostring(s)] or numformat:format(s)) + or type(s) ~= "string" and tostring(s) -- escape NEWLINE/010 and EOF/026 + or ("%q"):format(s):gsub("\010","n"):gsub("\026","\\026") end + -- handle radix changes in some locales + if opts.fixradix and (".1f"):format(1.2) ~= "1.2" then + local origsafestr = safestr + safestr = function(s) return type(s) == "number" + and (nohuge and snum[tostring(s)] or numformat:format(s):gsub(",",".")) or origsafestr(s) + end + end + local function comment(s,l) return comm and (l or 0) < comm and ' --[['..select(2, pcall(tostring, s))..']]' or '' end + local function globerr(s,l) return globals[s] and globals[s]..comment(s,l) or not fatal + and safestr(select(2, pcall(tostring, s))) or error("Can't serialize "..tostring(s)) end + local function safename(path, name) -- generates foo.bar, foo[3], or foo['b a r'] + local n = name == nil and '' or name + local plain = type(n) == "string" and n:match("^[%l%u_][%w_]*$") and not keyword[n] + local safe = plain and n or '['..safestr(n)..']' + return (path or '')..(plain and path and '.' or '')..safe, safe end + local alphanumsort = type(opts.sortkeys) == 'function' and opts.sortkeys or function(k, o, n) -- k=keys, o=originaltable, n=padding + local maxn, to = tonumber(n) or 12, {number = 'a', string = 'b'} + local function padnum(d) return ("%0"..tostring(maxn).."d"):format(tonumber(d)) end + table.sort(k, function(a,b) + -- sort numeric keys first: k[key] is not nil for numerical keys + return (k[a] ~= nil and 0 or to[type(a)] or 'z')..(tostring(a):gsub("%d+",padnum)) + < (k[b] ~= nil and 0 or to[type(b)] or 'z')..(tostring(b):gsub("%d+",padnum)) end) end + local function val2str(t, name, indent, insref, path, plainindex, level) + local ttype, level, mt = type(t), (level or 0), getmetatable(t) + local spath, sname = safename(path, name) + local tag = plainindex and + ((type(name) == "number") and '' or name..space..'='..space) or + (name ~= nil and sname..space..'='..space or '') + if seen[t] then -- already seen this element + sref[#sref+1] = spath..space..'='..space..seen[t] + return tag..'nil'..comment('ref', level) + end + -- protect from those cases where __tostring may fail + if type(mt) == 'table' and metatostring ~= false then + local to, tr = pcall(function() return mt.__tostring(t) end) + local so, sr = pcall(function() return mt.__serialize(t) end) + if (to or so) then -- knows how to serialize itself + seen[t] = insref or spath + t = so and sr or tr + ttype = type(t) + end -- new value falls through to be serialized + end + if ttype == "table" then + if level >= maxl then return tag..'{}'..comment('maxlvl', level) end + seen[t] = insref or spath + if next(t) == nil then return tag..'{}'..comment(t, level) end -- table empty + if maxlen and maxlen < 0 then return tag..'{}'..comment('maxlen', level) end + local maxn, o, out = math.min(#t, maxnum or #t), {}, {} + for key = 1, maxn do o[key] = key end + if not maxnum or #o < maxnum then + local n = #o -- n = n + 1; o[n] is much faster than o[#o+1] on large tables + for key in pairs(t) do + if o[key] ~= key then n = n + 1; o[n] = key end + end + end + if maxnum and #o > maxnum then o[maxnum+1] = nil end + if opts.sortkeys and #o > maxn then alphanumsort(o, t, opts.sortkeys) end + local sparse = sparse and #o > maxn -- disable sparsness if only numeric keys (shorter output) + for n, key in ipairs(o) do + local value, ktype, plainindex = t[key], type(key), n <= maxn and not sparse + if opts.valignore and opts.valignore[value] -- skip ignored values; do nothing + or opts.keyallow and not opts.keyallow[key] + or opts.keyignore and opts.keyignore[key] + or opts.valtypeignore and opts.valtypeignore[type(value)] -- skipping ignored value types + or sparse and value == nil then -- skipping nils; do nothing + elseif ktype == 'table' or ktype == 'function' or badtype[ktype] then + if not seen[key] and not globals[key] then + sref[#sref+1] = 'placeholder' + local sname = safename(iname, gensym(key)) -- iname is table for local variables + sref[#sref] = val2str(key,sname,indent,sname,iname,true) + end + sref[#sref+1] = 'placeholder' + local path = seen[t]..'['..tostring(seen[key] or globals[key] or gensym(key))..']' + sref[#sref] = path..space..'='..space..tostring(seen[value] or val2str(value,nil,indent,path)) + else + out[#out+1] = val2str(value,key,indent,nil,seen[t],plainindex,level+1) + if maxlen then + maxlen = maxlen - #out[#out] + if maxlen < 0 then break end + end + end + end + local prefix = string.rep(indent or '', level) + local head = indent and '{\n'..prefix..indent or '{' + local body = table.concat(out, ','..(indent and '\n'..prefix..indent or space)) + local tail = indent and "\n"..prefix..'}' or '}' + return (custom and custom(tag,head,body,tail,level) or tag..head..body..tail)..comment(t, level) + elseif badtype[ttype] then + seen[t] = insref or spath + return tag..globerr(t, level) + elseif ttype == 'function' then + seen[t] = insref or spath + if opts.nocode then return tag.."function() --[[..skipped..]] end"..comment(t, level) end + local ok, res = pcall(string.dump, t) + local func = ok and "((loadstring or load)("..safestr(res)..",'@serialized'))"..comment(t, level) + return tag..(func or globerr(t, level)) + else return tag..safestr(t) end -- handle all other types + end + local sepr = indent and "\n" or ";"..space + local body = val2str(t, name, indent) -- this call also populates sref + local tail = #sref>1 and table.concat(sref, sepr)..sepr or '' + local warn = opts.comment and #sref>1 and space.."--[[incomplete output with shared/self-references skipped]]" or '' + return not name and body..warn or "do local "..body..sepr..tail.."return "..name..sepr.."end" +end + +local function deserialize(data, opts) + local env = (opts and opts.safe == false) and G + or setmetatable({}, { + __index = function(t,k) return t end, + __call = function(t,...) error("cannot call functions") end + }) + local f, res = (loadstring or load)('return '..data, nil, nil, env) + if not f then f, res = (loadstring or load)(data, nil, nil, env) end + if not f then return f, res end + if setfenv then setfenv(f, env) end + return pcall(f) +end + +local function merge(a, b) if b then for k,v in pairs(b) do a[k] = v end end; return a; end +return { _NAME = n, _COPYRIGHT = c, _DESCRIPTION = d, _VERSION = v, serialize = s, + load = deserialize, + dump = function(a, opts) return s(a, merge({name = '_', compact = true, sparse = true}, opts)) end, + line = function(a, opts) return s(a, merge({sortkeys = true, comment = true}, opts)) end, + block = function(a, opts) return s(a, merge({indent = ' ', sortkeys = true, comment = true}, opts)) end } \ No newline at end of file diff --git a/integration/loveManager/threads.lua b/integration/loveManager/threads.lua index 777f811..eedcc02 100644 --- a/integration/loveManager/threads.lua +++ b/integration/loveManager/threads.lua @@ -25,228 +25,69 @@ require("love.timer") require("love.system") require("love.data") require("love.thread") +local serpent = require("multi.integration.loveManager.serpent") local multi, thread = require("multi"):init() -local threads = {} -function threads.loadDump(d) - return loadstring(d:getString()) +local function ltype(data) + local tp = type(data) + if tp == "userdata" then + return data:type() + end + return tp end -function threads.dump(func) - return love.data.newByteData(string.dump(func)) -end +local NIL = love.data.newByteData("\3") -local fRef = {"func",nil} -local function manage(channel, value) - channel:clear() - if type(value) == "function" then - fRef[2] = THREAD.dump(value) - channel:push(fRef) - return +-- If a non table/function is supplied we just return it +local function packValue(t) + local tp = type(t) + if tp == "table" then + return love.data.newByteData("\1"..serpent.dump(t,{safe = true})) + elseif tp == "function" then + return love.data.newByteData("\2"..serpent.dump({t,true},{safe = true})) else - channel:push(value) + return t end end -local GNAME = "__GLOBAL_" -local proxy = {} -function threads.set(name,val) - if not proxy[name] then proxy[name] = love.thread.getChannel(GNAME..name) end - proxy[name]:performAtomic(manage, val) -end - -function threads.get(name) - if not proxy[name] then proxy[name] = love.thread.getChannel(GNAME..name) end - local dat = proxy[name]:peek() - if type(dat)=="table" and dat[1]=="func" then - return THREAD.loadDump(dat[2]) +-- If a non table/function is supplied we just return it +local function unpackValue(d) + if ltype(d) == "ByteData" then + local data = d:getString() + if data:sub(1, 1) == "\1" then + local status, data = serpent.load(data:sub(2,-1),{safe = false}) + if not status then + multi.error(data) + end + return data + elseif data:sub(1, 1) =="\2" then + local status, data = serpent.load(data:sub(2,-1),{safe = false}) + if not status then + multi.error(data) + end + return serpent.load(data:sub(2,-1))[1] + else + return d + end else - return dat + return d end end -function threads.waitFor(name) - if thread.isThread() then - return thread.hold(function() - return threads.get(name) - end) +local function createTable(n) + if not n then + n = "STAB"..multi.randomString(8) end - while threads.get(name)==nil do - love.timer.sleep(.001) - end - local dat = threads.get(name) - if type(dat) == "table" and dat.init then - dat.init = threads.loadDump(dat.init) - end - return dat -end - -function threads.package(name,val) - local init = val.init - val.init=threads.dump(val.init) - GLOBAL[name]=val - val.init=init -end - -function threads.getCores() - return love.system.getProcessorCount() -end - -function threads.kill() - error("Thread Killed!\1") -end - -function threads.pushStatus(...) - local status_channel = love.thread.getChannel("STATCHAN_" ..__THREADID__) - local args = multi.pack(...) - status_channel:push(args) -end - -function threads.getThreads() - local t = {} - for i=1,GLOBAL["__THREAD_COUNT"] do - t[#t+1]=GLOBAL["__THREAD_"..i] - end - return t -end - -function threads.getThread(n) - return GLOBAL["__THREAD_"..n] -end - -function threads.sleep(n) - love.timer.sleep(n) -end - -function threads.getGlobal() - return setmetatable({}, - { - __index = function(t, k) - return THREAD.get(k) - end, - __newindex = function(t, k, v) - THREAD.set(k,v) - end - } - ) -end - -function threads.packENV(env) - local e = {} - for i,v in pairs(env) do - if type(v) == "function" then - e["$f"..i] = string.dump(v) - elseif type(v) == "table" then - e["$t"..i] = threads.packENV(v) - else - e[i] = v - end - end - return e -end - -function threads.unpackENV(env) - local e = {} - for i,v in pairs(env) do - if type(i) == "string" and i:sub(1,2) == "$f" then - e[i:sub(3,-1)] = loadstring(v) - elseif type(i) == "string" and i:sub(1,2) == "$t" then - e[i:sub(3,-1)] = threads.unpackENV(v) - else - e[i] = v - end - end - return e -end - - -function threads.setENV(env, name) - name = name or "__env" - (threads.getGlobal())[name] = threads.packENV(env) -end - -function threads.getENV(name) - name = name or "__env" - return threads.unpackENV((threads.getGlobal())[name]) -end - -function threads.exposeENV(name) - name = name or "__env" - local env = threads.getENV(name) - for i,v in pairs(env) do - _G[i] = v - end -end - -function threads.createTable(n) - local _proxy = {} - local function set(name,val) - if not _proxy[name] then _proxy[name] = love.thread.getChannel(n..name) end - _proxy[name]:performAtomic(manage, val) - end - local function get(name) - if not _proxy[name] then _proxy[name] = love.thread.getChannel(n..name) end - local dat = _proxy[name]:peek() - if type(dat)=="table" and dat[1]=="func" then - return THREAD.loadDump(dat[2]) - else - return dat - end - end - return setmetatable({}, - { - __index = function(t, k) - return get(k) - end, - __newindex = function(t, k, v) - set(k,v) - end - } - ) -end - -function threads.getConsole() - local c = {} - c.queue = love.thread.getChannel("__CONSOLE__") - function c.print(...) - c.queue:push(multi.pack(...)) - end - function c.error(err) - c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} - error(err) - end - return c -end - -if not ISTHREAD then - local queue = love.thread.getChannel("__CONSOLE__") - multi:newLoop(function(loop) - dat = queue:pop() - if dat then - print(multi.unpack(dat)) - end - end) -end - -function threads.createStaticTable(n) local __proxy = {} - local function set(name,val) - if __proxy[name] then return end - local chan = love.thread.getChannel(n..name) - if chan:getCount()>0 then return end - chan:performAtomic(manage, val) - __proxy[name] = val + local function set(name, val) + local chan = love.thread.getChannel(n .. name) + if chan:getCount() == 1 then chan:pop() end + __proxy[name] = true + chan:push(packValue(val)) end local function get(name) - if __proxy[name] then return __proxy[name] end - local dat = love.thread.getChannel(n..name):peek() - if type(dat)=="table" and dat[1]=="func" then - __proxy[name] = THREAD.loadDump(dat[2]) - return __proxy[name] - else - __proxy[name] = dat - return __proxy[name] - end + local dat = love.thread.getChannel(n .. name):peek() + return unpackValue(dat) end return setmetatable({}, { @@ -260,11 +101,94 @@ function threads.createStaticTable(n) ) end -function threads.hold(n) - local dat - while not(dat) do - dat = n() +function INIT(global_channel, console_channel, status_channel) + local GLOBAL, THREAD = createTable("GLOBAL"), {} + + -- Non portable methods, shouldn't be used unless you know what you are doing + THREAD.packValue = packValue + THREAD.unpackValue = unpackValue + THREAD.createTable = createTable + + function THREAD.set(name, val) + GLOBAL[name] = val end + + function THREAD.get(name, val) + return GLOBAL[name] + end + + function THREAD.waitFor(name) + local function wait() + math.randomseed(os.time()) + love.timer.sleep(.001) + end + repeat + wait() + until GLOBAL[name] + return GLOBAL[name] + end + + function THREAD.getCores() + return love.system.getProcessorCount() + end + + function THREAD.getConsole() + local c = {} + c.queue = console_channel + function c.print(...) + c.queue:push(table.concat(multi.pack(...), "\t")) + end + function c.error(err) + c.queue:push("Error in <"..THREAD_NAME..":" .. THREAD_ID .. ">: ".. err) + multi.error(err) + end + return c + end + + function THREAD.getThreads() + -- + end + + function THREAD.kill() -- trigger the lane destruction + error("Thread was killed!\1") + end + + function THREAD.pushStatus(...) + status_channel:push({THREAD_ID, multi.pack(...)}) + end + + _G.THREAD_ID = 0 + + function THREAD.sleep(n) + love.timer.sleep(n) + end + + function THREAD.hold(n) + -- + end + + function THREAD.setENV(env, name) + GLOBAL[name or "__env"] = env + end + + function THREAD.getENV(name) + return GLOBAL[name or "__env"] + end + + function THREAD.exposeENV(name) + name = name or "__env" + local env = THREAD.getENV(name) + for i,v in pairs(env) do + _G[i] = v + end + end + + return GLOBAL, THREAD end -return threads \ No newline at end of file +return { + -- These are the acutal channels + init = function(global_channel, console_channel, status_channel) + return INIT(global_channel, console_channel, status_channel) + end +} \ No newline at end of file diff --git a/integration/loveManagerold/extensions.lua b/integration/loveManagerold/extensions.lua new file mode 100644 index 0000000..36931b3 --- /dev/null +++ b/integration/loveManagerold/extensions.lua @@ -0,0 +1,389 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] + +if not ISTHREAD then + multi, thread = require("multi").init() + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD +else + GLOBAL = multi.integration.GLOBAL + THREAD = multi.integration.THREAD +end + +function multi:newSystemThreadedQueue(name) + local name = name or multi.randomString(16) + local c = {} + c.Name = name + c.Type = multi.SQUEUE + local fRef = {"\2",nil} + function c:init() + local q = {} + q.chan = love.thread.getChannel(self.Name) + function q:push(dat) + if type(dat) == "table" then + self.chan:push{"DATA",THREAD.packTable(dat)} + else + self.chan:push(dat) + end + -- if type(dat) == "function" then + -- fRef[2] = THREAD.dump(dat) + -- self.chan:push(fRef) + -- return + -- else + -- self.chan:push(dat) + -- end + end + function q:pop() + local dat = self.chan:pop() + if type(dat)=="table" and dat[1]=="DATA" then + return THREAD.unpackTable(dat[2])--THREAD.loadDump(dat[2]) + else + return dat + end + end + function q:peek() + local dat = self.chan:peek() + if type(dat)=="table" and dat[1]=="DATA" then + return THREAD.unpackTable(dat[2])--THREAD.loadDump(dat[2]) + else + return dat + end + end + return q + end + + THREAD.package(name,c) + + self:create(c) + + return c +end + +function multi:newSystemThreadedTable(name) + local name = name or multi.randomString(16) + + local c = {} + + c.Name = name + c.Type = multi.STABLE + + function c:init() + return THREAD.createTable(self.Name) + end + + THREAD.package(name,c) + + self:create(c) + + return c +end + +local jqc = 1 +function multi:newSystemThreadedJobQueue(n) + local c = {} + + c.cores = n or THREAD.getCores() + c.registerQueue = {} + c.Type = multi.SJOBQUEUE + c.funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") + c.queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue") + c.queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") + c.queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") + c.id = 0 + c.OnJobCompleted = multi:newConnection() + + local allfunc = 0 + + function c:doToAll(func) + local f = THREAD.dump(func) + for i = 1, self.cores do + self.queueAll:push({allfunc,f}) + end + allfunc = allfunc + 1 + end + function c:registerFunction(name,func) + if self.funcs[name] then + error("A function by the name "..name.." has already been registered!") + end + self.funcs[name] = func + end + function c:pushJob(name,...) + self.id = self.id + 1 + self.queue:push{name,self.id,...} + return self.id + end + function c:isEmpty() + return queueJob:peek()==nil + end + local nFunc = 0 + function c:newFunction(name,func,holup) -- This registers with the queue + if type(name)=="function" then + holup = func + func = name + name = "JQ_Function_"..nFunc + end + nFunc = nFunc + 1 + c:registerFunction(name,func) + return thread:newFunction(function(...) + local id = c:pushJob(name,...) + local link + local rets + link = c.OnJobCompleted(function(jid,...) + if id==jid then + rets = multi.pack(...) + end + end) + return thread.hold(function() + if rets then + return multi.unpack(rets) or multi.NIL + end + end) + end,holup),name + end + thread:newThread("jobManager",function() + while true do + thread.yield() + local dat = c.queueReturn:pop() + if dat then + c.OnJobCompleted:Fire(multi.unpack(dat)) + end + end + end) + for i=1,c.cores do + multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc) + local multi, thread = require("multi"):init() + require("love.timer") + local function atomic(channel) + return channel:pop() + end + local clock = os.clock + local funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") + local queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue") + local queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") + local lastProc = clock() + local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll") + local registry = {} + _G["__QR"] = queueReturn + setmetatable(_G,{__index = funcs}) + thread:newThread("startUp",function() + while true do + thread.yield() + local all = queueAll:peek() + if all and not registry[all[1]] then + lastProc = os.clock() + THREAD.loadDump(queueAll:pop()[2])() + end + end + end) + thread:newThread("runner",function() + thread.sleep(.1) + while true do + thread.yield() + local all = queueAll:peek() + if all and not registry[all[1]] then + lastProc = os.clock() + THREAD.loadDump(queueAll:pop()[2])() + end + local dat = queue:performAtomic(atomic) + if dat then + multi:newThread("Test",function() + lastProc = os.clock() + local name = table.remove(dat,1) + local id = table.remove(dat,1) + local tab = {funcs[name](multi.unpack(dat))} + table.insert(tab,1,id) + queueReturn:push(tab) + end) + end + end + end):OnError(function(...) + error(...) + end) + thread:newThread("Idler",function() + while true do + thread.yield() + if clock()-lastProc> 2 then + THREAD.sleep(.05) + else + THREAD.sleep(.001) + end + end + end) + multi:mainloop() + end,jqc) + end + + jqc = jqc + 1 + + self:create(c) + + return c +end + +function multi:newSystemThreadedConnection(name) + local name = name or multi.randomString(16) + + local c = {} + + c.Type = multi.SCONNECTION + c.CONN = 0x00 + c.TRIG = 0x01 + c.PING = 0x02 + c.PONG = 0x03 + + local subscribe = love.thread.getChannel("SUB_STC_" .. name) + + function c:init() + + self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name) + + function self:Fire(...) + local args = multi.pack(...) + if self.CID == THREAD_ID then -- Host Call + for _, link in pairs(self.links) do + love.thread.getChannel(link):push{self.TRIG, args} + end + self.proxy_conn:Fire(...) + else + self.subscribe:push{self.TRIG, args} + end + end + + local multi, thread = require("multi"):init() + self.links = {} + self.proxy_conn = multi:newConnection() + local mt = getmetatable(self.proxy_conn) + setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) + if self.CID == THREAD_ID then return self end + thread:newThread("STC_CONN_MAN" .. self.Name,function() + local item + local string_self_ref = "LSF_" .. multi.randomString(16) + local link_self_ref = love.thread.getChannel(string_self_ref) + self.subscribe:push{self.CONN, string_self_ref} + while true do + item = thread.hold(function() + return link_self_ref:peek() + end) + if item[1] == self.PING then + link_self_ref:push{self.PONG} + link_self_ref:pop() + elseif item[1] == self.CONN then + if string_self_ref ~= item[2] then + table.insert(self.links, love.thread.getChannel(item[2])) + end + link_self_ref:pop() + elseif item[1] == self.TRIG then + self.proxy_conn:Fire(multi.unpack(item[2])) + link_self_ref:pop() + else + -- This shouldn't be the case + end + end + end).OnError(multi.error) + return self + end + + local function remove(a, b) + local ai = {} + local r = {} + for k,v in pairs(a) do ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(r,a[k]) end + end + return r + end + + c.CID = THREAD_ID + c.Name = name + c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. + + -- Locals will only live in the thread that creates the original object + local ping + local pong = function(link, links) + local res = thread.hold(function() + return love.thread.getChannel(link):peek()[1] == c.PONG + end,{sleep=3}) + + if not res then + for i=1,#links do + if links[i] == link then + table.remove(links,i,link) + break + end + end + else + love.thread.getChannel(link):pop() + end + end + + ping = thread:newFunction(function(self) + ping:Pause() + + multi.ForEach(self.links, function(link) -- Sync new connections + love.thread.getChannel(link):push{self.PING} + multi:newThread("pong Thread", pong, link, self.links) + end) + + thread.sleep(3) + + ping:Resume() + end, false) + + local function fire(...) + for _, link in pairs(c.links) do + love.thread.getChannel(link):push {c.TRIG, multi.pack(...)} + end + end + + thread:newThread("STC_SUB_MAN"..name,function() + local item + while true do + thread.yield() + -- We need to check on broken connections + ping(c) -- Should return instantlly and process this in another thread + item = thread.hold(function() -- This will keep things held up until there is something to process + return c.subscribe:pop() + end) + if item[1] == c.CONN then + + multi.ForEach(c.links, function(link) -- Sync new connections + love.thread.getChannel(item[2]):push{c.CONN, link} + end) + c.links[#c.links+1] = item[2] + + elseif item[1] == c.TRIG then + fire(multi.unpack(item[2])) + c.proxy_conn:Fire(multi.unpack(item[2])) + end + end + end).OnError(multi.error) + --- ^^^ This will only exist in the init thread + + THREAD.package(name,c) + + self:create(c) + + return c +end +require("multi.integration.sharedExtensions") \ No newline at end of file diff --git a/integration/loveManagerold/init.lua b/integration/loveManagerold/init.lua new file mode 100644 index 0000000..f913103 --- /dev/null +++ b/integration/loveManagerold/init.lua @@ -0,0 +1,136 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] + +if ISTHREAD then + error("You cannot require the loveManager from within a thread!") +end + +local ThreadFileData = [[ +ISTHREAD = true +THREAD = require("multi.integration.loveManager.threads") +sThread = THREAD +__IMPORTS = {...} +__FUNC__=table.remove(__IMPORTS,1) +THREAD_ID=table.remove(__IMPORTS,1) +THREAD_NAME=table.remove(__IMPORTS,1) +math.randomseed(THREAD_ID) +math.random() +math.random() +math.random() +stab = THREAD.createStaticTable(THREAD_NAME .. THREAD_ID) +GLOBAL = THREAD.getGlobal() +if GLOBAL["__env"] then + local env = THREAD.unpackENV(GLOBAL["__env"]) + for i,v in pairs(env) do + _G[i] = v + end +end +multi, thread = require("multi").init() +multi.integration={} +multi.integration.GLOBAL = GLOBAL +multi.integration.THREAD = THREAD +pcall(require,"multi.integration.loveManager.extensions") +pcall(require,"multi.integration.sharedExtensions") +stab["returns"] = {THREAD.loadDump(__FUNC__)(multi.unpack(__IMPORTS))} +]] + +local multi, thread = require("multi"):init() + +local THREAD = {} +_G.THREAD_NAME = "MAIN_THREAD" +_G.THREAD_ID = 0 +multi.integration = {} +local THREAD = require("multi.integration.loveManager.threads") +local GLOBAL = THREAD.getGlobal() +multi.isMainThread = true + +function multi:newSystemThread(name, func, ...) + THREAD_ID = THREAD_ID + 1 + local c = {} + c.Type = multi.STHREAD + c.name = name + c.ID = THREAD_ID + c.thread = love.thread.newThread(ThreadFileData) + c.thread:start(THREAD.dump(func), c.ID, c.name, ...) + c.stab = THREAD.createStaticTable(name .. c.ID) + c.OnDeath = multi:newConnection() + c.OnError = multi:newConnection() + GLOBAL["__THREAD_" .. c.ID] = {ID = c.ID, Name = c.name, Thread = c.thread} + GLOBAL["__THREAD_COUNT"] = THREAD_ID + + function c:getName() return c.name end + thread:newThread(name .. "_System_Thread_Handler",function() + if name == "SystemThreaded Function Handler" then + local status_channel = love.thread.getChannel("STATCHAN_" .. c.ID) + thread.hold(function() + -- While the thread is running we might as well do something in the loop + if status_channel:peek() ~= nil then + c.statusconnector:Fire(multi.unpack(status_channel:pop())) + end + return not c.thread:isRunning() + end) + else + thread.hold(function() return not c.thread:isRunning() end) + end + -- If the thread is not running let's handle that. + local thread_err = c.thread:getError() + if thread_err == "Thread Killed!\1" then + c.OnDeath:Fire("Thread Killed!") + elseif thread_err then + c.OnError:Fire(c, thread_err) + elseif c.stab.returns then + c.OnDeath:Fire(multi.unpack(c.stab.returns)) + c.stab.returns = nil + end + end) + + if self.isActor then + self:create(c) + else + multi.create(multi, c) + end + + return c +end + +function THREAD:newFunction(func, holdme) + return thread:newFunctionBase(function(...) + return multi:newSystemThread("SystemThreaded Function Handler", func, ...) + end, holdme, multi.SFUNCTION)() +end + +THREAD.newSystemThread = function(...) + multi:newSystemThread(...) +end + +function love.threaderror(thread, errorstr) + multi.print("Thread error!\n" .. errorstr) +end + +multi.integration.GLOBAL = GLOBAL +multi.integration.THREAD = THREAD +require("multi.integration.loveManager.extensions") +require("multi.integration.sharedExtensions") +multi.print("Integrated Love Threading!") +return {init = function() return GLOBAL, THREAD end} diff --git a/integration/loveManagerold/threads.lua b/integration/loveManagerold/threads.lua new file mode 100644 index 0000000..6a2f25a --- /dev/null +++ b/integration/loveManagerold/threads.lua @@ -0,0 +1,258 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +require("love.timer") +require("love.system") +require("love.data") +require("love.thread") +local serpent = require("multi.integration.loveManager.serpent") +local multi, thread = require("multi"):init() +local threads = {} + +function threads.loadDump(d) + return loadstring(d:getString()) +end + +function threads.dump(func) + return love.data.newByteData(string.dump(func)) +end + +function threads.packTable(table) + return love.data.newByteData(serpent.dump(table)) +end + +function threads.unpackTable(data) + return serpent.load(data:getString()) +end + +local fRef = {"func",nil} +local function manage(channel, value) + channel:clear() + print("pushing",value) + if type(value) == "table" then + channel:push{"DATA",threads.packTable(value)} + else + channel:push(value) + end +end + +local GNAME = "__GLOBAL_" +local proxy = {} +function threads.set(name,val) + if not proxy[name] then proxy[name] = love.thread.getChannel(GNAME..name) end + proxy[name]:performAtomic(manage, val) +end + +function threads.get(name) + if not proxy[name] then proxy[name] = love.thread.getChannel(GNAME..name) end + local dat = proxy[name]:peek() + if type(dat)=="table" and dat[1]=="DATA" then + return threads.unpackTable(dat[2]) + else + return dat + end +end + +function threads.waitFor(name) + if thread.isThread() then + return thread.hold(function() + return threads.get(name) + end) + end + while threads.get(name)==nil do + love.timer.sleep(.001) + end + local dat = threads.get(name) + if type(dat) == "table" and dat.init then + dat.init = threads.loadDump(dat.init) + end + return dat +end + +function threads.package(name,val) + local init = val.init + val.init=threads.dump(val.init) + GLOBAL[name]=val + val.init=init +end + +function threads.getCores() + return love.system.getProcessorCount() +end + +function threads.kill() + error("Thread Killed!\1") +end + +function threads.pushStatus(...) + local status_channel = love.thread.getChannel("STATCHAN_" ..__THREADID__) + local args = multi.pack(...) + status_channel:push(args) +end + +function threads.getThreads() + local t = {} + for i=1,GLOBAL["__THREAD_COUNT"] do + t[#t+1]=GLOBAL["__THREAD_"..i] + end + return t +end + +function threads.getThread(n) + return GLOBAL["__THREAD_"..n] +end + +function threads.sleep(n) + love.timer.sleep(n) +end + +function threads.getGlobal() + return setmetatable({}, + { + __index = function(t, k) + return THREAD.get(k) + end, + __newindex = function(t, k, v) + THREAD.set(k,v) + end + } + ) +end + +function threads.packENV(env) + return threads.packTable(env) +end + +function threads.unpackENV(env) + return threads.unpackTable(env) +end + + +function threads.setENV(env, name) + name = name or "__env" + (threads.getGlobal())[name] = threads.packTable(env) +end + +function threads.getENV(name) + name = name or "__env" + return threads.unpackTable((threads.getGlobal())[name]) +end + +function threads.exposeENV(name) + name = name or "__env" + local env = threads.getENV(name) + for i,v in pairs(env) do + _G[i] = v + end +end + +function threads.createTable(n) + local _proxy = {} + local function set(name,val) + if not _proxy[name] then _proxy[name] = love.thread.getChannel(n..name) end + _proxy[name]:performAtomic(manage, val) + end + local function get(name) + if not _proxy[name] then _proxy[name] = love.thread.getChannel(n..name) end + local dat = _proxy[name]:peek() + if type(dat)=="table" and dat[1]=="DATA" then + return threads.unpackTable(dat[2]) + else + return dat + end + end + return setmetatable({}, + { + __index = function(t, k) + return get(k) + end, + __newindex = function(t, k, v) + set(k,v) + end + } + ) +end + +function threads.getConsole() + local c = {} + c.queue = love.thread.getChannel("__CONSOLE__") + function c.print(...) + c.queue:push(multi.pack(...)) + end + function c.error(err) + c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} + error(err) + end + return c +end + +if not ISTHREAD then + local queue = love.thread.getChannel("__CONSOLE__") + multi:newLoop(function(loop) + dat = queue:pop() + if dat then + print(multi.unpack(dat)) + end + end) +end + +function threads.createStaticTable(n) + local __proxy = {} + local function set(name,val) + if __proxy[name] then return end + local chan = love.thread.getChannel(n..name) + if chan:getCount()>0 then return end + chan:performAtomic(manage, val) + __proxy[name] = val + end + local function get(name) + if __proxy[name] then return __proxy[name] end + local dat = love.thread.getChannel(n..name):peek() + if type(dat)=="table" and dat[1]=="func" then + __proxy[name] = threads.loadDump(dat[2]) + return __proxy[name] + else + __proxy[name] = dat + return __proxy[name] + end + end + return setmetatable({}, + { + __index = function(t, k) + return get(k) + end, + __newindex = function(t, k, v) + set(k,v) + end + } + ) +end + +function threads.hold(n) + local dat + while not(dat) do + dat = n() + end +end + +return threads \ No newline at end of file diff --git a/integration/luvitManager.lua b/integration/luvitManager.lua index c1e8dcb..46a0dc2 100644 --- a/integration/luvitManager.lua +++ b/integration/luvitManager.lua @@ -107,7 +107,7 @@ local function _INIT(luvitThread, timer) local c = {} local __self = c c.name = name - c.Type = "sthread" + c.Type = multi.STHREAD c.thread = {} c.func = string.dump(func) function c:kill() diff --git a/tests/threadtests.lua b/tests/threadtests.lua index 615f85d..f3d8fed 100644 --- a/tests/threadtests.lua +++ b/tests/threadtests.lua @@ -177,13 +177,21 @@ multi:newThread("Scheduler Thread",function() -- multi.error("SystemThreadedConnections: Failed") -- end -- multi.success("SystemThreadedConnections: Ok") - - local stp = multi:newSystemThreadedProcessor(1) - print(stp) - local tloop = stp:newTLoop(nil, 1) - print(2) local proxy_test = false - print(3) + multi:newThread(function() + t, val = thread.hold(function() + return proxy_test + end,{sleep=5}) + if val == multi.TIMEOUT then + multi.error("SystemThreadedProcessor/Proxies: Failed") + end + thread.sleep(1) + os.exit(1) + end) + local stp = multi:newSystemThreadedProcessor(1) + + local tloop = stp:newTLoop(nil, 1) + multi:newSystemThread("Testing proxy copy THREAD",function(tloop) local multi, thread = require("multi"):init() tloop = tloop:init() @@ -200,10 +208,10 @@ multi:newThread("Scheduler Thread",function() end) multi:mainloop() end, tloop:getTransferable()).OnError(multi.error) - print(4) + multi.print("tloop", tloop.Type) multi.print("tloop.OnLoop", tloop.OnLoop.Type) - print(5) + thread:newThread(function() multi.print("Testing holding on a proxy connection!") thread.hold(tloop.OnLoop) @@ -239,7 +247,7 @@ multi:newThread("Scheduler Thread",function() multi.success("SystemThreadedProcessor: OK") we_good = true - multi:Stop() + multi:Stop() -- Needed in love2d tests to stop the main runner os.exit() end).OnError(multi.error)