blob: a6993cb3a1423a5f13d1145be8eabf87d87beae7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "test_bits.hpp"
#include "proton/connection.hpp"
#include "proton/connection_options.hpp"
#include "proton/container.hpp"
#include "proton/delivery.hpp"
#include "proton/error_condition.hpp"
#include "proton/listen_handler.hpp"
#include "proton/listener.hpp"
#include "proton/message.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/receiver_options.hpp"
#include "proton/transport.hpp"
#include "proton/work_queue.hpp"
#include "proton/internal/pn_unique_ptr.hpp"
#include <cstdlib>
#include <ctime>
#include <string>
#include <cstdio>
#include <sstream>
namespace {
// Wait for N things to be done.
class waiter {
size_t count;
public:
waiter(size_t n) : count(n) {}
void done() { if (--count == 0) ready(); }
virtual void ready() = 0;
};
class server_connection_handler : public proton::messaging_handler {
struct listen_handler : public proton::listen_handler {
proton::connection_options opts;
std::string url;
waiter& listen_waiter;
listen_handler(proton::messaging_handler& h, waiter& w) : listen_waiter(w) {
opts.handler(h);
}
void on_open(proton::listener& l) override {
std::ostringstream o;
o << "//:" << l.port(); // Connect to the actual listening port
url = o.str();
// Schedule rather than call done() direct to ensure serialization
l.container().schedule(proton::duration::IMMEDIATE,
proton::make_work(&waiter::done, &listen_waiter));
}
proton::connection_options on_accept(proton::listener&) override { return opts; }
};
proton::listener listener_;
proton::sender sender_;
int expect_;
bool closing_;
int available_;
int acked_;
listen_handler listen_handler_;
proton::work_queue *notify_wq_;
proton::work notify_work_;
void close (proton::connection &c) {
if (closing_) return;
c.close(proton::error_condition("amqp:connection:forced", "Failover testing"));
closing_ = true;
}
public:
server_connection_handler(proton::container& c, int a, waiter& w)
: expect_(0), closing_(false), available_(a), acked_(0), listen_handler_(*this, w), notify_wq_(0)
{
listener_ = c.listen("//:0", listen_handler_);
}
std::string url() const {
if (listen_handler_.url.empty()) throw std::runtime_error("no url");
return listen_handler_.url;
}
void on_connection_open(proton::connection &c) override {
// Only listen for a single connection
listener_.stop();
c.open();
}
void on_sender_open(proton::sender &s) override {
s.open();
sender_ = s;
}
void on_sendable(proton::sender &s) override {
send_available_messages(s);
}
void send_available_messages(proton::sender &s) {
bool draining = s.draining();
while (available_ && s.credit() > 0) {
s.send(proton::message("hello"));
available_--;
expect_++;
}
if (draining && !available_ && s.credit()) {
s.return_credit(); // return the rest
}
}
void notify_idle() {
notify_wq_->add(notify_work_);
}
void on_tracker_accept(proton::tracker& d) override {
acked_++;
if (acked_ == expect_ && (available_ == 0 || d.sender().credit() == 0))
notify_idle();
}
void on_transport_error(proton::transport & ) override {
// If we get an error then (try to) stop the listener
// - this will stop the listener if we didn't already accept a connection
listener_.stop();
}
void notify_on_idle(proton::work_queue &wq, proton::work &w) { notify_wq_ = &wq; notify_work_ = w;}
void available(int i) {
available_ = i;
}
};
class tester : public proton::messaging_handler, public waiter {
public:
tester() : waiter(1), container_(*this, "credit_tester"),
received_(0), initial_credit_(0) {}
void on_container_start(proton::container &c) override {
srv_.reset(new server_connection_handler(c, 100000, *this));
}
// waiter::ready is called when listener can accept connections.
void ready() override {
container_.connect(srv_->url());
}
void on_connection_open(proton::connection& c) override {
c.open_receiver("messages", proton::receiver_options().credit_window(0));
}
void on_receiver_open(proton::receiver &r) override {
receiver_ = r;
next_idle_ = proton::make_work(&tester::first_idle, this);
proton::work call_on_server_idle(make_work(&tester::on_server_idle, this));
srv_->notify_on_idle(r.work_queue(), call_on_server_idle);
r.add_credit(initial_credit_);
}
void on_message(proton::delivery &d, proton::message &m) override {
received_++;
d.accept();
}
void run() {
container_.run(); // Single threaded to avoid locks and barriers.
}
void server_available(int available) {
// If multithreaded, locking would be required.
srv_->available(available);
}
void on_server_idle() {
next_idle_();
}
void fail(const std::string &msg, int rcv) {
// Call from work_queue. Remember the exception later.
std::ostringstream os;
os << msg << rcv;
fail_msg_ = os.str();
receiver_.connection().close();
}
void on_connection_close(proton::connection& c) override {
if (!fail_msg_.empty())
FAIL(fail_msg_);
}
virtual void first_idle() = 0;
protected:
proton::internal::pn_unique_ptr<server_connection_handler> srv_;
proton::container container_;
proton::receiver receiver_;
proton::work next_idle_;
std::string fail_msg_;
int received_;
int initial_credit_;
};
class basic_credit_tester : public tester {
public:
basic_credit_tester() { initial_credit_ = 3; }
void first_idle() override {
if (received_ != 3) {
fail(FAIL_MSG("messages received should be 3 not "), received_);
return;
}
next_idle_ = proton::make_work(&basic_credit_tester::second_idle, this);
server_available(2);
receiver_.add_credit(3);
}
void second_idle() {
if (received_ != 5) {
fail(FAIL_MSG("messages received should be 5 not "), received_);
return;
}
next_idle_ = proton::make_work(&basic_credit_tester::third_idle, this);
server_available(10);
receiver_.add_credit(1);
}
void third_idle() {
if (received_ != 7) {
fail(FAIL_MSG("messages received should be 7 not "), received_);
return;
}
// passed
receiver_.connection().close();
}
};
int test_basic_credit() {
basic_credit_tester().run();
return 0;
}
class drain_credit_tester : public tester {
int drain_finishes_;
public:
drain_credit_tester() : drain_finishes_(0) { initial_credit_ = 10; }
void on_receiver_drain_finish(proton::receiver &r) override {
drain_finishes_++;
}
void first_idle() override {
if (received_ != 10) {
fail(FAIL_MSG("messages received should be 10 not "), received_);
return;
}
next_idle_ = proton::make_work(&drain_credit_tester::second_idle, this);
server_available(10);
receiver_.add_credit(15);
receiver_.drain();
}
void second_idle() {
if (received_ != 20) {
fail(FAIL_MSG("messages received should be 20 not "), received_);
return;
}
if (drain_finishes_ != 1) {
fail(FAIL_MSG("drain finish callbacks should be 1, not: "), drain_finishes_);
return;
}
if (receiver_.credit() != 0) {
fail(FAIL_MSG("credit not returned on drain, remaining: "), receiver_.credit());
return;
}
next_idle_ = proton::make_work(&drain_credit_tester::third_idle, this);
server_available(5);
receiver_.add_credit(10);
}
void third_idle() {
if (received_ != 25) {
fail(FAIL_MSG("messages received should be 20 not "), received_);
return;
}
if (receiver_.credit() != 5) {
fail(FAIL_MSG("incorrect credit after drain, should be 5, not "), receiver_.credit());
return;
}
next_idle_ = proton::make_work(&drain_credit_tester::fourth_idle, this);
server_available(3);
receiver_.add_credit(1);
}
void fourth_idle() {
if (received_ != 28) {
fail(FAIL_MSG("messages received should be 28 not "), received_);
return;
}
if (receiver_.credit() != 3) {
fail(FAIL_MSG("incorrect credit, should be 3, not "), receiver_.credit());
return;
}
next_idle_ = proton::make_work(&drain_credit_tester::fifth_idle, this);
server_available(1);
receiver_.drain();
}
void fifth_idle() {
if (received_ != 29) {
fail(FAIL_MSG("messages received should be 29 not "), received_);
return;
}
if (drain_finishes_ != 2) {
fail(FAIL_MSG("drain finish callbacks should be 2, not: "), drain_finishes_);
return;
}
if (receiver_.credit() != 0) {
fail(FAIL_MSG("second drain credit failed, should be 0, not "), receiver_.credit());
return;
}
// passed
receiver_.connection().close();
}
};
int test_drain_credit() {
drain_credit_tester().run();
return 0;
}
} // namespace
int main(int argc, char** argv) {
int failed = 0;
RUN_ARGV_TEST(failed, test_basic_credit());
RUN_ARGV_TEST(failed, test_drain_credit());
return failed;
}