blob: cbf7f348da7e3dd1f4b359d50627a7e7b88224ca [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <apr_lib.h>
#include <apr_atomic.h>
#include <apr_strings.h>
#include <apr_time.h>
#include <apr_buckets.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <httpd.h>
#include <http_protocol.h>
#include <http_log.h>
#include "h2_private.h"
#include "h2_conn_ctx.h"
#include "h2_headers.h"
#include "h2_util.h"
#include "h2_bucket_beam.h"
/*
 * Thin wrappers around the APR ring macros, specialised for the `list`
 * member of an h2_blist, whose elements are apr_bucket linked through
 * their `link` member.
 */

/* Initialise the ring of `b`. The expansion deliberately carries no
 * trailing semicolon: the caller supplies it, so the macro behaves like a
 * single statement (also inside an unbraced if/else). */
#define H2_BLIST_INIT(b)        APR_RING_INIT(&(b)->list, apr_bucket, link)
/* The ring sentinel of `b`; iteration stops when it is reached. */
#define H2_BLIST_SENTINEL(b)    APR_RING_SENTINEL(&(b)->list, apr_bucket, link)
/* True if `b` holds no buckets. */
#define H2_BLIST_EMPTY(b)       APR_RING_EMPTY(&(b)->list, apr_bucket, link)
/* First / last element of `b` (the sentinel when the list is empty). */
#define H2_BLIST_FIRST(b)       APR_RING_FIRST(&(b)->list)
#define H2_BLIST_LAST(b)        APR_RING_LAST(&(b)->list)

/* Insert bucket `e` at the head of `b`; `e` is evaluated exactly once. */
#define H2_BLIST_INSERT_HEAD(b, e) do {                                 \
        apr_bucket *ap__b = (e);                                        \
        APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link);      \
    } while (0)

/* Insert bucket `e` at the tail of `b`; `e` is evaluated exactly once. */
#define H2_BLIST_INSERT_TAIL(b, e) do {                                 \
        apr_bucket *ap__b = (e);                                        \
        APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link);      \
    } while (0)

/* Hand the rings of `a` and `b` to APR_RING_CONCAT. */
#define H2_BLIST_CONCAT(a, b) do {                                      \
        APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link);      \
    } while (0)

/* Hand the rings of `a` and `b` to APR_RING_PREPEND. */
#define H2_BLIST_PREPEND(a, b) do {                                     \
        APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link);     \
    } while (0)
static int buffer_is_empty(h2_bucket_beam *beam);
static apr_off_t get_buffered_data_len(h2_bucket_beam *beam);
/* Count the buckets linked into `blist` by walking the ring from its
 * first element until the sentinel is reached. */
static int h2_blist_count(h2_blist *blist)
{
    int n = 0;
    apr_bucket *cur = H2_BLIST_FIRST(blist);

    while (cur != H2_BLIST_SENTINEL(blist)) {
        n++;
        cur = APR_BUCKET_NEXT(cur);
    }
    return n;
}
/* Log a one-line snapshot of `beam` on connection `c`, but only when
 * `level` is enabled for that connection. The line carries the beam name,
 * its aborted/empty flags, the buffered byte count, the number of buckets
 * in the send and consumed lists, `msg` and - when `bb` is non-NULL and
 * h2_util_bb_print() produced output - the printed brigade.
 * `rv` is passed through as the status of the log record.
 * Note: `beam` and `bb` are expanded several times; pass expressions
 * without side effects. The macro reads the bucket lists without taking
 * beam->lock itself. */
#define H2_BEAM_LOG(beam, c, level, rv, msg, bb) \
do { \
if (APLOG_C_IS_LEVEL((c),(level))) { \
char buffer[4 * 1024]; \
apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \
len = bb? h2_util_bb_print(buffer, bmax, "", "", bb) : 0; \
ap_log_cerror(APLOG_MARK, (level), rv, (c), \
"BEAM[%s,%s%sdata=%ld,buckets(send/consumed)=%d/%d]: %s %s", \
(beam)->name, \
(beam)->aborted? "aborted," : "", \
buffer_is_empty(beam)? "empty," : "", \
(long)get_buffered_data_len(beam), \
h2_blist_count(&(beam)->buckets_to_send), \
h2_blist_count(&(beam)->buckets_consumed), \
(msg), len? buffer : ""); \
} \
} while (0)
/* Tell whether `b` is an mmap bucket. When APR is built without mmap
 * support (APR_HAS_MMAP is false) the answer is always 0. */
static int bucket_is_mmap(apr_bucket *b)
{
    int is_mmap = 0;
#if APR_HAS_MMAP
    is_mmap = APR_BUCKET_IS_MMAP(b);
#endif
    return is_mmap;
}
/* Memory attributed to bucket `b`: file and mmap buckets count as 0,
 * every other bucket counts with its length (expected to be determinate). */
static apr_off_t bucket_mem_used(apr_bucket *b)
{
    int no_footprint = APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b);

    return no_footprint ? 0 : (apr_off_t)b->length;
}
/* Report bytes that were received but not yet reported to the consumption
 * callback.
 *
 * The amount is recv_bytes - recv_bytes_reported. If it is positive and a
 * cons_io_cb is installed, the callback is invoked with (cons_ctx, beam,
 * amount). When `locked` is non-zero, beam->lock is released around the
 * callback and re-acquired afterwards. The amount is added to
 * recv_bytes_reported whether or not a callback exists.
 *
 * Returns 1 if the callback was invoked, 0 otherwise. */
static int report_consumption(h2_bucket_beam *beam, int locked)
{
    h2_beam_io_callback *notify = beam->cons_io_cb;
    apr_off_t pending = beam->recv_bytes - beam->recv_bytes_reported;
    int reported = 0;

    if (pending <= 0) {
        return 0;
    }
    if (notify) {
        void *ctx = beam->cons_ctx;

        if (locked) {
            apr_thread_mutex_unlock(beam->lock);
        }
        notify(ctx, beam, pending);
        if (locked) {
            apr_thread_mutex_lock(beam->lock);
        }
        reported = 1;
    }
    beam->recv_bytes_reported += pending;
    return reported;
}
/* Sum the lengths of the buckets waiting in buckets_to_send that count
 * against the buffer limit. Buckets of indeterminate length, file buckets
 * and mmap buckets are skipped. */
static apr_size_t calc_buffered(h2_bucket_beam *beam)
{
    h2_blist *pending = &beam->buckets_to_send;
    apr_size_t total = 0;
    apr_bucket *cur;

    for (cur = H2_BLIST_FIRST(pending);
         cur != H2_BLIST_SENTINEL(pending);
         cur = APR_BUCKET_NEXT(cur)) {
        if (cur->length == ((apr_size_t)-1)) {
            continue; /* indeterminate length: not counted */
        }
        if (APR_BUCKET_IS_FILE(cur) || bucket_is_mmap(cur)) {
            continue; /* unread file/mmap data has no real memory footprint */
        }
        total += cur->length;
    }
    return total;
}
/* Delete every bucket parked in buckets_consumed. These are sender
 * buckets, so this must only be called from the sender thread. */
static void purge_consumed_buckets(h2_bucket_beam *beam)
{
    h2_blist *consumed = &beam->buckets_consumed;

    while (!H2_BLIST_EMPTY(consumed)) {
        /* keep the bucket in a local: apr_bucket_delete is a macro */
        apr_bucket *head = H2_BLIST_FIRST(consumed);

        apr_bucket_delete(head);
    }
}
/* Number of bytes that may still be buffered. A max_buf_size that is not
 * positive means "no limit" and yields APR_SIZE_MAX; otherwise the result
 * is the limit minus calc_buffered(), floored at 0. */
static apr_size_t calc_space_left(h2_bucket_beam *beam)
{
    apr_size_t used;

    if (!(beam->max_buf_size > 0)) {
        return APR_SIZE_MAX;
    }
    used = calc_buffered(beam);
    if (beam->max_buf_size > used) {
        return beam->max_buf_size - used;
    }
    return 0;
}
/* True when no bucket is waiting in buckets_to_send. */
static int buffer_is_empty(h2_bucket_beam *beam)
{
    h2_blist *pending = &beam->buckets_to_send;

    return H2_BLIST_EMPTY(pending);
}
/* Wait until the beam has buckets to send.
 *
 * While the buffer is empty:
 *  - an aborted beam yields APR_ECONNABORTED,
 *  - a closed beam yields APR_EOF,
 *  - a non-blocking `block` yields APR_EAGAIN,
 *  - otherwise the function waits on beam->change (bounded by
 *    beam->timeout when that is positive) and returns the wait's status
 *    if it is not APR_SUCCESS.
 * Returns APR_SUCCESS once the buffer is not empty. The condition waits
 * use beam->lock, so the caller is expected to hold it. */
static apr_status_t wait_not_empty(h2_bucket_beam *beam, conn_rec *c, apr_read_type_e block)
{
    apr_status_t status = APR_SUCCESS;

    while (APR_SUCCESS == status && buffer_is_empty(beam)) {
        if (beam->aborted) {
            status = APR_ECONNABORTED;
            break;
        }
        if (beam->closed) {
            status = APR_EOF;
            break;
        }
        if (APR_BLOCK_READ != block) {
            status = APR_EAGAIN;
            break;
        }
        if (beam->timeout > 0) {
            H2_BEAM_LOG(beam, c, APLOG_TRACE2, status, "wait_not_empty, timeout", NULL);
            status = apr_thread_cond_timedwait(beam->change, beam->lock, beam->timeout);
        }
        else {
            H2_BEAM_LOG(beam, c, APLOG_TRACE2, status, "wait_not_empty, forever", NULL);
            status = apr_thread_cond_wait(beam->change, beam->lock);
        }
    }
    return status;
}
/* Wait until the beam has buffer space again.
 *
 * While calc_space_left() reports 0:
 *  - an aborted beam yields APR_ECONNABORTED,
 *  - a non-blocking `block` yields APR_EAGAIN,
 *  - otherwise the function waits on beam->change (bounded by
 *    beam->timeout when that is positive) and returns the wait's status
 *    if it is not APR_SUCCESS.
 * The space computed last is always stored in *pspace_left. The condition
 * waits use beam->lock, so the caller is expected to hold it. */
static apr_status_t wait_not_full(h2_bucket_beam *beam, conn_rec *c,
                                  apr_read_type_e block,
                                  apr_size_t *pspace_left)
{
    apr_status_t status = APR_SUCCESS;
    apr_size_t avail;

    for (;;) {
        /* recompute on every round, also after a status change */
        avail = calc_space_left(beam);
        if (avail != 0 || APR_SUCCESS != status) {
            break;
        }
        if (beam->aborted) {
            status = APR_ECONNABORTED;
        }
        else if (APR_BLOCK_READ != block) {
            status = APR_EAGAIN;
        }
        else if (beam->timeout > 0) {
            H2_BEAM_LOG(beam, c, APLOG_TRACE2, status, "wait_not_full, timeout", NULL);
            status = apr_thread_cond_timedwait(beam->change, beam->lock, beam->timeout);
        }
        else {
            H2_BEAM_LOG(beam, c, APLOG_TRACE2, status, "wait_not_full, forever", NULL);
            status = apr_thread_cond_wait(beam->change, beam->lock);
        }
    }
    *pspace_left = avail;
    return status;
}
/* Delete all buckets in `bl`, leaving the list empty. */
static void h2_blist_cleanup(h2_blist *bl)
{
    apr_bucket *head;

    for (; !H2_BLIST_EMPTY(bl); ) {
        /* keep the bucket in a local: apr_bucket_delete is a macro */
        head = H2_BLIST_FIRST(bl);
        apr_bucket_delete(head);
    }
}
/* Shut down one or both sides of the beam.
 *
 * Does nothing when beam->pool is NULL (the pool is already being
 * cleared). APR_SHUTDOWN_READWRITE drops the consumption and receive
 * callbacks. Anything other than APR_SHUTDOWN_READ additionally deletes
 * all buckets in buckets_to_send and buckets_consumed. */
static void beam_shutdown(h2_bucket_beam *beam, apr_shutdown_how_e how)
{
    if (beam->pool) {
        if (APR_SHUTDOWN_READWRITE == how) {
            /* both sides go away: no more callbacks */
            beam->cons_io_cb = NULL;
            beam->recv_cb = NULL;
        }
        if (APR_SHUTDOWN_READ != how) {
            /* sender side (or both): release all sender buckets */
            h2_blist_cleanup(&beam->buckets_to_send);
            purge_consumed_buckets(beam);
        }
    }
}
static apr_status_t beam_cleanup(void *data)
{
h2_bucket_beam *beam = data;
beam_shutdown(beam, APR_SHUTDOWN_READWRITE);
beam->pool = NULL; /* the pool is clearing now */
return APR_SUCCESS;
}
/* Destroy the beam: if it still has a pool, run beam_cleanup() through
 * apr_pool_cleanup_run(). Logs at TRACE2 on connection `c` before and
 * after. Always returns APR_SUCCESS. */
apr_status_t h2_beam_destroy(h2_bucket_beam *beam, conn_rec *c)
{
    apr_pool_t *pool = beam->pool;

    if (pool != NULL) {
        H2_BEAM_LOG(beam, c, APLOG_TRACE2, 0, "destroy", NULL);
        apr_pool_cleanup_run(pool, beam, beam_cleanup);
    }
    H2_BEAM_LOG(beam, c, APLOG_TRACE2, 0, "destroyed", NULL);
    return APR_SUCCESS;
}
/* Create a beam allocated from `pool`.
 *
 * @param pbeam        receives the new beam, or NULL on failure
 * @param from         the connection recorded as the beam's sender
 * @param pool         pool for the beam, its name, mutex and condition
 * @param id           identifier stored in the beam and used in its name
 * @param tag          suffix of the beam name ("<conn ctx id>-<id>-<tag>")
 * @param max_buf_size initial buffer limit
 * @param timeout      initial timeout for blocking waits
 * @return APR_SUCCESS, or the error of the mutex/condition creation
 *
 * The conn ctx of `from` is dereferenced without a NULL check, so `from`
 * is assumed to carry one. beam_cleanup() is registered as pre-cleanup of
 * `pool` only when creation succeeded. */
apr_status_t h2_beam_create(h2_bucket_beam **pbeam, conn_rec *from,
                            apr_pool_t *pool, int id, const char *tag,
                            apr_size_t max_buf_size,
                            apr_interval_time_t timeout)
{
    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(from);
    h2_bucket_beam *beam = apr_pcalloc(pool, sizeof(*beam));
    apr_status_t rv;

    beam->pool = pool;
    beam->from = from;
    beam->id = id;
    beam->name = apr_psprintf(pool, "%s-%d-%s",
                              conn_ctx->id, id, tag);
    H2_BLIST_INIT(&beam->buckets_to_send);
    H2_BLIST_INIT(&beam->buckets_consumed);
    beam->tx_mem_limits = 1;
    beam->max_buf_size = max_buf_size;
    beam->timeout = timeout;

    rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
    if (APR_SUCCESS == rv) {
        rv = apr_thread_cond_create(&beam->change, pool);
    }
    if (APR_SUCCESS == rv) {
        apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
    }

    H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "created", NULL);
    *pbeam = (APR_SUCCESS == rv)? beam : NULL;
    return rv;
}
/* Set the beam's buffer limit (max_buf_size) to `buffer_size`, under
 * beam->lock. calc_space_left() treats a limit that is not positive as
 * "unlimited". */
void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
{
apr_thread_mutex_lock(beam->lock);
beam->max_buf_size = buffer_size;
apr_thread_mutex_unlock(beam->lock);
}
/* Set the beam's copy_files flag to `enabled`, under beam->lock.
 * append_bucket() only sets file and mmap buckets aside as-is while this
 * flag is 0; otherwise their data is copied into heap buckets. */
void h2_beam_set_copy_files(h2_bucket_beam * beam, int enabled)
{
apr_thread_mutex_lock(beam->lock);
beam->copy_files = enabled;
apr_thread_mutex_unlock(beam->lock);
}
/* Return the beam's current buffer limit (max_buf_size), read under
 * beam->lock. */
apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
{
    apr_size_t limit;

    apr_thread_mutex_lock(beam->lock);
    limit = beam->max_buf_size;
    apr_thread_mutex_unlock(beam->lock);
    return limit;
}
/* Return the beam's current wait timeout, read under beam->lock. */
apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
{
    apr_interval_time_t current;

    apr_thread_mutex_lock(beam->lock);
    current = beam->timeout;
    apr_thread_mutex_unlock(beam->lock);
    return current;
}
/* Set the beam's wait timeout to `timeout`, under beam->lock. The wait
 * helpers use a timed wait only when the timeout is positive and wait
 * without limit otherwise. */
void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
{
apr_thread_mutex_lock(beam->lock);
beam->timeout = timeout;
apr_thread_mutex_unlock(beam->lock);
}
/* Abort the beam on behalf of connection `c`.
 *
 * Marks the beam aborted under beam->lock. If `c` is the sending
 * connection (beam->from), the send callback is invoked, the was-empty
 * callback is invoked when the buffer is empty, outstanding consumption
 * is reported (report_consumption() releases the lock around its
 * callback), cons_ctx is cleared and the sender side is shut down.
 * For any other connection only the receiver side is shut down.
 * Finally all waiters on beam->change are woken. */
void h2_beam_abort(h2_bucket_beam *beam, conn_rec *c)
{
    apr_thread_mutex_lock(beam->lock);
    beam->aborted = 1;

    if (c != beam->from) {
        /* receiver aborts */
        beam_shutdown(beam, APR_SHUTDOWN_READ);
    }
    else {
        /* sender aborts */
        if (beam->send_cb) {
            beam->send_cb(beam->send_ctx, beam);
        }
        if (beam->was_empty_cb && buffer_is_empty(beam)) {
            beam->was_empty_cb(beam->was_empty_ctx, beam);
        }
        /* last consumption report; the context is dropped afterwards */
        report_consumption(beam, 1);
        beam->cons_ctx = NULL;
        beam_shutdown(beam, APR_SHUTDOWN_WRITE);
    }

    apr_thread_cond_broadcast(beam->change);
    apr_thread_mutex_unlock(beam->lock);
}
/* Close the beam. Has no effect when it is already closed. Otherwise
 * asserts that `c` is the sending connection, marks the beam closed,
 * invokes the send callback, invokes the was-empty callback when the
 * buffer is empty, and wakes all waiters on beam->change. Everything
 * happens under beam->lock. */
void h2_beam_close(h2_bucket_beam *beam, conn_rec *c)
{
    apr_thread_mutex_lock(beam->lock);
    if (beam->closed) {
        apr_thread_mutex_unlock(beam->lock);
        return;
    }

    /* only the sender may close */
    ap_assert(c == beam->from);
    beam->closed = 1;
    if (beam->send_cb) {
        beam->send_cb(beam->send_ctx, beam);
    }
    if (beam->was_empty_cb && buffer_is_empty(beam)) {
        beam->was_empty_cb(beam->was_empty_ctx, beam);
    }
    apr_thread_cond_broadcast(beam->change);

    apr_thread_mutex_unlock(beam->lock);
}
/* Move the first bucket of `bb` into beam->buckets_to_send.
 * Called by h2_beam_send() with beam->lock held.
 *
 * Outcomes:
 *  - beam aborted: APR_ECONNABORTED, `bb` untouched.
 *  - metadata bucket: removed from `bb`, set aside into beam->pool and
 *    appended; neither *pspace_left nor *pwritten changes.
 *  - data bucket of indeterminate length: read (blocking) first so that it
 *    gets a determinate length; a read error is returned.
 *  - zero-length data bucket: deleted, APR_SUCCESS.
 *  - *pspace_left is 0: APR_EAGAIN, the bucket stays in `bb`.
 *  - otherwise the bucket is appended, either set aside as-is (heap
 *    buckets; file/mmap buckets when `can_beam`) or replaced by a heap
 *    bucket holding a copy of its data. *pwritten grows by the appended
 *    length and *pspace_left shrinks by it (floored at 0).
 *
 * `block` is not used. */
static apr_status_t append_bucket(h2_bucket_beam *beam,
apr_bucket_brigade *bb,
apr_read_type_e block,
apr_size_t *pspace_left,
apr_off_t *pwritten)
{
apr_bucket *b;
const char *data;
apr_size_t len;
apr_status_t rv = APR_SUCCESS;
/* non-zero when a file/mmap bucket may be passed on without copying */
int can_beam = 0;
(void)block;
if (beam->aborted) {
rv = APR_ECONNABORTED;
goto cleanup;
}
ap_assert(beam->pool);
b = APR_BRIGADE_FIRST(bb);
if (APR_BUCKET_IS_METADATA(b)) {
/* metadata is always accepted, regardless of the space left */
APR_BUCKET_REMOVE(b);
apr_bucket_setaside(b, beam->pool);
H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b);
goto cleanup;
}
/* non meta bucket */
/* in case of indeterminate length, we need to read the bucket,
* so that it transforms itself into something stable. */
if (b->length == ((apr_size_t)-1)) {
rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (rv != APR_SUCCESS) goto cleanup;
}
if (APR_BUCKET_IS_FILE(b)) {
/* For file buckets the problem is their internal readpool that
* is used on the first read to allocate buffer/mmap.
* Since setting aside a file bucket will de-register the
* file cleanup function from the previous pool, we need to
* call that only from the sender thread.
*
* Currently, we do not handle file bucket with refcount > 1 as
* the beam is then not in complete control of the file's lifetime.
* Which results in the bug that a file get closed by the receiver
* while the sender or the beam still have buckets using it.
*
* Additionally, we allow callbacks to prevent beaming file
* handles across. The use case for this is to limit the number
* of open file handles and rather use a less efficient beam
* transport. */
apr_bucket_file *bf = b->data;
can_beam = !beam->copy_files && (bf->refcount.refcount == 1);
}
else if (bucket_is_mmap(b)) {
can_beam = !beam->copy_files;
}
if (b->length == 0) {
/* empty data buckets are dropped instead of being sent */
apr_bucket_delete(b);
rv = APR_SUCCESS;
goto cleanup;
}
if (!*pspace_left) {
/* buffer is full: leave the bucket in bb and let the caller wait */
rv = APR_EAGAIN;
goto cleanup;
}
/* bucket is accepted and added to beam->buckets_to_send */
if (APR_BUCKET_IS_HEAP(b)) {
/* For heap buckets, a read from a receiver thread is fine. The
* data will be there and live until the bucket itself is
* destroyed. */
rv = apr_bucket_setaside(b, beam->pool);
if (rv != APR_SUCCESS) goto cleanup;
}
else if (can_beam && (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b))) {
rv = apr_bucket_setaside(b, beam->pool);
if (rv != APR_SUCCESS) goto cleanup;
}
else {
/* we know of no special shortcut to transfer the bucket to
* another pool without copying. So we make it a heap bucket. */
apr_bucket *b2;
rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (rv != APR_SUCCESS) goto cleanup;
/* this allocates and copies data */
b2 = apr_bucket_heap_create(data, len, NULL, bb->bucket_alloc);
apr_bucket_delete(b);
b = b2;
/* put the copy where the original was, so the common code below can
* remove it from bb again */
APR_BRIGADE_INSERT_HEAD(bb, b);
}
APR_BUCKET_REMOVE(b);
H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b);
*pwritten += (apr_off_t)b->length;
/* a bucket larger than the remaining space is still accepted whole;
* the space left is then clamped to 0 */
if (b->length > *pspace_left) {
*pspace_left = 0;
}
else {
*pspace_left -= b->length;
}
cleanup:
return rv;
}
/* Send the buckets of `sender_bb` through the beam.
 *
 * @param beam      the beam; must still have its pool
 * @param from      the calling connection; asserted to be beam->from
 * @param sender_bb brigade to take buckets from; must not be NULL
 * @param block     whether to wait for buffer space when the beam is full
 * @param pwritten  set to the number of data bytes appended by this call
 * @return APR_ECONNABORTED if the beam is aborted at the end of the call;
 *         otherwise the status of the last append_bucket()/wait_not_full()
 *         (APR_SUCCESS when the whole brigade was processed, APR_EAGAIN
 *         when the beam is full and `block` is non-blocking, or a read/
 *         setaside/wait error).
 *
 * Runs under beam->lock. Buckets are moved one at a time via
 * append_bucket(). When the beam is full, already consumed buckets are
 * purged, the send/was-empty callbacks are triggered so that the receiver
 * learns about pending data, and wait_not_full() is called. On exit the
 * callbacks are triggered again if the buffer holds data, waiters on
 * beam->change are woken and consumption is reported (report_consumption()
 * releases the lock around its callback). */
apr_status_t h2_beam_send(h2_bucket_beam *beam, conn_rec *from,
apr_bucket_brigade *sender_bb,
apr_read_type_e block,
apr_off_t *pwritten)
{
apr_status_t rv = APR_SUCCESS;
apr_size_t space_left = 0;
/* whether the buffer was empty the last time we looked; decides if the
* was_empty callback fires */
int was_empty;
ap_assert(beam->pool);
/* Called from the sender thread to add buckets to the beam */
apr_thread_mutex_lock(beam->lock);
ap_assert(beam->from == from);
ap_assert(sender_bb);
H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "start send", sender_bb);
/* we are on the sender thread: release buckets the receiver is done with */
purge_consumed_buckets(beam);
*pwritten = 0;
was_empty = buffer_is_empty(beam);
space_left = calc_space_left(beam);
while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
rv = append_bucket(beam, sender_bb, block, &space_left, pwritten);
if (beam->aborted) {
goto cleanup;
}
else if (APR_EAGAIN == rv) {
/* bucket was not added, as beam buffer has no space left.
* Trigger event callbacks, so receiver can know there is something
* to receive before we do a conditional wait. */
purge_consumed_buckets(beam);
if (beam->send_cb) {
beam->send_cb(beam->send_ctx, beam);
}
if (was_empty && beam->was_empty_cb) {
beam->was_empty_cb(beam->was_empty_ctx, beam);
}
rv = wait_not_full(beam, from, block, &space_left);
if (APR_SUCCESS != rv) {
break;
}
was_empty = buffer_is_empty(beam);
}
}
cleanup:
if (beam->send_cb && !buffer_is_empty(beam)) {
beam->send_cb(beam->send_ctx, beam);
}
if (was_empty && beam->was_empty_cb && !buffer_is_empty(beam)) {
beam->was_empty_cb(beam->was_empty_ctx, beam);
}
apr_thread_cond_broadcast(beam->change);
report_consumption(beam, 1);
/* an abort overrides whatever status the loop ended with */
if (beam->aborted) {
rv = APR_ECONNABORTED;
}
H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "end send", sender_bb);
apr_thread_mutex_unlock(beam->lock);
return rv;
}
/* Receive buckets from the beam into the receiver's brigade `bb`.
 *
 * @param beam      the beam to read from
 * @param to        the receiving connection (used for logging)
 * @param bb        brigade that receives the data; its pool and bucket
 *                  allocator are used for everything created here
 * @param block     whether to wait for data when the beam is empty
 * @param readbytes upper bound of data bytes to transfer; a value <= 0
 *                  means "no limit"
 * @return APR_SUCCESS when at least one bucket was transferred,
 *         APR_ECONNABORTED when the beam is aborted,
 *         APR_EOF when the beam is closed and nothing was transferred,
 *         the status of wait_not_empty() (e.g. APR_EAGAIN for a
 *         non-blocking read on an empty beam), or the error of a failed
 *         mmap dup / file setaside / bucket read / brigade write.
 *
 * Runs under beam->lock. Sender buckets are never handed to the receiver
 * directly: for each one an equivalent is created in `bb` (metadata is
 * re-created or cloned, mmap and file buckets are re-created against
 * bb->p, other data is copied with apr_brigade_write()). The sender
 * bucket is then moved to buckets_consumed, where the sender thread
 * deletes it later. Metadata buckets of a type not handled here are
 * consumed without producing a receiver bucket. */
apr_status_t h2_beam_receive(h2_bucket_beam *beam,
                             conn_rec *to,
                             apr_bucket_brigade *bb,
                             apr_read_type_e block,
                             apr_off_t readbytes)
{
    apr_bucket *bsender, *brecv, *ng;
    int transferred = 0;
    apr_status_t rv = APR_SUCCESS;
    apr_off_t remain;
    int consumed_buckets = 0;

    apr_thread_mutex_lock(beam->lock);
    H2_BEAM_LOG(beam, to, APLOG_TRACE2, 0, "start receive", bb);
    if (readbytes <= 0) {
        /* "No limit". Do not cast APR_SIZE_MAX itself: where apr_size_t
         * and apr_off_t have the same width that yields -1, the transfer
         * loop below would never run and this function would spin on a
         * non-empty beam. Half of it stays positive, assuming apr_off_t
         * is at least as wide as apr_size_t. */
        readbytes = (apr_off_t)(APR_SIZE_MAX >> 1);
    }
    remain = readbytes;

transfer:
    if (beam->aborted) {
        beam_shutdown(beam, APR_SHUTDOWN_READ);
        rv = APR_ECONNABORTED;
        goto leave;
    }

    ap_assert(beam->pool);

    /* transfer from our sender brigade, transforming sender buckets to
     * receiver ones until we have enough */
    while (remain >= 0 && !H2_BLIST_EMPTY(&beam->buckets_to_send)) {

        brecv = NULL;
        bsender = H2_BLIST_FIRST(&beam->buckets_to_send);
        /* budget used up: only zero-length buckets may still pass */
        if (bsender->length > 0 && remain <= 0) {
            break;
        }

        if (APR_BUCKET_IS_METADATA(bsender)) {
            /* we need a real copy into the receivers bucket_alloc */
            if (APR_BUCKET_IS_EOS(bsender)) {
                /* this closes the beam */
                beam->closed = 1;
                brecv = apr_bucket_eos_create(bb->bucket_alloc);
            }
            else if (APR_BUCKET_IS_FLUSH(bsender)) {
                brecv = apr_bucket_flush_create(bb->bucket_alloc);
            }
#if AP_HAS_RESPONSE_BUCKETS
            else if (AP_BUCKET_IS_RESPONSE(bsender)) {
                brecv = ap_bucket_response_clone(bsender, bb->p, bb->bucket_alloc);
            }
            else if (AP_BUCKET_IS_REQUEST(bsender)) {
                brecv = ap_bucket_request_clone(bsender, bb->p, bb->bucket_alloc);
            }
            else if (AP_BUCKET_IS_HEADERS(bsender)) {
                brecv = ap_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc);
            }
#else
            else if (H2_BUCKET_IS_HEADERS(bsender)) {
                brecv = h2_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc);
            }
#endif /* AP_HAS_RESPONSE_BUCKETS */
            else if (AP_BUCKET_IS_ERROR(bsender)) {
                ap_bucket_error *eb = bsender->data;
                brecv = ap_bucket_error_create(eb->status, eb->data,
                                               bb->p, bb->bucket_alloc);
            }
        }
        else if (bsender->length == 0) {
            /* nop */
        }
#if APR_HAS_MMAP
        else if (APR_BUCKET_IS_MMAP(bsender)) {
            apr_bucket_mmap *bmmap = bsender->data;
            apr_mmap_t *mmap;
            rv = apr_mmap_dup(&mmap, bmmap->mmap, bb->p);
            if (rv != APR_SUCCESS) goto leave;
            brecv = apr_bucket_mmap_create(mmap, bsender->start, bsender->length, bb->bucket_alloc);
        }
#endif
        else if (APR_BUCKET_IS_FILE(bsender)) {
            /* This is setaside into the target brigade pool so that
             * any read operation messes with that pool and not
             * the sender one. */
            apr_bucket_file *f = (apr_bucket_file *)bsender->data;
            apr_file_t *fd = f->fd;
            int setaside = (f->readpool != bb->p);

            if (setaside) {
                rv = apr_file_setaside(&fd, fd, bb->p);
                if (rv != APR_SUCCESS) goto leave;
            }
            ng = apr_brigade_insert_file(bb, fd, bsender->start, (apr_off_t)bsender->length,
                                         bb->p);
#if APR_HAS_MMAP
            /* disable mmap handling as this leads to segfaults when
             * the underlying file is changed while memory pointer has
             * been handed out. See also PR 59348 */
            apr_bucket_file_enable_mmap(ng, 0);
#endif
            remain -= bsender->length;
            ++transferred;
        }
        else {
            const char *data;
            apr_size_t dlen;
            /* we did that when the bucket was added, so this should
             * give us the same data as before without changing the bucket
             * or anything (pool) connected to it. */
            rv = apr_bucket_read(bsender, &data, &dlen, APR_BLOCK_READ);
            if (rv != APR_SUCCESS) goto leave;
            rv = apr_brigade_write(bb, NULL, NULL, data, dlen);
            if (rv != APR_SUCCESS) goto leave;

            remain -= dlen;
            ++transferred;
        }

        if (brecv) {
            /* we have a proxy that we can give the receiver */
            APR_BRIGADE_INSERT_TAIL(bb, brecv);
            remain -= brecv->length;
            ++transferred;
        }
        /* park the sender bucket; only the sender thread may delete it */
        APR_BUCKET_REMOVE(bsender);
        H2_BLIST_INSERT_TAIL(&beam->buckets_consumed, bsender);
        beam->recv_bytes += bsender->length;
        ++consumed_buckets;
    }

    if (beam->recv_cb && consumed_buckets > 0) {
        beam->recv_cb(beam->recv_ctx, beam);
    }

    if (transferred) {
        /* space was freed: wake a sender that may wait for it */
        apr_thread_cond_broadcast(beam->change);
        rv = APR_SUCCESS;
    }
    else if (beam->aborted) {
        rv = APR_ECONNABORTED;
    }
    else if (beam->closed) {
        rv = APR_EOF;
    }
    else {
        rv = wait_not_empty(beam, to, block);
        if (rv != APR_SUCCESS) {
            goto leave;
        }
        goto transfer;
    }

leave:
    H2_BEAM_LOG(beam, to, APLOG_TRACE2, rv, "end receive", bb);
    apr_thread_mutex_unlock(beam->lock);
    return rv;
}
/* Install the consumption callback `io_cb` and its context `ctx`, under
 * beam->lock. report_consumption() calls it as io_cb(ctx, beam, bytes)
 * with the number of received but not yet reported bytes; a NULL `io_cb`
 * means no callback is made. */
void h2_beam_on_consumed(h2_bucket_beam *beam,
h2_beam_io_callback *io_cb, void *ctx)
{
apr_thread_mutex_lock(beam->lock);
beam->cons_io_cb = io_cb;
beam->cons_ctx = ctx;
apr_thread_mutex_unlock(beam->lock);
}
/* Install the receive callback `recv_cb` and its context `ctx`, under
 * beam->lock. h2_beam_receive() calls it as recv_cb(ctx, beam) after a
 * transfer pass once at least one sender bucket has been consumed; a NULL
 * `recv_cb` means no callback is made. */
void h2_beam_on_received(h2_bucket_beam *beam,
h2_beam_ev_callback *recv_cb, void *ctx)
{
apr_thread_mutex_lock(beam->lock);
beam->recv_cb = recv_cb;
beam->recv_ctx = ctx;
apr_thread_mutex_unlock(beam->lock);
}
/* Install the send callback `send_cb` and its context `ctx`, under
 * beam->lock. It is called as send_cb(ctx, beam) from h2_beam_send(),
 * h2_beam_close() and a sender-side h2_beam_abort(), in each case while
 * beam->lock is held; a NULL `send_cb` means no callback is made. */
void h2_beam_on_send(h2_bucket_beam *beam,
h2_beam_ev_callback *send_cb, void *ctx)
{
apr_thread_mutex_lock(beam->lock);
beam->send_cb = send_cb;
beam->send_ctx = ctx;
apr_thread_mutex_unlock(beam->lock);
}
/* Install the was-empty callback `was_empty_cb` and its context `ctx`,
 * under beam->lock. It is called as was_empty_cb(ctx, beam) while
 * beam->lock is held: from h2_beam_send() when the buffer was empty
 * before data was added or before waiting for space, and from
 * h2_beam_close() / a sender-side h2_beam_abort() when the buffer is
 * empty. A NULL `was_empty_cb` means no callback is made. */
void h2_beam_on_was_empty(h2_bucket_beam *beam,
h2_beam_ev_callback *was_empty_cb, void *ctx)
{
apr_thread_mutex_lock(beam->lock);
beam->was_empty_cb = was_empty_cb;
beam->was_empty_ctx = ctx;
apr_thread_mutex_unlock(beam->lock);
}
/* Sum the lengths of all buckets in buckets_to_send, whatever their type.
 * All of them are expected to have a determinate length. Takes no lock
 * itself. */
static apr_off_t get_buffered_data_len(h2_bucket_beam *beam)
{
    h2_blist *pending = &beam->buckets_to_send;
    apr_off_t total = 0;
    apr_bucket *cur = H2_BLIST_FIRST(pending);

    while (cur != H2_BLIST_SENTINEL(pending)) {
        total += cur->length;
        cur = APR_BUCKET_NEXT(cur);
    }
    return total;
}
/* Return the total length of all buckets waiting to be sent, computed
 * under beam->lock. */
apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
{
    apr_off_t buffered;

    apr_thread_mutex_lock(beam->lock);
    buffered = get_buffered_data_len(beam);
    apr_thread_mutex_unlock(beam->lock);
    return buffered;
}
/* Return the memory attributed to the buckets waiting to be sent, i.e.
 * the sum of bucket_mem_used() over buckets_to_send (file and mmap
 * buckets count as 0). Computed under beam->lock. */
apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
{
    h2_blist *pending = &beam->buckets_to_send;
    apr_off_t used = 0;
    apr_bucket *cur;

    apr_thread_mutex_lock(beam->lock);
    cur = H2_BLIST_FIRST(pending);
    while (cur != H2_BLIST_SENTINEL(pending)) {
        used += bucket_mem_used(cur);
        cur = APR_BUCKET_NEXT(cur);
    }
    apr_thread_mutex_unlock(beam->lock);
    return used;
}
/* Return non-zero when no bucket is waiting to be sent, checked under
 * beam->lock. */
int h2_beam_empty(h2_bucket_beam *beam)
{
    int is_empty;

    apr_thread_mutex_lock(beam->lock);
    is_empty = buffer_is_empty(beam);
    apr_thread_mutex_unlock(beam->lock);
    return is_empty;
}
/* Report not yet reported received bytes to the consumption callback.
 * Takes beam->lock; report_consumption() releases it around the callback.
 * Returns 1 if the callback was invoked, 0 otherwise. */
int h2_beam_report_consumption(h2_bucket_beam *beam)
{
    int reported;

    apr_thread_mutex_lock(beam->lock);
    reported = report_consumption(beam, 1);
    apr_thread_mutex_unlock(beam->lock);
    return reported;
}