blob: ff028628f571ec98305a742abcfa566fd1c1dcd3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "OpenWireFormat.h"
#include <activemq/util/Boolean.h>
#include <activemq/util/Integer.h>
#include <activemq/util/Long.h>
#include <activemq/util/Guid.h>
#include <activemq/util/Math.h>
#include <activemq/io/ByteArrayOutputStream.h>
#include <activemq/connector/openwire/utils/BooleanStream.h>
#include <activemq/connector/openwire/commands/WireFormatInfo.h>
#include <activemq/connector/openwire/commands/DataStructure.h>
#include <activemq/connector/openwire/marshal/MarshalAware.h>
#include <activemq/connector/openwire/marshal/DataStreamMarshaller.h>
#include <activemq/connector/openwire/marshal/v2/MarshallerFactory.h>
#include <activemq/connector/openwire/marshal/v1/MarshallerFactory.h>
using namespace std;
using namespace activemq;
using namespace activemq::wireformat;
using namespace activemq::io;
using namespace activemq::util;
using namespace activemq::connector;
using namespace activemq::transport;
using namespace activemq::exceptions;
using namespace activemq::connector::openwire;
using namespace activemq::connector::openwire::commands;
using namespace activemq::connector::openwire::marshal;
using namespace activemq::connector::openwire::utils;
////////////////////////////////////////////////////////////////////////////////
// Type byte that stands for "no object": marshal() writes it when handed a
// NULL command, and doUnmarshal() returns NULL when it reads it.
const unsigned char OpenWireFormat::NULL_TYPE = 0;
////////////////////////////////////////////////////////////////////////////////
// Builds a wire format from the supplied configuration: copies the
// properties, sizes the marshaller table, assigns a fresh id and starts
// with every option switched off at DEFAULT_VERSION.
OpenWireFormat::OpenWireFormat( const activemq::util::Properties& properties ) {

    // Keep a private copy of the caller's configuration.  The parameter
    // shadows the member, hence the explicit this->.
    this->properties.copy( &properties );

    // No preferred settings are held until setPreferedWireFormatInfo() is
    // called.
    preferedWireFormatInfo = NULL;

    // The marshaller table is indexed by the data structure type byte, so
    // it needs 256 slots.
    dataMarshallers.resize( 256 );

    // Give this instance its own identifier.
    id = Guid::createGUIDString();

    // State used until a negotiation says otherwise.
    version = 0;
    sizePrefixDisabled = false;
    tightEncodingEnabled = false;
    tcpNoDelayEnabled = false;
    cacheEnabled = false;
    stackTraceEnabled = false;

    // Start at the default version as the lowest common denominator; the
    // preferred version is applied later, when the format is negotiated.
    setVersion( DEFAULT_VERSION );
}
////////////////////////////////////////////////////////////////////////////////
// Releases everything this instance owns.  Nothing is allowed to escape the
// destructor: both catch macros swallow whatever is thrown.
OpenWireFormat::~OpenWireFormat() {

    try {

        // Registered marshallers go first ...
        destroyMarshalers();

        // ... followed by the preferred wire format info object.
        delete preferedWireFormatInfo;
    }
    AMQ_CATCH_NOTHROW( ActiveMQException )
    AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
// Deletes every marshaller held in the table and resets each slot to NULL
// so the table can be refilled.  Never throws.
void OpenWireFormat::destroyMarshalers() {

    try {

        const size_t slotCount = dataMarshallers.size();

        size_t slot = 0;
        while( slot != slotCount ) {
            delete dataMarshallers[slot];
            dataMarshallers[slot] = NULL;
            ++slot;
        }
    }
    AMQ_CATCH_NOTHROW( ActiveMQException )
    AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
// Switches this wire format to the given protocol version and installs the
// matching set of marshallers.
//
// @param version the protocol version to use; 1 and 2 are supported.
// @throws IllegalArgumentException if the version is not supported.  In
//         that case the current version and its marshallers are left
//         untouched.
void OpenWireFormat::setVersion( int version ) throw ( IllegalArgumentException ) {

    // Nothing to do when the requested version is already active.
    if( version == this->getVersion() ){
        return;
    }

    // Validate before touching any state, so a bad request cannot leave
    // this object with an invalid version and an empty marshaller table.
    switch( version ){
        case 1:
        case 2:
            break;
        default:
            throw IllegalArgumentException(
                __FILE__, __LINE__,
                "OpenWireFormat::setVersion - "
                "Given Version: %d , is not supported", version );
    }

    // Clear old marshallers in preparation for the new set.
    this->destroyMarshalers();
    this->version = version;

    if( version == 1 ) {
        v1::MarshallerFactory().configure( this );
    } else {
        v2::MarshallerFactory().configure( this );
    }
}
////////////////////////////////////////////////////////////////////////////////
// Registers a marshaller in the table, in the slot selected by the data
// structure type it reports.  destroyMarshalers() later deletes whatever is
// stored here.
void OpenWireFormat::addMarshaller( DataStreamMarshaller* marshaller ) {

    const unsigned char typeCode = marshaller->getDataStructureType();
    const size_t slot = typeCode & 0xFF;

    dataMarshallers[slot] = marshaller;
}
////////////////////////////////////////////////////////////////////////////////
// Replaces the preferred wire format info used by renegotiateWireFormat().
// The previously held object is deleted and this instance takes over the
// new one (the destructor deletes it).
//
// @param info the new preferred settings; may be NULL to clear them.
void OpenWireFormat::setPreferedWireFormatInfo(
    commands::WireFormatInfo* info ) throw ( IllegalStateException ) {

    // Re-installing the object already held must not free it, otherwise the
    // member would be left pointing at deleted memory.
    if( info == this->preferedWireFormatInfo ) {
        return;
    }

    delete this->preferedWireFormatInfo;
    this->preferedWireFormatInfo = info;
}
////////////////////////////////////////////////////////////////////////////////
// Writes a command to the given stream in the currently configured encoding
// (tight or loose), preceded by a size prefix unless that is disabled.
//
// @param command the command to write; NULL is encoded as NULL_TYPE.
// @param dataOut the stream that receives the encoded bytes.
// @throws IOException if the command is not a DataStructure, if no
//         marshaller is registered for its type, or if writing fails.
void OpenWireFormat::marshal( transport::Command* command,
                              io::DataOutputStream* dataOut )
    throw ( io::IOException ) {

    try {

        int size = 1;

        if( command == NULL ) {

            // unmarshal() only consumes a size prefix when it is enabled,
            // so it must only be produced under the same condition.
            if( !sizePrefixDisabled ) {
                dataOut->writeInt( size );
            }
            dataOut->writeByte( NULL_TYPE );
            return;
        }

        DataStructure* dataStructure =
            dynamic_cast< DataStructure* >( command );

        // A failed cast would otherwise be dereferenced below.
        if( dataStructure == NULL ) {
            throw IOException(
                __FILE__, __LINE__,
                "OpenWireFormat::marshal - "
                "Command is not a DataStructure" );
        }

        unsigned char type = dataStructure->getDataStructureType();

        DataStreamMarshaller* dsm = dataMarshallers[type & 0xFF];

        if( dsm == NULL ) {
            throw IOException(
                __FILE__, __LINE__,
                ( string( "OpenWireFormat::marshal - Unknown data type: " ) +
                  Integer::toString( type ) ).c_str() );
        }

        if( tightEncodingEnabled ) {

            // First pass computes the size and fills the boolean stream,
            // second pass writes the actual data.
            BooleanStream bs;
            size += dsm->tightMarshal1( this, dataStructure, &bs );
            size += bs.marshalledSize();

            if( !sizePrefixDisabled ) {
                dataOut->writeInt( size );
            }

            dataOut->writeByte( type );
            bs.marshal( dataOut );
            dsm->tightMarshal2( this, dataStructure, dataOut, &bs );

        } else if( sizePrefixDisabled ) {

            // No prefix needed, so write straight to the target stream.
            dataOut->writeByte( type );
            dsm->looseMarshal( this, dataStructure, dataOut );

        } else {

            // The size is only known once everything has been written, so
            // marshal into a temporary buffer first.  Both helpers are
            // locals, so they are released even if marshalling throws.
            ByteArrayOutputStream baos;
            DataOutputStream looseOut( &baos );

            looseOut.writeByte( type );
            dsm->looseMarshal( this, dataStructure, &looseOut );
            looseOut.close();

            dataOut->writeInt( (int)baos.getByteArraySize() );
            dataOut->write( baos.getByteArray(),
                            baos.getByteArraySize() );
        }
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Reads one top level object from the stream and returns it as a Command.
//
// @param dis the stream to read from.
// @returns the decoded command.
// @throws IOException if nothing could be decoded, if the decoded object is
//         not a Command, or if reading fails.
transport::Command* OpenWireFormat::unmarshal( io::DataInputStream* dis )
    throw ( io::IOException ) {

    try {

        // Consume the size prefix when one is in use; its value is not
        // needed here.
        if( !sizePrefixDisabled ) {
            dis->readInt();
        }

        DataStructure* decoded = doUnmarshal( dis );

        if( decoded == NULL ) {
            throw IOException(
                __FILE__, __LINE__,
                "OpenWireFormat::doUnmarshal - "
                "Failed to unmarshal an Object" );
        }

        // Everything decoded at this level has to be a transport::Command.
        transport::Command* asCommand =
            dynamic_cast< transport::Command* >( decoded );

        if( asCommand != NULL ) {
            return asCommand;
        }

        // Wrong kind of object: dispose of it and report the failure.
        delete decoded;

        throw IOException(
            __FILE__, __LINE__,
            "OpenWireFormat::doUnmarshal - "
            "Unmarshalled a non Command Type" );
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Reads a type byte followed by the object it announces.
//
// @param dis the stream to read from.
// @returns a newly created object that the caller must delete, or NULL when
//          the type byte is NULL_TYPE.
// @throws IOException if no marshaller is registered for the type or if
//         reading fails.
commands::DataStructure* OpenWireFormat::doUnmarshal( DataInputStream* dis )
    throw ( IOException ) {

    try {

        unsigned char dataType = dis->readByte();

        if( dataType == NULL_TYPE ) {
            return NULL;
        }

        DataStreamMarshaller* dsm = dataMarshallers[dataType & 0xFF];

        if( dsm == NULL ) {
            throw IOException(
                __FILE__, __LINE__,
                ( string( "OpenWireFormat::doUnmarshal - Unknown data type: " ) +
                  Integer::toString( dataType ) ).c_str() );
        }

        // Ask the DataStreamMarshaller to create a new instance of its
        // command so that we can fill in its data.
        DataStructure* data = dsm->createObject();

        try {

            if( this->tightEncodingEnabled ) {
                BooleanStream bs;
                bs.unmarshal( dis );
                dsm->tightUnmarshal( this, data, dis, &bs );
            } else {
                dsm->looseUnmarshal( this, data, dis );
            }

        } catch( ... ) {
            // The half filled object never reaches the caller, so it has to
            // be released here before the error travels on.
            delete data;
            throw;
        }

        return data;
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// First pass of tight marshalling for a nested object: records the flags in
// the boolean stream and works out how many bytes the second pass writes.
//
// @param object the nested object, may be NULL.
// @param bs the boolean stream that receives the flags.
// @returns the number of bytes needed for the object, 0 when it is NULL.
// @throws IOException if the object reports type 0, if no marshaller is
//         registered for its type, or if marshalling fails.
int OpenWireFormat::tightMarshalNestedObject1( commands::DataStructure* object,
                                               utils::BooleanStream* bs )
    throw ( io::IOException ) {

    try {

        const bool present = ( object != NULL );
        bs->writeBoolean( present );

        if( !present ) {
            return 0;
        }

        if( object->isMarshalAware() ) {

            std::vector<unsigned char> cachedForm =
                object->getMarshaledForm( this );

            const bool haveCachedForm = !cachedForm.empty();
            bs->writeBoolean( haveCachedForm );

            // An available marshaled form is used as is: one type byte plus
            // its contents.
            if( haveCachedForm ) {
                return (int)( 1 + cachedForm.size() );
            }
        }

        const unsigned char typeCode = object->getDataStructureType();

        if( typeCode == 0 ) {
            throw IOException(
                __FILE__, __LINE__,
                "No valid data structure type for object of this type");
        }

        DataStreamMarshaller* marshaller = dataMarshallers[typeCode & 0xFF];

        if( marshaller == NULL ) {
            throw IOException(
                __FILE__, __LINE__,
                ( string( "OpenWireFormat::marshal - Unknown data type: " ) +
                  Integer::toString( typeCode ) ).c_str() );
        }

        // One byte for the type plus whatever the marshaller needs.
        return 1 + marshaller->tightMarshal1( this, object, bs );
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Second pass of tight marshalling for a nested object: replays the flags
// recorded by tightMarshalNestedObject1() and writes the type byte followed
// by the object's data.
//
// @param o the nested object; only used when the first flag says it is
//        present.
// @param ds the stream that receives the encoded bytes.
// @param bs the boolean stream filled during the first pass.
// @throws IOException if the object claims to be marshal aware but is not a
//         MarshalAware, if no marshaller is registered for its type, or if
//         writing fails.
void OpenWireFormat::tightMarshalNestedObject2( DataStructure* o,
                                                DataOutputStream* ds,
                                                BooleanStream* bs )
    throw ( IOException ) {

    try {

        // First flag: was there an object at all?
        if( !bs->readBoolean() ) {
            return;
        }

        unsigned char type = o->getDataStructureType();
        ds->writeByte(type);

        if( o->isMarshalAware() && bs->readBoolean() ) {

            MarshalAware* ma = dynamic_cast< MarshalAware* >( o );

            // A failed cast would otherwise be dereferenced below.
            if( ma == NULL ) {
                throw IOException(
                    __FILE__, __LINE__,
                    "OpenWireFormat::tightMarshalNestedObject2 - "
                    "Object is not MarshalAware" );
            }

            vector<unsigned char> sequence = ma->getMarshaledForm( this );

            // Taking the address of element 0 of an empty vector is
            // undefined, so only write when there is something to write.
            if( !sequence.empty() ) {
                ds->write( &sequence[0], sequence.size() );
            }

        } else {

            DataStreamMarshaller* dsm = dataMarshallers[type & 0xFF];

            if( dsm == NULL ) {
                throw IOException(
                    __FILE__, __LINE__,
                    ( string( "OpenWireFormat::marshal - Unknown data type: " ) +
                      Integer::toString( type ) ).c_str() );
            }

            dsm->tightMarshal2( this, o, ds, bs );
        }
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Reads a nested object that was written with tight encoding.
//
// @param dis the stream to read from.
// @param bs the boolean stream holding the flags for this object.
// @returns a newly created object that the caller must delete, or NULL when
//          the flag says no object was written.
// @throws IOException if no marshaller is registered for the type or if
//         reading fails.
DataStructure* OpenWireFormat::tightUnmarshalNestedObject( DataInputStream* dis,
                                                           BooleanStream* bs )
    throw ( io::IOException ) {

    try {

        // First flag: was there an object at all?
        if( !bs->readBoolean() ) {
            return NULL;
        }

        const unsigned char dataType = dis->readByte();

        DataStreamMarshaller* dsm = dataMarshallers[dataType & 0xFF];

        if( dsm == NULL ) {
            throw IOException(
                __FILE__, __LINE__,
                ( string( "OpenWireFormat::tightUnmarshalNestedObject - Unknown data type: " ) +
                  Integer::toString( dataType ) ).c_str() );
        }

        DataStructure* data = dsm->createObject();

        try {

            if( data->isMarshalAware() && bs->readBoolean() ) {

                // Skip an int and a byte, then decode with the boolean
                // stream that follows them instead of the caller's.
                dis->readInt();
                dis->readByte();

                BooleanStream bs2;
                bs2.unmarshal( dis );
                dsm->tightUnmarshal( this, data, dis, &bs2 );

            } else {
                dsm->tightUnmarshal( this, data, dis, bs );
            }

        } catch( ... ) {
            // The half filled object never reaches the caller, so it has to
            // be released here before the error travels on.
            delete data;
            throw;
        }

        return data;
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Reads a nested object that was written with loose encoding: a presence
// flag, then the type byte, then the object's data.
//
// @param dis the stream to read from.
// @returns a newly created object that the caller must delete, or NULL when
//          the presence flag is false.
// @throws IOException if no marshaller is registered for the type or if
//         reading fails.
DataStructure* OpenWireFormat::looseUnmarshalNestedObject( io::DataInputStream* dis )
    throw ( IOException ) {

    try{

        if( !dis->readBoolean() ) {
            return NULL;
        }

        unsigned char dataType = dis->readByte();

        DataStreamMarshaller* dsm = dataMarshallers[dataType & 0xFF];

        if( dsm == NULL ) {
            throw IOException(
                __FILE__, __LINE__,
                ( string( "OpenWireFormat::looseUnmarshalNestedObject - Unknown data type: " ) +
                  Integer::toString( dataType ) ).c_str() );
        }

        DataStructure* data = dsm->createObject();

        try {
            dsm->looseUnmarshal( this, data, dis );
        } catch( ... ) {
            // The half filled object never reaches the caller, so it has to
            // be released here before the error travels on.
            delete data;
            throw;
        }

        return data;
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Writes a nested object with loose encoding: a presence flag, then (when
// the object exists) its type byte followed by its data.
//
// @param o the nested object, may be NULL.
// @param dataOut the stream that receives the encoded bytes.
// @throws IOException if no marshaller is registered for the object's type
//         or if writing fails.
void OpenWireFormat::looseMarshalNestedObject( commands::DataStructure* o,
                                               io::DataOutputStream* dataOut )
    throw ( io::IOException ) {

    try{

        const bool present = ( o != NULL );
        dataOut->writeBoolean( present );

        // Only the flag is written for an absent object.
        if( !present ) {
            return;
        }

        const unsigned char typeCode = o->getDataStructureType();
        dataOut->writeByte( typeCode );

        DataStreamMarshaller* marshaller = dataMarshallers[typeCode & 0xFF];

        if( marshaller == NULL ) {
            throw IOException(
                __FILE__, __LINE__,
                ( string( "OpenWireFormat::marshal - Unknown data type: " ) +
                  Integer::toString( typeCode ) ).c_str() );
        }

        marshaller->looseMarshal( this, o, dataOut );
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Settles on the options both sides can use: the lower of the two versions,
// and each flag only if the given info and the preferred info both enable
// it.
//
// @param info the settings to negotiate against.
// @throws IllegalStateException if no preferred info has been set, if info
//         is NULL, or if the resulting version cannot be applied.
void OpenWireFormat::renegotiateWireFormat( WireFormatInfo* info )
    throw ( exceptions::IllegalStateException ) {

    try {

        if( preferedWireFormatInfo == NULL ) {
            throw IllegalStateException(
                __FILE__, __LINE__,
                "OpenWireFormat::renegotiateWireFormat - "
                "Wireformat cannot be renegotiated." );
        }

        // Without this check the calls below would dereference NULL.
        if( info == NULL ) {
            throw IllegalStateException(
                __FILE__, __LINE__,
                "OpenWireFormat::renegotiateWireFormat - "
                "No WireFormatInfo was supplied." );
        }

        // setVersion() reports an unsupported version with an
        // IllegalArgumentException, which this function's exception
        // specification does not permit; the handlers below convert it.
        this->setVersion( Math::min( preferedWireFormatInfo->getVersion(),
                                     info->getVersion() ) );

        this->stackTraceEnabled = info->isStackTraceEnabled() &&
                                  preferedWireFormatInfo->isStackTraceEnabled();
        this->tcpNoDelayEnabled = info->isTcpNoDelayEnabled() &&
                                  preferedWireFormatInfo->isTcpNoDelayEnabled();
        this->cacheEnabled = info->isCacheEnabled() &&
                             preferedWireFormatInfo->isCacheEnabled();
        this->tightEncodingEnabled = info->isTightEncodingEnabled() &&
                                     preferedWireFormatInfo->isTightEncodingEnabled();
        this->sizePrefixDisabled = info->isSizePrefixDisabled() &&
                                   preferedWireFormatInfo->isSizePrefixDisabled();
    }
    AMQ_CATCH_RETHROW( IllegalStateException )
    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IllegalStateException )
    AMQ_CATCHALL_THROW( IllegalStateException )
}