blob: 2ad36e2d023482176206ae725cb8dffea71f6c45 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "test_bits.hpp"
#include "uuid.hpp"
#include <proton/connection_engine.hpp>
#include <proton/handler.hpp>
#include <proton/event.hpp>
#include <proton/types.hpp>
#include <proton/link.hpp>
#include <deque>
#include <algorithm>
using namespace proton;
using namespace test;
// One end of an in-memory connection
struct mem_pipe {
mem_pipe(std::deque<char>& r, std::deque<char>& w) : read(r), write(w) {}
struct std::deque<char> &read, &write;
};
struct mem_queues : public std::pair<std::deque<char>, std::deque<char> > {
mem_pipe a() { return mem_pipe(first, second); }
mem_pipe b() { return mem_pipe(second, first); }
};
// In memory connection_engine
struct mem_engine : public connection_engine {
mem_pipe socket;
std::string read_error;
std::string write_error;
mem_engine(mem_pipe s, handler &h, const connection_options &opts)
: connection_engine(h, opts), socket(s) {}
size_t io_read(char* buf, size_t size) {
if (!read_error.empty()) throw io_error(read_error);
size = std::min(socket.read.size(), size);
copy(socket.read.begin(), socket.read.begin()+size, buf);
socket.read.erase(socket.read.begin(), socket.read.begin()+size);
return size;
}
size_t io_write(const char* buf, size_t size) {
if (!write_error.empty()) throw io_error(write_error);
socket.write.insert(socket.write.begin(), buf, buf+size);
return size;
}
void io_close() {
read_error = write_error = "closed";
}
};
struct debug_handler : handler {
void on_unhandled(event& e) {
std::cout << e.name() << std::endl;
}
};
struct record_handler : handler {
std::deque<std::string> events;
void on_unhandled(event& e) {
events.push_back(e.name());
}
};
template <class HA=record_handler, class HB=record_handler> struct engine_pair {
connection_engine::container cont;
mem_queues queues;
HA ha;
HB hb;
mem_engine a, b;
engine_pair() : a(queues.a(), ha, cont.make_options()), b(queues.b(), hb, cont.make_options()) {}
engine_pair(const std::string& id)
: cont(id), a(queues.a(), ha, cont.make_options()), b(queues.b(), hb, cont.make_options())
{}
engine_pair(const connection_options &aopts, connection_options &bopts)
: a(queues.a(), ha, aopts), b(queues.b(), hb, bopts)
{}
void process() { a.process(); b.process(); }
};
void test_process_amqp() {
engine_pair<> e;
e.a.process(connection_engine::READ); // Don't write unlesss writable
ASSERT(e.a.socket.write.empty());
e.a.process(connection_engine::WRITE);
std::string wrote(e.a.socket.write.begin(), e.a.socket.write.end());
e.a.process(connection_engine::WRITE);
ASSERT_EQUAL(8, wrote.size());
ASSERT_EQUAL("AMQP", wrote.substr(0,4));
e.b.process(); // Read and write AMQP
ASSERT_EQUAL("AMQP", std::string(e.b.socket.write.begin(), e.b.socket.write.begin()+4));
ASSERT(e.b.socket.read.empty());
ASSERT(e.a.socket.write.empty());
ASSERT_EQUAL(many<std::string>() + "START", e.ha.events);
}
struct link_handler : public record_handler {
std::deque<proton::link> links;
void on_link_open(event& e) {
links.push_back(e.link());
}
proton::link pop() {
proton::link l;
if (!links.empty()) {
l = links.front();
links.pop_front();
}
return l;
}
};
void test_engine_prefix() {
// Set container ID and prefix explicitly
engine_pair<link_handler, link_handler> e(
connection_options().container_id("a").link_prefix("x/"),
connection_options().container_id("b").link_prefix("y/"));
e.a.connection().open();
ASSERT_EQUAL("a", e.a.connection().container_id());
e.b.connection().open();
ASSERT_EQUAL("b", e.b.connection().container_id());
e.a.connection().open_sender("");
while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
ASSERT_EQUAL("x/1", e.ha.pop().name());
ASSERT_EQUAL("x/1", e.hb.pop().name());
e.a.connection().open_receiver("");
while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
ASSERT_EQUAL("x/2", e.ha.pop().name());
ASSERT_EQUAL("x/2", e.hb.pop().name());
e.b.connection().open_receiver("");
while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
ASSERT_EQUAL("y/1", e.ha.pop().name());
ASSERT_EQUAL("y/1", e.hb.pop().name());
}
void test_container_prefix() {
/// Let the container set the options.
engine_pair<link_handler, link_handler> e;
e.a.connection().open();
e.a.connection().open_sender("x");
while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
ASSERT_EQUAL("1/1", e.ha.pop().name());
ASSERT_EQUAL("1/1", e.hb.pop().name());
e.a.connection().open_receiver("y");
while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
ASSERT_EQUAL("1/2", e.ha.pop().name());
ASSERT_EQUAL("1/2", e.hb.pop().name());
e.b.connection().open_receiver("z");
while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
ASSERT_EQUAL("2/1", e.ha.pop().name());
ASSERT_EQUAL("2/1", e.hb.pop().name());
// TODO aconway 2016-01-22: check we respect name set in linkn-options.
};
int main(int, char**) {
int failed = 0;
RUN_TEST(failed, test_process_amqp());
RUN_TEST(failed, test_engine_prefix());
RUN_TEST(failed, test_container_prefix());
return failed;
}