/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.sqoop.mapreduce;
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.sqoop.ConnFactory;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.manager.ExportJobContext;
import org.apache.hadoop.sqoop.orm.TableClassName;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
/**
 * Runs a JDBC export job using the ORM classes generated by the
 * sqoop.orm package to parse records, and DBOutputFormat to write
 * them to the database.
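 *
 * <p>A minimal usage sketch (hypothetical values; the ExportJobContext
 * is assumed to take the table name, the path to the generated ORM jar,
 * and a populated SqoopOptions):</p>
 *
 * <pre>
 *   SqoopOptions opts = ...; // connect string, export dir, etc.
 *   ExportJobContext ctxt =
 *       new ExportJobContext("employees", "/tmp/sqoop/employees.jar", opts);
 *   new ExportJob(ctxt).runExport();
 * </pre>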
*/
public class ExportJob {

  public static final Log LOG = LogFactory.getLog(ExportJob.class.getName());

  public static final String SQOOP_EXPORT_TABLE_CLASS_KEY =
      "sqoop.export.table.class";

  private ExportJobContext context;

  public ExportJob(final ExportJobContext ctxt) {
    this.context = ctxt;
  }
  /**
   * @return true if p is a SequenceFile, or a directory containing
   * SequenceFiles.
   */
  private boolean isSequenceFiles(Path p) throws IOException {
    Configuration conf = context.getOptions().getConf();
    FileSystem fs = p.getFileSystem(conf);

    try {
      FileStatus stat = fs.getFileStatus(p);

      if (null == stat) {
        // Couldn't get the item.
        LOG.warn("Input path " + p + " does not exist");
        return false;
      }

      if (stat.isDir()) {
        FileStatus [] subitems = fs.listStatus(p);
        if (subitems == null || subitems.length == 0) {
          LOG.warn("Input path " + p + " contains no files");
          return false; // empty dir.
        }

        // Pick the first child entry to examine instead.
        stat = subitems[0];
      }

      if (null == stat) {
        LOG.warn("null FileStatus object in isSequenceFiles(); "
            + "assuming false.");
        return false;
      }

      Path target = stat.getPath();
      // Test the target's header to see if it contains the magic bytes
      // identifying a SequenceFile.
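      // A SequenceFile begins with the three magic bytes 'S', 'E', 'Q'
      // followed by a one-byte version number, so reading the first three
      // bytes is enough to distinguish it from text input.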
      byte [] header = new byte[3];
      FSDataInputStream is = null;
      try {
        is = fs.open(target);
        is.readFully(header);
      } catch (IOException ioe) {
        // Error reading header or EOF; assume not a SequenceFile.
        LOG.warn("IOException checking SequenceFile header: " + ioe);
        return false;
      } finally {
        try {
          if (null != is) {
            is.close();
          }
        } catch (IOException ioe) {
          // ignore; closing.
          LOG.warn("IOException closing input stream: " + ioe + "; ignoring.");
        }
      }

      // Return true (isSequenceFile) iff the magic number matches.
      return header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
    } catch (FileNotFoundException fnfe) {
      LOG.warn("Input path " + p + " does not exist");
      return false; // doesn't exist!
    }
  }
  /**
   * Run an export job to dump a table from HDFS to a database.
   */
  public void runExport() throws IOException {

    SqoopOptions options = context.getOptions();
    Configuration conf = options.getConf();
    String tableName = context.getTableName();
    String tableClassName =
        new TableClassName(options).getClassForTable(tableName);
    String ormJarFile = context.getJarFile();

    LOG.info("Beginning export of " + tableName);
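    // The job runs in-process through the LocalJobRunner when the job
    // tracker address is "local"; check both the current configuration key
    // and its deprecated predecessor, "mapred.job.tracker".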
    boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
        || "local".equals(conf.get("mapred.job.tracker"));

    ClassLoader prevClassLoader = null;
    if (isLocal) {
      // If we're using the LocalJobRunner, then instead of using the compiled
      // jar file as the job source, we're running in the current thread. Push
      // on another classloader that loads from that jar in addition to
      // everything currently on the classpath.
      prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, tableClassName);
    }
    try {
      Job job = new Job(conf);

      // Set the external jar to use for the job.
      job.getConfiguration().set("mapred.jar", ormJarFile);

      Path inputPath = new Path(context.getOptions().getExportDir());
      inputPath = inputPath.makeQualified(FileSystem.get(conf));

      if (isSequenceFiles(inputPath)) {
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setMapperClass(SequenceFileExportMapper.class);
      } else {
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(TextExportMapper.class);
      }

      FileInputFormat.addInputPath(job, inputPath);
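      // This is a map-only job: each map task parses records from its input
      // split and writes them straight to the database through
      // DBOutputFormat, so no reduce phase is required.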
      job.setNumReduceTasks(0);

      ConnManager mgr = new ConnFactory(conf).getManager(options);
      String username = options.getUsername();
      if (null == username || username.length() == 0) {
        DBConfiguration.configureDB(job.getConfiguration(),
            mgr.getDriverClass(), options.getConnectString());
      } else {
        DBConfiguration.configureDB(job.getConfiguration(),
            mgr.getDriverClass(), options.getConnectString(), username,
            options.getPassword());
      }

      String [] colNames = options.getColumns();
      if (null == colNames) {
        colNames = mgr.getColumnNames(tableName);
      }
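      // Register the target table and its column list with DBOutputFormat;
      // these are used to construct the parameterized INSERT statement that
      // each map task executes.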
      DBOutputFormat.setOutput(job, tableName, colNames);

      job.setOutputFormatClass(DBOutputFormat.class);
      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
      job.setMapOutputKeyClass(SqoopRecord.class);
      job.setMapOutputValueClass(NullWritable.class);
      try {
        boolean success = job.waitForCompletion(false);
        if (!success) {
          // Propagate job failure to the caller rather than returning
          // silently.
          throw new IOException("Export job failed for table " + tableName);
        }
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException cnfe) {
        throw new IOException(cnfe);
      }
    } finally {
      if (isLocal && null != prevClassLoader) {
        // Unload the special classloader for this jar.
        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
      }
    }
  }
}