From a708fb3f83a8eb0ea1b69fe68dc98c9834679e7e Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Wed, 19 Jan 2022 23:13:58 -0500 Subject: [PATCH 1/9] testing stuff --- multi/integration/networkManager/init.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/multi/integration/networkManager/init.lua b/multi/integration/networkManager/init.lua index 98f9831..f6778d8 100644 --- a/multi/integration/networkManager/init.lua +++ b/multi/integration/networkManager/init.lua @@ -23,7 +23,7 @@ SOFTWARE. ]] local multi, thread = require("multi"):init() local net = require("net") -local bin = require("bin") +--local bin = require("bin") local char = string.char local byte = string.byte bin.setBitsInterface(infinabits) -- 2.43.0 From 7b70838567b45ad4628fb8a8e6a08efb4ca85272 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sat, 23 Apr 2022 21:07:56 -0400 Subject: [PATCH 2/9] Planning stuff --- test.lua | 106 ++----------------------------------------------------- 1 file changed, 2 insertions(+), 104 deletions(-) diff --git a/test.lua b/test.lua index af0c2cb..1616eb4 100644 --- a/test.lua +++ b/test.lua @@ -2,110 +2,8 @@ package.path = "./?/init.lua;"..package.path multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.threading"):init() --- Using a system thread, but both system and local threads support this! --- Don't worry if you don't have lanes or love2d. PesudoThreading will kick in to emulate the threading features if you do not have access to system threading. -func = THREAD:newFunction(function(count) - print("Starting Status test: ",count) - local a = 0 - while true do - a = a + 1 - THREAD.sleep(.1) - -- Push the status from the currently running threaded function to the main thread - THREAD.pushStatus(a,count) - if a == count then break end - end - return "Done" -end) - -thread:newThread("test",function() - local ret = func(10) - ret.OnStatus(function(part,whole) - print("Ret1: ",math.ceil((part/whole)*1000)/10 .."%") - end) - print("TEST",func(5).wait()) - -- The results from the OnReturn connection is passed by thread.hold - print("Status:",thread.hold(ret.OnReturn)) - print("Function Done!") -end).OnError(function(...) - print("Error:",...) -end) - -local ret = func(10) -local ret2 = func(15) -local ret3 = func(20) -local s1,s2,s3 = 0,0,0 -ret.OnError(function(...) - print("Error:",...) -end) -ret2.OnError(function(...) - print("Error:",...) -end) -ret3.OnError(function(...) - print("Error:",...) -end) -ret.OnStatus(function(part,whole) - s1 = math.ceil((part/whole)*1000)/10 - print(s1) -end) -ret2.OnStatus(function(part,whole) - s2 = math.ceil((part/whole)*1000)/10 - print(s2) -end) -ret3.OnStatus(function(part,whole) - s3 = math.ceil((part/whole)*1000)/10 - print(s3) -end) - -loop = multi:newTLoop() - -function loop:testing() - print("testing haha") +function multi:newSystemThreadedConnection() + -- end -loop:Set(1) -t = loop:OnLoop(function() - print("Looping...") -end):testing() - -local proc = multi:newProcessor("Test") -local proc2 = multi:newProcessor("Test2") -local proc3 = proc2:newProcessor("Test3") -proc.Start() -proc2.Start() -proc3.Start() -proc:newThread("TestThread_1",function() - while true do - thread.sleep(1) - end -end) -proc:newThread("TestThread_2",function() - while true do - thread.sleep(1) - end -end) -proc2:newThread("TestThread_3",function() - while true do - thread.sleep(1) - end -end) - -thread:newThread(function() - thread.sleep(1) - local tasks = multi:getStats() - - for i,v in pairs(tasks) do - print("Process: " ..i.. "\n\tTasks:") - for ii,vv in pairs(v.tasks) do - print("\t\t"..vv:getName()) - end - print("\tThreads:") - for ii,vv in pairs(v.threads) do - print("\t\t"..vv:getName()) - end - end - - thread.sleep(10) -- Wait 10 seconds then kill the process! - os.exit() -end) - multi:mainloop() \ No newline at end of file -- 2.43.0 From 9050e65d93fcee4691a5ae8fb6e9d1ed63d490da Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sun, 18 Sep 2022 09:33:13 -0400 Subject: [PATCH 3/9] Back and making progress --- README.md | 7 +- changes.md | 31 +++++++ multi/init.lua | 91 ++++++++++--------- multi/integration/lanesManager/extensions.lua | 2 + test.lua | 26 +++++- 5 files changed, 111 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index b1abc29..131db6a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Multi Version: 15.3.0 +# Multi Version: 15.3.0 Connecting the dots **Key Changes** - @@ -16,8 +16,9 @@ Progress is being made in [v15.3.0](https://github.com/rayaman/multi/tree/v15.3. INSTALLING ---------- Link to optional dependencies: -[lanes](https://github.com/LuaLanes/lanes) -[love2d](https://love2d.org/) +- [lanes](https://github.com/LuaLanes/lanes) + +- [love2d](https://love2d.org/) To install copy the multi folder into your environment and you are good to go
If you want to use the system threads, then you'll need to install lanes or love2d game engine! diff --git a/changes.md b/changes.md index 77368e5..7457dfe 100644 --- a/changes.md +++ b/changes.md @@ -3,6 +3,37 @@ Table of contents --- [Update 15.2.0 - Upgrade Complete](#update-1520---upgrade-complete)
[Update 15.1.0 - Hold the thread!](#update-1510---hold-the-thread)
[Update 15.0.0 - The art of faking it](#update-1500---the-art-of-faking-it)
[Update 14.2.0 - Bloatware Removed](#update-1420---bloatware-removed)
[Update 14.1.0 - A whole new world of possibilities](#update-1410---a-whole-new-world-of-possibilities)
[Update 14.0.0 - Consistency, Additions and Stability](#update-1400---consistency-additions-and-stability)
[Update 13.1.0 - Bug fixes and features added](#update-1310---bug-fixes-and-features-added)
[Update 13.0.0 - Added some documentation, and some new features too check it out!](#update-1300---added-some-documentation-and-some-new-features-too-check-it-out)
[Update 12.2.2 - Time for some more bug fixes!](#update-1222---time-for-some-more-bug-fixes)
[Update 12.2.1 - Time for some bug fixes!](#update-1221---time-for-some-bug-fixes)
[Update 12.2.0 - The chains of binding](#update-1220---the-chains-of-binding)
[Update 12.1.0 - Threads just can't hold on anymore](#update-1210---threads-just-cant-hold-on-anymore)
[Update: 12.0.0 - Big update (Lots of additions some changes)](#update-1200---big-update-lots-of-additions-some-changes)
[Update: 1.11.1 - Small Clarification on Love](#update-1111---small-clarification-on-love)
[Update: 1.11.0](#update-1110)
[Update: 1.10.0](#update-1100)
[Update: 1.9.2](#update-192)
[Update: 1.9.1 - Threads can now argue](#update-191---threads-can-now-argue)
[Update: 1.9.0](#update-190)
[Update: 1.8.7](#update-187)
[Update: 1.8.6](#update-186)
[Update: 1.8.5](#update-185)
[Update: 1.8.4](#update-184)
[Update: 1.8.3 - Mainloop recieves some needed overhauling](#update-183---mainloop-recieves-some-needed-overhauling)
[Update: 1.8.2](#update-182)
[Update: 1.8.1](#update-181)
[Update: 1.7.6](#update-176)
[Update: 1.7.5](#update-175)
[Update: 1.7.4](#update-174)
[Update: 1.7.3](#update-173)
[Update: 1.7.2](#update-172)
[Update: 1.7.1 - Bug Fixes Only](#update-171---bug-fixes-only)
[Update: 1.7.0 - Threading the systems](#update-170---threading-the-systems)
[Update: 1.6.0](#update-160)
[Update: 1.5.0](#update-150)
[Update: 1.4.1 (4/10/2017) - First Public release of the library](#update-141-4102017---first-public-release-of-the-library)
[Update: 1.4.0 (3/20/2017)](#update-140-3202017)
[Update: 1.3.0 (1/29/2017)](#update-130-1292017)
[Update: 1.2.0 (12.31.2016)](#update-120-12312016)
[Update: 1.1.0](#update-110)
[Update: 1.0.0](#update-100)
[Update: 0.6.3](#update-063)
[Update: 0.6.2](#update-062)
[Update: 0.6.1-6](#update-061-6)
[Update: 0.5.1-6](#update-051-6)
[Update: 0.4.1](#update-041)
[Update: 0.3.0 - The update that started it all](#update-030---the-update-that-started-it-all)
[Update: EventManager 2.0.0](#update-eventmanager-200)
[Update: EventManager 1.2.0](#update-eventmanager-120)
[Update: EventManager 1.1.0](#update-eventmanager-110)
[Update: EventManager 1.0.0 - Error checking](#update-eventmanager-100---error-checking)
[Version: EventManager 0.0.1 - In The Beginning things were very different](#version-eventmanager-001---in-the-beginning-things-were-very-different) +# Update 15.3.0 - A world of connection + +Full Update Showcase + +Added +--- +- `multi:newSystemThreadedConnection()` + + Allows one to trigger connection events across threads. +- `multi:newConnection():SetHelper(func)` + + Sets the helper function that the connection object uses when creating connection links. + +Changed +--- +- `Connection:[connect, hasConnections, getConnection]` changed to be `Connection:[Connect, HasConnections, getConnections]`. This was done in an attempt to follow a consistent naming scheme. The old methods still will work to prevent old code breaking. + +Removed +--- +- Connection objects methods removed: + - holdUT(), HoldUT() -- With the way `thread.hold(conn)` interacts with connections this method was no longer needed. To emulate this use `multi.hold(conn)`. `multi.hold()` is able to emulate what `thread.hold()` outside of a thread, albeit with some drawbacks. + +Fixed +--- +- + +ToDo +--- + +- Work on network parallelism (I am really excited to start working on this. Not because it will have much use, but because it seems like a cool addition/project to work on. I just need time to actually do work on stuff) + # Update 15.2.0 - Upgrade Complete Full Update Showcase diff --git a/multi/init.lua b/multi/init.lua index cd91633..080c5d9 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -58,15 +58,15 @@ multi.Priority_Very_Low = 16384 multi.Priority_Idle = 65536 multi.PriorityResolve = { - [1]="Core", - [4]="Very High", - [16]="High", - [64]="Above Normal", - [256]="Normal", - [1024]="Below Normal", - [4096]="Low", - [16384]="Very Low", - [65536]="Idle", + [1] = "Core", + [4] = "Very High", + [16] = "High", + [64] = "Above Normal", + [256] = "Normal", + [1024] = "Below Normal", + [4096] = "Low", + [16384] = "Very Low", + [65536] = "Idle", } local PList = {multi.Priority_Core,multi.Priority_Very_High,multi.Priority_High,multi.Priority_Above_Normal,multi.Priority_Normal,multi.Priority_Below_Normal,multi.Priority_Low,multi.Priority_Very_Low,multi.Priority_Idle} @@ -116,19 +116,20 @@ function multi:newConnection(protect,func,kill) local lock = false c.callback = func c.Parent=self + setmetatable(c,{__call=function(self,...) local t = ... if type(t)=="table" then for i,v in pairs(t) do if v==self then - local ref = self:connect(select(2,...)) + local ref = self:Connect(select(2,...)) ref.root_link = select(1,...) return ref end end - return self:connect(...) + return self:Connect(...) else - return self:connect(...) + return self:Connect(...) end end, __add = function(c1,c2) @@ -154,32 +155,18 @@ function multi:newConnection(protect,func,kill) end) return cn end}) + c.Type='connector' c.func={} c.ID=0 local protect=protect or false local connections={} c.FC=0 + function c:hasConnections() return #call_funcs~=0 end - function c:holdUT(n) - local n=n or 0 - self.waiting=true - local count=0 - local id=self:connect(function() - count = count + 1 - if n<=count then - self.waiting=false - end - end) - repeat - self.Parent:uManager() - until self.waiting==false - id:Destroy() - return self - end - c.HoldUT=c.holdUT + function c:getConnection(name,ignore) if ignore then return connections[name] or CRef @@ -187,14 +174,17 @@ function multi:newConnection(protect,func,kill) return connections[name] or self end end + function c:Lock() lock = true return self end + function c:Unlock() lock = false return self end + if protect then function c:Fire(...) if lock then return end @@ -216,48 +206,56 @@ function multi:newConnection(protect,func,kill) end end end + local fast = {} function c:getConnections() return call_funcs end + function c:fastMode() function self:Fire(...) for i=1,#fast do fast[i](...) end end - function self:connect(func) + function self:Connect(func) table.insert(fast,func) end end + function c:Bind(t) local temp = call_funcs call_funcs=t return temp end + function c:Remove() local temp = call_funcs call_funcs={} return temp end + local function conn_helper(self,func,name,num) self.ID=self.ID+1 + if num then table.insert(call_funcs,num,func) else table.insert(call_funcs,1,func) end + local temp = { func=func, Type="connector_link", Parent=self, connect = function(s,...) - return self:connect(...) + return self:Connect(...) end } + setmetatable(temp,{ __call=function(s,...) - return self:connect(...) + return self:Connect(...) end, __index = function(t,k) if rawget(t,"root_link") then @@ -272,6 +270,7 @@ function multi:newConnection(protect,func,kill) rawset(t,k,v) end, }) + function temp:Fire(...) if lock then return end if protect then @@ -283,6 +282,7 @@ function multi:newConnection(protect,func,kill) return call_funcs(...) end end + function temp:Destroy() for i=#call_funcs,1,-1 do if call_funcs[i]~=nil then @@ -294,6 +294,7 @@ function multi:newConnection(protect,func,kill) end end end + if name then connections[name]=temp end @@ -302,7 +303,8 @@ function multi:newConnection(protect,func,kill) end return temp end - function c:connect(...)--func,name,num + + function c:Connect(...)--func,name,num local tab = {...} local funcs={} for i=1,#tab do @@ -320,8 +322,15 @@ function multi:newConnection(protect,func,kill) return conn_helper(self,tab[1],tab[2],tab[3]) end end + + function c:SetHelper(func) + conn_helper = func + end + c.Connect=c.connect c.GetConnection=c.getConnection + c.HasConnections = c.hasConnections + c.GetConnection = c.getConnection if not(ignoreconn) then multi:create(c) end @@ -2043,14 +2052,14 @@ function multi.print(...) end end -multi.GetType=multi.getType -multi.IsPaused=multi.isPaused -multi.IsActive=multi.isActive -multi.Reallocate=multi.Reallocate -multi.ConnectFinal=multi.connectFinal -multi.ResetTime=multi.SetTime -multi.IsDone=multi.isDone -multi.SetName = multi.setName +multi.GetType = multi.getType +multi.IsPaused = multi.isPaused +multi.IsActive = multi.isActive +multi.Reallocate = multi.Reallocate +multi.ConnectFinal = multi.connectFinal +multi.ResetTime = multi.SetTime +multi.IsDone = multi.isDone +multi.SetName = multi.setName -- Special Events local _os = os.exit diff --git a/multi/integration/lanesManager/extensions.lua b/multi/integration/lanesManager/extensions.lua index 198e551..bc37455 100644 --- a/multi/integration/lanesManager/extensions.lua +++ b/multi/integration/lanesManager/extensions.lua @@ -41,6 +41,7 @@ function multi:newSystemThreadedQueue(name) GLOBAL[name or "_"] = c return c end + function multi:newSystemThreadedTable(name) local c = {} c.link = lanes.linda() @@ -58,6 +59,7 @@ function multi:newSystemThreadedTable(name) GLOBAL[name or "_"] = c return c end + function multi:newSystemThreadedJobQueue(n) local c = {} c.cores = n or THREAD.getCores()*2 diff --git a/test.lua b/test.lua index 1616eb4..dc3361e 100644 --- a/test.lua +++ b/test.lua @@ -2,8 +2,30 @@ package.path = "./?/init.lua;"..package.path multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.threading"):init() -function multi:newSystemThreadedConnection() - -- +function multi:newSystemThreadedConnection(name,...) + local master_conn = multi:newConnection(...) + local c = {} + local name = name or multi.randomString(16) + local connections = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. + setmetatable(c,master_conn) -- A different approach will be taken for the non main connection objects + c.subscribe = multi:newSystemThreadedQueue("Subscribe_"..name) + multi:newThread("STC_"..name,function() + while true do + thread.yield() + local item = c.subscribe:pop() + if item ~= nil then + connections[#connections+1] = item + thread.skip(multi.Priority_Normal) -- Usually a bunch of threads subscribe close to the same time. Process those by ensuring that they come alive around the same time + else -- I'm using these "Constant" values since they may change with other releases and this should allow these functions to adjust with them. + thread.skip(multi.Priority_Idle) + end + end + end) + function c:init() + return self + end + GLOBAL[name or "_"] = c + return c end multi:mainloop() \ No newline at end of file -- 2.43.0 From 8f3eb6d9a35a2266c3d3917394a613e6654c45d9 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sun, 18 Sep 2022 09:36:47 -0400 Subject: [PATCH 4/9] Fixed changelog order --- changes.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/changes.md b/changes.md index 3509c65..8b235e7 100644 --- a/changes.md +++ b/changes.md @@ -4,9 +4,6 @@ Table of contents [Update 15.2.1 - Bug fix](#update-1521---bug-fix)
[Update 15.2.0 - Upgrade Complete](#update-1520---upgrade-complete)
[Update 15.1.0 - Hold the thread!](#update-1510---hold-the-thread)
[Update 15.0.0 - The art of faking it](#update-1500---the-art-of-faking-it)
[Update 14.2.0 - Bloatware Removed](#update-1420---bloatware-removed)
[Update 14.1.0 - A whole new world of possibilities](#update-1410---a-whole-new-world-of-possibilities)
[Update 14.0.0 - Consistency, Additions and Stability](#update-1400---consistency-additions-and-stability)
[Update 13.1.0 - Bug fixes and features added](#update-1310---bug-fixes-and-features-added)
[Update 13.0.0 - Added some documentation, and some new features too check it out!](#update-1300---added-some-documentation-and-some-new-features-too-check-it-out)
[Update 12.2.2 - Time for some more bug fixes!](#update-1222---time-for-some-more-bug-fixes)
[Update 12.2.1 - Time for some bug fixes!](#update-1221---time-for-some-bug-fixes)
[Update 12.2.0 - The chains of binding](#update-1220---the-chains-of-binding)
[Update 12.1.0 - Threads just can't hold on anymore](#update-1210---threads-just-cant-hold-on-anymore)
[Update: 12.0.0 - Big update (Lots of additions some changes)](#update-1200---big-update-lots-of-additions-some-changes)
[Update: 1.11.1 - Small Clarification on Love](#update-1111---small-clarification-on-love)
[Update: 1.11.0](#update-1110)
[Update: 1.10.0](#update-1100)
[Update: 1.9.2](#update-192)
[Update: 1.9.1 - Threads can now argue](#update-191---threads-can-now-argue)
[Update: 1.9.0](#update-190)
[Update: 1.8.7](#update-187)
[Update: 1.8.6](#update-186)
[Update: 1.8.5](#update-185)
[Update: 1.8.4](#update-184)
[Update: 1.8.3 - Mainloop recieves some needed overhauling](#update-183---mainloop-recieves-some-needed-overhauling)
[Update: 1.8.2](#update-182)
[Update: 1.8.1](#update-181)
[Update: 1.7.6](#update-176)
[Update: 1.7.5](#update-175)
[Update: 1.7.4](#update-174)
[Update: 1.7.3](#update-173)
[Update: 1.7.2](#update-172)
[Update: 1.7.1 - Bug Fixes Only](#update-171---bug-fixes-only)
[Update: 1.7.0 - Threading the systems](#update-170---threading-the-systems)
[Update: 1.6.0](#update-160)
[Update: 1.5.0](#update-150)
[Update: 1.4.1 (4/10/2017) - First Public release of the library](#update-141-4102017---first-public-release-of-the-library)
[Update: 1.4.0 (3/20/2017)](#update-140-3202017)
[Update: 1.3.0 (1/29/2017)](#update-130-1292017)
[Update: 1.2.0 (12.31.2016)](#update-120-12312016)
[Update: 1.1.0](#update-110)
[Update: 1.0.0](#update-100)
[Update: 0.6.3](#update-063)
[Update: 0.6.2](#update-062)
[Update: 0.6.1-6](#update-061-6)
[Update: 0.5.1-6](#update-051-6)
[Update: 0.4.1](#update-041)
[Update: 0.3.0 - The update that started it all](#update-030---the-update-that-started-it-all)
[Update: EventManager 2.0.0](#update-eventmanager-200)
[Update: EventManager 1.2.0](#update-eventmanager-120)
[Update: EventManager 1.1.0](#update-eventmanager-110)
[Update: EventManager 1.0.0 - Error checking](#update-eventmanager-100---error-checking)
[Version: EventManager 0.0.1 - In The Beginning things were very different](#version-eventmanager-001---in-the-beginning-things-were-very-different) -# Update 15.2.1 - Bug fix -Fixed issue #41 - # Update 15.3.0 - A world of connection Full Update Showcase @@ -38,6 +35,10 @@ ToDo - Work on network parallelism (I am really excited to start working on this. Not because it will have much use, but because it seems like a cool addition/project to work on. I just need time to actually do work on stuff) +# Update 15.2.1 - Bug fix +Fixed issue #41 +--- + # Update 15.2.0 - Upgrade Complete Full Update Showcase -- 2.43.0 From 1546076c80298ee27964baf7bc1a836f6dd62e62 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sun, 18 Sep 2022 12:22:34 -0400 Subject: [PATCH 5/9] Protection is handled by Fire Funcction --- multi/init.lua | 11 +++-------- multi/integration/lanesManager/extensions.lua | 8 ++++---- test.lua | 19 +++++++++++++++---- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/multi/init.lua b/multi/init.lua index c850a53..2cd2953 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -273,14 +273,7 @@ function multi:newConnection(protect,func,kill) function temp:Fire(...) if lock then return end - if protect then - local t=pcall(call_funcs,...) - if t then - return t - end - else - return call_funcs(...) - end + return call_funcs(...) end function temp:Destroy() @@ -298,9 +291,11 @@ function multi:newConnection(protect,func,kill) if name then connections[name]=temp end + if self.callback then self.callback(temp) end + return temp end diff --git a/multi/integration/lanesManager/extensions.lua b/multi/integration/lanesManager/extensions.lua index bc37455..15f5bb1 100644 --- a/multi/integration/lanesManager/extensions.lua +++ b/multi/integration/lanesManager/extensions.lua @@ -64,10 +64,10 @@ function multi:newSystemThreadedJobQueue(n) local c = {} c.cores = n or THREAD.getCores()*2 c.OnJobCompleted = multi:newConnection() - local funcs = multi:newSystemThreadedTable() - local queueJob = multi:newSystemThreadedQueue() - local queueReturn = multi:newSystemThreadedQueue() - local doAll = multi:newSystemThreadedQueue() + local funcs = multi:newSystemThreadedTable():init() + local queueJob = multi:newSystemThreadedQueue():init() + local queueReturn = multi:newSystemThreadedQueue():init() + local doAll = multi:newSystemThreadedQueue():init() local ID=1 local jid = 1 function c:isEmpty() diff --git a/test.lua b/test.lua index b5483b9..f92ec6d 100644 --- a/test.lua +++ b/test.lua @@ -4,24 +4,35 @@ multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.lanesManager"):init() function multi:newSystemThreadedConnection(name,...) - local master_conn = multi:newConnection(...) local c = {} + local proxy_conn = multi:newConnection(...) local name = name or multi.randomString(16) local connections = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. setmetatable(c,master_conn) -- A different approach will be taken for the non main connection objects - c.subscribe = multi:newSystemThreadedQueue("Subscribe_"..name) + c.subscribe = multi:newSystemThreadedQueue("Subscribe_"..name):init() -- Incoming subscriptions multi:newThread("STC_"..name,function() while true do - thread.yield() local item = c.subscribe:pop() + -- We need to check on broken connections + -- c:Ping() + -- if item ~= nil then connections[#connections+1] = item thread.skip(multi.Priority_Normal) -- Usually a bunch of threads subscribe close to the same time. Process those by ensuring that they come alive around the same time - else -- I'm using these "Constant" values since they may change with other releases and this should allow these functions to adjust with them. + else -- I'm using these "Constants" since they may change with other releases and this should allow these functions to adjust with them. thread.skip(multi.Priority_Idle) end end end) + function c:Ping() -- Threaded Function call, can use thread.* + -- + end + function c:Fire(...) + -- + end + function c:Connect(...) + -- + end function c:init() return self end -- 2.43.0 From bb7fab0857ebe7a9a6bed60c1e2d9b9619c4e1a8 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sun, 18 Sep 2022 13:56:00 -0400 Subject: [PATCH 6/9] working on systemthreadedconnections --- changes.md | 4 ++++ multi/init.lua | 8 +++++++- test.lua | 25 ++++++++++++++++++++++--- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/changes.md b/changes.md index 8b235e7..bd5bcae 100644 --- a/changes.md +++ b/changes.md @@ -17,6 +17,10 @@ Added Sets the helper function that the connection object uses when creating connection links. +- `multi.ForEach(table, callback_function)` + + Loops through the table and calls callback_function with each element of the array. + Changed --- - `Connection:[connect, hasConnections, getConnection]` changed to be `Connection:[Connect, HasConnections, getConnections]`. This was done in an attempt to follow a consistent naming scheme. The old methods still will work to prevent old code breaking. diff --git a/multi/init.lua b/multi/init.lua index 2cd2953..fe2f078 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -109,6 +109,12 @@ function multi:getStats() end --Helpers +function multi.ForEach(tab,func) + for i=1,#tab do + func(tab[i]) + end +end + local ignoreconn = true function multi:newConnection(protect,func,kill) local c={} @@ -198,6 +204,7 @@ function multi:newConnection(protect,func,kill) end else function c:Fire(...) + if lock then return end for i=#call_funcs,1,-1 do call_funcs[i](...) if kill then @@ -272,7 +279,6 @@ function multi:newConnection(protect,func,kill) }) function temp:Fire(...) - if lock then return end return call_funcs(...) end diff --git a/test.lua b/test.lua index f92ec6d..bba1341 100644 --- a/test.lua +++ b/test.lua @@ -3,24 +3,34 @@ package.cpath = "lua5.4/lib/lua/?/core.dll;"..package.cpath multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.lanesManager"):init() + + function multi:newSystemThreadedConnection(name,...) local c = {} + c.CONN = 0x00 + c.TRIG = 0x01 + c.PING = 0x02 + c.PONG = 0x03 local proxy_conn = multi:newConnection(...) local name = name or multi.randomString(16) local connections = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. + local funcs = {} setmetatable(c,master_conn) -- A different approach will be taken for the non main connection objects c.subscribe = multi:newSystemThreadedQueue("Subscribe_"..name):init() -- Incoming subscriptions multi:newThread("STC_"..name,function() while true do + thread.yield() local item = c.subscribe:pop() -- We need to check on broken connections -- c:Ping() -- if item ~= nil then connections[#connections+1] = item + multi.ForEach(funcs, function(link) -- Sync new connections + item:push{c.CONN, link} + end) thread.skip(multi.Priority_Normal) -- Usually a bunch of threads subscribe close to the same time. Process those by ensuring that they come alive around the same time - else -- I'm using these "Constants" since they may change with other releases and this should allow these functions to adjust with them. - thread.skip(multi.Priority_Idle) + -- I'm using these "Constants" since they may change with other releases and this should allow these functions to adjust with them. end end end) @@ -30,9 +40,18 @@ function multi:newSystemThreadedConnection(name,...) function c:Fire(...) -- end - function c:Connect(...) + function c:Sync(link) -- end + function c:Connect(func) + local conn_func = func + proxy_conn(function() + funcs[#funcs+1] = func -- Used for syncing new connections to this connection later on + multi.ForEach(c.connections, function(link) + link:push{CONN, func} + end) + end) + end function c:init() return self end -- 2.43.0 From 772095431f1fc3e8709557a06c80b462f6cb2e08 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Tue, 20 Sep 2022 16:52:51 -0400 Subject: [PATCH 7/9] Some changes to extensions, working on STC --- changes.md | 4 +- multi/init.lua | 5 +- multi/integration/lanesManager/extensions.lua | 4 + multi/integration/lanesManager/init.lua | 1 + multi/integration/loveManager/extensions.lua | 6 +- multi/integration/lovrManager/extensions.lua | 6 +- test.lua | 74 +++++++++++++------ test2.lua | 33 +++++++++ 8 files changed, 103 insertions(+), 30 deletions(-) create mode 100644 test2.lua diff --git a/changes.md b/changes.md index bd5bcae..5c5621b 100644 --- a/changes.md +++ b/changes.md @@ -21,6 +21,8 @@ Added Loops through the table and calls callback_function with each element of the array. +- If a name is not supplied when creating threads; a name is randomly generated. Unless sending through an established channel/queue you might not be able to easily init the object. + Changed --- - `Connection:[connect, hasConnections, getConnection]` changed to be `Connection:[Connect, HasConnections, getConnections]`. This was done in an attempt to follow a consistent naming scheme. The old methods still will work to prevent old code breaking. @@ -32,7 +34,7 @@ Removed Fixed --- -- +- SystemThreaded Objects variables weren't consistent. ToDo --- diff --git a/multi/init.lua b/multi/init.lua index fe2f078..e6caf78 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -1254,9 +1254,8 @@ local startme_len = 0 function thread:newThread(name,func,...) multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called local func = func or name - - if type(name) == "function" then - name = "Thread#"..threadCount + if func == name then + name = name or multi.randomString(16) end local c={nil,nil,nil,nil,nil,nil,nil} local env = {self=c} diff --git a/multi/integration/lanesManager/extensions.lua b/multi/integration/lanesManager/extensions.lua index 15f5bb1..54b121a 100644 --- a/multi/integration/lanesManager/extensions.lua +++ b/multi/integration/lanesManager/extensions.lua @@ -24,7 +24,9 @@ SOFTWARE. local multi, thread = require("multi"):init() local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD function multi:newSystemThreadedQueue(name) + local name = name or multi.randomString(16) local c = {} + c.Name = name c.linda = lanes.linda() function c:push(v) self.linda:send("Q", v) @@ -43,8 +45,10 @@ function multi:newSystemThreadedQueue(name) end function multi:newSystemThreadedTable(name) + local name = name or multi.randomString(16) local c = {} c.link = lanes.linda() + c.Name = name setmetatable(c,{ __index = function(t,k) return c.link:get(k) diff --git a/multi/integration/lanesManager/init.lua b/multi/integration/lanesManager/init.lua index 4bada5a..b016699 100644 --- a/multi/integration/lanesManager/init.lua +++ b/multi/integration/lanesManager/init.lua @@ -62,6 +62,7 @@ function THREAD:newFunction(func,holdme) end function multi:newSystemThread(name, func, ...) + local name = name or multi.randomString(16) multi.InitSystemThreadErrorHandler() local rand = math.random(1, 10000000) local return_linda = lanes.linda() diff --git a/multi/integration/loveManager/extensions.lua b/multi/integration/loveManager/extensions.lua index cf6beb2..9d6fa7f 100644 --- a/multi/integration/loveManager/extensions.lua +++ b/multi/integration/loveManager/extensions.lua @@ -27,6 +27,7 @@ local multi, thread = require("multi").init() GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD function multi:newSystemThreadedQueue(name) + local name = name or multi.randomString(16) local c = {} c.Name = name local fRef = {"func",nil} @@ -64,10 +65,11 @@ function multi:newSystemThreadedQueue(name) return c end function multi:newSystemThreadedTable(name) + local name = name or multi.randomString(16) local c = {} - c.name = name + c.Name = name function c:init() - return THREAD.createTable(self.name) + return THREAD.createTable(self.Name) end THREAD.package(name,c) return c diff --git a/multi/integration/lovrManager/extensions.lua b/multi/integration/lovrManager/extensions.lua index 7032b1d..b2082da 100644 --- a/multi/integration/lovrManager/extensions.lua +++ b/multi/integration/lovrManager/extensions.lua @@ -25,6 +25,7 @@ local multi, thread = require("multi").init() GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD function multi:newSystemThreadedQueue(name) + local name = name or multi.randomString(16) local c = {} c.Name = name local fRef = {"func",nil} @@ -62,10 +63,11 @@ function multi:newSystemThreadedQueue(name) return c end function multi:newSystemThreadedTable(name) + local name = name or multi.randomString(16) local c = {} - c.name = name + c.Name = name function c:init() - return THREAD.createTable(self.name) + return THREAD.createTable(self.Name) end THREAD.package(name,c) return c diff --git a/test.lua b/test.lua index bba1341..87c2e00 100644 --- a/test.lua +++ b/test.lua @@ -3,59 +3,89 @@ package.cpath = "lua5.4/lib/lua/?/core.dll;"..package.cpath multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.lanesManager"):init() - - function multi:newSystemThreadedConnection(name,...) local c = {} c.CONN = 0x00 c.TRIG = 0x01 c.PING = 0x02 c.PONG = 0x03 - local proxy_conn = multi:newConnection(...) + c.proxy_conn = multi:newConnection(...) + local function remove(a, b) + local ai = {} + local r = {} + for k,v in pairs(a) do ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(r,a[k]) end + end + return r + end local name = name or multi.randomString(16) + c.Name = name local connections = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. local funcs = {} setmetatable(c,master_conn) -- A different approach will be taken for the non main connection objects - c.subscribe = multi:newSystemThreadedQueue("Subscribe_"..name):init() -- Incoming subscriptions - multi:newThread("STC_"..name,function() + c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..name):init() -- Incoming subscriptions + multi:newThread("STC_SUB_MAN"..name,function() + local item while true do thread.yield() - local item = c.subscribe:pop() -- We need to check on broken connections - -- c:Ping() - -- - if item ~= nil then - connections[#connections+1] = item - multi.ForEach(funcs, function(link) -- Sync new connections - item:push{c.CONN, link} - end) - thread.skip(multi.Priority_Normal) -- Usually a bunch of threads subscribe close to the same time. Process those by ensuring that they come alive around the same time - -- I'm using these "Constants" since they may change with other releases and this should allow these functions to adjust with them. - end + c:Ping() -- Should return instantlly and process this in another thread + thread.hold(function() + item = c.subscribe:peek() + if item ~= nil and item[1] == c.CONN then + c.subscribe:pop() + connections[#connections+1] = item + multi.ForEach(funcs, function(link) -- Sync new connections + item:push{c.CONN, link} + end) + end + -- Nil return keeps this hold going until timeout + end,{cycles=multi.Priority_Normal}) + -- Usually a bunch of threads subscribe close to the same time. + -- Give those threads some time to ready up. end end) - function c:Ping() -- Threaded Function call, can use thread.* - -- - end + c.Ping = thread:newFunction(function(self) + c.Ping:Pause() -- Don't allow this function to be called more than once + local pings = {} + multi.ForEach(funcs, function(link) -- Sync new connections + link:push{self.PING} + end) + thread.hold(function() + item = self.subscribe:peek() + if item ~= nil and item[1] == self.PONG then + table.insert(pings,item[2]) + end + end,{sleep=3}) -- Give all threads time to respond to the ping + -- We waited long enough for a response, anything that did not respond gets removed + remove(funcs, pings) + c.Ping:Resume() + end,false) function c:Fire(...) - -- + master_conn:Fire(...) + multi.ForEach(funcs, function(link) -- Sync new connections + link:push{self.TRIG,{...}} + end) end function c:Sync(link) -- end function c:Connect(func) local conn_func = func + self.proxy_conn(func) proxy_conn(function() funcs[#funcs+1] = func -- Used for syncing new connections to this connection later on - multi.ForEach(c.connections, function(link) + multi.ForEach(self.connections, function(link) link:push{CONN, func} end) end) end function c:init() + self.proxy_conn return self end - GLOBAL[name or "_"] = c + GLOBAL[name] = c return c end diff --git a/test2.lua b/test2.lua new file mode 100644 index 0000000..5603708 --- /dev/null +++ b/test2.lua @@ -0,0 +1,33 @@ +function difference(a, b) + local ai = {} + local r = {} + local rr = {} + for k,v in pairs(a) do r[k] = v; ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(rr,r[k]) end + end + return rr +end +function remove(a, b) + local ai = {} + local r = {} + for k,v in pairs(a) do ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(r,a[k]) end + end + return r +end + +function printtab(tab,msg) + print(msg or "TABLE") + for i,v in pairs(tab) do + print(i, v) + end + print("") +end + +local tab1 = {1,2,3,4,5} +local tab2 = {3,4,5,6,7} +tab1 = remove(tab1,tab2) +printtab(tab1, "Table 1") +printtab(tab2, "Table 2") -- 2.43.0 From 62e05788d2a3d3bba600b973b659b706d7affbdb Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Wed, 21 Sep 2022 00:06:48 -0400 Subject: [PATCH 8/9] Working on STC --- makeENV.lua | 10 ++-- multi/init.lua | 2 +- test.lua | 142 +++++++++++++++++++++++++++++++------------------ 3 files changed, 96 insertions(+), 58 deletions(-) diff --git a/makeENV.lua b/makeENV.lua index acb6fc0..d312349 100644 --- a/makeENV.lua +++ b/makeENV.lua @@ -1,9 +1,9 @@ commands = [[ -mkdir luajit && python -m hererocks -j 2.1.0-beta3 -r latest --patch --compat all ./luajit && set "PATH=G:\VSCWorkspace\multi\luajit\bin;%PATH%" && lua -v && luarocks install multi -mkdir lua5.1 && python -m hererocks -l 5.1 -r latest --patch --compat all ./lua5.1 && set "PATH=G:\VSCWorkspace\multi\luajit\bin;%PATH%" && lua -v && luarocks install multi -mkdir lua5.2 && python -m hererocks -l 5.2 -r latest --patch --compat all ./lua5.2 && set "PATH=G:\VSCWorkspace\multi\luajit\bin;%PATH%" && lua -v && luarocks install multi -mkdir lua5.3 && python -m hererocks -l 5.3 -r latest --patch --compat all ./lua5.3 && set "PATH=G:\VSCWorkspace\multi\luajit\bin;%PATH%" && lua -v && luarocks install multi -mkdir lua5.4 && python -m hererocks -l 5.4 -r latest --patch --compat all ./lua5.4 && set "PATH=G:\VSCWorkspace\multi\luajit\bin;%PATH%" && lua -v && luarocks install multi +mkdir luajit && python -m hererocks -j 2.1.0-beta3 -r latest --patch --compat all ./luajit && set "PATH=G:\VSCWorkspace\multi\luajit\bin;%PATH%" && lua -v && luarocks install multi && luarocks install lanes +mkdir lua5.1 && python -m hererocks -l 5.1 -r latest --patch --compat all ./lua5.1 && set "PATH=G:\VSCWorkspace\multi\luajit\bin;%PATH%" && lua -v && luarocks install multi && luarocks install lanes +mkdir lua5.2 && python -m hererocks -l 5.2 -r latest --patch --compat all ./lua5.2 && set "PATH=G:\VSCWorkspace\multi\luajit\bin;%PATH%" && lua -v && luarocks install multi && luarocks install lanes +mkdir lua5.3 && python -m hererocks -l 5.3 -r latest --patch --compat all ./lua5.3 && set "PATH=G:\VSCWorkspace\multi\luajit\bin;%PATH%" && lua -v && luarocks install multi && luarocks install lanes +mkdir lua5.4 && python -m hererocks -l 5.4 -r latest --patch --compat all ./lua5.4 && set "PATH=G:\VSCWorkspace\multi\luajit\bin;%PATH%" && lua -v && luarocks install multi && luarocks install lanes ]] function string.split (inputstr, sep) local sep = sep or "\n" diff --git a/multi/init.lua b/multi/init.lua index e6caf78..63e3bbc 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -328,7 +328,7 @@ function multi:newConnection(protect,func,kill) conn_helper = func end - c.Connect=c.connect + c.connect=c.Connect c.GetConnection=c.getConnection c.HasConnections = c.hasConnections c.GetConnection = c.getConnection diff --git a/test.lua b/test.lua index 87c2e00..d7bcd66 100644 --- a/test.lua +++ b/test.lua @@ -1,15 +1,15 @@ -package.path = "./?/init.lua;?.lua;lua5.4/share/lua/?/init.lua;lua5.4/share/lua/?.lua;"..package.path -package.cpath = "lua5.4/lib/lua/?/core.dll;"..package.cpath +package.path = "./?/init.lua;?.lua;lua5.2/share/lua/5.2/?/init.lua;lua5.2/share/lua/5.2/?.lua;" +package.cpath = "lua5.2/lib/lua/5.2/?/core.dll;" multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.lanesManager"):init() -function multi:newSystemThreadedConnection(name,...) +function multi:newSystemThreadedConnection(name) + local name = name or multi.randomString(16) local c = {} c.CONN = 0x00 c.TRIG = 0x01 c.PING = 0x02 c.PONG = 0x03 - c.proxy_conn = multi:newConnection(...) local function remove(a, b) local ai = {} local r = {} @@ -19,37 +19,14 @@ function multi:newSystemThreadedConnection(name,...) end return r end - local name = name or multi.randomString(16) c.Name = name - local connections = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. - local funcs = {} - setmetatable(c,master_conn) -- A different approach will be taken for the non main connection objects - c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..name):init() -- Incoming subscriptions - multi:newThread("STC_SUB_MAN"..name,function() - local item - while true do - thread.yield() - -- We need to check on broken connections - c:Ping() -- Should return instantlly and process this in another thread - thread.hold(function() - item = c.subscribe:peek() - if item ~= nil and item[1] == c.CONN then - c.subscribe:pop() - connections[#connections+1] = item - multi.ForEach(funcs, function(link) -- Sync new connections - item:push{c.CONN, link} - end) - end - -- Nil return keeps this hold going until timeout - end,{cycles=multi.Priority_Normal}) - -- Usually a bunch of threads subscribe close to the same time. - -- Give those threads some time to ready up. - end - end) - c.Ping = thread:newFunction(function(self) - c.Ping:Pause() -- Don't allow this function to be called more than once + c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. + local ping + ping = thread:newFunction(function(self) + ping:Pause() -- Don't allow this function to be called until the first instance is done local pings = {} - multi.ForEach(funcs, function(link) -- Sync new connections + local count = #self.links + multi.ForEach(self.links, function(link) -- Sync new connections link:push{self.PING} end) thread.hold(function() @@ -57,36 +34,97 @@ function multi:newSystemThreadedConnection(name,...) if item ~= nil and item[1] == self.PONG then table.insert(pings,item[2]) end + if #pings==count then + return true -- If we get all pings give control back + end end,{sleep=3}) -- Give all threads time to respond to the ping -- We waited long enough for a response, anything that did not respond gets removed - remove(funcs, pings) - c.Ping:Resume() + self.links = remove(self.links, pings) + ping:Resume() end,false) + + thread:newThread("STC_SUB_MAN"..name,function() + local item + while true do + thread.yield() + -- We need to check on broken connections + ping() -- Should return instantlly and process this in another thread + thread.hold(function() + item = c.subscribe:peek() + if item ~= nil and item[1] == c.CONN then + c.subscribe:pop() + multi.ForEach(c.links, function(link) -- Sync new connections + item[2]:push{c.CONN, link} + end) + c.links[#c.links+1] = item + end + -- Nil return keeps this hold going until timeout + end,{cycles=multi.Priority_Normal}) + -- Usually a bunch of threads subscribe close to the same time. + end + end) + function c:Fire(...) - master_conn:Fire(...) - multi.ForEach(funcs, function(link) -- Sync new connections - link:push{self.TRIG,{...}} - end) - end - function c:Sync(link) - -- - end - function c:Connect(func) - local conn_func = func - self.proxy_conn(func) - proxy_conn(function() - funcs[#funcs+1] = func -- Used for syncing new connections to this connection later on - multi.ForEach(self.connections, function(link) - link:push{CONN, func} - end) + self.proxy_conn:Fire(...) + local args = {...} + multi.ForEach(self.links, function(link) -- Sync new connections + print(link[2]) -- Bugs everywhere + for i,v in pairs(link[2]) do print(i,v) end + link[2]:push{self.TRIG, args} end) end + function c:init() - self.proxy_conn + self.links = {} + self.proxy_conn = multi:newConnection() + setmetatable(self, {__index = self.proxy_conn}) + self.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init() -- Incoming subscriptions + thread:newThread("STC_CONN_MAN"..name,function() + local item + local link_self_ref = multi:newSystemThreadedQueue() + self.subscribe:push{self.CONN,link_self_ref} + while true do + item = thread.hold(function() + return self.subscribe:peek() + end) + if item[1] == self.PING then + self.subscribe:push{self.PONG, link_self_ref} + elseif item[1] == self.CONN then + if item[2] ~= link_self_ref then + table.insert(self.links, item[2]) + end + elseif item[1] == self.TRIG then + self.proxy_conn:Fire(unpack(item[2])) + else + -- This shouldn't be the case + end + end + end) return self end + GLOBAL[name] = c + return c end +local conn = multi:newSystemThreadedConnection("conn"):init() +multi:newSystemThread("Test",function() + local multi, thread = require("multi"):init() + local conn = THREAD.waitFor("conn"):init() + conn(function() + print("Thread was triggered!") + end) + multi:mainloop() +end) +-- conn(function() +-- print("Mainloop conn got triggered!") +-- end) + +alarm = multi:newAlarm(1) +alarm:OnRing(function() + print("Ring") + conn:Fire() +end) + multi:mainloop() \ No newline at end of file -- 2.43.0 From 7bbb217018d6f2fd2ef5d93116df4b0dee41191c Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Wed, 21 Sep 2022 23:16:38 -0400 Subject: [PATCH 9/9] Working with lanes, todo implement for love2d and test. 'Should work' the way it is with love2d --- changes.md | 74 +++++++- multi/integration/lanesManager/extensions.lua | 125 ++++++++++++- multi/integration/lanesManager/init.lua | 1 + test.lua | 170 ++++++------------ 4 files changed, 253 insertions(+), 117 deletions(-) diff --git a/changes.md b/changes.md index 5c5621b..d6cef4f 100644 --- a/changes.md +++ b/changes.md @@ -7,12 +7,84 @@ Table of contents # Update 15.3.0 - A world of connection Full Update Showcase +```lua +multi, thread = require("multi"):init{print=true} +GLOBAL, THREAD = require("multi.integration.lanesManager"):init() + +local conn = multi:newSystemThreadedConnection("conn"):init() + +multi:newSystemThread("Thread_Test_1",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + conn(function() + print(THREAD:getName().." was triggered!") + end) + multi:mainloop() +end) + +multi:newSystemThread("Thread_Test_2",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + conn(function(a,b,c) + print(THREAD:getName().." was triggered!",a,b,c) + end) + multi:newAlarm(2):OnRing(function() + print("Fire 2!!!") + conn:Fire(4,5,6) + THREAD.kill() + end) + + multi:mainloop() +end) + +conn(function(a,b,c) + print("Mainloop conn got triggered!",a,b,c) +end) + +alarm = multi:newAlarm(1) +alarm:OnRing(function() + print("Fire 1!!!") + conn:Fire(1,2,3) +end) + +alarm = multi:newAlarm(3):OnRing(function() + multi:newSystemThread("Thread_Test_3",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + conn(function(a,b,c) + print(THREAD:getName().." was triggered!",a,b,c) + end) + multi:newAlarm(2):OnRing(function() + print("Fire 3!!!") + conn:Fire(7,8,9) + end) + multi:mainloop() + end) +end) + +multi:newSystemThread("Thread_Test_4",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + local conn2 = multi:newConnection() + multi:newAlarm(2):OnRing(function() + conn2:Fire() + end) + multi:newThread(function() + print("Conn Test!") + thread.hold(conn + conn2) + print("It held!") + end) + multi:mainloop() +end) + +multi:mainloop() +``` Added --- - `multi:newSystemThreadedConnection()` - Allows one to trigger connection events across threads. + Allows one to trigger connection events across threads. Works like how any connection would work. Supports all of the features, can even be `added` with non SystemThreadedConnections as demonstrated in the full showcase. - `multi:newConnection():SetHelper(func)` Sets the helper function that the connection object uses when creating connection links. diff --git a/multi/integration/lanesManager/extensions.lua b/multi/integration/lanesManager/extensions.lua index 54b121a..e0add79 100644 --- a/multi/integration/lanesManager/extensions.lua +++ b/multi/integration/lanesManager/extensions.lua @@ -22,7 +22,11 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] local multi, thread = require("multi"):init() -local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD +if not (GLOBAL and THREAD) then + local GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD +else + lanes = require("lanes") +end function multi:newSystemThreadedQueue(name) local name = name or multi.randomString(16) local c = {} @@ -174,4 +178,121 @@ function multi:newSystemThreadedJobQueue(n) end,i).priority = thread.Priority_Core end return c -end \ No newline at end of file +end + +function multi:newSystemThreadedConnection(name) + local name = name or multi.randomString(16) + local c = {} + c.CONN = 0x00 + c.TRIG = 0x01 + c.PING = 0x02 + c.PONG = 0x03 + local function remove(a, b) + local ai = {} + local r = {} + for k,v in pairs(a) do ai[v]=true end + for k,v in pairs(b) do + if ai[v]==nil then table.insert(r,a[k]) end + end + return r + end + c.CID = THREAD.getID() + c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init() + c.Name = name + c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. + -- Locals will only live in the thread that creates the original object + local ping + local pong = function(link, links) + local res = thread.hold(function() + return link:peek()[1] == c.PONG + end,{sleep=3}) + + if not res then + self.links = remove(self.links, pings) + else + link:pop() + end + end + + ping = thread:newFunction(function(self) + ping:Pause() + multi.ForEach(self.links, function(link) -- Sync new connections + link:push{self.PING} + multi:newThread("pong Thread", pong, link, links) + end) + + thread.sleep(3) + + ping:Resume() + end,false) + + thread:newThread("STC_SUB_MAN"..name,function() + local item + while true do + thread.yield() + -- We need to check on broken connections + ping(c) -- Should return instantlly and process this in another thread + item = thread.hold(function() -- This will keep things held up until there is something to process + return c.subscribe:pop() + end) + if item[1] == c.CONN then + multi.ForEach(c.links, function(link) -- Sync new connections + item[2]:push{c.CONN, link} + end) + c.links[#c.links+1] = item[2] + elseif item[1] == c.TRIG then + c:Fire(unpack(item[2])) + c.proxy_conn:Fire(unpack(item[2])) + end + end + end) + --- ^^^ This will only exist in the init thread + + function c:Fire(...) + local args = {...} + if self.CID == THREAD.getID() then -- Host Call + for _, link in pairs(self.links) do + link:push {self.TRIG, args} + end + else + self.subscribe:push {self.TRIG, args} + end + end + + function c:init() + local multi, thread = require("multi"):init() + self.links = {} + self.proxy_conn = multi:newConnection() + local mt = getmetatable(self.proxy_conn) + setmetatable(self, {__index = self.proxy_conn, __call = function(t,func) self.proxy_conn(func) end, __add = mt.__add}) + thread:newThread("STC_CONN_MAN"..name,function() + local item + local link_self_ref = multi:newSystemThreadedQueue() + self.subscribe:push{self.CONN, link_self_ref} + while true do + item = thread.hold(function() + return link_self_ref:peek() + end) + if item[1] == self.PING then + link_self_ref:push{self.PONG} + link_self_ref:pop() + elseif item[1] == self.CONN then + if item[2].Name ~= link_self_ref.Name then + table.insert(self.links, item[2]) + end + link_self_ref:pop() + elseif item[1] == self.TRIG then + self.proxy_conn:Fire(unpack(item[2])) + link_self_ref:pop() + else + -- This shouldn't be the case + end + end + end) + return self + end + + GLOBAL[name] = c + + return c +end \ No newline at end of file diff --git a/multi/integration/lanesManager/init.lua b/multi/integration/lanesManager/init.lua index b016699..3653b55 100644 --- a/multi/integration/lanesManager/init.lua +++ b/multi/integration/lanesManager/init.lua @@ -88,6 +88,7 @@ function multi:newSystemThread(name, func, ...) }, priority=c.priority },function(...) + require("multi.integration.lanesManager.extensions") local has_error = true return_linda:set("returns",{func(...)}) has_error = false diff --git a/test.lua b/test.lua index d7bcd66..5fdd687 100644 --- a/test.lua +++ b/test.lua @@ -3,128 +3,70 @@ package.cpath = "lua5.2/lib/lua/5.2/?/core.dll;" multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.lanesManager"):init() -function multi:newSystemThreadedConnection(name) - local name = name or multi.randomString(16) - local c = {} - c.CONN = 0x00 - c.TRIG = 0x01 - c.PING = 0x02 - c.PONG = 0x03 - local function remove(a, b) - local ai = {} - local r = {} - for k,v in pairs(a) do ai[v]=true end - for k,v in pairs(b) do - if ai[v]==nil then table.insert(r,a[k]) end - end - return r - end - c.Name = name - c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. - local ping - ping = thread:newFunction(function(self) - ping:Pause() -- Don't allow this function to be called until the first instance is done - local pings = {} - local count = #self.links - multi.ForEach(self.links, function(link) -- Sync new connections - link:push{self.PING} - end) - thread.hold(function() - item = self.subscribe:peek() - if item ~= nil and item[1] == self.PONG then - table.insert(pings,item[2]) - end - if #pings==count then - return true -- If we get all pings give control back - end - end,{sleep=3}) -- Give all threads time to respond to the ping - -- We waited long enough for a response, anything that did not respond gets removed - self.links = remove(self.links, pings) - ping:Resume() - end,false) - - thread:newThread("STC_SUB_MAN"..name,function() - local item - while true do - thread.yield() - -- We need to check on broken connections - ping() -- Should return instantlly and process this in another thread - thread.hold(function() - item = c.subscribe:peek() - if item ~= nil and item[1] == c.CONN then - c.subscribe:pop() - multi.ForEach(c.links, function(link) -- Sync new connections - item[2]:push{c.CONN, link} - end) - c.links[#c.links+1] = item - end - -- Nil return keeps this hold going until timeout - end,{cycles=multi.Priority_Normal}) - -- Usually a bunch of threads subscribe close to the same time. - end - end) - - function c:Fire(...) - self.proxy_conn:Fire(...) - local args = {...} - multi.ForEach(self.links, function(link) -- Sync new connections - print(link[2]) -- Bugs everywhere - for i,v in pairs(link[2]) do print(i,v) end - link[2]:push{self.TRIG, args} - end) - end - - function c:init() - self.links = {} - self.proxy_conn = multi:newConnection() - setmetatable(self, {__index = self.proxy_conn}) - self.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init() -- Incoming subscriptions - thread:newThread("STC_CONN_MAN"..name,function() - local item - local link_self_ref = multi:newSystemThreadedQueue() - self.subscribe:push{self.CONN,link_self_ref} - while true do - item = thread.hold(function() - return self.subscribe:peek() - end) - if item[1] == self.PING then - self.subscribe:push{self.PONG, link_self_ref} - elseif item[1] == self.CONN then - if item[2] ~= link_self_ref then - table.insert(self.links, item[2]) - end - elseif item[1] == self.TRIG then - self.proxy_conn:Fire(unpack(item[2])) - else - -- This shouldn't be the case - end - end - end) - return self - end - - GLOBAL[name] = c - - return c -end - local conn = multi:newSystemThreadedConnection("conn"):init() -multi:newSystemThread("Test",function() + +multi:newSystemThread("Thread_Test_1",function() local multi, thread = require("multi"):init() - local conn = THREAD.waitFor("conn"):init() + local conn = GLOBAL["conn"]:init() conn(function() - print("Thread was triggered!") + print(THREAD:getName().." was triggered!") end) multi:mainloop() end) --- conn(function() --- print("Mainloop conn got triggered!") --- end) + +multi:newSystemThread("Thread_Test_2",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + conn(function(a,b,c) + print(THREAD:getName().." was triggered!",a,b,c) + end) + multi:newAlarm(2):OnRing(function() + print("Fire 2!!!") + conn:Fire(4,5,6) + THREAD.kill() + end) + + multi:mainloop() +end) + +conn(function(a,b,c) + print("Mainloop conn got triggered!",a,b,c) +end) alarm = multi:newAlarm(1) -alarm:OnRing(function() - print("Ring") - conn:Fire() +alarm:OnRing(function() + print("Fire 1!!!") + conn:Fire(1,2,3) +end) + +alarm = multi:newAlarm(3):OnRing(function() + multi:newSystemThread("Thread_Test_3",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + conn(function(a,b,c) + print(THREAD:getName().." was triggered!",a,b,c) + end) + multi:newAlarm(2):OnRing(function() + print("Fire 3!!!") + conn:Fire(7,8,9) + end) + multi:mainloop() + end) +end) + +multi:newSystemThread("Thread_Test_4",function() + local multi, thread = require("multi"):init() + local conn = GLOBAL["conn"]:init() + local conn2 = multi:newConnection() + multi:newAlarm(2):OnRing(function() + conn2:Fire() + end) + multi:newThread(function() + print("Conn Test!") + thread.hold(conn + conn2) + print("It held!") + end) + multi:mainloop() end) multi:mainloop() \ No newline at end of file -- 2.43.0