| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <Python.h> |
| #include <qpid/dispatch/ctools.h> |
| #include <qpid/dispatch/threading.h> |
| #include <qpid/dispatch/log.h> |
| #include <qpid/dispatch/amqp.h> |
| #include <qpid/dispatch/server.h> |
| #include "entity.h" |
| #include "entity_cache.h" |
| #include "dispatch_private.h" |
| #include "policy.h" |
| #include "server_private.h" |
| #include "timer_private.h" |
| #include "alloc.h" |
| #include "config.h" |
| #include <stdio.h> |
| #include <time.h> |
| #include <string.h> |
| #include <errno.h> |
| |
| static __thread qd_server_t *thread_server = 0; |
| |
| #define HEARTBEAT_INTERVAL 1000 |
| |
| ALLOC_DEFINE(qd_work_item_t); |
| ALLOC_DEFINE(qd_listener_t); |
| ALLOC_DEFINE(qd_connector_t); |
| ALLOC_DEFINE(qd_deferred_call_t); |
| ALLOC_DEFINE(qd_connection_t); |
| ALLOC_DEFINE(qd_user_fd_t); |
| |
| const char *QD_CONNECTION_TYPE = "connection"; |
| const char *MECH_EXTERNAL = "EXTERNAL"; |
| |
| //Allowed uidFormat fields. |
| const char CERT_COUNTRY_CODE = 'c'; |
| const char CERT_STATE = 's'; |
| const char CERT_CITY_LOCALITY = 'l'; |
| const char CERT_ORGANIZATION_NAME = 'o'; |
| const char CERT_ORGANIZATION_UNIT = 'u'; |
| const char CERT_COMMON_NAME = 'n'; |
| const char CERT_FINGERPRINT_SHA1 = '1'; |
| const char CERT_FINGERPRINT_SHA256 = '2'; |
| const char CERT_FINGERPRINT_SHA512 = '5'; |
| char *COMPONENT_SEPARATOR = ";"; |
| |
| const char *DEFAULT_USER_ID = "anonymous"; |
| |
| static qd_thread_t *thread(qd_server_t *qd_server, int id) |
| { |
| qd_thread_t *thread = NEW(qd_thread_t); |
| if (!thread) |
| return 0; |
| |
| thread->qd_server = qd_server; |
| thread->thread_id = id; |
| thread->running = 0; |
| thread->canceled = 0; |
| thread->using_thread = 0; |
| |
| return thread; |
| } |
| |
| static void free_qd_connection(qd_connection_t *ctx) |
| { |
| if (ctx->policy_settings) { |
| if (ctx->policy_settings->sources) |
| free(ctx->policy_settings->sources); |
| if (ctx->policy_settings->targets) |
| free(ctx->policy_settings->targets); |
| free (ctx->policy_settings); |
| ctx->policy_settings = 0; |
| } |
| if (ctx->pn_conn) { |
| pn_connection_set_context(ctx->pn_conn, 0); |
| pn_decref(ctx->pn_conn); |
| ctx->pn_conn = NULL; |
| } |
| if (ctx->collector) { |
| pn_collector_free(ctx->collector); |
| ctx->collector = NULL; |
| } |
| |
| if (ctx->free_user_id) |
| free((char*)ctx->user_id); |
| |
| free_qd_connection_t(ctx); |
| } |
| |
| qd_error_t qd_entity_update_connection(qd_entity_t* entity, void *impl); |
| |
| /** |
| * This function is set as the pn_transport->tracer and is invoked when proton tries to write the log message to pn_transport->tracer |
| */ |
| static void qd_transport_tracer(pn_transport_t *transport, const char *message) |
| { |
| qd_connection_t *ctx = (qd_connection_t*) pn_transport_get_context(transport); |
| if (ctx) |
| qd_log(ctx->server->log_source, QD_LOG_TRACE, "[%d]:%s", ctx->connection_id, message); |
| } |
| |
| static qd_error_t connection_entity_update_host(qd_entity_t* entity, qd_connection_t *conn) |
| { |
| const qd_server_config_t *config; |
| if (conn->connector) { |
| config = conn->connector->config; |
| char host[strlen(config->host)+strlen(config->port)+2]; |
| snprintf(host, sizeof(host), "%s:%s", config->host, config->port); |
| return qd_entity_set_string(entity, "host", host); |
| } |
| else |
| return qd_entity_set_string(entity, "host", qdpn_connector_name(conn->pn_cxtr)); |
| } |
| |
| /** |
| * Returns a char pointer to a user id which is constructed from components specified in the config->ssl_uid_format. |
| * Parses through each component and builds a semi-colon delimited string which is returned as the user id. |
| */ |
| static const char *qd_transport_get_user(qd_connection_t *conn, pn_transport_t *tport) |
| { |
| const qd_server_config_t *config = |
| conn->connector ? conn->connector->config : conn->listener->config; |
| |
| if (config->ssl_uid_format) { |
| |
| // The ssl_uid_format length cannot be greater that 7 |
| assert(strlen(config->ssl_uid_format) < 8); |
| |
| // |
| // The tokens in the uidFormat strings are delimited by comma. Load the individual components of the uidFormat |
| // into the components[] array. The maximum number of components that are allowed are 7 namely, c, s, l, o, u, n, (1 or 2 or 5) |
| // |
| char components[8]; |
| |
| //The strcpy() function copies the string pointed to by src, including the terminating null byte ('\0'), to the buffer pointed to by dest. |
| strncpy(components, config->ssl_uid_format, 7); |
| |
| const char *country_code = 0; |
| const char *state = 0; |
| const char *locality_city = 0; |
| const char *organization = 0; |
| const char *org_unit = 0; |
| const char *common_name = 0; |
| // |
| // SHA1 is 20 octets (40 hex characters); SHA256 is 32 octets (64 hex characters). |
| // SHA512 is 64 octets (128 hex characters) |
| // |
| char fingerprint[129] = "\0"; |
| |
| int uid_length = 0; |
| int semi_colon_count = -1; |
| |
| int component_count = strlen(components); |
| |
| for (int x = 0; x < component_count ; x++) { |
| // accumulate the length into uid_length on each pass so we definitively know the number of octets to malloc. |
| if (components[x] == CERT_COUNTRY_CODE) { |
| country_code = pn_ssl_get_remote_subject_subfield(pn_ssl(tport), PN_SSL_CERT_SUBJECT_COUNTRY_NAME); |
| if (country_code) { |
| uid_length += strlen((const char *)country_code); |
| semi_colon_count++; |
| } |
| } |
| else if (components[x] == CERT_STATE) { |
| state = pn_ssl_get_remote_subject_subfield(pn_ssl(tport), PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE); |
| if (state) { |
| uid_length += strlen((const char *)state); |
| semi_colon_count++; |
| } |
| } |
| else if (components[x] == CERT_CITY_LOCALITY) { |
| locality_city = pn_ssl_get_remote_subject_subfield(pn_ssl(tport), PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY); |
| if (locality_city) { |
| uid_length += strlen((const char *)locality_city); |
| semi_colon_count++; |
| } |
| } |
| else if (components[x] == CERT_ORGANIZATION_NAME) { |
| organization = pn_ssl_get_remote_subject_subfield(pn_ssl(tport), PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME); |
| if(organization) { |
| uid_length += strlen((const char *)organization); |
| semi_colon_count++; |
| } |
| } |
| else if (components[x] == CERT_ORGANIZATION_UNIT) { |
| org_unit = pn_ssl_get_remote_subject_subfield(pn_ssl(tport), PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT); |
| if(org_unit) { |
| uid_length += strlen((const char *)org_unit); |
| semi_colon_count++; |
| } |
| } |
| else if (components[x] == CERT_COMMON_NAME) { |
| common_name = pn_ssl_get_remote_subject_subfield(pn_ssl(tport), PN_SSL_CERT_SUBJECT_COMMON_NAME); |
| if(common_name) { |
| uid_length += strlen((const char *)common_name); |
| semi_colon_count++; |
| } |
| } |
| else if (components[x] == CERT_FINGERPRINT_SHA1 || components[x] == CERT_FINGERPRINT_SHA256 || components[x] == CERT_FINGERPRINT_SHA512) { |
| // Allocate the memory for message digest |
| int out = 0; |
| |
| int fingerprint_length = 0; |
| if(components[x] == CERT_FINGERPRINT_SHA1) { |
| fingerprint_length = 40; |
| out = pn_ssl_get_cert_fingerprint(pn_ssl(tport), fingerprint, fingerprint_length + 1, PN_SSL_SHA1); |
| } |
| else if (components[x] == CERT_FINGERPRINT_SHA256) { |
| fingerprint_length = 64; |
| out = pn_ssl_get_cert_fingerprint(pn_ssl(tport), fingerprint, fingerprint_length + 1, PN_SSL_SHA256); |
| } |
| else if (components[x] == CERT_FINGERPRINT_SHA512) { |
| fingerprint_length = 128; |
| out = pn_ssl_get_cert_fingerprint(pn_ssl(tport), fingerprint, fingerprint_length + 1, PN_SSL_SHA512); |
| } |
| |
| (void) out; // avoid 'out unused' compiler warnings if NDEBUG undef'ed |
| assert (out != PN_ERR); |
| |
| uid_length += fingerprint_length; |
| semi_colon_count++; |
| } |
| else { |
| // This is an unrecognized component. log a critical error |
| qd_log(conn->server->log_source, QD_LOG_CRITICAL, "Unrecognized component '%c' in uidFormat ", components[x]); |
| return 0; |
| } |
| } |
| |
| if(uid_length > 0) { |
| char *user_id = malloc((uid_length + semi_colon_count + 1) * sizeof(char)); // the +1 is for the '\0' character |
| // |
| // We have allocated memory for user_id. We are responsible for freeing this memory. Set conn->free_user_id |
| // to true so that we know that we have to free the user_id |
| // |
| conn->free_user_id = true; |
| memset(user_id, 0, uid_length + semi_colon_count + 1); |
| |
| // The components in the user id string must appear in the same order as it appears in the component string. that is |
| // why we have this loop |
| for (int x=0; x < component_count ; x++) { |
| if (components[x] == CERT_COUNTRY_CODE) { |
| if (country_code) { |
| if(*user_id != '\0') |
| strcat(user_id, COMPONENT_SEPARATOR); |
| strcat(user_id, (char *) country_code); |
| } |
| } |
| else if (components[x] == CERT_STATE) { |
| if (state) { |
| if(*user_id != '\0') |
| strcat(user_id, COMPONENT_SEPARATOR); |
| strcat(user_id, (char *) state); |
| } |
| } |
| else if (components[x] == CERT_CITY_LOCALITY) { |
| if (locality_city) { |
| if(*user_id != '\0') |
| strcat(user_id, COMPONENT_SEPARATOR); |
| strcat(user_id, (char *) locality_city); |
| } |
| } |
| else if (components[x] == CERT_ORGANIZATION_NAME) { |
| if (organization) { |
| if(*user_id != '\0') |
| strcat(user_id, COMPONENT_SEPARATOR); |
| strcat(user_id, (char *) organization); |
| } |
| } |
| else if (components[x] == CERT_ORGANIZATION_UNIT) { |
| if (org_unit) { |
| if(*user_id != '\0') |
| strcat(user_id, COMPONENT_SEPARATOR); |
| strcat(user_id, (char *) org_unit); |
| } |
| } |
| else if (components[x] == CERT_COMMON_NAME) { |
| if (common_name) { |
| if(*user_id != '\0') |
| strcat(user_id, COMPONENT_SEPARATOR); |
| strcat(user_id, (char *) common_name); |
| } |
| } |
| else if (components[x] == CERT_FINGERPRINT_SHA1 || components[x] == CERT_FINGERPRINT_SHA256 || components[x] == CERT_FINGERPRINT_SHA512) { |
| if (strlen((char *) fingerprint) > 0) { |
| if(*user_id != '\0') |
| strcat(user_id, COMPONENT_SEPARATOR); |
| strcat(user_id, (char *) fingerprint); |
| } |
| } |
| } |
| qd_log(conn->server->log_source, QD_LOG_DEBUG, "User id is '%s' ", user_id); |
| return user_id; |
| } |
| } |
| else //config->ssl_uid_format not specified, just return the username provided by the proton transport. |
| return pn_transport_get_user(tport); |
| |
| return 0; |
| } |
| |
| |
| void qd_connection_set_user(qd_connection_t *conn) |
| { |
| pn_transport_t *tport = pn_connection_transport(conn->pn_conn); |
| pn_sasl_t *sasl = pn_sasl(tport); |
| if (sasl) { |
| const char *mech = pn_sasl_get_mech(sasl); |
| conn->user_id = pn_transport_get_user(tport); |
| // We want to set the user name only if it is not already set and the selected sasl mechanism is EXTERNAL |
| if (mech && strcmp(mech, MECH_EXTERNAL) == 0) { |
| const char *user_id = qd_transport_get_user(conn, tport); |
| if (user_id) |
| conn->user_id = user_id; |
| } |
| } |
| if (!conn->user_id) |
| conn->user_id = DEFAULT_USER_ID; |
| } |
| |
| |
| static void qd_get_next_pn_data(pn_data_t *data, const char **d, int *d1) |
| { |
| if (pn_data_next(data)) { |
| switch (pn_data_type(data)) { |
| case PN_STRING: |
| *d = pn_data_get_string(data).start; |
| break; |
| case PN_SYMBOL: |
| *d = pn_data_get_symbol(data).start; |
| break; |
| case PN_INT: |
| *d1 = pn_data_get_int(data); |
| break; |
| default: |
| break; |
| } |
| } |
| } |
| |
| |
| /** |
| * Obtains the remote connection properties and sets it as a map on the passed in entity. |
| * @param |
| */ |
| static qd_error_t qd_set_connection_properties(qd_entity_t* entity, qd_connection_t *conn) |
| { |
| // Get the connection properties and stick it into the entity as a map |
| pn_data_t *data = pn_connection_remote_properties(conn->pn_conn); |
| const char *props = "properties"; |
| if (data) { |
| size_t count = pn_data_get_map(data); |
| pn_data_enter(data); |
| |
| // Create a new map. |
| qd_error_t error_t = qd_entity_set_map(entity, props); |
| |
| if (error_t != QD_ERROR_NONE) |
| return error_t; |
| |
| for (size_t i = 0; i < count/2; i++) { |
| const char *key = 0; |
| // We are assuming for now that all keys are strings |
| qd_get_next_pn_data(data, &key, 0); |
| const char *value_string = 0; |
| int value_int = 0; |
| // We are assuming for now that all values are either strings or integers |
| qd_get_next_pn_data(data, &value_string, &value_int); |
| |
| if (value_string) |
| error_t = qd_entity_set_map_key_value_string(entity, props, key, value_string); |
| else if (value_int) |
| error_t = qd_entity_set_map_key_value_int(entity, props, key, value_int); |
| |
| if (error_t != QD_ERROR_NONE) |
| return error_t; |
| } |
| pn_data_exit(data); |
| } |
| |
| return QD_ERROR_NONE; |
| } |
| |
| |
| |
| |
| qd_error_t qd_entity_refresh_connection(qd_entity_t* entity, void *impl) |
| { |
| qd_connection_t *conn = (qd_connection_t*)impl; |
| const qd_server_config_t *config = |
| conn->connector ? conn->connector->config : conn->listener->config; |
| pn_transport_t *tport = 0; |
| pn_sasl_t *sasl = 0; |
| pn_ssl_t *ssl = 0; |
| const char *mech = 0; |
| const char *user = 0; |
| |
| if (conn->pn_conn) { |
| tport = pn_connection_transport(conn->pn_conn); |
| ssl = conn->ssl; |
| } |
| if (tport) { |
| sasl = pn_sasl(tport); |
| if(conn->user_id) |
| user = conn->user_id; |
| else |
| user = pn_transport_get_user(tport); |
| } |
| if (sasl) |
| mech = pn_sasl_get_mech(sasl); |
| |
| if (qd_entity_set_bool(entity, "opened", conn->opened) == 0 && |
| qd_entity_set_string(entity, "container", |
| conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0) == 0 && |
| connection_entity_update_host(entity, conn) == 0 && |
| qd_entity_set_string(entity, "sasl", mech) == 0 && |
| qd_entity_set_string(entity, "role", config->role) == 0 && |
| qd_entity_set_string(entity, "dir", conn->connector ? "out" : "in") == 0 && |
| qd_entity_set_string(entity, "user", user) == 0 && |
| qd_set_connection_properties(entity, conn) == 0 && |
| qd_entity_set_long(entity, "identity", conn->connection_id) == 0 && |
| qd_entity_set_bool(entity, "isAuthenticated", tport && pn_transport_is_authenticated(tport)) == 0 && |
| qd_entity_set_bool(entity, "isEncrypted", tport && pn_transport_is_encrypted(tport)) == 0 && |
| qd_entity_set_bool(entity, "ssl", ssl != 0) == 0) { |
| |
| if (ssl) { |
| #define SSL_ATTR_SIZE 50 |
| char proto[SSL_ATTR_SIZE]; |
| char cipher[SSL_ATTR_SIZE]; |
| pn_ssl_get_protocol_name(ssl, proto, SSL_ATTR_SIZE); |
| pn_ssl_get_cipher_name(ssl, cipher, SSL_ATTR_SIZE); |
| if (qd_entity_set_string(entity, "sslProto", proto) == 0 && |
| qd_entity_set_string(entity, "sslCipher", cipher) == 0 && |
| qd_entity_set_long(entity, "sslSsf", pn_ssl_get_ssf(ssl)) == 0) { |
| return QD_ERROR_NONE; |
| } |
| } |
| else |
| return QD_ERROR_NONE; |
| } |
| |
| return qd_error_code(); |
| } |
| |
| static qd_error_t listener_setup_ssl(qd_connection_t *ctx, const qd_server_config_t *config, pn_transport_t *tport) |
| { |
| pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_SERVER); |
| if (!domain) return qd_error(QD_ERROR_RUNTIME, "No SSL support"); |
| |
| // setup my identifying cert: |
| if (pn_ssl_domain_set_credentials(domain, |
| config->ssl_certificate_file, |
| config->ssl_private_key_file, |
| config->ssl_password)) { |
| pn_ssl_domain_free(domain); |
| return qd_error(QD_ERROR_RUNTIME, "Cannot set SSL credentials"); |
| } |
| if (!config->ssl_required) { |
| if (pn_ssl_domain_allow_unsecured_client(domain)) { |
| pn_ssl_domain_free(domain); |
| return qd_error(QD_ERROR_RUNTIME, "Cannot allow unsecured client"); |
| } |
| } |
| |
| // for peer authentication: |
| if (config->ssl_trusted_certificate_db) { |
| if (pn_ssl_domain_set_trusted_ca_db(domain, config->ssl_trusted_certificate_db)) { |
| pn_ssl_domain_free(domain); |
| return qd_error(QD_ERROR_RUNTIME, "Cannot set trusted SSL CA" ); |
| } |
| } |
| |
| const char *trusted = config->ssl_trusted_certificate_db; |
| if (config->ssl_trusted_certificates) |
| trusted = config->ssl_trusted_certificates; |
| |
| // do we force the peer to send a cert? |
| if (config->ssl_require_peer_authentication) { |
| if (!trusted || pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER, trusted)) { |
| pn_ssl_domain_free(domain); |
| return qd_error(QD_ERROR_RUNTIME, "Cannot set peer authentication"); |
| } |
| } |
| |
| ctx->ssl = pn_ssl(tport); |
| if (!ctx->ssl || pn_ssl_init(ctx->ssl, domain, 0)) { |
| pn_ssl_domain_free(domain); |
| return qd_error(QD_ERROR_RUNTIME, "Cannot initialize SSL"); |
| } |
| |
| pn_ssl_domain_free(domain); |
| return QD_ERROR_NONE; |
| } |
| |
| // Format the identity of an incoming connection to buf for logging |
| static const char *log_incoming(char *buf, size_t size, qdpn_connector_t *cxtr) |
| { |
| qd_listener_t *qd_listener = qdpn_listener_context(qdpn_connector_listener(cxtr)); |
| assert(qd_listener); |
| const char *cname = qdpn_connector_name(cxtr); |
| const char *host = qd_listener->config->host; |
| const char *port = qd_listener->config->port; |
| snprintf(buf, size, "incoming connection from %s to %s:%s", cname, host, port); |
| return buf; |
| } |
| |
| |
| static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, const qd_server_config_t *config) |
| { |
| size_t clen = strlen(QD_CAPABILITY_ANONYMOUS_RELAY); |
| |
| // |
| // Set the container name |
| // |
| pn_connection_set_container(conn, qd_server->container_name); |
| |
| // |
| // Offer ANONYMOUS_RELAY capability |
| // |
| pn_data_put_symbol(pn_connection_offered_capabilities(conn), pn_bytes(clen, (char*) QD_CAPABILITY_ANONYMOUS_RELAY)); |
| |
| // |
| // Create the connection properties map |
| // |
| pn_data_put_map(pn_connection_properties(conn)); |
| pn_data_enter(pn_connection_properties(conn)); |
| |
| pn_data_put_symbol(pn_connection_properties(conn), |
| pn_bytes(strlen(QD_CONNECTION_PROPERTY_PRODUCT_KEY), QD_CONNECTION_PROPERTY_PRODUCT_KEY)); |
| pn_data_put_string(pn_connection_properties(conn), |
| pn_bytes(strlen(QD_CONNECTION_PROPERTY_PRODUCT_VALUE), QD_CONNECTION_PROPERTY_PRODUCT_VALUE)); |
| |
| pn_data_put_symbol(pn_connection_properties(conn), |
| pn_bytes(strlen(QD_CONNECTION_PROPERTY_VERSION_KEY), QD_CONNECTION_PROPERTY_VERSION_KEY)); |
| pn_data_put_string(pn_connection_properties(conn), |
| pn_bytes(strlen(QPID_DISPATCH_VERSION), QPID_DISPATCH_VERSION)); |
| |
| if (config && config->inter_router_cost > 1) { |
| pn_data_put_symbol(pn_connection_properties(conn), |
| pn_bytes(strlen(QD_CONNECTION_PROPERTY_COST_KEY), QD_CONNECTION_PROPERTY_COST_KEY)); |
| pn_data_put_int(pn_connection_properties(conn), config->inter_router_cost); |
| } |
| |
| pn_data_exit(pn_connection_properties(conn)); |
| } |
| |
| |
| static void thread_process_listeners_LH(qd_server_t *qd_server) |
| { |
| qdpn_driver_t *driver = qd_server->driver; |
| qdpn_listener_t *listener; |
| qdpn_connector_t *cxtr; |
| qd_connection_t *ctx; |
| |
| for (listener = qdpn_driver_listener(driver); listener; listener = qdpn_driver_listener(driver)) { |
| bool policy_counted = false; |
| cxtr = qdpn_listener_accept(listener, qd_server->qd->policy, &qd_policy_socket_accept, &policy_counted); |
| if (!cxtr) |
| continue; |
| |
| char logbuf[qd_log_max_len()]; |
| |
| qd_log(qd_server->log_source, QD_LOG_DEBUG, "Accepting %s", |
| log_incoming(logbuf, sizeof(logbuf), cxtr)); |
| |
| ctx = new_qd_connection_t(); |
| DEQ_ITEM_INIT(ctx); |
| ctx->server = qd_server; |
| ctx->opened = false; |
| ctx->closed = false; |
| ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER; |
| ctx->enqueued = 0; |
| ctx->pn_cxtr = cxtr; |
| ctx->collector = 0; |
| ctx->ssl = 0; |
| ctx->listener = qdpn_listener_context(listener); |
| ctx->connector = 0; |
| ctx->context = ctx->listener->context; |
| ctx->user_context = 0; |
| ctx->link_context = 0; |
| ctx->ufd = 0; |
| ctx->user_id = 0; |
| ctx->free_user_id = false; |
| ctx->connection_id = qd_server->next_connection_id++; // Increment the connection id so the next connection can use it |
| ctx->policy_settings = 0; |
| ctx->n_senders = 0; |
| ctx->n_receivers = 0; |
| ctx->open_container = 0; |
| DEQ_INIT(ctx->deferred_calls); |
| ctx->deferred_call_lock = sys_mutex(); |
| ctx->event_stall = false; |
| ctx->policy_counted = policy_counted; |
| |
| pn_connection_t *conn = pn_connection(); |
| ctx->collector = pn_collector(); |
| pn_connection_collect(conn, ctx->collector); |
| decorate_connection(qd_server, conn, ctx->listener->config); |
| qdpn_connector_set_connection(cxtr, conn); |
| pn_connection_set_context(conn, ctx); |
| ctx->pn_conn = conn; |
| ctx->owner_thread = CONTEXT_NO_OWNER; |
| qdpn_connector_set_context(cxtr, ctx); |
| |
| // qd_server->lock is already locked |
| DEQ_INSERT_TAIL(qd_server->connections, ctx); |
| qd_entity_cache_add(QD_CONNECTION_TYPE, ctx); |
| |
| // |
| // Get a pointer to the transport so we can insert security components into it |
| // |
| pn_transport_t *tport = qdpn_connector_transport(cxtr); |
| const qd_server_config_t *config = ctx->listener->config; |
| |
| // |
| // Configure the transport. |
| // |
| pn_transport_set_server(tport); |
| pn_transport_set_max_frame(tport, config->max_frame_size); |
| pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); |
| |
| // |
| // Proton pushes out its trace to qd_transport_tracer() which in turn writes a trace message to the qdrouter log |
| // If trace level logging is enabled on the router set PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport |
| // |
| pn_transport_set_context(tport, ctx); |
| if (qd_log_enabled(qd_server->log_source, QD_LOG_TRACE)) { |
| pn_transport_trace(tport, PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW); |
| pn_transport_set_tracer(tport, qd_transport_tracer); |
| } |
| |
| // Set up SSL if configured |
| if (config->ssl_enabled) { |
| qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring SSL on %s", |
| log_incoming(logbuf, sizeof(logbuf), cxtr)); |
| if (listener_setup_ssl(ctx, config, tport) != QD_ERROR_NONE) { |
| qd_log(qd_server->log_source, QD_LOG_ERROR, "%s on %s", |
| qd_error_message(), log_incoming(logbuf, sizeof(logbuf), cxtr)); |
| qdpn_connector_close(cxtr); |
| continue; |
| } |
| } |
| |
| // |
| // Set up SASL |
| // |
| pn_sasl_t *sasl = pn_sasl(tport); |
| if (qd_server->sasl_config_path) |
| pn_sasl_config_path(sasl, qd_server->sasl_config_path); |
| pn_sasl_config_name(sasl, qd_server->sasl_config_name); |
| if (config->sasl_mechanisms) |
| pn_sasl_allowed_mechs(sasl, config->sasl_mechanisms); |
| pn_transport_require_auth(tport, config->requireAuthentication); |
| pn_transport_require_encryption(tport, config->requireEncryption); |
| pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication); |
| } |
| } |
| |
| |
| static void handle_signals_LH(qd_server_t *qd_server) |
| { |
| int signum = qd_server->pending_signal; |
| |
| if (signum) { |
| qd_server->pending_signal = 0; |
| if (qd_server->signal_handler) { |
| sys_mutex_unlock(qd_server->lock); |
| qd_server->signal_handler(qd_server->signal_context, signum); |
| sys_mutex_lock(qd_server->lock); |
| } |
| } |
| } |
| |
| |
| static void block_if_paused_LH(qd_server_t *qd_server) |
| { |
| if (qd_server->pause_requests > 0) { |
| qd_server->threads_paused++; |
| sys_cond_signal_all(qd_server->cond); |
| while (qd_server->pause_requests > 0) |
| sys_cond_wait(qd_server->cond, qd_server->lock); |
| qd_server->threads_paused--; |
| } |
| } |
| |
| |
| static void invoke_deferred_calls(qd_connection_t *conn, bool discard) |
| { |
| qd_deferred_call_list_t calls; |
| qd_deferred_call_t *dc; |
| |
| // |
| // Copy the deferred calls out of the connection under lock. |
| // |
| DEQ_INIT(calls); |
| sys_mutex_lock(conn->deferred_call_lock); |
| dc = DEQ_HEAD(conn->deferred_calls); |
| while (dc) { |
| DEQ_REMOVE_HEAD(conn->deferred_calls); |
| DEQ_INSERT_TAIL(calls, dc); |
| dc = DEQ_HEAD(conn->deferred_calls); |
| } |
| sys_mutex_unlock(conn->deferred_call_lock); |
| |
| // |
| // Invoke the calls outside of the critical section. |
| // |
| dc = DEQ_HEAD(calls); |
| while (dc) { |
| DEQ_REMOVE_HEAD(calls); |
| dc->call(dc->context, discard); |
| free_qd_deferred_call_t(dc); |
| dc = DEQ_HEAD(calls); |
| } |
| } |
| |
| |
| static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr) |
| { |
| qd_connection_t *ctx = qdpn_connector_context(cxtr); |
| int events = 0; |
| int passes = 0; |
| |
| if (ctx->closed) |
| return 0; |
| |
| // |
| // If this is a user connection, bypass the AMQP processing and invoke the |
| // UserFD handler instead. |
| // |
| if (ctx->ufd) { |
| qd_server->ufd_handler(ctx->ufd->context, ctx->ufd); |
| return 1; |
| } |
| |
| do { |
| passes++; |
| |
| // |
| // Step the engine for pre-handler processing |
| // |
| qdpn_connector_process(cxtr); |
| |
| // |
| // If the connector has closed, notify the client via callback. |
| // |
| if (qdpn_connector_closed(cxtr)) { |
| if (ctx->opened) |
| qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, |
| QD_CONN_EVENT_CLOSE, |
| (qd_connection_t*) qdpn_connector_context(cxtr)); |
| ctx->closed = true; |
| events = 0; |
| break; |
| } |
| |
| invoke_deferred_calls(ctx, false); |
| |
| qd_connection_t *qd_conn = (qd_connection_t*) qdpn_connector_context(cxtr); |
| pn_collector_t *collector = qd_connection_collector(qd_conn); |
| pn_event_t *event; |
| |
| events = 0; |
| if (!ctx->event_stall) { |
| event = pn_collector_peek(collector); |
| while (event) { |
| // |
| // If we are transitioning to the open state, notify the client via callback. |
| // |
| if (!ctx->opened && pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) { |
| ctx->opened = true; |
| qd_conn_event_t ce = QD_CONN_EVENT_LISTENER_OPEN; |
| |
| if (ctx->connector) { |
| ce = QD_CONN_EVENT_CONNECTOR_OPEN; |
| ctx->connector->delay = 0; |
| } else |
| assert(ctx->listener); |
| |
| qd_server->conn_handler(qd_server->conn_handler_context, |
| ctx->context, ce, (qd_connection_t*) qdpn_connector_context(cxtr)); |
| events = 1; |
| } else if (pn_event_type(event) == PN_TRANSPORT_ERROR) { |
| if (ctx->connector) { |
| const qd_server_config_t *config = ctx->connector->config; |
| qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port); |
| } |
| } |
| |
| events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn); |
| pn_collector_pop(collector); |
| |
| event = ctx->event_stall ? 0 : pn_collector_peek(collector); |
| } |
| events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn); |
| } |
| } while (events > 0); |
| |
| return passes > 1; |
| } |
| |
| |
| // |
| // TEMPORARY FUNCTION PROTOTYPES |
| // |
| void qdpn_driver_wait_1(qdpn_driver_t *d); |
| int qdpn_driver_wait_2(qdpn_driver_t *d, int timeout); |
| void qdpn_driver_wait_3(qdpn_driver_t *d); |
| // |
| // END TEMPORARY |
| // |
| |
| static void *thread_run(void *arg) |
| { |
| qd_thread_t *thread = (qd_thread_t*) arg; |
| qd_work_item_t *work; |
| qdpn_connector_t *cxtr; |
| qd_connection_t *ctx; |
| int error; |
| int poll_result; |
| |
| if (!thread) |
| return 0; |
| |
| qd_server_t *qd_server = thread->qd_server; |
| thread_server = qd_server; |
| thread->running = 1; |
| |
| if (thread->canceled) |
| return 0; |
| |
| // |
| // Invoke the start handler if the application supplied one. |
| // This handler can be used to set NUMA or processor affinnity for the thread. |
| // |
| if (qd_server->start_handler) |
| qd_server->start_handler(qd_server->start_context, thread->thread_id); |
| |
| // |
| // Main Loop |
| // |
| while (thread->running) { |
| sys_mutex_lock(qd_server->lock); |
| |
| // |
| // Check for pending signals to process |
| // |
| handle_signals_LH(qd_server); |
| if (!thread->running) { |
| sys_mutex_unlock(qd_server->lock); |
| break; |
| } |
| |
| // |
| // Check to see if the server is pausing. If so, block here. |
| // |
| block_if_paused_LH(qd_server); |
| if (!thread->running) { |
| sys_mutex_unlock(qd_server->lock); |
| break; |
| } |
| |
| // |
| // Service pending timers. |
| // |
| qd_timer_t *timer = DEQ_HEAD(qd_server->pending_timers); |
| if (timer) { |
| DEQ_REMOVE_HEAD(qd_server->pending_timers); |
| |
| // |
| // Mark the timer as idle in case it reschedules itself. |
| // |
| qd_timer_idle_LH(timer); |
| |
| // |
| // Release the lock and invoke the connection handler. |
| // |
| sys_mutex_unlock(qd_server->lock); |
| timer->handler(timer->context); |
| qdpn_driver_wakeup(qd_server->driver); |
| continue; |
| } |
| |
| // |
| // Check the work queue for connectors scheduled for processing. |
| // |
| work = DEQ_HEAD(qd_server->work_queue); |
| if (!work) { |
| // |
| // There is no pending work to do |
| // |
| if (qd_server->a_thread_is_waiting) { |
| // |
| // Another thread is waiting on the proton driver, this thread must |
| // wait on the condition variable until signaled. |
| // |
| sys_cond_wait(qd_server->cond, qd_server->lock); |
| } else { |
| // |
| // This thread elects itself to wait on the proton driver. Set the |
| // thread-is-waiting flag so other idle threads will not interfere. |
| // |
| qd_server->a_thread_is_waiting = true; |
| |
| // |
| // Ask the timer module when its next timer is scheduled to fire. We'll |
| // use this value in driver_wait as the timeout. If there are no scheduled |
| // timers, the returned value will be -1. |
| // |
| qd_timestamp_t duration = qd_timer_next_duration_LH(); |
| |
| // |
| // Invoke the proton driver's wait sequence. This is a bit of a hack for now |
| // and will be improved in the future. The wait process is divided into three parts, |
| // the first and third of which need to be non-reentrant, and the second of which |
| // must be reentrant (and blocks). |
| // |
| qdpn_driver_wait_1(qd_server->driver); |
| sys_mutex_unlock(qd_server->lock); |
| |
| do { |
| error = 0; |
| poll_result = qdpn_driver_wait_2(qd_server->driver, duration); |
| if (poll_result == -1) |
| error = errno; |
| } while (error == EINTR); |
| if (error) { |
| exit(-1); |
| } |
| |
| sys_mutex_lock(qd_server->lock); |
| qdpn_driver_wait_3(qd_server->driver); |
| |
| if (!thread->running) { |
| sys_mutex_unlock(qd_server->lock); |
| break; |
| } |
| |
| // |
| // Visit the timer module. |
| // |
| struct timespec tv; |
| clock_gettime(CLOCK_REALTIME, &tv); |
| qd_timestamp_t milliseconds = ((qd_timestamp_t)tv.tv_sec) * 1000 + tv.tv_nsec / 1000000; |
| qd_timer_visit_LH(milliseconds); |
| |
| // |
| // Process listeners (incoming connections). |
| // |
| thread_process_listeners_LH(qd_server); |
| |
| // |
| // Traverse the list of connectors-needing-service from the proton driver. |
| // If the connector is not already in the work queue and it is not currently |
| // being processed by another thread, put it in the work queue and signal the |
| // condition variable. |
| // |
| cxtr = qdpn_driver_connector(qd_server->driver); |
| while (cxtr) { |
| ctx = qdpn_connector_context(cxtr); |
| if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) { |
| ctx->enqueued = 1; |
| qd_work_item_t *workitem = new_qd_work_item_t(); |
| DEQ_ITEM_INIT(workitem); |
| workitem->cxtr = cxtr; |
| DEQ_INSERT_TAIL(qd_server->work_queue, workitem); |
| sys_cond_signal(qd_server->cond); |
| } |
| cxtr = qdpn_driver_connector(qd_server->driver); |
| } |
| |
| // |
| // Release our exclusive claim on qdpn_driver_wait. |
| // |
| qd_server->a_thread_is_waiting = false; |
| } |
| } |
| |
| // |
| // If we were given a connector to work on from the work queue, mark it as |
| // owned by this thread and as no longer enqueued. |
| // |
| cxtr = 0; |
| if (work) { |
| DEQ_REMOVE_HEAD(qd_server->work_queue); |
| ctx = qdpn_connector_context(work->cxtr); |
| if (ctx->owner_thread == CONTEXT_NO_OWNER) { |
| ctx->owner_thread = thread->thread_id; |
| ctx->enqueued = 0; |
| qd_server->threads_active++; |
| cxtr = work->cxtr; |
| free_qd_work_item_t(work); |
| } else { |
| // |
| // This connector is being processed by another thread, re-queue it. |
| // |
| DEQ_INSERT_TAIL(qd_server->work_queue, work); |
| } |
| } |
| sys_mutex_unlock(qd_server->lock); |
| |
| // |
| // Process the connector that we now have exclusive access to. |
| // |
| if (cxtr) { |
| int work_done = 1; |
| |
| if (qdpn_connector_failed(cxtr)) |
| qdpn_connector_close(cxtr); |
| |
| // |
| // Even if the connector has failed there are still events that |
| // must be processed so that associated links will be cleaned up. |
| // |
| work_done = process_connector(qd_server, cxtr); |
| |
| // |
| // Check to see if the connector was closed during processing |
| // |
| if (qdpn_connector_closed(cxtr)) { |
| qd_entity_cache_remove(QD_CONNECTION_TYPE, ctx); |
| // |
| // Connector is closed. Free the context and the connector. |
| // If this is a dispatch connector, schedule the re-connect timer |
| // |
| if (ctx->connector) { |
| ctx->connector->ctx = 0; |
| ctx->connector->state = CXTR_STATE_CONNECTING; |
| qd_timer_schedule(ctx->connector->timer, ctx->connector->delay); |
| } |
| |
| sys_mutex_lock(qd_server->lock); |
| DEQ_REMOVE(qd_server->connections, ctx); |
| |
| if (ctx->policy_counted) { |
| qd_policy_socket_close(qd_server->qd->policy, ctx); |
| } |
| |
| qdpn_connector_free(cxtr); |
| invoke_deferred_calls(ctx, true); // Discard any pending deferred calls |
| sys_mutex_free(ctx->deferred_call_lock); |
| free_qd_connection(ctx); |
| qd_server->threads_active--; |
| sys_mutex_unlock(qd_server->lock); |
| } else { |
| // |
| // The connector lives on. Mark it as no longer owned by this thread. |
| // |
| sys_mutex_lock(qd_server->lock); |
| ctx->owner_thread = CONTEXT_NO_OWNER; |
| qd_server->threads_active--; |
| sys_mutex_unlock(qd_server->lock); |
| } |
| |
| // |
| // Wake up the proton driver to force it to reconsider its set of FDs |
| // in light of the processing that just occurred. |
| // |
| if (work_done) |
| qdpn_driver_wakeup(qd_server->driver); |
| } |
| } |
| |
| return 0; |
| } |
| |
| |
| static void thread_start(qd_thread_t *thread) |
| { |
| if (!thread) |
| return; |
| |
| thread->using_thread = 1; |
| thread->thread = sys_thread(thread_run, (void*) thread); |
| } |
| |
| |
| static void thread_cancel(qd_thread_t *thread) |
| { |
| if (!thread) |
| return; |
| |
| thread->running = 0; |
| thread->canceled = 1; |
| } |
| |
| |
| static void thread_join(qd_thread_t *thread) |
| { |
| if (!thread) |
| return; |
| |
| if (thread->using_thread) { |
| sys_thread_join(thread->thread); |
| sys_thread_free(thread->thread); |
| } |
| } |
| |
| |
| static void thread_free(qd_thread_t *thread) |
| { |
| if (!thread) |
| return; |
| |
| free(thread); |
| } |
| |
| |
| static void cxtr_try_open(void *context) |
| { |
| qd_connector_t *ct = (qd_connector_t*) context; |
| if (ct->state != CXTR_STATE_CONNECTING) |
| return; |
| |
| qd_connection_t *ctx = new_qd_connection_t(); |
| DEQ_ITEM_INIT(ctx); |
| ctx->server = ct->server; |
| ctx->opened = false; |
| ctx->closed = false; |
| ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER; |
| ctx->enqueued = 0; |
| ctx->pn_conn = pn_connection(); |
| ctx->collector = pn_collector(); |
| ctx->ssl = 0; |
| ctx->listener = 0; |
| ctx->connector = ct; |
| ctx->context = ct->context; |
| ctx->user_context = 0; |
| ctx->link_context = 0; |
| ctx->ufd = 0; |
| ctx->user_id = 0; |
| ctx->free_user_id = false; |
| ctx->policy_settings = 0; |
| ctx->n_senders = 0; |
| ctx->n_receivers = 0; |
| ctx->open_container = 0; |
| |
| DEQ_INIT(ctx->deferred_calls); |
| ctx->deferred_call_lock = sys_mutex(); |
| ctx->event_stall = false; |
| ctx->policy_counted = false; |
| |
| qd_log(ct->server->log_source, QD_LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port); |
| |
| pn_connection_collect(ctx->pn_conn, ctx->collector); |
| decorate_connection(ctx->server, ctx->pn_conn, ct->config); |
| |
| // |
| // qdpn_connector is not thread safe |
| // |
| sys_mutex_lock(ct->server->lock); |
| // Increment the connection id so the next connection can use it |
| ctx->connection_id = ct->server->next_connection_id++; |
| ctx->pn_cxtr = qdpn_connector(ct->server->driver, ct->config->host, ct->config->port, ct->config->protocol_family, (void*) ctx); |
| if (ctx->pn_cxtr) { |
| DEQ_INSERT_TAIL(ct->server->connections, ctx); |
| qd_entity_cache_add(QD_CONNECTION_TYPE, ctx); |
| } |
| sys_mutex_unlock(ct->server->lock); |
| |
| const qd_server_config_t *config = ct->config; |
| |
| if (ctx->pn_cxtr == 0) { |
| sys_mutex_free(ctx->deferred_call_lock); |
| free_qd_connection(ctx); |
| ct->delay = 10000; |
| qd_timer_schedule(ct->timer, ct->delay); |
| return; |
| } |
| |
| // |
| // Set the hostname on the pn_connection. This hostname will be used by proton as the hostname in the open frame. |
| // |
| pn_connection_set_hostname(ctx->pn_conn, config->host); |
| |
| // Set the sasl user name and password on the proton connection object. This has to be done before the call to qdpn_connector_transport() which |
| // binds the transport to the connection |
| if(config->sasl_username) |
| pn_connection_set_user(ctx->pn_conn, config->sasl_username); |
| if (config->sasl_password) |
| pn_connection_set_password(ctx->pn_conn, config->sasl_password); |
| |
| qdpn_connector_set_connection(ctx->pn_cxtr, ctx->pn_conn); |
| pn_connection_set_context(ctx->pn_conn, ctx); |
| |
| ctx->connector->state = CXTR_STATE_OPEN; |
| |
| ct->ctx = ctx; |
| ct->delay = 5000; |
| |
| // |
| // Set up the transport, SASL, and SSL for the connection. |
| // |
| pn_transport_t *tport = qdpn_connector_transport(ctx->pn_cxtr); |
| |
| // |
| // Configure the transport |
| // |
| pn_transport_set_max_frame(tport, config->max_frame_size); |
| pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); |
| |
| // |
| // Proton pushes out its trace to qd_transport_tracer() which in turn writes a trace message to the qdrouter log |
| // |
| // If trace level logging is enabled on the router set PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport |
| pn_transport_set_context(tport, ctx); |
| if (qd_log_enabled(ct->server->log_source, QD_LOG_TRACE)) { |
| pn_transport_trace(tport, PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW); |
| pn_transport_set_tracer(tport, qd_transport_tracer); |
| } |
| |
| // |
| // Set up SSL if appropriate |
| // |
| if (config->ssl_enabled) { |
| pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT); |
| |
| if (!domain) { |
| qd_error(QD_ERROR_RUNTIME, "SSL domain failed for connection to %s:%s", |
| ct->config->host, ct->config->port); |
| /* TODO aconway 2014-07-15: Close the connection, clean up. */ |
| return; |
| } |
| |
| /* TODO aconway 2014-07-15: error handling on all SSL calls. */ |
| |
| // set our trusted database for checking the peer's cert: |
| if (config->ssl_trusted_certificate_db) { |
| if (pn_ssl_domain_set_trusted_ca_db(domain, config->ssl_trusted_certificate_db)) { |
| qd_log(ct->server->log_source, QD_LOG_ERROR, |
| "SSL CA configuration failed for %s:%s", |
| ct->config->host, ct->config->port); |
| } |
| } |
| // should we force the peer to provide a cert? |
| if (config->ssl_require_peer_authentication) { |
| const char *trusted = (config->ssl_trusted_certificates) |
| ? config->ssl_trusted_certificates |
| : config->ssl_trusted_certificate_db; |
| if (pn_ssl_domain_set_peer_authentication(domain, |
| PN_SSL_VERIFY_PEER, |
| trusted)) { |
| qd_log(ct->server->log_source, QD_LOG_ERROR, |
| "SSL peer auth configuration failed for %s:%s", |
| ct->config->host, ct->config->port); |
| } |
| } |
| |
| // configure our certificate if the peer requests one: |
| if (config->ssl_certificate_file) { |
| if (pn_ssl_domain_set_credentials(domain, |
| config->ssl_certificate_file, |
| config->ssl_private_key_file, |
| config->ssl_password)) { |
| qd_log(ct->server->log_source, QD_LOG_ERROR, |
| "SSL local configuration failed for %s:%s", |
| ct->config->host, ct->config->port); |
| } |
| } |
| |
| //If ssl is enabled and verify_host_name is true, instruct proton to verify peer name |
| if (config->verify_host_name) { |
| if (pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, NULL)) { |
| qd_log(ct->server->log_source, QD_LOG_ERROR, |
| "SSL peer host name verification failed for %s:%s", |
| ct->config->host, ct->config->port); |
| } |
| } |
| |
| ctx->ssl = pn_ssl(tport); |
| pn_ssl_init(ctx->ssl, domain, 0); |
| pn_ssl_domain_free(domain); |
| } |
| |
| // |
| // Set up SASL |
| // |
| sys_mutex_lock(ct->server->lock); |
| pn_sasl_t *sasl = pn_sasl(tport); |
| if (config->sasl_mechanisms) |
| pn_sasl_allowed_mechs(sasl, config->sasl_mechanisms); |
| pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication); |
| sys_mutex_unlock(ct->server->lock); |
| |
| pn_connection_open(ctx->pn_conn); |
| |
| ctx->owner_thread = CONTEXT_NO_OWNER; |
| } |
| |
| |
| static void heartbeat_cb(void *context) |
| { |
| qd_server_t *qd_server = (qd_server_t*) context; |
| qdpn_activate_all(qd_server->driver); |
| qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL); |
| } |
| |
| |
| qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *container_name, |
| const char *sasl_config_path, const char *sasl_config_name) |
| { |
| int i; |
| |
| qd_server_t *qd_server = NEW(qd_server_t); |
| if (qd_server == 0) |
| return 0; |
| |
| DEQ_INIT(qd_server->connections); |
| qd_server->qd = qd; |
| qd_server->log_source = qd_log_source("SERVER"); |
| qd_server->thread_count = thread_count; |
| qd_server->container_name = container_name; |
| qd_server->sasl_config_path = sasl_config_path; |
| qd_server->sasl_config_name = sasl_config_name; |
| qd_server->driver = qdpn_driver(); |
| qd_server->start_handler = 0; |
| qd_server->conn_handler = 0; |
| qd_server->pn_event_handler = 0; |
| qd_server->signal_handler = 0; |
| qd_server->ufd_handler = 0; |
| qd_server->start_context = 0; |
| qd_server->signal_context = 0; |
| qd_server->lock = sys_mutex(); |
| qd_server->cond = sys_cond(); |
| |
| qd_timer_initialize(qd_server->lock); |
| |
| qd_server->threads = NEW_PTR_ARRAY(qd_thread_t, thread_count); |
| for (i = 0; i < thread_count; i++) |
| qd_server->threads[i] = thread(qd_server, i); |
| |
| DEQ_INIT(qd_server->work_queue); |
| DEQ_INIT(qd_server->pending_timers); |
| qd_server->a_thread_is_waiting = false; |
| qd_server->threads_active = 0; |
| qd_server->pause_requests = 0; |
| qd_server->threads_paused = 0; |
| qd_server->pause_next_sequence = 0; |
| qd_server->pause_now_serving = 0; |
| qd_server->pending_signal = 0; |
| qd_server->heartbeat_timer = 0; |
| qd_server->next_connection_id = 1; |
| |
| qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s", qd_server->container_name); |
| |
| return qd_server; |
| } |
| |
| |
| void qd_server_free(qd_server_t *qd_server) |
| { |
| if (!qd_server) return; |
| for (int i = 0; i < qd_server->thread_count; i++) |
| thread_free(qd_server->threads[i]); |
| qd_timer_finalize(); |
| qdpn_driver_free(qd_server->driver); |
| sys_mutex_free(qd_server->lock); |
| sys_cond_free(qd_server->cond); |
| free(qd_server->threads); |
| free(qd_server); |
| } |
| |
| |
| void qd_server_set_conn_handler(qd_dispatch_t *qd, |
| qd_conn_handler_cb_t handler, |
| qd_pn_event_handler_cb_t pn_event_handler, |
| void *handler_context) |
| { |
| qd->server->conn_handler = handler; |
| qd->server->pn_event_handler = pn_event_handler; |
| qd->server->conn_handler_context = handler_context; |
| } |
| |
| |
| void qd_server_set_signal_handler(qd_dispatch_t *qd, qd_signal_handler_cb_t handler, void *context) |
| { |
| qd->server->signal_handler = handler; |
| qd->server->signal_context = context; |
| } |
| |
| |
| void qd_server_set_start_handler(qd_dispatch_t *qd, qd_thread_start_cb_t handler, void *context) |
| { |
| qd->server->start_handler = handler; |
| qd->server->start_context = context; |
| } |
| |
| |
| void qd_server_set_user_fd_handler(qd_dispatch_t *qd, qd_user_fd_handler_cb_t ufd_handler) |
| { |
| qd->server->ufd_handler = ufd_handler; |
| } |
| |
| |
| static void qd_server_announce(qd_server_t* qd_server) |
| { |
| qd_log(qd_server->log_source, QD_LOG_INFO, "Operational, %d Threads Running", qd_server->thread_count); |
| #ifndef NDEBUG |
| qd_log(qd_server->log_source, QD_LOG_INFO, "Running in DEBUG Mode"); |
| #endif |
| } |
| |
| |
| void qd_server_run(qd_dispatch_t *qd) |
| { |
| qd_server_t *qd_server = qd->server; |
| |
| int i; |
| if (!qd_server) |
| return; |
| |
| assert(qd_server->conn_handler); // Server can't run without a connection handler. |
| |
| for (i = 1; i < qd_server->thread_count; i++) |
| thread_start(qd_server->threads[i]); |
| |
| qd_server->heartbeat_timer = qd_timer(qd, heartbeat_cb, qd_server); |
| qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL); |
| |
| qd_server_announce(qd_server); |
| |
| thread_run((void*) qd_server->threads[0]); |
| |
| for (i = 1; i < qd_server->thread_count; i++) |
| thread_join(qd_server->threads[i]); |
| |
| for (i = 0; i < qd_server->thread_count; i++) |
| qd_server->threads[i]->canceled = 0; |
| |
| qd_log(qd_server->log_source, QD_LOG_INFO, "Shut Down"); |
| } |
| |
| |
| void qd_server_start(qd_dispatch_t *qd) |
| { |
| qd_server_t *qd_server = qd->server; |
| int i; |
| |
| if (!qd_server) |
| return; |
| |
| assert(qd_server->conn_handler); // Server can't run without a connection handler. |
| |
| for (i = 0; i < qd_server->thread_count; i++) |
| thread_start(qd_server->threads[i]); |
| |
| qd_server->heartbeat_timer = qd_timer(qd, heartbeat_cb, qd_server); |
| qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL); |
| |
| qd_server_announce(qd_server); |
| } |
| |
| |
| void qd_server_stop(qd_dispatch_t *qd) |
| { |
| qd_server_t *qd_server = qd->server; |
| int idx; |
| |
| sys_mutex_lock(qd_server->lock); |
| for (idx = 0; idx < qd_server->thread_count; idx++) |
| thread_cancel(qd_server->threads[idx]); |
| sys_cond_signal_all(qd_server->cond); |
| qdpn_driver_wakeup(qd_server->driver); |
| sys_mutex_unlock(qd_server->lock); |
| |
| if (thread_server != qd_server) { |
| for (idx = 0; idx < qd_server->thread_count; idx++) |
| thread_join(qd_server->threads[idx]); |
| qd_log(qd_server->log_source, QD_LOG_INFO, "Shut Down"); |
| } |
| } |
| |
| |
| void qd_server_signal(qd_dispatch_t *qd, int signum) |
| { |
| if (!qd) |
| return; |
| |
| qd_server_t *qd_server = qd->server; |
| |
| qd_server->pending_signal = signum; |
| sys_cond_signal_all(qd_server->cond); |
| qdpn_driver_wakeup(qd_server->driver); |
| } |
| |
| |
| void qd_server_pause(qd_dispatch_t *qd) |
| { |
| qd_server_t *qd_server = qd->server; |
| |
| sys_mutex_lock(qd_server->lock); |
| |
| // |
| // Bump the request count to stop all the threads. |
| // |
| qd_server->pause_requests++; |
| int my_sequence = qd_server->pause_next_sequence++; |
| |
| // |
| // Awaken all threads that are currently blocking. |
| // |
| sys_cond_signal_all(qd_server->cond); |
| qdpn_driver_wakeup(qd_server->driver); |
| |
| // |
| // Wait for the paused thread count plus the number of threads requesting a pause to equal |
| // the total thread count. Also, don't exit the blocking loop until now_serving equals our |
| // sequence number. This ensures that concurrent pausers don't run at the same time. |
| // |
| while ((qd_server->threads_paused + qd_server->pause_requests < qd_server->thread_count) || |
| (my_sequence != qd_server->pause_now_serving)) |
| sys_cond_wait(qd_server->cond, qd_server->lock); |
| |
| sys_mutex_unlock(qd_server->lock); |
| } |
| |
| |
| void qd_server_resume(qd_dispatch_t *qd) |
| { |
| qd_server_t *qd_server = qd->server; |
| |
| sys_mutex_lock(qd_server->lock); |
| qd_server->pause_requests--; |
| qd_server->pause_now_serving++; |
| sys_cond_signal_all(qd_server->cond); |
| sys_mutex_unlock(qd_server->lock); |
| } |
| |
| |
| void qd_server_activate(qd_connection_t *ctx) |
| { |
| if (!ctx) |
| return; |
| |
| qdpn_connector_t *ctor = ctx->pn_cxtr; |
| if (!ctor) |
| return; |
| |
| if (!qdpn_connector_closed(ctor)) { |
| qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE); |
| qdpn_driver_wakeup(ctx->server->driver); |
| } |
| } |
| |
| |
| void qd_connection_set_context(qd_connection_t *conn, void *context) |
| { |
| conn->user_context = context; |
| } |
| |
| |
| void *qd_connection_get_context(qd_connection_t *conn) |
| { |
| return conn->user_context; |
| } |
| |
| |
| void *qd_connection_get_config_context(qd_connection_t *conn) |
| { |
| return conn->context; |
| } |
| |
| |
| void qd_connection_set_link_context(qd_connection_t *conn, void *context) |
| { |
| conn->link_context = context; |
| } |
| |
| |
| void *qd_connection_get_link_context(qd_connection_t *conn) |
| { |
| return conn->link_context; |
| } |
| |
| |
| pn_connection_t *qd_connection_pn(qd_connection_t *conn) |
| { |
| return conn->pn_conn; |
| } |
| |
| |
| bool qd_connection_inbound(qd_connection_t *conn) |
| { |
| return conn->listener != 0; |
| } |
| |
| |
| pn_collector_t *qd_connection_collector(qd_connection_t *conn) |
| { |
| return conn->collector; |
| } |
| |
| uint64_t qd_connection_connection_id(qd_connection_t *conn) |
| { |
| return conn->connection_id; |
| } |
| |
| |
| const qd_server_config_t *qd_connection_config(const qd_connection_t *conn) |
| { |
| if (conn->listener) |
| return conn->listener->config; |
| return conn->connector->config; |
| } |
| |
| |
| void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, void *context) |
| { |
| qd_deferred_call_t *dc = new_qd_deferred_call_t(); |
| DEQ_ITEM_INIT(dc); |
| dc->call = call; |
| dc->context = context; |
| |
| sys_mutex_lock(conn->deferred_call_lock); |
| DEQ_INSERT_TAIL(conn->deferred_calls, dc); |
| sys_mutex_unlock(conn->deferred_call_lock); |
| |
| qd_server_activate(conn); |
| } |
| |
| |
| void qd_connection_set_event_stall(qd_connection_t *conn, bool stall) |
| { |
| conn->event_stall = stall; |
| if (!stall) |
| qd_server_activate(conn); |
| } |
| |
| |
| qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context) |
| { |
| qd_server_t *qd_server = qd->server; |
| qd_listener_t *li = new_qd_listener_t(); |
| |
| if (!li) |
| return 0; |
| |
| li->server = qd_server; |
| li->config = config; |
| li->context = context; |
| li->pn_listener = qdpn_listener(qd_server->driver, config->host, config->port, config->protocol_family, (void*) li); |
| |
| if (!li->pn_listener) { |
| free_qd_listener_t(li); |
| return 0; |
| } |
| qd_log(qd_server->log_source, QD_LOG_TRACE, "Listening on %s:%s", config->host, config->port); |
| |
| return li; |
| } |
| |
| |
| void qd_server_listener_free(qd_listener_t* li) |
| { |
| if (!li) |
| return; |
| |
| qdpn_listener_free(li->pn_listener); |
| free_qd_listener_t(li); |
| } |
| |
| |
| void qd_server_listener_close(qd_listener_t* li) |
| { |
| if (li) |
| qdpn_listener_close(li->pn_listener); |
| } |
| |
| |
| qd_connector_t *qd_server_connect(qd_dispatch_t *qd, const qd_server_config_t *config, void *context) |
| { |
| qd_server_t *qd_server = qd->server; |
| qd_connector_t *ct = new_qd_connector_t(); |
| |
| if (!ct) |
| return 0; |
| |
| ct->server = qd_server; |
| ct->state = CXTR_STATE_CONNECTING; |
| ct->config = config; |
| ct->context = context; |
| ct->ctx = 0; |
| ct->timer = qd_timer(qd, cxtr_try_open, (void*) ct); |
| ct->delay = 0; |
| |
| qd_timer_schedule(ct->timer, ct->delay); |
| return ct; |
| } |
| |
| |
| void qd_server_connector_free(qd_connector_t* ct) |
| { |
| // Don't free the proton connector. This will be done by the connector |
| // processing/cleanup. |
| |
| if (!ct) |
| return; |
| |
| if (ct->ctx) { |
| qdpn_connector_close(ct->ctx->pn_cxtr); |
| ct->ctx->connector = 0; |
| } |
| |
| qd_timer_free(ct->timer); |
| free_qd_connector_t(ct); |
| } |
| |
| |
| qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context) |
| { |
| qd_server_t *qd_server = qd->server; |
| qd_user_fd_t *ufd = new_qd_user_fd_t(); |
| |
| if (!ufd) |
| return 0; |
| |
| qd_connection_t *ctx = new_qd_connection_t(); |
| DEQ_ITEM_INIT(ctx); |
| ctx->server = qd_server; |
| ctx->opened = false; |
| ctx->closed = false; |
| ctx->owner_thread = CONTEXT_NO_OWNER; |
| ctx->enqueued = 0; |
| ctx->pn_conn = 0; |
| ctx->collector = 0; |
| ctx->ssl = 0; |
| ctx->listener = 0; |
| ctx->connector = 0; |
| ctx->context = 0; |
| ctx->user_context = 0; |
| ctx->link_context = 0; |
| ctx->ufd = ufd; |
| ctx->user_id = 0; |
| ctx->free_user_id = false; |
| ctx->policy_settings = 0; |
| ctx->n_senders = 0; |
| ctx->n_receivers = 0; |
| ctx->open_container = 0; |
| DEQ_INIT(ctx->deferred_calls); |
| ctx->deferred_call_lock = sys_mutex(); |
| ctx->event_stall = false; |
| ctx->policy_counted = false; |
| |
| ufd->context = context; |
| ufd->server = qd_server; |
| ufd->fd = fd; |
| ufd->pn_conn = qdpn_connector_fd(qd_server->driver, fd, (void*) ctx); |
| qdpn_driver_wakeup(qd_server->driver); |
| |
| return ufd; |
| } |
| |
| |
| void qd_user_fd_free(qd_user_fd_t *ufd) |
| { |
| if (!ufd) return; |
| qdpn_connector_close(ufd->pn_conn); |
| free_qd_user_fd_t(ufd); |
| } |
| |
| |
| void qd_user_fd_activate_read(qd_user_fd_t *ufd) |
| { |
| qdpn_connector_activate(ufd->pn_conn, QDPN_CONNECTOR_READABLE); |
| qdpn_driver_wakeup(ufd->server->driver); |
| } |
| |
| |
| void qd_user_fd_activate_write(qd_user_fd_t *ufd) |
| { |
| qdpn_connector_activate(ufd->pn_conn, QDPN_CONNECTOR_WRITABLE); |
| qdpn_driver_wakeup(ufd->server->driver); |
| } |
| |
| |
| bool qd_user_fd_is_readable(qd_user_fd_t *ufd) |
| { |
| return qdpn_connector_activated(ufd->pn_conn, QDPN_CONNECTOR_READABLE); |
| } |
| |
| |
| bool qd_user_fd_is_writeable(qd_user_fd_t *ufd) |
| { |
| return qdpn_connector_activated(ufd->pn_conn, QDPN_CONNECTOR_WRITABLE); |
| } |
| |
| |
| void qd_server_timer_pending_LH(qd_timer_t *timer) |
| { |
| DEQ_INSERT_TAIL(timer->server->pending_timers, timer); |
| qdpn_driver_wakeup(timer->server->driver); |
| } |
| |
| |
| void qd_server_timer_cancel_LH(qd_timer_t *timer) |
| { |
| DEQ_REMOVE(timer->server->pending_timers, timer); |
| } |