pre-eventpp snapshot

This commit is contained in:
Eric Mertens 2023-11-25 09:22:55 -08:00
parent a6b6a4179c
commit 53050bb2a1
19 changed files with 606 additions and 348 deletions

View File

@ -20,5 +20,5 @@ FetchContent_Declare(
) )
FetchContent_MakeAvailable(tomlplusplus) FetchContent_MakeAvailable(tomlplusplus)
add_executable(xbot main.cpp ircmsg.cpp settings.cpp connection.cpp) add_executable(xbot main.cpp ircmsg.cpp settings.cpp connection.cpp thread.cpp watchdog_thread.cpp irc_parse_thread.cpp write_irc.cpp ping_thread.cpp registration_thread.cpp)
target_link_libraries(xbot PRIVATE Boost::headers OpenSSL::SSL tomlplusplus_tomlplusplus) target_link_libraries(xbot PRIVATE Boost::headers OpenSSL::SSL tomlplusplus_tomlplusplus)

View File

@ -1,5 +1,21 @@
#include "connection.hpp" #include "connection.hpp"
Connection::Connection(boost::asio::io_context & io)
: stream_{io}
, write_timer_{io, std::chrono::steady_clock::time_point::max()}
{
}
auto Connection::add_thread(std::shared_ptr<Thread> thread) -> void
{
dispatcher_.add_thread(std::move(thread));
}
auto Connection::add_event(std::shared_ptr<Event> event) -> void
{
dispatcher_.dispatch(std::move(event));
}
auto Connection::writer_() -> void auto Connection::writer_() -> void
{ {
std::vector<boost::asio::const_buffer> buffers; std::vector<boost::asio::const_buffer> buffers;
@ -48,20 +64,24 @@ auto Connection::writer() -> void
auto Connection::connect( auto Connection::connect(
boost::asio::io_context & io, boost::asio::io_context & io,
Settings settings) std::string host,
-> boost::asio::awaitable<void> std::string port
) -> boost::asio::awaitable<void>
{ {
auto self = shared_from_this(); using namespace std::placeholders;
// keep connection alive while coroutine is active
auto const self = shared_from_this();
{ {
auto resolver = boost::asio::ip::tcp::resolver{io}; auto resolver = boost::asio::ip::tcp::resolver{io};
auto const endpoints = co_await resolver.async_resolve(settings.host, settings.service, boost::asio::use_awaitable); auto const endpoints = co_await resolver.async_resolve(host, port, boost::asio::use_awaitable);
auto const endpoint = co_await boost::asio::async_connect(stream_, endpoints, boost::asio::use_awaitable); auto const endpoint = co_await boost::asio::async_connect(stream_, endpoints, boost::asio::use_awaitable);
self->writer(); make_event<ConnectEvent>();
dispatch(&IrcThread::on_connect);
} }
self->writer();
for(LineBuffer buffer{32'768};;) for(LineBuffer buffer{32'768};;)
{ {
boost::system::error_code error; boost::system::error_code error;
@ -71,38 +91,25 @@ auto Connection::connect(
break; break;
} }
buffer.add_bytes(n, [this](char * line) { buffer.add_bytes(n, [this](char * line) {
dispatch<ircmsg const&>(&IrcThread::on_msg, parse_irc_message(line)); make_event<LineEvent>(line);
}); });
} }
dispatch(&IrcThread::on_disconnect); make_event<DisconnectEvent>();
} }
auto Connection::write(std::string message) -> void auto Connection::write_raw(std::string message) -> void
{ {
std::cout << "Writing " << message;
auto const need_cancel = write_strings_.empty(); auto const need_cancel = write_strings_.empty();
message += "\r\n";
write_strings_.push_back(std::move(message)); write_strings_.push_back(std::move(message));
if (need_cancel) if (need_cancel)
{ {
write_timer_.cancel_one(); write_timer_.cancel_one();
} }
} }
auto Connection::write(std::string front, std::string_view last) -> void auto Connection::close() -> void
{ {
auto const is_invalid = [](char x) -> bool { stream_.close();
return x == '\0' || x == '\r' || x == '\n';
};
if (last.end() != std::find_if(last.begin(), last.end(), is_invalid))
{
throw std::runtime_error{"bad irc argument"};
}
front += " :";
front += last;
write(std::move(front));
} }

View File

@ -1,14 +1,13 @@
#pragma once #pragma once
#include "irc_thread.hpp"
#include "ircmsg.hpp"
#include "linebuffer.hpp" #include "linebuffer.hpp"
#include "settings.hpp" #include "settings.hpp"
#include "thread.hpp"
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <chrono> #include <chrono>
#include <coroutine> #include <concepts>
#include <iostream> #include <iostream>
#include <list> #include <list>
#include <memory> #include <memory>
@ -19,79 +18,49 @@
#include <variant> #include <variant>
#include <vector> #include <vector>
struct ConnectEvent : Event
{
};
struct DisconnectEvent : Event
{
};
struct LineEvent : Event
{
explicit LineEvent(char * line) : line{line} {}
char * line;
};
class Connection : public std::enable_shared_from_this<Connection> class Connection : public std::enable_shared_from_this<Connection>
{ {
boost::asio::ip::tcp::socket stream_; boost::asio::ip::tcp::socket stream_;
boost::asio::steady_timer write_timer_; boost::asio::steady_timer write_timer_;
std::list<std::string> write_strings_; std::list<std::string> write_strings_;
std::vector<std::unique_ptr<IrcThread>> threads_; Dispatcher dispatcher_;
auto writer() -> void; auto writer() -> void;
auto writer_() -> void; auto writer_() -> void;
template <typename... Args>
auto dispatch(
IrcThread::callback_result (IrcThread::* method)(Args...),
Args... args
) -> void
{
std::vector<std::unique_ptr<IrcThread>> work;
work.swap(threads_);
std::sort(work.begin(), work.end(), [](auto const& a, auto const& b) { return a->priority() < b->priority(); });
std::size_t const n = work.size();
for (std::size_t i = 0; i < n; i++)
{
auto const [thread_outcome, msg_outcome] = (work[i].get()->*method)(args...);
if (thread_outcome == ThreadOutcome::Continue)
{
threads_.push_back(std::move(work[i]));
}
if (msg_outcome == EventOutcome::Consume)
{
std::move(work.begin() + i + 1, work.end(), std::back_inserter(threads_));
break;
}
}
}
public: public:
Connection(boost::asio::io_context & io) Connection(boost::asio::io_context & io);
: stream_{io} auto add_thread(std::shared_ptr<Thread> thread) -> void;
, write_timer_{io, std::chrono::steady_clock::time_point::max()} auto add_event(std::shared_ptr<Event> event) -> void;
{
template <typename T, typename... Args>
auto make_event(Args&& ... args) {
add_event(std::make_shared<T>(std::forward<Args>(args)...));
} }
auto listen(std::unique_ptr<IrcThread> thread) -> void /// Write bytes into the socket. Messages should be properly newline terminated.
{ auto write_raw(std::string message) -> void;
threads_.push_back(std::move(thread));
}
auto write(std::string front, std::string_view last) -> void;
template <typename... Args>
auto write(std::string front, std::string_view next, Args ...rest) -> void
{
auto const is_invalid = [](char x) -> bool {
return x == '\0' || x == '\r' || x == '\n' || x == ' ';
};
if (next.empty() || next.end() != std::find_if(next.begin(), next.end(), is_invalid))
{
throw std::runtime_error{"bad irc argument"};
}
front += " ";
front += next;
write(std::move(front), rest...);
}
auto write(std::string message) -> void;
auto connect( auto connect(
boost::asio::io_context & io, boost::asio::io_context & io,
Settings settings std::string host,
std::string port
) -> boost::asio::awaitable<void>; ) -> boost::asio::awaitable<void>;
auto close() -> void;
}; };

21
irc_parse_thread.cpp Normal file
View File

@ -0,0 +1,21 @@
#include "irc_parse_thread.hpp"
#include "connection.hpp"
IrcParseThread::IrcParseThread(Connection * connection) noexcept
: connection_{connection} {}
auto IrcParseThread::priority() const -> priority_type
{
return 0;
}
auto IrcParseThread::on_event(Event const& event) -> callback_result
{
if (auto line_event = dynamic_cast<LineEvent const*>(&event))
{
connection_->make_event<IrcMsgEvent>(parse_irc_message(line_event->line));
return { ThreadOutcome::Continue, EventOutcome::Consume };
}
return {};
}

20
irc_parse_thread.hpp Normal file
View File

@ -0,0 +1,20 @@
#pragma once
#include "thread.hpp"
class Connection;
struct IrcMsgEvent : Event
{
IrcMsgEvent(IrcMsg irc) : irc{irc} {}
IrcMsg irc;
};
struct IrcParseThread : Thread
{
Connection * connection_;
IrcParseThread(Connection * connection) noexcept;
auto priority() const -> priority_type override;
auto on_event(Event const& event) -> callback_result override;
};

View File

@ -1,27 +0,0 @@
#pragma once
#include "ircmsg.hpp"
enum class EventOutcome
{
Pass,
Consume,
};
enum class ThreadOutcome
{
Continue,
Finish,
};
struct IrcThread
{
using priority_type = std::uint64_t;
using callback_result = std::pair<ThreadOutcome, EventOutcome>;
virtual ~IrcThread() {}
virtual auto on_connect() -> callback_result { return {}; }
virtual auto on_disconnect() -> callback_result { return {}; };
virtual auto on_msg(ircmsg const&) -> callback_result { return {}; };
virtual auto priority() const -> priority_type = 0;
};

View File

@ -4,7 +4,7 @@
#include "ircmsg.hpp" #include "ircmsg.hpp"
namespace { namespace {
class parser { class Parser {
char* msg_; char* msg_;
inline static char empty[1]; inline static char empty[1];
@ -13,7 +13,7 @@ class parser {
} }
public: public:
parser(char* msg) : msg_(msg) { Parser(char* msg) : msg_(msg) {
if (msg_ == nullptr) { if (msg_ == nullptr) {
msg_ = empty; msg_ = empty;
} else { } else {
@ -101,10 +101,10 @@ auto parse_irc_tags(char* str) -> std::vector<irctag>
return tags; return tags;
} }
auto parse_irc_message(char* msg) -> ircmsg auto parse_irc_message(char* msg) -> IrcMsg
{ {
parser p {msg}; Parser p {msg};
ircmsg out; IrcMsg out;
/* MESSAGE TAGS */ /* MESSAGE TAGS */
if (p.match('@')) { if (p.match('@')) {
@ -134,7 +134,7 @@ auto parse_irc_message(char* msg) -> ircmsg
return out; return out;
} }
auto ircmsg::hassource() const -> bool auto IrcMsg::hassource() const -> bool
{ {
return source.data() != nullptr; return source.data() != nullptr;
} }

View File

@ -14,16 +14,16 @@ struct irctag
friend auto operator==(irctag const&, irctag const&) -> bool = default; friend auto operator==(irctag const&, irctag const&) -> bool = default;
}; };
struct ircmsg struct IrcMsg
{ {
std::vector<irctag> tags; std::vector<irctag> tags;
std::vector<std::string_view> args; std::vector<std::string_view> args;
std::string_view source; std::string_view source;
std::string_view command; std::string_view command;
ircmsg() = default; IrcMsg() = default;
ircmsg( IrcMsg(
std::vector<irctag> && tags, std::vector<irctag> && tags,
std::string_view source, std::string_view source,
std::string_view command, std::string_view command,
@ -35,7 +35,7 @@ struct ircmsg
bool hassource() const; bool hassource() const;
friend bool operator==(ircmsg const&, ircmsg const&) = default; friend bool operator==(IrcMsg const&, IrcMsg const&) = default;
}; };
enum class irc_error_code { enum class irc_error_code {
@ -57,6 +57,6 @@ struct irc_parse_error : public std::exception {
* *
* Returns zero for success, non-zero for parse error. * Returns zero for success, non-zero for parse error.
*/ */
auto parse_irc_message(char* msg) -> ircmsg; auto parse_irc_message(char* msg) -> IrcMsg;
auto parse_irc_tags(char* msg) -> std::vector<irctag>; auto parse_irc_tags(char* msg) -> std::vector<irctag>;

242
main.cpp
View File

@ -1,11 +1,15 @@
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/asio/ssl.hpp> #include <boost/asio/ssl.hpp>
#include "linebuffer.hpp"
#include "ircmsg.hpp"
#include "settings.hpp"
#include "irc_thread.hpp"
#include "connection.hpp" #include "connection.hpp"
#include "ircmsg.hpp"
#include "linebuffer.hpp"
#include "settings.hpp"
#include "thread.hpp"
#include "irc_parse_thread.hpp"
#include "ping_thread.hpp"
#include "registration_thread.hpp"
#include <algorithm> #include <algorithm>
#include <chrono> #include <chrono>
@ -27,254 +31,68 @@
using namespace std::chrono_literals; using namespace std::chrono_literals;
struct ChatThread : public IrcThread struct ChatThread : public Thread
{ {
auto priority() const -> priority_type override auto priority() const -> priority_type override
{ {
return 100; return 100;
} }
auto on_msg(ircmsg const& irc) -> std::pair<ThreadOutcome, EventOutcome> override auto on_event(Event const& event) -> callback_result override
{ {
if (auto const* irc_event = dynamic_cast<IrcMsgEvent const*>(&event))
{
auto const& irc = irc_event->irc;
if (irc.command == "PRIVMSG" && 2 == irc.args.size()) if (irc.command == "PRIVMSG" && 2 == irc.args.size())
{ {
std::cout << "Chat from " << irc.source << ": " << irc.args[1] << std::endl; std::cout << "Chat from " << irc.source << ": " << irc.args[1] << std::endl;
return {ThreadOutcome::Continue, EventOutcome::Pass}; return { ThreadOutcome::Continue, EventOutcome::Consume };
} }
else
{
return {ThreadOutcome::Continue, EventOutcome::Pass};
} }
return {};
} }
}; };
struct UnhandledThread : public IrcThread struct UnhandledThread : public Thread
{ {
auto priority() const -> priority_type override auto priority() const -> priority_type override
{ {
return std::numeric_limits<IrcThread::priority_type>::max(); return std::numeric_limits<Thread::priority_type>::max();
} }
auto on_msg(ircmsg const& irc) -> std::pair<ThreadOutcome, EventOutcome> override auto on_event(Event const& event) -> callback_result override
{ {
if (auto irc_event = dynamic_cast<IrcMsgEvent const*>(&event))
{
auto& irc = irc_event->irc;
std::cout << "Unhandled message " << irc.command; std::cout << "Unhandled message " << irc.command;
for (auto const arg : irc.args) for (auto const arg : irc.args)
{ {
std::cout << " " << arg; std::cout << " " << arg;
} }
std::cout << "\n"; std::cout << "\n";
return {ThreadOutcome::Continue, EventOutcome::Pass};
} }
};
class PingThread : public IrcThread
{
Connection * connection_;
public:
PingThread(Connection * connection) noexcept : connection_{connection} {}
auto priority() const -> priority_type override
{
return 0;
}
auto on_msg(ircmsg const& irc) -> std::pair<ThreadOutcome, EventOutcome> override
{
if (irc.command == "PING" && 1 == irc.args.size())
{
connection_->write("PONG", irc.args[0]);
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
else
{
return {}; return {};
} }
}
};
struct RegistrationThread : IrcThread
{
Connection * connection_;
std::string password_;
std::string username_;
std::string realname_;
std::string nickname_;
std::unordered_map<std::string, std::string> caps;
std::unordered_set<std::string> outstanding;
enum class Stage
{
LsReply,
AckReply,
};
Stage stage_;
RegistrationThread(
Connection * connection_,
std::string password,
std::string username,
std::string realname,
std::string nickname
)
: connection_{connection_}
, password_{password}
, username_{username}
, realname_{realname}
, nickname_{nickname}
, stage_{Stage::LsReply}
{}
auto priority() const -> priority_type override { return 1; }
auto on_connect() -> IrcThread::callback_result override
{
connection_->write("CAP", "LS", "302");
connection_->write("PASS", password_);
connection_->write("USER", username_, "*", "*", realname_);
connection_->write("NICK", nickname_);
return {};
}
auto send_req() -> IrcThread::callback_result
{
std::string request;
char const* want[] = { "extended-join", "account-notify", "draft/chathistory", "batch", "soju.im/no-implicit-names", "chghost", "setname", "account-tag", "solanum.chat/oper", "solanum.chat/identify-msg", "solanum.chat/realhost", "server-time", "invite-notify", "extended-join" };
for (auto cap : want)
{
if (caps.contains(cap))
{
request.append(cap);
request.push_back(' ');
outstanding.insert(cap);
}
}
if (not outstanding.empty())
{
request.pop_back();
connection_->write("CAP", "REQ", request);
stage_ = Stage::AckReply;
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
else
{
connection_->write("CAP", "END");
return {ThreadOutcome::Finish, EventOutcome::Consume};
}
}
auto capack(ircmsg const& msg) -> IrcThread::callback_result
{
auto const n = msg.args.size();
if ("CAP" == msg.command && n >= 2 && "*" == msg.args[0] && "ACK" == msg.args[1])
{
auto in = std::istringstream{std::string{msg.args[2]}};
std::for_each(
std::istream_iterator<std::string>{in},
std::istream_iterator<std::string>{},
[this](std::string x) {
outstanding.erase(x);
}
);
if (outstanding.empty())
{
connection_->write("CAP","END");
return {ThreadOutcome::Finish, EventOutcome::Consume};
}
else
{
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
}
else
{
return {};
}
}
auto capls(ircmsg const& msg) -> IrcThread::callback_result
{
auto const n = msg.args.size();
if ("CAP" == msg.command && n >= 2 && "*" == msg.args[0] && "LS" == msg.args[1])
{
std::string_view const* kvs;
bool last;
if (3 == n)
{
kvs = &msg.args[2];
last = true;
}
else if (4 == n && "*" == msg.args[2])
{
kvs = &msg.args[3];
last = false;
}
else
{
return {};
}
auto in = std::istringstream{std::string{*kvs}};
std::for_each(
std::istream_iterator<std::string>{in},
std::istream_iterator<std::string>{},
[this](std::string x) {
auto const eq = x.find('=');
if (eq == x.npos)
{
caps.emplace(x, std::string{});
}
else
{
caps.emplace(std::string{x, 0, eq}, std::string{x, eq+1, x.npos});
}
}
);
if (last)
{
return send_req();
}
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
else
{
return {};
}
}
auto on_msg(ircmsg const& msg) -> IrcThread::callback_result override
{
switch (stage_)
{
case Stage::LsReply: return capls(msg);
case Stage::AckReply: return capack(msg);
default: return {};
}
}
}; };
auto start(boost::asio::io_context & io, Settings const& settings) -> void auto start(boost::asio::io_context & io, Settings const& settings) -> void
{ {
auto connection = std::make_shared<Connection>(io); auto connection = std::make_shared<Connection>(io);
connection->listen(std::make_unique<PingThread>(connection.get())); connection->add_thread(std::make_shared<IrcParseThread>(connection.get()));
connection->listen(std::make_unique<RegistrationThread>(connection.get(), settings.password, settings.username, settings.realname, settings.nickname)); connection->add_thread(std::make_shared<PingThread>(connection.get()));
connection->listen(std::make_unique<ChatThread>()); connection->add_thread(std::make_shared<RegistrationThread>(connection.get(), settings.password, settings.username, settings.realname, settings.nickname));
connection->listen(std::make_unique<UnhandledThread>()); connection->add_thread(std::make_shared<ChatThread>());
connection->add_thread(std::make_shared<UnhandledThread>());
boost::asio::co_spawn( boost::asio::co_spawn(
io, io,
connection->connect(io, settings), connection->connect(io, settings.host, settings.service),
[&io, &settings](std::exception_ptr e) [&io, &settings](std::exception_ptr e)
{ {
auto timer = boost::asio::steady_timer{io}; auto timer = std::make_shared<boost::asio::steady_timer>(io);
timer.expires_from_now(5s); timer->expires_from_now(5s);
timer.async_wait([&io, &settings](auto) { timer->async_wait([&io, &settings, timer](auto) {
start(io, settings); start(io, settings);
}); });
}); });

25
ping_thread.cpp Normal file
View File

@ -0,0 +1,25 @@
#include "ping_thread.hpp"
#include "irc_parse_thread.hpp"
#include "write_irc.hpp"
PingThread::PingThread(Connection * connection) noexcept : connection_{connection} {}
auto PingThread::priority() const -> priority_type
{
return 1;
}
auto PingThread::on_event(Event const& event) -> std::pair<ThreadOutcome, EventOutcome>
{
if (auto const irc_event = dynamic_cast<IrcMsgEvent const*>(&event))
{
auto& irc = irc_event->irc;
if ("PING" == irc.command && 1 == irc.args.size())
{
write_irc(*connection_, "PONG", irc.args[0]);
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
}
return {};
}

15
ping_thread.hpp Normal file
View File

@ -0,0 +1,15 @@
#pragma once
#include "connection.hpp"
#include "thread.hpp"
class PingThread : public Thread
{
Connection * connection_;
public:
PingThread(Connection * connection) noexcept;
auto priority() const -> priority_type override;
auto on_event(Event const& event) -> std::pair<ThreadOutcome, EventOutcome> override;
};

164
registration_thread.cpp Normal file
View File

@ -0,0 +1,164 @@
#include "registration_thread.hpp"
RegistrationThread::RegistrationThread(
Connection * connection_,
std::string password,
std::string username,
std::string realname,
std::string nickname
)
: connection_{connection_}
, password_{password}
, username_{username}
, realname_{realname}
, nickname_{nickname}
, stage_{Stage::LsReply}
{}
auto RegistrationThread::priority() const -> priority_type
{
return 2;
}
auto RegistrationThread::on_connect() -> Thread::callback_result
{
write_irc(*connection_, "CAP", "LS", "302");
write_irc(*connection_, "PASS", password_);
write_irc(*connection_, "USER", username_, "*", "*", realname_);
write_irc(*connection_, "NICK", nickname_);
return {};
}
auto RegistrationThread::send_req() -> Thread::callback_result
{
std::string request;
char const* want[] = { "extended-join", "account-notify", "draft/chathistory", "batch", "soju.im/no-implicit-names", "chghost", "setname", "account-tag", "solanum.chat/oper", "solanum.chat/identify-msg", "solanum.chat/realhost", "server-time", "invite-notify", "extended-join" };
for (auto cap : want)
{
if (caps.contains(cap))
{
request.append(cap);
request.push_back(' ');
outstanding.insert(cap);
}
}
if (not outstanding.empty())
{
request.pop_back();
write_irc(*connection_, "CAP", "REQ", request);
stage_ = Stage::AckReply;
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
else
{
write_irc(*connection_, "CAP", "END");
return {ThreadOutcome::Finish, EventOutcome::Consume};
}
}
auto RegistrationThread::capack(IrcMsg const& msg) -> Thread::callback_result
{
auto const n = msg.args.size();
if ("CAP" == msg.command && n >= 2 && "*" == msg.args[0] && "ACK" == msg.args[1])
{
auto in = std::istringstream{std::string{msg.args[2]}};
std::for_each(
std::istream_iterator<std::string>{in},
std::istream_iterator<std::string>{},
[this](std::string x) {
outstanding.erase(x);
}
);
if (outstanding.empty())
{
write_irc(*connection_, "CAP", "END");
return {ThreadOutcome::Finish, EventOutcome::Consume};
}
else
{
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
}
else
{
return {};
}
}
auto RegistrationThread::capls(IrcMsg const& msg) -> Thread::callback_result
{
auto const n = msg.args.size();
if ("CAP" == msg.command && n >= 2 && "*" == msg.args[0] && "LS" == msg.args[1])
{
std::string_view const* kvs;
bool last;
if (3 == n)
{
kvs = &msg.args[2];
last = true;
}
else if (4 == n && "*" == msg.args[2])
{
kvs = &msg.args[3];
last = false;
}
else
{
return {};
}
auto in = std::istringstream{std::string{*kvs}};
std::for_each(
std::istream_iterator<std::string>{in},
std::istream_iterator<std::string>{},
[this](std::string x) {
auto const eq = x.find('=');
if (eq == x.npos)
{
caps.emplace(x, std::string{});
}
else
{
caps.emplace(std::string{x, 0, eq}, std::string{x, eq+1, x.npos});
}
}
);
if (last)
{
return send_req();
}
return {ThreadOutcome::Continue, EventOutcome::Consume};
}
else
{
return {};
}
}
auto RegistrationThread::on_msg(IrcMsg const& msg) -> Thread::callback_result
{
switch (stage_)
{
case Stage::LsReply: return capls(msg);
case Stage::AckReply: return capack(msg);
default: return {};
}
}
auto RegistrationThread::on_event(Event const& event) -> Thread::callback_result
{
if (auto const irc_event = dynamic_cast<IrcMsgEvent const*>(&event))
{
return on_msg(irc_event->irc);
}
if (auto const connect_event = dynamic_cast<ConnectEvent const*>(&event))
{
return on_connect();
}
return {};
}

51
registration_thread.hpp Normal file
View File

@ -0,0 +1,51 @@
#pragma once
#include "thread.hpp"
#include "connection.hpp"
#include "irc_parse_thread.hpp"
#include "write_irc.hpp"
#include <string>
#include <unordered_set>
#include <unordered_map>
struct RegistrationThread : Thread
{
Connection * connection_;
std::string password_;
std::string username_;
std::string realname_;
std::string nickname_;
std::unordered_map<std::string, std::string> caps;
std::unordered_set<std::string> outstanding;
enum class Stage
{
LsReply,
AckReply,
};
Stage stage_;
RegistrationThread(
Connection * connection_,
std::string password,
std::string username,
std::string realname,
std::string nickname
);
auto priority() const -> priority_type override;
auto on_connect() -> Thread::callback_result;
auto send_req() -> Thread::callback_result;
auto capack(IrcMsg const& msg) -> Thread::callback_result;
auto capls(IrcMsg const& msg) -> Thread::callback_result;
auto on_msg(IrcMsg const& msg) -> Thread::callback_result;
auto on_event(Event const& event) -> Thread::callback_result override;
};

46
thread.cpp Normal file
View File

@ -0,0 +1,46 @@
#include "thread.hpp"
auto Dispatcher::add_thread(std::shared_ptr<Thread> thread) -> void
{
threads_.push_back(std::move(thread));
}
auto Dispatcher::dispatch(std::shared_ptr<Event> event) -> void
{
if (dispatching_)
{
events_.push_back(std::move(event));
return;
}
dispatching_ = true;
std::vector<std::shared_ptr<Event>> events{std::move(event)};
while (not events.empty())
{
for (auto && event : events)
{
std::vector<std::shared_ptr<Thread>> work;
work.swap(threads_);
std::sort(work.begin(), work.end(), [](auto const& a, auto const& b) { return a->priority() < b->priority(); });
std::size_t const n = work.size();
for (std::size_t i = 0; i < n; i++)
{
auto const [thread_outcome, msg_outcome] = work[i]->on_event(*event);
if (thread_outcome == ThreadOutcome::Continue)
{
threads_.push_back(std::move(work[i]));
}
if (msg_outcome == EventOutcome::Consume)
{
std::move(work.begin() + i + 1, work.end(), std::back_inserter(threads_));
break;
}
}
}
events = std::move(events_);
events_.clear();
}
dispatching_ = false;
}

42
thread.hpp Normal file
View File

@ -0,0 +1,42 @@
#pragma once
#include "ircmsg.hpp"
#include <memory>
#include <vector>
enum class EventOutcome
{
Pass,
Consume,
};
enum class ThreadOutcome
{
Continue,
Finish,
};
struct Event {
virtual ~Event() {}
};
struct Thread
{
using priority_type = std::uint64_t;
using callback_result = std::pair<ThreadOutcome, EventOutcome>;
virtual ~Thread() {}
virtual auto on_event(Event const& event) -> callback_result { return {}; };
virtual auto priority() const -> priority_type = 0;
};
struct Dispatcher
{
std::vector<std::shared_ptr<Thread>> threads_;
std::vector<std::shared_ptr<Event>> events_;
bool dispatching_;
/// Apply a function to all the threads in priority order
auto dispatch(std::shared_ptr<Event> event) -> void;
auto add_thread(std::shared_ptr<Thread> thread) -> void;
};

42
watchdog_thread.cpp Normal file
View File

@ -0,0 +1,42 @@
#include "watchdog_thread.hpp"
#include "connection.hpp"
#include "irc_parse_thread.hpp"
#include <chrono>
using namespace std::chrono_literals;
WatchdogThread::WatchdogThread(Connection * connection) noexcept
: connection_{connection}
{
}
auto WatchdogThread::priority() const -> priority_type
{
return 0;
}
auto WatchdogThread::on_event(Event const& event) -> std::pair<ThreadOutcome, EventOutcome>
{
if (auto const irc_event = dynamic_cast<LineEvent const*>(&event))
{
timer_.expires_from_now(30s);
return {};
}
if (auto const connect_event = dynamic_cast<ConnectEvent const*>(&event))
{
timer_.expires_from_now(30s);
timer_.async_wait([weak = weak_from_this()](auto error)
{
if (not error)
{
if (auto self = weak.lock())
{
self->connection_->close();
}
}
});
}
return {};
}

18
watchdog_thread.hpp Normal file
View File

@ -0,0 +1,18 @@
#pragma once
#include "thread.hpp"
#include <boost/asio/steady_timer.hpp>
class Connection;
class WatchdogThread : public Thread, public std::enable_shared_from_this<WatchdogThread>
{
Connection * connection_;
boost::asio::steady_timer timer_;
public:
WatchdogThread(Connection * connection) noexcept;
auto priority() const -> priority_type override;
auto on_event(Event const& event) -> std::pair<ThreadOutcome, EventOutcome> override;
};

23
write_irc.cpp Normal file
View File

@ -0,0 +1,23 @@
#include "write_irc.hpp"
auto write_irc(Connection& connection, std::string message) -> void
{
message += "\r\n";
connection.write_raw(std::move(message));
}
auto write_irc(Connection& connection, std::string front, std::string_view last) -> void
{
auto const is_invalid = [](char x) -> bool {
return x == '\0' || x == '\r' || x == '\n';
};
if (last.end() != std::find_if(last.begin(), last.end(), is_invalid))
{
throw std::runtime_error{"bad irc argument"};
}
front += " :";
front += last;
write_irc(connection, std::move(front));
}

24
write_irc.hpp Normal file
View File

@ -0,0 +1,24 @@
#pragma once
#include "connection.hpp"
auto write_irc(Connection& connection, std::string message) -> void;
auto write_irc(Connection& connection, std::string front, std::string_view last) -> void;
template <typename... Args>
auto write_irc(Connection& connection, std::string front, std::string_view next, Args ...rest) -> void
{
auto const is_invalid = [](char x) -> bool {
return x == '\0' || x == '\r' || x == '\n' || x == ' ';
};
if (next.empty() || next.end() != std::find_if(next.begin(), next.end(), is_invalid))
{
throw std::runtime_error{"bad irc argument"};
}
front += " ";
front += next;
write_irc(connection, std::move(front), rest...);
}