blob: d49aad9f5b4799361e6e48c29a19d6870f477571 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "test_bits.hpp"
#include "proton/connection.hpp"
#include "proton/connection_options.hpp"
#include "proton/container.hpp"
#include "proton/listener.hpp"
#include "proton/listen_handler.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/receiver.hpp"
#include "proton/receiver_options.hpp"
#include "proton/sender.hpp"
#include "proton/sender_options.hpp"
#include "proton/work_queue.hpp"
#include <cstdlib>
#include <ctime>
#include <string>
#include <cstdio>
#include <sstream>
#include <thread>
#include <mutex>
#include <condition_variable>
namespace {
/// Build a scheme-less URL of the form "//host:port".
///
/// @param host host name or address; may be empty, which yields "//:port".
/// @param port port number appended after the colon.
/// @return the assembled URL string.
std::string make_url(const std::string& host, int port) {
    // std::to_string is used instead of a stream so that a digit-grouping
    // facet installed in the global locale can never alter the port text.
    return "//" + host + ":" + std::to_string(port);
}
/// Listen handler shared by the tests in this file.
///
/// It records which listener callbacks have run and, as soon as the listener
/// is open, connects back to the listener's port using the stored host and
/// connection options.
struct test_listen_handler : public proton::listen_handler {
    bool on_open_;                     ///< on_open() has been called.
    bool on_accept_;                   ///< on_accept() has been called.
    bool on_close_;                    ///< on_close() has been called.
    std::string on_error_;             ///< Text passed to on_error(); empty if never called.
    std::string host_;                 ///< Host part of the connect-back URL.
    proton::connection_options opts_;  ///< Options used for the connect-back connection.

    test_listen_handler(const std::string& host=std::string(),
                        const proton::connection_options& opts=proton::connection_options())
        : on_open_(false),
          on_accept_(false),
          on_close_(false),
          host_(host),
          opts_(opts)
    {}

    /// Note the accept and hand back default options for the incoming connection.
    proton::connection_options on_accept(proton::listener&) override {
        on_accept_ = true;
        return proton::connection_options();
    }

    /// Must be the first callback seen; then connect to the port just opened.
    void on_open(proton::listener& l) override {
        on_open_ = true;
        ASSERT(!on_accept_);
        ASSERT(on_error_.empty());
        ASSERT(!on_close_);
        const std::string url = make_url(host_, l.port());
        l.container().connect(url, opts_);
    }

    /// A close is only legal after an open or after an error.
    void on_close(proton::listener&) override {
        on_close_ = true;
        ASSERT(on_open_ || on_error_.size());
    }

    /// Remember the error text; an error must never arrive after the close.
    void on_error(proton::listener&, const std::string& e) override {
        on_error_ = e;
        ASSERT(!on_close_);
    }
};
/// Messaging handler that listens on an ephemeral port, lets its
/// test_listen_handler connect back, and captures what the first
/// on_connection_open() call reports about the peer.
class test_handler : public proton::messaging_handler {
  public:
    bool closing;   ///< Set once the first on_connection_open() has been handled.
    bool done;      ///< Set once the first on_connection_close() has been handled.

    // Values read from the connection in the first on_connection_open() call.
    std::string peer_vhost;
    std::string peer_container_id;
    std::vector<proton::symbol> peer_offered_capabilities;
    std::vector<proton::symbol> peer_desired_capabilities;
    std::map<proton::symbol, proton::value> peer_properties;

    proton::listener listener;
    test_listen_handler listen_handler;

    /// @param h      host the listen handler uses when connecting back.
    /// @param c_opts connection options the listen handler uses when connecting back.
    // `h` is taken by const reference: the previous by-value const parameter
    // copied the string for no benefit.
    test_handler(const std::string& h, const proton::connection_options& c_opts)
        : closing(false), done(false), listen_handler(h, c_opts)
    {}

    /// Start listening on any free port; listen_handler opens the connection.
    void on_container_start(proton::container &c) override {
        listener = c.listen("//:0", listen_handler);
    }

    void on_connection_open(proton::connection &c) override {
        ASSERT(listen_handler.on_open_);
        ASSERT(!listen_handler.on_close_);
        ASSERT(listen_handler.on_error_.empty());
        // First call is the incoming server-side of the connection, that we are interested in.
        // Second call is for the response to the client, ignore that.
        if (!closing) {
            peer_vhost = c.virtual_host();
            peer_container_id = c.container_id();
            peer_offered_capabilities = c.offered_capabilities();
            peer_desired_capabilities = c.desired_capabilities();
            peer_properties = c.properties();
            c.close();
        }
        closing = true;
    }

    /// Stop the listener on the first close only; later closes are ignored.
    void on_connection_close(proton::connection &) override {
        if (!done) listener.stop();
        done = true;
    }
};
int test_container_default_container_id() {
proton::connection_options opts;
test_handler th("", opts);
proton::container(th).run();
ASSERT(!th.peer_container_id.empty());
ASSERT(th.listen_handler.on_error_.empty());
ASSERT(th.listen_handler.on_close_);
return 0;
}
int test_container_vhost() {
proton::connection_options opts;
opts.virtual_host("a.b.c");
test_handler th("", opts);
proton::container(th).run();
ASSERT_EQUAL(th.peer_vhost, "a.b.c");
return 0;
}
int test_container_default_vhost() {
proton::connection_options opts;
test_handler th("127.0.0.1", opts);
proton::container(th).run();
ASSERT_EQUAL(th.peer_vhost, "127.0.0.1");
return 0;
}
int test_container_no_vhost() {
// explicitly setting an empty virtual-host will cause the Open
// performative to be sent without a hostname field present.
// Sadly whether or not a 'hostname' field was received cannot be
// determined from here, so just exercise the code
proton::connection_options opts;
opts.virtual_host("");
test_handler th("127.0.0.1", opts);
proton::container(th).run();
ASSERT_EQUAL(th.peer_vhost, "");
return 0;
}
std::vector<proton::symbol> make_caps(const std::string& s) {
std::vector<proton::symbol> caps;
caps.push_back(s);
return caps;
}
int test_container_capabilities() {
proton::connection_options opts;
opts.offered_capabilities(make_caps("offered"));
opts.desired_capabilities(make_caps("desired"));
test_handler th("", opts);
proton::container(th).run();
ASSERT_EQUAL(th.peer_offered_capabilities.size(), 1u);
ASSERT_EQUAL(th.peer_offered_capabilities[0], proton::symbol("offered"));
ASSERT_EQUAL(th.peer_desired_capabilities.size(), 1u);
ASSERT_EQUAL(th.peer_desired_capabilities[0], proton::symbol("desired"));
ASSERT_EQUAL(th.peer_properties.size(), 0u);
return 0;
}
int test_container_properties() {
proton::connection_options opts;
std::map<proton::symbol, proton::value> props;
props["qpid.client_process"] = "test_process";
props["qpid.client_pid"] = 123;
opts.properties(props);
test_handler th("", opts);
proton::container(th).run();
ASSERT_EQUAL(th.peer_properties.size(), 2u);
ASSERT_EQUAL(th.peer_properties["qpid.client_process"], "test_process");
ASSERT_EQUAL(th.peer_properties["qpid.client_pid"], 123);
return 0;
}
/// Listen on a bad address, check for leaks.
/// Regression test for https://issues.apache.org/jira/browse/PROTON-1217
///
/// The same container is run twice: first with a listener that has no
/// handler, then with a test_listen_handler whose callbacks are checked.
int test_container_bad_address() {
    proton::container c;
    // Default fixed-option listener. Valgrind for leaks.
    c.listen("999.666.999.666:0");
    c.run();
    // Dummy listener.
    // (An unused test_handler local that used to be constructed here has been
    // removed; nothing in this test referenced it.)
    test_listen_handler l;
    c.listen("999.666.999.666:0", l);
    c.run();
    // The listener must fail: never opened, never accepted, but closed with
    // an error text recorded.
    ASSERT(!l.on_open_);
    ASSERT(!l.on_accept_);
    ASSERT(l.on_close_);
    ASSERT(!l.on_error_.empty()); // proton:io: Name or service not known
    return 0;
}
class stop_tester : public proton::messaging_handler {
proton::listener listener;
test_listen_handler listen_handler;
// Set up a listener which would block forever
void on_container_start(proton::container& c) override {
ASSERT(state==0);
listener = c.listen("//:0", listen_handler);
c.auto_stop(false);
state = 1;
}
// Get here twice - once for listener, once for connector
void on_connection_open(proton::connection &c) override {
c.close();
state++;
}
void on_connection_close(proton::connection &c) override {
ASSERT(state==3);
c.container().stop();
state = 4;
}
void on_container_stop(proton::container & ) override {
ASSERT(state==4);
state = 5;
}
void on_transport_error(proton::transport & t) override {
// Do nothing - ignore transport errors - we're going to get one when
// the container stops.
}
public:
stop_tester(): state(0) {}
int state;
};
int test_container_stop() {
stop_tester t;
proton::container(t).run();
ASSERT(t.state==5);
return 0;
}
struct hang_tester : public proton::messaging_handler {
proton::listener listener;
bool done;
hang_tester() : done(false) {}
void connect(proton::container* c) {
c->connect(make_url("", listener.port()));
}
void on_container_start(proton::container& c) override {
listener = c.listen("//:0");
c.schedule(proton::duration(250), proton::make_work(&hang_tester::connect, this, &c));
}
void on_connection_open(proton::connection& c) override {
c.close();
}
void on_connection_close(proton::connection& c) override {
if (!done) {
done = true;
listener.stop();
}
}
};
int test_container_schedule_nohang() {
hang_tester t;
proton::container(t).run();
return 0;
}
class immediate_stop_tester : public proton::messaging_handler {
public:
void on_container_start(proton::container &c) override {
c.stop();
}
};
int test_container_immediate_stop() {
immediate_stop_tester t;
proton::container(t).run(); // Should return after on_container_start
return 0;
}
int test_container_pre_stop() {
proton::container c;
c.stop();
c.run(); // Should return immediately
return 0;
}
/// Handler that schedules a delayed piece of work which stops the container.
struct schedule_tester : public proton::messaging_handler {
    /// Scheduled work item: stop the given container.
    void stop(proton::container* c) {
        c->stop();
    }

    void on_container_start(proton::container& c) override {
        const proton::duration delay(250);
        c.schedule(delay, proton::make_work(&schedule_tester::stop, this, &c));
    }
};
int test_container_schedule_stop() {
schedule_tester tester;
proton::container c(tester);
c.auto_stop(false);
c.run();
return 0;
}
/// Handler that opens one receiver and one sender over a loop-back
/// connection and records the link properties seen when the corresponding
/// link-open callbacks fire.
class link_test_handler : public proton::messaging_handler {
  public:
    bool had_receiver;  ///< on_receiver_open() has run.
    bool had_sender;    ///< on_sender_open() has run.
    test_listen_handler listen_handler;
    proton::listener listener;
    proton::receiver_options receiver_options;
    proton::sender_options sender_options;
    std::map<proton::symbol, proton::value> peer_receiver_properties;
    std::map<proton::symbol, proton::value> peer_sender_properties;

    link_test_handler(const proton::receiver_options &r_opts=proton::receiver_options(),
                      const proton::sender_options &s_opts=proton::sender_options())
        : had_receiver(false),
          had_sender(false),
          receiver_options(r_opts),
          sender_options(s_opts)
    {}

    void on_container_start(proton::container &c) override {
        listener = c.listen("//:0", listen_handler);
    }

    /// Only a connection that reports itself uninitialized gets the default
    /// open handling plus the two test links.
    void on_connection_open(proton::connection &c) override {
        if (!c.uninitialized()) return;
        proton::messaging_handler::on_connection_open(c);
        c.open_receiver("", receiver_options);
        c.open_sender("", sender_options);
    }

    /// Once both link kinds have been seen, close the connection and stop listening.
    void check_close(proton::link &l) {
        if (!(had_receiver && had_sender)) return;
        l.connection().close();
        listener.stop();
    }

    void on_receiver_open(proton::receiver &l) override {
        had_receiver = true;
        // When a client creates a sender then the server is notified about it as a receiver
        peer_sender_properties = l.properties();
        check_close(l);
    }

    void on_sender_open(proton::sender &l) override {
        had_sender = true;
        // When a client creates a receiver then the server is notified about it as a sender
        peer_receiver_properties = l.properties();
        check_close(l);
    }
};
int test_container_links_no_properties() {
link_test_handler th;
proton::container(th).run();
ASSERT(th.had_receiver);
ASSERT(th.had_sender);
ASSERT_EQUAL(th.peer_receiver_properties.size(), 0u);
ASSERT_EQUAL(th.peer_sender_properties.size(), 0u);
return 0;
}
int test_container_links_properties() {
proton::receiver_options r_opts;
std::map<proton::symbol, proton::value> r_props;
r_props["recv.prop_str"] = "receiver_string";
r_opts.properties(r_props);
proton::sender_options s_opts;
std::map<proton::symbol, proton::value> s_props;
s_props["send.prop_str"] = "sender_string";
s_props["send.prop_int"] = 123456789;
s_opts.properties(s_props);
link_test_handler th(r_opts, s_opts);
proton::container(th).run();
ASSERT(th.had_receiver);
ASSERT(th.had_sender);
ASSERT_EQUAL(th.peer_receiver_properties.size(), 1u);
ASSERT_EQUAL(th.peer_receiver_properties["recv.prop_str"], "receiver_string");
ASSERT_EQUAL(th.peer_sender_properties.size(), 2u);
ASSERT_EQUAL(th.peer_sender_properties["send.prop_str"], "sender_string");
ASSERT_EQUAL(th.peer_sender_properties["send.prop_int"], 123456789);
return 0;
}
class test_mt_handler : public proton::messaging_handler {
public:
std::mutex lock_;
std::condition_variable cond_;
std::string str_;
proton::error_condition err_;
void set(const std::string& s) {
std::lock_guard<std::mutex> l(lock_);
str_ = s;
cond_.notify_one();
}
std::string wait() {
std::unique_lock<std::mutex> l(lock_);
while (str_.empty()) cond_.wait(l);
std::string s = str_;
str_.clear();
return s;
}
proton::error_condition error() const { return err_; }
void on_container_start(proton::container &) override { set("start"); }
void on_connection_open(proton::connection &) override { set("open"); }
// Catch errors and save.
void on_error(const proton::error_condition& e) override { err_ = e; }
};
class container_runner {
proton::container& c_;
public:
container_runner(proton::container& c) : c_(c) {}
void operator()() {c_.run();}
};
/// Run an otherwise idle container (auto-stop disabled) on a second thread,
/// stop it from this thread, and check that no error was recorded.
void test_container_mt_stop_empty() {
    test_mt_handler th;
    proton::container c(th);
    c.auto_stop( false );
    container_runner runner(c);
    auto t = std::thread(runner);
    // Must ensure that thread is joined
    try {
        ASSERT_EQUAL("start", th.wait());
        c.stop();
        t.join();
        ASSERT_EQUAL("", th.error().name());
    } catch (const std::exception &e) {
        std::cerr << FAIL_MSG(e.what()) << std::endl;
        // If join hangs, let the test die by timeout. We cannot
        // detach and continue: deleting the container while it is
        // still alive will put the process in an undefined state.
        //
        // The final assertion above runs after the thread has already been
        // joined; joining a second time would throw std::system_error and
        // hide the real failure, so only join if that is still pending.
        if (t.joinable()) t.join();
        throw;
    }
}
/// Run a container on a second thread, open a loop-back connection through a
/// listener, then stop the container from this thread.
void test_container_mt_stop() {
    test_mt_handler th;
    proton::container c(th);
    c.auto_stop(false);
    container_runner runner(c);
    // Declared before the thread and outside the try block so it outlives the
    // container thread on every path. Were it local to the try block, a failed
    // assertion would destroy it during unwinding while the still-running
    // container could call back into it.
    test_listen_handler lh;
    auto t = std::thread(runner);
    // Must ensure that thread is joined
    try {
        ASSERT_EQUAL("start", th.wait());
        c.listen("//:0", lh); // Also opens a connection
        ASSERT_EQUAL("open", th.wait());
        c.stop();
        t.join();
    } catch (const std::exception& e) {
        std::cerr << FAIL_MSG(e.what()) << std::endl;
        // If join hangs, let the test die by timeout. We cannot
        // detach and continue: deleting the container while t is
        // still alive will put the process in an undefined state.
        if (t.joinable()) t.join();
        throw;
    }
}
/// test_mt_handler variant that remembers the first connection it sees and
/// can later close it through that connection's work queue. It also reports
/// connection closes as "closed" events.
class test_mt_handler_wq : public test_mt_handler {
  public:
    proton::work_queue *wq_;         ///< Work queue of the recorded connection; null until set.
    proton::work call_do_close_;     ///< Work item that invokes do_close().
    proton::connection connection_;  ///< First connection seen by on_connection_open().
    std::mutex wqlock_;              ///< Guards the three members above.

    test_mt_handler_wq() : wq_(nullptr) {}

    void on_connection_open(proton::connection &c) override {
        {
            std::lock_guard<std::mutex> guard(wqlock_);
            // Just record first connection side, inbound or outbound
            const bool first_side = !connection_;
            if (!first_side) return;
            connection_ = c;
            wq_ = &c.work_queue();
            call_do_close_ = make_work(&test_mt_handler_wq::do_close, this);
        }
        // Only the recorded side is reported as an "open" event.
        test_mt_handler::on_connection_open(c);
    }

    /// Queue do_close() on the recorded connection's work queue.
    void initiate_close() {
        std::lock_guard<std::mutex> guard(wqlock_);
        wq_->add(call_do_close_);
    }

    void do_close() {
        connection_.close();
    }

    void on_connection_close(proton::connection &) override {
        set("closed");
    }
};
/// Close a loop-back connection via its work queue and then stop the
/// container right after the close is reported, so that container shutdown
/// overlaps the connection close cleanup (PROTON-2027).
void test_container_mt_close_race() {
    test_mt_handler_wq th;
    proton::container c(th);
    c.auto_stop(false);
    container_runner runner(c);
    // Declared before the thread and outside the try block so it outlives the
    // container thread on every path. Were it local to the try block, a failed
    // assertion would destroy it during unwinding while the still-running
    // container could call back into it.
    test_listen_handler lh;
    auto t = std::thread(runner);
    // Must ensure that thread is joined
    try {
        ASSERT_EQUAL("start", th.wait());
        c.listen("//:0", lh); // Also opens a connection
        ASSERT_EQUAL("open", th.wait());
        th.initiate_close();
        ASSERT_EQUAL("closed", th.wait());
        // The two sides of the connection are closing, each with its
        // own connection context. Start a proactor disconnect to run
        // competing close cleanup in a third context. PROTON-2027.
        c.stop();
        t.join();
    } catch (const std::exception& e) {
        std::cerr << FAIL_MSG(e.what()) << std::endl;
        // If join hangs, let the test die by timeout. We cannot
        // detach and continue: deleting the container while t is
        // still alive will put the process in an undefined state.
        if (t.joinable()) t.join();
        throw;
    }
}
} // namespace
// Test driver: hands every test in this file to RUN_ARGV_TEST together with
// the shared failure counter, and returns that counter as the exit status.
// NOTE(review): RUN_ARGV_TEST is not defined in this file (presumably it
// comes from test_bits.hpp); it appears to use argc/argv to select tests and
// to bump `failed` when a test fails. Confirm against the macro definition.
int main(int argc, char** argv) {
// Failure counter passed to every RUN_ARGV_TEST invocation below.
int failed = 0;
// Single-threaded connection-option tests.
RUN_ARGV_TEST(failed, test_container_default_container_id());
RUN_ARGV_TEST(failed, test_container_vhost());
RUN_ARGV_TEST(failed, test_container_capabilities());
RUN_ARGV_TEST(failed, test_container_properties());
RUN_ARGV_TEST(failed, test_container_default_vhost());
RUN_ARGV_TEST(failed, test_container_no_vhost());
// Listener error handling and container stop/schedule behaviour.
RUN_ARGV_TEST(failed, test_container_bad_address());
RUN_ARGV_TEST(failed, test_container_stop());
RUN_ARGV_TEST(failed, test_container_schedule_nohang());
RUN_ARGV_TEST(failed, test_container_immediate_stop());
RUN_ARGV_TEST(failed, test_container_pre_stop());
RUN_ARGV_TEST(failed, test_container_schedule_stop());
// Link property tests.
RUN_ARGV_TEST(failed, test_container_links_no_properties());
RUN_ARGV_TEST(failed, test_container_links_properties());
// Tests that run the container on a separate std::thread.
RUN_ARGV_TEST(failed, test_container_mt_stop_empty());
RUN_ARGV_TEST(failed, test_container_mt_stop());
RUN_ARGV_TEST(failed, test_container_mt_close_race());
// The value of the counter becomes the process exit status.
return failed;
}