Working on issue where threads created in threads don't work

This commit is contained in:
Ryan Ward 2023-01-19 00:16:32 -05:00
parent 22f1375380
commit 0994ee2d2a
3 changed files with 107 additions and 149 deletions

224
init.lua
View File

@ -22,16 +22,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
]] ]]
traceback = debug.traceback
local multi = {} local multi = {}
local mainloopActive = false local mainloopActive = false
local isRunning = false local isRunning = false
local clock = os.clock local clock = os.clock
local thread = {} local thread = {}
local in_proc = false
local processes = {} local processes = {}
local find_optimization = false local find_optimization = false
local threadManager
if not _G["$multi"] then if not _G["$multi"] then
_G["$multi"] = {multi=multi,thread=thread} _G["$multi"] = {multi=multi,thread=thread}
@ -387,7 +385,7 @@ function multi:newConnection(protect, func, kill)
if not call_funcs[i] then return end if not call_funcs[i] then return end
local suc, err = pcall(call_funcs[i],...) local suc, err = pcall(call_funcs[i],...)
if not suc then if not suc then
print(err) multi.print(err)
end end
if kill then if kill then
table.remove(call_funcs,i) table.remove(call_funcs,i)
@ -667,7 +665,7 @@ function multi:newEvent(task)
end end
c.OnEvent = self:newConnection():fastMode() c.OnEvent = self:newConnection():fastMode()
self:setPriority("core") self:setPriority("core")
c:SetName(c.Type) c:setName(c.Type)
multi:create(c) multi:create(c)
return c return c
end end
@ -689,7 +687,7 @@ function multi:newUpdater(skip)
return self return self
end end
c.OnUpdate = self:newConnection():fastMode() c.OnUpdate = self:newConnection():fastMode()
c:SetName(c.Type) c:setName(c.Type)
multi:create(c) multi:create(c)
return c return c
end end
@ -726,7 +724,7 @@ function multi:newAlarm(set)
self.Parent.Pause(self) self.Parent.Pause(self)
return self return self
end end
c:SetName(c.Type) c:setName(c.Type)
multi:create(c) multi:create(c)
return c return c
end end
@ -751,7 +749,7 @@ function multi:newLoop(func,notime)
end end
multi:create(c) multi:create(c)
c:SetName(c.Type) c:setName(c.Type)
return c return c
end end
@ -809,7 +807,7 @@ function multi:newStep(start,reset,count,skip)
self:Resume() self:Resume()
return self return self
end end
c:SetName(c.Type) c:setName(c.Type)
multi:create(c) multi:create(c)
return c return c
end end
@ -845,13 +843,13 @@ function multi:newTLoop(func,set)
if func then if func then
c.OnLoop(func) c.OnLoop(func)
end end
c:SetName(c.Type) c:setName(c.Type)
multi:create(c) multi:create(c)
return c return c
end end
function multi:setTimeout(func,t) function multi:setTimeout(func, t)
thread:newThread(function() thread.sleep(t) func() end) thread:newThread("TimeoutThread",function() thread.sleep(t) func() end)
end end
function multi:newTStep(start,reset,count,set) function multi:newTStep(start,reset,count,set)
@ -895,7 +893,7 @@ function multi:newTStep(start,reset,count,set)
self:Resume() self:Resume()
return self return self
end end
c:SetName(c.Type) c:setName(c.Type)
multi:create(c) multi:create(c)
return c return c
end end
@ -909,7 +907,7 @@ local function _task_handler()
end end
function multi:newTask(func) function multi:newTask(func)
multi:newThread("Task Handler",function() thread:newThread("Task Handler",function()
while true do while true do
thread.hold(function() thread.hold(function()
return _tasks > 0 return _tasks > 0
@ -1022,16 +1020,13 @@ function multi:newProcessor(name, nothread)
return self.Name return self.Name
end end
function c:newThread(name,func,...) function c:newThread(name, func,...)
in_proc = c return thread.newThread(c, name, func, ...)
local t = thread.newThread(c,name,func,...)
in_proc = false
return t
end end
function c:newFunction(func, holdme) function c:newFunction(func, holdme)
return thread:newFunctionBase(function(...) return thread:newFunctionBase(function(...)
return c:newThread("TempThread",func,...) return c:newThread("Threaded Function Handler", func, ...)
end, holdme)() end, holdme)()
end end
@ -1358,10 +1353,10 @@ function thread:newFunctionBase(generator, holdme)
end end
end end
function thread:newFunction(func,holdme) function thread:newFunction(func, holdme)
return thread:newFunctionBase(function(...) return thread:newFunctionBase(function(...)
return thread:newThread("TempThread",func,...) return thread:newThread("Threaded Function Handler", func, ...)
end,holdme)() end, holdme)()
end end
-- A cross version way to set enviroments, not the same as fenv though -- A cross version way to set enviroments, not the same as fenv though
@ -1388,7 +1383,7 @@ function thread:newThread(name,func,...)
c.Name=name c.Name=name
c.thread=create(func) c.thread=create(func)
c.sleep=1 c.sleep=1
c.Type="thread" c.Type = "thread"
c.TID = threadid c.TID = threadid
c.firstRunDone=false c.firstRunDone=false
c._isPaused = false c._isPaused = false
@ -1427,12 +1422,12 @@ function thread:newThread(name,func,...)
end end
function c:Kill() function c:Kill()
thread.request(self,"kill") thread.request(self, "kill")
return self return self
end end
function c:Sleep(n) function c:Sleep(n)
thread.request(self,"exec",function() thread.request(self, "exec",function()
thread.sleep(n) thread.sleep(n)
resumed = false resumed = false
end) end)
@ -1440,8 +1435,8 @@ function thread:newThread(name,func,...)
end end
function c:Hold(n,opt) function c:Hold(n,opt)
thread.request(self,"exec",function() thread.request(self, "exec",function()
thread.hold(n,opt) thread.hold(n, opt)
resumed = false resumed = false
end) end)
return self return self
@ -1449,13 +1444,17 @@ function thread:newThread(name,func,...)
c.Destroy = c.Kill c.Destroy = c.Kill
if self.Type=="process" then if self.Type == "process" then
table.insert(self.startme,c) multi.print("Creating thread (" .. self.Name .."):", name)
table.insert(self.startme, c)
else else
table.insert(startme,c) multi.print("Creating thread (Global_Thread_Manager):", name)
if type(name) == "function" then
multi.print(debug.traceback())
end
table.insert(threadManager.startme, c)
end end
startme_len = #startme
globalThreads[c] = multi globalThreads[c] = multi
threadid = threadid + 1 threadid = threadid + 1
multi:create(c) multi:create(c)
@ -1463,7 +1462,7 @@ function thread:newThread(name,func,...)
return c return c
end end
function thread:newISOThread(name,func,_env,...) function thread:newISOThread(name, func, _env, ...)
local func = func or name local func = func or name
local env = _env or {} local env = _env or {}
if not env.thread then if not env.thread then
@ -1473,10 +1472,10 @@ function thread:newISOThread(name,func,_env,...)
env.multi = multi env.multi = multi
end end
if type(name) == "function" then if type(name) == "function" then
name = "Thread#" .. threadCount name = "Thread#"..threadCount
end end
local func = isolateFunction(func, env) local func = isolateFunction(func,env)
return thread:newThread(name, func,...) return thread:newThread(name,func,...)
end end
multi.newThread = thread.newThread multi.newThread = thread.newThread
@ -1488,7 +1487,7 @@ local ret,_
local task, thd, ref, ready local task, thd, ref, ready
local switch = { local switch = {
function(th,co)--hold function(th,co)--hold
if clock() - th.intervalR >= th.interval then if clock() - th.intervalR>=th.interval then
t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = th.func() t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = th.func()
if t0 then if t0 then
if t0==NIL then t0 = nil end if t0==NIL then t0 = nil end
@ -1617,7 +1616,6 @@ co_status = {
else else
ref.OnError:Fire(ref,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) ref.OnError:Fire(ref,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16)
end end
print(ref.Name, traceback())
if i then if i then
table.remove(th,i) table.remove(th,i)
else else
@ -1632,30 +1630,6 @@ co_status = {
ref.__processed = true ref.__processed = true
end, end,
} }
handler = coroutine.wrap(function(self)
local temp_start
while true do
for start = #startme, 1, -1 do
temp_start = startme[start]
table.remove(startme)
_,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(temp_start.thread,unpack(temp_start.startArgs))
co_status[status(temp_start.thread)](temp_start.thread,temp_start,t_none,nil,threads) -- Make sure there was no error
table.insert(threads,temp_start)
yield()
end
for i=#threads,1,-1 do
ref = threads[i]
if ref then
task = ref.task
thd = ref.thread
ready = ref.__ready
co_status[status(thd)](thd,ref,task,i,threads)
end
yield()
end
yield()
end
end)
function multi:createHandler(threads,startme) function multi:createHandler(threads,startme)
return coroutine.wrap(function(self) return coroutine.wrap(function(self)
@ -1716,7 +1690,7 @@ function multi:newService(func) -- Priority managed threads
return c return c
end end
local th = thread:newThread(function() local th = thread:newThread("Service_Handler",function()
while true do while true do
process() process()
end end
@ -1809,7 +1783,6 @@ local function mainloop(self)
ctask:Act() ctask:Act()
__CurrentProcess = self __CurrentProcess = self
end end
handler()
end end
else else
return nil, "Already Running!" return nil, "Already Running!"
@ -1839,13 +1812,65 @@ local function p_mainloop(self)
end end
end end
end end
handler()
end end
else else
return nil, "Already Running!" return nil, "Already Running!"
end end
end end
local function doOpt()
function thread.hold(n,opt)
thread._Requests()
local opt = opt or {}
if type(opt)=="table" then
interval = opt.interval
if opt.cycles then
return yield(CMD, t_holdW, opt.cycles or 1, n or dFunc, interval)
elseif opt.sleep then
return yield(CMD, t_holdF, opt.sleep, n or dFunc, interval)
elseif opt.skip then
return yield(CMD, t_skip, opt.skip or 1, nil, interval)
end
end
if type(n) == "number" then
thread.getRunningThread().lastSleep = clock()
return yield(CMD, t_sleep, n or 0, nil, interval)
elseif type(n) == "table" and n.Type == "connector" then
local rdy = function()
return false
end
n(function(a1,a2,a3,a4,a5,a6)
rdy = function()
if a1==nil then
return NIL,a2,a3,a4,a5,a6
end
return a1,a2,a3,a4,a5,a6
end
end)
return yield(CMD, t_hold, function()
return rdy()
end, nil, interval)
elseif type(n) == "function" then
local cache = string.dump(n)
local f_str = tostring(n)
local good = true
for i=1,#func_cache do
if func_cache[i][1] == cache and func_cache[i][2] ~= f_str and not func_cache[i][3] then
multi:getOptimizationConnection():Fire("It's better to store a function to a variable than to use an anonymous function within the hold method!\n" .. debug.traceback())
func_cache[i][3] = true
good = false
end
end
if good then
table.insert(func_cache, {cache, f_str})
end
return yield(CMD, t_hold, n or dFunc, nil, interval)
else
error("Invalid argument passed to thread.hold(...)!")
end
end
end
local init = false local init = false
function multi.init(settings, realsettings) function multi.init(settings, realsettings)
if settings == multi then settings = realsettings end if settings == multi then settings = realsettings end
@ -1860,23 +1885,23 @@ function multi.init(settings, realsettings)
end end
if settings.findopt then if settings.findopt then
find_optimization = true find_optimization = true
doOpt()
multi.enableOptimization:Fire(multi, thread) multi.enableOptimization:Fire(multi, thread)
end end
end end
return _G["$multi"].multi,_G["$multi"].thread return _G["$multi"].multi,_G["$multi"].thread
end end
function multi:uManager(proc) function multi:uManager()
if self.Active then if self.Active then
__CurrentProcess = self __CurrentProcess = self
multi.OnPreLoad:Fire() multi.OnPreLoad:Fire()
self.uManager=self.uManagerRef self.uManager=self.uManagerRef
multi.OnLoad:Fire() multi.OnLoad:Fire()
if not proc then handler() end
end end
end end
function multi:uManagerRefP1(proc) function multi:uManagerRefP1()
if self.Active then if self.Active then
__CurrentProcess = self __CurrentProcess = self
local Loop=self.Mainloop local Loop=self.Mainloop
@ -1889,11 +1914,10 @@ function multi:uManagerRefP1(proc)
end end
end end
end end
if not proc then handler() end
end end
end end
function multi:uManagerRef(proc) function multi:uManagerRef()
if self.Active then if self.Active then
__CurrentProcess = self __CurrentProcess = self
local Loop=self.Mainloop local Loop=self.Mainloop
@ -1902,7 +1926,6 @@ function multi:uManagerRef(proc)
__CurrentTask:Act() __CurrentTask:Act()
__CurrentProcess = self __CurrentProcess = self
end end
if not proc then handler() end
end end
end end
@ -2211,64 +2234,7 @@ else
multi.m.sentinel = newproxy(true) multi.m.sentinel = newproxy(true)
getmetatable(multi.m.sentinel).__gc = multi.m.onexit getmetatable(multi.m.sentinel).__gc = multi.m.onexit
end end
local func_cache = {}
multi:newThread(function()
thread.skip()
if find_optimization then
function thread.hold(n,opt) threadManager = multi:newProcessor("Global_Thread_Manager").Start()
thread._Requests()
local opt = opt or {}
if type(opt)=="table" then
interval = opt.interval
if opt.cycles then
return yield(CMD, t_holdW, opt.cycles or 1, n or dFunc, interval)
elseif opt.sleep then
return yield(CMD, t_holdF, opt.sleep, n or dFunc, interval)
elseif opt.skip then
return yield(CMD, t_skip, opt.skip or 1, nil, interval)
end
end
if type(n) == "number" then
thread.getRunningThread().lastSleep = clock()
return yield(CMD, t_sleep, n or 0, nil, interval)
elseif type(n) == "table" and n.Type == "connector" then
local rdy = function()
return false
end
n(function(a1,a2,a3,a4,a5,a6)
rdy = function()
if a1==nil then
return NIL,a2,a3,a4,a5,a6
end
return a1,a2,a3,a4,a5,a6
end
end)
return yield(CMD, t_hold, function()
return rdy()
end, nil, interval)
elseif type(n) == "function" then
local cache = string.dump(n)
local f_str = tostring(n)
local good = true
for i=1,#func_cache do
if func_cache[i][1] == cache and func_cache[i][2] ~= f_str and not func_cache[i][3] then
multi:getOptimizationConnection():Fire("It's better to store a function to a variable than to use an anonymous function within the hold method!\n" .. debug.traceback())
func_cache[i][3] = true
good = false
end
end
if good then
table.insert(func_cache, {cache, f_str})
end
return yield(CMD, t_hold, n or dFunc, nil, interval)
else
error("Invalid argument passed to thread.hold(...)!")
end
end
-- Add more Overrides
end
end)
return multi return multi

View File

@ -20,9 +20,12 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
]] if ISTHREAD then ]]
if ISTHREAD then
error("You cannot require the loveManager from within a thread!") error("You cannot require the loveManager from within a thread!")
end end
local ThreadFileData = [[ local ThreadFileData = [[
ISTHREAD = true ISTHREAD = true
THREAD = require("multi.integration.loveManager.threads") THREAD = require("multi.integration.loveManager.threads")
@ -35,7 +38,7 @@ math.randomseed(__THREADID__)
math.random() math.random()
math.random() math.random()
math.random() math.random()
stab = THREAD.createStaticTable(__THREADNAME__) stab = THREAD.createStaticTable(__THREADNAME__ .. __THREADID__)
GLOBAL = THREAD.getGlobal() GLOBAL = THREAD.getGlobal()
multi, thread = require("multi").init() multi, thread = require("multi").init()
multi.integration={} multi.integration={}
@ -68,14 +71,13 @@ function multi:newSystemThread(name, func, ...)
GLOBAL["__THREAD_COUNT"] = THREAD_ID GLOBAL["__THREAD_COUNT"] = THREAD_ID
THREAD_ID = THREAD_ID + 1 THREAD_ID = THREAD_ID + 1
function c:getName() return c.name end function c:getName() return c.name end
thread:newThread(function() thread:newThread(name .. "_System_Thread_Handler",function()
if name == "TempSystemThread" then if name == "TempSystemThread" then
local status_channel = love.thread.getChannel("STATCHAN_" .. c.ID) local status_channel = love.thread.getChannel("STATCHAN_" .. c.ID)
thread.hold(function() thread.hold(function()
-- While the thread is running we might as well do something in the loop -- While the thread is running we might as well do something in the loop
local status = status_channel if status_channel:peek() ~= nil then
if status:peek() ~= nil then c.statusconnector:Fire(unpack(status_channel:pop()))
c.statusconnector:Fire(unpack(status:pop()))
end end
return not c.thread:isRunning() return not c.thread:isRunning()
end) end)
@ -98,7 +100,7 @@ end
function THREAD:newFunction(func, holdme) function THREAD:newFunction(func, holdme)
return thread:newFunctionBase(function(...) return thread:newFunctionBase(function(...)
return multi:newSystemThread("TempSystemThread", func, ...) return multi:newSystemThread("SystemThreaded Function Handler", func, ...)
end, holdme)() end, holdme)()
end end

View File

@ -25,7 +25,6 @@ require("love.timer")
require("love.system") require("love.system")
require("love.data") require("love.data")
require("love.thread") require("love.thread")
local socket = require("socket")
local multi, thread = require("multi").init() local multi, thread = require("multi").init()
local threads = {} local threads = {}
@ -49,15 +48,6 @@ local function manage(channel, value)
end end
end end
local function RandomVariable(length)
local res = {}
math.randomseed(socket.gettime()*10000)
for i = 1, length do
res[#res+1] = string.char(math.random(97, 122))
end
return table.concat(res)
end
local GNAME = "__GLOBAL_" local GNAME = "__GLOBAL_"
local proxy = {} local proxy = {}
function threads.set(name,val) function threads.set(name,val)