newProxy implemented

This commit is contained in:
Ryan Ward 2023-05-21 09:43:44 -04:00
parent ea4be86ae2
commit ab9e949b68
3 changed files with 100 additions and 60 deletions

View File

@ -568,14 +568,8 @@ function multi:Pause()
multi.print("You cannot pause the main process. Doing so will stop all methods and freeze your program! However if you still want to use multi:_Pause()")
else
self.Active=false
local loop = self.Parent.Mainloop
for i=1,#loop do
if loop[i] == self then
multi.PausedObjects[self] = true
table.remove(loop,i)
break
end
end
self._Act = self.Act
self.Act = empty_func
end
return self
end
@ -589,8 +583,7 @@ function multi:Resume()
end
else
if self.Active==false then
table.insert(self.Parent.Mainloop,self)
multi.PausedObjects[self] = nil
self.Act = self._Act
self.Active=true
end
end
@ -668,6 +661,17 @@ function multi:newBase(ins)
c.Act=function() end
c.Parent=self
c.creationTime = clock()
function c:Pause()
c.Parent.Pause(self)
return self
end
function c:Resume()
c.Parent.Resume(self)
return self
end
if ins then
table.insert(self.Mainloop,ins,c)
else
@ -817,6 +821,7 @@ function multi:newLoop(func, notime)
return true
end
end
c.OnLoop = self:newConnection():fastMode()
if func then
@ -1232,6 +1237,10 @@ function thread._Requests()
end
end
function thread.exec(func)
func()
end
function thread.sleep(n)
thread._Requests()
thread.getRunningThread().lastSleep = clock()
@ -1755,8 +1764,8 @@ co_status = {
end
r1=nil r2=nil r3=nil r4=nil r5=nil
end,
["normal"] = function(thd,ref) end,
["running"] = function(thd,ref) end,
["normal"] = function(thd,ref) end,
["running"] = function(thd,ref) end,
["dead"] = function(thd,ref,task,i,th)
if ref.__processed then return end
if _ then
@ -1929,7 +1938,7 @@ function multi:mainloopRef()
for _D=#Loop,1,-1 do
__CurrentTask = Loop[_D]
ctask = __CurrentTask
ctask:Act()
if ctask then ctask:Act() end
__CurrentProcess = self
end
end

View File

@ -107,7 +107,7 @@ function multi:newSystemThreadedJobQueue(n)
return jid-1
end
local nFunc = 0
function c:newFunction(name,func,holup) -- This registers with the queue
function c:newFunction(name, func, holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
@ -116,7 +116,6 @@ function multi:newSystemThreadedJobQueue(n)
nFunc = nFunc + 1
c:registerFunction(name,func)
return thread:newFunction(function(...)
print("Called!")
local id = c:pushJob(name,...)
local link
local rets
@ -130,19 +129,21 @@ function multi:newSystemThreadedJobQueue(n)
return multi.unpack(rets) or multi.NIL
end
end)
end,holup), name
end, holup), name
end
thread:newThread("JobQueueManager",function()
while true do
local job = thread.hold(function()
return queueReturn:pop()
end)
local id = table.remove(job,1)
c.OnJobCompleted:Fire(id,multi.unpack(job))
if job then
local id = table.remove(job,1)
c.OnJobCompleted:Fire(id,multi.unpack(job))
end
end
end)
for i=1,c.cores do
multi:newSystemThread("SystemThreadedJobQueue",function(queue)
multi:newSystemThread("SystemThreadedJobQueue_"..multi.randomString(4),function(queue)
local multi, thread = require("multi"):init()
local idle = os.clock()
local clock = os.clock
@ -176,7 +177,7 @@ function multi:newSystemThreadedJobQueue(n)
end
end
end
end)
end).OnError(print)
thread:newThread("IdleHandler",function()
while true do
thread.hold(function()
@ -184,9 +185,9 @@ function multi:newSystemThreadedJobQueue(n)
end)
THREAD.sleep(.01)
end
end)
end).OnError(print)
multi:mainloop()
end,i).priority = thread.Priority_Core
end,i).OnError(print)
end
return c
end

View File

@ -26,52 +26,84 @@ local multi, thread = require("multi"):init()
-- Returns a handler that allows a user to interact with an object on another thread!
-- Create on the thread that you want to interact with, send over the handle
function multi:newProxy(obj)
function multi:chop(obj)
local multi, thread = require("multi"):init()
list = {[0] = multi.randomString(12)}
_G[list[0]] = obj
for i,v in pairs(obj) do
if type(v) == "function" then
list[#list+1] = i
end
end
return list
end
function multi:newProxy(list)
local c = {}
c.name = multi.randomString(12)
function c:init()
if not multi.isMainThread then
local multi, thread = nil, nil
if THREAD_NAME then
local multi, thread = require("multi"):init()
local function check()
return self.send:pop()
end
self.send = multi:newSystemThreadedQueue(self.name.."_S"):init()
self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init()
self.ref = obj
self.funcs = {}
for i, v in pairs(obj) do
if type(v) == "function" then
self.funcs[#self.funcs] = i
end
end
self.funcs = list
thread:newThread(function()
while true do
local data = thread.hold(check)
local func = table.remove(data, 1)
local ret = {self.ref[func](multi.unpack(data))}
local sref = table.remove(data, 1)
local ret
if sref then
ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))}
else
ret = {_G[list[0]][func](multi.unpack(data))}
end
if ret[1] == _G[list[0]] then
-- We cannot return itself, that return can contain bad values.
ret[1] = {_self_ref_ = true}
end
table.insert(ret, 1, func)
self.recv:push(ret)
end
end)
end).OnError(print)
return self
else
local multi, thread = require("multi"):init()
local me = self
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
self.send = THREAD.waitFor(self.name.."_S")
self.recv = THREAD.waitFor(self.name.."_R")
for _,v in pairs(self.funcs) do
self[v] = thread:newFunction(function(...)
self.send:push({v, ...})
self[v] = thread:newFunction(function(self,...)
if self == me then
me.send:push({v, true, ...})
else
me.send:push({v, false, ...})
end
return thread.hold(function()
local data = self.recv:peek()
if data[1] == v then
self.recv:pop()
thread.remove(data, 1)
local data = me.recv:peek()
if data and data[1] == v then
me.recv:pop()
table.remove(data, 1)
if type(data[1]) == "table" and data[1]._self_ref_ then
-- So if we get a self return as a return, we should return the proxy!
data[1] = me
end
return multi.unpack(data)
end
end)
end, true)
end
return self
end
end
@ -80,7 +112,7 @@ end
function multi:newSystemThreadedProcessor(name, cores)
local name = name or "STP_"multi.randomString(4) -- set a random name if none was given.
local name = name or "STP_"..multi.randomString(4) -- set a random name if none was given.
local autoscale = autoscale or false -- Will scale up the number of cores that the process uses.
local c = {}
@ -97,32 +129,30 @@ function multi:newSystemThreadedProcessor(name, cores)
c.parent = self
c.jobqueue = multi:newSystemThreadedJobQueue(c.cores)
local spawnThread = c.jobqueue:newFunction(function(name, func, ...)
c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...)
local multi, thread = require("multi"):init()
print("hmm")
local proxy = multi:newProxy(thread:newThread(name, func, ...))
multi:newTask(function()
proxy:init()
end)
local proxy = multi:newProxy(multi:chop(thread:newThread(name, func, ...))):init()
return proxy
end, true)
local spawnTask = c.jobqueue:newFunction(function(name, func, ...)
c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...)
local multi, thread = require("multi"):init()
local proxy = multi:newProxy(multi[obj](multi, func))
multi:newTask(function()
proxy:init()
end)
local obj = multi[obj](multi, func, ...)
local proxy = multi:newProxy(multi:chop(obj)):init()
return proxy
end, true)
c.newLoop = thread:newFunction(function(self, func, notime)
return spawnTask("newLoop", func, notime):init()
end, true)
function c:newLoop(func, notime)
return self.spawnTask("newLoop", func, notime):init()
end
c.newUpdater = thread:newFunction(function(self, skip, func)
return spawnTask("newUpdater", func, notime):init()
end, true)
function c:newTLoop(func, time)
return self.spawnTask("newTLoop", func, time):init()
end
function c:newUpdater(skip, func)
return self.spawnTask("newUpdater", func, notime):init()
end
c.OnObjectCreated(function(proc, obj)
if not(obj.Type == multi.UPDATER or obj.Type == multi.LOOP) then
@ -146,9 +176,9 @@ function multi:newSystemThreadedProcessor(name, cores)
return self.Name
end
c.newThread = thread:newFunction(function(self, name, func, ...)
return spawnThread(name, func, ...):init()
end, true)
function c:newThread(name, func, ...)
return self.spawnThread(name, func, ...):init()
end
function c:newFunction(func, holdme)
return c.jobqueue:newFunction(func, holdme)