| /* Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <apr_lib.h> |
| #include <apr_atomic.h> |
| #include <apr_strings.h> |
| #include <apr_time.h> |
| #include <apr_buckets.h> |
| #include <apr_thread_mutex.h> |
| #include <apr_thread_cond.h> |
| |
| #include <httpd.h> |
| #include <http_protocol.h> |
| #include <http_log.h> |
| |
| #include "h2_private.h" |
| #include "h2_conn_ctx.h" |
| #include "h2_headers.h" |
| #include "h2_util.h" |
| #include "h2_bucket_beam.h" |
| |
| |
/* Thin wrappers around the APR ring macros for a bucket list: each one
 * operates on the `list` member of the given list, whose elements are
 * apr_bucket structs chained through their `link` member.
 *
 * None of the statement-like macros ends in a semicolon; the caller
 * supplies it, so they stay safe inside unbraced if/else bodies. */
#define H2_BLIST_INIT(b)        APR_RING_INIT(&(b)->list, apr_bucket, link)
#define H2_BLIST_SENTINEL(b)    APR_RING_SENTINEL(&(b)->list, apr_bucket, link)
#define H2_BLIST_EMPTY(b)       APR_RING_EMPTY(&(b)->list, apr_bucket, link)
#define H2_BLIST_FIRST(b)       APR_RING_FIRST(&(b)->list)
#define H2_BLIST_LAST(b)        APR_RING_LAST(&(b)->list)
/* Insert bucket `e` at the front of list `b`; `e` is evaluated once. */
#define H2_BLIST_INSERT_HEAD(b, e) do {                                 \
        apr_bucket *ap__b = (e);                                        \
        APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link);      \
    } while (0)
/* Insert bucket `e` at the end of list `b`; `e` is evaluated once. */
#define H2_BLIST_INSERT_TAIL(b, e) do {                                 \
        apr_bucket *ap__b = (e);                                        \
        APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link);      \
    } while (0)
/* Splice the elements of list `b` onto the end of list `a`. */
#define H2_BLIST_CONCAT(a, b) do {                                      \
        APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link);      \
    } while (0)
/* Splice the elements of list `b` onto the front of list `a`. */
#define H2_BLIST_PREPEND(a, b) do {                                     \
        APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link);     \
    } while (0)
| |
| |
| static int buffer_is_empty(h2_bucket_beam *beam); |
| static apr_off_t get_buffered_data_len(h2_bucket_beam *beam); |
| |
| static int h2_blist_count(h2_blist *blist) |
| { |
| apr_bucket *b; |
| int count = 0; |
| |
| for (b = H2_BLIST_FIRST(blist); b != H2_BLIST_SENTINEL(blist); |
| b = APR_BUCKET_NEXT(b)) { |
| ++count; |
| } |
| return count; |
| } |
| |
/* Log the state of `beam` on connection `c` at log `level`, if that level
 * is enabled for the connection. The line carries the beam name, its
 * aborted/empty flags, the buffered data length and the bucket counts of
 * the send and consumed lists, followed by `msg`. When `bb` is non-NULL,
 * the text h2_util_bb_print() produces for it is appended.
 *
 * All macro arguments are parenthesized on use so that arbitrary
 * expressions can be passed. The macro declares locals named `buffer`,
 * `len` and `bmax`; arguments must not refer to caller variables of
 * those names. */
#define H2_BEAM_LOG(beam, c, level, rv, msg, bb) \
    do { \
        if (APLOG_C_IS_LEVEL((c),(level))) { \
            char buffer[4 * 1024]; \
            apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \
            len = (bb)? h2_util_bb_print(buffer, bmax, "", "", (bb)) : 0; \
            ap_log_cerror(APLOG_MARK, (level), (rv), (c), \
                          "BEAM[%s,%s%sdata=%ld,buckets(send/consumed)=%d/%d]: %s %s", \
                          (beam)->name, \
                          (beam)->aborted? "aborted," : "", \
                          buffer_is_empty(beam)? "empty," : "", \
                          (long)get_buffered_data_len(beam), \
                          h2_blist_count(&(beam)->buckets_to_send), \
                          h2_blist_count(&(beam)->buckets_consumed), \
                          (msg), len? buffer : ""); \
        } \
    } while (0)
| |
| |
| static int bucket_is_mmap(apr_bucket *b) |
| { |
| #if APR_HAS_MMAP |
| return APR_BUCKET_IS_MMAP(b); |
| #else |
| /* if it is not defined as enabled, it should always be no */ |
| return 0; |
| #endif |
| } |
| |
| static apr_off_t bucket_mem_used(apr_bucket *b) |
| { |
| if (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b)) { |
| return 0; |
| } |
| else { |
| /* should all have determinate length */ |
| return (apr_off_t)b->length; |
| } |
| } |
| |
| static int report_consumption(h2_bucket_beam *beam, int locked) |
| { |
| int rv = 0; |
| apr_off_t len = beam->recv_bytes - beam->recv_bytes_reported; |
| h2_beam_io_callback *cb = beam->cons_io_cb; |
| |
| if (len > 0) { |
| if (cb) { |
| void *ctx = beam->cons_ctx; |
| |
| if (locked) apr_thread_mutex_unlock(beam->lock); |
| cb(ctx, beam, len); |
| if (locked) apr_thread_mutex_lock(beam->lock); |
| rv = 1; |
| } |
| beam->recv_bytes_reported += len; |
| } |
| return rv; |
| } |
| |
| static apr_size_t calc_buffered(h2_bucket_beam *beam) |
| { |
| apr_size_t len = 0; |
| apr_bucket *b; |
| for (b = H2_BLIST_FIRST(&beam->buckets_to_send); |
| b != H2_BLIST_SENTINEL(&beam->buckets_to_send); |
| b = APR_BUCKET_NEXT(b)) { |
| if (b->length == ((apr_size_t)-1)) { |
| /* do not count */ |
| } |
| else if (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b)) { |
| /* if unread, has no real mem footprint. */ |
| } |
| else { |
| len += b->length; |
| } |
| } |
| return len; |
| } |
| |
| static void purge_consumed_buckets(h2_bucket_beam *beam) |
| { |
| apr_bucket *b; |
| /* delete all sender buckets in purge brigade, needs to be called |
| * from sender thread only */ |
| while (!H2_BLIST_EMPTY(&beam->buckets_consumed)) { |
| b = H2_BLIST_FIRST(&beam->buckets_consumed); |
| apr_bucket_delete(b); |
| } |
| } |
| |
| static apr_size_t calc_space_left(h2_bucket_beam *beam) |
| { |
| if (beam->max_buf_size > 0) { |
| apr_size_t len = calc_buffered(beam); |
| return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0); |
| } |
| return APR_SIZE_MAX; |
| } |
| |
| static int buffer_is_empty(h2_bucket_beam *beam) |
| { |
| return H2_BLIST_EMPTY(&beam->buckets_to_send); |
| } |
| |
| static apr_status_t wait_not_empty(h2_bucket_beam *beam, conn_rec *c, apr_read_type_e block) |
| { |
| apr_status_t rv = APR_SUCCESS; |
| |
| while (buffer_is_empty(beam) && APR_SUCCESS == rv) { |
| if (beam->aborted) { |
| rv = APR_ECONNABORTED; |
| } |
| else if (beam->closed) { |
| rv = APR_EOF; |
| } |
| else if (APR_BLOCK_READ != block) { |
| rv = APR_EAGAIN; |
| } |
| else if (beam->timeout > 0) { |
| H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_empty, timeout", NULL); |
| rv = apr_thread_cond_timedwait(beam->change, beam->lock, beam->timeout); |
| } |
| else { |
| H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_empty, forever", NULL); |
| rv = apr_thread_cond_wait(beam->change, beam->lock); |
| } |
| } |
| return rv; |
| } |
| |
| static apr_status_t wait_not_full(h2_bucket_beam *beam, conn_rec *c, |
| apr_read_type_e block, |
| apr_size_t *pspace_left) |
| { |
| apr_status_t rv = APR_SUCCESS; |
| apr_size_t left; |
| |
| while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) { |
| if (beam->aborted) { |
| rv = APR_ECONNABORTED; |
| } |
| else if (block != APR_BLOCK_READ) { |
| rv = APR_EAGAIN; |
| } |
| else { |
| if (beam->timeout > 0) { |
| H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_full, timeout", NULL); |
| rv = apr_thread_cond_timedwait(beam->change, beam->lock, beam->timeout); |
| } |
| else { |
| H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_full, forever", NULL); |
| rv = apr_thread_cond_wait(beam->change, beam->lock); |
| } |
| } |
| } |
| *pspace_left = left; |
| return rv; |
| } |
| |
| static void h2_blist_cleanup(h2_blist *bl) |
| { |
| apr_bucket *e; |
| |
| while (!H2_BLIST_EMPTY(bl)) { |
| e = H2_BLIST_FIRST(bl); |
| apr_bucket_delete(e); |
| } |
| } |
| |
| static void beam_shutdown(h2_bucket_beam *beam, apr_shutdown_how_e how) |
| { |
| if (!beam->pool) { |
| /* pool being cleared already */ |
| return; |
| } |
| |
| /* shutdown both receiver and sender? */ |
| if (how == APR_SHUTDOWN_READWRITE) { |
| beam->cons_io_cb = NULL; |
| beam->recv_cb = NULL; |
| } |
| |
| /* shutdown sender (or both)? */ |
| if (how != APR_SHUTDOWN_READ) { |
| h2_blist_cleanup(&beam->buckets_to_send); |
| purge_consumed_buckets(beam); |
| } |
| } |
| |
| static apr_status_t beam_cleanup(void *data) |
| { |
| h2_bucket_beam *beam = data; |
| beam_shutdown(beam, APR_SHUTDOWN_READWRITE); |
| beam->pool = NULL; /* the pool is clearing now */ |
| return APR_SUCCESS; |
| } |
| |
| apr_status_t h2_beam_destroy(h2_bucket_beam *beam, conn_rec *c) |
| { |
| if (beam->pool) { |
| H2_BEAM_LOG(beam, c, APLOG_TRACE2, 0, "destroy", NULL); |
| apr_pool_cleanup_run(beam->pool, beam, beam_cleanup); |
| } |
| H2_BEAM_LOG(beam, c, APLOG_TRACE2, 0, "destroyed", NULL); |
| return APR_SUCCESS; |
| } |
| |
| apr_status_t h2_beam_create(h2_bucket_beam **pbeam, conn_rec *from, |
| apr_pool_t *pool, int id, const char *tag, |
| apr_size_t max_buf_size, |
| apr_interval_time_t timeout) |
| { |
| h2_bucket_beam *beam; |
| h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(from); |
| apr_status_t rv; |
| |
| beam = apr_pcalloc(pool, sizeof(*beam)); |
| beam->pool = pool; |
| beam->from = from; |
| beam->id = id; |
| beam->name = apr_psprintf(pool, "%s-%d-%s", |
| conn_ctx->id, id, tag); |
| |
| H2_BLIST_INIT(&beam->buckets_to_send); |
| H2_BLIST_INIT(&beam->buckets_consumed); |
| beam->tx_mem_limits = 1; |
| beam->max_buf_size = max_buf_size; |
| beam->timeout = timeout; |
| |
| rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool); |
| if (APR_SUCCESS != rv) goto cleanup; |
| rv = apr_thread_cond_create(&beam->change, pool); |
| if (APR_SUCCESS != rv) goto cleanup; |
| apr_pool_pre_cleanup_register(pool, beam, beam_cleanup); |
| |
| cleanup: |
| H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "created", NULL); |
| *pbeam = (APR_SUCCESS == rv)? beam : NULL; |
| return rv; |
| } |
| |
| void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size) |
| { |
| apr_thread_mutex_lock(beam->lock); |
| beam->max_buf_size = buffer_size; |
| apr_thread_mutex_unlock(beam->lock); |
| } |
| |
| void h2_beam_set_copy_files(h2_bucket_beam * beam, int enabled) |
| { |
| apr_thread_mutex_lock(beam->lock); |
| beam->copy_files = enabled; |
| apr_thread_mutex_unlock(beam->lock); |
| } |
| |
| apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam) |
| { |
| apr_size_t buffer_size = 0; |
| |
| apr_thread_mutex_lock(beam->lock); |
| buffer_size = beam->max_buf_size; |
| apr_thread_mutex_unlock(beam->lock); |
| return buffer_size; |
| } |
| |
| apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam) |
| { |
| apr_interval_time_t timeout; |
| |
| apr_thread_mutex_lock(beam->lock); |
| timeout = beam->timeout; |
| apr_thread_mutex_unlock(beam->lock); |
| return timeout; |
| } |
| |
| void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout) |
| { |
| apr_thread_mutex_lock(beam->lock); |
| beam->timeout = timeout; |
| apr_thread_mutex_unlock(beam->lock); |
| } |
| |
| void h2_beam_abort(h2_bucket_beam *beam, conn_rec *c) |
| { |
| apr_thread_mutex_lock(beam->lock); |
| beam->aborted = 1; |
| if (c == beam->from) { |
| /* sender aborts */ |
| if (beam->send_cb) { |
| beam->send_cb(beam->send_ctx, beam); |
| } |
| if (beam->was_empty_cb && buffer_is_empty(beam)) { |
| beam->was_empty_cb(beam->was_empty_ctx, beam); |
| } |
| /* no more consumption reporting to sender */ |
| report_consumption(beam, 1); |
| beam->cons_ctx = NULL; |
| |
| beam_shutdown(beam, APR_SHUTDOWN_WRITE); |
| } |
| else { |
| /* receiver aborts */ |
| beam_shutdown(beam, APR_SHUTDOWN_READ); |
| } |
| apr_thread_cond_broadcast(beam->change); |
| apr_thread_mutex_unlock(beam->lock); |
| } |
| |
| void h2_beam_close(h2_bucket_beam *beam, conn_rec *c) |
| { |
| apr_thread_mutex_lock(beam->lock); |
| if (!beam->closed) { |
| /* should only be called from sender */ |
| ap_assert(c == beam->from); |
| beam->closed = 1; |
| if (beam->send_cb) { |
| beam->send_cb(beam->send_ctx, beam); |
| } |
| if (beam->was_empty_cb && buffer_is_empty(beam)) { |
| beam->was_empty_cb(beam->was_empty_ctx, beam); |
| } |
| apr_thread_cond_broadcast(beam->change); |
| } |
| apr_thread_mutex_unlock(beam->lock); |
| } |
| |
| static apr_status_t append_bucket(h2_bucket_beam *beam, |
| apr_bucket_brigade *bb, |
| apr_read_type_e block, |
| apr_size_t *pspace_left, |
| apr_off_t *pwritten) |
| { |
| apr_bucket *b; |
| const char *data; |
| apr_size_t len; |
| apr_status_t rv = APR_SUCCESS; |
| int can_beam = 0; |
| |
| (void)block; |
| if (beam->aborted) { |
| rv = APR_ECONNABORTED; |
| goto cleanup; |
| } |
| |
| ap_assert(beam->pool); |
| |
| b = APR_BRIGADE_FIRST(bb); |
| if (APR_BUCKET_IS_METADATA(b)) { |
| APR_BUCKET_REMOVE(b); |
| apr_bucket_setaside(b, beam->pool); |
| H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b); |
| goto cleanup; |
| } |
| /* non meta bucket */ |
| |
| /* in case of indeterminate length, we need to read the bucket, |
| * so that it transforms itself into something stable. */ |
| if (b->length == ((apr_size_t)-1)) { |
| rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); |
| if (rv != APR_SUCCESS) goto cleanup; |
| } |
| |
| if (APR_BUCKET_IS_FILE(b)) { |
| /* For file buckets the problem is their internal readpool that |
| * is used on the first read to allocate buffer/mmap. |
| * Since setting aside a file bucket will de-register the |
| * file cleanup function from the previous pool, we need to |
| * call that only from the sender thread. |
| * |
| * Currently, we do not handle file bucket with refcount > 1 as |
| * the beam is then not in complete control of the file's lifetime. |
| * Which results in the bug that a file get closed by the receiver |
| * while the sender or the beam still have buckets using it. |
| * |
| * Additionally, we allow callbacks to prevent beaming file |
| * handles across. The use case for this is to limit the number |
| * of open file handles and rather use a less efficient beam |
| * transport. */ |
| apr_bucket_file *bf = b->data; |
| can_beam = !beam->copy_files && (bf->refcount.refcount == 1); |
| } |
| else if (bucket_is_mmap(b)) { |
| can_beam = !beam->copy_files; |
| } |
| |
| if (b->length == 0) { |
| apr_bucket_delete(b); |
| rv = APR_SUCCESS; |
| goto cleanup; |
| } |
| |
| if (!*pspace_left) { |
| rv = APR_EAGAIN; |
| goto cleanup; |
| } |
| |
| /* bucket is accepted and added to beam->buckets_to_send */ |
| if (APR_BUCKET_IS_HEAP(b)) { |
| /* For heap buckets, a read from a receiver thread is fine. The |
| * data will be there and live until the bucket itself is |
| * destroyed. */ |
| rv = apr_bucket_setaside(b, beam->pool); |
| if (rv != APR_SUCCESS) goto cleanup; |
| } |
| else if (can_beam && (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b))) { |
| rv = apr_bucket_setaside(b, beam->pool); |
| if (rv != APR_SUCCESS) goto cleanup; |
| } |
| else { |
| /* we know of no special shortcut to transfer the bucket to |
| * another pool without copying. So we make it a heap bucket. */ |
| apr_bucket *b2; |
| |
| rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); |
| if (rv != APR_SUCCESS) goto cleanup; |
| /* this allocates and copies data */ |
| b2 = apr_bucket_heap_create(data, len, NULL, bb->bucket_alloc); |
| apr_bucket_delete(b); |
| b = b2; |
| APR_BRIGADE_INSERT_HEAD(bb, b); |
| } |
| |
| APR_BUCKET_REMOVE(b); |
| H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b); |
| *pwritten += (apr_off_t)b->length; |
| if (b->length > *pspace_left) { |
| *pspace_left = 0; |
| } |
| else { |
| *pspace_left -= b->length; |
| } |
| |
| cleanup: |
| return rv; |
| } |
| |
| apr_status_t h2_beam_send(h2_bucket_beam *beam, conn_rec *from, |
| apr_bucket_brigade *sender_bb, |
| apr_read_type_e block, |
| apr_off_t *pwritten) |
| { |
| apr_status_t rv = APR_SUCCESS; |
| apr_size_t space_left = 0; |
| int was_empty; |
| |
| ap_assert(beam->pool); |
| |
| /* Called from the sender thread to add buckets to the beam */ |
| apr_thread_mutex_lock(beam->lock); |
| ap_assert(beam->from == from); |
| ap_assert(sender_bb); |
| H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "start send", sender_bb); |
| purge_consumed_buckets(beam); |
| *pwritten = 0; |
| was_empty = buffer_is_empty(beam); |
| |
| space_left = calc_space_left(beam); |
| while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) { |
| rv = append_bucket(beam, sender_bb, block, &space_left, pwritten); |
| if (beam->aborted) { |
| goto cleanup; |
| } |
| else if (APR_EAGAIN == rv) { |
| /* bucket was not added, as beam buffer has no space left. |
| * Trigger event callbacks, so receiver can know there is something |
| * to receive before we do a conditional wait. */ |
| purge_consumed_buckets(beam); |
| if (beam->send_cb) { |
| beam->send_cb(beam->send_ctx, beam); |
| } |
| if (was_empty && beam->was_empty_cb) { |
| beam->was_empty_cb(beam->was_empty_ctx, beam); |
| } |
| rv = wait_not_full(beam, from, block, &space_left); |
| if (APR_SUCCESS != rv) { |
| break; |
| } |
| was_empty = buffer_is_empty(beam); |
| } |
| } |
| |
| cleanup: |
| if (beam->send_cb && !buffer_is_empty(beam)) { |
| beam->send_cb(beam->send_ctx, beam); |
| } |
| if (was_empty && beam->was_empty_cb && !buffer_is_empty(beam)) { |
| beam->was_empty_cb(beam->was_empty_ctx, beam); |
| } |
| apr_thread_cond_broadcast(beam->change); |
| |
| report_consumption(beam, 1); |
| if (beam->aborted) { |
| rv = APR_ECONNABORTED; |
| } |
| H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "end send", sender_bb); |
| apr_thread_mutex_unlock(beam->lock); |
| return rv; |
| } |
| |
| apr_status_t h2_beam_receive(h2_bucket_beam *beam, |
| conn_rec *to, |
| apr_bucket_brigade *bb, |
| apr_read_type_e block, |
| apr_off_t readbytes) |
| { |
| apr_bucket *bsender, *brecv, *ng; |
| int transferred = 0; |
| apr_status_t rv = APR_SUCCESS; |
| apr_off_t remain; |
| int consumed_buckets = 0; |
| |
| apr_thread_mutex_lock(beam->lock); |
| H2_BEAM_LOG(beam, to, APLOG_TRACE2, 0, "start receive", bb); |
| if (readbytes <= 0) { |
| readbytes = (apr_off_t)APR_SIZE_MAX; |
| } |
| remain = readbytes; |
| |
| transfer: |
| if (beam->aborted) { |
| beam_shutdown(beam, APR_SHUTDOWN_READ); |
| rv = APR_ECONNABORTED; |
| goto leave; |
| } |
| |
| ap_assert(beam->pool); |
| |
| /* transfer from our sender brigade, transforming sender buckets to |
| * receiver ones until we have enough */ |
| while (remain >= 0 && !H2_BLIST_EMPTY(&beam->buckets_to_send)) { |
| |
| brecv = NULL; |
| bsender = H2_BLIST_FIRST(&beam->buckets_to_send); |
| if (bsender->length > 0 && remain <= 0) { |
| break; |
| } |
| |
| if (APR_BUCKET_IS_METADATA(bsender)) { |
| /* we need a real copy into the receivers bucket_alloc */ |
| if (APR_BUCKET_IS_EOS(bsender)) { |
| /* this closes the beam */ |
| beam->closed = 1; |
| brecv = apr_bucket_eos_create(bb->bucket_alloc); |
| } |
| else if (APR_BUCKET_IS_FLUSH(bsender)) { |
| brecv = apr_bucket_flush_create(bb->bucket_alloc); |
| } |
| #if AP_HAS_RESPONSE_BUCKETS |
| else if (AP_BUCKET_IS_RESPONSE(bsender)) { |
| brecv = ap_bucket_response_clone(bsender, bb->p, bb->bucket_alloc); |
| } |
| else if (AP_BUCKET_IS_REQUEST(bsender)) { |
| brecv = ap_bucket_request_clone(bsender, bb->p, bb->bucket_alloc); |
| } |
| else if (AP_BUCKET_IS_HEADERS(bsender)) { |
| brecv = ap_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc); |
| } |
| #else |
| else if (H2_BUCKET_IS_HEADERS(bsender)) { |
| brecv = h2_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc); |
| } |
| #endif /* AP_HAS_RESPONSE_BUCKETS */ |
| else if (AP_BUCKET_IS_ERROR(bsender)) { |
| ap_bucket_error *eb = bsender->data; |
| brecv = ap_bucket_error_create(eb->status, eb->data, |
| bb->p, bb->bucket_alloc); |
| } |
| } |
| else if (bsender->length == 0) { |
| /* nop */ |
| } |
| #if APR_HAS_MMAP |
| else if (APR_BUCKET_IS_MMAP(bsender)) { |
| apr_bucket_mmap *bmmap = bsender->data; |
| apr_mmap_t *mmap; |
| rv = apr_mmap_dup(&mmap, bmmap->mmap, bb->p); |
| if (rv != APR_SUCCESS) goto leave; |
| brecv = apr_bucket_mmap_create(mmap, bsender->start, bsender->length, bb->bucket_alloc); |
| } |
| #endif |
| else if (APR_BUCKET_IS_FILE(bsender)) { |
| /* This is setaside into the target brigade pool so that |
| * any read operation messes with that pool and not |
| * the sender one. */ |
| apr_bucket_file *f = (apr_bucket_file *)bsender->data; |
| apr_file_t *fd = f->fd; |
| int setaside = (f->readpool != bb->p); |
| |
| if (setaside) { |
| rv = apr_file_setaside(&fd, fd, bb->p); |
| if (rv != APR_SUCCESS) goto leave; |
| } |
| ng = apr_brigade_insert_file(bb, fd, bsender->start, (apr_off_t)bsender->length, |
| bb->p); |
| #if APR_HAS_MMAP |
| /* disable mmap handling as this leads to segfaults when |
| * the underlying file is changed while memory pointer has |
| * been handed out. See also PR 59348 */ |
| apr_bucket_file_enable_mmap(ng, 0); |
| #endif |
| remain -= bsender->length; |
| ++transferred; |
| } |
| else { |
| const char *data; |
| apr_size_t dlen; |
| /* we did that when the bucket was added, so this should |
| * give us the same data as before without changing the bucket |
| * or anything (pool) connected to it. */ |
| rv = apr_bucket_read(bsender, &data, &dlen, APR_BLOCK_READ); |
| if (rv != APR_SUCCESS) goto leave; |
| rv = apr_brigade_write(bb, NULL, NULL, data, dlen); |
| if (rv != APR_SUCCESS) goto leave; |
| |
| remain -= dlen; |
| ++transferred; |
| } |
| |
| if (brecv) { |
| /* we have a proxy that we can give the receiver */ |
| APR_BRIGADE_INSERT_TAIL(bb, brecv); |
| remain -= brecv->length; |
| ++transferred; |
| } |
| APR_BUCKET_REMOVE(bsender); |
| H2_BLIST_INSERT_TAIL(&beam->buckets_consumed, bsender); |
| beam->recv_bytes += bsender->length; |
| ++consumed_buckets; |
| } |
| |
| if (beam->recv_cb && consumed_buckets > 0) { |
| beam->recv_cb(beam->recv_ctx, beam); |
| } |
| |
| if (transferred) { |
| apr_thread_cond_broadcast(beam->change); |
| rv = APR_SUCCESS; |
| } |
| else if (beam->aborted) { |
| rv = APR_ECONNABORTED; |
| } |
| else if (beam->closed) { |
| rv = APR_EOF; |
| } |
| else { |
| rv = wait_not_empty(beam, to, block); |
| if (rv != APR_SUCCESS) { |
| goto leave; |
| } |
| goto transfer; |
| } |
| |
| leave: |
| H2_BEAM_LOG(beam, to, APLOG_TRACE2, rv, "end receive", bb); |
| apr_thread_mutex_unlock(beam->lock); |
| return rv; |
| } |
| |
| void h2_beam_on_consumed(h2_bucket_beam *beam, |
| h2_beam_io_callback *io_cb, void *ctx) |
| { |
| apr_thread_mutex_lock(beam->lock); |
| beam->cons_io_cb = io_cb; |
| beam->cons_ctx = ctx; |
| apr_thread_mutex_unlock(beam->lock); |
| } |
| |
| void h2_beam_on_received(h2_bucket_beam *beam, |
| h2_beam_ev_callback *recv_cb, void *ctx) |
| { |
| apr_thread_mutex_lock(beam->lock); |
| beam->recv_cb = recv_cb; |
| beam->recv_ctx = ctx; |
| apr_thread_mutex_unlock(beam->lock); |
| } |
| |
| void h2_beam_on_send(h2_bucket_beam *beam, |
| h2_beam_ev_callback *send_cb, void *ctx) |
| { |
| apr_thread_mutex_lock(beam->lock); |
| beam->send_cb = send_cb; |
| beam->send_ctx = ctx; |
| apr_thread_mutex_unlock(beam->lock); |
| } |
| |
| void h2_beam_on_was_empty(h2_bucket_beam *beam, |
| h2_beam_ev_callback *was_empty_cb, void *ctx) |
| { |
| apr_thread_mutex_lock(beam->lock); |
| beam->was_empty_cb = was_empty_cb; |
| beam->was_empty_ctx = ctx; |
| apr_thread_mutex_unlock(beam->lock); |
| } |
| |
| |
| static apr_off_t get_buffered_data_len(h2_bucket_beam *beam) |
| { |
| apr_bucket *b; |
| apr_off_t l = 0; |
| |
| for (b = H2_BLIST_FIRST(&beam->buckets_to_send); |
| b != H2_BLIST_SENTINEL(&beam->buckets_to_send); |
| b = APR_BUCKET_NEXT(b)) { |
| /* should all have determinate length */ |
| l += b->length; |
| } |
| return l; |
| } |
| |
| apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam) |
| { |
| apr_off_t l = 0; |
| |
| apr_thread_mutex_lock(beam->lock); |
| l = get_buffered_data_len(beam); |
| apr_thread_mutex_unlock(beam->lock); |
| return l; |
| } |
| |
| apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam) |
| { |
| apr_bucket *b; |
| apr_off_t l = 0; |
| |
| apr_thread_mutex_lock(beam->lock); |
| for (b = H2_BLIST_FIRST(&beam->buckets_to_send); |
| b != H2_BLIST_SENTINEL(&beam->buckets_to_send); |
| b = APR_BUCKET_NEXT(b)) { |
| l += bucket_mem_used(b); |
| } |
| apr_thread_mutex_unlock(beam->lock); |
| return l; |
| } |
| |
| int h2_beam_empty(h2_bucket_beam *beam) |
| { |
| int empty = 1; |
| |
| apr_thread_mutex_lock(beam->lock); |
| empty = buffer_is_empty(beam); |
| apr_thread_mutex_unlock(beam->lock); |
| return empty; |
| } |
| |
| int h2_beam_report_consumption(h2_bucket_beam *beam) |
| { |
| int rv = 0; |
| |
| apr_thread_mutex_lock(beam->lock); |
| rv = report_consumption(beam, 1); |
| apr_thread_mutex_unlock(beam->lock); |
| return rv; |
| } |