blob: 5fbc0bc6a121a7539fbeed4e04ccdd82962c060f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "StompCommandReader.h"
#include <activemq/connector/stomp/commands/CommandConstants.h>
#include <activemq/concurrent/Thread.h>
#include <activemq/util/Character.h>
using namespace std;
using namespace activemq;
using namespace activemq::concurrent;
using namespace activemq::connector;
using namespace activemq::connector::stomp;
using namespace activemq::transport;
using namespace activemq::io;
using namespace activemq::exceptions;
using namespace activemq::util;
////////////////////////////////////////////////////////////////////////////////
/**
 * Default constructor. The reader starts out with no input stream
 * attached (inputStream is NULL).
 */
StompCommandReader::StompCommandReader(void)
    : inputStream( NULL )
{
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Constructs a reader that pulls its data from the given stream.
 * @param is the input stream to read from; only the pointer is stored.
 */
StompCommandReader::StompCommandReader( InputStream* is )
    : inputStream( is )
{
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Reads one complete Stomp frame (command line, headers, body) from the
 * input stream and passes it to the marshaler to build a Command.
 *
 * If anything throws, the frame allocated here is deleted before the
 * exception leaves this method.
 *
 * @return the Command produced by the marshaler; the caller must delete it.
 * @throws CommandIOException if reading or marshaling the frame fails.
 */
Command* StompCommandReader::readCommand()
throw ( CommandIOException )
{
    StompFrame* frame = NULL;

    try
    {
        // Create a new Frame for reading to.
        frame = new StompFrame();

        // Read the command header, then the headers, then the body.
        readStompCommandHeader( *frame );
        readStompHeaders( *frame );
        readStompBody( *frame );

        // NOTE(review): the frame is passed to the marshaler and is not
        // deleted here on success; presumably the marshaler (or the
        // Command it creates) takes ownership of it - confirm.
        return marshaler.marshal( frame );
    }
    catch( CommandIOException& ex ){
        // delete on a NULL pointer is a no-op, so no guard is needed.
        delete frame;
        ex.setMark( __FILE__, __LINE__ );

        // Rethrow the in-flight exception object itself. "throw ex;" would
        // throw a copy of static type CommandIOException, slicing off any
        // derived type the original exception had.
        throw;
    }
    catch( ActiveMQException& ex ){
        delete frame;
        ex.setMark( __FILE__, __LINE__ );
        throw CommandIOException(ex);
    }
    catch( ... ){
        delete frame;
        throw CommandIOException(__FILE__, __LINE__, "caught unknown exception");
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Reads lines from the stream until one contains a non-whitespace
 * character, then stores the text starting at that character as the
 * frame's command. Lines that are entirely whitespace are skipped.
 *
 * @param frame the frame whose command is to be set.
 * @throws CommandIOException if reading from the stream fails.
 */
void StompCommandReader::readStompCommandHeader( StompFrame& frame )
throw ( CommandIOException )
{
    try{

        bool commandFound = false;
        while( !commandFound )
        {
            // The command line is read exactly like any other header line.
            readStompHeaderLine();

            // The final element of the buffer is the null terminator that
            // readStompHeaderLine wrote over the line feed, so it is left
            // out of the scan.
            const size_t limit = buffer.size()-1;

            // Advance past any leading white space.
            size_t pos = 0;
            while( pos < limit && Character::isWhitespace(buffer[pos]) )
            {
                ++pos;
            }

            if( pos < limit )
            {
                // A command was found; hand the null-terminated text
                // starting at it to the frame.
                frame.setCommand( reinterpret_cast<char*>(&buffer[pos]) );
                commandFound = true;
            }
        }
    }
    AMQ_CATCH_RETHROW( CommandIOException )
    AMQ_CATCH_EXCEPTION_CONVERT( IOException, CommandIOException )
    AMQ_CATCHALL_THROW( CommandIOException )
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Reads header lines of the form "key:value" and stores each pair in the
 * frame's properties, stopping at the first empty line. A line that
 * contains no ':' separator is skipped without error.
 *
 * @param frame the frame whose properties are to be filled in.
 * @throws CommandIOException if reading from the stream fails.
 */
void StompCommandReader::readStompHeaders( StompFrame& frame )
throw ( CommandIOException )
{
    try{

        for( ;; )
        {
            // Pull in the next header line.
            const std::size_t lineLength = readStompHeaderLine();

            if( lineLength == 0 )
            {
                // should never get here
                throw StompConnectorException(
                    __FILE__, __LINE__,
                    "StompCommandReader::readStompHeaders: no characters read" );
            }

            // An empty line (only the terminator) marks the end of the
            // header section.
            if( lineLength == 1 && buffer[0] == '\0' )
            {
                break;
            }

            // Locate the first key/value separator on this line.
            size_t sep = 0;
            while( sep < buffer.size() && buffer[sep] != ':' )
            {
                ++sep;
            }

            if( sep == buffer.size() )
            {
                // No separator on this line, nothing to store.
                continue;
            }

            // Split the line in place: terminate the key where the
            // separator was; the value starts right after it.
            buffer[sep] = '\0';
            const char* key = reinterpret_cast<char*>(&buffer[0]);
            const char* value = reinterpret_cast<char*>(&buffer[sep+1]);

            // Record the header key/value pair.
            frame.getProperties().setProperty(key, value);
        }
    }
    AMQ_CATCH_RETHROW( CommandIOException )
    AMQ_CATCH_EXCEPTION_CONVERT( IOException, CommandIOException )
    AMQ_CATCHALL_THROW( CommandIOException )
}
////////////////////////////////////////////////////////////////////////////////
std::size_t StompCommandReader::readStompHeaderLine()
throw ( CommandIOException )
{
try{
// Clear any data from the buffer.
buffer.clear();
std::size_t count = 0;
while( true )
{
// Read the next char from the stream.
buffer.push_back( inputStream->read() );
// Increment the position pointer.
count++;
// If we reached the line terminator, return the total number
// of characters read.
if( buffer[count-1] == '\n' )
{
// Overwrite the line feed with a null character.
buffer[count-1] = '\0';
return count;
}
}
// If we get here something bad must have happened.
throw StompConnectorException(
__FILE__, __LINE__,
"StompCommandReader::readStompHeaderLine: "
"Unrecoverable, error condition");
}
AMQ_CATCH_EXCEPTION_CONVERT( IOException, CommandIOException )
AMQ_CATCHALL_THROW( CommandIOException )
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Reads the body of a frame from the stream and stores it in the frame.
 *
 * When the frame carries a non-zero content-length header, exactly that
 * many bytes are read, followed by the single null byte that terminates
 * the frame (the null is not stored in the body). Otherwise bytes are read
 * up to and including the first null byte; in that case the terminating
 * null IS stored as the last byte of the body.
 *
 * @param frame the frame whose headers have already been read and whose
 *        body is to be filled in.
 * @throws CommandIOException if reading from the stream fails, no input
 *         stream has been set, or the frame terminator is missing.
 */
void StompCommandReader::readStompBody( StompFrame& frame )
throw ( CommandIOException )
{
    try{

        // Clear any data from the buffer.
        buffer.clear();

        unsigned long content_length = 0;

        if(frame.getProperties().hasProperty(
            commands::CommandConstants::toString(
                commands::CommandConstants::HEADER_CONTENTLENGTH)))
        {
            string length =
                frame.getProperties().getProperty(
                    commands::CommandConstants::toString(
                        commands::CommandConstants::HEADER_CONTENTLENGTH));

            // A value that does not parse as a number yields 0, which
            // falls through to the read-until-null path below.
            content_length = strtoul( length.c_str(), NULL, 10 );
        }

        if( content_length != 0 )
        {
            // The content length tells us how much to read and does not
            // count the trailing null that ends the frame. The buffer was
            // cleared above, so resize() creates content_length zeroed
            // bytes; capacity left over from earlier frames is reused, so
            // similarly sized messages do not allocate again.
            buffer.resize( content_length );

            // A single read() may return fewer bytes than requested, so
            // keep reading until the whole body has arrived.
            std::size_t total = 0;
            while( total < content_length )
            {
                const std::size_t got =
                    read( &buffer[total], content_length - total );

                if( got == 0 )
                {
                    // No progress: fail rather than spin forever or hand
                    // back a partly filled body.
                    throw IOException(
                        __FILE__, __LINE__,
                        "StompCommandReader::readStompBody: "
                        "no data read before Content Length was satisfied" );
                }

                total += got;
            }

            // Content Length read, now consume the frame's null terminator.
            if( readByte() != '\0' )
            {
                throw StompConnectorException(
                    __FILE__, __LINE__,
                    "StompCommandReader::readStompBody: "
                    "Read Content Length, and no trailing null");
            }
        }
        else
        {
            // Content length was either zero or not set, so read until the
            // first null is encountered. The null itself is kept in the
            // buffer. readByte() is used so that a missing input stream is
            // reported as an exception.
            unsigned char byte;
            do
            {
                byte = readByte();
                buffer.push_back( byte );
            }
            while( byte != '\0' );
        }

        // Both paths above leave at least one byte in the buffer, so the
        // body is always set. This copies the buffer contents.
        frame.getBody() = buffer;
    }
    AMQ_CATCH_EXCEPTION_CONVERT( IOException, CommandIOException )
    AMQ_CATCHALL_THROW( CommandIOException )
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Reads raw bytes from the input stream by delegating to the stream's own
 * read( buffer, count ).
 *
 * @param buffer destination for the bytes read.
 * @param count the count passed through to the stream's read.
 * @return whatever the input stream's read returns.
 * @throws IOException if no input stream is set or the stream read fails.
 */
std::size_t StompCommandReader::read( unsigned char* buffer, std::size_t count )
throw( io::IOException )
{
    try{

        if( inputStream != NULL )
        {
            // A stream is attached, let it do the work.
            return inputStream->read( buffer, count );
        }

        throw IOException(
            __FILE__, __LINE__,
            "StompCommandReader::read(char*,int) - input stream is NULL" );
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Reads a single byte from the input stream.
 *
 * @return the byte returned by the input stream's read().
 * @throws IOException if no input stream is set or the stream read fails.
 */
unsigned char StompCommandReader::readByte() throw( io::IOException )
{
    try{

        if( inputStream == NULL )
        {
            // The message names this method; it previously named
            // read(char*,int), which pointed at the wrong call site.
            throw IOException(
                __FILE__, __LINE__,
                "StompCommandReader::readByte() - input stream is NULL" );
        }

        return inputStream->read();
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCHALL_THROW( IOException )
}