blob: 151245977a6c31437e642e196e93959ad70f2290 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include <activemq/util/Config.h>
#include <activemq/commands/WireFormatInfo.h>
#include <activemq/commands/DataStructure.h>
#include <activemq/wireformat/WireFormat.h>
#include <activemq/wireformat/openwire/utils/BooleanStream.h>
#include <decaf/lang/Pointer.h>
#include <decaf/util/Properties.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/lang/exceptions/IllegalStateException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
#include <memory>
namespace activemq {
namespace wireformat {
namespace openwire {
namespace marshal {
class DataStreamMarshaller;
using decaf::lang::Pointer;
class AMQCPP_API OpenWireFormat : public wireformat::WireFormat {
// Declared here to make life easier.
static const unsigned char NULL_TYPE;
// V1 if the default version we start at.
static const int DEFAULT_VERSION;
// Defines the maximum supported openwire version
static const int MAX_SUPPORTED_VERSION;
// Configuration parameters
decaf::util::Properties properties;
// Preferred WireFormatInfo
Pointer<commands::WireFormatInfo> preferedWireFormatInfo;
// Marshalers
std::vector< marshal::DataStreamMarshaller* > dataMarshallers;
// Uniquely Generated ID, initialize in the Ctor
std::string id;
// Indicates when we are in the doUnmarshal call
decaf::util::concurrent::atomic::AtomicBoolean receiving;
// WireFormat Data
int version;
bool stackTraceEnabled;
bool tcpNoDelayEnabled;
bool cacheEnabled;
int cacheSize;
bool tightEncodingEnabled;
bool sizePrefixDisabled;
long long maxInactivityDuration;
long long maxInactivityDurationInitialDelay;
* Constructs a new OpenWireFormat object
* @param properties - can contain optional config params.
OpenWireFormat(const decaf::util::Properties& properties);
virtual ~OpenWireFormat();
* {@inheritDoc}
virtual bool hasNegotiator() const {
return true;
* {@inheritDoc}
virtual Pointer<transport::Transport> createNegotiator(const Pointer<transport::Transport> transport);
* Allows an external source to add marshalers to this object for
* types that may be marshaled or unmarshaled.
* @param marshaler - the Marshaler to add to the collection.
void addMarshaller(marshal::DataStreamMarshaller* marshaler);
* {@inheritDoc}
virtual void marshal(const Pointer<commands::Command> command, const activemq::transport::Transport* transport, decaf::io::DataOutputStream* out);
* {@inheritDoc}
virtual Pointer<commands::Command> unmarshal(const activemq::transport::Transport* transport, decaf::io::DataInputStream* in);
* Utility method for Tight Marshaling the given object to the boolean
* stream passed.
* @param object - The DataStructure to marshal
* @param bs - the BooleanStream to write to
* @return size of the data returned.
virtual int tightMarshalNestedObject1(commands::DataStructure* object, utils::BooleanStream* bs);
* Utility method that will Tight marshal some internally nested object
* that implements the DataStructure interface. Writes the data to the
* Data Output Stream provided.
* @param o - DataStructure object
* @param ds - DataOuputStream for writing
* @param bs - BooleanStream
* @throws IOException if an error occurs.
void tightMarshalNestedObject2(commands::DataStructure* o, decaf::io::DataOutputStream* ds, utils::BooleanStream* bs);
* Utility method used to Unmarshal a Nested DataStructure type object
* from the given DataInputStream. The DataStructure instance that is
* returned is now the property of the caller.
* @param dis - DataInputStream to read from
* @param bs - BooleanStream to read from
* @return Newly allocated DataStructure Object
* @throws IOException if an error occurs.
commands::DataStructure* tightUnmarshalNestedObject(decaf::io::DataInputStream* dis, utils::BooleanStream* bs);
* Utility method to unmarshal an DataStructure object from an
* DataInputStream using the Loose Unmarshaling format. Will read
* the Data and construct a new DataStructure based Object, the
* pointer to the Object returned is now owned by the caller.
* @param dis - the DataInputStream to read the data from
* @return a new DataStructure derived Object pointer
* @throws IOException if an error occurs.
commands::DataStructure* looseUnmarshalNestedObject(decaf::io::DataInputStream* dis);
* Utility method to loosely Marshal an object that is derived from the
* DataStrucutre interface. The marshaled data is written to the
* passed in DataOutputStream.
* @param o - DataStructure derived Object to Marshal
* @param dataOut - DataOutputStream to write the data to
* @throw IOException if an error occurs.
void looseMarshalNestedObject(commands::DataStructure* o, decaf::io::DataOutputStream* dataOut);
* Called to re-negotiate the settings for the WireFormatInfo, these
* determine how the client and broker communicate.
* @param info
* The new Wireformat Info settings.
* @throws IllegalStateException is wire format can't be negotiated.
void renegotiateWireFormat(const commands::WireFormatInfo& info);
* Configures this object using the provided WireformatInfo object
* @param info
* A WireFormatInfo object, takes ownership.
* @throws IllegalStateException if the WireFormat object has not been initialized.
void setPreferedWireFormatInfo(const Pointer<commands::WireFormatInfo> info);
* Gets the Preferred WireFormatInfo object that this class holds
* @return pointer to a preferred WireFormatInfo object
const Pointer<commands::WireFormatInfo>& getPreferedWireFormatInfo() const {
return this->preferedWireFormatInfo;
* Checks if the stackTraceEnabled flag is on
* @return true if the flag is on.
bool isStackTraceEnabled() const {
return stackTraceEnabled;
* Sets if the stackTraceEnabled flag is on
* @param stackTraceEnabled - true to turn flag is on
void setStackTraceEnabled(bool stackTraceEnabled) {
this->stackTraceEnabled = stackTraceEnabled;
* Checks if the tcpNoDelayEnabled flag is on
* @return true if the flag is on.
bool isTcpNoDelayEnabled() const {
return tcpNoDelayEnabled;
* Sets if the tcpNoDelayEnabled flag is on
* @param tcpNoDelayEnabled - true to turn flag is on
void setTcpNoDelayEnabled(bool tcpNoDelayEnabled) {
this->tcpNoDelayEnabled = tcpNoDelayEnabled;
* Get the current Wireformat Version
* @return int that identifies the version
int getVersion() const {
return version;
* Set the current Wireformat Version
* @param version
* An int that identifies the version
* @throws IllegalArgumentException if the version given is not supported.
void setVersion(int version);
* Is there a Message being unmarshaled?
* @return true while in the doUnmarshal method.
virtual bool inReceive() const {
return this->receiving.get();
* Checks if the cacheEnabled flag is on
* @return true if the flag is on.
bool isCacheEnabled() const {
return cacheEnabled;
* Sets if the cacheEnabled flag is on
* @param cacheEnabled - true to turn flag is on
void setCacheEnabled(bool cacheEnabled) {
this->cacheEnabled = cacheEnabled;
* Returns the currently set Cache size.
* @return the current value of the broker's cache size.
int getCacheSize() const {
return cacheSize;
* Sets the current Cache size.
* @param value - the value to send as the broker's cache size.
void setCacheSize(int value) {
this->cacheSize = value;
* Checks if the tightEncodingEnabled flag is on
* @return true if the flag is on.
bool isTightEncodingEnabled() const {
return tightEncodingEnabled;
* Sets if the tightEncodingEnabled flag is on
* @param tightEncodingEnabled - true to turn flag is on
void setTightEncodingEnabled(bool tightEncodingEnabled) {
this->tightEncodingEnabled = tightEncodingEnabled;
* Checks if the sizePrefixDisabled flag is on
* @return true if the flag is on.
bool isSizePrefixDisabled() const {
return sizePrefixDisabled;
* Sets if the sizePrefixDisabled flag is on
* @param sizePrefixDisabled - true to turn flag is on
void setSizePrefixDisabled(bool sizePrefixDisabled) {
this->sizePrefixDisabled = sizePrefixDisabled;
* Gets the MaxInactivityDuration setting.
* @return maximum inactivity duration value in milliseconds.
long long getMaxInactivityDuration() const {
return this->maxInactivityDuration;
* Sets the MaxInactivityDuration setting.
* @param value - the Max inactivity duration value in milliseconds.
void setMaxInactivityDuration(long long value) {
this->maxInactivityDuration = value;
* Gets the MaxInactivityDurationInitialDelay setting.
* @return maximum inactivity duration initial delay value in milliseconds.
long long getMaxInactivityDurationInitialDelay() const {
return this->maxInactivityDurationInitialDelay;
* Sets the MaxInactivityDurationInitialDelay setting.
* @param value - the Max inactivity Initial Delay duration value in milliseconds.
void setMaxInactivityDurationInitialDelay(long long value) {
this->maxInactivityDurationInitialDelay = value;
* Perform the actual unmarshal of data from the given DataInputStream
* return the unmarshalled DataStrucutre object once done, caller takes
* ownership of this object. This method can return null if the type
* of the object to unmarshal is NULL, empty data.
* @param dis
* The DataInputStream to read from.
* @return new DataStructure* that the caller owns.
* @throws IOException if an error occurs during the unmarshal.
commands::DataStructure* doUnmarshal(decaf::io::DataInputStream* dis);
* Cleans up all registered Marshallers and empties the dataMarshallers
* vector. This should be called before a reconfiguration of the version
* marshallers, or on destruction of this object
void destroyMarshalers();