/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.logging.log4j.Logger;

import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.util.Breadcrumbs;

/**
 * This class uniquely identifies any Region Operation like create, update destroy etc. It is
 * composed of three parts , namely :- 1) DistributedMembershipID 2) ThreadID 3) SequenceID This
 * helps in sequencing the events belonging to a unique producer.
 */
public class EventID implements DataSerializableFixedID, Serializable, Externalizable {
  private static final Logger logger = LogService.getLogger();

  /** turns on very verbose logging ove membership id bytes */
  private static final boolean LOG_ID_BYTES =
      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "log-event-member-id-bytes");

  /**
   * Uniquely identifies the distributed member VM in which the Event is produced
   */
  private byte[] membershipID;

  /**
   * Unqiuely identifies the thread producing the event
   */
  private long threadID;

  /**
   * Uniquely identifies individual event produced by a given thread
   */
  private long sequenceID;

  private int bucketID;

  private byte breadcrumbCounter = 0x0;

  public void incBreadcrumbCounter() {
    this.breadcrumbCounter++;
  }

  /** The versions in which this message was modified */
  @Immutable
  private static final Version[] dsfidVersions = new Version[] {Version.GFE_80};


  private static final ThreadLocal threadIDLocal = new ThreadLocal() {
    @Override
    protected Object initialValue() {
      return new ThreadAndSequenceIDWrapper();
    }
  };

  private transient int hashCode = 0;

  /**
   * the distributed system associated with the static client_side_event_identity
   */
  @MakeNotStatic
  private static volatile DistributedSystem system = null;

  /**
   * the membership id of the distributed system in this client (if running in a client) that is
   * reflected in client_side_event_identity
   */
  @MakeNotStatic
  private static DistributedMember systemMemberId;

  /**
   * this form of client ID is used in event identifiers to reduce the size of the ID
   */
  @MakeNotStatic
  private static volatile byte[] client_side_event_identity = null;

  /**
   * An array containing the helper class objects which are used to create optimized byte array for
   * an eventID , which can be sent on the network
   */
  @Immutable
  static final List<AbstractEventIDByteArrayFiller> fillerArray =
      Collections.unmodifiableList(Arrays.asList(
          new ByteEventIDByteArrayFiller(), new ShortEventIDByteArrayFiller(),
          new IntegerEventIDByteArrayFiller(), new LongEventIDByteArrayFiller()));

  /**
   * Constructor used for creating EventID object at the actual source of creation of Event. The
   * thread identification & sequence ID of the event is done using ThreadLocal object. The
   * membershipId must be created with EventID.getMembershipId() and not the methods or byte array
   * stored in a ClientProxyMembershipID as those are heavyweight identifiers and they will cause
   * serialization and comparison problems when used in EventIDs
   */
  private EventID(final byte[] membershipId) {
    // Assert.assertTrue(membershipId.length <= Short.MAX_VALUE);
    this.membershipID = membershipId;
    // TODO:Asif : If the DS is closed & restarted can we continue with the
    // existing Thread ID & Sequenec ID. Should not be an issue.
    // But we should not cache membershipID as for the same thread it can
    // differ.Hence it should be passed as parameter in the constructor
    ThreadAndSequenceIDWrapper wrapper = (ThreadAndSequenceIDWrapper) threadIDLocal.get();
    this.threadID = wrapper.threadID;
    this.sequenceID = wrapper.getAndIncrementSequenceID();
    this.bucketID = -1;
  }

  /**
   * constructor for creating an event ID originating in the local cache
   *
   * @param sys the local distributed system
   */
  public EventID(DistributedSystem sys) {
    this(initializeAndGetDSEventIdentity(sys));
  }

  public static byte[] getMembershipId(DistributedSystem sys) {
    return EventID.initializeAndGetDSEventIdentity(sys);
  }

  public static void unsetDS() {
    system = null;
  }

  /**
   * Convert a ClientProxyMembershipID distribted member ID array into one usable by EventIDs
   *
   * @param client the client's ID
   * @return a byte array that may be used in EventID formation
   */
  public static byte[] getMembershipId(ClientProxyMembershipID client) {
    try {
      HeapDataOutputStream hdos = new HeapDataOutputStream(256, client.getClientVersion());
      ((InternalDistributedMember) client.getDistributedMember()).writeEssentialData(hdos);
      return hdos.toByteArray();
    } catch (IOException ioe) {
      throw new InternalGemFireException(
          "Unable to serialize identity",
          ioe);
    }
  }

  /**
   * Returns the thread id used by the calling thread for its event ids
   *
   * @since GemFire 5.7
   */
  public static long getThreadId() {
    ThreadAndSequenceIDWrapper wrapper = (ThreadAndSequenceIDWrapper) threadIDLocal.get();
    return wrapper.threadID;
  }

  /**
   * Returns the next reservable sequence id used by the calling thread for its event ids. Note that
   * the returned id is not yet reserved by the calling thread.
   *
   * @since GemFire 5.7
   */
  public static long getSequenceId() {
    ThreadAndSequenceIDWrapper wrapper = (ThreadAndSequenceIDWrapper) threadIDLocal.get();
    return wrapper.sequenceID;
  }

  /**
   * Reserves and returns a sequence id for the calling thread to be used for an event id.
   *
   * @since GemFire 5.7
   */
  public static long reserveSequenceId() {
    ThreadAndSequenceIDWrapper wrapper = (ThreadAndSequenceIDWrapper) threadIDLocal.get();
    return wrapper.getAndIncrementSequenceID();
  }

  public void reserveSequenceId(int count) {
    ThreadAndSequenceIDWrapper wrapper = (ThreadAndSequenceIDWrapper) threadIDLocal.get();
    wrapper.reserveSequenceID(count);
  }

  /**
   * Constructor used for creating an EventID with specified sequenceID for putAll
   */
  public EventID(EventID eventId, int offset) {
    assert (eventId != null);
    this.membershipID = eventId.getMembershipID();
    this.threadID = eventId.getThreadID();
    this.sequenceID = eventId.getSequenceID() + offset;
    this.bucketID = -1;
  }

  /**
   * Constructor which explicitly sets all the fields and by-passes auto-generation of thread and
   * seq. ids. This is used by the QRM thread and by ServerConnection threads that have cached their
   * client ID byte arrays. It is also used by many unit test methods to create fake event
   * identifiers.
   *
   * @param memId - membership id for this entry - must be created by EventID.getMembershipID()
   * @param threadId - thread id for this entry
   * @param seqId - sequence id for this entry
   */
  public EventID(byte[] memId, long threadId, long seqId) {
    this.membershipID = memId;
    this.threadID = threadId;
    this.sequenceID = seqId;
    this.bucketID = -1;
  }

  public EventID(byte[] memId, long threadId, long seqId, int bucketId) {
    this.membershipID = memId;
    this.threadID = threadId;
    this.sequenceID = seqId;
    this.bucketID = bucketId;
  }

  /** support for migrating across threads for Hydra */
  public static Object getThreadLocalDataForHydra() {
    Object result = threadIDLocal.get();
    threadIDLocal.set(null);
    return result;
  }

  /** support for migrating across threads for Hydra */
  public static void setThreadLocalDataForHydra(Object wrapper) {
    if (!(wrapper instanceof ThreadAndSequenceIDWrapper)) {
      throw new IllegalArgumentException(
          "Expected a ThreadAndSequenceIdWrapper but received " + wrapper);
    }
    threadIDLocal.set(wrapper);
  }

  /**
   * Default constructor used for deserialization of EventID object
   *
   */
  public EventID() {}

  public long getThreadID() {
    return this.threadID;
  }

  public void setThreadID(long threadID) {
    this.threadID = threadID;
  }

  public byte[] getMembershipID() {
    return this.membershipID;
  }

  public int getBucketID() {
    return this.bucketID;
  }

  /**
   * starting in v6.5 this method returns a somewhat crippled Identifier. It is missing any durable
   * attributes and roles information but contains all other info about the member. This fixes bug
   * #39361.
   *
   * @return the member that initiated this event
   */
  public InternalDistributedMember getDistributedMember() {
    return getDistributedMember(Version.CURRENT);
  }

  /**
   * deserialize the memberID bytes using the given version. The correct thing to do would be to
   * have EventID carry the version ordinal of the serialized memberID, or to have it be part of the
   * memberID bytes and use that version to deserialize the bytes. Clients prior to 1.1.0 need to
   * have UUID bytes in the memberID. Newer clients don't require this.
   */
  public InternalDistributedMember getDistributedMember(Version targetVersion) {
    Version disVersion = null;
    if (targetVersion.compareTo(Version.GEODE_1_1_0) < 0) {
      // GEODE-3153: clients expect to receive UUID bytes, which are only
      // read if the stream's version is 1.0.0-incubating
      disVersion = Version.GFE_90;
    }
    ByteArrayDataInput dis = new ByteArrayDataInput(membershipID, disVersion);
    InternalDistributedMember result = null;
    try {
      result = InternalDistributedMember.readEssentialData(dis);
    } catch (IOException e) {
      // nothing can be done about this
    } catch (ClassNotFoundException e) {
      // ditto
    }
    return result;
  }

  public long getSequenceID() {
    return this.sequenceID;
  }

  /**
   * Returns a byte[] whose contents are calculated by calling
   * {@link #getOptimizedByteArrayForEventID}
   *
   * @since GemFire 5.7
   */
  public byte[] calcBytes() {
    return getOptimizedByteArrayForEventID(getThreadID(), getSequenceID());
  }

  @Override
  public int getDSFID() {
    return EVENT_ID;
  }

  @Override
  public void toData(DataOutput dop) throws IOException {
    Version version = InternalDataSerializer.getVersionForDataStream(dop);
    // if we are sending to old clients we need to reserialize the ID
    // using the client's version to ensure it gets the proper on-wire form
    // of the identifier
    // See GEODE-3072
    if (version.compareTo(Version.GEODE_1_1_0) < 0) {
      InternalDistributedMember member = getDistributedMember(Version.GFE_90);
      // reserialize with the client's version so that we write the UUID
      // bytes
      HeapDataOutputStream hdos = new HeapDataOutputStream(Version.GFE_90);
      member.writeEssentialData(hdos);
      DataSerializer.writeByteArray(hdos.toByteArray(), dop);
    } else {
      DataSerializer.writeByteArray(this.membershipID, dop);
    }
    DataSerializer.writeByteArray(getOptimizedByteArrayForEventID(this.threadID, this.sequenceID),
        dop);
    dop.writeInt(this.bucketID);
    dop.writeByte(this.breadcrumbCounter);
  }

  public void toDataPre_GFE_8_0_0_0(DataOutput dop) throws IOException {
    DataSerializer.writeByteArray(this.membershipID, dop);
    DataSerializer.writeByteArray(getOptimizedByteArrayForEventID(this.threadID, this.sequenceID),
        dop);
  }

  @Override
  public void fromData(DataInput di) throws IOException, ClassNotFoundException {
    this.membershipID = DataSerializer.readByteArray(di);
    ByteBuffer eventIdParts = ByteBuffer.wrap(DataSerializer.readByteArray(di));
    this.threadID = readEventIdPartsFromOptmizedByteArray(eventIdParts);
    this.sequenceID = readEventIdPartsFromOptmizedByteArray(eventIdParts);
    this.bucketID = di.readInt();
    this.breadcrumbCounter = di.readByte();
  }

  public void fromDataPre_GFE_8_0_0_0(DataInput di) throws IOException, ClassNotFoundException {
    this.membershipID = DataSerializer.readByteArray(di);
    ByteBuffer eventIdParts = ByteBuffer.wrap(DataSerializer.readByteArray(di));
    this.threadID = readEventIdPartsFromOptmizedByteArray(eventIdParts);
    this.sequenceID = readEventIdPartsFromOptmizedByteArray(eventIdParts);
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    DataSerializer.writeByteArray(this.membershipID, out);
    DataSerializer.writeByteArray(getOptimizedByteArrayForEventID(this.threadID, this.sequenceID),
        out);
    out.writeInt(this.bucketID);
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
    this.membershipID = DataSerializer.readByteArray(in);
    ByteBuffer eventIdParts = ByteBuffer.wrap(DataSerializer.readByteArray(in));
    this.threadID = readEventIdPartsFromOptmizedByteArray(eventIdParts);
    this.sequenceID = readEventIdPartsFromOptmizedByteArray(eventIdParts);
    this.bucketID = in.readInt();
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj)
      return true;
    if (obj == null)
      return false;
    if (getClass() != obj.getClass())
      return false;
    EventID other = (EventID) obj;
    if (sequenceID != other.sequenceID)
      return false;
    if (threadID != other.threadID)
      return false;
    return equalMembershipIds(membershipID, other.membershipID);
  }

  /** GEODE_3072 - 1.0.0 client IDs contain a UUID and member-weight byte that are all zero */
  static final int NULL_90_MEMBER_DATA_LENGTH = 17;

  /** minimum length of an ID array */
  static final int MINIMIM_ID_LENGTH = 19;

  /**
   * check to see if membership ID byte arrays are equal
   */
  public static boolean equalMembershipIds(byte[] m1, byte[] m2) {
    int sizeDifference = Math.abs(m1.length - m2.length);
    if (sizeDifference != 0 && sizeDifference != NULL_90_MEMBER_DATA_LENGTH) {
      return false;
    }
    for (int i = 0; i < m1.length; i++) {
      if (i >= m2.length) {
        return nullUUIDCheck(m1, i);
      }
      if (m1[i] != m2[i]) {
        return false;
      }
    }
    if (m1.length != m2.length) {
      return nullUUIDCheck(m2, m1.length);
    }
    return true;
  }

  /**
   * GEODE-3072 - v1.0.0 memberIDs in EventIDs may have trailing bytes that should be ignored
   */
  private static boolean nullUUIDCheck(byte[] memberID, int position) {
    if (position < 0) {
      return false;
    }
    if (memberID.length - position != NULL_90_MEMBER_DATA_LENGTH) {
      return false;
    }
    for (int i = position; i < memberID.length; i++) {
      if (memberID[i] != 0) {
        return false;
      }
    }
    return true;
  }

  /**
   * form the hashcode for the memberID byte array
   */
  public static int hashCodeMemberId(byte[] memberID) {
    if (memberID.length < (NULL_90_MEMBER_DATA_LENGTH + MINIMIM_ID_LENGTH)
        || !nullUUIDCheck(memberID, memberID.length - NULL_90_MEMBER_DATA_LENGTH)) {
      return Arrays.hashCode(memberID);
    }
    byte[] newID = new byte[memberID.length - NULL_90_MEMBER_DATA_LENGTH];
    System.arraycopy(memberID, 0, newID, 0, newID.length);
    return Arrays.hashCode(newID);
  }

  public int hashCode() {
    if (hashCode == 0) {
      final int prime = 31;
      int result = 1;
      result = prime * result + hashCodeMemberId(membershipID);
      result = prime * result + (int) (sequenceID ^ (sequenceID >>> 32));
      result = prime * result + (int) (threadID ^ (threadID >>> 32));
      hashCode = result;
    }
    return hashCode;
  }

  String getShortClassName() {
    String cname = getClass().getName();
    return cname.substring(getClass().getPackage().getName().length() + 1);
  }

  public String expensiveToString() {
    Object mbr;
    try {
      mbr = InternalDistributedMember
          .readEssentialData(new ByteArrayDataInput(membershipID));
    } catch (Exception e) {
      mbr = membershipID; // punt and use the bytes
    }
    return "EventID[" + mbr + ";threadID=" + ThreadIdentifier.toDisplayString(threadID)
        + ";sequenceID=" + sequenceID + (Breadcrumbs.ENABLED ? ";bcrumb=" + breadcrumbCounter : "")
        + (bucketID >= 0 ? (";bucketID=" + bucketID) : "") + "]";
  }

  @Override
  public String toString() {
    if (logger.isDebugEnabled()) {
      return expensiveToString();
    } else {
      return cheapToString();
    }
  }


  public String cheapToString() {
    final StringBuffer buf = new StringBuffer();
    buf.append(getShortClassName());
    if (LOG_ID_BYTES) {
      buf.append("[membershipID=");
      for (int i = 0; i < membershipID.length; i++) {
        buf.append(membershipID[i]);
        if (i < membershipID.length - 1) {
          buf.append(',');
        }
      }
      buf.append(";");
    } else {
      buf.append("[id=").append(membershipID.length).append(" bytes;");
    }
    // buf.append(this.membershipID.toString());
    buf.append("threadID=");
    buf.append(ThreadIdentifier.toDisplayString(threadID));
    buf.append(";sequenceID=");
    buf.append(this.sequenceID);

    if (Breadcrumbs.ENABLED) {
      buf.append(";bcrumb=").append(this.breadcrumbCounter);
    }

    if (this.bucketID >= 0) {
      buf.append(";bucketId=");
      buf.append(this.bucketID);
    }
    buf.append("]");
    return buf.toString();
  }

  private static byte[] initializeAndGetDSEventIdentity(DistributedSystem sys) {
    if (sys == null) {
      // DistributedSystem is required now before handshaking -Kirk
      throw new IllegalStateException(
          "Attempting to handshake with CacheServer before creating DistributedSystem and Cache.");
    }
    if (EventID.system != sys) {
      // DS already exists... make sure it's for current DS connection
      EventID.systemMemberId = sys.getDistributedMember();
      try {
        HeapDataOutputStream hdos = new HeapDataOutputStream(256, Version.CURRENT);
        ((InternalDistributedMember) EventID.systemMemberId).writeEssentialData(hdos);
        client_side_event_identity = hdos.toByteArray();
      } catch (IOException ioe) {
        throw new InternalGemFireException(
            "Unable to serialize identity",
            ioe);
      }
      if (((InternalDistributedMember) EventID.systemMemberId).getPort() != 0) {
        EventID.system = sys;
      }
    }
    return EventID.client_side_event_identity;

  }

  /**
   * Returns the number of bytes needed to store the given value. This calculation is needed to
   * create the optimized byte-array for an eventId, which will be sent across the network
   *
   * @param id - the long value ( threadId or sequenceId in this context)
   * @return - the number of byte taken by this value
   */
  public static int getByteSizeForValue(long id) {
    int length = 0;

    // compare threadId to find its range
    if (id <= Byte.MAX_VALUE) {
      length = 1;
    } else if (id <= Short.MAX_VALUE) {
      length = 2;
    } else if (id <= Integer.MAX_VALUE) {
      length = 4;
    } else {
      length = 8;
    }

    return length;
  }

  /**
   * Gets the optmized byte-array representation of an eventId which can be sent across the network.
   * For client to server messages, the membership id part of event-id is not need to be sent with
   * each event. Also, the threadId and sequenceId need not be sent as long if their value is small.
   * This function returns the optimal byte-array for the eventId.
   * <p>
   * This is public for unit testing purposes only
   *
   * @param threadId - the long value of threadId
   * @param sequenceId - the long value of sequenceId
   * @return - the optimized byte-array representing the eventId
   */
  public static byte[] getOptimizedByteArrayForEventID(long threadId, long sequenceId) {

    int threadIdLength = getByteSizeForValue(threadId);
    int threadIdIndex = (threadIdLength == 1) ? 0 : ((threadIdLength / 4) + 1);

    int sequenceIdLength = getByteSizeForValue(sequenceId);
    int sequenceIdIndex = (sequenceIdLength == 1) ? 0 : ((sequenceIdLength / 4) + 1);

    int byteBufferLength = 2 + threadIdLength + sequenceIdLength;
    ByteBuffer buffer = ByteBuffer.allocate(byteBufferLength);
    fillerArray.get(threadIdIndex).fill(buffer, threadId);
    fillerArray.get(sequenceIdIndex).fill(buffer, sequenceId);
    return buffer.array();

  }

  /**
   * Reads the optimized byte-array representation of an eventId and returns the long value of
   * threadId or sequenceId ( the first invocation of this method on bytebuffer returns the threadId
   * and the second returns the sequenceId.
   *
   * @param buffer - the byte-buffer wrapping the optimized byte-array for the eventId
   * @return - long value of threadId or sequenceId
   */
  public static long readEventIdPartsFromOptmizedByteArray(ByteBuffer buffer) {
    byte byteType = buffer.get();
    long id = fillerArray.get(byteType).read(buffer);

    return id;
  }

  /**
   * Abstract helper class used to create optimized byte-array for the eventid, which will be sent
   * across the network.
   *
   */
  protected abstract static class AbstractEventIDByteArrayFiller {
    /**
     * This method adds to the byte-buffer, the token indicating the type of the passed 'id'
     * (threadId or sequenceId) and the optimal byte array representing the id depending on the
     * value of the 'id'.
     *
     * @param buffer - the byte buffer wrapping the byte array for the event id
     * @param id - the long value of threadId or sequenceId
     */
    public abstract void fill(ByteBuffer buffer, long id);

    /**
     * Reads the given buffer and returns the value as long.
     *
     * @param buffer - the byte-buffer containing the eventId parts which needs to be read
     * @return - the long value of id (threadId or sequenceId).
     */
    public abstract long read(ByteBuffer buffer);
  }

  protected static class ByteEventIDByteArrayFiller extends AbstractEventIDByteArrayFiller {
    /**
     * The token to indicate that given id ( threadId or sequenceId) is of type <code>Byte</code>
     */
    private static final byte EVENTID_BYTE = 0;

    /**
     * Writes the given 'id' to the given buffer as 'byte' preceeded by a token indicating that it
     * is written as 'byte' type.
     *
     * @param buffer - the buffer in which id is to be written
     * @param id - the threadId or sequenceId to be written
     */
    @Override
    public void fill(ByteBuffer buffer, long id) {
      buffer.put(EVENTID_BYTE);
      buffer.put((byte) id);
    }

    /**
     * Reads the byte value of id from the buffer and returns it as long.
     *
     * @param buffer - the buffer from which 'id'(threadId or sequenceId) is to be read
     * @return - the value of the id as long
     *
     */
    @Override
    public long read(ByteBuffer buffer) {
      long value = buffer.get();
      return value;
    }

  }

  protected static class ShortEventIDByteArrayFiller extends AbstractEventIDByteArrayFiller {
    /**
     * The token to indicate that given id ( threadId or sequenceId) is of type <code>Short</code>
     */
    private static final byte EVENTID_SHORT = 1;

    /**
     * Writes the given 'id' to the given buffer as 'short' preceeded by a token indicating that it
     * is written as 'short' type.
     *
     * @param buffer - the buffer in which id is to be written
     * @param id - the threadId or sequenceId to be written
     */
    @Override
    public void fill(ByteBuffer buffer, long id) {
      buffer.put(EVENTID_SHORT);
      buffer.putShort((short) id);
    }

    /**
     * Reads the short value of id from the buffer and returns it as long.
     *
     * @param buffer - the buffer from which 'id'(threadId or sequenceId) is to be read
     * @return - the value of the id as long
     *
     */
    @Override
    public long read(ByteBuffer buffer) {
      long value = buffer.getShort();
      return value;
    }
  }

  protected static class IntegerEventIDByteArrayFiller extends AbstractEventIDByteArrayFiller {
    /**
     * The token to indicate that given id ( threadId or sequenceId) is of type <code>Integer</code>
     */
    private static final byte EVENTID_INT = 2;

    /**
     * Writes the given 'id' to the given buffer as 'int' preceeded by a token indicating that it is
     * written as 'int' type.
     *
     * @param buffer - the buffer in which id is to be written
     * @param id - the threadId or sequenceId to be written
     */
    @Override
    public void fill(ByteBuffer buffer, long id) {
      buffer.put(EVENTID_INT);
      buffer.putInt((int) id);
    }

    /**
     * Reads the int value of id from the buffer and returns it as long.
     *
     * @param buffer - the buffer from which 'id'(threadId or sequenceId) is to be read
     * @return - the value of the id as long
     *
     */
    @Override
    public long read(ByteBuffer buffer) {
      long value = buffer.getInt();
      return value;
    }
  }

  protected static class LongEventIDByteArrayFiller extends AbstractEventIDByteArrayFiller {
    /**
     * The token to indicate that given id ( threadId or sequenceId) is of type <code>Long</code>
     */
    private static final byte EVENTID_LONG = 3;

    /**
     * Writes the given 'id' to the given buffer as 'long' preceeded by a token indicating that it
     * is written as 'long' type.
     *
     * @param buffer - the buffer in which id is to be written
     * @param id - the threadId or sequenceId to be written
     */
    @Override
    public void fill(ByteBuffer buffer, long id) {
      buffer.put(EVENTID_LONG);
      buffer.putLong(id);
    }

    /**
     * Reads the long value of id from the buffer and returns it.
     *
     * @param buffer - the buffer from which 'id'(threadId or sequenceId) is to be read
     * @return - the value of the id as long
     *
     */
    @Override
    public long read(ByteBuffer buffer) {
      long value = buffer.getLong();
      return value;
    }
  }

  static class ThreadAndSequenceIDWrapper {
    final long threadID;

    long sequenceID = (HARegionQueue.INIT_OF_SEQUENCEID + 1);

    @MakeNotStatic
    private static final AtomicLong atmLong = new AtomicLong(0);

    ThreadAndSequenceIDWrapper() {
      threadID = atmLong.incrementAndGet();
    }

    long getAndIncrementSequenceID() {
      return this.sequenceID++;
    }

    void reserveSequenceID(int size) {
      this.sequenceID += size;
    }
  }

  @Override
  public Version[] getSerializationVersions() {
    return dsfidVersions;
  }
}
