/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.client;

import java.io.IOException;
import java.text.NumberFormat;
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DAGSubmissionTimedOut;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
import org.apache.tez.dag.api.client.DAGClientImpl;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.protobuf.ServiceException;

/**
 * TezClient is used to submit Tez DAGs for execution. DAG's are executed via a
 * Tez App Master. TezClient can run the App Master in session or non-session
 * mode. <br>
 * In non-session mode, each DAG is executed in a different App Master that
 * exits after the DAG execution completes. <br>
 * In session mode, the TezClient creates a single instance of the App Master
 * and all DAG's are submitted to the same App Master.<br>
 * Session mode may give better performance when a series of DAGs need to
 * executed because it enables resource re-use across those DAGs. Non-session
 * mode should be used when the user wants to submit a single DAG or wants to
 * disconnect from the cluster after submitting a set of unrelated DAGs. <br>
 * If API recommendations are followed, then the choice of running in session or
 * non-session mode is transparent to writing the application. By changing the
 * session mode configuration, the same application can be running in session or
 * non-session mode.
 */
@Public
public class TezClient {

  private static final Log LOG = LogFactory.getLog(TezClient.class);

  private final String clientName;
  private ApplicationId sessionAppId;
  private ApplicationId lastSubmittedAppId;
  private AMConfiguration amConfig;
  private FrameworkClient frameworkClient;
  private boolean isSession;
  private boolean sessionStarted = false;
  private boolean sessionStopped = false;
  /** Tokens which will be required for all DAGs submitted to this session. */
  private Credentials sessionCredentials = new Credentials();
  private long clientTimeout;
  Map<String, LocalResource> cachedTezJarResources;
  private static final long SLEEP_FOR_READY = 500;
  private JobTokenSecretManager jobTokenSecretManager =
      new JobTokenSecretManager();
  private Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
  
  private int preWarmDAGCounter = 0;


  private TezClient(String name, TezConfiguration tezConf) {
    this(name, tezConf, tezConf.getBoolean(
        TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));    
  }

  @Private
  TezClient(String name, TezConfiguration tezConf,
            @Nullable Map<String, LocalResource> localResources,
            @Nullable Credentials credentials) {
    this(name, tezConf, tezConf.getBoolean(
        TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT),
        localResources, credentials);
  }

  private TezClient(String name, TezConfiguration tezConf, boolean isSession) {
    this(name, tezConf, isSession, null, null);
  }

  @Private
  TezClient(String name, TezConfiguration tezConf, boolean isSession,
            @Nullable Map<String, LocalResource> localResources,
            @Nullable Credentials credentials) {
    this.clientName = name;
    this.isSession = isSession;
    // Set in conf for local mode AM to figure out whether in session mode or not
    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
    this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
  }

  /**
   * Create a new TezClient. Session or non-session execution mode will be
   * inferred from configuration.
   * @param name
   *          Name of the client. Used for logging etc. This will also be used
   *          as app master name is session mode
   * @param tezConf
   *          Configuration for the framework
   */
  public static TezClient create(String name, TezConfiguration tezConf) {
    return new TezClient(name, tezConf);
  }

  /**
   * Create a new TezClient. Session or non-session execution mode will be
   * inferred from configuration. Set the initial resources and security
   * credentials for the App Master. If app master resources/credentials are
   * needed then this is the recommended method for session mode execution.
   *
   * @param name
   *          Name of the client. Used for logging etc. This will also be used
   *          as app master name is session mode
   * @param tezConf
   *          Configuration for the framework
   * @param localFiles
   *          local files for the App Master
   * @param credentials
   *          Set security credentials to be used inside the app master, if
   *          needed. Tez App Master needs credentials to access the staging
   *          directory and for most HDFS cases these are automatically obtained
   *          by Tez client. If the staging directory is on a file system for
   *          which credentials cannot be obtained or for any credentials needed
   *          by user code running inside the App Master, credentials must be
   *          supplied by the user. These will be used by the App Master for the
   *          next DAG. <br>
   *          In session mode, credentials, if needed, must be set before
   *          calling start()
   */
  public static TezClient create(String name, TezConfiguration tezConf,
                                 @Nullable Map<String, LocalResource> localFiles,
                                 @Nullable Credentials credentials) {
    return new TezClient(name, tezConf, localFiles, credentials);
  }

  /**
   * Create a new TezClient with AM session mode set explicitly. This overrides
   * the setting from configuration.
   * @param name
   *          Name of the client. Used for logging etc. This will also be used
   *          as app master name is session mode
   * @param tezConf Configuration for the framework
   * @param isSession The AM will run in session mode or not
   */
  public static TezClient create(String name, TezConfiguration tezConf, boolean isSession) {
    return new TezClient(name, tezConf, isSession);
  }

  /**
   * Create a new TezClient with AM session mode set explicitly. This overrides
   * the setting from configuration.
   * Set the initial files and security credentials for the App Master.
   * @param name
   *          Name of the client. Used for logging etc. This will also be used
   *          as app master name is session mode
   * @param tezConf Configuration for the framework
   * @param isSession The AM will run in session mode or not
   * @param localFiles local files for the App Master
   * @param credentials credentials for the App Master
   */
  public static TezClient create(String name, TezConfiguration tezConf, boolean isSession,
                                 @Nullable Map<String, LocalResource> localFiles,
                                 @Nullable Credentials credentials) {
    return new TezClient(name, tezConf, isSession, localFiles, credentials);
  }

  /**
   * Add local files for the DAG App Master. These may be files, archives, 
   * jars etc.<br>
   * <p>
   * In non-session mode these will be added to the files of the App Master
   * to be launched for the next DAG. Files added via this method will
   * accumulate and be used for every new App Master until
   * {@link #clearAppMasterLocalFiles()} is invoked. <br>
   * <p>
   * In session mode, the recommended usage is to add all files before
   * calling start() so that all needed files are available to the app
   * master before it starts. When called after start(), these local files
   * will be re-localized to the running session DAG App Master and will be
   * added to its classpath for execution of this DAG.
   * <p>
   * Caveats for invoking this method after start() in session mode: files
   * accumulate across DAG submissions and are never removed from the classpath.
   * Only LocalResourceType.FILE is supported. All files will be treated as
   * private.
   * 
   * @param localFiles
   */
  public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) {
    Preconditions.checkNotNull(localFiles);
    if (isSession && sessionStarted) {
      additionalLocalResources.putAll(localFiles);
    }
    amConfig.addAMLocalResources(localFiles);
  }
  
  /**
   * If the next DAG App Master needs different local files, then use this
   * method to clear the local files and then add the new local files
   * using {@link #addAppMasterLocalFiles(Map)}. This method is a no-op in session mode,
   * after start() is called.
   */
  public synchronized void clearAppMasterLocalFiles() {
    amConfig.clearAMLocalResources();
  }
  
  /**
   * Set security credentials to be used inside the app master, if needed. Tez App
   * Master needs credentials to access the staging directory and for most HDFS
   * cases these are automatically obtained by Tez client. If the staging
   * directory is on a file system for which credentials cannot be obtained or
   * for any credentials needed by user code running inside the App Master,
   * credentials must be supplied by the user. These will be used by the App
   * Master for the next DAG. <br>In session mode, credentials, if needed, must be
   * set before calling start()
   * 
   * @param credentials
   */
  public synchronized void setAppMasterCredentials(Credentials credentials) {
    Preconditions
        .checkState(!sessionStarted,
            "Credentials cannot be set after the session App Master has been started");
    amConfig.setCredentials(credentials);
  }
  
  /**
   * Start the client. This establishes a connection to the YARN cluster.
   * In session mode, this start the App Master thats runs all the DAGs in the
   * session.
   * @throws TezException
   * @throws IOException
   */
  public synchronized void start() throws TezException, IOException {
    amConfig.setYarnConfiguration(new YarnConfiguration(amConfig.getTezConfiguration()));

    frameworkClient = createFrameworkClient();
    frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration());
    frameworkClient.start();    

    if (isSession) {
      LOG.info("Session mode. Starting session.");
      TezClientUtils.processTezLocalCredentialsFile(sessionCredentials,
          amConfig.getTezConfiguration());
  
      Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
  
      clientTimeout = amConfig.getTezConfiguration().getInt(
          TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
          TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
  
      try {
        if (sessionAppId == null) {
          sessionAppId = createApplication();
        }
  
        // Add session token for shuffle
        TezClientUtils.createSessionToken(sessionAppId.toString(),
            jobTokenSecretManager, sessionCredentials);
  
        ApplicationSubmissionContext appContext =
            TezClientUtils.createApplicationSubmissionContext(
                sessionAppId,
                null, clientName, amConfig,
                tezJarResources, sessionCredentials);
  
        // Set Tez Sessions to not retry on AM crashes if recovery is disabled
        if (!amConfig.getTezConfiguration().getBoolean(
            TezConfiguration.DAG_RECOVERY_ENABLED,
            TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
          appContext.setMaxAppAttempts(1);
        }  
        frameworkClient.submitApplication(appContext);
        ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId);
        LOG.info("The url to track the Tez Session: " + appReport.getTrackingUrl());
        sessionStarted = true;
      } catch (YarnException e) {
        throw new TezException(e);
      }
    }
  }
  
  /**
   * Submit a DAG. <br>In non-session mode, it submits a new App Master to the
   * cluster.<br>In session mode, it submits the DAG to the session App Master. It
   * blocks until either the DAG is submitted to the session or configured
   * timeout period expires. Cleans up session if the submission timed out.
   * 
   * @param dag
   *          DAG to be submitted to Session
   * @return DAGClient to monitor the DAG
   * @throws TezException
   * @throws IOException
   * @throws DAGSubmissionTimedOut
   *           if submission timed out
   */  
  public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException {
    if (isSession) {
      return submitDAGSession(dag);
    } else {
      return submitDAGApplication(dag);
    }
  }

  private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
    Preconditions.checkState(isSession == true, 
        "submitDAG with additional resources applies to only session mode. " + 
        "In non-session mode please specify all resources in the initial configuration");
    
    verifySessionStateForSubmission();

    String dagId = null;
    LOG.info("Submitting dag to TezSession"
      + ", sessionName=" + clientName
      + ", applicationId=" + sessionAppId
      + ", dagName=" + dag.getName());
    
    if (!additionalLocalResources.isEmpty()) {
      for (LocalResource lr : additionalLocalResources.values()) {
        Preconditions.checkArgument(lr.getType() == LocalResourceType.FILE, "LocalResourceType: "
            + lr.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported");
      }
    }
    
    Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
    DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
        TezClientUtils.usingTezLibsFromArchive(tezJarResources), sessionCredentials);

    SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
    requestBuilder.setDAGPlan(dagPlan).build();
    if (!additionalLocalResources.isEmpty()) {
      requestBuilder.setAdditionalAmResources(DagTypeConverters
          .convertFromLocalResources(additionalLocalResources));
    }
    
    additionalLocalResources.clear();

    DAGClientAMProtocolBlockingPB proxy = null;
    try {
      proxy = waitForProxy();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while trying to create a connection to the AM", e);
    }
    if (proxy == null) {
      try {
        LOG.warn("DAG submission to session timed out, stopping session");
        stop();
      } catch (Throwable t) {
        LOG.info("Got an exception when trying to stop session", t);
      }
      throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session"
          + ", timed out after " + clientTimeout + " seconds");
    }

    try {
      SubmitDAGResponseProto response = proxy.submitDAG(null, requestBuilder.build());
      // the following check is only for testing since the final class
      // SubmitDAGResponseProto cannot be mocked
      if (response != null) {
        dagId = response.getDagId();
      }
    } catch (ServiceException e) {
      throw new TezException(e);
    }
    LOG.info("Submitted dag to TezSession"
        + ", sessionName=" + clientName
        + ", applicationId=" + sessionAppId
        + ", dagName=" + dag.getName());
    return new DAGClientImpl(sessionAppId, dagId,
        amConfig.getTezConfiguration(), frameworkClient);
  }

  /**
   * Stop the client. This terminates the connection to the YARN cluster.
   * In session mode, this shuts down the session DAG App Master
   * @throws TezException
   * @throws IOException
   */
  public synchronized void stop() throws TezException, IOException {
    try {
      if (sessionStarted) {
        LOG.info("Shutting down Tez Session"
            + ", sessionName=" + clientName
            + ", applicationId=" + sessionAppId);
        sessionStopped = true;
        boolean sessionShutdownSuccessful = false;
        try {
          DAGClientAMProtocolBlockingPB proxy = getSessionAMProxy(sessionAppId);
          if (proxy != null) {
            ShutdownSessionRequestProto request =
                ShutdownSessionRequestProto.newBuilder().build();
            proxy.shutdownSession(null, request);
            sessionShutdownSuccessful = true;
          }
        } catch (TezException e) {
          LOG.info("Failed to shutdown Tez Session via proxy", e);
        } catch (ServiceException e) {
          LOG.info("Failed to shutdown Tez Session via proxy", e);
        }
        if (!sessionShutdownSuccessful) {
          LOG.info("Could not connect to AM, killing session via YARN"
              + ", sessionName=" + clientName
              + ", applicationId=" + sessionAppId);
          try {
            frameworkClient.killApplication(sessionAppId);
          } catch (YarnException e) {
            throw new TezException(e);
          }
        }
      }
    } finally {
      if (frameworkClient != null) {
        frameworkClient.close();
      }
    }
  }

  /**
   * Get the name of the client
   * @return name
   */
  public String getClientName() {
    return clientName;
  }
  
  @Private
  @VisibleForTesting
  public synchronized ApplicationId getAppMasterApplicationId() {
    if (isSession) {
      return sessionAppId;
    } else {
      return lastSubmittedAppId;
    }
  }

  /**
   * Get the status of the App Master executing the DAG
   * In non-session mode it returns the status of the last submitted DAG App Master 
   * In session mode, it returns the status of the App Master hosting the session
   * 
   * @return State of the session
   * @throws TezException
   * @throws IOException
   */
  public synchronized TezAppMasterStatus getAppMasterStatus() throws TezException, IOException {
    // Supporting per-DAG app master case since user may choose to run the same 
    // code in that mode and the code should continue to work. Its easy to provide 
    // the correct view for per-DAG app master too.
    ApplicationId appId = null;
    if (isSession) {
      appId = sessionAppId;
    } else {
      appId = lastSubmittedAppId;
    }
    Preconditions.checkState(appId != null, "Cannot get status without starting an application");
    try {
      ApplicationReport appReport = frameworkClient.getApplicationReport(
          appId);
      switch (appReport.getYarnApplicationState()) {
      case NEW:
      case NEW_SAVING:
      case ACCEPTED:
      case SUBMITTED:
        return TezAppMasterStatus.INITIALIZING;
      case FINISHED:
      case FAILED:
      case KILLED:
        return TezAppMasterStatus.SHUTDOWN;
      case RUNNING:
        if (!isSession) {
          return TezAppMasterStatus.RUNNING;
        }
        try {
          DAGClientAMProtocolBlockingPB proxy = getSessionAMProxy(appId);
          if (proxy == null) {
            return TezAppMasterStatus.INITIALIZING;
          }
          GetAMStatusResponseProto response = proxy.getAMStatus(null,
              GetAMStatusRequestProto.newBuilder().build());
          return DagTypeConverters.convertTezSessionStatusFromProto(
              response.getStatus());
        } catch (TezException e) {
          LOG.info("Failed to retrieve AM Status via proxy", e);
        } catch (ServiceException e) {
          LOG.info("Failed to retrieve AM Status via proxy", e);
        }
      }
    } catch (YarnException e) {
      throw new TezException(e);
    }
    return TezAppMasterStatus.INITIALIZING;
  }
  
  /**
   * API to help pre-allocate containers in session mode. In non-session mode
   * this is ignored. The pre-allocated containers may be re-used by subsequent 
   * job DAGs to improve performance. 
   * The preWarm vertex should be configured and setup exactly
   * like the other vertices in the job DAGs so that the pre-allocated containers 
   * may be re-used by the subsequent DAGs to improve performance.
   * The processor for the preWarmVertex may be used to pre-warm the containers
   * by pre-loading classes etc. It should be short-running so that pre-warming 
   * does not block real execution. Users can specify their custom processors or
   * use the PreWarmProcessor from the runtime library.
   * The parallelism of the preWarmVertex will determine the number of preWarmed
   * containers.
   * Pre-warming is best efforts and among other factors is limited by the free 
   * resources on the cluster.
   * @param preWarmVertex
   * @throws TezException
   * @throws IOException
   */
  @Unstable
  public synchronized void preWarm(PreWarmVertex preWarmVertex) throws TezException, IOException {
    if (!isSession) {
      // do nothing for non session mode. This is there to let the code 
      // work correctly in both modes
      return;
    }
    
    verifySessionStateForSubmission();
    
    DAG dag = org.apache.tez.dag.api.DAG.create(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX + "_"
        + preWarmDAGCounter++);
    dag.addVertex(preWarmVertex);

    try {
      waitTillReady();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while waiting for AM to become available", e);
    }
    submitDAG(dag);
  }

  
  /**
   * Wait till the DAG is ready to be submitted.
   * In non-session mode this is a no-op since the application can be immediately
   * submitted.
   * In session mode, this waits for the session host to be ready to accept a DAG
   * @throws IOException
   * @throws TezException
   * @throws InterruptedException 
   */
  @Evolving
  public synchronized void waitTillReady() throws IOException, TezException, InterruptedException {
    if (!isSession) {
      // nothing to wait for in non-session mode
      return;
    }
    verifySessionStateForSubmission();
    while (true) {
      TezAppMasterStatus status = getAppMasterStatus();
      if (status.equals(TezAppMasterStatus.SHUTDOWN)) {
        throw new SessionNotRunning("TezSession has already shutdown");
      }
      if (status.equals(TezAppMasterStatus.READY)) {
        return;
      }
      Thread.sleep(SLEEP_FOR_READY);
    }
  }
  
  @VisibleForTesting
  // for testing
  @Private
  protected FrameworkClient createFrameworkClient() {
    return FrameworkClient.createFrameworkClient(amConfig.getTezConfiguration());
  }

  @VisibleForTesting
  // for testing
  protected DAGClientAMProtocolBlockingPB getSessionAMProxy(ApplicationId appId) 
      throws TezException, IOException {
    return TezClientUtils.getSessionAMProxy(
        frameworkClient, amConfig.getYarnConfiguration(), appId);
  }

  private DAGClientAMProtocolBlockingPB waitForProxy()
      throws IOException, TezException, InterruptedException {
    long startTime = System.currentTimeMillis();
    long endTime = startTime + (clientTimeout * 1000);
    DAGClientAMProtocolBlockingPB proxy = null;
    while (true) {
      proxy = getSessionAMProxy(sessionAppId);
      if (proxy != null) {
        break;
      }
      Thread.sleep(100l);
      if (clientTimeout != -1 && System.currentTimeMillis() > endTime) {
        break;
      }
    }
    return proxy;
  }

  private void verifySessionStateForSubmission() throws SessionNotRunning {
    Preconditions.checkState(isSession, "Invalid without session mode");
    if (!sessionStarted) {
      throw new SessionNotRunning("Session not started");
    } else if (sessionStopped) {
      throw new SessionNotRunning("Session stopped");
    }
  }
  
  private DAGClient submitDAGApplication(DAG dag)
      throws TezException, IOException {
    ApplicationId appId = createApplication();
    return submitDAGApplication(appId, dag);
  }

  @Private // To be used only by YarnRunner
  DAGClient submitDAGApplication(ApplicationId appId, DAG dag)
          throws TezException, IOException {
    LOG.info("Submitting DAG application with id: " + appId);
    try {
      // Use the AMCredentials object in client mode, since this won't be re-used.
      // Ensures we don't fetch credentially unnecessarily if the user has already provided them.
      Credentials credentials = amConfig.getCredentials();
      if (credentials == null) {
        credentials = new Credentials();
      }
      TezClientUtils.processTezLocalCredentialsFile(credentials,
          amConfig.getTezConfiguration());

      // Add session token for shuffle
      TezClientUtils.createSessionToken(appId.toString(),
          jobTokenSecretManager, credentials);

      // Add credentials for tez-local resources.
      Map<String, LocalResource> tezJarResources = getTezJarResources(credentials);
      ApplicationSubmissionContext appContext = TezClientUtils
          .createApplicationSubmissionContext( 
              appId, dag, dag.getName(), amConfig, tezJarResources, credentials);
      LOG.info("Submitting DAG to YARN"
          + ", applicationId=" + appId
          + ", dagName=" + dag.getName());
      
      frameworkClient.submitApplication(appContext);
      ApplicationReport appReport = frameworkClient.getApplicationReport(appId);
      LOG.info("The url to track the Tez AM: " + appReport.getTrackingUrl());
      lastSubmittedAppId = appId;
    } catch (YarnException e) {
      throw new TezException(e);
    }
    return getDAGClient(appId, amConfig.getTezConfiguration(), frameworkClient);
  }

  private ApplicationId createApplication() throws TezException, IOException {
    try {
      return frameworkClient.createApplication().
          getNewApplicationResponse().getApplicationId();
    } catch (YarnException e) {
      throw new TezException(e);
    }
  }

  private synchronized Map<String, LocalResource> getTezJarResources(Credentials credentials)
      throws IOException {
    if (cachedTezJarResources == null) {
      cachedTezJarResources = TezClientUtils.setupTezJarsLocalResources(
          amConfig.getTezConfiguration(), credentials);
    }
    return cachedTezJarResources;
  }

  @Private // Used only for MapReduce compatibility code
  static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
                                FrameworkClient frameworkClient)
      throws IOException, TezException {
    return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient);
  }

  // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
  private static final char SEPARATOR = '_';
  public static final String DAG = "dag";
  static final ThreadLocal<NumberFormat> tezAppIdFormat = new ThreadLocal<NumberFormat>() {
    @Override
    public NumberFormat initialValue() {
      NumberFormat fmt = NumberFormat.getInstance();
      fmt.setGroupingUsed(false);
      fmt.setMinimumIntegerDigits(4);
      return fmt;
    }
  };

  static final ThreadLocal<NumberFormat> tezDagIdFormat = new ThreadLocal<NumberFormat>() {
    @Override
    public NumberFormat initialValue() {
      NumberFormat fmt = NumberFormat.getInstance();
      fmt.setGroupingUsed(false);
      fmt.setMinimumIntegerDigits(1);
      return fmt;
    }
  };

  // Used only for MapReduce compatibility code
  private static String getDefaultTezDAGID(ApplicationId applicationId) {
     return (new StringBuilder(DAG)).append(SEPARATOR).
         append(applicationId.getClusterTimestamp()).
         append(SEPARATOR).
         append(tezAppIdFormat.get().format(applicationId.getId())).
         append(SEPARATOR).
         append(tezDagIdFormat.get().format(1)).toString();
  }
}
