blob: 23d0875e9e0167f1f608aa182849559740976aae [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.EntryExistsException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DirectReplyProcessor;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyMessage;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.ReplySender;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.NanoTimer;
import com.gemstone.gemfire.internal.cache.EntryEventImpl.OldValueImporter;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_BYTES;
import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_SERIALIZED_OBJECT;
import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_OBJECT;
/**
* A class that specifies a destroy operation.
* Used by ReplicateRegions.
* Note: The reason for different classes for Destroy and Invalidate is to
* prevent sending an extra bit for every RemoteDestroyMessage to differentiate an
* invalidate versus a destroy. The assumption is that these operations are used
* frequently, if they are not then it makes sense to fold the destroy and the
* invalidate into the same message and use an extra bit to differentiate
*
* @since 6.5
*
*/
public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply implements OldValueImporter {
private static final Logger logger = LogService.getLogger();
private static final short FLAG_USEORIGINREMOTE = 0x01;
private static final short FLAG_HASOLDVALUE = 0x02;
private static final short FLAG_OLDVALUEISSERIALIZED = 0x04;
private static final short FLAG_POSSIBLEDUPLICATE = 0x08;
/** The key associated with the value that must be sent */
private Object key;
/** The callback arg */
private Object cbArg;
/** The operation performed on the sender */
private Operation op;
/** An additional object providing context for the operation, e.g., for BridgeServer notification */
ClientProxyMembershipID bridgeContext;
/** event identifier */
EventID eventId;
/** for relayed messages, this is the original sender of the message */
InternalDistributedMember originalSender;
/**whether the message has old value */
private boolean hasOldValue = false;
/**whether old value is serialized*/
private boolean oldValueIsSerialized = false;
/** expectedOldValue used for PartitionedRegion#remove(key, value) */
private Object expectedOldValue; // TODO OFFHEAP make it a cd
private byte[] oldValBytes;
@Unretained(ENTRY_EVENT_OLD_VALUE)
private transient Object oldValObj;
boolean useOriginRemote;
protected boolean possibleDuplicate;
VersionTag<?> versionTag;
// additional bitmask flags used for serialization/deserialization
protected static final short USE_ORIGIN_REMOTE = UNRESERVED_FLAGS_START;
protected static final short HAS_OLD_VALUE = (USE_ORIGIN_REMOTE << 1);
protected static final short CACHE_WRITE = (HAS_OLD_VALUE << 1);
protected static final short HAS_BRIDGE_CONTEXT = (CACHE_WRITE << 1);
protected static final short HAS_ORIGINAL_SENDER = (HAS_BRIDGE_CONTEXT << 1);
protected static final int HAS_VERSION_TAG = (HAS_ORIGINAL_SENDER << 1);
/**
* Empty constructor to satisfy {@link DataSerializer} requirements
*/
public RemoteDestroyMessage() {
}
protected RemoteDestroyMessage(Set recipients,
String regionPath,
DirectReplyProcessor processor,
EntryEventImpl event,
Object expectedOldValue, int processorType,
boolean useOriginRemote,
boolean possibleDuplicate) {
super(recipients, regionPath, processor);
this.expectedOldValue = expectedOldValue;
this.key = event.getKey();
this.cbArg = event.getRawCallbackArgument();
this.op = event.getOperation();
this.bridgeContext = event.getContext();
this.eventId = event.getEventId();
this.processorType = processorType;
this.useOriginRemote = useOriginRemote;
this.possibleDuplicate = possibleDuplicate;
this.versionTag = event.getVersionTag();
Assert.assertTrue(this.eventId != null);
// added for old value if available sent over the wire for bridge servers.
if (event.hasOldValue()) {
this.hasOldValue = true;
event.exportOldValue(this);
}
}
@Override
public boolean isSevereAlertCompatible() {
// allow forced-disconnect processing for all cache op messages
return true;
}
private void setOldValBytes(byte[] valBytes){
this.oldValBytes = valBytes;
}
private final void setOldValObj(@Unretained(ENTRY_EVENT_OLD_VALUE) Object o) {
this.oldValObj = o;
}
public final byte[] getOldValueBytes() {
return this.oldValBytes;
}
private Object getOldValObj(){
return this.oldValObj;
}
protected boolean getHasOldValue(){
return this.hasOldValue;
}
protected boolean getOldValueIsSerialized() {
return this.oldValueIsSerialized;
}
/**
* Set the old value for this message, only used if there are cqs registered
* on one of the bridge servers.
*
* @param event underlying event.
* @since 5.5
*/
public void setOldValue(EntryEventImpl event){
if (event.hasOldValue()) {
this.hasOldValue = true;
CachedDeserializable cd = (CachedDeserializable) event.getSerializedOldValue();
if (cd != null) {
if (cd instanceof StoredObject && !((StoredObject) cd).isSerialized()) {
// it is a byte[]
this.oldValueIsSerialized = false;
setOldValBytes((byte[]) ((StoredObject) cd).getDeserializedForReading());
} else {
this.oldValueIsSerialized = true;
Object o = cd.getValue();
if (o instanceof byte[]) {
setOldValBytes((byte[])o);
} else {
// Defer serialization until toData is called.
setOldValObj(o);
}
}
} else {
Object old = event.getRawOldValue();
if (old instanceof byte[]) {
this.oldValueIsSerialized = false;
setOldValBytes((byte[]) old);
} else {
this.oldValueIsSerialized = true;
setOldValObj(old);
}
}
}
}
public static boolean distribute(EntryEventImpl event, Object expectedOldValue, boolean onlyPersistent) {
boolean successful = false;
DistributedRegion r = (DistributedRegion)event.getRegion();
Collection replicates = onlyPersistent ? r.getCacheDistributionAdvisor()
.adviseInitializedPersistentMembers().keySet() : r
.getCacheDistributionAdvisor().adviseInitializedReplicates();
if (replicates.isEmpty()) {
return false;
}
if (replicates.size() > 1) {
ArrayList l = new ArrayList(replicates);
Collections.shuffle(l);
replicates = l;
}
int attempts = 0;
for (Iterator<InternalDistributedMember> it=replicates.iterator(); it.hasNext(); ) {
InternalDistributedMember replicate = it.next();
try {
attempts++;
final boolean posDup = (attempts > 1);
RemoteDestroyReplyProcessor processor = send(replicate, event.getRegion(),
event, expectedOldValue, DistributionManager.SERIAL_EXECUTOR, false,
posDup);
processor.waitForCacheException();
VersionTag versionTag = processor.getVersionTag();
if (versionTag != null) {
event.setVersionTag(versionTag);
if (event.getRegion().getVersionVector() != null) {
event.getRegion().getVersionVector().recordVersion(versionTag.getMemberID(), versionTag);
}
}
event.setInhibitDistribution(true);
return true;
} catch (EntryNotFoundException e) {
throw new EntryNotFoundException(""+event.getKey());
} catch (TransactionDataNotColocatedException enfe) {
throw enfe;
} catch (CancelException e) {
event.getRegion().getCancelCriterion().checkCancelInProgress(e);
} catch (CacheException e) {
if (logger.isDebugEnabled()) {
logger.debug("RemoteDestroyMessage caught CacheException during distribution", e);
}
successful = true; // not a cancel-exception, so don't complain any more about it
} catch(RemoteOperationException e) {
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "RemoteDestroyMessage caught an unexpected exception during distribution", e);
}
}
}
return successful;
}
/**
* Sends a RemoteDestroyMessage
* {@link com.gemstone.gemfire.cache.Region#destroy(Object)}message to the
* recipient
*
* @param recipient the recipient of the message
* @param r
* the ReplicateRegion for which the destroy was performed
* @param event the event causing this message
* @param processorType the type of executor to use in processing the message
* @param useOriginRemote TODO
* @return the processor used to await the potential
* {@link com.gemstone.gemfire.cache.CacheException}
*/
public static RemoteDestroyReplyProcessor send(DistributedMember recipient,
LocalRegion r,
EntryEventImpl event,
Object expectedOldValue, int processorType,
boolean useOriginRemote,
boolean possibleDuplicate)
throws RemoteOperationException {
//Assert.assertTrue(recipient != null, "RemoteDestroyMessage NULL recipient"); recipient may be null for event notification
Set recipients = Collections.singleton(recipient);
RemoteDestroyReplyProcessor p = new RemoteDestroyReplyProcessor(r.getSystem(), recipients, false);
p.requireResponse();
RemoteDestroyMessage m = new RemoteDestroyMessage(recipients,
r.getFullPath(),
p,
event,
expectedOldValue, processorType,
useOriginRemote, possibleDuplicate);
m.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
Set failures =r.getDistributionManager().putOutgoing(m);
if (failures != null && failures.size() > 0 ) {
throw new RemoteOperationException(LocalizedStrings.RemoteDestroyMessage_FAILED_SENDING_0.toLocalizedString(m));
}
return p;
}
@Override
public int getProcessorType() {
return this.processorType;
}
/**
* This method is called upon receipt and make the desired changes to the
* PartitionedRegion Note: It is very important that this message does NOT
* cause any deadlocks as the sender will wait indefinitely for the
* acknowledgement
*/
@Override
protected boolean operateOnRegion(DistributionManager dm,
LocalRegion r, long startTime)
throws EntryExistsException, RemoteOperationException
{
InternalDistributedMember eventSender = originalSender;
if (eventSender == null) {
eventSender = getSender();
}
if (r.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(r);
}
EntryEventImpl event = null;
try {
if (this.bridgeContext != null) {
event = EntryEventImpl.create(r, getOperation(), getKey(), null/*newValue*/,
getCallbackArg(), false/*originRemote*/, eventSender,
true/*generateCallbacks*/);
event.setContext(this.bridgeContext);
// for cq processing and client notification by BS.
if (this.hasOldValue){
if (this.oldValueIsSerialized){
event.setSerializedOldValue(getOldValueBytes());
}
else{
event.setOldValue(getOldValueBytes());
}
}
} // bridgeContext != null
else {
event = EntryEventImpl.create(
r,
getOperation(),
getKey(),
null, /*newValue*/
getCallbackArg(),
this.useOriginRemote,
eventSender,
true/*generateCallbacks*/,
false/*initializeId*/);
}
event.setCausedByMessage(this);
if (this.versionTag != null) {
this.versionTag.replaceNullIDs(getSender());
event.setVersionTag(this.versionTag);
}
// for cq processing and client notification by BS.
if (this.hasOldValue){
if (this.oldValueIsSerialized){
event.setSerializedOldValue(getOldValueBytes());
}
else{
event.setOldValue(getOldValueBytes());
}
}
Assert.assertTrue(eventId != null);
event.setEventId(eventId);
event.setPossibleDuplicate(this.possibleDuplicate);
try {
r.getDataView().destroyOnRemote(event, true, this.expectedOldValue);
sendReply(dm, event.getVersionTag());
}
catch (CacheWriterException cwe) {
sendReply(getSender(), this.processorId, dm, new ReplyException(cwe), r, startTime);
return false;
}
catch (EntryNotFoundException eee) {
if (logger.isDebugEnabled()) {
logger.debug("operateOnRegion caught EntryNotFoundException", eee);
}
ReplyMessage.send(getSender(), getProcessorId(),
new ReplyException(eee), getReplySender(dm), r.isInternalRegion());
} catch (DataLocationException e) {
if (logger.isDebugEnabled()) {
logger.debug("operateOnRegion caught DataLocationException");
}
ReplyMessage.send(getSender(), getProcessorId(),
new ReplyException(e), getReplySender(dm), r.isInternalRegion());
}
return false;
} finally {
if (event != null) {
event.release();
}
}
}
public int getDSFID() {
return R_DESTROY_MESSAGE;
}
private void sendReply(DM dm, VersionTag versionTag) {
DestroyReplyMessage.send(this.getSender(), getReplySender(dm), this.processorId, versionTag);
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException
{
super.fromData(in);
setKey(DataSerializer.readObject(in));
this.cbArg = DataSerializer.readObject(in);
this.op = Operation.fromOrdinal(in.readByte());
if ((flags & HAS_BRIDGE_CONTEXT) != 0) {
this.bridgeContext = DataSerializer.readObject(in);
}
if ((flags & HAS_ORIGINAL_SENDER) != 0) {
this.originalSender = DataSerializer.readObject(in);
}
this.eventId = DataSerializer.readObject(in);
// for old values for CQs
if (this.hasOldValue){
//out.writeBoolean(this.hasOldValue);
// below boolean is not strictly required, but this is for compatibility
// with SQLFire code which writes as byte here to indicate whether
// oldValue is an object, serialized object or byte[]
in.readByte();
setOldValBytes(DataSerializer.readByteArray(in));
}
this.expectedOldValue = DataSerializer.readObject(in);
// to prevent bug 51024 always call readObject for versionTag
// since toData always calls writeObject for versionTag.
this.versionTag = DataSerializer.readObject(in);
}
@Override
public void toData(DataOutput out) throws IOException
{
super.toData(out);
DataSerializer.writeObject(getKey(), out);
DataSerializer.writeObject(this.cbArg, out);
out.writeByte(this.op.ordinal);
if (this.bridgeContext != null) {
DataSerializer.writeObject(this.bridgeContext, out);
}
if (this.originalSender != null) {
DataSerializer.writeObject(this.originalSender, out);
}
DataSerializer.writeObject(this.eventId, out);
// this will be on wire for cqs old value generations.
if (this.hasOldValue){
out.writeByte(this.oldValueIsSerialized ? 1 : 0);
byte policy = DistributedCacheOperation.valueIsToDeserializationPolicy(oldValueIsSerialized);
DistributedCacheOperation.writeValue(policy, getOldValObj(), getOldValueBytes(), out);
}
DataSerializer.writeObject(this.expectedOldValue, out);
DataSerializer.writeObject(this.versionTag, out);
}
@Override
protected void setFlags(short flags, DataInput in) throws IOException,
ClassNotFoundException {
super.setFlags(flags, in);
this.hasOldValue = (flags & HAS_OLD_VALUE) != 0;
this.useOriginRemote = (flags & USE_ORIGIN_REMOTE) != 0;
this.possibleDuplicate = (flags & POS_DUP) != 0;
}
@Override
protected short computeCompressedShort() {
short s = super.computeCompressedShort();
// this will be on wire for cqs old value generations.
if (this.hasOldValue)
s |= HAS_OLD_VALUE;
if (this.useOriginRemote)
s |= USE_ORIGIN_REMOTE;
if (this.possibleDuplicate)
s |= POS_DUP;
if (this.bridgeContext != null)
s |= HAS_BRIDGE_CONTEXT;
if (this.originalSender != null)
s |= HAS_ORIGINAL_SENDER;
if (this.versionTag != null)
s |= HAS_VERSION_TAG;
return s;
}
@Override
public EventID getEventID() {
return this.eventId;
}
/**
* Assists the toString method in reporting the contents of this message
*
*/
@Override
protected void appendFields(StringBuffer buff)
{
super.appendFields(buff);
buff.append("; key=").append(getKey());
if (originalSender != null) {
buff.append("; originalSender=").append(originalSender);
}
if (bridgeContext != null) {
buff.append("; bridgeContext=").append(bridgeContext);
}
if (eventId != null) {
buff.append("; eventId=").append(eventId);
}
buff.append("; hasOldValue= ").append(this.hasOldValue);
}
protected final Object getKey()
{
return this.key;
}
private final void setKey(Object key)
{
this.key = key;
}
public final Operation getOperation()
{
return this.op;
}
protected final Object getCallbackArg()
{
return this.cbArg;
}
@Override
public boolean prefersOldSerialized() {
return true;
}
@Override
public boolean isUnretainedOldReferenceOk() {
return true;
}
@Override
public boolean isCachedDeserializableValueOk() {
return false;
}
private void setOldValueIsSerialized(boolean isSerialized) {
if (isSerialized) {
if (CachedDeserializableFactory.preferObject()) {
this.oldValueIsSerialized = true; //VALUE_IS_OBJECT;
} else {
// Defer serialization until toData is called.
this.oldValueIsSerialized = true; //VALUE_IS_SERIALIZED_OBJECT;
}
} else {
this.oldValueIsSerialized = false; //VALUE_IS_BYTES;
}
}
@Override
public void importOldObject(@Unretained(ENTRY_EVENT_OLD_VALUE) Object ov, boolean isSerialized) {
setOldValueIsSerialized(isSerialized);
// Defer serialization until toData is called.
setOldValObj(ov);
}
@Override
public void importOldBytes(byte[] ov, boolean isSerialized) {
setOldValueIsSerialized(isSerialized);
setOldValBytes(ov);
}
public static class DestroyReplyMessage extends ReplyMessage {
private static final byte HAS_VERSION = 0x01;
private static final byte PERSISTENT = 0x02;
private VersionTag versionTag;
/** DSFIDFactory constructor */
public DestroyReplyMessage() {
}
static void send(InternalDistributedMember recipient, ReplySender dm, int procId, VersionTag versionTag) {
Assert.assertTrue(recipient != null, "DestroyReplyMessage NULL recipient");
DestroyReplyMessage m = new DestroyReplyMessage(recipient, procId, versionTag);
dm.putOutgoing(m);
}
DestroyReplyMessage(InternalDistributedMember recipient, int procId, VersionTag versionTag) {
this.setProcessorId(procId);
this.setRecipient(recipient);
this.versionTag = versionTag;
}
@Override
public boolean getInlineProcess() {
// TODO Auto-generated method stub
return true;
}
@Override
public int getDSFID() {
return R_DESTROY_REPLY_MESSAGE;
}
@Override
public void process(final DM dm, final ReplyProcessor21 rp) {
final long startTime = getTimestamp();
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "DestroyReplyMessage process invoking reply processor with processorId:{}", this.processorId);
}
if (rp == null) {
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "DestroyReplyMessage processor not found");
}
return;
}
if (this.versionTag != null) {
this.versionTag.replaceNullIDs(getSender());
}
if (rp instanceof RemoteDestroyReplyProcessor) {
RemoteDestroyReplyProcessor processor = (RemoteDestroyReplyProcessor)rp;
processor.setResponse(this.versionTag);
}
rp.process(this);
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "{} processed {}", rp, this);
}
dm.getStats().incReplyMessageTime(NanoTimer.getTime()-startTime);
}
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
byte b = 0;
if(this.versionTag != null) {
b |= HAS_VERSION;
}
if(this.versionTag instanceof DiskVersionTag) {
b |= PERSISTENT;
}
out.writeByte(b);
if (this.versionTag != null) {
InternalDataSerializer.invokeToData(this.versionTag, out);
}
}
@Override
public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
super.fromData(in);
byte b = in.readByte();
boolean hasTag = (b & HAS_VERSION) != 0;
boolean persistentTag = (b & PERSISTENT) != 0;
if (hasTag) {
this.versionTag = VersionTag.create(persistentTag, in);
}
}
@Override
public String toString() {
StringBuilder sb = super.getStringBuilder();
sb.append(getShortClassName());
sb.append(" processorId=");
sb.append(this.processorId);
if (this.versionTag != null) {
sb.append(" version=").append(this.versionTag);
}
sb.append(" from ");
sb.append(this.getSender());
ReplyException ex = getException();
if (ex != null) {
sb.append(" with exception ");
sb.append(ex);
}
return sb.toString();
}
}
static class RemoteDestroyReplyProcessor extends RemoteOperationResponse {
VersionTag versionTag;
RemoteDestroyReplyProcessor(InternalDistributedSystem ds, Set recipients, Object key) {
super(ds, recipients, false);
}
void setResponse(VersionTag versionTag) {
this.versionTag = versionTag;
}
VersionTag getVersionTag() {
return this.versionTag;
}
}
}