blob: fb3c27d6dfd555c58ca3620701c13d37237f5775 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_SUBSCRIBECOMMAND_H_
#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_SUBSCRIBECOMMAND_H_
#include <activemq/connector/stomp/commands/AbstractCommand.h>
#include <activemq/connector/stomp/commands/CommandConstants.h>
#include <activemq/transport/Command.h>
#include <activemq/util/Boolean.h>
namespace activemq{
namespace connector{
namespace stomp{
namespace commands{
/**
* Command sent to the broker to subscribe to a topic
* or queue.
*/
class SubscribeCommand : public AbstractCommand< transport::Command >
{
public:
SubscribeCommand() :
AbstractCommand<transport::Command>() {
initialize( getFrame() );
}
SubscribeCommand( StompFrame* frame ) :
AbstractCommand< transport::Command >( frame ) {
validate( getFrame() );
}
virtual ~SubscribeCommand(void) {}
/**
* Clone the StompCommand and return the new copy.
* @returns new copy of this command caller owns it.
*/
virtual StompCommand* cloneStompCommand() const {
return new SubscribeCommand( getFrame().clone() );
}
/**
* Get the destination
* @returns the destination Name String
*/
virtual std::string getDestination() const{
return getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_DESTINATION ), "" );
}
/**
* Set the destination
* @param destination the destination Name String
*/
virtual void setDestination( const std::string& destination ){
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_DESTINATION ),
destination );
}
/**
* Set the Ack Mode of this Subscription
* @param mode Ack mode setting.
*/
virtual void setAckMode( const CommandConstants::AckMode mode ){
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_ACK ),
CommandConstants::toString( mode ) );
}
/**
* Get the Ack Mode of this Subscription
* @return mode setting.
*/
virtual CommandConstants::AckMode getAckMode() const{
return CommandConstants::toAckMode(
getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_ACK ) ) );
}
/**
* Sets the Message Selector that is associated with this
* subscribe request
* @param selector Destination selector string
*/
virtual void setMessageSelector( const std::string& selector ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_SELECTOR ),
selector );
}
/**
* Gets the Message Selector that is associated with this
* subscribe request
* @returns the selector string
*/
virtual std::string getMessageSelector() const{
return getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_SELECTOR ), "" );
}
/**
* Sets the Subscription Name that is associated with this
* subscribe request
* @param name Subscription Name
*/
virtual void setSubscriptionName( const std::string& name ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_SUBSCRIPTIONNAME ),
name );
}
/**
* Gets the Subscription Name that is associated with this
* subscribe request
* @returns the Subscription Name
*/
virtual std::string getSubscriptionName() const{
return getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_SUBSCRIPTIONNAME ), "" );
}
/**
* Gets whether or not locally sent messages should be ignored for
* subscriptions. Set to true to filter out locally sent messages
* @return NoLocal value
*/
virtual bool getNoLocal(void) const {
return util::Boolean::parseBoolean( getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_NOLOCAL ),
"false" ) );
}
/**
* Sets whether or not locally sent messages should be ignored for
* subscriptions. Set to true to filter out locally sent messages
* @param noLocal No Local delivery value
*/
virtual void setNoLocal( bool noLocal ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_NOLOCAL ),
util::Boolean::toString( noLocal ) );
}
/**
* Sets whether or not the broker is to dispatch messages in an
* asynchronous manner. Set to true if you want Async.
* @return true if in dispatch async mode
*/
virtual bool getDispatchAsync() const {
return util::Boolean::parseBoolean( getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_DISPATCH_ASYNC ),
"false" ) );
}
/**
* Sets whether or not the broker is to dispatch messages in an
* asynchronous manner. Set to true if you want Async.
* @param dispatchAsync true for async dispatch mode
*/
virtual void setDispatchAsync( bool dispatchAsync ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_DISPATCH_ASYNC ),
util::Boolean::toString( dispatchAsync ) );
}
/**
* Gets whether or not this consumer is an exclusive consumer for
* this destination.
* @return true for exclusive mode
*/
virtual bool getExclusive() const {
return util::Boolean::parseBoolean( getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_EXCLUSIVE ),
"false" ) );
}
/**
* Sets whether or not this consumer is an exclusive consumer for
* this destination.
* @param exclusive true if in exclusive mode
*/
virtual void setExclusive( bool exclusive ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_EXCLUSIVE ),
util::Boolean::toString( exclusive ) );
}
/**
* Get the max number of pending messages on a destination
* For Slow Consumer Handlingon non-durable topics by dropping old
* messages - we can set a maximum pending limit which once a slow
* consumer backs up to this high water mark we begin to discard
* old messages
* @return Max value
*/
virtual int getMaxPendingMsgLimit() const {
return util::Integer::parseInt( getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_MAXPENDINGMSGLIMIT ),
"0" ) );
}
/**
* Set the max number of pending messages on a destination
* For Slow Consumer Handlingon non-durable topics by dropping old
* messages - we can set a maximum pending limit which once a slow
* consumer backs up to this high water mark we begin to discard
* old messages
* @param limit Max Pending value
*/
virtual void setMaxPendingMsgLimit( int limit ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_MAXPENDINGMSGLIMIT ),
util::Integer::toString( limit ) );
}
/**
* Get the maximum number of pending messages that will be
* dispatched to the client. Once this maximum is reached no more
* messages are dispatched until the client acknowledges a message.
* Set to 1 for very fair distribution of messages across consumers
* where processing messages can be slow
* @return prefetch size value
*/
virtual int getPrefetchSize() const {
return util::Integer::parseInt( getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_PREFETCHSIZE ),
"1000" ) );
}
/**
* Set the maximum number of pending messages that will be
* dispatched to the client. Once this maximum is reached no more
* messages are dispatched until the client acknowledges a message.
* Set to 1 for very fair distribution of messages across consumers
* where processing messages can be slow
* @param size prefetch size value
*/
virtual void setPrefetchSize( int size ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_PREFETCHSIZE ),
util::Integer::toString( size ) );
}
/**
* Gets the priority of the consumer so that dispatching can be
* weighted in priority order
* @return priority level
*/
virtual int getPriority() const {
return util::Integer::parseInt( getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_CONSUMERPRIORITY ),
"0" ) );
}
/**
* Sets the priority of the consumer so that dispatching can be
* weighted in priority order
* @param priority message prioirty level
*/
virtual void setPriority( int priority ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_CONSUMERPRIORITY ),
util::Integer::toString( priority ) );
}
/**
* Get For non-durable topics if this subscription is set to be
* retroactive
* @return true for retroactive mode
*/
virtual bool getRetroactive() const {
return util::Boolean::parseBoolean( getPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_RETROACTIVE ),
"false" ) );
}
/**
* Set For non-durable topics if this subscription is set to be
* retroactive
* @param retroactive true if in retroactive mode
*/
virtual void setRetroactive( bool retroactive ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_RETROACTIVE ),
util::Boolean::toString( retroactive ) );
}
protected:
/**
* Inheritors are required to override this method to init the
* frame with data appropriate for the command type.
* @param frame Frame to init
*/
virtual void initialize( StompFrame& frame )
{
frame.setCommand( CommandConstants::toString(
CommandConstants::SUBSCRIBE ) );
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_ACK ),
CommandConstants::toString(
CommandConstants::ACK_AUTO ) );
}
/**
* Inheritors are required to override this method to validate
* the passed stomp frame before it is marshalled or unmarshaled
* @param frame Frame to validate
* @returns frame true if frame is valid
*/
virtual bool validate( const StompFrame& frame ) const
{
if((frame.getCommand() ==
CommandConstants::toString( CommandConstants::SUBSCRIBE )) &&
(frame.getProperties().hasProperty(
CommandConstants::toString(
CommandConstants::HEADER_DESTINATION ) ) ) )
{
return true;
}
return false;
}
};
}}}}
#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_SUBSCRIBECOMMAND_H_*/