blob: 2ce1f3a40a69f98c8e923cc678f1256ca10bb45c [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
#include "VersionStamp.hpp"
#include <string>
#include "MemberListForVersionStamp.hpp"
#include "CacheImpl.hpp"
#include "RegionInternal.hpp"
#include "ThinClientRegion.hpp"
#include "ThinClientPoolDM.hpp"
using namespace gemfire;
void VersionStamp::setVersions(VersionTagPtr versionTag)
{
int32_t eVersion = versionTag->getEntryVersion();
m_entryVersionLowBytes = (uint16_t)(eVersion & 0xffff);
m_entryVersionHighByte = (uint8_t)((eVersion & 0xff0000) >> 16);
m_regionVersionHighBytes = versionTag->getRegionVersionHighBytes();
m_regionVersionLowBytes = versionTag->getRegionVersionLowBytes();
m_memberID = versionTag->getInternalMemID();
}
void VersionStamp::setVersions(VersionStamp& versionStamp)
{
m_entryVersionLowBytes = versionStamp.m_entryVersionLowBytes;
m_entryVersionHighByte = versionStamp.m_entryVersionHighByte;
m_regionVersionHighBytes = versionStamp.m_regionVersionHighBytes;
m_regionVersionLowBytes = versionStamp.m_regionVersionLowBytes;
m_memberID = versionStamp.m_memberID;
}
int32_t VersionStamp::getEntryVersion() {
return (m_entryVersionHighByte << 16) | m_entryVersionLowBytes;
}
int64_t VersionStamp::getRegionVersion() {
return (((int64_t)m_regionVersionHighBytes) << 32) | m_regionVersionLowBytes;
}
uint16_t VersionStamp::getMemberId() const {
return m_memberID;
}
// Processes version tag. Checks if there is a conflict with the existing version.
// Also checks if it is a delta update than it is based on the existing version.
// This is based on the basicprocessVersionTag function of AbstractRegionEntry.java
// Any change to the java function should be reflected here as well.
GfErrType VersionStamp::processVersionTag(RegionInternal* region, CacheableKeyPtr keyPtr, VersionTagPtr tag,
bool deltaCheck){
char key[256];
int16_t keyLen = keyPtr->logString(key, 256);
std::string keystr (key, keyLen);
if (tag.ptr() == NULL)
{
LOGERROR("Cannot process version tag as it is NULL.");
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
return checkForConflict(region, keystr, tag, deltaCheck);
}
GfErrType VersionStamp::checkForConflict(RegionInternal* region, std::string keystr, VersionTagPtr tag, bool deltaCheck)
{
if (getEntryVersion() == 0 && getRegionVersion() == 0 && getMemberId() == 0) {
LOGDEBUG("Version stamp on existing entry not found. applying change: key=%s", keystr.c_str());
return GF_NOERR;
}
if (tag->getEntryVersion() == 0 && tag->getRegionVersionHighBytes() ==0 &&
tag->getRegionVersionLowBytes() ==0 && tag->getInternalMemID() ==0 ){
LOGDEBUG("Version Tag not available. applying change: key=%s", keystr.c_str());
return GF_NOERR;
}
int64_t stampVersion = getEntryVersion() & 0xffffffffL;
int64_t tagVersion = tag->getEntryVersion() & 0xffffffffL;
MemberListForVersionStampPtr memberList = region->getCacheImpl()->getMemberListForVersionStamp();
bool apply = false;
if (stampVersion != 0) {
// check for int wraparound on the version number
int64_t difference = tagVersion - stampVersion;
if (0x10000 < difference || difference < -0x10000)
{
LOGDEBUG("version rollover detected: key=%s tag=%lld stamp=%lld", keystr.c_str(), tagVersion, stampVersion);
int64_t temp = 0x100000000LL;
if (difference < 0) {
tagVersion += temp;
} else {
stampVersion += temp;
}
}
}
if (deltaCheck) {
GfErrType err = checkForDeltaConflict(region, keystr, stampVersion, tagVersion, tag);
if (err != GF_NOERR)
return err;
}
if (stampVersion == 0 || stampVersion < tagVersion) {
LOGDEBUG("applying change: key=%s", keystr.c_str());
apply = true;
} else if (stampVersion > tagVersion) {
LOGDEBUG("disallowing change: key=%s", keystr.c_str());
} else {
// compare member IDs
DSMemberForVersionStampPtr stampID = memberList->getDSMember(getMemberId());
if (stampID == NULLPTR && stampID.ptr() == NULL) {
// This scenario is not possible. But added for just in case
LOGERROR("MemberId of the version stamp could not be found. Disallowing a possible inconsistent change: key=%s",
keystr.c_str());
// throw error
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
DSMemberForVersionStampPtr tagID = memberList->getDSMember(tag->getInternalMemID());
if (tagID == NULLPTR && tagID.ptr() == NULL) {
// This scenario is not possible. But added for just in case
LOGERROR("MemberId of the version tag could not be found. Disallowing a possible inconsistent change. key=%s",
keystr.c_str());
// throw error
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
if (!apply)
{
LOGDEBUG("comparing tagID %s with stampId %s for version comparison of key %s",
tagID->getHashKey().c_str(), stampID->getHashKey().c_str(), keystr.c_str());
int compare = stampID->compareTo(tagID);
if (compare < 0) {
LOGDEBUG("applying change: key=%s", keystr.c_str());
apply = true;
} else if (compare > 0) {
LOGDEBUG("disallowing change: key=%s", keystr.c_str());
}
else{
LOGDEBUG("allowing the change as both the version tag and version stamp are same: key=%s", keystr.c_str());
// This is required for local ops to succeed.
apply = true;
}
}
}
if (!apply) {
region->getCacheImpl()->m_cacheStats->incConflatedEvents();
return GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION;
}
return GF_NOERR;
}
GfErrType VersionStamp::checkForDeltaConflict(RegionInternal* region, std::string keystr, int64_t stampVersion, int64_t tagVersion,
VersionTagPtr tag)
{
MemberListForVersionStampPtr memberList = region->getCacheImpl()->getMemberListForVersionStamp();
ThinClientRegion* tcRegion = dynamic_cast<ThinClientRegion*> (region);
ThinClientPoolDM* poolDM = NULL;
if( tcRegion ) {
poolDM = dynamic_cast<ThinClientPoolDM*> (tcRegion->getDistMgr( ) );
}
if (tagVersion != stampVersion+1) {
LOGDEBUG("delta requires full value due to version mismatch. key=%s tagVersion %lld stampVersion %lld ", keystr.c_str(), tagVersion, stampVersion);
if (poolDM) poolDM->updateNotificationStats(false, 0);
return GF_INVALID_DELTA;
} else {
// make sure the tag was based on the value in this entry by checking the
// tag's previous-changer ID against this stamp's current ID
DSMemberForVersionStampPtr stampID = memberList->getDSMember(getMemberId());
if (stampID.ptr() == NULL) {
LOGERROR("MemberId of the version stamp could not be found. Requesting full delta value. key=%s",
keystr.c_str());
if (poolDM) poolDM->updateNotificationStats(false, 0);
return GF_INVALID_DELTA;
}
DSMemberForVersionStampPtr tagID = memberList->getDSMember(tag->getPreviousMemID());
if (tagID.ptr() == NULL) {
LOGERROR("Previous MemberId of the version tag could not be found. Requesting full delta value. key=%s",
keystr.c_str());
if (poolDM) poolDM->updateNotificationStats(false, 0);
return GF_INVALID_DELTA;
}
if (tagID->compareTo(stampID) != 0) {
LOGDEBUG("delta requires full value due to version mismatch. key=%s. \
tag.previous=%s but stamp.current=%s", keystr.c_str(),
tagID->getHashKey().c_str(), stampID->getHashKey().c_str());
if (poolDM) poolDM->updateNotificationStats(false, 0);
return GF_INVALID_DELTA;
}
return GF_NOERR;
}
}