blob: d48b21585fd7f6dfd6ddc5e15b7f1672bcb073ab [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "options.hpp"
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/duration.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver.hpp>
#include <proton/receiver_options.hpp>
#include <proton/sender.hpp>
#include <proton/sender_options.hpp>
#include <proton/session.hpp>
#include <proton/target_options.hpp>
#include <proton/tracker.hpp>
#include <proton/transfer.hpp>
#include <proton/types.hpp>
#include <proton/work_queue.hpp>
#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <string_view>
#include <thread>
#include <vector>
#if defined(_WIN32) || defined(_WIN64)
# include <windows.h>
#else
# include <csignal>
#endif
// Sentinel queue name: omit message 'to' address so the peer may reject the message.
constexpr std::string_view NO_TO_ADDRESS = "<none>";
// Interactive tester for AMQP transactions: declare/commit/abort transactions, receive
// messages (with optional timeout), send to the default or a given queue (or omit 'to'
// for rejection testing), wait for disposition updates, and release unsettled. Type
// 'help' for commands. Received messages are accepted on receipt.
class tx_recv_interactive : public proton::messaging_handler {
private:
std::string conn_url_;
std::string addr_;
proton::connection connection_;
proton::receiver receiver_;
proton::sender sender_;
proton::session session_;
proton::work_queue* work_queue_ = nullptr;
int send_pending_ = 0;
int send_next_id_ = 0;
std::string send_to_addr_;
mutable std::mutex wait_mutex_;
std::condition_variable wait_cv_;
bool ready_ = false;
bool sleep_done_ = true;
bool timed_out_ = false;
int fetch_expected_ = 0;
int fetch_received_ = 0;
bool fetch_done_ = false;
int settled_expected_ = 0;
int settled_received_ = 0;
bool settled_done_ = false;
bool interrupt_requested_ = false;
std::string last_error_;
/// Run f() in a try block; on error, record the message for later reporting.
template <typename F>
void catch_any_error(F&& f) {
try {
f();
} catch (const std::exception& e) {
auto l = std::lock_guard(wait_mutex_);
last_error_ = e.what();
}
}
std::mutex sync_mutex_;
std::condition_variable sync_cv_;
bool sync_done_ = false;
void timeout_fired() {
auto l = std::lock_guard(wait_mutex_);
timed_out_ = true;
wait_cv_.notify_all();
}
void do_declare() {
catch_any_error([this]() { session_.transaction_declare(); });
}
void do_fetch(int n) {
receiver_.add_credit(n);
}
void do_commit() {
catch_any_error([this]() { session_.transaction_commit(); });
}
void do_abort() {
catch_any_error([this]() { session_.transaction_abort(); });
}
void do_release() {
for (auto rcv : session_.receivers()) {
for (auto d : rcv.unsettled_deliveries()) {
d.release();
}
}
}
void do_quit() {
connection_.close();
}
public:
tx_recv_interactive(const std::string& url, const std::string& addr)
: conn_url_(url), addr_(addr) {}
void on_container_start(proton::container& c) override {
c.connect(conn_url_);
}
void on_connection_open(proton::connection& conn) override {
connection_ = conn;
work_queue_ = &conn.work_queue();
// credit_window(0) so we control flow via "fetch"
receiver_ = conn.open_receiver(addr_, proton::receiver_options().credit_window(0));
sender_ = conn.open_sender("", proton::sender_options{}.target(proton::target_options{}.anonymous(true)));
}
void on_sender_open(proton::sender& s) override {
sender_ = s;
}
void on_session_open(proton::session& s) override {
session_ = s;
{
auto l = std::lock_guard(wait_mutex_);
ready_ = true;
}
wait_cv_.notify_all();
}
void on_session_transaction_declared(proton::session& s) override {
std::cout << "transaction declared: " << s.transaction_id() << std::endl;
}
void on_session_transaction_committed(proton::session& s) override {
std::cout << "transaction committed" << std::endl;
}
void on_session_transaction_aborted(proton::session& s) override {
std::cout << "transaction aborted: " << s.transaction_error().what() << std::endl;
}
void on_session_transaction_error(proton::session& s) override {
std::cout << "transaction error: " << s.transaction_error().what() << std::endl;
}
void on_sendable(proton::sender&) override {
try_send();
}
void try_send() {
int to_send = 0;
std::string to_addr;
{
auto l = std::lock_guard(wait_mutex_);
to_send = send_pending_;
to_addr = send_to_addr_;
}
int sent = 0;
while (sender_ && sender_.credit() > 0 && to_send > 0) {
proton::message msg;
msg.id(send_next_id_);
if (to_addr != NO_TO_ADDRESS)
msg.to(to_addr);
msg.body(std::map<std::string, int>{{"message", send_next_id_}});
sender_.send(msg);
++send_next_id_;
++sent;
--to_send;
}
if (sent > 0) {
auto l = std::lock_guard(wait_mutex_);
send_pending_ -= sent;
if (send_pending_ == 0)
wait_cv_.notify_all();
}
}
void on_message(proton::delivery& d, proton::message& msg) override {
std::cout << d.tag() << ": " << msg.body() << std::endl;
d.accept();
{
auto l = std::lock_guard(wait_mutex_);
if (fetch_expected_ > 0) {
++fetch_received_;
if (fetch_received_ >= fetch_expected_) {
fetch_done_ = true;
wait_cv_.notify_all();
}
}
// Pre-settled deliveries never trigger on_delivery_settle; count them here for wait_settled
if (settled_expected_ > 0 && d.settled()) {
++settled_received_;
if (settled_received_ >= settled_expected_) {
settled_done_ = true;
wait_cv_.notify_all();
}
}
}
}
void on_delivery_settle(proton::delivery&) override {
auto l = std::lock_guard(wait_mutex_);
if (settled_expected_ > 0) {
++settled_received_;
if (settled_received_ >= settled_expected_) {
settled_done_ = true;
wait_cv_.notify_all();
}
}
}
void on_transactional_accept(proton::tracker& t) override {
std::cout << "disposition: accepted: " << t.tag() << " (transactional: " << t.session().transaction_id() << ")" << std::endl;
}
void on_transactional_reject(proton::tracker& t) override {
std::cout << "disposition: rejected: " << t.tag() << " (transactional: " << t.session().transaction_id() << ")" << std::endl;
}
void on_transactional_release(proton::tracker& t) override {
std::cout << "disposition: released: " << t.tag() << " (transactional: " << t.session().transaction_id() << ")" << std::endl;
}
void on_tracker_accept(proton::tracker& t) override {
std::cout << "disposition: accepted: " << t.tag() << std::endl;
}
void on_tracker_reject(proton::tracker& t) override {
std::cout << "disposition: rejected: " << t.tag() << std::endl;
}
void on_tracker_release(proton::tracker& t) override {
std::cout << "disposition: released: " << t.tag() << std::endl;
}
void on_tracker_settle(proton::tracker& t) override {
std::cout << "disposition: settled: " << t.tag() << std::endl;
}
void on_session_error(proton::session& s) override {
std::cout << "Session error: " << s.error().what() << std::endl;
s.connection().close();
}
/// Called from the SIGINT-handling thread to wake any current wait.
void request_interrupt() {
{
auto l = std::lock_guard(wait_mutex_);
interrupt_requested_ = true;
wait_cv_.notify_all();
}
{
auto l = std::lock_guard(sync_mutex_);
sync_cv_.notify_all();
}
}
/// True if the last wait was exited due to Ctrl-C.
bool interrupted() const {
auto l = std::lock_guard(wait_mutex_);
return interrupt_requested_;
}
/// Clear the interrupt flag (e.g. after the command loop has handled it).
void clear_interrupt() {
auto l = std::lock_guard(wait_mutex_);
interrupt_requested_ = false;
}
// Thread-safe: wait until handler is ready to accept commands
void wait_ready() {
auto l = std::unique_lock(wait_mutex_);
interrupt_requested_ = false;
wait_cv_.wait(l, [this] { return ready_ || interrupt_requested_; });
}
/// Wait until the connection thread has processed all work queued so far.
/// There is no "loop is idle" callback in the Proton API; this adds a
/// sentinel work item and returns when it runs (so the connection thread
/// has caught up). Use before showing the prompt so the command loop stays
/// in sync with background connection-thread activity.
void sync_with_connection_thread() {
{
auto l = std::lock_guard(wait_mutex_);
interrupt_requested_ = false;
}
{
auto l = std::lock_guard(sync_mutex_);
sync_done_ = false;
}
work_queue_->add([this]() {
auto l = std::lock_guard(sync_mutex_);
sync_done_ = true;
sync_cv_.notify_all();
});
auto l = std::unique_lock(sync_mutex_);
sync_cv_.wait(l, [this] {
if (sync_done_) return true;
auto w = std::lock_guard(wait_mutex_);
return interrupt_requested_;
});
}
/// Schedule a callback on the connection thread after a delay (seconds).
/// wait_sleep_done() blocks until that callback has run.
void sleep(double seconds) {
auto l = std::unique_lock(wait_mutex_);
interrupt_requested_ = false;
sleep_done_ = false;
l.unlock();
auto ms = static_cast<proton::duration::numeric_type>(seconds * 1000);
work_queue_->schedule(proton::duration(ms), [this]() {
auto l = std::lock_guard(wait_mutex_);
sleep_done_ = true;
wait_cv_.notify_all();
});
l.lock();
wait_cv_.wait(l, [this] { return sleep_done_ || interrupt_requested_; });
}
/// Start fetching n messages; optionally timeout after timeout_seconds (0 = no timeout).
/// wait_fetch_done() blocks until n messages received or timeout (whichever first).
void fetch(int n, double timeout_seconds) {
auto l = std::unique_lock(wait_mutex_);
interrupt_requested_ = false;
timed_out_ = false;
fetch_expected_ = n;
fetch_received_ = 0;
fetch_done_ = (n <= 0);
l.unlock();
work_queue_->add([this, n]() { do_fetch(n); });
if (timeout_seconds > 0) {
auto ms = static_cast<proton::duration::numeric_type>(timeout_seconds * 1000);
work_queue_->schedule(proton::duration(ms), [this]() { timeout_fired(); });
}
l.lock();
wait_cv_.wait(l, [this] { return fetch_done_ || timed_out_ || interrupt_requested_; });
fetch_expected_ = 0;
}
int fetch_received_count() const {
auto l = std::lock_guard(wait_mutex_);
return fetch_received_;
}
bool timed_out() const {
auto l = std::lock_guard(wait_mutex_);
return timed_out_;
}
/// Clear the timed-out flag (e.g. after the command loop has handled it).
void clear_timed_out() {
auto l = std::lock_guard(wait_mutex_);
timed_out_ = false;
}
/// If a proton::error was recorded, assign its message to out, clear it, and return true.
bool take_last_error(std::string& out) {
auto l = std::lock_guard(wait_mutex_);
if (last_error_.empty())
return false;
out = std::move(last_error_);
last_error_.clear();
return true;
}
/// Wait until N disposition settlements (on_delivery_settle) have been received,
/// or timeout_seconds (0 = no timeout). Same pattern as fetch.
void wait_settled(int n, double timeout_seconds) {
auto l = std::unique_lock(wait_mutex_);
interrupt_requested_ = false;
timed_out_ = false;
settled_expected_ = n;
settled_received_ = 0;
settled_done_ = (n <= 0);
l.unlock();
if (n <= 0)
return;
if (timeout_seconds > 0) {
auto ms = static_cast<proton::duration::numeric_type>(timeout_seconds * 1000);
work_queue_->schedule(proton::duration(ms), [this]() { timeout_fired(); });
}
l.lock();
wait_cv_.wait(l, [this] { return settled_done_ || timed_out_ || interrupt_requested_; });
settled_expected_ = 0;
}
int settled_received_count() const {
auto l = std::lock_guard(wait_mutex_);
return settled_received_;
}
// Thread-safe: schedule work on the container thread
void declare() {
work_queue_->add([this]() { do_declare(); });
}
void commit() {
work_queue_->add([this]() { do_commit(); });
}
void abort() {
work_queue_->add([this]() { do_abort(); });
}
void release() {
work_queue_->add([this]() { do_release(); });
}
void send(int n, const std::string& to_addr) {
auto l = std::unique_lock(wait_mutex_);
interrupt_requested_ = false;
send_pending_ = n;
send_to_addr_ = to_addr.empty() ? addr_ : to_addr;
l.unlock();
work_queue_->add([this]() { try_send(); });
l.lock();
wait_cv_.wait(l, [this] { return send_pending_ == 0 || interrupt_requested_; });
}
void list_unsettled() {
auto count = std::size_t(0);
for (auto rcv : session_.receivers()) {
for (auto d : rcv.unsettled_deliveries()) {
(void)d;
++count;
}
}
std::cout << count << " unsettled delivery(ies)" << std::endl;
for (auto rcv : session_.receivers()) {
for (auto d : rcv.unsettled_deliveries()) {
std::cout << " " << d.tag() << std::endl;
}
}
}
void quit() {
work_queue_->add([this]() { do_quit(); });
}
};
using command_fn = bool (*)(tx_recv_interactive& recv, const std::vector<std::string>& args);
static bool cmd_declare(tx_recv_interactive& recv, const std::vector<std::string>&) {
recv.declare();
return false;
}
static bool cmd_fetch(tx_recv_interactive& recv, const std::vector<std::string>& args) {
auto n = 1;
auto timeout_seconds = 0.0;
if (!args.empty()) {
try {
n = std::stoi(args[0]);
if (n < 1) n = 1;
} catch (...) {
std::cout << "fetch: expected positive number, got '" << args[0] << "'" << std::endl;
return false;
}
}
if (args.size() >= 2) {
try {
timeout_seconds = std::stof(args[1]);
if (timeout_seconds < 0) timeout_seconds = 0;
} catch (...) {
std::cout << "fetch: expected timeout in seconds, got '" << args[1] << "'" << std::endl;
return false;
}
}
recv.fetch(n, timeout_seconds);
std::cout << "fetch: received " << recv.fetch_received_count() << " message(s)" << std::endl;
return false;
}
static bool cmd_commit(tx_recv_interactive& recv, const std::vector<std::string>&) {
recv.commit();
return false;
}
static bool cmd_abort(tx_recv_interactive& recv, const std::vector<std::string>&) {
recv.abort();
return false;
}
static bool cmd_unsettled(tx_recv_interactive& recv, const std::vector<std::string>&) {
recv.list_unsettled();
return false;
}
static bool cmd_release(tx_recv_interactive& recv, const std::vector<std::string>&) {
recv.release();
return false;
}
static bool cmd_send(tx_recv_interactive& recv, const std::vector<std::string>& args) {
auto n = 1;
auto to_addr = std::string();
if (!args.empty()) {
try {
n = std::stoi(args[0]);
if (n < 1) n = 1;
} catch (...) {
std::cout << "send: expected positive number, got '" << args[0] << "'" << std::endl;
return false;
}
}
if (args.size() >= 2)
to_addr = args[1];
recv.send(n, to_addr);
std::cout << "send: sent " << n << " message(s) to " << (to_addr.empty() ? "(default address)" : to_addr) << std::endl;
return false;
}
static bool cmd_wait_settled(tx_recv_interactive& recv, const std::vector<std::string>& args) {
auto n = 1;
auto timeout_seconds = 0.0;
if (!args.empty()) {
try {
n = std::stoi(args[0]);
if (n < 0) n = 0;
} catch (...) {
std::cout << "wait_settled: expected non-negative count, got '" << args[0] << "'" << std::endl;
return false;
}
}
if (args.size() >= 2) {
try {
timeout_seconds = std::stof(args[1]);
if (timeout_seconds < 0) timeout_seconds = 0;
} catch (...) {
std::cout << "wait_settled: expected timeout in seconds, got '" << args[1] << "'" << std::endl;
return false;
}
}
recv.wait_settled(n, timeout_seconds);
std::cout << "wait_settled: " << recv.settled_received_count() << " settlement(s)" << std::endl;
return false;
}
static bool cmd_sleep(tx_recv_interactive& recv, const std::vector<std::string>& args) {
if (args.empty()) {
std::cout << "sleep: expected duration in seconds (e.g. sleep 1.5)" << std::endl;
return false;
}
float seconds;
try {
seconds = std::stof(args[0]);
if (seconds < 0) seconds = 0;
} catch (...) {
std::cout << "sleep: expected number of seconds, got '" << args[0] << "'" << std::endl;
return false;
}
recv.sleep(static_cast<double>(seconds));
return false;
}
static bool cmd_quit(tx_recv_interactive&, const std::vector<std::string>&) {
return true;
}
static bool cmd_help(tx_recv_interactive&, const std::vector<std::string>&);
struct command_entry {
const char* name;
const char* description;
command_fn fn;
};
// Lexicographically sorted by name for std::lower_bound lookup
static constexpr command_entry COMMAND_TABLE[] = {
{"abort", "Abort the current transaction", cmd_abort},
{"commit", "Commit the current transaction", cmd_commit},
{"declare", "Start a transaction", cmd_declare},
{"fetch", "Receive n messages (optional timeout in seconds)", cmd_fetch},
{"help", "Show this list of commands", cmd_help},
{"quit", "Exit the program", cmd_quit},
{"release", "Release all unsettled deliveries", cmd_release},
{"send", "Send n messages to queue (optional to address; use <none> to omit)", cmd_send},
{"sleep", "Sleep for given seconds", cmd_sleep},
{"unsettled", "List unsettled deliveries", cmd_unsettled},
{"wait_settled", "Wait for n disposition updates (optional timeout)", cmd_wait_settled},
};
static constexpr std::size_t COMMAND_TABLE_SIZE = sizeof(COMMAND_TABLE) / sizeof(COMMAND_TABLE[0]);
static bool cmd_help(tx_recv_interactive&, const std::vector<std::string>&) {
for (std::size_t i = 0; i < COMMAND_TABLE_SIZE; ++i) {
std::cout << " " << COMMAND_TABLE[i].name << " - " << COMMAND_TABLE[i].description << "\n";
}
return false;
}
/// Split a string into words (by whitespace).
static std::vector<std::string> split_args(const std::string& line) {
std::vector<std::string> out;
std::istringstream is(line);
for (std::string word; is >> word;) out.push_back(word);
return out;
}
/// Parsed command: first element is command name, remaining elements are arguments.
using parsed_command = std::vector<std::string>;
/// Parse a line into zero or more commands separated by ';'. Each segment is
/// split into words (command name + args). Empty segments are skipped.
/// Returns a sequence that can be iterated to run each command.
static std::vector<parsed_command> parse_command_line(const std::string& line) {
std::vector<parsed_command> commands;
std::istringstream is(line);
for (std::string segment; std::getline(is, segment, ';');) {
auto args = split_args(segment);
if (!args.empty())
commands.push_back(std::move(args));
}
return commands;
}
static const command_entry* find_command(std::string_view name) {
auto it = std::lower_bound(std::begin(COMMAND_TABLE), std::end(COMMAND_TABLE), name,
[](const command_entry& e, std::string_view s) {
return std::string_view(e.name) < s;
});
if (it != std::end(COMMAND_TABLE) && std::string_view(it->name) == name)
return &*it;
return nullptr;
}
static bool execute_command(tx_recv_interactive& recv, const command_entry& cmd, const std::vector<std::string>& args) {
auto cmd_args = std::vector<std::string>(args.begin() + 1, args.end());
return cmd.fn(recv, cmd_args);
}
#if defined(_WIN32) || defined(_WIN64)
static void block_interrupt() {
}
static HANDLE g_ctrl_c_event = nullptr;
static BOOL WINAPI win_ctrl_handler(DWORD dwCtrlType) {
if (dwCtrlType == CTRL_C_EVENT && g_ctrl_c_event != nullptr)
SetEvent(g_ctrl_c_event);
return TRUE;
}
static void interrupt_thread(tx_recv_interactive* recv) {
HANDLE ev = CreateEventW(nullptr, TRUE, FALSE, nullptr);
if (ev == nullptr)
return;
g_ctrl_c_event = ev;
if (!SetConsoleCtrlHandler(win_ctrl_handler, TRUE)) {
CloseHandle(ev);
return;
}
for (;;) {
if (WaitForSingleObject(ev, INFINITE) == WAIT_OBJECT_0 && recv != nullptr) {
recv->request_interrupt();
ResetEvent(ev);
}
}
}
#else
static void block_interrupt() {
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGINT);
pthread_sigmask(SIG_BLOCK, &set, nullptr);
}
static void interrupt_thread(tx_recv_interactive* recv) {
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGINT);
for (;;) {
int sig = 0;
if (sigwait(&set, &sig) == 0 && sig == SIGINT && recv != nullptr)
recv->request_interrupt();
}
}
#endif
int main(int argc, char** argv) {
block_interrupt();
auto conn_url = std::string("//127.0.0.1:5672");
auto addr = std::string("examples");
auto initial_commands = std::string();
auto opts = example::options(argc, argv);
opts.add_value(conn_url, 'u', "url", "connection URL", "URL");
opts.add_value(addr, 'a', "address", "address to receive messages from", "ADDR");
opts.add_value(initial_commands, 'c', "commands", "commands to run before interactive mode (e.g. declare; fetch 2; quit)", "COMMANDS");
try {
opts.parse();
auto recv = tx_recv_interactive(conn_url, addr);
auto container = proton::container(recv);
auto container_thread = std::thread([&container]() { container.run(); });
std::thread interrupt_th(interrupt_thread, &recv);
interrupt_th.detach();
recv.wait_ready();
auto line = initial_commands;
bool quit_requested = false;
while (!quit_requested) {
if (line.empty()) {
std::cout << "> " << std::flush;
if (!std::getline(std::cin, line))
break;
}
for (const auto& args : parse_command_line(line)) {
auto* cmd = find_command(args[0]);
if (cmd) {
if (execute_command(recv, *cmd, args)) {
quit_requested = true;
break;
}
if (recv.interrupted()) {
std::cout << "[interrupted]" << std::endl;
recv.clear_interrupt();
}
if (recv.timed_out()) {
std::cout << "[timed out]" << std::endl;
recv.clear_timed_out();
}
recv.sync_with_connection_thread();
if (std::string err; recv.take_last_error(err)) {
std::cout << "[error: " << err << "]" <<std::endl;
}
} else {
std::cout << "Unknown command. Type 'help' for a list of commands." << std::endl;
}
}
line.clear();
}
recv.quit();
container_thread.join();
return 0;
} catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}