From 772095431f1fc3e8709557a06c80b462f6cb2e08 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Tue, 20 Sep 2022 16:52:51 -0400 Subject: [PATCH] Some changes to extensions, working on STC --- changes.md | 4 +- multi/init.lua | 5 +- multi/integration/lanesManager/extensions.lua | 4 + multi/integration/lanesManager/init.lua | 1 + multi/integration/loveManager/extensions.lua | 6 +- multi/integration/lovrManager/extensions.lua | 6 +- test.lua | 74 +++++++++++++------ test2.lua | 33 +++++++++ 8 files changed, 103 insertions(+), 30 deletions(-) create mode 100644 test2.lua diff --git a/changes.md b/changes.md index bd5bcae..5c5621b 100644 --- a/changes.md +++ b/changes.md @@ -21,6 +21,8 @@ Added Loops through the table and calls callback_function with each element of the array. +- If a name is not supplied when creating threads; a name is randomly generated. Unless sending through an established channel/queue you might not be able to easily init the object. + Changed --- - `Connection:[connect, hasConnections, getConnection]` changed to be `Connection:[Connect, HasConnections, getConnections]`. This was done in an attempt to follow a consistent naming scheme. The old methods still will work to prevent old code breaking. @@ -32,7 +34,7 @@ Removed Fixed --- -- +- SystemThreaded Objects variables weren't consistent. ToDo --- diff --git a/multi/init.lua b/multi/init.lua index fe2f078..e6caf78 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -1254,9 +1254,8 @@ local startme_len = 0 function thread:newThread(name,func,...) multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called local func = func or name - - if type(name) == "function" then - name = "Thread#"..threadCount + if func == name then + name = name or multi.randomString(16) end local c={nil,nil,nil,nil,nil,nil,nil} local env = {self=c} diff --git a/multi/integration/lanesManager/extensions.lua b/multi/integration/lanesManager/extensions.lua index 15f5bb1..54b121a 100644 --- a/multi/integration/lanesManager/extensions.lua +++ b/multi/integration/lanesManager/extensions.lua @@ -24,7 +24,9 @@ SOFTWARE. local multi, thread = require("multi"):init() local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD function multi:newSystemThreadedQueue(name) + local name = name or multi.randomString(16) local c = {} + c.Name = name c.linda = lanes.linda() function c:push(v) self.linda:send("Q", v) @@ -43,8 +45,10 @@ function multi:newSystemThreadedQueue(name) end function multi:newSystemThreadedTable(name) + local name = name or multi.randomString(16) local c = {} c.link = lanes.linda() + c.Name = name setmetatable(c,{ __index = function(t,k) return c.link:get(k) diff --git a/multi/integration/lanesManager/init.lua b/multi/integration/lanesManager/init.lua index 4bada5a..b016699 100644 --- a/multi/integration/lanesManager/init.lua +++ b/multi/integration/lanesManager/init.lua @@ -62,6 +62,7 @@ function THREAD:newFunction(func,holdme) end function multi:newSystemThread(name, func, ...) + local name = name or multi.randomString(16) multi.InitSystemThreadErrorHandler() local rand = math.random(1, 10000000) local return_linda = lanes.linda() diff --git a/multi/integration/loveManager/extensions.lua b/multi/integration/loveManager/extensions.lua index cf6beb2..9d6fa7f 100644 --- a/multi/integration/loveManager/extensions.lua +++ b/multi/integration/loveManager/extensions.lua @@ -27,6 +27,7 @@ local multi, thread = require("multi").init() GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD function multi:newSystemThreadedQueue(name) + local name = name or multi.randomString(16) local c = {} c.Name = name local fRef = {"func",nil} @@ -64,10 +65,11 @@ function multi:newSystemThreadedQueue(name) return c end function multi:newSystemThreadedTable(name) + local name = name or multi.randomString(16) local c = {} - c.name = name + c.Name = name function c:init() - return THREAD.createTable(self.name) + return THREAD.createTable(self.Name) end THREAD.package(name,c) return c diff --git a/multi/integration/lovrManager/extensions.lua b/multi/integration/lovrManager/extensions.lua index 7032b1d..b2082da 100644 --- a/multi/integration/lovrManager/extensions.lua +++ b/multi/integration/lovrManager/extensions.lua @@ -25,6 +25,7 @@ local multi, thread = require("multi").init() GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD function multi:newSystemThreadedQueue(name) + local name = name or multi.randomString(16) local c = {} c.Name = name local fRef = {"func",nil} @@ -62,10 +63,11 @@ function multi:newSystemThreadedQueue(name) return c end function multi:newSystemThreadedTable(name) + local name = name or multi.randomString(16) local c = {} - c.name = name + c.Name = name function c:init() - return THREAD.createTable(self.name) + return THREAD.createTable(self.Name) end THREAD.package(name,c) return c diff --git a/test.lua b/test.lua index bba1341..87c2e00 100644 --- a/test.lua +++ b/test.lua @@ -3,59 +3,89 @@ package.cpath = "lua5.4/lib/lua/?/core.dll;"..package.cpath multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.lanesManager"):init() - - function multi:newSystemThreadedConnection(name,...) local c = {} c.CONN = 0x00 c.TRIG = 0x01 c.PING = 0x02 c.PONG = 0x03 - local proxy_conn = multi:newConnection(...) + c.proxy_conn = multi:newConnection(...) + local function remove(a, b) + local ai = {} + local r = {} + for k,v in pairs(a) do ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(r,a[k]) end + end + return r + end local name = name or multi.randomString(16) + c.Name = name local connections = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. local funcs = {} setmetatable(c,master_conn) -- A different approach will be taken for the non main connection objects - c.subscribe = multi:newSystemThreadedQueue("Subscribe_"..name):init() -- Incoming subscriptions - multi:newThread("STC_"..name,function() + c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..name):init() -- Incoming subscriptions + multi:newThread("STC_SUB_MAN"..name,function() + local item while true do thread.yield() - local item = c.subscribe:pop() -- We need to check on broken connections - -- c:Ping() - -- - if item ~= nil then - connections[#connections+1] = item - multi.ForEach(funcs, function(link) -- Sync new connections - item:push{c.CONN, link} - end) - thread.skip(multi.Priority_Normal) -- Usually a bunch of threads subscribe close to the same time. Process those by ensuring that they come alive around the same time - -- I'm using these "Constants" since they may change with other releases and this should allow these functions to adjust with them. - end + c:Ping() -- Should return instantlly and process this in another thread + thread.hold(function() + item = c.subscribe:peek() + if item ~= nil and item[1] == c.CONN then + c.subscribe:pop() + connections[#connections+1] = item + multi.ForEach(funcs, function(link) -- Sync new connections + item:push{c.CONN, link} + end) + end + -- Nil return keeps this hold going until timeout + end,{cycles=multi.Priority_Normal}) + -- Usually a bunch of threads subscribe close to the same time. + -- Give those threads some time to ready up. end end) - function c:Ping() -- Threaded Function call, can use thread.* - -- - end + c.Ping = thread:newFunction(function(self) + c.Ping:Pause() -- Don't allow this function to be called more than once + local pings = {} + multi.ForEach(funcs, function(link) -- Sync new connections + link:push{self.PING} + end) + thread.hold(function() + item = self.subscribe:peek() + if item ~= nil and item[1] == self.PONG then + table.insert(pings,item[2]) + end + end,{sleep=3}) -- Give all threads time to respond to the ping + -- We waited long enough for a response, anything that did not respond gets removed + remove(funcs, pings) + c.Ping:Resume() + end,false) function c:Fire(...) - -- + master_conn:Fire(...) + multi.ForEach(funcs, function(link) -- Sync new connections + link:push{self.TRIG,{...}} + end) end function c:Sync(link) -- end function c:Connect(func) local conn_func = func + self.proxy_conn(func) proxy_conn(function() funcs[#funcs+1] = func -- Used for syncing new connections to this connection later on - multi.ForEach(c.connections, function(link) + multi.ForEach(self.connections, function(link) link:push{CONN, func} end) end) end function c:init() + self.proxy_conn return self end - GLOBAL[name or "_"] = c + GLOBAL[name] = c return c end diff --git a/test2.lua b/test2.lua new file mode 100644 index 0000000..5603708 --- /dev/null +++ b/test2.lua @@ -0,0 +1,33 @@ +function difference(a, b) + local ai = {} + local r = {} + local rr = {} + for k,v in pairs(a) do r[k] = v; ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(rr,r[k]) end + end + return rr +end +function remove(a, b) + local ai = {} + local r = {} + for k,v in pairs(a) do ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(r,a[k]) end + end + return r +end + +function printtab(tab,msg) + print(msg or "TABLE") + for i,v in pairs(tab) do + print(i, v) + end + print("") +end + +local tab1 = {1,2,3,4,5} +local tab2 = {3,4,5,6,7} +tab1 = remove(tab1,tab2) +printtab(tab1, "Table 1") +printtab(tab2, "Table 2")