diff --git a/Documentation.md b/Documentation.md index 4216445..4fa437b 100644 --- a/Documentation.md +++ b/Documentation.md @@ -1,8 +1,10 @@ -Current Multi Version: 15.0.0 +Current Multi Version: 15.1.0 # Multi static variables `multi.Version` — The current version of the library +`multi.TIMEOUT` — The value returned when a timed method times out + `multi.Priority_Core` — Highest level of pirority that can be given to a process
`multi.Priority_Very_High`
`multi.Priority_High` @@ -20,6 +22,74 @@ Current Multi Version: 15.0.0
`multi:threadloop([TABLE settings])` — This runs the mainloop by having its own internal while loop running, but prioritizes threads over multi-objects
`multi:uManager([TABLE settings])` — This runs the mainloop, but does not have its own while loop and thus needs to be within a loop of some kind. +# Global Methods + +`multi:init()` — Uesd to initiate the library, should only be called once +`multi.getCurrentProcess()` — Returns currently running Process +`multi.` + +# Processor Methods + +These methods can be called either on the multi namespace or a process returned by `proc = multi:newProcessor()` + +`proc.Stop()` — Stops the main process/child process. **Note:** If the main process is stopped all child processes are stopped as well +`proc:getTasksDetails([STRING: displaytype])` — Gets a table or string of all the running tasks + +Processor Attributes +--- + +| Attribute | Type | Returns | Description | +---|---|---|--- +Start|Method()|self| Starts the process +Stop|Method()|self| Stops the process +OnError|Connection|connection| Allows connection to the process error handler +Type|Member:`string`|"process"| Contains the type of object +Active|Member:`boolean`|variable| If false the process is not active +Name|Member:`string`|variable| The name set at process creation +process|Thread|thread| A handle to a multi thread object + +[Refer to the objects for more methods](#non-actors) + +Example: +```lua +package.path = "./?/init.lua;"..package.path +multi,thread = require("multi"):init() + +-- Create a processor object, it works a lot like the multi object +sandbox = multi:newProcessor() + +-- On our processor object create a TLoop that prints "testing..." every second +sandbox:newTLoop(function() + print("testing...") +end,1) + +-- Create a thread on the processor object +sandbox:newThread("Test Thread",function() + -- Create a counter named 'a' + local a = 0 + -- Start of the while loop that ends when a = 10 + while true do + -- pause execution of the thread for 1 second + thread.sleep(1) + -- increment a by 1 + a = a + 1 + -- display the name of the current process + print("Thread Test: ".. multi.getCurrentProcess().Name) + if a == 10 then + -- Stopping the processor stops all objects created inside that process including threads. In the backend threads use a regular multiobject to handle the scheduler and all of the holding functions. These all stop when a processor is stopped. This can be really useful to sandbox processes that might need to turned on and off with ease and not having to think about it. + sandbox.Stop() + end + end + -- Catch any errors that may come up +end).OnError(function(...) + print(...) +end) + +sandbox.Start() -- Start the process + +multi:mainloop() -- The main loop that allows all processes to continue +``` + # Multi Settings **Note:** Most settings have been fined tuned to be at the peak of performance already, however preLoop, protect (Which drastically lowers preformance), and stopOnError should be used freely to fit your needs. @@ -150,6 +220,9 @@ returns or nil The connect feature has some syntax sugar to it as seen below - `link = conn(FUNCTION func, [STRING name nil], [NUMBER #conns+1])` +- `combinedconn = conn1 + conn2` — A combined connection is triggered when all connections are triggered. See example [here](#coroutine-based-threading-cbt) + + Example: ```lua @@ -425,7 +498,23 @@ Helpful methods are wrapped around the builtin coroutine module which make it fe **threads.\* used within threaded enviroments** - `thread.sleep(NUMBER n)` — Holds execution of the thread until a certain amount of time has passed -- `VARIABLE returns = thread.hold(FUNCTION func)` — Hold execution until the function returns non nil. All returns are passed to the thread once the conditions have been met. To pass nil use `multi.NIL`\* +- `VARIABLE val = THREAD.hold(FUNCTION|CONNCETION|NUMBER func, TABLE options)` — Holds the current thread until a condition is met + + | Option | Description | + ---|--- + | interval | Time between each poll | + | cycles | Number of cycles before timing out | + | sleep | Number of seconds before timing out | + | skip | Number of cycles before testing again, does not cause a timeout! | + + **Note:** cycles and sleep options cannot both be used at the same time. Interval and skip cannot be used at the same time either. Cycles take priority over sleep if both are present! HoldFor and HoldWithin can be emulated using the new features. Old functions will remain for backward compatibility. + + Using cycles, sleep or interval will cause a timeout; returning nil, multi.TIMEOUT + + `func` can be a number and `thread.hold` will act like `thread.sleep`. When `func` is a number the option table will be ignored! + + `func` can be a connection and will hold until the condition is triggered. When using a connection the option table is ignored! + - `thread.skip(NUMBER n)` — How many cycles should be skipped until I execute again - `thread.kill()` — Kills the thread - `thread.yeild()` — Is the same as using thread.skip(0) or thread.sleep(0), hands off control until the next cycle @@ -438,9 +527,64 @@ Helpful methods are wrapped around the builtin coroutine module which make it fe - `th = thread.getRunningThread()` — Returns the currently running thread - `VARIABLE returns or nil, "TIMEOUT" = thread.holdFor(NUMBER: sec, FUNCTION: condition)` — Holds until a condidtion is met, or if there is a timeout nil,"TIMEOUT" - `VARIABLE returns or nil, "TIMEOUT" = thread.holdWithin(NUMBER: skip, FUNCTION: func)` — Holds until a condition is met or n cycles have happened. -- `returns or handler = thread:newFunction(FUNCTION: func, [BOOLEAN: holdme false])` — func: The function you want to be threaded. holdme: If true the function waits until it has returns and then returns them. Otherwise the function returns a table - - `handler.connect(Function: func(returns))` — Connects to the event that is triggered when the returns are avaiable - - `VARIAABLE returns = handler.wait()` — Waits until returns are avaiable and then returns them +- `func = thread:newFunction(FUNCTION: func, [BOOLEAN: holdme false])` — func: The function you want to be threaded. holdme: If true the function waits until it has returns and then returns them. Otherwise the function returns a table + - `func:Pause()` — Pauses a function, function will return `nil`, `"Function is paused"` + - `func:Resume()` — Resumes a paused function + - `func:holdMe(BOOLEAN: set)` — Sets the holdme argument to `set` + - `handler = func(VARIABLE args)` — Calls the function, will return + - `handler.isTFunc` — if true then its a threaded function + - `handler.wait()` — waits for the function to finish and returns like normal + - `handler.connect(Function: func(returns))` — Connects to the event that is triggered when the returns are avaiable and returns them + - `VARIABLE returns = handler.wait()` — Waits until returns are avaiable and then + - `handler.OnStatus(connector(VARIABLE args))` — A connection to the running function's status see example below + - `handler.OnReturn(connector(VARIABLE args))` — A connection that is triggered when the running function is finished see example below + - `handler.OnError(connector(nil,error))` + +Example: + +```lua +package.path = "./?/init.lua;"..package.path +multi,thread = require("multi"):init() + +func = thread:newFunction(function(count) + local a = 0 + while true do + a = a + 1 + thread.sleep(.1) + thread.pushStatus(a,count) + if a == count then break end + end + return "Done" +end) + +multi:newThread("Function Status Test",function() + local ret = func(10) + local ret2 = func(15) + local ret3 = func(20) + ret.OnStatus(function(part,whole) + --[[ Print out the current status. In this case every second it will update with: + 10% + 20% + 30% + ... + 100% + + Function Done! + ]] + print(math.ceil((part/whole)*1000)/10 .."%") + end) + ret2.OnStatus(function(part,whole) + print("Ret2: ",math.ceil((part/whole)*1000)/10 .."%") + end) + ret3.OnStatus(function(part,whole) + print("Ret3: ",math.ceil((part/whole)*1000)/10 .."%") + end) + -- Connections can now be added together, if you had multiple holds and one finished before others and wasn't consumed it would lock forever! This is now fixed + thread.hold(ret2.OnReturn + ret.OnReturn + ret3.OnReturn) + print("Function Done!") + os.exit() +end) +``` \*A note about multi.NIL, this should only be used within the hold and hold like methods. thread.hold(), thread.holdFor(), and thread.holdWithin() methods. This is not needed within threaded functions! The reason hold prevents nil and false is because it is testing for a condition so the first argument needs to be non nil nor false! multi.NIL should not be used anywhere else. Sometimes you may need to pass a 'nil' value or return. While you could always return true or something you could use multi.NIL to force a nil value through a hold like method. @@ -651,7 +795,7 @@ Using this integration modifies some methods that the multi library has. - `THREAD.kill()` — Kills the thread - `THREAD.getName()` — Returns the name of the working thread - `THREAD.sleep(NUMBER n)` — Sleeps for an amount of time stopping the current thread -- `THREAD.hold(FUNCTION func)` — Holds the current thread until a condition is met +- `THREAD.hold(FUNCTION func, TABLE options)` — Holds the current thread until a condition is met - `THREAD.getID()` — returns a unique ID for the current thread. This varaiable is visible to the main thread as well as by accessing it through the returned thread object. OBJ.Id # ST - GLOBAL namespace @@ -762,6 +906,7 @@ console.print("Hello World!") ``` # ST - SystemThreadedJobQueue `jq = multi:newSystemThreadedJobQueue([NUMBER: threads])` — Creates a system threaded job queue with an optional number of threads +- `boolean jq:isEmpty()` — Returns true if the jobqueue is empty false otherwise - `jq.cores = (supplied number) or (the number of cores on your system*2)` - `jq.OnJobCompleted(FUNCTION: func(jID,...))` — Connection that is triggered when a job has been completed. The jobID and returns of the job are supplies as arguments - `self = jq:doToAll(FUNCTION: func)` — Send data to every thread in the job queue. Useful if you want to require a module and have it available on all threads @@ -770,6 +915,7 @@ console.print("Hello World!") - `handler = jq:newFunction([STRING: name], FUNCTION: func)` — returns a threaded Function that wraps around jq.registerFunction, jq.pushJob() and jq.OnJobCompleted() to provide an easy way to create and work with the jobqueue - `handler.connect(Function: func(returns))` — Connects to the event that is triggered when the returns are avaiable - `VARIAABLE returns = handler.wait()` — Waits until returns are avaiable and then returns them + **Note:** Created functions using this method act as normal functions on the queue side of things. So you can call the functions from other queue functions as if they were normal functions. Example: diff --git a/README.md b/README.md index 234fce7..b0848c7 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ -# Multi Version: 15.0.0 Fake it till you make it +# Multi Version: 15.1.0 Hold the thread **Key Changes** -- Emulating system threading on a single thread - - Purpose to allow consistant code that can scale when threading is available. Check out the changelog for more details -- Proper support for lua versions above 5.1 (More testing is needed, a full test suite is being developed and should be made available soon) +- thread.hold has been updated to allow all variants to work as well as some new features. Check the changelog or documentation for more info. +- multi:newProccesor() Creates a process that acts like the multi namespace that can be managed independently from the mainloop. +- Connections can be added together -Found an issue? Please [submit it](https://github.com/rayaman/multi/issues) and I'll look into it! +Found an issue? Please [submit it](https://github.com/rayaman/multi/issues) and someone will look into it! -My multitasking library for lua. It is a pure lua binding, with exceptions of the integrations and the love2d compat. If you find any bugs or have any issues, please [let me know](https://github.com/rayaman/multi/issues) and I'll look into it!. +My multitasking library for lua. It is a pure lua binding, with exceptions of the integrations and the love2d compat. INSTALLING ---------- @@ -22,7 +22,7 @@ Going forward I will include a Release zip for love2d. Discord ------- -Have a question that you need asking? Or need realtime assistance? Feel free to join the discord!
+Have a question? Or need realtime assistance? Feel free to join the discord!
https://discord.gg/U8UspuA
Planned features/TODO @@ -30,10 +30,10 @@ Planned features/TODO - [x] ~~Finish Documentation~~ Finished - [ ] Create test suite - [ ] Network Parallelism rework -- [ ] Fix some bugs Usage: [Check out the documentation for more info](https://github.com/rayaman/multi/blob/master/Documentation.md)
----- + ```lua package.path="?.lua;?/init.lua;?.lua;?/?/init.lua;"..package.path local multi, thread = require("multi"):init() diff --git a/changes.md b/changes.md index 1a3330f..bc444c4 100644 --- a/changes.md +++ b/changes.md @@ -1,7 +1,378 @@ # Changelog Table of contents --- -[Update 15.0.0 - The art of faking it](#update-1500---the-art-of-faking-it)
[Update 14.2.0 - Bloatware Removed](#update-1420---bloatware-removed)
[Update 14.1.0 - A whole new world of possibilities](#update-1410---a-whole-new-world-of-possibilities)
[Update 14.0.0 - Consistency, Additions and Stability](#update-1400---consistency-additions-and-stability)
[Update 13.1.0 - Bug fixes and features added](#update-1310---bug-fixes-and-features-added)
[Update 13.0.0 - Added some documentation, and some new features too check it out!](#update-1300---added-some-documentation-and-some-new-features-too-check-it-out)
[Update 12.2.2 - Time for some more bug fixes!](#update-1222---time-for-some-more-bug-fixes)
[Update 12.2.1 - Time for some bug fixes!](#update-1221---time-for-some-bug-fixes)
[Update 12.2.0 - The chains of binding](#update-1220---the-chains-of-binding)
[Update 12.1.0 - Threads just can't hold on anymore](#update-1210---threads-just-cant-hold-on-anymore)
[Update: 12.0.0 - Big update (Lots of additions some changes)](#update-1200---big-update-lots-of-additions-some-changes)
[Update: 1.11.1 - Small Clarification on Love](#update-1111---small-clarification-on-love)
[Update: 1.11.0](#update-1110)
[Update: 1.10.0](#update-1100)
[Update: 1.9.2](#update-192)
[Update: 1.9.1 - Threads can now argue](#update-191---threads-can-now-argue)
[Update: 1.9.0](#update-190)
[Update: 1.8.7](#update-187)
[Update: 1.8.6](#update-186)
[Update: 1.8.5](#update-185)
[Update: 1.8.4](#update-184)
[Update: 1.8.3 - Mainloop recieves some needed overhauling](#update-183---mainloop-recieves-some-needed-overhauling)
[Update: 1.8.2](#update-182)
[Update: 1.8.1](#update-181)
[Update: 1.7.6](#update-176)
[Update: 1.7.5](#update-175)
[Update: 1.7.4](#update-174)
[Update: 1.7.3](#update-173)
[Update: 1.7.2](#update-172)
[Update: 1.7.1 - Bug Fixes Only](#update-171---bug-fixes-only)
[Update: 1.7.0 - Threading the systems](#update-170---threading-the-systems)
[Update: 1.6.0](#update-160)
[Update: 1.5.0](#update-150)
[Update: 1.4.1 (4/10/2017) - First Public release of the library](#update-141-4102017---first-public-release-of-the-library)
[Update: 1.4.0 (3/20/2017)](#update-140-3202017)
[Update: 1.3.0 (1/29/2017)](#update-130-1292017)
[Update: 1.2.0 (12.31.2016)](#update-120-12312016)
[Update: 1.1.0](#update-110)
[Update: 1.0.0](#update-100)
[Update: 0.6.3](#update-063)
[Update: 0.6.2](#update-062)
[Update: 0.6.1-6](#update-061-6)
[Update: 0.5.1-6](#update-051-6)
[Update: 0.4.1](#update-041)
[Update: 0.3.0 - The update that started it all](#update-030---the-update-that-started-it-all)
[Update: EventManager 2.0.0](#update-eventmanager-200)
[Update: EventManager 1.2.0](#update-eventmanager-120)
[Update: EventManager 1.1.0](#update-eventmanager-110)
[Update: EventManager 1.0.0 - Error checking](#update-eventmanager-100---error-checking)
[Version: EventManager 0.0.1 - In The Beginning things were very different](#version-eventmanager-001---in-the-beginning-things-were-very-different) +[Update 15.1.0 - Hold the thread!](#update-1510---hold-the-thread)
[Update 15.0.0 - The art of faking it](#update-1500---the-art-of-faking-it)
[Update 14.2.0 - Bloatware Removed](#update-1420---bloatware-removed)
[Update 14.1.0 - A whole new world of possibilities](#update-1410---a-whole-new-world-of-possibilities)
[Update 14.0.0 - Consistency, Additions and Stability](#update-1400---consistency-additions-and-stability)
[Update 13.1.0 - Bug fixes and features added](#update-1310---bug-fixes-and-features-added)
[Update 13.0.0 - Added some documentation, and some new features too check it out!](#update-1300---added-some-documentation-and-some-new-features-too-check-it-out)
[Update 12.2.2 - Time for some more bug fixes!](#update-1222---time-for-some-more-bug-fixes)
[Update 12.2.1 - Time for some bug fixes!](#update-1221---time-for-some-bug-fixes)
[Update 12.2.0 - The chains of binding](#update-1220---the-chains-of-binding)
[Update 12.1.0 - Threads just can't hold on anymore](#update-1210---threads-just-cant-hold-on-anymore)
[Update: 12.0.0 - Big update (Lots of additions some changes)](#update-1200---big-update-lots-of-additions-some-changes)
[Update: 1.11.1 - Small Clarification on Love](#update-1111---small-clarification-on-love)
[Update: 1.11.0](#update-1110)
[Update: 1.10.0](#update-1100)
[Update: 1.9.2](#update-192)
[Update: 1.9.1 - Threads can now argue](#update-191---threads-can-now-argue)
[Update: 1.9.0](#update-190)
[Update: 1.8.7](#update-187)
[Update: 1.8.6](#update-186)
[Update: 1.8.5](#update-185)
[Update: 1.8.4](#update-184)
[Update: 1.8.3 - Mainloop recieves some needed overhauling](#update-183---mainloop-recieves-some-needed-overhauling)
[Update: 1.8.2](#update-182)
[Update: 1.8.1](#update-181)
[Update: 1.7.6](#update-176)
[Update: 1.7.5](#update-175)
[Update: 1.7.4](#update-174)
[Update: 1.7.3](#update-173)
[Update: 1.7.2](#update-172)
[Update: 1.7.1 - Bug Fixes Only](#update-171---bug-fixes-only)
[Update: 1.7.0 - Threading the systems](#update-170---threading-the-systems)
[Update: 1.6.0](#update-160)
[Update: 1.5.0](#update-150)
[Update: 1.4.1 (4/10/2017) - First Public release of the library](#update-141-4102017---first-public-release-of-the-library)
[Update: 1.4.0 (3/20/2017)](#update-140-3202017)
[Update: 1.3.0 (1/29/2017)](#update-130-1292017)
[Update: 1.2.0 (12.31.2016)](#update-120-12312016)
[Update: 1.1.0](#update-110)
[Update: 1.0.0](#update-100)
[Update: 0.6.3](#update-063)
[Update: 0.6.2](#update-062)
[Update: 0.6.1-6](#update-061-6)
[Update: 0.5.1-6](#update-051-6)
[Update: 0.4.1](#update-041)
[Update: 0.3.0 - The update that started it all](#update-030---the-update-that-started-it-all)
[Update: EventManager 2.0.0](#update-eventmanager-200)
[Update: EventManager 1.2.0](#update-eventmanager-120)
[Update: EventManager 1.1.0](#update-eventmanager-110)
[Update: EventManager 1.0.0 - Error checking](#update-eventmanager-100---error-checking)
[Version: EventManager 0.0.1 - In The Beginning things were very different](#version-eventmanager-001---in-the-beginning-things-were-very-different) + +# Update 15.1.0 - Hold the thread! + +Full Update Showcase + +```lua +package.path = "./?/init.lua;"..package.path +multi,thread = require("multi"):init() + +func = thread:newFunction(function(count) + local a = 0 + while true do + a = a + 1 + thread.sleep(.1) + thread.pushStatus(a,count) + if a == count then break end + end + return "Done" +end) + +multi:newThread("Function Status Test",function() + local ret = func(10) + local ret2 = func(15) + local ret3 = func(20) + ret.OnStatus(function(part,whole) + print("Ret1: ",math.ceil((part/whole)*1000)/10 .."%") + end) + ret2.OnStatus(function(part,whole) + print("Ret2: ",math.ceil((part/whole)*1000)/10 .."%") + end) + ret3.OnStatus(function(part,whole) + print("Ret3: ",math.ceil((part/whole)*1000)/10 .."%") + end) + -- Connections can now be added together, if you had multiple holds and one finished before others and wasn't consumed it would lock forever! This is now fixed + thread.hold(ret2.OnReturn + ret.OnReturn + ret3.OnReturn) + print("Function Done!") + os.exit() +end) + +test = thread:newFunction(function() + return 1,2,nil,3,4,5,6,7,8,9 +end,true) +print(test()) +multi:newThread("testing",function() + print("#Test = ",test()) + print(thread.hold(function() + print("Hello!") + return false + end,{ + interval = 2, + cycles = 3 + })) -- End result, 3 attempts within 6 seconds. If still false then timeout + print("held") +end).OnError(function(...) + print(...) +end) + +sandbox = multi:newProcessor() +sandbox:newTLoop(function() + print("testing...") +end,1) + +test2 = multi:newTLoop(function() + print("testing2...") +end,1) + +sandbox:newThread("Test Thread",function() + local a = 0 + while true do + thread.sleep(1) + a = a + 1 + print("Thread Test: ".. multi.getCurrentProcess().Name) + if a == 10 then + sandbox.Stop() + end + end +end).OnError(function(...) + print(...) +end) +multi:newThread("Test Thread",function() + while true do + thread.sleep(1) + print("Thread Test: ".. multi.getCurrentProcess().Name) + end +end).OnError(function(...) + print(...) +end) + +sandbox.Start() + +multi:mainloop() +``` + +Added: +--- + +## multi:newSystemThreadedJobQueue(n) isEmpty() + +- returns true if the queue is empty, false if there are items in the queue. + +**Note:** a queue might be empty, but the job may still be running and not finished yet! Also if a registered function is called directly instead of pushed, it will not reflect inside the queue until the next cycle! + +Example: +```lua +package.path="?.lua;?/init.lua;?.lua;?/?/init.lua;"..package.path +package.cpath = [[C:\Program Files (x86)\Lua\5.1\systree\lib\lua\5.1\?.dll;C:\Program Files (x86)\Lua\5.1\systree\lib\lua\5.1\?\core.dll;]] ..package.cpath +multi,thread = require("multi"):init() +GLOBAL,THREAD = require("multi.integration.threading"):init() -- Auto detects your enviroment and uses what's available + +jq = multi:newSystemThreadedJobQueue(5) -- Job queue with 4 worker threads +func = jq:newFunction("test",function(a,b) + THREAD.sleep(2) + return a+b +end) +for i = 1,10 do + func(i,i*3).connect(function(data) + print(data) + end) +end + +local a = true +b = false + +multi:newThread("Standard Thread 1",function() + while true do + thread.sleep(.1) + print("Empty:",jq:isEmpty()) + end +end).OnError(function(self,msg) + print(msg) +end) +multi:mainloop() +``` + +## multi.TIMEOUT + +`multi.TIMEOUT` is equal to "TIMEOUT", it is reccomended to use this incase things change later on. There are plans to change the timeout value to become a custom object instead of a string. + +## new connections on threaded functions + +- `func.OnStatus(...)` + + Allows you to connect to the status of a function see [thread.pushStatus()](#status-added-to-threaded-functions) + +- `func.OnReturn(...)` + + Allows you to connect to the functions return event and capture its returns see [Example](#status-added-to-threaded-functions) for an example of it in use. + +## multi:newProcessor(name) + +```lua +package.path = "./?/init.lua;"..package.path +multi,thread = require("multi"):init() + +-- Create a processor object, it works a lot like the multi object +sandbox = multi:newProcessor() + +-- On our processor object create a TLoop that prints "testing..." every second +sandbox:newTLoop(function() + print("testing...") +end,1) + +-- Create a thread on the processor object +sandbox:newThread("Test Thread",function() + -- Create a counter named 'a' + local a = 0 + -- Start of the while loop that ends when a = 10 + while true do + -- pause execution of the thread for 1 second + thread.sleep(1) + -- increment a by 1 + a = a + 1 + -- display the name of the current process + print("Thread Test: ".. multi.getCurrentProcess().Name) + if a == 10 then + -- Stopping the processor stops all objects created inside that process including threads. In the backend threads use a regular multiobject to handle the scheduler and all of the holding functions. These all stop when a processor is stopped. This can be really useful to sandbox processes that might need to turned on and off with ease and not having to think about it. + sandbox.Stop() + end + end + -- Catch any errors that may come up +end).OnError(function(...) + print(...) +end) + +sandbox.Start() -- Start the process + +multi:mainloop() -- The main loop that allows all processes to continue +``` + +**Note:** Processor objects have been added and removed many times in the past, but will remain with this update. + +| Attribute | Type | Returns | Description | +---|---|---|--- +Start|Method()|self| Starts the process +Stop|Method()|self| Stops the process +OnError|Connection|connection| Allows connection to the process error handler +Type|Member:`string`|"process"| Contains the type of object +Active|Member:`boolean`|variable| If false the process is not active +Name|Member:`string`|variable| The name set at process creation +process|Thread|thread| A handle to a multi thread object + +**Note:** All tasks/threads created on a process are linked to that process. If a process is stopped all tasks/threads will be halted until the process is started back up. + +## Connection can now be added together + +Very useful when using thread.hold for multiple connections to trigger. + +Iif you had multiple holds and one finished before others and wasn't consumed it would lock forever! This is now fixed + +`print(conn + conn2 + conn3 + connN)` + +Can be chained as long as you want! See example below + +## Status added to threaded functions +- `thread.pushStatus(...)` + + Allows a developer to push a status from a function. + +- `tFunc.OnStatus(func(...))` + + A connection that can be used on a function to view the status of the threaded function + +Example: + +```lua +package.path = "./?/init.lua;"..package.path +multi,thread = require("multi"):init() + +func = thread:newFunction(function(count) + local a = 0 + while true do + a = a + 1 + thread.sleep(.1) + thread.pushStatus(a,count) + if a == count then break end + end + return "Done" +end) + +multi:newThread("Function Status Test",function() + local ret = func(10) + local ret2 = func(15) + local ret3 = func(20) + ret.OnStatus(function(part,whole) + --[[ Print out the current status. In this case every second it will update with: + 10% + 20% + 30% + ... + 100% + + Function Done! + ]] + print(math.ceil((part/whole)*1000)/10 .."%") + end) + ret2.OnStatus(function(part,whole) + print("Ret2: ",math.ceil((part/whole)*1000)/10 .."%") + end) + ret3.OnStatus(function(part,whole) + print("Ret3: ",math.ceil((part/whole)*1000)/10 .."%") + end) + -- Connections can now be added together, if you had multiple holds and one finished before others and wasn't consumed it would lock forever! This is now fixed + thread.hold(ret2.OnReturn + ret.OnReturn + ret3.OnReturn) + print("Function Done!") + os.exit() +end) +``` + +Changed: +--- + +- `f = thread:newFunction(func,holdme)` + - Nothing changed that will affect how the object functions by default. The returned function is now a table that is callable and 3 new methods have been added: + + Method | Description + ---|--- + Pause() | Pauses the function, Will cause the function to return `nil, Function is paused` + Resume() | Resumes the function + holdMe(set) | Sets the holdme argument that existed at function creation + + ```lua + package.path = "./?/init.lua;"..package.path + multi, thread = require("multi"):init() + + test = thread:newFunction(function(a,b) + thread.sleep(1) + return a,b + end, true) + + print(test(1,2)) + + test:Pause() + + print(test(1,2)) + + test:Resume() + + print(test(1,2)) + + --[[ -- If you left holdme nil/false + + print(test(1,2).connect(function(...) + print(...) + end)) + + test:Pause() + + print(test(1,2).connect(function(...) + print(...) + end)) + + test:Resume() + + print(test(1,2).connect(function(...) + print(...) + end)) + + ]] + + multi:mainloop() + ``` + + **Output:** + + ``` + 1 2 + nil Function is paused + 1 2 + ``` + + **If holdme is nil/false:** + + ``` + nil Function is paused + + + 1 2 nil... + 1 2 nil... + ``` + +- thread.hold(n,opt) [Ref. Issue](https://github.com/rayaman/multi/issues/24) + - Added option table to thread.hold + | Option | Description | + ---|--- + | interval | Time between each poll | + | cycles | Number of cycles before timing out | + | sleep | Number of seconds before timing out | + | skip | Number of cycles before testing again, does not cause a timeout! | + + **Note:** cycles and sleep options cannot both be used at the same time. Interval and skip cannot be used at the same time either. Cycles take priority over sleep if both are present! HoldFor and HoldWithin can be emulated using the new features. Old functions will remain for backward compatibility. + + Using cycles, sleep or interval will cause a timeout; returning nil, multi.TIMEOUT + - `n` can be a number and thread.hold will act like thread.sleep. When `n` is a number the option table will be ignored! + +Removed: +--- + +- N/A + +Fixed: +--- + +- Threaded functions not returning multiple values [Ref. Issue](https://github.com/rayaman/multi/issues/21) +- Priority Lists not containing Very_High and Very_Low from previous update +- All functions that should have chaining now do, reminder all functions that don't return any data return a reference to itself to allow chaining of method calls. + +ToDo +--- + +- Work on network parallelism (I really want to make this, but time and getting it right is proving much more difficult) +- Work on QOL changes to allow cleaner code like [this](#connection-can-now-be-added-together) # Update 15.0.0 - The art of faking it Full Update Showcase @@ -67,6 +438,7 @@ Fixed: --- - pseudoThreading and threads had an issue where they weren't executing properly - lanesManager THREAD:get(STRING: name) not returning the value +- Issue where threaded function were not returning multiple values Todo: --- @@ -123,7 +495,7 @@ multi:lightloop() ``` Going Forward: --- -- There is no longer any plans for sterilization! Functions do not play nice on different platforms and there is no simple way to ensure that things work. +- There is no longer any plans for sterilization! Functions do not play nice on different platforms and there is no simple way to ensure that things work. Quality Of Life: --- @@ -367,7 +739,7 @@ Changed: - thread:newFunction(func,holup) — Added an argument holup to always force the threaded funcion to wait. Meaning you don't need to tell it to func().wait() or func().connect() - multi:newConnection(protect,callback,kill) — Added the kill argument. Makes connections work sort of like a stack. Pop off the connections as they get called. So a one time connection handler. - I'm not sure callback has been documented in any form. callback gets called each and everytime conn:Fire() gets called! As well as being triggered for each connfunc that is part of the connection. -- modified the lanes manager to create globals GLOBAL and THREAD when a thread is started. This way you are now able to more closely mirror code between lanes and love. As of right now parity between both enviroments is now really good. Upvalues being copied by default in lanes is something that I will not try and mirror in love. It's better to pass what you need as arguments, this way you can keep things consistant. looping thorugh upvalues and sterlizing them and sending them are very complex and slow opperations. +- modified the lanes manager to create globals GLOBAL and THREAD when a thread is started. This way you are now able to more closely mirror code between lanes and love. As of right now parity between both enviroments is now really good. Upvalues being copied by default in lanes is something that I will not try and mirror in love. It's better to pass what you need as arguments, this way you can keep things consistant. looping through upvalues and sterlizing them and sending them are very complex and slow. Removed: --- @@ -615,7 +987,7 @@ Tasks Details Table format # Update 13.0.0 - Added some documentation, and some new features too check it out! ------------- **Quick note** on the 13.0.0 update: -This update I went all in finding bugs and improving proformance within the library. I added some new features and the new task manager, which I used as a way to debug the library was a great help, so much so thats it is now a permanent feature. It's been about half a year since my last update, but so much work needed to be done. I hope you can find a use in your code to use my library. I am extremely proud of my work; 7 years of development, I learned so much about lua and programming through the creation of this library. It was fun, but there will always be more to add and bugs crawling there way in. I can't wait to see where this library goes in the future! +This update I went all in finding bugs and improving performance within the library. I added some new features and the new task manager, which I used as a way to debug the library was a great help, so much so thats it is now a permanent feature. It's been about half a year since my last update, but so much work needed to be done. I hope you can find a use in your code to use my library. I am extremely proud of my work; 7 years of development, I learned so much about lua and programming through the creation of this library. It was fun, but there will always be more to add and bugs crawling there way in. I can't wait to see where this library goes in the future! Fixed: --- diff --git a/multi/compat/lovr.lua b/multi/compat/lovr.lua new file mode 100644 index 0000000..c37906d --- /dev/null +++ b/multi/compat/lovr.lua @@ -0,0 +1,281 @@ +--[[ +MIT License + +Copyright (c) 2020 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +if table.unpack then + unpack=table.unpack +end +function table.val_to_str ( v ) + if "string" == type( v ) then + v = string.gsub( v, "\n", "\\n" ) + if string.match( string.gsub(v,"[^'\"]",""), '^"+$' ) then + return "'" .. v .. "'" + end + return '"' .. string.gsub(v,'"', '\\"' ) .. '"' + else + return "table" == type( v ) and table.tostring( v ) or + tostring( v ) + end +end + +function table.key_to_str ( k ) + if "string" == type( k ) and string.match( k, "^[_%a][_%a%d]*$" ) then + return k + else + return "[" .. table.val_to_str( k ) .. "]" + end +end + +function table.tostring( tbl ) + local result, done = {}, {} + for k, v in ipairs( tbl ) do + table.insert( result, table.val_to_str( v ) ) + done[ k ] = true + end + for k, v in pairs( tbl ) do + if not done[ k ] then + table.insert( result, + table.key_to_str( k ) .. "=" .. table.val_to_str( v ) ) + end + end + return "{" .. table.concat( result, "," ) .. "}" +end +function table.merge(t1, t2) + t1,t2= t1 or {},t2 or {} + for k,v in pairs(t2) do + if type(v) == "table" then + if type(t1[k] or false) == "table" then + table.merge(t1[k] or {}, t2[k] or {}) + else + t1[k] = v + end + else + t1[k] = v + end + end + return t1 +end +Library={} +function Library.inject(lib,dat,arg) + if type(lib)=="table" then + if type(dat)=="table" then + table.merge(lib,dat) + elseif type(dat)=="string" then + if lib.Version and dat:match("(%d-)%.(%d-)%.(%d-)") then + lib.Version={dat:match("(%d+)%.(%d+)%.(%d+)")} + elseif dat=="meta" and type(arg)=="table" then + local _mt=getmetatable(lib) or {} + local mt={} + table.merge(mt,arg) + table.merge(_mt,mt) + setmetatable(lib,_mt) + elseif dat=="compat" then + lib["getVersion"]=function(self) return self.Version[1].."."..self.Version[2].."."..self.Version[3] end + if not lib.Version then + lib.Version={1,0,0} + end + elseif dat=="inhert" then + if not(lib["!%"..arg.."%!"]) then print("Wrong Password!!") return end + lib["!%"..arg.."%!"].__index=lib["!!%"..arg.."%!!"] + end + elseif type(dat)=="function" then + for i,v in pairs(lib) do + dat(lib,i,v) + end + end + elseif type(lib)=="function" or type(lib)=="userdata" then + if lib==unpack then + print("function unpack cannot yet be injected!") + return unpack + elseif lib==pairs then + print("function pairs cannot yet be injected!") + return lib + elseif lib==ipairs then + print("function ipairs cannot yet be injected!") + return lib + elseif lib==type then + print("function type cannot yet be injected!") + return lib + end + temp={} + local mt={ + __call=function(t,...) + local consume,MainRet,init={},{},{...} + local tt={} + for i=1,#t.__Link do + tt={} + if t.__Link[i]==t.__Main then + if #consume~=0 then + MainRet={t.__Link[i](unpack(consume))} + else + MainRet={t.__Link[i](unpack(init))} + end + else + if i==1 then + consume=(t.__Link[i](unpack(init))) + else + if type(MainRet)=="table" then + table.merge(tt,MainRet) + end + if type(consume)=="table" then + table.merge(tt,consume) + end + consume={t.__Link[i](unpack(tt))} + end + if i==#t.__Link then + return unpack(consume) + end + if consume then if consume[0]=="\1\7\6\3\2\99\125" then consume[0]=nil return unpack(consume) end end + end + end + if type(MainRet)=="table" then + table.merge(tt,MainRet) + end + if type(consume)=="table" then + table.merge(tt,consume) + end + return unpack(tt) + end, + } + temp.__Link={lib} + temp.__Main=lib + temp.__self=temp + function temp:inject(func,i) + if i then + table.insert(self.__Link,i,func) + else + table.insert(self.__Link,func) + end + end + function temp:consume(func) + for i=1,#self.__Link do + if self.__Link[i]==self.__Main then + self.__Link[i]=func + self.__self.__Main=func + return true + end + end + return false + end + setmetatable(temp,mt) + return temp + else + return "arg1 must be a table or a function" + end +end +function Library.convert(...) + local temp,rets={...},{} + for i=1,#temp do + if type(temp[i])=="function" then + table.insert(rets,Library.inject(temp[i])) + else + error("Takes only functions and returns in order from functions given. arg # "..i.." is not a function!!! It is a "..type(temp[i])) + end + end + return unpack(rets) +end + +local link={MainLibrary=Library} +Library.inject(Library,"meta",{ + __Link=link, + __call=function(self,func) func(link) end, +}) +local multi, thread = require("multi").init() +os.sleep = lovr.timer.sleep +multi.drawF = {} +function multi:onDraw(func, i) + i = i or 1 + table.insert(self.drawF, i, func) +end +multi.OnKeyPressed = multi:newConnection() +multi.OnKeyReleased = multi:newConnection() +multi.OnErrHand = multi:newConnection() +multi.OnFocus = multi:newConnection() +multi.OnLoad = multi:newConnection() +multi.OnLog = multi:newConnection() +multi.OnPermission = multi:newConnection() +multi.OnResize = multi:newConnection() +multi.OnRestart = multi:newConnection() +multi.OnThreadError = multi:newConnection() +multi.OnDraw = multi:newConnection() +multi.OnTextInput = multi:newConnection() +multi.OnUpdate = multi:newConnection() +multi.OnQuit = multi:newConnection() +multi.OnPreLoad(function() + local function Hook(func, conn) + if lovr[func] ~= nil then + lovr[func] = Library.convert(lovr[func]) + lovr[func]:inject(function(...) + conn:Fire(...) + return {...} + end,1) + elseif lovr[func] == nil then + lovr[func] = function(...) + conn:Fire(...) + end + end + end + Hook("quit", multi.OnQuit) + Hook("keypressed", multi.OnKeyPressed) + Hook("keyreleased", multi.OnKeyReleased) + Hook("focus", multi.OnFocus) + Hook("log", multi.OnLog) + Hook("errhand", multi.OnErrHand) + Hook("load", multi.OnLoad) + Hook("draw", multi.OnDraw) + Hook("textinput", multi.OnTextInput) + Hook("update", multi.OnUpdate) + Hook("permission", multi.OnPermission) + Hook("resize", multi.OnResize) + Hook("restart", multi.OnRestart) + Hook("threaderror", multi.OnThreadError) + multi.OnDraw(function() + for i = 1, #multi.drawF do + lovr.graphics.setColor(255, 255, 255, 255) + multi.drawF[i]() + end + end) +end) + +function multi:lovrloop(light) + local link + link = multi:newThread(function() + local mainloop = lovr.run() + while true do + thread.yield() + pcall(mainloop) + end + end).OnError(function(...) + print(...) + end) + if light==false then + multi:mainloop() + else + multi:lightloop() + end +end + +multi.OnQuit(function() + multi.Stop() + lovr.event.quit() +end) +return multi diff --git a/multi/init.lua b/multi/init.lua index 5420197..63a5ced 100644 --- a/multi/init.lua +++ b/multi/init.lua @@ -21,24 +21,27 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] + local multi = {} local mainloopActive = false local isRunning = false local clock = os.clock local thread = {} + if not _G["$multi"] then _G["$multi"] = {multi=multi,thread=thread} end -multi.Version = "15.0.0" +multi.Version = "15.1.0" multi.stage = "stable" multi.Name = "multi.root" +multi.NIL = {Type="NIL"} multi.Mainloop = {} multi.Garbage = {} multi.ender = {} multi.Children = {} multi.Active = true -multi.Type = "mainprocess" +multi.Type = "rootprocess" multi.Rest = 0 multi._type = type multi.queue = {} @@ -46,6 +49,7 @@ multi.clock = os.clock multi.time = os.time multi.LinkedPath = multi multi.lastTime = clock() +multi.TIMEOUT = "TIMEOUT" multi.Priority_Core = 1 multi.Priority_Very_High = 4 @@ -59,23 +63,24 @@ multi.Priority_Idle = 65536 multi.PriorityResolve = { [1]="Core", - [4]="High", - [16]="Above Normal", - [64]="Normal", - [256]="Below Normal", - [1024]="Low", - [4096]="Idle", + [4]="Very High", + [16]="High", + [64]="Above Normal", + [256]="Normal", + [1024]="Below Normal", + [4096]="Low", + [16384]="Very Low", + [65536]="Idle", } multi.PStep = 1 -multi.PList = {multi.Priority_Core,multi.Priority_High,multi.Priority_Above_Normal,multi.Priority_Normal,multi.Priority_Below_Normal,multi.Priority_Low,multi.Priority_Idle} +multi.PList = {multi.Priority_Core,multi.Priority_Very_High,multi.Priority_High,multi.Priority_Above_Normal,multi.Priority_Normal,multi.Priority_Below_Normal,multi.Priority_Low,multi.Priority_Very_Low,multi.Priority_Idle} multi.PriorityTick=1 multi.Priority=multi.Priority_High multi.threshold=256 multi.threstimed=.001 function multi.init() - multi.NIL = {Type="NIL"} return _G["$multi"].multi,_G["$multi"].thread end @@ -85,7 +90,7 @@ function multi.Stop() end --Processor -local priorityTable = {[0]="Round-Robin",[1]="Just-Right",[2]="Top-heavy",[3]="Timed-Based-Balancer"} +local priorityTable = {[0]="Round-Robin",[1]="Balanced",[2]="Top-Down",[3]="Timed-Based-Balancer"} local ProcessName = {[true]="SubProcessor",[false]="MainProcessor"} function multi:getTasksDetails(t) if t == "string" or not t then @@ -101,7 +106,7 @@ function multi:getTasksDetails(t) count = count + 1 table.insert(str,{v.Type:sub(1,1):upper()..v.Type:sub(2,-1)..name,multi.Round(os.clock()-v.creationTime,3),self.PriorityResolve[v.Priority],v.TID}) end - for v,i in pairs(multi.PausedObjects) do + for v,i in pairs(self.PausedObjects) do local name = v.Name or "" if name~="" then name = " <"..name..">" @@ -120,7 +125,7 @@ function multi:getTasksDetails(t) dat2 = dat2.."\n" end end - local load, steps = multi:getLoad() + local load, steps = self:getLoad() if thread.__threads then for i=1,#thread.__threads do dat = dat .. "\n" @@ -130,7 +135,7 @@ function multi:getTasksDetails(t) return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s..dat2 end elseif t == "t" or t == "table" then - local load,steps = multi:getLoad() + local load,steps = self:getLoad() str = { ProcessName = (self.Name or "Unnamed"), ThreadCount = #thread.__threads, @@ -166,15 +171,15 @@ end -- Used with ISO Threads local function isolateFunction(func,env) - local dmp = string.dump(func) - local env = env or {} - if setfenv then - local f = loadstring(dmp,"IsolatedThread_PesudoThreading") - setfenv(f,env) - return f - else - return load(dmp,"IsolatedThread_PesudoThreading","bt",env) - end + local dmp = string.dump(func) + local env = env or {} + if setfenv then + local f = loadstring(dmp,"IsolatedThread_PesudoThreading") + setfenv(f,env) + return f + else + return load(dmp,"IsolatedThread_PesudoThreading","bt",env) + end end function multi:Break() @@ -206,9 +211,9 @@ end -- Advance Timer stuff function multi:SetTime(n) if not n then n=3 end - local c=multi:newBase() + local c=self:newBase() c.Type='timemaster' - c.timer=multi:newTimer() + c.timer=self:newTimer() c.timer:Start() c.set=n c.link=self @@ -248,7 +253,7 @@ end -- Timer stuff done multi.PausedObjects = {} function multi:Pause() - if self.Type=='mainprocess' then + if self.Type=='rootprocess' then multi.print("You cannot pause the main process. Doing so will stop all methods and freeze your program! However if you still want to use multi:_Pause()") else self.Active=false @@ -265,7 +270,7 @@ function multi:Pause() end function multi:Resume() - if self.Type=='process' or self.Type=='mainprocess' then + if self.Type=='process' or self.Type=='rootprocess' then self.Active=true local c=self:getChildren() for i=1,#c do @@ -282,7 +287,7 @@ function multi:Resume() end function multi:Destroy() - if self.Type=='process' or self.Type=='mainprocess' then + if self.Type=='process' or self.Type=='rootprocess' then local c=self:getChildren() for i=1,#c do self.OnObjectDestroyed:Fire(c[i]) @@ -313,6 +318,7 @@ end function multi:create(ref) multi.OnObjectCreated:Fire(ref,self) + return self end function multi:setName(name) @@ -323,9 +329,9 @@ end --Constructors [CORE] local _tid = 0 function multi:newBase(ins) - if not(self.Type=='mainprocess' or self.Type=='process' or self.Type=='queue') then error('Can only create an object on multi or an interface obj') return false end + if not(self.Type=='rootprocess' or self.Type=='process' or self.Type=='queue' or self.Type == 'sandbox') then error('Can only create an object on multi or an interface obj') return false end local c = {} - if self.Type=='process' or self.Type=='queue' then + if self.Type=='process' or self.Type=='queue' or self.Type=='sandbox' then setmetatable(c, {__index = multi}) else setmetatable(c, {__index = multi}) @@ -372,6 +378,29 @@ function multi:newConnection(protect,func,kill) else return self:connect(...) end + end, + __add = function(c1,c2) + cn = multi:newConnection() + if not c1.__hasInstances then + cn.__hasInstances = 2 + cn.__count = 0 + else + cn.__hasInstances = c1.__hasInstances + 1 + cn.__count = c1.__count + end + c1(function(...) + cn.__count = cn.__count + 1 + if cn.__count == cn.__hasInstances then + cn:Fire(...) + end + end) + c2(function(...) + cn.__count = cn.__count + 1 + if cn.__count == cn.__hasInstances then + cn:Fire(...) + end + end) + return cn end}) c.Type='connector' c.func={} @@ -405,9 +434,11 @@ function multi:newConnection(protect,func,kill) end function c:Lock() c.lock = true + return self end function c:Unlock() c.lock = false + return self end function c:Fire(...) local ret={} @@ -520,6 +551,7 @@ function multi:newConnection(protect,func,kill) end return c end + multi.OnObjectCreated=multi:newConnection() multi.OnObjectDestroyed=multi:newConnection() multi.OnLoad = multi:newConnection(nil,nil,true) @@ -552,7 +584,7 @@ function multi:newTimer() time=os.clock()-time return self end - self:create(c) + multi:create(c) return c end @@ -580,7 +612,7 @@ function multi:newEvent(task) return self end self:setPriority("core") - self:create(c) + multi:create(c) return c end function multi:newUpdater(skip) @@ -602,7 +634,7 @@ function multi:newUpdater(skip) return self end c.OnUpdate=self.OnMainConnect - self:create(c) + multi:create(c) return c end function multi:newAlarm(set) @@ -642,7 +674,7 @@ function multi:newAlarm(set) self.Parent.Pause(self) return self end - self:create(c) + multi:create(c) return c end function multi:newLoop(func) @@ -662,7 +694,7 @@ function multi:newLoop(func) table.insert(funcs,func) return self end - self:create(c) + multi:create(c) return c end function multi:newFunction(func) @@ -688,7 +720,7 @@ function multi:newFunction(func) return self end setmetatable(c,mt) - self:create(c) + multi:create(c) return c end function multi:newStep(start,reset,count,skip) @@ -759,7 +791,7 @@ function multi:newStep(start,reset,count,skip) self:Resume() return self end - self:create(c) + multi:create(c) return c end function multi:newTLoop(func,set) @@ -795,7 +827,7 @@ function multi:newTLoop(func,set) table.insert(self.func,func) return self end - self:create(c) + multi:create(c) return c end function multi:setTimeout(func,t) @@ -869,7 +901,7 @@ function multi:newTStep(start,reset,count,set) self:Resume() return self end - self:create(c) + multi:create(c) return c end local scheduledjobs = {} @@ -902,6 +934,48 @@ function multi:scheduleJob(time,func) table.insert(scheduledjobs,{time, func,false}) end +local __CurrentProcess = multi +function multi.getCurrentProcess() + return __CurrentProcess +end + +local globalThreads = {} + +local sandcount = 0 +function multi:newProcessor(name) + local c = {} + setmetatable(c,{__index = self}) + local multi,thread = require("multi"):init() -- We need to capture the t in thread + local name = name or "Processor_"..sandcount + sandcount = sandcount + 1 + c.Mainloop = {} + c.Type = "process" + c.Active = false + c.Name = "multi.process<".. (name or "") .. ">" + c.process = self:newThread(c.Name,function() + while true do + thread.hold(function() + return c.Active + end) + __CurrentProcess = c + c:uManager() + __CurrentProcess = self + end + end) + c.OnError = c.process.OnError + function c.Start() + c.Active = true + return self + end + function c.Stop() + c.Active = false + return self + end + c:attachScheduler() + c.initThreads() + return c +end + -- Threading stuff local initT = false local threadCount = 0 @@ -911,12 +985,13 @@ local threads = thread.__threads local Gref = _G multi.GlobalVariables={} local dFunc = function() return true end -local dRef = {nil,nil,nil} +local dRef = {nil,nil,nil,nil,nil} thread.requests = {} function thread.request(t,cmd,...) thread.requests[t.thread] = {cmd,{...}} end function thread.getRunningThread() + local threads = globalThreads local t = coroutine.running() if t then for i,v in pairs(threads) do @@ -941,11 +1016,44 @@ function thread.sleep(n) dRef[2] = n or 0 return coroutine.yield(dRef) end -function thread.hold(n) +-- function thread.hold(n) +-- thread._Requests() +-- dRef[1] = "_hold_" +-- dRef[2] = n or dFunc +-- return coroutine.yield(dRef) +-- end +function thread.hold(n,opt) thread._Requests() - dRef[1] = "_hold_" - dRef[2] = n or dFunc - return coroutine.yield(dRef) + if opt and type(opt)=="table" then + if opt.interval then + dRef[4] = opt.interval + end + if opt.cycles then + dRef[1] = "_holdW_" + dRef[2] = opt.cycles or 1 + dRef[3] = n or dFunc + return coroutine.yield(dRef) + elseif opt.sleep then + dRef[1] = "_holdF_" + dRef[2] = opt.sleep + dRef[3] = n or dFunc + return coroutine.yield(dRef) + elseif opt.skip then + dRef[1] = "_skip_" + dRef[2] = opt.skip or 1 + return coroutine.yield(dRef) + end + end + if type(n) == "number" then + thread.getRunningThread().lastSleep = clock() + dRef[1] = "_sleep_" + dRef[2] = n or 0 + return coroutine.yield(dRef) + else + dRef[1] = "_hold_" + dRef[2] = n or dFunc + return coroutine.yield(dRef) + end end function thread.holdFor(sec,n) thread._Requests() @@ -1007,7 +1115,7 @@ function multi.hold(func,no) end local death = false if type(func)=="number" then - multi:newThread("Hold_func",function() + self:newThread("Hold_func",function() thread.sleep(func) death = true end) @@ -1016,7 +1124,7 @@ function multi.hold(func,no) end else local rets - multi:newThread("Hold_func",function() + self:newThread("Hold_func",function() rets = {thread.hold(func)} death = true end) @@ -1028,352 +1136,425 @@ function multi.hold(func,no) end function multi.holdFor(n,func) local temp - multi:newThread(function() + multi.getCurrentProcess():newThread(function() thread.sleep(n) temp = true end) - return multi.hold(function() + return multi.getCurrentProcess().hold(function() if func() then return func() elseif temp then - return multi.NIL, "TIMEOUT" + return multi.NIL, multi.TIMEOUT end end) end local function cleanReturns(...) - local n = select("#", ...) local returns = {...} local rets = {} local ind = 0 - for i=n,1,-1 do + for i=#returns,1,-1 do if returns[i] then - ind=i + ind = i + break end end return unpack(returns,1,ind) end +function thread.pushStatus(...) + local t = thread.getRunningThread() + t.statusconnector:Fire(...) +end function thread:newFunction(func,holdme) - return function(...) - local rets, err + local tfunc = {} + tfunc.Active = true + function tfunc:Pause() + self.Active = false + end + function tfunc:Resume() + self.Active = true + end + function tfunc:holdMe(b) + holdme = b + end + local function noWait() + return nil, "Function is paused" + end + local rets, err local function wait(no) - if thread.isThread() and not (no) then - return multi.hold(function() - if err then - return multi.NIL, err - elseif rets then - return cleanReturns((rets[1] or multi.NIL),rets[2],rets[3],rets[4],rets[5],rets[6],rets[7]) - end - end) - else - while not rets and not err do - multi.scheduler:Act() - end + if thread.isThread() and not (no) then + return multi.hold(function() if err then - return nil,err + return multi.NIL, err + elseif rets then + return cleanReturns((rets[1] or multi.NIL),rets[2],rets[3],rets[4],rets[5],rets[6],rets[7],rets[8],rets[9],rets[10],rets[11],rets[12],rets[13],rets[14],rets[15],rets[16]) end - return cleanReturns(rets[1],rets[2],rets[3],rets[4],rets[5],rets[6],rets[7]) + end) + else + while not rets and not err do + multi.scheduler:Act() end + if err then + return nil,err + end + return cleanReturns(rets[1],rets[2],rets[3],rets[4],rets[5],rets[6],rets[7],rets[8],rets[9],rets[10],rets[11],rets[12],rets[13],rets[14],rets[15],rets[16]) end - local t = multi:newThread("TempThread",func,...) + end + tfunc.__call = function(t,...) + if not t.Active then + if holdme then + return nil, "Function is paused" + end + return { + isTFunc = true, + wait = noWait, + connect = function(f) + f(nil,"Function is paused") + end + } + end + local t = multi.getCurrentProcess():newThread("TempThread",func,...) t.OnDeath(function(self,status,...) rets = {...} end) t.OnError(function(self,e) err = e end) if holdme then return wait() end local temp = { + OnStatus = multi:newConnection(), + OnError = multi:newConnection(), + OnReturn = multi:newConnection(), isTFunc = true, wait = wait, connect = function(f) - t.OnDeath(function(self,status,...) f(cleanReturns(...)) end) - t.OnError(function(self,err) f(err) end) + local tempConn = multi:newConnection() + t.OnDeath(function(self,status,...) if f then f(...) else tempConn:Fire(...) end end) + t.OnError(function(self,err) if f then f(nil,err) else tempConn:Fire(nil,err) end end) + return tempConn end } + t.OnDeath(function(self,status,...) temp.OnReturn:Fire(...) end) + t.OnError(function(self,err) temp.OnError:Fire(err) end) + t.linkedFunction = temp + t.statusconnector = temp.OnStatus return temp - end -end --- A cross version way to set enviroments, not the same as fenv though -function multi.setEnv(func,env) - local f = string.dump(func) - local chunk = load(f,"env","bt",env) - return chunk + end + setmetatable(tfunc,tfunc) + return tfunc end -function multi:newThread(name,func,...) - multi.OnLoad:Fire() - local func = func or name - if type(name) == "function" then - name = "Thread#"..threadCount - end - local c={} - local env = {self=c} - c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} - c.startArgs = {...} - c.ref={} - c.Name=name - c.thread=coroutine.create(func) - c.sleep=1 - c.Type="thread" - c.TID = threadid - c.firstRunDone=false - c.timer=multi:newTimer() - c._isPaused = false - c.returns = {} - c.isError = false - c.OnError = multi:newConnection(true,nil,true) - c.OnDeath = multi:newConnection(true,nil,true) - function c:isPaused() - return self._isPaused - end - local resumed = false - function c:Pause() - if not self._isPaused then - thread.request(self,"exec",function() - thread.hold(function() - return resumed +-- A cross version way to set enviroments, not the same as fenv though +function multi.setEnv(func,env) + local f = string.dump(func) + local chunk = load(f,"env","bt",env) + return chunk +end + +function multi:attachScheduler() + local threads = {} + self.threadsRef = threads + function self:newThread(name,func,...) + self.OnLoad:Fire() + local func = func or name + if type(name) == "function" then + name = "Thread#"..threadCount + end + local c={} + local env = {self=c} + c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} + c.startArgs = {...} + c.ref={} + c.Name=name + c.thread=coroutine.create(func) + c.sleep=1 + c.Type="thread" + c.TID = threadid + c.firstRunDone=false + c.timer=self:newTimer() + c._isPaused = false + c.returns = {} + c.isError = false + c.OnError = self:newConnection(true,nil,true) + c.OnDeath = self:newConnection(true,nil,true) + function c:isPaused() + return self._isPaused + end + local resumed = false + function c:Pause() + if not self._isPaused then + thread.request(self,"exec",function() + thread.hold(function() + return resumed + end) + resumed = false + self._isPaused = false end) - resumed = false - self._isPaused = false - end) - self._isPaused = true - end - return self - end - function c:Resume() - resumed = true - return self - end - function c:Kill() - thread.request(self,"kill") - return self - end - c.Destroy = c.Kill - function c.ref:send(name,val) - ret=coroutine.yield({Name=name,Value=val}) - end - function c.ref:get(name) - return self.Globals[name] - end - function c.ref:kill() - dRef[1] = "_kill_" - dRef[2] = "I Was killed by You!" - err = coroutine.yield(dRef) - if err then - error("Failed to kill a thread! Exiting...") - end - end - function c.ref:sleep(n) - if type(n)=="function" then - ret=thread.hold(n) - elseif type(n)=="number" then - ret=thread.sleep(tonumber(n) or 0) - else - error("Invalid Type for sleep!") - end - end - function c.ref:syncGlobals(v) - self.Globals=v - end - table.insert(threads,c) - if initT==false then - multi.initThreads() - end - c.creationTime = os.clock() - threadid = threadid + 1 - self:create(c) - return c -end -function multi:newISOThread(name,func,_env,...) - multi.OnLoad:Fire() - local func = func or name - local env = _env or {} - if not env.thread then - env.thread = thread - end - if not env.multi then - env.multi = multi - end - if type(name) == "function" then - name = "Thread#"..threadCount - end - local func = isolateFunction(func,env) - return self:newThread(name,func) -end -function multi.initThreads(justThreads) - initT = true - multi.scheduler=multi:newLoop():setName("multi.thread") - multi.scheduler.Type="scheduler" - function multi.scheduler:setStep(n) - self.skip=tonumber(n) or 24 - end - multi.scheduler.skip=0 - local t0,t1,t2,t3,t4,t5,t6 - local r1,r2,r3,r4,r5,r6 - local ret,_ - local function CheckRets(i) - if threads[i] and not(threads[i].isError) then - if not _ then - threads[i].isError = true - threads[i].TempRets[1] = ret - return + self._isPaused = true end - if ret or r1 or r2 or r3 or r4 or r5 or r6 then - threads[i].TempRets[1] = ret - threads[i].TempRets[2] = r1 - threads[i].TempRets[3] = r2 - threads[i].TempRets[4] = r3 - threads[i].TempRets[5] = r4 - threads[i].TempRets[6] = r5 - threads[i].TempRets[7] = r6 + return self + end + function c:Resume() + resumed = true + return self + end + function c:Kill() + thread.request(self,"kill") + return self + end + c.Destroy = c.Kill + function c.ref:send(name,val) + ret=coroutine.yield({Name=name,Value=val}) + end + function c.ref:get(name) + return self.Globals[name] + end + function c.ref:kill() + dRef[1] = "_kill_" + dRef[2] = "I Was killed by You!" + err = coroutine.yield(dRef) + if err then + error("Failed to kill a thread! Exiting...") end end + function c.ref:sleep(n) + if type(n)=="function" then + ret=thread.hold(n) + elseif type(n)=="number" then + ret=thread.sleep(tonumber(n) or 0) + else + error("Invalid Type for sleep!") + end + end + function c.ref:syncGlobals(v) + self.Globals=v + end + table.insert(threads,c) + table.insert(globalThreads,c) + if initT==false then + self.initThreads() + end + c.creationTime = os.clock() + threadid = threadid + 1 + multi:create(c) + return c end - local function holdconn(n) - if type(ret[n])=="table" and ret[n].Type=='connector' then - local letsgo - ret[n](function(...) letsgo = {...} end) - ret[n] = function() - if letsgo then - return unpack(letsgo) + function self:newISOThread(name,func,_env,...) + self.OnLoad:Fire() + local func = func or name + local env = _env or {} + if not env.thread then + env.thread = thread + end + if not env.multi then + env.multi = self + end + if type(name) == "function" then + name = "Thread#"..threadCount + end + local func = isolateFunction(func,env) + return self:newThread(name,func) + end + function self.initThreads(justThreads) + initT = true + self.scheduler=self:newLoop():setName("multi.thread") + self.scheduler.Type="scheduler" + function self.scheduler:setStep(n) + self.skip=tonumber(n) or 24 + end + self.scheduler.skip=0 + local t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 + local r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 + local ret,_ + local function CheckRets(i) + if threads[i] and not(threads[i].isError) then + if not _ then + threads[i].isError = true + threads[i].TempRets[1] = ret + return + end + if ret or r1 or r2 or r3 or r4 or r5 or r6 or r7 or r8 or r9 or r10 or r11 or r12 or r13 or r14 or r15 or r16 then + threads[i].TempRets[1] = ret + threads[i].TempRets[2] = r1 + threads[i].TempRets[3] = r2 + threads[i].TempRets[4] = r3 + threads[i].TempRets[5] = r4 + threads[i].TempRets[6] = r5 + threads[i].TempRets[7] = r6 + threads[i].TempRets[8] = r7 + threads[i].TempRets[9] = r8 + threads[i].TempRets[10] = r9 + threads[i].TempRets[11] = r10 + threads[i].TempRets[12] = r11 + threads[i].TempRets[13] = r12 + threads[i].TempRets[14] = r13 + threads[i].TempRets[15] = r14 + threads[i].TempRets[16] = r15 + threads[i].TempRets[17] = r16 end end end - end - local function helper(i) - if type(ret)=="table" then - if ret[1]=="_kill_" then - threads[i].OnDeath:Fire(threads[i],"killed",ret,r1,r2,r3,r4,r5,r6) - multi.setType(threads[i],multi.DestroyedObj) - table.remove(threads,i) - ret = nil - elseif ret[1]=="_sleep_" then - threads[i].sec = ret[2] - threads[i].time = clock() - threads[i].task = "sleep" - threads[i].__ready = false - ret = nil - elseif ret[1]=="_skip_" then - threads[i].count = ret[2] - threads[i].pos = 0 - threads[i].task = "skip" - threads[i].__ready = false - ret = nil - elseif ret[1]=="_hold_" then - holdconn(2) - threads[i].func = ret[2] - threads[i].task = "hold" - threads[i].__ready = false - ret = nil - elseif ret[1]=="_holdF_" then - holdconn(3) - threads[i].sec = ret[2] - threads[i].func = ret[3] - threads[i].task = "holdF" - threads[i].time = clock() - threads[i].__ready = false - ret = nil - elseif ret[1]=="_holdW_" then - holdconn(3) - threads[i].count = ret[2] - threads[i].pos = 0 - threads[i].func = ret[3] - threads[i].task = "holdW" - threads[i].time = clock() - threads[i].__ready = false - ret = nil + local function holdconn(n) + if type(ret[n])=="table" and ret[n].Type=='connector' then + local letsgo + ret[n](function(...) letsgo = {...} end) + ret[n] = function() + if letsgo then + return unpack(letsgo) + end + end end end - CheckRets(i) - end - multi.scheduler:OnLoop(function(self) - for i=#threads,1,-1 do - if threads[i].isError then - if coroutine.status(threads[i].thread)=="dead" then - threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets)) - multi.setType(threads[i],multi.DestroyedObj) + local function helper(i) + if type(ret)=="table" then + if ret[1]=="_kill_" then + threads[i].OnDeath:Fire(threads[i],"killed",ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16) + self.setType(threads[i],self.DestroyedObj) table.remove(threads,i) + ret = nil + elseif ret[1]=="_sleep_" then + threads[i].sec = ret[2] + threads[i].time = clock() + threads[i].task = "sleep" + threads[i].__ready = false + ret = nil + elseif ret[1]=="_skip_" then + threads[i].count = ret[2] + threads[i].pos = 0 + threads[i].task = "skip" + threads[i].__ready = false + ret = nil + elseif ret[1]=="_hold_" then + holdconn(2) + threads[i].func = ret[2] + threads[i].task = "hold" + threads[i].__ready = false + threads[i].interval = ret[4] or 0 + threads[i].intervalR = clock() + ret = nil + elseif ret[1]=="_holdF_" then + holdconn(3) + threads[i].sec = ret[2] + threads[i].func = ret[3] + threads[i].task = "holdF" + threads[i].time = clock() + threads[i].__ready = false + threads[i].interval = ret[4] or 0 + threads[i].intervalR = clock() + ret = nil + elseif ret[1]=="_holdW_" then + holdconn(3) + threads[i].count = ret[2] + threads[i].pos = 0 + threads[i].func = ret[3] + threads[i].task = "holdW" + threads[i].time = clock() + threads[i].__ready = false + threads[i].interval = ret[4] or 0 + threads[i].intervalR = clock() + ret = nil end end - if threads[i] and not threads[i].__started then - if coroutine.running() ~= threads[i].thread then - _,ret,r1,r2,r3,r4,r5,r6=coroutine.resume(threads[i].thread,unpack(threads[i].startArgs)) + CheckRets(i) + end + self.scheduler:OnLoop(function(self) + for i=#threads,1,-1 do + if threads[i].isError then + if coroutine.status(threads[i].thread)=="dead" then + threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets)) + self.setType(threads[i],self.DestroyedObj) + table.remove(threads,i) + end + end + if threads[i] and not threads[i].__started then + if coroutine.running() ~= threads[i].thread then + _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(threads[i].thread,unpack(threads[i].startArgs)) + end + threads[i].__started = true + helper(i) + end + if threads[i] and not _ then + threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets)) + threads[i].isError = true + end + if threads[i] and coroutine.status(threads[i].thread)=="dead" then + local t = threads[i].TempRets or {} + threads[i].OnDeath:Fire(threads[i],"ended",t[1],t[2],t[3],t[4],t[5],t[6],t[7],t[8],t[9],t[10],t[11],t[12],t[13],t[14],t[15],t[16]) + self.setType(threads[i],self.DestroyedObj) + table.remove(threads,i) + elseif threads[i] and threads[i].task == "skip" then + threads[i].pos = threads[i].pos + 1 + if threads[i].count==threads[i].pos then + threads[i].task = "" + threads[i].__ready = true + end + elseif threads[i] and threads[i].task == "hold" then + if clock() - threads[i].intervalR>=threads[i].interval then + t0,t1,t2,t3,t4,t5,t6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16 = threads[i].func() + if t0 then + if t0==self.NIL then + t0 = nil + end + threads[i].task = "" + threads[i].__ready = true + end + threads[i].intervalR = clock() + end + elseif threads[i] and threads[i].task == "sleep" then + if clock() - threads[i].time>=threads[i].sec then + threads[i].task = "" + threads[i].__ready = true + end + elseif threads[i] and threads[i].task == "holdF" then + if clock() - threads[i].intervalR>=threads[i].interval then + t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = threads[i].func() + if t0 then + threads[i].task = "" + threads[i].__ready = true + elseif clock() - threads[i].time>=threads[i].sec then + threads[i].task = "" + threads[i].__ready = true + t0 = nil + t1 = multi.TIMEOUT + end + threads[i].intervalR = clock() + end + elseif threads[i] and threads[i].task == "holdW" then + if clock() - threads[i].intervalR>=threads[i].interval then + threads[i].pos = threads[i].pos + 1 + print(threads[i].pos,threads[i].count) + t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = threads[i].func() + if t0 then + threads[i].task = "" + threads[i].__ready = true + elseif threads[i].count==threads[i].pos then + threads[i].task = "" + threads[i].__ready = true + t0 = nil + t1 = multi.TIMEOUT + end + threads[i].intervalR = clock() + end + end + if threads[i] and threads[i].__ready then + threads[i].__ready = false + if coroutine.running() ~= threads[i].thread then + _,ret,r1,r2,r3,r4,r5,r6,r7,r8,r9,r10,r11,r12,r13,r14,r15,r16=coroutine.resume(threads[i].thread,t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15) + CheckRets(i) + end end - threads[i].__started = true helper(i) end - if threads[i] and not _ then - threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets)) - threads[i].isError = true + end) + if justThreads then + while true do + self.scheduler:Act() end - if threads[i] and coroutine.status(threads[i].thread)=="dead" then - local t = threads[i].TempRets or {} - threads[i].OnDeath:Fire(threads[i],"ended",t[1],t[2],t[3],t[4],t[5],t[6],t[7]) - multi.setType(threads[i],multi.DestroyedObj) - table.remove(threads,i) - elseif threads[i] and threads[i].task == "skip" then - threads[i].pos = threads[i].pos + 1 - if threads[i].count==threads[i].pos then - threads[i].task = "" - threads[i].__ready = true - end - elseif threads[i] and threads[i].task == "hold" then - t0,t1,t2,t3,t4,t5,t6 = threads[i].func() - if t0 then - if t0==multi.NIL then - t0 = nil - end - threads[i].task = "" - threads[i].__ready = true - end - elseif threads[i] and threads[i].task == "sleep" then - if clock() - threads[i].time>=threads[i].sec then - threads[i].task = "" - threads[i].__ready = true - end - elseif threads[i] and threads[i].task == "holdF" then - t0,t1,t2,t3,t4,t5,t6 = threads[i].func() - if t0 then - threads[i].task = "" - threads[i].__ready = true - elseif clock() - threads[i].time>=threads[i].sec then - threads[i].task = "" - threads[i].__ready = true - t0 = nil - t1 = "TIMEOUT" - end - elseif threads[i] and threads[i].task == "holdW" then - threads[i].pos = threads[i].pos + 1 - t0,t1,t2,t3,t4,t5,t6 = threads[i].func() - if t0 then - threads[i].task = "" - threads[i].__ready = true - elseif threads[i].count==threads[i].pos then - threads[i].task = "" - threads[i].__ready = true - t0 = nil - t1 = "TIMEOUT" - end - end - if threads[i] and threads[i].__ready then - threads[i].__ready = false - if coroutine.running() ~= threads[i].thread then - _,ret,r1,r2,r3,r4,r5,r6=coroutine.resume(threads[i].thread,t0,t1,t2,t3,t4,t5,t6) - CheckRets(i) - end - end - helper(i) - end - end) - if justThreads then - while true do - multi.scheduler:Act() end end end function multi:newService(func) -- Priority managed threads local c = {} c.Type = "service" - c.OnStopped = multi:newConnection() - c.OnStarted = multi:newConnection() + c.OnStopped = self:newConnection() + c.OnStarted = self:newConnection() local Service_Data = {} local active local time @@ -1383,7 +1564,7 @@ function multi:newService(func) -- Priority managed threads local scheme = 1 function c.Start() if not active then - time = multi:newTimer() + time = self:newTimer() time:Start() active = true c:OnStarted(c,Service_Data) @@ -1398,7 +1579,7 @@ function multi:newService(func) -- Priority managed threads task(ap) return c end - local th = multi:newThread(function() + local th = self:newThread(function() while true do process() end @@ -1408,6 +1589,7 @@ function multi:newService(func) -- Priority managed threads th:kill() c.Stop() multi.setType(c,multi.DestroyedObj) + return c end function c:SetScheme(n) if type(self)=="number" then n = self end @@ -1456,6 +1638,7 @@ function multi:newService(func) -- Priority managed threads if type(self)=="number" then pri = self end p = pri c.SetScheme(scheme) + return c end multi.create(multi,c) return c @@ -1482,6 +1665,7 @@ function multi:lightloop(settings) end end function multi:mainloop(settings) + __CurrentProcess = self multi.OnPreLoad:Fire() multi.defaultSettings = settings or multi.defaultSettings self.uManager=self.uManagerRef @@ -1536,8 +1720,10 @@ function multi:mainloop(settings) self.CID=_D if not protect then Loop[_D]:Act() + __CurrentProcess = self else local status, err=pcall(Loop[_D].Act,Loop[_D]) + __CurrentProcess = self if err then Loop[_D].error=err self.OnError:Fire(Loop[_D],err) @@ -1559,8 +1745,10 @@ function multi:mainloop(settings) self.CID=_D if not protect then Loop[_D]:Act() + __CurrentProcess = self else local status, err=pcall(Loop[_D].Act,Loop[_D]) + __CurrentProcess = self if err then Loop[_D].error=err self.OnError:Fire(Loop[_D],err) @@ -1591,8 +1779,10 @@ function multi:mainloop(settings) self.CID=_D if not protect then Loop[_D]:Act() + __CurrentProcess = self else local status, err=pcall(Loop[_D].Act,Loop[_D]) + __CurrentProcess = self if err then Loop[_D].error=err self.OnError:Fire(Loop[_D],err) @@ -1615,6 +1805,7 @@ function multi:mainloop(settings) if not protect then if sRef.solid then sRef:Act() + __CurrentProcess = self solid = true else time = multi.timer(sRef.Act,sRef) @@ -1631,9 +1822,11 @@ function multi:mainloop(settings) else if Loop[_D].solid then Loop[_D]:Act() + __CurrentProcess = self solid = true else time, status, err=multi.timer(pcall,Loop[_D].Act,Loop[_D]) + __CurrentProcess = self Loop[_D].solid = true solid = false end @@ -1673,8 +1866,10 @@ function multi:mainloop(settings) self.CID=_D if not protect then Loop[_D]:Act() + __CurrentProcess = self else local status, err=pcall(Loop[_D].Act,Loop[_D]) + __CurrentProcess = self if err then Loop[_D].error=err self.OnError:Fire(Loop[_D],err) @@ -1693,6 +1888,7 @@ function multi:mainloop(settings) end end function multi:uManager(settings) + __CurrentProcess = self multi.OnPreLoad:Fire() multi.defaultSettings = settings or multi.defaultSettings self.t,self.tt = clock(),0 @@ -1731,8 +1927,10 @@ function multi:uManagerRef(settings) self.CID=_D if not multi.defaultSettings.protect then Loop[_D]:Act() + __CurrentProcess = self else local status, err=pcall(Loop[_D].Act,Loop[_D]) + __CurrentProcess = self if err then Loop[_D].error=err self.OnError:Fire(Loop[_D],err) @@ -1754,8 +1952,10 @@ function multi:uManagerRef(settings) self.CID=_D if not multi.defaultSettings.protect then Loop[_D]:Act() + __CurrentProcess = self else local status, err=pcall(Loop[_D].Act,Loop[_D]) + __CurrentProcess = self if err then Loop[_D].error=err self.OnError:Fire(Loop[_D],err) @@ -1782,8 +1982,10 @@ function multi:uManagerRef(settings) self.CID=_D if not protect then Loop[_D]:Act() + __CurrentProcess = self else local status, err=pcall(Loop[_D].Act,Loop[_D]) + __CurrentProcess = self if err then Loop[_D].error=err self.OnError:Fire(Loop[_D],err) @@ -1806,6 +2008,7 @@ function multi:uManagerRef(settings) if not protect then if sRef.solid then sRef:Act() + __CurrentProcess = self solid = true else time = multi.timer(sRef.Act,sRef) @@ -1822,9 +2025,11 @@ function multi:uManagerRef(settings) else if Loop[_D].solid then Loop[_D]:Act() + __CurrentProcess = self solid = true else time, status, err=multi.timer(pcall,Loop[_D].Act,Loop[_D]) + __CurrentProcess = self Loop[_D].solid = true solid = false end @@ -1864,8 +2069,10 @@ function multi:uManagerRef(settings) self.CID=_D if not multi.defaultSettings.protect then Loop[_D]:Act() + __CurrentProcess = self else local status, err=pcall(Loop[_D].Act,Loop[_D]) + __CurrentProcess = self if err then Loop[_D].error=err self.OnError:Fire(Loop[_D],err) @@ -1881,18 +2088,18 @@ end -- UTILS -------- function table.merge(t1, t2) - for k,v in pairs(t2) do - if type(v) == 'table' then - if type(t1[k] or false) == 'table' then - table.merge(t1[k] or {}, t2[k] or {}) - else - t1[k] = v - end - else - t1[k] = v - end - end - return t1 + for k,v in pairs(t2) do + if type(v) == 'table' then + if type(t1[k] or false) == 'table' then + table.merge(t1[k] or {}, t2[k] or {}) + else + t1[k] = v + end + else + t1[k] = v + end + end + return t1 end if table.unpack and not unpack then unpack=table.unpack @@ -1932,7 +2139,7 @@ multi.defaultSettings = { function multi:enableLoadDetection() if multi.maxSpd then return end -- here we are going to run a quick benchMark solo - local temp = multi:newProcessor() + local temp = self:newProcessor() temp:Start() local t = os.clock() local stop = false @@ -1951,12 +2158,12 @@ local lastVal = 0 local bb = 0 function multi:getLoad() - if not multi.maxSpd then multi:enableLoadDetection() end + if not multi.maxSpd then self:enableLoadDetection() end if busy then return lastVal end local val = nil if thread.isThread() then local bench - multi:benchMark(.01):OnBench(function(time,steps) + self:benchMark(.01):OnBench(function(time,steps) bench = steps bb = steps end) @@ -1968,12 +2175,12 @@ function multi:getLoad() else busy = true local bench - multi:benchMark(.01):OnBench(function(time,steps) + self:benchMark(.01):OnBench(function(time,steps) bench = steps bb = steps end) while not bench do - multi:uManager() + self:uManager() end bench = bench^1.5 val = math.ceil((1-(bench/(multi.maxSpd/2.2)))*100) @@ -2014,10 +2221,12 @@ function multi:setPriority(s) self.defPriority = self.Priority self.PrioritySet = true end + return self end function multi:ResetPriority() self.Priority = self.defPriority + return self end function os.getOS() @@ -2150,6 +2359,7 @@ function multi:reallocate(o,n) self.Parent=o table.insert(o.Mainloop,n,self) self.Active=true + return self end function multi.timer(func,...) @@ -2168,6 +2378,7 @@ end function multi:FreeMainEvent() self.func={} + return self end function multi:connectFinal(func) @@ -2181,6 +2392,7 @@ function multi:connectFinal(func) multi.print("Warning!!! "..self.Type.." doesn't contain a Final Connection State! Use "..self.Type..":Break(func) to trigger it's final event!") self:OnBreak(func) end + return self end if os.getOS()=="windows" then @@ -2221,4 +2433,5 @@ else multi.m.sentinel = newproxy(true) getmetatable(multi.m.sentinel).__gc = multi.m.onexit end +multi:attachScheduler() return multi diff --git a/multi/integration/lanesManager/extensions.lua b/multi/integration/lanesManager/extensions.lua index ddb6525..d4fd404 100644 --- a/multi/integration/lanesManager/extensions.lua +++ b/multi/integration/lanesManager/extensions.lua @@ -68,6 +68,9 @@ function multi:newSystemThreadedJobQueue(n) local doAll = multi:newSystemThreadedQueue() local ID=1 local jid = 1 + function c:isEmpty() + return queueJob:peek()==nil + end function c:doToAll(func) for i=1,c.cores do doAll:push{ID,func} diff --git a/multi/integration/loveManager/extensions.lua b/multi/integration/loveManager/extensions.lua index cbeecf3..1673611 100644 --- a/multi/integration/loveManager/extensions.lua +++ b/multi/integration/loveManager/extensions.lua @@ -21,6 +21,8 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ]] + +-- TODO make compatible with lovr local multi, thread = require("multi").init() GLOBAL = multi.integration.GLOBAL THREAD = multi.integration.THREAD @@ -100,6 +102,9 @@ function multi:newSystemThreadedJobQueue(n) self.queue:push{name,self.id,...} return self.id end + function c:isEmpty() + return queueJob:peek()==nil + end local nFunc = 0 function c:newFunction(name,func,holup) -- This registers with the queue if type(name)=="function" then diff --git a/multi/integration/lovrManager/extensions.lua b/multi/integration/lovrManager/extensions.lua new file mode 100644 index 0000000..d2d5143 --- /dev/null +++ b/multi/integration/lovrManager/extensions.lua @@ -0,0 +1,203 @@ +--[[ +MIT License + +Copyright (c) 2020 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +local multi, thread = require("multi").init() +GLOBAL = multi.integration.GLOBAL +THREAD = multi.integration.THREAD +function multi:newSystemThreadedQueue(name) + local c = {} + c.Name = name + local fRef = {"func",nil} + function c:init() + local q = {} + q.chan = lovr.thread.getChannel(self.Name) + function q:push(dat) + if type(dat) == "function" then + fRef[2] = THREAD.dump(dat) + self.chan:push(fRef) + return + else + self.chan:push(dat) + end + end + function q:pop() + local dat = self.chan:pop() + if type(dat)=="table" and dat[1]=="func" then + return THREAD.loadDump(dat[2]) + else + return dat + end + end + function q:peek() + local dat = self.chan:peek() + if type(dat)=="table" and dat[1]=="func" then + return THREAD.loadDump(dat[2]) + else + return dat + end + end + return q + end + THREAD.package(name,c) + return c +end +function multi:newSystemThreadedTable(name) + local c = {} + c.name = name + function c:init() + return THREAD.createTable(self.name) + end + THREAD.package(name,c) + return c +end +local jqc = 1 +function multi:newSystemThreadedJobQueue(n) + local c = {} + c.cores = n or THREAD.getCores() + c.registerQueue = {} + c.funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") + c.queue = lovr.thread.getChannel("__JobQueue_"..jqc.."_queue") + c.queueReturn = lovr.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") + c.queueAll = lovr.thread.getChannel("__JobQueue_"..jqc.."_queueAll") + c.id = 0 + c.OnJobCompleted = multi:newConnection() + local allfunc = 0 + function c:doToAll(func) + local f = THREAD.dump(func) + for i = 1, self.cores do + self.queueAll:push({allfunc,f}) + end + allfunc = allfunc + 1 + end + function c:registerFunction(name,func) + if self.funcs[name] then + error("A function by the name "..name.." has already been registered!") + end + self.funcs[name] = func + end + function c:pushJob(name,...) + self.id = self.id + 1 + self.queue:push{name,self.id,...} + return self.id + end + function c:isEmpty() + return queueJob:peek()==nil + end + local nFunc = 0 + function c:newFunction(name,func,holup) -- This registers with the queue + if type(name)=="function" then + holup = func + func = name + name = "JQ_Function_"..nFunc + end + nFunc = nFunc + 1 + c:registerFunction(name,func) + return thread:newFunction(function(...) + local id = c:pushJob(name,...) + local link + local rets + link = c.OnJobCompleted(function(jid,...) + if id==jid then + rets = {...} + link:Destroy() + end + end) + return thread.hold(function() + if rets then + return unpack(rets) or multi.NIL + end + end) + end,holup),name + end + multi:newThread("jobManager",function() + while true do + thread.yield() + local dat = c.queueReturn:pop() + if dat then + c.OnJobCompleted:Fire(unpack(dat)) + end + end + end) + for i=1,c.cores do + multi:newSystemThread("JobQueue_"..jqc.."_worker_"..i,function(jqc) + local multi, thread = require("multi"):init() + require("lovr.timer") + local function atomic(channel) + return channel:pop() + end + local clock = os.clock + local funcs = THREAD.createStaticTable("__JobQueue_"..jqc.."_table") + local queue = lovr.thread.getChannel("__JobQueue_"..jqc.."_queue") + local queueReturn = lovr.thread.getChannel("__JobQueue_"..jqc.."_queueReturn") + local lastProc = clock() + local queueAll = lovr.thread.getChannel("__JobQueue_"..jqc.."_queueAll") + local registry = {} + setmetatable(_G,{__index = funcs}) + multi:newThread("startUp",function() + while true do + thread.yield() + local all = queueAll:peek() + if all and not registry[all[1]] then + lastProc = os.clock() + THREAD.loadDump(queueAll:pop()[2])() + end + end + end) + multi:newThread("runner",function() + thread.sleep(.1) + while true do + thread.yield() + local all = queueAll:peek() + if all and not registry[all[1]] then + lastProc = os.clock() + THREAD.loadDump(queueAll:pop()[2])() + end + local dat = queue:performAtomic(atomic) + if dat then + lastProc = os.clock() + local name = table.remove(dat,1) + local id = table.remove(dat,1) + local tab = {funcs[name](unpack(dat))} + table.insert(tab,1,id) + queueReturn:push(tab) + end + end + end):OnError(function(...) + error(...) + end) + multi:newThread("Idler",function() + while true do + thread.yield() + if clock()-lastProc> 2 then + THREAD.sleep(.05) + else + THREAD.sleep(.001) + end + end + end) + multi:mainloop() + end,jqc) + end + jqc = jqc + 1 + return c +end \ No newline at end of file diff --git a/multi/integration/lovrManager/init.lua b/multi/integration/lovrManager/init.lua new file mode 100644 index 0000000..523382a --- /dev/null +++ b/multi/integration/lovrManager/init.lua @@ -0,0 +1,88 @@ +--[[ +MIT License + +Copyright (c) 2020 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +-- TODO make compatible with lovr +if ISTHREAD then + error("You cannot require the lovrManager from within a thread!") +end +local ThreadFileData = [[ +ISTHREAD = true +THREAD = require("multi.integration.lovrManager.threads") -- order is important! +sThread = THREAD +__IMPORTS = {...} +__FUNC__=table.remove(__IMPORTS,1) +__THREADID__=table.remove(__IMPORTS,1) +__THREADNAME__=table.remove(__IMPORTS,1) +stab = THREAD.createStaticTable(__THREADNAME__) +GLOBAL = THREAD.getGlobal() +multi, thread = require("multi").init() +stab["returns"] = {THREAD.loadDump(__FUNC__)(unpack(__IMPORTS))} +]] +local multi, thread = require("multi.compat.lovr2d"):init() +local THREAD = {} +__THREADID__ = 0 +__THREADNAME__ = "MainThread" +multi.integration={} +multi.integration.lovr2d={} +local THREAD = require("multi.integration.lovrManager.threads") +local GLOBAL = THREAD.getGlobal() +local THREAD_ID = 1 +local OBJECT_ID = 0 +local stf = 0 +function THREAD:newFunction(func,holup) + stf = stf + 1 + return function(...) + local t = multi:newSystemThread("STF"..stf,func,...) + return thread:newFunction(function() + return thread.hold(function() + if t.stab["returns"] then + local dat = t.stab.returns + t.stab.returns = nil + return unpack(dat) + end + end) + end,holup)() + end +end +function multi:newSystemThread(name,func,...) + local c = {} + c.name = name + c.ID=THREAD_ID + c.thread=lovr.thread.newThread(ThreadFileData) + c.thread:start(THREAD.dump(func),c.ID,c.name,...) + c.stab = THREAD.createStaticTable(name) + GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread} + GLOBAL["__THREAD_COUNT"] = THREAD_ID + THREAD_ID=THREAD_ID+1 + return c +end +function lovr.threaderror(thread, errorstr) + print("Thread error!\n"..errorstr) +end +multi.integration.GLOBAL = GLOBAL +multi.integration.THREAD = THREAD +require("multi.integration.lovrManager.extensions") +print("Integrated lovr Threading!") +return {init=function() + return GLOBAL,THREAD +end} \ No newline at end of file diff --git a/multi/integration/lovrManager/threads.lua b/multi/integration/lovrManager/threads.lua new file mode 100644 index 0000000..b373136 --- /dev/null +++ b/multi/integration/lovrManager/threads.lua @@ -0,0 +1,222 @@ +--[[ +MIT License + +Copyright (c) 2020 Ryan Ward + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sub-license, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +]] +-- TODO make compatible with lovr +require("lovr.timer") +require("lovr.system") +require("lovr.data") +local socket = require("socket") +local multi, thread = require("multi").init() +local threads = {} +function threads.loadDump(d) + return loadstring(d:getString()) +end +function threads.dump(func) + return lovr.data.newByteData(string.dump(func)) +end +local fRef = {"func",nil} +local function manage(channel, value) + channel:clear() + if type(value) == "function" then + fRef[2] = THREAD.dump(value) + channel:push(fRef) + return + else + channel:push(value) + end +end +local function RandomVariable(length) + local res = {} + math.randomseed(socket.gettime()*10000) + for i = 1, length do + res[#res+1] = string.char(math.random(97, 122)) + end + return table.concat(res) +end +local GNAME = "__GLOBAL_" +local proxy = {} +function threads.set(name,val) + if not proxy[name] then proxy[name] = lovr.thread.getChannel(GNAME..name) end + proxy[name]:performAtomic(manage, val) +end +function threads.get(name) + if not proxy[name] then proxy[name] = lovr.thread.getChannel(GNAME..name) end + local dat = proxy[name]:peek() + if type(dat)=="table" and dat[1]=="func" then + return THREAD.loadDump(dat[2]) + else + return dat + end +end +function threads.waitFor(name) + if thread.isThread() then + return thread.hold(function() + return threads.get(name) + end) + end + while threads.get(name)==nil do + lovr.timer.sleep(.001) + end + local dat = threads.get(name) + if type(dat) == "table" and dat.init then + dat.init = threads.loadDump(dat.init) + end + return dat +end +function threads.package(name,val) + local init = val.init + val.init=threads.dump(val.init) + GLOBAL[name]=val + val.init=init +end +function threads.getCores() + return lovr.system.getProcessorCount() +end +function threads.kill() + error("Thread Killed!") +end +function threads.getThreads() + local t = {} + for i=1,GLOBAL["__THREAD_COUNT"] do + t[#t+1]=GLOBAL["__THREAD_"..i] + end + return t +end +function threads.getThread(n) + return GLOBAL["__THREAD_"..n] +end +function threads.getName() + return __THREADNAME__ +end +function threads.getID() + return __THREADID__ +end +function threads.sleep(n) + lovr.timer.sleep(n) +end +function threads.getGlobal() + return setmetatable({}, + { + __index = function(t, k) + return THREAD.get(k) + end, + __newindex = function(t, k, v) + THREAD.set(k,v) + end + } + ) +end +function threads.createTable(n) + local _proxy = {} + local function set(name,val) + if not _proxy[name] then _proxy[name] = lovr.thread.getChannel(n..name) end + _proxy[name]:performAtomic(manage, val) + end + local function get(name) + if not _proxy[name] then _proxy[name] = lovr.thread.getChannel(n..name) end + local dat = _proxy[name]:peek() + if type(dat)=="table" and dat[1]=="func" then + return THREAD.loadDump(dat[2]) + else + return dat + end + end + return setmetatable({}, + { + __index = function(t, k) + return get(k) + end, + __newindex = function(t, k, v) + set(k,v) + end + } + ) +end +function threads.getConsole() + local c = {} + c.queue = lovr.thread.getChannel("__CONSOLE__") + function c.print(...) + c.queue:push{...} + end + function c.error(err) + c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__} + error(err) + end + return c +end +if not ISTHREAD then + local clock = os.clock + local lastproc = clock() + local queue = lovr.thread.getChannel("__CONSOLE__") + multi:newThread("consoleManager",function() + while true do + thread.yield() + dat = queue:pop() + if dat then + lastproc = clock() + print(unpack(dat)) + end + if clock()-lastproc>2 then + thread.sleep(.1) + end + end + end) +end +function threads.createStaticTable(n) + local __proxy = {} + local function set(name,val) + if __proxy[name] then return end + local chan = lovr.thread.getChannel(n..name) + if chan:getCount()>0 then return end + chan:performAtomic(manage, val) + __proxy[name] = val + end + local function get(name) + if __proxy[name] then return __proxy[name] end + local dat = lovr.thread.getChannel(n..name):peek() + if type(dat)=="table" and dat[1]=="func" then + __proxy[name] = THREAD.loadDump(dat[2]) + return __proxy[name] + else + __proxy[name] = dat + return __proxy[name] + end + end + return setmetatable({}, + { + __index = function(t, k) + return get(k) + end, + __newindex = function(t, k, v) + set(k,v) + end + } + ) +end +function threads.hold(n) + local dat + while not(dat) do + dat = n() + end +end +return threads \ No newline at end of file diff --git a/multi/integration/pesudoManager/extensions.lua b/multi/integration/pesudoManager/extensions.lua index a64e67b..deae13d 100644 --- a/multi/integration/pesudoManager/extensions.lua +++ b/multi/integration/pesudoManager/extensions.lua @@ -93,6 +93,10 @@ function multi:newSystemThreadedJobQueue(n) jid = jid + 1 return jid-1 end + function c:isEmpty() + print(#jobs) + return #jobs == 0 + end local nFunc = 0 function c:newFunction(name,func,holup) -- This registers with the queue local func = stripUpValues(func) diff --git a/rockspecs/multi-15.0-0.rockspec b/rockspecs/multi-15.0-0.rockspec index 9dee9ed..1f3d52c 100644 --- a/rockspecs/multi-15.0-0.rockspec +++ b/rockspecs/multi-15.0-0.rockspec @@ -22,7 +22,7 @@ build = { modules = { ["multi"] = "multi/init.lua", ["multi.compat.love2d"] = "multi/compat/love2d.lua", - ["multi.integration"] = "multi/integration/threading.lua", + ["multi.integration.threading"] = "multi/integration/threading.lua", ["multi.integration.lanesManager"] = "multi/integration/lanesManager/init.lua", ["multi.integration.lanesManager.extensions"] = "multi/integration/lanesManager/extensions.lua", ["multi.integration.lanesManager.threads"] = "multi/integration/lanesManager/threads.lua", diff --git a/rockspecs/multi-15.1-0.rockspec b/rockspecs/multi-15.1-0.rockspec new file mode 100644 index 0000000..68ec605 --- /dev/null +++ b/rockspecs/multi-15.1-0.rockspec @@ -0,0 +1,42 @@ +package = "multi" +version = "15.1-0" +source = { + url = "git://github.com/rayaman/multi.git", + tag = "V15.1.0", +} +description = { + summary = "Lua Multi tasking library", + detailed = [[ + This library contains many methods for multi tasking. Features non coroutine based multi-tasking, coroutine based multi-tasking, and system threading (Requires use of an integration). + Check github for documentation. + ]], + homepage = "https://github.com/rayaman/multi", + license = "MIT" +} +dependencies = { + "lua >= 5.1", + "lanes", +} +build = { + type = "builtin", + modules = { + ["multi"] = "multi/init.lua", + ["multi.compat.love2d"] = "multi/compat/love2d.lua", + ["multi.compat.lovr"] = "multi/compat/lovr.lua", + ["multi.integration.lanesManager"] = "multi/integration/lanesManager/init.lua", + ["multi.integration.lanesManager.extensions"] = "multi/integration/lanesManager/extensions.lua", + ["multi.integration.lanesManager.threads"] = "multi/integration/lanesManager/threads.lua", + ["multi.integration.loveManager"] = "multi/integration/loveManager/init.lua", + ["multi.integration.loveManager.extensions"] = "multi/integration/loveManager/extensions.lua", + ["multi.integration.loveManager.threads"] = "multi/integration/loveManager/threads.lua", + --["multi.integration.lovrManager"] = "multi/integration/lovrManager/init.lua", + --["multi.integration.lovrManager.extensions"] = "multi/integration/lovrManager/extensions.lua", + --["multi.integration.lovrManager.threads"] = "multi/integration/lovrManager/threads.lua", + ["multi.integration.pesudoManager"] = "multi/integration/pesudoManager/init.lua", + ["multi.integration.pesudoManager.extensions"] = "multi/integration/pesudoManager/extensions.lua", + ["multi.integration.pesudoManager.threads"] = "multi/integration/pesudoManager/threads.lua", + ["multi.integration.luvitManager"] = "multi/integration/luvitManager.lua", + ["multi.integration.threading"] = "multi/integration/threading.lua", + --["multi.integration.networkManager"] = "multi/integration/networkManager.lua", + } +} \ No newline at end of file diff --git a/test.lua b/test.lua index af7e39c..21fd9e2 100644 --- a/test.lua +++ b/test.lua @@ -1,40 +1,113 @@ +--package.path = "./?/init.lua;"..package.path multi,thread = require("multi"):init() -GLOBAL,THREAD = require("multi.integration.threading"):init() -- Auto detects your enviroment and uses what's available -jq = multi:newSystemThreadedJobQueue(5) -- Job queue with 4 worker threads -func = jq:newFunction("test",function(a,b) - THREAD.sleep(2) - return a+b +func = thread:newFunction(function(count) + local a = 0 + while true do + a = a + 1 + thread.sleep(.5) + thread.pushStatus(a,count) + if a == count then break end + end + return "Done" end) -for i = 1,10 do - func(i,i*3).connect(function(data) - print(data) +multi:newThread("test",function() + local ret = func(10) + local ret2 = func(15) + local ret3 = func(20) + ret.OnStatus(function(part,whole) + print("Ret1: ",math.ceil((part/whole)*1000)/10 .."%") end) -end - -local a = true -b = false - -multi:newThread("Standard Thread 1",function() - while true do - thread.sleep(1) - print("Testing 1 ...",a,b,test) - end -end).OnError(function(self,msg) - print(msg) + ret2.OnStatus(function(part,whole) + print("Ret2: ",math.ceil((part/whole)*1000)/10 .."%") + end) + ret3.OnStatus(function(part,whole) + print("Ret3: ",math.ceil((part/whole)*1000)/10 .."%") + end) + thread.hold(ret2.OnReturn + ret.OnReturn + ret3.OnReturn) + print("Function Done!") + os.exit() end) --- All upvalues are stripped! no access to the global, multi and thread are exposed however -multi:newISOThread("ISO Thread 2",function() +--GLOBAL,THREAD = require("multi.integration.threading"):init() -- Auto detects your environment and uses what's available + +func2 = thread:newFunction(function() + thread.sleep(3) + print("Hello World!") + return true +end,true) -- set holdme to true + +func2:holdMe(false) -- reset holdme to false +print("Calling func...") +print(func2()) + +test = thread:newFunction(function(a,b) + thread.sleep(1) + return a,b +end) +print(test(1,2).connect(function(...) + print(...) +end)) +test:Pause() +print(test(1,2).connect(function(...) + print(...) +end)) +test:Resume() +print(test(1,2).connect(function(...) + print(...) +end)) + +test = thread:newFunction(function() + return 1,2,nil,3,4,5,6,7,8,9 +end,true) +print(test()) +multi:newThread("testing",function() + print("#Test = ",test()) + print(thread.hold(function() + print("Hello!") + return false + end,{ + interval = 2, + cycles = 3 + })) -- End result, 3 attempts within 6 seconds. If still false then timeout + print("held") +end).OnError(function(...) + print(...) +end) + +sandbox = multi:newProcessor("Test Processor") +sandbox:newTLoop(function() + print("testing...") +end,1) + +test2 = multi:newTLoop(function() + print("testing2...") +end,1) + +sandbox:newThread("Test Thread",function() + local a = 0 while true do thread.sleep(1) - print("Testing 2 ...",a,b,test) -- a and b are nil, but test is true + a = a + 1 + print("Thread Test: ".. multi.getCurrentProcess().Name) + if a == 10 then + sandbox.Stop() + end end -end,{test=true,print=print}) - -.OnError(function(self,msg) - print(msg) +end).OnError(function(...) + print(...) end) +multi:newThread("Test Thread",function() + while true do + thread.sleep(1) + print("Thread Test: ".. multi.getCurrentProcess().Name) + end +end).OnError(function(...) + print(...) +end) + +sandbox.Start() + multi:mainloop() \ No newline at end of file diff --git a/tests/objectTests.lua b/tests/objectTests.lua new file mode 100644 index 0000000..9296a48 --- /dev/null +++ b/tests/objectTests.lua @@ -0,0 +1,3 @@ +return function objectTests(multi,thread) + print("Testing Alarms!") +end \ No newline at end of file diff --git a/tests/runtests.lua b/tests/runtests.lua index a4c3263..e0faa88 100644 --- a/tests/runtests.lua +++ b/tests/runtests.lua @@ -14,4 +14,8 @@ package.path="../?.lua;../?/init.lua;../?.lua;../?/?/init.lua;"..package.path Each test that is ran should have a 5 second pause after the test is complete The expected and actual should "match" (Might be impossible when playing with threads) This will be pushed directly to the master as tests start existing. -]] \ No newline at end of file +]] +local multi, thread = require("multi"):init() +function runTest(path) + +end \ No newline at end of file