From c09adc8115ea45f4e74176230ee2a2bf1c916284 Mon Sep 17 00:00:00 2001 From: Ilia Date: Tue, 4 Apr 2017 15:48:38 +0300 Subject: [PATCH] Brand new threading API (#32) Brand new threading API --- src/cpp/lua-helpers.h | 29 +++ src/cpp/lua-module.cpp | 25 ++- src/cpp/notifier.h | 49 +++++ src/cpp/shared-table.cpp | 2 +- src/cpp/stored-object.cpp | 11 + src/cpp/stored-object.h | 1 + src/cpp/threading.cpp | 448 ++++++++++++++++++++++---------------- src/cpp/threading.h | 95 +++----- src/cpp/utils.h | 11 +- src/lua/effil.lua | 32 ++- tests/cpp/notifier.cpp | 80 +++++++ tests/cpp/test-utils.h | 5 +- tests/lua/test_utils.lua | 1 - tests/lua/thread.lua | 238 +++++++++++++------- 14 files changed, 676 insertions(+), 351 deletions(-) create mode 100644 src/cpp/lua-helpers.h create mode 100644 src/cpp/notifier.h create mode 100644 tests/cpp/notifier.cpp diff --git a/src/cpp/lua-helpers.h b/src/cpp/lua-helpers.h new file mode 100644 index 0000000..8b8361c --- /dev/null +++ b/src/cpp/lua-helpers.h @@ -0,0 +1,29 @@ +#pragma once + +#include "utils.h" +#include + +namespace effil { + +// TODO: make function more reliable +// string.dump can be changed by user +// TODO: Add cache for each state +inline std::string dumpFunction(const sol::function& f) { + sol::state_view lua(f.lua_state()); + sol::function dumper = lua["string"]["dump"]; + REQUIRE(dumper.valid() && dumper.get_type() == sol::type::function) + << "Invalid string.dump() in state"; + return dumper(f); +} + +// TODO: make function more reliable +// loadstring can be changed by user +// TODO: Add cache for each state +inline sol::function loadString(const sol::state_view& lua, const std::string& str) { + sol::function loader = lua["loadstring"]; + REQUIRE(loader.valid() && loader.get_type() == sol::type::function) + << "Invalid loadstring function"; + return loader(str); +} + +} // namespace effil \ No newline at end of file diff --git a/src/cpp/lua-module.cpp b/src/cpp/lua-module.cpp index c31e560..0d5789c 100644 --- a/src/cpp/lua-module.cpp +++ b/src/cpp/lua-module.cpp @@ -8,8 +8,14 @@ using namespace effil; namespace { -sol::object createThreadFactory(sol::this_state lua, const sol::function& func) { - return sol::make_object(lua, std::make_unique(func)); +sol::object createThread(const sol::this_state& lua, + const std::string& path, + const std::string& cpath, + bool stepwise, + unsigned int step, + const sol::function& function, + const sol::variadic_args& args) { + return sol::make_object(lua, std::make_unique(path, cpath, stepwise, step, function, args)); } sol::object createTable(sol::this_state lua) { return sol::make_object(lua, getGC().create()); } @@ -18,15 +24,14 @@ sol::object createTable(sol::this_state lua) { return sol::make_object(lua, getG extern "C" int luaopen_libeffil(lua_State* L) { sol::state_view lua(L); - effil::LuaThread::getUserType(lua); + Thread::getUserType(lua); SharedTable::getUserType(lua); - ThreadFactory::getUserType(lua); - sol::table public_api = lua.create_table_with("thread", createThreadFactory, // - "thread_id", threadId, // - "sleep", sleep, // - "yield", yield, // - "table", createTable // + sol::table publicApi = lua.create_table_with("thread", createThread, + "thread_id", threadId, + "sleep", sleep, + "yield", yield, + "table", createTable ); - sol::stack::push(lua, public_api); + sol::stack::push(lua, publicApi); return 1; } diff --git a/src/cpp/notifier.h b/src/cpp/notifier.h new file mode 100644 index 0000000..997867e --- /dev/null +++ b/src/cpp/notifier.h @@ -0,0 +1,49 @@ +#pragma once + +#include +#include + +namespace effil { + +class Notifier { +public: + Notifier() + : notified_(false) { + } + + void notify() { + std::unique_lock lock(mutex_); + notified_ = true; + cv_.notify_all(); + } + + void wait() { + std::unique_lock lock(mutex_); + while (!notified_) + cv_.wait(lock); + } + + template + bool waitFor(T period) { + if (period == std::chrono::seconds(0) || notified_) + return notified_; + + std::unique_lock lock(mutex_); + while (cv_.wait_for(lock, period) != std::cv_status::timeout && !notified_); + return notified_; + } + + void reset() { + notified_ = false; + } + +private: + bool notified_; + std::mutex mutex_; + std::condition_variable cv_; + +private: + Notifier(Notifier& ) = delete; +}; + +} // namespace effil \ No newline at end of file diff --git a/src/cpp/shared-table.cpp b/src/cpp/shared-table.cpp index f39950d..bbce185 100644 --- a/src/cpp/shared-table.cpp +++ b/src/cpp/shared-table.cpp @@ -16,7 +16,7 @@ SharedTable::SharedTable(const SharedTable& init) , data_(init.data_) {} sol::object SharedTable::getUserType(sol::state_view& lua) { - static sol::usertype type("new", sol::no_constructor, // + sol::usertype type("new", sol::no_constructor, // sol::meta_function::new_index, &SharedTable::luaSet, // sol::meta_function::index, &SharedTable::luaGet, // sol::meta_function::length, &SharedTable::length, // diff --git a/src/cpp/stored-object.cpp b/src/cpp/stored-object.cpp index c7ed13d..c4fd65a 100644 --- a/src/cpp/stored-object.cpp +++ b/src/cpp/stored-object.cpp @@ -13,6 +13,12 @@ namespace effil { namespace { +class NilHolder : public BaseHolder { +public: + bool rawCompare(const BaseHolder*) const noexcept final { return true; } + sol::object unpack(sol::this_state) const final { return sol::nil; } +}; + template class PrimitiveHolder : public BaseHolder { public: @@ -130,6 +136,7 @@ template StoredObject fromSolObject(const SolObject& luaObject) { switch (luaObject.get_type()) { case sol::type::nil: + return std::make_unique(); break; case sol::type::boolean: return std::make_unique>(luaObject); @@ -170,6 +177,10 @@ StoredObject createStoredObject(const std::string& value) { return std::make_unique>(value); } +StoredObject createStoredObject(const char* value) { + return std::make_unique>(value); +} + StoredObject createStoredObject(const sol::object& object) { return fromSolObject(object); } StoredObject createStoredObject(const sol::stack_object& object) { return fromSolObject(object); } diff --git a/src/cpp/stored-object.h b/src/cpp/stored-object.h index cc852f3..f551bdb 100644 --- a/src/cpp/stored-object.h +++ b/src/cpp/stored-object.h @@ -36,6 +36,7 @@ struct StoredObjectLess { StoredObject createStoredObject(bool); StoredObject createStoredObject(double); StoredObject createStoredObject(const std::string&); +StoredObject createStoredObject(const char*); StoredObject createStoredObject(GCObjectHandle); StoredObject createStoredObject(const sol::object&); StoredObject createStoredObject(const sol::stack_object&); diff --git a/src/cpp/threading.cpp b/src/cpp/threading.cpp index 7033287..cf86559 100644 --- a/src/cpp/threading.cpp +++ b/src/cpp/threading.cpp @@ -1,11 +1,171 @@ #include "threading.h" -#include "stored-object.h" +#include "stored-object.h" +#include "notifier.h" +#include "spin-mutex.h" +#include "lua-helpers.h" #include "utils.h" +#include +#include + namespace effil { -class LuaHookStopException : public std::exception {}; +namespace { + +// Doesn't inherit std::exception +// to prevent from catching this exception third party lua C++ libs +class LuaHookStopException {}; + +enum class Status { + Running, + Paused, + Canceled, + Completed, + Failed +}; + +std::string statusToString(Status status) { + switch (status) { + case Status::Running: + return "running"; + case Status::Paused: + return "paused"; + case Status::Canceled: + return "canceled"; + case Status::Completed: + return "completed"; + case Status::Failed: + return "failed"; + } + assert(false); + return "unknown"; +} + +enum class Command { + Run, + Cancel, + Pause +}; + +} // namespace + + +class ThreadHandle { +public: + const bool managed; + + sol::state lua; + Status status; + StoredObject result; + + Notifier completion; + // on thread resume + Notifier pause; + // used only with sync pause + Notifier syncPause; + +public: + ThreadHandle(bool isManaged) + : managed(isManaged) + , status(Status::Running) + , command_(Command::Run) { + luaL_openlibs(lua); + } + + Command command() const { return command_; } + + void command(Command cmd) { + std::lock_guard lock(commandMutex_); + if (command_ == Command::Cancel) + return; + command_ = cmd; + } + +private: + SpinMutex commandMutex_; + Command command_; +}; + +namespace { + +static thread_local ThreadHandle* thisThreadHandle = nullptr; + +void luaHook(lua_State*, lua_Debug*) { + assert(thisThreadHandle); + switch (thisThreadHandle->command()) { + case Command::Run: + break; + case Command::Cancel: + throw LuaHookStopException(); + case Command::Pause: { + thisThreadHandle->status = Status::Paused; + thisThreadHandle->syncPause.notify(); + thisThreadHandle->pause.wait(); + if (thisThreadHandle->command() == Command::Run) + thisThreadHandle->status = Status::Running; + else + throw LuaHookStopException(); + break; + } + } +} + +class ScopeGuard { +public: + ScopeGuard(const std::function& f) + : f_(f) { + } + + ~ScopeGuard() { + f_(); + } + +private: + std::function f_; +}; + +void runThread(std::shared_ptr handle, + const std::string &strFunction, + std::vector &&arguments) { + + ScopeGuard reportComplete([=](){ + DEBUG << "Finished " << std::endl; + handle->completion.notify(); + }); + + assert(handle); + thisThreadHandle = handle.get(); + + try { + sol::function userFuncObj = loadString(handle->lua, strFunction); + sol::object result = userFuncObj(sol::as_args(arguments)); + handle->result = createStoredObject(result); + + handle->status = Status::Completed; + } catch (const LuaHookStopException&) { + handle->status = Status::Canceled; + } catch (const sol::error& err) { + DEBUG << "Failed with msg: " << err.what() << std::endl; + handle->result = createStoredObject(err.what()); + handle->status = Status::Failed; + } +} + +std::chrono::milliseconds fromLuaTime(int duration, const sol::optional& period) { + using namespace std::chrono; + + REQUIRE(duration >= 0) << "Invalid duration interval: " << duration; + + std::string metric = period ? period.value() : "s"; + if (metric == "ms") return milliseconds(duration); + else if (metric == "s") return seconds(duration); + else if (metric == "m") return minutes(duration); + else throw sol::error("invalid time identification: " + metric); +} + +} // namespace + std::string threadId() { std::stringstream ss; @@ -15,218 +175,136 @@ std::string threadId() { void yield() { std::this_thread::yield(); } -void sleep(int64_t time, sol::optional period) { - std::string metric = period ? period.value() : "s"; - if (metric == "ms") - std::this_thread::sleep_for(std::chrono::milliseconds(time)); - else if (metric == "s") - std::this_thread::sleep_for(std::chrono::seconds(time)); - else if (metric == "m") - std::this_thread::sleep_for(std::chrono::minutes(time)); +void sleep(const sol::optional& duration, const sol::optional& period) { + if (duration) + std::this_thread::sleep_for(fromLuaTime(*duration, period)); else - throw sol::error("invalid time identificator: " + metric); + yield(); } -thread_local LuaThread::ThreadData* LuaThread::pThreadLocalData = NULL; +Thread::Thread(const std::string& path, + const std::string& cpath, + bool managed, + unsigned int step, + const sol::function& function, + const sol::variadic_args& variadicArgs) + : handle_(std::make_shared(managed)) { -// class LuaThread + handle_->lua["package"]["path"] = path; + handle_->lua["package"]["cpath"] = cpath; + handle_->lua.script("require 'effil'"); -LuaThread::LuaThread(std::shared_ptr threadData, const std::string& function, - const sol::variadic_args& args) { - pThreadData_ = threadData; - assert(pThreadData_); - pThreadData_->command = ThreadCommand::Nothing; - pThreadData_->status = ThreadStatus::Running; + if (managed) + lua_sethook(handle_->lua, luaHook, LUA_MASKCOUNT, step); + + std::string strFunction = dumpFunction(function); std::vector arguments; - for (const auto& iter : args) { - StoredObject store = createStoredObject(iter.get()); - arguments.push_back(store->unpack(sol::this_state{pThreadData_->luaState})); + for (const auto& arg : variadicArgs) { + StoredObject store = createStoredObject(arg.get()); + arguments.push_back(store->unpack(sol::this_state{handle_->lua})); } - pThread_.reset(new std::thread(&LuaThread::work, pThreadData_, function, std::move(arguments))); - assert(pThread_); - pThread_->detach(); + + std::thread thr(&runThread, + handle_, + strFunction, + std::move(arguments)); + DEBUG << "Created " << thr.get_id() << std::endl; + thr.detach(); } -void LuaThread::luaHook(lua_State*, lua_Debug*) { - if (pThreadLocalData) { - switch (pThreadLocalData->command) { - case ThreadCommand::Pause: { - pThreadLocalData->status = ThreadStatus::Paused; - ThreadCommand cmd = pThreadLocalData->command; - while (cmd == ThreadCommand::Pause) { - std::this_thread::yield(); - cmd = pThreadLocalData->command; - } - assert(cmd != ThreadCommand::Nothing); - if (cmd == ThreadCommand::Resume) { - pThreadLocalData->status = ThreadStatus::Running; - break; // Just go out of the function - } else { /* HOOK_STOP - do nothing and go to the next case */ - } - } - case ThreadCommand::Cancel: - throw LuaHookStopException(); - default: - case ThreadCommand::Nothing: - break; - } - } -} +sol::object Thread::getUserType(sol::state_view& lua) { + sol::usertype type( + "new", sol::no_constructor, + "get", &Thread::get, + "wait", &Thread::wait, + "cancel", &Thread::cancel, + "pause", &Thread::pause, + "resume", &Thread::resume, + "status", &Thread::status); -void LuaThread::work(std::shared_ptr threadData, const std::string strFunction, - std::vector&& arguments) { - try { - pThreadLocalData = threadData.get(); - assert(threadData); - const sol::object& stringLoader = threadData->luaState["loadstring"]; - REQUIRE(stringLoader.valid() && stringLoader.get_type() == sol::type::function) - << "Invalid loadstring function"; - sol::function userFuncObj = static_cast(stringLoader)(strFunction); - sol::function_result results = userFuncObj(sol::as_args(arguments)); - (void)results; // TODO: try to avoid use of useless sol::function_result here - sol::variadic_args args(threadData->luaState, -lua_gettop(threadData->luaState)); - for (const auto& iter : args) { - StoredObject store = createStoredObject(iter.get()); - threadData->results.emplace_back(std::move(store)); - } - threadData->status = ThreadStatus::Completed; - } catch (const LuaHookStopException&) { - threadData->status = ThreadStatus::Canceled; - } catch (const sol::error& err) { - threadData->status = ThreadStatus::Failed; - sol::stack::push(threadData->luaState, err.what()); - StoredObject store = createStoredObject(sol::stack::pop(threadData->luaState)); - threadData->results.emplace_back(std::move(store)); - } -} - -void LuaThread::cancel() { pThreadData_->command = ThreadCommand::Cancel; } - -void LuaThread::pause() { pThreadData_->command = ThreadCommand::Pause; } - -void LuaThread::resume() { pThreadData_->command = ThreadCommand::Resume; } - -std::tuple LuaThread::wait(sol::this_state state) const { - - ThreadStatus stat = pThreadData_->status; - while (stat == ThreadStatus::Running) { - std::this_thread::yield(); - stat = pThreadData_->status; - } - sol::table returns = sol::state_view(state).create_table(); - if (stat == ThreadStatus::Completed) { - for (const StoredObject& obj : pThreadData_->results) { - returns.add(obj->unpack(state)); - } - } - return std::make_tuple(sol::make_object(state, threadStatusToString(stat)), std::move(returns)); -} - -std::string LuaThread::threadStatusToString(ThreadStatus stat) const { - switch (stat) { - case ThreadStatus::Running: - return "running"; - case ThreadStatus::Paused: - return "paused"; - case ThreadStatus::Canceled: - return "canceled"; - case ThreadStatus::Completed: - return "completed"; - case ThreadStatus::Failed: - return "failed"; - } - assert(false); - return "unknown"; -} - -std::string LuaThread::status() const { return threadStatusToString(pThreadData_->status); } - -sol::object LuaThread::getUserType(sol::state_view& lua) { - static sol::usertype type("new", sol::no_constructor, // - "cancel", &LuaThread::cancel, // - "pause", &LuaThread::pause, // - "resume", &LuaThread::resume, // - "status", &LuaThread::status, // - "wait", &LuaThread::wait); sol::stack::push(lua, type); return sol::stack::pop(lua); } -// class ThreadFactory +std::pair Thread::status(const sol::this_state& lua) { + sol::object luaStatus = sol::make_object(lua, statusToString(handle_->status)); -ThreadFactory::ThreadFactory(const sol::function& func) - : stepwise_(false) - , step_(100U) { - sol::state_view lua(func.lua_state()); - const sol::object& dumper = lua["string"]["dump"]; - REQUIRE(dumper.valid() && dumper.get_type() == sol::type::function) << "Unable to get string.dump()"; - strFunction_ = static_cast(dumper)(func); - - // Inherit all pathes from parent state by default - packagePath_ = lua["package"]["path"].get(); - packageCPath_ = lua["package"]["cpath"].get(); + if (handle_->status == Status::Failed) { + assert(handle_->result); + return std::make_pair(luaStatus, handle_->result->unpack(lua)); + } else { + return std::make_pair(luaStatus, sol::nil); + } } -std::unique_ptr ThreadFactory::runThread(const sol::variadic_args& args) { - std::shared_ptr threadData = std::make_shared(); - assert(threadData.get()); - threadData->luaState.open_libraries(sol::lib::base, sol::lib::string, sol::lib::package, sol::lib::io, - sol::lib::os); - - if (stepwise_) - lua_sethook(threadData->luaState, LuaThread::luaHook, LUA_MASKCOUNT, step_); - - threadData->luaState["package"]["path"] = packagePath_; - threadData->luaState["package"]["cpath"] = packageCPath_; - - // Inherit all pathes from parent state - effil::LuaThread::getUserType(threadData->luaState); - effil::ThreadFactory::getUserType(threadData->luaState); - effil::SharedTable::getUserType(threadData->luaState); - - return std::make_unique(threadData, strFunction_, args); +bool Thread::waitFor(const sol::optional& duration, + const sol::optional& period) { + if (!duration) { // sync version + handle_->completion.wait(); + return true; + } else { // async version + return handle_->completion.waitFor(fromLuaTime(*duration, period)); + } } -bool ThreadFactory::stepwise(const sol::optional& value) { - bool ret = stepwise_; - if (value) - stepwise_ = value.value(); - return ret; +std::pair Thread::wait(const sol::this_state& lua, + const sol::optional& duration, + const sol::optional& period) { + waitFor(duration, period); + return status(lua); } -unsigned int ThreadFactory::step(const sol::optional& value) { - bool ret = step_; - if (value) - step_ = value.value(); - return ret; +sol::object Thread::get(const sol::this_state& lua, + const sol::optional& duration, + const sol::optional& period) { + bool completed = waitFor(duration, period); + + if (completed && handle_->status == Status::Completed) + return handle_->result->unpack(lua); + else + return sol::nil; } -std::string ThreadFactory::packagePath(const sol::optional& value) { - std::string& ret = packagePath_; - if (value) - packagePath_ = value.value(); - return ret; +bool Thread::cancel(const sol::this_state&, + const sol::optional& duration, + const sol::optional& period) { + REQUIRE(handle_->managed) << "Unable to cancel: unmanaged thread"; + + handle_->command(Command::Cancel); + handle_->pause.notify(); + + if (handle_->status == Status::Running) { + return waitFor(duration, period); + } else { + handle_->completion.wait(); + return true; + } } -std::string ThreadFactory::packageCPath(const sol::optional& value) { - std::string& ret = packageCPath_; - if (value) - packageCPath_ = value.value(); - return ret; +bool Thread::pause(const sol::this_state&, + const sol::optional& duration, + const sol::optional& period) { + REQUIRE(handle_->managed) << "Unable to pause: unmanaged thread"; + + handle_->pause.reset(); + handle_->command(Command::Pause); + + if (!duration) { // sync wait + handle_->syncPause.wait(); + return true; + } else { // async wait + return handle_->syncPause.waitFor(fromLuaTime(*duration, period)); + } } -sol::object ThreadFactory::getUserType(sol::state_view& lua) { - static sol::usertype type("new", sol::no_constructor, // - sol::meta_function::call, &ThreadFactory::runThread, // - "stepwise", &ThreadFactory::stepwise, // - "step", &ThreadFactory::step, // - "package_path", &ThreadFactory::packagePath, // - "package_cpath", &ThreadFactory::packageCPath // - ); - sol::stack::push(lua, type); - return sol::stack::pop(lua); +void Thread::resume() { + REQUIRE(handle_->managed) << "Unable to resume: unmanaged thread"; + + handle_->command(Command::Run); + handle_->syncPause.reset(); + handle_->pause.notify(); } } // effil diff --git a/src/cpp/threading.h b/src/cpp/threading.h index 5bae0bd..1fead13 100644 --- a/src/cpp/threading.h +++ b/src/cpp/threading.h @@ -1,85 +1,52 @@ #pragma once -#include "shared-table.h" - -#include -#include -#include +#include namespace effil { // Lua this thread API std::string threadId(); void yield(); -void sleep(int64_t, sol::optional); +void sleep(const sol::optional&, const sol::optional&); -class LuaThread { +class ThreadHandle; + +class Thread { public: - enum class ThreadStatus { - Running = 1, - Paused, - Canceled, - Completed, - Failed, - }; + Thread(const std::string& path, + const std::string& cpath, + bool managed, + unsigned int step, + const sol::function& function, + const sol::variadic_args& args); - enum class ThreadCommand { - Nothing = 1, - Cancel, - Pause, - Resume, - }; - - struct ThreadData { - sol::state luaState; - std::atomic status; - std::atomic command; - std::vector results; - }; - - LuaThread(std::shared_ptr threadData, const std::string& function, const sol::variadic_args& args); static sol::object getUserType(sol::state_view& lua); - static void luaHook(lua_State*, lua_Debug*); - /* Public lua methods*/ - void cancel(); - void pause(); + std::pair status(const sol::this_state& state); + std::pair wait(const sol::this_state& state, + const sol::optional& duration, + const sol::optional& period); + sol::object get(const sol::this_state& state, + const sol::optional& duration, + const sol::optional& period); + bool cancel(const sol::this_state& state, + const sol::optional& duration, + const sol::optional& period); + bool pause(const sol::this_state&, + const sol::optional& duration, + const sol::optional& period); void resume(); - std::string status() const; - std::tuple wait(sol::this_state state) const; private: - LuaThread(const LuaThread&) = delete; - LuaThread& operator=(const LuaThread&) = delete; - - std::string threadStatusToString(ThreadStatus stat) const; - static void work(std::shared_ptr threadData, const std::string strFunction, - std::vector&& arguments); - - std::shared_ptr pThreadData_; - std::shared_ptr pThread_; - - static thread_local LuaThread::ThreadData* pThreadLocalData; -}; - -class ThreadFactory { -public: - ThreadFactory(const sol::function& func); - static sol::object getUserType(sol::state_view& lua); - - /* Public lua methods*/ - std::unique_ptr runThread(const sol::variadic_args& args); - bool stepwise(const sol::optional&); - unsigned int step(const sol::optional&); - std::string packagePath(const sol::optional&); - std::string packageCPath(const sol::optional&); + std::shared_ptr handle_; private: - std::string strFunction_; - bool stepwise_; - unsigned int step_; - std::string packagePath_; - std::string packageCPath_; + bool waitFor(const sol::optional& duration, + const sol::optional& period); + +private: + Thread(const Thread&) = delete; + Thread& operator=(const Thread&) = delete; }; } // effil diff --git a/src/cpp/utils.h b/src/cpp/utils.h index 5b04724..3827e58 100644 --- a/src/cpp/utils.h +++ b/src/cpp/utils.h @@ -2,6 +2,7 @@ #include #include +#include #include @@ -28,14 +29,10 @@ private: } // effil -#define REQUIRE(cond) \ - if (!cond) \ - throw effil::Exception() +#define REQUIRE(cond) if (!(cond)) throw effil::Exception() #ifdef NDEBUG -#define DEBUG \ - if (false) \ - std::cout +#define DEBUG if (false) std::cout #else -#define DEBUG std::cout +#define DEBUG std::cout << __FILE__ << ":" << __FUNCTION__ << ":" << __LINE__ << " tid:" << std::this_thread::get_id() << " " #endif diff --git a/src/lua/effil.lua b/src/lua/effil.lua index 1dd69a9..6d06203 100644 --- a/src/lua/effil.lua +++ b/src/lua/effil.lua @@ -4,12 +4,40 @@ local function detect_native_lib_ext() if string.find(home, "/Users/") then return "dylib" end if string.find(home, "/home/") then return "so" end -- TODO: unable to detect os - -- how to reportabout error + -- Unix, is it you? return "so" end package.cpath = package.cpath .. ";./?." .. detect_native_lib_ext() -local api = require 'libeffil' +local capi = require 'libeffil' +local api = { + version = "0.1.0", + table = capi.table, + thread_id = capi.thread_id, + sleep = capi.sleep, + yield = capi.yield +} + +local function run_thread(config, f, ...) + return capi.thread(config.path, config.cpath, config.managed, config.step, f, ...) +end + +-- Creates thread runner with given function +-- configurable parameters: +-- path - lua modules search path in child thread +-- cpath - lua libs search path in child thread +-- stepwise - is thread resumable +-- step - who fast reacte on state changing +-- __call - run thread, can be invoked multiple times +api.thread = function (f) + local thread_config = { + path = package.path, + cpath = package.cpath, + managed = true, + step = 200 } + setmetatable(thread_config, {__call = function(c, ...) return run_thread(c, f, ...) end}) + return thread_config +end return api \ No newline at end of file diff --git a/tests/cpp/notifier.cpp b/tests/cpp/notifier.cpp new file mode 100644 index 0000000..2a548bc --- /dev/null +++ b/tests/cpp/notifier.cpp @@ -0,0 +1,80 @@ +#include + +#include "notifier.h" + +#include +#include + +using namespace effil; + +TEST(notifier, wait) { + Notifier n; + bool done = false; + auto t = std::thread([&]{ + done = true; + n.notify(); + }); + + n.wait(); + EXPECT_TRUE(done); + t.join(); +} + +TEST(notifier, waitMany) { + const size_t nfutures = 32; + std::vector vt; + std::atomic counter(0); + Notifier n; + + for(size_t i = 0; i < nfutures; i++) + vt.emplace_back(std::thread([&]{ + n.wait(); + counter++; + })); + + EXPECT_EQ(counter.load(), (size_t)0); + + n.notify(); + for(auto& t : vt) t.join(); + EXPECT_EQ(counter.load(), nfutures); +} + +TEST(notifier, waitFor) { + Notifier n; + auto t = std::thread([&] { + std::this_thread::sleep_for(std::chrono::seconds(2)); + n.notify(); + }); + + EXPECT_FALSE(n.waitFor(std::chrono::seconds(1))); + EXPECT_TRUE(n.waitFor(std::chrono::seconds(2))); + t.join(); +} + +TEST(notifier, reset) { + const size_t iterations = 1024; + Notifier readyToProcess; + Notifier needNew; + size_t resource = 0; + + std::thread producer([&]() { + for (size_t i = 0; i < iterations; i++) { + resource++; + readyToProcess.notify(); + needNew.wait(); + needNew.reset(); + } + }); + + std::thread consumer([&](){ + for (size_t i = 0; i < iterations; i++) { + readyToProcess.wait(); + readyToProcess.reset(); + EXPECT_EQ(resource, i + 1); + needNew.notify(); + } + }); + + producer.join(); + consumer.join(); +} \ No newline at end of file diff --git a/tests/cpp/test-utils.h b/tests/cpp/test-utils.h index 65c2cde..d2d7118 100644 --- a/tests/cpp/test-utils.h +++ b/tests/cpp/test-utils.h @@ -1,13 +1,14 @@ #pragma once #include "shared-table.h" +#include "lua-helpers.h" #include namespace effil { inline void bootstrapState(sol::state& lua) { - lua.open_libraries(sol::lib::base, sol::lib::string, sol::lib::table); + luaL_openlibs(lua); SharedTable::getUserType(lua); } -} // namespace +} // namespace effil diff --git a/tests/lua/test_utils.lua b/tests/lua/test_utils.lua index 39930ec..6c7c178 100644 --- a/tests/lua/test_utils.lua +++ b/tests/lua/test_utils.lua @@ -45,6 +45,5 @@ function sleep(timeInSec, silent) end function tearDown() - log "TearDown() collect garbage" collectgarbage() end diff --git a/tests/lua/thread.lua b/tests/lua/thread.lua index 2d8fa2b..5eaf1af 100644 --- a/tests/lua/thread.lua +++ b/tests/lua/thread.lua @@ -1,140 +1,219 @@ +local effil = require 'effil' + TestThread = {tearDown = tearDown } -function TestThread:testThreadCancel() - local effil = require 'effil' +function TestThread:testWait() + local thread = effil.thread(function() print 'Effil is not that tower' end)() + local status = thread:wait() + test.assertNil(thread:get()) + test.assertEquals(status, "completed") + test.assertEquals(thread:status(), "completed") +end + +function TestThread:testMultipleWaitGet() + local thread = effil.thread(function() return "test value" end)() + local status1 = thread:wait() + local status2 = thread:wait() + test.assertEquals(status1, "completed") + test.assertEquals(status2, status1) + + local value = thread:get() + test.assertEquals(value, "test value") +end + +function TestThread:testTimedGet() + local thread = effil.thread(function() + require('effil').sleep(2) + return "-_-" + end)() + test.assertNil(thread:get(1)) + test.assertEquals(thread:get(2), "-_-") +end + +function TestThread:testTimedWait() + local thread = effil.thread(function() + require('effil').sleep(2) + return 8 + end)() + + local status = thread:wait(1) + test.assertEquals(status, "running") + + local value = thread:get(2, "s") + test.assertEquals(value, 8); + + test.assertEquals(thread:status(), "completed") +end + +function TestThread:testAsyncWait() + local thread = effil.thread( function() + require('effil').sleep(1) + end)() + + local iter = 0 + while thread:wait(0) == "running" do + iter = iter + 1 + end + + test.assertTrue(iter > 10) + test.assertEquals(thread:status(), "completed") +end + +function TestThread:testDetached() + local st = effil.table() + + for i = 1, 32 do + effil.thread(function(st, index) + st[index] = index + end)(st, i) + end + + -- here all thead temporary objects have to be destroyed + collectgarbage() + effil.sleep(1) + + for i = 1, 32 do + test.assertEquals(st[i], i) + end +end + +function TestThread:testCancel() + local thread = effil.thread(function() + while true do end + end)() + + test.assertTrue(thread:cancel()) + test.assertEquals(thread:status(), "canceled") +end + +function TestThread:testAsyncCancel() local thread_runner = effil.thread( function() local startTime = os.time() while ( (os.time() - startTime) <= 10) do --[[ Just sleep ]] end end ) - test.assertFalse(thread_runner:stepwise(true)) + local thread = thread_runner() sleep(2) -- let thread starts working - thread:cancel() + thread:cancel(0) test.assertTrue(wait(2, function() return thread:status() ~= 'running' end)) test.assertEquals(thread:status(), 'canceled') end -function TestThread:testThreadPauseAndResume() - local effil = require 'effil' +function TestThread:testPauseResumeCancel() local data = effil.table() data.value = 0 - local thread_runner = effil.thread( + local thread = effil.thread( function(data) while true do data.value = data.value + 1 end end - ) - test.assertFalse(thread_runner:stepwise(true)) - - local thread = thread_runner(data) + )(data) test.assertTrue(wait(2, function() return data.value > 100 end)) - thread:pause() - test.assertTrue(wait(2, function() return thread:status() == "paused" end)) + test.assertTrue(thread:pause()) + test.assertEquals(thread:status(), "paused") + local savedValue = data.value - sleep(3) + sleep(1) test.assertEquals(data.value, savedValue) thread:resume() test.assertTrue(wait(5, function() return (data.value - savedValue) > 100 end)) - thread:cancel() - thread:wait() + test.assertTrue(thread:cancel()) end -function TestThread:testThreadPauseAndStop() - local effil = require 'effil' - log "Create thread" +function TestThread:testPauseCancel() local data = effil.table() data.value = 0 - local thread_runner = effil.thread( + local thread = effil.thread( function(data) while true do data.value = data.value + 1 end end - ) - test.assertFalse(thread_runner:stepwise(true)) + )(data) - local thread = thread_runner(data) test.assertTrue(wait(2, function() return data.value > 100 end)) - thread:pause() + thread:pause(0) test.assertTrue(wait(2, function() return thread:status() == "paused" end)) local savedValue = data.value - sleep(3) + sleep(1) test.assertEquals(data.value, savedValue) - thread:cancel() - test.assertTrue(wait(2, function() return thread:status() == "canceled" end)) - thread:wait() + test.assertTrue(thread:cancel(0)) end -function TestThread:testThreadPauseAndStop() - local effil = require 'effil' - log "Create thread" +function TestThread:testAsyncPauseResumeCancel() local data = effil.table() data.value = 0 - local thread_runner = effil.thread( + local thread = effil.thread( function(data) while true do data.value = data.value + 1 end end - ) - test.assertFalse(thread_runner:stepwise(true)) + )(data) - local thread = thread_runner(data) test.assertTrue(wait(2, function() return data.value > 100 end)) thread:pause() - test.assertTrue(wait(2, function() return thread:status() == "paused" end)) + local savedValue = data.value - sleep(3) + sleep(1) test.assertEquals(data.value, savedValue) - thread:cancel() - test.assertTrue(wait(2, function() return thread:status() == "canceled" end)) + thread:resume() + test.assertTrue(wait(5, function() return (data.value - savedValue) > 100 end)) + + thread:cancel(0) + test.assertTrue(wait(5, function() return thread:status() == "canceled" end)) +end + +function TestThread:testTimedCancel() + local thread = effil.thread(function() + require("effil").sleep(2) + end)() + test.assertFalse(thread:cancel(1)) thread:wait() end -function TestThread:testCheckThreadReturns() - local effil = require 'effil' - local share = effil.table() - share.value = "some value" - - local thread_factory = effil.thread( - function(share) - return 100500, "string value", true, share, function(a,b) return a + b end - end - ) - local thread = thread_factory(share) - local status, returns = thread:wait() - - log "Check values" - test.assertEquals(status, "completed") - - test.assertNumber(returns[1]) - test.assertEquals(returns[1], 100500) - - test.assertString(returns[2]) - test.assertEquals(returns[2], "string value") - - test.assertBoolean(returns[3]) - test.assertTrue(returns[3]) - - test.assertUserdata(returns[4]) - test.assertEquals(returns[4].value, share.value) - - test.assertFunction(returns[5]) - test.assertEquals(returns[5](11, 89), 100) -end - - +--function TestThread:testCheckThreadReturns() +-- local effil = require 'effil' +-- local share = effil.table() +-- share.value = "some value" +-- +-- local thread_factory = effil.thread( +-- function(share) +-- return 100500, "string value", true, share, function(a,b) return a + b end +-- end +-- ) +-- local thread = thread_factory(share) +-- local status, returns = thread:get() +-- +-- log "Check values" +-- test.assertEquals(status, "completed") +-- +-- test.assertNumber(returns[1]) +-- test.assertEquals(returns[1], 100500) +-- +-- test.assertString(returns[2]) +-- test.assertEquals(returns[2], "string value") +-- +-- test.assertBoolean(returns[3]) +-- test.assertTrue(returns[3]) +-- +-- test.assertUserdata(returns[4]) +-- test.assertEquals(returns[4].value, share.value) +-- +-- test.assertFunction(returns[5]) +-- test.assertEquals(returns[5](11, 89), 100) +--end +-- TestThreadWithTable = {tearDown = tearDown } function TestThreadWithTable:testSharedTableTypes() - local effil = require 'effil' local share = effil.table() share["number"] = 100500 @@ -161,7 +240,6 @@ function TestThreadWithTable:testSharedTableTypes() end function TestThreadWithTable:testRecursiveTables() - local effil = require 'effil' local share = effil.table() local magic_number = 42 @@ -187,24 +265,26 @@ function TestThreadWithTable:testRecursiveTables() test.assertEquals(share["magic_number"], nil) end -function TestThreadWithTable:testThisThreadFunctions() - local effil = require 'effil' +TestThisThread = {tearDown = tearDown } + +function TestThisThread:testThisThreadFunctions() local share = effil.table() local thread_factory = effil.thread( function(share) - share["child.id"] = require('libeffil').thread_id() + share["child.id"] = require('effil').thread_id() end ) local thread = thread_factory(share) - thread:wait() + thread:get() - log "Check values" test.assertString(share["child.id"]) test.assertNumber(tonumber(share["child.id"])) test.assertNotEquals(share["child.id"], effil.thread_id()) effil.yield() -- just call it +end +function TestThisThread:testTime() local function check_time(real_time, use_time, metric) local start_time = os.time() effil.sleep(use_time, metric)