blob: 0bd3f7b3cdbdf8936a8f976668a951f740f959f1 [file] [log] [blame]
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "dispatcher.h"
#include "framing/framing.h"
#include "protocol.h"
#include "engine/engine-internal.h"
#include "dispatch_actions.h"
/*
 * Action used when a frame carries a performative code that the dispatcher
 * does not recognise.
 *
 * Logs the frame type on the transport and reports failure. The channel,
 * args and payload parameters are accepted only so that the signature
 * matches the other frame actions; they are not inspected.
 *
 * Returns PN_ERR unconditionally.
 */
int pni_bad_frame(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
{
  pn_transport_logf(transport, "Error dispatching frame: type: %d: Unknown performative", frame_type);
  return PN_ERR;
}
/*
 * Action used when a frame's type field is not one the dispatcher handles.
 *
 * Logs the offending frame type on the transport and reports failure. The
 * channel, args and payload parameters are accepted only so that the
 * signature matches the other frame actions; they are not inspected.
 *
 * Returns PN_ERR unconditionally.
 */
int pni_bad_frame_type(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
{
  pn_transport_logf(transport, "Error dispatching frame: Unknown frame type: %d", frame_type);
  return PN_ERR;
}
// Select and invoke the action for one decoded frame.
//
// frame_type chooses the frame family (regular AMQP or SASL) and lcode, the
// numeric code scanned from the frame's performative, chooses the action
// within that family. An unrecognised lcode is routed to pni_bad_frame and
// an unrecognised frame_type to pni_bad_frame_type, so `action` is always
// assigned before it is called.
//
// Returns whatever the selected action returns.
//
// NOTE(review): the two frame-type labels below use the project's
// AMQP_FRAME_TYPE / SASL_FRAME_TYPE constants (AMQP 1.0 frame type codes
// 0x00 and 0x01) -- confirm they match the definitions in the framing header.
//
// We could use a table based approach here if we needed to dynamically
// add new performatives
static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
{
  pn_action_t *action;
  switch (frame_type) {
  case AMQP_FRAME_TYPE:
    /* Regular AMQP frames */
    switch (lcode) {
    case OPEN:            action = pn_do_open; break;
    case BEGIN:           action = pn_do_begin; break;
    case ATTACH:          action = pn_do_attach; break;
    case FLOW:            action = pn_do_flow; break;
    case TRANSFER:        action = pn_do_transfer; break;
    case DISPOSITION:     action = pn_do_disposition; break;
    case DETACH:          action = pn_do_detach; break;
    case END:             action = pn_do_end; break;
    case CLOSE:           action = pn_do_close; break;
    default:              action = pni_bad_frame; break;
    }
    break;
  case SASL_FRAME_TYPE:
    /* SASL frames */
    switch (lcode) {
    case SASL_MECHANISMS: action = pn_do_mechanisms; break;
    case SASL_INIT:       action = pn_do_init; break;
    case SASL_CHALLENGE:  action = pn_do_challenge; break;
    case SASL_RESPONSE:   action = pn_do_response; break;
    case SASL_OUTCOME:    action = pn_do_outcome; break;
    default:              action = pni_bad_frame; break;
    }
    break;
  default:
    action = pni_bad_frame_type;
    break;
  }
  return action(transport, frame_type, channel, args, payload);
}
/*
 * Decode the performative at the front of one frame body and dispatch it.
 *
 * transport  transport the frame arrived on; used for tracing, logging and
 *            as the target of the dispatched action.
 * args       data object the performative is decoded into.
 * frame      the frame to process (passed by value).
 *
 * Returns 0 for an empty frame, the negative decode result if the body
 * cannot be decoded, the scan error if the performative code cannot be read,
 * PN_ERR if no numeric code is present, and otherwise the result of the
 * dispatched action.
 */
static int pni_dispatch_frame(pn_transport_t * transport, pn_data_t *args, pn_frame_t frame)
{
  if (frame.size == 0) { // ignore null frames
    if (transport->trace & PN_TRACE_FRM)
      pn_transport_logf(transport, "%u <- (EMPTY FRAME)", frame.channel);
    return 0;
  }

  ssize_t dsize = pn_data_decode(args, frame.payload, frame.size);
  if (dsize < 0) {
    // Build the message in the transport's scratch string: the error text
    // first, then a quoted dump of the offending frame body.
    pn_string_format(transport->scratch,
                     "Error decoding frame: %s %s\n", pn_code(dsize),
                     pn_error_text(pn_data_error(args)));
    pn_quote(transport->scratch, frame.payload, frame.size);
    pn_transport_log(transport, pn_string_get(transport->scratch));
    return (int) dsize;
  }

  uint8_t frame_type = frame.type;
  uint16_t channel = frame.channel;
  // XXX: assuming numeric -
  // if we get a symbol we should map it to the numeric value and dispatch on that
  uint64_t lcode;
  bool scanned;
  int e = pn_data_scan(args, "D?L.", &scanned, &lcode);
  if (e) {
    pn_transport_log(transport, "Scan error");
    return e;
  }
  if (!scanned) {
    pn_transport_log(transport, "Error dispatching frame");
    return PN_ERR;
  }

  // Whatever follows the decoded performative is the frame payload; a
  // zero-length payload is represented by a NULL start pointer.
  size_t payload_size = frame.size - dsize;
  const char *payload_mem = payload_size ? frame.payload + dsize : NULL;
  pn_bytes_t payload = {payload_size, payload_mem};

  pn_do_trace(transport, channel, IN, args, payload_mem, payload_size);

  int err = pni_dispatch_action(transport, lcode, frame_type, channel, args, &payload);

  // args is reused for every frame by the caller, so empty it once the
  // action has run and the next frame decodes into a clean object.
  pn_data_clear(args);

  return err;
}
/*
 * Read frames from an input buffer and dispatch each one.
 *
 * transport  transport receiving the bytes.
 * bytes      start of the input buffer.
 * available  number of bytes in the buffer.
 * batch      when false, stop after the first pass through the loop;
 *            when true, keep going while frames can be read.
 * halt       checked before each frame; processing stops once *halt is true.
 *
 * Returns the number of bytes consumed, or a non-zero error: the dispatch
 * result if a frame's action fails, or the negative value reported by
 * pn_read_frame for a malformed frame (after raising a framing error on the
 * transport).
 */
ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, size_t available, bool batch, bool *halt)
{
  size_t read = 0;

  while (available && !*halt) {
    pn_frame_t frame;

    ssize_t n = pn_read_frame(&frame, bytes + read, available, transport->local_max_frame);
    if (n > 0) {
      read += n;
      available -= n;
      transport->input_frames_ct += 1;
      int e = pni_dispatch_frame(transport, transport->args, frame);
      if (e) return e;
    } else if (n < 0) {
      pn_do_error(transport, "amqp:connection:framing-error", "malformed frame");
      return n;
    } else {
      // Nothing was consumed (presumably an incomplete frame). Stop here:
      // retrying with the same bytes would make no progress and, in batch
      // mode, would never leave the loop.
      break;
    }

    if (!batch) break;
  }

  return read;
}
/*
 * Copy pending output bytes from the transport into a caller buffer.
 *
 * transport  transport whose output buffer is drained.
 * bytes      destination buffer.
 * size       capacity of the destination buffer in bytes.
 *
 * Copies min(transport->available, size) bytes from the front of
 * transport->output, shifts the remaining bytes down to the start of that
 * buffer and reduces transport->available accordingly.
 *
 * Returns the number of bytes copied (0 when nothing is pending or size
 * is 0).
 */
ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size)
{
  // Keep the count in size_t: an int would narrow large buffer sizes.
  size_t n = transport->available < size ? transport->available : size;
  if (n == 0) {
    // Nothing to move; also avoids handing memmove a buffer pointer that
    // may be unset when there is no pending output.
    return 0;
  }

  memmove(bytes, transport->output, n);
  // Slide the unsent tail to the front; source and destination overlap,
  // hence memmove rather than memcpy.
  memmove(transport->output, transport->output + n, transport->available - n);
  transport->available -= n;
  // XXX: need to check for errors
  return (ssize_t) n;
}