| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| #include <activemq/connector/openwire/OpenWireConnector.h> | |
| #include <activemq/concurrent/Concurrent.h> | |
| #include <activemq/transport/Transport.h> | |
| #include <activemq/exceptions/UnsupportedOperationException.h> | |
| #include <activemq/util/Integer.h> | |
| #include <activemq/util/Boolean.h> | |
| #include <activemq/util/Long.h> | |
| #include <activemq/util/Guid.h> | |
| #include <activemq/connector/openwire/OpenWireConnectorException.h> | |
| #include <activemq/connector/openwire/OpenWireSessionInfo.h> | |
| #include <activemq/connector/openwire/OpenWireProducerInfo.h> | |
| #include <activemq/connector/openwire/OpenWireConsumerInfo.h> | |
| #include <activemq/connector/openwire/OpenWireTransactionInfo.h> | |
| #include <activemq/connector/openwire/BrokerException.h> | |
| #include <activemq/connector/openwire/OpenWireFormatFactory.h> | |
| #include <activemq/connector/openwire/commands/ActiveMQMessage.h> | |
| #include <activemq/connector/openwire/commands/ActiveMQBytesMessage.h> | |
| #include <activemq/connector/openwire/commands/ActiveMQTextMessage.h> | |
| #include <activemq/connector/openwire/commands/ActiveMQMapMessage.h> | |
| #include <activemq/connector/openwire/commands/ActiveMQTopic.h> | |
| #include <activemq/connector/openwire/commands/ActiveMQQueue.h> | |
| #include <activemq/connector/openwire/commands/ActiveMQTempTopic.h> | |
| #include <activemq/connector/openwire/commands/ActiveMQTempQueue.h> | |
| #include <activemq/connector/openwire/commands/BrokerInfo.h> | |
| #include <activemq/connector/openwire/commands/BrokerError.h> | |
| #include <activemq/connector/openwire/commands/ConnectionId.h> | |
| #include <activemq/connector/openwire/commands/DestinationInfo.h> | |
| #include <activemq/connector/openwire/commands/ExceptionResponse.h> | |
| #include <activemq/connector/openwire/commands/Message.h> | |
| #include <activemq/connector/openwire/commands/MessageAck.h> | |
| #include <activemq/connector/openwire/commands/MessageDispatch.h> | |
| #include <activemq/connector/openwire/commands/RemoveInfo.h> | |
| #include <activemq/connector/openwire/commands/ShutdownInfo.h> | |
| #include <activemq/connector/openwire/commands/SessionInfo.h> | |
| #include <activemq/connector/openwire/commands/TransactionInfo.h> | |
| #include <activemq/connector/openwire/commands/LocalTransactionId.h> | |
| #include <activemq/connector/openwire/commands/WireFormatInfo.h> | |
| #include <activemq/connector/openwire/commands/RemoveSubscriptionInfo.h> | |
| using namespace std; | |
| using namespace activemq; | |
| using namespace activemq::connector; | |
| using namespace activemq::util; | |
| using namespace activemq::transport; | |
| using namespace activemq::exceptions; | |
| using namespace activemq::connector::openwire; | |
| //////////////////////////////////////////////////////////////////////////////// | |
| OpenWireConnector::OpenWireConnector( Transport* transport, | |
| const util::Properties& properties ) | |
| throw ( IllegalArgumentException ) | |
| { | |
| if( transport == NULL ) | |
| { | |
| throw IllegalArgumentException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::OpenWireConnector - Transport cannot be NULL"); | |
| } | |
| // Create our WireFormatFactory on the stack, only need it once. | |
| OpenWireFormatFactory wireFormatFactory; | |
| this->state = CONNECTION_STATE_DISCONNECTED; | |
| this->exceptionListener = NULL; | |
| this->messageListener = NULL; | |
| this->brokerInfo = NULL; | |
| this->brokerWireFormatInfo = NULL; | |
| this->properties.copy( &properties ); | |
| this->wireFormat = dynamic_cast<OpenWireFormat*>( | |
| wireFormatFactory.createWireFormat( properties ) ); | |
| this->transport = new OpenWireFormatNegotiator( wireFormat, transport, false ); | |
| // Observe the transport for events. | |
| this->transport->setCommandListener( this ); | |
| this->transport->setTransportExceptionListener( this ); | |
| // Setup the Reader and Writer with a Wire Format pointer. | |
| this->reader.setOpenWireFormat( wireFormat ); | |
| this->writer.setOpenWireFormat( wireFormat ); | |
| // Setup the reader and writer in the transport. | |
| this->transport->setCommandReader( &reader ); | |
| this->transport->setCommandWriter( &writer ); | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| OpenWireConnector::~OpenWireConnector() | |
| { | |
| try { | |
| close(); | |
| delete transport; | |
| delete wireFormat; | |
| delete brokerInfo; | |
| delete brokerWireFormatInfo; | |
| } | |
| AMQ_CATCH_NOTHROW( ActiveMQException ) | |
| AMQ_CATCHALL_NOTHROW( ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::enforceConnected() throw ( ConnectorException ) | |
| { | |
| if( state != CONNECTION_STATE_CONNECTED ) | |
| { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::enforceConnected - Not Connected!" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::start() throw( cms::CMSException ) | |
| { | |
| try | |
| { | |
| synchronized( &mutex ) | |
| { | |
| if( state == CONNECTION_STATE_CONNECTED ) | |
| { | |
| throw ActiveMQException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::start - already started" ); | |
| } | |
| // Start the transport - this establishes the socket. | |
| transport->start(); | |
| // Send the connect message to the broker. | |
| connect(); | |
| } | |
| } | |
| AMQ_CATCH_RETHROW( ActiveMQException ) | |
| AMQ_CATCHALL_THROW( ActiveMQException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::close() throw( cms::CMSException ){ | |
| try | |
| { | |
| if( state == CONNECTION_STATE_DISCONNECTED ){ | |
| return; | |
| } | |
| synchronized( &mutex ) { | |
| // Send the disconnect message to the broker. | |
| disconnect(); | |
| // Close the transport now that we've sent the last messages.. | |
| transport->close(); | |
| } | |
| } | |
| AMQ_CATCH_RETHROW( ActiveMQException ) | |
| AMQ_CATCHALL_THROW( ActiveMQException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::connect() throw (ConnectorException) | |
| { | |
| try | |
| { | |
| // Mark this connector as started. | |
| state = CONNECTION_STATE_CONNECTING; | |
| // Fill in our connection info. | |
| connectionInfo.setUserName( getUsername() ); | |
| connectionInfo.setPassword( getPassword() ); | |
| // Get or Create a Client Id | |
| string clientId = getClientId(); | |
| if( clientId.length() > 0 ){ | |
| connectionInfo.setClientId( clientId ); | |
| } else { | |
| connectionInfo.setClientId( Guid::createGUIDString() ); | |
| } | |
| // Generate a connectionId | |
| commands::ConnectionId* connectionId = new commands::ConnectionId(); | |
| connectionId->setValue( Guid::createGUIDString() ); | |
| connectionInfo.setConnectionId( connectionId ); | |
| // Now we ping the broker and see if we get an ack / nack | |
| Response* response = syncRequest( &connectionInfo ); | |
| // Tag us in the Connected State now. | |
| state = CONNECTION_STATE_CONNECTED; | |
| // Clean up the ack | |
| delete response; | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::disconnect() throw (ConnectorException) | |
| { | |
| try | |
| { | |
| // Mark state as no longer connected. | |
| state = CONNECTION_STATE_DISCONNECTED; | |
| // Remove our ConnectionId from the Broker | |
| disposeOf( connectionInfo.getConnectionId() ); | |
| // Send the disconnect command to the broker. | |
| commands::ShutdownInfo shutdown; | |
| oneway( &shutdown ); | |
| } catch( ConnectorException& ex ){ | |
| try{ transport->close(); } catch( ... ){} | |
| ex.setMark(__FILE__,__LINE__); | |
| throw ex; | |
| } catch( ... ) { | |
| try{ transport->close(); } catch( ... ){} | |
| throw OpenWireConnectorException(__FILE__, __LINE__, | |
| "Caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| connector::SessionInfo* OpenWireConnector::createSession( | |
| cms::Session::AcknowledgeMode ackMode ) | |
| throw( ConnectorException ) | |
| { | |
| try | |
| { | |
| enforceConnected(); | |
| // Create and initialize a new SessionInfo object | |
| commands::SessionInfo* info = new commands::SessionInfo(); | |
| commands::SessionId* sessionId = new commands::SessionId(); | |
| sessionId->setConnectionId( connectionInfo.getConnectionId()->getValue() ); | |
| sessionId->setValue( sessionIds.getNextSequenceId() ); | |
| info->setSessionId( sessionId ); | |
| OpenWireSessionInfo* session = new OpenWireSessionInfo( this ); | |
| try{ | |
| // Send the subscription message to the broker. | |
| Response* response = syncRequest(info); | |
| // The broker did not return an error - this is good. | |
| // Just discard the response. | |
| delete response; | |
| // Create the Connector Session Wrapper Object and fill in its | |
| // data | |
| session->setSessionInfo( info ); | |
| session->setAckMode( ackMode ); | |
| // Return the session info. | |
| return session; | |
| } catch( ConnectorException& ex ) { | |
| // Something bad happened - free the session info object. | |
| delete info; | |
| delete session; | |
| ex.setMark(__FILE__, __LINE__); | |
| throw ex; | |
| } | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| ConsumerInfo* OpenWireConnector::createConsumer( | |
| const cms::Destination* destination, | |
| connector::SessionInfo* session, | |
| const std::string& selector, | |
| bool noLocal ) | |
| throw ( ConnectorException ) | |
| { | |
| OpenWireConsumerInfo* consumer = NULL; | |
| commands::ConsumerInfo* consumerInfo = NULL; | |
| try | |
| { | |
| enforceConnected(); | |
| consumer = new OpenWireConsumerInfo( this ); | |
| consumer->setSessionInfo( session ); | |
| consumerInfo = createConsumerInfo( destination, session ); | |
| consumer->setConsumerInfo( consumerInfo ); | |
| consumerInfo->setSelector( selector ); | |
| consumerInfo->setNoLocal( noLocal ); | |
| /** | |
| * Override default options with uri-encoded parameters. | |
| */ | |
| applyDestinationOptions( consumerInfo ); | |
| synchronized( &consumerInfoMap ) { | |
| // Optimistically place the Consumer into the Map. | |
| consumerInfoMap.setValue( | |
| consumerInfo->getConsumerId()->getValue(), | |
| consumer ); | |
| } | |
| return consumer; | |
| } catch( ConnectorException& ex ) { | |
| delete consumer; | |
| delete consumerInfo; | |
| ex.setMark( __FILE__, __LINE__ ); | |
| throw ex; | |
| } catch( ... ) { | |
| delete consumer; | |
| delete consumerInfo; | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| "caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| ConsumerInfo* OpenWireConnector::createDurableConsumer( | |
| const cms::Topic* topic, | |
| connector::SessionInfo* session, | |
| const std::string& name, | |
| const std::string& selector, | |
| bool noLocal ) | |
| throw ( ConnectorException ) | |
| { | |
| OpenWireConsumerInfo* consumer = NULL; | |
| commands::ConsumerInfo* consumerInfo = NULL; | |
| try | |
| { | |
| enforceConnected(); | |
| consumer = new OpenWireConsumerInfo( this ); | |
| consumer->setSessionInfo( session ); | |
| consumerInfo = createConsumerInfo( topic, session ); | |
| consumer->setConsumerInfo( consumerInfo ); | |
| consumerInfo->setSelector( selector ); | |
| consumerInfo->setNoLocal( noLocal ); | |
| consumerInfo->setSubscriptionName( name ); | |
| /** | |
| * Override default options with uri-encoded parameters. | |
| */ | |
| applyDestinationOptions( consumerInfo ); | |
| synchronized( &consumerInfoMap ) { | |
| // Optimistically place the Consumer into the Map. | |
| consumerInfoMap.setValue( | |
| consumerInfo->getConsumerId()->getValue(), | |
| consumer ); | |
| } | |
| return consumer; | |
| } catch( ConnectorException& ex ) { | |
| delete consumer; | |
| delete consumerInfo; | |
| ex.setMark( __FILE__, __LINE__ ); | |
| throw ex; | |
| } catch( ... ) { | |
| delete consumer; | |
| delete consumerInfo; | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| "caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::applyDestinationOptions( commands::ConsumerInfo* info ) | |
| { | |
| const commands::ActiveMQDestination* amqDestination = info->getDestination(); | |
| // Get any options specified in the destination and apply them to the | |
| // ConsumerInfo object. | |
| const ActiveMQProperties& options = amqDestination->getOptions(); | |
| std::string noLocalStr = | |
| core::ActiveMQConstants::toString( | |
| core::ActiveMQConstants::CONSUMER_NOLOCAL ); | |
| if( options.hasProperty( noLocalStr ) ) | |
| { | |
| info->setNoLocal( | |
| Boolean::parseBoolean( | |
| options.getProperty( noLocalStr ) ) ); | |
| } | |
| std::string selectorStr = | |
| core::ActiveMQConstants::toString( | |
| core::ActiveMQConstants::CONSUMER_SELECTOR ); | |
| if( options.hasProperty( selectorStr ) ) | |
| { | |
| info->setSelector( | |
| options.getProperty( selectorStr ) ); | |
| } | |
| std::string priorityStr = | |
| core::ActiveMQConstants::toString( | |
| core::ActiveMQConstants::CONSUMER_PRIORITY ); | |
| if( options.hasProperty( priorityStr ) ) | |
| { | |
| info->setPriority( | |
| Integer::parseInt( | |
| options.getProperty( priorityStr ) ) ); | |
| } | |
| std::string dispatchAsyncStr = | |
| core::ActiveMQConstants::toString( | |
| core::ActiveMQConstants::CONSUMER_DISPATCHASYNC ); | |
| if( options.hasProperty( dispatchAsyncStr ) ) | |
| { | |
| info->setDispatchAsync( | |
| Boolean::parseBoolean( | |
| options.getProperty( dispatchAsyncStr ) ) ); | |
| } | |
| std::string exclusiveStr = | |
| core::ActiveMQConstants::toString( | |
| core::ActiveMQConstants::CONSUMER_EXCLUSIVE ); | |
| if( options.hasProperty( exclusiveStr ) ) | |
| { | |
| info->setExclusive( | |
| Boolean::parseBoolean( | |
| options.getProperty( exclusiveStr ) ) ); | |
| } | |
| std::string maxPendingMsgLimitStr = | |
| core::ActiveMQConstants::toString( | |
| core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT ); | |
| if( options.hasProperty( maxPendingMsgLimitStr ) ) | |
| { | |
| info->setMaximumPendingMessageLimit( | |
| Integer::parseInt( | |
| options.getProperty( maxPendingMsgLimitStr ) ) ); | |
| } | |
| std::string prefetchSizeStr = | |
| core::ActiveMQConstants::toString( | |
| core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE ); | |
| if( info->getPrefetchSize() <= 0 || options.hasProperty( prefetchSizeStr ) ) | |
| { | |
| info->setPrefetchSize( | |
| Integer::parseInt( | |
| options.getProperty( prefetchSizeStr, "1000" ) ) ); | |
| } | |
| std::string retroactiveStr = | |
| core::ActiveMQConstants::toString( | |
| core::ActiveMQConstants::CONSUMER_RETROACTIVE ); | |
| if( options.hasProperty( retroactiveStr ) ) | |
| { | |
| info->setRetroactive( | |
| Boolean::parseBoolean( | |
| options.getProperty( retroactiveStr ) ) ); | |
| } | |
| std::string browserStr = "consumer.browser"; | |
| if( options.hasProperty( browserStr ) ) | |
| { | |
| info->setBrowser( | |
| Boolean::parseBoolean( | |
| options.getProperty( browserStr ) ) ); | |
| } | |
| std::string networkSubscriptionStr = "consumer.networkSubscription"; | |
| if( options.hasProperty( networkSubscriptionStr ) ) | |
| { | |
| info->setNetworkSubscription( | |
| Boolean::parseBoolean( | |
| options.getProperty( networkSubscriptionStr ) ) ); | |
| } | |
| std::string optimizedAcknowledgeStr = "consumer.optimizedAcknowledge"; | |
| if( options.hasProperty( optimizedAcknowledgeStr ) ) | |
| { | |
| info->setOptimizedAcknowledge( | |
| Boolean::parseBoolean( | |
| options.getProperty( optimizedAcknowledgeStr ) ) ); | |
| } | |
| std::string noRangeAcksStr = "consumer.noRangeAcks"; | |
| if( options.hasProperty( noRangeAcksStr ) ) | |
| { | |
| info->setNoRangeAcks( | |
| Boolean::parseBoolean( | |
| options.getProperty( noRangeAcksStr ) ) ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| commands::ConsumerInfo* OpenWireConnector::createConsumerInfo( | |
| const cms::Destination* destination, | |
| connector::SessionInfo* session ) | |
| throw ( ConnectorException ) | |
| { | |
| commands::ConsumerInfo* consumerInfo = NULL; | |
| try | |
| { | |
| consumerInfo = new commands::ConsumerInfo(); | |
| commands::ConsumerId* consumerId = new commands::ConsumerId(); | |
| consumerInfo->setConsumerId( consumerId ); | |
| consumerId->setConnectionId( session->getConnectionId() ); | |
| consumerId->setSessionId( session->getSessionId() ); | |
| consumerId->setValue( consumerIds.getNextSequenceId() ); | |
| // Cast the destination to an OpenWire destination, so we can | |
| // get all the goodies. | |
| const commands::ActiveMQDestination* amqDestination = | |
| dynamic_cast<const commands::ActiveMQDestination*>( destination ); | |
| if( amqDestination == NULL ) { | |
| throw ConnectorException( __FILE__, __LINE__, | |
| "Destination was either NULL or not created by this OpenWireConnector" ); | |
| } | |
| consumerInfo->setDestination( amqDestination->cloneDataStructure() ); | |
| return consumerInfo; | |
| } catch( ConnectorException& ex ) { | |
| delete consumerInfo; | |
| ex.setMark( __FILE__, __LINE__ ); | |
| throw ex; | |
| } catch( std::exception& ex ) { | |
| delete consumerInfo; | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| ex.what() ); | |
| } catch( ... ) { | |
| delete consumerInfo; | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| "caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::startConsumer( ConsumerInfo* consumer ) | |
| throw ( ConnectorException ) { | |
| try | |
| { | |
| enforceConnected(); | |
| OpenWireConsumerInfo* consumerInfo = | |
| dynamic_cast<OpenWireConsumerInfo*>( consumer ); | |
| if( consumerInfo == NULL ) { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::startConsumer - " | |
| "Consumer was not of the OpenWire flavor."); | |
| } | |
| if( consumerInfo->getConsumerInfo() == NULL || | |
| consumerInfo->isStarted() == true ) { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::startConsumer - " | |
| "Consumer was not in the correct state."); | |
| } | |
| // Send the message to the broker. | |
| Response* response = syncRequest( consumerInfo->getConsumerInfo() ); | |
| // The broker did not return an error - this is good. | |
| // Just discard the response. | |
| delete response; | |
| // Tag the Consumer as started | |
| consumerInfo->setStarted( true ); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| ProducerInfo* OpenWireConnector::createProducer( | |
| const cms::Destination* destination, | |
| connector::SessionInfo* session ) | |
| throw ( ConnectorException ) | |
| { | |
| OpenWireProducerInfo* producer = NULL; | |
| commands::ProducerInfo* producerInfo = NULL; | |
| try | |
| { | |
| enforceConnected(); | |
| producer = new OpenWireProducerInfo( this ); | |
| producer->setSessionInfo( session ); | |
| producerInfo = new commands::ProducerInfo(); | |
| producer->setProducerInfo( producerInfo ); | |
| commands::ProducerId* producerId = new commands::ProducerId(); | |
| producerInfo->setProducerId( producerId ); | |
| producerId->setConnectionId( session->getConnectionId() ); | |
| producerId->setSessionId( session->getSessionId() ); | |
| producerId->setValue( producerIds.getNextSequenceId() ); | |
| // Producers are allowed to have NULL destinations. In this case, the | |
| // destination is specified by the messages as they are sent. | |
| if( destination != NULL ) { | |
| // Cast the destination to an OpenWire destination, so we can | |
| // get all the goodies. | |
| const commands::ActiveMQDestination* amqDestination = | |
| dynamic_cast<const commands::ActiveMQDestination*>( destination ); | |
| if( amqDestination == NULL ) { | |
| throw ConnectorException( __FILE__, __LINE__, | |
| "Destination was not created by this OpenWireConnector" ); | |
| } | |
| // Get any options specified in the destination and apply them to the | |
| // ProducerInfo object. | |
| producerInfo->setDestination( amqDestination->cloneDataStructure() ); | |
| const ActiveMQProperties& options = amqDestination->getOptions(); | |
| producerInfo->setDispatchAsync( Boolean::parseBoolean( | |
| options.getProperty( "producer.dispatchAsync", "false" )) ); | |
| } | |
| // Send the message to the broker. | |
| Response* response = syncRequest(producerInfo); | |
| // The broker did not return an error - this is good. | |
| // Just discard the response. | |
| delete response; | |
| return producer; | |
| } catch( ConnectorException& ex ) { | |
| delete producer; | |
| delete producerInfo; | |
| ex.setMark( __FILE__, __LINE__ ); | |
| throw ex; | |
| } catch( std::exception& ex ) { | |
| delete producer; | |
| delete producerInfo; | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| ex.what() ); | |
| } catch( ... ) { | |
| delete producer; | |
| delete producerInfo; | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| "caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| cms::Topic* OpenWireConnector::createTopic( const std::string& name, | |
| connector::SessionInfo* session AMQCPP_UNUSED ) | |
| throw ( ConnectorException ) | |
| { | |
| try | |
| { | |
| enforceConnected(); | |
| return new commands::ActiveMQTopic( name ); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| cms::Queue* OpenWireConnector::createQueue( const std::string& name, | |
| connector::SessionInfo* session AMQCPP_UNUSED ) | |
| throw ( ConnectorException ) | |
| { | |
| try | |
| { | |
| enforceConnected(); | |
| return new commands::ActiveMQQueue( name ); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| cms::TemporaryTopic* OpenWireConnector::createTemporaryTopic( | |
| connector::SessionInfo* session AMQCPP_UNUSED ) | |
| throw ( ConnectorException, UnsupportedOperationException ) | |
| { | |
| try | |
| { | |
| enforceConnected(); | |
| commands::ActiveMQTempTopic* topic = new | |
| commands::ActiveMQTempTopic( createTemporaryDestinationName() ); | |
| // Register it with the Broker | |
| this->createTemporaryDestination( topic ); | |
| return topic; | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| cms::TemporaryQueue* OpenWireConnector::createTemporaryQueue( | |
| connector::SessionInfo* session AMQCPP_UNUSED ) | |
| throw ( ConnectorException, UnsupportedOperationException ) | |
| { | |
| try | |
| { | |
| enforceConnected(); | |
| commands::ActiveMQTempQueue* queue = new | |
| commands::ActiveMQTempQueue( createTemporaryDestinationName() ); | |
| // Register it with the Broker | |
| this->createTemporaryDestination( queue ); | |
| return queue; | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::send( cms::Message* message, | |
| ProducerInfo* producerInfo ) | |
| throw ( ConnectorException ) | |
| { | |
| try | |
| { | |
| enforceConnected(); | |
| OpenWireProducerInfo* producer = | |
| dynamic_cast<OpenWireProducerInfo*>( producerInfo ); | |
| if( producer == NULL ) { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::send - " | |
| "Producer was not of the OpenWire flavor."); | |
| } | |
| const SessionInfo* session = producerInfo->getSessionInfo(); | |
| commands::Message* amqMessage = | |
| dynamic_cast< commands::Message* >( message ); | |
| if( amqMessage == NULL ) | |
| { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::send - " | |
| "Message is not a valid Open Wire type."); | |
| } | |
| // Clear any old data that might be in the message object | |
| delete amqMessage->getMessageId(); | |
| delete amqMessage->getProducerId(); | |
| delete amqMessage->getTransactionId(); | |
| // Always assign the message ID, regardless of the disable | |
| // flag. Not adding a message ID will cause an NPE at the broker. | |
| commands::MessageId* id = new commands::MessageId(); | |
| id->setProducerId( | |
| producer->getProducerInfo()->getProducerId()->cloneDataStructure() ); | |
| id->setProducerSequenceId( producerSequenceIds.getNextSequenceId() ); | |
| amqMessage->setMessageId( id ); | |
| amqMessage->setProducerId( | |
| producer->getProducerInfo()->getProducerId()->cloneDataStructure() ); | |
| if( session->getAckMode() == cms::Session::SESSION_TRANSACTED ) { | |
| const OpenWireTransactionInfo* transactionInfo = | |
| dynamic_cast<const OpenWireTransactionInfo*>( | |
| producer->getSessionInfo()->getTransactionInfo() ); | |
| if( transactionInfo == NULL ) { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::acknowledge - " | |
| "Transacted Session, has no Transaction Info."); | |
| } | |
| amqMessage->setTransactionId( | |
| transactionInfo->getTransactionInfo()-> | |
| getTransactionId()->cloneDataStructure() ); | |
| } | |
| // Send the message to the broker. | |
| Response* response = syncRequest( amqMessage ); | |
| // The broker did not return an error - this is good. | |
| // Just discard the response. | |
| delete response; | |
| } catch( ConnectorException& ex ){ | |
| try{ transport->close(); } catch( ... ){} | |
| ex.setMark(__FILE__,__LINE__); | |
| throw ex; | |
| } catch( ... ) { | |
| try{ transport->close(); } catch( ... ){} | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| "Caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::send( std::list<cms::Message*>& messages, | |
| ProducerInfo* producerInfo ) | |
| throw ( ConnectorException ) | |
| { | |
| try | |
| { | |
| enforceConnected(); | |
| list< cms::Message* >::const_iterator itr = messages.begin(); | |
| for( ; itr != messages.end(); ++itr ) | |
| { | |
| this->send( *itr, producerInfo ); | |
| } | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::acknowledge( const SessionInfo* session, | |
| const ConsumerInfo* consumer, | |
| const cms::Message* message, | |
| AckType ackType ) | |
| throw ( ConnectorException ) | |
| { | |
| try { | |
| const commands::Message* amqMessage = | |
| dynamic_cast<const commands::Message*>( message ); | |
| if( amqMessage == NULL ) { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::acknowledge - " | |
| "Message was not a commands::Message derivation."); | |
| } | |
| const OpenWireConsumerInfo* consumerInfo = | |
| dynamic_cast<const OpenWireConsumerInfo*>( consumer ); | |
| if( consumerInfo == NULL ) { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::acknowledge - " | |
| "Consumer was not of the OpenWire flavor."); | |
| } | |
| commands::MessageAck ack; | |
| ack.setAckType( (int)ackType ); | |
| ack.setConsumerId( | |
| consumerInfo->getConsumerInfo()->getConsumerId()->cloneDataStructure() ); | |
| ack.setDestination( amqMessage->getDestination()->cloneDataStructure() ); | |
| ack.setFirstMessageId( amqMessage->getMessageId()->cloneDataStructure() ); | |
| ack.setLastMessageId( amqMessage->getMessageId()->cloneDataStructure() ); | |
| ack.setMessageCount( 1 ); | |
| if( session->getAckMode() == cms::Session::SESSION_TRANSACTED ) { | |
| const OpenWireTransactionInfo* transactionInfo = | |
| dynamic_cast<const OpenWireTransactionInfo*>( | |
| session->getTransactionInfo() ); | |
| if( transactionInfo == NULL || | |
| transactionInfo->getTransactionInfo() == NULL || | |
| transactionInfo->getTransactionInfo()->getTransactionId() == NULL ) { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::acknowledge - " | |
| "Transacted Session, has no Transaction Info."); | |
| } | |
| const commands::TransactionId* transactionId = | |
| dynamic_cast<const commands::TransactionId*>( | |
| transactionInfo->getTransactionInfo()->getTransactionId() ); | |
| commands::TransactionId* clonedTransactionId = | |
| transactionId->cloneDataStructure(); | |
| ack.setTransactionId( clonedTransactionId ); | |
| } | |
| oneway( &ack ); | |
| } catch( ConnectorException& ex ){ | |
| try{ transport->close(); } catch( ... ){} | |
| ex.setMark(__FILE__,__LINE__); | |
| throw ex; | |
| } catch( ... ) { | |
| try{ transport->close(); } catch( ... ){} | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| "Caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| TransactionInfo* OpenWireConnector::startTransaction( | |
| connector::SessionInfo* session ) | |
| throw ( ConnectorException ) | |
| { | |
| try { | |
| enforceConnected(); | |
| OpenWireTransactionInfo* transaction = | |
| new OpenWireTransactionInfo( this ); | |
| // Place Transaction Data in session for later use as well as | |
| // the session in the Transaction Data | |
| session->setTransactionInfo( transaction ); | |
| transaction->setSessionInfo( session ); | |
| // Prepare and send the Transaction command | |
| commands::TransactionInfo* info = new commands::TransactionInfo(); | |
| info->setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure() ); | |
| info->setTransactionId( createLocalTransactionId() ); | |
| info->setType( (int)TRANSACTION_STATE_BEGIN ); | |
| oneway( info ); | |
| // Store for later | |
| transaction->setTransactionInfo( info ); | |
| return transaction; | |
| } catch( ConnectorException& ex ){ | |
| try{ transport->close(); } catch( ... ){} | |
| ex.setMark(__FILE__,__LINE__); | |
| throw ex; | |
| } catch( ... ) { | |
| try{ transport->close(); } catch( ... ){} | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| "Caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::commit( TransactionInfo* transaction, | |
| SessionInfo* session AMQCPP_UNUSED ) | |
| throw ( ConnectorException ) | |
| { | |
| try | |
| { | |
| enforceConnected(); | |
| OpenWireTransactionInfo* transactionInfo = | |
| dynamic_cast<OpenWireTransactionInfo*>( transaction ); | |
| if( transactionInfo == NULL ) { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::commit - " | |
| "Transaction was not of the OpenWire flavor."); | |
| } | |
| commands::TransactionInfo* info = | |
| transactionInfo->getTransactionInfo(); | |
| info->setType( (int)TRANSACTION_STATE_COMMITONEPHASE ); | |
| oneway( info ); | |
| } catch( ConnectorException& ex ){ | |
| try{ transport->close(); } catch( ... ){} | |
| ex.setMark(__FILE__,__LINE__); | |
| throw ex; | |
| } catch( ... ) { | |
| try{ transport->close(); } catch( ... ){} | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| "Caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::rollback( TransactionInfo* transaction, | |
| SessionInfo* session AMQCPP_UNUSED ) | |
| throw ( ConnectorException ) | |
| { | |
| try | |
| { | |
| enforceConnected(); | |
| OpenWireTransactionInfo* transactionInfo = | |
| dynamic_cast<OpenWireTransactionInfo*>( transaction ); | |
| if( transactionInfo == NULL ) { | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::commit - " | |
| "Transaction was not of the OpenWire flavor."); | |
| } | |
| commands::TransactionInfo* info = | |
| transactionInfo->getTransactionInfo(); | |
| info->setType( (int)TRANSACTION_STATE_ROLLBACK ); | |
| oneway( info ); | |
| } catch( ConnectorException& ex ){ | |
| try{ transport->close(); } catch( ... ){} | |
| ex.setMark(__FILE__,__LINE__); | |
| throw ex; | |
| } catch( ... ) { | |
| try{ transport->close(); } catch( ... ){} | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| "Caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| cms::Message* OpenWireConnector::createMessage( | |
| connector::SessionInfo* session AMQCPP_UNUSED, | |
| TransactionInfo* transaction AMQCPP_UNUSED ) | |
| throw ( ConnectorException ) | |
| { | |
| try { | |
| return new commands::ActiveMQMessage(); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| cms::BytesMessage* OpenWireConnector::createBytesMessage( | |
| connector::SessionInfo* session AMQCPP_UNUSED, | |
| TransactionInfo* transaction AMQCPP_UNUSED ) | |
| throw ( ConnectorException ) | |
| { | |
| try { | |
| return new commands::ActiveMQBytesMessage(); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| cms::TextMessage* OpenWireConnector::createTextMessage( | |
| connector::SessionInfo* session AMQCPP_UNUSED, | |
| TransactionInfo* transaction AMQCPP_UNUSED ) | |
| throw ( ConnectorException ) | |
| { | |
| try { | |
| return new commands::ActiveMQTextMessage(); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| cms::MapMessage* OpenWireConnector::createMapMessage( | |
| connector::SessionInfo* session AMQCPP_UNUSED, | |
| TransactionInfo* transaction AMQCPP_UNUSED ) | |
| throw ( ConnectorException, UnsupportedOperationException ) | |
| { | |
| try { | |
| return new commands::ActiveMQMapMessage(); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::unsubscribe( const std::string& name ) | |
| throw ( ConnectorException, UnsupportedOperationException ) | |
| { | |
| commands::RemoveSubscriptionInfo* rsi = NULL; | |
| try { | |
| enforceConnected(); | |
| rsi = new commands::RemoveSubscriptionInfo(); | |
| rsi->setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure() ); | |
| rsi->setSubcriptionName( name ); | |
| rsi->setClientId( connectionInfo.getClientId() ); | |
| // Send the message to the broker. | |
| Response* response = syncRequest(rsi); | |
| // The broker did not return an error - this is good. | |
| // Just discard the response. | |
| delete response; | |
| } catch( ConnectorException& ex ) { | |
| delete rsi; | |
| ex.setMark( __FILE__, __LINE__ ); | |
| throw ex; | |
| } catch( ... ) { | |
| delete rsi; | |
| throw OpenWireConnectorException( __FILE__, __LINE__, | |
| "caught unknown exception" ); | |
| } | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::closeResource( ConnectorResource* resource ) | |
| throw ( ConnectorException ) | |
| { | |
| try { | |
| // if we don't get a resource or we aren't connected then we can't do | |
| // anything so we return quickly. | |
| if( resource == NULL || state != CONNECTION_STATE_CONNECTED ) { | |
| return; | |
| } | |
| commands::DataStructure* dataStructure = NULL; | |
| commands::ActiveMQTempDestination* tempDestination = | |
| dynamic_cast<commands::ActiveMQTempDestination*>( resource ); | |
| if( typeid( *resource ) == typeid( OpenWireConsumerInfo ) ) { | |
| OpenWireConsumerInfo* consumer = | |
| dynamic_cast<OpenWireConsumerInfo*>( resource ); | |
| // Remove this consumer from the consumer info map | |
| synchronized( &consumerInfoMap ) { | |
| consumerInfoMap.remove( | |
| consumer->getConsumerInfo()->getConsumerId()->getValue() ); | |
| } | |
| // Unstarted Consumers can just be deleted. | |
| if( consumer->isStarted() == false ) { | |
| return; | |
| } | |
| dataStructure = consumer->getConsumerInfo()->getConsumerId(); | |
| } else if( typeid( *resource ) == typeid( OpenWireProducerInfo ) ) { | |
| OpenWireProducerInfo* producer = | |
| dynamic_cast<OpenWireProducerInfo*>( resource ); | |
| dataStructure = producer->getProducerInfo()->getProducerId(); | |
| } else if( typeid( *resource ) == typeid( OpenWireSessionInfo ) ) { | |
| OpenWireSessionInfo* session = | |
| dynamic_cast<OpenWireSessionInfo*>(resource); | |
| dataStructure = session->getSessionInfo()->getSessionId(); | |
| } else if( typeid( *resource ) == typeid( OpenWireTransactionInfo ) ) { | |
| // Nothing to do for Transaction Info's | |
| return; | |
| } else if( tempDestination != NULL ) { | |
| // User deletes these | |
| destroyTemporaryDestination( tempDestination ); | |
| return; | |
| } | |
| if( dataStructure == NULL ) { | |
| throw OpenWireConnectorException(__FILE__,__LINE__, | |
| "attempting to destroy an invalid resource"); | |
| } | |
| // Dispose of this data structure at the broker. | |
| disposeOf( dataStructure ); | |
| } | |
| catch( ConnectorException& ex ) { | |
| ex.setMark(__FILE__, __LINE__ ); | |
| throw ex; | |
| } | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::onCommand( transport::Command* command ) | |
| { | |
| try | |
| { | |
| if( typeid( *command ) == typeid( commands::MessageDispatch ) ) { | |
| commands::MessageDispatch* dispatch = | |
| dynamic_cast<commands::MessageDispatch*>( command ); | |
| // Due to the severe suckiness of C++, in order to cast to | |
| // a type that is in a different branch of the inheritence hierarchy | |
| // we have to cast to the type at the "crotch" of the branch and then | |
| // we can implicitly cast up the other branch. | |
| core::ActiveMQMessage* message = dynamic_cast<core::ActiveMQMessage*>(dispatch->getMessage()); | |
| if( message == NULL ) { | |
| delete command; | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::onCommand - " | |
| "Received unsupported dispatch message" ); | |
| } | |
| // Get the consumer info object for this consumer. | |
| OpenWireConsumerInfo* info = NULL; | |
| synchronized( &consumerInfoMap ) { | |
| info = consumerInfoMap.getValue( dispatch->getConsumerId()->getValue() ); | |
| if( info == NULL ){ | |
| delete command; | |
| throw OpenWireConnectorException( | |
| __FILE__, __LINE__, | |
| "OpenWireConnector::onCommand - " | |
| "Received dispatch message for unregistered consumer" ); | |
| } | |
| } | |
| try{ | |
| // Callback the listener (the connection object). | |
| if( messageListener != NULL ){ | |
| messageListener->onConsumerMessage( info, message ); | |
| // Clear the Message as we've passed it onto the | |
| // listener, who is responsible for deleting it at | |
| // the appropriate time, which depends on things like | |
| // the session being transacted etc. | |
| dispatch->setMessage( NULL ); | |
| } | |
| }catch( ... ){/* do nothing*/} | |
| delete command; | |
| } else if( typeid( *command ) == typeid( commands::WireFormatInfo ) ) { | |
| this->brokerWireFormatInfo = | |
| dynamic_cast<commands::WireFormatInfo*>( command ); | |
| } else if( typeid( *command ) == typeid( commands::BrokerInfo ) ) { | |
| this->brokerInfo = | |
| dynamic_cast<commands::BrokerInfo*>( command ); | |
| } else if( typeid( *command ) == typeid( commands::ShutdownInfo ) ) { | |
| try { | |
| if( state != CONNECTION_STATE_DISCONNECTED ) { | |
| fire( CommandIOException( | |
| __FILE__, | |
| __LINE__, | |
| "OpenWireConnector::onCommand - " | |
| "Broker closed this connection.")); | |
| } | |
| } catch( ... ) { /* do nothing */ } | |
| delete command; | |
| } | |
| else | |
| { | |
| //LOGCMS_WARN( logger, "Received an unknown command" ); | |
| delete command; | |
| } | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::onTransportException( | |
| transport::Transport* source AMQCPP_UNUSED, | |
| const exceptions::ActiveMQException& ex ) | |
| { | |
| try | |
| { | |
| // We're disconnected - the asynchronous error is expected. | |
| if( state == CONNECTION_STATE_DISCONNECTED ){ | |
| return; | |
| } | |
| // Mark the fact that we are in an error state | |
| state = CONNECTION_STATE_ERROR; | |
| // Inform the user of the error. | |
| fire( ex ); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::oneway( Command* command ) | |
| throw ( ConnectorException ) | |
| { | |
| try { | |
| transport->oneway(command); | |
| } | |
| AMQ_CATCH_EXCEPTION_CONVERT( CommandIOException, OpenWireConnectorException ) | |
| AMQ_CATCH_EXCEPTION_CONVERT( UnsupportedOperationException, OpenWireConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| Response* OpenWireConnector::syncRequest( Command* command ) | |
| throw ( ConnectorException ) | |
| { | |
| try | |
| { | |
| Response* response = transport->request( command ); | |
| commands::ExceptionResponse* exceptionResponse = | |
| dynamic_cast<commands::ExceptionResponse*>( response ); | |
| if( exceptionResponse != NULL ) | |
| { | |
| // Create an exception to hold the error information. | |
| commands::BrokerError* brokerError = | |
| dynamic_cast<commands::BrokerError*>( | |
| exceptionResponse->getException() ); | |
| BrokerException exception( __FILE__, __LINE__, brokerError ); | |
| // Free the response command. | |
| delete response; | |
| // Throw the exception. | |
| throw exception; | |
| } | |
| // Nothing bad happened - just return the response. | |
| return response; | |
| } | |
| AMQ_CATCH_EXCEPTION_CONVERT( CommandIOException, OpenWireConnectorException ) | |
| AMQ_CATCH_EXCEPTION_CONVERT( UnsupportedOperationException, OpenWireConnectorException ) | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::disposeOf( | |
| commands::DataStructure* objectId ) throw ( ConnectorException ) | |
| { | |
| try | |
| { | |
| commands::RemoveInfo command; | |
| command.setObjectId( objectId->cloneDataStructure() ); | |
| oneway( &command ); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::createTemporaryDestination( | |
| commands::ActiveMQTempDestination* tempDestination ) throw ( ConnectorException ) { | |
| try { | |
| commands::DestinationInfo command; | |
| command.setConnectionId( | |
| connectionInfo.getConnectionId()->cloneDataStructure() ); | |
| command.setOperationType( 0 ); // 0 is add | |
| command.setDestination( tempDestination->cloneDataStructure() ); | |
| // Send the message to the broker. | |
| Response* response = syncRequest(&command); | |
| // The broker did not return an error - this is good. | |
| // Just discard the response. | |
| delete response; | |
| // Now that its setup, link it to this Connector | |
| tempDestination->setConnector( this ); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| void OpenWireConnector::destroyTemporaryDestination( | |
| commands::ActiveMQTempDestination* tempDestination ) throw ( ConnectorException ) { | |
| try { | |
| commands::DestinationInfo command; | |
| command.setConnectionId( | |
| connectionInfo.getConnectionId()->cloneDataStructure() ); | |
| command.setOperationType( 1 ); // 1 is remove | |
| command.setDestination( | |
| tempDestination->cloneDataStructure() ); | |
| // Send the message to the broker. | |
| Response* response = syncRequest(&command); | |
| // The broker did not return an error - this is good. | |
| // Just discard the response. | |
| delete response; | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| std::string OpenWireConnector::createTemporaryDestinationName() | |
| throw ( ConnectorException ) | |
| { | |
| try { | |
| return connectionInfo.getConnectionId()->getValue() + ":" + | |
| util::Long::toString( tempDestinationIds.getNextSequenceId() ); | |
| } | |
| AMQ_CATCH_RETHROW( ConnectorException ) | |
| AMQ_CATCHALL_THROW( OpenWireConnectorException ) | |
| } | |
| //////////////////////////////////////////////////////////////////////////////// | |
| commands::TransactionId* OpenWireConnector::createLocalTransactionId() | |
| throw ( ConnectorException ) { | |
| commands::LocalTransactionId* id = new commands::LocalTransactionId(); | |
| id->setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure() ); | |
| id->setValue( transactionIds.getNextSequenceId() ); | |
| return id; | |
| } |