awesome stuff being added

Almost done with the core features that I want added to this library
This commit is contained in:
Ryan Ward 2018-06-24 21:59:38 -04:00
parent 644044901d
commit 138d61de85
7 changed files with 281 additions and 49 deletions

BIN
RegisteredFunctions.dat Normal file

Binary file not shown.

View File

@ -2,7 +2,8 @@
local multi = require("multi")
local net = require("net")
local bin = require("bin")
require("bin")
bin.setBitsInterface(infinabits)
-- Commands that the master and node will respect, max of 256 commands
local CMD_ERROR = 0x00
@ -15,6 +16,7 @@ local CMD_INITMASTER = 0x06
local CMD_GLOBAL = 0x07
local CMD_LOAD = 0x08
local CMD_CALL = 0x09
local CMD_REG = 0x0A
local char = string.char
local byte = string.byte
@ -105,17 +107,91 @@ setmetatable(GLOBAL,{
end
})
-- In case you are unable to use broadcasting this can be used to help connect to nodes
function multi:nodeManager(port)
if not port then
error("You must provide a port in order to host the node manager!")
end
local server = net:newTCPServer(port)
server.nodes = {}
server.timeouts = {}
server.OnNodeAdded = multi:newConnection()
server.OnNodeRemoved = multi:newConnection()
server.OnDataRecieved(function(server,data,cid,ip,port)
local cmd = data:sub(1,1)
if cmd == "R" then
multi:newTLoop(function(loop)
if server.timeouts[cid]==true then
server.OnNodeRemoved:Fire(server.nodes[cid])
server.nodes[cid] = nil
server.timeouts[cid] = nil
loop:Destroy()
return
end
server.timeouts[cid] = true
server:send(cid,"ping")
end,3)
server.nodes[cid]=data:sub(2,-1)
server.OnNodeAdded:Fire(server.nodes[cid])
elseif cmd == "G" then
server.OnNodeAdded(function(node)
server:send(cid,node)
end)
server.OnNodeRemoved(function(node)
server:send(cid,"R"..node:match("(.-)|"))
end)
for i,v in pairs(server.nodes) do
server:send(cid,v)
end
elseif cmd == "P" then
server.timeouts[cid] = nil
end
end)
end
-- The main driving force of the network manager: Nodes
function multi:newNode(name,settings)
function multi:newNode(settings)
settings = settings or {}
-- Here we have to use the net library to broadcast our node across the network
math.randomseed(os.time())
local name = name or multi.randomString(8)
local name = settings.name or multi.randomString(8)
local node = {}
node.name = name
node.server = net:newUDPServer(0) -- hosts the node using the default port
node.port = node.server.port
node.connections = net.ClientCache
node.queue = queue:newQueue()
node.functions = bin.stream("RegisteredFunctions.dat",false)
node.hasFuncs = {}
if settings.managerDetails then
local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
if not c then
print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false
else
c.OnDataRecieved(function(self,data)
if data == "ping" then
self:send("P")
end
end)
c:send("RNODE_"..name.."|"..net.getLocalIP().."|"..node.port)
end
end
if not settings.preload then
if node.functions:getSize()~=0 then
print("We have function(s) to preload!")
local len = node.functions:getBlock("n",1)
local name,func
while len do
name = node.functions:getBlock("s",len)
len = node.functions:getBlock("n",2)
func = node.functions:getBlock("s",len)
len = node.functions:read(1)
_G[name]=resolveData(func)
node.hasFuncs[name]=true
if not len then break end
len = byte(len)
end
end
end
node.loadRate=1
if settings then
if settings.crossTalk then
@ -158,10 +234,37 @@ function multi:newNode(name,settings)
elseif cmd == CMD_QUEUE then
local name,d=dat:match("(.-)|(.+)")
multi.OnNetQueue:Fire(name,d)
elseif cmd == CMD_REG then
if not settings.allowRemoteRegistering then
print(ip..": has attempted to register a function when it is currently not allowed!")
return
end
local temp = bin.new(dat)
local len = temp:getBlock("n",1)
local name = temp:getBlock("s",len)
if node.hasFuncs[name] then
print("Function already preloaded onto the node!")
return
end
len = temp:getBlock("n",2)
local func = temp:getBlock("s",len)
_G[name]=resolveData(func)
node.functions:addBlock(dat)
elseif cmd == CMD_CALL then
local temp = bin.new(dat)
local len = temp:getBlock("n",1)
local name = temp:getBlock("s",len)
len = temp:getBlock("n",2)
local args = temp:getBlock("s",len)
_G[name](unpack(resolveData(args)))
elseif cmd == CMD_TASK then
local args,func = dat:match("(.-)|(.+)")
func = resolveData(func)
local holder = bin.new(dat)
local len = holder:getBlock("n",4)
local args = holder:getBlock("s",len)
local len2 = holder:getBlock("n",4)
local func = holder:getBlock("s",len2)
args = resolveData(args)
func = resolveData(func)
func(unpack(args))
elseif cmd == CMD_INITNODE then
print("Connected with another node!")
@ -190,33 +293,100 @@ function multi:newNode(name,settings)
function node:sendToNode(name,data)
self.connections[name]:send(data)
end
if not settings.noBroadCast then
node.server:broadcast("NODE_"..name)
end
return node
end
-- Masters
function multi:newMaster(name,settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code.
function multi:newMaster(settings) -- You will be able to have more than one master connecting to node(s) if that is what you want to do. I want you to be able to have the freedom to code any way that you want to code.
local master = {}
master.name = name
local settings = settings or {}
master.name = settings.name or multi.randomString(8)
local name = master.name
master.conn = multi:newConnection()
master.conn2 = multi:newConnection()
master.OnFirstNodeConnected = multi:newConnection()
master.queue = queue:newQueue()
master.connections = net.ClientCache -- Link to the client cache that is created on the net interface
master.loads = {}
master.trigger = multi:newFunction(function(self)
master.OnFirstNodeConnected:Fire()
master.trigger = multi:newFunction(function(self,node)
master.OnFirstNodeConnected:Fire(node)
self:Pause()
end)
function master:newNetworkThread(tname,name,func,...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job
if settings.managerDetails then
local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
if not client then
print("Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false
else
client.OnDataRecieved(function(client,data)
local cmd = data:sub(1,1)
if cmd == "N" then
local name,ip,port = data:match("(.-)|(.-)|(.+)")
local c = net:newUDPClient(ip,port)
master.connections[name]=c
net.OnCastedClientInfo:Fire(c,name,ip,port)
elseif cmd == "R" then
local name = data:sub(2,-1)
master.connections[name]=nil
end
end)
client:send("G") -- init your connection as a master
end
end
function master:doToAll(func)
for i,v in pairs(master.connections) do
func(i,v)
end
end
function master:register(name,node,func)
if not node then
error("You must specify a node to execute a command on!")
end
local temp = bin.new()
local fData = packData(func)
temp:addBlock(CMD_REG,1)
temp:addBlock(#name,1)
temp:addBlock(name,#name)
temp:addBlock(#fData,2)
temp:addBlock(fData,#fData)
master:sendTo(node,temp.data)
end
function master:execute(name,node,...)
if not node then
error("You must specify a node to execute a command on!")
end
if not name then
error("You must specify a function name to call on the node!")
end
local args = packData{...}
local name = name
local node = node
local temp, len, data
temp = bin.new()
temp:addBlock(CMD_CALL,1)
temp:addBlock(#name,1)
temp:addBlock(name,#name)
temp:addBlock(#args,2)
temp:addBlock(args,#args)
master:sendTo(node,temp.data)
end
function master:newNetworkThread(tname,func,name,...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job
local fData = packData(func)
local tab = {...}
local aData = ""
if #tab~=o then
aData = (packData{...}).."|"
aData = (packData{...})
else
aData = (packData{1,1}).."|"
aData = (packData{"NO","ARGS"})
end
local temp = bin.new()
temp:addBlock(#aData,4)
local len = temp.data
local temp2 = bin.new()
temp2:addBlock(#fData,4)
local len2 = temp2.data
if not name then
local name = self:getFreeNode()
if not name then
@ -225,20 +395,22 @@ function multi:newMaster(name,settings) -- You will be able to have more than on
if name==nil then
multi:newTLoop(function(loop)
if name~=nil then
print("Readying Func")
self:sendTo(name,char(CMD_TASK)..aData..fData)
self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData)
loop:Desrtoy()
end
end,.1)
else
self:sendTo(name,char(CMD_TASK)..aData..fData)
self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData)
end
else
local name = "NODE_"..name
self:sendTo(name,char(CMD_TASK)..aData..fData)
self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData)
end
end
function master:sendTo(name,data)
if name:sub(1,5)=="NODE_" then
name = name:sub(6,-1)
end
if self.connections["NODE_"..name]==nil then
multi:newTLoop(function(loop)
if self.connections["NODE_"..name]~=nil then
@ -273,16 +445,16 @@ function multi:newMaster(name,settings) -- You will be able to have more than on
multi.OnGUpdate(function(k,v)
client:send(table.concat{char(CMD_GLOBAL),k,"|",v})
end)
print("Found a new node! Node_List:")
local nodename
for i,v in pairs(master.connections) do
print(i)
nodename = i
end
client.OnClientReady(function()
client:send(char(CMD_INITMASTER)..name) -- Tell the node that you are a master trying to connect
client.OnDataRecieved(function(client,data)
local cmd = byte(data:sub(1,1)) -- the first byte is the command
local dat = data:sub(2,-1) -- the data that you want to read
master.trigger()
master.trigger(nodename)
if cmd == CMD_ERROR then
elseif cmd == CMD_PING then
@ -295,7 +467,6 @@ function multi:newMaster(name,settings) -- You will be able to have more than on
elseif cmd == CMD_GLOBAL then
local k,v = dat:match("(.-)|(.+)")
PROXY[k]=resolveData(v)
print("Got Global Command")
elseif cmd == CMD_LOAD then
local name,load = dat:match("(.-)|(.+)")
master.loads[name]=tonumber(load)
@ -303,11 +474,14 @@ function multi:newMaster(name,settings) -- You will be able to have more than on
end)
end)
end)
if not settings.noBroadCast then
net:newCastedClients("NODE_(.+)") -- Searches for nodes and connects to them, the master.clients table will contain them by name
end
return master
end
-- The init function that gets returned
print("Integrated Network Parallelism")
return {init = function()
return GLOBAL
end}

View File

@ -119,7 +119,7 @@ function multi:newSystemThreadedConnection(name,protect)
local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL
function c:init()
require("multi")
local multi = require("multi")
if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL
sThread=_G.sThread
@ -134,7 +134,7 @@ function multi:newSystemThreadedConnection(name,protect)
conn.queueCall = multi:newSystemThreadedQueue(self.name.."THREADED_CALLQ"):init()
end
else
require("multi") -- so things don't break, but also allows bi-directional connections to work
local multi = require("multi") -- so things don't break, but also allows bi-directional connections to work
conn.queueCall = sThread.waitFor(self.name.."THREADED_CALLQ"):init()
end
setmetatable(conn,{__call=function(self,...) return self:connect(...) end})
@ -216,7 +216,7 @@ function multi:systemThreadedBenchmark(n,p)
local GLOBAL=multi.integration.GLOBAL
for i=1,cores do
multi:newSystemThread("STHREAD_BENCH",function()
require("multi")
local multi = require("multi")
if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL
sThread=_G.sThread
@ -257,7 +257,7 @@ function multi:newSystemThreadedConsole(name)
local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL
function c:init()
require("multi")
local multi = require("multi")
if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL
sThread=_G.sThread
@ -381,7 +381,7 @@ function multi:newSystemThreadedJobQueue(numOfCores)
GLOBAL["__JQ_COUNT__"]=c.cores
for i=1,c.cores do
multi:newSystemThread("System Threaded Job Queue Worker Thread #"..i,function(name,ind)
require("multi")
local multi = require("multi")
ThreadName=name
__sleep__=.001
if love then -- lets make sure we don't reference up-values if using love2d

View File

@ -2,9 +2,15 @@ package.path="?/init.lua;?.lua;"..package.path
multi = require("multi")
local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
nGLOBAL = require("multi.integration.networkManager").init()
master = multi:newNode()
function RemoteTest()
print("Yes I work!")
master = multi:newNode{
crossTalk = false, -- default value
allowRemoteRegistering = true,
name = nil, -- default value
noBroadCast = true,
managerDetails = {"localhost",12345}, -- connects to the node manager if one exists
}
function RemoteTest(a,b,c)
print("Yes I work!",a,b,c)
end
settings = {
priority = 0, -- 1 or 2

11
nodeManager.lua Normal file
View File

@ -0,0 +1,11 @@
package.path="?/init.lua;?.lua;"..package.path
multi = require("multi")
local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
nGLOBAL = require("multi.integration.networkManager").init()
multi:nodeManager(12345)
print("Node Manager Running...")
settings = {
priority = 0, -- 1 or 2
protect = false,
}
multi:mainloop(settings)

View File

@ -0,0 +1,32 @@
package = "multi"
version = "1.11.0"
source = {
url = "git://github.com/rayaman/multi.git",
tag = "v1.11.0",
}
description = {
summary = "Lua Multi tasking library",
detailed = [[
This library contains many methods for multi tasking. From simple side by side code using multi-objs, to using coroutine based Threads and System threads(When you have lua lanes installed or are using love2d)
]],
homepage = "https://github.com/rayaman/multi",
license = "MIT"
}
dependencies = {
"lua >= 5.1",
"bin",
"lanes"
}
build = {
type = "builtin",
modules = {
["multi.init"] = "multi/init.lua",
["multi.all"] = "multi/all.lua",
["multi.compat.backwards[1,5,0]"] = "multi/compat/backwards[1,5,0].lua",
["multi.compat.love2d"] = "multi/compat/love2d.lua",
["multi.integration.lanesManager"] = "multi/integration/lanesManager.lua",
["multi.integration.loveManager"] = "multi/integration/loveManager.lua",
["multi.integration.luvitManager"] = "multi/integration/luvitManager.lua",
["multi.integration.shared"] = "multi/integration/shared.lua"
}
}

View File

@ -4,31 +4,40 @@ package.path="?/init.lua;?.lua;"..package.path
multi = require("multi")
local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
nGLOBAL = require("multi.integration.networkManager").init()
-- Run the code
master = multi:newMaster("Main")
-- Act as a master node
master = multi:newMaster{
pollManagerRate = 15,
name = "Main",
noBroadCast = true,
managerDetails = {"localhost",12345},
}
-- Starting the multitasker
settings = {
priority = 0, -- 1 or 2
priority = 0, -- 0, 1 or 2
protect = false,
}
master.OnFirstNodeConnected(function()
print("Node connected lets go!")
master:newNetworkThread("Test_Thread",nil,function()
RemoteTest()
multi:newThread("test",function()
nGLOBAL["test"]="Did it work?"
--~ master.OnFirstNodeConnected(function(node_name)
multi:newAlarm(3):OnRing(function(alarm)
master:doToAll(function(node_name)
master:register("TestFunc",node_name,function(msg)
print("It works: "..msg)
end)
multi:newAlarm(2):OnRing(function(alarm)
master:execute("TestFunc",node_name,"Hello!")
alarm:Destroy()
end)
multi:newThread("Checker",function()
while true do
thread.sleep(.5)
thread.sleep(1)
if nGLOBAL["test"] then
print(nGLOBAL["test"])
thread.kill()
end
end
end)
nGLOBAL["test2"]={age=22}
end)
os.execute("start lua node.lua")
alarm:Destroy()
end)
--~ os.execute("start lua node.lua")
multi:mainloop(settings)