blob: 6b4a559efe857791c42c1568401778cc295362d5 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <proton/framing.h>
#include <proton/engine.h>
#include <proton/buffer.h>
#include "dispatcher.h"
#include "protocol.h"
#include "../util.h"
pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, void *context)
{
pn_dispatcher_t *disp = calloc(sizeof(pn_dispatcher_t), 1);
disp->frame_type = frame_type;
disp->context = context;
disp->trace = PN_TRACE_OFF;
disp->input = pn_buffer(1024);
disp->channel = 0;
disp->code = 0;
disp->args = pn_data(16);
disp->payload = NULL;
disp->size = 0;
disp->output_args = pn_data(16);
disp->frame = pn_buffer( 4*1024 );
// XXX
disp->capacity = 4*1024;
disp->output = malloc(disp->capacity);
disp->available = 0;
disp->halt = false;
disp->batch = true;
return disp;
}
void pn_dispatcher_free(pn_dispatcher_t *disp)
{
if (disp) {
pn_buffer_free(disp->input);
pn_data_free(disp->args);
pn_data_free(disp->output_args);
free(disp->output);
free(disp);
}
}
void pn_dispatcher_action(pn_dispatcher_t *disp, uint8_t code, const char *name,
pn_action_t *action)
{
disp->actions[code] = action;
disp->names[code] = name;
}
typedef enum {IN, OUT} pn_dir_t;
static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir,
pn_data_t *args, const char *payload, size_t size)
{
if (disp->trace & PN_TRACE_FRM) {
uint64_t code64;
bool scanned;
pn_data_scan(args, "D?L.", &scanned, &code64);
uint8_t code = scanned ? code64 : 0;
size_t n = SCRATCH;
pn_data_format(args, disp->scratch, &n);
pn_dispatcher_trace(disp, ch, "%s %s %s", dir == OUT ? "->" : "<-",
disp->names[code], disp->scratch);
if (size) {
char buf[1024];
int e = pn_quote_data(buf, 1024, payload, size);
fprintf(stderr, " (%zu) \"%s\"%s\n", size, buf,
e == PN_OVERFLOW ? "... (truncated)" : "");
} else {
fprintf(stderr, "\n");
}
}
}
void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...)
{
va_list ap;
fprintf(stderr, "[%p:%u] ", (void *) disp, ch);
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
}
int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame)
{
ssize_t dsize = pn_data_decode(disp->args, frame.payload, frame.size);
if (dsize < 0) {
fprintf(stderr, "Error decoding frame: %s %s\n", pn_code(dsize),
pn_data_error(disp->args));
pn_fprint_data(stderr, frame.payload, frame.size);
fprintf(stderr, "\n");
return dsize;
}
disp->channel = frame.channel;
// XXX: assuming numeric
uint64_t lcode;
bool scanned;
int e = pn_data_scan(disp->args, "D?L.", &scanned, &lcode);
if (e) {
fprintf(stderr, "Scan error\n");
return e;
}
if (!scanned) {
fprintf(stderr, "Error dispatching frame\n");
return PN_ERR;
}
uint8_t code = lcode;
disp->code = code;
disp->size = frame.size - dsize;
if (disp->size)
disp->payload = frame.payload + dsize;
pn_do_trace(disp, disp->channel, IN, disp->args, disp->payload, disp->size);
pn_action_t *action = disp->actions[code];
int err = action(disp);
disp->channel = 0;
disp->code = 0;
pn_data_clear(disp->args);
disp->size = 0;
disp->payload = NULL;
return err;
}
ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t available)
{
size_t leftover = pn_buffer_size(disp->input);
if (leftover) {
int e = pn_buffer_append(disp->input, bytes, available);
if (e) return e;
pn_bytes_t b = pn_buffer_bytes(disp->input);
bytes = b.start;
available = b.size;
}
size_t read = 0;
while (!disp->halt) {
pn_frame_t frame;
size_t n = pn_read_frame(&frame, bytes + read, available - read);
if (n) {
int e = pn_dispatch_frame(disp, frame);
if (e) return e;
read += n;
} else {
if (leftover) {
pn_buffer_trim(disp->input, read, 0);
} else {
int e = pn_buffer_append(disp->input, bytes + read, available - read);
if (e) return e;
}
read = available;
break;
}
if (!disp->batch) break;
}
return read - leftover;
}
int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
int err = pn_data_vscan(disp->args, fmt, ap);
va_end(ap);
if (err) printf("scan error: %s\n", fmt);
return err;
}
void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size)
{
disp->output_payload = data;
disp->output_size = size;
}
int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
pn_data_clear(disp->output_args);
int err = pn_data_vfill(disp->output_args, fmt, ap);
va_end(ap);
if (err) {
fprintf(stderr, "error posting frame: %s, %s: %s\n", fmt, pn_code(err), pn_data_error(disp->output_args));
return PN_ERR;
}
pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, disp->output_size);
encode_performatives:
pn_buffer_clear( disp->frame );
pn_bytes_t buf = pn_buffer_bytes( disp->frame );
buf.size = pn_buffer_available( disp->frame );
ssize_t wr = pn_data_encode( disp->output_args, buf.start, buf.size );
if (wr < 0) {
if (wr == PN_OVERFLOW) {
pn_buffer_ensure( disp->frame, pn_buffer_available( disp->frame ) * 2 );
goto encode_performatives;
}
fprintf(stderr, "error posting frame: %s", pn_code(wr));
return PN_ERR;
}
pn_frame_t frame = {disp->frame_type};
frame.channel = ch;
frame.payload = buf.start;
frame.size = wr;
size_t n;
while (!(n = pn_write_frame(disp->output + disp->available,
disp->capacity - disp->available, frame))) {
disp->capacity *= 2;
disp->output = realloc(disp->output, disp->capacity);
}
if (disp->trace & PN_TRACE_RAW) {
fprintf(stderr, "RAW: \"");
pn_fprint_data(stderr, disp->output + disp->available, n);
fprintf(stderr, "\"\n");
}
disp->available += n;
return 0;
}
ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size)
{
int n = disp->available < size ? disp->available : size;
memmove(bytes, disp->output, n);
memmove(disp->output, disp->output + n, disp->available - n);
disp->available -= n;
// XXX: need to check for errors
return n;
}
int pn_post_transfer_frame(pn_dispatcher_t *disp, uint16_t ch,
uint32_t handle,
pn_sequence_t id,
const pn_bytes_t *tag,
uint32_t message_format,
bool settled,
bool more)
{
bool more_flag = more;
// create preformatives, assuming 'more' flag need not change
compute_performatives:
pn_data_clear(disp->output_args);
int err = pn_data_fill(disp->output_args, "DL[IIzIoo]", TRANSFER,
handle, id, tag->size, tag->start,
message_format,
settled, more_flag);
if (err) {
fprintf(stderr, "error posting transfer frame: %s: %s\n", pn_code(err), pn_data_error(disp->output_args));
return PN_ERR;
}
do { // send as many frames as possible without changing the 'more' flag...
encode_performatives:
pn_buffer_clear( disp->frame );
pn_bytes_t buf = pn_buffer_bytes( disp->frame );
buf.size = pn_buffer_available( disp->frame );
ssize_t wr = pn_data_encode(disp->output_args, buf.start, buf.size);
if (wr < 0) {
if (wr == PN_OVERFLOW) {
pn_buffer_ensure( disp->frame, pn_buffer_available( disp->frame ) * 2 );
goto encode_performatives;
}
fprintf(stderr, "error posting frame: %s", pn_code(wr));
return PN_ERR;
}
buf.size = wr;
// check if we need to break up the outbound frame
size_t available = disp->output_size;
if (disp->remote_max_frame) {
if ((available + buf.size) > disp->remote_max_frame - 8) {
available = disp->remote_max_frame - 8 - buf.size;
if (more_flag == false) {
more_flag = true;
goto compute_performatives; // deal with flag change
}
} else if (more_flag == true && more == false) {
// caller has no more, and this is the last frame
more_flag = false;
goto compute_performatives;
}
}
if (pn_buffer_available( disp->frame ) < (available + buf.size)) {
// not enough room for payload - try again...
pn_buffer_ensure( disp->frame, available + buf.size );
goto encode_performatives;
}
pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, disp->output_size);
memmove( buf.start + buf.size, disp->output_payload, available);
disp->output_payload += available;
disp->output_size -= available;
buf.size += available;
pn_frame_t frame = {disp->frame_type};
frame.channel = ch;
frame.payload = buf.start;
frame.size = buf.size;
size_t n;
while (!(n = pn_write_frame(disp->output + disp->available,
disp->capacity - disp->available, frame))) {
disp->capacity *= 2;
disp->output = realloc(disp->output, disp->capacity);
}
if (disp->trace & PN_TRACE_RAW) {
fprintf(stderr, "RAW: \"");
pn_fprint_data(stderr, disp->output + disp->available, n);
fprintf(stderr, "\"\n");
}
disp->available += n;
} while (disp->output_size > 0);
disp->output_payload = NULL;
return 0;
}