| /* ==================================================================== |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * ==================================================================== |
| */ |
| |
| #include <stdlib.h> |
| |
| #include <apr_pools.h> |
| #include <apr_strings.h> |
| |
| #include "serf.h" |
| #include "serf_bucket_util.h" |
| #include "serf_private.h" |
| |
| #include "protocols/http2_buckets.h" |
| #include "protocols/http2_protocol.h" |
| |
| struct serf_http2_stream_data_t |
| { |
| serf_request_t *request; /* May be NULL as streams may outlive requests */ |
| serf_incoming_request_t *in_request; |
| serf_bucket_t *response_agg; |
| serf_hpack_table_t *tbl; |
| serf_bucket_t *data_tail; |
| bool resetted; |
| }; |
| |
| serf_http2_stream_t * |
| serf_http2__stream_create(serf_http2_protocol_t *h2, |
| apr_int32_t streamid, |
| apr_uint32_t lr_window, |
| apr_uint32_t rl_window, |
| serf_bucket_alloc_t *alloc) |
| { |
| serf_http2_stream_t *stream = serf_bucket_mem_alloc(alloc, |
| sizeof(*stream)); |
| |
| stream->h2 = h2; |
| stream->alloc = alloc; |
| |
| stream->next = stream->prev = NULL; |
| |
| /* Delay creating this? */ |
| stream->data = serf_bucket_mem_alloc(alloc, sizeof(*stream->data)); |
| stream->data->request = NULL; |
| stream->data->in_request = NULL; |
| stream->data->response_agg = NULL; |
| stream->data->tbl = NULL; |
| stream->data->data_tail = NULL; |
| stream->data->resetted = false; |
| |
| stream->lr_window = lr_window; |
| stream->rl_window = rl_window; |
| |
| stream->rl_window_upd_below = 1024 * 1024; |
| stream->rl_window_upd_to = 16 * 1024 * 1024; |
| |
| if (streamid >= 0) |
| stream->streamid = streamid; |
| else |
| stream->streamid = -1; /* Undetermined yet */ |
| |
| stream->status = (streamid >= 0) ? H2S_IDLE : H2S_INIT; |
| stream->new_reserved_stream = NULL; |
| |
| stream->prev_writable = stream->next_writable = NULL; |
| |
| return stream; |
| } |
| |
| void |
| serf_http2__stream_pre_cleanup(serf_http2_stream_t *stream) |
| { |
| if (stream->data) { |
| if (stream->data->data_tail) { |
| serf_bucket_destroy(stream->data->data_tail); |
| stream->data->data_tail = NULL; |
| } |
| } |
| } |
| |
| void |
| serf_http2__stream_cleanup(serf_http2_stream_t *stream) |
| { |
| if (stream->data) { |
| if (stream->data->response_agg) |
| serf_bucket_destroy(stream->data->response_agg); |
| |
| serf_bucket_mem_free(stream->alloc, stream->data); |
| stream->data = NULL; |
| } |
| serf_bucket_mem_free(stream->alloc, stream); |
| } |
| |
| static apr_status_t stream_send_headers(serf_http2_stream_t *stream, |
| serf_bucket_t *hpack, |
| apr_size_t max_payload_size, |
| bool end_stream, |
| bool priority) |
| { |
| apr_status_t status; |
| bool first_frame = true; |
| |
| /* And now schedule the packet for writing. Note that it is required |
| by the HTTP/2 spec to send HEADERS and CONTINUATION directly after |
| each other, without other frames inbetween. */ |
| while (hpack != NULL) |
| { |
| serf_bucket_t *next; |
| apr_uint64_t remaining; |
| |
| /* hpack buckets implement get_remaining. And if they didn't adding the |
| framing around them would apply some reads that fix the buckets. |
| |
| So we can ignore the theoretical endless loop here for two different |
| reasons |
| */ |
| remaining = serf_bucket_get_remaining(hpack); |
| |
| if (remaining > max_payload_size) { |
| serf_bucket_split_create(&next, &hpack, hpack, |
| max_payload_size - (max_payload_size / 4), |
| max_payload_size); |
| } |
| else |
| { |
| next = hpack; |
| hpack = NULL; |
| } |
| |
| next = serf__bucket_http2_frame_create(next, |
| first_frame |
| ? HTTP2_FRAME_TYPE_HEADERS |
| : HTTP2_FRAME_TYPE_CONTINUATION, |
| (end_stream |
| ? HTTP2_FLAG_END_STREAM |
| : 0) |
| | ((hpack != NULL) |
| ? 0 |
| : HTTP2_FLAG_END_HEADERS) |
| | (priority |
| ? HTTP2_FLAG_PRIORITY |
| : 0), |
| &stream->streamid, |
| serf_http2__allocate_stream_id, |
| stream, |
| max_payload_size, |
| next->allocator); |
| status = serf_http2__enqueue_frame(stream->h2, next, TRUE); |
| |
| if (SERF_BUCKET_READ_ERROR(status)) |
| return status; /* Connection dead */ |
| |
| first_frame = false; /* Continue with 'continuation' frames */ |
| } |
| |
| return APR_SUCCESS; |
| } |
| |
| typedef struct window_allocate_info_t |
| { |
| serf_http2_stream_t *stream; |
| serf_bucket_t *bkt; |
| apr_size_t allocated; |
| } window_allocate_info_t; |
| |
| static apr_status_t data_write_started(void *baton, |
| apr_uint64_t bytes_read) |
| { |
| window_allocate_info_t *wai = baton; |
| |
| bytes_read = serf_bucket_get_remaining(wai->bkt); |
| |
| /* Handles unavailable for free */ |
| if (bytes_read <= wai->allocated) { |
| /* Nice, we can return something now */ |
| apr_size_t to_much = (wai->allocated - (apr_size_t)bytes_read); |
| |
| serf_http2__return_window(wai->stream->h2, wai->stream, to_much); |
| |
| wai->allocated = 0; |
| } |
| return APR_SUCCESS; |
| } |
| |
| static apr_status_t data_write_done(void *baton, |
| apr_uint64_t bytes_read) |
| { |
| window_allocate_info_t *wai = baton; |
| |
| if (wai->allocated && bytes_read <= wai->allocated) { |
| /* Nice, we can return something now */ |
| apr_size_t to_much = (wai->allocated - (apr_size_t)bytes_read); |
| wai->stream->lr_window += to_much; |
| |
| serf_http2__return_window(wai->stream->h2, wai->stream, to_much); |
| |
| wai->allocated = 0; |
| } |
| |
| serf_bucket_mem_free(wai->stream->alloc, wai); |
| return APR_SUCCESS; |
| } |
| |
| |
| static apr_status_t stream_send_data(serf_http2_stream_t *stream, |
| serf_bucket_t *data) |
| { |
| apr_uint64_t remaining; |
| serf_bucket_t *next; |
| apr_size_t prefix_len; |
| bool end_stream; |
| apr_status_t status; |
| |
| SERF_H2_assert(stream->status == H2S_OPEN |
| || stream->status == H2S_HALFCLOSED_REMOTE); |
| SERF_H2_assert(!stream->data->data_tail || (data == |
| stream->data->data_tail)); |
| |
| /* Sending DATA frames over HTTP/2 is not easy as this usually requires |
| handling windowing, priority, etc. This code will improve over time */ |
| stream->data->data_tail = NULL; |
| |
| if (!data) |
| remaining = 0; |
| else |
| remaining = serf_bucket_get_remaining(data); |
| |
| /* If the stream decided we are already done */ |
| if (remaining == 0) { |
| if (stream->status == H2S_OPEN) |
| stream->status = H2S_HALFCLOSED_LOCAL; |
| else |
| stream->status = H2S_CLOSED; |
| |
| serf_bucket_destroy(data); |
| |
| next = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_DATA, |
| HTTP2_FLAG_END_STREAM, |
| &stream->streamid, |
| serf_http2__allocate_stream_id, |
| stream, 0, stream->alloc); |
| return serf_http2__enqueue_frame(stream->h2, next, false); |
| } |
| |
| prefix_len = serf_http2__alloc_window(stream->h2, stream, |
| (remaining >= APR_SIZE_MAX) |
| ? SERF_READ_ALL_AVAIL |
| : (apr_size_t)remaining); |
| |
| if (prefix_len == 0) { |
| /* No window left */ |
| stream->data->data_tail = data; |
| |
| /* Write more later */ |
| serf_http2__ensure_writable(stream); |
| |
| return APR_SUCCESS; |
| } |
| |
| if (prefix_len < remaining) { |
| window_allocate_info_t *wai; |
| serf_bucket_split_create(&data, &stream->data->data_tail, data, |
| MIN(prefix_len, 1024), prefix_len); |
| |
| wai = serf_bucket_mem_alloc(stream->alloc, sizeof(*wai)); |
| wai->stream = stream; |
| wai->bkt = data; |
| wai->allocated = prefix_len; |
| |
| data = serf__bucket_event_create(data, wai, |
| data_write_started, |
| data_write_done, NULL, stream->alloc); |
| end_stream = false; |
| |
| serf_http2__ensure_writable(stream); |
| } |
| else { |
| end_stream = true; |
| |
| if (stream->status == H2S_OPEN) |
| stream->status = H2S_HALFCLOSED_LOCAL; |
| else |
| stream->status = H2S_CLOSED; |
| } |
| |
| next = serf__bucket_http2_frame_create(data, HTTP2_FRAME_TYPE_DATA, |
| end_stream ? HTTP2_FLAG_END_STREAM |
| : 0, |
| &stream->streamid, |
| serf_http2__allocate_stream_id, |
| stream, prefix_len, |
| data->allocator); |
| |
| status = serf_http2__enqueue_frame(stream->h2, next, TRUE); |
| |
| return status; |
| } |
| |
| apr_status_t |
| serf_http2__stream_write_data(serf_http2_stream_t *stream) |
| { |
| SERF_H2_assert(stream->status == H2S_OPEN |
| || stream->status == H2S_HALFCLOSED_REMOTE); |
| SERF_H2_assert(stream->data->data_tail != NULL); |
| |
| return stream_send_data(stream, stream->data->data_tail); |
| } |
| |
| static apr_status_t destroy_request_bucket(void *baton, |
| apr_uint64_t bytes_read) |
| { |
| serf_request_t *request = baton; |
| |
| serf_bucket_destroy(request->req_bkt); |
| request->req_bkt = NULL; |
| request->writing = SERF_WRITING_FINISHED; |
| return APR_SUCCESS; |
| } |
| |
| apr_status_t |
| serf_http2__stream_setup_next_request(serf_http2_stream_t *stream, |
| serf_connection_t *conn, |
| apr_size_t max_payload_size, |
| serf_hpack_table_t *hpack_tbl) |
| { |
| serf_request_t *request = conn->unwritten_reqs; |
| apr_status_t status; |
| serf_bucket_t *hpack; |
| serf_bucket_t *body; |
| bool end_stream; |
| bool priority = false; |
| |
| SERF_H2_assert(request != NULL); |
| if (!request) |
| return APR_EGENERAL; |
| |
| stream->data->request = request; |
| request->protocol_baton = stream; |
| |
| if (!request->req_bkt) { |
| status = serf__setup_request(request); |
| if (status) |
| return status; |
| } |
| |
| conn->unwritten_reqs = request->next; |
| if (conn->unwritten_reqs_tail == request) |
| conn->unwritten_reqs = conn->unwritten_reqs_tail = NULL; |
| |
| request->next = NULL; |
| |
| serf__link_requests(&conn->written_reqs, &conn->written_reqs_tail, |
| request); |
| conn->nr_of_written_reqs++; |
| conn->nr_of_unwritten_reqs--; |
| |
| serf__bucket_request_read(request->req_bkt, &body, NULL, NULL); |
| status = serf__bucket_hpack_create_from_request( |
| &hpack, hpack_tbl, |
| request->req_bkt, |
| request->conn->host_info.scheme, |
| request->allocator); |
| if (status) |
| return status; |
| |
| if (request->depends_on && request->depends_on->protocol_baton) |
| { |
| serf_http2_stream_t *ds = request->depends_on->protocol_baton; |
| |
| if (ds->streamid >= 0) { |
| serf_bucket_t *agg; |
| unsigned char priority_data[5]; |
| |
| agg = serf_bucket_aggregate_create(request->allocator); |
| |
| priority_data[0] = (ds->streamid >> 24) & 0x7F; |
| /* bit 7 of [0] is the exclusive flag */ |
| priority_data[1] = (ds->streamid >> 16) & 0xFF; |
| priority_data[2] = (ds->streamid >> 8) & 0xFF; |
| priority_data[3] = ds->streamid & 0xFF; |
| priority_data[4] = request->dep_priority >> 8; |
| |
| serf_bucket_aggregate_append( |
| agg, |
| serf_bucket_simple_copy_create((void *)priority_data, |
| 5, request->allocator)); |
| |
| serf_bucket_aggregate_append(agg, hpack); |
| hpack = agg; |
| |
| priority = true; |
| } |
| } |
| |
| if (!body) { |
| serf_bucket_destroy(request->req_bkt); |
| request->req_bkt = NULL; |
| end_stream = true; |
| } |
| else |
| end_stream = false; |
| |
| status = stream_send_headers(stream, hpack, max_payload_size, |
| end_stream, priority); |
| if (status) |
| return status; |
| |
| if (end_stream) { |
| stream->status = H2S_HALFCLOSED_LOCAL; /* Headers sent; no body */ |
| return APR_SUCCESS; |
| } |
| |
| /* Yuck... we are not allowed to destroy body */ |
| body = serf_bucket_barrier_create(body, request->allocator); |
| |
| /* Setup an event bucket to destroy the actual request bucket when |
| the body is done */ |
| body = serf__bucket_event_create(body, request, |
| NULL, NULL, destroy_request_bucket, |
| request->allocator); |
| |
| stream->status = H2S_OPEN; /* Headers sent. Body to go */ |
| request->writing = SERF_WRITING_STARTED; |
| return stream_send_data(stream, body); |
| } |
| |
| apr_status_t |
| serf_http2__stream_reset(serf_http2_stream_t *stream, |
| apr_status_t reason, |
| bool local_reset) |
| { |
| stream->status = H2S_CLOSED; |
| |
| if (stream->streamid < 0 || stream->data->resetted) |
| return APR_SUCCESS; |
| |
| stream->data->resetted = true; |
| |
| if (local_reset) |
| return serf_http2__enqueue_stream_reset(stream->h2, |
| stream->streamid, |
| reason); |
| |
| return APR_SUCCESS; |
| } |
| |
| void serf_http2__stream_cancel_request(serf_http2_stream_t *stream, |
| serf_request_t *rq, |
| apr_status_t reason) |
| { |
| if (stream->streamid < 0) |
| return; /* Never hit the wire */ |
| else if (stream->status == H2S_CLOSED) |
| return; /* We are already detached */ |
| |
| if (reason < SERF_ERROR_HTTP2_NO_ERROR |
| || reason > SERF_ERROR_HTTP2_HTTP_1_1_REQUIRED) |
| { |
| reason = SERF_ERROR_HTTP2_CANCEL; |
| } |
| |
| stream->data->request = rq; /* Might have changed! */ |
| |
| /* Let the other party know we don't want anything */ |
| serf_http2__stream_reset(stream, reason, true); |
| |
| } |
| |
| void serf_http2__stream_prioritize_request(serf_http2_stream_t *stream, |
| serf_request_t *rq, |
| bool exclusive) |
| { |
| if (stream->streamid < 0) |
| return; /* Never hit the wire */ |
| else if (stream->status == H2S_CLOSED) |
| return; /* We are already detached */ |
| |
| /* Ignore for now. We start by handling this at setup */ |
| } |
| |
| |
| static apr_status_t |
| stream_response_eof(void *baton, |
| serf_bucket_t *aggregate_bucket) |
| { |
| serf_http2_stream_t *stream = baton; |
| |
| if (stream->data->resetted) |
| return APR_EAGAIN; |
| |
| switch (stream->status) |
| { |
| case H2S_CLOSED: |
| case H2S_HALFCLOSED_REMOTE: |
| return APR_EOF; |
| default: |
| return APR_EAGAIN; |
| } |
| } |
| |
| static int set_hpack_header(void *baton, |
| const char *key, |
| const char *value) |
| { |
| serf_bucket_t *hpack = baton; |
| |
| serf__bucket_hpack_setc(hpack, key, value); |
| return 0; |
| } |
| |
| static apr_status_t |
| http2_stream_enqueue_response(serf_incoming_request_t *request, |
| void *enqueue_baton, |
| serf_bucket_t *response_bkt) |
| { |
| serf_http2_stream_t *stream = enqueue_baton; |
| serf_bucket_t *hpack; |
| serf_bucket_t *headers; |
| serf_bucket_t *h1_response; |
| serf_status_line sline; |
| apr_status_t status; |
| |
| /* OK, this could be implemented using knowledge of the buckets, in |
| a 100% more efficient, but I don't want to introduce new bucket |
| types for this yet. Let's just read everything the http/1 way |
| and put it in HTTP/2 appropriate places */ |
| h1_response = serf_bucket_response_create(response_bkt, |
| stream->alloc); |
| do |
| { |
| status = serf_bucket_response_status(h1_response, &sline); |
| } while (status != APR_SUCCESS); |
| |
| if (status != APR_SUCCESS) |
| return APR_EGENERAL; /* Can't read statusline. No EAGAIN support before |
| the body (yet) */ |
| |
| hpack = serf__bucket_hpack_create(stream->data->tbl, stream->alloc); |
| serf__bucket_hpack_setc(hpack, ":status", |
| apr_itoa(stream->data->in_request->pool, |
| sline.code)); |
| |
| do |
| { |
| status = serf_bucket_response_wait_for_headers(h1_response); |
| } while (status != APR_SUCCESS); |
| |
| if (status != APR_SUCCESS) |
| return APR_EGENERAL; /* Can't read body. No EAGAIN support before |
| the body (yet) */ |
| |
| headers = serf_bucket_response_get_headers(h1_response); |
| |
| serf_bucket_headers_do(headers, set_hpack_header, hpack); |
| |
| status = stream_send_headers(stream, hpack, |
| serf_http2__max_payload_size(stream->h2), |
| false /* eos */, false /* priority */); |
| |
| if (status) |
| return status; |
| |
| return stream_send_data(stream, response_bkt); |
| } |
| |
| static apr_status_t |
| stream_setup_response(serf_http2_stream_t *stream, |
| serf_config_t *config) |
| { |
| serf_bucket_t *agg; |
| apr_status_t status; |
| |
| agg = serf_bucket_aggregate_create(stream->alloc); |
| serf_bucket_aggregate_hold_open(agg, stream_response_eof, stream); |
| |
| serf_bucket_set_config(agg, config); |
| stream->data->response_agg = agg; |
| |
| if (stream->data->request) { |
| serf_request_t *request = stream->data->request; |
| |
| if (!request->resp_bkt) { |
| apr_pool_t *scratch_pool = request->respool; /* ### Pass scratch pool */ |
| |
| request->resp_bkt = request->acceptor(request, agg, |
| request->acceptor_baton, |
| scratch_pool); |
| } |
| } |
| else { |
| serf_incoming_request_t *in_request = stream->data->in_request; |
| |
| if (!in_request) { |
| serf_incoming_request_setup_t req_setup; |
| void *req_setup_baton; |
| |
| status = serf_http2__setup_incoming_request(&in_request, &req_setup, |
| &req_setup_baton, |
| stream->h2); |
| |
| if (status) |
| return status; |
| |
| stream->data->in_request = in_request; |
| |
| status = req_setup(&in_request->req_bkt, agg, |
| in_request, req_setup_baton, |
| &in_request->handler, |
| &in_request->handler_baton, |
| &in_request->response_setup, |
| &in_request->response_setup_baton, |
| in_request->pool); |
| |
| if (status) |
| return status; |
| |
| stream->status = H2S_OPEN; |
| |
| in_request->enqueue_response = http2_stream_enqueue_response; |
| in_request->enqueue_baton = stream; |
| } |
| } |
| |
| return APR_SUCCESS; |
| } |
| |
| static apr_status_t |
| stream_promise_done(void *baton, |
| serf_bucket_t *done_agg) |
| { |
| serf_http2_stream_t *parent_stream = baton; |
| serf_http2_stream_t *stream = parent_stream->new_reserved_stream; |
| |
| SERF_H2_assert(stream != NULL); |
| SERF_H2_assert(stream->status == H2S_RESERVED_REMOTE); |
| parent_stream->new_reserved_stream = NULL; /* End of PUSH_PROMISE */ |
| |
| /* Anything else? */ |
| |
| |
| /* ### Absolute minimal implementation. |
| Just sending that we are not interested in the initial SETTINGS |
| would be the easier approach. */ |
| serf_http2__stream_reset(stream, SERF_ERROR_HTTP2_REFUSED_STREAM, TRUE); |
| |
| |
| |
| |
| /* Exit condition: |
| * Either we should accept the stream and are ready to receive |
| HEADERS and DATA on it. |
| * Or we aren't and reject the stream |
| */ |
| SERF_H2_assert(stream->status == H2S_CLOSED |
| || stream->data->request != NULL); |
| |
| /* We must return a proper error or EOF here! */ |
| return APR_EOF; |
| } |
| |
| serf_bucket_t * |
| serf_http2__stream_handle_hpack(serf_http2_stream_t *stream, |
| serf_bucket_t *bucket, |
| unsigned char frametype, |
| bool end_stream, |
| apr_size_t max_entry_size, |
| serf_hpack_table_t *hpack_tbl, |
| serf_config_t *config, |
| serf_bucket_alloc_t *allocator) |
| { |
| if (frametype == HTTP2_FRAME_TYPE_HEADERS) { |
| |
| if (!stream->data->response_agg) |
| stream_setup_response(stream, config); |
| |
| stream->data->tbl = hpack_tbl; |
| |
| bucket = serf__bucket_hpack_decode_create(bucket, max_entry_size, |
| hpack_tbl, allocator); |
| |
| serf_bucket_aggregate_append(stream->data->response_agg, bucket); |
| |
| if (end_stream) { |
| |
| if (stream->status == H2S_HALFCLOSED_LOCAL) |
| stream->status = H2S_CLOSED; |
| else |
| stream->status = H2S_HALFCLOSED_REMOTE; |
| } |
| return NULL; /* We want to drain the bucket ourselves */ |
| } |
| else |
| { |
| serf_bucket_t *agg; |
| SERF_H2_assert(frametype == HTTP2_FRAME_TYPE_PUSH_PROMISE); |
| |
| /* First create the HPACK decoder as requested */ |
| |
| /* TODO: Store key+value somewhere to allow asking the application |
| if it is interested in the promised stream. |
| |
| Most likely it is not interested *yet* as the HTTP/2 spec |
| recommends pushing promised items *before* the stream that |
| references them. |
| |
| So we probably want to store the request anyway, to allow |
| matching this against a later added outgoing request. |
| */ |
| bucket = serf__bucket_hpack_decode_create(bucket, max_entry_size, |
| hpack_tbl, allocator); |
| |
| /* And now wrap around it the easiest way to get an EOF callback */ |
| agg = serf_bucket_aggregate_create(allocator); |
| serf_bucket_aggregate_append(agg, bucket); |
| |
| serf_bucket_aggregate_hold_open(agg, stream_promise_done, stream); |
| |
| /* And return the aggregate, so the bucket will be drained for us */ |
| return agg; |
| } |
| } |
| |
| serf_bucket_t * |
| serf_http2__stream_handle_data(serf_http2_stream_t *stream, |
| serf_bucket_t *bucket, |
| unsigned char frametype, |
| bool end_stream, |
| serf_config_t *config, |
| serf_bucket_alloc_t *allocator) |
| { |
| if (!stream->data->response_agg) |
| stream_setup_response(stream, config); |
| |
| serf_bucket_aggregate_append(stream->data->response_agg, bucket); |
| |
| if (end_stream) { |
| |
| if (stream->status == H2S_HALFCLOSED_LOCAL) |
| stream->status = H2S_CLOSED; |
| else |
| stream->status = H2S_HALFCLOSED_REMOTE; |
| } |
| |
| return NULL; |
| } |
| |
| apr_status_t |
| serf_http2__stream_processor(void *baton, |
| serf_http2_protocol_t *h2, |
| serf_bucket_t *bucket) |
| { |
| serf_http2_stream_t *stream = baton; |
| serf_http2_stream_data_t *sd = stream->data; |
| apr_status_t status = APR_SUCCESS; |
| |
| SERF_H2_assert(stream->data->response_agg != NULL |
| && !stream->data->resetted); |
| |
| if (sd->request) { |
| |
| SERF_H2_assert(sd->request->resp_bkt != NULL); |
| |
| /* Response handlers are expected to read until they get some error, |
| but at least some implementations assume that just returning |
| APR_SUCCESS will have them called again, as that used to work as |
| an APR_EAGAIN like system in HTTP/1. |
| |
| But we can't just fall back with HTTP/2, as we might still have |
| some part of the frame open (good case), or we might have completed |
| the frame and are never called again. */ |
| do { |
| status = serf__handle_response(sd->request, |
| sd->request->respool); |
| } while (status == APR_SUCCESS); |
| |
| if (sd->resetted) { |
| status = APR_EOF; |
| } |
| else if (!APR_STATUS_IS_EOF(status) |
| && !SERF_BUCKET_READ_ERROR(status)) |
| { |
| return status; |
| } |
| |
| /* Ok, the request thinks is done, let's handle the bookkeeping, |
| to remove it from the outstanding requests */ |
| { |
| serf_connection_t *conn = serf_request_get_conn(sd->request); |
| serf_request_t **rq = &conn->written_reqs; |
| serf_request_t *last = NULL; |
| |
| while (*rq && (*rq != sd->request)) { |
| last = *rq; |
| rq = &last->next; |
| } |
| |
| if (*rq) |
| { |
| (*rq) = sd->request->next; |
| |
| if (conn->written_reqs_tail == sd->request) |
| conn->written_reqs_tail = last; |
| |
| conn->nr_of_written_reqs--; |
| } |
| |
| serf__destroy_request(sd->request); |
| stream->data->request = NULL; |
| } |
| |
| if (SERF_BUCKET_READ_ERROR(status)) { |
| |
| if (stream->status != H2S_CLOSED) { |
| /* Tell the other side that we are no longer interested |
| to receive more data */ |
| serf_http2__stream_reset(stream, status, TRUE); |
| } |
| |
| return status; |
| } |
| |
| SERF_H2_assert(APR_STATUS_IS_EOF(status)); |
| |
| /* Even though the request reported that it is done, we might not |
| have read all the data that we should (*cough* padding *cough*), |
| or perhaps an invalid 'Content-Length' value; maybe both. |
| |
| This may even handle not-interested - return EOF cases, but that |
| would have broken the pipeline for HTTP/1.1. |
| */ |
| |
| /* ### For now, fall through and eat whatever is left. |
| Usually this is 0 bytes */ |
| |
| status = APR_SUCCESS; |
| } |
| else if (stream->data->in_request) { |
| serf_incoming_request_t *request = stream->data->in_request; |
| |
| SERF_H2_assert(request->req_bkt != NULL); |
| |
| status = request->handler(request, request->req_bkt, |
| request->handler_baton, |
| request->pool); |
| |
| if (!APR_STATUS_IS_EOF(status) |
| && !SERF_BUCKET_READ_ERROR(status)) |
| return status; |
| |
| if (APR_STATUS_IS_EOF(status)) { |
| status = serf_incoming_response_create(request); |
| |
| if (status) |
| return status; |
| } |
| |
| if (SERF_BUCKET_READ_ERROR(status)) { |
| |
| if (stream->status != H2S_CLOSED) { |
| /* Tell the other side that we are no longer interested |
| to receive more data */ |
| serf_http2__stream_reset(stream, status, TRUE); |
| } |
| |
| return status; |
| } |
| } |
| |
| while (!status) |
| { |
| struct iovec vecs[SERF__STD_IOV_COUNT]; |
| int vecs_used; |
| |
| /* Drain the bucket as efficiently as possible */ |
| status = serf_bucket_read_iovec(stream->data->response_agg, |
| SERF_READ_ALL_AVAIL, COUNT_OF(vecs), |
| vecs, &vecs_used); |
| |
| if (vecs_used) { |
| /* We have data... What should we do with it? */ |
| } |
| } |
| |
| if ((APR_STATUS_IS_EOF(status) || sd->resetted) |
| && (stream->status == H2S_CLOSED |
| || stream->status == H2S_HALFCLOSED_REMOTE)) |
| { |
| /* If there was a request, it is already gone, so we can now safely |
| destroy our aggregate which may include everything upto the http2 |
| frames */ |
| serf_bucket_destroy(stream->data->response_agg); |
| stream->data->response_agg = NULL; |
| status = APR_EOF; |
| } |
| |
| return status; |
| } |