/* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
#include <stdlib.h>
#include <apr_pools.h>
#include <apr_strings.h>
#include "serf.h"
#include "serf_bucket_util.h"
#include "serf_private.h"
#include "protocols/http2_buckets.h"
#include "protocols/http2_protocol.h"
struct serf_http2_stream_data_t
{
  serf_request_t *request;             /* May be NULL as streams may outlive requests */
  serf_incoming_request_t *in_request; /* Incoming (server side) request, if any */
  serf_bucket_t *response_agg;         /* Aggregate collecting the decoded HEADERS and DATA */
  serf_hpack_table_t *tbl;             /* HPACK table, as handed to us by the protocol */
  serf_bucket_t *data_tail;            /* Unsent body data, waiting for flow control window */
  bool resetted;                       /* true once the stream has been reset */
};
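/* Creates the stream administration for stream STREAMID on connection H2.
   LR_WINDOW and RL_WINDOW are the initial flow control windows for the two
   directions. STREAMID may be negative, in which case the actual id is
   allocated later, when the first frame for this stream is enqueued. */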
serf_http2_stream_t *
serf_http2__stream_create(serf_http2_protocol_t *h2,
apr_int32_t streamid,
apr_uint32_t lr_window,
apr_uint32_t rl_window,
serf_bucket_alloc_t *alloc)
{
serf_http2_stream_t *stream = serf_bucket_mem_alloc(alloc,
sizeof(*stream));
stream->h2 = h2;
stream->alloc = alloc;
stream->next = stream->prev = NULL;
/* Delay creating this? */
stream->data = serf_bucket_mem_alloc(alloc, sizeof(*stream->data));
stream->data->request = NULL;
stream->data->in_request = NULL;
stream->data->response_agg = NULL;
stream->data->tbl = NULL;
stream->data->data_tail = NULL;
stream->data->resetted = false;
stream->lr_window = lr_window;
stream->rl_window = rl_window;
stream->rl_window_upd_below = 1024 * 1024;
stream->rl_window_upd_to = 16 * 1024 * 1024;
if (streamid >= 0)
stream->streamid = streamid;
else
stream->streamid = -1; /* Undetermined yet */
stream->status = (streamid >= 0) ? H2S_IDLE : H2S_INIT;
stream->new_reserved_stream = NULL;
stream->prev_writable = stream->next_writable = NULL;
return stream;
}
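/* Releases the body data still held by STREAM (if any) before the
   connection starts cleaning up its buckets. */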
void
serf_http2__stream_pre_cleanup(serf_http2_stream_t *stream)
{
if (stream->data) {
if (stream->data->data_tail) {
serf_bucket_destroy(stream->data->data_tail);
stream->data->data_tail = NULL;
}
}
}
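/* Frees everything belonging to STREAM, including STREAM itself. */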
void
serf_http2__stream_cleanup(serf_http2_stream_t *stream)
{
if (stream->data) {
if (stream->data->response_agg)
serf_bucket_destroy(stream->data->response_agg);
serf_bucket_mem_free(stream->alloc, stream->data);
stream->data = NULL;
}
serf_bucket_mem_free(stream->alloc, stream);
}
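/* Encodes the headers in HPACK as a HEADERS frame, followed by as many
   CONTINUATION frames as needed to keep every frame within
   MAX_PAYLOAD_SIZE, and enqueues them on the connection. */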
static apr_status_t stream_send_headers(serf_http2_stream_t *stream,
serf_bucket_t *hpack,
apr_size_t max_payload_size,
bool end_stream,
bool priority)
{
apr_status_t status;
bool first_frame = true;
  /* And now schedule the packet for writing. Note that the HTTP/2 spec
     requires HEADERS and CONTINUATION frames to be sent directly after
     each other, without other frames in between. */
while (hpack != NULL)
{
serf_bucket_t *next;
apr_uint64_t remaining;
    /* Hpack buckets implement get_remaining(). And even if they didn't,
       adding the framing around them would apply some reads that fix up
       the buckets. So we can ignore the theoretical endless loop here for
       two different reasons. */
remaining = serf_bucket_get_remaining(hpack);
if (remaining > max_payload_size) {
serf_bucket_split_create(&next, &hpack, hpack,
max_payload_size - (max_payload_size / 4),
max_payload_size);
}
else
{
next = hpack;
hpack = NULL;
}
next = serf__bucket_http2_frame_create(next,
first_frame
? HTTP2_FRAME_TYPE_HEADERS
: HTTP2_FRAME_TYPE_CONTINUATION,
(end_stream
? HTTP2_FLAG_END_STREAM
: 0)
| ((hpack != NULL)
? 0
: HTTP2_FLAG_END_HEADERS)
| (priority
? HTTP2_FLAG_PRIORITY
: 0),
&stream->streamid,
serf_http2__allocate_stream_id,
stream,
max_payload_size,
next->allocator);
status = serf_http2__enqueue_frame(stream->h2, next, TRUE);
if (SERF_BUCKET_READ_ERROR(status))
return status; /* Connection dead */
first_frame = false; /* Continue with 'continuation' frames */
}
return APR_SUCCESS;
}
typedef struct window_allocate_info_t
{
serf_http2_stream_t *stream;
serf_bucket_t *bkt;
apr_size_t allocated;
} window_allocate_info_t;
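/* Event callbacks wrapped around a DATA payload: when writing starts, and
   again when it completes, any flow control window that was allocated but
   turns out not to be needed is returned. */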
static apr_status_t data_write_started(void *baton,
apr_uint64_t bytes_read)
{
window_allocate_info_t *wai = baton;
  bytes_read = serf_bucket_get_remaining(wai->bkt);
  /* An unknown remaining length is handled for free: it never compares
     <= the allocated window */
  if (bytes_read <= wai->allocated) {
    /* Nice, we can return something now */
    apr_size_t too_much = (wai->allocated - (apr_size_t)bytes_read);

    serf_http2__return_window(wai->stream->h2, wai->stream, too_much);
    wai->allocated = 0;
  }
return APR_SUCCESS;
}
static apr_status_t data_write_done(void *baton,
apr_uint64_t bytes_read)
{
window_allocate_info_t *wai = baton;
  if (wai->allocated && bytes_read <= wai->allocated) {
    /* Nice, we can return something now */
    apr_size_t too_much = (wai->allocated - (apr_size_t)bytes_read);

    wai->stream->lr_window += too_much;
    serf_http2__return_window(wai->stream->h2, wai->stream, too_much);
    wai->allocated = 0;
  }
serf_bucket_mem_free(wai->stream->alloc, wai);
return APR_SUCCESS;
}
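/* Schedules (part of) DATA as a DATA frame on STREAM, limited by the flow
   control window we can allocate right now. Whatever doesn't fit yet is
   kept in data_tail and written later, when the stream becomes writable
   again; END_STREAM is sent once the whole body has been scheduled. */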
static apr_status_t stream_send_data(serf_http2_stream_t *stream,
serf_bucket_t *data)
{
apr_uint64_t remaining;
serf_bucket_t *next;
apr_size_t prefix_len;
bool end_stream;
apr_status_t status;
SERF_H2_assert(stream->status == H2S_OPEN
|| stream->status == H2S_HALFCLOSED_REMOTE);
SERF_H2_assert(!stream->data->data_tail || (data ==
stream->data->data_tail));
  /* Sending DATA frames over HTTP/2 is not easy, as it usually requires
     handling flow control windows, priority, etc. This code will improve
     over time */
stream->data->data_tail = NULL;
if (!data)
remaining = 0;
else
remaining = serf_bucket_get_remaining(data);
/* If the stream decided we are already done */
if (remaining == 0) {
if (stream->status == H2S_OPEN)
stream->status = H2S_HALFCLOSED_LOCAL;
else
stream->status = H2S_CLOSED;
    if (data)
      serf_bucket_destroy(data); /* DATA may legitimately be NULL here */
next = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_DATA,
HTTP2_FLAG_END_STREAM,
&stream->streamid,
serf_http2__allocate_stream_id,
stream, 0, stream->alloc);
return serf_http2__enqueue_frame(stream->h2, next, false);
}
prefix_len = serf_http2__alloc_window(stream->h2, stream,
(remaining >= APR_SIZE_MAX)
? SERF_READ_ALL_AVAIL
: (apr_size_t)remaining);
if (prefix_len == 0) {
/* No window left */
stream->data->data_tail = data;
/* Write more later */
serf_http2__ensure_writable(stream);
return APR_SUCCESS;
}
if (prefix_len < remaining) {
window_allocate_info_t *wai;
serf_bucket_split_create(&data, &stream->data->data_tail, data,
MIN(prefix_len, 1024), prefix_len);
wai = serf_bucket_mem_alloc(stream->alloc, sizeof(*wai));
wai->stream = stream;
wai->bkt = data;
wai->allocated = prefix_len;
data = serf__bucket_event_create(data, wai,
data_write_started,
data_write_done, NULL, stream->alloc);
end_stream = false;
serf_http2__ensure_writable(stream);
}
else {
end_stream = true;
if (stream->status == H2S_OPEN)
stream->status = H2S_HALFCLOSED_LOCAL;
else
stream->status = H2S_CLOSED;
}
next = serf__bucket_http2_frame_create(data, HTTP2_FRAME_TYPE_DATA,
end_stream ? HTTP2_FLAG_END_STREAM
: 0,
&stream->streamid,
serf_http2__allocate_stream_id,
stream, prefix_len,
data->allocator);
status = serf_http2__enqueue_frame(stream->h2, next, TRUE);
return status;
}
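/* Continues writing the body data that stream_send_data() had to leave
   behind when the flow control window ran out. */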
apr_status_t
serf_http2__stream_write_data(serf_http2_stream_t *stream)
{
SERF_H2_assert(stream->status == H2S_OPEN
|| stream->status == H2S_HALFCLOSED_REMOTE);
SERF_H2_assert(stream->data->data_tail != NULL);
return stream_send_data(stream, stream->data->data_tail);
}
static apr_status_t destroy_request_bucket(void *baton,
apr_uint64_t bytes_read)
{
serf_request_t *request = baton;
serf_bucket_destroy(request->req_bkt);
request->req_bkt = NULL;
request->writing = SERF_WRITING_FINISHED;
return APR_SUCCESS;
}
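/* Picks up the first unwritten request on CONN, moves it to the written
   list and schedules its headers (and body, when there is one) on
   STREAM. */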
apr_status_t
serf_http2__stream_setup_next_request(serf_http2_stream_t *stream,
serf_connection_t *conn,
apr_size_t max_payload_size,
serf_hpack_table_t *hpack_tbl)
{
serf_request_t *request = conn->unwritten_reqs;
apr_status_t status;
serf_bucket_t *hpack;
serf_bucket_t *body;
bool end_stream;
bool priority = false;
SERF_H2_assert(request != NULL);
if (!request)
return APR_EGENERAL;
stream->data->request = request;
request->protocol_baton = stream;
if (!request->req_bkt) {
status = serf__setup_request(request);
if (status)
return status;
}
conn->unwritten_reqs = request->next;
if (conn->unwritten_reqs_tail == request)
conn->unwritten_reqs = conn->unwritten_reqs_tail = NULL;
request->next = NULL;
serf__link_requests(&conn->written_reqs, &conn->written_reqs_tail,
request);
conn->nr_of_written_reqs++;
conn->nr_of_unwritten_reqs--;
serf__bucket_request_read(request->req_bkt, &body, NULL, NULL);
status = serf__bucket_hpack_create_from_request(
&hpack, hpack_tbl,
request->req_bkt,
request->conn->host_info.scheme,
request->allocator);
if (status)
return status;
if (request->depends_on && request->depends_on->protocol_baton)
{
serf_http2_stream_t *ds = request->depends_on->protocol_baton;
if (ds->streamid >= 0) {
serf_bucket_t *agg;
unsigned char priority_data[5];
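      /* Priority block for the HEADERS frame (RFC 7540, Section 6.2):
         a 1-bit exclusive flag, a 31-bit stream dependency and an 8-bit
         weight. */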
agg = serf_bucket_aggregate_create(request->allocator);
priority_data[0] = (ds->streamid >> 24) & 0x7F;
/* bit 7 of [0] is the exclusive flag */
priority_data[1] = (ds->streamid >> 16) & 0xFF;
priority_data[2] = (ds->streamid >> 8) & 0xFF;
priority_data[3] = ds->streamid & 0xFF;
priority_data[4] = request->dep_priority >> 8;
serf_bucket_aggregate_append(
agg,
serf_bucket_simple_copy_create((void *)priority_data,
5, request->allocator));
serf_bucket_aggregate_append(agg, hpack);
hpack = agg;
priority = true;
}
}
if (!body) {
serf_bucket_destroy(request->req_bkt);
request->req_bkt = NULL;
end_stream = true;
}
else
end_stream = false;
status = stream_send_headers(stream, hpack, max_payload_size,
end_stream, priority);
if (status)
return status;
if (end_stream) {
stream->status = H2S_HALFCLOSED_LOCAL; /* Headers sent; no body */
return APR_SUCCESS;
}
/* Yuck... we are not allowed to destroy body */
body = serf_bucket_barrier_create(body, request->allocator);
/* Setup an event bucket to destroy the actual request bucket when
the body is done */
body = serf__bucket_event_create(body, request,
NULL, NULL, destroy_request_bucket,
request->allocator);
stream->status = H2S_OPEN; /* Headers sent. Body to go */
request->writing = SERF_WRITING_STARTED;
return stream_send_data(stream, body);
}
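/* Marks STREAM as closed. For a locally initiated reset a RST_STREAM
   frame with REASON is enqueued for the other side; a reset is sent at
   most once per stream. */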
apr_status_t
serf_http2__stream_reset(serf_http2_stream_t *stream,
apr_status_t reason,
bool local_reset)
{
stream->status = H2S_CLOSED;
if (stream->streamid < 0 || stream->data->resetted)
return APR_SUCCESS;
stream->data->resetted = true;
if (local_reset)
return serf_http2__enqueue_stream_reset(stream->h2,
stream->streamid,
reason);
return APR_SUCCESS;
}
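/* Resets STREAM because the application cancelled request RQ. REASON is
   mapped onto a valid HTTP/2 error code (CANCEL when it isn't one
   already). */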
void serf_http2__stream_cancel_request(serf_http2_stream_t *stream,
serf_request_t *rq,
apr_status_t reason)
{
if (stream->streamid < 0)
return; /* Never hit the wire */
else if (stream->status == H2S_CLOSED)
return; /* We are already detached */
if (reason < SERF_ERROR_HTTP2_NO_ERROR
|| reason > SERF_ERROR_HTTP2_HTTP_1_1_REQUIRED)
{
reason = SERF_ERROR_HTTP2_CANCEL;
}
stream->data->request = rq; /* Might have changed! */
/* Let the other party know we don't want anything */
serf_http2__stream_reset(stream, reason, true);
}
void serf_http2__stream_prioritize_request(serf_http2_stream_t *stream,
serf_request_t *rq,
bool exclusive)
{
if (stream->streamid < 0)
return; /* Never hit the wire */
else if (stream->status == H2S_CLOSED)
return; /* We are already detached */
    /* Ignored for now. We currently handle priority only at stream
       setup. */
}
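/* Hold-open callback for the response aggregate: reports EOF once the
   remote side has finished the stream, and APR_EAGAIN while more frames
   may still arrive. */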
static apr_status_t
stream_response_eof(void *baton,
serf_bucket_t *aggregate_bucket)
{
serf_http2_stream_t *stream = baton;
if (stream->data->resetted)
return APR_EAGAIN;
switch (stream->status)
{
case H2S_CLOSED:
case H2S_HALFCLOSED_REMOTE:
return APR_EOF;
default:
return APR_EAGAIN;
}
}
static int set_hpack_header(void *baton,
const char *key,
const char *value)
{
serf_bucket_t *hpack = baton;
serf__bucket_hpack_setc(hpack, key, value);
return 0;
}
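/* Server side: translates an HTTP/1 style response bucket into HEADERS
   (via HPACK) and DATA frames on STREAM. */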
static apr_status_t
http2_stream_enqueue_response(serf_incoming_request_t *request,
void *enqueue_baton,
serf_bucket_t *response_bkt)
{
serf_http2_stream_t *stream = enqueue_baton;
serf_bucket_t *hpack;
serf_bucket_t *headers;
serf_bucket_t *h1_response;
serf_status_line sline;
apr_status_t status;
  /* OK, this could be implemented using knowledge of the buckets in a far
     more efficient way, but I don't want to introduce new bucket types for
     this yet. Let's just read everything the HTTP/1 way and put it in the
     HTTP/2 appropriate places */
h1_response = serf_bucket_response_create(response_bkt,
stream->alloc);
  do
    {
      status = serf_bucket_response_status(h1_response, &sline);
    } while (APR_STATUS_IS_EAGAIN(status));
  if (status != APR_SUCCESS)
    return APR_EGENERAL; /* Can't read statusline. No EAGAIN support before
                            the body (yet) */
hpack = serf__bucket_hpack_create(stream->data->tbl, stream->alloc);
serf__bucket_hpack_setc(hpack, ":status",
apr_itoa(stream->data->in_request->pool,
sline.code));
  do
    {
      status = serf_bucket_response_wait_for_headers(h1_response);
    } while (APR_STATUS_IS_EAGAIN(status));
  if (status != APR_SUCCESS)
    return APR_EGENERAL; /* Can't read the headers. No EAGAIN support before
                            the body (yet) */
headers = serf_bucket_response_get_headers(h1_response);
serf_bucket_headers_do(headers, set_hpack_header, hpack);
status = stream_send_headers(stream, hpack,
serf_http2__max_payload_size(stream->h2),
false /* eos */, false /* priority */);
if (status)
return status;
return stream_send_data(stream, response_bkt);
}
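/* Creates the aggregate that collects the decoded HEADERS and DATA
   payloads for STREAM and hooks it up to either the outgoing request's
   acceptor or a freshly set up incoming request. */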
static apr_status_t
stream_setup_response(serf_http2_stream_t *stream,
serf_config_t *config)
{
serf_bucket_t *agg;
apr_status_t status;
agg = serf_bucket_aggregate_create(stream->alloc);
serf_bucket_aggregate_hold_open(agg, stream_response_eof, stream);
serf_bucket_set_config(agg, config);
stream->data->response_agg = agg;
if (stream->data->request) {
serf_request_t *request = stream->data->request;
if (!request->resp_bkt) {
apr_pool_t *scratch_pool = request->respool; /* ### Pass scratch pool */
request->resp_bkt = request->acceptor(request, agg,
request->acceptor_baton,
scratch_pool);
}
}
else {
serf_incoming_request_t *in_request = stream->data->in_request;
if (!in_request) {
serf_incoming_request_setup_t req_setup;
void *req_setup_baton;
status = serf_http2__setup_incoming_request(&in_request, &req_setup,
&req_setup_baton,
stream->h2);
if (status)
return status;
stream->data->in_request = in_request;
status = req_setup(&in_request->req_bkt, agg,
in_request, req_setup_baton,
&in_request->handler,
&in_request->handler_baton,
&in_request->response_setup,
&in_request->response_setup_baton,
in_request->pool);
if (status)
return status;
stream->status = H2S_OPEN;
in_request->enqueue_response = http2_stream_enqueue_response;
in_request->enqueue_baton = stream;
}
}
return APR_SUCCESS;
}
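/* Called via the hold-open callback once the complete PUSH_PROMISE
   payload has been read. */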
static apr_status_t
stream_promise_done(void *baton,
serf_bucket_t *done_agg)
{
serf_http2_stream_t *parent_stream = baton;
serf_http2_stream_t *stream = parent_stream->new_reserved_stream;
SERF_H2_assert(stream != NULL);
SERF_H2_assert(stream->status == H2S_RESERVED_REMOTE);
parent_stream->new_reserved_stream = NULL; /* End of PUSH_PROMISE */
/* Anything else? */
  /* ### Absolute minimal implementation.
     Just announcing in our initial SETTINGS that we are not interested
     (SETTINGS_ENABLE_PUSH = 0) would be the easier approach. */
serf_http2__stream_reset(stream, SERF_ERROR_HTTP2_REFUSED_STREAM, TRUE);
  /* Exit condition:
     * Either we accept the stream and are ready to receive HEADERS
       and DATA on it,
     * or we don't and reject the stream.
   */
SERF_H2_assert(stream->status == H2S_CLOSED
|| stream->data->request != NULL);
/* We must return a proper error or EOF here! */
return APR_EOF;
}
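/* Wraps the payload of a HEADERS or PUSH_PROMISE frame in an HPACK
   decoder. For HEADERS the decoder is appended to the response aggregate
   and NULL is returned, so the stream processor drains it; for
   PUSH_PROMISE an aggregate is returned for the caller to drain, which
   triggers stream_promise_done() at EOF. */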
serf_bucket_t *
serf_http2__stream_handle_hpack(serf_http2_stream_t *stream,
serf_bucket_t *bucket,
unsigned char frametype,
bool end_stream,
apr_size_t max_entry_size,
serf_hpack_table_t *hpack_tbl,
serf_config_t *config,
serf_bucket_alloc_t *allocator)
{
if (frametype == HTTP2_FRAME_TYPE_HEADERS) {
if (!stream->data->response_agg)
stream_setup_response(stream, config);
stream->data->tbl = hpack_tbl;
bucket = serf__bucket_hpack_decode_create(bucket, max_entry_size,
hpack_tbl, allocator);
serf_bucket_aggregate_append(stream->data->response_agg, bucket);
if (end_stream) {
if (stream->status == H2S_HALFCLOSED_LOCAL)
stream->status = H2S_CLOSED;
else
stream->status = H2S_HALFCLOSED_REMOTE;
}
return NULL; /* We want to drain the bucket ourselves */
}
else
{
serf_bucket_t *agg;
SERF_H2_assert(frametype == HTTP2_FRAME_TYPE_PUSH_PROMISE);
/* First create the HPACK decoder as requested */
/* TODO: Store key+value somewhere to allow asking the application
if it is interested in the promised stream.
Most likely it is not interested *yet* as the HTTP/2 spec
recommends pushing promised items *before* the stream that
references them.
So we probably want to store the request anyway, to allow
matching this against a later added outgoing request.
*/
bucket = serf__bucket_hpack_decode_create(bucket, max_entry_size,
hpack_tbl, allocator);
/* And now wrap around it the easiest way to get an EOF callback */
agg = serf_bucket_aggregate_create(allocator);
serf_bucket_aggregate_append(agg, bucket);
serf_bucket_aggregate_hold_open(agg, stream_promise_done, stream);
/* And return the aggregate, so the bucket will be drained for us */
return agg;
}
}
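/* Appends the payload of a DATA frame to the response aggregate and
   updates the stream state when the remote side set END_STREAM. */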
serf_bucket_t *
serf_http2__stream_handle_data(serf_http2_stream_t *stream,
serf_bucket_t *bucket,
unsigned char frametype,
bool end_stream,
serf_config_t *config,
serf_bucket_alloc_t *allocator)
{
if (!stream->data->response_agg)
stream_setup_response(stream, config);
serf_bucket_aggregate_append(stream->data->response_agg, bucket);
if (end_stream) {
if (stream->status == H2S_HALFCLOSED_LOCAL)
stream->status = H2S_CLOSED;
else
stream->status = H2S_HALFCLOSED_REMOTE;
}
return NULL;
}
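/* Processes the data gathered in the response aggregate: runs the
   response handler of the outgoing request (or the handler of an incoming
   request), performs the bookkeeping once that handler reports it is done,
   and finally drains and destroys the aggregate. */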
apr_status_t
serf_http2__stream_processor(void *baton,
serf_http2_protocol_t *h2,
serf_bucket_t *bucket)
{
serf_http2_stream_t *stream = baton;
serf_http2_stream_data_t *sd = stream->data;
apr_status_t status = APR_SUCCESS;
SERF_H2_assert(stream->data->response_agg != NULL
&& !stream->data->resetted);
if (sd->request) {
SERF_H2_assert(sd->request->resp_bkt != NULL);
    /* Response handlers are expected to read until they get some error,
       but at least some implementations assume that just returning
       APR_SUCCESS will get them called again, as that used to work as an
       APR_EAGAIN-like system in HTTP/1.
       But we can't just fall back to that with HTTP/2, as we might still
       have some part of the frame open (good case), or we might have
       completed the frame and would never be called again. */
do {
status = serf__handle_response(sd->request,
sd->request->respool);
} while (status == APR_SUCCESS);
if (sd->resetted) {
status = APR_EOF;
}
else if (!APR_STATUS_IS_EOF(status)
&& !SERF_BUCKET_READ_ERROR(status))
{
return status;
}
    /* OK, the request thinks it is done; let's handle the bookkeeping
       to remove it from the outstanding requests */
{
serf_connection_t *conn = serf_request_get_conn(sd->request);
serf_request_t **rq = &conn->written_reqs;
serf_request_t *last = NULL;
while (*rq && (*rq != sd->request)) {
last = *rq;
rq = &last->next;
}
if (*rq)
{
(*rq) = sd->request->next;
if (conn->written_reqs_tail == sd->request)
conn->written_reqs_tail = last;
conn->nr_of_written_reqs--;
}
serf__destroy_request(sd->request);
stream->data->request = NULL;
}
if (SERF_BUCKET_READ_ERROR(status)) {
if (stream->status != H2S_CLOSED) {
/* Tell the other side that we are no longer interested
to receive more data */
serf_http2__stream_reset(stream, status, TRUE);
}
return status;
}
SERF_H2_assert(APR_STATUS_IS_EOF(status));
    /* Even though the request reported that it is done, we might not
       have read all the data that we should have (*cough* padding
       *cough*), or perhaps there was an invalid 'Content-Length' value;
       maybe both. This may even handle 'not interested, just return EOF'
       cases, but that would have broken the pipeline for HTTP/1.1.
     */
/* ### For now, fall through and eat whatever is left.
Usually this is 0 bytes */
status = APR_SUCCESS;
}
else if (stream->data->in_request) {
serf_incoming_request_t *request = stream->data->in_request;
SERF_H2_assert(request->req_bkt != NULL);
status = request->handler(request, request->req_bkt,
request->handler_baton,
request->pool);
if (!APR_STATUS_IS_EOF(status)
&& !SERF_BUCKET_READ_ERROR(status))
return status;
if (APR_STATUS_IS_EOF(status)) {
status = serf_incoming_response_create(request);
if (status)
return status;
}
if (SERF_BUCKET_READ_ERROR(status)) {
if (stream->status != H2S_CLOSED) {
/* Tell the other side that we are no longer interested
to receive more data */
serf_http2__stream_reset(stream, status, TRUE);
}
return status;
}
}
while (!status)
{
struct iovec vecs[SERF__STD_IOV_COUNT];
int vecs_used;
/* Drain the bucket as efficiently as possible */
status = serf_bucket_read_iovec(stream->data->response_agg,
SERF_READ_ALL_AVAIL, COUNT_OF(vecs),
vecs, &vecs_used);
if (vecs_used) {
/* We have data... What should we do with it? */
}
}
if ((APR_STATUS_IS_EOF(status) || sd->resetted)
&& (stream->status == H2S_CLOSED
|| stream->status == H2S_HALFCLOSED_REMOTE))
{
    /* If there was a request, it is already gone, so we can now safely
       destroy our aggregate, which may include everything up to the
       HTTP/2 frames */
serf_bucket_destroy(stream->data->response_agg);
stream->data->response_agg = NULL;
status = APR_EOF;
}
return status;
}