Working with lanes, todo implement for love2d and test. 'Should work' the way it is with love2d

This commit is contained in:
Ryan Ward 2022-09-21 23:16:38 -04:00
parent 62e05788d2
commit 7bbb217018
4 changed files with 253 additions and 117 deletions

View File

@ -7,12 +7,84 @@ Table of contents
# Update 15.3.0 - A world of connection
Full Update Showcase
```lua
multi, thread = require("multi"):init{print=true}
GLOBAL, THREAD = require("multi.integration.lanesManager"):init()
local conn = multi:newSystemThreadedConnection("conn"):init()
multi:newSystemThread("Thread_Test_1",function()
local multi, thread = require("multi"):init()
local conn = GLOBAL["conn"]:init()
conn(function()
print(THREAD:getName().." was triggered!")
end)
multi:mainloop()
end)
multi:newSystemThread("Thread_Test_2",function()
local multi, thread = require("multi"):init()
local conn = GLOBAL["conn"]:init()
conn(function(a,b,c)
print(THREAD:getName().." was triggered!",a,b,c)
end)
multi:newAlarm(2):OnRing(function()
print("Fire 2!!!")
conn:Fire(4,5,6)
THREAD.kill()
end)
multi:mainloop()
end)
conn(function(a,b,c)
print("Mainloop conn got triggered!",a,b,c)
end)
alarm = multi:newAlarm(1)
alarm:OnRing(function()
print("Fire 1!!!")
conn:Fire(1,2,3)
end)
alarm = multi:newAlarm(3):OnRing(function()
multi:newSystemThread("Thread_Test_3",function()
local multi, thread = require("multi"):init()
local conn = GLOBAL["conn"]:init()
conn(function(a,b,c)
print(THREAD:getName().." was triggered!",a,b,c)
end)
multi:newAlarm(2):OnRing(function()
print("Fire 3!!!")
conn:Fire(7,8,9)
end)
multi:mainloop()
end)
end)
multi:newSystemThread("Thread_Test_4",function()
local multi, thread = require("multi"):init()
local conn = GLOBAL["conn"]:init()
local conn2 = multi:newConnection()
multi:newAlarm(2):OnRing(function()
conn2:Fire()
end)
multi:newThread(function()
print("Conn Test!")
thread.hold(conn + conn2)
print("It held!")
end)
multi:mainloop()
end)
multi:mainloop()
```
Added
---
- `multi:newSystemThreadedConnection()`
Allows one to trigger connection events across threads.
Allows one to trigger connection events across threads. Works like how any connection would work. Supports all of the features, can even be `added` with non SystemThreadedConnections as demonstrated in the full showcase.
- `multi:newConnection():SetHelper(func)`
Sets the helper function that the connection object uses when creating connection links.

View File

@ -22,7 +22,11 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
local multi, thread = require("multi"):init()
local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD
if not (GLOBAL and THREAD) then
local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD
else
lanes = require("lanes")
end
function multi:newSystemThreadedQueue(name)
local name = name or multi.randomString(16)
local c = {}
@ -174,4 +178,121 @@ function multi:newSystemThreadedJobQueue(n)
end,i).priority = thread.Priority_Core
end
return c
end
end
function multi:newSystemThreadedConnection(name)
local name = name or multi.randomString(16)
local c = {}
c.CONN = 0x00
c.TRIG = 0x01
c.PING = 0x02
c.PONG = 0x03
local function remove(a, b)
local ai = {}
local r = {}
for k,v in pairs(a) do ai[v]=true end
for k,v in pairs(b) do
if ai[v]==nil then table.insert(r,a[k]) end
end
return r
end
c.CID = THREAD.getID()
c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init()
c.Name = name
c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out.
-- Locals will only live in the thread that creates the original object
local ping
local pong = function(link, links)
local res = thread.hold(function()
return link:peek()[1] == c.PONG
end,{sleep=3})
if not res then
self.links = remove(self.links, pings)
else
link:pop()
end
end
ping = thread:newFunction(function(self)
ping:Pause()
multi.ForEach(self.links, function(link) -- Sync new connections
link:push{self.PING}
multi:newThread("pong Thread", pong, link, links)
end)
thread.sleep(3)
ping:Resume()
end,false)
thread:newThread("STC_SUB_MAN"..name,function()
local item
while true do
thread.yield()
-- We need to check on broken connections
ping(c) -- Should return instantlly and process this in another thread
item = thread.hold(function() -- This will keep things held up until there is something to process
return c.subscribe:pop()
end)
if item[1] == c.CONN then
multi.ForEach(c.links, function(link) -- Sync new connections
item[2]:push{c.CONN, link}
end)
c.links[#c.links+1] = item[2]
elseif item[1] == c.TRIG then
c:Fire(unpack(item[2]))
c.proxy_conn:Fire(unpack(item[2]))
end
end
end)
--- ^^^ This will only exist in the init thread
function c:Fire(...)
local args = {...}
if self.CID == THREAD.getID() then -- Host Call
for _, link in pairs(self.links) do
link:push {self.TRIG, args}
end
else
self.subscribe:push {self.TRIG, args}
end
end
function c:init()
local multi, thread = require("multi"):init()
self.links = {}
self.proxy_conn = multi:newConnection()
local mt = getmetatable(self.proxy_conn)
setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add})
thread:newThread("STC_CONN_MAN"..name,function()
local item
local link_self_ref = multi:newSystemThreadedQueue()
self.subscribe:push{self.CONN, link_self_ref}
while true do
item = thread.hold(function()
return link_self_ref:peek()
end)
if item[1] == self.PING then
link_self_ref:push{self.PONG}
link_self_ref:pop()
elseif item[1] == self.CONN then
if item[2].Name ~= link_self_ref.Name then
table.insert(self.links, item[2])
end
link_self_ref:pop()
elseif item[1] == self.TRIG then
self.proxy_conn:Fire(unpack(item[2]))
link_self_ref:pop()
else
-- This shouldn't be the case
end
end
end)
return self
end
GLOBAL[name] = c
return c
end

View File

@ -88,6 +88,7 @@ function multi:newSystemThread(name, func, ...)
},
priority=c.priority
},function(...)
require("multi.integration.lanesManager.extensions")
local has_error = true
return_linda:set("returns",{func(...)})
has_error = false

170
test.lua
View File

@ -3,128 +3,70 @@ package.cpath = "lua5.2/lib/lua/5.2/?/core.dll;"
multi, thread = require("multi"):init{print=true}
GLOBAL, THREAD = require("multi.integration.lanesManager"):init()
function multi:newSystemThreadedConnection(name)
local name = name or multi.randomString(16)
local c = {}
c.CONN = 0x00
c.TRIG = 0x01
c.PING = 0x02
c.PONG = 0x03
local function remove(a, b)
local ai = {}
local r = {}
for k,v in pairs(a) do ai[v]=true end
for k,v in pairs(b) do
if ai[v]==nil then table.insert(r,a[k]) end
end
return r
end
c.Name = name
c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out.
local ping
ping = thread:newFunction(function(self)
ping:Pause() -- Don't allow this function to be called until the first instance is done
local pings = {}
local count = #self.links
multi.ForEach(self.links, function(link) -- Sync new connections
link:push{self.PING}
end)
thread.hold(function()
item = self.subscribe:peek()
if item ~= nil and item[1] == self.PONG then
table.insert(pings,item[2])
end
if #pings==count then
return true -- If we get all pings give control back
end
end,{sleep=3}) -- Give all threads time to respond to the ping
-- We waited long enough for a response, anything that did not respond gets removed
self.links = remove(self.links, pings)
ping:Resume()
end,false)
thread:newThread("STC_SUB_MAN"..name,function()
local item
while true do
thread.yield()
-- We need to check on broken connections
ping() -- Should return instantlly and process this in another thread
thread.hold(function()
item = c.subscribe:peek()
if item ~= nil and item[1] == c.CONN then
c.subscribe:pop()
multi.ForEach(c.links, function(link) -- Sync new connections
item[2]:push{c.CONN, link}
end)
c.links[#c.links+1] = item
end
-- Nil return keeps this hold going until timeout
end,{cycles=multi.Priority_Normal})
-- Usually a bunch of threads subscribe close to the same time.
end
end)
function c:Fire(...)
self.proxy_conn:Fire(...)
local args = {...}
multi.ForEach(self.links, function(link) -- Sync new connections
print(link[2]) -- Bugs everywhere
for i,v in pairs(link[2]) do print(i,v) end
link[2]:push{self.TRIG, args}
end)
end
function c:init()
self.links = {}
self.proxy_conn = multi:newConnection()
setmetatable(self, {__index = self.proxy_conn})
self.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init() -- Incoming subscriptions
thread:newThread("STC_CONN_MAN"..name,function()
local item
local link_self_ref = multi:newSystemThreadedQueue()
self.subscribe:push{self.CONN,link_self_ref}
while true do
item = thread.hold(function()
return self.subscribe:peek()
end)
if item[1] == self.PING then
self.subscribe:push{self.PONG, link_self_ref}
elseif item[1] == self.CONN then
if item[2] ~= link_self_ref then
table.insert(self.links, item[2])
end
elseif item[1] == self.TRIG then
self.proxy_conn:Fire(unpack(item[2]))
else
-- This shouldn't be the case
end
end
end)
return self
end
GLOBAL[name] = c
return c
end
local conn = multi:newSystemThreadedConnection("conn"):init()
multi:newSystemThread("Test",function()
multi:newSystemThread("Thread_Test_1",function()
local multi, thread = require("multi"):init()
local conn = THREAD.waitFor("conn"):init()
local conn = GLOBAL["conn"]:init()
conn(function()
print("Thread was triggered!")
print(THREAD:getName().." was triggered!")
end)
multi:mainloop()
end)
-- conn(function()
-- print("Mainloop conn got triggered!")
-- end)
multi:newSystemThread("Thread_Test_2",function()
local multi, thread = require("multi"):init()
local conn = GLOBAL["conn"]:init()
conn(function(a,b,c)
print(THREAD:getName().." was triggered!",a,b,c)
end)
multi:newAlarm(2):OnRing(function()
print("Fire 2!!!")
conn:Fire(4,5,6)
THREAD.kill()
end)
multi:mainloop()
end)
conn(function(a,b,c)
print("Mainloop conn got triggered!",a,b,c)
end)
alarm = multi:newAlarm(1)
alarm:OnRing(function()
print("Ring")
conn:Fire()
alarm:OnRing(function()
print("Fire 1!!!")
conn:Fire(1,2,3)
end)
alarm = multi:newAlarm(3):OnRing(function()
multi:newSystemThread("Thread_Test_3",function()
local multi, thread = require("multi"):init()
local conn = GLOBAL["conn"]:init()
conn(function(a,b,c)
print(THREAD:getName().." was triggered!",a,b,c)
end)
multi:newAlarm(2):OnRing(function()
print("Fire 3!!!")
conn:Fire(7,8,9)
end)
multi:mainloop()
end)
end)
multi:newSystemThread("Thread_Test_4",function()
local multi, thread = require("multi"):init()
local conn = GLOBAL["conn"]:init()
local conn2 = multi:newConnection()
multi:newAlarm(2):OnRing(function()
conn2:Fire()
end)
multi:newThread(function()
print("Conn Test!")
thread.hold(conn + conn2)
print("It held!")
end)
multi:mainloop()
end)
multi:mainloop()