blob: 5922100226517e7c2cea198aae8985070a8c2e57 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.examples;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.commons.cli.Options;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.CallerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
/**
 * Base class for the Tez examples.
 *
 * <p>Parses the Hadoop generic options plus the Tez-example extra options ({@code -local},
 * {@code -disableSplitGrouping}, {@code -counter}), validates the remaining example arguments via
 * {@link #validateArgs(String[])}, creates (or reuses) a {@link TezClient}, and delegates the
 * actual work to {@link #runJob(String[], TezConfiguration, TezClient)}.
 *
 * <p>NOTE(review): parsed flags and the active client are kept in plain mutable fields without
 * synchronization, so a single instance should not be run from several threads at once.
 */
@InterfaceAudience.Private
public abstract class TezExampleBase extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(TezExampleBase.class);

  /** Client used by {@link #runDag}; assigned by each run before {@link #runJob} is invoked. */
  private TezClient tezClientInternal;

  protected static final String DISABLE_SPLIT_GROUPING = "disableSplitGrouping";
  protected static final String LOCAL_MODE = "local";
  protected static final String COUNTER_LOG = "counter";

  // Flags parsed from the extra options; re-assigned on every run (see applyExtraOptions).
  private boolean disableSplitGrouping = false;
  private boolean isLocalMode = false;
  private boolean isCountersLog = false;

  /** @return whether the {@code -counter} option was given on the most recent run */
  protected boolean isCountersLog() {
    return isCountersLog;
  }

  /** @return whether the {@code -disableSplitGrouping} option was given on the most recent run */
  protected boolean isDisableSplitGrouping() {
    return disableSplitGrouping;
  }

  /** Builds the CLI options understood by every Tez example in addition to the generic ones. */
  private Options getExtraOptions() {
    Options options = new Options();
    options.addOption(LOCAL_MODE, false, "run it as local mode");
    options.addOption(DISABLE_SPLIT_GROUPING, false , "disable split grouping");
    options.addOption(COUNTER_LOG, false , "print counter log");
    return options;
  }

  /**
   * Copies the Tez-example-specific flags from the parsed command line into this instance.
   *
   * <p>Each flag is assigned from the command line (rather than only ever being set to
   * {@code true}) so that a flag passed to one invocation does not leak into a later invocation
   * on the same instance.
   *
   * @param optionParser parser that has already processed the example's arguments
   * @return {@code true} if the arguments were parsed; {@code false} if parsing failed, in which
   *     case {@link GenericOptionsParser#getCommandLine()} is {@code null} and no flag is changed
   */
  private boolean applyExtraOptions(GenericOptionsParser optionParser) {
    if (optionParser.getCommandLine() == null) {
      return false;
    }
    isLocalMode = optionParser.getCommandLine().hasOption(LOCAL_MODE);
    disableSplitGrouping = optionParser.getCommandLine().hasOption(DISABLE_SPLIT_GROUPING);
    isCountersLog = optionParser.getCommandLine().hasOption(COUNTER_LOG);
    return true;
  }

  /**
   * {@link Tool} entry point: parses {@code args} against {@link #getConf()} and runs the example
   * with a TezClient that this class creates and stops.
   *
   * @param args command-line arguments (generic options, extra options, example parameters)
   * @return zero on success, non-zero on failure (including unparseable options)
   * @throws Exception if the example fails
   */
  @Override
  public final int run(String[] args) throws Exception {
    Configuration conf = getConf();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, getExtraOptions(), args);
    if (!applyExtraOptions(optionParser)) {
      _printUsage();
      return -1;
    }
    String[] otherArgs = optionParser.getRemainingArgs();
    return _execute(otherArgs, null, null);
  }

  /**
   * Utility method to use the example from within code or a test.
   *
   * @param conf the tez configuration instance which will be used to create the DAG and
   *             possibly the Tez Client.
   * @param args arguments to the example
   * @param tezClient an existing running {@link org.apache.tez.client.TezClient} instance if one
   *                  exists. If no TezClient is specified (null), one will be created based on the
   *                  provided configuration. If a TezClient is specified, the local mode option
   *                  cannot be specified in the arguments, since it would have no effect.
   * @return Zero indicates success, non-zero indicates failure (including unparseable options)
   * @throws IllegalArgumentException if the local mode option is given together with a non-null
   *                                  {@code tezClient}
   * @throws Exception if the example fails
   */
  public int run(TezConfiguration conf, String[] args, @Nullable TezClient tezClient) throws
      Exception {
    setConf(conf);
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, getExtraOptions(), args);
    if (!applyExtraOptions(optionParser)) {
      _printUsage();
      return -1;
    }
    if (isLocalMode && tezClient != null) {
      throw new IllegalArgumentException(
          "can't specify local mode when TezClient is created, it takes no effect");
    }
    String[] otherArgs = optionParser.getRemainingArgs();
    return _execute(otherArgs, conf, tezClient);
  }

  /**
   * Submits a DAG on the current TezClient and blocks until it completes.
   *
   * @param dag the dag to execute
   * @param printCounters whether to request DAG counters ({@link StatusGetOpts#GET_COUNTERS})
   *                      along with the status updates
   * @param logger the logger to use while printing diagnostics
   * @return Zero indicates success, non-zero indicates failure
   * @throws IllegalStateException if no TezClient has been set up yet, i.e. this is called
   *                               before {@code run(...)} has started executing the example
   * @throws TezException
   * @throws InterruptedException
   * @throws IOException
   */
  public int runDag(DAG dag, boolean printCounters, Logger logger) throws TezException,
      InterruptedException, IOException {
    if (tezClientInternal == null) {
      throw new IllegalStateException(
          "No TezClient available; runDag can only be used once run(...) has set up a TezClient");
    }
    tezClientInternal.waitTillReady();

    // Tag the DAG with a caller context so it can be traced back to this example (and to the
    // AM application, when its id is known).
    CallerContext callerContext = CallerContext.create("TezExamples",
        "Tez Example DAG: " + dag.getName());
    ApplicationId appId = tezClientInternal.getAppMasterApplicationId();
    if (appId != null) {
      callerContext.setCallerIdAndType(appId.toString(), "TezExampleApplication");
    }
    dag.setCallerContext(callerContext);

    DAGClient dagClient = tezClientInternal.submitDAG(dag);
    Set<StatusGetOpts> getOpts = Sets.newHashSet();
    if (printCounters) {
      getOpts.add(StatusGetOpts.GET_COUNTERS);
    }
    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts);
    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
      logger.info("DAG diagnostics: {}", dagStatus.getDiagnostics());
      return -1;
    }
    return 0;
  }

  /** Runs the subclass validation and prints the full usage text when it reports failure. */
  private int _validateArgs(String[] args) {
    int res = validateArgs(args);
    if (res != 0) {
      _printUsage();
    }
    return res;
  }

  /**
   * Validates the arguments, prepares the configuration and client, and runs the example.
   *
   * @param otherArgs the example parameters left after option parsing
   * @param tezConf configuration to use, or {@code null} to derive one from {@link #getConf()}
   * @param tezClient caller-owned client to reuse, or {@code null} to create (and later stop) one
   * @return the validation failure code, or the result of {@link #runJob}
   */
  private int _execute(String[] otherArgs, TezConfiguration tezConf, TezClient tezClient) throws
      Exception {
    int result = _validateArgs(otherArgs);
    if (result != 0) {
      return result;
    }
    if (tezConf == null) {
      tezConf = new TezConfiguration(getConf());
    }
    if (isLocalMode) {
      LOG.info("Running in local mode...");
      tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
      tezConf.set("fs.defaultFS", "file:///");
      tezConf.setBoolean(
          TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
    }
    UserGroupInformation.setConfiguration(tezConf);

    // Only a client created here is stopped here; a caller-supplied client stays running.
    boolean ownTezClient = false;
    if (tezClient == null) {
      ownTezClient = true;
      tezClientInternal = createTezClient(tezConf);
    } else {
      tezClientInternal = tezClient;
    }
    try {
      return runJob(otherArgs, tezConf, tezClientInternal);
    } finally {
      if (ownTezClient && tezClientInternal != null) {
        tezClientInternal.stop();
      }
    }
  }

  /** Creates and starts a TezClient named after the concrete example class. */
  private TezClient createTezClient(TezConfiguration tezConf) throws IOException, TezException {
    TezClient tezClient = TezClient.create(getClass().getSimpleName(), tezConf);
    tezClient.start();
    return tezClient;
  }

  /** Prints the example usage, the extra-options usage and the generic options to stderr. */
  private void _printUsage() {
    printUsage();
    System.err.println();
    printExtraOptionsUsage(System.err);
    System.err.println();
    ToolRunner.printGenericCommandUsage(System.err);
  }

  /**
   * Print usage instructions for this example
   */
  protected abstract void printUsage();

  /**
   * Prints a description of the Tez-example extra options.
   *
   * @param ps the stream to print to
   */
  protected void printExtraOptionsUsage(PrintStream ps) {
    ps.println("Tez example extra options supported are");
    // TODO TEZ-1348 make it able to access dfs in tez local mode
    ps.println("-" + LOCAL_MODE + "\t\trun it in tez local mode, currently it can only access local file system in tez local mode,"
        + " run it in distributed mode without this option");
    ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
        + " enable split grouping without this option.");
    ps.println("-" + COUNTER_LOG + "\t\t to print counters information");
    ps.println();
    ps.println("The Tez example extra options usage syntax is ");
    ps.println("example_name [extra_options] [example_parameters]");
  }

  /**
   * Validate the arguments
   *
   * @param otherArgs arguments, if any
   * @return Zero indicates success, non-zero indicates failure
   */
  protected abstract int validateArgs(String[] otherArgs);

  /**
   * Create and execute the actual DAG for the example
   *
   * @param args arguments for execution
   * @param tezConf the tez configuration instance to be used while processing the DAG
   * @param tezClient the tez client instance to use to run the DAG if any custom monitoring is
   *                  required. Otherwise the utility method {@link #runDag(org.apache.tez.dag.api.DAG,
   *                  boolean, org.slf4j.Logger)} should be used
   * @return Zero indicates success, non-zero indicates failure
   * @throws Exception if the DAG cannot be built or executed
   */
  protected abstract int runJob(String[] args, TezConfiguration tezConf,
      TezClient tezClient) throws Exception;
}