ST-Connections Fixed

Fixed system threaded connections
Also added some features to monitor threads
Each thread now has its own ID even the main thread which has an id of 0!
This commit is contained in:
Ryan Ward 2019-02-08 22:19:13 -05:00
parent 8f0f404c36
commit 4272397678
8 changed files with 296 additions and 146 deletions

View File

@ -50,7 +50,7 @@ These didn't have much use in their previous form, but with the addition of hype
Fixed: Fixed:
- There were some bugs in the networkmanager.lua file. Desrtoy -> Destroy some misspellings. - There were some bugs in the networkmanager.lua file. Desrtoy -> Destroy some misspellings.
- Massive object management bugs which caused performance to drop like a rock. Remember to Destroy objects when no longer using them. I should probably start working on a garbage collector for these objects! - Massive object management bugs which caused performance to drop like a rock.
- Found a bug with processors not having the Destroy() function implemented properly. - Found a bug with processors not having the Destroy() function implemented properly.
- Found an issue with the rockspec which is due to the networkManager additon. The net Library and the multi Library are now codependent if using that feature. Going forward you will have to now install the network library separately - Found an issue with the rockspec which is due to the networkManager additon. The net Library and the multi Library are now codependent if using that feature. Going forward you will have to now install the network library separately
- Insane proformance bug found in the networkManager file, where each connection to a node created a new thread (VERY BAD) If say you connected to 100s of threads, you would lose a lot of processing power due to a bad implementation of this feature. But it goes futhur than this, the net library also creates a new thread for each connection made, so times that initial 100 by about 3, you end up with a system that quickly eats itself. I have to do tons of rewriting of everything. Yet another setback for the 13.0.0 release - Insane proformance bug found in the networkManager file, where each connection to a node created a new thread (VERY BAD) If say you connected to 100s of threads, you would lose a lot of processing power due to a bad implementation of this feature. But it goes futhur than this, the net library also creates a new thread for each connection made, so times that initial 100 by about 3, you end up with a system that quickly eats itself. I have to do tons of rewriting of everything. Yet another setback for the 13.0.0 release
@ -58,16 +58,21 @@ Fixed:
- Fixed an issue with processors not properly destroying objects within them and not being destroyable themselves - Fixed an issue with processors not properly destroying objects within them and not being destroyable themselves
- Fixed a bug where pause and resume would duplicate objects! Not good - Fixed a bug where pause and resume would duplicate objects! Not good
- Noticed that the switching of lua states, corutine based threading, is slow when done often. Code your threads to have an idler of .5 seconds between sleep states. After doing this to a few built in threads I've seen a nice drop in performance. 68%-100% to 0%-40% when using the jobqueue. If you dont need the hold feature then use a multi object! Sleeping can be done in a multi object using timers and alarms. Though if your aim is speed timers are a bit faster than alarms, if you really want to boost speed then local clock = os.clock and use the clock function to do your timings yourself - Noticed that the switching of lua states, corutine based threading, is slow when done often. Code your threads to have an idler of .5 seconds between sleep states. After doing this to a few built in threads I've seen a nice drop in performance. 68%-100% to 0%-40% when using the jobqueue. If you dont need the hold feature then use a multi object! Sleeping can be done in a multi object using timers and alarms. Though if your aim is speed timers are a bit faster than alarms, if you really want to boost speed then local clock = os.clock and use the clock function to do your timings yourself
- multi:newSystemThreadedConnection(name,protect) -- I did it! It works and I believe all the gotchas are fixed as well.
-- Issue one, if a thread died that was connected to that connection all connections would stop since the queue would get clogged! FIXED
-- There is one thing, the connection does have some handshakes that need to be done before it functions as normal!
Added: Added:
- Documentation, the purpose of 13.0.0, orginally going to be 12.2.3, but due to the amount of bugs and features I added couldn't become that. I actually still did my tests in the 12.2.3 branch in github. - Documentation, the purpose of 13.0.0, orginally going to be 12.2.3, but due to the amount of bugs and features added it couldn't be a simple bug fix update.
- multi:newHyperThreadedProcess(STRING name) -- This is a version of the threaded process that gives each object created its own coroutine based thread which means you can use thread.* without affecting other objects created within the hyper threaded processes. Though, creating a self contained single thread is a better idea which when I eventually create the wiki page I'll discuss - multi:newHyperThreadedProcess(STRING name) -- This is a version of the threaded process that gives each object created its own coroutine based thread which means you can use thread.* without affecting other objects created within the hyper threaded processes. Though, creating a self contained single thread is a better idea which when I eventually create the wiki page I'll discuss
- multi:newConnector() -- A simple object that allows you to use the new connection Fire syntax without using a multi obj or the standard object format that I follow. - multi:newConnector() -- A simple object that allows you to use the new connection Fire syntax without using a multi obj or the standard object format that I follow.
- multi:purge() -- Removes all references to objects that are contained withing the processes list of tasks to do. Doing this will stop all objects from functioning. Calling Resume on an object should make it work again. - multi:purge() -- Removes all references to objects that are contained withing the processes list of tasks to do. Doing this will stop all objects from functioning. Calling Resume on an object should make it work again.
- multi:getTasksDetails(STRING format) -- Simple function, will get massive updates in the future, as of right now It will print out the current processes that are running; listing their type, uptime, and priority. More useful additions will be added in due time. Format can be either a string "s" or "t" see below for the table format - multi:getTasksDetails(STRING format) -- Simple function, will get massive updates in the future, as of right now It will print out the current processes that are running; listing their type, uptime, and priority. More useful additions will be added in due time. Format can be either a string "s" or "t" see below for the table format
- multi:endTask(TID) -- Use multi:getTasksDetails("t") to get the tid of a task - multi:endTask(TID) -- Use multi:getTasksDetails("t") to get the tid of a task
- multi:enableLoadDetection() -- Reworked how load detection works. It gives better values now, but it still needs some work before I am happy with it - multi:enableLoadDetection() -- Reworked how load detection works. It gives better values now, but it still needs some work before I am happy with it
- THREAD.getID() -- returns a unique ID for the current thread. This varaiable is visible to the main thread as well by accessing it through the returned thread object. OBJ.Id Do not confuse this with thread.* this refers to the system threading interface - THREAD.getID() -- returns a unique ID for the current thread. This varaiable is visible to the main thread as well by accessing it through the returned thread object. OBJ.Id Do not confuse this with thread.* this refers to the system threading interface. Each thread, including the main thread has a threadID the main thread has an ID of 0!
- multi.print(...) works like normal print, but only prints if the setting print is set to true
- setting: `print` enables multi.print() to work
```lua ```lua
package.path="?/init.lua;?.lua;"..package.path package.path="?/init.lua;?.lua;"..package.path

View File

@ -110,18 +110,6 @@ function table.merge(t1, t2)
end end
return t1 return t1
end end
_print=print
function print(...)
if not __SUPPRESSPRINTS then
_print(...)
end
end
_write=io.write
function io.write(...)
if not __SUPPRESSWRITES then
_write(...)
end
end
function multi:setThrestimed(n) function multi:setThrestimed(n)
self.deltaTarget=n or .1 self.deltaTarget=n or .1
end end
@ -251,7 +239,7 @@ function multi.executeFunction(name,...)
if type(_G[name])=='function' then if type(_G[name])=='function' then
_G[name](...) _G[name](...)
else else
print('Error: Not a function') multi.print('Error: Not a function')
end end
end end
function multi:getChildren() function multi:getChildren()
@ -283,7 +271,7 @@ function multi:benchMark(sec,p,pt)
local temp=self:newLoop(function(self,t) local temp=self:newLoop(function(self,t)
if t>sec then if t>sec then
if pt then if pt then
print(pt.." "..c.." Steps in "..sec.." second(s)!") multi.print(pt.." "..c.." Steps in "..sec.." second(s)!")
end end
self.tt(sec,c) self.tt(sec,c)
self:Destroy() self:Destroy()
@ -458,7 +446,7 @@ function multi:connectFinal(func)
elseif self.Type=='step' or self.Type=='tstep' then elseif self.Type=='step' or self.Type=='tstep' then
self:OnEnd(func) self:OnEnd(func)
else else
print("Warning!!! "..self.Type.." doesn't contain a Final Connection State! Use "..self.Type..":Break(func) to trigger it's final event!") multi.print("Warning!!! "..self.Type.." doesn't contain a Final Connection State! Use "..self.Type..":Break(func) to trigger it's final event!")
self:OnBreak(func) self:OnBreak(func)
end end
end end
@ -528,7 +516,7 @@ end
-- Timer stuff done -- Timer stuff done
function multi:Pause() function multi:Pause()
if self.Type=='mainprocess' then if self.Type=='mainprocess' then
print("You cannot pause the main process. Doing so will stop all methods and freeze your program! However if you still want to use multi:_Pause()") multi.print("You cannot pause the main process. Doing so will stop all methods and freeze your program! However if you still want to use multi:_Pause()")
else else
self.Active=false self.Active=false
local loop = self.Parent.Mainloop local loop = self.Parent.Mainloop
@ -795,7 +783,7 @@ function multi:newConnection(protect,func)
table.remove(temp,1) table.remove(temp,1)
table.insert(ret,temp) table.insert(ret,temp)
else else
print(temp[2]) multi.print(temp[2])
end end
else else
table.insert(ret,{self.func[i][1](...)}) table.insert(ret,{self.func[i][1](...)})
@ -1395,7 +1383,7 @@ end
function multi:newWatcher(namespace,name) function multi:newWatcher(namespace,name)
local function WatcherObj(ns,n) local function WatcherObj(ns,n)
if self.Type=='queue' then if self.Type=='queue' then
print("Cannot create a watcher on a queue! Creating on 'multi' instead!") multi.print("Cannot create a watcher on a queue! Creating on 'multi' instead!")
self=multi self=multi
end end
local c=self:newBase() local c=self:newBase()
@ -1423,7 +1411,7 @@ function multi:newWatcher(namespace,name)
elseif type(namespace)=='table' and (type(name)=='string' or 'number') then elseif type(namespace)=='table' and (type(name)=='string' or 'number') then
return WatcherObj(namespace,name) return WatcherObj(namespace,name)
else else
print('Warning, invalid arguments! Nothing returned!') multi.print('Warning, invalid arguments! Nothing returned!')
end end
end end
-- Threading stuff -- Threading stuff
@ -1485,6 +1473,11 @@ function thread.testFor(name,_val,sym)
end) end)
return thread.get(name) return thread.get(name)
end end
function multi.print(...)
if multi.defaultSettings.print then
print(...)
end
end
multi:setDomainName("Threads") multi:setDomainName("Threads")
multi:setDomainName("Globals") multi:setDomainName("Globals")
local initT = false local initT = false
@ -1571,7 +1564,7 @@ function multi.initThreads()
self.Parent.OnError:Fire(self.Threads[i],"Error in thread: <"..self.Threads[i].Name.."> "..ret) self.Parent.OnError:Fire(self.Threads[i],"Error in thread: <"..self.Threads[i].Name.."> "..ret)
end end
if ret==true or ret==false then if ret==true or ret==false then
print("Thread Ended!!!") multi.print("Thread Ended!!!")
ret={} ret={}
end end
end end
@ -2264,7 +2257,7 @@ function multi:ToString()
if self.Ingore then return end if self.Ingore then return end
local t=self.Type local t=self.Type
local data; local data;
print(t) multi.print(t)
if t:sub(-6)=="Thread" then if t:sub(-6)=="Thread" then
data={ data={
Type=t, Type=t,
@ -2341,7 +2334,7 @@ function multi:ToString()
set=self.set, set=self.set,
}) })
elseif t=="watcher" then elseif t=="watcher" then
print("Currently cannot sterilize a watcher object!") multi.print("Currently cannot sterilize a watcher object!")
-- needs testing -- needs testing
-- table.merge(data,{ -- table.merge(data,{
-- ns=self.ns, -- ns=self.ns,

View File

@ -41,10 +41,10 @@ end
function multi:getPlatform() function multi:getPlatform()
return "lanes" return "lanes"
end end
-- Step 2 set up the linda objects -- Step 2 set up the Linda objects
local __GlobalLinda = lanes.linda() -- handles global stuff local __GlobalLinda = lanes.linda() -- handles global stuff
local __SleepingLinda = lanes.linda() -- handles sleeping stuff local __SleepingLinda = lanes.linda() -- handles sleeping stuff
-- For convience a GLOBAL table will be constructed to handle requests -- For convenience a GLOBAL table will be constructed to handle requests
local GLOBAL={} local GLOBAL={}
setmetatable(GLOBAL,{ setmetatable(GLOBAL,{
__index=function(t,k) __index=function(t,k)
@ -54,7 +54,7 @@ setmetatable(GLOBAL,{
__GlobalLinda:set(k,v) __GlobalLinda:set(k,v)
end, end,
}) })
-- Step 3 rewrite the thread methods to use lindas -- Step 3 rewrite the thread methods to use Lindas
local THREAD={} local THREAD={}
function THREAD.set(name,val) function THREAD.set(name,val)
__GlobalLinda:set(name,val) __GlobalLinda:set(name,val)
@ -84,6 +84,9 @@ end
function THREAD.getCores() function THREAD.getCores()
return THREAD.__CORES return THREAD.__CORES
end end
function THREAD.getThreads()
return GLOBAL.__THREADS__
end
if os.getOS()=="windows" then if os.getOS()=="windows" then
THREAD.__CORES=tonumber(os.getenv("NUMBER_OF_PROCESSORS")) THREAD.__CORES=tonumber(os.getenv("NUMBER_OF_PROCESSORS"))
else else
@ -98,6 +101,7 @@ end
function THREAD.getID() function THREAD.getID()
return THREAD_ID return THREAD_ID
end end
_G.THREAD_ID = 0
--[[ Step 4 We need to get sleeping working to handle timing... We want idle wait, not busy wait --[[ Step 4 We need to get sleeping working to handle timing... We want idle wait, not busy wait
Idle wait keeps the CPU running better where busy wait wastes CPU cycles... Lanes does not have a sleep method Idle wait keeps the CPU running better where busy wait wastes CPU cycles... Lanes does not have a sleep method
however, a linda recieve will in fact be a idle wait! So we use that and wrap it in a nice package]] however, a linda recieve will in fact be a idle wait! So we use that and wrap it in a nice package]]
@ -114,9 +118,10 @@ function THREAD.hold(n)
end end
local rand = math.random(1,10000000) local rand = math.random(1,10000000)
-- Step 5 Basic Threads! -- Step 5 Basic Threads!
-- local threads = {} local threads = {}
local count = 0 local count = 1
local started = false local started = false
local livingThreads = {}
function multi:newSystemThread(name,func,...) function multi:newSystemThread(name,func,...)
multi.InitSystemThreadErrorHandler() multi.InitSystemThreadErrorHandler()
rand = math.random(1,10000000) rand = math.random(1,10000000)
@ -125,6 +130,7 @@ function multi:newSystemThread(name,func,...)
c.name=name c.name=name
c.Name = name c.Name = name
c.Id = count c.Id = count
livingThreads[count] = {true,name}
local THREAD_ID = count local THREAD_ID = count
count = count + 1 count = count + 1
c.Type="sthread" c.Type="sthread"
@ -143,19 +149,19 @@ function multi:newSystemThread(name,func,...)
end end
c.thread=lanes.gen("*", func2)(...) c.thread=lanes.gen("*", func2)(...)
function c:kill() function c:kill()
--self.status:Destroy()
self.thread:cancel() self.thread:cancel()
print("Thread: '"..self.name.."' has been stopped!") multi.print("Thread: '"..self.name.."' has been stopped!")
end end
table.insert(multi.SystemThreads,c) table.insert(multi.SystemThreads,c)
c.OnError = multi:newConnection() c.OnError = multi:newConnection()
GLOBAL["__THREADS__"]=livingThreads
return c return c
end end
multi.OnSystemThreadDied = multi:newConnection()
function multi.InitSystemThreadErrorHandler() function multi.InitSystemThreadErrorHandler()
if started==true then return end if started==true then return end
started = true started = true
multi:newThread("ThreadErrorHandler",function() multi:newThread("ThreadErrorHandler",function()
local deadThreads = {}
local threads = multi.SystemThreads local threads = multi.SystemThreads
while true do while true do
thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough. thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough.
@ -163,19 +169,23 @@ function multi.InitSystemThreadErrorHandler()
local v,err,t=threads[i].thread:join(.001) local v,err,t=threads[i].thread:join(.001)
if err then if err then
if err:find("Thread was killed!") then if err:find("Thread was killed!") then
livingThreads[threads[i].Id] = {false,threads[i].Name}
multi.OnSystemThreadDied:Fire(threads[i].Id)
GLOBAL["__THREADS__"]=livingThreads
table.remove(threads,i) table.remove(threads,i)
else else
threads[i].OnError:Fire(threads[i],err,"Error in systemThread: '"..threads[i].name.."' <"..err..">") threads[i].OnError:Fire(threads[i],err,"Error in systemThread: '"..threads[i].name.."' <"..err..">")
livingThreads[threads[i].Id] = {false,threads[i].Name}
multi.OnSystemThreadDied:Fire(threads[i].Id)
GLOBAL["__THREADS__"]=livingThreads
table.remove(threads,i) table.remove(threads,i)
table.insert(deadThreads,threads[i].Id)
GLOBAL["__DEAD_THREADS__"]=deadThreads
end end
end end
end end
end end
end) end)
end end
print("Integrated Lanes!") multi.print("Integrated Lanes!")
multi.integration={} -- for module creators multi.integration={} -- for module creators
multi.integration.GLOBAL=GLOBAL multi.integration.GLOBAL=GLOBAL
multi.integration.THREAD=THREAD multi.integration.THREAD=THREAD

View File

@ -316,7 +316,7 @@ function multi:newSystemThread(name,func,...) -- the main method
end end
function love.threaderror( thread, errorstr ) function love.threaderror( thread, errorstr )
multi.OnError:Fire(thread,errorstr) multi.OnError:Fire(thread,errorstr)
print("Error in systemThread: "..tostring(thread)..": "..errorstr) multi.print("Error in systemThread: "..tostring(thread)..": "..errorstr)
end end
local THREAD={} local THREAD={}
function THREAD.set(name,val) function THREAD.set(name,val)
@ -374,7 +374,7 @@ updater:OnUpdate(function(self)
end end
end) end)
require("multi.integration.shared") require("multi.integration.shared")
print("Integrated Love2d!") multi.print("Integrated Love2d!")
return { return {
init=function(t) init=function(t)
if t then if t then

View File

@ -114,7 +114,7 @@ local function _INIT(luvitThread,timer)
luvitThread.start(entry,package.path,name,c.func,...) luvitThread.start(entry,package.path,name,c.func,...)
return c return c
end end
print("Integrated Luvit!") multi.print("Integrated Luvit!")
multi.integration={} -- for module creators multi.integration={} -- for module creators
multi.integration.GLOBAL=GLOBAL multi.integration.GLOBAL=GLOBAL
multi.integration.THREAD=THREAD multi.integration.THREAD=THREAD

View File

@ -194,21 +194,21 @@ function multi:newNode(settings)
node.hasFuncs = {} node.hasFuncs = {}
node.OnError = multi:newConnection() node.OnError = multi:newConnection()
node.OnError(function(node,err,master) node.OnError(function(node,err,master)
print("ERROR",err,node.name) multi.print("ERROR",err,node.name)
local temp = bin.new() local temp = bin.new()
temp:addBlock(#node.name,2) temp:addBlock(#node.name,2)
temp:addBlock(node.name) temp:addBlock(node.name)
temp:addBlock(#err,2) temp:addBlock(#err,2)
temp:addBlock(err) temp:addBlock(err)
for i,v in pairs(node.connections) do for i,v in pairs(node.connections) do
print(i) multi.print(i)
v[1]:send(v[2],char(CMD_ERROR)..temp.data,v[3]) v[1]:send(v[2],char(CMD_ERROR)..temp.data,v[3])
end end
end) end)
if settings.managerDetails then if settings.managerDetails then
local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) local c = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
if not c then if not c then
print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false multi.print("Cannot connect to the node manager! Ensuring broadcast is enabled!") settings.noBroadCast = false
else else
c.OnDataRecieved(function(self,data) c.OnDataRecieved(function(self,data)
if data == "ping" then if data == "ping" then
@ -220,7 +220,7 @@ function multi:newNode(settings)
end end
if not settings.preload then if not settings.preload then
if node.functions:getSize()~=0 then if node.functions:getSize()~=0 then
print("We have function(s) to preload!") multi.print("We have function(s) to preload!")
local len = node.functions:getBlock("n",1) local len = node.functions:getBlock("n",1)
local name,func local name,func
while len do while len do
@ -270,14 +270,14 @@ function multi:newNode(settings)
node.queue:push(resolveData(dat)) node.queue:push(resolveData(dat))
elseif cmd == CMD_REG then elseif cmd == CMD_REG then
if not settings.allowRemoteRegistering then if not settings.allowRemoteRegistering then
print(ip..": has attempted to register a function when it is currently not allowed!") multi.print(ip..": has attempted to register a function when it is currently not allowed!")
return return
end end
local temp = bin.new(dat) local temp = bin.new(dat)
local len = temp:getBlock("n",1) local len = temp:getBlock("n",1)
local name = temp:getBlock("s",len) local name = temp:getBlock("s",len)
if node.hasFuncs[name] then if node.hasFuncs[name] then
print("Function already preloaded onto the node!") multi.print("Function already preloaded onto the node!")
return return
end end
len = temp:getBlock("n",2) len = temp:getBlock("n",2)
@ -304,13 +304,13 @@ function multi:newNode(settings)
node.OnError:Fire(node,err,server) node.OnError:Fire(node,err,server)
end end
elseif cmd == CMD_INITNODE then elseif cmd == CMD_INITNODE then
print("Connected with another node!") multi.print("Connected with another node!")
node.connections[dat]={server,ip,port} node.connections[dat]={server,ip,port}
multi.OnGUpdate(function(k,v) multi.OnGUpdate(function(k,v)
server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port) server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port)
end)-- set this up end)-- set this up
elseif cmd == CMD_INITMASTER then elseif cmd == CMD_INITMASTER then
print("Connected to the master!",dat) multi.print("Connected to the master!",dat)
node.connections[dat]={server,ip,port} node.connections[dat]={server,ip,port}
multi.OnGUpdate(function(k,v) multi.OnGUpdate(function(k,v)
server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port) server:send(ip,table.concat{char(CMD_GLOBAL),k,"|",v},port)
@ -357,7 +357,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
if settings.managerDetails then if settings.managerDetails then
local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2]) local client = net:newTCPClient(settings.managerDetails[1],settings.managerDetails[2])
if not client then if not client then
print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false multi.print("Warning: Cannot connect to the node manager! Ensuring broadcast listening is enabled!") settings.noBroadCast = false
else else
client.OnDataRecieved(function(client,data) client.OnDataRecieved(function(client,data)
local cmd = data:sub(1,1) local cmd = data:sub(1,1)
@ -550,7 +550,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
return master return master
end end
-- The init function that gets returned -- The init function that gets returned
print("Integrated Network Parallelism") multi.print("Integrated Network Parallelism")
return {init = function() return {init = function()
return GLOBAL return GLOBAL
end} end}

View File

@ -113,88 +113,88 @@ function multi:newSystemThreadedQueue(name) -- in love2d this will spawn a chann
return c return c
end end
-- NEEDS WORK -- NEEDS WORK
function multi:newSystemThreadedConnection(name,protect) -- function multi:newSystemThreadedConnection(name,protect)
local c={} -- local c={}
local sThread=multi.integration.THREAD -- local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL -- local GLOBAL=multi.integration.GLOBAL
c.name = name or error("You must supply a name for this object!") -- c.name = name or error("You must supply a name for this object!")
c.protect = protect or false -- c.protect = protect or false
c.count = 0 -- c.count = 0
multi:newSystemThreadedQueue(name.."THREADED_CALLFIRE"):init() -- multi:newSystemThreadedQueue(name.."THREADED_CALLFIRE"):init()
local qsm = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNCM"):init() -- local qsm = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNCM"):init()
local qs = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNC"):init() -- local qs = multi:newSystemThreadedQueue(name.."THREADED_CALLSYNC"):init()
function c:init() -- function c:init()
_G.__Needs_Multi = true -- _G.__Needs_Multi = true
local multi = require("multi") -- local multi = require("multi")
if multi:getPlatform()=="love2d" then -- if multi:getPlatform()=="love2d" then
GLOBAL=_G.GLOBAL -- GLOBAL=_G.GLOBAL
sThread=_G.sThread -- sThread=_G.sThread
end -- end
local conns = 0 -- local conns = 0
local qF = sThread.waitFor(self.name.."THREADED_CALLFIRE"):init() -- local qF = sThread.waitFor(self.name.."THREADED_CALLFIRE"):init()
local qSM = sThread.waitFor(self.name.."THREADED_CALLSYNCM"):init() -- local qSM = sThread.waitFor(self.name.."THREADED_CALLSYNCM"):init()
local qS = sThread.waitFor(self.name.."THREADED_CALLSYNC"):init() -- local qS = sThread.waitFor(self.name.."THREADED_CALLSYNC"):init()
qSM:push("OK") -- qSM:push("OK")
local conn = {} -- local conn = {}
conn.obj = multi:newConnection(self.protect) -- conn.obj = multi:newConnection(self.protect)
setmetatable(conn,{__call=function(self,...) return self:connect(...) end}) -- setmetatable(conn,{__call=function(self,...) return self:connect(...) end})
function conn:connect(func) -- function conn:connect(func)
return self.obj(func) -- return self.obj(func)
end -- end
function conn:holdUT(n) -- function conn:holdUT(n)
self.obj:holdUT(n) -- self.obj:holdUT(n)
end -- end
function conn:Remove() -- function conn:Remove()
self.obj:Remove() -- self.obj:Remove()
end -- end
function conn:Fire(...) -- function conn:Fire(...)
local args = {multi.randomString(8),...} -- local args = {multi.randomString(8),...}
for i = 1, conns do -- for i = 1, conns do
qF:push(args) -- qF:push(args)
end -- end
end -- end
local lastID = "" -- local lastID = ""
local lastCount = 0 -- local lastCount = 0
multi:newThread("syncer",function() -- multi:newThread("syncer",function()
while true do -- while true do
thread.skip(1) -- thread.skip(1)
local fire = qF:peek() -- local fire = qF:peek()
local count = qS:peek() -- local count = qS:peek()
if fire and fire[1]~=lastID then -- if fire and fire[1]~=lastID then
lastID = fire[1] -- lastID = fire[1]
qF:pop() -- qF:pop()
table.remove(fire,1) -- table.remove(fire,1)
conn.obj:Fire(unpack(fire)) -- conn.obj:Fire(unpack(fire))
end -- end
if count and count[1]~=lastCount then -- if count and count[1]~=lastCount then
conns = count[2] -- conns = count[2]
lastCount = count[1] -- lastCount = count[1]
qs:pop() -- qs:pop()
end -- end
end -- end
end) -- end)
return conn -- return conn
end -- end
multi:newThread("connSync",function() -- multi:newThread("connSync",function()
while true do -- while true do
thread.skip(1) -- thread.skip(1)
local syncIN = qsm:pop() -- local syncIN = qsm:pop()
if syncIN then -- if syncIN then
if syncIN=="OK" then -- if syncIN=="OK" then
c.count = c.count + 1 -- c.count = c.count + 1
else -- else
c.count = c.count - 1 -- c.count = c.count - 1
end -- end
local rand = math.random(1,1000000) -- local rand = math.random(1,1000000)
for i = 1, c.count do -- for i = 1, c.count do
qs:push({rand,c.count}) -- qs:push({rand,c.count})
end -- end
end -- end
end -- end
end) -- end)
GLOBAL[name]=c -- GLOBAL[name]=c
return c -- return c
end -- end
function multi:SystemThreadedBenchmark(n) function multi:SystemThreadedBenchmark(n)
n=n or 1 n=n or 1
local cores=multi.integration.THREAD.getCores() local cores=multi.integration.THREAD.getCores()
@ -265,7 +265,7 @@ function multi:newSystemThreadedConsole(name)
print(unpack(data)) print(unpack(data))
end end
end end
end) end):setName("ST.consoleSyncer")
end end
else else
cc.stream = sThread.waitFor("__SYSTEM_CONSOLE__"):init() cc.stream = sThread.waitFor("__SYSTEM_CONSOLE__"):init()
@ -330,6 +330,7 @@ function multi:newSystemThreadedTable(name)
return c return c
end end
local jobqueuecount = 0 local jobqueuecount = 0
local jqueues = {}
function multi:newSystemThreadedJobQueue(a,b) function multi:newSystemThreadedJobQueue(a,b)
jobqueuecount=jobqueuecount+1 jobqueuecount=jobqueuecount+1
local GLOBAL=multi.integration.GLOBAL local GLOBAL=multi.integration.GLOBAL
@ -350,6 +351,10 @@ function multi:newSystemThreadedJobQueue(a,b)
c.name = b c.name = b
c.numberofcores = a c.numberofcores = a
end end
if jqueues[c.name] then
error("A job queue by the name: "..c.name.." already exists!")
end
jqueues[c.name] = true
c.isReady = false c.isReady = false
c.jobnum=1 c.jobnum=1
c.OnJobCompleted = multi:newConnection() c.OnJobCompleted = multi:newConnection()
@ -378,8 +383,9 @@ function multi:newSystemThreadedJobQueue(a,b)
end end
end end
function c:doToAll(func) function c:doToAll(func)
local r = multi.randomString(12)
for i = 1, self.numberofcores do for i = 1, self.numberofcores do
queueDA:push{multi.randomString(12),func} queueDA:push{r,func}
end end
end end
for i=1,c.numberofcores do for i=1,c.numberofcores do
@ -450,7 +456,6 @@ function multi:newSystemThreadedJobQueue(a,b)
end end
local clock = os.clock local clock = os.clock
multi:newThread("JQ-"..c.name.." Manager",function() multi:newThread("JQ-"..c.name.." Manager",function()
print("thread started")
local _count = 0 local _count = 0
while _count<c.numberofcores do while _count<c.numberofcores do
thread.skip() thread.skip()
@ -472,13 +477,13 @@ function multi:newSystemThreadedJobQueue(a,b)
if clock() - c.idle >= 15 then if clock() - c.idle >= 15 then
c.idle = nil c.idle = nil
end end
thread.skip()
end end
dat = queueJD:pop() dat = queueJD:pop()
if dat then if dat then
c.idle = clock() c.idle = clock()
c.OnJobCompleted:Fire(unpack(dat)) c.OnJobCompleted:Fire(unpack(dat))
end end
thread.skip()
end end
end) end)
return c return c

159
test.lua
View File

@ -31,18 +31,155 @@ end)
multi.OnError(function(...) multi.OnError(function(...)
print(...) print(...)
end) end)
multi:newSystemThreadedConsole("console"):init()
jQueue = multi:newSystemThreadedJobQueue("MainJobQueue")
jQueue.OnReady:holdUT() local conncount = 0
jQueue:doToAll(function() function multi:newSystemThreadedConnection(name,protect)
console = THREAD.waitFor("console"):init() conncount = conncount + 1
local c={}
c.name = name or error("You must provide a name for the connection object!")
c.protect = protect or false
c.idle = nil
local sThread=multi.integration.THREAD
local GLOBAL=multi.integration.GLOBAL
local connSync = multi:newSystemThreadedQueue(c.name.."_CONN_SYNC")
local connFire = multi:newSystemThreadedQueue(c.name.."_CONN_FIRE")
function c:init()
local multi = require("multi")
if love then -- lets make sure we don't reference up-values if using love2d
GLOBAL=_G.GLOBAL
sThread=_G.sThread
end
local conn = {}
conn.obj = multi:newConnection()
setmetatable(conn,{
__call=function(self,...)
return self:connect(...)
end
})
local ID = sThread.getID()
local sync = sThread.waitFor(self.name.."_CONN_SYNC"):init()
local fire = sThread.waitFor(self.name.."_CONN_FIRE"):init()
local connections = {}
if not multi.isMainThread then
connections = {0}
end
sync:push{"INIT",ID} -- Register this as an active connection!
function conn:connect(func)
return self.obj(func)
end
function conn:holdUT(n)
self.obj:holdUT(n)
end
function conn:Remove()
self.obj:Remove()
end
function conn:Fire(...)
for i = 1,#connections do
fire:push{connections[i],ID,{...}}
end
end
-- FIRE {TO,FROM,{ARGS}}
local data
multi:newLoop(function()
data = fire:peek()
if type(data)=="table" and data[1]==ID then
if data[2]==ID and conn.IgnoreSelf then
fire:pop()
return
end
fire:pop()
conn.obj:Fire(unpack(data[3]))
end
-- We need to hangle syncs here as well
data = sync:peek()
if data~=nil and data[1]=="SYNCA" and data[2]==ID then
sync:pop()
table.insert(connections,data[3])
end
if type(data)=="table" and data[1]=="SYNCR" and data[2]==ID then
sync:pop()
for i=1,#connections do
if connections[i] == data[3] then
table.remove(connections,i)
end
end
end
end)
return conn
end
local cleanUp = {}
multi.OnSystemThreadDied(function(ThreadID)
for i=1,#syncs do
connSync:push{"SYNCR",syncs[i],ThreadID}
end
cleanUp[ThreadID] = true
end)
multi:newThread(c.name.." Connection Handler",function()
local data
local clock = os.clock
local syncs = {}
while true do
if not c.idle then
thread.sleep(.1)
else
if clock() - c.idle >= 15 then
c.idle = nil
end
thread.skip()
end
data = connSync:peek()
if data~= nil and data[1]=="INIT" then
connSync:pop()
c.idle = clock()
table.insert(syncs,data[2])
for i=1,#syncs do
connSync:push{"SYNCA",syncs[i],data[2]}
end
end
data = connFire:peek()
if data~=nil and cleanUp[data[1]] then
local meh = data[1]
connFire:pop() -- lets remove dead thread stuff
multi:newAlarm(15):OnRing(function(a)
cleanUp[meh] = nil
end)
end
end
end)
GLOBAL[c.name]=c
return c
end
local conn = multi:newSystemThreadedConnection("conn"):init()
conn(function(...)
print("MAIN",...)
end) end)
jQueue:registerJob("TEST",function(a,b,c) conn.IgnoreSelf = true
console:print(a,b,c) multi:newSystemThread("meh",function()
return "This is a test" local multi = require("multi")
local conn = THREAD.waitFor("conn"):init()
conn.IgnoreSelf = true
conn(function(...)
print("THREAD:",...)
end)
multi:newAlarm(1):OnRing(function(a)
conn:Fire("Does this work?")
a:Destroy()
end)
multi.OnError(function(...)
print(...)
end)
multi:mainloop()
end).OnError(function(...)
print(...)
end)
multi:newAlarm(2):OnRing(function(a)
conn:Fire("What about this one?")
a:Destroy()
end) end)
jQueue:pushJob("TEST",123,"Hello",false)
--~ multi:benchMark(1,nil,"Bench:")
multi:mainloop{ multi:mainloop{
protect = false protect = false,
--~ print = true
} }