/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.TX_ENTRY_STATE;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.CacheRuntimeException;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryDestroyedException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.lang.StringUtils;
import org.apache.geode.internal.offheap.OffHeapHelper;
import org.apache.geode.internal.offheap.Releasable;
import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.pdx.PdxSerializationException;

/**
 * TXEntryState is the entity that tracks transactional changes, except for those tracked by
 * {@link TXEntryUserAttrState}, to an entry.
 *
 * @since GemFire 4.0
 */
public class TXEntryState implements Releasable {
  private static final Logger logger = LogService.getLogger();

  /**
   * This field is final except for when it is nulled out during cleanup
   */
  @Retained(TX_ENTRY_STATE)
  private Object originalVersionId;
  private final Object originalValue;

  /**
   * Serial number that is set each time this entry is modified. Used to order the events in a
   * TransactionEvent.
   */
  protected int modSerialNum;
  /**
   * Used to remember the event id to use on the farSide for this entry. See bug 39434.
   *
   * @since GemFire 5.7
   */
  private int farSideEventOffset = -1;
  /**
   * Used to remember the event id to use on the nearSide for this entry. See bug 39434.
   *
   * @since GemFire 5.7
   */
  private int nearSideEventOffset = -1;

  private Object pendingValue;

  private byte[] serializedPendingValue;

  /**
   * Remember the callback argument for listener invocation
   */
  private Object callBackArgument;

  private byte op;

  /**
   * destroy field remembers the strongest destroy op perfomed on this entry
   */
  private byte destroy; // DESTROY_NONE, DESTROY_LOCAL, DESTROY_DISTRIBUTED

  // ORDER of the following is important to the implementation!
  private static final byte DESTROY_NONE = 0;

  private static final byte DESTROY_LOCAL = 1;

  private static final byte DESTROY_DISTRIBUTED = 2;

  // ORDER of the following is important to the implementation!
  private static final byte OP_NULL = 0;

  private static final byte OP_L_DESTROY = 1;

  private static final byte OP_CREATE_LD = 2;

  private static final byte OP_LLOAD_CREATE_LD = 3;

  private static final byte OP_NLOAD_CREATE_LD = 4;

  private static final byte OP_PUT_LD = 5;

  private static final byte OP_LLOAD_PUT_LD = 6;

  private static final byte OP_NLOAD_PUT_LD = 7;

  private static final byte OP_D_INVALIDATE_LD = 8;

  private static final byte OP_D_DESTROY = 9;

  private static final byte OP_L_INVALIDATE = 10;

  private static final byte OP_PUT_LI = 11;

  private static final byte OP_LLOAD_PUT_LI = 12;

  private static final byte OP_NLOAD_PUT_LI = 13;

  private static final byte OP_D_INVALIDATE = 14;

  private static final byte OP_CREATE_LI = 15;

  private static final byte OP_LLOAD_CREATE_LI = 16;

  private static final byte OP_NLOAD_CREATE_LI = 17;

  private static final byte OP_CREATE = 18;

  private static final byte OP_SEARCH_CREATE = 19;

  private static final byte OP_LLOAD_CREATE = 20;

  private static final byte OP_NLOAD_CREATE = 21;

  private static final byte OP_LOCAL_CREATE = 22;

  private static final byte OP_PUT = 23;

  static byte getOpCreate() {
    return OP_CREATE;
  }

  static byte getOpSearchCreate() {
    return OP_SEARCH_CREATE;
  }

  static byte getOpLloadCreate() {
    return OP_LLOAD_CREATE;
  }

  static byte getOpNloadCreate() {
    return OP_NLOAD_CREATE;
  }

  static byte getOpPut() {
    return OP_PUT;
  }

  static byte getOpSearchPut() {
    return OP_SEARCH_PUT;
  }

  static byte getOpLloadPut() {
    return OP_LLOAD_PUT;
  }

  static byte getOpNloadPut() {
    return OP_NLOAD_PUT;
  }

  private static final byte OP_SEARCH_PUT = 24;

  private static final byte OP_LLOAD_PUT = 25;

  private static final byte OP_NLOAD_PUT = 26;

  /**
   * System property to be set when read conflicts should be detected. Benefits of read conflict
   * detection are at: https://wiki.gemstone.com/display/PR/Read+conflict+detection
   */
  private static final boolean DETECT_READ_CONFLICTS =
      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "detectReadConflicts");

  // TODO: optimize footprint by having this field on a subclass
  // that is only created by TXRegionState when it knows its region needs refCounts.
  /**
   * A reference to the RegionEntry, in committed state, that this tx entry has referenced. Note:
   * this field is only needed if the committed region has eviction or expiration enabled. In both
   * those cases the tx needs to do reference counting.
   */
  private final RegionEntry refCountEntry;

  /**
   * Set to true once this entry has been written by a tx operation.
   */
  private boolean dirty;

  /**
   * Set to true if this operation is result of a bulk op. We use this boolean to determine op type
   * rather than extending the operation algebra.
   */
  private boolean bulkOp;

  private FilterRoutingInfo filterRoutingInfo = null;
  private Set<InternalDistributedMember> adjunctRecipients = null;

  /**
   * versionTag to be sent to other members. This is propogated up from AbstractRegionMap and then
   * used while building TXCommitMessage
   */
  private VersionTag versionTag = null;

  /**
   * tailKey (used by wan) to be sent to other members. This is propagated up from AbstractRegionMap
   * and then used while building TXCommitMessage
   */
  private long tailKey = -1;

  /**
   * versionTag that is fetched from remote members, if this member's data policy is not REPLICATE
   */
  private VersionTag remoteVersionTag = null;

  /**
   * Next region version generated on the primary
   */
  private long nextRegionVersion = -1;

  /*
   * For Distributed Transaction. THis value is set when applying commit
   */
  private transient DistTxThinEntryState distTxThinEntryState;

  /**
   * Use this system property if you need to display/log string values in conflict messages
   */
  private static final boolean VERBOSE_CONFLICT_STRING =
      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "verboseConflictString");

  /**
   * This constructor is used to create a singleton used by LocalRegion to signal that noop
   * invalidate op has been performed. The instance returned by this constructor is just a marker;
   * it is not good for anything else.
   */
  protected TXEntryState() {
    this.op = OP_NULL;
    this.originalVersionId = Token.REMOVED_PHASE1;
    this.originalValue = Token.REMOVED_PHASE1;
    this.refCountEntry = null;
  }

  private TXRegionState txRegionState = null;

  /**
   * This constructor is used when creating an entry
   */
  protected TXEntryState(RegionEntry re, Object pvId, Object pv, TXRegionState txRegionState,
      boolean isDistributed) {
    Object vId = pvId;
    if (vId == null) {
      vId = Token.REMOVED_PHASE1;
    }
    if (pv == null) {
      pv = Token.REMOVED_PHASE1;
    }
    this.op = OP_NULL;
    this.pendingValue = pv;
    this.originalVersionId = vId;
    this.originalValue = pv;
    if (txRegionState.needsRefCounts()) {
      this.refCountEntry = re;
      if (re != null) {
        re.incRefCount();
      }
    } else {
      this.refCountEntry = null;
    }
    this.txRegionState = txRegionState;
    if (isDistributed) {
      this.distTxThinEntryState = new DistTxThinEntryState();
    }
  }

  private byte[] getSerializedPendingValue() {
    return serializedPendingValue;
  }

  private void setSerializedPendingValue(byte[] serializedPendingValue) {
    this.serializedPendingValue = serializedPendingValue;
  }

  void serializePendingValue() {
    Object pendingValue = getPendingValue();
    if (Token.isInvalidOrRemoved(pendingValue)) {
      // token
      return;
    }
    if (pendingValue instanceof byte[]) {
      // byte[]
      return;
    }

    // CachedDeserialized, Object or PDX
    if (pendingValue instanceof CachedDeserializable) {
      CachedDeserializable cachedDeserializable = (CachedDeserializable) pendingValue;
      setSerializedPendingValue(cachedDeserializable.getSerializedValue());
    } else {
      setSerializedPendingValue(EntryEventImpl.serialize(pendingValue));
    }
  }

  public TXRegionState getTXRegionState() {
    return txRegionState;
  }

  public Object getOriginalVersionId() {
    return this.originalVersionId;
  }

  public Object getOriginalValue() {
    return this.originalValue;
  }

  public Object getPendingValue() {
    return this.pendingValue;
  }

  public Object getCallbackArgument() {
    return this.callBackArgument;
  }

  /**
   * Gets the pending value for near side operations. Special cases local destroy and local
   * invalidate to fix bug 34387.
   */
  @Unretained
  public Object getNearSidePendingValue() {
    if (isOpLocalDestroy()) {
      return null;
    } else if (isOpLocalInvalidate()) {
      return Token.LOCAL_INVALID;
    } else {
      return getPendingValue();
    }
  }

  void setPendingValue(Object pv) {
    this.pendingValue = pv;
  }

  void setCallbackArgument(Object callbackArgument) {
    this.callBackArgument = callbackArgument;
  }

  /**
   * Fetches the current modification serial number from the txState and puts it in this entry.
   *
   * @param modNum the next modified serial number gotten from TXState
   *
   * @since GemFire 5.0
   */
  public void updateForWrite(final int modNum) {
    this.dirty = true;
    this.modSerialNum = modNum;
  }

  /**
   * Returns true if this entry has been written; false if only read
   *
   * @since GemFire 5.1
   */
  public boolean isDirty() {
    return DETECT_READ_CONFLICTS || this.dirty;
  }

  /**
   * Return true if entry has an operation.
   *
   * @since GemFire 5.5
   */
  public boolean hasOp() {
    return this.op != OP_NULL;
  }

  /**
   * Returns true if the transaction state has this entry existing "locally". Returns false if the
   * transaction is going to remove this entry.
   */
  public boolean existsLocally() {
    if (this.op == OP_NULL) {
      return !Token.isRemoved(getOriginalValue());
    } else {
      return this.op > OP_D_DESTROY;
    }
  }

  private boolean isOpLocalDestroy() {
    return this.op >= OP_L_DESTROY && this.op <= OP_D_INVALIDATE_LD;
  }

  private boolean isOpLocalInvalidate() {
    return this.op >= OP_L_INVALIDATE && this.op <= OP_NLOAD_CREATE_LI
        && this.op != OP_D_INVALIDATE;
  }

  /**
   * Return true if this transaction has completely invalidated or destroyed the value of this entry
   * in the entire distributed system. Return false if a netsearch should be done.
   */
  public boolean noValueInSystem() {
    if (this.op == OP_D_DESTROY || this.op == OP_D_INVALIDATE_LD || this.op == OP_D_INVALIDATE) {
      return true;
    } else if (getNearSidePendingValue() == Token.INVALID) {
      // Note that we are not interested in LOCAL_INVALID
      return (this.op >= OP_CREATE_LD && this.op != OP_L_INVALIDATE && this.op != OP_SEARCH_CREATE
          && this.op != OP_LOCAL_CREATE && this.op != OP_SEARCH_PUT);
    } else {
      return false;
    }
  }

  /**
   * Returns true if the transaction state has this entry existing locally and has a valid local
   * value. Returns false if the transaction is going to remove this entry or its local value is
   * invalid.
   */
  public boolean isLocallyValid(boolean isProxy) {
    if (this.op == OP_NULL) {
      if (isProxy) {
        // If it is a proxy that consider it locally valid
        // since we don't have any local committed state
        return true;
      } else {
        return !Token.isInvalidOrRemoved(getOriginalValue());
      }
    } else {
      return this.op >= OP_CREATE && !Token.isInvalid(getNearSidePendingValue());
    }
  }

  @Retained
  public Object getValueInVM(Object key) throws EntryNotFoundException {
    if (!existsLocally()) {
      throw new EntryNotFoundException(String.valueOf(key));
    }
    return getNearSidePendingValue();
  }

  /**
   * @return the value, or null if the value does not exist in the cache, Token.INVALID or
   *         Token.LOCAL_INVALID if the value is invalid
   */
  public Object getValue(Object key, Region r, boolean preferCD) {
    if (!existsLocally()) {
      return null;
    }
    Object v = getNearSidePendingValue();
    if (v instanceof CachedDeserializable && !preferCD) {
      // The only reason we would is if we do a read
      // in a transaction and the initial op that created the TXEntryState
      // did not call getDeserializedValue.
      // If a write op is done then the pendingValue does not belong to
      // the cache yet so we do not need the RegionEntry.
      // If we do need the RegionEntry then TXEntryState
      // will need to be changed to remember the RegionEntry it is created with.
      v = ((CachedDeserializable) v).getDeserializedValue(r, this.refCountEntry);
    }

    return v;
  }

  private boolean isOpCreate() {
    return this.op >= OP_CREATE_LI && this.op <= OP_LOCAL_CREATE;
  }

  boolean isOpCreateEvent() {
    return isOpCreate();
  }

  private boolean isOpPut() {
    return this.op >= OP_PUT;
  }

  protected boolean isOpPutEvent() {
    return isOpPut();
  }

  private boolean isOpInvalidate() {
    // Note that OP_CREATE_LI, OP_LLOAD_CREATE_LI, and OP_NLOAD_CREATE_LI
    // do not return true here because they are actually creates
    // with a value of LOCAL_INVALID locally and some other value remotely.
    return this.op <= OP_D_INVALIDATE && this.op >= OP_L_INVALIDATE;
  }

  boolean isOpInvalidateEvent() {
    return isOpInvalidate();
  }

  private boolean isOpDestroy() {
    return this.op <= OP_D_DESTROY && this.op >= OP_L_DESTROY;
  }

  boolean isOpDestroyEvent(InternalRegion r) {
    // Note that if the region is a proxy then we go ahead and distributed
    // the destroy because we can't eliminate it based on committed state
    return isOpDestroy()
        && (r.isProxy() || (getOriginalValue() != null && !Token.isRemoved(getOriginalValue())));
  }

  /**
   * Returns true if this operation has an event for the tx listener
   *
   * @since GemFire 5.0
   */
  boolean isOpAnyEvent(InternalRegion r) {
    return isOpPutEvent() || isOpCreateEvent() || isOpInvalidateEvent() || isOpDestroyEvent(r);
  }

  boolean isOpSearch() {
    return this.op == OP_SEARCH_CREATE || this.op == OP_SEARCH_PUT;
  }

  String opToString() {
    return opToString(this.op);
  }

  private String opToString(byte opCode) {
    switch (opCode) {
      case OP_NULL:
        return "OP_NULL";
      case OP_L_DESTROY:
        return "OP_L_DESTROY";
      case OP_CREATE_LD:
        return "OP_CREATE_LD";
      case OP_LLOAD_CREATE_LD:
        return "OP_LLOAD_CREATE_LD";
      case OP_NLOAD_CREATE_LD:
        return "OP_NLOAD_CREATE_LD";
      case OP_PUT_LD:
        return "OP_PUT_LD";
      case OP_LLOAD_PUT_LD:
        return "OP_LLOAD_PUT_LD";
      case OP_NLOAD_PUT_LD:
        return "OP_NLOAD_PUT_LD";
      case OP_D_INVALIDATE_LD:
        return "OP_D_INVALIDATE_LD";
      case OP_D_DESTROY:
        return "OP_D_DESTROY";
      case OP_L_INVALIDATE:
        return "OP_L_INVALIDATE";
      case OP_PUT_LI:
        return "OP_PUT_LI";
      case OP_LLOAD_PUT_LI:
        return "OP_LLOAD_PUT_LI";
      case OP_NLOAD_PUT_LI:
        return "OP_NLOAD_PUT_LI";
      case OP_D_INVALIDATE:
        return "OP_D_INVALIDATE";
      case OP_CREATE_LI:
        return "OP_CREATE_LI";
      case OP_LLOAD_CREATE_LI:
        return "OP_LLOAD_CREATE_LI";
      case OP_NLOAD_CREATE_LI:
        return "OP_NLOAD_CREATE_LI";
      case OP_CREATE:
        return "OP_CREATE";
      case OP_SEARCH_CREATE:
        return "OP_SEARCH_CREATE";
      case OP_LLOAD_CREATE:
        return "OP_LLOAD_CREATE";
      case OP_NLOAD_CREATE:
        return "OP_NLOAD_CREATE";
      case OP_LOCAL_CREATE:
        return "OP_LOCAL_CREATE";
      case OP_PUT:
        return "OP_PUT";
      case OP_SEARCH_PUT:
        return "OP_SEARCH_PUT";
      case OP_LLOAD_PUT:
        return "OP_LLOAD_PUT";
      case OP_NLOAD_PUT:
        return "OP_NLOAD_PUT";
      default:
        return "<unhandled op " + opCode + " >";
    }
  }

  /**
   * Returns an Operation instance that matches what the transactional operation done on this entry
   * in the cache the the transaction was performed in.
   */
  protected Operation getNearSideOperation() {
    switch (this.op) {
      case OP_NULL:
        return null;
      case OP_L_DESTROY:
        return Operation.LOCAL_DESTROY;
      case OP_CREATE_LD:
        return Operation.LOCAL_DESTROY;
      case OP_LLOAD_CREATE_LD:
        return Operation.LOCAL_DESTROY;
      case OP_NLOAD_CREATE_LD:
        return Operation.LOCAL_DESTROY;
      case OP_PUT_LD:
        return Operation.LOCAL_DESTROY;
      case OP_LLOAD_PUT_LD:
        return Operation.LOCAL_DESTROY;
      case OP_NLOAD_PUT_LD:
        return Operation.LOCAL_DESTROY;
      case OP_D_INVALIDATE_LD:
        return Operation.LOCAL_DESTROY;
      case OP_D_DESTROY:
        return getDestroyOperation();
      case OP_L_INVALIDATE:
        return Operation.LOCAL_INVALIDATE;
      case OP_PUT_LI:
        return Operation.LOCAL_INVALIDATE;
      case OP_LLOAD_PUT_LI:
        return Operation.LOCAL_INVALIDATE;
      case OP_NLOAD_PUT_LI:
        return Operation.LOCAL_INVALIDATE;
      case OP_D_INVALIDATE:
        return Operation.INVALIDATE;
      case OP_CREATE_LI:
        return getCreateOperation();
      case OP_LLOAD_CREATE_LI:
        return getCreateOperation();
      case OP_NLOAD_CREATE_LI:
        return getCreateOperation();
      case OP_CREATE:
        return getCreateOperation();
      case OP_SEARCH_CREATE:
        return Operation.SEARCH_CREATE;
      case OP_LLOAD_CREATE:
        return Operation.LOCAL_LOAD_CREATE;
      case OP_NLOAD_CREATE:
        return Operation.NET_LOAD_CREATE;
      case OP_LOCAL_CREATE:
        return getCreateOperation();
      case OP_PUT:
        return getUpdateOperation();
      case OP_SEARCH_PUT:
        return Operation.SEARCH_UPDATE;
      case OP_LLOAD_PUT:
        return Operation.LOCAL_LOAD_UPDATE;
      case OP_NLOAD_PUT:
        return Operation.NET_LOAD_CREATE;
      default:
        throw new IllegalStateException(
            String.format("<unhandled op %s >", Byte.valueOf(this.op)));
    }
  }

  /**
   * @return true when the operation is the result of a bulk op
   */
  private boolean isBulkOp() {
    return this.bulkOp;
  }

  /**
   * Calculate and return the event offset based on the sequence id on TXState.
   *
   * @since GemFire 5.7
   */
  private static int generateEventOffset(TXState txState) {
    long seqId = EventID.reserveSequenceId();
    int offset = (int) (seqId - txState.getBaseSequenceId());
    return offset;
  }

  /**
   * Generate offsets for different eventIds; one for nearside and one for farside for the ops for
   * this entry.
   *
   * @since GemFire 5.7
   */
  private void generateBothEventOffsets(TXState txState) {
    assert this.farSideEventOffset == -1;
    this.farSideEventOffset = generateEventOffset(txState);
    generateNearSideEventOffset(txState);
  }

  /**
   * Generate the offset for an eventId that will be used for both a farside and nearside op for
   * this entry.
   *
   * @since GemFire 5.7
   */
  private void generateSharedEventOffset(TXState txState) {
    assert this.farSideEventOffset == -1;
    generateNearSideEventOffset(txState);
    this.farSideEventOffset = this.nearSideEventOffset;
  }

  /**
   * Generate the offset for an eventId that will be used for the nearside op for this entry. No
   * farside op will be done.
   *
   * @since GemFire 5.7
   */
  private void generateNearSideOnlyEventOffset(TXState txState) {
    generateNearSideEventOffset(txState);
  }

  private void generateNearSideEventOffset(TXState txState) {
    assert this.nearSideEventOffset == -1;
    this.nearSideEventOffset = generateEventOffset(txState);
  }

  private int getFarSideEventOffset() {
    assert this.nearSideEventOffset != -1;
    return this.nearSideEventOffset;
  }

  private static EventID createEventID(TXState txState, int offset) {
    return new EventID(txState.getBaseMembershipId(), txState.getBaseThreadId(),
        txState.getBaseSequenceId() + offset);
  }

  /**
   * Calculate (if farside has not already done so) and return then eventID to use for near side op
   * applications.
   *
   * @since GemFire 5.7
   */
  private EventID getNearSideEventId(TXState txState) {
    assert this.nearSideEventOffset != -1;
    return createEventID(txState, this.nearSideEventOffset);
  }

  /**
   * Calculate and return the event offset for this entry's farSide operation.
   *
   * @since GemFire 5.7
   */
  void generateEventOffsets(TXState txState) {
    switch (this.op) {
      case OP_NULL:
        // no eventIds needed
        break;
      case OP_L_DESTROY:
        generateNearSideOnlyEventOffset(txState);
        break;
      case OP_CREATE_LD:
        generateBothEventOffsets(txState);
        break;
      case OP_LLOAD_CREATE_LD:
        generateBothEventOffsets(txState);
        break;
      case OP_NLOAD_CREATE_LD:
        generateBothEventOffsets(txState);
        break;
      case OP_PUT_LD:
        generateBothEventOffsets(txState);
        break;
      case OP_LLOAD_PUT_LD:
        generateBothEventOffsets(txState);
        break;
      case OP_NLOAD_PUT_LD:
        generateBothEventOffsets(txState);
        break;
      case OP_D_INVALIDATE_LD:
        generateBothEventOffsets(txState);
        break;
      case OP_D_DESTROY:
        generateSharedEventOffset(txState);
        break;
      case OP_L_INVALIDATE:
        generateNearSideOnlyEventOffset(txState);
        break;
      case OP_PUT_LI:
        generateBothEventOffsets(txState);
        break;
      case OP_LLOAD_PUT_LI:
        generateBothEventOffsets(txState);
        break;
      case OP_NLOAD_PUT_LI:
        generateBothEventOffsets(txState);
        break;
      case OP_D_INVALIDATE:
        generateSharedEventOffset(txState);
        break;
      case OP_CREATE_LI:
        generateBothEventOffsets(txState);
        break;
      case OP_LLOAD_CREATE_LI:
        generateBothEventOffsets(txState);
        break;
      case OP_NLOAD_CREATE_LI:
        generateBothEventOffsets(txState);
        break;
      case OP_CREATE:
        generateSharedEventOffset(txState);
        break;
      case OP_SEARCH_CREATE:
        generateNearSideOnlyEventOffset(txState);
        break;
      case OP_LLOAD_CREATE:
        generateSharedEventOffset(txState);
        break;
      case OP_NLOAD_CREATE:
        generateSharedEventOffset(txState);
        break;
      case OP_LOCAL_CREATE:
        generateNearSideOnlyEventOffset(txState);
        break;
      case OP_PUT:
        generateSharedEventOffset(txState);
        break;
      case OP_SEARCH_PUT:
        generateNearSideOnlyEventOffset(txState);
        break;
      case OP_LLOAD_PUT:
        generateSharedEventOffset(txState);
        break;
      case OP_NLOAD_PUT:
        generateSharedEventOffset(txState);
        break;
      default:
        throw new IllegalStateException("<unhandled op " + this.op + " >");
    }
  }

  /**
   * Gets the operation code for the operation done on this entry in caches remote from the
   * originator of the tx (i.e. the "far side").
   *
   * @return null if no far side operation
   */
  private Operation getFarSideOperation() {
    switch (this.op) {
      case OP_NULL:
        return null;
      case OP_L_DESTROY:
        return null;
      case OP_CREATE_LD:
        return getCreateOperation();
      case OP_LLOAD_CREATE_LD:
        return Operation.LOCAL_LOAD_CREATE;
      case OP_NLOAD_CREATE_LD:
        return Operation.NET_LOAD_CREATE;
      case OP_PUT_LD:
        return getUpdateOperation();
      case OP_LLOAD_PUT_LD:
        return Operation.LOCAL_LOAD_UPDATE;
      case OP_NLOAD_PUT_LD:
        return Operation.NET_LOAD_UPDATE;
      case OP_D_INVALIDATE_LD:
        return Operation.INVALIDATE;
      case OP_D_DESTROY:
        return getDestroyOperation();
      case OP_L_INVALIDATE:
        return null;
      case OP_PUT_LI:
        return getUpdateOperation();
      case OP_LLOAD_PUT_LI:
        return Operation.LOCAL_LOAD_UPDATE;
      case OP_NLOAD_PUT_LI:
        return Operation.NET_LOAD_UPDATE;
      case OP_D_INVALIDATE:
        return Operation.INVALIDATE;
      case OP_CREATE_LI:
        return getCreateOperation();
      case OP_LLOAD_CREATE_LI:
        return Operation.LOCAL_LOAD_CREATE;
      case OP_NLOAD_CREATE_LI:
        return Operation.NET_LOAD_CREATE;
      case OP_CREATE:
        return getCreateOperation();
      case OP_SEARCH_CREATE:
        return null;
      case OP_LLOAD_CREATE:
        return Operation.LOCAL_LOAD_CREATE;
      case OP_NLOAD_CREATE:
        return Operation.NET_LOAD_CREATE;
      case OP_LOCAL_CREATE:
        return getCreateOperation();
      case OP_PUT:
        return getUpdateOperation();
      case OP_SEARCH_PUT:
        return null;
      case OP_LLOAD_PUT:
        return Operation.LOCAL_LOAD_UPDATE;
      case OP_NLOAD_PUT:
        return Operation.NET_LOAD_UPDATE;
      default:
        throw new IllegalStateException(
            String.format("<unhandled op %s >", Byte.valueOf(this.op)));
    }
  }

  @Retained
  EntryEvent getEvent(InternalRegion r, Object key, TXState txs) {
    InternalRegion eventRegion = r;
    if (r.isUsedForPartitionedRegionBucket()) {
      eventRegion = r.getPartitionedRegion();
    }
    @Retained
    EntryEventImpl result = new TxEntryEventImpl(eventRegion, key);
    boolean returnedResult = false;
    try {
      if (this.destroy == DESTROY_NONE || isOpDestroy()) {
        result.setOldValue(getOriginalValue());
      }
      if (txs.isOriginRemoteForEvents()) {
        result.setOriginRemote(true);
      } else {
        result.setOriginRemote(false);
      }
      result.setTransactionId(txs.getTransactionId());
      returnedResult = true;
      return result;
    } finally {
      if (!returnedResult)
        result.release();
    }
  }

  /**
   * @return true if invalidate was done
   */
  public boolean invalidate(EntryEventImpl event) throws EntryNotFoundException {
    if (event.isLocalInvalid()) {
      performOp(adviseOp(OP_L_INVALIDATE, event), event);
    } else {
      performOp(adviseOp(OP_D_INVALIDATE, event), event);
    }

    return true;
  }

  /**
   * @return true if destroy was done
   */
  public boolean destroy(EntryEventImpl event, boolean cacheWrite, boolean originRemote)
      throws CacheWriterException, EntryNotFoundException, TimeoutException {
    InternalRegion internalRegion = event.getRegion();
    CacheWriter cWriter = internalRegion.basicGetWriter();
    byte advisedOp = adviseOp((cacheWrite ? OP_D_DESTROY : OP_L_DESTROY), event);
    if (cWriter != null && cacheWrite && !event.inhibitAllNotifications()) {
      boolean oldOriginRemote = event.isOriginRemote();
      if (event.hasClientOrigin() || originRemote) {
        event.setOriginRemote(true);
      }
      cWriter.beforeDestroy(event);
      event.setOriginRemote(oldOriginRemote);
    }

    if (advisedOp != OP_NULL) {
      this.bulkOp = event.getOperation().isRemoveAll();
      if (cacheWrite) {
        performOp(advisedOp, event);
        this.destroy = DESTROY_DISTRIBUTED;
      } else {
        performOp(advisedOp, event);
        if (this.destroy != DESTROY_DISTRIBUTED) {
          this.destroy = DESTROY_LOCAL;
        }
      }
    }

    return true;
  }

  /**
   * @param event the event object for this operation, with the exception that the oldValue
   *        parameter is not yet filled in. The oldValue will be filled in by this operation.
   *
   * @param ifNew true if this operation must not overwrite an existing key
   * @param originRemote value for cacheWriter's isOriginRemote flag
   * @return true if put was done; otherwise returns false
   */
  public boolean basicPut(EntryEventImpl event, boolean ifNew, boolean originRemote)
      throws CacheWriterException, TimeoutException {
    byte putOp = OP_CREATE;
    boolean doingCreate = true;
    if (!ifNew) {
      if (existsLocally()) {
        putOp = OP_PUT;
        doingCreate = false;
      }
    }
    if (doingCreate) {
      event.makeCreate();
    } else {
      event.makeUpdate();
    }
    if (event.isNetSearch()) {
      putOp += (byte) (OP_SEARCH_PUT - OP_PUT);
    } else if (event.isLocalLoad()) {
      putOp += (byte) (OP_LLOAD_PUT - OP_PUT);
    } else if (event.isNetLoad()) {
      putOp += (byte) (OP_NLOAD_PUT - OP_PUT);
    }
    this.bulkOp = event.getOperation().isPutAll();
    byte advisedOp = adviseOp(putOp, event);

    InternalRegion internalRegion = event.getRegion();
    if (!event.isNetSearch()) {
      CacheWriter cWriter = internalRegion.basicGetWriter();
      boolean oldOriginRemote = event.isOriginRemote();
      if (event.hasClientOrigin() || originRemote) {
        event.setOriginRemote(true);
      }
      if (cWriter != null && event.isGenerateCallbacks() && !event.inhibitAllNotifications()) {
        if (doingCreate) {
          cWriter.beforeCreate(event);
        } else {
          cWriter.beforeUpdate(event);
        }
      }
      event.setOriginRemote(oldOriginRemote);
    }

    performOp(advisedOp, event);

    if (putOp == OP_CREATE) {
      fetchRemoteVersionTag(event);
    }
    return true;
  }

  /**
   * We will try to establish TXState on members with dataPolicy REPLICATE, this is done for the
   * first region to be involved in a transaction. For subsequent region if the dataPolicy is not
   * REPLICATE, we fetch the VersionTag from replicate members.
   */
  private void fetchRemoteVersionTag(EntryEventImpl event) {
    if (event.getRegion() instanceof DistributedRegion) {
      DistributedRegion dr = (DistributedRegion) event.getRegion();
      if (dr.getDataPolicy() == DataPolicy.NORMAL || dr.getDataPolicy() == DataPolicy.PRELOADED) {
        VersionTag tag = null;
        try {
          tag = dr.fetchRemoteVersionTag(event.getKey());
        } catch (EntryNotFoundException e) {
          // ignore
        }
        if (tag != null) {
          setRemoteVersionTag(tag);
        }
      }
    }
  }

  /**
   * Perform operation algebra
   *
   * @return false if operation was not done
   */
  private byte adviseOp(byte requestedOpCode, EntryEventImpl event) {
    { // Set event old value
      Object oldVal;
      if (this.op == OP_NULL) {
        oldVal = getOriginalValue();
      } else {
        oldVal = getNearSidePendingValue();
      }

      Region region = event.getRegion();
      boolean needOldValue = region instanceof HARegion // fix for bug 37909
          || region instanceof BucketRegion;
      event.setTXEntryOldValue(oldVal, needOldValue);
    }

    byte advisedOpCode = OP_NULL;
    // Determine new operation based on
    // the requested operation 'requestedOpCode' and
    // the previous operation 'this.op'
    switch (requestedOpCode) {
      case OP_L_DESTROY:
        switch (this.op) {
          case OP_NULL:
            advisedOpCode = requestedOpCode;
            break;
          case OP_L_DESTROY:
          case OP_CREATE_LD:
          case OP_LLOAD_CREATE_LD:
          case OP_NLOAD_CREATE_LD:
          case OP_PUT_LD:
          case OP_LLOAD_PUT_LD:
          case OP_NLOAD_PUT_LD:
          case OP_D_INVALIDATE_LD:
          case OP_D_DESTROY:
            throw new IllegalStateException(
                String.format("Unexpected current op %s for requested op %s",
                    new Object[] {opToString(), opToString(requestedOpCode)}));
          case OP_L_INVALIDATE:
            advisedOpCode = requestedOpCode;
            break;
          case OP_PUT_LI:
            advisedOpCode = OP_PUT_LD;
            break;
          case OP_LLOAD_PUT_LI:
            advisedOpCode = OP_LLOAD_PUT_LD;
            break;
          case OP_NLOAD_PUT_LI:
            advisedOpCode = OP_NLOAD_PUT_LD;
            break;
          case OP_D_INVALIDATE:
            advisedOpCode = OP_D_INVALIDATE_LD;
            break;
          case OP_CREATE_LI:
            advisedOpCode = OP_CREATE_LD;
            break;
          case OP_LLOAD_CREATE_LI:
            advisedOpCode = OP_LLOAD_CREATE_LD;
            break;
          case OP_NLOAD_CREATE_LI:
            advisedOpCode = OP_NLOAD_CREATE_LD;
            break;
          case OP_CREATE:
            advisedOpCode = OP_CREATE_LD;
            break;
          case OP_SEARCH_CREATE:
            advisedOpCode = requestedOpCode;
            break;
          case OP_LLOAD_CREATE:
            advisedOpCode = OP_LLOAD_CREATE_LD;
            break;
          case OP_NLOAD_CREATE:
            advisedOpCode = OP_NLOAD_CREATE_LD;
            break;
          case OP_LOCAL_CREATE:
            advisedOpCode = requestedOpCode;
            break;
          case OP_PUT:
            advisedOpCode = OP_PUT_LD;
            break;
          case OP_SEARCH_PUT:
            advisedOpCode = requestedOpCode;
            break;
          case OP_LLOAD_PUT:
            advisedOpCode = OP_LLOAD_PUT_LD;
            break;
          case OP_NLOAD_PUT:
            advisedOpCode = OP_NLOAD_PUT_LD;
            break;
          default:
            throw new IllegalStateException(
                String.format("Unhandled %s", opToString()));
        }
        break;
      case OP_D_DESTROY:
        Assert.assertTrue(!isOpDestroy(), "Transactional destroy assertion op=" + this.op);
        advisedOpCode = requestedOpCode;
        break;
      case OP_L_INVALIDATE:
        switch (this.op) {
          case OP_NULL:
            advisedOpCode = requestedOpCode;
            break;
          case OP_L_DESTROY:
          case OP_CREATE_LD:
          case OP_LLOAD_CREATE_LD:
          case OP_NLOAD_CREATE_LD:
          case OP_PUT_LD:
          case OP_LLOAD_PUT_LD:
          case OP_NLOAD_PUT_LD:
          case OP_D_DESTROY:
          case OP_D_INVALIDATE_LD:
            throw new IllegalStateException(
                String.format("Unexpected current op %s for requested op %s",
                    new Object[] {opToString(), opToString(requestedOpCode)}));
          case OP_L_INVALIDATE:
            advisedOpCode = requestedOpCode;
            break;
          case OP_LLOAD_PUT_LI:
          case OP_NLOAD_PUT_LI:
          case OP_LLOAD_CREATE_LI:
          case OP_NLOAD_CREATE_LI:
            advisedOpCode = this.op;
            break;
          case OP_PUT_LI:
            advisedOpCode = OP_PUT_LI;
            break;
          case OP_CREATE_LI:
            advisedOpCode = OP_CREATE_LI;
            break;
          case OP_D_INVALIDATE:
            advisedOpCode = OP_D_INVALIDATE;
            break;
          case OP_CREATE:
            advisedOpCode = OP_CREATE_LI;
            break;
          case OP_SEARCH_CREATE:
            advisedOpCode = OP_LOCAL_CREATE;
            // pendingValue will be set to LOCAL_INVALID
            break;
          case OP_LLOAD_CREATE:
            advisedOpCode = OP_LLOAD_CREATE_LI;
            break;
          case OP_NLOAD_CREATE:
            advisedOpCode = OP_NLOAD_CREATE_LI;
            break;
          case OP_LOCAL_CREATE:
            advisedOpCode = OP_LOCAL_CREATE;
            break;
          case OP_PUT:
            advisedOpCode = OP_PUT_LI;
            break;
          case OP_SEARCH_PUT:
            advisedOpCode = requestedOpCode;
            break;
          case OP_LLOAD_PUT:
            advisedOpCode = OP_LLOAD_PUT_LI;
            break;
          case OP_NLOAD_PUT:
            advisedOpCode = OP_NLOAD_PUT_LI;
            break;
          default:
            throw new IllegalStateException(
                String.format("Unhandled %s", opToString()));
        }
        break;
      case OP_D_INVALIDATE:
        switch (this.op) {
          case OP_NULL:
            advisedOpCode = requestedOpCode;
            break;
          case OP_L_DESTROY:
          case OP_CREATE_LD:
          case OP_LLOAD_CREATE_LD:
          case OP_NLOAD_CREATE_LD:
          case OP_PUT_LD:
          case OP_LLOAD_PUT_LD:
          case OP_NLOAD_PUT_LD:
          case OP_D_INVALIDATE_LD:
          case OP_D_DESTROY:
            throw new IllegalStateException(
                String.format("Unexpected current op %s for requested op %s",
                    new Object[] {opToString(), opToString(requestedOpCode)}));
          case OP_D_INVALIDATE:
          case OP_L_INVALIDATE:
            advisedOpCode = OP_D_INVALIDATE;
            break;

          case OP_PUT_LI:
          case OP_LLOAD_PUT_LI:
          case OP_NLOAD_PUT_LI:
          case OP_CREATE_LI:
          case OP_LLOAD_CREATE_LI:
          case OP_NLOAD_CREATE_LI:
            /*
             * No change, keep it how it was.
             */
            advisedOpCode = this.op;
            break;
          case OP_CREATE:
            advisedOpCode = OP_CREATE;
            // pendingValue will be set to INVALID turning it into create invalid
            break;
          case OP_SEARCH_CREATE:
            advisedOpCode = OP_LOCAL_CREATE;
            // pendingValue will be set to INVALID to indicate dinvalidate
            break;
          case OP_LLOAD_CREATE:
            advisedOpCode = OP_CREATE;
            // pendingValue will be set to INVALID turning it into create invalid
            break;
          case OP_NLOAD_CREATE:
            advisedOpCode = OP_CREATE;
            // pendingValue will be set to INVALID turning it into create invalid
            break;
          case OP_LOCAL_CREATE:
            advisedOpCode = OP_LOCAL_CREATE;
            // pendingValue will be set to INVALID to indicate dinvalidate
            break;
          case OP_PUT:
          case OP_SEARCH_PUT:
          case OP_LLOAD_PUT:
          case OP_NLOAD_PUT:
            advisedOpCode = requestedOpCode;
            break;
          default:
            throw new IllegalStateException(
                String.format("Unhandled %s", opToString()));
        }
        break;
      case OP_CREATE:
      case OP_SEARCH_CREATE:
      case OP_LLOAD_CREATE:
      case OP_NLOAD_CREATE:
        advisedOpCode = requestedOpCode;
        break;
      case OP_PUT:
        switch (this.op) {
          case OP_CREATE:
          case OP_SEARCH_CREATE:
          case OP_LLOAD_CREATE:
          case OP_NLOAD_CREATE:
          case OP_LOCAL_CREATE:
          case OP_CREATE_LI:
          case OP_LLOAD_CREATE_LI:
          case OP_NLOAD_CREATE_LI:
          case OP_CREATE_LD:
          case OP_LLOAD_CREATE_LD:
          case OP_NLOAD_CREATE_LD:
          case OP_PUT_LD:
          case OP_LLOAD_PUT_LD:
          case OP_NLOAD_PUT_LD:
          case OP_D_INVALIDATE_LD:
          case OP_L_DESTROY:
          case OP_D_DESTROY:
            advisedOpCode = OP_CREATE;
            break;
          default:
            advisedOpCode = requestedOpCode;
            break;
        }
        break;
      case OP_SEARCH_PUT:
        switch (this.op) {
          case OP_NULL:
            advisedOpCode = requestedOpCode;
            break;
          case OP_L_INVALIDATE:
            advisedOpCode = requestedOpCode;
            break;
          // The incoming search put value should match
          // the pendingValue from the previous tx operation.
          // So it is ok to simply drop the _LI from the op
          case OP_PUT_LI:
            advisedOpCode = OP_PUT;
            break;
          case OP_LLOAD_PUT_LI:
            advisedOpCode = OP_LLOAD_PUT;
            break;
          case OP_NLOAD_PUT_LI:
            advisedOpCode = OP_NLOAD_PUT;
            break;
          case OP_CREATE_LI:
            advisedOpCode = OP_CREATE;
            break;
          case OP_LLOAD_CREATE_LI:
            advisedOpCode = OP_LLOAD_CREATE;
            break;
          case OP_NLOAD_CREATE_LI:
            advisedOpCode = OP_NLOAD_CREATE;
            break;
          default:
            // Note that OP_LOCAL_CREATE and OP_CREATE with invalid values
            // are not possible because they would cause the netsearch to
            // fail and we would do a load or a total miss.
            // Note that OP_D_INVALIDATE followed by OP_SEARCH_PUT is not
            // possible since the netsearch will alwsys "miss" in this case.
            throw new IllegalStateException(
                String.format("Previous op %s unexpected for requested op %s",
                    new Object[] {opToString(), opToString(requestedOpCode)}));
        }
        break;
      case OP_LLOAD_PUT:
      case OP_NLOAD_PUT:
        switch (this.op) {
          case OP_NULL:
          case OP_L_INVALIDATE:
          case OP_PUT_LI:
          case OP_LLOAD_PUT_LI:
          case OP_NLOAD_PUT_LI:
          case OP_D_INVALIDATE:
            advisedOpCode = requestedOpCode;
            break;
          case OP_CREATE:
          case OP_LOCAL_CREATE:
          case OP_CREATE_LI:
          case OP_LLOAD_CREATE_LI:
          case OP_NLOAD_CREATE_LI:
            if (requestedOpCode == OP_LLOAD_PUT) {
              advisedOpCode = OP_LLOAD_CREATE;
            } else {
              advisedOpCode = OP_NLOAD_CREATE;
            }
            break;
          default:
            // note that other invalid states are covered by this default
            // case because they should have caused a OP_SEARCH_PUT
            // to be requested.
            throw new IllegalStateException(
                String.format("Previous op %s unexpected for requested op %s",
                    new Object[] {opToString(), opToString(requestedOpCode)}));
        }
        break;
      default:
        throw new IllegalStateException(
            String.format("OpCode %s should never be requested",
                opToString(requestedOpCode)));
    }
    return advisedOpCode;
  }

  private void performOp(byte advisedOpCode, EntryEventImpl event) {
    this.op = advisedOpCode;
    event.putValueTXEntry(this);
  }

  private boolean areIdentical(Object o1, Object o2) {
    if (o1 == o2)
      return true;
    if (o1 instanceof StoredObject) {
      if (o1.equals(o2))
        return true;
    }
    return false;
  }

  void checkForConflict(InternalRegion r, Object key) throws CommitConflictException {
    if (!isDirty()) {
      // All we did was read the entry and we don't do read/write conflicts; yet.
      return;
    }
    if (isOpSearch()) {
      // netsearch never cause conflicts
      // Note that we don't care, in this case, if region still exists.
      // Search should never cause a conflict
      return;
    }
    try {
      r.checkReadiness();
      RegionEntry re = r.basicGetEntry(key);
      Object curCmtVersionId = null;
      try {
        if ((re == null) || re.isValueNull()) {
          curCmtVersionId = Token.REMOVED_PHASE1;
        } else {
          if (re.getValueWasResultOfSearch()) {
            return;
          }

          /*
           * The originalVersionId may be compressed so grab the value as stored in the map which
           * will match if compression is turned on.
           */
          curCmtVersionId = re.getTransformedValue();
        }
        if (!areIdentical(getOriginalVersionId(), curCmtVersionId)) {
          // I'm not sure it is a good idea to call getDeserializedValue.
          // Might be better to add a toString impl on CachedDeserializable.
          // if (curCmtVersionId instanceof CachedDeserializable) {
          // curCmtVersionId =
          // ((CachedDeserializable)curCmtVersionId).getDeserializedValue();
          // }
          if (VERBOSE_CONFLICT_STRING || logger.isDebugEnabled()) {
            String fromString = calcConflictString(getOriginalVersionId());
            String toString = calcConflictString(curCmtVersionId);
            if (!fromString.equals(toString)) {
              throw new CommitConflictException(
                  String.format(
                      "Entry for key %s on region %s had already been changed from %s to %s",

                      new Object[] {key, r.getDisplayName(), fromString, toString}));
            }
          }
          throw new CommitConflictException(
              String.format("Entry for key %s on region %s had a state change",
                  new Object[] {key, r.getDisplayName()}));
        }
      } finally {
        OffHeapHelper.release(curCmtVersionId);
      }
    } catch (CacheRuntimeException ex) {
      r.getCancelCriterion().checkCancelInProgress(null);
      throw new CommitConflictException(
          "Conflict caused by cache exception", ex);
    }
  }

  private String calcConflictString(Object obj) {
    Object o = obj;
    if (o instanceof CachedDeserializable) {
      if (o instanceof StoredObject && ((StoredObject) o).isCompressed()) {
        // fix for bug 52113
        return "<compressed value of size " + ((StoredObject) o).getValueSizeInBytes() + ">";
      }
      try {
        o = ((CachedDeserializable) o).getDeserializedForReading();
      } catch (PdxSerializationException e) {
        o = "<value unavailable>";
      }
    }
    if (o == null) {
      return "<null>";
    } else if (Token.isRemoved(o)) {
      return "<null>";
    } else if (o == Token.INVALID) {
      return "<invalidated>";
    } else if (o == Token.LOCAL_INVALID) {
      return "<local invalidated>";
    }
    return "<" + StringUtils.forceToString(o) + ">";
  }

  private boolean didDestroy() {
    return this.destroy != DESTROY_NONE;
  }

  private boolean didDistributedDestroy() {
    return this.destroy == DESTROY_DISTRIBUTED;
  }

  /**
   * Returns the total number of modifications made by this transaction to this entry. The result
   * will be +1 for a create and -1 for a destroy.
   */
  int entryCountMod() {
    switch (this.op) {
      case OP_L_DESTROY:
      case OP_CREATE_LD:
      case OP_LLOAD_CREATE_LD:
      case OP_NLOAD_CREATE_LD:
      case OP_PUT_LD:
      case OP_LLOAD_PUT_LD:
      case OP_NLOAD_PUT_LD:
      case OP_D_INVALIDATE_LD:
      case OP_D_DESTROY:
        if (getOriginalValue() == null || Token.isRemoved(getOriginalValue())) {
          return 0;
        } else {
          return -1;
        }
      case OP_CREATE_LI:
      case OP_LLOAD_CREATE_LI:
      case OP_NLOAD_CREATE_LI:
      case OP_CREATE:
      case OP_SEARCH_CREATE:
      case OP_LLOAD_CREATE:
      case OP_NLOAD_CREATE:
      case OP_LOCAL_CREATE:
        if (getOriginalValue() == null || Token.isRemoved(getOriginalValue())) {
          return 1;
        } else {
          return 0;
        }
      case OP_NULL:
      case OP_L_INVALIDATE:
      case OP_PUT_LI:
      case OP_LLOAD_PUT_LI:
      case OP_NLOAD_PUT_LI:
      case OP_D_INVALIDATE:
      case OP_PUT:
      case OP_SEARCH_PUT:
      case OP_LLOAD_PUT:
      case OP_NLOAD_PUT:
      default:
        return 0;
    }
  }

  /**
   * Returns true if this entry may have been created by this transaction.
   */
  boolean wasCreatedByTX() {
    return isOpCreate();
  }

  private void txApplyDestroyLocally(InternalRegion r, Object key, TXState txState) {
    boolean invokeCallbacks = isOpDestroyEvent(r);
    List<EntryEventImpl> pendingCallbacks =
        invokeCallbacks ? txState.getPendingCallbacks() : new ArrayList<EntryEventImpl>();
    try {
      r.txApplyDestroy(key, txState.getTransactionId(), null, false/* inTokenMode */,
          getDestroyOperation(), getNearSideEventId(txState), callBackArgument, pendingCallbacks,
          getFilterRoutingInfo(), txState.bridgeContext, false, this, null, -1);
    } catch (RegionDestroyedException ignore) {
    } catch (EntryDestroyedException ignore) {
    }
    // if !isOpDestroyEvent then
    // this destroy, to local committed state, becomes a noop
    // since nothing needed to be done locally.
    // We don't want to actually do the destroy since we told the
    // transaction listener that no destroy was done.
  }

  private void txApplyInvalidateLocally(InternalRegion r, Object key, Object newValue,
      boolean didDestroy, TXState txState) {
    try {
      r.txApplyInvalidate(key, newValue, didDestroy, txState.getTransactionId(), null,
          isOpLocalInvalidate() ? true : false, getNearSideEventId(txState), callBackArgument,
          txState.getPendingCallbacks(), getFilterRoutingInfo(), txState.bridgeContext, this, null,
          -1);
    } catch (RegionDestroyedException ignore) {
    } catch (EntryDestroyedException ignore) {
    }
  }

  private void txApplyPutLocally(InternalRegion r, Operation putOp, Object key, Object newValue,
      boolean didDestroy, TXState txState) {
    try {
      r.txApplyPut(putOp, key, newValue, didDestroy, txState.getTransactionId(), null,
          getNearSideEventId(txState), callBackArgument, txState.getPendingCallbacks(),
          getFilterRoutingInfo(), txState.bridgeContext, this, null, -1);
    } catch (RegionDestroyedException ignore) {
    } catch (EntryDestroyedException ignore) {
    }
  }

  /**
   * If the entry is not dirty (read only) then clean it up.
   *
   * @return true if this entry is not dirty.
   */
  boolean cleanupNonDirty(InternalRegion r) {
    if (isDirty()) {
      return false;
    } else {
      cleanup(r);
      return true;
    }
  }

  void buildMessage(InternalRegion r, Object key, TXCommitMessage msg, Set otherRecipients) {
    if (!isDirty()) {
      // all we do was read so just return
      return;
    }
    switch (this.op) {
      case OP_NULL:
      case OP_L_DESTROY:
      case OP_L_INVALIDATE:
      case OP_SEARCH_CREATE:
      case OP_SEARCH_PUT:
        // local only
        break;

      case OP_CREATE_LD:
      case OP_LLOAD_CREATE_LD:
      case OP_NLOAD_CREATE_LD:
      case OP_PUT_LD:
      case OP_LLOAD_PUT_LD:
      case OP_NLOAD_PUT_LD:
      case OP_D_INVALIDATE_LD:
      case OP_D_DESTROY:
      case OP_PUT_LI:
      case OP_LLOAD_PUT_LI:
      case OP_NLOAD_PUT_LI:
      case OP_D_INVALIDATE:
      case OP_CREATE_LI:
      case OP_LLOAD_CREATE_LI:
      case OP_NLOAD_CREATE_LI:
      case OP_CREATE:
      case OP_LLOAD_CREATE:
      case OP_NLOAD_CREATE:
      case OP_LOCAL_CREATE:
      case OP_PUT:
      case OP_LLOAD_PUT:
      case OP_NLOAD_PUT:
        msg.addOp(r, key, this, otherRecipients);
        break;

      default:
        throw new IllegalStateException("Unhandled op=" + opToString());
    }
  }


  void buildCompleteMessage(InternalRegion r, Object key, TXCommitMessage msg,
      Set otherRecipients) {
    if (!isDirty()) {
      // all we do was read so just return
      return;
    }
    switch (this.op) {
      case OP_NULL:
      case OP_L_DESTROY:
      case OP_L_INVALIDATE:
      case OP_SEARCH_CREATE:
      case OP_SEARCH_PUT:
        // local only
        break;

      case OP_CREATE_LD:
      case OP_LLOAD_CREATE_LD:
      case OP_NLOAD_CREATE_LD:
      case OP_PUT_LD:
      case OP_LLOAD_PUT_LD:
      case OP_NLOAD_PUT_LD:
      case OP_D_INVALIDATE_LD:
      case OP_D_DESTROY:
      case OP_PUT_LI:
      case OP_LLOAD_PUT_LI:
      case OP_NLOAD_PUT_LI:
      case OP_D_INVALIDATE:
      case OP_CREATE_LI:
      case OP_LLOAD_CREATE_LI:
      case OP_NLOAD_CREATE_LI:
      case OP_CREATE:
      case OP_LLOAD_CREATE:
      case OP_NLOAD_CREATE:
      case OP_LOCAL_CREATE:
      case OP_PUT:
      case OP_LLOAD_PUT:
      case OP_NLOAD_PUT:
        msg.addOp(r, key, this, otherRecipients);
        break;

      default:
        throw new IllegalStateException("Unhandled op=" + opToString());
    }
  }



  void applyChanges(InternalRegion r, Object key, TXState txState) {
    if (logger.isDebugEnabled()) {
      logger.debug("applyChanges txState=" + txState + " ,key=" + key + " ,r=" + r.getDisplayName()
          + " ,op=" + this.op + " ,isDirty=" + isDirty());
    }
    if (!isDirty()) {
      // all we did was read so just return
      return;
    }
    switch (this.op) {
      case OP_NULL:
        // do nothing
        break;
      case OP_L_DESTROY:
        txApplyDestroyLocally(r, key, txState);
        break;
      case OP_CREATE_LD:
        txApplyDestroyLocally(r, key, txState);
        break;
      case OP_LLOAD_CREATE_LD:
        txApplyDestroyLocally(r, key, txState);
        break;
      case OP_NLOAD_CREATE_LD:
        txApplyDestroyLocally(r, key, txState);
        break;
      case OP_PUT_LD:
        txApplyDestroyLocally(r, key, txState);
        break;
      case OP_LLOAD_PUT_LD:
        txApplyDestroyLocally(r, key, txState);
        break;
      case OP_NLOAD_PUT_LD:
        txApplyDestroyLocally(r, key, txState);
        break;
      case OP_D_INVALIDATE_LD:
        txApplyDestroyLocally(r, key, txState);
        break;
      case OP_D_DESTROY:
        txApplyDestroyLocally(r, key, txState);
        break;
      case OP_L_INVALIDATE:
        txApplyInvalidateLocally(r, key, Token.LOCAL_INVALID, didDestroy(), txState);
        break;
      case OP_PUT_LI:
        txApplyPutLocally(r, getUpdateOperation(), key, Token.LOCAL_INVALID, didDestroy(), txState);
        break;
      case OP_LLOAD_PUT_LI:
        txApplyPutLocally(r, getUpdateOperation(), key, Token.LOCAL_INVALID, didDestroy(), txState);
        break;
      case OP_NLOAD_PUT_LI:
        txApplyPutLocally(r, getUpdateOperation(), key, Token.LOCAL_INVALID, didDestroy(), txState);
        break;
      case OP_D_INVALIDATE:
        txApplyInvalidateLocally(r, key, Token.INVALID, didDestroy(), txState);
        break;
      case OP_CREATE_LI:
        txApplyPutLocally(r, getCreateOperation(), key, Token.LOCAL_INVALID, didDestroy(), txState);
        break;
      case OP_LLOAD_CREATE_LI:
        txApplyPutLocally(r, getCreateOperation(), key, Token.LOCAL_INVALID, didDestroy(), txState);
        break;
      case OP_NLOAD_CREATE_LI:
        txApplyPutLocally(r, getCreateOperation(), key, Token.LOCAL_INVALID, didDestroy(), txState);
        break;
      case OP_CREATE:
        txApplyPutLocally(r, getCreateOperation(), key, getPendingValue(), didDestroy(), txState);
        break;
      case OP_SEARCH_CREATE:
        txApplyPutLocally(r, Operation.SEARCH_CREATE, key, getPendingValue(), didDestroy(),
            txState);
        break;
      case OP_LLOAD_CREATE:
        txApplyPutLocally(r, Operation.LOCAL_LOAD_CREATE, key, getPendingValue(), didDestroy(),
            txState);
        break;
      case OP_NLOAD_CREATE:
        txApplyPutLocally(r, Operation.NET_LOAD_CREATE, key, getPendingValue(), didDestroy(),
            txState);
        break;
      case OP_LOCAL_CREATE:
        txApplyPutLocally(r, getCreateOperation(), key, getPendingValue(), didDestroy(), txState);
        break;
      case OP_PUT:
        txApplyPutLocally(r, getUpdateOperation(), key, getPendingValue(), didDestroy(), txState);
        break;
      case OP_SEARCH_PUT:
        txApplyPutLocally(r, Operation.SEARCH_UPDATE, key, getPendingValue(), didDestroy(),
            txState);
        break;
      case OP_LLOAD_PUT:
        txApplyPutLocally(r, Operation.LOCAL_LOAD_UPDATE, key, getPendingValue(), didDestroy(),
            txState);
        break;
      case OP_NLOAD_PUT:
        txApplyPutLocally(r, Operation.NET_LOAD_UPDATE, key, getPendingValue(), didDestroy(),
            txState);
        break;
      default:
        throw new IllegalStateException(
            String.format("<unhandled op %s >", opToString()));
    }
  }


  /**
   * @return returns {@link Operation#PUTALL_CREATE} if the operation is a result of bulk op,
   *         {@link Operation#CREATE} otherwise
   */
  private Operation getCreateOperation() {
    return isBulkOp() ? Operation.PUTALL_CREATE : Operation.CREATE;
  }

  /**
   * @return returns {@link Operation#PUTALL_UPDATE} if the operation is a result of bulk op,
   *         {@link Operation#UPDATE} otherwise
   */
  private Operation getUpdateOperation() {
    return isBulkOp() ? Operation.PUTALL_UPDATE : Operation.UPDATE;
  }

  @Override
  @Released(TX_ENTRY_STATE)
  public void release() {
    Object tmp = this.originalVersionId;
    if (OffHeapHelper.release(tmp)) {
      this.originalVersionId = null; // fix for bug 47900
    }
  }

  private Operation getDestroyOperation() {
    if (isOpLocalDestroy()) {
      return Operation.LOCAL_DESTROY;
    } else if (isBulkOp()) {
      return Operation.REMOVEALL_DESTROY;
    } else {
      return Operation.DESTROY;
    }
  }

  /**
   * Serializes this entry state to a data output stream for a far side consumer. Make sure this
   * method is backwards compatible if changes are made.
   *
   * <p>
   * The fromData for this is TXCommitMessage$RegionCommit$FarSideEntryOp#fromData.
   *
   *
   * @param largeModCount true if modCount needs to be represented by an int; false if a byte is
   *        enough
   * @param sendVersionTag true if versionTag should be sent to clients 7.0 and above
   * @param sendShadowKey true if wan shadowKey should be sent to peers 7.0.1 and above
   *
   * @since GemFire 5.0
   */
  void toFarSideData(DataOutput out,
      SerializationContext context,
      boolean largeModCount, boolean sendVersionTag,
      boolean sendShadowKey) throws IOException {
    Operation operation = getFarSideOperation();
    out.writeByte(operation.ordinal);
    if (largeModCount) {
      out.writeInt(this.modSerialNum);
    } else {
      out.writeByte(this.modSerialNum);
    }
    context.getSerializer().writeObject(getCallbackArgument(), out);
    context.getSerializer().writeObject(getFilterRoutingInfo(), out);
    if (sendVersionTag) {
      context.getSerializer().writeObject(getVersionTag(), out);
      assert getVersionTag() != null || !txRegionState.getRegion().getConcurrencyChecksEnabled()
          || txRegionState.getRegion().getDataPolicy() != DataPolicy.REPLICATE : "tag:"
              + getVersionTag() + " r:" + txRegionState.getRegion() + " op:" + opToString()
              + " key:";
    }
    if (sendShadowKey) {
      out.writeLong(this.tailKey);
    }
    out.writeInt(getFarSideEventOffset());
    if (!operation.isDestroy()) {
      out.writeBoolean(didDistributedDestroy());
      if (!operation.isInvalidate()) {
        boolean isTokenOrByteArray = Token.isInvalidOrRemoved(getPendingValue());
        isTokenOrByteArray = isTokenOrByteArray || getPendingValue() instanceof byte[];
        out.writeBoolean(isTokenOrByteArray);
        if (isTokenOrByteArray) {
          // this is a token or byte[] only
          context.getSerializer().writeObject(getPendingValue(), out);
        } else {
          // this is a CachedDeserializable, Object and PDX
          DataSerializer.writeByteArray(getSerializedPendingValue(), out);
        }
      }
    }
  }

  public FilterRoutingInfo getFilterRoutingInfo() {
    return filterRoutingInfo;
  }

  void cleanup(InternalRegion r) {
    if (this.refCountEntry != null) {
      r.txDecRefCount(refCountEntry);
    }
    close();
  }

  /**
   * Returns the sort key for this entry.
   */
  int getSortValue() {
    return this.modSerialNum;
  }

  /**
   * Just like an EntryEventImpl but also has access to TxEntryState to make it Comparable
   *
   * @since GemFire 5.0
   */
  public class TxEntryEventImpl extends EntryEventImpl implements Comparable {
    /**
     * Creates a local tx entry event
     */
    @Retained
    TxEntryEventImpl(InternalRegion r, Object key) {
      super(r, getNearSideOperation(), key, getNearSidePendingValue(),
          TXEntryState.this.getCallbackArgument(), false, r.getMyId(), true/* generateCallbacks */,
          true /* initializeId */);
    }

    /**
     * Returns the value to use to sort us
     */
    private int getSortValue() {
      return TXEntryState.this.getSortValue();
    }

    @Override
    public int compareTo(Object o) {
      TxEntryEventImpl other = (TxEntryEventImpl) o;
      return getSortValue() - other.getSortValue();
    }

    @Override
    public boolean equals(Object o) {
      if (o == null || !(o instanceof TxEntryEventImpl))
        return false;
      return compareTo(o) == 0;
    }

    @Override
    public int hashCode() {
      return getSortValue();
    }
  }

  @Immutable
  private static final TXEntryStateFactory factory = new TXEntryStateFactory() {

    @Override
    public TXEntryState createEntry() {
      return new TXEntryState();
    }

    @Override
    public TXEntryState createEntry(RegionEntry re, Object vId, Object pendingValue,
        Object entryKey, TXRegionState txrs, boolean isDistributed) {
      return new TXEntryState(re, vId, pendingValue, txrs, isDistributed);
    }

  };

  public static TXEntryStateFactory getFactory() {
    return factory;
  }

  public void setFilterRoutingInfo(FilterRoutingInfo fri) {
    this.filterRoutingInfo = fri;
  }

  public Set<InternalDistributedMember> getAdjunctRecipients() {
    return adjunctRecipients;
  }

  public void setAdjunctRecipients(Set<InternalDistributedMember> members) {
    this.adjunctRecipients = members;
  }

  public VersionTag getVersionTag() {
    return versionTag;
  }

  public void setVersionTag(VersionTag versionTag) {
    this.versionTag = versionTag;
  }

  public VersionTag getRemoteVersionTag() {
    return remoteVersionTag;
  }

  public void setRemoteVersionTag(VersionTag remoteVersionTag) {
    this.remoteVersionTag = remoteVersionTag;
  }

  public long getTailKey() {
    return tailKey;
  }

  public void setTailKey(long tailKey) {
    this.tailKey = tailKey;
  }

  public void close() {
    release();
  }

  @Override
  public String toString() {
    StringBuilder str = new StringBuilder();
    str.append("{").append(super.toString()).append(" ");
    str.append(this.op);
    str.append("}");
    return str.toString();
  }

  public DistTxThinEntryState getDistTxEntryStates() {
    return this.distTxThinEntryState;
  }

  public void setDistTxEntryStates(DistTxThinEntryState thinEntryState) {
    this.distTxThinEntryState = thinEntryState;
  }

  /**
   * For Distributed Transaction Usage
   * <p>
   * This class is used to bring relevant information for DistTxEntryEvent from primary, after end
   * of precommit. Same information are sent to all replicates during commit.
   * <p>
   * Whereas see DistTxEntryEvent is used for storing entry event information on TxCoordinator and
   * carry same to replicates.
   */
  public static class DistTxThinEntryState implements DataSerializableFixedID {

    private long regionVersion = 1L;
    private long tailKey = -1L;
    private String memberID;

    // For Serialization
    public DistTxThinEntryState() {}

    @Override
    public Version[] getSerializationVersions() {
      return null;
    }

    @Override
    public int getDSFID() {
      return DIST_TX_THIN_ENTRY_STATE;
    }

    @Override
    public void toData(DataOutput out,
        SerializationContext context) throws IOException {
      DataSerializer.writeLong(this.regionVersion, out);
      DataSerializer.writeLong(this.tailKey, out);
    }

    @Override
    public void fromData(DataInput in,
        DeserializationContext context) throws IOException, ClassNotFoundException {
      this.regionVersion = DataSerializer.readLong(in);
      this.tailKey = DataSerializer.readLong(in);
    }

    @Override
    public String toString() {
      StringBuilder buf = new StringBuilder();
      buf.append("DistTxThinEntryState: ");
      buf.append(" ,regionVersion=" + this.regionVersion);
      buf.append(" ,tailKey=" + this.tailKey);
      buf.append(" ,memberID=" + this.memberID);
      return buf.toString();
    }

    public long getRegionVersion() {
      return this.regionVersion;
    }

    public void setRegionVersion(long regionVersion) {
      this.regionVersion = regionVersion;
    }

    public long getTailKey() {
      return this.tailKey;
    }

    public void setTailKey(long tailKey) {
      this.tailKey = tailKey;
    }

    public String getMemberID() {
      return memberID;
    }

    public void setMemberID(String memberID) {
      this.memberID = memberID;
    }
  }
}
