/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.internal.cache.wan;

import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.util.ObjectSizer;
import org.apache.geode.cache.wan.EventSequenceID;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.CachedDeserializableFactory;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.WrappedCallbackArgument;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.lang.ObjectUtils;
import org.apache.geode.internal.offheap.OffHeapHelper;
import org.apache.geode.internal.offheap.ReferenceCountHelper;
import org.apache.geode.internal.offheap.Releasable;
import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.internal.offheap.annotations.OffHeapIdentifier;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.size.Sizeable;

/**
 * Class <code>GatewaySenderEventImpl</code> represents an event sent between
 * <code>GatewaySender</code>
 *
 *
 * @since GemFire 7.0
 *
 */
public class GatewaySenderEventImpl
    implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable {
  private static final long serialVersionUID = -5690172020872255422L;
  protected static final Object TOKEN_NULL = new Object();

  // It should use current version. But it was hard-coded to be 0x11, i.e. GEODE_120_ORDINAL,
  // by mistake since 120 to pre-190
  protected static final short VERSION = KnownVersion.getCurrentVersion().ordinal();

  protected EnumListenerEvent operation;

  protected Object substituteValue;

  /**
   * The action to be taken (e.g. AFTER_CREATE)
   */
  protected int action;

  /**
   * The operation detail of EntryEvent (e.g. LOAD, PUTALL etc.)
   */
  protected int operationDetail;

  /**
   * The number of parts for the <code>Message</code>
   *
   * @see Message
   */
  protected int numberOfParts;

  /**
   * The identifier of this event
   */
  protected EventID id;

  /**
   * The <code>Region</code> that was updated
   */
  private transient LocalRegion region;

  /**
   * The name of the region being affected by this event
   */
  protected String regionPath;

  /**
   * The key being affected by this event
   */
  protected Object key;

  /**
   * The serialized new value for this event's key. May not be computed at construction time.
   */
  protected volatile byte[] value;

  /**
   * The "object" form of the value. Will be null after this object is deserialized.
   */
  @Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
  protected transient Object valueObj;
  protected transient boolean valueObjReleased;

  private transient boolean serializedValueNotAvailable;

  /**
   * Whether the value is a serialized object or just a byte[]
   */
  protected byte valueIsObject;

  /**
   * The callback argument for this event
   */
  protected GatewaySenderEventCallbackArgument callbackArgument;

  /**
   * The version timestamp
   */
  protected long versionTimeStamp;

  /**
   * Whether this event is a possible duplicate
   */
  protected boolean possibleDuplicate;

  /**
   * Whether this event has been acknowledged, i.e. its ack was received by AckReaderThread.
   * Currently this is used for PDX-related GatewaySenderEvents, but it can be extended to other
   * GatewaySenderEvents.
   */
  protected volatile boolean isAcked;

  /**
   * Whether this event has been dispatched by the dispatcher. Currently this is used for
   * PDX-related GatewaySenderEvents, but it can be extended to other GatewaySenderEvents.
   */
  protected volatile boolean isDispatched;
  /**
   * The creation timestamp in ms
   */
  protected long creationTime;

  /**
   * For a ParallelGatewaySender we need the bucketId of the partitioned region on which the
   * update operation was applied.
   */
  protected int bucketId;

  protected Long shadowKey = -1L;

  protected boolean isInitialized;

  private transient boolean isConcurrencyConflict = false;

  private short version;

  private boolean isLastEventInTransaction = false;

  private TransactionId transactionId = null;


  /**
   * Is this thread in the process of serializing this event?
   */
  public static final ThreadLocal<Boolean> isSerializingValue =
      ThreadLocal.withInitial(() -> Boolean.FALSE);

  private static final int CREATE_ACTION = 0;

  private static final int UPDATE_ACTION = 1;

  private static final int DESTROY_ACTION = 2;

  private static final int VERSION_ACTION = 3;

  public static final int UPDATE_ACTION_NO_GENERATE_CALLBACKS = 4;

  private static final int INVALIDATE_ACTION = 5;
  /**
   * Static constants for Operation detail of EntryEvent.
   */
  private static final int OP_DETAIL_NONE = 10;

  private static final int OP_DETAIL_LOCAL_LOAD = 11;

  private static final int OP_DETAIL_NET_LOAD = 12;

  private static final int OP_DETAIL_PUTALL = 13;

  private static final int OP_DETAIL_REMOVEALL = 14;

  private static final int DEFAULT_SERIALIZED_VALUE_SIZE = -1;

  private volatile int serializedValueSize = DEFAULT_SERIALIZED_VALUE_SIZE;



  /**
   * No-arg constructor for data serialization. The body is empty, so every field keeps its
   * declared default; the instance is expected to be populated afterwards (see
   * {@link #fromData(DataInput, DeserializationContext)}).
   *
   * @see DataSerializer
   */
  public GatewaySenderEventImpl() {}

  /**
   * Creates an event by delegating to the five-argument constructor with <code>initialize</code>
   * set to <code>true</code>.
   *
   * @param operation The operation for this event (e.g. AFTER_CREATE)
   * @param event The <code>CacheEvent</code> on which this <code>GatewayEventImpl</code> is based
   * @param substituteValue The value to be enqueued instead of the value in the event.
   * @param transactionMetadataDisposition indicating the inclusion of transaction metadata.
   */
  @Retained
  public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> event,
      Object substituteValue, final TransactionMetadataDisposition transactionMetadataDisposition)
      throws IOException {
    this(operation, event, substituteValue, true, transactionMetadataDisposition);
  }

  /**
   * Creates an event without transaction metadata by delegating to the five-argument constructor
   * with <code>initialize</code> set to <code>true</code> and a disposition of
   * <code>EXCLUDE</code>.
   *
   * @param operation The operation for this event (e.g. AFTER_CREATE)
   * @param event The <code>CacheEvent</code> on which this <code>GatewayEventImpl</code> is based
   * @param substituteValue The value to be enqueued instead of the value in the event.
   */
  @Retained
  public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> event,
      Object substituteValue) throws IOException {
    this(operation, event, substituteValue, true, EXCLUDE);
  }

  /**
   * Creates an event via the five-argument constructor and then records the given bucket id.
   *
   * @param operation The operation for this event (e.g. AFTER_CREATE)
   * @param event The <code>CacheEvent</code> on which this <code>GatewayEventImpl</code> is based
   * @param substituteValue The value to be enqueued instead of the value in the event.
   * @param initialize Whether to initialize this instance
   * @param bucketId The bucket id stored in this event
   * @param transactionMetadataDisposition indicating the inclusion of transaction metadata.
   */
  @Retained
  public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> event,
      Object substituteValue, boolean initialize, int bucketId,
      final TransactionMetadataDisposition transactionMetadataDisposition) throws IOException {
    this(operation, event, substituteValue, initialize, transactionMetadataDisposition);
    this.bucketId = bucketId;
  }

  /**
   * Builds a gateway sender event from an entry event. Captures the region and its path, the
   * event id, the possible-duplicate flag, the creation time, the version timestamp (when the
   * event has a valid version tag), the key, the value, the callback argument, the action, the
   * operation detail, the shadow key, the concurrency-conflict flag and, unless the disposition
   * is <code>EXCLUDE</code>, the transaction metadata.
   *
   * @param operation The operation for this event (e.g. AFTER_CREATE)
   * @param ce The <code>CacheEvent</code> on which this <code>GatewayEventImpl</code> is based; it
   *        is cast to {@link EntryEventImpl}
   * @param substituteValue The value to be enqueued instead of the value in the event.
   * @param initialize Whether to initialize this instance
   * @param transactionMetadataDisposition indicating the inclusion of transaction metadata.
   * @throws IllegalStateException if the entry event has no event id
   */
  @Retained
  public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> ce,
      Object substituteValue, boolean initialize,
      final TransactionMetadataDisposition transactionMetadataDisposition) throws IOException {
    // Set the operation and event
    final EntryEventImpl event = (EntryEventImpl) ce;
    this.operation = operation;
    this.substituteValue = substituteValue;

    // Capture the region path eagerly. The region field is transient, so if this event is
    // serialized/deserialized before it is used, the region reference is lost and only the
    // path survives.
    region = (LocalRegion) event.getRegion();
    regionPath = region.getFullPath();

    // Initialize the unique id
    initializeId(event);

    // Initialize possible duplicate
    possibleDuplicate = event.isPossibleDuplicate();

    // Initialize ack and dispatch status of events
    isAcked = false;
    isDispatched = false;


    // Initialize the creation timestamp
    creationTime = System.currentTimeMillis();

    // The version timestamp is only taken from a tag that reports a valid version; otherwise
    // the field keeps its default of 0.
    if (event.getVersionTag() != null && event.getVersionTag().hasValidVersion()) {
      versionTimeStamp = event.getVersionTag().getVersionTimeStamp();
    }

    // Set key
    key = event.getKey();

    initializeValue(event);

    // Set the callback arg
    callbackArgument = (GatewaySenderEventCallbackArgument) event.getRawCallbackArgument();

    // Initialize the action and number of parts (called after callbackArgument
    // is set above)
    initializeAction(this.operation, event);

    // initialize the operation detail
    initializeOperationDetail(event.getOperation());

    setShadowKey(event.getTailKey());

    if (initialize) {
      initialize();
    }
    isConcurrencyConflict = event.isConcurrencyConflict();

    // Transaction metadata is recorded only when requested. The last-event flag additionally
    // requires INCLUDE_LAST_EVENT and a non-null transaction id.
    if (transactionMetadataDisposition != EXCLUDE) {
      transactionId = event.getTransactionId();
      isLastEventInTransaction =
          transactionMetadataDisposition == INCLUDE_LAST_EVENT && null != transactionId;
    }
  }

  /**
   * Used to create a heap copy of an offHeap event. Note that this constructor produces an instance
   * that does not need to be released: the value is captured as serialized heap bytes via
   * {@link #getSerializedValue()} and <code>valueObj</code> is left null.
   *
   * <p>
   * The operation detail and the concurrency-conflict flag are copied as well, so that
   * {@link #getOperation()}, {@link #shouldBeConflated()} and the serialized form of the copy agree
   * with the source event.
   *
   * @param offHeapEvent the event to copy; must not be null
   */
  protected GatewaySenderEventImpl(GatewaySenderEventImpl offHeapEvent) {
    operation = offHeapEvent.operation;
    action = offHeapEvent.action;
    // Without the detail, getOperation() on the copy would degrade e.g. PUTALL_CREATE to CREATE.
    operationDetail = offHeapEvent.operationDetail;
    numberOfParts = offHeapEvent.numberOfParts;
    id = offHeapEvent.id;
    region = offHeapEvent.region;
    regionPath = offHeapEvent.regionPath;
    key = offHeapEvent.key;
    callbackArgument = offHeapEvent.callbackArgument;
    versionTimeStamp = offHeapEvent.versionTimeStamp;
    possibleDuplicate = offHeapEvent.possibleDuplicate;
    isAcked = offHeapEvent.isAcked;
    isDispatched = offHeapEvent.isDispatched;
    creationTime = offHeapEvent.creationTime;
    bucketId = offHeapEvent.bucketId;
    shadowKey = offHeapEvent.shadowKey;
    isInitialized = offHeapEvent.isInitialized;
    // Read by shouldBeConflated() and written by toDataPre_GEODE_1_14_0_0.
    isConcurrencyConflict = offHeapEvent.isConcurrencyConflict;

    // The copy holds only heap bytes, so there is no off-heap reference to track or release.
    valueObj = null;
    valueObjReleased = false;
    valueIsObject = offHeapEvent.valueIsObject;
    value = offHeapEvent.getSerializedValue();
    transactionId = offHeapEvent.transactionId;
    isLastEventInTransaction = offHeapEvent.isLastEventInTransaction;
  }

  /**
   * Returns this event's action, i.e. the raw integer code stored in the <code>action</code>
   * field (one of the <code>*_ACTION</code> constants of this class).
   *
   * @return this event's action
   */
  public int getAction() {
    return action;
  }

  /**
   * Translates the stored action code and operation detail into an {@link Operation}.
   *
   * <p>
   * Create and update actions are refined by the operation detail (local load, net load, putAll);
   * any other detail yields the plain CREATE / UPDATE operation. A destroy action becomes
   * REMOVEALL_DESTROY only when the detail is removeAll.
   *
   * @return the matching operation, or <code>null</code> if the action is not one of create,
   *         update, destroy, version or invalidate
   */
  @Override
  public Operation getOperation() {
    switch (action) {
      case CREATE_ACTION:
        switch (operationDetail) {
          case OP_DETAIL_LOCAL_LOAD:
            return Operation.LOCAL_LOAD_CREATE;
          case OP_DETAIL_NET_LOAD:
            return Operation.NET_LOAD_CREATE;
          case OP_DETAIL_PUTALL:
            return Operation.PUTALL_CREATE;
          default:
            // OP_DETAIL_NONE and every unrecognized detail
            return Operation.CREATE;
        }
      case UPDATE_ACTION:
        switch (operationDetail) {
          case OP_DETAIL_LOCAL_LOAD:
            return Operation.LOCAL_LOAD_UPDATE;
          case OP_DETAIL_NET_LOAD:
            return Operation.NET_LOAD_UPDATE;
          case OP_DETAIL_PUTALL:
            return Operation.PUTALL_UPDATE;
          default:
            // OP_DETAIL_NONE and every unrecognized detail
            return Operation.UPDATE;
        }
      case DESTROY_ACTION:
        return operationDetail == OP_DETAIL_REMOVEALL ? Operation.REMOVEALL_DESTROY
            : Operation.DESTROY;
      case VERSION_ACTION:
        return Operation.UPDATE_VERSION_STAMP;
      case INVALIDATE_ACTION:
        return Operation.INVALIDATE;
      default:
        return null;
    }
  }

  /**
   * Returns the value that is enqueued instead of the entry event's own value.
   *
   * @return the substitute value; may be <code>null</code>
   */
  public Object getSubstituteValue() {
    return substituteValue;
  }

  /**
   * Returns the listener event stored in the <code>operation</code> field.
   *
   * @return the listener event (e.g. AFTER_CREATE); <code>null</code> if it was never set
   */
  public EnumListenerEvent getEnumListenerEvent() {
    return operation;
  }

  /**
   * Returns the path of the region affected by this event.
   *
   * @return this event's region path
   */
  public String getRegionPath() {
    return regionPath;
  }

  /**
   * Reports whether this event has been initialized, either through {@link #initialize()} or by
   * being read from a stream.
   *
   * @return <code>true</code> if this event is initialized
   */
  public boolean isInitialized() {
    return isInitialized;
  }

  /**
   * Returns the key affected by this event.
   *
   * @return the key, or <code>null</code> while this event has not been initialized
   */
  @Override
  public Object getKey() {
    // Historical behaviour: an un-initialized gateway event answers null rather than throwing.
    if (!isInitialized()) {
      return null;
    }
    return key;
  }

  /**
   * Returns the flag telling whether this event's value is a serialized object
   * (<code>0x01</code>) or just a raw byte[] (<code>0x00</code>).
   *
   * @return the value-is-object flag
   */
  public byte getValueIsObject() {
    return valueIsObject;
  }

  /**
   * Returns the user's callback argument, obtained by peeling every
   * {@link WrappedCallbackArgument} layer off the sender callback argument.
   *
   * @return the innermost, non-wrapped callback argument; may be <code>null</code>
   */
  @Override
  public Object getCallbackArgument() {
    Object unwrapped = getSenderCallbackArgument();
    while (unwrapped instanceof WrappedCallbackArgument) {
      unwrapped = ((WrappedCallbackArgument) unwrapped).getOriginalCallbackArg();
    }
    return unwrapped;
  }

  /**
   * Returns the gateway sender callback argument exactly as stored, without unwrapping it.
   *
   * @return the sender callback argument; may be <code>null</code>
   */
  public GatewaySenderEventCallbackArgument getSenderCallbackArgument() {
    return callbackArgument;
  }

  /**
   * Returns the number of parts this event contributes to a <code>Message</code>.
   *
   * @return this event's number of parts
   * @see Message
   */
  public int getNumberOfParts() {
    return numberOfParts;
  }

  /**
   * Return the currently held form of the object. May return a retained OFF_HEAP_REFERENCE.
   *
   * <p>
   * Lookup order: the serialized <code>value</code> bytes, then <code>substituteValue</code>, then
   * <code>valueObj</code>. When <code>valueObj</code> is a reference-counted {@link StoredObject}
   * it is retained before being returned; if it was already released, or the retain fails,
   * <code>null</code> is returned instead.
   *
   * @return the raw value, or <code>null</code> if none is available
   */
  @Retained
  public Object getRawValue() {
    @Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
    Object result = value;
    if (result == null) {
      result = substituteValue;
      if (result == null) {
        result = valueObj;
        if (result instanceof StoredObject && ((StoredObject) result).hasRefCount()) {
          if (valueObjReleased) {
            result = null;
          } else {
            StoredObject ohref = (StoredObject) result;
            if (!ohref.retain()) {
              // could not obtain a reference to the stored object
              result = null;
            } else if (valueObjReleased) {
              // Re-check after the retain: the flag flipped in the meantime, so give the
              // reference back. NOTE(review): presumably guards against a concurrent release
              // of this event -- confirm.
              ohref.release();
              result = null;
            }
          }
        }
      }
    }
    return result;
  }

  /**
   * Returns this event's value in deserialized form.
   *
   * <p>
   * For a raw byte[] value (<code>valueIsObject == 0x00</code>) the bytes themselves are returned,
   * taken either from <code>value</code> or from the stored object. For an object value the
   * already held object is returned, or <code>value</code> is deserialized (and the result cached
   * in <code>valueObj</code>), or the substitute value is returned.
   *
   * @return this event's deserialized value; may be <code>null</code>
   * @throws IllegalStateException if the value has been released, or if a raw byte[] value is
   *         expected in a stored object but <code>valueObj</code> is not one
   */
  @Override
  public Object getDeserializedValue() {
    if (valueIsObject == 0x00) {
      final Object rawBytes = value;
      if (rawBytes != null) {
        return rawBytes;
      }
      @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
      final Object stored = valueObj;
      if (valueObjReleased) {
        throw new IllegalStateException(
            "Value is no longer available. getDeserializedValue must be called before processEvents returns.");
      }
      if (!(stored instanceof StoredObject)) {
        throw new IllegalStateException(
            "expected valueObj field to be an instance of StoredObject but it was " + stored);
      }
      return ((StoredObject) stored).getValueAsDeserializedHeapObject();
    }

    final Object held = valueObj;
    if (held instanceof StoredObject) {
      @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
      final StoredObject storedObject = (StoredObject) held;
      return storedObject.getValueAsDeserializedHeapObject();
    }
    if (held != null) {
      // already in deserialized form
      return held;
    }
    if (value != null) {
      final Object deserialized = EntryEventImpl.deserialize(value);
      valueObj = deserialized;
      return deserialized;
    }
    if (substituteValue != null) {
      return substituteValue;
    }
    if (valueObjReleased) {
      throw new IllegalStateException(
          "Value is no longer available. getDeserializedValue must be called before processEvents returns.");
    }
    // Nothing is held and nothing was released: the value really is null.
    return null;
  }

  /**
   * Renders the value as a String for logging and debugging purposes.
   *
   * <p>
   * A byte[] is rendered as <code>byte[&lt;length&gt;]</code> rather than by content, because
   * printing the content can exhaust memory. Failures while deserializing or while calling
   * <code>toString()</code> are reported in the returned text instead of being thrown.
   *
   * @param deserialize whether to deserialize the value before rendering it
   * @return the textual form of the value; the empty string if no value is available
   */
  public String getValueAsString(boolean deserialize) {
    Object current = value;
    if (current == null) {
      current = substituteValue;
    }
    if (deserialize) {
      try {
        current = getDeserializedValue();
      } catch (Exception | InternalGemFireError e) {
        return "Could not convert value to string because " + e;
      }
    }

    if (current == null) {
      @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
      final Object held = valueObj;
      if (held instanceof CachedDeserializable) {
        return ((CachedDeserializable) held).getStringForm();
      }
      return "";
    }

    if (current instanceof byte[]) {
      // Using Arrays.toString on the bytes can cause us to run out of memory
      return "byte[" + ((byte[]) current).length + "]";
    }

    try {
      return current.toString();
    } catch (Exception e) {
      return "Could not convert value to string because " + e;
    }
  }

  /**
   * Reports whether {@link #getSerializedValue()} has failed because the value had already been
   * released.
   *
   * @return <code>true</code> once the serialized value was found to be unavailable
   */
  public boolean isSerializedValueNotAvailable() {
    return serializedValueNotAvailable;
  }

  /**
   * If the value owned of this event is just bytes return that byte array; otherwise serialize the
   * value object and return the serialized bytes. Use {@link #getValueIsObject()} to determine if
   * the result is raw or serialized bytes.
   *
   * <p>
   * A substitute value is serialized on every call and is not cached. Bytes obtained from
   * <code>valueObj</code> are cached in <code>value</code> under this object's monitor.
   *
   * <p>
   * While serialization is in progress the {@link #isSerializingValue} thread-local is set to
   * <code>TRUE</code>; it is reset in a <code>finally</code> block so that a failing serialization
   * cannot leave the flag stuck on the calling thread.
   *
   * @return the raw or serialized bytes; may be <code>null</code> if there is no value
   * @throws IllegalStateException if no bytes are available and the value has been released
   */
  @Override
  public byte[] getSerializedValue() {
    byte[] result = value;
    if (result == null) {
      if (substituteValue != null) {
        // The substitute value is set. Serialize it
        isSerializingValue.set(Boolean.TRUE);
        try {
          return EntryEventImpl.serialize(substituteValue);
        } finally {
          isSerializingValue.set(Boolean.FALSE);
        }
      }
      @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
      Object vo = valueObj;
      if (vo instanceof StoredObject) {
        synchronized (this) {
          // re-read under the lock: another thread may have cached the bytes already
          result = value;
          if (result == null) {
            StoredObject so = (StoredObject) vo;
            result = so.getValueAsHeapByteArray();
            value = result;
          }
        }
      } else {
        synchronized (this) {
          // re-read under the lock: another thread may have cached the bytes already
          result = value;
          if (result == null && vo != null && !(vo instanceof Token)) {
            isSerializingValue.set(Boolean.TRUE);
            try {
              result = EntryEventImpl.serialize(vo);
            } finally {
              isSerializingValue.set(Boolean.FALSE);
            }
            value = result;
          } else if (result == null) {
            if (valueObjReleased) {
              serializedValueNotAvailable = true;
              throw new IllegalStateException(
                  "Value is no longer available. getSerializedValue must be called before processEvents returns.");
            }
          }
        }
      }
    }
    return result;
  }

  /**
   * Sets whether this event is a possible duplicate.
   *
   * @param possibleDuplicate the new possible-duplicate flag
   */
  public void setPossibleDuplicate(boolean possibleDuplicate) {
    this.possibleDuplicate = possibleDuplicate;
  }

  /**
   * Returns whether this event is a possible duplicate.
   *
   * @return the possible-duplicate flag
   */
  @Override
  public boolean getPossibleDuplicate() {
    return possibleDuplicate;
  }

  /**
   * Returns the creation timestamp of this event in milliseconds.
   *
   * @return the creation time in ms
   */
  public long getCreationTime() {
    return creationTime;
  }

  /**
   * Returns the fixed id identifying this class for data serialization.
   *
   * @return the <code>GATEWAY_SENDER_EVENT_IMPL</code> constant
   */
  @Override
  public int getDSFID() {
    return GATEWAY_SENDER_EVENT_IMPL;
  }

  /**
   * Writes this event in the current format: everything written by
   * {@link #toDataPre_GEODE_1_15_0_0(DataOutput, SerializationContext)} followed by the operation
   * detail as an int.
   *
   * @param out the stream to write to
   * @param context the serialization context
   * @throws IOException if writing to the stream fails
   */
  @Override
  public void toData(DataOutput out,
      SerializationContext context) throws IOException {
    toDataPre_GEODE_1_15_0_0(out, context);
    out.writeInt(operationDetail);
  }

  /**
   * Writes everything written by
   * {@link #toDataPre_GEODE_1_14_0_0(DataOutput, SerializationContext)} followed by the
   * transaction metadata: a boolean telling whether a transaction id is present and, only if it
   * is, the last-event-in-transaction flag and the transaction id itself.
   *
   * @param out the stream to write to
   * @param context the serialization context
   * @throws IOException if writing to the stream fails
   */
  public void toDataPre_GEODE_1_15_0_0(DataOutput out,
      SerializationContext context) throws IOException {
    toDataPre_GEODE_1_14_0_0(out, context);
    if (transactionId == null) {
      // no transaction: only the marker is written
      DataSerializer.writeBoolean(false, out);
      return;
    }
    DataSerializer.writeBoolean(true, out);
    DataSerializer.writeBoolean(isLastEventInTransaction, out);
    context.getSerializer().writeObject(transactionId, out);
  }

  /**
   * Writes everything written by
   * {@link #toDataPre_GEODE_1_9_0_0(DataOutput, SerializationContext)} followed by the
   * concurrency-conflict flag.
   *
   * @param out the stream to write to
   * @param context the serialization context
   * @throws IOException if writing to the stream fails
   */
  public void toDataPre_GEODE_1_14_0_0(DataOutput out,
      SerializationContext context) throws IOException {
    toDataPre_GEODE_1_9_0_0(out, context);
    DataSerializer.writeBoolean(isConcurrencyConflict, out);
  }

  /**
   * Writes the base part of the wire format. The fields are written in exactly the order in which
   * {@link #fromDataPre_GEODE_1_9_0_0(DataInput, DeserializationContext)} reads them: version,
   * action, number of parts, event id, region path, value-is-object flag, key, serialized value,
   * callback argument, possible-duplicate flag, creation time, bucket id, shadow key and version
   * timestamp.
   *
   * @param out the stream to write to
   * @param context the serialization context
   * @throws IOException if writing to the stream fails
   */
  public void toDataPre_GEODE_1_9_0_0(DataOutput out, SerializationContext context)
      throws IOException {
    // Make sure we are initialized before we serialize.
    initialize();
    out.writeShort(VERSION);
    out.writeInt(action);
    out.writeInt(numberOfParts);
    // the id is written as an object, not as a UTF string
    context.getSerializer().writeObject(id, out);
    DataSerializer.writeString(regionPath, out);
    out.writeByte(valueIsObject);
    serializeKey(out, context);
    DataSerializer.writeByteArray(getSerializedValue(), out);
    context.getSerializer().writeObject(callbackArgument, out);
    out.writeBoolean(possibleDuplicate);
    out.writeLong(creationTime);
    out.writeInt(bucketId);
    // NOTE(review): shadowKey is a boxed Long; a null here would fail on unboxing.
    out.writeLong(shadowKey);
    out.writeLong(getVersionTimeStamp());
  }

  /**
   * Writes the key as an object using the context's serializer. Protected, so a subclass can
   * supply a different key encoding.
   *
   * @param out the stream to write to
   * @param context the serialization context
   * @throws IOException if writing to the stream fails
   */
  protected void serializeKey(DataOutput out,
      SerializationContext context) throws IOException {
    context.getSerializer().writeObject(key, out);
  }

  /**
   * Reads this event in the current format: everything read by
   * {@link #fromDataPre_GEODE_1_15_0_0(DataInput, DeserializationContext)} and, when the version
   * found in the stream is at least GEODE_1_15_0, the operation detail.
   *
   * @param in the stream to read from
   * @param context the deserialization context
   * @throws IOException if reading from the stream fails
   * @throws ClassNotFoundException if a class of a serialized object cannot be found
   */
  @Override
  public void fromData(DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    fromDataPre_GEODE_1_15_0_0(in, context);
    // older writers did not append the operation detail
    if (version >= KnownVersion.GEODE_1_15_0.ordinal()) {
      operationDetail = in.readInt();
    }
  }

  /**
   * Reads everything read by
   * {@link #fromDataPre_GEODE_1_14_0_0(DataInput, DeserializationContext)} and, when the version
   * found in the stream is at least GEODE_1_14_0, the transaction metadata: a presence marker and,
   * only if it is set, the last-event-in-transaction flag and the transaction id.
   *
   * @param in the stream to read from
   * @param context the deserialization context
   * @throws IOException if reading from the stream fails
   * @throws ClassNotFoundException if a class of a serialized object cannot be found
   */
  public void fromDataPre_GEODE_1_15_0_0(DataInput in, DeserializationContext context)
      throws IOException, ClassNotFoundException {
    fromDataPre_GEODE_1_14_0_0(in, context);
    if (version < KnownVersion.GEODE_1_14_0.ordinal()) {
      // written by a version that carries no transaction metadata
      return;
    }
    if (!DataSerializer.readBoolean(in)) {
      // marker says: no transaction
      return;
    }
    isLastEventInTransaction = DataSerializer.readBoolean(in);
    transactionId = context.getDeserializer().readObject(in);
  }

  /**
   * Reads everything read by
   * {@link #fromDataPre_GEODE_1_9_0_0(DataInput, DeserializationContext)} and, when the version
   * found in the stream is at least GEODE_1_9_0, the concurrency-conflict flag.
   *
   * @param in the stream to read from
   * @param context the deserialization context
   * @throws IOException if reading from the stream fails
   * @throws ClassNotFoundException if a class of a serialized object cannot be found
   */
  public void fromDataPre_GEODE_1_14_0_0(DataInput in, DeserializationContext context)
      throws IOException, ClassNotFoundException {
    fromDataPre_GEODE_1_9_0_0(in, context);
    // older writers did not append the concurrency-conflict flag
    if (version >= KnownVersion.GEODE_1_9_0.ordinal()) {
      isConcurrencyConflict = DataSerializer.readBoolean(in);
    }
  }

  /**
   * Reads the base part of the wire format, in the order in which
   * {@link #toDataPre_GEODE_1_9_0_0(DataOutput, SerializationContext)} writes it, and marks this
   * event as initialized. The version read here is what the other <code>fromData</code> methods
   * use to decide which trailing fields are present.
   *
   * @param in the stream to read from
   * @param context the deserialization context
   * @throws IOException if reading from the stream fails
   * @throws ClassNotFoundException if a class of a serialized object cannot be found
   */
  public void fromDataPre_GEODE_1_9_0_0(DataInput in, DeserializationContext context)
      throws IOException, ClassNotFoundException {
    version = in.readShort();
    // an event read from a stream counts as initialized
    isInitialized = true;
    action = in.readInt();
    numberOfParts = in.readInt();
    id = context.getDeserializer().readObject(in);
    regionPath = DataSerializer.readString(in);
    valueIsObject = in.readByte();
    deserializeKey(in, context);
    value = DataSerializer.readByteArray(in);
    callbackArgument = context.getDeserializer().readObject(in);
    possibleDuplicate = in.readBoolean();
    creationTime = in.readLong();
    bucketId = in.readInt();
    shadowKey = in.readLong();
    versionTimeStamp = in.readLong();
  }

  /**
   * Reads the key as an object using the context's deserializer; the counterpart of
   * {@link #serializeKey(DataOutput, SerializationContext)}.
   *
   * @param in the stream to read from
   * @param context the deserialization context
   * @throws IOException if reading from the stream fails
   * @throws ClassNotFoundException if the class of the key cannot be found
   */
  protected void deserializeKey(DataInput in,
      DeserializationContext context) throws IOException, ClassNotFoundException {
    key = context.getDeserializer().readObject(in);
  }

  /**
   * Returns a full textual description of this event, including the deserialized value as
   * rendered by {@link #getValueAsString(boolean)}.
   *
   * @return the description of this event
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("GatewaySenderEventImpl[");
    text.append("id=").append(id);
    text.append(";action=").append(action);
    text.append(";operation=").append(getOperation());
    text.append(";region=").append(regionPath);
    text.append(";key=").append(key);
    text.append(";value=").append(getValueAsString(true));
    text.append(";valueIsObject=").append(valueIsObject);
    text.append(";numberOfParts=").append(numberOfParts);
    text.append(";callbackArgument=").append(callbackArgument);
    text.append(";possibleDuplicate=").append(possibleDuplicate);
    text.append(";creationTime=").append(creationTime);
    text.append(";shadowKey=").append(shadowKey);
    text.append(";timeStamp=").append(versionTimeStamp);
    text.append(";acked=").append(isAcked);
    text.append(";dispatched=").append(isDispatched);
    text.append(";bucketId=").append(bucketId);
    text.append(";isConcurrencyConflict=").append(isConcurrencyConflict);
    text.append(";transactionId=").append(transactionId);
    text.append(";isLastEventInTransaction=").append(isLastEventInTransaction);
    text.append("]");
    return text.toString();
  }

  /**
   * Returns a short textual description of this event: id, operation, region, key, shadow key and
   * bucket id. Unlike {@link #toString()} it does not touch the value.
   *
   * @return the short description of this event
   */
  public String toSmallString() {
    final StringBuilder text = new StringBuilder("GatewaySenderEventImpl[");
    text.append("id=").append(id);
    text.append(";operation=").append(getOperation());
    text.append(";region=").append(regionPath);
    text.append(";key=").append(key);
    text.append(";shadowKey=").append(shadowKey);
    text.append(";bucketId=").append(bucketId);
    text.append("]");
    return text.toString();
  }

  /**
   * Reports whether the calling thread is currently serializing the value of an event, as
   * recorded in the {@link #isSerializingValue} thread-local.
   *
   * @return <code>true</code> if the calling thread is in the process of serializing a value
   */
  public static boolean isSerializingValue() {
    return isSerializingValue.get();
  }

  // public static boolean isDeserializingValue() {
  // return ((Boolean)isDeserializingValue.get()).booleanValue();
  // }

  // / Conflatable interface methods ///

  /**
   * Determines whether or not to conflate this event. Only updates are candidates: creates,
   * destroys, invalidates and other operations are never conflated. An update is conflated unless
   * <code>isConcurrencyConflict()</code> reports <code>true</code> for it.
   *
   * @return Whether to conflate this event
   */
  @Override
  public boolean shouldBeConflated() {
    if (!isUpdate()) {
      // not an update, hence never conflatable
      return false;
    }
    return !isConcurrencyConflict();
  }

  /**
   * Returns the region used for conflation, which is this event's region path.
   *
   * @return this event's region path
   */
  @Override
  public String getRegionToConflate() {
    return regionPath;
  }

  /**
   * Returns the key used for conflation. Unlike {@link #getKey()} this returns the stored key
   * regardless of whether this event has been initialized.
   *
   * @return this event's key
   */
  @Override
  public Object getKeyToConflate() {
    return key;
  }

  /**
   * Returns the String form of the value (see {@link #getValueAsString(boolean)}), not the value
   * itself.
   *
   * @return the deserialized value rendered as a String
   */
  @Override
  public Object getValueToConflate() {
    // Since all the uses of this are for logging
    // changing it to return the string form of the value
    // instead of the actual value.
    return getValueAsString(true);
  }

  /**
   * Not supported by this class.
   *
   * @param value ignored
   * @throws UnsupportedOperationException always
   */
  @Override
  public void setLatestValue(Object value) {
    // Currently this method is never used.
    // If someone does want to use it in the future
    // then the implementation needs to be updated
    // to correctly update value, valueObj, and valueIsObject
    throw new UnsupportedOperationException();
  }

  // / End Conflatable interface methods ///

  /**
   * Returns whether this <code>GatewayEvent</code> represents an update.
   *
   * <p>
   * This event can be in one of three states:
   * <ul>
   * <li>in memory primary (initialized): both the operation and the action are set</li>
   * <li>in memory secondary (not initialized): only the operation is set</li>
   * <li>evicted to disk and read back in (initialized): only the action is set</li>
   * </ul>
   * The operation therefore decides whenever it is present; the action is the fallback.
   *
   * @return whether this <code>GatewayEvent</code> represents an update
   */
  protected boolean isUpdate() {
    if (operation != null) {
      return operation == EnumListenerEvent.AFTER_UPDATE;
    }
    return action == UPDATE_ACTION;
  }

  /**
   * Returns whether this <code>GatewayEvent</code> represents a create. The operation decides
   * whenever it is set; otherwise the action code is consulted.
   *
   * @return whether this <code>GatewayEvent</code> represents a create
   */
  protected boolean isCreate() {
    if (operation != null) {
      return operation == EnumListenerEvent.AFTER_CREATE;
    }
    return action == CREATE_ACTION;
  }

  /**
   * Returns whether this <code>GatewayEvent</code> represents a destroy. The operation decides
   * whenever it is set; otherwise the action code is consulted.
   *
   * @return whether this <code>GatewayEvent</code> represents a destroy
   */
  protected boolean isDestroy() {
    if (operation != null) {
      return operation == EnumListenerEvent.AFTER_DESTROY;
    }
    return action == DESTROY_ACTION;
  }

  /**
   * Takes over the event id of the given entry event as the unique identifier of this event. The
   * receiving side uses this id to keep track of which events have been processed, so that
   * duplicates can be dropped.
   *
   * @param event the entry event supplying the id
   * @throws IllegalStateException if the entry event has no event id
   */
  private void initializeId(EntryEventImpl event) {
    final EventID eventId = event.getEventId();
    id = eventId;
    if (eventId == null) {
      throw new IllegalStateException(
          "No event id is available for this gateway event.");
    }
  }

  /**
   * Marks this instance as initialized. Calling it on an already initialized instance has no
   * effect.
   */
  public void initialize() {
    if (!isInitialized()) {
      isInitialized = true;
    }
  }


  /**
   * Initializes the value fields (<code>value</code>, <code>valueObj</code>,
   * <code>valueIsObject</code>) from the entry event or, if one is set, from the substitute
   * value.
   *
   * <p>
   * Without a substitute value the sources are tried in this order: the off-heap new value (kept
   * as a stored object in <code>valueObj</code>), the cached serialized new value, and finally the
   * raw new value, which is serialized immediately if it is a plain object. With a substitute
   * value, a byte[] is used directly as <code>value</code>, <code>TOKEN_NULL</code> resets both
   * value and substitute value to null, and any other object is left in
   * <code>substituteValue</code>.
   *
   * <p>
   * This function needs a relook because the serialization of the value looks unnecessary.
   *
   * @param event the entry event supplying the value
   */
  @Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
  protected void initializeValue(EntryEventImpl event) {
    // Set the value to be a byte[] representation of either the value or
    // substituteValue (if set).
    if (substituteValue == null) {
      // Assume a serialized object until one of the branches below finds raw bytes.
      valueIsObject = 0x01;
      /*
       * so ends up being stored in this.valueObj
       */
      @Retained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
      StoredObject so;
      {
        // NOTE(review): the owner is registered only around this call and is not reset if
        // getOffHeapNewValue() throws -- confirm whether that matters.
        ReferenceCountHelper.setReferenceCountOwner(this);
        so = event.getOffHeapNewValue();
        ReferenceCountHelper.setReferenceCountOwner(null);
      }

      if (so != null) {
        // Since GatewaySenderEventImpl instances can live for a long time in the gateway region
        // queue we do not want the StoredObject to be one that keeps the heap form cached.
        so = so.getStoredObjectWithoutHeapForm();
        valueObj = so;
        if (!so.isSerialized()) {
          valueIsObject = 0x00;
        }
      } else if (event.getCachedSerializedNewValue() != null) {
        // We want this to have lower precedence than StoredObject so that the gateway
        // can share a reference to the off-heap value.
        value = event.getCachedSerializedNewValue();
      } else {
        final Object newValue = event.getRawNewValue();
        // Cannot be a StoredObject: getOffHeapNewValue() was already called and returned null.
        assert !(newValue instanceof StoredObject);
        if (newValue instanceof CachedDeserializable) {
          value = ((CachedDeserializable) newValue).getSerializedValue();
        } else if (newValue instanceof byte[]) {
          // The value is byte[]. Set valueIsObject flag to 0x00 (not an object)
          value = (byte[]) newValue;
          valueIsObject = 0x00;
        } else {
          // The value is an object. Park it in valueObj so that getSerializedValue() can
          // serialize it into value, then drop the object form again.
          valueObj = newValue;
          // to prevent bug 48281 we need to serialize it now
          getSerializedValue();
          valueObj = null;
        }
      }
    } else {
      // The substituteValue is set. Use it.
      if (substituteValue instanceof byte[]) {
        // The substituteValue is byte[]. Set valueIsObject flag to 0x00 (not an object)
        value = (byte[]) substituteValue;
        valueIsObject = 0x00;
      } else if (substituteValue == TOKEN_NULL) {
        // The substituteValue represents null. Set the value and substituteValue to null.
        value = null;
        substituteValue = null;
        valueIsObject = 0x01;
      } else {
        // The substituteValue is an object. Leave it as is.
        valueIsObject = 0x01;
      }
    }
  }

  /**
   * Derives this event's action and number of parts from the listener operation. When the
   * operation matches none of the handled kinds, both fields are left unchanged.
   *
   * @param operation The operation from which to initialize this event's action and number of parts
   * @param event the entry event; only consulted (for its generate-callbacks flag) when the
   *        operation is {@code AFTER_UPDATE_WITH_GENERATE_CALLBACKS}
   */
  protected void initializeAction(EnumListenerEvent operation, EntryEventImpl event) {
    // Parts of an event:
    // part 1 = action
    // part 2 = posDup flag
    // part 3 = regionName
    // part 4 = eventId
    // part 5 = key
    // part 6 = value (create and update only)
    // part 7 = whether callbackArgument is non-null
    // part 8 = callbackArgument (if non-null)
    // part 9 = versionTimeStamp;
    final boolean hasCallbackArgument = callbackArgument != null;
    // Creates and updates carry a value.
    final int partsWithValue = hasCallbackArgument ? 9 : 8;
    // Since there is no value, there is one less part.
    final int partsWithoutValue = hasCallbackArgument ? 8 : 7;

    if (operation == EnumListenerEvent.AFTER_CREATE) {
      action = CREATE_ACTION;
      numberOfParts = partsWithValue;
      return;
    }
    if (operation == EnumListenerEvent.AFTER_UPDATE) {
      action = UPDATE_ACTION;
      numberOfParts = partsWithValue;
      return;
    }
    if (operation == EnumListenerEvent.AFTER_DESTROY) {
      action = DESTROY_ACTION;
      numberOfParts = partsWithoutValue;
      return;
    }
    if (operation == EnumListenerEvent.TIMESTAMP_UPDATE) {
      // A timestamp update is sent as a version action
      action = VERSION_ACTION;
      numberOfParts = partsWithoutValue;
      return;
    }
    if (operation == EnumListenerEvent.AFTER_INVALIDATE) {
      action = INVALIDATE_ACTION;
      numberOfParts = partsWithoutValue;
      return;
    }
    if (operation == EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS) {
      action = event.isGenerateCallbacks() ? UPDATE_ACTION : UPDATE_ACTION_NO_GENERATE_CALLBACKS;
      numberOfParts = partsWithValue;
    }
  }

  /**
   * Records which kind of operation produced this event. The checks run in the order local load,
   * net load, putAll, removeAll; the first one that matches wins and anything else is recorded as
   * {@code OP_DETAIL_NONE}.
   *
   * @param operation the cache operation to classify
   */
  private void initializeOperationDetail(Operation operation) {
    if (operation.isLocalLoad()) {
      operationDetail = OP_DETAIL_LOCAL_LOAD;
      return;
    }
    if (operation.isNetLoad()) {
      operationDetail = OP_DETAIL_NET_LOAD;
      return;
    }
    if (operation.isPutAll()) {
      operationDetail = OP_DETAIL_PUTALL;
      return;
    }
    if (operation.isRemoveAll()) {
      operationDetail = OP_DETAIL_REMOVEALL;
      return;
    }
    operationDetail = OP_DETAIL_NONE;
  }

  /**
   * @return the event id held by this event
   */
  @Override
  public EventID getEventId() {
    return this.id;
  }

  /**
   * Builds a new {@link EventSequenceID} from the membership id, thread id and sequence id of this
   * event's id.
   *
   * @return a freshly created EventSequenceID for this event
   */
  @Override
  public EventSequenceID getEventSequenceID() {
    final EventID source = id;
    return new EventSequenceID(
        source.getMembershipID(),
        source.getThreadID(),
        source.getSequenceID());
  }

  /**
   * @return the version timestamp held by this event
   */
  public long getVersionTimeStamp() {
    return this.versionTimeStamp;
  }

  /**
   * Estimates the size of this event in bytes. This is used for overflow to disk.
   *
   * <p>
   * Included in the estimate:
   * <ul>
   * <li>the value (byte[])</li>
   * <li>the original callback argument (Object)</li>
   * <li>primitive and object instance variable references</li>
   * </ul>
   *
   * <p>
   * Not included in the estimate:
   * <ul>
   * <li>the key because it is a reference</li>
   * <li>the region and regionName because they are references</li>
   * <li>the operation because it is a reference</li>
   * <li>the entry event because it is nulled prior to calling this method</li>
   * <li>the transactionId because it is a reference</li>
   * </ul>
   *
   * <p>
   * The sizes of the id (an instance of EventId) and of the callbackArgument (an instance of
   * GatewayEventCallbackArgument) were estimated using a NullDataOutputStream and are hardcoded.
   *
   * @return the estimated size in bytes
   */
  @Override
  public int getSizeInBytes() {
    // Object references, 4 bytes each:
    // _id, _region, _regionName, _key, _callbackArgument, _operation, _entryEvent,
    // _transactionId
    final int referenceBytes = 8 * 4;

    // Primitive fields:
    // int _action = 4 bytes
    // int _numberOfParts = 4 bytes
    // byte _valueIsObject = 1 byte
    // boolean _possibleDuplicate = 1 byte
    // int bucketId = 4 bytes
    // long shadowKey = 8 bytes
    // long creationTime = 8 bytes
    // boolean _hasTransaction = 1 byte
    // boolean _isLastEventInTransaction = 1 byte
    final int primitiveBytes = 4 + 4 + 1 + 1 + 4 + 8 + 8 + 1 + 1;

    // The id (an instance of EventId); the constant was estimated using a
    // NullDataOutputStream
    final int eventIdBytes = Sizeable.PER_OBJECT_OVERHEAD + 56;

    // The GatewayEventCallbackArgument wrapper; the constant was estimated using a
    // NullDataOutputStream
    final int callbackWrapperBytes = Sizeable.PER_OBJECT_OVERHEAD + 194;

    // the version timestamp
    final int versionTimeStampBytes = 8;

    // Start with the overhead of this event object itself
    int total = Sizeable.PER_OBJECT_OVERHEAD;
    total += referenceBytes;
    total += primitiveBytes;
    total += eventIdBytes;
    // The value (a byte[])
    total += getSerializedValueSize();
    total += callbackWrapperBytes;
    // The sizeOf call gets the size of the original callback argument wrapped by the
    // GatewayEventCallbackArgument.
    total += Sizeable.PER_OBJECT_OVERHEAD + sizeOf(getCallbackArgument());
    total += versionTimeStampBytes;
    return total;
  }

  /**
   * Estimates the size in bytes of the given object.
   *
   * @param obj the object to size; may be null
   * @return 0 for null, the {@code ObjectSizer.DEFAULT} size for a String, a fixed estimate of 4
   *         for an Integer and 8 for a Long, and otherwise the calculated memory size minus the
   *         per-object overhead
   */
  private int sizeOf(Object obj) {
    if (obj == null) {
      return 0;
    }
    if (obj instanceof String) {
      return ObjectSizer.DEFAULT.sizeof(obj);
    }
    if (obj instanceof Integer) {
      return 4; // estimate
    }
    if (obj instanceof Long) {
      return 8; // estimate
    }
    return CachedDeserializableFactory.calcMemSize(obj) - Sizeable.PER_OBJECT_OVERHEAD;
  }


  // Asif: If the GatewayEvent is serialized to a node where the region itself is not present
  // or has not been created yet, and the gateway event queue is persistent, then even if we
  // try to set the region in fromData we may still get null. Although the product does not use
  // this method anywhere, we are not comfortable changing the interface, so the implementation
  // has been modified a bit instead.

  /**
   * Returns the region of this event. When the region reference is null, the region is looked up
   * by its path in the cache returned by {@code CacheFactory.getAnyInstance()}.
   *
   * @return the region reference if set, otherwise the result of the lookup by region path
   */
  @Override
  public Region<?, ?> getRegion() {
    // The region will be null mostly for the other node where the gateway event
    // is serialized
    if (region != null) {
      return region;
    }
    return CacheFactory.getAnyInstance().getRegion(regionPath);
  }

  /**
   * @return the bucket id held by this event
   */
  public int getBucketId() {
    return this.bucketId;
  }

  /**
   * @return the concurrency-conflict flag of this event
   */
  public boolean isConcurrencyConflict() {
    return this.isConcurrencyConflict;
  }

  /**
   * Stores the given tail key as this event's shadow key.
   *
   * @param tailKey the tailKey to set
   */
  public void setShadowKey(Long tailKey) {
    this.shadowKey = tailKey;
  }

  /**
   * Returns this event's shadow key.
   *
   * @return the tailKey
   */
  public Long getShadowKey() {
    return this.shadowKey;
  }

  /**
   * @return the last-event-in-transaction flag of this event
   */
  public boolean isLastEventInTransaction() {
    return this.isLastEventInTransaction;
  }

  /**
   * @return the transaction id held by this event
   */
  public TransactionId getTransactionId() {
    return this.transactionId;
  }

  /**
   * Compares this event with another object. Two events are equal when their shadow key, event
   * id, bucket id, action, region path, key and serialized value bytes are all equal.
   *
   * <p>
   * The reference fields are compared null-safely, matching the null-tolerant treatment the same
   * fields receive in {@link #hashCode()}; a null field equals only another null field.
   *
   * @param obj the object to compare with
   * @return true if {@code obj} is a GatewaySenderEventImpl with equal identifying fields
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }

    if (!(obj instanceof GatewaySenderEventImpl)) {
      return false;
    }

    GatewaySenderEventImpl that = (GatewaySenderEventImpl) obj;

    // Objects.equals avoids a NullPointerException when a field is unset, for example a shadow
    // key that was set to null through setShadowKey(Long).
    return java.util.Objects.equals(shadowKey, that.shadowKey)
        && java.util.Objects.equals(id, that.id)
        && bucketId == that.bucketId
        && action == that.action
        && java.util.Objects.equals(regionPath, that.regionPath)
        && java.util.Objects.equals(key, that.key)
        && Arrays.equals(value, that.value);
  }

  /**
   * Computes a hash code from the shadow key, event id, bucket id, action, region path, key and
   * serialized value bytes, folding each component in with a multiplier of 37 starting from 17.
   *
   * @return the hash code of this event
   */
  public int hashCode() {
    // Arrays.hashCode returns 0 for a null array, so no explicit null check is needed.
    final int[] components = {
        ObjectUtils.hashCode(shadowKey),
        ObjectUtils.hashCode(id),
        bucketId,
        action,
        ObjectUtils.hashCode(regionPath),
        ObjectUtils.hashCode(key),
        Arrays.hashCode(value)};

    int result = 17;
    for (final int component : components) {
      result = 37 * result + component;
    }
    return result;
  }

  /**
   * @return a new array listing GEODE_1_9_0, GEODE_1_14_0 and GEODE_1_15_0, in that order
   */
  @Override
  public KnownVersion[] getSerializationVersions() {
    final KnownVersion[] versions = {
        KnownVersion.GEODE_1_9_0,
        KnownVersion.GEODE_1_14_0,
        KnownVersion.GEODE_1_15_0};
    return versions;
  }

  /**
   * Returns the size of this event's value, computing and caching it on first use. A cached size
   * other than {@code DEFAULT_SERIALIZED_VALUE_SIZE} is returned as is.
   *
   * @return the size in bytes of the stored object, substitute value or serialized value
   */
  public int getSerializedValueSize() {
    final int cachedSize = serializedValueSize;
    if (cachedSize != DEFAULT_SERIALIZED_VALUE_SIZE) {
      return cachedSize;
    }

    @Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
    Object vo = valueObj;

    final int computedSize;
    if (vo instanceof StoredObject) {
      // A stored object reports its own size
      computedSize = ((StoredObject) vo).getSizeInBytes();
    } else if (substituteValue != null) {
      computedSize = sizeOf(substituteValue);
    } else {
      computedSize = CachedDeserializableFactory.calcMemSize(getSerializedValue());
    }

    // Remember the result so later calls return it directly
    serializedValueSize = computedSize;
    return computedSize;
  }

  /**
   * Releases the value object held by this event. The value reference is cleared and the event is
   * flagged as released only when {@code OffHeapHelper.releaseAndTrackOwner} reports true.
   */
  @Override
  @Released(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
  public synchronized void release() {
    @Released(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
    Object vo = valueObj;
    final boolean released = OffHeapHelper.releaseAndTrackOwner(vo, this);
    if (!released) {
      return;
    }
    valueObj = null;
    valueObjReleased = true;
  }

  /**
   * Releases the given object if it is a GatewaySenderEventImpl; any other object, including null,
   * is ignored.
   *
   * @param o the object to release
   */
  public static void release(
      @Released(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE) Object o) {
    if (!(o instanceof GatewaySenderEventImpl)) {
      return;
    }
    final GatewaySenderEventImpl event = (GatewaySenderEventImpl) o;
    event.release();
  }

  /**
   * Returns an event whose value is safe to use from the java heap. A copy is made only when the
   * value object is a StoredObject that uses reference counts; otherwise "this" is returned.
   *
   * @return this event when its value is already on the heap or does not use reference counts, a
   *         copy when the value uses reference counts, or null when the original off-heap value
   *         is no longer available because it was released
   */
  public GatewaySenderEventImpl makeHeapCopyIfOffHeap() {
    if (value != null || substituteValue != null) {
      // we have the value stored on the heap so return this
      return this;
    }

    final Object current = valueObj;
    if (current == null) {
      // a released value object means that the original off heap value was freed
      return valueObjReleased ? null : this;
    }

    final boolean usesRefCounts =
        current instanceof StoredObject && ((StoredObject) current).hasRefCount();
    if (!usesRefCounts) {
      // the valueObj does not use refCounts so just return this.
      return this;
    }

    try {
      return makeCopy();
    } catch (IllegalStateException ex) {
      // this means that the original off heap value was freed
      return null;
    }
  }

  /**
   * Creates a copy of this event through the copy constructor.
   *
   * @return a new event constructed from this one
   */
  protected GatewaySenderEventImpl makeCopy() {
    final GatewaySenderEventImpl copy = new GatewaySenderEventImpl(this);
    return copy;
  }

  /**
   * Sets the acked flag of this event.
   *
   * @param acked the new value of the flag
   */
  public void setAcked(boolean acked) {
    this.isAcked = acked;
  }

  /**
   * Describes whether an event carries transaction metadata and, if it does, whether the event is
   * the last one of its transaction.
   */
  public enum TransactionMetadataDisposition {
    /** The event carries no transaction metadata. */
    EXCLUDE,

    /** The event carries transaction metadata. */
    INCLUDE,

    /** The event carries transaction metadata and is the last event in the transaction. */
    INCLUDE_LAST_EVENT
  }
}
