http://issues.apache.org/activemq/browse/AMQCPP-98
Add copy of AckHandler to Stomp Commands Cloning.
git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-cpp/tags/activemq-cpp-2.0@527909 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h b/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h
index cb92730..3980b0a 100644
--- a/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h
+++ b/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h
@@ -36,40 +36,40 @@
* Implements the interface for a cms::BytesMessage. Uses the template
* class StompMessage to implement all cms::Message type functionality
* and implements the BytesMessage interface here.
- */
+ */
class BytesMessageCommand : public StompMessage< cms::BytesMessage >
{
private:
-
+
/**
* Flag that indicates what state the stream is in. If true, the
* message may only be read from. If false, the message may only be
* written to.
*/
bool readOnly;
-
+
/**
* InputStream that wraps around the frame's body when in read-only
* mode.
*/
io::ByteArrayInputStream inputStream;
-
+
/**
* OutputStream that wraps around the frame's body when in write-only
* mode.
*/
io::ByteArrayOutputStream outputStream;
-
+
/**
* DataInputStream wrapper around the input stream.
*/
io::DataInputStream dataInputStream;
-
+
/**
* DataOutputStream wrapper around the output stream.
*/
io::DataOutputStream dataOutputStream;
-
+
public:
/**
@@ -83,12 +83,12 @@
initialize( getFrame() );
clearBody();
}
-
+
/**
* Constructor for initialization in read-only mode.
* @param frame The stomp frame that was received from the broker.
*/
- BytesMessageCommand( StompFrame* frame ) :
+ BytesMessageCommand( StompFrame* frame ) :
StompMessage< cms::BytesMessage >( frame ),
dataInputStream(&inputStream),
dataOutputStream(&outputStream)
@@ -96,24 +96,24 @@
validate( getFrame() );
reset();
}
-
- virtual ~BytesMessageCommand() {}
+
+ virtual ~BytesMessageCommand() {}
/**
* Clears out the body of the message. This does not clear the
* headers or properties.
*/
virtual void clearBody(){
-
+
// Invoke base class's version.
StompMessage<cms::BytesMessage>::clearBody();
-
+
// Set the stream in write only mode.
readOnly = false;
-
+
outputStream.setBuffer( getBytes() );
}
-
+
/**
* Marshals the command to a stomp frame.
* @returns the stomp frame representation of this
@@ -127,52 +127,55 @@
// Before we send out the frame tag it with the content length
// as this is a bytes message and we can't ensure we have only
// a trailing NULL.
- setPropertyValue(
- CommandConstants::toString(
+ setPropertyValue(
+ CommandConstants::toString(
CommandConstants::HEADER_CONTENTLENGTH),
util::Long::toString( getFrame().getBodyLength() ) );
-
+
return StompMessage<cms::BytesMessage>::marshal();
}
-
+
/**
- * Puts the message body in read-only mode and repositions the stream
+ * Puts the message body in read-only mode and repositions the stream
* of bytes to the beginning.
* @throws CMSException
*/
virtual void reset() throw ( cms::CMSException ){
readOnly = true;
- inputStream.setBuffer( getBytes() );
+ inputStream.setBuffer( getBytes() );
}
-
+
/**
- * Clonse this message exactly, returns a new instance that the
+ * Clone this message exactly, returns a new instance that the
* caller is required to delete.
* @return new copy of this message
*/
virtual cms::BytesMessage* clone() const {
- StompFrame* frame = getFrame().clone();
-
- return new BytesMessageCommand( frame );
- }
+
+ BytesMessageCommand* command =
+ new BytesMessageCommand( getFrame().clone() );
+ command->setAckHandler( this->getAckHandler() );
+
+ return command;
+ }
/**
- * sets the bytes given to the message body.
+ * sets the bytes given to the message body.
* @param buffer Byte Buffer to copy
* @param numBytes Number of bytes in Buffer to copy
* @throws CMSException
*/
- virtual void setBodyBytes( const unsigned char* buffer,
- std::size_t numBytes )
+ virtual void setBodyBytes( const unsigned char* buffer,
+ std::size_t numBytes )
throw( cms::CMSException ) {
-
- checkWriteOnly();
+
+ checkWriteOnly();
this->setBytes( buffer, numBytes );
}
-
+
/**
* Gets the bytes that are contained in this message, user should
- * copy this data into a user allocated buffer. Call
+ * copy this data into a user allocated buffer. Call
* <code>getBodyLength</code> to determine the number of bytes
* to expect.
* @return const pointer to a byte buffer
@@ -182,10 +185,10 @@
if( bytes.size() == 0 ){
return NULL;
}
-
+
return &this->getBytes()[0];
}
-
+
/**
* Returns the number of bytes contained in the body of this message.
* @return number of bytes.
@@ -193,7 +196,7 @@
virtual std::size_t getBodyLength() const {
return this->getNumBytes();
}
-
+
/**
* Reads a Boolean from the Bytes message stream
* @returns boolean value from stream
@@ -203,10 +206,10 @@
checkReadOnly();
return dataInputStream.readBoolean();
}
-
+
/**
- * Writes a boolean to the bytes message stream as a 1-byte value.
- * The value true is written as the value (byte)1; the value false
+ * Writes a boolean to the bytes message stream as a 1-byte value.
+ * The value true is written as the value (byte)1; the value false
* is written as the value (byte)0.
* @param value - boolean to write to the stream
* @throws CMSException
@@ -240,22 +243,22 @@
* Reads a byte array from the bytes message stream.
*
* If the length of vector value is less than the number of bytes
- * remaining to be read from the stream, the vector should be filled. A
+ * remaining to be read from the stream, the vector should be filled. A
* subsequent call reads the next increment, and so on.
*
- * If the number of bytes remaining in the stream is less than the
- * length of vector value, the bytes should be read into the vector. The
+ * If the number of bytes remaining in the stream is less than the
+ * length of vector value, the bytes should be read into the vector. The
* return value of the total number of bytes read will be less than the
- * length of the vector, indicating that there are no more bytes left to
+ * length of the vector, indicating that there are no more bytes left to
* be read from the stream. The next read of the stream returns -1.
- *
+ *
* @param value - buffer to place data in
- * @returns the total number of bytes read into the buffer, or -1 if
- * there is no more data because the end of the stream has
+ * @returns the total number of bytes read into the buffer, or -1 if
+ * there is no more data because the end of the stream has
* been reached
* @throws CMSException if an error occurs.
*/
- virtual std::size_t readBytes( std::vector<unsigned char>& value )
+ virtual std::size_t readBytes( std::vector<unsigned char>& value )
throw ( cms::CMSException ){
checkReadOnly();
return dataInputStream.read( value );
@@ -267,7 +270,7 @@
* @param value - bytes to write to the stream
* @throws CMSException
*/
- virtual void writeBytes( const std::vector<unsigned char>& value )
+ virtual void writeBytes( const std::vector<unsigned char>& value )
throw ( cms::CMSException ){
checkWriteOnly();
dataOutputStream.write( value );
@@ -275,30 +278,30 @@
/**
* Reads a portion of the bytes message stream.
- *
- * If the length of array value is less than the number of bytes
- * remaining to be read from the stream, the array should be filled. A
+ *
+ * If the length of array value is less than the number of bytes
+ * remaining to be read from the stream, the array should be filled. A
* subsequent call reads the next increment, and so on.
- *
- * If the number of bytes remaining in the stream is less than the
- * length of array value, the bytes should be read into the array. The
- * return value of the total number of bytes read will be less than the
- * length of the array, indicating that there are no more bytes left to
+ *
+ * If the number of bytes remaining in the stream is less than the
+ * length of array value, the bytes should be read into the array. The
+ * return value of the total number of bytes read will be less than the
+ * length of the array, indicating that there are no more bytes left to
* be read from the stream. The next read of the stream returns -1.
- *
- * If length is negative, or length is greater than the length of the
- * array value, then an IndexOutOfBoundsException is thrown. No bytes
+ *
+ * If length is negative, or length is greater than the length of the
+ * array value, then an IndexOutOfBoundsException is thrown. No bytes
* will be read from the stream for this exception case.
- *
+ *
* @param value - the buffer into which the data is read
- * @param length - the number of bytes to read; must be less than or
+ * @param length - the number of bytes to read; must be less than or
* equal to value.length
- * @returns the total number of bytes read into the buffer, or -1 if
- * there is no more data because the end of the stream has
+ * @returns the total number of bytes read into the buffer, or -1 if
+ * there is no more data because the end of the stream has
* been reached
* @throws CMSException
*/
- virtual std::size_t readBytes( unsigned char*& buffer, std::size_t length )
+ virtual std::size_t readBytes( unsigned char*& buffer, std::size_t length )
throw ( cms::CMSException )
{
checkReadOnly();
@@ -469,7 +472,7 @@
checkReadOnly();
return dataInputStream.readString();
}
-
+
/**
* Writes an ASCII String to the Bytes message stream
* @param value The string to be written to the stream.
@@ -489,7 +492,7 @@
checkReadOnly();
return dataInputStream.readUTF();
}
-
+
/**
* Writes an UTF String to the BytesMessage stream
* @param value The string to be written to the stream.
@@ -499,31 +502,31 @@
checkWriteOnly();
dataOutputStream.writeUTF( value );
}
-
+
protected:
-
+
/**
* Throws an exception if not in write-only mode.
* @throws CMSException.
*/
void checkWriteOnly() throw (cms::CMSException){
if( readOnly ){
- throw exceptions::IllegalStateException( __FILE__, __LINE__,
+ throw exceptions::IllegalStateException( __FILE__, __LINE__,
"message is in read-only mode and cannot be written to" );
}
}
-
+
/**
* Throws an exception if not in read-only mode.
* @throws CMSException
*/
void checkReadOnly() throw (cms::CMSException){
if( !readOnly ){
- throw exceptions::IllegalStateException( __FILE__, __LINE__,
+ throw exceptions::IllegalStateException( __FILE__, __LINE__,
"message is in write-only mode and cannot be read from" );
}
}
-
+
};
}}}}
diff --git a/src/main/activemq/connector/stomp/commands/CommandConstants.cpp b/src/main/activemq/connector/stomp/commands/CommandConstants.cpp
index 53a37df..f1346f3 100644
--- a/src/main/activemq/connector/stomp/commands/CommandConstants.cpp
+++ b/src/main/activemq/connector/stomp/commands/CommandConstants.cpp
@@ -66,7 +66,6 @@
stompHeaders[HEADER_CONSUMERPRIORITY] = "activemq.priority";
stompHeaders[HEADER_REPLYTO] = "reply-to";
stompHeaders[HEADER_TYPE] = "type";
- stompHeaders[HEADER_AMQMSGTYPE] = "amq-msg-type";
stompHeaders[HEADER_SELECTOR] = "selector";
stompHeaders[HEADER_DISPATCH_ASYNC] = "activemq.dispatchAsync";
stompHeaders[HEADER_EXCLUSIVE] = "activemq.exclusive";
diff --git a/src/main/activemq/connector/stomp/commands/CommandConstants.h b/src/main/activemq/connector/stomp/commands/CommandConstants.h
index f0c63b8..11b7733 100644
--- a/src/main/activemq/connector/stomp/commands/CommandConstants.h
+++ b/src/main/activemq/connector/stomp/commands/CommandConstants.h
@@ -68,7 +68,6 @@
HEADER_PERSISTENT,
HEADER_REPLYTO,
HEADER_TYPE,
- HEADER_AMQMSGTYPE,
HEADER_DISPATCH_ASYNC,
HEADER_EXCLUSIVE,
HEADER_MAXPENDINGMSGLIMIT,
diff --git a/src/main/activemq/connector/stomp/commands/MessageCommand.h b/src/main/activemq/connector/stomp/commands/MessageCommand.h
index 65f7889..4cb7023 100644
--- a/src/main/activemq/connector/stomp/commands/MessageCommand.h
+++ b/src/main/activemq/connector/stomp/commands/MessageCommand.h
@@ -26,7 +26,7 @@
namespace connector{
namespace stomp{
namespace commands{
-
+
/**
* Message command which represents a ActiveMQMessage with no body
* can be sent or recieved.
@@ -35,29 +35,32 @@
{
public:
- MessageCommand(void) :
+ MessageCommand() :
StompMessage< cms::Message >() {
initialize( getFrame() );
}
- MessageCommand( StompFrame* frame ) :
+ MessageCommand( StompFrame* frame ) :
StompMessage< cms::Message >( frame ) {
validate( getFrame() );
}
- virtual ~MessageCommand(void) {}
+ virtual ~MessageCommand() {}
/**
- * Clonse this message exactly, returns a new instance that the
+ * Clone this message exactly, returns a new instance that the
* caller is required to delete.
* @return new copy of this message
*/
- virtual cms::Message* clone(void) const {
- StompFrame* frame = getFrame().clone();
-
- return new MessageCommand( frame );
+ virtual cms::Message* clone() const {
+
+ MessageCommand* command =
+ new MessageCommand( getFrame().clone() );
+ command->setAckHandler( this->getAckHandler() );
+
+ return command;
}
};
-
+
}}}}
#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_MESSAGECOMMAND_H_*/
diff --git a/src/main/activemq/connector/stomp/commands/StompMessage.h b/src/main/activemq/connector/stomp/commands/StompMessage.h
index 253445d..2fdd84a 100644
--- a/src/main/activemq/connector/stomp/commands/StompMessage.h
+++ b/src/main/activemq/connector/stomp/commands/StompMessage.h
@@ -95,7 +95,7 @@
}
}
- virtual ~StompMessage() {
+ virtual ~StompMessage() {
if( dest != NULL ){
delete dest;
@@ -507,6 +507,15 @@
public: // ActiveMQMessage
/**
+ * Gets the Acknowledgement Handler that this Message will use
+ * when the Acknowledge method is called.
+ * @returns handler ActiveMQAckHandler
+ */
+ virtual core::ActiveMQAckHandler* getAckHandler() const {
+ return this->ackHandler;
+ }
+
+ /**
* Sets the Acknowledgement Handler that this Message will use
* when the Acknowledge method is called.
* @param handler ActiveMQAckHandler
@@ -577,7 +586,7 @@
testProperty( name );
if( !getProperties().hasProperty( name ) ){
- throw exceptions::NoSuchElementException(
+ throw exceptions::NoSuchElementException(
__FILE__, __LINE__,
"property not available in message" );
}
@@ -588,7 +597,7 @@
stream >> value;
if( stream.fail() ){
- throw exceptions::RuntimeException(
+ throw exceptions::RuntimeException(
__FILE__, __LINE__,
"Error extracting property from string" );
}
diff --git a/src/main/activemq/connector/stomp/commands/TextMessageCommand.h b/src/main/activemq/connector/stomp/commands/TextMessageCommand.h
index 893d15d..1889dd9 100644
--- a/src/main/activemq/connector/stomp/commands/TextMessageCommand.h
+++ b/src/main/activemq/connector/stomp/commands/TextMessageCommand.h
@@ -36,13 +36,13 @@
StompMessage< cms::TextMessage >() {
initialize( getFrame() );
}
-
- TextMessageCommand( StompFrame* frame ) :
+
+ TextMessageCommand( StompFrame* frame ) :
StompMessage< cms::TextMessage >( frame ) {
validate( getFrame() );
}
-
- virtual ~TextMessageCommand() {}
+
+ virtual ~TextMessageCommand() {}
/**
* Clonse this message exactly, returns a new instance that the
@@ -50,25 +50,28 @@
* @return new copy of this message
*/
virtual cms::TextMessage* clone() const {
- StompFrame* frame = getFrame().clone();
-
- return new TextMessageCommand( frame );
- }
+
+ TextMessageCommand* command =
+ new TextMessageCommand( getFrame().clone() );
+ command->setAckHandler( this->getAckHandler() );
+
+ return command;
+ }
/**
* Gets the message character buffer.
* @return The message character buffer.
*/
virtual std::string getText() const throw( cms::CMSException ) {
-
+
const std::vector<unsigned char>& bytes = getBytes();
if( bytes.size() == 0 ){
return "";
}
-
+
return std::string( (char*)&bytes[0] );
}
-
+
/**
* Sets the message contents.
* @param msg The message buffer.