blob: 794e79fd198ff208fe73bd4ba588796d4c45a2a8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FarSideEntryOp.hpp"
#include "ClientProxyMembershipID.hpp"
#include "DiskVersionTag.hpp"
#include "RegionCommit.hpp"
#include "util/exception.hpp"
namespace apache {
namespace geode {
namespace client {
FarSideEntryOp::FarSideEntryOp(
MemberListForVersionStamp& memberListForVersionStamp)
: m_op(0),
m_modSerialNum(0),
m_eventOffset(0),
m_didDestroy(false),
m_memberListForVersionStamp(memberListForVersionStamp)
{}
/**
 * Tells whether an operation code belongs to the destroy family.
 *
 * @param op operation code to classify.
 * @return true when op equals DESTROY, LOCAL_DESTROY, EVICT_DESTROY,
 *         EXPIRE_DESTROY, EXPIRE_LOCAL_DESTROY or REMOVE; false otherwise.
 */
bool FarSideEntryOp::isDestroy(int8_t op) {
  // Explicit destroys and removals.
  if (op == DESTROY || op == LOCAL_DESTROY || op == REMOVE) {
    return true;
  }
  // Destroy caused by eviction.
  if (op == EVICT_DESTROY) {
    return true;
  }
  // Destroys caused by expiration.
  return op == EXPIRE_DESTROY || op == EXPIRE_LOCAL_DESTROY;
}
/**
 * Tells whether an operation code belongs to the invalidate family.
 *
 * @param op operation code to classify.
 * @return true when op equals INVALIDATE, LOCAL_INVALIDATE, EXPIRE_INVALIDATE
 *         or EXPIRE_LOCAL_INVALIDATE; false otherwise.
 */
bool FarSideEntryOp::isInvalidate(int8_t op) {
  const bool requestedInvalidate =
      (op == INVALIDATE) || (op == LOCAL_INVALIDATE);
  const bool expiredInvalidate =
      (op == EXPIRE_INVALIDATE) || (op == EXPIRE_LOCAL_INVALIDATE);
  return requestedInvalidate || expiredInvalidate;
}
/**
 * Loads this operation from its serialized form.
 *
 * The stream is consumed strictly in this order: key, operation byte,
 * modification serial number, callback argument, filter routing info
 * (skipped), version tag, event offset. For non-destroy operations the
 * did-destroy flag follows, and for operations that are neither destroy nor
 * invalidate the value follows after that.
 *
 * @param input         stream to read from; its cursor is advanced.
 * @param largeModCount when true the serial number is read with readInt32(),
 *                      otherwise with a single read().
 * @param memId         forwarded unchanged to TcrMessage::readVersionTagPart.
 */
void FarSideEntryOp::fromData(DataInput& input, bool largeModCount,
                              uint16_t memId) {
  m_key = std::dynamic_pointer_cast<CacheableKey>(input.readObject());
  m_op = input.read();
  m_modSerialNum = largeModCount ? input.readInt32() : input.read();
  m_callbackArg = input.readObject();

  // The routing info is not kept; it only has to be stepped over.
  skipFilterRoutingInfo(input);

  m_versionTag =
      TcrMessage::readVersionTagPart(input, memId, m_memberListForVersionStamp);
  m_eventOffset = input.readInt32();

  // Nothing more is read for the destroy family.
  if (isDestroy(m_op)) {
    return;
  }
  m_didDestroy = input.readBoolean();

  // The invalidate family has the flag above but no value.
  if (isInvalidate(m_op)) {
    return;
  }

  // A boolean selects between two value encodings.
  // NOTE(review): presumably "serialized object" vs. "length-prefixed
  // payload" -- confirm against the server-side writer.
  const bool taggedForm = input.readBoolean();
  if (!taggedForm) {
    input.readArrayLength();  // length prefix is read and discarded
    input.readObject(m_value);
    return;
  }

  // Look ahead at the type byte (and the 16-bit fixed id when present) while
  // counting how many bytes were consumed, so the cursor can be restored
  // before the generic readObject() call.
  int32_t consumed = 1;
  int16_t fixedId = 0;
  const auto typeByte = input.read();
  if (typeByte == static_cast<int8_t>(DSCode::FixedIDShort)) {
    fixedId = input.readInt16();
    consumed += 2;
  }

  // Fixed ids 141..145 are treated as "no value":
  //   TOKEN_INVALID = 141, TOKEN_LOCAL_INVALID = 142, TOKEN_DESTROYED = 143,
  //   TOKEN_REMOVED = 144, TOKEN_REMOVED2 = 145.
  // NOTE(review): the previous comment listed TOKEN_DESTROYED as 43; 143 is
  // assumed from the contiguous range checked here -- confirm.
  const bool isToken = (fixedId >= 141) && (fixedId <= 145);
  if (isToken) {
    m_value = nullptr;
    return;
  }

  input.rewindCursor(consumed);
  input.readObject(m_value);
}
/**
 * Applies this operation to a region.
 *
 * The region is cast to RegionInternal and, depending on the operation code,
 * txDestroy, txInvalidate or txPut is invoked with the stored key, callback
 * argument and version tag (txPut additionally receives the stored value).
 *
 * @param region region the operation is applied to.
 */
void FarSideEntryOp::apply(std::shared_ptr<Region>& region) {
  // Disabled read-lock acquisition kept for reference:
  // LocalRegion* localRegion = static_cast<LocalRegion*>(region.get());
  // localRegion->acquireReadLock();
  auto internalRegion = std::static_pointer_cast<RegionInternal>(region);

  if (isDestroy(m_op)) {
    internalRegion->txDestroy(m_key, m_callbackArg, m_versionTag);
    return;
  }

  if (isInvalidate(m_op)) {
    internalRegion->txInvalidate(m_key, m_callbackArg, m_versionTag);
    return;
  }

  // Anything that is neither a destroy nor an invalidate is applied as a put.
  internalRegion->txPut(m_key, m_value, m_callbackArg, m_versionTag);
}
/**
 * Advances `input` past the filter routing info without keeping any of it.
 *
 * The first byte is a type code. DSCode::NullObj means nothing follows.
 * DSCode::DataSerializable is followed by a class byte, one object, and a
 * 32-bit count of per-member records; each record holds a
 * ClientProxyMembershipID, an optional list of variable-length integer pairs,
 * and two "interested clients" sections. Any other type code is logged and
 * reported through GfErrTypeThrowException with
 * GF_CACHE_ILLEGAL_STATE_EXCEPTION.
 *
 * @param input stream to read from; its cursor is advanced.
 */
void FarSideEntryOp::skipFilterRoutingInfo(DataInput& input) {
  // Steps over one "interested clients" section: a 32-bit element count
  // (-1 means the section is absent), then a flag choosing 64-bit or 32-bit
  // elements, then the elements themselves.
  const auto skipInterestedClients = [&input]() {
    const auto count = input.readInt32();
    if (count != -1) {
      const auto isLong = input.readBoolean();
      input.advanceCursor(count *
                          (isLong ? sizeof(int64_t) : sizeof(int32_t)));
    }
  };

  // Original note: this is DataSerializable (45).
  const auto structType = static_cast<DSCode>(input.read());

  if (structType == DSCode::NullObj) {
    return;
  } else if (structType == DSCode::DataSerializable) {
    input.read();        // ignore classbyte
    input.readObject();  // ignore object

    const int32_t memberCount = input.readInt32();
    for (int memberIdx = 0; memberIdx < memberCount; memberIdx++) {
      // ignore ClientProxyMembershipID
      ClientProxyMembershipID memId;
      memId.readEssentialData(input);

      // Ignore filter info: an optional array of value pairs.
      if (input.readBoolean()) {
        const auto pairCount = input.readArrayLength();
        for (int pairIdx = 0; pairIdx < pairCount; pairIdx++) {
          input.readUnsignedVL();
          input.readUnsignedVL();
        }
      }

      skipInterestedClients();  // interestedClients
      skipInterestedClients();  // interestedClientsInv
    }
  } else {
    // DSCode appears to be a scoped enum; convert explicitly so the argument
    // really is an int, matching the %d conversion in the format string.
    LOGERROR(
        "FarSideEntryOp::skipFilterRoutingInfo Unexpected type id: %d while "
        "desirializing commit response",
        static_cast<int>(structType));
    GfErrTypeThrowException(
        "FarSideEntryOp::skipFilterRoutingInfo Unable to handle commit "
        "response",
        GF_CACHE_ILLEGAL_STATE_EXCEPTION);
  }
}
/* std::shared_ptr<EntryEvent> FarSideEntryOp::getEntryEvent(Cache* cache)
{
return std::shared_ptr<EntryEvent>(new EntryEvent(
m_region->getRegion(cache),
m_key,
nullptr,
m_value,
m_callbackArg,
false));
}
*/
} // namespace client
} // namespace geode
} // namespace apache