package org.apache.helix.manager.zk;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.GroupCommit;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordAssembler;
import org.apache.helix.ZNRecordBucketizer;
import org.apache.helix.ZNRecordUpdater;
import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.StateModelDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.data.Stat;


public class ZKHelixDataAccessor implements HelixDataAccessor {
  private static Logger LOG = LoggerFactory.getLogger(ZKHelixDataAccessor.class);
  private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
  final InstanceType _instanceType;
  private final String _clusterName;
  private final Builder _propertyKeyBuilder;
  private final GroupCommit _groupCommit = new GroupCommit();

  public ZKHelixDataAccessor(String clusterName, BaseDataAccessor<ZNRecord> baseDataAccessor) {
    this(clusterName, null, baseDataAccessor);
  }

  public ZKHelixDataAccessor(String clusterName, InstanceType instanceType,
      BaseDataAccessor<ZNRecord> baseDataAccessor) {
    _clusterName = clusterName;
    _instanceType = instanceType;
    _baseDataAccessor = baseDataAccessor;
    _propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
  }

  /* Copy constructor */
  public ZKHelixDataAccessor(ZKHelixDataAccessor dataAccessor) {
    _clusterName = dataAccessor._clusterName;
    _instanceType = dataAccessor._instanceType;
    _baseDataAccessor = dataAccessor._baseDataAccessor;
    _propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
  }

  @Override
  public boolean createStateModelDef(StateModelDefinition stateModelDef) {
    String path = PropertyPathBuilder.stateModelDef(_clusterName, stateModelDef.getId());
    HelixProperty property =
        getProperty(new PropertyKey.Builder(_clusterName).stateModelDef(stateModelDef.getId()));

    // Set new StateModelDefinition if it is different from old one.
    if (property != null) {
      // StateModelDefinition need to be updated
      if (!new StateModelDefinition(property.getRecord()).equals(stateModelDef)) {
        return stateModelDef.isValid() && _baseDataAccessor
            .set(path, stateModelDef.getRecord(), AccessOption.PERSISTENT);
      }
    } else {
      // StateModeDefinition does not exist
      return stateModelDef.isValid() && _baseDataAccessor
          .create(path, stateModelDef.getRecord(), AccessOption.PERSISTENT);
    }
    // StateModelDefinition exists but not need to be updated
    return true;
  }

  @Override
  public boolean createControllerMessage(Message message) {
    return _baseDataAccessor.create(PropertyPathBuilder.controllerMessage(_clusterName,
        message.getMsgId()),
        message.getRecord(), AccessOption.PERSISTENT);
  }

  @Override
  public boolean createControllerLeader(LiveInstance leader) {
    return _baseDataAccessor.create(PropertyPathBuilder.controllerLeader(_clusterName), leader.getRecord(),
        AccessOption.EPHEMERAL);
  }

  @Override
  public boolean createPause(PauseSignal pauseSignal) {
    return _baseDataAccessor.create(
        PropertyPathBuilder.pause(_clusterName), pauseSignal.getRecord(), AccessOption.PERSISTENT);
  }

  @Override
  public boolean createMaintenance(MaintenanceSignal maintenanceSignal) {
    return _baseDataAccessor.set(PropertyPathBuilder.maintenance(_clusterName),
        maintenanceSignal.getRecord(), AccessOption.PERSISTENT);
  }

  @Override
  public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value) {
    PropertyType type = key.getType();
    if (!value.isValid()) {
      throw new HelixMetaDataAccessException("The ZNRecord for " + type + " is not valid.");
    }

    String path = key.getPath();
    int options = constructOptions(type);

    boolean success = false;
    switch (type) {
    case IDEALSTATES:
    case EXTERNALVIEW:
      // check if bucketized
      if (value.getBucketSize() > 0) {
        // set parent node
        ZNRecord metaRecord = new ZNRecord(value.getId());
        metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
        success = _baseDataAccessor.set(path, metaRecord, options);
        if (success) {
          ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());

          Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
          List<String> paths = new ArrayList<String>();
          List<ZNRecord> bucketizedRecords = new ArrayList<ZNRecord>();
          for (String bucketName : map.keySet()) {
            paths.add(path + "/" + bucketName);
            bucketizedRecords.add(map.get(bucketName));
          }

          // TODO: set success accordingly
          _baseDataAccessor.setChildren(paths, bucketizedRecords, options);
        }
      } else {
        success = _baseDataAccessor.set(path, value.getRecord(), options);
      }
      break;
    default:
      success = _baseDataAccessor.set(path, value.getRecord(), options);
      break;
    }
    return success;
  }

  @Override
  public <T extends HelixProperty> boolean updateProperty(PropertyKey key, T value) {
    return updateProperty(key, new ZNRecordUpdater(value.getRecord()), value);
  }

  @Override
  public <T extends HelixProperty> boolean updateProperty(PropertyKey key, DataUpdater<ZNRecord> updater, T value) {
    PropertyType type = key.getType();
    String path = key.getPath();
    int options = constructOptions(type);

    boolean success = false;
    switch (type) {
    case CURRENTSTATES:
      success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord(), true);
      break;
    case STATUSUPDATES:
      if (LOG.isTraceEnabled()) {
        LOG.trace("Update status. path: " + key.getPath() + ", record: " + value.getRecord());
      }
      break;
    default:
      success = _baseDataAccessor.update(path, updater, options);
      break;
    }
    return success;
  }

  @Deprecated
  @Override
  public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys) {
    return getProperty(keys, false);
  }

  @Override
  public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys,
      boolean throwException) throws HelixMetaDataAccessException {
    if (keys == null || keys.size() == 0) {
      return Collections.emptyList();
    }

    List<T> childValues = new ArrayList<T>();

    // read all records
    List<String> paths = new ArrayList<>();
    List<Stat> stats = new ArrayList<>();
    for (PropertyKey key : keys) {
      paths.add(key.getPath());
      stats.add(new Stat());
    }
    List<ZNRecord> children = _baseDataAccessor.get(paths, stats, 0, throwException);

    // check if bucketized
    for (int i = 0; i < keys.size(); i++) {
      PropertyKey key = keys.get(i);
      ZNRecord record = children.get(i);
      Stat stat = stats.get(i);

      PropertyType type = key.getType();
      String path = key.getPath();
      int options = constructOptions(type);
      if (record != null) {
        record.setCreationTime(stat.getCtime());
        record.setModifiedTime(stat.getMtime());
        record.setVersion(stat.getVersion());
        record.setEphemeralOwner(stat.getEphemeralOwner());
      }

      switch (type) {
      case CURRENTSTATES:
      case IDEALSTATES:
      case EXTERNALVIEW:
        // check if bucketized
        if (record != null) {
          HelixProperty property = new HelixProperty(record);

          int bucketSize = property.getBucketSize();
          if (bucketSize > 0) {
            // @see HELIX-574
            // clean up list and map fields in case we write to parent node by mistake
            property.getRecord().getMapFields().clear();
            property.getRecord().getListFields().clear();

            List<ZNRecord> childRecords = _baseDataAccessor.getChildren(path, null, options, 1, 0);
            ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);

            // merge with parent node value
            if (assembledRecord != null) {
              record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
              record.getListFields().putAll(assembledRecord.getListFields());
              record.getMapFields().putAll(assembledRecord.getMapFields());
            }
          }
        }
        break;
      default:
        break;
      }

      @SuppressWarnings("unchecked")
      T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
      childValues.add(t);
    }

    return childValues;
  }

  @Override
  public <T extends HelixProperty> T getProperty(PropertyKey key) {
    PropertyType type = key.getType();
    String path = key.getPath();
    int options = constructOptions(type);
    ZNRecord record = null;
    try {
      Stat stat = new Stat();
      record = _baseDataAccessor.get(path, stat, options);
      if (record != null) {
        record.setCreationTime(stat.getCtime());
        record.setModifiedTime(stat.getMtime());
        record.setVersion(stat.getVersion());
        record.setEphemeralOwner(stat.getEphemeralOwner());
      }
    } catch (ZkNoNodeException e) {
      // OK
    }

    switch (type) {
    case CURRENTSTATES:
    case IDEALSTATES:
    case EXTERNALVIEW:
      // check if bucketized
      if (record != null) {
        HelixProperty property = new HelixProperty(record);

        int bucketSize = property.getBucketSize();
        if (bucketSize > 0) {
          // @see HELIX-574
          // clean up list and map fields in case we write to parent node by mistake
          property.getRecord().getMapFields().clear();
          property.getRecord().getListFields().clear();

          List<ZNRecord> childRecords = _baseDataAccessor.getChildren(path, null, options);
          ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);

          // merge with parent node value
          if (assembledRecord != null) {
            record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
            record.getListFields().putAll(assembledRecord.getListFields());
            record.getMapFields().putAll(assembledRecord.getMapFields());
          }
        }
      }
      break;
    default:
      break;
    }

    @SuppressWarnings("unchecked")
    T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
    return t;
  }

  @Override
  public HelixProperty.Stat getPropertyStat(PropertyKey key) {
    PropertyType type = key.getType();
    String path = key.getPath();
    int options = constructOptions(type);
    try {
      Stat stat = _baseDataAccessor.getStat(path, options);
      if (stat != null) {
        return new HelixProperty.Stat(stat.getVersion(), stat.getCtime(), stat.getMtime(), stat.getEphemeralOwner());
      }
    } catch (ZkNoNodeException e) {

    }

    return null;
  }

  @Override
  public List<HelixProperty.Stat> getPropertyStats(List<PropertyKey> keys) {
    if (keys == null || keys.size() == 0) {
      return Collections.emptyList();
    }

    List<HelixProperty.Stat> propertyStats = new ArrayList<>(keys.size());
    List<String> paths = new ArrayList<>(keys.size());
    for (PropertyKey key : keys) {
      paths.add(key.getPath());
    }
    Stat[] zkStats = _baseDataAccessor.getStats(paths, 0);

    for (int i = 0; i < keys.size(); i++) {
      Stat zkStat = zkStats[i];
      HelixProperty.Stat propertyStat = null;
      if (zkStat != null) {
        propertyStat =
            new HelixProperty.Stat(zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(), zkStat.getEphemeralOwner());
      }
      propertyStats.add(propertyStat);
    }

    return propertyStats;
  }

  @Override
  public boolean removeProperty(PropertyKey key) {
    PropertyType type = key.getType();
    String path = key.getPath();
    int options = constructOptions(type);

    return _baseDataAccessor.remove(path, options);
  }

  @Override
  public List<String> getChildNames(PropertyKey key) {
    PropertyType type = key.getType();
    String parentPath = key.getPath();
    int options = constructOptions(type);
    List<String> childNames = _baseDataAccessor.getChildNames(parentPath, options);
    if (childNames == null) {
      childNames = Collections.emptyList();
    }
    return childNames;
  }

  @Deprecated
  @Override
  public <T extends HelixProperty> List<T> getChildValues(PropertyKey key) {
    return getChildValues(key, false);
  }

  @Override
  public <T extends HelixProperty> List<T> getChildValues(PropertyKey key, boolean throwException) {
    PropertyType type = key.getType();
    String parentPath = key.getPath();
    int options = constructOptions(type);
    List<T> childValues = new ArrayList<T>();
    List<ZNRecord> children;
    if (throwException) {
      children = _baseDataAccessor.getChildren(parentPath, null, options, 1, 0);
    } else {
      children = _baseDataAccessor.getChildren(parentPath, null, options);
    }
    if (children != null) {
      for (ZNRecord record : children) {
        switch (type) {
        case CURRENTSTATES:
        case IDEALSTATES:
        case EXTERNALVIEW:
          if (record != null) {
            HelixProperty property = new HelixProperty(record);

            int bucketSize = property.getBucketSize();
            if (bucketSize > 0) {
              // TODO: fix this if record.id != pathName
              String childPath = parentPath + "/" + record.getId();
              List<ZNRecord> childRecords;
              if (throwException) {
                childRecords = _baseDataAccessor.getChildren(childPath, null, options, 1, 0);
              } else {
                childRecords = _baseDataAccessor.getChildren(childPath, null, options);
              }
              ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);

              // merge with parent node value
              if (assembledRecord != null) {
                record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
                record.getListFields().putAll(assembledRecord.getListFields());
                record.getMapFields().putAll(assembledRecord.getMapFields());
              }
            }
          }

          break;
        default:
          break;
        }

        if (record != null) {
          @SuppressWarnings("unchecked")
          T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
          childValues.add(t);
        }
      }
    }
    return childValues;
  }

  @Deprecated
  @Override
  public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key) {
    return getChildValuesMap(key, false);
  }

  @Override
  public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key,
      boolean throwException) {
    PropertyType type = key.getType();
    String parentPath = key.getPath();
    int options = constructOptions(type);
    List<T> children = getChildValues(key, throwException);
    Map<String, T> childValuesMap = new HashMap<String, T>();
    for (T t : children) {
      childValuesMap.put(t.getRecord().getId(), t);
    }
    return childValuesMap;
  }

  @Override
  public Builder keyBuilder() {
    return _propertyKeyBuilder;
  }

  private int constructOptions(PropertyType type) {
    int options = 0;
    if (type.isPersistent()) {
      options = options | AccessOption.PERSISTENT;
    } else {
      options = options | AccessOption.EPHEMERAL;
    }

    return options;
  }

  @Override
  public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys, List<T> children) {
    // TODO: add validation
    int options = -1;
    List<String> paths = new ArrayList<String>();
    List<ZNRecord> records = new ArrayList<ZNRecord>();
    for (int i = 0; i < keys.size(); i++) {
      PropertyKey key = keys.get(i);
      PropertyType type = key.getType();
      String path = key.getPath();
      paths.add(path);
      HelixProperty value = children.get(i);
      records.add(value.getRecord());
      options = constructOptions(type);
    }
    return _baseDataAccessor.createChildren(paths, records, options);
  }

  @Override
  public <T extends HelixProperty> boolean[] setChildren(List<PropertyKey> keys, List<T> children) {
    int options = -1;
    List<String> paths = new ArrayList<String>();
    List<ZNRecord> records = new ArrayList<ZNRecord>();

    List<List<String>> bucketizedPaths =
        new ArrayList<List<String>>(Collections.<List<String>> nCopies(keys.size(), null));
    List<List<ZNRecord>> bucketizedRecords =
        new ArrayList<List<ZNRecord>>(Collections.<List<ZNRecord>> nCopies(keys.size(), null));

    for (int i = 0; i < keys.size(); i++) {
      PropertyKey key = keys.get(i);
      PropertyType type = key.getType();
      String path = key.getPath();
      paths.add(path);
      options = constructOptions(type);

      HelixProperty value = children.get(i);

      switch (type) {
      case EXTERNALVIEW:
        if (value.getBucketSize() == 0) {
          records.add(value.getRecord());
        } else {

          ZNRecord metaRecord = new ZNRecord(value.getId());
          metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
          records.add(metaRecord);

          ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());

          Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
          List<String> childBucketizedPaths = new ArrayList<String>();
          List<ZNRecord> childBucketizedRecords = new ArrayList<ZNRecord>();
          for (String bucketName : map.keySet()) {
            childBucketizedPaths.add(path + "/" + bucketName);
            childBucketizedRecords.add(map.get(bucketName));
          }
          bucketizedPaths.set(i, childBucketizedPaths);
          bucketizedRecords.set(i, childBucketizedRecords);
        }
        break;
      case STATEMODELDEFS:
        if (value.isValid()) {
          records.add(value.getRecord());
        }
        break;
      default:
        records.add(value.getRecord());
        break;
      }
    }

    // set non-bucketized nodes or parent nodes of bucketized nodes
    boolean success[] = _baseDataAccessor.setChildren(paths, records, options);

    // set bucketized nodes
    List<String> allBucketizedPaths = new ArrayList<String>();
    List<ZNRecord> allBucketizedRecords = new ArrayList<ZNRecord>();

    for (int i = 0; i < keys.size(); i++) {
      if (success[i] && bucketizedPaths.get(i) != null) {
        allBucketizedPaths.addAll(bucketizedPaths.get(i));
        allBucketizedRecords.addAll(bucketizedRecords.get(i));
      }
    }

    // TODO: set success accordingly
    _baseDataAccessor.setChildren(allBucketizedPaths, allBucketizedRecords, options);

    return success;
  }

  @Override
  public BaseDataAccessor<ZNRecord> getBaseDataAccessor() {
    return _baseDataAccessor;
  }

  @Override
  public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
      List<DataUpdater<ZNRecord>> updaters, int options) {
    return _baseDataAccessor.updateChildren(paths, updaters, options);
  }
}
