/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "TcrPoolEndPoint.hpp"

#include <geode/SystemProperties.hpp>

#include "ThinClientPoolDM.hpp"

namespace apache {
namespace geode {
namespace client {

TcrPoolEndPoint::TcrPoolEndPoint(const std::string& name, CacheImpl* cache,
                                 ACE_Semaphore& failoverSema,
                                 ACE_Semaphore& cleanupSema,
                                 ACE_Semaphore& redundancySema,
                                 ThinClientPoolDM* dm)
    : TcrEndpoint(name, cache, failoverSema, cleanupSema, redundancySema, dm),
      m_dm(dm) {}
bool TcrPoolEndPoint::checkDupAndAdd(std::shared_ptr<EventId> eventid) {
  return m_dm->checkDupAndAdd(eventid);
}

void TcrPoolEndPoint::processMarker() { m_dm->processMarker(); }
std::shared_ptr<QueryService> TcrPoolEndPoint::getQueryService() {
  return m_dm->getQueryServiceWithoutCheck();
}
void TcrPoolEndPoint::sendRequestForChunkedResponse(const TcrMessage& request,
                                                    TcrMessageReply& reply,
                                                    TcrConnection* conn) {
  conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
                                      request.getTimeout(), reply.getTimeout());
}
ThinClientPoolDM* TcrPoolEndPoint::getPoolHADM() { return m_dm; }
void TcrPoolEndPoint::triggerRedundancyThread() {
  m_dm->triggerRedundancyThread();
}
void TcrPoolEndPoint::closeFailedConnection(TcrConnection*& conn) {
  if (!m_dm->getThreadLocalConnections()) closeConnection(conn);
}

bool TcrPoolEndPoint::isMultiUserMode() { return m_dm->isMultiUserMode(); }

void TcrPoolEndPoint::closeNotification() {
  LOGFINE("TcrPoolEndPoint::closeNotification..");
  m_notifyReceiver->stopNoblock();
  m_notifyConnectionList.push_back(m_notifyConnection);
  m_notifyReceiverList.push_back(m_notifyReceiver.get());
  m_isQueueHosted = false;
}

GfErrType TcrPoolEndPoint::registerDM(bool, bool isSecondary, bool,
                                      ThinClientBaseDM*) {
  GfErrType err = GF_NOERR;
  ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_dm->getPoolLock());
  std::lock_guard<decltype(getQueueHostedMutex())> guardQueueHosted(
      getQueueHostedMutex());
  auto& sysProp = m_cacheImpl->getDistributedSystem().getSystemProperties();
  if (!connected()) {
    TcrConnection* newConn;
    if ((err = createNewConnection(newConn, false, false,
                                   sysProp.connectTimeout(), 0, connected())) !=
        GF_NOERR) {
      setConnected(false);
      return err;
    }
    m_dm->addConnection(newConn);
    // m_connected = true;
    setConnected(true);
  }

  LOGFINEST(
      "TcrEndpoint::registerPoolDM( ): registering DM and notification "
      "channel for endpoint %s",
      name().c_str());

  if (m_numRegionListener == 0) {
    if ((err = createNewConnection(m_notifyConnection, true, isSecondary,
                                   sysProp.connectTimeout() * 3, 0)) !=
        GF_NOERR) {
      setConnected(false);
      LOGWARN("Failed to start subscription channel for endpoint %s",
              name().c_str());
      return err;
    }
    m_notifyReceiver = std::unique_ptr<Task<TcrEndpoint>>(new Task<TcrEndpoint>(
        this, &TcrEndpoint::receiveNotification, NC_Notification));
    m_notifyReceiver->start();
  }
  ++m_numRegionListener;
  LOGFINEST("Incremented notification count for endpoint %s to %d",
            name().c_str(), m_numRegionListener);

  m_isQueueHosted = true;
  setConnected(true);
  return err;
}
void TcrPoolEndPoint::unregisterDM(bool, ThinClientBaseDM*,
                                   bool checkQueueHosted) {
  std::lock_guard<decltype(getQueueHostedMutex())> guardQueueHosted(
      getQueueHostedMutex());

  if (checkQueueHosted && !m_isQueueHosted) {
    LOGFINEST(
        "TcrEndpoint: unregistering pool DM, notification channel not present "
        "for %s",
        name().c_str());
    return;
  }

  LOGFINEST(
      "TcrEndpoint: unregistering pool DM and closing notification "
      "channel for endpoint %s",
      name().c_str());
  std::lock_guard<decltype(m_notifyReceiverLock)> guard2(m_notifyReceiverLock);
  if (m_numRegionListener > 0 && --m_numRegionListener == 0) {
    closeNotification();
  }
  LOGFINEST("Decremented notification count for endpoint %s to %d",
            name().c_str(), m_numRegionListener);
  LOGFINEST("TcrEndpoint: unregisterPoolDM done for endpoint %s",
            name().c_str());
}

bool TcrPoolEndPoint::handleIOException(const std::string& message,
                                        TcrConnection*& conn, bool isBgThread) {
  if (!isBgThread) {
    m_dm->setStickyNull(false);
  }
  return TcrEndpoint::handleIOException(message, conn);
}

void TcrPoolEndPoint::handleNotificationStats(int64_t byteLength) {
  m_dm->getStats().incReceivedBytes(byteLength);
  m_dm->getStats().incMessageBeingReceived();
}

}  // namespace client
}  // namespace geode
}  // namespace apache
