/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "TestSupport.h"

#include <cppunit/extensions/HelperMacros.h>

#include <integration/IntegrationCommon.h>

#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/concurrent/Thread.h>
#include <activemq/util/Guid.h>

#include <cms/Connection.h>
#include <cms/Session.h>

#include <sstream>

using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::exceptions;
using namespace activemq::concurrent;

using namespace integration;

TestSupport::TestSupport( const string& brokerUrl, cms::Session::AcknowledgeMode ackMode )
 :  connection( NULL ),
    session( NULL )
{
    this->ackMode = ackMode;
    this->brokerUrl = brokerUrl;
}

TestSupport::~TestSupport()
{
    close();
}

void TestSupport::close() {
    try
    {
        if( session != NULL ) {
            session->close();
            delete session;
        }

        if( connection != NULL ) {
            connection->close();
            delete connection;
        }
    }
    AMQ_CATCH_NOTHROW( ActiveMQException )
    AMQ_CATCHALL_NOTHROW( )

    session = NULL;
    connection = NULL;
}

void TestSupport::initialize(){
    try
    {
        numReceived = 0;

        // Now create the connection
        connection = createDetachedConnection(
            "", "", Guid().createGUIDString() );

        // Set ourself as a recipient of Exceptions
        connection->setExceptionListener( this );
        connection->start();

        // Create a Session
        session = connection->createSession( ackMode );
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}

cms::Connection* TestSupport::createDetachedConnection(
    const std::string& username,
    const std::string& password,
    const std::string& clientId ) {

    try
    {
        // Now create the connection
        cms::Connection* connection =
            ActiveMQConnectionFactory::createConnection(
                brokerUrl, username, password, clientId );

        return connection;
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}

void TestSupport::doSleep(void)
{
    Thread::sleep( IntegrationCommon::defaultDelay );
}

unsigned int TestSupport::produceTextMessages(
    cms::MessageProducer& producer,
    unsigned int count,
    cms::Destination* replyTo )
{
    try
    {
        // Send some text messages.
        ostringstream stream;
        string text = "this is a test text message: id = ";

        cms::TextMessage* textMsg =
            session->createTextMessage();
        textMsg->setCMSReplyTo( replyTo );

        unsigned int realCount = 0;

        for( unsigned int ix=0; ix<count; ++ix ){
            stream << text << ix << ends;
            textMsg->setText( stream.str().c_str() );
            stream.str("");
            producer.send( textMsg );
            doSleep();
            ++realCount;
        }

        delete textMsg;

        return realCount;
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}

unsigned int TestSupport::produceBytesMessages(
    cms::MessageProducer& producer,
    unsigned int count,
    cms::Destination* replyTo )
{
    try
    {
        unsigned char buf[10];
        memset( buf, 0, 10 );
        buf[0] = 0;
        buf[1] = 1;
        buf[2] = 2;
        buf[3] = 3;
        buf[4] = 0;
        buf[5] = 4;
        buf[6] = 5;
        buf[7] = 6;

        cms::BytesMessage* bytesMsg =
            session->createBytesMessage();
        bytesMsg->setBodyBytes( buf, 10 );
        bytesMsg->setCMSReplyTo( replyTo );

        unsigned int realCount = 0;
        for( unsigned int ix=0; ix<count; ++ix ){
            producer.send( bytesMsg );
            doSleep();
            ++realCount;
        }

        delete bytesMsg;

        return realCount;
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}

void TestSupport::waitForMessages( unsigned int count )
{
    try
    {
        synchronized( &mutex )
        {
            int stopAtZero = count + 5;

            while( numReceived < count )
            {
                mutex.wait( 500 );

                if( --stopAtZero == 0 )
                {
                    break;
                }
            }
        }
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}

void TestSupport::onException( const cms::CMSException& error )
{
    CPPUNIT_ASSERT_MESSAGE( error.getStackTraceString(), false );
}

void TestSupport::onMessage( const cms::Message* message )
{
    if( session->getAcknowledgeMode() == cms::Session::CLIENT_ACKNOWLEDGE ) {
        try {
            message->acknowledge();
        } catch( CMSException& ex ) {
            CPPUNIT_ASSERT_MESSAGE(ex.getStackTraceString(), false );
        }
    }

    // Got a text message.
    const cms::TextMessage* txtMsg =
        dynamic_cast<const cms::TextMessage*>(message);

    if( txtMsg != NULL )
    {
        std::string text = txtMsg->getText();

        if( IntegrationCommon::debug ) {
            printf("received text msg: %s\n", txtMsg->getText().c_str() );
        }

        numReceived++;

        // Signal that we got one
        synchronized( &mutex )
        {
            mutex.notifyAll();
        }

        return;
    }

    // Got a bytes msg.
    const cms::BytesMessage* bytesMsg =
        dynamic_cast<const cms::BytesMessage*>(message);

    if( bytesMsg != NULL )
    {
        const unsigned char* bytes = bytesMsg->getBodyBytes();

        string transcode( (const char*)bytes, bytesMsg->getBodyLength() );

        if( IntegrationCommon::debug ) {
            printf("received bytes msg: %s\n", transcode.c_str() );
        }

        numReceived++;

        // Signal that we got one
        synchronized( &mutex )
        {
            mutex.notifyAll();
        }

        return;
    }

}
