blob: 15b8ea3f39f1a7fddfec78a1c57464e7fc11e071 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "VersionedCacheableObjectPartList.hpp"

#include <utility>
#include <vector>

#include <geode/CacheableString.hpp>
#include <geode/ExceptionTypes.hpp>

#include "CacheableToken.hpp"
#include "DiskStoreId.hpp"
#include "DiskVersionTag.hpp"
#include "ThinClientRegion.hpp"
namespace apache {
namespace geode {
namespace client {
// Per-entry type markers read by fromData() ahead of each version tag.
// No version tag follows; the entry's tag slot is left null.
const uint8_t VersionedCacheableObjectPartList::FLAG_NULL_TAG = 0;
// A version tag follows; fromData() then replaces a null member id in it with
// the endpoint's member id.
const uint8_t VersionedCacheableObjectPartList::FLAG_FULL_TAG = 1;
// A version tag follows; fromData() remembers its internal member id so that
// later entries can refer to it by position.
const uint8_t VersionedCacheableObjectPartList::FLAG_TAG_WITH_NEW_ID = 2;
// A version tag follows, then an unsigned variable-length index selecting one
// of the previously remembered member ids.
const uint8_t VersionedCacheableObjectPartList::FLAG_TAG_WITH_NUMBER_ID = 3;
/**
 * Serialization is not implemented for this type.
 *
 * @throws UnsupportedOperationException always.
 */
void VersionedCacheableObjectPartList::toData(DataOutput& /*output*/) const {
  const char* const reason =
      "VersionedCacheableObjectPartList::toData not implemented";
  throw UnsupportedOperationException(reason);
}
void VersionedCacheableObjectPartList::readObjectPart(
int32_t index, DataInput& input, std::shared_ptr<CacheableKey> keyPtr) {
auto objType = input.read();
std::shared_ptr<Cacheable> value;
m_byteArray[index] = objType;
bool isException = (objType == 2 ? 1 : 0);
if (isException) { // Exception case
// Skip the exception that is in java serialized format, we cant read it.
input.advanceCursor(input.readArrayLength());
const auto exMsg = input.readString(); ////4.1
std::shared_ptr<Exception> ex;
if (exMsg == "org.apache.geode.security.NotAuthorizedException") {
auto message = "Authorization exception at server: " + exMsg;
ex = std::make_shared<NotAuthorizedException>(message);
} else {
auto message = "Exception at remote server: " + exMsg;
ex = std::make_shared<CacheServerException>(message);
}
m_exceptions->emplace(keyPtr, ex);
} else if (m_serializeValues) {
// read length
int32_t skipLen = input.readArrayLength();
int8_t* bytes = nullptr;
if (skipLen > 0) {
// readObject
bytes = new int8_t[skipLen];
input.readBytesOnly(bytes, skipLen);
}
m_values->emplace(keyPtr, CacheableBytes::create(
std::vector<int8_t>(bytes, bytes + skipLen)));
/* adongre
* CID 29377: Resource leak (RESOURCE_LEAK)Calling allocation function
* "apache::geode::client::DataInput::readBytes(unsigned char **, int *)" on
* "bytes".
*/
_GEODE_SAFE_DELETE_ARRAY(bytes);
} else {
// set nullptr to indicate that there is no exception for the key on this
// index
// readObject
input.readObject(value);
if (m_values) m_values->emplace(keyPtr, value);
}
}
/**
 * Deserializes a versioned object part list and applies it to this instance.
 *
 * The payload starts with a flag byte followed by up to three optional
 * sections, each prefixed by an unsigned variable-length count:
 *   0x01 keys, 0x02 objects, 0x04 version tags.
 * The remaining flag bits are attributes: 0x08 region is versioned,
 * 0x10 values are kept serialized, 0x20 tags are DiskVersionTags.
 *
 * After all sections are read, every object whose type byte is not 3
 * ("key not found on server") is either put into the local region
 * (m_addToLocalCache) or reconciled with the entry already in the region.
 *
 * m_responseLock is held for the whole call. If m_values was null on entry a
 * temporary map is used and m_values is reset to null before returning
 * normally.
 *
 * @param input stream positioned at the flag byte.
 * @throws FatalInternalException if objects are present but the message has
 *         no keys and neither m_keys nor m_resultKeys is set.
 */
void VersionedCacheableObjectPartList::fromData(DataInput& input) {
  std::lock_guard<decltype(m_responseLock)> guard(m_responseLock);
  LOGDEBUG("VersionedCacheableObjectPartList::fromData");

  // Header byte: one bit per optional section / attribute.
  const uint8_t flags = input.read();
  m_hasKeys = (flags & 0x01) == 0x01;
  const bool hasObjects = (flags & 0x02) == 0x02;
  m_hasTags = (flags & 0x04) == 0x04;
  m_regionIsVersioned = (flags & 0x08) == 0x08;
  m_serializeValues = (flags & 0x10) == 0x10;
  const bool persistent = (flags & 0x20) == 0x20;

  // Count of the most recently read section; reused by the later stages.
  int32_t len = 0;
  const int32_t keysOffset = (m_keysOffset != nullptr ? *m_keysOffset : 0);

  // Work on a temporary value map when the caller did not supply one.
  bool valuesNULL = false;
  if (m_values == nullptr) {
    m_values = std::make_shared<HashMapOfCacheable>();
    valuesNULL = true;
  }

  if (!m_hasKeys && !hasObjects && !m_hasTags) {
    // Despite the message there is no early return: with all three flags
    // clear every stage below is skipped (or iterates zero times).
    LOGDEBUG(
        "VersionedCacheableObjectPartList::fromData: Looks like message has no "
        "data. Returning,");
  }

  // ---- keys -----------------------------------------------------------
  auto localKeys =
      std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>();
  if (m_hasKeys) {
    len = static_cast<int32_t>(input.readUnsignedVL());
    for (int32_t index = 0; index < len; ++index) {
      auto key = std::dynamic_pointer_cast<CacheableKey>(input.readObject());
      if (m_resultKeys != nullptr) {
        m_resultKeys->push_back(key);
      }
      m_tempKeys->push_back(key);
      localKeys->push_back(key);
    }
  } else if (m_keys != nullptr) {
    // Keys come from m_keys (offset by keysOffset) instead of the message.
    LOGDEBUG("VersionedCacheableObjectPartList::fromData: m_keys NOT nullptr");
  } else if (hasObjects) {
    if (m_keys == nullptr && m_resultKeys == nullptr) {
      LOGERROR(
          "VersionedCacheableObjectPartList::fromData: Exception: hasObjects "
          "is true and m_keys and m_resultKeys are also nullptr");
      throw FatalInternalException(
          "VersionedCacheableObjectPartList: "
          "hasObjects is true and m_keys is also nullptr");
    } else {
      LOGDEBUG(
          "VersionedCacheableObjectPartList::fromData m_keys or m_resultKeys "
          "not null");
    }
  } else {
    LOGDEBUG(
        "VersionedCacheableObjectPartList::fromData m_hasKeys, m_keys, "
        "hasObjects all are nullptr");
  }

  // ---- objects --------------------------------------------------------
  if (hasObjects) {
    len = static_cast<int32_t>(input.readUnsignedVL());
    m_byteArray.resize(len);
    for (int32_t index = 0; index < len; ++index) {
      if (m_keys != nullptr && !m_hasKeys) {
        readObjectPart(index, input, m_keys->at(index + keysOffset));
      } else {
        readObjectPart(index, input, localKeys->at(index));
      }
    }
  }

  // ---- version tags ---------------------------------------------------
  if (m_hasTags) {
    len = static_cast<int32_t>(input.readUnsignedVL());
    m_versionTags.resize(len);

    // Member ids announced by FLAG_TAG_WITH_NEW_ID entries, in arrival order;
    // FLAG_TAG_WITH_NUMBER_ID entries refer back to them by position.
    std::vector<uint16_t> ids;
    MemberListForVersionStamp& memberListForVersionStamp =
        *(m_region->getCacheImpl()->getMemberListForVersionStamp());

    // Creates the tag flavour selected by the "persistent" flag and reads its
    // body from the input.
    const auto readTag = [&]() {
      std::shared_ptr<VersionTag> tag;
      if (persistent) {
        tag = std::shared_ptr<VersionTag>(
            new DiskVersionTag(memberListForVersionStamp));
      } else {
        tag = std::shared_ptr<VersionTag>(
            new VersionTag(memberListForVersionStamp));
      }
      tag->fromData(input);
      return tag;
    };

    for (int32_t index = 0; index < len; index++) {
      const uint8_t entryType = input.read();
      std::shared_ptr<VersionTag> versionTag;
      switch (entryType) {
        case FLAG_NULL_TAG: {
          break;
        }
        case FLAG_FULL_TAG: {
          versionTag = readTag();
          versionTag->replaceNullMemberId(getEndpointMemId());
          break;
        }
        case FLAG_TAG_WITH_NEW_ID: {
          versionTag = readTag();
          ids.push_back(versionTag->getInternalMemID());
          break;
        }
        case FLAG_TAG_WITH_NUMBER_ID: {
          versionTag = readTag();
          auto idNumber = input.readUnsignedVL();
          versionTag->setInternalMemID(ids.at(idNumber));
          break;
        }
        default: {
          // Unknown marker: leave the slot null.
          break;
        }
      }
      m_versionTags[index] = versionTag;
    }
  } else {
    // If consistency is not enabled no tags are sent; the first len slots of
    // m_versionTags are set to null instead. Grow the vector first (never
    // shrink it) so these writes, and the reads in the stage below, stay in
    // bounds.
    if (len > 0 && m_versionTags.size() < static_cast<size_t>(len)) {
      m_versionTags.resize(static_cast<size_t>(len));
    }
    for (int32_t index = 0; index < len; ++index) {
      m_versionTags[index].reset();
    }
  }

  // ---- apply values to the local region -------------------------------
  // NOTE(review): when m_hasTags is set, len now holds the tag count; this
  // loop assumes it equals the object count read above - confirm against the
  // server protocol.
  if (hasObjects) {
    std::shared_ptr<CacheableKey> key;
    std::shared_ptr<VersionTag> versionTag;
    std::shared_ptr<Cacheable> value;
    for (int32_t index = 0; index < len; ++index) {
      if (m_keys != nullptr && !m_hasKeys) {
        key = m_keys->at(index + keysOffset);
      } else {
        key = localKeys->at(index);
      }

      const auto& iter = m_values->find(key);
      value = iter == m_values->end() ? nullptr : iter->second;

      if (m_byteArray[index] != 3) {  // 3 - key not found on server
        std::shared_ptr<Cacheable> oldValue;
        if (m_addToLocalCache) {
          int updateCount = -1;
          versionTag = m_versionTags[index];
          GfErrType err =
              m_region->putLocal("getAll", false, key, value, oldValue, true,
                                 updateCount, m_destroyTracker, versionTag);
          if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) {
            // The literal below uses a line continuation: the continuation
            // line must stay at column 0 or the message text changes.
            LOGDEBUG(
                "VersionedCacheableObjectPartList::fromData putLocal for key [%s] failed because the cache \
already contains an entry with higher version.",
                Utils::nullSafeToString(key).c_str());
            // replace the value with higher version tag
            (*m_values)[key] = oldValue;
          }
        } else {
          m_region->getEntry(key, oldValue);
          // if value has already been received via notification or put by
          // another thread, then return that
          if (oldValue != nullptr && !CacheableToken::isInvalid(oldValue)) {
            (*m_values)[key] = oldValue;
          }
        }
      }
    }
  }

  if (m_keysOffset != nullptr) *m_keysOffset += len;
  if (valuesNULL) m_values = nullptr;
}
} // namespace client
} // namespace geode
} // namespace apache