queues are done

This commit is contained in:
Ryan Ward 2018-06-26 22:36:33 -04:00
parent 979a6d8674
commit 3a46c4ae44
4 changed files with 120 additions and 58 deletions

View File

@ -5,15 +5,45 @@ Update: 2.0.0 Big update
**Note:** After doing some testing, I have noticed that using multi-objects are slightly, quite a bit, faster than using (coroutines)multi:newthread(). Only create a thread if there is no other possibility! System threads are different and will improve performance if you know what you are doing. Using a (coroutine)thread as a loop with a timer is slower than using a TLoop! If you do not need the holding features I strongly recommend that you use the multi-objects. This could be due to the scheduler that I am using, and I am looking into improving the performance of the scheduler for (coroutine)threads. This is still a work in progress so expect things to only get better as time passes! **Note:** After doing some testing, I have noticed that using multi-objects are slightly, quite a bit, faster than using (coroutines)multi:newthread(). Only create a thread if there is no other possibility! System threads are different and will improve performance if you know what you are doing. Using a (coroutine)thread as a loop with a timer is slower than using a TLoop! If you do not need the holding features I strongly recommend that you use the multi-objects. This could be due to the scheduler that I am using, and I am looking into improving the performance of the scheduler for (coroutine)threads. This is still a work in progress so expect things to only get better as time passes!
#Added: #Added:
- require("multi.integration.networkManager") - `nGLOBAL = require("multi.integration.networkManager").init()`
- multi:newNode(tbl: settings) - `node = multi:newNode(tbl: settings)`
- multi:newMaster(tbl: settings) - `master = multi:newMaster(tbl: settings)`
- multi:nodeManager(port) - `multi:nodeManager(port)`
Node:
- node:sendTo(name,data)
- node:pushTo(name,data)
- node:peek()
- node:pop()
Master:
- master:doToAll(func)
- master:register(name,node,func)
- master:execute(name,node,...)
- master:newNetworkThread(tname,func,name,...)
- master:getFreeNode()
- master:getRandomNode()
- master:sendTo(name,data)
- master:pushTo(name,data)
- master:peek()
- master:pop()
Note: These examples assume that you have already connected the nodes to the node manager. Also you do not need to use the node manager, but sometimes broadcast does not work as expected and the master doesnot connect to the nodes. Using the node manager offers nice features like: removing nodes from the master when they have disconnected, and automatically telling the master when nodes have been added. Note: These examples assume that you have already connected the nodes to the node manager. Also you do not need to use the node manager, but sometimes broadcast does not work as expected and the master doesnot connect to the nodes. Using the node manager offers nice features like: removing nodes from the master when they have disconnected, and automatically telling the master when nodes have been added.
**NodeManager.lua** **NodeManager.lua**
```lua ```lua
package.path="?/init.lua;?.lua;"..package.path
multi = require("multi")
local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
nGLOBAL = require("multi.integration.networkManager").init()
multi:nodeManager(12345) -- Host a node manager on port: 12345
print("Node Manager Running...")
settings = {
priority = 0, -- 1 or 2
protect = false,
}
multi:mainloop(settings)
-- Thats all you need to run the node manager, everything else is done automatically
``` ```

View File

@ -29,26 +29,23 @@ local function pieceCommand(cmd,...)
end end
-- Internal queue system for network queues -- Internal queue system for network queues
local queue = {} local Queue = {}
queue.__index = queue Queue.__index = Queue
function queue:newQueue(name,master) function Queue:newQueue()
if not name then error("You must include a name in order to create a queue!") end
local c = {} local c = {}
c.name = name
c.master = master
setmetatable(c,self) setmetatable(c,self)
return c return c
end end
function queue:push(data) function Queue:push(data)
master:doToAll(char(CMD_QUEUE)..packData(data))
end
function queue:raw_push(data) -- Internal usage only
table.insert(self,data) table.insert(self,data)
end end
function queue:pop() function Queue:raw_push(data) -- Internal usage only
return resolveData(table.remove(self,1)) table.insert(self,data)
end end
function queue:peek() function Queue:pop()
return table.remove(self,1)
end
function Queue:peek()
return self[1] return self[1]
end end
local queues = {} local queues = {}
@ -162,7 +159,7 @@ function multi:newNode(settings)
node.server = net:newUDPServer(0) -- hosts the node using the default port node.server = net:newUDPServer(0) -- hosts the node using the default port
node.port = node.server.port node.port = node.server.port
node.connections = net.ClientCache node.connections = net.ClientCache
node.queue = queue:newQueue() node.queue = Queue:newQueue()
node.functions = bin.stream("RegisteredFunctions.dat",false) node.functions = bin.stream("RegisteredFunctions.dat",false)
node.hasFuncs = {} node.hasFuncs = {}
if settings.managerDetails then if settings.managerDetails then
@ -195,6 +192,15 @@ function multi:newNode(settings)
end end
end end
end end
function node:pushTo(name,data)
node:sendTo(name,char(CMD_QUEUE)..packData(data))
end
function node:peek()
return node.queue:peek()
end
function node:pop()
return node.queue:pop()
end
node.loadRate=1 node.loadRate=1
if settings then if settings then
if settings.crossTalk then if settings.crossTalk then
@ -215,8 +221,7 @@ function multi:newNode(settings)
elseif cmd == CMD_PONG then elseif cmd == CMD_PONG then
elseif cmd == CMD_QUEUE then elseif cmd == CMD_QUEUE then
local name,d=dat:match("(.-)|(.+)") queue:push(resolveData(dat))
multi.OnNetQueue:Fire(name,d)
elseif cmd == CMD_GLOBAL then elseif cmd == CMD_GLOBAL then
local k,v = dat:match("(.-)|(.+)") local k,v = dat:match("(.-)|(.+)")
PROXY[k]=resolveData(v) PROXY[k]=resolveData(v)
@ -235,8 +240,7 @@ function multi:newNode(settings)
elseif cmd == CMD_PONG then elseif cmd == CMD_PONG then
elseif cmd == CMD_QUEUE then elseif cmd == CMD_QUEUE then
local name,d=dat:match("(.-)|(.+)") node.queue:push(resolveData(dat))
multi.OnNetQueue:Fire(name,d)
elseif cmd == CMD_REG then elseif cmd == CMD_REG then
if not settings.allowRemoteRegistering then if not settings.allowRemoteRegistering then
print(ip..": has attempted to register a function when it is currently not allowed!") print(ip..": has attempted to register a function when it is currently not allowed!")
@ -276,7 +280,7 @@ function multi:newNode(settings)
server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port) server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port)
end)-- set this up end)-- set this up
elseif cmd == CMD_INITMASTER then elseif cmd == CMD_INITMASTER then
print("Connected to the master!") print("Connected to the master!",dat)
node.connections[dat]={server,ip,port} node.connections[dat]={server,ip,port}
multi.OnGUpdate(function(k,v) multi.OnGUpdate(function(k,v)
server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port) server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port)
@ -285,16 +289,15 @@ function multi:newNode(settings)
server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port) server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port)
end,node.loadRate) end,node.loadRate)
server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port) server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port)
server:send(ip,char(CMD_INITNODE)..node.name,port)
elseif cmd == CMD_GLOBAL then elseif cmd == CMD_GLOBAL then
local k,v = dat:match("(.-)|(.+)") local k,v = dat:match("(.-)|(.+)")
PROXY[k]=resolveData(v) PROXY[k]=resolveData(v)
end end
end) end)
function node:sendToMaster(name,data) function node:sendTo(name,data)
self.connections[name]:send(data) local conn = node.connections[name]
end conn[1]:send(conn[2],data,conn[3])
function node:sendToNode(name,data)
self.connections[name]:send(data)
end end
if not settings.noBroadCast then if not settings.noBroadCast then
node.server:broadcast("NODE_"..name) node.server:broadcast("NODE_"..name)
@ -311,7 +314,8 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
master.conn = multi:newConnection() master.conn = multi:newConnection()
master.conn2 = multi:newConnection() master.conn2 = multi:newConnection()
master.OnFirstNodeConnected = multi:newConnection() master.OnFirstNodeConnected = multi:newConnection()
master.queue = queue:newQueue() master.OnNodeConnected = multi:newConnection()
master.queue = Queue:newQueue()
master.connections = net.ClientCache -- Link to the client cache that is created on the net interface master.connections = net.ClientCache -- Link to the client cache that is created on the net interface
master.loads = {} master.loads = {}
master.trigger = multi:newFunction(function(self,node) master.trigger = multi:newFunction(function(self,node)
@ -328,8 +332,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
if cmd == "N" then if cmd == "N" then
local name,ip,port = data:match("(.-)|(.-)|(.+)") local name,ip,port = data:match("(.-)|(.-)|(.+)")
local c = net:newUDPClient(ip,port) local c = net:newUDPClient(ip,port)
master.connections[name]=c net.OnCastedClientInfo:Fire(c,name,ip,port)master.connections[name]=c
net.OnCastedClientInfo:Fire(c,name,ip,port)
elseif cmd == "R" then elseif cmd == "R" then
local name = data:sub(2,-1) local name = data:sub(2,-1)
master.connections[name]=nil master.connections[name]=nil
@ -375,6 +378,15 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
temp:addBlock(args,#args) temp:addBlock(args,#args)
master:sendTo(node,temp.data) master:sendTo(node,temp.data)
end end
function master:pushTo(name,data)
master:sendTo(name,char(CMD_QUEUE)..packData(data))
end
function master:peek()
return self.queue:peek()
end
function master:pop()
return self.queue:pop()
end
function master:newNetworkThread(tname,func,name,...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job function master:newNetworkThread(tname,func,name,...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job
local fData = packData(func) local fData = packData(func)
local tab = {...} local tab = {...}
@ -411,18 +423,18 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
end end
end end
function master:sendTo(name,data) function master:sendTo(name,data)
if name:sub(1,5)=="NODE_" then if name:sub(1,5)~="NODE_" then
name = name:sub(6,-1) name = "NODE_"..name
end end
if self.connections["NODE_"..name]==nil then if self.connections[name]==nil then
multi:newTLoop(function(loop) multi:newTLoop(function(loop)
if self.connections["NODE_"..name]~=nil then if self.connections[name]~=nil then
self.connections["NODE_"..name]:send(data) self.connections[name]:send(data)
loop:Desrtoy() loop:Desrtoy()
end end
end,.1) end,.1)
else else
self.connections["NODE_"..name]:send(data) self.connections[name]:send(data)
end end
end end
function master:getFreeNode() function master:getFreeNode()
@ -453,7 +465,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
nodename = i nodename = i
end end
client.OnClientReady(function() client.OnClientReady(function()
client:send(char(CMD_INITMASTER)..name) -- Tell the node that you are a master trying to connect client:send(char(CMD_INITMASTER)..master.name) -- Tell the node that you are a master trying to connect
client.OnDataRecieved(function(client,data) client.OnDataRecieved(function(client,data)
local cmd = byte(data:sub(1,1)) -- the first byte is the command local cmd = byte(data:sub(1,1)) -- the first byte is the command
local dat = data:sub(2,-1) -- the data that you want to read local dat = data:sub(2,-1) -- the data that you want to read
@ -464,9 +476,10 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
elseif cmd == CMD_PONG then elseif cmd == CMD_PONG then
elseif cmd == CMD_INITNODE then
master.OnNodeConnected:Fire(dat)
elseif cmd == CMD_QUEUE then elseif cmd == CMD_QUEUE then
local name,d=dat:match("(.-)|(.+)") master.queue:push(resolveData(dat))
multi.OnNetQueue:Fire(name,d)
elseif cmd == CMD_GLOBAL then elseif cmd == CMD_GLOBAL then
local k,v = dat:match("(.-)|(.+)") local k,v = dat:match("(.-)|(.+)")
PROXY[k]=resolveData(v) PROXY[k]=resolveData(v)

View File

@ -2,16 +2,33 @@ package.path="?/init.lua;?.lua;"..package.path
multi = require("multi") multi = require("multi")
local GLOBAL, THREAD = require("multi.integration.lanesManager").init() local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
nGLOBAL = require("multi.integration.networkManager").init() nGLOBAL = require("multi.integration.networkManager").init()
master = multi:newNode{ node = multi:newNode{
crossTalk = false, -- default value, allows nodes to talk to eachother. WIP NOT READY YET! crossTalk = false, -- default value, allows nodes to talk to eachother. WIP NOT READY YET!
allowRemoteRegistering = true, -- allows you to register functions from the master on the node, default is false allowRemoteRegistering = true, -- allows you to register functions from the master on the node, default is false
name = nil, -- default value name = "TESTNODE", -- default value
noBroadCast = true, -- if using the node manager, set this to true to prevent the node from broadcasting noBroadCast = true, -- if using the node manager, set this to true to prevent the node from broadcasting
managerDetails = {"localhost",12345}, -- connects to the node manager if one exists managerDetails = {"localhost",12345}, -- connects to the node manager if one exists
} }
function RemoteTest(a,b,c) -- a function that we will be executing remotely function RemoteTest(a,b,c) -- a function that we will be executing remotely
print("Yes I work!",a,b,c) print("Yes I work!",a,b,c)
multi:newThread("waiter",function()
print("Hello!")
while true do
thread.sleep(2)
node:pushTo("Main","This is a test")
end
end)
end end
multi:newThread("some-test",function()
local dat = node:pop()
while true do
thread.skip(10)
if dat then
print(dat)
end
dat = node:pop()
end
end,"NODE_TESTNODE")
settings = { settings = {
priority = 0, -- 1 or 2 priority = 0, -- 1 or 2
protect = false, -- if something goes wrong we will crash hard, but the speed gain is good protect = false, -- if something goes wrong we will crash hard, but the speed gain is good

View File

@ -11,26 +11,28 @@ master = multi:newMaster{
managerDetails = {"localhost",12345}, -- the details to connect to the node manager (ip,port) managerDetails = {"localhost",12345}, -- the details to connect to the node manager (ip,port)
} }
-- Send to all the nodes that are connected to the master -- Send to all the nodes that are connected to the master
master:doToAll(function(node_name) master.OnNodeConnected(function(node)
master:register("TestFunc",node_name,function(msg) print("Lets Go!")
print("It works: "..msg) master:execute("RemoteTest",node,1,2,3)
end) multi:newThread("waiter",function()
multi:newAlarm(2):OnRing(function(alarm) print("Hello!")
master:execute("TestFunc",node_name,"Hello!")
alarm:Destroy()
end)
multi:newThread("Checker",function()
while true do while true do
thread.sleep(1) thread.sleep(2)
if nGLOBAL["test"] then print("sending")
print(nGLOBAL["test"]) master:pushTo(node,"This is a test 2")
thread.kill()
end
end end
end) end)
nGLOBAL["test2"]={age=22}
end) end)
multi:newThread("some-test",function()
local dat = master:pop()
while true do
thread.skip(10)
if dat then
print(dat)
end
dat = master:pop()
end
end,"NODE_TESTNODE")
-- Starting the multitasker -- Starting the multitasker
settings = { settings = {
priority = 0, -- 0, 1 or 2 priority = 0, -- 0, 1 or 2