blob: 4f2f88257d8d36c21d7b56b9a733f45de5df6570 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <proton/raw_connection.h>
#include "proactor/raw_connection-internal.h"
#include "pn_test.hpp"
#ifdef _WIN32
#include <errno.h>
#else
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <arpa/inet.h>
#endif
#include <string.h>
// Raw connection tests driven by a proactor.
// These tests often cheat by directly calling API functions that
// would normally be called in an event callback for thread safety
// reasons. This can usually work because the proactors and API calls
// are all called from a single thread so there is no contention, but
// the raw connection may require a wake so that the state machine and
// polling mask can be updated. Note that wakes stop working around
// the time the raw connection thinks it is about to be fully closed,
// so close operations may need to be done in event callbacks to
// avoid wake uncertainty.
#include "../src/proactor/proactor-internal.h"
#include "./pn_test_proactor.hpp"
#include <proton/event.h>
#include <proton/listener.h>
using namespace pn_test;
namespace {
class common_handler : public handler {
bool close_on_wake_;
bool write_close_on_wake_;
bool stop_on_wake_;
bool abort_on_wake_;
int closed_read_count_;
int closed_write_count_;
int disconnect_count_;
bool disconnect_error_;
pn_raw_connection_t *last_server_;
pn_raw_buffer_t write_buff_;
public:
explicit common_handler() : close_on_wake_(false), write_close_on_wake_(0), stop_on_wake_(false),
abort_on_wake_(false), closed_read_count_(0), closed_write_count_(0),
disconnect_count_(0), disconnect_error_(false),
last_server_(0), write_buff_({0}) {}
void set_close_on_wake(bool b) { close_on_wake_ = b; }
void set_write_close_on_wake(bool b) { write_close_on_wake_ = b; }
void set_stop_on_wake(bool b) { stop_on_wake_ = b; }
void set_abort_on_wake(bool b) { abort_on_wake_ = b; }
int closed_read_count() { return closed_read_count_; }
int closed_write_count() { return closed_write_count_; }
int disconnect_count() { return disconnect_count_; }
bool disconnect_error() { return disconnect_error_; }
pn_raw_connection_t *last_server() { return last_server_; }
void set_write_on_wake(pn_raw_buffer_t *b) { write_buff_ = *b; }
bool handle(pn_event_t *e) override {
switch (pn_event_type(e)) {
/* Always stop on these noteworthy events */
case PN_LISTENER_OPEN:
case PN_LISTENER_CLOSE:
case PN_PROACTOR_INACTIVE:
return true;
case PN_LISTENER_ACCEPT: {
listener = pn_event_listener(e);
pn_raw_connection_t *rc = pn_raw_connection();
pn_listener_raw_accept(listener, rc);
last_server_ = rc;
return false;
} break;
case PN_RAW_CONNECTION_WAKE: {
if (abort_on_wake_) abort();
pn_raw_connection_t *rc = pn_event_raw_connection(e);
if (write_buff_.size) {
// Add the buff for writing before any close operation.
CHECK(pn_raw_connection_write_buffers(rc, &write_buff_, 1) == 1);
write_buff_.size = 0;
}
if (write_close_on_wake_)
pn_raw_connection_write_close(rc);
if (close_on_wake_)
pn_raw_connection_close(rc);
return stop_on_wake_;
} break;
case PN_RAW_CONNECTION_DISCONNECTED: {
disconnect_count_++;
pn_raw_connection_t *rc = pn_event_raw_connection(e);
pn_condition_t *cond = pn_raw_connection_condition(rc);
if (disconnect_count_ == 1 && pn_condition_is_set(cond)) {
const char *nm = pn_condition_get_name(cond);
const char *ds = pn_condition_get_description(cond);
if (nm && strlen(nm) > 0 && ds && strlen(ds) > 0)
disconnect_error_ = true;
}
return false;
} break;
case PN_RAW_CONNECTION_CLOSED_READ:
closed_read_count_++;
return false;
case PN_RAW_CONNECTION_CLOSED_WRITE:
closed_write_count_++;
return false;
default:
return false;
}
}
};
static const size_t buffsz = 128;
// Basic test consisting of
// client is an OS socket.
// server is a pn_raw_connection_t with one shared read/write buffer.
// pn_listener_t used to put the two together.
struct basic_test {
common_handler h;
proactor p;
pn_listener_t *l;
int sockfd; // client
pn_raw_connection_t *server_rc;
char buff[buffsz];
bool buff_in_use;
basic_test() : h(), p(&h) {
l = p.listen();
REQUIRE_RUN(p, PN_LISTENER_OPEN);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
REQUIRE(sockfd >= 0);
struct sockaddr_in laddr;
memset(&laddr, 0, sizeof(laddr));
laddr.sin_family = AF_INET;
laddr.sin_port = htons(atoi(pn_test::listening_port(l).c_str()));
laddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
connect(sockfd, (const struct sockaddr*) &laddr, sizeof(laddr));
REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
server_rc = h.last_server();
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0};
CHECK(pn_raw_connection_give_read_buffers(server_rc, &rb, 1) == 1);
buff_in_use = true;
pn_raw_connection_wake(server_rc);
REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
CHECK(pn_proactor_get(p) == NULL); /* idle */
}
~basic_test() {
pn_listener_close(l);
REQUIRE_RUN(p, PN_LISTENER_CLOSE);
REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
if (sockfd >= 0) close(sockfd);
bool sanity = h.closed_read_count() == 1 && h.closed_write_count() == 1 &&
h.disconnect_count() == 1;
REQUIRE(sanity == true);
}
void socket_write_close() {
if (sockfd < 0) return;
shutdown(sockfd, SHUT_WR);
}
void socket_graceful_close() {
if (sockfd < 0) return;
close(sockfd);
sockfd = -1;
}
bool socket_hard_close() {
// RST (not FIN), hard/abort close
if (sockfd < 0) return false;
struct linger lngr;
lngr.l_onoff = 1;
lngr.l_linger = 0;
if (sockfd < 0) return false;
if (setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &lngr, sizeof(lngr)) == 0) {
if (close(sockfd) == 0) {
sockfd = -1;
return true;
}
}
return false;
}
void drain_read_buffer() {
assert(buff_in_use);
send(sockfd, "FOO", 3, 0);
REQUIRE_RUN(p, PN_RAW_CONNECTION_READ);
pn_raw_buffer_t rb = {0};
REQUIRE(pn_raw_connection_take_read_buffers(server_rc, &rb, 1) == 1);
REQUIRE(rb.size == 3);
buff_in_use = false;
}
void give_read_buffer() {
assert(!buff_in_use);
pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0};
CHECK(pn_raw_connection_give_read_buffers(server_rc, &rb, 1) == 1);
buff_in_use = true;
}
void write_next_wake(const char *m) {
assert(!buff_in_use);
pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0};
size_t l = strlen(m);
assert(l < buffsz);
strcpy(rb.bytes, m);
rb.size = l;
h.set_write_on_wake(&rb);
}
int drain_events() {
int ec = 0;
pn_event_batch_t *batch = NULL;
while ((batch = pn_proactor_get(p.get()))) {
pn_event_t *e;
while ((e = pn_event_batch_next(batch))) {
ec++;
h.dispatch(e);
}
pn_proactor_done(p.get(), batch);
}
return ec;
}
};
} // namespace
// Test waking up a connection that is idle
TEST_CASE("proactor_raw_connection_wake") {
common_handler h;
proactor p(&h);
pn_listener_t *l = p.listen();
REQUIRE_RUN(p, PN_LISTENER_OPEN);
pn_raw_connection_t *rc = pn_raw_connection();
std::string addr = ":" + pn_test::listening_port(l);
pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str());
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
CHECK(pn_proactor_get(p) == NULL); /* idle */
pn_raw_connection_wake(rc);
REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
CHECK(pn_proactor_get(p) == NULL); /* idle */
h.set_close_on_wake(true);
pn_raw_connection_wake(rc);
REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED);
pn_raw_connection_wake(h.last_server());
REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED);
pn_listener_close(l);
REQUIRE_RUN(p, PN_LISTENER_CLOSE);
REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
}
// Normal close
TEST_CASE("raw_connection_graceful_close") {
struct basic_test x;
x.socket_graceful_close();
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
x.h.set_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
REQUIRE(x.h.disconnect_error() == false);
}
// HARD close
TEST_CASE("raw_connection_hardclose") {
struct basic_test x;
x.socket_hard_close();
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
REQUIRE(x.h.disconnect_error() == true);
}
// HARD close, no read buffer
TEST_CASE("raw_connection_hardclose_nrb") {
struct basic_test x;
// Drain read buffer without replenishing
x.drain_read_buffer();
x.drain_events();
CHECK(pn_proactor_get(x.p) == NULL); /* idle */
x.socket_hard_close();
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
REQUIRE(x.h.disconnect_error() == true);
}
// HARD close after read close
TEST_CASE("raw_connection_readclose_then_hardclose") {
struct basic_test x;
x.socket_write_close();
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
x.drain_events();
REQUIRE(x.h.disconnect_count() == 0);
x.socket_hard_close();
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
REQUIRE(x.h.disconnect_error() == true);
}
// HARD close after read close, no read buffer
TEST_CASE("raw_connection_readclose_then_hardclose_nrb") {
struct basic_test x;
// Drain read buffer without replenishing
x.drain_read_buffer();
x.drain_events();
CHECK(pn_proactor_get(x.p) == NULL); /* idle */
// Shut of read side should be ignored with no read buffer.
x.socket_write_close();
CHECK(pn_proactor_get(x.p) == NULL); /* still idle */
// Confirm raw connection shuts down, even with no read buffer
x.socket_hard_close();
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
REQUIRE(x.h.disconnect_error() == true);
}
// Normal close on socket delays CLOSED_READ event until application makes read buffers available
TEST_CASE("raw_connection_delay_readclose") {
struct basic_test x;
x.drain_read_buffer();
x.socket_graceful_close();
x.drain_events();
REQUIRE(x.h.closed_read_count() == 0);
x.give_read_buffer();
pn_raw_connection_wake(x.server_rc);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ);
REQUIRE(x.h.closed_read_count() == 1);
x.h.set_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
}
TEST_CASE("raw_connection_rst_on_write") {
struct basic_test x;
x.drain_read_buffer();
// Send some data
x.write_next_wake("foo");
pn_raw_connection_wake(x.server_rc);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WRITTEN);
pn_raw_buffer_t rb = {0};
CHECK(pn_raw_connection_take_written_buffers(x.server_rc, &rb, 1) == 1);
char b[buffsz];
REQUIRE(recv(x.sockfd, b, buffsz, 0) == 3);
// Repeat, with closed peer socket.
x.socket_graceful_close();
x.write_next_wake("bar");
pn_raw_connection_wake(x.server_rc);
// Write or subsequent poll should fail EPIPE
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
REQUIRE(x.h.disconnect_error() == true);
}
// One sided close. No cooperation from peer.
TEST_CASE("raw_connection_full_close") {
struct basic_test x;
x.h.set_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
// No send/recv/close/shutdown activity from peer socket.
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
}
// As above. No read buffer.
TEST_CASE("raw_connection_full_close_nrb") {
struct basic_test x;
x.drain_read_buffer();
x.h.set_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
// No send/recv/close/shutdown activity from peer socket.
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
}
// One sided close, pending write.
TEST_CASE("raw_connection_close_wdrain") {
struct basic_test x;
x.drain_read_buffer();
// write and then close on next wake
x.write_next_wake("fubar");
x.h.set_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
// No send/recv/close/shutdown activity from peer socket.
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
// Now check fubar made it
char b[buffsz];
REQUIRE(recv(x.sockfd, b, buffsz, 0) == 5);
REQUIRE(strncmp("fubar", b, 5) == 0);
}
// One sided write_close then close.
TEST_CASE("raw_connection_wclose_full_close") {
struct basic_test x;
x.h.set_write_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_WRITE);
x.drain_events();
REQUIRE(x.h.closed_read_count() == 0);
x.h.set_write_close_on_wake(false);
x.h.set_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
// No send/recv/close/shutdown activity from peer socket.
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
}
TEST_CASE("raw_connection_wclose_full_close_nrb") {
struct basic_test x;
x.drain_read_buffer();
x.h.set_write_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_WRITE);
x.drain_events();
REQUIRE(x.h.closed_read_count() == 0);
x.h.set_write_close_on_wake(false);
x.h.set_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
// No send/recv/close/shutdown activity from peer socket.
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
}
TEST_CASE("raw_connection_wclose_full_close_wdrain") {
struct basic_test x;
x.drain_read_buffer();
// write and then wclose then close on next wake
x.write_next_wake("bar");
x.h.set_write_close_on_wake(true);
x.h.set_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE);
// No send/recv/close/shutdown activity from peer socket.
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
// Now check bar made it
char b[buffsz];
REQUIRE(recv(x.sockfd, b, buffsz, 0) == 3);
REQUIRE(strncmp("bar", b, 3) == 0);
}
// Half closes each direction. Raw connection then peer.
TEST_CASE("raw_connection_wclose_then_rclose") {
struct basic_test x;
x.h.set_write_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
x.drain_events();
REQUIRE(x.h.closed_write_count() == 1);
REQUIRE(x.h.closed_read_count() == 0);
char b[buffsz];
REQUIRE(recv(x.sockfd, b, buffsz, 0) == 0); // EOF
x.socket_write_close();
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
REQUIRE(x.h.closed_read_count() == 1);
}
// As above but peer first then raw connection.
TEST_CASE("raw_connection_rclose_then_wclose") {
struct basic_test x;
x.socket_write_close();
x.drain_events();
REQUIRE(x.h.closed_read_count() == 1);
REQUIRE(x.h.closed_write_count() == 0);
x.h.set_write_close_on_wake(true);
pn_raw_connection_wake(x.server_rc);
REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED);
char b[buffsz];
REQUIRE(recv(x.sockfd, b, buffsz, 0) == 0); // EOF
REQUIRE(x.h.closed_write_count() == 1);
}