blob: 84956e4c21ef0023e15ba0caa04cab2f2d93f7ef [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/session.hpp"
#include "proton/connection.hpp"
#include "proton/container.hpp"
#include "proton/delivery.hpp"
#include "proton/error.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/receiver_options.hpp"
#include "proton/sender_options.hpp"
#include "proton/session_options.hpp"
#include "proton/target_options.hpp"
#include "proton/tracker.hpp"
#include "proton/transfer.hpp"
#include "proton/types.hpp"
#include "contexts.hpp"
#include "link_namer.hpp"
#include "proactor_container_impl.hpp"
#include "proton_bits.hpp"
#include "types_internal.hpp"
#include <proton/connection.h>
#include "proton/delivery.h"
#include <proton/session.h>
#include <string>
namespace proton {
// Destructor defined here, outside the class body, using the compiler-generated behaviour.
session::~session() = default;
void session::open() {
pn_session_open(pn_object());
}
void session::open(const session_options &opts) {
opts.apply(*this);
pn_session_open(pn_object());
}
void session::close()
{
pn_session_close(pn_object());
}
/// Return the container reported by this session's connection.
container& session::container() const {
    auto owner = connection();
    return owner.container();
}
/// Return the work queue reported by this session's connection.
work_queue& session::work_queue() const {
    auto owner = connection();
    return owner.work_queue();
}
/// Return a wrapper for the connection that pn_session_connection() reports for this session.
connection session::connection() const {
    auto* const c_connection = pn_session_connection(pn_object());
    return make_wrapper(c_connection);
}
namespace {
std::string next_link_name(const connection& c) {
io::link_namer* ln = connection_context::get(unwrap(c)).link_gen;
return ln ? ln->link_name() : uuid::random().str();
}
}
/// Open a sender to @p addr using default-constructed sender options.
sender session::open_sender(const std::string &addr) {
    sender_options default_opts;
    return open_sender(addr, default_opts);
}
/// Create a sender link on this session targeting @p addr and open it with @p so.
///
/// The link name is taken from the options when they carry one, otherwise it is
/// generated via next_link_name() for this session's connection.
sender session::open_sender(const std::string &addr, const sender_options &so) {
    const std::string* const requested_name = so.get_name();
    const std::string link_name = requested_name ? *requested_name : next_link_name(connection());

    pn_link_t* const link = pn_sender(pn_object(), link_name.c_str());
    auto* const target = pn_link_target(link);
    pn_terminus_set_address(target, addr.c_str());

    sender result(make_wrapper<sender>(link));
    result.open(so);
    return result;
}
/// Open a receiver from @p addr using default-constructed receiver options.
receiver session::open_receiver(const std::string &addr) {
    receiver_options default_opts;
    return open_receiver(addr, default_opts);
}
/// Create a receiver link on this session sourcing from @p addr and open it with @p ro.
///
/// The link name is taken from the options when they carry one, otherwise it is
/// generated via next_link_name() for this session's connection.
receiver session::open_receiver(const std::string &addr, const receiver_options &ro) {
    const std::string* const requested_name = ro.get_name();
    const std::string link_name = requested_name ? *requested_name : next_link_name(connection());

    pn_link_t* const link = pn_receiver(pn_object(), link_name.c_str());
    auto* const source = pn_link_source(link);
    pn_terminus_set_address(source, addr.c_str());

    receiver result(make_wrapper<receiver>(link));
    result.open(ro);
    return result;
}
/// Return a wrapper for the session's remote condition.
error_condition session::error() const {
    auto* const remote_condition = pn_session_remote_condition(pn_object());
    return make_wrapper(remote_condition);
}
size_t session::incoming_bytes() const {
return pn_session_incoming_bytes(pn_object());
}
size_t session::outgoing_bytes() const {
return pn_session_outgoing_bytes(pn_object());
}
/// Return a range of senders for this session.
///
/// The range starts at the first link on the session's connection that is a
/// sender and belongs to this session; the start wraps a null link if none is found.
sender_range session::senders() const {
    pn_session_t* const self = pn_object();
    pn_link_t* candidate = pn_link_head(pn_session_connection(self), 0);
    for (; candidate; candidate = pn_link_next(candidate, 0)) {
        if (pn_link_is_sender(candidate) && pn_link_session(candidate) == self) {
            break;
        }
    }
    return sender_range(sender_iterator(make_wrapper<sender>(candidate), self));
}
/// Return a range of receivers for this session.
///
/// The range starts at the first link on the session's connection that is a
/// receiver and belongs to this session; the start wraps a null link if none is found.
receiver_range session::receivers() const {
    pn_session_t* const self = pn_object();
    pn_link_t* candidate = pn_link_head(pn_session_connection(self), 0);
    for (; candidate; candidate = pn_link_next(candidate, 0)) {
        if (pn_link_is_receiver(candidate) && pn_link_session(candidate) == self) {
            break;
        }
    }
    return receiver_range(receiver_iterator(make_wrapper<receiver>(candidate), self));
}
/// Move to the session returned by pn_session_next() and return a copy of the updated iterator.
session_iterator session_iterator::operator++() {
    pn_session_t* const current = unwrap(obj_);
    obj_ = pn_session_next(current, 0);
    return *this;
}
void session::user_data(void* user_data) const {
pn_session_t* ssn = pn_object();
session_context& sctx = session_context::get(ssn);
sctx.user_data_ = user_data;
}
void* session::user_data() const {
pn_session_t* ssn = pn_object();
session_context& sctx = session_context::get(ssn);
return sctx.user_data_;
}
namespace {
std::unique_ptr<transaction_context>& get_transaction_context(const session& s) {
return session_context::get(unwrap(s)).transaction_context_;
}
/// True when @p s has no transaction context, or its context is in the NO_TRANSACTION state.
bool transaction_is_empty(const session& s) {
    auto& ctx = get_transaction_context(s);
    if (!ctx) {
        return true;
    }
    return ctx->state == transaction_context::State::NO_TRANSACTION;
}
/// Encode @p value as a described value tagged with @p descriptor and send it on @p coordinator.
/// Returns the tracker produced by the send.
proton::tracker transaction_send_ctrl(sender&& coordinator, const symbol& descriptor, const value& value) {
    proton::value body;
    proton::codec::encoder writer(body);
    writer << proton::codec::start::described();
    writer << descriptor;
    writer << value;
    writer << proton::codec::finish();
    return coordinator.send(body);
}
void transaction_discharge(const session& s, bool failed) {
auto& transaction_context = get_transaction_context(s);
if (transaction_is_empty(s) || transaction_context->state != transaction_context::State::DECLARED)
throw proton::error("Only a declared txn can be discharged.");
transaction_context->state = transaction_context::State::DISCHARGING;
transaction_context->failed = failed;
transaction_send_ctrl(
make_wrapper<sender>(transaction_context->coordinator),
"amqp:discharge:list", std::list<proton::value>{transaction_context->transaction_id, failed});
}
/// Create and open a sender link on @p s whose target is a coordinator terminus
/// carrying the "amqp:local-transactions" capability. Returns the new link.
pn_link_t* open_coordinator_sender(session& s) {
    const std::string link_name = next_link_name(s.connection());
    auto link = pn_sender(unwrap(s), link_name.c_str());

    auto target = pn_link_target(link);
    pn_terminus_set_type(target, PN_COORDINATOR);

    // Only one capability symbol is written, so it is put directly rather than inside an array.
    auto capabilities = pn_terminus_capabilities(target);
    pn_data_put_symbol(capabilities, pn_bytes("amqp:local-transactions"));

    pn_link_open(link);
    return link;
}
bool has_unsettled_outgoing_deliveries(const session& s) {
auto session = unwrap(s);
auto& transaction_context = get_transaction_context(s);
auto coordinator = transaction_context ? transaction_context->coordinator : nullptr;
auto link = pn_link_head(pn_session_connection(session), 0);
while (link) {
if (pn_link_is_sender(link) && pn_link_session(link) == session && link != coordinator) {
if (pn_link_unsettled(link) > 0)
return true;
}
link = pn_link_next(link, 0);
}
return false;
}
}
/// Start declaring a transaction on this session.
///
/// Creates the transaction context (opening a coordinator sender) if the session
/// does not have one yet, marks it DECLARING and sends an "amqp:declare:list" control.
///
/// @throws proton::error if the session already has a transaction, or if it has
///         unsettled outgoing deliveries.
void session::transaction_declare() {
    if (!transaction_is_empty(*this)) {
        throw proton::error("Session has already declared transaction");
    }

    // Refuse to declare while the session still has unsettled outgoing deliveries.
    // This keeps the bookkeeping simple: only transactioned messages are sent on the
    // session, so every outgoing delivery found on it belongs to the transaction.
    // Supporting several transactions per session, or mixing in untransactioned
    // messages, would require tracking the transaction's deliveries separately.
    if (has_unsettled_outgoing_deliveries(*this)) {
        throw proton::error("Session has unsettled outgoing deliveries, cannot declare transaction");
    }

    auto& txn = get_transaction_context(*this);
    if (!txn) {
        txn = std::make_unique<transaction_context>(open_coordinator_sender(*this));
    }

    // Mark the state first, then send the declare control on the coordinator link.
    txn->state = transaction_context::State::DECLARING;
    transaction_send_ctrl(
        make_wrapper<sender>(txn->coordinator),
        "amqp:declare:list",
        std::list<proton::value>{});
}
/// Return the transaction id stored in the session's transaction context,
/// or an empty binary when the session has no transaction context.
binary session::transaction_id() const {
    auto& txn = get_transaction_context(*this);
    if (!txn) {
        return binary();
    }
    return txn->transaction_id;
}
/// Discharge the session's transaction with the failed flag cleared.
void session::transaction_commit() {
    const bool failed = false;
    transaction_discharge(*this, failed);
}
/// Discharge the session's transaction with the failed flag set.
void session::transaction_abort() {
    const bool failed = true;
    transaction_discharge(*this, failed);
}
/// True when the session has a transaction whose state is DECLARED.
bool session::transaction_is_declared() const {
    if (transaction_is_empty(*this)) {
        return false;
    }
    return get_transaction_context(*this)->state == transaction_context::State::DECLARED;
}
/// Return a wrapper for the error recorded on the transaction context, or a
/// default-constructed error_condition when there is no context or no error recorded.
error_condition session::transaction_error() const {
    auto& txn = get_transaction_context(*this);
    if (!txn) {
        return error_condition();
    }
    if (txn->error) {
        return make_wrapper(txn->error);
    }
    return error_condition();
}
} // namespace proton