/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.tier.sockets;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.DSCODE;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.offheap.AddressableMemoryManager;
import org.apache.geode.internal.offheap.StoredObject;

/**
 * Represents one unit of information (essentially a <code>byte</code> array) in the wire protocol.
 * A part couples a payload with a type code recording whether that payload is raw bytes, a
 * serialized object, or the special "empty byte array" marker.
 *
 * @see Message
 * @since GemFire 2.0.2
 */
public class Part {

  /** Type code for a payload made of raw bytes. */
  private static final byte BYTE_CODE = 0;
  /** Type code for a payload holding a serialized object. */
  private static final byte OBJECT_CODE = 1;

  /** Optional version handed to the deserializer by {@link #getObject(boolean)}; may be null. */
  private Version version;

  /**
   * Used to represent an empty byte array for bug 36279
   *
   * @since GemFire 5.1
   */
  private static final byte EMPTY_BYTEARRAY_CODE = 2;
  @Immutable
  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

  /**
   * The payload of this part. Could be null, a byte[], a StoredObject or a HeapDataOutputStream on
   * the send side. Could be null, or a byte[] on the receiver side.
   */
  private Object part;

  /** One of BYTE_CODE, OBJECT_CODE or EMPTY_BYTEARRAY_CODE; describes the payload. */
  private byte typeCode;

  /**
   * Initializes this part from a byte array and an explicit type code.
   *
   * @param v the payload; ignored when {@code tc} is the empty byte array code
   * @param tc the type code to record
   */
  public void init(byte[] v, byte tc) {
    this.part = (tc == EMPTY_BYTEARRAY_CODE) ? EMPTY_BYTE_ARRAY : v;
    this.typeCode = tc;
  }

  /**
   * Drops the payload (closing it first when it is a HeapDataOutputStream) and resets the type
   * code to BYTE_CODE. The version is left untouched.
   */
  public void clear() {
    if (this.part instanceof HeapDataOutputStream) {
      ((HeapDataOutputStream) this.part).close();
    }
    this.part = null;
    this.typeCode = BYTE_CODE;
  }

  /**
   * Returns true if there is no payload, or if the payload is an object part whose byte array
   * consists solely of the {@code DSCODE.NULL} byte.
   */
  public boolean isNull() {
    if (this.part == null) {
      return true;
    }
    if (isObject() && this.part instanceof byte[]) {
      final byte[] b = (byte[]) this.part;
      return b.length == 1 && b[0] == DSCODE.NULL.toByte();
    }
    return false;
  }

  /** Returns true if the type code marks the payload as a serialized object. */
  public boolean isObject() {
    return this.typeCode == OBJECT_CODE;
  }

  /** Returns true if the type code marks the payload as raw bytes (including the empty array). */
  public boolean isBytes() {
    return this.typeCode == BYTE_CODE || this.typeCode == EMPTY_BYTEARRAY_CODE;
  }

  /**
   * Sets the payload to the given byte array.
   *
   * @param b the payload; may be null
   * @param isObject true if {@code b} holds a serialized object; otherwise a zero-length array is
   *        replaced by the shared empty array and tagged with the empty byte array code
   */
  public void setPartState(byte[] b, boolean isObject) {
    if (isObject) {
      this.typeCode = OBJECT_CODE;
    } else if (b != null && b.length == 0) {
      this.typeCode = EMPTY_BYTEARRAY_CODE;
      b = EMPTY_BYTE_ARRAY;
    } else {
      this.typeCode = BYTE_CODE;
    }
    this.part = b;
  }

  /**
   * Sets the payload to the given stream.
   *
   * @param os the payload; may be null
   * @param isObject true if {@code os} holds a serialized object; otherwise a zero-size stream is
   *        replaced by the shared empty array and tagged with the empty byte array code
   */
  public void setPartState(HeapDataOutputStream os, boolean isObject) {
    if (isObject) {
      this.typeCode = OBJECT_CODE;
      this.part = os;
    } else if (os != null && os.size() == 0) {
      this.typeCode = EMPTY_BYTEARRAY_CODE;
      this.part = EMPTY_BYTE_ARRAY;
    } else {
      this.typeCode = BYTE_CODE;
      this.part = os;
    }
  }

  /**
   * Sets the payload to the given stored object. The stored object itself is retained only when
   * it reports a reference count; otherwise its value is copied into a heap byte array.
   *
   * @param so the payload; may be null, in which case the payload becomes null just as it does
   *        for the other {@code setPartState} overloads
   * @param isObject true if {@code so} holds a serialized object; otherwise a zero-size value is
   *        replaced by the shared empty array and tagged with the empty byte array code
   */
  public void setPartState(StoredObject so, boolean isObject) {
    if (so == null) {
      // Mirror the byte[] and HeapDataOutputStream overloads, which accept a null payload.
      this.typeCode = isObject ? OBJECT_CODE : BYTE_CODE;
      this.part = null;
      return;
    }
    if (isObject) {
      this.typeCode = OBJECT_CODE;
    } else if (so.getDataSize() == 0) {
      this.typeCode = EMPTY_BYTEARRAY_CODE;
      this.part = EMPTY_BYTE_ARRAY;
      return;
    } else {
      this.typeCode = BYTE_CODE;
    }
    if (so.hasRefCount()) {
      this.part = so;
    } else {
      this.part = so.getValueAsHeapByteArray();
    }
  }

  /** Returns the raw type code of this part. */
  public byte getTypeCode() {
    return this.typeCode;
  }

  /**
   * Return the length of the part. The length is the number of bytes needed for its serialized
   * form; a part without a payload has length 0.
   */
  public int getLength() {
    if (this.part == null) {
      return 0;
    } else if (this.part instanceof byte[]) {
      return ((byte[]) this.part).length;
    } else if (this.part instanceof StoredObject) {
      return ((StoredObject) this.part).getDataSize();
    } else {
      return ((HeapDataOutputStream) this.part).size();
    }
  }

  /**
   * Decodes the payload as a string using {@code CacheServerHelper.fromUTF}.
   *
   * @return the decoded string, or null if there is no payload
   */
  public String getString() {
    if (this.part == null) {
      return null;
    }
    if (!isBytes()) {
      Assert.assertTrue(false, "expected String part to be of type BYTE, part =" + this.toString());
    }
    return CacheServerHelper.fromUTF((byte[]) this.part);
  }

  /** Decoded strings keyed by their serialized bytes; nothing in this class evicts entries. */
  @MakeNotStatic("not tied to the cache lifecycle")
  private static final Map<ByteArrayKey, String> CACHED_STRINGS = new ConcurrentHashMap<>();

  /** Looks up the decoded form of the given bytes, decoding and caching it on a miss. */
  private static String getCachedString(byte[] serializedBytes) {
    ByteArrayKey key = new ByteArrayKey(serializedBytes);
    String result = CACHED_STRINGS.get(key);
    if (result == null) {
      result = CacheServerHelper.fromUTF(serializedBytes);
      CACHED_STRINGS.put(key, result);
    }
    return result;
  }

  /**
   * Used to wrap a byte array so that it can be used
   * as a key on a HashMap. This is needed so that
   * equals and hashCode will be based on the contents
   * of the byte array instead of the identity.
   */
  private static final class ByteArrayKey {
    private final byte[] bytes;

    public ByteArrayKey(byte[] bytes) {
      this.bytes = bytes;
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(bytes);
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      // Guard the cast so that null and foreign types compare unequal instead of throwing.
      if (!(obj instanceof ByteArrayKey)) {
        return false;
      }
      return Arrays.equals(bytes, ((ByteArrayKey) obj).bytes);
    }
  }

  /**
   * Like getString but will also check a cache of frequently serialized strings.
   * The result will be added to the cache if it is not already in it.
   * NOTE: only call this for strings that are reused often (like region names).
   */
  public String getCachedString() {
    if (this.part == null) {
      return null;
    }
    if (!isBytes()) {
      Assert.assertTrue(false, "expected String part to be of type BYTE, part =" + this.toString());
    }
    return getCachedString((byte[]) this.part);
  }

  /** One preallocated single-element array per possible byte value, shared by all parts. */
  @Immutable
  private static final byte[][] BYTES = new byte[256][1];
  /** Added to a byte value to turn it into a non-negative index into BYTES. */
  private static final int BYTES_OFFSET = -1 * Byte.MIN_VALUE;
  static {
    // The counter is an int and the bound is inclusive so that the slot for Byte.MAX_VALUE is
    // populated too; with an exclusive bound that slot would keep its default value of 0.
    for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) {
      BYTES[i + BYTES_OFFSET][0] = (byte) i;
    }
  }

  /** Sets the payload to the shared single-byte array holding {@code b}. */
  public void setByte(byte b) {
    this.typeCode = BYTE_CODE;
    this.part = BYTES[b + BYTES_OFFSET];
  }

  /**
   * Returns the payload as a single byte, asserting that this is a bytes part of length 1.
   */
  public byte getByte() {
    if (!isBytes()) {
      Assert.assertTrue(false, "expected byte part to be of type BYTE, part = " + this.toString());
    }
    if (getLength() != 1) {
      Assert.assertTrue(false,
          "expected byte length to be 1 but it was " + getLength() + "; part = " + this.toString());
    }
    final byte[] bytes = getSerializedForm();
    return bytes[0];
  }

  /**
   * Returns the payload as a big-endian int, asserting that this is a bytes part of length 4.
   */
  public int getInt() {
    if (!isBytes()) {
      Assert.assertTrue(false, "expected int part to be of type BYTE, part = " + this.toString());
    }
    if (getLength() != 4) {
      Assert.assertTrue(false,
          "expected int length to be 4 but it was " + getLength() + "; part = " + this.toString());
    }
    byte[] bytes = getSerializedForm();
    return decodeInt(bytes, 0);
  }

  /**
   * Decodes a big-endian int from four consecutive bytes.
   *
   * @param bytes the array to read from
   * @param offset index of the most significant byte
   * @return the decoded value
   */
  public static int decodeInt(byte[] bytes, int offset) {
    return ((bytes[offset] & 0xFF) << 24)
        | ((bytes[offset + 1] & 0xFF) << 16)
        | ((bytes[offset + 2] & 0xFF) << 8)
        | (bytes[offset + 3] & 0xFF);
  }

  /** Encoded form of every int passed to setInt; nothing in this class evicts entries. */
  @MakeNotStatic
  private static final Map<Integer, byte[]> CACHED_INTS = new ConcurrentHashMap<>();

  /** Sets the payload to the four-byte big-endian encoding of {@code v}, reusing a cached array. */
  public void setInt(int v) {
    byte[] bytes = CACHED_INTS.get(v);
    if (bytes == null) {
      bytes = new byte[4];
      encodeInt(v, bytes);
      CACHED_INTS.put(v, bytes);
    }
    this.typeCode = BYTE_CODE;
    this.part = bytes;
  }

  /**
   * Encodes {@code v} big-endian into the first four bytes of {@code bytes}.
   *
   * @since GemFire 5.7
   */
  public static void encodeInt(int v, byte[] bytes) {
    encodeInt(v, bytes, 0);
  }

  /**
   * Encodes {@code v} big-endian into four consecutive bytes.
   *
   * @param v the value to encode
   * @param bytes the array to write into
   * @param offset index that receives the most significant byte
   */
  public static void encodeInt(int v, byte[] bytes, int offset) {
    bytes[offset] = (byte) (v >>> 24);
    bytes[offset + 1] = (byte) (v >>> 16);
    bytes[offset + 2] = (byte) (v >>> 8);
    bytes[offset + 3] = (byte) v;
  }

  /** Sets the payload to a freshly allocated eight-byte big-endian encoding of {@code v}. */
  public void setLong(long v) {
    final byte[] bytes = new byte[8];
    // Most significant byte first; the narrowing cast keeps only the low eight bits.
    for (int i = 0; i < bytes.length; i++) {
      bytes[i] = (byte) (v >>> (8 * (bytes.length - 1 - i)));
    }
    this.typeCode = BYTE_CODE;
    this.part = bytes;
  }

  /**
   * Returns the payload as a big-endian long, asserting that this is a bytes part of length 8.
   */
  public long getLong() {
    if (!isBytes()) {
      Assert.assertTrue(false, "expected long part to be of type BYTE, part = " + this.toString());
    }
    if (getLength() != 8) {
      Assert.assertTrue(false,
          "expected long length to be 8 but it was " + getLength() + "; part = " + this.toString());
    }
    final byte[] bytes = getSerializedForm();
    long result = 0L;
    for (int i = 0; i < 8; i++) {
      // Mask with 0xFFL so a negative byte does not sign-extend into the higher bytes.
      result = (result << 8) | (bytes[i] & 0xFFL);
    }
    return result;
  }

  /**
   * Returns the payload when it is a byte array.
   *
   * @return the payload array itself (not a copy), or null when there is no payload or the
   *         payload is not a byte array
   */
  public byte[] getSerializedForm() {
    if (this.part instanceof byte[]) {
      return (byte[]) this.part;
    }
    return null; // should not be called on sender side?
  }

  /**
   * Returns the payload as an object: the raw payload for a bytes part, otherwise the result of
   * deserializing the payload byte array (using the version if one has been set).
   *
   * @param unzip passed through to {@code CacheServerHelper.deserialize}
   */
  public Object getObject(boolean unzip) throws IOException, ClassNotFoundException {
    if (isBytes()) {
      return this.part;
    }
    final byte[] serialized = (byte[]) this.part;
    if (this.version != null) {
      return CacheServerHelper.deserialize(serialized, this.version, unzip);
    }
    return CacheServerHelper.deserialize(serialized, unzip);
  }

  /** Equivalent to {@code getObject(false)}. */
  public Object getObject() throws IOException, ClassNotFoundException {
    return getObject(false);
  }

  /** Returns {@link #getObject()} for an object part and {@link #getString()} otherwise. */
  public Object getStringOrObject() throws IOException, ClassNotFoundException {
    if (isObject()) {
      return getObject();
    } else {
      return getString();
    }
  }

  /**
   * Write the contents of this part to the specified output stream. This is only called for parts
   * that will not fit into the commBuffer so they need to be written directly to the stream. A
   * stream is used because the client is configured for old IO (instead of nio).
   *
   * @param buf the buffer to use if any data needs to be copied to one
   */
  public void writeTo(OutputStream out, ByteBuffer buf) throws IOException {
    if (getLength() > 0) {
      if (this.part instanceof byte[]) {
        byte[] bytes = (byte[]) this.part;
        out.write(bytes, 0, bytes.length);
      } else if (this.part instanceof StoredObject) {
        StoredObject so = (StoredObject) this.part;
        ByteBuffer sobb = so.createDirectByteBuffer();
        if (sobb != null) {
          HeapDataOutputStream.writeByteBufferToStream(out, buf, sobb);
        } else {
          // No direct buffer available: copy byte by byte through buf, flushing it to the
          // stream whenever it fills up. Bytes still in buf after the loop are not flushed here.
          int bytesToSend = so.getDataSize();
          long addr = so.getAddressForReadingData(0, bytesToSend);
          while (bytesToSend > 0) {
            if (buf.remaining() == 0) {
              HeapDataOutputStream.flushStream(out, buf);
            }
            buf.put(AddressableMemoryManager.readByte(addr));
            addr++;
            bytesToSend--;
          }
        }
      } else {
        HeapDataOutputStream hdos = (HeapDataOutputStream) this.part;
        hdos.sendTo(out, buf);
        hdos.rewind();
      }
    }
  }

  /**
   * Write the contents of this part to the specified byte buffer. Precondition: caller has already
   * checked the length of this part and it will fit into "buf".
   */
  public void writeTo(ByteBuffer buf) {
    if (getLength() > 0) {
      if (this.part instanceof byte[]) {
        buf.put((byte[]) this.part);
      } else if (this.part instanceof StoredObject) {
        StoredObject c = (StoredObject) this.part;
        ByteBuffer bb = c.createDirectByteBuffer();
        if (bb != null) {
          buf.put(bb);
        } else {
          // No direct buffer available: copy the stored data one byte at a time.
          int bytesToSend = c.getDataSize();
          long addr = c.getAddressForReadingData(0, bytesToSend);
          while (bytesToSend > 0) {
            buf.put(AddressableMemoryManager.readByte(addr));
            addr++;
            bytesToSend--;
          }
        }
      } else {
        HeapDataOutputStream hdos = (HeapDataOutputStream) this.part;
        hdos.sendTo(buf);
        hdos.rewind();
      }
    }
  }

  /**
   * Write the contents of this part to the specified socket channel using the specified byte
   * buffer. This is only called for parts that will not fit into the commBuffer so they need to be
   * written directly to the socket. Precondition: buf contains nothing that needs to be sent
   */
  public void writeTo(SocketChannel sc, ByteBuffer buf) throws IOException {
    if (getLength() > 0) {
      final int bufCapacity = buf.capacity();
      if (this.part instanceof byte[]) {
        final byte[] bytes = (byte[]) this.part;
        int off = 0;
        int len = bytes.length;
        buf.clear();
        // Send the array in chunks no larger than the buffer.
        while (len > 0) {
          final int bytesThisTime = Math.min(len, bufCapacity);
          buf.put(bytes, off, bytesThisTime);
          len -= bytesThisTime;
          off += bytesThisTime;
          drain(sc, buf);
        }
      } else if (this.part instanceof StoredObject) {
        // instead of copying the StoredObject to buf try to create a direct ByteBuffer and
        // just write it directly to the socket channel.
        StoredObject c = (StoredObject) this.part;
        ByteBuffer bb = c.createDirectByteBuffer();
        if (bb != null) {
          while (bb.remaining() > 0) {
            sc.write(bb);
          }
        } else {
          int len = c.getDataSize();
          long addr = c.getAddressForReadingData(0, len);
          buf.clear();
          // Fill buf byte by byte, one buffer-sized chunk at a time, and send each chunk.
          while (len > 0) {
            int bytesThisTime = Math.min(len, bufCapacity);
            len -= bytesThisTime;
            while (bytesThisTime > 0) {
              buf.put(AddressableMemoryManager.readByte(addr));
              addr++;
              bytesThisTime--;
            }
            drain(sc, buf);
          }
        }
      } else {
        HeapDataOutputStream hdos = (HeapDataOutputStream) this.part;
        hdos.sendTo(sc, buf);
        hdos.rewind();
      }
    }
  }

  /** Flips buf, writes all of its content to the channel, then clears it for reuse. */
  private static void drain(SocketChannel sc, ByteBuffer buf) throws IOException {
    buf.flip();
    while (buf.remaining() > 0) {
      sc.write(buf);
    }
    buf.clear();
  }

  /** Returns a readable name for the given type code. */
  private static String typeCodeToString(byte c) {
    switch (c) {
      case BYTE_CODE:
        return "BYTE_CODE";
      case OBJECT_CODE:
        return "OBJECT_CODE";
      case EMPTY_BYTEARRAY_CODE:
        return "EMPTY_BYTEARRAY_CODE";
      default:
        return "unknown code " + c;
    }
  }

  /** Describes this part by its type code name and length; the payload itself is not printed. */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("partCode=");
    sb.append(typeCodeToString(this.typeCode));
    sb.append(" partLength=").append(getLength());
    return sb.toString();
  }

  /** Sets the version that {@link #getObject(boolean)} passes to the deserializer. */
  public void setVersion(Version clientVersion) {
    this.version = clientVersion;
  }
}
