blob: 9d1004ab80f9b0edec49386e91b093a2cc5ca5bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "VersionStamp.hpp"
#include <string>
#include "CacheImpl.hpp"
#include "MemberListForVersionStamp.hpp"
#include "RegionInternal.hpp"
#include "ThinClientPoolDM.hpp"
#include "ThinClientRegion.hpp"
namespace apache {
namespace geode {
namespace client {
void VersionStamp::setVersions(std::shared_ptr<VersionTag> versionTag) {
int32_t eVersion = versionTag->getEntryVersion();
m_entryVersionLowBytes = static_cast<uint16_t>(eVersion & 0xffff);
m_entryVersionHighByte = static_cast<uint8_t>((eVersion & 0xff0000) >> 16);
m_regionVersionHighBytes = versionTag->getRegionVersionHighBytes();
m_regionVersionLowBytes = versionTag->getRegionVersionLowBytes();
m_memberID = versionTag->getInternalMemID();
}
void VersionStamp::setVersions(VersionStamp& versionStamp) {
m_entryVersionLowBytes = versionStamp.m_entryVersionLowBytes;
m_entryVersionHighByte = versionStamp.m_entryVersionHighByte;
m_regionVersionHighBytes = versionStamp.m_regionVersionHighBytes;
m_regionVersionLowBytes = versionStamp.m_regionVersionLowBytes;
m_memberID = versionStamp.m_memberID;
}
int32_t VersionStamp::getEntryVersion() const {
return (m_entryVersionHighByte << 16) | m_entryVersionLowBytes;
}
int64_t VersionStamp::getRegionVersion() const {
return ((static_cast<int64_t>(m_regionVersionHighBytes)) << 32) |
m_regionVersionLowBytes;
}
uint16_t VersionStamp::getMemberId() const { return m_memberID; }
// Processes version tag. Checks if there is a conflict with the existing
// version.
// Also checks if it is a delta update than it is based on the existing version.
// This is based on the basicprocessVersionTag function of
// AbstractRegionEntry.java
// Any change to the java function should be reflected here as well.
GfErrType VersionStamp::processVersionTag(
const RegionInternal* region, const std::shared_ptr<CacheableKey>& keyPtr,
const std::shared_ptr<VersionTag>& tag, const bool deltaCheck) const {
if (nullptr == tag) {
LOGERROR("Cannot process version tag as it is nullptr.");
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
return checkForConflict(region, keyPtr->toString().c_str(), tag, deltaCheck);
}
GfErrType VersionStamp::checkForConflict(const RegionInternal* region,
const std::string& keystr,
const std::shared_ptr<VersionTag>& tag,
const bool deltaCheck) const {
if (getEntryVersion() == 0 && getRegionVersion() == 0 && getMemberId() == 0) {
LOGDEBUG(
"Version stamp on existing entry not found. applying change: key=%s",
keystr.c_str());
return GF_NOERR;
}
if (tag->getEntryVersion() == 0 && tag->getRegionVersionHighBytes() == 0 &&
tag->getRegionVersionLowBytes() == 0 && tag->getInternalMemID() == 0) {
LOGDEBUG("Version Tag not available. applying change: key=%s",
keystr.c_str());
return GF_NOERR;
}
int64_t stampVersion = getEntryVersion() & 0xffffffffL;
int64_t tagVersion = tag->getEntryVersion() & 0xffffffffL;
auto memberList = region->getCacheImpl()->getMemberListForVersionStamp();
bool apply = false;
if (stampVersion != 0) {
// check for int wraparound on the version number
int64_t difference = tagVersion - stampVersion;
if (0x10000 < difference || difference < -0x10000) {
LOGDEBUG("version rollover detected: key=%s tag=%lld stamp=%lld",
keystr.c_str(), tagVersion, stampVersion);
int64_t temp = 0x100000000LL;
if (difference < 0) {
tagVersion += temp;
} else {
stampVersion += temp;
}
}
}
if (deltaCheck) {
auto err =
checkForDeltaConflict(region, keystr, stampVersion, tagVersion, tag);
if (err != GF_NOERR) return err;
}
if (stampVersion == 0 || stampVersion < tagVersion) {
LOGDEBUG("applying change: key=%s", keystr.c_str());
apply = true;
} else if (stampVersion > tagVersion) {
LOGDEBUG("disallowing change: key=%s", keystr.c_str());
} else {
// compare member IDs
auto stampID = memberList->getDSMember(getMemberId());
if (nullptr == stampID) {
// This scenario is not possible. But added for just in case
LOGERROR(
"MemberId of the version stamp could not be found. Disallowing a "
"possible inconsistent change: key=%s",
keystr.c_str());
// throw error
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
auto tagID = memberList->getDSMember(tag->getInternalMemID());
if (nullptr == tagID) {
// This scenario is not possible. But added for just in case
LOGERROR(
"MemberId of the version tag could not be found. Disallowing a "
"possible inconsistent change. key=%s",
keystr.c_str());
// throw error
return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
}
if (!apply) {
LOGDEBUG(
"comparing tagID %s with stampId %s for version comparison of key %s",
tagID->getHashKey().c_str(), stampID->getHashKey().c_str(),
keystr.c_str());
int compare = stampID->compareTo(*tagID);
if (compare < 0) {
LOGDEBUG("applying change: key=%s", keystr.c_str());
apply = true;
} else if (compare > 0) {
LOGDEBUG("disallowing change: key=%s", keystr.c_str());
} else {
LOGDEBUG(
"allowing the change as both the version tag and version stamp are "
"same: key=%s",
keystr.c_str());
// This is required for local ops to succeed.
apply = true;
}
}
}
if (!apply) {
region->getCacheImpl()->getCachePerfStats().incConflatedEvents();
return GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION;
}
return GF_NOERR;
}
GfErrType VersionStamp::checkForDeltaConflict(
const RegionInternal* region, const std::string& keystr,
const int64_t stampVersion, const int64_t tagVersion,
const std::shared_ptr<VersionTag>& tag) const {
auto memberList = region->getCacheImpl()->getMemberListForVersionStamp();
auto tcRegion = dynamic_cast<const ThinClientRegion*>(region);
ThinClientPoolDM* poolDM = nullptr;
if (tcRegion) {
poolDM = dynamic_cast<ThinClientPoolDM*>(tcRegion->getDistMgr());
}
if (tagVersion != stampVersion + 1) {
LOGDEBUG(
"delta requires full value due to version mismatch. key=%s tagVersion "
"%lld stampVersion %lld ",
keystr.c_str(), tagVersion, stampVersion);
if (poolDM) poolDM->updateNotificationStats(false, 0);
return GF_INVALID_DELTA;
} else {
// make sure the tag was based on the value in this entry by checking the
// tag's previous-changer ID against this stamp's current ID
auto stampID = memberList->getDSMember(getMemberId());
if (nullptr == stampID) {
LOGERROR(
"MemberId of the version stamp could not be found. Requesting full "
"delta value. key=%s",
keystr.c_str());
if (poolDM) poolDM->updateNotificationStats(false, 0);
return GF_INVALID_DELTA;
}
auto tagID = memberList->getDSMember(tag->getPreviousMemID());
if (nullptr == tagID) {
LOGERROR(
"Previous MemberId of the version tag could not be found. Requesting "
"full delta value. key=%s",
keystr.c_str());
if (poolDM) poolDM->updateNotificationStats(false, 0);
return GF_INVALID_DELTA;
}
if (tagID->compareTo(*stampID) != 0) {
LOGDEBUG(
"delta requires full value due to version mismatch. key=%s. \
tag.previous=%s but stamp.current=%s",
keystr.c_str(), tagID->getHashKey().c_str(),
stampID->getHashKey().c_str());
if (poolDM) poolDM->updateNotificationStats(false, 0);
return GF_INVALID_DELTA;
}
return GF_NOERR;
}
}
} // namespace client
} // namespace geode
} // namespace apache