| #ifndef QPID_CLIENT_AMQP0_10_SESSIONIMPL_H |
| #define QPID_CLIENT_AMQP0_10_SESSIONIMPL_H |
| |
| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "qpid/messaging/SessionImpl.h" |
| #include "qpid/messaging/Duration.h" |
| #include "qpid/messaging/exceptions.h" |
| #include "qpid/client/Session.h" |
| #include "qpid/client/SubscriptionManager.h" |
| #include "qpid/client/amqp0_10/AddressResolution.h" |
| #include "qpid/client/amqp0_10/IncomingMessages.h" |
| #include "qpid/sys/Mutex.h" |
| #include "qpid/framing/reply_exceptions.h" |
| #include <boost/intrusive_ptr.hpp> |
| |
| namespace qpid { |
| |
| namespace messaging { |
| class Address; |
| class Connection; |
| class Message; |
| class Receiver; |
| class Sender; |
| class Session; |
| } |
| |
| namespace client { |
| namespace amqp0_10 { |
| |
| class ConnectionImpl; |
| class ReceiverImpl; |
| class SenderImpl; |
| |
| /** |
| * Implementation of the protocol independent Session interface using |
| * AMQP 0-10. |
| */ |
| class SessionImpl : public qpid::messaging::SessionImpl |
| { |
| public: |
| SessionImpl(ConnectionImpl&, bool transactional); |
| void commit(); |
| void rollback(); |
| void acknowledge(bool sync); |
| void reject(qpid::messaging::Message&); |
| void release(qpid::messaging::Message&); |
| void acknowledge(qpid::messaging::Message& msg, bool cumulative); |
| void close(); |
| void sync(bool block); |
| qpid::messaging::Sender createSender(const qpid::messaging::Address& address); |
| qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address); |
| |
| qpid::messaging::Sender getSender(const std::string& name) const; |
| qpid::messaging::Receiver getReceiver(const std::string& name) const; |
| |
| bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout); |
| qpid::messaging::Receiver nextReceiver(qpid::messaging::Duration timeout); |
| |
| qpid::messaging::Connection getConnection() const; |
| void checkError(); |
| bool hasError(); |
| |
| bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); |
| |
| void releasePending(const std::string& destination); |
| void receiverCancelled(const std::string& name); |
| void senderCancelled(const std::string& name); |
| |
| uint32_t getReceivable(); |
| uint32_t getReceivable(const std::string& destination); |
| |
| uint32_t getUnsettledAcks(); |
| uint32_t getUnsettledAcks(const std::string& destination); |
| |
| void setSession(qpid::client::Session); |
| |
| template <class T> bool execute(T& f) |
| { |
| try { |
| f(); |
| return true; |
| } catch (const qpid::TransportFailure&) { |
| reconnect(); |
| return false; |
| } catch (const qpid::framing::ResourceLimitExceededException& e) { |
| if (backoff()) return false; |
| else throw qpid::messaging::TargetCapacityExceeded(e.what()); |
| } catch (const qpid::framing::UnauthorizedAccessException& e) { |
| throw qpid::messaging::UnauthorizedAccess(e.what()); |
| } catch (const qpid::SessionException& e) { |
| throw qpid::messaging::SessionError(e.what()); |
| } catch (const qpid::ConnectionException& e) { |
| throw qpid::messaging::ConnectionError(e.what()); |
| } catch (const qpid::ChannelException& e) { |
| throw qpid::messaging::MessagingException(e.what()); |
| } |
| } |
| |
| static SessionImpl& convert(qpid::messaging::Session&); |
| |
| private: |
| typedef std::map<std::string, qpid::messaging::Receiver> Receivers; |
| typedef std::map<std::string, qpid::messaging::Sender> Senders; |
| |
| mutable qpid::sys::Mutex lock; |
| boost::intrusive_ptr<ConnectionImpl> connection; |
| qpid::client::Session session; |
| AddressResolution resolver; |
| IncomingMessages incoming; |
| Receivers receivers; |
| Senders senders; |
| const bool transactional; |
| |
| bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&); |
| bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout); |
| bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); |
| void reconnect(); |
| bool backoff(); |
| |
| void commitImpl(); |
| void rollbackImpl(); |
| void acknowledgeImpl(); |
| void acknowledgeImpl(qpid::messaging::Message&, bool cumulative); |
| void rejectImpl(qpid::messaging::Message&); |
| void releaseImpl(qpid::messaging::Message&); |
| void closeImpl(); |
| void syncImpl(bool block); |
| qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address); |
| qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address); |
| uint32_t getReceivableImpl(const std::string* destination); |
| uint32_t getUnsettledAcksImpl(const std::string* destination); |
| |
| //functors for public facing methods (allows locking and retry |
| //logic to be centralised) |
| struct Command |
| { |
| SessionImpl& impl; |
| |
| Command(SessionImpl& i) : impl(i) {} |
| }; |
| |
| struct Commit : Command |
| { |
| Commit(SessionImpl& i) : Command(i) {} |
| void operator()() { impl.commitImpl(); } |
| }; |
| |
| struct Rollback : Command |
| { |
| Rollback(SessionImpl& i) : Command(i) {} |
| void operator()() { impl.rollbackImpl(); } |
| }; |
| |
| struct Acknowledge : Command |
| { |
| Acknowledge(SessionImpl& i) : Command(i) {} |
| void operator()() { impl.acknowledgeImpl(); } |
| }; |
| |
| struct Sync : Command |
| { |
| Sync(SessionImpl& i) : Command(i) {} |
| void operator()() { impl.syncImpl(true); } |
| }; |
| |
| struct NonBlockingSync : Command |
| { |
| NonBlockingSync(SessionImpl& i) : Command(i) {} |
| void operator()() { impl.syncImpl(false); } |
| }; |
| |
| struct Reject : Command |
| { |
| qpid::messaging::Message& message; |
| |
| Reject(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} |
| void operator()() { impl.rejectImpl(message); } |
| }; |
| |
| struct Release : Command |
| { |
| qpid::messaging::Message& message; |
| |
| Release(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} |
| void operator()() { impl.releaseImpl(message); } |
| }; |
| |
| struct Acknowledge2 : Command |
| { |
| qpid::messaging::Message& message; |
| bool cumulative; |
| |
| Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {} |
| void operator()() { impl.acknowledgeImpl(message, cumulative); } |
| }; |
| |
| struct CreateSender; |
| struct CreateReceiver; |
| struct UnsettledAcks; |
| struct Receivable; |
| |
| //helper templates for some common patterns |
| template <class F> bool execute() |
| { |
| F f(*this); |
| return execute(f); |
| } |
| |
| template <class F> void retry() |
| { |
| while (!execute<F>()) {} |
| } |
| |
| template <class F, class P> bool execute1(P p) |
| { |
| F f(*this, p); |
| return execute(f); |
| } |
| |
| template <class F, class R, class P> R get1(P p) |
| { |
| F f(*this, p); |
| while (!execute(f)) {} |
| return f.result; |
| } |
| }; |
| }}} // namespace qpid::client::amqp0_10 |
| |
| #endif /*!QPID_CLIENT_AMQP0_10_SESSIONIMPL_H*/ |