/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.net.URL;


/**
 * Class that exposes information about queues maintained by the Hadoop
 * Map/Reduce framework.
 * <p/>
 * The Map/Reduce framework can be configured with one or more queues,
 * depending on the scheduler it is configured with. While some
 * schedulers work only with one queue, some schedulers support multiple
 * queues. Some schedulers also support the notion of queues within
 * queues - a feature called hierarchical queues.
 * <p/>
 * Queue names are unique, and used as a key to lookup queues. Hierarchical
 * queues are named by a 'fully qualified name' such as q1:q2:q3, where
 * q2 is a child queue of q1 and q3 is a child queue of q2.
 * <p/>
 * Leaf level queues are queues that contain no queues within them. Jobs
 * can be submitted only to leaf level queues.
 * <p/>
 * Queues can be configured with various properties. Some of these
 * properties are common to all schedulers, and those are handled by this
 * class. Schedulers might also associate several custom properties with
 * queues. These properties are parsed and maintained per queue by the
 * framework. If schedulers need more complicated structure to maintain
 * configuration per queue, they are free to not use the facilities
 * provided by the framework, but define their own mechanisms. In such cases,
 * it is likely that the name of the queue will be used to relate the
 * common properties of a queue with scheduler specific properties.
 * <p/>
 * Information related to a queue, such as its name, properties, scheduling
 * information and children are exposed by this class via a serializable
 * class called {@link JobQueueInfo}.
 * <p/>
 * Queues are configured in the configuration file mapred-queues.xml.
 * To support backwards compatibility, queues can also be configured
 * in mapred-site.xml. However, when configured in the latter, there is
 * no support for hierarchical queues.
 */
@InterfaceAudience.Private
public class QueueManager {

  private static final Log LOG = LogFactory.getLog(QueueManager.class);

  // Map of a queue name and Queue object
  private Map<String, Queue> leafQueues = new HashMap<String,Queue>();
  private Map<String, Queue> allQueues = new HashMap<String, Queue>();
  public static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
  static final String QUEUE_CONF_DEFAULT_FILE_NAME = "mapred-queues-default.xml";

  //Prefix in configuration for queue related keys
  static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = "mapred.queue.";

  //Resource in which queue acls are configured.
  private Queue root = null;
  
  // represents if job and queue acls are enabled on the mapreduce cluster
  private boolean areAclsEnabled = false;

  /**
   * Factory method to create an appropriate instance of a queue
   * configuration parser.
   * <p/>
   * Returns a parser that can parse either the deprecated property
   * style queue configuration in mapred-site.xml, or one that can
   * parse hierarchical queues in mapred-queues.xml. First preference
   * is given to configuration in mapred-site.xml. If no queue
   * configuration is found there, then a parser that can parse
   * configuration in mapred-queues.xml is created.
   *
   * @param conf Configuration instance that determines which parser
   *             to use.
   * @return Queue configuration parser
   */
  static QueueConfigurationParser getQueueConfigurationParser(
    Configuration conf, boolean reloadConf, boolean areAclsEnabled) {
    if (conf != null && conf.get(
      DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY) != null) {
      if (reloadConf) {
        conf.reloadConfiguration();
      }
      return new DeprecatedQueueConfigurationParser(conf);
    } else {
      URL xmlInUrl =
        Thread.currentThread().getContextClassLoader()
          .getResource(QUEUE_CONF_FILE_NAME);
      if (xmlInUrl == null) {
        xmlInUrl = Thread.currentThread().getContextClassLoader()
          .getResource(QUEUE_CONF_DEFAULT_FILE_NAME);
        assert xmlInUrl != null; // this should be in our jar
      }
      InputStream stream = null;
      try {
        stream = xmlInUrl.openStream();
        return new QueueConfigurationParser(new BufferedInputStream(stream),
            areAclsEnabled);
      } catch (IOException ioe) {
        throw new RuntimeException("Couldn't open queue configuration at " +
                                   xmlInUrl, ioe);
      } finally {
        IOUtils.closeStream(stream);
      }
    }
  }

  QueueManager() {// acls are disabled
    this(false);
  }

  QueueManager(boolean areAclsEnabled) {
    this.areAclsEnabled = areAclsEnabled;
    initialize(getQueueConfigurationParser(null, false, areAclsEnabled));
  }

  /**
   * Construct a new QueueManager using configuration specified in the passed
   * in {@link org.apache.hadoop.conf.Configuration} object.
   * <p/>
   * This instance supports queue configuration specified in mapred-site.xml,
   * but without support for hierarchical queues. If no queue configuration
   * is found in mapred-site.xml, it will then look for site configuration
   * in mapred-queues.xml supporting hierarchical queues.
   *
   * @param clusterConf    mapreduce cluster configuration
   */
  public QueueManager(Configuration clusterConf) {
    areAclsEnabled = clusterConf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
    initialize(getQueueConfigurationParser(clusterConf, false, areAclsEnabled));
  }

  /**
   * Create an instance that supports hierarchical queues, defined in
   * the passed in configuration file.
   * <p/>
   * This is mainly used for testing purposes and should not called from
   * production code.
   *
   * @param confFile File where the queue configuration is found.
   */
  QueueManager(String confFile, boolean areAclsEnabled) {
    this.areAclsEnabled = areAclsEnabled;
    QueueConfigurationParser cp =
        new QueueConfigurationParser(confFile, areAclsEnabled);
    initialize(cp);
  }

  /**
   * Initialize the queue-manager with the queue hierarchy specified by the
   * given {@link QueueConfigurationParser}.
   * 
   * @param cp
   */
  private void initialize(QueueConfigurationParser cp) {
    this.root = cp.getRoot();
    leafQueues.clear();
    allQueues.clear();
    //At this point we have root populated
    //update data structures leafNodes.
    leafQueues = getRoot().getLeafQueues();
    allQueues.putAll(getRoot().getInnerQueues());
    allQueues.putAll(leafQueues);

    LOG.info("AllQueues : " + allQueues + "; LeafQueues : " + leafQueues);
  }

  /**
   * Return the set of leaf level queues configured in the system to
   * which jobs are submitted.
   * <p/>
   * The number of queues configured should be dependent on the Scheduler
   * configured. Note that some schedulers work with only one queue, whereas
   * others can support multiple queues.
   *
   * @return Set of queue names.
   */
  public synchronized Set<String> getLeafQueueNames() {
    return leafQueues.keySet();
  }

  /**
   * Return true if the given user is part of the ACL for the given
   * {@link QueueACL} name for the given queue.
   * <p/>
   * An operation is allowed if all users are provided access for this
   * operation, or if either the user or any of the groups specified is
   * provided access.
   *
   * @param queueName Queue on which the operation needs to be performed.
   * @param qACL      The queue ACL name to be checked
   * @param ugi       The user and groups who wish to perform the operation.
   * @return true     if the operation is allowed, false otherwise.
   */
  public synchronized boolean hasAccess(
    String queueName, QueueACL qACL, UserGroupInformation ugi) {

    Queue q = leafQueues.get(queueName);

    if (q == null) {
      LOG.info("Queue " + queueName + " is not present");
      return false;
    }

    if(q.getChildren() != null && !q.getChildren().isEmpty()) {
      LOG.info("Cannot submit job to parent queue " + q.getName());
      return false;
    }

    if (!areAclsEnabled()) {
      return true;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Checking access for the acl " + toFullPropertyName(queueName,
        qACL.getAclName()) + " for user " + ugi.getShortUserName());
    }

    AccessControlList acl = q.getAcls().get(
        toFullPropertyName(queueName, qACL.getAclName()));
    if (acl == null) {
      return false;
    }

    // Check if user is part of the ACL
    return acl.isUserAllowed(ugi);
  }

  /**
   * Checks whether the given queue is running or not.
   *
   * @param queueName name of the queue
   * @return true, if the queue is running.
   */
  synchronized boolean isRunning(String queueName) {
    Queue q = leafQueues.get(queueName);
    if (q != null) {
      return q.getState().equals(QueueState.RUNNING);
    }
    return false;
  }

  /**
   * Set a generic Object that represents scheduling information relevant
   * to a queue.
   * <p/>
   * A string representation of this Object will be used by the framework
   * to display in user facing applications like the JobTracker web UI and
   * the hadoop CLI.
   *
   * @param queueName queue for which the scheduling information is to be set.
   * @param queueInfo scheduling information for this queue.
   */
  public synchronized void setSchedulerInfo(
    String queueName,
    Object queueInfo) {
    if (allQueues.get(queueName) != null) {
      allQueues.get(queueName).setSchedulingInfo(queueInfo);
    }
  }

  /**
   * Return the scheduler information configured for this queue.
   *
   * @param queueName queue for which the scheduling information is required.
   * @return The scheduling information for this queue.
   */
  public synchronized Object getSchedulerInfo(String queueName) {
    if (allQueues.get(queueName) != null) {
      return allQueues.get(queueName).getSchedulingInfo();
    }
    return null;
  }

  static final String MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY =
      "Unable to refresh queues because queue-hierarchy changed. "
          + "Retaining existing configuration. ";

  static final String MSG_REFRESH_FAILURE_WITH_SCHEDULER_FAILURE =
      "Scheduler couldn't refresh it's queues with the new"
          + " configuration properties. "
          + "Retaining existing configuration throughout the system.";

  /**
   * Refresh acls, state and scheduler properties for the configured queues.
   * <p/>
   * This method reloads configuration related to queues, but does not
   * support changes to the list of queues or hierarchy. The expected usage
   * is that an administrator can modify the queue configuration file and
   * fire an admin command to reload queue configuration. If there is a
   * problem in reloading configuration, then this method guarantees that
   * existing queue configuration is untouched and in a consistent state.
   * 
   * @param schedulerRefresher
   * @throws IOException when queue configuration file is invalid.
   */
  synchronized void refreshQueues(Configuration conf,
      QueueRefresher schedulerRefresher)
      throws IOException {

    // Create a new configuration parser using the passed conf object.
    QueueConfigurationParser cp =
        getQueueConfigurationParser(conf, true, areAclsEnabled);

    /*
     * (1) Validate the refresh of properties owned by QueueManager. As of now,
     * while refreshing queue properties, we only check that the hierarchy is
     * the same w.r.t queue names, ACLs and state for each queue and don't
     * support adding new queues or removing old queues
     */
    if (!root.isHierarchySameAs(cp.getRoot())) {
      LOG.warn(MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY);
      throw new IOException(MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY);
    }

    /*
     * (2) QueueManager owned properties are validated. Now validate and
     * refresh the properties of scheduler in a single step.
     */
    if (schedulerRefresher != null) {
      try {
        schedulerRefresher.refreshQueues(cp.getRoot().getJobQueueInfo().getChildren());
      } catch (Throwable e) {
        StringBuilder msg =
            new StringBuilder(
                "Scheduler's refresh-queues failed with the exception : "
                    + StringUtils.stringifyException(e));
        msg.append("\n");
        msg.append(MSG_REFRESH_FAILURE_WITH_SCHEDULER_FAILURE);
        LOG.error(msg.toString());
        throw new IOException(msg.toString());
      }
    }

    /*
     * (3) Scheduler has validated and refreshed its queues successfully, now
     * refresh the properties owned by QueueManager
     */

    // First copy the scheduling information recursively into the new
    // queue-hierarchy. This is done to retain old scheduling information. This
    // is done after scheduler refresh and not before it because during refresh,
    // schedulers may wish to change their scheduling info objects too.
    cp.getRoot().copySchedulingInfo(this.root);

    // Now switch roots.
    initialize(cp);

    LOG.info("Queue configuration is refreshed successfully.");
  }

  // this method is for internal use only
  public static final String toFullPropertyName(
    String queue,
    String property) {
    return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
  }

  /**
   * Return an array of {@link JobQueueInfo} objects for all the
   * queues configurated in the system.
   *
   * @return array of JobQueueInfo objects.
   */
  synchronized JobQueueInfo[] getJobQueueInfos() {
    ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
    for (String queue : allQueues.keySet()) {
      JobQueueInfo queueInfo = getJobQueueInfo(queue);
      if (queueInfo != null) {
        queueInfoList.add(queueInfo);
      }
    }
    return queueInfoList.toArray(
      new JobQueueInfo[queueInfoList.size()]);
  }


  /**
   * Return {@link JobQueueInfo} for a given queue.
   *
   * @param queue name of the queue
   * @return JobQueueInfo for the queue, null if the queue is not found.
   */
  synchronized JobQueueInfo getJobQueueInfo(String queue) {
    if (allQueues.containsKey(queue)) {
      return allQueues.get(queue).getJobQueueInfo();
    }

    return null;
  }

  /**
   * JobQueueInfo for all the queues.
   * <p/>
   * Contribs can use this data structure to either create a hierarchy or for
   * traversing.
   * They can also use this to refresh properties in case of refreshQueues
   *
   * @return a map for easy navigation.
   */
  synchronized Map<String, JobQueueInfo> getJobQueueInfoMapping() {
    Map<String, JobQueueInfo> m = new HashMap<String, JobQueueInfo>();

    for (String key : allQueues.keySet()) {
      m.put(key, allQueues.get(key).getJobQueueInfo());
    }

    return m;
  }

  /**
   * Generates the array of QueueAclsInfo object.
   * <p/>
   * The array consists of only those queues for which user has acls.
   *
   * @return QueueAclsInfo[]
   * @throws java.io.IOException
   */
  synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation ugi)
    throws IOException {
    //List of all QueueAclsInfo objects , this list is returned
    ArrayList<QueueAclsInfo> queueAclsInfolist =
      new ArrayList<QueueAclsInfo>();
    QueueACL[] qAcls = QueueACL.values();
    for (String queueName : leafQueues.keySet()) {
      QueueAclsInfo queueAclsInfo = null;
      ArrayList<String> operationsAllowed = null;
      for (QueueACL qAcl : qAcls) {
        if (hasAccess(queueName, qAcl, ugi)) {
          if (operationsAllowed == null) {
            operationsAllowed = new ArrayList<String>();
          }
          operationsAllowed.add(qAcl.getAclName());
        }
      }
      if (operationsAllowed != null) {
        //There is atleast 1 operation supported for queue <queueName>
        //, hence initialize queueAclsInfo
        queueAclsInfo = new QueueAclsInfo(
          queueName, operationsAllowed.toArray
            (new String[operationsAllowed.size()]));
        queueAclsInfolist.add(queueAclsInfo);
      }
    }
    return queueAclsInfolist.toArray(
      new QueueAclsInfo[queueAclsInfolist.size()]);
  }

  /**
   * ONLY FOR TESTING - Do not use in production code.
   * This method is used for setting up of leafQueues only.
   * We are not setting the hierarchy here.
   *
   * @param queues
   */
  synchronized void setQueues(Queue[] queues) {
    root.getChildren().clear();
    leafQueues.clear();
    allQueues.clear();

    for (Queue queue : queues) {
      root.addChild(queue);
    }
    //At this point we have root populated
    //update data structures leafNodes.
    leafQueues = getRoot().getLeafQueues();
    allQueues.putAll(getRoot().getInnerQueues());
    allQueues.putAll(leafQueues);
  }

  /**
   * Return an array of {@link JobQueueInfo} objects for the root
   * queues configured in the system.
   * <p/>
   * Root queues are queues that are at the top-most level in the
   * hierarchy of queues in mapred-queues.xml, or they are the queues
   * configured in the mapred.queue.names key in mapred-site.xml.
   *
   * @return array of JobQueueInfo objects for root level queues.
   */

  JobQueueInfo[] getRootQueues() {
    List<JobQueueInfo> list = getRoot().getJobQueueInfo().getChildren();
    return list.toArray(new JobQueueInfo[list.size()]);
  }

  /**
   * Get the complete hierarchy of children for queue
   * queueName
   *
   * @param queueName
   * @return
   */
  JobQueueInfo[] getChildQueues(String queueName) {
    List<JobQueueInfo> list =
      allQueues.get(queueName).getJobQueueInfo().getChildren();
    if (list != null) {
      return list.toArray(new JobQueueInfo[list.size()]);
    } else {
      return new JobQueueInfo[0];
    }
  }

  /**
   * Used only for testing purposes .
   * This method is unstable as refreshQueues would leave this
   * data structure in unstable state.
   *
   * @param queueName
   * @return
   */
  Queue getQueue(String queueName) {
    return this.allQueues.get(queueName);
  }


  /**
   * Return if ACLs are enabled for the Map/Reduce system
   *
   * @return true if ACLs are enabled.
   */
  boolean areAclsEnabled() {
    return areAclsEnabled;
  }

  /**
   * Used only for test.
   *
   * @return
   */
  Queue getRoot() {
    return root;
  }

  /**
   * Returns the specific queue ACL for the given queue.
   * Returns null if the given queue does not exist or the acl is not
   * configured for that queue.
   * If acls are disabled(mapreduce.cluster.acls.enabled set to false), returns
   * ACL with all users.
   */
  synchronized AccessControlList getQueueACL(String queueName,
      QueueACL qACL) {
    if (areAclsEnabled) {
      Queue q = leafQueues.get(queueName);
      if (q != null) {
        return q.getAcls().get(toFullPropertyName(
          queueName, qACL.getAclName()));
      }
      else {
        LOG.warn("Queue " + queueName + " is not present.");
        return null;
      }
    }
    return new AccessControlList("*");
  }

  /**
   * Dumps the configuration of hierarchy of queues
   * @param out the writer object to which dump is written
   * @throws IOException
   */
  static void dumpConfiguration(Writer out,Configuration conf) throws IOException {
    dumpConfiguration(out, null,conf);
  }
  
  /***
   * Dumps the configuration of hierarchy of queues with 
   * the xml file path given. It is to be used directly ONLY FOR TESTING.
   * @param out the writer object to which dump is written to.
   * @param configFile the filename of xml file
   * @throws IOException
   */
  static void dumpConfiguration(Writer out, String configFile,
      Configuration conf) throws IOException {
    if (conf != null && conf.get(DeprecatedQueueConfigurationParser.
        MAPRED_QUEUE_NAMES_KEY) != null) {
      return;
    }
    
    JsonFactory dumpFactory = new JsonFactory();
    JsonGenerator dumpGenerator = dumpFactory.createJsonGenerator(out);
    QueueConfigurationParser parser;
    boolean aclsEnabled = false;
    if (conf != null) {
      aclsEnabled = conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
    }
    if (configFile != null && !"".equals(configFile)) {
      parser = new QueueConfigurationParser(configFile, aclsEnabled);
    }
    else {
      parser = getQueueConfigurationParser(null, false, aclsEnabled);
    }
    dumpGenerator.writeStartObject();
    dumpGenerator.writeFieldName("queues");
    dumpGenerator.writeStartArray();
    dumpConfiguration(dumpGenerator,parser.getRoot().getChildren());
    dumpGenerator.writeEndArray();
    dumpGenerator.writeEndObject();
    dumpGenerator.flush();
  }

  /**
   * method to perform depth-first search and write the parameters of every 
   * queue in JSON format.
   * @param dumpGenerator JsonGenerator object which takes the dump and flushes
   *  to a writer object
   * @param rootQueues the top-level queues
   * @throws JsonGenerationException
   * @throws IOException
   */
  private static void dumpConfiguration(JsonGenerator dumpGenerator,
      Set<Queue> rootQueues) throws JsonGenerationException, IOException {
    for (Queue queue : rootQueues) {
      dumpGenerator.writeStartObject();
      dumpGenerator.writeStringField("name", queue.getName());
      dumpGenerator.writeStringField("state", queue.getState().toString());
      AccessControlList submitJobList = null;
      AccessControlList administerJobsList = null;
      if (queue.getAcls() != null) {
        submitJobList =
          queue.getAcls().get(toFullPropertyName(queue.getName(),
              QueueACL.SUBMIT_JOB.getAclName()));
        administerJobsList =
          queue.getAcls().get(toFullPropertyName(queue.getName(),
              QueueACL.ADMINISTER_JOBS.getAclName()));
      }
      String aclsSubmitJobValue = " ";
      if (submitJobList != null ) {
        aclsSubmitJobValue = submitJobList.getAclString();
      }
      dumpGenerator.writeStringField("acl_submit_job", aclsSubmitJobValue);
      String aclsAdministerValue = " ";
      if (administerJobsList != null) {
        aclsAdministerValue = administerJobsList.getAclString();
      }
      dumpGenerator.writeStringField("acl_administer_jobs",
          aclsAdministerValue);
      dumpGenerator.writeFieldName("properties");
      dumpGenerator.writeStartArray();
      if (queue.getProperties() != null) {
        for (Map.Entry<Object, Object>property :
          queue.getProperties().entrySet()) {
          dumpGenerator.writeStartObject();
          dumpGenerator.writeStringField("key", (String)property.getKey());
          dumpGenerator.writeStringField("value", (String)property.getValue());
          dumpGenerator.writeEndObject();
        }
      }
      dumpGenerator.writeEndArray();
      Set<Queue> childQueues = queue.getChildren();
      dumpGenerator.writeFieldName("children");
      dumpGenerator.writeStartArray();
      if (childQueues != null && childQueues.size() > 0) {
        dumpConfiguration(dumpGenerator, childQueues);
      }
      dumpGenerator.writeEndArray();
      dumpGenerator.writeEndObject();
    }
  }

}
