diff --git a/examples/lanesintegratetest9.lua b/examples/lanesintegratetest9.lua index 29182a0..a9f7a22 100644 --- a/examples/lanesintegratetest9.lua +++ b/examples/lanesintegratetest9.lua @@ -10,6 +10,7 @@ jQueue:registerJob("TEST_JOB2",function() print("Test Works!") end) jQueue:start() +jQueue.OnReady:holdUT() jQueue:doToAll(function() print("Doing this 16? times!") end) diff --git a/multi/init.lua b/multi/init.lua index 0839c83..1d67f9f 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -79,7 +79,7 @@ function multi.queuefinal(self) self.Parent:Remove() end end -if table.unpack then +if table.unpack and not unpack then unpack=table.unpack end function table.merge(t1, t2) diff --git a/multi/integration/networkManager.lua b/multi/integration/networkManager.lua index 0c7cac0..4928d0d 100644 --- a/multi/integration/networkManager.lua +++ b/multi/integration/networkManager.lua @@ -3,7 +3,7 @@ local multi = require("multi") local net = require("net") require("bin") -bin.setBitsInterface(infinabits) +bin.setBitsInterface(infinabits) -- the bits interface does not work so well, another bug to fix -- Commands that the master and node will respect, max of 256 commands local CMD_ERROR = 0x00 @@ -17,6 +17,7 @@ local CMD_GLOBAL = 0x07 local CMD_LOAD = 0x08 local CMD_CALL = 0x09 local CMD_REG = 0x0A +local CMD_CONSOLE = 0x0B local char = string.char local byte = string.byte @@ -130,7 +131,7 @@ function multi:nodeManager(port) end server.timeouts[cid] = true server:send(cid,"ping") - end,.1) + end,1) server.nodes[cid]=data:sub(2,-1) server.OnNodeAdded:Fire(server.nodes[cid]) elseif cmd == "G" then @@ -156,15 +157,27 @@ function multi:newNode(settings) local name = settings.name or multi.randomString(8) local node = {} node.name = name + multi.OnError(function(i,error) + node.OnError(node,error,node.server) + end) node.server = net:newUDPServer(0) -- hosts the node using the default port _, node.port = node.server.udp:getsockname() node.connections = net.ClientCache node.queue = Queue:newQueue() node.functions = bin.stream("RegisteredFunctions.dat",false) node.hasFuncs = {} + node.OnError = multi:newConnection() + node.OnError(function(node,err,master) + local temp = bin.new() + temp:addBlock(#node.name) + temp:addBlock(node.name) + temp:addBlock(#err) + temp:addBlock(err) + node.server:send(char(CMD_ERROR..temp)) + end) if settings.managerDetails then local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) - if not c then + if not c then print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false else c.OnDataRecieved(function(self,data) @@ -201,45 +214,26 @@ function multi:newNode(settings) function node:pop() return node.queue:pop() end - node.loadRate=1 - if settings then - if settings.crossTalk then - net.OnCastedClientInfo(function(client,name,ip,port) - multi.OnGUpdate(function(k,v) - client:send(table.concat{char(CMD_GLOBAL),k,"|",v}) - end) - print("Found a new node! Node_List:") - for i,v in pairs(node.connections) do - print(i) - end - client:send(char(CMD_INITNODE)..name) -- Tell the node that you are a node trying to connect - client.OnDataRecieved(function(client,data) - local cmd = byte(data:sub(1,1)) -- the first byte is the command - local dat = data:sub(2,-1) -- the data that you want to read - if cmd == CMD_PING then - - elseif cmd == CMD_PONG then - - elseif cmd == CMD_QUEUE then - queue:push(resolveData(dat)) - elseif cmd == CMD_GLOBAL then - local k,v = dat:match("(.-)|(.+)") - PROXY[k]=resolveData(v) - end - end) - end) - net:newCastedClients("NODE_(.+)") + function node:getConsole() + local c = {} + c.connections = node.connections + function c:print(...) + local data = char(CMD_CONSOLE)..packData({...}) + for i,v in pairs(self.connections) do + v[1]:send(v[2],data,v[3]) + end + print("sent message") end + return c end + node.loadRate=1 -- Lets tell the network we are alive! node.server.OnDataRecieved(function(server,data,cid,ip,port) local cmd = byte(data:sub(1,1)) -- the first byte is the command local dat = data:sub(2,-1) -- the data that you want to read if cmd == CMD_PING then - - elseif cmd == CMD_PONG then - - elseif cmd == CMD_QUEUE then + server:send(ip,char(CMD_PONG),port) + elseif cmd == CMD_QUEUE then node.queue:push(resolveData(dat)) elseif cmd == CMD_REG then if not settings.allowRemoteRegistering then @@ -299,14 +293,14 @@ function multi:newNode(settings) local conn = node.connections[name] conn[1]:send(conn[2],data,conn[3]) end - if not settings.noBroadCast then + if not settings.noBroadCast then node.server:broadcast("NODE_"..name) end return node end -- Masters -function multi:newMaster(settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code. +function multi:newMaster(settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code. local master = {} local settings = settings or {} master.name = settings.name or multi.randomString(8) @@ -315,16 +309,18 @@ function multi:newMaster(settings) -- You will be able to have more than one mas master.conn2 = multi:newConnection() master.OnFirstNodeConnected = multi:newConnection() master.OnNodeConnected = multi:newConnection() + master.OnError = multi:newConnection() master.queue = Queue:newQueue() master.connections = net.ClientCache -- Link to the client cache that is created on the net interface master.loads = {} + master.timeouts = {} master.trigger = multi:newFunction(function(self,node) master.OnFirstNodeConnected:Fire(node) self:Pause() end) if settings.managerDetails then local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) - if not client then + if not client then print("Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false else client.OnDataRecieved(function(client,data) @@ -467,16 +463,34 @@ function multi:newMaster(settings) -- You will be able to have more than one mas end client.OnClientReady(function() client:send(char(CMD_INITMASTER)..master.name) -- Tell the node that you are a master trying to connect + if not settings.managerDetails then + multi:newTLoop(function(loop) + if master.timeouts[name]==true then + master.timeouts[name] = nil + master.connections[name] = nil + loop:Destroy() + return + end + master.timeouts[name] = true + master:sendTo(name,char(CMD_PING)) + end,1) + end + client.name = name client.OnDataRecieved(function(client,data) local cmd = byte(data:sub(1,1)) -- the first byte is the command local dat = data:sub(2,-1) -- the data that you want to read master.trigger(nodename) if cmd == CMD_ERROR then - - elseif cmd == CMD_PING then - + local temp = bin.new(dat) + local len = temp:getBlock("n",2) + local node = temp:getBlock("s",len) + len = temp:getBlock("n",2) + local err = temp:getBlock("s",len) + master.OnError(name,err) + elseif cmd == CMD_CONSOLE then + print(unpack(resolveData(dat))) elseif cmd == CMD_PONG then - + master.timeouts[client.name] = nil elseif cmd == CMD_INITNODE then master.OnNodeConnected:Fire(dat) elseif cmd == CMD_QUEUE then @@ -496,9 +510,8 @@ function multi:newMaster(settings) -- You will be able to have more than one mas end return master end - -- The init function that gets returned print("Integrated Network Parallelism") return {init = function() return GLOBAL -end} \ No newline at end of file +end} diff --git a/multi/integration/shared.lua b/multi/integration/shared.lua index 5a8b8bd..c17f733 100644 --- a/multi/integration/shared.lua +++ b/multi/integration/shared.lua @@ -397,9 +397,10 @@ function multi:newSystemThreadedJobQueue(numOfCores) FUNCS={} SFunc=multi:newFunction(function(self) MainLoop:Pause() - self:hold(.1) - MainLoop:Resume() - self:Pause() + multi:newAlarm(.1):OnRing(function(alarm) + alarm:Destroy() + MainLoop:Resume() + end) end) multi:newLoop(function() local rd=REG:peek() @@ -472,6 +473,7 @@ function multi:newSystemThreadedJobQueue(numOfCores) while data do if data then local a=unpack(data) + print(a) if a=="_THREADINIT_" then self.link.threadsResponded=self.link.threadsResponded+1 if self.link.threadsResponded==self.link.cores then diff --git a/test.lua b/test.lua index 2e2cc36..f00ceb5 100644 --- a/test.lua +++ b/test.lua @@ -7,18 +7,24 @@ nGLOBAL = require("multi.integration.networkManager").init() -- Act as a master node master = multi:newMaster{ name = "Main", -- the name of the master - noBroadCast = true, -- if using the node manager, set this to true to avoid double connections - managerDetails = {"localhost",12345}, -- the details to connect to the node manager (ip,port) + --noBroadCast = true, -- if using the node manager, set this to true to avoid double connections + --managerDetails = {"localhost",12345}, -- the details to connect to the node manager (ip,port) } -- Send to all the nodes that are connected to the master master.OnNodeConnected(function(node) print("Lets Go!") + master:newNetworkThread("Thread",function() + local node = _G.node + local console = node:getConsole() + multi:newTLoop(function() + console:print("Yo whats up man!") + end,1) + end) master:execute("RemoteTest",node,1,2,3) multi:newThread("waiter",function() print("Hello!",node) while true do thread.sleep(2) - print("sending") master:pushTo(node,"This is a test 2") if master.connections["NODE_"..node]==nil then thread.kill()