blob: 5b374e6b994e430287f73de168f383fd9b9f6cf5 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
#include "fw_dunit.hpp"
#include "ThinClientHelper.hpp"
#include <gfcpp/GemfireCppCache.hpp>
#include <gfcpp/CqAttributesFactory.hpp>
#include <gfcpp/CqAttributes.hpp>
#include <gfcpp/CqListener.hpp>
#include <gfcpp/CqQuery.hpp>
#include <ace/OS.h>
#include <ace/High_Res_Timer.h>
#include <string>
#define ROOT_SCOPE DISTRIBUTED_ACK
#include "QueryStrings.hpp"
#include "QueryHelper.hpp"
#include "Query.hpp"
#include "QueryService.hpp"
#include "ThinClientCQ.hpp"
using namespace gemfire;
using namespace test;
using namespace testData;
#define CLIENT1 s1p1
#define SERVER1 s2p1
#define CLIENT2 s1p2
const char *durableIds[] = { "DurableId1" , "DurableId2" };
bool isLocalServer = false;
const char * endPoints = CacheHelper::getTcrEndpoints(isLocalServer, 1);
const char* cqName = "MyCq";
const char* durableCQNamesClient1[]={"durableCQ1Client1","durableCQ2Client1","durableCQ3Client1",
"durableCQ4Client1","durableCQ5Client1","durableCQ6Client1",
"durableCQ7Client1","durableCQ8Client1"};
const char* durableCQNamesClient2[]={"durableCQ1Client2","durableCQ2Client2","durableCQ3Client2",
"durableCQ4Client2","durableCQ5Client2","durableCQ6Client2",
"durableCQ7Client2","durableCQ8Client2"};
static bool m_isPdx = false;
void initClientWithId( bool pool, int ClientIdx, bool typeRegistered=false )
{
if(typeRegistered==false)
{
try {
Serializable::registerType(Position::createDeserializable);
Serializable::registerType(Portfolio::createDeserializable);
Serializable::registerPdxType(PositionPdx::createDeserializable);
Serializable::registerPdxType(PortfolioPdx::createDeserializable);
}
catch (const IllegalStateException& ) {
// ignore exception
}
}
PropertiesPtr pp = Properties::create();
pp->insert("durable-client-id", durableIds[ClientIdx]);
pp->insert("durable-timeout", 60);
pp->insert( "notify-ack-interval", 1 );
if (pool) {
initClient(true, pp);
}
else {
//set cache level endpoints.
initClient( endPoints, 0, pp );
}
}
class MyCqListener1 : public CqListener {
public:
static int m_cntEvents;
void onEvent(const CqEvent& cqe)
{
m_cntEvents++;
char* opStr = (char*)"Default";
CacheableInt32Ptr value( dynCast<CacheableInt32Ptr> (cqe.getNewValue()));
CacheableInt32Ptr key( dynCast<CacheableInt32Ptr> (cqe.getKey()));
switch (cqe.getQueryOperation())
{
case CqOperation::OP_TYPE_CREATE:
{
opStr = (char*)"CREATE";
break;
}
case CqOperation::OP_TYPE_UPDATE:
{
opStr = (char*)"UPDATE";
break;
}
case CqOperation::OP_TYPE_DESTROY:
{
opStr = (char*)"UPDATE";
break;
}
default:
break;
}
LOGINFO("MyCqListener1::OnEvent called with %s, key[%s], value=(%s)",
opStr, key->toString()->asChar(), value->toString()->asChar());
}
void onError(const CqEvent& cqe){
LOGINFO("MyCqListener1::OnError called");
}
void close(){
LOGINFO("MyCqListener1::close called");
}
};
int MyCqListener1::m_cntEvents = 0;
const char * regionNamesCq[] = { "Portfolios", "Positions", "Portfolios2", "Portfolios3" };
int onEventCount =0;
int onErrorCount =0;
int onEventCountBefore =0;
class MyCqListener : public CqListener {
void onEvent(const CqEvent& cqe){
// LOG("MyCqListener::OnEvent called");
onEventCount++;
}
void onError(const CqEvent& cqe){
// LOG("MyCqListener::OnError called");
onErrorCount++;
}
void close(){
LOG("MyCqListener::close called");
}
};
DUNIT_TASK_DEFINITION(SERVER1, CreateLocator)
{
if ( isLocator )
CacheHelper::initLocator( 1 );
LOG("Locator1 started");
}
END_TASK_DEFINITION
void createServer(bool locator = false)
{
LOG("Starting SERVER1...");
if ( isLocalServer ) CacheHelper::initServer( 1, "remotequery.xml", locator?locatorsG:NULL );
LOG("SERVER1 started");
}
void createServer_XML()
{
LOG("Starting SERVER...");
if ( isLocalServer ) CacheHelper::initServer( 1, "serverDurableClient.xml", NULL );
LOG("SERVER started");
}
DUNIT_TASK_DEFINITION(SERVER1, CreateServer1)
{
createServer(false);
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(SERVER1, CreateServer)
{
createServer_XML();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(SERVER1, CreateServer1_Locator)
{
createServer(true);
}
END_TASK_DEFINITION
void stepOne(bool pool = false, bool locator = false)
{
initClientWithId(pool, 0);
createRegionForCQ( pool, locator, regionNamesCq[0], USE_ACK, endPoints, true);
RegionPtr regptr = getHelper()->getRegion(regionNamesCq[0]);
RegionAttributesPtr lattribPtr = regptr->getAttributes();
RegionPtr subregPtr = regptr->createSubregion( regionNamesCq[1], lattribPtr );
LOG( "StepOne complete." );
}
void RunDurableCqClient()
{
// Create durable client's properties using api.
PropertiesPtr pp = Properties::create();
pp->insert("durable-client-id", "DurableClientId");
pp->insert("durable-timeout", 3600);
// Create a GemFire Cache Programmatically.
CacheFactoryPtr cacheFactory = CacheFactory::createCacheFactory(pp);
CachePtr cachePtr = cacheFactory->setSubscriptionEnabled(true)
->setSubscriptionAckInterval(5000)
->setSubscriptionMessageTrackingTimeout(50000)
->create();
LOGINFO("Created the GemFire Cache Programmatically");
RegionFactoryPtr regionFactory = cachePtr->createRegionFactory(CACHING_PROXY);
// Create the Region Programmatically.
RegionPtr regionPtr = regionFactory->create("DistRegionAck");
LOGINFO("Created the Region Programmatically");
// Get the QueryService from the Cache.
QueryServicePtr qrySvcPtr = cachePtr->getQueryService();
//Create CqAttributes and Install Listener
CqAttributesFactory cqFac;
CqListenerPtr cqLstner (new MyCqListener1());
cqFac.addCqListener(cqLstner);
CqAttributesPtr cqAttr = cqFac.create();
LOGINFO("Attached CqListener");
//create a new Cq Query
const char* qryStr = "select * from /DistRegionAck ";
CqQueryPtr qry = qrySvcPtr->newCq((char*)"MyCq", qryStr, cqAttr, true);
LOGINFO("Created new CqQuery");
//execute Cq Query
qry->execute();
gemfire::millisleep(10000);
LOGINFO("Executed new CqQuery");
//Send ready for Event message to Server( only for Durable Clients ).
//Server will send queued events to client after recieving this.
cachePtr->readyForEvents();
LOGINFO("Sent ReadyForEvents message to server");
//wait for some time to recieve events
gemfire::millisleep(10000);
// Close the GemFire Cache with keepalive = true. Server will queue events for
// durable registered keys and will deliver all events when client will reconnect
// within timeout period and send "readyForEvents()"
cachePtr->close(true);
LOGINFO("Closed the GemFire Cache with keepalive as true");
}
void RunFeederClient() {
CacheFactoryPtr cacheFactory = CacheFactory::createCacheFactory();
LOGINFO("Feeder connected to the GemFire Distributed System");
CachePtr cachePtr = cacheFactory->create();
LOGINFO("Created the GemFire Cache");
RegionFactoryPtr regionFactory = cachePtr->createRegionFactory(PROXY);
LOGINFO("Created the RegionFactory");
// Create the Region Programmatically.
RegionPtr regionPtr = regionFactory->create("DistRegionAck");
LOGINFO("Created the Region Programmatically.");
for ( int i =0; i < 10; i++)
{
CacheableKeyPtr keyPtr = dynCast<CacheableKeyPtr> (CacheableInt32::create(i));
CacheableInt32Ptr valPtr = CacheableInt32::create( i );
regionPtr->put( keyPtr, valPtr );
}
gemfire::millisleep(10000);
LOGINFO("put on 0-10 keys done.");
// Close the GemFire Cache
cachePtr->close();
LOGINFO("Closed the GemFire Cache");
}
void RunFeederClient1() {
CacheFactoryPtr cacheFactory = CacheFactory::createCacheFactory();
LOGINFO("Feeder connected to the GemFire Distributed System");
CachePtr cachePtr = cacheFactory->create();
LOGINFO("Created the GemFire Cache");
RegionFactoryPtr regionFactory = cachePtr->createRegionFactory(PROXY);
LOGINFO("Created the RegionFactory");
// Create the Region Programmatically.
RegionPtr regionPtr = regionFactory->create("DistRegionAck");
LOGINFO("Created the Region Programmatically.");
for ( int i =10; i < 20; i++)
{
CacheableKeyPtr keyPtr = dynCast<CacheableKeyPtr> (CacheableInt32::create(i));
CacheableInt32Ptr valPtr = CacheableInt32::create( i );
regionPtr->put( keyPtr, valPtr );
}
gemfire::millisleep(10000);
LOGINFO("put on 0-10 keys done.");
// Close the GemFire Cache
cachePtr->close();
LOGINFO("Closed the GemFire Cache");
}
DUNIT_TASK_DEFINITION(CLIENT1, RunDurableClient)
{
RunDurableCqClient();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, RunFeeder)
{
RunFeederClient();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, RunFeeder1)
{
RunFeederClient1();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StepOne)
{
stepOne();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StepOne_PoolEP)
{
stepOne(true);
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StepOne_PoolLocator)
{
stepOne(true, true);
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, VerifyEvents)
{
LOGINFO("MyCqListener1::m_cntEvents = %d ", MyCqListener1::m_cntEvents);
ASSERT(MyCqListener1::m_cntEvents == 20, "Incorrect events, expected 20");
}
END_TASK_DEFINITION
void stepOne2(bool pool = false, bool locator = false)
{
initClientWithId(pool, 1);
createRegionForCQ(pool, locator, regionNamesCq[0], USE_ACK, endPoints, true);
RegionPtr regptr = getHelper()->getRegion(regionNamesCq[0]);
RegionAttributesPtr lattribPtr = regptr->getAttributes();
RegionPtr subregPtr = regptr->createSubregion( regionNamesCq[1], lattribPtr );
LOG( "StepOne2 complete." );
}
DUNIT_TASK_DEFINITION(CLIENT2, StepOne2)
{
stepOne2();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, StepOne2_PoolEP)
{
stepOne2(true);
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, StepOne2_PoolLocator)
{
stepOne2(true, true);
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StepTwo)
{
RegionPtr regPtr0 = getHelper()->getRegion(regionNamesCq[0]);
RegionPtr subregPtr0 = regPtr0->getSubregion(regionNamesCq[1]);
QueryHelper * qh = &QueryHelper::getHelper();
if(!m_isPdx){
qh->populatePortfolioData(regPtr0 , 130, 20, 20);
qh->populatePositionData(subregPtr0, 130, 20);
}else{
qh->populatePortfolioPdxData(regPtr0 , 130, 20, 20);
qh->populatePositionPdxData(subregPtr0, 130, 20);
}
LOG( "StepTwo complete." );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StepThree)
{
QueryHelper * qh = &QueryHelper::getHelper();
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CqAttributesFactory cqFac;
CqListenerPtr cqLstner (new MyCqListener());
cqFac.addCqListener(cqLstner);
CqAttributesPtr cqAttr = cqFac.create();
const char* qryStr = "select * from /Portfolios p where p.ID < 3";
CqQueryPtr qry = qs->newCq(cqName, qryStr, cqAttr);
SelectResultsPtr results;
try
{
LOG("EXECUTE 1 START");
results = qry->executeWithInitialResults();
LOG("EXECUTE 1 STOP");
SelectResultsIterator iter = results->getIterator();
char buf[100];
int count = results->size();
sprintf(buf, "results size=%d", count);
LOG(buf);
}
catch(const Exception& excp)
{
std::string logmsg = "";
logmsg += excp.getName();
logmsg += ": ";
logmsg += excp.getMessage();
LOG(logmsg.c_str());
excp.printStackTrace();
}
LOG( "StepThree complete." );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, StepTwo2)
{
RegionPtr regPtr0 = getHelper()->getRegion(regionNamesCq[0]);
RegionPtr subregPtr0 = regPtr0->getSubregion(regionNamesCq[1]);
QueryHelper * qh = &QueryHelper::getHelper();
qh->populatePortfolioData(regPtr0 , 140, 30, 20);
qh->populatePositionData(subregPtr0, 140, 30);
CacheablePtr port = NULLPTR;
for(int i=1; i < 140; i++)
{
if( !m_isPdx ){
port = CacheablePtr(new Portfolio(i, 20));
}else{
port = CacheablePtr(new PortfolioPdx(i, 20));
}
CacheableKeyPtr keyport = CacheableKey::create("port1-1");
regPtr0->put(keyport, port);
SLEEP(10); // sleep a while to allow server query to complete
}
LOG( "StepTwo2 complete." );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, Client1Down )
{
getHelper()->disconnect(true);
cleanProc();
LOG( "Clnt1Down complete: Keepalive = True" );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, Client2Down )
{
getHelper()->disconnect(true);
cleanProc();
LOG( "Clnt2Down complete: Keepalive = True" );
}
END_TASK_DEFINITION
void client1Up(bool pool = false, bool locator = false )
{
//No RegisterIntrest again
initClientWithId(pool, 0, true);
if ( pool ) {
createRegionForCQ( pool, locator, regionNamesCq[0], USE_ACK, endPoints, true);
}
LOG( "Client1Up complete." );
QueryHelper * qh = &QueryHelper::getHelper();
QueryServicePtr qs;
if (pool) {
qs = PoolManager::find(regionNamesCq[0])->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CqAttributesFactory cqFac;
CqListenerPtr cqLstner (new MyCqListener());
cqFac.addCqListener(cqLstner);
CqAttributesPtr cqAttr = cqFac.create();
const char* qryStr = "select * from /Portfolios p where p.ID < 3";
CqQueryPtr qry = qs->newCq(cqName, qryStr, cqAttr);
try
{
LOG("EXECUTE 1 START");
qry->execute();
LOG("EXECUTE 1 STOP");
}
catch(const Exception& excp)
{
std::string logmsg = "";
logmsg += excp.getName();
logmsg += ": ";
logmsg += excp.getMessage();
LOG(logmsg.c_str());
excp.printStackTrace();
}
try {
getHelper()->cachePtr->readyForEvents();
}catch(...) {
LOG("Exception occured while sending readyForEvents");
}
}
void client1UpDurableCQList(bool pool = false, bool locator = false )
{
//No RegisterIntrest again
initClientWithId(pool, 0, true);
if ( pool ) {
createRegionForCQ( pool, locator, regionNamesCq[0], USE_ACK, endPoints, true);
}
LOG( "Client1Up complete." );
QueryHelper * qh = &QueryHelper::getHelper();
}
void client2UpDurableCQList(bool pool = false, bool locator = false )
{
//No RegisterIntrest again
initClientWithId(pool, 1, true);
if ( pool ) {
createRegionForCQ( pool, locator, regionNamesCq[0], USE_ACK, endPoints, true);
}
LOG( "Client2Up complete." );
QueryHelper * qh = &QueryHelper::getHelper();
}
DUNIT_TASK_DEFINITION(CLIENT1, Client1Up )
{
client1Up();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, Client1Up_Pool )
{
client1Up( true, true );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, Client1Up_Pool_EP )
{
client1Up( true, false );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, Client1UpDurableCQList )
{
client1UpDurableCQList();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, Client1UpDurableCQList_Pool )
{
client1UpDurableCQList( true, true );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, Client1UpDurableCQList_Pool_EP )
{
client1UpDurableCQList( true, false );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, Client2UpDurableCQList )
{
client2UpDurableCQList();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, Client2UpDurableCQList_Pool )
{
client2UpDurableCQList( true, true );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, Client2UpDurableCQList_Pool_EP )
{
client2UpDurableCQList( true, false );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StepFour)
{
QueryHelper * qh = &QueryHelper::getHelper();
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
char buf[1024];
try {
//TEST_COVERAGE
LOGINFO("CLIENT-1 StepFour: verifying getCq() behaviour for the invalid CQ Name");
CqQueryPtr invalidCqptr = qs->getCq("InValidCQ");
ASSERT(invalidCqptr == NULLPTR , "Cqptr must be NULL, as it getCq() is invoked on invalid CQ name");
/*if(invalidCqptr == NULLPTR){
LOGINFO("Testing getCq(InvalidName) :: invalidCqptr is NULLPTR");
}else{
LOGINFO("Testing getCq(InvalidName) :: invalidCqptr is NOT NULL");
}*/
CqQueryPtr cqy = qs->getCq(cqName);
cqy->stop();
SLEEP(1500); // sleep 0.025 min to allow server stop query to complete
CqStatisticsPtr cqStats = cqy->getStatistics();
sprintf(buf, "numInserts[%d], numDeletes[%d], numUpdates[%d], numEvents[%d]", cqStats->numInserts(), cqStats->numDeletes(), cqStats->numUpdates(), cqStats->numEvents());
LOG(buf);
sprintf(buf, "MyCount:onEventCount=%d, onErrorCount=%d", onEventCount, onErrorCount);
LOG(buf);
ASSERT( cqStats->numEvents()>0, "stats incorrect!" );
cqy->close();
}
catch(Exception& excp)
{
std::string failmsg = "";
failmsg += excp.getName();
failmsg += ": ";
failmsg += excp.getMessage();
LOG(failmsg.c_str());
FAIL(failmsg.c_str());
excp.printStackTrace();
}
LOG( "StepFour complete." );
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1,CloseCache1)
{
LOG("cleanProc 1...");
cleanProc();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2,CloseCache2)
{
LOG("cleanProc 2...");
cleanProc();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(SERVER1,CloseServer1)
{
LOG("closing Server1...");
if ( isLocalServer ) {
CacheHelper::closeServer( 1 );
LOG("SERVER1 stopped");
}
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(SERVER1,CloseLocator)
{
if ( isLocator ) {
CacheHelper::closeLocator( 1 );
LOG("Locator1 stopped");
}
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, SetPortfolioTypeToPdx)
{
m_isPdx = true;
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, UnsetPortfolioTypeToPdx)
{
m_isPdx = false;
}
END_TASK_DEFINITION
bool isDurableCQName(const char*cqName, int clientID, bool isRecycled)
{
bool bRetVal =false;
int i=0;
if(clientID ==1) {
if(!isRecycled){
for(i=0;i<4;i++)
if(strcmp(cqName,durableCQNamesClient1[i]) == 0)
break;
if(i<4) bRetVal = true;
}
else{
for(i=0;i<8;i++)
if(strcmp(cqName,durableCQNamesClient1[i]) == 0)
break;
if(i<8)
bRetVal = true;
}
}
else if(clientID ==2){
if(!isRecycled){
for(i=0;i<4;i++)
if(strcmp(cqName,durableCQNamesClient2[i]) == 0)
break;
if(i<4)
bRetVal = true;
}
else{
for(i=0;i<8;i++)
if(strcmp(cqName,durableCQNamesClient2[i]) == 0)
break;
if(i<8)
bRetVal = true;
}
}
return bRetVal;
}
void doThinClientCqDurable( bool poolConfig = false, bool poolLocators = false )
{
if (poolConfig && poolLocators) {
CALL_TASK(CreateLocator);
CALL_TASK(CreateServer1_Locator);
} else {
CALL_TASK(CreateServer1);
}
if (poolConfig) {
if (poolLocators) {
CALL_TASK(StepOne_PoolLocator);
CALL_TASK(StepOne2_PoolLocator);
} else {
CALL_TASK(StepOne_PoolEP);
CALL_TASK(StepOne2_PoolEP);
}
} else {
CALL_TASK(StepOne);
CALL_TASK(StepOne2);
}
CALL_TASK(StepTwo);
CALL_TASK(StepThree);
CALL_TASK(StepTwo2);
CALL_TASK(Client1Down);
if (poolConfig) {
if ( poolLocators ) {
CALL_TASK( Client1Up_Pool );
}
else {
CALL_TASK( Client1Up_Pool_EP );
}
} else {
CALL_TASK(Client1Up);
}
CALL_TASK(StepFour);
CALL_TASK(CloseCache1);
CALL_TASK(CloseCache2);
CALL_TASK(CloseServer1);
if (poolConfig && poolLocators) {
CALL_TASK(CloseLocator);
}
}
DUNIT_TASK_DEFINITION(CLIENT1, RegisterCqs1)
{
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CqAttributesFactory cqFac;
CqAttributesPtr cqAttr = cqFac.create();
qs->newCq(durableCQNamesClient1[0], "select * from /Portfolios p where p.ID < 3", cqAttr, true)->executeWithInitialResults();
qs->newCq(durableCQNamesClient1[1], "select * from /Portfolios p where p.ID > 5", cqAttr, true)->executeWithInitialResults();
qs->newCq(durableCQNamesClient1[2], "select * from /Portfolios p where p.ID > 10", cqAttr, false)->executeWithInitialResults();
qs->newCq(durableCQNamesClient1[3], "select * from /Portfolios p where p.ID = 0", cqAttr, false)->executeWithInitialResults();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, RegisterCqsAfterClientup1)
{
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CqAttributesFactory cqFac;
CqAttributesPtr cqAttr = cqFac.create();
qs->newCq(durableCQNamesClient1[4], "select * from /Portfolios p where p.ID < 3", cqAttr, true)->executeWithInitialResults();
qs->newCq(durableCQNamesClient1[5], "select * from /Portfolios p where p.ID > 5", cqAttr, true)->executeWithInitialResults();
qs->newCq(durableCQNamesClient1[6], "select * from /Portfolios p where p.ID > 10", cqAttr, false)->executeWithInitialResults();
qs->newCq(durableCQNamesClient1[7], "select * from /Portfolios p where p.ID = 0", cqAttr, false)->executeWithInitialResults();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, RegisterCqs2)
{
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CqAttributesFactory cqFac;
CqAttributesPtr cqAttr = cqFac.create();
qs->newCq(durableCQNamesClient2[0], "select * from /Portfolios p where p.ID < 3", cqAttr, true)->executeWithInitialResults();
qs->newCq(durableCQNamesClient2[1], "select * from /Portfolios p where p.ID > 5", cqAttr, true)->executeWithInitialResults();
qs->newCq(durableCQNamesClient2[2], "select * from /Portfolios p where p.ID > 10", cqAttr, true)->executeWithInitialResults();
qs->newCq(durableCQNamesClient2[3], "select * from /Portfolios p where p.ID = 5", cqAttr, true)->executeWithInitialResults();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, RegisterCqsAfterClientup2)
{
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CqAttributesFactory cqFac;
CqAttributesPtr cqAttr = cqFac.create();
qs->newCq(durableCQNamesClient2[4], "select * from /Portfolios p where p.ID < 3", cqAttr, true)->executeWithInitialResults();
qs->newCq(durableCQNamesClient2[5], "select * from /Portfolios p where p.ID > 5", cqAttr, true)->executeWithInitialResults();
qs->newCq(durableCQNamesClient2[6], "select * from /Portfolios p where p.ID > 10", cqAttr, true)->executeWithInitialResults();
qs->newCq(durableCQNamesClient2[7], "select * from /Portfolios p where p.ID = 5", cqAttr, true)->executeWithInitialResults();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, VerifyCqs1)
{
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CacheableArrayListPtr durableCqListPtr = qs->getAllDurableCqsFromServer();
ASSERT( durableCqListPtr != NULLPTR, "Durable CQ List should not be null" );
ASSERT( durableCqListPtr->length() == 2, "Durable CQ List lenght should be 2" );
ASSERT(isDurableCQName(durableCqListPtr->at(0)->toString()->asChar(),1,false),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(1)->toString()->asChar(),1,false),"Durable CQ name should be in the durable cq list");
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, VerifyCqsAfterClientup1)
{
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CacheableArrayListPtr durableCqListPtr = qs->getAllDurableCqsFromServer();
ASSERT( durableCqListPtr != NULLPTR, "Durable CQ List should not be null" );
ASSERT( durableCqListPtr->length() == 4, "Durable CQ List length should be 4" );
ASSERT(isDurableCQName(durableCqListPtr->at(0)->toString()->asChar(),1,true),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(1)->toString()->asChar(),1,true),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(2)->toString()->asChar(),1,true),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(3)->toString()->asChar(),1,true),"Durable CQ name should be in the durable cq list");
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, VerifyCqs2)
{
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CacheableArrayListPtr durableCqListPtr = qs->getAllDurableCqsFromServer();
ASSERT( durableCqListPtr != NULLPTR, "Durable CQ List should not be null" );
ASSERT( durableCqListPtr->length() == 4, "Durable CQ List lenght should be 4" );
ASSERT(isDurableCQName(durableCqListPtr->at(0)->toString()->asChar(),2,false),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(1)->toString()->asChar(),2,false),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(2)->toString()->asChar(),2,false),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(3)->toString()->asChar(),2,false),"Durable CQ name should be in the durable cq list");
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, VerifyCqsAfterClientup2)
{
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CacheableArrayListPtr durableCqListPtr = qs->getAllDurableCqsFromServer();
ASSERT( durableCqListPtr != NULLPTR, "Durable CQ List should not be null" );
ASSERT( durableCqListPtr->length() == 8, "Durable CQ List lenght should be 8" );
ASSERT(isDurableCQName(durableCqListPtr->at(0)->toString()->asChar(),2,true),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(1)->toString()->asChar(),2,true),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(2)->toString()->asChar(),2,true),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(3)->toString()->asChar(),2,true),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(4)->toString()->asChar(),2,true),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(5)->toString()->asChar(),2,true),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(6)->toString()->asChar(),2,true),"Durable CQ name should be in the durable cq list");
ASSERT(isDurableCQName(durableCqListPtr->at(7)->toString()->asChar(),2,true),"Durable CQ name should be in the durable cq list");
}
END_TASK_DEFINITION
void verifyEmptyDurableCQList()
{
PoolPtr pool = PoolManager::find(regionNamesCq[0]);
QueryServicePtr qs;
if (pool != NULLPTR) {
qs = pool->getQueryService();
} else {
qs = getHelper()->cachePtr->getQueryService();
}
CacheableArrayListPtr durableCqListPtr = qs->getAllDurableCqsFromServer();
ASSERT( durableCqListPtr == NULLPTR, "Durable CQ List should be null" );
}
DUNIT_TASK_DEFINITION(CLIENT1, VerifyEmptyDurableCQList1)
{
verifyEmptyDurableCQList();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, VerifyEmptyDurableCQList2)
{
verifyEmptyDurableCQList();
}
END_TASK_DEFINITION
void getDurableCQsFromServerEmptyList( bool poolConfig = false, bool poolLocators = false )
{
if (poolConfig && poolLocators){
CALL_TASK(CreateLocator);
CALL_TASK(CreateServer1_Locator);
}else{
CALL_TASK(CreateServer1);
}
if (poolConfig){
if (poolLocators){
CALL_TASK(StepOne_PoolLocator);
CALL_TASK(StepOne2_PoolLocator);
} else {
CALL_TASK(StepOne_PoolEP);
CALL_TASK(StepOne2_PoolEP);
}
}else{
CALL_TASK(StepOne);
CALL_TASK(StepOne2);
}
CALL_TASK(VerifyEmptyDurableCQList1);
CALL_TASK(VerifyEmptyDurableCQList2);
CALL_TASK(CloseCache1);
CALL_TASK(CloseCache2);
CALL_TASK(CloseServer1);
if (poolConfig && poolLocators) {
CALL_TASK(CloseLocator);
}
}
void getDurableCQsFromServer( bool poolConfig = false, bool poolLocators = false )
{
if (poolConfig && poolLocators){
CALL_TASK(CreateLocator);
CALL_TASK(CreateServer1_Locator);
}else{
CALL_TASK(CreateServer1);
}
if (poolConfig){
if (poolLocators){
CALL_TASK(StepOne_PoolLocator);
CALL_TASK(StepOne2_PoolLocator);
} else {
CALL_TASK(StepOne_PoolEP);
CALL_TASK(StepOne2_PoolEP);
}
}else{
CALL_TASK(StepOne);
CALL_TASK(StepOne2);
}
CALL_TASK(RegisterCqs1);
CALL_TASK(RegisterCqs2);
CALL_TASK(VerifyCqs1);
CALL_TASK(VerifyCqs2);
CALL_TASK(CloseCache1);
CALL_TASK(CloseCache2);
CALL_TASK(CloseServer1);
if (poolConfig && poolLocators) {
CALL_TASK(CloseLocator);
}
}
void getDurableCQsFromServerWithCyclicClients( bool poolConfig = false, bool poolLocators = false )
{
if (poolConfig && poolLocators){
CALL_TASK(CreateLocator);
CALL_TASK(CreateServer1_Locator);
}else{
CALL_TASK(CreateServer1);
}
if (poolConfig){
if (poolLocators){
CALL_TASK(StepOne_PoolLocator);
CALL_TASK(StepOne2_PoolLocator);
} else {
CALL_TASK(StepOne_PoolEP);
CALL_TASK(StepOne2_PoolEP);
}
}else{
CALL_TASK(StepOne);
CALL_TASK(StepOne2);
}
CALL_TASK(RegisterCqs1);
CALL_TASK(RegisterCqs2);
CALL_TASK(VerifyCqs1);
CALL_TASK(VerifyCqs2);
CALL_TASK(Client1Down);
CALL_TASK(Client2Down);
if (poolConfig) {
if ( poolLocators ) {
CALL_TASK( Client1UpDurableCQList_Pool );
CALL_TASK( Client2UpDurableCQList_Pool );
}
else {
CALL_TASK( Client1UpDurableCQList_Pool_EP );
CALL_TASK( Client2UpDurableCQList_Pool_EP );
}
} else {
CALL_TASK(Client1UpDurableCQList);
CALL_TASK(Client2UpDurableCQList);
}
CALL_TASK(RegisterCqsAfterClientup1);
CALL_TASK(RegisterCqsAfterClientup2);
CALL_TASK(VerifyCqsAfterClientup1);
CALL_TASK(VerifyCqsAfterClientup2);
CALL_TASK(CloseCache1);
CALL_TASK(CloseCache2);
CALL_TASK(CloseServer1);
if (poolConfig && poolLocators) {
CALL_TASK(CloseLocator);
}
}
void setPortfolioPdxType(){
CALL_TASK(SetPortfolioTypeToPdx)
}
void UnsetPortfolioType(){
CALL_TASK(UnsetPortfolioTypeToPdx)
}
void doThinClientCqDurable1()
{
CALL_TASK(CreateServer);
//First Run of Durable Client
CALL_TASK(RunDurableClient);
//Intermediate Feeder, feeding events
CALL_TASK(RunFeeder);
//Reconnect Durable Client
CALL_TASK(RunDurableClient);
//Intermediate Feeder, feeding events again
CALL_TASK(RunFeeder1);
//Reconnect Durable Client again
CALL_TASK(RunDurableClient);
//Verify we get 20 events
CALL_TASK(VerifyEvents);
CALL_TASK(CloseServer1);
}
DUNIT_MAIN
{
//doThinClientCqDurable(); // normal case: pool == false, locators == false
UnsetPortfolioType();
for(int runIdx = 1; runIdx <=2; ++runIdx)
{
doThinClientCqDurable(true); // pool-with-endpoints case: pool == true, locators == false
doThinClientCqDurable(true, true); // pool-with-locator case: pool == true, locators == true
setPortfolioPdxType();
getDurableCQsFromServerEmptyList(true); // pool-with-endpoints case: pool == true, locators == false
getDurableCQsFromServerEmptyList(true, true); // pool-with-locator case: pool == true, locators == true
getDurableCQsFromServer(true); // pool-with-endpoints case: pool == true, locators == false
getDurableCQsFromServer(true, true); // pool-with-locator case: pool == true, locators == true
getDurableCQsFromServerWithCyclicClients(true); // pool-with-endpoints case: pool == true, locators == false
getDurableCQsFromServerWithCyclicClients(true, true); // pool-with-locator case: pool == true, locators == true
}
doThinClientCqDurable1();
}
END_MAIN