From 553d261d737b3329d73a722db489d0ea899f2863 Mon Sep 17 00:00:00 2001 From: Eric Mertens Date: Thu, 23 Jan 2025 12:46:52 -0800 Subject: [PATCH] coroutine checkpoint --- CMakeLists.txt | 15 +++---- CMakePresets.json | 8 ++-- connection.cpp | 35 ++++++++------- connection.hpp | 17 ++------ irc_coroutine.cpp | 31 +++++++++++++ irc_coroutine.hpp | 97 +++++++++++++++++++++++++++++++++++++++++ main.cpp | 74 +++---------------------------- ping_thread.cpp | 1 - registration_thread.cpp | 4 +- registration_thread.hpp | 4 +- 10 files changed, 172 insertions(+), 114 deletions(-) create mode 100644 irc_coroutine.cpp create mode 100644 irc_coroutine.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 833499e..15afc98 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,18 +1,15 @@ -cmake_minimum_required(VERSION 3.13) -set(CMAKE_C_STANDARD 11) +cmake_minimum_required(VERSION 3.25) set(CMAKE_CXX_STANDARD 20) project(xbot VERSION 1 - LANGUAGES C CXX + LANGUAGES CXX ) -find_package(Boost 1.83.0 CONFIG COMPONENTS log) find_package(PkgConfig REQUIRED) - +find_package(Boost 1.83.0 CONFIG COMPONENTS log) pkg_check_modules(LIBHS libhs REQUIRED IMPORTED_TARGET) include(FetchContent) - FetchContent_Declare( tomlplusplus GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git @@ -39,9 +36,9 @@ add_executable(xbot ping_thread.cpp snote_thread.cpp self_thread.cpp + irc_coroutine.cpp watchdog_thread.cpp ) -# command_thread.cpp priv_thread.cpp -# sasl_thread.cpp) + target_include_directories(xbot PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) -target_link_libraries(xbot PRIVATE Boost::log Boost::headers tomlplusplus_tomlplusplus PkgConfig::LIBHS mybase64) +target_link_libraries(xbot PRIVATE Boost::log Boost::boost tomlplusplus_tomlplusplus PkgConfig::LIBHS mybase64) diff --git a/CMakePresets.json b/CMakePresets.json index 49b13c6..2f6f99a 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -1,8 +1,8 @@ { - "version": 2, + "version": 6, "configurePresets": [ { - "name": "arm-mac", + "name": "default", "displayName": "Configure preset using toolchain file", "description": "Sets Ninja generator, build and install directory", "generator": "Ninja", @@ -17,8 +17,8 @@ ], "buildPresets": [ { - "name": "arm-mac", - "configurePreset": "arm-mac" + "name": "default", + "configurePreset": "default" } ] } diff --git a/connection.cpp b/connection.cpp index 2c0b38c..5f7f99c 100644 --- a/connection.cpp +++ b/connection.cpp @@ -74,7 +74,7 @@ auto Connection::connect( { auto resolver = boost::asio::ip::tcp::resolver{io}; auto const endpoints = co_await resolver.async_resolve(host, port, boost::asio::use_awaitable); - auto const endpoint = co_await boost::asio::async_connect(stream_, endpoints, boost::asio::use_awaitable); + co_await boost::asio::async_connect(stream_, endpoints, boost::asio::use_awaitable); sig_connect(); } @@ -90,26 +90,31 @@ auto Connection::connect( } buffer.add_bytes(n, [this](char * line) { BOOST_LOG_TRIVIAL(debug) << "RECV: " << line; - - auto const msg = parse_irc_message(line); - auto const recognized = IrcCommandHash::in_word_set(msg.command.data(), msg.command.size()); - auto const command - = recognized - && recognized->min_args <= msg.args.size() - && recognized->max_args >= msg.args.size() - ? recognized->command : IrcCommand::UNKNOWN; - - if (IrcCommand::UNKNOWN == command) - { - BOOST_LOG_TRIVIAL(warning) << "Unrecognized command: " << msg.command << " " << msg.args.size(); - } - sig_ircmsg(command, msg); + dispatch_line(line); }); } sig_disconnect(); } +/// Parse IRC message line and dispatch it to the ircmsg slot. +auto Connection::dispatch_line(char *line) -> void +{ + auto const msg = parse_irc_message(line); + auto const recognized = IrcCommandHash::in_word_set(msg.command.data(), msg.command.size()); + auto const command + = recognized + && recognized->min_args <= msg.args.size() + && recognized->max_args >= msg.args.size() + ? recognized->command : IrcCommand::UNKNOWN; + + if (IrcCommand::UNKNOWN == command) + { + BOOST_LOG_TRIVIAL(warning) << "Unrecognized command: " << msg.command << " " << msg.args.size(); + } + sig_ircmsg(command, msg); +} + auto Connection::write_line(std::string message) -> void { BOOST_LOG_TRIVIAL(debug) << "SEND: " << message; diff --git a/connection.hpp b/connection.hpp index f60e38c..bf79879 100644 --- a/connection.hpp +++ b/connection.hpp @@ -1,26 +1,14 @@ #pragma once -#include "event.hpp" -#include "settings.hpp" +#include "ircmsg.hpp" +#include "irc_command.hpp" #include #include -#include -#include -#include -#include #include #include #include -#include -#include -#include -#include -#include - -#include "ircmsg.hpp" -#include "irc_command.hpp" class Connection : public std::enable_shared_from_this { @@ -31,6 +19,7 @@ private: auto writer() -> void; auto writer_immediate() -> void; + auto dispatch_line(char * line) -> void; public: Connection(boost::asio::io_context & io); diff --git a/irc_coroutine.cpp b/irc_coroutine.cpp new file mode 100644 index 0000000..11a070a --- /dev/null +++ b/irc_coroutine.cpp @@ -0,0 +1,31 @@ +#include "irc_coroutine.hpp" + +auto irc_coroutine::is_running() -> bool { + return promise().connection_ != nullptr; +} +auto irc_coroutine::exception() -> std::exception_ptr { + return promise().exception_; +} + +auto irc_coroutine::start(Connection& connection) -> void { + promise().connection_ = connection.shared_from_this(); + resume(); +} + +void wait_command::await_suspend(std::coroutine_handle handle) +{ + auto &connection = *handle.promise().connection_; + ircmsg_connection_ = connection.sig_ircmsg.connect([this, handle](auto cmd, auto &msg) { + auto const wanted = std::find(want_cmds_.begin(), want_cmds_.end(), cmd) != want_cmds_.end(); + if (wanted) { + unsubscribe(); + resultCmd = cmd; + resultMsg = &msg; + handle.resume(); + } + }); + disconnect_connection_ = connection.sig_disconnect.connect([this, handle]() { + unsubscribe(); + handle.resume(); + }); +} diff --git a/irc_coroutine.hpp b/irc_coroutine.hpp new file mode 100644 index 0000000..22097ae --- /dev/null +++ b/irc_coroutine.hpp @@ -0,0 +1,97 @@ +#pragma once + +#include "connection.hpp" + +#include +#include + +struct irc_promise; + +struct irc_coroutine : std::coroutine_handle { + using promise_type = irc_promise; + + auto start(Connection &connection) -> void; + auto is_running() -> bool; + auto exception() -> std::exception_ptr; +}; + +struct irc_promise +{ + // Pointer to the connection while running. Cleared on termination. + std::shared_ptr connection_; + + // Pointer to exception that terminated this coroutine if there is one. + std::exception_ptr exception_; + + irc_coroutine get_return_object() + { + return {irc_coroutine::from_promise(*this)}; + } + + // Suspend waiting for start() to initialize connection_ + std::suspend_always initial_suspend() noexcept { return {}; } + + // Suspend so that is_running() and exception() work + std::suspend_always final_suspend() noexcept { return {}; } + + // Normal termination + void return_void() { + connection_.reset(); + } + + // Abnormal termination - remember the exception + void unhandled_exception() { + connection_.reset(); + exception_ = std::current_exception(); + } +}; + +/* +struct wait_ircmsg { + using result_type = std::pair + std::vector want_cmds_; +}; + +struct wait_timeout { + struct result_type {}; + std::vector want_cmds_; +}; + +*/ + +class wait_command { + std::vector want_cmds_; + + IrcCommand resultCmd; + const IrcMsg *resultMsg; + + boost::signals2::scoped_connection ircmsg_connection_; + boost::signals2::scoped_connection disconnect_connection_; + + void unsubscribe() { + ircmsg_connection_.disconnect(); + disconnect_connection_.disconnect(); + } + +public: + wait_command(std::initializer_list want_cmds) + : want_cmds_(want_cmds) + , resultMsg{} + {} + + /// The coroutine always needs to wait for a message. It will never + /// be ready immediately. + bool await_ready() noexcept { return false; } + + /// Install event handles in the connection that will resume this coroutine. + void await_suspend(std::coroutine_handle handle); + + auto await_resume() -> std::pair { + if (resultMsg) { + return std::make_pair(resultCmd, std::cref(*resultMsg)); + } else { + throw std::runtime_error{"connection terminated"}; + } + } +}; + diff --git a/main.cpp b/main.cpp index 734c5a9..9619138 100644 --- a/main.cpp +++ b/main.cpp @@ -1,91 +1,30 @@ - #include "connection.hpp" -#include "event.hpp" #include "ircmsg.hpp" -#include "linebuffer.hpp" #include "settings.hpp" #include "write_irc.hpp" #include -#include -#include #include -#include #include -#include -#include #include #include -#include -#include -#include -#include #include -#include -#include -#include #include "ping_thread.hpp" #include "registration_thread.hpp" #include "self_thread.hpp" #include "snote_thread.hpp" +#include "irc_coroutine.hpp" + using namespace std::chrono_literals; -struct irc_promise; - -struct irc_coroutine : std::coroutine_handle { - using promise_type = irc_promise; -}; - -struct irc_promise { - std::exception_ptr exception_; - - irc_coroutine get_return_object() { return {irc_coroutine::from_promise(*this)}; } - std::suspend_never initial_suspend() noexcept { return {}; } - std::suspend_always final_suspend() noexcept { return {}; } - void return_void() {} - void unhandled_exception() { - exception_ = std::current_exception(); - } -}; - -struct wait_command { - Connection& connection_; - IrcCommand want_cmd_; - - const IrcMsg *result; - boost::signals2::connection ircmsg_connection_; - boost::signals2::connection disconnect_connection_; - - wait_command(Connection& connection, IrcCommand want_cmd) - : connection_{connection}, want_cmd_{want_cmd} {} - - bool await_ready() noexcept { return false; } - void await_suspend(std::coroutine_handle handle) { - ircmsg_connection_ = connection_.sig_ircmsg.connect([this, handle](auto cmd, auto &msg) { - if (cmd == want_cmd_) { - ircmsg_connection_.disconnect(); - disconnect_connection_.disconnect(); - result = &msg; - handle.resume(); - } - }); - disconnect_connection_ = connection_.sig_disconnect.connect([this, handle]() { - ircmsg_connection_.disconnect(); - disconnect_connection_.disconnect(); - handle.destroy(); // XXX - }); - } - const IrcMsg &await_resume() { return *result; } -}; - irc_coroutine example(Connection& connection) { - auto & msg1 = co_await wait_command {connection, IrcCommand::RPL_WELCOME}; - std::cout << "WELCOME " << msg1.args[0] << "\n"; - auto & msg5 = co_await wait_command {connection, IrcCommand::RPL_ISUPPORT}; - std::cout << "ISUPPORT " << msg5.args[0] << "\n"; + auto [cmd1, msg1] = co_await wait_command{IrcCommand::RPL_WELCOME}; + std::cout << "WELCOME " << msg1.args[0] << "\n"; + auto [cmd5, msg5] = co_await wait_command{IrcCommand::RPL_ISUPPORT}; + std::cout << "ISUPPORT " << msg5.args[0] << "\n"; } auto start(boost::asio::io_context & io, Settings const& settings) -> void @@ -107,6 +46,7 @@ auto start(boost::asio::io_context & io, Settings const& settings) -> void }); */ auto logic = example(*connection); + logic.start(*connection); boost::asio::co_spawn( io, diff --git a/ping_thread.cpp b/ping_thread.cpp index 9268cee..43838fc 100644 --- a/ping_thread.cpp +++ b/ping_thread.cpp @@ -1,7 +1,6 @@ #include "ping_thread.hpp" #include "connection.hpp" -#include "ircmsg.hpp" #include "write_irc.hpp" auto PingThread::start(Connection& connection) -> void diff --git a/registration_thread.cpp b/registration_thread.cpp index 4dd869b..12929ed 100644 --- a/registration_thread.cpp +++ b/registration_thread.cpp @@ -147,9 +147,9 @@ auto RegistrationThread::start( thread->listen_for_cap_ls(); - connection.sig_connect.connect_extended([thread](auto& handle) + thread->connect_handle_ = connection.sig_connect.connect([thread]() { - handle.disconnect(); + thread->connect_handle_.disconnect(); thread->on_connect(); }); diff --git a/registration_thread.hpp b/registration_thread.hpp index 99e2550..763cfb1 100644 --- a/registration_thread.hpp +++ b/registration_thread.hpp @@ -18,8 +18,8 @@ class RegistrationThread : public std::enable_shared_from_this caps; std::unordered_set outstanding; - boost::signals2::connection connect_handle_; - boost::signals2::connection message_handle_; + boost::signals2::scoped_connection connect_handle_; + boost::signals2::scoped_connection message_handle_; auto on_connect() -> void; auto send_req() -> void;