blob: 1276a52b7c927341f6f16a5a7d46b67098d96eb7 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.partitioned;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.EntryExistsException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DirectReplyProcessor;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyMessage;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.ReplySender;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.NanoTimer;
import com.gemstone.gemfire.internal.cache.DataLocationException;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.FilterRoutingInfo;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
/**
* A class that specifies a destroy operation.
*
* Note: The reason for different classes for Destroy and Invalidate is to
* prevent sending an extra bit for every DestroyMessage to differentiate an
* invalidate versus a destroy. The assumption is that these operations are used
* frequently, if they are not then it makes sense to fold the destroy and the
* invalidate into the same message and use an extra bit to differentiate
*
* @author mthomas
* @author bruce
* @since 5.0
*
*/
public class DestroyMessage extends PartitionMessageWithDirectReply {
private static final Logger logger = LogService.getLogger();
/** The key associated with the value that must be sent */
private Object key;
/** The callback arg */
private Object cbArg;
/** The operation performed on the sender */
private Operation op;
/** An additional object providing context for the operation, e.g., for BridgeServer notification */
ClientProxyMembershipID bridgeContext;
/** event identifier */
EventID eventId;
/** for relayed messages, this is the original sender of the message */
InternalDistributedMember originalSender;
/** expectedOldValue used for PartitionedRegion#remove(key, value) */
private Object expectedOldValue; // TODO OFFHEAP make it a cd
/** client routing information for notificationOnly=true messages */
protected FilterRoutingInfo filterInfo;
protected VersionTag versionTag;
private static final byte HAS_VERSION_TAG = 0x01;
private static final byte PERSISTENT_TAG = 0x02;
// additional bitmask flags used for serialization/deserialization
protected static final short CACHE_WRITE = UNRESERVED_FLAGS_START;
/**
* Empty constructor to satisfy {@link DataSerializer} requirements
*/
public DestroyMessage() {
}
protected DestroyMessage(Set recipients,
boolean notifyOnly,
int regionId,
DirectReplyProcessor processor,
EntryEventImpl event,
Object expectedOldValue) {
super(recipients, regionId, processor, event);
this.expectedOldValue = expectedOldValue;
this.key = event.getKey();
this.cbArg = event.getRawCallbackArgument();
this.op = event.getOperation();
this.notificationOnly = notifyOnly;
this.bridgeContext = event.getContext();
this.eventId = event.getEventId();
this.versionTag = event.getVersionTag();
}
/** a cloning constructor for relaying the message to listeners */
DestroyMessage(DestroyMessage original, EntryEventImpl event, Set members) {
this(original);
if (event != null) {
this.posDup = event.isPossibleDuplicate();
this.versionTag = event.getVersionTag();
}
}
/** a cloning constructor for relaying the message to listeners */
DestroyMessage(DestroyMessage original) {
this.expectedOldValue = original.expectedOldValue;
this.regionId = original.regionId;
this.processorId = original.processorId;
this.key = original.key;
this.cbArg = original.cbArg;
this.op = original.op;
this.notificationOnly = true;
this.bridgeContext = original.bridgeContext;
this.originalSender = original.getSender();
// Assert.assertTrue(original.eventId != null); bug #47235 - region invalidation has no event id, so this fails
this.eventId = original.eventId;
this.posDup = original.posDup;
this.versionTag = original.versionTag;
}
@Override
public boolean isSevereAlertCompatible() {
// allow forced-disconnect processing for all cache op messages
return true;
}
/**
* send a notification-only message to a set of listeners. The processor
* id is passed with the message for reply message processing. This method
* does not wait on the processor.
*
* @param cacheOpReceivers receivers of associated bucket CacheOperationMessage
* @param adjunctRecipients receivers that must get the event
* @param filterRoutingInfo client routing information
* @param r the region affected by the event
* @param event the event that prompted this action
* @param processor the processor to reply to
* @return members that could not be notified
*/
public static Set notifyListeners(Set cacheOpReceivers, Set adjunctRecipients,
FilterRoutingInfo filterRoutingInfo,
PartitionedRegion r, EntryEventImpl event,
DirectReplyProcessor processor) {
DestroyMessage msg = new DestroyMessage(Collections.EMPTY_SET,
true, r.getPRId(), processor, event, null);
msg.versionTag = event.getVersionTag();
return msg.relayToListeners(cacheOpReceivers, adjunctRecipients,
filterRoutingInfo, event, r, processor);
}
/**
* Sends a DestroyMessage
* {@link com.gemstone.gemfire.cache.Region#destroy(Object)}message to the
* recipient
*
* @param recipient the recipient of the message
* @param r
* the PartitionedRegion for which the destroy was performed
* @param event the event causing this message
* @return the processor used to await the potential
* {@link com.gemstone.gemfire.cache.CacheException}
* @throws ForceReattemptException if the peer is no longer available
*/
public static DestroyResponse send(DistributedMember recipient,
PartitionedRegion r,
EntryEventImpl event,
Object expectedOldValue)
throws ForceReattemptException {
//Assert.assertTrue(recipient != null, "DestroyMessage NULL recipient"); recipient may be null for event notification
Set recipients = Collections.singleton(recipient);
DestroyResponse p = new DestroyResponse(r.getSystem(), recipients, false);
p.requireResponse();
DestroyMessage m = new DestroyMessage(recipients,
false,
r.getPRId(),
p,
event,
expectedOldValue);
m.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
Set failures =r.getDistributionManager().putOutgoing(m);
if (failures != null && failures.size() > 0 ) {
throw new ForceReattemptException(LocalizedStrings.DestroyMessage_FAILED_SENDING_0.toLocalizedString(m));
}
return p;
}
@Override
public PartitionMessage getMessageForRelayToListeners(EntryEventImpl event, Set members) {
DestroyMessage msg = new DestroyMessage(this, event, members );
//Fix for 43000 - don't send the expected old value to listeners.
msg.expectedOldValue = null;
return msg;
}
/**
* This method is called upon receipt and make the desired changes to the
* PartitionedRegion Note: It is very important that this message does NOT
* cause any deadlocks as the sender will wait indefinitely for the
* acknowledgement
*/
@Override
protected boolean operateOnPartitionedRegion(DistributionManager dm,
PartitionedRegion r, long startTime)
throws EntryExistsException, DataLocationException
{
InternalDistributedMember eventSender = originalSender;
if (eventSender == null) {
eventSender = getSender();
}
EntryEventImpl event = null;
try {
if (r.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(r);
}
if (this.bridgeContext != null) {
event = EntryEventImpl.create(r, getOperation(), this.key, null/*newValue*/,
getCallbackArg(), false/*originRemote*/, eventSender,
true/*generateCallbacks*/);
event.setContext(this.bridgeContext);
} // bridgeContext != null
else {
event = EntryEventImpl.create(
r,
getOperation(),
this.key,
null, /*newValue*/
getCallbackArg(),
false/*originRemote - false to force distribution in buckets*/,
eventSender,
true/*generateCallbacks*/,
false/*initializeId*/);
}
if (this.versionTag != null) {
this.versionTag.replaceNullIDs(getSender());
event.setVersionTag(this.versionTag);
}
event.setInvokePRCallbacks(!notificationOnly);
Assert.assertTrue(eventId != null);
event.setEventId(eventId);
event.setPossibleDuplicate(this.posDup);
PartitionedRegionDataStore ds = r.getDataStore();
boolean sendReply = true;
if (!notificationOnly) {
Assert.assertTrue(ds!=null, "This process should have storage for an item in " + this.toString());
try {
Integer bucket = Integer.valueOf(PartitionedRegionHelper.getHashKey(r,
null, this.key, null, this.cbArg));
// try {
// // the event must show its true origin for cachewriter invocation
// event.setOriginRemote(true);
// event.setPartitionMessage(this);
// r.doCacheWriteBeforeDestroy(event);
// }
// finally {
// event.setOriginRemote(false);
// }
event.setCausedByMessage(this);
r.getDataView().destroyOnRemote(event, true/*cacheWrite*/, this.expectedOldValue);
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "{} updated bucket: {} with key: {}", getClass().getName(), bucket, this.key);
}
}
catch (CacheWriterException cwe) {
sendReply(getSender(), this.processorId, dm, new ReplyException(cwe), r, startTime);
return false;
}
catch (EntryNotFoundException eee) {
logger.trace(LogMarker.DM, "{}: operateOnRegion caught EntryNotFoundException", getClass().getName());
ReplyMessage.send(getSender(), getProcessorId(),
new ReplyException(eee), getReplySender(dm), r.isInternalRegion());
sendReply = false; // this prevents us from acking later
}
catch (PrimaryBucketException pbe) {
sendReply(getSender(), getProcessorId(), dm, new ReplyException(pbe), r, startTime);
sendReply = false;
} finally {
this.versionTag = event.getVersionTag();
}
}
else {
EntryEventImpl e2 = createListenerEvent(event, r, dm.getDistributionManagerId());
try {
r.invokeDestroyCallbacks(EnumListenerEvent.AFTER_DESTROY, e2, r.isInitialized(), true);
} finally {
// if e2 == ev then no need to free it here. The outer finally block will get it.
if (e2 != event) {
e2.release();
}
}
}
return sendReply;
} finally {
if (event != null) {
event.release();
}
}
}
@Override
protected void sendReply(InternalDistributedMember member, int procId, DM dm, ReplyException ex, PartitionedRegion pr, long startTime) {
if (pr != null && startTime > 0) {
pr.getPrStats().endPartitionMessagesProcessing(startTime);
}
if (ex == null) {
DestroyReplyMessage.send(getSender(), getReplySender(dm), this.processorId, this.versionTag, pr != null && pr.isInternalRegion());
} else {
ReplyMessage.send(getSender(), this.processorId, ex, getReplySender(dm), pr != null && pr.isInternalRegion());
}
}
public int getDSFID() {
return PR_DESTROY;
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException
{
super.fromData(in);
setKey(DataSerializer.readObject(in));
this.cbArg = DataSerializer.readObject(in);
this.op = Operation.fromOrdinal(in.readByte());
this.notificationOnly = in.readBoolean();
this.bridgeContext = ClientProxyMembershipID.readCanonicalized(in);
this.originalSender = (InternalDistributedMember)DataSerializer.readObject(in);
this.eventId = (EventID)DataSerializer.readObject(in);
this.expectedOldValue = DataSerializer.readObject(in);
final boolean hasFilterInfo = ((flags & HAS_FILTER_INFO) != 0);
if (hasFilterInfo) {
this.filterInfo = new FilterRoutingInfo();
InternalDataSerializer.invokeFromData(this.filterInfo, in);
}
this.versionTag = DataSerializer.readObject(in);
}
@Override
public void toData(DataOutput out) throws IOException
{
super.toData(out);
DataSerializer.writeObject(getKey(), out);
DataSerializer.writeObject(this.cbArg, out);
out.writeByte(this.op.ordinal);
out.writeBoolean(this.notificationOnly);
DataSerializer.writeObject(this.bridgeContext, out);
DataSerializer.writeObject(this.originalSender, out);
DataSerializer.writeObject(this.eventId, out);
DataSerializer.writeObject(this.expectedOldValue, out);
if (this.filterInfo != null) {
InternalDataSerializer.invokeToData(this.filterInfo, out);
}
DataSerializer.writeObject(this.versionTag, out);
}
@Override
protected short computeCompressedShort(short s) {
s = super.computeCompressedShort(s);
if (this.filterInfo != null) s |= HAS_FILTER_INFO;
return s;
}
@Override
public EventID getEventID() {
return this.eventId;
}
/** create a new EntryEvent to be used in notifying listeners, bridge servers, etc. */
EntryEventImpl createListenerEvent(EntryEventImpl sourceEvent, PartitionedRegion r,
InternalDistributedMember member) {
final EntryEventImpl e2;
if (this.notificationOnly && this.bridgeContext == null) {
e2 = sourceEvent;
}
else {
e2 = new EntryEventImpl(sourceEvent);
if (this.bridgeContext != null) {
e2.setContext(this.bridgeContext);
}
}
e2.setRegion(r);
e2.setOriginRemote(true);
e2.setInvokePRCallbacks(!notificationOnly);
if (this.filterInfo != null) {
e2.setLocalFilterInfo(this.filterInfo.getFilterInfo(member));
}
if (this.versionTag != null) {
this.versionTag.replaceNullIDs(getSender());
e2.setVersionTag(this.versionTag);
}
return e2;
}
/**
* Assists the toString method in reporting the contents of this message
*
* @see PartitionMessage#toString()
*/
@Override
protected void appendFields(StringBuffer buff)
{
super.appendFields(buff);
buff.append("; key=").append(getKey());
if (originalSender != null) {
buff.append("; originalSender=").append(originalSender);
}
if (bridgeContext != null) {
buff.append("; bridgeContext=").append(bridgeContext);
}
if (eventId != null) {
buff.append("; eventId=").append(eventId);
}
if (this.versionTag != null) {
buff.append("; version=").append(this.versionTag);
}
if (filterInfo != null) {
buff.append("; ").append(filterInfo);
}
}
protected final Object getKey()
{
return this.key;
}
private final void setKey(Object key)
{
this.key = key;
}
public final Operation getOperation()
{
return this.op;
}
protected final Object getCallbackArg()
{
return this.cbArg;
}
@Override
public void setFilterInfo(FilterRoutingInfo filterInfo){
if (filterInfo != null){
this.filterInfo = filterInfo;
}
}
@Override
protected boolean mayAddToMultipleSerialGateways(DistributionManager dm) {
return _mayAddToMultipleSerialGateways(dm);
}
public static class DestroyReplyMessage extends ReplyMessage {
private VersionTag versionTag;
/** DSFIDFactory constructor */
public DestroyReplyMessage() {
}
static void send(InternalDistributedMember recipient, ReplySender dm, int procId, VersionTag versionTag, boolean internal) {
Assert.assertTrue(recipient != null, "DestroyReplyMessage NULL recipient");
DestroyReplyMessage m = new DestroyReplyMessage(recipient, procId, versionTag);
m.internal = internal;
dm.putOutgoing(m);
}
DestroyReplyMessage(InternalDistributedMember recipient, int procId, VersionTag versionTag) {
this.setProcessorId(procId);
this.setRecipient(recipient);
this.versionTag = versionTag;
}
@Override
public int getDSFID() {
return PR_DESTROY_REPLY_MESSAGE;
}
@Override
public void process(final DM dm, final ReplyProcessor21 rp) {
final long startTime = getTimestamp();
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "DestroyReplyMessage process invoking reply processor with processorId: {}", this.processorId);
}
//dm.getLogger().warning("RemotePutResponse processor is " + ReplyProcessor21.getProcessor(this.processorId));
if (rp == null) {
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "DestroyReplyMessage processor not found");
}
return;
}
if (this.versionTag != null) {
this.versionTag.replaceNullIDs(getSender());
}
if (rp instanceof DestroyResponse) {
DestroyResponse processor = (DestroyResponse)rp;
if (this.versionTag != null) {
this.versionTag.replaceNullIDs(this.getSender());
}
processor.setResponse(this.versionTag);
}
rp.process(this);
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.debug("{} processed {} ", rp, this);
}
dm.getStats().incReplyMessageTime(NanoTimer.getTime()-startTime);
}
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
byte b = this.versionTag != null ? HAS_VERSION_TAG : 0;
b |= this.versionTag instanceof DiskVersionTag ? PERSISTENT_TAG : 0;
out.writeByte(b);
if (this.versionTag != null) {
InternalDataSerializer.invokeToData(this.versionTag, out);
}
}
@Override
public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
super.fromData(in);
byte b = in.readByte();
boolean hasTag = (b & HAS_VERSION_TAG) != 0;
boolean persistentTag = (b & PERSISTENT_TAG) != 0;
if (hasTag) {
this.versionTag = VersionTag.create(persistentTag, in);
}
}
@Override
public String toString() {
StringBuilder sb = super.getStringBuilder();
if (this.versionTag != null) {
sb.append(" version=").append(this.versionTag);
}
sb.append(" from ");
sb.append(this.getSender());
ReplyException ex = getException();
if (ex != null) {
sb.append(" with exception ");
sb.append(ex);
}
return sb.toString();
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.distributed.internal.ReplyMessage#getInlineProcess()
*/
@Override
public boolean getInlineProcess() {
return true;
}
}
public static class DestroyResponse extends PartitionResponse {
VersionTag versionTag;
DestroyResponse(InternalDistributedSystem ds, Set recipients, Object key) {
super(ds, recipients, false);
}
void setResponse(VersionTag versionTag) {
this.versionTag = versionTag;
}
public VersionTag getVersionTag() {
return this.versionTag;
}
}
}