blob: dca9778ab66bdedf0e42fe9cad305244c89e2d84 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.client;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Logger;
import org.apache.tez.common.EnvironmentUpdateUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClientHandler;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.dag.DAG;
import com.google.common.annotations.VisibleForTesting;
/**
 * {@link FrameworkClient} implementation used for Tez local mode.
 *
 * <p>Rather than submitting to a cluster, this client creates a {@link DAGAppMaster} on a
 * dedicated thread inside the current JVM (see
 * {@link #createDAGAppMaster(ApplicationSubmissionContext)}) and answers application queries
 * from that in-process instance.
 *
 * <p>A single instance is shared between TezClient and DAGClients, so {@link #start()},
 * {@link #stop()} and {@link #close()} may each be invoked multiple times.
 */
public class LocalClient extends FrameworkClient {
  public static final Logger LOG = Logger.getLogger(LocalClient.class);

  /** Maximum time, in milliseconds, to wait for the DAGAppMaster to finish starting. */
  private static final long TIME_OUT = 60 * 1000;

  /** Sleep, in milliseconds, between two checks of the DAGAppMaster state while waiting. */
  private static final long POLL_INTERVAL_MS = 100;

  private static final String localModeDAGSchedulerClassName =
      "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled";

  // Both fields are written by the DAGAppMaster thread and read by client threads.
  private volatile DAGAppMaster dagAppMaster = null;
  private volatile DAGClientHandler clientHandler = null;

  private Thread dagAmThread;
  private Configuration conf;
  // Used as the cluster timestamp component of every ApplicationId handed out by this client.
  private final long clusterTimeStamp = System.currentTimeMillis();
  // Sequence number of the next ApplicationId handed out by createApplication().
  private int appIdNumber = 1;
  private boolean isSession;
  private TezApiVersionInfo versionInfo = new TezApiVersionInfo();

  public LocalClient() {
  }

  /**
   * Stores {@code yarnConf} for later use and adjusts both configurations for local mode.
   *
   * @param tezConf Tez configuration; modified in place (local file system as default FS, lib
   *                URIs ignored, local-mode DAG scheduler) and read for the session-mode flag
   * @param yarnConf configuration retained by this client; the AM web service is disabled on it
   */
  @Override
  public void init(TezConfiguration tezConf, YarnConfiguration yarnConf) {
    this.conf = yarnConf;
    tezConf.set("fs.defaultFS", "file:///");
    // Tez libs already in the client's classpath
    tezConf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
    tezConf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, localModeDAGSchedulerClassName);
    isSession = tezConf.getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE,
        TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT);
    // disable web service for local mode.
    this.conf.setBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, false);
  }

  /** Intentionally a no-op. */
  @Override
  public void start() {
    // LocalClients are shared between TezClient and DAGClients, which can cause stop / start / close
    // to be invoked multiple times. If modifying these methods - this should be factored in.
  }

  /** Intentionally a no-op. */
  @Override
  public void stop() {
    // LocalClients are shared between TezClient and DAGClients, which can cause stop / start / close
    // to be invoked multiple times. If modifying these methods - this should be factored in.
  }

  /**
   * Stops the in-process DAGAppMaster, if one has been created.
   *
   * @throws IOException declared by the overridden method; not thrown by the code in this body
   */
  @Override
  public void close() throws IOException {
    // LocalClients are shared between TezClient and DAGClients, which can cause stop / start / close
    // to be invoked multiple times. If modifying these methods - this should be factored in.
    // Multiple DAGClient's can reuse the LocalClient (for ex session). However there is only a
    // single instance of LocalClient for a TezClient, and dagAppMaster can be cleaned up when
    // the TezClient is stopped, in order not to leak.

    // Read the volatile field once: the AM thread resets it to null on failure, so checking and
    // then re-reading the field could dereference null.
    final DAGAppMaster appMaster = dagAppMaster;
    if (appMaster != null) {
      appMaster.stop();
    }
  }

  /**
   * Builds a new application handle with a locally generated {@link ApplicationId}.
   *
   * <p>The id combines this client's creation timestamp with an incrementing sequence number.
   *
   * @return a {@link YarnClientApplication} wrapping the new id and an otherwise empty
   *         submission context
   */
  @Override
  public YarnClientApplication createApplication() throws YarnException, IOException {
    ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class);
    ApplicationId appId = ApplicationId.newInstance(clusterTimeStamp, appIdNumber++);
    context.setApplicationId(appId);
    GetNewApplicationResponse response = Records.newRecord(GetNewApplicationResponse.class);
    response.setApplicationId(appId);
    return new YarnClientApplication(response, context);
  }

  /**
   * Starts the in-process DAGAppMaster for the given submission context.
   *
   * @param appContext submission context carrying the application id and AM container spec
   * @return the application id taken from {@code appContext}
   */
  @Override
  public ApplicationId submitApplication(ApplicationSubmissionContext appContext) {
    ApplicationId appId = appContext.getApplicationId();
    startDAGAppMaster(appContext);
    return appId;
  }

  /**
   * Asks the in-process DAGAppMaster to shut down.
   *
   * <p>If no client handler exists yet (no app master has been constructed), there is nothing
   * to shut down and the request is logged and ignored instead of failing with a
   * {@link NullPointerException}.
   *
   * @param appId id of the application to kill; only used for logging
   */
  @Override
  public void killApplication(ApplicationId appId) {
    final DAGClientHandler handler = clientHandler;
    if (handler == null) {
      LOG.warn("Ignoring kill request for " + appId + ": no DAGAppMaster has been started");
      return;
    }
    handler.shutdownAM();
  }

  /**
   * Builds an {@link ApplicationReport} from the current state of the in-process DAGAppMaster.
   *
   * <p>Fields with no local-mode equivalent are set to fixed values: tokens and resource usage
   * are {@code null}, tracking URLs are {@code "N/A"} and the finish time is {@code 0}.
   *
   * <p>NOTE(review): the app master is dereferenced without a null check, so this expects a
   * DAGAppMaster to have been created already (e.g. via {@link #submitApplication}).
   *
   * @param appId application id to put into the report
   * @return the populated report
   */
  @Override
  public ApplicationReport getApplicationReport(ApplicationId appId) {
    // Work on one snapshot of the volatile field so every value comes from the same app master.
    final DAGAppMaster appMaster = dagAppMaster;

    ApplicationReport report = Records.newRecord(ApplicationReport.class);
    report.setApplicationId(appId);
    report.setCurrentApplicationAttemptId(appMaster.getAttemptID());

    AppContext runningAppContext = appMaster.getContext();
    if (runningAppContext != null) {
      DAG dag = runningAppContext.getCurrentDAG();
      if (dag != null) {
        report.setUser(runningAppContext.getUser());
      }
      report.setName(runningAppContext.getApplicationName());
      report.setStartTime(runningAppContext.getStartTime());
    }

    report.setHost(appMaster.getAppNMHost());
    report.setRpcPort(appMaster.getRpcPort());
    report.setClientToAMToken(null);

    // Read the state once so the YARN state and the final status are derived from the same
    // value and cannot contradict each other if the app master changes state in between.
    final DAGAppMasterState state = appMaster.getState();
    report.setYarnApplicationState(convertDAGAppMasterState(state));
    report.setFinalApplicationStatus(convertDAGAppMasterStateToFinalYARNState(state));

    List<String> diagnostics = appMaster.getDiagnostics();
    if (diagnostics != null) {
      report.setDiagnostics(diagnostics.toString());
    }
    report.setTrackingUrl("N/A");
    report.setFinishTime(0);
    report.setApplicationResourceUsageReport(null);
    report.setOriginalTrackingUrl("N/A");
    report.setProgress(appMaster.getProgress());
    report.setAMRMToken(null);
    return report;
  }

  /**
   * Maps a DAGAppMaster state to a YARN {@link FinalApplicationStatus}.
   *
   * @param dagAppMasterState state to convert
   * @return {@code SUCCEEDED}, {@code FAILED} (for {@code FAILED} and {@code ERROR}) or
   *         {@code KILLED} for the matching states; {@code UNDEFINED} for every other state
   */
  protected FinalApplicationStatus convertDAGAppMasterStateToFinalYARNState(
      DAGAppMasterState dagAppMasterState) {
    switch (dagAppMasterState) {
    case NEW:
    case INITED:
    case RECOVERING:
    case IDLE:
    case RUNNING:
      return FinalApplicationStatus.UNDEFINED;
    case SUCCEEDED:
      return FinalApplicationStatus.SUCCEEDED;
    case FAILED:
      return FinalApplicationStatus.FAILED;
    case KILLED:
      return FinalApplicationStatus.KILLED;
    case ERROR:
      return FinalApplicationStatus.FAILED;
    default:
      return FinalApplicationStatus.UNDEFINED;
    }
  }

  /**
   * Maps a DAGAppMaster state to a {@link YarnApplicationState}.
   *
   * @param dagAppMasterState state to convert
   * @return {@code NEW} for {@code NEW}; {@code RUNNING} for {@code INITED},
   *         {@code RECOVERING}, {@code IDLE} and {@code RUNNING}; {@code FINISHED} for
   *         {@code SUCCEEDED}; {@code FAILED} for {@code FAILED} and {@code ERROR};
   *         {@code KILLED} for {@code KILLED}; {@code SUBMITTED} for any other state
   */
  protected YarnApplicationState convertDAGAppMasterState(DAGAppMasterState dagAppMasterState) {
    switch (dagAppMasterState) {
    case NEW:
      return YarnApplicationState.NEW;
    case INITED:
    case RECOVERING:
    case IDLE:
    case RUNNING:
      return YarnApplicationState.RUNNING;
    case SUCCEEDED:
      return YarnApplicationState.FINISHED;
    case FAILED:
      return YarnApplicationState.FAILED;
    case KILLED:
      return YarnApplicationState.KILLED;
    case ERROR:
      return YarnApplicationState.FAILED;
    default:
      return YarnApplicationState.SUBMITTED;
    }
  }

  /**
   * Creates and starts the DAGAppMaster thread, then blocks until the app master has left the
   * {@code NEW} / {@code INITED} states.
   *
   * <p>Does nothing if a DAGAppMaster thread already exists. If the app master reports
   * {@code ERROR} or {@code KILLED}, does not come up within {@link #TIME_OUT} milliseconds, or
   * anything else goes wrong, the failure is logged, the AM thread (if any) is interrupted and
   * the JVM is terminated with a non-zero exit status.
   *
   * @param appContext submission context used to create the DAGAppMaster
   */
  protected void startDAGAppMaster(final ApplicationSubmissionContext appContext) {
    if (dagAmThread != null) {
      return;
    }
    try {
      dagAmThread = createDAGAppMaster(appContext);
      dagAmThread.start();

      // Wait until DAGAppMaster is started
      long waitingTime = 0;
      while (true) {
        // Snapshot the volatile field: the AM thread may reset it to null on failure between
        // the null check and the state query.
        final DAGAppMaster appMaster = dagAppMaster;
        if (appMaster != null) {
          DAGAppMasterState dagAMState = appMaster.getState();
          LOG.info("DAGAppMaster state: " + dagAMState);
          if (dagAMState.equals(DAGAppMasterState.NEW)) {
            LOG.info("DAGAppMaster is not started wait for 100ms...");
          } else if (dagAMState.equals(DAGAppMasterState.INITED)) {
            LOG.info("DAGAppMaster is not startetd wait for 100ms...");
          } else if (dagAMState.equals(DAGAppMasterState.ERROR)) {
            throw new TezException("DAGAppMaster got an error during initialization");
          } else if (dagAMState.equals(DAGAppMasterState.KILLED)) {
            throw new TezException("DAGAppMaster is killed");
          } else {
            break;
          }
        }

        if (waitingTime < TIME_OUT) {
          LOG.info("DAGAppMaster is not created wait for 100ms...");
          Thread.sleep(POLL_INTERVAL_MS);
          waitingTime += POLL_INTERVAL_MS;
        } else {
          throw new TezException("Time out creating DAGAppMaster");
        }
      }
    } catch (Throwable t) {
      LOG.fatal("Error starting DAGAppMaster", t);
      // The thread is still null when createDAGAppMaster itself failed; guard so that the
      // original error is not masked by a NullPointerException.
      if (dagAmThread != null) {
        dagAmThread.interrupt();
      }
      // Exit with a failure status, matching the AM thread's own error path.
      System.exit(1);
    }
  }

  /**
   * Creates, but does not start, the thread that sets up and runs the DAGAppMaster.
   *
   * <p>When run, the thread creates the staging, log and local directories for the application,
   * exports the log and local directories through the environment, adds any tokens from the AM
   * container spec to the current user's credentials, and then constructs, initializes and
   * starts the DAGAppMaster. On any failure it stops the app master (if created), clears the
   * reference and terminates the JVM with exit status 1.
   *
   * @param appContext submission context providing the application id and AM container spec
   * @return the not-yet-started thread, named {@code "DAGAppMaster Thread"}
   */
  protected Thread createDAGAppMaster(final ApplicationSubmissionContext appContext) {
    Thread thread = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          ApplicationId appId = appContext.getApplicationId();

          // Set up working directory for DAGAppMaster
          Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
          LOG.info("Using staging directory: " + userDir.toUri().getPath());
          FileSystem fs = FileSystem.get(conf);
          fs.mkdirs(userDir);

          // Prepare Environment
          Path logDir = new Path(userDir, "localmode-log-dir");
          Path localDir = new Path(userDir, "localmode-local-dir");
          fs.mkdirs(logDir);
          fs.mkdirs(localDir);
          EnvironmentUpdateUtils.put(Environment.LOG_DIRS.name(), logDir.toUri().getPath());
          EnvironmentUpdateUtils.put(Environment.LOCAL_DIRS.name(), localDir.toUri().getPath());

          // Add session specific credentials to the AM credentials.
          UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
          ByteBuffer tokens = appContext.getAMContainerSpec().getTokens();
          if (tokens != null) {
            Credentials amCredentials = new Credentials();
            DataInputByteBuffer dibb = new DataInputByteBuffer();
            dibb.reset(tokens);
            amCredentials.readTokenStorageStream(dibb);
            // Reset the buffer position after it has been consumed above.
            tokens.rewind();
            currentUser.addCredentials(amCredentials);
          }

          // Construct, initialize, and start the DAGAppMaster
          ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(appId, 0);
          ContainerId cId = ContainerId.newInstance(applicationAttemptId, 1);
          String currentHost = InetAddress.getLocalHost().getHostName();
          int nmPort = YarnConfiguration.DEFAULT_NM_PORT;
          int nmHttpPort = YarnConfiguration.DEFAULT_NM_WEBAPP_PORT;
          long appSubmitTime = System.currentTimeMillis();

          dagAppMaster = createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort,
              nmHttpPort, new SystemClock(), appSubmitTime, isSession,
              userDir.toUri().getPath());
          clientHandler = new DAGClientHandler(dagAppMaster);
          DAGAppMaster.initAndStartAppMaster(dagAppMaster, currentUser.getShortUserName());
        } catch (Throwable t) {
          LOG.fatal("Error starting DAGAppMaster", t);
          if (dagAppMaster != null) {
            dagAppMaster.stop();
            dagAppMaster = null;
          }
          System.exit(1);
        }
      }
    });

    thread.setName("DAGAppMaster Thread");
    LOG.info("DAGAppMaster thread has been created");
    return thread;
  }

  /**
   * Factory for the DAGAppMaster instance; this can be overridden by test code to create a
   * mock app.
   *
   * @param applicationAttemptId attempt id of the application
   * @param cId container id to pass to the app master
   * @param currentHost host name to pass to the app master
   * @param nmPort node manager port to pass to the app master
   * @param nmHttpPort node manager HTTP port to pass to the app master
   * @param clock clock handed to the app master
   * @param appSubmitTime application submit time, in milliseconds
   * @param isSession whether the app master runs in session mode
   * @param userDir path of the working directory to pass to the app master
   * @return a new, not yet initialized DAGAppMaster
   */
  @VisibleForTesting
  protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
      ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
      Clock clock, long appSubmitTime, boolean isSession, String userDir) {
    // Use the clock supplied by the caller rather than silently replacing it with a new
    // SystemClock, so that overrides and callers can inject their own clock.
    return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
        clock, appSubmitTime, isSession, userDir, versionInfo.getVersion());
  }
}