From 4c088b90803e118d42e12c6702d9055d1e4bda3f Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Thu, 29 Sep 2022 23:10:39 -0400 Subject: [PATCH] Working on new features --- changes.md | 32 +++ multi/init.lua | 14 +- multi/integration/lovrManager/extensions.lua | 6 +- multi/integration/lovrManager/init.lua | 89 +++++++ multi/integration/lovrManager/threads.lua | 222 ++++++++++++++++++ multi/integration/networkManager/channel.lua | 34 +++ .../integration/networkManager/childNode.lua | 46 ++++ .../integration/networkManager/clientSide.lua | 35 +++ multi/integration/networkManager/cmds.lua | 42 ++++ .../integration/networkManager/extensions.lua | 23 ++ .../integration/networkManager/masterNode.lua | 163 +++++++++++++ multi/integration/networkManager/node.lua | 127 ++++++++++ .../networkManager/nodeManager.lua | 48 ++++ .../integration/networkManager/serverSide.lua | 47 ++++ multi/integration/networkManager/threads.lua | 23 ++ multi/integration/networkManager/utils.lua | 27 +++ test.lua | 147 +++++++----- 17 files changed, 1056 insertions(+), 69 deletions(-) create mode 100644 multi/integration/lovrManager/init.lua create mode 100644 multi/integration/lovrManager/threads.lua create mode 100644 multi/integration/networkManager/channel.lua create mode 100644 multi/integration/networkManager/childNode.lua create mode 100644 multi/integration/networkManager/clientSide.lua create mode 100644 multi/integration/networkManager/cmds.lua create mode 100644 multi/integration/networkManager/extensions.lua create mode 100644 multi/integration/networkManager/masterNode.lua create mode 100644 multi/integration/networkManager/node.lua create mode 100644 multi/integration/networkManager/nodeManager.lua create mode 100644 multi/integration/networkManager/serverSide.lua create mode 100644 multi/integration/networkManager/threads.lua create mode 100644 multi/integration/networkManager/utils.lua diff --git a/changes.md b/changes.md index d6cef4f..fa53381 100644 --- a/changes.md +++ b/changes.md @@ -99,6 +99,38 @@ Changed --- - `Connection:[connect, hasConnections, getConnection]` changed to be `Connection:[Connect, HasConnections, getConnections]`. This was done in an attempt to follow a consistent naming scheme. The old methods still will work to prevent old code breaking. +- `Connections when added(+) together now act like 'or', to get the 'and' feature multiply(*) them together.` + + **Note:** This is a potentially breaking change for using connections. + + ```lua + multi, thread = require("multi"):init{print=true} + -- GLOBAL, THREAD = require("multi.integration.lanesManager"):init() + + local conn1, conn2, conn3 = multi:newConnection(), multi:newConnection(), multi:newConnection() + + thread:newThread(function() + print("Awaiting status") + thread.hold(conn1 + (conn2 * conn3)) + print("Conn or Conn2 and Conn3") + end) + + multi:newAlarm(1):OnRing(function() + print("Conn") + conn1:Fire() + end) + + multi:newAlarm(2):OnRing(function() + print("Conn2") + conn2:Fire() + end) + + multi:newAlarm(3):OnRing(function() + print("Conn3") + conn3:Fire() + end) + ``` + Removed --- - Connection objects methods removed: diff --git a/multi/init.lua b/multi/init.lua index 2853319..fe73b0e 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -137,8 +137,18 @@ function multi:newConnection(protect,func,kill) return self:Connect(...) end end, - __add = function(c1,c2) - cn = multi:newConnection() + __add = function(c1,c2) -- Or + local cn = multi:newConnection() + c1(function(...) + cn:Fire(...) + end) + c2(function(...) + cn:Fire(...) + end) + return cn + end, + __mul = function(c1,c2) -- And + local cn = multi:newConnection() if not c1.__hasInstances then cn.__hasInstances = 2 cn.__count = 0 diff --git a/multi/integration/lovrManager/extensions.lua b/multi/integration/lovrManager/extensions.lua index b2082da..7032b1d 100644 --- a/multi/integration/lovrManager/extensions.lua +++ b/multi/integration/lovrManager/extensions.lua @@ -25,7 +25,6 @@ local multi, thread = require("multi").init() GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD function multi:newSystemThreadedQueue(name) - local name = name or multi.randomString(16) local c = {} c.Name = name local fRef = {"func",nil} @@ -63,11 +62,10 @@ function multi:newSystemThreadedQueue(name) return c end function multi:newSystemThreadedTable(name) - local name = name or multi.randomString(16) local c = {} - c.Name = name + c.name = name function c:init() - return THREAD.createTable(self.Name) + return THREAD.createTable(self.name) end THREAD.package(name,c) return c diff --git a/multi/integration/lovrManager/init.lua b/multi/integration/lovrManager/init.lua new file mode 100644 index 0000000..53cafa3 --- /dev/null +++ b/multi/integration/lovrManager/init.lua @@ -0,0 +1,89 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +-- TODO make compatible with lovr +if ISTHREAD then + error("You cannot require the lovrManager from within a thread!") +end +local ThreadFileData = [[ +ISTHREAD = true +THREAD = require("multi.integration.lovrManager.threads") -- order is important! +sThread = THREAD +__IMPORTS = {...} +__FUNC__=table.remove(__IMPORTS,1) +__THREADID__=table.remove(__IMPORTS,1) +__THREADNAME__=table.remove(__IMPORTS,1) +stab = THREAD.createStaticTable(__THREADNAME__) +GLOBAL = THREAD.getGlobal() +multi, thread = require("multi").init() +stab["returns"] = {THREAD.loadDump(__FUNC__)(unpack(__IMPORTS))} +]] +local multi, thread = require("multi.compat.lovr2d"):init() +local THREAD = {} +__THREADID__ = 0 +__THREADNAME__ = "MainThread" +multi.integration={} +multi.integration.lovr2d={} +local THREAD = require("multi.integration.lovrManager.threads") +local GLOBAL = THREAD.getGlobal() +local THREAD_ID = 1 +local OBJECT_ID = 0 +local stf = 0 +function THREAD:newFunction(func,holup) + stf = stf + 1 + return function(...) + local t = multi:newSystemThread("STF"..stf,func,...) + return thread:newFunction(function() + return thread.hold(function() + if t.stab["returns"] then + local dat = t.stab.returns + t.stab.returns = nil + return unpack(dat) + end + end) + end,holup)() + end +end +function multi:newSystemThread(name,func,...) + local c = {} + c.name = name + c.ID=THREAD_ID + c.thread=lovr.thread.newThread(ThreadFileData) + c.thread:start(THREAD.dump(func),c.ID,c.name,...) + c.stab = THREAD.createStaticTable(name) + GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread} + GLOBAL["__THREAD_COUNT"] = THREAD_ID + THREAD_ID=THREAD_ID+1 + return c +end +THREAD.newSystemThread = multi.newSystemThread +function lovr.threaderror(thread, errorstr) + print("Thread error!\n"..errorstr) +end +multi.integration.GLOBAL = GLOBAL +multi.integration.THREAD = THREAD +require("multi.integration.lovrManager.extensions") +print("Integrated lovr Threading!") +return {init=function() + return GLOBAL,THREAD +end} \ No newline at end of file diff --git a/multi/integration/lovrManager/threads.lua b/multi/integration/lovrManager/threads.lua new file mode 100644 index 0000000..6a95a1e --- /dev/null +++ b/multi/integration/lovrManager/threads.lua @@ -0,0 +1,222 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +-- TODO make compatible with lovr +require("lovr.timer") +require("lovr.system") +require("lovr.data") +local socket = require("socket") +local multi, thread = require("multi").init() +local threads = {} +function threads.loadDump(d) + return loadstring(d:getString()) +end +function threads.dump(func) + return lovr.data.newByteData(string.dump(func)) +end +local fRef = {"func",nil} +local function manage(channel, value) + channel:clear() + if type(value) == "function" then + fRef[2] = THREAD.dump(value) + channel:push(fRef) + return + else + channel:push(value) + end +end +local function RandomVariable(length) + local res = {} + math.randomseed(socket.gettime()*10000) + for i = 1, length do + res[#res+1] = string.char(math.random(97, 122)) + end + return table.concat(res) +end +local GNAME = "__GLOBAL_" +local proxy = {} +function threads.set(name,val) + if not proxy[name] then proxy[name] = lovr.thread.getChannel(GNAME..name) end + proxy[name]:performAtomic(manage, val) +end +function threads.get(name) + if not proxy[name] then proxy[name] = lovr.thread.getChannel(GNAME..name) end + local dat = proxy[name]:peek() + if type(dat)=="table" and dat[1]=="func" then + return THREAD.loadDump(dat[2]) + else + return dat + end +end +function threads.waitFor(name) + if thread.isThread() then + return thread.hold(function() + return threads.get(name) + end) + end + while threads.get(name)==nil do + lovr.timer.sleep(.001) + end + local dat = threads.get(name) + if type(dat) == "table" and dat.init then + dat.init = threads.loadDump(dat.init) + end + return dat +end +function threads.package(name,val) + local init = val.init + val.init=threads.dump(val.init) + GLOBAL[name]=val + val.init=init +end +function threads.getCores() + return lovr.system.getProcessorCount() +end +function threads.kill() + error("Thread Killed!") +end +function threads.getThreads() + local t = {} + for i=1,GLOBAL["__THREAD_COUNT"] do + t[#t+1]=GLOBAL["__THREAD_"..i] + end + return t +end +function threads.getThread(n) + return GLOBAL["__THREAD_"..n] +end +function threads.getName() + return __THREADNAME__ +end +function threads.getID() + return __THREADID__ +end +function threads.sleep(n) + lovr.timer.sleep(n) +end +function threads.getGlobal() + return setmetatable({}, + { + __index = function(t, k) + return THREAD.get(k) + end, + __newindex = function(t, k, v) + THREAD.set(k,v) + end + } + ) +end +function threads.createTable(n) + local _proxy = {} + local function set(name,val) + if not _proxy[name] then _proxy[name] = lovr.thread.getChannel(n..name) end + _proxy[name]:performAtomic(manage, val) + end + local function get(name) + if not _proxy[name] then _proxy[name] = lovr.thread.getChannel(n..name) end + local dat = _proxy[name]:peek() + if type(dat)=="table" and dat[1]=="func" then + return THREAD.loadDump(dat[2]) + else + return dat + end + end + return setmetatable({}, + { + __index = function(t, k) + return get(k) + end, + __newindex = function(t, k, v) + set(k,v) + end + } + ) +end +function threads.getConsole() + local c = {} + c.queue = lovr.thread.getChannel("__CONSOLE__") + function c.print(...) + c.queue:push{...} + end + function c.error(err) + c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} + error(err) + end + return c +end +if not ISTHREAD then + local clock = os.clock + local lastproc = clock() + local queue = lovr.thread.getChannel("__CONSOLE__") + thread:newThread("consoleManager",function() + while true do + thread.yield() + dat = queue:pop() + if dat then + lastproc = clock() + print(unpack(dat)) + end + if clock()-lastproc>2 then + thread.sleep(.1) + end + end + end) +end +function threads.createStaticTable(n) + local __proxy = {} + local function set(name,val) + if __proxy[name] then return end + local chan = lovr.thread.getChannel(n..name) + if chan:getCount()>0 then return end + chan:performAtomic(manage, val) + __proxy[name] = val + end + local function get(name) + if __proxy[name] then return __proxy[name] end + local dat = lovr.thread.getChannel(n..name):peek() + if type(dat)=="table" and dat[1]=="func" then + __proxy[name] = THREAD.loadDump(dat[2]) + return __proxy[name] + else + __proxy[name] = dat + return __proxy[name] + end + end + return setmetatable({}, + { + __index = function(t, k) + return get(k) + end, + __newindex = function(t, k, v) + set(k,v) + end + } + ) +end +function threads.hold(n) + local dat + while not(dat) do + dat = n() + end +end +return threads \ No newline at end of file diff --git a/multi/integration/networkManager/channel.lua b/multi/integration/networkManager/channel.lua new file mode 100644 index 0000000..3083241 --- /dev/null +++ b/multi/integration/networkManager/channel.lua @@ -0,0 +1,34 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +_G["__CHANNEL__"] = {} +local channel = {} +channel.__index = channel + +-- Creates/Gets a channel of name +function channel:newChannel(name) + local chan = _G["__CHANNEL__"] + if chan then + + end +end \ No newline at end of file diff --git a/multi/integration/networkManager/childNode.lua b/multi/integration/networkManager/childNode.lua new file mode 100644 index 0000000..43837d5 --- /dev/null +++ b/multi/integration/networkManager/childNode.lua @@ -0,0 +1,46 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +local multi, thread = require("multi"):init() +local cmd = require("multi.integration.networkManager.cmds") +local node = require("multi.integration.networkManager.node") +local net = require("net") +local bin = require("bin") +local child = {} +child.__index = child +function multi:newChildNode(cd) + local c = {} + setmetatable(c,child) + local name + if cd then + if cd.name then + name = cd.name + end + c.node = node:new(cd.nodePort or cmd.defaultPort,nil,name) + if cd.managerHost then + cd.managerPort = cd.managerPort or cmd.defaultManagerPort + c.node:registerWithManager(cd.managerHost,cd.managerPort) + end + end + return c +end \ No newline at end of file diff --git a/multi/integration/networkManager/clientSide.lua b/multi/integration/networkManager/clientSide.lua new file mode 100644 index 0000000..9210840 --- /dev/null +++ b/multi/integration/networkManager/clientSide.lua @@ -0,0 +1,35 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +return function(self,data) + local cmd,data = data:match("!(.-)!(.*)") + --print(">",cmd,data) + if cmd == "PONG" then + self:send("!PONG!") + elseif cmd == "CHANNEL" then + -- + elseif cmd == "RETURNS" then + local rets = bin.new(data):getBlock("t") + self.node.master.OnDataReturned:Fire(rets) + end +end \ No newline at end of file diff --git a/multi/integration/networkManager/cmds.lua b/multi/integration/networkManager/cmds.lua new file mode 100644 index 0000000..ccb3c2f --- /dev/null +++ b/multi/integration/networkManager/cmds.lua @@ -0,0 +1,42 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +local cmds = { + defaultManagerPort = 0XDE2, + defaultWait = 0X002, + defaultPort = 0X000, -- We will let the OS assign us one + standardSkip = 0X018, + ERROR = 0X000, + PING = 0X001, + PONG = 0X002, + QUEUE = 0X003, + TASK = 0X004, + INITNODE = 0X005, + INITMASTER = 0X006, + GLOBAL = 0X007, + LOAD = 0X008, + CALL = 0X009, + REG = 0X00A, + CONSOLE = 0X00B, +} +return cmds \ No newline at end of file diff --git a/multi/integration/networkManager/extensions.lua b/multi/integration/networkManager/extensions.lua new file mode 100644 index 0000000..c4cac79 --- /dev/null +++ b/multi/integration/networkManager/extensions.lua @@ -0,0 +1,23 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] \ No newline at end of file diff --git a/multi/integration/networkManager/masterNode.lua b/multi/integration/networkManager/masterNode.lua new file mode 100644 index 0000000..25c2b07 --- /dev/null +++ b/multi/integration/networkManager/masterNode.lua @@ -0,0 +1,163 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +local multi, thread = require("multi"):init() +local cmd = require("multi.integration.networkManager.cmds") +local node = require("multi.integration.networkManager.node") +local net = require("net") +local bin = require("bin") +local master = {} +master.__index = master +function master:addNode(ip,port) + return node:new(ip,port) +end +function master:getNodesFromBroadcast() + net:newCastedClients("NODE_.+") + net.OnCastedClientInfo(function(client, n, ip, port) + self.nodes[n] = node:new(client) + end) +end +function master:getNodesFromManager(ip,port) + local mn = self.nodes + if not self.manager then + self.manager = net:newTCPClient(ip,port) + if not self.manager then + error("Unable to connect to the node Manager! Is it running? Perhaps the hostname or port is incorrect!") + end + end + self.manager.OnDataRecieved(function(self,data,client) + local cmd = data:match("!(.+)!") + data = data:gsub("!"..cmd.."!","") + if cmd == "NODE" then + local n,h,p = data:match("(.-)|(.-)|(.+)") + mn[n] = node:new(h,tonumber(p)) + end + end) + self.manager:send("!NODES!") +end +function master:setDefaultNode(nodeName) + if self:nodeExists(nodeName) then + self.defaultNode = nodeName + end +end +function master:getRandomNode() + local t = {} + for i,v in pairs(self.nodes) do t[#t+1] = i end + return t[math.random(1,#t)] +end +local netID = 0 +function master:newNetworkThread(nodeName,func,...) + local args = {...} + local dat = bin.new() + local ret + local nID = netID + local conn = multi:newConnection() + thread:newthread(function() + dat:addBlock{ + args = args, + func = func, + id = netID + } + netID = netID + 1 + if type(nodeName) == "function" then + func = nodeName + nodeName = self.defaultNode or self:getRandomNode() + if not func then + error("You must provide a function!") + end + end + self:sendTo(nodeName,"!N_THREAD!"..dat.data) + self.OnDataReturned(function(rets) + if rets.ID == nID then + conn:Fire(unpack(rets.rets)) + end + end) + end) + return conn +end +function master:newNetworkChannel(nodeName) + -- +end +function master:sendTo(nodeName,data) + self:queue("send",nodeName,data) +end +function master:demandNodeExistance(nodeName) + if self.nodes[nodeName] then + return multi.hold(self.nodes[nodeName]:ping().pong) + else + return false + end +end +function master:queue(c,...) + table.insert(self._queue,{c,{...}}) +end +function multi:newMasterNode(cd) + local c = {} + setmetatable(c, master) + c.OnNodeDiscovered = multi:newConnection() + c.OnNodeRemoved = multi:newConnection() + c.OnDataRecieved = multi:newConnection() + c.OnDataReturned = multi:newConnection() + c.defaultNode = "" + c.nodes = {} + setmetatable(c.nodes, + {__newindex = function(t,k,v) + rawset(t,k,v) + v.master = c + c.OnNodeDiscovered:Fire(k,v) + end}) + c._queue = {} + if cd then + if cd.nodeHost then + cd.nodePort = cd.nodePort or cmd.defaultPort + local n,no = c:addNode(cd.nodeHost,cd.nodePort) + if n then + c.nodes[n] = no + end + elseif cd.managerHost then + cd.managerPort = cd.managerPort or cmd.defaultManagerPort + c:getNodesFromManager(cd.managerHost,cd.managerPort) + else + c:getNodesFromBroadcast() + end + else + c:getNodesFromBroadcast() + end + thread:newthread("CMDQueueProcessor",function() + while true do + thread.skip(128) + local data = table.remove(c._queue,1) + if data then + local cmd = data[1] + if cmd == "send" then + local nodeName = data[2][1] + local dat = data[2][2] + c.nodes[nodeName]:send(dat) + end + end + end + end):OnError(function(...) + print(...) + end) + return c +end \ No newline at end of file diff --git a/multi/integration/networkManager/node.lua b/multi/integration/networkManager/node.lua new file mode 100644 index 0000000..8151b57 --- /dev/null +++ b/multi/integration/networkManager/node.lua @@ -0,0 +1,127 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +local net = require("net") +local cmd = require("multi.integration.networkManager.cmds") +local multi,thread = require("multi"):init() +local node = {} +node.__index = node +local rand = {} +for i = 65,90 do + rand[#rand+1] = string.char(i) +end +local function randName(n) + local str = {} + for i=1,(n or 10) do + str[#str+1] = rand[math.random(1,#rand)] + end + return table.concat(str) +end +local getNames = thread:newFunction(function(names) + local listen = socket.udp() -- make a new socket + listen:setsockname(net.getLocalIP(), 11111) + listen:settimeout(0) + local data, ip, port = listen:receivefrom() + thread.holdWithin(1,function() + if data then + local n, tp, ip, port = data:match("(%S-)|(%S-)|(%S-):(%d+)") + if n then + names[n]=true + end + end + end) + return multi.NIL +end) +local function setName(ref,name) + if name then + ref.name = "NODE_"..name + ref.connection:broadcast(name) + return + end + local names = {} + getNames(names).wait() -- Prevents duplicate names from spawning! + local name = randName() + while names["NODE_"..name] do + name = randName() + end + ref.name = "NODE_"..name + ref.connection:broadcast(ref.name) +end +node.ServerCode = require("multi.integration.networkManager.serverSide") +node.ClientCode = require("multi.integration.networkManager.clientSide") +function node.random() + return randName(12) +end +function node:registerWithManager(ip,port) + if self.type ~= "server" then return end + if not self.manager then + self.manager = net:newTCPClient(ip,port) + if not self.manager then + error("Unable to connect to the node Manager! Is it running? Perhaps the hostname or port is incorrect!") + end + end + thread:newFunction(function() + thread.hold(function() return self.name end) + self.manager:send("!REG_NODE!"..self.name.."|"..net.getLocalIP().."|"..self.connection.port) + end)() +end +function node:new(host,port,name) + local c = {} + c.links = {} + setmetatable(c,node) + if type(host)=="number" or type(host)=="nil" then + c.connection = net:newTCPServer(host or cmd.defaultPort) + c.connection:enableBinaryMode() + c.type = "server" + c.connection.node = c + c.connection.OnDataRecieved(self.ServerCode) + setName(c) + elseif type(host)=="table" and host.Type == "tcp" then + c.connection = host + c.connection:enableBinaryMode() + c.type = "client" + c.connection.node = c + c.connection.OnDataRecieved(self.ClientCode) + c.name = "MASTER_NODE" + elseif type(host) == "string" and type(port)=="number" then + c.connection = net:newTCPClient(host, port) + c.connection:enableBinaryMode() + c.type = "client" + c.connection.node = c + c.connection.OnDataRecieved(self.ClientCode) + c.name = "MASTER_NODE" + else + error("Invalid arguments!") + end + return c +end +function node:ping() + if self.type ~= "client" then return end + self:send("!PING!") + return {pong=self.connection.OnDataRecieved} +end +function node:send(data) + if self.type ~= "client" then return end + self.connection:send(data) +end +return node \ No newline at end of file diff --git a/multi/integration/networkManager/nodeManager.lua b/multi/integration/networkManager/nodeManager.lua new file mode 100644 index 0000000..840bc60 --- /dev/null +++ b/multi/integration/networkManager/nodeManager.lua @@ -0,0 +1,48 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +local multi, thread = require("multi"):init() +local cmd = require("multi.integration.networkManager.cmds") +local net = require("net") +local bin = require("bin") +local nodes = { -- Testing stuff + +} +function multi:newNodeManager(port) + print("Running node manager on port: "..(port or cmd.defaultManagerPort)) + local server = net:newTCPServer(port or cmd.defaultManagerPort) + server.OnDataRecieved(function(serv, data, client) + local cmd = data:match("!(.+)!") + data = data:gsub("!"..cmd.."!","") + if cmd == "NODES" then + for i,v in ipairs(nodes) do + -- Sample data + serv:send(client, "!NODE!".. v[1].."|"..v[2].."|"..v[3]) + end + elseif cmd == "REG_NODE" then + local name, ip, port = data:match("(.-)|(.-)|(.+)") + table.insert(nodes,{name,ip,port}) + print("Registering Node:",name, ip, port) + end + end) +end \ No newline at end of file diff --git a/multi/integration/networkManager/serverSide.lua b/multi/integration/networkManager/serverSide.lua new file mode 100644 index 0000000..4d8b771 --- /dev/null +++ b/multi/integration/networkManager/serverSide.lua @@ -0,0 +1,47 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +local bin, bits = require("bin").init() +return function(self,data,client) + local cmd,data = data:match("!(.-)!(.*)") + --print("SERVER",cmd,data) + if cmd == "PING" then + self:send(client,"!PONG!") + elseif cmd == "N_THREAD" then + print(1) + local dat = bin.new(data) + print(2) + local t = dat:getBlock("t") + print(3) + local ret = bin.new() + print(4) + ret:addBlock{ID = t.id,rets = {t.func(unpack(t.args))}} + print(5) + print(client,"!RETURNS!"..ret:getData()) + self:send(client,"!RETURNS!"..ret:getData()) + print(6) + elseif cmd == "CHANNEL" then + local dat = bin.new(data):getBlock("t") + + end +end \ No newline at end of file diff --git a/multi/integration/networkManager/threads.lua b/multi/integration/networkManager/threads.lua new file mode 100644 index 0000000..c4cac79 --- /dev/null +++ b/multi/integration/networkManager/threads.lua @@ -0,0 +1,23 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] \ No newline at end of file diff --git a/multi/integration/networkManager/utils.lua b/multi/integration/networkManager/utils.lua new file mode 100644 index 0000000..9ebdb1b --- /dev/null +++ b/multi/integration/networkManager/utils.lua @@ -0,0 +1,27 @@ +--[[ +MIT License + +Copyright (c) 2022 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +local bin = require("bin") +local utils = {} +-- Will contain data that handles sterilizing and managing data +return utils \ No newline at end of file diff --git a/test.lua b/test.lua index aacb05f..e4bebde 100644 --- a/test.lua +++ b/test.lua @@ -1,76 +1,97 @@ package.path = "./?/init.lua;?.lua;lua5.4/share/lua/5.4/?/init.lua;lua5.4/share/lua/5.4/?.lua;"--..package.path package.cpath = "lua5.4/lib/lua/5.4/?/core.dll;"--..package.cpath multi, thread = require("multi"):init{print=true} -GLOBAL, THREAD = require("multi.integration.lanesManager"):init() +-- GLOBAL, THREAD = require("multi.integration.lanesManager"):init() -local conn = multi:newSystemThreadedConnection("conn"):init() - -multi:newSystemThread("Thread_Test_1",function() - local multi, thread = require("multi"):init() - local conn = GLOBAL["conn"]:init() - local console = THREAD.getConsole() - conn(function(a,b,c) - console.print(THREAD:getName().." was triggered!",a,b,c) - end) - multi:mainloop() +local conn1, conn2, conn3 = multi:newConnection(), multi:newConnection(), multi:newConnection() +thread:newThread(function() + print("Awaiting status") + thread.hold(conn1 + (conn2 * conn3)) + print("Conn or Conn2 and Conn3") end) -multi:newSystemThread("Thread_Test_2",function() - local multi, thread = require("multi"):init() - local conn = GLOBAL["conn"]:init() - local console = THREAD.getConsole() - conn(function(a,b,c) - console.print(THREAD:getName().." was triggered!",a,b,c) - end) - multi:newAlarm(2):OnRing(function() - console.print("Fire 2!!!") - conn:Fire(4,5,6) - THREAD.kill() - end) - - multi:mainloop() +multi:newAlarm(1):OnRing(function() + print("Conn") + conn1:Fire() end) -local console = THREAD.getConsole() -conn(function(a,b,c) - console.print("Mainloop conn got triggered!",a,b,c) +multi:newAlarm(2):OnRing(function() + print("Conn2") + conn2:Fire() +end) +multi:newAlarm(3):OnRing(function() + print("Conn3") + conn3:Fire() end) -alarm = multi:newAlarm(1) -alarm:OnRing(function() - console.print("Fire 1!!!") - conn:Fire(1,2,3) -end) -alarm = multi:newAlarm(3):OnRing(function() - multi:newSystemThread("Thread_Test_3",function() - local multi, thread = require("multi"):init() - local conn = GLOBAL["conn"]:init() - local console = THREAD.getConsole() - conn(function(a,b,c) - console.print(THREAD:getName().." was triggered!",a,b,c) - end) - multi:newAlarm(4):OnRing(function() - console.print("Fire 3!!!") - conn:Fire(7,8,9) - end) - multi:mainloop() - end) -end) +-- local conn = multi:newSystemThreadedConnection("conn"):init() -multi:newSystemThread("Thread_Test_4",function() - local multi, thread = require("multi"):init() - local conn = GLOBAL["conn"]:init() - local conn2 = multi:newConnection() - local console = THREAD.getConsole() - multi:newAlarm(2):OnRing(function() - conn2:Fire() - end) - multi:newThread(function() - console.print("Conn Test!") - thread.hold(conn + conn2) - console.print("It held!") - end) - multi:mainloop() -end) +-- multi:newSystemThread("Thread_Test_1",function() +-- local multi, thread = require("multi"):init() +-- local conn = GLOBAL["conn"]:init() +-- local console = THREAD.getConsole() +-- conn(function(a,b,c) +-- console.print(THREAD:getName().." was triggered!",a,b,c) +-- end) +-- multi:mainloop() +-- end) + +-- multi:newSystemThread("Thread_Test_2",function() +-- local multi, thread = require("multi"):init() +-- local conn = GLOBAL["conn"]:init() +-- local console = THREAD.getConsole() +-- conn(function(a,b,c) +-- console.print(THREAD:getName().." was triggered!",a,b,c) +-- end) +-- multi:newAlarm(2):OnRing(function() +-- console.print("Fire 2!!!") +-- conn:Fire(4,5,6) +-- THREAD.kill() +-- end) + +-- multi:mainloop() +-- end) +-- local console = THREAD.getConsole() +-- conn(function(a,b,c) +-- console.print("Mainloop conn got triggered!",a,b,c) +-- end) + +-- alarm = multi:newAlarm(1) +-- alarm:OnRing(function() +-- console.print("Fire 1!!!") +-- conn:Fire(1,2,3) +-- end) + +-- alarm = multi:newAlarm(3):OnRing(function() +-- multi:newSystemThread("Thread_Test_3",function() +-- local multi, thread = require("multi"):init() +-- local conn = GLOBAL["conn"]:init() +-- local console = THREAD.getConsole() +-- conn(function(a,b,c) +-- console.print(THREAD:getName().." was triggered!",a,b,c) +-- end) +-- multi:newAlarm(4):OnRing(function() +-- console.print("Fire 3!!!") +-- conn:Fire(7,8,9) +-- end) +-- multi:mainloop() +-- end) +-- end) + +-- multi:newSystemThread("Thread_Test_4",function() +-- local multi, thread = require("multi"):init() +-- local conn = GLOBAL["conn"]:init() +-- local conn2 = multi:newConnection() +-- local console = THREAD.getConsole() +-- multi:newAlarm(2):OnRing(function() +-- conn2:Fire() +-- end) +-- multi:newThread(function() +-- console.print("Conn Test!") +-- thread.hold(conn + conn2) +-- console.print("It held!") +-- end) +-- multi:mainloop() +-- end) multi:mainloop() \ No newline at end of file