Switch to eventpp

This commit is contained in:
Eric Mertens 2023-11-25 20:09:20 -08:00
parent 53050bb2a1
commit 959d51f5f4
16 changed files with 298 additions and 305 deletions

View File

@ -6,13 +6,10 @@ project(xbot
LANGUAGES C CXX
)
find_package(PkgConfig REQUIRED)
pkg_check_modules(LIBIDN IMPORTED_TARGET libidn)
find_package(Boost REQUIRED)
find_package(OpenSSL REQUIRED)
include(FetchContent)
FetchContent_Declare(
tomlplusplus
GIT_REPOSITORY https://github.com/marzer/tomlplusplus.git
@ -20,5 +17,12 @@ FetchContent_Declare(
)
FetchContent_MakeAvailable(tomlplusplus)
add_executable(xbot main.cpp ircmsg.cpp settings.cpp connection.cpp thread.cpp watchdog_thread.cpp irc_parse_thread.cpp write_irc.cpp ping_thread.cpp registration_thread.cpp)
target_link_libraries(xbot PRIVATE Boost::headers OpenSSL::SSL tomlplusplus_tomlplusplus)
FetchContent_Declare(
eventpp
GIT_REPOSITORY https://github.com/wqking/eventpp.git
GIT_TAG v0.1.3
)
FetchContent_MakeAvailable(eventpp)
add_executable(xbot main.cpp ircmsg.cpp settings.cpp connection.cpp thread.cpp snote_thread.cpp watchdog_thread.cpp write_irc.cpp ping_thread.cpp irc_parse_thread.cpp registration_thread.cpp)
target_link_libraries(xbot PRIVATE Boost::headers tomlplusplus_tomlplusplus eventpp)

View File

@ -6,16 +6,6 @@ Connection::Connection(boost::asio::io_context & io)
{
}
auto Connection::add_thread(std::shared_ptr<Thread> thread) -> void
{
dispatcher_.add_thread(std::move(thread));
}
auto Connection::add_event(std::shared_ptr<Event> event) -> void
{
dispatcher_.dispatch(std::move(event));
}
auto Connection::writer_() -> void
{
std::vector<boost::asio::const_buffer> buffers;
@ -100,7 +90,6 @@ auto Connection::connect(
auto Connection::write_raw(std::string message) -> void
{
std::cout << "Writing " << message;
auto const need_cancel = write_strings_.empty();
write_strings_.push_back(std::move(message));
if (need_cancel)

View File

@ -4,9 +4,13 @@
#include "settings.hpp"
#include "thread.hpp"
#include <eventpp/eventdispatcher.h>
#include <eventpp/utilities/argumentadapter.h>
#include <boost/asio.hpp>
#include <chrono>
#include <functional>
#include <concepts>
#include <iostream>
#include <list>
@ -17,6 +21,8 @@
#include <utility>
#include <variant>
#include <vector>
#include <typeinfo>
#include <typeindex>
struct ConnectEvent : Event
{
@ -34,22 +40,60 @@ struct LineEvent : Event
class Connection : public std::enable_shared_from_this<Connection>
{
using EventDispatcher = eventpp::EventDispatcher<std::type_index, void(Event&)>;
public:
template <typename T>
class Handle
{
EventDispatcher::Handle handle;
Handle(EventDispatcher::Handle handle) : handle{handle} {}
public:
Handle() : handle{} {}
friend Connection;
};
private:
boost::asio::ip::tcp::socket stream_;
boost::asio::steady_timer write_timer_;
std::list<std::string> write_strings_;
Dispatcher dispatcher_;
EventDispatcher dispatcher_;
auto writer() -> void;
auto writer_() -> void;
public:
Connection(boost::asio::io_context & io);
auto add_thread(std::shared_ptr<Thread> thread) -> void;
auto add_event(std::shared_ptr<Event> event) -> void;
template <typename T, typename F>
auto add_listener(F f) -> Handle<T>
{
return Handle<T>{dispatcher_.appendListener(
typeid(T),
eventpp::argumentAdapter<void(T&)>(f)
)};
}
template <typename T>
auto remove_listener(Handle<T> handle) -> void
{
dispatcher_.removeListener(typeid(T), handle.handle);
}
template <typename T>
auto dispatch(T& event) -> void
{
dispatcher_.dispatch(typeid(T), event);
}
auto get_executor() -> boost::asio::any_io_executor {
return stream_.get_executor();
}
template <typename T, typename... Args>
auto make_event(Args&& ... args) {
add_event(std::make_shared<T>(std::forward<Args>(args)...));
auto event = T{std::forward<Args>(args)...};
dispatch<T>(event);
}
/// Write bytes into the socket. Messages should be properly newline terminated.

View File

@ -2,20 +2,10 @@
#include "connection.hpp"
IrcParseThread::IrcParseThread(Connection * connection) noexcept
: connection_{connection} {}
auto IrcParseThread::priority() const -> priority_type
auto irc_parse_thread(Connection * connection) -> void
{
return 0;
}
auto IrcParseThread::on_event(Event const& event) -> callback_result
{
if (auto line_event = dynamic_cast<LineEvent const*>(&event))
connection->add_listener<LineEvent>([connection](LineEvent const& event)
{
connection_->make_event<IrcMsgEvent>(parse_irc_message(line_event->line));
return { ThreadOutcome::Continue, EventOutcome::Consume };
}
return {};
connection->make_event<IrcMsgEvent>(parse_irc_message(event.line));
});
}

View File

@ -10,11 +10,4 @@ struct IrcMsgEvent : Event
IrcMsg irc;
};
struct IrcParseThread : Thread
{
Connection * connection_;
IrcParseThread(Connection * connection) noexcept;
auto priority() const -> priority_type override;
auto on_event(Event const& event) -> callback_result override;
};
auto irc_parse_thread(Connection * connection) -> void;

View File

@ -1,15 +1,17 @@
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include "connection.hpp"
#include "ircmsg.hpp"
#include "linebuffer.hpp"
#include "settings.hpp"
#include "thread.hpp"
#include "write_irc.hpp"
#include "irc_parse_thread.hpp"
#include "ping_thread.hpp"
#include "registration_thread.hpp"
#include "ping_thread.hpp"
#include "watchdog_thread.hpp"
#include "snote_thread.hpp"
#include <algorithm>
#include <chrono>
@ -30,40 +32,13 @@
using namespace std::chrono_literals;
struct ChatThread : public Thread
auto unhandled_message_thread(Connection * connection) -> void
{
auto priority() const -> priority_type override
connection->add_listener<IrcMsgEvent>([](IrcMsgEvent const& event)
{
return 100;
}
auto on_event(Event const& event) -> callback_result override
{
if (auto const* irc_event = dynamic_cast<IrcMsgEvent const*>(&event))
if (not event.handled_)
{
auto const& irc = irc_event->irc;
if (irc.command == "PRIVMSG" && 2 == irc.args.size())
{
std::cout << "Chat from " << irc.source << ": " << irc.args[1] << std::endl;
return { ThreadOutcome::Continue, EventOutcome::Consume };
}
}
return {};
}
};
struct UnhandledThread : public Thread
{
auto priority() const -> priority_type override
{
return std::numeric_limits<Thread::priority_type>::max();
}
auto on_event(Event const& event) -> callback_result override
{
if (auto irc_event = dynamic_cast<IrcMsgEvent const*>(&event))
{
auto& irc = irc_event->irc;
auto& irc = event.irc;
std::cout << "Unhandled message " << irc.command;
for (auto const arg : irc.args)
{
@ -71,19 +46,19 @@ struct UnhandledThread : public Thread
}
std::cout << "\n";
}
return {};
}
};
});
}
auto start(boost::asio::io_context & io, Settings const& settings) -> void
{
auto connection = std::make_shared<Connection>(io);
connection->add_thread(std::make_shared<IrcParseThread>(connection.get()));
connection->add_thread(std::make_shared<PingThread>(connection.get()));
connection->add_thread(std::make_shared<RegistrationThread>(connection.get(), settings.password, settings.username, settings.realname, settings.nickname));
connection->add_thread(std::make_shared<ChatThread>());
connection->add_thread(std::make_shared<UnhandledThread>());
watchdog_thread(connection.get());
irc_parse_thread(connection.get());
ping_thread(connection.get());
registration_thread(connection.get(), settings.password, settings.username, settings.realname, settings.nickname);
snote_thread(connection.get());
unhandled_message_thread(connection.get());
boost::asio::co_spawn(
io,

View File

@ -3,23 +3,15 @@
#include "irc_parse_thread.hpp"
#include "write_irc.hpp"
PingThread::PingThread(Connection * connection) noexcept : connection_{connection} {}
auto PingThread::priority() const -> priority_type
auto ping_thread(Connection * connection) -> void
{
return 1;
}
auto PingThread::on_event(Event const& event) -> std::pair<ThreadOutcome, EventOutcome>
{
if (auto const irc_event = dynamic_cast<IrcMsgEvent const*>(&event))
connection->add_listener<IrcMsgEvent>([connection](IrcMsgEvent& event)
{
auto& irc = irc_event->irc;
auto& irc = event.irc;
if ("PING" == irc.command && 1 == irc.args.size())
{
write_irc(*connection_, "PONG", irc.args[0]);
return {ThreadOutcome::Continue, EventOutcome::Consume};
write_irc(*connection, "PONG", irc.args[0]);
event.handled_ = true;
}
}
return {};
});
}

View File

@ -3,13 +3,4 @@
#include "connection.hpp"
#include "thread.hpp"
class PingThread : public Thread
{
Connection * connection_;
public:
PingThread(Connection * connection) noexcept;
auto priority() const -> priority_type override;
auto on_event(Event const& event) -> std::pair<ThreadOutcome, EventOutcome> override;
};
auto ping_thread(Connection * connection) -> void;

View File

@ -1,13 +1,59 @@
#include "registration_thread.hpp"
#include <memory>
#include <unordered_set>
#include <unordered_map>
namespace {
struct RegistrationThread : std::enable_shared_from_this<RegistrationThread>
{
Connection * connection_;
std::string password_;
std::string username_;
std::string realname_;
std::string nickname_;
std::unordered_map<std::string, std::string> caps;
std::unordered_set<std::string> outstanding;
Connection::Handle<ConnectEvent> connect_handle_;
Connection::Handle<IrcMsgEvent> message_handle_;
enum class Stage
{
LsReply,
AckReply,
};
Stage stage_;
RegistrationThread(
Connection * connection_,
std::string password,
std::string username,
std::string realname,
std::string nickname
);
auto on_connect() -> void;
auto send_req() -> void;
auto capack(IrcMsg const& msg) -> void;
auto capls(IrcMsg const& msg) -> void;
auto on_msg(IrcMsg const& msg) -> void;
};
RegistrationThread::RegistrationThread(
Connection * connection_,
Connection * connection,
std::string password,
std::string username,
std::string realname,
std::string nickname
)
: connection_{connection_}
: connection_{connection}
, password_{password}
, username_{username}
, realname_{realname}
@ -15,24 +61,36 @@ RegistrationThread::RegistrationThread(
, stage_{Stage::LsReply}
{}
auto RegistrationThread::priority() const -> priority_type
{
return 2;
}
auto RegistrationThread::on_connect() -> Thread::callback_result
auto RegistrationThread::on_connect() -> void
{
write_irc(*connection_, "CAP", "LS", "302");
write_irc(*connection_, "PASS", password_);
write_irc(*connection_, "USER", username_, "*", "*", realname_);
write_irc(*connection_, "NICK", nickname_);
return {};
connection_->remove_listener(connect_handle_);
}
auto RegistrationThread::send_req() -> Thread::callback_result
auto RegistrationThread::send_req() -> void
{
std::string request;
char const* want[] = { "extended-join", "account-notify", "draft/chathistory", "batch", "soju.im/no-implicit-names", "chghost", "setname", "account-tag", "solanum.chat/oper", "solanum.chat/identify-msg", "solanum.chat/realhost", "server-time", "invite-notify", "extended-join" };
char const* const want[] = {
"extended-join",
"account-notify",
"draft/chathistory",
"batch",
"soju.im/no-implicit-names",
"chghost",
"setname",
"account-tag",
"solanum.chat/oper",
"solanum.chat/identify-msg",
"solanum.chat/realhost",
"server-time",
"invite-notify",
"extended-join"
};
for (auto cap : want)
{
if (caps.contains(cap))
@ -47,16 +105,15 @@ auto RegistrationThread::send_req() -> Thread::callback_result
request.pop_back();
write_irc(*connection_, "CAP", "REQ", request);
stage_ = Stage::AckReply;
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
else
{
write_irc(*connection_, "CAP", "END");
return {ThreadOutcome::Finish, EventOutcome::Consume};
connection_->remove_listener(message_handle_);
}
}
auto RegistrationThread::capack(IrcMsg const& msg) -> Thread::callback_result
auto RegistrationThread::capack(IrcMsg const& msg) -> void
{
auto const n = msg.args.size();
if ("CAP" == msg.command && n >= 2 && "*" == msg.args[0] && "ACK" == msg.args[1])
@ -72,20 +129,12 @@ auto RegistrationThread::capack(IrcMsg const& msg) -> Thread::callback_result
if (outstanding.empty())
{
write_irc(*connection_, "CAP", "END");
return {ThreadOutcome::Finish, EventOutcome::Consume};
connection_->remove_listener(message_handle_);
}
else
{
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
}
else
{
return {};
}
}
auto RegistrationThread::capls(IrcMsg const& msg) -> Thread::callback_result
auto RegistrationThread::capls(IrcMsg const& msg) -> void
{
auto const n = msg.args.size();
if ("CAP" == msg.command && n >= 2 && "*" == msg.args[0] && "LS" == msg.args[1])
@ -105,7 +154,7 @@ auto RegistrationThread::capls(IrcMsg const& msg) -> Thread::callback_result
}
else
{
return {};
return;
}
auto in = std::istringstream{std::string{*kvs}};
@ -128,37 +177,37 @@ auto RegistrationThread::capls(IrcMsg const& msg) -> Thread::callback_result
if (last)
{
return send_req();
send_req();
}
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
else
{
return {};
}
}
auto RegistrationThread::on_msg(IrcMsg const& msg) -> Thread::callback_result
auto RegistrationThread::on_msg(IrcMsg const& msg) -> void
{
switch (stage_)
{
case Stage::LsReply: return capls(msg);
case Stage::AckReply: return capack(msg);
default: return {};
case Stage::LsReply: capls(msg); return;
case Stage::AckReply: capack(msg); return;
}
}
auto RegistrationThread::on_event(Event const& event) -> Thread::callback_result
} // namespace
auto registration_thread(
Connection * connection,
std::string password,
std::string username,
std::string realname,
std::string nickname
) -> void
{
if (auto const irc_event = dynamic_cast<IrcMsgEvent const*>(&event))
auto thread = std::make_shared<RegistrationThread>(connection, password, username, realname, nickname);
thread->message_handle_ = connection->add_listener<IrcMsgEvent>([thread](IrcMsgEvent const& event)
{
return on_msg(irc_event->irc);
}
if (auto const connect_event = dynamic_cast<ConnectEvent const*>(&event))
thread->on_msg(event.irc);
});
thread->connect_handle_ = connection->add_listener<ConnectEvent>([thread](ConnectEvent const&)
{
return on_connect();
}
return {};
thread->on_connect();
});
}

View File

@ -5,47 +5,14 @@
#include "irc_parse_thread.hpp"
#include "write_irc.hpp"
#include <eventpp/eventdispatcher.h>
#include <string>
#include <unordered_set>
#include <unordered_map>
struct RegistrationThread : Thread
{
Connection * connection_;
std::string password_;
std::string username_;
std::string realname_;
std::string nickname_;
std::unordered_map<std::string, std::string> caps;
std::unordered_set<std::string> outstanding;
enum class Stage
{
LsReply,
AckReply,
};
Stage stage_;
RegistrationThread(
Connection * connection_,
std::string password,
std::string username,
std::string realname,
std::string nickname
);
auto priority() const -> priority_type override;
auto on_connect() -> Thread::callback_result;
auto send_req() -> Thread::callback_result;
auto capack(IrcMsg const& msg) -> Thread::callback_result;
auto capls(IrcMsg const& msg) -> Thread::callback_result;
auto on_msg(IrcMsg const& msg) -> Thread::callback_result;
auto on_event(Event const& event) -> Thread::callback_result override;
};
auto registration_thread(
Connection * connection,
std::string password,
std::string username,
std::string realname,
std::string nickname
) -> void;

24
snote_thread.cpp Normal file
View File

@ -0,0 +1,24 @@
#include "snote_thread.hpp"
#include "irc_parse_thread.hpp"
#include "connection.hpp"
#include <cstring>
auto snote_thread(Connection * connection) -> void
{
static char const* const prefix = "*** Notice -- ";
connection->add_listener<IrcMsgEvent>([connection](IrcMsgEvent& event)
{
auto& irc = event.irc;
if ("NOTICE" == irc.command
&& 2 == irc.args.size()
&& "*" == irc.args[0]
&& irc.args[1].starts_with(prefix))
{
event.handled_ = true;
connection->make_event<SnoteEvent>(irc.args[1].substr(strlen(prefix)));
}
});
}

14
snote_thread.hpp Normal file
View File

@ -0,0 +1,14 @@
#pragma once
#include "thread.hpp"
class Connection;
// WIP: Use much finer granularity
struct SnoteEvent : Event
{
SnoteEvent(std::string_view raw) : raw{raw} {}
std::string_view raw;
};
auto snote_thread(Connection * connection) -> void;

View File

@ -1,46 +1 @@
#include "thread.hpp"
auto Dispatcher::add_thread(std::shared_ptr<Thread> thread) -> void
{
threads_.push_back(std::move(thread));
}
auto Dispatcher::dispatch(std::shared_ptr<Event> event) -> void
{
if (dispatching_)
{
events_.push_back(std::move(event));
return;
}
dispatching_ = true;
std::vector<std::shared_ptr<Event>> events{std::move(event)};
while (not events.empty())
{
for (auto && event : events)
{
std::vector<std::shared_ptr<Thread>> work;
work.swap(threads_);
std::sort(work.begin(), work.end(), [](auto const& a, auto const& b) { return a->priority() < b->priority(); });
std::size_t const n = work.size();
for (std::size_t i = 0; i < n; i++)
{
auto const [thread_outcome, msg_outcome] = work[i]->on_event(*event);
if (thread_outcome == ThreadOutcome::Continue)
{
threads_.push_back(std::move(work[i]));
}
if (msg_outcome == EventOutcome::Consume)
{
std::move(work.begin() + i + 1, work.end(), std::back_inserter(threads_));
break;
}
}
}
events = std::move(events_);
events_.clear();
}
dispatching_ = false;
}

View File

@ -5,38 +5,7 @@
#include <memory>
#include <vector>
enum class EventOutcome
{
Pass,
Consume,
};
enum class ThreadOutcome
{
Continue,
Finish,
};
struct Event {
virtual ~Event() {}
bool handled_ = false;
};
struct Thread
{
using priority_type = std::uint64_t;
using callback_result = std::pair<ThreadOutcome, EventOutcome>;
virtual ~Thread() {}
virtual auto on_event(Event const& event) -> callback_result { return {}; };
virtual auto priority() const -> priority_type = 0;
};
struct Dispatcher
{
std::vector<std::shared_ptr<Thread>> threads_;
std::vector<std::shared_ptr<Event>> events_;
bool dispatching_;
/// Apply a function to all the threads in priority order
auto dispatch(std::shared_ptr<Event> event) -> void;
auto add_thread(std::shared_ptr<Thread> thread) -> void;
};

View File

@ -2,41 +2,92 @@
#include "connection.hpp"
#include "irc_parse_thread.hpp"
#include "write_irc.hpp"
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <memory>
using namespace std::chrono_literals;
WatchdogThread::WatchdogThread(Connection * connection) noexcept
: connection_{connection}
{
}
namespace {
auto WatchdogThread::priority() const -> priority_type
struct WatchdogThread : std::enable_shared_from_this<WatchdogThread>
{
return 0;
}
auto WatchdogThread::on_event(Event const& event) -> std::pair<ThreadOutcome, EventOutcome>
{
if (auto const irc_event = dynamic_cast<LineEvent const*>(&event))
WatchdogThread(Connection * connection)
: connection_{connection}
, timer_{connection->get_executor()}
, tried_ping{false}
{
timer_.expires_from_now(30s);
return {};
}
if (auto const connect_event = dynamic_cast<ConnectEvent const*>(&event))
Connection * connection_;
boost::asio::steady_timer timer_;
bool tried_ping;
auto on_activity() -> void
{
tried_ping = false;
timer_.expires_from_now(30s);
timer_.async_wait([weak = weak_from_this()](auto error)
}
auto timeout_token()
{
return [weak = weak_from_this()](auto const& error)
{
if (not error)
{
if (auto self = weak.lock())
{
self->connection_->close();
self->on_timeout();
}
}
});
};
}
return {};
auto on_timeout() -> void
{
if (tried_ping)
{
connection_->close();
}
else
{
write_irc(*connection_, "PING", "watchdog");
tried_ping = true;
timer_.expires_from_now(30s);
timer_.async_wait(timeout_token());
}
}
auto on_connect() -> void
{
on_activity();
timer_.async_wait(timeout_token());
}
auto on_disconnect() -> void
{
timer_.cancel();
}
};
} // namespace
auto watchdog_thread(Connection * connection) -> void
{
auto const thread = std::make_shared<WatchdogThread>(connection);
connection->add_listener<ConnectEvent>([thread](auto&)
{
thread->on_connect();
});
connection->add_listener<DisconnectEvent>([thread](auto&)
{
thread->on_disconnect();
});
connection->add_listener<IrcMsgEvent>([thread](auto&)
{
thread->on_activity();
});
}

View File

@ -1,18 +1,4 @@
#pragma once
#include "thread.hpp"
#include <boost/asio/steady_timer.hpp>
class Connection;
class WatchdogThread : public Thread, public std::enable_shared_from_this<WatchdogThread>
{
Connection * connection_;
boost::asio::steady_timer timer_;
public:
WatchdogThread(Connection * connection) noexcept;
auto priority() const -> priority_type override;
auto on_event(Event const& event) -> std::pair<ThreadOutcome, EventOutcome> override;
};
auto watchdog_thread(Connection * connection) -> void;