blob: ee031f30cdddcda95505cf42bb3afcbf0c364f0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
#define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
#include <cms/Session.h>
#include <cms/ExceptionListener.h>
#include <activemq/util/Config.h>
#include <activemq/util/Usage.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/core/ActiveMQTransactionContext.h>
#include <activemq/core/kernels/ActiveMQConsumerKernel.h>
#include <activemq/core/kernels/ActiveMQProducerKernel.h>
#include <activemq/commands/ActiveMQTempDestination.h>
#include <activemq/commands/Response.h>
#include <activemq/commands/MessageAck.h>
#include <activemq/commands/SessionInfo.h>
#include <activemq/commands/ConsumerInfo.h>
#include <activemq/commands/ConsumerId.h>
#include <activemq/commands/ProducerId.h>
#include <activemq/commands/TransactionId.h>
#include <activemq/core/Dispatcher.h>
#include <activemq/core/MessageDispatchChannel.h>
#include <activemq/util/LongSequenceGenerator.h>
#include <activemq/threads/Scheduler.h>
#include <decaf/lang/Pointer.h>
#include <decaf/util/ArrayList.h>
#include <decaf/util/Properties.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <string>
#include <memory>
namespace activemq {
namespace core {
class ActiveMQConnection;
class ActiveMQConsumer;
class ActiveMQProducer;
class ActiveMQSessionExecutor;
namespace kernels {
using decaf::lang::Pointer;
using decaf::util::concurrent::atomic::AtomicBoolean;
class SessionConfig;
/**
* Kernel object behind a CMS Session. The class derives (virtually) from
* cms::Session and from Dispatcher, so it exposes the client facing Session
* API and also accepts MessageDispatch objects through dispatch().
*
* Most methods are only declared here; their behavior is defined in the
* implementation file, so the comments below describe the declared contract
* rather than implementation details.
*/
class AMQCPP_API ActiveMQSessionKernel : public virtual cms::Session, public Dispatcher {
private:
// The session executor is granted access to the non-public members of this class.
friend class activemq::core::ActiveMQSessionExecutor;
protected:
/**
* Session configuration object. SessionConfig is only forward declared in
* this header, so its contents and the ownership of this pointer are not
* visible here (presumably handled by the implementation file - confirm there).
*/
SessionConfig* config;
/**
* SessionInfo for this Session.
*/
Pointer<commands::SessionInfo> sessionInfo;
/**
* Transaction Management object.
*/
Pointer<ActiveMQTransactionContext> transaction;
/**
* The Connection associated with this Session, returned by getConnection().
* Held as a raw pointer; NOTE(review): presumably not owned by the session.
*/
ActiveMQConnection* connection;
/**
* Indicates that this session has been closed, it is no longer
* usable after this becomes true.
*/
AtomicBoolean closed;
/**
* Sends incoming messages to the registered consumers.
*
* NOTE(review): std::auto_ptr is deprecated since C++11 and was removed in
* C++17; this member will need a replacement before the header can be built
* against a C++17 (or later) standard library.
*/
std::auto_ptr<ActiveMQSessionExecutor> executor;
/**
* This Session's acknowledgment mode, queried by the is*Acknowledge() methods.
*/
cms::Session::AcknowledgeMode ackMode;
/**
* Next available Producer Id.
*/
util::LongSequenceGenerator producerIds;
/**
* Next available Producer Sequence Id, consumed by getNextProducerSequenceId().
*/
util::LongSequenceGenerator producerSequenceIds;
/**
* Next available Consumer Id.
*/
util::LongSequenceGenerator consumerIds;
/**
* Last Delivered Sequence Id. Read and written through the inline accessors
* below, which perform a plain load / store with no locking of their own.
*/
long long lastDeliveredSequenceId;
private:
// Copy construction and copy assignment are declared private so that a
// session kernel cannot be copied by client code; no definition is provided
// in this header.
ActiveMQSessionKernel(const ActiveMQSessionKernel&);
ActiveMQSessionKernel& operator=(const ActiveMQSessionKernel&);
public:
/**
* Creates a new session kernel.
*
* @param connection
* The ActiveMQConnection this session is associated with.
* @param id
* The SessionId for the new session.
* @param ackMode
* The acknowledgment mode the session operates in.
* @param properties
* Configuration properties for the session (the recognized keys are
* not visible in this header - see the implementation).
*/
ActiveMQSessionKernel(ActiveMQConnection* connection,
const Pointer<commands::SessionId>& id,
cms::Session::AcknowledgeMode ackMode,
const decaf::util::Properties& properties);
virtual ~ActiveMQSessionKernel();
/**
* Redispatches the given set of unconsumed messages to the consumers.
* @param unconsumedMessages - unconsumed messages to be redelivered.
*/
virtual void redispatch(MessageDispatchChannel& unconsumedMessages);
/**
* Starts asynchronous message delivery.
*/
virtual void start();
/**
* Stops asynchronous message delivery.
*/
virtual void stop();
/**
* Indicates whether or not the session is currently in the started
* state.
*
* @return true if the session is started, false otherwise.
*/
bool isStarted() const;
/**
* @return true if this session's acknowledge mode is AUTO_ACKNOWLEDGE.
*/
virtual bool isAutoAcknowledge() const {
return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
}
/**
* @return true if this session's acknowledge mode is DUPS_OK_ACKNOWLEDGE.
*/
virtual bool isDupsOkAcknowledge() const {
return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
}
/**
* @return true if this session's acknowledge mode is CLIENT_ACKNOWLEDGE.
*/
virtual bool isClientAcknowledge() const {
return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
}
/**
* @return true if this session's acknowledge mode is INDIVIDUAL_ACKNOWLEDGE.
*/
virtual bool isIndividualAcknowledge() const {
return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
}
/**
* Fires the given exception to the exception listener of the connection.
*
* @param ex
* The exception to hand to the listener.
*/
void fire(const exceptions::ActiveMQException& ex);
public: // Methods from Dispatcher
/**
* Dispatches a message to a particular consumer.
* @param message - the message to be dispatched
*/
virtual void dispatch(const Pointer<MessageDispatch>& message);
public: // Implements Methods
// The declarations from close() through unsubscribe() carry no documentation
// of their own in this header. They appear to be the implementation of the
// cms::Session interface this class derives from; refer to cms/Session.h for
// the contract of each method (NOTE(review): confirm each one matches a
// cms::Session virtual).
virtual void close();
virtual void commit();
virtual void rollback();
virtual void recover();
virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination);
virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
const std::string& selector);
virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
const std::string& selector,
bool noLocal);
// noLocal defaults to false when the caller omits it.
virtual cms::MessageConsumer* createDurableConsumer(const cms::Topic* destination,
const std::string& name,
const std::string& selector,
bool noLocal = false);
virtual cms::MessageProducer* createProducer(const cms::Destination* destination);
virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue);
virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue, const std::string& selector);
virtual cms::Queue* createQueue(const std::string& queueName);
virtual cms::Topic* createTopic(const std::string& topicName);
virtual cms::TemporaryQueue* createTemporaryQueue();
virtual cms::TemporaryTopic* createTemporaryTopic();
virtual cms::Message* createMessage();
virtual cms::BytesMessage* createBytesMessage();
virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes, int bytesSize);
virtual cms::StreamMessage* createStreamMessage();
virtual cms::TextMessage* createTextMessage();
virtual cms::TextMessage* createTextMessage( const std::string& text );
virtual cms::MapMessage* createMapMessage();
virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const;
virtual bool isTransacted() const;
virtual void unsubscribe(const std::string& name);
public: // ActiveMQSessionKernel specific Methods
/**
* Sends a message from the Producer specified using this session's connection
* the message will be sent using the best available means depending on the
* configuration of the connection.
* <p>
* Asynchronous sends will be chosen if at all possible.
*
* @param producer
* The sending Producer
* @param destination
* The target destination for the Message.
* @param message
* The message to send to the broker.
* @param deliveryMode
* The delivery mode to assign to the outgoing message.
* @param priority
* The priority value to assign to the outgoing message.
* @param timeToLive
* The time to live for the outgoing message.
* @param producerWindow
* Pointer to a Usage tracker which if set will be increased by the size
* of the given message.
* @param sendTimeout
* The amount of time to block during send before failing, or 0 to wait forever.
* @param onComplete
* Pointer to a cms::AsyncCallback; presumably notified once the send
* has completed - confirm against the implementation, including
* whether NULL is accepted.
*
* @throws CMSException if an error occurs while sending the message.
*/
void send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination,
cms::Message* message, int deliveryMode, int priority, long long timeToLive,
util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete);
/**
* This method gets any registered exception listener of this session's
* connection and returns it. Mainly intended for use by the objects
* that this session creates so that they can notify the client of
* exceptions that occur in the context of another thread.
*
* @return the registered cms::ExceptionListener pointer or NULL
*/
cms::ExceptionListener* getExceptionListener();
/**
* Set a MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer
* objects created from this Session.
*
* @param transformer
* Pointer to the cms::MessageTransformer to set on all MessageConsumers and MessageProducers.
*/
virtual void setMessageTransformer(cms::MessageTransformer* transformer);
/**
* Gets the currently configured MessageTransformer for this Session.
*
* @return the pointer to the currently set cms::MessageTransformer.
*/
virtual cms::MessageTransformer* getMessageTransformer() const;
/**
* Gets the Session Information object for this session, if the
* session is closed then this method throws an exception.
* @return SessionInfo Reference
*/
const commands::SessionInfo& getSessionInfo() const {
// checkClosed() throws when the session is closed, so the dereference
// below is only reached for an open session.
this->checkClosed();
return *( this->sessionInfo );
}
/**
* Gets the Session Id object for this session, if the session
* is closed then this method throws an exception.
* @return SessionId Reference
*/
const commands::SessionId& getSessionId() const {
// Same closed-state guard as getSessionInfo().
this->checkClosed();
return *( this->sessionInfo->getSessionId() );
}
/**
* Gets the ActiveMQConnection that is associated with this session.
*
* @return the raw connection pointer stored in this session; unlike
* getSessionInfo() no closed-state check is performed.
*/
ActiveMQConnection* getConnection() const {
return this->connection;
}
/**
* Gets a Pointer to this Session's Scheduler instance
*
* @return Pointer to the threads::Scheduler used by this Session.
*/
Pointer<threads::Scheduler> getScheduler() const;
/**
* Gets the currently set Last Delivered Sequence Id
*
* @return long long containing the sequence id of the last delivered Message.
*/
long long getLastDeliveredSequenceId() const {
return this->lastDeliveredSequenceId;
}
/**
* Sets the value of the Last Delivered Sequence Id
*
* @param value
* The new value to assign to the Last Delivered Sequence Id property.
*/
void setLastDeliveredSequenceId(long long value) {
this->lastDeliveredSequenceId = value;
}
/**
* Sends a Command to the broker without requesting any Response be returned.
*
* @param command
* The message to send to the Broker.
*
* @throws ActiveMQException if not currently connected, or if the
* operation fails for any reason.
*/
void oneway(Pointer<commands::Command> command);
/**
* Sends a synchronous request and returns the response from the broker.
* Converts any error responses into an exception.
*
* @param command
* The command to send to the broker.
* @param timeout
* The time to wait for a response; defaults to zero, which means
* wait indefinitely.
*
* @return Pointer to a Response object that the broker has returned for the Command sent.
*
* @throws ActiveMQException thrown if an error response was received
* from the broker, or if any other error occurred.
*/
Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
/**
* Adds a MessageConsumerKernel to this session registering it with the Connection and
* store a reference to it so the session can ensure that all resources are closed when
* the session is closed.
*
* @param consumer
* The ActiveMQConsumerKernel instance to add to this session.
*
* @throw ActiveMQException if an internal error occurs.
*/
void addConsumer(Pointer<ActiveMQConsumerKernel> consumer);
/**
* Dispose of a MessageConsumer from this session. Removes it from the Connection
* and clean up any resources associated with it.
*
* @param consumer
* The ActiveMQConsumerKernel instance to remove from this session.
*
* @throw ActiveMQException if an internal error occurs.
*/
void removeConsumer(Pointer<ActiveMQConsumerKernel> consumer);
/**
* Adds a MessageProducer to this session registering it with the Connection and store
* a reference to it so the session can ensure that all resources are closed when
* the session is closed.
*
* @param producer
* The ActiveMQProducerKernel instance to add to this session.
*
* @throw ActiveMQException if an internal error occurs.
*/
void addProducer(Pointer<ActiveMQProducerKernel> producer);
/**
* Dispose of a MessageProducer from this session. Removes it from the Connection
* and clean up any resources associated with it.
*
* @param producer
* The Producer kernel instance to remove from this session.
*
* @throw ActiveMQException if an internal error occurs.
*/
void removeProducer(Pointer<ActiveMQProducerKernel> producer);
/**
* Starts a Transaction for this Session if one has not already been started.
* If the session is not a Transacted Session then an exception is thrown. If a
* transaction is already in progress then this method has no effect.
*
* @throw ActiveMQException if this is not a Transacted Session.
*/
virtual void doStartTransaction();
/**
* Gets the Pointer to this Session's TransactionContext
*
* @return a Pointer to this Session's TransactionContext
*/
Pointer<ActiveMQTransactionContext> getTransactionContext() {
return this->transaction;
}
/**
* Request that the Session inform all its consumers to Acknowledge all Messages
* that have been received so far.
*/
void acknowledge();
/**
* Request that this Session inform all of its consumers to deliver their pending
* acks.
*/
void deliverAcks();
/**
* Request that this Session inform all of its consumers to clear all messages that
* are currently in progress.
*
* @param transportsInterrupted
* Shared AtomicInteger counter handed on by the caller; presumably
* tracks outstanding transport interruptions - confirm against the
* implementation.
*/
void clearMessagesInProgress(
decaf::lang::Pointer<decaf::util::concurrent::atomic::AtomicInteger> transportsInterrupted);
/**
* Causes the Session to wakeup its executor and ensure all messages are dispatched.
*/
void wakeup();
/**
* Get the Next available Consumer Id
* @return the next id in the sequence.
*/
Pointer<commands::ConsumerId> getNextConsumerId();
/**
* Get the Next available Producer Id
* @return the next id in the sequence.
*/
Pointer<commands::ProducerId> getNextProducerId();
/**
* Performs the actual Session close operations. This method is meant for use
* by ActiveMQConnection, the connection object calls this when it has been
* closed to skip some of the extraneous processing done by the client level
* close method.
*/
void doClose();
/**
* Cleans up the Session object's resources without attempting to send the
* Remove command to the broker, this can be called from ActiveMQConnection when
* it knows that the transport is down and the doClose method would throw an
* exception when it attempts to send the Remove Command.
*/
void dispose();
/**
* Set the prefetch level for the given consumer if it exists in this Session to
* the value specified.
*
* @param id
* The consumer Id to search for and set prefetch level.
* @param prefetch
* The new prefetch value.
*/
void setPrefetchSize(Pointer<commands::ConsumerId> id, int prefetch);
/**
* Close the specified consumer if present in this Session.
*
* @param id
* The consumer Id to close.
*/
void close(Pointer<commands::ConsumerId> id);
/**
* Checks if the given destination is currently in use by any consumers in this Session.
*
* @param destination
* The destination to look for among this Session's consumers.
*
* @return true if there is a consumer of this destination in this Session.
*/
bool isInUse(Pointer<commands::ActiveMQDestination> destination);
/**
* Looks up a producer owned by this Session.
*
* @param id
* The ProducerId to search for.
*
* @return a Pointer to an ActiveMQProducerKernel using its ProducerId, or NULL.
*/
Pointer<ActiveMQProducerKernel> lookupProducerKernel(Pointer<commands::ProducerId> id);
/**
* Looks up a consumer owned by this Session.
*
* @param id
* The ConsumerId to search for.
*
* @return a Pointer to an ActiveMQConsumerKernel using its ConsumerId, or NULL.
*/
Pointer<ActiveMQConsumerKernel> lookupConsumerKernel(Pointer<commands::ConsumerId> id);
/**
* Gives each consumer a chance to dispatch messages that have been enqueued by calling
* each consumer's iterate method. Returns true if this method needs to be called again
* because a consumer requires further processing time to complete its dispatching. Once
* all consumers are done this method returns false.
*
* @return true if more iterations are needed false otherwise.
*/
bool iterateConsumers();
/**
* Checks if any MessageConsumer owned by this Session has a set MessageListener
* and throws an exception if so. This enforces the rule that the MessageConsumers
* belonging to a Session either operate in sync or async receive as a group.
*/
void checkMessageListener() const;
/**
* Returns a Hash Code for this Session based on its SessionId.
*
* @return an int hash code based on the string value of SessionId.
*/
virtual int getHashCode() const;
/**
* Sends the given MessageAck command to the Broker either via Synchronous call or
* an Asynchronous call depending on the value of the async parameter.
*
* @param ack
* The MessageAck command to send.
* @param async
* True if the command can be sent asynchronously; defaults to false.
*/
void sendAck(decaf::lang::Pointer<commands::MessageAck> ack, bool async = false);
/**
* Returns true if this session is dispatching messages to its consumers asynchronously.
*
* @return Returns the sessionAsyncDispatch.
*/
bool isSessionAsyncDispatch() const;
/**
* Configures asynchronous message dispatch to this session's consumers.
*
* @param sessionAsyncDispatch The sessionAsyncDispatch to set.
*/
void setSessionAsyncDispatch(bool sessionAsyncDispatch);
/**
* Returns an ArrayList containing a copy of all consumers currently in
* use on this Session. Since this list is copied from the main consumers
* list the usage is thread safe after return.
*
* @return a list containing a pointer to each consumer active in this session.
*/
decaf::util::ArrayList< Pointer<ActiveMQConsumerKernel> > getConsumers() const;
private:
/**
* Get the Next available Producer Sequence Id
* @return the next id in the sequence, taken from producerSequenceIds.
*/
long long getNextProducerSequenceId() {
return this->producerSequenceIds.getNextSequenceId();
}
// Checks for the closed state and throws if so. Called by the inline
// getSessionInfo() and getSessionId() accessors before they dereference
// sessionInfo.
void checkClosed() const;
// Send the Destination Creation Request to the Broker, alerting it
// that we've created a new Temporary Destination.
// @param tempDestination - The new Temporary Destination
void createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
// Send the Destination Destruction Request to the Broker, alerting
// it that we've removed an existing Temporary Destination.
// @param tempDestination - The Temporary Destination to remove
void destroyTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
// Creates a new Temporary Destination name using the connection id
// and a rolling count.
// @return a unique Temporary Destination name
std::string createTemporaryDestinationName();
};
}}}
#endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_ */