/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.yarn.core;

import java.io.PrintStream;
import java.util.List;
import java.util.Map;

import org.apache.drill.yarn.appMaster.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigList;
import com.typesafe.config.ConfigValue;

public class ClusterDef {

  private static Logger logger = LoggerFactory.getLogger(ClusterDef.class);

  // The following keys are relative to the cluster group definition

  public static final String GROUP_NAME = "name";
  public static final String GROUP_TYPE = "type";
  public static final String GROUP_SIZE = "count";

  // For the labeled pool

  public static final String DRILLBIT_LABEL = "drillbit-label-expr";
  public static final String AM_LABEL = "am-label-expr";

  /**
   * Defined cluster tier types. The value of the type appears as the value of
   * the {@link $CLUSTER_TYPE} parameter in the config file.
   */

  public enum GroupType {
    BASIC,
    LABELED;

    public static GroupType toEnum(String value) {
      return GroupType.valueOf( value.toUpperCase() );
    }

    public String toValue() {
      return name().toLowerCase();
    }
  }

  public static class ClusterGroup {
    private final String name;
    private final int count;
    private final GroupType type;

    public ClusterGroup( Map<String, Object> group, int index, GroupType type ) {
      this.type = type;

      // Config system has already parsed the value. We insist that the value,
      // when parsed, was interpreted as an integer. That is, the value had
      // to be, say 10. Not "10", not 10.0, but just a plain integer.

      try {
        count = (Integer) group.get(GROUP_SIZE);
      } catch (ClassCastException e) {
        throw new IllegalArgumentException(
            "Expected an integer for " + GROUP_SIZE + " for tier " + index);
      }
      Object nameValue = group.get(GROUP_NAME);
      String theName = null;
      if (nameValue != null) {
        theName = nameValue.toString();
      }
      if (DoYUtil.isBlank(theName)) {
        theName = "tier-" + Integer.toString(index);
      }
      name = theName;
    }


    public String getName( ) { return name; }
    public int getCount( ) { return count; }
    public GroupType getType( ) { return type; }

    public void getPairs(int index, List<NameValuePair> pairs) {
      String key = DrillOnYarnConfig.append(DrillOnYarnConfig.CLUSTERS,
          Integer.toString(index));
      addPairs(pairs, key);
    }

    protected void addPairs(List<NameValuePair> pairs, String key) {
      pairs.add(
          new NameValuePair(DrillOnYarnConfig.append(key, GROUP_NAME), name));
      pairs.add(
          new NameValuePair(DrillOnYarnConfig.append(key, GROUP_TYPE), type));
      pairs.add(
          new NameValuePair(DrillOnYarnConfig.append(key, GROUP_SIZE), count));
    }

    public void dump(String prefix, PrintStream out) {
      out.print(prefix);
      out.print("name = ");
      out.println(name);
      out.print(prefix);
      out.print("type = ");
      out.println(type.toValue());
      out.print(prefix);
      out.print("count = ");
      out.println(count);
    }

    public void modifyTaskSpec(TaskSpec taskSpec) {
    }
  }

  public static class BasicGroup extends ClusterGroup {

    public BasicGroup(Map<String, Object> pool, int index) {
      super(pool, index, GroupType.BASIC);
    }

  }

  public static class LabeledGroup extends ClusterGroup {

    private final String drillbitLabelExpr;

    public LabeledGroup(Map<String, Object> pool, int index) {
      super(pool, index, GroupType.LABELED);
      drillbitLabelExpr = (String) pool.get(DRILLBIT_LABEL);
      if (drillbitLabelExpr == null) {
        logger.warn("Labeled pool is missing the drillbit label expression ("
            + DRILLBIT_LABEL + "), will treat pool as basic.");
      }
    }

    public String getLabelExpr( ) { return drillbitLabelExpr; }

    @Override
    public void dump(String prefix, PrintStream out) {
      out.print(prefix);
      out.print("Drillbit label expr = ");
      out.println((drillbitLabelExpr == null) ? "<none>" : drillbitLabelExpr);
    }

    @Override
    protected void addPairs(List<NameValuePair> pairs, String key) {
      super.addPairs(pairs, key);
      pairs.add(new NameValuePair(DrillOnYarnConfig.append(key, DRILLBIT_LABEL),
          drillbitLabelExpr));
    }

    @Override
    public void modifyTaskSpec(TaskSpec taskSpec) {
      taskSpec.containerSpec.nodeLabelExpr = drillbitLabelExpr;
    }
  }

  /**
   * Deserialize a node tier from the configuration file.
   *
   * @param n
   * @return
   */

  public static ClusterGroup getCluster(Config config, int n) {
    int index = n + 1;
    ConfigList tiers = config.getList(DrillOnYarnConfig.CLUSTERS);
    ConfigValue value = tiers.get(n);
    if ( value == null ) {
      throw new IllegalArgumentException( "If cluster group is provided, it cannot be null: group " + index );
    }
    @SuppressWarnings("unchecked")
    Map<String, Object> tier = (Map<String, Object>) value.unwrapped();
    String type;
    try {
      type = tier.get(GROUP_TYPE).toString();
    } catch (NullPointerException e) {
      throw new IllegalArgumentException(
          "Pool type is required for cluster group " + index);
    }
    GroupType groupType = GroupType.toEnum(type);
    if (groupType == null) {
      throw new IllegalArgumentException(
          "Undefined type for cluster group " + index + ": " + type);
    }
    ClusterGroup tierDef;
    switch (groupType) {
    case BASIC:
      tierDef = new BasicGroup( tier, index );
      break;
    case LABELED:
      tierDef = new LabeledGroup( tier, index );
      break;
    default:
      assert false;
      throw new IllegalStateException(
          "Undefined cluster group type: " + groupType);
    }
    return tierDef;
  }
}
