almost done, just a few more tests

This commit is contained in:
Ryan Ward 2018-07-16 00:06:03 -04:00
parent d4f41c5aa7
commit 2447526a12
5 changed files with 147 additions and 35 deletions

View File

@ -1,22 +1,29 @@
#Changes #Changes
[TOC] [TOC]
Update: 2.0.0 Big update Update: 2.0.0 Big update (Lots of additions some changes)
------------------------ ------------------------
**Note:** After doing some testing, I have noticed that using multi-objects are slightly, quite a bit, faster than using (coroutines)multi:newthread(). Only create a thread if there is no other possibility! System threads are different and will improve performance if you know what you are doing. Using a (coroutine)thread as a loop with a timer is slower than using a TLoop! If you do not need the holding features I strongly recommend that you use the multi-objects. This could be due to the scheduler that I am using, and I am looking into improving the performance of the scheduler for (coroutine)threads. This is still a work in progress so expect things to only get better as time passes! **Note:** ~~After doing some testing, I have noticed that using multi-objects are slightly, quite a bit, faster than using (coroutines)multi:newthread(). Only create a thread if there is no other possibility! System threads are different and will improve performance if you know what you are doing. Using a (coroutine)thread as a loop with a timer is slower than using a TLoop! If you do not need the holding features I strongly recommend that you use the multi-objects. This could be due to the scheduler that I am using, and I am looking into improving the performance of the scheduler for (coroutine)threads. This is still a work in progress so expect things to only get better as time passes!~~ This was the reason threadloop was added. It binds the thread scheduler into the mainloop allowing threads to run much faster than before. Also the use of locals is now possible since I am not dealing with seperate objects. And finally reduced function overhead helps keep the threads running better.
#Added: #Added:
- `nGLOBAL = require("multi.integration.networkManager").init()` - `nGLOBAL = require("multi.integration.networkManager").init()`
- `node = multi:newNode(tbl: settings)` - `node = multi:newNode(tbl: settings)`
- `master = multi:newMaster(tbl: settings)` - `master = multi:newMaster(tbl: settings)`
- `multi:nodeManager(port)` - `multi:nodeManager(port)`
- `thread.isThread()` -- for coroutine based threads
- New setting to the main loop,stopOnError which defaults to true. This will cause the object that crashes when under protect to be destroyed, so the error does not keep happening.
- multi:threadloop(settings) works just like mainloop, but prioritizes (corutine based) threads. Regular multi-objects will still work. This improves the preformance of (coroutine based) threads greatly.
Node: Changed:
- When a (corutine based)thread errors it does not print anymore! Conect to multi.OnError() to get errors when they happen!
#Node:
- node:sendTo(name,data) - node:sendTo(name,data)
- node:pushTo(name,data) - node:pushTo(name,data)
- node:peek() - node:peek()
- node:pop() - node:pop()
- node:getConsole()
Master: #Master:
- master:doToAll(func) - master:doToAll(func)
- master:register(name,node,func) - master:register(name,node,func)
- master:execute(name,node,...) - master:execute(name,node,...)
@ -27,10 +34,20 @@ Master:
- master:pushTo(name,data) - master:pushTo(name,data)
- master:peek() - master:peek()
- master:pop() - master:pop()
- master:OnError(nodename, error) -- if a node has an error this is triggered.
**Note On Queues:** When it comes to network queues, they only send 1 way. What I mean by that is that if the master sends a message to a node, its own queue will not get populated at all. The reason for this is because syncing between which popped from what network queue would make things really slow and would not perform so well. This means you have to code a bit differently. Use: master getFreeNode() to get the name of the node under the least amount of load. Then handle the sending of data to each node that way. #Going forward:
- Improve Performance
- Fix supporting libraries (Bin, and net need tons of work)
- Look for the bugs
- Figure out what I can do to make this library more awesome
**Note:** These examples assume that you have already connected the nodes to the node manager. Also you do not need to use the node manager, but sometimes broadcast does not work as expected and the master doesnot connect to the nodes. Using the node manager offers nice features like: removing nodes from the master when they have disconnected, and automatically telling the master when nodes have been added.
**Note On Queues:** When it comes to network queues, they only send 1 way. What I mean by that is that if the master sends a message to a node, its own queue will not get populated at all. The reason for this is because syncing between which popped from what network queue would make things really slow and would not perform well at all. This means you have to code a bit differently. Use: master getFreeNode() to get the name of the node under the least amount of load. Then handle the sending of data to each node that way.
Now there is a little trick you can do. If you combine both networkmanager and systemthreading manager, then you could have a proxy queue for all system threads that can pull from that "node". Now data passing within a lan network, (And wan network if using the node manager, though p2p isn't working as i would like and you would need to open ports and make things work. Remember you can define an port for your node so you can port forward that if you want), is fast enough, but the waiting problem is something to consider. Ask yourseld what you are coding and if network paralisim is worth using.
**Note:** These examples assume that you have already connected the nodes to the node manager. Also you do not need to use the node manager, but sometimes broadcast does not work as expected and the master doesnot connect to the nodes. Using the node manager offers nice features like: removing nodes from the master when they have disconnected, and automatically telling the master when nodes have been added. A more complete example showing connections regardless of order will be shown in the example folder check it out. New naming scheme too.
**NodeManager.lua** **NodeManager.lua**
```lua ```lua
@ -49,6 +66,8 @@ multi:mainloop(settings)
``` ```
Side note: I had a setting called cross talk that would allow nodes to talk to each other. After some tought I decided to not allow nodes to talk to each other directly! You however can create another master withing the node. (The node will connect to its own master as well). This will give you the ability "Cross talk" with each node. Reimplementing the master features into each node directly was un nessacery.
**Node.lua** **Node.lua**
```lua ```lua
package.path="?/init.lua;?.lua;"..package.path package.path="?/init.lua;?.lua;"..package.path
@ -56,7 +75,6 @@ multi = require("multi")
local GLOBAL, THREAD = require("multi.integration.lanesManager").init() local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
nGLOBAL = require("multi.integration.networkManager").init() nGLOBAL = require("multi.integration.networkManager").init()
master = multi:newNode{ master = multi:newNode{
crossTalk = false, -- default value, allows nodes to talk to eachother. WIP NOT READY YET!
allowRemoteRegistering = true, -- allows you to register functions from the master on the node, default is false allowRemoteRegistering = true, -- allows you to register functions from the master on the node, default is false
name = nil, -- default value name = nil, -- default value
noBroadCast = true, -- if using the node manager, set this to true to prevent the node from broadcasting noBroadCast = true, -- if using the node manager, set this to true to prevent the node from broadcasting

View File

@ -6,7 +6,7 @@ Copyright (c) 2017 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: furnished to do so, subject to the following conditions:
@ -750,15 +750,83 @@ function multi:newCondition(func)
return c return c
end end
multi.NewCondition=multi.newCondition multi.NewCondition=multi.newCondition
function multi:threadloop(settings)
multi.scheduler:Destroy() -- destroy is an interesting thing... if you dont set references to nil, then you only remove it from the mainloop
local Threads=multi:linkDomain("Threads")
local Globals=multi:linkDomain("Globals")
local counter=0
local tick = 0
while true do
tick = tick + 1
if tick == 1024 then
tick = 0
multi:uManager(settings)
end
counter=counter+1
for i=#Threads,1,-1 do
ret={}
if coroutine.status(Threads[i].thread)=="dead" then
table.remove(Threads,i)
else
if Threads[i].timer:Get()>=Threads[i].sleep then
if Threads[i].firstRunDone==false then
Threads[i].firstRunDone=true
Threads[i].timer:Start()
_,ret=coroutine.resume(Threads[i].thread,Threads[i].ref)
else
_,ret=coroutine.resume(Threads[i].thread,Globals)
end
if _==false then
self.Parent.OnError:Fire(Threads[i],"Error in thread: <"..Threads[i].Name.."> "..ret)
end
if ret==true or ret==false then
ret={}
end
end
if ret then
if ret[1]=="_kill_" then
table.remove(Threads,i)
elseif ret[1]=="_sleep_" then
Threads[i].timer:Reset()
Threads[i].sleep=ret[2]
elseif ret[1]=="_skip_" then
Threads[i].timer:Reset()
Threads[i].sleep=math.huge
local event=multi:newEvent(function(evnt) return counter>=evnt.counter end)
event.link=Threads[i]
event.counter=counter+ret[2]
event:OnEvent(function(evnt)
evnt.link.sleep=0
end)
elseif ret[1]=="_hold_" then
Threads[i].timer:Reset()
Threads[i].sleep=math.huge
local event=multi:newEvent(ret[2])
event.link=Threads[i]
event:OnEvent(function(evnt)
evnt.link.sleep=0
end)
elseif ret.Name then
Globals[ret.Name]=ret.Value
end
end
end
end
end
end
function multi:mainloop(settings) function multi:mainloop(settings)
multi.defaultSettings = settings or multi.defaultSettings multi.defaultSettings = settings or multi.defaultSettings
if not multi.isRunning then if not multi.isRunning then
local protect = false local protect = false
local priority = false local priority = false
local stopOnError = true
if settings then if settings then
if settings.preLoop then if settings.preLoop then
settings.preLoop(self) settings.preLoop(self)
end end
if settings.stopOnError then
stopOnError = settings.stopOnError
end
protect = settings.protect protect = settings.protect
priority = settings.priority priority = settings.priority
end end
@ -781,6 +849,9 @@ function multi:mainloop(settings)
if err then if err then
Loop[_D].error=err Loop[_D].error=err
self.OnError:Fire(Loop[_D],err) self.OnError:Fire(Loop[_D],err)
if stopOnError then
Loop[_D]:Destroy()
end
end end
end end
end end
@ -803,6 +874,9 @@ function multi:mainloop(settings)
if err then if err then
Loop[_D].error=err Loop[_D].error=err
self.OnError:Fire(Loop[_D],err) self.OnError:Fire(Loop[_D],err)
if stopOnError then
Loop[_D]:Destroy()
end
end end
end end
end end
@ -826,6 +900,9 @@ function multi:mainloop(settings)
if err then if err then
Loop[_D].error=err Loop[_D].error=err
self.OnError:Fire(Loop[_D],err) self.OnError:Fire(Loop[_D],err)
if stopOnError then
Loop[_D]:Destroy()
end
end end
end end
end end
@ -1386,6 +1463,9 @@ end
function thread.yeild() function thread.yeild()
coroutine.yield({"_sleep_",0}) coroutine.yield({"_sleep_",0})
end end
function thread.isThread()
return coroutine.running()
end
function thread.getCores() function thread.getCores()
return thread.__CORES return thread.__CORES
end end
@ -1488,8 +1568,7 @@ multi.scheduler:OnLoop(function(self)
_,ret=coroutine.resume(self.Threads[i].thread,self.Globals) _,ret=coroutine.resume(self.Threads[i].thread,self.Globals)
end end
if _==false then if _==false then
self.Parent.OnError:Fire(self.Threads[i],ret) self.Parent.OnError:Fire(Threads[i],"Error in thread: <"..Threads[i].Name.."> "..ret)
print("Error in thread: <"..self.Threads[i].Name.."> "..ret)
end end
if ret==true or ret==false then if ret==true or ret==false then
print("Thread Ended!!!") print("Thread Ended!!!")

View File

@ -1,4 +1,4 @@
-- CURRENT TASK: newNetThread() -- CURRENT TASK:
local multi = require("multi") local multi = require("multi")
local net = require("net") local net = require("net")
@ -158,7 +158,7 @@ function multi:newNode(settings)
local node = {} local node = {}
node.name = name node.name = name
multi.OnError(function(i,error) multi.OnError(function(i,error)
node.OnError(node,error,node.server) node.OnError:Fire(node,error,node.server)
end) end)
node.server = net:newUDPServer(0) -- hosts the node using the default port node.server = net:newUDPServer(0) -- hosts the node using the default port
_, node.port = node.server.udp:getsockname() _, node.port = node.server.udp:getsockname()
@ -168,12 +168,16 @@ function multi:newNode(settings)
node.hasFuncs = {} node.hasFuncs = {}
node.OnError = multi:newConnection() node.OnError = multi:newConnection()
node.OnError(function(node,err,master) node.OnError(function(node,err,master)
print("ERROR",err,node.name)
local temp = bin.new() local temp = bin.new()
temp:addBlock(#node.name) temp:addBlock(#node.name,2)
temp:addBlock(node.name) temp:addBlock(node.name)
temp:addBlock(#err) temp:addBlock(#err,2)
temp:addBlock(err) temp:addBlock(err)
node.server:send(char(CMD_ERROR..temp)) for i,v in pairs(node.connections) do
print(i)
v[1]:send(v[2],char(CMD_ERROR)..temp.data,v[3])
end
end) end)
if settings.managerDetails then if settings.managerDetails then
local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
@ -216,14 +220,17 @@ function multi:newNode(settings)
end end
function node:getConsole() function node:getConsole()
local c = {} local c = {}
c.connections = node.connections local conn = node.connections
function c:print(...) function c.print(...)
local data = char(CMD_CONSOLE)..packData({...}) local data = char(CMD_CONSOLE)..packData({...})
for i,v in pairs(self.connections) do for i,v in pairs(conn) do
--print(i)
v[1]:send(v[2],data,v[3]) v[1]:send(v[2],data,v[3])
end end
print("sent message")
end end
-- function c:printTo()
-- end
return c return c
end end
node.loadRate=1 node.loadRate=1
@ -266,7 +273,10 @@ function multi:newNode(settings)
local func = holder:getBlock("s",len2) local func = holder:getBlock("s",len2)
args = resolveData(args) args = resolveData(args)
func = resolveData(func) func = resolveData(func)
func(unpack(args)) status, err = pcall(func,node,unpack(args))
if not status then
node.OnError:Fire(node,err,server)
end
elseif cmd == CMD_INITNODE then elseif cmd == CMD_INITNODE then
print("Connected with another node!") print("Connected with another node!")
node.connections[dat]={server,ip,port} node.connections[dat]={server,ip,port}
@ -427,7 +437,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
multi:newTLoop(function(loop) multi:newTLoop(function(loop)
if self.connections[name]~=nil then if self.connections[name]~=nil then
self.connections[name]:send(data) self.connections[name]:send(data)
loop:Desrtoy() loop:Destroy()
end end
end,.1) end,.1)
else else
@ -486,7 +496,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
local node = temp:getBlock("s",len) local node = temp:getBlock("s",len)
len = temp:getBlock("n",2) len = temp:getBlock("n",2)
local err = temp:getBlock("s",len) local err = temp:getBlock("s",len)
master.OnError(name,err) master.OnError:Fire(name,err)
elseif cmd == CMD_CONSOLE then elseif cmd == CMD_CONSOLE then
print(unpack(resolveData(dat))) print(unpack(resolveData(dat)))
elseif cmd == CMD_PONG then elseif cmd == CMD_PONG then

View File

@ -10,7 +10,7 @@ node = multi:newNode{
--managerDetails = {"localhost",12345}, -- connects to the node manager if one exists --managerDetails = {"localhost",12345}, -- connects to the node manager if one exists
} }
function RemoteTest(a,b,c) -- a function that we will be executing remotely function RemoteTest(a,b,c) -- a function that we will be executing remotely
print("Yes I work!",a,b,c) --print("Yes I work!",a,b,c)
multi:newThread("waiter",function() multi:newThread("waiter",function()
print("Hello!") print("Hello!")
while true do while true do
@ -31,6 +31,7 @@ multi:newThread("some-test",function()
end,"NODE_TESTNODE") end,"NODE_TESTNODE")
settings = { settings = {
priority = 0, -- 1 or 2 priority = 0, -- 1 or 2
protect = false, -- if something goes wrong we will crash hard, but the speed gain is good stopOnError = true,
protect = true, -- if something goes wrong we will crash hard, but the speed gain is good
} }
multi:mainloop(settings) multi:mainloop(settings)

View File

@ -11,22 +11,25 @@ master = multi:newMaster{
--managerDetails = {"localhost",12345}, -- the details to connect to the node manager (ip,port) --managerDetails = {"localhost",12345}, -- the details to connect to the node manager (ip,port)
} }
-- Send to all the nodes that are connected to the master -- Send to all the nodes that are connected to the master
master.OnNodeConnected(function(node) master.OnError(function(name,err)
print(name.." has encountered an error: "..err)
end)
master.OnNodeConnected(function(name)
print("Lets Go!") print("Lets Go!")
master:newNetworkThread("Thread",function() master:newNetworkThread("Thread",function(node)
local node = _G.node local print = node:getConsole().print -- it is important to define things as local... another thing i could do is fenv to make sure all masters work in a protectd isolated enviroment
local console = node:getConsole()
multi:newTLoop(function() multi:newTLoop(function()
console:print("Yo whats up man!") print("Yo whats up man!")
error("doing a test")
end,1) end,1)
end) end)
master:execute("RemoteTest",node,1,2,3) master:execute("RemoteTest",name,1,2,3)
multi:newThread("waiter",function() multi:newThread("waiter",function()
print("Hello!",node) print("Hello!",name)
while true do while true do
thread.sleep(2) thread.sleep(2)
master:pushTo(node,"This is a test 2") master:pushTo(name,"This is a test 2")
if master.connections["NODE_"..node]==nil then if master.connections["NODE_"..name]==nil then
thread.kill() thread.kill()
end end
end end
@ -45,6 +48,7 @@ end,"NODE_TESTNODE")
-- Starting the multitasker -- Starting the multitasker
settings = { settings = {
priority = 0, -- 0, 1 or 2 priority = 0, -- 0, 1 or 2
protect = false, protect = true,
} }
multi:mainloop(settings) multi:threadloop(settings)
--multi:mainloop(settings)