Almost there, I took way too long of a break
This commit is contained in:
parent
aec5a88360
commit
d4f41c5aa7
@ -10,6 +10,7 @@ jQueue:registerJob("TEST_JOB2",function()
|
|||||||
print("Test Works!")
|
print("Test Works!")
|
||||||
end)
|
end)
|
||||||
jQueue:start()
|
jQueue:start()
|
||||||
|
jQueue.OnReady:holdUT()
|
||||||
jQueue:doToAll(function()
|
jQueue:doToAll(function()
|
||||||
print("Doing this 16? times!")
|
print("Doing this 16? times!")
|
||||||
end)
|
end)
|
||||||
|
|||||||
@ -79,7 +79,7 @@ function multi.queuefinal(self)
|
|||||||
self.Parent:Remove()
|
self.Parent:Remove()
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
if table.unpack then
|
if table.unpack and not unpack then
|
||||||
unpack=table.unpack
|
unpack=table.unpack
|
||||||
end
|
end
|
||||||
function table.merge(t1, t2)
|
function table.merge(t1, t2)
|
||||||
|
|||||||
@ -3,7 +3,7 @@
|
|||||||
local multi = require("multi")
|
local multi = require("multi")
|
||||||
local net = require("net")
|
local net = require("net")
|
||||||
require("bin")
|
require("bin")
|
||||||
bin.setBitsInterface(infinabits)
|
bin.setBitsInterface(infinabits) -- the bits interface does not work so well, another bug to fix
|
||||||
|
|
||||||
-- Commands that the master and node will respect, max of 256 commands
|
-- Commands that the master and node will respect, max of 256 commands
|
||||||
local CMD_ERROR = 0x00
|
local CMD_ERROR = 0x00
|
||||||
@ -17,6 +17,7 @@ local CMD_GLOBAL = 0x07
|
|||||||
local CMD_LOAD = 0x08
|
local CMD_LOAD = 0x08
|
||||||
local CMD_CALL = 0x09
|
local CMD_CALL = 0x09
|
||||||
local CMD_REG = 0x0A
|
local CMD_REG = 0x0A
|
||||||
|
local CMD_CONSOLE = 0x0B
|
||||||
|
|
||||||
local char = string.char
|
local char = string.char
|
||||||
local byte = string.byte
|
local byte = string.byte
|
||||||
@ -130,7 +131,7 @@ function multi:nodeManager(port)
|
|||||||
end
|
end
|
||||||
server.timeouts[cid] = true
|
server.timeouts[cid] = true
|
||||||
server:send(cid,"ping")
|
server:send(cid,"ping")
|
||||||
end,.1)
|
end,1)
|
||||||
server.nodes[cid]=data:sub(2,-1)
|
server.nodes[cid]=data:sub(2,-1)
|
||||||
server.OnNodeAdded:Fire(server.nodes[cid])
|
server.OnNodeAdded:Fire(server.nodes[cid])
|
||||||
elseif cmd == "G" then
|
elseif cmd == "G" then
|
||||||
@ -156,15 +157,27 @@ function multi:newNode(settings)
|
|||||||
local name = settings.name or multi.randomString(8)
|
local name = settings.name or multi.randomString(8)
|
||||||
local node = {}
|
local node = {}
|
||||||
node.name = name
|
node.name = name
|
||||||
|
multi.OnError(function(i,error)
|
||||||
|
node.OnError(node,error,node.server)
|
||||||
|
end)
|
||||||
node.server = net:newUDPServer(0) -- hosts the node using the default port
|
node.server = net:newUDPServer(0) -- hosts the node using the default port
|
||||||
_, node.port = node.server.udp:getsockname()
|
_, node.port = node.server.udp:getsockname()
|
||||||
node.connections = net.ClientCache
|
node.connections = net.ClientCache
|
||||||
node.queue = Queue:newQueue()
|
node.queue = Queue:newQueue()
|
||||||
node.functions = bin.stream("RegisteredFunctions.dat",false)
|
node.functions = bin.stream("RegisteredFunctions.dat",false)
|
||||||
node.hasFuncs = {}
|
node.hasFuncs = {}
|
||||||
|
node.OnError = multi:newConnection()
|
||||||
|
node.OnError(function(node,err,master)
|
||||||
|
local temp = bin.new()
|
||||||
|
temp:addBlock(#node.name)
|
||||||
|
temp:addBlock(node.name)
|
||||||
|
temp:addBlock(#err)
|
||||||
|
temp:addBlock(err)
|
||||||
|
node.server:send(char(CMD_ERROR..temp))
|
||||||
|
end)
|
||||||
if settings.managerDetails then
|
if settings.managerDetails then
|
||||||
local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
|
local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
|
||||||
if not c then
|
if not c then
|
||||||
print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false
|
print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false
|
||||||
else
|
else
|
||||||
c.OnDataRecieved(function(self,data)
|
c.OnDataRecieved(function(self,data)
|
||||||
@ -201,45 +214,26 @@ function multi:newNode(settings)
|
|||||||
function node:pop()
|
function node:pop()
|
||||||
return node.queue:pop()
|
return node.queue:pop()
|
||||||
end
|
end
|
||||||
node.loadRate=1
|
function node:getConsole()
|
||||||
if settings then
|
local c = {}
|
||||||
if settings.crossTalk then
|
c.connections = node.connections
|
||||||
net.OnCastedClientInfo(function(client,name,ip,port)
|
function c:print(...)
|
||||||
multi.OnGUpdate(function(k,v)
|
local data = char(CMD_CONSOLE)..packData({...})
|
||||||
client:send(table.concat{char(CMD_GLOBAL),k,"|",v})
|
for i,v in pairs(self.connections) do
|
||||||
end)
|
v[1]:send(v[2],data,v[3])
|
||||||
print("Found a new node! Node_List:")
|
end
|
||||||
for i,v in pairs(node.connections) do
|
print("sent message")
|
||||||
print(i)
|
|
||||||
end
|
|
||||||
client:send(char(CMD_INITNODE)..name) -- Tell the node that you are a node trying to connect
|
|
||||||
client.OnDataRecieved(function(client,data)
|
|
||||||
local cmd = byte(data:sub(1,1)) -- the first byte is the command
|
|
||||||
local dat = data:sub(2,-1) -- the data that you want to read
|
|
||||||
if cmd == CMD_PING then
|
|
||||||
|
|
||||||
elseif cmd == CMD_PONG then
|
|
||||||
|
|
||||||
elseif cmd == CMD_QUEUE then
|
|
||||||
queue:push(resolveData(dat))
|
|
||||||
elseif cmd == CMD_GLOBAL then
|
|
||||||
local k,v = dat:match("(.-)|(.+)")
|
|
||||||
PROXY[k]=resolveData(v)
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
end)
|
|
||||||
net:newCastedClients("NODE_(.+)")
|
|
||||||
end
|
end
|
||||||
|
return c
|
||||||
end
|
end
|
||||||
|
node.loadRate=1
|
||||||
-- Lets tell the network we are alive!
|
-- Lets tell the network we are alive!
|
||||||
node.server.OnDataRecieved(function(server,data,cid,ip,port)
|
node.server.OnDataRecieved(function(server,data,cid,ip,port)
|
||||||
local cmd = byte(data:sub(1,1)) -- the first byte is the command
|
local cmd = byte(data:sub(1,1)) -- the first byte is the command
|
||||||
local dat = data:sub(2,-1) -- the data that you want to read
|
local dat = data:sub(2,-1) -- the data that you want to read
|
||||||
if cmd == CMD_PING then
|
if cmd == CMD_PING then
|
||||||
|
server:send(ip,char(CMD_PONG),port)
|
||||||
elseif cmd == CMD_PONG then
|
elseif cmd == CMD_QUEUE then
|
||||||
|
|
||||||
elseif cmd == CMD_QUEUE then
|
|
||||||
node.queue:push(resolveData(dat))
|
node.queue:push(resolveData(dat))
|
||||||
elseif cmd == CMD_REG then
|
elseif cmd == CMD_REG then
|
||||||
if not settings.allowRemoteRegistering then
|
if not settings.allowRemoteRegistering then
|
||||||
@ -299,14 +293,14 @@ function multi:newNode(settings)
|
|||||||
local conn = node.connections[name]
|
local conn = node.connections[name]
|
||||||
conn[1]:send(conn[2],data,conn[3])
|
conn[1]:send(conn[2],data,conn[3])
|
||||||
end
|
end
|
||||||
if not settings.noBroadCast then
|
if not settings.noBroadCast then
|
||||||
node.server:broadcast("NODE_"..name)
|
node.server:broadcast("NODE_"..name)
|
||||||
end
|
end
|
||||||
return node
|
return node
|
||||||
end
|
end
|
||||||
|
|
||||||
-- Masters
|
-- Masters
|
||||||
function multi:newMaster(settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code.
|
function multi:newMaster(settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code.
|
||||||
local master = {}
|
local master = {}
|
||||||
local settings = settings or {}
|
local settings = settings or {}
|
||||||
master.name = settings.name or multi.randomString(8)
|
master.name = settings.name or multi.randomString(8)
|
||||||
@ -315,16 +309,18 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
|
|||||||
master.conn2 = multi:newConnection()
|
master.conn2 = multi:newConnection()
|
||||||
master.OnFirstNodeConnected = multi:newConnection()
|
master.OnFirstNodeConnected = multi:newConnection()
|
||||||
master.OnNodeConnected = multi:newConnection()
|
master.OnNodeConnected = multi:newConnection()
|
||||||
|
master.OnError = multi:newConnection()
|
||||||
master.queue = Queue:newQueue()
|
master.queue = Queue:newQueue()
|
||||||
master.connections = net.ClientCache -- Link to the client cache that is created on the net interface
|
master.connections = net.ClientCache -- Link to the client cache that is created on the net interface
|
||||||
master.loads = {}
|
master.loads = {}
|
||||||
|
master.timeouts = {}
|
||||||
master.trigger = multi:newFunction(function(self,node)
|
master.trigger = multi:newFunction(function(self,node)
|
||||||
master.OnFirstNodeConnected:Fire(node)
|
master.OnFirstNodeConnected:Fire(node)
|
||||||
self:Pause()
|
self:Pause()
|
||||||
end)
|
end)
|
||||||
if settings.managerDetails then
|
if settings.managerDetails then
|
||||||
local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
|
local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
|
||||||
if not client then
|
if not client then
|
||||||
print("Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false
|
print("Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false
|
||||||
else
|
else
|
||||||
client.OnDataRecieved(function(client,data)
|
client.OnDataRecieved(function(client,data)
|
||||||
@ -467,16 +463,34 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
|
|||||||
end
|
end
|
||||||
client.OnClientReady(function()
|
client.OnClientReady(function()
|
||||||
client:send(char(CMD_INITMASTER)..master.name) -- Tell the node that you are a master trying to connect
|
client:send(char(CMD_INITMASTER)..master.name) -- Tell the node that you are a master trying to connect
|
||||||
|
if not settings.managerDetails then
|
||||||
|
multi:newTLoop(function(loop)
|
||||||
|
if master.timeouts[name]==true then
|
||||||
|
master.timeouts[name] = nil
|
||||||
|
master.connections[name] = nil
|
||||||
|
loop:Destroy()
|
||||||
|
return
|
||||||
|
end
|
||||||
|
master.timeouts[name] = true
|
||||||
|
master:sendTo(name,char(CMD_PING))
|
||||||
|
end,1)
|
||||||
|
end
|
||||||
|
client.name = name
|
||||||
client.OnDataRecieved(function(client,data)
|
client.OnDataRecieved(function(client,data)
|
||||||
local cmd = byte(data:sub(1,1)) -- the first byte is the command
|
local cmd = byte(data:sub(1,1)) -- the first byte is the command
|
||||||
local dat = data:sub(2,-1) -- the data that you want to read
|
local dat = data:sub(2,-1) -- the data that you want to read
|
||||||
master.trigger(nodename)
|
master.trigger(nodename)
|
||||||
if cmd == CMD_ERROR then
|
if cmd == CMD_ERROR then
|
||||||
|
local temp = bin.new(dat)
|
||||||
elseif cmd == CMD_PING then
|
local len = temp:getBlock("n",2)
|
||||||
|
local node = temp:getBlock("s",len)
|
||||||
|
len = temp:getBlock("n",2)
|
||||||
|
local err = temp:getBlock("s",len)
|
||||||
|
master.OnError(name,err)
|
||||||
|
elseif cmd == CMD_CONSOLE then
|
||||||
|
print(unpack(resolveData(dat)))
|
||||||
elseif cmd == CMD_PONG then
|
elseif cmd == CMD_PONG then
|
||||||
|
master.timeouts[client.name] = nil
|
||||||
elseif cmd == CMD_INITNODE then
|
elseif cmd == CMD_INITNODE then
|
||||||
master.OnNodeConnected:Fire(dat)
|
master.OnNodeConnected:Fire(dat)
|
||||||
elseif cmd == CMD_QUEUE then
|
elseif cmd == CMD_QUEUE then
|
||||||
@ -496,9 +510,8 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
|
|||||||
end
|
end
|
||||||
return master
|
return master
|
||||||
end
|
end
|
||||||
|
|
||||||
-- The init function that gets returned
|
-- The init function that gets returned
|
||||||
print("Integrated Network Parallelism")
|
print("Integrated Network Parallelism")
|
||||||
return {init = function()
|
return {init = function()
|
||||||
return GLOBAL
|
return GLOBAL
|
||||||
end}
|
end}
|
||||||
|
|||||||
@ -397,9 +397,10 @@ function multi:newSystemThreadedJobQueue(numOfCores)
|
|||||||
FUNCS={}
|
FUNCS={}
|
||||||
SFunc=multi:newFunction(function(self)
|
SFunc=multi:newFunction(function(self)
|
||||||
MainLoop:Pause()
|
MainLoop:Pause()
|
||||||
self:hold(.1)
|
multi:newAlarm(.1):OnRing(function(alarm)
|
||||||
MainLoop:Resume()
|
alarm:Destroy()
|
||||||
self:Pause()
|
MainLoop:Resume()
|
||||||
|
end)
|
||||||
end)
|
end)
|
||||||
multi:newLoop(function()
|
multi:newLoop(function()
|
||||||
local rd=REG:peek()
|
local rd=REG:peek()
|
||||||
@ -472,6 +473,7 @@ function multi:newSystemThreadedJobQueue(numOfCores)
|
|||||||
while data do
|
while data do
|
||||||
if data then
|
if data then
|
||||||
local a=unpack(data)
|
local a=unpack(data)
|
||||||
|
print(a)
|
||||||
if a=="_THREADINIT_" then
|
if a=="_THREADINIT_" then
|
||||||
self.link.threadsResponded=self.link.threadsResponded+1
|
self.link.threadsResponded=self.link.threadsResponded+1
|
||||||
if self.link.threadsResponded==self.link.cores then
|
if self.link.threadsResponded==self.link.cores then
|
||||||
|
|||||||
12
test.lua
12
test.lua
@ -7,18 +7,24 @@ nGLOBAL = require("multi.integration.networkManager").init()
|
|||||||
-- Act as a master node
|
-- Act as a master node
|
||||||
master = multi:newMaster{
|
master = multi:newMaster{
|
||||||
name = "Main", -- the name of the master
|
name = "Main", -- the name of the master
|
||||||
noBroadCast = true, -- if using the node manager, set this to true to avoid double connections
|
--noBroadCast = true, -- if using the node manager, set this to true to avoid double connections
|
||||||
managerDetails = {"localhost",12345}, -- the details to connect to the node manager (ip,port)
|
--managerDetails = {"localhost",12345}, -- the details to connect to the node manager (ip,port)
|
||||||
}
|
}
|
||||||
-- Send to all the nodes that are connected to the master
|
-- Send to all the nodes that are connected to the master
|
||||||
master.OnNodeConnected(function(node)
|
master.OnNodeConnected(function(node)
|
||||||
print("Lets Go!")
|
print("Lets Go!")
|
||||||
|
master:newNetworkThread("Thread",function()
|
||||||
|
local node = _G.node
|
||||||
|
local console = node:getConsole()
|
||||||
|
multi:newTLoop(function()
|
||||||
|
console:print("Yo whats up man!")
|
||||||
|
end,1)
|
||||||
|
end)
|
||||||
master:execute("RemoteTest",node,1,2,3)
|
master:execute("RemoteTest",node,1,2,3)
|
||||||
multi:newThread("waiter",function()
|
multi:newThread("waiter",function()
|
||||||
print("Hello!",node)
|
print("Hello!",node)
|
||||||
while true do
|
while true do
|
||||||
thread.sleep(2)
|
thread.sleep(2)
|
||||||
print("sending")
|
|
||||||
master:pushTo(node,"This is a test 2")
|
master:pushTo(node,"This is a test 2")
|
||||||
if master.connections["NODE_"..node]==nil then
|
if master.connections["NODE_"..node]==nil then
|
||||||
thread.kill()
|
thread.kill()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user