#include <algorithm>
#include <atomic>
#include <cctype>
#include <chrono>
#include <cstdint>
#include <exception>
#include <expected>
#include <format>
#include <iostream>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <cxxopts.hpp>
#include <nlohmann/json.hpp>
#include <spdlog/spdlog.h>
#include <mw/http_server.hpp>
#include <mw/http_client.hpp>
#include <mw/url.hpp>
#include <mw/error.hpp>
#include <mw/utils.hpp>
using json = nlohmann::json;
/// Small client for the Telegram Bot HTTP API.
///
/// Every call creates its own mw::HTTPSession, issues a single request
/// against `https://api.telegram.org/bot<token>/<method>` and parses the
/// response body as JSON.
class TelegramClient
{
public:
    /// @param token Bot token; it is embedded in the base URL used by
    ///              every request.
    explicit TelegramClient(std::string token)
        : token_(std::move(token)),
          // token_ is declared before base_url_, so it is already
          // initialized when the URL is formatted.
          base_url_(std::format("https://api.telegram.org/bot{}/", token_))
    {}

    /// POSTs a `sendMessage` call with a JSON body of `chat_id` and `text`.
    ///
    /// @param chat_id Target chat identifier.
    /// @param text    Message text.
    /// @return The parsed JSON response body, or the transport error
    ///         reported by the HTTP session.
    /// @throws nlohmann::json exceptions if the payload cannot be
    ///         serialized or the response body is not valid JSON.
    mw::E<json> sendMessage(int64_t chat_id, const std::string& text)
    {
        mw::HTTPRequest request(base_url_ + "sendMessage");
        request.setContentType("application/json");
        const json body{{"chat_id", chat_id}, {"text", text}};
        request.setPayload(body.dump());

        mw::HTTPSession http;
        auto response = http.post(request);
        if(response.has_value())
        {
            return json::parse((*response)->payloadAsStr());
        }
        return std::unexpected(response.error());
    }

    /// GETs `getUpdates` with the given offset and timeout as query
    /// parameters.
    ///
    /// @param offset  Value sent as the `offset` query parameter.
    /// @param timeout Value sent as the `timeout` query parameter.
    /// @return The parsed JSON response body, or the transport error
    ///         reported by the HTTP session.
    /// @throws nlohmann::json exceptions if the response body is not
    ///         valid JSON.
    mw::E<json> getUpdates(int64_t offset, int timeout)
    {
        std::string endpoint = std::format("{}getUpdates?offset={}&timeout={}",
                                           base_url_, offset, timeout);

        mw::HTTPSession http;
        auto response = http.get(endpoint);
        if(response.has_value())
        {
            return json::parse((*response)->payloadAsStr());
        }
        return std::unexpected(response.error());
    }

private:
    std::string token_;
    std::string base_url_;
};
/// Registry that maps a chat id to the callback URLs subscribed to it.
///
/// All access to the map is serialized through an internal mutex.
class SubscriptionManager
{
public:
    /// Registers `callback_url` for `chat_id`.
    ///
    /// The operation is idempotent: registering a URL that is already
    /// present for the chat is a no-op, so repeated subscribe requests
    /// neither grow the list nor cause duplicate deliveries.
    ///
    /// @param chat_id      Chat whose messages should be forwarded.
    /// @param callback_url URL to store for that chat.
    void addSubscription(int64_t chat_id, std::string callback_url)
    {
        std::lock_guard lock(mutex_);
        auto& urls = subscribers_[chat_id];
        if(std::find(urls.begin(), urls.end(), callback_url) == urls.end())
        {
            urls.push_back(std::move(callback_url));
        }
    }

    /// Returns a snapshot of the callback URLs registered for `chat_id`.
    ///
    /// A copy is returned so the caller can iterate without holding the
    /// lock. The URLs are in registration order.
    ///
    /// @param chat_id Chat to look up.
    /// @return The registered URLs, or an empty vector if there are none.
    std::vector<std::string> getSubscribers(int64_t chat_id) const
    {
        std::lock_guard lock(mutex_);
        if(auto it = subscribers_.find(chat_id); it != subscribers_.end())
        {
            return it->second;
        }
        return {};
    }

private:
    // mutable so that the read-only lookup can still take the lock.
    mutable std::mutex mutex_;
    std::map<int64_t, std::vector<std::string>> subscribers_;
};
/// Cache that maps a Telegram username to the chat id it was last seen in.
///
/// Usernames are normalized before being stored or looked up: one leading
/// '@' is dropped and ASCII letters are lower-cased. Telegram treats
/// usernames case-insensitively, so "Alice", "alice" and "@ALICE" all
/// refer to the same entry. All access is serialized through an internal
/// mutex.
class UsernameResolver
{
public:
    /// Records (or overwrites) the chat id associated with `username`.
    ///
    /// @param username Username, with or without a leading '@', any case.
    /// @param chat_id  Chat id to associate with it.
    void update(const std::string& username, int64_t chat_id)
    {
        // Normalize before locking to keep the critical section short.
        std::string key = normalize(username);
        std::lock_guard lock(mutex_);
        username_map_[std::move(key)] = chat_id;
    }

    /// Looks up the chat id recorded for `username`.
    ///
    /// @param username Username, with or without a leading '@', any case.
    /// @return The recorded chat id, or std::nullopt if none is recorded.
    std::optional<int64_t> resolve(const std::string& username)
    {
        const std::string key = normalize(username);
        std::lock_guard lock(mutex_);
        if(auto it = username_map_.find(key); it != username_map_.end())
        {
            return it->second;
        }
        return std::nullopt;
    }

private:
    /// Produces the canonical map key for a username: strips one leading
    /// '@' and lower-cases ASCII letters.
    static std::string normalize(const std::string& username)
    {
        std::string key = (!username.empty() && username.front() == '@')
            ? username.substr(1)
            : username;
        // std::tolower requires a value representable as unsigned char.
        std::transform(key.begin(), key.end(), key.begin(),
                       [](unsigned char c)
                       {
                           return static_cast<char>(std::tolower(c));
                       });
        return key;
    }

    std::mutex mutex_;
    std::map<std::string, int64_t> username_map_;
};
class App : public mw::HTTPServer
{
public:
App(mw::IPSocketInfo listen_info, std::string token)
: mw::HTTPServer(listen_info),
tg_client_(std::move(token))
{}
void setup()
{
server.Post("/send", [this](const Request& req, Response& res)
{
try
{
auto body = json::parse(req.body);
if(!body.contains("text"))
{
res.status = 400;
res.set_content("Missing text", "text/plain");
return;
}
int64_t chat_id = 0;
if(body.contains("chat_id"))
{
chat_id = body["chat_id"];
}
else if(body.contains("username"))
{
auto username = body["username"].get<std::string>();
auto resolved = username_resolver_.resolve(username);
if(!resolved.has_value())
{
res.status = 404;
res.set_content("Username not found", "text/plain");
return;
}
chat_id = *resolved;
}
else
{
res.status = 400;
res.set_content("Missing chat_id or username",
"text/plain");
return;
}
ASSIGN_OR_RESPOND_ERROR(auto result,
tg_client_.sendMessage(chat_id,
body["text"]),
res);
res.status = 200;
res.set_content(result.dump(), "application/json");
}
catch(const std::exception& e)
{
res.status = 400;
res.set_content(e.what(), "text/plain");
}
});
server.Post("/subscribe", [this](const Request& req, Response& res)
{
try
{
auto body = json::parse(req.body);
if(!body.contains("chat_id") || !body.contains("callback_url"))
{
res.status = 400;
res.set_content("Missing chat_id or callback_url",
"text/plain");
return;
}
sub_manager_.addSubscription(body["chat_id"],
body["callback_url"]);
res.status = 200;
res.set_content("Subscribed", "text/plain");
}
catch(const std::exception& e)
{
res.status = 400;
res.set_content(e.what(), "text/plain");
}
});
}
void runPolling()
{
int64_t offset = 0;
while(running_)
{
auto updates = tg_client_.getUpdates(offset, 30);
if(!updates.has_value())
{
spdlog::error("Failed to get updates: {}",
mw::errorMsg(updates.error()));
std::this_thread::sleep_for(std::chrono::seconds(5));
continue;
}
if((*updates)["ok"] == true)
{
for(const auto& update : (*updates)["result"])
{
offset = update["update_id"].get<int64_t>() + 1;
if(update.contains("message"))
{
dispatchMessage(update["message"]);
}
}
}
}
}
void stopPolling()
{
running_ = false;
}
private:
void dispatchMessage(const json& message)
{
int64_t chat_id = message["chat"]["id"];
if(message.contains("from") && message["from"].contains("username"))
{
std::string username = message["from"]["username"];
username_resolver_.update(username, chat_id);
}
auto callbacks = sub_manager_.getSubscribers(chat_id);
for(const auto& url : callbacks)
{
std::thread([url, message]()
{
mw::HTTPSession session;
mw::HTTPRequest req(url);
req.setContentType("application/json");
req.setPayload(message.dump());
auto res = session.post(req);
if(!res.has_value())
{
spdlog::error("Failed to post to callback {}: {}", url,
mw::errorMsg(res.error()));
}
}).detach();
}
}
TelegramClient tg_client_;
SubscriptionManager sub_manager_;
UsernameResolver username_resolver_;
bool running_ = true;
};
int main(int argc, char** argv)
{
cxxopts::Options cmd_options("telegrammer", "Telegram Bot API Gateway");
cmd_options.add_options()
("t,token", "Telegram Bot Token", cxxopts::value<std::string>())
("p,port", "Port to listen on",
cxxopts::value<int>()->default_value("8080"))
("h,host", "Interface to bind to",
cxxopts::value<std::string>()->default_value("0.0.0.0"))
("help", "Print help");
try
{
auto result = cmd_options.parse(argc, argv);
if(result.count("help"))
{
std::cout << cmd_options.help() << std::endl;
return 0;
}
if(!result.count("token"))
{
spdlog::error("Token is required. Use --token <TOKEN>");
std::cout << cmd_options.help() << std::endl;
return 1;
}
std::string token = result["token"].as<std::string>();
std::string host = result["host"].as<std::string>();
int port = result["port"].as<int>();
mw::IPSocketInfo listen_info;
listen_info.address = host;
listen_info.port = port;
App app(listen_info, token);
app.setup();
auto start_res = app.start();
if(!start_res.has_value())
{
spdlog::error("Failed to start server: {}",
mw::errorMsg(start_res.error()));
return 1;
}
spdlog::info("Server listening on {}:{}", host, port);
std::thread polling_thread([&app]()
{
app.runPolling();
});
app.wait();
app.stopPolling();
if(polling_thread.joinable())
{
polling_thread.join();
}
}
catch(const cxxopts::exceptions::exception& e)
{
spdlog::error("Error parsing options: {}", e.what());
return 1;
}
return 0;
}