Brand new threading API (#32)

Brand new threading API
This commit is contained in:
Ilia 2017-04-04 15:48:38 +03:00 committed by mihacooper
parent 2d61d1d9df
commit c09adc8115
14 changed files with 676 additions and 351 deletions

29
src/cpp/lua-helpers.h Normal file
View File

@ -0,0 +1,29 @@
#pragma once
#include "utils.h"
#include <sol.hpp>
namespace effil {
// TODO: make function more reliable
// string.dump can be changed by user
// TODO: Add cache for each state
inline std::string dumpFunction(const sol::function& f) {
sol::state_view lua(f.lua_state());
sol::function dumper = lua["string"]["dump"];
REQUIRE(dumper.valid() && dumper.get_type() == sol::type::function)
<< "Invalid string.dump() in state";
return dumper(f);
}
// TODO: make function more reliable
// loadstring can be changed by user
// TODO: Add cache for each state
inline sol::function loadString(const sol::state_view& lua, const std::string& str) {
sol::function loader = lua["loadstring"];
REQUIRE(loader.valid() && loader.get_type() == sol::type::function)
<< "Invalid loadstring function";
return loader(str);
}
} // namespace effil

View File

@ -8,8 +8,14 @@ using namespace effil;
namespace { namespace {
sol::object createThreadFactory(sol::this_state lua, const sol::function& func) { sol::object createThread(const sol::this_state& lua,
return sol::make_object(lua, std::make_unique<ThreadFactory>(func)); const std::string& path,
const std::string& cpath,
bool stepwise,
unsigned int step,
const sol::function& function,
const sol::variadic_args& args) {
return sol::make_object(lua, std::make_unique<Thread>(path, cpath, stepwise, step, function, args));
} }
sol::object createTable(sol::this_state lua) { return sol::make_object(lua, getGC().create<SharedTable>()); } sol::object createTable(sol::this_state lua) { return sol::make_object(lua, getGC().create<SharedTable>()); }
@ -18,15 +24,14 @@ sol::object createTable(sol::this_state lua) { return sol::make_object(lua, getG
extern "C" int luaopen_libeffil(lua_State* L) { extern "C" int luaopen_libeffil(lua_State* L) {
sol::state_view lua(L); sol::state_view lua(L);
effil::LuaThread::getUserType(lua); Thread::getUserType(lua);
SharedTable::getUserType(lua); SharedTable::getUserType(lua);
ThreadFactory::getUserType(lua); sol::table publicApi = lua.create_table_with("thread", createThread,
sol::table public_api = lua.create_table_with("thread", createThreadFactory, // "thread_id", threadId,
"thread_id", threadId, // "sleep", sleep,
"sleep", sleep, // "yield", yield,
"yield", yield, // "table", createTable
"table", createTable //
); );
sol::stack::push(lua, public_api); sol::stack::push(lua, publicApi);
return 1; return 1;
} }

49
src/cpp/notifier.h Normal file
View File

@ -0,0 +1,49 @@
#pragma once
#include <mutex>
#include <condition_variable>
namespace effil {
class Notifier {
public:
Notifier()
: notified_(false) {
}
void notify() {
std::unique_lock<std::mutex> lock(mutex_);
notified_ = true;
cv_.notify_all();
}
void wait() {
std::unique_lock<std::mutex> lock(mutex_);
while (!notified_)
cv_.wait(lock);
}
template <typename T>
bool waitFor(T period) {
if (period == std::chrono::seconds(0) || notified_)
return notified_;
std::unique_lock<std::mutex> lock(mutex_);
while (cv_.wait_for(lock, period) != std::cv_status::timeout && !notified_);
return notified_;
}
void reset() {
notified_ = false;
}
private:
bool notified_;
std::mutex mutex_;
std::condition_variable cv_;
private:
Notifier(Notifier& ) = delete;
};
} // namespace effil

View File

@ -16,7 +16,7 @@ SharedTable::SharedTable(const SharedTable& init)
, data_(init.data_) {} , data_(init.data_) {}
sol::object SharedTable::getUserType(sol::state_view& lua) { sol::object SharedTable::getUserType(sol::state_view& lua) {
static sol::usertype<SharedTable> type("new", sol::no_constructor, // sol::usertype<SharedTable> type("new", sol::no_constructor, //
sol::meta_function::new_index, &SharedTable::luaSet, // sol::meta_function::new_index, &SharedTable::luaSet, //
sol::meta_function::index, &SharedTable::luaGet, // sol::meta_function::index, &SharedTable::luaGet, //
sol::meta_function::length, &SharedTable::length, // sol::meta_function::length, &SharedTable::length, //

View File

@ -13,6 +13,12 @@ namespace effil {
namespace { namespace {
class NilHolder : public BaseHolder {
public:
bool rawCompare(const BaseHolder*) const noexcept final { return true; }
sol::object unpack(sol::this_state) const final { return sol::nil; }
};
template <typename StoredType> template <typename StoredType>
class PrimitiveHolder : public BaseHolder { class PrimitiveHolder : public BaseHolder {
public: public:
@ -130,6 +136,7 @@ template <typename SolObject>
StoredObject fromSolObject(const SolObject& luaObject) { StoredObject fromSolObject(const SolObject& luaObject) {
switch (luaObject.get_type()) { switch (luaObject.get_type()) {
case sol::type::nil: case sol::type::nil:
return std::make_unique<NilHolder>();
break; break;
case sol::type::boolean: case sol::type::boolean:
return std::make_unique<PrimitiveHolder<bool>>(luaObject); return std::make_unique<PrimitiveHolder<bool>>(luaObject);
@ -170,6 +177,10 @@ StoredObject createStoredObject(const std::string& value) {
return std::make_unique<PrimitiveHolder<std::string>>(value); return std::make_unique<PrimitiveHolder<std::string>>(value);
} }
StoredObject createStoredObject(const char* value) {
return std::make_unique<PrimitiveHolder<std::string>>(value);
}
StoredObject createStoredObject(const sol::object& object) { return fromSolObject(object); } StoredObject createStoredObject(const sol::object& object) { return fromSolObject(object); }
StoredObject createStoredObject(const sol::stack_object& object) { return fromSolObject(object); } StoredObject createStoredObject(const sol::stack_object& object) { return fromSolObject(object); }

View File

@ -36,6 +36,7 @@ struct StoredObjectLess {
StoredObject createStoredObject(bool); StoredObject createStoredObject(bool);
StoredObject createStoredObject(double); StoredObject createStoredObject(double);
StoredObject createStoredObject(const std::string&); StoredObject createStoredObject(const std::string&);
StoredObject createStoredObject(const char*);
StoredObject createStoredObject(GCObjectHandle); StoredObject createStoredObject(GCObjectHandle);
StoredObject createStoredObject(const sol::object&); StoredObject createStoredObject(const sol::object&);
StoredObject createStoredObject(const sol::stack_object&); StoredObject createStoredObject(const sol::stack_object&);

View File

@ -1,11 +1,171 @@
#include "threading.h" #include "threading.h"
#include "stored-object.h"
#include "stored-object.h"
#include "notifier.h"
#include "spin-mutex.h"
#include "lua-helpers.h"
#include "utils.h" #include "utils.h"
#include <thread>
#include <sstream>
namespace effil { namespace effil {
class LuaHookStopException : public std::exception {}; namespace {
// Doesn't inherit std::exception
// to prevent from catching this exception third party lua C++ libs
class LuaHookStopException {};
enum class Status {
Running,
Paused,
Canceled,
Completed,
Failed
};
std::string statusToString(Status status) {
switch (status) {
case Status::Running:
return "running";
case Status::Paused:
return "paused";
case Status::Canceled:
return "canceled";
case Status::Completed:
return "completed";
case Status::Failed:
return "failed";
}
assert(false);
return "unknown";
}
enum class Command {
Run,
Cancel,
Pause
};
} // namespace
class ThreadHandle {
public:
const bool managed;
sol::state lua;
Status status;
StoredObject result;
Notifier completion;
// on thread resume
Notifier pause;
// used only with sync pause
Notifier syncPause;
public:
ThreadHandle(bool isManaged)
: managed(isManaged)
, status(Status::Running)
, command_(Command::Run) {
luaL_openlibs(lua);
}
Command command() const { return command_; }
void command(Command cmd) {
std::lock_guard<SpinMutex> lock(commandMutex_);
if (command_ == Command::Cancel)
return;
command_ = cmd;
}
private:
SpinMutex commandMutex_;
Command command_;
};
namespace {
static thread_local ThreadHandle* thisThreadHandle = nullptr;
void luaHook(lua_State*, lua_Debug*) {
assert(thisThreadHandle);
switch (thisThreadHandle->command()) {
case Command::Run:
break;
case Command::Cancel:
throw LuaHookStopException();
case Command::Pause: {
thisThreadHandle->status = Status::Paused;
thisThreadHandle->syncPause.notify();
thisThreadHandle->pause.wait();
if (thisThreadHandle->command() == Command::Run)
thisThreadHandle->status = Status::Running;
else
throw LuaHookStopException();
break;
}
}
}
class ScopeGuard {
public:
ScopeGuard(const std::function<void()>& f)
: f_(f) {
}
~ScopeGuard() {
f_();
}
private:
std::function<void()> f_;
};
void runThread(std::shared_ptr<ThreadHandle> handle,
const std::string &strFunction,
std::vector<sol::object> &&arguments) {
ScopeGuard reportComplete([=](){
DEBUG << "Finished " << std::endl;
handle->completion.notify();
});
assert(handle);
thisThreadHandle = handle.get();
try {
sol::function userFuncObj = loadString(handle->lua, strFunction);
sol::object result = userFuncObj(sol::as_args(arguments));
handle->result = createStoredObject(result);
handle->status = Status::Completed;
} catch (const LuaHookStopException&) {
handle->status = Status::Canceled;
} catch (const sol::error& err) {
DEBUG << "Failed with msg: " << err.what() << std::endl;
handle->result = createStoredObject(err.what());
handle->status = Status::Failed;
}
}
std::chrono::milliseconds fromLuaTime(int duration, const sol::optional<std::string>& period) {
using namespace std::chrono;
REQUIRE(duration >= 0) << "Invalid duration interval: " << duration;
std::string metric = period ? period.value() : "s";
if (metric == "ms") return milliseconds(duration);
else if (metric == "s") return seconds(duration);
else if (metric == "m") return minutes(duration);
else throw sol::error("invalid time identification: " + metric);
}
} // namespace
std::string threadId() { std::string threadId() {
std::stringstream ss; std::stringstream ss;
@ -15,218 +175,136 @@ std::string threadId() {
void yield() { std::this_thread::yield(); } void yield() { std::this_thread::yield(); }
void sleep(int64_t time, sol::optional<std::string> period) { void sleep(const sol::optional<int>& duration, const sol::optional<std::string>& period) {
std::string metric = period ? period.value() : "s"; if (duration)
if (metric == "ms") std::this_thread::sleep_for(fromLuaTime(*duration, period));
std::this_thread::sleep_for(std::chrono::milliseconds(time));
else if (metric == "s")
std::this_thread::sleep_for(std::chrono::seconds(time));
else if (metric == "m")
std::this_thread::sleep_for(std::chrono::minutes(time));
else else
throw sol::error("invalid time identificator: " + metric); yield();
} }
thread_local LuaThread::ThreadData* LuaThread::pThreadLocalData = NULL; Thread::Thread(const std::string& path,
const std::string& cpath,
bool managed,
unsigned int step,
const sol::function& function,
const sol::variadic_args& variadicArgs)
: handle_(std::make_shared<ThreadHandle>(managed)) {
// class LuaThread handle_->lua["package"]["path"] = path;
handle_->lua["package"]["cpath"] = cpath;
handle_->lua.script("require 'effil'");
LuaThread::LuaThread(std::shared_ptr<ThreadData> threadData, const std::string& function, if (managed)
const sol::variadic_args& args) { lua_sethook(handle_->lua, luaHook, LUA_MASKCOUNT, step);
pThreadData_ = threadData;
assert(pThreadData_); std::string strFunction = dumpFunction(function);
pThreadData_->command = ThreadCommand::Nothing;
pThreadData_->status = ThreadStatus::Running;
std::vector<sol::object> arguments; std::vector<sol::object> arguments;
for (const auto& iter : args) { for (const auto& arg : variadicArgs) {
StoredObject store = createStoredObject(iter.get<sol::object>()); StoredObject store = createStoredObject(arg.get<sol::object>());
arguments.push_back(store->unpack(sol::this_state{pThreadData_->luaState})); arguments.push_back(store->unpack(sol::this_state{handle_->lua}));
} }
pThread_.reset(new std::thread(&LuaThread::work, pThreadData_, function, std::move(arguments)));
assert(pThread_); std::thread thr(&runThread,
pThread_->detach(); handle_,
strFunction,
std::move(arguments));
DEBUG << "Created " << thr.get_id() << std::endl;
thr.detach();
} }
void LuaThread::luaHook(lua_State*, lua_Debug*) { sol::object Thread::getUserType(sol::state_view& lua) {
if (pThreadLocalData) { sol::usertype<Thread> type(
switch (pThreadLocalData->command) { "new", sol::no_constructor,
case ThreadCommand::Pause: { "get", &Thread::get,
pThreadLocalData->status = ThreadStatus::Paused; "wait", &Thread::wait,
ThreadCommand cmd = pThreadLocalData->command; "cancel", &Thread::cancel,
while (cmd == ThreadCommand::Pause) { "pause", &Thread::pause,
std::this_thread::yield(); "resume", &Thread::resume,
cmd = pThreadLocalData->command; "status", &Thread::status);
}
assert(cmd != ThreadCommand::Nothing);
if (cmd == ThreadCommand::Resume) {
pThreadLocalData->status = ThreadStatus::Running;
break; // Just go out of the function
} else { /* HOOK_STOP - do nothing and go to the next case */
}
}
case ThreadCommand::Cancel:
throw LuaHookStopException();
default:
case ThreadCommand::Nothing:
break;
}
}
}
void LuaThread::work(std::shared_ptr<ThreadData> threadData, const std::string strFunction,
std::vector<sol::object>&& arguments) {
try {
pThreadLocalData = threadData.get();
assert(threadData);
const sol::object& stringLoader = threadData->luaState["loadstring"];
REQUIRE(stringLoader.valid() && stringLoader.get_type() == sol::type::function)
<< "Invalid loadstring function";
sol::function userFuncObj = static_cast<const sol::function&>(stringLoader)(strFunction);
sol::function_result results = userFuncObj(sol::as_args(arguments));
(void)results; // TODO: try to avoid use of useless sol::function_result here
sol::variadic_args args(threadData->luaState, -lua_gettop(threadData->luaState));
for (const auto& iter : args) {
StoredObject store = createStoredObject(iter.get<sol::object>());
threadData->results.emplace_back(std::move(store));
}
threadData->status = ThreadStatus::Completed;
} catch (const LuaHookStopException&) {
threadData->status = ThreadStatus::Canceled;
} catch (const sol::error& err) {
threadData->status = ThreadStatus::Failed;
sol::stack::push(threadData->luaState, err.what());
StoredObject store = createStoredObject(sol::stack::pop<sol::object>(threadData->luaState));
threadData->results.emplace_back(std::move(store));
}
}
void LuaThread::cancel() { pThreadData_->command = ThreadCommand::Cancel; }
void LuaThread::pause() { pThreadData_->command = ThreadCommand::Pause; }
void LuaThread::resume() { pThreadData_->command = ThreadCommand::Resume; }
std::tuple<sol::object, sol::table> LuaThread::wait(sol::this_state state) const {
ThreadStatus stat = pThreadData_->status;
while (stat == ThreadStatus::Running) {
std::this_thread::yield();
stat = pThreadData_->status;
}
sol::table returns = sol::state_view(state).create_table();
if (stat == ThreadStatus::Completed) {
for (const StoredObject& obj : pThreadData_->results) {
returns.add(obj->unpack(state));
}
}
return std::make_tuple(sol::make_object(state, threadStatusToString(stat)), std::move(returns));
}
std::string LuaThread::threadStatusToString(ThreadStatus stat) const {
switch (stat) {
case ThreadStatus::Running:
return "running";
case ThreadStatus::Paused:
return "paused";
case ThreadStatus::Canceled:
return "canceled";
case ThreadStatus::Completed:
return "completed";
case ThreadStatus::Failed:
return "failed";
}
assert(false);
return "unknown";
}
std::string LuaThread::status() const { return threadStatusToString(pThreadData_->status); }
sol::object LuaThread::getUserType(sol::state_view& lua) {
static sol::usertype<LuaThread> type("new", sol::no_constructor, //
"cancel", &LuaThread::cancel, //
"pause", &LuaThread::pause, //
"resume", &LuaThread::resume, //
"status", &LuaThread::status, //
"wait", &LuaThread::wait);
sol::stack::push(lua, type); sol::stack::push(lua, type);
return sol::stack::pop<sol::object>(lua); return sol::stack::pop<sol::object>(lua);
} }
// class ThreadFactory std::pair<sol::object, sol::object> Thread::status(const sol::this_state& lua) {
sol::object luaStatus = sol::make_object(lua, statusToString(handle_->status));
ThreadFactory::ThreadFactory(const sol::function& func) if (handle_->status == Status::Failed) {
: stepwise_(false) assert(handle_->result);
, step_(100U) { return std::make_pair(luaStatus, handle_->result->unpack(lua));
sol::state_view lua(func.lua_state()); } else {
const sol::object& dumper = lua["string"]["dump"]; return std::make_pair(luaStatus, sol::nil);
REQUIRE(dumper.valid() && dumper.get_type() == sol::type::function) << "Unable to get string.dump()"; }
strFunction_ = static_cast<const sol::function&>(dumper)(func);
// Inherit all pathes from parent state by default
packagePath_ = lua["package"]["path"].get<std::string>();
packageCPath_ = lua["package"]["cpath"].get<std::string>();
} }
std::unique_ptr<LuaThread> ThreadFactory::runThread(const sol::variadic_args& args) { bool Thread::waitFor(const sol::optional<int>& duration,
std::shared_ptr<LuaThread::ThreadData> threadData = std::make_shared<LuaThread::ThreadData>(); const sol::optional<std::string>& period) {
assert(threadData.get()); if (!duration) { // sync version
threadData->luaState.open_libraries(sol::lib::base, sol::lib::string, sol::lib::package, sol::lib::io, handle_->completion.wait();
sol::lib::os); return true;
} else { // async version
if (stepwise_) return handle_->completion.waitFor(fromLuaTime(*duration, period));
lua_sethook(threadData->luaState, LuaThread::luaHook, LUA_MASKCOUNT, step_); }
threadData->luaState["package"]["path"] = packagePath_;
threadData->luaState["package"]["cpath"] = packageCPath_;
// Inherit all pathes from parent state
effil::LuaThread::getUserType(threadData->luaState);
effil::ThreadFactory::getUserType(threadData->luaState);
effil::SharedTable::getUserType(threadData->luaState);
return std::make_unique<LuaThread>(threadData, strFunction_, args);
} }
bool ThreadFactory::stepwise(const sol::optional<bool>& value) { std::pair<sol::object, sol::object> Thread::wait(const sol::this_state& lua,
bool ret = stepwise_; const sol::optional<int>& duration,
if (value) const sol::optional<std::string>& period) {
stepwise_ = value.value(); waitFor(duration, period);
return ret; return status(lua);
} }
unsigned int ThreadFactory::step(const sol::optional<unsigned int>& value) { sol::object Thread::get(const sol::this_state& lua,
bool ret = step_; const sol::optional<int>& duration,
if (value) const sol::optional<std::string>& period) {
step_ = value.value(); bool completed = waitFor(duration, period);
return ret;
if (completed && handle_->status == Status::Completed)
return handle_->result->unpack(lua);
else
return sol::nil;
} }
std::string ThreadFactory::packagePath(const sol::optional<std::string>& value) { bool Thread::cancel(const sol::this_state&,
std::string& ret = packagePath_; const sol::optional<int>& duration,
if (value) const sol::optional<std::string>& period) {
packagePath_ = value.value(); REQUIRE(handle_->managed) << "Unable to cancel: unmanaged thread";
return ret;
handle_->command(Command::Cancel);
handle_->pause.notify();
if (handle_->status == Status::Running) {
return waitFor(duration, period);
} else {
handle_->completion.wait();
return true;
}
} }
std::string ThreadFactory::packageCPath(const sol::optional<std::string>& value) { bool Thread::pause(const sol::this_state&,
std::string& ret = packageCPath_; const sol::optional<int>& duration,
if (value) const sol::optional<std::string>& period) {
packageCPath_ = value.value(); REQUIRE(handle_->managed) << "Unable to pause: unmanaged thread";
return ret;
handle_->pause.reset();
handle_->command(Command::Pause);
if (!duration) { // sync wait
handle_->syncPause.wait();
return true;
} else { // async wait
return handle_->syncPause.waitFor(fromLuaTime(*duration, period));
}
} }
sol::object ThreadFactory::getUserType(sol::state_view& lua) { void Thread::resume() {
static sol::usertype<ThreadFactory> type("new", sol::no_constructor, // REQUIRE(handle_->managed) << "Unable to resume: unmanaged thread";
sol::meta_function::call, &ThreadFactory::runThread, //
"stepwise", &ThreadFactory::stepwise, // handle_->command(Command::Run);
"step", &ThreadFactory::step, // handle_->syncPause.reset();
"package_path", &ThreadFactory::packagePath, // handle_->pause.notify();
"package_cpath", &ThreadFactory::packageCPath //
);
sol::stack::push(lua, type);
return sol::stack::pop<sol::object>(lua);
} }
} // effil } // effil

View File

@ -1,85 +1,52 @@
#pragma once #pragma once
#include "shared-table.h" #include <sol.hpp>
#include <iostream>
#include <sstream>
#include <thread>
namespace effil { namespace effil {
// Lua this thread API // Lua this thread API
std::string threadId(); std::string threadId();
void yield(); void yield();
void sleep(int64_t, sol::optional<std::string>); void sleep(const sol::optional<int>&, const sol::optional<std::string>&);
class LuaThread { class ThreadHandle;
class Thread {
public: public:
enum class ThreadStatus { Thread(const std::string& path,
Running = 1, const std::string& cpath,
Paused, bool managed,
Canceled, unsigned int step,
Completed, const sol::function& function,
Failed, const sol::variadic_args& args);
};
enum class ThreadCommand {
Nothing = 1,
Cancel,
Pause,
Resume,
};
struct ThreadData {
sol::state luaState;
std::atomic<ThreadStatus> status;
std::atomic<ThreadCommand> command;
std::vector<StoredObject> results;
};
LuaThread(std::shared_ptr<ThreadData> threadData, const std::string& function, const sol::variadic_args& args);
static sol::object getUserType(sol::state_view& lua); static sol::object getUserType(sol::state_view& lua);
static void luaHook(lua_State*, lua_Debug*);
/* Public lua methods*/ std::pair<sol::object, sol::object> status(const sol::this_state& state);
void cancel(); std::pair<sol::object, sol::object> wait(const sol::this_state& state,
void pause(); const sol::optional<int>& duration,
const sol::optional<std::string>& period);
sol::object get(const sol::this_state& state,
const sol::optional<int>& duration,
const sol::optional<std::string>& period);
bool cancel(const sol::this_state& state,
const sol::optional<int>& duration,
const sol::optional<std::string>& period);
bool pause(const sol::this_state&,
const sol::optional<int>& duration,
const sol::optional<std::string>& period);
void resume(); void resume();
std::string status() const;
std::tuple<sol::object, sol::table> wait(sol::this_state state) const;
private: private:
LuaThread(const LuaThread&) = delete; std::shared_ptr<ThreadHandle> handle_;
LuaThread& operator=(const LuaThread&) = delete;
std::string threadStatusToString(ThreadStatus stat) const;
static void work(std::shared_ptr<ThreadData> threadData, const std::string strFunction,
std::vector<sol::object>&& arguments);
std::shared_ptr<ThreadData> pThreadData_;
std::shared_ptr<std::thread> pThread_;
static thread_local LuaThread::ThreadData* pThreadLocalData;
};
class ThreadFactory {
public:
ThreadFactory(const sol::function& func);
static sol::object getUserType(sol::state_view& lua);
/* Public lua methods*/
std::unique_ptr<LuaThread> runThread(const sol::variadic_args& args);
bool stepwise(const sol::optional<bool>&);
unsigned int step(const sol::optional<unsigned int>&);
std::string packagePath(const sol::optional<std::string>&);
std::string packageCPath(const sol::optional<std::string>&);
private: private:
std::string strFunction_; bool waitFor(const sol::optional<int>& duration,
bool stepwise_; const sol::optional<std::string>& period);
unsigned int step_;
std::string packagePath_; private:
std::string packageCPath_; Thread(const Thread&) = delete;
Thread& operator=(const Thread&) = delete;
}; };
} // effil } // effil

View File

@ -2,6 +2,7 @@
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
#include <thread>
#include <sol.hpp> #include <sol.hpp>
@ -28,14 +29,10 @@ private:
} // effil } // effil
#define REQUIRE(cond) \ #define REQUIRE(cond) if (!(cond)) throw effil::Exception()
if (!cond) \
throw effil::Exception()
#ifdef NDEBUG #ifdef NDEBUG
#define DEBUG \ #define DEBUG if (false) std::cout
if (false) \
std::cout
#else #else
#define DEBUG std::cout #define DEBUG std::cout << __FILE__ << ":" << __FUNCTION__ << ":" << __LINE__ << " tid:" << std::this_thread::get_id() << " "
#endif #endif

View File

@ -4,12 +4,40 @@ local function detect_native_lib_ext()
if string.find(home, "/Users/") then return "dylib" end if string.find(home, "/Users/") then return "dylib" end
if string.find(home, "/home/") then return "so" end if string.find(home, "/home/") then return "so" end
-- TODO: unable to detect os -- TODO: unable to detect os
-- how to reportabout error -- Unix, is it you?
return "so" return "so"
end end
package.cpath = package.cpath .. ";./?." .. detect_native_lib_ext() package.cpath = package.cpath .. ";./?." .. detect_native_lib_ext()
local api = require 'libeffil' local capi = require 'libeffil'
local api = {
version = "0.1.0",
table = capi.table,
thread_id = capi.thread_id,
sleep = capi.sleep,
yield = capi.yield
}
local function run_thread(config, f, ...)
return capi.thread(config.path, config.cpath, config.managed, config.step, f, ...)
end
-- Creates thread runner with given function
-- configurable parameters:
-- path - lua modules search path in child thread
-- cpath - lua libs search path in child thread
-- stepwise - is thread resumable
-- step - who fast reacte on state changing
-- __call - run thread, can be invoked multiple times
api.thread = function (f)
local thread_config = {
path = package.path,
cpath = package.cpath,
managed = true,
step = 200 }
setmetatable(thread_config, {__call = function(c, ...) return run_thread(c, f, ...) end})
return thread_config
end
return api return api

80
tests/cpp/notifier.cpp Normal file
View File

@ -0,0 +1,80 @@
#include <gtest/gtest.h>
#include "notifier.h"
#include <thread>
#include <atomic>
using namespace effil;
TEST(notifier, wait) {
Notifier n;
bool done = false;
auto t = std::thread([&]{
done = true;
n.notify();
});
n.wait();
EXPECT_TRUE(done);
t.join();
}
TEST(notifier, waitMany) {
const size_t nfutures = 32;
std::vector<std::thread> vt;
std::atomic<size_t> counter(0);
Notifier n;
for(size_t i = 0; i < nfutures; i++)
vt.emplace_back(std::thread([&]{
n.wait();
counter++;
}));
EXPECT_EQ(counter.load(), (size_t)0);
n.notify();
for(auto& t : vt) t.join();
EXPECT_EQ(counter.load(), nfutures);
}
TEST(notifier, waitFor) {
Notifier n;
auto t = std::thread([&] {
std::this_thread::sleep_for(std::chrono::seconds(2));
n.notify();
});
EXPECT_FALSE(n.waitFor(std::chrono::seconds(1)));
EXPECT_TRUE(n.waitFor(std::chrono::seconds(2)));
t.join();
}
TEST(notifier, reset) {
const size_t iterations = 1024;
Notifier readyToProcess;
Notifier needNew;
size_t resource = 0;
std::thread producer([&]() {
for (size_t i = 0; i < iterations; i++) {
resource++;
readyToProcess.notify();
needNew.wait();
needNew.reset();
}
});
std::thread consumer([&](){
for (size_t i = 0; i < iterations; i++) {
readyToProcess.wait();
readyToProcess.reset();
EXPECT_EQ(resource, i + 1);
needNew.notify();
}
});
producer.join();
consumer.join();
}

View File

@ -1,13 +1,14 @@
#pragma once #pragma once
#include "shared-table.h" #include "shared-table.h"
#include "lua-helpers.h"
#include <sol.hpp> #include <sol.hpp>
namespace effil { namespace effil {
inline void bootstrapState(sol::state& lua) { inline void bootstrapState(sol::state& lua) {
lua.open_libraries(sol::lib::base, sol::lib::string, sol::lib::table); luaL_openlibs(lua);
SharedTable::getUserType(lua); SharedTable::getUserType(lua);
} }
} // namespace } // namespace effil

View File

@ -45,6 +45,5 @@ function sleep(timeInSec, silent)
end end
function tearDown() function tearDown()
log "TearDown() collect garbage"
collectgarbage() collectgarbage()
end end

View File

@ -1,140 +1,219 @@
local effil = require 'effil'
TestThread = {tearDown = tearDown } TestThread = {tearDown = tearDown }
function TestThread:testThreadCancel() function TestThread:testWait()
local effil = require 'effil' local thread = effil.thread(function() print 'Effil is not that tower' end)()
local status = thread:wait()
test.assertNil(thread:get())
test.assertEquals(status, "completed")
test.assertEquals(thread:status(), "completed")
end
function TestThread:testMultipleWaitGet()
local thread = effil.thread(function() return "test value" end)()
local status1 = thread:wait()
local status2 = thread:wait()
test.assertEquals(status1, "completed")
test.assertEquals(status2, status1)
local value = thread:get()
test.assertEquals(value, "test value")
end
function TestThread:testTimedGet()
local thread = effil.thread(function()
require('effil').sleep(2)
return "-_-"
end)()
test.assertNil(thread:get(1))
test.assertEquals(thread:get(2), "-_-")
end
function TestThread:testTimedWait()
local thread = effil.thread(function()
require('effil').sleep(2)
return 8
end)()
local status = thread:wait(1)
test.assertEquals(status, "running")
local value = thread:get(2, "s")
test.assertEquals(value, 8);
test.assertEquals(thread:status(), "completed")
end
function TestThread:testAsyncWait()
local thread = effil.thread( function()
require('effil').sleep(1)
end)()
local iter = 0
while thread:wait(0) == "running" do
iter = iter + 1
end
test.assertTrue(iter > 10)
test.assertEquals(thread:status(), "completed")
end
function TestThread:testDetached()
local st = effil.table()
for i = 1, 32 do
effil.thread(function(st, index)
st[index] = index
end)(st, i)
end
-- here all thead temporary objects have to be destroyed
collectgarbage()
effil.sleep(1)
for i = 1, 32 do
test.assertEquals(st[i], i)
end
end
function TestThread:testCancel()
local thread = effil.thread(function()
while true do end
end)()
test.assertTrue(thread:cancel())
test.assertEquals(thread:status(), "canceled")
end
function TestThread:testAsyncCancel()
local thread_runner = effil.thread( local thread_runner = effil.thread(
function() function()
local startTime = os.time() local startTime = os.time()
while ( (os.time() - startTime) <= 10) do --[[ Just sleep ]] end while ( (os.time() - startTime) <= 10) do --[[ Just sleep ]] end
end end
) )
test.assertFalse(thread_runner:stepwise(true))
local thread = thread_runner() local thread = thread_runner()
sleep(2) -- let thread starts working sleep(2) -- let thread starts working
thread:cancel() thread:cancel(0)
test.assertTrue(wait(2, function() return thread:status() ~= 'running' end)) test.assertTrue(wait(2, function() return thread:status() ~= 'running' end))
test.assertEquals(thread:status(), 'canceled') test.assertEquals(thread:status(), 'canceled')
end end
function TestThread:testThreadPauseAndResume() function TestThread:testPauseResumeCancel()
local effil = require 'effil'
local data = effil.table() local data = effil.table()
data.value = 0 data.value = 0
local thread_runner = effil.thread( local thread = effil.thread(
function(data) function(data)
while true do while true do
data.value = data.value + 1 data.value = data.value + 1
end end
end end
) )(data)
test.assertFalse(thread_runner:stepwise(true))
local thread = thread_runner(data)
test.assertTrue(wait(2, function() return data.value > 100 end)) test.assertTrue(wait(2, function() return data.value > 100 end))
thread:pause() test.assertTrue(thread:pause())
test.assertTrue(wait(2, function() return thread:status() == "paused" end)) test.assertEquals(thread:status(), "paused")
local savedValue = data.value local savedValue = data.value
sleep(3) sleep(1)
test.assertEquals(data.value, savedValue) test.assertEquals(data.value, savedValue)
thread:resume() thread:resume()
test.assertTrue(wait(5, function() return (data.value - savedValue) > 100 end)) test.assertTrue(wait(5, function() return (data.value - savedValue) > 100 end))
thread:cancel() test.assertTrue(thread:cancel())
thread:wait()
end end
function TestThread:testThreadPauseAndStop() function TestThread:testPauseCancel()
local effil = require 'effil'
log "Create thread"
local data = effil.table() local data = effil.table()
data.value = 0 data.value = 0
local thread_runner = effil.thread( local thread = effil.thread(
function(data) function(data)
while true do while true do
data.value = data.value + 1 data.value = data.value + 1
end end
end end
) )(data)
test.assertFalse(thread_runner:stepwise(true))
local thread = thread_runner(data)
test.assertTrue(wait(2, function() return data.value > 100 end)) test.assertTrue(wait(2, function() return data.value > 100 end))
thread:pause() thread:pause(0)
test.assertTrue(wait(2, function() return thread:status() == "paused" end)) test.assertTrue(wait(2, function() return thread:status() == "paused" end))
local savedValue = data.value local savedValue = data.value
sleep(3) sleep(1)
test.assertEquals(data.value, savedValue) test.assertEquals(data.value, savedValue)
thread:cancel() test.assertTrue(thread:cancel(0))
test.assertTrue(wait(2, function() return thread:status() == "canceled" end))
thread:wait()
end end
function TestThread:testThreadPauseAndStop() function TestThread:testAsyncPauseResumeCancel()
local effil = require 'effil'
log "Create thread"
local data = effil.table() local data = effil.table()
data.value = 0 data.value = 0
local thread_runner = effil.thread( local thread = effil.thread(
function(data) function(data)
while true do while true do
data.value = data.value + 1 data.value = data.value + 1
end end
end end
) )(data)
test.assertFalse(thread_runner:stepwise(true))
local thread = thread_runner(data)
test.assertTrue(wait(2, function() return data.value > 100 end)) test.assertTrue(wait(2, function() return data.value > 100 end))
thread:pause() thread:pause()
test.assertTrue(wait(2, function() return thread:status() == "paused" end))
local savedValue = data.value local savedValue = data.value
sleep(3) sleep(1)
test.assertEquals(data.value, savedValue) test.assertEquals(data.value, savedValue)
thread:cancel() thread:resume()
test.assertTrue(wait(2, function() return thread:status() == "canceled" end)) test.assertTrue(wait(5, function() return (data.value - savedValue) > 100 end))
thread:cancel(0)
test.assertTrue(wait(5, function() return thread:status() == "canceled" end))
end
function TestThread:testTimedCancel()
local thread = effil.thread(function()
require("effil").sleep(2)
end)()
test.assertFalse(thread:cancel(1))
thread:wait() thread:wait()
end end
function TestThread:testCheckThreadReturns() --function TestThread:testCheckThreadReturns()
local effil = require 'effil' -- local effil = require 'effil'
local share = effil.table() -- local share = effil.table()
share.value = "some value" -- share.value = "some value"
--
local thread_factory = effil.thread( -- local thread_factory = effil.thread(
function(share) -- function(share)
return 100500, "string value", true, share, function(a,b) return a + b end -- return 100500, "string value", true, share, function(a,b) return a + b end
end -- end
) -- )
local thread = thread_factory(share) -- local thread = thread_factory(share)
local status, returns = thread:wait() -- local status, returns = thread:get()
--
log "Check values" -- log "Check values"
test.assertEquals(status, "completed") -- test.assertEquals(status, "completed")
--
test.assertNumber(returns[1]) -- test.assertNumber(returns[1])
test.assertEquals(returns[1], 100500) -- test.assertEquals(returns[1], 100500)
--
test.assertString(returns[2]) -- test.assertString(returns[2])
test.assertEquals(returns[2], "string value") -- test.assertEquals(returns[2], "string value")
--
test.assertBoolean(returns[3]) -- test.assertBoolean(returns[3])
test.assertTrue(returns[3]) -- test.assertTrue(returns[3])
--
test.assertUserdata(returns[4]) -- test.assertUserdata(returns[4])
test.assertEquals(returns[4].value, share.value) -- test.assertEquals(returns[4].value, share.value)
--
test.assertFunction(returns[5]) -- test.assertFunction(returns[5])
test.assertEquals(returns[5](11, 89), 100) -- test.assertEquals(returns[5](11, 89), 100)
end --end
--
TestThreadWithTable = {tearDown = tearDown } TestThreadWithTable = {tearDown = tearDown }
function TestThreadWithTable:testSharedTableTypes() function TestThreadWithTable:testSharedTableTypes()
local effil = require 'effil'
local share = effil.table() local share = effil.table()
share["number"] = 100500 share["number"] = 100500
@ -161,7 +240,6 @@ function TestThreadWithTable:testSharedTableTypes()
end end
function TestThreadWithTable:testRecursiveTables() function TestThreadWithTable:testRecursiveTables()
local effil = require 'effil'
local share = effil.table() local share = effil.table()
local magic_number = 42 local magic_number = 42
@ -187,24 +265,26 @@ function TestThreadWithTable:testRecursiveTables()
test.assertEquals(share["magic_number"], nil) test.assertEquals(share["magic_number"], nil)
end end
function TestThreadWithTable:testThisThreadFunctions() TestThisThread = {tearDown = tearDown }
local effil = require 'effil'
function TestThisThread:testThisThreadFunctions()
local share = effil.table() local share = effil.table()
local thread_factory = effil.thread( local thread_factory = effil.thread(
function(share) function(share)
share["child.id"] = require('libeffil').thread_id() share["child.id"] = require('effil').thread_id()
end end
) )
local thread = thread_factory(share) local thread = thread_factory(share)
thread:wait() thread:get()
log "Check values"
test.assertString(share["child.id"]) test.assertString(share["child.id"])
test.assertNumber(tonumber(share["child.id"])) test.assertNumber(tonumber(share["child.id"]))
test.assertNotEquals(share["child.id"], effil.thread_id()) test.assertNotEquals(share["child.id"], effil.thread_id())
effil.yield() -- just call it effil.yield() -- just call it
end
function TestThisThread:testTime()
local function check_time(real_time, use_time, metric) local function check_time(real_time, use_time, metric)
local start_time = os.time() local start_time = os.time()
effil.sleep(use_time, metric) effil.sleep(use_time, metric)