This commit is contained in:
Ryan Ward 2023-06-17 21:33:34 -04:00
parent 257ed03728
commit 6fe10b22ab
4 changed files with 107 additions and 89 deletions

View File

@ -516,6 +516,8 @@ local function isolateFunction(func, env)
end
end
multi.isolateFunction = isolateFunction
function multi:Break()
self:Pause()
self.Active=nil

View File

@ -345,3 +345,4 @@ function multi:newSystemThreadedConnection(name)
return c
end
require("multi.integration.sharedExtensions")

View File

@ -185,7 +185,6 @@ multi.integration = {} -- for module creators
multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD
require("multi.integration.lanesManager.extensions")
require("multi.integration.sharedExtensions")
return {
init = function()
return GLOBAL, THREAD

View File

@ -1,4 +1,4 @@
--[[
--[[ todo finish the targeted job!
MIT License
Copyright (c) 2023 Ryan Ward
@ -22,6 +22,26 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
function copy(obj)
if type(obj) ~= 'table' then return obj end
local res = {}
for k, v in pairs(obj) do res[copy(k)] = copy(v) end
return res
end
function tprint (tbl, indent)
if not indent then indent = 0 end
for k, v in pairs(tbl) do
formatting = string.rep(" ", indent) .. k .. ": "
if type(v) == "table" then
print(formatting)
tprint(v, indent+1)
else
print(formatting .. tostring(v))
end
end
end
local multi, thread = require("multi"):init()
-- Returns a handler that allows a user to interact with an object on another thread!
@ -64,10 +84,12 @@ function multi:newProxy(list)
local c = {}
c.name = multi.randomString(12)
c.is_init = false
function c:init()
function c:init(proc_name)
local multi, thread = nil, nil
if THREAD_ID>0 then
if not(c.is_init) then
c.is_init = true
local multi, thread = require("multi"):init()
local function check()
return self.send:pop()
@ -75,7 +97,9 @@ function multi:newProxy(list)
self.send = multi:newSystemThreadedQueue(self.name.."_S"):init()
self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init()
self.funcs = list
self.conns = list[-1]
self._funcs = copy(list)
self.Type = multi.PROXY
self.TID = THREAD_ID
thread:newThread(function()
while true do
local data = thread.hold(check)
@ -104,25 +128,31 @@ function multi:newProxy(list)
end).OnError(print)
return self
else
print("INIT IN",THREAD_NAME)
local multi, thread = require("multi"):init()
local me = self
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
self.send = THREAD.waitFor(self.name.."_S")
self.recv = THREAD.waitFor(self.name.."_R")
self.proc_name = proc_name
if multi.integration then
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
end
self.send = THREAD.waitFor(self.name.."_S"):init()
self.recv = THREAD.waitFor(self.name.."_R"):init()
self.Type = multi.PROXY
for _,v in pairs(self.funcs) do
if type(v) == "table" then
-- We have a connection
v[2]:init()
print("Init Conn",v[1],THREAD_NAME)
v[2]:init(proc_name)
self["_"..v[1]] = v[2]
v[2].Parent = self
setmetatable(v[2],getmetatable(multi:newConnection()))
self[v[1]] = multi:newConnection()
thread:newThread(function()
print("HOLD:","_"..v[1],self["_"..v[1]].Type)
while true do
self[v[1]]:Fire(thread.hold(alarm["_"..v[1]]))
self[v[1]]:Fire(thread.hold(self["_"..v[1]]))
end
end)
else
@ -151,13 +181,24 @@ function multi:newProxy(list)
return self
end
end
function c:getTransferable()
local multi, thread = require("multi"):init()
local cp = {}
cp.name = self.name
cp.funcs = copy(self._funcs)
cp._funcs = copy(self._funcs)
cp.Type = self.Type
cp.init = self.init
return cp
end
return c
end
local targets = {}
local references = {}
local nFunc = 0
function multi:newTargetedFunction(ID, proc, name, func, holup) -- This registers with the queue
function multi:newTargetedFunction(ID, name, func, holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
@ -181,6 +222,12 @@ function multi:newTargetedFunction(ID, proc, name, func, holup) -- This register
end)
end, holup), name
end
-- local qname = name .. "_tq_" .. THREAD_ID
-- local rqname = name .. "_rtq_" .. THREAD_ID
local function getQueue(name)
return THREAD.waitFor(name):init()
end
local jid = -1
function multi:newSystemThreadedProcessor(cores)
@ -202,27 +249,30 @@ function multi:newSystemThreadedProcessor(cores)
c.OnObjectCreated = multi:newConnection()
c.parent = self
c.jobqueue = multi:newSystemThreadedJobQueue(c.cores)
c.targetedQueue = multi:newSystemThreadedQueue(name.."_target"):init()
c.jobqueue:registerFunction("STP_enable_targets",function(name)
local multi, thread = require("multi"):init()
local qname = THREAD_NAME .. "_t_queue"
local targetedQueue = THREAD.waitFor(name):init()
local qname = name .. "_tq_" .. THREAD_ID
local rqname = name .. "_rtq_" .. THREAD_ID
local tjq = multi:newSystemThreadedQueue(qname):init()
targetedQueue:push({tonumber(THREAD_ID), qname})
multi:newThread("TargetedJobHandler", function()
local queueReturn = _G["__QR"]
local th
while true do
local dat = thread.hold(function()
return tjq:pop()
end)
if dat then
thread:newThread("JQ-TargetThread",function()
th = thread:newThread("JQ-TargetThread",function()
local name = table.remove(dat, 1)
local jid = table.remove(dat, 1)
local func = table.remove(dat, 1)
local args = table.remove(dat, 1)
queueReturn:push{jid, _G[name](multi.unpack(args)), queue}
end).OnError(multi.error)
th.OnError(function(self,err)
-- We want to pass this to the other calling thread incase
rqname:push{jid, err}
end)
rqname:push{jid, func(multi.unpack(args))}
end)
end
end
end).OnError(multi.error)
@ -248,16 +298,6 @@ function multi:newSystemThreadedProcessor(cores)
_G["__TASKS"] = 0
end, name.."_target")
local count = 0
while count < c.cores do
local dat = c.targetedQueue:pop()
if dat then
targets[dat[1]] = multi.integration.THREAD.waitFor(dat[2]):init()
table.insert(c.proc_list, dat[1]) -- Add thread_id to proc list
count = count + 1
end
end
c.jobqueue:registerFunction("packObj",function(obj)
local multi, thread = require("multi"):init()
obj.getThreadID = function() -- Special functions we are adding
@ -290,46 +330,42 @@ function multi:newSystemThreadedProcessor(cores)
return packObj(obj)
end, true)
function c:newLoop(func, notime)
proxy = self.spawnTask("newLoop", func, notime):init()
proxy.__proc = self
local implement = {
"newLoop",
"newTLoop",
"newUpdater",
"newEvent",
"newAlarm",
"newStep",
"newTStep"
}
for _, method in pairs(implement) do
c[method] = function(self, ...)
proxy = self.spawnTask(method, ...):init(self.Name)
references[proxy] = self
return proxy
end
end
function c:newThread(name, func, ...)
proxy = self.spawnThread(name, func, ...):init(self.Name)
references[proxy] = self
table.insert(self.threads, proxy)
return proxy
end
function c:newTLoop(func, time)
proxy = self.spawnTask("newTLoop", func, time):init()
proxy.__proc = self
return proxy
function c:newFunction(func, holdme)
return c.jobqueue:newFunction(func, holdme)
end
function c:newUpdater(skip, func)
proxy = self.spawnTask("newUpdater", func, notime):init()
proxy.__proc = self
return proxy
end
function c:newEvent(task, func)
proxy = self.spawnTask("newEvent", task, func):init()
proxy.__proc = self
return proxy
end
function c:newAlarm(set, func)
proxy = self.spawnTask("newAlarm", set, func):init()
proxy.__proc = self
return proxy
end
function c:newStep(start, reset, count, skip)
proxy = self.spawnTask("newStep", start, reset, count, skip):init()
proxy.__proc = self
return proxy
end
function c:newTStep(start ,reset, count, set)
proxy = self.spawnTask("newTStep", start, reset, count, set):init()
proxy.__proc = self
return proxy
function c:newSharedTable(name)
if not name then multi.error("You must provide a name when creating a table!") end
local tbl_name = "TABLE_"..multi.randomString(8)
c.jobqueue:doToAll(function(tbl_name, interaction)
_G[interaction] = THREAD.waitFor(tbl_name):init()
end, tbl_name, name)
return multi:newSystemThreadedTable(tbl_name):init()
end
function c:getHandler()
@ -348,26 +384,6 @@ function multi:newSystemThreadedProcessor(cores)
return self.Name
end
function c:newThread(name, func, ...)
proxy = self.spawnThread(name, func, ...):init()
proxy.__proc = self
table.insert(self.threads, proxy)
return proxy
end
function c:newFunction(func, holdme)
return c.jobqueue:newFunction(func, holdme)
end
function c:newSharedTable(name)
if not name then multi.error("You must provide a name when creating a table!") end
local tbl_name = "TABLE_"..multi.randomString(8)
c.jobqueue:doToAll(function(tbl_name, interaction)
_G[interaction] = THREAD.waitFor(tbl_name):init()
end, tbl_name, name)
return multi:newSystemThreadedTable(tbl_name):init()
end
function c.run()
return self
end
@ -428,7 +444,7 @@ function thread.hold(n, opt)
local args
local id = n.getThreadID()
local name = n:getUniqueName()
local func = multi:newTargetedFunction(id, n.Parent.__proc, "conn_"..multi.randomString(8), function(_name)
local func = multi:newTargetedFunction(id, references[n.Parent], "conn_"..multi.randomString(8), function(_name)
local multi, thread = require("multi"):init()
local obj = _G[_name]
local rets = {thread.hold(obj)}
@ -437,7 +453,7 @@ function thread.hold(n, opt)
rets[i] = {_self_ref_ = "parent"}
end
end
return unpack(rets)
return multi.unpack(rets)
end)
local conn
@ -460,7 +476,7 @@ function thread.hold(n, opt)
end
end
return unpack(ret)
return multi.unpack(ret)
else
return thread_ref(n, opt)
end