working on 1.16

This commit is contained in:
Ryan Ward 2021-12-18 12:42:14 -05:00
parent 57563688ae
commit cc20914391
7 changed files with 307 additions and 74 deletions

22
client.lua Normal file
View File

@ -0,0 +1,22 @@
package.path = "?/init.lua;?.lua;"..package.path
local multi, thread = require("multi"):init()
net = require("lnet.tcp")
client = net.newTCPClient("localhost",12345)
multi:newThread(function()
while true do
thread.sleep(1)
client:send(multi:getTasksDetails())
end
end)
multi:newThread(function()
while true do
thread.sleep(.01)
multi:newLoop()
end
end)
multi:mainloop()

View File

@ -50,6 +50,7 @@ multi.time = os.time
multi.LinkedPath = multi
multi.lastTime = clock()
multi.TIMEOUT = "TIMEOUT"
multi.TID = 0
multi.Priority_Core = 1
multi.Priority_Very_High = 4
@ -92,8 +93,9 @@ end
--Processor
local priorityTable = {[0]="Round-Robin",[1]="Balanced",[2]="Top-Down",[3]="Timed-Based-Balancer"}
local ProcessName = {[true]="SubProcessor",[false]="MainProcessor"}
local globalThreads = {}
function multi:getTasksDetails(t)
if t == "string" or not t then
if not(t) then
str = {
{"Type <Identifier>","Uptime","Priority","TID"}
}
@ -126,19 +128,37 @@ function multi:getTasksDetails(t)
end
end
local load, steps = self:getLoad()
if thread.__threads then
for i=1,#thread.__threads do
dat = dat .. "<THREAD: "..thread.__threads[i].Name.." | "..os.clock()-thread.__threads[i].creationTime..">\n"
end
return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: "..#thread.__threads.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s.."\n\n"..dat..dat2
local thread_count = 0
local process_count = 0
if globalThreads then
local th_tab = {
{"Thread Name","Uptime","TID","Attached To"}
}
local proc_tab = {
{"Process Name", "Uptime", "PID", "Load", "Cycles per Second per task"}
}
for th,process in pairs(globalThreads) do
if tostring(th.isProcessThread) == "destroyed" then
globalThreads[th] = nil
elseif th.isProcessThread then
local load, steps = process:getLoad()
process_count = process_count + 1
table.insert(proc_tab,{th.Name,os.clock()-th.creationTime,(th.PID or "-1"),load,steps})
else
return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..s..dat2
thread_count = thread_count + 1
table.insert(th_tab,{th.Name,os.clock()-th.creationTime,(th.TID or "-1"),process.Name})
end
elseif t == "t" or t == "table" then
end
dat = multi.AlignTable(proc_tab).. "\n"
dat = dat .. "\n" .. multi.AlignTable(th_tab)
return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: "..thread_count.."\nSystemThreads Running: "..#(multi.SystemThreads or {}).."\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..dat..dat2.."\n\n"..s
else
return "Load on "..ProcessName[self.Type=="process"].."<"..(self.Name or "Unnamed")..">"..": "..multi.Round(load,2).."%\nCycles Per Second Per Task: "..steps.."\n\nMemory Usage: "..math.ceil(collectgarbage("count")).." KB\nProcesses Running: "..process_count.."\nThreads Running: 0\nPriority Scheme: "..priorityTable[multi.defaultSettings.priority or 0].."\n\n"..dat2.."\n\n"..s
end
else
local load,steps = self:getLoad()
str = {
ProcessName = (self.Name or "Unnamed"),
ThreadCount = #thread.__threads,
MemoryUsage = math.ceil(collectgarbage("count")),
PriorityScheme = priorityTable[multi.defaultSettings.priority or 0],
SystemLoad = multi.Round(load,2),
@ -148,6 +168,7 @@ function multi:getTasksDetails(t)
str.Tasks = {}
str.PausedTasks = {}
str.Threads = {}
str.Processes = {}
str.Systemthreads = {}
for i,v in pairs(self.Mainloop) do
table.insert(str.Tasks,{Link = v, Type=v.Type,Name=v.Name,Uptime=os.clock()-v.creationTime,Priority=self.PriorityResolve[v.Priority],TID = v.TID})
@ -155,9 +176,18 @@ function multi:getTasksDetails(t)
for v,i in pairs(multi.PausedObjects) do
table.insert(str.Tasks,{Link = v, Type=v.Type,Name=v.Name,Uptime=os.clock()-v.creationTime,Priority=self.PriorityResolve[v.Priority],TID = v.TID})
end
for i=1,#thread.__threads do
table.insert(str.Threads,{Uptime = os.clock()-thread.__threads[i].creationTime,Name = thread.__threads[i].Name,Link = thread.__threads[i],TID = thread.__threads[i].TID})
for th,process in pairs(globalThreads) do
if tostring(th.isProcessThread) == "destroyed" then
globalThreads[th] = nil
elseif th.isProcessThread then
local load, steps = process:getLoad()
table.insert(str.Processes,{Uptime = os.clock()-th.creationTime, Name = th.Name, Link = th, TID = th.TID,Load = load,Steps = steps})
else
table.insert(str.Threads,{Uptime = os.clock()-th.creationTime,Name = th.Name,Link = th,TID = th.TID,Attached_to = process})
end
end
str.ThreadCount = #str.Threads
str.ProcessCount = #str.Processes
if multi.SystemThreads then
for i=1,#multi.SystemThreads do
table.insert(str.Systemthreads,{Uptime = os.clock()-multi.SystemThreads[i].creationTime,Name = multi.SystemThreads[i].Name,Link = multi.SystemThreads[i],TID = multi.SystemThreads[i].count})
@ -293,6 +323,17 @@ function multi:Destroy()
self.OnObjectDestroyed:Fire(c[i])
c[i]:Destroy()
end
local new = {}
for th,proc in pairs(globalThreads) do
if proc == self then
th:Destroy()
table.remove(globalThreads,th)
else
new[th]=proc
end
end
globalThreads = new
multi.setType(self,multi.DestroyedObj)
else
for i=1,#self.Parent.Mainloop do
if self.Parent.Mainloop[i]==self then
@ -806,11 +847,12 @@ function multi:newTLoop(func,set)
end
function c:Act()
if self.timer:Get()>=self.set then
print("Acting...")
self.life=self.life+1
self.timer:Reset()
for i=1,#self.func do
self.func[i](self,self.life)
end
self.timer:Reset()
end
end
function c:Resume()
@ -939,11 +981,10 @@ function multi.getCurrentProcess()
return __CurrentProcess
end
local globalThreads = {}
local sandcount = 0
local sandcount = 1
function multi:newProcessor(name)
local c = {}
print("Proc Created:",sandcount)
setmetatable(c,{__index = self})
local multi,thread = require("multi"):init() -- We need to capture the t in thread
local name = name or "Processor_"..sandcount
@ -951,7 +992,7 @@ function multi:newProcessor(name)
c.Mainloop = {}
c.Type = "process"
c.Active = false
c.Name = "multi.process<".. (name or "") .. ">"
c.Name = name or ""
c.process = self:newThread(c.Name,function()
while true do
thread.hold(function()
@ -962,6 +1003,8 @@ function multi:newProcessor(name)
__CurrentProcess = self
end
end)
c.process.isProcessThread = true
c.process.PID = sandcount
c.OnError = c.process.OnError
function c.Start()
c.Active = true
@ -971,6 +1014,9 @@ function multi:newProcessor(name)
c.Active = false
return self
end
function c:Destroy()
self.OnObjectDestroyed:Fire(c)
end
c:attachScheduler()
c.initThreads()
return c
@ -995,7 +1041,7 @@ function thread.getRunningThread()
local t = coroutine.running()
if t then
for i,v in pairs(threads) do
if t==v.thread then
if t==i.thread then
return v
end
end
@ -1300,6 +1346,7 @@ function multi:attachScheduler()
return self
end
c.Destroy = c.Kill
c.kill = c.Kill
function c.ref:send(name,val)
ret=coroutine.yield({Name=name,Value=val})
end
@ -1327,7 +1374,7 @@ function multi:attachScheduler()
self.Globals=v
end
table.insert(threads,c)
table.insert(globalThreads,c)
globalThreads[c] = self
if initT==false then
self.initThreads()
end
@ -1519,7 +1566,6 @@ function multi:attachScheduler()
elseif threads[i] and threads[i].task == "holdW" then
if clock() - threads[i].intervalR>=threads[i].interval then
threads[i].pos = threads[i].pos + 1
print(threads[i].pos,threads[i].count)
t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10,t11,t12,t13,t14,t15 = threads[i].func()
if t0 then
threads[i].task = ""
@ -2155,11 +2201,12 @@ end
local busy = false
local lastVal = 0
local last_step = 0
local bb = 0
function multi:getLoad()
if not multi.maxSpd then self:enableLoadDetection() end
if busy then return lastVal end
if busy then return lastVal,last_step end
local val = nil
if thread.isThread() then
local bench
@ -2189,7 +2236,8 @@ function multi:getLoad()
if val<0 then val = 0 end
if val > 100 then val = 100 end
lastVal = val
return val,bb*100
last_step = bb*100
return val,last_step
end
function multi:setPriority(s)

View File

@ -55,19 +55,84 @@ local threads = {}
local count = 1
local started = false
local livingThreads = {}
function THREAD:newFunction(func,holup)
return function(...)
local t = multi:newSystemThread("SystemThreadedFunction",function(...)
return func(...)
end,...)
return thread:newFunction(function()
thread.hold(function() return t.thread end)
return thread.hold(function()
return t.thread:join(.001)
function THREAD:newFunction(func,holdme)
local tfunc = {}
tfunc.Active = true
function tfunc:Pause()
self.Active = false
end
function tfunc:Resume()
self.Active = true
end
function tfunc:holdMe(b)
holdme = b
end
local function noWait()
return nil, "Function is paused"
end
local rets, err
local function wait(no)
if thread.isThread() and not (no) then
return multi.hold(function()
if err then
return nil, err
elseif rets then
return unpack(rets)
end
end)
end,holup)()
else
while not rets and not err do
multi.scheduler:Act()
end
if err then
return nil,err
end
return unpack(rets)
end
end
tfunc.__call = function(t,...)
if not t.Active then
if holdme then
return nil, "Function is paused"
end
return {
isTFunc = true,
wait = noWait,
connect = function(f)
f(nil,"Function is paused")
end
}
end
local t = multi:newSystemThread("SystemThreadedFunction",func,...)
t.OnDeath(function(self,...) rets = {...} end)
t.OnError(function(self,e) err = e end)
if holdme then
return wait()
end
local temp = {
OnStatus = multi:newConnection(),
OnError = multi:newConnection(),
OnReturn = multi:newConnection(),
isTFunc = true,
wait = wait,
connect = function(f)
local tempConn = multi:newConnection()
t.OnDeath(function(self,...) if f then f(...) else tempConn:Fire(...) end end)
t.OnError(function(self,err) if f then f(nil,err) else tempConn:Fire(nil,err) end end)
return tempConn
end
}
t.OnDeath(function(self,...) temp.OnReturn:Fire(...) end)
t.OnError(function(self,err) temp.OnError:Fire(err) end)
t.linkedFunction = temp
t.statusconnector = temp.OnStatus
return temp
end
setmetatable(tfunc,tfunc)
return tfunc
end
function multi:newSystemThread(name, func, ...)
multi.InitSystemThreadErrorHandler()
rand = math.random(1, 10000000)
@ -104,48 +169,66 @@ function multi:newSystemThread(name, func, ...)
self.alive = false
end
table.insert(multi.SystemThreads, c)
c.OnDeath = multi:newConnection()
c.OnError = multi:newConnection()
GLOBAL["__THREADS__"] = livingThreads
return c
end
multi.OnSystemThreadDied = multi:newConnection()
local function detectLuaError(str)
return type(str)=="string" and str:match("%.lua:%d*:")
end
local function tableLen(tab)
local len = 0
for i,v in pairs(tab) do
len = len + 1
end
return len
end
function multi.InitSystemThreadErrorHandler()
if started == true then
return
end
started = true
multi:newThread(
"ThreadErrorHandler",
function()
multi:newThread("ThreadErrorHandler",function()
local threads = multi.SystemThreads
while true do
thread.sleep(.5) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough.
thread.sleep(.1) -- switching states often takes a huge hit on performance. half a second to tell me there is an error is good enough.
for i = #threads, 1, -1 do
local v, err, t = threads[i].thread:join(.001)
if err then
if err:find("Thread was killed!") then
print(err)
local _,data = pcall(function()
return {threads[i].thread:join(1)}
end)
local v, err, t = data[1],data[2],data[3]
if detectLuaError(err) then
if err:find("Thread was killed!\1") then
livingThreads[threads[i].Id] = {false, threads[i].Name}
threads[i].alive = false
multi.OnSystemThreadDied:Fire(threads[i].Id)
threads[i].OnDeath:Fire(threads[i],nil,"Thread was killed!")
GLOBAL["__THREADS__"] = livingThreads
table.remove(threads, i)
elseif err:find("stack traceback") then
print(err)
else
threads[i].OnError:Fire(threads[i], err, "Error in systemThread: '" .. threads[i].name .. "' <" .. err .. ">")
threads[i].alive = false
livingThreads[threads[i].Id] = {false, threads[i].Name}
multi.OnSystemThreadDied:Fire(threads[i].Id)
GLOBAL["__THREADS__"] = livingThreads
table.remove(threads, i)
end
elseif tableLen(data)>0 then
livingThreads[threads[i].Id] = {false, threads[i].Name}
threads[i].alive = false
threads[i].OnDeath:Fire(threads[i],unpack(data))
GLOBAL["__THREADS__"] = livingThreads
table.remove(threads, i)
end
end
end
end).OnError(function(...)
print("Error!",...)
end)
end
end
)
end
print("Integrated Lanes!")
multi.print("Integrated Lanes!")
multi.integration = {} -- for module creators
multi.integration.GLOBAL = GLOBAL
multi.integration.THREAD = THREAD

View File

@ -82,7 +82,7 @@ local function INIT(__GlobalLinda,__SleepingLinda)
THREAD.__CORES = tonumber(io.popen("nproc --all"):read("*n"))
end
function THREAD.kill() -- trigger the lane destruction
error("Thread was killed!")
error("Thread was killed!\1")
end
function THREAD.getName()
return THREAD_NAME

View File

@ -49,19 +49,68 @@ local THREAD_ID = 1
local OBJECT_ID = 0
local stf = 0
function THREAD:newFunction(func,holup)
stf = stf + 1
return function(...)
local t = multi:newSystemThread("STF"..stf,func,...)
return thread:newFunction(function()
return thread.hold(function()
if t.stab["returns"] then
local dat = t.stab.returns
t.stab.returns = nil
return unpack(dat)
local tfunc = {}
tfunc.Active = true
function tfunc:Pause()
self.Active = false
end
end)
end,holup)()
function tfunc:Resume()
self.Active = true
end
function tfunc:holdMe(b)
holdme = b
end
local function noWait()
return nil, "Function is paused"
end
local rets, err
local function wait(no)
if thread.isThread() and not (no) then
-- In a thread
else
-- Not in a thread
end
end
tfunc.__call = function(t,...)
if not t.Active then
if holdme then
return nil, "Function is paused"
end
return {
isTFunc = true,
wait = noWait,
connect = function(f)
f(nil,"Function is paused")
end
}
end
local t = multi:newSystemThread("SystemThreadedFunction",func,...)
t.OnDeath(function(self,...) rets = {...} end)
t.OnError(function(self,e) err = e end)
if holdme then
return wait()
end
local temp = {
OnStatus = multi:newConnection(),
OnError = multi:newConnection(),
OnReturn = multi:newConnection(),
isTFunc = true,
wait = wait,
connect = function(f)
local tempConn = multi:newConnection()
t.OnDeath(function(self,...) if f then f(...) else tempConn:Fire(...) end end)
t.OnError(function(self,err) if f then f(nil,err) else tempConn:Fire(nil,err) end end)
return tempConn
end
}
t.OnDeath(function(self,...) temp.OnReturn:Fire(...) end)
t.OnError(function(self,err) temp.OnError:Fire(err) end)
t.linkedFunction = temp
t.statusconnector = temp.OnStatus
return temp
end
setmetatable(tfunc,tfunc)
return tfunc
end
function multi:newSystemThread(name,func,...)
local c = {}
@ -73,6 +122,26 @@ function multi:newSystemThread(name,func,...)
GLOBAL["__THREAD_"..c.ID] = {ID=c.ID,Name=c.name,Thread=c.thread}
GLOBAL["__THREAD_COUNT"] = THREAD_ID
THREAD_ID=THREAD_ID+1
multi:newThread(function()
while true do
thread.yield()
if c.stab["returns"] then
c.OnDeath:Fire(c,unpack(t.stab.returns))
t.stab.returns = nil
thread.kill()
end
local error = c.thread:getError()
if error then
if error:find("Thread Killed!\1") then
c.OnDeath:Fire(c,"Thread Killed!")
thread.kill()
else
c.OnError:Fire(c,error)
thread.kill()
end
end
end
end)
return c
end
function love.threaderror(thread, errorstr)

View File

@ -92,7 +92,7 @@ function threads.getCores()
return love.system.getProcessorCount()
end
function threads.kill()
error("Thread Killed!")
error("Thread Killed!\1")
end
function threads.getThreads()
local t = {}

11
server.lua Normal file
View File

@ -0,0 +1,11 @@
package.path = "?/init.lua;?.lua;"..package.path
local multi, thread = require("multi"):init()
net = require("lnet.tcp")
server = net.newTCPServer(12345)
server.OnDataRecieved(function(self,data)
print(data)
end)
multi:mainloop()