blob: 3cda8cd2dca39eac0651ffd4f0cec41413a1c87d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "messaging_adapter.hpp"
#include "proton/connection.hpp"
#include "proton/container.hpp"
#include "proton/delivery.hpp"
#include "proton/error.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/receiver.hpp"
#include "proton/receiver_options.hpp"
#include "proton/sender.hpp"
#include "proton/sender_options.hpp"
#include "proton/session.hpp"
#include "proton/tracker.hpp"
#include "proton/transport.hpp"
#include "contexts.hpp"
#include "msg.hpp"
#include "proton_bits.hpp"
#include <proton/connection.h>
#include <proton/delivery.h>
#include <proton/handlers.h>
#include <proton/link.h>
#include <proton/message.h>
#include <proton/session.h>
#include <proton/transport.h>
#include <assert.h>
namespace proton {
namespace {
// This must only be called for receiver links
void credit_topup(pn_link_t *link) {
assert(pn_link_is_receiver(link));
int window = link_context::get(link).credit_window;
if (window) {
int delta = window - pn_link_credit(link);
pn_link_flow(link, delta);
}
}
void on_link_flow(messaging_handler& handler, pn_event_t* event) {
pn_link_t *lnk = pn_event_link(event);
// TODO: process session flow data, if no link-specific data, just return.
if (!lnk) return;
int state = pn_link_state(lnk);
if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) {
link_context& lctx = link_context::get(lnk);
if (pn_link_is_sender(lnk)) {
if (pn_link_credit(lnk) > 0) {
sender s(make_wrapper<sender>(lnk));
bool draining = pn_link_get_drain(lnk);
if ( draining && !lctx.draining) {
handler.on_sender_drain_start(s);
}
lctx.draining = draining;
// create on_message extended event
handler.on_sendable(s);
}
} else {
// receiver
if (!pn_link_credit(lnk) && lctx.draining) {
lctx.draining = false;
pn_link_set_drain(lnk, false);
receiver r(make_wrapper<receiver>(lnk));
handler.on_receiver_drain_finish(r);
}
credit_topup(lnk);
}
}
}
// Decode the message corresponding to a delivery from a link.
void message_decode(message& msg, proton::delivery delivery) {
std::vector<char> buf;
buf.resize(pn_delivery_pending(unwrap(delivery)));
if (buf.empty())
throw error("message decode: no delivery pending on link");
proton::receiver link = delivery.receiver();
assert(!buf.empty());
ssize_t n = pn_link_recv(unwrap(link), const_cast<char *>(&buf[0]), buf.size());
if (n != ssize_t(buf.size())) throw error(MSG("receiver read failure"));
msg.clear();
msg.decode(buf);
pn_link_advance(unwrap(link));
}
void on_delivery(messaging_handler& handler, pn_event_t* event) {
pn_link_t *lnk = pn_event_link(event);
pn_delivery_t *dlv = pn_event_delivery(event);
link_context& lctx = link_context::get(lnk);
if (pn_link_is_receiver(lnk)) {
delivery d(make_wrapper<delivery>(dlv));
if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) {
// generate on_message
pn_connection_t *pnc = pn_session_connection(pn_link_session(lnk));
connection_context& ctx = connection_context::get(pnc);
// Reusable per-connection message.
// Avoid expensive heap malloc/free overhead.
// See PROTON-998
class message &msg(ctx.event_message);
message_decode(msg, d);
if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
if (lctx.auto_accept)
d.release();
} else {
handler.on_message(d, msg);
if (lctx.auto_accept && pn_delivery_local_state(dlv) == 0) // Not set by handler
d.accept();
if (lctx.draining && !pn_link_credit(lnk)) {
lctx.draining = false;
pn_link_set_drain(lnk, false);
receiver r(make_wrapper<receiver>(lnk));
handler.on_receiver_drain_finish(r);
}
}
}
else if (pn_delivery_updated(dlv) && d.settled()) {
handler.on_delivery_settle(d);
}
if (lctx.draining && pn_link_credit(lnk) == 0) {
lctx.draining = false;
pn_link_set_drain(lnk, false);
receiver r(make_wrapper<receiver>(lnk));
handler.on_receiver_drain_finish(r);
if (lctx.pending_credit) {
pn_link_flow(lnk, lctx.pending_credit);
lctx.pending_credit = 0;
}
}
credit_topup(lnk);
} else {
// sender
if (pn_delivery_updated(dlv)) {
tracker t(make_wrapper<tracker>(dlv));
switch(pn_delivery_remote_state(dlv)) {
case PN_ACCEPTED:
handler.on_tracker_accept(t);
break;
case PN_REJECTED:
handler.on_tracker_reject(t);
break;
case PN_RELEASED:
case PN_MODIFIED:
handler.on_tracker_release(t);
break;
}
if (t.settled()) {
handler.on_tracker_settle(t);
if (lctx.auto_settle)
t.settle();
}
}
}
}
bool is_remote_uninitialized(pn_state_t state) {
return state & PN_REMOTE_UNINIT;
}
void on_link_remote_detach(messaging_handler& handler, pn_event_t* event) {
pn_link_t *lnk = pn_event_link(event);
if (pn_link_is_receiver(lnk)) {
receiver r(make_wrapper<receiver>(lnk));
handler.on_receiver_detach(r);
} else {
sender s(make_wrapper<sender>(lnk));
handler.on_sender_detach(s);
}
pn_link_detach(lnk);
}
void on_link_remote_close(messaging_handler& handler, pn_event_t* event) {
pn_link_t *lnk = pn_event_link(event);
if (pn_link_is_receiver(lnk)) {
receiver r(make_wrapper<receiver>(lnk));
if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
handler.on_receiver_error(r);
}
handler.on_receiver_close(r);
} else {
sender s(make_wrapper<sender>(lnk));
if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
handler.on_sender_error(s);
}
handler.on_sender_close(s);
}
pn_link_close(lnk);
}
void on_session_remote_close(messaging_handler& handler, pn_event_t* event) {
pn_session_t *session = pn_event_session(event);
class session s(make_wrapper(session));
if (pn_condition_is_set(pn_session_remote_condition(session))) {
handler.on_session_error(s);
}
handler.on_session_close(s);
pn_session_close(session);
}
void on_connection_remote_close(messaging_handler& handler, pn_event_t* event) {
pn_connection_t *conn = pn_event_connection(event);
connection c(make_wrapper(conn));
if (pn_condition_is_set(pn_connection_remote_condition(conn))) {
handler.on_connection_error(c);
}
handler.on_connection_close(c);
pn_connection_close(conn);
}
void on_connection_bound(messaging_handler& handler, pn_event_t* event) {
connection c(make_wrapper(pn_event_connection(event)));
}
void on_connection_remote_open(messaging_handler& handler, pn_event_t* event) {
// Generate on_transport_open event here until we find a better place
transport t(make_wrapper(pn_event_transport(event)));
handler.on_transport_open(t);
pn_connection_t *conn = pn_event_connection(event);
connection c(make_wrapper(conn));
handler.on_connection_open(c);
}
void on_session_remote_open(messaging_handler& handler, pn_event_t* event) {
pn_session_t *session = pn_event_session(event);
class session s(make_wrapper(session));
handler.on_session_open(s);
}
void on_link_local_open(messaging_handler& handler, pn_event_t* event) {
pn_link_t* lnk = pn_event_link(event);
if ( pn_link_is_receiver(lnk) ) {
credit_topup(lnk);
// We know local is active so don't check for it
} else if ( pn_link_state(lnk)&PN_REMOTE_ACTIVE && pn_link_credit(lnk) > 0) {
sender s(make_wrapper<sender>(lnk));
handler.on_sendable(s);
}
}
void on_link_remote_open(messaging_handler& handler, pn_event_t* event) {
pn_link_t *lnk = pn_event_link(event);
if (pn_link_state(lnk) & PN_LOCAL_UNINIT) { // Incoming link
// Copy source and target from remote end.
pn_terminus_copy(pn_link_source(lnk), pn_link_remote_source(lnk));
pn_terminus_copy(pn_link_target(lnk), pn_link_remote_target(lnk));
}
if (pn_link_is_receiver(lnk)) {
receiver r(make_wrapper<receiver>(lnk));
handler.on_receiver_open(r);
credit_topup(lnk);
} else {
sender s(make_wrapper<sender>(lnk));
handler.on_sender_open(s);
}
}
void on_transport_closed(messaging_handler& handler, pn_event_t* event) {
pn_transport_t *tspt = pn_event_transport(event);
transport t(make_wrapper(tspt));
// If the connection isn't open generate on_transport_open event
// because we didn't generate it yet and the events won't match.
pn_connection_t *conn = pn_event_connection(event);
if (!conn || is_remote_uninitialized(pn_connection_state(conn))) {
handler.on_transport_open(t);
}
if (pn_condition_is_set(pn_transport_condition(tspt))) {
handler.on_transport_error(t);
}
handler.on_transport_close(t);
}
void on_connection_wake(messaging_handler& handler, pn_event_t* event) {
connection c(make_wrapper(pn_event_connection(event)));
handler.on_connection_wake(c);
}
}
void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event)
{
pn_event_type_t type = pn_event_type(event);
// Only handle events we are interested in
switch(type) {
case PN_CONNECTION_BOUND: on_connection_bound(handler, event); break;
case PN_CONNECTION_REMOTE_OPEN: on_connection_remote_open(handler, event); break;
case PN_CONNECTION_REMOTE_CLOSE: on_connection_remote_close(handler, event); break;
case PN_SESSION_REMOTE_OPEN: on_session_remote_open(handler, event); break;
case PN_SESSION_REMOTE_CLOSE: on_session_remote_close(handler, event); break;
case PN_LINK_LOCAL_OPEN: on_link_local_open(handler, event); break;
case PN_LINK_REMOTE_OPEN: on_link_remote_open(handler, event); break;
case PN_LINK_REMOTE_CLOSE: on_link_remote_close(handler, event); break;
case PN_LINK_REMOTE_DETACH: on_link_remote_detach(handler, event); break;
case PN_LINK_FLOW: on_link_flow(handler, event); break;
case PN_DELIVERY: on_delivery(handler, event); break;
case PN_TRANSPORT_CLOSED: on_transport_closed(handler, event); break;
case PN_CONNECTION_WAKE: on_connection_wake(handler, event); break;
// Ignore everything else
default: break;
}
}
}