/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.hadoop.ozone.container.common.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
    ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
    .ContainerDataProto;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.yaml.snakeyaml.Yaml;

import javax.annotation.Nullable;

import static org.apache.hadoop.ozone.OzoneConsts.CHECKSUM;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ID;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_TYPE;
import static org.apache.hadoop.ozone.OzoneConsts.DATA_SCAN_TIMESTAMP;
import static org.apache.hadoop.ozone.OzoneConsts.LAYOUTVERSION;
import static org.apache.hadoop.ozone.OzoneConsts.MAX_SIZE;
import static org.apache.hadoop.ozone.OzoneConsts.METADATA;
import static org.apache.hadoop.ozone.OzoneConsts.ORIGIN_NODE_ID;
import static org.apache.hadoop.ozone.OzoneConsts.ORIGIN_PIPELINE_ID;
import static org.apache.hadoop.ozone.OzoneConsts.STATE;

/**
 * ContainerData is the in-memory representation of container metadata and is
 * represented on disk by the .container file.
 */
public abstract class ContainerData {

  //Type of the container.
  // For now, we support only KeyValueContainer.
  private final ContainerType containerType;

  // Unique identifier for the container
  private final long containerID;

  // Layout version of the container data
  private final int layOutVersion;

  // Metadata of the container will be a key value pair.
  // This can hold information like volume name, owner etc.,
  private final Map<String, String> metadata;

  // Path to Physical file system where chunks are stored.
  private String chunksPath;

  // State of the Container
  private ContainerDataProto.State state;

  private final long maxSize;

  private boolean committedSpace;

  //ID of the pipeline where this container is created
  private String originPipelineId;
  //ID of the datanode where this container is created
  private String originNodeId;

  /** parameters for read/write statistics on the container. **/
  private final AtomicLong readBytes;
  private final AtomicLong writeBytes;
  private final AtomicLong readCount;
  private final AtomicLong writeCount;
  private final AtomicLong bytesUsed;
  private final AtomicLong keyCount;

  private HddsVolume volume;

  private String checksum;

  /** Timestamp of last data scan (milliseconds since Unix Epoch).
   * {@code null} if not yet scanned (or timestamp not recorded,
   * eg. in prior versions). */
  private Long dataScanTimestamp; // for serialization
  private transient Optional<Instant> lastDataScanTime = Optional.empty();

  public static final Charset CHARSET_ENCODING = StandardCharsets.UTF_8;
  private static final String DUMMY_CHECKSUM = new String(new byte[64],
      CHARSET_ENCODING);

  // Common Fields need to be stored in .container file.
  protected static final List<String> YAML_FIELDS =
      Collections.unmodifiableList(Lists.newArrayList(
      CONTAINER_TYPE,
      CONTAINER_ID,
      LAYOUTVERSION,
      STATE,
      METADATA,
      MAX_SIZE,
      CHECKSUM,
      DATA_SCAN_TIMESTAMP,
      ORIGIN_PIPELINE_ID,
      ORIGIN_NODE_ID));

  /**
   * Creates a ContainerData Object, which holds metadata of the container.
   * @param type - ContainerType
   * @param containerId - ContainerId
   * @param layOutVersion - Container layOutVersion
   * @param size - Container maximum size in bytes
   * @param originPipelineId - Pipeline Id where this container is/was created
   * @param originNodeId - Node Id where this container is/was created
   */
  protected ContainerData(ContainerType type, long containerId,
      ChunkLayOutVersion layOutVersion, long size, String originPipelineId,
      String originNodeId) {
    Preconditions.checkNotNull(type);

    this.containerType = type;
    this.containerID = containerId;
    this.layOutVersion = layOutVersion.getVersion();
    this.metadata = new TreeMap<>();
    this.state = ContainerDataProto.State.OPEN;
    this.readCount = new AtomicLong(0L);
    this.readBytes =  new AtomicLong(0L);
    this.writeCount =  new AtomicLong(0L);
    this.writeBytes =  new AtomicLong(0L);
    this.bytesUsed = new AtomicLong(0L);
    this.keyCount = new AtomicLong(0L);
    this.maxSize = size;
    this.originPipelineId = originPipelineId;
    this.originNodeId = originNodeId;
    setChecksumTo0ByteArray();
  }

  protected ContainerData(ContainerData source) {
    this(source.getContainerType(), source.getContainerID(),
        source.getLayOutVersion(), source.getMaxSize(),
        source.getOriginPipelineId(), source.getOriginNodeId());
  }

  /**
   * Returns the containerID.
   */
  public long getContainerID() {
    return containerID;
  }

  /**
   * Returns the path to base dir of the container.
   * @return Path to base dir.
   */
  public abstract String getContainerPath();

  /**
   * Returns the type of the container.
   * @return ContainerType
   */
  public ContainerType getContainerType() {
    return containerType;
  }


  /**
   * Returns the state of the container.
   * @return ContainerLifeCycleState
   */
  public synchronized ContainerDataProto.State getState() {
    return state;
  }

  /**
   * Set the state of the container.
   * @param state
   */
  public synchronized void setState(ContainerDataProto.State state) {
    ContainerDataProto.State oldState = this.state;
    this.state = state;

    if ((oldState == ContainerDataProto.State.OPEN) &&
        (state != oldState)) {
      releaseCommitSpace();
    }

    /**
     * commit space when container transitions (back) to Open.
     * when? perhaps closing a container threw an exception
     */
    if ((state == ContainerDataProto.State.OPEN) &&
        (state != oldState)) {
      Preconditions.checkState(getMaxSize() > 0);
      commitSpace();
    }
  }

  /**
   * Return's maximum size of the container in bytes.
   * @return maxSize in bytes
   */
  public long getMaxSize() {
    return maxSize;
  }

  /**
   * Returns the layOutVersion of the actual container data format.
   * @return layOutVersion
   */
  public ChunkLayOutVersion getLayOutVersion() {
    return ChunkLayOutVersion.getChunkLayOutVersion(layOutVersion);
  }

  /**
   * Get chunks path.
   * @return - Path where chunks are stored
   */
  public String getChunksPath() {
    return chunksPath;
  }

  /**
   * Set chunks Path.
   * @param chunkPath - File path.
   */
  public void setChunksPath(String chunkPath) {
    this.chunksPath = chunkPath;
  }

  /**
   * Add/Update metadata.
   * We should hold the container lock before updating the metadata as this
   * will be persisted on disk. Unless, we are reconstructing ContainerData
   * from protoBuf or from on disk .container file in which case lock is not
   * required.
   */
  public void addMetadata(String key, String value) {
    metadata.put(key, value);
  }

  /**
   * Retuns metadata of the container.
   * @return metadata
   */
  public Map<String, String> getMetadata() {
    return Collections.unmodifiableMap(this.metadata);
  }

  /**
   * Set metadata.
   * We should hold the container lock before updating the metadata as this
   * will be persisted on disk. Unless, we are reconstructing ContainerData
   * from protoBuf or from on disk .container file in which case lock is not
   * required.
   */
  public void setMetadata(Map<String, String> metadataMap) {
    metadata.clear();
    metadata.putAll(metadataMap);
  }

  /**
   * checks if the container is open.
   * @return - boolean
   */
  public synchronized  boolean isOpen() {
    return ContainerDataProto.State.OPEN == state;
  }

  /**
   * checks if the container is invalid.
   * @return - boolean
   */
  public synchronized boolean isValid() {
    return ContainerDataProto.State.INVALID != state;
  }

  /**
   * checks if the container is closed.
   * @return - boolean
   */
  public synchronized boolean isClosed() {
    return ContainerDataProto.State.CLOSED == state;
  }

  /**
   * checks if the container is quasi closed.
   * @return - boolean
   */
  public synchronized boolean isQuasiClosed() {
    return ContainerDataProto.State.QUASI_CLOSED == state;
  }

  /**
   * checks if the container is unhealthy.
   * @return - boolean
   */
  public synchronized boolean isUnhealthy() {
    return ContainerDataProto.State.UNHEALTHY == state;
  }

  /**
   * Marks this container as quasi closed.
   */
  public synchronized void quasiCloseContainer() {
    setState(ContainerDataProto.State.QUASI_CLOSED);
  }

  /**
   * Marks this container as closed.
   */
  public synchronized void closeContainer() {
    setState(ContainerDataProto.State.CLOSED);
  }

  private void releaseCommitSpace() {
    long unused = getMaxSize() - getBytesUsed();

    // only if container size < max size
    if (unused > 0 && committedSpace) {
      getVolume().incCommittedBytes(0 - unused);
    }
    committedSpace = false;
  }

  /**
   * add available space in the container to the committed space in the volume.
   * available space is the number of bytes remaining till max capacity.
   */
  public void commitSpace() {
    long unused = getMaxSize() - getBytesUsed();
    ContainerDataProto.State myState = getState();
    HddsVolume cVol;

    //we don't expect duplicate calls
    Preconditions.checkState(!committedSpace);

    // Only Open Containers have Committed Space
    if (myState != ContainerDataProto.State.OPEN) {
      return;
    }

    // junit tests do not always set up volume
    cVol = getVolume();
    if (unused > 0 && (cVol != null)) {
      cVol.incCommittedBytes(unused);
      committedSpace = true;
    }
  }

  /**
   * Get the number of bytes read from the container.
   * @return the number of bytes read from the container.
   */
  public long getReadBytes() {
    return readBytes.get();
  }

  /**
   * Increase the number of bytes read from the container.
   * @param bytes number of bytes read.
   */
  public void incrReadBytes(long bytes) {
    this.readBytes.addAndGet(bytes);
  }

  /**
   * Get the number of times the container is read.
   * @return the number of times the container is read.
   */
  public long getReadCount() {
    return readCount.get();
  }

  /**
   * Increase the number of container read count by 1.
   */
  public void incrReadCount() {
    this.readCount.incrementAndGet();
  }

  /**
   * Get the number of bytes write into the container.
   * @return the number of bytes write into the container.
   */
  public long getWriteBytes() {
    return writeBytes.get();
  }

  /**
   * Increase the number of bytes write into the container.
   * Also decrement committed bytes against the bytes written.
   * @param bytes the number of bytes write into the container.
   */
  public void incrWriteBytes(long bytes) {
    long unused = getMaxSize() - getBytesUsed();

    this.writeBytes.addAndGet(bytes);

    // only if container size < max size
    if (committedSpace && unused > 0) {
      //with this write, container size might breach max size
      long decrement = Math.min(bytes, unused);
      this.getVolume().incCommittedBytes(0 - decrement);
    }
  }

  /**
   * Get the number of writes into the container.
   * @return the number of writes into the container.
   */
  public long getWriteCount() {
    return writeCount.get();
  }

  /**
   * Increase the number of writes into the container by 1.
   */
  public void incrWriteCount() {
    this.writeCount.incrementAndGet();
  }

  /**
   * Sets the number of bytes used by the container.
   * @param used
   */
  public void setBytesUsed(long used) {
    this.bytesUsed.set(used);
  }

  /**
   * Get the number of bytes used by the container.
   * @return the number of bytes used by the container.
   */
  public long getBytesUsed() {
    return bytesUsed.get();
  }

  /**
   * Increase the number of bytes used by the container.
   * @param used number of bytes used by the container.
   * @return the current number of bytes used by the container afert increase.
   */
  public long incrBytesUsed(long used) {
    return this.bytesUsed.addAndGet(used);
  }

  /**
   * Decrease the number of bytes used by the container.
   * @param reclaimed the number of bytes reclaimed from the container.
   * @return the current number of bytes used by the container after decrease.
   */
  public long decrBytesUsed(long reclaimed) {
    return this.bytesUsed.addAndGet(-1L * reclaimed);
  }

  /**
   * Set the Volume for the Container.
   * This should be called only from the createContainer.
   * @param hddsVolume
   */
  public void setVolume(HddsVolume hddsVolume) {
    this.volume = hddsVolume;
  }

  /**
   * Returns the volume of the Container.
   * @return HddsVolume
   */
  public HddsVolume getVolume() {
    return volume;
  }

  /**
   * Increments the number of keys in the container.
   */
  public void incrKeyCount() {
    this.keyCount.incrementAndGet();
  }

  /**
   * Decrements number of keys in the container.
   */
  public void decrKeyCount() {
    this.keyCount.decrementAndGet();
  }

  /**
   * Decrease the count of keys in the container.
   *
   * @param deletedKeyCount
   */
  public void decrKeyCount(long deletedKeyCount) {
    this.keyCount.addAndGet(-1 * deletedKeyCount);
  }

  /**
   * Returns number of keys in the container.
   * @return key count
   */
  public long getKeyCount() {
    return this.keyCount.get();
  }

  /**
   * Set's number of keys in the container.
   * @param count
   */
  public void setKeyCount(long count) {
    this.keyCount.set(count);
  }

  public void setChecksumTo0ByteArray() {
    this.checksum = DUMMY_CHECKSUM;
  }

  public void setChecksum(String checkSum) {
    this.checksum = checkSum;
  }

  public String getChecksum() {
    return this.checksum;
  }

  /**
   * @return {@code Optional} with the timestamp of last data scan.
   * {@code absent} if not yet scanned or timestamp was not recorded.
   */
  public Optional<Instant> lastDataScanTime() {
    return lastDataScanTime;
  }

  public void updateDataScanTime(@Nullable Instant time) {
    lastDataScanTime = Optional.ofNullable(time);
    dataScanTimestamp = time != null ? time.toEpochMilli() : null;
  }

  // for deserialization
  public void setDataScanTimestamp(Long timestamp) {
    dataScanTimestamp = timestamp;
    lastDataScanTime = timestamp != null
        ? Optional.of(Instant.ofEpochMilli(timestamp))
        : Optional.empty();
  }

  public Long getDataScanTimestamp() {
    return dataScanTimestamp;
  }

  /**
   * Returns the origin pipeline Id of this container.
   * @return origin node Id
   */
  public String getOriginPipelineId() {
    return originPipelineId;
  }

  /**
   * Returns the origin node Id of this container.
   * @return origin node Id
   */
  public String getOriginNodeId() {
    return originNodeId;
  }

  /**
   * Compute the checksum for ContainerData using the specified Yaml (based
   * on ContainerType) and set the checksum.
   *
   * Checksum of ContainerData is calculated by setting the
   * {@link ContainerData#checksum} field to a 64-byte array with all 0's -
   * {@link ContainerData#DUMMY_CHECKSUM}. After the checksum is calculated,
   * the checksum field is updated with this value.
   *
   * @param yaml Yaml for ContainerType to get the ContainerData as Yaml String
   * @throws IOException
   */
  public void computeAndSetChecksum(Yaml yaml) throws IOException {
    // Set checksum to dummy value - 0 byte array, to calculate the checksum
    // of rest of the data.
    setChecksumTo0ByteArray();

    // Dump yaml data into a string to compute its checksum
    String containerDataYamlStr = yaml.dump(this);

    this.checksum = ContainerUtils.getChecksum(containerDataYamlStr);
  }

  /**
   * Returns a ProtoBuf Message from ContainerData.
   *
   * @return Protocol Buffer Message
   */
  public abstract ContainerProtos.ContainerDataProto getProtoBufMessage();

  /**
   * Returns the blockCommitSequenceId.
   */
  public abstract long getBlockCommitSequenceId();

  public void updateReadStats(long length) {
    incrReadCount();
    incrReadBytes(length);
  }

  public void updateWriteStats(long bytesWritten, boolean overwrite) {
    if (!overwrite) {
      incrBytesUsed(bytesWritten);
    }
    incrWriteCount();
    incrWriteBytes(bytesWritten);
  }

}
