Still implementing new love2d threading code

This commit is contained in:
Ryan Ward 2023-07-10 15:54:36 -04:00
parent f4cdde6040
commit e43fe34a7c
12 changed files with 1126 additions and 741 deletions

View File

@ -2405,7 +2405,12 @@ end
function multi.error(self, err)
if type(err) == "bool" then crash = err end
if type(self) == "string" then err = self end
io.write("\x1b[91mERROR:\x1b[0m " .. err .. " " .. debug.getinfo(2).name .."\n")
local name = debug.getinfo(2).name
if name then
io.write("\x1b[91mERROR:\x1b[0m " .. err .. " " .. name .."\n")
else
io.write("\x1b[91mERROR:\x1b[0m " .. err .. " ?\n")
end
error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type .. "\n" .. debug.traceback().."\n")
if multi.defaultSettings.error then
os.exit(1)

View File

@ -76,7 +76,7 @@ function multi:newSystemThread(name, func, ...)
c.loadString = {"base","package","os","io","math","table","string","coroutine"}
livingThreads[count] = {true, name}
c.returns = return_linda
c.Type = "sthread"
c.Type = multi.STHREAD
c.creationTime = os.clock()
c.alive = true
c.priority = THREAD.Priority_Normal

View File

@ -66,11 +66,11 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console)
local c = {}
c.queue = __Console
function c.print(...)
c.queue:send("Q", multi.pack(...))
c.queue:push("Q", table.concat(multi.pack(...), "\t"))
end
function c.error(err)
c.queue:push("Q",{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__})
error(err)
c.queue:push("Q", "Error in <"..THREAD_NAME..":" .. THREAD_ID .. ">: ".. err)
multi.error(err)
end
return c
end

View File

@ -1,384 +0,0 @@
--[[
MIT License
Copyright (c) 2022 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
if not ISTHREAD then
multi, thread = require("multi").init()
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
else
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
end
function multi:newSystemThreadedQueue(name)
local name = name or multi.randomString(16)
local c = {}
c.Name = name
c.Type = multi.SQUEUE
local fRef = {"func",nil}
function c:init()
local q = {}
q.chan = love.thread.getChannel(self.Name)
function q:push(dat)
if type(dat) == "function" then
fRef[2] = THREAD.dump(dat)
self.chan:push(fRef)
return
else
self.chan:push(dat)
end
end
function q:pop()
local dat = self.chan:pop()
if type(dat)=="table" and dat[1]=="func" then
return THREAD.loadDump(dat[2])
else
return dat
end
end
function q:peek()
local dat = self.chan:peek()
if type(dat)=="table" and dat[1]=="func" then
return THREAD.loadDump(dat[2])
else
return dat
end
end
return q
end
THREAD.package(name,c)
self:create(c)
return c
end
function multi:newSystemThreadedTable(name)
local name = name or multi.randomString(16)
local c = {}
c.Name = name
c.Type = multi.STABLE
function c:init()
return THREAD.createTable(self.Name)
end
THREAD.package(name,c)
self:create(c)
return c
end
local jqc = 1
function multi:newSystemThreadedJobQueue(n)
local c = {}
c.cores = n or THREAD.getCores()
c.registerQueue = {}
c.Type = multi.SJOBQUEUE
c.funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table")
c.queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue")
c.queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn")
c.queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
c.id = 0
c.OnJobCompleted = multi:newConnection()
local allfunc = 0
function c:doToAll(func)
local f = THREAD.dump(func)
for i = 1, self.cores do
self.queueAll:push({allfunc,f})
end
allfunc = allfunc + 1
end
function c:registerFunction(name,func)
if self.funcs[name] then
error("A function by the name "..name.." has already been registered!")
end
self.funcs[name] = func
end
function c:pushJob(name,...)
self.id = self.id + 1
self.queue:push{name,self.id,...}
return self.id
end
function c:isEmpty()
return queueJob:peek()==nil
end
local nFunc = 0
function c:newFunction(name,func,holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_Function_"..nFunc
end
nFunc = nFunc + 1
c:registerFunction(name,func)
return thread:newFunction(function(...)
local id = c:pushJob(name,...)
local link
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = multi.pack(...)
end
end)
return thread.hold(function()
if rets then
return multi.unpack(rets) or multi.NIL
end
end)
end,holup),name
end
thread:newThread("jobManager",function()
while true do
thread.yield()
local dat = c.queueReturn:pop()
if dat then
c.OnJobCompleted:Fire(multi.unpack(dat))
end
end
end)
for i=1,c.cores do
multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc)
local multi, thread = require("multi"):init()
require("love.timer")
local function atomic(channel)
return channel:pop()
end
local clock = os.clock
local funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table")
local queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue")
local queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn")
local lastProc = clock()
local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
local registry = {}
_G["__QR"] = queueReturn
setmetatable(_G,{__index = funcs})
thread:newThread("startUp",function()
while true do
thread.yield()
local all = queueAll:peek()
if all and not registry[all[1]] then
lastProc = os.clock()
THREAD.loadDump(queueAll:pop()[2])()
end
end
end)
thread:newThread("runner",function()
thread.sleep(.1)
while true do
thread.yield()
local all = queueAll:peek()
if all and not registry[all[1]] then
lastProc = os.clock()
THREAD.loadDump(queueAll:pop()[2])()
end
local dat = queue:performAtomic(atomic)
if dat then
multi:newThread("Test",function()
lastProc = os.clock()
local name = table.remove(dat,1)
local id = table.remove(dat,1)
local tab = {funcs[name](multi.unpack(dat))}
table.insert(tab,1,id)
queueReturn:push(tab)
end)
end
end
end):OnError(function(...)
error(...)
end)
thread:newThread("Idler",function()
while true do
thread.yield()
if clock()-lastProc> 2 then
THREAD.sleep(.05)
else
THREAD.sleep(.001)
end
end
end)
multi:mainloop()
end,jqc)
end
jqc = jqc + 1
self:create(c)
return c
end
function multi:newSystemThreadedConnection(name)
local name = name or multi.randomString(16)
local c = {}
c.Type = multi.SCONNECTION
c.CONN = 0x00
c.TRIG = 0x01
c.PING = 0x02
c.PONG = 0x03
local subscribe = love.thread.getChannel("SUB_STC_" .. name)
function c:init()
self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name)
function self:Fire(...)
local args = multi.pack(...)
if self.CID == THREAD_ID then -- Host Call
for _, link in pairs(self.links) do
love.thread.getChannel(link):push{self.TRIG, args}
end
self.proxy_conn:Fire(...)
else
self.subscribe:push{self.TRIG, args}
end
end
local multi, thread = require("multi"):init()
self.links = {}
self.proxy_conn = multi:newConnection()
local mt = getmetatable(self.proxy_conn)
setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add})
if self.CID == THREAD_ID then return self end
thread:newThread("STC_CONN_MAN" .. self.Name,function()
local item
local string_self_ref = "LSF_" .. multi.randomString(16)
local link_self_ref = love.thread.getChannel(string_self_ref)
self.subscribe:push{self.CONN, string_self_ref}
while true do
item = thread.hold(function()
return link_self_ref:peek()
end)
if item[1] == self.PING then
link_self_ref:push{self.PONG}
link_self_ref:pop()
elseif item[1] == self.CONN then
if string_self_ref ~= item[2] then
table.insert(self.links, love.thread.getChannel(item[2]))
end
link_self_ref:pop()
elseif item[1] == self.TRIG then
self.proxy_conn:Fire(multi.unpack(item[2]))
link_self_ref:pop()
else
-- This shouldn't be the case
end
end
end).OnError(multi.error)
return self
end
local function remove(a, b)
local ai = {}
local r = {}
for k,v in pairs(a) do ai[v]=true end
for k,v in pairs(b) do
if ai[v]==nil then table.insert(r,a[k]) end
end
return r
end
c.CID = THREAD_ID
c.Name = name
c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out.
-- Locals will only live in the thread that creates the original object
local ping
local pong = function(link, links)
local res = thread.hold(function()
return love.thread.getChannel(link):peek()[1] == c.PONG
end,{sleep=3})
if not res then
for i=1,#links do
if links[i] == link then
table.remove(links,i,link)
break
end
end
else
love.thread.getChannel(link):pop()
end
end
ping = thread:newFunction(function(self)
ping:Pause()
multi.ForEach(self.links, function(link) -- Sync new connections
love.thread.getChannel(link):push{self.PING}
multi:newThread("pong Thread", pong, link, self.links)
end)
thread.sleep(3)
ping:Resume()
end, false)
local function fire(...)
for _, link in pairs(c.links) do
love.thread.getChannel(link):push {c.TRIG, multi.pack(...)}
end
end
thread:newThread("STC_SUB_MAN"..name,function()
local item
while true do
thread.yield()
-- We need to check on broken connections
ping(c) -- Should return instantlly and process this in another thread
item = thread.hold(function() -- This will keep things held up until there is something to process
return c.subscribe:pop()
end)
if item[1] == c.CONN then
multi.ForEach(c.links, function(link) -- Sync new connections
love.thread.getChannel(item[2]):push{c.CONN, link}
end)
c.links[#c.links+1] = item[2]
elseif item[1] == c.TRIG then
fire(multi.unpack(item[2]))
c.proxy_conn:Fire(multi.unpack(item[2]))
end
end
end).OnError(multi.error)
--- ^^^ This will only exist in the init thread
THREAD.package(name,c)
self:create(c)
return c
end
require("multi.integration.sharedExtensions")

View File

@ -1,135 +1,8 @@
--[[
MIT License
local GLOBAL, THREAD = require("multi.integration.loveManager.threads"):init()
Copyright (c) 2022 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
if ISTHREAD then
error("You cannot require the loveManager from within a thread!")
return {
init = function(global_channel, console_channel, status_channel)
return GLOBAL, THREAD
end
local ThreadFileData = [[
ISTHREAD = true
THREAD = require("multi.integration.loveManager.threads")
sThread = THREAD
__IMPORTS = {...}
__FUNC__=table.remove(__IMPORTS,1)
THREAD_ID=table.remove(__IMPORTS,1)
THREAD_NAME=table.remove(__IMPORTS,1)
math.randomseed(THREAD_ID)
math.random()
math.random()
math.random()
stab = THREAD.createStaticTable(THREAD_NAME .. THREAD_ID)
GLOBAL = THREAD.getGlobal()
if GLOBAL["__env"] then
local env = THREAD.unpackENV(GLOBAL["__env"])
for i,v in pairs(env) do
_G[i] = v
end
end
multi, thread = require("multi").init()
multi.integration={}
multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD
pcall(require,"multi.integration.loveManager.extensions")
pcall(require,"multi.integration.sharedExtensions")
stab["returns"] = {THREAD.loadDump(__FUNC__)(multi.unpack(__IMPORTS))}
]]
local multi, thread = require("multi"):init()
local THREAD = {}
_G.THREAD_NAME = "MAIN_THREAD"
_G.THREAD_ID = 0
multi.integration = {}
local THREAD = require("multi.integration.loveManager.threads")
local GLOBAL = THREAD.getGlobal()
multi.isMainThread = true
function multi:newSystemThread(name, func, ...)
THREAD_ID = THREAD_ID + 1
local c = {}
c.name = name
c.ID = THREAD_ID
c.thread = love.thread.newThread(ThreadFileData)
c.thread:start(THREAD.dump(func), c.ID, c.name, ...)
c.stab = THREAD.createStaticTable(name .. c.ID)
c.OnDeath = multi:newConnection()
c.OnError = multi:newConnection()
GLOBAL["__THREAD_" .. c.ID] = {ID = c.ID, Name = c.name, Thread = c.thread}
GLOBAL["__THREAD_COUNT"] = THREAD_ID
function c:getName() return c.name end
thread:newThread(name .. "_System_Thread_Handler",function()
if name == "SystemThreaded Function Handler" then
local status_channel = love.thread.getChannel("STATCHAN_" .. c.ID)
thread.hold(function()
-- While the thread is running we might as well do something in the loop
if status_channel:peek() ~= nil then
c.statusconnector:Fire(multi.unpack(status_channel:pop()))
end
return not c.thread:isRunning()
end)
else
thread.hold(function() return not c.thread:isRunning() end)
end
-- If the thread is not running let's handle that.
local thread_err = c.thread:getError()
if thread_err == "Thread Killed!\1" then
c.OnDeath:Fire("Thread Killed!")
elseif thread_err then
c.OnError:Fire(c, thread_err)
elseif c.stab.returns then
c.OnDeath:Fire(multi.unpack(c.stab.returns))
c.stab.returns = nil
end
end)
if self.isActor then
self:create(c)
else
multi.create(multi, c)
end
return c
end
function THREAD:newFunction(func, holdme)
return thread:newFunctionBase(function(...)
return multi:newSystemThread("SystemThreaded Function Handler", func, ...)
end, holdme, multi.SFUNCTION)()
end
THREAD.newSystemThread = function(...)
multi:newSystemThread(...)
end
function love.threaderror(thread, errorstr)
multi.print("Thread error!\n" .. errorstr)
end
multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD
require("multi.integration.loveManager.extensions")
require("multi.integration.sharedExtensions")
multi.print("Integrated Love Threading!")
return {init = function() return GLOBAL, THREAD end}
}

View File

@ -0,0 +1,176 @@
--[[
Serpent source is released under the MIT License
Copyright (c) 2012-2018 Paul Kulchenko (paul@kulchenko.com)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
]]
local n, v = "serpent", "0.303" -- (C) 2012-18 Paul Kulchenko; MIT License
local c, d = "Paul Kulchenko", "Lua serializer and pretty printer"
local snum = {[tostring(1/0)]='1/0 --[[math.huge]]',[tostring(-1/0)]='-1/0 --[[-math.huge]]',[tostring(0/0)]='0/0'}
local badtype = {thread = true, userdata = true, cdata = true}
local getmetatable = debug and debug.getmetatable or getmetatable
local pairs = function(t) return next, t end -- avoid using __pairs in Lua 5.2+
local keyword, globals, G = {}, {}, (_G or _ENV)
for _,k in ipairs({'and', 'break', 'do', 'else', 'elseif', 'end', 'false',
'for', 'function', 'goto', 'if', 'in', 'local', 'nil', 'not', 'or', 'repeat',
'return', 'then', 'true', 'until', 'while'}) do keyword[k] = true end
for k,v in pairs(G) do globals[v] = k end -- build func to name mapping
for _,g in ipairs({'coroutine', 'debug', 'io', 'math', 'string', 'table', 'os'}) do
for k,v in pairs(type(G[g]) == 'table' and G[g] or {}) do globals[v] = g..'.'..k end end
local function s(t, opts)
local name, indent, fatal, maxnum = opts.name, opts.indent, opts.fatal, opts.maxnum
local sparse, custom, huge = opts.sparse, opts.custom, not opts.nohuge
local space, maxl = (opts.compact and '' or ' '), (opts.maxlevel or math.huge)
local maxlen, metatostring = tonumber(opts.maxlength), opts.metatostring
local iname, comm = '_'..(name or ''), opts.comment and (tonumber(opts.comment) or math.huge)
local numformat = opts.numformat or "%.17g"
local seen, sref, syms, symn = {}, {'local '..iname..'={}'}, {}, 0
local function gensym(val) return '_'..(tostring(tostring(val)):gsub("[^%w]",""):gsub("(%d%w+)",
-- tostring(val) is needed because __tostring may return a non-string value
function(s) if not syms[s] then symn = symn+1; syms[s] = symn end return tostring(syms[s]) end)) end
local function safestr(s) return type(s) == "number" and (huge and snum[tostring(s)] or numformat:format(s))
or type(s) ~= "string" and tostring(s) -- escape NEWLINE/010 and EOF/026
or ("%q"):format(s):gsub("\010","n"):gsub("\026","\\026") end
-- handle radix changes in some locales
if opts.fixradix and (".1f"):format(1.2) ~= "1.2" then
local origsafestr = safestr
safestr = function(s) return type(s) == "number"
and (nohuge and snum[tostring(s)] or numformat:format(s):gsub(",",".")) or origsafestr(s)
end
end
local function comment(s,l) return comm and (l or 0) < comm and ' --[['..select(2, pcall(tostring, s))..']]' or '' end
local function globerr(s,l) return globals[s] and globals[s]..comment(s,l) or not fatal
and safestr(select(2, pcall(tostring, s))) or error("Can't serialize "..tostring(s)) end
local function safename(path, name) -- generates foo.bar, foo[3], or foo['b a r']
local n = name == nil and '' or name
local plain = type(n) == "string" and n:match("^[%l%u_][%w_]*$") and not keyword[n]
local safe = plain and n or '['..safestr(n)..']'
return (path or '')..(plain and path and '.' or '')..safe, safe end
local alphanumsort = type(opts.sortkeys) == 'function' and opts.sortkeys or function(k, o, n) -- k=keys, o=originaltable, n=padding
local maxn, to = tonumber(n) or 12, {number = 'a', string = 'b'}
local function padnum(d) return ("%0"..tostring(maxn).."d"):format(tonumber(d)) end
table.sort(k, function(a,b)
-- sort numeric keys first: k[key] is not nil for numerical keys
return (k[a] ~= nil and 0 or to[type(a)] or 'z')..(tostring(a):gsub("%d+",padnum))
< (k[b] ~= nil and 0 or to[type(b)] or 'z')..(tostring(b):gsub("%d+",padnum)) end) end
local function val2str(t, name, indent, insref, path, plainindex, level)
local ttype, level, mt = type(t), (level or 0), getmetatable(t)
local spath, sname = safename(path, name)
local tag = plainindex and
((type(name) == "number") and '' or name..space..'='..space) or
(name ~= nil and sname..space..'='..space or '')
if seen[t] then -- already seen this element
sref[#sref+1] = spath..space..'='..space..seen[t]
return tag..'nil'..comment('ref', level)
end
-- protect from those cases where __tostring may fail
if type(mt) == 'table' and metatostring ~= false then
local to, tr = pcall(function() return mt.__tostring(t) end)
local so, sr = pcall(function() return mt.__serialize(t) end)
if (to or so) then -- knows how to serialize itself
seen[t] = insref or spath
t = so and sr or tr
ttype = type(t)
end -- new value falls through to be serialized
end
if ttype == "table" then
if level >= maxl then return tag..'{}'..comment('maxlvl', level) end
seen[t] = insref or spath
if next(t) == nil then return tag..'{}'..comment(t, level) end -- table empty
if maxlen and maxlen < 0 then return tag..'{}'..comment('maxlen', level) end
local maxn, o, out = math.min(#t, maxnum or #t), {}, {}
for key = 1, maxn do o[key] = key end
if not maxnum or #o < maxnum then
local n = #o -- n = n + 1; o[n] is much faster than o[#o+1] on large tables
for key in pairs(t) do
if o[key] ~= key then n = n + 1; o[n] = key end
end
end
if maxnum and #o > maxnum then o[maxnum+1] = nil end
if opts.sortkeys and #o > maxn then alphanumsort(o, t, opts.sortkeys) end
local sparse = sparse and #o > maxn -- disable sparsness if only numeric keys (shorter output)
for n, key in ipairs(o) do
local value, ktype, plainindex = t[key], type(key), n <= maxn and not sparse
if opts.valignore and opts.valignore[value] -- skip ignored values; do nothing
or opts.keyallow and not opts.keyallow[key]
or opts.keyignore and opts.keyignore[key]
or opts.valtypeignore and opts.valtypeignore[type(value)] -- skipping ignored value types
or sparse and value == nil then -- skipping nils; do nothing
elseif ktype == 'table' or ktype == 'function' or badtype[ktype] then
if not seen[key] and not globals[key] then
sref[#sref+1] = 'placeholder'
local sname = safename(iname, gensym(key)) -- iname is table for local variables
sref[#sref] = val2str(key,sname,indent,sname,iname,true)
end
sref[#sref+1] = 'placeholder'
local path = seen[t]..'['..tostring(seen[key] or globals[key] or gensym(key))..']'
sref[#sref] = path..space..'='..space..tostring(seen[value] or val2str(value,nil,indent,path))
else
out[#out+1] = val2str(value,key,indent,nil,seen[t],plainindex,level+1)
if maxlen then
maxlen = maxlen - #out[#out]
if maxlen < 0 then break end
end
end
end
local prefix = string.rep(indent or '', level)
local head = indent and '{\n'..prefix..indent or '{'
local body = table.concat(out, ','..(indent and '\n'..prefix..indent or space))
local tail = indent and "\n"..prefix..'}' or '}'
return (custom and custom(tag,head,body,tail,level) or tag..head..body..tail)..comment(t, level)
elseif badtype[ttype] then
seen[t] = insref or spath
return tag..globerr(t, level)
elseif ttype == 'function' then
seen[t] = insref or spath
if opts.nocode then return tag.."function() --[[..skipped..]] end"..comment(t, level) end
local ok, res = pcall(string.dump, t)
local func = ok and "((loadstring or load)("..safestr(res)..",'@serialized'))"..comment(t, level)
return tag..(func or globerr(t, level))
else return tag..safestr(t) end -- handle all other types
end
local sepr = indent and "\n" or ";"..space
local body = val2str(t, name, indent) -- this call also populates sref
local tail = #sref>1 and table.concat(sref, sepr)..sepr or ''
local warn = opts.comment and #sref>1 and space.."--[[incomplete output with shared/self-references skipped]]" or ''
return not name and body..warn or "do local "..body..sepr..tail.."return "..name..sepr.."end"
end
local function deserialize(data, opts)
local env = (opts and opts.safe == false) and G
or setmetatable({}, {
__index = function(t,k) return t end,
__call = function(t,...) error("cannot call functions") end
})
local f, res = (loadstring or load)('return '..data, nil, nil, env)
if not f then f, res = (loadstring or load)(data, nil, nil, env) end
if not f then return f, res end
if setfenv then setfenv(f, env) end
return pcall(f)
end
local function merge(a, b) if b then for k,v in pairs(b) do a[k] = v end end; return a; end
return { _NAME = n, _COPYRIGHT = c, _DESCRIPTION = d, _VERSION = v, serialize = s,
load = deserialize,
dump = function(a, opts) return s(a, merge({name = '_', compact = true, sparse = true}, opts)) end,
line = function(a, opts) return s(a, merge({sortkeys = true, comment = true}, opts)) end,
block = function(a, opts) return s(a, merge({indent = ' ', sortkeys = true, comment = true}, opts)) end }

View File

@ -25,246 +25,170 @@ require("love.timer")
require("love.system")
require("love.data")
require("love.thread")
local serpent = require("multi.integration.loveManager.serpent")
local multi, thread = require("multi"):init()
local threads = {}
function threads.loadDump(d)
return loadstring(d:getString())
local function ltype(data)
local tp = type(data)
if tp == "userdata" then
return data:type()
end
return tp
end
function threads.dump(func)
return love.data.newByteData(string.dump(func))
end
local NIL = love.data.newByteData("\3")
local fRef = {"func",nil}
local function manage(channel, value)
channel:clear()
if type(value) == "function" then
fRef[2] = THREAD.dump(value)
channel:push(fRef)
return
-- If a non table/function is supplied we just return it
local function packValue(t)
local tp = type(t)
if tp == "table" then
return love.data.newByteData("\1"..serpent.dump(t,{safe = true}))
elseif tp == "function" then
return love.data.newByteData("\2"..serpent.dump({t,true},{safe = true}))
else
channel:push(value)
end
end
local GNAME = "__GLOBAL_"
local proxy = {}
function threads.set(name,val)
if not proxy[name] then proxy[name] = love.thread.getChannel(GNAME..name) end
proxy[name]:performAtomic(manage, val)
end
function threads.get(name)
if not proxy[name] then proxy[name] = love.thread.getChannel(GNAME..name) end
local dat = proxy[name]:peek()
if type(dat)=="table" and dat[1]=="func" then
return THREAD.loadDump(dat[2])
else
return dat
end
end
function threads.waitFor(name)
if thread.isThread() then
return thread.hold(function()
return threads.get(name)
end)
end
while threads.get(name)==nil do
love.timer.sleep(.001)
end
local dat = threads.get(name)
if type(dat) == "table" and dat.init then
dat.init = threads.loadDump(dat.init)
end
return dat
end
function threads.package(name,val)
local init = val.init
val.init=threads.dump(val.init)
GLOBAL[name]=val
val.init=init
end
function threads.getCores()
return love.system.getProcessorCount()
end
function threads.kill()
error("Thread Killed!\1")
end
function threads.pushStatus(...)
local status_channel = love.thread.getChannel("STATCHAN_" ..__THREADID__)
local args = multi.pack(...)
status_channel:push(args)
end
function threads.getThreads()
local t = {}
for i=1,GLOBAL["__THREAD_COUNT"] do
t[#t+1]=GLOBAL["__THREAD_"..i]
end
return t
end
function threads.getThread(n)
return GLOBAL["__THREAD_"..n]
end
function threads.sleep(n)
love.timer.sleep(n)
-- If a non table/function is supplied we just return it
local function unpackValue(d)
if ltype(d) == "ByteData" then
local data = d:getString()
if data:sub(1, 1) == "\1" then
local status, data = serpent.load(data:sub(2,-1),{safe = false})
if not status then
multi.error(data)
end
return data
elseif data:sub(1, 1) =="\2" then
local status, data = serpent.load(data:sub(2,-1),{safe = false})
if not status then
multi.error(data)
end
return serpent.load(data:sub(2,-1))[1]
else
return d
end
else
return d
end
end
function threads.getGlobal()
local function createTable(n)
if not n then
n = "STAB"..multi.randomString(8)
end
local __proxy = {}
local function set(name, val)
local chan = love.thread.getChannel(n .. name)
if chan:getCount() == 1 then chan:pop() end
__proxy[name] = true
chan:push(packValue(val))
end
local function get(name)
local dat = love.thread.getChannel(n .. name):peek()
return unpackValue(dat)
end
return setmetatable({},
{
__index = function(t, k)
return THREAD.get(k)
return get(k)
end,
__newindex = function(t, k, v)
THREAD.set(k,v)
set(k,v)
end
}
)
end
function threads.packENV(env)
local e = {}
for i,v in pairs(env) do
if type(v) == "function" then
e["$f"..i] = string.dump(v)
elseif type(v) == "table" then
e["$t"..i] = threads.packENV(v)
else
e[i] = v
end
end
return e
function INIT(global_channel, console_channel, status_channel)
local GLOBAL, THREAD = createTable("GLOBAL"), {}
-- Non portable methods, shouldn't be used unless you know what you are doing
THREAD.packValue = packValue
THREAD.unpackValue = unpackValue
THREAD.createTable = createTable
function THREAD.set(name, val)
GLOBAL[name] = val
end
function threads.unpackENV(env)
local e = {}
for i,v in pairs(env) do
if type(i) == "string" and i:sub(1,2) == "$f" then
e[i:sub(3,-1)] = loadstring(v)
elseif type(i) == "string" and i:sub(1,2) == "$t" then
e[i:sub(3,-1)] = threads.unpackENV(v)
else
e[i] = v
end
end
return e
function THREAD.get(name, val)
return GLOBAL[name]
end
function THREAD.waitFor(name)
local function wait()
math.randomseed(os.time())
love.timer.sleep(.001)
end
repeat
wait()
until GLOBAL[name]
return GLOBAL[name]
end
function threads.setENV(env, name)
function THREAD.getCores()
return love.system.getProcessorCount()
end
function THREAD.getConsole()
local c = {}
c.queue = console_channel
function c.print(...)
c.queue:push(table.concat(multi.pack(...), "\t"))
end
function c.error(err)
c.queue:push("Error in <"..THREAD_NAME..":" .. THREAD_ID .. ">: ".. err)
multi.error(err)
end
return c
end
function THREAD.getThreads()
--
end
function THREAD.kill() -- trigger the lane destruction
error("Thread was killed!\1")
end
function THREAD.pushStatus(...)
status_channel:push({THREAD_ID, multi.pack(...)})
end
_G.THREAD_ID = 0
function THREAD.sleep(n)
love.timer.sleep(n)
end
function THREAD.hold(n)
--
end
function THREAD.setENV(env, name)
GLOBAL[name or "__env"] = env
end
function THREAD.getENV(name)
return GLOBAL[name or "__env"]
end
function THREAD.exposeENV(name)
name = name or "__env"
(threads.getGlobal())[name] = threads.packENV(env)
end
function threads.getENV(name)
name = name or "__env"
return threads.unpackENV((threads.getGlobal())[name])
end
function threads.exposeENV(name)
name = name or "__env"
local env = threads.getENV(name)
local env = THREAD.getENV(name)
for i,v in pairs(env) do
_G[i] = v
end
end
function threads.createTable(n)
local _proxy = {}
local function set(name,val)
if not _proxy[name] then _proxy[name] = love.thread.getChannel(n..name) end
_proxy[name]:performAtomic(manage, val)
return GLOBAL, THREAD
end
local function get(name)
if not _proxy[name] then _proxy[name] = love.thread.getChannel(n..name) end
local dat = _proxy[name]:peek()
if type(dat)=="table" and dat[1]=="func" then
return THREAD.loadDump(dat[2])
else
return dat
end
end
return setmetatable({},
{
__index = function(t, k)
return get(k)
end,
__newindex = function(t, k, v)
set(k,v)
return {
-- These are the acutal channels
init = function(global_channel, console_channel, status_channel)
return INIT(global_channel, console_channel, status_channel)
end
}
)
end
function threads.getConsole()
local c = {}
c.queue = love.thread.getChannel("__CONSOLE__")
function c.print(...)
c.queue:push(multi.pack(...))
end
function c.error(err)
c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}
error(err)
end
return c
end
if not ISTHREAD then
local queue = love.thread.getChannel("__CONSOLE__")
multi:newLoop(function(loop)
dat = queue:pop()
if dat then
print(multi.unpack(dat))
end
end)
end
function threads.createStaticTable(n)
local __proxy = {}
local function set(name,val)
if __proxy[name] then return end
local chan = love.thread.getChannel(n..name)
if chan:getCount()>0 then return end
chan:performAtomic(manage, val)
__proxy[name] = val
end
local function get(name)
if __proxy[name] then return __proxy[name] end
local dat = love.thread.getChannel(n..name):peek()
if type(dat)=="table" and dat[1]=="func" then
__proxy[name] = THREAD.loadDump(dat[2])
return __proxy[name]
else
__proxy[name] = dat
return __proxy[name]
end
end
return setmetatable({},
{
__index = function(t, k)
return get(k)
end,
__newindex = function(t, k, v)
set(k,v)
end
}
)
end
function threads.hold(n)
local dat
while not(dat) do
dat = n()
end
end
return threads

View File

@ -0,0 +1,389 @@
--[[
MIT License
Copyright (c) 2022 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
if not ISTHREAD then
multi, thread = require("multi").init()
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
else
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
end
function multi:newSystemThreadedQueue(name)
local name = name or multi.randomString(16)
local c = {}
c.Name = name
c.Type = multi.SQUEUE
local fRef = {"\2",nil}
function c:init()
local q = {}
q.chan = love.thread.getChannel(self.Name)
function q:push(dat)
if type(dat) == "table" then
self.chan:push{"DATA",THREAD.packTable(dat)}
else
self.chan:push(dat)
end
-- if type(dat) == "function" then
-- fRef[2] = THREAD.dump(dat)
-- self.chan:push(fRef)
-- return
-- else
-- self.chan:push(dat)
-- end
end
function q:pop()
local dat = self.chan:pop()
if type(dat)=="table" and dat[1]=="DATA" then
return THREAD.unpackTable(dat[2])--THREAD.loadDump(dat[2])
else
return dat
end
end
function q:peek()
local dat = self.chan:peek()
if type(dat)=="table" and dat[1]=="DATA" then
return THREAD.unpackTable(dat[2])--THREAD.loadDump(dat[2])
else
return dat
end
end
return q
end
THREAD.package(name,c)
self:create(c)
return c
end
function multi:newSystemThreadedTable(name)
local name = name or multi.randomString(16)
local c = {}
c.Name = name
c.Type = multi.STABLE
function c:init()
return THREAD.createTable(self.Name)
end
THREAD.package(name,c)
self:create(c)
return c
end
local jqc = 1
function multi:newSystemThreadedJobQueue(n)
local c = {}
c.cores = n or THREAD.getCores()
c.registerQueue = {}
c.Type = multi.SJOBQUEUE
c.funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table")
c.queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue")
c.queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn")
c.queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
c.id = 0
c.OnJobCompleted = multi:newConnection()
local allfunc = 0
function c:doToAll(func)
local f = THREAD.dump(func)
for i = 1, self.cores do
self.queueAll:push({allfunc,f})
end
allfunc = allfunc + 1
end
function c:registerFunction(name,func)
if self.funcs[name] then
error("A function by the name "..name.." has already been registered!")
end
self.funcs[name] = func
end
function c:pushJob(name,...)
self.id = self.id + 1
self.queue:push{name,self.id,...}
return self.id
end
function c:isEmpty()
return queueJob:peek()==nil
end
local nFunc = 0
function c:newFunction(name,func,holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_Function_"..nFunc
end
nFunc = nFunc + 1
c:registerFunction(name,func)
return thread:newFunction(function(...)
local id = c:pushJob(name,...)
local link
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = multi.pack(...)
end
end)
return thread.hold(function()
if rets then
return multi.unpack(rets) or multi.NIL
end
end)
end,holup),name
end
thread:newThread("jobManager",function()
while true do
thread.yield()
local dat = c.queueReturn:pop()
if dat then
c.OnJobCompleted:Fire(multi.unpack(dat))
end
end
end)
for i=1,c.cores do
multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc)
local multi, thread = require("multi"):init()
require("love.timer")
local function atomic(channel)
return channel:pop()
end
local clock = os.clock
local funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table")
local queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue")
local queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn")
local lastProc = clock()
local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
local registry = {}
_G["__QR"] = queueReturn
setmetatable(_G,{__index = funcs})
thread:newThread("startUp",function()
while true do
thread.yield()
local all = queueAll:peek()
if all and not registry[all[1]] then
lastProc = os.clock()
THREAD.loadDump(queueAll:pop()[2])()
end
end
end)
thread:newThread("runner",function()
thread.sleep(.1)
while true do
thread.yield()
local all = queueAll:peek()
if all and not registry[all[1]] then
lastProc = os.clock()
THREAD.loadDump(queueAll:pop()[2])()
end
local dat = queue:performAtomic(atomic)
if dat then
multi:newThread("Test",function()
lastProc = os.clock()
local name = table.remove(dat,1)
local id = table.remove(dat,1)
local tab = {funcs[name](multi.unpack(dat))}
table.insert(tab,1,id)
queueReturn:push(tab)
end)
end
end
end):OnError(function(...)
error(...)
end)
thread:newThread("Idler",function()
while true do
thread.yield()
if clock()-lastProc> 2 then
THREAD.sleep(.05)
else
THREAD.sleep(.001)
end
end
end)
multi:mainloop()
end,jqc)
end
jqc = jqc + 1
self:create(c)
return c
end
function multi:newSystemThreadedConnection(name)
local name = name or multi.randomString(16)
local c = {}
c.Type = multi.SCONNECTION
c.CONN = 0x00
c.TRIG = 0x01
c.PING = 0x02
c.PONG = 0x03
local subscribe = love.thread.getChannel("SUB_STC_" .. name)
function c:init()
self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name)
function self:Fire(...)
local args = multi.pack(...)
if self.CID == THREAD_ID then -- Host Call
for _, link in pairs(self.links) do
love.thread.getChannel(link):push{self.TRIG, args}
end
self.proxy_conn:Fire(...)
else
self.subscribe:push{self.TRIG, args}
end
end
local multi, thread = require("multi"):init()
self.links = {}
self.proxy_conn = multi:newConnection()
local mt = getmetatable(self.proxy_conn)
setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add})
if self.CID == THREAD_ID then return self end
thread:newThread("STC_CONN_MAN" .. self.Name,function()
local item
local string_self_ref = "LSF_" .. multi.randomString(16)
local link_self_ref = love.thread.getChannel(string_self_ref)
self.subscribe:push{self.CONN, string_self_ref}
while true do
item = thread.hold(function()
return link_self_ref:peek()
end)
if item[1] == self.PING then
link_self_ref:push{self.PONG}
link_self_ref:pop()
elseif item[1] == self.CONN then
if string_self_ref ~= item[2] then
table.insert(self.links, love.thread.getChannel(item[2]))
end
link_self_ref:pop()
elseif item[1] == self.TRIG then
self.proxy_conn:Fire(multi.unpack(item[2]))
link_self_ref:pop()
else
-- This shouldn't be the case
end
end
end).OnError(multi.error)
return self
end
local function remove(a, b)
local ai = {}
local r = {}
for k,v in pairs(a) do ai[v]=true end
for k,v in pairs(b) do
if ai[v]==nil then table.insert(r,a[k]) end
end
return r
end
c.CID = THREAD_ID
c.Name = name
c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out.
-- Locals will only live in the thread that creates the original object
local ping
local pong = function(link, links)
local res = thread.hold(function()
return love.thread.getChannel(link):peek()[1] == c.PONG
end,{sleep=3})
if not res then
for i=1,#links do
if links[i] == link then
table.remove(links,i,link)
break
end
end
else
love.thread.getChannel(link):pop()
end
end
ping = thread:newFunction(function(self)
ping:Pause()
multi.ForEach(self.links, function(link) -- Sync new connections
love.thread.getChannel(link):push{self.PING}
multi:newThread("pong Thread", pong, link, self.links)
end)
thread.sleep(3)
ping:Resume()
end, false)
local function fire(...)
for _, link in pairs(c.links) do
love.thread.getChannel(link):push {c.TRIG, multi.pack(...)}
end
end
thread:newThread("STC_SUB_MAN"..name,function()
local item
while true do
thread.yield()
-- We need to check on broken connections
ping(c) -- Should return instantlly and process this in another thread
item = thread.hold(function() -- This will keep things held up until there is something to process
return c.subscribe:pop()
end)
if item[1] == c.CONN then
multi.ForEach(c.links, function(link) -- Sync new connections
love.thread.getChannel(item[2]):push{c.CONN, link}
end)
c.links[#c.links+1] = item[2]
elseif item[1] == c.TRIG then
fire(multi.unpack(item[2]))
c.proxy_conn:Fire(multi.unpack(item[2]))
end
end
end).OnError(multi.error)
--- ^^^ This will only exist in the init thread
THREAD.package(name,c)
self:create(c)
return c
end
require("multi.integration.sharedExtensions")

View File

@ -0,0 +1,136 @@
--[[
MIT License
Copyright (c) 2022 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
if ISTHREAD then
error("You cannot require the loveManager from within a thread!")
end
local ThreadFileData = [[
ISTHREAD = true
THREAD = require("multi.integration.loveManager.threads")
sThread = THREAD
__IMPORTS = {...}
__FUNC__=table.remove(__IMPORTS,1)
THREAD_ID=table.remove(__IMPORTS,1)
THREAD_NAME=table.remove(__IMPORTS,1)
math.randomseed(THREAD_ID)
math.random()
math.random()
math.random()
stab = THREAD.createStaticTable(THREAD_NAME .. THREAD_ID)
GLOBAL = THREAD.getGlobal()
if GLOBAL["__env"] then
local env = THREAD.unpackENV(GLOBAL["__env"])
for i,v in pairs(env) do
_G[i] = v
end
end
multi, thread = require("multi").init()
multi.integration={}
multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD
pcall(require,"multi.integration.loveManager.extensions")
pcall(require,"multi.integration.sharedExtensions")
stab["returns"] = {THREAD.loadDump(__FUNC__)(multi.unpack(__IMPORTS))}
]]
local multi, thread = require("multi"):init()
local THREAD = {}
_G.THREAD_NAME = "MAIN_THREAD"
_G.THREAD_ID = 0
multi.integration = {}
local THREAD = require("multi.integration.loveManager.threads")
local GLOBAL = THREAD.getGlobal()
multi.isMainThread = true
function multi:newSystemThread(name, func, ...)
THREAD_ID = THREAD_ID + 1
local c = {}
c.Type = multi.STHREAD
c.name = name
c.ID = THREAD_ID
c.thread = love.thread.newThread(ThreadFileData)
c.thread:start(THREAD.dump(func), c.ID, c.name, ...)
c.stab = THREAD.createStaticTable(name .. c.ID)
c.OnDeath = multi:newConnection()
c.OnError = multi:newConnection()
GLOBAL["__THREAD_" .. c.ID] = {ID = c.ID, Name = c.name, Thread = c.thread}
GLOBAL["__THREAD_COUNT"] = THREAD_ID
function c:getName() return c.name end
thread:newThread(name .. "_System_Thread_Handler",function()
if name == "SystemThreaded Function Handler" then
local status_channel = love.thread.getChannel("STATCHAN_" .. c.ID)
thread.hold(function()
-- While the thread is running we might as well do something in the loop
if status_channel:peek() ~= nil then
c.statusconnector:Fire(multi.unpack(status_channel:pop()))
end
return not c.thread:isRunning()
end)
else
thread.hold(function() return not c.thread:isRunning() end)
end
-- If the thread is not running let's handle that.
local thread_err = c.thread:getError()
if thread_err == "Thread Killed!\1" then
c.OnDeath:Fire("Thread Killed!")
elseif thread_err then
c.OnError:Fire(c, thread_err)
elseif c.stab.returns then
c.OnDeath:Fire(multi.unpack(c.stab.returns))
c.stab.returns = nil
end
end)
if self.isActor then
self:create(c)
else
multi.create(multi, c)
end
return c
end
function THREAD:newFunction(func, holdme)
return thread:newFunctionBase(function(...)
return multi:newSystemThread("SystemThreaded Function Handler", func, ...)
end, holdme, multi.SFUNCTION)()
end
THREAD.newSystemThread = function(...)
multi:newSystemThread(...)
end
function love.threaderror(thread, errorstr)
multi.print("Thread error!\n" .. errorstr)
end
multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD
require("multi.integration.loveManager.extensions")
require("multi.integration.sharedExtensions")
multi.print("Integrated Love Threading!")
return {init = function() return GLOBAL, THREAD end}

View File

@ -0,0 +1,258 @@
--[[
MIT License
Copyright (c) 2022 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
require("love.timer")
require("love.system")
require("love.data")
require("love.thread")
local serpent = require("multi.integration.loveManager.serpent")
local multi, thread = require("multi"):init()
local threads = {}
function threads.loadDump(d)
return loadstring(d:getString())
end
function threads.dump(func)
return love.data.newByteData(string.dump(func))
end
function threads.packTable(table)
return love.data.newByteData(serpent.dump(table))
end
function threads.unpackTable(data)
return serpent.load(data:getString())
end
local fRef = {"func",nil}
local function manage(channel, value)
channel:clear()
print("pushing",value)
if type(value) == "table" then
channel:push{"DATA",threads.packTable(value)}
else
channel:push(value)
end
end
local GNAME = "__GLOBAL_"
local proxy = {}
function threads.set(name,val)
if not proxy[name] then proxy[name] = love.thread.getChannel(GNAME..name) end
proxy[name]:performAtomic(manage, val)
end
function threads.get(name)
if not proxy[name] then proxy[name] = love.thread.getChannel(GNAME..name) end
local dat = proxy[name]:peek()
if type(dat)=="table" and dat[1]=="DATA" then
return threads.unpackTable(dat[2])
else
return dat
end
end
function threads.waitFor(name)
if thread.isThread() then
return thread.hold(function()
return threads.get(name)
end)
end
while threads.get(name)==nil do
love.timer.sleep(.001)
end
local dat = threads.get(name)
if type(dat) == "table" and dat.init then
dat.init = threads.loadDump(dat.init)
end
return dat
end
function threads.package(name,val)
local init = val.init
val.init=threads.dump(val.init)
GLOBAL[name]=val
val.init=init
end
function threads.getCores()
return love.system.getProcessorCount()
end
function threads.kill()
error("Thread Killed!\1")
end
function threads.pushStatus(...)
local status_channel = love.thread.getChannel("STATCHAN_" ..__THREADID__)
local args = multi.pack(...)
status_channel:push(args)
end
function threads.getThreads()
local t = {}
for i=1,GLOBAL["__THREAD_COUNT"] do
t[#t+1]=GLOBAL["__THREAD_"..i]
end
return t
end
function threads.getThread(n)
return GLOBAL["__THREAD_"..n]
end
function threads.sleep(n)
love.timer.sleep(n)
end
function threads.getGlobal()
return setmetatable({},
{
__index = function(t, k)
return THREAD.get(k)
end,
__newindex = function(t, k, v)
THREAD.set(k,v)
end
}
)
end
function threads.packENV(env)
return threads.packTable(env)
end
function threads.unpackENV(env)
return threads.unpackTable(env)
end
function threads.setENV(env, name)
name = name or "__env"
(threads.getGlobal())[name] = threads.packTable(env)
end
function threads.getENV(name)
name = name or "__env"
return threads.unpackTable((threads.getGlobal())[name])
end
function threads.exposeENV(name)
name = name or "__env"
local env = threads.getENV(name)
for i,v in pairs(env) do
_G[i] = v
end
end
function threads.createTable(n)
local _proxy = {}
local function set(name,val)
if not _proxy[name] then _proxy[name] = love.thread.getChannel(n..name) end
_proxy[name]:performAtomic(manage, val)
end
local function get(name)
if not _proxy[name] then _proxy[name] = love.thread.getChannel(n..name) end
local dat = _proxy[name]:peek()
if type(dat)=="table" and dat[1]=="DATA" then
return threads.unpackTable(dat[2])
else
return dat
end
end
return setmetatable({},
{
__index = function(t, k)
return get(k)
end,
__newindex = function(t, k, v)
set(k,v)
end
}
)
end
function threads.getConsole()
local c = {}
c.queue = love.thread.getChannel("__CONSOLE__")
function c.print(...)
c.queue:push(multi.pack(...))
end
function c.error(err)
c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}
error(err)
end
return c
end
if not ISTHREAD then
local queue = love.thread.getChannel("__CONSOLE__")
multi:newLoop(function(loop)
dat = queue:pop()
if dat then
print(multi.unpack(dat))
end
end)
end
function threads.createStaticTable(n)
local __proxy = {}
local function set(name,val)
if __proxy[name] then return end
local chan = love.thread.getChannel(n..name)
if chan:getCount()>0 then return end
chan:performAtomic(manage, val)
__proxy[name] = val
end
local function get(name)
if __proxy[name] then return __proxy[name] end
local dat = love.thread.getChannel(n..name):peek()
if type(dat)=="table" and dat[1]=="func" then
__proxy[name] = threads.loadDump(dat[2])
return __proxy[name]
else
__proxy[name] = dat
return __proxy[name]
end
end
return setmetatable({},
{
__index = function(t, k)
return get(k)
end,
__newindex = function(t, k, v)
set(k,v)
end
}
)
end
function threads.hold(n)
local dat
while not(dat) do
dat = n()
end
end
return threads

View File

@ -107,7 +107,7 @@ local function _INIT(luvitThread, timer)
local c = {}
local __self = c
c.name = name
c.Type = "sthread"
c.Type = multi.STHREAD
c.thread = {}
c.func = string.dump(func)
function c:kill()

View File

@ -177,13 +177,21 @@ multi:newThread("Scheduler Thread",function()
-- multi.error("SystemThreadedConnections: Failed")
-- end
-- multi.success("SystemThreadedConnections: Ok")
local stp = multi:newSystemThreadedProcessor(1)
print(stp)
local tloop = stp:newTLoop(nil, 1)
print(2)
local proxy_test = false
print(3)
multi:newThread(function()
t, val = thread.hold(function()
return proxy_test
end,{sleep=5})
if val == multi.TIMEOUT then
multi.error("SystemThreadedProcessor/Proxies: Failed")
end
thread.sleep(1)
os.exit(1)
end)
local stp = multi:newSystemThreadedProcessor(1)
local tloop = stp:newTLoop(nil, 1)
multi:newSystemThread("Testing proxy copy THREAD",function(tloop)
local multi, thread = require("multi"):init()
tloop = tloop:init()
@ -200,10 +208,10 @@ multi:newThread("Scheduler Thread",function()
end)
multi:mainloop()
end, tloop:getTransferable()).OnError(multi.error)
print(4)
multi.print("tloop", tloop.Type)
multi.print("tloop.OnLoop", tloop.OnLoop.Type)
print(5)
thread:newThread(function()
multi.print("Testing holding on a proxy connection!")
thread.hold(tloop.OnLoop)
@ -239,7 +247,7 @@ multi:newThread("Scheduler Thread",function()
multi.success("SystemThreadedProcessor: OK")
we_good = true
multi:Stop()
multi:Stop() -- Needed in love2d tests to stop the main runner
os.exit()
end).OnError(multi.error)