| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "../src/proactor/proactor-internal.h" |
| #include "./pn_test_proactor.hpp" |
| #include "./test_config.h" |
| |
| #include <proton/condition.h> |
| #include <proton/connection.h> |
| #include <proton/delivery.h> |
| #include <proton/event.h> |
| #include <proton/link.h> |
| #include <proton/listener.h> |
| #include <proton/netaddr.h> |
| #include <proton/proactor.h> |
| #include <proton/session.h> |
| #include <proton/ssl.h> |
| #include <proton/transport.h> |
| |
| #include <string.h> |
| |
| #include <iostream> |
| |
| using namespace pn_test; |
| using Catch::Matchers::Contains; |
| using Catch::Matchers::Equals; |
| |
| /* Test that interrupt and timeout events cause pn_proactor_wait() to return. */ |
| TEST_CASE("proactor_interrupt_timeout") { |
| proactor p; |
| |
| CHECK(pn_proactor_get(p) == NULL); /* idle */ |
| pn_proactor_interrupt(p); |
| CHECK(PN_PROACTOR_INTERRUPT == p.wait_next()); |
| CHECK(pn_proactor_get(p) == NULL); /* idle */ |
| |
| /* Set an immediate timeout */ |
| pn_proactor_set_timeout(p, 0); |
| CHECK(PN_PROACTOR_TIMEOUT == p.wait_next()); |
| CHECK(PN_PROACTOR_INACTIVE == p.wait_next()); |
| |
| /* Set a (very short) timeout */ |
| pn_proactor_set_timeout(p, 1); |
| CHECK(PN_PROACTOR_TIMEOUT == p.wait_next()); |
| CHECK(PN_PROACTOR_INACTIVE == p.wait_next()); |
| |
| /* Set and cancel a timeout, make sure we don't get the timeout event */ |
| pn_proactor_set_timeout(p, 10000000); |
| pn_proactor_cancel_timeout(p); |
| CHECK(PN_PROACTOR_INACTIVE == p.wait_next()); |
| CHECK(pn_proactor_get(p) == NULL); /* idle */ |
| } |
| |
| namespace { |
| |
| class common_handler : public handler { |
| handler *accept_; // Handler for accepted connections |
| |
| public: |
| common_handler(handler *accept = 0) : accept_(accept) {} |
| |
| bool handle(pn_event_t *e) CATCH_OVERRIDE { |
| switch (pn_event_type(e)) { |
| /* Always stop on these noteworthy events */ |
| case PN_TRANSPORT_ERROR: |
| case PN_LISTENER_OPEN: |
| case PN_LISTENER_CLOSE: |
| case PN_PROACTOR_INACTIVE: |
| return true; |
| |
| case PN_LISTENER_ACCEPT: |
| listener = pn_event_listener(e); |
| connection = pn_connection(); |
| if (accept_) pn_connection_set_context(connection, accept_); |
| pn_listener_accept2(listener, connection, NULL); |
| return false; |
| |
| // Return remote opens |
| case PN_CONNECTION_REMOTE_OPEN: |
| pn_connection_open(pn_event_connection(e)); |
| return false; |
| case PN_SESSION_REMOTE_OPEN: |
| pn_session_open(pn_event_session(e)); |
| return false; |
| case PN_LINK_REMOTE_OPEN: |
| pn_link_open(pn_event_link(e)); |
| return false; |
| |
| // Return remote closes |
| case PN_CONNECTION_REMOTE_CLOSE: |
| pn_connection_close(pn_event_connection(e)); |
| return false; |
| case PN_SESSION_REMOTE_CLOSE: |
| pn_session_close(pn_event_session(e)); |
| return false; |
| case PN_LINK_REMOTE_CLOSE: |
| pn_link_close(pn_event_link(e)); |
| return false; |
| |
| default: |
| return false; |
| } |
| } |
| }; |
| |
| /* close a connection when it is remote open */ |
| struct close_on_open_handler : public common_handler { |
| bool handle(pn_event_t *e) CATCH_OVERRIDE { |
| switch (pn_event_type(e)) { |
| case PN_CONNECTION_REMOTE_OPEN: |
| pn_connection_close(pn_event_connection(e)); |
| return false; |
| default: |
| return common_handler::handle(e); |
| } |
| } |
| }; |
| |
| } // namespace |
| |
| /* Test simple client/server connection that opens and closes */ |
| TEST_CASE("proactor_connect") { |
| close_on_open_handler h; |
| proactor p(&h); |
| /* Connect and wait for close at both ends */ |
| pn_listener_t *l = p.listen(":0", &h); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| p.connect(l); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| } |
| |
| namespace { |
| /* Return on connection open, close and return on wake */ |
| struct close_on_wake_handler : public common_handler { |
| bool handle(pn_event_t *e) CATCH_OVERRIDE { |
| switch (pn_event_type(e)) { |
| case PN_CONNECTION_WAKE: |
| pn_connection_close(pn_event_connection(e)); |
| return true; |
| default: |
| return common_handler::handle(e); |
| } |
| } |
| }; |
| } // namespace |
| |
| // Test waking up a connection that is idle |
| TEST_CASE("proactor_connection_wake") { |
| common_handler h; |
| proactor p(&h); |
| close_on_wake_handler wh; |
| pn_listener_t *l = p.listen(); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| pn_connection_t *c = p.connect(l, &wh); |
| pn_incref(c); /* Keep a reference for wake() after free */ |
| |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| while (p.flush().first != 0); |
| pn_connection_wake(c); |
| REQUIRE_RUN(p, PN_CONNECTION_WAKE); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); /* Both ends */ |
| |
| /* Verify we don't get a wake after close even if they happen together */ |
| pn_connection_t *c2 = p.connect(l, &wh); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| pn_connection_wake(c2); |
| pn_proactor_disconnect(p, NULL); |
| pn_connection_wake(c2); |
| |
| for (pn_event_type_t et = p.run(); et != PN_PROACTOR_INACTIVE; et = p.run()) { |
| switch (et) { |
| case PN_TRANSPORT_ERROR: |
| case PN_TRANSPORT_CLOSED: |
| case PN_LISTENER_CLOSE: |
| break; // expected |
| default: |
| FAIL("Unexpected event type: " << et); |
| } |
| } |
| // The pn_connection_t is still valid so wake is legal but a no-op. |
| // Make sure there's no memory error. |
| pn_connection_wake(c); |
| pn_decref(c); |
| } |
| |
| namespace { |
| struct abort_handler : public common_handler { |
| bool handle(pn_event_t *e) { |
| switch (pn_event_type(e)) { |
| case PN_CONNECTION_REMOTE_OPEN: |
| /* Close the transport - abruptly closes the socket */ |
| pn_transport_close_tail(pn_connection_transport(pn_event_connection(e))); |
| pn_transport_close_head(pn_connection_transport(pn_event_connection(e))); |
| return false; |
| |
| default: |
| return common_handler::handle(e); |
| } |
| } |
| }; |
| } // namespace |
| |
| /* Verify that pn_transport_close_head/tail aborts a connection without an AMQP |
| * protocol close */ |
| TEST_CASE("proactor_abort") { |
| abort_handler sh; // Handle listener and server side of connection |
| proactor p(&sh); |
| pn_listener_t *l = p.listen(); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| common_handler ch; // Handle client side of connection |
| pn_connection_t *c = p.connect(l, &ch); |
| |
| /* server transport closes */ |
| REQUIRE_RUN(p, PN_TRANSPORT_ERROR); |
| CHECK_THAT(*sh.last_condition, |
| cond_matches("amqp:connection:framing-error", "abort")); |
| |
| /* client transport closes */ |
| REQUIRE_RUN(p, PN_TRANSPORT_ERROR); |
| CHECK_THAT(*ch.last_condition, |
| cond_matches("amqp:connection:framing-error", "abort")); |
| pn_listener_close(l); |
| REQUIRE_RUN(p, PN_LISTENER_CLOSE); |
| REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); |
| |
| /* Verify expected event sequences, no unexpected events */ |
| CHECK_THAT(ETYPES(PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN, |
| PN_CONNECTION_BOUND, PN_TRANSPORT_TAIL_CLOSED, |
| PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, |
| PN_TRANSPORT_CLOSED), |
| Equals(ch.log_clear())); |
| CHECK_THAT(ETYPES(PN_LISTENER_OPEN, PN_LISTENER_ACCEPT, PN_CONNECTION_INIT, |
| PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN, |
| PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, |
| PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED, |
| PN_LISTENER_CLOSE, PN_PROACTOR_INACTIVE), |
| Equals(sh.log_clear())); |
| } |
| |
| /* Test that INACTIVE event is generated when last connections/listeners closes. |
| */ |
| TEST_CASE("proactor_inactive") { |
| close_on_wake_handler h; |
| proactor p(&h); |
| |
| /* Listen, connect, disconnect */ |
| pn_listener_t *l = p.listen(); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| pn_connection_t *c = p.connect(l, &h); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| pn_connection_wake(c); |
| REQUIRE_RUN(p, PN_CONNECTION_WAKE); |
| /* Expect TRANSPORT_CLOSED both ends */ |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| pn_listener_close(l); |
| REQUIRE_RUN(p, PN_LISTENER_CLOSE); |
| /* Immediate timer generates INACTIVE (no connections) */ |
| pn_proactor_set_timeout(p, 0); |
| REQUIRE_RUN(p, PN_PROACTOR_TIMEOUT); |
| REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); |
| |
| /* Connect, set-timer, disconnect */ |
| l = p.listen(); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| c = p.connect(l, &h); |
| pn_proactor_set_timeout(p, 1000000); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| pn_connection_wake(c); |
| REQUIRE_RUN(p, PN_CONNECTION_WAKE); |
| /* Expect TRANSPORT_CLOSED from client and server */ |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| pn_listener_close(l); |
| REQUIRE_RUN(p, PN_LISTENER_CLOSE); |
| /* No INACTIVE till timer is cancelled */ |
| CHECK(!pn_proactor_get(p)); // idle |
| pn_proactor_cancel_timeout(p); |
| REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); |
| } |
| |
| /* Tests for error handling */ |
| TEST_CASE("proactor_errors") { |
| close_on_wake_handler h; |
| proactor p(&h); |
| /* Invalid connect/listen service name */ |
| p.connect("127.0.0.1:xxx"); |
| REQUIRE_RUN(p, PN_TRANSPORT_ERROR); |
| CHECK_THAT(*h.last_condition, cond_matches("proton:io", "xxx")); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); |
| |
| pn_listener_t *l = pn_listener(); |
| pn_proactor_listen(p, l, "127.0.0.1:xxx", 1); |
| REQUIRE_RUN(p, PN_LISTENER_CLOSE); |
| CHECK_THAT(*h.last_condition, cond_matches("proton:io", "xxx")); |
| REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); |
| |
| /* Invalid connect/listen host name */ |
| p.connect("nosuch.example.com:"); |
| REQUIRE_RUN(p, PN_TRANSPORT_ERROR); |
| CHECK_THAT(*h.last_condition, cond_matches("proton:io", "nosuch")); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); |
| |
| pn_proactor_listen(p, pn_listener(), "nosuch.example.com:", 1); |
| REQUIRE_RUN(p, PN_LISTENER_CLOSE); |
| CHECK_THAT(*h.last_condition, cond_matches("proton:io", "nosuch")); |
| REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); |
| |
| /* Listen on a port already in use */ |
| l = p.listen(); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| std::string laddr = ":" + listening_port(l); |
| p.listen(laddr); |
| REQUIRE_RUN(p, PN_LISTENER_CLOSE); |
| CHECK_THAT(*h.last_condition, cond_matches("proton:io")); |
| |
| pn_listener_close(l); |
| REQUIRE_RUN(p, PN_LISTENER_CLOSE); |
| REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); |
| |
| /* Connect with no listener */ |
| p.connect(laddr); |
| REQUIRE_RUN(p, PN_TRANSPORT_ERROR); |
| CHECK_THAT(*h.last_condition, cond_matches("proton:io", "refused")); |
| } |
| |
| namespace { |
| /* Closing the connection during PN_TRANSPORT_ERROR should be a no-op |
| * Regression test for: https://issues.apache.org/jira/browse/PROTON-1586 |
| */ |
| struct transport_close_connection_handler : public common_handler { |
| bool handle(pn_event_t *e) { |
| switch (pn_event_type(e)) { |
| case PN_TRANSPORT_ERROR: |
| pn_connection_close(pn_event_connection(e)); |
| break; |
| default: |
| return common_handler::handle(e); |
| } |
| return PN_EVENT_NONE; |
| } |
| }; |
| } // namespace |
| |
| /* Closing the connection during PN_TRANSPORT_ERROR due to connection failure |
| * should be a no-op. Regression test for: |
| * https://issues.apache.org/jira/browse/PROTON-1586 |
| */ |
| TEST_CASE("proactor_proton_1586") { |
| transport_close_connection_handler h; |
| proactor p(&h); |
| p.connect(":yyy"); |
| REQUIRE_RUN(p, PN_TRANSPORT_ERROR); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| CHECK_THAT(*h.last_condition, cond_matches("proton:io", ":yyy")); |
| |
| // No events expected after PN_TRANSPORT_CLOSED, proactor is inactive. |
| CHECK(PN_PROACTOR_INACTIVE == p.wait_next()); |
| } |
| |
| /* Test that we can control listen/select on ipv6/v4 and listen on both by |
| * default */ |
| TEST_CASE("proactor_ipv4_ipv6") { |
| close_on_open_handler h; |
| proactor p(&h); |
| |
| /* Listen on all interfaces for IPv4 only. */ |
| pn_listener_t *l4 = p.listen("0.0.0.0:0"); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| /* Empty address listens on both IPv4 and IPv6 on all interfaces */ |
| pn_listener_t *l = p.listen(); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| |
| #define EXPECT_CONNECT(LISTENER, HOST) \ |
| do { \ |
| p.connect(std::string(HOST) + ":" + listening_port(LISTENER)); \ |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); \ |
| CHECK_THAT(*h.last_condition, cond_empty()); \ |
| } while (0) |
| |
| EXPECT_CONNECT(l4, "127.0.0.1"); /* v4->v4 */ |
| EXPECT_CONNECT(l4, ""); /* local->v4*/ |
| |
| EXPECT_CONNECT(l, "127.0.0.1"); /* v4->all */ |
| EXPECT_CONNECT(l, ""); /* local->all */ |
| |
| /* Listen on ipv6 loopback, if it fails skip ipv6 tests. |
| |
| NOTE: Don't use the unspecified address "::" here - ipv6-disabled platforms |
| may allow listening on "::" without complaining. However they won't have a |
| local ipv6 loopback configured, so "::1" will force an error. |
| */ |
| pn_listener_t *l6 = pn_listener(); |
| pn_proactor_listen(p, l6, "::1:0", 4); |
| pn_event_type_t e = p.run(); |
| if (e == PN_LISTENER_OPEN && !pn_condition_is_set(h.last_condition)) { |
| EXPECT_CONNECT(l6, "::1"); /* v6->v6 */ |
| EXPECT_CONNECT(l6, ""); /* local->v6 */ |
| EXPECT_CONNECT(l, "::1"); /* v6->all */ |
| pn_listener_close(l6); |
| } else { |
| WARN("skip IPv6 tests: %s %s" << e << *h.last_condition); |
| } |
| |
| pn_listener_close(l); |
| pn_listener_close(l4); |
| } |
| |
| /* Make sure we clean up released connections and open sockets |
| * correctly */ |
| TEST_CASE("proactor_release_free") { |
| common_handler h; |
| proactor p(&h); |
| |
| pn_listener_t *l = p.listen(); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| /* leave one connection to the proactor */ |
| pn_connection_t *c = p.connect(l); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| |
| { |
| /* release c1 and free immediately */ |
| auto_free<pn_connection_t, pn_connection_free> c1(p.connect(l)); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| pn_proactor_release_connection(c1); |
| } |
| REQUIRE_RUN(p, PN_TRANSPORT_ERROR); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| |
| /* release c2 and but don't free till after proactor free */ |
| auto_free<pn_connection_t, pn_connection_free> c2(p.connect(l)); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| pn_proactor_release_connection(c2); |
| REQUIRE_RUN(p, PN_TRANSPORT_ERROR); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| |
| // OK to free a listener/connection that was never used by a |
| // proactor. |
| pn_listener_free(pn_listener()); |
| pn_connection_free(pn_connection()); |
| } |
| |
| #define SSL_FILE(NAME) CMAKE_CURRENT_SOURCE_DIR "/ssl-certs/" NAME |
| #define SSL_PW "tserverpw" |
| /* Windows vs. OpenSSL certificates */ |
| #if defined(_WIN32) |
| #define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.p12") |
| #define SET_CREDENTIALS(DOMAIN, NAME) \ |
| pn_ssl_domain_set_credentials(DOMAIN, SSL_FILE(NAME "-full.p12"), "", SSL_PW) |
| #else |
| #define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.pem") |
| #define SET_CREDENTIALS(DOMAIN, NAME) \ |
| pn_ssl_domain_set_credentials(DOMAIN, CERTIFICATE(NAME), \ |
| SSL_FILE(NAME "-private-key.pem"), SSL_PW) |
| #endif |
| |
| namespace { |
| |
| struct ssl_handler : public common_handler { |
| auto_free<pn_ssl_domain_t, pn_ssl_domain_free> ssl_domain; |
| |
| ssl_handler(pn_ssl_domain_t *d) : ssl_domain(d) {} |
| |
| bool handle(pn_event_t *e) { |
| switch (pn_event_type(e)) { |
| |
| case PN_CONNECTION_BOUND: |
| CHECK(0 == pn_ssl_init(pn_ssl(pn_event_transport(e)), ssl_domain, NULL)); |
| return false; |
| |
| case PN_CONNECTION_REMOTE_OPEN: { |
| pn_ssl_t *ssl = pn_ssl(pn_event_transport(e)); |
| CHECK(ssl); |
| char protocol[256]; |
| CHECK(pn_ssl_get_protocol_name(ssl, protocol, sizeof(protocol))); |
| CHECK_THAT(protocol, Contains("TLS")); |
| pn_connection_t *c = pn_event_connection(e); |
| if (pn_connection_state(c) & PN_LOCAL_ACTIVE) { |
| pn_connection_close(c); // Client closes on completion. |
| } else { |
| pn_connection_open(c); // Server returns the OPEN |
| } |
| return true; |
| } |
| default: |
| return common_handler::handle(e); |
| } |
| } |
| }; |
| |
| } // namespace |
| |
| /* Test various SSL connections between proactors*/ |
| TEST_CASE("proactor_ssl") { |
| if (!pn_ssl_present()) { |
| WARN("Skip SSL tests, not available"); |
| return; |
| } |
| |
| ssl_handler client(pn_ssl_domain(PN_SSL_MODE_CLIENT)); |
| ssl_handler server(pn_ssl_domain(PN_SSL_MODE_SERVER)); |
| CHECK(0 == SET_CREDENTIALS(server.ssl_domain, "tserver")); |
| proactor p; |
| common_handler listener(&server); // Use server for accepted connections |
| pn_listener_t *l = p.listen(":0", &listener); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| |
| /* Basic SSL connection */ |
| p.connect(l, &client); |
| /* Open ok at both ends */ |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| CHECK_THAT(*server.last_condition, cond_empty()); |
| CHECK_THAT(*client.last_condition, cond_empty()); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| |
| /* Verify peer with good hostname */ |
| pn_ssl_domain_t *cd = client.ssl_domain; |
| REQUIRE(0 == pn_ssl_domain_set_trusted_ca_db(cd, CERTIFICATE("tserver"))); |
| REQUIRE(0 == pn_ssl_domain_set_peer_authentication( |
| cd, PN_SSL_VERIFY_PEER_NAME, NULL)); |
| pn_connection_t *c = pn_connection(); |
| pn_connection_set_hostname(c, "test_server"); |
| p.connect(l, &client, c); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| CHECK_THAT(*server.last_condition, cond_empty()); |
| CHECK_THAT(*client.last_condition, cond_empty()); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); |
| |
| /* Verify peer with bad hostname */ |
| c = pn_connection(); |
| pn_connection_set_hostname(c, "wrongname"); |
| p.connect(l, &client, c); |
| REQUIRE_RUN(p, PN_TRANSPORT_ERROR); |
| CHECK_THAT(*client.last_condition, |
| cond_matches("amqp:connection:framing-error", "SSL")); |
| } |
| |
| TEST_CASE("proactor_addr") { |
| /* Test the address formatter */ |
| CHECK(1 == pn_proactor_addr(NULL, 0, "", "")); |
| CHECK(7 == pn_proactor_addr(NULL, 0, "foo", "bar")); |
| |
| char addr[PN_MAX_ADDR]; |
| pn_proactor_addr(addr, sizeof(addr), "foo", "bar"); |
| CHECK_THAT("foo:bar", Equals(addr)); |
| pn_proactor_addr(addr, sizeof(addr), "foo", ""); |
| CHECK_THAT("foo:", Equals(addr)); |
| pn_proactor_addr(addr, sizeof(addr), "foo", NULL); |
| CHECK_THAT("foo:", Equals(addr)); |
| pn_proactor_addr(addr, sizeof(addr), "", "bar"); |
| CHECK_THAT(":bar", Equals(addr)); |
| pn_proactor_addr(addr, sizeof(addr), NULL, "bar"); |
| CHECK_THAT(":bar", Equals(addr)); |
| pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", "5"); |
| CHECK_THAT("1:2:3:4:5", Equals(addr)); |
| pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", ""); |
| CHECK_THAT("1:2:3:4:", Equals(addr)); |
| pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", NULL); |
| CHECK_THAT("1:2:3:4:", Equals(addr)); |
| } |
| |
| /* Test pn_proactor_addr functions */ |
| |
| TEST_CASE("proactor_netaddr") { |
| common_handler h; |
| proactor p(&h); |
| /* Use IPv4 to get consistent results all platforms */ |
| pn_listener_t *l = p.listen("127.0.0.1:0"); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| pn_connection_t *c = p.connect(l); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); |
| |
| // client remote, client local, server remote and server |
| // local address strings |
| char cr[1024], cl[1024], sr[1024], sl[1024]; |
| |
| pn_transport_t *ct = pn_connection_transport(c); |
| const pn_netaddr_t *na = pn_transport_remote_addr(ct); |
| pn_netaddr_str(na, cr, sizeof(cr)); |
| CHECK_THAT(cr, Contains(listening_port(l))); |
| |
| pn_connection_t *s = h.connection; /* server side of the connection */ |
| |
| pn_transport_t *st = pn_connection_transport(s); |
| pn_netaddr_str(pn_transport_local_addr(st), sl, sizeof(sl)); |
| CHECK_THAT(cr, Equals(sl)); /* client remote == server local */ |
| |
| pn_netaddr_str(pn_transport_local_addr(ct), cl, sizeof(cl)); |
| pn_netaddr_str(pn_transport_remote_addr(st), sr, sizeof(sr)); |
| CHECK_THAT(cl, Equals(sr)); /* client local == server remote */ |
| |
| char host[PN_MAX_ADDR] = ""; |
| char serv[PN_MAX_ADDR] = ""; |
| CHECK(0 == pn_netaddr_host_port(na, host, sizeof(host), serv, sizeof(serv))); |
| CHECK_THAT("127.0.0.1", Equals(host)); |
| CHECK(listening_port(l) == serv); |
| |
| /* Make sure you can use NULL, 0 to get length of address |
| * string without a crash */ |
| size_t len = pn_netaddr_str(pn_transport_local_addr(ct), NULL, 0); |
| CHECK(strlen(cl) == len); |
| } |
| |
| TEST_CASE("proactor_parse_addr") { |
| char buf[1024]; |
| const char *host, *port; |
| |
| CHECK(0 == pni_parse_addr("foo:bar", buf, sizeof(buf), &host, &port)); |
| CHECK_THAT("foo", Equals(host)); |
| CHECK_THAT("bar", Equals(port)); |
| |
| CHECK(0 == pni_parse_addr("foo:", buf, sizeof(buf), &host, &port)); |
| CHECK_THAT("foo", Equals(host)); |
| CHECK_THAT("5672", Equals(port)); |
| |
| CHECK(0 == pni_parse_addr(":bar", buf, sizeof(buf), &host, &port)); |
| CHECK(NULL == host); |
| CHECK_THAT("bar", Equals(port)); |
| |
| CHECK(0 == pni_parse_addr(":", buf, sizeof(buf), &host, &port)); |
| CHECK(NULL == host); |
| CHECK_THAT("5672", Equals(port)); |
| |
| CHECK(0 == pni_parse_addr(":amqps", buf, sizeof(buf), &host, &port)); |
| CHECK_THAT("5671", Equals(port)); |
| |
| CHECK(0 == pni_parse_addr(":amqp", buf, sizeof(buf), &host, &port)); |
| CHECK_THAT("5672", Equals(port)); |
| |
| CHECK(0 == pni_parse_addr("::1:2:3", buf, sizeof(buf), &host, &port)); |
| CHECK_THAT("::1:2", Equals(host)); |
| CHECK_THAT("3", Equals(port)); |
| |
| CHECK(0 == pni_parse_addr(":::", buf, sizeof(buf), &host, &port)); |
| CHECK_THAT("::", Equals(host)); |
| CHECK_THAT("5672", Equals(port)); |
| |
| CHECK(0 == pni_parse_addr("", buf, sizeof(buf), &host, &port)); |
| CHECK(NULL == host); |
| CHECK_THAT("5672", Equals(port)); |
| } |
| |
| /* Test pn_proactor_disconnect */ |
| TEST_CASE("proactor_disconnect") { |
| common_handler ch, sh; |
| proactor client(&ch), server(&sh); |
| |
| // Start two listeners on the server |
| pn_listener_t *l = server.listen(); |
| REQUIRE_RUN(server, PN_LISTENER_OPEN); |
| pn_listener_t *l2 = server.listen(); |
| REQUIRE_RUN(server, PN_LISTENER_OPEN); |
| |
| // Two connections from client |
| pn_connection_t *c = client.connect(l); |
| CHECK_CORUN(client, server, PN_CONNECTION_REMOTE_OPEN); |
| pn_connection_t *c2 = client.connect(l); |
| CHECK_CORUN(client, server, PN_CONNECTION_REMOTE_OPEN); |
| |
| /* Disconnect the client proactor */ |
| auto_free<pn_condition_t, pn_condition_free> cond(pn_condition()); |
| pn_condition_format(cond, "test-name", "test-description"); |
| pn_proactor_disconnect(client, cond); |
| |
| /* Verify expected client side first */ |
| CHECK_CORUN(client, server, PN_TRANSPORT_ERROR); |
| CHECK_THAT(*client.handler->last_condition, |
| cond_matches("test-name", "test-description")); |
| CHECK_CORUN(client, server, PN_TRANSPORT_ERROR); |
| CHECK_THAT(*client.handler->last_condition, |
| cond_matches("test-name", "test-description")); |
| REQUIRE_RUN(client, PN_PROACTOR_INACTIVE); |
| |
| /* Now check server sees the disconnects */ |
| CHECK_CORUN(server, client, PN_TRANSPORT_ERROR); |
| CHECK_THAT(*server.handler->last_condition, |
| cond_matches("amqp:connection:framing-error", "aborted")); |
| CHECK_CORUN(server, client, PN_TRANSPORT_ERROR); |
| CHECK_THAT(*server.handler->last_condition, |
| cond_matches("amqp:connection:framing-error", "aborted")); |
| |
| /* Now disconnect the server end (the listeners) */ |
| pn_proactor_disconnect(server, NULL); |
| REQUIRE_RUN(server, PN_LISTENER_CLOSE); |
| REQUIRE_RUN(server, PN_LISTENER_CLOSE); |
| REQUIRE_RUN(server, PN_PROACTOR_INACTIVE); |
| |
| /* Make sure the proactors are still functional */ |
| pn_listener_t *l3 = server.listen(); |
| REQUIRE_RUN(server, PN_LISTENER_OPEN); |
| client.connect(l3); |
| CHECK_CORUN(client, server, PN_CONNECTION_REMOTE_OPEN); |
| } |
| |
| namespace { |
| const size_t FRAME = 512; /* Smallest legal frame */ |
| const ssize_t CHUNK = (FRAME + FRAME / 2); /* Chunk overflows frame */ |
| const size_t BODY = (CHUNK * 3 + CHUNK / 2); /* Body doesn't fit into chunks */ |
| } // namespace |
| |
| struct message_stream_handler : public common_handler { |
| pn_link_t *sender; |
| pn_delivery_t *dlv; |
| pn_rwbytes_t send_buf, recv_buf; |
| ssize_t size, sent, received; |
| bool complete; |
| |
| message_stream_handler() |
| : sender(), dlv(), send_buf(), recv_buf(), size(), sent(), received(), |
| complete() {} |
| |
| bool handle(pn_event_t *e) { |
| switch (pn_event_type(e)) { |
| case PN_CONNECTION_BOUND: |
| pn_transport_set_max_frame(pn_event_transport(e), FRAME); |
| return false; |
| |
| case PN_SESSION_INIT: |
| pn_session_set_incoming_capacity(pn_event_session(e), |
| FRAME); /* Single frame incoming */ |
| pn_session_set_outgoing_window(pn_event_session(e), |
| 1); /* Single frame outgoing */ |
| return false; |
| |
| case PN_LINK_REMOTE_OPEN: |
| common_handler::handle(e); |
| if (pn_link_is_receiver(pn_event_link(e))) { |
| pn_link_flow(pn_event_link(e), 1); |
| } else { |
| sender = pn_event_link(e); |
| } |
| return false; |
| |
| case PN_LINK_FLOW: /* Start a delivery */ |
| if (pn_link_is_sender(pn_event_link(e)) && !dlv) { |
| dlv = pn_delivery(pn_event_link(e), pn_dtag("x", 1)); |
| } |
| return false; |
| |
| case PN_CONNECTION_WAKE: { /* Send a chunk */ |
| ssize_t remains = size - sent; |
| ssize_t n = (CHUNK < remains) ? CHUNK : remains; |
| CHECK(n == pn_link_send(sender, send_buf.start + sent, n)); |
| sent += n; |
| if (sent == size) { |
| CHECK(pn_link_advance(sender)); |
| } |
| return false; |
| } |
| |
| case PN_DELIVERY: { /* Receive a delivery - smaller than a |
| chunk? */ |
| pn_delivery_t *dlv = pn_event_delivery(e); |
| if (pn_delivery_readable(dlv)) { |
| ssize_t n = pn_delivery_pending(dlv); |
| rwbytes_ensure(&recv_buf, received + n); |
| REQUIRE(n == |
| pn_link_recv(pn_event_link(e), recv_buf.start + received, n)); |
| received += n; |
| } |
| complete = !pn_delivery_partial(dlv); |
| return true; |
| } |
| default: |
| return common_handler::handle(e); |
| } |
| } |
| }; |
| |
| /* Test sending/receiving a message in chunks */ |
| TEST_CASE("proactor_message_stream") { |
| message_stream_handler h; |
| proactor p(&h); |
| |
| pn_listener_t *l = p.listen(); |
| REQUIRE_RUN(p, PN_LISTENER_OPEN); |
| |
| /* Encode a large (not very) message to send in chunks */ |
| auto_free<pn_message_t, pn_message_free> m(pn_message()); |
| pn_data_put_binary(pn_message_body(m), pn_bytes(std::string(BODY, 'x'))); |
| h.size = pn_message_encode2(m, &h.send_buf); |
| |
| pn_connection_t *c = p.connect(l); |
| pn_session_t *ssn = pn_session(c); |
| pn_session_open(ssn); |
| pn_link_t *snd = pn_sender(ssn, "x"); |
| pn_link_open(snd); |
| REQUIRE_RUN(p, PN_LINK_FLOW); |
| |
| /* Send and receive the message in chunks */ |
| do { |
| pn_connection_wake(c); /* Initiate send/receive of one chunk */ |
| do { /* May be multiple receives for one send */ |
| REQUIRE_RUN(p, PN_DELIVERY); |
| } while (h.received < h.sent); |
| } while (!h.complete); |
| CHECK(h.received == h.size); |
| CHECK(h.sent == h.size); |
| CHECK(!memcmp(h.send_buf.start, h.recv_buf.start, h.size)); |
| |
| free(h.send_buf.start); |
| free(h.recv_buf.start); |
| } |