/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "CqQueryImpl.hpp"

#include <geode/CqAttributesFactory.hpp>
#include <geode/CqAttributesMutator.hpp>
#include <geode/ExceptionTypes.hpp>

#include "ReadWriteLock.hpp"
#include "ResultSetImpl.hpp"
#include "StructSetImpl.hpp"
#include "ThinClientRegion.hpp"
#include "UserAttributes.hpp"
#include "util/Log.hpp"
#include "util/bounds.hpp"
#include "util/exception.hpp"

namespace apache {
namespace geode {
namespace client {

/**
 * Creates a client-side continuous query in the STOPPED state.
 *
 * m_cqAttributes is produced by a CqAttributesFactory seeded with
 * cqAttributes, and the mutator is built on top of that instance.
 *
 * @param cqService service this CQ belongs to; also supplies m_tccdm.
 * @param cqName name of the CQ; reused as the server-side CQ name.
 * @param queryString query text of the CQ.
 * @param cqAttributes attributes used to seed the CQ's own attributes.
 * @param factory statistics factory handed to CqQueryVsdStats.
 * @param isDurable whether the CQ is durable.
 * @param userAttributesPtr optional user attributes; when present, their
 *        authenticated view is remembered for later requests.
 */
CqQueryImpl::CqQueryImpl(
    const std::shared_ptr<CqService>& cqService, const std::string& cqName,
    const std::string& queryString,
    const std::shared_ptr<CqAttributes>& cqAttributes,
    StatisticsFactory* factory, const bool isDurable,
    const std::shared_ptr<UserAttributes>& userAttributesPtr)
    : m_cqName(cqName),
      m_queryString(queryString),
      m_cqAttributes(CqAttributesFactory(cqAttributes).create()),
      m_cqAttributesMutator(m_cqAttributes),
      m_cqService(cqService),
      // The client uses one and the same name locally and on the server.
      m_serverCqName(cqName),
      m_isDurable(isDurable),
      m_stats(std::make_shared<CqQueryVsdStats>(factory, m_cqName)),
      m_cqState(CqState::STOPPED),  // a new CQ starts out stopped
      m_cqOperation(CqOperation::OP_TYPE_INVALID),
      m_tccdm(m_cqService->getDM()) {
  // No user attributes means there is no authenticated view to bind to.
  m_authenticatedView = nullptr;
  if (userAttributesPtr) {
    m_authenticatedView = userAttributesPtr->getAuthenticatedView();
  }
}

/** No explicit cleanup is performed here. */
CqQueryImpl::~CqQueryImpl() = default;
/**
 * Asks the owning CqService to refresh its statistics.
 */
void CqQueryImpl::updateStats() {
  m_cqService->updateStats();
}

/**
 * @return the name of this CQ.
 */
const std::string& CqQueryImpl::getName() const {
  return m_cqName;
}

/**
 * Renames the CQ. The server-side name is always kept equal to the
 * client-side name.
 * @param cqName the new name.
 */
void CqQueryImpl::setName(std::string cqName) {
  m_serverCqName = cqName;
  m_cqName = m_serverCqName;
}

/**
 * Initializes the CqQuery: stores it in the CqService repository, counts
 * the creation in the service statistics and refreshes the statistics.
 */
void CqQueryImpl::initCq() {
  // Registration happens first; the counters are only touched once
  // addToCqMap() has returned normally.
  addToCqMap();

  m_cqService->getCqServiceVsdStats().incNumCqsCreated();
  updateStats();
}

/**
 * Closes the Query and sends the close request to the server.
 * Equivalent to close(true).
 */
void CqQueryImpl::close() {
  const bool notifyServer = true;
  close(notifyServer);
}

/**
 * Closes the Query.
 *
 * Steps, in order: mark the CQ CLOSING, optionally send the close request
 * to the server (best effort), mark the CQ CLOSED, count the close in the
 * service statistics, invoke close() on every registered CqListener, remove
 * the CQ from the CqService repository and refresh the statistics.
 * Calling this on an already closed CQ only logs and returns.
 *
 * @param sendRequestToServer true to send the close request to the server.
 * @throws CqException if the CQ cannot be removed from the repository.
 * @throws CqClosedException from setCqState() if the CQ is found closed
 *         when the state is being changed.
 */
void CqQueryImpl::close(bool sendRequestToServer) {
  // Closing an already closed CQ is a logged no-op.
  if (isClosed()) {
    LOGFINE("CQ is already closed, CqName : %s", m_cqName.c_str());
    return;
  }

  GuardUserAttributes gua;
  if (m_authenticatedView != nullptr) {
    gua.setAuthenticatedView(m_authenticatedView);
  }
  LOGFINE("Started closing CQ CqName : %s", m_cqName.c_str());

  auto& stats = m_cqService->getCqServiceVsdStats();

  setCqState(CqState::CLOSING);
  if (sendRequestToServer) {
    try {
      sendStopOrClose(TcrMessage::CLOSECQ_MSG_TYPE);
    } catch (...) {
      // Best effort: the local close must complete even when the server
      // cannot be told about it. Leave a trace instead of failing.
      LOGFINE("Ignoring failure to send close request to server, CqName : %s",
              m_cqName.c_str());
    }
  }

  // Set the state to closed, and update stats.
  setCqState(CqState::CLOSED);
  stats.incNumCqsClosed();

  // Invoke close on Listeners if any.
  if (m_cqAttributes) {
    auto cqListeners = m_cqAttributes->getCqListeners();

    if (!cqListeners.empty()) {
      // size() yields a size_t, which must be printed with %zu.
      LOGFINE(
          "Invoking CqListeners close() api for the CQ, CqName : %s  Number of "
          "CqListeners : %zu",
          m_cqName.c_str(), cqListeners.size());

      for (auto& l : cqListeners) {
        try {
          l->close();
        } catch (Exception& ex) {
          // A failing listener must not keep the remaining listeners from
          // being closed, so the error is only logged.
          LOGWARN(
              "Exception occoured in the CqListener of the CQ, CqName : "
              "%s Error : %s",
              m_cqName.c_str(), ex.what());
        }
      }
    }
  }

  removeFromCqMap();
  updateStats();
  LOGFINE("Successfully closed the CQ. %s", m_cqName.c_str());
}

/**
 * Store this CQ in the cqService's cqMap.
 * @throws CqException
 */
void CqQueryImpl::addToCqMap() {
  // Add CQ to the CQ repository
  try {
    LOGFINE("Adding to CQ Repository. CqName : %s Server CqName : %s",
            m_cqName.c_str(), m_serverCqName.c_str());
    std::shared_ptr<CqQuery> cq = shared_from_this();
    m_cqService->addCq(m_cqName, cq);
  } catch (Exception& ex) {
    std::string errMsg =
        "Failed to store Continuous Query in the repository. CqName: " +
        m_cqName + ex.what();
    LOGERROR(errMsg.c_str());
    throw CqException(errMsg.c_str());
  }
  LOGFINE("Stored CQ in the CQ repository. %s", m_cqName.c_str());
}

/**
 * Removes the CQ from CQ repository.
 *
 * Any Exception raised while removing is logged and rethrown as a
 * CqException carrying the CQ name and the original error text.
 *
 * @throws CqException if the CQ cannot be removed.
 */
void CqQueryImpl::removeFromCqMap() {
  try {
    m_cqService->removeCq(m_cqName);
  } catch (Exception& ex) {
    std::string errMsg =
        "Failed to remove Continuous Query From the repository. CqName: " +
        m_cqName + " Error : " + ex.what();
    // The message is data, not a format string: a '%' in the CQ name or in
    // the exception text must not be interpreted by the logger.
    LOGERROR("%s", errMsg.c_str());
    throw CqException(errMsg.c_str());
  }
  LOGFINE("Removed CQ from the CQ repository. CQ Name: %s", m_cqName.c_str());
}

/**
 * @return the query string this CQ was created with.
 */
const std::string& CqQueryImpl::getQueryString() const {
  return m_queryString;
}

/**
 * Return the query.
 * @return the stored Query object (m_query).
 */
std::shared_ptr<Query> CqQueryImpl::getQuery() const {
  return m_query;
}

/**
 * @see org.apache.geode.cache.query.CqQuery#getStatistics()
 */
std::shared_ptr<CqStatistics> CqQueryImpl::getStatistics() const {
  return m_stats;
}

std::shared_ptr<CqAttributes> CqQueryImpl::getCqAttributes() const {
  return m_cqAttributes;
}

/**
 * Releases the CQ's entry in the CQ repository.
 * @throws CqException if the CQ cannot be removed from the repository.
 */
void CqQueryImpl::cleanup() {
  removeFromCqMap();
}

/**
 * Hands the CQ's listeners to the caller through an output parameter.
 * @param cqListener container that is assigned the listeners taken from
 *        the CQ attributes.
 */
void CqQueryImpl::getCqListeners(
    CqAttributes::listener_container_type& cqListener) {
  // Nothing is returned; the caller's container is overwritten instead.
  cqListener = m_cqAttributes->getCqListeners();
}

/**
 * Sends the execute-CQ request for this CQ to one specific endpoint.
 *
 * Only a CQ that is currently RUNNING is sent; for any other state the
 * call returns GF_NOERR without contacting the endpoint. Failures are
 * reported through the return value.
 *
 * @param endpoint endpoint the request is sent to.
 * @return GF_NOERR on success or when the CQ is not running; otherwise the
 *         error from sending the request, or the error derived from an
 *         exception reply of the server.
 */
GfErrType CqQueryImpl::execute(TcrEndpoint* endpoint) {
  ACE_Guard<ACE_Recursive_Thread_Mutex> stateGuard(m_mutex);
  if (m_cqState != CqState::RUNNING) {
    return GF_NOERR;
  }

  GuardUserAttributes userGuard;
  if (m_authenticatedView) {
    userGuard.setAuthenticatedView(m_authenticatedView);
  }

  LOGFINE("Executing CQ [%s]", m_cqName.c_str());

  TcrMessageExecuteCq request(new DataOutput(m_cqService->getDM()
                                                 ->getConnectionManager()
                                                 .getCacheImpl()
                                                 ->createDataOutput()),
                              m_cqName, m_queryString, CqState::RUNNING,
                              isDurable(), m_tccdm);
  TcrMessageReply reply(true, m_tccdm);

  const GfErrType sendError =
      m_tccdm->sendRequestToEP(request, reply, endpoint);
  if (sendError != GF_NOERR) {
    return sendError;
  }

  // The request went out; now see whether the server answered with one of
  // its error message types.
  const bool serverReportedError =
      reply.getMessageType() == TcrMessage::EXCEPTION ||
      reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
      reply.getMessageType() == TcrMessage::CQ_EXCEPTION_TYPE;
  if (!serverReportedError) {
    return GF_NOERR;
  }

  return ThinClientRegion::handleServerException("CqQuery::execute(endpoint)",
                                                 reply.getException());
}

void CqQueryImpl::executeAfterFailover() {
  ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_mutex);
  if (m_cqState != CqState::RUNNING) {
    return;
  }
  executeCq(TcrMessage::EXECUTECQ_MSG_TYPE);
}

void CqQueryImpl::execute() {
  GuardUserAttributes gua;
  if (m_authenticatedView != nullptr) {
    gua.setAuthenticatedView(m_authenticatedView);
  }

  ACE_Guard<ACE_Recursive_Thread_Mutex> guardRedundancy(
      *(m_tccdm->getRedundancyLock()));

  ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_mutex);
  if (m_cqState == CqState::RUNNING) {
    throw IllegalStateException("CqQuery::execute: cq is already running");
  }
  executeCq(TcrMessage::EXECUTECQ_MSG_TYPE);
}

/**
 * Sends the execute-CQ request synchronously and, on success, moves the CQ
 * to the RUNNING state. Used for EXECUTE_REQUEST or
 * REDUNDANT_EXECUTE_REQUEST; the message type argument itself is unused.
 *
 * @return true once the CQ has been marked RUNNING.
 * @throws CqQueryException if the server reply maps to
 *         GF_CACHESERVER_EXCEPTION.
 * Other error codes are passed to GfErrTypeToException().
 */
bool CqQueryImpl::executeCq(TcrMessage::MsgType) {
  GuardUserAttributes userGuard;
  if (m_authenticatedView) {
    userGuard.setAuthenticatedView(m_authenticatedView);
  }

  LOGDEBUG("CqQueryImpl::executeCq");
  TcrMessageExecuteCq request(new DataOutput(m_cqService->getDM()
                                                 ->getConnectionManager()
                                                 .getCacheImpl()
                                                 ->createDataOutput()),
                              m_cqName, m_queryString, CqState::RUNNING,
                              isDurable(), m_tccdm);
  TcrMessageReply reply(true, m_tccdm);

  GfErrType err = m_tccdm->sendSyncRequest(request, reply);
  if (err != GF_NOERR) {
    GfErrTypeToException("CqQuery::executeCq:", err);
  }

  const bool serverReportedError =
      reply.getMessageType() == TcrMessage::EXCEPTION ||
      reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
      reply.getMessageType() == TcrMessage::CQ_EXCEPTION_TYPE;
  if (serverReportedError) {
    err = ThinClientRegion::handleServerException("CqQuery::executeCq",
                                                  reply.getException());
    if (err == GF_CACHESERVER_EXCEPTION) {
      throw CqQueryException(
          std::string("CqQuery::executeCq: exception at the server side: ") +
          reply.getException());
    }
    GfErrTypeToException("CqQuery::executeCq", err);
  }

  // Reaching this point means no exception was raised above.
  ACE_Guard<ACE_Recursive_Thread_Mutex> stateGuard(m_mutex);
  m_cqState = CqState::RUNNING;
  updateStats();
  return true;
}

// for EXECUTE_INITIAL_RESULTS_REQUEST :
std::shared_ptr<CqResults> CqQueryImpl::executeWithInitialResults(
    std::chrono::milliseconds timeout) {
  util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout);

  GuardUserAttributes gua;
  if (m_authenticatedView != nullptr) {
    gua.setAuthenticatedView(m_authenticatedView);
  }

  ACE_Guard<ACE_Recursive_Thread_Mutex> guardRedundancy(
      *(m_tccdm->getRedundancyLock()));

  ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_mutex);
  if (m_cqState == CqState::RUNNING) {
    throw IllegalStateException(
        "CqQuery::executeWithInitialResults: cq is already running");
  }
  // QueryResult values;
  TcrMessageExecuteCqWithIr msg(new DataOutput(m_cqService->getDM()
                                                   ->getConnectionManager()
                                                   .getCacheImpl()
                                                   ->createDataOutput()),
                                m_cqName, m_queryString, CqState::RUNNING,
                                isDurable(), m_tccdm);

  TcrMessageReply reply(true, m_tccdm);
  auto resultCollector =
      std::unique_ptr<ChunkedQueryResponse>(new ChunkedQueryResponse(reply));
  reply.setChunkedResultHandler(resultCollector.get());
  reply.setTimeout(timeout);

  GfErrType err = GF_NOERR;
  err = m_tccdm->sendSyncRequest(msg, reply);
  if (err != GF_NOERR) {
    LOGDEBUG("CqQueryImpl::executeCqWithInitialResults errorred!!!!");
    GfErrTypeToException("CqQuery::executeCqWithInitialResults:", err);
  }
  if (reply.getMessageType() == TcrMessage::EXCEPTION ||
      reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
      reply.getMessageType() == TcrMessage::CQ_EXCEPTION_TYPE) {
    err = ThinClientRegion::handleServerException(
        "CqQuery::executeCqWithInitialResults", reply.getException());
    if (err == GF_CACHESERVER_EXCEPTION) {
      throw CqQueryException(
          std::string("CqQuery::executeWithInitialResults: exception ") +
          "at the server side: " + reply.getException());
    } else {
      GfErrTypeToException("CqQuery::executeWithInitialResults", err);
    }
  }
  m_cqState = CqState::RUNNING;
  updateStats();
  std::shared_ptr<CqResults> sr;
  auto&& values = resultCollector->getQueryResults();
  auto&& fieldNameVec = resultCollector->getStructFieldNames();
  auto sizeOfFieldNamesVec = fieldNameVec.size();
  if (sizeOfFieldNamesVec == 0) {
    LOGFINEST("Query::execute: creating ResultSet for query: %s",
              m_queryString.c_str());
    sr = std::dynamic_pointer_cast<CqResults>(
        std::make_shared<ResultSetImpl>(values));
  } else {
    if (values->size() % fieldNameVec.size() != 0) {
      throw MessageException(
          "Query::execute: Number of values coming "
          "from server has to be exactly divisible by field count");
    } else {
      LOGFINEST("Query::execute: creating StructSet for query: %s",
                m_queryString.c_str());
      sr = std::dynamic_pointer_cast<CqResults>(
          std::make_shared<StructSetImpl>(values, fieldNameVec));
    }
  }
  return sr;
}

/**
 * Stop or pause executing the query.
 */
void CqQueryImpl::stop() {
  if (isClosed()) {
    throw CqClosedException(("CQ is closed, CqName : " + m_cqName).c_str());
  }

  GuardUserAttributes gua;
  if (m_authenticatedView != nullptr) {
    gua.setAuthenticatedView(m_authenticatedView);
  }

  if (!(isRunning())) {
    throw IllegalStateException(
        ("CQ is not in running state, stop CQ does not apply, CqName : " +
         m_cqName)
            .c_str());
  }

  sendStopOrClose(TcrMessage::STOPCQ_MSG_TYPE);
  /*
  CqServiceVsdStats & stats = m_cqService->getCqServiceVsdStats();
  stats.decNumCqsActive();
  */
  setCqState(CqState::STOPPED);
  // stats.incNumCqsStopped();
  updateStats();
}
/**
 * Sends a stop or a close request for this CQ to the server and evaluates
 * the reply.
 *
 * @param requestType TcrMessage::STOPCQ_MSG_TYPE or
 *        TcrMessage::CLOSECQ_MSG_TYPE; for any other value no request is
 *        sent and only the untouched reply is inspected.
 * @throws CqQueryException if the server reply maps to
 *         GF_CACHESERVER_EXCEPTION.
 * Other error codes are passed to GfErrTypeToException().
 */
void CqQueryImpl::sendStopOrClose(TcrMessage::MsgType requestType) {
  TcrMessageReply reply(true, m_tccdm);
  GfErrType err = GF_NOERR;

  if (requestType == TcrMessage::STOPCQ_MSG_TYPE) {
    TcrMessageStopCQ stopRequest(new DataOutput(m_cqService->getDM()
                                                    ->getConnectionManager()
                                                    .getCacheImpl()
                                                    ->createDataOutput()),
                                 m_cqName, std::chrono::milliseconds(-1),
                                 m_tccdm);
    err = m_tccdm->sendSyncRequest(stopRequest, reply);
  } else if (requestType == TcrMessage::CLOSECQ_MSG_TYPE) {
    TcrMessageCloseCQ closeRequest(new DataOutput(m_cqService->getDM()
                                                      ->getConnectionManager()
                                                      .getCacheImpl()
                                                      ->createDataOutput()),
                                   m_cqName, std::chrono::milliseconds(-1),
                                   m_tccdm);
    err = m_tccdm->sendSyncRequest(closeRequest, reply);
  }

  if (err != GF_NOERR) {
    GfErrTypeToException("CqQuery::stop/close:", err);
  }

  const bool serverReportedError =
      reply.getMessageType() == TcrMessage::EXCEPTION ||
      reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
      reply.getMessageType() == TcrMessage::CQ_EXCEPTION_TYPE;
  if (serverReportedError) {
    err = ThinClientRegion::handleServerException("CqQuery::stop/close",
                                                  reply.getException());
    if (err == GF_CACHESERVER_EXCEPTION) {
      throw CqQueryException(
          std::string("CqQuery::stop/close: exception at the server side: ") +
          reply.getException());
    }
    GfErrTypeToException("CqQuery::stop/close", err);
  }
}

/**
 * Return the state of this query.
 * @return the current CqState value.
 */
CqState CqQueryImpl::getState() {
  // Unlike isRunning()/isStopped()/isClosed(), this reads the state without
  // taking m_mutex.
  return m_cqState;
}

/**
 * Sets the state of the cq under the CQ's mutex.
 * Within this file it is used by close() and stop().
 *
 * @param state the new state.
 * @throws CqClosedException if the CQ is already closed.
 */
void CqQueryImpl::setCqState(CqState state) {
  if (isClosed()) {
    const std::string closedMessage = "CQ is closed, CqName : " + m_cqName;
    throw CqClosedException(closedMessage.c_str());
  }

  ACE_Guard<ACE_Recursive_Thread_Mutex> stateGuard(m_mutex);
  m_cqState = state;
}

/**
 * @return a copy of the mutator that was built on this CQ's attributes.
 */
CqAttributesMutator CqQueryImpl::getCqAttributesMutator() const {
  const CqAttributesMutator& mutator = m_cqAttributesMutator;
  return mutator;
}

/**
 * @return the cqOperation currently stored on this CQ.
 */
CqOperation CqQueryImpl::getCqOperation() const {
  return m_cqOperation;
}

/**
 * Stores the given operation on this CQ.
 * @param cqOperation The cqOperation to set.
 */
void CqQueryImpl::setCqOperation(CqOperation cqOperation) {
  this->m_cqOperation = cqOperation;
}

/**
 * Update CQ stats for one event: the event counter is always incremented,
 * and create/update/destroy operations additionally bump the insert,
 * update or delete counter. Other operations change nothing else.
 * @param cqEvent the event to account for.
 */
void CqQueryImpl::updateStats(CqEvent& cqEvent) {
  const auto vsdStats = std::static_pointer_cast<CqQueryVsdStats>(m_stats);
  vsdStats->incNumEvents();

  const auto operation = cqEvent.getQueryOperation();
  if (operation == CqOperation::OP_TYPE_CREATE) {
    vsdStats->incNumInserts();
  } else if (operation == CqOperation::OP_TYPE_UPDATE) {
    vsdStats->incNumUpdates();
  } else if (operation == CqOperation::OP_TYPE_DESTROY) {
    vsdStats->incNumDeletes();
  }
}

/**
 * Return true if the CQ is in running state
 * @return true if running, false otherwise
 */
bool CqQueryImpl::isRunning() const {
  ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_mutex);
  return m_cqState == CqState::RUNNING;
}

/**
 * Return true if the CQ is in Sstopped state
 * @return true if stopped, false otherwise
 */
bool CqQueryImpl::isStopped() const {
  ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_mutex);
  return m_cqState == CqState::STOPPED ||
         (m_authenticatedView && m_authenticatedView->isClosed());
}

/**
 * Return true if the CQ is closed
 * @return true if closed, false otherwise
 */
bool CqQueryImpl::isClosed() const {
  ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_mutex);
  return m_cqState == CqState::CLOSED ||
         (m_authenticatedView && m_authenticatedView->isClosed());
}

/**
 * Return true if the CQ is durable, as fixed at construction time.
 * @return true if durable, false otherwise
 */
bool CqQueryImpl::isDurable() const {
  return m_isDurable;
}

}  // namespace client
}  // namespace geode
}  // namespace apache
