/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.hive;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.io.CodecMap;

import java.io.IOException;

/**
 * Class containing the common logic for different HiveClient implementations.
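 *
 * <p>A typical call sequence around a Hive import might look like the sketch
 * below ({@code conf}, {@code options} and {@code tablePath} are assumed to be
 * supplied by the surrounding import job):
 * <pre>
 *   HiveClientCommon common = new HiveClientCommon();
 *   common.removeTempLogs(conf, tablePath);   // before LOAD DATA INPATH
 *   common.indexLzoFiles(options, tablePath); // no-op unless LZO was used
 *   common.cleanUp(conf, tablePath);          // after a successful import
 * </pre>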
 */
public class HiveClientCommon {

  public static final Log LOG = LogFactory.getLog(HiveClientCommon.class.getName());

  /**
   * If the data was imported by a MapReduce job, remove the _logs directory
   * it left behind before running Hive's LOAD DATA INPATH statement.
   *
   * @param configuration the configuration used to resolve the FileSystem
   * @param tablePath path to the directory that will be loaded into Hive
   * @throws IOException on filesystem errors
   */
  public void removeTempLogs(Configuration configuration, Path tablePath) throws IOException {
    FileSystem fs = tablePath.getFileSystem(configuration);
    Path logsPath = new Path(tablePath, "_logs");
    if (fs.exists(logsPath)) {
      LOG.info("Removing temporary files from import process: " + logsPath);
      if (!fs.delete(logsPath, true)) {
        LOG.warn("Could not delete temporary files; "
            + "continuing with import, but it may fail.");
      }
    }
  }

  /**
   * Clean up after a successful Hive import.
   *
   * @param configuration the configuration used to resolve the FileSystem
   * @param outputPath path to the output directory
   * @throws IOException on filesystem errors
   */
  public void cleanUp(Configuration configuration, Path outputPath) throws IOException {
    FileSystem fs = outputPath.getFileSystem(configuration);
    // Hive does not always remove the input directory of the LOAD DATA
    // statement (which is our export directory). Remove the export directory
    // if it is effectively empty, so that users can periodically re-populate
    // the Hive table (for example with --hive-overwrite).
    try {
      if (fs.exists(outputPath)) {
        FileStatus[] statuses = fs.listStatus(outputPath);
        if (statuses.length == 0) {
          LOG.info("Export directory is empty, removing it.");
          fs.delete(outputPath, true);
        } else if (statuses.length == 1
            && statuses[0].getPath().getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
          LOG.info("Export directory contains the _SUCCESS file only, removing the directory.");
          fs.delete(outputPath, true);
        } else {
          LOG.info("Export directory is not empty, keeping it.");
        }
      }
    } catch (IOException e) {
      LOG.error("Issue with cleaning up the export directory (safe to ignore)", e);
    }
  }

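  /**
   * If the import produced LZO-compressed files, run the hadoop-lzo
   * DistributedLzoIndexer (loaded reflectively, so the dependency stays
   * optional) over the imported files to create the .index files that make
   * them splittable for downstream jobs.
   *
   * @param sqoopOptions options holding the compression codec and Configuration
   * @param finalPath path to the imported data to index
   * @throws IOException if the indexer cannot be loaded or fails
   */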
  public void indexLzoFiles(SqoopOptions sqoopOptions, Path finalPath) throws IOException {
    String codec = sqoopOptions.getCompressionCodec();
    if (codec != null && (codec.equals(CodecMap.LZOP)
        || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
      try {
        Tool tool = ReflectionUtils.newInstance(
            Class.forName("com.hadoop.compression.lzo.DistributedLzoIndexer")
                .asSubclass(Tool.class), sqoopOptions.getConf());
        ToolRunner.run(sqoopOptions.getConf(), tool,
            new String[] { finalPath.toString() });
      } catch (Exception ex) {
        LOG.error("Error indexing lzo files", ex);
        throw new IOException("Error indexing lzo files", ex);
      }
    }
  }
}