blob: 52309eab1859984538a2d12dfcbcd8ac7cbaf85e [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
#include "fw_dunit.hpp"
#include <gfcpp/GemfireCppCache.hpp>
#include <ace/High_Res_Timer.h>
#include <ace/OS.h>
#include <string>
#define ROOT_NAME "DistOps"
#define ROOT_SCOPE DISTRIBUTED_ACK
#include "CacheHelper.hpp"
using namespace gemfire;
using namespace test;
class DupChecker : public CacheListener
{
int m_ops;
HashMapOfCacheable m_map;
void check(const EntryEvent& event)
{
m_ops++;
CacheableKeyPtr key = event.getKey();
CacheableInt32Ptr value = dynCast<CacheableInt32Ptr>(event.getNewValue());
HashMapOfCacheable::Iterator item = m_map.find(key);
if (item != m_map.end())
{
CacheableInt32Ptr check = dynCast<CacheableInt32Ptr>(item.second());
ASSERT(check->value() + 1 == value->value(), "Duplicate or older value received");
m_map.update(key, value);
}
else
{
m_map.insert(key, value);
}
}
public:
DupChecker():m_ops(0) {}
~DupChecker()
{
m_map.clear();
}
void validate()
{
ASSERT(m_map.size() == 4, "Expected 4 keys for the region");
ASSERT(m_ops == 400, "Expected 400 events (100 per key) for the region");
for (HashMapOfCacheable::Iterator item = m_map.begin(); item != m_map.end(); item++)
{
CacheableInt32Ptr check = dynCast<CacheableInt32Ptr>(item.second());
ASSERT(check->value() == 100, "Expected final value to be 100");
}
}
virtual void afterCreate( const EntryEvent& event )
{
check(event);
}
virtual void afterUpdate( const EntryEvent& event )
{
check(event);
}
virtual void afterRegionInvalidate( const RegionEvent& event ) {};
virtual void afterRegionDestroy( const RegionEvent& event ) {};
};
// Shared-pointer handle for DupChecker (used by setCacheListener and the checker globals).
typedef SharedPtr<DupChecker> DupCheckerPtr;
///////////////////////////////////////////////////////
// Role names used by the task definitions in this file.
// NOTE(review): s1p1/s1p2/s2p1/s2p2 are presumably dunit process handles
// declared by fw_dunit.hpp -- confirm.
#define CLIENT1 s1p1
#define CLIENT2 s1p2
#define SERVER1 s2p1
#define SERVER2 s2p2
// Per-process cache helper; allocated by initClient() and released by cleanProc().
CacheHelper* cacheHelper = NULL;
// Passed to CacheHelper::getTcrEndpoints() below and consulted by the server tasks.
// NOTE(review): presumably updated by that call -- confirm against CacheHelper.
static bool isLocalServer = false;
// Passed to CacheHelper::getLocatorHostPort() below.
static bool isLocator = false;
static int numberOfLocators = 1;
// Endpoint list used for non-pool clients and as the default for initClientAndRegion().
const char * endPoints = CacheHelper::getTcrEndpoints( isLocalServer, 1 );
// Locator host:port string used when creating a pool with locators.
const char* locatorsG = CacheHelper::getLocatorHostPort( isLocator, numberOfLocators);
// Configuration switches read by the client tasks and initClientAndRegion().
// NOTE(review): they are not assigned in this file; presumably set by
// initLocatorSettings() from an included header -- confirm.
bool g_poolConfig = false;
bool g_poolLocators = false;
// Redundancy level last passed to initClient( int ).
int g_redundancyLevel = 0;
void initClient( int redundancyLevel )
{
PropertiesPtr props = Properties::create();
props->insert("notify-ack-interval", 3600); // set to 1 hr.
props->insert("notify-dupcheck-life", 3600);
if ( cacheHelper == NULL ) {
cacheHelper = new CacheHelper( endPoints, redundancyLevel, props );
}
g_redundancyLevel = redundancyLevel;
ASSERT( cacheHelper, "Failed to create a CacheHelper client instance." );
}
void initClient( )
{
PropertiesPtr props = Properties::create();
props->insert("notify-ack-interval", 3600); // set to 1 hr.
props->insert("notify-dupcheck-life", 3600);
if ( cacheHelper == NULL ) {
cacheHelper = new CacheHelper( true , props );
}
ASSERT( cacheHelper, "Failed to create a CacheHelper client instance." );
}
/**
 * Destroys the process-wide CacheHelper, if any, and resets the global
 * pointer so a later initClient() can create a fresh one.
 */
void cleanProc()
{
  // Deleting a null pointer is a no-op, so no explicit guard is needed.
  delete cacheHelper;
  cacheHelper = NULL;
}
/**
 * Returns the process-wide CacheHelper, failing the test if initClient()
 * has not created one yet.
 */
CacheHelper * getHelper()
{
  CacheHelper * const helper = cacheHelper;
  ASSERT( helper != NULL, "No cacheHelper initialized." );
  return helper;
}
/**
 * Polls a region in this process until the entry for `key` reaches the
 * expected state, failing the test via ASSERT once a condition has been
 * missed 100 times (with a 10 ms pause between polls).
 *
 * Log messages are assembled with std::string so that arbitrarily long keys
 * or values cannot overrun a fixed-size buffer.
 *
 * @param name      name of the region, looked up through getHelper()
 * @param key       entry key, converted with createKey()
 * @param val       expected string value; NULL means no value is expected
 * @param noKey     true when the key itself is expected to be absent
 * @param isCreated true to only wait for the key to appear (val is ignored)
 */
void _verifyEntry( const char * name, const char * key, const char * val, bool noKey, bool isCreated = false )
{
  // A NULL expectation is treated as the empty string for logging/comparison.
  const char * value = ( val == 0 ) ? "" : val;

  if ( !isCreated ) {
    std::string msg;
    if ( noKey ) {
      msg = std::string( "Verify key " ) + key + " does not exist in region " + name;
    }
    else if ( val == 0 ) {
      msg = std::string( "Verify value for key " ) + key + " does not exist in region " + name;
    }
    else {
      msg = std::string( "Verify value for key " ) + key + " is: " + value + " in region " + name;
    }
    LOG( msg.c_str() );
  }

  RegionPtr regPtr = getHelper()->getRegion( name );
  ASSERT( regPtr != NULLPTR, "Region not found." );

  CacheableKeyPtr keyPtr = createKey( key );

  // When the key is expected to exist it must already be present locally.
  if ( !isCreated && !noKey ) {
    ASSERT( regPtr->containsKey( keyPtr ), "Key not found in region." );
  }

  // Poll until the expected state is observed; each counter trips its ASSERT
  // when it reaches MAX, so the loop ends by break or by a test failure.
  const uint32_t MAX = 100;
  const uint32_t SLEEP = 10; // milliseconds between polls
  uint32_t containsKeyCnt = 0;
  uint32_t containsValueCnt = 0;
  uint32_t testValueCnt = 0;

  for ( uint32_t attempt = 0; attempt <= MAX; attempt++ ) {
    if ( isCreated ) {
      if ( regPtr->containsKey( keyPtr ) ) {
        break;
      }
      containsKeyCnt++;
      ASSERT( containsKeyCnt < MAX, "Key has not been created in region." );
    }
    else {
      if ( noKey ) {
        if ( !regPtr->containsKey( keyPtr ) ) {
          break;
        }
        containsKeyCnt++;
        ASSERT( containsKeyCnt < MAX, "Key found in region." );
      }

      if ( val == NULL ) {
        if ( !regPtr->containsValueForKey( keyPtr ) ) {
          break;
        }
        containsValueCnt++;
        ASSERT( containsValueCnt < MAX, "Value found in region." );
      }
      else {
        CacheableStringPtr checkPtr = dynCast<CacheableStringPtr>( regPtr->get( keyPtr ) );
        ASSERT( checkPtr != NULLPTR, "Value Ptr should not be null." );

        const std::string progress = std::string( "In verify loop, get returned " )
            + checkPtr->asChar() + " for key " + key;
        LOG( progress.c_str() );

        if ( strcmp( checkPtr->asChar(), value ) == 0 ) {
          break;
        }
        testValueCnt++;
        ASSERT( testValueCnt < MAX, "Incorrect value found." );
      }
    }
    dunit::sleep( SLEEP );
  }
}
/**
 * Integer counterpart of the string entry verifier: polls a region in this
 * process until the entry for `key` reaches the expected state, failing the
 * test via ASSERT once a condition has been missed 100 times (with a 10 ms
 * pause between polls).
 *
 * Note that an expected value of 0 is treated as "no value expected", so a
 * stored integer 0 cannot be verified with this function.
 *
 * Log messages are assembled with std::string so that an arbitrarily long
 * key cannot overrun a fixed-size buffer.
 *
 * @param name      name of the region, looked up through getHelper()
 * @param key       entry key, converted with createKey()
 * @param val       expected integer value; 0 means no value is expected
 * @param noKey     true when the key itself is expected to be absent
 * @param isCreated true to only wait for the key to appear (val is ignored)
 */
void _verifyIntEntry( const char * name, const char * key, const int val, bool noKey, bool isCreated = false )
{
  const int value = val;

  if ( !isCreated ) {
    std::string msg;
    if ( noKey ) {
      msg = std::string( "Verify key " ) + key + " does not exist in region " + name;
    }
    else if ( val == 0 ) {
      msg = std::string( "Verify value for key " ) + key + " does not exist in region " + name;
    }
    else {
      char num[32]; // ample for any int rendered with %d
      sprintf( num, "%d", value );
      msg = std::string( "Verify value for key " ) + key + " is: " + num + " in region " + name;
    }
    LOG( msg.c_str() );
  }

  RegionPtr regPtr = getHelper()->getRegion( name );
  ASSERT( regPtr != NULLPTR, "Region not found." );

  CacheableKeyPtr keyPtr = createKey( key );

  // When the key is expected to exist it must already be present locally.
  if ( !isCreated && !noKey ) {
    ASSERT( regPtr->containsKey( keyPtr ), "Key not found in region." );
  }

  // Poll until the expected state is observed; each counter trips its ASSERT
  // when it reaches MAX, so the loop ends by break or by a test failure.
  const uint32_t MAX = 100;
  const uint32_t SLEEP = 10; // milliseconds between polls
  uint32_t containsKeyCnt = 0;
  uint32_t containsValueCnt = 0;
  uint32_t testValueCnt = 0;

  for ( uint32_t attempt = 0; attempt <= MAX; attempt++ ) {
    if ( isCreated ) {
      if ( regPtr->containsKey( keyPtr ) ) {
        break;
      }
      containsKeyCnt++;
      ASSERT( containsKeyCnt < MAX, "Key has not been created in region." );
    }
    else {
      if ( noKey ) {
        if ( !regPtr->containsKey( keyPtr ) ) {
          break;
        }
        containsKeyCnt++;
        ASSERT( containsKeyCnt < MAX, "Key found in region." );
      }

      if ( val == 0 ) {
        if ( !regPtr->containsValueForKey( keyPtr ) ) {
          break;
        }
        containsValueCnt++;
        ASSERT( containsValueCnt < MAX, "Value found in region." );
      }
      else {
        CacheableInt32Ptr checkPtr = dynCast<CacheableInt32Ptr>( regPtr->get( keyPtr ) );
        ASSERT( checkPtr != NULLPTR, "Value Ptr should not be null." );

        char num[32]; // ample for any int rendered with %d
        sprintf( num, "%d", checkPtr->value() );
        const std::string progress = std::string( "In verify loop, get returned " )
            + num + " for key " + key;
        LOG( progress.c_str() );

        if ( checkPtr->value() == value ) {
          break;
        }
        testValueCnt++;
        ASSERT( testValueCnt < MAX, "Incorrect value found." );
      }
    }
    dunit::sleep( SLEEP );
  }
}
// verifyEntry( region, key, value ): verifies a string entry and records the
// calling source line in the log.
#define verifyEntry( x, y, z ) _verifyEntry( x, y, z, __LINE__ )
/**
 * Logs the caller's line number, then verifies that `key` exists in region
 * `name` with string value `val`.
 *
 * The inner call passes a bool as its fourth argument, which selects the
 * (name, key, val, noKey, isCreated) overload rather than this one.
 */
void _verifyEntry( const char * name, const char * key, const char * val, int line )
{
  char banner[1024];
  sprintf( banner, "verifyEntry() called from %d.\n", line );
  LOG( banner );

  const bool keyExpectedAbsent = false;
  _verifyEntry( name, key, val, keyExpectedAbsent );

  LOG( "Entry verified." );
}
// verifyIntEntry( region, key, value ): verifies an integer entry and records
// the calling source line in the log.
#define verifyIntEntry( x, y, z ) _verifyIntEntry( x, y, z, __LINE__ )
/**
 * Logs the caller's line number, then verifies that `key` exists in region
 * `name` with integer value `val`.
 *
 * The inner call passes a bool as its fourth argument, which selects the
 * (name, key, val, noKey, isCreated) overload rather than this one.
 */
void _verifyIntEntry( const char * name, const char * key, const int val, int line )
{
  char banner[1024];
  sprintf( banner, "verifyIntEntry() called from %d.\n", line );
  LOG( banner );

  const bool keyExpectedAbsent = false;
  _verifyIntEntry( name, key, val, keyExpectedAbsent );

  LOG( "Entry verified." );
}
// verifyCreated( region, key ): waits for a key to appear and records the
// calling source line in the log.
#define verifyCreated( x, y ) _verifyCreated( x, y, __LINE__ )
/**
 * Logs the caller's line number, then waits (through the five-argument
 * _verifyEntry with isCreated = true) until `key` is present in region
 * `name`; no value is checked.
 */
void _verifyCreated( const char *name, const char *key, int line )
{
  char banner[1024];
  sprintf( banner, "verifyCreated() called from %d.\n", line );
  LOG( banner );

  const bool keyExpectedAbsent = false;
  const bool waitForCreation = true;
  _verifyEntry( name, key, NULL, keyExpectedAbsent, waitForCreation );

  LOG( "Entry created." );
}
void createRegion( const char* name, bool ackMode, bool clientNotificationEnabled = true )
{
LOG( "createRegion() entered." );
fprintf( stdout, "Creating region -- %s ackMode is %d\n", name, ackMode );
fflush( stdout );
char* endpoints = NULL;
// ack, caching
RegionPtr regPtr = getHelper()->createRegion(name, ackMode, true,
NULLPTR, endpoints, clientNotificationEnabled);
ASSERT( regPtr != NULLPTR, "Failed to create region." );
LOG( "Region created." );
}
void createEntry( const char * name, const char * key, const char * value )
{
LOG( "createEntry() entered." );
fprintf( stdout, "Creating entry -- key: %s value: %s in region %s\n", key, value, name );
fflush( stdout );
// Create entry, verify entry is correct
CacheableKeyPtr keyPtr = createKey( key );
CacheableStringPtr valPtr = CacheableString::create( value );
RegionPtr regPtr = getHelper()->getRegion( name );
ASSERT( regPtr != NULLPTR, "Region not found." );
ASSERT( !regPtr->containsKey( keyPtr ), "Key should not have been found in region." );
ASSERT( !regPtr->containsValueForKey( keyPtr ), "Value should not have been found in region." );
//regPtr->create( keyPtr, valPtr );
regPtr->put( keyPtr, valPtr );
LOG( "Created entry." );
verifyEntry( name, key, value );
LOG( "Entry created." );
}
/**
 * Puts an integer value under a key (creating or overwriting the entry) and
 * then verifies the entry locally.
 *
 * Unlike createEntry(), no check is made that the key is absent beforehand,
 * because this function is called repeatedly for the same key.
 *
 * @param name  region name
 * @param key   entry key
 * @param value integer value to store
 */
void createIntEntry( const char * name, const char * key, const int value )
{
  // Log the correct function name (previously reported "createEntry()").
  LOG( "createIntEntry() entered." );
  fprintf( stdout, "Creating entry -- key: %s value: %d in region %s\n", key, value, name );
  fflush( stdout );

  CacheableKeyPtr keyPtr = createKey( key );
  CacheableInt32Ptr valPtr = CacheableInt32::create( value );

  RegionPtr regPtr = getHelper()->getRegion( name );
  ASSERT( regPtr != NULLPTR, "Region not found." );

  regPtr->put( keyPtr, valPtr );
  LOG( "Created entry." );

  verifyIntEntry( name, key, value );
  LOG( "Entry created." );
}
void setCacheListener(const char *regName, DupCheckerPtr checker)
{
RegionPtr reg = getHelper()->getRegion(regName);
AttributesMutatorPtr attrMutator = reg->getAttributesMutator();
attrMutator->setCacheListener(checker);
}
// Keys written by CreateEntries and verified by CheckClient2.
const char * keys[] = { "Key-1", "Key-2", "Key-3", "Key-4" };
// String value tables; not referenced by the code visible in this file.
// NOTE(review): possibly used by the headers included further down -- confirm.
const char * vals[] = { "Value-1", "Value-2", "Value-3", "Value-4" };
const char * nvals[] = { "New Value-1", "New Value-2", "New Value-3", "New Value-4" };
// Region names: index 0 is created with USE_ACK, index 1 with NO_ACK.
const char * regionNames[] = { "DistRegionAck", "DistRegionNoAck" };
const bool USE_ACK = true;
const bool NO_ACK = false;
// Listeners attached by InitClient2_R1 to regionNames[0] and regionNames[1]
// respectively, and validated by CheckClient2.
DupCheckerPtr checker1;
DupCheckerPtr checker2;
/**
 * Creates the two test regions (regionNames[0] with USE_ACK, regionNames[1]
 * with NO_ACK) for the current client.
 *
 * With g_poolConfig set, a pool named "__TESTPOOL1_" is created first (from
 * locators when g_poolLocators is set, otherwise from `endpointsList`) and
 * both regions are attached to it. Otherwise the regions are created
 * directly, and `redundancy` and `endpointsList` are not used.
 *
 * @param redundancy                redundancy level for the pool (pool mode only)
 * @param clientNotificationEnabled forwarded to pool or region creation
 * @param endpointsList             endpoints for the pool (pool mode without locators)
 */
void initClientAndRegion( int redundancy, bool clientNotificationEnabled = true, const char* endpointsList = endPoints)
{
  if ( !g_poolConfig ) {
    createRegion( regionNames[0], USE_ACK, clientNotificationEnabled );
    createRegion( regionNames[1], NO_ACK, clientNotificationEnabled );
    return;
  }

  if ( g_poolLocators ) {
    getHelper()->createPoolWithLocators("__TESTPOOL1_", locatorsG, clientNotificationEnabled, redundancy);
  }
  else {
    getHelper()->createPoolWithEPs("__TESTPOOL1_", endpointsList, clientNotificationEnabled, redundancy);
  }
  getHelper()->createRegionAndAttachPool(regionNames[0], USE_ACK, "__TESTPOOL1_", true);
  getHelper()->createRegionAndAttachPool(regionNames[1], NO_ACK, "__TESTPOOL1_", true);
}
#include "LocatorHelper.hpp"
#include "ThinClientTasks_C2S2.hpp"
// SERVER1 task: starts cache server 1 from its notify-subscription XML when
// servers are managed locally; the log line is written either way.
DUNIT_TASK_DEFINITION( SERVER1, CreateServer1 )
{
  if ( isLocalServer ) {
    CacheHelper::initServer( 1, "cacheserver_notify_subscription.xml" );
  }
  LOG( "SERVER1 started" );
}
END_TASK_DEFINITION
// CLIENT1 task: initializes the writer client with redundancy level 1 and
// creates its regions with client notification disabled.
DUNIT_TASK_DEFINITION( CLIENT1, InitClient1_R1 )
{
  if ( g_poolConfig ) {
    initClient();
  }
  else {
    initClient( 1 );
  }

  initClientAndRegion( 1, false );
  LOG( "Initialized client with redundancy level 1." );
}
END_TASK_DEFINITION
// CLIENT2 task: initializes the listening client with redundancy level 1,
// attaches a DupChecker to each region and registers interest in all keys.
// Registration is best-effort: a failure is logged and the task continues.
DUNIT_TASK_DEFINITION( CLIENT2, InitClient2_R1 )
{
  if ( !g_poolConfig ) {
    initClient( 1 );
  }
  else {
    initClient();
  }
  initClientAndRegion( 1 );
  LOG( "Initialized client with redundancy level 1." );

  checker1 = new DupChecker();
  checker2 = new DupChecker();
  setCacheListener(regionNames[0], checker1);
  setCacheListener(regionNames[1], checker2);

  try
  {
    getHelper()->getRegion(regionNames[0])->registerAllKeys();
  }
  catch(...)
  {
    // Previously swallowed silently; keep going but leave a trace in the log.
    LOG( "registerAllKeys threw for the first region; continuing." );
  }

  try
  {
    getHelper()->getRegion(regionNames[1])->registerAllKeys();
  }
  catch(...)
  {
    LOG( "registerAllKeys threw for the second region; continuing." );
  }

  LOG( "CreateRegionsC2 complete." );
}
END_TASK_DEFINITION
// SERVER2 task: starts cache server 2 from its notify-subscription XML when
// servers are managed locally; the log line is written either way.
DUNIT_TASK_DEFINITION( SERVER2, CreateServer2 )
{
  if ( isLocalServer ) {
    CacheHelper::initServer( 2, "cacheserver_notify_subscription2.xml" );
  }
  LOG( "SERVER2 started" );
}
END_TASK_DEFINITION
// CLIENT1 task: writes the values 1..100 to each of the four keys in both
// regions. For every value, the first region's keys are written first, then
// the second region's, with a 10 ms pause after each put.
DUNIT_TASK_DEFINITION(CLIENT1, CreateEntries)
{
  for (int value = 1; value <= 100; value++)
  {
    for (int regionIdx = 0; regionIdx < 2; regionIdx++)
    {
      for (int keyIdx = 0; keyIdx < 4; keyIdx++)
      {
        createIntEntry( regionNames[regionIdx], keys[keyIdx], value );
        gemfire::millisleep(10);
      }
    }
  }
}
END_TASK_DEFINITION
// SERVER1 task: stops cache server 1 (and logs it) when servers are managed
// locally; otherwise does nothing.
DUNIT_TASK_DEFINITION( SERVER1, CloseServer1 )
{
  if ( isLocalServer )
  {
    CacheHelper::closeServer( 1 );
    LOG( "SERVER1 stopped" );
  }
}
END_TASK_DEFINITION
// CLIENT2 task: after a 30 second settle period, checks that every key in
// both regions holds 100 and that both listeners saw the expected events.
// Each verifyIntEntry call stays on its own line because the macro logs the
// calling line number.
DUNIT_TASK_DEFINITION( CLIENT2, CheckClient2 )
{
  gemfire::millisleep( 30000 ); // wait 30 sec for notifications to complete

  const char * ackRegion = regionNames[0];
  const char * noAckRegion = regionNames[1];

  verifyIntEntry( ackRegion, keys[0], 100 );
  verifyIntEntry( ackRegion, keys[1], 100 );
  verifyIntEntry( ackRegion, keys[2], 100 );
  verifyIntEntry( ackRegion, keys[3], 100 );

  verifyIntEntry( noAckRegion, keys[0], 100 );
  verifyIntEntry( noAckRegion, keys[1], 100 );
  verifyIntEntry( noAckRegion, keys[2], 100 );
  verifyIntEntry( noAckRegion, keys[3], 100 );

  LOG("Validating checker1 cachelistener");
  checker1->validate();

  LOG("Validating checker2 cachelistener");
  checker2->validate();

  LOG( "CheckClient2 complete." );
}
END_TASK_DEFINITION
// CLIENT1 task: releases this process's CacheHelper via cleanProc().
DUNIT_TASK_DEFINITION( CLIENT1, CloseClient1 )
{
cleanProc();
}
END_TASK_DEFINITION
// CLIENT2 task: releases this process's CacheHelper via cleanProc().
DUNIT_TASK_DEFINITION( CLIENT2, CloseClient2 )
{
cleanProc();
}
END_TASK_DEFINITION
// SERVER2 task: stops cache server 2 (and logs it) when servers are managed
// locally; otherwise does nothing.
DUNIT_TASK_DEFINITION( SERVER2, CloseServer2 )
{
  if ( isLocalServer )
  {
    CacheHelper::closeServer( 2 );
    LOG( "SERVER2 stopped" );
  }
}
END_TASK_DEFINITION
/**
 * Schedules one full pass of the scenario: start server 1, initialize both
 * clients, start server 2, write the entries from client 1, stop server 1,
 * verify on client 2, then shut everything down.
 *
 * When both flags are set, a locator is started first and the servers are
 * started through the locator-aware tasks; otherwise the plain server tasks
 * are used.
 *
 * @param poolConfig   forwarded to initLocatorSettings()
 * @param poolLocators forwarded to initLocatorSettings()
 */
void runEventIDMap( bool poolConfig = true , bool poolLocators = true )
{
  initLocatorSettings( poolConfig, poolLocators );

  const bool withLocator = poolConfig && poolLocators;

  if ( withLocator )
  {
    CALL_TASK( CreateLocator1 );
    CALL_TASK( CreateServer1_With_Locator_XML );
  }
  else
  {
    CALL_TASK( CreateServer1 );
  }

  CALL_TASK( InitClient1_R1 );
  CALL_TASK( InitClient2_R1 );

  if ( withLocator )
  {
    CALL_TASK( CreateServer2_With_Locator_XML );
  }
  else
  {
    CALL_TASK( CreateServer2 );
  }

  CALL_TASK( CreateEntries );
  CALL_TASK( CloseServer1 );
  CALL_TASK( CheckClient2 );
  CALL_TASK( CloseClient1 );
  CALL_TASK( CloseClient2 );
  CALL_TASK( CloseServer2 );

  if ( withLocator )
  {
    CALL_TASK( CloseLocator1 );
  }
}
// Test entry point: runs the scenario three times -- without pool
// configuration, with a pool using locators, and with a pool using endpoints.
DUNIT_MAIN
{
  // poolLocators is spelled out here; it is the default (true) for this call.
  runEventIDMap( false, true );
  runEventIDMap( true, true );
  runEventIDMap( true, false );
}
END_MAIN