thread.hold(proxy.conn)

This commit is contained in:
Ryan Ward 2023-05-27 00:10:21 -04:00
parent af38ebbb81
commit 5c03b34290
9 changed files with 249 additions and 50 deletions

View File

@ -74,6 +74,9 @@ Allows the user to have multi auto set priorities (Requires chronos). Also adds
Added
---
- multi:newTargetedFunction(ID, proc, name, func, holup) -- This is used internally to handle thread.hold(proxy.conn)
- proxy.getThreadID() -- Returns the threadID of the thread that the proxy is running in
- proxy:getUniqueName() -- Gets the special name that identifies the object on the thread the proxy refers to
- multi:chop(obj) -- We cannot directly interact with a local object on lanes, so we chop the object and set some globals on the thread side. Should use like: `mulit:newProxy(multi:chop(multi:newThread(function() ... end)))`
- multi:newProxy(ChoppedObject) -- Creates a proxy object that allows you to interact with an object on a thread
@ -99,7 +102,7 @@ Added
```
Internally the SystemThreadedProcessor uses a JobQueue to handle things. The proxy function allows you to interact with these objects as if they were on the main thread, though there actions are carried out on the main thread.
There are currently limitations to proxies. Connection proxy do not receive events on the non thread side. So connection metamethods do not work! Also you cannot use the proxy holds. For full features develop using a systemThreadedConnection() which does support all connection features. I planned on using STCs originally, but decided not to because I didn't want proxy objects to affect the non thread side of things! Subscribing to an event that isn't on the thread being proxied would cause the object to no longer be a proxy.
There are currently limitations to proxies. Connection proxy do not receive events on the non thread side. So connection metamethods do not work! thread.hold(proxy.conn) does work! The backend to get this to work was annoying :P
This event is subscribed to on the proxy threads side of things!
@ -114,7 +117,28 @@ Added
- STP:newThread(...)
- STP:newFunction(...)
If you would like to connect to a "STP Connection" object you can do so in a STP Function using hold and connect to the function OnReturn event or have the function wait (When in a coroutine it will only pause execution for that coroutine(multi:newThread(...))). The function is still runs on the Thread that the STP is running on. There is no guarantee that the function will run on the same thread each time, unlike with the multi objects/cothreads. Those stay on the systhread they are created on.
```lua
package.path = "?/init.lua;?.lua;"..package.path
multi, thread = require("multi"):init({print=true})
THREAD, GLOBAL = require("multi.integration.lanesManager"):init()
stp = multi:newSystemThreadedProcessor()
alarm = stp:newAlarm(3)
alarm.OnRing:Connect(function(alarm)
print("Hmm...", THREAD_NAME)
end)
thread:newThread(function()
print("Holding...")
local a = thread.hold(alarm.OnRing) -- it works :D
print("We work!")
end)
multi:mainloop()
```
- thread:newProcessor(name) -- works mostly like a normal process, but all objects are wrapped within a thread. So if you create a few loops, you can use thread.hold() call threaded functions and wait and use all features that using coroutines provide.
- multi.Processors:getHandler() -- returns the thread handler for a process
@ -312,6 +336,7 @@ Removed
Fixed
---
- multi.isMainThread was not properly handled in each integration. This has been resolved.
- Issue with pseudo threading env's being messed up. Required removal of getName and getID!
- connections being multiplied together would block the entire connection object from pushing events! This is not the desired effect I wanted. Now only the connection reference involved in the multiplication is locked!
- multi:reallocate(processor, index) has been fixed to work with the current changes of the library.

View File

@ -24,7 +24,7 @@ SOFTWARE.
local multi, thread = require("multi"):init()
if not (GLOBAL and THREAD) then
GLOBAL, THREAD = multi.integration.GLOBAL,multi.integration.THREAD
GLOBAL, THREAD = multi.integration.GLOBAL, multi.integration.THREAD
else
lanes = require("lanes")
end
@ -34,19 +34,29 @@ function multi:newSystemThreadedQueue(name)
local c = {}
c.Name = name
c.linda = lanes.linda()
function c:push(v)
self.linda:send("Q", v)
end
function c:pop()
return ({self.linda:receive(0, "Q")})[2]
end
function c:peek()
return self.linda:get("Q")
end
function c:init()
return self
end
GLOBAL[name or "_"] = c
if multi.isMainThread then
multi.integration.GLOBAL[name] = c
else
GLOBAL[name] = c
end
return c
end
@ -56,10 +66,6 @@ function multi:newSystemThreadedTable(name)
c.link = lanes.linda()
c.Name = name
-- function c:getIndex()
-- return c.link:dump()
-- end
function c:init()
return self
end
@ -73,7 +79,12 @@ function multi:newSystemThreadedTable(name)
end
})
GLOBAL[name or "_"] = c
if multi.isMainThread then
multi.integration.GLOBAL[name] = c
else
GLOBAL[name] = c
end
return c
end
@ -90,9 +101,9 @@ function multi:newSystemThreadedJobQueue(n)
function c:isEmpty()
return queueJob:peek()==nil
end
function c:doToAll(func)
function c:doToAll(func,...)
for i=1,c.cores do
doAll:push{ID,func}
doAll:push{ID,func,...}
end
ID = ID + 1
return self
@ -143,11 +154,12 @@ function multi:newSystemThreadedJobQueue(n)
end
end)
for i=1,c.cores do
multi:newSystemThread("SystemThreadedJobQueue_"..multi.randomString(4),function(queue)
multi:newSystemThread("STJQ_"..multi.randomString(8),function(queue)
local multi, thread = require("multi"):init()
local idle = os.clock()
local clock = os.clock
local ref = 0
_G["__QR"] = queueReturn
setmetatable(_G,{__index = funcs})
thread:newThread("JobHandler",function()
while true do
@ -170,9 +182,10 @@ function multi:newSystemThreadedJobQueue(n)
end)
if dat then
if dat[1]>ref then
ref = table.remove(dat, 1)
func = table.remove(dat, 1)
idle = clock()
ref = dat[1]
dat[2]()
func(unpack(dat))
doAll:pop()
end
end
@ -324,7 +337,11 @@ function multi:newSystemThreadedConnection(name)
return self
end
GLOBAL[name] = c
if multi.isMainThread then
multi.integration.GLOBAL[name] = c
else
GLOBAL[name] = c
end
return c
end

View File

@ -131,6 +131,9 @@ function multi.InitSystemThreadErrorHandler()
while true do
thread.yield()
_,data = __ConsoleLinda:receive(0, "Q")
if data then
print(data[1])
end
for i = #threads, 1, -1 do
temp = threads[i]
status = temp.thread.status

View File

@ -162,6 +162,7 @@ function multi:newSystemThreadedJobQueue(n)
local lastProc = clock()
local queueAll = love.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
local registry = {}
_G["__QR"] = queueReturn
setmetatable(_G,{__index = funcs})
thread:newThread("startUp",function()
while true do

View File

@ -66,6 +66,7 @@ multi.integration = {}
local THREAD = require("multi.integration.loveManager.threads")
local GLOBAL = THREAD.getGlobal()
local THREAD_ID = 1
multi.isMainThread = true
function multi:newSystemThread(name, func, ...)
local c = {}

View File

@ -152,6 +152,7 @@ function multi:newSystemThreadedJobQueue(n)
local lastProc = clock()
local queueAll = lovr.thread.getChannel("__JobQueue_"..jqc.."_queueAll")
local registry = {}
_G["__QR"] = queueReturn
setmetatable(_G,{__index = funcs})
thread:newThread("startUp",function()
while true do

View File

@ -35,7 +35,7 @@ local function _INIT(luvitThread, timer)
end
-- Step 1 get setup threads on luvit... Sigh how do i even...
local multi, thread = require("multi").init()
isMainThread = true
multi.isMainThread = true
function multi:canSystemThread()
return true
end

View File

@ -31,6 +31,7 @@ if multi.integration then
end
}
end
multi.isMainThread = true
local activator = require("multi.integration.pseudoManager.threads")
local GLOBAL, THREAD = activator.init(thread)

View File

@ -35,16 +35,27 @@ function multi:chop(obj)
if type(v) == "function" then
table.insert(list, i)
elseif type(v) == "table" and v.Type == multi.CONNECTOR then
table.insert(list, {i, multi:newProxy(multi:chop(v)):init()})
-- local stc = "stc_"..list[0].."_"..i
-- list[-1][#list[-1] + 1] = {i, stc}
-- list[#list+1] = i
-- obj[stc] = multi:newSystemThreadedConnection(stc):init()
-- obj["_"..i.."_"] = function(...)
-- return obj[stc](...)
-- end
v.getThreadID = function() -- Special function we are adding
return THREAD_ID
end
v.getUniqueName = function(self)
return self.__link_name
end
local l = multi:chop(v)
v.__link_name = l[0]
v.__name = i
table.insert(list, {i, multi:newProxy(l):init()})
end
end
table.insert(list, "isConnection")
if obj.Type == multi.CONNECTOR then
obj.isConnection = function() return true end
else
obj.isConnection = function() return false end
end
return list
end
@ -97,15 +108,14 @@ function multi:newProxy(list)
THREAD = multi.integration.THREAD
self.send = THREAD.waitFor(self.name.."_S")
self.recv = THREAD.waitFor(self.name.."_R")
self.Type = multi.PROXY
for _,v in pairs(self.funcs) do
if type(v) == "table" then
-- We got a connection
v[2]:init()
--setmetatable(v[2],getmetatable(multi:newConnection()))
self[v[1]] = v[2]
v[2].Parent = self
else
lastObj = self
self[v] = thread:newFunction(function(self,...)
if self == me then
me.send:push({v, true, ...})
@ -119,7 +129,6 @@ function multi:newProxy(list)
table.remove(data, 1)
for i=1,#data do
if type(data[i]) == "table" and data[i]._self_ref_ then
-- So if we get a self return as a return, we should return the proxy!
data[i] = me
end
end
@ -136,9 +145,40 @@ function multi:newProxy(list)
return c
end
function multi:newSystemThreadedProcessor(name, cores)
multi.PROXY = "proxy"
local name = name or "STP_"..multi.randomString(4) -- set a random name if none was given.
local targets = {}
local nFunc = 0
function multi:newTargetedFunction(ID, proc, name, func, holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_TFunc_"..nFunc
end
nFunc = nFunc + 1
proc.jobqueue:registerFunction(name, func)
return thread:newFunction(function(...)
local id = proc:pushJob(ID, name, ...)
local link
local rets
link = proc.jobqueue.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
end
end)
return thread.hold(function()
if rets then
return multi.unpack(rets) or multi.NIL
end
end)
end, holup), name
end
local jid = -1
function multi:newSystemThreadedProcessor(cores)
local name = "STP_"..multi.randomString(4) -- set a random name if none was given.
local autoscale = autoscale or false -- Will scale up the number of cores that the process uses.
local c = {}
@ -154,54 +194,123 @@ function multi:newSystemThreadedProcessor(name, cores)
c.OnObjectCreated = multi:newConnection()
c.parent = self
c.jobqueue = multi:newSystemThreadedJobQueue(c.cores)
c.targetedQueue = multi:newSystemThreadedQueue(name.."_target"):init()
c.jobqueue:registerFunction("enable_targets",function(name)
local multi, thread = require("multi"):init()
local qname = THREAD_NAME .. "_t_queue"
local targetedQueue = THREAD.waitFor(name):init()
local tjq = multi:newSystemThreadedQueue(qname):init()
targetedQueue:push({tonumber(THREAD_ID), qname})
multi:newThread("TargetedJobHandler", function()
local queueReturn = _G["__QR"]
while true do
local dat = thread.hold(function()
return tjq:pop()
end)
if dat then
thread:newThread("test",function()
local name = table.remove(dat, 1)
local jid = table.remove(dat, 1)
local args = table.remove(dat, 1)
queueReturn:push{jid, _G[name](multi.unpack(args)), queue}
end).OnError(multi.error)
end
end
end).OnError(multi.error)
end)
function c:pushJob(ID, name, ...)
targets[ID]:push{name, jid, {...}}
jid = jid - 1
return jid + 1
end
c.jobqueue:doToAll(function(name)
enable_targets(name)
end, name.."_target")
local count = 0
while count < c.cores do
local dat = c.targetedQueue:pop()
if dat then
targets[dat[1]] = multi.integration.THREAD.waitFor(dat[2]):init()
count = count + 1
end
end
c.jobqueue:registerFunction("packObj",function(obj)
local multi, thread = require("multi"):init()
obj.getThreadID = function() -- Special function we are adding
return THREAD_ID
end
obj.getUniqueName = function(self)
return self.__link_name
end
local list = multi:chop(obj)
obj.__link_name = list[0]
local proxy = multi:newProxy(list):init()
return proxy
end)
c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...)
local multi, thread = require("multi"):init()
local proxy = multi:newProxy(multi:chop(thread:newThread(name, func, ...))):init()
return proxy
local obj = thread:newThread(name, func, ...)
return packObj(obj)
end, true)
c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...)
local multi, thread = require("multi"):init()
local obj = multi[obj](multi, func, ...)
local proxy = multi:newProxy(multi:chop(obj)):init()
return proxy
return packObj(obj)
end, true)
function c:newLoop(func, notime)
return self.spawnTask("newLoop", func, notime):init()
proxy = self.spawnTask("newLoop", func, notime):init()
proxy.__proc = self
return proxy
end
function c:newTLoop(func, time)
return self.spawnTask("newTLoop", func, time):init()
proxy = self.spawnTask("newTLoop", func, time):init()
proxy.__proc = self
return proxy
end
function c:newUpdater(skip, func)
return self.spawnTask("newUpdater", func, notime):init()
proxy = self.spawnTask("newUpdater", func, notime):init()
proxy.__proc = self
return proxy
end
function c:newEvent(task, func)
return self.spawnTask("newEvent", task, func):init()
proxy = self.spawnTask("newEvent", task, func):init()
proxy.__proc = self
return proxy
end
function c:newAlarm(set, func)
return self.spawnTask("newAlarm", set, func):init()
proxy = self.spawnTask("newAlarm", set, func):init()
proxy.__proc = self
return proxy
end
function c:newStep(start, reset, count, skip)
return self.spawnTask("newStep", start, reset, count, skip):init()
proxy = self.spawnTask("newStep", start, reset, count, skip):init()
proxy.__proc = self
return proxy
end
function c:newTStep(start ,reset, count, set)
return self.spawnTask("newTStep", start, reset, count, set):init()
proxy = self.spawnTask("newTStep", start, reset, count, set):init()
proxy.__proc = self
return proxy
end
c.OnObjectCreated(function(proc, obj)
if not(obj.Type == multi.UPDATER or obj.Type == multi.LOOP) then
return multi.error("Invalid type!")
end
end)
function c:getHandler()
-- Not needed
end
@ -219,7 +328,9 @@ function multi:newSystemThreadedProcessor(name, cores)
end
function c:newThread(name, func, ...)
return self.spawnThread(name, func, ...):init()
proxy = self.spawnThread(name, func, ...):init()
proxy.__proc = self
return proxy
end
function c:newFunction(func, holdme)
@ -249,3 +360,42 @@ function multi:newSystemThreadedProcessor(name, cores)
return c
end
-- Modify thread.hold to handle proxies
local thread_ref = thread.hold
function thread.hold(n, opt)
if type(n) == "table" and n.Type == multi.PROXY and n.isConnection() then
local ready = false
local args
local id = n.getThreadID()
local name = n:getUniqueName()
local func = multi:newTargetedFunction(id, n.Parent.__proc, "conn_"..multi.randomString(8), function(_name)
local multi, thread = require("multi"):init()
local obj = _G[_name]
local rets = {thread.hold(obj)}
for i,v in pairs(rets) do
if v.Type then
rets[i] = {_self_ref_ = "parent"}
end
end
return unpack(rets)
end)
func(name).OnReturn(function(...)
ready = true
args = {...}
end)
local ret = {thread_ref(function()
if ready then
return multi.unpack(args) or multi.NIL
end
end, opt)}
for i,v in pairs(ret) do
if type(v) == "table" and v._self_ref_ == "parent" then
ret[i] = n.Parent
end
end
return unpack(ret)
else
return thread_ref(n, opt)
end
end