From ef93c6a2a8b25efff85b6d4eef48811f569375ee Mon Sep 17 00:00:00 2001 From: mihacooper Date: Mon, 5 Dec 2022 22:09:39 +0100 Subject: [PATCH] Rework thread cancellation, using regular exception (#177) BREAKS BACK COMPATIBILITY: - cancellation error can be caught by `pcall` - `canceled` thread status was renamed to `cancelled` --- README.md | 83 ++++++++- src/cpp/channel.cpp | 4 +- src/cpp/lua-module.cpp | 7 +- src/cpp/notifier.h | 10 +- src/cpp/stored-object.cpp | 4 +- src/cpp/this-thread.cpp | 83 +++++++++ src/cpp/{this_thread.h => this-thread.h} | 4 +- src/cpp/thread-handle.cpp | 82 +++++++++ src/cpp/{threading.h => thread-handle.h} | 63 +++---- .../{thread_runner.cpp => thread-runner.cpp} | 2 +- src/cpp/{thread_runner.h => thread-runner.h} | 2 +- src/cpp/{threading.cpp => thread.cpp} | 166 +++--------------- src/cpp/thread.h | 43 +++++ src/cpp/utils.h | 4 +- tests/lua/thread-interrupt.lua | 2 +- tests/lua/thread-stress.lua | 2 +- tests/lua/thread.lua | 143 ++++++++++++++- 17 files changed, 498 insertions(+), 206 deletions(-) create mode 100644 src/cpp/this-thread.cpp rename src/cpp/{this_thread.h => this-thread.h} (88%) create mode 100644 src/cpp/thread-handle.cpp rename src/cpp/{threading.h => thread-handle.h} (63%) rename src/cpp/{thread_runner.cpp => thread-runner.cpp} (96%) rename src/cpp/{thread_runner.h => thread-runner.h} (95%) rename src/cpp/{threading.cpp => thread.cpp} (58%) create mode 100644 src/cpp/thread.h diff --git a/README.md b/README.md index ae8b1c2..8dd6e8a 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Requires C++14 compiler compliance. Tested with GCC 4.9+, clang 3.8 and Visual S * [Important notes](#important-notes) * [Blocking and nonblocking operations](#blocking-and-nonblocking-operations) * [Function's upvalues](#functions-upvalues) +* [Thread cancellation and pausing](#thread-cancellation-and-pausing) * [API Reference](#api-reference) * [Thread](#thread) * [effil.thread()](#runner--effilthreadfunc) @@ -45,6 +46,7 @@ Requires C++14 compiler compliance. Tested with GCC 4.9+, clang 3.8 and Visual S * [effil.yield()](#effilyield) * [effil.sleep()](#effilsleeptime-metric) * [effil.hardware_threads()](#effilhardware_threads) + * [effil.pcall()](#status---effilpcallfunc) * [Table](#table) * [effil.table()](#table--effiltabletbl) * [__newindex: table[key] = value](#tablekey--value) @@ -71,6 +73,7 @@ Requires C++14 compiler compliance. Tested with GCC 4.9+, clang 3.8 and Visual S * [effil.size()](#size--effilsizeobj) * [effil.type()](#effiltype) + # How to install ### Build from src on Linux and Mac 1. `git clone --recursive https://github.com/effil/effil effil` @@ -256,7 +259,7 @@ local worker = effil.thread(function() effil.sleep(999) -- worker will hang for 999 seconds end)() -worker:cancel(1) -- returns true, cause blocking operation was interrupted and thread was canceled +worker:cancel(1) -- returns true, cause blocking operation was interrupted and thread was cancelled ```

@@ -269,10 +272,71 @@ Working with function Effil can store function environment (`_ENV`) as well. Con * *Lua = 5.1*: function environment is not stored at all (due to limitations of lua_setfenv we cannot use userdata) * *Lua > 5.1*: Effil serialize and store function environment only if it's not equal to global environment (`_ENV ~= _G`). +## Thread cancellation and pausing +The [`effil.thread`](#runner--effilthreadfunc) can be paused and cancelled using corresponding methods of thread object [`thread:cancel()`](#threadcanceltime-metric) and [`thread:pause()`](#threadpausetime-metric). +Thread that you try to interrupt can be interrupted in two execution points: explicit and implicit. + - Explicit points are [`effil.yield()`](#effilyield) +
+ Example of explicit interruption point +

+ + ```lua + local thread = effil.thread(function() + while true do + effil.yield() + end + -- will never reach this line + end)() + thread:cancel() + ``` + +

+
+ - Implicit points are lua debug hook invocation which is set using [lua_sethook](https://www.lua.org/manual/5.3/manual.html#lua_sethook) with LUA_MASKCOUNT. + Implicit points are optional and enabled only if [thread_runner.step](#runnerstep) > 0. +
+ Example of implicit interruption point +

+ + ```lua + local thread_runner = effil.thread(function() + while true do + end + -- will never reach this line + end) + thread_runner.step = 10 + thread = thread_runner() + thread:cancel() + ``` + +

+
+ - Additionally thread can be cancelled (but not paused) in any [blocking or non-blocking waiting operation](#blocking-and-nonblocking-operations). +
+ Example +

+ + ```lua + local channel = effil.channel() + local thread = effil.thread(function() + channel:pop() -- thread hangs waiting infinitely + -- will never reach this line + end)() + thread:cancel() + ``` + +

+
+ + **How does cancellation works?** + When you cancel thread it generates lua [`error`](https://lua.org.ru/manual_ru.html#pdf-error) with message `"Effil: thread is cancelled"` when it reaches any interruption point. It means that you can catch this error using [`pcall`](https://lua.org.ru/manual_ru.html#pdf-pcall) but thread will generate new error on next interruption point. + If you want to catch your own error but pass cancellation error you can use [effil.pcall()](#status---effilpcallfunc). + Status of cancelled thread will be equal to `cancelled` only if it finished with cancellation error. It means that if you catch cancellation error thread may finished with `completed` status or `failed` status if there will be some another error. + # API Reference ## Thread -`effil.thread` is the way to create a thread. Threads can be stopped, paused, resumed and canceled. +`effil.thread` is the way to create a thread. Threads can be stopped, paused, resumed and cancelled. All operation with threads can be synchronous (with optional timeout) or asynchronous. Each thread runs with its own lua state. @@ -309,7 +373,7 @@ Thread handle provides API for interaction with thread. Returns thread status. **output**: -- `status` - string values describes status of thread. Possible values are: `"running", "paused", "canceled", "completed" and "failed"`. +- `status` - string values describes status of thread. Possible values are: `"running", "paused", "cancelled", "completed" and "failed"`. - `err` - error message, if any. This value is specified only if thread status == `"failed"`. - `stacktrace` - stacktrace of failed thread. This value is specified only if thread status == `"failed"`. @@ -360,9 +424,20 @@ Suspend current thread. ### `effil.hardware_threads()` Returns the number of concurrent threads supported by implementation. -Basically forwards value from [std::thread::hardware_concurrency](https://en.cppreference.com/w/cpp/thread/thread/hardware_concurrency). +Basically forwards value from [std::thread::hardware_concurrency](https://en.cppreference.com/w/cpp/thread/thread/hardware_concurrency). **output**: number of concurrent hardware threads. +### `status, ... = effil.pcall(func, ...)` +Works exactly the same way as standard [pcall](https://www.lua.org/manual/5.3/manual.html#pdf-pcall) except that it will not catch thread cancellation error caused by [thread:cancel()](#threadcanceltime-metric) call. + +**input:** + - func - function to call + - ... - arguments to pass to functions + +**output:** + - status - `true` if no error occurred, `false` otherwise + - ... - in case of error return one additional result with message of error, otherwise return function call results + ## Table `effil.table` is a way to exchange data between effil threads. It behaves almost like standard lua tables. All operations with shared table are thread safe. **Shared table stores** primitive types (number, boolean, string), function, table, light userdata and effil based userdata. **Shared table doesn't store** lua threads (coroutines) or arbitrary userdata. See examples of shared table usage [here](#examples) diff --git a/src/cpp/channel.cpp b/src/cpp/channel.cpp index ce23742..781b6fb 100644 --- a/src/cpp/channel.cpp +++ b/src/cpp/channel.cpp @@ -52,7 +52,7 @@ bool Channel::push(const sol::variadic_args& args) { StoredArray Channel::pop(const sol::optional& duration, const sol::optional& period) { - this_thread::interruptionPoint(); + this_thread::cancellationPoint(); std::unique_lock lock(ctx_->lock_); { this_thread::ScopedSetInterruptable interruptable(this); @@ -70,7 +70,7 @@ StoredArray Channel::pop(const sol::optional& duration, else { // No time limit ctx_->cv_.wait(lock); } - this_thread::interruptionPoint(); + this_thread::cancellationPoint(); } } diff --git a/src/cpp/lua-module.cpp b/src/cpp/lua-module.cpp index a8fcf9d..2b3064f 100644 --- a/src/cpp/lua-module.cpp +++ b/src/cpp/lua-module.cpp @@ -1,8 +1,9 @@ -#include "threading.h" +#include "thread.h" +#include "this-thread.h" +#include "thread-runner.h" #include "shared-table.h" #include "garbage-collector.h" #include "channel.h" -#include "thread_runner.h" #include @@ -100,6 +101,7 @@ int luaopen_effil(lua_State* L) { "thread_id", this_thread::threadId, "sleep", this_thread::sleep, "yield", this_thread::yield, + "pcall", this_thread::pcall, "table", createTable, "rawset", SharedTable::luaRawSet, "rawget", SharedTable::luaRawGet, @@ -115,7 +117,6 @@ int luaopen_effil(lua_State* L) { "hardware_threads", std::thread::hardware_concurrency, sol::meta_function::index, luaIndex ); - sol::stack::push(lua, type); sol::stack::pop(lua); sol::stack::push(lua, EffilApiMarker()); diff --git a/src/cpp/notifier.h b/src/cpp/notifier.h index 2fa768b..046e5cb 100644 --- a/src/cpp/notifier.h +++ b/src/cpp/notifier.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -29,20 +29,20 @@ public: } void wait() { - this_thread::interruptionPoint(); + this_thread::cancellationPoint(); this_thread::ScopedSetInterruptable interruptable(this); std::unique_lock lock(mutex_); while (!notified_) { cv_.wait(lock); - this_thread::interruptionPoint(); + this_thread::cancellationPoint(); } } template bool waitFor(T period) { - this_thread::interruptionPoint(); + this_thread::cancellationPoint(); if (period == std::chrono::seconds(0) || notified_) return notified_; @@ -54,7 +54,7 @@ public: while (!timer.isFinished() && cv_.wait_for(lock, timer.left()) != std::cv_status::timeout && !notified_) { - this_thread::interruptionPoint(); + this_thread::cancellationPoint(); } return notified_; } diff --git a/src/cpp/stored-object.cpp b/src/cpp/stored-object.cpp index c8a4a0d..bd9c7a4 100644 --- a/src/cpp/stored-object.cpp +++ b/src/cpp/stored-object.cpp @@ -1,10 +1,10 @@ #include "stored-object.h" #include "channel.h" -#include "threading.h" +#include "thread.h" #include "shared-table.h" #include "function.h" #include "utils.h" -#include "thread_runner.h" +#include "thread-runner.h" #include #include diff --git a/src/cpp/this-thread.cpp b/src/cpp/this-thread.cpp new file mode 100644 index 0000000..17a16fe --- /dev/null +++ b/src/cpp/this-thread.cpp @@ -0,0 +1,83 @@ +#include "this-thread.h" + +#include "thread-handle.h" +#include "notifier.h" + +namespace effil { +namespace this_thread { + +ScopedSetInterruptable::ScopedSetInterruptable(IInterruptable* notifier) { + if (const auto thisThread = ThreadHandle::getThis()) { + thisThread->setNotifier(notifier); + } +} + +ScopedSetInterruptable::~ScopedSetInterruptable() { + if (const auto thisThread = ThreadHandle::getThis()) { + thisThread->setNotifier(nullptr); + } +} + +void cancellationPoint() { + const auto thisThread = ThreadHandle::getThis(); + if (thisThread && thisThread->command() == ThreadHandle::Command::Cancel) { + thisThread->changeStatus(ThreadHandle::Status::Cancelled); + throw ThreadCancelException(); + } +} + +std::string threadId() { + std::stringstream ss; + ss << std::this_thread::get_id(); + return ss.str(); +} + +void yield() { + if (const auto thisThread = ThreadHandle::getThis()) { + thisThread->performInterruptionPointThrow(); + } + std::this_thread::yield(); +} + +void sleep(const sol::stack_object& duration, const sol::stack_object& metric) { + if (duration.valid()) { + REQUIRE(duration.get_type() == sol::type::number) + << "bad argument #1 to 'effil.sleep' (number expected, got " + << luaTypename(duration) << ")"; + + if (metric.valid()) + { + REQUIRE(metric.get_type() == sol::type::string) + << "bad argument #2 to 'effil.sleep' (string expected, got " + << luaTypename(metric) << ")"; + } + try { + Notifier notifier; + notifier.waitFor(fromLuaTime(duration.as(), + metric.as>())); + } RETHROW_WITH_PREFIX("effil.sleep"); + } + else { + yield(); + } +} + +int pcall(lua_State* L) +{ + int status; + luaL_checkany(L, 1); + status = lua_pcall(L, lua_gettop(L) - 1, LUA_MULTRET, 0); + + const auto thisThread = ThreadHandle::getThis(); + if (thisThread && thisThread->command() == ThreadHandle::Command::Cancel) { + lua_pushstring(L, ThreadCancelException::message); + lua_error(L); + } + + lua_pushboolean(L, (status == 0)); + lua_insert(L, 1); + return lua_gettop(L); /* return status + all results */ +} + +} // namespace this_thread +} // namespace effil diff --git a/src/cpp/this_thread.h b/src/cpp/this-thread.h similarity index 88% rename from src/cpp/this_thread.h rename to src/cpp/this-thread.h index 0a80101..9718716 100644 --- a/src/cpp/this_thread.h +++ b/src/cpp/this-thread.h @@ -14,12 +14,12 @@ public: ScopedSetInterruptable(IInterruptable* notifier); ~ScopedSetInterruptable(); }; -void interruptionPoint(); -// Lua API +void cancellationPoint(); std::string threadId(); void yield(); void sleep(const sol::stack_object& duration, const sol::stack_object& metric); +int pcall(lua_State* L); } // namespace this_thread } // namespace effil diff --git a/src/cpp/thread-handle.cpp b/src/cpp/thread-handle.cpp new file mode 100644 index 0000000..3faccc9 --- /dev/null +++ b/src/cpp/thread-handle.cpp @@ -0,0 +1,82 @@ +#include "thread-handle.h" + +namespace effil { + +// Thread specific pointer to current thread +static thread_local ThreadHandle* thisThreadHandle = nullptr; +static const sol::optional NO_TIMEOUT; + +ThreadHandle::ThreadHandle() + : status_(Status::Running) + , command_(Command::Run) + , currNotifier_(nullptr) + , lua_(std::make_unique()) +{ + luaL_openlibs(*lua_); +} + +void ThreadHandle::putCommand(Command cmd) { + std::unique_lock lock(stateLock_); + if (isFinishStatus(status_) || command() == Command::Cancel) + return; + + command_ = cmd; + statusNotifier_.reset(); + commandNotifier_.notify(); +} + +void ThreadHandle::changeStatus(Status stat) { + std::unique_lock lock(stateLock_); + status_ = stat; + commandNotifier_.reset(); + statusNotifier_.notify(); + if (isFinishStatus(stat)) + completionNotifier_.notify(); +} + +void ThreadHandle::performInterruptionPointImpl(const std::function& cancelClbk) { + switch (command()) { + case Command::Run: + break; + case Command::Cancel: + cancelClbk(); + break; + case Command::Pause: { + changeStatus(Status::Paused); + Command cmd; + do { + cmd = waitForCommandChange(NO_TIMEOUT); + } while(cmd != Command::Run && cmd != Command::Cancel); + if (cmd == Command::Run) { + changeStatus(Status::Running); + } else { + cancelClbk(); + } + break; + } + } +} + +void ThreadHandle::performInterruptionPoint(lua_State* L) { + performInterruptionPointImpl([L](){ + lua_pushstring(L, ThreadCancelException::message); + lua_error(L); + }); +} + +void ThreadHandle::performInterruptionPointThrow() { + performInterruptionPointImpl([](){ + throw ThreadCancelException(); + }); +} + +ThreadHandle* ThreadHandle::getThis() { + return thisThreadHandle; +} + +void ThreadHandle::setThis(ThreadHandle* handle) { + assert(handle); + thisThreadHandle = handle; +} + +} // namespace effil diff --git a/src/cpp/threading.h b/src/cpp/thread-handle.h similarity index 63% rename from src/cpp/threading.h rename to src/cpp/thread-handle.h index fa2431d..9afd7b0 100644 --- a/src/cpp/threading.h +++ b/src/cpp/thread-handle.h @@ -1,19 +1,31 @@ #pragma once #include "lua-helpers.h" -#include "function.h" #include "notifier.h" +#include "gc-data.h" #include namespace effil { +class ThreadCancelException : public std::runtime_error +{ +public: + static constexpr auto message = "Effil: thread is cancelled"; + + ThreadCancelException() + : std::runtime_error(message) + {} +}; + +class Thread; + class ThreadHandle : public GCData { public: enum class Status { Running, Paused, - Canceled, + Cancelled, Completed, Failed }; @@ -29,6 +41,14 @@ public: Command command() const { return command_; } void putCommand(Command cmd); void changeStatus(Status stat); + void performInterruptionPoint(lua_State* L); + void performInterruptionPointThrow(); + + static ThreadHandle* getThis(); + + static bool isFinishStatus(Status stat) { + return stat == Status::Cancelled || stat == Status::Completed || stat == Status::Failed; + } template Status waitForStatusChange(const sol::optional& time) { @@ -90,38 +110,11 @@ private: StoredArray result_; IInterruptable* currNotifier_; std::unique_ptr lua_; + + void performInterruptionPointImpl(const std::function& cancelClbk); + + static void setThis(ThreadHandle* handle); + friend class Thread; }; -class Thread : public GCObject { -public: - static void exportAPI(sol::state_view& lua); - - StoredArray status(const sol::this_state& state); - StoredArray wait(const sol::this_state& state, - const sol::optional& duration, - const sol::optional& period); - StoredArray get(const sol::optional& duration, - const sol::optional& period); - bool cancel(const sol::this_state& state, - const sol::optional& duration, - const sol::optional& period); - bool pause(const sol::this_state&, - const sol::optional& duration, - const sol::optional& period); - void resume(); - -private: - Thread() = default; - void initialize( - const std::string& path, - const std::string& cpath, - int step, - const sol::function& function, - const sol::variadic_args& args); - friend class GC; - -private: - static void runThread(Thread, Function, effil::StoredArray); -}; - -} // effil +} // namespace effil diff --git a/src/cpp/thread_runner.cpp b/src/cpp/thread-runner.cpp similarity index 96% rename from src/cpp/thread_runner.cpp rename to src/cpp/thread-runner.cpp index bf97911..9128aa6 100644 --- a/src/cpp/thread_runner.cpp +++ b/src/cpp/thread-runner.cpp @@ -1,4 +1,4 @@ -#include "thread_runner.h" +#include "thread-runner.h" namespace effil { diff --git a/src/cpp/thread_runner.h b/src/cpp/thread-runner.h similarity index 95% rename from src/cpp/thread_runner.h rename to src/cpp/thread-runner.h index 3be75d0..efeac04 100644 --- a/src/cpp/thread_runner.h +++ b/src/cpp/thread-runner.h @@ -1,4 +1,4 @@ -#include "threading.h" +#include "thread.h" #include "gc-data.h" #include "gc-object.h" diff --git a/src/cpp/threading.cpp b/src/cpp/thread.cpp similarity index 58% rename from src/cpp/threading.cpp rename to src/cpp/thread.cpp index fb01764..3b1ca4c 100644 --- a/src/cpp/threading.cpp +++ b/src/cpp/thread.cpp @@ -1,5 +1,6 @@ -#include "threading.h" +#include "thread.h" +#include "thread-handle.h" #include "stored-object.h" #include "notifier.h" #include "spin-mutex.h" @@ -15,27 +16,14 @@ using Command = ThreadHandle::Command; namespace { -const sol::optional NO_TIMEOUT; - -// Thread specific pointer to current thread -static thread_local ThreadHandle* thisThreadHandle = nullptr; - -// Doesn't inherit std::exception -// to prevent from catching this exception third party lua C++ libs -class LuaHookStopException {}; - -bool isFinishStatus(Status stat) { - return stat == Status::Canceled || stat == Status::Completed || stat == Status::Failed; -} - std::string statusToString(Status status) { switch (status) { case Status::Running: return "running"; case Status::Paused: return "paused"; - case Status::Canceled: - return "canceled"; + case Status::Cancelled: + return "cancelled"; case Status::Completed: return "completed"; case Status::Failed: @@ -48,131 +36,26 @@ std::string statusToString(Status status) { int luaErrorHandler(lua_State* state) { luaL_traceback(state, state, nullptr, 1); const auto stacktrace = sol::stack::pop(state); - thisThreadHandle->result().emplace_back(createStoredObject(stacktrace)); + ThreadHandle::getThis()->result().emplace_back(createStoredObject(stacktrace)); return 1; } const lua_CFunction luaErrorHandlerPtr = luaErrorHandler; -void luaHook(lua_State*, lua_Debug*) { - assert(thisThreadHandle); - switch (thisThreadHandle->command()) { - case Command::Run: - break; - case Command::Cancel: - thisThreadHandle->changeStatus(Status::Canceled); - throw LuaHookStopException(); - case Command::Pause: { - thisThreadHandle->changeStatus(Status::Paused); - Command cmd; - do { - cmd = thisThreadHandle->waitForCommandChange(NO_TIMEOUT); - } while(cmd != Command::Run && cmd != Command::Cancel); - if (cmd == Command::Run) { - thisThreadHandle->changeStatus(Status::Running); - } else { - thisThreadHandle->changeStatus(Status::Canceled); - throw LuaHookStopException(); - } - break; - } +void luaHook(lua_State* L, lua_Debug*) { + if (const auto thisThread = ThreadHandle::getThis()) { + thisThread->performInterruptionPoint(L); } } } // namespace -namespace this_thread { - -ScopedSetInterruptable::ScopedSetInterruptable(IInterruptable* notifier) { - if (thisThreadHandle) { - thisThreadHandle->setNotifier(notifier); - } -} - -ScopedSetInterruptable::~ScopedSetInterruptable() { - if (thisThreadHandle) { - thisThreadHandle->setNotifier(nullptr); - } -} - -void interruptionPoint() { - if (thisThreadHandle && thisThreadHandle->command() == Command::Cancel) - { - thisThreadHandle->changeStatus(Status::Canceled); - throw LuaHookStopException(); - } -} - -std::string threadId() { - std::stringstream ss; - ss << std::this_thread::get_id(); - return ss.str(); -} - -void yield() { - luaHook(nullptr, nullptr); - std::this_thread::yield(); -} - -void sleep(const sol::stack_object& duration, const sol::stack_object& metric) { - if (duration.valid()) { - REQUIRE(duration.get_type() == sol::type::number) - << "bad argument #1 to 'effil.sleep' (number expected, got " - << luaTypename(duration) << ")"; - - if (metric.valid()) - { - REQUIRE(metric.get_type() == sol::type::string) - << "bad argument #2 to 'effil.sleep' (string expected, got " - << luaTypename(metric) << ")"; - } - try { - Notifier notifier; - notifier.waitFor(fromLuaTime(duration.as(), - metric.as>())); - } RETHROW_WITH_PREFIX("effil.sleep"); - } - else { - yield(); - } -} - -} // namespace this_thread - -ThreadHandle::ThreadHandle() - : status_(Status::Running) - , command_(Command::Run) - , currNotifier_(nullptr) - , lua_(std::make_unique()) +void Thread::runThread( + Thread thread, + Function function, + effil::StoredArray arguments) { - luaL_openlibs(*lua_); -} - -void ThreadHandle::putCommand(Command cmd) { - std::unique_lock lock(stateLock_); - if (isFinishStatus(status_)) - return; - - command_ = cmd; - statusNotifier_.reset(); - commandNotifier_.notify(); -} - -void ThreadHandle::changeStatus(Status stat) { - std::unique_lock lock(stateLock_); - status_ = stat; - commandNotifier_.reset(); - statusNotifier_.notify(); - if (isFinishStatus(stat)) - completionNotifier_.notify(); -} - -void Thread::runThread(Thread thread, - Function function, - effil::StoredArray arguments) { - thisThreadHandle = thread.ctx_.get(); - assert(thisThreadHandle != nullptr); - + ThreadHandle::setThis(thread.ctx_.get()); try { { ScopeGuard reportComplete([thread, &arguments](){ @@ -193,7 +76,7 @@ void Thread::runThread(Thread thread, sol::protected_function_result result = userFuncObj(std::move(arguments)); if (!result.valid()) { - if (thread.ctx_->status() == Status::Canceled) + if (thread.ctx_->status() == Status::Cancelled) return; sol::error err = result; @@ -213,15 +96,18 @@ void Thread::runThread(Thread thread, } } thread.ctx_->changeStatus(Status::Completed); - } catch (const LuaHookStopException&) { - thread.ctx_->changeStatus(Status::Canceled); } catch (const std::exception& err) { - DEBUG("thread") << "Failed with msg: " << err.what() << std::endl; - auto& returns = thread.ctx_->result(); - returns.insert(returns.begin(), - { createStoredObject("failed"), - createStoredObject(err.what()) }); - thread.ctx_->changeStatus(Status::Failed); + if (thread.ctx_->command() == Command::Cancel && strcmp(err.what(), ThreadCancelException::message) == 0) { + thread.ctx_->changeStatus(Status::Cancelled); + } else { + DEBUG("thread") << "Failed with msg: " << err.what() << std::endl; + auto& returns = thread.ctx_->result(); + returns.insert(returns.begin(), { + createStoredObject("failed"), + createStoredObject(err.what()) + }); + thread.ctx_->changeStatus(Status::Failed); + } } } @@ -319,7 +205,7 @@ bool Thread::cancel(const sol::this_state&, ctx_->putCommand(Command::Cancel); ctx_->interrupt(); Status status = ctx_->waitForStatusChange(toOptionalTime(duration, period)); - return isFinishStatus(status); + return ThreadHandle::isFinishStatus(status); } bool Thread::pause(const sol::this_state&, diff --git a/src/cpp/thread.h b/src/cpp/thread.h new file mode 100644 index 0000000..e112ba8 --- /dev/null +++ b/src/cpp/thread.h @@ -0,0 +1,43 @@ +#pragma once + +#include "lua-helpers.h" +#include "function.h" +#include "thread-handle.h" + +#include + +namespace effil { + +class Thread : public GCObject { +public: + static void exportAPI(sol::state_view& lua); + + StoredArray status(const sol::this_state& state); + StoredArray wait(const sol::this_state& state, + const sol::optional& duration, + const sol::optional& period); + StoredArray get(const sol::optional& duration, + const sol::optional& period); + bool cancel(const sol::this_state& state, + const sol::optional& duration, + const sol::optional& period); + bool pause(const sol::this_state&, + const sol::optional& duration, + const sol::optional& period); + void resume(); + +private: + Thread() = default; + void initialize( + const std::string& path, + const std::string& cpath, + int step, + const sol::function& function, + const sol::variadic_args& args); + friend class GC; + +private: + static void runThread(Thread, Function, effil::StoredArray); +}; + +} // effil diff --git a/src/cpp/utils.h b/src/cpp/utils.h index 116cec1..3beff7d 100644 --- a/src/cpp/utils.h +++ b/src/cpp/utils.h @@ -23,10 +23,10 @@ int luaopen_effil(lua_State* L); namespace effil { -class Exception : public sol::error { +class Exception : public std::runtime_error { public: Exception() noexcept - : sol::error("") {} + : std::runtime_error("") {} template Exception& operator<<(const T& value) { diff --git a/tests/lua/thread-interrupt.lua b/tests/lua/thread-interrupt.lua index e310682..a837c09 100644 --- a/tests/lua/thread-interrupt.lua +++ b/tests/lua/thread-interrupt.lua @@ -16,7 +16,7 @@ local function interruption_test(worker) local start_time = os.time() thr:cancel(1) - test.equal(thr:status(), "canceled") + test.equal(thr:status(), "cancelled") test.almost_equal(os.time(), start_time, 1) state.stop = true end diff --git a/tests/lua/thread-stress.lua b/tests/lua/thread-stress.lua index cc51f59..85d7529 100644 --- a/tests/lua/thread-stress.lua +++ b/tests/lua/thread-stress.lua @@ -6,7 +6,7 @@ test.thread_stress.time = function () local function check_time(real_time, use_time, metric) local start_time = os.time() effil.sleep(use_time, metric) - test.almost_equal(os.time(), start_time + real_time, 1) + test.almost_equal(os.time(), start_time + real_time, 2) end check_time(4, 4, nil) -- seconds by default check_time(4, 4, 's') diff --git a/tests/lua/thread.lua b/tests/lua/thread.lua index d8b378a..a57e6f5 100644 --- a/tests/lua/thread.lua +++ b/tests/lua/thread.lua @@ -124,7 +124,7 @@ test.thread.cancel = function () )() test.is_true(thread:cancel()) - test.equal(thread:status(), "canceled") + test.equal(thread:status(), "cancelled") end test.thread.async_cancel = function () @@ -140,7 +140,7 @@ test.thread.async_cancel = function () thread:cancel(0) test.is_true(wait(2, function() return thread:status() ~= 'running' end)) - test.equal(thread:status(), 'canceled') + test.equal(thread:status(), 'cancelled') end test.thread.pause_resume_cancel = function () @@ -156,7 +156,7 @@ test.thread.pause_resume_cancel = function () test.is_true(wait(2, function() return data.value > 100 end)) test.is_true(thread:pause()) test.equal(thread:status(), "paused") - + local savedValue = data.value sleep(1) test.equal(data.value, savedValue) @@ -209,7 +209,7 @@ test.thread.async_pause_resume_cancel = function () test.is_true(wait(5, function() return (data.value - savedValue) > 100 end)) thread:cancel(0) - test.is_true(wait(5, function() return thread:status() == "canceled" end)) + test.is_true(wait(5, function() return thread:status() == "cancelled" end)) thread:wait() end @@ -314,6 +314,30 @@ test.this_thread.functions = function () test.not_equal(share["child.id"], effil.thread_id()) end +test.this_thread.cancel_with_yield = function () + local ctx = effil.table() + local spec = effil.thread(function() + while not ctx.stop do + -- Just waiting + end + ctx.done = true + while true do + effil.yield() + end + ctx.after_yield = true + end) + spec.step = 0 + local thr = spec() + + test.is_false(thr:cancel(1)) + ctx.stop = true + + test.is_true(thr:cancel()) + test.equal(thr:status(), "cancelled") + test.is_true(ctx.done) + test.is_nil(ctx.after_yield) +end + test.this_thread.pause_with_yield = function () local share = effil.table({stop = false}) local spec = effil.thread(function (share) @@ -352,12 +376,12 @@ local function call_pause(thr) return true end --- Regress test to check hanging when invoke pause on canceled thread -test.this_thread.pause_on_canceled_thread = function () +-- Regress test to check hanging when invoke pause on cancelled thread +test.this_thread.pause_on_cancelled_thread = function () local worker_thread = effil.thread(worker)({ need_to_stop = false}) effil.sleep(1, 's') worker_thread:cancel() - test.equal(worker_thread:wait(2, "s"), "canceled") + test.equal(worker_thread:wait(2, "s"), "cancelled") test.is_true(effil.thread(call_pause)(worker_thread):get(5, "s")) end @@ -406,3 +430,108 @@ test.thread.traceback = function() end end -- LUA_VERSION > 51 + +test.thread.cancel_thread_with_pcall = function() + local steps = effil.table{step1 = false, step2 = false} + local pcall_results = effil.table{} + + local thr = effil.thread( + function() + pcall_results.ret, pcall_results.msg = pcall(function() + while true do + effil.yield() + end + end) + + steps.step1 = true + effil.yield() + steps.step2 = true -- should never reach + end + )() + + test.is_true(thr:cancel()) + test.equal(thr:wait(), "cancelled") + test.is_true(steps.step1) + test.is_false(steps.step2) + test.is_false(pcall_results.ret) + test.equal(pcall_results.msg, "Effil: thread is cancelled") +end + +test.thread.cancel_thread_with_pcall_not_cancelled = function() + local thr = effil.thread( + function() + pcall(function() + while true do + effil.yield() + end + end) + end + )() + test.is_true(thr:cancel()) + test.equal(thr:wait(), "completed") +end + +test.thread.cancel_thread_with_pcall_and_another_error = function() + local msg = 'some text' + local thr = effil.thread( + function() + pcall(function() + while true do + effil.yield() + end + end) + error(msg) + end + )() + test.is_true(thr:cancel()) + local status, message = thr:wait() + test.equal(status, "failed") + test.is_not_nil(string.find(message, ".+: " .. msg)) +end + +if not jit then + +test.thread.cancel_thread_with_pcall_without_yield = function() + local thr = effil.thread( + function() + while true do + -- pass + end + end + ) + thr = thr() + test.is_true(thr:cancel()) + test.equal(thr:wait(), "cancelled") +end + +end + +test.thread.check_effil_pcall_success = function() + local inp1, inp2, inp3 = 1, "str", {} + local res, ret1, ret2, ret3 = effil.pcall(function(...) return ... end, inp1, inp2, inp3) + test.is_true(res) + test.equal(ret1, inp1) + test.equal(ret2, inp2) + test.equal(ret3, inp3) +end + +test.thread.check_effil_pcall_fail = function() + local err = "some text" + local res, msg = effil.pcall(function(err) error(err) end, err) + test.is_false(res) + test.is_not_nil(string.find(msg, ".+: " .. err)) +end + +test.thread.check_effil_pcall_with_cancel_thread = function() + local thr = effil.thread( + function() + effil.pcall(function() + while true do + effil.yield() + end + end) + end + )() + test.is_true(thr:cancel()) + test.equal(thr:wait(), "cancelled") +end