From 5aa43c0b8add1e1382cf818df6b98b8f941f4b80 Mon Sep 17 00:00:00 2001 From: mihacooper Date: Sun, 10 Sep 2017 22:28:38 +0300 Subject: [PATCH] Fix pause hanging (#61) --- CMakeLists.txt | 4 + src/cpp/threading.cpp | 193 +++++++++++++++++++++++------------------- src/cpp/threading.h | 3 - src/cpp/utils.h | 9 ++ tests/lua/thread.lua | 36 +++++++- 5 files changed, 154 insertions(+), 91 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 191d5a7..ddb4964 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,10 @@ cmake_minimum_required(VERSION 2.8) project(effil) +if(NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE "Release") +endif(NOT CMAKE_BUILD_TYPE) + if (NOT LUA_INCLUDE_DIR OR NOT LUA_LIBRARY) find_package(Lua REQUIRED) endif() diff --git a/src/cpp/threading.cpp b/src/cpp/threading.cpp index 0136acd..4e0fb86 100644 --- a/src/cpp/threading.cpp +++ b/src/cpp/threading.cpp @@ -24,6 +24,10 @@ enum class Status { Failed }; +bool isFinishStatus(Status stat) { + return stat == Status::Canceled || stat == Status::Completed || stat == Status::Failed; +} + std::string statusToString(Status status) { switch (status) { case Status::Running: @@ -50,19 +54,9 @@ enum class Command { } // namespace class ThreadHandle { -public: - Status status; - StoredArray result; - - Notifier completion; - // on thread resume - Notifier pause; - // used only with sync pause - Notifier syncPause; - public: ThreadHandle() - : status(Status::Running) + : status_(Status::Running) , command_(Command::Run) , lua_(std::make_unique()) { luaL_openlibs(*lua_); @@ -70,11 +64,52 @@ public: Command command() const { return command_; } - void command(Command cmd) { - std::lock_guard lock(commandMutex_); - if (command_ == Command::Cancel) + void putCommand(Command cmd) { + std::unique_lock lock(stateLock_); + if (isFinishStatus(status_)) return; + command_ = cmd; + statusNotifier_.reset(); + commandNotifier_.notify(); + } + + void changeStatus(Status stat) { + std::unique_lock lock(stateLock_); + status_ = stat; + commandNotifier_.reset(); + statusNotifier_.notify(); + if (isFinishStatus(stat)) + completionNotifier_.notify(); + } + + template + Status waitForStatusChange(const sol::optional& time) { + if (time) + statusNotifier_.waitFor(*time); + else + statusNotifier_.wait(); + return status_; + } + + template + Command waitForCommandChange(const sol::optional& time) { + if (time) + commandNotifier_.waitFor(*time); + else + commandNotifier_.wait(); + return command_; + } + + template + bool waitForCompletion(const sol::optional& time) { + if (time) { + return completionNotifier_.waitFor(*time); + } + else { + completionNotifier_.wait(); + return true; + } } sol::state& lua() { @@ -84,15 +119,26 @@ public: void destroyLua() { lua_.reset(); } + Status status() { return status_; } + + StoredArray& result() { return result_; } + private: - SpinMutex commandMutex_; + Status status_; Command command_; + Notifier statusNotifier_; + Notifier commandNotifier_; + Notifier completionNotifier_; + std::mutex stateLock_; + StoredArray result_; std::unique_ptr lua_; }; namespace { +const sol::optional NO_TIMEOUT; + static thread_local ThreadHandle* thisThreadHandle = nullptr; void luaHook(lua_State*, lua_Debug*) { @@ -103,11 +149,13 @@ void luaHook(lua_State*, lua_Debug*) { case Command::Cancel: throw LuaHookStopException(); case Command::Pause: { - thisThreadHandle->status = Status::Paused; - thisThreadHandle->syncPause.notify(); - thisThreadHandle->pause.wait(); - if (thisThreadHandle->command() == Command::Run) - thisThreadHandle->status = Status::Running; + thisThreadHandle->changeStatus(Status::Paused); + Command cmd; + do { + cmd = thisThreadHandle->waitForCommandChange(NO_TIMEOUT); + } while(cmd != Command::Run && cmd != Command::Cancel); + if (cmd == Command::Run) + thisThreadHandle->changeStatus(Status::Running); else throw LuaHookStopException(); break; @@ -115,46 +163,36 @@ void luaHook(lua_State*, lua_Debug*) { } } -class ScopeGuard { -public: - ScopeGuard(const std::function& f) : f_(f) {} - ~ScopeGuard() { f_(); } - -private: - std::function f_; -}; - void runThread(std::shared_ptr handle, std::string strFunction, effil::StoredArray arguments) { - - ScopeGuard reportComplete([=](){ - DEBUG << "Finished " << std::endl; - // Let's destroy accociated state - // to release all resources as soon as possible - handle->destroyLua(); - handle->completion.notify(); - }); - assert(handle); thisThreadHandle = handle.get(); try { - sol::function userFuncObj = loadString(handle->lua(), strFunction); - sol::function_result results = userFuncObj(std::move(arguments)); - (void)results; // just leave all returns on the stack - sol::variadic_args args(handle->lua(), -lua_gettop(handle->lua())); - for (const auto& iter : args) { - StoredObject store = createStoredObject(iter.get()); - handle->result.emplace_back(std::move(store)); + { + ScopeGuard reportComplete([=](){ + DEBUG << "Finished " << std::endl; + // Let's destroy accociated state + // to release all resources as soon as possible + handle->destroyLua(); + }); + sol::function userFuncObj = loadString(handle->lua(), strFunction); + sol::function_result results = userFuncObj(std::move(arguments)); + (void)results; // just leave all returns on the stack + sol::variadic_args args(handle->lua(), -lua_gettop(handle->lua())); + for (const auto& iter : args) { + StoredObject store = createStoredObject(iter.get()); + handle->result().emplace_back(std::move(store)); + } } - handle->status = Status::Completed; + handle->changeStatus(Status::Completed); } catch (const LuaHookStopException&) { - handle->status = Status::Canceled; + handle->changeStatus(Status::Canceled); } catch (const sol::error& err) { DEBUG << "Failed with msg: " << err.what() << std::endl; - handle->result.emplace_back(createStoredObject(err.what())); - handle->status = Status::Failed; + handle->result().emplace_back(createStoredObject(err.what())); + handle->changeStatus(Status::Failed); } } @@ -224,38 +262,35 @@ void Thread::getUserType(sol::state_view& lua) { } std::pair Thread::status(const sol::this_state& lua) { - sol::object luaStatus = sol::make_object(lua, statusToString(handle_->status)); + sol::object luaStatus = sol::make_object(lua, statusToString(handle_->status())); - if (handle_->status == Status::Failed) { - assert(!handle_->result.empty()); - return std::make_pair(luaStatus, handle_->result[0]->unpack(lua)); + if (handle_->status() == Status::Failed) { + assert(!handle_->result().empty()); + return std::make_pair(luaStatus, handle_->result()[0]->unpack(lua)); } else { return std::make_pair(luaStatus, sol::nil); } } -bool Thread::waitFor(const sol::optional& duration, - const sol::optional& period) { - if (!duration) { // sync version - handle_->completion.wait(); - return true; - } else { // async version - return handle_->completion.waitFor(fromLuaTime(*duration, period)); - } +sol::optional toOptionalTime(const sol::optional& duration, + const sol::optional& period) { + if (duration) + return fromLuaTime(*duration, period); + else + return sol::optional(); } std::pair Thread::wait(const sol::this_state& lua, const sol::optional& duration, const sol::optional& period) { - waitFor(duration, period); + handle_->waitForCompletion(toOptionalTime(duration, period)); return status(lua); } StoredArray Thread::get(const sol::optional& duration, const sol::optional& period) { - bool completed = waitFor(duration, period); - if (completed && handle_->status == Status::Completed) - return handle_->result; + if (handle_->waitForCompletion(toOptionalTime(duration, period)) && handle_->status() == Status::Completed) + return handle_->result(); else return StoredArray(); } @@ -263,35 +298,21 @@ StoredArray Thread::get(const sol::optional& duration, bool Thread::cancel(const sol::this_state&, const sol::optional& duration, const sol::optional& period) { - handle_->command(Command::Cancel); - handle_->pause.notify(); - - if (handle_->status == Status::Running) { - return waitFor(duration, period); - } else { - handle_->completion.wait(); - return true; - } + handle_->putCommand(Command::Cancel); + Status status = handle_->waitForStatusChange(toOptionalTime(duration, period)); + return isFinishStatus(status); } bool Thread::pause(const sol::this_state&, const sol::optional& duration, const sol::optional& period) { - handle_->pause.reset(); - handle_->command(Command::Pause); - - if (!duration) { // sync wait - handle_->syncPause.wait(); - return true; - } else { // async wait - return handle_->syncPause.waitFor(fromLuaTime(*duration, period)); - } + handle_->putCommand(Command::Pause); + Status status = handle_->waitForStatusChange(toOptionalTime(duration, period)); + return status == Status::Paused; } void Thread::resume() { - handle_->command(Command::Run); - handle_->syncPause.reset(); - handle_->pause.notify(); + handle_->putCommand(Command::Run); } } // effil diff --git a/src/cpp/threading.h b/src/cpp/threading.h index b4c4c86..e29d0db 100644 --- a/src/cpp/threading.h +++ b/src/cpp/threading.h @@ -39,9 +39,6 @@ public: private: std::shared_ptr handle_; -private: - bool waitFor(const sol::optional& duration, - const sol::optional& period); private: Thread(const Thread&) = delete; Thread& operator=(const Thread&) = delete; diff --git a/src/cpp/utils.h b/src/cpp/utils.h index c4b75f9..9b790d6 100644 --- a/src/cpp/utils.h +++ b/src/cpp/utils.h @@ -37,6 +37,15 @@ private: std::string message_; }; +class ScopeGuard { +public: + ScopeGuard(const std::function& f) : f_(f) {} + ~ScopeGuard() { f_(); } + +private: + std::function f_; +}; + } // effil #define REQUIRE(cond) if (!(cond)) throw effil::Exception() diff --git a/tests/lua/thread.lua b/tests/lua/thread.lua index 6fff35b..b6ca537 100644 --- a/tests/lua/thread.lua +++ b/tests/lua/thread.lua @@ -98,7 +98,6 @@ test.thread.cancel = function () test.is_true(thread:cancel()) test.equal(thread:status(), "canceled") end - test.thread.async_cancel = function () local thread_runner = effil.thread( function() @@ -156,7 +155,7 @@ test.thread.pause_cancel = function () sleep(1) test.equal(data.value, savedValue) - test.is_true(thread:cancel(0)) + test.is_true(thread:cancel(1)) end test.thread.async_pause_resume_cancel = function () @@ -334,3 +333,36 @@ test.this_thread.pause_with_yield = function () test.is_true(thr:get()) test.is_true(share.done) end + +local function worker(cmd) + eff = require("effil") + while not cmd.need_to_stop do + eff.yield() + end + return true +end + +local function call_pause(thr) + -- 'pause()' may hang infinitelly, so lets to run it in separate thread + thr:pause() + return true +end + +-- Regress test to check hanging when invoke pause on canceled thread +test.this_thread.pause_on_canceled_thread = function () + local worker_thread = effil.thread(worker)({ need_to_stop = false}) + effil.sleep(1, 's') + worker_thread:cancel() + test.equal(worker_thread:wait(2, "s"), "canceled") + test.is_true(effil.thread(call_pause)(worker_thread):get(5, "s")) +end + +-- Regress test to check hanging when invoke pause on finished thread +test.this_thread.pause_on_finished_thread = function () + local cmd = effil.table({ need_to_stop = false}) + local worker_thread = effil.thread(worker)(cmd) + effil.sleep(1, 's') + cmd.need_to_stop = true + test.is_true(worker_thread:get(2, "s")) + test.is_true(effil.thread(call_pause)(worker_thread):get(5, "s")) +end