blob: c2fe210ce8f0c8412d053d4b7f2682c13bd1797e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "messaging_adapter.hpp"
#include "proton/sender.hpp"
#include "proton/error.hpp"
#include "contexts.hpp"
#include "messaging_event.hpp"
#include "msg.hpp"
#include "proton/connection.h"
#include "proton/delivery.h"
#include "proton/handlers.h"
#include "proton/link.h"
#include "proton/message.h"
#include "proton/session.h"
#include "proton/transport.h"
namespace proton {
namespace {
class c_flow_controller : public proton_handler
{
public:
pn_handler_t *flowcontroller;
// TODO: pn_flowcontroller requires a window > 1.
c_flow_controller(int window) : flowcontroller(pn_flowcontroller(std::max(window, 2))) {}
~c_flow_controller() {
pn_decref(flowcontroller);
}
void redirect(proton_event &pne) {
pn_handler_dispatch(flowcontroller, pne.pn_event(), pn_event_type_t(pne.type()));
}
virtual void on_link_local_open(proton_event &e) { redirect(e); }
virtual void on_link_remote_open(proton_event &e) { redirect(e); }
virtual void on_link_flow(proton_event &e) { redirect(e); }
virtual void on_delivery(proton_event &e) { redirect(e); }
};
} // namespace
void messaging_adapter::create_helpers() {
if (prefetch_ > 0) {
flow_controller_.reset(new c_flow_controller(prefetch_));
add_child_handler(*flow_controller_);
}
}
messaging_adapter::messaging_adapter(handler &delegate,
int prefetch, bool auto_accept, bool auto_settle, bool peer_close_iserror) :
delegate_(delegate),
prefetch_(prefetch),
auto_accept_(auto_accept),
auto_settle_(auto_settle),
peer_close_iserror_(peer_close_iserror)
{
create_helpers();
//add_child_handler(*this);
}
messaging_adapter::~messaging_adapter(){}
void messaging_adapter::on_reactor_init(proton_event &pe) {
messaging_event mevent(messaging_event::START, pe);
delegate_.on_start(mevent);
}
void messaging_adapter::on_link_flow(proton_event &pe) {
pn_event_t *pne = pe.pn_event();
pn_link_t *lnk = pn_event_link(pne);
if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0) {
// create on_message extended event
messaging_event mevent(messaging_event::SENDABLE, pe);
delegate_.on_sendable(mevent);;
}
}
void messaging_adapter::on_delivery(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_link_t *lnk = pn_event_link(cevent);
delivery dlv = pe.delivery();
if (pn_link_is_receiver(lnk)) {
if (!dlv.partial() && dlv.readable()) {
// generate on_message
messaging_event mevent(messaging_event::MESSAGE, pe);
pn_connection_t *pnc = pn_session_connection(pn_link_session(lnk));
connection_context& ctx = connection_context::get(pnc);
// Reusable per-connection message.
// Avoid expensive heap malloc/free overhead.
// See PROTON-998
class message &msg(ctx.event_message);
mevent.message_ = &msg;
mevent.message_->decode(lnk, dlv);
if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
if (auto_accept_)
dlv.release();
} else {
delegate_.on_message(mevent);
if (auto_accept_ && !dlv.settled())
dlv.accept();
}
}
else if (dlv.updated() && dlv.settled()) {
messaging_event mevent(messaging_event::DELIVERY_SETTLE, pe);
delegate_.on_delivery_settle(mevent);
}
} else {
// sender
if (dlv.updated()) {
amqp_ulong rstate = dlv.remote_state();
if (rstate == PN_ACCEPTED) {
messaging_event mevent(messaging_event::DELIVERY_ACCEPT, pe);
delegate_.on_delivery_accept(mevent);
}
else if (rstate == PN_REJECTED) {
messaging_event mevent(messaging_event::DELIVERY_REJECT, pe);
delegate_.on_delivery_reject(mevent);
}
else if (rstate == PN_RELEASED || rstate == PN_MODIFIED) {
messaging_event mevent(messaging_event::DELIVERY_RELEASE, pe);
delegate_.on_delivery_release(mevent);
}
if (dlv.settled()) {
messaging_event mevent(messaging_event::DELIVERY_SETTLE, pe);
delegate_.on_delivery_settle(mevent);
}
if (auto_settle_)
dlv.settle();
}
}
}
namespace {
bool is_local_open(pn_state_t state) {
return state & PN_LOCAL_ACTIVE;
}
bool is_local_unititialised(pn_state_t state) {
return state & PN_LOCAL_UNINIT;
}
} // namespace
void messaging_adapter::on_link_remote_close(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_link_t *lnk = pn_event_link(cevent);
if (peer_close_iserror_ || pn_condition_is_set(pn_link_remote_condition(lnk))) {
messaging_event mevent(messaging_event::LINK_ERROR, pe);
delegate_.on_link_error(mevent);
}
messaging_event mevent(messaging_event::LINK_CLOSE, pe);
delegate_.on_link_close(mevent);
pn_link_close(lnk);
}
void messaging_adapter::on_session_remote_close(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_session_t *session = pn_event_session(cevent);
if (peer_close_iserror_ || pn_condition_is_set(pn_session_remote_condition(session))) {
messaging_event mevent(messaging_event::SESSION_ERROR, pe);
delegate_.on_session_error(mevent);
}
messaging_event mevent(messaging_event::SESSION_CLOSE, pe);
delegate_.on_session_close(mevent);
pn_session_close(session);
}
void messaging_adapter::on_connection_remote_close(proton_event &pe) {
pn_event_t *cevent = pe.pn_event();
pn_connection_t *connection = pn_event_connection(cevent);
if (peer_close_iserror_ || pn_condition_is_set(pn_connection_remote_condition(connection))) {
messaging_event mevent(messaging_event::CONNECTION_ERROR, pe);
delegate_.on_connection_error(mevent);
}
messaging_event mevent(messaging_event::CONNECTION_CLOSE, pe);
delegate_.on_connection_close(mevent);
pn_connection_close(connection);
}
void messaging_adapter::on_connection_remote_open(proton_event &pe) {
messaging_event mevent(messaging_event::CONNECTION_OPEN, pe);
delegate_.on_connection_open(mevent);
pn_connection_t *connection = pn_event_connection(pe.pn_event());
if (!is_local_open(pn_connection_state(connection)) && is_local_unititialised(pn_connection_state(connection))) {
pn_connection_open(connection);
}
}
void messaging_adapter::on_session_remote_open(proton_event &pe) {
messaging_event mevent(messaging_event::SESSION_OPEN, pe);
delegate_.on_session_open(mevent);
pn_session_t *session = pn_event_session(pe.pn_event());
if (!is_local_open(pn_session_state(session)) && is_local_unititialised(pn_session_state(session))) {
pn_session_open(session);
}
}
void messaging_adapter::on_link_remote_open(proton_event &pe) {
messaging_event mevent(messaging_event::LINK_OPEN, pe);
delegate_.on_link_open(mevent);
pn_link_t *link = pn_event_link(pe.pn_event());
if (!is_local_open(pn_link_state(link)) && is_local_unititialised(pn_link_state(link))) {
pn_link_open(link);
}
}
void messaging_adapter::on_transport_tail_closed(proton_event &pe) {
pn_connection_t *conn = pn_event_connection(pe.pn_event());
if (conn && is_local_open(pn_connection_state(conn))) {
pn_transport_t *t = pn_event_transport(pe.pn_event());
if (pn_condition_is_set(pn_transport_condition(t))) {
messaging_event mevent(messaging_event::TRANSPORT_ERROR, pe);
delegate_.on_transport_error(mevent);
}
messaging_event mevent(messaging_event::TRANSPORT_CLOSE, pe);
delegate_.on_transport_close(mevent);
}
}
void messaging_adapter::on_timer_task(proton_event& pe)
{
messaging_event mevent(messaging_event::TIMER, pe);
delegate_.on_timer(mevent);
}
}