/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.slider.providers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
import org.apache.hadoop.registry.client.types.AddressTypes;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.main.ExitCodeProvider;
import org.apache.slider.server.appmaster.actions.QueueAccess;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
import org.apache.slider.server.appmaster.state.ContainerReleaseSelector;
import org.apache.slider.server.appmaster.state.MostRecentContainerReleaseSelector;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
import org.apache.slider.server.services.workflow.ForkedProcessService;
import org.apache.slider.server.services.workflow.ServiceParent;
import org.apache.slider.server.services.workflow.WorkflowSequenceService;
import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * The base class for provider services. It lets the implementations
 * add sequences of operations, and propagates service failures
 * upstream
 */
public abstract class AbstractProviderService
    extends WorkflowSequenceService
    implements
    ProviderCore,
    SliderKeys,
    ProviderService {
  private static final Logger log =
    LoggerFactory.getLogger(AbstractProviderService.class);
  protected StateAccessForProviders amState;
  protected AgentRestOperations restOps;
  protected URL amWebAPI;
  protected YarnRegistryViewForProviders yarnRegistry;
  protected QueueAccess queueAccess;

  public AbstractProviderService(String name) {
    super(name);
    setStopIfNoChildServicesAtStartup(false);
  }

  @Override
  public Configuration getConf() {
    return getConfig();
  }

  public StateAccessForProviders getAmState() {
    return amState;
  }

  public QueueAccess getQueueAccess() {
    return queueAccess;
  }

  public void setAmState(StateAccessForProviders amState) {
    this.amState = amState;
  }

  @Override
  public String getHumanName() {
    return getName().toLowerCase(Locale.ENGLISH);
  }
  
  @Override
  public void bind(StateAccessForProviders stateAccessor,
      QueueAccess queueAccess,
      List<Container> liveContainers) {
    this.amState = stateAccessor;
    this.queueAccess = queueAccess;
  }

  @Override
  public void bindToYarnRegistry(YarnRegistryViewForProviders yarnRegistry) {
    this.yarnRegistry = yarnRegistry;
  }

  public YarnRegistryViewForProviders getYarnRegistry() {
    return yarnRegistry;
  }

  @Override
  public AgentRestOperations getAgentRestOperations() {
    return restOps;
  }

  @Override
  public void notifyContainerCompleted(ContainerId containerId) {
  }

  public void setAgentRestOperations(AgentRestOperations agentRestOperations) {
    this.restOps = agentRestOperations;
  }

  /**
   * Load a specific XML configuration file for the provider config
   * @param confDir configuration directory
   * @param siteXMLFilename provider-specific filename
   * @return a configuration to be included in status
   * @throws BadCommandArgumentsException argument problems
   * @throws IOException IO problems
   */
  protected Configuration loadProviderConfigurationInformation(File confDir,
                                                               String siteXMLFilename)
    throws BadCommandArgumentsException, IOException {
    Configuration siteConf;
    File siteXML = new File(confDir, siteXMLFilename);
    if (!siteXML.exists()) {
      throw new BadCommandArgumentsException(
        "Configuration directory %s doesn't contain %s - listing is %s",
        confDir, siteXMLFilename, SliderUtils.listDir(confDir));
    }

    //now read it in
    siteConf = ConfigHelper.loadConfFromFile(siteXML);
    log.info("{} file is at {}", siteXMLFilename, siteXML);
    log.info(ConfigHelper.dumpConfigToString(siteConf));
    return siteConf;
  }

  /**
   * No-op implementation of this method.
   */
  @Override
  public void initializeApplicationConfiguration(
      AggregateConf instanceDefinition, SliderFileSystem fileSystem)
      throws IOException, SliderException {
  }

  /**
   * No-op implementation of this method.
   *
   * {@inheritDoc}
   */
  @Override
  public void validateApplicationConfiguration(AggregateConf instance,
                                               File confDir,
                                               boolean secure)
      throws IOException, SliderException {

  }

  /**
   * Scan through the roles and see if it is supported.
   * @param role role to look for
   * @return true if the role is known about -and therefore
   * that a launcher thread can be deployed to launch it
   */
  @Override
  public boolean isSupportedRole(String role) {
    Collection<ProviderRole> roles = getRoles();
    for (ProviderRole providedRole : roles) {
      if (providedRole.name.equals(role)) {
        return true;
      }
    }
    return false;
  }

  /**
   * override point to allow a process to start executing in this container
   * @param instanceDefinition cluster description
   * @param confDir configuration directory
   * @param env environment
   * @param execInProgress the callback for the exec events
   * @return false
   * @throws IOException
   * @throws SliderException
   */
  @Override
  public boolean exec(AggregateConf instanceDefinition,
      File confDir,
      Map<String, String> env,
      ProviderCompleted execInProgress) throws IOException, SliderException {
    return false;
  }

  @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
  @Override // ExitCodeProvider
  public int getExitCode() {
    Throwable cause = getFailureCause();
    if (cause != null) {
      //failed for some reason
      if (cause instanceof ExitCodeProvider) {
        return ((ExitCodeProvider) cause).getExitCode();
      }
    }
    ForkedProcessService lastProc = latestProcess();
    if (lastProc == null || !lastProc.isProcessTerminated()) {
      return 0;
    } else {
      return lastProc.getExitCode();
    }
  }

  /**
   * Return the latest forked process service that ran
   * @return the forkes service
   */
  protected ForkedProcessService latestProcess() {
    Service current = getActiveService();
    Service prev = getPreviousService();

    Service latest = current != null ? current : prev;
    if (latest instanceof ForkedProcessService) {
      return (ForkedProcessService) latest;
    } else {
      //its a composite object, so look inside it for a process
      if (latest instanceof ServiceParent) {
        return getFPSFromParentService((ServiceParent) latest);
      } else {
        //no match
        return null;
      }
    }
  }


  /**
   * Given a parent service, find the one that is a forked process
   * @param serviceParent parent
   * @return the forked process service or null if there is none
   */
  protected ForkedProcessService getFPSFromParentService(ServiceParent serviceParent) {
    List<Service> services = serviceParent.getServices();
    for (Service s : services) {
      if (s instanceof ForkedProcessService) {
        return (ForkedProcessService) s;
      }
    }
    return null;
  }

  /**
   * if we are already running, start this service
   */
  protected void maybeStartCommandSequence() {
    if (isInState(STATE.STARTED)) {
      startNextService();
    }
  }

  /**
   * Create a new forked process service with the given
   * name, environment and command list -then add it as a child
   * for execution in the sequence.
   *
   * @param name command name
   * @param env environment
   * @param commands command line
   * @throws IOException
   * @throws SliderException
   */
  protected ForkedProcessService queueCommand(String name,
                              Map<String, String> env,
                              List<String> commands) throws
                                                     IOException,
      SliderException {
    ForkedProcessService process = buildProcess(name, env, commands);
    //register the service for lifecycle management; when this service
    //is terminated, so is the master process
    addService(process);
    return process;
  }

  public ForkedProcessService buildProcess(String name,
                                           Map<String, String> env,
                                           List<String> commands) throws
                                                                  IOException,
      SliderException {
    ForkedProcessService process;
    process = new ForkedProcessService(name);
    process.init(getConfig());
    process.build(env, commands);
    return process;
  }

  /*
   * Build the provider status, can be empty
   * @return the provider status - map of entries to add to the info section
   */
  @Override
  public Map<String, String> buildProviderStatus() {
    return new HashMap<String, String>();
  }

  /*
  Build the monitor details. The base implementation includes all the external URL endpoints
  in the external view
   */
  @Override
  public Map<String, String> buildMonitorDetails(ClusterDescription clusterDesc) {
    Map<String, String> details = new LinkedHashMap<String, String>();

    // add in all the endpoints
    buildEndpointDetails(details);

    return details;
  }
  
  protected String getInfoAvoidingNull(ClusterDescription clusterDesc, String key) {
    String value = clusterDesc.getInfo(key);

    return null == value ? "N/A" : value;
  }

  @Override
  public void buildEndpointDetails(Map<String, String> details) {
    ServiceRecord self = yarnRegistry.getSelfRegistration();

    List<Endpoint> externals = self.external;
    for (Endpoint endpoint : externals) {
      String addressType = endpoint.addressType;
      if (AddressTypes.ADDRESS_URI.equals(addressType)) {
        try {
          List<URL> urls = RegistryTypeUtils.retrieveAddressURLs(endpoint);
          if (!urls.isEmpty()) {
            details.put(endpoint.api, urls.get(0).toString());
          }
        } catch (InvalidRecordException ignored) {
          // Ignored
        } catch (MalformedURLException ignored) {
          // ignored
        }

      }

    }
  }

  @Override
  public void applyInitialRegistryDefinitions(URL amWebURI,
      URL agentOpsURI,
      URL agentStatusURI,
      ServiceRecord serviceRecord)
    throws IOException {
      this.amWebAPI = amWebURI;
  }

  /**
   * {@inheritDoc}
   * 
   * 
   * @return The base implementation returns the most recent containers first.
   */
  @Override
  public ContainerReleaseSelector createContainerReleaseSelector() {
    return new MostRecentContainerReleaseSelector();
  }

  @Override
  public void releaseAssignedContainer(ContainerId containerId) {
    // no-op
  }

  @Override
  public void addContainerRequest(AMRMClient.ContainerRequest req) {
    // no-op
  }

  @Override
  public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
    // no-op
  }

  @Override
  public int cancelContainerRequests(Priority priority1,
      Priority priority2,
      int count) {
    return 0;
  }

  @Override
  public void execute(List<AbstractRMOperation> operations) {
    for (AbstractRMOperation operation : operations) {
      operation.execute(this);
    }
  }
  /**
   * No-op implementation of this method.
   */
  @Override
  public void rebuildContainerDetails(List<Container> liveContainers,
      String applicationId, Map<Integer, ProviderRole> providerRoles) {
  }
}
