| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_ |
| #define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_ |
| |
| #include <cms/EnhancedConnection.h> |
| #include <activemq/util/Config.h> |
| #include <activemq/core/Dispatcher.h> |
| #include <activemq/commands/ActiveMQTempDestination.h> |
| #include <activemq/commands/ConnectionInfo.h> |
| #include <activemq/commands/ConsumerInfo.h> |
| #include <activemq/commands/SessionId.h> |
| #include <activemq/exceptions/ActiveMQException.h> |
| #include <activemq/transport/Transport.h> |
| #include <activemq/transport/TransportListener.h> |
| #include <activemq/threads/Scheduler.h> |
| #include <activemq/core/kernels/ActiveMQProducerKernel.h> |
| #include <activemq/core/kernels/ActiveMQSessionKernel.h> |
| #include <decaf/util/Properties.h> |
| #include <decaf/util/ArrayList.h> |
| #include <decaf/util/concurrent/atomic/AtomicBoolean.h> |
| #include <decaf/util/concurrent/ExecutorService.h> |
| #include <decaf/lang/exceptions/UnsupportedOperationException.h> |
| #include <decaf/lang/exceptions/NullPointerException.h> |
| #include <decaf/lang/exceptions/IllegalStateException.h> |
| |
| #include <string> |
| #include <memory> |
| |
| namespace activemq { |
| namespace core { |
| |
| using decaf::lang::Pointer; |
| |
| class ActiveMQSession; |
| class ConnectionConfig; |
| class PrefetchPolicy; |
| class RedeliveryPolicy; |
| |
| /** |
| * Concrete connection used for all connectors to the |
| * ActiveMQ broker. |
| * |
| * @since 2.0 |
| */ |
| class AMQCPP_API ActiveMQConnection : public virtual cms::EnhancedConnection, |
| public transport::TransportListener { |
| private: |
| |
| ConnectionConfig* config; |
| |
| /** |
| * The instance of ConnectionMetaData to return to clients. |
| */ |
| std::auto_ptr<cms::ConnectionMetaData> connectionMetaData; |
| |
| /** |
| * Indicates if this Connection is started |
| */ |
| decaf::util::concurrent::atomic::AtomicBoolean started; |
| |
| /** |
| * Indicates that this connection has been closed, it is no longer |
| * usable after this becomes true |
| */ |
| decaf::util::concurrent::atomic::AtomicBoolean closed; |
| |
| /** |
| * Indicates that this connection has been closed, it is no longer |
| * usable after this becomes true |
| */ |
| decaf::util::concurrent::atomic::AtomicBoolean closing; |
| |
| /** |
| * Indicates that this connection's Transport has failed. |
| */ |
| decaf::util::concurrent::atomic::AtomicBoolean transportFailed; |
| |
| private: |
| |
| ActiveMQConnection(const ActiveMQConnection&); |
| ActiveMQConnection& operator=(const ActiveMQConnection&); |
| |
| public: |
| |
| /** |
| * Constructor |
| * |
| * @param transport |
| * The Transport requested for this connection to the Broker. |
| * @param properties |
| * The Properties that were defined for this connection |
| */ |
| ActiveMQConnection(const Pointer<transport::Transport> transport, |
| const Pointer<decaf::util::Properties> properties); |
| |
| virtual ~ActiveMQConnection(); |
| |
| /** |
| * Adds the session resources for the given session instance. |
| * |
| * @param session |
| * The session to be added to this connection. |
| * |
| * @throws CMSException if an error occurs while removing performing the operation. |
| */ |
| virtual void addSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> session); |
| |
| /** |
| * Removes the session resources for the given session instance. |
| * |
| * @param session |
| * The session to be unregistered from this connection. |
| * |
| * @throws CMSException if an error occurs while removing performing the operation. |
| */ |
| virtual void removeSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> session); |
| |
| /** |
| * Adds an active Producer to the Set of known producers. |
| * |
| * @param producer |
| * The Producer to add from the the known set. |
| * |
| * @throws CMSException if an error occurs while removing performing the operation. |
| */ |
| virtual void addProducer(Pointer<kernels::ActiveMQProducerKernel> producer); |
| |
| /** |
| * Removes an active Producer to the Set of known producers. |
| * @param producerId - The ProducerId to remove from the the known set. |
| * @throws CMSException if an error occurs while removing performing the operation. |
| */ |
| virtual void removeProducer(const Pointer<commands::ProducerId>& producerId); |
| |
| /** |
| * Adds a dispatcher for a consumer. |
| * @param consumer - The consumer for which to register a dispatcher. |
| * @param dispatcher - The dispatcher to handle incoming messages for the consumer. |
| * @throws CMSException if an error occurs while removing performing the operation. |
| */ |
| virtual void addDispatcher(const Pointer<commands::ConsumerId>& consumer, Dispatcher* dispatcher); |
| |
| /** |
| * Removes the dispatcher for a consumer. |
| * @param consumer - The consumer for which to remove the dispatcher. |
| * @throws CMSException if an error occurs while removing performing the operation. |
| */ |
| virtual void removeDispatcher(const Pointer<commands::ConsumerId>& consumer); |
| |
| /** |
| * If supported sends a message pull request to the service provider asking |
| * for the delivery of a new message. This is used in the case where the |
| * service provider has been configured with a zero prefetch or is only |
| * capable of delivering messages on a pull basis. |
| * @param consumer - the ConsumerInfo for the requesting Consumer. |
| * @param timeout - the time that the client is willing to wait. |
| * |
| * @throws ActiveMQException if an error occurs while removing performing the operation. |
| */ |
| virtual void sendPullRequest(const commands::ConsumerInfo* consumer, long long timeout); |
| |
| /** |
| * Checks if this connection has been closed |
| * @return true if the connection is closed |
| */ |
| bool isClosed() const { |
| return this->closed.get(); |
| } |
| |
| /** |
| * Check if this connection has been started. |
| * @return true if the start method has been called. |
| */ |
| bool isStarted() const { |
| return this->started.get(); |
| } |
| |
| /** |
| * Checks if the Connection's Transport has failed |
| * @return true if the Connection's Transport has failed. |
| */ |
| bool isTransportFailed() const { |
| return this->transportFailed.get(); |
| } |
| |
| /** |
| * Requests that the Broker removes the given Destination. Calling this |
| * method implies that the client is finished with the Destination and that |
| * no other messages will be sent or received for the given Destination. The |
| * Broker frees all resources it has associated with this Destination. |
| * |
| * @param destination |
| * The Destination the Broker will be requested to remove. |
| * |
| * @throws NullPointerException |
| * If the passed Destination is Null |
| * @throws IllegalStateException |
| * If the connection is closed. |
| * @throws UnsupportedOperationException |
| * If the wire format in use does not support this operation. |
| * @throws ActiveMQException |
| * If any other error occurs during the attempt to destroy the destination. |
| */ |
| virtual void destroyDestination(const commands::ActiveMQDestination* destination); |
| |
| /** |
| * Requests that the Broker removes the given Destination. Calling this |
| * method implies that the client is finished with the Destination and that |
| * no other messages will be sent or received for the given Destination. The |
| * Broker frees all resources it has associated with this Destination. |
| * |
| * @param destination |
| * The CMS Destination the Broker will be requested to remove. |
| * |
| * @throws NullPointerException |
| * If the passed Destination is Null |
| * @throws IllegalStateException |
| * If the connection is closed. |
| * @throws UnsupportedOperationException |
| * If the wire format in use does not support this operation. |
| * @throws ActiveMQException |
| * If any other error occurs during the attempt to destroy the destination. |
| */ |
| virtual void destroyDestination(const cms::Destination* destination); |
| |
| /** |
| * Allows Consumers to check if an incoming Message is a Duplicate. |
| * |
| * @param dispatcher |
| * The Dispatcher that is checking the Message for Duplication. |
| * @param message |
| * The Message that should be checked. |
| * |
| * @return true if the Message was seen before. |
| */ |
| bool isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message); |
| |
| /** |
| * Mark message as received. |
| * |
| * @param dispatcher |
| * The Dispatcher instance that has received the Message. |
| * @param message |
| * The Message that has been received. |
| */ |
| void rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message); |
| |
| /** |
| * Removes the Audit information stored for a given MessageConsumer |
| * |
| * @param dispatcher |
| * The Dispatcher instance that has received the Message. |
| */ |
| void removeAuditedDispatcher(Dispatcher* dispatcher); |
| |
| public: // Connection Interface Methods |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual const cms::ConnectionMetaData* getMetaData() const { |
| return connectionMetaData.get(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual cms::Session* createSession(); |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual std::string getClientID() const; |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual void setClientID(const std::string& clientID); |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual cms::Session* createSession(cms::Session::AcknowledgeMode ackMode); |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual void close(); |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual void start(); |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual void stop(); |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual cms::ExceptionListener* getExceptionListener() const; |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual void setExceptionListener(cms::ExceptionListener* listener); |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual void setMessageTransformer(cms::MessageTransformer* transformer); |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual cms::MessageTransformer* getMessageTransformer() const; |
| |
| /** |
| * {@inheritDoc} |
| */ |
| virtual cms::DestinationSource* getDestinationSource(); |
| |
| public: // Configuration Options |
| |
| /** |
| * Sets the username that should be used when creating a new connection |
| * @param username string |
| */ |
| void setUsername(const std::string& username); |
| |
| /** |
| * Gets the username that this factory will use when creating a new |
| * connection instance. |
| * @return username string, "" for default credentials |
| */ |
| const std::string& getUsername() const; |
| |
| /** |
| * Sets the password that should be used when creating a new connection |
| * @param password string |
| */ |
| void setPassword(const std::string& password); |
| |
| /** |
| * Gets the password that this factory will use when creating a new |
| * connection instance. |
| * @return password string, "" for default credentials |
| */ |
| const std::string& getPassword() const; |
| |
| /** |
| * Sets the Client Id. |
| * @param clientId - The new clientId value. |
| */ |
| void setDefaultClientId(const std::string& clientId); |
| |
| /** |
| * Sets the Broker URL that should be used when creating a new |
| * connection instance |
| * @param brokerURL string |
| */ |
| void setBrokerURL(const std::string& brokerURL); |
| |
| /** |
| * Gets the Broker URL that this factory will use when creating a new |
| * connection instance. |
| * @return brokerURL string |
| */ |
| const std::string& getBrokerURL() const; |
| |
| /** |
| * Sets the PrefetchPolicy instance that this factory should use when it creates |
| * new Connection instances. The PrefetchPolicy passed becomes the property of the |
| * factory and will be deleted when the factory is destroyed. |
| * |
| * @param policy |
| * The new PrefetchPolicy that the ConnectionFactory should clone for Connections. |
| */ |
| void setPrefetchPolicy(PrefetchPolicy* policy); |
| |
| /** |
| * Gets the pointer to the current PrefetchPolicy that is in use by this ConnectionFactory. |
| * |
| * @return a pointer to this objects PrefetchPolicy. |
| */ |
| PrefetchPolicy* getPrefetchPolicy() const; |
| |
| /** |
| * Sets the RedeliveryPolicy instance that this factory should use when it creates |
| * new Connection instances. The RedeliveryPolicy passed becomes the property of the |
| * factory and will be deleted when the factory is destroyed. |
| * |
| * @param policy |
| * The new RedeliveryPolicy that the ConnectionFactory should clone for Connections. |
| */ |
| void setRedeliveryPolicy(RedeliveryPolicy* policy); |
| |
| /** |
| * Gets the pointer to the current RedeliveryPolicy that is in use by this ConnectionFactory. |
| * |
| * @return a pointer to this objects RedeliveryPolicy. |
| */ |
| RedeliveryPolicy* getRedeliveryPolicy() const; |
| |
| /** |
| * @return The value of the dispatch asynchronously option sent to the broker. |
| */ |
| bool isDispatchAsync() const; |
| |
| /** |
| * Should messages be dispatched synchronously or asynchronously from the producer |
| * thread for non-durable topics in the broker? For fast consumers set this to false. |
| * For slow consumers set it to true so that dispatching will not block fast consumers. . |
| * |
| * @param value |
| * The value of the dispatch asynchronously option sent to the broker. |
| */ |
| void setDispatchAsync(bool value); |
| |
| /** |
| * Gets if the Connection should always send things Synchronously. |
| * |
| * @return true if sends should always be Synchronous. |
| */ |
| bool isAlwaysSyncSend() const; |
| |
| /** |
| * Sets if the Connection should always send things Synchronously. |
| * @param value |
| * true if sends should always be Synchronous. |
| */ |
| void setAlwaysSyncSend(bool value); |
| |
| /** |
| * Gets if the useAsyncSend option is set |
| * @return true if on false if not. |
| */ |
| bool isUseAsyncSend() const; |
| |
| /** |
| * Sets the useAsyncSend option |
| * @param value - true to activate, false to disable. |
| */ |
| void setUseAsyncSend(bool value); |
| |
| /** |
| * Gets if the Connection is configured for Message body compression. |
| * @return if the Message body will be Compressed or not. |
| */ |
| bool isUseCompression() const; |
| |
| /** |
| * Sets whether Message body compression is enabled. |
| * |
| * @param value |
| * Boolean indicating if Message body compression is enabled. |
| */ |
| void setUseCompression(bool value); |
| |
| /** |
| * Sets the Compression level used when Message body compression is enabled, a |
| * value of -1 causes the Compression Library to use the default setting which |
| * is a balance of speed and compression. The range of compression levels is |
| * [0..9] where 0 indicates best speed and 9 indicates best compression. |
| * |
| * @param value |
| * A signed int value that controls the compression level. |
| */ |
| void setCompressionLevel(int value); |
| |
| /** |
| * Gets the currently configured Compression level for Message bodies. |
| * |
| * @return the int value of the current compression level. |
| */ |
| int getCompressionLevel() const; |
| |
| /** |
| * Gets the assigned send timeout for this Connector |
| * @return the send timeout configured in the connection uri |
| */ |
| unsigned int getSendTimeout() const; |
| |
| /** |
| * Sets the send timeout to use when sending Message objects, this will |
| * cause all messages to be sent using a Synchronous request is non-zero. |
| * @param timeout - The time to wait for a response. |
| */ |
| void setSendTimeout(unsigned int timeout); |
| |
| /** |
| * Gets the assigned close timeout for this Connector |
| * @return the close timeout configured in the connection uri |
| */ |
| unsigned int getCloseTimeout() const; |
| |
| /** |
| * Sets the close timeout to use when sending the disconnect request. |
| * @param timeout - The time to wait for a close message. |
| */ |
| void setCloseTimeout(unsigned int timeout); |
| |
| /** |
| * Gets the configured producer window size for Producers that are created |
| * from this connector. This only applies if there is no send timeout and the |
| * producer is able to send asynchronously. |
| * @return size in bytes of messages that this producer can produce before |
| * it must block and wait for ProducerAck messages to free resources. |
| */ |
| unsigned int getProducerWindowSize() const; |
| |
| /** |
| * Sets the size in Bytes of messages that a producer can send before it is blocked |
| * to await a ProducerAck from the broker that frees enough memory to allow another |
| * message to be sent. |
| * @param windowSize - The size in bytes of the Producers memory window. |
| */ |
| void setProducerWindowSize(unsigned int windowSize); |
| |
| /** |
| * @return true if the Connections that this factory creates should support the |
| * message based priority settings. |
| */ |
| bool isMessagePrioritySupported() const; |
| |
| /** |
| * Set whether or not this factory should create Connection objects with the Message |
| * priority support function enabled. |
| * |
| * @param value |
| * Boolean indicating if Message priority should be enabled. |
| */ |
| void setMessagePrioritySupported(bool value); |
| |
| /** |
| * Get the Next Temporary Destination Id |
| * @return the next id in the sequence. |
| */ |
| long long getNextTempDestinationId(); |
| |
| /** |
| * Get the Next Temporary Destination Id |
| * @return the next id in the sequence. |
| */ |
| long long getNextLocalTransactionId(); |
| |
| /** |
| * Is the Connection configured to watch for advisory messages to maintain state of |
| * temporary destination create and destroy. |
| * |
| * @return true if the Connection will listen for temporary topic advisory messages. |
| */ |
| bool isWatchTopicAdvisories() const; |
| |
| /** |
| * Sets whether this Connection is listening for advisory messages regarding temporary |
| * destination creation and deletion. |
| * |
| * @param value |
| * Boolean indicating if advisory message monitoring should be enabled. |
| */ |
| void setWatchTopicAdvisories(bool value); |
| |
| /** |
| * Get the audit depth for Messages for consumers when using a fault |
| * tolerant transport. The higher the value the more messages are checked |
| * for duplication, and the larger the performance impact of duplicate |
| * detection will be. |
| * |
| * @return the configured audit depth. |
| */ |
| int getAuditDepth() const; |
| |
| /** |
| * Set the audit depth for Messages for consumers when using a fault |
| * tolerant transport. The higher the value the more messages are checked |
| * for duplication, and the larger the performance impact of duplicate |
| * detection will be. |
| * |
| * @param auditDepth |
| * The configured audit depth. |
| */ |
| void setAuditDepth(int auditDepth); |
| |
| /** |
| * The number of Producers that will be audited. |
| * |
| * @return the configured number of producers to include in the audit. |
| */ |
| int getAuditMaximumProducerNumber() const; |
| |
| /** |
| * The number of Producers that will be audited. |
| * |
| * @param auditMaximumProducerNumber |
| * The configured number of producers to include in the audit. |
| */ |
| void setAuditMaximumProducerNumber(int auditMaximumProducerNumber); |
| |
| /** |
| * Gets the value of the configured Duplicate Message detection feature. |
| * |
| * When enabled and a fault tolerant transport is used (think failover) then |
| * this feature will help to detect and filter duplicate messages that might |
| * otherwise be delivered to a consumer after a connection failure. |
| * |
| * Disabling this can increase performance since no Message auditing will |
| * occur. |
| * |
| * @return the checkForDuplicates value currently set. |
| */ |
| bool isCheckForDuplicates() const; |
| |
| /** |
| * Gets the value of the configured Duplicate Message detection feature. |
| * |
| * When enabled and a fault tolerant transport is used (think failover) then |
| * this feature will help to detect and filter duplicate messages that might |
| * otherwise be delivered to a consumer after a connection failure. |
| * |
| * Disabling this can increase performance since no Message auditing will |
| * occur. |
| * |
| * @param checkForDuplicates |
| * The checkForDuplicates value to be configured. |
| */ |
| void setCheckForDuplicates(bool checkForDuplicates); |
| |
| /** |
| * when true, submit individual transacted acks immediately rather than with transaction |
| * completion. This allows the acks to represent delivery status which can be persisted on |
| * rollback Used in conjunction with KahaDB set to Rewrite On Redelivery. |
| * |
| * @return true if this option is enabled. |
| */ |
| bool isTransactedIndividualAck() const; |
| |
| /** |
| * when true, submit individual transacted acks immediately rather than with transaction |
| * completion. This allows the acks to represent delivery status which can be persisted on |
| * rollback Used in conjunction with KahaDB set to Rewrite On Redelivery. |
| * |
| * @param transactedIndividualAck |
| * The value to set. |
| */ |
| void setTransactedIndividualAck(bool transactedIndividualAck); |
| |
| /** |
| * Returns true if non-blocking redelivery of Messages is configured for Consumers |
| * that are rolled back or recovered. |
| * |
| * @return true if non-blocking redelivery is enabled. |
| */ |
| bool isNonBlockingRedelivery() const; |
| |
| /** |
| * When true a MessageConsumer will not stop Message delivery before re-delivering Messages |
| * from a rolled back transaction. This implies that message order will not be preserved and |
| * also will result in the TransactedIndividualAck option to be enabled. |
| * |
| * @param nonBlockingRedelivery |
| * The value to configure for non-blocking redelivery. |
| */ |
| void setNonBlockingRedelivery(bool nonBlockingRedelivery); |
| |
| /** |
| * Gets the delay period for a consumer redelivery. |
| * |
| * @return configured time delay in milliseconds. |
| */ |
| long long getConsumerFailoverRedeliveryWaitPeriod() const; |
| |
| /** |
| * Sets the delay period for a consumer redelivery. |
| * |
| * @param value |
| * The configured time delay in milliseconds. |
| */ |
| void setConsumerFailoverRedeliveryWaitPeriod(long long value); |
| |
| /** |
| * @return true if optimizeAcknowledge is enabled. |
| */ |
| bool isOptimizeAcknowledge() const; |
| |
| /** |
| * Sets if Consumers are configured to use Optimized Acknowledge by default. |
| * |
| * @param optimizeAcknowledge |
| * The optimizeAcknowledge mode to set. |
| */ |
| void setOptimizeAcknowledge(bool optimizeAcknowledge); |
| |
| /** |
| * Gets the time between optimized ack batches in milliseconds. |
| * |
| * @return time between optimized ack batches in Milliseconds. |
| */ |
| long long getOptimizeAcknowledgeTimeOut() const; |
| |
| /** |
| * The max time in milliseconds between optimized ack batches. |
| * |
| * @param optimizeAcknowledgeTimeOut |
| * The time in milliseconds for optimized ack batches. |
| */ |
| void setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut); |
| |
| /** |
| * Gets the configured time interval that is used to force all MessageConsumers that have |
| * optimizedAcknowledge enabled to send an ack for any outstanding Message Acks. By default |
| * this value is set to zero meaning that the consumers will not do any background Message |
| * acknowledgment. |
| * |
| * @return the scheduledOptimizedAckInterval |
| */ |
| long long getOptimizedAckScheduledAckInterval() const; |
| |
| /** |
| * Sets the amount of time between scheduled sends of any outstanding Message Acks for |
| * consumers that have been configured with optimizeAcknowledge enabled. |
| * |
| * Time is given in Milliseconds. |
| * |
| * @param optimizedAckScheduledAckInterval |
| * The scheduledOptimizedAckInterval to use for new Consumers. |
| */ |
| void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval); |
| |
| /** |
| * Should all created consumers be retroactive. |
| * |
| * @return true if consumer will be created with the retroactive flag set. |
| */ |
| bool isUseRetroactiveConsumer() const; |
| |
| /** |
| * Sets whether or not retroactive consumers are enabled. Retroactive |
| * consumers allow non-durable topic subscribers to receive old messages |
| * that were published before the non-durable subscriber started. |
| * |
| * @param useRetroactiveConsumer |
| * The value of this configuration option. |
| */ |
| void setUseRetroactiveConsumer(bool useRetroactiveConsumer); |
| |
| /** |
| * Should all created consumers be exclusive. |
| * |
| * @return true if consumer will be created with the exclusive flag set. |
| */ |
| bool isExclusiveConsumer() const; |
| |
| /** |
| * Enables or disables whether or not queue consumers should be exclusive or |
| * not for example to preserve ordering when not using Message Groups. |
| * |
| * @param exclusiveConsumer |
| * The value of this configuration option. |
| */ |
| void setExclusiveConsumer(bool exclusiveConsumer); |
| |
| /** |
| * Returns whether Message acknowledgments are sent asynchronously meaning no |
| * response is required from the broker before the ack completes. |
| * |
| * @return the sendAcksAsync configured value. |
| */ |
| bool isSendAcksAsync() const; |
| |
| /** |
| * Sets whether Message acknowledgments are sent asynchronously meaning no |
| * response is required from the broker before the ack completes. |
| * |
| * @param sendAcksAsync |
| * The sendAcksAsync configuration value to set. |
| */ |
| void setSendAcksAsync(bool sendAcksAsync); |
| |
| /** |
| * @return Returns the alwaysSessionAsync configuration setting. |
| */ |
| bool isAlwaysSessionAsync() const; |
| |
| /** |
| * If this flag is not set then a separate thread is not used for dispatching messages |
| * for each Session in the Connection. However, a separate thread is always used if there |
| * is more than one session, or the session isn't in auto acknowledge or duplicates ok mode. |
| * By default this value is set to true and session dispatch happens asynchronously. |
| */ |
| void setAlwaysSessionAsync(bool alwaysSessionAsync); |
| |
| /** |
| * @return true if the consumer will skip checking messages for expiration. |
| */ |
| bool isConsumerExpiryCheckEnabled(); |
| |
| /** |
| * Configures whether this consumer will perform message expiration processing |
| * on all incoming messages. This feature is enabled by default. |
| * |
| * @param consumerExpiryCheckEnabled |
| * False if the default message expiration checks should be disabled. |
| */ |
| void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled); |
| |
| /** |
| * @return the current connection's OpenWire protocol version. |
| */ |
| int getProtocolVersion() const; |
| |
| public: // TransportListener |
| |
| /** |
| * Adds a transport listener so that a client can be notified of events in |
| * the underlying transport, client's are always notified after the event has |
| * been processed by the Connection class. Client's should ensure that the |
| * registered listener does not block or take a long amount of time to execute |
| * in order to not degrade performance of this Connection. |
| * |
| * @param transportListener |
| * The TransportListener instance to add to this Connection's set of listeners |
| * to notify of Transport events. |
| */ |
| void addTransportListener(transport::TransportListener* transportListener); |
| |
| /** |
| * Removes a registered TransportListener from the Connection's set of Transport |
| * listeners, this listener will no longer receive any Transport related events. The |
| * caller is responsible for freeing the listener in all cases. |
| * |
| * @param transportListener |
| * The pointer to the TransportListener to remove from the set of listeners. |
| */ |
| void removeTransportListener(transport::TransportListener* transportListener); |
| |
| /** |
| * Event handler for the receipt of a non-response command from the |
| * transport. |
| * @param command the received command object. |
| */ |
| virtual void onCommand(const Pointer<commands::Command> command); |
| |
| /** |
| * Event handler for an exception from a command transport. |
| * @param ex The exception. |
| */ |
| virtual void onException(const decaf::lang::Exception& ex); |
| |
| /** |
| * The transport has suffered an interruption from which it hopes to recover |
| */ |
| virtual void transportInterrupted(); |
| |
| /** |
| * The transport has resumed after an interruption |
| */ |
| virtual void transportResumed(); |
| |
| public: |
| |
| /** |
| * Gets the ConnectionInfo for this Object, if the Connection is not open |
| * than this method throws an exception. |
| * |
| * @throws ActiveMQException if an error occurs while performing this operation. |
| */ |
| const commands::ConnectionInfo& getConnectionInfo() const; |
| |
| /** |
| * Gets the ConnectionId for this Object, if the Connection is not open |
| * than this method throws an exception. |
| * |
| * @throws ActiveMQException if an error occurs while performing this operation. |
| */ |
| const commands::ConnectionId& getConnectionId() const; |
| |
| /** |
| * Gets a reference to this object's Transport instance. |
| * |
| * @return a reference to the Transport that is in use by this Connection. |
| */ |
| transport::Transport& getTransport() const; |
| |
| /** |
| * Gets a reference to the Connection objects built in Scheduler instance. |
| * |
| * @return a reference to a Scheduler instance owned by this Connection. |
| */ |
| Pointer<threads::Scheduler> getScheduler() const; |
| |
| /** |
| * Returns the Id of the Resource Manager that this client will use should |
| * it be entered into an XA Transaction. |
| * |
| * @return a string containing the resource manager Id for XA Transactions. |
| */ |
| std::string getResourceManagerId() const; |
| |
| /** |
| * Clean up this connection object, reseting it back to a state that mirrors |
| * what a newly created ActiveMQConnection object has. |
| */ |
| void cleanup(); |
| |
| /** |
| * Sends a message without request that the broker send a response to indicate that |
| * it was received. |
| * |
| * @param command |
| * The Command object to send to the Broker. |
| * |
| * @throws ActiveMQException if not currently connected, or if the operation |
| * fails for any reason. |
| */ |
| void oneway(Pointer<commands::Command> command); |
| |
| /** |
| * Sends a synchronous request and returns the response from the broker. This |
| * method converts any error responses it receives into an exception. |
| * |
| * @param command |
| * The Command object that is to be sent to the broker. |
| * @param timeout |
| * The time in milliseconds to wait for a response, default is zero or infinite. |
| * |
| * @return a Pointer instance to the Response object sent from the Broker. |
| * |
| * @throws BrokerException if the response from the broker is of type ExceptionResponse. |
| * @throws ActiveMQException if any other error occurs while sending the Command. |
| */ |
| Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0); |
| |
| /** |
| * Sends a synchronous request and returns the response from the broker. This |
| * method converts any error responses it receives into an exception. |
| * |
| * @param command |
| * The Command object that is to be sent to the broker. |
| * @param onComplete |
| * Completion callback that will be notified on send success or failure. |
| * |
| * @throws BrokerException if the response from the broker is of type ExceptionResponse. |
| * @throws ActiveMQException if any other error occurs while sending the Command. |
| */ |
| void asyncRequest(Pointer<commands::Command> command, cms::AsyncCallback* onComplete); |
| |
| /** |
| * Notify the exception listener |
| * @param ex the exception to fire |
| */ |
| virtual void fire(const exceptions::ActiveMQException& ex); |
| |
| /** |
| * Indicates that a Connection resource that is processing the transportInterrupted |
| * event has completed. |
| */ |
| void setTransportInterruptionProcessingComplete(); |
| |
| /** |
| * Sets the pointer to the first exception that caused the Connection to become failed. |
| * |
| * @param error |
| * pointer to the exception instance that is to be the first failure error if the |
| * first error is already set this value is deleted. |
| */ |
| void setFirstFailureError(decaf::lang::Exception* error); |
| |
| /** |
| * Gets the pointer to the first exception that caused the Connection to become failed. |
| * |
| * @return pointer to an Exception instance or NULL if none is set. |
| */ |
| decaf::lang::Exception* getFirstFailureError() const; |
| |
| /** |
| * Event handler for dealing with async exceptions. |
| * |
| * @param ex |
| * The exception that caused the error condition. |
| */ |
| void onAsyncException(const decaf::lang::Exception& ex); |
| |
| /** |
| * Handles async client internal exceptions which don't usually affect the connection |
| * itself. These are reported but do not shutdown the Connection. |
| * |
| * @param error the exception that the problem |
| */ |
| void onClientInternalException(const decaf::lang::Exception& ex); |
| |
| /** |
| * Check for Closed State and Throw an exception if true. |
| * |
| * @throws CMSException if the Connection is closed. |
| */ |
| void checkClosed() const; |
| |
| /** |
| * Check for Closed State and Failed State and Throw an exception if either is true. |
| * |
| * @throws CMSException if the Connection is closed or failed. |
| */ |
| void checkClosedOrFailed() const; |
| |
| /** |
| * If its not been sent, then send the ConnectionInfo to the Broker. |
| */ |
| void ensureConnectionInfoSent(); |
| |
| /** |
| * @return the ExecutorService used to run jobs for this Connection |
| */ |
| decaf::util::concurrent::ExecutorService* getExecutor() const; |
| |
| /** |
| * Adds the given Temporary Destination to this Connections collection of known |
| * Temporary Destinations. |
| * |
| * @param destination |
| * The temporary destination that this connection should track. |
| */ |
| void addTempDestination(Pointer<commands::ActiveMQTempDestination> destination); |
| |
| /** |
| * Removes the given Temporary Destination to this Connections collection of known |
| * Temporary Destinations. |
| * |
| * @param destination |
| * The temporary destination that this connection should stop tracking. |
| */ |
| void removeTempDestination(Pointer<commands::ActiveMQTempDestination> destination); |
| |
| /** |
| * Removes the given Temporary Destination to this Connections collection of known |
| * Temporary Destinations. |
| * |
| * @param destination |
| * The temporary destination that this connection should remove from the Broker. |
| * |
| * @throws CMSException if the temporary destination is in use by an active Session. |
| */ |
| void deleteTempDestination(Pointer<commands::ActiveMQTempDestination> destination); |
| |
| /** |
| * Removes any TempDestinations that this connection has cached, ignoring any exceptions |
| * generated because the destination is in use as they should not be removed. This method |
| * is useful for Connection pools that retain connection objects for long durations and |
| * want to periodically purge old temporary destination instances this connection is tracking. |
| */ |
| void cleanUpTempDestinations(); |
| |
| /** |
| * Determines whether the supplied Temporary Destination has already been deleted from the |
| * Broker. If watchTopicAdvisories is disabled this method will always return false. |
| * |
| * @return true if the temporary destination was deleted already. |
| */ |
| bool isDeleted(Pointer<commands::ActiveMQTempDestination> destination) const; |
| |
| /** |
| * Returns an ArrayList that contains a copy of all Sessions that are |
| * currently active in the Connection |
| * |
| * @return an ArrayList of Sessions active in this connection. |
| */ |
| decaf::util::ArrayList< Pointer<activemq::core::kernels::ActiveMQSessionKernel> > getSessions() const; |
| |
| protected: |
| |
| /** |
| * @return the next available Session Id. |
| */ |
| virtual Pointer<commands::SessionId> getNextSessionId(); |
| |
| // Sends a oneway disconnect message to the broker. |
| void disconnect(long long lastDeliveredSequenceId); |
| |
| // Waits for all Consumers to handle the Transport Interrupted event. |
| void waitForTransportInterruptionProcessingToComplete(); |
| |
| // Marks processing complete for a single caller when interruption processing completes. |
| void signalInterruptionProcessingComplete(); |
| |
| // Allow subclasses to access the original Properties object for this connection. |
| const decaf::util::Properties& getProperties() const; |
| |
| // Process the WireFormatInfo command |
| void onWireFormatInfo(Pointer<commands::Command> command); |
| |
| // Process the ControlCommand command |
| void onControlCommand(Pointer<commands::Command> command); |
| |
| // Process the ConnectionControl command |
| void onConnectionControl(Pointer<commands::Command> command); |
| |
| // Process the ConsumerControl command |
| void onConsumerControl(Pointer<commands::Command> command); |
| |
| }; |
| |
| }} |
| |
| #endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_*/ |