/* blob: c7716958cbb0ba2bb55c10f077dcaf2df627762a [file] [log] [blame] */
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "http_common.h"
#include <proton/listener.h>
#include <stdio.h>
// Allocator declarations/definitions for the listener and connector objects.
// NOTE(review): presumably these macros generate the new_<type>()/free_<type>()
// helpers that the constructors and decref functions in this file call -
// confirm against the ALLOC_DECLARE/ALLOC_DEFINE definitions.
ALLOC_DECLARE(qd_http_listener_t);
ALLOC_DEFINE(qd_http_listener_t);
ALLOC_DECLARE(qd_http_connector_t);
ALLOC_DEFINE(qd_http_connector_t);
/**
 * Populate a bridge configuration from a management entity.
 *
 * Reads "name", "host", "port", "address" and "protocolVersion" with
 * qd_entity_get_string(), and "siteId", "aggregation", "eventChannel" and
 * "hostOverride" with the qd_entity_opt_*() accessors, then derives
 * config->host_port as "<host>:<port>".
 *
 * @param qd      dispatch instance (not used by this function)
 * @param config  configuration to fill in; it is zeroed first.  On success the
 *                strings it holds are released with qd_http_free_bridge_config().
 * @param entity  entity the attributes are read from
 * @return QD_ERROR_NONE on success; otherwise the current qd_error_code(), after
 *         every string loaded so far has been released.
 */
static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_http_bridge_config_t *config, qd_entity_t* entity)
{
    // Temporaries that are parsed into enum values and never stored in config.
    char *version_str     = 0;
    char *aggregation_str = 0;

    qd_error_clear();
    ZERO(config);

#define CHECK() if (qd_error_code()) goto error
    config->name          = qd_entity_get_string(entity, "name");                CHECK();
    config->host          = qd_entity_get_string(entity, "host");                CHECK();
    config->port          = qd_entity_get_string(entity, "port");                CHECK();
    config->address       = qd_entity_get_string(entity, "address");             CHECK();
    config->site          = qd_entity_opt_string(entity, "siteId", 0);           CHECK();
    version_str           = qd_entity_get_string(entity, "protocolVersion");     CHECK();
    aggregation_str       = qd_entity_opt_string(entity, "aggregation", 0);      CHECK();
    config->event_channel = qd_entity_opt_bool(entity, "eventChannel", false);   CHECK();
    config->host_override = qd_entity_opt_string(entity, "hostOverride", 0);     CHECK();

    // Any value other than "HTTP2" selects HTTP/1.
    if (strcmp(version_str, "HTTP2") == 0) {
        config->version = VERSION_HTTP2;
    } else {
        config->version = VERSION_HTTP1;
    }
    free(version_str);
    version_str = 0;

    // A missing or unrecognised aggregation value means no aggregation.
    if (aggregation_str && strcmp(aggregation_str, "json") == 0) {
        config->aggregation = QD_AGGREGATION_JSON;
    } else if (aggregation_str && strcmp(aggregation_str, "multipart") == 0) {
        config->aggregation = QD_AGGREGATION_MULTIPART;
    } else {
        config->aggregation = QD_AGGREGATION_NONE;
    }
    free(aggregation_str);
    aggregation_str = 0;

    // Room for host, ':' separator, port and the terminating NUL.
    size_t hplen = strlen(config->host) + strlen(config->port) + 2;
    // NOTE(review): the allocation result is not checked before use; no
    // error-reporting call for that case is visible in this file - confirm the
    // project's out-of-memory policy.
    config->host_port = malloc(hplen);
    snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);

    return QD_ERROR_NONE;

error:
    qd_http_free_bridge_config(config);
    // Both temporaries may still be live here: a CHECK() after "aggregation"
    // was read can fail with aggregation_str allocated.
    free(version_str);
    free(aggregation_str);
    return qd_error_code();
}
/**
 * Release every heap-allocated string held by a bridge configuration.
 *
 * The configuration structure itself is not freed and its pointers are left
 * untouched after the strings are released.  A null config is ignored.
 */
void qd_http_free_bridge_config(qd_http_bridge_config_t *config)
{
    if (config) {
        free(config->name);
        free(config->host);
        free(config->port);
        free(config->host_port);
        free(config->host_override);
        free(config->address);
        free(config->site);
    }
}
//
// HTTP Listener Management (HttpListenerEntity)
//
/**
 * Create an HTTP listener from a management entity.
 *
 * Loads the bridge configuration and hands it to the HTTP/1 or HTTP/2
 * implementation selected by the configured protocol version.
 *
 * @return the new listener, or 0 if the configuration could not be loaded or
 *         the protocol-specific setup produced no listener (in which case the
 *         loaded configuration strings are released here).
 */
qd_http_listener_t *qd_dispatch_configure_http_listener(qd_dispatch_t *qd, qd_entity_t *entity)
{
    qd_http_bridge_config_t config;

    if (load_bridge_config(qd, &config, entity) != QD_ERROR_NONE) {
        qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_ERROR,
               "Unable to create http listener: %s", qd_error_message());
        return 0;
    }

    qd_http_listener_t *listener = 0;
    if (config.version == VERSION_HTTP1) {
        listener = qd_http1_configure_listener(qd, &config, entity);
    } else if (config.version == VERSION_HTTP2) {
        listener = qd_http2_configure_listener(qd, &config, entity);
    }

    if (listener)
        return listener;

    // No listener was produced, so the configuration strings are released here.
    qd_http_free_bridge_config(&config);
    return 0;
}
/**
 * Delete an HTTP listener, dispatching to the HTTP/1 or HTTP/2 implementation
 * according to the version stored in the listener's configuration.
 * A null impl is ignored.
 */
void qd_dispatch_delete_http_listener(qd_dispatch_t *qd, void *impl)
{
    qd_http_listener_t *listener = (qd_http_listener_t*) impl;
    if (!listener)
        return;

    if (listener->config.version == VERSION_HTTP1) {
        qd_http1_delete_listener(qd, listener);
    } else if (listener->config.version == VERSION_HTTP2) {
        qd_http2_delete_listener(qd, listener);
    }
}
/**
 * Refresh hook for the httpListener entity.  There is nothing to refresh, so
 * this always reports success.
 */
qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl)
{
    // Both arguments are intentionally unused.
    (void) entity;
    (void) impl;
    return QD_ERROR_NONE;
}
//
// HTTP Connector Management (HttpConnectorEntity)
//
/**
 * Create an HTTP connector from a management entity.
 *
 * Loads the bridge configuration and hands it to the HTTP/1 or HTTP/2
 * implementation selected by the configured protocol version.
 *
 * @return the new connector, or 0 if the configuration could not be loaded or
 *         the protocol-specific setup produced no connector (in which case the
 *         loaded configuration strings are released here).
 */
qd_http_connector_t *qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
    qd_http_bridge_config_t config;

    if (load_bridge_config(qd, &config, entity) != QD_ERROR_NONE) {
        qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_ERROR,
               "Unable to create http connector: %s", qd_error_message());
        return 0;
    }

    qd_http_connector_t *conn = 0;
    if (config.version == VERSION_HTTP1) {
        conn = qd_http1_configure_connector(qd, &config, entity);
    } else if (config.version == VERSION_HTTP2) {
        conn = qd_http2_configure_connector(qd, &config, entity);
    }

    if (conn)
        return conn;

    // No connector was produced, so the configuration strings are released here.
    qd_http_free_bridge_config(&config);
    return 0;
}
/**
 * Delete an HTTP connector, dispatching to the HTTP/1 or HTTP/2 implementation
 * according to the version stored in the connector's configuration.
 * A null impl is ignored.
 */
void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl)
{
    qd_http_connector_t *conn = (qd_http_connector_t*) impl;
    if (!conn)
        return;

    if (conn->config.version == VERSION_HTTP1) {
        qd_http1_delete_connector(qd, conn);
    } else if (conn->config.version == VERSION_HTTP2) {
        qd_http2_delete_connector(qd, conn);
    }
}
/**
 * Refresh hook for the httpConnector entity.  There is nothing to refresh, so
 * this always reports success.
 */
qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl)
{
    // Both arguments are intentionally unused.
    (void) entity;
    (void) impl;
    return QD_ERROR_NONE;
}
//
// qd_http_listener_t constructor
//
/**
 * Allocate and initialise an HTTP listener object.
 *
 * The object is zeroed, given a proton listener and a reference count of 1,
 * and its event context (pointing back at the listener, with the supplied
 * handler) is attached to the proton listener.
 *
 * @param server   server stored in the listener
 * @param handler  event handler stored in the listener's context
 * @return the new listener, or 0 if either allocation failed
 */
qd_http_listener_t *qd_http_listener(qd_server_t *server, qd_server_event_handler_t handler)
{
    qd_http_listener_t *li = new_qd_http_listener_t();
    if (li) {
        ZERO(li);
        li->pn_listener = pn_listener();
        if (li->pn_listener) {
            sys_atomic_init(&li->ref_count, 1);
            li->server          = server;
            li->context.handler = handler;
            li->context.context = li;
            // Attach the context only once it is fully populated.
            pn_listener_set_context(li->pn_listener, &li->context);
            return li;
        }
        // No proton listener: give the half-built object back.
        free_qd_http_listener_t(li);
    }
    return 0;
}
/**
 * Drop one reference to a listener.  A null listener is ignored.
 *
 * When sys_atomic_dec() reports 1 the configuration strings and the listener
 * itself are released.
 * NOTE(review): presumably sys_atomic_dec() returns the value held before the
 * decrement, making 1 the last reference - confirm.
 */
void qd_http_listener_decref(qd_http_listener_t* li)
{
    if (!li)
        return;
    if (sys_atomic_dec(&li->ref_count) != 1)
        return;

    qd_http_free_bridge_config(&li->config);
    free_qd_http_listener_t(li);
}
//
// qd_http_connector_t constructor
//
/**
 * Allocate and initialise an HTTP connector object: zeroed, bound to the given
 * server, with a reference count of 1.
 *
 * @return the new connector, or 0 if allocation failed
 */
qd_http_connector_t *qd_http_connector(qd_server_t *server)
{
    qd_http_connector_t *c = new_qd_http_connector_t();
    if (c) {
        ZERO(c);
        c->server = server;
        sys_atomic_init(&c->ref_count, 1);
    }
    return c;
}
/**
 * Drop one reference to a connector.  A null connector is ignored.
 *
 * When sys_atomic_dec() reports 1 the configuration strings and the connector
 * itself are released.
 * NOTE(review): presumably sys_atomic_dec() returns the value held before the
 * decrement, making 1 the last reference - confirm.
 */
void qd_http_connector_decref(qd_http_connector_t* c)
{
    if (!c)
        return;
    if (sys_atomic_dec(&c->ref_count) != 1)
        return;

    qd_http_free_bridge_config(&c->config);
    free_qd_http_connector_t(c);
}
// One entry of a request-info record's detail list: a request counter for a
// single method/status combination.
typedef struct qdr_http_method_status_t qdr_http_method_status_t;
struct qdr_http_method_status_t {
DEQ_LINKS(qdr_http_method_status_t);
char *key;          // "<method>:<3-digit status>", or "unknown" when no method was given
uint64_t requests;  // number of requests counted under this key
};
DEQ_DECLARE(qdr_http_method_status_t, qdr_http_method_status_list_t);
// Accumulated request statistics for one record key.  Records with equal keys
// are merged: the counters are summed and the largest latency is kept.
typedef struct qdr_http_request_info_t qdr_http_request_info_t;
struct qdr_http_request_info_t {
DEQ_LINKS(qdr_http_request_info_t);
char *key;             // built by _record_key(): "<i|o>_<host>[_<address>][@<site>]"
char *address;         // heap copy of the request address, or null
char *host;            // heap copy of the host, or null
char *site;            // heap copy of the remote site, or null
bool ingress;          // true is reported as direction "in", false as "out"
uint64_t requests;     // total number of requests
uint64_t bytes_in;     // sum of bytes_in over all merged requests
uint64_t bytes_out;    // sum of bytes_out over all merged requests
uint64_t max_latency;  // largest latency seen among the merged requests
qdr_http_method_status_list_t detail;  // per method/status request counts
};
DEQ_DECLARE(qdr_http_request_info_t, qdr_http_request_info_list_t);
// Column indexes understood by insert_column().  Each value is also the
// position of the matching attribute name in qdr_http_request_info_columns.
#define QDR_HTTP_REQUEST_INFO_NAME 0
#define QDR_HTTP_REQUEST_INFO_IDENTITY 1
#define QDR_HTTP_REQUEST_INFO_ADDRESS 2
#define QDR_HTTP_REQUEST_INFO_HOST 3
#define QDR_HTTP_REQUEST_INFO_SITE 4
#define QDR_HTTP_REQUEST_INFO_DIRECTION 5
#define QDR_HTTP_REQUEST_INFO_REQUESTS 6
#define QDR_HTTP_REQUEST_INFO_BYTES_IN 7
#define QDR_HTTP_REQUEST_INFO_BYTES_OUT 8
#define QDR_HTTP_REQUEST_INFO_MAX_LATENCY 9
#define QDR_HTTP_REQUEST_INFO_DETAIL 10
// Values emitted for the "direction" column.
const char * const QDR_HTTP_REQUEST_INFO_DIRECTION_IN = "in";
const char * const QDR_HTTP_REQUEST_INFO_DIRECTION_OUT = "out";
// Attribute names in column-index order, terminated by a null entry.
// NOTE(review): QDR_HTTP_REQUEST_INFO_COLUMN_COUNT, used by write_map(), is not
// defined in this file; it must stay in step with this table - confirm.
const char *qdr_http_request_info_columns[] =
{"name",
"identity",
"address",
"host",
"site",
"direction",
"requests",
"bytesIn",
"bytesOut",
"maxLatency",
"details",
0};
// Entity type name used when logging a failed READ.
const char *HTTP_REQUEST_INFO_TYPE = "org.apache.qpid.dispatch.httpRequestInfo";
// Holder for the list of accumulated request-info records.
typedef struct {
qdr_http_request_info_list_t records;
} http_request_info_records_t;
// Single instance of the holder; null until _get_request_info() creates it.
static http_request_info_records_t* request_info = 0;
/**
 * Return the request-info holder, creating it with an empty record list on
 * first use.
 */
static http_request_info_records_t *_get_request_info()
{
    if (request_info)
        return request_info;

    request_info = NEW(http_request_info_records_t);
    DEQ_INIT(request_info->records);
    return request_info;
}
/**
 * Append the value of one column of a record to a composed field.
 *
 * @param core    router core (not used here)
 * @param record  record to read from; if null nothing is appended
 * @param col     one of the QDR_HTTP_REQUEST_INFO_* column indexes; any other
 *                value appends nothing
 * @param body    composed field receiving the value
 *
 * NOTE(review): the counters are uint64_t fields; whether
 * qd_compose_insert_uint()/qd_compose_insert_int() preserve their full range
 * is not visible from this file - confirm against the compose API.
 */
static void insert_column(qdr_core_t *core, qdr_http_request_info_t *record, int col, qd_composed_field_t *body)
{
    qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "Insert column %i for %p", col, (void*) record);

    if (record == 0)
        return;

    switch (col) {
    case QDR_HTTP_REQUEST_INFO_NAME:
    case QDR_HTTP_REQUEST_INFO_IDENTITY:
        // Name and identity are both reported as the record key.
        qd_compose_insert_string(body, record->key);
        break;

    case QDR_HTTP_REQUEST_INFO_ADDRESS:
        qd_compose_insert_string(body, record->address);
        break;

    case QDR_HTTP_REQUEST_INFO_HOST:
        qd_compose_insert_string(body, record->host);
        break;

    case QDR_HTTP_REQUEST_INFO_SITE:
        qd_compose_insert_string(body, record->site);
        break;

    case QDR_HTTP_REQUEST_INFO_DIRECTION:
        qd_compose_insert_string(body, record->ingress ? QDR_HTTP_REQUEST_INFO_DIRECTION_IN
                                                       : QDR_HTTP_REQUEST_INFO_DIRECTION_OUT);
        break;

    case QDR_HTTP_REQUEST_INFO_REQUESTS:
        qd_compose_insert_uint(body, record->requests);
        break;

    case QDR_HTTP_REQUEST_INFO_BYTES_IN:
        qd_compose_insert_uint(body, record->bytes_in);
        break;

    case QDR_HTTP_REQUEST_INFO_BYTES_OUT:
        qd_compose_insert_uint(body, record->bytes_out);
        break;

    case QDR_HTTP_REQUEST_INFO_MAX_LATENCY:
        qd_compose_insert_uint(body, record->max_latency);
        break;

    case QDR_HTTP_REQUEST_INFO_DETAIL: {
        // Map of detail key -> request count.
        qd_compose_start_map(body);
        qdr_http_method_status_t *entry = DEQ_HEAD(record->detail);
        while (entry) {
            qd_compose_insert_string(body, entry->key);
            qd_compose_insert_int(body, entry->requests);
            entry = DEQ_NEXT(entry);
        }
        qd_compose_end_map(body);
        break;
    }
    }
}
/**
 * Append one record to the query body as a list holding the columns requested
 * by the query, in the order given by query->columns (terminated by a negative
 * entry).  A null record produces an empty list.
 */
static void write_list(qdr_core_t *core, qdr_query_t *query, qdr_http_request_info_t *record)
{
    qd_composed_field_t *body = query->body;

    qd_compose_start_list(body);
    if (record) {
        for (int idx = 0; query->columns[idx] >= 0; idx++)
            insert_column(core, record, query->columns[idx], body);
    }
    qd_compose_end_list(body);
}
/**
 * Append one record to a composed field as a map from column name to column
 * value, covering column indexes 0 .. QDR_HTTP_REQUEST_INFO_COLUMN_COUNT-1.
 *
 * @param qdr_connection_columns  column names, indexed by column number
 */
static void write_map(qdr_core_t *core,
                      qdr_http_request_info_t *record,
                      qd_composed_field_t *body,
                      const char *qdr_connection_columns[])
{
    qd_compose_start_map(body);

    int col = 0;
    while (col < QDR_HTTP_REQUEST_INFO_COLUMN_COUNT) {
        qd_compose_insert_string(body, qdr_connection_columns[col]);
        insert_column(core, record, col, body);
        col++;
    }

    qd_compose_end_map(body);
}
/**
 * Move the query cursor past the given record: bump next_offset and set
 * query->more according to whether another record follows.  With a null
 * record only query->more is cleared.
 */
static void advance(qdr_query_t *query, qdr_http_request_info_t *record)
{
    if (!record) {
        query->more = false;
        return;
    }

    query->next_offset++;
    query->more = (DEQ_NEXT(record) != 0);
}
/**
 * Look up a record whose key equals the given identity.
 *
 * @return the first matching record, or 0 if identity is null or no record
 *         matches
 */
static qdr_http_request_info_t *find_by_identity(qdr_core_t *core, qd_iterator_t *identity)
{
    if (!identity)
        return 0;

    for (qdr_http_request_info_t *candidate = DEQ_HEAD(_get_request_info()->records);
         candidate;
         candidate = DEQ_NEXT(candidate)) {
        // Compare the identity iterator against the record key.
        if (qd_iterator_equal(identity, (const unsigned char*) candidate->key))
            return candidate;
    }
    return 0;
}
/**
 * Start a query over the request-info records.
 *
 * Writes the record at position `offset` (if there is one) to the query body,
 * positions the query cursor after it and enqueues the response.  If offset is
 * not below the number of records, an empty response with more == false is
 * enqueued.
 */
void qdra_http_request_info_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
{
    qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "query for first http request info (%i)", offset);
    query->status = QD_AMQP_OK;

    qdr_http_request_info_t *record = 0;
    if (!(offset >= DEQ_SIZE(_get_request_info()->records))) {
        // Walk to the requested position.
        record = DEQ_HEAD(_get_request_info()->records);
        int skipped = 0;
        while (skipped < offset && record) {
            record = DEQ_NEXT(record);
            skipped++;
        }
        assert(record);
    }

    if (!record) {
        query->more = false;
    } else {
        write_list(core, query, record);
        query->next_offset = offset;
        advance(query, record);
    }

    qdr_agent_enqueue_response_CT(core, query);
}
/**
 * Continue a query: write the record at query->next_offset (if any) to the
 * query body, advance the cursor and enqueue the response.  When no record
 * exists at that position, query->more is cleared.
 */
void qdra_http_request_info_get_next_CT(qdr_core_t *core, qdr_query_t *query)
{
    qdr_http_request_info_t *record = 0;

    if (query->next_offset < DEQ_SIZE(_get_request_info()->records)) {
        // Walk to the position recorded in the query.
        record = DEQ_HEAD(_get_request_info()->records);
        int skipped = 0;
        while (skipped < query->next_offset && record) {
            record = DEQ_NEXT(record);
            skipped++;
        }
    }

    if (!record) {
        query->more = false;
    } else {
        write_list(core, query, record);
        advance(query, record);
    }

    qdr_agent_enqueue_response_CT(core, query);
}
/**
 * Read a single request-info record by identity.
 *
 * - no identity: status BAD_REQUEST with an explanatory description (logged)
 * - identity matches no record: status NOT_FOUND
 * - otherwise: the record is written to the query body as a map, status OK
 *
 * The response is enqueued in every case.  The name argument is not used.
 */
void qdra_http_request_info_get_CT(qdr_core_t *core,
                                   qd_iterator_t *name,
                                   qd_iterator_t *identity,
                                   qdr_query_t *query,
                                   const char *qdr_http_request_info_columns[])
{
    if (identity) {
        qdr_http_request_info_t *record = find_by_identity(core, identity);
        if (record) {
            write_map(core, record, query->body, qdr_http_request_info_columns);
            query->status = QD_AMQP_OK;
        } else {
            query->status = QD_AMQP_NOT_FOUND;
        }
    } else {
        query->status = QD_AMQP_BAD_REQUEST;
        query->status.description = "Name not supported. Identity required";
        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: %s", HTTP_REQUEST_INFO_TYPE, query->status.description);
    }

    qdr_agent_enqueue_response_CT(core, query);
}
// Key used for detail entries created without a method.
static const char* UNKNOWN = "unknown";

/**
 * Allocate a zeroed detail entry and give it its key.
 *
 * With a method the key is "<method>:<status>", the status printed as three
 * digits; a status outside 100..599 is replaced by 500.  Without a method the
 * key is a copy of UNKNOWN and the status is not used.
 */
static qdr_http_method_status_t* _new_qdr_http_method_status_t(const char *const method, int status)
{
    qdr_http_method_status_t* entry = NEW(qdr_http_method_status_t);
    ZERO(entry);

    if (!method) {
        entry->key = qd_strdup(UNKNOWN);
        return entry;
    }

    if (status < 100 || status >= 600) {
        status = 500;
    }

    // method + ':' + three status digits + terminating NUL
    size_t key_len = strlen(method) + 5;
    entry->key = malloc(key_len);
    snprintf(entry->key, key_len, "%s:%03i", method, status);
    return entry;
}
/**
 * Release a detail entry together with its key.  The entry must not be null
 * and is not unlinked from any list here.
 */
static void _free_qdr_http_method_status(qdr_http_method_status_t* record)
{
    char *key = record->key;

    free(key);
    free(record);
}
/**
 * Release a request-info record: its strings, every entry still on its detail
 * list, and the record itself.  A null record is ignored.
 *
 * The record is not unlinked from any list it may be on.
 */
static void _free_qdr_http_request_info(qdr_http_request_info_t* record)
{
    if (!record)
        return;

    // free(NULL) is a no-op, so the strings need no individual guards.
    free(record->key);
    free(record->address);
    free(record->host);
    free(record->site);

    // Each entry has to be unlinked before it is freed; otherwise the list
    // head keeps pointing at released memory and the loop would read and free
    // the same entry again.
    qdr_http_method_status_t *item = DEQ_HEAD(record->detail);
    while (item) {
        DEQ_REMOVE_HEAD(record->detail);
        _free_qdr_http_method_status(item);
        item = DEQ_HEAD(record->detail);
    }

    free(record);
}
/**
 * Merge one detail entry into a detail list.
 *
 * If the list already has an entry with the same key, the request counts are
 * added together and `addition` is freed; otherwise `addition` is appended to
 * the list.  Either way the caller no longer owns `addition` afterwards.
 */
static void _update_http_method_status_detail(qdr_http_method_status_list_t *detail, qdr_http_method_status_t *addition)
{
    for (qdr_http_method_status_t *existing = DEQ_HEAD(*detail); existing; existing = DEQ_NEXT(existing)) {
        if (strcmp(existing->key, addition->key) == 0) {
            existing->requests += addition->requests;
            _free_qdr_http_method_status(addition);
            return;
        }
    }

    // No entry with this key yet.
    DEQ_INSERT_TAIL(*detail, addition);
}
/**
 * Fold `additions` into `record` when both have the same key.
 *
 * On a match the counters are summed, the larger max_latency is kept and every
 * detail entry is moved from `additions` into `record` (leaving the detail
 * list of `additions` empty).
 *
 * @return true if the keys matched and the record was updated, false otherwise
 */
static bool _update_qdr_http_request_info(qdr_http_request_info_t* record, qdr_http_request_info_t* additions)
{
    if (strcmp(record->key, additions->key) != 0)
        return false;

    record->requests  += additions->requests;
    record->bytes_in  += additions->bytes_in;
    record->bytes_out += additions->bytes_out;
    if (record->max_latency < additions->max_latency) {
        record->max_latency = additions->max_latency;
    }

    // Drain the detail list of the additions into the record.
    qdr_http_method_status_t *item;
    while ((item = DEQ_HEAD(additions->detail))) {
        DEQ_REMOVE_HEAD(additions->detail);
        _update_http_method_status_detail(&record->detail, item);
    }
    return true;
}
/**
 * Action handler that merges a request-info record (carried in
 * action->args.general.context_1) into the stored records.
 *
 * If a stored record has the same key it is updated and the incoming record is
 * freed; otherwise the incoming record is appended to the list.  The discard
 * flag is not consulted.
 */
static void _add_http_request_info_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
    qdr_http_request_info_t *update = (qdr_http_request_info_t*) action->args.general.context_1;

    for (qdr_http_request_info_t *record = DEQ_HEAD(_get_request_info()->records); record; record = DEQ_NEXT(record)) {
        if (_update_qdr_http_request_info(record, update)) {
            // Merged into an existing record; the incoming one is finished with.
            _free_qdr_http_request_info(update);
            qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "Updated http request info %s", record->key);
            return;
        }
    }

    DEQ_INSERT_TAIL(_get_request_info()->records, update);
    qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "Added http request info %s (%zu)", update->key, DEQ_SIZE(_get_request_info()->records));
}
/**
 * Queue a request-info record for merging: wraps it in an
 * "add_http_request_info" action handled by _add_http_request_info_CT and
 * enqueues that action on the core.
 */
static void _add_http_request_info(qdr_core_t *core, qdr_http_request_info_t* record)
{
    qdr_action_t *merge_action = qdr_action(_add_http_request_info_CT, "add_http_request_info");

    merge_action->args.general.context_1 = record;
    qdr_action_enqueue(core, merge_action);
}
/**
 * Allocate a request-info record with all fields zeroed and an empty detail
 * list.
 */
static qdr_http_request_info_t* _new_qdr_http_request_info_t()
{
    qdr_http_request_info_t* info = NEW(qdr_http_request_info_t);

    ZERO(info);
    DEQ_INIT(info->detail);
    return info;
}
/**
 * Build the key that identifies a request-info record.
 *
 * Format: "<i|o>_<host>[_<address>][@<site>]" where the first character is 'i'
 * for ingress and 'o' otherwise; the address and site parts are omitted when
 * the corresponding argument is null.
 *
 * @return a heap-allocated key the caller must free(), or 0 if host is null or
 *         the allocation fails
 */
static char *_record_key(const char *host, const char *address, const char* site, bool ingress)
{
    if (!host)
        return 0;

    // Two prefix characters, the host, each optional part with its one
    // separator character, and the terminating NUL.
    size_t size = 2 + strlen(host)
                + (address ? strlen(address) + 1 : 0)
                + (site ? strlen(site) + 1 : 0)
                + 1;

    char *key = malloc(size);
    if (!key)
        return 0;

    snprintf(key, size, "%c_%s%s%s%s%s",
             ingress ? 'i' : 'o',
             host,
             address ? "_" : "", address ? address : "",
             site ? "@" : "", site ? site : "");
    return key;
}
/**
 * Record the statistics of one completed HTTP request.
 *
 * Builds a request-info record (one request, the given byte counts and latency,
 * and a single detail entry for the method/status pair) and queues it for
 * merging into the stored records.
 *
 * A request without a host is ignored: no record key can be built for it, and
 * a keyless record cannot be compared against or merged with other records.
 *
 * @param core         router core the merge action is enqueued on
 * @param method       request method, may be null
 * @param status_code  response status code
 * @param address      request address, may be null
 * @param host         host the request is accounted to; required
 * @param local_site   not used by this function
 * @param remote_site  site stored in the record and used in its key, may be null
 * @param ingress      direction flag stored in the record
 * @param bytes_in     bytes received for this request
 * @param bytes_out    bytes sent for this request
 * @param latency      latency of this request
 */
void qd_http_record_request(qdr_core_t *core, const char * method, uint32_t status_code, const char *address, const char *host,
                            const char *local_site, const char *remote_site, bool ingress,
                            uint64_t bytes_in, uint64_t bytes_out, uint64_t latency)
{
    if (!host) {
        // Checked before anything is allocated so there is nothing to clean up.
        qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "Ignoring http request info without a host");
        return;
    }

    qdr_http_request_info_t* record = _new_qdr_http_request_info_t();
    record->ingress = ingress;
    record->address = address ? qd_strdup(address) : 0;
    record->host = qd_strdup(host);
    record->site = remote_site ? qd_strdup(remote_site) : 0;
    record->key = _record_key(record->host, record->address, remote_site, record->ingress);
    record->requests = 1;
    record->bytes_in = bytes_in;
    record->bytes_out = bytes_out;
    record->max_latency = latency;

    qdr_http_method_status_t *detail = _new_qdr_http_method_status_t(method, (int) status_code);
    detail->requests = 1;
    DEQ_INSERT_TAIL(record->detail, detail);

    qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "Adding http request info %s", record->key);
    _add_http_request_info(core, record);
}
/**
 * Extract the host part of a "<host>:<port>" string.
 *
 * The string is split at its LAST colon, so a host that itself contains colons
 * (for example "::1:8080") is returned intact.
 *
 * @param host_port  string to split, may be null
 * @return a heap-allocated copy of the host the caller must free(), or 0 if
 *         host_port is null, contains no colon, or the allocation fails
 */
char *qd_get_host_from_host_port(const char *host_port)
{
    if (!host_port)
        return 0;

    const char *separator = strrchr(host_port, ':');
    if (!separator)
        return 0;

    size_t len = (size_t) (separator - host_port);
    char *host = malloc(len + 1);
    if (!host)
        return 0;

    memcpy(host, host_port, len);
    host[len] = '\0';
    return host;
}