From 0994ee2d2ad20a7cfd30ca8f711955f435d22888 Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Thu, 19 Jan 2023 00:16:32 -0500 Subject: [PATCH] Working on issue where threads created in threads don't work --- init.lua | 230 ++++++++++++---------------- integration/loveManager/init.lua | 16 +- integration/loveManager/threads.lua | 10 -- 3 files changed, 107 insertions(+), 149 deletions(-) diff --git a/init.lua b/init.lua index 62eeabd..9f38426 100644 --- a/init.lua +++ b/init.lua @@ -22,16 +22,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] -traceback = debug.traceback - local multi = {} local mainloopActive = false local isRunning = false local clock = os.clock local thread = {} -local in_proc = false local processes = {} local find_optimization = false +local threadManager if not _G["$multi"] then _G["$multi"] = {multi=multi,thread=thread} @@ -387,7 +385,7 @@ function multi:newConnection(protect, func, kill) if not call_funcs[i] then return end local suc, err = pcall(call_funcs[i],...) if not suc then - print(err) + multi.print(err) end if kill then table.remove(call_funcs,i) @@ -667,7 +665,7 @@ function multi:newEvent(task) end c.OnEvent = self:newConnection():fastMode() self:setPriority("core") - c:SetName(c.Type) + c:setName(c.Type) multi:create(c) return c end @@ -689,7 +687,7 @@ function multi:newUpdater(skip) return self end c.OnUpdate = self:newConnection():fastMode() - c:SetName(c.Type) + c:setName(c.Type) multi:create(c) return c end @@ -726,7 +724,7 @@ function multi:newAlarm(set) self.Parent.Pause(self) return self end - c:SetName(c.Type) + c:setName(c.Type) multi:create(c) return c end @@ -751,7 +749,7 @@ function multi:newLoop(func,notime) end multi:create(c) - c:SetName(c.Type) + c:setName(c.Type) return c end @@ -809,7 +807,7 @@ function multi:newStep(start,reset,count,skip) self:Resume() return self end - c:SetName(c.Type) + c:setName(c.Type) multi:create(c) return c end @@ -845,13 +843,13 @@ function multi:newTLoop(func,set) if func then c.OnLoop(func) end - c:SetName(c.Type) + c:setName(c.Type) multi:create(c) return c end -function multi:setTimeout(func,t) - thread:newThread(function() thread.sleep(t) func() end) +function multi:setTimeout(func, t) + thread:newThread("TimeoutThread",function() thread.sleep(t) func() end) end function multi:newTStep(start,reset,count,set) @@ -895,7 +893,7 @@ function multi:newTStep(start,reset,count,set) self:Resume() return self end - c:SetName(c.Type) + c:setName(c.Type) multi:create(c) return c end @@ -909,7 +907,7 @@ local function _task_handler() end function multi:newTask(func) - multi:newThread("Task Handler",function() + thread:newThread("Task Handler",function() while true do thread.hold(function() return _tasks > 0 @@ -1022,16 +1020,13 @@ function multi:newProcessor(name, nothread) return self.Name end - function c:newThread(name,func,...) - in_proc = c - local t = thread.newThread(c,name,func,...) - in_proc = false - return t + function c:newThread(name, func,...) + return thread.newThread(c, name, func, ...) end function c:newFunction(func, holdme) return thread:newFunctionBase(function(...) - return c:newThread("TempThread",func,...) + return c:newThread("Threaded Function Handler", func, ...) end, holdme)() end @@ -1324,7 +1319,7 @@ function thread:newFunctionBase(generator, holdme) f(nil,"Function is paused") end } - end + end local t = generator(...) t.OnDeath(function(...) rets = {...} end) t.OnError(function(self,e) err = e end) @@ -1358,10 +1353,10 @@ function thread:newFunctionBase(generator, holdme) end end -function thread:newFunction(func,holdme) +function thread:newFunction(func, holdme) return thread:newFunctionBase(function(...) - return thread:newThread("TempThread",func,...) - end,holdme)() + return thread:newThread("Threaded Function Handler", func, ...) + end, holdme)() end -- A cross version way to set enviroments, not the same as fenv though @@ -1388,7 +1383,7 @@ function thread:newThread(name,func,...) c.Name=name c.thread=create(func) c.sleep=1 - c.Type="thread" + c.Type = "thread" c.TID = threadid c.firstRunDone=false c._isPaused = false @@ -1427,12 +1422,12 @@ function thread:newThread(name,func,...) end function c:Kill() - thread.request(self,"kill") + thread.request(self, "kill") return self end function c:Sleep(n) - thread.request(self,"exec",function() + thread.request(self, "exec",function() thread.sleep(n) resumed = false end) @@ -1440,8 +1435,8 @@ function thread:newThread(name,func,...) end function c:Hold(n,opt) - thread.request(self,"exec",function() - thread.hold(n,opt) + thread.request(self, "exec",function() + thread.hold(n, opt) resumed = false end) return self @@ -1449,13 +1444,17 @@ function thread:newThread(name,func,...) c.Destroy = c.Kill - if self.Type=="process" then - table.insert(self.startme,c) + if self.Type == "process" then + multi.print("Creating thread (" .. self.Name .."):", name) + table.insert(self.startme, c) else - table.insert(startme,c) + multi.print("Creating thread (Global_Thread_Manager):", name) + if type(name) == "function" then + multi.print(debug.traceback()) + end + table.insert(threadManager.startme, c) end - - startme_len = #startme + globalThreads[c] = multi threadid = threadid + 1 multi:create(c) @@ -1463,7 +1462,7 @@ function thread:newThread(name,func,...) return c end -function thread:newISOThread(name,func,_env,...) +function thread:newISOThread(name, func, _env, ...) local func = func or name local env = _env or {} if not env.thread then @@ -1473,10 +1472,10 @@ function thread:newISOThread(name,func,_env,...) env.multi = multi end if type(name) == "function" then - name = "Thread#" .. threadCount + name = "Thread#"..threadCount end - local func = isolateFunction(func, env) - return thread:newThread(name, func,...) + local func = isolateFunction(func,env) + return thread:newThread(name,func,...) end multi.newThread = thread.newThread @@ -1488,7 +1487,7 @@ local ret,_ local task, thd, ref, ready local switch = { function(th,co)--hold - if clock() - th.intervalR >= th.interval then + if clock() - th.intervalR>=th.interval then t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = th.func() if t0 then if t0==NIL then t0 = nil end @@ -1617,7 +1616,6 @@ co_status = { else ref.OnError:Fire(ref,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) end - print(ref.Name, traceback()) if i then table.remove(th,i) else @@ -1632,30 +1630,6 @@ co_status = { ref.__processed = true end, } -handler = coroutine.wrap(function(self) - local temp_start - while true do - for start = #startme, 1, -1 do - temp_start = startme[start] - table.remove(startme) - _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = resume(temp_start.thread,unpack(temp_start.startArgs)) - co_status[status(temp_start.thread)](temp_start.thread,temp_start,t_none,nil,threads) -- Make sure there was no error - table.insert(threads,temp_start) - yield() - end - for i=#threads,1,-1 do - ref = threads[i] - if ref then - task = ref.task - thd = ref.thread - ready = ref.__ready - co_status[status(thd)](thd,ref,task,i,threads) - end - yield() - end - yield() - end -end) function multi:createHandler(threads,startme) return coroutine.wrap(function(self) @@ -1716,7 +1690,7 @@ function multi:newService(func) -- Priority managed threads return c end - local th = thread:newThread(function() + local th = thread:newThread("Service_Handler",function() while true do process() end @@ -1809,7 +1783,6 @@ local function mainloop(self) ctask:Act() __CurrentProcess = self end - handler() end else return nil, "Already Running!" @@ -1839,13 +1812,65 @@ local function p_mainloop(self) end end end - handler() end else return nil, "Already Running!" end end +local function doOpt() + function thread.hold(n,opt) + thread._Requests() + local opt = opt or {} + if type(opt)=="table" then + interval = opt.interval + if opt.cycles then + return yield(CMD, t_holdW, opt.cycles or 1, n or dFunc, interval) + elseif opt.sleep then + return yield(CMD, t_holdF, opt.sleep, n or dFunc, interval) + elseif opt.skip then + return yield(CMD, t_skip, opt.skip or 1, nil, interval) + end + end + if type(n) == "number" then + thread.getRunningThread().lastSleep = clock() + return yield(CMD, t_sleep, n or 0, nil, interval) + elseif type(n) == "table" and n.Type == "connector" then + local rdy = function() + return false + end + n(function(a1,a2,a3,a4,a5,a6) + rdy = function() + if a1==nil then + return NIL,a2,a3,a4,a5,a6 + end + return a1,a2,a3,a4,a5,a6 + end + end) + return yield(CMD, t_hold, function() + return rdy() + end, nil, interval) + elseif type(n) == "function" then + local cache = string.dump(n) + local f_str = tostring(n) + local good = true + for i=1,#func_cache do + if func_cache[i][1] == cache and func_cache[i][2] ~= f_str and not func_cache[i][3] then + multi:getOptimizationConnection():Fire("It's better to store a function to a variable than to use an anonymous function within the hold method!\n" .. debug.traceback()) + func_cache[i][3] = true + good = false + end + end + if good then + table.insert(func_cache, {cache, f_str}) + end + return yield(CMD, t_hold, n or dFunc, nil, interval) + else + error("Invalid argument passed to thread.hold(...)!") + end + end +end + local init = false function multi.init(settings, realsettings) if settings == multi then settings = realsettings end @@ -1860,23 +1885,23 @@ function multi.init(settings, realsettings) end if settings.findopt then find_optimization = true + doOpt() multi.enableOptimization:Fire(multi, thread) end end return _G["$multi"].multi,_G["$multi"].thread end -function multi:uManager(proc) +function multi:uManager() if self.Active then __CurrentProcess = self multi.OnPreLoad:Fire() self.uManager=self.uManagerRef multi.OnLoad:Fire() - if not proc then handler() end end end -function multi:uManagerRefP1(proc) +function multi:uManagerRefP1() if self.Active then __CurrentProcess = self local Loop=self.Mainloop @@ -1889,11 +1914,10 @@ function multi:uManagerRefP1(proc) end end end - if not proc then handler() end end end -function multi:uManagerRef(proc) +function multi:uManagerRef() if self.Active then __CurrentProcess = self local Loop=self.Mainloop @@ -1902,7 +1926,6 @@ function multi:uManagerRef(proc) __CurrentTask:Act() __CurrentProcess = self end - if not proc then handler() end end end @@ -2211,64 +2234,7 @@ else multi.m.sentinel = newproxy(true) getmetatable(multi.m.sentinel).__gc = multi.m.onexit end -local func_cache = {} -multi:newThread(function() - thread.skip() - if find_optimization then - - function thread.hold(n,opt) - thread._Requests() - local opt = opt or {} - if type(opt)=="table" then - interval = opt.interval - if opt.cycles then - return yield(CMD, t_holdW, opt.cycles or 1, n or dFunc, interval) - elseif opt.sleep then - return yield(CMD, t_holdF, opt.sleep, n or dFunc, interval) - elseif opt.skip then - return yield(CMD, t_skip, opt.skip or 1, nil, interval) - end - end - - if type(n) == "number" then - thread.getRunningThread().lastSleep = clock() - return yield(CMD, t_sleep, n or 0, nil, interval) - elseif type(n) == "table" and n.Type == "connector" then - local rdy = function() - return false - end - n(function(a1,a2,a3,a4,a5,a6) - rdy = function() - if a1==nil then - return NIL,a2,a3,a4,a5,a6 - end - return a1,a2,a3,a4,a5,a6 - end - end) - return yield(CMD, t_hold, function() - return rdy() - end, nil, interval) - elseif type(n) == "function" then - local cache = string.dump(n) - local f_str = tostring(n) - local good = true - for i=1,#func_cache do - if func_cache[i][1] == cache and func_cache[i][2] ~= f_str and not func_cache[i][3] then - multi:getOptimizationConnection():Fire("It's better to store a function to a variable than to use an anonymous function within the hold method!\n" .. debug.traceback()) - func_cache[i][3] = true - good = false - end - end - if good then - table.insert(func_cache, {cache, f_str}) - end - return yield(CMD, t_hold, n or dFunc, nil, interval) - else - error("Invalid argument passed to thread.hold(...)!") - end - end - -- Add more Overrides - end -end) + +threadManager = multi:newProcessor("Global_Thread_Manager").Start() return multi \ No newline at end of file diff --git a/integration/loveManager/init.lua b/integration/loveManager/init.lua index ed415ae..0cb3ee9 100644 --- a/integration/loveManager/init.lua +++ b/integration/loveManager/init.lua @@ -20,9 +20,12 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -]] if ISTHREAD then +]] + +if ISTHREAD then error("You cannot require the loveManager from within a thread!") end + local ThreadFileData = [[ ISTHREAD = true THREAD = require("multi.integration.loveManager.threads") @@ -35,7 +38,7 @@ math.randomseed(__THREADID__) math.random() math.random() math.random() -stab = THREAD.createStaticTable(__THREADNAME__) +stab = THREAD.createStaticTable(__THREADNAME__ .. __THREADID__) GLOBAL = THREAD.getGlobal() multi, thread = require("multi").init() multi.integration={} @@ -68,14 +71,13 @@ function multi:newSystemThread(name, func, ...) GLOBAL["__THREAD_COUNT"] = THREAD_ID THREAD_ID = THREAD_ID + 1 function c:getName() return c.name end - thread:newThread(function() + thread:newThread(name .. "_System_Thread_Handler",function() if name == "TempSystemThread" then local status_channel = love.thread.getChannel("STATCHAN_" .. c.ID) thread.hold(function() -- While the thread is running we might as well do something in the loop - local status = status_channel - if status:peek() ~= nil then - c.statusconnector:Fire(unpack(status:pop())) + if status_channel:peek() ~= nil then + c.statusconnector:Fire(unpack(status_channel:pop())) end return not c.thread:isRunning() end) @@ -98,7 +100,7 @@ end function THREAD:newFunction(func, holdme) return thread:newFunctionBase(function(...) - return multi:newSystemThread("TempSystemThread", func, ...) + return multi:newSystemThread("SystemThreaded Function Handler", func, ...) end, holdme)() end diff --git a/integration/loveManager/threads.lua b/integration/loveManager/threads.lua index d867d6d..41e573f 100644 --- a/integration/loveManager/threads.lua +++ b/integration/loveManager/threads.lua @@ -25,7 +25,6 @@ require("love.timer") require("love.system") require("love.data") require("love.thread") -local socket = require("socket") local multi, thread = require("multi").init() local threads = {} @@ -49,15 +48,6 @@ local function manage(channel, value) end end -local function RandomVariable(length) - local res = {} - math.randomseed(socket.gettime()*10000) - for i = 1, length do - res[#res+1] = string.char(math.random(97, 122)) - end - return table.concat(res) -end - local GNAME = "__GLOBAL_" local proxy = {} function threads.set(name,val)