blob: e268714f8b479c58f80dde2d60dbea55f756c812 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* testThinClientCqDelta.cpp
*
* Created on: Sept 17, 2009
* Author: abhaware
*/
#include "testobject/DeltaTestImpl.hpp"
#include "fw_dunit.hpp"
#include <string>
#include "CacheHelper.hpp"
#include <gfcpp/CqAttributesFactory.hpp>
#include <gfcpp/CqAttributes.hpp>
#include <gfcpp/CqListener.hpp>
#include <gfcpp/CqQuery.hpp>
#include <gfcpp/CqServiceStatistics.hpp>
using namespace apache::geode::client;
using namespace test;
using namespace testobject;
CacheHelper* cacheHelper = NULL;
#include "locator_globals.hpp"
#define CLIENT1 s1p1
#define CLIENT2 s1p2
#define SERVER1 s2p1
#include "LocatorHelper.hpp"
class CqDeltaListener : public CqListener {
public:
CqDeltaListener() : m_deltaCount(0), m_valueCount(0) {}
virtual void onEvent(const CqEvent& aCqEvent) {
CacheableBytesPtr deltaValue = aCqEvent.getDeltaValue();
DeltaTestImpl newValue;
DataInput input(deltaValue->value(), deltaValue->length());
newValue.fromDelta(input);
if (newValue.getIntVar() == 5) {
m_deltaCount++;
}
DeltaTestImplPtr dptr =
staticCast<DeltaTestImplPtr>(aCqEvent.getNewValue());
if (dptr->getIntVar() == 5) {
m_valueCount++;
}
}
int getDeltaCount() { return m_deltaCount; }
int getValueCount() { return m_valueCount; }
private:
int m_deltaCount;
int m_valueCount;
};
typedef SharedPtr<CqDeltaListener> CqDeltaListenerPtr;
CqDeltaListenerPtr g_CqListener;
void initClient(const bool isthinClient) {
if (cacheHelper == NULL) {
cacheHelper = new CacheHelper(isthinClient);
}
ASSERT(cacheHelper, "Failed to create a CacheHelper client instance.");
}
void initClientNoPools() {
cacheHelper = new CacheHelper(0);
ASSERT(cacheHelper, "Failed to create a CacheHelper client instance.");
}
void cleanProc() {
if (cacheHelper != NULL) {
delete cacheHelper;
cacheHelper = NULL;
}
}
CacheHelper* getHelper() {
ASSERT(cacheHelper != NULL, "No cacheHelper initialized.");
return cacheHelper;
}
void createPooledRegion(const char* name, bool ackMode, const char* locators,
const char* poolname,
bool clientNotificationEnabled = false,
bool cachingEnable = true) {
LOG("createRegion_Pool() entered.");
fprintf(stdout, "Creating region -- %s ackMode is %d\n", name, ackMode);
fflush(stdout);
RegionPtr regPtr =
getHelper()->createPooledRegion(name, ackMode, locators, poolname,
cachingEnable, clientNotificationEnabled);
ASSERT(regPtr != NULLPTR, "Failed to create region.");
LOG("Pooled Region created.");
}
void createPooledLRURegion(const char* name, bool ackMode, const char* locators,
const char* poolname,
bool clientNotificationEnabled = false,
bool cachingEnable = true) {
LOG(" createPooledLRURegion entered");
RegionPtr regPtr = getHelper()->createPooledRegionDiscOverFlow(
name, ackMode, locators, poolname, cachingEnable,
clientNotificationEnabled, 0, 0, 0, 0, 3 /*LruLimit = 3*/);
LOG(" createPooledLRURegion exited");
}
void createRegion(const char* name, bool ackMode,
bool clientNotificationEnabled = false) {
LOG("createRegion() entered.");
fprintf(stdout, "Creating region -- %s ackMode is %d\n", name, ackMode);
fflush(stdout);
// ack, caching
RegionPtr regPtr = getHelper()->createRegion(name, ackMode, true, NULLPTR,
clientNotificationEnabled);
ASSERT(regPtr != NULLPTR, "Failed to create region.");
LOG("Region created.");
}
const char* keys[] = {"Key-1", "Key-2", "Key-3", "Key-4"};
const char* regionNames[] = {"DistRegionAck", "DistRegionAck1"};
const bool USE_ACK = true;
const bool NO_ACK ATTR_UNUSED = false;
DUNIT_TASK_DEFINITION(CLIENT1, CreateClient1)
{
initClient(true);
createPooledRegion(regionNames[0], USE_ACK, locatorsG, "__TESTPOOL1_",
true);
try {
Serializable::registerType(DeltaTestImpl::create);
} catch (IllegalStateException&) {
// ignore exception caused by type reregistration.
}
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, CreateClient2)
{
initClient(true);
createPooledRegion(regionNames[0], USE_ACK, locatorsG, "__TESTPOOL1_",
true);
try {
Serializable::registerType(DeltaTestImpl::create);
} catch (IllegalStateException&) {
// ignore exception caused by type reregistration.
}
RegionPtr regPtr = getHelper()->getRegion(regionNames[0]);
PoolPtr pool = PoolManager::find("__TESTPOOL1_");
QueryServicePtr qs;
qs = pool->getQueryService();
CqAttributesFactory cqFac;
g_CqListener = new CqDeltaListener();
CqListenerPtr cqListener = g_CqListener;
cqFac.addCqListener(cqListener);
CqAttributesPtr cqAttr = cqFac.create();
CqQueryPtr qry =
qs->newCq("Cq_with_delta",
"select * from /DistRegionAck d where d.intVar > 4", cqAttr);
qs->executeCqs();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, CreateClient1_NoPools)
{
initClientNoPools();
createRegion(regionNames[0], USE_ACK, true);
try {
Serializable::registerType(DeltaTestImpl::create);
} catch (IllegalStateException&) {
// ignore exception caused by type reregistration.
}
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, CreateClient2_NoPools)
{
initClientNoPools();
createRegion(regionNames[0], USE_ACK, true);
try {
Serializable::registerType(DeltaTestImpl::create);
} catch (IllegalStateException&) {
// ignore exception caused by type reregistration.
}
RegionPtr regPtr = getHelper()->getRegion(regionNames[0]);
QueryServicePtr qs;
qs = getHelper()->getQueryService();
CqAttributesFactory cqFac;
g_CqListener = new CqDeltaListener();
CqListenerPtr cqListener = g_CqListener;
cqFac.addCqListener(cqListener);
CqAttributesPtr cqAttr = cqFac.create();
CqQueryPtr qry =
qs->newCq("Cq_with_delta",
"select * from /DistRegionAck d where d.intVar > 4", cqAttr);
qs->executeCqs();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, Client1_Put)
{
CacheableKeyPtr keyPtr = createKey(keys[0]);
DeltaTestImplPtr dptr(new DeltaTestImpl());
CacheablePtr valPtr(dptr);
RegionPtr regPtr = getHelper()->getRegion(regionNames[0]);
regPtr->put(keyPtr, valPtr);
dptr->setIntVar(5);
dptr->setDelta(true);
regPtr->put(keyPtr, valPtr);
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, Client2_VerifyDelta)
{
// Wait for notification
SLEEP(5000);
ASSERT(g_CqListener->getDeltaCount() == 1,
"Delta from CQ event does not have expected value");
ASSERT(g_CqListener->getValueCount() == 1,
"Value from CQ event is incorrect");
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, CloseCache1)
{ cleanProc(); }
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, CloseCache2)
{ cleanProc(); }
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(SERVER1, CloseServer1)
{
if (isLocalServer) {
CacheHelper::closeServer(1);
LOG("SERVER1 stopped");
}
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(SERVER1, CreateServer1_ForCqDelta)
{
// starting servers
if (isLocalServer) {
CacheHelper::initServer(1, "cacheserver_with_delta_test_impl.xml",
locatorsG);
}
}
END_TASK_DEFINITION
DUNIT_MAIN
{
CALL_TASK(CreateLocator1);
CALL_TASK(CreateServer1_ForCqDelta)
CALL_TASK(CreateClient1);
CALL_TASK(CreateClient2);
CALL_TASK(Client1_Put);
CALL_TASK(Client2_VerifyDelta);
CALL_TASK(CloseCache1);
CALL_TASK(CloseCache2);
CALL_TASK(CloseServer1);
CALL_TASK(CloseLocator1);
}
END_MAIN