Awaitable event using the coroutine, epoll, and eventfd¶
commit: ad1e682f
This note explains the details of the event type in coroutine/concrt.h.
Summary¶
The look & feel of the interface, shown via test code:
auto wait_for_one_event(event& e, atomic_flag& flag) -> no_return {
try {
// resume after the event is signaled ...
co_await e;
} catch (system_error& e) {
// event throws if there was an internal system error
FAIL(e.what());
}
flag.test_and_set();
}
TEST_CASE("wait for one event", "[event]") {
event e1{};
atomic_flag flag = ATOMIC_FLAG_INIT;
wait_for_one_event(e1, flag);
e1.set();
auto count = 0;
for (auto task : signaled_event_tasks()) {
task.resume();
++count;
}
REQUIRE(count > 0);
// already set by the coroutine `wait_for_one_event`
REQUIRE(flag.test_and_set() == true);
}
Note¶
Motivation¶
It would be convenient if there were a simple event type that can be used as an operand of the co_await operator. Linux's eventfd looks like it might do the job.
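If you haven't used it before, eventfd is essentially a 64-bit counter behind a file descriptor: write adds to the counter, and read consumes it. Here is a minimal standalone sketch (independent of the library code in this note) to show the mechanism:
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>

int main() {
    // create an eventfd with an initial counter of 0
    int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (fd < 0)
        return perror("eventfd"), 1;

    uint64_t one = 1;
    write(fd, &one, sizeof(one));    // "signal": adds 1 to the counter

    uint64_t value = 0;
    read(fd, &value, sizeof(value)); // "consume": reads the counter and resets it
    printf("counter was %llu\n", (unsigned long long)value);

    close(fd);
    return 0;
}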
Requirement¶
The requirements for the event type are simple:
- It doesn’t support copy construction/assignment
- It doesn’t support move construction/assignment
- The type can’t be inherited (final)
- It can be an operand of the co_await operator
- The event is stateful and has 2 states:
  - Signaled
  - Non-signaled
- For each state, the behavior is the following:
  - Non-signaled: co_await will suspend the coroutine, which becomes resumable when the event object is signaled. If the event is already signaled, the coroutine must not suspend.
  - Signaled: co_await won’t suspend, and the event object becomes non-signaled after the statement.
The first 3 requirements are quite strict, but they keep things simple. Normally a move operation wouldn’t be that hazardous, but moving an object out of a coroutine’s frame into another location is a tricky situation, so I’ve banned move semantics to prevent that kind of misuse.
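For illustration only, here is the kind of misuse the deleted move operations are meant to rule out. The snippet is hypothetical; it does not compile against the actual type precisely because move is banned:
// hypothetical: what could go wrong IF `event` were movable (it is not)
auto wait_on(event& e) -> no_return {
    co_await e; // suspends: epoll now holds this coroutine's handle, keyed to e's fd
}

void hazard() {
    event e1{};
    wait_on(e1);              // the coroutine suspends, registered against e1
    event e2 = std::move(e1); // (rejected by the compiler) ownership of the fd/state
                              // would leave e1, yet the suspended coroutine still
                              // holds a reference to e1
    e2.set();                 // signaling e2 would never resume the waiter correctly
}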
Design¶
The Win32 API provides an internal thread pool, but the Linux system API does not, so user code has to poll the created events itself. Fortunately, Linux supports epoll for exactly this kind of readiness polling.
We can derive 2 behavioral constraints from these interface limitations:
- To acquire a list of signaled events, user code has to perform a polling operation
  - a limitation from epoll’s use-case
- The user has to resume the coroutines that are suspended for an event object
  - a limitation from coroutine_handle<void> and the absence of embedded thread pool/APC support
Here is a rough version of the interface type & function:
// Awaitable event type.
class event final : no_copy_move {
public:
using task = coroutine_handle<void>;
private:
uint64_t state; // it's lightweight !
public:
event();
~event();
void set();
bool await_ready() const;
void await_suspend(coroutine_handle<void> coro);
void await_resume();
};
// Enumerate all suspended coroutines that are waiting for signaled events.
auto signaled_event_tasks() -> coro::enumerable<event::task>;
coro::enumerable<T> is my own implementation of the generator<T> in <experimental/generator>. state is a space for the eventfd, and signaled_event_tasks performs the polling operation on the epoll file descriptor.
Concerns¶
You may wonder why I didn’t adopt a design like io_context in Boost ASIO, which provides an explicit point for object creation and for the polling operation. For instance, with Boost ASIO, user code must create objects (e.g., a socket) via a boost::asio::io_context object.
// see: https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/example/cpp03/allocation/server.cpp
class server
{
private:
boost::asio::io_context& io_context_;
tcp::acceptor acceptor_;
public:
server(boost::asio::io_context& io_context, short port)
: io_context_(io_context),
acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) // <------
{
session_ptr new_session(new session(io_context_));
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
void handle_accept(session_ptr new_session,
const boost::system::error_code& error)
{
if (!error)
{
new_session->start();
}
new_session.reset(new session(io_context_));
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
};
int main(int argc, char* argv[])
{
try
{
// ...
boost::asio::io_context io_context; // <------
server s(io_context, atoi(argv[1]));
io_context.run(); // <------
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
As you can see, such a design forces construction to take a reference, as in server’s constructor. Since there might be multiple instances of server in one program, this is sound and appropriate. However, event is used at the system level, and we don’t have to consider the owner of event objects because the owner is always the system itself. This is why I didn’t design a type like event_context. It is enough to replace io_context.run() with a loop over signaled_event_tasks(), as sketched below.
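To make the comparison concrete, here is a rough sketch (not part of the library) of what the io_context.run() counterpart could look like; keep_running is a hypothetical flag owned by the application:
#include <atomic>

// sketch: the analogue of `io_context.run()` under this design
void run_signaled_events(std::atomic_bool& keep_running) {
    while (keep_running)
        for (auto task : signaled_event_tasks())
            task.resume(); // resume every coroutine whose event was signaled
    // a real main loop would pace this (sleep or do other work),
    // since the underlying poll does not block
}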
Implementation¶
Each description is based on the actual code, but in this note I will explain with simplified code (skipping some headers, exception specifications, etc.).
Wrapping epoll¶
I don't like writing wrappers for system APIs, but I have another feature (networking) that uses it. Before we start: if you're not familiar with epoll, I recommend finding some articles and reading them first. (I'm sorry!)
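For readers who want just the gist, the usual epoll pattern boils down to three calls: create, register, wait. A bare standalone sketch (not the wrapper below; error handling trimmed):
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>

int main() {
    int epfd = epoll_create1(EPOLL_CLOEXEC);     // 1. create an epoll instance
    int efd = eventfd(0, EFD_NONBLOCK);          //    ... and something to watch

    epoll_event req{};
    req.events = EPOLLIN;
    req.data.fd = efd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &req);   // 2. register interest in the fd

    uint64_t one = 1;
    write(efd, &one, sizeof(one));               //    make it readable

    epoll_event results[4]{};
    int count = epoll_wait(epfd, results, 4, 0); // 3. poll: collect ready fds
    // `count` entries of `results` now describe the fds that are ready

    close(efd);
    close(epfd);
    return count < 0;
}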
Ok, let me start ...
The wrapper follows RAII and provides a few member functions:
- try_add: add or modify the given epoll_event using epoll_ctl
- remove: epoll_ctl with EPOLL_CTL_DEL
- wait: wait for epoll_events with epoll_wait and allow iterating over them
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>
struct event_poll_t final : no_copy_move {
int epfd;
const size_t capacity;
std::unique_ptr<epoll_event[]> events;
public:
event_poll_t() ;
~event_poll_t() ;
void try_add(uint64_t fd, epoll_event& req) ;
void remove(uint64_t fd);
auto wait(int timeout) -> coro::enumerable<epoll_event>;
};
As you can expect, it internally allocates an array to receive events from epoll_wait:
event_poll_t::event_poll_t()
: epfd{-1},
// use 2 pages for the polling buffer
capacity{2 * getpagesize() / sizeof(epoll_event)},
events{make_unique<epoll_event[]>(capacity)} {
epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0)
throw system_error{errno, system_category(), "epoll_create1"};
}
event_poll_t::~event_poll_t() {
close(epfd);
}
With the RAII in place, epoll_ctl can be wrapped with exception-throwing code. You can write your own version if you'd rather not use exceptions; a possible variant is sketched a bit further below.
void event_poll_t::try_add(uint64_t _fd, epoll_event& req) {
int op = EPOLL_CTL_ADD, ec = 0;
TRY_OP:
ec = epoll_ctl(epfd, op, _fd, &req);
if (ec == 0)
return;
if (errno == EEXIST) {
op = EPOLL_CTL_MOD; // already exists. try again with mod
goto TRY_OP;
}
// failed
throw system_error{errno, system_category(), "epoll_ctl"};
}
void event_poll_t::remove(uint64_t _fd) {
epoll_event req{}; // just to pass a non-null pointer (older kernels require it for EPOLL_CTL_DEL)
auto ec = epoll_ctl(epfd, EPOLL_CTL_DEL, _fd, &req);
if (ec != 0)
throw system_error{errno, system_category(), "epoll_ctl EPOLL_CTL_DEL"};
}
It's not that complicated :)
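As an aside, a non-throwing flavor of try_add could simply report errno instead. This is only a sketch of what such a variant might look like, not something the library provides:
#include <cerrno>
#include <sys/epoll.h>

// sketch: exception-free registration; returns 0 on success, errno otherwise
int try_add_noexcept(int epfd, int fd, epoll_event& req) noexcept {
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &req) == 0)
        return 0;
    if (errno == EEXIST && epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &req) == 0)
        return 0; // already registered: modified instead
    return errno;
}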
Polling epoll¶
Since event_poll_t is an internal type, it is free to co_yield other internal types like epoll_event. The wait timeout uses int instead of chrono::duration because epoll_wait allows a negative (-1) timeout, which means "wait indefinitely".
auto event_poll_t::wait(int timeout)
-> coro::enumerable<epoll_event> {
auto count = epoll_wait(epfd, events.get(), capacity, timeout);
if (count == -1)
throw system_error{errno, system_category(), "epoll_wait"};
for (auto i = 0; i < count; ++i) {
co_yield events[i];
}
}
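If a chrono-based signature is preferred, a thin adapter over the int timeout is all it takes. A sketch (the helper name and the use of std::optional are my own choices here):
#include <chrono>
#include <optional>

// sketch: map an optional duration onto the timeout `epoll_wait` expects,
// where an empty optional means "wait indefinitely" (-1)
int to_epoll_timeout(std::optional<std::chrono::milliseconds> timeout) {
    return timeout ? static_cast<int>(timeout->count()) : -1;
}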
In another translation unit, signaled_event_tasks queries the event_poll_t and yields coroutine handles extracted from the user data in each epoll_event.
// signaled event list
event_poll_t selist{};
auto signaled_event_tasks() -> coro::enumerable<event::task> {
event::task t{}; // it's an alias of `coroutine_handle<void>`
for (auto e : selist.wait(0)) {
// we don't care about the internal counter of eventfd.
// just receive the coroutine handle
t = event::task::from_address(e.data.ptr);
co_yield t;
}
}
Now user code can invoke the function to acquire the coroutines whose events were signaled. Since its return type is a coroutine generator, a range-based for statement works, as in the following test code.
TEST_CASE("wait for one event", "[event]") {
// ...
for (auto task : signaled_event_tasks()) {
task.resume();
// ...
}
// ...
}
Event interface¶
Now it's time to implement the event type. I will write a private member function for each of await_ready, await_suspend, and await_resume.
You may ask why I'm not implementing the await_* functions directly. Well, that's because I've run into an issue where exporting the await_* functions from a DLL leads to an internal compiler error; at least vc140 and vc141 did in my experience.
My approach is to export those interface functions as private members and redirect to them through public functions so the co_await statement still works:
// note:
// _INTERFACE_ is __declspec(dllexport) or __attribute__((visibility("default")))
class event final : no_copy_move {
public:
using task = coroutine_handle<void>;
private:
uint64_t state; // <--- will explain in next section
private:
_INTERFACE_ void on_suspend(task) ;
_INTERFACE_ bool is_ready() const ;
_INTERFACE_ void on_resume() ;
public:
_INTERFACE_ event() ;
_INTERFACE_ ~event() ;
// signal the event object
_INTERFACE_ void set() ;
// ... redirect to private member functions safely ...
bool await_ready() const {
return this->is_ready();
}
void await_suspend(coroutine_handle<void> coro) {
return this->on_suspend(coro);
}
void await_resume() {
return this->on_resume();
}
};
// Enumerate all suspended coroutines that are waiting for signaled events.
_INTERFACE_
auto signaled_event_tasks() -> coro::enumerable<event::task>;
Its member functions will be explained below. Before that, let's see how the type implements its constructor and destructor. It's not complicated!
event::event() : state{} {
const auto fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (fd == -1)
throw system_error{errno, system_category(), "eventfd"};
this->state = fd; // start with unsignaled state
}
event::~event() {
// if already closed, fd == 0
if (auto fd = get_eventfd(state))
close(fd);
}
You can see a weird function, get_eventfd; the next section explains it.
Event's state management¶
As the code above shows, event's state comes from the eventfd function. However, we need one more piece of information: whether the event is signaled.
class event final : no_copy_move {
private:
uint64_t state;
};
Instead of using the internal counter of the eventfd, I use a bit mask to mark that the event object is signaled. Please follow the comments; I wrote them carefully!
//
// We are going to combine file descriptor and state bit
//
// On x86 system,
// this won't matter since `int` is 32 bit.
// we can safely use msb for state indicator.
//
// On x64 system,
// this might be hazardous since the value of the `eventfd` descriptor could be corrupted.
// **Normally** descriptors in a Linux system grow from 3, so the process will hit the
// descriptor limit long before a value ever needs all 63 bits.
//
constexpr uint64_t emask = 1ULL << 63;
// the msb(most significant bit) will be ...
// 1 if the fd is signaled,
// 0 on the other case
bool is_signaled(uint64_t state) {
return emask & state; // msb is 1?
}
int64_t get_eventfd(uint64_t state) {
return static_cast<int64_t>(~emask & state);
}
uint64_t make_signaled(int64_t efd) ;
Was the comment enough? With those helper functions, the set operation becomes really simple.
void event::set() {
// already signaled. nothing to do...
if (is_signaled(state))
// !!! under the race condition, this check is not safe !!!
return;
auto fd = get_eventfd(state);
state = make_signaled(fd); // if `make_signaled` didn't throw,
// the event is in the signaled state from now on
}
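As the comment warns, the signaled check is not safe if set can race with itself, and the code above leaves it that way. For the curious, one possible direction would be an atomic state with fetch_or so that only one caller performs the write; the following is a sketch of that idea, not the actual implementation:
#include <atomic>

// sketch only: if `state` were a std::atomic<uint64_t>, the msb could be claimed atomically
void set_atomic(std::atomic<uint64_t>& state) {
    const auto previous = state.fetch_or(emask);                // claim the signaled bit
    if (is_signaled(previous))
        return;                                                 // someone else already signaled
    uint64_t one = 1;
    if (write(get_eventfd(previous), &one, sizeof(one)) == -1)  // only the winner writes
        throw system_error{errno, system_category(), "write"};
}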
Let me remind you of one of the requirements:
- For each state, the behavior is the following:
  - Non-signaled: co_await will suspend the coroutine, which becomes resumable when the event object is signaled. If the event is already signaled, the coroutine must not suspend.
  - Signaled: co_await won’t suspend, and the event object becomes non-signaled after the statement.
Let's assume for now that our event's fd is already registered with epoll. To make epoll report this event from epoll_wait, we use write. And, of course, the bit masking must come only after the operation succeeds!
uint64_t make_signaled(int64_t efd) {
// signal the eventfd...
// the message can be any value
// since the purpose of it is to trigger the epoll
// we won't care about the internal counter of the eventfd
auto sz = write(efd, &efd, sizeof(efd));
if (sz == -1)
throw system_error{errno, system_category(), "write"};
return emask | static_cast<uint64_t>(efd);
}
Remember that we passed the EFD_NONBLOCK flag to the eventfd function in the constructor. That was intentional :)
Event's await operations¶
The last part of the implementation covers the co_await statement. await_ready and await_resume are simple with the masking functions.
bool event::is_ready() const {
return is_signaled(state);
}
void event::on_resume() {
// make non-signaled state
this->state = static_cast<decltype(state)>(get_eventfd(state));
}
The transition to the non-signaled state follows from the requirement below. And after the set member function, await_ready will return true, so await_suspend will be bypassed.
- The event is stateful and has 2 states:
  - Signaled
  - Non-signaled
- For each state, the behavior is the following:
  - Non-signaled: co_await will suspend the coroutine, which becomes resumable when the event object is signaled. If the event is already signaled, the coroutine must not suspend.
  - Signaled: co_await won’t suspend, and the event object becomes non-signaled after the statement.
Now, our keystone function looks like this. We already declared a global variable of type event_poll_t when writing signaled_event_tasks. When a coroutine enters await_suspend, we have to hand its coroutine_handle<void> over to that event_poll_t.
// signaled event list.
event_poll_t selist{};
void event::on_suspend(task t) {
// just care if there was `write` for the eventfd
// when it happens, coroutine handle will be forwarded
// to `signaled_event_tasks` by epoll
epoll_event req{};
req.events = EPOLLET | EPOLLIN | EPOLLONESHOT;
req.data.ptr = t.address();
// throws if `epoll_ctl` fails
selist.try_add(get_eventfd(state), req);
}
Here, we use epoll_event's user data to store the coroutine frame's address. Compare the code with the implementation of signaled_event_tasks below: it reconstructs the coroutine handle from e.data.ptr.
auto signaled_event_tasks() -> coro::enumerable<event::task> {
event::task t{}; // it's an alias of `coroutine_handle<void>`
for (auto e : selist.wait(0)) {
// we don't care about the internal counter of eventfd.
// just receive the coroutine handle
t = event::task::from_address(e.data.ptr);
co_yield t;
}
}
Summary for the implementation¶
It wasn't that hard to combine coroutines with epoll and eventfd. Let's go over the event type once more.
// event type uses `eventfd` and bit masking for state check
class event final : no_copy_move {
public:
using task = coroutine_handle<void>; // becomes user data of `epoll_event`
private:
uint64_t state; // msb + file descriptor
public:
event(); // create fd with `eventfd`
~event(); // `close` the fd
// if it's signaled (msb is 1), no suspend
bool await_ready();
// bind current fd to epoll
// and its epoll_event will hold the coroutine's handle
void await_suspend(coroutine_handle<void> coro);
// make non-signaled (reset msb to 0)
void await_resume();
// if there is a suspended coroutine,
// it means that the event's fd is already registered via `await_suspend`
// so we will invoke `write` for the fd.
// `epoll` in the `signaled_event_tasks` will report that using `epoll_wait`
// if it's not suspended (== no waiting coroutine),
// `write` on it won't matter
void set();
};
The exported function signaled_event_tasks allows user code to acquire the suspended (event-waiting) coroutines.
It might be unsatisfying that those coroutines are not resumed automatically, but if the program already has a main loop for event handling, this function can be called there without concern.
// access to hidden(global) `epoll` and invoke `epoll_wait`.
// `epoll_wait` will return `epoll_event`s with `coroutine_handle<void>`
// this function extracts and yields them to caller
auto signaled_event_tasks() -> coro::enumerable<event::task>;
Whoa, that's all for the implementation details.
Conclusion¶
Now we can write coroutine code like that in the summary section. epoll and eventfd are simple enough to use, but almost all of their examples rely on threads (or separate processes). With the C++ 20 coroutine, we can use the pair in a more graceful manner.
auto wait_for_one_event(event& e, atomic_flag& flag) -> no_return {
try {
// resume after the event is signaled ...
co_await e;
} catch (system_error& e) {
// event throws if there was an internal system error
FAIL(e.what());
}
flag.test_and_set();
}
TEST_CASE("wait for one event", "[event]") {
event e1{};
atomic_flag flag = ATOMIC_FLAG_INIT;
wait_for_one_event(e1, flag);
e1.set();
auto count = 0;
for (auto task : signaled_event_tasks()) {
task.resume();
++count;
}
// we must enter the loop
REQUIRE(count > 0);
// already set by the coroutine `wait_for_one_event`
REQUIRE(flag.test_and_set() == true);
}