blob: d24107923f77e43975270f4aa308d3ba3484fb77 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "IOTransport.h"
#include "CommandReader.h"
#include "CommandWriter.h"
#include <activemq/concurrent/Concurrent.h>
#include <activemq/exceptions/UnsupportedOperationException.h>
#include <activemq/util/Config.h>
using namespace activemq;
using namespace activemq::transport;
using namespace activemq::concurrent;
LOGCMS_INITIALIZE(logger, IOTransport, "activemq.transport.IOTransport" )
////////////////////////////////////////////////////////////////////////////////
IOTransport::IOTransport(){
listener = NULL;
reader = NULL;
writer = NULL;
exceptionListener = NULL;
inputStream = NULL;
outputStream = NULL;
closed = false;
thread = NULL;
}
////////////////////////////////////////////////////////////////////////////////
IOTransport::~IOTransport(){
close();
}
////////////////////////////////////////////////////////////////////////////////
void IOTransport::oneway( Command* command )
throw(CommandIOException, exceptions::UnsupportedOperationException)
{
if( closed ){
throw CommandIOException( __FILE__, __LINE__,
"IOTransport::oneway() - transport is closed!" );
}
// Make sure the thread has been started.
if( thread == NULL ){
throw CommandIOException(
__FILE__, __LINE__,
"IOTransport::oneway() - transport is not started" );
}
// Make sure the command object is valid.
if( command == NULL ){
throw CommandIOException(
__FILE__, __LINE__,
"IOTransport::oneway() - attempting to write NULL command" );
}
// Make sure we have an output strema to write to.
if( outputStream == NULL ){
throw CommandIOException(
__FILE__, __LINE__,
"IOTransport::oneway() - invalid output stream" );
}
synchronized( outputStream ){
// Write the command to the output stream.
writer->writeCommand( command );
}
}
////////////////////////////////////////////////////////////////////////////////
void IOTransport::start() throw( cms::CMSException ){
// Can't restart a closed transport.
if( closed ){
throw CommandIOException( __FILE__, __LINE__, "IOTransport::start() - transport is already closed - cannot restart" );
}
// If it's already started, do nothing.
if( thread != NULL ){
return;
}
// Make sure all variables that we need have been set.
if( inputStream == NULL || outputStream == NULL ||
reader == NULL || writer == NULL ){
throw CommandIOException(
__FILE__, __LINE__,
"IOTransport::start() - "
"IO sreams and reader/writer must be set before calling start" );
}
// Init the Command Reader and Writer with the Streams
reader->setInputStream( inputStream );
writer->setOutputStream( outputStream );
// Start the polling thread.
thread = new Thread( this );
thread->start();
}
////////////////////////////////////////////////////////////////////////////////
void IOTransport::close() throw( cms::CMSException ){
try{
if( closed ){
return;
}
// Mark this transport as closed.
closed = true;
// We have to close the input stream before
// we stop the thread. this will force us to
// wake up the thread if it's stuck in a read
// (which is likely). Otherwise, the join that
// follows will block forever.
if( inputStream != NULL ){
inputStream->close();
inputStream = NULL;
}
// Wait for the thread to die.
if( thread != NULL ){
thread->join();
delete thread;
thread = NULL;
}
// Close the output stream.
if( outputStream != NULL ){
outputStream->close();
outputStream = NULL;
}
}
AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void IOTransport::run(){
try{
while( !closed ){
// Read the next command from the input stream.
Command* command = reader->readCommand();
// Notify the listener.
fire( command );
}
}
catch( exceptions::ActiveMQException& ex ){
ex.setMark( __FILE__, __LINE__ );
fire( ex );
}
catch( ... ){
exceptions::ActiveMQException ex(
__FILE__, __LINE__,
"IOTransport::run - caught unknown exception" );
LOGCMS_WARN(logger, ex.getStackTraceString() );
fire( ex );
}
}
////////////////////////////////////////////////////////////////////////////////
Response* IOTransport::request( Command* command AMQCPP_UNUSED )
throw( CommandIOException, exceptions::UnsupportedOperationException ){
throw exceptions::UnsupportedOperationException( __FILE__, __LINE__, "IOTransport::request() - unsupported operation" );
}