| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "proton/session.hpp" |
| |
| #include "proton/connection.hpp" |
| #include "proton/container.hpp" |
| #include "proton/delivery.hpp" |
| #include "proton/error.hpp" |
| #include "proton/messaging_handler.hpp" |
| #include "proton/receiver_options.hpp" |
| #include "proton/sender_options.hpp" |
| #include "proton/session_options.hpp" |
| #include "proton/target_options.hpp" |
| #include "proton/tracker.hpp" |
| #include "proton/transfer.hpp" |
| #include "proton/types.hpp" |
| |
| #include "contexts.hpp" |
| #include "link_namer.hpp" |
| #include "proactor_container_impl.hpp" |
| #include "proton_bits.hpp" |
| #include "types_internal.hpp" |
| |
| #include <proton/connection.h> |
| #include "proton/delivery.h" |
| #include <proton/session.h> |
| |
| #include <string> |
| |
| namespace proton { |
| |
| session::~session() = default; |
| |
| void session::open() { |
| pn_session_open(pn_object()); |
| } |
| |
| void session::open(const session_options &opts) { |
| opts.apply(*this); |
| pn_session_open(pn_object()); |
| } |
| |
| void session::close() |
| { |
| pn_session_close(pn_object()); |
| } |
| |
| container& session::container() const { |
| return connection().container(); |
| } |
| |
| work_queue& session::work_queue() const { |
| return connection().work_queue(); |
| } |
| |
| connection session::connection() const { |
| return make_wrapper(pn_session_connection(pn_object())); |
| } |
| |
| namespace { |
| std::string next_link_name(const connection& c) { |
| io::link_namer* ln = connection_context::get(unwrap(c)).link_gen; |
| |
| return ln ? ln->link_name() : uuid::random().str(); |
| } |
| } |
| |
| sender session::open_sender(const std::string &addr) { |
| return open_sender(addr, sender_options()); |
| } |
| |
| sender session::open_sender(const std::string &addr, const sender_options &so) { |
| std::string name = so.get_name() ? *so.get_name() : next_link_name(connection()); |
| pn_link_t *lnk = pn_sender(pn_object(), name.c_str()); |
| pn_terminus_set_address(pn_link_target(lnk), addr.c_str()); |
| sender snd(make_wrapper<sender>(lnk)); |
| snd.open(so); |
| return snd; |
| } |
| |
| receiver session::open_receiver(const std::string &addr) { |
| return open_receiver(addr, receiver_options()); |
| } |
| |
| receiver session::open_receiver(const std::string &addr, const receiver_options &ro) |
| { |
| std::string name = ro.get_name() ? *ro.get_name() : next_link_name(connection()); |
| pn_link_t *lnk = pn_receiver(pn_object(), name.c_str()); |
| pn_terminus_set_address(pn_link_source(lnk), addr.c_str()); |
| receiver rcv(make_wrapper<receiver>(lnk)); |
| rcv.open(ro); |
| return rcv; |
| } |
| |
| error_condition session::error() const { |
| return make_wrapper(pn_session_remote_condition(pn_object())); |
| } |
| |
| size_t session::incoming_bytes() const { |
| return pn_session_incoming_bytes(pn_object()); |
| } |
| |
| size_t session::outgoing_bytes() const { |
| return pn_session_outgoing_bytes(pn_object()); |
| } |
| |
| sender_range session::senders() const { |
| pn_link_t *lnk = pn_link_head(pn_session_connection(pn_object()), 0); |
| while (lnk) { |
| if (pn_link_is_sender(lnk) && pn_link_session(lnk) == pn_object()) |
| break; |
| lnk = pn_link_next(lnk, 0); |
| } |
| return sender_range(sender_iterator(make_wrapper<sender>(lnk), pn_object())); |
| } |
| |
| receiver_range session::receivers() const { |
| pn_link_t *lnk = pn_link_head(pn_session_connection(pn_object()), 0); |
| while (lnk) { |
| if (pn_link_is_receiver(lnk) && pn_link_session(lnk) == pn_object()) |
| break; |
| lnk = pn_link_next(lnk, 0); |
| } |
| return receiver_range(receiver_iterator(make_wrapper<receiver>(lnk), pn_object())); |
| } |
| |
| session_iterator session_iterator::operator++() { |
| obj_ = pn_session_next(unwrap(obj_), 0); |
| return *this; |
| } |
| |
| void session::user_data(void* user_data) const { |
| pn_session_t* ssn = pn_object(); |
| session_context& sctx = session_context::get(ssn); |
| sctx.user_data_ = user_data; |
| } |
| |
| void* session::user_data() const { |
| pn_session_t* ssn = pn_object(); |
| session_context& sctx = session_context::get(ssn); |
| return sctx.user_data_; |
| } |
| |
| namespace { |
| |
| std::unique_ptr<transaction_context>& get_transaction_context(const session& s) { |
| return session_context::get(unwrap(s)).transaction_context_; |
| } |
| |
| bool transaction_is_empty(const session& s) { |
| auto& txn = get_transaction_context(s); |
| return !txn || txn->state == transaction_context::State::NO_TRANSACTION; |
| } |
| |
| proton::tracker transaction_send_ctrl(sender&& coordinator, const symbol& descriptor, const value& value) { |
| proton::value msg_value; |
| proton::codec::encoder enc(msg_value); |
| enc << proton::codec::start::described() |
| << descriptor |
| << value |
| << proton::codec::finish(); |
| |
| return coordinator.send(msg_value); |
| } |
| |
| void transaction_discharge(const session& s, bool failed) { |
| auto& transaction_context = get_transaction_context(s); |
| if (transaction_is_empty(s) || transaction_context->state != transaction_context::State::DECLARED) |
| throw proton::error("Only a declared txn can be discharged."); |
| transaction_context->state = transaction_context::State::DISCHARGING; |
| |
| transaction_context->failed = failed; |
| transaction_send_ctrl( |
| make_wrapper<sender>(transaction_context->coordinator), |
| "amqp:discharge:list", std::list<proton::value>{transaction_context->transaction_id, failed}); |
| } |
| |
| pn_link_t* open_coordinator_sender(session& s) { |
| auto l = pn_sender(unwrap(s), next_link_name(s.connection()).c_str()); |
| auto t = pn_link_target(l); |
| pn_terminus_set_type(t, PN_COORDINATOR); |
| auto caps = pn_terminus_capabilities(t); |
| // As we only have a single symbol in the capabilities we don't have to create an array |
| pn_data_put_symbol(caps, pn_bytes("amqp:local-transactions")); |
| pn_link_open(l); |
| return l; |
| } |
| |
| bool has_unsettled_outgoing_deliveries(const session& s) { |
| auto session = unwrap(s); |
| auto& transaction_context = get_transaction_context(s); |
| auto coordinator = transaction_context ? transaction_context->coordinator : nullptr; |
| auto link = pn_link_head(pn_session_connection(session), 0); |
| while (link) { |
| if (pn_link_is_sender(link) && pn_link_session(link) == session && link != coordinator) { |
| if (pn_link_unsettled(link) > 0) |
| return true; |
| } |
| link = pn_link_next(link, 0); |
| } |
| return false; |
| } |
| |
| } |
| |
| void session::transaction_declare() { |
| if (!transaction_is_empty(*this)) |
| throw proton::error("Session has already declared transaction"); |
| |
| // Check to see if there are any unsettled deliveries for this session |
| // This is to simplify keeping track of the deliveries that are involved in the transaction: |
| // The simplification is that we will only allow transactioned messages to be sent on this session |
| // so that we can find all the transactioned outgoing deliveries for the session. |
| // If we want to allow multiple transactions on the session or allow untransactioned messages, |
| // we would need to keep track of the deliveries that are involved in the transaction separately. |
| if (has_unsettled_outgoing_deliveries(*this)) |
| throw proton::error("Session has unsettled outgoing deliveries, cannot declare transaction"); |
| |
| auto& txn_context = get_transaction_context(*this); |
| if (!txn_context) { |
| txn_context = std::make_unique<transaction_context>(open_coordinator_sender(*this)); |
| } |
| |
| // Declare txn |
| txn_context->state = transaction_context::State::DECLARING; |
| |
| transaction_send_ctrl(make_wrapper<sender>(txn_context->coordinator), "amqp:declare:list", std::list<proton::value>{}); |
| } |
| |
| |
| binary session::transaction_id() const { |
| auto& txn_context = get_transaction_context(*this); |
| if (txn_context) { |
| return txn_context->transaction_id; |
| } else { |
| return binary(); |
| } |
| } |
| void session::transaction_commit() { transaction_discharge(*this, false); } |
| void session::transaction_abort() { transaction_discharge(*this, true); } |
| bool session::transaction_is_declared() const { return (!transaction_is_empty(*this)) && get_transaction_context(*this)->state == transaction_context::State::DECLARED; } |
| error_condition session::transaction_error() const { |
| auto& txn_context = get_transaction_context(*this); |
| return txn_context ? txn_context->error ? make_wrapper(txn_context->error) : error_condition() : error_condition(); |
| } |
| |
| } // namespace proton |