blob: fce2219272dac0fb162bad0ce999abd8efeecc91 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <proton/condition.h>
#include <proton/listener.h>
#include <proton/netaddr.h>
#include <proton/proactor.h>
#include <proton/raw_connection.h>
#include "qpid/dispatch/alloc_pool.h"
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/protocol_adaptor.h"
#include "delivery.h"
#include "tcp_adaptor.h"
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// Allocator definitions for the listener and connector object types
// (the new_/free_ helpers for these types are used further down in this file).
ALLOC_DEFINE(qd_tcp_listener_t);
ALLOC_DEFINE(qd_tcp_connector_t);
// Size of the pn_raw_buffer_t batches exchanged with the raw connection when
// giving/taking read buffers.
#define READ_BUFFERS 4
// Size of the per-connection outgoing buffer array and of the batch used when
// taking back written buffers.
#define WRITE_BUFFERS 4
// State kept by the TCP adaptor for one bridged connection. The same structure
// is used for listener-side (ingress) connections, connector-side (egress)
// connections, and for the connector's "egress dispatcher" pseudo-connection
// that has no raw socket of its own.
typedef struct qdr_tcp_connection_t qdr_tcp_connection_t;
struct qdr_tcp_connection_t {
// Event-handler context; .context points back at this struct and .handler is
// handle_connection_event.
qd_handler_context_t context;
// Heap string. For ingress it is copied from the source terminus address in
// the second attach; it is emitted as "reply-to" (ingress) or "to" (egress).
char *reply_to;
// Router-core connection returned by qdr_connection_opened().
qdr_connection_t *qdr_conn;
// Identifier from qd_server_allocate_connection_id(); printed as [C<id>] in logs.
uint64_t conn_id;
// Link (and its id) on which data read from the socket is delivered to the core.
qdr_link_t *incoming;
uint64_t incoming_id;
// Link (and its id) attached in the QD_OUTGOING direction.
qdr_link_t *outgoing;
uint64_t outgoing_id;
// Proactor raw connection; reset to 0 under activation_lock on
// PN_RAW_CONNECTION_DISCONNECTED.
pn_raw_connection_t *pn_raw_conn;
sys_mutex_t *activation_lock;
// Streaming delivery created by handle_incoming() for socket data.
qdr_delivery_t *instream;
// Streaming delivery whose body handle_outgoing() writes to the socket.
qdr_delivery_t *outstream;
// true for connections accepted by a listener, false for egress connections.
bool ingress;
// Set once credit has been received; handle_incoming() waits for it before
// starting a new message.
bool flow_enabled;
// true when created without an initial delivery (see qdr_tcp_connection_egress).
bool egress_dispatcher;
bool connector_closed;//only used if egress_dispatcher=true
bool in_list; // This connection is in the adaptor's connections list
// Delivery held (with a reference) until it is handed to the outgoing link or
// released on disconnect.
qdr_delivery_t *initial_delivery;
// Timer that runs on_activate(); only created for the egress dispatcher.
qd_timer_t *activate_timer;
// Struct copy of the listener/connector configuration (the string members are
// copied as pointers, not duplicated).
qd_bridge_config_t config;
qd_server_t *server;
// Heap string produced by get_address_string() for the peer.
char *remote_address;
// Heap string sent as the message "subject"; built by get_global_id() on ingress.
char *global_id;
// Traffic counters and timestamps (timestamps are core uptime_ticks values).
uint64_t bytes_in;
uint64_t bytes_out;
uint64_t opened_time;
uint64_t last_in_time;
uint64_t last_out_time;
// Cursor state used by read_message_body()/write_outgoing_buffs().
qd_message_stream_data_t *outgoing_stream_data; // current segment
size_t outgoing_body_bytes; // bytes received from current segment
int outgoing_body_offset; // buffer offset into current segment
pn_raw_buffer_t outgoing_buffs[WRITE_BUFFERS];
int outgoing_buff_count; // number of buffers with data
int outgoing_buff_idx; // first buffer with data
DEQ_LINKS(qdr_tcp_connection_t);
};
// List type for connections (used for qdr_tcp_adaptor_t.connections) and the
// allocator declaration/definition providing new_qdr_tcp_connection_t() and
// free_qdr_tcp_connection_t().
DEQ_DECLARE(qdr_tcp_connection_t, qdr_tcp_connection_list_t);
ALLOC_DECLARE(qdr_tcp_connection_t);
ALLOC_DEFINE(qdr_tcp_connection_t);
// Adaptor-wide state: handles to the router core and protocol adaptor, the
// configured listeners and connectors, the tracked connections, and the log
// source used by every qd_log() call in this file.
typedef struct qdr_tcp_adaptor_t {
qdr_core_t *core;
qdr_protocol_adaptor_t *adaptor;
qd_tcp_listener_list_t listeners;
qd_tcp_connector_list_t connectors;
qdr_tcp_connection_list_t connections;
qd_log_source_t *log_source;
} qdr_tcp_adaptor_t;
// The single adaptor instance referenced throughout this file.
static qdr_tcp_adaptor_t *tcp_adaptor;
// Core-thread actions (the _CT suffix) used to add/remove a connection; they
// are enqueued with qdr_action_enqueue() below.
static void qdr_add_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
// Forward declarations for helpers that are used before their definition.
static void handle_disconnected(qdr_tcp_connection_t* conn);
static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc);
/*
 * Choose the link id to report for a connection: the incoming link id once an
 * inbound stream delivery exists, otherwise the outgoing link id.
 */
static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn)
{
    assert(conn);
    if (conn->instream) {
        return conn->incoming_id;
    }
    return conn->outgoing_id;
}
/*
 * Timer callback for the egress dispatcher connection.
 *
 * Runs all pending core work for the connection and, if the owning connector
 * has been flagged as closed, closes the core connection and frees the
 * dispatcher state.
 *
 * context: the qdr_tcp_connection_t registered with the timer.
 */
static void on_activate(void *context)
{
    qdr_tcp_connection_t *tc = context;

    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] on_activate", tc->conn_id);

    // Keep processing until the core reports nothing further to do.
    for (;;) {
        if (!qdr_connection_process(tc->qdr_conn)) {
            break;
        }
    }

    if (!tc->egress_dispatcher || !tc->connector_closed) {
        return;
    }

    qdr_connection_closed(tc->qdr_conn);
    qdr_connection_set_context(tc->qdr_conn, 0);
    free_qdr_tcp_connection(tc);
}
/*
 * Hand the raw connection as many empty read buffers as it reports capacity
 * for, in batches of at most READ_BUFFERS. Does nothing once the read side of
 * the raw connection is closed.
 *
 * Each raw buffer wraps a freshly allocated qd_buffer_t whose pointer is kept
 * in the raw buffer's context field.
 */
static void grant_read_buffers(qdr_tcp_connection_t *conn)
{
    if (pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
        return;
    }

    pn_raw_buffer_t batch[READ_BUFFERS];
    size_t remaining = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Granted %zu read buffers", conn->conn_id, remaining);

    while (remaining > 0) {
        size_t filled = 0;
        while (filled < remaining && filled < READ_BUFFERS) {
            qd_buffer_t     *buf  = qd_buffer();
            pn_raw_buffer_t *slot = &batch[filled];
            slot->bytes    = (char*) qd_buffer_base(buf);
            slot->capacity = qd_buffer_capacity(buf);
            slot->size     = 0;
            slot->offset   = 0;
            slot->context  = (uintptr_t) buf;
            ++filled;
        }
        remaining -= filled;
        pn_raw_connection_give_read_buffers(conn->pn_raw_conn, batch, filled);
    }
}
/*
 * Take the buffers the raw connection has read into and forward their content
 * to the router core.
 *
 * If no inbound stream delivery exists yet and the connection is still
 * waiting for its reply-to address (ingress only) or for credit, nothing is
 * taken from the socket and 0 is returned. Otherwise the filled buffers are
 * either appended to the existing stream delivery (conn->instream) or used as
 * the body of a new streaming message delivered on conn->incoming.
 *
 * Returns the number of bytes taken from the raw connection in this call.
 */
static int handle_incoming(qdr_tcp_connection_t *conn)
{
//
// Don't initiate an ingress stream message if we don't yet have a reply-to address and credit.
//
if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn) && !conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) {
if (conn->ingress && !conn->reply_to) {
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address to initiate message", conn->conn_id, conn->outgoing_id);
}
if (!conn->flow_enabled) {
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for credit to initiate message", conn->conn_id, conn->outgoing_id);
}
return 0;
}
qd_buffer_list_t buffers;
DEQ_INIT(buffers);
pn_raw_buffer_t raw_buffers[READ_BUFFERS];
size_t n;
int count = 0;
int free_count = 0;
// Collect every read buffer the raw connection is ready to return. The
// qd_buffer_t behind each raw buffer is recovered from its context field;
// buffers that came back empty are freed instead of being queued.
while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
qd_buffer_insert(buf, raw_buffers[i].size);
count += raw_buffers[i].size;
if (raw_buffers[i].size > 0) {
DEQ_INSERT_TAIL(buffers, buf);
} else {
qd_buffer_free(buf);
free_count++;
}
}
}
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %zu read buffers", conn->conn_id, DEQ_SIZE(buffers));
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read buffers", conn->conn_id, free_count);
// Replace the buffers just taken so the socket can keep reading.
grant_read_buffers(conn);
if (conn->instream) {
// A stream is already open: append the new data and continue the delivery.
qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers);
qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count);
} else {
// First data for this connection: compose a streaming message whose
// properties carry the routing information (to / subject / reply-to).
qd_message_t *msg = qd_message();
qd_message_set_stream_annotation(msg, true);
qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
qd_compose_start_list(props);
qd_compose_insert_null(props); // message-id
qd_compose_insert_null(props); // user-id
if (conn->ingress) {
qd_compose_insert_string(props, conn->config.address); // to
qd_compose_insert_string(props, conn->global_id); // subject
qd_compose_insert_string(props, conn->reply_to); // reply-to
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating ingress to: %s reply: %s", conn->conn_id, conn->incoming_id, conn->config.address, conn->reply_to);
} else {
qd_compose_insert_string(props, conn->reply_to); // to
qd_compose_insert_string(props, conn->global_id); // subject
qd_compose_insert_null(props); // reply-to
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating egress to: %s", conn->conn_id, conn->incoming_id, conn->reply_to);
}
// The remaining properties fields are intentionally left out of the list:
//qd_compose_insert_null(props); // correlation-id
//qd_compose_insert_null(props); // content-type
//qd_compose_insert_null(props); // content-encoding
//qd_compose_insert_timestamp(props, 0); // absolute-expiry-time
//qd_compose_insert_timestamp(props, 0); // creation-time
//qd_compose_insert_null(props); // group-id
//qd_compose_insert_uint(props, 0); // group-sequence
//qd_compose_insert_null(props); // reply-to-group-id
qd_compose_end_list(props);
// Only add a body-data section when some bytes were actually read.
if (count > 0) {
props = qd_compose(QD_PERFORMATIVE_BODY_DATA, props);
qd_compose_insert_binary_buffers(props, &buffers);
}
qd_message_compose_2(msg, props, false);
qd_compose_free(props);
conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 0, 0);
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating message with %i bytes", conn->conn_id, conn->incoming_id, count);
}
return count;
}
/*
 * Release everything owned by a TCP connection object and then the object
 * itself: its heap strings, the activation timer and pending stream-data
 * segment (when present), and the activation lock.
 *
 * The raw socket is not touched here; it is freed by the proactor.
 */
static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc)
{
    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing tcp_connection %p", tc->conn_id, (void*) tc);

    // Heap strings owned by the connection.
    free(tc->reply_to);
    free(tc->remote_address);
    free(tc->global_id);

    if (tc->activate_timer != 0) {
        qd_timer_free(tc->activate_timer);
    }
    if (tc->outgoing_stream_data != 0) {
        free_qd_message_stream_data_t(tc->outgoing_stream_data);
    }

    sys_mutex_free(tc->activation_lock);
    free_qdr_tcp_connection_t(tc);
}
/*
 * Tear down the router-core side of a connection whose socket has gone away.
 *
 * In order: completes and drops the inbound stream delivery, drops the
 * outbound stream delivery, detaches both links, closes the core connection,
 * releases a still-held initial delivery, and finally schedules the
 * connection object for deletion on the core thread.
 */
static void handle_disconnected(qdr_tcp_connection_t* conn)
{
    qdr_core_t      *core = tcp_adaptor->core;
    qd_log_source_t *log  = tcp_adaptor->log_source;

    qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] handle_disconnected", conn->conn_id);

    if (conn->instream) {
        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, conn->incoming_id);
        qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
        qdr_delivery_continue(core, conn->instream, true);
        qdr_delivery_decref(core, conn->instream, "tcp-adaptor.handle_disconnected - instream");
    }

    if (conn->outstream) {
        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected close outstream", conn->conn_id, conn->outgoing_id);
        qdr_delivery_decref(core, conn->outstream, "tcp-adaptor.handle_disconnected - outstream");
    }

    if (conn->incoming) {
        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", conn->conn_id, conn->incoming_id);
        qdr_link_detach(conn->incoming, QD_LOST, 0);
    }

    if (conn->outgoing) {
        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing", conn->conn_id, conn->outgoing_id);
        qdr_link_detach(conn->outgoing, QD_LOST, 0);
    }

    if (conn->qdr_conn) {
        qdr_connection_closed(conn->qdr_conn);
        qdr_connection_set_context(conn->qdr_conn, 0);
    }

    if (conn->initial_delivery) {
        qdr_delivery_remote_state_updated(core, conn->initial_delivery, PN_RELEASED, true, 0, false);
        qdr_delivery_decref(core, conn->initial_delivery, "tcp-adaptor.handle_disconnected - initial_delivery");
        conn->initial_delivery = 0;
    }

    // The object is freed on the core thread so that it cannot disappear
    // while the management agent is using it.
    qdr_action_t *del_action = qdr_action(qdr_del_tcp_connection_CT, "delete_tcp_connection");
    del_action->args.general.context_1 = conn;
    qdr_action_enqueue(core, del_action);
}
/*
 * Fill 'buffers' (at most 'count' entries) with the next chunk of body data
 * from the streaming message 'msg', tracking progress in the connection's
 * outgoing_stream_data / outgoing_body_bytes / outgoing_body_offset fields.
 *
 * Returns:
 *   > 0  number of raw buffers filled
 *     0  no data available right now (segment incomplete or sender caught up)
 *    -1  end of stream or invalid body data; the message has been marked
 *        send-complete
 */
static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_raw_buffer_t *buffers, int count)
{
int used = 0;
// Advance to next stream_data vbin segment if necessary.
// Return early if no data to process or error
if (conn->outgoing_stream_data == 0) {
qd_message_stream_data_result_t stream_data_result = qd_message_next_stream_data(msg, &conn->outgoing_stream_data);
if (stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) {
// a new stream_data segment has been found
conn->outgoing_body_bytes = 0;
conn->outgoing_body_offset = 0;
// continue to process this segment
} else if (stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) {
return 0;
} else {
// Every other result ends the outgoing stream; only the log line differs.
switch (stream_data_result) {
case QD_MESSAGE_STREAM_DATA_NO_MORE:
qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] EOS", conn->conn_id); break;
case QD_MESSAGE_STREAM_DATA_INVALID:
qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] Invalid body data for streaming message", conn->conn_id); break;
default:
break;
}
qd_message_set_send_complete(msg);
return -1;
}
}
// A valid stream_data is in place.
// Try to get a buffer set from it.
used = qd_message_stream_data_buffers(conn->outgoing_stream_data, buffers, conn->outgoing_body_offset, count);
if (used > 0) {
// Accumulate the lengths of the returned buffers.
for (int i=0; i<used; i++) {
conn->outgoing_body_bytes += buffers[i].size;
}
// Buffers returned should never exceed the stream_data payload length
assert(conn->outgoing_body_bytes <= conn->outgoing_stream_data->payload.length);
if (conn->outgoing_body_bytes == conn->outgoing_stream_data->payload.length) {
// This buffer set consumes the remainder of the stream_data segment.
// Attach the stream_data struct to the last buffer so that the struct
// can be freed after the buffer has been transmitted by raw connection out.
buffers[used-1].context = (uintptr_t) conn->outgoing_stream_data;
// Erase the stream_data struct from the connection so that
// a new one gets created on the next pass.
conn->outgoing_stream_data = 0;
} else {
// Returned buffer set did not consume the entire stream_data segment.
// Leave existing stream_data struct in place for use on next pass.
// Add the number of returned buffers to the offset for the next pass.
conn->outgoing_body_offset += used;
}
} else {
// No buffers returned.
// This sender has caught up with all data available on the input stream.
}
return used;
}
/*
 * Offer the connection's pending outgoing buffers to the raw connection.
 *
 * The pending window is outgoing_buffs[outgoing_buff_idx ..
 * outgoing_buff_idx + outgoing_buff_count). Buffers accepted by the raw
 * connection are removed from the front of that window.
 *
 * Returns true when every pending buffer was accepted (including the case of
 * nothing pending), false when some remain for a later attempt.
 */
static bool write_outgoing_buffs(qdr_tcp_connection_t *conn)
{
    if (conn->outgoing_buff_count == 0) {
        return true;
    }

    pn_raw_buffer_t *pending = &conn->outgoing_buffs[conn->outgoing_buff_idx];
    size_t used = pn_raw_connection_write_buffers(conn->pn_raw_conn, pending, conn->outgoing_buff_count);
    bool result = used == (size_t) conn->outgoing_buff_count;

    int bytes_written = 0;
    for (size_t i = 0; i < used; i++) {
        if (pending[i].bytes) {
            bytes_written += pending[i].size;
        } else {
            // i and used are size_t, so they are printed with %zu rather than
            // PRIu64 (the two types differ on some platforms).
            qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
                   "[C%"PRIu64"] empty buffer can't be written (%zu of %zu)", conn->conn_id, i+1, used);
        }
    }
    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
           "[C%"PRIu64"] Writing %i bytes", conn->conn_id, bytes_written);

    // Slide the pending window past the buffers that were accepted.
    conn->outgoing_buff_count -= used;
    conn->outgoing_buff_idx   += used;
    return result;
}
/*
 * Push as much of the outbound stream delivery's body as possible to the raw
 * connection.
 *
 * Buffers left over from an earlier partial write are flushed first; new body
 * data is only read once those have all been accepted. When the message is
 * receive-complete or has been marked send-complete the raw connection is
 * closed. Does nothing when there is no outbound stream delivery.
 */
static void handle_outgoing(qdr_tcp_connection_t *conn)
{
    if (!conn->outstream) {
        return;
    }

    qd_message_t *msg = qdr_delivery_message(conn->outstream);
    bool read_more_body = true;

    if (conn->outgoing_buff_count > 0) {
        // flush outgoing buffs that hold body data waiting to go out
        read_more_body = write_outgoing_buffs(conn);
    }

    while (read_more_body) {
        // Clear the whole buffer array explicitly. NOTE(review): the ZERO()
        // macro's definition is not visible here; if it expands to
        // memset(p, 0, sizeof(*p)), applying it to this array would only
        // clear the first element - confirm against ctools.h.
        memset(conn->outgoing_buffs, 0, sizeof(conn->outgoing_buffs));
        conn->outgoing_buff_idx   = 0;
        conn->outgoing_buff_count = read_message_body(conn, msg, conn->outgoing_buffs, WRITE_BUFFERS);
        if (conn->outgoing_buff_count <= 0) {
            // The incoming stream has no new data to send
            break;
        }
        // Send the data just returned
        read_more_body = write_outgoing_buffs(conn);
    }

    if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) {
        pn_raw_connection_close(conn->pn_raw_conn);
    }
}
/*
 * Build the identifier for a connection: "<host_port>" when site_id is NULL,
 * otherwise "<host_port>@<site_id>".
 *
 * Returns a newly malloc'd string that the caller must free(), or NULL when
 * host_port is NULL or the allocation fails.
 */
static char *get_global_id(char *site_id, char *host_port)
{
    if (!host_port) {
        return NULL;
    }

    const size_t host_len = strlen(host_port);
    const size_t site_len = site_id ? strlen(site_id) : 0;
    // host_port, optionally '@' + site_id, and the terminating NUL
    const size_t total = host_len + (site_id ? site_len + 1 : 0) + 1;

    char *result = malloc(total);
    if (!result) {
        return NULL;
    }

    memcpy(result, host_port, host_len);
    if (site_id) {
        result[host_len] = '@';
        memcpy(result + host_len + 1, site_id, site_len);
    }
    result[total - 1] = '\0';
    return result;
}
/*
 * Return the remote address of a raw connection as a newly allocated string
 * (the result of strdup(); the caller must free() it, and it is NULL if the
 * duplication fails).
 *
 * The address is formatted into a fixed 1024-byte buffer. The buffer starts
 * out as an empty string and is forcibly NUL-terminated afterwards, so a
 * valid (possibly truncated or empty) C string is duplicated regardless of
 * what the formatter wrote. The formatter's return value is not needed: the
 * previous length test selected between two calls that produced the same
 * result for a NUL-terminated buffer.
 */
static char *get_address_string(pn_raw_connection_t *socket)
{
    char buffer[1024] = "";
    const pn_netaddr_t *netaddr = pn_raw_connection_remote_addr(socket);

    (void) pn_netaddr_str(netaddr, buffer, sizeof(buffer));
    buffer[sizeof(buffer) - 1] = '\0';
    return strdup(buffer);
}
/*
 * Finish setting up a listener-side (ingress) connection once its raw
 * connection is connected: record the peer address and global id, open the
 * router-core connection, attach the outgoing and incoming links, grant read
 * buffers, and queue the connection for registration on the core thread.
 */
static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
{
tc->remote_address = get_address_string(tc->pn_raw_conn);
tc->global_id = get_global_id(tc->config.site_id, tc->remote_address);
qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
false, //bool is_authenticated,
true, //bool opened,
"", //char *sasl_mechanisms,
QD_INCOMING, //qd_direction_t dir,
tc->remote_address, //const char *host,
"", //const char *ssl_proto,
"", //const char *ssl_cipher,
"", //const char *user,
"TcpAdaptor", //const char *container,
0, //pn_data_t *connection_properties,
0, //int ssl_ssf,
false, //bool ssl,
"", // peer router version,
false); // streaming links
tc->conn_id = qd_server_allocate_connection_id(tc->server);
qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
tcp_adaptor->adaptor,
true, // incoming
QDR_ROLE_NORMAL, // role
1, // cost
tc->conn_id, // management_id
0, // label
0, // remote_container_id
false, // strip_annotations_in
false, // strip_annotations_out
250, // link_capacity
0, // vhost
0, // policy_spec
info, // connection_info
0, // context_binder
0); // bind_token
tc->qdr_conn = conn;
qdr_connection_set_context(conn, tc);
// The outgoing link uses a dynamic source; for ingress connections the
// source address reported in the second attach is stored as reply_to.
qdr_terminus_t *dynamic_source = qdr_terminus(0);
qdr_terminus_set_dynamic(dynamic_source);
// The incoming link targets the configured service address.
qdr_terminus_t *target = qdr_terminus(0);
qdr_terminus_set_address(target, tc->config.address);
tc->outgoing = qdr_link_first_attach(conn,
QD_OUTGOING,
dynamic_source, //qdr_terminus_t *source,
qdr_terminus(0), //qdr_terminus_t *target,
"tcp.ingress.out", //const char *name,
0, //const char *terminus_addr,
false,
NULL,
&(tc->outgoing_id));
qdr_link_set_context(tc->outgoing, tc);
tc->incoming = qdr_link_first_attach(conn,
QD_INCOMING,
qdr_terminus(0), //qdr_terminus_t *source,
target, //qdr_terminus_t *target,
"tcp.ingress.in", //const char *name,
0, //const char *terminus_addr,
false,
NULL,
&(tc->incoming_id));
tc->opened_time = tcp_adaptor->core->uptime_ticks;
qdr_link_set_context(tc->incoming, tc);
grant_read_buffers(tc);
// Registration is performed by qdr_add_tcp_connection_CT on the core thread.
qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection");
action->args.general.context_1 = tc;
qdr_action_enqueue(tcp_adaptor->core, action);
}
/*
 * Proactor event handler for a TCP bridge raw connection.
 *
 * e:         the raw-connection event to handle
 * qd_server: not used in this function
 * context:   the qdr_tcp_connection_t the event belongs to
 */
static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context)
{
qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) context;
qd_log_source_t *log = tcp_adaptor->log_source;
switch (pn_event_type(e)) {
case PN_RAW_CONNECTION_CONNECTED: {
// Both branches end in 'break', so this case never falls through.
if (conn->ingress) {
qdr_tcp_connection_ingress_accept(conn);
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Ingress accepted to %s from %s (global_id=%s)", conn->conn_id, conn->config.host_port, conn->remote_address, conn->global_id);
break;
} else {
conn->remote_address = get_address_string(conn->pn_raw_conn);
conn->opened_time = tcp_adaptor->core->uptime_ticks;
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Egress connected to %s", conn->conn_id, conn->remote_address);
// An egress connection created for a delivery opens its core connection
// here, now that the socket is connected.
if (!!conn->initial_delivery) {
qdr_tcp_open_server_side_connection(conn);
}
while (qdr_connection_process(conn->qdr_conn)) {}
handle_outgoing(conn);
break;
}
}
case PN_RAW_CONNECTION_CLOSED_READ: {
// Closing either direction closes the whole raw connection.
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
pn_raw_connection_close(conn->pn_raw_conn);
break;
}
case PN_RAW_CONNECTION_CLOSED_WRITE: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
pn_raw_connection_close(conn->pn_raw_conn);
break;
}
case PN_RAW_CONNECTION_DISCONNECTED: {
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
// Clear the raw connection pointer under the activation lock before
// tearing down the core-side state.
sys_mutex_lock(conn->activation_lock);
conn->pn_raw_conn = 0;
sys_mutex_unlock(conn->activation_lock);
handle_disconnected(conn);
break;
}
case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_WRITE_BUFFERS", conn->conn_id);
while (qdr_connection_process(conn->qdr_conn)) {}
handle_outgoing(conn);
break;
}
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_READ_BUFFERS", conn->conn_id);
while (qdr_connection_process(conn->qdr_conn)) {}
handle_incoming(conn);
break;
}
case PN_RAW_CONNECTION_WAKE: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE", conn->conn_id);
while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
case PN_RAW_CONNECTION_READ: {
// Forward the data and update the inbound statistics.
int read = handle_incoming(conn);
conn->last_in_time = tcp_adaptor->core->uptime_ticks;
conn->bytes_in += read;
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i bytes. Total read %"PRIu64" bytes", conn->conn_id, read, conn->bytes_in);
while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
case PN_RAW_CONNECTION_WRITTEN: {
pn_raw_buffer_t buffs[WRITE_BUFFERS];
size_t n;
size_t written = 0;
// Take back every written buffer. A non-zero context is the stream-data
// segment that read_message_body() attached to the segment's last buffer;
// it is released now that the data has been written.
while ( (n = pn_raw_connection_take_written_buffers(conn->pn_raw_conn, buffs, WRITE_BUFFERS)) ) {
for (size_t i = 0; i < n; ++i) {
written += buffs[i].size;
if (buffs[i].context) {
qd_message_stream_data_release((qd_message_stream_data_t*) buffs[i].context);
}
}
}
conn->last_out_time = tcp_adaptor->core->uptime_ticks;
conn->bytes_out += written;
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN Wrote %zu bytes. Total written %"PRIu64" bytes", conn->conn_id, written, conn->bytes_out);
while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
default:
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Unexpected Event: %d", conn->conn_id, pn_event_type(e));
break;
}
}
/*
 * Create the connection object for a new inbound TCP connection on
 * 'listener' and ask the proactor listener to accept it.
 *
 * Only the fields needed to receive events are set here. The accept triggers
 * a PN_RAW_CONNECTION_CONNECTED event on another thread, and the rest of the
 * initialisation happens there via qdr_tcp_connection_ingress_accept().
 *
 * Returns the new connection object.
 */
static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* listener)
{
    qdr_tcp_connection_t *conn = new_qdr_tcp_connection_t();
    ZERO(conn);

    conn->activation_lock = sys_mutex();
    conn->ingress         = true;

    // Event routing: events for the raw connection come back to this object.
    conn->context.context = conn;
    conn->context.handler = &handle_connection_event;

    // Settings inherited from the listener.
    conn->config = listener->config;
    conn->server = listener->server;

    conn->pn_raw_conn = pn_raw_connection();
    pn_raw_connection_set_context(conn->pn_raw_conn, conn);
    pn_listener_raw_accept(listener->pn_listener, conn->pn_raw_conn);

    return conn;
}
/*
 * Open the router-core connection and outgoing link for an egress connection.
 *
 * Used both for the egress dispatcher (host reported as "egress-dispatch")
 * and for a real egress connection (host reported as the configured
 * host:port). If the connection is holding an initial delivery, that delivery
 * is passed to the new link and the connection's own reference is dropped.
 */
static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
{
const char *host = tc->egress_dispatcher ? "egress-dispatch" : tc->config.host_port;
qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Opening server-side core connection %s", tc->conn_id, host);
qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
false, //bool is_authenticated,
true, //bool opened,
"", //char *sasl_mechanisms,
QD_OUTGOING, //qd_direction_t dir,
host, //const char *host,
"", //const char *ssl_proto,
"", //const char *ssl_cipher,
"", //const char *user,
"TcpAdaptor", //const char *container,
0, //pn_data_t *connection_properties,
0, //int ssl_ssf,
false, //bool ssl,
"", // peer router version,
false); // streaming links
qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
tcp_adaptor->adaptor,
false, // incoming
QDR_ROLE_NORMAL, // role
1, // cost
tc->conn_id, // management_id
0, // label
0, // remote_container_id
false, // strip_annotations_in
false, // strip_annotations_out
250, // link_capacity
0, // vhost
0, // policy_spec
info, // connection_info
0, // context_binder
0); // bind_token
tc->qdr_conn = conn;
qdr_connection_set_context(conn, tc);
// The outgoing link's source is the configured service address.
qdr_terminus_t *source = qdr_terminus(0);
qdr_terminus_set_address(source, tc->config.address);
// This attach passes the ownership of the delivery from the core-side connection and link
// to the adaptor-side outgoing connection and link.
tc->outgoing = qdr_link_first_attach(conn,
QD_OUTGOING,
source, //qdr_terminus_t *source,
qdr_terminus(0), //qdr_terminus_t *target,
"tcp.egress.out", //const char *name,
0, //const char *terminus_addr,
!(tc->egress_dispatcher),
tc->initial_delivery,
&(tc->outgoing_id));
if (!!tc->initial_delivery) {
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" initial_delivery ownership passed to "DLV_FMT,
DLV_ARGS(tc->initial_delivery), tc->outgoing->conn_id, tc->outgoing->identity, tc->initial_delivery->delivery_id);
// Drop the reference taken in qdr_tcp_connection_egress().
qdr_delivery_decref(tcp_adaptor->core, tc->initial_delivery, "tcp-adaptor - passing initial_delivery into new link");
tc->initial_delivery = 0;
}
qdr_link_set_context(tc->outgoing, tc);
}
/*
 * Create an egress connection object.
 *
 * config:           bridge configuration to copy into the connection
 * server:           server used to allocate the connection id and reach the proactor
 * initial_delivery: NULL to create the connector's egress dispatcher; otherwise
 *                   the delivery that triggered this connection (a reference
 *                   is taken and held by the connection)
 *
 * The dispatcher opens its core connection immediately. A real egress
 * connection instead starts a raw connection to config->host_port and opens
 * its core connection later, from that connection's event context.
 *
 * Returns the new connection object.
 */
static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery)
{
    qdr_tcp_connection_t *tc = new_qdr_tcp_connection_t();
    ZERO(tc);
    tc->activation_lock = sys_mutex();

    const bool is_dispatcher = !initial_delivery;
    tc->egress_dispatcher = is_dispatcher;
    if (is_dispatcher) {
        tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc);
    } else {
        tc->initial_delivery = initial_delivery;
        qdr_delivery_incref(initial_delivery, "qdr_tcp_connection_egress - held initial delivery");
    }

    tc->ingress         = false;
    tc->context.context = tc;
    tc->context.handler = &handle_connection_event;
    tc->config          = *config;
    tc->server          = server;
    tc->conn_id         = qd_server_allocate_connection_id(tc->server);

    if (is_dispatcher) {
        // No socket for the dispatcher: set up the core connection right away.
        qdr_tcp_open_server_side_connection(tc);
        return tc;
    }

    // Real egress connection: connect the socket first; the core connection
    // is opened once we are running in the raw connection's context.
    qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connecting to: %s", tc->conn_id, tc->config.host_port);
    tc->pn_raw_conn = pn_raw_connection();
    pn_raw_connection_set_context(tc->pn_raw_conn, tc);
    pn_proactor_raw_connect(qd_server_proactor(tc->server), tc->pn_raw_conn, tc->config.host_port);
    return tc;
}
/*
 * Free the strings owned by a bridge configuration. The structure itself is
 * not freed.
 *
 * Every pointer is reset to 0 after being freed so that the function can
 * safely be called again on the same structure. This happens when
 * load_bridge_config() fails (it frees the partial configuration) and the
 * caller then drops the listener/connector, whose decref frees the
 * configuration a second time.
 *
 * A NULL config is ignored.
 */
static void free_bridge_config(qd_bridge_config_t *config)
{
    if (!config) return;

    free(config->name);
    config->name = 0;
    free(config->address);
    config->address = 0;
    free(config->host);
    config->host = 0;
    free(config->port);
    config->port = 0;
    free(config->site_id);
    config->site_id = 0;
    free(config->host_port);
    config->host_port = 0;
}
// Jump to the function's 'error' label if the preceding call recorded an error.
#define CHECK() if (qd_error_code()) goto error
/*
 * Populate 'config' from a management entity.
 *
 * Reads the "name", "address", "host" and "port" attributes, the optional
 * "siteId" attribute, and derives host_port as "<host>:<port>".
 *
 * qd and is_listener are accepted for the callers' benefit but are not used
 * here.
 *
 * Returns QD_ERROR_NONE on success. On failure the partially loaded strings
 * are freed, 'config' is left fully zeroed (so a later free_bridge_config()
 * on the same structure cannot free them twice) and the recorded error code
 * is returned.
 *
 * NOTE(review): the result of malloc() for host_port is not checked.
 */
static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_bridge_config_t *config, qd_entity_t* entity, bool is_listener)
{
    qd_error_clear();
    ZERO(config);

    config->name    = qd_entity_get_string(entity, "name");        CHECK();
    config->address = qd_entity_get_string(entity, "address");     CHECK();
    config->host    = qd_entity_get_string(entity, "host");        CHECK();
    config->port    = qd_entity_get_string(entity, "port");        CHECK();
    config->site_id = qd_entity_opt_string(entity, "siteId", 0);   CHECK();

    // "<host>:<port>" plus the terminating NUL
    size_t hplen = strlen(config->host) + strlen(config->port) + 2;
    config->host_port = malloc(hplen);
    snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);

    return QD_ERROR_NONE;

error:
    free_bridge_config(config);
    // Forget the freed pointers: callers release the owning object on
    // failure, which frees the configuration again.
    ZERO(config);
    return qd_error_code();
}
/*
 * Log, at INFO level, that a bridge entity has been configured.
 *
 * log:  log source to write to
 * c:    configuration whose address, host and port are reported
 * what: label for the kind of entity (the callers pass "TcpListener" or
 *       "TcpConnector")
 */
static void log_tcp_bridge_config(qd_log_source_t *log, qd_bridge_config_t *c, const char *what)
{
    qd_log(log, QD_LOG_INFO, "Configured %s for %s, %s:%s",
           what,
           c->address,
           c->host,
           c->port);
}
/*
 * Drop one reference to a listener. When sys_atomic_dec() reports that the
 * count was 1 before the decrement, the listener's configuration and the
 * listener itself are freed. A NULL listener is ignored.
 */
void qd_tcp_listener_decref(qd_tcp_listener_t* li)
{
    if (!li) {
        return;
    }
    if (sys_atomic_dec(&li->ref_count) != 1) {
        return;
    }
    free_bridge_config(&li->config);
    free_qd_tcp_listener_t(li);
}
/*
 * Proactor event handler for a TCP listener.
 *
 * e:         the listener event
 * qd_server: not used in this function
 * context:   the qd_tcp_listener_t the event belongs to
 *
 * OPEN is logged, ACCEPT creates an ingress connection, and CLOSE logs the
 * outcome, detaches the proactor listener and drops one listener reference.
 * All other events are ignored.
 */
static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *context)
{
    qd_tcp_listener_t *li        = context;
    qd_log_source_t   *log       = tcp_adaptor->log_source;
    const char        *host_port = li->config.host_port;

    switch (pn_event_type(e)) {
    case PN_LISTENER_OPEN:
        qd_log(log, QD_LOG_NOTICE, "PN_LISTENER_OPEN Listening on %s", host_port);
        break;

    case PN_LISTENER_ACCEPT:
        qd_log(log, QD_LOG_INFO, "PN_LISTENER_ACCEPT Accepting TCP connection on %s", host_port);
        qdr_tcp_connection_ingress(li);
        break;

    case PN_LISTENER_CLOSE: {
        if (!li->pn_listener) {
            break;
        }
        pn_condition_t *cond = pn_listener_condition(li->pn_listener);
        if (!pn_condition_is_set(cond)) {
            qd_log(log, QD_LOG_TRACE, "PN_LISTENER_CLOSE Listener closed on %s", host_port);
        } else {
            qd_log(log, QD_LOG_ERROR, "PN_LISTENER_CLOSE Listener error on %s: %s (%s)", host_port,
                   pn_condition_get_description(cond),
                   pn_condition_get_name(cond));
        }
        // Detach the proactor listener and drop one listener reference.
        pn_listener_set_context(li->pn_listener, 0);
        li->pn_listener = 0;
        qd_tcp_listener_decref(li);
        break;
    }

    default:
        break;
    }
}
/*
 * Allocate and initialise a listener object bound to 'server'.
 *
 * The new listener starts with a reference count of 1 and with its handler
 * context pointing at handle_listener_event. Returns 0 if allocation fails.
 */
static qd_tcp_listener_t *qd_tcp_listener(qd_server_t *server)
{
    qd_tcp_listener_t *listener = new_qd_tcp_listener_t();
    if (listener) {
        ZERO(listener);
        sys_atomic_init(&listener->ref_count, 1);
        listener->server          = server;
        listener->context.context = listener;
        listener->context.handler = &handle_listener_event;
    }
    return listener;
}
static const int BACKLOG = 50; /* Listening backlog */

/*
 * Create a proactor listener for 'li' and start listening on the configured
 * host:port.
 *
 * The reference held on behalf of the proactor is taken BEFORE the listener
 * is handed to the proactor. Listener events are handled on other threads, so
 * a PN_LISTENER_CLOSE (which drops that reference) could otherwise run before
 * the reference had been added.
 *
 * Listening is asynchronous; the "listening" message is logged when
 * PN_LISTENER_OPEN arrives.
 *
 * Returns true if the proactor listener was created and the listen request
 * issued, false if the listener could not be created.
 */
static bool tcp_listener_listen(qd_tcp_listener_t *li)
{
    li->pn_listener = pn_listener();
    if (!li->pn_listener) {
        qd_log(tcp_adaptor->log_source, QD_LOG_CRITICAL, "Failed to create listener for %s",
               li->config.host_port);
        return false;
    }

    pn_listener_set_context(li->pn_listener, &li->context);
    sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */
    pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, BACKLOG);
    return true;
}
/*
 * Create a TCP listener from a management entity, add it to the adaptor's
 * listener list and start listening.
 *
 * Returns the new listener, or 0 (after logging the error and releasing the
 * listener) if allocation or configuration loading fails.
 */
qd_tcp_listener_t *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, qd_entity_t *entity)
{
    qd_tcp_listener_t *listener = qd_tcp_listener(qd->server);

    const bool configured =
        listener && load_bridge_config(qd, &listener->config, entity, true) == QD_ERROR_NONE;
    if (!configured) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp listener: %s", qd_error_message());
        qd_tcp_listener_decref(listener);
        return 0;
    }

    DEQ_ITEM_INIT(listener);
    DEQ_INSERT_TAIL(tcp_adaptor->listeners, listener);
    log_tcp_bridge_config(tcp_adaptor->log_source, &listener->config, "TcpListener");
    tcp_listener_listen(listener);
    return listener;
}
/*
 * Delete a TCP listener: close its proactor listener if one exists, remove
 * it from the adaptor's listener list, log the deletion and drop one
 * reference. A NULL impl is ignored.
 *
 * impl: the qd_tcp_listener_t to delete.
 */
void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl)
{
    qd_tcp_listener_t *listener = impl;
    if (!listener) {
        return;
    }

    if (listener->pn_listener) {
        pn_listener_close(listener->pn_listener);
    }
    DEQ_REMOVE(tcp_adaptor->listeners, listener);
    qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpListener for %s, %s:%s",
           listener->config.address, listener->config.host, listener->config.port);
    qd_tcp_listener_decref(listener);
}
/*
 * Management refresh hook for tcpListener entities. There is nothing to
 * refresh, so it always reports success.
 */
qd_error_t qd_entity_refresh_tcpListener(qd_entity_t* entity, void *impl)
{
    (void) entity;
    (void) impl;
    return QD_ERROR_NONE;
}
/*
 * Allocate and initialise a connector object bound to 'server', with a
 * reference count of 1. Returns 0 if allocation fails.
 */
static qd_tcp_connector_t *qd_tcp_connector(qd_server_t *server)
{
    qd_tcp_connector_t *connector = new_qd_tcp_connector_t();
    if (connector) {
        ZERO(connector);
        sys_atomic_init(&connector->ref_count, 1);
        connector->server = server;
    }
    return connector;
}
/*
 * Drop one reference to a connector. When sys_atomic_dec() reports that the
 * count was 1 before the decrement, the connector's configuration and the
 * connector itself are freed. A NULL connector is ignored.
 */
void qd_tcp_connector_decref(qd_tcp_connector_t* c)
{
    if (!c) {
        return;
    }
    if (sys_atomic_dec(&c->ref_count) != 1) {
        return;
    }
    free_bridge_config(&c->config);
    free_qd_tcp_connector_t(c);
}
/*
 * Create a TCP connector from a management entity, add it to the adaptor's
 * connector list and create its egress dispatcher connection.
 *
 * Returns the new connector, or 0 (after logging the error and releasing the
 * connector) if allocation or configuration loading fails.
 */
qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
    qd_tcp_connector_t *c = qd_tcp_connector(qd->server);
    // This is a connector, so is_listener is false. (The flag is currently
    // unused by load_bridge_config, so passing true had no effect, but it
    // was wrong.)
    if (!c || load_bridge_config(qd, &c->config, entity, false) != QD_ERROR_NONE) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp connector: %s", qd_error_message());
        qd_tcp_connector_decref(c);
        return 0;
    }

    DEQ_ITEM_INIT(c);
    DEQ_INSERT_TAIL(tcp_adaptor->connectors, c);
    log_tcp_bridge_config(tcp_adaptor->log_source, &c->config, "TcpConnector");

    // A NULL initial delivery makes this the connector's egress dispatcher.
    c->dispatcher = qdr_tcp_connection_egress(&(c->config), c->server, NULL);
    return c;
}
/*
 * Request that an egress dispatcher connection be closed.
 *
 * The close itself has to happen on the connection's thread, so this only
 * sets the connector_closed flag and fires the activation timer immediately;
 * on_activate() does the actual teardown.
 */
static void close_egress_dispatcher(qdr_tcp_connection_t *context)
{
    context->connector_closed = true;

    qd_timer_t *timer = context->activate_timer;
    qd_timer_schedule(timer, 0);
}
/*
 * Delete a TCP connector: log the deletion, close the pseudo-connection used
 * for dispatching deliveries out to live connections, remove the connector
 * from the adaptor's list and drop one reference. A NULL impl is ignored.
 *
 * impl: the qd_tcp_connector_t to delete.
 */
void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
{
    qd_tcp_connector_t *connector = impl;
    if (!connector) {
        return;
    }

    qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpConnector for %s, %s:%s",
           connector->config.address, connector->config.host, connector->config.port);

    qdr_tcp_connection_t *dispatcher = (qdr_tcp_connection_t*) connector->dispatcher;
    close_egress_dispatcher(dispatcher);

    DEQ_REMOVE(tcp_adaptor->connectors, connector);
    qd_tcp_connector_decref(connector);
}
/*
 * Management refresh hook for tcpConnector entities. There is nothing to
 * refresh, so it always reports success.
 */
qd_error_t qd_entity_refresh_tcpConnector(qd_entity_t* entity, void *impl)
{
    (void) entity;
    (void) impl;
    return QD_ERROR_NONE;
}
/*
 * Adaptor callback for a first attach arriving from the core.
 *
 * Nothing is done beyond logging. A connection without an adaptor context is
 * reported as an error and trips an assertion.
 */
static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link,
                                 qdr_terminus_t *source, qdr_terminus_t *target,
                                 qd_session_class_t session_class)
{
    qdr_tcp_connection_t *tc = qdr_connection_get_context(conn);

    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_first_attach: no link context");
        assert(false);
        return;
    }

    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_first_attach: NOOP", tc->conn_id, qdr_tcp_conn_linkid(tc));
}
/**
 * Store a copy of the reply-to address on the connection.
 *
 * The copy is produced by qd_iterator_copy() and assigned to tc->reply_to;
 * any value previously held in tc->reply_to is overwritten, not released.
 *
 * @param tc        connection receiving the address
 * @param reply_to  iterator over the address to copy
 */
static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, qd_iterator_t* reply_to)
{
    char *address = (char*) qd_iterator_copy(reply_to);
    tc->reply_to = address;
}
/**
 * Store a copy of the message subject as the connection's global id.
 *
 * A buffer one byte longer than the iterator's length is allocated and
 * filled through qd_iterator_strncpy().
 *
 * NOTE(review): the malloc() result is used without a NULL check, and any
 * previous tc->global_id is overwritten without being freed - confirm both
 * are acceptable for this project.
 *
 * @param tc       connection receiving the id
 * @param subject  iterator over the subject field
 */
static void qdr_tcp_connection_copy_global_id(qdr_tcp_connection_t* tc, qd_iterator_t* subject)
{
    const int capacity = qd_iterator_length(subject) + 1;

    tc->global_id = malloc(capacity);
    qd_iterator_strncpy(subject, tc->global_id, capacity);
}
/**
 * Protocol adaptor callback: second attach of a link.
 *
 * Outgoing link: on an ingress connection the source terminus address is
 * saved as the reply-to, read buffers are granted and incoming data is
 * handled; in every case 10 credits are then issued on the link.
 *
 * Any other link: on an egress (non-ingress) connection read buffers are
 * granted; on an ingress connection nothing is done.
 *
 * A link without a context is reported as an error and trips an assertion.
 */
static void qdr_tcp_second_attach(void *context, qdr_link_t *link,
                                  qdr_terminus_t *source, qdr_terminus_t *target)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_link_get_context(link);
    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_second_attach: no link context");
        assert(false);
        return;
    }

    if (qdr_link_direction(link) == QD_OUTGOING) {
        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, tc->outgoing_id);
        if (tc->ingress) {
            qdr_tcp_connection_copy_reply_to(tc, qdr_terminus_get_address(source));
            // Ingress may start reading from the socket as soon as the
            // reply-to address is known, because only then can a message
            // be sent out.
            grant_read_buffers(tc);
            handle_incoming(tc);
        }
        qdr_link_flow(tcp_adaptor->core, link, 10, false);
        return;
    }

    if (!tc->ingress) {
        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, tc->incoming_id);
        // Egress may start reading from the socket once the link used to
        // send messages is available.
        grant_read_buffers(tc);
    }
}
/**
 * Protocol adaptor callback: link detach.
 *
 * Always logs an error and trips an assertion; none of the arguments are
 * examined.
 */
static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
{
    (void) context;
    (void) link;
    (void) error;
    (void) first;
    (void) close;

    qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_detach");
    assert(false);
}
/**
 * Protocol adaptor callback: credit was granted on a link.
 *
 * The first time positive credit arrives while flow is still disabled, flow
 * is enabled on the connection and incoming data is handled.  Every other
 * call only logs that no action was taken.
 *
 * A link without a context is reported as an error and trips an assertion.
 */
static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_link_get_context(link);
    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_flow: no link context");
        assert(false);
        return;
    }

    // Already enabled, or nothing granted: there is nothing to do.
    if (tc->flow_enabled || credit <= 0) {
        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: No action. enabled:%s, credit:%d",
               tc->conn_id, qdr_tcp_conn_linkid(tc), tc->flow_enabled?"T":"F", credit);
        return;
    }

    tc->flow_enabled = true;
    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: Flow enabled, credit=%d",
           tc->conn_id, tc->outgoing_id, credit);
    handle_incoming(tc);
}
/**
 * Protocol adaptor callback: offer.
 *
 * No action is taken beyond logging.  A link without a context is reported
 * as an error and trips an assertion.
 */
static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_link_get_context(link);
    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_offer: no link context");
        assert(false);
        return;
    }

    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_offer: NOOP", tc->conn_id, qdr_tcp_conn_linkid(tc));
}
/**
 * Protocol adaptor callback: drained.
 *
 * No action is taken beyond logging.  A link without a context is reported
 * as an error and trips an assertion.
 */
static void qdr_tcp_drained(void *context, qdr_link_t *link)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_link_get_context(link);
    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_drained: no link context");
        assert(false);
        return;
    }

    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drained: NOOP", tc->conn_id, qdr_tcp_conn_linkid(tc));
}
/**
 * Protocol adaptor callback: drain.
 *
 * No action is taken beyond logging.  A link without a context is reported
 * as an error and trips an assertion.
 */
static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_link_get_context(link);
    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_drain: no link context");
        assert(false);
        return;
    }

    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drain: NOOP", tc->conn_id, qdr_tcp_conn_linkid(tc));
}
/**
 * Protocol adaptor callback: push deliveries on a link.
 *
 * @param context  adaptor context (unused)
 * @param link     link whose deliveries are to be processed
 * @param limit    limit forwarded to qdr_link_process_deliveries()
 * @return the value returned by qdr_link_process_deliveries(), or 0 when the
 *         link has no context (which is also logged and asserted)
 */
static int qdr_tcp_push(void *context, qdr_link_t *link, int limit)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_link_get_context(link);
    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_push: no link context");
        assert(false);
        return 0;
    }

    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_push", tc->conn_id, qdr_tcp_conn_linkid(tc));
    return qdr_link_process_deliveries(tcp_adaptor->core, link, limit);
}
/**
 * Protocol adaptor callback: a delivery is available on a link.
 *
 * - Egress dispatcher: a new egress connection is started for the delivery
 *   and QD_DELIVERY_MOVED_TO_NEW_LINK is returned.
 * - First delivery on a regular connection: it becomes the outstream (with an
 *   added reference).  On egress the subject and reply-to fields are copied
 *   from the message, an incoming link targeting the reply-to address is
 *   attached, an action is enqueued to add the connection to the adaptor's
 *   connection list, and incoming data is handled.
 * - In all non-dispatcher cases outgoing data is then handled.
 *
 * @return QD_DELIVERY_MOVED_TO_NEW_LINK for the dispatcher case, otherwise 0
 *         (including when the link has no context, which is logged and
 *         asserted)
 */
static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_link_get_context(link);
    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_deliver: no link context");
        assert(false);
        return 0;
    }

    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" qdr_tcp_deliver Delivery event", DLV_ARGS(delivery));

    if (tc->egress_dispatcher) {
        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" tcp_adaptor initiating egress connection", DLV_ARGS(delivery));
        qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
        return QD_DELIVERY_MOVED_TO_NEW_LINK;
    }

    if (!tc->outstream) {
        tc->outstream = delivery;
        qdr_delivery_incref(delivery, "tcp_adaptor - new outstream");

        if (!tc->ingress) {
            // On egress the link for the reverse direction can only be set
            // up after the first part of the client-to-server message has
            // arrived, since that message carries the id and reply-to.
            qd_message_t  *message = qdr_delivery_message(delivery);
            qd_iterator_t *field   = qd_message_field_iterator(message, QD_FIELD_SUBJECT);
            qdr_tcp_connection_copy_global_id(tc, field);
            qd_iterator_free(field);

            field = qd_message_field_iterator(message, QD_FIELD_REPLY_TO);
            qdr_tcp_connection_copy_reply_to(tc, field);
            qd_iterator_free(field);

            qdr_terminus_t *reply_target = qdr_terminus(0);
            qdr_terminus_set_address(reply_target, tc->reply_to);
            tc->incoming = qdr_link_first_attach(tc->qdr_conn,
                                                 QD_INCOMING,
                                                 qdr_terminus(0),  // source
                                                 reply_target,     // target
                                                 "tcp.egress.in",  // name
                                                 0,                // terminus_addr
                                                 false,
                                                 NULL,
                                                 &(tc->incoming_id));
            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Create Link to %s", tc->conn_id, tc->incoming->identity, tc->reply_to);
            qdr_link_set_context(tc->incoming, tc);

            // With the global id known, make the connection visible through management.
            qdr_action_t *add_action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection");
            add_action->args.general.context_1 = tc;
            qdr_action_enqueue(tcp_adaptor->core, add_action);

            handle_incoming(tc);
        }
    }

    handle_outgoing(tc);
    return 0;
}
/**
 * Protocol adaptor callback: report link credit.
 *
 * Only logs; the value 10 is returned unconditionally.  A link without a
 * context is reported as an error and trips an assertion.
 *
 * @return 10
 */
static int qdr_tcp_get_credit(void *context, qdr_link_t *link)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_link_get_context(link);

    if (tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_get_credit: NOOP", tc->conn_id, qdr_tcp_conn_linkid(tc));
        return 10;
    }

    qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_get_credit: no link context");
    assert(false);
    return 10;
}
/**
 * Protocol adaptor callback: the disposition/settlement of a delivery changed.
 *
 * If one of the streaming deliveries is ever settled, the connection must be
 * torn down, so the raw connection is closed on settlement.
 *
 * The raw connection pointer is checked first: qdr_tcp_activate() in this
 * file shows that a connection can exist while pn_raw_conn is still NULL, and
 * in that state there is nothing to close.
 *
 * @param context  adaptor context (unused)
 * @param dlv      delivery that was updated; its link's context is the
 *                 connection
 * @param disp     new disposition (only logged)
 * @param settled  true when the delivery has been settled
 */
static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
{
    void* link_context = qdr_link_get_context(qdr_delivery_link(dlv));
    if (link_context) {
        qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" qdr_tcp_delivery_update: disp: %"PRIu64", settled: %s",
               DLV_ARGS(dlv), disp, settled ? "true" : "false");

        //
        // If one of the streaming deliveries is ever settled, the connection must be torn down.
        //
        if (settled && tc->pn_raw_conn) {
            pn_raw_connection_close(tc->pn_raw_conn);
        }
    } else {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_delivery_update: no link context");
        assert(false);
    }
}
/**
 * Protocol adaptor callback: connection close.
 *
 * No action is taken beyond logging.  A connection without a context is
 * reported as an error and trips an assertion.
 */
static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_connection_get_context(conn);
    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_conn_close: no connection context");
        assert(false);
        return;
    }

    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_close: NOOP", tc->conn_id, qdr_tcp_conn_linkid(tc));
}
/**
 * Protocol adaptor callback: connection trace toggle.
 *
 * No action is taken beyond logging.  A connection without a context is
 * reported as an error and trips an assertion.
 */
static void qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool trace)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_connection_get_context(conn);
    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_conn_trace: no connection context");
        assert(false);
        return;
    }

    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_trace: NOOP", tc->conn_id, qdr_tcp_conn_linkid(tc));
}
/**
 * Protocol adaptor callback: wake up a connection so it gets processed.
 *
 * With the connection's activation_lock held, the raw connection is woken if
 * it exists.  Otherwise the lock is released and, if the connection has an
 * activate timer, that timer is scheduled with zero delay.  If neither is
 * available the connection cannot be activated, which is only logged.
 *
 * A connection without a context is logged at debug level and ignored.
 *
 * @param notused  adaptor context (unused)
 * @param c        core connection to activate
 */
static void qdr_tcp_activate(void *notused, qdr_connection_t *c)
{
    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) qdr_connection_get_context(c);
    if (!tc) {
        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_activate: no connection context");
        // No assert here: this situation is routine. TODO: Is that a problem?
        return;
    }

    sys_mutex_lock(tc->activation_lock);

    if (tc->pn_raw_conn) {
        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: waking raw connection", tc->conn_id);
        pn_raw_connection_wake(tc->pn_raw_conn);
        sys_mutex_unlock(tc->activation_lock);
        return;
    }

    if (tc->activate_timer) {
        sys_mutex_unlock(tc->activation_lock);
        // On egress the raw connection is created only after the first part
        // of the message carrying the client->server half of the stream has
        // been received.  Before that, a subscribing link (and its
        // connection) must already be set up, so the wakeup is faked with a
        // timer.
        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: schedule activate_timer", tc->conn_id);
        qd_timer_schedule(tc->activate_timer, 0);
        return;
    }

    sys_mutex_unlock(tc->activation_lock);
    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: Cannot activate", tc->conn_id);
}
/**
 * Initialization hook, invoked when the router core is ready for the
 * protocol adaptor to be created.  It is responsible for:
 *
 * 1) Registering the protocol adaptor (named "tcp") and its callbacks with
 *    the router core.
 * 2) Preparing the adaptor to be configured: empty listener, connector and
 *    connection lists plus the "TCP_ADAPTOR" log source.
 *
 * The new adaptor is returned through `adaptor_context` and also published
 * in the file-level `tcp_adaptor` pointer.
 */
static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
{
    qdr_tcp_adaptor_t *self = NEW(qdr_tcp_adaptor_t);

    self->core = core;
    DEQ_INIT(self->listeners);
    DEQ_INIT(self->connectors);
    DEQ_INIT(self->connections);

    self->adaptor = qdr_protocol_adaptor(core,
                                         "tcp",             // name
                                         self,              // context
                                         qdr_tcp_activate,  // activate
                                         qdr_tcp_first_attach,
                                         qdr_tcp_second_attach,
                                         qdr_tcp_detach,
                                         qdr_tcp_flow,
                                         qdr_tcp_offer,
                                         qdr_tcp_drained,
                                         qdr_tcp_drain,
                                         qdr_tcp_push,
                                         qdr_tcp_deliver,
                                         qdr_tcp_get_credit,
                                         qdr_tcp_delivery_update,
                                         qdr_tcp_conn_close,
                                         qdr_tcp_conn_trace);
    self->log_source = qd_log_source("TCP_ADAPTOR");

    *adaptor_context = self;
    tcp_adaptor = self;
}
static void qdr_tcp_adaptor_final(void *adaptor_context)
{
qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Shutting down TCP protocol adaptor");
qdr_tcp_adaptor_t *adaptor = (qdr_tcp_adaptor_t*) adaptor_context;
qd_tcp_listener_t *tl = DEQ_HEAD(adaptor->listeners);
while (tl) {
qd_tcp_listener_t *next = DEQ_NEXT(tl);
free_bridge_config(&tl->config);
free_qd_tcp_listener_t(tl);
tl = next;
}
qd_tcp_connector_t *tr = DEQ_HEAD(adaptor->connectors);
while (tr) {
qd_tcp_connector_t *next = DEQ_NEXT(tr);
free_bridge_config(&tr->config);
free_qdr_tcp_connection((qdr_tcp_connection_t*) tr->dispatcher);
free_qd_tcp_connector_t(tr);
tr = next;
}
qdr_tcp_connection_t *tc = DEQ_HEAD(adaptor->connections);
while (tc) {
qdr_tcp_connection_t *next = DEQ_NEXT(tc);
free_qdr_tcp_connection(tc);
tc = next;
}
qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
free(adaptor);
tcp_adaptor = NULL;
}
/**
 * Declare the adaptor so that it will self-register on process startup.
 *
 * The adaptor is declared under the name "tcp-adaptor" with
 * qdr_tcp_adaptor_init as its initialization function and
 * qdr_tcp_adaptor_final as its finalization function.
 */
QDR_CORE_ADAPTOR_DECLARE("tcp-adaptor", qdr_tcp_adaptor_init, qdr_tcp_adaptor_final)
#define QDR_TCP_CONNECTION_NAME 0
#define QDR_TCP_CONNECTION_IDENTITY 1
#define QDR_TCP_CONNECTION_ADDRESS 2
#define QDR_TCP_CONNECTION_HOST 3
#define QDR_TCP_CONNECTION_DIRECTION 4
#define QDR_TCP_CONNECTION_BYTES_IN 5
#define QDR_TCP_CONNECTION_BYTES_OUT 6
#define QDR_TCP_CONNECTION_UPTIME_SECONDS 7
#define QDR_TCP_CONNECTION_LAST_IN_SECONDS 8
#define QDR_TCP_CONNECTION_LAST_OUT_SECONDS 9
const char * const QDR_TCP_CONNECTION_DIRECTION_IN = "in";
const char * const QDR_TCP_CONNECTION_DIRECTION_OUT = "out";
const char *qdr_tcp_connection_columns[] =
{"name",
"identity",
"address",
"host",
"direction",
"bytesIn",
"bytesOut",
"uptimeSeconds",
"lastInSeconds",
"lastOutSeconds",
0};
const char *TCP_CONNECTION_TYPE = "org.apache.qpid.dispatch.tcpConnection";
/**
 * Append the value of one management column of a TCP connection to `body`.
 *
 * The connection id is an unsigned 64-bit value, so the identity column is
 * formatted with PRIu64, the same conversion the "[C%PRIu64]" log prefixes
 * in this file use.
 *
 * NOTE(review): the byte counters and time deltas are inserted through
 * qd_compose_insert_uint(); if that function takes a 32-bit value, large
 * counters would be truncated - confirm against its declaration.
 *
 * @param core  router core; its uptime_ticks is the reference for the
 *              uptime / last-in / last-out columns
 * @param conn  connection to describe; nothing is inserted when it is NULL
 * @param col   one of the QDR_TCP_CONNECTION_* column indices; any other
 *              value inserts nothing
 * @param body  composed field receiving the value
 */
static void insert_column(qdr_core_t *core, qdr_tcp_connection_t *conn, int col, qd_composed_field_t *body)
{
    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "Insert column %i for %p", col, (void*) conn);

    if (!conn)
        return;

    switch (col) {
    case QDR_TCP_CONNECTION_NAME:
        qd_compose_insert_string(body, conn->global_id);
        break;

    case QDR_TCP_CONNECTION_IDENTITY: {
        char id_str[100];
        snprintf(id_str, sizeof id_str, "%"PRIu64, conn->conn_id);
        qd_compose_insert_string(body, id_str);
        break;
    }

    case QDR_TCP_CONNECTION_ADDRESS:
        qd_compose_insert_string(body, conn->config.address);
        break;

    case QDR_TCP_CONNECTION_HOST:
        qd_compose_insert_string(body, conn->remote_address);
        break;

    case QDR_TCP_CONNECTION_DIRECTION:
        if (conn->ingress)
            qd_compose_insert_string(body, QDR_TCP_CONNECTION_DIRECTION_IN);
        else
            qd_compose_insert_string(body, QDR_TCP_CONNECTION_DIRECTION_OUT);
        break;

    case QDR_TCP_CONNECTION_BYTES_IN:
        qd_compose_insert_uint(body, conn->bytes_in);
        break;

    case QDR_TCP_CONNECTION_BYTES_OUT:
        qd_compose_insert_uint(body, conn->bytes_out);
        break;

    case QDR_TCP_CONNECTION_UPTIME_SECONDS:
        qd_compose_insert_uint(body, core->uptime_ticks - conn->opened_time);
        break;

    case QDR_TCP_CONNECTION_LAST_IN_SECONDS:
        // A zero timestamp is reported as null rather than as an elapsed time.
        if (conn->last_in_time==0)
            qd_compose_insert_null(body);
        else
            qd_compose_insert_uint(body, core->uptime_ticks - conn->last_in_time);
        break;

    case QDR_TCP_CONNECTION_LAST_OUT_SECONDS:
        // A zero timestamp is reported as null rather than as an elapsed time.
        if (conn->last_out_time==0)
            qd_compose_insert_null(body);
        else
            qd_compose_insert_uint(body, core->uptime_ticks - conn->last_out_time);
        break;

    default:
        // Unknown column index: nothing is inserted.
        break;
    }
}
/**
 * Write one connection into the query body as a list of column values.
 *
 * The columns come from query->columns, which is read until the first
 * negative entry.  For a null connection an empty list is written.
 *
 * @param core   router core, forwarded to insert_column()
 * @param query  query supplying the output body and the requested columns
 * @param conn   connection to write, may be null
 */
static void write_list(qdr_core_t *core, qdr_query_t *query, qdr_tcp_connection_t *conn)
{
    qd_composed_field_t *out = query->body;

    qd_compose_start_list(out);
    if (conn) {
        for (int idx = 0; query->columns[idx] >= 0; idx++)
            insert_column(core, conn, query->columns[idx], out);
    }
    qd_compose_end_list(out);
}
/**
 * Write one connection into `body` as a map of column name to column value.
 *
 * Every column index from 0 up to (but excluding)
 * QDR_TCP_CONNECTION_COLUMN_COUNT is emitted: first the name taken from
 * `qdr_connection_columns`, then the value produced by insert_column().
 *
 * @param core                    router core, forwarded to insert_column()
 * @param conn                    connection to write
 * @param body                    composed field receiving the map
 * @param qdr_connection_columns  column names, indexed by column number
 */
static void write_map(qdr_core_t *core,
                      qdr_tcp_connection_t *conn,
                      qd_composed_field_t *body,
                      const char *qdr_connection_columns[])
{
    int col = 0;

    qd_compose_start_map(body);
    while (col < QDR_TCP_CONNECTION_COLUMN_COUNT) {
        qd_compose_insert_string(body, qdr_connection_columns[col]);
        insert_column(core, conn, col, body);
        col++;
    }
    qd_compose_end_map(body);
}
/**
 * Move the query cursor past `conn`.
 *
 * With a connection, query->next_offset is incremented and query->more
 * records whether another connection follows it in the list.  Without one,
 * query->more is simply cleared.
 *
 * @param query  query whose paging state is updated
 * @param conn   connection that was just written, may be null
 */
static void advance(qdr_query_t *query, qdr_tcp_connection_t *conn)
{
    if (!conn) {
        query->more = false;
        return;
    }

    query->next_offset++;
    query->more = DEQ_NEXT(conn) != NULL;
}
/**
 * Look up a connection on the adaptor's connection list by identity.
 *
 * Each connection's id is rendered as a decimal string and compared with
 * the requested identity.  The id is an unsigned 64-bit value, so it is
 * formatted with PRIu64, the same conversion the "[C%PRIu64]" log prefixes
 * in this file use.
 *
 * @param core      router core (unused)
 * @param identity  iterator over the identity to look for, may be null
 * @return the first matching connection, or 0 when `identity` is null or no
 *         connection matches
 */
static qdr_tcp_connection_t *find_by_identity(qdr_core_t *core, qd_iterator_t *identity)
{
    if (!identity)
        return 0;

    char id[100];
    for (qdr_tcp_connection_t *conn = DEQ_HEAD(tcp_adaptor->connections); conn; conn = DEQ_NEXT(conn)) {
        // Render this connection's id as a string for comparison.
        snprintf(id, sizeof id, "%"PRIu64, conn->conn_id);
        if (qd_iterator_equal(identity, (const unsigned char*) id))
            return conn;
    }
    return 0;
}
/**
 * Management query: return the connection at position `offset`.
 *
 * The status is set to QD_AMQP_OK.  If `offset` lies within the connection
 * list, that connection is written to the query body as a list and the
 * paging state (next_offset / more) is updated; otherwise query->more is
 * cleared.  The response is enqueued in every case.
 *
 * @param core    router core
 * @param query   query to answer
 * @param offset  zero-based position of the first connection wanted
 */
void qdra_tcp_connection_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
{
    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "query for first tcp connection (%i)", offset);
    query->status = QD_AMQP_OK;

    qdr_tcp_connection_t *cursor = 0;
    if (offset < DEQ_SIZE(tcp_adaptor->connections)) {
        // Walk forward from the head to the requested position.
        cursor = DEQ_HEAD(tcp_adaptor->connections);
        int skipped = 0;
        while (skipped < offset && cursor) {
            cursor = DEQ_NEXT(cursor);
            skipped++;
        }
        assert(cursor);
    }

    if (!cursor) {
        query->more = false;
    } else {
        write_list(core, query, cursor);
        query->next_offset = offset;
        advance(query, cursor);
    }
    qdr_agent_enqueue_response_CT(core, query);
}
/**
 * Management query: return the connection at query->next_offset.
 *
 * If that position lies within the connection list, the connection is
 * written to the query body as a list and the paging state is advanced;
 * otherwise query->more is cleared.  The response is enqueued in every case.
 *
 * @param core   router core
 * @param query  query to continue
 */
void qdra_tcp_connection_get_next_CT(qdr_core_t *core, qdr_query_t *query)
{
    qdr_tcp_connection_t *cursor = 0;

    if (query->next_offset < DEQ_SIZE(tcp_adaptor->connections)) {
        // Walk forward from the head to the stored position.
        cursor = DEQ_HEAD(tcp_adaptor->connections);
        int steps = 0;
        while (steps < query->next_offset && cursor) {
            cursor = DEQ_NEXT(cursor);
            steps++;
        }
    }

    if (!cursor) {
        query->more = false;
    } else {
        write_list(core, query, cursor);
        advance(query, cursor);
    }
    qdr_agent_enqueue_response_CT(core, query);
}
/**
 * Management READ of a single TCP connection.
 *
 * Lookup is by identity only.  Without an identity the query fails with
 * QD_AMQP_BAD_REQUEST and the error is logged.  With one, the matching
 * connection is written to the query body as a map (status QD_AMQP_OK), or
 * the status is set to QD_AMQP_NOT_FOUND when nothing matches.  The response
 * is enqueued in every case.
 *
 * @param core                        router core
 * @param name                        requested name (not supported, unused)
 * @param identity                    requested identity, may be null
 * @param query                       query to answer
 * @param qdr_tcp_connection_columns  column names used for the map keys
 */
void qdra_tcp_connection_get_CT(qdr_core_t *core,
                                qd_iterator_t *name,
                                qd_iterator_t *identity,
                                qdr_query_t *query,
                                const char *qdr_tcp_connection_columns[])
{
    if (identity) {
        qdr_tcp_connection_t *match = find_by_identity(core, identity);
        if (match) {
            write_map(core, match, query->body, qdr_tcp_connection_columns);
            query->status = QD_AMQP_OK;
        } else {
            query->status = QD_AMQP_NOT_FOUND;
        }
    } else {
        query->status = QD_AMQP_BAD_REQUEST;
        query->status.description = "Name not supported. Identity required";
        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: %s", TCP_CONNECTION_TYPE, query->status.description);
    }

    qdr_agent_enqueue_response_CT(core, query);
}
/**
 * Core action: add a connection to the adaptor's connection list.
 *
 * The connection is taken from action->args.general.context_1, appended to
 * tcp_adaptor->connections and flagged as being in the list.  A discarded
 * action does nothing.
 *
 * @param core     router core (unused)
 * @param action   action carrying the connection
 * @param discard  true when the action is being discarded instead of run
 */
static void qdr_add_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
    if (discard)
        return;

    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) action->args.general.context_1;

    DEQ_INSERT_TAIL(tcp_adaptor->connections, tc);
    tc->in_list = true;
    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_add_tcp_connection_CT %s (%zu)",
           tc->conn_id, tc->config.host_port, DEQ_SIZE(tcp_adaptor->connections));
}
/**
 * Core action: remove a connection from the adaptor's list and free it.
 *
 * The connection is taken from action->args.general.context_1.  If it is
 * flagged as being in the list it is removed and its statistics are logged;
 * it is then freed whether or not it was in the list.  A discarded action
 * does nothing, so the connection is not freed in that case.
 *
 * NOTE(review): bytes_in is logged with PRIu64 while bytes_out and the time
 * fields use PRId64 - confirm these match the declared field types.
 *
 * @param core     router core (unused)
 * @param action   action carrying the connection
 * @param discard  true when the action is being discarded instead of run
 */
static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
    if (discard)
        return;

    qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*) action->args.general.context_1;

    if (tc->in_list) {
        DEQ_REMOVE(tcp_adaptor->connections, tc);
        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
               "[C%"PRIu64"] qdr_del_tcp_connection_CT %s deleted. bytes_in=%"PRIu64", bytes_out=%"PRId64", "
               "opened_time=%"PRId64", last_in_time=%"PRId64", last_out_time=%"PRId64". Connections remaining %zu",
               tc->conn_id, tc->config.host_port,
               tc->bytes_in, tc->bytes_out, tc->opened_time, tc->last_in_time, tc->last_out_time,
               DEQ_SIZE(tcp_adaptor->connections));
    }
    free_qdr_tcp_connection(tc);
}