package org.apache.helix;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.manager.zk.serializer.JacksonPayloadSerializer;
import org.apache.helix.manager.zk.serializer.PayloadSerializer;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Generic Record Format to store data at a Node This can be used to store
 * simpleFields mapFields listFields
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonSerialize(include = Inclusion.NON_NULL)
public class ZNRecord {
  static Logger _logger = LoggerFactory.getLogger(ZNRecord.class);
  private final String id;

  @JsonIgnore(true)
  public static final String LIST_FIELD_BOUND = "listField.bound";

  @JsonIgnore(true)
  public static final int SIZE_LIMIT = 1000 * 1024; // leave a margin out of 1M

  // We don't want the _deltaList to be serialized and deserialized
  private List<ZNRecordDelta> _deltaList = new ArrayList<ZNRecordDelta>();

  private Map<String, String> simpleFields;
  private Map<String, Map<String, String>> mapFields;
  private Map<String, List<String>> listFields;
  private byte[] rawPayload;

  private PayloadSerializer _serializer;

  // the version field of zookeeper Stat
  private int _version;

  private long _creationTime;

  private long _modifiedTime;

  private long _ephemeralOwner;

  /**
   * Initialize with an identifier
   * @param id
   */
  @JsonCreator
  public ZNRecord(@JsonProperty("id") String id) {
    this.id = id;
    simpleFields = new TreeMap<>();
    mapFields = new TreeMap<>();
    listFields = new TreeMap<>();
    rawPayload = null;
    _serializer = new JacksonPayloadSerializer();
  }

  /**
   * Initialize with a pre-populated ZNRecord
   * @param record
   */
  public ZNRecord(ZNRecord record) {
    this(record, record.getId());
  }

  /**
   * Initialize with a pre-populated ZNRecord, overwriting the identifier
   * @param record
   * @param id
   */
  public ZNRecord(ZNRecord record, String id) {
    this(id);
    simpleFields.putAll(record.getSimpleFields());
    mapFields.putAll(record.getMapFields());
    listFields.putAll(record.getListFields());
    if (record.rawPayload != null) {
      rawPayload = new byte[record.rawPayload.length];
      System.arraycopy(record.rawPayload, 0, rawPayload, 0, record.rawPayload.length);
    } else {
      rawPayload = null;
    }
    _version = record.getVersion();
    _creationTime = record.getCreationTime();
    _modifiedTime = record.getModifiedTime();
    _ephemeralOwner = record.getEphemeralOwner();
  }

  /**
   * Set a custom {@link PayloadSerializer} to allow including arbitrary data
   * @param serializer
   */
  @JsonIgnore(true)
  public void setPayloadSerializer(PayloadSerializer serializer) {
    _serializer = serializer;
  }

  /**
   * Set the list of updates to this ZNRecord
   * @param deltaList
   */
  @JsonIgnore(true)
  public void setDeltaList(List<ZNRecordDelta> deltaList) {
    _deltaList = deltaList;
  }

  /**
   * Get the list of updates to this ZNRecord
   * @return list of {@link ZNRecordDelta}
   */
  @JsonIgnore(true)
  public List<ZNRecordDelta> getDeltaList() {
    return _deltaList;
  }

  /**
   * Get all plain key, value fields
   * @return Map of simple fields
   */
  @JsonProperty
  public Map<String, String> getSimpleFields() {
    return simpleFields;
  }

  /**
   * Set all plain key, value fields
   * @param simpleFields
   */
  @JsonProperty
  public void setSimpleFields(Map<String, String> simpleFields) {
    this.simpleFields = simpleFields;
  }

  /**
   * Get all fields whose values are key, value properties
   * @return all map fields
   */
  @JsonProperty
  public Map<String, Map<String, String>> getMapFields() {
    return mapFields;
  }

  /**
   * Set all fields whose values are key, value properties
   * @param mapFields
   */
  @JsonProperty
  public void setMapFields(Map<String, Map<String, String>> mapFields) {
    this.mapFields = mapFields;
  }

  /**
   * Get all fields whose values are a list of values
   * @return all list fields
   */
  @JsonProperty
  public Map<String, List<String>> getListFields() {
    return listFields;
  }

  /**
   * Set all fields whose values are a list of values
   * @param listFields
   */
  @JsonProperty
  public void setListFields(Map<String, List<String>> listFields) {
    this.listFields = listFields;
  }

  /**
   * Set a simple key, value field
   * @param k
   * @param v
   */
  @JsonProperty
  public void setSimpleField(String k, String v) {
    simpleFields.put(k, v);
  }

  @JsonProperty
  public String getId() {
    return id;
  }

  /**
   * Set arbitrary data serialized as a byte array payload. Consider using
   * {@link #setPayload(Object)} instead
   * @param payload
   */
  @JsonProperty
  public void setRawPayload(byte[] payload) {
    rawPayload = payload;
  }

  /**
   * Get arbitrary data serialized as a byte array payload. Consider using
   * {@link #getPayload(Class)} instead
   * @return
   */
  @JsonProperty
  public byte[] getRawPayload() {
    return rawPayload;
  }

  /**
   * Set a typed payload that will be serialized and persisted.
   * @param payload
   */
  @JsonIgnore(true)
  public <T> void setPayload(T payload) {
    if (_serializer != null && payload != null) {
      rawPayload = _serializer.serialize(payload);
    } else {
      rawPayload = null;
    }
  }

  /**
   * Get a typed deserialized payload
   * @param clazz
   * @return
   */
  @JsonIgnore(true)
  public <T> T getPayload(Class<T> clazz) {
    if (_serializer != null && rawPayload != null) {
      return _serializer.deserialize(clazz, rawPayload);
    } else {
      return null;
    }
  }

  /**
   * Set a single String --> Map field
   * @param k
   * @param v
   */
  public void setMapField(String k, Map<String, String> v) {
    mapFields.put(k, v);
  }

  /**
   * Set a single String --> List field
   * @param k
   * @param v
   */
  public void setListField(String k, List<String> v) {
    listFields.put(k, v);
  }

  /**
   * Get a single String field
   * @param k
   * @return String field
   */
  public String getSimpleField(String k) {
    return simpleFields.get(k);
  }

  /**
   * Get a single Map field
   * @param k
   * @return String --> String map
   */
  public Map<String, String> getMapField(String k) {
    return mapFields.get(k);
  }

  /**
   * Get a single List field
   * @param k
   * @return String list
   */
  public List<String> getListField(String k) {
    return listFields.get(k);
  }

  /**
   * Set a single simple int field
   * @param k
   * @param v
   */
  public void setIntField(String k, int v) {
    setSimpleField(k, Integer.toString(v));
  }

  /**
   * Get a single int field
   * @param k
   * @param defaultValue
   * @return the int value, or defaultValue if not present
   */
  public int getIntField(String k, int defaultValue) {
    int v = defaultValue;
    String valueStr = getSimpleField(k);
    if (valueStr != null) {
      try {
        v = Integer.parseInt(valueStr);
      } catch (NumberFormatException e) {
        _logger.warn("", e);
      }
    }
    return v;
  }

  /**
   * Set a single simple long field
   * @param k
   * @param v
   */
  public void setLongField(String k, long v) {
    setSimpleField(k, Long.toString(v));
  }

  /**
   * Get a single long field
   * @param k
   * @param defaultValue
   * @return the long value, or defaultValue if not present
   */
  public long getLongField(String k, long defaultValue) {
    long v = defaultValue;
    String valueStr = getSimpleField(k);
    if (valueStr != null) {
      try {
        v = Long.parseLong(valueStr);
      } catch (NumberFormatException e) {
        _logger.warn("", e);
      }
    }
    return v;
  }

  /**
   * Set a single simple double field
   * @param k
   * @param v
   */
  public void setDoubleField(String k, double v) {
    setSimpleField(k, Double.toString(v));
  }

  /**
   * Get a single double field
   * @param k
   * @param defaultValue
   * @return the double value, or defaultValue if not present
   */
  public double getDoubleField(String k, double defaultValue) {
    double v = defaultValue;
    String valueStr = getSimpleField(k);
    if (valueStr != null) {
      try {
        v = Double.parseDouble(valueStr);
      } catch (NumberFormatException e) {
        _logger.warn("", e);
      }
    }
    return v;
  }

  /**
   * Set a single simple boolean field
   * @param k
   * @param v
   */
  public void setBooleanField(String k, boolean v) {
    setSimpleField(k, Boolean.toString(v));
  }

  /**
   * Get a single boolean field
   * @param k
   * @param defaultValue
   * @return the boolean field, or defaultValue if not present
   */
  public boolean getBooleanField(String k, boolean defaultValue) {
    boolean v = defaultValue;
    String valueStr = getSimpleField(k);
    if (valueStr != null) {
      // Boolean.parseBoolean() doesn't throw an exception if the string isn't a valid boolean.
      // Thus, a direct comparison is necessary to make sure the value is actually "true" or
      // "false"
      if (valueStr.equalsIgnoreCase(Boolean.TRUE.toString())) {
        v = true;
      } else if (valueStr.equalsIgnoreCase(Boolean.FALSE.toString())) {
        v = false;
      }
    }
    return v;
  }

  /**
   * Set a single simple Enum field
   * @param k
   * @param v
   */
  public <T extends Enum<T>> void setEnumField(String k, T v) {
    setSimpleField(k, v.toString());
  }

  /**
   * Get a single Enum field
   * @param k
   * @param enumType
   * @param defaultValue
   * @return the Enum field of enumType, or defaultValue if not present
   */
  public <T extends Enum<T>> T getEnumField(String k, Class<T> enumType, T defaultValue) {
    T v = defaultValue;
    String valueStr = getSimpleField(k);
    if (valueStr != null) {
      try {
        v = Enum.valueOf(enumType, valueStr);
      } catch (NullPointerException e) {
        _logger.warn("", e);
      } catch (IllegalArgumentException e) {
        _logger.warn("", e);
      }
    }
    return v;
  }

  /**
   * Get a single String field
   * @param k
   * @param defaultValue
   * @return the String value, or defaultValue if not present
   */
  public String getStringField(String k, String defaultValue) {
    String v = defaultValue;
    String valueStr = getSimpleField(k);
    if (valueStr != null) {
      v = valueStr;
    }
    return v;
  }

  @Override
  public String toString() {
    StringBuffer sb = new StringBuffer();
    sb.append(id + ", ");
    if (simpleFields != null) {
      sb.append(simpleFields);
    }
    if (mapFields != null) {
      sb.append(mapFields);
    }
    if (listFields != null) {
      sb.append(listFields);
    }
    return sb.toString();
  }

  /**
   * merge functionality is used to merge multiple znrecord into a single one.
   * This will make use of the id of each ZNRecord and append it to every key
   * thus making key unique. This is needed to optimize on the watches.
   * @param record
   */
  public void merge(ZNRecord record) {
    if (record == null) {
      return;
    }

    if (record.getDeltaList().size() > 0) {
      _logger.info("Merging with delta list, recordId = " + id + " other:" + record.getId());
      merge(record.getDeltaList());
      return;
    }
    simpleFields.putAll(record.simpleFields);
    for (String key : record.mapFields.keySet()) {
      Map<String, String> map = mapFields.get(key);
      if (map != null) {
        map.putAll(record.mapFields.get(key));
      } else {
        mapFields.put(key, record.mapFields.get(key));
      }
    }
    for (String key : record.listFields.keySet()) {
      List<String> list = listFields.get(key);
      if (list != null) {
        list.addAll(record.listFields.get(key));
      } else {
        listFields.put(key, record.listFields.get(key));
      }
    }
  }

  /**
   * Replace functionality is used to update this ZNRecord with the given ZNRecord. The value of a
   * field in this record will be replaced with the value of the same field in given record if it
   * presents. If there is new field in given ZNRecord but not in this record, add that field into
   * this record.
   * The list fields and map fields will be replaced as a single entry.
   *
   * @param record
   */
  public void update(ZNRecord record) {
    if (record != null) {
      simpleFields.putAll(record.simpleFields);
      listFields.putAll(record.listFields);
      mapFields.putAll(record.mapFields);
    }
  }

  /**
   * Merge in a {@link ZNRecordDelta} corresponding to its merge policy
   * @param delta
   */
  void merge(ZNRecordDelta delta) {
    if (delta.getMergeOperation() == MergeOperation.ADD) {
      merge(delta.getRecord());
    } else if (delta.getMergeOperation() == MergeOperation.SUBTRACT) {
      subtract(delta.getRecord());
    } else if (delta.getMergeOperation() == MergeOperation.UPDATE) {
      update(delta.getRecord());
    }
  }

  /**
   * Batch merge of {@link ZNRecordDelta}
   * @see #merge(ZNRecordDelta)
   * @param deltaList
   */
  void merge(List<ZNRecordDelta> deltaList) {
    for (ZNRecordDelta delta : deltaList) {
      merge(delta);
    }
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof ZNRecord)) {
      return false;
    }
    ZNRecord that = (ZNRecord) obj;
    if (this.getSimpleFields().size() != that.getSimpleFields().size()) {
      return false;
    }
    if (this.getMapFields().size() != that.getMapFields().size()) {
      return false;
    }
    if (this.getListFields().size() != that.getListFields().size()) {
      return false;
    }
    if (!this.getSimpleFields().equals(that.getSimpleFields())) {
      return false;
    }
    if (!this.getMapFields().equals(that.getMapFields())) {
      return false;
    }
    if (!this.getListFields().equals(that.getListFields())) {
      return false;
    }

    return true;
  }

  /**
   * Subtract value from this ZNRecord
   * Note: does not support subtract in each list in list fields or map in
   * mapFields
   * @param value
   */
  public void subtract(ZNRecord value) {
    for (String key : value.getSimpleFields().keySet()) {
      simpleFields.remove(key);
    }

    for (String key : value.getListFields().keySet()) {
      listFields.remove(key);
    }

    for (String key : value.getMapFields().keySet()) {
      Map<String, String> map = value.getMapField(key);
      if (map == null) {
        mapFields.remove(key);
      } else {
        Map<String, String> nestedMap = mapFields.get(key);
        if (nestedMap != null) {
          for (String mapKey : map.keySet()) {
            nestedMap.remove(mapKey);
          }
          if (nestedMap.size() == 0) {
            mapFields.remove(key);
          }
        }
      }
    }
  }

  /**
   * Get the version of this record
   * @return version number
   */
  @JsonIgnore(true)
  public int getVersion() {
    return _version;
  }

  /**
   * Set the version of this record
   * @param version
   */
  @JsonIgnore(true)
  public void setVersion(int version) {
    _version = version;
  }

  /**
   * Get the time that this record was created
   * @return UNIX timestamp
   */
  @JsonIgnore(true)
  public long getCreationTime() {
    return _creationTime;
  }

  /**
   * Set the time that this record was created
   * @param creationTime
   */
  @JsonIgnore(true)
  public void setCreationTime(long creationTime) {
    _creationTime = creationTime;
  }

  /**
   * Get the time that this record was last modified
   * @return UNIX timestamp
   */
  @JsonIgnore(true)
  public long getModifiedTime() {
    return _modifiedTime;
  }

  /**
   * Set the time that this record was last modified
   * @param modifiedTime
   */
  @JsonIgnore(true)
  public void setModifiedTime(long modifiedTime) {
    _modifiedTime = modifiedTime;
  }

  /**
   * Get the session Id of ephemeral node owner
   */
  @JsonIgnore(true)
  public long getEphemeralOwner() {
    return _ephemeralOwner;
  }

  /**
   * Set the session Id of ephemeral node owner
   * @param ephemeralOwner
   */
  @JsonIgnore(true)
  public void setEphemeralOwner(long ephemeralOwner) {
    _ephemeralOwner = ephemeralOwner;
  }
}
