/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.sqoop;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.lib.SqoopRecord;
import org.apache.hadoop.sqoop.testutil.ExportJobTestCase;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
import org.junit.Before;

/**
* Test that we can export data from HDFS into databases.
*/
public class TestExport extends ExportJobTestCase {
@Before
public void setUp() {
// start the server
super.setUp();
// throw away any existing data that might be in the database.
try {
this.getTestServer().dropExistingSchema();
} catch (SQLException sqlE) {
fail(sqlE.toString());
}
}
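
/** Build one tab-separated record line for the row with the given record
 * number: the id, then the message prefix concatenated with the id, then
 * the export text of each extra column, terminated by a newline.
 */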
private String getRecordLine(int recordNum, ColumnGenerator... extraCols) {
String idStr = Integer.toString(recordNum);
StringBuilder sb = new StringBuilder();
sb.append(idStr);
sb.append("\t");
sb.append(getMsgPrefix());
sb.append(idStr);
for (ColumnGenerator gen : extraCols) {
sb.append("\t");
sb.append(gen.getExportText(recordNum));
}
sb.append("\n");
return sb.toString();
}

/** When generating data for export tests, each column is generated
    according to a ColumnGenerator. A generator determines what to write
    into the text files to be exported, what string representation of the
    column the database should return after the export, and the column
    type to use in the CREATE TABLE statement.
*/
interface ColumnGenerator {
/** for a row with id rowNum, what should we write into that
line of the text file to export?
*/
public String getExportText(int rowNum);
/** for a row with id rowNum, what should the database return
for the given column's value?
*/
public String getVerifyText(int rowNum);
/** Return the column type to put in the CREATE TABLE statement. */
public String getType();
}
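
// A minimal sketch of the pattern: a generator for an INTEGER column
// whose value simply echoes the row number. testIntCol below uses this
// same shape as an anonymous inner class.
//
//   ColumnGenerator intGen = new ColumnGenerator() {
//     public String getExportText(int rowNum) { return Integer.toString(rowNum); }
//     public String getVerifyText(int rowNum) { return Integer.toString(rowNum); }
//     public String getType() { return "INTEGER"; }
//   };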

/**
 * Create a data file that gets exported to the db.
 * @param fileNum the number of the file (for multi-file export)
 * @param numRecords how many records to write to the file.
 * @param gzip true if the file should be gzipped.
 * @param extraCols any additional columns to append to each record line.
 */
private void createTextFile(int fileNum, int numRecords, boolean gzip,
ColumnGenerator... extraCols) throws IOException {
int startId = fileNum * numRecords;
String ext = ".txt";
if (gzip) {
ext = ext + ".gz";
}
Path tablePath = getTablePath();
Path filePath = new Path(tablePath, "part" + fileNum + ext);
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(tablePath);
OutputStream os = fs.create(filePath);
if (gzip) {
CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
CompressionCodec codec = ccf.getCodec(filePath);
os = codec.createOutputStream(os);
}
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
for (int i = 0; i < numRecords; i++) {
w.write(getRecordLine(startId + i, extraCols));
}
w.close();
os.close();
if (gzip) {
verifyCompressedFile(filePath, numRecords);
}
}
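
/** Sanity-check a gzipped file by decompressing it and counting its lines;
 * fail the test if the count does not match expectedNumLines.
 */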
private void verifyCompressedFile(Path f, int expectedNumLines) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
FileSystem fs = FileSystem.get(conf);
InputStream is = fs.open(f);
CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
CompressionCodec codec = ccf.getCodec(f);
LOG.info("gzip check codec is " + codec);
Decompressor decompressor = CodecPool.getDecompressor(codec);
if (null == decompressor) {
LOG.info("Verifying gzip sanity with null decompressor");
} else {
LOG.info("Verifying gzip sanity with decompressor: " + decompressor.toString());
}
is = codec.createInputStream(is, decompressor);
BufferedReader r = new BufferedReader(new InputStreamReader(is));
int numLines = 0;
while (true) {
String ln = r.readLine();
if (ln == null) {
break;
}
numLines++;
}
r.close();
// Return the decompressor to the pool so later tests can reuse it.
if (null != decompressor) {
CodecPool.returnDecompressor(decompressor);
}
assertEquals("Did not read back correct number of lines",
expectedNumLines, numLines);
LOG.info("gzip sanity check returned " + numLines + " lines; ok.");
}

/**
* Create a data file in SequenceFile format that gets exported to the db
* @param fileNum the number of the file (for multi-file export).
* @param numRecords how many records to write to the file.
* @param className the table class name to instantiate and populate
* for each record.
*/
private void createSequenceFile(int fileNum, int numRecords, String className)
throws IOException {
try {
// Instantiate the value record object via reflection.
Class cls = Class.forName(className, true,
Thread.currentThread().getContextClassLoader());
SqoopRecord record = (SqoopRecord) ReflectionUtils.newInstance(cls, new Configuration());
// Create the SequenceFile.
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
FileSystem fs = FileSystem.get(conf);
Path tablePath = getTablePath();
Path filePath = new Path(tablePath, "part" + fileNum);
fs.mkdirs(tablePath);
SequenceFile.Writer w =
SequenceFile.createWriter(fs, conf, filePath, LongWritable.class, cls);
// Now write the data.
int startId = fileNum * numRecords;
for (int i = 0; i < numRecords; i++) {
record.parse(getRecordLine(startId + i));
w.append(new LongWritable(startId + i), record);
}
w.close();
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
} catch (RecordParser.ParseError pe) {
throw new IOException(pe);
}
}

/** Return the column name for a column index.
* Each table contains two columns named 'id' and 'msg', and then an
* arbitrary number of additional columns defined by ColumnGenerators.
* These columns are referenced by idx 0, 1, 2...
* @param idx the index of the ColumnGenerator in the array passed to
* createTable().
* @return the name of the column
*/
protected String forIdx(int idx) {
return "col" + idx;
}

/** Create the table definition to export to, removing any prior table.
By specifying ColumnGenerator arguments, you can add extra columns
to the table of arbitrary type.
*/
public void createTable(ColumnGenerator... extraColumns) throws SQLException {
Connection conn = getTestServer().getConnection();
PreparedStatement statement = conn.prepareStatement(
"DROP TABLE " + getTableName() + " IF EXISTS",
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.executeUpdate();
conn.commit();
statement.close();
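// Build e.g.: CREATE TABLE <name> (id INT NOT NULL PRIMARY KEY,
// msg VARCHAR(64), col0 <type0>, col1 <type1>, ...)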
StringBuilder sb = new StringBuilder();
sb.append("CREATE TABLE ");
sb.append(getTableName());
sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
int colNum = 0;
for (ColumnGenerator gen : extraColumns) {
sb.append(", " + forIdx(colNum++) + " " + gen.getType());
}
sb.append(")");
statement = conn.prepareStatement(sb.toString(),
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.executeUpdate();
conn.commit();
statement.close();
}

/** Remove any existing table directory from the filesystem. */
private void removeTablePath() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
FileSystem fs = FileSystem.get(conf);
fs.delete(getTablePath(), true);
}

/** Verify that on a given row, a column has a given value.
 * @param id the id column specifying the row to test.
 * @param colName the name of the column to check.
 * @param expectedVal the value the column is expected to hold.
 */
private void assertColValForRowId(int id, String colName, String expectedVal)
throws SQLException {
Connection conn = getTestServer().getConnection();
LOG.info("Verifying column " + colName + " has value " + expectedVal);
PreparedStatement statement = conn.prepareStatement(
"SELECT " + colName + " FROM " + getTableName() + " WHERE id = " + id,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = statement.executeQuery();
rs.next();
String actualVal = rs.getString(1);
rs.close();
statement.close();
assertEquals("Got unexpected column value", expectedVal, actualVal);
}

/** Verify that for the max and min values of the 'id' column, the values
for a given column meet the expected values.
*/
private void assertColMinAndMax(String colName, ColumnGenerator generator)
throws SQLException {
int minId = getMinRowId();
int maxId = getMaxRowId();
LOG.info("Checking min/max for column " + colName + " with type " + generator.getType());
String expectedMin = generator.getVerifyText(minId);
String expectedMax = generator.getVerifyText(maxId);
assertColValForRowId(minId, colName, expectedMin);
assertColValForRowId(maxId, colName, expectedMax);
}

/** Export 10 rows, make sure they load in correctly */
public void testTextExport() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
createTextFile(0, TOTAL_RECORDS, false);
createTable();
runExport(getArgv(true));
verifyExport(TOTAL_RECORDS);
}

/** Export 10 rows from gzipped text files. */
public void testGzipExport() throws IOException, SQLException {
LOG.info("Beginning gzip export test");
final int TOTAL_RECORDS = 10;
createTextFile(0, TOTAL_RECORDS, true);
createTable();
runExport(getArgv(true));
verifyExport(TOTAL_RECORDS);
LOG.info("Complete gzip export test");
}
/** Run 2 mappers, make sure all records load in correctly */
public void testMultiMapTextExport() throws IOException, SQLException {
final int RECORDS_PER_MAP = 10;
final int NUM_FILES = 2;
for (int f = 0; f < NUM_FILES; f++) {
createTextFile(f, RECORDS_PER_MAP, false);
}
createTable();
runExport(getArgv(true));
verifyExport(RECORDS_PER_MAP * NUM_FILES);
}

/** Export some rows from a SequenceFile, make sure they import correctly */
public void testSequenceFileExport() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
// First, generate class and jar files that represent the table we're exporting to.
LOG.info("Creating initial schema for SeqFile test");
createTable();
LOG.info("Generating code...");
List<String> generatedJars = runExport(getArgv(true, "--generate-only"));
// Now, wipe the created table so we can export on top of it again.
LOG.info("Resetting schema and data...");
createTable();
// Wipe the directory we use when creating files to export to ensure
// it's ready for new SequenceFiles.
removeTablePath();
assertNotNull(generatedJars);
assertEquals("Expected 1 generated jar file", 1, generatedJars.size());
String jarFileName = generatedJars.get(0);
// Sqoop generates jars named "foo.jar"; by default, this should contain a
// class named 'foo'. Extract the class name.
Path jarPath = new Path(jarFileName);
String jarBaseName = jarPath.getName();
assertTrue(jarBaseName.endsWith(".jar"));
assertTrue(jarBaseName.length() > ".jar".length());
String className = jarBaseName.substring(0, jarBaseName.length() - ".jar".length());
LOG.info("Using jar filename: " + jarFileName);
LOG.info("Using class name: " + className);
ClassLoader prevClassLoader = null;
try {
if (null != jarFileName) {
prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, className);
}
// Now use this class and jar name to create a sequence file.
LOG.info("Writing data to SequenceFiles");
createSequenceFile(0, TOTAL_RECORDS, className);
// Now run and verify the export.
LOG.info("Exporting SequenceFile-based data");
runExport(getArgv(true, "--class-name", className, "--jar-file", jarFileName));
verifyExport(TOTAL_RECORDS);
} finally {
if (null != prevClassLoader) {
ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
}
}
}
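
/** Export an INTEGER column generated from the row number and verify
 * that its min and max values round-trip correctly.
 */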
public void testIntCol() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
// generate a column equivalent to rownum.
ColumnGenerator gen = new ColumnGenerator() {
public String getExportText(int rowNum) {
return "" + rowNum;
}
public String getVerifyText(int rowNum) {
return "" + rowNum;
}
public String getType() {
return "INTEGER";
}
};
createTextFile(0, TOTAL_RECORDS, false, gen);
createTable(gen);
runExport(getArgv(true));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), gen);
}
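
/** Export a BIGINT column with values too large for a 32-bit int and
 * verify that its min and max values round-trip correctly.
 */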
public void testBigIntCol() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
// generate a column that won't fit in a normal int.
ColumnGenerator gen = new ColumnGenerator() {
public String getExportText(int rowNum) {
long val = (long) rowNum * 1000000000;
return "" + val;
}
public String getVerifyText(int rowNum) {
long val = (long) rowNum * 1000000000;
return "" + val;
}
public String getType() {
return "BIGINT";
}
};
createTextFile(0, TOTAL_RECORDS, false, gen);
createTable(gen);
runExport(getArgv(true));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), gen);
}
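
/** Zero-pad an integer to two digits; e.g., pad(3) returns "03". */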
private String pad(int n) {
if (n <= 9) {
return "0" + n;
} else {
return String.valueOf(n);
}
}
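
/** Export DATE and TIME columns; the database should return zero-padded
 * forms (e.g. "2009-10-03") of the unpadded values we export.
 */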
public void testDatesAndTimes() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
ColumnGenerator genDate = new ColumnGenerator() {
public String getExportText(int rowNum) {
int day = rowNum + 1;
return "2009-10-" + day;
}
public String getVerifyText(int rowNum) {
int day = rowNum + 1;
return "2009-10-" + pad(day);
}
public String getType() {
return "DATE";
}
};
ColumnGenerator genTime = new ColumnGenerator() {
public String getExportText(int rowNum) {
return "10:01:" + rowNum;
}
public String getVerifyText(int rowNum) {
return "10:01:" + pad(rowNum);
}
public String getType() {
return "TIME";
}
};
createTextFile(0, TOTAL_RECORDS, false, genDate, genTime);
createTable(genDate, genTime);
runExport(getArgv(true));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), genDate);
assertColMinAndMax(forIdx(1), genTime);
}
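
/** Export FLOAT and NUMERIC columns and verify that floating-point and
 * fixed-decimal values round-trip through the database.
 */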
public void testNumericTypes() throws IOException, SQLException {
final int TOTAL_RECORDS = 10;
// Check floating point values
ColumnGenerator genFloat = new ColumnGenerator() {
public String getExportText(int rowNum) {
double v = 3.141 * (double) rowNum;
return "" + v;
}
public String getVerifyText(int rowNum) {
double v = 3.141 * (double) rowNum;
return "" + v;
}
public String getType() {
return "FLOAT";
}
};
// Check precise decimal placement. The first of ten
// rows will be 2.7181; the last of ten rows will be
// 2.71810.
ColumnGenerator genNumeric = new ColumnGenerator() {
public String getExportText(int rowNum) {
int digit = rowNum + 1;
return "2.718" + digit;
}
public String getVerifyText(int rowNum) {
int digit = rowNum + 1;
return "2.718" + digit;
}
public String getType() {
return "NUMERIC";
}
};
createTextFile(0, TOTAL_RECORDS, false, genFloat, genNumeric);
createTable(genFloat, genNumeric);
runExport(getArgv(true));
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), genFloat);
assertColMinAndMax(forIdx(1), genNumeric);
}
}