Proxies work with connections now :D

This commit is contained in:
Ryan Ward 2023-05-24 23:17:18 -04:00
parent ab9e949b68
commit de9b08fa2e
4 changed files with 103 additions and 30 deletions

View File

@ -74,6 +74,33 @@ Allows the user to have multi auto set priorities (Requires chronos). Also adds
Added Added
--- ---
- multi:chop(obj) -- We cannot directly interact with a local object on lanes, so we chop the object and set some globals on the thread side. Should use like: `mulit:newProxy(multi:chop(multi:newThread(function() ... end)))`
- multi:newProxy(ChoppedObject) -- Creates a proxy object that allows you to interact with an object on a thread
**Note:** Objects with __index=table do not work with the proxy object! The object must have that function in it's own table for proxy to pick it up and have it work properly. Connections on a proxy allow you to subscribe to an event on the thread side of things. The function that is being connected to happens on the thread!
- multi:newSystemThreadedProcessor(name) -- Works like newProcessor(name) each object created returns a proxy object that you can use to interact with the objects on the system thread
```lua
package.path = "?/init.lua;?.lua;"..package.path
multi, thread = require("multi"):init({print=true})
THREAD, GLOBAL = require("multi.integration.lanesManager"):init()
stp = multi:newSystemThreadedProcessor("Test STP")
alarm = stp:newAlarm(3)
alarm.OnRing:Connect(function(alarm)
print("Hmm...", THREAD_NAME)
end)
```
Output:
```
Hmm... SystemThreadedJobQueue_A5tp
```
Internally the SystemThreadedProcessor uses a JobQueue to handle things. The proxy function allows you to interact with these objects as if they were on the main thread, though there actions are carried out on the main thread.
There are currently limitations to proxies. Connection proxy do not receive events on the non thread side. So connection metamethods do not work! Also you cannot use the proxy holds. For full features develop using a systemThreadedConnection() which does support all connection features. I planned on using STCs originally, but decided not to because I didn't want proxy objects to affect the non thread side of things! Subscribing to an event that isn't on the thread being proxied would cause the object to no longer be a proxy.
- thread:newProcessor(name) -- works mostly like a normal process, but all objects are wrapped within a thread. So if you create a few loops, you can use thread.hold() call threaded functions and wait and use all features that using coroutines provide. - thread:newProcessor(name) -- works mostly like a normal process, but all objects are wrapped within a thread. So if you create a few loops, you can use thread.hold() call threaded functions and wait and use all features that using coroutines provide.
- multi.Processors:getHandler() -- returns the thread handler for a process - multi.Processors:getHandler() -- returns the thread handler for a process
- multi.OnPriorityChanged(self, priority) -- Connection is triggered whenever the priority of an object is changed! - multi.OnPriorityChanged(self, priority) -- Connection is triggered whenever the priority of an object is changed!

View File

@ -319,7 +319,7 @@ function multi:newConnection(protect,func,kill)
if conn and not conn.lock then if conn and not conn.lock then
conn.lock = function() end conn.lock = function() end
for i = 1, #fast do for i = 1, #fast do
if conn.ref == fast[i] then if fast[conn.ref] == fast[i] then
fast[i] = conn.lock fast[i] = conn.lock
return self return self
end end
@ -334,7 +334,7 @@ function multi:newConnection(protect,func,kill)
if conn and conn.lock then if conn and conn.lock then
for i = 1, #fast do for i = 1, #fast do
if conn.lock == fast[i] then if conn.lock == fast[i] then
fast[i] = conn.ref fast[i] = fast[conn.ref]
return self return self
end end
end end
@ -376,7 +376,7 @@ function multi:newConnection(protect,func,kill)
function c:Unconnect(conn) function c:Unconnect(conn)
for i = 1, #fast do for i = 1, #fast do
if conn.ref == fast[i] then if fast[conn.ref] == fast[i] then
return table.remove(fast, i), i return table.remove(fast, i), i
end end
end end
@ -445,7 +445,8 @@ function multi:newConnection(protect,func,kill)
rawset(t,k,v) rawset(t,k,v)
end, end,
}) })
temp.ref = func temp.ref = multi.randomString(24)
fast[temp.ref] = func
temp.name = name temp.name = name
if self.rawadd then if self.rawadd then
self.rawadd = false self.rawadd = false
@ -718,7 +719,7 @@ function multi:newTimer()
end end
--Core Actors --Core Actors
function multi:newEvent(task) function multi:newEvent(task, func)
local c=self:newBase() local c=self:newBase()
c.Type=multi.EVENT c.Type=multi.EVENT
local task = task or function() end local task = task or function() end
@ -736,6 +737,9 @@ function multi:newEvent(task)
return self return self
end end
c.OnEvent = self:newConnection():fastMode() c.OnEvent = self:newConnection():fastMode()
if func then
c.OnEvent(func)
end
self:setPriority("core") self:setPriority("core")
c:setName(c.Type) c:setName(c.Type)
self:create(c) self:create(c)
@ -768,7 +772,7 @@ function multi:newUpdater(skip, func)
return c return c
end end
function multi:newAlarm(set) function multi:newAlarm(set, func)
local c=self:newBase() local c=self:newBase()
c.Type=multi.ALARM c.Type=multi.ALARM
c:setPriority("Low") c:setPriority("Low")
@ -801,6 +805,9 @@ function multi:newAlarm(set)
self.Parent.Pause(self) self.Parent.Pause(self)
return self return self
end end
if func then
c.OnRing(func)
end
c:setName(c.Type) c:setName(c.Type)
self:create(c) self:create(c)
return c return c

View File

@ -29,11 +29,20 @@ local multi, thread = require("multi"):init()
function multi:chop(obj) function multi:chop(obj)
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
list = {[0] = multi.randomString(12)} local list = {[0] = multi.randomString(12)}
_G[list[0]] = obj _G[list[0]] = obj
for i,v in pairs(obj) do for i,v in pairs(obj) do
if type(v) == "function" then if type(v) == "function" then
list[#list+1] = i table.insert(list, i)
elseif type(v) == "table" and v.Type == multi.CONNECTOR then
table.insert(list, {i, multi:newProxy(multi:chop(v)):init()})
-- local stc = "stc_"..list[0].."_"..i
-- list[-1][#list[-1] + 1] = {i, stc}
-- list[#list+1] = i
-- obj[stc] = multi:newSystemThreadedConnection(stc):init()
-- obj["_"..i.."_"] = function(...)
-- return obj[stc](...)
-- end
end end
end end
return list return list
@ -55,6 +64,7 @@ function multi:newProxy(list)
self.send = multi:newSystemThreadedQueue(self.name.."_S"):init() self.send = multi:newSystemThreadedQueue(self.name.."_S"):init()
self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init() self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init()
self.funcs = list self.funcs = list
self.conns = list[-1]
thread:newThread(function() thread:newThread(function()
while true do while true do
local data = thread.hold(check) local data = thread.hold(check)
@ -65,10 +75,15 @@ function multi:newProxy(list)
ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))} ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))}
else else
ret = {_G[list[0]][func](multi.unpack(data))} ret = {_G[list[0]][func](multi.unpack(data))}
end end
if ret[1] == _G[list[0]] then for i = 1,#ret do
-- We cannot return itself, that return can contain bad values. if type(ret[i]) == "table" and getmetatable(ret[i]) then
ret[1] = {_self_ref_ = true} setmetatable(ret[i],{}) -- remove that metatable, we do not need it on the other side!
end
if ret[i] == _G[list[0]] then
-- We cannot return itself, that return can contain bad values.
ret[i] = {_self_ref_ = true}
end
end end
table.insert(ret, 1, func) table.insert(ret, 1, func)
self.recv:push(ret) self.recv:push(ret)
@ -83,25 +98,36 @@ function multi:newProxy(list)
self.send = THREAD.waitFor(self.name.."_S") self.send = THREAD.waitFor(self.name.."_S")
self.recv = THREAD.waitFor(self.name.."_R") self.recv = THREAD.waitFor(self.name.."_R")
for _,v in pairs(self.funcs) do for _,v in pairs(self.funcs) do
self[v] = thread:newFunction(function(self,...) if type(v) == "table" then
if self == me then -- We got a connection
me.send:push({v, true, ...}) v[2]:init()
else
me.send:push({v, false, ...}) --setmetatable(v[2],getmetatable(multi:newConnection()))
end
return thread.hold(function() self[v[1]] = v[2]
local data = me.recv:peek() else
if data and data[1] == v then self[v] = thread:newFunction(function(self,...)
me.recv:pop() if self == me then
table.remove(data, 1) me.send:push({v, true, ...})
if type(data[1]) == "table" and data[1]._self_ref_ then else
-- So if we get a self return as a return, we should return the proxy! me.send:push({v, false, ...})
data[1] = me
end
return multi.unpack(data)
end end
end) return thread.hold(function()
end, true) local data = me.recv:peek()
if data and data[1] == v then
me.recv:pop()
table.remove(data, 1)
for i=1,#data do
if type(data[i]) == "table" and data[i]._self_ref_ then
-- So if we get a self return as a return, we should return the proxy!
data[i] = me
end
end
return multi.unpack(data)
end
end)
end, true)
end
end end
return self return self
end end
@ -154,6 +180,18 @@ function multi:newSystemThreadedProcessor(name, cores)
return self.spawnTask("newUpdater", func, notime):init() return self.spawnTask("newUpdater", func, notime):init()
end end
function c:newEvent(task, func)
return self.spawnTask("newEvent", task, func):init()
end
function c:newAlarm(set, func)
return self.spawnTask("newAlarm", set, func):init()
end
function c:newStep(start, reset, count, skip)
return self.spawnTask("newStep", start, reset, count, skip):init()
end
c.OnObjectCreated(function(proc, obj) c.OnObjectCreated(function(proc, obj)
if not(obj.Type == multi.UPDATER or obj.Type == multi.LOOP) then if not(obj.Type == multi.UPDATER or obj.Type == multi.LOOP) then
return multi.error("Invalid type!") return multi.error("Invalid type!")

View File

@ -151,6 +151,7 @@ multi:newThread("Scheduler Thread",function()
end) end)
multi:mainloop() multi:mainloop()
end).OnError(multi.error) end).OnError(multi.error)
connOut = multi:newSystemThreadedConnection("ConnectionNAMEHERE"):init() connOut = multi:newSystemThreadedConnection("ConnectionNAMEHERE"):init()
a=0 a=0
connOut(function(arg) connOut(function(arg)