working on 1.14.4

This commit is contained in:
Ryan Ward 2019-10-29 23:57:04 -04:00
parent 296d56d233
commit 689133e71f
8 changed files with 1279 additions and 944 deletions

View File

@ -1,5 +1,38 @@
# Changes
[TOC]
Update 14.0.0 Consistency and stability
-------------
Added:
- multi.init() -- Initlizes the library! Must be called for multiple files to have the same handle. Example below
- thread.holdFor(NUMBER sec, FUNCTION condition) -- Works like hold, but timesout when a certain amount of time has passed!
- thread.holdWithin(NUMBER; cycles,FUNCTION; condition) -- Holds until the condition is met! If the number of cycles passed is equal to cycles, hold will return a timeout error
**Note:** when hold has a timeout the first argument will return nil and the second atgument will be TIMEOUT, if not timed out hold will return the values from the conditions
Fixed:
- Connections had a preformance issue where they would create a non function when using connection.getConnection() of a non existing label.
- An internal mismanagement of the treads scheduler was fixed. Now it should be quicker and free of bugs
- Thread error management is the integrations was not properly implemented. This is now fixed
-
Changed:
- Ties in to the new function that has been added multi.init()
```lua
local multi, thread = require("multi").init() -- The require multi function still returns the multi object like before
```
Note: Using init allows you to get access to the thread handle. This was done because thread was modifying the global space as well as multi. I wanted to not modify the global space anymore.
internally most of your code can stay the same, you only need to change how the library is required. I do toy a bit with the global space, buy I use a variable name that is invalid as a variable name. The variable name is $multi. This is used internally to keep some records and maintain a clean space
Also when using intergrations things now look more consistant.
```lua
local multi, thread = require("multi").init()
local GLOBSL, THREAD = require("multi.integration.lanesManager").init() -- or whichever manager you are using
local nGLOBAL, nTHREAD = require("multi.intergration.networkManager).inti()
```
Note: You can mix and match integrations together. You can create systemthreads within network threads, and you can also create cotoutine based threads within bothe network and system threads. This gives you quite a bit of flexibility to create something awesome.
Going forward:
- Sterlization is still being worked on. I was planning of having a way to save state of multi objects and such, but that isn't possible without knowing how your code is strutured or if it is even made to handle something like that. So I decided on giving a tostirng/tofile method for each multi object as well as a fromstring/fromfile method for use. This is technically in the code already, but not documented. It has actually been in the code for a while, but its not done just yet and I want to make it perfect before sending it out.
Update 13.1.0 Bug fixes and features added
-------------
Added:

View File

@ -21,12 +21,12 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
local multi = require("multi")
os.sleep=love.timer.sleep
multi.drawF={}
function multi:onDraw(func,i)
i=i or 1
table.insert(self.drawF,i,func)
local multi, thread = require("multi").init()
os.sleep = love.timer.sleep
multi.drawF = {}
function multi:onDraw(func, i)
i = i or 1
table.insert(self.drawF, i, func)
end
multi.OnKeyPressed = multi:newConnection()
multi.OnKeyReleased = multi:newConnection()
@ -38,39 +38,48 @@ multi.OnDraw = multi:newConnection()
multi.OnTextInput = multi:newConnection()
multi.OnUpdate = multi:newConnection()
multi.OnQuit = multi:newConnection()
multi.OnPreLoad(function()
local function Hook(func,conn)
if love[func]~=nil then
multi.OnPreLoad(
function()
local function Hook(func, conn)
if love[func] ~= nil then
love[func] = Library.convert(love[func])
love[func]:inject(function(...)
love[func]:inject(
function(...)
conn:Fire(...)
return {...}
end,1)
elseif love[func]==nil then
end,
1
)
elseif love[func] == nil then
love[func] = function(...)
conn:Fire(...)
end
end
end
Hook("quit",multi.OnQuit)
Hook("keypressed",multi.OnKeyPressed)
Hook("keyreleased",multi.OnKeyReleased)
Hook("mousepressed",multi.OnMousePressed)
Hook("mousereleased",multi.OnMouseReleased)
Hook("wheelmoved",multi.OnMouseWheelMoved)
Hook("mousemoved",multi.OnMouseMoved)
Hook("draw",multi.OnDraw)
Hook("textinput",multi.OnTextInput)
Hook("update",multi.OnUpdate)
multi.OnDraw(function()
for i=1,#multi.drawF do
love.graphics.setColor(255,255,255,255)
Hook("quit", multi.OnQuit)
Hook("keypressed", multi.OnKeyPressed)
Hook("keyreleased", multi.OnKeyReleased)
Hook("mousepressed", multi.OnMousePressed)
Hook("mousereleased", multi.OnMouseReleased)
Hook("wheelmoved", multi.OnMouseWheelMoved)
Hook("mousemoved", multi.OnMouseMoved)
Hook("draw", multi.OnDraw)
Hook("textinput", multi.OnTextInput)
Hook("update", multi.OnUpdate)
multi.OnDraw(
function()
for i = 1, #multi.drawF do
love.graphics.setColor(255, 255, 255, 255)
multi.drawF[i]()
end
end)
end)
multi.OnQuit(function()
end
)
end
)
multi.OnQuit(
function()
multi.Stop()
love.event.quit()
end)
end
)
return multi

View File

@ -25,6 +25,9 @@ local bin = pcall(require,"bin")
local multi = {}
local clock = os.clock
local thread = {}
if not _G["$multi"] then
_G["$multi"] = {multi=multi,thread=thread}
end
multi.Version = "13.1.0"
multi._VERSION = "13.1.0"
multi.stage = "stable"
@ -78,6 +81,9 @@ multi.PriorityTick=1 -- Between 1, 2 and 4
multi.Priority=multi.Priority_High
multi.threshold=256
multi.threstimed=.001
function multi.init()
return _G["$multi"].multi,_G["$multi"].thread
end
function multi.queuefinal(self)
self:Destroy()
if self.Parent.Mainloop[#self.Parent.Mainloop] then
@ -758,6 +764,9 @@ function multi:newConnector()
local c = {Type = "connector"}
return c
end
local CRef = {
Fire = function() end
}
function multi:newConnection(protect,func)
local c={}
c.callback = func
@ -802,11 +811,9 @@ function multi:newConnection(protect,func)
return self
end
c.FConnect=c.fConnect
function c:getConnection(name,ingore)
if ingore then
return self.connections[name] or {
Fire=function() return end -- if the connection doesn't exist lets call all of them or silently ignore
}
function c:getConnection(name,ignore)
if ignore then
return self.connections[name] or CRef
else
return self.connections[name] or self
end
@ -1489,6 +1496,10 @@ function thread.hold(n)
thread._Requests()
return coroutine.yield({"_hold_",n or function() return true end})
end
function thread.holdFor(sec,n)
thread._Requests()
return coroutine.yield({"_holdF_", sec, n or function() return true end})
end
function thread.skip(n)
thread._Requests()
if not n then n = 1 elseif n<1 then n = 1 end
@ -1652,6 +1663,13 @@ function multi.initThreads()
threads[i].task = "hold"
threads[i].__ready = false
ret = nil
elseif ret[1]=="_holdF_" then
threads[i].sec = ret[2]
threads[i].func = ret[3]
threads[i].task = "holdF"
threads[i].time = clock()
threads[i].__ready = false
ret = nil
end
end
end
@ -1684,8 +1702,19 @@ function multi.initThreads()
threads[i].task = ""
threads[i].__ready = true
end
elseif threads[i].task == "holdF" then
t0,t1,t2,t3,t4,t5,t6 = threads[i].func()
if t0 then
threads[i].task = ""
threads[i].__ready = true
elseif clock() - threads[i].time>=threads[i].sec then
threads[i].task = ""
threads[i].__ready = true
t0 = nil
t1 = "TIMEOUT"
end
if threads[i].__ready then
end
if threads[i] and threads[i].__ready then
threads[i].__ready = false
_,ret=coroutine.resume(threads[i].thread,t0,t1,t2,t3,t4,t5,t6)
end
@ -1724,6 +1753,13 @@ function multi:threadloop()
threads[i].task = "hold"
threads[i].__ready = false
ret = nil
elseif ret[1]=="_holdF_" then
threads[i].sec = ret[2]
threads[i].func = ret[3]
threads[i].task = "holdF"
threads[i].time = clock()
threads[i].__ready = false
ret = nil
end
end
end
@ -1753,6 +1789,17 @@ function multi:threadloop()
threads[i].task = ""
threads[i].__ready = true
end
elseif threads[i].task == "holdF" then
t0,t1,t2,t3,t4,t5,t6 = threads[i].func()
if t0 then
threads[i].task = ""
threads[i].__ready = true
elseif clock() - threads[i].time>=threads[i].sec then
threads[i].task = ""
threads[i].__ready = true
t0 = nil
t1 = "TIMEOUT"
end
end
if threads[i].__ready then
threads[i].__ready = false
@ -2550,7 +2597,4 @@ end
function multi:setDefualtStateFlag(opt)
--
end
if not(multi.Version == "13.2.0" or multi.Version == "14.0.0") then
_G.thread = thread
end
return multi, thread
return multi

View File

@ -21,20 +21,26 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
package.path="?/init.lua;?.lua;"..package.path
package.path = "?/init.lua;?.lua;" .. package.path
local multi, thread = require("multi").init() -- get it all and have it on all lanes
if multi.integration then -- This allows us to call the lanes manager from supporting modules without a hassel
return {
init = function()
return multi.integration.GLOBAL, multi.integration.THREAD
end
}
end
function os.getOS()
if package.config:sub(1,1)=='\\' then
return 'windows'
if package.config:sub(1, 1) == "\\" then
return "windows"
else
return 'unix'
return "unix"
end
end
-- Step 1 get lanes
lanes=require("lanes").configure()
local multi, thread = require("multi") -- get it all and have it on all lanes
lanes = require("lanes").configure()
multi.SystemThreads = {}
local thread = thread
multi.isMainThread=true
multi.isMainThread = true
function multi:canSystemThread()
return true
end
@ -45,40 +51,45 @@ end
local __GlobalLinda = lanes.linda() -- handles global stuff
local __SleepingLinda = lanes.linda() -- handles sleeping stuff
-- For convenience a GLOBAL table will be constructed to handle requests
local GLOBAL={}
setmetatable(GLOBAL,{
__index=function(t,k)
local GLOBAL = {}
setmetatable(
GLOBAL,
{
__index = function(t, k)
return __GlobalLinda:get(k)
end,
__newindex=function(t,k,v)
__GlobalLinda:set(k,v)
end,
})
__newindex = function(t, k, v)
__GlobalLinda:set(k, v)
end
}
)
-- Step 3 rewrite the thread methods to use Lindas
local THREAD={}
function THREAD.set(name,val)
__GlobalLinda:set(name,val)
local THREAD = {}
function THREAD.set(name, val)
__GlobalLinda:set(name, val)
end
function THREAD.get(name)
__GlobalLinda:get(name)
end
local function randomString(n)
local str = ''
local str = ""
local strings = {'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z','1','2','3','4','5','6','7','8','9','0','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'}
for i=1,n do
str = str..''..strings[math.random(1,#strings)]
for i = 1, n do
str = str .. "" .. strings[math.random(1, #strings)]
end
return str
end
function THREAD.waitFor(name)
local function wait()
math.randomseed(os.time())
__SleepingLinda:receive(.001,randomString(12))
__SleepingLinda:receive(.001, randomString(12))
end
repeat wait() until __GlobalLinda:get(name)
repeat
wait()
until __GlobalLinda:get(name)
return __GlobalLinda:get(name)
end
function THREAD.testFor(name,val,sym)
function THREAD.testFor(name, val, sym)
--
end
function THREAD.getCores()
@ -87,10 +98,10 @@ end
function THREAD.getThreads()
return GLOBAL.__THREADS__
end
if os.getOS()=="windows" then
THREAD.__CORES=tonumber(os.getenv("NUMBER_OF_PROCESSORS"))
if os.getOS() == "windows" then
THREAD.__CORES = tonumber(os.getenv("NUMBER_OF_PROCESSORS"))
else
THREAD.__CORES=tonumber(io.popen("nproc --all"):read("*n"))
THREAD.__CORES = tonumber(io.popen("nproc --all"):read("*n"))
end
function THREAD.kill() -- trigger the lane destruction
error("Thread was killed!")
@ -107,40 +118,42 @@ Idle wait keeps the CPU running better where busy wait wastes CPU cycles... Lane
however, a linda recieve will in fact be a idle wait! So we use that and wrap it in a nice package]]
function THREAD.sleep(n)
math.randomseed(os.time())
__SleepingLinda:receive(n,randomString(12))
__SleepingLinda:receive(n, randomString(12))
end
function THREAD.hold(n)
local function wait()
math.randomseed(os.time())
__SleepingLinda:receive(.001,randomString(12))
__SleepingLinda:receive(.001, randomString(12))
end
repeat wait() until n()
repeat
wait()
until n()
end
local rand = math.random(1,10000000)
local rand = math.random(1, 10000000)
-- Step 5 Basic Threads!
local threads = {}
local count = 1
local started = false
local livingThreads = {}
function multi:newSystemThread(name,func,...)
function multi:newSystemThread(name, func, ...)
multi.InitSystemThreadErrorHandler()
rand = math.random(1,10000000)
local c={}
local __self=c
c.name=name
rand = math.random(1, 10000000)
local c = {}
local __self = c
c.name = name
c.Name = name
c.Id = count
livingThreads[count] = {true,name}
livingThreads[count] = {true, name}
local THREAD_ID = count
count = count + 1
c.Type="sthread"
c.Type = "sthread"
c.creationTime = os.clock()
c.alive = true
local THREAD_NAME=name
local THREAD_NAME = name
local function func2(...)
local multi = require("multi")
_G["THREAD_NAME"]=THREAD_NAME
_G["THREAD_ID"]=THREAD_ID
_G["THREAD_NAME"] = THREAD_NAME
_G["THREAD_ID"] = THREAD_ID
math.randomseed(rand)
func(...)
if _G.__Needs_Multi then
@ -148,52 +161,61 @@ function multi:newSystemThread(name,func,...)
end
THREAD.kill()
end
c.thread=lanes.gen("*", func2)(...)
c.thread = lanes.gen("*", func2)(...)
function c:kill()
self.thread:cancel()
multi.print("Thread: '"..self.name.."' has been stopped!")
multi.print("Thread: '" .. self.name .. "' has been stopped!")
self.alive = false
end
table.insert(multi.SystemThreads,c)
table.insert(multi.SystemThreads, c)
c.OnError = multi:newConnection()
GLOBAL["__THREADS__"]=livingThreads
GLOBAL["__THREADS__"] = livingThreads
return c
end
multi.OnSystemThreadDied = multi:newConnection()
function multi.InitSystemThreadErrorHandler()
if started==true then return end
if started == true then
return
end
started = true
multi:newThread("ThreadErrorHandler",function()
multi:newThread(
"ThreadErrorHandler",
function()
local threads = multi.SystemThreads
while true do
thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough.
for i=#threads,1,-1 do
local v,err,t=threads[i].thread:join(.001)
for i = #threads, 1, -1 do
local v, err, t = threads[i].thread:join(.001)
if err then
if err:find("Thread was killed!") then
print(err)
livingThreads[threads[i].Id] = {false,threads[i].Name}
livingThreads[threads[i].Id] = {false, threads[i].Name}
threads[i].alive = false
multi.OnSystemThreadDied:Fire(threads[i].Id)
GLOBAL["__THREADS__"]=livingThreads
table.remove(threads,i)
GLOBAL["__THREADS__"] = livingThreads
table.remove(threads, i)
elseif err:find("stack traceback") then
print(err)
threads[i].OnError:Fire(threads[i],err,"Error in systemThread: '"..threads[i].name.."' <"..err..">")
threads[i].OnError:Fire(threads[i], err, "Error in systemThread: '" .. threads[i].name .. "' <" .. err .. ">")
threads[i].alive = false
livingThreads[threads[i].Id] = {false,threads[i].Name}
livingThreads[threads[i].Id] = {false, threads[i].Name}
multi.OnSystemThreadDied:Fire(threads[i].Id)
GLOBAL["__THREADS__"]=livingThreads
table.remove(threads,i)
GLOBAL["__THREADS__"] = livingThreads
table.remove(threads, i)
end
end
end
end
end)
end
)
end
multi.print("Integrated Lanes!")
multi.integration={} -- for module creators
multi.integration.GLOBAL=GLOBAL
multi.integration.THREAD=THREAD
multi.integration = {} -- for module creators
multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD
require("multi.integration.shared")
return {init=function() return GLOBAL, THREAD end}
return {
init = function()
return GLOBAL, THREAD
end
}

View File

@ -22,15 +22,23 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
local multi = require("multi.compat.love2d")
if multi.integration then -- This allows us to call the lanes manager from supporting modules without a hassel
return {
init = function()
return multi.integration.GLOBAL, multi.integration.THREAD
end
}
end
function multi:canSystemThread()
return true
end
function multi:getPlatform()
return "love2d"
end
multi.integration={}
multi.integration.love2d={}
multi.integration.love2d.ThreadBase=[[
multi.integration = {}
multi.integration.love2d = {}
multi.integration.love2d.ThreadBase =
[[
tab={...}
__THREADID__=table.remove(tab,1)
__THREADNAME__=table.remove(tab,1)
@ -198,27 +206,30 @@ updater:OnUpdate(__sync__)
func=loadDump([=[INSERT_USER_CODE]=])(unpack(tab))
multi:mainloop()
]]
GLOBAL={} -- Allow main thread to interact with these objects as well
GLOBAL = {} -- Allow main thread to interact with these objects as well
_G.THREAD_ID = 0
__proxy__={}
setmetatable(GLOBAL,{
__index=function(t,k)
__proxy__ = {}
setmetatable(
GLOBAL,
{
__index = function(t, k)
return __proxy__[k]
end,
__newindex=function(t,k,v)
__proxy__[k]=v
for i=1,#__channels__ do
if type(v)=="userdata" then
__newindex = function(t, k, v)
__proxy__[k] = v
for i = 1, #__channels__ do
if type(v) == "userdata" then
__channels__[i]:push(v)
else
__channels__[i]:push("SYNC "..type(v).." "..k.." "..resolveData(v))
__channels__[i]:push("SYNC " .. type(v) .. " " .. k .. " " .. resolveData(v))
end
end
end,
})
THREAD={} -- Allow main thread to interact with these objects as well
multi.integration.love2d.mainChannel=love.thread.getChannel("__MainChan__")
isMainThread=true
end
}
)
THREAD = {} -- Allow main thread to interact with these objects as well
multi.integration.love2d.mainChannel = love.thread.getChannel("__MainChan__")
isMainThread = true
multi.SystemThreads = {}
function THREAD.getName()
return __THREADNAME__
@ -232,9 +243,9 @@ function ToStr(val, name, skipnewlines, depth)
local tmp = string.rep(" ", depth)
if name then
if type(name) == "string" then
tmp = tmp .. "[\""..name.."\"] = "
tmp = tmp .. '["' .. name .. '"] = '
else
tmp = tmp .. "["..(name or "").."] = "
tmp = tmp .. "[" .. (name or "") .. "] = "
end
end
if type(val) == "table" then
@ -250,96 +261,102 @@ function ToStr(val, name, skipnewlines, depth)
elseif type(val) == "boolean" then
tmp = tmp .. (val and "true" or "false")
elseif type(val) == "function" then
tmp = tmp .. "loadDump([===["..dump(val).."]===])"
tmp = tmp .. "loadDump([===[" .. dump(val) .. "]===])"
else
tmp = tmp .. "\"[inserializeable datatype:" .. type(val) .. "]\""
tmp = tmp .. '"[inserializeable datatype:' .. type(val) .. ']"'
end
return tmp
end
function resolveType(tp,d)
if tp=="number" then
function resolveType(tp, d)
if tp == "number" then
return tonumber(d)
elseif tp=="bool" then
return (d=="true")
elseif tp=="function" then
return loadDump("[==["..d.."]==]")
elseif tp=="table" then
return loadstring("return "..d)()
elseif tp=="nil" then
elseif tp == "bool" then
return (d == "true")
elseif tp == "function" then
return loadDump("[==[" .. d .. "]==]")
elseif tp == "table" then
return loadstring("return " .. d)()
elseif tp == "nil" then
return nil
else
return d
end
end
function resolveData(v)
local data=""
if type(v)=="table" then
local data = ""
if type(v) == "table" then
return ToStr(v)
elseif type(v)=="function" then
elseif type(v) == "function" then
return dump(v)
elseif type(v)=="string" or type(v)=="number" or type(v)=="bool" or type(v)=="nil" then
elseif type(v) == "string" or type(v) == "number" or type(v) == "bool" or type(v) == "nil" then
return tostring(v)
end
return data
end
function loadDump(d)
local s={}
local s = {}
for p in d:gmatch("(%d-)\\") do
s[#s+1]=string.char(tonumber(p))
s[#s + 1] = string.char(tonumber(p))
end
return loadstring(table.concat(s))
end
function dump(func)
local code,t={},string.dump(func)
for i=1,#t do
code[#code+1]=string.byte(t:sub(i,i)).."\\"
local code, t = {}, string.dump(func)
for i = 1, #t do
code[#code + 1] = string.byte(t:sub(i, i)) .. "\\"
end
return table.concat(code)
end
local function randomString(n)
local str = ''
local str = ""
local strings = {'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z','1','2','3','4','5','6','7','8','9','0','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'}
for i=1,n do
str = str..''..strings[math.random(1,#strings)]
for i = 1, n do
str = str .. "" .. strings[math.random(1, #strings)]
end
return str
end
local count = 1
local livingThreads = {}
function multi:newSystemThread(name,func,...) -- the main method
function multi:newSystemThread(name, func, ...) -- the main method
multi.InitSystemThreadErrorHandler()
local c={}
c.name=name
local c = {}
c.name = name
c.Name = name
c.Type="sthread"
c.ID=c.name.."<ID|"..randomString(8)..">"
c.Id=count
c.Type = "sthread"
c.ID = c.name .. "<ID|" .. randomString(8) .. ">"
c.Id = count
c.creationTime = os.clock()
count = count + 1
c.thread=love.thread.newThread(multi.integration.love2d.ThreadBase:gsub("INSERT_USER_CODE",dump(func)))
livingThreads[count] = {true,name}
c.thread = love.thread.newThread(multi.integration.love2d.ThreadBase:gsub("INSERT_USER_CODE", dump(func)))
livingThreads[count] = {true, name}
livingThreads[c.thread] = c
c.OnError = multi:newConnection()
c.thread:start(c.ID,c.name,THREAD_ID,...)
c.thread:start(c.ID, c.name, THREAD_ID, ...)
function c:kill()
multi.integration.GLOBAL["__DIEPLZ"..self.ID.."__"]="__DIEPLZ"..self.ID.."__"
multi.integration.GLOBAL["__DIEPLZ" .. self.ID .. "__"] = "__DIEPLZ" .. self.ID .. "__"
end
return c
end
function love.threaderror( thread, errorstr )
multi.OnError:Fire(thread,errorstr)
livingThreads[thread].OnError:Fire(threads[i],err,"Error in systemThread: '"..livingThreads[thread].name.."' <"..errorstr..">")
multi.print("Error in systemThread: "..tostring(thread)..": "..errorstr)
function love.threaderror(thread, errorstr)
multi.OnError:Fire(thread, errorstr)
livingThreads[thread].OnError:Fire(
threads[i],
err,
"Error in systemThread: '" .. livingThreads[thread].name .. "' <" .. errorstr .. ">"
)
multi.print("Error in systemThread: " .. tostring(thread) .. ": " .. errorstr)
end
local THREAD={}
function THREAD.set(name,val)
GLOBAL[name]=val
local THREAD = {}
function THREAD.set(name, val)
GLOBAL[name] = val
end
function THREAD.get(name)
return GLOBAL[name]
end
function THREAD.waitFor(name)
repeat multi:uManager() until GLOBAL[name]
repeat
multi:uManager()
until GLOBAL[name]
return GLOBAL[name]
end
function THREAD.getCores()
@ -349,85 +366,95 @@ function THREAD.sleep(n)
love.timer.sleep(n)
end
function THREAD.hold(n)
repeat multi:uManager() until n()
repeat
multi:uManager()
until n()
end
__channels__={}
multi.integration.GLOBAL=GLOBAL
multi.integration.THREAD=THREAD
updater=multi:newLoop(function(self)
local data=multi.integration.love2d.mainChannel:pop()
__channels__ = {}
multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD
updater =
multi:newLoop(
function(self)
local data = multi.integration.love2d.mainChannel:pop()
while data do
if type(data)=="string" then
local cmd,tp,name,d=data:match("(%S-) (%S-) (%S-) (.+)")
if cmd=="SYNC" then
__proxy__[name]=resolveType(tp,d)
for i=1,#__channels__ do
if type(data) == "string" then
local cmd, tp, name, d = data:match("(%S-) (%S-) (%S-) (.+)")
if cmd == "SYNC" then
__proxy__[name] = resolveType(tp, d)
for i = 1, #__channels__ do
-- send data to other threads
if type(v)=="userdata" then
if type(v) == "userdata" then
__channels__[i]:push(v)
else
__channels__[i]:push("SYNC "..tp.." "..name.." "..d)
__channels__[i]:push("SYNC " .. tp .. " " .. name .. " " .. d)
end
end
elseif cmd=="NEWTHREAD" then
__channels__[#__channels__+1]=love.thread.getChannel(tp)
for k,v in pairs(__proxy__) do -- sync the global with each new thread
if type(v)=="userdata" then
elseif cmd == "NEWTHREAD" then
__channels__[#__channels__ + 1] = love.thread.getChannel(tp)
for k, v in pairs(__proxy__) do -- sync the global with each new thread
if type(v) == "userdata" then
__channels__[#__channels__]:push(v)
else
__channels__[#__channels__]:push("SYNC "..type(v).." "..k.." "..resolveData(v))
__channels__[#__channels__]:push("SYNC " .. type(v) .. " " .. k .. " " .. resolveData(v))
end
end
end
else
__proxy__[name]=data
__proxy__[name] = data
end
data=multi.integration.love2d.mainChannel:pop()
data = multi.integration.love2d.mainChannel:pop()
end
end)
end
)
multi.OnSystemThreadDied = multi:newConnection()
local started = false
function multi.InitSystemThreadErrorHandler()
if started==true then return end
if started == true then
return
end
started = true
multi:newThread("ThreadErrorHandler",function()
multi:newThread(
"ThreadErrorHandler",
function()
local threads = multi.SystemThreads
while true do
thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough.
for i=#threads,1,-1 do
local v,err,t=threads[i].thread:join(.001)
for i = #threads, 1, -1 do
local v, err, t = threads[i].thread:join(.001)
if err then
if err:find("Thread was killed!") then
livingThreads[threads[i].Id] = {false,threads[i].Name}
livingThreads[threads[i].Id] = {false, threads[i].Name}
multi.OnSystemThreadDied:Fire(threads[i].Id)
GLOBAL["__THREADS__"]=livingThreads
table.remove(threads,i)
GLOBAL["__THREADS__"] = livingThreads
table.remove(threads, i)
else
threads[i].OnError:Fire(threads[i],err,"Error in systemThread: '"..threads[i].name.."' <"..err..">")
livingThreads[threads[i].Id] = {false,threads[i].Name}
threads[i].OnError:Fire(threads[i], err, "Error in systemThread: '" .. threads[i].name .. "' <" .. err .. ">")
livingThreads[threads[i].Id] = {false, threads[i].Name}
multi.OnSystemThreadDied:Fire(threads[i].Id)
GLOBAL["__THREADS__"]=livingThreads
table.remove(threads,i)
GLOBAL["__THREADS__"] = livingThreads
table.remove(threads, i)
end
end
end
end
end)
end
)
end
require("multi.integration.shared")
multi.print("Integrated Love2d!")
return {
init=function(t)
init = function(t)
if t then
if t.threadNamespace then
multi.integration.THREADNAME=t.threadNamespace
multi.integration.love2d.ThreadBase:gsub("sThread",t.threadNamespace)
multi.integration.THREADNAME = t.threadNamespace
multi.integration.love2d.ThreadBase:gsub("sThread", t.threadNamespace)
end
if t.globalNamespace then
multi.integration.GLOBALNAME=t.globalNamespace
multi.integration.love2d.ThreadBase:gsub("GLOBAL",t.globalNamespace)
multi.integration.GLOBALNAME = t.globalNamespace
multi.integration.love2d.ThreadBase:gsub("GLOBAL", t.globalNamespace)
end
end
return GLOBAL,THREAD
return GLOBAL, THREAD
end
}

View File

@ -21,68 +21,133 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
-- I DEMAND USAGE FOR LUVIT
-- Cannot use discordia without my multitasking library (Which I love more that the luvit platform... then again i'm partial :P)
package.path="?/init.lua;?.lua;"..package.path
local function _INIT(luvitThread,timer)
package.path = "?/init.lua;?.lua;" .. package.path
local function _INIT(luvitThread, timer)
-- lots of this stuff should be able to stay the same
function os.getOS()
if package.config:sub(1,1)=='\\' then
return 'windows'
if package.config:sub(1, 1) == "\\" then
return "windows"
else
return 'unix'
return "unix"
end
end
-- Step 1 get setup threads on luvit... Sigh how do i even...
local multi, thread = require("multi")
isMainThread=true
local multi, thread = require("multi").init()
isMainThread = true
function multi:canSystemThread()
return true
end
function multi:getPlatform()
return "luvit"
end
local multi=multi
local multi = multi
-- Step 2 set up the Global table... is this possible?
local GLOBAL={}
setmetatable(GLOBAL,{
__index=function(t,k)
local GLOBAL = {}
setmetatable(
GLOBAL,
{
__index = function(t, k)
--print("No Global table when using luvit integration!")
return nil
end,
__newindex=function(t,k,v)
__newindex = function(t, k, v)
--print("No Global table when using luvit integration!")
end,
})
local THREAD={}
function THREAD.set(name,val)
end
}
)
local THREAD = {}
function THREAD.set(name, val)
--print("No Global table when using luvit integration!")
end
function THREAD.get(name)
--print("No Global table when using luvit integration!")
end
local function randomString(n)
local str = ''
local strings = {'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z','1','2','3','4','5','6','7','8','9','0','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'}
for i=1,n do
str = str..''..strings[math.random(1,#strings)]
local str = ""
local strings = {
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h",
"i",
"j",
"k",
"l",
"m",
"n",
"o",
"p",
"q",
"r",
"s",
"t",
"u",
"v",
"w",
"x",
"y",
"z",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"0",
"A",
"B",
"C",
"D",
"E",
"F",
"G",
"H",
"I",
"J",
"K",
"L",
"M",
"N",
"O",
"P",
"Q",
"R",
"S",
"T",
"U",
"V",
"W",
"X",
"Y",
"Z"
}
for i = 1, n do
str = str .. "" .. strings[math.random(1, #strings)]
end
return str
end
function THREAD.waitFor(name)
--print("No Global table when using luvit integration!")
end
function THREAD.testFor(name,val,sym)
function THREAD.testFor(name, val, sym)
--print("No Global table when using luvit integration!")
end
function THREAD.getCores()
return THREAD.__CORES
end
if os.getOS()=="windows" then
THREAD.__CORES=tonumber(os.getenv("NUMBER_OF_PROCESSORS"))
if os.getOS() == "windows" then
THREAD.__CORES = tonumber(os.getenv("NUMBER_OF_PROCESSORS"))
else
THREAD.__CORES=tonumber(io.popen("nproc --all"):read("*n"))
THREAD.__CORES = tonumber(io.popen("nproc --all"):read("*n"))
end
function THREAD.kill() -- trigger the thread destruction
error("Thread was Killed!")
@ -95,34 +160,41 @@ local function _INIT(luvitThread,timer)
--print("No Global table when using luvit integration!")
end
-- Step 5 Basic Threads!
local function entry(path,name,func,...)
local timer = require'timer'
local luvitThread = require'thread'
package.path=path
local function entry(path, name, func, ...)
local timer = require "timer"
local luvitThread = require "thread"
package.path = path
loadstring(func)(...)
end
function multi:newSystemThread(name,func,...)
local c={}
local __self=c
c.name=name
c.Type="sthread"
c.thread={}
c.func=string.dump(func)
function multi:newSystemThread(name, func, ...)
local c = {}
local __self = c
c.name = name
c.Type = "sthread"
c.thread = {}
c.func = string.dump(func)
function c:kill()
-- print("No Global table when using luvit integration!")
end
luvitThread.start(entry,package.path,name,c.func,...)
luvitThread.start(entry, package.path, name, c.func, ...)
return c
end
multi.print("Integrated Luvit!")
multi.integration={} -- for module creators
multi.integration.GLOBAL=GLOBAL
multi.integration.THREAD=THREAD
multi.integration = {} -- for module creators
multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD
require("multi.integration.shared")
-- Start the main mainloop... This allows you to process your multi objects, but the engine on the main thread will be limited to .001 or 1 millisecond sigh...
local interval = timer.setInterval(1, function ()
local interval =
timer.setInterval(
1,
function()
multi:uManager()
end)
end
)
return multi
end
return {init=function(threadHandle,timerHandle) local multi = _INIT(threadHandle,timerHandle) return GLOBAL, THREAD end}
return {init = function(threadHandle, timerHandle)
local multi = _INIT(threadHandle, timerHandle)
return GLOBAL, THREAD
end}

View File

@ -21,7 +21,7 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
local multi, thread = require("multi")
local multi, thread = require("multi").init()
local net = require("net")
local bin = require("bin")
bin.setBitsInterface(infinabits) -- the bits interface does not work so well, another bug to fix
@ -45,9 +45,9 @@ local byte = string.byte
-- Process to hold all of the networkManager's muilt objects
-- Helper for piecing commands
local function pieceCommand(cmd,...)
local function pieceCommand(cmd, ...)
local tab = {...}
table.insert(tab,1,cmd)
table.insert(tab, 1, cmd)
return table.concat(tab)
end
@ -56,17 +56,17 @@ local Queue = {}
Queue.__index = Queue
function Queue:newQueue()
local c = {}
setmetatable(c,self)
setmetatable(c, self)
return c
end
function Queue:push(data)
table.insert(self,data)
table.insert(self, data)
end
function Queue:raw_push(data) -- Internal usage only
table.insert(self,data)
table.insert(self, data)
end
function Queue:pop()
return table.remove(self,1)
return table.remove(self, 1)
end
function Queue:peek()
return self[1]
@ -79,39 +79,39 @@ multi.OnGUpdate = multi:newConnection()
local function packData(data)
-- returns the data that was sterilized
local dat = bin.new()
dat:addBlock(#type(data),1)
dat:addBlock(#type(data), 1)
dat:addBlock(type(data)) -- The type is first
if type(data)=="table" then
dat:addBlock(data,nil,"t")
if type(data) == "table" then
dat:addBlock(data, nil, "t")
elseif type(data) == "userdata" then
error("Cannot sterilize userdata!")
elseif type(data) == "number" then
dat:addBlock(data,nil,"d")
dat:addBlock(data, nil, "d")
elseif type(data) == "string" then
dat:addBlock(#data,4)
dat:addBlock(data,nil,"s")
dat:addBlock(#data, 4)
dat:addBlock(data, nil, "s")
elseif type(data) == "boolean" then
dat:addBlock(data,1)
dat:addBlock(data, 1)
elseif type(data) == "function" then
dat:addBlock(data,nil,"f")
dat:addBlock(data, nil, "f")
end
return dat.data
end
local function resolveData(data)
-- returns the data that was sterilized
local dat = bin.new(data)
local tlen = dat:getBlock("n",1)
local tp = dat:getBlock("s",tlen)
if tp=="table" then
local tlen = dat:getBlock("n", 1)
local tp = dat:getBlock("s", tlen)
if tp == "table" then
return dat:getBlock("t")
elseif tp=="number" then
elseif tp == "number" then
return dat:getBlock("d")
elseif tp=="string" then
local num = dat:getBlock("n",4)
return dat:getBlock("s",num)
elseif tp=="boolean" then
return dat:getBlock("b",1)
elseif tp=="function" then
elseif tp == "string" then
local num = dat:getBlock("n", 4)
return dat:getBlock("s", num)
elseif tp == "boolean" then
return dat:getBlock("b", 1)
elseif tp == "function" then
return dat:getBlock("f")
end
end
@ -119,16 +119,19 @@ end
-- internal global system
local GLOBAL = {}
local PROXY = {}
setmetatable(GLOBAL,{
__index = function(t,k)
setmetatable(
GLOBAL,
{
__index = function(t, k)
return PROXY[k]
end,
__newindex = function(t,k,v)
__newindex = function(t, k, v)
local v = v
PROXY[k] = v
multi.OnGUpdate:Fire(k,packData(v))
multi.OnGUpdate:Fire(k, packData(v))
end
})
}
)
-- In case you are unable to use broadcasting this can be used to help connect to nodes
function multi:nodeManager(port)
@ -140,39 +143,48 @@ function multi:nodeManager(port)
server.timeouts = {}
server.OnNodeAdded = multi:newConnection()
server.OnNodeRemoved = multi:newConnection()
server.OnDataRecieved(function(server,data,cid,ip,port)
local cmd = data:sub(1,1)
server.OnDataRecieved(
function(server, data, cid, ip, port)
local cmd = data:sub(1, 1)
if cmd == "R" then
multi:newThread("Node Client Manager",function(loop)
multi:newThread(
"Node Client Manager",
function(loop)
while true do
if server.timeouts[cid]==true then
if server.timeouts[cid] == true then
server.OnNodeRemoved:Fire(server.nodes[cid])
server.nodes[cid] = nil
server.timeouts[cid] = nil
thread.kill()
else
server.timeouts[cid] = true
server:send(cid,"ping")
server:send(cid, "ping")
end
thread.sleep(1)
end
end)
server.nodes[cid]=data:sub(2,-1)
end
)
server.nodes[cid] = data:sub(2, -1)
server.OnNodeAdded:Fire(server.nodes[cid])
elseif cmd == "G" then
server.OnNodeAdded(function(node)
server:send(cid,node)
end)
server.OnNodeRemoved(function(node)
server:send(cid,"R"..node:match("(.-)|"))
end)
for i,v in pairs(server.nodes) do
server:send(cid,v)
server.OnNodeAdded(
function(node)
server:send(cid, node)
end
)
server.OnNodeRemoved(
function(node)
server:send(cid, "R" .. node:match("(.-)|"))
end
)
for i, v in pairs(server.nodes) do
server:send(cid, v)
end
elseif cmd == "P" then
server.timeouts[cid] = nil
end
end)
end
)
end
-- The main driving force of the network manager: Nodes
function multi:newNode(settings)
@ -183,60 +195,69 @@ function multi:newNode(settings)
local name = settings.name or multi.randomString(8)
local node = {}
node.name = name
multi.OnError(function(i,error)
node.OnError:Fire(node,error,node.server)
end)
multi.OnError(
function(i, error)
node.OnError:Fire(node, error, node.server)
end
)
node.server = net:newUDPServer(0) -- hosts the node using the default port
_, node.port = node.server.udp:getsockname()
node.connections = net.ClientCache
node.queue = Queue:newQueue()
node.functions = bin.stream("RegisteredFunctions.dat",false)
node.functions = bin.stream("RegisteredFunctions.dat", false)
node.hasFuncs = {}
node.OnError = multi:newConnection()
node.OnError(function(node,err,master)
multi.print("ERROR",err,node.name)
node.OnError(
function(node, err, master)
multi.print("ERROR", err, node.name)
local temp = bin.new()
temp:addBlock(#node.name,2)
temp:addBlock(#node.name, 2)
temp:addBlock(node.name)
temp:addBlock(#err,2)
temp:addBlock(#err, 2)
temp:addBlock(err)
for i,v in pairs(node.connections) do
for i, v in pairs(node.connections) do
multi.print(i)
v[1]:send(v[2],char(CMD_ERROR)..temp.data,v[3])
v[1]:send(v[2], char(CMD_ERROR) .. temp.data, v[3])
end
end)
end
)
if settings.managerDetails then
local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
local c = net:newTCPClient(settings.managerDetails[1], settings.managerDetails[2])
if not c then
multi.print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false
multi.print("Cannot connect to the node manager! Ensuring broadcast is enabled!")
settings.noBroadCast = false
else
c.OnDataRecieved(function(self,data)
c.OnDataRecieved(
function(self, data)
if data == "ping" then
self:send("P")
end
end)
c:send("RNODE_"..name.."|"..net.getLocalIP().."|"..node.port)
end
)
c:send("RNODE_" .. name .. "|" .. net.getLocalIP() .. "|" .. node.port)
end
end
if not settings.preload then
if node.functions:getSize()~=0 then
if node.functions:getSize() ~= 0 then
multi.print("We have function(s) to preload!")
local len = node.functions:getBlock("n",1)
local name,func
local len = node.functions:getBlock("n", 1)
local name, func
while len do
name = node.functions:getBlock("s",len)
len = node.functions:getBlock("n",2)
func = node.functions:getBlock("s",len)
name = node.functions:getBlock("s", len)
len = node.functions:getBlock("n", 2)
func = node.functions:getBlock("s", len)
len = node.functions:read(1)
_G[name]=resolveData(func)
node.hasFuncs[name]=true
if not len then break end
_G[name] = resolveData(func)
node.hasFuncs[name] = true
if not len then
break
end
len = byte(len)
end
end
end
function node:pushTo(name,data)
node:sendTo(name,char(CMD_QUEUE)..packData(data))
function node:pushTo(name, data)
node:sendTo(name, char(CMD_QUEUE) .. packData(data))
end
function node:peek()
return node.queue:peek()
@ -248,10 +269,10 @@ function multi:newNode(settings)
local c = {}
local conn = node.connections
function c.print(...)
local data = char(CMD_CONSOLE)..packData({...})
for i,v in pairs(conn) do
local data = char(CMD_CONSOLE) .. packData({...})
for i, v in pairs(conn) do
--print(i)
v[1]:send(v[2],data,v[3])
v[1]:send(v[2], data, v[3])
end
end
-- function c:printTo()
@ -259,78 +280,89 @@ function multi:newNode(settings)
-- end
return c
end
node.loadRate=1
node.loadRate = 1
-- Lets tell the network we are alive!
node.server.OnDataRecieved(function(server,data,cid,ip,port)
local cmd = byte(data:sub(1,1)) -- the first byte is the command
local dat = data:sub(2,-1) -- the data that you want to read
node.server.OnDataRecieved(
function(server, data, cid, ip, port)
local cmd = byte(data:sub(1, 1)) -- the first byte is the command
local dat = data:sub(2, -1) -- the data that you want to read
if cmd == CMD_PING then
server:send(ip,char(CMD_PONG),port)
server:send(ip, char(CMD_PONG), port)
elseif cmd == CMD_QUEUE then
node.queue:push(resolveData(dat))
elseif cmd == CMD_REG then
if not settings.allowRemoteRegistering then
multi.print(ip..": has attempted to register a function when it is currently not allowed!")
multi.print(ip .. ": has attempted to register a function when it is currently not allowed!")
return
end
local temp = bin.new(dat)
local len = temp:getBlock("n",1)
local name = temp:getBlock("s",len)
local len = temp:getBlock("n", 1)
local name = temp:getBlock("s", len)
if node.hasFuncs[name] then
multi.print("Function already preloaded onto the node!")
return
end
len = temp:getBlock("n",2)
local func = temp:getBlock("s",len)
_G[name]=resolveData(func)
len = temp:getBlock("n", 2)
local func = temp:getBlock("s", len)
_G[name] = resolveData(func)
node.functions:addBlock(dat)
elseif cmd == CMD_CALL then
local temp = bin.new(dat)
local len = temp:getBlock("n",1)
local name = temp:getBlock("s",len)
len = temp:getBlock("n",4)
local args = temp:getBlock("s",len)
local len = temp:getBlock("n", 1)
local name = temp:getBlock("s", len)
len = temp:getBlock("n", 4)
local args = temp:getBlock("s", len)
_G[name](unpack(resolveData(args)))
elseif cmd == CMD_TASK then
local holder = bin.new(dat)
local len = holder:getBlock("n",4)
local args = holder:getBlock("s",len)
local len2 = holder:getBlock("n",4)
local func = holder:getBlock("s",len2)
local len = holder:getBlock("n", 4)
local args = holder:getBlock("s", len)
local len2 = holder:getBlock("n", 4)
local func = holder:getBlock("s", len2)
args = resolveData(args)
func = resolveData(func)
status, err = pcall(func,node,unpack(args))
status, err = pcall(func, node, unpack(args))
if not status then
node.OnError:Fire(node,err,server)
node.OnError:Fire(node, err, server)
end
elseif cmd == CMD_INITNODE then
multi.print("Connected with another node!")
node.connections[dat]={server,ip,port}
multi.OnGUpdate(function(k,v)
server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port)
end)-- set this up
elseif cmd == CMD_INITMASTER then
multi.print("Connected to the master!",dat)
node.connections[dat]={server,ip,port}
multi.OnGUpdate(function(k,v)
server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port)
end)-- set this up
multi:newTLoop(function()
server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port)
end,node.loadRate)
server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port)
server:send(ip,char(CMD_INITNODE)..node.name,port)
elseif cmd == CMD_GLOBAL then
local k,v = dat:match("(.-)|(.+)")
PROXY[k]=resolveData(v)
node.connections[dat] = {server, ip, port}
multi.OnGUpdate(
function(k, v)
server:send(ip, table.concat {char(CMD_GLOBAL), k, "|", v}, port)
end
end)
function node:sendTo(name,data)
)
-- set this up
elseif cmd == CMD_INITMASTER then
multi.print("Connected to the master!", dat)
node.connections[dat] = {server, ip, port}
multi.OnGUpdate(
function(k, v)
server:send(ip, table.concat {char(CMD_GLOBAL), k, "|", v}, port)
end
)
-- set this up
multi:newTLoop(
function()
server:send(ip, char(CMD_LOAD) .. node.name .. "|" .. multi:getLoad(), port)
end,
node.loadRate
)
server:send(ip, char(CMD_LOAD) .. node.name .. "|" .. multi:getLoad(), port)
server:send(ip, char(CMD_INITNODE) .. node.name, port)
elseif cmd == CMD_GLOBAL then
local k, v = dat:match("(.-)|(.+)")
PROXY[k] = resolveData(v)
end
end
)
function node:sendTo(name, data)
local conn = node.connections[name]
conn[1]:send(conn[2],data,conn[3])
conn[1]:send(conn[2], data, conn[3])
end
if not settings.noBroadCast then
node.server:broadcast("NODE_"..name)
node.server:broadcast("NODE_" .. name)
end
return node
end
@ -350,69 +382,76 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
master.connections = net.ClientCache -- Link to the client cache that is created on the net interface
master.loads = {}
master.timeouts = {}
master.trigger = multi:newFunction(function(self,node)
master.trigger =
multi:newFunction(
function(self, node)
master.OnFirstNodeConnected:Fire(node)
self:Pause()
end)
end
)
if settings.managerDetails then
local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
local client = net:newTCPClient(settings.managerDetails[1], settings.managerDetails[2])
if not client then
multi.print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false
multi.print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!")
settings.noBroadCast = false
else
client.OnDataRecieved(function(client,data)
local cmd = data:sub(1,1)
client.OnDataRecieved(
function(client, data)
local cmd = data:sub(1, 1)
if cmd == "N" then
print(data)
local name,ip,port = data:match("(.-)|(.-)|(.+)")
local c = net:newUDPClient(ip,port)
net.OnCastedClientInfo:Fire(c,name,ip,port)master.connections[name]=c
local name, ip, port = data:match("(.-)|(.-)|(.+)")
local c = net:newUDPClient(ip, port)
net.OnCastedClientInfo:Fire(c, name, ip, port)
master.connections[name] = c
elseif cmd == "R" then
local name = data:sub(2,-1)
master.connections[name]=nil
local name = data:sub(2, -1)
master.connections[name] = nil
end
end)
end
)
client:send("G") -- init your connection as a master
end
end
function master:doToAll(func)
for i,v in pairs(master.connections) do
func(i,v)
for i, v in pairs(master.connections) do
func(i, v)
end
end
function master:register(name,node,func)
function master:register(name, node, func)
if not node then
error("You must specify a node to execute a command on!")
end
local temp = bin.new()
local fData = packData(func)
temp:addBlock(CMD_REG,1)
temp:addBlock(#name,1)
temp:addBlock(name,#name)
temp:addBlock(#fData,2)
temp:addBlock(fData,#fData)
master:sendTo(node,temp.data)
temp:addBlock(CMD_REG, 1)
temp:addBlock(#name, 1)
temp:addBlock(name, #name)
temp:addBlock(#fData, 2)
temp:addBlock(fData, #fData)
master:sendTo(node, temp.data)
end
function master:execute(name,node,...)
function master:execute(name, node, ...)
if not node then
error("You must specify a node to execute a command on!")
end
if not name then
error("You must specify a function name to call on the node!")
end
local args = packData{...}
local args = packData {...}
local name = name
local node = node
local temp, len, data
temp = bin.new()
temp:addBlock(CMD_CALL,1)
temp:addBlock(#name,1)
temp:addBlock(name,#name)
temp:addBlock(#args,4)
temp:addBlock(args,#args)
master:sendTo(node,temp.data)
temp:addBlock(CMD_CALL, 1)
temp:addBlock(#name, 1)
temp:addBlock(name, #name)
temp:addBlock(#args, 4)
temp:addBlock(args, #args)
master:sendTo(node, temp.data)
end
function master:pushTo(name,data)
master:sendTo(name,char(CMD_QUEUE)..packData(data))
function master:pushTo(name, data)
master:sendTo(name, char(CMD_QUEUE) .. packData(data))
end
function master:peek()
return self.queue:peek()
@ -420,52 +459,68 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
function master:pop()
return self.queue:pop()
end
function master:newNetworkThread(tname,func,name,...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job
function master:newNetworkThread(tname, func, name, ...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job
local fData = packData(func)
local tab = {...}
local aData = ""
if #tab~=o then
aData = (packData{...})
if #tab ~= o then
aData = (packData {...})
else
aData = (packData{"NO","ARGS"})
aData = (packData {"NO", "ARGS"})
end
local temp = bin.new()
temp:addBlock(#aData,4)
temp:addBlock(#aData, 4)
local len = temp.data
local temp2 = bin.new()
temp2:addBlock(#fData,4)
temp2:addBlock(#fData, 4)
local len2 = temp2.data
if not name then
local name = self:getFreeNode()
if not name then
name = self:getRandomNode()
end
if name==nil then
multi:newEvent(function() return name~=nil end):OnEvent(function(evnt)
self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData)
if name == nil then
multi:newEvent(
function()
return name ~= nil
end
):OnEvent(
function(evnt)
self:sendTo(name, char(CMD_TASK) .. len .. aData .. len2 .. fData)
evnt:Destroy()
end):SetName("DelayedSendTask"):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(function(self)
end
):SetName("DelayedSendTask"):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(
function(self)
self:Destroy()
end)
end
)
else
self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData)
self:sendTo(name, char(CMD_TASK) .. len .. aData .. len2 .. fData)
end
else
local name = "NODE_"..name
self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData)
local name = "NODE_" .. name
self:sendTo(name, char(CMD_TASK) .. len .. aData .. len2 .. fData)
end
end
function master:sendTo(name,data)
if name:sub(1,5)~="NODE_" then
name = "NODE_"..name
function master:sendTo(name, data)
if name:sub(1, 5) ~= "NODE_" then
name = "NODE_" .. name
end
if self.connections[name]==nil then
multi:newEvent(function() return self.connections[name]~=nil end):OnEvent(function(evnt)
if self.connections[name] == nil then
multi:newEvent(
function()
return self.connections[name] ~= nil
end
):OnEvent(
function(evnt)
self.connections[name]:send(data)
evnt:Destroy()
end):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(function(self)
end
):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(
function(self)
self:Destroy()
end)
end
)
else
self.connections[name]:send(data)
end
@ -474,8 +529,8 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
local count = 0
local min = math.huge
local refO
for i,v in pairs(master.loads) do
if v<min then
for i, v in pairs(master.loads) do
if v < min then
min = v
refO = i
end
@ -484,48 +539,56 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
end
function master:getRandomNode()
local list = {}
for i,v in pairs(master.connections) do
list[#list+1]=i:sub(6,-1)
for i, v in pairs(master.connections) do
list[#list + 1] = i:sub(6, -1)
end
return list[math.random(1,#list)]
return list[math.random(1, #list)]
end
net.OnCastedClientInfo(function(client,name,ip,port)
multi.OnGUpdate(function(k,v)
client:send(table.concat{char(CMD_GLOBAL),k,"|",v})
end)
net.OnCastedClientInfo(
function(client, name, ip, port)
multi.OnGUpdate(
function(k, v)
client:send(table.concat {char(CMD_GLOBAL), k, "|", v})
end
)
local nodename
for i,v in pairs(master.connections) do
for i, v in pairs(master.connections) do
nodename = i
end
client.OnClientReady(function()
client:send(char(CMD_INITMASTER)..master.name) -- Tell the node that you are a master trying to connect
client.OnClientReady(
function()
client:send(char(CMD_INITMASTER) .. master.name) -- Tell the node that you are a master trying to connect
if not settings.managerDetails then
multi:newThread("Node Data Link Controller",function(loop)
multi:newThread(
"Node Data Link Controller",
function(loop)
while true do
if master.timeouts[name]==true then
if master.timeouts[name] == true then
master.timeouts[name] = nil
master.connections[name] = nil
thread.kill()
else
master.timeouts[name] = true
master:sendTo(name,char(CMD_PING))
master:sendTo(name, char(CMD_PING))
end
thread.sleep(1)
end
end)
end
)
end
client.name = name
client.OnDataRecieved(function(client,data)
local cmd = byte(data:sub(1,1)) -- the first byte is the command
local dat = data:sub(2,-1) -- the data that you want to read
client.OnDataRecieved(
function(client, data)
local cmd = byte(data:sub(1, 1)) -- the first byte is the command
local dat = data:sub(2, -1) -- the data that you want to read
master.trigger(nodename)
if cmd == CMD_ERROR then
local temp = bin.new(dat)
local len = temp:getBlock("n",2)
local node = temp:getBlock("s",len)
len = temp:getBlock("n",2)
local err = temp:getBlock("s",len)
master.OnError:Fire(name,err)
local len = temp:getBlock("n", 2)
local node = temp:getBlock("s", len)
len = temp:getBlock("n", 2)
local err = temp:getBlock("s", len)
master.OnError:Fire(name, err)
elseif cmd == CMD_CONSOLE then
print(unpack(resolveData(dat)))
elseif cmd == CMD_PONG then
@ -535,15 +598,18 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
elseif cmd == CMD_QUEUE then
master.queue:push(resolveData(dat))
elseif cmd == CMD_GLOBAL then
local k,v = dat:match("(.-)|(.+)")
PROXY[k]=resolveData(v)
local k, v = dat:match("(.-)|(.+)")
PROXY[k] = resolveData(v)
elseif cmd == CMD_LOAD then
local name,load = dat:match("(.-)|(.+)")
master.loads[name]=tonumber(load)
local name, load = dat:match("(.-)|(.+)")
master.loads[name] = tonumber(load)
end
end)
end)
end)
end
)
end
)
end
)
if not settings.noBroadCast then
net:newCastedClients("NODE_(.+)") -- Searches for nodes and connects to them, the master.clients table will contain them by name
end
@ -551,6 +617,8 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
end
-- The init function that gets returned
multi.print("Integrated Network Parallelism")
return {init = function()
return {
init = function()
return GLOBAL
end}
end
}

View File

@ -21,23 +21,23 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
multi, thread = require("multi")
local multi, thread = require("multi").init()
function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a channel on both ends
local c={} -- where we will store our object
c.name=name -- set the name this is important for the love2d side
local c = {} -- where we will store our object
c.name = name -- set the name this is important for the love2d side
if love then -- check love
if love.thread then -- make sure we can use the threading module
function c:init() -- create an init function so we can mimic on both love2d and lanes
self.chan=love.thread.getChannel(self.name) -- create channel by the name self.name
self.chan = love.thread.getChannel(self.name) -- create channel by the name self.name
function self:push(v) -- push to the channel
local tab
if type(v)=="table" then
if type(v) == "table" then
tab = {}
for i,c in pairs(v) do
if type(c)=="function" then
tab[i]="\1"..string.dump(c)
for i, c in pairs(v) do
if type(c) == "function" then
tab[i] = "\1" .. string.dump(c)
else
tab[i]=c
tab[i] = c
end
end
self.chan:push(tab)
@ -46,19 +46,21 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann
end
end
function self:pop() -- pop from the channel
local v=self.chan:pop()
if not v then return end
if type(v)=="table" then
local v = self.chan:pop()
if not v then
return
end
if type(v) == "table" then
tab = {}
for i,c in pairs(v) do
if type(c)=="string" then
if c:sub(1,1)=="\1" then
tab[i]=loadstring(c:sub(2,-1))
for i, c in pairs(v) do
if type(c) == "string" then
if c:sub(1, 1) == "\1" then
tab[i] = loadstring(c:sub(2, -1))
else
tab[i]=c
tab[i] = c
end
else
tab[i]=c
tab[i] = c
end
end
return tab
@ -67,19 +69,21 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann
end
end
function self:peek()
local v=self.chan:peek()
if not v then return end
if type(v)=="table" then
local v = self.chan:peek()
if not v then
return
end
if type(v) == "table" then
tab = {}
for i,c in pairs(v) do
if type(c)=="string" then
if c:sub(1,1)=="\1" then
tab[i]=loadstring(c:sub(2,-1))
for i, c in pairs(v) do
if type(c) == "string" then
if c:sub(1, 1) == "\1" then
tab[i] = loadstring(c:sub(2, -1))
else
tab[i]=c
tab[i] = c
end
else
tab[i]=c
tab[i] = c
end
end
return tab
@ -87,7 +91,7 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann
return self.chan:pop()
end
end
GLOBAL[self.name]=self -- send the object to the thread through the global interface
GLOBAL[self.name] = self -- send the object to the thread through the global interface
return self -- return the object
end
return c
@ -95,12 +99,12 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann
error("Make sure you required the love.thread module!") -- tell the user if he/she didn't require said module
end
else
c.linda=lanes.linda() -- lanes is a bit easier, create the linda on the main thread
c.linda = lanes.linda() -- lanes is a bit easier, create the linda on the main thread
function c:push(v) -- push to the queue
self.linda:send("Q",v)
self.linda:send("Q", v)
end
function c:pop() -- pop the queue
return ({self.linda:receive(0,"Q")})[2]
return ({self.linda:receive(0, "Q")})[2]
end
function c:peek()
return self.linda:get("Q")
@ -108,41 +112,44 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann
function c:init() -- mimic the feature that love2d requires, so code can be consistent
return self
end
multi.integration.GLOBAL[name]=c -- send the object to the thread through the global interface
multi.integration.GLOBAL[name] = c -- send the object to the thread through the global interface
end
return c
end
function multi:newSystemThreadedConnection(name,protect)
local c={}
function multi:newSystemThreadedConnection(name, protect)
local c = {}
c.name = name or error("You must provide a name for the connection object!")
c.protect = protect or false
c.idle = nil
local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL
local connSync = multi:newSystemThreadedQueue(c.name.."_CONN_SYNC")
local connFire = multi:newSystemThreadedQueue(c.name.."_CONN_FIRE")
local sThread = multi.integration.THREAD
local GLOBAL = multi.integration.GLOBAL
local connSync = multi:newSystemThreadedQueue(c.name .. "_CONN_SYNC")
local connFire = multi:newSystemThreadedQueue(c.name .. "_CONN_FIRE")
function c:init()
local multi = require("multi")
if love then -- lets make sure we don't reference up-values if using love2d
GLOBAL=_G.GLOBAL
sThread=_G.sThread
GLOBAL = _G.GLOBAL
sThread = _G.sThread
end
local conn = {}
conn.obj = multi:newConnection()
setmetatable(conn,{
__call=function(self,...)
setmetatable(
conn,
{
__call = function(self, ...)
return self:connect(...)
end
})
}
)
local ID = sThread.getID()
local sync = sThread.waitFor(self.name.."_CONN_SYNC"):init()
local fire = sThread.waitFor(self.name.."_CONN_FIRE"):init()
local sync = sThread.waitFor(self.name .. "_CONN_SYNC"):init()
local fire = sThread.waitFor(self.name .. "_CONN_FIRE"):init()
local connections = {}
if not multi.isMainThread then
connections = {0}
end
sync:push{"INIT",ID} -- Register this as an active connection!
sync:push {"INIT", ID} -- Register this as an active connection!
function conn:connect(func)
return self.obj(func)
end
@ -153,29 +160,32 @@ function multi:newSystemThreadedConnection(name,protect)
self.obj:Remove()
end
function conn:Fire(...)
for i = 1,#connections do
fire:push{connections[i],ID,{...}}
for i = 1, #connections do
fire:push {connections[i], ID, {...}}
end
end
function conn:FireTo(to,...)
function conn:FireTo(to, ...)
local good = false
for i = 1,#connections do
if connections[i]==to then
for i = 1, #connections do
if connections[i] == to then
good = true
break
end
end
if not good then return multi.print("NonExisting Connection!") end
fire:push{to,ID,{...}}
if not good then
return multi.print("NonExisting Connection!")
end
fire:push {to, ID, {...}}
end
-- FIRE {TO,FROM,{ARGS}}
local data
local clock = os.clock
conn.OnConnectionAdded = multi:newConnection()
multi:newLoop(function()
multi:newLoop(
function()
data = fire:peek()
if type(data)=="table" and data[1]==ID then
if data[2]==ID and conn.IgnoreSelf then
if type(data) == "table" and data[1] == ID then
if data[2] == ID and conn.IgnoreSelf then
fire:pop()
return
end
@ -183,32 +193,39 @@ function multi:newSystemThreadedConnection(name,protect)
conn.obj:Fire(unpack(data[3]))
end
data = sync:peek()
if data~=nil and data[1]=="SYNCA" and data[2]==ID then
if data ~= nil and data[1] == "SYNCA" and data[2] == ID then
sync:pop()
multi.nextStep(function()
multi.nextStep(
function()
conn.OnConnectionAdded:Fire(data[3])
end)
table.insert(connections,data[3])
end
if type(data)=="table" and data[1]=="SYNCR" and data[2]==ID then
)
table.insert(connections, data[3])
end
if type(data) == "table" and data[1] == "SYNCR" and data[2] == ID then
sync:pop()
for i=1,#connections do
for i = 1, #connections do
if connections[i] == data[3] then
table.remove(connections,i)
table.remove(connections, i)
end
end
end
end):setName("STConn.syncer")
end
):setName("STConn.syncer")
return conn
end
local cleanUp = {}
multi.OnSystemThreadDied(function(ThreadID)
for i=1,#syncs do
connSync:push{"SYNCR",syncs[i],ThreadID}
multi.OnSystemThreadDied(
function(ThreadID)
for i = 1, #syncs do
connSync:push {"SYNCR", syncs[i], ThreadID}
end
cleanUp[ThreadID] = true
end)
multi:newThread(c.name.." Connection-Handler",function()
end
)
multi:newThread(
c.name .. " Connection-Handler",
function()
local data
local clock = os.clock
local syncs = {}
@ -222,59 +239,70 @@ function multi:newSystemThreadedConnection(name,protect)
thread.skip()
end
data = connSync:peek()
if data~= nil and data[1]=="INIT" then
if data ~= nil and data[1] == "INIT" then
connSync:pop()
c.idle = clock()
table.insert(syncs,data[2])
for i=1,#syncs do
connSync:push{"SYNCA",syncs[i],data[2]}
table.insert(syncs, data[2])
for i = 1, #syncs do
connSync:push {"SYNCA", syncs[i], data[2]}
end
end
data = connFire:peek()
if data~=nil and cleanUp[data[1]] then
if data ~= nil and cleanUp[data[1]] then
local meh = data[1]
connFire:pop() -- lets remove dead thread stuff
multi:newAlarm(15):OnRing(function(a)
multi:newAlarm(15):OnRing(
function(a)
cleanUp[meh] = nil
end)
end
)
end
end
end)
GLOBAL[c.name]=c
end
)
GLOBAL[c.name] = c
return c
end
function multi:SystemThreadedBenchmark(n)
n=n or 1
local cores=multi.integration.THREAD.getCores()
local queue=multi:newSystemThreadedQueue("THREAD_BENCH_QUEUE"):init()
local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL
n = n or 1
local cores = multi.integration.THREAD.getCores()
local queue = multi:newSystemThreadedQueue("THREAD_BENCH_QUEUE"):init()
local sThread = multi.integration.THREAD
local GLOBAL = multi.integration.GLOBAL
local c = {}
for i=1,cores do
multi:newSystemThread("STHREAD_BENCH",function(n)
for i = 1, cores do
multi:newSystemThread(
"STHREAD_BENCH",
function(n)
local multi = require("multi")
if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL
sThread=_G.sThread
if multi:getPlatform() == "love2d" then
GLOBAL = _G.GLOBAL
sThread = _G.sThread
end -- we cannot have upvalues... in love2d globals, not locals must be used
queue=sThread.waitFor("THREAD_BENCH_QUEUE"):init() -- always wait for when looking for a variable at the start of the thread!
multi:benchMark(n):OnBench(function(self,count)
queue = sThread.waitFor("THREAD_BENCH_QUEUE"):init() -- always wait for when looking for a variable at the start of the thread!
multi:benchMark(n):OnBench(
function(self, count)
queue:push(count)
sThread.kill()
error("Thread was killed!")
end)
multi:mainloop()
end,n)
end
multi:newThread("THREAD_BENCH",function()
)
multi:mainloop()
end,
n
)
end
multi:newThread(
"THREAD_BENCH",
function()
local count = 0
local cc = 0
while true do
thread.skip(1)
local dat = queue:pop()
if dat then
cc=cc+1
cc = cc + 1
count = count + dat
if cc == cores then
c.OnBench:Fire(count)
@ -282,238 +310,263 @@ function multi:SystemThreadedBenchmark(n)
end
end
end
end)
end
)
c.OnBench = multi:newConnection()
return c
end
function multi:newSystemThreadedConsole(name)
local c={}
local c = {}
c.name = name
local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL
local sThread = multi.integration.THREAD
local GLOBAL = multi.integration.GLOBAL
function c:init()
_G.__Needs_Multi = true
local multi = require("multi")
if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL
sThread=_G.sThread
if multi:getPlatform() == "love2d" then
GLOBAL = _G.GLOBAL
sThread = _G.sThread
end
local cc={}
local cc = {}
if multi.isMainThread then
if GLOBAL["__SYSTEM_CONSOLE__"] then
cc.stream = sThread.waitFor("__SYSTEM_CONSOLE__"):init()
else
cc.stream = multi:newSystemThreadedQueue("__SYSTEM_CONSOLE__"):init()
multi:newLoop(function()
multi:newLoop(
function()
local data = cc.stream:pop()
if data then
local dat = table.remove(data,1)
if dat=="w" then
local dat = table.remove(data, 1)
if dat == "w" then
io.write(unpack(data))
elseif dat=="p" then
elseif dat == "p" then
print(unpack(data))
end
end
end):setName("ST.consoleSyncer")
end
):setName("ST.consoleSyncer")
end
else
cc.stream = sThread.waitFor("__SYSTEM_CONSOLE__"):init()
end
function cc:write(msg)
self.stream:push({"w",tostring(msg)})
self.stream:push({"w", tostring(msg)})
end
function cc:print(...)
local tab = {...}
for i=1,#tab do
tab[i]=tostring(tab[i])
for i = 1, #tab do
tab[i] = tostring(tab[i])
end
self.stream:push({"p",unpack(tab)})
self.stream:push({"p", unpack(tab)})
end
return cc
end
GLOBAL[c.name]=c
GLOBAL[c.name] = c
return c
end
-- NEEDS WORK
function multi:newSystemThreadedTable(name)
local c={}
c.name=name -- set the name this is important for identifying what is what
local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL
local c = {}
c.name = name -- set the name this is important for identifying what is what
local sThread = multi.integration.THREAD
local GLOBAL = multi.integration.GLOBAL
function c:init() -- create an init function so we can mimic on both love2d and lanes
_G.__Needs_Multi = true
local multi = require("multi")
if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL
sThread=_G.sThread
if multi:getPlatform() == "love2d" then
GLOBAL = _G.GLOBAL
sThread = _G.sThread
end
local cc={}
cc.tab={}
local cc = {}
cc.tab = {}
if multi.isMainThread then
if not GLOBAL[self.name.."_Tabled_Connection"] then
cc.conn = multi:newSystemThreadedConnection(self.name.."_Tabled_Connection"):init()
if not GLOBAL[self.name .. "_Tabled_Connection"] then
cc.conn = multi:newSystemThreadedConnection(self.name .. "_Tabled_Connection"):init()
end
else
cc.conn = sThread.waitFor(self.name.."_Tabled_Connection"):init()
cc.conn = sThread.waitFor(self.name .. "_Tabled_Connection"):init()
end
function cc:waitFor(name)
repeat multi:uManager() until tab[name]~=nil
repeat
multi:uManager()
until tab[name] ~= nil
return tab[name]
end
local link = cc
cc.conn(function(k,v)
link.tab[k]=v
end)
setmetatable(cc,{
__index=function(t,k)
cc.conn(
function(k, v)
link.tab[k] = v
end
)
setmetatable(
cc,
{
__index = function(t, k)
return t.tab[k]
end,
__newindex=function(t,k,v)
t.tab[k]=v
t.conn:Fire(k,v)
__newindex = function(t, k, v)
t.tab[k] = v
t.conn:Fire(k, v)
end
})
}
)
return cc
end
GLOBAL[c.name]=c
GLOBAL[c.name] = c
return c
end
local jobqueuecount = 0
local jqueues = {}
function multi:newSystemThreadedJobQueue(a,b)
jobqueuecount=jobqueuecount+1
local GLOBAL=multi.integration.GLOBAL
local sThread=multi.integration.THREAD
function multi:newSystemThreadedJobQueue(a, b)
jobqueuecount = jobqueuecount + 1
local GLOBAL = multi.integration.GLOBAL
local sThread = multi.integration.THREAD
local c = {}
c.numberofcores = 4
c.idle = nil
c.name = "SYSTEM_THREADED_JOBQUEUE_"..jobqueuecount
c.name = "SYSTEM_THREADED_JOBQUEUE_" .. jobqueuecount
-- This is done to keep backwards compatibility for older code
if type(a)=="string" and not(b) then
if type(a) == "string" and not (b) then
c.name = a
elseif type(a)=="number" and not (b) then
elseif type(a) == "number" and not (b) then
c.numberofcores = a
elseif type(a)=="string" and type(b)=="number" then
elseif type(a) == "string" and type(b) == "number" then
c.name = a
c.numberofcores = b
elseif type(a)=="number" and type(b)=="string" then
elseif type(a) == "number" and type(b) == "string" then
c.name = b
c.numberofcores = a
end
if jqueues[c.name] then
error("A job queue by the name: "..c.name.." already exists!")
error("A job queue by the name: " .. c.name .. " already exists!")
end
jqueues[c.name] = true
c.isReady = false
c.jobnum=1
c.jobnum = 1
c.OnJobCompleted = multi:newConnection()
local queueIN = self:newSystemThreadedQueue("QUEUE_IN_"..c.name):init()
local queueCC = self:newSystemThreadedQueue("QUEUE_CC_"..c.name):init()
local queueREG = self:newSystemThreadedQueue("QUEUE_REG_"..c.name):init()
local queueJD = self:newSystemThreadedQueue("QUEUE_JD_"..c.name):init()
local queueDA = self:newSystemThreadedQueue("QUEUE_DA_"..c.name):init()
local queueIN = self:newSystemThreadedQueue("QUEUE_IN_" .. c.name):init()
local queueCC = self:newSystemThreadedQueue("QUEUE_CC_" .. c.name):init()
local queueREG = self:newSystemThreadedQueue("QUEUE_REG_" .. c.name):init()
local queueJD = self:newSystemThreadedQueue("QUEUE_JD_" .. c.name):init()
local queueDA = self:newSystemThreadedQueue("QUEUE_DA_" .. c.name):init()
c.OnReady = multi:newConnection()
function c:registerJob(name,func)
function c:registerJob(name, func)
for i = 1, self.numberofcores do
queueREG:push({name,func})
queueREG:push({name, func})
end
end
c.tempQueue = {}
function c:pushJob(name,...)
function c:pushJob(name, ...)
c.idle = os.clock()
if not self.isReady then
table.insert(c.tempQueue,{self.jobnum,name,...})
self.jobnum=self.jobnum+1
return self.jobnum-1
table.insert(c.tempQueue, {self.jobnum, name, ...})
self.jobnum = self.jobnum + 1
return self.jobnum - 1
else
queueIN:push{self.jobnum,name,...}
self.jobnum=self.jobnum+1
return self.jobnum-1
queueIN:push {self.jobnum, name, ...}
self.jobnum = self.jobnum + 1
return self.jobnum - 1
end
end
function c:doToAll(func)
local r = multi.randomString(12)
for i = 1, self.numberofcores do
queueDA:push{r,func}
queueDA:push {r, func}
end
end
for i=1,c.numberofcores do
multi:newSystemThread(c.name.." Worker Thread #"..i,function(name)
for i = 1, c.numberofcores do
multi:newSystemThread(
c.name .. " Worker Thread #" .. i,
function(name)
local multi = require("multi")
if love then -- lets make sure we don't reference up-values if using love2d
GLOBAL=_G.GLOBAL
sThread=_G.sThread
GLOBAL = _G.GLOBAL
sThread = _G.sThread
end
local CC = sThread.waitFor("QUEUE_CC_"..name):init()
local CC = sThread.waitFor("QUEUE_CC_" .. name):init()
CC:push("ready")
local FUNCS={}
local FUNCS = {}
local ids = {}
local JQI = sThread.waitFor("QUEUE_IN_"..name):init()
local JD = sThread.waitFor("QUEUE_JD_"..name):init()
local REG = sThread.waitFor("QUEUE_REG_"..name):init()
local DA = sThread.waitFor("QUEUE_DA_"..name):init()
local JQI = sThread.waitFor("QUEUE_IN_" .. name):init()
local JD = sThread.waitFor("QUEUE_JD_" .. name):init()
local REG = sThread.waitFor("QUEUE_REG_" .. name):init()
local DA = sThread.waitFor("QUEUE_DA_" .. name):init()
local lastjob = os.clock()
multi:newLoop(function()
local job=JQI:pop()
local rd=REG:peek()
local da=DA:peek()
multi:newLoop(
function()
local job = JQI:pop()
local rd = REG:peek()
local da = DA:peek()
if rd then
if not FUNCS[rd[1]] then
FUNCS[rd[1]]=rd[2]
rd=nil
FUNCS[rd[1]] = rd[2]
rd = nil
REG:pop()
end
end
if da then
if not ids[da[1]] then
local meh = da[1]
ids[da[1]]=true
ids[da[1]] = true
da[2](multi)
da=nil
da = nil
DA:pop()
multi:newAlarm(60):OnRing(function(a)
multi:newAlarm(60):OnRing(
function(a)
ids[meh] = nil
a:Destroy()
end)
end
)
end
end
if job then
lastjob = os.clock()
local ID=table.remove(job,1) -- return and remove
local _name=table.remove(job,1) -- return and remove
local ID = table.remove(job, 1) -- return and remove
local _name = table.remove(job, 1) -- return and remove
if FUNCS[_name] then
JD:push({ID,FUNCS[_name](unpack(job))})
JD:push({ID, FUNCS[_name](unpack(job))})
else -- making use of that new holding feature
JD:push({ID,FUNCS:waitFor(_name)(unpack(job))})
JD:push({ID, FUNCS:waitFor(_name)(unpack(job))})
end
end
end)
multi:newLoop(function()
if os.clock()-lastjob>1 then
end
)
multi:newLoop(
function()
if os.clock() - lastjob > 1 then
sThread.sleep(.1)
end
end)
setmetatable(_G,{
__index=function(t,k)
end
)
setmetatable(
_G,
{
__index = function(t, k)
return FUNCS[k]
end
})
}
)
if not love then
multi:mainloop()
end
end,c.name)
end,
c.name
)
end
local clock = os.clock
multi:newThread("JQ-"..c.name.." Manager",function()
multi:newThread(
"JQ-" .. c.name .. " Manager",
function()
local _count = 0
while _count<c.numberofcores do
while _count < c.numberofcores do
thread.skip()
if queueCC:pop() then
_count = _count + 1
end
end
c.isReady = true
for i=1,#c.tempQueue do
for i = 1, #c.tempQueue do
queueIN:push(c.tempQueue[i])
end
c.tempQueue = nil
@ -534,34 +587,41 @@ function multi:newSystemThreadedJobQueue(a,b)
c.OnJobCompleted:Fire(unpack(dat))
end
end
end)
end
)
return c
end
function multi:newSystemThreadedExecute(cmd)
local c={}
local GLOBAL=multi.integration.GLOBAL -- set up locals incase we are using lanes
local sThread=multi.integration.THREAD -- set up locals incase we are using lanes
local name="Execute_Thread"..multi.randomString(16)
c.name=name
GLOBAL[name.."CMD"]=cmd
multi:newSystemThread(name,function()
local c = {}
local GLOBAL = multi.integration.GLOBAL -- set up locals incase we are using lanes
local sThread = multi.integration.THREAD -- set up locals incase we are using lanes
local name = "Execute_Thread" .. multi.randomString(16)
c.name = name
GLOBAL[name .. "CMD"] = cmd
multi:newSystemThread(
name,
function()
if love then -- lets make sure we don't reference upvalues if using love2d
GLOBAL=_G.GLOBAL
sThread=_G.sThread
name=__THREADNAME__ -- global data same as the name we used in this functions creation
GLOBAL = _G.GLOBAL
sThread = _G.sThread
name = __THREADNAME__ -- global data same as the name we used in this functions creation
end -- Lanes should take the local upvalues ^^^
cmd=sThread.waitFor(name.."CMD")
local ret=os.execute(cmd)
GLOBAL[name.."R"]=ret
end)
c.OnCMDFinished=multi:newConnection()
c.looper=multi:newLoop(function(self)
local ret=GLOBAL[self.link.name.."R"]
cmd = sThread.waitFor(name .. "CMD")
local ret = os.execute(cmd)
GLOBAL[name .. "R"] = ret
end
)
c.OnCMDFinished = multi:newConnection()
c.looper =
multi:newLoop(
function(self)
local ret = GLOBAL[self.link.name .. "R"]
if ret then
self.link.OnCMDFinished:Fire(ret)
self:Destroy()
end
end)
c.looper.link=c
end
)
c.looper.link = c
return c
end