/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.slider.providers.accumulo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.SliderXmlConfKeys;
import org.apache.slider.api.OptionKeys;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.launch.AbstractLauncher;
import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.providers.AbstractClientProvider;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils;
import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.common.tools.SliderFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/**
 * Client-side accumulo provider
 */
public class AccumuloClientProvider extends AbstractClientProvider implements
                                                       AccumuloKeys {

  protected static final Logger log =
    LoggerFactory.getLogger(AccumuloClientProvider.class);
  private static final ProviderUtils providerUtils = new ProviderUtils(log);
  private static final String INSTANCE_RESOURCE_BASE =
      "/org/apache/slider/providers/accumulo/instance/";


  protected AccumuloClientProvider(Configuration conf) {
    super(conf);
  }

  public static List<ProviderRole> getProviderRoles() {
    return AccumuloRoles.ROLES;
  }

  @Override
  public String getName() {
    return PROVIDER_ACCUMULO;
  }

  @Override
  public List<ProviderRole> getRoles() {
    return AccumuloRoles.ROLES;
  }


  @Override
  public void prepareInstanceConfiguration(AggregateConf aggregateConf) throws
      SliderException,
                                                                        IOException {
    String resourceTemplate = INSTANCE_RESOURCE_BASE + "resources.json";
    String appConfTemplate = INSTANCE_RESOURCE_BASE + "appconf.json";
    mergeTemplates(aggregateConf, null, resourceTemplate, appConfTemplate);
    aggregateConf.getAppConfOperations().set(OPTION_ACCUMULO_PASSWORD,
                                             createAccumuloPassword());
  }


  public String createAccumuloPassword() {
    return UUID.randomUUID().toString();
  }

  public void setDatabasePath(Map<String, String> sitexml, String dataPath) {
    Path path = new Path(dataPath);
    URI parentUri = path.toUri();
    String authority = parentUri.getAuthority();
    String fspath =
      parentUri.getScheme() + "://" + (authority == null ? "" : authority) + "/";
    sitexml.put(AccumuloConfigFileOptions.INSTANCE_DFS_URI, fspath);
    sitexml.put(AccumuloConfigFileOptions.INSTANCE_DFS_DIR,
                parentUri.getPath());
  }

  /**
   * Build the accumulo-site.xml file
   * This the configuration used by Accumulo directly
   * @param instanceDescription this is the cluster specification used to define this
   * @return a map of the dynamic bindings for this Slider instance
   */
  public Map<String, String> buildSiteConfFromInstance(
    AggregateConf instanceDescription)
    throws BadConfigException {


    ConfTreeOperations appconf =
      instanceDescription.getAppConfOperations();

    MapOperations globalAppOptions = appconf.getGlobalOptions();
    MapOperations globalInstanceOptions =
      instanceDescription.getInternalOperations().getGlobalOptions();


    Map<String, String> sitexml = new HashMap<String, String>();

    providerUtils.propagateSiteOptions(globalAppOptions, sitexml);

    propagateClientFSBinding(sitexml);
    setDatabasePath(sitexml,
                    globalInstanceOptions.getMandatoryOption(InternalKeys.INTERNAL_DATA_DIR_PATH));


    String quorum =
      globalAppOptions.getMandatoryOption(OptionKeys.ZOOKEEPER_QUORUM);
    sitexml.put(AccumuloConfigFileOptions.ZOOKEEPER_HOST, quorum);

    return sitexml;

  }


  public void propagateClientFSBinding(Map<String, String> sitexml) throws
                                                                    BadConfigException {
    String fsDefaultName =
      getConf().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY);
    if (fsDefaultName == null) {
      throw new BadConfigException("Key not found in conf: {}",
                                   CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
    }
    sitexml.put(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsDefaultName);
    sitexml.put(SliderXmlConfKeys.FS_DEFAULT_NAME_CLASSIC, fsDefaultName);
  }

  @Override 
  public void preflightValidateClusterConfiguration(SliderFileSystem sliderFileSystem,
                                                    String clustername,
                                                    Configuration configuration,
                                                    AggregateConf instanceDefinition,
                                                    Path clusterDirPath,
                                                    Path generatedConfDirPath,
                                                    boolean secure) throws
      SliderException,
                                                                    IOException {
    super.preflightValidateClusterConfiguration(sliderFileSystem, clustername,
                                                configuration,
                                                instanceDefinition,
                                                clusterDirPath,
                                                generatedConfDirPath, secure);

  }

  /**
   * Add Accumulo and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this
   * class and its mapred counterpart. It also of use to external tools that
   * need to build a MapReduce job that interacts with Accumulo but want
   * fine-grained control over the jars shipped to the cluster.
   * </p>
   *
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/;jira/browse/PIG-3285">PIG-3285</a>
   *
   * @param providerResources provider resources to add resource to
   * @param sliderFileSystem filesystem
   * @param libdir relative directory to place resources
   * @param tempPath path in the cluster FS for temp files
   * @throws IOException IO problems
   * @throws SliderException Slider-specific issues
   */
  private void addAccumuloDependencyJars(Map<String, LocalResource> providerResources,
                                            SliderFileSystem sliderFileSystem,
                                            String libdir,
                                            Path tempPath) throws
                                                           IOException,
      SliderException {
    String[] jars =
      {
      /*  "zookeeper.jar",*/
      };
    Class<?>[] classes = {
      //zk
/*      org.apache.zookeeper.ClientCnxn.class*/
    };
    
    ProviderUtils.addDependencyJars(providerResources, sliderFileSystem, tempPath,
                                    libdir, jars,
                                    classes);
  }

  @Override
  public void prepareAMAndConfigForLaunch(SliderFileSystem fileSystem,
      Configuration serviceConf,
      AbstractLauncher launcher,
      AggregateConf instanceDescription,
      Path snapshotConfDirPath,
      Path generatedConfDirPath,
      Configuration clientConfExtras,
      String libdir,
      Path tempPath,
      boolean miniClusterTestRun) throws IOException, SliderException {
    //load in the template site config
    log.debug("Loading template configuration from {}", snapshotConfDirPath);
      Configuration siteConf = ConfigHelper.loadTemplateConfiguration(
        serviceConf,
          snapshotConfDirPath,
        AccumuloKeys.SITE_XML,
        AccumuloKeys.SITE_XML_RESOURCE);



    Map<String, LocalResource> providerResources;
    providerResources = fileSystem.submitDirectory(generatedConfDirPath,
                                                   SliderKeys.PROPAGATED_CONF_DIR_NAME);


    ProviderUtils.addProviderJar(providerResources,
        this,
        "slider-accumulo-provider.jar",
        fileSystem,
        tempPath,
        libdir,
        miniClusterTestRun);


    addAccumuloDependencyJars(providerResources, fileSystem, libdir, tempPath);
    launcher.addLocalResources(providerResources);
    
    //construct the cluster configuration values

    ConfTreeOperations appconf =
      instanceDescription.getAppConfOperations();


    Map<String, String> clusterConfMap = buildSiteConfFromInstance(
      instanceDescription);

    //merge them
    ConfigHelper.addConfigMap(siteConf,
                              clusterConfMap.entrySet(),
                              "Accumulo Provider");

    //now, if there is an extra client conf, merge it in too
    if (clientConfExtras != null) {
      ConfigHelper.mergeConfigurations(siteConf, clientConfExtras,
                                       "Slider Client", true);
    }

    if (log.isDebugEnabled()) {
      log.debug("Merged Configuration");
      ConfigHelper.dumpConf(siteConf);
    }

    Path sitePath = ConfigHelper.saveConfig(serviceConf,
                                            siteConf,
                                            generatedConfDirPath,
                                            AccumuloKeys.SITE_XML);

    log.debug("Saving the config to {}", sitePath);
    launcher.submitDirectory(generatedConfDirPath,
                             SliderKeys.PROPAGATED_CONF_DIR_NAME);

  }

  private static Set<String> knownRoleNames = new HashSet<String>();
  static {
    knownRoleNames.add(SliderKeys.COMPONENT_AM);
    for (ProviderRole role : AccumuloRoles.ROLES) {
      knownRoleNames.add(role.name);
    }
  }

  @Override
  public void validateInstanceDefinition(AggregateConf instanceDefinition, SliderFileSystem fs) throws
      SliderException {
    super.validateInstanceDefinition(instanceDefinition, fs);

    ConfTreeOperations resources =
      instanceDefinition.getResourceOperations();
    Set<String> unknownRoles = resources.getComponentNames();
    unknownRoles.removeAll(knownRoleNames);
    if (!unknownRoles.isEmpty()) {
      throw new BadCommandArgumentsException("There is unknown role: %s",
                                             unknownRoles.iterator().next());
    }
    providerUtils.validateNodeCount(instanceDefinition,
                                    AccumuloKeys.ROLE_TABLET,
                                    1, -1);


    providerUtils.validateNodeCount(instanceDefinition,
                                    AccumuloKeys.ROLE_MASTER, 1, -1);

    providerUtils.validateNodeCount(instanceDefinition,
                                    AccumuloKeys.ROLE_GARBAGE_COLLECTOR,
                                    0, -1);

    providerUtils.validateNodeCount(instanceDefinition,
                                    AccumuloKeys.ROLE_MONITOR,
                                    0, -1);

    providerUtils.validateNodeCount(instanceDefinition,
                                    AccumuloKeys.ROLE_TRACER , 0, -1);

    MapOperations globalAppConfOptions =
      instanceDefinition.getAppConfOperations().getGlobalOptions();
    globalAppConfOptions.verifyOptionSet(AccumuloKeys.OPTION_ZK_HOME);
    globalAppConfOptions.verifyOptionSet(AccumuloKeys.OPTION_HADOOP_HOME);
  }


  /**
   * Get the path to the script
   * @return the script
   */
  public static String buildScriptBinPath(AggregateConf instanceDefinition)
    throws FileNotFoundException {
    return providerUtils.buildPathToScript(instanceDefinition, "bin", "accumulo");
  }


}
