Working on 16.0.0 #53

Merged
rayaman merged 120 commits from v16.0.0 into master 2024-02-25 00:00:51 -05:00
13 changed files with 384 additions and 388 deletions
Showing only changes of commit d36204c87f - Show all commits

View File

@ -74,6 +74,7 @@ Allows the user to have multi auto set priorities (Requires chronos). Also adds
Added
---
- thread.hold will now use a custom hold method for objects with a `Hold` method. This is called like `obj:Hold(opt)`. The only argument passed is the optional options table that thread.hold can pass. There is an exception for connection objects. While they do contain a Hold method, the Hold method isn't used and is there for proxy objects, though they can be used in non proxy/thread situations. Hold returns all the arguments that the connection object was fired with.
- shared_table = STP:newSharedTable(tbl_name) -- Allows you to create a shared table that all system threads in a process have access to. Returns a reference to that table for use on the main thread. Sets `_G[tbl_name]` on the system threads so you can access it there.
```lua
package.path = "?/init.lua;?.lua;"..package.path
@ -119,10 +120,6 @@ Added
STJQ_cPXT8GOx We work! We work!!!
```
- STP:getLoad(type) -- returns a table where the index is the threadID and the value is the number of objects[type] running on that thread. `type`: "threads" for coroutines running or nil for all other objects running.
- multi:newTargetedFunction(ID, proc, name, func, holup) -- This is used internally to handle thread.hold(proxy.conn)
- proxy.getThreadID() -- Returns the threadID of the thread that the proxy is running in
- proxy:getUniqueName() -- Gets the special name that identifies the object on the thread the proxy refers to
- multi:chop(obj) -- We cannot directly interact with a local object on lanes, so we chop the object and set some globals on the thread side. Should use like: `mulit:newProxy(multi:chop(multi:newThread(function() ... end)))`
- multi:newProxy(ChoppedObject) -- Creates a proxy object that allows you to interact with an object on a thread
@ -148,67 +145,105 @@ Added
```
Internally the SystemThreadedProcessor uses a JobQueue to handle things. The proxy function allows you to interact with these objects as if they were on the main thread, though there actions are carried out on the main thread.
Connection proxies break the rules a bit. Normally methods should always work on the thread side, however for connections in order to have actions work on the thread side you would call the connection using `obj._connName` instead of calling `obj.connName`. This allows you to have more control over connection events. See example below:
Proxies can also be shared between threads, just remember to use proxy:getTransferable() before transferring and proxy:init() on the other end. (We need to avoid copying over coroutines)
The work done with proxies negates the usage of multi:newSystemThreadedConnection(), the only difference is you lose the metatables from connections.
You cannot connect directly to a proxy connection on the non proxy thread, you can however use proxy_conn:Hold() or thread.hold(proxy_conn) to emulate this, see below.
```lua
package.path = "?/init.lua;?.lua;"..package.path
multi, thread = require("multi"):init({print=true})
multi, thread = require("multi"):init({print=true, warn=true, error=true})
THREAD, GLOBAL = require("multi.integration.lanesManager"):init()
stp = multi:newSystemThreadedProcessor(8)
alarm = stp:newAlarm(3)
tloop = stp:newTLoop(nil, 1)
-- This doesn't work since this event has already been subscribed to internally on the thread to get thread.hold(alarm.OnRing) to work. But as many events to alarm.OnRing can be made!
thread:newThread(function()
print("Hold on proxied connection", thread.hold(alarm._OnRing))
end)
multi:newSystemThread("Testing proxy copy",function(tloop)
local function tprint (tbl, indent)
if not indent then indent = 0 end
for k, v in pairs(tbl) do
formatting = string.rep(" ", indent) .. k .. ": "
if type(v) == "table" then
print(formatting)
tprint(v, indent+1)
else
print(formatting .. tostring(v))
end
end
end
local multi, thread = require("multi"):init()
tloop = tloop:init()
print("tloop type:",tloop.Type)
print("Testing proxies on other threads")
thread:newThread(function()
while true do
thread.hold(tloop.OnLoop)
print(THREAD_NAME,"Loopy")
end
end)
tloop.OnLoop(function(a)
print(THREAD_NAME, "Got loop...")
end)
multi:mainloop()
end, tloop:getTransferable()).OnError(multi.error)
alarm.OnRing(function(a)
print("OnRing",a, THREAD_NAME, THREAD_ID)
end)
print("alarm.OnRing", alarm.OnRing.Type)
print("alarm._OnRing", alarm._OnRing.Type)
print("tloop", tloop.Type)
thread:newThread(function()
print("Hold on proxied no proxy connection", thread.hold(alarm.OnRing))
print("Holding...")
thread.hold(tloop.OnLoop)
print("Held on proxied no proxy connection 1")
end).OnError(print)
thread:newThread(function()
tloop.OnLoop:Hold()
print("held on proxied no proxy connection 2")
end)
tloop.OnLoop(function()
print("OnLoop",THREAD_NAME)
end)
thread:newThread(function()
print("Hold on proxied no proxy connection", thread.hold(alarm.OnRing))
end)
-- This doesn't work since this event has already been subscribed to internally on the thread to get thread.hold(alarm.OnRing) to work. But as many events to alarm.OnRing can be made!
thread:newThread(function()
print("Hold on proxied connection", thread.hold(alarm._OnRing))
end)
alarm._OnRing(function(a)
print("_OnRing",a, THREAD_NAME, THREAD_ID)
a:Reset(1)
end)
while true do
tloop.OnLoop:Hold()
print("OnLoop",THREAD_NAME)
end
end).OnError(multi.error)
multi:mainloop()
```
Output:
```
INFO: Integrated Lanes Threading!
alarm.OnRing connector
alarm._OnRing proxy
_OnRing table: 025EB128 STJQ_cjKsEZHg 1 <-- This can change each time you run this example!
OnRing table: 018BC0C0 MAIN_THREAD 0
Hold on proxied no proxy connection table: 018BC0C0 nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil
Hold on proxied no proxy connection table: 018BC0C0 nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil
_OnRing table: 025EB128 STJQ_cjKsEZHg 1
OnRing table: 018BC0C0 MAIN_THREAD 0
_OnRing table: 025EB128 STJQ_cjKsEZHg 1
OnRing table: 018BC0C0 MAIN_THREAD 0
INFO: Integrated Lanes Threading! 1
tloop proxy
Holding...
tloop type: proxy
Testing proxies on other threads
OnLoop STJQ_W9SZGB6Y
STJQ_W9SZGB6Y Got loop...
OnLoop MAIN_THREAD
Testing proxy copy Loopy
Held on proxied no proxy connection 1
held on proxied no proxy connection 2
OnLoop STJQ_W9SZGB6Y
STJQ_W9SZGB6Y Got loop...
Testing proxy copy Loopy
OnLoop MAIN_THREAD
OnLoop STJQ_W9SZGB6Y
STJQ_W9SZGB6Y Got loop...
... (Will repeat ever second now)
_OnRing table: 025EB128 STJQ_cjKsEZHg 1
... (Will repeat every second)
OnRing table: 018BC0C0 MAIN_THREAD 0
Testing proxy copy Loopy
OnLoop MAIN_THREAD
OnLoop STJQ_W9SZGB6Y
STJQ_W9SZGB6Y Got loop...
...
```
The proxy version can only subscribe to events on the proxy thread, which means that connection metamethods will not work with the proxy version (`_OnRing` on the non proxy thread side), but the (`OnRing`) version will work. Cleverly handling the proxy thread and the non proxy thread will allow powerful connection logic. Also this is not a full system threaded connection. **Proxies should only be used between 2 threads!** To keep things fast I'm using simple queues to transfer data. There is no guarantee that things will work!
@ -222,6 +257,7 @@ Added
- proxyStep = STP:newStep(...)
- proxyTStep = STP:newTStep(...)
- proxyThread = STP:newThread(...)
- proxyService = STP:newService(...)
- threadedFunction = STP:newFunction(...)
Unique:
@ -449,6 +485,8 @@ Removed
Fixed
---
- Issue with luajit w/5.2 compat breaking with coroutine.running(), fixed the script to properly handle so thread.isThread() returns as expected!
- Issue with coroutine based threads where they weren't all being scheduled due to a bad for loop. Replaced with a while to ensure all threads are consumed properly. If a thread created a thread that created a thread that may or may not be on the same process, things got messed up due to the original function not being built with these abstractions in mind.
- Issue with thread:newFunction() where a threaded function will keep a record of their returns and pass them to future calls of the function.
- Issue with multi:newTask(func) not properly handling tasks to be removed. Now uses a thread internally to manage things.
- multi.isMainThread was not properly handled in each integration. This has been resolved.

169
init.lua
View File

@ -33,6 +33,7 @@ local threadManager
local __CurrentConnectionThread
multi.unpack = table.unpack or unpack
multi.pack = table.pack or function(...) return {...} end
if table.unpack then unpack = table.unpack end
@ -81,6 +82,7 @@ multi.TSTEP = "tstep"
multi.THREAD = "thread"
multi.SERVICE = "service"
multi.PROXY = "proxy"
multi.THREADEDFUNCTION = "threaded_function"
if not _G["$multi"] then
_G["$multi"] = {multi = multi, thread = thread}
@ -133,9 +135,7 @@ function multi.Stop()
mainloopActive = false
end
local function pack(...)
return {...}
end
local pack = multi.pack
--Processor
local priorityTable = {[false]="Disabled",[true]="Enabled"}
@ -352,7 +352,7 @@ function multi:newConnection(protect,func,kill)
for i=1,#fast do
local suc, err = pcall(fast[i], ...)
if not suc then
print(err)
multi.error(err)
end
if kill then
table.insert(kills,i)
@ -470,6 +470,10 @@ function multi:newConnection(protect,func,kill)
return temp
end
c.Hold = thread:newFunction(function(self)
return thread.hold(self)
end, true)
c.connect=c.Connect
c.GetConnection=c.getConnection
c.HasConnections = c.hasConnections
@ -487,24 +491,6 @@ function multi:newConnection(protect,func,kill)
return c
end
multi.enableOptimization = multi:newConnection()
multi.optConn = multi:newConnection(true)
multi.optConn(function(msg)
table.insert(optimization_stats, msg)
end)
function multi:getOptimizationConnection()
return multi.optConn
end
function multi:getOptimizationStats()
return optimization_stats
end
function multi:isFindingOptimizing()
return find_optimization
end
-- Used with ISO Threads
local function isolateFunction(func, env)
if setfenv then
@ -516,6 +502,8 @@ local function isolateFunction(func, env)
end
end
multi.isolateFunction = isolateFunction
function multi:Break()
self:Pause()
self.Active=nil
@ -635,6 +623,7 @@ function multi:isDone()
end
function multi:create(ref)
ref.UID = "U"..multi.randomString(12)
self.OnObjectCreated:Fire(ref, self)
return self
end
@ -684,10 +673,6 @@ function multi:newBase(ins)
return c
end
multi.OnObjectCreated=multi:newConnection()
multi.OnObjectDestroyed=multi:newConnection()
multi.OnLoad = multi:newConnection(nil,nil,true)
ignoreconn = false
function multi:newTimer()
local c={}
c.Type=multi.TIMER
@ -738,7 +723,7 @@ function multi:newEvent(task, func)
task=func
return self
end
c.OnEvent = self:newConnection():fastMode()
c.OnEvent = self:newConnection()
if func then
c.OnEvent(func)
end
@ -765,7 +750,7 @@ function multi:newUpdater(skip, func)
skip=n
return self
end
c.OnUpdate = self:newConnection():fastMode()
c.OnUpdate = self:newConnection()
c:setName(c.Type)
if func then
c.OnUpdate(func)
@ -801,7 +786,7 @@ function multi:newAlarm(set, func)
t = clock()
return self
end
c.OnRing = self:newConnection():fastMode()
c.OnRing = self:newConnection()
function c:Pause()
count = clock()
self.Parent.Pause(self)
@ -831,7 +816,7 @@ function multi:newLoop(func, notime)
end
end
c.OnLoop = self:newConnection():fastMode()
c.OnLoop = self:newConnection()
if func then
c.OnLoop(func)
@ -879,9 +864,9 @@ function multi:newStep(start,reset,count,skip)
return true
end
c.Reset=c.Resume
c.OnStart = self:newConnection():fastMode()
c.OnStep = self:newConnection():fastMode()
c.OnEnd = self:newConnection():fastMode()
c.OnStart = self:newConnection()
c.OnStep = self:newConnection()
c.OnEnd = self:newConnection()
function c:Break()
self.Active=nil
return self
@ -902,40 +887,49 @@ function multi:newStep(start,reset,count,skip)
return c
end
function multi:newTLoop(func,set)
function multi:newTLoop(func, set)
local c=self:newBase()
c.Type=multi.TLOOP
c.set=set or 0
c.timer=self:newTimer()
c.life=0
c:setPriority("Low")
function c:Act()
if self.timer:Get()>=self.set then
if self.timer:Get() >= self.set then
self.life=self.life+1
self.timer:Reset()
self.OnLoop:Fire(self,self.life)
self.OnLoop:Fire(self, self.life)
return true
end
end
function c:Set(set)
self.set = set
end
function c:Resume()
self.Parent.Resume(self)
self.timer:Resume()
return self
end
function c:Pause()
self.timer:Pause()
self.Parent.Pause(self)
return self
end
c.OnLoop = self:newConnection():fastMode()
c.OnLoop = self:newConnection()
if func then
c.OnLoop(func)
end
c:setName(c.Type)
self:create(c)
return c
end
@ -1111,7 +1105,7 @@ function multi:newProcessor(name, nothread)
function c:newFunction(func, holdme)
return thread:newFunctionBase(function(...)
return c:newThread("Threaded Function Handler", func, ...)
return c:newThread("Process Threaded Function Handler", func, ...)
end, holdme)()
end
@ -1142,6 +1136,7 @@ function multi:newProcessor(name, nothread)
end
table.insert(processes,c)
self:create(c)
return c
end
@ -1207,7 +1202,7 @@ function multi:getTasks()
end
function thread.request(t,cmd,...)
thread.requests[t.thread] = {cmd,{...}}
thread.requests[t.thread] = {cmd, multi.pack(...)}
end
function thread.getRunningThread()
@ -1246,14 +1241,18 @@ local function conn_test(conn)
local args
local func = function(...)
ready = true
args = {...}
args = multi.pack(...)
end
local ref = conn(func)
return function()
if ready then
conn:Unconnect(ref)
return multi.unpack(args) or multi.NIL
if #args==0 then
return multi.NIL
else
return multi.unpack(args)
end
end
end
end
@ -1265,7 +1264,7 @@ function thread.chain(...)
end
end
function thread.hold(n,opt)
function thread.hold(n, opt)
thread._Requests()
local opt = opt or {}
if type(opt)=="table" then
@ -1284,8 +1283,10 @@ function thread.hold(n,opt)
return yield(CMD, t_sleep, n or 0, nil, interval)
elseif type(n) == "table" and n.Type == multi.CONNECTOR then
return yield(CMD, t_hold, conn_test(n), nil, interval)
elseif type(n) == "table" and n.Hold ~= nil then
return n:Hold(opt)
elseif type(n) == "function" then
return yield(CMD, t_hold, n or dFunc, nil, interval)
return yield(CMD, t_hold, n, nil, interval)
else
multi.error("Invalid argument passed to thread.hold(...)!")
end
@ -1316,11 +1317,12 @@ function thread.yield()
end
function thread.isThread()
if _VERSION~="Lua 5.1" then
local a,b = running()
local a,b = running()
if b then
-- We are dealing with luajit compat or 5.2+
return not(b)
else
return running()~=nil
return a~=nil
end
end
@ -1343,7 +1345,7 @@ function thread.waitFor(name)
end
local function cleanReturns(...)
local returns = {...}
local returns = multi.pack(...)
local rets = {}
local ind = 0
for i=#returns,1,-1 do
@ -1390,8 +1392,7 @@ function thread:newFunctionBase(generator, holdme)
end)
else
while not rets and not err do
multi:getCurrentProcess():getHandler()()
multi:getHandler()()
multi:uManager()
end
local g = rets
rets = nil
@ -1415,7 +1416,7 @@ function thread:newFunctionBase(generator, holdme)
}
end
local t = generator(...)
t.OnDeath(function(...) rets = {...} end)
t.OnDeath(function(...) rets = multi.pack(...) end)
t.OnError(function(self,e) err = e end)
if holdme then
return wait()
@ -1443,13 +1444,14 @@ function thread:newFunctionBase(generator, holdme)
return temp
end
setmetatable(tfunc, tfunc)
tfunc.Type = multi.THREADEDFUNCTION
return tfunc
end
end
function thread:newFunction(func, holdme)
return thread:newFunctionBase(function(...)
return thread:newThread("Threaded Function Handler", func, ...)
return thread:newThread("Free Threaded Function Handler", func, ...)
end, holdme)()
end
@ -1483,18 +1485,18 @@ function thread:newProcessor(name)
function proc:newFunction(func, holdme)
return thread:newFunctionBase(function(...)
return thread_proc:newThread("Threaded Function Handler", func, ...)
return thread_proc:newThread("TProc Threaded Function Handler", func, ...)
end, holdme)()
end
function proc.Start()
Active = true
return c
return proc
end
function proc.Stop()
Active = false
return c
return proc
end
function proc:Destroy()
@ -1512,6 +1514,8 @@ function thread:newProcessor(name)
end
end)
end)
self:create(proc)
return proc
end
@ -1525,13 +1529,13 @@ end
function thread:newThread(name, func, ...)
multi.OnLoad:Fire() -- This was done incase a threaded function was called before mainloop/uManager was called
local func = func or name
if func == name then
name = name or multi.randomString(16)
if type(name) == "function" then
func = name
name = "UnnamedThread_"..multi.randomString(16)
end
local c={nil,nil,nil,nil,nil,nil,nil}
c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil}
c.startArgs = {...}
c.startArgs = multi.pack(...)
c.ref={}
c.Name=name
c.thread=create(func)
@ -1794,9 +1798,8 @@ function multi:createHandler()
return coroutine.wrap(function()
local temp_start
while true do
for start = #startme, 1, -1 do
temp_start = startme[start]
table.remove(startme)
while #startme>0 do
temp_start = table.remove(startme)
_, ret, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15, r16 = resume(temp_start.thread, multi.unpack(temp_start.startArgs))
co_status[status(temp_start.thread)](temp_start.thread, temp_start, t_none, nil, threads)
table.insert(threads, temp_start)
@ -1919,7 +1922,7 @@ function multi:newService(func) -- Priority managed threads
return c
end
multi.create(multi,c)
self:create(c)
return c
end
@ -2328,7 +2331,7 @@ end
function multi.print(...)
if multi.defaultSettings.print then
local t = {}
for i,v in pairs({...}) do t[#t+1] = tostring(v) end
for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end
io.write("\x1b[94mINFO:\x1b[0m " .. table.concat(t," ") .. "\n")
end
end
@ -2336,7 +2339,7 @@ end
function multi.warn(...)
if multi.defaultSettings.warn then
local t = {}
for i,v in pairs({...}) do t[#t+1] = tostring(v) end
for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end
io.write("\x1b[93mWARNING:\x1b[0m " .. table.concat(t," ") .. "\n")
end
end
@ -2344,8 +2347,8 @@ end
function multi.error(self, err)
if type(err) == "bool" then crash = err end
if type(self) == "string" then err = self end
io.write("\x1b[91mERROR:\x1b[0m " .. err .. "\n")
error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type)
io.write("\x1b[91mERROR:\x1b[0m " .. err .. " " .. debug.getinfo(2).name .."\n")
error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type .. "\n" .. debug.traceback().."\n")
if multi.defaultSettings.error then
os.exit(1)
end
@ -2353,7 +2356,7 @@ end
function multi.success(...)
local t = {}
for i,v in pairs({...}) do t[#t+1] = tostring(v) end
for i,v in pairs(multi.pack(...)) do t[#t+1] = tostring(v) end
io.write("\x1b[92mSUCCESS:\x1b[0m " .. table.concat(t," ") .. "\n")
end
@ -2374,6 +2377,29 @@ function os.exit(n)
_os(n)
end
multi.OnObjectCreated=multi:newConnection()
ignoreconn = false
multi.OnObjectDestroyed=multi:newConnection()
multi.OnLoad = multi:newConnection(nil,nil,true)
multi.enableOptimization = multi:newConnection()
multi.optConn = multi:newConnection(true)
multi.optConn(function(msg)
table.insert(optimization_stats, msg)
end)
function multi:getOptimizationConnection()
return multi.optConn
end
function multi:getOptimizationStats()
return optimization_stats
end
function multi:isFindingOptimizing()
return find_optimization
end
multi.OnError=multi:newConnection()
multi.OnPreLoad = multi:newConnection()
multi.OnExit = multi:newConnection(nil,nil,true)
@ -2393,11 +2419,12 @@ function multi:getHandler()
end
multi:newThread("Task Handler", function()
local check = function()
return table.remove(tasks)
end
while true do
thread.hold(check)()
if #tasks > 0 then
table.remove(tasks)()
else
thread.yield()
end
end
end).OnError(multi.error)

View File

@ -113,7 +113,7 @@ function multi:newSystemThreadedJobQueue(n)
return self
end
function c:pushJob(name,...)
queueJob:push{name,jid,{...}}
queueJob:push{name,jid,multi.pack(...)}
jid = jid + 1
return jid-1
end
@ -132,12 +132,16 @@ function multi:newSystemThreadedJobQueue(n)
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
rets = multi.pack(...)
end
end)
return thread.hold(function()
if rets then
return multi.unpack(rets) or multi.NIL
if #rets == 0 then
return multi.NIL
else
return multi.unpack(rets)
end
end
end)
end, holup), name
@ -171,10 +175,10 @@ function multi:newSystemThreadedJobQueue(n)
local name = table.remove(dat, 1)
local jid = table.remove(dat, 1)
local args = table.remove(dat, 1)
queueReturn:push{jid, funcs[name](multi.unpack(args)), queue}
queueReturn:push{jid, funcs[name](args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]), queue}
end).OnError(multi.error)
end
end).OnError(print)
end).OnError(multi.error)
thread:newThread("DoAllHandler",function()
while true do
local dat = thread.hold(function()
@ -190,7 +194,7 @@ function multi:newSystemThreadedJobQueue(n)
end
end
end
end).OnError(print)
end).OnError(multi.error)
thread:newThread("IdleHandler",function()
while true do
thread.hold(function()
@ -198,9 +202,9 @@ function multi:newSystemThreadedJobQueue(n)
end)
THREAD.sleep(.01)
end
end).OnError(print)
end).OnError(multi.error)
multi:mainloop()
end,i).OnError(print)
end,i).OnError(multi.error)
end
return c
end
@ -258,7 +262,7 @@ function multi:newSystemThreadedConnection(name)
local function fire(...)
for _, link in pairs(c.links) do
link:push {c.TRIG, {...}}
link:push {c.TRIG, multi.pack(...)}
end
end
@ -286,7 +290,7 @@ function multi:newSystemThreadedConnection(name)
--- ^^^ This will only exist in the init thread
function c:Fire(...)
local args = {...}
local args = multi.pack(...)
if self.CID == THREAD_ID then -- Host Call
for _, link in pairs(self.links) do
link:push {self.TRIG, args}
@ -344,4 +348,5 @@ function multi:newSystemThreadedConnection(name)
end
return c
end
end
require("multi.integration.sharedExtensions")

View File

@ -98,6 +98,13 @@ function multi:newSystemThread(name, func, ...)
globals = globe,
priority = c.priority
},function(...)
local profi
if multi_settings.debug then
profi = require("proFI")
profi:start()
end
multi, thread = require("multi"):init(multi_settings)
require("multi.integration.lanesManager.extensions")
require("multi.integration.sharedExtensions")
@ -105,6 +112,12 @@ function multi:newSystemThread(name, func, ...)
returns = {pcall(func, ...)}
return_linda:set("returns", returns)
has_error = false
if profi then
multi.OnExit(function(...)
profi:stop()
profi:writeReport("Profiling Report [".. THREAD_NAME .."].txt")
end)
end
end)(...)
count = count + 1
function c:getName()
@ -118,6 +131,13 @@ function multi:newSystemThread(name, func, ...)
c.OnDeath = multi:newConnection()
c.OnError = multi:newConnection()
GLOBAL["__THREADS__"] = livingThreads
if self.isActor then
self:create(c)
else
multi.create(multi, c)
end
return c
end
@ -177,7 +197,7 @@ function multi.InitSystemThreadErrorHandler()
end
end
end
end).OnError(print)
end).OnError(multi.error)
end
multi.print("Integrated Lanes Threading!")
@ -185,7 +205,6 @@ multi.integration = {} -- for module creators
multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD
require("multi.integration.lanesManager.extensions")
require("multi.integration.sharedExtensions")
return {
init = function()
return GLOBAL, THREAD

View File

@ -66,7 +66,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console)
local c = {}
c.queue = __Console
function c.print(...)
c.queue:send("Q", {...})
c.queue:send("Q", multi.pack(...))
end
function c.error(err)
c.queue:push("Q",{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__})
@ -90,7 +90,7 @@ local function INIT(__GlobalLinda, __SleepingLinda, __StatusLinda, __Console)
end
function THREAD.pushStatus(...)
local args = {...}
local args = multi.pack(...)
__StatusLinda:send(nil,THREAD_ID, args)
end

View File

@ -129,7 +129,7 @@ function multi:newSystemThreadedJobQueue(n)
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
rets = multi.pack(...)
end
end)
return thread.hold(function()
@ -230,7 +230,7 @@ function multi:newSystemThreadedConnection(name)
self.subscribe = love.thread.getChannel("SUB_STC_" .. self.Name)
function self:Fire(...)
local args = {...}
local args = multi.pack(...)
if self.CID == THREAD_ID then -- Host Call
for _, link in pairs(self.links) do
love.thread.getChannel(link):push{self.TRIG, args}
@ -271,7 +271,7 @@ function multi:newSystemThreadedConnection(name)
-- This shouldn't be the case
end
end
end).OnError(print)
end).OnError(multi.error)
return self
end
@ -321,7 +321,7 @@ function multi:newSystemThreadedConnection(name)
local function fire(...)
for _, link in pairs(c.links) do
love.thread.getChannel(link):push {c.TRIG, {...}}
love.thread.getChannel(link):push {c.TRIG, multi.pack(...)}
end
end
@ -346,7 +346,7 @@ function multi:newSystemThreadedConnection(name)
c.proxy_conn:Fire(multi.unpack(item[2]))
end
end
end).OnError(print)
end).OnError(multi.error)
--- ^^^ This will only exist in the init thread
THREAD.package(name,c)

View File

@ -103,6 +103,13 @@ function multi:newSystemThread(name, func, ...)
c.stab.returns = nil
end
end)
if self.isActor then
self:create(c)
else
multi.create(multi, c)
end
return c
end

View File

@ -98,7 +98,7 @@ end
function threads.pushStatus(...)
local status_channel = love.thread.getChannel("STATCHAN_" ..__THREADID__)
local args = {...}
local args = multi.pack(...)
status_channel:push(args)
end
@ -209,7 +209,7 @@ function threads.getConsole()
local c = {}
c.queue = love.thread.getChannel("__CONSOLE__")
function c.print(...)
c.queue:push{...}
c.queue:push(multi.pack(...))
end
function c.error(err)
c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}

View File

@ -118,7 +118,7 @@ function multi:newSystemThreadedJobQueue(n)
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
rets = multi.pack(...)
link:Destroy()
end
end)

View File

@ -76,6 +76,13 @@ function multi:newSystemThread(name,func,...)
GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread}
GLOBAL["__THREAD_COUNT"] = THREAD_ID
THREAD_ID=THREAD_ID+1
if self.isActor then
self:create(c)
else
multi.create(multi, c)
end
return c
end
THREAD.newSystemThread = multi.newSystemThread

View File

@ -156,7 +156,7 @@ function threads.getConsole()
local c = {}
c.queue = lovr.thread.getChannel("__CONSOLE__")
function c.print(...)
c.queue:push{...}
c.queue:push(multi.pack(...))
end
function c.error(err)
c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}

View File

@ -96,7 +96,7 @@ function multi:newSystemThreadedJobQueue(n)
end
function c:pushJob(name,...)
table.insert(jobs,{name,jid,{...}})
table.insert(jobs,{name,jid,multi.pack(...)})
jid = jid + 1
return jid-1
end
@ -121,7 +121,7 @@ function multi:newSystemThreadedJobQueue(n)
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
rets = multi.pack(...)
end
end)
return thread.hold(function()

View File

@ -1,4 +1,4 @@
--[[
--[[ todo finish the targeted job!
MIT License
Copyright (c) 2023 Ryan Ward
@ -22,40 +22,45 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
local function copy(obj)
if type(obj) ~= 'table' then return obj end
local res = {}
for k, v in pairs(obj) do res[copy(k)] = copy(v) end
return res
end
function tprint (tbl, indent)
if not indent then indent = 0 end
for k, v in pairs(tbl) do
formatting = string.rep(" ", indent) .. k .. ": "
if type(v) == "table" then
print(formatting)
tprint(v, indent+1)
else
print(formatting .. tostring(v))
end
end
end
local multi, thread = require("multi"):init()
-- Returns a handler that allows a user to interact with an object on another thread!
-- Create on the thread that you want to interact with, send over the handle
function multi:chop(obj)
if not _G["UIDS"] then
_G["UIDS"] = {}
end
local multi, thread = require("multi"):init()
local list = {[0] = multi.randomString(12)}
_G[list[0]] = obj
for i,v in pairs(obj) do
if type(v) == "function" then
if type(v) == "function" or type(v) == "table" and v.Type == multi.THREADEDFUNCTION then
table.insert(list, i)
elseif type(v) == "table" and v.Type == multi.CONNECTOR then
v.getThreadID = function() -- Special function we are adding
return THREAD_ID
end
v.getUniqueName = function(self)
return self.__link_name
end
local l = multi:chop(v)
v.__link_name = l[0]
v.__name = i
table.insert(list, {i, multi:newProxy(l):init()})
elseif type(v) == "table" and v.Type == multi.CONNECTOR then
table.insert(list, {i, multi:newProxy(multi:chop(v)):init()})
end
end
table.insert(list, "isConnection")
if obj.Type == multi.CONNECTOR then
obj.isConnection = function() return true end
else
obj.isConnection = function() return false end
end
return list
end
@ -64,67 +69,86 @@ function multi:newProxy(list)
local c = {}
c.name = multi.randomString(12)
c.is_init = false
local multi, thread = nil, nil
function c:init()
local multi, thread = nil, nil
if THREAD_ID>0 then
if not(c.is_init) then
c.is_init = true
local multi, thread = require("multi"):init()
c.proxy_link = "PL" .. multi.randomString(12)
if multi.integration then
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
end
GLOBAL[c.proxy_link] = c
local function check()
return self.send:pop()
end
self.send = multi:newSystemThreadedQueue(self.name.."_S"):init()
self.recv = multi:newSystemThreadedQueue(self.name.."_R"):init()
self.funcs = list
self.conns = list[-1]
thread:newThread(function()
self._funcs = copy(list)
self.Type = multi.PROXY
self.TID = THREAD_ID
thread:newThread("Proxy_Handler_" .. multi.randomString(4), function()
while true do
local data = thread.hold(check)
if data then
local func = table.remove(data, 1)
local sref = table.remove(data, 1)
local ret
if sref then
ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))}
else
ret = {_G[list[0]][func](multi.unpack(data))}
end
for i = 1,#ret do
if type(ret[i]) == "table" and getmetatable(ret[i]) then
setmetatable(ret[i],{}) -- remove that metatable, we do not need it on the other side!
-- Let's not hold the main threadloop
thread:newThread("Temp_Thread", function()
local func = table.remove(data, 1)
local sref = table.remove(data, 1)
local ret
if sref then
ret = {_G[list[0]][func](_G[list[0]], multi.unpack(data))}
else
ret = {_G[list[0]][func](multi.unpack(data))}
end
if ret[i] == _G[list[0]] then
-- We cannot return itself, that return can contain bad values.
ret[i] = {_self_ref_ = true}
for i = 1,#ret do
if type(ret[i]) == "table" and ret[i].Type ~= nil and ret[i].Type ~= multi.PROXY then
ret[i] = "\1PARENT_REF"
end
if type(ret[i]) == "table" and getmetatable(ret[i]) then
setmetatable(ret[i],nil) -- remove that metatable, we do not need it on the other side!
end
if ret[i] == _G[list[0]] then
-- We cannot return itself, that return can contain bad values.
ret[i] = "\1SELF_REF"
end
end
end
table.insert(ret, 1, func)
self.recv:push(ret)
table.insert(ret, 1, func)
self.recv:push(ret)
end)
end
end
end).OnError(print)
end).OnError(multi.error)
return self
else
local multi, thread = require("multi"):init()
local me = self
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
self.send = THREAD.waitFor(self.name.."_S")
self.recv = THREAD.waitFor(self.name.."_R")
local funcs = copy(self.funcs)
if multi.integration then
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
end
self.send = THREAD.waitFor(self.name.."_S"):init()
self.recv = THREAD.waitFor(self.name.."_R"):init()
self.Type = multi.PROXY
for _,v in pairs(self.funcs) do
for _,v in pairs(funcs) do
if type(v) == "table" then
-- We have a connection
v[2]:init()
self["_"..v[1]] = v[2]
v[2]:init(proc_name)
self[v[1]] = v[2]
v[2].Parent = self
setmetatable(v[2],getmetatable(multi:newConnection()))
self[v[1]] = multi:newConnection()
thread:newThread(function()
while true do
self[v[1]]:Fire(thread.hold(alarm["_"..v[1]]))
end
end)
else
self[v] = thread:newFunction(function(self,...)
if self == me then
@ -138,8 +162,10 @@ function multi:newProxy(list)
me.recv:pop()
table.remove(data, 1)
for i=1,#data do
if type(data[i]) == "table" and data[i]._self_ref_ then
if data[i] == "\1SELF_REF" then
data[i] = me
elseif data[i] == "\1PARENT_REF" then
data[i] = me.Parent
end
end
return multi.unpack(data)
@ -151,36 +177,31 @@ function multi:newProxy(list)
return self
end
end
function c:getTransferable()
local cp = {}
local multi, thread = require("multi"):init()
cp.is_init = true
cp.proxy_link = self.proxy_link
cp.name = self.name
cp.funcs = copy(self._funcs)
cp.init = function(self)
local multi, thread = require("multi"):init()
if multi.integration then
GLOBAL = multi.integration.GLOBAL
THREAD = multi.integration.THREAD
end
local proxy = THREAD.waitFor(self.proxy_link)
proxy.funcs = self.funcs
return proxy:init()
end
return cp
end
self:create(c)
return c
end
local targets = {}
local nFunc = 0
function multi:newTargetedFunction(ID, proc, name, func, holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_TFunc_"..nFunc
end
nFunc = nFunc + 1
proc.jobqueue:registerFunction(name, func)
return thread:newFunction(function(...)
local id = proc:pushJob(ID, name, ...)
local link
local rets
link = proc.jobqueue.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
end
end)
return thread.hold(function()
if rets then
return multi.unpack(rets) or multi.NIL
end
end)
end, holup), name
end
local references = {}
local jid = -1
function multi:newSystemThreadedProcessor(cores)
@ -202,71 +223,16 @@ function multi:newSystemThreadedProcessor(cores)
c.OnObjectCreated = multi:newConnection()
c.parent = self
c.jobqueue = multi:newSystemThreadedJobQueue(c.cores)
c.targetedQueue = multi:newSystemThreadedQueue(name.."_target"):init()
c.jobqueue:registerFunction("STP_enable_targets",function(name)
local multi, thread = require("multi"):init()
local qname = THREAD_NAME .. "_t_queue"
local targetedQueue = THREAD.waitFor(name):init()
local tjq = multi:newSystemThreadedQueue(qname):init()
targetedQueue:push({tonumber(THREAD_ID), qname})
multi:newThread("TargetedJobHandler", function()
local queueReturn = _G["__QR"]
while true do
local dat = thread.hold(function()
return tjq:pop()
end)
if dat then
thread:newThread("JQ-TargetThread",function()
local name = table.remove(dat, 1)
local jid = table.remove(dat, 1)
local args = table.remove(dat, 1)
queueReturn:push{jid, _G[name](multi.unpack(args)), queue}
end).OnError(multi.error)
end
end
end).OnError(multi.error)
end)
c.jobqueue:registerFunction("STP_GetThreadCount",function()
return _G["__THREADS"]
end)
c.jobqueue:registerFunction("STP_GetTaskCount",function()
return _G["__TASKS"]
end)
function c:pushJob(ID, name, ...)
targets[ID]:push{name, jid, {...}}
local tq = THREAD.waitFor(self.Name .. "_target_tq_" .. ID):init()
tq:push{name, jid, multi.pack(...)}
jid = jid - 1
return jid + 1
end
c.jobqueue:doToAll(function(name)
STP_enable_targets(name)
_G["__THREADS"] = 0
_G["__TASKS"] = 0
end, name.."_target")
local count = 0
while count < c.cores do
local dat = c.targetedQueue:pop()
if dat then
targets[dat[1]] = multi.integration.THREAD.waitFor(dat[2]):init()
table.insert(c.proc_list, dat[1]) -- Add thread_id to proc list
count = count + 1
end
end
c.jobqueue:registerFunction("packObj",function(obj)
local multi, thread = require("multi"):init()
obj.getThreadID = function() -- Special functions we are adding
return THREAD_ID
end
obj.getUniqueName = function(self)
return self.__link_name
end
local list = multi:chop(obj)
obj.__link_name = list[0]
@ -279,57 +245,52 @@ function multi:newSystemThreadedProcessor(cores)
c.spawnThread = c.jobqueue:newFunction("__spawnThread__", function(name, func, ...)
local multi, thread = require("multi"):init()
local obj = thread:newThread(name, func, ...)
_G["__THREADS"] = _G["__THREADS"] + 1
return packObj(obj)
end, true)
c.spawnTask = c.jobqueue:newFunction("__spawnTask__", function(obj, func, ...)
local multi, thread = require("multi"):init()
local obj = multi[obj](multi, func, ...)
_G["__TASKS"] = _G["__TASKS"] + 1
return packObj(obj)
end, true)
function c:newLoop(func, notime)
proxy = self.spawnTask("newLoop", func, notime):init()
proxy.__proc = self
local implement = {
"newLoop",
"newTLoop",
"newUpdater",
"newEvent",
"newAlarm",
"newStep",
"newTStep",
"newService"
}
for _, method in pairs(implement) do
c[method] = function(self, ...)
proxy = self.spawnTask(method, ...):init()
references[proxy] = self
return proxy
end
end
function c:newThread(name, func, ...)
proxy = self.spawnThread(name, func, ...):init(self.Name)
references[proxy] = self
table.insert(self.threads, proxy)
return proxy
end
function c:newTLoop(func, time)
proxy = self.spawnTask("newTLoop", func, time):init()
proxy.__proc = self
return proxy
function c:newFunction(func, holdme)
return c.jobqueue:newFunction(func, holdme)
end
function c:newUpdater(skip, func)
proxy = self.spawnTask("newUpdater", func, notime):init()
proxy.__proc = self
return proxy
end
function c:newEvent(task, func)
proxy = self.spawnTask("newEvent", task, func):init()
proxy.__proc = self
return proxy
end
function c:newAlarm(set, func)
proxy = self.spawnTask("newAlarm", set, func):init()
proxy.__proc = self
return proxy
end
function c:newStep(start, reset, count, skip)
proxy = self.spawnTask("newStep", start, reset, count, skip):init()
proxy.__proc = self
return proxy
end
function c:newTStep(start ,reset, count, set)
proxy = self.spawnTask("newTStep", start, reset, count, set):init()
proxy.__proc = self
return proxy
function c:newSharedTable(name)
if not name then multi.error("You must provide a name when creating a table!") end
local tbl_name = "TABLE_"..multi.randomString(8)
c.jobqueue:doToAll(function(tbl_name, interaction)
_G[interaction] = THREAD.waitFor(tbl_name):init()
end, tbl_name, name)
return multi:newSystemThreadedTable(tbl_name):init()
end
function c:getHandler()
@ -348,26 +309,6 @@ function multi:newSystemThreadedProcessor(cores)
return self.Name
end
function c:newThread(name, func, ...)
proxy = self.spawnThread(name, func, ...):init()
proxy.__proc = self
table.insert(self.threads, proxy)
return proxy
end
function c:newFunction(func, holdme)
return c.jobqueue:newFunction(func, holdme)
end
function c:newSharedTable(name)
if not name then multi.error("You must provide a name when creating a table!") end
local tbl_name = "TABLE_"..multi.randomString(8)
c.jobqueue:doToAll(function(tbl_name, interaction)
_G[interaction] = THREAD.waitFor(tbl_name):init()
end, tbl_name, name)
return multi:newSystemThreadedTable(tbl_name):init()
end
function c.run()
return self
end
@ -418,51 +359,3 @@ function multi:newSystemThreadedProcessor(cores)
return c
end
-- Modify thread.hold to handle proxies
local thread_ref = thread.hold
function thread.hold(n, opt)
--if type(n) == "table" then print(n.Type, n.isConnection()) end
if type(n) == "table" and n.Type == multi.PROXY and n.isConnection() then
local ready = false
local args
local id = n.getThreadID()
local name = n:getUniqueName()
local func = multi:newTargetedFunction(id, n.Parent.__proc, "conn_"..multi.randomString(8), function(_name)
local multi, thread = require("multi"):init()
local obj = _G[_name]
local rets = {thread.hold(obj)}
for i,v in pairs(rets) do
if v.Type then
rets[i] = {_self_ref_ = "parent"}
end
end
return unpack(rets)
end)
local conn
local handle = func(name)
conn = handle.OnReturn(function(...)
ready = true
args = {...}
handle.OnReturn:Unconnect(conn)
end)
local ret = {thread_ref(function()
if ready then
return multi.unpack(args) or multi.NIL
end
end, opt)}
for i,v in pairs(ret) do
if type(v) == "table" and v._self_ref_ == "parent" then
ret[i] = n.Parent
end
end
return unpack(ret)
else
return thread_ref(n, opt)
end
end