blob: 93807fdf46e776e0d13a892a1b8d74f78014f858 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.client;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.NumberFormat;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.JavaOptsChecker;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.dag.api.TezConfigurationConstants;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.util.Time;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DAGSubmissionTimedOut;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.HistoryLogLevel;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotReady;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ServiceException;
/**
* TezClient is used to submit Tez DAGs for execution. DAG's are executed via a
* Tez App Master. TezClient can run the App Master in session or non-session
* mode. <br>
* In non-session mode, each DAG is executed in a different App Master that
* exits after the DAG execution completes. <br>
* In session mode, the TezClient creates a single instance of the App Master
* and all DAG's are submitted to the same App Master.<br>
* Session mode may give better performance when a series of DAGs needs to be
* executed because it enables resource re-use across those DAGs. Non-session
* mode should be used when the user wants to submit a single DAG or wants to
* disconnect from the cluster after submitting a set of unrelated DAGs. <br>
* If API recommendations are followed, then the choice of running in session or
* non-session mode is transparent to writing the application. By changing the
* session mode configuration, the same application can be running in session or
* non-session mode.
*/
@Public
public class TezClient {
private static final Logger LOG = LoggerFactory.getLogger(TezClient.class);
// Textual prefix of an application id; APPLICATION_ID_PREFIX appends the '_' separator.
private static final String appIdStrPrefix = "application";
private static final String APPLICATION_ID_PREFIX = appIdStrPrefix + '_';
// Value returned by getPrewarmWaitTimeMs(); stop() passes it as the wait for the prewarm DAG to shut down.
private static final long PREWARM_WAIT_MS = 500;
// Logged by getAppMasterStatus() when a FAILED/KILLED application report carries no diagnostics.
@VisibleForTesting
static final String NO_CLUSTER_DIAGNOSTICS_MSG = "No cluster diagnostics found.";
// Name given at construction; used in log messages and passed along when submitting.
@VisibleForTesting
final String clientName;
// Application id of the session App Master; assigned in start() or getClient(ApplicationId).
private ApplicationId sessionAppId;
// Returned by getAppMasterApplicationId()/getAppMasterStatus() when not in session mode.
private ApplicationId lastSubmittedAppId;
@VisibleForTesting
final AMConfiguration amConfig;
// Created, initialised and started by startFrameworkClient(); closed by stop().
@VisibleForTesting
FrameworkClient frameworkClient;
// Diagnostics copied from the application report when the app is FAILED or KILLED.
private String diagnostics;
@VisibleForTesting
final boolean isSession;
// Set once start()/getClient() has obtained an application report for the session.
private final AtomicBoolean sessionStarted = new AtomicBoolean(false);
// Set by stop(); sendAMHeartbeat() skips heartbeats once it is true.
private final AtomicBoolean sessionStopped = new AtomicBoolean(false);
/** Tokens which will be required for all DAGs submitted to this session. */
private Credentials sessionCredentials = new Credentials();
// Read from TEZ_SESSION_CLIENT_TIMEOUT_SECS in start()/getClient().
private long clientTimeout;
Map<String, LocalResource> cachedTezJarResources;
boolean usingTezArchiveDeploy = false;
private static final long SLEEP_FOR_READY = 500;
// Used together with sessionCredentials to create the session token in setupApplicationContext().
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
// Files added after the session started; attached to the next submitted DAG request and then cleared.
private final Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
@VisibleForTesting
final TezApiVersionInfo apiVersionInfo;
@VisibleForTesting
final ServicePluginsDescriptor servicePluginsDescriptor;
// Remains null unless setupJavaOptsChecker() finds the checker enabled in the configuration.
private JavaOptsChecker javaOptsChecker = null;
// Client of the prewarm DAG, if any; closed by submitDAG() and killed+closed by stop().
private DAGClient prewarmDagClient = null;
private int preWarmDAGCounter = 0;
/* max submitDAG request size through IPC; beyond this we transfer them in the same way we transfer local resource */
private int maxSubmitDAGRequestSizeThroughIPC;
/* this counter counts number of serialized DAGPlan and is used to give unique name to each serialized DAGPlan */
private AtomicInteger serializedSubmitDAGPlanRequestCounter = new AtomicInteger(0);
// FileSystem obtained from the Tez configuration at the end of start()/getClient().
private FileSystem stagingFs = null;
// Single-thread scheduler that sends AM heartbeats; created in startClientHeartbeat(), shut down in stop().
private ScheduledExecutorService amKeepAliveService;
// Cache of UserGroupInformation objects keyed by user name; populated by getUgi().
private final Map<String, UserGroupInformation> ugiMap;
/**
 * Creates a client whose session mode is read from
 * {@link TezConfiguration#TEZ_AM_SESSION_MODE} in the given configuration,
 * falling back to {@link TezConfiguration#TEZ_AM_SESSION_MODE_DEFAULT}.
 *
 * @param name name of the client
 * @param tezConf configuration for the framework
 */
private TezClient(String name, TezConfiguration tezConf) {
this(name, tezConf, tezConf.getBoolean(
TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));
}
/**
 * Creates a client whose session mode is read from
 * {@link TezConfiguration#TEZ_AM_SESSION_MODE} in the given configuration
 * (default {@link TezConfiguration#TEZ_AM_SESSION_MODE_DEFAULT}), with initial
 * local resources and credentials for the App Master.
 *
 * @param name name of the client
 * @param tezConf configuration for the framework
 * @param localResources local resources for the App Master, may be null
 * @param credentials credentials for the App Master, may be null
 */
@Private
TezClient(String name, TezConfiguration tezConf,
@Nullable Map<String, LocalResource> localResources,
@Nullable Credentials credentials) {
this(name, tezConf, tezConf.getBoolean(
TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT),
localResources, credentials);
}
/**
 * Creates a client with an explicit session mode and no initial local
 * resources or credentials (both are passed on as null).
 *
 * @param name name of the client
 * @param tezConf configuration for the framework
 * @param isSession whether the App Master runs in session mode
 */
private TezClient(String name, TezConfiguration tezConf, boolean isSession) {
this(name, tezConf, isSession, null, null);
}
/**
 * Creates a client with an explicit session mode, initial local resources and
 * credentials, and no service plugins descriptor (passed on as null).
 *
 * @param name name of the client
 * @param tezConf configuration for the framework
 * @param isSession whether the App Master runs in session mode
 * @param localResources local resources for the App Master, may be null
 * @param credentials credentials for the App Master, may be null
 */
@Private
protected TezClient(String name, TezConfiguration tezConf, boolean isSession,
@Nullable Map<String, LocalResource> localResources,
@Nullable Credentials credentials) {
this(name, tezConf, isSession, localResources, credentials, null);
}
@Private
protected TezClient(String name, TezConfiguration tezConf, boolean isSession,
@Nullable Map<String, LocalResource> localResources,
@Nullable Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor) {
this.clientName = name;
this.isSession = isSession;
// Set in conf for local mode AM to figure out whether in session mode or not
tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
try {
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
tezConf.set(TezConfigurationConstants.TEZ_SUBMIT_HOST, ip.getCanonicalHostName());
tezConf.set(TezConfigurationConstants.TEZ_SUBMIT_HOST_ADDRESS, ip.getHostAddress());
}
} catch (UnknownHostException e) {
LOG.warn("The host name of the client the tez application was submitted from was unable to be retrieved", e);
}
this.ugiMap = new HashMap<>();
this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
this.apiVersionInfo = new TezApiVersionInfo();
this.servicePluginsDescriptor = servicePluginsDescriptor;
this.maxSubmitDAGRequestSizeThroughIPC = tezConf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT) -
tezConf.getInt(TezConfiguration.TEZ_IPC_PAYLOAD_RESERVED_BYTES,
TezConfiguration.TEZ_IPC_PAYLOAD_RESERVED_BYTES_DEFAULT);
Limits.setConfiguration(tezConf);
LOG.info("Tez Client Version: " + apiVersionInfo.toString());
}
/**
 * Create a new TezClientBuilder. The builder can be used to set additional
 * parameters such as session mode, local resources, credentials and
 * servicePlugins.
 * <p>
 * If session mode is not specified in the builder, it is inferred from the
 * provided TezConfiguration.
 *
 * @param name Name of the client. Used for logging etc. This will also be used
 *             as app master name in session mode
 * @param tezConf Configuration for the framework
 * @return An instance of {@link org.apache.tez.client.TezClient.TezClientBuilder}
 *         which can be used to construct the final TezClient.
 */
public static TezClientBuilder newBuilder(String name, TezConfiguration tezConf) {
  final TezClientBuilder builder = new TezClientBuilder(name, tezConf);
  return builder;
}
/**
 * Create a new TezClient. Whether it runs in session or non-session mode is
 * inferred from the configuration.
 *
 * @param name
 *          Name of the client. Used for logging etc. This will also be used
 *          as app master name in session mode
 * @param tezConf
 *          Configuration for the framework
 * @return the newly created client
 */
public static TezClient create(String name, TezConfiguration tezConf) {
  final TezClient client = new TezClient(name, tezConf);
  return client;
}
/**
 * Create a new TezClient. Whether it runs in session or non-session mode is
 * inferred from the configuration. The initial resources and security
 * credentials for the App Master are set here; if app master
 * resources/credentials are needed, this is the recommended method for
 * session mode execution.
 *
 * @param name
 *          Name of the client. Used for logging etc. This will also be used
 *          as app master name in session mode
 * @param tezConf
 *          Configuration for the framework
 * @param localFiles
 *          local files for the App Master
 * @param credentials
 *          Security credentials to be used inside the app master, if needed.
 *          The Tez App Master needs credentials to access the staging
 *          directory, and for most HDFS cases these are obtained
 *          automatically by the Tez client. If the staging directory is on a
 *          file system for which credentials cannot be obtained, or if user
 *          code running inside the App Master needs credentials, they must be
 *          supplied by the user. These will be used by the App Master for the
 *          next DAG. <br>
 *          In session mode, credentials, if needed, must be set before
 *          calling start()
 * @return the newly created client
 */
public static TezClient create(String name, TezConfiguration tezConf,
    @Nullable Map<String, LocalResource> localFiles,
    @Nullable Credentials credentials) {
  final TezClient client = new TezClient(name, tezConf, localFiles, credentials);
  return client;
}
/**
 * Create a new TezClient with the AM session mode given explicitly. The
 * explicit value overrides the setting from configuration.
 *
 * @param name
 *          Name of the client. Used for logging etc. This will also be used
 *          as app master name in session mode
 * @param tezConf Configuration for the framework
 * @param isSession Whether the AM will run in session mode
 * @return the newly created client
 */
public static TezClient create(String name, TezConfiguration tezConf, boolean isSession) {
  final TezClient client = new TezClient(name, tezConf, isSession);
  return client;
}
/**
 * Create a new TezClient with the AM session mode given explicitly. The
 * explicit value overrides the setting from configuration. The initial files
 * and security credentials for the App Master are set as well.
 *
 * @param name
 *          Name of the client. Used for logging etc. This will also be used
 *          as app master name in session mode
 * @param tezConf Configuration for the framework
 * @param isSession Whether the AM will run in session mode
 * @param localFiles local files for the App Master
 * @param credentials credentials for the App Master
 * @return the newly created client
 */
public static TezClient create(String name, TezConfiguration tezConf, boolean isSession,
    @Nullable Map<String, LocalResource> localFiles,
    @Nullable Credentials credentials) {
  final TezClient client = new TezClient(name, tezConf, isSession, localFiles, credentials);
  return client;
}
/**
 * Add local files for the DAG App Master. These may be files, archives,
 * jars etc.<br>
 * <p>
 * In non-session mode the files are added to those of the App Master that
 * will be launched for the next DAG. They accumulate and are used for every
 * new App Master until {@link #clearAppMasterLocalFiles()} is invoked. <br>
 * <p>
 * In session mode, the recommended usage is to add all files before calling
 * start() so that everything needed is available to the app master before it
 * starts. When called after start(), the files are additionally remembered
 * and re-localized to the running session DAG App Master with the next DAG,
 * and added to its classpath for execution of that DAG.
 * <p>
 * Caveats for invoking this method after start() in session mode: files
 * accumulate across DAG submissions and are never removed from the classpath.
 * Only LocalResourceType.FILE is supported. All files will be treated as
 * private.
 *
 * @param localFiles the files to be made available in the AM; must not be null
 */
public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) {
  Objects.requireNonNull(localFiles);
  final boolean sessionAlreadyRunning = isSession && sessionStarted.get();
  if (sessionAlreadyRunning) {
    // Remember them so the next submitted DAG carries them to the running AM.
    additionalLocalResources.putAll(localFiles);
  }
  amConfig.addAMLocalResources(localFiles);
}
/**
 * Clear the local files configured for the App Master. Use this when the next
 * DAG App Master needs different local files, then add the new ones with
 * {@link #addAppMasterLocalFiles(Map)}. Per the original contract this is a
 * no-op for session mode once start() has been called (the call is still
 * forwarded to the AM configuration unconditionally).
 */
public synchronized void clearAppMasterLocalFiles() {
  this.amConfig.clearAMLocalResources();
}
/**
 * Set security credentials to be used inside the app master, if needed. The
 * Tez App Master needs credentials to access the staging directory, and for
 * most HDFS cases these are obtained automatically by the Tez client. If the
 * staging directory is on a file system for which credentials cannot be
 * obtained, or if user code running inside the App Master needs credentials,
 * they must be supplied by the user. These will be used by the App Master for
 * the next DAG. <br>In session mode, credentials, if needed, must be set
 * before calling start().
 *
 * @param credentials credentials
 * @throws IllegalStateException if the session has already been started
 *         (presumably; thrown by Preconditions.checkState)
 */
public synchronized void setAppMasterCredentials(Credentials credentials) {
  final boolean notYetStarted = !sessionStarted.get();
  Preconditions.checkState(notYetStarted,
      "Credentials cannot be set after the session App Master has been started");
  amConfig.setCredentials(credentials);
}
/**
 * Sets the history log level for this session by writing it into the client's
 * Tez configuration. It takes effect for DAGs submitted after this call.
 *
 * @param historyLogLevel The log level to be used.
 */
public synchronized void setHistoryLogLevel(HistoryLogLevel historyLogLevel) {
  final Configuration conf = amConfig.getTezConfiguration();
  conf.setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, historyLogLevel);
}
/**
 * Start the client. This establishes a connection to the YARN cluster.
 * In session mode, this also starts the App Master that runs all the DAGs in
 * the session: an application is created if none has been assigned yet, the
 * application context is submitted, and the client heartbeat is started.
 *
 * @throws TezException if submitting the session application fails; the
 *         staging directory is cleaned up before the exception is thrown
 * @throws IOException
 */
public synchronized void start() throws TezException, IOException {
  startFrameworkClient();
  setupJavaOptsChecker();

  if (!isSession) {
    // Nothing else to do up front: non-session mode launches an AM per DAG.
    return;
  }

  LOG.info("Session mode. Starting session.");
  TezClientUtils.processTezLocalCredentialsFile(sessionCredentials,
      amConfig.getTezConfiguration());

  clientTimeout = amConfig.getTezConfiguration().getInt(
      TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
      TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);

  try {
    if (sessionAppId == null) {
      sessionAppId = createApplication();
    }
    final ApplicationSubmissionContext submissionContext = setupApplicationContext();
    frameworkClient.submitApplication(submissionContext);
    final ApplicationReport report = frameworkClient.getApplicationReport(sessionAppId);
    LOG.info("The url to track the Tez Session: " + report.getTrackingUrl());
    sessionStarted.set(true);
  } catch (YarnException | IOException e) {
    cleanStagingDir();
    throw new TezException(e);
  }

  startClientHeartbeat();
  this.stagingFs = FileSystem.get(amConfig.getTezConfiguration());
}
/**
 * Best-effort removal of the Tez system staging directory of the session
 * application. Failures to delete are logged and never propagated.
 * <p>
 * The staging path is derived from {@code sessionAppId}. If no application id
 * has been assigned yet (for example because creating the application itself
 * failed), there is nothing that can be located, so the method returns
 * without touching the file system instead of failing with a
 * NullPointerException that would hide the original error.
 */
private void cleanStagingDir() {
  if (sessionAppId == null) {
    LOG.info("No session application id available, skipping staging dir cleanup");
    return;
  }
  Configuration conf = amConfig.getTezConfiguration();
  String appId = sessionAppId.toString();
  Path stagingDir = TezCommonUtils.getTezSystemStagingPath(conf, appId);
  boolean isStgDeleted = false;
  try {
    FileSystem fs = stagingDir.getFileSystem(conf);
    isStgDeleted = fs.delete(stagingDir, true);
  } catch (IOException ioe) {
    LOG.error("Error deleting staging dir " + stagingDir, ioe);
  } finally {
    // Always report the outcome, whether or not the delete succeeded.
    LOG.info("Staging dir {}, deleted:{} ", stagingDir, isStgDeleted);
  }
}
/**
 * Convenience overload of {@link #getClient(ApplicationId)} that takes the
 * application id in its string form.
 *
 * @param appIdStr the application id as a string
 * @return 'this', as returned by {@link #getClient(ApplicationId)}
 * @throws IOException
 * @throws TezException
 */
public synchronized TezClient getClient(String appIdStr) throws IOException, TezException {
  final ApplicationId appId = appIdfromString(appIdStr);
  return getClient(appId);
}
/**
 * Alternative to start() that explicitly sets sessionAppId and doesn't start a new AM.
 * The caller of getClient is responsible for initializing the new TezClient with a
 * Configuration compatible with the existing AM. It is expected the caller has cached the
 * original Configuration (e.g. in Zookeeper).
 *
 * In contrast to "start", no resources are localized. It is the responsibility of the caller to
 * ensure that existing localized resources and staging dirs are still valid.
 *
 * The arguments and the session mode are validated before any state is changed, so a
 * rejected call neither overwrites the session application id nor leaves a started
 * framework client behind.
 *
 * @param appId id of the already running session application; must not be null
 * @return 'this' just as a convenience for fluent style chaining
 * @throws IllegalStateException if this client is not in session mode
 * @throws TezException if the application context cannot be set up or the application
 *         report cannot be fetched
 * @throws IOException
 */
public synchronized TezClient getClient(ApplicationId appId) throws TezException, IOException {
  Objects.requireNonNull(appId, "appId");
  if (!isSession) {
    String msg = "Must be in session mode to bind TezClient to existing AM";
    LOG.error(msg);
    throw new IllegalStateException(msg);
  }

  sessionAppId = appId;
  startFrameworkClient();
  setupJavaOptsChecker();

  LOG.info("Session mode. Reconnecting to session: " + sessionAppId.toString());
  clientTimeout = amConfig.getTezConfiguration().getInt(
      TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
      TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);

  try {
    // The returned context is not submitted; the call is kept for its side effects on the
    // session credentials.
    setupApplicationContext();
    ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId);
    LOG.info("The url to track the Tez Session: " + appReport.getTrackingUrl());
    sessionStarted.set(true);
  } catch (YarnException | IOException e) {
    // NOTE(review): this removes the staging dir of an AM this client did not launch;
    // kept as in the original behaviour - confirm it is intended for the reconnect path.
    cleanStagingDir();
    throw new TezException(e);
  }

  startClientHeartbeat();
  this.stagingFs = FileSystem.get(amConfig.getTezConfiguration());
  return this;
}
/**
 * Creates the framework client, stores it in {@code frameworkClient}, then
 * initialises it with the Tez configuration and starts it.
 */
private void startFrameworkClient() {
  this.frameworkClient = createFrameworkClient();
  this.frameworkClient.init(amConfig.getTezConfiguration());
  this.frameworkClient.start();
}
/**
 * Builds the submission context for the session application.
 * <p>
 * Processes the local credentials file into the session credentials, resolves
 * the Tez jar resources, adds the session token and then creates the context.
 * When DAG recovery is disabled the context is limited to a single
 * application attempt.
 *
 * @return the application submission context for {@code sessionAppId}
 * @throws IOException
 * @throws YarnException
 */
private ApplicationSubmissionContext setupApplicationContext() throws IOException, YarnException {
  TezClientUtils.processTezLocalCredentialsFile(sessionCredentials,
      amConfig.getTezConfiguration());

  final Map<String, LocalResource> jarResources = getTezJarResources(sessionCredentials);

  // Add session token for shuffle
  TezClientUtils.createSessionToken(sessionAppId.toString(),
      jobTokenSecretManager, sessionCredentials);

  final ApplicationSubmissionContext submissionContext =
      TezClientUtils.createApplicationSubmissionContext(
          sessionAppId,
          null, clientName, amConfig,
          jarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo,
          servicePluginsDescriptor, javaOptsChecker);

  // Set Tez Sessions to not retry on AM crashes if recovery is disabled
  final boolean recoveryEnabled = amConfig.getTezConfiguration().getBoolean(
      TezConfiguration.DAG_RECOVERY_ENABLED,
      TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
  if (!recoveryEnabled) {
    submissionContext.setMaxAppAttempts(1);
  }
  return submissionContext;
}
/**
 * Installs the Java opts checker when it is enabled in the configuration.
 * <p>
 * If a checker class is configured it is instantiated reflectively; should
 * that fail, a warning is logged and the checker is disabled (set to null).
 * With no class configured the default {@link JavaOptsChecker} is used. When
 * the feature is disabled the current checker is left untouched.
 */
private void setupJavaOptsChecker() {
  final boolean checkerEnabled = this.amConfig.getTezConfiguration().getBoolean(
      TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED,
      TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT);
  if (!checkerEnabled) {
    return;
  }

  final String checkerClass = this.amConfig.getTezConfiguration().get(
      TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_CLASS, "");
  if (checkerClass.isEmpty()) {
    javaOptsChecker = new JavaOptsChecker();
    return;
  }

  try {
    javaOptsChecker = ReflectionUtils.createClazzInstance(checkerClass);
  } catch (Exception e) {
    LOG.warn("Failed to initialize configured Java Opts Checker"
        + " (" + TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_CLASS
        + ") , checkerClass=" + checkerClass
        + ". Disabling checker.", e);
    javaOptsChecker = null;
  }
}
/**
 * Starts the periodic heartbeat to the App Master.
 * <p>
 * Nothing is scheduled in local mode or when the configured keep-alive
 * timeout is not positive. Otherwise a single daemon thread invokes
 * {@link #sendAMHeartbeat(DAGClientAMProtocolBlockingPB)} with a fixed delay,
 * carrying the proxy returned by one run into the next.
 */
private void startClientHeartbeat() {
  final long keepAliveTimeoutMs =
      TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConfig.getTezConfiguration());
  // Poll at minimum of 1 second interval
  final long pollPeriodMs = TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(
      amConfig.getTezConfiguration(), keepAliveTimeoutMs, 10);
  final boolean localMode = amConfig.getTezConfiguration().getBoolean(
      TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);

  if (localMode || keepAliveTimeoutMs <= 0) {
    return;
  }

  amKeepAliveService = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder()
          .setDaemon(true).setNameFormat("AMKeepAliveThread #%d").build());

  final Runnable heartbeatTask = new Runnable() {
    // Proxy reused between runs; reset to null whenever a heartbeat fails.
    private DAGClientAMProtocolBlockingPB proxy;

    @Override
    public void run() {
      proxy = sendAMHeartbeat(proxy);
    }
  };
  amKeepAliveService.scheduleWithFixedDelay(
      heartbeatTask, pollPeriodMs, pollPeriodMs, TimeUnit.MILLISECONDS);
}
/**
 * Sends one heartbeat to the session App Master.
 * <p>
 * If no proxy is supplied, one is obtained from the framework client first.
 * The heartbeat itself is a {@code getAMStatus} call on the proxy.
 *
 * @param proxy the proxy returned by the previous call, or null to create one
 * @return the proxy to pass to the next call; null if the session is being
 *         stopped, no proxy could be obtained, or the heartbeat failed (so
 *         that the next call creates a fresh proxy)
 */
public DAGClientAMProtocolBlockingPB sendAMHeartbeat(DAGClientAMProtocolBlockingPB proxy) {
  if (sessionStopped.get()) {
    // Ignore sending heartbeat as session being stopped
    return null;
  }
  try {
    if (proxy == null) {
      try {
        proxy = frameworkClient.waitForProxy(clientTimeout, amConfig.getTezConfiguration(),
            sessionAppId, getUgi());
      } catch (InterruptedException e) {
        LOG.debug("Interrupted while trying to create a connection to the AM", e);
        // Restore the interrupt status so the calling thread can still observe it.
        Thread.currentThread().interrupt();
      } catch (SessionNotRunning e) {
        LOG.error("Cannot create a connection to the AM, stopping heartbeat to AM", e);
        cancelAMKeepAlive(false);
      }
    }
    if (proxy != null) {
      LOG.debug("Sending heartbeat to AM");
      proxy.getAMStatus(null, GetAMStatusRequestProto.newBuilder().build());
    }
    return proxy;
  } catch (Exception e) {
    // Drop the proxy on any failure; the next heartbeat will re-create it.
    LOG.info("Exception when sending heartbeat to AM for app {}: {}", sessionAppId,
        e.getMessage());
    LOG.debug("Error when sending heartbeat ping to AM. Resetting AM proxy for app: {}"
        + " due to exception :", sessionAppId, e);
    return null;
  }
}
/**
 * Submit a DAG. <br>In non-session mode, it submits a new App Master to the
 * cluster.<br>In session mode, it submits the DAG to the session App Master. It
 * blocks until either the DAG is submitted to the session or the configured
 * timeout period expires. Cleans up session if the submission timed out.
 * <p>
 * After a successful submission the client of any prewarm DAG is closed.
 *
 * @param dag
 *          DAG to be submitted to Session
 * @return DAGClient to monitor the DAG
 * @throws TezException
 * @throws IOException
 * @throws DAGSubmissionTimedOut
 *           if submission timed out
 */
public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException {
  final DAGClient dagClient;
  if (isSession) {
    dagClient = submitDAGSession(dag);
  } else {
    dagClient = submitDAGApplication(dag);
  }
  if (dagClient == null) {
    return null;
  }
  // Assume the current DAG replaced the prewarm one; no need to kill.
  closePrewarmDagClient();
  return dagClient;
}
/**
 * Attempts to kill the prewarm DAG, optionally waits for it to complete, and
 * then closes its client. Does nothing when there is no prewarm DAG client.
 * Failures while killing or waiting are logged and do not prevent the close.
 *
 * @param waitTimeMs how long to wait for the prewarm DAG to complete; no
 *                   waiting is done when this is not positive
 */
private void killAndClosePrewarmDagClient(long waitTimeMs) {
  if (prewarmDagClient != null) {
    try {
      prewarmDagClient.tryKillDAG();
      final boolean shouldWait = waitTimeMs > 0;
      if (shouldWait) {
        LOG.info("Waiting for prewarm DAG to shut down");
        prewarmDagClient.waitForCompletion(waitTimeMs);
      }
    } catch (Exception ex) {
      LOG.warn("Failed to shut down the prewarm DAG " + prewarmDagClient, ex);
    }
    closePrewarmDagClient();
  }
}
/**
 * Closes the prewarm DAG client, if there is one, and forgets it. A failure
 * to close is logged as a warning; the reference is cleared either way.
 */
private void closePrewarmDagClient() {
  final DAGClient client = prewarmDagClient;
  if (client != null) {
    try {
      client.close();
    } catch (Exception e) {
      LOG.warn("Failed to close prewarm DagClient " + client, e);
    }
    prewarmDagClient = null;
  }
}
/**
 * Submits a DAG to the running session App Master.
 * <p>
 * Local resources added after the session started are validated (only
 * {@code LocalResourceType.FILE} is accepted), attached to the request and
 * then cleared. If the serialized request is larger than the IPC budget, it
 * is written to a uniquely named file in the staging directory and a small
 * request carrying only that file's path is sent instead.
 *
 * @param dag the DAG to submit
 * @return the client returned by the framework client for the submitted DAG
 * @throws TezException
 * @throws IOException
 */
private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
  Preconditions.checkState(isSession,
      "submitDAG with additional resources applies to only session mode. " +
      "In non-session mode please specify all resources in the initial configuration");

  verifySessionStateForSubmission();

  final String callerContextStr = dag.getCallerContext() == null
      ? ""
      : ", callerContext=" + dag.getCallerContext().contextAsSimpleString();

  LOG.info("Submitting dag to TezSession"
      + ", sessionName=" + clientName
      + ", applicationId=" + sessionAppId
      + ", dagName=" + dag.getName()
      + callerContextStr);

  // Iterating an empty map is a no-op, so no separate emptiness check is needed.
  for (LocalResource resource : additionalLocalResources.values()) {
    Preconditions.checkArgument(resource.getType() == LocalResourceType.FILE, "LocalResourceType: "
        + resource.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported");
  }

  final Map<String, LocalResource> jarResources = getTezJarResources(sessionCredentials);
  final DAGPlan plan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, jarResources,
      usingTezArchiveDeploy, sessionCredentials, servicePluginsDescriptor, javaOptsChecker);

  final SubmitDAGRequestProto.Builder builder = SubmitDAGRequestProto.newBuilder();
  builder.setDAGPlan(plan);
  if (!additionalLocalResources.isEmpty()) {
    builder.setAdditionalAmResources(DagTypeConverters
        .convertFromLocalResources(additionalLocalResources));
  }
  additionalLocalResources.clear();

  // if request size exceeds maxSubmitDAGRequestSizeThroughIPC, we serialize them to HDFS
  SubmitDAGRequestProto request = builder.build();
  if (request.getSerializedSize() > maxSubmitDAGRequestSizeThroughIPC) {
    final Path stagingDir = TezCommonUtils.getTezSystemStagingPath(
        amConfig.getTezConfiguration(), sessionAppId.toString());
    final Path planFile = new Path(stagingDir, TezConstants.TEZ_PB_PLAN_BINARY_NAME +
        serializedSubmitDAGPlanRequestCounter.incrementAndGet());
    final FileSystem planFs = planFile.getFileSystem(stagingFs.getConf());
    try (FSDataOutputStream out = planFs.create(planFile, false)) {
      LOG.info("Send dag plan using YARN local resources since it's too large"
          + ", dag plan size=" + request.getSerializedSize()
          + ", max dag plan size through IPC=" + maxSubmitDAGRequestSizeThroughIPC
          + ", max IPC message size= " + amConfig.getTezConfiguration().getInt(
          CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT));
      request.writeTo(out);
      // Replace the full request by one that only points at the serialized file.
      final String resolvedPlanPath = planFs.resolvePath(planFile).toString();
      request = builder.clear().setSerializedRequestPath(resolvedPlanPath).build();
    }
  }

  return frameworkClient.submitDag(dag, request, clientName, sessionAppId, clientTimeout,
      getUgi(), amConfig.getTezConfiguration());
}
/**
 * Returns the cached remote-user {@link UserGroupInformation} for the current
 * user, creating and caching it on first use.
 * <p>
 * The cache is a plain {@code HashMap} and this method is reached both from
 * methods that hold the client's monitor and from the keep-alive thread (via
 * {@code sendAMHeartbeat}), which does not. Access is therefore guarded by the
 * map's own monitor. This assumes the map is only touched through this
 * method - TODO confirm for the rest of the class.
 *
 * @return the UGI to use for calls made on behalf of the current user
 * @throws IOException if the current user cannot be determined
 */
private UserGroupInformation getUgi() throws IOException {
  final String userName = UserGroupInformation.getCurrentUser().getUserName();
  synchronized (ugiMap) {
    return ugiMap.computeIfAbsent(userName, UserGroupInformation::createRemoteUser);
  }
}
/**
 * Time to wait for the prewarm DAG to shut down when the client is stopped.
 * Overridable so tests can shorten the wait.
 *
 * @return the wait time in milliseconds
 */
@VisibleForTesting
protected long getPrewarmWaitTimeMs() {
  final long waitMs = PREWARM_WAIT_MS;
  return waitMs;
}
/**
 * Stop the client. This terminates the connection to the YARN cluster.
 * In session mode, this shuts down the session DAG App Master.
 * <p>
 * The prewarm DAG (if any) is killed first and the keep-alive service is shut
 * down. For a started session, shutdown is requested through the App Master;
 * unless asynchronous stop is configured, the application is then polled once
 * per second until it reaches a terminal state or the hard-kill timeout
 * elapses, after which it is killed through YARN. If the shutdown request did
 * not succeed, the application is killed through YARN directly. The framework
 * client is always closed.
 *
 * @throws TezException
 * @throws IOException
 */
public synchronized void stop() throws TezException, IOException {
  killAndClosePrewarmDagClient(getPrewarmWaitTimeMs());
  try {
    if (amKeepAliveService != null) {
      amKeepAliveService.shutdownNow();
    }
    if (!sessionStarted.get()) {
      // No session was started, so there is nothing to shut down.
      return;
    }

    LOG.info("Shutting down Tez Session"
        + ", sessionName=" + clientName
        + ", applicationId=" + sessionAppId);
    sessionStopped.set(true);

    boolean shutdownViaAm = false;
    try {
      shutdownViaAm = frameworkClient
          .shutdownSession(amConfig.getTezConfiguration(), sessionAppId, getUgi());
      final boolean asyncStop = amConfig.getTezConfiguration().getBoolean(
          TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP,
          TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT);

      if (shutdownViaAm && !asyncStop) {
        LOG.info("Waiting until application is in a final state");
        final long killIssuedAt = System.currentTimeMillis();
        final long hardKillTimeoutMs = amConfig.getTezConfiguration().getLong(
            TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS,
            TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT);
        final long deadline = killIssuedAt + hardKillTimeoutMs;

        long now = killIssuedAt;
        ApplicationReport report = frameworkClient.getApplicationReport(sessionAppId);
        while (now < deadline
            && !isJobInTerminalState(report.getYarnApplicationState())) {
          try {
            Thread.sleep(1000L);
          } catch (InterruptedException ie) {
            // interrupted, stop waiting and fall through to the state check
            break;
          }
          now = System.currentTimeMillis();
          report = frameworkClient.getApplicationReport(sessionAppId);
        }

        // Still not finished after waiting: fall back to a hard kill.
        if (!isJobInTerminalState(report.getYarnApplicationState())) {
          frameworkClient.killApplication(sessionAppId);
        }
      }
    } catch (TezException | ServiceException e) {
      LOG.info("Failed to shutdown Tez Session via proxy", e);
    } catch (ApplicationNotFoundException e) {
      LOG.info("Failed to kill nonexistent application " + sessionAppId, e);
    } catch (YarnException e) {
      throw new TezException(e);
    }

    if (!shutdownViaAm) {
      LOG.info("Could not connect to AM, killing session via YARN"
          + ", sessionName=" + clientName
          + ", applicationId=" + sessionAppId);
      try {
        frameworkClient.killApplication(sessionAppId);
      } catch (ApplicationNotFoundException e) {
        LOG.info("Failed to kill nonexistent application " + sessionAppId, e);
      } catch (YarnException e) {
        throw new TezException(e);
      }
    }
  } finally {
    if (frameworkClient != null) {
      frameworkClient.close();
    }
  }
}
/**
 * Tells whether a YARN application state is final.
 *
 * @param yarnApplicationState the state to test
 * @return true for FINISHED, FAILED or KILLED; false for anything else
 */
private boolean isJobInTerminalState(YarnApplicationState yarnApplicationState) {
  if (yarnApplicationState == YarnApplicationState.FINISHED) {
    return true;
  }
  if (yarnApplicationState == YarnApplicationState.FAILED) {
    return true;
  }
  return yarnApplicationState == YarnApplicationState.KILLED;
}
/**
 * Get the name this client was created with.
 *
 * @return name
 */
public String getClientName() {
  return this.clientName;
}
/**
 * Get the application id of the App Master this client refers to.
 *
 * @return the session application id in session mode, otherwise the id of
 *         the last submitted application
 */
@Private
@VisibleForTesting
public synchronized ApplicationId getAppMasterApplicationId() {
  return isSession ? sessionAppId : lastSubmittedAppId;
}
/**
 * Get the status of the App Master executing the DAG.
 * In non-session mode it returns the status of the last submitted DAG App Master.
 * In session mode, it returns the status of the App Master hosting the session.
 * <p>
 * The YARN application state is mapped as follows: states before RUNNING
 * yield INITIALIZING; FAILED, KILLED and FINISHED yield SHUTDOWN (diagnostics
 * are recorded and logged for FAILED/KILLED); for RUNNING the App Master is
 * asked directly, and INITIALIZING is returned if that query fails. An
 * application unknown to YARN is reported as SHUTDOWN.
 *
 * @return State of the session
 * @throws TezException
 * @throws IOException
 */
public synchronized TezAppMasterStatus getAppMasterStatus() throws TezException, IOException {
  // Supporting per-DAG app master case since user may choose to run the same
  // code in that mode and the code should continue to work. Its easy to provide
  // the correct view for per-DAG app master too.
  final ApplicationId appId = isSession ? sessionAppId : lastSubmittedAppId;
  Preconditions.checkState(appId != null, "Cannot get status without starting an application");

  try {
    final ApplicationReport report = frameworkClient.getApplicationReport(appId);
    switch (report.getYarnApplicationState()) {
    case NEW:
    case NEW_SAVING:
    case ACCEPTED:
    case SUBMITTED:
      return TezAppMasterStatus.INITIALIZING;
    case FAILED:
    case KILLED:
      diagnostics = report.getDiagnostics();
      LOG.info("App did not succeed. Diagnostics: "
          + (report.getDiagnostics() != null ? report.getDiagnostics()
              : NO_CLUSTER_DIAGNOSTICS_MSG));
      return TezAppMasterStatus.SHUTDOWN;
    case FINISHED:
      return TezAppMasterStatus.SHUTDOWN;
    case RUNNING:
      try {
        return frameworkClient.getAMStatus(amConfig.getTezConfiguration(), appId, getUgi());
      } catch (TezException | ServiceException e) {
        LOG.info("Failed to retrieve AM Status via proxy", e);
      }
      break;
    default:
      // Any other state falls through to INITIALIZING below.
      break;
    }
  } catch (ApplicationNotFoundException e) {
    return TezAppMasterStatus.SHUTDOWN;
  } catch (YarnException e) {
    throw new TezException(e);
  }
  return TezAppMasterStatus.INITIALIZING;
}
/**
 * Pre-allocates containers in session mode so that subsequent job DAGs can
 * re-use them. In non-session mode the call is ignored.
 * <p>
 * The preWarm vertex should be configured and set up exactly like the other
 * vertices of the job DAGs, otherwise the pre-allocated containers may not be
 * re-usable by them. Its processor may be used to warm up the containers, for
 * example by pre-loading classes; it should be short-running so that
 * pre-warming does not block real execution. Users can supply a custom
 * processor or use the PreWarmProcessor from the runtime library.
 * The parallelism of the preWarmVertex determines the number of pre-warmed
 * containers.
 * <p>
 * Pre-warming is best effort and, among other factors, limited by the free
 * resources on the cluster. This overload delegates with a timeout of zero,
 * i.e. it waits without a time limit for the App Master to become ready.
 *
 * @param preWarmVertex vertex describing the containers to pre-warm
 * @throws TezException
 * @throws IOException
 */
@Unstable
public synchronized void preWarm(PreWarmVertex preWarmVertex) throws TezException, IOException {
  final long noTimeLimit = 0L;
  preWarm(preWarmVertex, noTimeLimit, TimeUnit.MILLISECONDS);
}
/**
 * Pre-allocates containers in session mode so that subsequent job DAGs can
 * re-use them. In non-session mode a warning is logged and the call returns
 * without doing anything.
 * <p>
 * The preWarm vertex should be configured and set up exactly like the other
 * vertices of the job DAGs, otherwise the pre-allocated containers may not be
 * re-usable by them. Its processor may be used to warm up the containers, for
 * example by pre-loading classes; it should be short-running so that
 * pre-warming does not block real execution. Users can supply a custom
 * processor or use the PreWarmProcessor from the runtime library.
 * The parallelism of the preWarmVertex determines the number of pre-warmed
 * containers.
 * <p>
 * Pre-warming is best effort and, among other factors, limited by the free
 * resources on the cluster. The call first waits for the App Master to become
 * ready and then submits the pre-warm DAG. This method does not return a
 * status: when the App Master is not ready within the given timeout a
 * {@code SessionNotReady} exception is thrown instead.
 *
 * @param preWarmVertex vertex describing the containers to pre-warm
 * @param timeout maximum time to wait for the App Master to become ready;
 *                zero waits without a time limit
 * @param unit unit of {@code timeout}
 * @throws SessionNotReady if the App Master is not ready after the wait period
 * @throws TezException
 * @throws IOException if the thread is interrupted while waiting for the App Master
 */
@Unstable
public synchronized void preWarm(PreWarmVertex preWarmVertex,
    long timeout, TimeUnit unit)
    throws TezException, IOException {
  if (!isSession) {
    // Nothing to do outside session mode; returning quietly lets the same
    // user code run correctly in both modes.
    LOG.warn("preWarm is not supported in non-session mode, " +
        "please use session-mode of TezClient");
    return;
  }
  verifySessionStateForSubmission();

  // The fully qualified name is required: the simple name DAG in an expression
  // resolves to the String constant of this class, not to the DAG type.
  DAG dag = org.apache.tez.dag.api.DAG.create(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX + "_"
      + preWarmDAGCounter++);
  dag.addVertex(preWarmVertex);

  boolean isReady;
  try {
    isReady = waitTillReady(timeout, unit);
  } catch (InterruptedException e) {
    throw new IOException("Interrupted while waiting for AM to become " +
        "available", e);
  }
  if (!isReady) {
    throw new SessionNotReady("Tez AM not ready, could not submit DAG");
  }
  prewarmDagClient = submitDAG(dag);
}
/**
 * Blocks until a DAG can be submitted.
 * In non-session mode this returns immediately because an application can be
 * submitted right away. In session mode it waits, without a time limit, until
 * the session host is ready to accept a DAG.
 *
 * @throws IOException
 * @throws TezException
 * @throws InterruptedException
 */
@Evolving
public synchronized void waitTillReady() throws IOException, TezException, InterruptedException {
  final long noTimeLimit = 0L;
  waitTillReady(noTimeLimit, TimeUnit.MILLISECONDS);
}
/**
 * Blocks until a DAG can be submitted or the given wait period has elapsed.
 * In non-session mode this returns true immediately because an application
 * can be submitted right away. In session mode the App Master status is
 * polled until it is READY.
 * <p>
 * A timeout of zero means "wait without a time limit". A negative timeout
 * results in a single status check.
 *
 * @param timeout maximum time to wait; zero waits without a time limit
 * @param unit unit of {@code timeout}
 * @return true if READY or is not in session mode, false if the wait period
 *         elapsed before the session became READY
 * @throws IOException
 * @throws TezException if the session was not started, was stopped by the
 *         user, or its App Master has shut down
 * @throws InterruptedException if interrupted while sleeping between polls
 */
@Evolving
public synchronized boolean waitTillReady(long timeout, TimeUnit unit)
    throws IOException, TezException, InterruptedException {
  final long timeoutMillis = unit.toMillis(timeout);
  if (!isSession) {
    // nothing to wait for in non-session mode
    return true;
  }

  verifySessionStateForSubmission();
  final long startTime = Time.monotonicNow();
  while (true) {
    TezAppMasterStatus status = getAppMasterStatus();
    if (status.equals(TezAppMasterStatus.SHUTDOWN)) {
      throw new SessionNotRunning("TezSession has already shutdown. "
          + ((diagnostics != null) ? diagnostics : NO_CLUSTER_DIAGNOSTICS_MSG));
    }
    if (status.equals(TezAppMasterStatus.READY)) {
      return true;
    }
    if (timeoutMillis == 0) {
      // Unbounded wait: keep polling.
      Thread.sleep(SLEEP_FOR_READY);
      continue;
    }
    // Compare the elapsed time with the budget instead of computing an absolute
    // deadline: startTime + timeoutMillis overflows for very large timeouts
    // (toMillis saturates at Long.MAX_VALUE), which used to end the wait after
    // a single poll.
    long elapsedMillis = Time.monotonicNow() - startTime;
    if (elapsedMillis >= timeoutMillis) {
      return false;
    }
    Thread.sleep(Math.min(SLEEP_FOR_READY, timeoutMillis - elapsedMillis));
  }
}
/**
 * Polls the App Master status in non-session mode until it is either RUNNING
 * or SHUTDOWN, sleeping between polls.
 *
 * @throws IOException
 * @throws TezException if the status cannot be retrieved or the thread is
 *         interrupted while sleeping; the interruption is attached as the cause
 */
private void waitNonSessionTillReady() throws IOException, TezException {
  Preconditions.checkArgument(!isSession, "It is supposed to be only called in non-session mode");
  while (true) {
    TezAppMasterStatus status = getAppMasterStatus();
    // DAGClient will handle the AM SHUTDOWN case
    if (status.equals(TezAppMasterStatus.RUNNING)
        || status.equals(TezAppMasterStatus.SHUTDOWN)) {
      return;
    }
    try {
      Thread.sleep(SLEEP_FOR_READY);
    } catch (InterruptedException e) {
      // Keep the original exception as the cause so the interruption stays
      // visible in stack traces.
      // NOTE(review): the thread's interrupt flag is not restored here; confirm
      // whether callers expect it to be.
      throw new TezException("TezClient is interrupted", e);
    }
  }
}
/**
 * Creates the framework client from the Tez configuration held by the AM
 * configuration. Protected so that tests can substitute their own client.
 *
 * @return a new framework client
 */
@VisibleForTesting
@Private
protected FrameworkClient createFrameworkClient() {
  TezConfiguration tezConf = amConfig.getTezConfiguration();
  return FrameworkClient.createFrameworkClient(tezConf);
}
/**
 * Ensures that this client is a session that has been started and not yet
 * stopped.
 *
 * @throws SessionNotRunning if the session was never started or was stopped
 */
private void verifySessionStateForSubmission() throws SessionNotRunning {
  Preconditions.checkState(isSession, "Invalid without session mode");
  if (!sessionStarted.get()) {
    throw new SessionNotRunning("Session not started");
  }
  if (sessionStopped.get()) {
    throw new SessionNotRunning("Session stopped by user");
  }
}
/**
 * Creates a fresh YARN application and submits the given DAG to it.
 *
 * @param dag DAG to run
 * @return client for tracking the submitted DAG
 * @throws TezException
 * @throws IOException
 */
private DAGClient submitDAGApplication(DAG dag)
    throws TezException, IOException {
  return submitDAGApplication(createApplication(), dag);
}
/**
 * Submits the given DAG as a YARN application with the supplied id, waits for
 * its App Master to start running (or shut down) and returns a client for it.
 *
 * @param appId id of the YARN application to submit
 * @param dag DAG to run
 * @return client for tracking the submitted DAG
 * @throws TezException if YARN rejects one of the calls
 * @throws IOException
 */
@Private // To be used only by YarnRunner
DAGClient submitDAGApplication(ApplicationId appId, DAG dag)
    throws TezException, IOException {
  LOG.info("Submitting DAG application with id: " + appId);
  try {
    // In client mode the AM credentials object can be used directly because it
    // is not re-used. This avoids fetching credentials again when the user has
    // already provided them.
    Credentials amCredentials = amConfig.getCredentials();
    if (amCredentials == null) {
      amCredentials = new Credentials();
    }
    TezClientUtils.processTezLocalCredentialsFile(amCredentials,
        amConfig.getTezConfiguration());

    // Session token used for shuffle.
    TezClientUtils.createSessionToken(appId.toString(),
        jobTokenSecretManager, amCredentials);

    // Resolving the Tez jars also adds credentials for the tez-local resources.
    Map<String, LocalResource> jarResources = getTezJarResources(amCredentials);

    ApplicationSubmissionContext submissionContext =
        TezClientUtils.createApplicationSubmissionContext(
            appId, dag, dag.getName(), amConfig, jarResources, amCredentials,
            usingTezArchiveDeploy, apiVersionInfo, servicePluginsDescriptor, javaOptsChecker);

    String callerContextStr = (dag.getCallerContext() == null)
        ? ""
        : ", callerContext=" + dag.getCallerContext().contextAsSimpleString();
    LOG.info("Submitting DAG to YARN"
        + ", applicationId=" + appId
        + ", dagName=" + dag.getName()
        + callerContextStr);
    TezCommonUtils.logCredentials(LOG, amCredentials, "appContext");

    frameworkClient.submitApplication(submissionContext);
    ApplicationReport submittedReport = frameworkClient.getApplicationReport(appId);
    LOG.info("The url to track the Tez AM: " + submittedReport.getTrackingUrl());
    lastSubmittedAppId = appId;
  } catch (YarnException e) {
    throw new TezException(e);
  }
  // In non-session mode, wait for the DAG to start running so that getDAGStatus
  // can be served afterwards.
  waitNonSessionTillReady();
  return getDAGClient(appId, amConfig.getTezConfiguration(), frameworkClient, getUgi());
}
/**
 * Asks the framework client for a new application and returns its id.
 *
 * @return id of the newly created application
 * @throws TezException wrapping any YarnException raised by the request
 * @throws IOException
 */
private ApplicationId createApplication() throws TezException, IOException {
  final ApplicationId newAppId;
  try {
    newAppId = frameworkClient.createApplication()
        .getNewApplicationResponse()
        .getApplicationId();
  } catch (YarnException e) {
    throw new TezException(e);
  }
  return newAppId;
}
/**
 * Returns the local resources for the Tez jars, resolving them on first use
 * and serving the cached result afterwards.
 * <p>
 * The cache is only published after the resources were set up successfully.
 * If the setup fails, nothing is cached and the next call retries instead of
 * returning a partially populated map.
 *
 * @param credentials credentials passed to the jar resource setup on first use
 * @return the cached map of Tez jar local resources
 * @throws IOException if the jar resources cannot be set up
 */
private synchronized Map<String, LocalResource> getTezJarResources(Credentials credentials)
    throws IOException {
  if (cachedTezJarResources == null) {
    // Populate a local map first so that a failure cannot leave a half-filled
    // cache behind.
    Map<String, LocalResource> resolvedResources = new HashMap<String, LocalResource>();
    usingTezArchiveDeploy = TezClientUtils.setupTezJarsLocalResources(
        amConfig.getTezConfiguration(), credentials, resolvedResources);
    cachedTezJarResources = resolvedResources;
  }
  return cachedTezJarResources;
}
/**
 * Obtains a DAG client for the default DAG id derived from the given
 * application id.
 *
 * @param appId application hosting the DAG
 * @param tezConf configuration handed to the framework client
 * @param frameworkClient client used to create the DAG client
 * @param ugi user information handed to the framework client
 * @return the DAG client
 * @throws IOException
 * @throws TezException
 */
@Private
static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
    FrameworkClient frameworkClient, UserGroupInformation ugi) throws IOException, TezException {
  final String defaultDagId = getDefaultTezDAGID(appId);
  return frameworkClient.getDAGClient(appId, defaultDagId, tezConf, ugi);
}
// DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
// Separator placed between the components of the default DAG id string.
private static final char SEPARATOR = '_';
// Leading component of the default DAG id string.
public static final String DAG = "dag";
// Per-thread formatter for the application id component of the DAG id:
// digit grouping is disabled and at least 4 integer digits are emitted.
// A ThreadLocal is used because NumberFormat is generally not synchronized.
// NOTE(review): NumberFormat.getInstance() follows the JVM default locale;
// confirm the output matches TezDAGID in locales with non-ASCII digits.
static final ThreadLocal<NumberFormat> tezAppIdFormat = new ThreadLocal<NumberFormat>() {
@Override
public NumberFormat initialValue() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setGroupingUsed(false);
fmt.setMinimumIntegerDigits(4);
return fmt;
}
};
// Per-thread formatter for the DAG sequence component of the DAG id:
// digit grouping is disabled and at least 1 integer digit is emitted.
static final ThreadLocal<NumberFormat> tezDagIdFormat = new ThreadLocal<NumberFormat>() {
@Override
public NumberFormat initialValue() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setGroupingUsed(false);
fmt.setMinimumIntegerDigits(1);
return fmt;
}
};
/**
 * Builds the id string of the first DAG of an application, in the form
 * {@code dag_<clusterTimestamp>_<appId>_<dagId>} with a DAG sequence of 1.
 * Used only for MapReduce compatibility code.
 *
 * @param applicationId application the DAG belongs to
 * @return the default DAG id string
 */
private static String getDefaultTezDAGID(ApplicationId applicationId) {
  final long clusterTimestamp = applicationId.getClusterTimestamp();
  final String appIdPart = tezAppIdFormat.get().format(applicationId.getId());
  final String dagIdPart = tezDagIdFormat.get().format(1);
  return DAG + SEPARATOR + clusterTimestamp + SEPARATOR + appIdPart + SEPARATOR + dagIdPart;
}
/**
 * Shuts down the AM keep-alive executor, if one exists.
 *
 * @param shutdownNow true to call {@code shutdownNow()} on the executor,
 *                    false to call {@code shutdown()}
 */
@VisibleForTesting
@Private
public synchronized void cancelAMKeepAlive(boolean shutdownNow) {
  if (amKeepAliveService == null) {
    return;
  }
  if (shutdownNow) {
    amKeepAliveService.shutdownNow();
  } else {
    amKeepAliveService.shutdown();
  }
}
/**
 * Exposes the AM keep-alive executor to tests.
 *
 * @return the keep-alive executor, possibly null
 */
@VisibleForTesting
protected synchronized ScheduledExecutorService getAMKeepAliveService() {
  return this.amKeepAliveService;
}
/**
 * Fluent builder that collects the settings for a
 * {@link org.apache.tez.client.TezClient} and creates the instance.
 */
@Public
public static class TezClientBuilder {
  final String name;
  final TezConfiguration tezConf;
  boolean isSession;
  private Map<String, LocalResource> localResourceMap;
  private Credentials credentials;
  ServicePluginsDescriptor servicePluginsDescriptor;

  /**
   * Creates a builder. The session flag is initialised from
   * {@code TezConfiguration.TEZ_AM_SESSION_MODE} in the given configuration
   * and can be overridden with {@link #setIsSession(boolean)}.
   *
   * @param name
   *          Name of the client. Used for logging etc. This will also be used
   *          as app master name in session mode
   * @param tezConf
   *          Configuration for the framework
   */
  private TezClientBuilder(String name, TezConfiguration tezConf) {
    this.name = name;
    this.tezConf = tezConf;
    this.isSession = tezConf.getBoolean(
        TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT);
  }

  /**
   * Chooses between session and non-session mode, overriding the value read
   * from the configuration.
   *
   * @param isSession whether the client is a session
   * @return the current builder
   */
  public TezClientBuilder setIsSession(boolean isSession) {
    this.isSession = isSession;
    return this;
  }

  /**
   * Sets the local resources to be used by the AppMaster.
   *
   * @param localResources local files for the App Master
   * @return the current builder
   */
  public TezClientBuilder setLocalResources(Map<String, LocalResource> localResources) {
    this.localResourceMap = localResources;
    return this;
  }

  /**
   * Sets the security credentials to be used inside the app master.
   *
   * @param credentials
   *          Security credentials to be used inside the app master, if
   *          needed. The Tez App Master needs credentials to access the
   *          staging directory; for most HDFS cases the Tez client obtains
   *          these automatically. Credentials must be supplied by the user
   *          when the staging directory is on a file system for which they
   *          cannot be obtained, or when user code running inside the App
   *          Master needs them. They will be used by the App Master for the
   *          next DAG. <br>
   *          In session mode, credentials, if needed, must be set before
   *          calling start()
   * @return the current builder
   */
  public TezClientBuilder setCredentials(Credentials credentials) {
    this.credentials = credentials;
    return this;
  }

  /**
   * Specifies the service plugins that will be running in the AM.
   *
   * @param servicePluginsDescriptor the service plugin descriptor with details
   *          about the plugins running in the AM
   * @return the current builder
   */
  public TezClientBuilder setServicePluginDescriptor(ServicePluginsDescriptor servicePluginsDescriptor) {
    this.servicePluginsDescriptor = servicePluginsDescriptor;
    return this;
  }

  /**
   * Creates the {@link TezClient} from the values collected so far.
   *
   * @return an instance of {@link TezClient}
   */
  public TezClient build() {
    return new TezClient(this.name, this.tezConf, this.isSession, this.localResourceMap,
        this.credentials, this.servicePluginsDescriptor);
  }
}
/**
 * Parses the textual form of an application id, i.e. the application id
 * prefix followed by {@code <clusterTimestamp>_<id>}.
 * <p>
 * Copied from org.apache.hadoop.yarn.api.records.ApplicationId in Hadoop 2.8+
 * to simplify the implementation on 2.7.x.
 *
 * @param appIdStr string to parse
 * @return the parsed application id
 * @throws IllegalArgumentException if the prefix is wrong, the separator
 *         between timestamp and id is missing, or either number is malformed
 */
@Public
@Unstable
public static ApplicationId appIdfromString(String appIdStr) {
  if (!appIdStr.startsWith(APPLICATION_ID_PREFIX)) {
    throw new IllegalArgumentException("Invalid ApplicationId prefix: "
        + appIdStr + ". The valid ApplicationId should start with prefix "
        + appIdStrPrefix);
  }

  // The cluster timestamp starts right after the prefix and runs up to the
  // next underscore; the application sequence number follows it.
  final int timestampStart = APPLICATION_ID_PREFIX.length();
  final int separatorIdx = appIdStr.indexOf('_', timestampStart);
  if (separatorIdx < 0) {
    throw new IllegalArgumentException("Invalid ApplicationId: "
        + appIdStr);
  }

  try {
    final long clusterTimestamp =
        Long.parseLong(appIdStr.substring(timestampStart, separatorIdx));
    final int sequenceId = Integer.parseInt(appIdStr.substring(separatorIdx + 1));
    return ApplicationId.newInstance(clusterTimestamp, sequenceId);
  } catch (NumberFormatException n) {
    throw new IllegalArgumentException("Invalid ApplicationId: "
        + appIdStr, n);
  }
}
}