blob: a9ff28e9f31a903862456c5d5b1e57a28a325a50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PdxHelper.hpp"
#include <geode/Cache.hpp>
#include <geode/DataInput.hpp>
#include <geode/PoolManager.hpp>
#include "CacheRegionHelper.hpp"
#include "DataInputInternal.hpp"
#include "DataOutputInternal.hpp"
#include "PdxInstanceImpl.hpp"
#include "PdxLocalReader.hpp"
#include "PdxReaderWithTypeCollector.hpp"
#include "PdxRemoteReader.hpp"
#include "PdxRemoteWriter.hpp"
#include "PdxType.hpp"
#include "PdxTypeRegistry.hpp"
#include "PdxWriterWithTypeCollector.hpp"
#include "SerializationRegistry.hpp"
#include "ThinClientPoolDM.hpp"
#include "Utils.hpp"
namespace apache {
namespace geode {
namespace client {
uint8_t PdxHelper::PdxHeader = 8;
PdxHelper::PdxHelper() {}
PdxHelper::~PdxHelper() {}
void PdxHelper::serializePdx(
DataOutput& output, const std::shared_ptr<PdxSerializable>& pdxObject) {
auto pdxII = std::dynamic_pointer_cast<PdxInstanceImpl>(pdxObject);
auto cacheImpl = CacheRegionHelper::getCacheImpl(output.getCache());
auto pdxTypeRegistry = cacheImpl->getPdxTypeRegistry();
auto& cachePerfStats = cacheImpl->getCachePerfStats();
if (pdxII != nullptr) {
auto piPt = pdxII->getPdxType();
if (piPt != nullptr &&
piPt->getTypeId() ==
0) // from pdxInstance factory need to get typeid from server
{
int typeId = pdxTypeRegistry->getPDXIdForType(
piPt, DataOutputInternal::getPool(output));
pdxII->setPdxId(typeId);
}
auto plw = PdxLocalWriter(output, piPt, pdxTypeRegistry);
pdxII->toData(plw);
plw.endObjectWriting(); // now write typeid
int len = 0;
uint8_t* pdxStream = plw.getPdxStream(len);
pdxII->updatePdxStream(pdxStream, len);
delete[] pdxStream;
return;
}
auto&& className = pdxObject->getClassName();
auto localPdxType = pdxTypeRegistry->getLocalPdxType(className);
if (localPdxType == nullptr) {
// need to grab type info, as fromdata is not called yet
PdxWriterWithTypeCollector ptc(output, className, pdxTypeRegistry);
pdxObject->toData(ptc);
auto nType = ptc.getPdxLocalType();
nType->InitializeType();
int32_t nTypeId = pdxTypeRegistry->getPDXIdForType(
className.c_str(), DataOutputInternal::getPool(output), nType, true);
nType->setTypeId(nTypeId);
ptc.endObjectWriting();
pdxTypeRegistry->addLocalPdxType(className, nType);
pdxTypeRegistry->addPdxType(nTypeId, nType);
if (cacheImpl != nullptr) {
uint8_t* stPos = const_cast<uint8_t*>(output.getBuffer()) +
ptc.getStartPositionOffset();
int pdxLen = PdxHelper::readInt32(stPos);
cachePerfStats.incPdxSerialization(
pdxLen + 1 + 2 * 4); // pdxLen + 93 DSID + len + typeID
}
} else // we know locasl type, need to see preerved data
{
// if object got from server than create instance of RemoteWriter otherwise
// local writer.
// now always remotewriter as we have API Read/WriteUnreadFields
// so we don't know whether user has used those or not;; Can we do some
// trick here?
auto createPdxRemoteWriter = [&]() -> PdxRemoteWriter {
if (auto pd = pdxTypeRegistry->getPreserveData(pdxObject)) {
auto mergedPdxType = pdxTypeRegistry->getPdxType(pd->getMergedTypeId());
return PdxRemoteWriter(output, mergedPdxType, pd, pdxTypeRegistry);
} else {
return PdxRemoteWriter(output, className, pdxTypeRegistry);
}
};
PdxRemoteWriter prw = createPdxRemoteWriter();
pdxObject->toData(prw);
prw.endObjectWriting();
//[ToDo] need to write bytes for stats
if (cacheImpl != nullptr) {
uint8_t* stPos = const_cast<uint8_t*>(output.getBuffer()) +
prw.getStartPositionOffset();
int pdxLen = PdxHelper::readInt32(stPos);
cachePerfStats.incPdxSerialization(
pdxLen + 1 + 2 * 4); // pdxLen + 93 DSID + len + typeID
}
}
}
std::shared_ptr<PdxSerializable> PdxHelper::deserializePdx(DataInput& dataInput,
int32_t typeId,
int32_t length) {
std::shared_ptr<PdxSerializable> pdxObjectptr = nullptr;
std::shared_ptr<PdxType> pdxLocalType = nullptr;
auto cacheImpl = CacheRegionHelper::getCacheImpl(dataInput.getCache());
auto pdxTypeRegistry = cacheImpl->getPdxTypeRegistry();
auto serializationRegistry = cacheImpl->getSerializationRegistry();
auto pType = pdxTypeRegistry->getPdxType(typeId);
if (pType != nullptr) { // this may happen with PdxInstanceFactory {
pdxLocalType = pdxTypeRegistry->getLocalPdxType(
pType->getPdxClassName()); // this should be fine for IPdxTypeMapper
}
if (pType != nullptr && pdxLocalType != nullptr) // type found
{
auto&& pdxClassname = pType->getPdxClassName();
LOGDEBUG("deserializePdx ClassName = " + pdxClassname +
", isLocal = " + std::to_string(pType->isLocal()));
pdxObjectptr = serializationRegistry->getPdxSerializableType(pdxClassname);
if (pType->isLocal()) // local type no need to read Unread data
{
auto plr = PdxLocalReader(dataInput, pType, length, pdxTypeRegistry);
pdxObjectptr->fromData(plr);
plr.moveStream();
} else {
auto prr = PdxRemoteReader(dataInput, pType, length, pdxTypeRegistry);
pdxObjectptr->fromData(prr);
auto mergedVersion = pdxTypeRegistry->getMergedType(pType->getTypeId());
auto preserveData = prr.getPreservedData(mergedVersion, pdxObjectptr);
if (preserveData != nullptr) {
pdxTypeRegistry->setPreserveData(
pdxObjectptr, preserveData,
cacheImpl
->getExpiryTaskManager()); // it will set data in weakhashmap
}
prr.moveStream();
}
} else {
// type not found; need to get from server
if (pType == nullptr) {
pType = std::dynamic_pointer_cast<PdxType>(
serializationRegistry->GetPDXTypeById(
DataInputInternal::getPool(dataInput), typeId));
pdxLocalType = pdxTypeRegistry->getLocalPdxType(pType->getPdxClassName());
}
/* adongre - Coverity II
* CID 29298: Unused pointer value (UNUSED_VALUE)
* Pointer "pdxClassname" returned by "pType->getPdxClassName()" is never
* used.
* Fix : Commented the line
*/
// pdxClassname = pType->getPdxClassName();
pdxObjectptr =
serializationRegistry->getPdxSerializableType(pType->getPdxClassName());
auto pdxRealObject = pdxObjectptr;
if (pdxLocalType == nullptr) // need to know local type
{
auto prtc =
PdxReaderWithTypeCollector(dataInput, pType, length, pdxTypeRegistry);
pdxObjectptr->fromData(prtc);
// Check for the PdxWrapper
pdxLocalType = prtc.getLocalType();
if (pType->Equals(pdxLocalType)) {
pdxTypeRegistry->addLocalPdxType(pdxRealObject->getClassName(), pType);
pdxTypeRegistry->addPdxType(pType->getTypeId(), pType);
pType->setLocal(true);
} else {
// Need to know local type and then merge type
pdxLocalType->InitializeType();
pdxLocalType->setTypeId(pdxTypeRegistry->getPDXIdForType(
pdxObjectptr->getClassName(), DataInputInternal::getPool(dataInput),
pdxLocalType, true));
pdxLocalType->setLocal(true);
pdxTypeRegistry->addLocalPdxType(pdxRealObject->getClassName(),
pdxLocalType); // added local type
pdxTypeRegistry->addPdxType(pdxLocalType->getTypeId(), pdxLocalType);
pType->InitializeType();
pdxTypeRegistry->addPdxType(pType->getTypeId(),
pType); // adding remote type
// create merge type
createMergedType(pdxLocalType, pType, dataInput);
auto mergedVersion = pdxTypeRegistry->getMergedType(pType->getTypeId());
if (auto preserveData =
prtc.getPreservedData(mergedVersion, pdxObjectptr)) {
pdxTypeRegistry->setPreserveData(pdxObjectptr, preserveData,
cacheImpl->getExpiryTaskManager());
}
}
prtc.moveStream();
} else { // remote reader will come here as local type is there
pType->InitializeType();
LOGDEBUG("Adding type %d ", pType->getTypeId());
pdxTypeRegistry->addPdxType(pType->getTypeId(),
pType); // adding remote type
auto prr = PdxRemoteReader(dataInput, pType, length, pdxTypeRegistry);
pdxObjectptr->fromData(prr);
// Check for PdxWrapper to getObject.
createMergedType(pdxLocalType, pType, dataInput);
auto mergedVersion = pdxTypeRegistry->getMergedType(pType->getTypeId());
auto preserveData = prr.getPreservedData(mergedVersion, pdxObjectptr);
if (preserveData != nullptr) {
pdxTypeRegistry->setPreserveData(pdxObjectptr, preserveData,
cacheImpl->getExpiryTaskManager());
}
prr.moveStream();
}
}
return pdxObjectptr;
}
std::shared_ptr<PdxSerializable> PdxHelper::deserializePdx(
DataInput& dataInput, bool forceDeserialize) {
auto cacheImpl = CacheRegionHelper::getCacheImpl(dataInput.getCache());
auto pdxTypeRegistry = cacheImpl->getPdxTypeRegistry();
auto serializationRegistry = cacheImpl->getSerializationRegistry();
auto& cachePerfStats = cacheImpl->getCachePerfStats();
if (pdxTypeRegistry->getPdxReadSerialized() == false || forceDeserialize) {
// Read Length
int32_t len = dataInput.readInt32();
int32_t typeId = dataInput.readInt32();
cachePerfStats.incPdxDeSerialization(len + 9); // pdxLen + 1 + 2*4
return PdxHelper::deserializePdx(dataInput, typeId, len);
} else {
// Read Length
int32_t len = dataInput.readInt32();
int typeId = dataInput.readInt32();
auto pType = pdxTypeRegistry->getPdxType(typeId);
if (pType == nullptr) {
// TODO shared_ptr why redef?
auto pType = std::dynamic_pointer_cast<PdxType>(
serializationRegistry->GetPDXTypeById(
DataInputInternal::getPool(dataInput), typeId));
pdxTypeRegistry->addLocalPdxType(pType->getPdxClassName(), pType);
pdxTypeRegistry->addPdxType(pType->getTypeId(), pType);
}
cachePerfStats.incPdxInstanceCreations();
// TODO::Enable it once the PdxInstanceImple is CheckedIn.
auto pdxObject = std::make_shared<PdxInstanceImpl>(
const_cast<uint8_t*>(dataInput.currentBufferPosition()), len, typeId,
cachePerfStats, *pdxTypeRegistry, *cacheImpl,
cacheImpl->getDistributedSystem()
.getSystemProperties()
.getEnableTimeStatistics());
dataInput.advanceCursor(len);
return std::move(pdxObject);
}
}
void PdxHelper::createMergedType(std::shared_ptr<PdxType> localType,
std::shared_ptr<PdxType> remoteType,
DataInput& dataInput) {
auto mergedVersion = localType->mergeVersion(remoteType);
auto cacheImpl = CacheRegionHelper::getCacheImpl(dataInput.getCache());
auto pdxTypeRegistry = cacheImpl->getPdxTypeRegistry();
auto serializaionRegistry = cacheImpl->getSerializationRegistry();
if (mergedVersion->Equals(localType)) {
pdxTypeRegistry->setMergedType(remoteType->getTypeId(), localType);
} else if (mergedVersion->Equals(remoteType)) {
pdxTypeRegistry->setMergedType(remoteType->getTypeId(), remoteType);
} else { // need to create new version
mergedVersion->InitializeType();
if (mergedVersion->getTypeId() == 0) {
mergedVersion->setTypeId(serializaionRegistry->GetPDXIdForType(
DataInputInternal::getPool(dataInput), mergedVersion));
}
// PdxTypeRegistry::AddPdxType(remoteType->TypeId, mergedVersion);
pdxTypeRegistry->addPdxType(mergedVersion->getTypeId(), mergedVersion);
pdxTypeRegistry->setMergedType(remoteType->getTypeId(), mergedVersion);
pdxTypeRegistry->setMergedType(mergedVersion->getTypeId(), mergedVersion);
}
}
int32_t PdxHelper::readInt32(uint8_t* offsetPosition) {
int32_t data = offsetPosition[0];
data = (data << 8) | offsetPosition[1];
data = (data << 8) | offsetPosition[2];
data = (data << 8) | offsetPosition[3];
return data;
}
void PdxHelper::writeInt32(uint8_t* offsetPosition, int32_t value) {
offsetPosition[0] = static_cast<uint8_t>(value >> 24);
offsetPosition[1] = static_cast<uint8_t>(value >> 16);
offsetPosition[2] = static_cast<uint8_t>(value >> 8);
offsetPosition[3] = static_cast<uint8_t>(value);
}
int32_t PdxHelper::readInt16(uint8_t* offsetPosition) {
int16_t data = offsetPosition[0];
data = (data << 8) | offsetPosition[1];
return static_cast<int32_t>(data);
}
int32_t PdxHelper::readUInt16(uint8_t* offsetPosition) {
uint16_t data = offsetPosition[0];
data = (data << 8) | offsetPosition[1];
return static_cast<int32_t>(data);
}
int32_t PdxHelper::readByte(uint8_t* offsetPosition) {
return static_cast<int32_t>(offsetPosition[0]);
}
void PdxHelper::writeInt16(uint8_t* offsetPosition, int32_t value) {
int16_t val = static_cast<int16_t>(value);
offsetPosition[0] = static_cast<uint8_t>(val >> 8);
offsetPosition[1] = static_cast<uint8_t>(val);
}
void PdxHelper::writeByte(uint8_t* offsetPosition, int32_t value) {
offsetPosition[0] = static_cast<uint8_t>(value);
}
int32_t PdxHelper::readInt(uint8_t* offsetPosition, int size) {
switch (size) {
case 1:
return readByte(offsetPosition);
case 2:
return readUInt16(offsetPosition);
case 4:
return readInt32(offsetPosition);
}
throw;
}
int32_t PdxHelper::getEnumValue(
const char* enumClassName, const char* enumName, int hashcode,
std::shared_ptr<PdxTypeRegistry> pdxTypeRegistry) {
const auto& ei =
std::make_shared<EnumInfo>(enumClassName, enumName, hashcode);
return pdxTypeRegistry->getEnumValue(ei);
}
std::shared_ptr<EnumInfo> PdxHelper::getEnum(
int enumId, std::shared_ptr<PdxTypeRegistry> pdxTypeRegistry) {
const auto& ei = pdxTypeRegistry->getEnum(enumId);
return ei;
}
} // namespace client
} // namespace geode
} // namespace apache