/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "FailoverTransport.h"

#include <activemq/commands/ConnectionControl.h>
#include <activemq/commands/ShutdownInfo.h>
#include <activemq/commands/RemoveInfo.h>
#include <activemq/transport/TransportRegistry.h>
#include <activemq/threads/DedicatedTaskRunner.h>
#include <activemq/threads/CompositeTaskRunner.h>
#include <decaf/util/Random.h>
#include <decaf/lang/System.h>
#include <decaf/lang/Integer.h>

using namespace std;
using namespace activemq;
using namespace activemq::state;
using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace activemq::transport;
using namespace activemq::transport::failover;
using namespace decaf;
using namespace decaf::io;
using namespace decaf::net;
using namespace decaf::util;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;

////////////////////////////////////////////////////////////////////////////////
FailoverTransport::FailoverTransport() {

    this->timeout = -1;
    this->initialReconnectDelay = 10;
    this->maxReconnectDelay = 1000 * 30;
    this->backOffMultiplier = 2;
    this->useExponentialBackOff = true;
    this->initialized = false;
    this->maxReconnectAttempts = 0;
    this->startupMaxReconnectAttempts = 0;
    this->connectFailures = 0;
    this->reconnectDelay = this->initialReconnectDelay;
    this->trackMessages = false;
    this->trackTransactionProducers = true;
    this->maxCacheSize = 128 * 1024;

    this->started = false;
    this->closed = false;
    this->connected = false;
    this->connectionInterruptProcessingComplete = false;
    this->firstConnection = true;

    this->transportListener = NULL;
    this->uris.reset( new URIPool() );
    this->stateTracker.setTrackTransactions( true );
    this->myTransportListener.reset( new FailoverTransportListener( this ) );
    this->closeTask.reset( new CloseTransportsTask() );
    this->taskRunner.reset( new CompositeTaskRunner() );
    this->backups.reset( new BackupTransportPool( taskRunner, closeTask, uris ) );

    this->taskRunner->addTask( this );
    this->taskRunner->addTask( this->closeTask.get() );
}

////////////////////////////////////////////////////////////////////////////////
FailoverTransport::~FailoverTransport() {
    try{
        close();
    }
    AMQ_CATCH_NOTHROW( Exception )
    AMQ_CATCHALL_NOTHROW()
}

////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isShutdownCommand( const Pointer<Command>& command ) const {

    if( command != NULL ) {

        if( command->isShutdownInfo() || command->isRemoveInfo() ) {
            return true;
        }
    }

    return false;
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::add( const std::string& uri ) {

    try {
        uris->addURI( URI( uri ) );

        reconnect();
    }
    AMQ_CATCHALL_NOTHROW()
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::addURI( const List<URI>& uris ) {

    std::auto_ptr< Iterator<URI> > iter( uris.iterator() );

    while( iter->hasNext() ) {
        this->uris->addURI( iter->next() );
    }

    reconnect();
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::removeURI( const List<URI>& uris ) {

    std::auto_ptr< Iterator<URI> > iter( uris.iterator() );

    while( iter->hasNext() ) {
        this->uris->removeURI( iter->next() );
    }

    reconnect();
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::reconnect( const decaf::net::URI& uri )
    throw( decaf::io::IOException ) {

    try {

        this->uris->addURI( uri );

        reconnect();
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
    AMQ_CATCHALL_THROW( IOException )
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setTransportListener( TransportListener* listener ) {
    synchronized( &listenerMutex ) {
        this->transportListener = listener;
        listenerMutex.notifyAll();
    }
}

////////////////////////////////////////////////////////////////////////////////
TransportListener* FailoverTransport::getTransportListener() const {
    synchronized( &listenerMutex ) {
        return this->transportListener;
    }

    return NULL;
}

////////////////////////////////////////////////////////////////////////////////
std::string FailoverTransport::getRemoteAddress() const {
    synchronized( &reconnectMutex ) {
        if( connectedTransport != NULL ) {
            return connectedTransport->getRemoteAddress();
        }
    }
    return "";
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::oneway( const Pointer<Command>& command )
    throw( IOException, decaf::lang::exceptions::UnsupportedOperationException ) {

    Pointer<Exception> error;

    try {

        synchronized( &reconnectMutex ) {

            if( isShutdownCommand( command ) && connectedTransport == NULL ) {

                if( command->isShutdownInfo() ) {
                    // Skipping send of ShutdownInfo command when not connected.
                    return;
                }

                if( command->isRemoveInfo() || command->isMessageAck() ) {
                    // Simulate response to RemoveInfo command or Ack as they will be stale.
                    stateTracker.track( command );
                    Pointer<Response> response( new Response() );
                    response->setCorrelationId( command->getCommandId() );
                    myTransportListener->onCommand( response );
                    return;
                }
            }

            // Keep trying until the message is sent.
            for( int i = 0; !closed; i++ ) {
                try {

                    // Wait for transport to be connected.
                    Pointer<Transport> transport = connectedTransport;
                    long long start = System::currentTimeMillis();
                    bool timedout = false;

                    while( transport == NULL && !closed && connectionFailure == NULL ) {
                        long long end = System::currentTimeMillis();
                        if( timeout > 0 && ( end - start > timeout ) ) {
                            timedout = true;
                            break;
                        }

                        reconnectMutex.wait( 100 );
                        transport = connectedTransport;
                    }

                    if( transport == NULL ) {
                        // Previous loop may have exited due to us being disposed.
                        if( closed ) {
                            error.reset( new IOException(
                                __FILE__, __LINE__, "Transport disposed.") );
                        } else if( connectionFailure != NULL ) {
                            error = connectionFailure;
                        } else if( timedout == true ) {
                            error.reset( new IOException(
                                __FILE__, __LINE__,
                                "Failover timeout of %d ms reached.", timeout ) );
                        } else {
                            error.reset( new IOException(
                                __FILE__, __LINE__, "Unexpected failure.") );
                        }

                        break;
                    }

                    // If it was a request and it was not being tracked by the state
                    // tracker, then hold it in the requestMap so that we can replay
                    // it later.
                    Pointer<Tracked> tracked;
                    try{
                        tracked = stateTracker.track( command );
                        synchronized( &requestMap ) {
                            if( tracked != NULL && tracked->isWaitingForResponse() ) {
                                requestMap.put( command->getCommandId(), tracked );
                            } else if( tracked == NULL && command->isResponseRequired() ) {
                                requestMap.put( command->getCommandId(), command );
                            }
                        }
                    } catch( Exception& ex ) {
                        ex.setMark( __FILE__, __LINE__ );
                        error.reset( ex.clone() );
                        break;
                    }

                    // Send the message.
                    try {
                        transport->oneway( command );
                        stateTracker.trackBack( command );
                    } catch( IOException& e ) {

                        e.setMark( __FILE__, __LINE__ );

                        // If the command was not tracked.. we will retry in
                        // this method
                        if( tracked == NULL ) {

                            // since we will retry in this method.. take it out of the
                            // request map so that it is not sent 2 times on recovery
                            if( command->isResponseRequired() ) {
                                requestMap.remove( command->getCommandId() );
                            }

                            // Rethrow the exception so it will handled by
                            // the outer catch
                            throw e;
                        }
                    }

                    return;
                } catch( IOException& e ) {
                    e.setMark( __FILE__, __LINE__ );
                    handleTransportFailure( e );
                }
            }
        }
    }
    AMQ_CATCH_NOTHROW( Exception )
    AMQ_CATCHALL_NOTHROW()

    if( !closed ) {
        if( error != NULL ) {
            throw IOException( *error );
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
Pointer<Response> FailoverTransport::request( const Pointer<Command>& command AMQCPP_UNUSED )
    throw( IOException,
           decaf::lang::exceptions::UnsupportedOperationException ) {

    throw decaf::lang::exceptions::UnsupportedOperationException(
        __FILE__, __LINE__, "FailoverTransport::request - Not Supported" );
}

////////////////////////////////////////////////////////////////////////////////
Pointer<Response> FailoverTransport::request( const Pointer<Command>& command AMQCPP_UNUSED,
                                                unsigned int timeout AMQCPP_UNUSED )
    throw( IOException,
           decaf::lang::exceptions::UnsupportedOperationException ) {

    throw decaf::lang::exceptions::UnsupportedOperationException(
        __FILE__, __LINE__, "FailoverTransport::request - Not Supported" );
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::start() throw( IOException ) {

    try{

        synchronized( &reconnectMutex ) {

            if( this->started ) {
                return;
            }

            started = true;

            stateTracker.setMaxCacheSize( this->getMaxCacheSize() );
            stateTracker.setTrackMessages( this->isTrackMessages() );
            stateTracker.setTrackTransactionProducers( this->isTrackTransactionProducers() );

            if( connectedTransport != NULL ) {
                stateTracker.restore( connectedTransport );
            } else {
                reconnect();
            }
        }
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
    AMQ_CATCHALL_THROW( IOException )
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::stop() throw( IOException ) {

    try{
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
    AMQ_CATCHALL_THROW( IOException )
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::close() throw( IOException ) {

    try{

        Pointer<Transport> transportToStop;

        synchronized( &reconnectMutex ) {
            if (!started) {
                return;
            }

            started = false;
            closed = true;
            connected = false;

            backups->setEnabled( false );
            requestMap.clear();

            if( connectedTransport != NULL ) {
                transportToStop.swap( connectedTransport );
            }

            reconnectMutex.notifyAll();
        }

        synchronized( &sleepMutex ) {
            sleepMutex.notifyAll();
        }

        taskRunner->shutdown( 2000 );

        if( transportToStop != NULL ) {
            transportToStop->close();
        }
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
    AMQ_CATCHALL_THROW( IOException )
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::reconnect() {

    synchronized( &reconnectMutex  ) {
        if( started ) {
            taskRunner->wakeup();
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::restoreTransport( const Pointer<Transport>& transport )
    throw( IOException ) {

    try{

        transport->start();

        //send information to the broker - informing it we are an ft client
        Pointer<ConnectionControl> cc( new ConnectionControl() );
        cc->setFaultTolerant( true );
        transport->oneway( cc );

        stateTracker.restore( transport );
        std::vector< Pointer<Command> > commands;
        synchronized( &requestMap ) {
            commands = requestMap.values();
        }

        std::vector< Pointer<Command> >::const_iterator iter = commands.begin();
        for( ; iter != commands.end(); ++iter ) {
            transport->oneway( *iter );
        }
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
    AMQ_CATCHALL_THROW( IOException )
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::handleTransportFailure( const decaf::lang::Exception& error AMQCPP_UNUSED )
    throw( decaf::lang::Exception ) {

    Pointer<Transport> transport;
    synchronized( &reconnectMutex ) {
        connectedTransport.swap( transport );
    }

    if( transport != NULL ) {

        //std::cout << "Failover: Connection to has been unexpectedly terminated." << std::endl;

        if( this->disposedListener != NULL ) {
            transport->setTransportListener( disposedListener.get() );
        }

        // Hand off to the close task so it gets done in a different thread.
        closeTask->add( transport );

        synchronized( &reconnectMutex ) {

            initialized = false;
            uris->addURI( *connectedTransportURI );
            connectedTransportURI.reset( NULL );
            connected = false;

            // Place the State Tracker into a reconnection state.
            this->stateTracker.transportInterrupted();

            // Notify before we attempt to reconnect so that the consumers have a chance
            // to cleanup their state.
            if( transportListener != NULL ) {
                transportListener->transportInterrupted();
            }

            if( started ) {
                taskRunner->wakeup();
            }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isPending() const {
    bool result = false;

    synchronized( &reconnectMutex ) {
        if( this->connectedTransport == NULL && !closed && started ) {

            int reconnectAttempts = 0;
            if( firstConnection ) {
                if( startupMaxReconnectAttempts != 0 ) {
                    reconnectAttempts = startupMaxReconnectAttempts;
                }
            }

            if( reconnectAttempts == 0 ) {
                reconnectAttempts = maxReconnectAttempts;
            }

            if( reconnectAttempts > 0 && connectFailures >= reconnectAttempts ) {
                result = false;
            } else {
                result = true;
            }
        }
    }

    return result;
}

////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::iterate() {

    Pointer<Exception> failure;

    synchronized( &reconnectMutex ) {

        if( closed || connectionFailure != NULL ) {
            reconnectMutex.notifyAll();
        }

        if( connectedTransport != NULL || closed || connectionFailure != NULL ) {
            return false;
        } else {

            StlList<URI> failures;
            Pointer<Transport> transport;
            URI uri;

            if( !useExponentialBackOff ) {
                reconnectDelay = initialReconnectDelay;
            }

            if( backups->isEnabled() ) {

                Pointer<BackupTransport> backupTransport = backups->getBackup();

                if( backupTransport != NULL ) {

                    transport = backupTransport->getTransport();
                    uri = backupTransport->getUri();
                    transport->setTransportListener( myTransportListener.get() );

                    try {

                        if( started ) {
                            restoreTransport( transport );
                        }

                    } catch( Exception& e ) {

                        if( transport != NULL ) {
                            if( this->disposedListener != NULL ) {
                                transport->setTransportListener( disposedListener.get() );
                            }

                            // Hand off to the close task so it gets done in a different thread
                            // this prevents a deadlock from occurring if the Transport happens
                            // to call back through our onException method or locks in some other
                            // way.
                            closeTask->add( transport );
                            taskRunner->wakeup();
                            transport.reset( NULL );
                        }

                        this->uris->addURI( uri );
                    }
                }
            }

            while( transport == NULL && !closed ) {

                try{
                    uri = uris->getURI();
                } catch( NoSuchElementException& ex ) {
                    break;
                }

                try {

                    transport = createTransport( uri );
                    transport->setTransportListener( myTransportListener.get() );
                    transport->start();

                    if( started ) {
                        restoreTransport( transport );
                    }

                } catch( Exception& e ) {
                    e.setMark( __FILE__, __LINE__ );

                    if( transport != NULL ) {
                        if( this->disposedListener != NULL ) {
                            transport->setTransportListener( disposedListener.get() );
                        }

                        try{
                            transport->stop();
                        } catch(...) {}

                        // Hand off to the close task so it gets done in a different thread
                        // this prevents a deadlock from occurring if the Transport happens
                        // to call back through our onException method or locks in some other
                        // way.
                        closeTask->add( transport );
                        taskRunner->wakeup();
                        transport.reset( NULL );
                    }

                    failures.add( uri );
                    failure.reset( e.clone() );
                }
            }

            // Return the failures to the pool, we will try again on the next iteration.
            this->uris->addURIs( failures );

            if( transport != NULL ) {
                reconnectDelay = initialReconnectDelay;
                connectedTransportURI.reset( new URI( uri ) );
                connectedTransport = transport;
                reconnectMutex.notifyAll();
                connectFailures = 0;
                connected = true;

                // Make sure on initial startup, that the transportListener
                // has been initialized for this instance.
                synchronized( &listenerMutex ) {
                    if( transportListener == NULL ) {
                        // if it isn't set after 2secs - it probably never will be
                        listenerMutex.wait( 2000 );
                    }
                }

                if( transportListener != NULL ) {
                    transportListener->transportResumed();
                }

                if( firstConnection ) {
                    firstConnection = false;
                }

                return false;
            }
        }

        int reconnectAttempts = 0;
        if( firstConnection ) {
            if( startupMaxReconnectAttempts != 0 ) {
                reconnectAttempts = startupMaxReconnectAttempts;
            }
        }
        if( reconnectAttempts == 0 ) {
            reconnectAttempts = maxReconnectAttempts;
        }

        if( reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts ) {
            connectionFailure = failure;

            // Make sure on initial startup, that the transportListener has been initialized
            // for this instance.
            synchronized( &listenerMutex ) {
                if( transportListener == NULL ) {
                    listenerMutex.wait( 2000 );
                }
            }

            if( transportListener != NULL ) {

                Pointer<IOException> ioException;
                try{
                    ioException = connectionFailure.dynamicCast<IOException>();
                }
                AMQ_CATCH_NOTHROW( ClassCastException )

                if( ioException != NULL ) {
                    transportListener->onException( *connectionFailure );
                } else {
                    transportListener->onException( IOException( *connectionFailure ) );
                }
            }

            reconnectMutex.notifyAll();
            return false;
        }
    }

    if( !closed ) {

        synchronized( &sleepMutex ) {
            sleepMutex.wait( (unsigned int)reconnectDelay );
        }

        if( useExponentialBackOff ) {
            // Exponential increment of reconnect delay.
            reconnectDelay *= backOffMultiplier;
            if( reconnectDelay > maxReconnectDelay ) {
                reconnectDelay = maxReconnectDelay;
            }
        }
    }

    return !closed;
}

////////////////////////////////////////////////////////////////////////////////
Pointer<Transport> FailoverTransport::createTransport( const URI& location ) const
    throw ( decaf::io::IOException ) {

    try{

        TransportFactory* factory =
            TransportRegistry::getInstance().findFactory( location.getScheme() );

        if( factory == NULL ) {
            throw new IOException(
                __FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
        }

        Pointer<Transport> transport( factory->createComposite( location ) );

        return transport;
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
    AMQ_CATCHALL_THROW( IOException )
}

////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>& connectionId ) {

    synchronized( &reconnectMutex ) {
        stateTracker.connectionInterruptProcessingComplete( this, connectionId );
    }
}
