/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.partitioned;

import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.DataSerializer;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.EntryExistsException;
import org.apache.geode.cache.Operation;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.ReplySender;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.DataLocationException;
import org.apache.geode.internal.cache.DistributedCacheOperation;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EntryEventImpl.NewValueImporter;
import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.FilterRoutingInfo;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.internal.cache.PrimaryBucketException;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tx.RemotePutMessage;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * A Partitioned Region update message. Meant to be sent only to a bucket's primary owner. In
 * addition to updating an entry it is also used to send Partitioned Region event information.
 *
 * @since GemFire 5.0
 */
public class PutMessage extends PartitionMessageWithDirectReply implements NewValueImporter {
  private static final Logger logger = LogService.getLogger();

  /** The key associated with the value that must be sent */
  private Object key;

  /** The value associated with the key that must be sent */
  private byte[] valBytes;

  /**
   * Used on sender side only to defer serialization until toData is called.
   */
  @Unretained(ENTRY_EVENT_NEW_VALUE)
  private transient Object valObj;

  /** The callback arg of the operation */
  private Object cbArg;

  /** The time stamp when the value was created */
  protected long lastModified;

  /** The operation performed on the sender */
  private Operation op;

  /**
   * An additional object providing context for the operation, e.g., for BridgeServer notification
   */
  ClientProxyMembershipID bridgeContext;

  /** event identifier */
  EventID eventId;

  /**
   * for relayed messages, this is the sender of the original message. It should be used in
   * constructing events for listener notification.
   */
  InternalDistributedMember originalSender;

  /**
   * Indicates if and when the new value should be deserialized on the the receiver. Distinguishes
   * between a non-byte[] value that was serialized (DESERIALIZATION_POLICY_LAZY) and a byte[] array
   * value that didn't need to be serialized (DESERIALIZATION_POLICY_NONE). While this seems like an
   * extra data, it isn't, because serializing a byte[] causes the type (a byte) to be written in
   * the stream, AND what's better is that handling this distinction at this level reduces
   * processing for values that are byte[].
   */
  protected byte deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_NONE;

  /**
   * whether it's okay to create a new key
   */
  private boolean ifNew;

  /**
   * whether it's okay to update an existing key
   */
  private boolean ifOld;

  /**
   * Whether an old value is required in the response
   */
  private boolean requireOldValue;

  /**
   * For put to happen, the old value must be equal to this expectedOldValue.
   *
   * @see PartitionedRegion#replace(Object, Object, Object)
   */
  private Object expectedOldValue;

  private transient InternalDistributedSystem internalDs;

  /**
   * state from operateOnRegion that must be preserved for transmission from the waiting pool
   */
  transient boolean result = false;

  /** client routing information for notificationOnly=true messages */
  private FilterRoutingInfo filterInfo;

  private boolean hasFilterInfo;

  /** whether value has delta **/
  private boolean hasDelta = false;

  /** whether new value is formed by applying delta **/
  private transient boolean isDeltaApplied = false;

  /** whether to send delta or full value **/
  private transient boolean sendDelta = false;

  private EntryEventImpl event = null;

  private byte[] deltaBytes = null;

  private VersionTag versionTag;

  // additional bitmask flags used for serialization/deserialization

  protected static final short CACHE_WRITE = UNRESERVED_FLAGS_START;
  protected static final short HAS_EXPECTED_OLD_VAL = (CACHE_WRITE << 1);
  protected static final short HAS_VERSION_TAG = (HAS_EXPECTED_OLD_VAL << 1);

  // extraFlags
  protected static final int HAS_BRIDGE_CONTEXT =
      getNextByteMask(DistributedCacheOperation.DESERIALIZATION_POLICY_END);
  protected static final int HAS_ORIGINAL_SENDER = getNextByteMask(HAS_BRIDGE_CONTEXT);
  protected static final int HAS_DELTA_WITH_FULL_VALUE = getNextByteMask(HAS_ORIGINAL_SENDER);
  protected static final int HAS_CALLBACKARG = getNextByteMask(HAS_DELTA_WITH_FULL_VALUE);
  // TODO this should really have been at the PartitionMessage level but all
  // masks there are taken
  // also switching the masks will impact backwards compatibility. Need to
  // verify if it is ok to break backwards compatibility

  /*
   * private byte[] oldValBytes; private transient Object oldValObj; private boolean hasOldValue =
   * false; private boolean oldValueIsSerialized = false;
   */
  /**
   * Empty constructor to satisfy {@link DataSerializer}requirements
   */
  public PutMessage() {}

  /** cloning constructor for relaying to listeners */
  PutMessage(PutMessage original, EntryEventImpl event, Set members) {
    super(original, event);
    this.key = original.key;
    if (original.valBytes != null) {
      this.valBytes = original.valBytes;
    } else {
      if (original.valObj instanceof CachedDeserializable) {
        CachedDeserializable cd = (CachedDeserializable) original.valObj;
        if (!cd.isSerialized()) {
          this.valObj = cd.getDeserializedForReading();
        } else {
          Object val = cd.getValue();
          if (val instanceof byte[]) {
            this.valBytes = (byte[]) val;
          } else {
            this.valObj = val;
          }
        }
      } else {
        this.valObj = original.valObj;
      }
    }
    this.cbArg = original.cbArg;
    this.lastModified = original.lastModified;
    this.op = original.op;
    this.bridgeContext = original.bridgeContext;
    this.deserializationPolicy = original.deserializationPolicy;
    this.originalSender = original.getSender();
    Assert.assertTrue(original.eventId != null);
    this.eventId = original.eventId;
    this.result = original.result;
    this.ifNew = original.ifNew;
    this.ifOld = original.ifOld;
    this.internalDs = original.internalDs;
    this.requireOldValue = original.requireOldValue;
    this.expectedOldValue = original.expectedOldValue;
    this.processor = original.processor;
    this.event = event;
    this.versionTag = event.getVersionTag();
  }

  /**
   * copy constructor
   */
  PutMessage(PutMessage original) {
    super(original, null);
    this.bridgeContext = original.bridgeContext;
    this.cbArg = original.cbArg;
    this.deserializationPolicy = original.deserializationPolicy;
    this.event = original.event;
    this.eventId = original.eventId;
    this.expectedOldValue = original.expectedOldValue;
    this.hasDelta = original.hasDelta;
    this.ifNew = original.ifNew;
    this.ifOld = original.ifOld;
    this.internalDs = original.internalDs;
    this.isDeltaApplied = original.isDeltaApplied;
    this.key = original.key;
    this.lastModified = original.lastModified;
    this.notificationOnly = original.notificationOnly;
    this.op = original.op;
    this.originalSender = original.originalSender;
    this.requireOldValue = original.requireOldValue;
    this.result = original.result;
    this.sendDelta = original.sendDelta;
    this.sender = original.sender;
    this.valBytes = original.valBytes;
    this.valObj = original.valObj;
    this.filterInfo = original.filterInfo;
    this.versionTag = original.versionTag;
    /*
     * this.oldValBytes = original.oldValBytes; this.oldValObj = original.oldValObj;
     * this.oldValueIsSerialized = original.oldValueIsSerialized;
     */
  }


  @Override
  public PartitionMessage getMessageForRelayToListeners(EntryEventImpl ev, Set members) {
    PutMessage msg = new PutMessage(this, ev, members);
    msg.requireOldValue = false;
    msg.expectedOldValue = null;
    return msg;
  }

  /**
   * send a notification-only message to a set of listeners. The processor id is passed with the
   * message for reply message processing. This method does not wait on the processor.
   *
   * @param cacheOpReceivers receivers of associated bucket CacheOperationMessage
   * @param adjunctRecipients receivers that must get the event
   * @param filterInfo all client routing information
   * @param r the region affected by the event
   * @param event the event that prompted this action
   * @param processor the processor to reply to
   * @return members that could not be notified
   */
  public static Set notifyListeners(Set cacheOpReceivers, Set adjunctRecipients,
      FilterRoutingInfo filterInfo, PartitionedRegion r, EntryEventImpl event, boolean ifNew,
      boolean ifOld, DirectReplyProcessor processor, boolean sendDeltaWithFullValue) {
    PutMessage msg = new PutMessage(Collections.EMPTY_SET, true, r.getPRId(), processor, event, 0,
        ifNew, ifOld, null, false);
    msg.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
    msg.setInternalDs(r.getSystem());
    msg.versionTag = event.getVersionTag();
    msg.setSendDeltaWithFullValue(sendDeltaWithFullValue);
    return msg.relayToListeners(cacheOpReceivers, adjunctRecipients, filterInfo, event, r,
        processor);
  }

  PutMessage(Set recipients, boolean notifyOnly, int regionId, DirectReplyProcessor processor,
      EntryEventImpl event, final long lastModified, boolean ifNew, boolean ifOld,
      Object expectedOldValue, boolean requireOldValue) {
    super(recipients, regionId, processor, event);
    this.processor = processor;
    this.notificationOnly = notifyOnly;
    this.requireOldValue = requireOldValue;
    this.expectedOldValue = expectedOldValue;
    this.key = event.getKey();
    if (event.hasNewValue()) {
      this.deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_LAZY;
      event.exportNewValue(this);
    } else {
      // assert that if !event.hasNewValue, then deserialization policy is NONE
      assert this.deserializationPolicy == DistributedCacheOperation.DESERIALIZATION_POLICY_NONE : this.deserializationPolicy;
    }

    this.event = event;

    this.cbArg = event.getRawCallbackArgument();
    this.lastModified = lastModified;
    this.op = event.getOperation();
    this.bridgeContext = event.getContext();
    this.eventId = event.getEventId();
    this.versionTag = event.getVersionTag();
    Assert.assertTrue(this.eventId != null);
    this.ifNew = ifNew;
    this.ifOld = ifOld;
  }

  @Override
  public boolean isSevereAlertCompatible() {
    // allow forced-disconnect processing for all cache op messages
    return true;
  }

  @Override
  protected Object clone() throws CloneNotSupportedException {
    // TODO Auto-generated method stub
    return super.clone();
  }

  /**
   * Sends a PartitionedRegion {@link org.apache.geode.cache.Region#put(Object, Object)} message to
   * the recipient
   *
   * @param recipient the member to which the put message is sent
   * @param r the PartitionedRegion for which the put was performed
   * @param event the event prompting this message
   * @param ifNew whether a new entry must be created
   * @param ifOld whether an old entry must be updated (no creates)
   * @return the processor used to await acknowledgement that the update was sent, or null to
   *         indicate that no acknowledgement will be sent
   * @throws ForceReattemptException if the peer is no longer available
   */
  public static PartitionResponse send(DistributedMember recipient, PartitionedRegion r,
      EntryEventImpl event, final long lastModified, boolean ifNew, boolean ifOld,
      Object expectedOldValue, boolean requireOldValue) throws ForceReattemptException {
    // Assert.assertTrue(recipient != null, "PutMessage NULL recipient"); recipient can be null for
    // event notifications
    Set recipients = Collections.singleton(recipient);

    PutResponse processor = new PutResponse(r.getSystem(), recipients, event.getKey());

    PutMessage m = new PutMessage(recipients, false, r.getPRId(), processor, event, lastModified,
        ifNew, ifOld, expectedOldValue, requireOldValue);
    m.setInternalDs(r.getSystem());
    m.setSendDelta(true);
    m.setTransactionDistributed(r.getCache().getTxManager().isDistributed());

    processor.setPutMessage(m);

    Set failures = r.getDistributionManager().putOutgoing(m);
    if (failures != null && failures.size() > 0) {
      throw new ForceReattemptException(
          String.format("Failed sending < %s >", m));
    }
    return processor;
  }


  /**
   * create a new EntryEvent to be used in notifying listeners, cache servers, etc. Caller must
   * release result if it is != to sourceEvent
   */
  @Retained
  EntryEventImpl createListenerEvent(EntryEventImpl sourceEvent, PartitionedRegion r,
      InternalDistributedMember member) {
    final EntryEventImpl e2;
    if (this.notificationOnly && this.bridgeContext == null) {
      e2 = sourceEvent;
    } else {
      e2 = new EntryEventImpl(sourceEvent);
      if (this.bridgeContext != null) {
        e2.setContext(this.bridgeContext);
      }
    }
    e2.setRegion(r);
    e2.setOriginRemote(true);
    e2.setInvokePRCallbacks(!notificationOnly);

    if (!sourceEvent.hasOldValue()) {
      e2.oldValueNotAvailable();
    }

    if (this.filterInfo != null) {
      e2.setLocalFilterInfo(this.filterInfo.getFilterInfo(member));
    }

    if (this.versionTag != null) {
      this.versionTag.replaceNullIDs(getSender());
      e2.setVersionTag(this.versionTag);
    }

    return e2;
  }

  public Object getKey() {
    return this.key;
  }

  public void setKey(Object key) {
    this.key = key;
  }

  public byte[] getValBytes() {
    return this.valBytes;
  }

  private void setValBytes(byte[] valBytes) {
    this.valBytes = valBytes;
  }

  private void setValObj(@Unretained(ENTRY_EVENT_NEW_VALUE) Object o) {
    this.valObj = o;
  }

  /**
   * (ashetkar) Strictly for Delta Propagation purpose.
   *
   * @param o Object of type Delta
   */
  public void setDeltaValObj(Object o) {
    if (this.valObj == null) {
      this.valObj = o;
    }
  }

  public Object getCallbackArg() {
    return this.cbArg;
  }

  protected Operation getOperation() {
    return this.op;
  }

  @Override
  public void setOperation(Operation operation) {
    this.op = operation;
  }


  @Override
  public void setFilterInfo(FilterRoutingInfo filterInfo) {
    if (filterInfo != null) {
      this.filterInfo = filterInfo;
    }
  }

  @Override
  public int getDSFID() {
    return PR_PUT_MESSAGE;
  }

  @Override
  public void fromData(DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    super.fromData(in, context);

    final int extraFlags = in.readUnsignedByte();
    setKey(DataSerializer.readObject(in));
    this.cbArg = DataSerializer.readObject(in);
    this.lastModified = in.readLong();
    this.op = Operation.fromOrdinal(in.readByte());
    if ((extraFlags & HAS_BRIDGE_CONTEXT) != 0) {
      this.bridgeContext = ClientProxyMembershipID.readCanonicalized(in);
    }
    if ((extraFlags & HAS_ORIGINAL_SENDER) != 0) {
      this.originalSender = (InternalDistributedMember) DataSerializer.readObject(in);
    }
    this.eventId = new EventID();
    InternalDataSerializer.invokeFromData(this.eventId, in);

    if ((flags & HAS_EXPECTED_OLD_VAL) != 0) {
      this.expectedOldValue = DataSerializer.readObject(in);
    }
    if (this.hasFilterInfo) {
      this.filterInfo = new FilterRoutingInfo();
      InternalDataSerializer.invokeFromData(this.filterInfo, in);
    }
    this.deserializationPolicy =
        (byte) (extraFlags & DistributedCacheOperation.DESERIALIZATION_POLICY_MASK);

    if (this.hasDelta) {
      this.deltaBytes = DataSerializer.readByteArray(in);
    } else {
      setValBytes(DataSerializer.readByteArray(in));
      if ((extraFlags & HAS_DELTA_WITH_FULL_VALUE) != 0) {
        this.deltaBytes = DataSerializer.readByteArray(in);
      }
    }
    if ((flags & HAS_VERSION_TAG) != 0) {
      this.versionTag = DataSerializer.readObject(in);
    }
  }

  @Override
  public EventID getEventID() {
    return this.eventId;
  }

  @Override
  public void toData(DataOutput out,
      SerializationContext context) throws IOException {
    PartitionedRegion region = null;
    try {
      boolean flag = internalDs.getConfig().getDeltaPropagation();
      if (this.event.getDeltaBytes() != null && flag && this.sendDelta) {
        this.hasDelta = true;
      } else {
        // Reset the flag when sending full object.
        this.hasDelta = false;
      }
    } catch (RuntimeException re) {
      throw new InvalidDeltaException(re);
    }
    super.toData(out, context);

    int extraFlags = this.deserializationPolicy;
    if (this.bridgeContext != null)
      extraFlags |= HAS_BRIDGE_CONTEXT;
    if (this.deserializationPolicy != DistributedCacheOperation.DESERIALIZATION_POLICY_NONE
        && (this.valObj != null || getValBytes() != null) && this.sendDeltaWithFullValue
        && this.event.getDeltaBytes() != null) {
      extraFlags |= HAS_DELTA_WITH_FULL_VALUE;
    }
    if (this.originalSender != null)
      extraFlags |= HAS_ORIGINAL_SENDER;
    out.writeByte(extraFlags);

    DataSerializer.writeObject(getKey(), out);
    DataSerializer.writeObject(getCallbackArg(), out);
    out.writeLong(this.lastModified);
    out.writeByte(this.op.ordinal);
    if (this.bridgeContext != null) {
      DataSerializer.writeObject(this.bridgeContext, out);
    }
    if (this.originalSender != null) {
      DataSerializer.writeObject(this.originalSender, out);
    }
    InternalDataSerializer.invokeToData(this.eventId, out);
    if (this.expectedOldValue != null) {
      DataSerializer.writeObject(this.expectedOldValue, out);
    }
    if (this.hasFilterInfo) {
      InternalDataSerializer.invokeToData(this.filterInfo, out);
    }
    if (this.hasDelta) {
      try {
        region = PartitionedRegion.getPRFromId(this.regionId);
      } catch (PRLocallyDestroyedException e) {
        throw new IOException("Delta can not be extracted as region is locally destroyed");
      }
      if (region == null || region.getCachePerfStats() == null) {
        throw new IOException(
            "Delta can not be extracted as region can't be found or is in an invalid state");
      }
      DataSerializer.writeByteArray(this.event.getDeltaBytes(), out);
      region.getCachePerfStats().incDeltasSent();
    } else {
      DistributedCacheOperation.writeValue(this.deserializationPolicy, this.valObj, getValBytes(),
          out);
      if ((extraFlags & HAS_DELTA_WITH_FULL_VALUE) != 0) {
        DataSerializer.writeByteArray(this.event.getDeltaBytes(), out);
      }
    }
    if (this.versionTag != null) {
      DataSerializer.writeObject(this.versionTag, out);
    }
  }

  @Override
  protected short computeCompressedShort(short s) {
    s = super.computeCompressedShort(s);
    if (this.ifNew)
      s |= IF_NEW;
    if (this.ifOld)
      s |= IF_OLD;
    if (this.requireOldValue)
      s |= REQUIRED_OLD_VAL;
    if (this.expectedOldValue != null)
      s |= HAS_EXPECTED_OLD_VAL;
    if (this.filterInfo != null) {
      s |= HAS_FILTER_INFO;
      this.hasFilterInfo = true;
    }
    if (this.hasDelta) {
      s |= HAS_DELTA;
      if (this.bridgeContext != null) {
        // delta bytes sent by client to accessor or secondary data store
        // requires to set data policy explicitly to LAZY.
        this.deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_LAZY;
      }
    }
    if (this.versionTag != null)
      s |= HAS_VERSION_TAG;
    return s;
  }

  @Override
  protected void setBooleans(short s, DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    super.setBooleans(s, in, context);
    this.ifNew = ((s & IF_NEW) != 0);
    this.ifOld = ((s & IF_OLD) != 0);
    this.requireOldValue = ((s & REQUIRED_OLD_VAL) != 0);
    this.hasFilterInfo = ((s & HAS_FILTER_INFO) != 0);
    this.hasDelta = ((s & HAS_DELTA) != 0);
  }

  /**
   * This method is called upon receipt and make the desired changes to the PartitionedRegion Note:
   * It is very important that this message does NOT cause any deadlocks as the sender will wait
   * indefinitely for the acknowledgement
   */
  @Override
  protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, PartitionedRegion r,
      long startTime) throws EntryExistsException, DataLocationException, IOException {
    this.setInternalDs(r.getSystem());// set the internal DS. Required to
                                      // checked DS level delta-enabled property
                                      // while sending delta
    PartitionedRegionDataStore ds = r.getDataStore();
    boolean sendReply = true;

    InternalDistributedMember eventSender = originalSender;
    if (eventSender == null) {
      eventSender = getSender();
    }
    @Released
    final EntryEventImpl ev =
        EntryEventImpl.create(r, getOperation(), getKey(), null, /* newValue */
            getCallbackArg(), false/* originRemote - false to force distribution in buckets */,
            eventSender, true/* generateCallbacks */, false/* initializeId */);
    try {
      if (this.versionTag != null) {
        this.versionTag.replaceNullIDs(getSender());
        ev.setVersionTag(this.versionTag);
      }
      if (this.bridgeContext != null) {
        ev.setContext(this.bridgeContext);
      }
      Assert.assertTrue(eventId != null);
      ev.setEventId(eventId);
      ev.setCausedByMessage(this);
      ev.setInvokePRCallbacks(!notificationOnly);
      ev.setPossibleDuplicate(this.posDup);
      /*
       * if (this.hasOldValue) { if (this.oldValueIsSerialized) {
       * ev.setSerializedOldValue(getOldValueBytes()); } else { ev.setOldValue(getOldValueBytes());
       * } }
       */

      ev.setDeltaBytes(this.deltaBytes);
      if (this.hasDelta) {
        this.valObj = null;
        // New value will be set once it is generated with fromDelta() inside
        // EntryEventImpl.processDeltaBytes()
        ev.setNewValue(this.valObj);
      } else {
        switch (this.deserializationPolicy) {
          case DistributedCacheOperation.DESERIALIZATION_POLICY_LAZY:
            ev.setSerializedNewValue(getValBytes());
            break;
          case DistributedCacheOperation.DESERIALIZATION_POLICY_NONE:
            ev.setNewValue(getValBytes());
            break;
          default:
            throw new AssertionError("unknown deserialization policy: " + deserializationPolicy);
        }
      }

      if (!notificationOnly) {
        if (ds == null) {
          throw new AssertionError(
              "This process should have storage" + " for this operation: " + this.toString());
        }
        try {
          ev.setOriginRemote(false);
          result =
              r.getDataView().putEntryOnRemote(ev, this.ifNew, this.ifOld, this.expectedOldValue,
                  this.requireOldValue, this.lastModified, true/* overwriteDestroyed *not* used */);

          if (!this.result) { // make sure the region hasn't gone away
            r.checkReadiness();
          }
        } catch (CacheWriterException | PrimaryBucketException cwe) {
          sendReply(getSender(), getProcessorId(), dm, new ReplyException(cwe), r, startTime);
          return false;
        } catch (InvalidDeltaException ide) {
          sendReply(getSender(), getProcessorId(), dm, new ReplyException(ide), r, startTime);
          r.getCachePerfStats().incDeltaFullValuesRequested();
          return false;
        }
        if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
          logger.trace(LogMarker.DM_VERBOSE, "PutMessage {} with key: {} val: {}",
              (result ? "updated bucket" : "did not update bucket"), getKey(),
              (getValBytes() == null ? "null" : "(" + getValBytes().length + " bytes)"));
        }
      } else { // notificationOnly
        @Released
        EntryEventImpl e2 = createListenerEvent(ev, r, dm.getDistributionManagerId());
        final EnumListenerEvent le;
        try {
          if (e2.getOperation().isCreate()) {
            le = EnumListenerEvent.AFTER_CREATE;
          } else {
            le = EnumListenerEvent.AFTER_UPDATE;
          }
          r.invokePutCallbacks(le, e2, r.isInitialized(), true);
        } finally {
          // if e2 == ev then no need to free it here. The outer finally block will get it.
          if (e2 != ev) {
            e2.release();
          }
        }
        result = true;
      }

      setOperation(ev.getOperation()); // set operation for reply message

      if (sendReply) {
        sendReply(getSender(), getProcessorId(), dm, null, r, startTime, ev);
      }
      return false;
    } finally {
      ev.release();
    }
  }


  // override reply processor type from PartitionMessage
  PartitionResponse createReplyProcessor(PartitionedRegion r, Set recipients, Object k) {
    return new PutResponse(r.getSystem(), recipients, k);
  }


  protected void sendReply(InternalDistributedMember member, int procId, DistributionManager dm,
      ReplyException ex, PartitionedRegion pr, long startTime, EntryEventImpl ev) {
    if (pr != null && startTime > 0) {
      pr.getPrStats().endPartitionMessagesProcessing(startTime);
      pr.getCancelCriterion().checkCancelInProgress(null); // bug 39014 - don't send a positive
                                                           // response if we may have failed
    }
    PutReplyMessage.send(member, procId, getReplySender(dm), result, getOperation(), ex, this, ev);
  }


  @Override
  protected void appendFields(StringBuilder buff) {
    super.appendFields(buff);
    buff.append("; key=").append(getKey()).append("; value=");
    // buff.append(getValBytes());
    buff.append(getValBytes() == null ? this.valObj : "(" + getValBytes().length + " bytes)");
    buff.append("; callback=").append(this.cbArg).append("; op=").append(this.op);
    if (this.originalSender != null) {
      buff.append("; originalSender=").append(originalSender);
    }
    if (this.bridgeContext != null) {
      buff.append("; bridgeContext=").append(this.bridgeContext);
    }
    if (this.eventId != null) {
      buff.append("; eventId=").append(this.eventId);
    }
    buff.append("; ifOld=").append(this.ifOld).append("; ifNew=").append(this.ifNew).append("; op=")
        .append(this.getOperation());
    if (this.versionTag != null) {
      buff.append("; version=").append(this.versionTag);
    }
    buff.append("; deserializationPolicy=");
    buff.append(
        DistributedCacheOperation.deserializationPolicyToString(this.deserializationPolicy));
    if (this.hasDelta) {
      buff.append("; hasDelta=");
      buff.append(this.hasDelta);
    }
    if (this.sendDelta) {
      buff.append("; sendDelta=");
      buff.append(this.sendDelta);
    }
    if (this.isDeltaApplied) {
      buff.append("; isDeltaApplied=");
      buff.append(this.isDeltaApplied);
    }
    if (this.filterInfo != null) {
      buff.append("; ");
      buff.append(this.filterInfo.toString());
    }
  }

  public InternalDistributedSystem getInternalDs() {
    return internalDs;
  }

  public void setInternalDs(InternalDistributedSystem internalDs) {
    this.internalDs = internalDs;
  }

  @Override
  protected boolean mayNotifySerialGatewaySender(ClusterDistributionManager dm) {
    return notifiesSerialGatewaySender(dm);
  }

  public static class PutReplyMessage extends ReplyMessage implements OldValueImporter {
    /** Result of the Put operation */
    boolean result;

    /** The Operation actually performed */
    Operation op;

    /**
     * Old value in serialized form: either a byte[] or CachedDeserializable, or null if not set.
     */
    @Unretained(ENTRY_EVENT_OLD_VALUE)
    Object oldValue;

    VersionTag versionTag;

    /**
     * Set to true by the import methods if the oldValue is already serialized. In that case toData
     * should just copy the bytes to the stream. In either case fromData just calls readObject.
     */
    private transient boolean oldValueIsSerialized;

    @Override
    public boolean getInlineProcess() {
      return true;
    }

    /**
     * Empty constructor to conform to DataSerializable interface
     */
    public PutReplyMessage() {}

    // package access for unit test
    PutReplyMessage(int processorId, boolean result, Operation op, ReplyException ex,
        Object oldValue, VersionTag version) {
      super();
      this.op = op;
      this.result = result;
      setProcessorId(processorId);
      setException(ex);
      this.oldValue = oldValue;
      this.versionTag = version;
    }

    /** Send an ack */
    public static void send(InternalDistributedMember recipient, int processorId, ReplySender dm,
        boolean result, Operation op, ReplyException ex, PutMessage sourceMessage,
        EntryEventImpl ev) {
      Assert.assertTrue(recipient != null, "PutReplyMessage NULL reply message");
      PutReplyMessage m =
          new PutReplyMessage(processorId, result, op, ex, null, ev.getVersionTag());
      if (!sourceMessage.notificationOnly && sourceMessage.requireOldValue) {
        ev.exportOldValue(m);
      }

      m.setRecipient(recipient);
      dm.putOutgoing(m);
    }

    /**
     * Processes this message. This method is invoked by the receiver of the message.
     *
     * @param dm the distribution manager that is processing the message.
     */
    @Override
    public void process(final DistributionManager dm, final ReplyProcessor21 rp) {
      final long startTime = getTimestamp();
      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
        logger.trace(LogMarker.DM_VERBOSE,
            "PutReplyMessage process invoking reply processor with processorId: {}",
            this.processorId);
      }
      if (rp == null) {
        if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
          logger.trace(LogMarker.DM_VERBOSE, "PutReplyMessage processor not found");
        }
        return;
      }
      if (rp instanceof PutResponse) {
        PutResponse processor = (PutResponse) rp;
        processor.setResponse(this);
      }
      rp.process(this);

      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
        logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", rp, this);
      }
      dm.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
    }

    /** Return oldValue in serialized form */
    public Object getOldValue() {
      return this.oldValue;
    }

    @Override
    public int getDSFID() {
      return PR_PUT_REPLY_MESSAGE;
    }

    @Override
    public void fromData(DataInput in,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      super.fromData(in, context);
      this.result = in.readBoolean();
      this.op = Operation.fromOrdinal(in.readByte());
      this.oldValue = DataSerializer.readObject(in);
      this.versionTag = (VersionTag) DataSerializer.readObject(in);
    }

    @Override
    public void toData(DataOutput out,
        SerializationContext context) throws IOException {
      super.toData(out, context);
      out.writeBoolean(this.result);
      out.writeByte(this.op.ordinal);
      Object ov = getOldValue();
      RemotePutMessage.PutReplyMessage.oldValueToData(out, getOldValue(),
          this.oldValueIsSerialized);
      DataSerializer.writeObject(this.versionTag, out);
    }

    @Override
    public String toString() {
      StringBuffer sb = new StringBuffer();
      sb.append("PutReplyMessage ").append("processorid=").append(this.processorId)
          .append(" returning ").append(this.result).append(" op=").append(op).append(" exception=")
          .append(getException()).append(" oldValue=")
          .append(this.oldValue == null ? "null" : "not null").append(" version=")
          .append(this.versionTag);
      return sb.toString();
    }

    @Override
    public boolean prefersOldSerialized() {
      return true;
    }

    @Override
    public boolean isUnretainedOldReferenceOk() {
      return true;
    }

    @Override
    public boolean isCachedDeserializableValueOk() {
      return true;
    }


    @Override
    public void importOldObject(@Unretained(ENTRY_EVENT_OLD_VALUE) Object ov,
        boolean isSerialized) {
      this.oldValue = ov;
      this.oldValueIsSerialized = isSerialized;
    }

    @Override
    public void importOldBytes(byte[] ov, boolean isSerialized) {
      importOldObject(ov, isSerialized);
    }
  }

  /**
   * A processor to capture the value returned by {@link PutMessage}
   *
   * @since GemFire 5.1
   */
  public static class PutResponse extends PartitionResponse {
    private volatile boolean returnValue;
    private volatile Operation op;
    private volatile Object oldValue;
    private final Object key;
    private PutMessage putMessage;
    private VersionTag versionTag;

    public PutResponse(InternalDistributedSystem ds, Set recipients, Object key) {
      super(ds, recipients, false);
      this.key = key;
    }


    public void setPutMessage(PutMessage putMessage) {
      this.putMessage = putMessage;
    }

    public void setResponse(PutReplyMessage response) {
      // boolean response, Operation op, Object oldValue) {

      this.returnValue = response.result;
      this.op = response.op;
      this.oldValue = response.oldValue;
      this.versionTag = response.versionTag;
      if (this.versionTag != null) {
        this.versionTag.replaceNullIDs(response.getSender());
      }
    }

    /**
     * @return the result of the remote put operation
     * @throws ForceReattemptException if the peer is no longer available
     * @throws CacheException if the peer generates an error
     */
    public PutResult waitForResult() throws CacheException, ForceReattemptException {
      try {
        waitForCacheException();
      } catch (ForceReattemptException e) {
        e.checkKey(key);
        throw e;
      }
      if (this.op == null) {
        throw new ForceReattemptException(
            "did not receive a valid reply");
      }
      // try {
      // waitForRepliesUninterruptibly();
      // }
      // catch (ReplyException e) {
      // Throwable t = e.getCause();
      // if (t instanceof CacheClosedException) {
      // throw new PartitionedRegionCommunicationException("Put operation received an exception",
      // t);
      // }
      // e.handleAsUnexpected();
      // }
      return new PutResult(this.returnValue, this.op, this.oldValue, this.versionTag);
    }

    @Override
    public void process(final DistributionMessage msg) {

      if (msg instanceof ReplyMessage) {
        ReplyException ex = ((ReplyMessage) msg).getException();
        if (this.putMessage.bridgeContext == null
            // Why is this code not happening for bug 41916?
            && (ex != null && ex.getCause() instanceof InvalidDeltaException)) {
          final PutMessage putMsg = new PutMessage(this.putMessage);
          final DistributionManager dm = getDistributionManager();
          Runnable sendFullObject = new Runnable() {
            @Override
            public void run() {
              putMsg.resetRecipients();
              putMsg.setRecipient(msg.getSender());
              putMsg.setSendDelta(false);
              if (logger.isDebugEnabled()) {
                logger.debug("Sending full object({}) to {}", putMsg,
                    Arrays.toString(putMsg.getRecipients()));
              }
              dm.putOutgoing(putMsg);

              // Update stats
              try {
                PartitionedRegion.getPRFromId(putMsg.regionId).getCachePerfStats()
                    .incDeltaFullValuesSent();
              } catch (Exception e) {
              }
            }

            @Override
            public String toString() {
              return "Sending full object {" + putMsg.toString() + "}";
            }
          };
          if (isExpectingDirectReply()) {
            sendFullObject.run();
          } else {
            getDistributionManager().getExecutors().getWaitingThreadPool().execute(sendFullObject);
          }
          return;
        }
      }
      super.process(msg);
    }
  }

  public static class PutResult {
    /** the result of the put operation */
    public boolean returnValue;
    /** the actual operation performed (CREATE/UPDATE) */
    public Operation op;

    /** the old value, or null if not set */
    public Object oldValue;

    /** the concurrency control version tag */
    public VersionTag versionTag;

    public PutResult(boolean flag, Operation actualOperation, Object oldValue, VersionTag version) {
      this.returnValue = flag;
      this.op = actualOperation;
      this.oldValue = oldValue;
      this.versionTag = version;
    }
  }

  public void setSendDelta(boolean sendDelta) {
    this.sendDelta = sendDelta;
  }

  // NewValueImporter methods

  @Override
  public boolean prefersNewSerialized() {
    return true;
  }

  @Override
  public boolean isUnretainedNewReferenceOk() {
    return true;
  }

  private void setDeserializationPolicy(boolean isSerialized) {
    if (!isSerialized) {
      this.deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_NONE;
    }
  }

  @Override
  public void importNewObject(@Unretained(ENTRY_EVENT_NEW_VALUE) Object nv, boolean isSerialized) {
    setDeserializationPolicy(isSerialized);
    setValObj(nv);
  }

  @Override
  public void importNewBytes(byte[] nv, boolean isSerialized) {
    setDeserializationPolicy(isSerialized);
    setValBytes(nv);
  }
}
