| /* ==================================================================== |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * ==================================================================== |
| */ |
| |
| #include <apr_pools.h> |
| #include <apr_strings.h> |
| #include <apr_lib.h> |
| #include <apr_date.h> |
| |
| #include "serf.h" |
| #include "serf_bucket_util.h" |
| #include "serf_bucket_types.h" |
| |
| #include <stdlib.h> |
| |
| /* This is an implementation of Bidirectional Web Transfer Protocol (BWTP) |
| * See: |
| * http://bwtp.wikidot.com/ |
| * |
| * 2015-11: The BWTP protocol was one of the proposed implementations for |
| * Websockets, which were eventually standardized in RFC 6455 but |
| * using a completely different wire protocol. |
| */ |
| |
| typedef struct frame_context_t { |
| int channel; |
| int open; |
| int type; /* 0 = header, 1 = message */ /* TODO enum? */ |
| const char *phrase; |
| serf_bucket_t *headers; |
| |
| char req_line[1000]; |
| } frame_context_t; |
| |
| typedef struct incoming_context_t { |
| serf_bucket_t *stream; |
| serf_bucket_t *body; /* Pointer to the stream wrapping the body. */ |
| serf_bucket_t *headers; /* holds parsed headers */ |
| |
| enum { |
| STATE_STATUS_LINE, /* reading status line */ |
| STATE_HEADERS, /* reading headers */ |
| STATE_BODY, /* reading body */ |
| STATE_DONE /* we've sent EOF */ |
| } state; |
| |
| /* Buffer for accumulating a line from the response. */ |
| serf_linebuf_t linebuf; |
| |
| int type; /* 0 = header, 1 = message */ /* TODO enum? */ |
| int channel; |
| char *phrase; |
| apr_size_t length; |
| } incoming_context_t; |
| |
| |
| serf_bucket_t *serf_bucket_bwtp_channel_close( |
| int channel, |
| serf_bucket_alloc_t *allocator) |
| { |
| frame_context_t *ctx; |
| |
| ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); |
| ctx->type = 0; |
| ctx->open = 0; |
| ctx->channel = channel; |
| ctx->phrase = "CLOSED"; |
| ctx->headers = serf_bucket_headers_create(allocator); |
| |
| return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); |
| } |
| |
| serf_bucket_t *serf_bucket_bwtp_channel_open( |
| int channel, |
| const char *uri, |
| serf_bucket_alloc_t *allocator) |
| { |
| frame_context_t *ctx; |
| |
| ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); |
| ctx->type = 0; |
| ctx->open = 1; |
| ctx->channel = channel; |
| ctx->phrase = uri; |
| ctx->headers = serf_bucket_headers_create(allocator); |
| |
| return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); |
| } |
| |
| serf_bucket_t *serf_bucket_bwtp_header_create( |
| int channel, |
| const char *phrase, |
| serf_bucket_alloc_t *allocator) |
| { |
| frame_context_t *ctx; |
| |
| ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); |
| ctx->type = 0; |
| ctx->open = 0; |
| ctx->channel = channel; |
| ctx->phrase = phrase; |
| ctx->headers = serf_bucket_headers_create(allocator); |
| |
| return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); |
| } |
| |
| serf_bucket_t *serf_bucket_bwtp_message_create( |
| int channel, |
| serf_bucket_t *body, |
| serf_bucket_alloc_t *allocator) |
| { |
| frame_context_t *ctx; |
| |
| ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); |
| ctx->type = 1; |
| ctx->open = 0; |
| ctx->channel = channel; |
| ctx->phrase = "MESSAGE"; |
| ctx->headers = serf_bucket_headers_create(allocator); |
| |
| return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); |
| } |
| |
| int serf_bucket_bwtp_frame_get_channel( |
| serf_bucket_t *bucket) |
| { |
| if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { |
| frame_context_t *ctx = bucket->data; |
| |
| return ctx->channel; |
| } |
| else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { |
| incoming_context_t *ctx = bucket->data; |
| |
| return ctx->channel; |
| } |
| |
| return -1; |
| } |
| |
| int serf_bucket_bwtp_frame_get_type( |
| serf_bucket_t *bucket) |
| { |
| if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { |
| frame_context_t *ctx = bucket->data; |
| |
| return ctx->type; |
| } |
| else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { |
| incoming_context_t *ctx = bucket->data; |
| |
| return ctx->type; |
| } |
| |
| return -1; |
| } |
| |
| const char *serf_bucket_bwtp_frame_get_phrase( |
| serf_bucket_t *bucket) |
| { |
| if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { |
| frame_context_t *ctx = bucket->data; |
| |
| return ctx->phrase; |
| } |
| else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { |
| incoming_context_t *ctx = bucket->data; |
| |
| return ctx->phrase; |
| } |
| |
| return NULL; |
| } |
| |
| serf_bucket_t *serf_bucket_bwtp_frame_get_headers( |
| serf_bucket_t *bucket) |
| { |
| if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { |
| frame_context_t *ctx = bucket->data; |
| |
| return ctx->headers; |
| } |
| else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { |
| incoming_context_t *ctx = bucket->data; |
| |
| return ctx->headers; |
| } |
| |
| return NULL; |
| } |
| |
| static int count_size(void *baton, const char *key, const char *value) |
| { |
| apr_size_t *c = baton; |
| /* TODO Deal with folding. Yikes. */ |
| |
| /* Add in ": " and CRLF - so an extra four bytes. */ |
| *c += strlen(key) + strlen(value) + 4; |
| |
| return 0; |
| } |
| |
| static apr_size_t calc_header_size(serf_bucket_t *hdrs) |
| { |
| apr_size_t size = 0; |
| |
| serf_bucket_headers_do(hdrs, count_size, &size); |
| |
| return size; |
| } |
| |
| static void serialize_data(serf_bucket_t *bucket) |
| { |
| frame_context_t *ctx = bucket->data; |
| serf_bucket_t *new_bucket; |
| apr_size_t req_len; |
| |
| /* Serialize the request-line and headers into one mother string, |
| * and wrap a bucket around it. |
| */ |
| req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line), |
| "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n", |
| (ctx->type ? "BWM" : "BWH"), |
| ctx->channel, |
| (apr_uint64_t)calc_header_size(ctx->headers), |
| (ctx->open ? "OPEN " : ""), |
| ctx->phrase); |
| new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len, |
| bucket->allocator); |
| |
| /* Build up the new bucket structure. |
| * |
| * Note that self needs to become an aggregate bucket so that a |
| * pointer to self still represents the "right" data. |
| */ |
| serf_bucket_aggregate_become(bucket); |
| |
| /* Insert the two buckets. */ |
| serf_bucket_aggregate_append(bucket, new_bucket); |
| serf_bucket_aggregate_append(bucket, ctx->headers); |
| |
| /* Our private context is no longer needed, and is not referred to by |
| * any existing bucket. Toss it. |
| */ |
| serf_bucket_mem_free(bucket->allocator, ctx); |
| } |
| |
| static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket, |
| apr_size_t requested, |
| const char **data, apr_size_t *len) |
| { |
| /* Seralize our private data into a new aggregate bucket. */ |
| serialize_data(bucket); |
| |
| /* Delegate to the "new" aggregate bucket to do the read. */ |
| return serf_bucket_read(bucket, requested, data, len); |
| } |
| |
| static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket, |
| int acceptable, int *found, |
| const char **data, apr_size_t *len) |
| { |
| /* Seralize our private data into a new aggregate bucket. */ |
| serialize_data(bucket); |
| |
| /* Delegate to the "new" aggregate bucket to do the readline. */ |
| return serf_bucket_readline(bucket, acceptable, found, data, len); |
| } |
| |
| static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket, |
| apr_size_t requested, |
| int vecs_size, |
| struct iovec *vecs, |
| int *vecs_used) |
| { |
| /* Seralize our private data into a new aggregate bucket. */ |
| serialize_data(bucket); |
| |
| /* Delegate to the "new" aggregate bucket to do the read. */ |
| return serf_bucket_read_iovec(bucket, requested, |
| vecs_size, vecs, vecs_used); |
| } |
| |
| static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket, |
| const char **data, |
| apr_size_t *len) |
| { |
| /* Seralize our private data into a new aggregate bucket. */ |
| serialize_data(bucket); |
| |
| /* Delegate to the "new" aggregate bucket to do the peek. */ |
| return serf_bucket_peek(bucket, data, len); |
| } |
| |
| const serf_bucket_type_t serf_bucket_type_bwtp_frame = { |
| "BWTP-FRAME", |
| serf_bwtp_frame_read, |
| serf_bwtp_frame_readline, |
| serf_bwtp_frame_read_iovec, |
| serf_default_read_for_sendfile, |
| serf_default_read_bucket, |
| serf_bwtp_frame_peek, |
| serf_default_destroy_and_data, |
| }; |
| |
| |
| serf_bucket_t *serf_bucket_bwtp_incoming_frame_create( |
| serf_bucket_t *stream, |
| serf_bucket_alloc_t *allocator) |
| { |
| incoming_context_t *ctx; |
| |
| ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); |
| ctx->stream = stream; |
| ctx->body = NULL; |
| ctx->headers = serf_bucket_headers_create(allocator); |
| ctx->state = STATE_STATUS_LINE; |
| ctx->length = 0; |
| ctx->channel = -1; |
| ctx->phrase = NULL; |
| |
| serf_linebuf_init(&ctx->linebuf); |
| |
| return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx); |
| } |
| |
| static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket) |
| { |
| incoming_context_t *ctx = bucket->data; |
| |
| if (ctx->state != STATE_STATUS_LINE && ctx->phrase) { |
| serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase); |
| } |
| |
| serf_bucket_destroy(ctx->stream); |
| if (ctx->body != NULL) |
| serf_bucket_destroy(ctx->body); |
| serf_bucket_destroy(ctx->headers); |
| |
| serf_default_destroy_and_data(bucket); |
| } |
| |
| static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable) |
| { |
| return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable); |
| } |
| |
| static apr_status_t parse_status_line(incoming_context_t *ctx, |
| serf_bucket_alloc_t *allocator) |
| { |
| int res; |
| char *reason; /* ### stupid APR interface makes this non-const */ |
| |
| /* ctx->linebuf.line should be of form: BW* */ |
| res = apr_date_checkmask(ctx->linebuf.line, "BW*"); |
| if (!res) { |
| /* Not an BWTP response? Well, at least we won't understand it. */ |
| return APR_EGENERAL; |
| } |
| |
| if (ctx->linebuf.line[2] == 'H') { |
| ctx->type = 0; |
| } |
| else if (ctx->linebuf.line[2] == 'M') { |
| ctx->type = 1; |
| } |
| else { |
| ctx->type = -1; |
| } |
| |
| ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16); |
| |
| /* Skip leading spaces for the reason string. */ |
| if (apr_isspace(*reason)) { |
| reason++; |
| } |
| |
| ctx->length = apr_strtoi64(reason, &reason, 16); |
| |
| /* Skip leading spaces for the reason string. */ |
| if (reason - ctx->linebuf.line < ctx->linebuf.used) { |
| if (apr_isspace(*reason)) { |
| reason++; |
| } |
| |
| ctx->phrase = serf_bstrmemdup(allocator, reason, |
| ctx->linebuf.used |
| - (reason - ctx->linebuf.line)); |
| } else { |
| ctx->phrase = NULL; |
| } |
| |
| return APR_SUCCESS; |
| } |
| |
| /* This code should be replaced with header buckets. */ |
| static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx) |
| { |
| apr_status_t status; |
| |
| /* RFC 2616 says that CRLF is the only line ending, but we can easily |
| * accept any kind of line ending. |
| */ |
| status = fetch_line(ctx, SERF_NEWLINE_ANY); |
| if (SERF_BUCKET_READ_ERROR(status)) { |
| return status; |
| } |
| /* Something was read. Process it. */ |
| |
| if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { |
| const char *end_key; |
| const char *c; |
| |
| end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used); |
| if (!c) { |
| /* Bad headers? */ |
| return APR_EGENERAL; |
| } |
| |
| /* Skip over initial : and spaces. */ |
| while (apr_isspace(*++c)) |
| continue; |
| |
| /* Always copy the headers (from the linebuf into new mem). */ |
| /* ### we should be able to optimize some mem copies */ |
| serf_bucket_headers_setx( |
| ctx->headers, |
| ctx->linebuf.line, end_key - ctx->linebuf.line, 1, |
| c, ctx->linebuf.line + ctx->linebuf.used - c, 1); |
| } |
| |
| return status; |
| } |
| |
| /* Perform one iteration of the state machine. |
| * |
| * Will return when one the following conditions occurred: |
| * 1) a state change |
| * 2) an error |
| * 3) the stream is not ready or at EOF |
| * 4) APR_SUCCESS, meaning the machine can be run again immediately |
| */ |
| static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx) |
| { |
| apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */ |
| |
| switch (ctx->state) { |
| case STATE_STATUS_LINE: |
| /* RFC 2616 says that CRLF is the only line ending, but we can easily |
| * accept any kind of line ending. |
| */ |
| status = fetch_line(ctx, SERF_NEWLINE_ANY); |
| if (SERF_BUCKET_READ_ERROR(status)) |
| return status; |
| |
| if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { |
| /* The Status-Line is in the line buffer. Process it. */ |
| status = parse_status_line(ctx, bkt->allocator); |
| if (status) |
| return status; |
| |
| if (ctx->length) { |
| ctx->body = |
| serf_bucket_barrier_create(ctx->stream, bkt->allocator); |
| ctx->body = serf_bucket_limit_create(ctx->body, ctx->length, |
| bkt->allocator); |
| if (!ctx->type) { |
| ctx->state = STATE_HEADERS; |
| } else { |
| ctx->state = STATE_BODY; |
| } |
| } else { |
| ctx->state = STATE_DONE; |
| } |
| } |
| else { |
| /* The connection closed before we could get the next |
| * response. Treat the request as lost so that our upper |
| * end knows the server never tried to give us a response. |
| */ |
| if (APR_STATUS_IS_EOF(status)) { |
| return SERF_ERROR_REQUEST_LOST; |
| } |
| } |
| break; |
| case STATE_HEADERS: |
| status = fetch_headers(ctx->body, ctx); |
| if (SERF_BUCKET_READ_ERROR(status)) |
| return status; |
| |
| /* If an empty line was read, then we hit the end of the headers. |
| * Move on to the body. |
| */ |
| if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) { |
| /* Advance the state. */ |
| ctx->state = STATE_DONE; |
| } |
| break; |
| case STATE_BODY: |
| /* Don't do anything. */ |
| break; |
| case STATE_DONE: |
| return APR_EOF; |
| default: |
| /* Not reachable */ |
| return APR_EGENERAL; |
| } |
| |
| return status; |
| } |
| |
| static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx) |
| { |
| apr_status_t status; |
| |
| /* Keep reading and moving through states if we aren't at the BODY */ |
| while (ctx->state != STATE_BODY) { |
| status = run_machine(bkt, ctx); |
| |
| /* Anything other than APR_SUCCESS means that we cannot immediately |
| * read again (for now). |
| */ |
| if (status) |
| return status; |
| } |
| /* in STATE_BODY */ |
| |
| return APR_SUCCESS; |
| } |
| |
| apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers( |
| serf_bucket_t *bucket) |
| { |
| incoming_context_t *ctx = bucket->data; |
| |
| return wait_for_body(bucket, ctx); |
| } |
| |
| static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket, |
| apr_size_t requested, |
| const char **data, apr_size_t *len) |
| { |
| incoming_context_t *ctx = bucket->data; |
| apr_status_t rv; |
| |
| rv = wait_for_body(bucket, ctx); |
| if (rv) { |
| /* It's not possible to have read anything yet! */ |
| *len = 0; |
| return rv; |
| } |
| |
| rv = serf_bucket_read(ctx->body, requested, data, len); |
| if (APR_STATUS_IS_EOF(rv)) { |
| ctx->state = STATE_DONE; |
| } |
| return rv; |
| } |
| |
| static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket, |
| int acceptable, int *found, |
| const char **data, apr_size_t *len) |
| { |
| incoming_context_t *ctx = bucket->data; |
| apr_status_t status; |
| |
| status = wait_for_body(bucket, ctx); |
| if (status) { |
| *found = SERF_NEWLINE_NONE; |
| *len = 0; |
| return status; |
| } |
| |
| /* Delegate to the stream bucket to do the readline. */ |
| return serf_bucket_readline(ctx->body, acceptable, found, data, len); |
| } |
| |
| const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = { |
| "BWTP-INCOMING", |
| bwtp_incoming_read, |
| bwtp_incoming_readline, |
| serf_default_read_iovec, |
| serf_default_read_for_sendfile, |
| serf_default_read_bucket, |
| serf_default_peek /* ### TODO */, |
| bwtp_incoming_destroy_and_data, |
| }; |