// blob: 9d34fad671c381d11f5960b697df6187d6a40a8f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fw_dunit.hpp"
#include <geode/CqAttributesFactory.hpp>
#include <geode/CqAttributes.hpp>
#include <geode/CqListener.hpp>
#include <geode/CqStatusListener.hpp>
#include <geode/CqQuery.hpp>
#include <geode/CqServiceStatistics.hpp>
#include <ace/OS.h>
#include <ace/High_Res_Timer.h>
#include <string>
#define ROOT_NAME "TestThinClientCq"
#define ROOT_SCOPE DISTRIBUTED_ACK
#include "CacheHelper.hpp"
#include "QueryStrings.hpp"
#include "QueryHelper.hpp"
#include <geode/Query.hpp>
#include <geode/QueryService.hpp>
#include "ThinClientCQ.hpp"
#define CLIENT1 s1p1
#define SERVER1 s2p1
#define CLIENT2 s1p2
#define LOCATORSERVER s2p2
#define MAX_LISTNER 8
using apache::geode::client::Cacheable;
using apache::geode::client::CqAttributesFactory;
using apache::geode::client::CqEvent;
using apache::geode::client::CqListener;
using apache::geode::client::CqOperation;
using apache::geode::client::CqStatusListener;
using apache::geode::client::Exception;
using apache::geode::client::IllegalStateException;
using apache::geode::client::QueryService;
// CacheHelper* cacheHelper = nullptr;
// When true, the populate steps (StepTwo / StepTwo2) store the PDX flavour of
// Portfolio/Position objects instead of the DataSerializable flavour.
static bool m_isPdx = false;
// Locator endpoint string; passed to servers and pools that are started with
// a locator.
const char *locHostPort =
CacheHelper::getLocatorHostPort(isLocator, isLocalServer, 1);
// Names of the CQs registered by StepThree, one per entry of queryStrings.
const char *cqNames[MAX_LISTNER] = {"MyCq_0", "MyCq_1", "MyCq_2", "MyCq_3",
"MyCq_4", "MyCq_5", "MyCq_6", "MyCq_7"};
// Regions, CQ names and query strings used by the CQ status listener
// scenarios (createCQ, createCQ_Pool, ProcessCQ and the checkCQStatus*
// helpers).
const char *regionName = "DistRegionAck";
const char *regionName1 = "DistRegionAck1";
const char *cqName = "testCQAllServersLeave";
const char *cqName1 = "testCQAllServersLeave1";
const char *cqQueryStatusString = "select * from /DistRegionAck";
const char *cqQueryStatusString1 = "select * from /DistRegionAck1";
// Query text for each CQ in cqNames (same index).
const char *queryStrings[MAX_LISTNER] = {
"select * from /Portfolios p where p.ID < 4",
"select * from /Portfolios p where p.ID < 2",
"select * from /Portfolios p where p.ID != 2",
"select * from /Portfolios p where p.ID != 3",
"select * from /Portfolios p where p.ID != 4",
"select * from /Portfolios p where p.ID != 5",
"select * from /Portfolios p where p.ID != 6",
"select * from /Portfolios p where p.ID != 7"};
/**
 * Lazily creates the shared CacheHelper and registers the Position/Portfolio
 * serializable types (both DataSerializable and PDX flavours) with the
 * cache's serialization registry.
 *
 * @param isthinClient forwarded to the CacheHelper constructor.
 */
void initClientCq(const bool isthinClient) {
  if (!cacheHelper) {
    cacheHelper = new CacheHelper(isthinClient);
  }
  ASSERT(cacheHelper, "Failed to create a CacheHelper client instance.");

  try {
    auto *cacheImpl =
        CacheRegionHelper::getCacheImpl(cacheHelper->getCache().get());
    auto registry = cacheImpl->getSerializationRegistry();

    // DataSerializable types, registered under ids 2 and 3.
    registry->addDataSerializableType(Position::createDeserializable, 2);
    registry->addDataSerializableType(Portfolio::createDeserializable, 3);

    // PDX counterparts.
    registry->addPdxSerializableType(PositionPdx::createDeserializable);
    registry->addPdxSerializableType(PortfolioPdx::createDeserializable);
  } catch (const IllegalStateException &) {
    // Deliberately ignored.
  }
}
// Region names used by the portfolio CQ scenario. Index 0 is the root region
// created in stepOne/stepOne2; index 1 is created there as its subregion.
const char *regionNamesCq[] = {"Portfolios", "Positions", "Portfolios2",
"Portfolios3"};
/**
 * Minimal listener used by the bug #1026 check in StepThree: every callback
 * only writes an informational log line.
 */
class MyCqListener1026 : public CqListener {
 public:
  void onEvent(const CqEvent &) override {
    LOGINFO("MyCqListener::onEvent called");
  }

  void onError(const CqEvent &) override {
    LOGINFO("MyCqListener::OnError called");
  }

  void close() override {
    LOGINFO("MyCqListener::close called");
  }
};
class MyCqListener : public CqListener {
uint8_t m_id;
uint32_t m_numInserts;
uint32_t m_numUpdates;
uint32_t m_numDeletes;
uint32_t m_numEvents;
public:
uint8_t getId() { return m_id; }
uint32_t getNumInserts() { return m_numInserts; }
uint32_t getNumUpdates() { return m_numUpdates; }
uint32_t getNumDeletes() { return m_numDeletes; }
uint32_t getNumEvents() { return m_numEvents; }
explicit MyCqListener(uint8_t id)
: m_id(id),
m_numInserts(0),
m_numUpdates(0),
m_numDeletes(0),
m_numEvents(0) {}
inline void updateCount(const CqEvent &cqEvent) {
m_numEvents++;
switch (cqEvent.getQueryOperation()) {
case CqOperation::OP_TYPE_CREATE:
m_numInserts++;
break;
case CqOperation::OP_TYPE_UPDATE:
m_numUpdates++;
break;
case CqOperation::OP_TYPE_DESTROY:
m_numDeletes++;
break;
default:
break;
}
}
void onEvent(const CqEvent &cqe) {
// LOG("MyCqListener::OnEvent called");
updateCount(cqe);
}
void onError(const CqEvent &cqe) {
updateCount(cqe);
// LOG("MyCqListener::OnError called");
}
void close() {
// LOG("MyCqListener::close called");
}
};
class MyCqStatusListener : public CqStatusListener {
uint8_t m_id;
uint32_t m_numInserts;
uint32_t m_numUpdates;
uint32_t m_numDeletes;
uint32_t m_numEvents;
uint32_t m_cqsConnectedCount;
uint32_t m_cqsDisconnectedCount;
public:
uint8_t getId() { return m_id; }
uint32_t getNumInserts() { return m_numInserts; }
uint32_t getNumUpdates() { return m_numUpdates; }
uint32_t getNumDeletes() { return m_numDeletes; }
uint32_t getNumEvents() { return m_numEvents; }
uint32_t getCqsConnectedCount() { return m_cqsConnectedCount; }
uint32_t getCqsDisConnectedCount() { return m_cqsDisconnectedCount; }
explicit MyCqStatusListener(uint8_t id)
: m_id(id),
m_numInserts(0),
m_numUpdates(0),
m_numDeletes(0),
m_numEvents(0),
m_cqsConnectedCount(0),
m_cqsDisconnectedCount(0) {}
inline void updateCount(const CqEvent &cqEvent) {
m_numEvents++;
switch (cqEvent.getQueryOperation()) {
case CqOperation::OP_TYPE_CREATE: {
m_numInserts++;
LOG("MyCqStatusListener::OnEvent OP_TYPE_CREATE");
break;
}
case CqOperation::OP_TYPE_UPDATE: {
LOG("MyCqStatusListener::OnEvent OP_TYPE_UPDATE");
m_numUpdates++;
break;
}
case CqOperation::OP_TYPE_DESTROY: {
LOG("MyCqStatusListener::OnEvent OP_TYPE_DESTROY");
m_numDeletes++;
break;
}
default:
break;
}
}
void onEvent(const CqEvent &cqe) {
LOGINFO("MyCqStatusListener::OnEvent %d called", m_id);
updateCount(cqe);
}
void onError(const CqEvent &cqe) {
updateCount(cqe);
LOGINFO("MyCqStatusListener::OnError %d called", m_id);
}
void close() { LOGINFO("MyCqStatusListener::close %d called", m_id); }
void onCqDisconnected() {
LOGINFO("MyCqStatusListener %d got onCqDisconnected", m_id);
m_cqsDisconnectedCount++;
}
void onCqConnected() {
LOGINFO("MyCqStatusListener %d got onCqConnected", m_id);
m_cqsConnectedCount++;
}
void clear() {
m_numInserts = 0;
m_numUpdates = 0;
m_numDeletes = 0;
m_numEvents = 0;
m_cqsDisconnectedCount = 0;
m_cqsConnectedCount = 0;
}
};
DUNIT_TASK_DEFINITION(LOCATORSERVER, CreateLocator)
  {
    // Locator #1 is only launched when the run is configured to use one; the
    // log line is written either way.
    if (isLocator) {
      CacheHelper::initLocator(1);
    }
    LOG("Locator1 started");
  }
END_TASK_DEFINITION
/**
 * Starts cache server #1 from "remotequery.xml" when local servers are
 * enabled.
 *
 * @param locator when true the server is given the locator endpoint,
 *                otherwise it is started without one.
 */
void createServer(bool locator = false) {
  LOG("Starting SERVER1...");
  if (isLocalServer) {
    const char *locatorEndpoint = locator ? locHostPort : nullptr;
    CacheHelper::initServer(1, "remotequery.xml", locatorEndpoint);
  }
  LOG("SERVER1 started");
}
/**
 * Starts cache server #2 from "remotequery2.xml" when local servers are
 * enabled.
 *
 * @param locator when true the server is given the locator endpoint,
 *                otherwise it is started without one.
 */
void createServer2(bool locator = false) {
  LOG("Starting SERVER2...");
  if (isLocalServer) {
    const char *locatorEndpoint = locator ? locHostPort : nullptr;
    CacheHelper::initServer(2, "remotequery2.xml", locatorEndpoint);
  }
  LOG("SERVER2 started");
}
// Server 1, started without a locator.
DUNIT_TASK_DEFINITION(SERVER1, CreateServer1)
  {
    const bool useLocator = false;
    createServer(useLocator);
  }
END_TASK_DEFINITION

// Server 2, started without a locator.
DUNIT_TASK_DEFINITION(LOCATORSERVER, CreateServer2)
  {
    const bool useLocator = false;
    createServer2(useLocator);
  }
END_TASK_DEFINITION

// Server 2, started with the locator endpoint.
DUNIT_TASK_DEFINITION(LOCATORSERVER, CreateServer2_Locator)
  {
    const bool useLocator = true;
    createServer2(useLocator);
  }
END_TASK_DEFINITION

// Server 1, started with the locator endpoint.
DUNIT_TASK_DEFINITION(SERVER1, CreateServer1_Locator)
  {
    const bool useLocator = true;
    createServer(useLocator);
  }
END_TASK_DEFINITION
/**
 * Starts cache server #1 from the given cache XML when local servers are
 * enabled.
 *
 * @param locator when true the server is given the locator endpoint.
 * @param XML     cache XML file name handed to CacheHelper::initServer.
 */
void createServer_group(bool locator, const char *XML) {
  LOG("Starting SERVER1...");
  if (isLocalServer) {
    const char *locatorEndpoint = locator ? locHostPort : nullptr;
    CacheHelper::initServer(1, XML, locatorEndpoint);
  }
  LOG("SERVER1 started");
}
/**
 * Starts cache server #2 from the given cache XML when local servers are
 * enabled.
 *
 * @param locator when true the server is given the locator endpoint.
 * @param XML     cache XML file name handed to CacheHelper::initServer.
 */
void createServer_group2(bool locator, const char *XML) {
  LOG("Starting SERVER2...");
  if (isLocalServer) {
    const char *locatorEndpoint = locator ? locHostPort : nullptr;
    CacheHelper::initServer(2, XML, locatorEndpoint);
  }
  LOG("SERVER2 started");
}
// Server 1 with the first server-group configuration, using the locator.
DUNIT_TASK_DEFINITION(SERVER1, CreateServer_servergrop)
  {
    const bool useLocator = true;
    createServer_group(useLocator, "cacheserver_servergroup.xml");
  }
END_TASK_DEFINITION

// Server 2 with the second server-group configuration, using the locator.
DUNIT_TASK_DEFINITION(LOCATORSERVER, CreateServer_servergrop2)
  {
    const bool useLocator = true;
    createServer_group2(useLocator, "cacheserver_servergroup2.xml");
  }
END_TASK_DEFINITION
/**
 * Client setup for the portfolio CQ scenario: initialises the CQ client,
 * creates the root region regionNamesCq[0] and a subregion
 * regionNamesCq[1] that reuses the root region's attributes.
 */
void stepOne() {
  initClientCq(true);

  const char *rootName = regionNamesCq[0];
  createRegionForCQ(rootName, USE_ACK, true);

  auto rootRegion = getHelper()->getRegion(rootName);
  auto rootAttributes = rootRegion->getAttributes();
  auto childRegion =
      rootRegion->createSubregion(regionNamesCq[1], rootAttributes);

  LOG("StepOne complete.");
}
/**
 * Client setup for the CQ status scenarios: lazily creates the shared
 * CacheHelper, then creates the two regions (regionName and regionName1)
 * the status CQs run against.
 */
void initCqStatusClient() {
  if (!cacheHelper) {
    cacheHelper = new CacheHelper(true);
  }
  ASSERT(cacheHelper, "Failed to create a CacheHelper client instance.");

  createRegionForCQ(regionName, USE_ACK, true);
  createRegionForCQ(regionName1, USE_ACK, true);

  LOG("initCqStatusClient complete.");
}
// Task wrapper on CLIENT1 around initCqStatusClient().
DUNIT_TASK_DEFINITION(CLIENT1, initCqStatusClientLoc)
  {
    initCqStatusClient();
  }
END_TASK_DEFINITION
// Creates two locator-based pools, one per server group ("group1" and
// "group2"), and attaches one region to each pool.
DUNIT_TASK_DEFINITION(CLIENT1, StepOne_Pooled_Locator)
  {
    initClient(true);

    const char *firstPool = "__TEST_POOL1__";
    const char *secondPool = "__TEST_POOL2__";
    auto helper = getHelper();

    // Pool 1 -> server group "group1", backing regionName.
    helper->createPoolWithLocators(firstPool, locHostPort, true, -1,
                                   std::chrono::seconds::zero(), -1, false,
                                   "group1");
    helper->createRegionAndAttachPool(regionName, USE_ACK, firstPool, true);

    // Pool 2 -> server group "group2", backing regionName1.
    helper->createPoolWithLocators(secondPool, locHostPort, true, -1,
                                   std::chrono::seconds::zero(), -1, false,
                                   "group2");
    helper->createRegionAndAttachPool(regionName1, USE_ACK, secondPool, true);

    LOG("StepOne_Pooled_Locator complete.");
  }
END_TASK_DEFINITION
// Task wrapper on CLIENT1 around stepOne().
DUNIT_TASK_DEFINITION(CLIENT1, StepOne_PoolLocator)
  {
    stepOne();
  }
END_TASK_DEFINITION
/**
 * Second-client setup for the portfolio CQ scenario: initialises the CQ
 * client, creates the root region regionNamesCq[0] and a subregion
 * regionNamesCq[1] that reuses the root region's attributes.
 */
void stepOne2() {
  initClientCq(true);

  const char *rootName = regionNamesCq[0];
  createRegionForCQ(rootName, USE_ACK, true);

  auto rootRegion = getHelper()->getRegion(rootName);
  auto rootAttributes = rootRegion->getAttributes();
  auto childRegion =
      rootRegion->createSubregion(regionNamesCq[1], rootAttributes);

  LOG("StepOne2 complete.");
}
// Task wrapper on CLIENT2 around stepOne2().
DUNIT_TASK_DEFINITION(CLIENT2, StepOne2_PoolLocator)
  {
    stepOne2();
  }
END_TASK_DEFINITION
// Seeds the portfolio region and its position subregion from CLIENT1, using
// the PDX or DataSerializable object flavour depending on m_isPdx.
DUNIT_TASK_DEFINITION(CLIENT1, StepTwo)
  {
    auto portfolios = getHelper()->getRegion(regionNamesCq[0]);
    auto positions = portfolios->getSubregion(regionNamesCq[1]);
    auto &queryHelper = QueryHelper::getHelper();

    if (m_isPdx) {
      queryHelper.populatePortfolioPdxData(portfolios, 2, 1, 1);
      queryHelper.populatePositionPdxData(positions, 2, 1);
    } else {
      queryHelper.populatePortfolioData(portfolios, 2, 1, 1);
      queryHelper.populatePositionData(positions, 2, 1);
    }

    LOG("StepTwo complete.");
  }
END_TASK_DEFINITION
// Registers MAX_LISTNER CQs on CLIENT1, executes them all, then runs the
// regression check for bug #1026 (CQ query containing non-ASCII characters).
DUNIT_TASK_DEFINITION(CLIENT1, StepThree)
  {
    QueryHelper::getHelper();

    auto pool =
        getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]);
    std::shared_ptr<QueryService> qs;
    if (pool != nullptr) {
      // Using region name as pool name as in ThinClientCq.hpp
      qs = pool->getQueryService();
    } else {
      qs = getHelper()->cachePtr->getQueryService();
    }

    // The same factory is reused for every CQ and a listener is added on each
    // iteration, so CQ i is created with listeners 0..i. StepFour relies on
    // this when it checks the listener count of each CQ.
    CqAttributesFactory cqFac;
    for (uint8_t i = 0; i < MAX_LISTNER; i++) {
      auto cqLstner = std::make_shared<MyCqListener>(i);
      cqFac.addCqListener(cqLstner);
      auto cqAttr = cqFac.create();
      auto qry = qs->newCq(cqNames[i], queryStrings[i], cqAttr);
    }

    try {
      LOG("EXECUTE 1 START");
      qs->executeCqs();
      LOG("EXECUTE 1 STOP");
    } catch (const Exception &excp) {
      // Logged only; a failure to execute is not treated as fatal here.
      std::string logmsg = "";
      logmsg += excp.getName();
      logmsg += ": ";
      logmsg += excp.what();
      LOG(logmsg.c_str());
      LOG(excp.getStackTrace());
    }

    /* Test for #1026 */
    {
      try {
        LOG("Testing bug #1026");
        auto regionPtr = getHelper()->getRegion(regionNamesCq[0]);
        regionPtr->put("1026_Key1", "asdas");
        regionPtr->put("1026_Key2",
                       "Schüler");  // string with extended character set
        regionPtr->put("1026_Key3",
                       "Gebäude");  // string with extended character set
        regionPtr->put("1026_Key4",
                       "Königin");  // string with extended character set

        const char *qryStr = "select * from /Portfolios p where p='Schüler'";

        std::shared_ptr<CqListener> cqLstner =
            std::make_shared<MyCqListener1026>();
        cqFac.addCqListener(cqLstner);
        auto cqAttr = cqFac.create();
        auto qry = qs->newCq("1026_MyCq", qryStr, cqAttr);

        // execute Cq Query with initial Results
        auto resultsPtr = qry->executeWithInitialResults();

        // size() is not an int; cast and use %zu so the printf-style logger
        // reads the argument with the right width.
        LOGINFO("ResultSet Query returned %zu rows",
                static_cast<size_t>(resultsPtr->size()));
        LOG("Testing bug #1026 Complete");
      } catch (const Exception &geodeExcp) {
        LOGERROR("CqQuery Geode Exception: %s", geodeExcp.what());
        FAIL(geodeExcp.what());
      }
    }

    LOG("StepThree complete.");
  }
END_TASK_DEFINITION
// From CLIENT2: seeds a larger data set, then overwrites key "port1-1" twice
// with fresh portfolio objects, and finally waits for the server-side CQ
// processing to settle.
DUNIT_TASK_DEFINITION(CLIENT2, StepTwo2)
  {
    auto portfolios = getHelper()->getRegion(regionNamesCq[0]);
    auto positions = portfolios->getSubregion(regionNamesCq[1]);
    auto &queryHelper = QueryHelper::getHelper();

    if (m_isPdx) {
      queryHelper.populatePortfolioPdxData(portfolios, 3, 2, 1);
      queryHelper.populatePositionPdxData(positions, 3, 2);
    } else {
      queryHelper.populatePortfolioData(portfolios, 3, 2, 1);
      queryHelper.populatePositionData(positions, 3, 2);
    }

    std::shared_ptr<Cacheable> port = nullptr;
    for (int id = 1; id < 3; ++id) {
      if (m_isPdx) {
        port.reset(new PortfolioPdx(id, 2));
      } else {
        port.reset(new Portfolio(id, 2));
      }
      auto keyport = CacheableKey::create("port1-1");
      portfolios->put(keyport, port);
      SLEEP(10);  // sleep a while to allow server query to complete
    }

    LOG("StepTwo2 complete. Sleeping .25 min for server query to complete...");
    SLEEP(15000);  // sleep .25 min to allow server query to complete
  }
END_TASK_DEFINITION
// Verifies the CQs registered in StepThree:
//  1. per-CQ statistics are accumulated and compared with the counters kept
//     by each MyCqListener (listener j is attached to CQs j..MAX_LISTNER-1);
//  2. stop / execute / close state transitions of individual CQs;
//  3. CQ service statistics before and after stopCqs() and closeCqs();
//  4. a CQ name can be re-registered after everything has been closed.
DUNIT_TASK_DEFINITION(CLIENT1, StepFour)
  {
    QueryHelper::getHelper();

    auto pool =
        getHelper()->getCache()->getPoolManager().find(regionNamesCq[0]);
    std::shared_ptr<QueryService> qs;
    if (pool != nullptr) {
      // Using region name as pool name as in ThinClientCq.hpp
      qs = pool->getQueryService();
    } else {
      qs = getHelper()->cachePtr->getQueryService();
    }

    char buf[1024];

    // Logs name, message and stack trace of a Geode exception, then fails the
    // task. The stack trace is logged BEFORE FAIL so it cannot be skipped if
    // FAIL leaves the handler.
    auto failWith = [&](const Exception &excp) {
      std::string failmsg = "";
      failmsg += excp.getName();
      failmsg += ": ";
      failmsg += excp.what();
      LOG(failmsg.c_str());
      LOG(excp.getStackTrace());
      FAIL(failmsg.c_str());
    };

    // Sums of the per-CQ statistics that each listener should have observed.
    uint32_t inserts[MAX_LISTNER] = {0};
    uint32_t updates[MAX_LISTNER] = {0};
    uint32_t deletes[MAX_LISTNER] = {0};
    uint32_t events[MAX_LISTNER] = {0};

    CqAttributesFactory cqFac;
    for (uint8_t i = 0; i < MAX_LISTNER; i++) {
      ACE_OS::snprintf(buf, sizeof(buf), "get info for cq[%s]:", cqNames[i]);
      LOG(buf);
      auto cqy = qs->getCq(cqNames[i]);
      auto cqStats = cqy->getStatistics();
      ACE_OS::snprintf(
          buf, sizeof(buf),
          "Cq[%s]From CqStatistics: numInserts[%d], numDeletes[%d], "
          "numUpdates[%d], numEvents[%d]",
          cqNames[i], cqStats->numInserts(), cqStats->numDeletes(),
          cqStats->numUpdates(), cqStats->numEvents());
      LOG(buf);

      // CQ i carries listeners 0..i, so its statistics count towards each of
      // those listeners' expected totals.
      for (int j = 0; j <= i; j++) {
        inserts[j] += cqStats->numInserts();
        updates[j] += cqStats->numUpdates();
        deletes[j] += cqStats->numDeletes();
        events[j] += cqStats->numEvents();
      }

      auto cqAttr = cqy->getCqAttributes();
      auto vl = cqAttr->getCqListeners();
      ACE_OS::snprintf(buf, sizeof(buf),
                       "number of listeners for cq[%s] is %zu", cqNames[i],
                       vl.size());
      LOG(buf);
      ASSERT(vl.size() == static_cast<size_t>(i + 1),
             "incorrect number of listeners");

      if (i == (MAX_LISTNER - 1)) {
        // The last CQ holds every listener: index them by id and compare
        // their own counters with the accumulated statistics.
        MyCqListener *myLl[MAX_LISTNER] = {nullptr};
        for (int k = 0; k < MAX_LISTNER; k++) {
          MyCqListener *ml = dynamic_cast<MyCqListener *>(vl[k].get());
          ASSERT(ml != nullptr, "listener is not a MyCqListener");
          ASSERT(ml->getId() < MAX_LISTNER, "listener id out of range");
          myLl[ml->getId()] = ml;
        }
        for (int j = 0; j < MAX_LISTNER; j++) {
          MyCqListener *ml = myLl[j];
          ASSERT(ml != nullptr, "no listener found for id");
          ACE_OS::snprintf(
              buf, sizeof(buf),
              "MyCount for Listener[%d]: numInserts[%d], numDeletes[%d], "
              "numUpdates[%d], numEvents[%d]",
              j, ml->getNumInserts(), ml->getNumDeletes(),
              ml->getNumUpdates(), ml->getNumEvents());
          LOG(buf);
          ACE_OS::snprintf(buf, sizeof(buf),
                           "sum of stats for Listener[%d]: numInserts[%d], "
                           "numDeletes[%d], numUpdates[%d], numEvents[%d]",
                           j, inserts[j], deletes[j], updates[j], events[j]);
          LOG(buf);
          ASSERT(ml->getNumInserts() == inserts[j],
                 "accumulative insert count incorrect");
          ASSERT(ml->getNumUpdates() == updates[j],
                 "accumulative updates count incorrect");
          ASSERT(ml->getNumDeletes() == deletes[j],
                 "accumulative deletes count incorrect");
          ASSERT(ml->getNumEvents() == events[j],
                 "accumulative events count incorrect");
        }

        // Removing one listener through the mutator must be reflected in the
        // CQ attributes.
        LOG("removing listener");
        auto cqAttrMtor = cqy->getCqAttributesMutator();
        auto ptr = vl[0];
        cqAttrMtor.removeCqListener(ptr);
        vl = cqAttr->getCqListeners();
        ACE_OS::snprintf(buf, sizeof(buf),
                         "number of listeners for cq[%s] is %zu", cqNames[i],
                         vl.size());
        LOG(buf);
        ASSERT(vl.size() == static_cast<size_t>(i),
               "incorrect number of listeners");
      }
    }

    // State transitions on individual CQs.
    try {
      auto cqy = qs->getCq(cqNames[1]);
      cqy->stop();

      cqy = qs->getCq(cqNames[6]);
      ACE_OS::snprintf(buf, sizeof(buf), "cq[%s] should have been running!",
                       cqNames[6]);
      ASSERT(cqy->isRunning() == true, buf);

      // Executing an already running CQ must raise IllegalStateException.
      bool got_exception = false;
      try {
        cqy->execute();
      } catch (const IllegalStateException &excp) {
        std::string failmsg = "";
        failmsg += excp.getName();
        failmsg += ": ";
        failmsg += excp.what();
        LOG(failmsg.c_str());
        got_exception = true;
      }
      ACE_OS::snprintf(buf, sizeof(buf), "cq[%s] should gotten exception!",
                       cqNames[6]);
      ASSERT(got_exception == true, buf);

      cqy->stop();
      ACE_OS::snprintf(buf, sizeof(buf), "cq[%s] should have been stopped!",
                       cqNames[6]);
      ASSERT(cqy->isStopped() == true, buf);

      cqy = qs->getCq(cqNames[2]);
      cqy->close();
      ACE_OS::snprintf(buf, sizeof(buf), "cq[%s] should have been closed!",
                       cqNames[2]);
      ASSERT(cqy->isClosed() == true, buf);

      // A closed CQ is no longer known to the query service.
      cqy = qs->getCq(cqNames[2]);
      ACE_OS::snprintf(buf, sizeof(buf),
                       "cq[%s] should have been removed after close!",
                       cqNames[2]);
      ASSERT(cqy == nullptr, buf);
    } catch (const Exception &excp) {
      failWith(excp);
    }

    auto serviceStats = qs->getCqServiceStatistics();
    ASSERT(serviceStats != nullptr, "serviceStats is nullptr");

    // Writes the current CQ service counters to the test log.
    auto logServiceStats = [&]() {
      ACE_OS::snprintf(
          buf, sizeof(buf),
          "numCqsActive=%d, numCqsCreated=%d, "
          "numCqsClosed=%d,numCqsStopped=%d, numCqsOnClient=%d",
          serviceStats->numCqsActive(), serviceStats->numCqsCreated(),
          serviceStats->numCqsClosed(), serviceStats->numCqsStopped(),
          serviceStats->numCqsOnClient());
      LOG(buf);
    };

    logServiceStats();
    /*
    for(i=0; i < MAX_LISTNER; i++)
    {
     auto cqy = qs->getCq(cqNames[i]);
     CqState state = cqy->getState();
     CqState cqState;
     cqState.setState(state);
     sprintf(buf, "cq[%s] is in state[%s]", cqNames[i], cqState.toString());
     LOG(buf);
    }
    */
    ASSERT(serviceStats->numCqsActive() == 6, "active count incorrect!");
    ASSERT(serviceStats->numCqsCreated() == 9, "created count incorrect!");
    ASSERT(serviceStats->numCqsClosed() == 1, "closed count incorrect!");
    ASSERT(serviceStats->numCqsStopped() == 2, "stopped count incorrect!");
    ASSERT(serviceStats->numCqsOnClient() == 8, "cq count incorrect!");

    // Stop everything: no CQ stays active, all remaining ones are stopped.
    try {
      qs->stopCqs();
    } catch (const Exception &excp) {
      failWith(excp);
    }
    logServiceStats();
    ASSERT(serviceStats->numCqsActive() == 0, "active count incorrect!");
    ASSERT(serviceStats->numCqsCreated() == 9, "created count incorrect!");
    ASSERT(serviceStats->numCqsClosed() == 1, "closed count incorrect!");
    ASSERT(serviceStats->numCqsStopped() == 8, "stopped count incorrect!");
    ASSERT(serviceStats->numCqsOnClient() == 8, "cq count incorrect!");

    // Close everything: every created CQ is now closed and gone.
    try {
      qs->closeCqs();
    } catch (const Exception &excp) {
      failWith(excp);
    }
    logServiceStats();
    ASSERT(serviceStats->numCqsActive() == 0, "active count incorrect!");
    ASSERT(serviceStats->numCqsCreated() == 9, "created count incorrect!");
    ASSERT(serviceStats->numCqsClosed() == 9, "closed count incorrect!");
    ASSERT(serviceStats->numCqsStopped() == 0, "stopped count incorrect!");
    ASSERT(serviceStats->numCqsOnClient() == 0, "cq count incorrect!");

    // The first CQ name can be registered and run again after the close.
    const uint8_t firstCq = 0;
    auto cqLstner = std::make_shared<MyCqListener>(firstCq);
    cqFac.addCqListener(cqLstner);
    auto cqAttr = cqFac.create();
    try {
      auto qry = qs->newCq(cqNames[firstCq], queryStrings[firstCq], cqAttr);
      qry->execute();
      qry->stop();
      qry->close();
    } catch (const Exception &excp) {
      failWith(excp);
    }

    LOG("StepFour complete.");
  }
END_TASK_DEFINITION
// Tears down the client cache in the CLIENT1 process.
DUNIT_TASK_DEFINITION(CLIENT1, CloseCache1)
  {
    LOG("cleanProc 1...");
    cleanProc();
  }
END_TASK_DEFINITION

// Tears down the client cache in the CLIENT2 process.
DUNIT_TASK_DEFINITION(CLIENT2, CloseCache2)
  {
    LOG("cleanProc 2...");
    cleanProc();
  }
END_TASK_DEFINITION
// Stops cache server #1; a no-op apart from the first log line when local
// servers are disabled.
DUNIT_TASK_DEFINITION(SERVER1, CloseServer1)
  {
    LOG("closing Server1...");
    const bool serverIsLocal = isLocalServer;
    if (serverIsLocal) {
      CacheHelper::closeServer(1);
      LOG("SERVER1 stopped");
    }
  }
END_TASK_DEFINITION
// Stops cache server #2; a no-op apart from the first log line when local
// servers are disabled.
DUNIT_TASK_DEFINITION(LOCATORSERVER, CloseServer2)
  {
    LOG("closing Server2...");
    const bool serverIsLocal = isLocalServer;
    if (serverIsLocal) {
      CacheHelper::closeServer(2);
      LOG("SERVER2 stopped");
    }
  }
END_TASK_DEFINITION
// Stops locator #1 when the run was configured with a locator.
DUNIT_TASK_DEFINITION(LOCATORSERVER, CloseLocator)
  {
    const bool locatorInUse = isLocator;
    if (locatorInUse) {
      CacheHelper::closeLocator(1);
      LOG("Locator1 stopped");
    }
  }
END_TASK_DEFINITION
// The four tasks below switch the m_isPdx flag on or off. Set*/Unset* exist
// once per client (C1 = CLIENT1, C2 = CLIENT2).
DUNIT_TASK_DEFINITION(CLIENT1, SetPortfolioTypeToPdxC1)
  {
    m_isPdx = true;
  }
END_TASK_DEFINITION

DUNIT_TASK_DEFINITION(CLIENT1, UnsetPortfolioTypeToPdxC1)
  {
    m_isPdx = false;
  }
END_TASK_DEFINITION

//
DUNIT_TASK_DEFINITION(CLIENT2, SetPortfolioTypeToPdxC2)
  {
    m_isPdx = true;
  }
END_TASK_DEFINITION

DUNIT_TASK_DEFINITION(CLIENT2, UnsetPortfolioTypeToPdxC2)
  {
    m_isPdx = false;
  }
END_TASK_DEFINITION
//
// Runs the portfolio CQ scenario once, in this order: start locator and
// server, set up both clients, seed data and register the CQs on client 1,
// generate more events from client 2, verify on client 1, then tear
// everything down.
void doThinClientCq() {
// Infrastructure: locator plus server 1 registered with it.
CALL_TASK(CreateLocator);
CALL_TASK(CreateServer1_Locator);
// Region setup on both clients.
CALL_TASK(StepOne_PoolLocator);
CALL_TASK(StepOne2_PoolLocator);
// Client 1 seeds data and registers/executes the CQs.
CALL_TASK(StepTwo);
CALL_TASK(StepThree);
// Client 2 produces further events; client 1 then verifies the counts.
CALL_TASK(StepTwo2);
CALL_TASK(StepFour);
// Teardown.
CALL_TASK(CloseCache1);
CALL_TASK(CloseCache2);
CALL_TASK(CloseServer1);
CALL_TASK(CloseLocator);
}
// Registers and executes the status CQ `cqName` with a single
// MyCqStatusListener, then checks that the listener was told exactly once
// that the CQ is connected.
DUNIT_TASK_DEFINITION(CLIENT1, createCQ)
  {
    SLEEP(10000);

    // Create CqAttributes and Install Listener
    auto pool = getHelper()->getCache()->getPoolManager().find(regionName);
    // Fail with a clear message instead of dereferencing a null pool.
    ASSERT(pool != nullptr, "pool for the CQ status region was not found");
    auto qs = pool->getQueryService();

    CqAttributesFactory cqFac;
    auto cqLstner = std::make_shared<MyCqStatusListener>(100);
    cqFac.addCqListener(cqLstner);
    auto cqAttr = cqFac.create();

    auto cq = qs->newCq(cqName, cqQueryStatusString, cqAttr);
    cq->execute();

    SLEEP(20000);

    // Fetch the listener back from the CQ rather than using cqLstner
    // directly, so the check goes through the CQ's own attributes.
    cqAttr = cq->getCqAttributes();
    auto vl = cqAttr->getCqListeners();
    ASSERT(!vl.empty(), "CQ has no listeners");
    MyCqStatusListener *myStatusCq =
        dynamic_cast<MyCqStatusListener *>(vl[0].get());
    ASSERT(myStatusCq != nullptr, "listener is not a MyCqStatusListener");

    LOGINFO("checkCQStatusOnConnect = %d ", myStatusCq->getCqsConnectedCount());
    ASSERT(myStatusCq->getCqsConnectedCount() == 1,
           "incorrect number of CqStatus Connected count.");

    LOG("createCQ complete.");
  }
END_TASK_DEFINITION
// Registers one status CQ per pool (__TEST_POOL1__ / __TEST_POOL2__), checks
// each listener saw exactly one "connected" callback, then puts five entries
// into both regions and checks each listener counted five inserts.
DUNIT_TASK_DEFINITION(CLIENT1, createCQ_Pool)
  {
    // Creates and executes a CQ on the named pool with a fresh
    // MyCqStatusListener, waits, verifies the connected count and returns the
    // listener as fetched back from the CQ attributes.
    auto startStatusCq = [&](const char *poolName, const char *name,
                             const char *query, uint8_t listenerId)
        -> std::shared_ptr<MyCqStatusListener> {
      auto pool = getHelper()->getCache()->getPoolManager().find(poolName);
      // Fail with a clear message instead of dereferencing a null pool.
      ASSERT(pool != nullptr, "pool for the status CQ was not found");
      auto qs = pool->getQueryService();

      CqAttributesFactory cqFac;
      auto cqLstner = std::make_shared<MyCqStatusListener>(listenerId);
      cqFac.addCqListener(cqLstner);
      auto cqAttr = cqFac.create();

      auto cq = qs->newCq(name, query, cqAttr);
      cq->execute();

      SLEEP(20000);

      cqAttr = cq->getCqAttributes();
      auto vl = cqAttr->getCqListeners();
      ASSERT(!vl.empty(), "CQ has no listeners");
      auto statusListener =
          std::dynamic_pointer_cast<MyCqStatusListener>(vl[0]);
      ASSERT(statusListener != nullptr,
             "listener is not a MyCqStatusListener");

      LOGINFO("checkCQStatusOnConnect = %d ",
              statusListener->getCqsConnectedCount());
      ASSERT(statusListener->getCqsConnectedCount() == 1,
             "incorrect number of CqStatus Connected count.");
      return statusListener;
    };

    auto myStatusCq =
        startStatusCq("__TEST_POOL1__", cqName, cqQueryStatusString, 100);
    auto myStatusCq2 =
        startStatusCq("__TEST_POOL2__", cqName1, cqQueryStatusString1, 101);

    auto regPtr0 = getHelper()->getRegion(regionName);
    auto regPtr1 = getHelper()->getRegion(regionName1);

    char KeyStr[256] = {0};
    char valStr[256] = {0};
    for (int i = 1; i <= 5; i++) {
      ACE_OS::snprintf(KeyStr, sizeof(KeyStr), "Key-%d ", i);
      ACE_OS::snprintf(valStr, sizeof(valStr), "val-%d ", i);
      auto keyport = CacheableKey::create(KeyStr);
      auto valport = CacheableString::create(valStr);
      regPtr0->put(keyport, valport);
      regPtr1->put(keyport, valport);
      SLEEP(10 * 1000);  // sleep a while to allow server query to complete
    }
    LOGINFO("putEntries complete");

    LOGINFO("checkCQStatusOnPutEvent = %d ", myStatusCq->getNumInserts());
    ASSERT(myStatusCq->getNumInserts() == 5,
           "incorrect number of CqStatus Updates count.");

    LOGINFO("checkCQStatusOnPutEvent = %d ", myStatusCq2->getNumInserts());
    ASSERT(myStatusCq2->getNumInserts() == 5,
           "incorrect number of CqStatus Updates count.");

    LOG("createCQ_Pool complete.");
  }
END_TASK_DEFINITION
void executeCq(const char *poolName, const char *name) {
auto pool = getHelper()->getCache()->getPoolManager().find(poolName);
std::shared_ptr<QueryService> qs;
if (pool != nullptr) {
qs = pool->getQueryService();
}
auto cq = qs->getCq(const_cast<char *>(name));
cq->execute();
SLEEP(20000);
LOG("executeCq complete");
}
// Executes the status CQ `cqName` through the pool named after regionName.
DUNIT_TASK_DEFINITION(CLIENT1, executeCQ)
  {
    const char *poolName = regionName;
    executeCq(poolName, cqName);
    LOG("executeCQ complete.");
  }
END_TASK_DEFINITION
void checkCQStatusOnConnect(const char *poolName, const char *name,
uint32_t connect) {
auto pool = getHelper()->getCache()->getPoolManager().find(poolName);
std::shared_ptr<QueryService> qs;
if (pool != nullptr) {
qs = pool->getQueryService();
}
auto cq = qs->getCq(const_cast<char *>(name));
auto cqAttr = cq->getCqAttributes();
auto vl = cqAttr->getCqListeners();
MyCqStatusListener *myStatusCq =
dynamic_cast<MyCqStatusListener *>(vl[0].get());
LOGINFO("checkCQStatusOnConnect = %d ", myStatusCq->getCqsConnectedCount());
ASSERT(myStatusCq->getCqsConnectedCount() == connect,
"incorrect number of CqStatus Connected count.");
}
// Expects one "connected" callback on the region-named pool's status CQ.
DUNIT_TASK_DEFINITION(CLIENT1, checkCQStatusOnConnect)
  {
    SLEEP(20000);
    const uint32_t expectedConnects = 1;
    checkCQStatusOnConnect(regionName, cqName, expectedConnects);
    LOG("checkCQStatusOnConnect complete.");
  }
END_TASK_DEFINITION

// Expects two "connected" callbacks on the region-named pool's status CQ.
DUNIT_TASK_DEFINITION(CLIENT1, checkCQStatusOnConnect2)
  {
    SLEEP(20000);
    const uint32_t expectedConnects = 2;
    checkCQStatusOnConnect(regionName, cqName, expectedConnects);
    LOG("checkCQStatusOnConnect2 complete.");
  }
END_TASK_DEFINITION

// Expects one "connected" callback on each of the two explicit test pools.
DUNIT_TASK_DEFINITION(CLIENT1, checkCQStatusOnConnect_Pool)
  {
    SLEEP(20000);
    const uint32_t expectedConnects = 1;
    checkCQStatusOnConnect("__TEST_POOL1__", cqName, expectedConnects);
    checkCQStatusOnConnect("__TEST_POOL2__", cqName1, expectedConnects);
    LOG("checkCQStatusOnConnect_Pool complete.");
  }
END_TASK_DEFINITION
void checkCQStatusOnDisConnect(const char *poolName, const char *cqName,
uint32_t disconnect) {
auto pool = getHelper()->getCache()->getPoolManager().find(poolName);
std::shared_ptr<QueryService> qs;
if (pool != nullptr) {
qs = pool->getQueryService();
}
auto cq = qs->getCq(const_cast<char *>(cqName));
auto cqAttr = cq->getCqAttributes();
auto vl = cqAttr->getCqListeners();
auto myStatusCq = std::dynamic_pointer_cast<MyCqStatusListener>(vl[0]);
LOGINFO("checkCQStatusOnDisConnect = %d ",
myStatusCq->getCqsDisConnectedCount());
ASSERT(myStatusCq->getCqsDisConnectedCount() == disconnect,
"incorrect number of CqStatus Disconnected count.");
}
// Expects no "disconnected" callback yet on the region-named pool's CQ.
DUNIT_TASK_DEFINITION(CLIENT1, checkCQStatusOnDisConnect0)
  {
    SLEEP(20000);
    const uint32_t expectedDisconnects = 0;
    checkCQStatusOnDisConnect(regionName, cqName, expectedDisconnects);
    LOG("checkCQStatusOnDisConnect0 complete.");
  }
END_TASK_DEFINITION

// Expects one "disconnected" callback on each of the two explicit test pools.
DUNIT_TASK_DEFINITION(CLIENT1, checkCQStatusOnDisConnect_Pool)
  {
    SLEEP(20000);
    const uint32_t expectedDisconnects = 1;
    checkCQStatusOnDisConnect("__TEST_POOL1__", cqName, expectedDisconnects);
    checkCQStatusOnDisConnect("__TEST_POOL2__", cqName1, expectedDisconnects);
    LOG("checkCQStatusOnDisConnect_Pool complete.");
  }
END_TASK_DEFINITION

// Expects one "disconnected" callback on the region-named pool's CQ.
DUNIT_TASK_DEFINITION(CLIENT1, checkCQStatusOnDisConnect1)
  {
    SLEEP(20000);
    const uint32_t expectedDisconnects = 1;
    checkCQStatusOnDisConnect(regionName, cqName, expectedDisconnects);
    LOG("checkCQStatusOnDisConnect1 complete.");
  }
END_TASK_DEFINITION

// Expects two "disconnected" callbacks on the region-named pool's CQ.
DUNIT_TASK_DEFINITION(CLIENT1, checkCQStatusOnDisConnect2)
  {
    SLEEP(20000);
    const uint32_t expectedDisconnects = 2;
    checkCQStatusOnDisConnect(regionName, cqName, expectedDisconnects);
    LOG("checkCQStatusOnDisConnect2 complete.");
  }
END_TASK_DEFINITION
// Puts five key/value string pairs ("Key-N " -> "val-N ", N = 1..5) into both
// status regions, pausing 10 seconds after each pair.
DUNIT_TASK_DEFINITION(CLIENT1, putEntries)
  {
    auto regPtr0 = getHelper()->getRegion(regionName);
    auto regPtr1 = getHelper()->getRegion(regionName1);

    char KeyStr[256] = {0};
    char valStr[256] = {0};
    for (int i = 1; i <= 5; i++) {
      // Bounds come from the buffers themselves so they cannot drift apart.
      ACE_OS::snprintf(KeyStr, sizeof(KeyStr), "Key-%d ", i);
      ACE_OS::snprintf(valStr, sizeof(valStr), "val-%d ", i);
      auto keyport = CacheableKey::create(KeyStr);
      auto valport = CacheableString::create(valStr);
      regPtr0->put(keyport, valport);
      regPtr1->put(keyport, valport);
      SLEEP(10 * 1000);  // sleep a while to allow server query to complete
    }
    LOGINFO("putEntries complete");
  }
END_TASK_DEFINITION
void checkCQStatusOnPutEvent(const char *poolName, const char *cqName,
uint32_t count) {
auto pool = getHelper()->getCache()->getPoolManager().find(poolName);
std::shared_ptr<QueryService> qs;
if (pool != nullptr) {
qs = pool->getQueryService();
}
auto cq = qs->getCq(const_cast<char *>(cqName));
auto cqAttr = cq->getCqAttributes();
auto vl = cqAttr->getCqListeners();
MyCqStatusListener *myStatusCq =
dynamic_cast<MyCqStatusListener *>(vl[0].get());
LOGINFO("checkCQStatusOnPutEvent = %d ", myStatusCq->getNumInserts());
ASSERT(myStatusCq->getNumInserts() == count,
"incorrect number of CqStatus Updates count.");
}
// Expects five inserts on the region-named pool's status CQ.
DUNIT_TASK_DEFINITION(CLIENT1, checkCQStatusOnPutEvent)
  {
    const uint32_t expectedInserts = 5;
    checkCQStatusOnPutEvent(regionName, cqName, expectedInserts);
    LOG("checkCQStatusOnPutEvent complete.");
  }
END_TASK_DEFINITION

// Expects five inserts on the status CQ of each explicit test pool.
DUNIT_TASK_DEFINITION(CLIENT1, checkCQStatusOnPutEvent_Pool)
  {
    const uint32_t expectedInserts = 5;
    checkCQStatusOnPutEvent("__TEST_POOL1__", cqName, expectedInserts);
    checkCQStatusOnPutEvent("__TEST_POOL2__", cqName1, expectedInserts);
    LOG("checkCQStatusOnPutEvent_Pool complete.");
  }
END_TASK_DEFINITION
// Exercises listener management on a running CQ:
//  1. registers a CQ with a plain listener and a status listener, puts five
//     entries and checks both counted five inserts;
//  2. removes both listeners one by one through the attributes mutator;
//  3. re-installs them (status listener first) with setCqListeners, clears
//     the status listener, puts the same five keys again and checks both
//     listeners counted five updates.
DUNIT_TASK_DEFINITION(CLIENT1, ProcessCQ)
  {
    SLEEP(10000);

    // Create CqAttributes and Install Listener
    auto pool = getHelper()->getCache()->getPoolManager().find(regionName);
    // Fail with a clear message instead of dereferencing a null pool.
    ASSERT(pool != nullptr, "pool for the CQ status region was not found");
    auto qs = pool->getQueryService();

    CqAttributesFactory cqFac;
    auto cqLstner = std::make_shared<MyCqListener>(1);
    auto cqStatusLstner = std::make_shared<MyCqStatusListener>(100);
    cqFac.addCqListener(cqLstner);
    cqFac.addCqListener(cqStatusLstner);
    auto cqAttr = cqFac.create();

    auto cq = qs->newCq(cqName, cqQueryStatusString, cqAttr);
    cq->execute();

    SLEEP(20000);
    LOG("ProcessCQ Query executed.");

    auto regPtr0 = getHelper()->getRegion(regionName);
    char KeyStr[256] = {0};
    char valStr[256] = {0};

    // Puts "Key-N " -> "val-N " for N = 1..5, pausing after each put. The
    // first call creates the entries, a repeat call updates them.
    auto putFiveEntries = [&]() {
      for (int i = 1; i <= 5; i++) {
        ACE_OS::snprintf(KeyStr, sizeof(KeyStr), "Key-%d ", i);
        ACE_OS::snprintf(valStr, sizeof(valStr), "val-%d ", i);
        auto keyport = CacheableKey::create(KeyStr);
        auto valport = CacheableString::create(valStr);
        regPtr0->put(keyport, valport);
        SLEEP(10 * 1000);  // sleep a while to allow server query to complete
      }
    };

    putFiveEntries();
    LOGINFO("putEntries complete");

    cqAttr = cq->getCqAttributes();
    auto vl = cqAttr->getCqListeners();
    ASSERT(vl.size() == 2, "incorrect number of CqListeners count.");

    MyCqStatusListener *myStatusCq =
        dynamic_cast<MyCqStatusListener *>(vl[1].get());
    ASSERT(myStatusCq != nullptr, "listener 1 is not a MyCqStatusListener");
    LOGINFO("No of insert events = %d ", myStatusCq->getNumInserts());
    LOGINFO("No of OnCqConnected events = %d ",
            myStatusCq->getCqsConnectedCount());
    ASSERT(myStatusCq->getNumInserts() == 5,
           "incorrect number of CqStatus Updates count.");
    ASSERT(myStatusCq->getCqsConnectedCount() == 1,
           "incorrect number of CqStatus Connected count.");

    MyCqListener *myCq = dynamic_cast<MyCqListener *>(vl[0].get());
    ASSERT(myCq != nullptr, "listener 0 is not a MyCqListener");
    LOGINFO("No of insert events = %d ", myCq->getNumInserts());
    ASSERT(myCq->getNumInserts() == 5,
           "incorrect number of CqStatus Updates count.");

    // Remove both listeners, one at a time.
    auto cqAttrMtor = cq->getCqAttributesMutator();
    auto ptr = vl[0];
    cqAttrMtor.removeCqListener(ptr);
    vl = cqAttr->getCqListeners();
    // vector::size() is size_t; %zu keeps the printf-style logger in sync.
    LOGINFO("number of listeners = %zu", vl.size());
    ASSERT(vl.size() == 1, "incorrect number of listeners");

    cqAttrMtor.removeCqListener(vl[0]);
    LOGINFO("removeCqListener again");
    vl = cqAttr->getCqListeners();
    LOGINFO("number of listeners = %zu", vl.size());
    ASSERT(vl.size() == 0, "incorrect number of listeners");

    // Re-install the listeners, status listener first this time.
    std::vector<std::shared_ptr<CqListener>> v2;
    v2.push_back(cqStatusLstner);
    v2.push_back(cqLstner);
    cqAttrMtor.setCqListeners(v2);
    LOG("ProcessCQ setCqListeneres done.");

    cqAttr = cq->getCqAttributes();
    auto vl3 = cqAttr->getCqListeners();
    ASSERT(vl3.size() == 2, "incorrect number of CqListeners count.");

    auto myStatusCq2 = std::dynamic_pointer_cast<MyCqStatusListener>(vl3[0]);
    ASSERT(myStatusCq2 != nullptr, "listener 0 is not a MyCqStatusListener");
    myStatusCq2->clear();

    putFiveEntries();
    LOGINFO("putEntries complete again");

    std::vector<std::shared_ptr<CqListener>> vl21;
    vl21.push_back(cqStatusLstner);
    vl21.push_back(cqLstner);
    cqFac.initCqListeners(vl21);
    LOGINFO("initCqListeners complete.");

    cqAttr = cq->getCqAttributes();
    auto vl2 = cqAttr->getCqListeners();
    ASSERT(vl2.size() == 2, "incorrect number of CqListeners count.");

    myStatusCq2 = std::dynamic_pointer_cast<MyCqStatusListener>(vl2[0]);
    ASSERT(myStatusCq2 != nullptr, "listener 0 is not a MyCqStatusListener");
    // These lines report the update counters, so label them as such.
    LOGINFO("No of update events = %d ", myStatusCq2->getNumUpdates());
    LOGINFO("No of OnCqConnected events = %d ",
            myStatusCq2->getCqsConnectedCount());
    ASSERT(myStatusCq2->getNumUpdates() == 5,
           "incorrect number of CqStatus Updates count.");
    // The counter was cleared above and the CQ stayed connected since.
    ASSERT(myStatusCq2->getCqsConnectedCount() == 0,
           "incorrect number of CqStatus Connected count.");

    auto myCq2 = std::dynamic_pointer_cast<MyCqListener>(vl2[1]);
    ASSERT(myCq2 != nullptr, "listener 1 is not a MyCqListener");
    LOGINFO("No of update events = %d ", myCq2->getNumUpdates());
    ASSERT(myCq2->getNumUpdates() == 5,
           "incorrect number of CqStatus Updates count.");

    LOG("ProcessCQ complete.");
  }
END_TASK_DEFINITION
// CQ status scenario with servers leaving and rejoining: the expected
// connected/disconnected callback counts are checked after each change in
// the set of running servers.
void doThinClientCqStatus() {
// Infrastructure and client setup.
CALL_TASK(CreateLocator);
CALL_TASK(CreateServer1_Locator);
CALL_TASK(initCqStatusClientLoc);
// Register the status CQ and verify insert events reach its listener.
CALL_TASK(createCQ);
// CALL_TASK(executeCQ);
// CALL_TASK(checkCQStatusOnConnect);
CALL_TASK(putEntries);
CALL_TASK(checkCQStatusOnPutEvent);
// Start server 2 before stopping server 1: no disconnect is expected.
CALL_TASK(CreateServer2_Locator);
CALL_TASK(CloseServer1);
CALL_TASK(checkCQStatusOnDisConnect0);
// Stop server 2 as well: first disconnect expected.
CALL_TASK(CloseServer2);
CALL_TASK(checkCQStatusOnDisConnect1);
// Restart server 1: second connect expected.
CALL_TASK(CreateServer1_Locator);
CALL_TASK(checkCQStatusOnConnect2);
// Stop server 1 again: second disconnect expected.
CALL_TASK(CloseServer1);
CALL_TASK(checkCQStatusOnDisConnect2);
// Teardown.
CALL_TASK(CloseCache1);
CALL_TASK(CloseLocator);
}
// CQ status scenario with two server groups and one pool per group: after
// both servers are stopped, each pool's status listener is expected to have
// seen one disconnect.
void doThinClientCqStatus2() {
// Locator plus one server per server-group configuration.
CALL_TASK(CreateLocator);
CALL_TASK(CreateServer_servergrop);
CALL_TASK(CreateServer_servergrop2);
// Pools/regions on the client, then one status CQ per pool.
CALL_TASK(StepOne_Pooled_Locator);
CALL_TASK(createCQ_Pool);
// Stop both servers and verify the disconnect callbacks.
CALL_TASK(CloseServer1);
CALL_TASK(CloseServer2);
CALL_TASK(checkCQStatusOnDisConnect_Pool);
// Teardown.
CALL_TASK(CloseCache1);
CALL_TASK(CloseLocator);
}
// Listener-management scenario: runs ProcessCQ against a single server, then
// stops the server and expects one disconnect callback.
void doThinClientCqStatus3() {
// Infrastructure and client setup.
CALL_TASK(CreateLocator);
CALL_TASK(CreateServer1_Locator);
CALL_TASK(initCqStatusClientLoc);
// Add/remove/re-install listeners on a running CQ.
CALL_TASK(ProcessCQ);
// Stop the only server and verify the disconnect callback.
CALL_TASK(CloseServer1);
CALL_TASK(checkCQStatusOnDisConnect1);
// Teardown.
CALL_TASK(CloseCache1);
CALL_TASK(CloseLocator);
}
// Thin wrappers that invoke the m_isPdx toggle tasks. C1 variants name the
// CLIENT1 tasks, C2 variants the CLIENT2 tasks.
// Invokes the task that sets m_isPdx to true (CLIENT1).
void setPortfolioPdxTypeC1() { CALL_TASK(SetPortfolioTypeToPdxC1) }
// Invokes the task that sets m_isPdx to false (CLIENT1).
void UnsetPortfolioTypeC1() { CALL_TASK(UnsetPortfolioTypeToPdxC1) }
//
// Invokes the task that sets m_isPdx to true (CLIENT2).
void setPortfolioPdxTypeC2() { CALL_TASK(SetPortfolioTypeToPdxC2) }
// Invokes the task that sets m_isPdx to false (CLIENT2).
void UnsetPortfolioTypeC2(){CALL_TASK(UnsetPortfolioTypeToPdxC2)}
// Test driver: runs the portfolio CQ scenario twice (the PDX flag is switched
// on after the first pass, so the second pass uses PDX objects), followed by
// the three CQ status scenarios.
DUNIT_MAIN
  {
    // Start with the PDX flag cleared on both clients.
    UnsetPortfolioTypeC1();
    UnsetPortfolioTypeC2();

    int pass = 0;
    while (pass < 2) {
      doThinClientCq();
      setPortfolioPdxTypeC1();
      setPortfolioPdxTypeC2();
      ++pass;
    }

    doThinClientCqStatus3();
    doThinClientCqStatus();
    doThinClientCqStatus2();
  }
END_MAIN