blob: b56cd07e17d9b04d07d42877fa1949a832d76cde [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "test_bits.hpp"
#include "proton_bits.hpp"
#include "link_namer.hpp"
#include "proton/connection.hpp"
#include "proton/container.hpp"
#include "proton/io/connection_driver.hpp"
#include "proton/link.hpp"
#include "proton/message.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/receiver_options.hpp"
#include "proton/sender.hpp"
#include "proton/sender_options.hpp"
#include "proton/source.hpp"
#include "proton/source_options.hpp"
#include "proton/target.hpp"
#include "proton/target_options.hpp"
#include "proton/transport.hpp"
#include "proton/types_fwd.hpp"
#include "proton/uuid.hpp"
#include <deque>
#include <algorithm>
namespace {
// Bring the standard, proton and test-helper names into this file-local scope.
using namespace std;
using namespace proton;
using namespace test;
using proton::io::connection_driver;
using proton::io::const_buffer;
using proton::io::mutable_buffer;

/// Queue of raw bytes standing in for one direction of a connection.
using byte_stream = std::deque<char>;

/// Give up after this many event-less dispatches.
const int MAX_SPIN = 1000;
/// In-memory connection_driver that reads from and writes to byte_streams
/// instead of a real socket.
///
/// The driver only holds references to the two streams; the caller owns them
/// and must keep them alive for as long as the driver is used.
struct in_memory_driver : public connection_driver {

    byte_stream& reads;   ///< Bytes waiting to be fed into this driver (consumed from the front).
    byte_stream& writes;  ///< Bytes produced by this driver (appended at the back).
    int spinning;         ///< Consecutive check_idle() calls that found no pending events.

    /// @param rd   stream this driver reads its input from
    /// @param wr   stream this driver writes its output to
    /// @param name identifier passed on to connection_driver
    in_memory_driver(byte_stream& rd, byte_stream& wr, const std::string& name) :
        connection_driver(name), reads(rd), writes(wr), spinning(0) {}

    /// Copy as many bytes as fit from the front of `reads` into the driver's
    /// read buffer, report them with read_done() and remove them from `reads`.
    void do_read() {
        mutable_buffer rbuf = read_buffer();
        size_t size = std::min(reads.size(), rbuf.size);
        if (size) {
            std::copy(reads.begin(), reads.begin()+size, static_cast<char*>(rbuf.data));
            read_done(size);
            reads.erase(reads.begin(), reads.begin()+size);
        }
    }

    /// Append the driver's pending output to the back of `writes` and report
    /// it with write_done().
    void do_write() {
        const_buffer wbuf = write_buffer();
        if (wbuf.size) {
            // The peer consumes from the front of this stream, so new output
            // must go at the back: inserting at the front would place newer
            // bytes ahead of older, still unread ones.
            writes.insert(writes.end(),
                          static_cast<const char*>(wbuf.data),
                          static_cast<const char*>(wbuf.data) + wbuf.size);
            write_done(wbuf.size);
        }
    }

    /// Track how long the driver has had nothing to do.
    /// @throws test::error once more than MAX_SPIN consecutive calls found no events.
    void check_idle() {
        spinning = has_events() ? 0 : spinning+1;
        if (spinning > MAX_SPIN)
            throw test::error("no activity, interrupting test");
    }

    /// Run one processing step: dispatch pending events, optionally tick the
    /// clock, move bytes in and out, then dispatch again.
    ///
    /// @param t current time; a default-constructed timestamp skips the tick()
    /// @return the value returned by tick(t), or a default timestamp if no tick was made
    /// @throws test::error if the first dispatch() returns false, or if
    ///         check_idle() decides the driver is spinning
    timestamp process(timestamp t = timestamp()) {
        check_idle();
        if (!dispatch())
            throw test::error("unexpected close: "+connection().error().what());
        timestamp next_tick;
        if (t!=timestamp()) next_tick = tick(t);
        do_read();
        do_write();
        check_idle();
        dispatch();
        return next_tick;
    }
};
/// Two in_memory_drivers wired back to back through a pair of byte_streams,
/// simulating both ends of a single connection.
struct driver_pair {
    byte_stream ab;      ///< Written by a, read by b.
    byte_stream ba;      ///< Written by b, read by a.
    in_memory_driver a;  ///< The side that calls connect().
    in_memory_driver b;  ///< The side that calls accept().

    /// @param oa   connection options passed to a.connect()
    /// @param ob   connection options passed to b.accept()
    /// @param name prefix for the driver names; "a" and "b" are appended
    driver_pair(const connection_options& oa,
                const connection_options& ob,
                const std::string& name = "")
        : a(ba, ab, name + "a"),
          b(ab, ba, name + "b")
    {
        a.connect(oa);
        b.accept(ob);
    }

    /// Run one processing step on a, then one on b.
    void process() {
        a.process();
        b.process();
    }
};
/// A pair of drivers that talk to each other in-memory, simulating a connection.
/// This version also simulates the passage of time
struct timed_driver_pair {
duration timeout;
byte_stream ab, ba;
in_memory_driver a, b;
timestamp now;
timed_driver_pair(duration t, const connection_options& oa0, const connection_options& ob0,
const std::string& name=""
) :
timeout(t),
a(ba, ab, name+"a"), b(ab, ba, name+"b"),
now(100100100)
{
connection_options oa(oa0);
connection_options ob(ob0);
a.connect(oa.idle_timeout(t));
b.accept(ob.idle_timeout(t));
}
void process_untimed() { a.process(); b.process(); }
void process_timed_succeed() {
timestamp anow = now + timeout - duration(100);
timestamp bnow = now + timeout - duration(100);
a.process(anow);
b.process(bnow);
now = std::max(anow, bnow);
}
void process_timed_fail() {
timestamp anow = now + timeout + timeout + duration(100);
timestamp bnow = now + timeout + timeout + duration(100);
a.process(anow);
b.process(bnow);
now = std::max(anow, bnow);
}
};
/// A handler that records incoming endpoints, errors etc.
struct record_handler : public messaging_handler {
std::deque<proton::receiver> receivers;
std::deque<proton::sender> senders;
std::deque<proton::session> sessions;
std::deque<std::string> unhandled_errors, transport_errors, connection_errors;
std::deque<proton::message> messages;
size_t link_count() const { return senders.size() + receivers.size(); }
void on_receiver_open(receiver &l) override {
messaging_handler::on_receiver_open(l);
receivers.push_back(l);
}
void on_sender_open(sender &l) override {
messaging_handler::on_sender_open(l);
senders.push_back(l);
}
void on_session_open(session &s) override {
messaging_handler::on_session_open(s);
sessions.push_back(s);
}
void on_transport_error(transport& t) override {
transport_errors.push_back(t.error().what());
}
void on_connection_error(connection& c) override {
connection_errors.push_back(c.error().what());
}
void on_error(const proton::error_condition& c) override {
unhandled_errors.push_back(c.what());
}
void on_message(proton::delivery&, proton::message& m) override {
messages.push_back(m);
}
};
/// Remove the front element of `s` and return a copy of it.
/// ASSERT-checks that `s` is not empty before touching it.
template <class S> typename S::value_type quick_pop(S& s) {
    ASSERT(!s.empty());
    typename S::value_type head(s.front());
    s.pop_front();
    return head;
}
/// link_namer that hands out single-character names, starting at a given
/// character and moving to the next character after every call.
struct namer : public io::link_namer {
    char name;  ///< Character that the next call to link_name() will return.

    namer(char c) : name(c) {}

    /// Return the current character as a one-character string, then advance it.
    std::string link_name() {
        const char current = name;
        ++name;
        return std::string(1, current);
    }
};
void test_driver_link_id() {
record_handler ha, hb;
driver_pair d(ha, hb);
d.a.connect(ha);
d.b.accept(hb);
namer na('x');
namer nb('b');
connection ca = d.a.connection();
connection cb = d.b.connection();
set_link_namer(ca, na);
set_link_namer(cb, nb);
d.b.connection().open();
d.a.connection().open_sender("foo");
while (ha.senders.empty() || hb.receivers.empty()) d.process();
sender s = quick_pop(ha.senders);
ASSERT_EQUAL("x", s.name());
ASSERT_EQUAL("x", quick_pop(hb.receivers).name());
d.a.connection().open_receiver("bar");
while (ha.receivers.empty() || hb.senders.empty()) d.process();
ASSERT_EQUAL("y", quick_pop(ha.receivers).name());
ASSERT_EQUAL("y", quick_pop(hb.senders).name());
d.b.connection().open_receiver("");
while (ha.senders.empty() || hb.receivers.empty()) d.process();
ASSERT_EQUAL("b", quick_pop(ha.senders).name());
ASSERT_EQUAL("b", quick_pop(hb.receivers).name());
}
void test_endpoint_close() {
record_handler ha, hb;
driver_pair d(ha, hb);
d.a.connection().open_sender("x");
d.a.connection().open_receiver("y");
while (ha.senders.size()+ha.receivers.size() < 2 ||
hb.senders.size()+hb.receivers.size() < 2) d.process();
proton::link ax = quick_pop(ha.senders), ay = quick_pop(ha.receivers);
proton::link bx = quick_pop(hb.receivers), by = quick_pop(hb.senders);
// Close a link
ax.close(proton::error_condition("err", "foo bar"));
while (!bx.closed()) d.process();
proton::error_condition c = bx.error();
ASSERT_EQUAL("err", c.name());
ASSERT_EQUAL("foo bar", c.description());
ASSERT_EQUAL("err: foo bar", c.what());
// Close a link with an empty condition
ay.close(proton::error_condition());
while (!by.closed()) d.process();
ASSERT(by.error().empty());
// Close a connection
connection ca = d.a.connection(), cb = d.b.connection();
ca.close(proton::error_condition("conn", "bad connection"));
while (!cb.closed()) d.process();
ASSERT_EQUAL("conn: bad connection", cb.error().what());
ASSERT_EQUAL(1u, hb.connection_errors.size());
ASSERT_EQUAL("conn: bad connection", hb.connection_errors.front());
}
// Verify that connection_driver::disconnected() records the given condition as
// the transport error and reports it to the local on_transport_error() handler,
// while the connection itself is left un-closed and without an error.
void test_driver_disconnected() {
// driver.disconnected() aborts the connection and calls the local on_transport_error()
record_handler ha, hb;
driver_pair d(ha, hb);
// NOTE(review): driver_pair's constructor has already called connect()/accept()
// with these handlers; confirm whether repeating the calls here is intentional.
d.a.connect(ha);
d.b.accept(hb);
// Pump both drivers until each side reports its connection as active.
while (!d.a.connection().active() || !d.b.connection().active())
d.process();
// Disconnect driver a with an error condition. The AMQP connection is still open.
d.a.disconnected(proton::error_condition("oops", "driver failure"));
// After disconnected(), dispatch() is expected to return false.
ASSERT(!d.a.dispatch());
ASSERT(!d.a.connection().closed());
ASSERT(d.a.connection().error().empty());
ASSERT_EQUAL(0u, ha.connection_errors.size());
ASSERT_EQUAL("oops: driver failure", d.a.transport().error().what());
ASSERT_EQUAL(1u, ha.transport_errors.size());
ASSERT_EQUAL("oops: driver failure", ha.transport_errors.front());
// In a real app the IO code would detect the abort and do this:
d.b.disconnected(proton::error_condition("broken", "it broke"));
ASSERT(!d.b.dispatch());
ASSERT(!d.b.connection().closed());
ASSERT(d.b.connection().error().empty());
ASSERT_EQUAL(0u, hb.connection_errors.size());
// Proton-C adds (connection aborted) if transport closes too early,
// and provides a default message if there is no user message.
ASSERT_EQUAL("broken: it broke (connection aborted)", d.b.transport().error().what());
ASSERT_EQUAL(1u, hb.transport_errors.size());
ASSERT_EQUAL("broken: it broke (connection aborted)", hb.transport_errors.front());
}
// Verify that asking a default-constructed connection_driver's connection for
// its container raises proton::error.
void test_no_container() {
// A driver with no container should throw, not crash.
connection_driver d;
try {
d.connection().container();
// Only reached if container() returned normally, i.e. did not throw.
FAIL("expected error");
} catch (const proton::error&) {}
}
void test_spin_interrupt() {
// Check the test framework interrupts a spinning driver pair with nothing to do.
record_handler ha, hb;
driver_pair d(ha, hb);
try {
while (true)
d.process();
FAIL("expected exception");
} catch (const test::error&) {}
}
// Assert that TERMINUS has address ADDR, and that it is anonymous exactly when
// ADDR is the empty string.
// The macro deliberately has no trailing semicolon, so that
// `ASSERT_ADDR(a, t);` is a single statement and is safe in an unbraced if/else.
#define ASSERT_ADDR(ADDR, TERMINUS) do { \
        ASSERT_EQUAL((ADDR), (TERMINUS).address()); \
        if ((ADDR) == std::string()) { ASSERT((TERMINUS).anonymous()); } \
        else { ASSERT(!(TERMINUS).anonymous()); } \
    } while(0)
// Assert that LINK has source address SRC and target address TGT (see ASSERT_ADDR).
#define ASSERT_LINK(SRC, TGT, LINK) do { \
        ASSERT_ADDR((SRC), (LINK).source()); \
        ASSERT_ADDR((TGT), (LINK).target()); \
    } while(0)
// Verify how the address argument of open_sender()/open_receiver() and the
// source/target addresses given in the link options end up on the link, as
// seen from both ends of the connection.
void test_link_address() {
record_handler ha, hb;
driver_pair d(ha, hb);
// Using open(address, opts)
// The sender's address argument is expected as its target and the receiver's
// as its source; the opposite terminus and the link name come from the options.
d.a.connection().open_sender("tx", sender_options().name("_x").source(source_options().address("sx")));
d.a.connection().open_receiver("sy", receiver_options().name("_y").target(target_options().address("ty")));
// Two links, each recorded on both sides, gives four recorded links in total.
while (ha.link_count()+hb.link_count() < 4) d.process();
proton::sender ax = quick_pop(ha.senders);
ASSERT_EQUAL("_x", ax.name());
ASSERT_LINK("sx", "tx", ax);
proton::receiver bx = quick_pop(hb.receivers);
ASSERT_EQUAL("_x", bx.name());
ASSERT_LINK("sx", "tx", bx);
proton::receiver ay = quick_pop(ha.receivers);
ASSERT_EQUAL("_y", ay.name());
ASSERT_LINK("sy", "ty", ay);
proton::sender by = quick_pop(hb.senders);
ASSERT_EQUAL("_y", by.name());
ASSERT_LINK("sy", "ty", by);
// Override address parameter in opts
// The address set in the options ("X" / "Y") is the one expected on the link,
// not the address argument ("x" / "y").
d.a.connection().open_sender("x", sender_options().target(target_options().address("X")));
d.a.connection().open_receiver("y", receiver_options().source(source_options().address("Y")));
// All earlier links were popped above, so wait for four newly recorded links.
while (ha.link_count()+hb.link_count() < 4) d.process();
ax = quick_pop(ha.senders);
ASSERT_LINK("", "X", ax);
bx = quick_pop(hb.receivers);
ASSERT_LINK("", "X", bx);
ay = quick_pop(ha.receivers);
ASSERT_LINK("Y", "", ay);
by = quick_pop(hb.senders);
ASSERT_LINK("Y", "", by);
}
// Verify the address, anonymous() and dynamic() values seen on both ends for
// links opened with anonymous(true), with dynamic(true), and with an explicit
// empty-string address.
void test_link_anonymous_dynamic() {
record_handler ha, hb;
driver_pair d(ha, hb);
// Anonymous link should have NULL address
// The "x"/"y" address arguments are expected to be ignored once the terminus
// is marked anonymous: ASSERT_LINK("", "", ...) requires empty, anonymous termini.
d.a.connection().open_sender("x", sender_options().target(target_options().anonymous(true)));
d.a.connection().open_receiver("y", receiver_options().source(source_options().anonymous(true)));
// Two links, each recorded on both sides, gives four recorded links in total.
while (ha.link_count()+hb.link_count() < 4) d.process();
proton::sender ax = quick_pop(ha.senders);
ASSERT_LINK("", "", ax);
proton::receiver bx = quick_pop(hb.receivers);
ASSERT_LINK("", "", bx);
proton::receiver ay = quick_pop(ha.receivers);
ASSERT_LINK("", "", ay);
proton::sender by = quick_pop(hb.senders);
ASSERT_LINK("", "", by);
// Dynamic link should have NULL address and dynamic flag
d.a.connection().open_sender("x", sender_options().target(target_options().dynamic(true)));
d.a.connection().open_receiver("y", receiver_options().source(source_options().dynamic(true)));
// All earlier links were popped above, so wait for four newly recorded links.
while (ha.link_count()+hb.link_count() < 4) d.process();
ax = quick_pop(ha.senders);
ASSERT(ax.target().dynamic());
ASSERT_LINK("", "", ax);
bx = quick_pop(hb.receivers);
ASSERT(bx.target().dynamic());
ASSERT_LINK("", "", bx);
ay = quick_pop(ha.receivers);
ASSERT(ay.source().dynamic());
ASSERT_LINK("", "", ay);
by = quick_pop(hb.senders);
ASSERT(by.source().dynamic());
ASSERT_LINK("", "", by);
// Empty string as a link address is allowed and not considered anonymous.
// ASSERT_LINK is not used here because it requires an empty address to be anonymous.
d.a.connection().open_sender("", sender_options());
d.a.connection().open_receiver("", receiver_options());
while (ha.link_count()+hb.link_count() < 4) d.process();
// Only side a's view of the links is checked for this case.
ax = quick_pop(ha.senders);
ASSERT(ax.target().address().empty());
ASSERT(!ax.target().anonymous());
ay = quick_pop(ha.receivers);
ASSERT(ay.source().address().empty());
ASSERT(!ay.source().anonymous());
}
// Verify that capabilities set on a sender's target and on a receiver's source,
// and filters set on a receiver's source, are visible on both ends of the link.
void test_link_capability_filter() {
record_handler ha, hb;
driver_pair d(ha, hb);
// Capabilities and filters
std::vector<proton::symbol> caps;
caps.push_back("foo");
caps.push_back("bar");
// Sender "x": capabilities on the target only.
d.a.connection().open_sender("x", sender_options().target(target_options().capabilities(caps)));
source::filter_map f;
f.put("1", "11");
f.put("2", "22");
// Receiver "y": both filters and capabilities on the source.
d.a.connection().open_receiver("y", receiver_options().source(source_options().filters(f).capabilities(caps)));
// Two links, each recorded on both sides, gives four recorded links in total.
while (ha.link_count()+hb.link_count() < 4) d.process();
proton::sender ax = quick_pop(ha.senders);
ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", ax.target().capabilities());
proton::receiver ay = quick_pop(ha.receivers);
ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", ay.source().capabilities());
proton::receiver bx = quick_pop(hb.receivers);
ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", bx.target().capabilities());
// No capabilities were set on the source of link "x", so none are expected there.
ASSERT_EQUAL(many<proton::symbol>(), bx.source().capabilities());
proton::sender by = quick_pop(hb.senders);
ASSERT_EQUAL(many<proton::symbol>() + "foo" + "bar", by.source().capabilities());
// Reuse f to hold the filters as seen by the remote end.
f = by.source().filters();
ASSERT_EQUAL(2U, f.size());
ASSERT_EQUAL(value("11"), f.get("1"));
ASSERT_EQUAL(value("22"), f.get("2"));
}
void test_message() {
// Verify a message arrives intact
record_handler ha, hb;
driver_pair d(ha, hb);
proton::sender s = d.a.connection().open_sender("x");
proton::message m("barefoot");
m.properties().put("x", "y");
m.message_annotations().put("a", "b");
s.send(m);
while (hb.messages.size() == 0)
d.process();
proton::message m2 = quick_pop(hb.messages);
ASSERT_EQUAL(value("barefoot"), m2.body());
ASSERT_EQUAL(value("y"), m2.properties().get("x"));
ASSERT_EQUAL(value("b"), m2.message_annotations().get("a"));
}
void test_message_timeout_succeed() {
// Verify a message arrives intact
record_handler ha, hb;
timed_driver_pair d(duration(2000), ha, hb);
proton::sender s = d.a.connection().open_sender("x");
d.process_timed_succeed();
proton::message m("barefoot_timed_succeed");
m.properties().put("x", "y");
m.message_annotations().put("a", "b");
s.send(m);
while (hb.messages.size() == 0)
d.process_timed_succeed();
proton::message m2 = quick_pop(hb.messages);
ASSERT_EQUAL(value("barefoot_timed_succeed"), m2.body());
ASSERT_EQUAL(value("y"), m2.properties().get("x"));
ASSERT_EQUAL(value("b"), m2.message_annotations().get("a"));
}
// Verify that the connection fails with an idle-timeout error when every step
// advances the simulated clock by more than twice the idle timeout: processing
// must end in test::error, side b must record one transport error and side a
// one connection error, both carrying the local-idle-timeout condition.
void test_message_timeout_fail() {
// Unlike test_message_timeout_succeed, the message is not expected to arrive.
record_handler ha, hb;
timed_driver_pair d(duration(2000), ha, hb);
proton::sender s = d.a.connection().open_sender("x");
d.process_timed_fail();
proton::message m("barefoot_timed_fail");
m.properties().put("x", "y");
m.message_annotations().put("a", "b");
s.send(m);
d.process_timed_fail();
// Keep stepping until in_memory_driver::process() throws test::error; the loop
// would only end normally if a message reached side b.
ASSERT_THROWS(test::error,
while (hb.messages.size() == 0) {
d.process_timed_fail();
}
);
ASSERT_EQUAL(1u, hb.transport_errors.size());
ASSERT_EQUAL("amqp:resource-limit-exceeded: local-idle-timeout expired", d.b.transport().error().what());
ASSERT_EQUAL(1u, ha.connection_errors.size());
ASSERT_EQUAL("amqp:resource-limit-exceeded: local-idle-timeout expired", d.a.connection().error().what());
}
}
// Test entry point: hands every test in this file to RUN_ARGV_TEST and returns
// the value accumulated in `failed`.
// NOTE(review): RUN_ARGV_TEST is not defined in this file (presumably it comes
// from test_bits.hpp); it presumably increments `failed` for each failing test
// and may use argc/argv to select which tests run - confirm against its definition.
int main(int argc, char** argv) {
int failed = 0;
RUN_ARGV_TEST(failed, test_driver_link_id());
RUN_ARGV_TEST(failed, test_endpoint_close());
RUN_ARGV_TEST(failed, test_driver_disconnected());
RUN_ARGV_TEST(failed, test_no_container());
RUN_ARGV_TEST(failed, test_spin_interrupt());
RUN_ARGV_TEST(failed, test_link_address());
RUN_ARGV_TEST(failed, test_link_anonymous_dynamic());
RUN_ARGV_TEST(failed, test_link_capability_filter());
RUN_ARGV_TEST(failed, test_message());
RUN_ARGV_TEST(failed, test_message_timeout_succeed());
RUN_ARGV_TEST(failed, test_message_timeout_fail());
return failed;
}