/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.service.utils;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderFactory;
import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ServiceApiUtil {
  private static final Logger LOG =
      LoggerFactory.getLogger(ServiceApiUtil.class);
  public static JsonSerDeser<Service> jsonSerDeser =
      new JsonSerDeser<>(Service.class,
          PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
  private static final PatternValidator namePattern
      = new PatternValidator("[a-z][a-z0-9-]*");

  private static final PatternValidator userNamePattern
      = new PatternValidator("[a-z][a-z0-9-.]*");

  @VisibleForTesting
  public static void setJsonSerDeser(JsonSerDeser jsd) {
    jsonSerDeser = jsd;
  }

  @VisibleForTesting
  public static void validateAndResolveService(Service service,
      SliderFileSystem fs, org.apache.hadoop.conf.Configuration conf) throws
      IOException {
    boolean dnsEnabled = conf.getBoolean(RegistryConstants.KEY_DNS_ENABLED,
        RegistryConstants.DEFAULT_DNS_ENABLED);
    if (dnsEnabled) {
      if (RegistryUtils.currentUser().length()
          > RegistryConstants.MAX_FQDN_LABEL_LENGTH) {
        throw new IllegalArgumentException(
            RestApiErrorMessages.ERROR_USER_NAME_INVALID);
      }
      userNamePattern.validate(RegistryUtils.currentUser());
    }

    if (StringUtils.isEmpty(service.getName())) {
      throw new IllegalArgumentException(
          RestApiErrorMessages.ERROR_APPLICATION_NAME_INVALID);
    }

    if (StringUtils.isEmpty(service.getVersion())) {
      throw new IllegalArgumentException(String.format(
          RestApiErrorMessages.ERROR_APPLICATION_VERSION_INVALID,
          service.getName()));
    }

    validateNameFormat(service.getName(), conf);

    // If the service has no components, throw error
    if (!hasComponent(service)) {
      throw new IllegalArgumentException(
          "No component specified for " + service.getName());
    }

    if (UserGroupInformation.isSecurityEnabled()) {
      if (!StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
        try {
          // validate URI format
          new URI(service.getKerberosPrincipal().getKeytab());
        } catch (URISyntaxException e) {
          throw new IllegalArgumentException(e);
        }
      }
    }

    // Validate there are no component name collisions (collisions are not
    // currently supported) and add any components from external services
    Configuration globalConf = service.getConfiguration();
    Set<String> componentNames = new HashSet<>();
    List<Component> componentsToRemove = new ArrayList<>();
    List<Component> componentsToAdd = new ArrayList<>();
    for (Component comp : service.getComponents()) {
      int maxCompLength = RegistryConstants.MAX_FQDN_LABEL_LENGTH;
      maxCompLength = maxCompLength - Long.toString(Long.MAX_VALUE).length();
      if (dnsEnabled && comp.getName().length() > maxCompLength) {
        throw new IllegalArgumentException(String.format(RestApiErrorMessages
            .ERROR_COMPONENT_NAME_INVALID, maxCompLength, comp.getName()));
      }
      if (componentNames.contains(comp.getName())) {
        throw new IllegalArgumentException("Component name collision: " +
            comp.getName());
      }
      // If artifact is of type SERVICE (which cannot be filled from
      // global), read external service and add its components to this
      // service
      if (comp.getArtifact() != null && comp.getArtifact().getType() ==
          Artifact.TypeEnum.SERVICE) {
        if (StringUtils.isEmpty(comp.getArtifact().getId())) {
          throw new IllegalArgumentException(
              RestApiErrorMessages.ERROR_ARTIFACT_ID_INVALID);
        }
        LOG.info("Marking {} for removal", comp.getName());
        componentsToRemove.add(comp);
        List<Component> externalComponents = getComponents(fs,
            comp.getArtifact().getId());
        for (Component c : externalComponents) {
          Component override = service.getComponent(c.getName());
          if (override != null && override.getArtifact() == null) {
            // allow properties from external components to be overridden /
            // augmented by properties in this component, except for artifact
            // which must be read from external component
            override.mergeFrom(c);
            LOG.info("Merging external component {} from external {}", c
                .getName(), comp.getName());
          } else {
            if (componentNames.contains(c.getName())) {
              throw new IllegalArgumentException("Component name collision: " +
                  c.getName());
            }
            componentNames.add(c.getName());
            componentsToAdd.add(c);
            LOG.info("Adding component {} from external {}", c.getName(),
                comp.getName());
          }
        }
      } else {
        // otherwise handle as a normal component
        componentNames.add(comp.getName());
        // configuration
        comp.getConfiguration().mergeFrom(globalConf);
      }
    }
    service.getComponents().removeAll(componentsToRemove);
    service.getComponents().addAll(componentsToAdd);

    // Validate components and let global values take effect if component level
    // values are not provided
    Artifact globalArtifact = service.getArtifact();
    Resource globalResource = service.getResource();
    for (Component comp : service.getComponents()) {
      // fill in global artifact unless it is type SERVICE
      if (comp.getArtifact() == null && service.getArtifact() != null
          && service.getArtifact().getType() != Artifact.TypeEnum
          .SERVICE) {
        comp.setArtifact(globalArtifact);
      }
      // fill in global resource
      if (comp.getResource() == null) {
        comp.setResource(globalResource);
      }
      // validate dependency existence
      if (comp.getDependencies() != null) {
        for (String dependency : comp.getDependencies()) {
          if (!componentNames.contains(dependency)) {
            throw new IllegalArgumentException(String.format(
                RestApiErrorMessages.ERROR_DEPENDENCY_INVALID, dependency,
                comp.getName()));
          }
        }
      }
      validateComponent(comp, fs.getFileSystem(), conf);
    }
    validatePlacementPolicy(service.getComponents(), componentNames);

    // validate dependency tree
    sortByDependencies(service.getComponents());

    // Service lifetime if not specified, is set to unlimited lifetime
    if (service.getLifetime() == null) {
      service.setLifetime(RestApiConstants.DEFAULT_UNLIMITED_LIFETIME);
    }
  }

  private static void validateComponent(Component comp, FileSystem fs,
      org.apache.hadoop.conf.Configuration conf)
      throws IOException {
    validateNameFormat(comp.getName(), conf);

    AbstractClientProvider compClientProvider = ProviderFactory
        .getClientProvider(comp.getArtifact());
    compClientProvider.validateArtifact(comp.getArtifact(), fs);

    if (comp.getLaunchCommand() == null && (comp.getArtifact() == null || comp
        .getArtifact().getType() != Artifact.TypeEnum.DOCKER)) {
      throw new IllegalArgumentException(RestApiErrorMessages
          .ERROR_ABSENT_LAUNCH_COMMAND);
    }

    validateServiceResource(comp.getResource(), comp);

    if (comp.getNumberOfContainers() == null
        || comp.getNumberOfContainers() < 0) {
      throw new IllegalArgumentException(String.format(
          RestApiErrorMessages.ERROR_CONTAINERS_COUNT_FOR_COMP_INVALID
              + ": " + comp.getNumberOfContainers(), comp.getName()));
    }
    compClientProvider.validateConfigFiles(comp.getConfiguration()
        .getFiles(), fs);

    MonitorUtils.getProbe(comp.getReadinessCheck());
  }

  // Check component or service name format and transform to lower case.
  public static void validateNameFormat(String name,
      org.apache.hadoop.conf.Configuration conf) {
    if (StringUtils.isEmpty(name)) {
      throw new IllegalArgumentException("Name can not be empty!");
    }
    // validate component name
    if (name.contains("_")) {
      throw new IllegalArgumentException(
          "Invalid format: " + name
              + ", can not use '_', as DNS hostname does not allow '_'. Use '-' Instead. ");
    }
    boolean dnsEnabled = conf.getBoolean(RegistryConstants.KEY_DNS_ENABLED,
        RegistryConstants.DEFAULT_DNS_ENABLED);
    if (dnsEnabled && name.length() > RegistryConstants.MAX_FQDN_LABEL_LENGTH) {
      throw new IllegalArgumentException(String
          .format("Invalid format %s, must be no more than 63 characters ",
              name));
    }
    namePattern.validate(name);
  }

  private static void validatePlacementPolicy(List<Component> components,
      Set<String> componentNames) {
    for (Component comp : components) {
      if (comp.getPlacementPolicy() != null) {
        for (PlacementConstraint constraint : comp.getPlacementPolicy()
            .getConstraints()) {
          for (String targetTag : constraint.getTargetTags()) {
            if (!comp.getName().equals(targetTag)) {
              throw new IllegalArgumentException(String.format(
                  RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
                  targetTag, comp.getName(), comp.getName(), comp.getName()));
            }
          }
        }
      }
    }
  }

  @VisibleForTesting
  public static List<Component> getComponents(SliderFileSystem
      fs, String serviceName) throws IOException {
    return loadService(fs, serviceName).getComponents();
  }

  public static Service loadService(SliderFileSystem fs, String
      serviceName) throws IOException {
    Path serviceJson = getServiceJsonPath(fs, serviceName);
    LOG.info("Loading service definition from " + serviceJson);
    return jsonSerDeser.load(fs.getFileSystem(), serviceJson);
  }

  public static Service loadServiceUpgrade(SliderFileSystem fs,
      String serviceName, String version) throws IOException {
    Path versionPath = fs.buildClusterUpgradeDirPath(serviceName, version);
    Path versionedDef = new Path(versionPath, serviceName + ".json");
    LOG.info("Loading service definition from {}", versionedDef);
    return jsonSerDeser.load(fs.getFileSystem(), versionedDef);
  }

  public static Service loadServiceFrom(SliderFileSystem fs,
      Path appDefPath) throws IOException {
    LOG.info("Loading service definition from " + appDefPath);
    return jsonSerDeser.load(fs.getFileSystem(), appDefPath);
  }

  public static Path getServiceJsonPath(SliderFileSystem fs, String serviceName) {
    Path serviceDir = fs.buildClusterDirPath(serviceName);
    return new Path(serviceDir, serviceName + ".json");
  }

  private static void validateServiceResource(Resource resource,
      Component comp) {
    // Only services/components of type SERVICE can skip resource requirement
    if (resource == null) {
      throw new IllegalArgumentException(
          comp == null ? RestApiErrorMessages.ERROR_RESOURCE_INVALID : String
              .format(RestApiErrorMessages.ERROR_RESOURCE_FOR_COMP_INVALID,
                  comp.getName()));
    }
    // One and only one of profile OR cpus & memory can be specified. Specifying
    // both raises validation error.
    if (StringUtils.isNotEmpty(resource.getProfile()) && (
        resource.getCpus() != null || StringUtils
            .isNotEmpty(resource.getMemory()))) {
      throw new IllegalArgumentException(comp == null ?
          RestApiErrorMessages.ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_NOT_SUPPORTED :
          String.format(
              RestApiErrorMessages.ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_FOR_COMP_NOT_SUPPORTED,
              comp.getName()));
    }
    // Currently resource profile is not supported yet, so we will raise
    // validation error if only resource profile is specified
    if (StringUtils.isNotEmpty(resource.getProfile())) {
      throw new IllegalArgumentException(
          RestApiErrorMessages.ERROR_RESOURCE_PROFILE_NOT_SUPPORTED_YET);
    }

    String memory = resource.getMemory();
    Integer cpus = resource.getCpus();
    if (StringUtils.isEmpty(memory)) {
      throw new IllegalArgumentException(
          comp == null ? RestApiErrorMessages.ERROR_RESOURCE_MEMORY_INVALID :
              String.format(
                  RestApiErrorMessages.ERROR_RESOURCE_MEMORY_FOR_COMP_INVALID,
                  comp.getName()));
    }
    if (cpus == null) {
      throw new IllegalArgumentException(
          comp == null ? RestApiErrorMessages.ERROR_RESOURCE_CPUS_INVALID :
              String.format(
                  RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID,
                  comp.getName()));
    }
    if (cpus <= 0) {
      throw new IllegalArgumentException(comp == null ?
          RestApiErrorMessages.ERROR_RESOURCE_CPUS_INVALID_RANGE : String
          .format(
              RestApiErrorMessages.ERROR_RESOURCE_CPUS_FOR_COMP_INVALID_RANGE,
              comp.getName()));
    }
  }

  // check if comp mem size exceeds cluster limit
  public static void validateCompResourceSize(
      org.apache.hadoop.yarn.api.records.Resource maxResource,
      Service service) throws YarnException {
    for (Component component : service.getComponents()) {
      long mem = Long.parseLong(component.getResource().getMemory());
      if (mem > maxResource.getMemorySize()) {
        throw new YarnException(
            "Component " + component.getName() + ": specified memory size ("
                + mem + ") is larger than configured max container memory " +
                "size (" + maxResource.getMemorySize() + ")");
      }
      int cpu = component.getResource().getCpus();
      if (cpu > maxResource.getVirtualCores()) {
        throw new YarnException(
            "Component " + component.getName() + ": specified number of " +
                "virtual core (" + cpu + ") is larger than configured max " +
                "virtual core size (" + maxResource.getVirtualCores() + ")");
      }
    }
  }

  private static boolean hasComponent(Service service) {
    if (service.getComponents() == null || service.getComponents()
        .isEmpty()) {
      return false;
    }
    return true;
  }

  public static Collection<Component> sortByDependencies(List<Component>
      components) {
    Map<String, Component> sortedComponents =
        sortByDependencies(components, null);
    return sortedComponents.values();
  }

  /**
   * Each internal call of sortByDependencies will identify all of the
   * components with the same dependency depth (the lowest depth that has not
   * been processed yet) and add them to the sortedComponents list, preserving
   * their original ordering in the components list.
   *
   * So the first time it is called, all components with no dependencies
   * (depth 0) will be identified. The next time it is called, all components
   * that have dependencies only on the the depth 0 components will be
   * identified (depth 1). This will be repeated until all components have
   * been added to the sortedComponents list. If no new components are
   * identified but the sortedComponents list is not complete, an error is
   * thrown.
   */
  private static Map<String, Component> sortByDependencies(List<Component>
      components, Map<String, Component> sortedComponents) {
    if (sortedComponents == null) {
      sortedComponents = new LinkedHashMap<>();
    }

    Map<String, Component> componentsToAdd = new LinkedHashMap<>();
    List<Component> componentsSkipped = new ArrayList<>();
    for (Component component : components) {
      String name = component.getName();
      if (sortedComponents.containsKey(name)) {
        continue;
      }
      boolean dependenciesAlreadySorted = true;
      if (!ServiceUtils.isEmpty(component.getDependencies())) {
        for (String dependency : component.getDependencies()) {
          if (!sortedComponents.containsKey(dependency)) {
            dependenciesAlreadySorted = false;
            break;
          }
        }
      }
      if (dependenciesAlreadySorted) {
        componentsToAdd.put(name, component);
      } else {
        componentsSkipped.add(component);
      }
    }

    if (componentsToAdd.size() == 0) {
      throw new IllegalArgumentException(String.format(RestApiErrorMessages
          .ERROR_DEPENDENCY_CYCLE, componentsSkipped));
    }
    sortedComponents.putAll(componentsToAdd);
    if (sortedComponents.size() == components.size()) {
      return sortedComponents;
    }
    return sortByDependencies(components, sortedComponents);
  }

  public static void createDirAndPersistApp(SliderFileSystem fs, Path appDir,
      Service service)
      throws IOException, SliderException {
    FsPermission appDirPermission = new FsPermission("750");
    fs.createWithPermissions(appDir, appDirPermission);
    Path appJson = writeAppDefinition(fs, appDir, service);
    LOG.info("Persisted service {} version {} at {}", service.getName(),
        service.getVersion(), appJson);
  }

  public static Path writeAppDefinition(SliderFileSystem fs, Path appDir,
      Service service) throws IOException {
    Path appJson = new Path(appDir, service.getName() + ".json");
    jsonSerDeser.save(fs.getFileSystem(), appJson, service, true);
    return appJson;
  }

  public static String $(String s) {
    return "${" + s +"}";
  }
}
