some more updates

This commit is contained in:
Ryan Ward 2019-01-28 15:53:32 -05:00
parent 0abd8183b5
commit 4040e0d9d0
7 changed files with 445 additions and 177 deletions

1
.gitignore vendored
View File

@ -7,3 +7,4 @@ lanestest.lua
sample-node.lua sample-node.lua
sample-master.lua sample-master.lua
Ayn Rand - The Virtue of Selfishness-Mg4QJheclsQ.m4a Ayn Rand - The Virtue of Selfishness-Mg4QJheclsQ.m4a
Atlas Shrugged by Ayn Rand Audiobook-9s2qrEau63E.webm

View File

@ -765,7 +765,7 @@ multi:mainloop()
CBT: Hyper Threaded Process CBT: Hyper Threaded Process
--------------------------- ---------------------------
`process = multi:newHyperThreadedProcess(STRING name)` -- Creates a process object that is able allows all processes created on it to use the thread.* namespace. Hold/Sleep/Skip can be used in each multi obj created without stopping each other object that is running. `process = multi:newHyperThreadedProcess(STRING name)` -- Creates a process object that is able allows all processes created on it to use the thread.* namespace. Hold/Sleep/Skip can be used in each multi obj created without stopping each other object that is running, but allows for one to pause/halt a process and stop all objects running in that process.
`nil = process:getController()` -- Returns nothing there is no "controller" when using threaded processes `nil = process:getController()` -- Returns nothing there is no "controller" when using threaded processes
`self = process:Start()` -- Starts the processor `self = process:Start()` -- Starts the processor
@ -798,7 +798,7 @@ System Threads (ST) - Multi-Integration Getting Started
------------------------------------------------------- -------------------------------------------------------
The system threads need to be required seperatly. The system threads need to be required seperatly.
```lua ```lua
local GLOBAL, THREAD = require("multi.integration.lanesManager").init() -- We will talk about the global and thread interface that is returned local GLOBAL, THREAD = require("multi.integration.lanesManager").init()# -- We will talk about the global and thread interface that is returned
GLOBAL, THREAD = require("multi.integration.loveManager").init() GLOBAL, THREAD = require("multi.integration.loveManager").init()
GLOBAL, THREAD = require("luvitManager")-- There is a catch to this* GLOBAL, THREAD = require("luvitManager")-- There is a catch to this*
``` ```
@ -808,14 +808,15 @@ Using this integration modifies some methods that the multi library has.
This variable is created on the main thread only inside of the multi namespace: multi.isMainThread = true This variable is created on the main thread only inside of the multi namespace: multi.isMainThread = true
This is used to know which thread is the main thread. When network threads are being discussed there is a gotcha that needs to be addressed. This is used to know which thread is the main thread. When network threads are being discussed there is a gotcha that needs to be addressed.
*** GLOBAL and THREAD do not work currently when using the luvit integration `*` GLOBAL and THREAD do not work currently when using the luvit integration
`#`So you may have noticed that when using the lanes manager you need to make the global and thread local, this is due to how lanes copies local variables between states. Also love2d does not require this, actually things will break if this is done! Keep these non local since the way threading is handled at the lower level is much different anyway so GLOBAL and THREAD is automatically set up for use within a spawned thread!
ST - THREAD namespace ST - THREAD namespace
--------------------- ---------------------
`THREAD.set(STRING name, VALUE val)` -- Sets a value in GLOBAL `THREAD.set(STRING name, VALUE val)` -- Sets a value in GLOBAL
`THREAD.get(STRING name)` -- Gets a value in GLOBAL `THREAD.get(STRING name)` -- Gets a value in GLOBAL
`THREAD.waitFor(STRING name)` -- Waits for a value in GLOBAL to exist `THREAD.waitFor(STRING name)` -- Waits for a value in GLOBAL to exist
`THREAD.testFor(STRING name, VALUE val, STRING sym)` -- **NOT YET IMPLEMENTED** `THREAD.testFor(STRING name, VALUE val, STRING sym)` -- **NOT YET IMPLEMENTED** but planned
`THREAD.getCores()` -- Returns the number of actual system threads/cores `THREAD.getCores()` -- Returns the number of actual system threads/cores
`THREAD.kill()` -- Kills the thread `THREAD.kill()` -- Kills the thread
`THREAD.getName()` -- Returns the name of the working thread `THREAD.getName()` -- Returns the name of the working thread
@ -832,25 +833,176 @@ print(GLOBAL["name"])
Removes the need to use THREAD.set() and THREAD.get() Removes the need to use THREAD.set() and THREAD.get()
ST - System Threads ST - System Threads
------------------- -------------------
`systemThread = multi:newSystemThread(STRING thread_name,FUNCTION spawned_function,ARGUMENTS ...)` -- Spawns a thread with a certain name.
`systemThread:kill()` -- kills a thread; can only be called in the main thread!
System Threads are the feature that allows a user to interact with systen threads. It differs from regular coroutine based thread in how it can interact with variables. When using system threads the GLOBAL table is the "only way"* to send data. Spawning a System thread is really simple once all the required libraries are in place. See example below:
```lua
local multi = require("multi") -- keep this global when using lanes or implicitly define multi within the spawned thread
local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
multi:newSystemThread("Example thread",function()
local multi = require("multi") -- we are in a thread so lets not refer to that upvalue!
print("We have spawned a thread!")
-- we could do work but theres no need to we can save that for other examples
print("Lets have a non ending loop!")
while true do
-- If this was not in a thread execution would halt for the entire process
end
end,"A message that we are passing") -- There are restrictions on what can be passed!
tloop = multi:newTLoop(function()
print("I'm still kicking!")
end,1)
multi:mainloop()
```
*This isn't entirely true, as of right now the compatiablity with the lanes library and love2d engine have their own methods to share data, but if you would like to have your code work in both enviroments then using the GLOBAL table and the data structures provided by the multi library will ensure this happens. If you do not plan on having support for both platforms then feel free to use linda's in lanes and channels in love2d.
Note: luvit currently has very basic support, it only allows the spawning of system threads, but no way to send data back and forth as of yet. I do not know if this is doable or not, but I will keep looking into it. If I can somehow emulate System Threaded Queues and the GLOBAL tabke then all other datastructures will work!
ST - System Threaded Objects
----------------------------
Great we are able to spawn threads, but unless your working with a process that works on passed data and then uses a socket or writes to the disk I can't do to much with out being able to pass data between threads. This section we will look at how we can share objects between threads. In order to keep the compatibility between both love2d and lanes I had to format the system threaded objects in a strange way, but they are consistant and should work on both enviroments.
When creating objects with a name they are automatically exposed to the GLOBAL table. Which means you can retrieve them from a spawned thread. For example we have a queue object, which will be discussed in more detail next.
```lua
-- Exposing a queue
multi = require("multi")
local GLOBAL, THREAD = require("multi.integration.lanesManager").init() -- The standard setup above
queue = multi:newSystemThreadedQueue("myQueue"):init() -- We create and initiate the queue for the main thread
queue:push("This is a test!") -- We push some data onto the queue that other threads can consume and do stuff with
multi:newSystemThread("Example thread",function() -- Create a system thread
queue = THREAD.waitFor("myQueue"):init() -- Get the queue. It is good pratice to use the waitFor command when getting objects. If it doesn't exist yet we wait for it, preventing future errors. It is possible for the data to not ve present when a thread is looking for it! Especally when using the love2d module, my fault needs some rewriting data passing on the GLOBAL is quite slow, but the queue internally uses channels so after it is exposed you should have good speeds!
local data = queue:pop() -- Get the data
print(data) -- print the data
end)
multi:mainloop()
```
ST - SystemThreadedQueue ST - SystemThreadedQueue
------------------------ ------------------------
`queue(nonInit) = multi:newSystemThreadedQueue(STRING name)` -- You must enter a name!
`queue = queue:init()` -- initiates the queue, without doing this it will not work
`void = queue:push(DATA data)` -- Pushes data into a queue that all threads that have been shared have access to
`data = queue:pop()` -- pops data from the queue removing it from all threads
`data = queue:peek()` -- looks at data that is on the queue, but dont remove it from the queue
ST - SystemThreadedConnection This object the System Threaded Queue is the basis for all other data structures that a user has access to within the "shared" objects.
-----------------------------
ST - SystemThreadedBenchmark General tips when using a queue. You can always pop from a queue without worrying if another thread poped that same data, BUT if you are peeking at a queue there is the possibility that another thread popped the data while you are peeking and this could cause an issue, depends on what you are doing though. It's important to keep this in mind when using queues.
----------------------------
Let's get into some examples:
```lua
multi = require("multi")
thread_names = {"Thread_A","Thread_B","Thread_C","Thread_D"}
local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
queue = multi:newSystemThreadedQueue("myQueue"):init()
for _,n in pairs(thread_names) do
multi:newSystemThread(n,function()
queue = THREAD.waitFor("myQueue"):init()
local name = THREAD.getName()
local data = queue:pop()
while data do
print(name.." "..data)
data = queue:pop()
end
end)
end
for i=1,100 do
queue:push(math.random(1,1000))
end
multi:newEvent(function() -- Felt like using the event object, I hardly use them for anything non internal
return not queue:peek()
end):OnEvent(function()
print("No more data within the queue!")
os.exit()
end)
multi:mainloop()
```
You have probable noticed that the output from this is a total mess! Well I though so too, and created the system threaded console!
ST - SystemThreadedConsole ST - SystemThreadedConsole
-------------------------- --------------------------
`console(nonInit) = multi:newSystemThreadedConsole(STRING name)` -- Creates a console object called name. The name is mandatory!
`concole = console:inti()` -- initiates the console object
`console:print(...)` -- prints to the console
`console:write(msg)` -- writes to the console, to be fair you wouldn't want to use this one.
ST - SystemThreadedTable The console makes printing from threads much cleaner. We will use the same example from above with the console implemented and compare the outputs and how readable they now are!
------------------------
```lua
multi = require("multi")
thread_names = {"Thread_A","Thread_B","Thread_C","Thread_D"}
local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
multi:newSystemThreadedConsole("console"):init()
queue = multi:newSystemThreadedQueue("myQueue"):init()
for _,n in pairs(thread_names) do
multi:newSystemThread(n,function()
local queue = THREAD.waitFor("myQueue"):init()
local console = THREAD.waitFor("console"):init()
local name = THREAD.getName()
local data = queue:pop()
while data do
--THREAD.sleep(.1) -- uncomment this to see them all work
console:print(name.." "..data)
data = queue:pop()
end
end)
end
for i=1,100 do
queue:push(math.random(1,1000))
end
multi:newEvent(function()
return not queue:peek()
end):OnEvent(function()
multi:newAlarm(.1):OnRing(function() -- Well the mainthread has to read from an internal queue so we have to wait a sec
print("No more data within the queue!")
os.exit()
end)
end)
multi:mainloop()
```
As you see the output here is so much cleaner, but we have a small gotcha, you probably noticed that I used an alarm to delay the exiting of the program for a bit. This is due to how the console object works, I send all the print data into a queue that the main thread then reads and prints out when it looks at the queue. This should not be an issue since you gain so much by having clean outputs!
Another thing to note, because system threads are put to work one thread at a time, really quick though, the first thread that is loaded is able to complete the tasks really fast, its just printing after all. If you want to see all the threads working uncomment the code with THREAD.sleep(.1)
ST - SystemThreadedJobQueue ST - SystemThreadedJobQueue
--------------------------- ---------------------------
ST - SystemThreadedConnection - WIP*
-----------------------------
`connection(nonInit) = multi:newSystemThreadedConnection(name,protect)` -- creates a connecion object
`connection = connection:init()` -- initaties the connection object
`connectionID = connection:connect(FUNCTION func)` -- works like the regular connect function
`void = connection:holdUT(NUMBER/FUNCTION n)` -- works just like the regular holdut function
`void = connection:Remove()` -- works the same as the default
`voic = connection:Fire(ARGS ...)` -- works the same as the default
In the current form a connection object requires that the multi:mainloop() is running on the threads that are sharing this object! By extention since SystemThreadedTables rely on SystemThreadedConnections they have the same requirements. Both objects should not be used for now.
Since the current object is not in a stable condition, I will not be providing examples of how to use it just yet!
*The main issue we have with the connection objects in this form is proper comunication and memory managament between threads. For example if a thread crashes or no longer exists the current apporach to how I manage the connection objects will cause all connections to halt. This feature is still being worked on and has many bugs to be patched out. for now only use for testing purposes.
ST - SystemThreadedTable - WIP*
------------------------
ST - SystemThreadedBenchmark
----------------------------
`bench = multi:SystemThreadedBenchmark(NUMBER seconds)` -- runs a benchmark for a certain amount of time
`bench:OnBench(FUNCTION callback(NUMBER steps/second))`
```lua
multi = require("multi")
local GLOBAL, THREAD = require("multi.integration.lanesManager").init()
multi:SystemThreadedBenchmark(1).OnBench(function(...)
print(...)
end)
multi:mainloop()
```
ST - SystemThreadedExecute ST - SystemThreadedExecute
-------------------------- --------------------------

View File

@ -1,10 +1,10 @@
#Changes #Changes
[TOC] [TOC]
Update 13.0.0 So you documented it, finally, but it's sad to see some things go isn't it? Update 13.0.0 So you documented it, finally! New additions/changes/ and fixes
------------- -------------
Fixed: Tons of bugs, I actually went through the entire library and did a full test of everything, I mean everything, while writing the documentation. Fixed: Tons of bugs, I actually went through the entire library and did a full test of everything, I mean everything, while writing the documentation.
Changed: Changed:
- A few things, to make concepts in the library more clear - A few things, to make concepts in the library more clear.
- The way functions returned paused status. Before it would return "PAUSED" now it returns nil, true if paused - The way functions returned paused status. Before it would return "PAUSED" now it returns nil, true if paused
- Modified the connection object to allow for some more syntaxial suger! - Modified the connection object to allow for some more syntaxial suger!
@ -20,8 +20,18 @@ end)
multi:mainloop() multi:mainloop()
``` ```
Function Example:
```lua
func = multi:newFunction(function(self,a,b)
self:Pause()
return 1,2,3
end)
print(func()) -- returns: 1, 2, 3
print(func()) -- nil, true
```
Removed: Removed:
- Ranges and conditions -- corutine based threads can dmulate what these objects did and much better! - Ranges and conditions -- corutine based threads can emulate what these objects did and much better!
- Due to the creation of hyper threaded processes the following objects are no more! - Due to the creation of hyper threaded processes the following objects are no more!
-- ~~multi:newThreadedEvent()~~ -- ~~multi:newThreadedEvent()~~
-- ~~multi:newThreadedLoop()~~ -- ~~multi:newThreadedLoop()~~
@ -30,15 +40,22 @@ Removed:
-- ~~multi:newThreadedTStep()~~ -- ~~multi:newThreadedTStep()~~
-- ~~multi:newThreadedAlarm()~~ -- ~~multi:newThreadedAlarm()~~
-- ~~multi:newThreadedUpdater()~~ -- ~~multi:newThreadedUpdater()~~
-- ~~multi:newTBase()~~ -- Acted as the base for creating the other objects
These didn't have much use in their previous form, but with the addition of hyper threaded processes the goals that these objects aimed to solve are now possible using a process These didn't have much use in their previous form, but with the addition of hyper threaded processes the goals that these objects aimed to solve are now possible using a process
Fixed: Fixed:
- There were some bugs in the networkmanager.lua file. Desrtoy -> Destroy some misspellings. - There were some bugs in the networkmanager.lua file. Desrtoy -> Destroy some misspellings.
- Massive object management bugs which caused performance to drop like a rock. Remember to Destroy objects when no longer using them. I should probably start working on a garbage collector for these objects!
- Found a bug with processors not having the Destroy() function implemented properly.
Added: Added:
- multi:newHyperThreadedProcess(STRING name) -- This is a version of the threaded process that gives each object created its own coroutine based thread which means you can use thread.* without affecting other objects created within the hyper threaded processes. - multi:newHyperThreadedProcess(STRING name) -- This is a version of the threaded process that gives each object created its own coroutine based thread which means you can use thread.* without affecting other objects created within the hyper threaded processes.
- multi:newConnector() -- A simple object that allows you to use the new connection Fire syntax without using a multi obj - multi:newConnector() -- A simple object that allows you to use the new connection Fire syntax without using a multi obj or the standard object format that I follow.
- multi:purge() -- Removes all references to objects that are contained withing the processes list of tasks to do. Doing this will stop all objects from functioning. Calling Resume on an object should make it work again.
- multi:getTasksDetails(STRING format) -- Simple function, will get massive updates in the future, as of right now It will print out the current processes that are running; listing their type, uptime, and priority. More useful additions will be added in due time. Format can be either a string "s" or "t" see below for the table format
- multi:endTask(TID) -- Use multi:getTasksDetails("t") to get the tid of a task
- multi:enableLoadDetection() -- Since load detection puts some strain on the system (very little) I decided to make it something that has to be enabled. Once on it cant be turned off!
```lua ```lua
package.path="?/init.lua;?.lua;"..package.path package.path="?/init.lua;?.lua;"..package.path
@ -54,7 +71,7 @@ test:newTLoop(function()
conn:OnTest() conn:OnTest()
end,1) end,1)
test:newLoop(function() test:newLoop(function()
print("HI2!") print("HEY!")
thread.sleep(.5) thread.sleep(.5)
end) end)
multi:newAlarm(3):OnRing(function() multi:newAlarm(3):OnRing(function()
@ -63,7 +80,22 @@ end)
test:Start() test:Start()
multi:mainloop() multi:mainloop()
``` ```
Table format for getTasksDetails(STRING format)
```lua
{
{TID = 1,Type="",Priority="",Uptime=0}
{TID = 2,Type="",Priority="",Uptime=0}
...
{TID = n,Type="",Priority="",Uptime=0}
ThreadCount = 0
threads={
[Thread_Name]={
Uptime = 0
}
}
}
```
**Note:** After adding the getTasksDetails() function I noticed many areas where threads, and tasks were not being cleaned up and fixed the leaks. I also found out that a lot of tasks were starting by default and made them enable only. If you compare the benchmark from this version to last version you;ll notice a signifacant increase in performance.
Update 12.2.2 Time for some more bug fixes! Update 12.2.2 Time for some more bug fixes!
------------- -------------
Fixed: multi.Stop() not actually stopping due to the new pirority management scheme and preformance boost changes. Fixed: multi.Stop() not actually stopping due to the new pirority management scheme and preformance boost changes.

View File

@ -61,11 +61,20 @@ multi.Priority_Normal = 64
multi.Priority_Below_Normal = 256 multi.Priority_Below_Normal = 256
multi.Priority_Low = 1024 multi.Priority_Low = 1024
multi.Priority_Idle = 4096 multi.Priority_Idle = 4096
multi.PriorityResolve = {
[1]="Core",
[4]="High",
[16]="Above Normal",
[64]="Normal",
[256]="Below Normal",
[1024]="Low",
[4096]="Idle",
}
multi.PStep = 1 multi.PStep = 1
multi.PList = {multi.Priority_Core,multi.Priority_High,multi.Priority_Above_Normal,multi.Priority_Normal,multi.Priority_Below_Normal,multi.Priority_Low,multi.Priority_Idle} multi.PList = {multi.Priority_Core,multi.Priority_High,multi.Priority_Above_Normal,multi.Priority_Normal,multi.Priority_Below_Normal,multi.Priority_Low,multi.Priority_Idle}
--^^^^ --^^^^
multi.PriorityTick=1 -- Between 1, 2 and 4 multi.PriorityTick=1 -- Between 1, 2 and 4
multi.Priority=multi.Priority_Core multi.Priority=multi.Priority_High
multi.threshold=256 multi.threshold=256
multi.threstimed=.001 multi.threstimed=.001
function multi.queuefinal(self) function multi.queuefinal(self)
@ -240,6 +249,75 @@ function multi:benchMark(sec,p,pt)
self.tt=function() end self.tt=function() end
return temp return temp
end end
function multi.Round(num, numDecimalPlaces)
local mult = 10^(numDecimalPlaces or 0)
return math.floor(num * mult + 0.5) / mult
end
function multi.AlignTable(tab)
local longest = {}
local columns = #tab[1]
local rows = #tab
for i=1, columns do
longest[i] = -math.huge
end
for i = 1,rows do
for j = 1,columns do
tab[i][j] = tostring(tab[i][j])
if #tab[i][j]>longest[j] then
longest[j] = #tab[i][j]
end
end
end
for i = 1,rows do
for j = 1,columns do
if tab[i][j]~=nil and #tab[i][j]<longest[j] then
tab[i][j]=tab[i][j]..string.rep(" ",longest[j]-#tab[i][j])
end
end
end
local str = {}
for i = 1,rows do
str[#str+1] = table.concat(tab[i]," ")
end
return table.concat(str,"\n")
end
function multi:getTasksDetails(t)
if not multi.load_updater then
multi:enableLoadDetection()
end
if t == "string" or not t then
str = {
{"Type","Uptime","Priority","TID"}
}
for i,v in pairs(self.Mainloop) do
local name = v.Name or ""
if name~="" then
name = " <"..name..">"
end
table.insert(str,{v.Type:sub(1,1):upper()..v.Type:sub(2,-1)..name,multi.Round(os.clock()-v.creationTime,3),self.PriorityResolve[v.Priority],i})
end
local s = multi.AlignTable(str)
dat = ""
for i=1,#multi.scheduler.Threads do
dat = dat .. "<THREAD: "..multi.scheduler.Threads[i].Name.." | "..os.clock()-multi.scheduler.Threads[i].creationTime..">\n"
end
return "Load on manager: "..multi.Round(multi:getLoad(),2).."%\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: "..#multi.scheduler.Threads.."\n\n"..s.."\n\n"..dat
elseif t == "t" or t == "table" then
str = {ThreadCount = #multi.scheduler.Threads,MemoryUsage = math.ceil(collectgarbage("count")).." KB"}
str.threads = {}
for i,v in pairs(self.Mainloop) do
str[#str+1]={Type=v.Type,Name=v.Name,Uptime=os.clock()-v.creationTime,Priority=self.PriorityResolve[v.Priority],TID = i}
end
for i=1,#multi.scheduler.Threads do
str.threads[multi.scheduler.Threads[i].Name]={Uptime = os.clock()-multi.scheduler.Threads[i].creationTime}
end
return str
end
end
function multi:endTask(TID)
self.Mainloop[TID]:Destroy()
end
function multi.startFPSMonitior() function multi.startFPSMonitior()
if not multi.runFPS then if not multi.runFPS then
multi.doFPS(s) multi.doFPS(s)
@ -432,6 +510,9 @@ multi.IsDone=multi.isDone
function multi:create(ref) function multi:create(ref)
multi.OnObjectCreated:Fire(ref,self) multi.OnObjectCreated:Fire(ref,self)
end end
function multi:setName(name)
self.Name = name
end
--Constructors [CORE] --Constructors [CORE]
function multi:newBase(ins) function multi:newBase(ins)
if not(self.Type=='mainprocess' or self.Type=='process' or self.Type=='queue') then error('Can only create an object on multi or an interface obj') return false end if not(self.Type=='mainprocess' or self.Type=='process' or self.Type=='queue') then error('Can only create an object on multi or an interface obj') return false end
@ -451,6 +532,7 @@ function multi:newBase(ins)
c.Act=function() end c.Act=function() end
c.Parent=self c.Parent=self
c.held=false c.held=false
c.creationTime = os.clock()
if ins then if ins then
table.insert(self.Mainloop,ins,c) table.insert(self.Mainloop,ins,c)
else else
@ -469,29 +551,33 @@ function multi:newProcessor(file)
c.Mainloop={} c.Mainloop={}
c.Garbage={} c.Garbage={}
c.Children={} c.Children={}
c.Active=true c.Active=false
c.Id=-1 c.Id=-1
c.Rest=0 c.Rest=0
c.Jobs={} c.Jobs={}
c.queue={} c.queue={}
c.jobUS=2 c.jobUS=2
c.l=self:newLoop(function(self,dt) c:uManager() end) c.l=self:newLoop(function(self,dt)
c.l:Pause() if self.link.Active then
c:uManager()
end
end)
c.l.link = c
c.l.Type = "process"
function c:getController() function c:getController()
return c.l return c.l
end end
function c:Start() function c:Start()
if self.l then self.Active = true
self.l:Resume()
end
return self return self
end end
function c:Resume() function c:Resume()
if self.l then self.Active = false
self.l:Resume()
end
return self return self
end end
function c:setName(name)
c.Name = name
end
function c:Pause() function c:Pause()
if self.l then if self.l then
self.l:Pause() self.l:Pause()
@ -506,8 +592,8 @@ function multi:newProcessor(file)
self.Cself=c self.Cself=c
loadstring('local process=multi.Cself '..io.open(file,'rb'):read('*all'))() loadstring('local process=multi.Cself '..io.open(file,'rb'):read('*all'))()
end end
self.__Destroy = self.Destroy c.__Destroy = self.Destroy
self.Destroy = Remove c.Destroy = c.Remove
self:create(c) self:create(c)
--~ c:IngoreObject() --~ c:IngoreObject()
return c return c
@ -962,6 +1048,7 @@ function multi:newTLoop(func,set)
c.set=set or 0 c.set=set or 0
c.timer=self:newTimer() c.timer=self:newTimer()
c.life=0 c.life=0
c:setPriority("Low")
if func then if func then
c.func={func} c.func={func}
end end
@ -1305,20 +1392,9 @@ function thread.testFor(name,_val,sym)
end) end)
return thread.get(name) return thread.get(name)
end end
function multi:newTBase(name) multi:setDomainName("Threads")
local c = {} multi:setDomainName("Globals")
c.name=name local initT = false
c.Active=true
c.func={}
c.ender={}
c.Id=0
c.Parent=self
c.important={}
c.held=false
c.ToString=multi.ToString
c.ToFile=multi.ToFile
return c
end
function multi:newThread(name,func) function multi:newThread(name,func)
local c={} local c={}
c.ref={} c.ref={}
@ -1327,7 +1403,7 @@ function multi:newThread(name,func)
c.sleep=1 c.sleep=1
c.Type="thread" c.Type="thread"
c.firstRunDone=false c.firstRunDone=false
c.timer=multi.scheduler:newTimer() c.timer=multi:newTimer()
c.ref.Globals=self:linkDomain("Globals") c.ref.Globals=self:linkDomain("Globals")
function c.ref:send(name,val) function c.ref:send(name,val)
ret=coroutine.yield({Name=name,Value=val}) ret=coroutine.yield({Name=name,Value=val})
@ -1358,22 +1434,23 @@ function multi:newThread(name,func)
self.Globals=v self.Globals=v
end end
table.insert(self:linkDomain("Threads"),c) table.insert(self:linkDomain("Threads"),c)
if not multi.scheduler:isActive() then if initT==false then
multi.scheduler:Resume() multi.initThreads()
end end
c.creationTime = os.clock()
end end
multi:setDomainName("Threads") function multi.initThreads()
multi:setDomainName("Globals") initT = true
multi.scheduler=multi:newLoop() multi.scheduler=multi:newLoop()
multi.scheduler.Type="scheduler" multi.scheduler.Type="scheduler"
function multi.scheduler:setStep(n) function multi.scheduler:setStep(n)
self.skip=tonumber(n) or 24 self.skip=tonumber(n) or 24
end end
multi.scheduler.skip=0 multi.scheduler.skip=0
multi.scheduler.counter=0 multi.scheduler.counter=0
multi.scheduler.Threads=multi:linkDomain("Threads") multi.scheduler.Threads=multi:linkDomain("Threads")
multi.scheduler.Globals=multi:linkDomain("Globals") multi.scheduler.Globals=multi:linkDomain("Globals")
multi.scheduler:OnLoop(function(self) multi.scheduler:OnLoop(function(self)
self.counter=self.counter+1 self.counter=self.counter+1
for i=#self.Threads,1,-1 do for i=#self.Threads,1,-1 do
ret={} ret={}
@ -1418,6 +1495,7 @@ multi.scheduler:OnLoop(function(self)
event.counter=self.counter+ret[2] event.counter=self.counter+ret[2]
event:OnEvent(function(evnt) event:OnEvent(function(evnt)
evnt.link.sleep=0 evnt.link.sleep=0
evnt:Destroy()
end) end)
elseif ret[1]=="_hold_" then elseif ret[1]=="_hold_" then
self.Threads[i].timer:Reset() self.Threads[i].timer:Reset()
@ -1428,6 +1506,9 @@ multi.scheduler:OnLoop(function(self)
event:OnEvent(function(evnt) event:OnEvent(function(evnt)
evnt.link.sleep=0 evnt.link.sleep=0
evnt.link.returns = evnt.returns evnt.link.returns = evnt.returns
multi.nextStep(function()
evnt:Destroy()
end)
end) end)
elseif ret.Name then elseif ret.Name then
self.Globals[ret.Name]=ret.Value self.Globals[ret.Name]=ret.Value
@ -1435,8 +1516,8 @@ multi.scheduler:OnLoop(function(self)
end end
end end
end end
end) end)
multi.scheduler:Pause() end
multi.OnError=multi:newConnection() multi.OnError=multi:newConnection()
function multi:newThreadedProcess(name) function multi:newThreadedProcess(name)
local c = {} local c = {}
@ -1663,6 +1744,7 @@ function multi:threadloop(settings)
event.counter=counter+ret[2] event.counter=counter+ret[2]
event:OnEvent(function(evnt) event:OnEvent(function(evnt)
evnt.link.sleep=0 evnt.link.sleep=0
evnt:Destroy()
end) end)
elseif ret[1]=="_hold_" then elseif ret[1]=="_hold_" then
Threads[i].timer:Reset() Threads[i].timer:Reset()
@ -1671,6 +1753,7 @@ function multi:threadloop(settings)
event.link=Threads[i] event.link=Threads[i]
event:OnEvent(function(evnt) event:OnEvent(function(evnt)
evnt.link.sleep=0 evnt.link.sleep=0
evnt:Destroy()
end) end)
elseif ret.Name then elseif ret.Name then
Globals[ret.Name]=ret.Value Globals[ret.Name]=ret.Value
@ -2087,7 +2170,6 @@ function multi:IngoreObject()
self.Ingore=true self.Ingore=true
return self return self
end end
multi.scheduler:IngoreObject()
function multi:ToString() function multi:ToString()
if self.Ingore then return end if self.Ingore then return end
local t=self.Type local t=self.Type
@ -2268,34 +2350,6 @@ function multi:newFromString(str)
local item=self:newLoop() local item=self:newLoop()
table.merge(item,data) table.merge(item,data)
return item return item
elseif t=="eventThread" then -- GOOD
local item=self:newThreadedEvent(data.name)
table.merge(item,data)
return item
elseif t=="loopThread" then -- GOOD
local item=self:newThreadedLoop(data.name)
table.merge(item,data)
return item
elseif t=="stepThread" then -- GOOD
local item=self:newThreadedStep(data.name)
table.merge(item,data)
return item
elseif t=="tloopThread" then -- GOOD
local item=self:newThreadedTLoop(data.name,nil,data.restN)
table.merge(item,data)
return item
elseif t=="tstepThread" then -- GOOD
local item=self:newThreadedTStep(data.name)
table.merge(item,data)
return item
elseif t=="updaterThread" then -- GOOD
local item=self:newThreadedUpdater(data.name)
table.merge(item,data)
return item
elseif t=="alarmThread" then -- GOOD
local item=self:newThreadedAlarm(data.name)
table.merge(item,data)
return item
end end
end end
function multi:Important(varname) function multi:Important(varname)
@ -2322,13 +2376,15 @@ end
function multi:setDefualtStateFlag(opt) function multi:setDefualtStateFlag(opt)
-- --
end end
multi.dStepA = 0 function multi:enableLoadDetection()
multi.dStepB = 0 if multi.load_updater then return end
multi.dSwap = 0 multi.dStepA = 0
multi.deltaTarget = .05 multi.dStepB = 0
multi.load_updater = multi:newUpdater(2) multi.dSwap = 0
multi.load_updater:Pause() multi.deltaTarget = .05
multi.load_updater:OnUpdate(function(self) multi.load_updater = multi:newUpdater(2)
multi.load_updater:setName("LoadDetector")
multi.load_updater:OnUpdate(function(self)
if self.Parent.dSwap == 0 then if self.Parent.dSwap == 0 then
self.Parent.dStepA = os.clock() self.Parent.dStepA = os.clock()
self.Parent.dSwap = 1 self.Parent.dSwap = 1
@ -2336,5 +2392,6 @@ multi.load_updater:OnUpdate(function(self)
self.Parent.dSwap = 0 self.Parent.dSwap = 0
self.Parent.dStepB = os.clock() self.Parent.dStepB = os.clock()
end end
end) end)
end
return multi return multi

View File

@ -142,7 +142,7 @@ function multi:nodeManager(port)
server.OnDataRecieved(function(server,data,cid,ip,port) server.OnDataRecieved(function(server,data,cid,ip,port)
local cmd = data:sub(1,1) local cmd = data:sub(1,1)
if cmd == "R" then if cmd == "R" then
multi:newThread("Test",function(loop) multi:newThread("Node Client Manager",function(loop)
while true do while true do
if server.timeouts[cid]==true then if server.timeouts[cid]==true then
server.OnNodeRemoved:Fire(server.nodes[cid]) server.OnNodeRemoved:Fire(server.nodes[cid])
@ -175,6 +175,7 @@ function multi:nodeManager(port)
end end
-- The main driving force of the network manager: Nodes -- The main driving force of the network manager: Nodes
function multi:newNode(settings) function multi:newNode(settings)
multi:enableLoadDetection()
settings = settings or {} settings = settings or {}
-- Here we have to use the net library to broadcast our node across the network -- Here we have to use the net library to broadcast our node across the network
math.randomseed(os.time()) math.randomseed(os.time())
@ -439,7 +440,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
name = self:getRandomNode() name = self:getRandomNode()
end end
if name==nil then if name==nil then
multi:newThread("Test",function(loop) multi:newThread("Network Thread Manager",function(loop)
while true do while true do
if name~=nil then if name~=nil then
self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData) self:sendTo(name,char(CMD_TASK)..len..aData..len2..fData)
@ -461,7 +462,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
name = "NODE_"..name name = "NODE_"..name
end end
if self.connections[name]==nil then if self.connections[name]==nil then
multi:newThread("Test",function(loop) multi:newThread("Node Data Link Controller",function(loop)
while true do while true do
if self.connections[name]~=nil then if self.connections[name]~=nil then
self.connections[name]:send(data) self.connections[name]:send(data)
@ -504,7 +505,7 @@ function multi:newMaster(settings) -- You will be able to have more than one mas
client.OnClientReady(function() client.OnClientReady(function()
client:send(char(CMD_INITMASTER)..master.name) -- Tell the node that you are a master trying to connect client:send(char(CMD_INITMASTER)..master.name) -- Tell the node that you are a master trying to connect
if not settings.managerDetails then if not settings.managerDetails then
multi:newThread("Test",function(loop) multi:newThread("Node Data Link Controller",function(loop)
while true do while true do
if master.timeouts[name]==true then if master.timeouts[name]==true then
master.timeouts[name] = nil master.timeouts[name] = nil

View File

@ -211,6 +211,7 @@ function multi:SystemThreadedBenchmark(n)
multi:benchMark(n):OnBench(function(self,count) multi:benchMark(n):OnBench(function(self,count)
queue:push(count) queue:push(count)
sThread.kill() sThread.kill()
error("Thread was killed!")
end) end)
multi:mainloop() multi:mainloop()
end,n) end,n)
@ -247,10 +248,10 @@ function multi:newSystemThreadedConsole(name)
end end
local cc={} local cc={}
if multi.isMainThread then if multi.isMainThread then
if GLOBAL["__SYSTEM_CONSLOE__"] then if GLOBAL["__SYSTEM_CONSOLE__"] then
cc.stream = sThread.waitFor("__SYSTEM_CONSLOE__"):init() cc.stream = sThread.waitFor("__SYSTEM_CONSOLE__"):init()
else else
cc.stream = multi:newSystemThreadedQueue("__SYSTEM_CONSLOE__"):init() cc.stream = multi:newSystemThreadedQueue("__SYSTEM_CONSOLE__"):init()
multi:newLoop(function() multi:newLoop(function()
local data = cc.stream:pop() local data = cc.stream:pop()
if data then if data then
@ -264,7 +265,7 @@ function multi:newSystemThreadedConsole(name)
end) end)
end end
else else
cc.stream = sThread.waitFor("__SYSTEM_CONSLOE__"):init() cc.stream = sThread.waitFor("__SYSTEM_CONSOLE__"):init()
end end
function cc:write(msg) function cc:write(msg)
self.stream:push({"w",tostring(msg)}) self.stream:push({"w",tostring(msg)})

View File

@ -1,16 +1,40 @@
package.path="?/init.lua;?.lua;"..package.path package.path="?/init.lua;?.lua;"..package.path
local multi = require("multi") multi = require("multi")
test = multi:newHyperThreadedProcess("test") local GLOBAL,THREAD = require("multi.integration.lanesManager").init()
test:newLoop(function() nGLOBAL = require("multi.integration.networkManager").init()
print("HI!") local a
function multi:setName(name)
self.Name = name
end
local clock = os.clock
function sleep(n) -- seconds
local t0 = clock()
while clock() - t0 <= n do end
end
master = multi:newMaster{
name = "Main", -- the name of the master
--noBroadCast = true, -- if using the node manager, set this to true to avoid double connections
managerDetails = {"192.168.1.4",12345}, -- the details to connect to the node manager (ip,port)
}
master.OnError(function(name,err)
print(name.." has encountered an error: "..err)
end) end)
test:newLoop(function() master.OnNodeConnected(function(name)
print("HI2!") multi:newThread("Main Thread Data Sender",function()
while true do
thread.sleep(.5) thread.sleep(.5)
conn = master:execute("TASK_MAN",name, multi:getTasksDetails())
end
end,5)
end) end)
multi:newAlarm(3):OnRing(function() multi.OnError(function(...)
test:Sleep(10) print(...)
end) end)
test:Start() multi:mainloop{
multi:mainloop() protect = false
}
--~ print(multi.AlignTable{
--~ {"Name","Type","Number"},
--~ {"Test","This is a type","1.34"},
--~ {"Test Hello","This is another type","143.43"},
--~ })