package org.apache.helix.util;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyType;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.AbstractRebalancer;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;

public final class HelixUtil {
  static private Logger LOG = LoggerFactory.getLogger(HelixUtil.class);

  private HelixUtil() {
  }

  public static String getInstanceNameFromPath(String path) {
    // path structure
    // /<cluster_name>/instances/<instance_name>/[currentStates/messages]
    if (path.contains("/" + PropertyType.INSTANCES + "/")) {
      String[] split = path.split("\\/");
      if (split.length > 3) {
        return split[3];
      }
    }
    return null;
  }

  /**
   * get the parent-path of given path
   * return "/" string if path = "/xxx", null if path = "/"
   * @param path
   * @return
   */
  public static String getZkParentPath(String path) {
    if (path.equals("/")) {
      return null;
    }

    int idx = path.lastIndexOf('/');
    return idx == 0 ? "/" : path.substring(0, idx);
  }

  /**
   * get the last part of the zk-path
   * @param path
   * @return
   */
  public static String getZkName(String path) {
    return path.substring(path.lastIndexOf('/') + 1);
  }

  public static String serializeByComma(List<String> objects) {
    return Joiner.on(",").join(objects);
  }

  public static List<String> deserializeByComma(String object) {
    if (object.length() == 0) {
      return Collections.EMPTY_LIST;
    }
    return Arrays.asList(object.split(","));
  }

  /**
   * parse a csv-formated key-value pairs
   * @param keyValuePairs : csv-formatted key-value pairs. e.g. k1=v1,k2=v2,...
   * @return
   */
  public static Map<String, String> parseCsvFormatedKeyValuePairs(String keyValuePairs) {
    String[] pairs = keyValuePairs.split("[\\s,]");
    Map<String, String> keyValueMap = new TreeMap<String, String>();
    for (String pair : pairs) {
      int idx = pair.indexOf('=');
      if (idx == -1) {
        LOG.error("Invalid key-value pair: " + pair + ". Igonore it.");
        continue;
      }

      String key = pair.substring(0, idx);
      String value = pair.substring(idx + 1);
      keyValueMap.put(key, value);
    }
    return keyValueMap;
  }

  /**
   * Attempts to load the class and delegates to TCCL if class is not found.
   * Note: The approach is used as a last resort for environments like OSGi.
   * @param className
   * @return
   * @throws ClassNotFoundException
   */
  public static <T> Class<?> loadClass(Class<T> clazz, String className)
      throws ClassNotFoundException {
    try {
      return clazz.getClassLoader().loadClass(className);
    } catch (ClassNotFoundException ex) {
      if (Thread.currentThread().getContextClassLoader() != null) {
        return Thread.currentThread().getContextClassLoader().loadClass(className);
      } else {
        throw ex;
      }
    }
  }

  /**
   * This method provides the ideal state mapping with corresponding rebalance strategy
   * @param clusterConfig         The cluster config
   * @param instanceConfigs       List of instance configs
   * @param liveInstances         List of live instance names
   * @param idealState            The ideal state of current resource. If input is null, will be
   *                              treated as newly created resource.
   * @param partitions            The list of partition names
   * @param strategyClassName          The rebalance strategy. e.g. AutoRebalanceStrategy
   * @return A map of ideal state assignment as partition -> instance -> state
   */
  public static Map<String, Map<String, String>> getIdealAssignmentForFullAuto(
      ClusterConfig clusterConfig, List<InstanceConfig> instanceConfigs, List<String> liveInstances,
      IdealState idealState, List<String> partitions, String strategyClassName)
      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
    List<String> allNodes = new ArrayList<>();
    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
    for (InstanceConfig instanceConfig : instanceConfigs) {
      allNodes.add(instanceConfig.getInstanceName());
      instanceConfigMap.put(instanceConfig.getInstanceName(), instanceConfig);
    }
    ResourceControllerDataProvider cache = new ResourceControllerDataProvider();
    cache.setClusterConfig(clusterConfig);
    cache.setInstanceConfigMap(instanceConfigMap);

    StateModelDefinition stateModelDefinition =
        BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef())
            .getStateModelDefinition();

    RebalanceStrategy strategy =
        RebalanceStrategy.class.cast(loadClass(HelixUtil.class, strategyClassName).newInstance());

    strategy.init(idealState.getResourceName(), partitions, stateModelDefinition
            .getStateCountMap(liveInstances.size(), Integer.parseInt(idealState.getReplicas())),
        idealState.getMaxPartitionsPerInstance());

    Map<String, List<String>> preferenceLists = strategy
        .computePartitionAssignment(allNodes, liveInstances,
            new HashMap<String, Map<String, String>>(), cache).getListFields();

    Map<String, Map<String, String>> idealStateMapping = new HashMap<>();
    Set<String> liveInstanceSet = new HashSet<>(liveInstances);
    for (String partitionName : preferenceLists.keySet()) {
      idealStateMapping.put(partitionName,
          computeIdealMapping(preferenceLists.get(partitionName), stateModelDefinition,
              liveInstanceSet));
    }
    return idealStateMapping;
  }

  /**
   * compute the ideal mapping for resource in Full-Auto and Semi-Auto based on its preference list
   */
  public static Map<String, String> computeIdealMapping(List<String> preferenceList,
      StateModelDefinition stateModelDef, Set<String> liveAndEnabled) {
    Map<String, String> idealStateMap = new HashMap<String, String>();

    if (preferenceList == null) {
      return idealStateMap;
    }

    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
    Set<String> assigned = new HashSet<String>();

    for (String state : statesPriorityList) {
      int stateCount = AbstractRebalancer.getStateCount(state, stateModelDef, liveAndEnabled.size(),
          preferenceList.size());
      for (String instance : preferenceList) {
        if (stateCount <= 0) {
          break;
        }
        if (!assigned.contains(instance)) {
          idealStateMap.put(instance, state);
          assigned.add(instance);
          stateCount--;
        }
      }
    }

    return idealStateMap;
  }

  /**
   * Remove the given message from ZK using the given accessor. This function will
   * not throw exception
   * @param accessor HelixDataAccessor
   * @param msg message to remove
   * @param instanceName name of the instance on which the message sits
   * @return true if success else false
   */
  public static boolean removeMessageFromZK(HelixDataAccessor accessor, Message msg,
      String instanceName) {
    try {
      return accessor.removeProperty(msg.getKey(accessor.keyBuilder(), instanceName));
    } catch (Exception e) {
      LOG.error("Caught exception while removing message {}.", msg, e);
    }
    return false;
  }

  /**
   * Get the value of system property
   * @param propertyKey
   * @param propertyDefaultValue
   * @return
   */
  public static int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
    String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue);

    try {
      int value = Integer.parseInt(valueString);
      if (value > 0) {
        return value;
      }
    } catch (NumberFormatException e) {
      LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
          + ", using default value: " + propertyDefaultValue);
    }

    return propertyDefaultValue;
  }

  /**
   * Get the value of system property
   * @param propertyKey
   * @param propertyDefaultValue
   * @return
   */
  public static long getSystemPropertyAsLong(String propertyKey, long propertyDefaultValue) {
    String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue);

    try {
      long value = Long.parseLong(valueString);
      if (value > 0) {
        return value;
      }
    } catch (NumberFormatException e) {
      LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
          + ", using default value: " + propertyDefaultValue);
    }

    return propertyDefaultValue;
  }

}
