/**
 * Copyright 2016 Yahoo Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "HandlerBase.h"
#include <boost/bind.hpp>

#include <cassert>

#include "LogUtils.h"

DECLARE_LOG_OBJECT()

namespace pulsar {

/**
 * Builds a handler bound to the given client and topic.
 *
 * The handler starts in the Pending state with an empty connection reference.
 * The operation timeout is read from the client's configuration and the
 * deadline timer is obtained from the client's IO executor provider.
 *
 * @param client client this handler belongs to; it is dereferenced here, so it
 *               must not be null
 * @param topic  topic name, later passed to getConnection() by grabCnx()
 */
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic)
        : client_(client),
          topic_(topic),
          connection_(),
          mutex_(),
          creationTimestamp_(now()),
          // NOTE(review): "operationTimeut_" is misspelled ("Timeout"); the member is
          // presumably declared in HandlerBase.h, so a rename has to start there.
          operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
          state_(Pending),
          // Presumably (initial delay, maximum delay) for reconnection attempts -
          // TODO confirm against the Backoff constructor.
          backoff_(milliseconds(100), seconds(60)),
          timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {
}

/**
 * Cancels any wait still pending on the reconnection timer before the handler
 * goes away.
 */
HandlerBase::~HandlerBase() {
    this->timer_->cancel();
}

void HandlerBase::start() {
    grabCnx();
}

void HandlerBase::grabCnx() {
    if (connection_.lock()) {
        LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
        return;
    }

    LOG_INFO(getName() << "Getting connection from pool");
    ClientImplPtr client = client_.lock();
    Future<Result, ClientConnectionWeakPtr> future = client->getConnection(topic_);
    future.addListener(
            boost::bind(&HandlerBase::handleNewConnection, _1, _2, get_weak_from_this()));
}

/**
 * Listener invoked when a connection request started by grabCnx() completes.
 *
 * @param result      outcome of the connection request
 * @param connection  weak reference to the obtained connection
 * @param weakHandler weak reference to the handler that asked for the connection
 *
 * If the handler no longer exists the event is dropped. When the request
 * succeeded and the connection is still alive, connectionOpened() is called.
 * In every other case connectionFailed() is called with the received result and
 * a reconnection is scheduled.
 */
void HandlerBase::handleNewConnection(Result result, ClientConnectionWeakPtr connection,
                                      HandlerBaseWeakPtr weakHandler) {
    HandlerBasePtr self = weakHandler.lock();
    if (!self) {
        LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
        return;
    }

    // Stays empty unless the request succeeded and the connection is still alive.
    ClientConnectionPtr cnx;
    if (result == ResultOk) {
        cnx = connection.lock();
        if (!cnx) {
            // TODO - look deeper into why the connection is null while the result is ResultOk
            LOG_INFO(self->getName() << "ClientConnectionPtr is no longer valid");
        }
    }

    if (cnx) {
        LOG_DEBUG(self->getName() << "Connected to broker: " << cnx->cnxString());
        self->connectionOpened(cnx);
        return;
    }

    self->connectionFailed(result);
    scheduleReconnection(self);
}

/**
 * Reacts to a connection being closed.
 *
 * @param result      reason reported for the disconnection (not used here)
 * @param connection  weak reference to the connection that was closed
 * @param weakHandler weak reference to the handler that was using the connection
 *
 * The event is dropped if the handler no longer exists, or if the handler is
 * already attached to a different live connection. Otherwise the handler's
 * connection reference is cleared and, when the handler is Pending or Ready,
 * a reconnection is scheduled. The handler's mutex is held for the whole call.
 */
void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr connection,
                                      HandlerBaseWeakPtr weakHandler) {
    HandlerBasePtr handler = weakHandler.lock();
    if (!handler) {
        LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
        return;
    }

    Lock lock(handler->mutex_);
    State state = handler->state_;

    ClientConnectionPtr currentConnection = handler->connection_.lock();
    if (currentConnection && connection.lock().get() != currentConnection.get()) {
        LOG_WARN(
                handler->getName() << "Ignoring connection closed since we are already attached to a newer connection");
        return;
    }

    // Drop the handler's own reference to the closed connection. Resetting only the
    // local shared_ptr copy would leave connection_ pointing at the closed connection,
    // and grabCnx() would then skip the reconnect while that object is still alive.
    handler->connection_.reset();

    switch (state) {
        case Pending:
        case Ready:
            scheduleReconnection(handler);
            break;

        case Closing:
        case Closed:
        case Failed:
            LOG_DEBUG(handler->getName() <<
                    "Ignoring connection closed event since the handler is not used anymore");
            break;
    }
}

/**
 * Tells whether an operation that failed with the given result may be retried.
 *
 * @param result result code to classify
 * @return true only for ResultServiceUnitNotReady, false for every other code
 */
bool HandlerBase::isRetriableError(Result result) {
    return result == ResultServiceUnitNotReady;
}

/**
 * Arms the handler's timer so that a new connection attempt is made after the
 * next backoff delay.
 *
 * @param handler handler to reconnect; nothing happens unless its state is
 *                Pending or Ready
 */
void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
    const bool shouldReconnect = (handler->state_ == Pending || handler->state_ == Ready);
    if (!shouldReconnect) {
        return;
    }

    TimeDuration delay = handler->backoff_.next();
    const double delaySeconds = delay.total_milliseconds() / 1000.0;

    LOG_INFO(
            handler->getName() << "Schedule reconnection in " << delaySeconds << " s");
    handler->timer_->expires_from_now(delay);
    // The shared_ptr itself is bound into the completion handler, so the handler
    // object is still valid when handleTimeout() runs and calls grabCnx() on it.
    handler->timer_->async_wait(boost::bind(&HandlerBase::handleTimeout, _1, handler));
}

/**
 * Completion handler for the reconnection timer.
 *
 * @param ec      status of the wait; any error means the reconnect is skipped
 * @param handler handler whose reconnection was scheduled
 */
void HandlerBase::handleTimeout(const boost::system::error_code& ec, HandlerBasePtr handler) {
    if (!ec) {
        // The wait completed without error: try to obtain a connection again.
        handler->grabCnx();
        return;
    }

    LOG_DEBUG(handler->getName() << "Ignoring timer cancelled event, code[" << ec <<"]");
}

/** Returns the current time as reported by microsec_clock::universal_time(). */
ptime now() {
    const ptime current = microsec_clock::universal_time();
    return current;
}

int64_t currentTimeMillis() {
    static ptime time_t_epoch(boost::gregorian::date(1970, 1, 1));

    time_duration diff = now() - time_t_epoch;
    return diff.total_milliseconds();
}

}
