557 lines
16 KiB
Lua
557 lines
16 KiB
Lua
--[[
|
|
MIT License
|
|
|
|
Copyright (c) 2019 Ryan Ward
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"), to deal
|
|
in the Software without restriction, including without limitation the rights
|
|
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
|
|
copies of the Software, and to permit persons to whom the Software is
|
|
furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in all
|
|
copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
SOFTWARE.
|
|
]]
|
|
local multi = require("multi")
|
|
local net = require("net")
|
|
local bin = require("bin")
|
|
bin.setBitsInterface(infinabits) -- the bits interface does not work so well, another bug to fix
|
|
|
|
-- Commands that the master and node will respect, max of 256 commands
|
|
local CMD_ERROR = 0x00
|
|
local CMD_PING = 0x01
|
|
local CMD_PONG = 0x02
|
|
local CMD_QUEUE = 0x03
|
|
local CMD_TASK = 0x04
|
|
local CMD_INITNODE = 0x05
|
|
local CMD_INITMASTER = 0x06
|
|
local CMD_GLOBAL = 0x07
|
|
local CMD_LOAD = 0x08
|
|
local CMD_CALL = 0x09
|
|
local CMD_REG = 0x0A
|
|
local CMD_CONSOLE = 0x0B
|
|
|
|
local char = string.char
|
|
local byte = string.byte
|
|
-- Process to hold all of the networkManager's muilt objects
|
|
|
|
-- Helper for piecing commands
|
|
local function pieceCommand(cmd,...)
|
|
local tab = {...}
|
|
table.insert(tab,1,cmd)
|
|
return table.concat(tab)
|
|
end
|
|
|
|
-- Internal queue system for network queues
|
|
local Queue = {}
|
|
Queue.__index = Queue
|
|
function Queue:newQueue()
|
|
local c = {}
|
|
setmetatable(c,self)
|
|
return c
|
|
end
|
|
function Queue:push(data)
|
|
table.insert(self,data)
|
|
end
|
|
function Queue:raw_push(data) -- Internal usage only
|
|
table.insert(self,data)
|
|
end
|
|
function Queue:pop()
|
|
return table.remove(self,1)
|
|
end
|
|
function Queue:peek()
|
|
return self[1]
|
|
end
|
|
local queues = {}
|
|
multi.OnNetQueue = multi:newConnection()
|
|
multi.OnGUpdate = multi:newConnection()
|
|
|
|
-- Managing the data that goes through the system
|
|
local function packData(data)
|
|
-- returns the data that was sterilized
|
|
local dat = bin.new()
|
|
dat:addBlock(#type(data),1)
|
|
dat:addBlock(type(data)) -- The type is first
|
|
if type(data)=="table" then
|
|
dat:addBlock(data,nil,"t")
|
|
elseif type(data) == "userdata" then
|
|
error("Cannot sterilize userdata!")
|
|
elseif type(data) == "number" then
|
|
dat:addBlock(data,nil,"d")
|
|
elseif type(data) == "string" then
|
|
dat:addBlock(#data,4)
|
|
dat:addBlock(data,nil,"s")
|
|
elseif type(data) == "boolean" then
|
|
dat:addBlock(data,1)
|
|
elseif type(data) == "function" then
|
|
dat:addBlock(data,nil,"f")
|
|
end
|
|
return dat.data
|
|
end
|
|
local function resolveData(data)
|
|
-- returns the data that was sterilized
|
|
local dat = bin.new(data)
|
|
local tlen = dat:getBlock("n",1)
|
|
local tp = dat:getBlock("s",tlen)
|
|
if tp=="table" then
|
|
return dat:getBlock("t")
|
|
elseif tp=="number" then
|
|
return dat:getBlock("d")
|
|
elseif tp=="string" then
|
|
local num = dat:getBlock("n",4)
|
|
return dat:getBlock("s",num)
|
|
elseif tp=="boolean" then
|
|
return dat:getBlock("b",1)
|
|
elseif tp=="function" then
|
|
return dat:getBlock("f")
|
|
end
|
|
end
|
|
|
|
-- internal global system
|
|
local GLOBAL = {}
|
|
local PROXY = {}
|
|
setmetatable(GLOBAL,{
|
|
__index = function(t,k)
|
|
return PROXY[k]
|
|
end,
|
|
__newindex = function(t,k,v)
|
|
local v = v
|
|
PROXY[k] = v
|
|
multi.OnGUpdate:Fire(k,packData(v))
|
|
end
|
|
})
|
|
|
|
-- In case you are unable to use broadcasting this can be used to help connect to nodes
|
|
function multi:nodeManager(port)
|
|
if not port then
|
|
error("You must provide a port in order to host the node manager!")
|
|
end
|
|
local server = net:newTCPServer(port)
|
|
server.nodes = {}
|
|
server.timeouts = {}
|
|
server.OnNodeAdded = multi:newConnection()
|
|
server.OnNodeRemoved = multi:newConnection()
|
|
server.OnDataRecieved(function(server,data,cid,ip,port)
|
|
local cmd = data:sub(1,1)
|
|
if cmd == "R" then
|
|
multi:newThread("Node Client Manager",function(loop)
|
|
while true do
|
|
if server.timeouts[cid]==true then
|
|
server.OnNodeRemoved:Fire(server.nodes[cid])
|
|
server.nodes[cid] = nil
|
|
server.timeouts[cid] = nil
|
|
thread.kill()
|
|
else
|
|
server.timeouts[cid] = true
|
|
server:send(cid,"ping")
|
|
end
|
|
thread.sleep(1)
|
|
end
|
|
end)
|
|
server.nodes[cid]=data:sub(2,-1)
|
|
server.OnNodeAdded:Fire(server.nodes[cid])
|
|
elseif cmd == "G" then
|
|
server.OnNodeAdded(function(node)
|
|
server:send(cid,node)
|
|
end)
|
|
server.OnNodeRemoved(function(node)
|
|
server:send(cid,"R"..node:match("(.-)|"))
|
|
end)
|
|
for i,v in pairs(server.nodes) do
|
|
server:send(cid,v)
|
|
end
|
|
elseif cmd == "P" then
|
|
server.timeouts[cid] = nil
|
|
end
|
|
end)
|
|
end
|
|
-- The main driving force of the network manager: Nodes
|
|
function multi:newNode(settings)
|
|
multi:enableLoadDetection()
|
|
settings = settings or {}
|
|
-- Here we have to use the net library to broadcast our node across the network
|
|
math.randomseed(os.time())
|
|
local name = settings.name or multi.randomString(8)
|
|
local node = {}
|
|
node.name = name
|
|
multi.OnError(function(i,error)
|
|
node.OnError:Fire(node,error,node.server)
|
|
end)
|
|
node.server = net:newUDPServer(0) -- hosts the node using the default port
|
|
_, node.port = node.server.udp:getsockname()
|
|
node.connections = net.ClientCache
|
|
node.queue = Queue:newQueue()
|
|
node.functions = bin.stream("RegisteredFunctions.dat",false)
|
|
node.hasFuncs = {}
|
|
node.OnError = multi:newConnection()
|
|
node.OnError(function(node,err,master)
|
|
multi.print("ERROR",err,node.name)
|
|
local temp = bin.new()
|
|
temp:addBlock(#node.name,2)
|
|
temp:addBlock(node.name)
|
|
temp:addBlock(#err,2)
|
|
temp:addBlock(err)
|
|
for i,v in pairs(node.connections) do
|
|
multi.print(i)
|
|
v[1]:send(v[2],char(CMD_ERROR)..temp.data,v[3])
|
|
end
|
|
end)
|
|
if settings.managerDetails then
|
|
local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
|
|
if not c then
|
|
multi.print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false
|
|
else
|
|
c.OnDataRecieved(function(self,data)
|
|
if data == "ping" then
|
|
self:send("P")
|
|
end
|
|
end)
|
|
c:send("RNODE_"..name.."|"..net.getLocalIP().."|"..node.port)
|
|
end
|
|
end
|
|
if not settings.preload then
|
|
if node.functions:getSize()~=0 then
|
|
multi.print("We have function(s) to preload!")
|
|
local len = node.functions:getBlock("n",1)
|
|
local name,func
|
|
while len do
|
|
name = node.functions:getBlock("s",len)
|
|
len = node.functions:getBlock("n",2)
|
|
func = node.functions:getBlock("s",len)
|
|
len = node.functions:read(1)
|
|
_G[name]=resolveData(func)
|
|
node.hasFuncs[name]=true
|
|
if not len then break end
|
|
len = byte(len)
|
|
end
|
|
end
|
|
end
|
|
function node:pushTo(name,data)
|
|
node:sendTo(name,char(CMD_QUEUE)..packData(data))
|
|
end
|
|
function node:peek()
|
|
return node.queue:peek()
|
|
end
|
|
function node:pop()
|
|
return node.queue:pop()
|
|
end
|
|
function node:getConsole()
|
|
local c = {}
|
|
local conn = node.connections
|
|
function c.print(...)
|
|
local data = char(CMD_CONSOLE)..packData({...})
|
|
for i,v in pairs(conn) do
|
|
--print(i)
|
|
v[1]:send(v[2],data,v[3])
|
|
end
|
|
end
|
|
-- function c:printTo()
|
|
|
|
-- end
|
|
return c
|
|
end
|
|
node.loadRate=1
|
|
-- Lets tell the network we are alive!
|
|
node.server.OnDataRecieved(function(server,data,cid,ip,port)
|
|
local cmd = byte(data:sub(1,1)) -- the first byte is the command
|
|
local dat = data:sub(2,-1) -- the data that you want to read
|
|
if cmd == CMD_PING then
|
|
server:send(ip,char(CMD_PONG),port)
|
|
elseif cmd == CMD_QUEUE then
|
|
node.queue:push(resolveData(dat))
|
|
elseif cmd == CMD_REG then
|
|
if not settings.allowRemoteRegistering then
|
|
multi.print(ip..": has attempted to register a function when it is currently not allowed!")
|
|
return
|
|
end
|
|
local temp = bin.new(dat)
|
|
local len = temp:getBlock("n",1)
|
|
local name = temp:getBlock("s",len)
|
|
if node.hasFuncs[name] then
|
|
multi.print("Function already preloaded onto the node!")
|
|
return
|
|
end
|
|
len = temp:getBlock("n",2)
|
|
local func = temp:getBlock("s",len)
|
|
_G[name]=resolveData(func)
|
|
node.functions:addBlock(dat)
|
|
elseif cmd == CMD_CALL then
|
|
local temp = bin.new(dat)
|
|
local len = temp:getBlock("n",1)
|
|
local name = temp:getBlock("s",len)
|
|
len = temp:getBlock("n",4)
|
|
local args = temp:getBlock("s",len)
|
|
_G[name](unpack(resolveData(args)))
|
|
elseif cmd == CMD_TASK then
|
|
local holder = bin.new(dat)
|
|
local len = holder:getBlock("n",4)
|
|
local args = holder:getBlock("s",len)
|
|
local len2 = holder:getBlock("n",4)
|
|
local func = holder:getBlock("s",len2)
|
|
args = resolveData(args)
|
|
func = resolveData(func)
|
|
status, err = pcall(func,node,unpack(args))
|
|
if not status then
|
|
node.OnError:Fire(node,err,server)
|
|
end
|
|
elseif cmd == CMD_INITNODE then
|
|
multi.print("Connected with another node!")
|
|
node.connections[dat]={server,ip,port}
|
|
multi.OnGUpdate(function(k,v)
|
|
server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port)
|
|
end)-- set this up
|
|
elseif cmd == CMD_INITMASTER then
|
|
multi.print("Connected to the master!",dat)
|
|
node.connections[dat]={server,ip,port}
|
|
multi.OnGUpdate(function(k,v)
|
|
server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port)
|
|
end)-- set this up
|
|
multi:newTLoop(function()
|
|
server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port)
|
|
end,node.loadRate)
|
|
server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port)
|
|
server:send(ip,char(CMD_INITNODE)..node.name,port)
|
|
elseif cmd == CMD_GLOBAL then
|
|
local k,v = dat:match("(.-)|(.+)")
|
|
PROXY[k]=resolveData(v)
|
|
end
|
|
end)
|
|
function node:sendTo(name,data)
|
|
local conn = node.connections[name]
|
|
conn[1]:send(conn[2],data,conn[3])
|
|
end
|
|
if not settings.noBroadCast then
|
|
node.server:broadcast("NODE_"..name)
|
|
end
|
|
return node
|
|
end
|
|
|
|
-- Masters
|
|
function multi:newMaster(settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code.
|
|
local master = {}
|
|
local settings = settings or {}
|
|
master.name = settings.name or multi.randomString(8)
|
|
local name = master.name
|
|
master.conn = multi:newConnection()
|
|
master.conn2 = multi:newConnection()
|
|
master.OnFirstNodeConnected = multi:newConnection()
|
|
master.OnNodeConnected = multi:newConnection()
|
|
master.OnError = multi:newConnection()
|
|
master.queue = Queue:newQueue()
|
|
master.connections = net.ClientCache -- Link to the client cache that is created on the net interface
|
|
master.loads = {}
|
|
master.timeouts = {}
|
|
master.trigger = multi:newFunction(function(self,node)
|
|
master.OnFirstNodeConnected:Fire(node)
|
|
self:Pause()
|
|
end)
|
|
if settings.managerDetails then
|
|
local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
|
|
if not client then
|
|
multi.print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false
|
|
else
|
|
client.OnDataRecieved(function(client,data)
|
|
local cmd = data:sub(1,1)
|
|
if cmd == "N" then
|
|
print(data)
|
|
local name,ip,port = data:match("(.-)|(.-)|(.+)")
|
|
local c = net:newUDPClient(ip,port)
|
|
net.OnCastedClientInfo:Fire(c,name,ip,port)master.connections[name]=c
|
|
elseif cmd == "R" then
|
|
local name = data:sub(2,-1)
|
|
master.connections[name]=nil
|
|
end
|
|
end)
|
|
client:send("G") -- init your connection as a master
|
|
end
|
|
end
|
|
function master:doToAll(func)
|
|
for i,v in pairs(master.connections) do
|
|
func(i,v)
|
|
end
|
|
end
|
|
function master:register(name,node,func)
|
|
if not node then
|
|
error("You must specify a node to execute a command on!")
|
|
end
|
|
local temp = bin.new()
|
|
local fData = packData(func)
|
|
temp:addBlock(CMD_REG,1)
|
|
temp:addBlock(#name,1)
|
|
temp:addBlock(name,#name)
|
|
temp:addBlock(#fData,2)
|
|
temp:addBlock(fData,#fData)
|
|
master:sendTo(node,temp.data)
|
|
end
|
|
function master:execute(name,node,...)
|
|
if not node then
|
|
error("You must specify a node to execute a command on!")
|
|
end
|
|
if not name then
|
|
error("You must specify a function name to call on the node!")
|
|
end
|
|
local args = packData{...}
|
|
local name = name
|
|
local node = node
|
|
local temp, len, data
|
|
temp = bin.new()
|
|
temp:addBlock(CMD_CALL,1)
|
|
temp:addBlock(#name,1)
|
|
temp:addBlock(name,#name)
|
|
temp:addBlock(#args,4)
|
|
temp:addBlock(args,#args)
|
|
master:sendTo(node,temp.data)
|
|
end
|
|
function master:pushTo(name,data)
|
|
master:sendTo(name,char(CMD_QUEUE)..packData(data))
|
|
end
|
|
function master:peek()
|
|
return self.queue:peek()
|
|
end
|
|
function master:pop()
|
|
return self.queue:pop()
|
|
end
|
|
function master:newNetworkThread(tname,func,name,...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job
|
|
local fData = packData(func)
|
|
local tab = {...}
|
|
local aData = ""
|
|
if #tab~=o then
|
|
aData = (packData{...})
|
|
else
|
|
aData = (packData{"NO","ARGS"})
|
|
end
|
|
local temp = bin.new()
|
|
temp:addBlock(#aData,4)
|
|
local len = temp.data
|
|
local temp2 = bin.new()
|
|
temp2:addBlock(#fData,4)
|
|
local len2 = temp2.data
|
|
if not name then
|
|
local name = self:getFreeNode()
|
|
if not name then
|
|
name = self:getRandomNode()
|
|
end
|
|
if name==nil then
|
|
multi:newEvent(function() return name~=nil end):OnEvent(function(evnt)
|
|
self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData)
|
|
evnt:Destroy()
|
|
end):SetName("DelayedSendTask"):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(function(self)
|
|
self:Destroy()
|
|
end)
|
|
else
|
|
self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData)
|
|
end
|
|
else
|
|
local name = "NODE_"..name
|
|
self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData)
|
|
end
|
|
end
|
|
function master:sendTo(name,data)
|
|
if name:sub(1,5)~="NODE_" then
|
|
name = "NODE_"..name
|
|
end
|
|
if self.connections[name]==nil then
|
|
multi:newEvent(function() return self.connections[name]~=nil end):OnEvent(function(evnt)
|
|
self.connections[name]:send(data)
|
|
evnt:Destroy()
|
|
end):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(function(self)
|
|
self:Destroy()
|
|
end)
|
|
else
|
|
self.connections[name]:send(data)
|
|
end
|
|
end
|
|
function master:getFreeNode()
|
|
local count = 0
|
|
local min = math.huge
|
|
local refO
|
|
for i,v in pairs(master.loads) do
|
|
if v<min then
|
|
min = v
|
|
refO = i
|
|
end
|
|
end
|
|
return refO
|
|
end
|
|
function master:getRandomNode()
|
|
local list = {}
|
|
for i,v in pairs(master.connections) do
|
|
list[#list+1]=i:sub(6,-1)
|
|
end
|
|
return list[math.random(1,#list)]
|
|
end
|
|
net.OnCastedClientInfo(function(client,name,ip,port)
|
|
multi.OnGUpdate(function(k,v)
|
|
client:send(table.concat{char(CMD_GLOBAL),k,"|",v})
|
|
end)
|
|
local nodename
|
|
for i,v in pairs(master.connections) do
|
|
nodename = i
|
|
end
|
|
client.OnClientReady(function()
|
|
client:send(char(CMD_INITMASTER)..master.name) -- Tell the node that you are a master trying to connect
|
|
if not settings.managerDetails then
|
|
multi:newThread("Node Data Link Controller",function(loop)
|
|
while true do
|
|
if master.timeouts[name]==true then
|
|
master.timeouts[name] = nil
|
|
master.connections[name] = nil
|
|
thread.kill()
|
|
else
|
|
master.timeouts[name] = true
|
|
master:sendTo(name,char(CMD_PING))
|
|
end
|
|
thread.sleep(1)
|
|
end
|
|
end)
|
|
end
|
|
client.name = name
|
|
client.OnDataRecieved(function(client,data)
|
|
local cmd = byte(data:sub(1,1)) -- the first byte is the command
|
|
local dat = data:sub(2,-1) -- the data that you want to read
|
|
master.trigger(nodename)
|
|
if cmd == CMD_ERROR then
|
|
local temp = bin.new(dat)
|
|
local len = temp:getBlock("n",2)
|
|
local node = temp:getBlock("s",len)
|
|
len = temp:getBlock("n",2)
|
|
local err = temp:getBlock("s",len)
|
|
master.OnError:Fire(name,err)
|
|
elseif cmd == CMD_CONSOLE then
|
|
print(unpack(resolveData(dat)))
|
|
elseif cmd == CMD_PONG then
|
|
master.timeouts[client.name] = nil
|
|
elseif cmd == CMD_INITNODE then
|
|
master.OnNodeConnected:Fire(dat)
|
|
elseif cmd == CMD_QUEUE then
|
|
master.queue:push(resolveData(dat))
|
|
elseif cmd == CMD_GLOBAL then
|
|
local k,v = dat:match("(.-)|(.+)")
|
|
PROXY[k]=resolveData(v)
|
|
elseif cmd == CMD_LOAD then
|
|
local name,load = dat:match("(.-)|(.+)")
|
|
master.loads[name]=tonumber(load)
|
|
end
|
|
end)
|
|
end)
|
|
end)
|
|
if not settings.noBroadCast then
|
|
net:newCastedClients("NODE_(.+)") -- Searches for nodes and connects to them, the master.clients table will contain them by name
|
|
end
|
|
return master
|
|
end
|
|
-- The init function that gets returned
|
|
multi.print("Integrated Network Parallelism")
|
|
return {init = function()
|
|
return GLOBAL
|
|
end}
|