Really close to portable proxies, currently extreamly unstable!

This commit is contained in:
Ryan Ward 2023-06-18 00:08:51 -04:00
parent 6fe10b22ab
commit 1b3e3303e9

View File

@ -128,7 +128,6 @@ function multi:newProxy(list)
end).OnError(print)
return self
else
print("INIT IN",THREAD_NAME)
local multi, thread = require("multi"):init()
local me = self
self.proc_name = proc_name
@ -142,7 +141,6 @@ function multi:newProxy(list)
for _,v in pairs(self.funcs) do
if type(v) == "table" then
-- We have a connection
print("Init Conn",v[1],THREAD_NAME)
v[2]:init(proc_name)
self["_"..v[1]] = v[2]
v[2].Parent = self
@ -150,11 +148,11 @@ function multi:newProxy(list)
self[v[1]] = multi:newConnection()
thread:newThread(function()
print("HOLD:","_"..v[1],self["_"..v[1]].Type)
while true do
self[v[1]]:Fire(thread.hold(self["_"..v[1]]))
local data = thread.hold(self["_"..v[1]])
self[v[1]]:Fire(data)
end
end)
end).OnError(multi.error)
else
self[v] = thread:newFunction(function(self,...)
if self == me then
@ -198,36 +196,72 @@ local targets = {}
local references = {}
local nFunc = 0
function multi:newTargetedFunction(ID, name, func, holup) -- This registers with the queue
function multi:newTargetedFunction(ID, proxy, name, func, holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_TFunc_"..nFunc
end
nFunc = nFunc + 1
proc.jobqueue:registerFunction(name, func)
multi:executeOnProcess(proxy.proc_name, function(proc, name, func)
proc.jobqueue:registerFunction(name, func)
end, name, func)
return thread:newFunction(function(...)
local id = proc:pushJob(ID, name, ...)
local link
local rets
link = proc.jobqueue.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
end
end)
return thread.hold(function()
if rets then
return multi.unpack(rets) or multi.NIL
end
end)
return multi:executeOnProcess(proxy.proc_name, function(proc, name, ID, ...)
local multi, thread = require("multi"):init()
local id = proc:pushJob(ID, name, ...)
local rets
local tjq = THREAD.get(proc.Name .. "_target_rtq_" .. ID):init()
return thread.hold(function()
local data = tjq:peek()
if data then
print(data)
end
if data and data[1] == id then
print("Got it sigh")
tjq:pop()
table.remove(data, 1)
return multi.unpack(data) or multi.NIL
end
end)
-- proc.jobqueue.OnJobCompleted(function(jid, ...)
-- if id==jid then
-- rets = {...}
-- print("Got!")
-- end
-- end)
-- return thread.hold(function()
-- if rets then
-- return multi.unpack(rets) or multi.NIL
-- end
-- end)
end, name, ID, ...)
end, holup), name
end
-- local qname = name .. "_tq_" .. THREAD_ID
-- local rqname = name .. "_rtq_" .. THREAD_ID
local function getQueue(name)
return THREAD.waitFor(name):init()
end
multi.executeOnProcess = thread:newFunction(function(self, name, func, ...)
local queue = THREAD.get(name .. "_local_proc")
local queueR = THREAD.get(name .. "_local_return")
if queue and queueR then
local multi, thread = require("multi"):init()
local id = multi.randomString(8)
queue = queue:init()
queueR = queueR:init()
queue:push({func, id, ...})
return thread.hold(function()
local data = queueR:peek()
if data and data[1] == id then
queueR:pop()
table.remove(data, 1)
return multi.unpack(data) or multi.NIL
end
end)
else
return nil, "Unable to find a process queue with name: '" .. name .. "'"
end
end, true)
local jid = -1
function multi:newSystemThreadedProcessor(cores)
@ -249,12 +283,16 @@ function multi:newSystemThreadedProcessor(cores)
c.OnObjectCreated = multi:newConnection()
c.parent = self
c.jobqueue = multi:newSystemThreadedJobQueue(c.cores)
c.local_cmd = multi:newSystemThreadedQueue(name .. "_local_proc"):init()
c.local_cmd_return = multi:newSystemThreadedQueue(name .. "_local_return"):init()
c.jobqueue:registerFunction("STP_enable_targets",function(name)
local multi, thread = require("multi"):init()
local qname = name .. "_tq_" .. THREAD_ID
local rqname = name .. "_rtq_" .. THREAD_ID
local tjq = multi:newSystemThreadedQueue(qname):init()
local trq = multi:newSystemThreadedQueue(rqname):init()
multi:newThread("TargetedJobHandler", function()
local th
while true do
@ -269,13 +307,13 @@ function multi:newSystemThreadedProcessor(cores)
local args = table.remove(dat, 1)
th.OnError(function(self,err)
-- We want to pass this to the other calling thread incase
rqname:push{jid, err}
trq:push{jid, err}
end)
rqname:push{jid, func(multi.unpack(args))}
trq:push{jid, func(multi.unpack(args))}
end)
end
end
end).OnError(multi.error)
end).OnError(print)
end)
c.jobqueue:registerFunction("STP_GetThreadCount",function()
@ -287,7 +325,10 @@ function multi:newSystemThreadedProcessor(cores)
end)
function c:pushJob(ID, name, ...)
targets[ID]:push{name, jid, {...}}
print("pushing")
local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init()
--targets[ID]:push{name, jid, {...}}
tq:push{name, jid, {...}}
jid = jid - 1
return jid + 1
end
@ -432,37 +473,59 @@ function multi:newSystemThreadedProcessor(cores)
return loads
end, true)
local check = function()
return c.local_cmd:pop()
end
thread:newThread(function()
while true do
local data = thread.hold(check)
if data then
thread:newThread(function()
local func = table.remove(data, 1)
local id = table.remove(data, 1)
local ret = {id, func(c, multi.unpack(data))}
c.local_cmd_return:push(ret)
end).OnError(multi.error)
end
end
end).OnError(multi.error)
return c
end
-- Modify thread.hold to handle proxies
local thread_ref = thread.hold
function thread.hold(n, opt)
--if type(n) == "table" then print(n.Type, n.isConnection()) end
if type(n) == "table" and n.Type == multi.PROXY and n.isConnection() then
local ready = false
local args
local id = n.getThreadID()
local name = n:getUniqueName()
local func = multi:newTargetedFunction(id, references[n.Parent], "conn_"..multi.randomString(8), function(_name)
print(id, name)
local func = multi:newTargetedFunction(id, n, "conn_"..multi.randomString(8), function(_name)
local multi, thread = require("multi"):init()
local obj = _G[_name]
print("Start")
local rets = {thread.hold(obj)}
print("Ring ;)")
for i,v in pairs(rets) do
if v.Type then
rets[i] = {_self_ref_ = "parent"}
end
end
return multi.unpack(rets)
end)
end, true)
local conn
local handle = func(name)
conn = handle.OnReturn(function(...)
ready = true
args = {...}
handle.OnReturn:Unconnect(conn)
end)
local args = {func(name)}
-- conn = handle.OnReturn(function(...)
-- ready = true
-- args = {...}
-- for i,v in pairs(args) do
-- print("DATA",i,v)
-- end
-- handle.OnReturn:Unconnect(conn)
-- end)
local ret = {thread_ref(function()
if ready then
@ -471,7 +534,9 @@ function thread.hold(n, opt)
end, opt)}
for i,v in pairs(ret) do
print("OBJECT",v.Type)
if type(v) == "table" and v._self_ref_ == "parent" then
print("assign")
ret[i] = n.Parent
end
end