blob: deb5dad2549cf2e5ba9253c91d1a565eca0ce6a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geode.internal.cache.map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.Operation;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EntryEventSerialization;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.RegionClearedException;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.ValueComparisonHelper;
import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.offheap.OffHeapHelper;
import org.apache.geode.internal.offheap.ReferenceCountHelper;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.sequencelog.EntryLogger;
import org.apache.geode.logging.internal.log4j.api.LogService;
public class RegionMapPut extends AbstractRegionMapPut {
protected static final Logger logger = LogService.getLogger();
private final CacheModificationLock cacheModificationLock;
private final EntryEventSerialization entryEventSerialization;
private final boolean ifNew;
private final boolean ifOld;
private final boolean overwriteDestroyed;
private boolean overwritePutIfAbsent;
private final boolean requireOldValue;
private final boolean retrieveOldValueForDelta;
private final boolean replaceOnClient;
private final boolean onlyExisting;
private final boolean cacheWrite;
private final CacheWriter cacheWriter;
private final Set netWriteRecipients;
private final Object expectedOldValue;
@Released
private Object oldValueForDelta;
public RegionMapPut(FocusedRegionMap focusedRegionMap, InternalRegion owner,
CacheModificationLock cacheModificationLock, EntryEventSerialization entryEventSerialization,
EntryEventImpl event, boolean ifNew, boolean ifOld, boolean overwriteDestroyed,
boolean requireOldValue, Object expectedOldValue) {
super(focusedRegionMap, owner, event);
this.cacheModificationLock = cacheModificationLock;
this.entryEventSerialization = entryEventSerialization;
this.ifNew = ifNew;
this.ifOld = ifOld;
this.overwriteDestroyed = overwriteDestroyed;
this.requireOldValue = requireOldValue;
this.retrieveOldValueForDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null;
this.replaceOnClient = event.getOperation() == Operation.REPLACE && owner.hasServerProxy();
this.onlyExisting = ifOld && !isReplaceOnClient();
this.cacheWriter = owner.basicGetWriter();
this.cacheWrite = !event.isOriginRemote() && !event.isNetSearch() && event.isGenerateCallbacks()
&& (getCacheWriter() != null || owner.hasServerProxy() || owner.getScope().isDistributed());
this.expectedOldValue = expectedOldValue;
if (isCacheWrite() && getCacheWriter() == null) {
this.netWriteRecipients = owner.adviseNetWrite();
} else {
this.netWriteRecipients = null;
}
}
private boolean isIfNew() {
return ifNew;
}
private boolean isIfOld() {
return ifOld;
}
private boolean isOverwriteDestroyed() {
return overwriteDestroyed;
}
private boolean isRequireOldValue() {
return requireOldValue;
}
boolean isRetrieveOldValueForDelta() {
return retrieveOldValueForDelta;
}
boolean isReplaceOnClient() {
return replaceOnClient;
}
boolean isOverwritePutIfAbsent() {
return overwritePutIfAbsent;
}
boolean isCacheWrite() {
return cacheWrite;
}
private CacheWriter getCacheWriter() {
return cacheWriter;
}
private Set getNetWriteRecipients() {
return netWriteRecipients;
}
private Object getExpectedOldValue() {
return expectedOldValue;
}
private Object getOldValueForDelta() {
return this.oldValueForDelta;
}
private void setOldValueForDelta(Object value) {
this.oldValueForDelta = value;
}
@Override
protected boolean isOnlyExisting() {
return onlyExisting;
}
@Override
protected boolean entryExists(RegionEntry regionEntry) {
return regionEntry != null && !regionEntry.isTombstone();
}
@Override
protected void serializeNewValueIfNeeded() {
entryEventSerialization.serializeNewValueIfNeeded(getOwner(), getEvent());
}
@Override
protected void runWhileLockedForCacheModification(Runnable r) {
cacheModificationLock.lockForCacheModification(getOwner(), getEvent());
try {
super.runWhileLockedForCacheModification(r);
} finally {
cacheModificationLock.releaseCacheModificationLock(getOwner(), getEvent());
}
}
@Override
protected void setOldValueForDelta() {
if (isRetrieveOldValueForDelta()) {
getRegionMap().runWhileEvictionDisabled(() -> {
setOldValueForDelta(getRegionEntry().getValue(getOwner()));
});
}
}
@Override
protected void setOldValueInEvent() {
final EntryEventImpl event = getEvent();
final RegionEntry re = getRegionEntry();
event.setRegionEntry(re);
if (event.getOperation().guaranteesOldValue()) {
setOldValueEvenIfFaultedOut();
} else if (isCacheWrite() || isRequireOldValue()) {
setOldValueIfNotFaultedOut();
} else {
@Unretained
Object existingValue = re.getValue();
if (existingValue instanceof GatewaySenderEventImpl) {
event.setOldValue(existingValue, true);
}
}
}
private void setOldValueIfNotFaultedOut() {
final EntryEventImpl event = getEvent();
ReferenceCountHelper.skipRefCountTracking();
@Released
Object oldValueInVM = getRegionEntry().getValueRetain(event.getRegion(), true);
if (oldValueInVM == null) {
oldValueInVM = Token.NOT_AVAILABLE;
}
ReferenceCountHelper.unskipRefCountTracking();
try {
event.setOldValue(oldValueInVM);
} finally {
OffHeapHelper.releaseWithNoTracking(oldValueInVM);
}
}
private void setOldValueEvenIfFaultedOut() {
final EntryEventImpl event = getEvent();
ReferenceCountHelper.skipRefCountTracking();
@Released
Object oldValueInVMOrDisk =
getRegionEntry().getValueOffHeapOrDiskWithoutFaultIn(event.getRegion());
ReferenceCountHelper.unskipRefCountTracking();
try {
event.setOldValue(oldValueInVMOrDisk, true);
} finally {
OffHeapHelper.releaseWithNoTracking(oldValueInVMOrDisk);
}
}
@Override
protected void unsetOldValueForDelta() {
OffHeapHelper.release(getOldValueForDelta());
setOldValueForDelta(null);
if (isOverwritePutIfAbsent()) {
getEvent().setOldValue(null);
}
}
@Override
protected void invokeCacheWriter() {
final EntryEventImpl event = getEvent();
if (getOwner().isInitialized() && isCacheWrite()) {
if (!isReplaceOnClient()) {
if (getRegionEntry().isDestroyedOrRemoved()) {
event.makeCreate();
} else {
event.makeUpdate();
}
}
getOwner().cacheWriteBeforePut(event, getNetWriteRecipients(), getCacheWriter(),
isRequireOldValue(), getExpectedOldValue());
}
if (!getOwner().isInitialized() && !isCacheWrite()) {
event.oldValueNotAvailable();
}
}
@Override
protected void createOrUpdateEntry() {
try {
if (isUpdate()) {
updateEntry();
} else {
createEntry();
}
} catch (RegionClearedException rce) {
setClearOccurred(true);
} catch (ConcurrentCacheModificationException ccme) {
final EntryEventImpl event = getEvent();
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
getOwner().notifyTimestampsToGateways(event);
}
throw ccme;
}
}
@Override
protected void doBeforeCompletionActions() {
final EntryEventImpl event = getEvent();
getOwner().recordEvent(event);
if (!isOwnerInitialized()) {
event.inhibitCacheListenerNotification(true);
}
updateLru();
final RegionEntry re = getRegionEntry();
long lastModTime = getOwner().basicPutPart2(event, re, isOwnerInitialized(),
getLastModifiedTime(), isClearOccurred());
setLastModifiedTime(lastModTime);
}
private void updateLru() {
if (!isClearOccurred()) {
if (getEvent().getOperation().isCreate()) {
getRegionMap().lruEntryCreate(getRegionEntry());
} else {
getRegionMap().lruEntryUpdate(getRegionEntry());
}
}
}
@Override
protected boolean shouldCreatedEntryBeRemoved() {
return getRegionEntry().getValueAsToken() == Token.REMOVED_PHASE1;
}
@Override
protected void doAfterCompletionActions(boolean disabledEviction) {
try {
if (isCompleted()) {
final boolean invokeListeners = getEvent().basicGetNewValue() != Token.TOMBSTONE;
getOwner().basicPutPart3(getEvent(), getRegionEntry(), isOwnerInitialized(),
getLastModifiedTime(), invokeListeners, isIfNew(), isIfOld(), getExpectedOldValue(),
isRequireOldValue());
}
} finally {
finishEviction(disabledEviction);
}
}
private void finishEviction(boolean disabledEviction) {
if (disabledEviction) {
getRegionMap().enableLruUpdateCallback();
if (isCompleted()) {
lruUpdateCallbackIfNotCleared();
} else {
getRegionMap().resetThreadLocals();
}
}
}
private void lruUpdateCallbackIfNotCleared() {
if (!isClearOccurred()) {
try {
getRegionMap().lruUpdateCallback();
} catch (DiskAccessException dae) {
getOwner().handleDiskAccessException(dae);
throw dae;
}
}
}
private boolean isUpdate() {
if (isCacheWrite() && getEvent().getOperation().isUpdate()) {
// if there is a cacheWriter, type of event has already been set
return true;
}
if (isReplaceOnClient()) {
return true;
}
if (!getRegionEntry().isRemoved()) {
return true;
}
return false;
}
/**
* @return false if preconditions indicate that
* the put should not be done.
*/
@Override
protected boolean checkPreconditions() {
if (!checkUpdatePreconditions()) {
return false;
}
if (!checkUninitializedRegionPreconditions()) {
return false;
}
if (!checkCreatePreconditions()) {
return false;
}
if (!checkExpectedOldValuePrecondition()) {
return false;
}
return true;
}
private boolean checkUpdatePreconditions() {
if (isIfOld()) {
final EntryEventImpl event = getEvent();
final RegionEntry re = getRegionEntry();
// only update, so just do tombstone maintainence and exit
if (re.isTombstone() && event.getVersionTag() != null) {
// refresh the tombstone so it doesn't time out too soon
getRegionMap().processVersionTag(re, event);
try {
re.setValue(getOwner(), Token.TOMBSTONE);
} catch (RegionClearedException e) {
// that's okay - when writing a tombstone into a disk, the
// region has been cleared (including this tombstone)
}
getOwner().rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
return false;
}
if (re.isRemoved() && !isReplaceOnClient()) {
return false;
}
}
return true;
}
private boolean checkUninitializedRegionPreconditions() {
if (!getOwner().isInitialized()) {
if (!isOverwriteDestroyed()) {
Token oldValueInVM = getRegionEntry().getValueAsToken();
if (oldValueInVM == Token.DESTROYED || oldValueInVM == Token.TOMBSTONE) {
getEvent().setOldValueDestroyedToken();
return false;
}
}
}
return true;
}
private boolean checkCreatePreconditions() {
if (isIfNew()) {
if (!getRegionEntry().isDestroyedOrRemoved()) {
// retain the version stamp of the existing entry for use in processing failures
EntryEventImpl event = getEvent();
if (getOwner().getConcurrencyChecksEnabled() &&
event.getOperation() == Operation.PUT_IF_ABSENT &&
!event.hasValidVersionTag() &&
event.isPossibleDuplicate()) {
Object retainedValue = getRegionEntry().getValueRetain(getOwner());
try {
if (ValueComparisonHelper.checkEquals(retainedValue,
getEvent().getRawNewValue(),
isCompressedOffHeap(event), getOwner().getCache())) {
if (logger.isDebugEnabled()) {
logger.debug("retried putIfAbsent found same value already in cache "
+ "- allowing the operation. entry={}; event={}", getRegionEntry(),
getEvent());
}
this.overwritePutIfAbsent = true;
return true;
}
} finally {
OffHeapHelper.release(retainedValue);
}
}
return false;
}
}
return true;
}
private boolean isCompressedOffHeap(EntryEventImpl event) {
return event.getRegion().getAttributes().getOffHeap()
&& event.getRegion().getAttributes().getCompressor() != null;
}
private boolean checkExpectedOldValuePrecondition() {
// replace is propagated to server, so no need to check
// satisfiesOldValue on client
final EntryEventImpl event = getEvent();
if (getExpectedOldValue() != null && !isReplaceOnClient()) {
assert event.getOperation().guaranteesOldValue();
// We already called setOldValueInEvent so the event will have the old value.
@Unretained
Object v = event.getRawOldValue();
// Note that v will be null instead of INVALID because setOldValue`
// converts INVALID to null.
// But checkExpectedOldValue handle this and says INVALID equals null.
if (!AbstractRegionEntry.checkExpectedOldValue(getExpectedOldValue(), v, event.getRegion())) {
return false;
}
}
return true;
}
private void createEntry() throws RegionClearedException {
final EntryEventImpl event = getEvent();
final RegionEntry re = getRegionEntry();
final boolean wasTombstone = re.isTombstone();
getRegionMap().processVersionTag(re, event);
event.putNewEntry(getOwner(), re);
updateSize(0, false, wasTombstone);
if (!event.getRegion().isInitialized()) {
getOwner().getImageState().removeDestroyedEntry(event.getKey());
}
}
private void updateEntry() throws RegionClearedException {
final EntryEventImpl event = getEvent();
final RegionEntry re = getRegionEntry();
final boolean wasTombstone = re.isTombstone();
final int oldSize = event.getRegion().calculateRegionEntryValueSize(re);
getRegionMap().processVersionTag(re, event);
event.putExistingEntry(event.getRegion(), re, isRequireOldValue(), getOldValueForDelta());
EntryLogger.logPut(event);
updateSize(oldSize, true/* isUpdate */, wasTombstone);
}
private void updateSize(int oldSize, boolean isUpdate, boolean wasTombstone) {
final EntryEventImpl event = getEvent();
final Object key = event.getKey();
final int newBucketSize = event.getNewValueBucketSize();
if (isUpdate && !wasTombstone) {
getOwner().updateSizeOnPut(key, oldSize, newBucketSize);
} else {
getOwner().updateSizeOnCreate(key, newBucketSize);
if (!wasTombstone) {
getOwner().getCachePerfStats().incEntryCount(1);
}
}
}
}