diff --git a/changes.md b/changes.md index 8890441..e4878da 100644 --- a/changes.md +++ b/changes.md @@ -1,5 +1,38 @@ # Changes [TOC] +Update 14.0.0 Consistency and stability +------------- +Added: +- multi.init() -- Initlizes the library! Must be called for multiple files to have the same handle. Example below +- thread.holdFor(NUMBER sec, FUNCTION condition) -- Works like hold, but timesout when a certain amount of time has passed! +- thread.holdWithin(NUMBER; cycles,FUNCTION; condition) -- Holds until the condition is met! If the number of cycles passed is equal to cycles, hold will return a timeout error +**Note:** when hold has a timeout the first argument will return nil and the second atgument will be TIMEOUT, if not timed out hold will return the values from the conditions + +Fixed: +- Connections had a preformance issue where they would create a non function when using connection.getConnection() of a non existing label. +- An internal mismanagement of the treads scheduler was fixed. Now it should be quicker and free of bugs +- Thread error management is the integrations was not properly implemented. This is now fixed +- + +Changed: +- Ties in to the new function that has been added multi.init() +```lua +local multi, thread = require("multi").init() -- The require multi function still returns the multi object like before +``` +Note: Using init allows you to get access to the thread handle. This was done because thread was modifying the global space as well as multi. I wanted to not modify the global space anymore. +internally most of your code can stay the same, you only need to change how the library is required. I do toy a bit with the global space, buy I use a variable name that is invalid as a variable name. The variable name is $multi. This is used internally to keep some records and maintain a clean space + +Also when using intergrations things now look more consistant. +```lua +local multi, thread = require("multi").init() +local GLOBSL, THREAD = require("multi.integration.lanesManager").init() -- or whichever manager you are using +local nGLOBAL, nTHREAD = require("multi.intergration.networkManager).inti() +``` +Note: You can mix and match integrations together. You can create systemthreads within network threads, and you can also create cotoutine based threads within bothe network and system threads. This gives you quite a bit of flexibility to create something awesome. + +Going forward: +- Sterlization is still being worked on. I was planning of having a way to save state of multi objects and such, but that isn't possible without knowing how your code is strutured or if it is even made to handle something like that. So I decided on giving a tostirng/tofile method for each multi object as well as a fromstring/fromfile method for use. This is technically in the code already, but not documented. It has actually been in the code for a while, but its not done just yet and I want to make it perfect before sending it out. + Update 13.1.0 Bug fixes and features added ------------- Added: diff --git a/multi/compat/love2d.lua b/multi/compat/love2d.lua index 7048deb..8557c4c 100644 --- a/multi/compat/love2d.lua +++ b/multi/compat/love2d.lua @@ -21,12 +21,12 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] -local multi = require("multi") -os.sleep=love.timer.sleep -multi.drawF={} -function multi:onDraw(func,i) - i=i or 1 - table.insert(self.drawF,i,func) +local multi, thread = require("multi").init() +os.sleep = love.timer.sleep +multi.drawF = {} +function multi:onDraw(func, i) + i = i or 1 + table.insert(self.drawF, i, func) end multi.OnKeyPressed = multi:newConnection() multi.OnKeyReleased = multi:newConnection() @@ -38,39 +38,48 @@ multi.OnDraw = multi:newConnection() multi.OnTextInput = multi:newConnection() multi.OnUpdate = multi:newConnection() multi.OnQuit = multi:newConnection() -multi.OnPreLoad(function() - local function Hook(func,conn) - if love[func]~=nil then - love[func] = Library.convert(love[func]) - love[func]:inject(function(...) - conn:Fire(...) - return {...} - end,1) - elseif love[func]==nil then - love[func] = function(...) - conn:Fire(...) +multi.OnPreLoad( + function() + local function Hook(func, conn) + if love[func] ~= nil then + love[func] = Library.convert(love[func]) + love[func]:inject( + function(...) + conn:Fire(...) + return {...} + end, + 1 + ) + elseif love[func] == nil then + love[func] = function(...) + conn:Fire(...) + end end end + Hook("quit", multi.OnQuit) + Hook("keypressed", multi.OnKeyPressed) + Hook("keyreleased", multi.OnKeyReleased) + Hook("mousepressed", multi.OnMousePressed) + Hook("mousereleased", multi.OnMouseReleased) + Hook("wheelmoved", multi.OnMouseWheelMoved) + Hook("mousemoved", multi.OnMouseMoved) + Hook("draw", multi.OnDraw) + Hook("textinput", multi.OnTextInput) + Hook("update", multi.OnUpdate) + multi.OnDraw( + function() + for i = 1, #multi.drawF do + love.graphics.setColor(255, 255, 255, 255) + multi.drawF[i]() + end + end + ) end - Hook("quit",multi.OnQuit) - Hook("keypressed",multi.OnKeyPressed) - Hook("keyreleased",multi.OnKeyReleased) - Hook("mousepressed",multi.OnMousePressed) - Hook("mousereleased",multi.OnMouseReleased) - Hook("wheelmoved",multi.OnMouseWheelMoved) - Hook("mousemoved",multi.OnMouseMoved) - Hook("draw",multi.OnDraw) - Hook("textinput",multi.OnTextInput) - Hook("update",multi.OnUpdate) - multi.OnDraw(function() - for i=1,#multi.drawF do - love.graphics.setColor(255,255,255,255) - multi.drawF[i]() - end - end) -end) -multi.OnQuit(function() - multi.Stop() - love.event.quit() -end) +) +multi.OnQuit( + function() + multi.Stop() + love.event.quit() + end +) return multi diff --git a/multi/init.lua b/multi/init.lua index a7d2a0d..af4c7ac 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -25,6 +25,9 @@ local bin = pcall(require,"bin") local multi = {} local clock = os.clock local thread = {} +if not _G["$multi"] then + _G["$multi"] = {multi=multi,thread=thread} +end multi.Version = "13.1.0" multi._VERSION = "13.1.0" multi.stage = "stable" @@ -78,6 +81,9 @@ multi.PriorityTick=1 -- Between 1, 2 and 4 multi.Priority=multi.Priority_High multi.threshold=256 multi.threstimed=.001 +function multi.init() + return _G["$multi"].multi,_G["$multi"].thread +end function multi.queuefinal(self) self:Destroy() if self.Parent.Mainloop[#self.Parent.Mainloop] then @@ -758,6 +764,9 @@ function multi:newConnector() local c = {Type = "connector"} return c end +local CRef = { + Fire = function() end +} function multi:newConnection(protect,func) local c={} c.callback = func @@ -802,11 +811,9 @@ function multi:newConnection(protect,func) return self end c.FConnect=c.fConnect - function c:getConnection(name,ingore) - if ingore then - return self.connections[name] or { - Fire=function() return end -- if the connection doesn't exist lets call all of them or silently ignore - } + function c:getConnection(name,ignore) + if ignore then + return self.connections[name] or CRef else return self.connections[name] or self end @@ -1489,6 +1496,10 @@ function thread.hold(n) thread._Requests() return coroutine.yield({"_hold_",n or function() return true end}) end +function thread.holdFor(sec,n) + thread._Requests() + return coroutine.yield({"_holdF_", sec, n or function() return true end}) +end function thread.skip(n) thread._Requests() if not n then n = 1 elseif n<1 then n = 1 end @@ -1652,6 +1663,13 @@ function multi.initThreads() threads[i].task = "hold" threads[i].__ready = false ret = nil + elseif ret[1]=="_holdF_" then + threads[i].sec = ret[2] + threads[i].func = ret[3] + threads[i].task = "holdF" + threads[i].time = clock() + threads[i].__ready = false + ret = nil end end end @@ -1684,8 +1702,19 @@ function multi.initThreads() threads[i].task = "" threads[i].__ready = true end + elseif threads[i].task == "holdF" then + t0,t1,t2,t3,t4,t5,t6 = threads[i].func() + if t0 then + threads[i].task = "" + threads[i].__ready = true + elseif clock() - threads[i].time>=threads[i].sec then + threads[i].task = "" + threads[i].__ready = true + t0 = nil + t1 = "TIMEOUT" + end end - if threads[i].__ready then + if threads[i] and threads[i].__ready then threads[i].__ready = false _,ret=coroutine.resume(threads[i].thread,t0,t1,t2,t3,t4,t5,t6) end @@ -1724,6 +1753,13 @@ function multi:threadloop() threads[i].task = "hold" threads[i].__ready = false ret = nil + elseif ret[1]=="_holdF_" then + threads[i].sec = ret[2] + threads[i].func = ret[3] + threads[i].task = "holdF" + threads[i].time = clock() + threads[i].__ready = false + ret = nil end end end @@ -1753,6 +1789,17 @@ function multi:threadloop() threads[i].task = "" threads[i].__ready = true end + elseif threads[i].task == "holdF" then + t0,t1,t2,t3,t4,t5,t6 = threads[i].func() + if t0 then + threads[i].task = "" + threads[i].__ready = true + elseif clock() - threads[i].time>=threads[i].sec then + threads[i].task = "" + threads[i].__ready = true + t0 = nil + t1 = "TIMEOUT" + end end if threads[i].__ready then threads[i].__ready = false @@ -2550,7 +2597,4 @@ end function multi:setDefualtStateFlag(opt) -- end -if not(multi.Version == "13.2.0" or multi.Version == "14.0.0") then - _G.thread = thread -end -return multi, thread +return multi diff --git a/multi/integration/lanesManager.lua b/multi/integration/lanesManager.lua index e2db630..4c5845c 100644 --- a/multi/integration/lanesManager.lua +++ b/multi/integration/lanesManager.lua @@ -21,20 +21,26 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] -package.path="?/init.lua;?.lua;"..package.path +package.path = "?/init.lua;?.lua;" .. package.path +local multi, thread = require("multi").init() -- get it all and have it on all lanes +if multi.integration then -- This allows us to call the lanes manager from supporting modules without a hassel + return { + init = function() + return multi.integration.GLOBAL, multi.integration.THREAD + end + } +end function os.getOS() - if package.config:sub(1,1)=='\\' then - return 'windows' + if package.config:sub(1, 1) == "\\" then + return "windows" else - return 'unix' + return "unix" end end -- Step 1 get lanes -lanes=require("lanes").configure() -local multi, thread = require("multi") -- get it all and have it on all lanes +lanes = require("lanes").configure() multi.SystemThreads = {} -local thread = thread -multi.isMainThread=true +multi.isMainThread = true function multi:canSystemThread() return true end @@ -45,40 +51,45 @@ end local __GlobalLinda = lanes.linda() -- handles global stuff local __SleepingLinda = lanes.linda() -- handles sleeping stuff -- For convenience a GLOBAL table will be constructed to handle requests -local GLOBAL={} -setmetatable(GLOBAL,{ - __index=function(t,k) - return __GlobalLinda:get(k) - end, - __newindex=function(t,k,v) - __GlobalLinda:set(k,v) - end, -}) +local GLOBAL = {} +setmetatable( + GLOBAL, + { + __index = function(t, k) + return __GlobalLinda:get(k) + end, + __newindex = function(t, k, v) + __GlobalLinda:set(k, v) + end + } +) -- Step 3 rewrite the thread methods to use Lindas -local THREAD={} -function THREAD.set(name,val) - __GlobalLinda:set(name,val) +local THREAD = {} +function THREAD.set(name, val) + __GlobalLinda:set(name, val) end function THREAD.get(name) __GlobalLinda:get(name) end local function randomString(n) - local str = '' + local str = "" local strings = {'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z','1','2','3','4','5','6','7','8','9','0','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'} - for i=1,n do - str = str..''..strings[math.random(1,#strings)] + for i = 1, n do + str = str .. "" .. strings[math.random(1, #strings)] end return str end function THREAD.waitFor(name) local function wait() math.randomseed(os.time()) - __SleepingLinda:receive(.001,randomString(12)) + __SleepingLinda:receive(.001, randomString(12)) end - repeat wait() until __GlobalLinda:get(name) + repeat + wait() + until __GlobalLinda:get(name) return __GlobalLinda:get(name) end -function THREAD.testFor(name,val,sym) +function THREAD.testFor(name, val, sym) -- end function THREAD.getCores() @@ -87,10 +98,10 @@ end function THREAD.getThreads() return GLOBAL.__THREADS__ end -if os.getOS()=="windows" then - THREAD.__CORES=tonumber(os.getenv("NUMBER_OF_PROCESSORS")) +if os.getOS() == "windows" then + THREAD.__CORES = tonumber(os.getenv("NUMBER_OF_PROCESSORS")) else - THREAD.__CORES=tonumber(io.popen("nproc --all"):read("*n")) + THREAD.__CORES = tonumber(io.popen("nproc --all"):read("*n")) end function THREAD.kill() -- trigger the lane destruction error("Thread was killed!") @@ -107,40 +118,42 @@ Idle wait keeps the CPU running better where busy wait wastes CPU cycles... Lane however, a linda recieve will in fact be a idle wait! So we use that and wrap it in a nice package]] function THREAD.sleep(n) math.randomseed(os.time()) - __SleepingLinda:receive(n,randomString(12)) + __SleepingLinda:receive(n, randomString(12)) end function THREAD.hold(n) local function wait() math.randomseed(os.time()) - __SleepingLinda:receive(.001,randomString(12)) + __SleepingLinda:receive(.001, randomString(12)) end - repeat wait() until n() + repeat + wait() + until n() end -local rand = math.random(1,10000000) +local rand = math.random(1, 10000000) -- Step 5 Basic Threads! local threads = {} local count = 1 local started = false local livingThreads = {} -function multi:newSystemThread(name,func,...) +function multi:newSystemThread(name, func, ...) multi.InitSystemThreadErrorHandler() - rand = math.random(1,10000000) - local c={} - local __self=c - c.name=name + rand = math.random(1, 10000000) + local c = {} + local __self = c + c.name = name c.Name = name c.Id = count - livingThreads[count] = {true,name} + livingThreads[count] = {true, name} local THREAD_ID = count count = count + 1 - c.Type="sthread" + c.Type = "sthread" c.creationTime = os.clock() c.alive = true - local THREAD_NAME=name + local THREAD_NAME = name local function func2(...) local multi = require("multi") - _G["THREAD_NAME"]=THREAD_NAME - _G["THREAD_ID"]=THREAD_ID + _G["THREAD_NAME"] = THREAD_NAME + _G["THREAD_ID"] = THREAD_ID math.randomseed(rand) func(...) if _G.__Needs_Multi then @@ -148,52 +161,61 @@ function multi:newSystemThread(name,func,...) end THREAD.kill() end - c.thread=lanes.gen("*", func2)(...) + c.thread = lanes.gen("*", func2)(...) function c:kill() self.thread:cancel() - multi.print("Thread: '"..self.name.."' has been stopped!") + multi.print("Thread: '" .. self.name .. "' has been stopped!") self.alive = false end - table.insert(multi.SystemThreads,c) + table.insert(multi.SystemThreads, c) c.OnError = multi:newConnection() - GLOBAL["__THREADS__"]=livingThreads - return c + GLOBAL["__THREADS__"] = livingThreads + return c end multi.OnSystemThreadDied = multi:newConnection() function multi.InitSystemThreadErrorHandler() - if started==true then return end + if started == true then + return + end started = true - multi:newThread("ThreadErrorHandler",function() - local threads = multi.SystemThreads - while true do - thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough. - for i=#threads,1,-1 do - local v,err,t=threads[i].thread:join(.001) - if err then - if err:find("Thread was killed!") then - print(err) - livingThreads[threads[i].Id] = {false,threads[i].Name} - threads[i].alive = false - multi.OnSystemThreadDied:Fire(threads[i].Id) - GLOBAL["__THREADS__"]=livingThreads - table.remove(threads,i) - elseif err:find("stack traceback") then - print(err) - threads[i].OnError:Fire(threads[i],err,"Error in systemThread: '"..threads[i].name.."' <"..err..">") - threads[i].alive = false - livingThreads[threads[i].Id] = {false,threads[i].Name} - multi.OnSystemThreadDied:Fire(threads[i].Id) - GLOBAL["__THREADS__"]=livingThreads - table.remove(threads,i) + multi:newThread( + "ThreadErrorHandler", + function() + local threads = multi.SystemThreads + while true do + thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough. + for i = #threads, 1, -1 do + local v, err, t = threads[i].thread:join(.001) + if err then + if err:find("Thread was killed!") then + print(err) + livingThreads[threads[i].Id] = {false, threads[i].Name} + threads[i].alive = false + multi.OnSystemThreadDied:Fire(threads[i].Id) + GLOBAL["__THREADS__"] = livingThreads + table.remove(threads, i) + elseif err:find("stack traceback") then + print(err) + threads[i].OnError:Fire(threads[i], err, "Error in systemThread: '" .. threads[i].name .. "' <" .. err .. ">") + threads[i].alive = false + livingThreads[threads[i].Id] = {false, threads[i].Name} + multi.OnSystemThreadDied:Fire(threads[i].Id) + GLOBAL["__THREADS__"] = livingThreads + table.remove(threads, i) + end end end end end - end) + ) end multi.print("Integrated Lanes!") -multi.integration={} -- for module creators -multi.integration.GLOBAL=GLOBAL -multi.integration.THREAD=THREAD +multi.integration = {} -- for module creators +multi.integration.GLOBAL = GLOBAL +multi.integration.THREAD = THREAD require("multi.integration.shared") -return {init=function() return GLOBAL, THREAD end} +return { + init = function() + return GLOBAL, THREAD + end +} diff --git a/multi/integration/loveManager.lua b/multi/integration/loveManager.lua index 9de99f2..d43db36 100644 --- a/multi/integration/loveManager.lua +++ b/multi/integration/loveManager.lua @@ -22,15 +22,23 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] local multi = require("multi.compat.love2d") +if multi.integration then -- This allows us to call the lanes manager from supporting modules without a hassel + return { + init = function() + return multi.integration.GLOBAL, multi.integration.THREAD + end + } +end function multi:canSystemThread() return true end function multi:getPlatform() return "love2d" end -multi.integration={} -multi.integration.love2d={} -multi.integration.love2d.ThreadBase=[[ +multi.integration = {} +multi.integration.love2d = {} +multi.integration.love2d.ThreadBase = + [[ tab={...} __THREADID__=table.remove(tab,1) __THREADNAME__=table.remove(tab,1) @@ -198,27 +206,30 @@ updater:OnUpdate(__sync__) func=loadDump([=[INSERT_USER_CODE]=])(unpack(tab)) multi:mainloop() ]] -GLOBAL={} -- Allow main thread to interact with these objects as well +GLOBAL = {} -- Allow main thread to interact with these objects as well _G.THREAD_ID = 0 -__proxy__={} -setmetatable(GLOBAL,{ - __index=function(t,k) - return __proxy__[k] - end, - __newindex=function(t,k,v) - __proxy__[k]=v - for i=1,#__channels__ do - if type(v)=="userdata" then - __channels__[i]:push(v) - else - __channels__[i]:push("SYNC "..type(v).." "..k.." "..resolveData(v)) +__proxy__ = {} +setmetatable( + GLOBAL, + { + __index = function(t, k) + return __proxy__[k] + end, + __newindex = function(t, k, v) + __proxy__[k] = v + for i = 1, #__channels__ do + if type(v) == "userdata" then + __channels__[i]:push(v) + else + __channels__[i]:push("SYNC " .. type(v) .. " " .. k .. " " .. resolveData(v)) + end end end - end, -}) -THREAD={} -- Allow main thread to interact with these objects as well -multi.integration.love2d.mainChannel=love.thread.getChannel("__MainChan__") -isMainThread=true + } +) +THREAD = {} -- Allow main thread to interact with these objects as well +multi.integration.love2d.mainChannel = love.thread.getChannel("__MainChan__") +isMainThread = true multi.SystemThreads = {} function THREAD.getName() return __THREADNAME__ @@ -227,119 +238,125 @@ function THREAD.getID() return THREAD_ID end function ToStr(val, name, skipnewlines, depth) - skipnewlines = skipnewlines or false - depth = depth or 0 - local tmp = string.rep(" ", depth) - if name then + skipnewlines = skipnewlines or false + depth = depth or 0 + local tmp = string.rep(" ", depth) + if name then if type(name) == "string" then - tmp = tmp .. "[\""..name.."\"] = " + tmp = tmp .. '["' .. name .. '"] = ' else - tmp = tmp .. "["..(name or "").."] = " + tmp = tmp .. "[" .. (name or "") .. "] = " end end - if type(val) == "table" then - tmp = tmp .. "{" .. (not skipnewlines and " " or "") - for k, v in pairs(val) do - tmp = tmp .. ToStr(v, k, skipnewlines, depth + 1) .. "," .. (not skipnewlines and " " or "") - end - tmp = tmp .. string.rep(" ", depth) .. "}" - elseif type(val) == "number" then - tmp = tmp .. tostring(val) - elseif type(val) == "string" then - tmp = tmp .. string.format("%q", val) - elseif type(val) == "boolean" then - tmp = tmp .. (val and "true" or "false") + if type(val) == "table" then + tmp = tmp .. "{" .. (not skipnewlines and " " or "") + for k, v in pairs(val) do + tmp = tmp .. ToStr(v, k, skipnewlines, depth + 1) .. "," .. (not skipnewlines and " " or "") + end + tmp = tmp .. string.rep(" ", depth) .. "}" + elseif type(val) == "number" then + tmp = tmp .. tostring(val) + elseif type(val) == "string" then + tmp = tmp .. string.format("%q", val) + elseif type(val) == "boolean" then + tmp = tmp .. (val and "true" or "false") elseif type(val) == "function" then - tmp = tmp .. "loadDump([===["..dump(val).."]===])" - else - tmp = tmp .. "\"[inserializeable datatype:" .. type(val) .. "]\"" - end - return tmp + tmp = tmp .. "loadDump([===[" .. dump(val) .. "]===])" + else + tmp = tmp .. '"[inserializeable datatype:' .. type(val) .. ']"' + end + return tmp end -function resolveType(tp,d) - if tp=="number" then +function resolveType(tp, d) + if tp == "number" then return tonumber(d) - elseif tp=="bool" then - return (d=="true") - elseif tp=="function" then - return loadDump("[==["..d.."]==]") - elseif tp=="table" then - return loadstring("return "..d)() - elseif tp=="nil" then + elseif tp == "bool" then + return (d == "true") + elseif tp == "function" then + return loadDump("[==[" .. d .. "]==]") + elseif tp == "table" then + return loadstring("return " .. d)() + elseif tp == "nil" then return nil else return d end end function resolveData(v) - local data="" - if type(v)=="table" then + local data = "" + if type(v) == "table" then return ToStr(v) - elseif type(v)=="function" then + elseif type(v) == "function" then return dump(v) - elseif type(v)=="string" or type(v)=="number" or type(v)=="bool" or type(v)=="nil" then + elseif type(v) == "string" or type(v) == "number" or type(v) == "bool" or type(v) == "nil" then return tostring(v) end return data end function loadDump(d) - local s={} + local s = {} for p in d:gmatch("(%d-)\\") do - s[#s+1]=string.char(tonumber(p)) + s[#s + 1] = string.char(tonumber(p)) end return loadstring(table.concat(s)) end function dump(func) - local code,t={},string.dump(func) - for i=1,#t do - code[#code+1]=string.byte(t:sub(i,i)).."\\" + local code, t = {}, string.dump(func) + for i = 1, #t do + code[#code + 1] = string.byte(t:sub(i, i)) .. "\\" end return table.concat(code) end local function randomString(n) - local str = '' + local str = "" local strings = {'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z','1','2','3','4','5','6','7','8','9','0','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'} - for i=1,n do - str = str..''..strings[math.random(1,#strings)] + for i = 1, n do + str = str .. "" .. strings[math.random(1, #strings)] end return str end local count = 1 local livingThreads = {} -function multi:newSystemThread(name,func,...) -- the main method +function multi:newSystemThread(name, func, ...) -- the main method multi.InitSystemThreadErrorHandler() - local c={} - c.name=name + local c = {} + c.name = name c.Name = name - c.Type="sthread" - c.ID=c.name.."" - c.Id=count + c.Type = "sthread" + c.ID = c.name .. "" + c.Id = count c.creationTime = os.clock() count = count + 1 - c.thread=love.thread.newThread(multi.integration.love2d.ThreadBase:gsub("INSERT_USER_CODE",dump(func))) - livingThreads[count] = {true,name} + c.thread = love.thread.newThread(multi.integration.love2d.ThreadBase:gsub("INSERT_USER_CODE", dump(func))) + livingThreads[count] = {true, name} livingThreads[c.thread] = c c.OnError = multi:newConnection() - c.thread:start(c.ID,c.name,THREAD_ID,...) + c.thread:start(c.ID, c.name, THREAD_ID, ...) function c:kill() - multi.integration.GLOBAL["__DIEPLZ"..self.ID.."__"]="__DIEPLZ"..self.ID.."__" + multi.integration.GLOBAL["__DIEPLZ" .. self.ID .. "__"] = "__DIEPLZ" .. self.ID .. "__" end return c end -function love.threaderror( thread, errorstr ) - multi.OnError:Fire(thread,errorstr) - livingThreads[thread].OnError:Fire(threads[i],err,"Error in systemThread: '"..livingThreads[thread].name.."' <"..errorstr..">") - multi.print("Error in systemThread: "..tostring(thread)..": "..errorstr) +function love.threaderror(thread, errorstr) + multi.OnError:Fire(thread, errorstr) + livingThreads[thread].OnError:Fire( + threads[i], + err, + "Error in systemThread: '" .. livingThreads[thread].name .. "' <" .. errorstr .. ">" + ) + multi.print("Error in systemThread: " .. tostring(thread) .. ": " .. errorstr) end -local THREAD={} -function THREAD.set(name,val) - GLOBAL[name]=val +local THREAD = {} +function THREAD.set(name, val) + GLOBAL[name] = val end function THREAD.get(name) return GLOBAL[name] end function THREAD.waitFor(name) - repeat multi:uManager() until GLOBAL[name] + repeat + multi:uManager() + until GLOBAL[name] return GLOBAL[name] end function THREAD.getCores() @@ -349,85 +366,95 @@ function THREAD.sleep(n) love.timer.sleep(n) end function THREAD.hold(n) - repeat multi:uManager() until n() + repeat + multi:uManager() + until n() end -__channels__={} -multi.integration.GLOBAL=GLOBAL -multi.integration.THREAD=THREAD -updater=multi:newLoop(function(self) - local data=multi.integration.love2d.mainChannel:pop() - while data do - if type(data)=="string" then - local cmd,tp,name,d=data:match("(%S-) (%S-) (%S-) (.+)") - if cmd=="SYNC" then - __proxy__[name]=resolveType(tp,d) - for i=1,#__channels__ do - -- send data to other threads - if type(v)=="userdata" then - __channels__[i]:push(v) - else - __channels__[i]:push("SYNC "..tp.." "..name.." "..d) - end - end - elseif cmd=="NEWTHREAD" then - __channels__[#__channels__+1]=love.thread.getChannel(tp) - for k,v in pairs(__proxy__) do -- sync the global with each new thread - if type(v)=="userdata" then - __channels__[#__channels__]:push(v) - else - __channels__[#__channels__]:push("SYNC "..type(v).." "..k.." "..resolveData(v)) +__channels__ = {} +multi.integration.GLOBAL = GLOBAL +multi.integration.THREAD = THREAD +updater = + multi:newLoop( + function(self) + local data = multi.integration.love2d.mainChannel:pop() + while data do + if type(data) == "string" then + local cmd, tp, name, d = data:match("(%S-) (%S-) (%S-) (.+)") + if cmd == "SYNC" then + __proxy__[name] = resolveType(tp, d) + for i = 1, #__channels__ do + -- send data to other threads + if type(v) == "userdata" then + __channels__[i]:push(v) + else + __channels__[i]:push("SYNC " .. tp .. " " .. name .. " " .. d) + end + end + elseif cmd == "NEWTHREAD" then + __channels__[#__channels__ + 1] = love.thread.getChannel(tp) + for k, v in pairs(__proxy__) do -- sync the global with each new thread + if type(v) == "userdata" then + __channels__[#__channels__]:push(v) + else + __channels__[#__channels__]:push("SYNC " .. type(v) .. " " .. k .. " " .. resolveData(v)) + end end end + else + __proxy__[name] = data end - else - __proxy__[name]=data + data = multi.integration.love2d.mainChannel:pop() end - data=multi.integration.love2d.mainChannel:pop() end -end) +) multi.OnSystemThreadDied = multi:newConnection() local started = false function multi.InitSystemThreadErrorHandler() - if started==true then return end + if started == true then + return + end started = true - multi:newThread("ThreadErrorHandler",function() - local threads = multi.SystemThreads - while true do - thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough. - for i=#threads,1,-1 do - local v,err,t=threads[i].thread:join(.001) - if err then - if err:find("Thread was killed!") then - livingThreads[threads[i].Id] = {false,threads[i].Name} - multi.OnSystemThreadDied:Fire(threads[i].Id) - GLOBAL["__THREADS__"]=livingThreads - table.remove(threads,i) - else - threads[i].OnError:Fire(threads[i],err,"Error in systemThread: '"..threads[i].name.."' <"..err..">") - livingThreads[threads[i].Id] = {false,threads[i].Name} - multi.OnSystemThreadDied:Fire(threads[i].Id) - GLOBAL["__THREADS__"]=livingThreads - table.remove(threads,i) + multi:newThread( + "ThreadErrorHandler", + function() + local threads = multi.SystemThreads + while true do + thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough. + for i = #threads, 1, -1 do + local v, err, t = threads[i].thread:join(.001) + if err then + if err:find("Thread was killed!") then + livingThreads[threads[i].Id] = {false, threads[i].Name} + multi.OnSystemThreadDied:Fire(threads[i].Id) + GLOBAL["__THREADS__"] = livingThreads + table.remove(threads, i) + else + threads[i].OnError:Fire(threads[i], err, "Error in systemThread: '" .. threads[i].name .. "' <" .. err .. ">") + livingThreads[threads[i].Id] = {false, threads[i].Name} + multi.OnSystemThreadDied:Fire(threads[i].Id) + GLOBAL["__THREADS__"] = livingThreads + table.remove(threads, i) + end end end end end - end) + ) end require("multi.integration.shared") multi.print("Integrated Love2d!") return { - init=function(t) + init = function(t) if t then if t.threadNamespace then - multi.integration.THREADNAME=t.threadNamespace - multi.integration.love2d.ThreadBase:gsub("sThread",t.threadNamespace) + multi.integration.THREADNAME = t.threadNamespace + multi.integration.love2d.ThreadBase:gsub("sThread", t.threadNamespace) end if t.globalNamespace then - multi.integration.GLOBALNAME=t.globalNamespace - multi.integration.love2d.ThreadBase:gsub("GLOBAL",t.globalNamespace) + multi.integration.GLOBALNAME = t.globalNamespace + multi.integration.love2d.ThreadBase:gsub("GLOBAL", t.globalNamespace) end end - return GLOBAL,THREAD + return GLOBAL, THREAD end } diff --git a/multi/integration/luvitManager.lua b/multi/integration/luvitManager.lua index 3a092b8..0882775 100644 --- a/multi/integration/luvitManager.lua +++ b/multi/integration/luvitManager.lua @@ -21,68 +21,133 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] - -- I DEMAND USAGE FOR LUVIT -- Cannot use discordia without my multitasking library (Which I love more that the luvit platform... then again i'm partial :P) -package.path="?/init.lua;?.lua;"..package.path -local function _INIT(luvitThread,timer) +package.path = "?/init.lua;?.lua;" .. package.path +local function _INIT(luvitThread, timer) -- lots of this stuff should be able to stay the same function os.getOS() - if package.config:sub(1,1)=='\\' then - return 'windows' + if package.config:sub(1, 1) == "\\" then + return "windows" else - return 'unix' + return "unix" end end -- Step 1 get setup threads on luvit... Sigh how do i even... - local multi, thread = require("multi") - isMainThread=true + local multi, thread = require("multi").init() + isMainThread = true function multi:canSystemThread() return true end function multi:getPlatform() return "luvit" end - local multi=multi + local multi = multi -- Step 2 set up the Global table... is this possible? - local GLOBAL={} - setmetatable(GLOBAL,{ - __index=function(t,k) - --print("No Global table when using luvit integration!") - return nil - end, - __newindex=function(t,k,v) - --print("No Global table when using luvit integration!") - end, - }) - local THREAD={} - function THREAD.set(name,val) + local GLOBAL = {} + setmetatable( + GLOBAL, + { + __index = function(t, k) + --print("No Global table when using luvit integration!") + return nil + end, + __newindex = function(t, k, v) + --print("No Global table when using luvit integration!") + end + } + ) + local THREAD = {} + function THREAD.set(name, val) --print("No Global table when using luvit integration!") end function THREAD.get(name) --print("No Global table when using luvit integration!") end local function randomString(n) - local str = '' - local strings = {'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z','1','2','3','4','5','6','7','8','9','0','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'} - for i=1,n do - str = str..''..strings[math.random(1,#strings)] + local str = "" + local strings = { + "a", + "b", + "c", + "d", + "e", + "f", + "g", + "h", + "i", + "j", + "k", + "l", + "m", + "n", + "o", + "p", + "q", + "r", + "s", + "t", + "u", + "v", + "w", + "x", + "y", + "z", + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9", + "0", + "A", + "B", + "C", + "D", + "E", + "F", + "G", + "H", + "I", + "J", + "K", + "L", + "M", + "N", + "O", + "P", + "Q", + "R", + "S", + "T", + "U", + "V", + "W", + "X", + "Y", + "Z" + } + for i = 1, n do + str = str .. "" .. strings[math.random(1, #strings)] end return str end function THREAD.waitFor(name) --print("No Global table when using luvit integration!") end - function THREAD.testFor(name,val,sym) + function THREAD.testFor(name, val, sym) --print("No Global table when using luvit integration!") end function THREAD.getCores() return THREAD.__CORES end - if os.getOS()=="windows" then - THREAD.__CORES=tonumber(os.getenv("NUMBER_OF_PROCESSORS")) + if os.getOS() == "windows" then + THREAD.__CORES = tonumber(os.getenv("NUMBER_OF_PROCESSORS")) else - THREAD.__CORES=tonumber(io.popen("nproc --all"):read("*n")) + THREAD.__CORES = tonumber(io.popen("nproc --all"):read("*n")) end function THREAD.kill() -- trigger the thread destruction error("Thread was Killed!") @@ -95,34 +160,41 @@ local function _INIT(luvitThread,timer) --print("No Global table when using luvit integration!") end -- Step 5 Basic Threads! - local function entry(path,name,func,...) - local timer = require'timer' - local luvitThread = require'thread' - package.path=path + local function entry(path, name, func, ...) + local timer = require "timer" + local luvitThread = require "thread" + package.path = path loadstring(func)(...) end - function multi:newSystemThread(name,func,...) - local c={} - local __self=c - c.name=name - c.Type="sthread" - c.thread={} - c.func=string.dump(func) + function multi:newSystemThread(name, func, ...) + local c = {} + local __self = c + c.name = name + c.Type = "sthread" + c.thread = {} + c.func = string.dump(func) function c:kill() -- print("No Global table when using luvit integration!") end - luvitThread.start(entry,package.path,name,c.func,...) + luvitThread.start(entry, package.path, name, c.func, ...) return c end multi.print("Integrated Luvit!") - multi.integration={} -- for module creators - multi.integration.GLOBAL=GLOBAL - multi.integration.THREAD=THREAD + multi.integration = {} -- for module creators + multi.integration.GLOBAL = GLOBAL + multi.integration.THREAD = THREAD require("multi.integration.shared") -- Start the main mainloop... This allows you to process your multi objects, but the engine on the main thread will be limited to .001 or 1 millisecond sigh... - local interval = timer.setInterval(1, function () - multi:uManager() - end) + local interval = + timer.setInterval( + 1, + function() + multi:uManager() + end + ) return multi end -return {init=function(threadHandle,timerHandle) local multi = _INIT(threadHandle,timerHandle) return GLOBAL, THREAD end} +return {init = function(threadHandle, timerHandle) + local multi = _INIT(threadHandle, timerHandle) + return GLOBAL, THREAD + end} diff --git a/multi/integration/networkManager.lua b/multi/integration/networkManager.lua index 241612f..5d5786a 100644 --- a/multi/integration/networkManager.lua +++ b/multi/integration/networkManager.lua @@ -21,33 +21,33 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] -local multi, thread = require("multi") +local multi, thread = require("multi").init() local net = require("net") local bin = require("bin") bin.setBitsInterface(infinabits) -- the bits interface does not work so well, another bug to fix -- Commands that the master and node will respect, max of 256 commands -local CMD_ERROR = 0x00 -local CMD_PING = 0x01 -local CMD_PONG = 0x02 -local CMD_QUEUE = 0x03 -local CMD_TASK = 0x04 -local CMD_INITNODE = 0x05 -local CMD_INITMASTER = 0x06 -local CMD_GLOBAL = 0x07 -local CMD_LOAD = 0x08 -local CMD_CALL = 0x09 -local CMD_REG = 0x0A -local CMD_CONSOLE = 0x0B +local CMD_ERROR = 0x00 +local CMD_PING = 0x01 +local CMD_PONG = 0x02 +local CMD_QUEUE = 0x03 +local CMD_TASK = 0x04 +local CMD_INITNODE = 0x05 +local CMD_INITMASTER = 0x06 +local CMD_GLOBAL = 0x07 +local CMD_LOAD = 0x08 +local CMD_CALL = 0x09 +local CMD_REG = 0x0A +local CMD_CONSOLE = 0x0B local char = string.char local byte = string.byte -- Process to hold all of the networkManager's muilt objects -- Helper for piecing commands -local function pieceCommand(cmd,...) +local function pieceCommand(cmd, ...) local tab = {...} - table.insert(tab,1,cmd) + table.insert(tab, 1, cmd) return table.concat(tab) end @@ -56,17 +56,17 @@ local Queue = {} Queue.__index = Queue function Queue:newQueue() local c = {} - setmetatable(c,self) + setmetatable(c, self) return c end function Queue:push(data) - table.insert(self,data) + table.insert(self, data) end function Queue:raw_push(data) -- Internal usage only - table.insert(self,data) + table.insert(self, data) end function Queue:pop() - return table.remove(self,1) + return table.remove(self, 1) end function Queue:peek() return self[1] @@ -79,39 +79,39 @@ multi.OnGUpdate = multi:newConnection() local function packData(data) -- returns the data that was sterilized local dat = bin.new() - dat:addBlock(#type(data),1) + dat:addBlock(#type(data), 1) dat:addBlock(type(data)) -- The type is first - if type(data)=="table" then - dat:addBlock(data,nil,"t") + if type(data) == "table" then + dat:addBlock(data, nil, "t") elseif type(data) == "userdata" then error("Cannot sterilize userdata!") elseif type(data) == "number" then - dat:addBlock(data,nil,"d") + dat:addBlock(data, nil, "d") elseif type(data) == "string" then - dat:addBlock(#data,4) - dat:addBlock(data,nil,"s") + dat:addBlock(#data, 4) + dat:addBlock(data, nil, "s") elseif type(data) == "boolean" then - dat:addBlock(data,1) + dat:addBlock(data, 1) elseif type(data) == "function" then - dat:addBlock(data,nil,"f") + dat:addBlock(data, nil, "f") end return dat.data end local function resolveData(data) -- returns the data that was sterilized local dat = bin.new(data) - local tlen = dat:getBlock("n",1) - local tp = dat:getBlock("s",tlen) - if tp=="table" then + local tlen = dat:getBlock("n", 1) + local tp = dat:getBlock("s", tlen) + if tp == "table" then return dat:getBlock("t") - elseif tp=="number" then + elseif tp == "number" then return dat:getBlock("d") - elseif tp=="string" then - local num = dat:getBlock("n",4) - return dat:getBlock("s",num) - elseif tp=="boolean" then - return dat:getBlock("b",1) - elseif tp=="function" then + elseif tp == "string" then + local num = dat:getBlock("n", 4) + return dat:getBlock("s", num) + elseif tp == "boolean" then + return dat:getBlock("b", 1) + elseif tp == "function" then return dat:getBlock("f") end end @@ -119,16 +119,19 @@ end -- internal global system local GLOBAL = {} local PROXY = {} -setmetatable(GLOBAL,{ - __index = function(t,k) - return PROXY[k] - end, - __newindex = function(t,k,v) - local v = v - PROXY[k] = v - multi.OnGUpdate:Fire(k,packData(v)) - end -}) +setmetatable( + GLOBAL, + { + __index = function(t, k) + return PROXY[k] + end, + __newindex = function(t, k, v) + local v = v + PROXY[k] = v + multi.OnGUpdate:Fire(k, packData(v)) + end + } +) -- In case you are unable to use broadcasting this can be used to help connect to nodes function multi:nodeManager(port) @@ -140,39 +143,48 @@ function multi:nodeManager(port) server.timeouts = {} server.OnNodeAdded = multi:newConnection() server.OnNodeRemoved = multi:newConnection() - server.OnDataRecieved(function(server,data,cid,ip,port) - local cmd = data:sub(1,1) - if cmd == "R" then - multi:newThread("Node Client Manager",function(loop) - while true do - if server.timeouts[cid]==true then - server.OnNodeRemoved:Fire(server.nodes[cid]) - server.nodes[cid] = nil - server.timeouts[cid] = nil - thread.kill() - else - server.timeouts[cid] = true - server:send(cid,"ping") + server.OnDataRecieved( + function(server, data, cid, ip, port) + local cmd = data:sub(1, 1) + if cmd == "R" then + multi:newThread( + "Node Client Manager", + function(loop) + while true do + if server.timeouts[cid] == true then + server.OnNodeRemoved:Fire(server.nodes[cid]) + server.nodes[cid] = nil + server.timeouts[cid] = nil + thread.kill() + else + server.timeouts[cid] = true + server:send(cid, "ping") + end + thread.sleep(1) + end end - thread.sleep(1) + ) + server.nodes[cid] = data:sub(2, -1) + server.OnNodeAdded:Fire(server.nodes[cid]) + elseif cmd == "G" then + server.OnNodeAdded( + function(node) + server:send(cid, node) + end + ) + server.OnNodeRemoved( + function(node) + server:send(cid, "R" .. node:match("(.-)|")) + end + ) + for i, v in pairs(server.nodes) do + server:send(cid, v) end - end) - server.nodes[cid]=data:sub(2,-1) - server.OnNodeAdded:Fire(server.nodes[cid]) - elseif cmd == "G" then - server.OnNodeAdded(function(node) - server:send(cid,node) - end) - server.OnNodeRemoved(function(node) - server:send(cid,"R"..node:match("(.-)|")) - end) - for i,v in pairs(server.nodes) do - server:send(cid,v) + elseif cmd == "P" then + server.timeouts[cid] = nil end - elseif cmd == "P" then - server.timeouts[cid] = nil end - end) + ) end -- The main driving force of the network manager: Nodes function multi:newNode(settings) @@ -183,60 +195,69 @@ function multi:newNode(settings) local name = settings.name or multi.randomString(8) local node = {} node.name = name - multi.OnError(function(i,error) - node.OnError:Fire(node,error,node.server) - end) + multi.OnError( + function(i, error) + node.OnError:Fire(node, error, node.server) + end + ) node.server = net:newUDPServer(0) -- hosts the node using the default port _, node.port = node.server.udp:getsockname() node.connections = net.ClientCache node.queue = Queue:newQueue() - node.functions = bin.stream("RegisteredFunctions.dat",false) + node.functions = bin.stream("RegisteredFunctions.dat", false) node.hasFuncs = {} node.OnError = multi:newConnection() - node.OnError(function(node,err,master) - multi.print("ERROR",err,node.name) - local temp = bin.new() - temp:addBlock(#node.name,2) - temp:addBlock(node.name) - temp:addBlock(#err,2) - temp:addBlock(err) - for i,v in pairs(node.connections) do - multi.print(i) - v[1]:send(v[2],char(CMD_ERROR)..temp.data,v[3]) + node.OnError( + function(node, err, master) + multi.print("ERROR", err, node.name) + local temp = bin.new() + temp:addBlock(#node.name, 2) + temp:addBlock(node.name) + temp:addBlock(#err, 2) + temp:addBlock(err) + for i, v in pairs(node.connections) do + multi.print(i) + v[1]:send(v[2], char(CMD_ERROR) .. temp.data, v[3]) + end end - end) + ) if settings.managerDetails then - local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) + local c = net:newTCPClient(settings.managerDetails[1], settings.managerDetails[2]) if not c then - multi.print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false + multi.print("Cannot connect to the node manager! Ensuring broadcast is enabled!") + settings.noBroadCast = false else - c.OnDataRecieved(function(self,data) - if data == "ping" then - self:send("P") + c.OnDataRecieved( + function(self, data) + if data == "ping" then + self:send("P") + end end - end) - c:send("RNODE_"..name.."|"..net.getLocalIP().."|"..node.port) + ) + c:send("RNODE_" .. name .. "|" .. net.getLocalIP() .. "|" .. node.port) end end if not settings.preload then - if node.functions:getSize()~=0 then + if node.functions:getSize() ~= 0 then multi.print("We have function(s) to preload!") - local len = node.functions:getBlock("n",1) - local name,func + local len = node.functions:getBlock("n", 1) + local name, func while len do - name = node.functions:getBlock("s",len) - len = node.functions:getBlock("n",2) - func = node.functions:getBlock("s",len) + name = node.functions:getBlock("s", len) + len = node.functions:getBlock("n", 2) + func = node.functions:getBlock("s", len) len = node.functions:read(1) - _G[name]=resolveData(func) - node.hasFuncs[name]=true - if not len then break end + _G[name] = resolveData(func) + node.hasFuncs[name] = true + if not len then + break + end len = byte(len) end end end - function node:pushTo(name,data) - node:sendTo(name,char(CMD_QUEUE)..packData(data)) + function node:pushTo(name, data) + node:sendTo(name, char(CMD_QUEUE) .. packData(data)) end function node:peek() return node.queue:peek() @@ -248,89 +269,100 @@ function multi:newNode(settings) local c = {} local conn = node.connections function c.print(...) - local data = char(CMD_CONSOLE)..packData({...}) - for i,v in pairs(conn) do + local data = char(CMD_CONSOLE) .. packData({...}) + for i, v in pairs(conn) do --print(i) - v[1]:send(v[2],data,v[3]) + v[1]:send(v[2], data, v[3]) end end -- function c:printTo() - + -- end return c end - node.loadRate=1 + node.loadRate = 1 -- Lets tell the network we are alive! - node.server.OnDataRecieved(function(server,data,cid,ip,port) - local cmd = byte(data:sub(1,1)) -- the first byte is the command - local dat = data:sub(2,-1) -- the data that you want to read - if cmd == CMD_PING then - server:send(ip,char(CMD_PONG),port) - elseif cmd == CMD_QUEUE then - node.queue:push(resolveData(dat)) - elseif cmd == CMD_REG then - if not settings.allowRemoteRegistering then - multi.print(ip..": has attempted to register a function when it is currently not allowed!") - return + node.server.OnDataRecieved( + function(server, data, cid, ip, port) + local cmd = byte(data:sub(1, 1)) -- the first byte is the command + local dat = data:sub(2, -1) -- the data that you want to read + if cmd == CMD_PING then + server:send(ip, char(CMD_PONG), port) + elseif cmd == CMD_QUEUE then + node.queue:push(resolveData(dat)) + elseif cmd == CMD_REG then + if not settings.allowRemoteRegistering then + multi.print(ip .. ": has attempted to register a function when it is currently not allowed!") + return + end + local temp = bin.new(dat) + local len = temp:getBlock("n", 1) + local name = temp:getBlock("s", len) + if node.hasFuncs[name] then + multi.print("Function already preloaded onto the node!") + return + end + len = temp:getBlock("n", 2) + local func = temp:getBlock("s", len) + _G[name] = resolveData(func) + node.functions:addBlock(dat) + elseif cmd == CMD_CALL then + local temp = bin.new(dat) + local len = temp:getBlock("n", 1) + local name = temp:getBlock("s", len) + len = temp:getBlock("n", 4) + local args = temp:getBlock("s", len) + _G[name](unpack(resolveData(args))) + elseif cmd == CMD_TASK then + local holder = bin.new(dat) + local len = holder:getBlock("n", 4) + local args = holder:getBlock("s", len) + local len2 = holder:getBlock("n", 4) + local func = holder:getBlock("s", len2) + args = resolveData(args) + func = resolveData(func) + status, err = pcall(func, node, unpack(args)) + if not status then + node.OnError:Fire(node, err, server) + end + elseif cmd == CMD_INITNODE then + multi.print("Connected with another node!") + node.connections[dat] = {server, ip, port} + multi.OnGUpdate( + function(k, v) + server:send(ip, table.concat {char(CMD_GLOBAL), k, "|", v}, port) + end + ) + -- set this up + elseif cmd == CMD_INITMASTER then + multi.print("Connected to the master!", dat) + node.connections[dat] = {server, ip, port} + multi.OnGUpdate( + function(k, v) + server:send(ip, table.concat {char(CMD_GLOBAL), k, "|", v}, port) + end + ) + -- set this up + multi:newTLoop( + function() + server:send(ip, char(CMD_LOAD) .. node.name .. "|" .. multi:getLoad(), port) + end, + node.loadRate + ) + server:send(ip, char(CMD_LOAD) .. node.name .. "|" .. multi:getLoad(), port) + server:send(ip, char(CMD_INITNODE) .. node.name, port) + elseif cmd == CMD_GLOBAL then + local k, v = dat:match("(.-)|(.+)") + PROXY[k] = resolveData(v) end - local temp = bin.new(dat) - local len = temp:getBlock("n",1) - local name = temp:getBlock("s",len) - if node.hasFuncs[name] then - multi.print("Function already preloaded onto the node!") - return - end - len = temp:getBlock("n",2) - local func = temp:getBlock("s",len) - _G[name]=resolveData(func) - node.functions:addBlock(dat) - elseif cmd == CMD_CALL then - local temp = bin.new(dat) - local len = temp:getBlock("n",1) - local name = temp:getBlock("s",len) - len = temp:getBlock("n",4) - local args = temp:getBlock("s",len) - _G[name](unpack(resolveData(args))) - elseif cmd == CMD_TASK then - local holder = bin.new(dat) - local len = holder:getBlock("n",4) - local args = holder:getBlock("s",len) - local len2 = holder:getBlock("n",4) - local func = holder:getBlock("s",len2) - args = resolveData(args) - func = resolveData(func) - status, err = pcall(func,node,unpack(args)) - if not status then - node.OnError:Fire(node,err,server) - end - elseif cmd == CMD_INITNODE then - multi.print("Connected with another node!") - node.connections[dat]={server,ip,port} - multi.OnGUpdate(function(k,v) - server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port) - end)-- set this up - elseif cmd == CMD_INITMASTER then - multi.print("Connected to the master!",dat) - node.connections[dat]={server,ip,port} - multi.OnGUpdate(function(k,v) - server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port) - end)-- set this up - multi:newTLoop(function() - server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port) - end,node.loadRate) - server:send(ip,char(CMD_LOAD)..node.name.."|"..multi:getLoad(),port) - server:send(ip,char(CMD_INITNODE)..node.name,port) - elseif cmd == CMD_GLOBAL then - local k,v = dat:match("(.-)|(.+)") - PROXY[k]=resolveData(v) end - end) - function node:sendTo(name,data) + ) + function node:sendTo(name, data) local conn = node.connections[name] - conn[1]:send(conn[2],data,conn[3]) + conn[1]:send(conn[2], data, conn[3]) end if not settings.noBroadCast then - node.server:broadcast("NODE_"..name) + node.server:broadcast("NODE_" .. name) end return node end @@ -350,69 +382,76 @@ function multi:newMaster(settings) -- You will be able to have more than one mas master.connections = net.ClientCache -- Link to the client cache that is created on the net interface master.loads = {} master.timeouts = {} - master.trigger = multi:newFunction(function(self,node) - master.OnFirstNodeConnected:Fire(node) - self:Pause() - end) + master.trigger = + multi:newFunction( + function(self, node) + master.OnFirstNodeConnected:Fire(node) + self:Pause() + end + ) if settings.managerDetails then - local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) + local client = net:newTCPClient(settings.managerDetails[1], settings.managerDetails[2]) if not client then - multi.print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false + multi.print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!") + settings.noBroadCast = false else - client.OnDataRecieved(function(client,data) - local cmd = data:sub(1,1) - if cmd == "N" then - print(data) - local name,ip,port = data:match("(.-)|(.-)|(.+)") - local c = net:newUDPClient(ip,port) - net.OnCastedClientInfo:Fire(c,name,ip,port)master.connections[name]=c - elseif cmd == "R" then - local name = data:sub(2,-1) - master.connections[name]=nil + client.OnDataRecieved( + function(client, data) + local cmd = data:sub(1, 1) + if cmd == "N" then + print(data) + local name, ip, port = data:match("(.-)|(.-)|(.+)") + local c = net:newUDPClient(ip, port) + net.OnCastedClientInfo:Fire(c, name, ip, port) + master.connections[name] = c + elseif cmd == "R" then + local name = data:sub(2, -1) + master.connections[name] = nil + end end - end) + ) client:send("G") -- init your connection as a master end end function master:doToAll(func) - for i,v in pairs(master.connections) do - func(i,v) + for i, v in pairs(master.connections) do + func(i, v) end end - function master:register(name,node,func) + function master:register(name, node, func) if not node then error("You must specify a node to execute a command on!") end local temp = bin.new() local fData = packData(func) - temp:addBlock(CMD_REG,1) - temp:addBlock(#name,1) - temp:addBlock(name,#name) - temp:addBlock(#fData,2) - temp:addBlock(fData,#fData) - master:sendTo(node,temp.data) + temp:addBlock(CMD_REG, 1) + temp:addBlock(#name, 1) + temp:addBlock(name, #name) + temp:addBlock(#fData, 2) + temp:addBlock(fData, #fData) + master:sendTo(node, temp.data) end - function master:execute(name,node,...) + function master:execute(name, node, ...) if not node then error("You must specify a node to execute a command on!") end if not name then error("You must specify a function name to call on the node!") end - local args = packData{...} + local args = packData {...} local name = name local node = node local temp, len, data temp = bin.new() - temp:addBlock(CMD_CALL,1) - temp:addBlock(#name,1) - temp:addBlock(name,#name) - temp:addBlock(#args,4) - temp:addBlock(args,#args) - master:sendTo(node,temp.data) + temp:addBlock(CMD_CALL, 1) + temp:addBlock(#name, 1) + temp:addBlock(name, #name) + temp:addBlock(#args, 4) + temp:addBlock(args, #args) + master:sendTo(node, temp.data) end - function master:pushTo(name,data) - master:sendTo(name,char(CMD_QUEUE)..packData(data)) + function master:pushTo(name, data) + master:sendTo(name, char(CMD_QUEUE) .. packData(data)) end function master:peek() return self.queue:peek() @@ -420,52 +459,68 @@ function multi:newMaster(settings) -- You will be able to have more than one mas function master:pop() return self.queue:pop() end - function master:newNetworkThread(tname,func,name,...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job + function master:newNetworkThread(tname, func, name, ...) -- If name specified then it will be sent to the specified node! Otherwise the least worked node will get the job local fData = packData(func) local tab = {...} local aData = "" - if #tab~=o then - aData = (packData{...}) + if #tab ~= o then + aData = (packData {...}) else - aData = (packData{"NO","ARGS"}) + aData = (packData {"NO", "ARGS"}) end local temp = bin.new() - temp:addBlock(#aData,4) + temp:addBlock(#aData, 4) local len = temp.data local temp2 = bin.new() - temp2:addBlock(#fData,4) + temp2:addBlock(#fData, 4) local len2 = temp2.data if not name then local name = self:getFreeNode() if not name then name = self:getRandomNode() end - if name==nil then - multi:newEvent(function() return name~=nil end):OnEvent(function(evnt) - self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData) - evnt:Destroy() - end):SetName("DelayedSendTask"):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(function(self) - self:Destroy() - end) + if name == nil then + multi:newEvent( + function() + return name ~= nil + end + ):OnEvent( + function(evnt) + self:sendTo(name, char(CMD_TASK) .. len .. aData .. len2 .. fData) + evnt:Destroy() + end + ):SetName("DelayedSendTask"):SetName("DelayedSendTask"):SetTime(8):OnTimedOut( + function(self) + self:Destroy() + end + ) else - self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData) + self:sendTo(name, char(CMD_TASK) .. len .. aData .. len2 .. fData) end else - local name = "NODE_"..name - self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData) + local name = "NODE_" .. name + self:sendTo(name, char(CMD_TASK) .. len .. aData .. len2 .. fData) end end - function master:sendTo(name,data) - if name:sub(1,5)~="NODE_" then - name = "NODE_"..name + function master:sendTo(name, data) + if name:sub(1, 5) ~= "NODE_" then + name = "NODE_" .. name end - if self.connections[name]==nil then - multi:newEvent(function() return self.connections[name]~=nil end):OnEvent(function(evnt) - self.connections[name]:send(data) - evnt:Destroy() - end):SetName("DelayedSendTask"):SetTime(8):OnTimedOut(function(self) - self:Destroy() - end) + if self.connections[name] == nil then + multi:newEvent( + function() + return self.connections[name] ~= nil + end + ):OnEvent( + function(evnt) + self.connections[name]:send(data) + evnt:Destroy() + end + ):SetName("DelayedSendTask"):SetTime(8):OnTimedOut( + function(self) + self:Destroy() + end + ) else self.connections[name]:send(data) end @@ -474,8 +529,8 @@ function multi:newMaster(settings) -- You will be able to have more than one mas local count = 0 local min = math.huge local refO - for i,v in pairs(master.loads) do - if v= 15 then - c.idle = nil - end - thread.skip() + multi.OnSystemThreadDied( + function(ThreadID) + for i = 1, #syncs do + connSync:push {"SYNCR", syncs[i], ThreadID} end - data = connSync:peek() - if data~= nil and data[1]=="INIT" then - connSync:pop() - c.idle = clock() - table.insert(syncs,data[2]) - for i=1,#syncs do - connSync:push{"SYNCA",syncs[i],data[2]} + cleanUp[ThreadID] = true + end + ) + multi:newThread( + c.name .. " Connection-Handler", + function() + local data + local clock = os.clock + local syncs = {} + while true do + if not c.idle then + thread.sleep(.5) + else + if clock() - c.idle >= 15 then + c.idle = nil + end + thread.skip() + end + data = connSync:peek() + if data ~= nil and data[1] == "INIT" then + connSync:pop() + c.idle = clock() + table.insert(syncs, data[2]) + for i = 1, #syncs do + connSync:push {"SYNCA", syncs[i], data[2]} + end + end + data = connFire:peek() + if data ~= nil and cleanUp[data[1]] then + local meh = data[1] + connFire:pop() -- lets remove dead thread stuff + multi:newAlarm(15):OnRing( + function(a) + cleanUp[meh] = nil + end + ) end end - data = connFire:peek() - if data~=nil and cleanUp[data[1]] then - local meh = data[1] - connFire:pop() -- lets remove dead thread stuff - multi:newAlarm(15):OnRing(function(a) - cleanUp[meh] = nil - end) - end end - end) - GLOBAL[c.name]=c + ) + GLOBAL[c.name] = c return c end function multi:SystemThreadedBenchmark(n) - n=n or 1 - local cores=multi.integration.THREAD.getCores() - local queue=multi:newSystemThreadedQueue("THREAD_BENCH_QUEUE"):init() - local sThread=multi.integration.THREAD - local GLOBAL=multi.integration.GLOBAL + n = n or 1 + local cores = multi.integration.THREAD.getCores() + local queue = multi:newSystemThreadedQueue("THREAD_BENCH_QUEUE"):init() + local sThread = multi.integration.THREAD + local GLOBAL = multi.integration.GLOBAL local c = {} - for i=1,cores do - multi:newSystemThread("STHREAD_BENCH",function(n) - local multi = require("multi") - if multi:getPlatform()=="love2d" then - GLOBAL=_G.GLOBAL - sThread=_G.sThread - end -- we cannot have upvalues... in love2d globals, not locals must be used - queue=sThread.waitFor("THREAD_BENCH_QUEUE"):init() -- always wait for when looking for a variable at the start of the thread! - multi:benchMark(n):OnBench(function(self,count) - queue:push(count) - sThread.kill() - error("Thread was killed!") - end) - multi:mainloop() - end,n) + for i = 1, cores do + multi:newSystemThread( + "STHREAD_BENCH", + function(n) + local multi = require("multi") + if multi:getPlatform() == "love2d" then + GLOBAL = _G.GLOBAL + sThread = _G.sThread + end -- we cannot have upvalues... in love2d globals, not locals must be used + queue = sThread.waitFor("THREAD_BENCH_QUEUE"):init() -- always wait for when looking for a variable at the start of the thread! + multi:benchMark(n):OnBench( + function(self, count) + queue:push(count) + sThread.kill() + error("Thread was killed!") + end + ) + multi:mainloop() + end, + n + ) end - multi:newThread("THREAD_BENCH",function() - local count = 0 - local cc = 0 - while true do - thread.skip(1) - local dat = queue:pop() - if dat then - cc=cc+1 - count = count + dat - if cc == cores then - c.OnBench:Fire(count) - thread.kill() + multi:newThread( + "THREAD_BENCH", + function() + local count = 0 + local cc = 0 + while true do + thread.skip(1) + local dat = queue:pop() + if dat then + cc = cc + 1 + count = count + dat + if cc == cores then + c.OnBench:Fire(count) + thread.kill() + end end end end - end) + ) c.OnBench = multi:newConnection() return c end function multi:newSystemThreadedConsole(name) - local c={} + local c = {} c.name = name - local sThread=multi.integration.THREAD - local GLOBAL=multi.integration.GLOBAL + local sThread = multi.integration.THREAD + local GLOBAL = multi.integration.GLOBAL function c:init() _G.__Needs_Multi = true local multi = require("multi") - if multi:getPlatform()=="love2d" then - GLOBAL=_G.GLOBAL - sThread=_G.sThread + if multi:getPlatform() == "love2d" then + GLOBAL = _G.GLOBAL + sThread = _G.sThread end - local cc={} + local cc = {} if multi.isMainThread then if GLOBAL["__SYSTEM_CONSOLE__"] then cc.stream = sThread.waitFor("__SYSTEM_CONSOLE__"):init() else cc.stream = multi:newSystemThreadedQueue("__SYSTEM_CONSOLE__"):init() - multi:newLoop(function() - local data = cc.stream:pop() - if data then - local dat = table.remove(data,1) - if dat=="w" then - io.write(unpack(data)) - elseif dat=="p" then - print(unpack(data)) + multi:newLoop( + function() + local data = cc.stream:pop() + if data then + local dat = table.remove(data, 1) + if dat == "w" then + io.write(unpack(data)) + elseif dat == "p" then + print(unpack(data)) + end end end - end):setName("ST.consoleSyncer") + ):setName("ST.consoleSyncer") end else cc.stream = sThread.waitFor("__SYSTEM_CONSOLE__"):init() end function cc:write(msg) - self.stream:push({"w",tostring(msg)}) + self.stream:push({"w", tostring(msg)}) end function cc:print(...) local tab = {...} - for i=1,#tab do - tab[i]=tostring(tab[i]) + for i = 1, #tab do + tab[i] = tostring(tab[i]) end - self.stream:push({"p",unpack(tab)}) + self.stream:push({"p", unpack(tab)}) end return cc end - GLOBAL[c.name]=c + GLOBAL[c.name] = c return c end -- NEEDS WORK function multi:newSystemThreadedTable(name) - local c={} - c.name=name -- set the name this is important for identifying what is what - local sThread=multi.integration.THREAD - local GLOBAL=multi.integration.GLOBAL + local c = {} + c.name = name -- set the name this is important for identifying what is what + local sThread = multi.integration.THREAD + local GLOBAL = multi.integration.GLOBAL function c:init() -- create an init function so we can mimic on both love2d and lanes _G.__Needs_Multi = true local multi = require("multi") - if multi:getPlatform()=="love2d" then - GLOBAL=_G.GLOBAL - sThread=_G.sThread + if multi:getPlatform() == "love2d" then + GLOBAL = _G.GLOBAL + sThread = _G.sThread end - local cc={} - cc.tab={} + local cc = {} + cc.tab = {} if multi.isMainThread then - if not GLOBAL[self.name.."_Tabled_Connection"] then - cc.conn = multi:newSystemThreadedConnection(self.name.."_Tabled_Connection"):init() + if not GLOBAL[self.name .. "_Tabled_Connection"] then + cc.conn = multi:newSystemThreadedConnection(self.name .. "_Tabled_Connection"):init() end else - cc.conn = sThread.waitFor(self.name.."_Tabled_Connection"):init() + cc.conn = sThread.waitFor(self.name .. "_Tabled_Connection"):init() end function cc:waitFor(name) - repeat multi:uManager() until tab[name]~=nil + repeat + multi:uManager() + until tab[name] ~= nil return tab[name] end local link = cc - cc.conn(function(k,v) - link.tab[k]=v - end) - setmetatable(cc,{ - __index=function(t,k) - return t.tab[k] - end, - __newindex=function(t,k,v) - t.tab[k]=v - t.conn:Fire(k,v) + cc.conn( + function(k, v) + link.tab[k] = v end - }) + ) + setmetatable( + cc, + { + __index = function(t, k) + return t.tab[k] + end, + __newindex = function(t, k, v) + t.tab[k] = v + t.conn:Fire(k, v) + end + } + ) return cc end - GLOBAL[c.name]=c + GLOBAL[c.name] = c return c end local jobqueuecount = 0 local jqueues = {} -function multi:newSystemThreadedJobQueue(a,b) - jobqueuecount=jobqueuecount+1 - local GLOBAL=multi.integration.GLOBAL - local sThread=multi.integration.THREAD +function multi:newSystemThreadedJobQueue(a, b) + jobqueuecount = jobqueuecount + 1 + local GLOBAL = multi.integration.GLOBAL + local sThread = multi.integration.THREAD local c = {} c.numberofcores = 4 c.idle = nil - c.name = "SYSTEM_THREADED_JOBQUEUE_"..jobqueuecount + c.name = "SYSTEM_THREADED_JOBQUEUE_" .. jobqueuecount -- This is done to keep backwards compatibility for older code - if type(a)=="string" and not(b) then + if type(a) == "string" and not (b) then c.name = a - elseif type(a)=="number" and not (b) then + elseif type(a) == "number" and not (b) then c.numberofcores = a - elseif type(a)=="string" and type(b)=="number" then + elseif type(a) == "string" and type(b) == "number" then c.name = a c.numberofcores = b - elseif type(a)=="number" and type(b)=="string" then + elseif type(a) == "number" and type(b) == "string" then c.name = b c.numberofcores = a end if jqueues[c.name] then - error("A job queue by the name: "..c.name.." already exists!") + error("A job queue by the name: " .. c.name .. " already exists!") end jqueues[c.name] = true c.isReady = false - c.jobnum=1 + c.jobnum = 1 c.OnJobCompleted = multi:newConnection() - local queueIN = self:newSystemThreadedQueue("QUEUE_IN_"..c.name):init() - local queueCC = self:newSystemThreadedQueue("QUEUE_CC_"..c.name):init() - local queueREG = self:newSystemThreadedQueue("QUEUE_REG_"..c.name):init() - local queueJD = self:newSystemThreadedQueue("QUEUE_JD_"..c.name):init() - local queueDA = self:newSystemThreadedQueue("QUEUE_DA_"..c.name):init() + local queueIN = self:newSystemThreadedQueue("QUEUE_IN_" .. c.name):init() + local queueCC = self:newSystemThreadedQueue("QUEUE_CC_" .. c.name):init() + local queueREG = self:newSystemThreadedQueue("QUEUE_REG_" .. c.name):init() + local queueJD = self:newSystemThreadedQueue("QUEUE_JD_" .. c.name):init() + local queueDA = self:newSystemThreadedQueue("QUEUE_DA_" .. c.name):init() c.OnReady = multi:newConnection() - function c:registerJob(name,func) + function c:registerJob(name, func) for i = 1, self.numberofcores do - queueREG:push({name,func}) + queueREG:push({name, func}) end end c.tempQueue = {} - function c:pushJob(name,...) + function c:pushJob(name, ...) c.idle = os.clock() if not self.isReady then - table.insert(c.tempQueue,{self.jobnum,name,...}) - self.jobnum=self.jobnum+1 - return self.jobnum-1 + table.insert(c.tempQueue, {self.jobnum, name, ...}) + self.jobnum = self.jobnum + 1 + return self.jobnum - 1 else - queueIN:push{self.jobnum,name,...} - self.jobnum=self.jobnum+1 - return self.jobnum-1 + queueIN:push {self.jobnum, name, ...} + self.jobnum = self.jobnum + 1 + return self.jobnum - 1 end end function c:doToAll(func) local r = multi.randomString(12) for i = 1, self.numberofcores do - queueDA:push{r,func} + queueDA:push {r, func} end end - for i=1,c.numberofcores do - multi:newSystemThread(c.name.." Worker Thread #"..i,function(name) - local multi = require("multi") - if love then -- lets make sure we don't reference up-values if using love2d - GLOBAL=_G.GLOBAL - sThread=_G.sThread - end - local CC = sThread.waitFor("QUEUE_CC_"..name):init() - CC:push("ready") - local FUNCS={} - local ids = {} - local JQI = sThread.waitFor("QUEUE_IN_"..name):init() - local JD = sThread.waitFor("QUEUE_JD_"..name):init() - local REG = sThread.waitFor("QUEUE_REG_"..name):init() - local DA = sThread.waitFor("QUEUE_DA_"..name):init() - local lastjob = os.clock() - multi:newLoop(function() - local job=JQI:pop() - local rd=REG:peek() - local da=DA:peek() - if rd then - if not FUNCS[rd[1]] then - FUNCS[rd[1]]=rd[2] - rd=nil - REG:pop() + for i = 1, c.numberofcores do + multi:newSystemThread( + c.name .. " Worker Thread #" .. i, + function(name) + local multi = require("multi") + if love then -- lets make sure we don't reference up-values if using love2d + GLOBAL = _G.GLOBAL + sThread = _G.sThread + end + local CC = sThread.waitFor("QUEUE_CC_" .. name):init() + CC:push("ready") + local FUNCS = {} + local ids = {} + local JQI = sThread.waitFor("QUEUE_IN_" .. name):init() + local JD = sThread.waitFor("QUEUE_JD_" .. name):init() + local REG = sThread.waitFor("QUEUE_REG_" .. name):init() + local DA = sThread.waitFor("QUEUE_DA_" .. name):init() + local lastjob = os.clock() + multi:newLoop( + function() + local job = JQI:pop() + local rd = REG:peek() + local da = DA:peek() + if rd then + if not FUNCS[rd[1]] then + FUNCS[rd[1]] = rd[2] + rd = nil + REG:pop() + end + end + if da then + if not ids[da[1]] then + local meh = da[1] + ids[da[1]] = true + da[2](multi) + da = nil + DA:pop() + multi:newAlarm(60):OnRing( + function(a) + ids[meh] = nil + a:Destroy() + end + ) + end + end + if job then + lastjob = os.clock() + local ID = table.remove(job, 1) -- return and remove + local _name = table.remove(job, 1) -- return and remove + if FUNCS[_name] then + JD:push({ID, FUNCS[_name](unpack(job))}) + else -- making use of that new holding feature + JD:push({ID, FUNCS:waitFor(_name)(unpack(job))}) + end + end end - end - if da then - if not ids[da[1]] then - local meh = da[1] - ids[da[1]]=true - da[2](multi) - da=nil - DA:pop() - multi:newAlarm(60):OnRing(function(a) - ids[meh] = nil - a:Destroy() - end) + ) + multi:newLoop( + function() + if os.clock() - lastjob > 1 then + sThread.sleep(.1) + end end + ) + setmetatable( + _G, + { + __index = function(t, k) + return FUNCS[k] + end + } + ) + if not love then + multi:mainloop() end - if job then - lastjob = os.clock() - local ID=table.remove(job,1) -- return and remove - local _name=table.remove(job,1) -- return and remove - if FUNCS[_name] then - JD:push({ID,FUNCS[_name](unpack(job))}) - else -- making use of that new holding feature - JD:push({ID,FUNCS:waitFor(_name)(unpack(job))}) - end - end - end) - multi:newLoop(function() - if os.clock()-lastjob>1 then - sThread.sleep(.1) - end - end) - setmetatable(_G,{ - __index=function(t,k) - return FUNCS[k] - end - }) - if not love then - multi:mainloop() - end - end,c.name) + end, + c.name + ) end local clock = os.clock - multi:newThread("JQ-"..c.name.." Manager",function() - local _count = 0 - while _count= 15 then - c.idle = nil - end + multi:newThread( + "JQ-" .. c.name .. " Manager", + function() + local _count = 0 + while _count < c.numberofcores do thread.skip() + if queueCC:pop() then + _count = _count + 1 + end end - dat = queueJD:pop() - if dat then - c.idle = clock() - c.OnJobCompleted:Fire(unpack(dat)) + c.isReady = true + for i = 1, #c.tempQueue do + queueIN:push(c.tempQueue[i]) + end + c.tempQueue = nil + c.OnReady:Fire(c) + local dat + while true do + if not c.idle then + thread.sleep(.5) + else + if clock() - c.idle >= 15 then + c.idle = nil + end + thread.skip() + end + dat = queueJD:pop() + if dat then + c.idle = clock() + c.OnJobCompleted:Fire(unpack(dat)) + end end end - end) + ) return c end function multi:newSystemThreadedExecute(cmd) - local c={} - local GLOBAL=multi.integration.GLOBAL -- set up locals incase we are using lanes - local sThread=multi.integration.THREAD -- set up locals incase we are using lanes - local name="Execute_Thread"..multi.randomString(16) - c.name=name - GLOBAL[name.."CMD"]=cmd - multi:newSystemThread(name,function() - if love then -- lets make sure we don't reference upvalues if using love2d - GLOBAL=_G.GLOBAL - sThread=_G.sThread - name=__THREADNAME__ -- global data same as the name we used in this functions creation - end -- Lanes should take the local upvalues ^^^ - cmd=sThread.waitFor(name.."CMD") - local ret=os.execute(cmd) - GLOBAL[name.."R"]=ret - end) - c.OnCMDFinished=multi:newConnection() - c.looper=multi:newLoop(function(self) - local ret=GLOBAL[self.link.name.."R"] - if ret then - self.link.OnCMDFinished:Fire(ret) - self:Destroy() + local c = {} + local GLOBAL = multi.integration.GLOBAL -- set up locals incase we are using lanes + local sThread = multi.integration.THREAD -- set up locals incase we are using lanes + local name = "Execute_Thread" .. multi.randomString(16) + c.name = name + GLOBAL[name .. "CMD"] = cmd + multi:newSystemThread( + name, + function() + if love then -- lets make sure we don't reference upvalues if using love2d + GLOBAL = _G.GLOBAL + sThread = _G.sThread + name = __THREADNAME__ -- global data same as the name we used in this functions creation + end -- Lanes should take the local upvalues ^^^ + cmd = sThread.waitFor(name .. "CMD") + local ret = os.execute(cmd) + GLOBAL[name .. "R"] = ret end - end) - c.looper.link=c + ) + c.OnCMDFinished = multi:newConnection() + c.looper = + multi:newLoop( + function(self) + local ret = GLOBAL[self.link.name .. "R"] + if ret then + self.link.OnCMDFinished:Fire(ret) + self:Destroy() + end + end + ) + c.looper.link = c return c end