| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.client; |
| |
| import java.io.IOException; |
| import java.text.NumberFormat; |
| import java.util.Map; |
| |
| import javax.annotation.Nullable; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceAudience.Public; |
| import org.apache.hadoop.classification.InterfaceStability.Evolving; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.tez.common.security.JobTokenSecretManager; |
| import org.apache.tez.dag.api.DAG; |
| import org.apache.tez.dag.api.DAGSubmissionTimedOut; |
| import org.apache.tez.dag.api.DagTypeConverters; |
| import org.apache.tez.dag.api.PreWarmVertex; |
| import org.apache.tez.dag.api.SessionNotRunning; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezConstants; |
| import org.apache.tez.dag.api.TezException; |
| import org.apache.tez.dag.api.client.DAGClient; |
| import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; |
| import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto; |
| import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto; |
| import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto; |
| import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto; |
| import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto; |
| import org.apache.tez.dag.api.client.DAGClientImpl; |
| import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Maps; |
| import com.google.protobuf.ServiceException; |
| |
| /** |
| * TezClient is used to submit Tez DAGs for execution. DAG's are executed via a |
| * Tez App Master. TezClient can run the App Master in session or non-session |
| * mode. <br> |
| * In non-session mode, each DAG is executed in a different App Master that |
| * exits after the DAG execution completes. <br> |
| * In session mode, the TezClient creates a single instance of the App Master |
| * and all DAG's are submitted to the same App Master.<br> |
| * Session mode may give better performance when a series of DAGs need to |
| * be executed because it enables resource re-use across those DAGs. Non-session |
| * mode should be used when the user wants to submit a single DAG or wants to |
| * disconnect from the cluster after submitting a set of unrelated DAGs. <br> |
| * If API recommendations are followed, then the choice of running in session or |
| * non-session mode is transparent to writing the application. By changing the |
| * session mode configuration, the same application can be running in session or |
| * non-session mode. |
| */ |
| @Public |
| public class TezClient { |
| |
| private static final Log LOG = LogFactory.getLog(TezClient.class); |
| |
| private final String clientName; |
| private ApplicationId sessionAppId; |
| private ApplicationId lastSubmittedAppId; |
| private AMConfiguration amConfig; |
| private FrameworkClient frameworkClient; |
| private boolean isSession; |
| private boolean sessionStarted = false; |
| private boolean sessionStopped = false; |
| /** Tokens which will be required for all DAGs submitted to this session. */ |
| private Credentials sessionCredentials = new Credentials(); |
| private long clientTimeout; |
| Map<String, LocalResource> cachedTezJarResources; |
| private static final long SLEEP_FOR_READY = 500; |
| private JobTokenSecretManager jobTokenSecretManager = |
| new JobTokenSecretManager(); |
| private Map<String, LocalResource> additionalLocalResources = Maps.newHashMap(); |
| |
| private int preWarmDAGCounter = 0; |
| |
| |
| private TezClient(String name, TezConfiguration tezConf) { |
| this(name, tezConf, tezConf.getBoolean( |
| TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT)); |
| } |
| |
| @Private |
| TezClient(String name, TezConfiguration tezConf, |
| @Nullable Map<String, LocalResource> localResources, |
| @Nullable Credentials credentials) { |
| this(name, tezConf, tezConf.getBoolean( |
| TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT), |
| localResources, credentials); |
| } |
| |
| private TezClient(String name, TezConfiguration tezConf, boolean isSession) { |
| this(name, tezConf, isSession, null, null); |
| } |
| |
| @Private |
| TezClient(String name, TezConfiguration tezConf, boolean isSession, |
| @Nullable Map<String, LocalResource> localResources, |
| @Nullable Credentials credentials) { |
| this.clientName = name; |
| this.isSession = isSession; |
| // Set in conf for local mode AM to figure out whether in session mode or not |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession); |
| this.amConfig = new AMConfiguration(tezConf, localResources, credentials); |
| } |
| |
| /** |
| * Create a new TezClient. Session or non-session execution mode will be |
| * inferred from configuration. |
| * @param name |
| * Name of the client. Used for logging etc. This will also be used |
| * as app master name is session mode |
| * @param tezConf |
| * Configuration for the framework |
| */ |
| public static TezClient create(String name, TezConfiguration tezConf) { |
| return new TezClient(name, tezConf); |
| } |
| |
| /** |
| * Create a new TezClient. Session or non-session execution mode will be |
| * inferred from configuration. Set the initial resources and security |
| * credentials for the App Master. If app master resources/credentials are |
| * needed then this is the recommended method for session mode execution. |
| * |
| * @param name |
| * Name of the client. Used for logging etc. This will also be used |
| * as app master name is session mode |
| * @param tezConf |
| * Configuration for the framework |
| * @param localFiles |
| * local files for the App Master |
| * @param credentials |
| * Set security credentials to be used inside the app master, if |
| * needed. Tez App Master needs credentials to access the staging |
| * directory and for most HDFS cases these are automatically obtained |
| * by Tez client. If the staging directory is on a file system for |
| * which credentials cannot be obtained or for any credentials needed |
| * by user code running inside the App Master, credentials must be |
| * supplied by the user. These will be used by the App Master for the |
| * next DAG. <br> |
| * In session mode, credentials, if needed, must be set before |
| * calling start() |
| */ |
| public static TezClient create(String name, TezConfiguration tezConf, |
| @Nullable Map<String, LocalResource> localFiles, |
| @Nullable Credentials credentials) { |
| return new TezClient(name, tezConf, localFiles, credentials); |
| } |
| |
| /** |
| * Create a new TezClient with AM session mode set explicitly. This overrides |
| * the setting from configuration. |
| * @param name |
| * Name of the client. Used for logging etc. This will also be used |
| * as app master name is session mode |
| * @param tezConf Configuration for the framework |
| * @param isSession The AM will run in session mode or not |
| */ |
| public static TezClient create(String name, TezConfiguration tezConf, boolean isSession) { |
| return new TezClient(name, tezConf, isSession); |
| } |
| |
| /** |
| * Create a new TezClient with AM session mode set explicitly. This overrides |
| * the setting from configuration. |
| * Set the initial files and security credentials for the App Master. |
| * @param name |
| * Name of the client. Used for logging etc. This will also be used |
| * as app master name is session mode |
| * @param tezConf Configuration for the framework |
| * @param isSession The AM will run in session mode or not |
| * @param localFiles local files for the App Master |
| * @param credentials credentials for the App Master |
| */ |
| public static TezClient create(String name, TezConfiguration tezConf, boolean isSession, |
| @Nullable Map<String, LocalResource> localFiles, |
| @Nullable Credentials credentials) { |
| return new TezClient(name, tezConf, isSession, localFiles, credentials); |
| } |
| |
| /** |
| * Add local files for the DAG App Master. These may be files, archives, |
| * jars etc.<br> |
| * <p> |
| * In non-session mode these will be added to the files of the App Master |
| * to be launched for the next DAG. Files added via this method will |
| * accumulate and be used for every new App Master until |
| * {@link #clearAppMasterLocalFiles()} is invoked. <br> |
| * <p> |
| * In session mode, the recommended usage is to add all files before |
| * calling start() so that all needed files are available to the app |
| * master before it starts. When called after start(), these local files |
| * will be re-localized to the running session DAG App Master and will be |
| * added to its classpath for execution of this DAG. |
| * <p> |
| * Caveats for invoking this method after start() in session mode: files |
| * accumulate across DAG submissions and are never removed from the classpath. |
| * Only LocalResourceType.FILE is supported. All files will be treated as |
| * private. |
| * |
| * @param localFiles |
| */ |
| public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) { |
| Preconditions.checkNotNull(localFiles); |
| if (isSession && sessionStarted) { |
| additionalLocalResources.putAll(localFiles); |
| } |
| amConfig.addAMLocalResources(localFiles); |
| } |
| |
| /** |
| * If the next DAG App Master needs different local files, then use this |
| * method to clear the local files and then add the new local files |
| * using {@link #addAppMasterLocalFiles(Map)}. This method is a no-op in session mode, |
| * after start() is called. |
| */ |
| public synchronized void clearAppMasterLocalFiles() { |
| amConfig.clearAMLocalResources(); |
| } |
| |
| /** |
| * Set security credentials to be used inside the app master, if needed. Tez App |
| * Master needs credentials to access the staging directory and for most HDFS |
| * cases these are automatically obtained by Tez client. If the staging |
| * directory is on a file system for which credentials cannot be obtained or |
| * for any credentials needed by user code running inside the App Master, |
| * credentials must be supplied by the user. These will be used by the App |
| * Master for the next DAG. <br>In session mode, credentials, if needed, must be |
| * set before calling start() |
| * |
| * @param credentials |
| */ |
| public synchronized void setAppMasterCredentials(Credentials credentials) { |
| Preconditions |
| .checkState(!sessionStarted, |
| "Credentials cannot be set after the session App Master has been started"); |
| amConfig.setCredentials(credentials); |
| } |
| |
| /** |
| * Start the client. This establishes a connection to the YARN cluster. |
| * In session mode, this start the App Master thats runs all the DAGs in the |
| * session. |
| * @throws TezException |
| * @throws IOException |
| */ |
| public synchronized void start() throws TezException, IOException { |
| amConfig.setYarnConfiguration(new YarnConfiguration(amConfig.getTezConfiguration())); |
| |
| frameworkClient = createFrameworkClient(); |
| frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration()); |
| frameworkClient.start(); |
| |
| if (isSession) { |
| LOG.info("Session mode. Starting session."); |
| TezClientUtils.processTezLocalCredentialsFile(sessionCredentials, |
| amConfig.getTezConfiguration()); |
| |
| Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials); |
| |
| clientTimeout = amConfig.getTezConfiguration().getInt( |
| TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS, |
| TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT); |
| |
| try { |
| if (sessionAppId == null) { |
| sessionAppId = createApplication(); |
| } |
| |
| // Add session token for shuffle |
| TezClientUtils.createSessionToken(sessionAppId.toString(), |
| jobTokenSecretManager, sessionCredentials); |
| |
| ApplicationSubmissionContext appContext = |
| TezClientUtils.createApplicationSubmissionContext( |
| sessionAppId, |
| null, clientName, amConfig, |
| tezJarResources, sessionCredentials); |
| |
| // Set Tez Sessions to not retry on AM crashes if recovery is disabled |
| if (!amConfig.getTezConfiguration().getBoolean( |
| TezConfiguration.DAG_RECOVERY_ENABLED, |
| TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) { |
| appContext.setMaxAppAttempts(1); |
| } |
| frameworkClient.submitApplication(appContext); |
| ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId); |
| LOG.info("The url to track the Tez Session: " + appReport.getTrackingUrl()); |
| sessionStarted = true; |
| } catch (YarnException e) { |
| throw new TezException(e); |
| } |
| } |
| } |
| |
| /** |
| * Submit a DAG. <br>In non-session mode, it submits a new App Master to the |
| * cluster.<br>In session mode, it submits the DAG to the session App Master. It |
| * blocks until either the DAG is submitted to the session or configured |
| * timeout period expires. Cleans up session if the submission timed out. |
| * |
| * @param dag |
| * DAG to be submitted to Session |
| * @return DAGClient to monitor the DAG |
| * @throws TezException |
| * @throws IOException |
| * @throws DAGSubmissionTimedOut |
| * if submission timed out |
| */ |
| public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException { |
| if (isSession) { |
| return submitDAGSession(dag); |
| } else { |
| return submitDAGApplication(dag); |
| } |
| } |
| |
| private DAGClient submitDAGSession(DAG dag) throws TezException, IOException { |
| Preconditions.checkState(isSession == true, |
| "submitDAG with additional resources applies to only session mode. " + |
| "In non-session mode please specify all resources in the initial configuration"); |
| |
| verifySessionStateForSubmission(); |
| |
| String dagId = null; |
| LOG.info("Submitting dag to TezSession" |
| + ", sessionName=" + clientName |
| + ", applicationId=" + sessionAppId |
| + ", dagName=" + dag.getName()); |
| |
| if (!additionalLocalResources.isEmpty()) { |
| for (LocalResource lr : additionalLocalResources.values()) { |
| Preconditions.checkArgument(lr.getType() == LocalResourceType.FILE, "LocalResourceType: " |
| + lr.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported"); |
| } |
| } |
| |
| Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials); |
| DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, |
| TezClientUtils.usingTezLibsFromArchive(tezJarResources), sessionCredentials); |
| |
| SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder(); |
| requestBuilder.setDAGPlan(dagPlan).build(); |
| if (!additionalLocalResources.isEmpty()) { |
| requestBuilder.setAdditionalAmResources(DagTypeConverters |
| .convertFromLocalResources(additionalLocalResources)); |
| } |
| |
| additionalLocalResources.clear(); |
| |
| DAGClientAMProtocolBlockingPB proxy = null; |
| try { |
| proxy = waitForProxy(); |
| } catch (InterruptedException e) { |
| throw new IOException("Interrupted while trying to create a connection to the AM", e); |
| } |
| if (proxy == null) { |
| try { |
| LOG.warn("DAG submission to session timed out, stopping session"); |
| stop(); |
| } catch (Throwable t) { |
| LOG.info("Got an exception when trying to stop session", t); |
| } |
| throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session" |
| + ", timed out after " + clientTimeout + " seconds"); |
| } |
| |
| try { |
| SubmitDAGResponseProto response = proxy.submitDAG(null, requestBuilder.build()); |
| // the following check is only for testing since the final class |
| // SubmitDAGResponseProto cannot be mocked |
| if (response != null) { |
| dagId = response.getDagId(); |
| } |
| } catch (ServiceException e) { |
| throw new TezException(e); |
| } |
| LOG.info("Submitted dag to TezSession" |
| + ", sessionName=" + clientName |
| + ", applicationId=" + sessionAppId |
| + ", dagName=" + dag.getName()); |
| return new DAGClientImpl(sessionAppId, dagId, |
| amConfig.getTezConfiguration(), frameworkClient); |
| } |
| |
| /** |
| * Stop the client. This terminates the connection to the YARN cluster. |
| * In session mode, this shuts down the session DAG App Master |
| * @throws TezException |
| * @throws IOException |
| */ |
| public synchronized void stop() throws TezException, IOException { |
| try { |
| if (sessionStarted) { |
| LOG.info("Shutting down Tez Session" |
| + ", sessionName=" + clientName |
| + ", applicationId=" + sessionAppId); |
| sessionStopped = true; |
| boolean sessionShutdownSuccessful = false; |
| try { |
| DAGClientAMProtocolBlockingPB proxy = getSessionAMProxy(sessionAppId); |
| if (proxy != null) { |
| ShutdownSessionRequestProto request = |
| ShutdownSessionRequestProto.newBuilder().build(); |
| proxy.shutdownSession(null, request); |
| sessionShutdownSuccessful = true; |
| } |
| } catch (TezException e) { |
| LOG.info("Failed to shutdown Tez Session via proxy", e); |
| } catch (ServiceException e) { |
| LOG.info("Failed to shutdown Tez Session via proxy", e); |
| } |
| if (!sessionShutdownSuccessful) { |
| LOG.info("Could not connect to AM, killing session via YARN" |
| + ", sessionName=" + clientName |
| + ", applicationId=" + sessionAppId); |
| try { |
| frameworkClient.killApplication(sessionAppId); |
| } catch (YarnException e) { |
| throw new TezException(e); |
| } |
| } |
| } |
| } finally { |
| if (frameworkClient != null) { |
| frameworkClient.close(); |
| } |
| } |
| } |
| |
| /** |
| * Get the name of the client |
| * @return name |
| */ |
| public String getClientName() { |
| return clientName; |
| } |
| |
| @Private |
| @VisibleForTesting |
| public synchronized ApplicationId getAppMasterApplicationId() { |
| if (isSession) { |
| return sessionAppId; |
| } else { |
| return lastSubmittedAppId; |
| } |
| } |
| |
| /** |
| * Get the status of the App Master executing the DAG |
| * In non-session mode it returns the status of the last submitted DAG App Master |
| * In session mode, it returns the status of the App Master hosting the session |
| * |
| * @return State of the session |
| * @throws TezException |
| * @throws IOException |
| */ |
| public synchronized TezAppMasterStatus getAppMasterStatus() throws TezException, IOException { |
| // Supporting per-DAG app master case since user may choose to run the same |
| // code in that mode and the code should continue to work. Its easy to provide |
| // the correct view for per-DAG app master too. |
| ApplicationId appId = null; |
| if (isSession) { |
| appId = sessionAppId; |
| } else { |
| appId = lastSubmittedAppId; |
| } |
| Preconditions.checkState(appId != null, "Cannot get status without starting an application"); |
| try { |
| ApplicationReport appReport = frameworkClient.getApplicationReport( |
| appId); |
| switch (appReport.getYarnApplicationState()) { |
| case NEW: |
| case NEW_SAVING: |
| case ACCEPTED: |
| case SUBMITTED: |
| return TezAppMasterStatus.INITIALIZING; |
| case FINISHED: |
| case FAILED: |
| case KILLED: |
| return TezAppMasterStatus.SHUTDOWN; |
| case RUNNING: |
| if (!isSession) { |
| return TezAppMasterStatus.RUNNING; |
| } |
| try { |
| DAGClientAMProtocolBlockingPB proxy = getSessionAMProxy(appId); |
| if (proxy == null) { |
| return TezAppMasterStatus.INITIALIZING; |
| } |
| GetAMStatusResponseProto response = proxy.getAMStatus(null, |
| GetAMStatusRequestProto.newBuilder().build()); |
| return DagTypeConverters.convertTezSessionStatusFromProto( |
| response.getStatus()); |
| } catch (TezException e) { |
| LOG.info("Failed to retrieve AM Status via proxy", e); |
| } catch (ServiceException e) { |
| LOG.info("Failed to retrieve AM Status via proxy", e); |
| } |
| } |
| } catch (YarnException e) { |
| throw new TezException(e); |
| } |
| return TezAppMasterStatus.INITIALIZING; |
| } |
| |
| /** |
| * API to help pre-allocate containers in session mode. In non-session mode |
| * this is ignored. The pre-allocated containers may be re-used by subsequent |
| * job DAGs to improve performance. |
| * The preWarm vertex should be configured and setup exactly |
| * like the other vertices in the job DAGs so that the pre-allocated containers |
| * may be re-used by the subsequent DAGs to improve performance. |
| * The processor for the preWarmVertex may be used to pre-warm the containers |
| * by pre-loading classes etc. It should be short-running so that pre-warming |
| * does not block real execution. Users can specify their custom processors or |
| * use the PreWarmProcessor from the runtime library. |
| * The parallelism of the preWarmVertex will determine the number of preWarmed |
| * containers. |
| * Pre-warming is best efforts and among other factors is limited by the free |
| * resources on the cluster. |
| * @param preWarmVertex |
| * @throws TezException |
| * @throws IOException |
| */ |
| @Unstable |
| public synchronized void preWarm(PreWarmVertex preWarmVertex) throws TezException, IOException { |
| if (!isSession) { |
| // do nothing for non session mode. This is there to let the code |
| // work correctly in both modes |
| return; |
| } |
| |
| verifySessionStateForSubmission(); |
| |
| DAG dag = org.apache.tez.dag.api.DAG.create(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX + "_" |
| + preWarmDAGCounter++); |
| dag.addVertex(preWarmVertex); |
| |
| try { |
| waitTillReady(); |
| } catch (InterruptedException e) { |
| throw new IOException("Interrupted while waiting for AM to become available", e); |
| } |
| submitDAG(dag); |
| } |
| |
| |
| /** |
| * Wait till the DAG is ready to be submitted. |
| * In non-session mode this is a no-op since the application can be immediately |
| * submitted. |
| * In session mode, this waits for the session host to be ready to accept a DAG |
| * @throws IOException |
| * @throws TezException |
| * @throws InterruptedException |
| */ |
| @Evolving |
| public synchronized void waitTillReady() throws IOException, TezException, InterruptedException { |
| if (!isSession) { |
| // nothing to wait for in non-session mode |
| return; |
| } |
| verifySessionStateForSubmission(); |
| while (true) { |
| TezAppMasterStatus status = getAppMasterStatus(); |
| if (status.equals(TezAppMasterStatus.SHUTDOWN)) { |
| throw new SessionNotRunning("TezSession has already shutdown"); |
| } |
| if (status.equals(TezAppMasterStatus.READY)) { |
| return; |
| } |
| Thread.sleep(SLEEP_FOR_READY); |
| } |
| } |
| |
| @VisibleForTesting |
| // for testing |
| @Private |
| protected FrameworkClient createFrameworkClient() { |
| return FrameworkClient.createFrameworkClient(amConfig.getTezConfiguration()); |
| } |
| |
| @VisibleForTesting |
| // for testing |
| protected DAGClientAMProtocolBlockingPB getSessionAMProxy(ApplicationId appId) |
| throws TezException, IOException { |
| return TezClientUtils.getSessionAMProxy( |
| frameworkClient, amConfig.getYarnConfiguration(), appId); |
| } |
| |
| private DAGClientAMProtocolBlockingPB waitForProxy() |
| throws IOException, TezException, InterruptedException { |
| long startTime = System.currentTimeMillis(); |
| long endTime = startTime + (clientTimeout * 1000); |
| DAGClientAMProtocolBlockingPB proxy = null; |
| while (true) { |
| proxy = getSessionAMProxy(sessionAppId); |
| if (proxy != null) { |
| break; |
| } |
| Thread.sleep(100l); |
| if (clientTimeout != -1 && System.currentTimeMillis() > endTime) { |
| break; |
| } |
| } |
| return proxy; |
| } |
| |
| private void verifySessionStateForSubmission() throws SessionNotRunning { |
| Preconditions.checkState(isSession, "Invalid without session mode"); |
| if (!sessionStarted) { |
| throw new SessionNotRunning("Session not started"); |
| } else if (sessionStopped) { |
| throw new SessionNotRunning("Session stopped"); |
| } |
| } |
| |
| private DAGClient submitDAGApplication(DAG dag) |
| throws TezException, IOException { |
| ApplicationId appId = createApplication(); |
| return submitDAGApplication(appId, dag); |
| } |
| |
| @Private // To be used only by YarnRunner |
| DAGClient submitDAGApplication(ApplicationId appId, DAG dag) |
| throws TezException, IOException { |
| LOG.info("Submitting DAG application with id: " + appId); |
| try { |
| // Use the AMCredentials object in client mode, since this won't be re-used. |
| // Ensures we don't fetch credentially unnecessarily if the user has already provided them. |
| Credentials credentials = amConfig.getCredentials(); |
| if (credentials == null) { |
| credentials = new Credentials(); |
| } |
| TezClientUtils.processTezLocalCredentialsFile(credentials, |
| amConfig.getTezConfiguration()); |
| |
| // Add session token for shuffle |
| TezClientUtils.createSessionToken(appId.toString(), |
| jobTokenSecretManager, credentials); |
| |
| // Add credentials for tez-local resources. |
| Map<String, LocalResource> tezJarResources = getTezJarResources(credentials); |
| ApplicationSubmissionContext appContext = TezClientUtils |
| .createApplicationSubmissionContext( |
| appId, dag, dag.getName(), amConfig, tezJarResources, credentials); |
| LOG.info("Submitting DAG to YARN" |
| + ", applicationId=" + appId |
| + ", dagName=" + dag.getName()); |
| |
| frameworkClient.submitApplication(appContext); |
| ApplicationReport appReport = frameworkClient.getApplicationReport(appId); |
| LOG.info("The url to track the Tez AM: " + appReport.getTrackingUrl()); |
| lastSubmittedAppId = appId; |
| } catch (YarnException e) { |
| throw new TezException(e); |
| } |
| return getDAGClient(appId, amConfig.getTezConfiguration(), frameworkClient); |
| } |
| |
| private ApplicationId createApplication() throws TezException, IOException { |
| try { |
| return frameworkClient.createApplication(). |
| getNewApplicationResponse().getApplicationId(); |
| } catch (YarnException e) { |
| throw new TezException(e); |
| } |
| } |
| |
| private synchronized Map<String, LocalResource> getTezJarResources(Credentials credentials) |
| throws IOException { |
| if (cachedTezJarResources == null) { |
| cachedTezJarResources = TezClientUtils.setupTezJarsLocalResources( |
| amConfig.getTezConfiguration(), credentials); |
| } |
| return cachedTezJarResources; |
| } |
| |
| @Private // Used only for MapReduce compatibility code |
| static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, |
| FrameworkClient frameworkClient) |
| throws IOException, TezException { |
| return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient); |
| } |
| |
| // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java |
| private static final char SEPARATOR = '_'; |
| public static final String DAG = "dag"; |
| static final ThreadLocal<NumberFormat> tezAppIdFormat = new ThreadLocal<NumberFormat>() { |
| @Override |
| public NumberFormat initialValue() { |
| NumberFormat fmt = NumberFormat.getInstance(); |
| fmt.setGroupingUsed(false); |
| fmt.setMinimumIntegerDigits(4); |
| return fmt; |
| } |
| }; |
| |
| static final ThreadLocal<NumberFormat> tezDagIdFormat = new ThreadLocal<NumberFormat>() { |
| @Override |
| public NumberFormat initialValue() { |
| NumberFormat fmt = NumberFormat.getInstance(); |
| fmt.setGroupingUsed(false); |
| fmt.setMinimumIntegerDigits(1); |
| return fmt; |
| } |
| }; |
| |
| // Used only for MapReduce compatibility code |
| private static String getDefaultTezDAGID(ApplicationId applicationId) { |
| return (new StringBuilder(DAG)).append(SEPARATOR). |
| append(applicationId.getClusterTimestamp()). |
| append(SEPARATOR). |
| append(tezAppIdFormat.get().format(applicationId.getId())). |
| append(SEPARATOR). |
| append(tezDagIdFormat.get().format(1)).toString(); |
| } |
| } |