/* ====================================================================
 *    Licensed to the Apache Software Foundation (ASF) under one
 *    or more contributor license agreements.  See the NOTICE file
 *    distributed with this work for additional information
 *    regarding copyright ownership.  The ASF licenses this file
 *    to you under the Apache License, Version 2.0 (the
 *    "License"); you may not use this file except in compliance
 *    with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing,
 *    software distributed under the License is distributed on an
 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *    KIND, either express or implied.  See the License for the
 *    specific language governing permissions and limitations
 *    under the License.
 * ====================================================================
 */

#include <stdlib.h>

#include <apr_pools.h>
#include <apr_strings.h>
#include <apr_date.h>

#include "serf.h"
#include "serf_bucket_util.h"
#include "serf_private.h"

#include "protocols/fcgi_buckets.h"
#include "protocols/fcgi_protocol.h"

/* Fully opaque variant of serf_fcgi_stream_t */
struct serf_fcgi_stream_data_t
{
    serf_fcgi_stream_t stream_data;

    serf_bucket_t *req_agg;
    bool headers_eof;
    bool stdin_eof;

    serf_request_t *request;
    serf_incoming_request_t *in_request;
};

serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi,
                                              apr_uint16_t streamid,
                                              serf_bucket_alloc_t *alloc)
{
    serf_fcgi_stream_t *stream;
    serf_fcgi_stream_data_t *data = serf_bucket_mem_calloc(alloc,
                                                           sizeof(*data));

    stream = &data->stream_data;
    stream->data = data;

    stream->fcgi = fcgi;
    stream->alloc = alloc;
    stream->streamid = streamid;

    stream->next = stream->prev = NULL;

    return stream;
}

void serf_fcgi__stream_destroy(serf_fcgi_stream_t * stream)
{
    if (stream->data->in_request)
        serf__incoming_request_destroy(stream->data->in_request);


    /* Destroy stream and stream->data */
    serf_bucket_mem_free(stream->alloc, stream);
}

/* Aggregate hold open callback for what requests will think is the
   actual body */
static apr_status_t stream_agg_eof(void *baton,
                                   serf_bucket_t *bucket)
{
    serf_fcgi_stream_t *stream = baton;

    if (!stream->data->stdin_eof)
        return APR_EAGAIN;

    return APR_EOF;
}

static apr_status_t close_stream(void *baton,
                                 apr_uint64_t bytes_read)
{
    serf_fcgi_stream_t *stream = baton;

    serf_fcgi__close_stream(stream->fcgi, stream);

    return APR_SUCCESS;
}

static apr_status_t
fcgi_stream_enqueue_response(serf_incoming_request_t *request,
                             void *enqueue_baton,
                             serf_bucket_t *response_bkt)
{
    serf_fcgi_stream_t *stream = enqueue_baton;
    serf_bucket_alloc_t *alloc = response_bkt->allocator;
    serf_linebuf_t *linebuf;
    serf_bucket_t *agg;
    serf_bucket_t *tmp;
    apr_status_t status;

    /* With FCGI we don't send the usual first line of the response.
       We just send a "Status: 200" instead and the actual http
       server will handle the rest */
    agg = serf_bucket_aggregate_create(alloc);

    /* Too big for the stack :( */
    linebuf = serf_bucket_mem_alloc(alloc, sizeof(*linebuf));
    serf_linebuf_init(linebuf);

    do
    {
        status = serf_linebuf_fetch(linebuf, response_bkt, SERF_NEWLINE_ANY);
    } while (status == APR_SUCCESS && linebuf->state != SERF_LINEBUF_READY);

    if (status
        || linebuf->state != SERF_LINEBUF_READY
        || !apr_date_checkmask(linebuf->line, "HTTP/#.# ###*"))
    {
        /* We can't write a response in this state yet :( */
        serf_bucket_mem_free(alloc, linebuf);
        return status;
    }

    tmp = SERF_BUCKET_SIMPLE_STRING("Status: ", alloc);
    serf_bucket_aggregate_append(agg, tmp);

    /* Skip "HTTP/1.1 " but send status and reason */
    tmp = serf_bucket_simple_copy_create(linebuf->line + 9, linebuf->used - 9,
                                         alloc);
    serf_bucket_aggregate_append(agg, tmp);
    serf_bucket_mem_free(alloc, linebuf);

    tmp = SERF_BUCKET_SIMPLE_STRING("\r\n", alloc);
    serf_bucket_aggregate_append(agg, tmp);

    serf_bucket_aggregate_append(agg, response_bkt);

    /* Send response over STDOUT, closing stdout when done */
    status = serf_fcgi__enqueue_frame(
        stream->fcgi,
        serf__bucket_fcgi_frame_create(agg, stream->streamid,
                                       FCGI_FRAMETYPE(FCGI_V1, FCGI_STDOUT),
                                       true, false,
                                       alloc), false);
    if (status)
        return status;

    /* As we don't use STDERR we don't have to close it either */

    /* Send end of request: FCGI_REQUEST_COMPLETE, exit code 0 */
    tmp = SERF_BUCKET_SIMPLE_STRING_LEN("\0\0\0\0\0\0\0\0", 8, alloc);
    tmp = serf__bucket_fcgi_frame_create(tmp, stream->streamid,
                                         FCGI_FRAMETYPE(FCGI_V1, FCGI_END_REQUEST),
                                         false, false,
                                         alloc);

    tmp = serf__bucket_event_create(tmp, stream, NULL, NULL,
                                    close_stream, alloc);

    status = serf_fcgi__enqueue_frame(stream->fcgi, tmp, true);
    return status;
}

static apr_status_t
stream_setup_request(serf_fcgi_stream_t *stream,
                     serf_config_t *config)
{
    serf_bucket_t *agg;
    apr_status_t status;

    agg = serf_bucket_aggregate_create(stream->alloc);
    serf_bucket_aggregate_hold_open(agg, stream_agg_eof, stream);

    serf_bucket_set_config(agg, config);
    stream->data->req_agg = agg;

    if (stream->data->request) {
        serf_request_t *request = stream->data->request;

        if (!request->resp_bkt) {
            apr_pool_t *scratch_pool = request->respool; /* ### Pass scratch pool */

            request->resp_bkt = request->acceptor(request, agg,
                                                  request->acceptor_baton,
                                                  scratch_pool);
        }
    }
    else {
        serf_incoming_request_t *in_request = stream->data->in_request;

        if (!in_request) {
            serf_incoming_request_setup_t req_setup;
            void *req_setup_baton;

            status = serf_fcgi__setup_incoming_request(&in_request, &req_setup,
                                                       &req_setup_baton,
                                                       stream->fcgi);

            if (status)
                return status;

            stream->data->in_request = in_request;

            status = req_setup(&in_request->req_bkt, agg,
                               in_request, req_setup_baton,
                               &in_request->handler,
                               &in_request->handler_baton,
                               &in_request->response_setup,
                               &in_request->response_setup_baton,
                               in_request->pool);

            if (status)
                return status;

            in_request->enqueue_response = fcgi_stream_enqueue_response;
            in_request->enqueue_baton = stream;
        }
    }

    return APR_SUCCESS;
}

serf_bucket_t * serf_fcgi__stream_handle_params(serf_fcgi_stream_t *stream,
                                                serf_bucket_t *body,
                                                serf_config_t *config,
                                                serf_bucket_alloc_t *alloc)
{
    apr_size_t remaining;
    if (!stream->data->req_agg) {
        stream_setup_request(stream, config);
    }

    remaining = (apr_size_t)serf_bucket_get_remaining(body);

    if (remaining) {
        body = serf__bucket_fcgi_params_decode_create(body, body->allocator);
    }
    else {
        stream->data->headers_eof = true;
    }

    /* And add it to our aggregate in both cases */
    serf_bucket_aggregate_append(stream->data->req_agg, body);

    return NULL;
}

serf_bucket_t * serf_fcgi__stream_handle_stdin(serf_fcgi_stream_t *stream,
                                               serf_bucket_t *body,
                                               serf_config_t *config,
                                               serf_bucket_alloc_t *alloc)
{
    apr_size_t remaining;
    SERF_FCGI_assert(stream->data->headers_eof);
    if (!stream->data->req_agg) {
        stream_setup_request(stream, config);
    }

    remaining = (apr_size_t)serf_bucket_get_remaining(body);

    if (!remaining) {
        stream->data->stdin_eof = true;
    }

    /* And add it to our aggregate in both cases */
    serf_bucket_aggregate_append(stream->data->req_agg, body);

    return NULL;
}


apr_status_t serf_fcgi__stream_processor(void *baton,
                                         serf_fcgi_protocol_t *fcgi,
                                         serf_bucket_t *body)
{
    serf_fcgi_stream_t *stream = baton;
    apr_status_t status = APR_SUCCESS;

    SERF_FCGI_assert(stream->data->req_agg != NULL);

    if (stream->data->request) {
        /* ### TODO */
    }
    else if (stream->data->in_request) {
        serf_incoming_request_t *request = stream->data->in_request;

        SERF_FCGI_assert(request->req_bkt != NULL);

        status = request->handler(request, request->req_bkt,
                                  request->handler_baton,
                                  request->pool);

        if (!APR_STATUS_IS_EOF(status)
            && !SERF_BUCKET_READ_ERROR(status))
            return status;

        if (APR_STATUS_IS_EOF(status)) {
            status = serf_incoming_response_create(request);

            if (status)
                return status;
        }

        if (SERF_BUCKET_READ_ERROR(status)) {

            /* SEND ERROR status */

            return status;
        }
    }

    while (!status)
    {
        struct iovec vecs[SERF__STD_IOV_COUNT];
        int vecs_used;

        /* Drain the bucket as efficiently as possible */
        status = serf_bucket_read_iovec(stream->data->req_agg,
                                        SERF_READ_ALL_AVAIL,
                                        COUNT_OF(vecs), vecs, &vecs_used);

        if (vecs_used) {
            /* We have data... What should we do with it? */
            /*int i;

            for (i = 0; i < vecs_used; i++)
                fprintf(stderr, "%.*s", vecs[i].iov_len, vecs[i].iov_base);*/
        }
    }

    if (APR_STATUS_IS_EOF(status))
    {
        /* If there was a request, it is already gone, so we can now safely
        destroy our aggregate which may include everything upto the http2
        frames */
        serf_bucket_destroy(stream->data->req_agg);
        stream->data->req_agg = NULL;
    }

    return status;
}

