/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.slider.providers.slideram;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.RoleKeys;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.SliderXmlConfKeys;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.AbstractLauncher;
import org.apache.slider.core.launch.JavaCommandLineBuilder;
import org.apache.slider.providers.AbstractClientProvider;
import org.apache.slider.providers.PlacementPolicy;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.slider.api.ResourceKeys.COMPONENT_INSTANCES;

/**
 * handles the setup of the Slider AM.
 * This keeps aspects of role, cluster validation and Clusterspec setup
 * out of the core slider client
 */
public class SliderAMClientProvider extends AbstractClientProvider
    implements SliderKeys {


  protected static final Logger log =
    LoggerFactory.getLogger(SliderAMClientProvider.class);
  protected static final String NAME = "SliderAM";
  public static final String INSTANCE_RESOURCE_BASE = PROVIDER_RESOURCE_BASE_ROOT +
                                                       "slideram/instance/";
  public static final String INTERNAL_JSON =
    INSTANCE_RESOURCE_BASE + "internal.json";
  public static final String APPCONF_JSON =
    INSTANCE_RESOURCE_BASE + "appconf.json";
  public static final String RESOURCES_JSON =
    INSTANCE_RESOURCE_BASE + "resources.json";

  public SliderAMClientProvider(Configuration conf) {
    super(conf);
  }

  /**
   * List of roles
   */
  public static final List<ProviderRole> ROLES =
    new ArrayList<ProviderRole>();

  public static final int KEY_AM = ROLE_AM_PRIORITY_INDEX;

  public static final ProviderRole APPMASTER =
      new ProviderRole(COMPONENT_AM, KEY_AM,
          PlacementPolicy.EXCLUDE_FROM_FLEXING);

  /**
   * Initialize role list
   */
  static {
    ROLES.add(APPMASTER);
  }

  @Override
  public String getName() {
    return NAME;
  }

  @Override
  public List<ProviderRole> getRoles() {
    return ROLES;
  }


  @Override //Client
  public void preflightValidateClusterConfiguration(SliderFileSystem sliderFileSystem,
                                                    String clustername,
                                                    Configuration configuration,
                                                    AggregateConf instanceDefinition,
                                                    Path clusterDirPath,
                                                    Path generatedConfDirPath,
                                                    boolean secure)
      throws SliderException, IOException {

    super.preflightValidateClusterConfiguration(sliderFileSystem, clustername, configuration, instanceDefinition, clusterDirPath, generatedConfDirPath, secure);
    //add a check for the directory being writeable by the current user
    String
      dataPath = instanceDefinition.getInternalOperations()
                                   .getGlobalOptions()
                                   .getMandatoryOption(
                                     InternalKeys.INTERNAL_DATA_DIR_PATH);

    Path path = new Path(dataPath);
    sliderFileSystem.verifyDirectoryWriteAccess(path);
    Path historyPath = new Path(clusterDirPath, SliderKeys.HISTORY_DIR_NAME);
    sliderFileSystem.verifyDirectoryWriteAccess(historyPath);
  }

  /**
   * Verify that an instance definition is considered valid by the provider
   * @param instanceDefinition instance definition
   * @throws SliderException if the configuration is not valid
   */
  public void validateInstanceDefinition(AggregateConf instanceDefinition, SliderFileSystem fs) throws
      SliderException {

    super.validateInstanceDefinition(instanceDefinition, fs);
    
    // make sure there is no negative entry in the instance count
    Map<String, Map<String, String>> instanceMap =
        instanceDefinition.getResources().components;
    for (Map.Entry<String, Map<String, String>> entry : instanceMap.entrySet()) {
      MapOperations mapOperations = new MapOperations(entry);
      int instances = mapOperations.getOptionInt(COMPONENT_INSTANCES, 0);
      if (instances < 0) {
        throw new BadClusterStateException(
            "Component %s has negative instance count: %d",
            mapOperations.name,
            instances);
      }
    }
  }
  
  /**
   * The Slider AM sets up all the dependency JARs above slider.jar itself
   * {@inheritDoc}
   */
  public void prepareAMAndConfigForLaunch(SliderFileSystem fileSystem,
      Configuration serviceConf,
      AbstractLauncher launcher,
      AggregateConf instanceDescription,
      Path snapshotConfDirPath,
      Path generatedConfDirPath,
      Configuration clientConfExtras,
      String libdir,
      Path tempPath, boolean miniClusterTestRun)
    throws IOException, SliderException {

    Map<String, LocalResource> providerResources =
        new HashMap<String, LocalResource>();


    ProviderUtils.addProviderJar(providerResources,
        this,
        SLIDER_JAR,
        fileSystem,
        tempPath,
        libdir,
        miniClusterTestRun);

    String libDirProp =
        System.getProperty(SliderKeys.PROPERTY_LIB_DIR);
      log.info("Loading all dependencies for AM.");
      ProviderUtils.addAllDependencyJars(providerResources,
                                         fileSystem,
                                         tempPath,
                                         libdir,
                                         libDirProp);
    addKeytabResourceIfNecessary(fileSystem,
                                 launcher,
                                 instanceDescription,
                                 providerResources);

    //also pick up all env variables from a map
    launcher.copyEnvVars(
      instanceDescription.getInternalOperations().getOrAddComponent(
        SliderKeys.COMPONENT_AM));
  }

  /**
   * If the cluster is secure, and an HDFS installed keytab is available for AM
   * authentication, add this keytab as a local resource for the AM launch.
   *
   * @param fileSystem
   * @param launcher
   * @param instanceDescription
   * @param providerResources
   * @throws IOException
   */
  protected void addKeytabResourceIfNecessary(SliderFileSystem fileSystem,
                                              AbstractLauncher launcher,
                                              AggregateConf instanceDescription,
                                              Map<String, LocalResource> providerResources)
      throws IOException {
    if (UserGroupInformation.isSecurityEnabled()) {
      String keytabPathOnHost = instanceDescription.getAppConfOperations()
          .getComponent(SliderKeys.COMPONENT_AM).get(
              SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
      if (SliderUtils.isUnset(keytabPathOnHost)) {
        String amKeytabName = instanceDescription.getAppConfOperations()
            .getComponent(SliderKeys.COMPONENT_AM).get(
                SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
        String keytabDir = instanceDescription.getAppConfOperations()
            .getComponent(SliderKeys.COMPONENT_AM).get(
                SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR);
        Path keytabPath = fileSystem.buildKeytabPath(keytabDir, amKeytabName,
                                                     instanceDescription.getName());
        LocalResource keytabRes = fileSystem.createAmResource(keytabPath,
                                                LocalResourceType.FILE);

         providerResources.put(SliderKeys.KEYTAB_DIR + "/" +
                               amKeytabName, keytabRes);
      }
    }
    launcher.addLocalResources(providerResources);
  }

  /**
   * Update the AM resource with any local needs
   * @param capability capability to update
   */
  public void prepareAMResourceRequirements(MapOperations sliderAM,
                                            Resource capability) {
    capability.setMemory(sliderAM.getOptionInt(
      ResourceKeys.YARN_MEMORY,
      capability.getMemory()));
    capability.setVirtualCores(
        sliderAM.getOptionInt(ResourceKeys.YARN_CORES, capability.getVirtualCores()));
  }
  
  /**
   * Extract any JVM options from the cluster specification and
   * add them to the command line
   */
  public void addJVMOptions(AggregateConf aggregateConf,
                            JavaCommandLineBuilder cmdLine)
      throws BadConfigException {

    MapOperations sliderAM =
        aggregateConf.getAppConfOperations().getMandatoryComponent(
        SliderKeys.COMPONENT_AM);
    cmdLine.forceIPv4().headless();
    String heap = sliderAM.getOption(RoleKeys.JVM_HEAP,
                                   DEFAULT_JVM_HEAP);
    cmdLine.setJVMHeap(heap);
    String jvmopts = sliderAM.getOption(RoleKeys.JVM_OPTS, "");
    if (SliderUtils.isSet(jvmopts)) {
      cmdLine.add(jvmopts);
    }
  }


  @Override
  public void prepareInstanceConfiguration(AggregateConf aggregateConf)
      throws SliderException, IOException {
    mergeTemplates(aggregateConf,
        INTERNAL_JSON, RESOURCES_JSON, APPCONF_JSON
                  );
  }
}
