Closer to getting things working...

This commit is contained in:
Ryan Ward 2023-08-06 00:38:22 -04:00
parent bab9b13cb8
commit a660b63581
9 changed files with 147 additions and 79 deletions

View File

@ -62,17 +62,12 @@ Table of contents
Full Update Showcase Full Update Showcase
--- ---
```lua ```lua
multi, thread = require("multi"):init{print=true} multi, thread = require("multi"):init{print=true}
GLOBAL, THREAD = require("multi.integration.lanesManager"):init() GLOBAL, THREAD = require("multi.integration.lanesManager"):init()
``` ```
## Added New Integration: **effilManager**
Another option for multithreading support, works just like all the other threading integrations, but uses the internals of effil and it's unique features.
- Refer to this [doc](https://www.lua.org/wshop18/Kupriyanov.pdf) to read more about it.
- Project github [page](https://github.com/effil/effil/tree/master).
```lua ```lua
package.path = "?/init.lua;?.lua;"..package.path package.path = "?/init.lua;?.lua;"..package.path

View File

@ -482,7 +482,7 @@ function multi:newConnection(protect,func,kill)
return temp return temp
end end
function c:Hold(self) function c:Hold()
return multi.hold(self) return multi.hold(self)
end end
@ -1276,7 +1276,6 @@ function thread.hold(n, opt)
return yield(CMD, t_skip, opt.skip or 1, nil, interval) return yield(CMD, t_skip, opt.skip or 1, nil, interval)
end end
end end
if type(n) == "number" then if type(n) == "number" then
thread.getRunningThread().lastSleep = clock() thread.getRunningThread().lastSleep = clock()
return yield(CMD, t_sleep, n or 0, nil, interval) return yield(CMD, t_sleep, n or 0, nil, interval)
@ -2392,7 +2391,8 @@ function multi.error(self, err)
io.write("\x1b[91mERROR:\x1b[0m " .. err .. " ?\n") io.write("\x1b[91mERROR:\x1b[0m " .. err .. " ?\n")
end end
if multi.defaultSettings.error then if multi.defaultSettings.error then
error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type .. "\n" .. debug.traceback().."\n") error("^^^ " .. multi:getCurrentProcess():getFullName() .. " " .. multi:getCurrentTask().Type .. "\n" ..
((coroutine.running()) and debug.traceback((coroutine.running())) or debug.traceback()) .. "\n")
os.exit(1) os.exit(1)
end end
end end

View File

@ -70,6 +70,8 @@ function multi:newSystemThreadedTable(name)
return self return self
end end
c.__init = c.init
function c:Hold(opt) function c:Hold(opt)
if opt.key then if opt.key then
return thread.hold(function() return thread.hold(function()

View File

@ -92,8 +92,8 @@ function INIT()
repeat repeat
wait() wait()
until GLOBAL[name] ~= nil until GLOBAL[name] ~= nil
if type(GLOBAL[name].init) == "function" then if type(GLOBAL[name].__init) == "function" then
return GLOBAL[name]:init() return GLOBAL[name]:__init()
else else
return GLOBAL[name] return GLOBAL[name]
end end

View File

@ -48,7 +48,6 @@ end
local fRef = {"func",nil} local fRef = {"func",nil}
local function manage(channel, value) local function manage(channel, value)
channel:clear() channel:clear()
print("pushing",value)
if type(value) == "table" then if type(value) == "table" then
channel:push{"DATA",threads.packTable(value)} channel:push{"DATA",threads.packTable(value)}
else else

View File

@ -87,12 +87,12 @@ function multi:newSystemThread(name,func,...)
end end
THREAD.newSystemThread = multi.newSystemThread THREAD.newSystemThread = multi.newSystemThread
function lovr.threaderror(thread, errorstr) function lovr.threaderror(thread, errorstr)
print("Thread error!\n"..errorstr) multi.print("Thread error!\n"..errorstr)
end end
multi.integration.GLOBAL = GLOBAL multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD multi.integration.THREAD = THREAD
require("multi.integration.lovrManager.extensions") require("multi.integration.lovrManager.extensions")
print("Integrated lovr Threading!") multi.print("Integrated lovr Threading!")
return {init=function() return {init=function()
return GLOBAL,THREAD return GLOBAL,THREAD
end} end}

View File

@ -47,6 +47,17 @@ function multi:newSystemThreadedQueue(name)
function c:init() function c:init()
return self return self
end end
function c:Hold(opt)
if opt.peek then
return thread.hold(function()
return self:peek()
end)
else
return thread.hold(function()
return self:pop()
end)
end
end
GLOBAL[name or "_"] = c GLOBAL[name or "_"] = c
return c return c
end end
@ -56,58 +67,59 @@ function multi:newSystemThreadedTable(name)
function c:init() function c:init()
return self return self
end end
function c:Hold(opt)
if opt.key then
return thread.hold(function()
return self.tab[opt.key]
end)
else
multi.error("Must provide a key to check opt.key = 'key'")
end
end
GLOBAL[name or "_"] = c GLOBAL[name or "_"] = c
return c return c
end end
local setfenv = setfenv local setfenv = multi.isolateFunction
if not setfenv then
if not debug then
multi.print("Unable to implement setfenv in lua 5.2+ the debug module is not available!")
else
setfenv = function(f, env)
return load(string.dump(f), nil, nil, env)
end
end
end
local jqc = 1
function multi:newSystemThreadedJobQueue(n) function multi:newSystemThreadedJobQueue(n)
local c = {} local c = {}
c.cores = n or THREAD.getCores()*2
c.cores = n or THREAD.getCores()
c.registerQueue = {}
c.Type = multi.SJOBQUEUE
c.funcs = multi:newSystemThreadedTable("__JobQueue_"..jqc.."_table")
c.queue = multi:newSystemThreadedQueue("__JobQueue_"..jqc.."_queue")
c.queueReturn = multi:newSystemThreadedQueue("__JobQueue_"..jqc.."_queueReturn")
c.queueAll = multi:newSystemThreadedQueue("__JobQueue_"..jqc.."_queueAll")
c.id = 0
c.OnJobCompleted = multi:newConnection() c.OnJobCompleted = multi:newConnection()
local jobs = {}
local ID=1
local jid = 1
local env = {}
setmetatable(env,{ local allfunc = 0
__index = _G
})
local funcs = {}
function c:doToAll(func) function c:doToAll(func)
setfenv(func,env)() for i = 1, self.cores do
return self self.queueAll:push({allfunc, func})
end
allfunc = allfunc + 1
end end
function c:registerFunction(name, func) function c:registerFunction(name, func)
funcs[name] = setfenv(func,env) if self.funcs[name] then
return self multi.error("A function by the name "..name.." has already been registered!")
end
self.funcs[name] = func
end end
function c:pushJob(name,...) function c:pushJob(name,...)
table.insert(jobs,{name,jid,multi.pack(...)}) self.id = self.id + 1
jid = jid + 1 self.queue:push{name,self.id,...}
return jid-1 return self.id
end end
function c:isEmpty() function c:isEmpty()
return #jobs == 0 return queueJob:peek()==nil
end end
local nFunc = 0 local nFunc = 0
function c:newFunction(name,func,holup) -- This registers with the queue function c:newFunction(name,func,holup) -- This registers with the queue
local func = stripUpValues(func)
if type(name)=="function" then if type(name)=="function" then
holup = func holup = func
func = name func = name
@ -131,19 +143,81 @@ function multi:newSystemThreadedJobQueue(n)
end) end)
end,holup),name end,holup),name
end end
for i=1,c.cores do thread:newThread("jobManager",function()
thread:newThread("PesudoThreadedJobQueue_"..i,function()
while true do while true do
thread.yield() thread.yield()
if #jobs>0 then local dat = c.queueReturn:pop()
local j = table.remove(jobs,1) if dat then
c.OnJobCompleted:Fire(j[2],funcs[j[1]](multi.unpack(j[3]))) c.OnJobCompleted:Fire(multi.unpack(dat))
else end
thread.sleep(.05) end
end)
for i=1,c.cores do
multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc)
local multi, thread = require("multi"):init()
local clock = os.clock
local funcs = THREAD.waitFor("__JobQueue_"..jqc.."_table")
local queue = THREAD.waitFor("__JobQueue_"..jqc.."_queue")
local queueReturn = THREAD.waitFor("__JobQueue_"..jqc.."_queueReturn")
local lastProc = clock()
local queueAll = THREAD.waitFor("__JobQueue_"..jqc.."_queueAll")
local registry = {}
_G["__QR"] = queueReturn
setmetatable(_G,{__index = funcs})
thread:newThread("startUp",function()
while true do
thread.yield()
local all = queueAll:peek()
if all and not registry[all[1]] then
lastProc = os.clock()
queueAll:pop()[2]()
end
end
end)
thread:newThread("runner",function()
thread.sleep(.1)
while true do
thread.yield()
local all = queueAll:peek()
if all and not registry[all[1]] then
lastProc = os.clock()
queueAll:pop()[2]()
end
local dat = thread.hold(queue)
if dat then
multi:newThread("Test",function()
lastProc = os.clock()
local name = table.remove(dat,1)
local id = table.remove(dat,1)
local tab = {multi.isolateFunction(funcs[name],_G)(multi.unpack(dat))}
table.insert(tab,1,id)
queueReturn:push(tab)
end)
end end
end end
end).OnError(multi.error) end).OnError(multi.error)
thread:newThread("Idler",function()
while true do
thread.yield()
if clock()-lastProc> 2 then
THREAD.sleep(.05)
else
THREAD.sleep(.001)
end end
end
end)
multi:mainloop()
end,jqc)
end
function c:Hold(opt)
return thread.hold(self.OnJobCompleted)
end
jqc = jqc + 1
self:create(c)
return c return c
end end

View File

@ -189,7 +189,6 @@ function multi:newProxy(list)
THREAD = multi.integration.THREAD THREAD = multi.integration.THREAD
end end
local proxy = THREAD.waitFor(self.proxy_link) local proxy = THREAD.waitFor(self.proxy_link)
print("Got:",proxy)
proxy.funcs = self.funcs proxy.funcs = self.funcs
return proxy:init() return proxy:init()
end end
@ -267,7 +266,8 @@ function multi:newSystemThreadedProcessor(cores)
for _, method in pairs(implement) do for _, method in pairs(implement) do
c[method] = function(self, ...) c[method] = function(self, ...)
proxy = self.spawnTask(method, ...):init() proxy = self.spawnTask(method, ...)
proxy:init()
references[proxy] = self references[proxy] = self
return proxy return proxy
end end

View File

@ -77,7 +77,6 @@ multi:newThread("Scheduler Thread",function()
end, true) -- Hold this end, true) -- Hold this
a, b, c, d = func(3,2,1) a, b, c, d = func(3,2,1)
print(a, b, c, d)
assert(a == 1, "First return was not '1'!") assert(a == 1, "First return was not '1'!")
assert(b == 2, "Second return was not '2'!") assert(b == 2, "Second return was not '2'!")
assert(c == 3, "Third return was not '3'!") assert(c == 3, "Third return was not '3'!")
@ -115,8 +114,11 @@ multi:newThread("Scheduler Thread",function()
local ready = false local ready = false
jq = multi:newSystemThreadedJobQueue(5) -- Job queue with 4 worker threads jq = multi:newSystemThreadedJobQueue(5) -- Job queue with 4 worker threads
func = jq:newFunction("test-thread",function(a,b) func2 = jq:newFunction("sleep",function(a,b)
THREAD.sleep(.2) THREAD.sleep(.2)
end)
func = jq:newFunction("test-thread",function(a,b)
sleep()
return a+b return a+b
end) end)
local count = 0 local count = 0
@ -194,9 +196,6 @@ multi:newThread("Scheduler Thread",function()
multi:newSystemThread("Testing proxy copy THREAD",function(tloop) multi:newSystemThread("Testing proxy copy THREAD",function(tloop)
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
for i,v in pairs(tloop.funcs) do
print(i,v)
end
tloop = tloop:init() tloop = tloop:init()
multi.print("tloop type:",tloop.Type) multi.print("tloop type:",tloop.Type)
multi.print("Testing proxies on other threads") multi.print("Testing proxies on other threads")
@ -224,7 +223,6 @@ multi:newThread("Scheduler Thread",function()
proxy_test = true proxy_test = true
end).OnError(multi.error) end).OnError(multi.error)
thread:newThread(function() thread:newThread(function()
while true do while true do
thread.hold(tloop.OnLoop) thread.hold(tloop.OnLoop)
@ -256,7 +254,7 @@ end).OnError(multi.error)
multi.OnExit(function(err_or_errorcode) multi.OnExit(function(err_or_errorcode)
print("Error Code: ", err_or_errorcode) print("Error Code: ", err_or_errorcode)
if not we_good then if not we_good then
multi.info("There was an error running some tests!") multi.print("There was an error running some tests!")
return return
else else
multi.success("Tests complete!") multi.success("Tests complete!")