/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.sqoop.hive;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.util.Executor;
import org.apache.hadoop.sqoop.util.LoggingAsyncSink;

/**
 * Utility to import a table into the Hive metastore. Manages the connection
 * to Hive itself as well as orchestrating the use of the other classes in this
 * package.
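 *
 * <p>A minimal usage sketch (the option and manager setup is elided and
 * illustrative only):
 *
 * <pre>
 *   SqoopOptions options = ...;   // parsed command-line options
 *   ConnManager manager = ...;    // database-specific connection manager
 *   Configuration conf = new Configuration();
 *
 *   HiveImport hiveImport = new HiveImport(options, manager, conf);
 *   hiveImport.importTable("employees");
 * </pre>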
 */
public class HiveImport {

  public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());

  private SqoopOptions options;
  private ConnManager connManager;
  private Configuration configuration;

  public HiveImport(final SqoopOptions opts, final ConnManager connMgr,
      final Configuration conf) {
    this.options = opts;
    this.connManager = connMgr;
    this.configuration = conf;
  }
  /**
   * @return the filename of the hive executable to run to do the import
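   * (e.g., with HIVE_HOME set to /opt/hive this returns /opt/hive/bin/hive
   * if that file exists, and plain "hive" otherwise)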
   */
  private String getHiveBinPath() {
    // If the user has $HIVE_HOME set, then use $HIVE_HOME/bin/hive if it
    // exists. Fall back to just plain 'hive' and hope it's in the path.
    String hiveHome = options.getHiveHome();
    if (null == hiveHome) {
      return "hive";
    }

    Path p = new Path(hiveHome);
    p = new Path(p, "bin");
    p = new Path(p, "hive");
    String hiveBinStr = p.toString();
    if (new File(hiveBinStr).exists()) {
      return hiveBinStr;
    } else {
      return "hive";
    }
  }
  /**
   * If we used a MapReduce-based upload of the data, remove the _logs
   * directory it left behind before running Hive's LOAD DATA INPATH.
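   * For example, for table "foo" under warehouse directory /user/hive, this
   * deletes /user/hive/foo/_logs if it exists.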
   */
  private void removeTempLogs(String tableName) throws IOException {
    FileSystem fs = FileSystem.get(configuration);
    String warehouseDir = options.getWarehouseDir();
    Path tablePath;
    if (warehouseDir != null) {
      tablePath = new Path(new Path(warehouseDir), tableName);
    } else {
      tablePath = new Path(tableName);
    }

    Path logsPath = new Path(tablePath, "_logs");
    if (fs.exists(logsPath)) {
      LOG.info("Removing temporary files from import process: " + logsPath);
      if (!fs.delete(logsPath, true)) {
        LOG.warn("Could not delete temporary files; continuing with import, "
            + "but it may fail.");
      }
    }
  }
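
  /**
   * Import the data in the named table into Hive: remove any leftover
   * MapReduce log files, generate CREATE TABLE and LOAD DATA INPATH
   * statements for the table, write them to a temporary script, and run
   * that script through the hive executable.
   * @param tableName the table to load into Hive.
   * @throws IOException if the script cannot be written or Hive exits
   * with a nonzero status.
   */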
  public void importTable(String tableName) throws IOException {
    removeTempLogs(tableName);

    LOG.info("Loading uploaded data into Hive");

    // For testing purposes against our mock hive implementation,
    // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT
    // environment variable for the child hive process. We also disable
    // timestamp comments so that we have deterministic table creation scripts.
    String expectedScript = System.getProperty("expected.script");
    List<String> env = Executor.getCurEnvpStrings();
    boolean debugMode = expectedScript != null;
    if (debugMode) {
      env.add("EXPECTED_SCRIPT=" + expectedScript);
      env.add("TMPDIR=" + options.getTempDir());
    }

    // Generate the HQL statements to run.
    TableDefWriter tableWriter = new TableDefWriter(options, connManager,
        tableName, configuration, !debugMode);
    String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
    String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
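
    // At this point the two statements look roughly like the following
    // (illustrative only; the exact DDL is produced by TableDefWriter):
    //   CREATE TABLE foo ( id INT, msg STRING ) ... ;
    //   LOAD DATA INPATH 'hdfs://.../warehouse/foo' INTO TABLE foo;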

    // Write them to a script file.
    File tempFile = File.createTempFile("hive-script-", ".txt",
        new File(options.getTempDir()));
    try {
      String tmpFilename = tempFile.toString();
      BufferedWriter w = null;
      try {
        FileOutputStream fos = new FileOutputStream(tempFile);
        w = new BufferedWriter(new OutputStreamWriter(fos));
        w.write(createTableStr, 0, createTableStr.length());
        w.write(loadDataStmtStr, 0, loadDataStmtStr.length());
      } catch (IOException ioe) {
        LOG.error("Error writing Hive load-in script: " + ioe.toString(), ioe);
        throw ioe;
      } finally {
        if (null != w) {
          try {
            w.close();
          } catch (IOException ioe) {
            LOG.warn("IOException closing stream to Hive script: "
                + ioe.toString());
          }
        }
      }

      // Run Hive on the script and note the return code.
      String hiveExec = getHiveBinPath();
      ArrayList<String> args = new ArrayList<String>();
      args.add(hiveExec);
      args.add("-f");
      args.add(tmpFilename);

      LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
      int ret = Executor.exec(args.toArray(new String[0]),
          env.toArray(new String[0]), logSink, logSink);
      if (0 != ret) {
        throw new IOException("Hive exited with status " + ret);
      }

      LOG.info("Hive import complete.");
    } finally {
      if (!tempFile.delete()) {
        LOG.warn("Could not remove temporary file: " + tempFile.toString());
        // Try to delete the file later.
        tempFile.deleteOnExit();
      }
    }
  }
}