working on new network manager

New features coming soon.
Redoing networking management
system threads are mostly done, whats left are some threaded objects like systemthreadedconnection
This commit is contained in:
Ryan Ward 2020-01-05 13:44:52 -05:00
parent 19de228834
commit b3b9e12fb8
11 changed files with 387 additions and 679 deletions

View File

@ -1,21 +1,22 @@
# Changes
[TOC]
Update 14.0.0 Consistency, Stability and Additions
Update 14.0.0 Consistency, Additions and Stability
-------------
Added:
- multi.init() -- Initlizes the library! Must be called for multiple files to have the same handle. Example below
- thread.holdFor(NUMBER sec, FUNCTION condition) -- Works like hold, but timesout when a certain amount of time has passed!
- multi.hold(function or number) -- It's back and uses context switching. Normal multi objs without threading will all be halted where threads will still run. If within a thread continue using thread.hold() and thread.sleep()
- multi.hold(function or number) -- It's back and better than ever! Normal multi objs without threading will all be halted where threads will still run. If within a thread continue using thread.hold() and thread.sleep()
- thread.holdWithin(NUMBER; cycles,FUNCTION; condition) -- Holds until the condition is met! If the number of cycles passed is equal to cycles, hold will return a timeout error
- multi.holdFor(NUMBER; seconds,FUNCTION; condition) -- Follows the same rules as multi.hold while mimicing the functionality of thread.holdWithin
**Note:** when hold has a timeout the first argument will return nil and the second atgument will be TIMEOUT, if not timed out hold will return the values from the conditions
- thread objects got an addition!
-- tobj.OnDeath(self,status,returns[...]) -- This is a connection that passes a reference to the self, the status, whether or not the thread ended or was killed, and the returns of the thread.
-- tobj.OnError(self,error) -- returns a reference to self and the error as a string
-- **Limitations:** only 7 returns are possible! This was done because creating and destroying table objects are slow. Instead I capture the return values from coroutine.resume into local variables and only allowed it to collect 6 max.
- thread objects now have hooks that allow you to interact with it in more refined ways!
-- tObj.OnDeath(self,status,returns[...]) -- This is a connection that passes a reference to the self, the status, whether or not the thread ended or was killed, and the returns of the thread.
-- tObj.OnError(self,error) -- returns a reference to self and the error as a string
-- **Limitations:** only 7 returns are possible! This was done because creating and destroying table objects are slow. (The way the scheduler works this would happen every cycle and thats no good) Instead I capture the return values from coroutine.resume into local variables and only allowed it to collect 7 max.
- thread.run(function) -- Can only be used within a thread, creates another thread that can do work, but automatically returns whatever from the run function -- Use thread newfunctions for a more powerful version of thread.run()
- thread:newFunction(FUNCTION; func)
-- returns a function that gives you the option to wait or connect to the returns of the function.
-- func().wait() -- waits for the function to return
-- func().wait() -- waits for the function to return works both within a thread and outside of one
-- func().connect() -- connects to the function finishing
-- func() -- If your function does not return anything you dont have to use wait or connect at all and the function will return instantly. You could also use wait() to hold until the function does it thing
-- If the created function encounters an error, it will return nil, the error message!
@ -30,6 +31,20 @@ Added:
-- thread.Priority_Below_Normal
-- thread.Priority_Low
-- thread.Priority_Idle
- thread.hold() and multi.hold() now accept connections as an argument. See example below
```lua
package.path = "./?/init.lua;"..package.path
local multi, thread = require("multi"):init()
conn = multi:newConnection()
multi:newThread(function()
thread.hold(conn)
print("Connection Fired!!!")
end)
multi:newAlarm(3):OnRing(function()
conn:Fire()
end)
```
thread newFunction
```lua
@ -83,7 +98,7 @@ Removed:
- multi:newCustomObject() -- No real use
Changed:
- multi connections connect function can now chain connections
- Connections connect function can now chain connections
```lua
package.path = "./?/init.lua;"..package.path
local multi, thread = require("multi").init()
@ -117,7 +132,7 @@ local nGLOBAL, nTHREAD = require("multi.intergration.networkManager).inti()
Note: You can mix and match integrations together. You can create systemthreads within network threads, and you can also create cotoutine based threads within bothe network and system threads. This gives you quite a bit of flexibility to create something awesome.
Going forward:
- Do not expect anything new from the next update. I have a bunch of bugs to iron out. Most pertainint to network and system threading. Maybe
- If all goes well, the future will contain quality of code features. I'll keep an eye out for bugs
Update 13.1.0 Bug fixes and features added
-------------

View File

@ -1512,8 +1512,22 @@ function thread.waitFor(name)
return thread.get(name)
end
function multi.hold(func)
if thread.isThread() then
if type(func) == "function" or type(func) == "table" then
return thread.hold(func)
end
return thread.sleep(func)
end
local death = false
if type(func)=="function" then
if type(func)=="number" then
multi:newThread("Hold_func",function()
thread.sleep(func)
death = true
end)
while not death do
multi.scheduler:Act()
end
else
local rets
multi:newThread("Hold_func",function()
rets = {thread.hold(func)}
@ -1523,15 +1537,21 @@ function multi.hold(func)
multi.scheduler:Act()
end
return unpack(rets)
elseif type(func)=="number" then
multi:newThread("Hold_func",function()
thread.sleep(func)
death = true
end
end
function multi.holdFor(n,func)
local temp
multi:newThread(function()
thread.sleep(n)
temp = true
end)
while not death do
multi.scheduler:Act()
end
return multi.hold(function()
if func() then
return func()
elseif temp then
return multi.NIL, "TIMEOUT"
end
end)
end
function thread:newFunction(func)
local c = {Type = "tfunc"}
@ -1725,6 +1745,15 @@ function multi.initThreads(justThreads)
threads[i].__ready = false
ret = nil
elseif ret[1]=="_hold_" then
if type(ret[2])=="table" and ret[2].Type=='connector' then
local letsgo
ret[2](function(...) letsgo = {...} end)
ret[2] = function()
if letsgo then
return unpack(letsgo)
end
end
end
threads[i].func = ret[2]
threads[i].task = "hold"
threads[i].__ready = false
@ -1766,7 +1795,7 @@ function multi.initThreads(justThreads)
threads[i].task = ""
threads[i].__ready = true
end
elseif threads[i].task == "hold" then
elseif threads[i].task == "hold" then --GOHERE
t0,t1,t2,t3,t4,t5,t6 = threads[i].func()
if t0 then
if t0==multi.NIL then

View File

@ -1,624 +0,0 @@
--[[
MIT License
Copyright (c) 2020 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
local multi, thread = require("multi"):init()
local net = require("net")
local bin = require("bin")
bin.setBitsInterface(infinabits) -- the bits interface does not work so well, another bug to fix
-- Commands that the master and node will respect, max of 256 commands
local CMD_ERROR = 0x00
local CMD_PING = 0x01
local CMD_PONG = 0x02
local CMD_QUEUE = 0x03
local CMD_TASK = 0x04
local CMD_INITNODE = 0x05
local CMD_INITMASTER = 0x06
local CMD_GLOBAL = 0x07
local CMD_LOAD = 0x08
local CMD_CALL = 0x09
local CMD_REG = 0x0A
local CMD_CONSOLE = 0x0B
local char = string.char
local byte = string.byte
-- Process to hold all of the networkManager's muilt objects
-- Helper for piecing commands
local function pieceCommand(cmd, ...)
local tab = {...}
table.insert(tab, 1, cmd)
return table.concat(tab)
end
-- Internal queue system for network queues
local Queue = {}
Queue.__index = Queue
function Queue:newQueue()
local c = {}
setmetatable(c, self)
return c
end
function Queue:push(data)
table.insert(self, data)
end
function Queue:raw_push(data) -- Internal usage only
table.insert(self, data)
end
function Queue:pop()
return table.remove(self, 1)
end
function Queue:peek()
return self[1]
end
local queues = {}
multi.OnNetQueue = multi:newConnection()
multi.OnGUpdate = multi:newConnection()
-- Managing the data that goes through the system
local function packData(data)
-- returns the data that was sterilized
local dat = bin.new()
dat:addBlock(#type(data), 1)
dat:addBlock(type(data)) -- The type is first
if type(data) == "table" then
dat:addBlock(data, nil, "t")
elseif type(data) == "userdata" then
error("Cannot sterilize userdata!")
elseif type(data) == "number" then
dat:addBlock(data, nil, "d")
elseif type(data) == "string" then
dat:addBlock(#data, 4)
dat:addBlock(data, nil, "s")
elseif type(data) == "boolean" then
dat:addBlock(data, 1)
elseif type(data) == "function" then
dat:addBlock(data, nil, "f")
end
return dat.data
end
local function resolveData(data)
-- returns the data that was sterilized
local dat = bin.new(data)
local tlen = dat:getBlock("n", 1)
local tp = dat:getBlock("s", tlen)
if tp == "table" then
return dat:getBlock("t")
elseif tp == "number" then
return dat:getBlock("d")
elseif tp == "string" then
local num = dat:getBlock("n", 4)
return dat:getBlock("s", num)
elseif tp == "boolean" then
return dat:getBlock("b", 1)
elseif tp == "function" then
return dat:getBlock("f")
end
end
-- internal global system
local GLOBAL = {}
local PROXY = {}
setmetatable(
GLOBAL,
{
__index = function(t, k)
return PROXY[k]
end,
__newindex = function(t, k, v)
local v = v
PROXY[k] = v
multi.OnGUpdate:Fire(k, packData(v))
end
}
)
-- In case you are unable to use broadcasting this can be used to help connect to nodes
function multi:nodeManager(port)
if not port then
error("You must provide a port in order to host the node manager!")
end
local server = net:newTCPServer(port)
server.nodes = {}
server.timeouts = {}
server.OnNodeAdded = multi:newConnection()
server.OnNodeRemoved = multi:newConnection()
server.OnDataRecieved(
function(server, data, cid, ip, port)
local cmd = data:sub(1, 1)
if cmd == "R" then
multi:newThread(
"Node Client Manager",
function(loop)
while true do
if server.timeouts[cid] == true then
server.OnNodeRemoved:Fire(server.nodes[cid])
server.nodes[cid] = nil
server.timeouts[cid] = nil
thread.kill()
else
server.timeouts[cid] = true
server:send(cid, "ping")
end
thread.sleep(1)
end
end
)
server.nodes[cid] = data:sub(2, -1)
server.OnNodeAdded:Fire(server.nodes[cid])
elseif cmd == "G" then
server.OnNodeAdded(
function(node)
server:send(cid, node)
end
)
server.OnNodeRemoved(
function(node)
server:send(cid, "R" .. node:match("(.-)|"))
end
)
for i, v in pairs(server.nodes) do
server:send(cid, v)
end
elseif cmd == "P" then
server.timeouts[cid] = nil
end
end
)
end
-- The main driving force of the network manager: Nodes
function multi:newNode(settings)
multi:enableLoadDetection()
settings = settings or {}
-- Here we have to use the net library to broadcast our node across the network
math.randomseed(os.time())
local name = settings.name or multi.randomString(8)
local node = {}
node.name = name
multi.OnError(
function(i, error)
node.OnError:Fire(node, error, node.server)
end
)
node.server = net:newUDPServer(0) -- hosts the node using the default port
_, node.port = node.server.udp:getsockname()
node.connections = net.ClientCache
node.queue = Queue:newQueue()
node.functions = bin.stream("RegisteredFunctions.dat", false)
node.hasFuncs = {}
node.OnError = multi:newConnection()
node.OnError(
function(node, err, master)
multi.print("ERROR", err, node.name)
local temp = bin.new()
temp:addBlock(#node.name, 2)
temp:addBlock(node.name)
temp:addBlock(#err, 2)
temp:addBlock(err)
for i, v in pairs(node.connections) do
multi.print(i)
v[1]:send(v[2], char(CMD_ERROR) .. temp.data, v[3])
end
end
)
if settings.managerDetails then
local c = net:newTCPClient(settings.managerDetails[1], settings.managerDetails[2])
if not c then
multi.print("Cannot connect to the node manager! Ensuring broadcast is enabled!")
settings.noBroadCast = false
else
c.OnDataRecieved(
function(self, data)
if data == "ping" then
self:send("P")
end
end
)
c:send("RNODE_" .. name .. "|" .. net.getLocalIP() .. "|" .. node.port)
end
end
if not settings.preload then
if node.functions:getSize() ~= 0 then
multi.print("We have function(s) to preload!")
local len = node.functions:getBlock("n", 1)
local name, func
while len do
name = node.functions:getBlock("s", len)
len = node.functions:getBlock("n", 2)
func = node.functions:getBlock("s", len)
len = node.functions:read(1)
_G[name] = resolveData(func)
node.hasFuncs[name] = true
if not len then
break
end
len = byte(len)
end
end
end
function node:pushTo(name, data)
node:sendTo(name, char(CMD_QUEUE) .. packData(data))
end
function node:peek()
return node.queue:peek()
end
function node:pop()
return node.queue:pop()
end
function node:getConsole()
local c = {}
local conn = node.connections
function c.print(...)
local data = char(CMD_CONSOLE) .. packData({...})
for i, v in pairs(conn) do
--print(i)
v[1]:send(v[2], data, v[3])
end
end
-- function c:printTo()
-- end
return c
end
node.loadRate = 1
-- Lets tell the network we are alive!
node.server.OnDataRecieved(
function(server, data, cid, ip, port)
local cmd = byte(data:sub(1, 1)) -- the first byte is the command
local dat = data:sub(2, -1) -- the data that you want to read
if cmd == CMD_PING then
server:send(ip, char(CMD_PONG), port)
elseif cmd == CMD_QUEUE then
node.queue:push(resolveData(dat))
elseif cmd == CMD_REG then
if not settings.allowRemoteRegistering then
multi.print(ip .. ": has attempted to register a function when it is currently not allowed!")
return
end
local temp = bin.new(dat)
local len = temp:getBlock("n", 1)
local name = temp:getBlock("s", len)
if node.hasFuncs[name] then
multi.print("Function already preloaded onto the node!")
return
end
len = temp:getBlock("n", 2)
local func = temp:getBlock("s", len)
_G[name] = resolveData(func)
node.functions:addBlock(dat)
elseif cmd == CMD_CALL then
local temp = bin.new(dat)
local len = temp:getBlock("n", 1)
local name = temp:getBlock("s", len)
len = temp:getBlock("n", 4)
local args = temp:getBlock("s", len)
_G[name](unpack(resolveData(args)))
elseif cmd == CMD_TASK then
local holder = bin.new(dat)
local len = holder:getBlock("n", 4)
local args = holder:getBlock("s", len)
local len2 = holder:getBlock("n", 4)
local func = holder:getBlock("s", len2)
args = resolveData(args)
func = resolveData(func)
status, err = pcall(func, node, unpack(args))
if not status then
node.OnError:Fire(node, err, server)
end
elseif cmd == CMD_INITNODE then
multi.print("Connected with another node!")
node.connections[dat] = {server, ip, port}
multi.OnGUpdate(
function(k, v)
server:send(ip, table.concat {char(CMD_GLOBAL), k, "|", v}, port)
end
)
-- set this up
elseif cmd == CMD_INITMASTER then
multi.print("Connected to the master!", dat)
node.connections[dat] = {server, ip, port}
multi.OnGUpdate(
function(k, v)
server:send(ip, table.concat {char(CMD_GLOBAL), k, "|", v}, port)
end
)
-- set this up
multi:newTLoop(
function()
server:send(ip, char(CMD_LOAD) .. node.name .. "|" .. multi:getLoad(), port)
end,
node.loadRate
)
server:send(ip, char(CMD_LOAD) .. node.name .. "|" .. multi:getLoad(), port)
server:send(ip, char(CMD_INITNODE) .. node.name, port)
elseif cmd == CMD_GLOBAL then
local k, v = dat:match("(.-)|(.+)")
PROXY[k] = resolveData(v)
end
end
)
function node:sendTo(name, data)
local conn = node.connections[name]
conn[1]:send(conn[2], data, conn[3])
end
if not settings.noBroadCast then
node.server:broadcast("NODE_" .. name)
end
return node
end
-- Masters
function multi:newMaster(settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code.
local master = {}
local settings = settings or {}
master.name = settings.name or multi.randomString(8)
local name = master.name
master.conn = multi:newConnection()
master.conn2 = multi:newConnection()
master.OnFirstNodeConnected = multi:newConnection()
master.OnNodeConnected = multi:newConnection()
master.OnError = multi:newConnection()
master.queue = Queue:newQueue()
master.connections = net.ClientCache -- Link to the client cache that is created on the net interface
master.loads = {}
master.timeouts = {}
master.trigger =
multi:newFunction(
function(self, node)
master.OnFirstNodeConnected:Fire(node)
self:Pause()
end
)
if settings.managerDetails then
local client = net:newTCPClient(settings.managerDetails[1], settings.managerDetails[2])
if not client then
multi.print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!")
settings.noBroadCast = false
else
client.OnDataRecieved(
function(client, data)
local cmd = data:sub(1, 1)
if cmd == "N" then
print(data)
local name, ip, port = data:match("(.-)|(.-)|(.+)")
local c = net:newUDPClient(ip, port)
net.OnCastedClientInfo:Fire(c, name, ip, port)
master.connections[name] = c
elseif cmd == "R" then
local name = data:sub(2, -1)
master.connections[name] = nil
end
end
)
client:send("G") -- init your connection as a master
end
end
function master:doToAll(func)
for i, v in pairs(master.connections) do
func(i, v)
end
end
function master:register(name, node, func)
if not node then
error("You must specify a node to execute a command on!")
end
local temp = bin.new()
local fData = packData(func)
temp:addBlock(CMD_REG, 1)
temp:addBlock(#name, 1)
temp:addBlock(name, #name)
temp:addBlock(#fData, 2)
temp:addBlock(fData, #fData)
master:sendTo(node, temp.data)
end
function master:execute(name, node, ...)
if not node then
error("You must specify a node to execute a command on!")
end
if not name then
error("You must specify a function name to call on the node!")
end
local args = packData {...}
local name = name
local node = node
local temp, len, data
temp = bin.new()
temp:addBlock(CMD_CALL, 1)
temp:addBlock(#name, 1)
temp:addBlock(name, #name)
temp:addBlock(#args, 4)
temp:addBlock(args, #args)
master:sendTo(node, temp.data)
end
function master:pushTo(name, data)
master:sendTo(name, char(CMD_QUEUE) .. packData(data))
end
function master:peek()
return self.queue:peek()
end
function master:pop()
return self.queue:pop()
end
function master:newNetworkThread(tname, func, name, ...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job
local fData = packData(func)
local tab = {...}
local aData = ""
if #tab ~= o then
aData = (packData {...})
else
aData = (packData {"NO", "ARGS"})
end
local temp = bin.new()
temp:addBlock(#aData, 4)
local len = temp.data
local temp2 = bin.new()
temp2:addBlock(#fData, 4)
local len2 = temp2.data
if not name then
local name = self:getFreeNode()
if not name then
name = self:getRandomNode()
end
if name == nil then
multi:newEvent(
function()
return name ~= nil
end
):OnEvent(
function(evnt)
self:sendTo(name, char(CMD_TASK) .. len .. aData .. len2 .. fData)
evnt:Destroy()
end
):SetName("DelayedSendTask"):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(
function(self)
self:Destroy()
end
)
else
self:sendTo(name, char(CMD_TASK) .. len .. aData .. len2 .. fData)
end
else
local name = "NODE_" .. name
self:sendTo(name, char(CMD_TASK) .. len .. aData .. len2 .. fData)
end
end
function master:sendTo(name, data)
if name:sub(1, 5) ~= "NODE_" then
name = "NODE_" .. name
end
if self.connections[name] == nil then
multi:newEvent(
function()
return self.connections[name] ~= nil
end
):OnEvent(
function(evnt)
self.connections[name]:send(data)
evnt:Destroy()
end
):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(
function(self)
self:Destroy()
end
)
else
self.connections[name]:send(data)
end
end
function master:getFreeNode()
local count = 0
local min = math.huge
local refO
for i, v in pairs(master.loads) do
if v < min then
min = v
refO = i
end
end
return refO
end
function master:getRandomNode()
local list = {}
for i, v in pairs(master.connections) do
list[#list + 1] = i:sub(6, -1)
end
return list[math.random(1, #list)]
end
net.OnCastedClientInfo(
function(client, name, ip, port)
multi.OnGUpdate(
function(k, v)
client:send(table.concat {char(CMD_GLOBAL), k, "|", v})
end
)
local nodename
for i, v in pairs(master.connections) do
nodename = i
end
client.OnClientReady(
function()
client:send(char(CMD_INITMASTER) .. master.name) -- Tell the node that you are a master trying to connect
if not settings.managerDetails then
multi:newThread(
"Node Data Link Controller",
function(loop)
while true do
if master.timeouts[name] == true then
master.timeouts[name] = nil
master.connections[name] = nil
thread.kill()
else
master.timeouts[name] = true
master:sendTo(name, char(CMD_PING))
end
thread.sleep(1)
end
end
)
end
client.name = name
client.OnDataRecieved(
function(client, data)
local cmd = byte(data:sub(1, 1)) -- the first byte is the command
local dat = data:sub(2, -1) -- the data that you want to read
master.trigger(nodename)
if cmd == CMD_ERROR then
local temp = bin.new(dat)
local len = temp:getBlock("n", 2)
local node = temp:getBlock("s", len)
len = temp:getBlock("n", 2)
local err = temp:getBlock("s", len)
master.OnError:Fire(name, err)
elseif cmd == CMD_CONSOLE then
print(unpack(resolveData(dat)))
elseif cmd == CMD_PONG then
master.timeouts[client.name] = nil
elseif cmd == CMD_INITNODE then
master.OnNodeConnected:Fire(dat)
elseif cmd == CMD_QUEUE then
master.queue:push(resolveData(dat))
elseif cmd == CMD_GLOBAL then
local k, v = dat:match("(.-)|(.+)")
PROXY[k] = resolveData(v)
elseif cmd == CMD_LOAD then
local name, load = dat:match("(.-)|(.+)")
master.loads[name] = tonumber(load)
end
end
)
end
)
end
)
if not settings.noBroadCast then
net:newCastedClients("NODE_(.+)") -- Searches for nodes and connects to them, the master.clients table will contain them by name
end
return master
end
-- The init function that gets returned
multi.print("Integrated Network Parallelism")
return {
init = function()
return GLOBAL
end
}

View File

@ -0,0 +1,11 @@
_G["__CHANNEL__"] = {}
local channel = {}
channel.__index = channel
-- Creates/Gets a channel of name
function channel:newChannel(name)
local chan = _G["__CHANNEL__"]
if chan then
end
end

View File

@ -0,0 +1,23 @@
local multi, thread = require("multi"):init()
local cmd = require("multi.integration.networkManager.cmds")
local node = require("multi.integration.networkManager.node")
local net = require("net")
local bin = require("bin")
local child = {}
child.__index = child
function multi:newChildNode(cd)
local c = {}
setmetatable(c,child)
local name
if cd then
if cd.name then
name = cd.name
end
c.node = node:new(cd.nodePort or cmd.defaultPort,nil,name)
if cd.managerHost then
cd.managerPort = cd.managerPort or cmd.defaultManagerPort
c.node:registerWithManager(cd.managerHost,cd.managerPort)
end
end
return c
end

View File

@ -0,0 +1,11 @@
return function(self,data)
local cmd,data = data:match("!(.-)!(.*)")
if cmd == "PONG" then
--
elseif cmd == "CHANNEL" then
--
elseif cmd == "RETURNS" then
local rets = bin.new(data):getBlock("t")
self.node.master.OnDataReturned:Fire(rets)
end
end

View File

@ -1,15 +1,19 @@
local cmds = {
ERROR = 0x00,
PING = 0x01,
PONG = 0x02,
QUEUE = 0x03,
TASK = 0x04,
INITNODE = 0x05,
INITMASTER = 0x06,
GLOBAL = 0x07,
LOAD = 0x08,
CALL = 0x09,
REG = 0x0A,
CONSOLE = 0x0B,
defaultManagerPort = 0XDE2,
defaultWait = 0X002,
defaultPort = 0X000, -- We will let the OS assign us one
standardSkip = 0X018,
ERROR = 0X000,
PING = 0X001,
PONG = 0X002,
QUEUE = 0X003,
TASK = 0X004,
INITNODE = 0X005,
INITMASTER = 0X006,
GLOBAL = 0X007,
LOAD = 0X008,
CALL = 0X009,
REG = 0X00A,
CONSOLE = 0X00B,
}
return cmds

View File

@ -23,48 +23,141 @@ SOFTWARE.
]]
local multi, thread = require("multi"):init()
local cmd = require("multi.integration.networkManager.cmds")
local node = require("multi.integration.networkManager.node")
local net = require("net")
local bin = require("bin")
local master = {}
master.__index = master
function multi:newMasterNode(connectionDetails)
local c = {}
c.OnDataRecieved = multi:newConnection()
c.defaultNode = ""
c.nodes = {}
if connectionDetails then
-- We do what we do
else
-- Lets make assumptions
function master:addNode(ip,port)
return node:new(ip,port)
end
setmetatable(c, master)
return c
function master:getNodesFromBroadcast()
net:newCastedClients("NODE_.+")
net.OnCastedClientInfo(function(client, n, ip, port)
self.nodes[n] = node:new(client)
end)
end
function master:getNodesFromManager(ip,port)
local mn = self.nodes
if not self.manager then
self.manager = net:newTCPClient(ip,port)
if not self.manager then
error("Unable to connect to the node Manager! Is it running? Perhaps the hostname or port is incorrect!")
end
end
self.manager.OnDataRecieved(function(self,data,client)
local cmd = data:match("!(.+)!")
data = data:gsub("!"..cmd.."!","")
if cmd == "NODE" then
local n,h,p = data:match("(.-)|(.-)|(.+)")
mn[n] = node:new(h,tonumber(p))
end
end)
self.manager:send("!NODES!")
end
function master:setDefaultNode(nodeName)
-- Verify Node is active
if self:nodeExists(nodeName) then
self.defaultNode = nodeName
end
end
function master:newNetworkThread(nodeName,func)
--
function master:getRandomNode()
local t = {}
for i,v in pairs(self.nodes) do t[#t+1] = i end
return t[math.random(1,#t)]
end
local netID = 0
function master:newNetworkThread(nodeName,func,...)
local args = {...}
local dat = bin.new()
local ret
local nID = netID
local conn = multi:newConnection()
multi:newThread(function()
dat:addBlock{
args = args,
func = func,
id = netID
}
netID = netID + 1
if type(nodeName) == "function" then
func = nodeName
nodeName = self.defaultNode or self:getRandomNode()
if not func then
error("You must provide a function!")
end
end
self:sendTo(nodeName,"!N_THREAD!"..dat.data)
self.OnDataReturned(function(rets)
if rets.ID == nID then
conn:Fire(unpack(rets.rets))
end
end)
end)
return conn
end
function master:newNetworkChannel(nodeName)
--
end
function master:sendTo(nodeName,data)
if self:nodeExists(nodeName):wait() then
print("It exists!")
self:queue("send",nodeName,data)
end
end
master.nodeExists = thread:newFunction(function(self,nodeName)
function master:demandNodeExistance(nodeName)
if self.nodes[nodeName] then
local wait = nodes[nodeName]:ping()
local bool = thread.hold(function()
return wait()
end)
return bool
return multi.hold(self.nodes[nodeName]:ping().pong)
else
return false
end
end
function master:queue(c,...)
table.insert(self._queue,{c,{...}})
end
function multi:newMasterNode(cd)
local c = {}
setmetatable(c, master)
c.OnNodeDiscovered = multi:newConnection()
c.OnNodeRemoved = multi:newConnection()
c.OnDataRecieved = multi:newConnection()
c.OnDataReturned = multi:newConnection()
c.defaultNode = ""
c.nodes = {}
setmetatable(c.nodes,
{__newindex = function(t,k,v)
rawset(t,k,v)
v.master = c
c.OnNodeDiscovered:Fire(k,v)
end})
c._queue = {}
if cd then
if cd.nodeHost then
cd.nodePort = cd.nodePort or cmd.defaultPort
local n,no = c:addNode(cd.nodeHost,cd.nodePort)
if n then
c.nodes[n] = no
end
elseif cd.managerHost then
cd.managerPort = cd.managerPort or cmd.defaultManagerPort
c:getNodesFromManager(cd.managerHost,cd.managerPort)
else
c:getNodesFromBroadcast()
end
else
c:getNodesFromBroadcast()
end
multi:newThread("CMDQueueProcessor",function()
while true do
thread.skip(128)
local data = table.remove(c._queue,1)
if data then
local cmd = data[1]
if cmd == "send" then
local nodeName = data[2][1]
local dat = data[2][2]
c.nodes[nodeName]:send(dat)
end
end
end
end):OnError(function(...)
print(...)
end)
return c
end

View File

@ -0,0 +1,104 @@
local net = require("net")
local cmd = require("multi.integration.networkManager.cmds")
local multi,thread = require("multi"):init()
local node = {}
node.__index = node
local rand = {}
for i = 65,90 do
rand[#rand+1] = string.char(i)
end
local function randName(n)
local str = {}
for i=1,(n or 10) do
str[#str+1] = rand[math.random(1,#rand)]
end
return table.concat(str)
end
local getNames = thread:newFunction(function(names)
local listen = socket.udp() -- make a new socket
listen:setsockname(net.getLocalIP(), 11111)
listen:settimeout(0)
local data, ip, port = listen:receivefrom()
thread.holdWithin(1,function()
if data then
local n, tp, ip, port = data:match("(%S-)|(%S-)|(%S-):(%d+)")
if n then
names[n]=true
end
end
end)
return multi.NIL
end)
local function setName(ref,name)
if name then
ref.name = "NODE_"..name
ref.connection:broadcast(name)
return
end
local names = {}
getNames(names).wait() -- Prevents duplicate names from spawning!
local name = randName()
while names["NODE_"..name] do
name = randName()
end
ref.name = "NODE_"..name
ref.connection:broadcast(ref.name)
end
node.ServerCode = require("multi.integration.networkManager.serverSide")
node.ClientCode = require("multi.integration.networkManager.clientSide")
function node.random()
return randName(12)
end
function node:registerWithManager(ip,port)
if self.type ~= "server" then return end
if not self.manager then
self.manager = net:newTCPClient(ip,port)
if not self.manager then
error("Unable to connect to the node Manager! Is it running? Perhaps the hostname or port is incorrect!")
end
end
thread:newFunction(function()
thread.hold(function() return self.name end)
self.manager:send("!REG_NODE!"..self.name.."|"..net.getLocalIP().."|"..self.connection.port)
end)()
end
function node:new(host,port,name)
local c = {}
c.links = {}
setmetatable(c,node)
if type(host)=="number" or type(host)=="nil" then
c.connection = net:newTCPServer(host or cmd.defaultPort)
c.connection:enableBinaryMode()
c.type = "server"
c.connection.node = c
c.connection.OnDataRecieved(self.ServerCode)
setName(c)
elseif type(host)=="table" and host.Type == "tcp" then
c.connection = host
c.connection:enableBinaryMode()
c.type = "client"
c.connection.node = c
c.connection.OnDataRecieved(self.ClientCode)
c.name = "MASTER_NODE"
elseif type(host) == "string" and type(port)=="number" then
c.connection = net:newTCPClient(host, port)
c.connection:enableBinaryMode()
c.type = "client"
c.connection.node = c
c.connection.OnDataRecieved(self.ClientCode)
c.name = "MASTER_NODE"
else
error("Invalid arguments!")
end
return c
end
function node:ping()
if self.type ~= "client" then return end
self:send("!PING!")
return {pong=self.connection.OnDataRecieved}
end
function node:send(data)
if self.type ~= "client" then return end
self.connection:send(data)
end
return node

View File

@ -0,0 +1,25 @@
local multi, thread = require("multi"):init()
local cmd = require("multi.integration.networkManager.cmds")
local net = require("net")
local bin = require("bin")
local nodes = { -- Testing stuff
}
function multi:newNodeManager(port)
print("Running node manager on port: "..(port or cmd.defaultManagerPort))
local server = net:newTCPServer(port or cmd.defaultManagerPort)
server.OnDataRecieved(function(serv, data, client)
local cmd = data:match("!(.+)!")
data = data:gsub("!"..cmd.."!","")
if cmd == "NODES" then
for i,v in ipairs(nodes) do
-- Sample data
serv:send(client, "!NODE!".. v[1].."|"..v[2].."|"..v[3])
end
elseif cmd == "REG_NODE" then
local name, ip, port = data:match("(.-)|(.-)|(.+)")
table.insert(nodes,{name,ip,port})
print("Registering Node:",name, ip, port)
end
end)
end

View File

@ -0,0 +1,17 @@
local bin, bits = require("bin").init()
return function(self,data,client)
local cmd,data = data:match("!(.-)!(.*)")
--print("SERVER",cmd,data)
if cmd == "PING" then
self:send(client,"!PONG!")
elseif cmd == "N_THREAD" then
local dat = bin.new(data)
local t = dat:getBlock("t")
local ret = bin.new()
ret:addBlock{ID = t.id,rets = {t.func(unpack(t.args))}}
self:send(client,"!RETURNS!"..ret:getData())
elseif cmd == "CHANNEL" then
local dat = bin.new(data):getBlock("t")
end
end