/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "CqService.hpp"

#include <sstream>

#include <geode/CqServiceStatistics.hpp>
#include <geode/CqStatusListener.hpp>
#include <geode/ExceptionTypes.hpp>
#include <geode/SystemProperties.hpp>

#include "CqEventImpl.hpp"
#include "CqQueryImpl.hpp"
#include "DistributedSystem.hpp"
#include "ReadWriteLock.hpp"
#include "TcrConnectionManager.hpp"
#include "ThinClientPoolDM.hpp"
#include "util/exception.hpp"

namespace apache {
namespace geode {
namespace client {

/**
 * Constructs the CQ service and marks it as running.
 *
 * @param tccdm distribution manager stored in m_tccdm; the rest of this file
 *        uses it to reach the connection manager and to send requests.
 * @param statisticsFactory factory stored in m_statisticsFactory; it is
 *        handed to the service statistics object and to every CqQueryImpl
 *        created by newCq().
 */
CqService::CqService(ThinClientBaseDM* tccdm,
                     StatisticsFactory* statisticsFactory)
    : m_tccdm(tccdm),
      m_statisticsFactory(statisticsFactory),
      // Constructed with 1 (presumably the initial permit count, so that one
      // holder at a time can pass acquire() -- confirm against the semaphore
      // type declared in the header).
      m_notificationSema(1),
      // NOTE(review): this reads the member m_statisticsFactory, not the
      // constructor parameter, so it relies on m_statisticsFactory being
      // declared before m_stats in the class -- confirm in CqService.hpp.
      m_stats(std::make_shared<CqServiceVsdStats>(m_statisticsFactory)) {
  m_running = true;
  LOGDEBUG("CqService Started");
}
/**
 * Destructor. The body only writes a debug trace; it performs no explicit
 * cleanup of its own (closeCqService() is the explicit shutdown path).
 */
CqService::~CqService() noexcept {
  LOGDEBUG("CqService Destroyed");
}

void CqService::updateStats() {
  auto stats = std::dynamic_pointer_cast<CqServiceVsdStats>(m_stats);

  stats->setNumCqsActive(0);
  stats->setNumCqsStopped(0);

  auto&& lock = m_cqQueryMap.make_lock();

  stats->setNumCqsOnClient(static_cast<uint32_t>(m_cqQueryMap.size()));

  if (m_cqQueryMap.empty()) return;

  for (const auto& kv : m_cqQueryMap) {
    auto& cquery = kv.second;
    switch (cquery->getState()) {
      case CqState::RUNNING:
        stats->incNumCqsActive();
        break;
      case CqState::STOPPED:
        stats->incNumCqsStopped();
        break;
      default:
        break;
    }
  }
}

/**
 * Acquires the notification semaphore if the service is still running.
 *
 * m_running is checked before and again after the acquire, because
 * closeCqService() clears the flag before it takes the same semaphore.
 *
 * @return true when the semaphore was acquired and is still held on return
 *         (receiveNotification() in this file performs a matching release);
 *         false when the service is not running, in which case the semaphore
 *         is not held on return.
 */
bool CqService::checkAndAcquireLock() {
  if (!m_running) {
    return false;
  }

  m_notificationSema.acquire();
  if (m_running) {
    return true;
  }

  // The service was shut down while we were waiting: give the permit back.
  m_notificationSema.release();
  return false;
}
/**
 * Creates and initializes a new continuous query.
 *
 * Validation is performed in this order and the first failure throws:
 *  - empty query string, then null attributes -> IllegalArgumentException
 *  - the distribution manager is a pool with subscription disabled
 *    -> IllegalStateException
 *  - a durable CQ is requested but the durable client id is empty
 *    -> IllegalStateException
 *  - a non-empty name is already registered -> CqExistsException
 *
 * @param cqName name of the CQ; an empty name skips the duplicate check.
 * @param queryString query text, must not be empty.
 * @param cqAttributes attributes of the CQ, must not be null.
 * @param isDurable whether the CQ is created as durable.
 * @return the new query, after initCq() has been invoked on it.
 */
std::shared_ptr<CqQuery> CqService::newCq(
    const std::string& cqName, const std::string& queryString,
    const std::shared_ptr<CqAttributes>& cqAttributes, bool isDurable) {
  if (queryString.empty()) {
    throw IllegalArgumentException("Null queryString is passed. ");
  }
  if (cqAttributes == nullptr) {
    throw IllegalArgumentException("Null cqAttribute is passed. ");
  }

  // The subscription check only applies when the manager is a pool.
  const auto poolDM = dynamic_cast<ThinClientPoolDM*>(m_tccdm);
  const bool subscriptionDisabled =
      poolDM != nullptr && !poolDM->getSubscriptionEnabled();
  if (subscriptionDisabled) {
    LOGERROR(
        "Cannot create CQ because subscription is not enabled on the pool.");
    throw IllegalStateException(
        "Cannot create CQ because subscription is not enabled on the pool.");
  }

  // A durable CQ needs a durable client id in the system properties.
  if (isDurable) {
    const auto& durableClientId = m_tccdm->getConnectionManager()
                                      .getCacheImpl()
                                      ->getDistributedSystem()
                                      .getSystemProperties()
                                      .durableClientId();
    if (durableClientId.empty()) {
      LOGERROR("Cannot create durable CQ because client is not durable.");
      throw IllegalStateException(
          "Cannot create durable CQ because client is not durable.");
    }
  }

  // Reject a name that is already registered.
  const bool duplicateName = !cqName.empty() && isCqExists(cqName);
  if (duplicateName) {
    throw CqExistsException(
        ("CQ with the given name already exists. CqName : " + cqName).c_str());
  }

  // In multi-user mode the query carries the calling thread's attributes.
  std::shared_ptr<UserAttributes> userAttributes;
  if (m_tccdm != nullptr && m_tccdm->isMultiUserMode()) {
    userAttributes = UserAttributes::threadLocalUserAttributes;
  }

  auto query = std::make_shared<CqQueryImpl>(
      shared_from_this(), cqName, queryString, cqAttributes,
      m_statisticsFactory, isDurable, userAttributes);
  query->initCq();
  return std::move(query);
}

/**
 * Registers a CQ in the CQ map under the given name.
 *
 * @param cqName key under which the query is stored.
 * @param cq the query to store.
 * @throws CqExistsException if the map already holds an entry for cqName;
 *         the existing entry is left untouched.
 */
void CqService::addCq(const std::string& cqName, std::shared_ptr<CqQuery>& cq) {
  const bool inserted = m_cqQueryMap.emplace(cqName, cq).second;
  if (inserted) {
    return;
  }
  throw CqExistsException("CQ with given name already exists. ");
}

/**
 * Removes the CQ registered under the given name from the CQ map.
 *
 * @param cqName name of the CQ to erase. The result of the erase call is
 *        ignored, so callers are not told whether an entry was removed.
 */
void CqService::removeCq(const std::string& cqName) {
  m_cqQueryMap.erase(cqName);
}

/**
 * Looks up a CqQuery by name while holding the CQ map lock.
 *
 * @param cqName name of the CQ to look up.
 * @return the registered CqQuery, or nullptr (after logging a warning) when
 *         no CQ with that name is in the map.
 */
std::shared_ptr<CqQuery> CqService::getCq(const std::string& cqName) {
  auto&& mapGuard = m_cqQueryMap.make_lock();

  const auto position = m_cqQueryMap.find(cqName);
  if (position != m_cqQueryMap.end()) {
    return position->second;
  }

  LOGWARN("Failed to get the specified CQ: %s", cqName.c_str());
  return nullptr;
}

/**
 * Logs at fine level and then removes every entry from the CQ map.
 *
 * This function only clears the map; it does not call close() on the
 * queries. cleanup() calls closeAllCqs() before calling this.
 */
void CqService::clearCqQueryMap() {
  Log::fine("Cleaning clearCqQueryMap.");
  m_cqQueryMap.clear();
}

/**
 * Returns a snapshot of all registered CQs.
 *
 * The CQ map lock is held while the entries are copied, so the returned
 * container can be iterated afterwards without holding the lock.
 *
 * @return a container with one element per map entry; empty if no CQ is
 *         registered.
 */
CqService::query_container_type CqService::getAllCqs() {
  auto&& mapGuard = m_cqQueryMap.make_lock();

  query_container_type snapshot;
  snapshot.reserve(m_cqQueryMap.size());
  for (const auto& entry : m_cqQueryMap) {
    snapshot.push_back(entry.second);
  }
  return snapshot;
}

/**
 * Executes all the cqs on this client.
 */
void CqService::executeAllClientCqs(bool afterFailover) {
  query_container_type cqVec = getAllCqs();
  executeCqs(cqVec, afterFailover);
}

/**
 * Executes every CQ registered on this client against one endpoint.
 *
 * @param endpoint endpoint forwarded to executeCqs().
 * @return the error code reported by executeCqs() for the snapshot.
 */
GfErrType CqService::executeAllClientCqs(TcrEndpoint* endpoint) {
  auto registeredCqs = getAllCqs();
  const GfErrType outcome = executeCqs(registeredCqs, endpoint);
  return outcome;
}

/**
 * Executes the given CQs on the specified endpoint.
 *
 * Only CQs that are not closed and are currently running are executed. All
 * eligible CQs are attempted even if an earlier one fails.
 *
 * @param cqs the CQs to consider.
 * @param endpoint endpoint passed to CqQueryImpl::execute().
 * @return GF_NOERR if the list is empty or every execution succeeded,
 *         otherwise the first non-GF_NOERR code that was returned.
 */
GfErrType CqService::executeCqs(query_container_type& cqs,
                                TcrEndpoint* endpoint) {
  GfErrType firstFailure = GF_NOERR;

  for (auto& query : cqs) {
    if (query->isClosed() || !query->isRunning()) {
      continue;
    }

    const GfErrType result =
        std::static_pointer_cast<CqQueryImpl>(query)->execute(endpoint);
    // Keep the earliest failure; later results never overwrite it.
    if (firstFailure == GF_NOERR) {
      firstFailure = result;
    }
  }

  return firstFailure;
}

/**
 * Executes the given CQs.
 *
 * A CQ is executed when it is not closed and either it is stopped, or it is
 * running and afterFailover is true. With afterFailover set the CQ is
 * executed through CqQueryImpl::executeAfterFailover(), otherwise through
 * CqQuery::execute().
 *
 * QueryException and CqClosedException raised by a CQ are logged at fine
 * level and do not stop the remaining CQs from being processed.
 *
 * @param cqs the CQs to consider.
 * @param afterFailover whether this call happens after a failover.
 */
void CqService::executeCqs(query_container_type& cqs, bool afterFailover) {
  if (cqs.empty()) {
    return;
  }
  std::string cqName;
  for (auto& cq : cqs) {
    if (!cq->isClosed() &&
        (cq->isStopped() || (cq->isRunning() && afterFailover))) {
      try {
        cqName = cq->getName();
        if (afterFailover) {
          std::static_pointer_cast<CqQueryImpl>(cq)->executeAfterFailover();
        } else {
          cq->execute();
        }
      } catch (const QueryException& qe) {
        // The CQ name and exception text are passed as arguments, never as
        // the format string, so a '%' in either one is printed verbatim.
        LOGFINE("Failed to execute the CQ, CqName : %s Error : %s",
                cqName.c_str(), qe.what());
      } catch (const CqClosedException& cce) {
        LOGFINE("Failed to execute the CQ, CqName : %s Error : %s",
                cqName.c_str(), cce.what());
      }
    }
  }
}

/**
 * Stops all the cqs
 */
void CqService::stopAllClientCqs() {
  query_container_type cqVec = getAllCqs();
  // MapOfRegionGuard guard( m_cqQueryMap->mutex() );
  stopCqs(cqVec);
}

/**
 * Stops the given CQs.
 *
 * Only CQs that are not closed and are currently running are stopped.
 * QueryException and CqClosedException raised while stopping a CQ are logged
 * at fine level and the loop continues with the next CQ.
 *
 * @param cqs the CQs to consider.
 */
void CqService::stopCqs(query_container_type& cqs) {
  std::string currentName;

  // Shared by both handlers; reads currentName at the time of the call.
  const auto logFailure = [&currentName](const std::string& reason) {
    Log::fine(("Failed to stop the CQ, CqName : " + currentName +
               " Error : " + reason)
                  .c_str());
  };

  for (auto& query : cqs) {
    if (query->isClosed() || !query->isRunning()) {
      continue;
    }
    try {
      currentName = query->getName();
      query->stop();
    } catch (QueryException& qe) {
      logFailure(qe.what());
    } catch (CqClosedException& cce) {
      logFailure(cce.what());
    }
  }
}

/**
 * Closes the given CQs.
 *
 * Each CQ is closed with close(true), except when the CQ is durable and
 * TcrMessage::isKeepAlive() is set, in which case close(false) is used.
 * (The flag is presumably "send the close request to the server" -- confirm
 * against CqQueryImpl::close.)
 *
 * QueryException and CqClosedException raised while closing a CQ are logged
 * at fine level and the loop continues with the next CQ.
 *
 * @param cqs the CQs to close.
 */
void CqService::closeCqs(query_container_type& cqs) {
  LOGDEBUG("closeCqs() TcrMessage::isKeepAlive() = %d ",
           TcrMessage::isKeepAlive());
  if (cqs.empty()) {
    return;
  }

  std::string currentName;

  // Shared by both handlers; reads currentName at the time of the call.
  const auto logFailure = [&currentName](const std::string& reason) {
    Log::fine(("Failed to close the CQ, CqName : " + currentName +
               " Error : " + reason)
                  .c_str());
  };

  for (auto& query : cqs) {
    try {
      auto impl = std::static_pointer_cast<CqQueryImpl>(query);
      currentName = impl->getName();
      LOGDEBUG("closeCqs() cqname = %s isDurable = %d ", currentName.c_str(),
               impl->isDurable());
      const bool keepOnServer = impl->isDurable() && TcrMessage::isKeepAlive();
      impl->close(!keepOnServer);
    } catch (QueryException& qe) {
      logFailure(qe.what());
    } catch (CqClosedException& cce) {
      logFailure(cce.what());
    }
  }
}

/**
 * Returns the statistics object of this CQ service.
 *
 * @return m_stats, the shared statistics object created in the constructor
 *         and refreshed by updateStats(). The call itself does not
 *         recompute any counter.
 */
std::shared_ptr<CqServiceStatistics> CqService::getCqServiceStatistics() {
  return m_stats;
}

/**
 * Shuts the CQ service down.
 *
 * Does nothing if the service is not running. Otherwise the running flag is
 * cleared first, so that checkAndAcquireLock() starts returning false, and
 * cleanup() is then executed while the notification semaphore is held.
 */
void CqService::closeCqService() {
  if (!m_running) {
    return;
  }

  m_running = false;
  m_notificationSema.acquire();
  cleanup();
  m_notificationSema.release();
}
void CqService::closeAllCqs() {
  Log::fine("closeAllCqs()");
  query_container_type cqVec = getAllCqs();
  Log::fine("closeAllCqs() 1");
  auto&& lock = m_cqQueryMap.make_lock();
  Log::fine("closeAllCqs() 2");
  closeCqs(cqVec);
}

/**
 * Cleans up the CqService: closes all registered CQs and then empties the
 * CQ map.
 *
 * In this file it is called from closeCqService() while the notification
 * semaphore is held.
 */
void CqService::cleanup() {
  Log::fine("Cleaning up CqService.");

  // Close all the CQs first, while they are still reachable through the map.
  // Need to take care when clients are still connected...
  closeAllCqs();

  // Then drop the map entries.
  clearCqQueryMap();
}

/**
 * Checks whether a CQ with the given name is registered.
 *
 * The lookup is done while holding the CQ map lock.
 *
 * @param cqName name of the CQ.
 * @return true if the CQ map contains an entry for cqName, false otherwise.
 */
bool CqService::isCqExists(const std::string& cqName) {
  auto&& mapGuard = m_cqQueryMap.make_lock();

  const bool registered = m_cqQueryMap.find(cqName) != m_cqQueryMap.end();
  return registered;
}
/**
 * Dispatches a CQ notification message to the CQ listeners, then deletes the
 * message and releases the notification semaphore.
 *
 * The message is deleted and the semaphore is released on every exit path,
 * including when invokeCqListeners() throws. Otherwise an escaping exception
 * would leak the message and leave the semaphore acquired, and
 * closeCqService() would then block forever on its acquire().
 *
 * NOTE(review): the semaphore is presumably acquired by the caller through
 * checkAndAcquireLock() before this function is invoked -- confirm at the
 * call site.
 *
 * @param msg the notification message; ownership is taken and the message is
 *        deleted before returning or rethrowing.
 */
void CqService::receiveNotification(TcrMessage* msg) {
  try {
    invokeCqListeners(msg->getCqs(), msg->getMessageTypeForCq(), msg->getKey(),
                      msg->getValue(), msg->getDeltaBytes(),
                      msg->getEventId());
  } catch (...) {
    // Same cleanup as the normal path, then let the caller see the error.
    _GEODE_SAFE_DELETE(msg);
    m_notificationSema.release();
    throw;
  }
  _GEODE_SAFE_DELETE(msg);
  m_notificationSema.release();
}

/**
 * Invokes the CqListeners for the given CQs.
 * @param cqs list of cqs with the cq operation from the Server.
 * @param messageType base operation
 * @param key
 * @param value
 */
void CqService::invokeCqListeners(const std::map<std::string, int>* cqs,
                                  uint32_t messageType,
                                  std::shared_ptr<CacheableKey> key,
                                  std::shared_ptr<Cacheable> value,
                                  std::shared_ptr<CacheableBytes> deltaValue,
                                  std::shared_ptr<EventId> eventId) {
  LOGDEBUG("CqService::invokeCqListeners");
  for (const auto& kv : *cqs) {
    const auto cqName = kv.first;
    auto cQuery = getCq(cqName);
    auto cQueryImpl = std::dynamic_pointer_cast<CqQueryImpl>(cQuery);
    if (!(cQueryImpl && cQueryImpl->isRunning())) {
      LOGFINE("Unable to invoke CqListener, %s, CqName: %s",
              cQueryImpl ? "CQ not found" : "CQ is Not running",
              cqName.c_str());
      continue;
    }

    const auto cqOp = kv.second;

    // If Region destroy event, close the cq.
    if (cqOp == TcrMessage::DESTROY_REGION) {
      // The close will also invoke the listeners close().
      try {
        cQueryImpl->close(false);
      } catch (Exception& ex) {
        // handle?
        LOGFINE("Exception while invoking CQ listeners: %s", ex.what());
      }
      continue;
    }

    // Construct CqEvent.
    auto cqEvent =
        new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp),
                        key, value, m_tccdm, deltaValue, eventId);

    // Update statistics
    cQueryImpl->updateStats(*cqEvent);

    // invoke CQ Listeners.
    for (auto l : cQueryImpl->getCqAttributes()->getCqListeners()) {
      try {
        // Check if the listener is not null, it could have been changed/reset
        // by the CqAttributeMutator.
        if (l) {
          if (cqEvent->getError() == true) {
            l->onError(*cqEvent);
          } else {
            l->onEvent(*cqEvent);
          }
        }
        // Handle client side exceptions.
      } catch (Exception& ex) {
        LOGWARN(("Exception in the CqListener of the CQ named " + cqName +
                 ", error: " + ex.what())
                    .c_str());
      }
    }
    delete cqEvent;
  }
}

/**
 * Notifies the CqStatusListeners of all running CQs that belong to the given
 * pool about a connection state change.
 *
 * CQs that are not CqQueryImpl instances or are not running are skipped with
 * a fine-level log entry. When the CQ's distribution manager is a pool whose
 * name differs from poolName the CQ is skipped silently; a CQ whose manager
 * is not a pool is not filtered by name.
 *
 * Geode exceptions thrown by a listener are logged as warnings and do not
 * prevent the remaining listeners from being called.
 *
 * @param poolName name of the pool whose connection state changed.
 * @param connected true to call onCqConnected(), false to call
 *        onCqDisconnected().
 */
void CqService::invokeCqConnectedListeners(const std::string& poolName,
                                           bool connected) {
  query_container_type vec = getAllCqs();
  for (auto& cQuery : vec) {
    const auto& cqName = cQuery->getName();
    auto cQueryImpl = std::dynamic_pointer_cast<CqQueryImpl>(cQuery);
    if (cQueryImpl == nullptr || !cQueryImpl->isRunning()) {
      LOGFINE("Unable to invoke CqStatusListener, %s, CqName: %s",
              (cQueryImpl == nullptr) ? "CQ not found" : "CQ is Not running",
              cqName.c_str());
      continue;
    }

    // Check cq pool to determine if the pool matches, if not continue.
    auto* poolDM = dynamic_cast<ThinClientPoolDM*>(cQueryImpl->getDM());
    if (poolDM != nullptr && poolDM->getName() != poolName) {
      continue;
    }

    // invoke CQ Listeners.
    for (auto l : cQueryImpl->getCqAttributes()->getCqListeners()) {
      try {
        // Only listeners that implement CqStatusListener are notified; the
        // cast yields null for every other listener and for null entries.
        if (auto statusLstr = std::dynamic_pointer_cast<CqStatusListener>(l)) {
          if (connected) {
            statusLstr->onCqConnected();
          } else {
            statusLstr->onCqDisconnected();
          }
        }
        // Handle client side exceptions.
      } catch (Exception& ex) {
        // The CQ name and exception text are passed as arguments, never as
        // the format string, so a '%' in either one is printed verbatim.
        LOGWARN(
            "Exception in the CqStatusListener of the CQ named %s, error: %s",
            cqName.c_str(), ex.what());
      }
    }
  }
}

/**
 * Maps a TcrMessage event type constant to the matching CqOperation.
 *
 * @param eventType one of the TcrMessage constants LOCAL_CREATE,
 *        LOCAL_UPDATE, LOCAL_DESTROY, LOCAL_INVALIDATE or CLEAR_REGION.
 * @return the corresponding CqOperation, or CqOperation::OP_TYPE_INVALID for
 *         any other value.
 */
CqOperation CqService::getOperation(int eventType) {
  switch (eventType) {
    case TcrMessage::LOCAL_CREATE:
      return CqOperation::OP_TYPE_CREATE;
    case TcrMessage::LOCAL_UPDATE:
      return CqOperation::OP_TYPE_UPDATE;
    case TcrMessage::LOCAL_DESTROY:
      return CqOperation::OP_TYPE_DESTROY;
    case TcrMessage::LOCAL_INVALIDATE:
      return CqOperation::OP_TYPE_INVALIDATE;
    case TcrMessage::CLEAR_REGION:
      return CqOperation::OP_TYPE_REGION_CLEAR;
    default:
      // Everything else is reported as invalid. That includes
      // TcrMessage::INVALIDATE_REGION, which is not mapped here.
      return CqOperation::OP_TYPE_INVALID;
  }
}

/**
 * Gets all the durable CQs registered by this client from the server.
 *
 * Sends a TcrMessageGetDurableCqs request synchronously through the
 * distribution manager and collects the chunked reply.
 *
 * @return the list collected by the chunked response handler (the names of
 *         the registered durable CQs; empty if there are none).
 * @throws CqQueryException if the server reports an exception that
 *         handleServerException() maps to GF_CACHESERVER_EXCEPTION.
 * @throws whatever throwExceptionIfError() raises for any other error code
 *         returned by the request or by handleServerException().
 */
std::shared_ptr<CacheableArrayList> CqService::getAllDurableCqsFromServer() {
  TcrMessageGetDurableCqs msg(
      new DataOutput(
          m_tccdm->getConnectionManager().getCacheImpl()->createDataOutput()),
      m_tccdm);
  TcrMessageReply reply(true, m_tccdm);

  // Initialize the chunked response handler for the durable CQ list. It is a
  // scoped object so that it is released on every exit path, including the
  // throwing ones below. It is declared after `reply`, so it is destroyed
  // before `reply` is.
  ChunkedDurableCQListResponse resultCollector(reply);
  reply.setChunkedResultHandler(
      static_cast<TcrChunkedResult*>(&resultCollector));
  reply.setTimeout(DEFAULT_QUERY_RESPONSE_TIMEOUT);

  GfErrType err = m_tccdm->sendSyncRequest(msg, reply);
  if (err != GF_NOERR) {
    LOGDEBUG("CqService::getAllDurableCqsFromServer!!!!");
    throwExceptionIfError("CqService::getAllDurableCqsFromServer:", err);
  }
  if (reply.getMessageType() == TcrMessage::EXCEPTION ||
      reply.getMessageType() == TcrMessage::GET_DURABLE_CQS_DATA_ERROR) {
    err = ThinClientRegion::handleServerException(
        "CqService::getAllDurableCqsFromServer", reply.getException());
    if (err == GF_CACHESERVER_EXCEPTION) {
      std::stringstream message;
      message << "CqService::getAllDurableCqsFromServer: exception "
              << "at the server side: " << reply.getException();
      throw CqQueryException(message.str());
    } else {
      throwExceptionIfError("CqService::getAllDurableCqsFromServer", err);
    }
  }

  return resultCollector.getResults();
}

}  // namespace client
}  // namespace geode
}  // namespace apache
