/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.sqoop.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

import org.apache.hadoop.sqoop.ConnFactory;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.orm.TableClassName;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
import org.apache.hadoop.sqoop.util.PerfCounters;

/**
 * Actually runs a JDBC import job using the ORM files generated by the sqoop.orm package.
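 *
 * <p>A minimal usage sketch (the table name, jar path, and order-by column below
 * are illustrative placeholders, not defaults):</p>
 *
 * <pre>
 *   ImportOptions opts = ...;                 // options parsed from the command line
 *   Configuration conf = new Configuration();
 *   ImportJob importJob = new ImportJob(opts);
 *   importJob.runImport("employees", "/tmp/sqoop-employees.jar", "id", conf);
 * </pre>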
 */
public class ImportJob {

  public static final Log LOG = LogFactory.getLog(ImportJob.class.getName());

  private ImportOptions options;

  public ImportJob(final ImportOptions opts) {
    this.options = opts;
  }
  /**
   * Run an import job to read a table into HDFS.
   *
   * @param tableName the database table to read
   * @param ormJarFile the jar file to insert into the DistributedCache classpath (may be null)
   * @param orderByCol the column of the database table to use to order the import
   * @param conf a fresh Hadoop Configuration to use to build an MR job
   */
  public void runImport(String tableName, String ormJarFile, String orderByCol,
      Configuration conf) throws IOException {

    LOG.info("Beginning data import of " + tableName);

    String tableClassName = new TableClassName(options).getClassForTable(tableName);

    boolean isLocal = "local".equals(conf.get(JTConfig.JT_IPC_ADDRESS));
    ClassLoader prevClassLoader = null;
    if (isLocal) {
      // If we're using the LocalJobRunner, then instead of using the compiled jar file
      // as the job source, we're running in the current thread. Push on another classloader
      // that loads from that jar in addition to everything currently on the classpath.
      prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, tableClassName);
    }
    try {
      JobConf job = new JobConf(conf);
      job.setJar(ormJarFile);

      String hdfsWarehouseDir = options.getWarehouseDir();
      Path outputPath;

      if (null != hdfsWarehouseDir) {
        Path hdfsWarehousePath = new Path(hdfsWarehouseDir);
        // Qualify the warehouse path against the job's default filesystem.
        hdfsWarehousePath = hdfsWarehousePath.makeQualified(FileSystem.get(job));
        outputPath = new Path(hdfsWarehousePath, tableName);
      } else {
        outputPath = new Path(tableName);
      }
      if (options.getFileLayout() == ImportOptions.FileLayout.TextFile) {
        job.setOutputFormat(RawKeyTextOutputFormat.class);
        job.setMapperClass(TextImportMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
      } else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
        job.setOutputFormat(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
        job.set(JobContext.OUTPUT_VALUE_CLASS, tableClassName);
      } else {
        LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
      }
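      // If an unrecognized layout fell through to the warning above, nothing was
      // configured here, so the job runs with the JobConf defaults (TextOutputFormat
      // and the identity mapper); output is still text, though not the
      // RawKeyTextOutputFormat layout used for FileLayout.TextFile.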
      job.setNumReduceTasks(0);
      // Use a single map task; DBInputFormat will read the whole table in one split.
      job.setNumMapTasks(1);
      job.setInputFormat(DBInputFormat.class);
      job.setMapRunnerClass(AutoProgressMapRunner.class);

      FileOutputFormat.setOutputPath(job, outputPath);
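      // Record the JDBC connection settings in the job configuration: the ConnManager
      // supplies the driver class, and DBConfiguration.configureDB stores the driver,
      // connect string, and (when supplied) credentials for DBInputFormat to use.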
      ConnManager mgr = new ConnFactory(conf).getManager(options);
      String username = options.getUsername();
      if (null == username || username.length() == 0) {
        DBConfiguration.configureDB(job, mgr.getDriverClass(), options.getConnectString());
      } else {
        DBConfiguration.configureDB(job, mgr.getDriverClass(), options.getConnectString(),
            username, options.getPassword());
      }

      String [] colNames = options.getColumns();
      if (null == colNames) {
        colNames = mgr.getColumnNames(tableName);
      }
      // It's ok if the where clause is null in DBInputFormat.setInput.
      String whereClause = options.getWhereClause();

      // We can't set the class properly in here, because we may not have the
      // jar loaded in this JVM. So we start by calling setInput() with DBWritable,
      // and then override the string manually.
      DBInputFormat.setInput(job, DBWritable.class, tableName, whereClause,
          orderByCol, colNames);
      job.set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
      PerfCounters counters = new PerfCounters();
      counters.startClock();

      RunningJob runningJob = JobClient.runJob(job);

      counters.stopClock();
      // TODO(aaron): Is this the correct way to determine how much data got written?
      counters.addBytes(runningJob.getCounters().getGroup("FileSystemCounters")
          .getCounterForName("FILE_BYTES_WRITTEN").getCounter());
      LOG.info("Transferred " + counters.toString());
    } finally {
      if (isLocal && null != prevClassLoader) {
        // Unload the special classloader for this jar.
        ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
      }
    }
  }
}