/* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
#include "serf.h"
#include "serf_bucket_util.h"
/* Should be an APR_RING? */
typedef struct bucket_list {
serf_bucket_t *bucket;
struct bucket_list *next;
} bucket_list_t;
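/* Illustrative sketch, not part of the original source: if the APR_RING
 * question above were ever acted on, the manual head/tail bookkeeping below
 * would collapse into the standard <apr_ring.h> macros, roughly:
 *
 *     struct bucket_list {
 *         serf_bucket_t *bucket;
 *         APR_RING_ENTRY(bucket_list) link;
 *     };
 *     typedef APR_RING_HEAD(bucket_ring, bucket_list) bucket_ring_t;
 *
 *     APR_RING_INIT(&ring, bucket_list, link);
 *     APR_RING_INSERT_TAIL(&ring, elem, bucket_list, link);   (append)
 *     APR_RING_INSERT_HEAD(&ring, elem, bucket_list, link);   (prepend)
 */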
typedef struct aggregate_context_t {
bucket_list_t *list; /* active buckets */
bucket_list_t *last; /* last bucket of the list */
bucket_list_t *done; /* we finished reading this; now pending a destroy */
serf_bucket_aggregate_eof_t hold_open;
void *hold_open_baton;
serf_config_t *config;
} aggregate_context_t;
static void cleanup_aggregate(aggregate_context_t *ctx,
serf_bucket_alloc_t *allocator)
{
bucket_list_t *next_list;
/* If we finished reading a bucket during the previous read, then
* we can now toss that bucket.
*/
while (ctx->done != NULL) {
next_list = ctx->done->next;
serf_bucket_destroy(ctx->done->bucket);
serf_bucket_mem_free(allocator, ctx->done);
ctx->done = next_list;
}
}
void serf_bucket_aggregate_cleanup(
serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
{
aggregate_context_t *ctx = bucket->data;
cleanup_aggregate(ctx, allocator);
}
static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
{
aggregate_context_t *ctx;
ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
ctx->list = NULL;
ctx->last = NULL;
ctx->done = NULL;
ctx->hold_open = NULL;
ctx->hold_open_baton = NULL;
ctx->config = NULL;
return ctx;
}
serf_bucket_t *serf_bucket_aggregate_create(
serf_bucket_alloc_t *allocator)
{
aggregate_context_t *ctx;
ctx = create_aggregate(allocator);
return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
}
static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
{
aggregate_context_t *ctx = bucket->data;
bucket_list_t *next_ctx;
while (ctx->list) {
serf_bucket_destroy(ctx->list->bucket);
next_ctx = ctx->list->next;
serf_bucket_mem_free(bucket->allocator, ctx->list);
ctx->list = next_ctx;
}
cleanup_aggregate(ctx, bucket->allocator);
serf_default_destroy_and_data(bucket);
}
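/* serf_bucket_aggregate_become() transforms an existing bucket, in place,
 * into an empty aggregate bucket: the serf_bucket_t pointer held by callers
 * stays valid while its type and private data are swapped underneath.  Any
 * state owned by the previous type is not freed here, so the caller is
 * presumably expected to have released it beforehand.
 */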
void serf_bucket_aggregate_become(serf_bucket_t *bucket)
{
aggregate_context_t *ctx;
ctx = create_aggregate(bucket->allocator);
bucket->type = &serf_bucket_type_aggregate;
bucket->data = ctx;
/* The allocator remains the same. */
}
void serf_bucket_aggregate_prepend(
serf_bucket_t *aggregate_bucket,
serf_bucket_t *prepend_bucket)
{
aggregate_context_t *ctx = aggregate_bucket->data;
bucket_list_t *new_list;
new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
sizeof(*new_list));
new_list->bucket = prepend_bucket;
new_list->next = ctx->list;
if (ctx->list == NULL)
ctx->last = new_list;
ctx->list = new_list;
/* Share our config with this new bucket */
serf_bucket_set_config(prepend_bucket, ctx->config);
}
void serf_bucket_aggregate_append(
serf_bucket_t *aggregate_bucket,
serf_bucket_t *append_bucket)
{
aggregate_context_t *ctx = aggregate_bucket->data;
bucket_list_t *new_list;
new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
sizeof(*new_list));
new_list->bucket = append_bucket;
new_list->next = NULL;
/* If we use APR_RING, this is trivial. So, wait.
new_list->next = ctx->list;
ctx->list = new_list;
*/
if (ctx->list == NULL) {
ctx->list = new_list;
ctx->last = new_list;
}
else {
ctx->last->next = new_list;
ctx->last = ctx->last->next;
}
/* Share our config with this new bucket */
serf_bucket_set_config(append_bucket, ctx->config);
}
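/* A minimal usage sketch (not part of the original file; names are
 * illustrative).  It shows the typical pattern of building an aggregate out
 * of smaller buckets and draining it with serf_bucket_read():
 *
 *     serf_bucket_alloc_t *alloc = serf_bucket_allocator_create(pool,
 *                                                               NULL, NULL);
 *     serf_bucket_t *agg  = serf_bucket_aggregate_create(alloc);
 *     serf_bucket_t *body = SERF_BUCKET_SIMPLE_STRING("body", alloc);
 *     serf_bucket_t *hdrs = SERF_BUCKET_SIMPLE_STRING("header\r\n", alloc);
 *
 *     serf_bucket_aggregate_append(agg, body);
 *     serf_bucket_aggregate_prepend(agg, hdrs);   (headers end up first)
 *
 *     while (1) {
 *         const char *data;
 *         apr_size_t len;
 *         apr_status_t status = serf_bucket_read(agg, SERF_READ_ALL_AVAIL,
 *                                                &data, &len);
 *         if (SERF_BUCKET_READ_ERROR(status))
 *             break;
 *         (consume len bytes at data)
 *         if (APR_STATUS_IS_EOF(status))
 *             break;
 *     }
 *     serf_bucket_destroy(agg);
 */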
void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
serf_bucket_aggregate_eof_t fn,
void *baton)
{
aggregate_context_t *ctx = aggregate_bucket->data;
ctx->hold_open = fn;
ctx->hold_open_baton = baton;
}
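/* A sketch of a hold-open callback (hypothetical names; the callback type
 * serf_bucket_aggregate_eof_t is declared in serf.h).  Returning APR_EAGAIN
 * keeps the aggregate "open" when its list drains instead of reporting
 * APR_EOF; appending a fresh bucket and returning APR_SUCCESS lets the
 * current read continue with the new data:
 *
 *     static apr_status_t more_data_cb(void *baton,
 *                                      serf_bucket_t *aggregate)
 *     {
 *         producer_t *producer = baton;     (hypothetical producer state)
 *
 *         if (producer->finished)
 *             return APR_EOF;
 *         if (!producer->pending)
 *             return APR_EAGAIN;            (no data yet; stay open)
 *
 *         serf_bucket_aggregate_append(aggregate, producer->pending);
 *         producer->pending = NULL;
 *         return APR_SUCCESS;
 *     }
 *
 *     serf_bucket_aggregate_hold_open(agg, more_data_cb, producer);
 */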
void serf_bucket_aggregate_prepend_iovec(
serf_bucket_t *aggregate_bucket,
struct iovec *vecs,
int vecs_count)
{
int i;
/* Add in reverse order. */
for (i = vecs_count - 1; i >= 0; i--) {
serf_bucket_t *new_bucket;
new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
vecs[i].iov_len,
NULL, NULL,
aggregate_bucket->allocator);
serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
}
}
void serf_bucket_aggregate_append_iovec(
serf_bucket_t *aggregate_bucket,
struct iovec *vecs,
int vecs_count)
{
/* aggregate_context_t *ctx = aggregate_bucket->data; */
serf_bucket_t *new_bucket;
new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
aggregate_bucket->allocator);
serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
}
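/* Illustrative use of the iovec helpers (sketch, not original code).  Note
 * that the data pointed to by each iovec is not copied, so it must remain
 * valid for the lifetime of the buckets created here:
 *
 *     struct iovec vecs[2];
 *     vecs[0].iov_base = "Hello, ";  vecs[0].iov_len = 7;
 *     vecs[1].iov_base = "world!";   vecs[1].iov_len = 6;
 *     serf_bucket_aggregate_append_iovec(agg, vecs, 2);
 */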
static apr_status_t read_aggregate(serf_bucket_t *bucket,
apr_size_t requested,
int vecs_size, struct iovec *vecs,
int *vecs_used)
{
aggregate_context_t *ctx = bucket->data;
int cur_vecs_used;
apr_status_t status;
*vecs_used = 0;
if (!ctx->list) {
if (ctx->hold_open) {
return ctx->hold_open(ctx->hold_open_baton, bucket);
}
else {
return APR_EOF;
}
}
status = APR_SUCCESS;
while (requested) {
serf_bucket_t *head = ctx->list->bucket;
status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
&cur_vecs_used);
if (SERF_BUCKET_READ_ERROR(status))
return status;
/* Add the number of vecs we read to our running total. */
*vecs_used += cur_vecs_used;
if (cur_vecs_used > 0 || status) {
bucket_list_t *next_list;
/* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
* as it isn't safe to read more without returning to our caller.
*/
if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
return status;
}
/* However, if we read EOF, we can stash this bucket in a
* to-be-freed list and move on to the next bucket. This ensures
* that the bucket stays alive (so as not to violate our read
* semantics). We'll destroy this list of buckets the next time
* we are asked to perform a read operation - thus ensuring the
* proper read lifetime.
*/
if (cur_vecs_used > 0) {
next_list = ctx->list->next;
ctx->list->next = ctx->done;
ctx->done = ctx->list;
ctx->list = next_list;
}
else {
/* This bucket didn't add a single byte.
We can destroy it directly */
next_list = ctx->list;
ctx->list = next_list->next;
serf_bucket_destroy(next_list->bucket);
serf_bucket_mem_free(bucket->allocator, next_list);
}
/* If we have no more in our list, return EOF. */
if (!ctx->list) {
ctx->last = NULL;
if (ctx->hold_open) {
status = ctx->hold_open(ctx->hold_open_baton, bucket);
if (status || !ctx->list)
return status;
/* Wow, we 'magically' refilled! */
}
else {
return APR_EOF;
}
}
/* At this point, it is safe to read the next bucket - if we can. */
/* If the caller doesn't want ALL_AVAIL, decrement the size
* of the items we just read from the list.
*/
if (requested != SERF_READ_ALL_AVAIL) {
int i;
for (i = 0; i < cur_vecs_used; i++)
requested -= vecs[i].iov_len;
}
/* Adjust our vecs to account for what we just read. */
vecs_size -= cur_vecs_used;
vecs += cur_vecs_used;
/* We reached our max. Oh well. */
if (!requested || !vecs_size) {
return APR_SUCCESS;
}
}
else if (!status) {
/* Success and no data. Let's return what we have.
Better luck next time.
This scenario is triggered by test_split_buckets(),
in a case where EAGAIN is really not what we want.
*/
return APR_SUCCESS;
}
}
return status;
}
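/* The deferred destroy above is what gives callers the usual serf read
 * guarantee: data returned by one read on the aggregate stays valid until
 * the next read (or destroy) of the aggregate, because the source bucket is
 * only torn down by cleanup_aggregate() at the start of the following
 * operation.  Sketch (hypothetical fd/agg):
 *
 *     status = serf_bucket_read(agg, SERF_READ_ALL_AVAIL, &data, &len);
 *     if (!SERF_BUCKET_READ_ERROR(status))
 *         write(fd, data, len);    (data is still valid here)
 */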
static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
apr_size_t requested,
const char **data, apr_size_t *len)
{
aggregate_context_t *ctx = bucket->data;
struct iovec vec;
int vecs_used;
apr_status_t status;
cleanup_aggregate(ctx, bucket->allocator);
status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
if (!vecs_used) {
*len = 0;
}
else {
*data = vec.iov_base;
*len = vec.iov_len;
}
return status;
}
static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
apr_size_t requested,
int vecs_size,
struct iovec *vecs,
int *vecs_used)
{
aggregate_context_t *ctx = bucket->data;
cleanup_aggregate(ctx, bucket->allocator);
return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
}
static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
int acceptable, int *found,
const char **data, apr_size_t *len)
{
aggregate_context_t *ctx = bucket->data;
apr_status_t status;
*found = SERF_NEWLINE_NONE;
cleanup_aggregate(ctx, bucket->allocator);
do {
serf_bucket_t *head;
*len = 0;
if (!ctx->list) {
if (ctx->hold_open) {
return ctx->hold_open(ctx->hold_open_baton, bucket);
}
else {
return APR_EOF;
}
}
head = ctx->list->bucket;
status = serf_bucket_readline(head, acceptable, found,
data, len);
if (SERF_BUCKET_READ_ERROR(status))
return status;
if (APR_STATUS_IS_EOF(status)) {
bucket_list_t *next_list;
/* The head bucket is empty; move it to the to-be-cleaned-up list. */
next_list = ctx->list->next;
ctx->list->next = ctx->done;
ctx->done = ctx->list;
ctx->list = next_list;
/* If we have no more in our list, return EOF. */
if (!ctx->list) {
ctx->last = NULL;
if (ctx->hold_open) {
return ctx->hold_open(ctx->hold_open_baton, bucket);
}
else {
return APR_EOF;
}
}
/* We read something, so bail out and let the application read again. */
if (*len)
status = APR_SUCCESS;
}
/* continue with APR_SUCCESS or APR_EOF and no data read yet. */
} while (!*len && status != APR_EAGAIN);
return status;
}
static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
const char **data,
apr_size_t *len)
{
aggregate_context_t *ctx = bucket->data;
serf_bucket_t *head;
apr_status_t status;
cleanup_aggregate(ctx, bucket->allocator);
/* Peek the first bucket in the list, if any. */
if (!ctx->list) {
*len = 0;
if (ctx->hold_open) {
status = ctx->hold_open(ctx->hold_open_baton, bucket);
if (APR_STATUS_IS_EAGAIN(status))
status = APR_SUCCESS;
return status;
}
else {
return APR_EOF;
}
}
head = ctx->list->bucket;
status = serf_bucket_peek(head, data, len);
/* Is the current head *at* eof? */
while (APR_STATUS_IS_EOF(status) && !*len) {
bucket_list_t *item = ctx->list;
if (item->next)
ctx->list = item->next;
else
ctx->list = ctx->last = NULL;
/* We don't have outstanding data. We are free to release now */
serf_bucket_destroy(item->bucket);
serf_bucket_mem_free(bucket->allocator, item);
if (ctx->list) {
head = ctx->list->bucket;
status = serf_bucket_peek(head, data, len);
}
else
break; /* Check hold open below */
}
if (APR_STATUS_IS_EOF(status)) {
if (ctx->list && ctx->list->next) {
status = APR_SUCCESS;
} else {
if (ctx->hold_open) {
status = ctx->hold_open(ctx->hold_open_baton, bucket);
if (APR_STATUS_IS_EAGAIN(status))
status = APR_SUCCESS;
return status;
}
}
}
return status;
}
static serf_bucket_t * serf_aggregate_read_bucket(
serf_bucket_t *bucket,
const serf_bucket_type_t *type)
{
aggregate_context_t *ctx = bucket->data;
bucket_list_t *list;
if (!ctx->list) {
return NULL;
}
list = ctx->list;
if (list->bucket->type == type) {
/* Got the bucket. Consume it from our list. */
serf_bucket_t *found_bucket = list->bucket;
ctx->list = list->next;
/* And destroy the now unused item */
serf_bucket_mem_free(bucket->allocator, list);
return found_bucket;
}
/* Call read_bucket on the first bucket in our list. */
return serf_bucket_read_bucket(ctx->list->bucket, type);
}
static apr_uint64_t serf_aggregate_get_remaining(serf_bucket_t *bucket)
{
aggregate_context_t *ctx = bucket->data;
apr_uint64_t remaining = 0;
bucket_list_t *cur;
if (ctx->hold_open) {
return SERF_LENGTH_UNKNOWN;
}
for(cur = ctx->list; cur != NULL; cur = cur->next) {
apr_uint64_t bucket_remaining = serf_bucket_get_remaining(cur->bucket);
if (bucket_remaining == SERF_LENGTH_UNKNOWN) {
return SERF_LENGTH_UNKNOWN;
}
remaining += bucket_remaining;
}
return remaining;
}
static apr_status_t serf_aggregate_set_config(serf_bucket_t *bucket,
serf_config_t *config)
{
/* This bucket doesn't need or update any shared config, but we need to pass
it along to our wrapped buckets. Store it for all buckets that will be
added later. */
aggregate_context_t *ctx = bucket->data;
bucket_list_t *cur;
apr_status_t err_status = APR_SUCCESS;
ctx->config = config;
for(cur = ctx->list; cur != NULL; cur = cur->next) {
apr_status_t status;
status = serf_bucket_set_config(cur->bucket, config);
if (status)
err_status = status;
}
return err_status;
}
const serf_bucket_type_t serf_bucket_type_aggregate = {
"AGGREGATE",
serf_aggregate_read,
serf_aggregate_readline,
serf_aggregate_read_iovec,
serf_default_read_for_sendfile,
serf_buckets_are_v2,
serf_aggregate_peek,
serf_aggregate_destroy_and_data,
serf_aggregate_read_bucket,
serf_aggregate_get_remaining,
serf_aggregate_set_config,
};