diff --git a/multi/init.lua b/multi/init.lua index c850a53..2cd2953 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -273,14 +273,7 @@ function multi:newConnection(protect,func,kill) function temp:Fire(...) if lock then return end - if protect then - local t=pcall(call_funcs,...) - if t then - return t - end - else - return call_funcs(...) - end + return call_funcs(...) end function temp:Destroy() @@ -298,9 +291,11 @@ function multi:newConnection(protect,func,kill) if name then connections[name]=temp end + if self.callback then self.callback(temp) end + return temp end diff --git a/multi/integration/lanesManager/extensions.lua b/multi/integration/lanesManager/extensions.lua index bc37455..15f5bb1 100644 --- a/multi/integration/lanesManager/extensions.lua +++ b/multi/integration/lanesManager/extensions.lua @@ -64,10 +64,10 @@ function multi:newSystemThreadedJobQueue(n) local c = {} c.cores = n or THREAD.getCores()*2 c.OnJobCompleted = multi:newConnection() - local funcs = multi:newSystemThreadedTable() - local queueJob = multi:newSystemThreadedQueue() - local queueReturn = multi:newSystemThreadedQueue() - local doAll = multi:newSystemThreadedQueue() + local funcs = multi:newSystemThreadedTable():init() + local queueJob = multi:newSystemThreadedQueue():init() + local queueReturn = multi:newSystemThreadedQueue():init() + local doAll = multi:newSystemThreadedQueue():init() local ID=1 local jid = 1 function c:isEmpty() diff --git a/test.lua b/test.lua index b5483b9..f92ec6d 100644 --- a/test.lua +++ b/test.lua @@ -4,24 +4,35 @@ multi, thread = require("multi"):init{print=true} GLOBAL, THREAD = require("multi.integration.lanesManager"):init() function multi:newSystemThreadedConnection(name,...) - local master_conn = multi:newConnection(...) local c = {} + local proxy_conn = multi:newConnection(...) local name = name or multi.randomString(16) local connections = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out. setmetatable(c,master_conn) -- A different approach will be taken for the non main connection objects - c.subscribe = multi:newSystemThreadedQueue("Subscribe_"..name) + c.subscribe = multi:newSystemThreadedQueue("Subscribe_"..name):init() -- Incoming subscriptions multi:newThread("STC_"..name,function() while true do - thread.yield() local item = c.subscribe:pop() + -- We need to check on broken connections + -- c:Ping() + -- if item ~= nil then connections[#connections+1] = item thread.skip(multi.Priority_Normal) -- Usually a bunch of threads subscribe close to the same time. Process those by ensuring that they come alive around the same time - else -- I'm using these "Constant" values since they may change with other releases and this should allow these functions to adjust with them. + else -- I'm using these "Constants" since they may change with other releases and this should allow these functions to adjust with them. thread.skip(multi.Priority_Idle) end end end) + function c:Ping() -- Threaded Function call, can use thread.* + -- + end + function c:Fire(...) + -- + end + function c:Connect(...) + -- + end function c:init() return self end