blob: 0d8ee1bbd441076792b417b159c63f45bfe1a573 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.hadoop;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.output.TeeOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hive.beeline.BeeLine;
import com.google.common.annotations.VisibleForTesting;
/**
 * Launcher main class for the Oozie Hive 2 action.
 * <p>
 * Reads the action configuration prepared by Oozie, translates it into a Beeline command line
 * (JDBC URL, user, password, script or inline query, {@code --hivevar} parameters and
 * {@code --hiveconf} settings), runs Beeline in-process and finally scans the captured error
 * stream for the IDs of the Hadoop jobs that were started.
 */
public class Hive2Main extends LauncherMain {
    /** Patterns used to harvest child job / application IDs from the captured Beeline output. */
    @VisibleForTesting
    static final Pattern[] HIVE2_JOB_IDS_PATTERNS = {
            Pattern.compile("Ended Job = (job_\\S*)"),
            Pattern.compile("Submitted application (application[0-9_]*)"),
            Pattern.compile("Running with YARN Application = (application[0-9_]*)"),
            Pattern.compile("Executing on YARN cluster with App id (application[0-9_]*)")
    };

    /**
     * Beeline options a workflow author may not pass as extra arguments: the ones this class sets
     * itself ({@code -u -n -p -d -f -a}) plus {@code -e} and {@code --help}.
     */
    private static final Set<String> DISALLOWED_BEELINE_OPTIONS = new HashSet<>();
    static {
        DISALLOWED_BEELINE_OPTIONS.add("-u");
        DISALLOWED_BEELINE_OPTIONS.add("-n");
        DISALLOWED_BEELINE_OPTIONS.add("-p");
        DISALLOWED_BEELINE_OPTIONS.add("-d");
        DISALLOWED_BEELINE_OPTIONS.add("-e");
        DISALLOWED_BEELINE_OPTIONS.add("-f");
        DISALLOWED_BEELINE_OPTIONS.add("-a");
        DISALLOWED_BEELINE_OPTIONS.add("--help");
    }

    /** Beeline option whose following argument is the password. */
    private static final String PASSWORD_OPTION = "-p";

    /** Text printed in place of the password when the command line is logged. */
    private static final String PASSWORD_MASK = "********";

    /**
     * Command-line entry point; delegates to the two-argument {@code run} that is in scope here
     * (not defined in this class).
     *
     * @param args launcher arguments, passed through unchanged
     * @throws Exception if the action fails
     */
    public static void main(String[] args) throws Exception {
        run(Hive2Main.class, args);
    }

    /**
     * Loads the action configuration named by the {@code oozie.action.conf.xml} system property
     * and adds the settings needed by the Hive job (YARN tag, delegation token location,
     * DataNucleus plugin check level).
     *
     * @return the prepared action configuration
     * @throws RuntimeException if the system property is missing or the file does not exist
     */
    private static Configuration initActionConf() {
        // Loading action conf prepared by Oozie
        Configuration actionConf = new Configuration(false);

        String actionXml = System.getProperty("oozie.action.conf.xml");
        if (actionXml == null) {
            throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
        }
        if (!new File(actionXml).exists()) {
            throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist");
        }
        System.out.println("Using action configuration file " + actionXml);

        actionConf.addResource(new Path("file:///", actionXml));
        setYarnTag(actionConf);

        // Propagate delegation related props from launcher job to Hive job
        String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
        if (delegationToken != null) {
            actionConf.set("mapreduce.job.credentials.binary", delegationToken);
            actionConf.set("tez.credentials.path", delegationToken);
            System.out.println("------------------------");
            System.out.println("Setting env property for mapreduce.job.credentials.binary to: " + delegationToken);
            System.out.println("------------------------");
            System.setProperty("mapreduce.job.credentials.binary", delegationToken);
        } else {
            System.out.println("Non-Kerberos execution");
        }

        // See https://issues.apache.org/jira/browse/HIVE-1411
        actionConf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG");
        return actionConf;
    }

    /**
     * Builds the Beeline command line from the action configuration and runs it.
     *
     * @param args launcher arguments (not used by this implementation)
     * @throws RuntimeException if a required property is missing, the script file does not exist,
     *         a parameter is malformed or a disallowed Beeline option was supplied
     * @throws Exception if Beeline itself fails
     */
    @Override
    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie Hive 2 action configuration");
        System.out.println("=================================================================");
        System.out.println();

        Configuration actionConf = initActionConf();

        //Logfile to capture job IDs
        String hadoopJobId = System.getProperty("oozie.launcher.job.id");
        if (hadoopJobId == null) {
            throw new RuntimeException("Launcher Hadoop Job ID system property not set");
        }
        String logFile = new File("hive2-oozie-" + hadoopJobId + ".log").getAbsolutePath();

        List<String> arguments = new ArrayList<>();

        String jdbcUrl = actionConf.get(Hive2ActionExecutor.HIVE2_JDBC_URL);
        if (jdbcUrl == null) {
            throw new RuntimeException("Action Configuration does not have [" + Hive2ActionExecutor.HIVE2_JDBC_URL
                    + "] property");
        }
        arguments.add("-u");
        arguments.add(jdbcUrl);

        // Use the user who is running the map task
        String username = actionConf.get("user.name");
        arguments.add("-n");
        arguments.add(username);

        String password = actionConf.get(Hive2ActionExecutor.HIVE2_PASSWORD);
        if (password == null) {
            // Have to pass something or Beeline might interactively prompt, which we don't want
            password = "DUMMY";
        }
        arguments.add(PASSWORD_OPTION);
        arguments.add(password);

        // We always use the same driver
        arguments.add("-d");
        arguments.add("org.apache.hive.jdbc.HiveDriver");

        String scriptPath = actionConf.get(Hive2ActionExecutor.HIVE2_SCRIPT);
        String query = actionConf.get(Hive2ActionExecutor.HIVE2_QUERY);
        if (scriptPath != null) {
            if (!new File(scriptPath).exists()) {
                throw new RuntimeException("Hive 2 script file [" + scriptPath + "] does not exist");
            }
            // print out current directory & its contents
            File localDir = new File("dummy").getAbsoluteFile().getParentFile();
            System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
            System.out.println("------------------------");
            String[] files = localDir.list();
            if (files != null) {
                for (String file : files) {
                    System.out.println(" " + file);
                }
            }
            System.out.println("------------------------");
            System.out.println();

            // Prepare the Hive Script
            String script = readStringFromFile(scriptPath);
            System.out.println();
            System.out.println("Script [" + scriptPath + "] content: ");
            System.out.println("------------------------");
            System.out.println(script);
            System.out.println("------------------------");
            System.out.println();
            arguments.add("-f");
            arguments.add(scriptPath);
        } else if (query != null) {
            System.out.println("Query: ");
            System.out.println("------------------------");
            System.out.println(query);
            System.out.println("------------------------");
            System.out.println();
            // Beeline is always driven through -f, so an inline query is written to a script file first
            String filename = createScriptFile(query);
            arguments.add("-f");
            arguments.add(filename);
        } else {
            throw new RuntimeException("Action Configuration does not have ["
                    + Hive2ActionExecutor.HIVE2_SCRIPT + "], or ["
                    + Hive2ActionExecutor.HIVE2_QUERY + "] property");
        }

        // Pass any parameters to Beeline via arguments
        String[] params = ActionUtils.getStrings(actionConf, Hive2ActionExecutor.HIVE2_PARAMS);
        if (params.length > 0) {
            System.out.println("Parameters:");
            System.out.println("------------------------");
            for (String param : params) {
                System.out.println(" " + param);
                int idx = param.indexOf('=');
                if (idx == -1) {
                    throw new RuntimeException("Parameter expression must contain an assignment: " + param);
                } else if (idx == 0) {
                    throw new RuntimeException("Parameter value not specified: " + param);
                }
                arguments.add("--hivevar");
                arguments.add(param);
            }
            System.out.println("------------------------");
            System.out.println();
        }

        // This tells BeeLine to look for a delegation token; otherwise it won't and will fail in secure mode because there are no
        // Kerberos credentials. In non-secure mode, this argument is ignored so we can simply always pass it.
        arguments.add("-a");
        arguments.add("delegationToken");

        String[] beelineArgs = ActionUtils.getStrings(actionConf, Hive2ActionExecutor.HIVE2_ARGS);
        for (String beelineArg : beelineArgs) {
            if (DISALLOWED_BEELINE_OPTIONS.contains(beelineArg)) {
                throw new RuntimeException("Error: Beeline argument " + beelineArg + " is not supported");
            }
            arguments.add(beelineArg);
        }

        // Propagate MR job tag if defined
        String jobTags = actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS);
        if (jobTags != null) {
            arguments.add("--hiveconf");
            arguments.add("mapreduce.job.tags=" + jobTags);
        }

        // Propagate "oozie.*" configs (but not "oozie.launcher.*" nor "oozie.hive2.*")
        for (Map.Entry<String, String> oozieConfig : actionConf.getValByRegex("^oozie\\.(?!launcher|hive2).+").entrySet()) {
            arguments.add("--hiveconf");
            arguments.add(oozieConfig.getKey() + "=" + oozieConfig.getValue());
        }

        System.out.println("Beeline command arguments :");
        printArguments(arguments);
        System.out.println();

        LauncherMain.killChildYarnJobs(actionConf);

        System.out.println("=================================================================");
        System.out.println();
        System.out.println(">>> Invoking Beeline command line now >>>");
        System.out.println();
        System.out.flush();

        try {
            runBeeline(arguments.toArray(new String[arguments.size()]), logFile);
        }
        finally {
            System.out.println("\n<<< Invocation of Beeline command completed <<<\n");
            writeExternalChildIDs(logFile, HIVE2_JOB_IDS_PATTERNS, "Beeline");
        }
    }

    /**
     * Prints the Beeline arguments one per line, replacing the value that follows the password
     * option with a fixed mask so the password never reaches the launcher log.
     *
     * @param arguments the complete Beeline argument list
     */
    private static void printArguments(List<String> arguments) {
        boolean maskNext = false;
        for (String arg : arguments) {
            System.out.println(" " + (maskNext ? PASSWORD_MASK : arg));
            maskNext = PASSWORD_OPTION.equals(arg);
        }
    }

    /**
     * Writes an inline query to a new script file in the current working directory.
     *
     * @param query the query text; a trailing line separator is appended when missing
     * @return the (relative) name of the script file that was written
     * @throws IOException if the file cannot be written
     */
    private String createScriptFile(String query) throws IOException {
        String filename = "oozie-hive2-query-" + System.currentTimeMillis() + ".hql";
        File f = new File(filename);
        if (query != null && !query.endsWith(System.lineSeparator())) {
            query += System.lineSeparator();
        }
        FileUtils.writeStringToFile(f, query, StandardCharsets.UTF_8);
        return filename;
    }

    /**
     * Runs Beeline in-process with its error stream duplicated into {@code logFile}, then exits
     * the JVM with Beeline's status if that status is non-zero.
     *
     * @param args the Beeline command-line arguments
     * @param logFile path of the file that receives a copy of Beeline's error stream
     * @throws Exception if the log file cannot be opened or Beeline fails
     */
    private void runBeeline(String[] args, String logFile) throws Exception {
        // We do this instead of calling BeeLine.main so we can duplicate the error stream for harvesting Hadoop child job IDs
        BeeLine beeLine = new BeeLine();
        int status;
        // Only the file stream is closed here. Closing the tee stream is avoided on purpose because
        // that could also close System.err, which the launcher still needs.
        try (FileOutputStream logOut = new FileOutputStream(logFile)) {
            PrintStream teeErr = new PrintStream(new TeeOutputStream(System.err, logOut),
                    false, StandardCharsets.UTF_8.name());
            beeLine.setErrorStream(teeErr);
            try {
                status = beeLine.begin(args, null);
            } finally {
                // Release Beeline even when begin() throws, and push anything still pending
                // into the log before the file is closed.
                beeLine.close();
                teeErr.flush();
            }
        }
        // Exit only after the log file has been closed so its content is complete when it is read back.
        if (status != 0) {
            System.exit(status);
        }
    }

    /**
     * Reads a whole text file as UTF-8, normalising every line ending to the platform separator
     * (a separator is appended after the last line as well).
     *
     * @param filePath path of the file to read
     * @return the file content
     * @throws IOException if the file cannot be opened or read
     */
    private static String readStringFromFile(String filePath) throws IOException {
        try (FileInputStream in = new FileInputStream(filePath);
             BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            StringBuilder sb = new StringBuilder();
            String sep = System.getProperty("line.separator");
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line).append(sep);
            }
            return sb.toString();
        }
    }
}