blob: 27b25cde25bcc6ac226d038b7acac2681d47ae8a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "python_private.h"
#include "http1_private.h"
#include "adaptors/adaptor_utils.h"
#include <proton/listener.h>
#include <proton/proactor.h>
//
// This file contains code specific to HTTP client processing. The raw
// connection is terminated at an HTTP client, not an HTTP server.
//
#define DEFAULT_CAPACITY 250
#define LISTENER_BACKLOG 16
const char *CONTENT_LENGTH_KEY = "Content-Length";
const char *POST_METHOD = "POST";
//
// State for a single response message to be sent to the client via the raw
// connection.
//
typedef struct _client_response_msg_t {
DEQ_LINKS(struct _client_response_msg_t);
qdr_delivery_t *dlv; // from core via core_link_deliver
uint64_t dispo; // set by adaptor on encode complete
bool headers_encoded; // all headers completely encoded
bool encoded; // true when full response encoded
// HTTP encoded message data
qdr_http1_out_data_fifo_t out_data;
} _client_response_msg_t;
ALLOC_DECLARE(_client_response_msg_t);
ALLOC_DEFINE(_client_response_msg_t);
DEQ_DECLARE(_client_response_msg_t, _client_response_msg_list_t);
//
// State for an HTTP/1.x Request+Response exchange, client facing
//
typedef struct _client_request_t {
qdr_http1_request_base_t base;
// The request arrives via the raw connection. These fields are used to
// build the message and deliver it into the core.
//
qd_message_t *request_msg; // holds inbound message as it is built
qdr_delivery_t *request_dlv; // qdr_link_deliver()
qd_composed_field_t *request_props; // holds HTTP headers as they arrive
uint64_t request_dispo; // set by core (core_update_delivery)
bool request_settled; // set by core (core_update_delivery)
// A single request may result in more than one response (1xx Continue for
// example). These responses are written to the raw connection from HEAD
// to TAIL.
//
_client_response_msg_list_t responses;
uint32_t error_code;
char *error_text;
bool codec_completed; // encoder/decoder done
bool cancelled;
bool close_on_complete; // close the conn when this request is complete
bool conn_close_hdr; // add Connection: close to response msg
uint32_t version_major;
uint32_t version_minor;
} _client_request_t;
ALLOC_DECLARE(_client_request_t);
ALLOC_DEFINE(_client_request_t);
static void _client_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len);
static void _client_tx_stream_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_stream_data_t *stream_data);
static int _client_rx_request_cb(h1_codec_request_state_t *lib_rs,
const char *method,
const char *target,
uint32_t version_major,
uint32_t version_minor);
static int _client_rx_response_cb(h1_codec_request_state_t *lib_rs,
int status_code,
const char *reason_phrase,
uint32_t version_major,
uint32_t version_minor);
static int _client_rx_header_cb(h1_codec_request_state_t *lib_rs, const char *key, const char *value);
static int _client_rx_headers_done_cb(h1_codec_request_state_t *lib_rs, bool has_body);
static int _client_rx_body_cb(h1_codec_request_state_t *lib_rs, qd_buffer_list_t *body, uintmax_t len, bool more);
static void _client_rx_done_cb(h1_codec_request_state_t *lib_rs);
static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool cancelled);
static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context);
static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg);
static void _client_request_free(_client_request_t *req);
static void _write_pending_response(_client_request_t *req);
static void _deliver_request(qdr_http1_connection_t *hconn, _client_request_t *req);
////////////////////////////////////////////////////////
// HTTP/1.x Client Listener
////////////////////////////////////////////////////////
// Listener received connection request from client
//
static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li)
{
qdr_http1_connection_t *hconn = new_qdr_http1_connection_t();
ZERO(hconn);
hconn->type = HTTP1_CONN_CLIENT;
hconn->qd_server = li->server;
hconn->adaptor = qdr_http1_adaptor;
hconn->handler_context.handler = &_handle_connection_events;
hconn->handler_context.context = hconn;
hconn->client.next_msg_id = 1;
// configure the HTTP/1.x library
h1_codec_config_t config = {0};
config.type = HTTP1_CONN_CLIENT;
config.tx_buffers = _client_tx_buffers_cb;
config.tx_stream_data = _client_tx_stream_data_cb;
config.rx_request = _client_rx_request_cb;
config.rx_response = _client_rx_response_cb;
config.rx_header = _client_rx_header_cb;
config.rx_headers_done = _client_rx_headers_done_cb;
config.rx_body = _client_rx_body_cb;
config.rx_done = _client_rx_done_cb;
config.request_complete = _client_request_complete_cb;
hconn->http_conn = h1_codec_connection(&config, hconn);
if (!hconn->http_conn) {
qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
"Failed to initialize HTTP/1.x library - connection refused.");
qdr_http1_connection_free(hconn);
return 0;
}
hconn->cfg.host = qd_strdup(li->config.host);
hconn->cfg.port = qd_strdup(li->config.port);
hconn->cfg.address = qd_strdup(li->config.address);
hconn->cfg.site = li->config.site ? qd_strdup(li->config.site) : 0;
hconn->cfg.event_channel = li->config.event_channel;
hconn->cfg.aggregation = li->config.aggregation;
hconn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
sys_mutex_lock(qdr_http1_adaptor->lock);
DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn);
sys_mutex_unlock(qdr_http1_adaptor->lock);
// we'll create a QDR connection and links once the raw connection activates
return hconn;
}
// Process proactor events for the client listener
//
static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void *context)
{
qd_log_source_t *log = qdr_http1_adaptor->log;
qd_http_listener_t *li = (qd_http_listener_t*) context;
const char *host_port = li->config.host_port;
qd_log(log, QD_LOG_DEBUG, "HTTP/1.x Client Listener Event %s\n", pn_event_type_name(pn_event_type(e)));
switch (pn_event_type(e)) {
case PN_LISTENER_OPEN: {
qd_log(log, QD_LOG_NOTICE, "Listening for HTTP/1.x client requests on %s", host_port);
break;
}
case PN_LISTENER_ACCEPT: {
qd_log(log, QD_LOG_INFO, "Accepting HTTP/1.x connection on %s", host_port);
qdr_http1_connection_t *hconn = _create_client_connection(li);
if (hconn) {
// Note: the proactor may schedule the hconn on another thread
// during this call!
pn_listener_raw_accept(li->pn_listener, hconn->raw_conn);
}
break;
}
case PN_LISTENER_CLOSE: {
if (li->pn_listener) {
pn_condition_t *cond = pn_listener_condition(li->pn_listener);
if (pn_condition_is_set(cond)) {
qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)", host_port,
pn_condition_get_description(cond),
pn_condition_get_name(cond));
} else {
qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
}
pn_listener_set_context(li->pn_listener, 0);
li->pn_listener = 0;
}
break;
}
default:
break;
}
}
// Management Agent API - Create
//
qd_http_listener_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
{
qd_http_listener_t *li = qd_http_listener(qd->server, &_handle_listener_events);
if (!li) {
qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR, "Unable to create http listener: no memory");
return 0;
}
li->config = *config;
DEQ_ITEM_INIT(li);
sys_mutex_lock(qdr_http1_adaptor->lock);
DEQ_INSERT_TAIL(qdr_http1_adaptor->listeners, li);
sys_mutex_unlock(qdr_http1_adaptor->lock);
qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Configured HTTP_ADAPTOR listener on %s", (&li->config)->host_port);
// Note: the proactor may schedule the pn_listener on another thread during this call
pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, LISTENER_BACKLOG);
return li;
}
// Management Agent API - Delete
//
void qd_http1_delete_listener(qd_dispatch_t *ignore, qd_http_listener_t *li)
{
if (li) {
if (li->pn_listener) {
pn_listener_close(li->pn_listener);
li->pn_listener = 0;
}
sys_mutex_lock(qdr_http1_adaptor->lock);
DEQ_REMOVE(qdr_http1_adaptor->listeners, li);
sys_mutex_unlock(qdr_http1_adaptor->lock);
qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleted HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port);
qd_http_listener_decref(li);
}
}
////////////////////////////////////////////////////////
// Raw Connector Events
////////////////////////////////////////////////////////
// Raw Connection Initialization
//
static void _setup_client_connection(qdr_http1_connection_t *hconn)
{
hconn->client.client_ip_addr = qda_raw_conn_get_address(hconn->raw_conn);
qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
false, //bool is_authenticated,
true, //bool opened,
"", //char *sasl_mechanisms,
QD_INCOMING, //qd_direction_t dir,
hconn->client.client_ip_addr, //const char *host,
"", //const char *ssl_proto,
"", //const char *ssl_cipher,
"", //const char *user,
"HTTP/1.x Adaptor", //const char *container,
0, //pn_data_t *connection_properties,
0, //int ssl_ssf,
false, //bool ssl,
"", // peer router version,
false); // streaming links
hconn->conn_id = qd_server_allocate_connection_id(hconn->qd_server);
hconn->qdr_conn = qdr_connection_opened(qdr_http1_adaptor->core,
qdr_http1_adaptor->adaptor,
true, // incoming
QDR_ROLE_NORMAL,
1, //cost
hconn->conn_id,
0, // label
0, // remote container id
false, // strip annotations in
false, // strip annotations out
DEFAULT_CAPACITY,
0, // vhost
0, // policy_spec
info,
0, // bind context
0); // bind token
qdr_connection_set_context(hconn->qdr_conn, hconn);
qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to client created", hconn->conn_id);
// simulate a client subscription for reply-to
qdr_terminus_t *dynamic_source = qdr_terminus(0);
qdr_terminus_set_dynamic(dynamic_source);
hconn->out_link = qdr_link_first_attach(hconn->qdr_conn,
QD_OUTGOING,
dynamic_source, //qdr_terminus_t *source,
qdr_terminus(0), //qdr_terminus_t *target,
"http1.client.reply-to", //const char *name,
0, //const char *terminus_addr,
false, // no-route
NULL, // initial delivery
&(hconn->out_link_id));
qdr_link_set_context(hconn->out_link, hconn);
qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] HTTP client response link created",
hconn->conn_id, hconn->out_link_id);
// simulate a client publisher link to the HTTP server:
qdr_terminus_t *target = qdr_terminus(0);
if (hconn->cfg.event_channel) {
//For an event channel, we always want to be able to handle
//incoming requests. We use an anonymous publisher so that we
//get credit regardless of there being consumers.
qdr_terminus_set_address(target, NULL);
} else {
qdr_terminus_set_address(target, hconn->cfg.address);
}
hconn->in_link = qdr_link_first_attach(hconn->qdr_conn,
QD_INCOMING,
qdr_terminus(0), //qdr_terminus_t *source,
target, //qdr_terminus_t *target,
"http1.client.in", //const char *name,
0, //const char *terminus_addr,
false,
NULL,
&(hconn->in_link_id));
qdr_link_set_context(hconn->in_link, hconn);
qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] HTTP client request link created",
hconn->conn_id, hconn->in_link_id);
// wait until the dynamic reply-to address is returned in the second attach
// to grant buffers to the raw connection
}
// Proton Connection Event Handler
//
static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context)
{
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
qd_log_source_t *log = qdr_http1_adaptor->log;
if (!hconn) return;
qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP client proactor event %s", hconn->conn_id, pn_event_type_name(pn_event_type(e)));
switch (pn_event_type(e)) {
case PN_RAW_CONNECTION_CONNECTED: {
_setup_client_connection(hconn);
break;
}
case PN_RAW_CONNECTION_CLOSED_READ:
case PN_RAW_CONNECTION_CLOSED_WRITE: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closed for %s", hconn->conn_id,
pn_event_type(e) == PN_RAW_CONNECTION_CLOSED_READ
? "reading" : "writing");
pn_raw_connection_close(hconn->raw_conn);
break;
}
case PN_RAW_CONNECTION_DISCONNECTED: {
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Disconnected", hconn->conn_id);
pn_raw_connection_set_context(hconn->raw_conn, 0);
// prevent core from waking this connection
sys_mutex_lock(qdr_http1_adaptor->lock);
qdr_connection_set_context(hconn->qdr_conn, 0);
hconn->raw_conn = 0;
sys_mutex_unlock(qdr_http1_adaptor->lock);
// at this point the core can no longer activate this connection
if (hconn->out_link) {
qdr_link_set_context(hconn->out_link, 0);
hconn->out_link = 0;
}
if (hconn->in_link) {
qdr_link_set_context(hconn->in_link, 0);
hconn->in_link = 0;
}
if (hconn->qdr_conn) {
qdr_connection_set_context(hconn->qdr_conn, 0);
qdr_connection_closed(hconn->qdr_conn);
hconn->qdr_conn = 0;
}
qdr_http1_connection_free(hconn);
return; // hconn no longer valid
}
case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need write buffers", hconn->conn_id);
_write_pending_response((_client_request_t*) DEQ_HEAD(hconn->requests));
break;
}
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
// @TODO(kgiusti): backpressure if no credit
if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) {
int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
hconn->conn_id, granted);
}
break;
}
case PN_RAW_CONNECTION_WAKE: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id);
while (qdr_connection_process(hconn->qdr_conn)) {}
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Processing done", hconn->conn_id);
break;
}
case PN_RAW_CONNECTION_READ: {
qd_buffer_list_t blist;
uintmax_t length;
qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
if (length) {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client",
hconn->conn_id, hconn->in_link_id, length);
hconn->in_http1_octets += length;
int error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
if (error)
qdr_http1_close_connection(hconn, "Incoming request message failed to parse");
}
break;
}
case PN_RAW_CONNECTION_WRITTEN: {
qdr_http1_free_written_buffers(hconn);
break;
}
default:
break;
}
// Check the head request for completion and advance to next request if
// done.
// remove me:
if (hconn) {
_client_request_t *hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
if (hreq) {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is client request msg-id=%"PRIu64" complete????",
hconn->conn_id, hreq->base.msg_id);
qd_log(log, QD_LOG_DEBUG, " codec=%s req-dlv=%p resp-dlv=%d req_msg=%p %s",
hreq->codec_completed ? "Done" : "Not Done",
(void*)hreq->request_dlv,
(int)DEQ_SIZE(hreq->responses),
(void*)hreq->request_msg,
hreq->cancelled ? "Cancelled" : "Not Cancelled");
}
}
// check if the head request is done
bool need_close = false;
_client_request_t *hreq = (_client_request_t *)DEQ_HEAD(hconn->requests);
if (hreq) {
if (hreq->cancelled) {
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP client request msg-id=%"PRIu64" cancelled",
hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
need_close = true;
} else {
if (hreq->error_code) {
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] Responding with %i %s", hconn->conn_id,
hconn->out_link_id, hreq->error_code, hreq->error_text);
_client_response_msg_t *rmsg = new__client_response_msg_t();
ZERO(rmsg);
DEQ_INIT(rmsg->out_data.fifo);
DEQ_INSERT_TAIL(hreq->responses, rmsg);
qdr_http1_error_response(&hreq->base, hreq->error_code, hreq->error_text);
_write_pending_response(hreq);
}
// Can we retire the current outgoing response messages?
//
_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
while (rmsg &&
rmsg->dispo &&
DEQ_IS_EMPTY(rmsg->out_data.fifo) &&
hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
// response message fully received and forwarded to client
if (rmsg->dlv) {
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP client request msg-id=%"PRIu64" settling response, dispo=0x%"PRIx64,
hconn->conn_id, hconn->out_link_id, hreq->base.msg_id, rmsg->dispo);
qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
rmsg->dlv,
rmsg->dispo,
true, // settled,
0, // delivery state
false);
}
qdr_link_flow(qdr_http1_adaptor->core, hconn->out_link, 1, false);
_client_response_msg_free(hreq, rmsg);
rmsg = DEQ_HEAD(hreq->responses);
}
if (hreq->codec_completed &&
DEQ_IS_EMPTY(hreq->responses) &&
hreq->request_settled) {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" completed!",
hconn->conn_id, hreq->base.msg_id);
need_close = hreq->close_on_complete;
_client_request_free(hreq);
}
}
}
if (need_close)
qdr_http1_close_connection(hconn, "Connection: close");
else {
hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
if (hreq) {
if (hreq->request_msg && hconn->in_link_credit > 0) {
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Delivering next request msg-id=%"PRIu64" to router",
hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
hconn->in_link_credit -= 1;
_deliver_request(hconn, hreq);
}
_write_pending_response(hreq);
}
}
}
////////////////////////////////////////////////////////
// HTTP/1.x Encoder/Decoder Callbacks
////////////////////////////////////////////////////////
// Encoder callback: send blist buffers (response msg) to client endpoint
//
static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *blist, unsigned int len)
{
_client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
if (!hconn->raw_conn) {
// client connection has been lost
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"] Discarding outgoing data - client connection closed", hconn->conn_id);
qd_buffer_list_free_buffers(blist);
return;
}
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] %u request octets encoded",
hconn->conn_id, hconn->out_link_id, len);
_client_response_msg_t *rmsg;
if (hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
// responses are decoded one at a time - the current response it at the
// tail of the response list
rmsg = DEQ_TAIL(hreq->responses);
} else {
// when responses are aggregated the buffers don't need to be
// correlated to specific responses as they will all be
// written out together, so can just use the head of the
// response list
rmsg = DEQ_HEAD(hreq->responses);
}
assert(rmsg);
qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist);
// if this happens to be the current outgoing response try writing to the
// raw connection
if (rmsg == DEQ_HEAD(hreq->responses))
_write_pending_response(hreq);
}
// Encoder callback: send stream_data buffers (response msg) to client endpoint
//
static void _client_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data)
{
_client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
if (!hconn->raw_conn) {
// client connection has been lost
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"] Discarding outgoing data - client connection closed", hconn->conn_id);
qd_message_stream_data_release(stream_data);
return;
}
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Sending body data to client",
hconn->conn_id, hconn->out_link_id);
_client_response_msg_t *rmsg;
if (hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
// responses are decoded one at a time - the current response it at the
// tail of the response list
rmsg = DEQ_TAIL(hreq->responses);
} else {
// when responses are aggregated the buffers don't need to be
// correlated to specific responses as they will all be
// written out together, so can just use the head of the
// response list
rmsg = DEQ_HEAD(hreq->responses);
}
assert(rmsg);
qdr_http1_enqueue_stream_data(&rmsg->out_data, stream_data);
// if this happens to be the current outgoing response try writing to the
// raw connection
if (rmsg == DEQ_HEAD(hreq->responses))
_write_pending_response(hreq);
}
// Called when decoding an HTTP request from a client. This indicates the
// start of a new request message.
//
static int _client_rx_request_cb(h1_codec_request_state_t *hrs,
const char *method,
const char *target,
uint32_t version_major,
uint32_t version_minor)
{
h1_codec_connection_t *h1c = h1_codec_request_state_get_connection(hrs);
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*)h1_codec_connection_get_context(h1c);
_client_request_t *creq = new__client_request_t();
ZERO(creq);
creq->base.start = qd_timer_now();
creq->base.msg_id = hconn->client.next_msg_id++;
creq->base.lib_rs = hrs;
creq->base.hconn = hconn;
creq->close_on_complete = (version_minor == 0);
creq->version_major = version_major;
creq->version_minor = version_minor;
DEQ_INIT(creq->responses);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"] HTTP request received: msg-id=%"PRIu64" method=%s target=%s version=%"PRIi32".%"PRIi32,
hconn->conn_id, creq->base.msg_id, method, target, version_major, version_minor);
if (hconn->cfg.event_channel) {
if (strcasecmp(method, POST_METHOD) == 0) {
creq->error_code = 204;
creq->error_text = "Event posted.";
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Event posted", hconn->conn_id);
} else {
creq->error_code = 405;
creq->error_text = "Invalid method for event channel, only POST is allowed.";
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, "[C%"PRIu64"] HTTP %s request not allowed for event channel", hconn->conn_id, method);
}
}
creq->request_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
qd_compose_start_map(creq->request_props);
{
// OASIS specifies this value as "1.1" by default...
char version[64];
snprintf(version, 64, "%"PRIi32".%"PRIi32, version_major, version_minor);
qd_compose_insert_symbol(creq->request_props, REQUEST_HEADER_KEY);
qd_compose_insert_string(creq->request_props, version);
qd_compose_insert_symbol(creq->request_props, TARGET_HEADER_KEY);
qd_compose_insert_string(creq->request_props, target);
}
h1_codec_request_state_set_context(hrs, (void*) creq);
DEQ_INSERT_TAIL(hconn->requests, &creq->base);
return 0;
}
// Cannot happen for a client connection!
static int _client_rx_response_cb(h1_codec_request_state_t *hrs,
int status_code,
const char *reason_phrase,
uint32_t version_major,
uint32_t version_minor)
{
_client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
"[C%"PRIu64"][L%"PRIu64"] Spurious HTTP response received from client",
hconn->conn_id, hconn->in_link_id);
return HTTP1_STATUS_BAD_REQ;
}
// called for each decoded HTTP header.
//
static int _client_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, const char *value)
{
_client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP request header received: key='%s' value='%s'",
hconn->conn_id, hconn->in_link_id, key, value);
if (strcasecmp(key, "Connection") == 0) {
// We need to filter the connection header out. But first see if
// client requested that the connection be closed after the response
// arrives.
//
// @TODO(kgiusti): also have to remove other headers given in value!
// @TODO(kgiusti): do we need to support keep-alive on 1.0 connections?
//
size_t len;
const char *token = h1_codec_token_list_next(value, &len, &value);
while (token) {
if (len == 5 && strncasecmp(token, "close", 5) == 0) {
hreq->close_on_complete = true;
hreq->conn_close_hdr = true;
break;
}
token = h1_codec_token_list_next(value, &len, &value);
}
} else {
qd_compose_insert_symbol(hreq->request_props, key);
qd_compose_insert_string(hreq->request_props, value);
}
return 0;
}
// Called after the last header is decoded, before decoding any body data.
// At this point there is enough data to start forwarding the message to
// the router.
//
static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_body)
{
_client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
if (hconn->cfg.event_channel && strcasecmp(h1_codec_request_state_method(hrs), POST_METHOD) != 0) {
return 0;
}
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP request headers done.",
hconn->conn_id, hconn->in_link_id);
// now that all the headers have been received we can construct
// the AMQP message
hreq->request_msg = qd_message();
qd_composed_field_t *hdrs = qd_compose(QD_PERFORMATIVE_HEADER, 0);
qd_compose_start_list(hdrs);
qd_compose_insert_bool(hdrs, 0); // durable
qd_compose_insert_null(hdrs); // priority
//qd_compose_insert_null(hdrs); // ttl
//qd_compose_insert_bool(hdrs, 0); // first-acquirer
//qd_compose_insert_uint(hdrs, 0); // delivery-count
qd_compose_end_list(hdrs);
qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, hdrs);
qd_compose_start_list(props);
qd_compose_insert_ulong(props, hreq->base.msg_id); // message-id
qd_compose_insert_null(props); // user-id
// @TODO(kgiusti) set to: to target?
qd_compose_insert_string(props, hconn->cfg.address); // to
qd_compose_insert_string(props, h1_codec_request_state_method(hrs)); // subject
if (hconn->cfg.event_channel) {
// event channel does not want replies
qd_compose_insert_null(props); // reply-to
} else {
qd_compose_insert_string(props, hconn->client.reply_to_addr); // reply-to
}
qd_compose_insert_null(props); // correlation-id
qd_compose_insert_null(props); // content-type
qd_compose_insert_null(props); // content-encoding
qd_compose_insert_null(props); // absolute-expiry-time
qd_compose_insert_null(props); // creation-time
qd_compose_insert_string(props, hconn->cfg.site); // group-id
qd_compose_end_list(props);
qd_compose_end_map(hreq->request_props);
qd_message_compose_3(hreq->request_msg, props, hreq->request_props, !has_body);
qd_compose_free(props);
qd_compose_free(hreq->request_props);
hreq->request_props = 0;
// Use up one credit to obtain a delivery and forward to core. If no
// credit is available the request is stalled until the core grants more
// flow.
if (hreq == (_client_request_t*) DEQ_HEAD(hconn->requests) && hconn->in_link_credit > 0) {
hconn->in_link_credit -= 1;
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Delivering request msg-id=%"PRIu64" to router",
hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
_deliver_request(hconn, hreq);
}
return 0;
}
// Called with decoded body data. This may be called multiple times as body
// data becomes available.
//
static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t len,
bool more)
{
_client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
if (hconn->cfg.event_channel && strcasecmp(h1_codec_request_state_method(hrs), POST_METHOD) != 0) {
qd_buffer_list_free_buffers(body);
return 0;
}
qd_message_t *msg = hreq->request_msg ? hreq->request_msg : qdr_delivery_message(hreq->request_dlv);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
hconn->conn_id, hconn->in_link_id, len);
// @TODO(kgiusti): handle Q2 block event:
qd_message_stream_data_append(msg, body, 0);
//
// Notify the router that more data is ready to be pushed out on the delivery
//
if (!more)
qd_message_set_receive_complete(msg);
if (hreq->request_dlv)
qdr_delivery_continue(qdr_http1_adaptor->core, hreq->request_dlv, false);
return 0;
}
// Called at the completion of request message decoding.
//
static void _client_rx_done_cb(h1_codec_request_state_t *hrs)
{
_client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_message_t *msg = hreq->request_msg ? hreq->request_msg : qdr_delivery_message(hreq->request_dlv);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" receive complete.",
hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
if (!qd_message_receive_complete(msg)) {
qd_message_set_receive_complete(msg);
if (hreq->request_dlv) {
qdr_delivery_continue(qdr_http1_adaptor->core, hreq->request_dlv, false);
}
}
}
// The coded has completed processing the request and response messages.
//
static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool cancelled)
{
_client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(lib_rs);
if (hreq) {
hreq->base.stop = qd_timer_now();
qdr_http1_record_client_request_info(qdr_http1_adaptor, &hreq->base);
hreq->base.lib_rs = 0; // freed on return from this call
hreq->cancelled = hreq->cancelled || cancelled;
hreq->codec_completed = !hreq->cancelled;
uint64_t in_octets, out_octets;
h1_codec_request_state_counters(lib_rs, &in_octets, &out_octets);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"] HTTP request msg-id=%"PRIu64" %s. Octets read: %"PRIu64" written: %"PRIu64,
hreq->base.hconn->conn_id,
hreq->base.msg_id,
cancelled ? "cancelled!" : "codec done",
in_octets, out_octets);
}
}
//////////////////////////////////////////////////////////////////////
// Router Protocol Adapter Callbacks
//////////////////////////////////////////////////////////////////////
void qdr_http1_client_core_second_attach(qdr_http1_adaptor_t *adaptor,
qdr_http1_connection_t *hconn,
qdr_link_t *link,
qdr_terminus_t *source,
qdr_terminus_t *target)
{
if (link == hconn->out_link) {
// this is the reply-to link for the client
qd_iterator_t *reply_iter = qdr_terminus_get_address(source);
hconn->client.reply_to_addr = (char*) qd_iterator_copy(reply_iter);
assert(hconn->client.reply_to_addr);
hconn->out_link_credit += DEFAULT_CAPACITY;
qdr_link_flow(adaptor->core, link, DEFAULT_CAPACITY, false);
}
}
void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t *adaptor,
qdr_http1_connection_t *hconn,
qdr_link_t *link,
int credit)
{
qd_log(adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] Credit granted on request link %d",
hconn->conn_id, hconn->in_link_id, credit);
assert(link == hconn->in_link); // router only grants flow on incoming link
hconn->in_link_credit += credit;
if (hconn->in_link_credit > 0) {
if (hconn->raw_conn) {
int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
qd_log(adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"] %d read buffers granted",
hconn->conn_id, granted);
}
// is the current request message blocked by lack of credit?
_client_request_t *hreq = (_client_request_t *)DEQ_HEAD(hconn->requests);
if (hreq && hreq->request_msg) {
assert(!hreq->request_dlv);
hconn->in_link_credit -= 1;
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Delivering next request msg-id=%"PRIu64" to router",
hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
_deliver_request(hconn, hreq);
}
}
}
static bool _get_multipart_content_length(_client_request_t *hreq, char *value)
{
uint64_t total = 0;
for (_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); rmsg; rmsg = rmsg->next) {
qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
uint64_t content_length = h1_codec_tx_multipart_section_boundary_length();
bool got_body_length = false;
qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
if (app_props_iter) {
qd_parsed_field_t *app_props = qd_parse(app_props_iter);
if (app_props && qd_parse_is_map(app_props)) {
// now send all headers in app properties
qd_parsed_field_t *key = qd_field_first_child(app_props);
while (key) {
qd_parsed_field_t *value = qd_field_next_child(key);
if (!value)
break;
qd_iterator_t *i_key = qd_parse_raw(key);
if (!i_key)
break;
if (qd_iterator_equal(i_key, (const unsigned char*) CONTENT_LENGTH_KEY)) {
qd_iterator_t *i_value = qd_parse_raw(value);
if (i_value) {
char *length_str = (char*) qd_iterator_copy(i_value);
uint64_t body_length;
sscanf(length_str, "%"SCNu64, &body_length);
free(length_str);
got_body_length = true;
content_length += body_length;
}
} else if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX)) {
qd_iterator_t *i_value = qd_parse_raw(value);
if (!i_value)
break;
content_length += qd_iterator_length(i_key) + 2 + qd_iterator_length(i_value) + 2;
}
key = qd_field_next_child(value);
}
}
qd_parse_free(app_props);
}
qd_iterator_free(app_props_iter);
if (got_body_length) {
total += content_length;
} else {
return false;
}
}
total += h1_codec_tx_multipart_end_boundary_length();
sprintf(value, "%"SCNu64, total);
return true;
}
static void _encode_json_response(_client_request_t *hreq)
{
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] encoding json response", hconn->conn_id);
bool ok = !h1_codec_tx_response(hreq->base.lib_rs, 200, NULL, hreq->version_major, hreq->version_minor);
if (!ok) {
qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] Could not encode response", hconn->conn_id);
return;
}
PyObject* msgs = 0;
qd_json_msgs_init(&msgs);
for (_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); rmsg; rmsg = rmsg->next) {
qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
qd_json_msgs_append(msgs, msg);
rmsg->encoded = true;
}
char *body = qd_json_msgs_string(msgs);
if (body) {
h1_codec_tx_add_header(hreq->base.lib_rs, "Content-Type", "application/json");
int len = strlen(body);
char content_length[25];
sprintf(content_length, "%i", len);
h1_codec_tx_add_header(hreq->base.lib_rs, CONTENT_LENGTH_KEY, content_length);
h1_codec_tx_body_str(hreq->base.lib_rs, body);
free(body);
} else {
qd_log(hconn->adaptor->log, QD_LOG_ERROR, "[C%"PRIu64"] No aggregated json response returned", hconn->conn_id);
}
bool need_close;
h1_codec_tx_done(hreq->base.lib_rs, &need_close);
hreq->close_on_complete = need_close || hreq->close_on_complete;
hreq->codec_completed = true;
}
static void _encode_multipart_response(_client_request_t *hreq)
{
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] encoding multipart response", hconn->conn_id);
bool ok = !h1_codec_tx_response(hreq->base.lib_rs, 200, NULL, hreq->version_major, hreq->version_minor);
char content_length[25];
if (_get_multipart_content_length(hreq, content_length)) {
h1_codec_tx_add_header(hreq->base.lib_rs, CONTENT_LENGTH_KEY, content_length);
}
h1_codec_tx_begin_multipart(hreq->base.lib_rs);
for (_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); rmsg; rmsg = rmsg->next) {
h1_codec_tx_begin_multipart_section(hreq->base.lib_rs);
qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
if (app_props_iter) {
qd_parsed_field_t *app_props = qd_parse(app_props_iter);
if (app_props && qd_parse_is_map(app_props)) {
// now send all headers in app properties
qd_parsed_field_t *key = qd_field_first_child(app_props);
while (ok && key) {
qd_parsed_field_t *value = qd_field_next_child(key);
if (!value)
break;
qd_iterator_t *i_key = qd_parse_raw(key);
if (!i_key)
break;
// ignore the special headers added by the mapping and content-length field (TODO: case insensitive comparison for content-length)
if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX) && !qd_iterator_equal(i_key, (const unsigned char*) CONTENT_LENGTH_KEY)) {
qd_iterator_t *i_value = qd_parse_raw(value);
if (!i_value)
break;
char *header_key = (char*) qd_iterator_copy(i_key);
char *header_value = (char*) qd_iterator_copy(i_value);
ok = !h1_codec_tx_add_header(hreq->base.lib_rs, header_key, header_value);
free(header_key);
free(header_value);
}
key = qd_field_next_child(value);
}
}
qd_parse_free(app_props);
}
qd_iterator_free(app_props_iter);
rmsg->headers_encoded = true;
qd_message_stream_data_t *body_data = 0;
bool done = false;
while (ok && !done) {
switch (qd_message_next_stream_data(msg, &body_data)) {
case QD_MESSAGE_STREAM_DATA_BODY_OK:
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Encoding response body data",
hconn->conn_id, hconn->out_link_id);
if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] body data encode failed",
hconn->conn_id, hconn->out_link_id);
ok = false;
}
break;
case QD_MESSAGE_STREAM_DATA_NO_MORE:
// indicate this message is complete
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] response message encoding completed",
hconn->conn_id, hconn->out_link_id);
done = true;
break;
case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Ignoring incomplete body data in aggregated response.",
hconn->conn_id, hconn->out_link_id);
done = true;
break; // wait for more
case QD_MESSAGE_STREAM_DATA_INVALID:
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Ignoring corrupted body data in aggregated response.",
hconn->conn_id, hconn->out_link_id);
done = true;
break;
case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Ignoring footer in aggregated response.",
hconn->conn_id, hconn->out_link_id);
done = true;
break;
}
}
rmsg->encoded = true;
}
h1_codec_tx_end_multipart(hreq->base.lib_rs);
bool need_close;
h1_codec_tx_done(hreq->base.lib_rs, &need_close);
hreq->close_on_complete = need_close || hreq->close_on_complete;
hreq->codec_completed = true;
}
static void _encode_aggregated_response(qdr_http1_connection_t *hconn, _client_request_t *hreq)
{
if (hconn->cfg.aggregation == QD_AGGREGATION_MULTIPART) {
_encode_multipart_response(hreq);
} else if (hconn->cfg.aggregation == QD_AGGREGATION_JSON) {
_encode_json_response(hreq);
}
}
static void _encode_empty_response(qdr_http1_connection_t *hconn, _client_request_t *hreq)
{
qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] encoding empty response", hconn->conn_id);
h1_codec_tx_response(hreq->base.lib_rs, 204, NULL, hreq->version_major, hreq->version_minor);
bool need_close;
h1_codec_tx_done(hreq->base.lib_rs, &need_close);
hreq->close_on_complete = need_close || hreq->close_on_complete;
hreq->codec_completed = true;
}
// Handle disposition/settlement update for the outstanding request msg
//
void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t *adaptor,
qdr_http1_connection_t *hconn,
qdr_http1_request_base_t *req,
qdr_delivery_t *dlv,
uint64_t disp,
bool settled)
{
_client_request_t *hreq = (_client_request_t *)req;
assert(dlv == hreq->request_dlv);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" delivery update, outcome=0x%"PRIx64"%s",
hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, disp, settled ? " settled" : "");
if (disp && disp != PN_RECEIVED && hreq->request_dispo == 0) {
// terminal disposition
hreq->request_dispo = disp;
if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) {
// when aggregating response from a multicast request, the
// acknowledgement of the request triggers generating the
// output from the responses received
if (settled) {
if (DEQ_IS_EMPTY(hreq->responses)) {
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] Aggregation request settled but no responses received.", hconn->conn_id, hconn->in_link_id);
_encode_empty_response(hconn, hreq);
} else {
_encode_aggregated_response(hconn, hreq);
}
_write_pending_response(hreq);
}
} else if (disp != PN_ACCEPTED) {
// no response message is going to arrive. Now what? For now fake
// a response from the server by using the codec to write an error
// response on the behalf of the server.
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] HTTP request failure, outcome=0x%"PRIx64,
hconn->conn_id, hconn->in_link_id, disp);
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" failure, outcome=0x%"PRIx64,
hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, disp);
if (hreq->base.out_http1_octets == 0) {
// best effort attempt to send an error to the client
// if nothing has been sent back so far
_client_response_msg_t *rmsg = new__client_response_msg_t();
ZERO(rmsg);
DEQ_INIT(rmsg->out_data.fifo);
DEQ_INSERT_TAIL(hreq->responses, rmsg);
if (disp == PN_REJECTED) {
qdr_http1_error_response(&hreq->base, 400, "Bad Request");
} else {
// total guess as to what the proper error code should be
qdr_http1_error_response(&hreq->base, 503, "Service Unavailable");
}
hreq->close_on_complete = true; // trust nothing
} else {
// partial response already sent - punt:
qdr_http1_close_connection(hconn, "HTTP request failed");
}
}
}
hreq->request_settled = settled || hreq->request_settled;
}
//
// Response message forwarding
//
// use the correlation ID from the AMQP message containing the response to look
// up the original request context
//
static _client_request_t *_lookup_request_context(qdr_http1_connection_t *hconn,
qd_message_t *msg)
{
qdr_http1_request_base_t *req = 0;
qd_parsed_field_t *cid_pf = 0;
qd_iterator_t *cid_iter = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID);
if (cid_iter) {
cid_pf = qd_parse(cid_iter);
if (cid_pf && qd_parse_ok(cid_pf)) {
uint64_t cid = qd_parse_as_ulong(cid_pf);
if (qd_parse_ok(cid_pf)) {
req = DEQ_HEAD(hconn->requests);
while (req) {
if (req->msg_id == cid)
break;
req = DEQ_NEXT(req);
}
}
}
}
qd_parse_free(cid_pf);
qd_iterator_free(cid_iter);
return (_client_request_t*) req;
}
// Encode the response status and all HTTP headers.
// The message has been validated to app properties depth
//
static bool _encode_response_headers(_client_request_t *hreq,
_client_response_msg_t *rmsg)
{
bool ok = false;
qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
if (!hreq->base.site) {
qd_iterator_t *group_id_itr = qd_message_field_iterator(msg, QD_FIELD_GROUP_ID);
hreq->base.site = (char*) qd_iterator_copy(group_id_itr);
qd_iterator_free(group_id_itr);
}
qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
if (app_props_iter) {
qd_parsed_field_t *app_props = qd_parse(app_props_iter);
if (app_props && qd_parse_is_map(app_props)) {
qd_parsed_field_t *tmp = qd_parse_value_by_key(app_props, STATUS_HEADER_KEY);
if (tmp) {
int32_t status_code = qd_parse_as_int(tmp);
if (qd_parse_ok(tmp)) {
// the value for RESPONSE_HEADER_KEY is optional and is set
// to a string representation of the version of the server
// (e.g. "1.1")
uint32_t major = 1;
uint32_t minor = 1;
tmp = qd_parse_value_by_key(app_props, RESPONSE_HEADER_KEY);
if (tmp) {
char *version_str = (char*) qd_iterator_copy(qd_parse_raw(tmp));
if (version_str) {
sscanf(version_str, "%"SCNu32".%"SCNu32, &major, &minor);
free(version_str);
}
}
char *reason_str = 0;
tmp = qd_parse_value_by_key(app_props, REASON_HEADER_KEY);
if (tmp) {
reason_str = (char*) qd_iterator_copy(qd_parse_raw(tmp));
}
qd_log(hreq->base.hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Encoding response %d %s",
hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id, (int)status_code,
reason_str ? reason_str : "");
ok = !h1_codec_tx_response(hreq->base.lib_rs, (int)status_code, reason_str, major, minor);
free(reason_str);
// now send all headers in app properties
qd_parsed_field_t *key = qd_field_first_child(app_props);
while (ok && key) {
qd_parsed_field_t *value = qd_field_next_child(key);
if (!value)
break;
qd_iterator_t *i_key = qd_parse_raw(key);
if (!i_key)
break;
// ignore the special headers added by the mapping
if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX)) {
qd_iterator_t *i_value = qd_parse_raw(value);
if (!i_value)
break;
char *header_key = (char*) qd_iterator_copy(i_key);
char *header_value = (char*) qd_iterator_copy(i_value);
// @TODO(kgiusti): remove me (sensitive content)
qd_log(hreq->base.hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Encoding response header %s:%s",
hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id,
header_key, header_value);
ok = !h1_codec_tx_add_header(hreq->base.lib_rs, header_key, header_value);
free(header_key);
free(header_value);
}
key = qd_field_next_child(value);
}
// If the client has requested Connection: close respond
// accordingly IF this is the terminal response (not
// INFORMATIONAL)
if (ok && (status_code / 100) == 1) {
if (hreq->conn_close_hdr) {
ok = !h1_codec_tx_add_header(hreq->base.lib_rs,
"Connection", "close");
}
}
}
}
}
qd_parse_free(app_props);
qd_iterator_free(app_props_iter);
}
return ok;
}
static uint64_t _encode_response_message(_client_request_t *hreq,
_client_response_msg_t *rmsg)
{
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
if (!rmsg->headers_encoded) {
rmsg->headers_encoded = true;
if (!_encode_response_headers(hreq, rmsg)) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] message headers malformed - discarding.",
hconn->conn_id, hconn->out_link_id);
return PN_REJECTED;
}
}
qd_message_stream_data_t *stream_data = 0;
while (true) {
switch (qd_message_next_stream_data(msg, &stream_data)) {
case QD_MESSAGE_STREAM_DATA_BODY_OK:
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Encoding response body data",
hconn->conn_id, hconn->out_link_id);
if (h1_codec_tx_body(hreq->base.lib_rs, stream_data)) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] body data encode failed",
hconn->conn_id, hconn->out_link_id);
return PN_REJECTED;
}
break;
case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
// ignore footers
qd_message_stream_data_release(stream_data);
break;
case QD_MESSAGE_STREAM_DATA_NO_MORE:
// indicate this message is complete
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] response message encoding completed",
hconn->conn_id, hconn->out_link_id);
return PN_ACCEPTED;
case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] body data need more",
hconn->conn_id, hconn->out_link_id);
return 0; // wait for more
case QD_MESSAGE_STREAM_DATA_INVALID:
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
hconn->conn_id, hconn->out_link_id);
return PN_REJECTED;
}
}
}
// The I/O thread wants to send this delivery containing the response out the
// link. It is unlikely that the parsing of this message will fail since the
// message was constructed by the ingress router. However if the message fails
// to parse then there is probably no recovering as the client will now be out
// of sync. For now close the connection if an error occurs.
//
uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor,
qdr_http1_connection_t *hconn,
qdr_link_t *link,
qdr_delivery_t *delivery,
bool settled)
{
qd_message_t *msg = qdr_delivery_message(delivery);
_client_request_t *hreq = (_client_request_t*) qdr_delivery_get_context(delivery);
if (!hreq) {
// new delivery - look for corresponding request via correlation_id
switch (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES)) {
case QD_MESSAGE_DEPTH_INCOMPLETE:
return 0;
case QD_MESSAGE_DEPTH_INVALID:
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
hconn->conn_id, link->identity);
qd_message_set_send_complete(msg);
qdr_http1_close_connection(hconn, "Malformed response message");
return PN_REJECTED;
case QD_MESSAGE_DEPTH_OK:
hreq = _lookup_request_context(hconn, msg);
if (!hreq) {
// No corresponding request found
// @TODO(kgiusti) how to handle this? - simply discard?
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
qd_message_set_send_complete(msg);
qdr_http1_close_connection(hconn, "Cannot correlate response message");
return PN_REJECTED;
}
// link request state and delivery
_client_response_msg_t *rmsg = new__client_response_msg_t();
ZERO(rmsg);
rmsg->dlv = delivery;
DEQ_INIT(rmsg->out_data.fifo);
qdr_delivery_set_context(delivery, hreq);
qdr_delivery_incref(delivery, "HTTP1 client referencing response delivery");
DEQ_INSERT_TAIL(hreq->responses, rmsg);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP received response for msg-id=%"PRIu64,
hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
break;
}
}
// deliveries arrive one at a time and are added to the tail
_client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
assert(rmsg && rmsg->dlv == delivery);
// when aggregating responses, they are saved on the list until
// the request has been settled, then encoded in the configured
// aggregation format
if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) {
if (!qd_message_receive_complete(msg)) {
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Response incomplete (%zu responses received)", hconn->conn_id, link->identity, DEQ_SIZE(hreq->responses));
return 0;
}
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Received response (%zu responses received), settling", hconn->conn_id, link->identity, DEQ_SIZE(hreq->responses));
rmsg->dispo = PN_ACCEPTED;
qd_message_set_send_complete(msg);
qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
rmsg->dlv,
rmsg->dispo,
true, // settled,
0, // delivery state
false);
return PN_ACCEPTED;
}
if (!rmsg->dispo) {
rmsg->dispo = _encode_response_message(hreq, rmsg);
if (rmsg->dispo) {
qd_message_set_send_complete(msg);
if (rmsg->dispo == PN_ACCEPTED) {
bool need_close = false;
h1_codec_tx_done(hreq->base.lib_rs, &need_close);
hreq->close_on_complete = need_close || hreq->close_on_complete;
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] HTTP response message msg-id=%"PRIu64" encoding complete",
hconn->conn_id, link->identity, hreq->base.msg_id);
} else {
// The response was bad. There's not much that can be done to
// recover, so for now I punt...
// returning a terminal disposition will cause the delivery to be updated and settled,
// so drop our reference
qdr_delivery_set_context(rmsg->dlv, 0);
qdr_delivery_decref(qdr_http1_adaptor->core, rmsg->dlv, "HTTP1 client releasing malformed response delivery");
rmsg->dlv = 0;
qdr_http1_close_connection(hconn, "Cannot parse response message");
return rmsg->dispo;
}
}
}
return 0;
}
//
// Misc
//
// free the response message
//
static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg)
{
DEQ_REMOVE(req->responses, rmsg);
if (rmsg->dlv) {
qdr_delivery_set_context(rmsg->dlv, 0);
qdr_delivery_decref(qdr_http1_adaptor->core, rmsg->dlv, "HTTP1 client response delivery settled");
}
qdr_http1_out_data_fifo_cleanup(&rmsg->out_data);
free__client_response_msg_t(rmsg);
}
// Check the head response message for buffers that need to be sent
//
static void _write_pending_response(_client_request_t *hreq)
{
if (hreq && !hreq->cancelled) {
assert(DEQ_PREV(&hreq->base) == 0); // must preserve order
_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
if (rmsg && rmsg->out_data.write_ptr) {
uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &rmsg->out_data);
hreq->base.out_http1_octets += written;
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written",
hreq->base.hconn->conn_id, written);
}
}
}
static void _client_request_free(_client_request_t *hreq)
{
if (hreq) {
qdr_http1_request_base_cleanup(&hreq->base);
qd_message_free(hreq->request_msg);
if (hreq->request_dlv) {
qdr_delivery_set_context(hreq->request_dlv, 0);
qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 client request delivery settled");
}
qd_compose_free(hreq->request_props);
_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
while (rmsg) {
_client_response_msg_free(hreq, rmsg);
rmsg = DEQ_HEAD(hreq->responses);
}
free__client_request_t(hreq);
}
}
// release client-specific state
void qdr_http1_client_conn_cleanup(qdr_http1_connection_t *hconn)
{
for (_client_request_t *hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
hreq;
hreq = (_client_request_t*) DEQ_HEAD(hconn->requests)) {
_client_request_free(hreq);
}
}
// handle connection close request from management
//
void qdr_http1_client_core_conn_close(qdr_http1_adaptor_t *adaptor,
qdr_http1_connection_t *hconn,
const char *error)
{
// initiate close of the raw conn. the adaptor will call
// qdr_connection_close() and clean up once the DISCONNECT
// event is processed
//
qdr_http1_close_connection(hconn, error);
}
static void _deliver_request(qdr_http1_connection_t *hconn, _client_request_t *hreq)
{
if (hconn->cfg.event_channel) {
qd_iterator_t *addr = qd_message_field_iterator(hreq->request_msg, QD_FIELD_TO);
qd_iterator_reset_view(addr, ITER_VIEW_ADDRESS_HASH);
hreq->request_dlv = qdr_link_deliver_to(hconn->in_link, hreq->request_msg, 0, addr, false, 0, 0, 0, 0);
} else {
hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
}
qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
hreq->request_msg = 0;
}