diff --git a/changes.md b/changes.md index 5c5621b..d6cef4f 100644 --- a/changes.md +++ b/changes.md @@ -7,12 +7,84 @@ Table of contents # Update 15.3.0 - A world of connection Full Update Showcase +```lua +multi, thread = require("multi"):init{print=true} +GLOBAL, THREAD = require("multi.integration.lanesManager"):init() + +local conn = multi:newSystemThreadedConnection("conn"):init() + +multi:newSystemThread("Thread_Test_1",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + conn(function() + print(THREAD:getName().." was triggered!") + end) + multi:mainloop() +end) + +multi:newSystemThread("Thread_Test_2",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + conn(function(a,b,c) + print(THREAD:getName().." was triggered!",a,b,c) + end) + multi:newAlarm(2):OnRing(function() + print("Fire 2!!!") + conn:Fire(4,5,6) + THREAD.kill() + end) + + multi:mainloop() +end) + +conn(function(a,b,c) + print("Mainloop conn got triggered!",a,b,c) +end) + +alarm = multi:newAlarm(1) +alarm:OnRing(function() + print("Fire 1!!!") + conn:Fire(1,2,3) +end) + +alarm = multi:newAlarm(3):OnRing(function() + multi:newSystemThread("Thread_Test_3",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + conn(function(a,b,c) + print(THREAD:getName().." was triggered!",a,b,c) + end) + multi:newAlarm(2):OnRing(function() + print("Fire 3!!!") + conn:Fire(7,8,9) + end) + multi:mainloop() + end) +end) + +multi:newSystemThread("Thread_Test_4",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + local conn2 = multi:newConnection() + multi:newAlarm(2):OnRing(function() + conn2:Fire() + end) + multi:newThread(function() + print("Conn Test!") + thread.hold(conn + conn2) + print("It held!") + end) + multi:mainloop() +end) + +multi:mainloop() +``` Added --- - `multi:newSystemThreadedConnection()` - Allows one to trigger connection events across threads. + Allows one to trigger connection events across threads. Works like how any connection would work. Supports all of the features, can even be `added` with non SystemThreadedConnections as demonstrated in the full showcase. - `multi:newConnection():SetHelper(func)` Sets the helper function that the connection object uses when creating connection links. diff --git a/multi/integration/lanesManager/extensions.lua b/multi/integration/lanesManager/extensions.lua index 54b121a..e0add79 100644 --- a/multi/integration/lanesManager/extensions.lua +++ b/multi/integration/lanesManager/extensions.lua @@ -22,7 +22,11 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] local multi, thread = require("multi"):init() -local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD +if not (GLOBAL and THREAD) then + local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD +else + lanes = require("lanes") +end function multi:newSystemThreadedQueue(name) local name = name or multi.randomString(16) local c = {} @@ -174,4 +178,121 @@ function multi:newSystemThreadedJobQueue(n) end,i).priority = thread.Priority_Core end return c -end \ No newline at end of file +end + +function multi:newSystemThreadedConnection(name) + local name = name or multi.randomString(16) + local c = {} + c.CONN = 0x00 + c.TRIG = 0x01 + c.PING = 0x02 + c.PONG = 0x03 + local function remove(a, b) + local ai = {} + local r = {} + for k,v in pairs(a) do ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(r,a[k]) end + end + return r + end + c.CID = THREAD.getID() + c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init() + c.Name = name + c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. + -- Locals will only live in the thread that creates the original object + local ping + local pong = function(link, links) + local res = thread.hold(function() + return link:peek()[1] == c.PONG + end,{sleep=3}) + + if not res then + self.links = remove(self.links, pings) + else + link:pop() + end + end + + ping = thread:newFunction(function(self) + ping:Pause() + multi.ForEach(self.links, function(link) -- Sync new connections + link:push{self.PING} + multi:newThread("pong Thread", pong, link, links) + end) + + thread.sleep(3) + + ping:Resume() + end,false) + + thread:newThread("STC_SUB_MAN"..name,function() + local item + while true do + thread.yield() + -- We need to check on broken connections + ping(c) -- Should return instantlly and process this in another thread + item = thread.hold(function() -- This will keep things held up until there is something to process + return c.subscribe:pop() + end) + if item[1] == c.CONN then + multi.ForEach(c.links, function(link) -- Sync new connections + item[2]:push{c.CONN, link} + end) + c.links[#c.links+1] = item[2] + elseif item[1] == c.TRIG then + c:Fire(unpack(item[2])) + c.proxy_conn:Fire(unpack(item[2])) + end + end + end) + --- ^^^ This will only exist in the init thread + + function c:Fire(...) + local args = {...} + if self.CID == THREAD.getID() then -- Host Call + for _, link in pairs(self.links) do + link:push {self.TRIG, args} + end + else + self.subscribe:push {self.TRIG, args} + end + end + + function c:init() + local multi, thread = require("multi"):init() + self.links = {} + self.proxy_conn = multi:newConnection() + local mt = getmetatable(self.proxy_conn) + setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) + thread:newThread("STC_CONN_MAN"..name,function() + local item + local link_self_ref = multi:newSystemThreadedQueue() + self.subscribe:push{self.CONN, link_self_ref} + while true do + item = thread.hold(function() + return link_self_ref:peek() + end) + if item[1] == self.PING then + link_self_ref:push{self.PONG} + link_self_ref:pop() + elseif item[1] == self.CONN then + if item[2].Name ~= link_self_ref.Name then + table.insert(self.links, item[2]) + end + link_self_ref:pop() + elseif item[1] == self.TRIG then + self.proxy_conn:Fire(unpack(item[2])) + link_self_ref:pop() + else + -- This shouldn't be the case + end + end + end) + return self + end + + GLOBAL[name] = c + + return c +end \ No newline at end of file diff --git a/multi/integration/lanesManager/init.lua b/multi/integration/lanesManager/init.lua index b016699..3653b55 100644 --- a/multi/integration/lanesManager/init.lua +++ b/multi/integration/lanesManager/init.lua @@ -88,6 +88,7 @@ function multi:newSystemThread(name, func, ...) }, priority=c.priority },function(...) + require("multi.integration.lanesManager.extensions") local has_error = true return_linda:set("returns",{func(...)}) has_error = false diff --git a/test.lua b/test.lua index d7bcd66..5fdd687 100644 --- a/test.lua +++ b/test.lua @@ -3,128 +3,70 @@ package.cpath = "lua5.2/lib/lua/5.2/?/core.dll;" multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.lanesManager"):init() -function multi:newSystemThreadedConnection(name) - local name = name or multi.randomString(16) - local c = {} - c.CONN = 0x00 - c.TRIG = 0x01 - c.PING = 0x02 - c.PONG = 0x03 - local function remove(a, b) - local ai = {} - local r = {} - for k,v in pairs(a) do ai[v]=true end - for k,v in pairs(b) do - if ai[v]==nil then table.insert(r,a[k]) end - end - return r - end - c.Name = name - c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. - local ping - ping = thread:newFunction(function(self) - ping:Pause() -- Don't allow this function to be called until the first instance is done - local pings = {} - local count = #self.links - multi.ForEach(self.links, function(link) -- Sync new connections - link:push{self.PING} - end) - thread.hold(function() - item = self.subscribe:peek() - if item ~= nil and item[1] == self.PONG then - table.insert(pings,item[2]) - end - if #pings==count then - return true -- If we get all pings give control back - end - end,{sleep=3}) -- Give all threads time to respond to the ping - -- We waited long enough for a response, anything that did not respond gets removed - self.links = remove(self.links, pings) - ping:Resume() - end,false) - - thread:newThread("STC_SUB_MAN"..name,function() - local item - while true do - thread.yield() - -- We need to check on broken connections - ping() -- Should return instantlly and process this in another thread - thread.hold(function() - item = c.subscribe:peek() - if item ~= nil and item[1] == c.CONN then - c.subscribe:pop() - multi.ForEach(c.links, function(link) -- Sync new connections - item[2]:push{c.CONN, link} - end) - c.links[#c.links+1] = item - end - -- Nil return keeps this hold going until timeout - end,{cycles=multi.Priority_Normal}) - -- Usually a bunch of threads subscribe close to the same time. - end - end) - - function c:Fire(...) - self.proxy_conn:Fire(...) - local args = {...} - multi.ForEach(self.links, function(link) -- Sync new connections - print(link[2]) -- Bugs everywhere - for i,v in pairs(link[2]) do print(i,v) end - link[2]:push{self.TRIG, args} - end) - end - - function c:init() - self.links = {} - self.proxy_conn = multi:newConnection() - setmetatable(self, {__index = self.proxy_conn}) - self.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init() -- Incoming subscriptions - thread:newThread("STC_CONN_MAN"..name,function() - local item - local link_self_ref = multi:newSystemThreadedQueue() - self.subscribe:push{self.CONN,link_self_ref} - while true do - item = thread.hold(function() - return self.subscribe:peek() - end) - if item[1] == self.PING then - self.subscribe:push{self.PONG, link_self_ref} - elseif item[1] == self.CONN then - if item[2] ~= link_self_ref then - table.insert(self.links, item[2]) - end - elseif item[1] == self.TRIG then - self.proxy_conn:Fire(unpack(item[2])) - else - -- This shouldn't be the case - end - end - end) - return self - end - - GLOBAL[name] = c - - return c -end - local conn = multi:newSystemThreadedConnection("conn"):init() -multi:newSystemThread("Test",function() + +multi:newSystemThread("Thread_Test_1",function() local multi, thread = require("multi"):init() - local conn = THREAD.waitFor("conn"):init() + local conn = GLOBAL["conn"]:init() conn(function() - print("Thread was triggered!") + print(THREAD:getName().." was triggered!") end) multi:mainloop() end) --- conn(function() --- print("Mainloop conn got triggered!") --- end) + +multi:newSystemThread("Thread_Test_2",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + conn(function(a,b,c) + print(THREAD:getName().." was triggered!",a,b,c) + end) + multi:newAlarm(2):OnRing(function() + print("Fire 2!!!") + conn:Fire(4,5,6) + THREAD.kill() + end) + + multi:mainloop() +end) + +conn(function(a,b,c) + print("Mainloop conn got triggered!",a,b,c) +end) alarm = multi:newAlarm(1) -alarm:OnRing(function() - print("Ring") - conn:Fire() +alarm:OnRing(function() + print("Fire 1!!!") + conn:Fire(1,2,3) +end) + +alarm = multi:newAlarm(3):OnRing(function() + multi:newSystemThread("Thread_Test_3",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + conn(function(a,b,c) + print(THREAD:getName().." was triggered!",a,b,c) + end) + multi:newAlarm(2):OnRing(function() + print("Fire 3!!!") + conn:Fire(7,8,9) + end) + multi:mainloop() + end) +end) + +multi:newSystemThread("Thread_Test_4",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + local conn2 = multi:newConnection() + multi:newAlarm(2):OnRing(function() + conn2:Fire() + end) + multi:newThread(function() + print("Conn Test!") + thread.hold(conn + conn2) + print("It held!") + end) + multi:mainloop() end) multi:mainloop() \ No newline at end of file