| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.InternalGemFireError; |
| import org.apache.geode.InvalidDeltaException; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.distributed.internal.ConflationKey; |
| import org.apache.geode.distributed.internal.DirectReplyProcessor; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.ReplyException; |
| import org.apache.geode.distributed.internal.ReplyMessage; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.InternalDataSerializer; |
| import org.apache.geode.internal.cache.EntryEventImpl.NewValueImporter; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.offheap.annotations.Retained; |
| import org.apache.geode.internal.offheap.annotations.Unretained; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * Handles distribution messaging for updating an entry in a region. |
| * |
| */ |
| public class UpdateOperation extends AbstractUpdateOperation { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** Creates a new instance of UpdateOperation */ |
| public UpdateOperation(EntryEventImpl event, long lastModifiedTime) { |
| super(event, lastModifiedTime); |
| } |
| |
| // protected Set getRecipients() { |
| // DistributionAdvisor advisor = getRegion().getDistributionAdvisor(); |
| // return super.getRecipients(); |
| // } |
| |
| @Override |
| protected boolean supportsDeltaPropagation() { |
| return true; |
| } |
| |
| @Override |
| protected CacheOperationMessage createMessage() { |
| EntryEventImpl ev = getEvent(); |
| if (ev.isBridgeEvent()) { |
| UpdateWithContextMessage mssgwithContxt = new UpdateWithContextMessage(); |
| // getContext is not in EntryEvent interface because it exposes a private |
| // class |
| mssgwithContxt.clientID = ev.getContext(); |
| return mssgwithContxt; |
| } else { |
| return new UpdateMessage(); |
| } |
| } |
| |
| @Override |
| protected void initMessage(CacheOperationMessage msg, DirectReplyProcessor p) { |
| super.initMessage(msg, p); |
| UpdateMessage m = (UpdateMessage) msg; |
| EntryEventImpl ev = getEvent(); |
| m.event = ev; |
| m.eventId = ev.getEventId(); |
| m.key = ev.getKey(); |
| m.deserializationPolicy = DESERIALIZATION_POLICY_LAZY; |
| ev.exportNewValue(m); |
| } |
| |
| @Override |
| protected void initProcessor(CacheOperationReplyProcessor p, CacheOperationMessage msg) { |
| if (processor != null) { |
| if (msg instanceof UpdateWithContextMessage) { |
| processor.msg = new UpdateWithContextMessage((UpdateWithContextMessage) msg); |
| } else { |
| processor.msg = new UpdateMessage((UpdateMessage) msg); |
| } |
| } |
| } |
| |
| public static class UpdateMessage extends AbstractUpdateMessage implements NewValueImporter { |
| |
| /** |
| * Indicates if and when the new value should be deserialized on the the receiver |
| */ |
| protected byte deserializationPolicy; |
| |
| protected EntryEventImpl event = null; |
| |
| protected EventID eventId = null; |
| |
| protected Object key; |
| |
| protected byte[] newValue; |
| |
| @Unretained(ENTRY_EVENT_NEW_VALUE) |
| protected transient Object newValueObj; |
| |
| private byte[] deltaBytes; |
| |
| private boolean sendDeltaWithFullValue = true; |
| |
| // extraFlags |
| static final int HAS_EVENTID = getNextByteMask(DESERIALIZATION_POLICY_END); |
| static final int HAS_DELTA_WITH_FULL_VALUE = getNextByteMask(HAS_EVENTID); |
| |
| private Long tailKey = 0L; |
| |
| public UpdateMessage() {} |
| |
| /** |
| * copy constructor |
| */ |
| public UpdateMessage(UpdateMessage upMsg) { |
| this.appliedOperation = upMsg.appliedOperation; |
| this.callbackArg = upMsg.callbackArg; |
| this.deserializationPolicy = upMsg.deserializationPolicy; |
| this.directAck = upMsg.directAck; |
| this.event = upMsg.event; |
| this.eventId = upMsg.eventId; |
| this.hasDelta = upMsg.hasDelta; |
| this.key = upMsg.key; |
| this.lastModified = upMsg.lastModified; |
| this.newValue = upMsg.newValue; |
| this.newValueObj = upMsg.newValueObj; |
| this.op = upMsg.op; |
| this.owner = upMsg.owner; |
| this.possibleDuplicate = upMsg.possibleDuplicate; |
| this.processorId = upMsg.processorId; |
| this.regionAllowsConflation = upMsg.regionAllowsConflation; |
| this.regionPath = upMsg.regionPath; |
| this.sendDelta = upMsg.sendDelta; |
| this.sender = upMsg.sender; |
| this.processor = upMsg.processor; |
| this.filterRouting = upMsg.filterRouting; |
| this.needsRouting = upMsg.needsRouting; |
| this.versionTag = upMsg.versionTag; |
| } |
| |
| @Override |
| public ConflationKey getConflationKey() { |
| if (!super.regionAllowsConflation || this.directAck || getProcessorId() != 0) { |
| // if the publisher's region attributes do not support conflation |
| // or if it is an ack region |
| // then don't even bother with a conflation key |
| return null; |
| } else { |
| // only conflate if it is not a create |
| // and we don't want an ack |
| return new ConflationKey(this.key, super.regionPath, getOperation().isUpdate()); |
| } |
| } |
| |
| @Override |
| @Retained |
| protected InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException { |
| EntryEventImpl ev = createEntryEvent(rgn); |
| boolean evReturned = false; |
| try { |
| ev.setEventId(this.eventId); |
| |
| ev.setDeltaBytes(this.deltaBytes); |
| |
| if (hasDelta()) { |
| this.newValueObj = null; |
| // New value will be set once it is generated with fromDelta() inside |
| // EntryEventImpl.processDeltaBytes() |
| ev.setNewValue(this.newValueObj); |
| } else { |
| setNewValueInEvent(this.newValue, this.newValueObj, ev, this.deserializationPolicy); |
| } |
| if (this.filterRouting != null) { |
| ev.setLocalFilterInfo(this.filterRouting.getFilterInfo(rgn.getMyId())); |
| } |
| ev.setTailKey(tailKey); |
| |
| ev.setVersionTag(this.versionTag); |
| |
| ev.setInhibitAllNotifications(this.inhibitAllNotifications); |
| |
| evReturned = true; |
| return ev; |
| } finally { |
| if (!evReturned) { |
| ev.release(); |
| } |
| } |
| } |
| |
| @Override |
| boolean processReply(final ReplyMessage replyMessage, CacheOperationReplyProcessor processor) { |
| ReplyException ex = replyMessage.getException(); |
| if (ex != null && ex.getCause() instanceof InvalidDeltaException) { |
| // msg can be null when PR data store throws exception back to |
| // accessor. |
| UpdateMessage message = this; |
| if (!(message.hasBridgeContext() && message.getDataPolicy() == DataPolicy.EMPTY)) { |
| final UpdateMessage updateMsg; |
| final DistributionManager dm = this.event.getRegion().getDistributionManager(); |
| if (this instanceof UpdateWithContextMessage) { |
| updateMsg = |
| new UpdateOperation.UpdateWithContextMessage((UpdateWithContextMessage) this); |
| } else { |
| updateMsg = new UpdateOperation.UpdateMessage((UpdateMessage) this); |
| } |
| Runnable sendMessage = new Runnable() { |
| @Override |
| public void run() { |
| synchronized (updateMsg) { // prevent concurrent update of |
| // recipient list |
| updateMsg.resetRecipients(); |
| updateMsg.setRecipient(replyMessage.getSender()); |
| updateMsg.setSendDelta(false); |
| updateMsg.setSendDeltaWithFullValue(false); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Sending full object ({}) to {}", updateMsg, |
| replyMessage.getSender()); |
| } |
| dm.putOutgoing(updateMsg); |
| } |
| updateMsg.event.getRegion().getCachePerfStats().incDeltaFullValuesSent(); |
| } |
| |
| @Override |
| public String toString() { |
| return "Sending full object {" + updateMsg.toString() + "}"; |
| } |
| }; |
| |
| if (processor.isExpectingDirectReply()) { |
| sendMessage.run(); |
| } else { |
| dm.getExecutors().getWaitingThreadPool().execute(sendMessage); |
| } |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Utility to set the new value in the EntryEventImpl based on the given deserialization value; |
| * also called from QueuedOperation |
| */ |
| static void setNewValueInEvent(byte[] newValue, Object newValueObj, EntryEventImpl event, |
| byte deserializationPolicy) { |
| if (newValue == null) { |
| // in an UpdateMessage this results from a create(key, null) call, |
| // set local invalid flag in event if this is a normal region. Otherwise |
| // it should be a distributed invalid. |
| if (event.getRegion().getAttributes().getDataPolicy() == DataPolicy.NORMAL) { |
| event.setLocalInvalid(true); |
| } |
| event.setNewValue(newValue); |
| Assert.assertTrue(deserializationPolicy == DESERIALIZATION_POLICY_NONE); |
| return; |
| } |
| |
| switch (deserializationPolicy) { |
| case DESERIALIZATION_POLICY_LAZY: |
| event.setSerializedNewValue(newValue); |
| break; |
| case DESERIALIZATION_POLICY_NONE: |
| event.setNewValue(newValue); |
| break; |
| default: |
| throw new InternalGemFireError( |
| String.format("unknown deserialization policy: %s", |
| Byte.valueOf(deserializationPolicy))); |
| } |
| } |
| |
| @Retained |
| protected EntryEventImpl createEntryEvent(DistributedRegion rgn) { |
| Object argNewValue = null; |
| final boolean originRemote = true, generateCallbacks = true; |
| @Retained |
| EntryEventImpl result = EntryEventImpl.create(rgn, getOperation(), this.key, argNewValue, // oldValue, |
| this.callbackArg, originRemote, getSender(), generateCallbacks); |
| setOldValueInEvent(result); |
| result.setTailKey(this.tailKey); |
| if (this.versionTag != null) { |
| result.setVersionTag(this.versionTag); |
| } |
| return result; |
| } |
| |
| @Override |
| protected void appendFields(StringBuilder buff) { |
| super.appendFields(buff); |
| buff.append("; key="); |
| buff.append(this.key); |
| if (this.hasDelta()) { |
| byte[] bytes; |
| if (this.event != null) { |
| bytes = this.event.getDeltaBytes(); |
| } else { |
| bytes = this.deltaBytes; |
| } |
| if (bytes == null) { |
| buff.append("; null delta bytes"); |
| } else { |
| buff.append("; ").append(bytes.length).append(" delta bytes"); |
| } |
| } else if (this.newValueObj != null) { |
| buff.append("; newValueObj="); |
| buff.append(this.newValueObj); |
| } else { |
| buff.append("; newValue="); |
| // buff.append(this.newValue); |
| buff.append(newValue == null ? "null" : "(" + newValue.length + " bytes)"); |
| } |
| if (this.eventId != null) { |
| buff.append("; eventId=").append(this.eventId); |
| } |
| buff.append("; deserializationPolicy="); |
| buff.append(deserializationPolicyToString(this.deserializationPolicy)); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return UPDATE_MESSAGE; |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| final byte extraFlags = in.readByte(); |
| final boolean hasEventId = (extraFlags & HAS_EVENTID) != 0; |
| if (hasEventId) { |
| this.eventId = new EventID(); |
| InternalDataSerializer.invokeFromData(this.eventId, in); |
| |
| boolean hasTailKey = in.readBoolean(); |
| if (hasTailKey) { |
| this.tailKey = in.readLong(); |
| } |
| } else { |
| this.eventId = null; |
| } |
| this.key = DataSerializer.readObject(in); |
| |
| this.deserializationPolicy = (byte) (extraFlags & DESERIALIZATION_POLICY_MASK); |
| if (hasDelta()) { |
| this.deltaBytes = DataSerializer.readByteArray(in); |
| } else { |
| this.newValue = DataSerializer.readByteArray(in); |
| if ((extraFlags & HAS_DELTA_WITH_FULL_VALUE) != 0) { |
| this.deltaBytes = DataSerializer.readByteArray(in); |
| } |
| } |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| DistributedRegion region = (DistributedRegion) this.event.getRegion(); |
| setDeltaFlag(region); |
| super.toData(out, context); |
| |
| byte extraFlags = this.deserializationPolicy; |
| if (this.eventId != null) |
| extraFlags |= HAS_EVENTID; |
| if (this.deserializationPolicy != DistributedCacheOperation.DESERIALIZATION_POLICY_NONE |
| && this.sendDeltaWithFullValue && this.event.getDeltaBytes() != null) { |
| extraFlags |= HAS_DELTA_WITH_FULL_VALUE; |
| } |
| out.writeByte(extraFlags); |
| |
| if (this.eventId != null) { |
| InternalDataSerializer.invokeToData(this.eventId, out); |
| if (region instanceof BucketRegion) { |
| PartitionedRegion pr = region.getPartitionedRegion(); |
| // TODO Kishor: Since here we are talking about tail key |
| // then we are surely considering Paralle Gateway |
| if (!pr.isParallelWanEnabled()) { |
| out.writeBoolean(false); |
| } else { |
| out.writeBoolean(true); |
| out.writeLong(this.event.getTailKey()); |
| } |
| } else { |
| out.writeBoolean(false); |
| } |
| } |
| DataSerializer.writeObject(key, out); |
| |
| if (hasDelta()) { |
| DataSerializer.writeByteArray(this.event.getDeltaBytes(), out); |
| this.event.getRegion().getCachePerfStats().incDeltasSent(); |
| } else { |
| DistributedCacheOperation.writeValue(this.deserializationPolicy, this.newValueObj, |
| this.newValue, out); |
| if ((extraFlags & HAS_DELTA_WITH_FULL_VALUE) != 0) { |
| DataSerializer.writeByteArray(this.event.getDeltaBytes(), out); |
| } |
| } |
| } |
| |
| @Override |
| public EventID getEventID() { |
| return this.eventId; |
| } |
| |
| private void setDeltaFlag(DistributedRegion region) { |
| try { |
| if (region != null && region.getSystem().getConfig().getDeltaPropagation() && this.sendDelta |
| && !region.scope.isDistributedNoAck() && this.event.getDeltaBytes() != null) { |
| setHasDelta(true); |
| return; |
| } |
| setHasDelta(false); |
| } catch (RuntimeException re) { |
| throw new InvalidDeltaException( |
| "Caught exception while sending delta. ", |
| re); |
| } |
| } |
| |
| public boolean hasBridgeContext() { |
| if (this.event != null) { |
| return this.event.getContext() != null; |
| } |
| return false; |
| } |
| |
| public DataPolicy getDataPolicy() { |
| if (this.event != null) { |
| return this.event.getRegion().getAttributes().getDataPolicy(); |
| } |
| return null; |
| } |
| |
| public void setSendDeltaWithFullValue(boolean bool) { |
| this.sendDeltaWithFullValue = bool; |
| } |
| |
| @Override |
| public boolean prefersNewSerialized() { |
| return true; |
| } |
| |
| @Override |
| public boolean isUnretainedNewReferenceOk() { |
| return true; |
| } |
| |
| @Override |
| public void importNewObject(@Unretained(ENTRY_EVENT_NEW_VALUE) Object nv, |
| boolean isSerialized) { |
| if (nv == null) { |
| this.deserializationPolicy = DESERIALIZATION_POLICY_NONE; |
| this.newValue = null; |
| } else { |
| if (!isSerialized) { |
| this.deserializationPolicy = DESERIALIZATION_POLICY_NONE; |
| } |
| this.newValueObj = nv; |
| } |
| } |
| |
| @Override |
| public void importNewBytes(byte[] nv, boolean isSerialized) { |
| if (!isSerialized) { |
| this.deserializationPolicy = DESERIALIZATION_POLICY_NONE; |
| } |
| this.newValue = nv; |
| } |
| } |
| |
| public static class UpdateWithContextMessage extends UpdateMessage { |
| |
| protected transient ClientProxyMembershipID clientID; |
| |
| @Override |
| @Retained |
| public EntryEventImpl createEntryEvent(DistributedRegion rgn) { |
| // Object oldValue = null; |
| final Object argNewValue = null; |
| // boolean localLoad = false, netLoad = false, netSearch = false, |
| // distributed = true; |
| final boolean originRemote = true, generateCallbacks = true; |
| @Retained |
| EntryEventImpl ev = EntryEventImpl.create(rgn, getOperation(), this.key, argNewValue, |
| this.callbackArg, originRemote, getSender(), generateCallbacks); |
| ev.setContext(this.clientID); |
| setOldValueInEvent(ev); |
| return ev; |
| // localLoad, netLoad, netSearch, |
| // distributed, this.isExpiration, originRemote, this.context); |
| } |
| |
| public UpdateWithContextMessage() {} |
| |
| public UpdateWithContextMessage(UpdateWithContextMessage msg) { |
| super(msg); |
| this.clientID = msg.clientID; |
| } |
| |
| @Override |
| protected void appendFields(StringBuilder buff) { |
| super.appendFields(buff); |
| buff.append("; context=").append(this.clientID); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return UPDATE_WITH_CONTEXT_MESSAGE; |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| this.clientID = ClientProxyMembershipID.readCanonicalized(in); |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| DataSerializer.writeObject(this.clientID, out); |
| } |
| } |
| |
| } |