File io_darwin.cpp
File List > modules > net > io_darwin.cpp
Go to the documentation of this file.
#include <chrono>
#include <coroutine/net.h>
#include <coroutine/unix.h>
// The `resume()` functions below return the `ssize_t` result of the socket
// calls as `int64_t`; make sure that conversion cannot drop bits.
static_assert(sizeof(ssize_t) <= sizeof(int64_t));
using namespace std;
using namespace std::chrono;
namespace coro {
// kqueue instance shared by this file: the `suspend()` functions submit their
// events to it with `change()`, and `poll_net_tasks` collects them with
// `events()`.
kqueue_owner netkq{};
// Signature of the per-event callback used by `poll_net_tasks`: it receives
// the caller-supplied `ctx` pointer and the coroutine handle stored in the
// `io_work_t` attached to the event.
using net_callback_t = void (*)(void* ctx, coroutine_handle<void> coro);
void poll_net_tasks(const timespec& wait_time, //
net_callback_t callback, void* ctx) noexcept(false) {
constexpr auto buf_size = 30u;
auto buf = make_unique<kevent64_s[]>(buf_size);
const auto count = netkq.events(wait_time, {buf.get(), buf_size});
for (auto i = 0; i < count; ++i) {
auto* work = reinterpret_cast<io_work_t*>(buf[i].udata);
callback(ctx, work->task);
}
}
// `net_callback_t` that ignores the context pointer and simply resumes the
// given coroutine.
void resume_net_task(void* /*ctx*/, coroutine_handle<void> coro) noexcept(false) {
    coro.resume();
}
void poll_net_tasks(uint64_t nano) noexcept(false) {
auto timeout = nanoseconds{nano};
const auto sec = duration_cast<seconds>(timeout);
const timespec wait_time{
.tv_sec = sec.count(),
.tv_nsec = (timeout - sec).count(),
};
return poll_net_tasks(wait_time, resume_net_task, nullptr);
}
// Reports whether the I/O call can be made immediately instead of suspending
// (presumably used as the awaitable's `await_ready` - confirm in the header).
//
// Returns `false` when the descriptor has `O_NONBLOCK` set: the operation is
// expected to suspend and wait for the kqueue.
// Returns `true` when the descriptor is in blocking mode, bypassing to the
// blocking I/O call.
// Returns `true` as well when the flags cannot be queried. `fcntl` yields -1
// in that case, and `-1 & O_NONBLOCK` is non-zero, so without the explicit
// check a failed query would be mistaken for "non-blocking". Going straight
// to the I/O call lets it fail and record `errno` in `internal`, where
// `error()` can report it.
bool io_work_t::ready() const noexcept {
    const auto sd = this->handle;
    const auto flags = fcntl(sd, F_GETFL, 0);
    if (flags == -1)
        return true;
    // non-blocking: suspend. blocking: run the I/O right away
    return (flags & O_NONBLOCK) == 0;
}
// Returns the content of `internal` narrowed to 32 bits. The `resume()`
// functions in this file store `errno` there on failure and 0 on success.
uint32_t io_work_t::error() const noexcept {
    const auto code = this->internal;
    return gsl::narrow_cast<uint32_t>(code);
}
// Prepares `work` for a `sendto` towards an IPv4 peer. Nothing is sent here.
//
// sd      socket descriptor, stored in `work.handle`
// remote  destination address; only its address is stored, so it has to stay
//         alive until `io_send_to::resume` has used it
// buffer  data to send, stored in `work.buffer`
// work    storage for the request
//
// Returns `work` itself, viewed as `io_send_to`.
auto send_to(uint64_t sd, const sockaddr_in& remote, io_buffer_t buffer,
             io_work_t& work) noexcept(false) -> io_send_to& {
    // const is cast away only to fit `work.ptr`;
    // `io_send_to::resume` merely forwards the pointer to `sendto`
    auto* peer = const_cast<sockaddr_in*>(addressof(remote));
    work.buffer = buffer;
    work.internal_high = sizeof(sockaddr_in);
    work.ptr = peer;
    work.handle = sd;
    return reinterpret_cast<io_send_to&>(work);
}
// Prepares `work` for a `sendto` towards an IPv6 peer. Nothing is sent here.
//
// sd      socket descriptor, stored in `work.handle`
// remote  destination address; only its address is stored, so it has to stay
//         alive until `io_send_to::resume` has used it
// buffer  data to send, stored in `work.buffer`
// work    storage for the request
//
// Returns `work` itself, viewed as `io_send_to`.
auto send_to(uint64_t sd, const sockaddr_in6& remote, io_buffer_t buffer,
             io_work_t& work) noexcept(false) -> io_send_to& {
    // const is cast away only to fit `work.ptr`;
    // `io_send_to::resume` merely forwards the pointer to `sendto`
    auto* peer = const_cast<sockaddr_in6*>(addressof(remote));
    work.buffer = buffer;
    work.internal_high = sizeof(sockaddr_in6);
    work.ptr = peer;
    work.handle = sd;
    return reinterpret_cast<io_send_to&>(work);
}
// Stores the coroutine to resume and registers a write event for the socket
// with `netkq`.
//
// The event is added with `EV_ONESHOT`, and its `udata` carries this object
// as `io_work_t*` so that `poll_net_tasks` can find `task` again.
// Anything thrown by `netkq.change()` propagates to the caller.
void io_send_to::suspend(coroutine_handle<void> rh) noexcept(false) {
    // the pointer must fit into the 64 bit `udata` field
    static_assert(sizeof(void*) <= sizeof(uint64_t));
    this->task = rh;

    auto* const self = static_cast<io_work_t*>(this);
    kevent64_s request{};
    request.udata = reinterpret_cast<uint64_t>(self);
    request.data = 0;
    request.fflags = 0;
    request.flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
    request.filter = EVFILT_WRITE;
    request.ident = this->handle;
    netkq.change(request);
}
// Performs the actual `sendto` using the values stored by `send_to`.
//
// Returns what `sendto` returned (bytes sent, or a negative value on
// failure). `internal` receives `errno` on failure and 0 otherwise.
int64_t io_send_to::resume() noexcept {
    const auto* peer = reinterpret_cast<sockaddr*>(this->ptr);
    const auto peer_len = static_cast<socklen_t>(this->internal_high);
    const auto sent = sendto(this->handle, buffer.data(), buffer.size_bytes(),
                             0, peer, peer_len);
    if (sent < 0)
        this->internal = errno;
    else
        this->internal = 0;
    return sent;
}
// Prepares `work` for a `recvfrom` that reports an IPv4 sender. Nothing is
// received here.
//
// sd      socket descriptor, stored in `work.handle`
// remote  receives the sender address; only its address is stored, so it has
//         to stay alive until `io_recv_from::resume` has filled it
// buffer  destination for the received bytes, stored in `work.buffer`
// work    storage for the request
//
// Returns `work` itself, viewed as `io_recv_from`.
auto recv_from(uint64_t sd, sockaddr_in& remote, io_buffer_t buffer,
               io_work_t& work) noexcept(false) -> io_recv_from& {
    work.buffer = buffer;
    work.internal_high = sizeof(sockaddr_in);
    work.ptr = addressof(remote);
    work.handle = sd;
    return reinterpret_cast<io_recv_from&>(work);
}
// Prepares `work` for a `recvfrom` that reports an IPv6 sender. Nothing is
// received here.
//
// sd      socket descriptor, stored in `work.handle`
// remote  receives the sender address; only its address is stored, so it has
//         to stay alive until `io_recv_from::resume` has filled it
// buffer  destination for the received bytes, stored in `work.buffer`
// work    storage for the request
//
// Returns `work` itself, viewed as `io_recv_from`.
auto recv_from(uint64_t sd, sockaddr_in6& remote, io_buffer_t buffer,
               io_work_t& work) noexcept(false) -> io_recv_from& {
    work.buffer = buffer;
    work.internal_high = sizeof(sockaddr_in6);
    work.ptr = addressof(remote);
    work.handle = sd;
    return reinterpret_cast<io_recv_from&>(work);
}
// Stores the coroutine to resume and registers a read event for the socket
// with `netkq`.
//
// The event is added with `EV_ONESHOT`. Anything thrown by `netkq.change()`
// propagates to the caller.
void io_recv_from::suspend(coroutine_handle<void> rh) noexcept(false) {
    // the pointer must fit into the 64 bit `udata` field
    static_assert(sizeof(void*) <= sizeof(uint64_t));
    this->task = rh;

    // `rh` alone could serve as the user data, but the whole object is
    // attached instead so that `poll_net_tasks` gets access to the
    // `io_work_t` and not just the handle
    auto* const self = static_cast<io_work_t*>(this);
    kevent64_s request{};
    request.udata = reinterpret_cast<uint64_t>(self);
    request.data = 0;
    request.fflags = 0;
    request.flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
    request.filter = EVFILT_READ;
    request.ident = this->handle;
    netkq.change(request);
}
// Performs the actual `recvfrom` using the values stored by `recv_from`.
//
// Returns what `recvfrom` returned (bytes received, or a negative value on
// failure). `internal` receives `errno` on failure and 0 otherwise.
int64_t io_recv_from::resume() noexcept {
    auto* peer = reinterpret_cast<sockaddr*>(this->ptr);
    // in/out argument for `recvfrom`; the length it writes back stays local
    // and is not stored in the object
    socklen_t peer_len = static_cast<socklen_t>(this->internal_high);
    const auto received = recvfrom(this->handle, buffer.data(),
                                   buffer.size_bytes(), 0, peer, &peer_len);
    if (received < 0)
        this->internal = errno;
    else
        this->internal = 0;
    return received;
}
// Prepares `work` for a `send` on a connected socket. Nothing is sent here.
//
// sd      socket descriptor, stored in `work.handle`
// buffer  data to send, stored in `work.buffer`
// flag    flags for `send`; kept in `work.internal` until `io_send::resume`
//         reads them
// work    storage for the request
//
// Returns `work` itself, viewed as `io_send`.
auto send_stream(uint64_t sd, io_buffer_t buffer, uint32_t flag,
                 io_work_t& work) noexcept(false) -> io_send& {
    static_assert(sizeof(socklen_t) == sizeof(uint32_t));
    work.buffer = buffer;
    work.internal = flag;
    work.handle = sd;
    return reinterpret_cast<io_send&>(work);
}
// Stores the coroutine to resume and registers a write event for the socket
// with `netkq`.
//
// The event is added with `EV_ONESHOT`, and its `udata` carries this object
// as `io_work_t*` so that `poll_net_tasks` can find `task` again.
// Anything thrown by `netkq.change()` propagates to the caller.
void io_send::suspend(coroutine_handle<void> rh) noexcept(false) {
    // the pointer must fit into the 64 bit `udata` field
    static_assert(sizeof(void*) <= sizeof(uint64_t));
    this->task = rh;

    auto* const self = static_cast<io_work_t*>(this);
    kevent64_s request{};
    request.udata = reinterpret_cast<uint64_t>(self);
    request.data = 0;
    request.fflags = 0;
    request.flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
    request.filter = EVFILT_WRITE;
    request.ident = this->handle;
    netkq.change(request);
}
// Performs the actual `send` using the values stored by `send_stream`.
//
// Returns what `send` returned (bytes sent, or a negative value on failure).
// `internal` is overwritten: `errno` on failure and 0 otherwise.
int64_t io_send::resume() noexcept {
    // `internal` holds the flags on entry and the error code on exit,
    // so the flags have to be read before the result is stored
    const auto flag = this->internal;
    const auto sent = send(this->handle, buffer.data(), buffer.size_bytes(), flag);
    if (sent < 0)
        this->internal = errno;
    else
        this->internal = 0;
    return sent;
}
// Prepares `work` for a `recv` on a connected socket. Nothing is received
// here.
//
// sd      socket descriptor, stored in `work.handle`
// buffer  destination for the received bytes, stored in `work.buffer`
// flag    flags for `recv`; kept in `work.internal` until `io_recv::resume`
//         reads them
// work    storage for the request
//
// Returns `work` itself, viewed as `io_recv`.
auto recv_stream(uint64_t sd, io_buffer_t buffer, uint32_t flag,
                 io_work_t& work) noexcept(false) -> io_recv& {
    static_assert(sizeof(socklen_t) == sizeof(uint32_t));
    work.buffer = buffer;
    work.internal = flag;
    work.handle = sd;
    return reinterpret_cast<io_recv&>(work);
}
// Stores the coroutine to resume and registers a read event for the socket
// with `netkq`.
//
// The event is added with `EV_ONESHOT`, and its `udata` carries this object
// as `io_work_t*` so that `poll_net_tasks` can find `task` again.
// Anything thrown by `netkq.change()` propagates to the caller.
void io_recv::suspend(coroutine_handle<void> rh) noexcept(false) {
    // the pointer must fit into the 64 bit `udata` field
    static_assert(sizeof(void*) <= sizeof(uint64_t));
    this->task = rh;

    auto* const self = static_cast<io_work_t*>(this);
    kevent64_s request{};
    request.udata = reinterpret_cast<uint64_t>(self);
    request.data = 0;
    request.fflags = 0;
    request.flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
    request.filter = EVFILT_READ;
    request.ident = this->handle;
    netkq.change(request);
}
// Performs the actual `recv` using the values stored by `recv_stream`.
//
// Returns what `recv` returned (bytes received, or a negative value on
// failure). `internal` is overwritten: `errno` on failure and 0 otherwise.
int64_t io_recv::resume() noexcept {
    // `internal` holds the flags on entry and the error code on exit,
    // so the flags have to be read before the result is stored
    const auto flag = this->internal;
    const auto received = recv(this->handle, buffer.data(), buffer.size_bytes(), flag);
    if (received < 0)
        this->internal = errno;
    else
        this->internal = 0;
    return received;
}
} // namespace coro