coroutine checkpoint

This commit is contained in:
Eric Mertens 2025-01-23 12:46:52 -08:00
parent d92c6fee21
commit 553d261d73
10 changed files with 172 additions and 114 deletions

View File

@ -1,18 +1,15 @@
cmake_minimum_required(VERSION 3.13)
set(CMAKE_C_STANDARD 11)
cmake_minimum_required(VERSION 3.25)
set(CMAKE_CXX_STANDARD 20)
project(xbot
VERSION 1
LANGUAGES C CXX
LANGUAGES CXX
)
find_package(Boost 1.83.0 CONFIG COMPONENTS log)
find_package(PkgConfig REQUIRED)
find_package(Boost 1.83.0 CONFIG COMPONENTS log)
pkg_check_modules(LIBHS libhs REQUIRED IMPORTED_TARGET)
include(FetchContent)
FetchContent_Declare(
tomlplusplus
GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git
@ -39,9 +36,9 @@ add_executable(xbot
ping_thread.cpp
snote_thread.cpp
self_thread.cpp
irc_coroutine.cpp
watchdog_thread.cpp
)
# command_thread.cpp priv_thread.cpp
# sasl_thread.cpp)
target_include_directories(xbot PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
target_link_libraries(xbot PRIVATE Boost::log Boost::headers tomlplusplus_tomlplusplus PkgConfig::LIBHS mybase64)
target_link_libraries(xbot PRIVATE Boost::log Boost::boost tomlplusplus_tomlplusplus PkgConfig::LIBHS mybase64)

View File

@ -1,8 +1,8 @@
{
"version": 2,
"version": 6,
"configurePresets": [
{
"name": "arm-mac",
"name": "default",
"displayName": "Configure preset using toolchain file",
"description": "Sets Ninja generator, build and install directory",
"generator": "Ninja",
@ -17,8 +17,8 @@
],
"buildPresets": [
{
"name": "arm-mac",
"configurePreset": "arm-mac"
"name": "default",
"configurePreset": "default"
}
]
}

View File

@ -74,7 +74,7 @@ auto Connection::connect(
{
auto resolver = boost::asio::ip::tcp::resolver{io};
auto const endpoints = co_await resolver.async_resolve(host, port, boost::asio::use_awaitable);
auto const endpoint = co_await boost::asio::async_connect(stream_, endpoints, boost::asio::use_awaitable);
co_await boost::asio::async_connect(stream_, endpoints, boost::asio::use_awaitable);
sig_connect();
}
@ -90,26 +90,31 @@ auto Connection::connect(
}
buffer.add_bytes(n, [this](char * line) {
BOOST_LOG_TRIVIAL(debug) << "RECV: " << line;
auto const msg = parse_irc_message(line);
auto const recognized = IrcCommandHash::in_word_set(msg.command.data(), msg.command.size());
auto const command
= recognized
&& recognized->min_args <= msg.args.size()
&& recognized->max_args >= msg.args.size()
? recognized->command : IrcCommand::UNKNOWN;
if (IrcCommand::UNKNOWN == command)
{
BOOST_LOG_TRIVIAL(warning) << "Unrecognized command: " << msg.command << " " << msg.args.size();
}
sig_ircmsg(command, msg);
dispatch_line(line);
});
}
sig_disconnect();
}
/// Parse IRC message line and dispatch it to the ircmsg slot.
auto Connection::dispatch_line(char *line) -> void
{
auto const msg = parse_irc_message(line);
auto const recognized = IrcCommandHash::in_word_set(msg.command.data(), msg.command.size());
auto const command
= recognized
&& recognized->min_args <= msg.args.size()
&& recognized->max_args >= msg.args.size()
? recognized->command : IrcCommand::UNKNOWN;
if (IrcCommand::UNKNOWN == command)
{
BOOST_LOG_TRIVIAL(warning) << "Unrecognized command: " << msg.command << " " << msg.args.size();
}
sig_ircmsg(command, msg);
}
auto Connection::write_line(std::string message) -> void
{
BOOST_LOG_TRIVIAL(debug) << "SEND: " << message;

View File

@ -1,26 +1,14 @@
#pragma once
#include "event.hpp"
#include "settings.hpp"
#include "ircmsg.hpp"
#include "irc_command.hpp"
#include <boost/asio.hpp>
#include <boost/signals2.hpp>
#include <chrono>
#include <functional>
#include <concepts>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <variant>
#include <vector>
#include "ircmsg.hpp"
#include "irc_command.hpp"
class Connection : public std::enable_shared_from_this<Connection>
{
@ -31,6 +19,7 @@ private:
auto writer() -> void;
auto writer_immediate() -> void;
auto dispatch_line(char * line) -> void;
public:
Connection(boost::asio::io_context & io);

31
irc_coroutine.cpp Normal file
View File

@ -0,0 +1,31 @@
#include "irc_coroutine.hpp"
auto irc_coroutine::is_running() -> bool {
return promise().connection_ != nullptr;
}
auto irc_coroutine::exception() -> std::exception_ptr {
return promise().exception_;
}
auto irc_coroutine::start(Connection& connection) -> void {
promise().connection_ = connection.shared_from_this();
resume();
}
void wait_command::await_suspend(std::coroutine_handle<irc_promise> handle)
{
auto &connection = *handle.promise().connection_;
ircmsg_connection_ = connection.sig_ircmsg.connect([this, handle](auto cmd, auto &msg) {
auto const wanted = std::find(want_cmds_.begin(), want_cmds_.end(), cmd) != want_cmds_.end();
if (wanted) {
unsubscribe();
resultCmd = cmd;
resultMsg = &msg;
handle.resume();
}
});
disconnect_connection_ = connection.sig_disconnect.connect([this, handle]() {
unsubscribe();
handle.resume();
});
}

97
irc_coroutine.hpp Normal file
View File

@ -0,0 +1,97 @@
#pragma once
#include "connection.hpp"
#include <coroutine>
#include <vector>
struct irc_promise;
struct irc_coroutine : std::coroutine_handle<irc_promise> {
using promise_type = irc_promise;
auto start(Connection &connection) -> void;
auto is_running() -> bool;
auto exception() -> std::exception_ptr;
};
struct irc_promise
{
// Pointer to the connection while running. Cleared on termination.
std::shared_ptr<Connection> connection_;
// Pointer to exception that terminated this coroutine if there is one.
std::exception_ptr exception_;
irc_coroutine get_return_object()
{
return {irc_coroutine::from_promise(*this)};
}
// Suspend waiting for start() to initialize connection_
std::suspend_always initial_suspend() noexcept { return {}; }
// Suspend so that is_running() and exception() work
std::suspend_always final_suspend() noexcept { return {}; }
// Normal termination
void return_void() {
connection_.reset();
}
// Abnormal termination - remember the exception
void unhandled_exception() {
connection_.reset();
exception_ = std::current_exception();
}
};
/*
struct wait_ircmsg {
using result_type = std::pair<IrcCommand, const IrcMsg &>
std::vector<IrcCommand> want_cmds_;
};
struct wait_timeout {
struct result_type {};
std::vector<IrcCommand> want_cmds_;
};
*/
class wait_command {
std::vector<IrcCommand> want_cmds_;
IrcCommand resultCmd;
const IrcMsg *resultMsg;
boost::signals2::scoped_connection ircmsg_connection_;
boost::signals2::scoped_connection disconnect_connection_;
void unsubscribe() {
ircmsg_connection_.disconnect();
disconnect_connection_.disconnect();
}
public:
wait_command(std::initializer_list<IrcCommand> want_cmds)
: want_cmds_(want_cmds)
, resultMsg{}
{}
/// The coroutine always needs to wait for a message. It will never
/// be ready immediately.
bool await_ready() noexcept { return false; }
/// Install event handles in the connection that will resume this coroutine.
void await_suspend(std::coroutine_handle<irc_promise> handle);
auto await_resume() -> std::pair<IrcCommand, const IrcMsg &> {
if (resultMsg) {
return std::make_pair(resultCmd, std::cref(*resultMsg));
} else {
throw std::runtime_error{"connection terminated"};
}
}
};

View File

@ -1,91 +1,30 @@
#include "connection.hpp"
#include "event.hpp"
#include "ircmsg.hpp"
#include "linebuffer.hpp"
#include "settings.hpp"
#include "write_irc.hpp"
#include <boost/asio.hpp>
#include <algorithm>
#include <chrono>
#include <fstream>
#include <coroutine>
#include <iostream>
#include <limits>
#include <list>
#include <memory>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <variant>
#include <vector>
#include <unordered_map>
#include <unordered_set>
#include <coroutine>
#include "ping_thread.hpp"
#include "registration_thread.hpp"
#include "self_thread.hpp"
#include "snote_thread.hpp"
#include "irc_coroutine.hpp"
using namespace std::chrono_literals;
struct irc_promise;
struct irc_coroutine : std::coroutine_handle<irc_promise> {
using promise_type = irc_promise;
};
struct irc_promise {
std::exception_ptr exception_;
irc_coroutine get_return_object() { return {irc_coroutine::from_promise(*this)}; }
std::suspend_never initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {
exception_ = std::current_exception();
}
};
struct wait_command {
Connection& connection_;
IrcCommand want_cmd_;
const IrcMsg *result;
boost::signals2::connection ircmsg_connection_;
boost::signals2::connection disconnect_connection_;
wait_command(Connection& connection, IrcCommand want_cmd)
: connection_{connection}, want_cmd_{want_cmd} {}
bool await_ready() noexcept { return false; }
void await_suspend(std::coroutine_handle<irc_promise> handle) {
ircmsg_connection_ = connection_.sig_ircmsg.connect([this, handle](auto cmd, auto &msg) {
if (cmd == want_cmd_) {
ircmsg_connection_.disconnect();
disconnect_connection_.disconnect();
result = &msg;
handle.resume();
}
});
disconnect_connection_ = connection_.sig_disconnect.connect([this, handle]() {
ircmsg_connection_.disconnect();
disconnect_connection_.disconnect();
handle.destroy(); // XXX
});
}
const IrcMsg &await_resume() { return *result; }
};
irc_coroutine example(Connection& connection) {
auto & msg1 = co_await wait_command {connection, IrcCommand::RPL_WELCOME};
std::cout << "WELCOME " << msg1.args[0] << "\n";
auto & msg5 = co_await wait_command {connection, IrcCommand::RPL_ISUPPORT};
std::cout << "ISUPPORT " << msg5.args[0] << "\n";
auto [cmd1, msg1] = co_await wait_command{IrcCommand::RPL_WELCOME};
std::cout << "WELCOME " << msg1.args[0] << "\n";
auto [cmd5, msg5] = co_await wait_command{IrcCommand::RPL_ISUPPORT};
std::cout << "ISUPPORT " << msg5.args[0] << "\n";
}
auto start(boost::asio::io_context & io, Settings const& settings) -> void
@ -107,6 +46,7 @@ auto start(boost::asio::io_context & io, Settings const& settings) -> void
});
*/
auto logic = example(*connection);
logic.start(*connection);
boost::asio::co_spawn(
io,

View File

@ -1,7 +1,6 @@
#include "ping_thread.hpp"
#include "connection.hpp"
#include "ircmsg.hpp"
#include "write_irc.hpp"
auto PingThread::start(Connection& connection) -> void

View File

@ -147,9 +147,9 @@ auto RegistrationThread::start(
thread->listen_for_cap_ls();
connection.sig_connect.connect_extended([thread](auto& handle)
thread->connect_handle_ = connection.sig_connect.connect([thread]()
{
handle.disconnect();
thread->connect_handle_.disconnect();
thread->on_connect();
});

View File

@ -18,8 +18,8 @@ class RegistrationThread : public std::enable_shared_from_this<RegistrationThrea
std::unordered_map<std::string, std::string> caps;
std::unordered_set<std::string> outstanding;
boost::signals2::connection connect_handle_;
boost::signals2::connection message_handle_;
boost::signals2::scoped_connection connect_handle_;
boost::signals2::scoped_connection message_handle_;
auto on_connect() -> void;
auto send_req() -> void;