blob: c5048133911f1a7cb47cffb2f4f0ec29efabafac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tx;
import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.EntryExistsException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.ReplySender;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.cache.AbstractRegion;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.DataLocationException;
import org.apache.geode.internal.cache.DistributedCacheOperation;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RemoteOperationException;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.versions.DiskVersionTag;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* A class that specifies a destroy operation. Used by ReplicateRegions. Note: The reason for
* different classes for Destroy and Invalidate is to prevent sending an extra bit for every
* RemoteDestroyMessage to differentiate an invalidate versus a destroy. The assumption is that
* these operations are used frequently, if they are not then it makes sense to fold the destroy and
* the invalidate into the same message and use an extra bit to differentiate.
*
* This message is used by transactions to destroy an entry on a transaction hosted on a remote
* member. It is also used by non-transactional region destroys that need to generate a VersionTag
* on a remote member.
*
* @since GemFire 6.5
*
*/
public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
implements OldValueImporter {
/** Logger used by this message and by the nested reply message class. */
private static final Logger logger = LogService.getLogger();
/** The key associated with the value that must be sent */
private Object key;
/** The callback arg */
private Object cbArg;
/** The operation performed on the sender */
private Operation op;
/**
* An additional object providing context for the operation, e.g., for BridgeServer notification
*/
ClientProxyMembershipID bridgeContext;
/** event identifier */
EventID eventId;
/** for relayed messages, this is the original sender of the message */
InternalDistributedMember originalSender;
/** whether the message has old value */
private boolean hasOldValue = false;
/** whether old value is serialized */
private boolean oldValueIsSerialized = false;
/**
* Value handed to destroyOnRemote when the message is processed; written with writeObject in
* toData and read back with readObject in fromData.
*/
private Object expectedOldValue;
/**
* Old value in byte[] form. Set on the sending side by importOldBytes or setOldValue, and on the
* receiving side by fromData.
*/
private byte[] oldValBytes;
/**
* Old value held as an object so that its serialization is deferred until toData is called. It
* is transient and is never assigned by fromData.
*/
@Unretained(ENTRY_EVENT_OLD_VALUE)
private transient Object oldValObj;
/**
* Passed as the originRemote argument when the receiver creates its event and no bridge context
* is present.
*/
boolean useOriginRemote;
/** Carried on the wire as the POS_DUP flag and copied onto the receiver's event. */
protected boolean possibleDuplicate;
/** Version tag taken from the originating event; toData always writes it, even when null. */
VersionTag<?> versionTag;
// additional bitmask flags used for serialization/deserialization
// Each flag is the previous flag shifted left by one bit, starting at UNRESERVED_FLAGS_START.
protected static final short USE_ORIGIN_REMOTE = UNRESERVED_FLAGS_START;
protected static final short HAS_OLD_VALUE = (USE_ORIGIN_REMOTE << 1);
protected static final short CACHE_WRITE = (HAS_OLD_VALUE << 1);
protected static final short HAS_BRIDGE_CONTEXT = (CACHE_WRITE << 1);
protected static final short HAS_ORIGINAL_SENDER = (HAS_BRIDGE_CONTEXT << 1);
// Declared as int, unlike the short flags above. It is set by computeCompressedShort, but
// fromData does not test it because the version tag is always read.
// NOTE(review): presumably int because the shifted value no longer fits in a short - confirm
// against the value of UNRESERVED_FLAGS_START.
protected static final int HAS_VERSION_TAG = (HAS_ORIGINAL_SENDER << 1);
/**
* Empty constructor to satisfy {@link DataSerializer} requirements
*/
public RemoteDestroyMessage() {}
/**
 * Creates a destroy message addressed to {@code recipient}, copying the key, callback argument,
 * operation, context, event id, version tag and (when available) old value from {@code event}.
 *
 * @param recipient the member that will process the message
 * @param regionPath path of the region the destroy applies to
 * @param processor the reply processor registered for the response
 * @param event the event that caused this message; its event id must not be null
 * @param expectedOldValue value forwarded to the destroy call on the receiver
 * @param useOriginRemote forwarded to the receiver as the originRemote setting of its event
 * @param possibleDuplicate whether the receiver should treat the event as a possible duplicate
 */
protected RemoteDestroyMessage(DistributedMember recipient, String regionPath,
    DirectReplyProcessor processor, EntryEventImpl event, Object expectedOldValue,
    boolean useOriginRemote, boolean possibleDuplicate) {
  super((InternalDistributedMember) recipient, regionPath, processor);

  // Values supplied directly by the caller.
  this.expectedOldValue = expectedOldValue;
  this.useOriginRemote = useOriginRemote;
  this.possibleDuplicate = possibleDuplicate;

  // Values copied from the originating event.
  key = event.getKey();
  cbArg = event.getRawCallbackArgument();
  op = event.getOperation();
  bridgeContext = event.getContext();
  eventId = event.getEventId();
  versionTag = event.getVersionTag();

  Assert.assertTrue(eventId != null);

  // The old value, if available, is sent over the wire for cache servers. The event pushes it
  // into this message through the OldValueImporter callbacks.
  if (event.hasOldValue()) {
    hasOldValue = true;
    event.exportOldValue(this);
  }
}
/** Stores the old value in byte[] form. */
private void setOldValBytes(byte[] valBytes) {
  oldValBytes = valBytes;
}

/** Stores the old value in object form; the reference is not retained by this message. */
private void setOldValObj(@Unretained(ENTRY_EVENT_OLD_VALUE) Object o) {
  oldValObj = o;
}

/** @return the old value bytes, or null if none were set */
public byte[] getOldValueBytes() {
  return oldValBytes;
}

/** @return the old value object, or null if none was set */
private Object getOldValObj() {
  return oldValObj;
}

/** @return true if this message carries an old value */
protected boolean getHasOldValue() {
  return hasOldValue;
}

/** @return true if the old value carried by this message is in serialized form */
protected boolean getOldValueIsSerialized() {
  return oldValueIsSerialized;
}
/**
 * Set the old value for this message, only used if there are cqs registered on one of the bridge
 * servers. Does nothing when the event has no old value.
 *
 * @param event underlying event.
 * @since GemFire 5.5
 */
public void setOldValue(EntryEventImpl event) {
  if (!event.hasOldValue()) {
    return;
  }
  hasOldValue = true;

  final CachedDeserializable serializedOld =
      (CachedDeserializable) event.getSerializedOldValue();

  if (serializedOld == null) {
    // No serialized form on the event: fall back to the raw old value.
    final Object raw = event.getRawOldValue();
    if (raw instanceof byte[]) {
      oldValueIsSerialized = false;
      setOldValBytes((byte[]) raw);
    } else {
      oldValueIsSerialized = true;
      setOldValObj(AbstractRegion.handleNotAvailable(raw));
    }
    return;
  }

  if (!serializedOld.isSerialized()) {
    // it is a byte[]
    oldValueIsSerialized = false;
    setOldValBytes((byte[]) serializedOld.getDeserializedForReading());
    return;
  }

  oldValueIsSerialized = true;
  final Object held = serializedOld.getValue();
  if (held instanceof byte[]) {
    setOldValBytes((byte[]) held);
  } else {
    // Defer serialization until toData is called.
    setOldValObj(held);
  }
}
/**
 * Sends the destroy described by {@code event} to one initialized replicate of the event's
 * region and waits for the reply, trying the remaining replicates in turn when an attempt fails
 * with a retryable exception.
 *
 * <p>
 * On the first successful reply the returned version tag (if any) is set on the event and
 * recorded in the region's version vector (if the region has one), distribution of the event is
 * inhibited, and {@code true} is returned.
 *
 * @param event the destroy event to distribute; its region must be a DistributedRegion
 * @param expectedOldValue value forwarded to the destroy call on the remote member
 * @param onlyPersistent if true only initialized persistent members are candidates, otherwise
 *        all initialized replicates are
 * @return true if a replicate processed the destroy, or if at least one attempt failed with a
 *         CacheException; false if there were no candidates or every attempt failed otherwise
 * @throws EntryNotFoundException if a replicate reported that the entry does not exist; the
 *         message is the key and the cause is the exception received from the replicate
 * @throws TransactionDataNotColocatedException propagated unchanged from the reply
 */
@SuppressWarnings("unchecked")
public static boolean distribute(EntryEventImpl event, Object expectedOldValue,
    boolean onlyPersistent) {
  boolean successful = false;
  DistributedRegion r = (DistributedRegion) event.getRegion();
  Collection<InternalDistributedMember> replicates = onlyPersistent
      ? r.getCacheDistributionAdvisor().adviseInitializedPersistentMembers().keySet()
      : r.getCacheDistributionAdvisor().adviseInitializedReplicates();
  if (replicates.isEmpty()) {
    return false;
  }
  if (replicates.size() > 1) {
    // Try the candidates in random order.
    ArrayList<InternalDistributedMember> shuffled = new ArrayList<>(replicates);
    Collections.shuffle(shuffled);
    replicates = shuffled;
  }
  int attempts = 0;
  for (InternalDistributedMember replicate : replicates) {
    try {
      attempts++;
      // Every attempt after the first is flagged as a possible duplicate.
      final boolean posDup = (attempts > 1);
      RemoteDestroyReplyProcessor processor =
          send(replicate, event.getRegion(), event, expectedOldValue, false, posDup);
      processor.waitForRemoteResponse();
      VersionTag<?> versionTag = processor.getVersionTag();
      if (versionTag != null) {
        event.setVersionTag(versionTag);
        if (event.getRegion().getVersionVector() != null) {
          event.getRegion().getVersionVector().recordVersion(versionTag.getMemberID(),
              versionTag);
        }
      }
      event.setInhibitDistribution(true);
      return true;
    } catch (EntryNotFoundException e) {
      // Re-thrown with the key as the message; the original exception is kept as the cause so
      // the failure reported by the replicate is not lost.
      throw new EntryNotFoundException("" + event.getKey(), e);
    } catch (TransactionDataNotColocatedException e) {
      throw e;
    } catch (CancelException e) {
      // Throws if a cancel is in progress; otherwise move on to the next replicate.
      event.getRegion().getCancelCriterion().checkCancelInProgress(e);
    } catch (CacheException e) {
      if (logger.isDebugEnabled()) {
        logger.debug("RemoteDestroyMessage caught CacheException during distribution", e);
      }
      successful = true; // not a cancel-exception, so don't complain any more about it
    } catch (RegionDestroyedException | RemoteOperationException e) {
      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
        logger.trace(LogMarker.DM_VERBOSE,
            "RemoteDestroyMessage caught an exception during distribution; retrying to another member",
            e);
      }
    }
  }
  return successful;
}
/**
 * Sends a RemoteDestroyMessage {@link org.apache.geode.cache.Region#destroy(Object)}message to
 * the recipient
 *
 * @param recipient the recipient of the message
 * @param r the ReplicateRegion for which the destroy was performed
 * @param event the event causing this message
 * @param expectedOldValue value forwarded to the destroy call on the recipient
 * @param useOriginRemote forwarded to the recipient as the originRemote setting of its event
 * @param possibleDuplicate whether the recipient should treat the event as a possible duplicate
 * @return the processor used to await the potential {@link org.apache.geode.cache.CacheException}
 * @throws RemoteOperationException if the distribution manager reports any failed recipient
 */
public static RemoteDestroyReplyProcessor send(DistributedMember recipient, InternalRegion r,
    EntryEventImpl event, Object expectedOldValue, boolean useOriginRemote,
    boolean possibleDuplicate) throws RemoteOperationException {
  final RemoteDestroyReplyProcessor replyProcessor =
      new RemoteDestroyReplyProcessor(r.getSystem(), recipient, false);
  replyProcessor.requireResponse();

  final RemoteDestroyMessage message = new RemoteDestroyMessage(recipient, r.getFullPath(),
      replyProcessor, event, expectedOldValue, useOriginRemote, possibleDuplicate);

  final Set<?> undelivered = r.getDistributionManager().putOutgoing(message);
  final boolean delivered = undelivered == null || undelivered.isEmpty();
  if (!delivered) {
    throw new RemoteOperationException(String.format("Failed sending < %s >", message));
  }
  return replyProcessor;
}
/**
 * This method is called upon receipt and make the desired changes to the region. Note: It is very
 * important that this message does NOT cause any deadlocks as the sender will wait indefinitely
 * for the acknowledgement
 *
 * <p>
 * An event is rebuilt from the fields of this message, the destroy is applied through the
 * region's data view, and the reply (a DestroyReplyMessage on success, otherwise a reply carrying
 * the exception) is sent from within this method. The event is always released before returning.
 *
 * @param dm the distribution manager used to send the reply
 * @param r the region to operate on
 * @param startTime passed through to the reply sent for a CacheWriterException
 * @return always false; every reply is sent by this method itself
 */
@Override
protected boolean operateOnRegion(ClusterDistributionManager dm, LocalRegion r, long startTime)
    throws EntryExistsException, RemoteOperationException {
  // Relayed messages carry the original sender; otherwise use the direct sender.
  InternalDistributedMember eventSender = originalSender;
  if (eventSender == null) {
    eventSender = getSender();
  }
  @Released
  EntryEventImpl event = null;
  try {
    if (this.bridgeContext != null) {
      event = EntryEventImpl.create(r, getOperation(), getKey(), null/* newValue */,
          getCallbackArg(), false/* originRemote */, eventSender, true/* generateCallbacks */);
      event.setContext(this.bridgeContext);
      // The old value is applied once below for both branches, so it is not set here.
    } else {
      event = EntryEventImpl.create(r, getOperation(), getKey(), null, /* newValue */
          getCallbackArg(), this.useOriginRemote, eventSender, true/* generateCallbacks */,
          false/* initializeId */);
    }
    event.setCausedByMessage(this);
    if (this.versionTag != null) {
      this.versionTag.replaceNullIDs(getSender());
      event.setVersionTag(this.versionTag);
    }
    // for cq processing and client notification by BS.
    if (this.hasOldValue) {
      if (this.oldValueIsSerialized) {
        event.setSerializedOldValue(getOldValueBytes());
      } else {
        event.setOldValue(getOldValueBytes());
      }
    }
    Assert.assertTrue(eventId != null);
    event.setEventId(eventId);
    event.setPossibleDuplicate(this.possibleDuplicate);
    try {
      r.getDataView().destroyOnRemote(event, true, this.expectedOldValue);
      sendReply(dm, event.getVersionTag());
    } catch (CacheWriterException cwe) {
      sendReply(getSender(), this.processorId, dm, new ReplyException(cwe), r, startTime);
      return false;
    } catch (EntryNotFoundException eee) {
      if (logger.isDebugEnabled()) {
        logger.debug("operateOnRegion caught EntryNotFoundException", eee);
      }
      ReplyMessage.send(getSender(), getProcessorId(), new ReplyException(eee),
          getReplySender(dm), r.isInternalRegion());
    } catch (DataLocationException e) {
      if (logger.isDebugEnabled()) {
        logger.debug("operateOnRegion caught DataLocationException");
      }
      ReplyMessage.send(getSender(), getProcessorId(), new ReplyException(e), getReplySender(dm),
          r.isInternalRegion());
    }
    return false;
  } finally {
    if (event != null) {
      event.release();
    }
  }
}
/** @return the data-serializable fixed id of this message type */
@Override
public int getDSFID() {
  return R_DESTROY_MESSAGE;
}

/**
 * Sends a DestroyReplyMessage carrying {@code versionTag} back to the sender of this message.
 *
 * @param dm distribution manager from which the reply sender is obtained
 * @param versionTag the version tag to return; may be null
 */
private void sendReply(DistributionManager dm, VersionTag<?> versionTag) {
  final InternalDistributedMember replyTo = getSender();
  final ReplySender replySender = getReplySender(dm);
  DestroyReplyMessage.send(replyTo, replySender, processorId, versionTag);
}
/**
* Reads the fields of this message in the same order in which toData writes them: key, callback
* argument, operation ordinal, optional bridge context, optional original sender, event id,
* optional old value, expected old value and version tag.
*
* @param in the stream to read from
* @param context the deserialization context, passed through to the superclass
*/
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
setKey(DataSerializer.readObject(in));
this.cbArg = DataSerializer.readObject(in);
this.op = Operation.fromOrdinal(in.readByte());
// The bridge context and the original sender are only present when their flag bit is set.
if ((flags & HAS_BRIDGE_CONTEXT) != 0) {
this.bridgeContext = DataSerializer.readObject(in);
}
if ((flags & HAS_ORIGINAL_SENDER) != 0) {
this.originalSender = DataSerializer.readObject(in);
}
this.eventId = DataSerializer.readObject(in);
// for old values for CQs
// hasOldValue is assigned by setFlags - presumably invoked from super.fromData above; confirm.
if (this.hasOldValue) {
// out.writeBoolean(this.hasOldValue);
// below boolean is not strictly required, but this is for compatibility
// NOTE(review): toData writes oldValueIsSerialized into this byte, but the value is discarded
// here, so nothing in this method updates oldValueIsSerialized on the receiving side even
// though operateOnRegion branches on it. Confirm whether that is intended.
in.readByte();
setOldValBytes(DataSerializer.readByteArray(in));
}
this.expectedOldValue = DataSerializer.readObject(in);
// to prevent bug 51024 always call readObject for versionTag
// since toData always calls writeObject for versionTag.
this.versionTag = DataSerializer.readObject(in);
}
/**
 * Writes the fields of this message: key, callback argument, operation ordinal, bridge context
 * (only if non-null), original sender (only if non-null), event id, old value (only if present),
 * expected old value and version tag.
 *
 * @param out the stream to write to
 * @param context the serialization context, passed through to the superclass
 */
@Override
public void toData(DataOutput out, SerializationContext context) throws IOException {
  super.toData(out, context);

  DataSerializer.writeObject(getKey(), out);
  DataSerializer.writeObject(cbArg, out);
  out.writeByte(op.ordinal);

  // Optional fields: their presence is signalled by flag bits, not by a marker in the stream.
  if (bridgeContext != null) {
    DataSerializer.writeObject(bridgeContext, out);
  }
  if (originalSender != null) {
    DataSerializer.writeObject(originalSender, out);
  }

  DataSerializer.writeObject(eventId, out);

  // this will be on wire for cqs old value generations.
  if (hasOldValue) {
    final int serializedMarker = oldValueIsSerialized ? 1 : 0;
    out.writeByte(serializedMarker);
    final byte policy =
        DistributedCacheOperation.valueIsToDeserializationPolicy(oldValueIsSerialized);
    DistributedCacheOperation.writeValue(policy, getOldValObj(), getOldValueBytes(), out);
  }

  // Both of these are always written, even when null.
  DataSerializer.writeObject(expectedOldValue, out);
  DataSerializer.writeObject(versionTag, out);
}
/**
 * Decodes the flag bits owned by this class (old value present, use-origin-remote, possible
 * duplicate) after letting the superclass decode its own.
 */
@Override
protected void setFlags(short flags, DataInput in, DeserializationContext context)
    throws IOException, ClassNotFoundException {
  super.setFlags(flags, in, context);
  useOriginRemote = (flags & USE_ORIGIN_REMOTE) != 0;
  hasOldValue = (flags & HAS_OLD_VALUE) != 0;
  possibleDuplicate = (flags & POS_DUP) != 0;
}
/**
 * Adds the flag bits owned by this class to the flags computed by the superclass.
 *
 * @return the combined flag bits
 */
@Override
protected short computeCompressedShort() {
  short flagBits = super.computeCompressedShort();
  // this will be on wire for cqs old value generations.
  if (hasOldValue) {
    flagBits |= HAS_OLD_VALUE;
  }
  if (useOriginRemote) {
    flagBits |= USE_ORIGIN_REMOTE;
  }
  if (possibleDuplicate) {
    flagBits |= POS_DUP;
  }
  if (bridgeContext != null) {
    flagBits |= HAS_BRIDGE_CONTEXT;
  }
  if (originalSender != null) {
    flagBits |= HAS_ORIGINAL_SENDER;
  }
  if (versionTag != null) {
    // HAS_VERSION_TAG is an int; the compound assignment narrows the result back to short.
    flagBits |= HAS_VERSION_TAG;
  }
  return flagBits;
}
/** @return the event identifier carried by this message */
@Override
public EventID getEventID() {
  return eventId;
}

/**
 * Assists the toString method in reporting the contents of this message. The original sender,
 * bridge context and event id are only reported when they are non-null.
 */
@Override
protected void appendFields(StringBuffer buff) {
  super.appendFields(buff);
  buff.append("; key=");
  buff.append(getKey());
  if (originalSender != null) {
    buff.append("; originalSender=");
    buff.append(originalSender);
  }
  if (bridgeContext != null) {
    buff.append("; bridgeContext=");
    buff.append(bridgeContext);
  }
  if (eventId != null) {
    buff.append("; eventId=");
    buff.append(eventId);
  }
  buff.append("; hasOldValue= ");
  buff.append(hasOldValue);
}
/** @return the key of the entry to destroy */
protected Object getKey() {
  return key;
}

/** Stores the key of the entry to destroy. */
private void setKey(Object key) {
  this.key = key;
}

/** @return the operation performed on the sender */
public Operation getOperation() {
  return op;
}

/** @return the callback argument carried by this message */
protected Object getCallbackArg() {
  return cbArg;
}
/** @return true: this importer prefers to receive the old value in serialized form */
@Override
public boolean prefersOldSerialized() {
  return true;
}

/** @return true: this importer accepts an old value reference that has not been retained */
@Override
public boolean isUnretainedOldReferenceOk() {
  return true;
}

/** @return false: this importer does not accept a CachedDeserializable old value */
@Override
public boolean isCachedDeserializableValueOk() {
  return false;
}
/**
 * Records whether the imported old value is a serialized object (true) or plain bytes (false).
 * When it is serialized, serialization of the value itself is deferred until toData is called.
 */
private void setOldValueIsSerialized(boolean isSerialized) {
  oldValueIsSerialized = isSerialized;
}
/**
 * Receives the old value in object form. The object is only stored here; its serialization is
 * deferred until toData is called.
 */
@Override
public void importOldObject(@Unretained(ENTRY_EVENT_OLD_VALUE) Object ov, boolean isSerialized) {
  setOldValueIsSerialized(isSerialized);
  setOldValObj(ov);
}

/** Receives the old value in byte[] form. */
@Override
public void importOldBytes(byte[] ov, boolean isSerialized) {
  setOldValueIsSerialized(isSerialized);
  setOldValBytes(ov);
}
/**
 * Reply to a RemoteDestroyMessage. In addition to the state of a plain ReplyMessage it carries
 * the version tag produced by the destroy, if any.
 */
public static class DestroyReplyMessage extends ReplyMessage {
  /** Header bit: a version tag follows the header byte. */
  private static final byte HAS_VERSION = 0x01;
  /** Header bit: the version tag is a DiskVersionTag. */
  private static final byte PERSISTENT = 0x02;

  private VersionTag<?> versionTag;

  /** DSFIDFactory constructor */
  public DestroyReplyMessage() {}

  DestroyReplyMessage(InternalDistributedMember recipient, int procId, VersionTag<?> versionTag) {
    setProcessorId(procId);
    setRecipient(recipient);
    this.versionTag = versionTag;
  }

  /**
   * Builds a reply for {@code recipient} and hands it to {@code dm} for sending.
   *
   * @param recipient the member to reply to; must not be null
   * @param dm the sender used to put the reply on the wire
   * @param procId id of the reply processor waiting on the recipient
   * @param versionTag the version tag to return; may be null
   */
  static void send(InternalDistributedMember recipient, ReplySender dm, int procId,
      VersionTag<?> versionTag) {
    Assert.assertTrue(recipient != null, "DestroyReplyMessage NULL recipient");
    final DestroyReplyMessage reply = new DestroyReplyMessage(recipient, procId, versionTag);
    dm.putOutgoing(reply);
  }

  @Override
  public boolean getInlineProcess() {
    return true;
  }

  @Override
  public int getDSFID() {
    return R_DESTROY_REPLY_MESSAGE;
  }

  /**
   * Hands the version tag to the waiting processor (when it is a RemoteDestroyReplyProcessor),
   * then lets the processor process this reply and records the reply time. Returns without doing
   * anything else when no processor is supplied.
   */
  @Override
  public void process(final DistributionManager dm, final ReplyProcessor21 rp) {
    final long startTime = getTimestamp();
    if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
      logger.trace(LogMarker.DM_VERBOSE,
          "DestroyReplyMessage process invoking reply processor with processorId:{}",
          processorId);
    }

    if (rp == null) {
      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
        logger.trace(LogMarker.DM_VERBOSE, "DestroyReplyMessage processor not found");
      }
      return;
    }

    if (versionTag != null) {
      versionTag.replaceNullIDs(getSender());
    }
    if (rp instanceof RemoteDestroyReplyProcessor) {
      ((RemoteDestroyReplyProcessor) rp).setResponse(versionTag);
    }
    rp.process(this);

    if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
      logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", rp, this);
    }
    dm.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
  }

  /**
   * Writes a header byte describing the version tag (present / persistent) followed by the tag
   * itself when there is one.
   */
  @Override
  public void toData(DataOutput out, SerializationContext context) throws IOException {
    super.toData(out, context);

    byte header = 0;
    if (versionTag != null) {
      header |= HAS_VERSION;
      if (versionTag instanceof DiskVersionTag) {
        header |= PERSISTENT;
      }
    }
    out.writeByte(header);

    if (versionTag != null) {
      InternalDataSerializer.invokeToData(versionTag, out);
    }
  }

  /** Reads the header byte and, when it says a tag is present, the version tag. */
  @Override
  public void fromData(DataInput in, DeserializationContext context)
      throws IOException, ClassNotFoundException {
    super.fromData(in, context);

    final byte header = in.readByte();
    if ((header & HAS_VERSION) != 0) {
      final boolean persistentTag = (header & PERSISTENT) != 0;
      versionTag = VersionTag.create(persistentTag, in);
    }
  }

  @Override
  public String toString() {
    final StringBuilder text = super.getStringBuilder();
    text.append(getShortClassName()).append(" processorId=").append(processorId);
    if (versionTag != null) {
      text.append(" version=").append(versionTag);
    }
    text.append(" from ").append(getSender());

    final ReplyException failure = getException();
    if (failure != null) {
      text.append(" with exception ").append(failure);
    }
    return text.toString();
  }
}
/**
 * Reply processor for a RemoteDestroyMessage. It holds the version tag delivered by the
 * DestroyReplyMessage so that the sender can read it after waiting for the response.
 */
static class RemoteDestroyReplyProcessor extends RemoteOperationResponse {
  /** Version tag taken from the reply; null until a reply supplies one. */
  VersionTag<?> versionTag;

  /**
   * @param ds the distributed system the processor belongs to
   * @param recipient the member the reply is expected from
   * @param key not used by this constructor
   */
  RemoteDestroyReplyProcessor(InternalDistributedSystem ds, DistributedMember recipient,
      Object key) {
    super(ds, (InternalDistributedMember) recipient, false);
  }

  /** Stores the version tag received in the reply. */
  void setResponse(VersionTag<?> versionTag) {
    this.versionTag = versionTag;
  }

  /** @return the version tag received in the reply, or null if none was received */
  VersionTag<?> getVersionTag() {
    return versionTag;
  }
}
}