blob: c1fb17b0d4a207ba6bd99061eb5a7261a3a58cb1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "http1_private.h"
#include "adaptors/adaptor_utils.h"
#include <proton/proactor.h>
//
// This file contains code specific to HTTP server processing. The raw
// connection is terminated at an HTTP server, not an HTTP client.
//
//
// State for a single response message arriving via the raw connection. This
// message will be decoded into a single AMQP message and forwarded into the
// core.
//
// This object is instantiated when the HTTP1 codec indicates the arrival of a
// response message (See _server_rx_response_cb()). The response is considered
// "complete" after it has been fully encoded and delivered to the core. The
// _server_response_msg_t is freed at this point - we do not wait for dispo or
// settlement from the core since we cannot do anything meaningful should the
// delivery fail (other than log it).
//
typedef struct _server_response_msg_t {
DEQ_LINKS(struct _server_response_msg_t);
struct _server_request_t *hreq; // owning request
qd_message_t *msg; // hold incoming message
qd_composed_field_t *msg_props; // hold incoming headers
qdr_delivery_t *dlv; // inbound to router (qdr_link_deliver)
bool rx_complete; // response rx complete
} _server_response_msg_t;
ALLOC_DECLARE(_server_response_msg_t);
ALLOC_DEFINE(_server_response_msg_t);
DEQ_DECLARE(_server_response_msg_t, _server_response_msg_list_t);
const char *HOST_KEY = "Host";
//
// State for an HTTP/1.x Request+Response exchange, server facing
//
typedef struct _server_request_t {
qdr_http1_request_base_t base;
// The request arrives via the router core in an AMQP message
// (qd_message_t). These fields are used to encode the response and send
// it out the raw connection.
//
qdr_delivery_t *request_dlv; // outbound from core_link_deliver
uint64_t request_dispo; // set by adaptor during encode
bool request_settled; // set by adaptor
bool request_acked; // true if dispo sent to core
bool request_discard; // drop incoming request data
bool headers_encoded; // True when header encode done
qdr_http1_out_data_fifo_t out_data; // encoded request written to raw conn
_server_response_msg_list_t responses; // response(s) to this request
bool codec_completed; // Request and Response HTTP msgs OK
bool cancelled;
bool close_on_complete; // close the conn when this request is complete
bool response_complete; // true when server response message decoded
} _server_request_t;
ALLOC_DECLARE(_server_request_t);
ALLOC_DEFINE(_server_request_t);
//
// This file contains code specific to HTTP server processing. The raw
// connection is terminated at an HTTP server, not an HTTP client.
//
#define DEFAULT_CAPACITY 250
// Reconnection logic time values: When the HTTP server disconnects this
// adaptor will attempt to reconnect. The reconnect interval increases by
// RETRY_PAUSE_MSEC with each reconnect failure until it hits the maximum of
// RETRY_MAX_PAUSE_MSEC. If the reconnection does not succeed after
// LINK_TIMEOUT_MSEC then the qdr_link_t's are detached to prevent client
// requests from arriving for a potentially dead server.
#define RETRY_PAUSE_MSEC ((qd_duration_t)500)
#define RETRY_MAX_PAUSE_MSEC ((qd_duration_t)3000)
#define LINK_TIMEOUT_MSEC ((qd_duration_t)2500)
static void _server_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len);
static void _server_tx_stream_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_stream_data_t *stream_data);
static int _server_rx_request_cb(h1_codec_request_state_t *hrs,
const char *method,
const char *target,
uint32_t version_major,
uint32_t version_minor);
static int _server_rx_response_cb(h1_codec_request_state_t *hrs,
int status_code,
const char *reason_phrase,
uint32_t version_major,
uint32_t version_minor);
static int _server_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, const char *value);
static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_body);
static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t len, bool more);
static void _server_rx_done_cb(h1_codec_request_state_t *hrs);
static void _server_request_complete_cb(h1_codec_request_state_t *hrs, bool cancelled);
static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context);
static void _do_reconnect(void *context);
static void _server_response_msg_free(_server_request_t *req, _server_response_msg_t *rmsg);
static void _server_request_free(_server_request_t *hreq);
static void _write_pending_request(_server_request_t *req);
static void _cancel_request(_server_request_t *req);
static bool _process_request(_server_request_t *req);
static void _encode_request_message(_server_request_t *hreq);
static void _send_request_message(_server_request_t *hreq);
////////////////////////////////////////////////////////
// HTTP/1.x Server Connector
////////////////////////////////////////////////////////
// An HttpConnector has been created. Create an qdr_http_connection_t and a
// qdr_connection_t for it.
//
static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ctor,
qd_dispatch_t *qd,
const qd_http_bridge_config_t *bconfig)
{
qdr_http1_connection_t *hconn = new_qdr_http1_connection_t();
ZERO(hconn);
hconn->type = HTTP1_CONN_SERVER;
hconn->qd_server = qd->server;
hconn->adaptor = qdr_http1_adaptor;
hconn->handler_context.handler = &_handle_connection_events;
hconn->handler_context.context = hconn;
sys_atomic_init(&hconn->q2_restart, 0);
hconn->cfg.host = qd_strdup(bconfig->host);
hconn->cfg.port = qd_strdup(bconfig->port);
hconn->cfg.address = qd_strdup(bconfig->address);
hconn->cfg.site = bconfig->site ? qd_strdup(bconfig->site) : 0;
hconn->cfg.host_port = qd_strdup(bconfig->host_port);
hconn->server.connector = ctor;
ctor->ctx = (void*)hconn;
hconn->cfg.event_channel = bconfig->event_channel;
hconn->cfg.aggregation = bconfig->aggregation;
hconn->cfg.host_override = bconfig->host_override ? qd_strdup(bconfig->host_override) : 0;
// for initiating a connection to the server
hconn->server.reconnect_timer = qd_timer(qdr_http1_adaptor->core->qd, _do_reconnect, hconn);
// Create the qdr_connection
qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
false, //bool is_authenticated,
true, //bool opened,
"", //char *sasl_mechanisms,
QD_OUTGOING, //qd_direction_t dir,
hconn->cfg.host_port, //const char *host,
"", //const char *ssl_proto,
"", //const char *ssl_cipher,
"", //const char *user,
"HTTP/1.x Adaptor", //const char *container,
0, //pn_data_t *connection_properties,
0, //int ssl_ssf,
false, //bool ssl,
"", // peer router version,
false); // streaming links
hconn->conn_id = qd_server_allocate_connection_id(hconn->qd_server);
hconn->qdr_conn = qdr_connection_opened(qdr_http1_adaptor->core,
qdr_http1_adaptor->adaptor,
false, // incoming
QDR_ROLE_NORMAL,
1, // cost
hconn->conn_id,
0, // label
0, // remote container id
false, // strip annotations in
false, // strip annotations out
DEFAULT_CAPACITY,
0, // vhost
0, // policy_spec
info,
0, // bind context
0); // bind token
// wait for the raw connection to come up before creating the in and out links
qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to server created", hconn->conn_id);
return hconn;
}
// Management Agent API - Create
//
qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity)
{
qd_http_connector_t *c = qd_http_connector(qd->server);
if (!c) {
qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR, "Unable to create http connector: no memory");
return 0;
}
c->config = *config;
DEQ_ITEM_INIT(c);
qdr_http1_connection_t *hconn = _create_server_connection(c, qd, config);
if (hconn) {
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"] Initiating connection to HTTP server %s",
hconn->conn_id, hconn->cfg.host_port);
// lock out the core activation thread. Up until this point the core
// thread cannot activate the qdr_connection_t since the
// qdr_connection_t context has not been set (see
// _core_connection_activate_CT in http1_adaptor.c). This keeps the
// core from attempting to schedule the connection until we finish
// setup.
sys_mutex_lock(qdr_http1_adaptor->lock);
DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn);
DEQ_INSERT_TAIL(qdr_http1_adaptor->connectors, c);
qdr_connection_set_context(hconn->qdr_conn, hconn);
qd_timer_schedule(hconn->server.reconnect_timer, 0);
sys_mutex_unlock(qdr_http1_adaptor->lock);
// setup complete - core thread can activate the connection
return c;
} else {
qd_http_connector_decref(c);
c = 0;
}
return c;
}
// Management Agent API - Delete
//
void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct)
{
if (ct) {
qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleted HttpConnector for %s, %s:%s", ct->config.address, ct->config.host, ct->config.port);
sys_mutex_lock(qdr_http1_adaptor->lock);
DEQ_REMOVE(qdr_http1_adaptor->connectors, ct);
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) ct->ctx;
if (hconn) {
hconn->server.connector = 0;
ct->ctx = 0;
if (hconn->qdr_conn)
// have the core close this connection
qdr_core_close_connection(hconn->qdr_conn);
}
sys_mutex_unlock(qdr_http1_adaptor->lock);
qd_http_connector_decref(ct);
}
}
////////////////////////////////////////////////////////
// Raw Connector Events
////////////////////////////////////////////////////////
// Is the hreq currently in flight to the server?
//
static inline bool _is_request_in_progress(const _server_request_t *hreq)
{
return hreq && (hreq->base.out_http1_octets > 0 || hreq->cancelled);
}
// Create the qdr links and HTTP codec when the server connection comes up.
// These links & codec will persist across temporary drops in the connection to
// the server (like when closing the connection to indicate end of response
// message). However if the connection to the server cannot be re-established
// in a "reasonable" amount of time we consider the server unavailable and
// these links and codec will be closed - aborting any pending requests. Once
// the connection to the server is reestablished these links & codec will be
// recreated.
//
static void _setup_server_links(qdr_http1_connection_t *hconn)
{
if (!hconn->in_link) {
// simulate an anonymous link for responses from the server
hconn->in_link = qdr_link_first_attach(hconn->qdr_conn,
QD_INCOMING,
qdr_terminus(0), //qdr_terminus_t *source,
qdr_terminus(0), //qdr_terminus_t *target
"http1.server.in", //const char *name,
0, //const char *terminus_addr,
false,
NULL,
&(hconn->in_link_id));
qdr_link_set_context(hconn->in_link, hconn);
qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] HTTP server response link created",
hconn->conn_id, hconn->in_link_id);
}
if (!hconn->out_link) {
// simulate a server subscription for its service address
qdr_terminus_t *source = qdr_terminus(0);
qdr_terminus_set_address(source, hconn->cfg.address);
hconn->out_link = qdr_link_first_attach(hconn->qdr_conn,
QD_OUTGOING,
source, //qdr_terminus_t *source,
qdr_terminus(0), //qdr_terminus_t *target,
"http1.server.out", //const char *name,
0, //const char *terminus_addr,
false,
0, // initial delivery
&(hconn->out_link_id));
qdr_link_set_context(hconn->out_link, hconn);
hconn->out_link_credit = DEFAULT_CAPACITY;
qdr_link_flow(hconn->adaptor->core, hconn->out_link, DEFAULT_CAPACITY, false);
qd_log(hconn->adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] HTTP server request link created",
hconn->conn_id, hconn->out_link_id);
}
if (!hconn->http_conn) {
h1_codec_config_t config = {0};
config.type = HTTP1_CONN_SERVER;
config.tx_buffers = _server_tx_buffers_cb;
config.tx_stream_data = _server_tx_stream_data_cb;
config.rx_request = _server_rx_request_cb;
config.rx_response = _server_rx_response_cb;
config.rx_header = _server_rx_header_cb;
config.rx_headers_done = _server_rx_headers_done_cb;
config.rx_body = _server_rx_body_cb;
config.rx_done = _server_rx_done_cb;
config.request_complete = _server_request_complete_cb;
hconn->http_conn = h1_codec_connection(&config, hconn);
}
}
// Tear down the qdr links and the codec. This is called when the
// connection to the server has dropped and cannot be re-established in a
// timely manner.
//
static void _teardown_server_links(qdr_http1_connection_t *hconn)
{
_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
while (hreq) {
_server_request_free(hreq);
hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
}
h1_codec_connection_free(hconn->http_conn);
hconn->http_conn = 0;
if (hconn->out_link) {
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] Closing outgoing HTTP link",
hconn->conn_id, hconn->out_link_id);
qdr_link_set_context(hconn->out_link, 0);
qdr_link_detach(hconn->out_link, QD_CLOSED, 0);
hconn->out_link = 0;
}
if (hconn->in_link) {
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] Closing incoming HTTP link",
hconn->conn_id, hconn->in_link_id);
qdr_link_set_context(hconn->in_link, 0);
qdr_link_detach(hconn->in_link, QD_CLOSED, 0);
hconn->in_link = 0;
}
}
// Reconnection timer handler.
// This timer can be scheduled either by the event loop during the
// PN_RAW_CONNECTION_DISCONNECT event or by the core thread via
// _core_connection_activate_CT in http1_adaptor.c. Since timers do not run
// concurrently this handler is guaranteed never to collide with itself. Once
// hconn->raw_conn is set to zero by the disconnect handler it will remain zero
// until this handler creates a new raw connection.
//
static void _do_reconnect(void *context)
{
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
uint64_t conn_id = hconn->conn_id;
// while timers do not run concurrently it is possible to reschedule them
// via another thread while the timer handler is running, resulting in this
// handler running twice
if (hconn->raw_conn) return; // already ran
if (hconn->qdr_conn) {
// handle any qdr_connection_t processing requests that occurred since
// this raw connection dropped.
while (qdr_connection_process(hconn->qdr_conn))
;
if (!hconn->qdr_conn) {
// the qdr_connection_t has been closed
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"] HTTP/1.x server connection closed", hconn->conn_id);
qdr_http1_connection_free(hconn);
return;
}
_process_request((_server_request_t*) DEQ_HEAD(hconn->requests));
}
// Do not attempt to re-connect if the current request is still in
// progress. This happens when the server has closed the connection before
// the request message has fully arrived (!rx_complete).
// qdr_connection_process() will continue to invoke the
// qdr_http1_server_core_link_deliver callback until the request message is
// complete.
// false positive: head request is removed before it is freed, null is passed
/* coverity[pass_freed_arg] */
if (!_is_request_in_progress((_server_request_t*) DEQ_HEAD(hconn->requests))) {
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"] Connecting to HTTP server...", conn_id);
sys_mutex_lock(qdr_http1_adaptor->lock);
hconn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
// this next call may immediately reschedule the connection on another I/O
// thread. After this call hconn may no longer be valid!
pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
sys_mutex_unlock(qdr_http1_adaptor->lock);
}
}
static void _accept_and_settle_request(_server_request_t *hreq)
{
qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
hreq->request_dlv,
hreq->request_dispo,
true, // settled
0, // delivery state
false);
// can now release the delivery
qdr_delivery_set_context(hreq->request_dlv, 0);
qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
hreq->request_dlv = 0;
hreq->request_settled = true;
}
// handle PN_RAW_CONNECTION_READ
static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
{
int error = 0;
qd_buffer_list_t blist;
uintmax_t length;
qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
if (HTTP1_DUMP_BUFFERS) {
fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
qd_buffer_t *bb = DEQ_HEAD(blist);
while (bb) {
fprintf(stdout, " buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
bb = DEQ_NEXT(bb);
}
fflush(stdout);
}
if (length) {
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server (%zu buffers)",
hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist));
hconn->in_http1_octets += length;
error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
}
return error;
}
// handle PN_RAW_CONNECTION_NEED_READ_BUFFERS
static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn)
{
// @TODO(kgiusti): backpressure if no credit
// if (hconn->in_link_credit > 0 */)
int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
hconn->conn_id, granted);
}
// Proton Raw Connection Events
//
static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context)
{
qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
qd_log_source_t *log = qdr_http1_adaptor->log;
if (!hconn) return;
qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP server proactor event %s", hconn->conn_id, pn_event_type_name(pn_event_type(e)));
switch (pn_event_type(e)) {
case PN_RAW_CONNECTION_CONNECTED: {
hconn->server.link_timeout = 0;
_setup_server_links(hconn);
while (qdr_connection_process(hconn->qdr_conn)) {}
break;
}
case PN_RAW_CONNECTION_CLOSED_READ: {
// notify the codec so it can complete the current response
// message (response body terminated on connection closed)
h1_codec_connection_rx_closed(hconn->http_conn);
pn_raw_connection_close(hconn->raw_conn);
hconn->q2_blocked = false;
break;
}
case PN_RAW_CONNECTION_CLOSED_WRITE: {
// discard any remaining outgoing request message data
_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
if (_is_request_in_progress(hreq)) {
hreq->request_discard = true;
qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
}
pn_raw_connection_close(hconn->raw_conn);
break;
}
case PN_RAW_CONNECTION_DISCONNECTED: {
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", hconn->conn_id);
pn_raw_connection_set_context(hconn->raw_conn, 0);
// Check for a request that is in-progress - it needs to be cancelled.
// However there is an exception: the server has completed sending a
// response message and closed the connection, but the outgoing request
// message has not completed (example: a streaming POST that has been
// rejected by the server). In this case wait until the request message
// has fully arrived from the core.
_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
if (_is_request_in_progress(hreq) && !hreq->response_complete)
_cancel_request(hreq);
_process_request(hreq);
//
// Try to reconnect to the server. Leave the links intact so pending
// requests are not aborted. If we fail to reconnect after
// LINK_TIMEOUT_MSECS drop the links to prevent additional request from
// arriving.
//
bool reconnect = false;
if (hconn->qdr_conn) {
if (hconn->server.link_timeout == 0) {
hconn->server.link_timeout = qd_timer_now() + LINK_TIMEOUT_MSEC;
hconn->server.reconnect_pause = 0;
} else {
if ((qd_timer_now() - hconn->server.link_timeout) >= 0)
_teardown_server_links(hconn);
if (hconn->server.reconnect_pause < RETRY_MAX_PAUSE_MSEC)
hconn->server.reconnect_pause += RETRY_PAUSE_MSEC;
}
reconnect = true;
}
// prevent core activation
sys_mutex_lock(qdr_http1_adaptor->lock);
hconn->raw_conn = 0;
if (reconnect)
qd_timer_schedule(hconn->server.reconnect_timer, hconn->server.reconnect_pause);
sys_mutex_unlock(qdr_http1_adaptor->lock);
// do not manipulate hconn further as it may now be processed by the
// timer thread
return;
}
case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
_send_request_message((_server_request_t*) DEQ_HEAD(hconn->requests));
break;
}
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
_handle_conn_need_read_buffers(hconn);
break;
}
case PN_RAW_CONNECTION_WAKE: {
int error = 0;
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id);
if (sys_atomic_set(&hconn->q2_restart, 0)) {
// note: unit tests grep for this log!
qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] server link unblocked from Q2 limit", hconn->conn_id);
hconn->q2_blocked = false;
error = _handle_conn_read_event(hconn); // restart receiving
_handle_conn_need_read_buffers(hconn);
}
while (qdr_connection_process(hconn->qdr_conn)) {}
if (error)
qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Connection processing complete", hconn->conn_id);
break;
}
case PN_RAW_CONNECTION_READ: {
if (!hconn->q2_blocked) {
int error = _handle_conn_read_event(hconn);
if (error)
qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
}
break;
}
case PN_RAW_CONNECTION_WRITTEN: {
qdr_http1_free_written_buffers(hconn);
break;
}
default:
break;
}
//
// After each event check connection and request status
//
if (!hconn->qdr_conn) {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP/1.x server connection closed", hconn->conn_id);
qdr_http1_connection_free(hconn);
} else {
bool need_close = _process_request((_server_request_t*) DEQ_HEAD(hconn->requests));
if (need_close) {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
qdr_http1_close_connection(hconn, "HTTP Request requires connection close");
}
}
}
// Check the head request for completion. Return true if the connection must be
// closed before starting the next request.
static bool _process_request(_server_request_t *hreq)
{
bool need_close = false;
if (!hreq)
return need_close;
assert(DEQ_PREV(&hreq->base) == 0); // preserve order!
qdr_http1_connection_t *hconn = hreq->base.hconn;
if (hreq->cancelled) {
// have to wait until all buffers returned from proton
// before we can release the request
if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
return false;
// clean up the request message delivery
if (hreq->request_dlv) {
if ((!hreq->request_acked || !hreq->request_settled) &&
hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
if (!hreq->request_dispo || hreq->request_dispo == PN_ACCEPTED)
hreq->request_dispo = (hreq->base.out_http1_octets > 0
? PN_MODIFIED : PN_RELEASED);
qd_message_set_send_complete(qdr_delivery_message(hreq->request_dlv));
qdr_link_complete_sent_message(qdr_http1_adaptor->core, hconn->out_link);
qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
hreq->request_dlv,
hreq->request_dispo,
true, // settled
0, // delivery state
false);
hreq->request_acked = hreq->request_settled = true;
}
qdr_delivery_set_context(hreq->request_dlv, 0);
qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 server request cancelled releasing delivery");
hreq->request_dlv = 0;
}
// drop in flight responses
_server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
while (rmsg) {
if (rmsg->dlv) {
qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
qdr_delivery_set_aborted(rmsg->dlv, true);
}
_server_response_msg_free(hreq, rmsg);
rmsg = DEQ_HEAD(hreq->responses);
}
// it is safe to keep the connection up if this request has never been
// written to the connection, otherwise the state of the connection is
// unknown so close it
if (hreq->base.out_http1_octets > 0)
need_close = true;
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" cancelled",
hconn->conn_id, hreq->base.msg_id);
_server_request_free(hreq);
if (hconn->out_link)
qdr_link_flow(qdr_http1_adaptor->core, hconn->out_link, 1, false);
} else if (hreq->codec_completed) {
// The request message has been fully encoded and the response msg(s)
// have been completely received. The terminal disposition for the
// request message delivery can be set now since the server is done
// responding. The request disposition can be settled after all the
// response messages have been delivered to the core.
// hreq->out_data.fifo ==> request message written to raw conn
// DEQ_IS_EMPTY(hreq->responses)
if ((!hreq->request_acked || (!hreq->request_settled
&& DEQ_IS_EMPTY(hreq->responses)))
&& hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
assert(hreq->request_dlv);
assert(hreq->request_dispo == PN_ACCEPTED);
hreq->request_settled = DEQ_IS_EMPTY(hreq->responses);
if (!hreq->request_acked) {
qd_message_set_send_complete(qdr_delivery_message(hreq->request_dlv));
qdr_link_complete_sent_message(qdr_http1_adaptor->core, hconn->out_link);
}
qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
hreq->request_dlv,
hreq->request_dispo,
hreq->request_settled,
0, // delivery state
false);
hreq->request_acked = true;
if (hreq->request_settled) {
qdr_delivery_set_context(hreq->request_dlv, 0);
qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 server request settled releasing delivery");
hreq->request_dlv = 0;
}
}
if (hreq->request_acked && hreq->request_settled && DEQ_SIZE(hreq->out_data.fifo) == 0) {
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" completed!",
hconn->conn_id, hreq->base.msg_id);
_server_request_free(hreq);
if (hconn->out_link)
qdr_link_flow(qdr_http1_adaptor->core, hconn->out_link, 1, false);
}
}
return need_close;
}
//////////////////////////////////////////////////////////////////////
// HTTP/1.x Encoder/Decoder Callbacks
//////////////////////////////////////////////////////////////////////
// Encoder has a buffer list to send to the server
//
static void _server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *blist, unsigned int len)
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
if (hreq->request_discard)
qd_buffer_list_free_buffers(blist);
else {
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Sending %u octets to server",
hconn->conn_id, hconn->out_link_id, len);
qdr_http1_enqueue_buffer_list(&hreq->out_data, blist);
}
}
// Encoder has body data to send to the server
//
static void _server_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data)
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
if (hreq->request_discard)
qd_message_stream_data_release(stream_data);
else {
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Sending body data to server",
hconn->conn_id, hconn->out_link_id);
qdr_http1_enqueue_stream_data(&hreq->out_data, stream_data);
}
}
// Server will not be sending us HTTP requests
//
static int _server_rx_request_cb(h1_codec_request_state_t *hrs,
const char *method,
const char *target,
uint32_t version_major,
uint32_t version_minor)
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_log(qdr_http1_adaptor->log, QD_LOG_ERROR,
"[C%"PRIu64"][L%"PRIu64"] Spurious HTTP request received from server",
hconn->conn_id, hconn->in_link_id);
return HTTP1_STATUS_BAD_REQ;
}
// called when decoding an HTTP response from the server.
//
static int _server_rx_response_cb(h1_codec_request_state_t *hrs,
int status_code,
const char *reason_phrase,
uint32_t version_major,
uint32_t version_minor)
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
// expected to be in-order
assert(hreq && hreq == (_server_request_t*) DEQ_HEAD(hconn->requests));
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP msg_id=%"PRIu64" response received: status=%d phrase=%s version=%"PRIi32".%"PRIi32,
hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, status_code, reason_phrase ? reason_phrase : "<NONE>",
version_major, version_minor);
if (hconn->cfg.event_channel) {
return 0;
}
_server_response_msg_t *rmsg = new__server_response_msg_t();
ZERO(rmsg);
rmsg->hreq = hreq;
DEQ_INSERT_TAIL(hreq->responses, rmsg);
rmsg->msg_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
qd_compose_start_map(rmsg->msg_props);
{
char version[64];
snprintf(version, 64, "%"PRIi32".%"PRIi32, version_major, version_minor);
qd_compose_insert_symbol(rmsg->msg_props, RESPONSE_HEADER_KEY);
qd_compose_insert_string(rmsg->msg_props, version);
qd_compose_insert_symbol(rmsg->msg_props, STATUS_HEADER_KEY);
qd_compose_insert_int(rmsg->msg_props, (int32_t)status_code);
if (reason_phrase) {
qd_compose_insert_symbol(rmsg->msg_props, REASON_HEADER_KEY);
qd_compose_insert_string(rmsg->msg_props, reason_phrase);
}
}
hreq->response_complete = false;
return 0;
}
// called for each decoded HTTP header.
//
static int _server_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, const char *value)
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"]L%"PRIu64"] HTTP response header received: key='%s' value='%s'",
hconn->conn_id, hconn->in_link_id, key, value);
if (hconn->cfg.event_channel) {
return 0;
}
// expect: running incoming request at tail
_server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
assert(rmsg);
// We need to filter the connection header out
// @TODO(kgiusti): also have to remove headers given in value!
if (strcasecmp(key, "connection") != 0) {
qd_compose_insert_symbol(rmsg->msg_props, key);
qd_compose_insert_string(rmsg->msg_props, value);
}
return 0;
}
// called after the last header is decoded, before decoding any body data.
//
static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_body)
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP response headers done.",
hconn->conn_id, hconn->in_link_id);
if (hconn->cfg.event_channel) {
return 0;
}
// expect: running incoming request at tail
_server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
assert(rmsg && !rmsg->msg);
// start building the AMQP message
rmsg->msg = qd_message();
qd_composed_field_t *hdrs = qd_compose(QD_PERFORMATIVE_HEADER, 0);
qd_compose_start_list(hdrs);
qd_compose_insert_bool(hdrs, 0); // durable
qd_compose_insert_null(hdrs); // priority
//qd_compose_insert_null(hdrs); // ttl
//qd_compose_insert_bool(hdrs, 0); // first-acquirer
//qd_compose_insert_uint(hdrs, 0); // delivery-count
qd_compose_end_list(hdrs);
qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, hdrs);
qd_compose_start_list(props);
qd_compose_insert_null(props); // message-id
qd_compose_insert_null(props); // user-id
qd_compose_insert_string(props, hreq->base.response_addr); // to
// subject:
qd_compose_insert_string(props, h1_codec_request_state_method(hrs));
qd_compose_insert_null(props); // reply-to
qd_compose_insert_ulong(props, hreq->base.msg_id); // correlation-id
qd_compose_insert_null(props); // content-type
qd_compose_insert_null(props); // content-encoding
qd_compose_insert_null(props); // absolute-expiry-time
qd_compose_insert_null(props); // creation-time
qd_compose_insert_string(props, hconn->cfg.site); // group-id
qd_compose_end_list(props);
qd_compose_end_map(rmsg->msg_props);
qd_message_compose_3(rmsg->msg, props, rmsg->msg_props, !has_body);
qd_compose_free(props);
qd_compose_free(rmsg->msg_props);
rmsg->msg_props = 0;
// future-proof: ensure the message headers have not caused Q2
// blocking. We only check for Q2 events while adding body data.
assert(!qd_message_is_Q2_blocked(rmsg->msg));
qd_alloc_safe_ptr_t hconn_sp = QD_SAFE_PTR_INIT(hconn);
qd_message_set_q2_unblocked_handler(rmsg->msg, qdr_http1_q2_unblocked_handler, hconn_sp);
// start delivery if possible
if (hconn->in_link_credit > 0 && rmsg == DEQ_HEAD(hreq->responses)) {
hconn->in_link_credit -= 1;
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Delivering msg-id=%"PRIu64" response to router addr=%s",
hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, hreq->base.response_addr);
qd_iterator_t *addr = qd_message_field_iterator(rmsg->msg, QD_FIELD_TO);
assert(addr);
qd_iterator_reset_view(addr, ITER_VIEW_ADDRESS_HASH);
rmsg->dlv = qdr_link_deliver_to(hconn->in_link, rmsg->msg, 0, addr, false, 0, 0, 0, 0);
qdr_delivery_set_context(rmsg->dlv, (void*) hreq);
rmsg->msg = 0; // now owned by delivery
}
return 0;
}
// Called with decoded body data. This may be called multiple times as body
// data becomes available.
//
static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *body, size_t len,
bool more)
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
bool q2_blocked = false;
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
hconn->conn_id, hconn->in_link_id, len);
if (hconn->cfg.event_channel) {
qd_buffer_list_free_buffers(body);
return 0;
}
_server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
qd_message_stream_data_append(msg, body, &q2_blocked);
hconn->q2_blocked = hconn->q2_blocked || q2_blocked;
if (q2_blocked) {
// note: unit tests grep for this log!
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] server link blocked on Q2 limit", hconn->conn_id);
}
//
// Notify the router that more data is ready to be pushed out on the delivery
//
if (!more)
qd_message_set_receive_complete(msg);
if (rmsg->dlv)
qdr_delivery_continue(qdr_http1_adaptor->core, rmsg->dlv, false);
return 0;
}
// Called at the completion of response decoding.
//
static void _server_rx_done_cb(h1_codec_request_state_t *hrs)
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
if (hconn->cfg.event_channel) {
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP response message msg-id=%"PRIu64" decoding complete.",
hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
hreq->response_complete = true;
return;
}
_server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP response message msg-id=%"PRIu64" decoding complete.",
hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
rmsg->rx_complete = true;
if (!qd_message_receive_complete(msg)) {
qd_message_set_receive_complete(msg);
if (rmsg->dlv) {
qdr_delivery_continue(qdr_http1_adaptor->core, rmsg->dlv, false);
}
}
if (rmsg->dlv && hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
// We've finished the delivery, and don't care about outcome/settlement
_server_response_msg_free(hreq, rmsg);
}
// only consider the response complete if terminal response code (!1xx)
if (h1_codec_request_state_response_code(hrs) / 100 != 1)
hreq->response_complete = true;
}
// called at the completion of a full Request/Response exchange, or as a result
// of cancelling the request. The hrs will be deleted on return from this
// call. Any hrs related state must be released before returning from this
// callback.
//
// Note: in the case where the request had multiple response messages, this
// call occurs when the LAST response has been completely received
// (_server_rx_done_cb())
//
static void _server_request_complete_cb(h1_codec_request_state_t *hrs, bool cancelled)
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
hreq->base.stop = qd_timer_now();
qdr_http1_record_server_request_info(qdr_http1_adaptor, &hreq->base);
hreq->base.lib_rs = 0;
hreq->cancelled = hreq->cancelled || cancelled;
hreq->codec_completed = !hreq->cancelled;
uint64_t in_octets, out_octets;
h1_codec_request_state_counters(hrs, &in_octets, &out_octets);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"] HTTP request/response %s. Octets read: %"PRIu64" written: %"PRIu64,
hconn->conn_id,
cancelled ? "cancelled!" : "codec done",
in_octets, out_octets);
}
//////////////////////////////////////////////////////////////////////
// Router Protocol Adapter Callbacks
//////////////////////////////////////////////////////////////////////
// credit has been granted - responses may now be sent to the
// router core.
//
void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t *adaptor,
qdr_http1_connection_t *hconn,
qdr_link_t *link,
int credit)
{
assert(link == hconn->in_link); // router only grants flow on incoming link
assert(qdr_link_is_anonymous(link)); // remove me
hconn->in_link_credit += credit;
qd_log(adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Credit granted on response link: %d",
hconn->conn_id, hconn->in_link_id, hconn->in_link_credit);
if (hconn->in_link_credit > 0) {
if (hconn->raw_conn)
qda_raw_conn_grant_read_buffers(hconn->raw_conn);
// check for pending responses that are blocked for credit
_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
if (hreq) {
_server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
while (rmsg && rmsg->msg && hconn->in_link_credit > 0) {
assert(!rmsg->dlv);
hconn->in_link_credit -= 1;
qd_log(adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Delivering blocked response to router addr=%s",
hconn->conn_id, hconn->in_link_id, hreq->base.response_addr);
qd_iterator_t *addr = qd_message_field_iterator(rmsg->msg, QD_FIELD_TO);
qd_iterator_reset_view(addr, ITER_VIEW_ADDRESS_HASH);
rmsg->dlv = qdr_link_deliver_to(hconn->in_link, rmsg->msg, 0, addr, false, 0, 0, 0, 0);
qdr_delivery_set_context(rmsg->dlv, (void*) hreq);
rmsg->msg = 0;
if (!rmsg->rx_complete) {
// stop here since response must be complete before we can deliver the next one.
break;
}
if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) {
// stop here since response should not be freed until it is accepted
break;
}
// else the delivery is complete no need to save it
_server_response_msg_free(hreq, rmsg);
rmsg = DEQ_HEAD(hreq->responses);
}
}
}
}
// Handle disposition/settlement update for the outstanding HTTP response.
//
void qdr_http1_server_core_delivery_update(qdr_http1_adaptor_t *adaptor,
qdr_http1_connection_t *hconn,
qdr_http1_request_base_t *hbase,
qdr_delivery_t *dlv,
uint64_t disp,
bool settled)
{
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] HTTP response delivery update, outcome=0x%"PRIx64"%s",
hconn->conn_id, hconn->in_link_id, disp, settled ? " settled": "");
// Not much can be done with error dispositions (I think)
if (disp != PN_ACCEPTED) {
qd_log(adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] response message was not accepted, outcome=0x%"PRIx64,
hconn->conn_id, hconn->in_link_id, disp);
}
if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) {
_server_request_t *hreq = (_server_request_t*)hbase;
qd_message_set_send_complete(qdr_delivery_message(hreq->request_dlv));
qdr_link_complete_sent_message(qdr_http1_adaptor->core, hconn->out_link);
_accept_and_settle_request(hreq);
hreq->request_acked = true;
qd_log(adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] request accepted", hconn->conn_id, hconn->in_link_id);
_server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
_server_response_msg_free(hreq, rmsg);
}
}
//
// Request message forwarding
//
// Create a request context for a new request in msg, which is valid to a depth
// of at least QD_DEPTH_PROPERTIES
//
static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn, qd_message_t *msg)
{
uint64_t msg_id = 0;
char *reply_to = 0;
bool ok = false;
qd_parsed_field_t *msg_id_pf = 0;
qd_iterator_t *msg_id_itr = qd_message_field_iterator_typed(msg, QD_FIELD_MESSAGE_ID); // ulong
if (msg_id_itr) {
msg_id_pf = qd_parse(msg_id_itr);
if (msg_id_pf && qd_parse_ok(msg_id_pf)) {
msg_id = qd_parse_as_ulong(msg_id_pf);
ok = qd_parse_ok(msg_id_pf);
}
}
qd_parse_free(msg_id_pf);
qd_iterator_free(msg_id_itr);
if (!ok) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Rejecting message missing id.",
hconn->conn_id, hconn->out_link_id);
return 0;
}
qd_iterator_t *reply_to_itr = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
reply_to = (char*) qd_iterator_copy(reply_to_itr);
qd_iterator_free(reply_to_itr);
if (!reply_to && !hconn->cfg.event_channel) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Rejecting message no reply-to.",
hconn->conn_id, hconn->out_link_id);
return 0;
}
qd_iterator_t *group_id_itr = qd_message_field_iterator(msg, QD_FIELD_GROUP_ID);
char* group_id = (char*) qd_iterator_copy(group_id_itr);
qd_iterator_free(group_id_itr);
_server_request_t *hreq = new__server_request_t();
ZERO(hreq);
hreq->base.hconn = hconn;
hreq->base.msg_id = msg_id;
hreq->base.response_addr = reply_to;
hreq->base.site = group_id;
hreq->base.start = qd_timer_now();
DEQ_INIT(hreq->out_data.fifo);
DEQ_INIT(hreq->responses);
DEQ_INSERT_TAIL(hconn->requests, &hreq->base);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] New HTTP Request msg-id=%"PRIu64" reply-to=%s.",
hconn->conn_id, hconn->out_link_id, msg_id, reply_to);
return hreq;
}
// Start a new request to the server. msg has been validated to at least
// application properties depth. Returns 0 on success.
//
static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg)
{
// start encoding HTTP request. Need method, target and version
qdr_http1_connection_t *hconn = hreq->base.hconn;
char *method_str = 0;
char *target_str = 0;
qd_parsed_field_t *app_props = 0;
uint32_t major = 1;
uint32_t minor = 1;
uint64_t outcome = 0;
assert(!hreq->base.lib_rs);
assert(qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) == QD_MESSAGE_DEPTH_OK);
// method is passed in the SUBJECT field
qd_iterator_t *method_iter = qd_message_field_iterator(msg, QD_FIELD_SUBJECT);
if (!method_iter) {
return PN_REJECTED;
}
method_str = (char*) qd_iterator_copy(method_iter);
qd_iterator_free(method_iter);
if (!method_str || *method_str == 0) {
return PN_REJECTED;
}
// target, version info and other headers are in the app properties
qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);
if (!app_props_iter) {
outcome = PN_REJECTED;
goto exit;
}
app_props = qd_parse(app_props_iter);
qd_iterator_free(app_props_iter);
if (!app_props) {
outcome = PN_REJECTED;
goto exit;
}
qd_parsed_field_t *ref = qd_parse_value_by_key(app_props, TARGET_HEADER_KEY);
target_str = (char*) qd_iterator_copy(qd_parse_raw(ref));
if (!target_str || *target_str == 0) {
outcome = PN_REJECTED;
goto exit;
}
// Pull the version info from the app properties (e.g. "1.1")
ref = qd_parse_value_by_key(app_props, REQUEST_HEADER_KEY);
if (ref) { // optional
char *version_str = (char*) qd_iterator_copy(qd_parse_raw(ref));
if (version_str)
sscanf(version_str, "%"SCNu32".%"SCNu32, &major, &minor);
free(version_str);
}
// done copying and converting!
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Encoding request method=%s target=%s",
hconn->conn_id, hconn->out_link_id, method_str, target_str);
hreq->base.lib_rs = h1_codec_tx_request(hconn->http_conn, method_str, target_str, major, minor);
if (!hreq->base.lib_rs) {
outcome = PN_REJECTED;
goto exit;
}
h1_codec_request_state_set_context(hreq->base.lib_rs, (void*) hreq);
// now send all headers in app properties
qd_parsed_field_t *key = qd_field_first_child(app_props);
bool ok = true;
while (ok && key) {
qd_parsed_field_t *value = qd_field_next_child(key);
if (!value)
break;
qd_iterator_t *i_key = qd_parse_raw(key);
if (!i_key)
break;
if (hconn->cfg.host_override && qd_iterator_equal(i_key, (const unsigned char*) HOST_KEY)) {
//if host override option is in use, write the configured
//value rather than that submitted by client
char *header_key = (char*) qd_iterator_copy(i_key);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Encoding request header %s:%s",
hconn->conn_id, hconn->out_link_id,
header_key, hconn->cfg.host_override);
ok = !h1_codec_tx_add_header(hreq->base.lib_rs, header_key, hconn->cfg.host_override);
free(header_key);
} else if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX)) {
// ignore the special headers added by the mapping
qd_iterator_t *i_value = qd_parse_raw(value);
if (!i_value)
break;
char *header_key = (char*) qd_iterator_copy(i_key);
char *header_value = (char*) qd_iterator_copy(i_value);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Encoding request header %s:%s",
hconn->conn_id, hconn->out_link_id,
header_key, header_value);
ok = !h1_codec_tx_add_header(hreq->base.lib_rs, header_key, header_value);
free(header_key);
free(header_value);
}
key = qd_field_next_child(value);
}
if (!ok)
outcome = PN_REJECTED;
exit:
free(method_str);
free(target_str);
qd_parse_free(app_props);
return outcome;
}
// Encode an outbound AMQP message as an HTTP Request. Sets the request_dispo
// when the encoding completes either successfully or in error.
//
static void _encode_request_message(_server_request_t *hreq)
{
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_message_t *msg = qdr_delivery_message(hreq->request_dlv);
if (!hreq->headers_encoded) {
hreq->request_dispo = _send_request_headers(hreq, msg);
hreq->headers_encoded = true;
if (hreq->request_dispo) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Rejecting malformed message msg-id=%"PRIu64,
hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
return;
}
}
while (hreq->request_dispo == 0) {
qd_message_stream_data_t *stream_data = 0;
switch (qd_message_next_stream_data(msg, &stream_data)) {
case QD_MESSAGE_STREAM_DATA_BODY_OK: {
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Encoding request body data",
hconn->conn_id, hconn->out_link_id);
if (h1_codec_tx_body(hreq->base.lib_rs, stream_data)) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] body data encode failed",
hconn->conn_id, hconn->out_link_id);
hreq->request_dispo = PN_REJECTED;
}
break;
}
case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
qd_message_stream_data_release(stream_data);
break;
case QD_MESSAGE_STREAM_DATA_NO_MORE:
// indicate this message is complete
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Request %p body data encode complete",
hconn->conn_id, hconn->out_link_id, (void*) hreq);
hreq->request_dispo = PN_ACCEPTED;
break;
case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
return; // wait for more
case QD_MESSAGE_STREAM_DATA_INVALID:
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
hconn->conn_id, hconn->out_link_id);
hreq->request_dispo = PN_REJECTED;
break;
}
}
}
// encode the request message and write it out to the server.
static void _send_request_message(_server_request_t *hreq)
{
if (hreq) {
assert(DEQ_PREV(&hreq->base) == 0); // preserve order!
qdr_http1_connection_t *hconn = hreq->base.hconn;
if (hreq->request_dispo == 0) {
_encode_request_message(hreq);
switch (hreq->request_dispo) {
case 0:
// streaming, not complete
break;
case PN_ACCEPTED: {
// completed successfully
bool ignore = false; // used client-facing only
h1_codec_tx_done(hreq->base.lib_rs, &ignore);
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] HTTP %p request message msg-id=%"PRIu64" encoding complete",
hconn->conn_id, hconn->out_link_id, (void*)hreq, hreq->base.msg_id);
break;
}
default:
// encoding failure
_cancel_request(hreq);
return;
}
}
// write encoded data to raw conn
_write_pending_request(hreq);
}
}
// The router wants to send this delivery out the link. This is either the
// start of a new incoming HTTP request or the continuation of an existing one.
// Note: returning a non-zero value will cause the delivery to be settled!
//
uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor,
qdr_http1_connection_t *hconn,
qdr_link_t *link,
qdr_delivery_t *delivery,
bool settled)
{
qd_message_t *msg = qdr_delivery_message(delivery);
_server_request_t *hreq = (_server_request_t*) qdr_delivery_get_context(delivery);
if (!hreq) {
// new delivery - create new request:
switch (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES)) {
case QD_MESSAGE_DEPTH_INCOMPLETE:
return 0;
case QD_MESSAGE_DEPTH_INVALID:
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
hconn->conn_id, link->identity);
qd_message_set_send_complete(msg);
qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
return PN_REJECTED;
case QD_MESSAGE_DEPTH_OK:
hreq = _create_request_context(hconn, msg);
if (!hreq) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
qd_message_set_send_complete(msg);
qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
return PN_REJECTED;
}
hreq->request_dlv = delivery;
qdr_delivery_set_context(delivery, (void*) hreq);
qdr_delivery_incref(delivery, "HTTP1 server referencing request delivery");
break;
}
}
if (DEQ_HEAD(hconn->requests) == &hreq->base)
_send_request_message(hreq);
return 0;
}
//
// Misc
//
// free the response message
//
static void _server_response_msg_free(_server_request_t *hreq, _server_response_msg_t *rmsg)
{
DEQ_REMOVE(hreq->responses, rmsg);
// deactivate the Q2 callback
qd_message_t *msg = rmsg->dlv ? qdr_delivery_message(rmsg->dlv) : rmsg->msg;
qd_message_clear_q2_unblocked_handler(msg);
qd_message_free(rmsg->msg);
qd_compose_free(rmsg->msg_props);
if (rmsg->dlv) {
qdr_delivery_set_context(rmsg->dlv, 0);
qdr_delivery_decref(qdr_http1_adaptor->core, rmsg->dlv, "HTTP1 server releasing response delivery");
}
free__server_response_msg_t(rmsg);
}
// Release the request
//
static void _server_request_free(_server_request_t *hreq)
{
if (hreq) {
qdr_http1_request_base_cleanup(&hreq->base);
if (hreq->request_dlv) {
qdr_delivery_set_context(hreq->request_dlv, 0);
qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 server releasing request delivery");
}
qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
_server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
while (rmsg) {
_server_response_msg_free(hreq, rmsg);
rmsg = DEQ_HEAD(hreq->responses);
}
free__server_request_t(hreq);
}
}
static void _write_pending_request(_server_request_t *hreq)
{
if (hreq && !hreq->cancelled) {
assert(DEQ_PREV(&hreq->base) == 0); // preserve order!
uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &hreq->out_data);
hreq->base.out_http1_octets += written;
if (written)
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] %"PRIu64" request octets written to server",
hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id, written);
}
}
void qdr_http1_server_conn_cleanup(qdr_http1_connection_t *hconn)
{
for (_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
hreq;
hreq = (_server_request_t*) DEQ_HEAD(hconn->requests)) {
_server_request_free(hreq);
}
}
static void _cancel_request(_server_request_t *hreq)
{
if (!hreq->cancelled) {
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Cancelling HTTP Request msg-id=%"PRIu64,
hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id,
hreq->base.msg_id);
if (!hreq->base.lib_rs) {
// never even got to encoding it - manually mark it cancelled
hreq->cancelled = true;
} else {
// cleanup codec state - this will call _server_request_complete_cb()
// with cancelled = true
h1_codec_request_state_cancel(hreq->base.lib_rs);
}
}
// cleanup occurs at the end of the connection event handler
}
// handle connection close request from management
//
void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor,
qdr_http1_connection_t *hconn,
const char *error)
{
qdr_connection_t *qdr_conn = hconn->qdr_conn;
// prevent activation by core thread
sys_mutex_lock(qdr_http1_adaptor->lock);
qdr_connection_set_context(hconn->qdr_conn, 0);
hconn->qdr_conn = 0;
sys_mutex_unlock(qdr_http1_adaptor->lock);
// the core thread can no longer activate this connection
qdr_connection_closed(qdr_conn);
qdr_http1_close_connection(hconn, "Connection closed by management");
// it is expected that this callback is the final callback before returning
// from qdr_connection_process(). Free hconn when qdr_connection_process returns.
}