Fixed tons of issues with async functions

This commit is contained in:
Ryan Ward 2020-02-14 23:31:46 -05:00
parent ebe87ed69a
commit 8f5973343d
7 changed files with 216 additions and 100 deletions

View File

@ -9,9 +9,9 @@ Full Update Showcase
--- ---
Something I plan on doing each version going forward Something I plan on doing each version going forward
```lua ```lua
package.path="?/init.lua;?.lua;"..package.path package.path="?.lua;?/init.lua;?.lua;"..package.path
multi,thread = require("multi"):init() local multi,thread = require("multi"):init()
local GLOBAL, THREAD = require("multi.integration.lanesManager"):init() GLOBAL,THREAD = require("multi.integration.lanesManager"):init()
t = THREAD:newFunction(function(...) t = THREAD:newFunction(function(...)
print("This is a system threaded function!",...) print("This is a system threaded function!",...)
THREAD.sleep(1) -- This is handled within a system thread! Note: this creates a system thread that runs then ends. C THREAD.sleep(1) -- This is handled within a system thread! Note: this creates a system thread that runs then ends. C
@ -79,17 +79,54 @@ multi:newThread(function()
end) end)
-- This waits for the returns since we are demanding them -- This waits for the returns since we are demanding them
end) end)
local test = multi:newSystemThreadedJobQueue(4) -- Load up a queue that has 4 running threads
func = test:newFunction("test",function(a) -- register a function on the queue that has an async function feature
test2() -- Call the other registered function on the queue
return a..a
end,true)
func2 = test:newFunction("test2",function(a)
print("ooo")
console = THREAD:getConsole()
console.print("Hello!",true)
end,true) -- When called internally on the job queue the function is a normal sync function and not an async function.
print(func("1"))
print(func("Hello"))
print(func("sigh"))
multi:mainloop() multi:mainloop()
``` ```
Changed: Changed:
--- ---
- thread:newFunction(func,holup) -- Added an argument holdme to always force the threaded funcion to wait. Meaning you don't need to tell it to func().wait() or func().connect() - thread:newFunction(func,holup) -- Added an argument holup to always force the threaded funcion to wait. Meaning you don't need to tell it to func().wait() or func().connect()
- multi:newConnection(protect,callback,kill) -- Added the kill argument. Makes connections work sort of like a stack. Pop off the connections as they get called. So a one time connection handler. - multi:newConnection(protect,callback,kill) -- Added the kill argument. Makes connections work sort of like a stack. Pop off the connections as they get called. So a one time connection handler.
- I'm not sure callback has been documented in any form. callback gets called each and everytime conn:Fire() gets called! As well as being triggered for each connfunc that is part of the connection. - I'm not sure callback has been documented in any form. callback gets called each and everytime conn:Fire() gets called! As well as being triggered for each connfunc that is part of the connection.
- modified the lanes manager to create globals GLOBAL and THREAD when a thread is started. This way you are now able to more closely mirror code between lanes and love. As of right now parity between both enviroments is now really good. Upvalues being copied by default in lanes is something that I will not try and mirror in love. It's better to pass what you need as arguments, this way you can keep things consistant. looping thorugh upvalues and sterlizing them and sending them are very complex and slow opperations. - modified the lanes manager to create globals GLOBAL and THREAD when a thread is started. This way you are now able to more closely mirror code between lanes and love. As of right now parity between both enviroments is now really good. Upvalues being copied by default in lanes is something that I will not try and mirror in love. It's better to pass what you need as arguments, this way you can keep things consistant. looping thorugh upvalues and sterlizing them and sending them are very complex and slow opperations.
Added: Added:
--- ---
- jq = multi:newSystemThreadedJobQueue(n)
- jq:newFunction([name optional],func,holup) -- Provides a newFunction like syntax. If name is left blank a unique one is assigned. The second return after the function is the name of the function.
- console = THREAD:getConsole() -- Now working on lanes and love2d, allows you to print from multiple threads while keeping the console from writing over each other
- console.print(...)
- console.write(str)
```lua
package.path="?.lua;?/init.lua;?.lua;"..package.path
local multi,thread = require("multi"):init()
GLOBAL,THREAD = require("multi.integration.lanesManager"):init()
local test = multi:newSystemThreadedJobQueue(4) -- Load up a queue that has 4 running threads
func = test:newFunction("test",function(a) -- register a function on the queue that has an async function feature
test2() -- Call the other registered function on the queue
return a..a
end,true)
func2 = test:newFunction("test2",function(a)
print("ooo")
console = THREAD:getConsole()
console.print("Hello!",true)
end,true) -- When called internally on the job queue the function is a normal sync function and not an async function.
print(func("1"))
print(func("Hello"))
print(func("sigh"))
```
- THREAD:newFunction(func,holup) -- A system threaded based variant to thread:newFunction(func,holup) works the same way. Though this should only be used for intensive functions! Calling a STfunction has a decent amount of overhead, use wisely. System threaded jobqueue may be a better choice depending on what you are planning on doing. - THREAD:newFunction(func,holup) -- A system threaded based variant to thread:newFunction(func,holup) works the same way. Though this should only be used for intensive functions! Calling a STfunction has a decent amount of overhead, use wisely. System threaded jobqueue may be a better choice depending on what you are planning on doing.
- multi:loveloop() -- Handles the run function for love2d as well as run the multi mainloop. - multi:loveloop() -- Handles the run function for love2d as well as run the multi mainloop.
- multi.OnLoad(func) -- A special connection that allows you to connect to the an event that triggers when the multi engine starts! This is slightly different from multi.PreLoad(func) Which connects before any variables have been set up in the multi table, before any settings are cemented into the core. In most cases they will operate exactly the same. This is a feature that was created with module creators in mind. This way they can have code be loaded and managed before the main loop starts. - multi.OnLoad(func) -- A special connection that allows you to connect to the an event that triggers when the multi engine starts! This is slightly different from multi.PreLoad(func) Which connects before any variables have been set up in the multi table, before any settings are cemented into the core. In most cases they will operate exactly the same. This is a feature that was created with module creators in mind. This way they can have code be loaded and managed before the main loop starts.
@ -158,8 +195,14 @@ Removed:
Fixed: Fixed:
--- ---
- love2d had an issue where threads crashing would break the mainloop - Issue where async functions connect wasn't properly triggering when a function returned
- fixed bugs within the extensions.lua file for love threading - Issue where async functions were not passing arguments properly.
- Issue where async functions were not handling errors properly
- nil, err = func().wait() -- When waiting
- func().connect(function(err) end) -- When connection
- Love2d had an issue where threads crashing would break the mainloop
- Issue where systemthreadedjobqueues pushJob() was not returning the jobID of the job that was pushed!
- Fixed bugs within the extensions.lua file for love threading
- Modified the thread.* methods to perform better (Tables were being created each time one of these methods were called! Which in turn slowed things down. One table is modified to get things working properly) - Modified the thread.* methods to perform better (Tables were being created each time one of these methods were called! Which in turn slowed things down. One table is modified to get things working properly)
- thread.sleep() - thread.sleep()
- thread.hold() - thread.hold()

View File

@ -1475,7 +1475,6 @@ function multi.holdFor(n,func)
end) end)
end end
function thread:newFunction(func,holdme) function thread:newFunction(func,holdme)
local done = false
return function(...) return function(...)
local rets, err local rets, err
local function wait(no) local function wait(no)
@ -1488,9 +1487,12 @@ function thread:newFunction(func,holdme)
end end
end) end)
else else
while not rets do while not rets and not err do
multi.scheduler:Act() multi.scheduler:Act()
end end
if err then
return nil,err
end
return unpack(rets) return unpack(rets)
end end
end end
@ -1504,11 +1506,11 @@ function thread:newFunction(func,holdme)
isTFunc = true, isTFunc = true,
wait = wait, wait = wait,
connect = function(f) connect = function(f)
t.OnDeath(function(self,status,...) if done == false then f(...) done = true end end) t.OnDeath(function(self,status,...) f(...) end)
t.OnError(function(self,err) if done == false then f(self,err) done = true end end) t.OnError(function(self,err) f(err) end)
end end
} }
return temp,temp,temp,temp,temp,temp,temp return temp
end end
end end
function thread.run(func) function thread.run(func)
@ -1557,41 +1559,41 @@ function multi:newThread(name,func,...)
if type(name) == "function" then if type(name) == "function" then
name = "Thread#"..threadCount name = "Thread#"..threadCount
end end
local env = {} -- local env = {}
setmetatable(env,{ -- setmetatable(env,{
__index = Gref, -- __index = Gref,
__newindex = function(t,k,v) -- __newindex = function(t,k,v)
if type(v)=="function" then -- if type(v)=="function" then
rawset(t,k,thread:newFunction(v)) -- rawset(t,k,thread:newFunction(v))
else -- else
if type(v)=="table" then -- if type(v)=="table" then
if v.isTFunc then -- if v.isTFunc then
if not _G["_stack_"] or #_G["_stack_"]==0 then -- if not _G["_stack_"] or #_G["_stack_"]==0 then
_G["_stack_"] = {} -- _G["_stack_"] = {}
local s = _G["_stack_"] -- local s = _G["_stack_"]
local a,b,c,d,e,f,g = v.wait(true) -- local a,b,c,d,e,f,g = v.wait(true)
table.insert(s,a) -- table.insert(s,a)
table.insert(s,b) -- table.insert(s,b)
table.insert(s,c) -- table.insert(s,c)
table.insert(s,d) -- table.insert(s,d)
table.insert(s,e) -- table.insert(s,e)
table.insert(s,f) -- table.insert(s,f)
local x = table.remove(_G["_stack_"]) -- local x = table.remove(_G["_stack_"])
rawset(t,k,x) -- rawset(t,k,x)
else -- else
local x = table.remove(_G["_stack_"]) -- local x = table.remove(_G["_stack_"])
rawset(t,k,x) -- rawset(t,k,x)
end -- end
else -- else
Gref[k]=v -- Gref[k]=v
end -- end
else -- else
Gref[k]=v -- Gref[k]=v
end -- end
end -- end
end -- end
}) -- })
setfenv(func,env) -- setfenv(func,env)
local c={} local c={}
c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil} c.TempRets = {nil,nil,nil,nil,nil,nil,nil,nil,nil,nil}
c.startArgs = {...} c.startArgs = {...}
@ -1676,8 +1678,13 @@ function multi.initThreads(justThreads)
local r1,r2,r3,r4,r5,r6 local r1,r2,r3,r4,r5,r6
local ret,_ local ret,_
local function CheckRets(i) local function CheckRets(i)
if ret~=nil then if ret~=nil and not(threads[i].isError) then
if not threads[i] then return end if not threads[i] then return end
if not _ then
threads[i].isError = true
threads[i].TempRets[1] = ret
return
end
threads[i].TempRets[1] = ret threads[i].TempRets[1] = ret
threads[i].TempRets[2] = r1 threads[i].TempRets[2] = r1
threads[i].TempRets[3] = r2 threads[i].TempRets[3] = r2
@ -1740,24 +1747,23 @@ function multi.initThreads(justThreads)
end end
multi.scheduler:OnLoop(function(self) multi.scheduler:OnLoop(function(self)
for i=#threads,1,-1 do for i=#threads,1,-1 do
if not threads[i].__started then if threads[i].isError then
threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets))
table.remove(threads,i)
end
if threads[i] and not threads[i].__started then
if coroutine.running() ~= threads[i].thread then if coroutine.running() ~= threads[i].thread then
_,ret,r1,r2,r3,r4,r5,r6=coroutine.resume(threads[i].thread,unpack(threads[i].startArgs)) _,ret,r1,r2,r3,r4,r5,r6=coroutine.resume(threads[i].thread,unpack(threads[i].startArgs))
CheckRets(i)
end end
threads[i].__started = true threads[i].__started = true
helper(i) helper(i)
end end
if not _ then if threads[i] and not _ then
threads[i].OnError:Fire(threads[i],ret) threads[i].OnError:Fire(threads[i],unpack(threads[i].TempRets))
threads[i].isError = true
end end
if threads[i] and coroutine.status(threads[i].thread)=="dead" then if threads[i] and coroutine.status(threads[i].thread)=="dead" then
local tr = threads[i].TempRets threads[i].OnDeath:Fire(threads[i],"ended",unpack(threads[i].TempRets or {}))
if ret == nil then
threads[i].OnDeath:Fire(threads[i],"ended",tr[1],tr[2],tr[3],tr[4],tr[5],tr[6],tr[7])
else
threads[i].OnDeath:Fire(threads[i],"ended",ret,r1,r2,r3,r4,r5,r6)
end
table.remove(threads,i) table.remove(threads,i)
elseif threads[i] and threads[i].task == "skip" then elseif threads[i] and threads[i].task == "skip" then
threads[i].pos = threads[i].pos + 1 threads[i].pos = threads[i].pos + 1
@ -1995,6 +2001,21 @@ function multi:newHyperThreadedProcess(name)
return c return c
end end
-- Multi runners -- Multi runners
function multi:lightloop()
if not isRunning then
local Loop=self.Mainloop
while true do
for _D=#Loop,1,-1 do
if Loop[_D].Active then
self.CID=_D
if not protect then
Loop[_D]:Act()
end
end
end
end
end
end
function multi:mainloop(settings) function multi:mainloop(settings)
multi.OnPreLoad:Fire() multi.OnPreLoad:Fire()
multi.defaultSettings = settings or multi.defaultSettings multi.defaultSettings = settings or multi.defaultSettings

View File

@ -82,6 +82,32 @@ function multi:newSystemThreadedJobQueue(n)
jid = jid + 1 jid = jid + 1
return jid-1 return jid-1
end end
local nFunc = 0
function c:newFunction(name,func,holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_Function_"..nFunc
end
nFunc = nFunc + 1
c:registerFunction(name,func)
return thread:newFunction(function(...)
local id = c:pushJob(name,...)
local link
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
link:Remove()
end
end)
return thread.hold(function()
if rets then
return unpack(rets)
end
end)
end,holup),name
end
multi:newThread("JobQueueManager",function() multi:newThread("JobQueueManager",function()
while true do while true do
local job = thread.hold(function() local job = thread.hold(function()

View File

@ -43,6 +43,13 @@ end
-- Step 2 set up the Linda objects -- Step 2 set up the Linda objects
local __GlobalLinda = lanes.linda() -- handles global stuff local __GlobalLinda = lanes.linda() -- handles global stuff
local __SleepingLinda = lanes.linda() -- handles sleeping stuff local __SleepingLinda = lanes.linda() -- handles sleeping stuff
local __ConsoleLinda = lanes.linda() -- handles console stuff
multi:newLoop(function()
local _,data = __ConsoleLinda:receive(0, "Q")
if data then
print(unpack(data))
end
end)
local GLOBAL,THREAD = require("multi.integration.lanesManager.threads").init(__GlobalLinda,__SleepingLinda) local GLOBAL,THREAD = require("multi.integration.lanesManager.threads").init(__GlobalLinda,__SleepingLinda)
local threads = {} local threads = {}
local count = 1 local count = 1
@ -79,7 +86,8 @@ function multi:newSystemThread(name, func, ...)
THREAD_NAME=name, THREAD_NAME=name,
THREAD_ID=count, THREAD_ID=count,
THREAD = THREAD, THREAD = THREAD,
GLOBAL = GLOBAL GLOBAL = GLOBAL,
_Console = __ConsoleLinda
},priority=c.priority}, func)(unpack(args)) },priority=c.priority}, func)(unpack(args))
count = count + 1 count = count + 1
function c:kill() function c:kill()

View File

@ -54,6 +54,18 @@ local function INIT(__GlobalLinda,__SleepingLinda)
function THREAD.getCores() function THREAD.getCores()
return THREAD.__CORES return THREAD.__CORES
end end
function THREAD.getConsole()
local c = {}
c.queue = _Console
function c.print(...)
c.queue:send("Q", {...})
end
function c.error(err)
c.queue:push{"ERROR in <"..__THREADNAME__..">: "..err,__THREADID__}
error(err)
end
return c
end
function THREAD.getThreads() function THREAD.getThreads()
return GLOBAL.__THREADS__ return GLOBAL.__THREADS__
end end

View File

@ -99,6 +99,32 @@ function multi:newSystemThreadedJobQueue(n)
self.id = self.id + 1 self.id = self.id + 1
self.queue:push{name,self.id,...} self.queue:push{name,self.id,...}
return self.id return self.id
end
local nFunc = 0
function c:newFunction(name,func,holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_Function_"..nFunc
end
nFunc = nFunc + 1
c:registerFunction(name,func)
return thread:newFunction(function(...)
local id = c:pushJob(name,...)
local link
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
link:Remove()
end
end)
return thread.hold(function()
if rets then
return unpack(rets)
end
end)
end,holup),name
end end
multi:newThread("jobManager",function() multi:newThread("jobManager",function()
while true do while true do

View File

@ -1,44 +1,24 @@
package.path="?.lua;?/init.lua;?.lua;"..package.path package.path="?.lua;?/init.lua;?.lua;"..package.path
local multi, thread = require("multi"):init() local multi, thread = require("multi"):init()
GLOBAL,THREAD = require("multi.integration.lanesManager"):init() func = thread:newFunction(function()
local test = multi:newSystemThreadedJobQueue(4) thread.sleep(math.random(1,3))
local nFunc = 0 local t = math.random(1,10)
function test:newFunction(name,func,holup) -- This registers with the queue return t
if type(name)=="function" then
holup = func
func = name
name = "JQFunction_"..nFunc
end
local ref = self
nFunc = nFunc + 1
ref:registerFunction(name,func)
return thread:newFunction(function(...)
local id = ref:pushJob(name,...)
local link
local rets
link = ref.OnJobCompleted(function(jid,...)
if id==jid then
rets = {...}
link:Remove()
end
end) end)
return thread.hold(function() func().connect(function(a)
if rets then print(a)
return unpack(rets)
end
end) end)
end,holup) func().connect(function(a)
end print(a)
func = test:newFunction("test",function(a) end)
test2() func().connect(function(a)
return a..a print(a)
end,true) end)
func2 = test:newFunction("test2",function(a) func().connect(function(a)
print("ooo") print(a)
end,true) end)
print(func("1")) func().connect(function(a)
print(func("Hello")) print(a)
print(func("sigh")) end)
print(#test.OnJobCompleted.connections) --os.exit()
os.exit()
multi:mainloop() multi:mainloop()