blob: 4dfa6e7d7fffe0955ff266ea4f888c3ac694e28b [file] [log] [blame]
/*
* =========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
* =========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.distributed.internal.ConflationKey;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DirectReplyProcessor;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyMessage;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.cache.EntryEventImpl.NewValueImporter;
import com.gemstone.gemfire.internal.cache.EntryEventImpl.SerializedCacheValueImpl;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.util.BlobHelper;
import com.gemstone.gemfire.internal.util.Breadcrumbs;
import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
/**
* Handles distribution messaging for updating an entry in a region.
*
* @author Eric Zoerner
*/
public class UpdateOperation extends AbstractUpdateOperation
{
private static final Logger logger = LogService.getLogger();
/** Creates a new instance of UpdateOperation */
public UpdateOperation(EntryEventImpl event, long lastModifiedTime) {
super(event, lastModifiedTime);
}
// protected Set getRecipients() {
// DistributionAdvisor advisor = getRegion().getDistributionAdvisor();
// return super.getRecipients();
// }
@Override
protected boolean supportsDeltaPropagation() {
return true;
}
@Override
protected CacheOperationMessage createMessage()
{
EntryEventImpl ev = getEvent();
if (ev.isBridgeEvent()) {
UpdateWithContextMessage mssgwithContxt = new UpdateWithContextMessage();
// getContext is not in EntryEvent interface because it exposes a private
// class
mssgwithContxt.clientID = ev.getContext();
return mssgwithContxt;
}
else {
return new UpdateMessage();
}
}
@Override
protected void initMessage(CacheOperationMessage msg,
DirectReplyProcessor p)
{
super.initMessage(msg, p);
UpdateMessage m = (UpdateMessage)msg;
EntryEventImpl ev = getEvent();
m.event = ev;
m.eventId = ev.getEventId();
m.key = ev.getKey();
if (CachedDeserializableFactory.preferObject() || ev.hasDelta()) {
m.deserializationPolicy = DESERIALIZATION_POLICY_EAGER;
} else {
m.deserializationPolicy = DESERIALIZATION_POLICY_LAZY;
}
ev.exportNewValue(m);
}
@Override
protected void initProcessor(CacheOperationReplyProcessor p, CacheOperationMessage msg) {
if (processor != null) {
if(msg instanceof UpdateWithContextMessage){
processor.msg = new UpdateWithContextMessage((UpdateWithContextMessage)msg);
}
else{
processor.msg = new UpdateMessage((UpdateMessage)msg);
}
}
}
public static class UpdateMessage extends AbstractUpdateMessage implements NewValueImporter {
/**
* Indicates if and when the new value should be deserialized on the the
* receiver
*/
protected byte deserializationPolicy;
protected EntryEventImpl event = null;
protected EventID eventId = null;
protected Object key;
protected byte[] newValue;
@Unretained(ENTRY_EVENT_NEW_VALUE)
protected transient Object newValueObj;
private byte[] deltaBytes;
private boolean sendDeltaWithFullValue = true;
// extraFlags
static final int HAS_EVENTID = getNextByteMask(DESERIALIZATION_POLICY_END);
static final int HAS_DELTA_WITH_FULL_VALUE = getNextByteMask(HAS_EVENTID);
private Long tailKey = 0L;
public UpdateMessage() {
}
/**
* copy constructor
*/
public UpdateMessage(UpdateMessage upMsg) {
this.appliedOperation = upMsg.appliedOperation;
this.callbackArg = upMsg.callbackArg;
this.deserializationPolicy = upMsg.deserializationPolicy;
this.directAck = upMsg.directAck;
this.event = upMsg.event;
this.eventId = upMsg.eventId;
this.hasDelta = upMsg.hasDelta;
this.key = upMsg.key;
this.lastModified = upMsg.lastModified;
this.newValue = upMsg.newValue;
this.newValueObj = upMsg.newValueObj;
this.op = upMsg.op;
this.owner = upMsg.owner;
this.possibleDuplicate = upMsg.possibleDuplicate;
this.processorId = upMsg.processorId;
this.regionAllowsConflation = upMsg.regionAllowsConflation;
this.regionPath = upMsg.regionPath;
this.sendDelta = upMsg.sendDelta;
this.sender = upMsg.sender;
this.processor = upMsg.processor;
this.filterRouting = upMsg.filterRouting;
this.needsRouting = upMsg.needsRouting;
this.versionTag = upMsg.versionTag;
}
@Override
public ConflationKey getConflationKey()
{
if (!super.regionAllowsConflation || this.directAck
|| getProcessorId() != 0) {
// if the publisher's region attributes do not support conflation
// or if it is an ack region
// then don't even bother with a conflation key
return null;
}
else {
// only conflate if it is not a create
// and we don't want an ack
return new ConflationKey(this.key, super.regionPath, getOperation()
.isUpdate());
}
}
@Override
protected InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException {
EntryEventImpl ev = createEntryEvent(rgn);
boolean evReturned = false;
try {
ev.setEventId(this.eventId);
ev.setDeltaBytes(this.deltaBytes);
if (hasDelta()) {
this.newValueObj = null;
// New value will be set once it is generated with fromDelta() inside
// EntryEventImpl.processDeltaBytes()
ev.setNewValue(this.newValueObj);
}
else {
setNewValueInEvent(this.newValue, this.newValueObj, ev,
this.deserializationPolicy);
}
if (this.filterRouting != null) {
ev.setLocalFilterInfo(this.filterRouting
.getFilterInfo(rgn.getMyId()));
}
ev.setTailKey(tailKey);
ev.setVersionTag(this.versionTag);
ev.setInhibitAllNotifications(this.inhibitAllNotifications);
evReturned = true;
return ev;
} finally {
if (!evReturned) {
ev.release();
}
}
}
@Override
boolean processReply(final ReplyMessage replyMessage, CacheOperationReplyProcessor processor) {
ReplyException ex = replyMessage.getException();
if (ex != null && ex.getCause() instanceof InvalidDeltaException) {
// msg can be null when PR data store throws exception back to
// accessor.
UpdateMessage message = this;
if (!(message.hasBridgeContext() && message.getDataPolicy() == DataPolicy.EMPTY)) {
final UpdateMessage updateMsg;
final DM dm = this.event.getRegion().getDistributionManager();
if (this instanceof UpdateWithContextMessage) {
updateMsg = new UpdateOperation.UpdateWithContextMessage(
(UpdateWithContextMessage)this);
}
else {
updateMsg = new UpdateOperation.UpdateMessage(
(UpdateMessage)this);
}
Runnable sendMessage = new Runnable() {
public void run() {
synchronized (updateMsg) { // prevent concurrent update of
// recipient list
updateMsg.resetRecipients();
updateMsg.setRecipient(replyMessage.getSender());
updateMsg.setSendDelta(false);
updateMsg.setSendDeltaWithFullValue(false);
if (logger.isDebugEnabled()) {
logger.debug("Sending full object ({}) to {}", updateMsg, replyMessage.getSender());
}
dm.putOutgoing(updateMsg);
}
updateMsg.event.getRegion().getCachePerfStats()
.incDeltaFullValuesSent();
}
@Override
public String toString() {
return "Sending full object {" + updateMsg.toString() + "}";
}
};
if (processor.isExpectingDirectReply()) {
sendMessage.run();
} else {
dm.getWaitingThreadPool().execute(
sendMessage);
}
return false;
}
}
return true;
}
/**
* Utility to set the new value in the EntryEventImpl based on the given
* deserialization value; also called from QueuedOperation
*/
static void setNewValueInEvent(byte[] newValue, Object newValueObj,
EntryEventImpl event, byte deserializationPolicy) {
if (newValue == null
&& deserializationPolicy != DESERIALIZATION_POLICY_EAGER) {
// in an UpdateMessage this results from a create(key, null) call,
// set local invalid flag in event if this is a normal region. Otherwise
// it should be a distributed invalid.
if(event.getRegion().getAttributes().getDataPolicy() == DataPolicy.NORMAL) {
event.setLocalInvalid(true);
}
event.setNewValue(newValue);
Assert.assertTrue(deserializationPolicy == DESERIALIZATION_POLICY_NONE);
return;
}
switch (deserializationPolicy) {
case DESERIALIZATION_POLICY_LAZY:
event.setSerializedNewValue(newValue);
break;
case DESERIALIZATION_POLICY_NONE:
event.setNewValue(newValue);
break;
case DESERIALIZATION_POLICY_EAGER:
event.setNewValue(newValueObj);
break;
default:
throw new InternalGemFireError(LocalizedStrings
.UpdateOperation_UNKNOWN_DESERIALIZATION_POLICY_0
.toLocalizedString(Byte.valueOf(deserializationPolicy)));
}
}
protected EntryEventImpl createEntryEvent(DistributedRegion rgn)
{
Object argNewValue = null;
final boolean originRemote = true, generateCallbacks = true;
if (rgn.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(rgn);
}
EntryEventImpl result = EntryEventImpl.create(rgn, getOperation(), this.key,
argNewValue, // oldValue,
this.callbackArg, originRemote, getSender(), generateCallbacks);
setOldValueInEvent(result);
result.setTailKey(this.tailKey);
if (this.versionTag != null) {
result.setVersionTag(this.versionTag);
}
return result;
}
@Override
protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; key=");
buff.append(this.key);
if (this.hasDelta()) {
byte[] bytes;
if (this.event != null) {
bytes = this.event.getDeltaBytes();
} else {
bytes = this.deltaBytes;
}
if (bytes == null) {
buff.append("; null delta bytes");
} else {
buff.append("; ").append(bytes.length).append(" delta bytes");
}
}
else if (this.newValueObj != null) {
buff.append("; newValueObj=");
buff.append(this.newValueObj);
}
else {
buff.append("; newValue=");
// buff.append(this.newValue);
buff.append(newValue == null ? "null" : "(" + newValue.length
+ " bytes)");
}
if (this.eventId != null) {
buff.append("; eventId=").append(this.eventId);
}
buff.append("; deserializationPolicy=");
buff.append(deserializationPolicyToString(this.deserializationPolicy));
}
public int getDSFID()
{
return UPDATE_MESSAGE;
}
@Override
public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
super.fromData(in);
final byte extraFlags = in.readByte();
final boolean hasEventId = (extraFlags & HAS_EVENTID) != 0;
if (hasEventId) {
this.eventId = new EventID();
InternalDataSerializer.invokeFromData(this.eventId, in);
boolean hasTailKey = in.readBoolean();
if (hasTailKey){
this.tailKey = in.readLong();
}
}
else {
this.eventId = null;
}
this.key = DataSerializer.readObject(in);
this.deserializationPolicy = (byte)(extraFlags
& DESERIALIZATION_POLICY_MASK);
if (hasDelta()) {
this.deltaBytes = DataSerializer.readByteArray(in);
}
else {
if (this.deserializationPolicy
== DistributedCacheOperation.DESERIALIZATION_POLICY_EAGER) {
this.newValueObj = DataSerializer.readObject(in);
}
else {
this.newValue = DataSerializer.readByteArray(in);
}
if ((extraFlags & HAS_DELTA_WITH_FULL_VALUE) != 0) {
this.deltaBytes = DataSerializer.readByteArray(in);
}
}
}
@Override
public void toData(DataOutput out) throws IOException
{
DistributedRegion region = (DistributedRegion)this.event.getRegion();
setDeltaFlag(region);
super.toData(out);
byte extraFlags = this.deserializationPolicy;
if (this.eventId != null) extraFlags |= HAS_EVENTID;
if (this.deserializationPolicy != DistributedCacheOperation.DESERIALIZATION_POLICY_NONE
&& this.sendDeltaWithFullValue && this.event.getDeltaBytes() != null) {
extraFlags |= HAS_DELTA_WITH_FULL_VALUE;
}
out.writeByte(extraFlags);
if (this.eventId != null) {
InternalDataSerializer.invokeToData(this.eventId, out);
if (region instanceof BucketRegion) {
PartitionedRegion pr = region.getPartitionedRegion();
// TODO Kishor: Since here we are talking about tail key
// then we are surely considering Paralle Gateway
if (!pr.isParallelWanEnabled()) {
out.writeBoolean(false);
}
else {
out.writeBoolean(true);
out.writeLong(this.event.getTailKey());
}
}
else {
out.writeBoolean(false);
}
}
DataSerializer.writeObject(key, out);
if (hasDelta()) {
DataSerializer.writeByteArray(this.event.getDeltaBytes(), out);
this.event.getRegion().getCachePerfStats().incDeltasSent();
} else {
// TODO OFFHEAP MERGE: add a writeValue that will cache in the event like so:
//byte[] newValueBytes = BlobHelper.serializeToBlob(this.newValueObj);
//this.event.setCachedSerializedNewValue(newValueBytes);
DistributedCacheOperation.writeValue(this.deserializationPolicy, this.newValueObj, this.newValue, out);
if ((extraFlags & HAS_DELTA_WITH_FULL_VALUE) != 0) {
DataSerializer.writeByteArray(this.event.getDeltaBytes(), out);
}
}
}
@Override
public EventID getEventID() {
return this.eventId;
}
private void setDeltaFlag(DistributedRegion region) {
try {
if (region != null
&& region.getSystem().getConfig().getDeltaPropagation()
&& this.sendDelta && !region.scope.isDistributedNoAck()
&& this.event.getDeltaBytes() != null) {
setHasDelta(true);
return;
}
setHasDelta(false);
} catch (RuntimeException re) {
throw new InvalidDeltaException(
LocalizedStrings.DistributionManager_CAUGHT_EXCEPTION_WHILE_SENDING_DELTA
.toLocalizedString(), re);
}
}
@Override
public List getOperations()
{
byte[] valueBytes = null;
Object valueObj = null;
if (this.newValueObj != null) {
if (this.deserializationPolicy ==
DistributedCacheOperation.DESERIALIZATION_POLICY_EAGER) {
valueObj = this.newValueObj;
}
else {
valueBytes = EntryEventImpl.serialize(this.newValueObj);
}
}
else {
valueBytes = this.newValue;
}
return Collections.singletonList(new QueuedOperation(getOperation(),
this.key, valueBytes, valueObj, this.deserializationPolicy,
this.callbackArg));
}
public boolean hasBridgeContext() {
if (this.event != null) {
return this.event.getContext() != null;
}
return false;
}
public DataPolicy getDataPolicy() {
if (this.event != null) {
return this.event.getRegion().getAttributes().getDataPolicy();
}
return null;
}
public void setSendDeltaWithFullValue(boolean bool) {
this.sendDeltaWithFullValue = bool;
}
@Override
public boolean prefersNewSerialized() {
return true;
}
@Override
public boolean isUnretainedNewReferenceOk() {
return true;
}
@Override
public void importNewObject(@Unretained(ENTRY_EVENT_NEW_VALUE) Object nv, boolean isSerialized) {
if (nv == null) {
this.deserializationPolicy = DESERIALIZATION_POLICY_NONE;
this.newValue = null;
} else {
if (!isSerialized) {
this.deserializationPolicy = DESERIALIZATION_POLICY_NONE;
}
this.newValueObj = nv;
}
}
@Override
public void importNewBytes(byte[] nv, boolean isSerialized) {
if (!isSerialized) {
this.deserializationPolicy = DESERIALIZATION_POLICY_NONE;
}
this.newValue = nv;
}
}
public static final class UpdateWithContextMessage extends UpdateMessage
{
protected transient ClientProxyMembershipID clientID;
@Override
final public EntryEventImpl createEntryEvent(DistributedRegion rgn)
{
// Object oldValue = null;
final Object argNewValue = null;
// boolean localLoad = false, netLoad = false, netSearch = false,
// distributed = true;
final boolean originRemote = true, generateCallbacks = true;
if (rgn.keyRequiresRegionContext()) {
((KeyWithRegionContext)this.key).setRegionContext(rgn);
}
EntryEventImpl ev = EntryEventImpl.create(rgn, getOperation(), this.key,
argNewValue, this.callbackArg, originRemote, getSender(),
generateCallbacks);
ev.setContext(this.clientID);
setOldValueInEvent(ev);
return ev;
// localLoad, netLoad, netSearch,
// distributed, this.isExpiration, originRemote, this.context);
}
public UpdateWithContextMessage() {
}
public UpdateWithContextMessage(UpdateWithContextMessage msg) {
super(msg);
this.clientID = msg.clientID;
}
@Override
protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; context=").append(this.clientID);
}
@Override
public int getDSFID()
{
return UPDATE_WITH_CONTEXT_MESSAGE;
}
@Override
public void fromData(DataInput in) throws IOException,
ClassNotFoundException
{
super.fromData(in);
this.clientID = ClientProxyMembershipID.readCanonicalized(in);
}
@Override
public void toData(DataOutput out) throws IOException
{
super.toData(out);
DataSerializer.writeObject(this.clientID, out);
}
}
}