From 1b0c968bf580aedcb49d89a79bdbe605677b3776 Mon Sep 17 00:00:00 2001 From: mihacooper Date: Sun, 17 Apr 2022 20:46:25 +0400 Subject: [PATCH] Interruption points in sync operations (get, wait, pop, ...) (#118) Interruption points in sync operations (get, wait, pop, ...) --- .github/workflows/win_ci.yml | 2 +- README.md | 19 +++++++ src/cpp/channel.cpp | 30 +++++++++--- src/cpp/channel.h | 4 +- src/cpp/lua-helpers.cpp | 17 +++++++ src/cpp/lua-helpers.h | 11 +++++ src/cpp/lua-module.cpp | 36 +++++++------- src/cpp/notifier.h | 38 +++++++++++--- src/cpp/shared-table.cpp | 2 +- src/cpp/this_thread.h | 25 ++++++++++ src/cpp/threading.cpp | 90 ++++++++++++++++++++++++---------- src/cpp/threading.h | 19 ++++--- tests/lua/run_tests | 1 + tests/lua/thread-interrupt.lua | 74 ++++++++++++++++++++++++++++ tests/lua/thread.lua | 28 ----------- 15 files changed, 300 insertions(+), 96 deletions(-) create mode 100644 src/cpp/this_thread.h create mode 100644 tests/lua/thread-interrupt.lua diff --git a/.github/workflows/win_ci.yml b/.github/workflows/win_ci.yml index 23dd25c..800c9fb 100644 --- a/.github/workflows/win_ci.yml +++ b/.github/workflows/win_ci.yml @@ -13,7 +13,7 @@ jobs: matrix: build-type: [Release] # Debug lua: ["lua 5.1", "lua 5.2", "lua 5.3", "luajit 2.0"] - os: ["windows-2016"] + os: ["windows-2019"] platform: [ {"forLua": "vs_32", "forCMake": "Win32"}, {"forLua": "vs_64", "forCMake": "x64"}, diff --git a/README.md b/README.md index 1dc8967..fedfb09 100644 --- a/README.md +++ b/README.md @@ -244,6 +244,24 @@ List of available time intervals: - `m` - minutes; - `h` - hours. +All blocking operations (even in non blocking mode) are interruption points. Thread hanged in such operation can be interrupted by invoking [thread:cancel()](#threadcanceltime-metric) method. +
+ Example +

+ +```lua +local effil = require "effil" + +local worker = effil.thread(function() + effil.sleep(999) -- worker will hang for 999 seconds +end)() + +worker:cancel(1) -- returns true, cause blocking operation was interrupted and thread was canceled +``` +

+
+ + ## Function's upvalues Working with functions Effil serializes and deserializes them using [`lua_dump`](#https://www.lua.org/manual/5.3/manual.html#lua_dump) and [`lua_load`](#https://www.lua.org/manual/5.3/manual.html#lua_load) methods. All function's upvalues are stored following the same [rules](#important-notes) as usual. If function has **upvalue of unsupported type** this function cannot be transmitted to Effil. You will get error in that case. @@ -528,3 +546,4 @@ effil.type(effil.channel()) == "effil.channel" effil.type({}) == "table" effil.type(1) == "number" ``` + diff --git a/src/cpp/channel.cpp b/src/cpp/channel.cpp index 9e35960..ce23742 100644 --- a/src/cpp/channel.cpp +++ b/src/cpp/channel.cpp @@ -52,14 +52,25 @@ bool Channel::push(const sol::variadic_args& args) { StoredArray Channel::pop(const sol::optional& duration, const sol::optional& period) { + this_thread::interruptionPoint(); std::unique_lock lock(ctx_->lock_); - while (ctx_->channel_.empty()) { - if (duration) { - if (ctx_->cv_.wait_for(lock, fromLuaTime(duration.value(), period)) == std::cv_status::timeout) - return StoredArray(); - } - else { // No time limit - ctx_->cv_.wait(lock); + { + this_thread::ScopedSetInterruptable interruptable(this); + + Timer timer(duration ? fromLuaTime(duration.value(), period) : + std::chrono::milliseconds()); + while (ctx_->channel_.empty()) { + if (duration) { + if (timer.isFinished() || + ctx_->cv_.wait_for(lock, timer.left()) == + std::cv_status::timeout) { + return StoredArray(); + } + } + else { // No time limit + ctx_->cv_.wait(lock); + } + this_thread::interruptionPoint(); } } @@ -78,4 +89,9 @@ size_t Channel::size() { return ctx_->channel_.size(); } +void Channel::interrupt() +{ + ctx_->cv_.notify_all(); +} + } // namespace effil diff --git a/src/cpp/channel.h b/src/cpp/channel.h index 27c2901..18c6893 100644 --- a/src/cpp/channel.h +++ b/src/cpp/channel.h @@ -17,7 +17,7 @@ public: std::queue channel_; }; -class Channel : public GCObject { +class Channel : public GCObject, public IInterruptable { public: static void exportAPI(sol::state_view& lua); @@ -27,6 +27,8 @@ public: size_t size(); + void interrupt() final; + private: Channel() = default; void initialize(const sol::stack_object& capacity); diff --git a/src/cpp/lua-helpers.cpp b/src/cpp/lua-helpers.cpp index f1fc19a..65687f4 100644 --- a/src/cpp/lua-helpers.cpp +++ b/src/cpp/lua-helpers.cpp @@ -61,4 +61,21 @@ std::chrono::milliseconds fromLuaTime(int duration, const sol::optional diff ? duration_cast((timeout_ - diff)): + milliseconds(0); +} + + } // namespace effil diff --git a/src/cpp/lua-helpers.h b/src/cpp/lua-helpers.h index dd4a2fa..5b213a2 100644 --- a/src/cpp/lua-helpers.h +++ b/src/cpp/lua-helpers.h @@ -34,6 +34,17 @@ std::string luaTypename(const SolObject& obj) { typedef std::vector StoredArray; +class Timer { +public: + Timer(const std::chrono::milliseconds& timeout); + bool isFinished(); + std::chrono::milliseconds left(); + +private: + std::chrono::milliseconds timeout_; + std::chrono::high_resolution_clock::time_point startTime_; +}; + } // namespace effil namespace sol { diff --git a/src/cpp/lua-module.cpp b/src/cpp/lua-module.cpp index eabdddc..a8fcf9d 100644 --- a/src/cpp/lua-module.cpp +++ b/src/cpp/lua-module.cpp @@ -96,24 +96,24 @@ int luaopen_effil(lua_State* L) { }; sol::usertype type("new", sol::no_constructor, - "thread", createThreadRunner, - "thread_id", threadId, - "sleep", sleep, - "yield", yield, - "table", createTable, - "rawset", SharedTable::luaRawSet, - "rawget", SharedTable::luaRawGet, - "setmetatable", SharedTable::luaSetMetatable, - "getmetatable", SharedTable::luaGetMetatable, - "channel", createChannel, - "type", getLuaTypename, - "pairs", SharedTable::globalLuaPairs, - "ipairs", SharedTable::globalLuaIPairs, - "next", SharedTable::globalLuaNext, - "size", luaSize, - "dump", luaDump, - "hardware_threads", std::thread::hardware_concurrency, - sol::meta_function::index, luaIndex + "thread", createThreadRunner, + "thread_id", this_thread::threadId, + "sleep", this_thread::sleep, + "yield", this_thread::yield, + "table", createTable, + "rawset", SharedTable::luaRawSet, + "rawget", SharedTable::luaRawGet, + "setmetatable", SharedTable::luaSetMetatable, + "getmetatable", SharedTable::luaGetMetatable, + "channel", createChannel, + "type", getLuaTypename, + "pairs", SharedTable::globalLuaPairs, + "ipairs", SharedTable::globalLuaIPairs, + "next", SharedTable::globalLuaNext, + "size", luaSize, + "dump", luaDump, + "hardware_threads", std::thread::hardware_concurrency, + sol::meta_function::index, luaIndex ); sol::stack::push(lua, type); diff --git a/src/cpp/notifier.h b/src/cpp/notifier.h index a02db77..2fa768b 100644 --- a/src/cpp/notifier.h +++ b/src/cpp/notifier.h @@ -1,15 +1,22 @@ #pragma once +#include +#include + #include #include namespace effil { -class Notifier { +struct IInterruptable { + virtual void interrupt() = 0; +}; + +class Notifier : public IInterruptable { public: - Notifier() - : notified_(false) { - } + typedef std::function InterruptChecker; + + Notifier() : notified_(false) {} void notify() { std::unique_lock lock(mutex_); @@ -17,19 +24,38 @@ public: cv_.notify_all(); } + void interrupt() final { + cv_.notify_all(); + } + void wait() { + this_thread::interruptionPoint(); + + this_thread::ScopedSetInterruptable interruptable(this); + std::unique_lock lock(mutex_); - while (!notified_) + while (!notified_) { cv_.wait(lock); + this_thread::interruptionPoint(); + } } template bool waitFor(T period) { + this_thread::interruptionPoint(); + if (period == std::chrono::seconds(0) || notified_) return notified_; + this_thread::ScopedSetInterruptable interruptable(this); + + Timer timer(period); std::unique_lock lock(mutex_); - while (cv_.wait_for(lock, period) != std::cv_status::timeout && !notified_); + while (!timer.isFinished() && + cv_.wait_for(lock, timer.left()) != std::cv_status::timeout && + !notified_) { + this_thread::interruptionPoint(); + } return notified_; } diff --git a/src/cpp/shared-table.cpp b/src/cpp/shared-table.cpp index dd33d63..25ad39b 100644 --- a/src/cpp/shared-table.cpp +++ b/src/cpp/shared-table.cpp @@ -64,7 +64,7 @@ void SharedTable::set(StoredObject&& key, StoredObject&& value) { sol::object SharedTable::get(const StoredObject& key, sol::this_state state) const { SharedLock g(ctx_->lock); - auto val = ctx_->entries.find(key); + const auto val = ctx_->entries.find(key); if (val == ctx_->entries.end()) { return sol::nil; } else { diff --git a/src/cpp/this_thread.h b/src/cpp/this_thread.h new file mode 100644 index 0000000..0a80101 --- /dev/null +++ b/src/cpp/this_thread.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +namespace effil { + +struct IInterruptable; + +namespace this_thread { + +class ScopedSetInterruptable +{ +public: + ScopedSetInterruptable(IInterruptable* notifier); + ~ScopedSetInterruptable(); +}; +void interruptionPoint(); + +// Lua API +std::string threadId(); +void yield(); +void sleep(const sol::stack_object& duration, const sol::stack_object& metric); + +} // namespace this_thread +} // namespace effil diff --git a/src/cpp/threading.cpp b/src/cpp/threading.cpp index 253d97b..fb01764 100644 --- a/src/cpp/threading.cpp +++ b/src/cpp/threading.cpp @@ -81,10 +81,70 @@ void luaHook(lua_State*, lua_Debug*) { } // namespace +namespace this_thread { + +ScopedSetInterruptable::ScopedSetInterruptable(IInterruptable* notifier) { + if (thisThreadHandle) { + thisThreadHandle->setNotifier(notifier); + } +} + +ScopedSetInterruptable::~ScopedSetInterruptable() { + if (thisThreadHandle) { + thisThreadHandle->setNotifier(nullptr); + } +} + +void interruptionPoint() { + if (thisThreadHandle && thisThreadHandle->command() == Command::Cancel) + { + thisThreadHandle->changeStatus(Status::Canceled); + throw LuaHookStopException(); + } +} + +std::string threadId() { + std::stringstream ss; + ss << std::this_thread::get_id(); + return ss.str(); +} + +void yield() { + luaHook(nullptr, nullptr); + std::this_thread::yield(); +} + +void sleep(const sol::stack_object& duration, const sol::stack_object& metric) { + if (duration.valid()) { + REQUIRE(duration.get_type() == sol::type::number) + << "bad argument #1 to 'effil.sleep' (number expected, got " + << luaTypename(duration) << ")"; + + if (metric.valid()) + { + REQUIRE(metric.get_type() == sol::type::string) + << "bad argument #2 to 'effil.sleep' (string expected, got " + << luaTypename(metric) << ")"; + } + try { + Notifier notifier; + notifier.waitFor(fromLuaTime(duration.as(), + metric.as>())); + } RETHROW_WITH_PREFIX("effil.sleep"); + } + else { + yield(); + } +} + +} // namespace this_thread + ThreadHandle::ThreadHandle() : status_(Status::Running) , command_(Command::Run) - , lua_(std::make_unique()) { + , currNotifier_(nullptr) + , lua_(std::make_unique()) +{ luaL_openlibs(*lua_); } @@ -165,32 +225,6 @@ void Thread::runThread(Thread thread, } } -std::string threadId() { - std::stringstream ss; - ss << std::this_thread::get_id(); - return ss.str(); -} - -void yield() { - if (thisThreadHandle) - luaHook(nullptr, nullptr); - std::this_thread::yield(); -} - -void sleep(const sol::stack_object& duration, const sol::stack_object& metric) { - if (duration.valid()) { - REQUIRE(duration.get_type() == sol::type::number) << "bad argument #1 to 'effil.sleep' (number expected, got " << luaTypename(duration) << ")"; - if (metric.valid()) - REQUIRE(metric.get_type() == sol::type::string) << "bad argument #2 to 'effil.sleep' (string expected, got " << luaTypename(metric) << ")"; - try { - std::this_thread::sleep_for(fromLuaTime(duration.as(), metric.valid() ? metric.as() : sol::optional())); - } RETHROW_WITH_PREFIX("effil.sleep"); - } - else { - yield(); - } -} - void Thread::initialize( const std::string& path, const std::string& cpath, @@ -198,6 +232,7 @@ void Thread::initialize( const sol::function& function, const sol::variadic_args& variadicArgs) { + sol::optional functionObj; try { functionObj = GC::instance().create(function); @@ -282,6 +317,7 @@ bool Thread::cancel(const sol::this_state&, const sol::optional& duration, const sol::optional& period) { ctx_->putCommand(Command::Cancel); + ctx_->interrupt(); Status status = ctx_->waitForStatusChange(toOptionalTime(duration, period)); return isFinishStatus(status); } diff --git a/src/cpp/threading.h b/src/cpp/threading.h index b7c0b92..fa2431d 100644 --- a/src/cpp/threading.h +++ b/src/cpp/threading.h @@ -8,11 +8,6 @@ namespace effil { -// Lua this thread API -std::string threadId(); -void yield(); -void sleep(const sol::stack_object& duration, const sol::stack_object& metric); - class ThreadHandle : public GCData { public: enum class Status { @@ -71,10 +66,20 @@ public: void destroyLua() { lua_.reset(); } - Status status() { return status_; } + Status status() const { return status_; } StoredArray& result() { return result_; } + void setNotifier(IInterruptable* notifier) { + currNotifier_ = notifier; + } + + void interrupt() const { + IInterruptable* currNotifier = currNotifier_; + if (currNotifier) + currNotifier->interrupt(); + } + private: Status status_; Command command_; @@ -83,7 +88,7 @@ private: Notifier completionNotifier_; std::mutex stateLock_; StoredArray result_; - + IInterruptable* currNotifier_; std::unique_ptr lua_; }; diff --git a/tests/lua/run_tests b/tests/lua/run_tests index db0ad42..f58c9e9 100755 --- a/tests/lua/run_tests +++ b/tests/lua/run_tests @@ -16,6 +16,7 @@ require "type" require "gc" require "channel" require "thread" +require "thread-interrupt" require "shared-table" require "metatable" require "type_mismatch" diff --git a/tests/lua/thread-interrupt.lua b/tests/lua/thread-interrupt.lua new file mode 100644 index 0000000..e310682 --- /dev/null +++ b/tests/lua/thread-interrupt.lua @@ -0,0 +1,74 @@ +require "bootstrap-tests" + +local effil = effil + +test.thread_interrupt.tear_down = default_tear_down + +local function interruption_test(worker) + local state = effil.table { stop = false } + + local ctx = effil.thread(worker) + ctx.step = 0 + local thr = ctx(state) + + effil.sleep(500, 'ms') -- let thread starts + + local start_time = os.time() + thr:cancel(1) + + test.equal(thr:status(), "canceled") + test.almost_equal(os.time(), start_time, 1) + state.stop = true +end + +local get_thread_for_test = function(state) + local runner = effil.thread(function() + while not state.stop do end + end) + runner.step = 0 + return runner() +end + +test.thread_interrupt.thread_wait = function() + interruption_test(function(state) + get_thread_for_test(state):wait() + end) +end + +test.thread_interrupt.thread_get = function() + interruption_test(function(state) + get_thread_for_test(state):get() + end) +end + +test.thread_interrupt.thread_cancel = function() + interruption_test(function(state) + get_thread_for_test(state):cancel() + end) +end + +test.thread_interrupt.thread_pause = function() + interruption_test(function(state) + get_thread_for_test(state):pause() + end) +end + +test.thread_interrupt.channel_pop = function() + interruption_test(function() + effil.channel():pop() + end) +end + +test.thread_interrupt.sleep = function() + interruption_test(function() + effil.sleep(20) + end) +end + +test.thread_interrupt.yield = function() + interruption_test(function() + while true do + effil.yield() + end + end) +end diff --git a/tests/lua/thread.lua b/tests/lua/thread.lua index 53aa7be..d8b378a 100644 --- a/tests/lua/thread.lua +++ b/tests/lua/thread.lua @@ -244,14 +244,6 @@ test.thread.returns = function () test.equal(returns[5](11, 89), 100) end -test.thread.timed_cancel = function () - local thread = effil.thread(function() - effil.sleep(4) - end)() - test.is_false(thread:cancel(100, "ms")) - thread:wait() -end - test.thread_with_table.tear_down = default_tear_down test.thread_with_table.types = function () @@ -322,26 +314,6 @@ test.this_thread.functions = function () test.not_equal(share["child.id"], effil.thread_id()) end -test.this_thread.cancel_with_yield = function () - local share = effil.table() - local spec = effil.thread(function (share) - effil.sleep(1) - for i=1,10000 do - -- Just waiting - end - share.done = true - effil.yield() - share.afet_yield = true - end) - spec.step = 0 - local thr = spec(share) - - test.is_true(thr:cancel()) - test.equal(thr:status(), "canceled") - test.is_true(share.done) - test.is_nil(share.afet_yield) -end - test.this_thread.pause_with_yield = function () local share = effil.table({stop = false}) local spec = effil.thread(function (share)