Working on getting STC working in love2d

This commit is contained in:
= 2022-12-06 00:53:03 -05:00
parent 4cf80517eb
commit 0d6a9f92ad
2 changed files with 66 additions and 261 deletions

View File

@ -22,218 +22,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
]] ]]
-- TODO make compatible with lovr if not ISTHREAD then
local multi, thread = require("multi").init() multi, thread = require("multi").init()
GLOBAL = multi.integration.GLOBAL GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD THREAD = multi.integration.THREAD
function multi:newSystemThreadedQueue(name) else
local name = name or multi.randomString(16)
local c = {}
c.Name = name
local fRef = {"func",nil}
function c:init()
local q = {}
q.chan = love.thread.getChannel(self.Name)
function q:push(dat)
if type(dat) == "function" then
fRef[2] = THREAD.dump(dat)
self.chan:push(fRef)
return
else
self.chan:push(dat)
end
end
function q:pop()
local dat = self.chan:pop()
if type(dat)=="table" and dat[1]=="func" then
return THREAD.loadDump(dat[2])
else
return dat
end
end
function q:peek()
local dat = self.chan:peek()
if type(dat)=="table" and dat[1]=="func" then
return THREAD.loadDump(dat[2])
else
return dat
end
end
return q
end
THREAD.package(name,c)
return c
end
function multi:newSystemThreadedTable(name)
local name = name or multi.randomString(16)
local c = {}
c.Name = name
function c:init()
return THREAD.createTable(self.Name)
end
THREAD.package(name,c)
return c
end
local jqc = 1
function multi:newSystemThreadedJobQueue(n)
local c = {}
c.cores = n or THREAD.getCores()
c.registerQueue = {}
c.funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table")
c.queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue")
c.queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn")
c.queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
c.id = 0
c.OnJobCompleted = multi:newConnection()
local allfunc = 0
function c:doToAll(func)
local f = THREAD.dump(func)
for i = 1, self.cores do
self.queueAll:push({allfunc,f})
end
allfunc = allfunc + 1
end
function c:registerFunction(name,func)
if self.funcs[name] then
error("A function by the name "..name.." has already been registered!")
end
self.funcs[name] = func
end
function c:pushJob(name,...)
self.id = self.id + 1
self.queue:push{name,self.id,...}
return self.id
end
function c:isEmpty()
return queueJob:peek()==nil
end
local nFunc = 0
function c:newFunction(name,func,holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_Function_"..nFunc
end
nFunc = nFunc + 1
c:registerFunction(name,func)
return thread:newFunction(function(...)
local id = c:pushJob(name,...)
local link
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
link:Destroy()
end
end)
return thread.hold(function()
if rets then
return unpack(rets) or multi.NIL
end
end)
end,holup),name
end
thread:newThread("jobManager",function()
while true do
thread.yield()
local dat = c.queueReturn:pop()
if dat then
c.OnJobCompleted:Fire(unpack(dat))
end
end
end)
for i=1,c.cores do
multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc)
local multi, thread = require("multi"):init()
require("love.timer")
local function atomic(channel)
return channel:pop()
end
local clock = os.clock
local funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table")
local queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue")
local queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn")
local lastProc = clock()
local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
local registry = {}
setmetatable(_G,{__index = funcs})
thread:newThread("startUp",function()
while true do
thread.yield()
local all = queueAll:peek()
if all and not registry[all[1]] then
lastProc = os.clock()
THREAD.loadDump(queueAll:pop()[2])()
end
end
end)
thread:newThread("runner",function()
thread.sleep(.1)
while true do
thread.yield()
local all = queueAll:peek()
if all and not registry[all[1]] then
lastProc = os.clock()
THREAD.loadDump(queueAll:pop()[2])()
end
local dat = queue:performAtomic(atomic)
if dat then
lastProc = os.clock()
local name = table.remove(dat,1)
local id = table.remove(dat,1)
local tab = {funcs[name](unpack(dat))}
table.insert(tab,1,id)
queueReturn:push(tab)
end
end
end):OnError(function(...)
error(...)
end)
thread:newThread("Idler",function()
while true do
thread.yield()
if clock()-lastProc> 2 then
THREAD.sleep(.05)
else
THREAD.sleep(.001)
end
end
end)
multi:mainloop()
end,jqc)
end
jqc = jqc + 1
return c
end end
--[[
MIT License
Copyright (c) 2022 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
-- TODO make compatible with lovr
local multi, thread = require("multi").init()
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
function multi:newSystemThreadedQueue(name) function multi:newSystemThreadedQueue(name)
local name = name or multi.randomString(16) local name = name or multi.randomString(16)
local c = {} local c = {}
@ -421,6 +217,59 @@ function multi:newSystemThreadedConnection(name)
c.TRIG = 0x01 c.TRIG = 0x01
c.PING = 0x02 c.PING = 0x02
c.PONG = 0x03 c.PONG = 0x03
local subscribe = multi:newSystemThreadedQueue("SUB_STC_" .. name):init()
function c:init()
self.subscribe = THREAD.waitFor("SUB_STC_" .. self.Name):init()
function self:Fire(...)
local args = {...}
if self.CID == THREAD.getID() then -- Host Call
for _, link in pairs(self.links) do
link:push{self.TRIG, args}
end
self.proxy_conn:Fire(...)
else
self.subscribe:push{self.TRIG, args}
end
end
local multi, thread = require("multi"):init()
self.links = {}
self.proxy_conn = multi:newConnection()
local mt = getmetatable(self.proxy_conn)
setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add})
if self.CID == THREAD.getID() then return self end
thread:newThread("STC_CONN_MAN" .. self.Name,function()
local item
local string_self_ref = "LSF_" .. multi.randomString(16)
local link_self_ref = multi:newSystemThreadedQueue():init()
self.subscribe:push{self.CONN, string_self_ref}
while true do
item = thread.hold(function()
return link_self_ref:peek()
end)
if item[1] == self.PING then
link_self_ref:push{self.PONG}
link_self_ref:pop()
elseif item[1] == self.CONN then
if string_self_ref ~= item[2] then
table.insert(self.links, THREAD.waitFor(item[2]):init())
end
link_self_ref:pop()
elseif item[1] == self.TRIG then
self.proxy_conn:Fire(unpack(item[2]))
link_self_ref:pop()
else
-- This shouldn't be the case
end
end
end).OnError(print)
return self
end
local function remove(a, b) local function remove(a, b)
local ai = {} local ai = {}
local r = {} local r = {}
@ -431,7 +280,6 @@ function multi:newSystemThreadedConnection(name)
return r return r
end end
c.CID = THREAD.getID() c.CID = THREAD.getID()
c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init()
c.Name = name c.Name = name
c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out.
-- Locals will only live in the thread that creates the original object -- Locals will only live in the thread that creates the original object
@ -455,6 +303,7 @@ function multi:newSystemThreadedConnection(name)
ping = thread:newFunction(function(self) ping = thread:newFunction(function(self)
ping:Pause() ping:Pause()
multi.ForEach(self.links, function(link) -- Sync new connections multi.ForEach(self.links, function(link) -- Sync new connections
link:push{self.PING} link:push{self.PING}
multi:newThread("pong Thread", pong, link, links) multi:newThread("pong Thread", pong, link, links)
@ -482,63 +331,18 @@ function multi:newSystemThreadedConnection(name)
end) end)
if item[1] == c.CONN then if item[1] == c.CONN then
multi.ForEach(c.links, function(link) -- Sync new connections multi.ForEach(c.links, function(link) -- Sync new connections
item[2]:push{c.CONN, link} THREAD.waitFor(item[2]):init():push{c.CONN, link.Name}
end) end)
c.links[#c.links+1] = item[2] print("Adding link")
c.links[#c.links+1] = THREAD.waitFor(item[2]):init()
elseif item[1] == c.TRIG then elseif item[1] == c.TRIG then
fire(unpack(item[2])) fire(unpack(item[2]))
c.proxy_conn:Fire(unpack(item[2])) c.proxy_conn:Fire(unpack(item[2]))
end end
end end
end) end).OnError(print)
--- ^^^ This will only exist in the init thread --- ^^^ This will only exist in the init thread
function c:Fire(...)
local args = {...}
if self.CID == THREAD.getID() then -- Host Call
for _, link in pairs(self.links) do
link:push {self.TRIG, args}
end
self.proxy_conn:Fire(...)
else
self.subscribe:push {self.TRIG, args}
end
end
function c:init()
local multi, thread = require("multi"):init()
self.links = {}
self.proxy_conn = multi:newConnection()
local mt = getmetatable(self.proxy_conn)
setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add})
if self.CID == THREAD.getID() then return self end
thread:newThread("STC_CONN_MAN"..name,function()
local item
local link_self_ref = multi:newSystemThreadedQueue()
self.subscribe:push{self.CONN, link_self_ref}
while true do
item = thread.hold(function()
return link_self_ref:peek()
end)
if item[1] == self.PING then
link_self_ref:push{self.PONG}
link_self_ref:pop()
elseif item[1] == self.CONN then
if item[2].Name ~= link_self_ref.Name then
table.insert(self.links, item[2])
end
link_self_ref:pop()
elseif item[1] == self.TRIG then
self.proxy_conn:Fire(unpack(item[2]))
link_self_ref:pop()
else
-- This shouldn't be the case
end
end
end)
return self
end
THREAD.package(name,c) THREAD.package(name,c)
return c return c

View File

@ -35,6 +35,7 @@ __THREADNAME__=table.remove(__IMPORTS,1)
stab = THREAD.createStaticTable(__THREADNAME__) stab = THREAD.createStaticTable(__THREADNAME__)
GLOBAL = THREAD.getGlobal() GLOBAL = THREAD.getGlobal()
multi, thread = require("multi").init() multi, thread = require("multi").init()
print(pcall(require,"multi.integration.loveManager.extensions"))
stab["returns"] = {THREAD.loadDump(__FUNC__)(unpack(__IMPORTS))} stab["returns"] = {THREAD.loadDump(__FUNC__)(unpack(__IMPORTS))}
]] ]]
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
@ -103,7 +104,7 @@ end
THREAD.newSystemThread = multi.newSystemThread THREAD.newSystemThread = multi.newSystemThread
function love.threaderror(thread, errorstr) function love.threaderror(thread, errorstr)
mulit.print("Thread error!\n"..errorstr) multi.print("Thread error!\n"..errorstr)
end end
multi.integration.GLOBAL = GLOBAL multi.integration.GLOBAL = GLOBAL