Interruption points in sync operations (get, wait, pop, ...) (#118)

Interruption points in sync operations (get, wait, pop, ...)
This commit is contained in:
mihacooper 2022-04-17 20:46:25 +04:00 committed by GitHub
parent c5e5123341
commit 1b0c968bf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 300 additions and 96 deletions

View File

@ -13,7 +13,7 @@ jobs:
matrix:
build-type: [Release] # Debug
lua: ["lua 5.1", "lua 5.2", "lua 5.3", "luajit 2.0"]
os: ["windows-2016"]
os: ["windows-2019"]
platform: [
{"forLua": "vs_32", "forCMake": "Win32"},
{"forLua": "vs_64", "forCMake": "x64"},

View File

@ -244,6 +244,24 @@ List of available time intervals:
- `m` - minutes;
- `h` - hours.
All blocking operations (even in non blocking mode) are interruption points. Thread hanged in such operation can be interrupted by invoking [thread:cancel()](#threadcanceltime-metric) method.
<details>
<summary><b>Example</b></summary>
<p>
```lua
local effil = require "effil"
local worker = effil.thread(function()
effil.sleep(999) -- worker will hang for 999 seconds
end)()
worker:cancel(1) -- returns true, cause blocking operation was interrupted and thread was canceled
```
</p>
</details>
## Function's upvalues
Working with functions Effil serializes and deserializes them using [`lua_dump`](#https://www.lua.org/manual/5.3/manual.html#lua_dump) and [`lua_load`](#https://www.lua.org/manual/5.3/manual.html#lua_load) methods. All function's upvalues are stored following the same [rules](#important-notes) as usual. If function has **upvalue of unsupported type** this function cannot be transmitted to Effil. You will get error in that case.
@ -528,3 +546,4 @@ effil.type(effil.channel()) == "effil.channel"
effil.type({}) == "table"
effil.type(1) == "number"
```

View File

@ -52,15 +52,26 @@ bool Channel::push(const sol::variadic_args& args) {
StoredArray Channel::pop(const sol::optional<int>& duration,
const sol::optional<std::string>& period) {
this_thread::interruptionPoint();
std::unique_lock<std::mutex> lock(ctx_->lock_);
{
this_thread::ScopedSetInterruptable interruptable(this);
Timer timer(duration ? fromLuaTime(duration.value(), period) :
std::chrono::milliseconds());
while (ctx_->channel_.empty()) {
if (duration) {
if (ctx_->cv_.wait_for(lock, fromLuaTime(duration.value(), period)) == std::cv_status::timeout)
if (timer.isFinished() ||
ctx_->cv_.wait_for(lock, timer.left()) ==
std::cv_status::timeout) {
return StoredArray();
}
}
else { // No time limit
ctx_->cv_.wait(lock);
}
this_thread::interruptionPoint();
}
}
auto ret = ctx_->channel_.front();
@ -78,4 +89,9 @@ size_t Channel::size() {
return ctx_->channel_.size();
}
void Channel::interrupt()
{
ctx_->cv_.notify_all();
}
} // namespace effil

View File

@ -17,7 +17,7 @@ public:
std::queue<StoredArray> channel_;
};
class Channel : public GCObject<ChannelData> {
class Channel : public GCObject<ChannelData>, public IInterruptable {
public:
static void exportAPI(sol::state_view& lua);
@ -27,6 +27,8 @@ public:
size_t size();
void interrupt() final;
private:
Channel() = default;
void initialize(const sol::stack_object& capacity);

View File

@ -61,4 +61,21 @@ std::chrono::milliseconds fromLuaTime(int duration, const sol::optional<std::str
else throw sol::error("invalid time metric: " + metric);
}
using namespace std::chrono;
Timer::Timer(const milliseconds& timeout)
: timeout_(timeout), startTime_(high_resolution_clock::now())
{}
bool Timer::isFinished() {
return left() == milliseconds(0);
}
milliseconds Timer::left() {
const auto diff = high_resolution_clock::now() - startTime_;
return timeout_ > diff ? duration_cast<milliseconds>((timeout_ - diff)):
milliseconds(0);
}
} // namespace effil

View File

@ -34,6 +34,17 @@ std::string luaTypename(const SolObject& obj) {
typedef std::vector<effil::StoredObject> StoredArray;
class Timer {
public:
Timer(const std::chrono::milliseconds& timeout);
bool isFinished();
std::chrono::milliseconds left();
private:
std::chrono::milliseconds timeout_;
std::chrono::high_resolution_clock::time_point startTime_;
};
} // namespace effil
namespace sol {

View File

@ -97,9 +97,9 @@ int luaopen_effil(lua_State* L) {
sol::usertype<EffilApiMarker> type("new", sol::no_constructor,
"thread", createThreadRunner,
"thread_id", threadId,
"sleep", sleep,
"yield", yield,
"thread_id", this_thread::threadId,
"sleep", this_thread::sleep,
"yield", this_thread::yield,
"table", createTable,
"rawset", SharedTable::luaRawSet,
"rawget", SharedTable::luaRawGet,

View File

@ -1,15 +1,22 @@
#pragma once
#include <this_thread.h>
#include <lua-helpers.h>
#include <mutex>
#include <condition_variable>
namespace effil {
class Notifier {
struct IInterruptable {
virtual void interrupt() = 0;
};
class Notifier : public IInterruptable {
public:
Notifier()
: notified_(false) {
}
typedef std::function<void()> InterruptChecker;
Notifier() : notified_(false) {}
void notify() {
std::unique_lock<std::mutex> lock(mutex_);
@ -17,19 +24,38 @@ public:
cv_.notify_all();
}
void interrupt() final {
cv_.notify_all();
}
void wait() {
this_thread::interruptionPoint();
this_thread::ScopedSetInterruptable interruptable(this);
std::unique_lock<std::mutex> lock(mutex_);
while (!notified_)
while (!notified_) {
cv_.wait(lock);
this_thread::interruptionPoint();
}
}
template <typename T>
bool waitFor(T period) {
this_thread::interruptionPoint();
if (period == std::chrono::seconds(0) || notified_)
return notified_;
this_thread::ScopedSetInterruptable interruptable(this);
Timer timer(period);
std::unique_lock<std::mutex> lock(mutex_);
while (cv_.wait_for(lock, period) != std::cv_status::timeout && !notified_);
while (!timer.isFinished() &&
cv_.wait_for(lock, timer.left()) != std::cv_status::timeout &&
!notified_) {
this_thread::interruptionPoint();
}
return notified_;
}

View File

@ -64,7 +64,7 @@ void SharedTable::set(StoredObject&& key, StoredObject&& value) {
sol::object SharedTable::get(const StoredObject& key, sol::this_state state) const {
SharedLock g(ctx_->lock);
auto val = ctx_->entries.find(key);
const auto val = ctx_->entries.find(key);
if (val == ctx_->entries.end()) {
return sol::nil;
} else {

25
src/cpp/this_thread.h Normal file
View File

@ -0,0 +1,25 @@
#pragma once
#include <sol.hpp>
namespace effil {
struct IInterruptable;
namespace this_thread {
class ScopedSetInterruptable
{
public:
ScopedSetInterruptable(IInterruptable* notifier);
~ScopedSetInterruptable();
};
void interruptionPoint();
// Lua API
std::string threadId();
void yield();
void sleep(const sol::stack_object& duration, const sol::stack_object& metric);
} // namespace this_thread
} // namespace effil

View File

@ -81,10 +81,70 @@ void luaHook(lua_State*, lua_Debug*) {
} // namespace
namespace this_thread {
ScopedSetInterruptable::ScopedSetInterruptable(IInterruptable* notifier) {
if (thisThreadHandle) {
thisThreadHandle->setNotifier(notifier);
}
}
ScopedSetInterruptable::~ScopedSetInterruptable() {
if (thisThreadHandle) {
thisThreadHandle->setNotifier(nullptr);
}
}
void interruptionPoint() {
if (thisThreadHandle && thisThreadHandle->command() == Command::Cancel)
{
thisThreadHandle->changeStatus(Status::Canceled);
throw LuaHookStopException();
}
}
std::string threadId() {
std::stringstream ss;
ss << std::this_thread::get_id();
return ss.str();
}
void yield() {
luaHook(nullptr, nullptr);
std::this_thread::yield();
}
void sleep(const sol::stack_object& duration, const sol::stack_object& metric) {
if (duration.valid()) {
REQUIRE(duration.get_type() == sol::type::number)
<< "bad argument #1 to 'effil.sleep' (number expected, got "
<< luaTypename(duration) << ")";
if (metric.valid())
{
REQUIRE(metric.get_type() == sol::type::string)
<< "bad argument #2 to 'effil.sleep' (string expected, got "
<< luaTypename(metric) << ")";
}
try {
Notifier notifier;
notifier.waitFor(fromLuaTime(duration.as<int>(),
metric.as<sol::optional<std::string>>()));
} RETHROW_WITH_PREFIX("effil.sleep");
}
else {
yield();
}
}
} // namespace this_thread
ThreadHandle::ThreadHandle()
: status_(Status::Running)
, command_(Command::Run)
, lua_(std::make_unique<sol::state>()) {
, currNotifier_(nullptr)
, lua_(std::make_unique<sol::state>())
{
luaL_openlibs(*lua_);
}
@ -165,32 +225,6 @@ void Thread::runThread(Thread thread,
}
}
std::string threadId() {
std::stringstream ss;
ss << std::this_thread::get_id();
return ss.str();
}
void yield() {
if (thisThreadHandle)
luaHook(nullptr, nullptr);
std::this_thread::yield();
}
void sleep(const sol::stack_object& duration, const sol::stack_object& metric) {
if (duration.valid()) {
REQUIRE(duration.get_type() == sol::type::number) << "bad argument #1 to 'effil.sleep' (number expected, got " << luaTypename(duration) << ")";
if (metric.valid())
REQUIRE(metric.get_type() == sol::type::string) << "bad argument #2 to 'effil.sleep' (string expected, got " << luaTypename(metric) << ")";
try {
std::this_thread::sleep_for(fromLuaTime(duration.as<int>(), metric.valid() ? metric.as<std::string>() : sol::optional<std::string>()));
} RETHROW_WITH_PREFIX("effil.sleep");
}
else {
yield();
}
}
void Thread::initialize(
const std::string& path,
const std::string& cpath,
@ -198,6 +232,7 @@ void Thread::initialize(
const sol::function& function,
const sol::variadic_args& variadicArgs)
{
sol::optional<Function> functionObj;
try {
functionObj = GC::instance().create<Function>(function);
@ -282,6 +317,7 @@ bool Thread::cancel(const sol::this_state&,
const sol::optional<int>& duration,
const sol::optional<std::string>& period) {
ctx_->putCommand(Command::Cancel);
ctx_->interrupt();
Status status = ctx_->waitForStatusChange(toOptionalTime(duration, period));
return isFinishStatus(status);
}

View File

@ -8,11 +8,6 @@
namespace effil {
// Lua this thread API
std::string threadId();
void yield();
void sleep(const sol::stack_object& duration, const sol::stack_object& metric);
class ThreadHandle : public GCData {
public:
enum class Status {
@ -71,10 +66,20 @@ public:
void destroyLua() { lua_.reset(); }
Status status() { return status_; }
Status status() const { return status_; }
StoredArray& result() { return result_; }
void setNotifier(IInterruptable* notifier) {
currNotifier_ = notifier;
}
void interrupt() const {
IInterruptable* currNotifier = currNotifier_;
if (currNotifier)
currNotifier->interrupt();
}
private:
Status status_;
Command command_;
@ -83,7 +88,7 @@ private:
Notifier completionNotifier_;
std::mutex stateLock_;
StoredArray result_;
IInterruptable* currNotifier_;
std::unique_ptr<sol::state> lua_;
};

View File

@ -16,6 +16,7 @@ require "type"
require "gc"
require "channel"
require "thread"
require "thread-interrupt"
require "shared-table"
require "metatable"
require "type_mismatch"

View File

@ -0,0 +1,74 @@
require "bootstrap-tests"
local effil = effil
test.thread_interrupt.tear_down = default_tear_down
local function interruption_test(worker)
local state = effil.table { stop = false }
local ctx = effil.thread(worker)
ctx.step = 0
local thr = ctx(state)
effil.sleep(500, 'ms') -- let thread starts
local start_time = os.time()
thr:cancel(1)
test.equal(thr:status(), "canceled")
test.almost_equal(os.time(), start_time, 1)
state.stop = true
end
local get_thread_for_test = function(state)
local runner = effil.thread(function()
while not state.stop do end
end)
runner.step = 0
return runner()
end
test.thread_interrupt.thread_wait = function()
interruption_test(function(state)
get_thread_for_test(state):wait()
end)
end
test.thread_interrupt.thread_get = function()
interruption_test(function(state)
get_thread_for_test(state):get()
end)
end
test.thread_interrupt.thread_cancel = function()
interruption_test(function(state)
get_thread_for_test(state):cancel()
end)
end
test.thread_interrupt.thread_pause = function()
interruption_test(function(state)
get_thread_for_test(state):pause()
end)
end
test.thread_interrupt.channel_pop = function()
interruption_test(function()
effil.channel():pop()
end)
end
test.thread_interrupt.sleep = function()
interruption_test(function()
effil.sleep(20)
end)
end
test.thread_interrupt.yield = function()
interruption_test(function()
while true do
effil.yield()
end
end)
end

View File

@ -244,14 +244,6 @@ test.thread.returns = function ()
test.equal(returns[5](11, 89), 100)
end
test.thread.timed_cancel = function ()
local thread = effil.thread(function()
effil.sleep(4)
end)()
test.is_false(thread:cancel(100, "ms"))
thread:wait()
end
test.thread_with_table.tear_down = default_tear_down
test.thread_with_table.types = function ()
@ -322,26 +314,6 @@ test.this_thread.functions = function ()
test.not_equal(share["child.id"], effil.thread_id())
end
test.this_thread.cancel_with_yield = function ()
local share = effil.table()
local spec = effil.thread(function (share)
effil.sleep(1)
for i=1,10000 do
-- Just waiting
end
share.done = true
effil.yield()
share.afet_yield = true
end)
spec.step = 0
local thr = spec(share)
test.is_true(thr:cancel())
test.equal(thr:status(), "canceled")
test.is_true(share.done)
test.is_nil(share.afet_yield)
end
test.this_thread.pause_with_yield = function ()
local share = effil.table({stop = false})
local spec = effil.thread(function (share)