blob: 17c84cdaffdb84104486f49d6aa8bc486071e076 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "messaging_adapter.hpp"
#include "proton/delivery.hpp"
#include "proton/error.hpp"
#include "proton/receiver_options.hpp"
#include "proton/sender.hpp"
#include "proton/sender_options.hpp"
#include "proton/tracker.hpp"
#include "proton/transport.hpp"
#include "contexts.hpp"
#include "msg.hpp"
#include "proton_bits.hpp"
#include "proton_event.hpp"
#include <proton/connection.h>
#include <proton/delivery.h>
#include <proton/handlers.h>
#include <proton/link.h>
#include <proton/message.h>
#include <proton/session.h>
#include <proton/transport.h>
namespace proton {
namespace {
void credit_topup(pn_link_t *link) {
if (link && pn_link_is_receiver(link)) {
int window = link_context::get(link).credit_window;
if (window) {
int delta = window - pn_link_credit(link);
pn_link_flow(link, delta);
}
}
}
}
void messaging_adapter::on_reactor_init(proton_event &pe) {
delegate_.on_container_start(pe.container());
}
void messaging_adapter::on_reactor_final(proton_event &pe) {
delegate_.on_container_stop(pe.container());
}
void messaging_adapter::on_link_flow(proton_event &pe) {
pn_event_t *pne = pe.pn_event();
pn_link_t *lnk = pn_event_link(pne);
// TODO: process session flow data, if no link-specific data, just return.
if (!lnk) return;
link_context& lctx = link_context::get(lnk);
int state = pn_link_state(lnk);
if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) {
if (pn_link_is_sender(lnk)) {
if (pn_link_credit(lnk) > 0) {
sender s(make_wrapper<sender>(lnk));
if (pn_link_get_drain(lnk)) {
if (!lctx.draining) {
lctx.draining = true;
delegate_.on_sender_drain_start(s);
}
}
else {
lctx.draining = false;
}
// create on_message extended event
delegate_.on_sendable(s);
}
}
else {
// receiver
if (!pn_link_credit(lnk) && lctx.draining) {
lctx.draining = false;
receiver r(make_wrapper<receiver>(lnk));
delegate_.on_receiver_drain_finish(r);
}
}
}
credit_topup(lnk);
}
void messaging_adapter::on_delivery(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_link_t *lnk = pn_event_link(cevent);
pn_delivery_t *dlv = pn_event_delivery(cevent);
link_context& lctx = link_context::get(lnk);
if (pn_link_is_receiver(lnk)) {
delivery d(make_wrapper<delivery>(dlv));
if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) {
// generate on_message
pn_connection_t *pnc = pn_session_connection(pn_link_session(lnk));
connection_context& ctx = connection_context::get(pnc);
// Reusable per-connection message.
// Avoid expensive heap malloc/free overhead.
// See PROTON-998
class message &msg(ctx.event_message);
msg.decode(d);
if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
if (lctx.auto_accept)
d.release();
} else {
delegate_.on_message(d, msg);
if (lctx.auto_accept && !d.settled())
d.accept();
if (lctx.draining && !pn_link_credit(lnk)) {
lctx.draining = false;
receiver r(make_wrapper<receiver>(lnk));
delegate_.on_receiver_drain_finish(r);
}
}
}
else if (pn_delivery_updated(dlv) && d.settled()) {
delegate_.on_delivery_settle(d);
}
if (lctx.draining && pn_link_credit(lnk) == 0) {
lctx.draining = false;
pn_link_set_drain(lnk, false);
receiver r(make_wrapper<receiver>(lnk));
delegate_.on_receiver_drain_finish(r);
if (lctx.pending_credit) {
pn_link_flow(lnk, lctx.pending_credit);
lctx.pending_credit = 0;
}
}
credit_topup(lnk);
} else {
tracker t(make_wrapper<tracker>(dlv));
// sender
if (pn_delivery_updated(dlv)) {
uint64_t rstate = pn_delivery_remote_state(dlv);
if (rstate == PN_ACCEPTED) {
delegate_.on_tracker_accept(t);
}
else if (rstate == PN_REJECTED) {
delegate_.on_tracker_reject(t);
}
else if (rstate == PN_RELEASED || rstate == PN_MODIFIED) {
delegate_.on_tracker_release(t);
}
if (t.settled()) {
delegate_.on_tracker_settle(t);
}
if (lctx.auto_settle)
t.settle();
}
}
}
namespace {
bool is_local_open(pn_state_t state) {
return state & PN_LOCAL_ACTIVE;
}
bool is_local_unititialised(pn_state_t state) {
return state & PN_LOCAL_UNINIT;
}
bool is_remote_unititialised(pn_state_t state) {
return state & PN_REMOTE_UNINIT;
}
} // namespace
void messaging_adapter::on_link_remote_detach(proton_event & pe) {
pn_event_t *cevent = pe.pn_event();
pn_link_t *lnk = pn_event_link(cevent);
if (pn_link_is_receiver(lnk)) {
receiver r(make_wrapper<receiver>(lnk));
delegate_.on_receiver_detach(r);
} else {
sender s(make_wrapper<sender>(lnk));
delegate_.on_sender_detach(s);
}
pn_link_detach(lnk);
}
void messaging_adapter::on_link_remote_close(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_link_t *lnk = pn_event_link(cevent);
if (pn_link_is_receiver(lnk)) {
receiver r(make_wrapper<receiver>(lnk));
if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
delegate_.on_receiver_error(r);
}
delegate_.on_receiver_close(r);
} else {
sender s(make_wrapper<sender>(lnk));
if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
delegate_.on_sender_error(s);
}
delegate_.on_sender_close(s);
}
pn_link_close(lnk);
}
void messaging_adapter::on_session_remote_close(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_session_t *session = pn_event_session(cevent);
class session s(make_wrapper(session));
if (pn_condition_is_set(pn_session_remote_condition(session))) {
delegate_.on_session_error(s);
}
delegate_.on_session_close(s);
pn_session_close(session);
}
void messaging_adapter::on_connection_remote_close(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_connection_t *conn = pn_event_connection(cevent);
connection c(make_wrapper(conn));
if (pn_condition_is_set(pn_connection_remote_condition(conn))) {
delegate_.on_connection_error(c);
}
delegate_.on_connection_close(c);
pn_connection_close(conn);
}
void messaging_adapter::on_connection_remote_open(proton_event &pe) {
// Generate on_transport_open event here until we find a better place
transport t(make_wrapper(pn_event_transport(pe.pn_event())));
delegate_.on_transport_open(t);
pn_connection_t *conn = pn_event_connection(pe.pn_event());
connection c(make_wrapper(conn));
delegate_.on_connection_open(c);
if (!is_local_open(pn_connection_state(conn)) && is_local_unititialised(pn_connection_state(conn))) {
pn_connection_open(conn);
}
}
void messaging_adapter::on_session_remote_open(proton_event &pe) {
pn_session_t *session = pn_event_session(pe.pn_event());
class session s(make_wrapper(session));
delegate_.on_session_open(s);
if (!is_local_open(pn_session_state(session)) && is_local_unititialised(pn_session_state(session))) {
pn_session_open(session);
}
}
void messaging_adapter::on_link_local_open(proton_event &pe) {
credit_topup(pn_event_link(pe.pn_event()));
}
void messaging_adapter::on_link_remote_open(proton_event &pe) {
pn_link_t *lnk = pn_event_link(pe.pn_event());
container& c = pe.container();
if (pn_link_is_receiver(lnk)) {
receiver r(make_wrapper<receiver>(lnk));
delegate_.on_receiver_open(r);
if (is_local_unititialised(pn_link_state(lnk))) {
r.open(c.receiver_options());
}
} else {
sender s(make_wrapper<sender>(lnk));
delegate_.on_sender_open(s);
if (is_local_unititialised(pn_link_state(lnk))) {
s.open(c.sender_options());
}
}
credit_topup(lnk);
}
void messaging_adapter::on_transport_closed(proton_event &pe) {
pn_transport_t *tspt = pn_event_transport(pe.pn_event());
transport t(make_wrapper(tspt));
// If the connection isn't open generate on_transport_open event
// because we didn't generate it yet and the events won't match.
pn_connection_t *conn = pn_event_connection(pe.pn_event());
if (!conn || is_remote_unititialised(pn_connection_state(conn))) {
delegate_.on_transport_open(t);
}
if (pn_condition_is_set(pn_transport_condition(tspt))) {
delegate_.on_transport_error(t);
}
delegate_.on_transport_close(t);
}
}