Fix pause hanging (#61)

This commit is contained in:
mihacooper 2017-09-10 22:28:38 +03:00 committed by Ilia
parent 736cf8d96d
commit 5aa43c0b8a
5 changed files with 154 additions and 91 deletions

View File

@ -1,6 +1,10 @@
cmake_minimum_required(VERSION 2.8)
project(effil)
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release")
endif(NOT CMAKE_BUILD_TYPE)
if (NOT LUA_INCLUDE_DIR OR NOT LUA_LIBRARY)
find_package(Lua REQUIRED)
endif()

View File

@ -24,6 +24,10 @@ enum class Status {
Failed
};
bool isFinishStatus(Status stat) {
return stat == Status::Canceled || stat == Status::Completed || stat == Status::Failed;
}
std::string statusToString(Status status) {
switch (status) {
case Status::Running:
@ -50,19 +54,9 @@ enum class Command {
} // namespace
class ThreadHandle {
public:
Status status;
StoredArray result;
Notifier completion;
// on thread resume
Notifier pause;
// used only with sync pause
Notifier syncPause;
public:
ThreadHandle()
: status(Status::Running)
: status_(Status::Running)
, command_(Command::Run)
, lua_(std::make_unique<sol::state>()) {
luaL_openlibs(*lua_);
@ -70,11 +64,52 @@ public:
Command command() const { return command_; }
void command(Command cmd) {
std::lock_guard<SpinMutex> lock(commandMutex_);
if (command_ == Command::Cancel)
void putCommand(Command cmd) {
std::unique_lock<std::mutex> lock(stateLock_);
if (isFinishStatus(status_))
return;
command_ = cmd;
statusNotifier_.reset();
commandNotifier_.notify();
}
void changeStatus(Status stat) {
std::unique_lock<std::mutex> lock(stateLock_);
status_ = stat;
commandNotifier_.reset();
statusNotifier_.notify();
if (isFinishStatus(stat))
completionNotifier_.notify();
}
template <typename T>
Status waitForStatusChange(const sol::optional<T>& time) {
if (time)
statusNotifier_.waitFor(*time);
else
statusNotifier_.wait();
return status_;
}
template <typename T>
Command waitForCommandChange(const sol::optional<T>& time) {
if (time)
commandNotifier_.waitFor(*time);
else
commandNotifier_.wait();
return command_;
}
template <typename T>
bool waitForCompletion(const sol::optional<T>& time) {
if (time) {
return completionNotifier_.waitFor(*time);
}
else {
completionNotifier_.wait();
return true;
}
}
sol::state& lua() {
@ -84,15 +119,26 @@ public:
void destroyLua() { lua_.reset(); }
Status status() { return status_; }
StoredArray& result() { return result_; }
private:
SpinMutex commandMutex_;
Status status_;
Command command_;
Notifier statusNotifier_;
Notifier commandNotifier_;
Notifier completionNotifier_;
std::mutex stateLock_;
StoredArray result_;
std::unique_ptr<sol::state> lua_;
};
namespace {
const sol::optional<std::chrono::milliseconds> NO_TIMEOUT;
static thread_local ThreadHandle* thisThreadHandle = nullptr;
void luaHook(lua_State*, lua_Debug*) {
@ -103,11 +149,13 @@ void luaHook(lua_State*, lua_Debug*) {
case Command::Cancel:
throw LuaHookStopException();
case Command::Pause: {
thisThreadHandle->status = Status::Paused;
thisThreadHandle->syncPause.notify();
thisThreadHandle->pause.wait();
if (thisThreadHandle->command() == Command::Run)
thisThreadHandle->status = Status::Running;
thisThreadHandle->changeStatus(Status::Paused);
Command cmd;
do {
cmd = thisThreadHandle->waitForCommandChange(NO_TIMEOUT);
} while(cmd != Command::Run && cmd != Command::Cancel);
if (cmd == Command::Run)
thisThreadHandle->changeStatus(Status::Running);
else
throw LuaHookStopException();
break;
@ -115,46 +163,36 @@ void luaHook(lua_State*, lua_Debug*) {
}
}
class ScopeGuard {
public:
ScopeGuard(const std::function<void()>& f) : f_(f) {}
~ScopeGuard() { f_(); }
private:
std::function<void()> f_;
};
void runThread(std::shared_ptr<ThreadHandle> handle,
std::string strFunction,
effil::StoredArray arguments) {
ScopeGuard reportComplete([=](){
DEBUG << "Finished " << std::endl;
// Let's destroy accociated state
// to release all resources as soon as possible
handle->destroyLua();
handle->completion.notify();
});
assert(handle);
thisThreadHandle = handle.get();
try {
sol::function userFuncObj = loadString(handle->lua(), strFunction);
sol::function_result results = userFuncObj(std::move(arguments));
(void)results; // just leave all returns on the stack
sol::variadic_args args(handle->lua(), -lua_gettop(handle->lua()));
for (const auto& iter : args) {
StoredObject store = createStoredObject(iter.get<sol::object>());
handle->result.emplace_back(std::move(store));
{
ScopeGuard reportComplete([=](){
DEBUG << "Finished " << std::endl;
// Let's destroy accociated state
// to release all resources as soon as possible
handle->destroyLua();
});
sol::function userFuncObj = loadString(handle->lua(), strFunction);
sol::function_result results = userFuncObj(std::move(arguments));
(void)results; // just leave all returns on the stack
sol::variadic_args args(handle->lua(), -lua_gettop(handle->lua()));
for (const auto& iter : args) {
StoredObject store = createStoredObject(iter.get<sol::object>());
handle->result().emplace_back(std::move(store));
}
}
handle->status = Status::Completed;
handle->changeStatus(Status::Completed);
} catch (const LuaHookStopException&) {
handle->status = Status::Canceled;
handle->changeStatus(Status::Canceled);
} catch (const sol::error& err) {
DEBUG << "Failed with msg: " << err.what() << std::endl;
handle->result.emplace_back(createStoredObject(err.what()));
handle->status = Status::Failed;
handle->result().emplace_back(createStoredObject(err.what()));
handle->changeStatus(Status::Failed);
}
}
@ -224,38 +262,35 @@ void Thread::getUserType(sol::state_view& lua) {
}
std::pair<sol::object, sol::object> Thread::status(const sol::this_state& lua) {
sol::object luaStatus = sol::make_object(lua, statusToString(handle_->status));
sol::object luaStatus = sol::make_object(lua, statusToString(handle_->status()));
if (handle_->status == Status::Failed) {
assert(!handle_->result.empty());
return std::make_pair(luaStatus, handle_->result[0]->unpack(lua));
if (handle_->status() == Status::Failed) {
assert(!handle_->result().empty());
return std::make_pair(luaStatus, handle_->result()[0]->unpack(lua));
} else {
return std::make_pair(luaStatus, sol::nil);
}
}
bool Thread::waitFor(const sol::optional<int>& duration,
const sol::optional<std::string>& period) {
if (!duration) { // sync version
handle_->completion.wait();
return true;
} else { // async version
return handle_->completion.waitFor(fromLuaTime(*duration, period));
}
sol::optional<std::chrono::milliseconds> toOptionalTime(const sol::optional<int>& duration,
const sol::optional<std::string>& period) {
if (duration)
return fromLuaTime(*duration, period);
else
return sol::optional<std::chrono::milliseconds>();
}
std::pair<sol::object, sol::object> Thread::wait(const sol::this_state& lua,
const sol::optional<int>& duration,
const sol::optional<std::string>& period) {
waitFor(duration, period);
handle_->waitForCompletion(toOptionalTime(duration, period));
return status(lua);
}
StoredArray Thread::get(const sol::optional<int>& duration,
const sol::optional<std::string>& period) {
bool completed = waitFor(duration, period);
if (completed && handle_->status == Status::Completed)
return handle_->result;
if (handle_->waitForCompletion(toOptionalTime(duration, period)) && handle_->status() == Status::Completed)
return handle_->result();
else
return StoredArray();
}
@ -263,35 +298,21 @@ StoredArray Thread::get(const sol::optional<int>& duration,
bool Thread::cancel(const sol::this_state&,
const sol::optional<int>& duration,
const sol::optional<std::string>& period) {
handle_->command(Command::Cancel);
handle_->pause.notify();
if (handle_->status == Status::Running) {
return waitFor(duration, period);
} else {
handle_->completion.wait();
return true;
}
handle_->putCommand(Command::Cancel);
Status status = handle_->waitForStatusChange(toOptionalTime(duration, period));
return isFinishStatus(status);
}
bool Thread::pause(const sol::this_state&,
const sol::optional<int>& duration,
const sol::optional<std::string>& period) {
handle_->pause.reset();
handle_->command(Command::Pause);
if (!duration) { // sync wait
handle_->syncPause.wait();
return true;
} else { // async wait
return handle_->syncPause.waitFor(fromLuaTime(*duration, period));
}
handle_->putCommand(Command::Pause);
Status status = handle_->waitForStatusChange(toOptionalTime(duration, period));
return status == Status::Paused;
}
void Thread::resume() {
handle_->command(Command::Run);
handle_->syncPause.reset();
handle_->pause.notify();
handle_->putCommand(Command::Run);
}
} // effil

View File

@ -39,9 +39,6 @@ public:
private:
std::shared_ptr<ThreadHandle> handle_;
private:
bool waitFor(const sol::optional<int>& duration,
const sol::optional<std::string>& period);
private:
Thread(const Thread&) = delete;
Thread& operator=(const Thread&) = delete;

View File

@ -37,6 +37,15 @@ private:
std::string message_;
};
class ScopeGuard {
public:
ScopeGuard(const std::function<void()>& f) : f_(f) {}
~ScopeGuard() { f_(); }
private:
std::function<void()> f_;
};
} // effil
#define REQUIRE(cond) if (!(cond)) throw effil::Exception()

View File

@ -98,7 +98,6 @@ test.thread.cancel = function ()
test.is_true(thread:cancel())
test.equal(thread:status(), "canceled")
end
test.thread.async_cancel = function ()
local thread_runner = effil.thread(
function()
@ -156,7 +155,7 @@ test.thread.pause_cancel = function ()
sleep(1)
test.equal(data.value, savedValue)
test.is_true(thread:cancel(0))
test.is_true(thread:cancel(1))
end
test.thread.async_pause_resume_cancel = function ()
@ -334,3 +333,36 @@ test.this_thread.pause_with_yield = function ()
test.is_true(thr:get())
test.is_true(share.done)
end
local function worker(cmd)
eff = require("effil")
while not cmd.need_to_stop do
eff.yield()
end
return true
end
local function call_pause(thr)
-- 'pause()' may hang infinitelly, so lets to run it in separate thread
thr:pause()
return true
end
-- Regress test to check hanging when invoke pause on canceled thread
test.this_thread.pause_on_canceled_thread = function ()
local worker_thread = effil.thread(worker)({ need_to_stop = false})
effil.sleep(1, 's')
worker_thread:cancel()
test.equal(worker_thread:wait(2, "s"), "canceled")
test.is_true(effil.thread(call_pause)(worker_thread):get(5, "s"))
end
-- Regress test to check hanging when invoke pause on finished thread
test.this_thread.pause_on_finished_thread = function ()
local cmd = effil.table({ need_to_stop = false})
local worker_thread = effil.thread(worker)(cmd)
effil.sleep(1, 's')
cmd.need_to_stop = true
test.is_true(worker_thread:get(2, "s"))
test.is_true(effil.thread(call_pause)(worker_thread):get(5, "s"))
end