File io_linux.cpp¶
File List > modules > net > io_linux.cpp
Go to the documentation of this file.
#include <chrono>
#include <coroutine/linux.h>
#include <coroutine/net.h>
static_assert(sizeof(ssize_t) <= sizeof(int64_t));
using namespace std;
using namespace std::chrono;
namespace coro {
epoll_owner iep{}, oep{}; // inbound, outbound
void poll_net_tasks(uint64_t nano) noexcept(false) {
const auto half_time = duration_cast<milliseconds>(nanoseconds{nano} / 2);
// event buffer for this poll
constexpr auto buf_sz = 30u;
auto buf = make_unique<epoll_event[]>(buf_sz);
// resume inbound coroutines
{
auto count = iep.wait(half_time.count(), {buf.get(), buf_sz});
for (auto i = 0u; i < count; ++i)
if (auto coro =
coroutine_handle<void>::from_address(buf[i].data.ptr))
coro.resume();
}
// resume outbound coroutines
{
auto count = oep.wait(half_time.count(), {buf.get(), buf_sz});
for (auto i = 0u; i < count; ++i)
if (auto coro =
coroutine_handle<void>::from_address(buf[i].data.ptr))
coro.resume();
}
}
bool io_work_t::ready() const noexcept {
auto sd = this->handle;
// non blocking operation is expected going to suspend
if (fcntl(sd, F_GETFL, 0) & O_NONBLOCK)
return false;
// not configured. return `true` and bypass to the blocking I/O
return true;
}
uint32_t io_work_t::error() const noexcept {
return gsl::narrow_cast<uint32_t>(this->internal);
}
auto send_to(uint64_t sd, const sockaddr_in& remote, io_buffer_t buffer,
io_work_t& work) noexcept(false) -> io_send_to& {
work.handle = sd;
work.ptr = const_cast<sockaddr_in*>(addressof(remote));
work.internal_high = sizeof(sockaddr_in);
work.buffer = buffer;
return *reinterpret_cast<io_send_to*>(addressof(work));
}
auto send_to(uint64_t sd, const sockaddr_in6& remote, io_buffer_t buffer,
io_work_t& work) noexcept(false) -> io_send_to& {
work.handle = sd;
work.ptr = const_cast<sockaddr_in6*>(addressof(remote));
work.internal_high = sizeof(sockaddr_in6);
work.buffer = buffer;
return *reinterpret_cast<io_send_to*>(addressof(work));
}
void io_send_to::suspend(coroutine_handle<void> coro) noexcept(false) {
auto sd = this->handle;
auto& errc = this->internal;
errc = 0;
epoll_event req{};
req.events = EPOLLOUT | EPOLLONESHOT | EPOLLET;
req.data.ptr = coro.address();
oep.try_add(sd, req); // throws if epoll_ctl fails
}
int64_t io_send_to::resume() noexcept {
auto sd = this->handle;
auto addr = reinterpret_cast<sockaddr*>(this->ptr);
auto addrlen = static_cast<socklen_t>(this->internal_high);
auto& errc = this->internal;
auto sz = sendto(sd, buffer.data(), buffer.size_bytes(), //
0, addr, addrlen);
// update error code upon i/o failure
errc = sz < 0 ? errno : 0;
return sz;
}
auto recv_from(uint64_t sd, sockaddr_in& remote, io_buffer_t buffer,
io_work_t& work) noexcept(false) -> io_recv_from& {
work.handle = sd;
work.ptr = addressof(remote);
work.internal_high = sizeof(sockaddr_in);
work.buffer = buffer;
return *reinterpret_cast<io_recv_from*>(addressof(work));
}
auto recv_from(uint64_t sd, sockaddr_in6& remote, io_buffer_t buffer,
io_work_t& work) noexcept(false) -> io_recv_from& {
work.handle = sd;
work.ptr = addressof(remote);
work.internal_high = sizeof(sockaddr_in6);
work.buffer = buffer;
return *reinterpret_cast<io_recv_from*>(addressof(work));
}
void io_recv_from::suspend(coroutine_handle<void> coro) noexcept(false) {
auto sd = this->handle;
auto& errc = this->internal;
errc = 0;
epoll_event req{};
req.events = EPOLLIN | EPOLLONESHOT | EPOLLET;
req.data.ptr = coro.address();
iep.try_add(sd, req); // throws if epoll_ctl fails
}
int64_t io_recv_from::resume() noexcept {
auto sd = this->handle;
auto addr = reinterpret_cast<sockaddr*>(this->ptr);
auto addrlen = static_cast<socklen_t>(this->internal_high);
auto& errc = this->internal;
auto sz = recvfrom(sd, buffer.data(), buffer.size_bytes(), //
0, addr, addressof(addrlen));
// update error code upon i/o failure
errc = sz < 0 ? errno : 0;
return sz;
}
auto send_stream(uint64_t sd, io_buffer_t buffer, uint32_t flag,
io_work_t& work) noexcept(false) -> io_send& {
static_assert(sizeof(socklen_t) == sizeof(uint32_t));
work.handle = sd;
work.internal = flag;
work.buffer = buffer;
return *reinterpret_cast<io_send*>(addressof(work));
}
void io_send::suspend(coroutine_handle<void> coro) noexcept(false) {
auto sd = this->handle;
auto& errc = this->internal;
errc = 0;
epoll_event req{};
req.events = EPOLLOUT | EPOLLONESHOT | EPOLLET;
req.data.ptr = coro.address();
oep.try_add(sd, req); // throws if epoll_ctl fails
}
int64_t io_send::resume() noexcept {
auto sd = this->handle;
auto flag = this->internal;
auto& errc = this->internal;
const auto sz = send(sd, buffer.data(), buffer.size_bytes(), flag);
// update error code upon i/o failure
errc = sz < 0 ? errno : 0;
return sz;
}
auto recv_stream(uint64_t sd, io_buffer_t buffer, uint32_t flag,
io_work_t& work) noexcept(false) -> io_recv& {
static_assert(sizeof(socklen_t) == sizeof(uint32_t));
work.handle = sd;
work.internal = flag;
work.buffer = buffer;
return *reinterpret_cast<io_recv*>(addressof(work));
}
void io_recv::suspend(coroutine_handle<void> coro) noexcept(false) {
auto sd = this->handle;
auto& errc = this->internal;
errc = 0;
epoll_event req{};
req.events = EPOLLIN | EPOLLONESHOT | EPOLLET;
req.data.ptr = coro.address();
iep.try_add(sd, req);
}
int64_t io_recv::resume() noexcept {
auto sd = this->handle;
auto flag = this->internal;
auto& errc = this->internal;
const auto sz = recv(sd, buffer.data(), buffer.size_bytes(), flag);
// update error code upon i/o failure
errc = sz < 0 ? errno : 0;
return sz;
}
} // namespace coro