blob: 40ceca78f69cd13f0491900c05b07e045a9177e2 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
#include "./pn_test_proactor.hpp"
#include "./thread.h"
#include <proton/condition.h>
#include <proton/connection.h>
#include <proton/delivery.h>
#include <proton/event.h>
#include <proton/link.h>
#include <proton/listener.h>
#include <proton/message.h>
#include <proton/netaddr.h>
#include <proton/object.h>
#include <proton/proactor.h>
#include <proton/transport.h>
namespace pn_test {
std::string listening_port(pn_listener_t *l) {
const pn_netaddr_t *na = pn_listener_addr(l);
char port[PN_MAX_ADDR];
pn_netaddr_host_port(na, NULL, 0, port, sizeof(port));
return port;
proactor::proactor(struct handler *h)
: auto_free<pn_proactor_t, pn_proactor_free>(pn_proactor()), handler(h) {}
bool proactor::dispatch(pn_event_t *e) {
void *ctx = NULL;
if (pn_event_listener(e))
ctx = pn_listener_get_context(pn_event_listener(e));
else if (pn_event_connection(e))
ctx = pn_connection_get_context(pn_event_connection(e));
struct handler *h = ctx ? reinterpret_cast<struct handler *>(ctx) : handler;
bool ret = h ? h->dispatch(e) : false;
// pn_test::handler doesn't know about listeners so save listener condition
// here.
if (pn_event_listener(e)) {
return ret;
// RAII for event batches
class auto_batch {
pn_proactor_t *p_;
pn_event_batch_t *b_;
auto_batch(pn_proactor_t *p, pn_event_batch_t *b) : p_(p), b_(b) {}
~auto_batch() {
if (b_) pn_proactor_done(p_, b_);
pn_event_t *next() { return b_ ? pn_event_batch_next(b_) : NULL; }
pn_event_batch_t *get() { return b_; }
pn_event_type_t proactor::run(pn_event_type_t stop) {
// This will hang in the underlying poll if test expectations are never met.
// Not ideal, but easier to debug than race conditions caused by buggy
// test-harness code that attempts to detect the problem and recover early.
// The larger test or CI harness will kill us after some time limit
while (true) {
auto_batch b(*this, pn_proactor_wait(*this));
if (b.get()) {
pn_event_t *e;
while ((e = {
pn_event_type_t et = pn_event_type(e);
if (dispatch(e) || et == stop) return et;
std::pair<int, pn_event_type_t> proactor::flush(pn_event_type_t stop) {
auto_batch b(*this, pn_proactor_get(*this));
int n = 0;
if (b.get()) {
pn_event_t *e;
while ((e = {
pn_event_type_t et = pn_event_type(e);
if (dispatch(e) || et == stop) return std::make_pair(n, et);
return std::make_pair(n, PN_EVENT_NONE);
pn_event_type_t proactor::corun(proactor &other, pn_event_type_t stop) {
// We can't wait() on either proactor as it might be idle.
// We can't give up immediately if both proactors are idle
// something happens on the other, so spin between the two for a limited
// number of attempts that should be large enough if the test is going to
// pass.
int spin_limit = 1000;
do {
std::pair<int, pn_event_type_t> n_et = flush(stop);
if (n_et.second) return n_et.second;
if (n_et.first + other.flush().first == 0) {
// Both idle, sleep and retry in case network traffic is in flight.
} while (spin_limit);
pn_event_type_t proactor::wait_next() {
// pn_proactor_wait() should never return an empty batch, so we shouldn't need
// a loop here. Due to bug
// we need to re-wait if we get an empty batch.
// To reproduce PROTON-1964 remove the loop below and run
// TEST_CASE("proactor_proton_1586") from proactor_test.cpp
// You will pn_proactor_wait() return a non-NULL batch, but the
// first call to pn_event_batch_next() returns a NULL event.
while (true) {
auto_batch b(*this, pn_proactor_wait(*this));
pn_event_t *e =;
if (e) {
return pn_event_type(e);
} // Try again on an empty batch.
pn_listener_t *proactor::listen(const std::string &addr,
struct handler *handler) {
pn_listener_t *l = pn_listener();
pn_listener_set_context(l, handler);
pn_proactor_listen(*this, l, addr.c_str(), 4);
return l;
pn_connection_t *proactor::connect(const std::string &addr, struct handler *h,
pn_connection_t *c) {
if (!c) c = pn_connection();
if (h) pn_connection_set_context(c, h);
pn_proactor_connect2(*this, c, NULL, addr.c_str());
return c;
} // namespace pn_test