diff --git a/integration/sharedExtensions/init.lua b/integration/sharedExtensions/init.lua index 8fc99a3..bd623d5 100644 --- a/integration/sharedExtensions/init.lua +++ b/integration/sharedExtensions/init.lua @@ -128,7 +128,6 @@ function multi:newProxy(list) end).OnError(print) return self else - print("INIT IN",THREAD_NAME) local multi, thread = require("multi"):init() local me = self self.proc_name = proc_name @@ -142,7 +141,6 @@ function multi:newProxy(list) for _,v in pairs(self.funcs) do if type(v) == "table" then -- We have a connection - print("Init Conn",v[1],THREAD_NAME) v[2]:init(proc_name) self["_"..v[1]] = v[2] v[2].Parent = self @@ -150,11 +148,11 @@ function multi:newProxy(list) self[v[1]] = multi:newConnection() thread:newThread(function() - print("HOLD:","_"..v[1],self["_"..v[1]].Type) while true do - self[v[1]]:Fire(thread.hold(self["_"..v[1]])) + local data = thread.hold(self["_"..v[1]]) + self[v[1]]:Fire(data) end - end) + end).OnError(multi.error) else self[v] = thread:newFunction(function(self,...) if self == me then @@ -198,36 +196,72 @@ local targets = {} local references = {} local nFunc = 0 -function multi:newTargetedFunction(ID, name, func, holup) -- This registers with the queue +function multi:newTargetedFunction(ID, proxy, name, func, holup) -- This registers with the queue if type(name)=="function" then holup = func func = name name = "JQ_TFunc_"..nFunc end nFunc = nFunc + 1 - proc.jobqueue:registerFunction(name, func) + + multi:executeOnProcess(proxy.proc_name, function(proc, name, func) + proc.jobqueue:registerFunction(name, func) + end, name, func) + return thread:newFunction(function(...) - local id = proc:pushJob(ID, name, ...) - local link - local rets - link = proc.jobqueue.OnJobCompleted(function(jid,...) - if id==jid then - rets = {...} - end - end) - return thread.hold(function() - if rets then - return multi.unpack(rets) or multi.NIL - end - end) + return multi:executeOnProcess(proxy.proc_name, function(proc, name, ID, ...) + local multi, thread = require("multi"):init() + local id = proc:pushJob(ID, name, ...) + local rets + local tjq = THREAD.get(proc.Name .. "_target_rtq_" .. ID):init() + return thread.hold(function() + local data = tjq:peek() + if data then + print(data) + end + if data and data[1] == id then + print("Got it sigh") + tjq:pop() + table.remove(data, 1) + return multi.unpack(data) or multi.NIL + end + end) + -- proc.jobqueue.OnJobCompleted(function(jid, ...) + -- if id==jid then + -- rets = {...} + -- print("Got!") + -- end + -- end) + -- return thread.hold(function() + -- if rets then + -- return multi.unpack(rets) or multi.NIL + -- end + -- end) + end, name, ID, ...) end, holup), name end --- local qname = name .. "_tq_" .. THREAD_ID --- local rqname = name .. "_rtq_" .. THREAD_ID -local function getQueue(name) - return THREAD.waitFor(name):init() -end +multi.executeOnProcess = thread:newFunction(function(self, name, func, ...) + local queue = THREAD.get(name .. "_local_proc") + local queueR = THREAD.get(name .. "_local_return") + if queue and queueR then + local multi, thread = require("multi"):init() + local id = multi.randomString(8) + queue = queue:init() + queueR = queueR:init() + queue:push({func, id, ...}) + return thread.hold(function() + local data = queueR:peek() + if data and data[1] == id then + queueR:pop() + table.remove(data, 1) + return multi.unpack(data) or multi.NIL + end + end) + else + return nil, "Unable to find a process queue with name: '" .. name .. "'" + end +end, true) local jid = -1 function multi:newSystemThreadedProcessor(cores) @@ -249,12 +283,16 @@ function multi:newSystemThreadedProcessor(cores) c.OnObjectCreated = multi:newConnection() c.parent = self c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) + c.local_cmd = multi:newSystemThreadedQueue(name .. "_local_proc"):init() + c.local_cmd_return = multi:newSystemThreadedQueue(name .. "_local_return"):init() c.jobqueue:registerFunction("STP_enable_targets",function(name) local multi, thread = require("multi"):init() local qname = name .. "_tq_" .. THREAD_ID local rqname = name .. "_rtq_" .. THREAD_ID + local tjq = multi:newSystemThreadedQueue(qname):init() + local trq = multi:newSystemThreadedQueue(rqname):init() multi:newThread("TargetedJobHandler", function() local th while true do @@ -269,13 +307,13 @@ function multi:newSystemThreadedProcessor(cores) local args = table.remove(dat, 1) th.OnError(function(self,err) -- We want to pass this to the other calling thread incase - rqname:push{jid, err} + trq:push{jid, err} end) - rqname:push{jid, func(multi.unpack(args))} + trq:push{jid, func(multi.unpack(args))} end) end end - end).OnError(multi.error) + end).OnError(print) end) c.jobqueue:registerFunction("STP_GetThreadCount",function() @@ -287,7 +325,10 @@ function multi:newSystemThreadedProcessor(cores) end) function c:pushJob(ID, name, ...) - targets[ID]:push{name, jid, {...}} + print("pushing") + local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init() + --targets[ID]:push{name, jid, {...}} + tq:push{name, jid, {...}} jid = jid - 1 return jid + 1 end @@ -432,37 +473,59 @@ function multi:newSystemThreadedProcessor(cores) return loads end, true) + local check = function() + return c.local_cmd:pop() + end + thread:newThread(function() + while true do + local data = thread.hold(check) + if data then + thread:newThread(function() + local func = table.remove(data, 1) + local id = table.remove(data, 1) + local ret = {id, func(c, multi.unpack(data))} + c.local_cmd_return:push(ret) + end).OnError(multi.error) + end + end + end).OnError(multi.error) + return c end -- Modify thread.hold to handle proxies local thread_ref = thread.hold function thread.hold(n, opt) - --if type(n) == "table" then print(n.Type, n.isConnection()) end if type(n) == "table" and n.Type == multi.PROXY and n.isConnection() then local ready = false local args local id = n.getThreadID() local name = n:getUniqueName() - local func = multi:newTargetedFunction(id, references[n.Parent], "conn_"..multi.randomString(8), function(_name) + print(id, name) + local func = multi:newTargetedFunction(id, n, "conn_"..multi.randomString(8), function(_name) local multi, thread = require("multi"):init() local obj = _G[_name] + print("Start") local rets = {thread.hold(obj)} + print("Ring ;)") for i,v in pairs(rets) do if v.Type then rets[i] = {_self_ref_ = "parent"} end end return multi.unpack(rets) - end) + end, true) local conn - local handle = func(name) - conn = handle.OnReturn(function(...) - ready = true - args = {...} - handle.OnReturn:Unconnect(conn) - end) + local args = {func(name)} + -- conn = handle.OnReturn(function(...) + -- ready = true + -- args = {...} + -- for i,v in pairs(args) do + -- print("DATA",i,v) + -- end + -- handle.OnReturn:Unconnect(conn) + -- end) local ret = {thread_ref(function() if ready then @@ -471,7 +534,9 @@ function thread.hold(n, opt) end, opt)} for i,v in pairs(ret) do + print("OBJECT",v.Type) if type(v) == "table" and v._self_ref_ == "parent" then + print("assign") ret[i] = n.Parent end end