From f5403969321e4ed063d54a0984a1c79d8764bd6b Mon Sep 17 00:00:00 2001 From: Ryan Ward Date: Sat, 28 Oct 2023 17:57:21 -0400 Subject: [PATCH] Added boost method --- docs/changes.md | 1 + init.lua | 12 ++++- integration/pseudoManager/extensions.lua | 20 ++------ tests/test.lua | 62 +++++++++++++++--------- tests/threadtests.lua | 57 ++++------------------ 5 files changed, 63 insertions(+), 89 deletions(-) diff --git a/docs/changes.md b/docs/changes.md index c7436b4..ba4d36d 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -84,6 +84,7 @@ Allows the user to have multi auto set priorities (Requires chronos). Also adds Added --- +- processor's now have a boost function which causes it to run its processes the number of times specified in the `boost(count)` function - thread.hold will now use a custom hold method for objects with a `Hold` method. This is called like `obj:Hold(opt)`. The only argument passed is the optional options table that thread.hold can pass. There is an exception for connection objects. While they do contain a Hold method, the Hold method isn't used and is there for proxy objects, though they can be used in non proxy/thread situations. Hold returns all the arguments that the connection object was fired with. - shared_table = STP:newSharedTable(tbl_name) -- Allows you to create a shared table that all system threads in a process have access to. Returns a reference to that table for use on the main thread. Sets `_G[tbl_name]` on the system threads so you can access it there. ```lua diff --git a/init.lua b/init.lua index 928237e..44914db 100644 --- a/init.lua +++ b/init.lua @@ -1063,7 +1063,9 @@ function multi:newProcessor(name, nothread, priority) c.parent = self c.OnObjectCreated = self:newConnection() + local boost = 1 local handler + if priority then handler = c:createPriorityHandler(c) else @@ -1113,10 +1115,16 @@ function multi:newProcessor(name, nothread, priority) end, holdme)() end + function c:boost(count) + boost = count or 1 + end + function c.run() if not Active then return end - c:uManager(true) - handler() + for i=1,boost do + c:uManager(true) + handler() + end return c end diff --git a/integration/pseudoManager/extensions.lua b/integration/pseudoManager/extensions.lua index fe0abd8..d21aebb 100644 --- a/integration/pseudoManager/extensions.lua +++ b/integration/pseudoManager/extensions.lua @@ -38,7 +38,7 @@ function multi:newSystemThreadedQueue(name) c.data = {} c.Type = multi.registerType("s_queue") function c:push(v) - table.insert(self,v) + table.insert(self.data,v) end function c:pop() return table.remove(self.data,1) @@ -163,7 +163,6 @@ function multi:newSystemThreadedJobQueue(n) local funcs = THREAD.waitFor("__JobQueue_"..jqc.."_table") local queue = THREAD.waitFor("__JobQueue_"..jqc.."_queue") local queueReturn = THREAD.waitFor("__JobQueue_"..jqc.."_queueReturn") - local lastProc = clock() local queueAll = THREAD.waitFor("__JobQueue_"..jqc.."_queueAll") local registry = {} _G["__QR"] = queueReturn @@ -173,7 +172,6 @@ function multi:newSystemThreadedJobQueue(n) thread.yield() local all = queueAll:peek() if all and not registry[all[1]] then - lastProc = os.clock() queueAll:pop()[2]() end end @@ -184,13 +182,11 @@ function multi:newSystemThreadedJobQueue(n) thread.yield() local all = queueAll:peek() if all and not registry[all[1]] then - lastProc = os.clock() queueAll:pop()[2]() end local dat = thread.hold(queue) if dat then - multi:newThread("Test",function() - lastProc = os.clock() + multi:newThread("JobSubRunner",function() local name = table.remove(dat,1) local id = table.remove(dat,1) local tab = {multi.isolateFunction(funcs[name],_G)(multi.unpack(dat))} @@ -200,18 +196,8 @@ function multi:newSystemThreadedJobQueue(n) end end end) - thread:newThread("Idler",function() - while true do - thread.yield() - if clock()-lastProc> 2 then - THREAD.sleep(.05) - else - THREAD.sleep(.001) - end - end - end) multi:mainloop() - end,jqc) + end, jqc) end function c:Hold(opt) diff --git a/tests/test.lua b/tests/test.lua index f96eeb1..6a0479e 100644 --- a/tests/test.lua +++ b/tests/test.lua @@ -1,14 +1,19 @@ package.path = "../?/init.lua;../?.lua;"..package.path multi, thread = require("multi"):init{print=true,warn=true,error=true,debugging=true} -require("multi.integration.priorityManager") +-- require("multi.integration.priorityManager") + +-- multi.debugging.OnObjectCreated(function(obj, process) +-- multi.print("Created:", obj.Type, "in", process.Type, process:getFullName()) +-- end) + +-- multi.debugging.OnObjectDestroyed(function(obj, process) +-- multi.print("Destroyed:", obj.Type, "in", process.Type, process:getFullName()) +-- end) + + + -multi.debugging.OnObjectCreated(function(obj, process) - multi.print("Created:", obj.Type, "in", process.Type, process:getFullName()) -end) -multi.debugging.OnObjectDestroyed(function(obj, process) - multi.print("Destroyed:", obj.Type, "in", process.Type, process:getFullName()) -end) -- test = multi:newProcessor("Test") -- test:setPriorityScheme(multi.priorityScheme.TimeBased) @@ -60,26 +65,37 @@ end) -- end) -- conn5:Fire() -multi.print("Testing thread:newProcessor()") -proc = thread:newProcessor("Test") -proc:newLoop(function() - multi.print("Running...") - thread.sleep(1) + + +-- multi.print("Testing thread:newProcessor()") + +-- proc = thread:newProcessor("Test") + +-- proc:newLoop(function() +-- multi.print("Running...") +-- thread.sleep(1) +-- end) + +-- proc:newThread(function() +-- while true do +-- multi.warn("Everything is a thread in this proc!") +-- thread.sleep(1) +-- end +-- end) + +-- proc:newAlarm(5):OnRing(function(a) +-- multi.print(";) Goodbye") +-- a:Destroy() +-- end) + +local func = thread:newFunction(function() + thread.sleep(4) + print("Hello!") end) -proc:newThread(function() - while true do - multi.warn("Everything is a thread in this proc!") - thread.sleep(1) - end -end) - -proc:newAlarm(5):OnRing(function(a) - multi.print(";) Goodbye") - a:Destroy() -end) +multi:newTLoop(func, 1) multi:mainloop() diff --git a/tests/threadtests.lua b/tests/threadtests.lua index f3298a2..0b91544 100644 --- a/tests/threadtests.lua +++ b/tests/threadtests.lua @@ -47,7 +47,9 @@ multi:newThread("Scheduler Thread",function() queue = multi:newSystemThreadedQueue("Test_Queue"):init() multi:newSystemThread("Test_Thread_0", function() - print("The name should be Test_Thread_0",THREAD_NAME,THREAD_NAME,_G.THREAD_NAME) + if THREAD_NAME~="Test_Thread_0" then + multi.error("The name should be Test_Thread_0",THREAD_NAME,THREAD_NAME,_G.THREAD_NAME) + end end) th1 = multi:newSystemThread("Test_Thread_1", function(a,b,c,d,e,f) @@ -97,7 +99,7 @@ multi:newThread("Scheduler Thread",function() end).OnError(multi.error) multi:newThread("test2",function() - print(thread.hold(function() return test["test2"] end)) + thread.hold(function() return test["test2"] end) worked = true end) @@ -114,7 +116,7 @@ multi:newThread("Scheduler Thread",function() local ready = false - jq = multi:newSystemThreadedJobQueue(5) -- Job queue with 4 worker threads + jq = multi:newSystemThreadedJobQueue(1) -- Job queue with 4 worker threads func2 = jq:newFunction("sleep",function(a,b) THREAD.sleep(.2) end) @@ -131,7 +133,7 @@ multi:newThread("Scheduler Thread",function() t, val = thread.hold(function() return count == 10 - end,{sleep=2}) + end,{sleep=3}) if val == multi.TIMEOUT then multi.error("SystemThreadedJobQueues: Failed") @@ -140,46 +142,6 @@ multi:newThread("Scheduler Thread",function() multi.success("SystemThreadedJobQueues: Ok") - -- queue2 = multi:newSystemThreadedQueue("Test_Queue2"):init() - -- multi:newSystemThread("Test_Thread_2",function() - -- queue2 = THREAD.waitFor("Test_Queue2"):init() - -- connOut = THREAD.waitFor("ConnectionNAMEHERE"):init() - -- connOut(function(arg) - -- queue2:push("Test_Thread_2") - -- end) - -- multi:mainloop() - -- end).OnError(multi.error) - -- multi:newSystemThread("Test_Thread_3",function() - -- queue2 = THREAD.waitFor("Test_Queue2"):init() - -- connOut = THREAD.waitFor("ConnectionNAMEHERE"):init() - -- connOut(function(arg) - -- queue2:push("Test_Thread_3") - -- end) - -- multi:mainloop() - -- end).OnError(multi.error) - -- connOut = multi:newSystemThreadedConnection("ConnectionNAMEHERE"):init() - -- a=0 - -- connOut(function(arg) - -- queue2:push("Main") - -- end) - -- for i=1,3 do - -- thread.sleep(.1) - -- connOut:Fire("Test From Main Thread: "..i.."\n") - -- end - -- thread.sleep(2) - -- local count = 0 - -- multi:newThread(function() - -- while count < 9 do - -- if queue2:pop() then - -- count = count + 1 - -- end - -- end - -- end).OnError(multi.error) - -- _, err = thread.hold(function() return count == 9 end,{sleep=.3}) - -- if err == multi.TIMEOUT then - -- multi.error("SystemThreadedConnections: Failed") - -- end - -- multi.success("SystemThreadedConnections: Ok") local proxy_test = false local stp = multi:newSystemThreadedProcessor(5) @@ -213,11 +175,12 @@ multi:newThread("Scheduler Thread",function() multi.print("Held on proxy connection... once") thread.hold(tloop.OnLoop) multi.print("Held on proxy connection... twice") + thread.hold(tloop.OnLoop) + multi.print("Held on proxy connection... finally") proxy_test = true end) thread:newThread(function() - print("While Test!") while true do thread.hold(tloop.OnLoop) print(THREAD_NAME,"Local Loopy") @@ -230,7 +193,7 @@ multi:newThread("Scheduler Thread",function() t, val = thread.hold(function() return proxy_test - end--[[,{sleep=5}]]) -- No timeouts + end,{sleep=10}) if val == multi.TIMEOUT then multi.error("SystemThreadedProcessor/Proxies: Failed") @@ -247,7 +210,7 @@ multi:newThread("Scheduler Thread",function() end) multi.OnExit(function(err_or_errorcode) - print("Error Code: ", err_or_errorcode) + multi.print("Error Code: ", err_or_errorcode) if not we_good then multi.print("There was an error running some tests!") return