/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "Event.h"
#include "HaBroker.h"
#include "IdSetter.h"
#include "LogPrefix.h"
#include "QueueReplicator.h"
#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "types.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionState.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/Msg.h"
#include "qpid/assert.h"
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/bind.hpp>


namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
using namespace framing::execution;
using namespace std;
using std::exception;
using sys::Mutex;
using boost::shared_ptr;

const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");

std::string QueueReplicator::replicatorName(const std::string& queueName) {
    return QUEUE_REPLICATOR_PREFIX + queueName;
}

bool QueueReplicator::isReplicatorName(const std::string& name) {
    return startsWith(name, QUEUE_REPLICATOR_PREFIX);
}

namespace {
void pushIfQr(QueueReplicator::Vector& v, const shared_ptr<Exchange>& ex) {
    shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
    if (qr) v.push_back(qr);
}
}

void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
    registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
}

// Debug log expected exceptions on queue replicator, check incoming execution
// exceptions for "deleted on primary" conditions.
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
  public:
    ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
        : queueReplicator(qr), logPrefix(qr->logPrefix.prePrefix, qr->logPrefix.get()) {}

    void connectionException(framing::connection::CloseCode code, const std::string& msg) {
        QPID_LOG(error, logPrefix << "Outgoing " << framing::createConnectionException(code, msg).what());
    }
    void channelException(framing::session::DetachCode code, const std::string& msg) {
        QPID_LOG(error, logPrefix << "Outgoing " << framing::createChannelException(code, msg).what());
    }
    void executionException(framing::execution::ErrorCode code, const std::string& msg) {
        QPID_LOG(error, logPrefix  << "Outgoing " << framing::createSessionException(code, msg).what());
    }
    void incomingExecutionException(ErrorCode code, const std::string& msg) {
        boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
        if (!(qr && qr->deletedOnPrimary(code, msg)))
            QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
    }
    void detach() {}

  private:
    boost::weak_ptr<QueueReplicator> queueReplicator;
    LogPrefix2 logPrefix;
};

class QueueReplicator::QueueObserver : public broker::QueueObserver {
  public:
    typedef boost::shared_ptr<QueueReplicator> Ptr;
    QueueObserver(Ptr qr) : queueReplicator(qr) {}

    void enqueued(const Message& m) {
        Ptr qr = queueReplicator.lock();
        if (qr) qr->enqueued(m);
    }

    void dequeued(const Message& m) {
        Ptr qr = queueReplicator.lock();
        if (qr) qr->dequeued(m);
    }

    void acquired(const Message&) {}
    void requeued(const Message&) {}
    void consumerAdded( const Consumer& ) {}
    void consumerRemoved( const Consumer& ) {}
    // Queue observer is destroyed when the queue is.
    void destroy() {
        Ptr qr = queueReplicator.lock();
        if (qr) qr->destroy();
    }

  private:
    boost::weak_ptr<QueueReplicator> queueReplicator;
};


boost::shared_ptr<QueueReplicator> QueueReplicator::create(
    HaBroker& hb, boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l)
{
    boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(hb, q, l));
    qr->initialize();
    return qr;
}

QueueReplicator::QueueReplicator(HaBroker& hb,
                                 boost::shared_ptr<Queue> q,
                                 boost::shared_ptr<Link> l)
    : Exchange(replicatorName(q->getName()), 0, q->getBroker()),
      haBroker(hb),
      brokerInfo(hb.getBrokerInfo()),
      link(l),
      queue(q),
      sessionHandler(0),
      logPrefix(hb.logPrefix, "Backup of "+q->getName()+": "),
      subscribed(false),
      settings(hb.getSettings()),
      nextId(0), maxId(0)
{
    QPID_LOG(debug, logPrefix << "Created");
    // The QueueReplicator will take over setting replication IDs.
    boost::shared_ptr<IdSetter> setter =
        q->getMessageInterceptors().findType<IdSetter>();
    if (setter) q->getMessageInterceptors().remove(setter);

    args.setString(QPID_REPLICATE, printable(NONE).str());
    Uuid uuid(true);
    bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
    framing::FieldTable args = getArgs();
    args.setString(QPID_REPLICATE, printable(NONE).str());
    setArgs(args);
    // Don't allow backup queues to auto-delete, primary decides when to delete.
    if (q->isAutoDelete()) q->markInUse(false);

    dispatch[DequeueEvent::KEY] =
        boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2);
    dispatch[IdEvent::KEY] =
        boost::bind(&QueueReplicator::idEvent, this, _1, _2);
}

QueueReplicator::~QueueReplicator() {}

void QueueReplicator::initialize() {
    Mutex::ScopedLock l(lock);
    if (!queue) return;         // Already destroyed

    // Enable callback to route()
    if (!getBroker()->getExchanges().registerExchange(shared_from_this()))
        throw Exception(QPID_MSG("Duplicate queue replicator " << getName()));

    // Enable callback to initializeBridge
    boost::shared_ptr<Bridge> b = queue->getBroker()->getLinks().declare(
        bridgeName,
        *link,
        false,              // durable
        queue->getName(),   // src
        getName(),          // dest
        "",                 // key
        false,              // isQueue
        false,              // isLocal
        "",                 // id/tag
        "",                 // excludes
        false,              // dynamic
        0,                  // sync?
        LinkRegistry::INFINITE_CREDIT,
        // Include shared_ptr to self to ensure we are not deleted
        // before initializeBridge is called.
        boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
    ).first;
    b->setErrorListener(
        boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
    bridge = b;                 // bridge is a weak_ptr to avoid a cycle.

    // Enable callback to destroy()
    queue->getObservers().add(
        boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
}

void QueueReplicator::disconnect() {
    Mutex::ScopedLock l(lock);
    sessionHandler = 0;
}

// Called from Queue::destroyed()
void QueueReplicator::destroy() {
    QPID_LOG(debug, logPrefix << "Destroyed");
    boost::shared_ptr<Bridge> bridge2; // To call outside of lock
    {
        Mutex::ScopedLock l(lock);
        if (!queue) return;     // Already destroyed
        bridge2 = bridge.lock(); // !call close outside the lock.
        destroy(l);
    }
    if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}

void QueueReplicator::destroy(Mutex::ScopedLock&) {
    // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
    queue.reset();
    bridge.reset();
    getBroker()->getExchanges().destroy(getName());
}

boost::shared_ptr<QueueSnapshot> QueueReplicator::getSnapshot()
{
    boost::shared_ptr<broker::Queue> q;
    {
        Mutex::ScopedLock l(lock);
        if (queue) {
            q = queue;
        } else {
            return boost::shared_ptr<QueueSnapshot>();
        }
    }
    return q->getObservers().findType<QueueSnapshot>();
}


// Called in a broker connection thread when the bridge is created.
// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
    boost::shared_ptr<QueueSnapshot> qs = getSnapshot();//do outside the lock
    Mutex::ScopedLock l(lock);
    if (!queue) return;         // Already destroyed

    sessionHandler = &sessionHandler_;
    if (sessionHandler->getSession()) {
        // Don't overwrite the exchange property set on the primary.
        sessionHandler->getSession()->getMessageBuilder().setCopyExchange(false);
    }
    AMQP_ServerProxy peer(sessionHandler->out);
    const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
    FieldTable arguments;
    arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
    arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
    arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
    ReplicationIdSet snapshot;
    if (qs) {
        snapshot = qs->getSnapshot();
        arguments.set(
            ReplicatingSubscription::QPID_ID_SET,
            FieldTable::ValuePtr(new Var32Value(encodeStr(snapshot), TYPE_CODE_VBIN32)));
    }
    try {
        peer.getMessage().subscribe(
            args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
            false/*exclusive*/, "", 0, arguments);
        peer.getMessage().setFlowMode(getName(), 1); // Window
        peer.getMessage().flow(getName(), 0, settings.getFlowMessages());
        peer.getMessage().flow(getName(), 1, settings.getFlowBytes());
    }
    catch(const exception& e) {
        QPID_LOG(error, logPrefix << "Cannot connect to primary: " << e.what());
        throw;
    }
    qpid::Address primary;
    link->getRemoteAddress(primary);
    QPID_LOG(debug, logPrefix << "Connected to " << primary << " snapshot=" << snapshot << " bridge=" << bridgeName);
    QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
}

namespace {
template <class T> T decodeContent(Message& m) {
    std::string content = m.getContent();
    Buffer buffer(const_cast<char*>(content.c_str()), content.size());
    T result;
    result.decode(buffer);
    return result;
}
}

void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
    DequeueEvent e;
    decodeStr(data, e);
    QPID_LOG(trace, logPrefix << "Dequeue " << e.ids);
    //TODO: should be able to optimise the following
    for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) {
        QueuePosition position;
        {
            Mutex::ScopedLock l(lock);
            PositionMap::iterator j = positions.find(*i);
            if (j == positions.end()) continue;
            position = j->second;
        }
        Mutex::ScopedUnlock u(lock);//this method is called under lock, so need to release
        queue->dequeueMessageAt(position); // Outside lock, will call dequeued().
        // positions will be cleaned up in dequeued()
    }
}

// Called in connection thread of the queues bridge to primary.
void QueueReplicator::route(Deliverable& deliverable)
{
    try {
        broker::Message& message(deliverable.getMessage());
        {
            Mutex::ScopedLock l(lock);
            if (!queue) return;     // Already destroyed
            string key(message.getRoutingKey());
            if (isEventKey(key)) {
                DispatchMap::iterator i = dispatch.find(key);
                if (i == dispatch.end()) {
                    QPID_LOG(info, logPrefix << "Ignoring unknown event: " << key);
                } else {
                    (i->second)(message.getContent(), l);
                }
                return;
            }
            ReplicationId id = nextId++;
            message.setReplicationId(id);
            PositionMap::iterator i = positions.find(id);
            if (i != positions.end()) {
                QPID_LOG(trace, logPrefix << "Already on queue: " << logMessageId(*queue, message));
                return;
            }
            QPID_LOG(trace, logPrefix << "Received: " << logMessageId(*queue, message));
        }
        deliver(message);       // Outside lock, will call enqueued()
    }
    catch (const std::exception& e) {
        haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
    }

}

void QueueReplicator::deliver(const broker::Message& m) {
    queue->deliver(m);
}

// Called via QueueObserver when message is enqueued. Could be as part of deliver()
// or in a different thread if a message is enqueued via a transaction.
//
void QueueReplicator::enqueued(const broker::Message& m) {
    Mutex::ScopedLock l(lock);
    maxId = std::max(maxId, ReplicationId(m.getReplicationId()));
    positions[m.getReplicationId()] = m.getSequence();
    QPID_LOG(trace, logPrefix << "Enqueued " << logMessageId(*queue, m));
}

// Called via QueueObserver
void QueueReplicator::dequeued(const broker::Message& m) {
    Mutex::ScopedLock l(lock);
    positions.erase(m.getReplicationId());
}

void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
    nextId = decodeStr<IdEvent>(data).id;
}

bool QueueReplicator::deletedOnPrimary(ErrorCode e, const std::string& msg) {
    if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
        // If the queue is destroyed at the same time we are subscribing, we may
        // get a not-found or resource-deleted exception before the
        // BrokerReplicator gets the queue-delete event. Shut down the bridge by
        // calling destroy(), we can let the BrokerReplicator delete the queue
        // when the queue-delete arrives.
        QPID_LOG(debug, logPrefix << "Deleted on primary: "
                 << framing::createSessionException(e, msg).what());
        destroy();
        return true;
    }
    return false;
}

// Unused Exchange methods.
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
bool QueueReplicator::hasBindings() { return false; }
std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }

void QueueReplicator::promoted() {
    if (queue) {
        // On primary QueueReplicator no longer sets IDs, start an IdSetter.
        QPID_LOG(debug, logPrefix << "Promoted, first replication-id " << maxId+1)
        queue->getMessageInterceptors().add(
            boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, queue->getName(), maxId+1)));
        // Process auto-deletes
        if (queue->isAutoDelete()) {
            // Make a temporary shared_ptr to prevent premature deletion of queue.
            // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
            // which could delete the queue while it's still running it's destroyed logic.
            boost::shared_ptr<Queue> q(queue);
            // See markInUse in ctor: release but don't delete if never used.
            q->releaseFromUse(false/*controller*/, subscribed/*doDelete*/);
        }
    }
}

}} // namespace qpid::broker
