From 138d61de85049efa90d4c8594259e68cf9db7814 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sun, 24 Jun 2018 21:59:38 -0400 Subject: [PATCH] awesome stuff being added Almost done with the core features that I want added to this library --- RegisteredFunctions.dat | Bin 0 -> 105 bytes multi/integration/networkManager.lua | 218 ++++++++++++++++++++++++--- multi/integration/shared.lua | 10 +- node.lua | 12 +- nodeManager.lua | 11 ++ rockspecs/multi-2.0-0.rockspec | 32 ++++ test.lua | 47 +++--- 7 files changed, 281 insertions(+), 49 deletions(-) create mode 100644 RegisteredFunctions.dat create mode 100644 nodeManager.lua create mode 100644 rockspecs/multi-2.0-0.rockspec diff --git a/RegisteredFunctions.dat b/RegisteredFunctions.dat new file mode 100644 index 0000000000000000000000000000000000000000..4a297b35a5c74fab5a4adcc8dddc3beece477400 GIT binary patch literal 105 zcmd-GNi8mME6q!eW8g>wQYD%Bd4UWJ4AMScObnb3B|urdoYF)G21XVJCI&WMekL_CPr@tMh1S*5{2^oqU>TT1+Idk%)Amt5MatJPG?|XXJ7yT Dxw#Z3 literal 0 HcmV?d00001 diff --git a/multi/integration/networkManager.lua b/multi/integration/networkManager.lua index 9cb84a9..3ffdb9c 100644 --- a/multi/integration/networkManager.lua +++ b/multi/integration/networkManager.lua @@ -2,7 +2,8 @@ local multi = require("multi") local net = require("net") -local bin = require("bin") +require("bin") +bin.setBitsInterface(infinabits) -- Commands that the master and node will respect, max of 256 commands local CMD_ERROR = 0x00 @@ -15,6 +16,7 @@ local CMD_INITMASTER = 0x06 local CMD_GLOBAL = 0x07 local CMD_LOAD = 0x08 local CMD_CALL = 0x09 +local CMD_REG = 0x0A local char = string.char local byte = string.byte @@ -105,17 +107,91 @@ setmetatable(GLOBAL,{ end }) +-- In case you are unable to use broadcasting this can be used to help connect to nodes +function multi:nodeManager(port) + if not port then + error("You must provide a port in order to host the node manager!") + end + local server = net:newTCPServer(port) + server.nodes = {} + server.timeouts = {} + server.OnNodeAdded = multi:newConnection() + server.OnNodeRemoved = multi:newConnection() + server.OnDataRecieved(function(server,data,cid,ip,port) + local cmd = data:sub(1,1) + if cmd == "R" then + multi:newTLoop(function(loop) + if server.timeouts[cid]==true then + server.OnNodeRemoved:Fire(server.nodes[cid]) + server.nodes[cid] = nil + server.timeouts[cid] = nil + loop:Destroy() + return + end + server.timeouts[cid] = true + server:send(cid,"ping") + end,3) + server.nodes[cid]=data:sub(2,-1) + server.OnNodeAdded:Fire(server.nodes[cid]) + elseif cmd == "G" then + server.OnNodeAdded(function(node) + server:send(cid,node) + end) + server.OnNodeRemoved(function(node) + server:send(cid,"R"..node:match("(.-)|")) + end) + for i,v in pairs(server.nodes) do + server:send(cid,v) + end + elseif cmd == "P" then + server.timeouts[cid] = nil + end + end) +end -- The main driving force of the network manager: Nodes -function multi:newNode(name,settings) +function multi:newNode(settings) + settings = settings or {} -- Here we have to use the net library to broadcast our node across the network math.randomseed(os.time()) - local name = name or multi.randomString(8) + local name = settings.name or multi.randomString(8) local node = {} node.name = name node.server = net:newUDPServer(0) -- hosts the node using the default port node.port = node.server.port node.connections = net.ClientCache node.queue = queue:newQueue() + node.functions = bin.stream("RegisteredFunctions.dat",false) + node.hasFuncs = {} + if settings.managerDetails then + local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) + if not c then + print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false + else + c.OnDataRecieved(function(self,data) + if data == "ping" then + self:send("P") + end + end) + c:send("RNODE_"..name.."|"..net.getLocalIP().."|"..node.port) + end + end + if not settings.preload then + if node.functions:getSize()~=0 then + print("We have function(s) to preload!") + local len = node.functions:getBlock("n",1) + local name,func + while len do + name = node.functions:getBlock("s",len) + len = node.functions:getBlock("n",2) + func = node.functions:getBlock("s",len) + len = node.functions:read(1) + _G[name]=resolveData(func) + node.hasFuncs[name]=true + if not len then break end + len = byte(len) + end + end + end node.loadRate=1 if settings then if settings.crossTalk then @@ -158,10 +234,37 @@ function multi:newNode(name,settings) elseif cmd == CMD_QUEUE then local name,d=dat:match("(.-)|(.+)") multi.OnNetQueue:Fire(name,d) + elseif cmd == CMD_REG then + if not settings.allowRemoteRegistering then + print(ip..": has attempted to register a function when it is currently not allowed!") + return + end + local temp = bin.new(dat) + local len = temp:getBlock("n",1) + local name = temp:getBlock("s",len) + if node.hasFuncs[name] then + print("Function already preloaded onto the node!") + return + end + len = temp:getBlock("n",2) + local func = temp:getBlock("s",len) + _G[name]=resolveData(func) + node.functions:addBlock(dat) + elseif cmd == CMD_CALL then + local temp = bin.new(dat) + local len = temp:getBlock("n",1) + local name = temp:getBlock("s",len) + len = temp:getBlock("n",2) + local args = temp:getBlock("s",len) + _G[name](unpack(resolveData(args))) elseif cmd == CMD_TASK then - local args,func = dat:match("(.-)|(.+)") - func = resolveData(func) + local holder = bin.new(dat) + local len = holder:getBlock("n",4) + local args = holder:getBlock("s",len) + local len2 = holder:getBlock("n",4) + local func = holder:getBlock("s",len2) args = resolveData(args) + func = resolveData(func) func(unpack(args)) elseif cmd == CMD_INITNODE then print("Connected with another node!") @@ -190,33 +293,100 @@ function multi:newNode(name,settings) function node:sendToNode(name,data) self.connections[name]:send(data) end - node.server:broadcast("NODE_"..name) + if not settings.noBroadCast then + node.server:broadcast("NODE_"..name) + end return node end -- Masters -function multi:newMaster(name,settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code. +function multi:newMaster(settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code. local master = {} - master.name = name + local settings = settings or {} + master.name = settings.name or multi.randomString(8) + local name = master.name master.conn = multi:newConnection() master.conn2 = multi:newConnection() master.OnFirstNodeConnected = multi:newConnection() master.queue = queue:newQueue() master.connections = net.ClientCache -- Link to the client cache that is created on the net interface master.loads = {} - master.trigger = multi:newFunction(function(self) - master.OnFirstNodeConnected:Fire() + master.trigger = multi:newFunction(function(self,node) + master.OnFirstNodeConnected:Fire(node) self:Pause() end) - function master:newNetworkThread(tname,name,func,...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job + if settings.managerDetails then + local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) + if not client then + print("Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false + else + client.OnDataRecieved(function(client,data) + local cmd = data:sub(1,1) + if cmd == "N" then + local name,ip,port = data:match("(.-)|(.-)|(.+)") + local c = net:newUDPClient(ip,port) + master.connections[name]=c + net.OnCastedClientInfo:Fire(c,name,ip,port) + elseif cmd == "R" then + local name = data:sub(2,-1) + master.connections[name]=nil + end + end) + client:send("G") -- init your connection as a master + end + end + function master:doToAll(func) + for i,v in pairs(master.connections) do + func(i,v) + end + end + function master:register(name,node,func) + if not node then + error("You must specify a node to execute a command on!") + end + local temp = bin.new() + local fData = packData(func) + temp:addBlock(CMD_REG,1) + temp:addBlock(#name,1) + temp:addBlock(name,#name) + temp:addBlock(#fData,2) + temp:addBlock(fData,#fData) + master:sendTo(node,temp.data) + end + function master:execute(name,node,...) + if not node then + error("You must specify a node to execute a command on!") + end + if not name then + error("You must specify a function name to call on the node!") + end + local args = packData{...} + local name = name + local node = node + local temp, len, data + temp = bin.new() + temp:addBlock(CMD_CALL,1) + temp:addBlock(#name,1) + temp:addBlock(name,#name) + temp:addBlock(#args,2) + temp:addBlock(args,#args) + master:sendTo(node,temp.data) + end + function master:newNetworkThread(tname,func,name,...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job local fData = packData(func) local tab = {...} local aData = "" if #tab~=o then - aData = (packData{...}).."|" + aData = (packData{...}) else - aData = (packData{1,1}).."|" + aData = (packData{"NO","ARGS"}) end + local temp = bin.new() + temp:addBlock(#aData,4) + local len = temp.data + local temp2 = bin.new() + temp2:addBlock(#fData,4) + local len2 = temp2.data if not name then local name = self:getFreeNode() if not name then @@ -225,20 +395,22 @@ function multi:newMaster(name,settings) -- You will be able to have more than on if name==nil then multi:newTLoop(function(loop) if name~=nil then - print("Readying Func") - self:sendTo(name,char(CMD_TASK)..aData..fData) + self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData) loop:Desrtoy() end end,.1) else - self:sendTo(name,char(CMD_TASK)..aData..fData) + self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData) end else local name = "NODE_"..name - self:sendTo(name,char(CMD_TASK)..aData..fData) + self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData) end end function master:sendTo(name,data) + if name:sub(1,5)=="NODE_" then + name = name:sub(6,-1) + end if self.connections["NODE_"..name]==nil then multi:newTLoop(function(loop) if self.connections["NODE_"..name]~=nil then @@ -273,16 +445,16 @@ function multi:newMaster(name,settings) -- You will be able to have more than on multi.OnGUpdate(function(k,v) client:send(table.concat{char(CMD_GLOBAL),k,"|",v}) end) - print("Found a new node! Node_List:") + local nodename for i,v in pairs(master.connections) do - print(i) + nodename = i end client.OnClientReady(function() client:send(char(CMD_INITMASTER)..name) -- Tell the node that you are a master trying to connect client.OnDataRecieved(function(client,data) local cmd = byte(data:sub(1,1)) -- the first byte is the command local dat = data:sub(2,-1) -- the data that you want to read - master.trigger() + master.trigger(nodename) if cmd == CMD_ERROR then elseif cmd == CMD_PING then @@ -295,7 +467,6 @@ function multi:newMaster(name,settings) -- You will be able to have more than on elseif cmd == CMD_GLOBAL then local k,v = dat:match("(.-)|(.+)") PROXY[k]=resolveData(v) - print("Got Global Command") elseif cmd == CMD_LOAD then local name,load = dat:match("(.-)|(.+)") master.loads[name]=tonumber(load) @@ -303,11 +474,14 @@ function multi:newMaster(name,settings) -- You will be able to have more than on end) end) end) - net:newCastedClients("NODE_(.+)") -- Searches for nodes and connects to them, the master.clients table will contain them by name + if not settings.noBroadCast then + net:newCastedClients("NODE_(.+)") -- Searches for nodes and connects to them, the master.clients table will contain them by name + end return master end -- The init function that gets returned +print("Integrated Network Parallelism") return {init = function() return GLOBAL end} \ No newline at end of file diff --git a/multi/integration/shared.lua b/multi/integration/shared.lua index 9e5116c..5a8b8bd 100644 --- a/multi/integration/shared.lua +++ b/multi/integration/shared.lua @@ -119,7 +119,7 @@ function multi:newSystemThreadedConnection(name,protect) local sThread=multi.integration.THREAD local GLOBAL=multi.integration.GLOBAL function c:init() - require("multi") + local multi = require("multi") if multi:getPlatform()=="love2d" then GLOBAL=_G.GLOBAL sThread=_G.sThread @@ -134,7 +134,7 @@ function multi:newSystemThreadedConnection(name,protect) conn.queueCall = multi:newSystemThreadedQueue(self.name.."THREADED_CALLQ"):init() end else - require("multi") -- so things don't break, but also allows bi-directional connections to work + local multi = require("multi") -- so things don't break, but also allows bi-directional connections to work conn.queueCall = sThread.waitFor(self.name.."THREADED_CALLQ"):init() end setmetatable(conn,{__call=function(self,...) return self:connect(...) end}) @@ -216,7 +216,7 @@ function multi:systemThreadedBenchmark(n,p) local GLOBAL=multi.integration.GLOBAL for i=1,cores do multi:newSystemThread("STHREAD_BENCH",function() - require("multi") + local multi = require("multi") if multi:getPlatform()=="love2d" then GLOBAL=_G.GLOBAL sThread=_G.sThread @@ -257,7 +257,7 @@ function multi:newSystemThreadedConsole(name) local sThread=multi.integration.THREAD local GLOBAL=multi.integration.GLOBAL function c:init() - require("multi") + local multi = require("multi") if multi:getPlatform()=="love2d" then GLOBAL=_G.GLOBAL sThread=_G.sThread @@ -381,7 +381,7 @@ function multi:newSystemThreadedJobQueue(numOfCores) GLOBAL["__JQ_COUNT__"]=c.cores for i=1,c.cores do multi:newSystemThread("System Threaded Job Queue Worker Thread #"..i,function(name,ind) - require("multi") + local multi = require("multi") ThreadName=name __sleep__=.001 if love then -- lets make sure we don't reference up-values if using love2d diff --git a/node.lua b/node.lua index 423f0f7..1a63439 100644 --- a/node.lua +++ b/node.lua @@ -2,9 +2,15 @@ package.path="?/init.lua;?.lua;"..package.path multi = require("multi") local GLOBAL, THREAD = require("multi.integration.lanesManager").init() nGLOBAL = require("multi.integration.networkManager").init() -master = multi:newNode() -function RemoteTest() - print("Yes I work!") +master = multi:newNode{ + crossTalk = false, -- default value + allowRemoteRegistering = true, + name = nil, -- default value + noBroadCast = true, + managerDetails = {"localhost",12345}, -- connects to the node manager if one exists +} +function RemoteTest(a,b,c) + print("Yes I work!",a,b,c) end settings = { priority = 0, -- 1 or 2 diff --git a/nodeManager.lua b/nodeManager.lua new file mode 100644 index 0000000..d520b0c --- /dev/null +++ b/nodeManager.lua @@ -0,0 +1,11 @@ +package.path="?/init.lua;?.lua;"..package.path +multi = require("multi") +local GLOBAL, THREAD = require("multi.integration.lanesManager").init() +nGLOBAL = require("multi.integration.networkManager").init() +multi:nodeManager(12345) +print("Node Manager Running...") +settings = { + priority = 0, -- 1 or 2 + protect = false, +} +multi:mainloop(settings) diff --git a/rockspecs/multi-2.0-0.rockspec b/rockspecs/multi-2.0-0.rockspec new file mode 100644 index 0000000..c7c9f3d --- /dev/null +++ b/rockspecs/multi-2.0-0.rockspec @@ -0,0 +1,32 @@ +package = "multi" +version = "1.11.0" +source = { + url = "git://github.com/rayaman/multi.git", + tag = "v1.11.0", +} +description = { + summary = "Lua Multi tasking library", + detailed = [[ + This library contains many methods for multi tasking. From simple side by side code using multi-objs, to using coroutine based Threads and System threads(When you have lua lanes installed or are using love2d) + ]], + homepage = "https://github.com/rayaman/multi", + license = "MIT" +} +dependencies = { + "lua >= 5.1", + "bin", + "lanes" +} +build = { + type = "builtin", + modules = { + ["multi.init"] = "multi/init.lua", + ["multi.all"] = "multi/all.lua", + ["multi.compat.backwards[1,5,0]"] = "multi/compat/backwards[1,5,0].lua", + ["multi.compat.love2d"] = "multi/compat/love2d.lua", + ["multi.integration.lanesManager"] = "multi/integration/lanesManager.lua", + ["multi.integration.loveManager"] = "multi/integration/loveManager.lua", + ["multi.integration.luvitManager"] = "multi/integration/luvitManager.lua", + ["multi.integration.shared"] = "multi/integration/shared.lua" + } +} \ No newline at end of file diff --git a/test.lua b/test.lua index c178a76..26bacc0 100644 --- a/test.lua +++ b/test.lua @@ -4,31 +4,40 @@ package.path="?/init.lua;?.lua;"..package.path multi = require("multi") local GLOBAL, THREAD = require("multi.integration.lanesManager").init() nGLOBAL = require("multi.integration.networkManager").init() --- Run the code -master = multi:newMaster("Main") +-- Act as a master node +master = multi:newMaster{ + pollManagerRate = 15, + name = "Main", + noBroadCast = true, + managerDetails = {"localhost",12345}, +} -- Starting the multitasker settings = { - priority = 0, -- 1 or 2 + priority = 0, -- 0, 1 or 2 protect = false, } -master.OnFirstNodeConnected(function() - print("Node connected lets go!") - master:newNetworkThread("Test_Thread",nil,function() - RemoteTest() - multi:newThread("test",function() - nGLOBAL["test"]="Did it work?" +--~ master.OnFirstNodeConnected(function(node_name) +multi:newAlarm(3):OnRing(function(alarm) + master:doToAll(function(node_name) + master:register("TestFunc",node_name,function(msg) + print("It works: "..msg) end) - end) - multi:newThread("Checker",function() - while true do - thread.sleep(.5) - if nGLOBAL["test"] then - print(nGLOBAL["test"]) - thread.kill() + multi:newAlarm(2):OnRing(function(alarm) + master:execute("TestFunc",node_name,"Hello!") + alarm:Destroy() + end) + multi:newThread("Checker",function() + while true do + thread.sleep(1) + if nGLOBAL["test"] then + print(nGLOBAL["test"]) + thread.kill() + end end - end + end) + nGLOBAL["test2"]={age=22} end) + alarm:Destroy() end) -os.execute("start lua node.lua") +--~ os.execute("start lua node.lua") multi:mainloop(settings) -