/* ====================================================================
 *    Licensed to the Apache Software Foundation (ASF) under one
 *    or more contributor license agreements.  See the NOTICE file
 *    distributed with this work for additional information
 *    regarding copyright ownership.  The ASF licenses this file
 *    to you under the Apache License, Version 2.0 (the
 *    "License"); you may not use this file except in compliance
 *    with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing,
 *    software distributed under the License is distributed on an
 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *    KIND, either express or implied.  See the License for the
 *    specific language governing permissions and limitations
 *    under the License.
 * ====================================================================
 */

#include <apr_pools.h>
#include <apr_poll.h>
#include <apr_version.h>
#include <apr_portable.h>
#include <apr_strings.h>

#include "serf.h"
#include "serf_bucket_util.h"

#include "serf_private.h"
#include "protocols/http2_buckets.h"
#include "protocols/http2_protocol.h"

static apr_status_t
http2_outgoing_read(serf_connection_t *conn);

static apr_status_t
http2_outgoing_write(serf_connection_t *conn);

static apr_status_t
http2_outgoing_hangup(serf_connection_t *conn);

static void
http2_outgoing_teardown(serf_connection_t *conn);

static void
http2_outgoing_pre_teardown(serf_connection_t *conn);

static apr_status_t
http2_incoming_read(serf_incoming_t *client);

static apr_status_t
http2_incoming_write(serf_incoming_t *client);

static apr_status_t
http2_incoming_hangup(serf_incoming_t *client);

static void
http2_incoming_teardown(serf_incoming_t *conn);

static void
http2_incoming_pre_teardown(serf_incoming_t *conn);

static apr_status_t
http2_process(serf_http2_protocol_t *h2);

static void
http2_cancel_request(serf_request_t *rq, apr_status_t reason);

static void
http2_prioritize_request(serf_request_t *rq, bool exclusive);

static apr_status_t http2_write_data(serf_http2_protocol_t *h2);

static serf_bucket_t *
serf_bucket_create_numberv(serf_bucket_alloc_t *allocator, const char *format,
                           ...)
{
    va_list argp;
    const char *c;
    char *buffer;
    apr_size_t sz = 0;
    unsigned char *r;

    va_start(argp, format);

    for (c = format; *c; c++)
    {
        SERF_H2_assert(*c >= '1' && *c <= '4');

        if (*c >= '1' && *c <= '4')
            sz += (*c - '0');
    }

    buffer = serf_bucket_mem_alloc(allocator, sz);
    r = (void*)buffer;
    for (c = format; *c; c++)
    {
        apr_uint32_t tmp;

        switch (*c)
        {
            case '1':
                *r++ = va_arg(argp, int) & 0xFF;
                break;
            case '2':
                tmp = va_arg(argp, int);
                *r++ = (tmp >> 8) & 0xFF;
                *r++ = tmp & 0xFF;
                break;
            case '3':
                tmp = va_arg(argp, apr_uint32_t);
                *r++ = (tmp >> 16) & 0xFF;
                *r++ = (tmp >> 8) & 0xFF;
                *r++ = tmp & 0xFF;
                break;
            case '4':
                tmp = va_arg(argp, apr_uint32_t);
                *r++ = (tmp >> 24) & 0xFF;
                *r++ = (tmp >> 16) & 0xFF;
                *r++ = (tmp >> 8) & 0xFF;
                *r++ = tmp & 0xFF;
                break;
        }
    }

    va_end(argp);

    return serf_bucket_simple_own_create(buffer, sz, allocator);
}

struct serf_http2_protocol_t
{
    apr_pool_t *pool;
    serf_connection_t *conn; /* Either CONN or CLIENT is set */
    serf_incoming_t *client;

    serf_io_baton_t *io; /* Low level connection */
    serf_pump_t *pump;

    serf_bucket_alloc_t *allocator;

    serf_http2_processor_t processor;
    void *processor_baton;
    serf_bucket_t *read_frame;   /* Frame currently being read */
    bool in_frame;
    apr_size_t prefix_left;

    serf_hpack_table_t *hpack_tbl;
    serf_config_t *config;

    /* Local -> Remote. Settings provided by other side */
    apr_uint32_t lr_default_window;
    apr_uint32_t lr_window;
    apr_uint32_t lr_max_framesize;
    apr_uint32_t lr_max_headersize;
    apr_uint32_t lr_max_concurrent;
    apr_uint32_t lr_hpack_table_size;
    apr_int32_t lr_next_streamid;
    bool lr_push_enabled;

    /* Remote -> Local. Settings set by us. Acknowledged by other side */
    apr_uint32_t rl_default_window;
    apr_uint32_t rl_window;
    apr_uint32_t rl_max_framesize;
    apr_uint32_t rl_max_headersize;
    apr_uint32_t rl_max_concurrent;
    apr_uint32_t rl_hpack_table_size;
    apr_int32_t rl_next_streamid;
    bool rl_push_enabled;

    apr_uint32_t rl_window_upd_below;
    apr_uint32_t rl_window_upd_to;

    serf_http2_stream_t *first;
    serf_http2_stream_t *last;

    int setting_acks;
    bool enforce_flow_control;

    serf_bucket_t *continuation_bucket;
    apr_int32_t continuation_streamid;

    serf_http2_stream_t *first_writable, *last_writable, *cur_writable;
};

/* Forward definition */
static apr_status_t
http2_bucket_processor(void *baton,
                       serf_http2_protocol_t *h2,
                       serf_bucket_t *frame_bucket);


static apr_status_t
http2_protocol_cleanup(void *state)
{
    serf_http2_protocol_t *h2 = state;
    serf_connection_t *conn = h2->conn;
    serf_incoming_t *client = h2->client;
    serf_http2_stream_t *stream, *next;

    /* First clean out all streams */
    for (stream = h2->first; stream; stream = next)
    {
        next = stream->next;
        serf_http2__stream_cleanup(stream);
    }

    h2->first = h2->last = NULL;

    if (h2->processor != NULL)
    {
        h2->read_frame = NULL;

        if (h2->processor == http2_bucket_processor)
        {
            serf_bucket_t *payload = h2->processor_baton;

            if (payload)
                serf_bucket_destroy(payload);

            h2->processor = NULL;
            h2->processor_baton = NULL;

        }
      /* Else: The processor (probably a stream)
               needs to handle this. It usually does that
               by adding frames to an aggregate to allow
               reading multiple frames as a stream. */
    }
    else if (h2->read_frame)
    {
        serf_bucket_destroy(h2->read_frame);
        h2->read_frame = NULL;
    }
    h2->in_frame = FALSE;

    if (conn)
        conn->protocol_baton = NULL;
    if (client)
        client->protocol_baton = NULL;

    return APR_SUCCESS;
}

static void http2_send_window_update(serf_http2_protocol_t *h2,
                                     serf_http2_stream_t *stream)
{
    apr_uint32_t increase;
    apr_uint32_t *window;
    apr_int32_t *stream_id;
    serf_bucket_t *bkt;
    apr_status_t status;
    struct window_update_t
    {
        unsigned char v3, v2, v1, v0;
    } window_update;

    if (!stream) {
        if (h2->rl_window >= h2->rl_window_upd_to)
            return;

        increase = h2->rl_window_upd_to - h2->rl_window;
        window = &h2->rl_window;
        stream_id = NULL;
    }
    else {
        if (stream->rl_window >= stream->rl_window_upd_to)
            return;

        increase = stream->rl_window_upd_to - stream->rl_window;
        window = &stream->rl_window;
        stream_id = &stream->streamid;
    }

    window_update.v3 = (increase >> 24) & 0xFF;
    window_update.v2 = (increase >> 16) & 0xFF;
    window_update.v1 = (increase >> 8) & 0xFF;
    window_update.v0 = increase & 0xFF;

    bkt = serf_bucket_simple_copy_create((void *)&window_update,
                                         sizeof(window_update),
                                         h2->allocator);

    bkt = serf__bucket_http2_frame_create(bkt, HTTP2_FRAME_TYPE_WINDOW_UPDATE,
                                          0, stream_id, NULL, NULL/* stream */,
                                          h2->lr_max_framesize,
                                          h2->allocator);
    status = serf_http2__enqueue_frame(h2, bkt, FALSE);

    if (!status) {
        /* Update our administration */
        (*window) += increase;
    }

    /* Ignore connection broken statee. Move along */
}

void serf__http2_protocol_init(serf_connection_t *conn)
{
    serf_http2_protocol_t *h2;
    apr_pool_t *protocol_pool;
    serf_bucket_t *tmp;
    const bool WE_ARE_CLIENT = true;

    apr_pool_create(&protocol_pool, conn->pool);

    h2 = apr_pcalloc(protocol_pool, sizeof(*h2));
    h2->pool = protocol_pool;
    h2->conn = conn;
    h2->io = &conn->io;
    h2->pump = &conn->pump;
    h2->allocator = conn->allocator;
    h2->config = conn->config;

    /* Defaults until negotiated */
    h2->rl_default_window = HTTP2_DEFAULT_WINDOW_SIZE;
    h2->rl_window = HTTP2_DEFAULT_WINDOW_SIZE;
    h2->rl_next_streamid = WE_ARE_CLIENT ? 2 : 1;
    h2->rl_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE;
    h2->rl_max_headersize = APR_UINT32_MAX;
    h2->rl_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT;
    h2->rl_hpack_table_size = HTTP2_DEFAULT_HPACK_TABLE_SIZE;
    h2->rl_push_enabled = TRUE;

    h2->lr_default_window = HTTP2_DEFAULT_WINDOW_SIZE;
    h2->lr_window = HTTP2_DEFAULT_WINDOW_SIZE;
    h2->lr_next_streamid = WE_ARE_CLIENT ? 1 : 2;
    h2->lr_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE;
    h2->lr_max_headersize = APR_UINT32_MAX;
    h2->lr_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT;
    h2->lr_hpack_table_size = HTTP2_DEFAULT_HPACK_TABLE_SIZE;
    h2->lr_push_enabled = TRUE;

    h2->rl_window_upd_below = 16 * 1024 * 1024; /* 16 MB*/
    h2->rl_window_upd_to = 128 * 1024 * 1024; /* 128 MB */

    h2->setting_acks = 0;
    h2->enforce_flow_control = TRUE;
    h2->continuation_bucket = NULL;
    h2->continuation_streamid = 0;

    h2->first = h2->last = NULL;

    h2->hpack_tbl = serf__hpack_table_create(TRUE,
                                             HTTP2_DEFAULT_HPACK_TABLE_SIZE,
                                             protocol_pool);

    apr_pool_cleanup_register(protocol_pool, h2, http2_protocol_cleanup,
                              apr_pool_cleanup_null);

    conn->perform_read = http2_outgoing_read;
    conn->perform_write = http2_outgoing_write;
    conn->perform_hangup = http2_outgoing_hangup;
    conn->perform_teardown = http2_outgoing_teardown;
    conn->perform_pre_teardown = http2_outgoing_pre_teardown;
    conn->perform_cancel_request = http2_cancel_request;
    conn->perform_prioritize_request = http2_prioritize_request;
    conn->protocol_baton = h2;

    /* Disable HTTP/1.1 guessing that affects writability */
    conn->probable_keepalive_limit = 0;
    conn->max_outstanding_requests = 0;

    /* Send the HTTP/2 Connection Preface */
    tmp = SERF_BUCKET_SIMPLE_STRING(HTTP2_CONNECTION_PREFIX, h2->allocator);
    serf_pump__add_output(h2->pump, tmp, false);

    /* And now a settings frame */
    tmp = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_SETTINGS,
                                              0,
                                              NULL, NULL, NULL, /* stream: 0 */
                                              h2->lr_max_framesize,
                                              h2->allocator);
    serf_http2__enqueue_frame(h2, tmp, FALSE);

    /* And an initial window update */
    http2_send_window_update(h2, NULL);
}

void serf__http2_protocol_init_server(serf_incoming_t *client)
{
    serf_http2_protocol_t *h2;
    apr_pool_t *protocol_pool;
    serf_bucket_t *tmp;
    const int WE_ARE_CLIENT = false;

    apr_pool_create(&protocol_pool, client->pool);

    h2 = apr_pcalloc(protocol_pool, sizeof(*h2));
    h2->pool = protocol_pool;
    h2->client = client;
    h2->io = &client->io;
    h2->pump = &client->pump;
    h2->allocator = client->allocator;
    h2->config = client->config;

    h2->prefix_left = sizeof(HTTP2_CONNECTION_PREFIX) - 1;

    /* Defaults until negotiated */
    h2->rl_default_window = HTTP2_DEFAULT_WINDOW_SIZE;
    h2->rl_window = HTTP2_DEFAULT_WINDOW_SIZE;
    h2->rl_next_streamid = WE_ARE_CLIENT ? 2 : 1;
    h2->rl_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE;
    h2->rl_max_headersize = APR_UINT32_MAX;
    h2->rl_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT;
    h2->rl_hpack_table_size = HTTP2_DEFAULT_HPACK_TABLE_SIZE;
    h2->rl_push_enabled = TRUE;

    h2->lr_default_window = HTTP2_DEFAULT_WINDOW_SIZE;
    h2->lr_window = HTTP2_DEFAULT_WINDOW_SIZE;
    h2->lr_next_streamid = WE_ARE_CLIENT ? 1 : 2;
    h2->lr_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE;
    h2->lr_max_headersize = APR_UINT32_MAX;
    h2->lr_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT;
    h2->lr_hpack_table_size = HTTP2_DEFAULT_HPACK_TABLE_SIZE;
    h2->lr_push_enabled = TRUE;

    h2->rl_window_upd_below = 16 * 1024 * 1024; /* 16 MB*/
    h2->rl_window_upd_to = 128 * 1024 * 1024; /* 128 MB */

    h2->setting_acks = 0;
    h2->enforce_flow_control = TRUE;
    h2->continuation_bucket = NULL;
    h2->continuation_streamid = 0;

    h2->first = h2->last = NULL;

    h2->hpack_tbl = serf__hpack_table_create(TRUE,
                                             HTTP2_DEFAULT_HPACK_TABLE_SIZE,
                                             protocol_pool);

    apr_pool_cleanup_register(protocol_pool, h2, http2_protocol_cleanup,
                              apr_pool_cleanup_null);

    client->perform_read = http2_incoming_read;
    client->perform_write = http2_incoming_write;
    client->perform_hangup = http2_incoming_hangup;
    client->perform_teardown = http2_incoming_teardown;
    client->perform_pre_teardown = http2_incoming_pre_teardown;
    client->protocol_baton = h2;

    /* Send a settings frame */
    tmp = serf__bucket_http2_frame_create(NULL, HTTP2_FRAME_TYPE_SETTINGS,
                                          0,
                                          NULL, NULL, NULL, /* stream: 0 */
                                          h2->lr_max_framesize,
                                          h2->allocator);

    serf_http2__enqueue_frame(h2, tmp, FALSE);

    /* And an initial window update*/
    http2_send_window_update(h2, NULL);
}

/* Creates a HTTP/2 request from a serf request */
static apr_status_t
enqueue_http2_request(serf_http2_protocol_t *h2)
{
    serf_http2_stream_t *stream;

    stream = serf_http2__stream_create(h2, -1,
                                       h2->lr_default_window,
                                       h2->rl_default_window,
                                       h2->allocator);

    if (h2->first)
    {
        stream->next = h2->first;
        h2->first->prev = stream;
        h2->first = stream;
    }
    else
        h2->last = h2->first = stream;

    return serf_http2__stream_setup_next_request(stream, h2->conn,
                                                 h2->lr_max_framesize,
                                                 h2->hpack_tbl);
}

apr_status_t
serf_http2__enqueue_frame(serf_http2_protocol_t *h2,
                          serf_bucket_t *frame,
                          bool flush)
{
    return serf_pump__add_output(h2->pump, frame, flush);
}

/* Implements serf_bucket_prefix_handler_t.
   Handles PRIORITY frames and the priority prefix of HEADERS frames */
static apr_status_t
http2_handle_priority(void *baton,
                      serf_bucket_t *bucket,
                      const char *data,
                      apr_size_t len)
{
    serf_http2_stream_t *stream = baton;

    if (len != HTTP2_PRIORITY_DATA_SIZE)
        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;

    if (stream == NULL)
        return APR_SUCCESS; /* Nothing to record this on */

    /* ### TODO: Store priority information on stream */
    SERF_H2_assert(stream->h2 != NULL);

    return APR_SUCCESS;
}

/* Implements serf_bucket_prefix_handler_t.
   Handles the promise prefix of PUSH_PROMISE frames */
static apr_status_t
http2_handle_promise(void *baton,
                     serf_bucket_t *bucket,
                     const char *data,
                     apr_size_t len)
{
    serf_http2_stream_t *parent_stream = baton;
    serf_http2_protocol_t *h2 = parent_stream->h2;
    serf_http2_stream_t *promised_stream;
    apr_int32_t streamid;
    const struct promise_t
    {
        unsigned char s3, s2, s1, s0;
    } *promise;

    if (len != HTTP2_PROMISE_DATA_SIZE)
        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;

    SERF_H2_assert(h2 != NULL);

    promise = (const void *)data;

    /* Highest bit is reserved */
    streamid = ((promise->s3 & 0x7F) << 24) | (promise->s2 << 16)
        | (promise->s1 << 8) | promise->s0;

    if (streamid == 0
        || (streamid < h2->rl_next_streamid)
        || (streamid & 0x01) != (h2->rl_next_streamid & 0x01))
    {
      /* The promised stream identifier MUST bet a valid choice for the
         next stream sent by the sender */

      /* A receiver MUST treat the receipt of a PUSH_PROMISE that promises an
         illegal stream identifier (Section 5.1.1) as a connection error
         (Section 5.4.1) of type PROTOCOL_ERROR.  Note that an illegal stream
         identifier is an identifier for a stream that is not currently in the
         "idle" state.*/

        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
    }
    else if (parent_stream->status != H2S_OPEN
             && parent_stream->status != H2S_HALFCLOSED_LOCAL)
    {
      /* PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
         is in either the "open" or "half-closed (remote)" state.  The stream
         identifier of a PUSH_PROMISE frame indicates the stream it is
         associated with.  If the stream identifier field specifies the value
         0x0, a recipient MUST respond with a connection error (Section 5.4.1)
         of type PROTOCOL_ERROR.*/

        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
    }

    promised_stream = serf_http2__stream_get(h2, streamid, TRUE, FALSE);
    if (!promised_stream || promised_stream->status != H2S_IDLE)
        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;

    promised_stream->status = H2S_RESERVED_REMOTE;

    /* Store data to allow stream to handle the promise */
    parent_stream->new_reserved_stream = promised_stream;

    return APR_SUCCESS;
}

/* Implements serf_bucket_prefix_handler_t.
   Handles the promise prefix of FRAME_RSET frames */
static apr_status_t
http2_handle_frame_reset(void *baton,
                         serf_bucket_t *bucket,
                         const char *data,
                         apr_size_t len)
{
    serf_http2_stream_t *stream = baton;

    if (len != HTTP2_RST_DATA_SIZE)
        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;

    SERF_H2_assert(stream->h2 != NULL);

    /* ### TODO: Handle error code, etc. */
    stream->status = H2S_CLOSED;

    return APR_SUCCESS;
}

/* Implements serf_bucket_prefix_handler_t.
   Handles WINDOW_UPDATE frames when they apply to a stream */
static apr_status_t
http2_handle_stream_window_update(void *baton,
                                  serf_bucket_t *bucket,
                                  const char *data,
                                  apr_size_t len)
{
    serf_http2_stream_t *stream = baton;
    apr_uint32_t value;
    const struct window_update_t
    {
        unsigned char v3, v2, v1, v0;
    } *window_update;


    if (len != HTTP2_WINDOW_UPDATE_DATA_SIZE)
        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;

    window_update = (const void *)data;

    value = (window_update->v3 << 24) | (window_update->v2 << 16)
        | (window_update->v1 << 8) | window_update->v0;

    value &= HTTP2_WINDOW_MAX_ALLOWED; /* The highest bit is reserved */

    if (value == 0)
    {
      /* A receiver MUST treat the receipt of a WINDOW_UPDATE frame with an
        flow - control window increment of 0 as a stream error(Section 5.4.2)
        of type PROTOCOL_ERROR; errors on the connection flow - control window
        MUST be treated as a connection error(Section 5.4.1). */
        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
    }

    stream->lr_window += value;

    if (stream->lr_window > HTTP2_WINDOW_MAX_ALLOWED	)
    {
      /* A sender MUST NOT allow a flow-control window to exceed 2^31-1
         octets.  If a sender receives a WINDOW_UPDATE that causes a flow-
         control window to exceed this maximum, it MUST terminate either the
         stream or the connection, as appropriate.  For streams, the sender
         sends a RST_STREAM with an error code of FLOW_CONTROL_ERROR; for the
         connection, a GOAWAY frame with an error code of FLOW_CONTROL_ERROR
         is sent.*/
        return SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR;
    }

    serf__log(LOGLVL_INFO, SERF_LOGHTTP2, stream->h2->config,
              "Increasing window on stream %d with 0x%x to 0x%x\n",
              stream->streamid, value, stream->lr_window);

    return APR_SUCCESS;
}

/* Implements serf_bucket_prefix_handler_t.
   Handles WINDOW_UPDATE frames when they apply to the connection */
static apr_status_t
http2_handle_connection_window_update(void *baton,
                                      serf_bucket_t *bucket,
                                      const char *data,
                                      apr_size_t len)
{
    serf_http2_protocol_t *h2 = baton;
    apr_uint32_t value;
    bool was0;
    const struct window_update_t
    {
        unsigned char v3, v2, v1, v0;
    } *window_update;

    if (len != HTTP2_WINDOW_UPDATE_DATA_SIZE)
        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;

    SERF_H2_assert(h2 != NULL);

    window_update = (const void *)data;

    value = (window_update->v3 << 24) | (window_update->v2 << 16)
        | (window_update->v1 << 8) | window_update->v0;

    value &= HTTP2_WINDOW_MAX_ALLOWED; /* The highest bit is reserved */

    if (value == 0)
    {
      /* A receiver MUST treat the receipt of a WINDOW_UPDATE frame with an
        flow - control window increment of 0 as a stream error(Section 5.4.2)
        of type PROTOCOL_ERROR; errors on the connection flow - control window
        MUST be treated as a connection error(Section 5.4.1). */
        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
    }

    was0 = (h2->lr_window == 0);
    h2->lr_window += value;

    if (h2->lr_window > HTTP2_WINDOW_MAX_ALLOWED)
    {
      /* A sender MUST NOT allow a flow-control window to exceed 2^31-1
         octets.  If a sender receives a WINDOW_UPDATE that causes a flow-
         control window to exceed this maximum, it MUST terminate either the
         stream or the connection, as appropriate.  For streams, the sender
         sends a RST_STREAM with an error code of FLOW_CONTROL_ERROR; for the
         connection, a GOAWAY frame with an error code of FLOW_CONTROL_ERROR
         is sent.*/
        return SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR;
    }

    serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
              "Increasing window on connection with 0x%x to 0x%x\n",
              value, h2->lr_window);

    if (was0)
        return http2_write_data(h2);
    else
        return APR_SUCCESS;
}

/* Implements serf_bucket_prefix_handler_t.
   Handles PING frames for pings initiated remotely */
static apr_status_t
http2_handle_ping(void *baton,
                  serf_bucket_t *bucket,
                  const char *data,
                  apr_size_t len)
{
    serf_http2_protocol_t *h2 = baton;
    serf_bucket_t *body;
    apr_status_t status;

    if (len != HTTP2_PING_DATA_SIZE)
        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;

    SERF_H2_assert(h2 != NULL);

    /* Reply with a PONG (=PING + ACK) with the same data*/

    body = serf_bucket_simple_copy_create(data, len,
                                          h2->allocator);

    status = serf_http2__enqueue_frame(
        h2,
        serf__bucket_http2_frame_create(body,
                                        HTTP2_FRAME_TYPE_PING,
                                        HTTP2_FLAG_ACK,
                                        NULL, NULL, NULL,
                                        h2->lr_max_framesize,
                                        h2->allocator),
        TRUE /* pump */);

    if (SERF_BUCKET_READ_ERROR(status))
        return status;

    return APR_SUCCESS;
}

/* Implements serf_bucket_prefix_handler_t.
   Handles PING frames for pings initiated locally */
static apr_status_t
http2_handle_ping_ack(void *baton,
                      serf_bucket_t *bucket,
                      const char *data,
                      apr_size_t len)
{
    serf_http2_protocol_t *h2 = baton;
    if (len != HTTP2_PING_DATA_SIZE)
        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;

    SERF_H2_assert(h2 != NULL);

    /* Did we send a ping? */

    return APR_SUCCESS;
}

/* Implements serf_bucket_prefix_handler_t.
   Handles SETTINGS frames */
static apr_status_t
http2_handle_settings(void *baton,
                      serf_bucket_t *bucket,
                      const char *data,
                      apr_size_t len)
{
    serf_http2_protocol_t *h2 = baton;
    apr_size_t i;
    const struct setting_t
    {
        unsigned char s1, s0;
        unsigned char v3, v2, v1, v0;
    } *setting;

    if ((len % HTTP2_SETTING_SIZE) != 0)
        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;

      /* ### TODO: Handle settings */
    setting = (const void *)data;
    for (i = 0, setting = (const void *)data;
    i < len;
        i += sizeof(*setting), setting++)
    {
        apr_uint16_t id = (setting->s1 << 8) | setting->s0;
        apr_uint32_t value = (setting->v3 << 24) | (setting->v2 << 16)
            | (setting->v1 << 8) | setting->v0;

        switch (id)
        {
            case HTTP2_SETTING_HEADER_TABLE_SIZE:
                serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
                          "Setting HPACK Table size to %u\n", value);
                serf__hpack_table_set_max_table_size(h2->hpack_tbl,
                                                     h2->rl_hpack_table_size,
                                                     value);
                break;
            case HTTP2_SETTING_ENABLE_PUSH:
                serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
                          "Setting Push enabled: %u\n", value);
                h2->lr_push_enabled = (value != 0);
                break;
            case HTTP2_SETTING_MAX_CONCURRENT_STREAMS:
                serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
                          "Setting Max Concurrent %u\n", value);
                h2->lr_max_concurrent = value;
                break;
            case HTTP2_SETTING_INITIAL_WINDOW_SIZE:
              /* Sanitize? */
                serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
                          "Setting Initial Window Size %u\n", value);
                /* This only affects the default window size for new streams
                   (the connection window size is left unchanged):

                   Both endpoints can adjust the initial window size for new
                   streams by including a value for SETTINGS_INITIAL_WINDOW_SIZE
                   in the SETTINGS frame that forms part of the connection
                   preface.  The connection flow-control window can only be
                   changed using WINDOW_UPDATE frames. */
                h2->lr_default_window = value;
                break;
            case HTTP2_SETTING_MAX_FRAME_SIZE:
              /* Sanitize? */
                serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
                          "Setting Max framesize %u\n", value);
                h2->lr_max_framesize = value;
                break;
            case HTTP2_SETTING_MAX_HEADER_LIST_SIZE:
                serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
                          "Setting Max header list size %u\n", value);
                h2->lr_max_headersize = value;
                break;
            default:
              /* An endpoint that receives a SETTINGS frame with any unknown
                 or unsupported identifier MUST ignore that setting. */
                serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
                          "Ignoring unknown setting %d, value %u\n", id, value);
                break;
        }
    }

  /* Always ack settings */
    serf_http2__enqueue_frame(
        h2,
        serf__bucket_http2_frame_create(
            NULL,
            HTTP2_FRAME_TYPE_SETTINGS,
            HTTP2_FLAG_ACK,
            NULL, NULL, NULL,
            h2->lr_max_framesize,
            h2->allocator),
        TRUE);

    return APR_SUCCESS;
}

/* Implements serf_bucket_prefix_handler_t.
   Handles GOAWAY frames */
static apr_status_t
http2_handle_goaway(void *baton,
                    serf_bucket_t *bucket,
                    const char *data,
                    apr_size_t len)
{
    serf_http2_protocol_t *h2 = baton;
    apr_int32_t last_streamid;
    apr_uint32_t error_code;
    apr_uint32_t loglevel;
    const struct goaway_t
    {
        unsigned char s3, s2, s1, s0;
        unsigned char e3, e2, e1, e0;
    } *goaway;

    if (len < HTTP2_GOWAWAY_DATA_SIZE)
        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;

    SERF_H2_assert(h2 != NULL);

    goaway = (const void *)data;

    last_streamid = ((goaway->s3 & 0x7F) << 24) | (goaway->s2 << 16)
        | (goaway->s1 << 8) | goaway->s0;
    error_code = (goaway->e3 << 24) | (goaway->e2 << 16)
        | (goaway->e1 << 8) | goaway->e0;

    switch (error_code + SERF_ERROR_HTTP2_NO_ERROR)
    {
        case SERF_ERROR_HTTP2_PROTOCOL_ERROR:
        case SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR:
        case SERF_ERROR_HTTP2_SETTINGS_TIMEOUT:
        case SERF_ERROR_HTTP2_FRAME_SIZE_ERROR:
        case SERF_ERROR_HTTP2_COMPRESSION_ERROR:
        case SERF_ERROR_HTTP2_INADEQUATE_SECURITY:
            loglevel = LOGLVL_ERROR;
            break;

        case SERF_ERROR_HTTP2_HTTP_1_1_REQUIRED:
        case SERF_ERROR_HTTP2_ENHANCE_YOUR_CALM:
            loglevel = LOGLVL_WARNING;
            break;

        case SERF_ERROR_HTTP2_REFUSED_STREAM:
        case SERF_ERROR_HTTP2_CANCEL:
        case SERF_ERROR_HTTP2_CONNECT_ERROR:
        case SERF_ERROR_HTTP2_STREAM_CLOSED:
          /* These errors should have been sent as a stream
             error. This usually tells us that we have an http/2
             implementation on the other side that doesn't implement
             full stream state handling. (See HTTP/2 RFC)*/
            loglevel = LOGLVL_ERROR;
            break;

        case SERF_ERROR_HTTP2_NO_ERROR:
            loglevel = LOGLVL_INFO;
            break;

        case SERF_ERROR_HTTP2_INTERNAL_ERROR:
        default:
            loglevel = LOGLVL_WARNING;
            break;
    }

    if (len > HTTP2_GOWAWAY_DATA_SIZE)
    {
        char *goaway_text;

        /* The server produced additional information in the error frame
           Usually this is some literal text explaining what went wrong.

           Copy the text to make it 0 terminated and then log it. */

        /* If this value appears truncated, that may be caused by the
           limit set in http2_process */

        goaway_text = serf_bstrmemdup(h2->allocator,
                                      data + HTTP2_GOWAWAY_DATA_SIZE,
                                      len - HTTP2_GOWAWAY_DATA_SIZE);

        serf__log(loglevel, SERF_LOGHTTP2, h2->config,
                  "Received GOAWAY, last-stream=0x%x, error=%u: %s\n",
                  last_streamid, error_code, goaway_text);

        serf_bucket_mem_free(h2->allocator, goaway_text);
    }
    else
    {
        serf__log(loglevel, SERF_LOGHTTP2, h2->config,
                  "Received GOAWAY, last-stream=0x%x, error=%u.\n",
                  last_streamid, error_code);
    }

  /* ### TODO: If the error is not critical stop creating new frames
               on this connection, while still going forward with the
               existing frames.

               We may receive a new error later, signalling a more
               important problem */

    return APR_SUCCESS;
}


/* Implements serf_bucket_aggregate_eof_t */
static apr_status_t
http2_handle_continuation(void *baton,
                          serf_bucket_t *aggregate_bucket)
{
    serf_http2_protocol_t *h2 = baton;
    apr_status_t status;
    const char *data;
    apr_size_t len;

    if (h2->continuation_bucket != aggregate_bucket)
        return APR_EOF; /* This is all we have */

    SERF_H2_assert(h2->read_frame == NULL);
    SERF_H2_assert(h2->continuation_bucket == aggregate_bucket);

    status = http2_process(h2);
    if (status)
        return status;

    if (h2->continuation_bucket == aggregate_bucket)
    {
      /* We expect more data in the future. Something
         was done in http2_process() or it didn't
         return APR_SUCCESS */
        return APR_SUCCESS;
    }

  /* As h2->continuation_bucket is no longer attached we don't
     recurse on peeking. Just check if there is more */
    return serf_bucket_peek(aggregate_bucket, &data, &len);
}

/* Implements the serf__bucket_http2_unframe_set_eof callback */
static apr_status_t
http2_end_of_frame(void *baton,
                   serf_bucket_t *frame)
{
    serf_http2_protocol_t *h2 = baton;

    SERF_H2_assert(h2->read_frame == frame);
    h2->read_frame = NULL;
    h2->in_frame = FALSE;
    h2->processor = NULL;
    h2->processor_baton = NULL;

    return APR_SUCCESS;
}

/* Implements serf_http2_processor_t */
static apr_status_t
http2_bucket_processor(void *baton,
                       serf_http2_protocol_t *h2,
                       serf_bucket_t *frame_bucket)
{
    struct iovec vecs[SERF__STD_IOV_COUNT];
    int vecs_used;
    serf_bucket_t *payload = baton;
    apr_status_t status;

    status = serf_bucket_read_iovec(payload, SERF_READ_ALL_AVAIL,
                                    COUNT_OF(vecs), vecs, &vecs_used);

    if (APR_STATUS_IS_EOF(status))
    {
        SERF_H2_assert(!h2->in_frame && !h2->read_frame);
        serf_bucket_destroy(payload);
    }

    return status;
}

/* Processes incoming HTTP2 data */
static apr_status_t
http2_process(serf_http2_protocol_t *h2)
{
    while (TRUE)
    {
        apr_status_t status;
        serf_bucket_t *body;

        if (h2->processor)
        {
            status = h2->processor(h2->processor_baton, h2, h2->read_frame);

            if (SERF_BUCKET_READ_ERROR(status))
                return status;
            else if (APR_STATUS_IS_EOF(status))
            {
              /* ### frame ended */
                SERF_H2_assert(h2->read_frame == NULL);
                h2->processor = NULL;
                h2->processor_baton = NULL;
            }
            else if (h2->in_frame)
            {
                if (status)
                    return status;
                else
                    continue;
            }
        }
        else
        {
            SERF_H2_assert(!h2->in_frame);
        }

        body = h2->read_frame;

        if (!body)
        {
            SERF_H2_assert(!h2->in_frame);

            body = serf__bucket_http2_unframe_create(
                h2->pump->stream,
                h2->rl_max_framesize,
                h2->allocator);

            serf__bucket_http2_unframe_set_eof(body,
                                               http2_end_of_frame, h2);

            serf_bucket_set_config(body, h2->config);
            h2->read_frame = body;
        }

        if (!h2->in_frame)
        {
            apr_int32_t sid;
            unsigned char frametype;
            unsigned char frameflags;
            apr_size_t remaining;
            serf_http2_processor_t process_handler = NULL;
            void *process_baton = NULL;
            serf_bucket_t *process_bucket = NULL;
            serf_http2_stream_t *stream;
            apr_uint32_t reset_reason;

            status = serf__bucket_http2_unframe_read_info(body, &sid,
                                                          &frametype,
                                                          &frameflags);

            if (APR_STATUS_IS_EOF(status))
            {
              /* Entire frame is already read (just header) */
                SERF_H2_assert(h2->read_frame == NULL);
                SERF_H2_assert(!h2->in_frame);
            }
            else if (status)
            {
                SERF_H2_assert(h2->read_frame != NULL);
                SERF_H2_assert(!h2->in_frame);
                return (status == SERF_ERROR_EMPTY_READ) ? APR_SUCCESS
                                                         : status;
            }
            else
            {
                h2->in_frame = TRUE;
                SERF_H2_assert(h2->read_frame != NULL);
            }

            serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
                      "Reading 0x%x frame, stream=0x%x, flags=0x%x\n",
                      frametype, sid, frameflags);

            /* If status is EOF then the frame doesn't have/declare a body */
            switch (frametype)
            {
      /* ---------------------------------------------------- */
                case HTTP2_FRAME_TYPE_DATA:
                case HTTP2_FRAME_TYPE_HEADERS:
                case HTTP2_FRAME_TYPE_PUSH_PROMISE:
                    if (h2->continuation_bucket)
                    {
                        h2->continuation_bucket = NULL;
                        h2->continuation_streamid = 0;
                        return APR_EAGAIN;
                    }

                    stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);

                    if (sid == 0)
                    {
                      /* DATA, HEADERS and PUSH_PROMISE:

                        These frames MUST be associated with a stream.  If a
                        XXX frame is received whose stream identifier field is
                        0x0, the recipient MUST respond with a connection error
                        (Section 5.4.1) of type PROTOCOL_ERROR. */
                        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
                    }

                    reset_reason = 0;

                    if (frametype == HTTP2_FRAME_TYPE_DATA)
                    {
                      /* Windowing is applied above padding! */
                        remaining =
                            (apr_size_t)serf_bucket_get_remaining(body);

                        if (h2->rl_window < remaining)
                        {
                            if (h2->enforce_flow_control) {
                                reset_reason =
                                    SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR;
                            }

                            h2->rl_window = 0;
                        }
                        else
                            h2->rl_window -= remaining;

                        if (h2->rl_window < h2->rl_window_upd_below)
                            http2_send_window_update(h2, NULL);

                        if (stream)
                        {
                            if (stream->rl_window < remaining)
                            {
                                if (h2->enforce_flow_control) {
                                    reset_reason =
                                        SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR;
                                }

                                stream->rl_window = 0;
                            }
                            else
                                stream->rl_window -= remaining;

                            /* If the stream is not at the end, perhaps we
                               should allow it to send more data */
                            if (!(frameflags & HTTP2_FLAG_END_STREAM)
                                && stream->rl_window < stream->rl_window_upd_below) {

                                http2_send_window_update(h2, stream);
                            }
                        }
                    }

                  /* DATA, HEADERS and PUSH_PROMISE can have padding */
                    if (frameflags & HTTP2_FLAG_PADDED) {
                        body = serf__bucket_http2_unpad_create(body,
                                                               h2->allocator);
                    }

                      /* An HEADERS frame can have an included priority 'frame' */
                    if (frametype == HTTP2_FRAME_TYPE_HEADERS
                        && (frameflags & HTTP2_FLAG_PRIORITY))
                    {
                        body = serf_bucket_prefix_create(body,
                                                         HTTP2_PRIORITY_DATA_SIZE,
                                                         http2_handle_priority,
                                                         stream, h2->allocator);
                    }
                    else if (frametype == HTTP2_FRAME_TYPE_PUSH_PROMISE)
                    {
                        body = serf_bucket_prefix_create(body,
                                                         HTTP2_PROMISE_DATA_SIZE,
                                                         http2_handle_promise,
                                                         stream, h2->allocator);
                    }

                    if (!stream)
                    {
                        if (!reset_reason)
                            reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED;
                    }
                    else
                        switch (frametype)
                        {
                            case HTTP2_FRAME_TYPE_DATA:
                                if (stream->status != H2S_OPEN
                                    && stream->status != H2S_HALFCLOSED_LOCAL)
                                {
                                    reset_reason =
                                        SERF_ERROR_HTTP2_STREAM_CLOSED;
                                }
                                break;
                            case HTTP2_FRAME_TYPE_HEADERS:
                                if (stream->status != H2S_OPEN
                                    && stream->status != H2S_HALFCLOSED_LOCAL
                                    && stream->status != H2S_IDLE
                                    && stream->status != H2S_RESERVED_REMOTE)
                                {
                                    reset_reason =
                                        SERF_ERROR_HTTP2_STREAM_CLOSED;
                                }
                                break;
                            case HTTP2_FRAME_TYPE_PUSH_PROMISE:
                                if (stream->status != H2S_OPEN
                                    && stream->status != H2S_HALFCLOSED_LOCAL)
                                {
                                    reset_reason =
                                        SERF_ERROR_HTTP2_STREAM_CLOSED;
                                }
                                break;
                        }

                    if (reset_reason)
                    {
                        if (stream) {
                            serf_http2__stream_reset(stream, reset_reason,
                                                     TRUE);
                        }
                        else {
                            serf_http2__enqueue_stream_reset(h2, sid,
                                                             reset_reason);
                        }
                    }

                    if (frametype == HTTP2_FRAME_TYPE_HEADERS
                        || frametype == HTTP2_FRAME_TYPE_PUSH_PROMISE)
                    {
                        if (!(frameflags & HTTP2_FLAG_END_HEADERS))
                        {
                          /* This header frame is *directly* followed by
                             continuation frames... We hide this from the
                             stream code, by providing an aggregate that will
                             read through the body of multiple frames */

                            h2->continuation_bucket =
                                serf_bucket_aggregate_create(h2->allocator);
                            h2->continuation_streamid = sid;

                            serf_bucket_aggregate_append(
                                h2->continuation_bucket, body);

                            serf_bucket_aggregate_hold_open(
                                h2->continuation_bucket,
                                http2_handle_continuation, h2);

                            body = h2->continuation_bucket;
                        }

                        if (stream && !reset_reason)
                        {
                            body = serf_http2__stream_handle_hpack(
                                stream, body, frametype,
                                (frameflags & HTTP2_FLAG_END_STREAM),
                                HTTP2_MAX_HEADER_ENTRYSIZE,
                                h2->hpack_tbl, h2->config,
                                h2->allocator);
                        }
                        else
                        {
                          /* Even when we don't want to process the headers we
                              must read them to update the HPACK state */
                            body = serf__bucket_hpack_decode_create(
                                body,
                                HTTP2_MAX_HEADER_ENTRYSIZE,
                                h2->hpack_tbl, h2->allocator);
                        }
                    }
                    else if (!reset_reason)
                    {
                      /* We have a data bucket */
                        body = serf_http2__stream_handle_data(
                            stream, body, frametype,
                            (frameflags & HTTP2_FLAG_END_STREAM),
                            h2->config, h2->allocator);
                    }

                    if (body) {
                        /* We will take care of discarding */
                        process_bucket = body;
                    }
                    else
                    {
                      /* The stream wants to handle the reading itself */
                        process_handler = serf_http2__stream_processor;
                        process_baton = stream;
                    }
                    break;

          /* ---------------------------------------------------- */
                case HTTP2_FRAME_TYPE_PRIORITY:
                    if (sid == 0)
                    {
                      /* The PRIORITY frame always identifies a stream.  If a
                         PRIORITY frame is received with a stream identifier
                         of 0x0, the recipient MUST respond with a connection
                         error (Section 5.4.1) of type PROTOCOL_ERROR.*/

                        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
                    }
                    else if (serf_bucket_get_remaining(body)
                             != HTTP2_PRIORITY_DATA_SIZE)
                    {
                      /* A PRIORITY frame with a length other than 5 octets
                         MUST be treated as a stream error (Section 5.4.2) of
                         type FRAME_SIZE_ERROR.*/

                      /* ### But we currently upgrade this to a connection error
                       */
                        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
                    }

                    stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);

                    if (stream)
                    {
                        body = serf_bucket_prefix_create(
                                            body,
                                            HTTP2_PRIORITY_DATA_SIZE,
                                            http2_handle_priority,
                                            stream, h2->allocator);
                    }

                  /* Just reading will do the right thing now */
                    process_bucket = body;
                    break;

          /* ---------------------------------------------------- */
                case HTTP2_FRAME_TYPE_RST_STREAM:
                    if (sid == 0)
                    {
                      /* RST_STREAM frames MUST be associated with a stream.
                         If a RST_STREAM frame is received with a stream
                         identifier of 0x0, the recipient MUST treat this as
                         a connection error (Section 5.4.1) of type
                         PROTOCOL_ERROR.
                       */

                        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
                    }
                    else if (serf_bucket_get_remaining(body)
                             != HTTP2_RST_DATA_SIZE)
                    {
                      /* A RST_STREAM frame with a length other than 4 octets
                         MUST be treated as a connection error (Section 5.4.1)
                         of type FRAME_SIZE_ERROR. */

                        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
                    }

                    stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);

                    if (stream)
                    {
                        body = serf_bucket_prefix_create(
                                                body, HTTP2_RST_DATA_SIZE,
                                                http2_handle_frame_reset,
                                                stream, h2->allocator);
                    }

                  /* Just reading will do the right thing now */
                    process_bucket = body;
                    break;

          /* ---------------------------------------------------- */
                case HTTP2_FRAME_TYPE_SETTINGS:
                    if (sid != 0)
                    {
                      /* SETTINGS frames always apply to a connection, never a
                         single stream. The stream identifier for a SETTINGS
                         frame MUST be zero (0x0).  If an endpoint receives a
                         SETTINGS frame whose stream identifier field is
                         anything other than 0x0, the endpoint MUST respond
                         with a connection error (Section 5.4.1) of type
                         PROTOCOL_ERROR.
                      */
                        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
                    }

                    remaining = (apr_size_t)serf_bucket_get_remaining(body);
                    if (frameflags & HTTP2_FLAG_ACK)
                    {
                        if (remaining != 0)
                        {
                          /* When this bit is set, the payload of the SETTINGS
                             frame MUST be empty. Receipt of a SETTINGS frame
                             with the ACK flag set and a length field value
                             other than 0 MUST be treated as a connection error
                             (Section 5.4.1) of type FRAME_SIZE_ERROR. */
                            return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
                        }
                        h2->setting_acks++;
                    }
                    else if ((remaining % HTTP2_SETTING_SIZE) != 0)
                    {
                      /* A SETTINGS frame with a length other than a multiple
                         of 6 octets MUST be treated as a connection error
                         (Section 5.4.1) of type FRAME_SIZE_ERROR. */
                        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
                    }
                    else
                    {
                      /* Just read everything... We checked it against our
                         max-framesize */
                        body = serf_bucket_prefix_create(body, remaining,
                                                         http2_handle_settings,
                                                         h2, h2->allocator);
                    }

                  /* Just reading will do the right thing now */
                    process_bucket = body;
                    break;

          /* ---------------------------------------------------- */
                case HTTP2_FRAME_TYPE_PING:
                    if (sid != 0)
                    {
                      /* PING frames are not associated with any individual
                         stream.  If a PING frame is received with a stream
                         identifier field value other than 0x0, the recipient
                         MUST respond with a connection error (Section 5.4.1)
                         of type PROTOCOL_ERROR.*/
                        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
                    }
                    else if (serf_bucket_get_remaining(body)
                             != HTTP2_PING_DATA_SIZE)
                    {
                      /* Receipt of a PING frame with a length field value
                         other than 8 MUST be treated as a connection error
                         (Section 5.4.1) of type FRAME_SIZE_ERROR.. */
                        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
                    }

                    body = serf_bucket_prefix_create(
                                            body,
                                            HTTP2_PING_DATA_SIZE,
                                            (frameflags & HTTP2_FLAG_ACK)
                                                    ? http2_handle_ping
                                                    : http2_handle_ping_ack,
                                            h2, h2->allocator);

                    /* Just reading will do the right thing now */
                    process_bucket = body;
                    break;
          /* ---------------------------------------------------- */
                case HTTP2_FRAME_TYPE_GOAWAY:
                    if (sid != 0)
                    {
                      /* The GOAWAY frame applies to the connection, not a
                         specific stream. An endpoint MUST treat a GOAWAY frame
                         with a stream identifier other than 0x0 as a
                         connection error (Section 5.4.1) of type
                         PROTOCOL_ERROR. */
                        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
                    }

                  /* As the final go-away frame is best effort only we are not
                     checking the bodysize against HTTP2_GOWAWAY_DATA_SIZE
                     here. We'll see what we get in the goaway handler.

                     Go away frames may contain additional opaque debug
                     information at the end, so instead of reading
                     HTTP2_GOWAWAY_DATA_SIZE bytes, we just read the whole frame.
                   */
                    remaining = (apr_size_t)serf_bucket_get_remaining(body);

                    body = serf_bucket_prefix_create(body,
                                                     MIN(remaining,
                                                      HTTP2_GOWAWAY_DATA_SIZE
                                                      + 256),
                                                     http2_handle_goaway, h2,
                                                     h2->allocator);

                    /* Just reading will do the right thing now */
                    process_bucket = body;
                    break;
          /* ---------------------------------------------------- */
                case HTTP2_FRAME_TYPE_WINDOW_UPDATE:
                    if (serf_bucket_get_remaining(body)
                        != HTTP2_WINDOW_UPDATE_DATA_SIZE)
                    {
                      /* A WINDOW_UPDATE frame with a length other than 4
                         octets MUST be treated as a connection error
                         (Section 5.4.1) of type FRAME_SIZE_ERROR. */
                        return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
                    }

                    if (sid == 0)
                    {
                        body = serf_bucket_prefix_create(
                            body,
                            HTTP2_WINDOW_UPDATE_DATA_SIZE,
                            http2_handle_connection_window_update, h2,
                            h2->allocator);
                    }
                    else
                    {
                        stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);

                        if (stream)
                            body = serf_bucket_prefix_create(
                                body,
                                HTTP2_WINDOW_UPDATE_DATA_SIZE,
                                http2_handle_stream_window_update, stream,
                                h2->allocator);
                    }

                  /* Just reading will do the right thing now */
                    process_bucket = body;
                    break;

          /* ---------------------------------------------------- */
                case HTTP2_FRAME_TYPE_CONTINUATION:
                    if (!h2->continuation_bucket
                        || sid != h2->continuation_streamid)
                    {
                      /* A CONTINUATION frame MUST be preceded by a HEADERS,
                         PUSH_PROMISE or CONTINUATION frame without the
                         END_HEADERS flag set. A recipient that observes
                         violation of this rule MUST respond with a connection
                         error(Section 5.4.1) of type PROTOCOL_ERROR. */
                        h2->continuation_bucket = NULL;
                        h2->continuation_streamid = 0;
                        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
                    }

                    serf_bucket_aggregate_append(h2->continuation_bucket,
                                                 body);

                    if (frameflags & HTTP2_FLAG_END_HEADERS)
                    {
                        h2->continuation_bucket = NULL;
                        h2->continuation_streamid = 0;
                    }

                    return APR_SUCCESS;

          /* ---------------------------------------------------- */
                default:
                  /* We explicitly ignore all other frames as required,
                     so reading will do the right thing now */
                    process_bucket = body;
            } /* switch */

            if (body)
                serf_bucket_set_config(body, h2->config);

            SERF_H2_assert(h2->processor == NULL);

            if (process_handler)
            {
                h2->processor = process_handler;
                h2->processor_baton = process_baton;
            }
            else
            {
                SERF_H2_assert(process_bucket != NULL);
                h2->processor = http2_bucket_processor;
                h2->processor_baton = process_bucket;
            }
        }
    } /* while(TRUE) */
}

static apr_status_t http2_write_data(serf_http2_protocol_t *h2)
{
    serf_http2_stream_t *stream = h2->cur_writable;

    while (h2->lr_window > 0)
    {
        apr_status_t status;

        if (!stream)
            stream = h2->first_writable;

        if (!stream)
            return APR_SUCCESS;

        if (stream->status != H2S_OPEN
            && stream->status != H2S_HALFCLOSED_REMOTE)
        {
            /* This stream is NOT writable. Remove it */
            if (stream->prev_writable)
                stream->prev_writable->next_writable = stream->next_writable;
            else
                h2->first_writable = stream->next_writable;

            if (stream->next_writable)
                stream->next_writable->prev_writable = stream->prev_writable;
            else
                h2->last_writable = stream->prev_writable;

            stream->prev_writable = stream->next_writable = NULL;
            stream = NULL;
            continue;
        }

        status = serf_http2__stream_write_data(stream);
        if (status || stream->lr_window == 0)
            h2->cur_writable = stream->next_writable;

        return status ? status : APR_EAGAIN;
    }

    return APR_SUCCESS; /* Done for now */
}

static apr_status_t
http2_outgoing_read(serf_connection_t *conn)
{
    serf_http2_protocol_t *h2 = conn->protocol_baton;
    apr_status_t status;

    status = http2_process(h2);

    if (!status)
        return APR_SUCCESS;
    else if (APR_STATUS_IS_EOF(status))
    {
      /* TODO: Teardown connection, reset if necessary, etc. */
        return status;
    }
    else if (APR_STATUS_IS_EAGAIN(status)
             || status == SERF_ERROR_WAIT_CONN)
    {
      /* Update pollset, etc. etc. */
        return APR_SUCCESS;
    }
    else
        return status;
}

static apr_status_t
http2_outgoing_write(serf_connection_t *conn)
{
    serf_http2_protocol_t *h2 = conn->protocol_baton;
    apr_status_t status;

    if (conn->unwritten_reqs
        && conn->nr_of_written_reqs < h2->lr_max_concurrent)
    {
        status = enqueue_http2_request(h2);
        if (status)
            return status;
    }

    status = serf_pump__write(h2->pump, true);

    if (!status)
        status = http2_write_data(h2);

    if (APR_STATUS_IS_EAGAIN(status))
        return APR_SUCCESS;
    else if (status)
        return status;

      /* Probably nothing to write. Connection will check new requests */
    serf_io__set_pollset_dirty(&conn->io);

    return APR_SUCCESS;
}

static apr_status_t
http2_outgoing_hangup(serf_connection_t *conn)
{
  /* serf_http2_protocol_t *ctx = conn->protocol_baton; */

    return APR_EGENERAL;
}

static void
http2_outgoing_pre_teardown(serf_connection_t *conn)
{
    serf_http2_protocol_t *h2 = conn->protocol_baton;
    serf_http2_stream_t *s = h2->first;

    while (s) {
        serf_http2__stream_pre_cleanup(s);
        s = s->next;
    }
}

static void
http2_outgoing_teardown(serf_connection_t *conn)
{
    serf_http2_protocol_t *ctx = conn->protocol_baton;

    apr_pool_destroy(ctx->pool);
    conn->protocol_baton = NULL;
}

static void http2_cancel_request(serf_request_t *rq, apr_status_t reason)
{
    serf_connection_t *conn = rq->conn;

    if (!conn || !conn->protocol_baton || !rq->protocol_baton)
        return;

    serf_http2__stream_cancel_request(rq->protocol_baton,
                                      rq, reason);
}

static void http2_prioritize_request(serf_request_t *rq, bool exclusive)
{
    serf_connection_t *conn = rq->conn;

    if (!conn || !conn->protocol_baton || !rq->protocol_baton)
        return;

    serf_http2__stream_prioritize_request(rq->protocol_baton,
                                          rq, exclusive);
}

static apr_status_t
http2_incoming_read(serf_incoming_t *client)
{
    apr_status_t status;
    serf_http2_protocol_t *h2 = client->protocol_baton;

    if (h2->prefix_left) {
        serf_bucket_t *stream;

        if (client->proto_peek_bkt)
            stream = client->proto_peek_bkt;
        else
            stream = client->pump.stream;

        do {
            const char *data;
            apr_size_t len;

            status = serf_bucket_read(stream, h2->prefix_left,
                                      &data, &len);

            if (!SERF_BUCKET_READ_ERROR(status)) {
                if (len && memcmp(data,
                                  HTTP2_CONNECTION_PREFIX - h2->prefix_left - 1
                                  + sizeof(HTTP2_CONNECTION_PREFIX),
                                  len) != 0)
                {
                    return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
                }
                h2->prefix_left -= len;
            }
        } while (status == APR_SUCCESS && h2->prefix_left);

        if (!h2->prefix_left && client->proto_peek_bkt) {
            /* Peek buffer is now empty. Use actual stream */
            serf_bucket_destroy(client->proto_peek_bkt);
            client->proto_peek_bkt = NULL;
        }

        if (APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN)
        {
            return APR_SUCCESS;
        }
        else if (status) {
            return status;
        }
    }

    status = http2_process(h2);

    if (!status)
        return APR_SUCCESS;
    else if (APR_STATUS_IS_EOF(status))
    {
        /* TODO: Teardown connection, reset if necessary, etc. */
        return status;
    }
    else if (APR_STATUS_IS_EAGAIN(status)
             || status == SERF_ERROR_WAIT_CONN)
    {
        /* Update pollset, etc. etc. */
        return APR_SUCCESS;
    }
    else
        return status;
}

static apr_status_t
http2_incoming_write(serf_incoming_t *client)
{
    serf_http2_protocol_t *h2 = client->protocol_baton;
    apr_status_t status;

    status = serf_pump__write(h2->pump, true);

    if (APR_STATUS_IS_EAGAIN(status))
        return APR_SUCCESS;
    else if (status)
        return status;

    status = http2_write_data(h2);
    if (status)
        return status;

    /* Probably nothing to write. Connection will check new requests */
    serf_io__set_pollset_dirty(&client->io);

    return APR_SUCCESS;
}

static apr_status_t
http2_incoming_hangup(serf_incoming_t *client)
{
    /* serf_http2_protocol_t *ctx = conn->protocol_baton; */

    return APR_EGENERAL;
}

static void
http2_incoming_teardown(serf_incoming_t *client)
{
    serf_http2_protocol_t *ctx = client->protocol_baton;

    apr_pool_destroy(ctx->pool);
    client->protocol_baton = NULL;
}

static void
http2_incoming_pre_teardown(serf_incoming_t *conn)
{
    serf_http2_protocol_t *h2 = conn->protocol_baton;
    serf_http2_stream_t *s = h2->first;

    while (s) {
        serf_http2__stream_pre_cleanup(s);
        s = s->next;
    }
}


void
serf_http2__allocate_stream_id(void *baton,
                               apr_int32_t *streamid)
{
    serf_http2_stream_t *stream = baton;

    SERF_H2_assert(streamid == &stream->streamid);

    /* Do we need to assign a new id?

       We do this when converting the frame to on-wire data, to avoid
       creating frames out of order... which would make the other side
       deny our frame.
    */
    if (stream->streamid < 0)
    {
        stream->streamid = stream->h2->lr_next_streamid;
        stream->h2->lr_next_streamid += 2;

        if (stream->status == H2S_INIT)
            stream->status = H2S_IDLE;
    }
}

static void
move_to_head(serf_http2_stream_t *stream)
{
  /* Not implemented yet */
}

serf_http2_stream_t *
serf_http2__stream_get(serf_http2_protocol_t *h2,
                       apr_int32_t streamid,
                       bool create_for_remote,
                       bool move_first)
{
    serf_http2_stream_t *stream;

    if (streamid < 0)
        return NULL;

    for (stream = h2->first; stream; stream = stream->next)
    {
        if (stream->streamid == streamid)
        {
            if (move_first && stream != h2->first)
                move_to_head(stream);

            return stream;
        }
    }

    if (create_for_remote
        && (streamid & 0x01) == (h2->rl_next_streamid & 0x01))
    {
        stream = serf_http2__stream_create(h2, streamid,
                                           h2->lr_default_window,
                                           h2->rl_default_window,
                                           h2->allocator);

        if (h2->first)
        {
            stream->next = h2->first;
            h2->first->prev = stream;
            h2->first = stream;
        }
        else
            h2->last = h2->first = stream;

        if (streamid < h2->rl_next_streamid)
        {
          /* https://tools.ietf.org/html/rfc7540#section-5.1.1
             The first use of a new stream identifier implicitly closes
             all streams in the "idle" state that might have been
             initiated by that peer with a lower-valued stream identifier.
          */
            stream->status = H2S_CLOSED;
        }
        else
            h2->rl_next_streamid = (streamid + 2);

        return stream;
    }
    return NULL;
}

apr_status_t
serf_http2__enqueue_stream_reset(serf_http2_protocol_t *h2,
                                 apr_int32_t streamid,
                                 apr_status_t reason)
{
    serf_bucket_t *bkt;
    apr_int32_t http_reason;

    if (reason >= SERF_ERROR_HTTP2_NO_ERROR
        && reason <= SERF_ERROR_HTTP2_HTTP_1_1_REQUIRED)
    {
        http_reason = (reason - SERF_ERROR_HTTP2_NO_ERROR);
    }
    else
        http_reason = SERF_ERROR_HTTP2_INTERNAL_ERROR;

    bkt = serf_bucket_create_numberv(h2->allocator, "4", http_reason);

    return serf_http2__enqueue_frame(
        h2,
        serf__bucket_http2_frame_create(bkt,
                                        HTTP2_FRAME_TYPE_RST_STREAM,
                                        0, &streamid, NULL, NULL,
                                        h2->lr_max_framesize,
                                        h2->allocator),
        TRUE);
}

apr_status_t
serf_http2__setup_incoming_request(serf_incoming_request_t **in_request,
                                   serf_incoming_request_setup_t *req_setup,
                                   void **req_setup_baton,
                                   serf_http2_protocol_t *h2)
{
    if (!h2->client)
        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;

    *in_request = serf__incoming_request_create(h2->client);
    *req_setup = h2->client->req_setup;
    *req_setup_baton = h2->client->req_setup_baton;

    return APR_SUCCESS;
}

apr_size_t
serf_http2__max_payload_size(serf_http2_protocol_t *h2)
{
    return h2->lr_max_framesize;
}

apr_size_t serf_http2__alloc_window(serf_http2_protocol_t *h2,
                                    serf_http2_stream_t *stream,
                                    apr_size_t requested)
{
    if (requested > h2->lr_max_framesize)
        requested = h2->lr_max_framesize;
    if (requested > h2->lr_window)
        requested = h2->lr_window;
    if (requested > stream->lr_window)
        requested = stream->lr_window;

    if (requested) {
        h2->lr_window -= requested;
        stream->lr_window -= requested;
    }

    return requested;
}

void serf_http2__return_window(serf_http2_protocol_t *h2,
                               serf_http2_stream_t *stream,
                               apr_size_t returned)
{
    SERF_H2_assert(h2->lr_window + returned <= HTTP2_WINDOW_MAX_ALLOWED);
    SERF_H2_assert(stream->lr_window + returned <= HTTP2_WINDOW_MAX_ALLOWED);
    h2->lr_window += returned;
    stream->lr_window += returned;
}

void serf_http2__ensure_writable(serf_http2_stream_t *stream)
{
    serf_http2_protocol_t *h2 = stream->h2;
    SERF_H2_assert(stream->status == H2S_OPEN
                   || stream->status == H2S_HALFCLOSED_REMOTE);

    if (stream->next_writable || stream->prev_writable)
        return;

    stream->prev_writable = h2->last_writable;
    h2->last_writable = stream;
    if (stream->prev_writable)
        stream->prev_writable->next_writable = stream;
    else
        h2->first_writable = stream;
}
