--[[ MIT License Copyright (c) 2019 Ryan Ward Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sub-license, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] local multi = require("multi") local net = require("net") local bin = require("bin") bin.setBitsInterface(infinabits) -- the bits interface does not work so well, another bug to fix -- Commands that the master and node will respect, max of 256 commands local CMD_ERROR = 0x00 local CMD_PING = 0x01 local CMD_PONG = 0x02 local CMD_QUEUE = 0x03 local CMD_TASK = 0x04 local CMD_INITNODE = 0x05 local CMD_INITMASTER = 0x06 local CMD_GLOBAL = 0x07 local CMD_LOAD = 0x08 local CMD_CALL = 0x09 local CMD_REG = 0x0A local CMD_CONSOLE = 0x0B local char = string.char local byte = string.byte -- Process to hold all of the networkManager's muilt objects -- Helper for piecing commands local function pieceCommand(cmd,...) local tab = {...} table.insert(tab,1,cmd) return table.concat(tab) end -- Internal queue system for network queues local Queue = {} Queue.__index = Queue function Queue:newQueue() local c = {} setmetatable(c,self) return c end function Queue:push(data) table.insert(self,data) end function Queue:raw_push(data) -- Internal usage only table.insert(self,data) end function Queue:pop() return table.remove(self,1) end function Queue:peek() return self[1] end local queues = {} multi.OnNetQueue = multi:newConnection() multi.OnGUpdate = multi:newConnection() -- Managing the data that goes through the system local function packData(data) -- returns the data that was sterilized local dat = bin.new() dat:addBlock(#type(data),1) dat:addBlock(type(data)) -- The type is first if type(data)=="table" then dat:addBlock(data,nil,"t") elseif type(data) == "userdata" then error("Cannot sterilize userdata!") elseif type(data) == "number" then dat:addBlock(data,nil,"d") elseif type(data) == "string" then dat:addBlock(#data,4) dat:addBlock(data,nil,"s") elseif type(data) == "boolean" then dat:addBlock(data,1) elseif type(data) == "function" then dat:addBlock(data,nil,"f") end return dat.data end local function resolveData(data) -- returns the data that was sterilized local dat = bin.new(data) local tlen = dat:getBlock("n",1) local tp = dat:getBlock("s",tlen) if tp=="table" then return dat:getBlock("t") elseif tp=="number" then return dat:getBlock("d") elseif tp=="string" then local num = dat:getBlock("n",4) return dat:getBlock("s",num) elseif tp=="boolean" then return dat:getBlock("b",1) elseif tp=="function" then return dat:getBlock("f") end end -- internal global system local GLOBAL = {} local PROXY = {} setmetatable(GLOBAL,{ __index = function(t,k) return PROXY[k] end, __newindex = function(t,k,v) local v = v PROXY[k] = v multi.OnGUpdate:Fire(k,packData(v)) end }) -- In case you are unable to use broadcasting this can be used to help connect to nodes function multi:nodeManager(port) if not port then error("You must provide a port in order to host the node manager!") end local server = net:newTCPServer(port) server.nodes = {} server.timeouts = {} server.OnNodeAdded = multi:newConnection() server.OnNodeRemoved = multi:newConnection() server.OnDataRecieved(function(server,data,cid,ip,port) local cmd = data:sub(1,1) if cmd == "R" then multi:newThread("Node Client Manager",function(loop) while true do if server.timeouts[cid]==true then server.OnNodeRemoved:Fire(server.nodes[cid]) server.nodes[cid] = nil server.timeouts[cid] = nil thread.kill() else server.timeouts[cid] = true server:send(cid,"ping") end thread.sleep(1) end end) server.nodes[cid]=data:sub(2,-1) server.OnNodeAdded:Fire(server.nodes[cid]) elseif cmd == "G" then server.OnNodeAdded(function(node) server:send(cid,node) end) server.OnNodeRemoved(function(node) server:send(cid,"R"..node:match("(.-)|")) end) for i,v in pairs(server.nodes) do server:send(cid,v) end elseif cmd == "P" then server.timeouts[cid] = nil end end) end -- The main driving force of the network manager: Nodes function multi:newNode(settings) multi:enableLoadDetection() settings = settings or {} -- Here we have to use the net library to broadcast our node across the network math.randomseed(os.time()) local name = settings.name or multi.randomString(8) local node = {} node.name = name multi.OnError(function(i,error) node.OnError:Fire(node,error,node.server) end) node.server = net:newUDPServer(0) -- hosts the node using the default port _, node.port = node.server.udp:getsockname() node.connections = net.ClientCache node.queue = Queue:newQueue() node.functions = bin.stream("RegisteredFunctions.dat",false) node.hasFuncs = {} node.OnError = multi:newConnection() node.OnError(function(node,err,master) multi.print("ERROR",err,node.name) local temp = bin.new() temp:addBlock(#node.name,2) temp:addBlock(node.name) temp:addBlock(#err,2) temp:addBlock(err) for i,v in pairs(node.connections) do multi.print(i) v[1]:send(v[2],char(CMD_ERROR)..temp.data,v[3]) end end) if settings.managerDetails then local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) if not c then multi.print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false else c.OnDataRecieved(function(self,data) if data == "ping" then self:send("P") end end) c:send("RNODE_"..name.."|"..net.getLocalIP().."|"..node.port) end end if not settings.preload then if node.functions:getSize()~=0 then multi.print("We have function(s) to preload!") local len = node.functions:getBlock("n",1) local name,func while len do name = node.functions:getBlock("s",len) len = node.functions:getBlock("n",2) func = node.functions:getBlock("s",len) len = node.functions:read(1) _G[name]=resolveData(func) node.hasFuncs[name]=true if not len then break end len = byte(len) end end end function node:pushTo(name,data) node:sendTo(name,char(CMD_QUEUE)..packData(data)) end function node:peek() return node.queue:peek() end function node:pop() return node.queue:pop() end function node:getConsole() local c = {} local conn = node.connections function c.print(...) local data = char(CMD_CONSOLE)..packData({...}) for i,v in pairs(conn) do --print(i) v[1]:send(v[2],data,v[3]) end end -- function c:printTo() -- end return c end node.loadRate=1 -- Lets tell the network we are alive! node.server.OnDataRecieved(function(server,data,cid,ip,port) local cmd = byte(data:sub(1,1)) -- the first byte is the command local dat = data:sub(2,-1) -- the data that you want to read if cmd == CMD_PING then server:send(ip,char(CMD_PONG),port) elseif cmd == CMD_QUEUE then node.queue:push(resolveData(dat)) elseif cmd == CMD_REG then if not settings.allowRemoteRegistering then multi.print(ip..": has attempted to register a function when it is currently not allowed!") return end local temp = bin.new(dat) local len = temp:getBlock("n",1) local name = temp:getBlock("s",len) if node.hasFuncs[name] then multi.print("Function already preloaded onto the node!") return end len = temp:getBlock("n",2) local func = temp:getBlock("s",len) _G[name]=resolveData(func) node.functions:addBlock(dat) elseif cmd == CMD_CALL then local temp = bin.new(dat) local len = temp:getBlock("n",1) local name = temp:getBlock("s",len) len = temp:getBlock("n",4) local args = temp:getBlock("s",len) _G[name](unpack(resolveData(args))) elseif cmd == CMD_TASK then local holder = bin.new(dat) local len = holder:getBlock("n",4) local args = holder:getBlock("s",len) local len2 = holder:getBlock("n",4) local func = holder:getBlock("s",len2) args = resolveData(args) func = resolveData(func) status, err = pcall(func,node,unpack(args)) if not status then node.OnError:Fire(node,err,server) end elseif cmd == CMD_INITNODE then multi.print("Connected with another node!") node.connections[dat]={server,ip,port} multi.OnGUpdate(function(k,v) server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port) end)-- set this up elseif cmd == CMD_INITMASTER then multi.print("Connected to the master!",dat) node.connections[dat]={server,ip,port} multi.OnGUpdate(function(k,v) server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port) end)-- set this up multi:newTLoop(function() server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port) end,node.loadRate) server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port) server:send(ip,char(CMD_INITNODE)..node.name,port) elseif cmd == CMD_GLOBAL then local k,v = dat:match("(.-)|(.+)") PROXY[k]=resolveData(v) end end) function node:sendTo(name,data) local conn = node.connections[name] conn[1]:send(conn[2],data,conn[3]) end if not settings.noBroadCast then node.server:broadcast("NODE_"..name) end return node end -- Masters function multi:newMaster(settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code. local master = {} local settings = settings or {} master.name = settings.name or multi.randomString(8) local name = master.name master.conn = multi:newConnection() master.conn2 = multi:newConnection() master.OnFirstNodeConnected = multi:newConnection() master.OnNodeConnected = multi:newConnection() master.OnError = multi:newConnection() master.queue = Queue:newQueue() master.connections = net.ClientCache -- Link to the client cache that is created on the net interface master.loads = {} master.timeouts = {} master.trigger = multi:newFunction(function(self,node) master.OnFirstNodeConnected:Fire(node) self:Pause() end) if settings.managerDetails then local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) if not client then multi.print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false else client.OnDataRecieved(function(client,data) local cmd = data:sub(1,1) if cmd == "N" then print(data) local name,ip,port = data:match("(.-)|(.-)|(.+)") local c = net:newUDPClient(ip,port) net.OnCastedClientInfo:Fire(c,name,ip,port)master.connections[name]=c elseif cmd == "R" then local name = data:sub(2,-1) master.connections[name]=nil end end) client:send("G") -- init your connection as a master end end function master:doToAll(func) for i,v in pairs(master.connections) do func(i,v) end end function master:register(name,node,func) if not node then error("You must specify a node to execute a command on!") end local temp = bin.new() local fData = packData(func) temp:addBlock(CMD_REG,1) temp:addBlock(#name,1) temp:addBlock(name,#name) temp:addBlock(#fData,2) temp:addBlock(fData,#fData) master:sendTo(node,temp.data) end function master:execute(name,node,...) if not node then error("You must specify a node to execute a command on!") end if not name then error("You must specify a function name to call on the node!") end local args = packData{...} local name = name local node = node local temp, len, data temp = bin.new() temp:addBlock(CMD_CALL,1) temp:addBlock(#name,1) temp:addBlock(name,#name) temp:addBlock(#args,4) temp:addBlock(args,#args) master:sendTo(node,temp.data) end function master:pushTo(name,data) master:sendTo(name,char(CMD_QUEUE)..packData(data)) end function master:peek() return self.queue:peek() end function master:pop() return self.queue:pop() end function master:newNetworkThread(tname,func,name,...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job local fData = packData(func) local tab = {...} local aData = "" if #tab~=o then aData = (packData{...}) else aData = (packData{"NO","ARGS"}) end local temp = bin.new() temp:addBlock(#aData,4) local len = temp.data local temp2 = bin.new() temp2:addBlock(#fData,4) local len2 = temp2.data if not name then local name = self:getFreeNode() if not name then name = self:getRandomNode() end if name==nil then multi:newEvent(function() return name~=nil end):OnEvent(function(evnt) self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData) evnt:Destroy() end):SetName("DelayedSendTask"):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(function(self) self:Destroy() end) else self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData) end else local name = "NODE_"..name self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData) end end function master:sendTo(name,data) if name:sub(1,5)~="NODE_" then name = "NODE_"..name end if self.connections[name]==nil then multi:newEvent(function() return self.connections[name]~=nil end):OnEvent(function(evnt) self.connections[name]:send(data) evnt:Destroy() end):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(function(self) self:Destroy() end) else self.connections[name]:send(data) end end function master:getFreeNode() local count = 0 local min = math.huge local refO for i,v in pairs(master.loads) do if v