Ryan Ward fff3601041
Working on 16.0.0 (#53)
* Fixed spelling, started ideaing for 16.0.0

* Updated files

* Updated readme

* Updated version

* Concat conns now properly transfer events

* Testing types

* Connections can be % with functions

* Updated connections

* Fixed issue with double thread activations (Looking for another solution)

* Working on issue with love threaded functions not waiting when in a thread

* Working on issue where threads created in threads don't work

* Fixed broken threads for love

* Fixed some issues with threads

* removed test

* Updated changes.md

* Plan on testing parity between the threading modules

* Writing tests for system threading

* Added test cases for threading, fixed issues. Todo test love2d

* Fixed love2d to succeed with tests

* All tests working

* Updated files for testing

* Modified tests to make it more seamless

* removed extra __cores in lanes/pseudo

* Working on new priority scheme

* Working on priority management

* Working on custom prioritySchemes

* Fixed issues with missing code

* Threaded processors

* THREAD.exposeENV(), thread:newProcessor()

* Typo in changes.md

* Fixed typo in pseudoManager

* fixing

* Trying to fix exposeENV with pseudoThreading

* Changes to threads

* updated changes.md

* Working on systemthreadedprocess, and experimental newProxy for threading

* newProxy and STP work

* newProxy implemented

* Proxies work with connections now :D

* Added tstep to STP, updated changes.md

* thread.hold(proxy.conn)

* Clean up connection events when holding, working on scheduling tasks/threads to system threaded processors

* Getting loads of processors implemented

* Finished getLoad(type)

* Fixed some bugs

* Added an easy way to share a table, found some limitations with lanes threading.

* THREAD_NAME set for main thread, connections break the rules for proxies

* Testing

* Really close to portable proxies, currently extreamly unstable!

* Debugging what is going on...

* Fixed critical issue with coroutine based threads

* Removed extra bloat, proxies are portable now!

* Started work on the debugManager

* Testing actions, fixing bugs with lanes

* Testing...

* fixing actions

* typo fixed

* Throw an error when things break

* fixing stuff

* Fixed issue with errors not going through

* Removed system threaded connections, soon to be replaced by proxies

* Testing love2d tests

* Test love2d

* Use later love-build

* Use ubuntu for build

* Fixed path

* Use appimage

* use sudo

* No window for love2d

* Fixed love2d tests

* Testing love2d

* Use workspace

* Moved other tests while testing

* actually pull the repo

* packagepath set

* Fixed pull

* Update multi

* Removed link

* Edited symlink

* Added timeout to build

* Rewriting loveManager, too much outdated code

* Still implementing new love2d threading code

* Rewriting love2d threading binding

* Working on adding a Hold method to all objects. Will document how they all work when done.

* jobqueues having isues with stp

* new pack/unpack for tables, current issue is things being turned into strings

* Fixed packing of values into threads, need to fix system proxies and system threaded processors

* testing...

* Not hard crashing when error is encountered

* Should now push non 0 exit codes

* Push error when an error happens

* Closer to getting things working...

* Working on new type system, planning out debugmanager

* Fixed error code issue

* Test for 5.1

* Planning out debugManager

* Some work on the debug manager, proxies working on lanes, todo get pseudo manager and love2d working

* Working on getting pseudoThreading tests to work

* Added function / connection

* Added boost method

* Document new features to conns, todo fix newTask

* Fixed newTask()

* Updated changes.md and fixed some bugs

* Added thread.defer(func)

* Fixed tests on lanes and pseudo threading, todo fix love2d threading

* Fixed paths

* Working on paths

* Testing paths

* Add test for rockspec

* Fixed issues

* Fixed typo

* Added test for defer

* Threading working in love2d

* Fixed, conf

* lanes uses a threaded function like waitfor function

* Cleaned up changes.md

* added priorityManager to rockspec
2024-02-25 00:00:51 -05:00

393 lines
10 KiB
Lua

--[[
MIT License
Copyright (c) 2022 Ryan Ward
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
local multi, thread = require("multi"):init()
if not (GLOBAL and THREAD) then
GLOBAL, THREAD = multi.integration.GLOBAL, multi.integration.THREAD
else
lanes = require("lanes")
end
function multi:newSystemThreadedQueue(name)
local name = name or multi.randomString(16)
local c = {}
c.Name = name
c.linda = lanes.linda()
c.Type = multi.registerType("s_queue")
function c:push(v)
self.linda:send("Q", v)
end
function c:pop()
return ({self.linda:receive(0, "Q")})[2]
end
function c:peek()
return self.linda:get("Q")
end
function c:init()
return self
end
if multi.isMainThread then
multi.integration.GLOBAL[name] = c
else
GLOBAL[name] = c
end
function c:Hold(opt)
local multi, thread = require("multi"):init()
if opt.peek then
return thread.hold(function()
return self:peek()
end)
else
return thread.hold(function()
return self:pop()
end)
end
end
self:create(c)
return c
end
function multi:newSystemThreadedTable(name)
local name = name or multi.randomString(16)
local c = {}
c.link = lanes.linda()
c.Name = name
c.Type = multi.registerType("s_table")
function c:init()
return self
end
setmetatable(c,{
__index = function(t,k)
return c.link:get(k)
end,
__newindex = function(t,k,v)
c.link:set(k, v)
end
})
if multi.isMainThread then
multi.integration.GLOBAL[name] = c
else
GLOBAL[name] = c
end
function c:Hold(opt)
local multi, thread = require("multi"):init()
if opt.key then
return thread.hold(function()
return self.tab[opt.key]
end)
else
multi.error("Must provide a key to check opt.key = 'key'")
end
end
self:create(c)
return c
end
function multi:newSystemThreadedJobQueue(n)
local c = {}
c.cores = n or THREAD.getCores()*2
c.Type = multi.registerType("s_jobqueue")
c.OnJobCompleted = multi:newConnection()
local funcs = multi:newSystemThreadedTable()
local queueJob = multi:newSystemThreadedQueue()
local queueReturn = multi:newSystemThreadedQueue()
local doAll = multi:newSystemThreadedQueue()
local ID=1
local jid = 1
function c:isEmpty()
return queueJob:peek()==nil
end
function c:doToAll(func,...)
for i=1,c.cores do
doAll:push{ID,func,...}
end
ID = ID + 1
return self
end
function c:registerFunction(name,func)
funcs[name]=func
return self
end
function c:pushJob(name,...)
queueJob:push{name,jid,multi.pack(...)}
jid = jid + 1
return jid-1
end
local nFunc = 0
function c:newFunction(name, func, holup) -- This registers with the queue
if type(name)=="function" then
holup = func
func = name
name = "JQ_Function_"..nFunc
end
nFunc = nFunc + 1
c:registerFunction(name,func)
return thread:newFunction(function(...)
local id = c:pushJob(name,...)
local link
local rets
link = c.OnJobCompleted(function(jid,...)
if id==jid then
rets = multi.pack(...)
end
end)
return thread.hold(function()
if rets then
if #rets == 0 then
return multi.NIL
else
return multi.unpack(rets)
end
end
end)
end, holup), name
end
thread:newThread("JobQueueManager",function()
while true do
local job = thread.hold(function()
return queueReturn:pop()
end)
if job then
local id = table.remove(job,1)
c.OnJobCompleted:Fire(id,multi.unpack(job))
end
end
end)
for i=1,c.cores do
multi:newSystemThread("STJQ_"..multi.randomString(8),function(queue)
local multi, thread = require("multi"):init()
local idle = os.clock()
local clock = os.clock
local ref = 0
_G["__QR"] = queueReturn
setmetatable(_G,{__index = funcs})
thread:newThread("JobHandler",function()
while true do
local dat = thread.hold(function()
return queueJob:pop()
end)
idle = clock()
thread:newThread("JobQueue-Spawn",function()
local name = table.remove(dat, 1)
local jid = table.remove(dat, 1)
local args = table.remove(dat, 1)
queueReturn:push{jid, funcs[name](args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]), queue}
end)
end
end)
thread:newThread("DoAllHandler",function()
while true do
local dat = thread.hold(function()
return doAll:peek()
end)
if dat then
if dat[1]>ref then
ref = table.remove(dat, 1)
func = table.remove(dat, 1)
idle = clock()
func(unpack(dat))
doAll:pop()
end
end
end
end)
thread:newThread("IdleHandler",function()
while true do
thread.hold(function()
return clock()-idle>3
end)
THREAD.sleep(.01)
end
end)
multi:mainloop()
end,i)
end
function c:Hold(opt)
return thread.hold(self.OnJobCompleted)
end
self:create(c)
return c
end
function multi:newSystemThreadedConnection(name)
local name = name or multi.randomString(16)
local c = {}
c.Type = multi.registerType("s_connection")
c.CONN = 0x00
c.TRIG = 0x01
c.PING = 0x02
c.PONG = 0x03
local function remove(a, b)
local ai = {}
local r = {}
for k,v in pairs(a) do ai[v]=true end
for k,v in pairs(b) do
if ai[v]==nil then table.insert(r,a[k]) end
end
return r
end
c.CID = THREAD_ID
c.subscribe = multi:newSystemThreadedQueue("SUB_STC_"..self.Name):init()
c.Name = name
c.links = {} -- All triggers sent from main connection. When a connection is triggered on another thread, they speak to the main then send stuff out.
-- Locals will only live in the thread that creates the original object
local ping
local pong = function(link, links)
local res = thread.hold(function()
return link:peek()[1] == c.PONG
end,{sleep=3})
if not res then
for i=1,#links do
if links[i] == link then
table.remove(links,i,link)
break
end
end
else
link:pop()
end
end
ping = thread:newFunction(function(self)
ping:Pause()
multi.ForEach(self.links, function(link) -- Sync new connections
link:push{self.PING}
multi:newThread("pong Thread", pong, link, self.links)
end)
thread.sleep(3)
ping:Resume()
end,false)
local function fire(...)
for _, link in pairs(c.links) do
link:push {c.TRIG, multi.pack(...)}
end
end
thread:newThread("STC_SUB_MAN"..name,function()
local item
local sub_func = function() -- This will keep things held up until there is something to process
return c.subscribe:pop()
end
while true do
thread.yield()
-- We need to check on broken connections
ping(c) -- Should return instantlly and process this in another thread
item = thread.hold(sub_func)
if item[1] == c.CONN then
multi.ForEach(c.links, function(link) -- Sync new connections
item[2]:push{c.CONN, link}
end)
c.links[#c.links+1] = item[2]
elseif item[1] == c.TRIG then
fire(multi.unpack(item[2]))
c.proxy_conn:Fire(multi.unpack(item[2]))
end
end
end)
--- ^^^ This will only exist in the init thread
function c:Fire(...)
local args = multi.pack(...)
if self.CID == THREAD_ID then -- Host Call
for _, link in pairs(self.links) do
link:push {self.TRIG, args}
end
self.proxy_conn:Fire(...)
else
self.subscribe:push {self.TRIG, args}
end
end
function c:init()
local multi, thread = require("multi"):init()
self.links = {}
self.proxy_conn = multi:newConnection()
local mt = getmetatable(self.proxy_conn)
local tempMT = {}
for i,v in pairs(mt) do
tempMT[i] = v
end
tempMT.__index = self.proxy_conn
tempMT.__call = function(t,func) self.proxy_conn(func) end
setmetatable(self, tempMT)
if self.CID == THREAD_ID then return self end
thread:newThread("STC_CONN_MAN"..name,function()
local item
local link_self_ref = multi:newSystemThreadedQueue()
self.subscribe:push{self.CONN, link_self_ref}
while true do
item = thread.hold(function()
return link_self_ref:peek()
end)
if item[1] == self.PING then
link_self_ref:push{self.PONG}
link_self_ref:pop()
elseif item[1] == self.CONN then
if item[2].Name ~= link_self_ref.Name then
table.insert(self.links, item[2])
end
link_self_ref:pop()
elseif item[1] == self.TRIG then
self.proxy_conn:Fire(multi.unpack(item[2]))
link_self_ref:pop()
else
-- This shouldn't be the case
end
end
end)
return self
end
if multi.isMainThread then
multi.integration.GLOBAL[name] = c
else
GLOBAL[name] = c
end
self:create(c)
return c
end
require("multi.integration.sharedExtensions")