// Implementation of Connection: a stream-based connection whose outgoing
// lines are queued in write_strings_ and flushed by a self-rescheduling
// writer, and whose incoming bytes are split into lines by a LineBuffer.
//
// NOTE(review): the line breaks of this file appear to have been collapsed,
// and several angle-bracketed spans (one #include target, the template
// arguments of std::vector, boost::asio::awaitable and make_event) look like
// they were stripped. As written, the #include directives and the `//`
// comment inside connect() run into the code that follows them. Confirm
// against the original file before building.
//
// Connection::Connection(io)
//   Constructs stream_ from io and write_timer_ from io with an expiry of
//   steady_clock::time_point::max(). Presumably the timer is never meant to
//   expire by itself and is only used as a wake-up signal via cancel_one().
//
// Connection::writer_immediate()
//   Builds one asio buffer per element of write_strings_, starts a single
//   boost::asio::async_write on stream_ with that buffer sequence, and then
//   clears write_strings_. The completion handler captures weak_from_this()
//   and takes write_strings_ by move (as `strings`); presumably this keeps
//   the storage the buffers refer to alive until the write completes. The
//   buffers are built before the move, so statement order matters here.
//   On completion without error, and if the object can still be locked,
//   the handler calls writer() again. On error nothing further is scheduled.
//
// Connection::writer()
//   If write_strings_ is empty, waits on write_timer_; when that wait
//   completes (its argument is ignored) and the object is still alive and
//   the queue is non-empty, calls writer_immediate(). If write_strings_ is
//   already non-empty, calls writer_immediate() directly.
//   NOTE(review): if the wait completes while the queue is still empty, no
//   new wait is started by this function - confirm that is intended.
//
// Connection::connect(io, host, port)
//   Coroutine. Holds shared_from_this() in `self` for its whole duration.
//   Resolves host/port with a tcp::resolver created on io, connects stream_
//   to the resolved endpoints, calls make_event(), then starts writer().
//   Afterwards it loops: async_read_some into a LineBuffer constructed with
//   32'768, with the error redirected into a local error_code; on error the
//   loop breaks, otherwise the byte count is passed to buffer.add_bytes with
//   a callback that logs "RECV: " plus the line and calls make_event(line).
//   After the loop it calls make_event() once more. The `endpoint` result of
//   async_connect and the std::placeholders using-directive are not used in
//   the visible code.
//
// Connection::write_line(message)
//   Logs "SEND: " plus the message, appends "\r\n", and pushes the result
//   onto write_strings_. If the queue was empty before the push, calls
//   write_timer_.cancel_one(), which completes a pending wait started by
//   writer() if there is one.
//
// Connection::close()
//   Calls stream_.close().
#include "connection.hpp" #include "linebuffer.hpp" #include Connection::Connection(boost::asio::io_context & io) : stream_{io} , write_timer_{io, std::chrono::steady_clock::time_point::max()} { } auto Connection::writer_immediate() -> void { std::vector buffers; buffers.reserve(write_strings_.size()); for (auto const& elt : write_strings_) { buffers.push_back(boost::asio::buffer(elt)); } boost::asio::async_write( stream_, buffers, [weak = weak_from_this() ,strings = std::move(write_strings_) ](boost::system::error_code const& error, std::size_t) { if (not error) { if (auto self = weak.lock()) { self->writer(); } } }); write_strings_.clear(); } auto Connection::writer() -> void { if (write_strings_.empty()) { write_timer_.async_wait([weak = weak_from_this()](auto){ if (auto self = weak.lock()) { if (not self->write_strings_.empty()) { self->writer_immediate(); } } }); } else { writer_immediate(); } } auto Connection::connect( boost::asio::io_context & io, std::string host, std::string port ) -> boost::asio::awaitable { using namespace std::placeholders; // keep connection alive while coroutine is active auto const self = shared_from_this(); { auto resolver = boost::asio::ip::tcp::resolver{io}; auto const endpoints = co_await resolver.async_resolve(host, port, boost::asio::use_awaitable); auto const endpoint = co_await boost::asio::async_connect(stream_, endpoints, boost::asio::use_awaitable); make_event(); } self->writer(); for(LineBuffer buffer{32'768};;) { boost::system::error_code error; auto const n = co_await stream_.async_read_some(buffer.get_buffer(), boost::asio::redirect_error(boost::asio::use_awaitable, error)); if (error) { break; } buffer.add_bytes(n, [this](char * line) { BOOST_LOG_TRIVIAL(debug) << "RECV: " << line; make_event(line); }); } make_event(); } auto Connection::write_line(std::string message) -> void { BOOST_LOG_TRIVIAL(debug) << "SEND: " << message; message += "\r\n"; auto const need_cancel = write_strings_.empty(); 
write_strings_.push_back(std::move(message)); if (need_cancel) { write_timer_.cancel_one(); } } auto Connection::close() -> void { stream_.close(); }