Removed extra bloat, proxies are portable now!

This commit is contained in:
Ryan Ward 2023-06-25 21:46:37 -04:00
parent c39aa229f8
commit 660c10ec3b
13 changed files with 293 additions and 374 deletions

View File

@ -74,6 +74,7 @@ Allows the user to have multi auto set priorities (Requires chronos). Also adds
Added Added
--- ---
- thread.hold will now use a custom hold method for objects with a `Hold` method. This is called like `obj:Hold(opt)`. The only argument passed is the optional options table that thread.hold can pass. There is an exception for connection objects. While they do contain a Hold method, the Hold method isn't used and is there for proxy objects, though they can be used in non proxy/thread situations. Hold returns all the arguments that the connection object was fired with.
- shared_table = STP:newSharedTable(tbl_name) -- Allows you to create a shared table that all system threads in a process have access to. Returns a reference to that table for use on the main thread. Sets `_G[tbl_name]` on the system threads so you can access it there. - shared_table = STP:newSharedTable(tbl_name) -- Allows you to create a shared table that all system threads in a process have access to. Returns a reference to that table for use on the main thread. Sets `_G[tbl_name]` on the system threads so you can access it there.
```lua ```lua
package.path = "?/init.lua;?.lua;"..package.path package.path = "?/init.lua;?.lua;"..package.path
@ -119,10 +120,6 @@ Added
STJQ_cPXT8GOx We work! We work!!! STJQ_cPXT8GOx We work! We work!!!
``` ```
- STP:getLoad(type) -- returns a table where the index is the threadID and the value is the number of objects[type] running on that thread. `type`: "threads" for coroutines running or nil for all other objects running.
- multi:newTargetedFunction(ID, proc, name, func, holup) -- This is used internally to handle thread.hold(proxy.conn)
- proxy.getThreadID() -- Returns the threadID of the thread that the proxy is running in
- proxy:getUniqueName() -- Gets the special name that identifies the object on the thread the proxy refers to
- multi:chop(obj) -- We cannot directly interact with a local object on lanes, so we chop the object and set some globals on the thread side. Should use like: `mulit:newProxy(multi:chop(multi:newThread(function() ... end)))` - multi:chop(obj) -- We cannot directly interact with a local object on lanes, so we chop the object and set some globals on the thread side. Should use like: `mulit:newProxy(multi:chop(multi:newThread(function() ... end)))`
- multi:newProxy(ChoppedObject) -- Creates a proxy object that allows you to interact with an object on a thread - multi:newProxy(ChoppedObject) -- Creates a proxy object that allows you to interact with an object on a thread
@ -148,67 +145,105 @@ Added
``` ```
Internally the SystemThreadedProcessor uses a JobQueue to handle things. The proxy function allows you to interact with these objects as if they were on the main thread, though there actions are carried out on the main thread. Internally the SystemThreadedProcessor uses a JobQueue to handle things. The proxy function allows you to interact with these objects as if they were on the main thread, though there actions are carried out on the main thread.
Connection proxies break the rules a bit. Normally methods should always work on the thread side, however for connections in order to have actions work on the thread side you would call the connection using `obj._connName` instead of calling `obj.connName`. This allows you to have more control over connection events. See example below: Proxies can also be shared between threads, just remember to use proxy:getTransferable() before transferring and proxy:init() on the other end. (We need to avoid copying over coroutines)
The work done with proxies negates the usage of multi:newSystemThreadedConnection(), the only difference is you lose the metatables from connections.
You cannot connect directly to a proxy connection on the non proxy thread, you can however use proxy_conn:Hold() or thread.hold(proxy_conn) to emulate this, see below.
```lua ```lua
package.path = "?/init.lua;?.lua;"..package.path package.path = "?/init.lua;?.lua;"..package.path
multi, thread = require("multi"):init({print=true}) multi, thread = require("multi"):init({print=true, warn=true, error=true})
THREAD, GLOBAL = require("multi.integration.lanesManager"):init() THREAD, GLOBAL = require("multi.integration.lanesManager"):init()
stp = multi:newSystemThreadedProcessor(8) stp = multi:newSystemThreadedProcessor(8)
alarm = stp:newAlarm(3) tloop = stp:newTLoop(nil, 1)
-- This doesn't work since this event has already been subscribed to internally on the thread to get thread.hold(alarm.OnRing) to work. But as many events to alarm.OnRing can be made! multi:newSystemThread("Testing proxy copy",function(tloop)
thread:newThread(function() local function tprint (tbl, indent)
print("Hold on proxied connection", thread.hold(alarm._OnRing)) if not indent then indent = 0 end
end) for k, v in pairs(tbl) do
formatting = string.rep(" ", indent) .. k .. ": "
if type(v) == "table" then
print(formatting)
tprint(v, indent+1)
else
print(formatting .. tostring(v))
end
end
end
local multi, thread = require("multi"):init()
tloop = tloop:init()
print("tloop type:",tloop.Type)
print("Testing proxies on other threads")
thread:newThread(function()
while true do
thread.hold(tloop.OnLoop)
print(THREAD_NAME,"Loopy")
end
end)
tloop.OnLoop(function(a)
print(THREAD_NAME, "Got loop...")
end)
multi:mainloop()
end, tloop:getTransferable()).OnError(multi.error)
alarm.OnRing(function(a) print("tloop", tloop.Type)
print("OnRing",a, THREAD_NAME, THREAD_ID)
end)
print("alarm.OnRing", alarm.OnRing.Type)
print("alarm._OnRing", alarm._OnRing.Type)
thread:newThread(function() thread:newThread(function()
print("Hold on proxied no proxy connection", thread.hold(alarm.OnRing)) print("Holding...")
thread.hold(tloop.OnLoop)
print("Held on proxied no proxy connection 1")
end).OnError(print)
thread:newThread(function()
tloop.OnLoop:Hold()
print("held on proxied no proxy connection 2")
end)
tloop.OnLoop(function()
print("OnLoop",THREAD_NAME)
end) end)
thread:newThread(function() thread:newThread(function()
print("Hold on proxied no proxy connection", thread.hold(alarm.OnRing)) while true do
end) tloop.OnLoop:Hold()
print("OnLoop",THREAD_NAME)
-- This doesn't work since this event has already been subscribed to internally on the thread to get thread.hold(alarm.OnRing) to work. But as many events to alarm.OnRing can be made! end
thread:newThread(function() end).OnError(multi.error)
print("Hold on proxied connection", thread.hold(alarm._OnRing))
end)
alarm._OnRing(function(a)
print("_OnRing",a, THREAD_NAME, THREAD_ID)
a:Reset(1)
end)
multi:mainloop() multi:mainloop()
``` ```
Output: Output:
``` ```
INFO: Integrated Lanes Threading! INFO: Integrated Lanes Threading! 1
alarm.OnRing connector tloop proxy
alarm._OnRing proxy Holding...
_OnRing table: 025EB128 STJQ_cjKsEZHg 1 <-- This can change each time you run this example! tloop type: proxy
OnRing table: 018BC0C0 MAIN_THREAD 0 Testing proxies on other threads
Hold on proxied no proxy connection table: 018BC0C0 nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil OnLoop STJQ_W9SZGB6Y
Hold on proxied no proxy connection table: 018BC0C0 nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil STJQ_W9SZGB6Y Got loop...
_OnRing table: 025EB128 STJQ_cjKsEZHg 1 OnLoop MAIN_THREAD
OnRing table: 018BC0C0 MAIN_THREAD 0 Testing proxy copy Loopy
_OnRing table: 025EB128 STJQ_cjKsEZHg 1 Held on proxied no proxy connection 1
OnRing table: 018BC0C0 MAIN_THREAD 0 held on proxied no proxy connection 2
OnLoop STJQ_W9SZGB6Y
STJQ_W9SZGB6Y Got loop...
Testing proxy copy Loopy
OnLoop MAIN_THREAD
OnLoop STJQ_W9SZGB6Y
STJQ_W9SZGB6Y Got loop...
... (Will repeat ever second now) ... (Will repeat every second)
_OnRing table: 025EB128 STJQ_cjKsEZHg 1
OnRing table: 018BC0C0 MAIN_THREAD 0 Testing proxy copy Loopy
OnLoop MAIN_THREAD
OnLoop STJQ_W9SZGB6Y
STJQ_W9SZGB6Y Got loop...
...
``` ```
The proxy version can only subscribe to events on the proxy thread, which means that connection metamethods will not work with the proxy version (`_OnRing` on the non proxy thread side), but the (`OnRing`) version will work. Cleverly handling the proxy thread and the non proxy thread will allow powerful connection logic. Also this is not a full system threaded connection. **Proxies should only be used between 2 threads!** To keep things fast I'm using simple queues to transfer data. There is no guarantee that things will work! The proxy version can only subscribe to events on the proxy thread, which means that connection metamethods will not work with the proxy version (`_OnRing` on the non proxy thread side), but the (`OnRing`) version will work. Cleverly handling the proxy thread and the non proxy thread will allow powerful connection logic. Also this is not a full system threaded connection. **Proxies should only be used between 2 threads!** To keep things fast I'm using simple queues to transfer data. There is no guarantee that things will work!
@ -222,6 +257,7 @@ Added
- proxyStep = STP:newStep(...) - proxyStep = STP:newStep(...)
- proxyTStep = STP:newTStep(...) - proxyTStep = STP:newTStep(...)
- proxyThread = STP:newThread(...) - proxyThread = STP:newThread(...)
- proxyService = STP:newService(...)
- threadedFunction = STP:newFunction(...) - threadedFunction = STP:newFunction(...)
Unique: Unique:
@ -449,6 +485,7 @@ Removed
Fixed Fixed
--- ---
- Issue with luajit w/5.2 compat breaking with coroutine.running(), fixed the script to properly handle so thread.isThread() returns as expected!
- Issue with coroutine based threads where they weren't all being scheduled due to a bad for loop. Replaced with a while to ensure all threads are consumed properly. If a thread created a thread that created a thread that may or may not be on the same process, things got messed up due to the original function not being built with these abstractions in mind. - Issue with coroutine based threads where they weren't all being scheduled due to a bad for loop. Replaced with a while to ensure all threads are consumed properly. If a thread created a thread that created a thread that may or may not be on the same process, things got messed up due to the original function not being built with these abstractions in mind.
- Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function. - Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function.
- Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things. - Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things.

155
init.lua
View File

@ -33,6 +33,7 @@ local threadManager
local __CurrentConnectionThread local __CurrentConnectionThread
multi.unpack = table.unpack or unpack multi.unpack = table.unpack or unpack
multi.pack = table.pack or function(...) return {...} end
if table.unpack then unpack = table.unpack end if table.unpack then unpack = table.unpack end
@ -81,6 +82,7 @@ multi.TSTEP = "tstep"
multi.THREAD = "thread" multi.THREAD = "thread"
multi.SERVICE = "service" multi.SERVICE = "service"
multi.PROXY = "proxy" multi.PROXY = "proxy"
multi.THREADEDFUNCTION = "threaded_function"
if not _G["$multi"] then if not _G["$multi"] then
_G["$multi"] = {multi = multi, thread = thread} _G["$multi"] = {multi = multi, thread = thread}
@ -133,9 +135,7 @@ function multi.Stop()
mainloopActive = false mainloopActive = false
end end
local function pack(...) local pack = multi.pack
return {...}
end
--Processor --Processor
local priorityTable = {[false]="Disabled",[true]="Enabled"} local priorityTable = {[false]="Disabled",[true]="Enabled"}
@ -470,6 +470,10 @@ function multi:newConnection(protect,func,kill)
return temp return temp
end end
c.Hold = thread:newFunction(function(self)
return thread.hold(self)
end, true)
c.connect=c.Connect c.connect=c.Connect
c.GetConnection=c.getConnection c.GetConnection=c.getConnection
c.HasConnections = c.hasConnections c.HasConnections = c.hasConnections
@ -487,24 +491,6 @@ function multi:newConnection(protect,func,kill)
return c return c
end end
multi.enableOptimization = multi:newConnection()
multi.optConn = multi:newConnection(true)
multi.optConn(function(msg)
table.insert(optimization_stats, msg)
end)
function multi:getOptimizationConnection()
return multi.optConn
end
function multi:getOptimizationStats()
return optimization_stats
end
function multi:isFindingOptimizing()
return find_optimization
end
-- Used with ISO Threads -- Used with ISO Threads
local function isolateFunction(func, env) local function isolateFunction(func, env)
if setfenv then if setfenv then
@ -637,6 +623,7 @@ function multi:isDone()
end end
function multi:create(ref) function multi:create(ref)
ref.UID = "U"..multi.randomString(12)
self.OnObjectCreated:Fire(ref, self) self.OnObjectCreated:Fire(ref, self)
return self return self
end end
@ -686,10 +673,6 @@ function multi:newBase(ins)
return c return c
end end
multi.OnObjectCreated=multi:newConnection()
multi.OnObjectDestroyed=multi:newConnection()
multi.OnLoad = multi:newConnection(nil,nil,true)
ignoreconn = false
function multi:newTimer() function multi:newTimer()
local c={} local c={}
c.Type=multi.TIMER c.Type=multi.TIMER
@ -740,7 +723,7 @@ function multi:newEvent(task, func)
task=func task=func
return self return self
end end
c.OnEvent = self:newConnection():fastMode() c.OnEvent = self:newConnection()
if func then if func then
c.OnEvent(func) c.OnEvent(func)
end end
@ -767,7 +750,7 @@ function multi:newUpdater(skip, func)
skip=n skip=n
return self return self
end end
c.OnUpdate = self:newConnection():fastMode() c.OnUpdate = self:newConnection()
c:setName(c.Type) c:setName(c.Type)
if func then if func then
c.OnUpdate(func) c.OnUpdate(func)
@ -803,7 +786,7 @@ function multi:newAlarm(set, func)
t = clock() t = clock()
return self return self
end end
c.OnRing = self:newConnection():fastMode() c.OnRing = self:newConnection()
function c:Pause() function c:Pause()
count = clock() count = clock()
self.Parent.Pause(self) self.Parent.Pause(self)
@ -833,7 +816,7 @@ function multi:newLoop(func, notime)
end end
end end
c.OnLoop = self:newConnection():fastMode() c.OnLoop = self:newConnection()
if func then if func then
c.OnLoop(func) c.OnLoop(func)
@ -881,9 +864,9 @@ function multi:newStep(start,reset,count,skip)
return true return true
end end
c.Reset=c.Resume c.Reset=c.Resume
c.OnStart = self:newConnection():fastMode() c.OnStart = self:newConnection()
c.OnStep = self:newConnection():fastMode() c.OnStep = self:newConnection()
c.OnEnd = self:newConnection():fastMode() c.OnEnd = self:newConnection()
function c:Break() function c:Break()
self.Active=nil self.Active=nil
return self return self
@ -904,40 +887,49 @@ function multi:newStep(start,reset,count,skip)
return c return c
end end
function multi:newTLoop(func,set) function multi:newTLoop(func, set)
local c=self:newBase() local c=self:newBase()
c.Type=multi.TLOOP c.Type=multi.TLOOP
c.set=set or 0 c.set=set or 0
c.timer=self:newTimer() c.timer=self:newTimer()
c.life=0 c.life=0
c:setPriority("Low") c:setPriority("Low")
function c:Act() function c:Act()
if self.timer:Get()>=self.set then if self.timer:Get() >= self.set then
self.life=self.life+1 self.life=self.life+1
self.timer:Reset() self.timer:Reset()
self.OnLoop:Fire(self,self.life) self.OnLoop:Fire(self, self.life)
return true return true
end end
end end
function c:Set(set) function c:Set(set)
self.set = set self.set = set
end end
function c:Resume() function c:Resume()
self.Parent.Resume(self) self.Parent.Resume(self)
self.timer:Resume() self.timer:Resume()
return self return self
end end
function c:Pause() function c:Pause()
self.timer:Pause() self.timer:Pause()
self.Parent.Pause(self) self.Parent.Pause(self)
return self return self
end end
c.OnLoop = self:newConnection():fastMode()
c.OnLoop = self:newConnection()
if func then if func then
c.OnLoop(func) c.OnLoop(func)
end end
c:setName(c.Type) c:setName(c.Type)
self:create(c) self:create(c)
return c return c
end end
@ -1144,6 +1136,7 @@ function multi:newProcessor(name, nothread)
end end
table.insert(processes,c) table.insert(processes,c)
self:create(c)
return c return c
end end
@ -1209,7 +1202,7 @@ function multi:getTasks()
end end
function thread.request(t,cmd,...) function thread.request(t,cmd,...)
thread.requests[t.thread] = {cmd,{...}} thread.requests[t.thread] = {cmd, multi.pack(...)}
end end
function thread.getRunningThread() function thread.getRunningThread()
@ -1248,14 +1241,18 @@ local function conn_test(conn)
local args local args
local func = function(...) local func = function(...)
ready = true ready = true
args = {...} args = multi.pack(...)
end end
local ref = conn(func) local ref = conn(func)
return function() return function()
if ready then if ready then
conn:Unconnect(ref) conn:Unconnect(ref)
return multi.unpack(args) or multi.NIL if #args==0 then
return multi.NIL
else
return multi.unpack(args)
end
end end
end end
end end
@ -1267,7 +1264,7 @@ function thread.chain(...)
end end
end end
function thread.hold(n,opt) function thread.hold(n, opt)
thread._Requests() thread._Requests()
local opt = opt or {} local opt = opt or {}
if type(opt)=="table" then if type(opt)=="table" then
@ -1286,8 +1283,10 @@ function thread.hold(n,opt)
return yield(CMD, t_sleep, n or 0, nil, interval) return yield(CMD, t_sleep, n or 0, nil, interval)
elseif type(n) == "table" and n.Type == multi.CONNECTOR then elseif type(n) == "table" and n.Type == multi.CONNECTOR then
return yield(CMD, t_hold, conn_test(n), nil, interval) return yield(CMD, t_hold, conn_test(n), nil, interval)
elseif type(n) == "table" and n.Hold ~= nil then
return n:Hold(opt)
elseif type(n) == "function" then elseif type(n) == "function" then
return yield(CMD, t_hold, n or dFunc, nil, interval) return yield(CMD, t_hold, n, nil, interval)
else else
multi.error("Invalid argument passed to thread.hold(...)!") multi.error("Invalid argument passed to thread.hold(...)!")
end end
@ -1318,11 +1317,12 @@ function thread.yield()
end end
function thread.isThread() function thread.isThread()
if _VERSION~="Lua 5.1" then local a,b = running()
local a,b = running() if b then
-- We are dealing with luajit compat or 5.2+
return not(b) return not(b)
else else
return running()~=nil return a~=nil
end end
end end
@ -1345,7 +1345,7 @@ function thread.waitFor(name)
end end
local function cleanReturns(...) local function cleanReturns(...)
local returns = {...} local returns = multi.pack(...)
local rets = {} local rets = {}
local ind = 0 local ind = 0
for i=#returns,1,-1 do for i=#returns,1,-1 do
@ -1392,7 +1392,7 @@ function thread:newFunctionBase(generator, holdme)
end) end)
else else
while not rets and not err do while not rets and not err do
multi:getCurrentProcess():getHandler()() multi:uManager()
end end
local g = rets local g = rets
rets = nil rets = nil
@ -1416,7 +1416,7 @@ function thread:newFunctionBase(generator, holdme)
} }
end end
local t = generator(...) local t = generator(...)
t.OnDeath(function(...) rets = {...} end) t.OnDeath(function(...) rets = multi.pack(...) end)
t.OnError(function(self,e) err = e end) t.OnError(function(self,e) err = e end)
if holdme then if holdme then
return wait() return wait()
@ -1444,15 +1444,14 @@ function thread:newFunctionBase(generator, holdme)
return temp return temp
end end
setmetatable(tfunc, tfunc) setmetatable(tfunc, tfunc)
tfunc.Type = multi.THREADEDFUNCTION
return tfunc return tfunc
end end
end end
function thread:newFunction(func, holdme) function thread:newFunction(func, holdme)
return thread:newFunctionBase(function(...) return thread:newFunctionBase(function(...)
local th = thread:newThread("Free Threaded Function Handler", func, ...) return thread:newThread("Free Threaded Function Handler", func, ...)
th.creator = debug.getinfo(2).name
return th
end, holdme)() end, holdme)()
end end
@ -1492,12 +1491,12 @@ function thread:newProcessor(name)
function proc.Start() function proc.Start()
Active = true Active = true
return c return proc
end end
function proc.Stop() function proc.Stop()
Active = false Active = false
return c return proc
end end
function proc:Destroy() function proc:Destroy()
@ -1515,6 +1514,8 @@ function thread:newProcessor(name)
end end
end) end)
end) end)
self:create(proc)
return proc return proc
end end
@ -1534,7 +1535,7 @@ function thread:newThread(name, func, ...)
end end
local c={nil,nil,nil,nil,nil,nil,nil} local c={nil,nil,nil,nil,nil,nil,nil}
c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil}
c.startArgs = {...} c.startArgs = multi.pack(...)
c.ref={} c.ref={}
c.Name=name c.Name=name
c.thread=create(func) c.thread=create(func)
@ -1797,14 +1798,6 @@ function multi:createHandler()
return coroutine.wrap(function() return coroutine.wrap(function()
local temp_start local temp_start
while true do while true do
-- for start = #startme, 1, -1 do
-- temp_start = startme[start]
-- table.remove(startme)
-- _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs))
-- co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads)
-- table.insert(threads, temp_start)
-- yield()
-- end
while #startme>0 do while #startme>0 do
temp_start = table.remove(startme) temp_start = table.remove(startme)
_, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs)) _, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs))
@ -1929,7 +1922,7 @@ function multi:newService(func) -- Priority managed threads
return c return c
end end
multi.create(multi,c) self:create(c)
return c return c
end end
@ -2338,7 +2331,7 @@ end
function multi.print(...) function multi.print(...)
if multi.defaultSettings.print then if multi.defaultSettings.print then
local t = {} local t = {}
for i,v in pairs({...}) do t[#t+1] = tostring(v) end for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end
io.write("\x1b[94mINFO:\x1b[0m " .. table.concat(t," ") .. "\n") io.write("\x1b[94mINFO:\x1b[0m " .. table.concat(t," ") .. "\n")
end end
end end
@ -2346,7 +2339,7 @@ end
function multi.warn(...) function multi.warn(...)
if multi.defaultSettings.warn then if multi.defaultSettings.warn then
local t = {} local t = {}
for i,v in pairs({...}) do t[#t+1] = tostring(v) end for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end
io.write("\x1b[93mWARNING:\x1b[0m " .. table.concat(t," ") .. "\n") io.write("\x1b[93mWARNING:\x1b[0m " .. table.concat(t," ") .. "\n")
end end
end end
@ -2363,7 +2356,7 @@ end
function multi.success(...) function multi.success(...)
local t = {} local t = {}
for i,v in pairs({...}) do t[#t+1] = tostring(v) end for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end
io.write("\x1b[92mSUCCESS:\x1b[0m " .. table.concat(t," ") .. "\n") io.write("\x1b[92mSUCCESS:\x1b[0m " .. table.concat(t," ") .. "\n")
end end
@ -2384,6 +2377,29 @@ function os.exit(n)
_os(n) _os(n)
end end
multi.OnObjectCreated=multi:newConnection()
ignoreconn = false
multi.OnObjectDestroyed=multi:newConnection()
multi.OnLoad = multi:newConnection(nil,nil,true)
multi.enableOptimization = multi:newConnection()
multi.optConn = multi:newConnection(true)
multi.optConn(function(msg)
table.insert(optimization_stats, msg)
end)
function multi:getOptimizationConnection()
return multi.optConn
end
function multi:getOptimizationStats()
return optimization_stats
end
function multi:isFindingOptimizing()
return find_optimization
end
multi.OnError=multi:newConnection() multi.OnError=multi:newConnection()
multi.OnPreLoad = multi:newConnection() multi.OnPreLoad = multi:newConnection()
multi.OnExit = multi:newConnection(nil,nil,true) multi.OnExit = multi:newConnection(nil,nil,true)
@ -2403,11 +2419,12 @@ function multi:getHandler()
end end
multi:newThread("Task Handler", function() multi:newThread("Task Handler", function()
local check = function()
return table.remove(tasks)
end
while true do while true do
thread.hold(check)() if #tasks > 0 then
table.remove(tasks)()
else
thread.yield()
end
end end
end).OnError(multi.error) end).OnError(multi.error)

View File

@ -113,7 +113,7 @@ function multi:newSystemThreadedJobQueue(n)
return self return self
end end
function c:pushJob(name,...) function c:pushJob(name,...)
queueJob:push{name,jid,{...}} queueJob:push{name,jid,multi.pack(...)}
jid = jid + 1 jid = jid + 1
return jid-1 return jid-1
end end
@ -132,12 +132,16 @@ function multi:newSystemThreadedJobQueue(n)
local rets local rets
link = c.OnJobCompleted(function(jid,...) link = c.OnJobCompleted(function(jid,...)
if id==jid then if id==jid then
rets = {...} rets = multi.pack(...)
end end
end) end)
return thread.hold(function() return thread.hold(function()
if rets then if rets then
return multi.unpack(rets) or multi.NIL if #rets == 0 then
return multi.NIL
else
return multi.unpack(rets)
end
end end
end) end)
end, holup), name end, holup), name
@ -171,7 +175,7 @@ function multi:newSystemThreadedJobQueue(n)
local name = table.remove(dat, 1) local name = table.remove(dat, 1)
local jid = table.remove(dat, 1) local jid = table.remove(dat, 1)
local args = table.remove(dat, 1) local args = table.remove(dat, 1)
queueReturn:push{jid, funcs[name](multi.unpack(args)), queue} queueReturn:push{jid, funcs[name](args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]), queue}
end).OnError(multi.error) end).OnError(multi.error)
end end
end).OnError(multi.error) end).OnError(multi.error)
@ -258,7 +262,7 @@ function multi:newSystemThreadedConnection(name)
local function fire(...) local function fire(...)
for _, link in pairs(c.links) do for _, link in pairs(c.links) do
link:push {c.TRIG, {...}} link:push {c.TRIG, multi.pack(...)}
end end
end end
@ -286,7 +290,7 @@ function multi:newSystemThreadedConnection(name)
--- ^^^ This will only exist in the init thread --- ^^^ This will only exist in the init thread
function c:Fire(...) function c:Fire(...)
local args = {...} local args = multi.pack(...)
if self.CID == THREAD_ID then -- Host Call if self.CID == THREAD_ID then -- Host Call
for _, link in pairs(self.links) do for _, link in pairs(self.links) do
link:push {self.TRIG, args} link:push {self.TRIG, args}

View File

@ -98,6 +98,13 @@ function multi:newSystemThread(name, func, ...)
globals = globe, globals = globe,
priority = c.priority priority = c.priority
},function(...) },function(...)
local profi
if multi_settings.debug then
profi = require("proFI")
profi:start()
end
multi, thread = require("multi"):init(multi_settings) multi, thread = require("multi"):init(multi_settings)
require("multi.integration.lanesManager.extensions") require("multi.integration.lanesManager.extensions")
require("multi.integration.sharedExtensions") require("multi.integration.sharedExtensions")
@ -105,6 +112,12 @@ function multi:newSystemThread(name, func, ...)
returns = {pcall(func, ...)} returns = {pcall(func, ...)}
return_linda:set("returns", returns) return_linda:set("returns", returns)
has_error = false has_error = false
if profi then
multi.OnExit(function(...)
profi:stop()
profi:writeReport("Profiling Report [".. THREAD_NAME .."].txt")
end)
end
end)(...) end)(...)
count = count + 1 count = count + 1
function c:getName() function c:getName()
@ -118,6 +131,13 @@ function multi:newSystemThread(name, func, ...)
c.OnDeath = multi:newConnection() c.OnDeath = multi:newConnection()
c.OnError = multi:newConnection() c.OnError = multi:newConnection()
GLOBAL["__THREADS__"] = livingThreads GLOBAL["__THREADS__"] = livingThreads
if self.isActor then
self:create(c)
else
multi.create(multi, c)
end
return c return c
end end

View File

@ -66,7 +66,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console)
local c = {} local c = {}
c.queue = __Console c.queue = __Console
function c.print(...) function c.print(...)
c.queue:send("Q", {...}) c.queue:send("Q", multi.pack(...))
end end
function c.error(err) function c.error(err)
c.queue:push("Q",{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}) c.queue:push("Q",{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__})
@ -90,7 +90,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console)
end end
function THREAD.pushStatus(...) function THREAD.pushStatus(...)
local args = {...} local args = multi.pack(...)
__StatusLinda:send(nil,THREAD_ID, args) __StatusLinda:send(nil,THREAD_ID, args)
end end

View File

@ -129,7 +129,7 @@ function multi:newSystemThreadedJobQueue(n)
local rets local rets
link = c.OnJobCompleted(function(jid,...) link = c.OnJobCompleted(function(jid,...)
if id==jid then if id==jid then
rets = {...} rets = multi.pack(...)
end end
end) end)
return thread.hold(function() return thread.hold(function()
@ -230,7 +230,7 @@ function multi:newSystemThreadedConnection(name)
self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name) self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name)
function self:Fire(...) function self:Fire(...)
local args = {...} local args = multi.pack(...)
if self.CID == THREAD_ID then -- Host Call if self.CID == THREAD_ID then -- Host Call
for _, link in pairs(self.links) do for _, link in pairs(self.links) do
love.thread.getChannel(link):push{self.TRIG, args} love.thread.getChannel(link):push{self.TRIG, args}
@ -321,7 +321,7 @@ function multi:newSystemThreadedConnection(name)
local function fire(...) local function fire(...)
for _, link in pairs(c.links) do for _, link in pairs(c.links) do
love.thread.getChannel(link):push {c.TRIG, {...}} love.thread.getChannel(link):push {c.TRIG, multi.pack(...)}
end end
end end

View File

@ -103,6 +103,13 @@ function multi:newSystemThread(name, func, ...)
c.stab.returns = nil c.stab.returns = nil
end end
end) end)
if self.isActor then
self:create(c)
else
multi.create(multi, c)
end
return c return c
end end

View File

@ -98,7 +98,7 @@ end
function threads.pushStatus(...) function threads.pushStatus(...)
local status_channel = love.thread.getChannel("STATCHAN_" ..__THREADID__) local status_channel = love.thread.getChannel("STATCHAN_" ..__THREADID__)
local args = {...} local args = multi.pack(...)
status_channel:push(args) status_channel:push(args)
end end
@ -209,7 +209,7 @@ function threads.getConsole()
local c = {} local c = {}
c.queue = love.thread.getChannel("__CONSOLE__") c.queue = love.thread.getChannel("__CONSOLE__")
function c.print(...) function c.print(...)
c.queue:push{...} c.queue:push(multi.pack(...))
end end
function c.error(err) function c.error(err)
c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}

View File

@ -118,7 +118,7 @@ function multi:newSystemThreadedJobQueue(n)
local rets local rets
link = c.OnJobCompleted(function(jid,...) link = c.OnJobCompleted(function(jid,...)
if id==jid then if id==jid then
rets = {...} rets = multi.pack(...)
link:Destroy() link:Destroy()
end end
end) end)

View File

@ -76,6 +76,13 @@ function multi:newSystemThread(name,func,...)
GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread} GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread}
GLOBAL["__THREAD_COUNT"] = THREAD_ID GLOBAL["__THREAD_COUNT"] = THREAD_ID
THREAD_ID=THREAD_ID+1 THREAD_ID=THREAD_ID+1
if self.isActor then
self:create(c)
else
multi.create(multi, c)
end
return c return c
end end
THREAD.newSystemThread = multi.newSystemThread THREAD.newSystemThread = multi.newSystemThread

View File

@ -156,7 +156,7 @@ function threads.getConsole()
local c = {} local c = {}
c.queue = lovr.thread.getChannel("__CONSOLE__") c.queue = lovr.thread.getChannel("__CONSOLE__")
function c.print(...) function c.print(...)
c.queue:push{...} c.queue:push(multi.pack(...))
end end
function c.error(err) function c.error(err)
c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}

View File

@ -96,7 +96,7 @@ function multi:newSystemThreadedJobQueue(n)
end end
function c:pushJob(name,...) function c:pushJob(name,...)
table.insert(jobs,{name,jid,{...}}) table.insert(jobs,{name,jid,multi.pack(...)})
jid = jid + 1 jid = jid + 1
return jid-1 return jid-1
end end
@ -121,7 +121,7 @@ function multi:newSystemThreadedJobQueue(n)
local rets local rets
link = c.OnJobCompleted(function(jid,...) link = c.OnJobCompleted(function(jid,...)
if id==jid then if id==jid then
rets = {...} rets = multi.pack(...)
end end
end) end)
return thread.hold(function() return thread.hold(function()

View File

@ -22,8 +22,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
]] ]]
function copy(obj) local function copy(obj)
if type(obj) ~= 'table' then return obj end if type(obj) ~= 'table' then return obj end
local res = {} local res = {}
for k, v in pairs(obj) do res[copy(k)] = copy(v) end for k, v in pairs(obj) do res[copy(k)] = copy(v) end
return res return res
@ -48,34 +48,19 @@ local multi, thread = require("multi"):init()
-- Create on the thread that you want to interact with, send over the handle -- Create on the thread that you want to interact with, send over the handle
function multi:chop(obj) function multi:chop(obj)
if not _G["UIDS"] then
_G["UIDS"] = {}
end
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
local list = {[0] = multi.randomString(12)} local list = {[0] = multi.randomString(12)}
_G[list[0]] = obj _G[list[0]] = obj
for i,v in pairs(obj) do for i,v in pairs(obj) do
if type(v) == "function" then if type(v) == "function" or type(v) == "table" and v.Type == multi.THREADEDFUNCTION then
table.insert(list, i) table.insert(list, i)
elseif type(v) == "table" and v.Type == multi.CONNECTOR then elseif type(v) == "table" and v.Type == multi.CONNECTOR then
v.getThreadID = function() -- Special function we are adding table.insert(list, {i, multi:newProxy(multi:chop(v)):init()})
return THREAD_ID
end
v.getUniqueName = function(self)
return self.__link_name
end
local l = multi:chop(v)
v.__link_name = l[0]
v.__name = i
table.insert(list, {i, multi:newProxy(l):init()})
end end
end end
table.insert(list, "isConnection")
if obj.Type == multi.CONNECTOR then
obj.isConnection = function() return true end
else
obj.isConnection = function() return false end
end
return list return list
end end
@ -85,44 +70,63 @@ function multi:newProxy(list)
c.name = multi.randomString(12) c.name = multi.randomString(12)
c.is_init = false c.is_init = false
local multi, thread = nil, nil
function c:init(proc_name) function c:init()
local multi, thread = nil, nil local multi, thread = nil, nil
if not(c.is_init) then if not(c.is_init) then
c.is_init = true c.is_init = true
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
c.proxy_link = "PL" .. multi.randomString(12)
if multi.integration then
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
end
GLOBAL[c.proxy_link] = c
local function check() local function check()
return self.send:pop() return self.send:pop()
end end
self.send = multi:newSystemThreadedQueue(self.name.."_S"):init() self.send = multi:newSystemThreadedQueue(self.name.."_S"):init()
self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init() self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init()
self.funcs = list self.funcs = list
self._funcs = copy(list) self._funcs = copy(list)
self.Type = multi.PROXY self.Type = multi.PROXY
self.TID = THREAD_ID self.TID = THREAD_ID
thread:newThread(function()
thread:newThread("Proxy_Handler_" .. multi.randomString(4), function()
while true do while true do
local data = thread.hold(check) local data = thread.hold(check)
if data then if data then
local func = table.remove(data, 1) -- Let's not hold the main threadloop
local sref = table.remove(data, 1) thread:newThread("Temp_Thread", function()
local ret local func = table.remove(data, 1)
if sref then local sref = table.remove(data, 1)
ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} local ret
else
ret = {_G[list[0]][func](multi.unpack(data))} if sref then
end ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))}
for i = 1,#ret do else
if type(ret[i]) == "table" and getmetatable(ret[i]) then ret = {_G[list[0]][func](multi.unpack(data))}
setmetatable(ret[i],{}) -- remove that metatable, we do not need it on the other side!
end end
if ret[i] == _G[list[0]] then
-- We cannot return itself, that return can contain bad values. for i = 1,#ret do
ret[i] = {_self_ref_ = true} if type(ret[i]) == "table" and ret[i].Type ~= nil and ret[i].Type ~= multi.PROXY then
ret[i] = "\1PARENT_REF"
end
if type(ret[i]) == "table" and getmetatable(ret[i]) then
setmetatable(ret[i],nil) -- remove that metatable, we do not need it on the other side!
end
if ret[i] == _G[list[0]] then
-- We cannot return itself, that return can contain bad values.
ret[i] = "\1SELF_REF"
end
end end
end table.insert(ret, 1, func)
table.insert(ret, 1, func) self.recv:push(ret)
self.recv:push(ret) end)
end end
end end
end).OnError(multi.error) end).OnError(multi.error)
@ -130,7 +134,7 @@ function multi:newProxy(list)
else else
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
local me = self local me = self
self.proc_name = proc_name local funcs = copy(self.funcs)
if multi.integration then if multi.integration then
GLOBAL = multi.integration.GLOBAL GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD THREAD = multi.integration.THREAD
@ -138,21 +142,13 @@ function multi:newProxy(list)
self.send = THREAD.waitFor(self.name.."_S"):init() self.send = THREAD.waitFor(self.name.."_S"):init()
self.recv = THREAD.waitFor(self.name.."_R"):init() self.recv = THREAD.waitFor(self.name.."_R"):init()
self.Type = multi.PROXY self.Type = multi.PROXY
for _,v in pairs(self.funcs) do for _,v in pairs(funcs) do
if type(v) == "table" then if type(v) == "table" then
-- We have a connection -- We have a connection
v[2]:init(proc_name) v[2]:init(proc_name)
self["_"..v[1]] = v[2] self[v[1]] = v[2]
v[2].Parent = self v[2].Parent = self
setmetatable(v[2],getmetatable(multi:newConnection())) setmetatable(v[2],getmetatable(multi:newConnection()))
self[v[1]] = multi:newConnection()
thread:newThread(function()
while true do
local data = thread.hold(self["_"..v[1]])
self[v[1]]:Fire(data)
end
end).OnError(multi.error)
else else
self[v] = thread:newFunction(function(self,...) self[v] = thread:newFunction(function(self,...)
if self == me then if self == me then
@ -166,8 +162,10 @@ function multi:newProxy(list)
me.recv:pop() me.recv:pop()
table.remove(data, 1) table.remove(data, 1)
for i=1,#data do for i=1,#data do
if type(data[i]) == "table" and data[i]._self_ref_ then if data[i] == "\1SELF_REF" then
data[i] = me data[i] = me
elseif data[i] == "\1PARENT_REF" then
data[i] = me.Parent
end end
end end
return multi.unpack(data) return multi.unpack(data)
@ -180,85 +178,31 @@ function multi:newProxy(list)
end end
end end
function c:getTransferable() function c:getTransferable()
local multi, thread = nil, nil
local cp = {} local cp = {}
local multi, thread = require("multi"):init()
cp.is_init = true
cp.proxy_link = self.proxy_link
cp.name = self.name cp.name = self.name
cp.funcs = copy(self._funcs) cp.funcs = copy(self._funcs)
cp._funcs = copy(self._funcs) cp.init = function(self)
cp.Type = self.Type local multi, thread = require("multi"):init()
cp.init = self.init if multi.integration then
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
end
local proxy = THREAD.waitFor(self.proxy_link)
proxy.funcs = self.funcs
return proxy:init()
end
return cp return cp
end end
self:create(c)
return c return c
end end
local targets = {} local targets = {}
local references = {} local references = {}
local nFunc = 0
function multi:newTargetedFunction(ID, proxy, name, func, holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_TFunc_"..nFunc
end
nFunc = nFunc + 1
multi:executeOnProcess(proxy.proc_name, function(proc, name, func)
proc.jobqueue:registerFunction(name, func)
end, name, func)
return thread:newFunction(function(...)
return multi:executeOnProcess(proxy.proc_name, function(proc, name, ID, ...)
local multi, thread = require("multi"):init()
local id = proc:pushJob(ID, name, ...)
local rets
local tjq = THREAD.get(proc.Name .. "_target_rtq_" .. ID):init()
return thread.hold(function()
local data = tjq:peek()
if data and data[1] == id then
tjq:pop()
table.remove(data, 1)
return multi.unpack(data) or multi.NIL
end
end)
-- proc.jobqueue.OnJobCompleted(function(jid, ...)
-- if id==jid then
-- rets = {...}
-- print("Got!")
-- end
-- end)
-- return thread.hold(function()
-- if rets then
-- return multi.unpack(rets) or multi.NIL
-- end
-- end)
end, name, ID, ...)
end, holup), name
end
multi.executeOnProcess = thread:newFunction(function(self, name, func, ...)
local queue = THREAD.get(name .. "_local_proc")
local queueR = THREAD.get(name .. "_local_return")
if queue and queueR then
local multi, thread = require("multi"):init()
local id = multi.randomString(8)
queue = queue:init()
queueR = queueR:init()
queue:push({func, id, ...})
return thread.hold(function()
local data = queueR:peek()
if data and data[1] == id then
queueR:pop()
table.remove(data, 1)
return multi.unpack(data) or multi.NIL
end
end)
else
return nil, "Unable to find a process queue with name: '" .. name .. "'"
end
end, true)
local jid = -1 local jid = -1
function multi:newSystemThreadedProcessor(cores) function multi:newSystemThreadedProcessor(cores)
@ -279,69 +223,16 @@ function multi:newSystemThreadedProcessor(cores)
c.OnObjectCreated = multi:newConnection() c.OnObjectCreated = multi:newConnection()
c.parent = self c.parent = self
c.jobqueue = multi:newSystemThreadedJobQueue(c.cores) c.jobqueue = multi:newSystemThreadedJobQueue(c.cores)
c.local_cmd = multi:newSystemThreadedQueue(name .. "_local_proc"):init()
c.local_cmd_return = multi:newSystemThreadedQueue(name .. "_local_return"):init()
c.jobqueue:registerFunction("STP_enable_targets",function(name)
local multi, thread = require("multi"):init()
local qname = name .. "_tq_" .. THREAD_ID
local rqname = name .. "_rtq_" .. THREAD_ID
local tjq = multi:newSystemThreadedQueue(qname):init()
local trq = multi:newSystemThreadedQueue(rqname):init()
multi:newThread("TargetedJobHandler", function()
local th
while true do
local dat = thread.hold(function()
return tjq:pop()
end)
if dat then
th = thread:newThread("JQ-TargetThread",function()
local name = table.remove(dat, 1)
local jid = table.remove(dat, 1)
local func = _G[name]
local args = table.remove(dat, 1)
th.OnError(function(self,err)
-- We want to pass this to the other calling thread incase
trq:push{jid, err}
end)
trq:push{jid, func(multi.unpack(args))}
end)
end
end
end).OnError(multi.error)
end)
c.jobqueue:registerFunction("STP_GetThreadCount",function()
return _G["__THREADS"]
end)
c.jobqueue:registerFunction("STP_GetTaskCount",function()
return _G["__TASKS"]
end)
function c:pushJob(ID, name, ...) function c:pushJob(ID, name, ...)
local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init() local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init()
tq:push{name, jid, {...}} tq:push{name, jid, multi.pack(...)}
jid = jid - 1 jid = jid - 1
return jid + 1 return jid + 1
end end
c.jobqueue:doToAll(function(name)
STP_enable_targets(name)
_G["__THREADS"] = 0
_G["__TASKS"] = 0
end, name.."_target")
c.jobqueue:registerFunction("packObj",function(obj) c.jobqueue:registerFunction("packObj",function(obj)
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
obj.getThreadID = function() -- Special functions we are adding
return THREAD_ID
end
obj.getUniqueName = function(self)
return self.__link_name
end
local list = multi:chop(obj) local list = multi:chop(obj)
obj.__link_name = list[0] obj.__link_name = list[0]
@ -354,14 +245,12 @@ function multi:newSystemThreadedProcessor(cores)
c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...) c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...)
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
local obj = thread:newThread(name, func, ...) local obj = thread:newThread(name, func, ...)
_G["__THREADS"] = _G["__THREADS"] + 1
return packObj(obj) return packObj(obj)
end, true) end, true)
c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...) c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...)
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
local obj = multi[obj](multi, func, ...) local obj = multi[obj](multi, func, ...)
_G["__TASKS"] = _G["__TASKS"] + 1
return packObj(obj) return packObj(obj)
end, true) end, true)
@ -372,12 +261,13 @@ function multi:newSystemThreadedProcessor(cores)
"newEvent", "newEvent",
"newAlarm", "newAlarm",
"newStep", "newStep",
"newTStep" "newTStep",
"newService"
} }
for _, method in pairs(implement) do for _, method in pairs(implement) do
c[method] = function(self, ...) c[method] = function(self, ...)
proxy = self.spawnTask(method, ...):init(self.Name) proxy = self.spawnTask(method, ...):init()
references[proxy] = self references[proxy] = self
return proxy return proxy
end end
@ -467,68 +357,5 @@ function multi:newSystemThreadedProcessor(cores)
return loads return loads
end, true) end, true)
local check = function()
return c.local_cmd:pop()
end
thread:newThread(function()
while true do
local data = thread.hold(check)
if data then
thread:newThread(function()
local func = table.remove(data, 1)
local id = table.remove(data, 1)
local ret = {id, func(c, multi.unpack(data))}
c.local_cmd_return:push(ret)
end).OnError(multi.error)
end
end
end).OnError(multi.error)
return c return c
end end
-- Modify thread.hold to handle proxies
local thread_ref = thread.hold
function thread.hold(n, opt)
if type(n) == "table" and n.Type == multi.PROXY and n.isConnection() then
local ready = false
local args
local id = n.getThreadID()
local name = n:getUniqueName()
local func = multi:newTargetedFunction(id, n, "conn_"..multi.randomString(8), function(_name)
local multi, thread = require("multi"):init()
local obj = _G[_name]
local rets = {thread.hold(obj)}
for i,v in pairs(rets) do
if v.Type then
rets[i] = {_self_ref_ = "parent"}
end
end
return multi.unpack(rets)
end)
local conn
local args
handle = func(name)
conn = handle.OnReturn(function(...)
ready = true
args = {...}
handle.OnReturn:Unconnect(conn)
end)
local ret = {thread_ref(function()
if ready then
return multi.unpack(args) or multi.NIL
end
end, opt)}
for i,v in pairs(ret) do
if type(v) == "table" and v._self_ref_ == "parent" then
ret[i] = n.Parent
end
end
return multi.unpack(ret)
else
return thread_ref(n, opt)
end
end