| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include <proton/connection.h> |
| #include <proton/object.h> |
| #include <proton/sasl.h> |
| #include <proton/ssl.h> |
| #include <proton/transport.h> |
| #include <proton/url.h> |
| #include <assert.h> |
| #include <stdio.h> |
| #include <string.h> |
| #include "selectable.h" |
| #include "reactor.h" |
| |
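// XXX: PN_TRANCTX is overloaded for both directions: in a selectable's
// attachments it holds the transport, and in a transport's attachments it
// holds the selectable (as a weak reference).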
| PN_HANDLE(PN_TRANCTX) |
| PN_HANDLE(PNI_CONN_PEER_ADDRESS) |
| |
| void pni_reactor_set_connection_peer_address(pn_connection_t *connection, |
| const char *host, |
| const char *port) |
| { |
| pn_url_t *url = pn_url(); |
| pn_url_set_host(url, host); |
| pn_url_set_port(url, port); |
| pn_record_t *record = pn_connection_attachments(connection); |
| if (!pn_record_has(record, PNI_CONN_PEER_ADDRESS)) { |
| pn_record_def(record, PNI_CONN_PEER_ADDRESS, PN_OBJECT); |
| } |
| pn_record_set(record, PNI_CONN_PEER_ADDRESS, url); |
| pn_decref(url); |
| } |
| |
| static pn_transport_t *pni_transport(pn_selectable_t *sel) { |
| pn_record_t *record = pn_selectable_attachments(sel); |
| return (pn_transport_t *) pn_record_get(record, PN_TRANCTX); |
| } |
| |
| static ssize_t pni_connection_capacity(pn_selectable_t *sel) |
| { |
| pn_transport_t *transport = pni_transport(sel); |
| ssize_t capacity = pn_transport_capacity(transport); |
| if (capacity < 0) { |
| if (pn_transport_closed(transport)) { |
| pn_selectable_terminate(sel); |
| } |
| } |
| return capacity; |
| } |
| |
| static ssize_t pni_connection_pending(pn_selectable_t *sel) |
| { |
| pn_transport_t *transport = pni_transport(sel); |
| ssize_t pending = pn_transport_pending(transport); |
| if (pending < 0) { |
| if (pn_transport_closed(transport)) { |
| pn_selectable_terminate(sel); |
| } |
| } |
| return pending; |
| } |
| |
| static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel) |
| { |
| pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); |
| pn_transport_t *transport = pni_transport(sel); |
| pn_timestamp_t deadline = pn_transport_tick(transport, pn_reactor_now(reactor)); |
| return deadline; |
| } |
| |
| static void pni_connection_update(pn_selectable_t *sel) { |
| ssize_t c = pni_connection_capacity(sel); |
| ssize_t p = pni_connection_pending(sel); |
| pn_selectable_set_reading(sel, c > 0); |
| pn_selectable_set_writing(sel, p > 0); |
| pn_selectable_set_deadline(sel, pni_connection_deadline(sel)); |
| } |
| |
| void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event) { |
| assert(reactor); |
| pn_transport_t *transport = pn_event_transport(event); |
| pn_record_t *record = pn_transport_attachments(transport); |
| pn_selectable_t *sel = (pn_selectable_t *) pn_record_get(record, PN_TRANCTX); |
| if (sel && !pn_selectable_is_terminal(sel)) { |
| pni_connection_update(sel); |
| pn_reactor_update(reactor, sel); |
| } |
| } |
| |
| pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport); |
| |
| void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event) { |
| assert(reactor); |
| assert(event); |
| |
| pn_connection_t *conn = pn_event_connection(event); |
| if (!(pn_connection_state(conn) & PN_REMOTE_UNINIT)) { |
| return; |
| } |
| |
| pn_transport_t *transport = pn_transport(); |
| pn_transport_bind(transport, conn); |
| pn_decref(transport); |
| } |
| |
| void pni_handle_bound(pn_reactor_t *reactor, pn_event_t *event) { |
| assert(reactor); |
| assert(event); |
| |
| pn_connection_t *conn = pn_event_connection(event); |
| pn_transport_t *transport = pn_event_transport(event); |
| pn_record_t *record = pn_connection_attachments(conn); |
| pn_url_t *url = (pn_url_t *)pn_record_get(record, PNI_CONN_PEER_ADDRESS); |
| const char *host = NULL; |
| const char *port = "5672"; |
| pn_string_t *str = NULL; |
| |
| // link the new transport to its reactor: |
| pni_record_init_reactor(pn_transport_attachments(transport), reactor); |
| |
| if (pn_connection_acceptor(conn) != NULL) { |
| // this connection was created by the acceptor. There is already a |
| // socket assigned to this connection. Nothing needs to be done. |
| return; |
| } |
| |
| if (url) { |
| host = pn_url_get_host(url); |
| const char *uport = pn_url_get_port(url); |
| if (uport) { |
| port = uport; |
| } else { |
| const char *scheme = pn_url_get_scheme(url); |
| if (scheme && strcmp(scheme, "amqps") == 0) { |
| port = "5671"; |
| } |
| } |
| if (!pn_connection_get_user(conn)) { |
| // user did not manually set auth info |
| const char *user = pn_url_get_username(url); |
| if (user) pn_connection_set_user(conn, user); |
| const char *passwd = pn_url_get_password(url); |
| if (passwd) pn_connection_set_password(conn, passwd); |
| } |
| } else { |
| // for backward compatibility, see if the connection's hostname can be |
| // used for the remote address. See JIRA PROTON-1133 |
| const char *hostname = pn_connection_get_hostname(conn); |
| if (hostname) { |
| str = pn_string(hostname); |
| char *h = pn_string_buffer(str); |
| // see if a port has been included in the hostname. This is not |
| // allowed by the spec, but the old reactor interface allowed it. |
| char *colon = strrchr(h, ':'); |
| if (colon) { |
| *colon = '\0'; |
| port = colon + 1; |
| } |
| host = h; |
| } |
| } |
| |
| if (!host) { |
| // error: no address configured |
| pn_condition_t *cond = pn_transport_condition(transport); |
| pn_condition_set_name(cond, "proton:io"); |
| pn_condition_set_description(cond, "Connection failed: no address configured"); |
| pn_transport_close_tail(transport); |
| pn_transport_close_head(transport); |
| } else { |
| pn_socket_t sock = pn_connect(pn_reactor_io(reactor), host, port); |
| // invalid sockets are ignored by poll, so we need to do this manualy |
| if (sock == PN_INVALID_SOCKET) { |
| pn_condition_t *cond = pn_transport_condition(transport); |
| pn_condition_set_name(cond, "proton:io"); |
| pn_condition_set_description(cond, pn_error_text(pn_io_error(pn_reactor_io(reactor)))); |
| pn_transport_close_tail(transport); |
| pn_transport_close_head(transport); |
| } else { |
| pn_reactor_selectable_transport(reactor, sock, transport); |
| } |
| } |
| pn_free(str); |
| } |
| |
| void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event) { |
| assert(reactor); |
| assert(event); |
| pn_connection_t *conn = pn_event_connection(event); |
| pn_list_remove(pn_reactor_children(reactor), conn); |
| } |
| |
| static void pni_connection_readable(pn_selectable_t *sel) |
| { |
| pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); |
| pn_transport_t *transport = pni_transport(sel); |
| ssize_t capacity = pn_transport_capacity(transport); |
| if (capacity > 0) { |
| ssize_t n = pn_recv(pn_reactor_io(reactor), pn_selectable_get_fd(sel), |
| pn_transport_tail(transport), capacity); |
| if (n <= 0) { |
| if (n == 0 || !pn_wouldblock(pn_reactor_io(reactor))) { |
| if (n < 0) { |
| pn_condition_t *cond = pn_transport_condition(transport); |
| pn_condition_set_name(cond, "proton:io"); |
| pn_condition_set_description(cond, pn_error_text(pn_io_error(pn_reactor_io(reactor)))); |
| } |
| pn_transport_close_tail(transport); |
| } |
| } else { |
| pn_transport_process(transport, (size_t)n); |
| } |
| } |
| |
| ssize_t newcap = pn_transport_capacity(transport); |
| //occasionally transport events aren't generated when expected, so |
| //the following hack ensures we always update the selector |
| if (1 || newcap != capacity) { |
| pni_connection_update(sel); |
| pn_reactor_update(reactor, sel); |
| } |
| } |
| |
| static void pni_connection_writable(pn_selectable_t *sel) |
| { |
| pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); |
| pn_transport_t *transport = pni_transport(sel); |
| ssize_t pending = pn_transport_pending(transport); |
| if (pending > 0) { |
| ssize_t n = pn_send(pn_reactor_io(reactor), pn_selectable_get_fd(sel), |
| pn_transport_head(transport), pending); |
| if (n < 0) { |
| if (!pn_wouldblock(pn_reactor_io(reactor))) { |
| pn_condition_t *cond = pn_transport_condition(transport); |
| if (!pn_condition_is_set(cond)) { |
| pn_condition_set_name(cond, "proton:io"); |
| pn_condition_set_description(cond, pn_error_text(pn_io_error(pn_reactor_io(reactor)))); |
| } |
| pn_transport_close_head(transport); |
| } |
| } else { |
| pn_transport_pop(transport, n); |
| } |
| } |
| |
| ssize_t newpending = pn_transport_pending(transport); |
| if (newpending != pending) { |
| pni_connection_update(sel); |
| pn_reactor_update(reactor, sel); |
| } |
| } |
| |
| static void pni_connection_error(pn_selectable_t *sel) { |
| pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); |
| pn_transport_t *transport = pni_transport(sel); |
| pn_transport_close_head(transport); |
| pn_transport_close_tail(transport); |
| pn_selectable_terminate(sel); |
| pn_reactor_update(reactor, sel); |
| } |
| |
| static void pni_connection_expired(pn_selectable_t *sel) { |
| pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); |
| pn_transport_t *transport = pni_transport(sel); |
| pn_timestamp_t deadline = pn_transport_tick(transport, pn_reactor_now(reactor)); |
| pn_selectable_set_deadline(sel, deadline); |
| ssize_t c = pni_connection_capacity(sel); |
| ssize_t p = pni_connection_pending(sel); |
| pn_selectable_set_reading(sel, c > 0); |
| pn_selectable_set_writing(sel, p > 0); |
| pn_reactor_update(reactor, sel); |
| } |
| |
| static void pni_connection_finalize(pn_selectable_t *sel) { |
| pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); |
| pn_transport_t *transport = pni_transport(sel); |
| pn_record_t *record = pn_transport_attachments(transport); |
| pn_record_set(record, PN_TRANCTX, NULL); |
| pn_socket_t fd = pn_selectable_get_fd(sel); |
| pn_close(pn_reactor_io(reactor), fd); |
| } |
| |
| pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport) { |
| pn_selectable_t *sel = pn_reactor_selectable(reactor); |
| pn_selectable_set_fd(sel, sock); |
| pn_selectable_on_readable(sel, pni_connection_readable); |
| pn_selectable_on_writable(sel, pni_connection_writable); |
| pn_selectable_on_error(sel, pni_connection_error); |
| pn_selectable_on_expired(sel, pni_connection_expired); |
| pn_selectable_on_finalize(sel, pni_connection_finalize); |
| pn_record_t *record = pn_selectable_attachments(sel); |
| pn_record_def(record, PN_TRANCTX, PN_OBJECT); |
| pn_record_set(record, PN_TRANCTX, transport); |
| pn_record_t *tr = pn_transport_attachments(transport); |
| pn_record_def(tr, PN_TRANCTX, PN_WEAKREF); |
| pn_record_set(tr, PN_TRANCTX, sel); |
| pni_connection_update(sel); |
| pn_reactor_update(reactor, sel); |
| return sel; |
| } |
| |
| pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *handler) { |
| assert(reactor); |
| pn_connection_t *connection = pn_connection(); |
| pn_record_t *record = pn_connection_attachments(connection); |
| pn_record_set_handler(record, handler); |
| pn_connection_collect(connection, pn_reactor_collector(reactor)); |
| pn_list_add(pn_reactor_children(reactor), connection); |
| pni_record_init_reactor(record, reactor); |
| pn_decref(connection); |
| return connection; |
| } |
| |
| pn_connection_t *pn_reactor_connection_to_host(pn_reactor_t *reactor, |
| const char *host, |
| const char *port, |
| pn_handler_t *handler) { |
| pn_connection_t *connection = pn_reactor_connection(reactor, handler); |
| pn_reactor_set_connection_host(reactor, connection, host, port); |
| return connection; |
| } |
| |
| |
| void pn_reactor_set_connection_host(pn_reactor_t *reactor, |
| pn_connection_t *connection, |
| const char *host, |
| const char *port) |
| { |
| (void)reactor; // ignored |
| if (pn_connection_acceptor(connection) != NULL) { |
| // this is an inbound connection created by the acceptor. The peer |
| // address cannot be modified. |
| return; |
| } |
| pni_reactor_set_connection_peer_address(connection, host, port); |
| } |
| |
| |
| const char *pn_reactor_get_connection_address(pn_reactor_t *reactor, |
| pn_connection_t *connection) |
| { |
| (void)reactor; // ignored |
| if (!connection) return NULL; |
| pn_record_t *record = pn_connection_attachments(connection); |
| pn_url_t *url = (pn_url_t *)pn_record_get(record, PNI_CONN_PEER_ADDRESS); |
| if (url) { |
| return pn_url_str(url); |
| } |
| return NULL; |
| } |