blob: 5586386fba458d532ab442d8acd5a21e5fa49f3b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ACTIVEMQ_COMMANDS_IOTRANSPORTTEST_H_
#define ACTIVEMQ_COMMANDS_IOTRANSPORTTEST_H_
#include <cppunit/TestFixture.h>
#include <cppunit/extensions/HelperMacros.h>
#include <activemq/transport/IOTransport.h>
#include <activemq/transport/CommandListener.h>
#include <activemq/transport/CommandReader.h>
#include <activemq/transport/CommandWriter.h>
#include <activemq/transport/Command.h>
#include <activemq/transport/TransportExceptionListener.h>
#include <activemq/concurrent/Concurrent.h>
#include <activemq/io/BlockingByteArrayInputStream.h>
#include <activemq/io/ByteArrayOutputStream.h>
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Mutex.h>
#include <activemq/util/Config.h>
namespace activemq{
namespace transport{
class IOTransportTest : public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE( IOTransportTest );
CPPUNIT_TEST( testStartClose );
CPPUNIT_TEST( testRead );
CPPUNIT_TEST( testWrite );
CPPUNIT_TEST( testException );
CPPUNIT_TEST_SUITE_END();
public:
class MyCommand : public Command{
public:
MyCommand(){ c = 0; }
virtual ~MyCommand(){}
char c;
virtual void setCommandId( int id AMQCPP_UNUSED){}
virtual int getCommandId() const{ return 0; }
virtual void setResponseRequired( const bool required AMQCPP_UNUSED){}
virtual bool isResponseRequired() const{ return false; }
virtual std::string toString() const{ return ""; }
virtual Command* cloneCommand() const{
MyCommand* command = new MyCommand;
command->c = c;
return command;
}
};
class MyCommandListener : public CommandListener{
public:
MyCommandListener(){}
virtual ~MyCommandListener(){}
std::string str;
virtual void onCommand( Command* command ){
const MyCommand* cmd = dynamic_cast<const MyCommand*>(command);
str += cmd->c;
delete command;
}
};
class MyCommandReader : public CommandReader{
private:
/**
* The target input stream.
*/
io::InputStream* inputStream;
public:
MyCommandReader(){ throwException = false; }
virtual ~MyCommandReader(){}
bool throwException;
virtual void setInputStream(io::InputStream* is){
inputStream = is;
}
virtual io::InputStream* getInputStream(void){
return inputStream;
}
virtual Command* readCommand( void ) throw (CommandIOException){
try{
if( throwException ){
throw CommandIOException();
}
synchronized( inputStream ){
MyCommand* command = new MyCommand();
try{
command->c = inputStream->read();
} catch( exceptions::ActiveMQException& ex ){
// Free the memory.
delete command;
ex.setMark( __FILE__, __LINE__ );
throw ex;
}
return command;
}
assert(false);
return NULL;
}catch( exceptions::ActiveMQException& ex ){
CommandIOException cx( ex );
cx.setMark( __FILE__, __LINE__ );
throw cx;
}
}
virtual std::size_t read(unsigned char* buffer AMQCPP_UNUSED,
std::size_t count AMQCPP_UNUSED)
throw( io::IOException ) {
return 0;
}
virtual unsigned char readByte() throw(io::IOException) {
return 0;
}
};
class MyCommandWriter : public CommandWriter{
private:
/**
* Target output stream.
*/
io::OutputStream* outputStream;
public:
virtual ~MyCommandWriter(){}
virtual void setOutputStream(io::OutputStream* os){
outputStream = os;
}
virtual io::OutputStream* getOutputStream(void){
return outputStream;
}
virtual void writeCommand( Command* command )
throw (CommandIOException)
{
try{
synchronized( outputStream ){
const MyCommand* m =
dynamic_cast<const MyCommand*>(command);
outputStream->write( m->c );
}
}catch( exceptions::ActiveMQException& ex ){
CommandIOException cx( ex );
cx.setMark( __FILE__, __LINE__ );
throw cx;
}
}
virtual void write( const unsigned char* buffer AMQCPP_UNUSED,
std::size_t count AMQCPP_UNUSED)
throw(io::IOException) {}
virtual void writeByte(unsigned char v AMQCPP_UNUSED) throw(io::IOException) {}
};
class MyExceptionListener : public TransportExceptionListener{
public:
Transport* transport;
concurrent::Mutex mutex;
MyExceptionListener(){
transport = NULL;
}
virtual ~MyExceptionListener(){}
virtual void onTransportException( Transport* source,
const exceptions::ActiveMQException& ex AMQCPP_UNUSED){
transport = source;
synchronized(&mutex)
{
mutex.notify();
}
}
};
public:
virtual ~IOTransportTest(){}
// This will just test that we can start and stop the
// transport without any exceptions.
void testStartClose(){
io::BlockingByteArrayInputStream is;
io::ByteArrayOutputStream os;
MyCommandListener listener;
MyCommandReader reader;
MyCommandWriter writer;
MyExceptionListener exListener;
IOTransport transport;
transport.setCommandListener( &listener );
transport.setCommandReader( &reader );
transport.setCommandWriter( &writer );
transport.setInputStream( &is );
transport.setOutputStream( &os );
transport.setTransportExceptionListener( &exListener );
transport.start();
concurrent::Thread::sleep( 50 );
transport.close();
}
void testRead(){
io::BlockingByteArrayInputStream is;
io::ByteArrayOutputStream os;
MyCommandListener listener;
MyCommandReader reader;
MyCommandWriter writer;
MyExceptionListener exListener;
IOTransport transport;
transport.setCommandListener( &listener );
transport.setCommandReader( &reader );
transport.setCommandWriter( &writer );
transport.setInputStream( &is );
transport.setOutputStream( &os );
transport.setTransportExceptionListener( &exListener );
transport.start();
concurrent::Thread::sleep( 10 );
unsigned char buffer[10] = { '1', '2', '3', '4', '5', '6', '7', '8', '9', '0' };
try{
synchronized( &is ){
is.setByteArray( buffer, 10 );
}
}catch( exceptions::ActiveMQException& ex ){
ex.setMark( __FILE__, __LINE__ );
}
concurrent::Thread::sleep( 100 );
CPPUNIT_ASSERT( listener.str == "1234567890" );
transport.close();
}
void testWrite(){
io::BlockingByteArrayInputStream is;
io::ByteArrayOutputStream os;
MyCommandListener listener;
MyCommandReader reader;
MyCommandWriter writer;
MyExceptionListener exListener;
IOTransport transport;
transport.setCommandListener( &listener );
transport.setCommandReader( &reader );
transport.setCommandWriter( &writer );
transport.setInputStream( &is );
transport.setOutputStream( &os );
transport.setTransportExceptionListener( &exListener );
transport.start();
MyCommand cmd;
cmd.c = '1';
transport.oneway( &cmd );
cmd.c = '2';
transport.oneway( &cmd );
cmd.c = '3';
transport.oneway( &cmd );
cmd.c = '4';
transport.oneway( &cmd );
cmd.c = '5';
transport.oneway( &cmd );
const unsigned char* bytes = os.getByteArray();
std::size_t size = os.getByteArraySize();
CPPUNIT_ASSERT( size >= 5 );
CPPUNIT_ASSERT( bytes[0] == '1' );
CPPUNIT_ASSERT( bytes[1] == '2' );
CPPUNIT_ASSERT( bytes[2] == '3' );
CPPUNIT_ASSERT( bytes[3] == '4' );
CPPUNIT_ASSERT( bytes[4] == '5' );
transport.close();
}
void testException(){
io::BlockingByteArrayInputStream is;
io::ByteArrayOutputStream os;
MyCommandListener listener;
MyCommandReader reader;
MyCommandWriter writer;
MyExceptionListener exListener;
IOTransport transport;
transport.setCommandListener( &listener );
transport.setCommandReader( &reader );
reader.throwException = true;
transport.setCommandWriter( &writer );
transport.setInputStream( &is );
transport.setOutputStream( &os );
transport.setTransportExceptionListener( &exListener );
unsigned char buffer[1] = { '1' };
try{
synchronized( &is ){
is.setByteArray( buffer, 1);
}
}catch( exceptions::ActiveMQException& ex ){
ex.setMark(__FILE__, __LINE__ );
}
transport.start();
synchronized(&exListener.mutex)
{
if(exListener.transport != &transport)
{
exListener.mutex.wait(1000);
}
}
CPPUNIT_ASSERT( exListener.transport == &transport );
transport.close();
}
};
}}
#endif /*ACTIVEMQ_COMMANDS_IOTRANSPORTTEST_H_*/