/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "fw_dunit.hpp"
#include <geode/CqAttributesFactory.hpp>
#include <geode/CqAttributes.hpp>
#include <geode/CqListener.hpp>
#include <geode/CqQuery.hpp>
#include <ace/OS.h>
#include <ace/High_Res_Timer.h>
#include <ace/Task.h>
#include <string>

#define ROOT_NAME "TestThinClientCqHAFailover"
#define ROOT_SCOPE DISTRIBUTED_ACK

#include "CacheHelper.hpp"

#include "QueryStrings.hpp"
#include "QueryHelper.hpp"

#include <geode/Query.hpp>
#include <geode/QueryService.hpp>

#include "ThinClientCQ.hpp"
#include <hacks/range.h>

#define CLIENT1 s1p1
#define CLIENT2 s1p2
#define SERVER1 s2p1
#define SERVER2 s2p2

using apache::geode::client::CqAttributesFactory;
using apache::geode::client::CqEvent;
using apache::geode::client::CqListener;
using apache::geode::client::Exception;
using apache::geode::client::IllegalStateException;
using apache::geode::client::QueryService;

const char *cqName = "MyCq";

const char *regionNamesCq[] = {"Portfolios", "Positions"};

class MyCqListener : public CqListener {
  bool m_failedOver;
  uint32_t m_cnt_before;
  uint32_t m_cnt_after;

 public:
  MyCqListener() : m_failedOver(false), m_cnt_before(0), m_cnt_after(0) {}

  void setFailedOver() { m_failedOver = true; }
  uint32_t getCountBefore() { return m_cnt_before; }
  uint32_t getCountAfter() { return m_cnt_after; }

  void onEvent(const CqEvent &) override {
    if (m_failedOver) {
      // LOG("after:MyCqListener::OnEvent called");
      m_cnt_after++;
    } else {
      // LOG("before:MyCqListener::OnEvent called");
      m_cnt_before++;
    }
  }
  void onError(const CqEvent &) override {
    if (m_failedOver) {
      // LOG("after: MyCqListener::OnError called");
      m_cnt_after++;
    } else {
      // LOG("before: MyCqListener::OnError called");
      m_cnt_before++;
    }
  }
  void close() override { LOG("MyCqListener::close called"); }
};

class KillServerThread : public ACE_Task_Base {
 public:
  bool m_running;
  MyCqListener *m_listener;
  explicit KillServerThread(MyCqListener *listener)
      : m_running(false), m_listener(listener) {}
  int svc(void) {
    while (m_running == true) {
      CacheHelper::closeServer(1);
      LOG("THREAD CLOSED SERVER 1");
      // m_listener->setFailedOver();
      m_running = false;
    }
    return 0;
  }
  void start() {
    m_running = true;
    activate();
  }
  void stop() {
    m_running = false;
    wait();
  }
};

void initClientCq() {
  if (cacheHelper == nullptr) {
    cacheHelper = new CacheHelper(true);
  }
  ASSERT(cacheHelper, "Failed to create a CacheHelper client instance.");

  try {
    auto serializationRegistry =
        CacheRegionHelper::getCacheImpl(cacheHelper->getCache().get())
            ->getSerializationRegistry();

    serializationRegistry->addDataSerializableType(
        Position::createDeserializable, 2);
    serializationRegistry->addDataSerializableType(
        Portfolio::createDeserializable, 3);
  } catch (const IllegalStateException &) {
    // ignore exception
  }
}

KillServerThread *kst = nullptr;

DUNIT_TASK_DEFINITION(SERVER1, CreateLocator)
  {
    if (isLocator) CacheHelper::initLocator(1);
    LOG("Locator1 started");
  }
END_TASK_DEFINITION

void createServer() {
  LOG("Starting SERVER1...");
  if (isLocalServer) {
    CacheHelper::initServer(1, "cqqueryfailover.xml", locatorsG);
  }
  LOG("SERVER1 started");
}

DUNIT_TASK_DEFINITION(SERVER1, CreateServer1_Locator)
  { createServer(); }
END_TASK_DEFINITION

void stepTwo() {
  LOG("Starting SERVER2...");
  // if ( isLocalServer ) CacheHelper::initServer( 2, "cqqueryfailover.xml");
  if (isLocalServer) {
    CacheHelper::initServer(2, "remotequery.xml", locatorsG);
  }
  LOG("SERVER2 started");

  LOG("StepTwo complete.");
}

DUNIT_TASK_DEFINITION(SERVER2, StepTwo_Locator)
  { stepTwo(); }
END_TASK_DEFINITION

void stepOne() {
  initClientCq();

  createRegionForCQ(regionNamesCq[0], USE_ACK, true, 1);

  auto regptr = getHelper()->getRegion(regionNamesCq[0]);
  auto subregPtr =
      regptr->createSubregion(regionNamesCq[1], regptr->getAttributes());

  QueryHelper *qh = &QueryHelper::getHelper();

  qh->populatePortfolioData(regptr, 100, 20, 10);
  qh->populatePositionData(subregPtr, 100, 20);

  LOG("StepOne complete.");
}

DUNIT_TASK_DEFINITION(CLIENT1, StepOne_PoolLocator)
  { stepOne(); }
END_TASK_DEFINITION

void stepOne2() {
  initClientCq();
  createRegionForCQ(regionNamesCq[0], USE_ACK, true, 1);
  auto regptr = getHelper()->getRegion(regionNamesCq[0]);
  auto subregPtr =
      regptr->createSubregion(regionNamesCq[1], regptr->getAttributes());

  LOG("StepOne2 complete.");
}

DUNIT_TASK_DEFINITION(CLIENT2, StepOne2_PoolLocator)
  { stepOne2(); }
END_TASK_DEFINITION

DUNIT_TASK_DEFINITION(CLIENT1, StepThree)
  {
    try {
      auto pool =
          getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]);
      std::shared_ptr<QueryService> qs;
      if (pool != nullptr) {
        qs = pool->getQueryService();
      } else {
        qs = getHelper()->cachePtr->getQueryService();
      }
      CqAttributesFactory cqFac;
      auto cqLstner = std::make_shared<MyCqListener>();
      cqFac.addCqListener(cqLstner);
      auto cqAttr = cqFac.create();

      auto qryStr = "select * from /Portfolios p where p.ID != 1";
      // char* qryStr = (char*)"select * from /Portfolios p where p.ID != 2";
      // char* qryStr = (char*)"select * from /Portfolios p where p.ID < 3";
      auto &&qry = qs->newCq(cqName, qryStr, cqAttr);
      auto &&results = qry->executeWithInitialResults();

      char buf[100];
      auto count = results->size();
      sprintf(buf, "results size=%zd", count);
      LOG(buf);
      for (auto &&ser : hacks::range(*results)) {
        count--;
        if (auto portfolio = std::dynamic_pointer_cast<Portfolio>(ser)) {
          printf("   query pulled portfolio object ID %d, pkid %s\n",
                 portfolio->getID(), portfolio->getPkid()->value().c_str());
        } else if (auto position = std::dynamic_pointer_cast<Position>(ser)) {
          printf("   query  pulled position object secId %s, shares %d\n",
                 position->getSecId()->value().c_str(),
                 position->getSharesOutstanding());
        } else if (ser) {
          printf(" query pulled object %s\n", ser->toString().c_str());
        } else {
          printf("   query pulled nullptr object\n");
        }
      }
      sprintf(buf, "results last count=%zd", count);
      LOG(buf);
      //  ASSERT( count==0, "results traversal count incorrect!" );
      SLEEP(15000);
    } catch (IllegalStateException &ise) {
      char isemsg[500] = {0};
      ACE_OS::snprintf(isemsg, 499, "IllegalStateException: %s", ise.what());
      LOG(isemsg);
      FAIL(isemsg);
    } catch (Exception &excp) {
      char excpmsg[500] = {0};
      ACE_OS::snprintf(excpmsg, 499, "Exception: %s", excp.what());
      LOG(excpmsg);
      FAIL(excpmsg);
    } catch (...) {
      LOG("Got an exception!");
      FAIL("Got an exception!");
    }

    LOG("StepThree complete.");
  }
END_TASK_DEFINITION

DUNIT_TASK_DEFINITION(CLIENT2, StepThree2)
  {
    auto regPtr0 = getHelper()->getRegion(regionNamesCq[0]);
    auto subregPtr0 = regPtr0->getSubregion(regionNamesCq[1]);

    QueryHelper *qh = &QueryHelper::getHelper();

    qh->populatePortfolioData(regPtr0, 150, 40, 10);
    qh->populatePositionData(subregPtr0, 150, 40);
    for (int i = 1; i < 150; i++) {
      auto port = std::make_shared<Portfolio>(i, 20);

      auto keyport = CacheableKey::create("port1-1");
      regPtr0->put(keyport, port);
      SLEEP(100);  // sleep a while to allow server query to complete
    }

    LOG("StepTwo2 complete.");
    SLEEP(15000);  // sleep 0.25 min to allow server query to complete
  }
END_TASK_DEFINITION

DUNIT_TASK_DEFINITION(CLIENT1, StepThree3)
  {
    // using region name as pool name
    auto pool =
        getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]);
    std::shared_ptr<QueryService> qs;
    if (pool != nullptr) {
      qs = pool->getQueryService();
    } else {
      qs = getHelper()->cachePtr->getQueryService();
    }
    auto qry = qs->getCq(cqName);
    ASSERT(qry != nullptr, "failed to get CqQuery");
    auto cqAttr = qry->getCqAttributes();
    ASSERT(cqAttr != nullptr, "failed to get CqAttributes");
    std::shared_ptr<CqListener> cqLstner = nullptr;
    try {
      auto vl = cqAttr->getCqListeners();
      cqLstner = vl[0];
    } catch (Exception &excp) {
      char excpmsg[500] = {0};
      ACE_OS::snprintf(excpmsg, 499, "Exception: %s", excp.what());
      LOG(excpmsg);
      ASSERT(false, "get listener failed");
    }
    ASSERT(cqLstner != nullptr, "listener is nullptr");
    MyCqListener *myListener = dynamic_cast<MyCqListener *>(cqLstner.get());
    ASSERT(myListener != nullptr, "my listener is nullptr<cast failed>");
    kst = new KillServerThread(myListener);
    char buf[1024];
    sprintf(buf, "before kill server 1, before=%d, after=%d",
            myListener->getCountBefore(), myListener->getCountAfter());
    LOG(buf);
    ASSERT(myListener->getCountAfter() == 0,
           "cq after failover should be zero");
    ASSERT(myListener->getCountBefore() == 6108,
           "check cq event count before failover");
    kst->start();
    SLEEP(1500);
    kst->stop();
    myListener->setFailedOver();
    /*
   auto regPtr0 = getHelper()->getRegion(regionNamesCq[0]);
   auto subregPtr0 = regPtr0->getSubregion(regionNamesCq[1]);
    for(int i=1; i < 150; i++)
    {
        auto port = std::make_shared<Portfolio>(i, 20);

       auto keyport = CacheableKey::create("port1-1");
        try {
          regPtr0->put(keyport, port);
        } catch (...)
        {
          LOG("Failover in progress sleep for 100 ms");
           SLEEP(100); // waiting for failover to complete
           continue;
        }
        LOG("Failover completed");
        myListener->setFailedOver();
        break;
    }
    */
  }
END_TASK_DEFINITION

DUNIT_TASK_DEFINITION(CLIENT2, StepThree4)
  {
    auto regPtr0 = getHelper()->getRegion(regionNamesCq[0]);
    auto subregPtr0 = regPtr0->getSubregion(regionNamesCq[1]);

    QueryHelper *qh = &QueryHelper::getHelper();

    qh->populatePortfolioData(regPtr0, 150, 40, 10);
    qh->populatePositionData(subregPtr0, 150, 40);
    for (int i = 1; i < 150; i++) {
      auto port = std::make_shared<Portfolio>(i, 20);

      auto keyport = CacheableKey::create("port1-1");
      regPtr0->put(keyport, port);
      SLEEP(100);  // sleep a while to allow server query to complete
    }

    LOG("StepTwo2 complete.");
    SLEEP(15000);  // sleep 0.25 min to allow server query to complete
  }
END_TASK_DEFINITION

DUNIT_TASK_DEFINITION(CLIENT1, CloseCache1)
  {
    // using region name as pool name
    auto pool =
        getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]);
    std::shared_ptr<QueryService> qs;
    if (pool != nullptr) {
      qs = pool->getQueryService();
    } else {
      qs = getHelper()->cachePtr->getQueryService();
    }
    auto qry = qs->getCq(cqName);
    ASSERT(qry != nullptr, "failed to get CqQuery");
    auto cqAttr = qry->getCqAttributes();
    ASSERT(cqAttr != nullptr, "failed to get CqAttributes");
    std::shared_ptr<CqListener> cqLstner = nullptr;
    try {
      auto vl = cqAttr->getCqListeners();
      cqLstner = vl[0];
    } catch (Exception &excp) {
      char excpmsg[500] = {0};
      ACE_OS::snprintf(excpmsg, 499, "Exception: %s", excp.what());
      LOG(excpmsg);
      ASSERT(false, "get listener failed");
    }
    ASSERT(cqLstner != nullptr, "listener is nullptr");
    MyCqListener *myListener = dynamic_cast<MyCqListener *>(cqLstner.get());
    ASSERT(myListener != nullptr, "my listener is nullptr<cast failed>");
    char buf[1024];
    sprintf(buf, "after failed over: before=%d, after=%d",
            myListener->getCountBefore(), myListener->getCountAfter());
    LOG(buf);
    ASSERT(myListener->getCountBefore() == 6108,
           "check cq event count before failover");
    ASSERT(myListener->getCountAfter() == 6109,
           "check cq event count after failover");
    // ASSERT(myListener->getCountAfter()>0, "no cq after failover");

    LOG("cleanProc 1...");
    cleanProc();
  }
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, CloseCache2)
  {
    LOG("cleanProc 2...");
    cleanProc();
  }
END_TASK_DEFINITION

DUNIT_TASK_DEFINITION(SERVER2, CloseServer2)
  {
    LOG("closing Server2...");
    if (isLocalServer) {
      CacheHelper::closeServer(2);
      LOG("SERVER2 stopped");
    }
  }
END_TASK_DEFINITION

DUNIT_TASK_DEFINITION(SERVER1, CloseLocator)
  {
    if (isLocator) {
      CacheHelper::closeLocator(1);
      LOG("Locator1 stopped");
    }
  }
END_TASK_DEFINITION

/*
DUNIT_TASK(SERVER1,CloseServer1)
{
  LOG("closing Server1...");
  if ( isLocalServer ) {
    CacheHelper::closeServer( 1 );
    LOG("SERVER1 stopped");
  }
}
END_TASK(CloseServer1)
*/

void doThinClientCqHAFailover() {
  CALL_TASK(CreateLocator);
  CALL_TASK(CreateServer1_Locator);

  CALL_TASK(StepTwo_Locator);
  CALL_TASK(StepOne_PoolLocator);
  CALL_TASK(StepOne2_PoolLocator);

  CALL_TASK(StepThree);
  CALL_TASK(StepThree2);
  CALL_TASK(StepThree3);
  CALL_TASK(StepThree4);
  CALL_TASK(CloseCache1);
  CALL_TASK(CloseCache2);
  CALL_TASK(CloseServer2);

  CALL_TASK(CloseLocator);
}

DUNIT_MAIN
  { doThinClientCqHAFailover(); }
END_MAIN
