working on STC

This commit is contained in:
Ryan Ward 2022-09-22 10:29:11 -04:00
parent 3d07e774a0
commit a14cbb45d3
6 changed files with 543 additions and 21 deletions

View File

@ -109,6 +109,11 @@ function multi:getStats()
end end
--Helpers --Helpers
function multi.ForEach(tab,func)
for i=1,#tab do func(tab[i]) end
end
local ignoreconn = true local ignoreconn = true
function multi:newConnection(protect,func,kill) function multi:newConnection(protect,func,kill)
local c={} local c={}

View File

@ -22,9 +22,15 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
]] ]]
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD if not (GLOBAL and THREAD) then
local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD
else
lanes = require("lanes")
end
function multi:newSystemThreadedQueue(name) function multi:newSystemThreadedQueue(name)
local name = name or multi.randomString(16)
local c = {} local c = {}
c.Name = name
c.linda = lanes.linda() c.linda = lanes.linda()
function c:push(v) function c:push(v)
self.linda:send("Q", v) self.linda:send("Q", v)
@ -41,9 +47,12 @@ function multi:newSystemThreadedQueue(name)
GLOBAL[name or "_"] = c GLOBAL[name or "_"] = c
return c return c
end end
function multi:newSystemThreadedTable(name) function multi:newSystemThreadedTable(name)
local name = name or multi.randomString(16)
local c = {} local c = {}
c.link = lanes.linda() c.link = lanes.linda()
c.Name = name
setmetatable(c,{ setmetatable(c,{
__index = function(t,k) __index = function(t,k)
return c.link:get(k) return c.link:get(k)
@ -58,14 +67,15 @@ function multi:newSystemThreadedTable(name)
GLOBAL[name or "_"] = c GLOBAL[name or "_"] = c
return c return c
end end
function multi:newSystemThreadedJobQueue(n) function multi:newSystemThreadedJobQueue(n)
local c = {} local c = {}
c.cores = n or THREAD.getCores()*2 c.cores = n or THREAD.getCores()*2
c.OnJobCompleted = multi:newConnection() c.OnJobCompleted = multi:newConnection()
local funcs = multi:newSystemThreadedTable() local funcs = multi:newSystemThreadedTable():init()
local queueJob = multi:newSystemThreadedQueue() local queueJob = multi:newSystemThreadedQueue():init()
local queueReturn = multi:newSystemThreadedQueue() local queueReturn = multi:newSystemThreadedQueue():init()
local doAll = multi:newSystemThreadedQueue() local doAll = multi:newSystemThreadedQueue():init()
local ID=1 local ID=1
local jid = 1 local jid = 1
function c:isEmpty() function c:isEmpty()
@ -169,3 +179,127 @@ function multi:newSystemThreadedJobQueue(n)
end end
return c return c
end end
function multi:newSystemThreadedConnection(name)
local name = name or multi.randomString(16)
local c = {}
c.CONN = 0x00
c.TRIG = 0x01
c.PING = 0x02
c.PONG = 0x03
local function remove(a, b)
local ai = {}
local r = {}
for k,v in pairs(a) do ai[v]=true end
for k,v in pairs(b) do
if ai[v]==nil then table.insert(r,a[k]) end
end
return r
end
c.CID = THREAD.getID()
c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init()
c.Name = name
c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out.
-- Locals will only live in the thread that creates the original object
local ping
local pong = function(link, links)
local res = thread.hold(function()
return link:peek()[1] == c.PONG
end,{sleep=3})
if not res then
for i=1,#links do
if links[i] == link then
table.remove(links,i,link)
break
end
end
else
link:pop()
end
end
ping = thread:newFunction(function(self)
ping:Pause()
multi.ForEach(self.links, function(link) -- Sync new connections
link:push{self.PING}
multi:newThread("pong Thread", pong, link, links)
end)
thread.sleep(3)
ping:Resume()
end,false)
thread:newThread("STC_SUB_MAN"..name,function()
local item
while true do
thread.yield()
-- We need to check on broken connections
ping(c) -- Should return instantlly and process this in another thread
item = thread.hold(function() -- This will keep things held up until there is something to process
return c.subscribe:pop()
end)
if item[1] == c.CONN then
multi.ForEach(c.links, function(link) -- Sync new connections
item[2]:push{c.CONN, link}
end)
c.links[#c.links+1] = item[2]
elseif item[1] == c.TRIG then
c:Fire(unpack(item[2]))
c.proxy_conn:Fire(unpack(item[2]))
end
end
end).OnError(print)
--- ^^^ This will only exist in the init thread
function c:Fire(...)
local args = {...}
if self.CID == THREAD.getID() then -- Host Call
for _, link in pairs(self.links) do
link:push {self.TRIG, args}
end
--c.proxy_conn:Fire(...)
else
self.subscribe:push {self.TRIG, args}
end
end
function c:init()
local multi, thread = require("multi"):init()
self.links = {}
self.proxy_conn = multi:newConnection()
local mt = getmetatable(self.proxy_conn)
setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add})
if self.CID == THREAD.getID() then return self end
thread:newThread("STC_CONN_MAN"..name,function()
local item
local link_self_ref = multi:newSystemThreadedQueue()
self.subscribe:push{self.CONN, link_self_ref}
while true do
item = thread.hold(function()
return link_self_ref:peek()
end)
if item[1] == self.PING then
link_self_ref:push{self.PONG}
link_self_ref:pop()
elseif item[1] == self.CONN then
if item[2].Name ~= link_self_ref.Name then
table.insert(self.links, item[2])
end
link_self_ref:pop()
elseif item[1] == self.TRIG then
self.proxy_conn:Fire(unpack(item[2]))
link_self_ref:pop()
else
-- This shouldn't be the case
end
end
end).OnError(print)
return self
end
GLOBAL[name] = c
return c
end

View File

@ -87,6 +87,7 @@ function multi:newSystemThread(name, func, ...)
}, },
priority=c.priority priority=c.priority
},function(...) },function(...)
require("multi.integration.lanesManager.extensions")
local has_error = true local has_error = true
return_linda:set("returns",{func(...)}) return_linda:set("returns",{func(...)})
has_error = false has_error = false
@ -119,6 +120,7 @@ function multi.InitSystemThreadErrorHandler()
while true do while true do
thread.yield() thread.yield()
_,data = __ConsoleLinda:receive(0, "Q") _,data = __ConsoleLinda:receive(0, "Q")
if data then print(unpack(data)) end
for i = #threads, 1, -1 do for i = #threads, 1, -1 do
temp = threads[i] temp = threads[i]
status = temp.thread.status status = temp.thread.status

View File

@ -75,7 +75,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda)
c.queue:send("Q", {...}) c.queue:send("Q", {...})
end end
function c.error(err) function c.error(err)
c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} c.queue:push("Q",{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__})
error(err) error(err)
end end
return c return c

View File

@ -203,3 +203,328 @@ function multi:newSystemThreadedJobQueue(n)
jqc = jqc + 1 jqc = jqc + 1
return c return c
end end
--[[
MIT License
Copyright (c) 2022 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
-- TODO make compatible with lovr
local multi, thread = require("multi").init()
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
function multi:newSystemThreadedQueue(name)
local name = name or multi.randomString(16)
local c = {}
c.Name = name
local fRef = {"func",nil}
function c:init()
local q = {}
q.chan = love.thread.getChannel(self.Name)
function q:push(dat)
if type(dat) == "function" then
fRef[2] = THREAD.dump(dat)
self.chan:push(fRef)
return
else
self.chan:push(dat)
end
end
function q:pop()
local dat = self.chan:pop()
if type(dat)=="table" and dat[1]=="func" then
return THREAD.loadDump(dat[2])
else
return dat
end
end
function q:peek()
local dat = self.chan:peek()
if type(dat)=="table" and dat[1]=="func" then
return THREAD.loadDump(dat[2])
else
return dat
end
end
return q
end
THREAD.package(name,c)
return c
end
function multi:newSystemThreadedTable(name)
local name = name or multi.randomString(16)
local c = {}
c.Name = name
function c:init()
return THREAD.createTable(self.Name)
end
THREAD.package(name,c)
return c
end
local jqc = 1
function multi:newSystemThreadedJobQueue(n)
local c = {}
c.cores = n or THREAD.getCores()
c.registerQueue = {}
c.funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table")
c.queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue")
c.queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn")
c.queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
c.id = 0
c.OnJobCompleted = multi:newConnection()
local allfunc = 0
function c:doToAll(func)
local f = THREAD.dump(func)
for i = 1, self.cores do
self.queueAll:push({allfunc,f})
end
allfunc = allfunc + 1
end
function c:registerFunction(name,func)
if self.funcs[name] then
error("A function by the name "..name.." has already been registered!")
end
self.funcs[name] = func
end
function c:pushJob(name,...)
self.id = self.id + 1
self.queue:push{name,self.id,...}
return self.id
end
function c:isEmpty()
return queueJob:peek()==nil
end
local nFunc = 0
function c:newFunction(name,func,holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_Function_"..nFunc
end
nFunc = nFunc + 1
c:registerFunction(name,func)
return thread:newFunction(function(...)
local id = c:pushJob(name,...)
local link
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
link:Destroy()
end
end)
return thread.hold(function()
if rets then
return unpack(rets) or multi.NIL
end
end)
end,holup),name
end
thread:newThread("jobManager",function()
while true do
thread.yield()
local dat = c.queueReturn:pop()
if dat then
c.OnJobCompleted:Fire(unpack(dat))
end
end
end)
for i=1,c.cores do
multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc)
local multi, thread = require("multi"):init()
require("love.timer")
local function atomic(channel)
return channel:pop()
end
local clock = os.clock
local funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table")
local queue = love.thread.getChannel("__JobQueue_"..jqc.."_queue")
local queueReturn = love.thread.getChannel("__JobQueue_"..jqc.."_queueReturn")
local lastProc = clock()
local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
local registry = {}
setmetatable(_G,{__index = funcs})
thread:newThread("startUp",function()
while true do
thread.yield()
local all = queueAll:peek()
if all and not registry[all[1]] then
lastProc = os.clock()
THREAD.loadDump(queueAll:pop()[2])()
end
end
end)
thread:newThread("runner",function()
thread.sleep(.1)
while true do
thread.yield()
local all = queueAll:peek()
if all and not registry[all[1]] then
lastProc = os.clock()
THREAD.loadDump(queueAll:pop()[2])()
end
local dat = queue:performAtomic(atomic)
if dat then
lastProc = os.clock()
local name = table.remove(dat,1)
local id = table.remove(dat,1)
local tab = {funcs[name](unpack(dat))}
table.insert(tab,1,id)
queueReturn:push(tab)
end
end
end):OnError(function(...)
error(...)
end)
thread:newThread("Idler",function()
while true do
thread.yield()
if clock()-lastProc> 2 then
THREAD.sleep(.05)
else
THREAD.sleep(.001)
end
end
end)
multi:mainloop()
end,jqc)
end
jqc = jqc + 1
return c
end
function multi:newSystemThreadedConnection(name)
local name = name or multi.randomString(16)
local c = {}
c.CONN = 0x00
c.TRIG = 0x01
c.PING = 0x02
c.PONG = 0x03
local function remove(a, b)
local ai = {}
local r = {}
for k,v in pairs(a) do ai[v]=true end
for k,v in pairs(b) do
if ai[v]==nil then table.insert(r,a[k]) end
end
return r
end
c.CID = THREAD.getID()
c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init()
c.Name = name
c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out.
-- Locals will only live in the thread that creates the original object
local ping
local pong = function(link, links)
local res = thread.hold(function()
return link:peek()[1] == c.PONG
end,{sleep=3})
if not res then
self.links = remove(self.links, pings)
else
link:pop()
end
end
ping = thread:newFunction(function(self)
ping:Pause()
multi.ForEach(self.links, function(link) -- Sync new connections
link:push{self.PING}
multi:newThread("pong Thread", pong, link, links)
end)
thread.sleep(3)
ping:Resume()
end,false)
thread:newThread("STC_SUB_MAN"..name,function()
local item
while true do
thread.yield()
-- We need to check on broken connections
ping(c) -- Should return instantlly and process this in another thread
item = thread.hold(function() -- This will keep things held up until there is something to process
return c.subscribe:pop()
end)
if item[1] == c.CONN then
multi.ForEach(c.links, function(link) -- Sync new connections
item[2]:push{c.CONN, link}
end)
c.links[#c.links+1] = item[2]
elseif item[1] == c.TRIG then
c:Fire(unpack(item[2]))
c.proxy_conn:Fire(unpack(item[2]))
end
end
end)
--- ^^^ This will only exist in the init thread
function c:Fire(...)
local args = {...}
if self.CID == THREAD.getID() then -- Host Call
for _, link in pairs(self.links) do
link:push {self.TRIG, args}
end
else
self.subscribe:push {self.TRIG, args}
end
end
function c:init()
local multi, thread = require("multi"):init()
self.links = {}
self.proxy_conn = multi:newConnection()
local mt = getmetatable(self.proxy_conn)
setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add})
thread:newThread("STC_CONN_MAN"..name,function()
local item
local link_self_ref = multi:newSystemThreadedQueue()
self.subscribe:push{self.CONN, link_self_ref}
while true do
item = thread.hold(function()
return link_self_ref:peek()
end)
if item[1] == self.PING then
link_self_ref:push{self.PONG}
link_self_ref:pop()
elseif item[1] == self.CONN then
if item[2].Name ~= link_self_ref.Name then
table.insert(self.links, item[2])
end
link_self_ref:pop()
elseif item[1] == self.TRIG then
self.proxy_conn:Fire(unpack(item[2]))
link_self_ref:pop()
else
-- This shouldn't be the case
end
end
end)
return self
end
GLOBAL[name] = c
return c
end

View File

@ -1,20 +1,76 @@
package.path = "./?/init.lua;?.lua;lua5.4/share/lua/?/init.lua;lua5.4/share/lua/?.lua;"..package.path package.path = "./?/init.lua;?.lua;lua5.4/share/lua/5.4/?/init.lua;lua5.4/share/lua/5.4/?.lua;"--..package.path
package.cpath = "lua5.4/lib/lua/?/core.dll;"..package.cpath package.cpath = "lua5.4/lib/lua/5.4/?/core.dll;"--..package.cpath
multi, thread = require("multi"):init{print=true} multi, thread = require("multi"):init{print=true}
GLOBAL, THREAD = require("multi.integration.lanesManager"):init() GLOBAL, THREAD = require("multi.integration.lanesManager"):init()
test = THREAD:newFunction(function() local conn = multi:newSystemThreadedConnection("conn"):init()
PNT()
return 1,2 multi:newSystemThread("Thread_Test_1",function()
end,true) local multi, thread = require("multi"):init()
multi:newThread(function() local conn = GLOBAL["conn"]:init()
while true do local console = THREAD.getConsole()
print("...") conn(function()
thread.sleep(1) console.print(THREAD:getName().." was triggered!")
end end)
multi:mainloop()
end)
multi:newSystemThread("Thread_Test_2",function()
local multi, thread = require("multi"):init()
local conn = GLOBAL["conn"]:init()
local console = THREAD.getConsole()
conn(function(a,b,c)
console.print(THREAD:getName().." was triggered!",a,b,c)
end)
multi:newAlarm(3):OnRing(function()
console.print("Fire 2!!!")
conn:Fire(4,5,6)
--THREAD.kill()
end)
multi:mainloop()
end)
conn(function(a,b,c)
print("Mainloop conn got triggered!",a,b,c)
end)
alarm = multi:newAlarm(1)
alarm:OnRing(function()
print("Fire 1!!!")
conn:Fire(1,2,3)
end)
alarm = multi:newAlarm(3):OnRing(function()
multi:newSystemThread("Thread_Test_3",function()
local multi, thread = require("multi"):init()
local conn = GLOBAL["conn"]:init()
local console = THREAD.getConsole()
conn(function(a,b,c)
console.print(THREAD:getName().." was triggered!",a,b,c)
end)
multi:newAlarm(4):OnRing(function()
console.print("Fire 3!!!")
conn:Fire(7,8,9)
end)
multi:mainloop()
end)
end)
multi:newSystemThread("Thread_Test_4",function()
local multi, thread = require("multi"):init()
local conn = GLOBAL["conn"]:init()
local conn2 = multi:newConnection()
local console = THREAD.getConsole()
multi:newAlarm(2):OnRing(function()
conn2:Fire()
end)
multi:newThread(function()
console.print("Conn Test!")
thread.hold(conn + conn2)
console.print("It held!")
end)
multi:mainloop()
end) end)
multi:newAlarm(.1):OnRing(function() os.exit() end)
print(test())
print("Hi!")
multi:mainloop() multi:mainloop()