From 959d51f5f49c1c5be828b49bad59541fe88531a7 Mon Sep 17 00:00:00 2001 From: Eric Mertens Date: Sat, 25 Nov 2023 20:09:20 -0800 Subject: [PATCH] Switch to eventpp --- CMakeLists.txt | 16 +++-- connection.cpp | 11 ---- connection.hpp | 52 +++++++++++++-- irc_parse_thread.cpp | 18 ++---- irc_parse_thread.hpp | 9 +-- main.cpp | 57 +++++----------- ping_thread.cpp | 20 ++---- ping_thread.hpp | 11 +--- registration_thread.cpp | 139 +++++++++++++++++++++++++++------------- registration_thread.hpp | 51 +++------------ snote_thread.cpp | 24 +++++++ snote_thread.hpp | 14 ++++ thread.cpp | 45 ------------- thread.hpp | 33 +--------- watchdog_thread.cpp | 87 +++++++++++++++++++------ watchdog_thread.hpp | 16 +---- 16 files changed, 298 insertions(+), 305 deletions(-) create mode 100644 snote_thread.cpp create mode 100644 snote_thread.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 0768815..fc299c8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,13 +6,10 @@ project(xbot LANGUAGES C CXX ) -find_package(PkgConfig REQUIRED) - -pkg_check_modules(LIBIDN IMPORTED_TARGET libidn) find_package(Boost REQUIRED) -find_package(OpenSSL REQUIRED) include(FetchContent) + FetchContent_Declare( tomlplusplus GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git @@ -20,5 +17,12 @@ FetchContent_Declare( ) FetchContent_MakeAvailable(tomlplusplus) -add_executable(xbot main.cpp ircmsg.cpp settings.cpp connection.cpp thread.cpp watchdog_thread.cpp irc_parse_thread.cpp write_irc.cpp ping_thread.cpp registration_thread.cpp) -target_link_libraries(xbot PRIVATE Boost::headers OpenSSL::SSL tomlplusplus_tomlplusplus) +FetchContent_Declare( + eventpp + GIT_REPOSITORY https://github.com/wqking/eventpp.git + GIT_TAG v0.1.3 +) +FetchContent_MakeAvailable(eventpp) + +add_executable(xbot main.cpp ircmsg.cpp settings.cpp connection.cpp thread.cpp snote_thread.cpp watchdog_thread.cpp write_irc.cpp ping_thread.cpp irc_parse_thread.cpp registration_thread.cpp) +target_link_libraries(xbot PRIVATE Boost::headers tomlplusplus_tomlplusplus eventpp) diff --git a/connection.cpp b/connection.cpp index 984a090..16299a2 100644 --- a/connection.cpp +++ b/connection.cpp @@ -6,16 +6,6 @@ Connection::Connection(boost::asio::io_context & io) { } -auto Connection::add_thread(std::shared_ptr thread) -> void -{ - dispatcher_.add_thread(std::move(thread)); -} - -auto Connection::add_event(std::shared_ptr event) -> void -{ - dispatcher_.dispatch(std::move(event)); -} - auto Connection::writer_() -> void { std::vector buffers; @@ -100,7 +90,6 @@ auto Connection::connect( auto Connection::write_raw(std::string message) -> void { - std::cout << "Writing " << message; auto const need_cancel = write_strings_.empty(); write_strings_.push_back(std::move(message)); if (need_cancel) diff --git a/connection.hpp b/connection.hpp index aa02679..83ab78d 100644 --- a/connection.hpp +++ b/connection.hpp @@ -4,9 +4,13 @@ #include "settings.hpp" #include "thread.hpp" +#include +#include + #include #include +#include #include #include #include @@ -17,6 +21,8 @@ #include #include #include +#include +#include struct ConnectEvent : Event { @@ -34,22 +40,60 @@ struct LineEvent : Event class Connection : public std::enable_shared_from_this { + using EventDispatcher = eventpp::EventDispatcher; + +public: + template + class Handle + { + EventDispatcher::Handle handle; + Handle(EventDispatcher::Handle handle) : handle{handle} {} + public: + Handle() : handle{} {} + friend Connection; + }; + +private: boost::asio::ip::tcp::socket stream_; boost::asio::steady_timer write_timer_; std::list write_strings_; - Dispatcher dispatcher_; + EventDispatcher dispatcher_; auto writer() -> void; auto writer_() -> void; public: Connection(boost::asio::io_context & io); - auto add_thread(std::shared_ptr thread) -> void; - auto add_event(std::shared_ptr event) -> void; + + template + auto add_listener(F f) -> Handle + { + return Handle{dispatcher_.appendListener( + typeid(T), + eventpp::argumentAdapter(f) + )}; + } + + template + auto remove_listener(Handle handle) -> void + { + dispatcher_.removeListener(typeid(T), handle.handle); + } + + template + auto dispatch(T& event) -> void + { + dispatcher_.dispatch(typeid(T), event); + } + + auto get_executor() -> boost::asio::any_io_executor { + return stream_.get_executor(); + } template auto make_event(Args&& ... args) { - add_event(std::make_shared(std::forward(args)...)); + auto event = T{std::forward(args)...}; + dispatch(event); } /// Write bytes into the socket. Messages should be properly newline terminated. diff --git a/irc_parse_thread.cpp b/irc_parse_thread.cpp index e9c104b..8af0370 100644 --- a/irc_parse_thread.cpp +++ b/irc_parse_thread.cpp @@ -2,20 +2,10 @@ #include "connection.hpp" -IrcParseThread::IrcParseThread(Connection * connection) noexcept -: connection_{connection} {} - -auto IrcParseThread::priority() const -> priority_type +auto irc_parse_thread(Connection * connection) -> void { - return 0; -} - -auto IrcParseThread::on_event(Event const& event) -> callback_result -{ - if (auto line_event = dynamic_cast(&event)) + connection->add_listener([connection](LineEvent const& event) { - connection_->make_event(parse_irc_message(line_event->line)); - return { ThreadOutcome::Continue, EventOutcome::Consume }; - } - return {}; + connection->make_event(parse_irc_message(event.line)); + }); } \ No newline at end of file diff --git a/irc_parse_thread.hpp b/irc_parse_thread.hpp index 47a0b51..b602d2d 100644 --- a/irc_parse_thread.hpp +++ b/irc_parse_thread.hpp @@ -10,11 +10,4 @@ struct IrcMsgEvent : Event IrcMsg irc; }; -struct IrcParseThread : Thread -{ - Connection * connection_; - - IrcParseThread(Connection * connection) noexcept; - auto priority() const -> priority_type override; - auto on_event(Event const& event) -> callback_result override; -}; \ No newline at end of file +auto irc_parse_thread(Connection * connection) -> void; diff --git a/main.cpp b/main.cpp index b66229a..0a898e3 100644 --- a/main.cpp +++ b/main.cpp @@ -1,15 +1,17 @@ #include -#include #include "connection.hpp" #include "ircmsg.hpp" #include "linebuffer.hpp" #include "settings.hpp" #include "thread.hpp" +#include "write_irc.hpp" #include "irc_parse_thread.hpp" -#include "ping_thread.hpp" #include "registration_thread.hpp" +#include "ping_thread.hpp" +#include "watchdog_thread.hpp" +#include "snote_thread.hpp" #include #include @@ -30,40 +32,13 @@ using namespace std::chrono_literals; - -struct ChatThread : public Thread +auto unhandled_message_thread(Connection * connection) -> void { - auto priority() const -> priority_type override + connection->add_listener([](IrcMsgEvent const& event) { - return 100; - } - auto on_event(Event const& event) -> callback_result override - { - if (auto const* irc_event = dynamic_cast(&event)) + if (not event.handled_) { - auto const& irc = irc_event->irc; - if (irc.command == "PRIVMSG" && 2 == irc.args.size()) - { - std::cout << "Chat from " << irc.source << ": " << irc.args[1] << std::endl; - return { ThreadOutcome::Continue, EventOutcome::Consume }; - } - } - return {}; - } -}; - -struct UnhandledThread : public Thread -{ - auto priority() const -> priority_type override - { - return std::numeric_limits::max(); - } - - auto on_event(Event const& event) -> callback_result override - { - if (auto irc_event = dynamic_cast(&event)) - { - auto& irc = irc_event->irc; + auto& irc = event.irc; std::cout << "Unhandled message " << irc.command; for (auto const arg : irc.args) { @@ -71,19 +46,19 @@ struct UnhandledThread : public Thread } std::cout << "\n"; } - return {}; - } -}; + }); +} auto start(boost::asio::io_context & io, Settings const& settings) -> void { auto connection = std::make_shared(io); - connection->add_thread(std::make_shared(connection.get())); - connection->add_thread(std::make_shared(connection.get())); - connection->add_thread(std::make_shared(connection.get(), settings.password, settings.username, settings.realname, settings.nickname)); - connection->add_thread(std::make_shared()); - connection->add_thread(std::make_shared()); + watchdog_thread(connection.get()); + irc_parse_thread(connection.get()); + ping_thread(connection.get()); + registration_thread(connection.get(), settings.password, settings.username, settings.realname, settings.nickname); + snote_thread(connection.get()); + unhandled_message_thread(connection.get()); boost::asio::co_spawn( io, diff --git a/ping_thread.cpp b/ping_thread.cpp index 35bf745..6c7f949 100644 --- a/ping_thread.cpp +++ b/ping_thread.cpp @@ -3,23 +3,15 @@ #include "irc_parse_thread.hpp" #include "write_irc.hpp" -PingThread::PingThread(Connection * connection) noexcept : connection_{connection} {} - -auto PingThread::priority() const -> priority_type +auto ping_thread(Connection * connection) -> void { - return 1; -} - -auto PingThread::on_event(Event const& event) -> std::pair -{ - if (auto const irc_event = dynamic_cast(&event)) + connection->add_listener([connection](IrcMsgEvent& event) { - auto& irc = irc_event->irc; + auto& irc = event.irc; if ("PING" == irc.command && 1 == irc.args.size()) { - write_irc(*connection_, "PONG", irc.args[0]); - return {ThreadOutcome::Continue, EventOutcome::Consume}; + write_irc(*connection, "PONG", irc.args[0]); + event.handled_ = true; } - } - return {}; + }); } diff --git a/ping_thread.hpp b/ping_thread.hpp index b7eae77..a45a31c 100644 --- a/ping_thread.hpp +++ b/ping_thread.hpp @@ -3,13 +3,4 @@ #include "connection.hpp" #include "thread.hpp" -class PingThread : public Thread -{ - Connection * connection_; - -public: - PingThread(Connection * connection) noexcept; - - auto priority() const -> priority_type override; - auto on_event(Event const& event) -> std::pair override; -}; +auto ping_thread(Connection * connection) -> void; diff --git a/registration_thread.cpp b/registration_thread.cpp index dc1279b..88ce87f 100644 --- a/registration_thread.cpp +++ b/registration_thread.cpp @@ -1,13 +1,59 @@ #include "registration_thread.hpp" +#include +#include +#include + +namespace { +struct RegistrationThread : std::enable_shared_from_this +{ + Connection * connection_; + std::string password_; + std::string username_; + std::string realname_; + std::string nickname_; + + std::unordered_map caps; + std::unordered_set outstanding; + + Connection::Handle connect_handle_; + Connection::Handle message_handle_; + + enum class Stage + { + LsReply, + AckReply, + }; + + Stage stage_; + + RegistrationThread( + Connection * connection_, + std::string password, + std::string username, + std::string realname, + std::string nickname + ); + + auto on_connect() -> void; + + auto send_req() -> void; + + auto capack(IrcMsg const& msg) -> void; + + auto capls(IrcMsg const& msg) -> void; + + auto on_msg(IrcMsg const& msg) -> void; +}; + RegistrationThread::RegistrationThread( - Connection * connection_, + Connection * connection, std::string password, std::string username, std::string realname, std::string nickname ) - : connection_{connection_} + : connection_{connection} , password_{password} , username_{username} , realname_{realname} @@ -15,24 +61,36 @@ RegistrationThread::RegistrationThread( , stage_{Stage::LsReply} {} -auto RegistrationThread::priority() const -> priority_type -{ - return 2; -} - -auto RegistrationThread::on_connect() -> Thread::callback_result +auto RegistrationThread::on_connect() -> void { write_irc(*connection_, "CAP", "LS", "302"); write_irc(*connection_, "PASS", password_); write_irc(*connection_, "USER", username_, "*", "*", realname_); write_irc(*connection_, "NICK", nickname_); - return {}; + + connection_->remove_listener(connect_handle_); } -auto RegistrationThread::send_req() -> Thread::callback_result +auto RegistrationThread::send_req() -> void { std::string request; - char const* want[] = { "extended-join", "account-notify", "draft/chathistory", "batch", "soju.im/no-implicit-names", "chghost", "setname", "account-tag", "solanum.chat/oper", "solanum.chat/identify-msg", "solanum.chat/realhost", "server-time", "invite-notify", "extended-join" }; + char const* const want[] = { + "extended-join", + "account-notify", + "draft/chathistory", + "batch", + "soju.im/no-implicit-names", + "chghost", + "setname", + "account-tag", + "solanum.chat/oper", + "solanum.chat/identify-msg", + "solanum.chat/realhost", + "server-time", + "invite-notify", + "extended-join" + }; + for (auto cap : want) { if (caps.contains(cap)) @@ -47,16 +105,15 @@ auto RegistrationThread::send_req() -> Thread::callback_result request.pop_back(); write_irc(*connection_, "CAP", "REQ", request); stage_ = Stage::AckReply; - return {ThreadOutcome::Continue, EventOutcome::Consume}; } else { write_irc(*connection_, "CAP", "END"); - return {ThreadOutcome::Finish, EventOutcome::Consume}; + connection_->remove_listener(message_handle_); } } -auto RegistrationThread::capack(IrcMsg const& msg) -> Thread::callback_result +auto RegistrationThread::capack(IrcMsg const& msg) -> void { auto const n = msg.args.size(); if ("CAP" == msg.command && n >= 2 && "*" == msg.args[0] && "ACK" == msg.args[1]) @@ -72,20 +129,12 @@ auto RegistrationThread::capack(IrcMsg const& msg) -> Thread::callback_result if (outstanding.empty()) { write_irc(*connection_, "CAP", "END"); - return {ThreadOutcome::Finish, EventOutcome::Consume}; + connection_->remove_listener(message_handle_); } - else - { - return {ThreadOutcome::Continue, EventOutcome::Consume}; - } - } - else - { - return {}; } } -auto RegistrationThread::capls(IrcMsg const& msg) -> Thread::callback_result +auto RegistrationThread::capls(IrcMsg const& msg) -> void { auto const n = msg.args.size(); if ("CAP" == msg.command && n >= 2 && "*" == msg.args[0] && "LS" == msg.args[1]) @@ -105,7 +154,7 @@ auto RegistrationThread::capls(IrcMsg const& msg) -> Thread::callback_result } else { - return {}; + return; } auto in = std::istringstream{std::string{*kvs}}; @@ -128,37 +177,37 @@ auto RegistrationThread::capls(IrcMsg const& msg) -> Thread::callback_result if (last) { - return send_req(); + send_req(); } - - return {ThreadOutcome::Continue, EventOutcome::Consume}; } - else - { - return {}; - } - } -auto RegistrationThread::on_msg(IrcMsg const& msg) -> Thread::callback_result +auto RegistrationThread::on_msg(IrcMsg const& msg) -> void { switch (stage_) { - case Stage::LsReply: return capls(msg); - case Stage::AckReply: return capack(msg); - default: return {}; + case Stage::LsReply: capls(msg); return; + case Stage::AckReply: capack(msg); return; } } -auto RegistrationThread::on_event(Event const& event) -> Thread::callback_result +} // namespace + +auto registration_thread( + Connection * connection, + std::string password, + std::string username, + std::string realname, + std::string nickname +) -> void { - if (auto const irc_event = dynamic_cast(&event)) + auto thread = std::make_shared(connection, password, username, realname, nickname); + thread->message_handle_ = connection->add_listener([thread](IrcMsgEvent const& event) { - return on_msg(irc_event->irc); - } - if (auto const connect_event = dynamic_cast(&event)) + thread->on_msg(event.irc); + }); + thread->connect_handle_ = connection->add_listener([thread](ConnectEvent const&) { - return on_connect(); - } - return {}; + thread->on_connect(); + }); } diff --git a/registration_thread.hpp b/registration_thread.hpp index afd1050..bcf780d 100644 --- a/registration_thread.hpp +++ b/registration_thread.hpp @@ -5,47 +5,14 @@ #include "irc_parse_thread.hpp" #include "write_irc.hpp" +#include + #include -#include -#include -struct RegistrationThread : Thread -{ - Connection * connection_; - std::string password_; - std::string username_; - std::string realname_; - std::string nickname_; - - std::unordered_map caps; - std::unordered_set outstanding; - - enum class Stage - { - LsReply, - AckReply, - }; - - Stage stage_; - - RegistrationThread( - Connection * connection_, - std::string password, - std::string username, - std::string realname, - std::string nickname - ); - - auto priority() const -> priority_type override; - auto on_connect() -> Thread::callback_result; - - auto send_req() -> Thread::callback_result; - - auto capack(IrcMsg const& msg) -> Thread::callback_result; - - auto capls(IrcMsg const& msg) -> Thread::callback_result; - - auto on_msg(IrcMsg const& msg) -> Thread::callback_result; - - auto on_event(Event const& event) -> Thread::callback_result override; -}; \ No newline at end of file +auto registration_thread( + Connection * connection, + std::string password, + std::string username, + std::string realname, + std::string nickname +) -> void; \ No newline at end of file diff --git a/snote_thread.cpp b/snote_thread.cpp new file mode 100644 index 0000000..a17e445 --- /dev/null +++ b/snote_thread.cpp @@ -0,0 +1,24 @@ +#include "snote_thread.hpp" + +#include "irc_parse_thread.hpp" +#include "connection.hpp" + +#include + + +auto snote_thread(Connection * connection) -> void +{ + static char const* const prefix = "*** Notice -- "; + connection->add_listener([connection](IrcMsgEvent& event) + { + auto& irc = event.irc; + if ("NOTICE" == irc.command + && 2 == irc.args.size() + && "*" == irc.args[0] + && irc.args[1].starts_with(prefix)) + { + event.handled_ = true; + connection->make_event(irc.args[1].substr(strlen(prefix))); + } + }); +} diff --git a/snote_thread.hpp b/snote_thread.hpp new file mode 100644 index 0000000..91f66ed --- /dev/null +++ b/snote_thread.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "thread.hpp" + +class Connection; + +// WIP: Use much finer granularity +struct SnoteEvent : Event +{ + SnoteEvent(std::string_view raw) : raw{raw} {} + std::string_view raw; +}; + +auto snote_thread(Connection * connection) -> void; \ No newline at end of file diff --git a/thread.cpp b/thread.cpp index 1f1c4bf..66f8d48 100644 --- a/thread.cpp +++ b/thread.cpp @@ -1,46 +1 @@ #include "thread.hpp" - -auto Dispatcher::add_thread(std::shared_ptr thread) -> void -{ - threads_.push_back(std::move(thread)); -} - -auto Dispatcher::dispatch(std::shared_ptr event) -> void -{ - if (dispatching_) - { - events_.push_back(std::move(event)); - return; - } - - dispatching_ = true; - std::vector> events{std::move(event)}; - while (not events.empty()) - { - for (auto && event : events) - { - std::vector> work; - work.swap(threads_); - std::sort(work.begin(), work.end(), [](auto const& a, auto const& b) { return a->priority() < b->priority(); }); - - std::size_t const n = work.size(); - for (std::size_t i = 0; i < n; i++) - { - auto const [thread_outcome, msg_outcome] = work[i]->on_event(*event); - if (thread_outcome == ThreadOutcome::Continue) - { - threads_.push_back(std::move(work[i])); - } - if (msg_outcome == EventOutcome::Consume) - { - std::move(work.begin() + i + 1, work.end(), std::back_inserter(threads_)); - break; - } - } - } - - events = std::move(events_); - events_.clear(); - } - dispatching_ = false; -} \ No newline at end of file diff --git a/thread.hpp b/thread.hpp index 0f696d9..fc41cdb 100644 --- a/thread.hpp +++ b/thread.hpp @@ -5,38 +5,7 @@ #include #include -enum class EventOutcome -{ - Pass, - Consume, -}; - -enum class ThreadOutcome -{ - Continue, - Finish, -}; - struct Event { virtual ~Event() {} + bool handled_ = false; }; - -struct Thread -{ - using priority_type = std::uint64_t; - using callback_result = std::pair; - virtual ~Thread() {} - virtual auto on_event(Event const& event) -> callback_result { return {}; }; - virtual auto priority() const -> priority_type = 0; -}; - -struct Dispatcher -{ - std::vector> threads_; - std::vector> events_; - bool dispatching_; - - /// Apply a function to all the threads in priority order - auto dispatch(std::shared_ptr event) -> void; - auto add_thread(std::shared_ptr thread) -> void; -}; \ No newline at end of file diff --git a/watchdog_thread.cpp b/watchdog_thread.cpp index 4d479d6..cefccc6 100644 --- a/watchdog_thread.cpp +++ b/watchdog_thread.cpp @@ -2,41 +2,92 @@ #include "connection.hpp" #include "irc_parse_thread.hpp" +#include "write_irc.hpp" + +#include #include +#include using namespace std::chrono_literals; -WatchdogThread::WatchdogThread(Connection * connection) noexcept -: connection_{connection} -{ -} +namespace { -auto WatchdogThread::priority() const -> priority_type +struct WatchdogThread : std::enable_shared_from_this { - return 0; -} - -auto WatchdogThread::on_event(Event const& event) -> std::pair -{ - if (auto const irc_event = dynamic_cast(&event)) + WatchdogThread(Connection * connection) + : connection_{connection} + , timer_{connection->get_executor()} + , tried_ping{false} { - timer_.expires_from_now(30s); - return {}; } - if (auto const connect_event = dynamic_cast(&event)) + + Connection * connection_; + boost::asio::steady_timer timer_; + bool tried_ping; + + auto on_activity() -> void { + tried_ping = false; timer_.expires_from_now(30s); - timer_.async_wait([weak = weak_from_this()](auto error) + } + + auto timeout_token() + { + return [weak = weak_from_this()](auto const& error) { if (not error) { if (auto self = weak.lock()) { - self->connection_->close(); + self->on_timeout(); } } - }); + }; } - return {}; + + auto on_timeout() -> void + { + if (tried_ping) + { + connection_->close(); + } + else + { + write_irc(*connection_, "PING", "watchdog"); + tried_ping = true; + timer_.expires_from_now(30s); + timer_.async_wait(timeout_token()); + } + } + + auto on_connect() -> void + { + on_activity(); + timer_.async_wait(timeout_token()); + } + + auto on_disconnect() -> void + { + timer_.cancel(); + } +}; + +} // namespace + +auto watchdog_thread(Connection * connection) -> void +{ + auto const thread = std::make_shared(connection); + connection->add_listener([thread](auto&) + { + thread->on_connect(); + }); + connection->add_listener([thread](auto&) + { + thread->on_disconnect(); + }); + connection->add_listener([thread](auto&) + { + thread->on_activity(); + }); } diff --git a/watchdog_thread.hpp b/watchdog_thread.hpp index 5923876..4433f7a 100644 --- a/watchdog_thread.hpp +++ b/watchdog_thread.hpp @@ -1,18 +1,4 @@ #pragma once -#include "thread.hpp" -#include - class Connection; - -class WatchdogThread : public Thread, public std::enable_shared_from_this -{ - Connection * connection_; - boost::asio::steady_timer timer_; - -public: - WatchdogThread(Connection * connection) noexcept; - - auto priority() const -> priority_type override; - auto on_event(Event const& event) -> std::pair override; -}; +auto watchdog_thread(Connection * connection) -> void;