SQOOP-1094: Add Avro support to merge tool
(Yibing Shi via Jarek Jarcec Cecho)
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index ee3cf62..9036076 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -18,14 +18,26 @@
package org.apache.sqoop.avro;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.sqoop.lib.BlobRef;
import org.apache.sqoop.lib.ClobRef;
import org.apache.sqoop.orm.ClassWriter;
+import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
@@ -184,4 +196,35 @@
}
}
+  /**
+   * Get the schema of AVRO files stored in a directory. For a directory, the
+   * first listed entry whose name does not start with "_" or "." is read, and
+   * null is returned if there is none. Once opened, the input is always closed.
+   */
+  public static Schema getAvroSchema(Path path, Configuration conf)
+      throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    Path fileToTest = path;
+    if (fs.isDirectory(path)) {
+      FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          String name = p.getName();
+          return !name.startsWith("_") && !name.startsWith(".");
+        }
+      });
+      if (fileStatuses.length == 0) {
+        return null;
+      }
+      fileToTest = fileStatuses[0].getPath();
+    }
+    SeekableInput input = new FsInput(fileToTest, conf);
+    try {
+      DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+      FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);
+      return fileReader.getSchema();
+    } finally {
+      input.close(); // also runs when openReader or getSchema throws
+    }
+  }
}
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroJob.java b/src/java/org/apache/sqoop/mapreduce/AvroJob.java
index bb4755c..65695d8 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroJob.java
@@ -42,6 +42,11 @@
return Schema.parse(job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA)));
}
+ /** Set a job's output key schema; stores {@code s.toString()} under OUTPUT_SCHEMA. */
+ public static void setOutputSchema(Configuration job, Schema s) {
+ job.set(OUTPUT_SCHEMA, s.toString());
+ }
+
/** Return a job's output key schema. */
public static Schema getOutputSchema(Configuration job) {
return Schema.parse(job.get(OUTPUT_SCHEMA));
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeAvroMapper.java b/src/java/org/apache/sqoop/mapreduce/MergeAvroMapper.java
new file mode 100644
index 0000000..a2277bf
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeAvroMapper.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.avro.AvroUtil;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Mapper for the merge program which operates on AVRO data files.
+ */
+public class MergeAvroMapper
+    extends MergeMapperBase<AvroWrapper<GenericRecord>, NullWritable> {
+
+  // Lower-cased field name -> (declared field name, field type name) of the user class.
+  private Map<String, Pair<String, String>> sqoopRecordFields = new HashMap<String, Pair<String, String>>();
+  private SqoopRecord sqoopRecordImpl;
+
+  @Override
+  protected void setup(Context context) throws InterruptedException, IOException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    final String userClassName = conf.get(MergeJob.MERGE_SQOOP_RECORD_KEY);
+    try {
+      final Class<? extends Object> clazz = Class.forName(userClassName, true,
+          Thread.currentThread().getContextClassLoader());
+      sqoopRecordImpl = (SqoopRecord) ReflectionUtils.newInstance(clazz, conf);
+      for (final Field field : clazz.getDeclaredFields()) {
+        final String fieldName = field.getName();
+        final String fieldTypeName = field.getType().getName();
+        sqoopRecordFields.put(fieldName.toLowerCase(), new Pair<String, String>(fieldName,
+            fieldTypeName));
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Cannot find the user record class with class name "
+          + userClassName, e);
+    }
+  }
+
+  @Override
+  public void map(AvroWrapper<GenericRecord> key, NullWritable val, Context c)
+      throws IOException, InterruptedException {
+    processRecord(toSqoopRecord(key.datum()), c);
+  }
+
+  /** Fill the single reused record from genericRecord, matching names case-insensitively. */
+  private SqoopRecord toSqoopRecord(GenericRecord genericRecord) throws IOException {
+    Schema avroSchema = genericRecord.getSchema();
+    for (Schema.Field field : avroSchema.getFields()) {
+      Pair<String, String> sqoopRecordField = sqoopRecordFields.get(field.name().toLowerCase());
+      if (null == sqoopRecordField) {
+        throw new IOException("Cannot find field '" + field.name() + "' in fields of user class "
+            + sqoopRecordImpl.getClass().getName() + ". Fields are: "
+            + Arrays.deepToString(sqoopRecordFields.values().toArray()));
+      }
+      Object avroObject = genericRecord.get(field.name());
+      Object fieldVal = AvroUtil.fromAvro(avroObject, field.schema(), sqoopRecordField.value());
+      sqoopRecordImpl.setField(sqoopRecordField.key(), fieldVal);
+    }
+    return sqoopRecordImpl;
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeAvroReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeAvroReducer.java
new file mode 100644
index 0000000..2e85f51
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeAvroReducer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.avro.AvroUtil;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/** Reducer for the merge program which writes the selected record as an Avro datum. */
+public class MergeAvroReducer extends MergeReducerBase<AvroWrapper<GenericRecord>, NullWritable> {
+  private Schema schema;
+  private boolean bigDecimalFormatString;
+  private AvroWrapper<GenericRecord> wrapper;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    schema = AvroJob.getOutputSchema(context.getConfiguration());
+    bigDecimalFormatString = context.getConfiguration().getBoolean(
+        ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+    wrapper = new AvroWrapper<GenericRecord>();
+  }
+
+  /** Convert the record's field map to a GenericRecord and write it with a NullWritable value. */
+  @Override
+  protected void writeRecord(SqoopRecord record, Context context)
+      throws IOException, InterruptedException {
+    wrapper.datum(AvroUtil.toGenericRecord(record.getFieldMap(), schema, bigDecimalFormatString));
+    context.write(wrapper, NullWritable.get());
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index 4e2a916..5b6c4df 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -20,9 +20,19 @@
import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -30,7 +40,10 @@
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.avro.AvroUtil;
+import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
import org.apache.sqoop.util.Jars;
+
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.mapreduce.JobBase;
@@ -114,20 +127,26 @@
FileOutputFormat.setOutputPath(job, new Path(options.getTargetDir()));
- if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setMapperClass(MergeRecordMapper.class);
- } else {
- job.setMapperClass(MergeTextMapper.class);
- job.setOutputFormatClass(RawKeyTextOutputFormat.class);
+ FileType fileType = ExportJobBase.getFileType(jobConf, oldPath);
+ switch (fileType) {
+ case AVRO_DATA_FILE:
+ configueAvroMergeJob(conf, job, oldPath, newPath);
+ break;
+ case SEQUENCE_FILE:
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setMapperClass(MergeRecordMapper.class);
+ job.setReducerClass(MergeReducer.class);
+ break;
+ default:
+ job.setMapperClass(MergeTextMapper.class);
+ job.setOutputFormatClass(RawKeyTextOutputFormat.class);
+ job.setReducerClass(MergeReducer.class);
}
jobConf.set("mapred.output.key.class", userClassName);
job.setOutputValueClass(NullWritable.class);
- job.setReducerClass(MergeReducer.class);
-
// Set the intermediate data types.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MergeRecord.class);
@@ -142,6 +161,31 @@
throw new IOException(cnfe);
}
}
+
+  /**
+   * Configure the job to merge Avro data files. Both input paths must yield
+   * the same non-null schema, which also becomes the job's output schema.
+   *
+   * @throws IOException if a schema is missing or the two schemas differ
+   */
+  private void configueAvroMergeJob(Configuration conf, Job job, Path oldPath, Path newPath)
+      throws IOException {
+    LOG.info("Trying to merge avro files");
+    final Schema oldSchema = AvroUtil.getAvroSchema(oldPath, conf);
+    final Schema newSchema = AvroUtil.getAvroSchema(newPath, conf);
+    final boolean schemasAgree = oldSchema != null && newSchema != null
+        && oldSchema.equals(newSchema);
+    if (!schemasAgree) {
+      throw new IOException("Invalid schema for input directories. Schema for old data: ["
+          + oldSchema + "]. Schema for new data: [" + newSchema + "]");
+    }
+    LOG.debug("Avro Schema:" + oldSchema);
+    job.setMapperClass(MergeAvroMapper.class);
+    job.setReducerClass(MergeAvroReducer.class);
+    job.setInputFormatClass(AvroInputFormat.class);
+    job.setOutputFormatClass(AvroOutputFormat.class);
+    AvroJob.setOutputSchema(job.getConfiguration(), oldSchema);
+  }
}
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeReducer.java
index cafff8a..6192cdb 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeReducer.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeReducer.java
@@ -29,28 +29,12 @@
* a new one if possible; otherwise, an old one.
*/
public class MergeReducer
- extends Reducer<Text, MergeRecord, SqoopRecord, NullWritable> {
+ extends MergeReducerBase<SqoopRecord, NullWritable> {
@Override
- public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
+ protected void writeRecord(SqoopRecord record, Context c)
throws IOException, InterruptedException {
- SqoopRecord bestRecord = null;
- try {
- for (MergeRecord val : vals) {
- if (null == bestRecord && !val.isNewRecord()) {
- // Use an old record if we don't have a new record.
- bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
- } else if (val.isNewRecord()) {
- bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
- }
- }
- } catch (CloneNotSupportedException cnse) {
- throw new IOException(cnse);
- }
-
- if (null != bestRecord) {
- c.write(bestRecord, NullWritable.get());
- }
+ c.write(record, NullWritable.get()); // the record itself is the output key; the value is NullWritable
}
}
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeReducerBase.java b/src/java/org/apache/sqoop/mapreduce/MergeReducerBase.java
new file mode 100644
index 0000000..4af498f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeReducerBase.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/** Base reducer for merge: selects one record per key and passes it to writeRecord. */
+public abstract class MergeReducerBase<KEYOUT, VALUEOUT>
+    extends Reducer<Text, MergeRecord, KEYOUT, VALUEOUT> {
+
+  /** Keep a clone of the last new record seen, or else of the first old record. */
+  @Override
+  public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
+      throws IOException, InterruptedException {
+    SqoopRecord chosen = null;
+    try {
+      for (MergeRecord candidate : vals) {
+        // A new record always wins; an old one is used only while nothing is held yet.
+        if (candidate.isNewRecord() || null == chosen) {
+          chosen = (SqoopRecord) candidate.getSqoopRecord().clone();
+        }
+      }
+    } catch (CloneNotSupportedException cnse) {
+      throw new IOException(cnse);
+    }
+    if (null != chosen) {
+      writeRecord(chosen, c);
+    }
+  }
+
+  /** Emit the record selected for a key. */
+  protected abstract void writeRecord(SqoopRecord record, Context c)
+      throws IOException, InterruptedException;
+}
diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java
index 3821aa1..1709419 100644
--- a/src/test/com/cloudera/sqoop/TestMerge.java
+++ b/src/test/com/cloudera/sqoop/TestMerge.java
@@ -25,18 +25,12 @@
import java.sql.Timestamp;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-
+import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
-
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
+import com.cloudera.sqoop.SqoopOptions.FileLayout;
import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
@@ -44,6 +38,20 @@
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.tool.MergeTool;
import com.cloudera.sqoop.util.ClassLoaderStack;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
/**
* Test that the merge tool works.
@@ -58,6 +66,19 @@
public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge";
+  // Rows (id, val) present in the table for the first import.
+  private static final List<List<Integer>> initRecords = Arrays.asList(
+      Arrays.asList(0, 0), Arrays.asList(1, 42));
+
+  // Rows (id, val) present in the table for the second import.
+  private static final List<List<Integer>> newRecords = Arrays.asList(
+      Arrays.asList(1, 43), Arrays.asList(3, 313));
+
+  // Rows expected in the merge output: the new row for id 1 replaces the old
+  // one, and the rows for ids 0 and 3 are carried over.
+  private static final List<List<Integer>> mergedRecords = Arrays.asList(
+      Arrays.asList(0, 0), Arrays.asList(1, 43), Arrays.asList(3, 313));
+
@Override
public void setUp() {
super.setUp();
@@ -71,6 +92,9 @@
}
public static final String TABLE_NAME = "MergeTable";
+ private static final String OLD_PATH = "merge-old";
+ private static final String NEW_PATH = "merge_new";
+ private static final String FINAL_PATH = "merge_final";
public Configuration newConf() {
Configuration conf = new Configuration();
@@ -91,7 +115,7 @@
return options;
}
- protected void createTable() throws SQLException {
+ protected void createTable(List<List<Integer>> records) throws SQLException {
PreparedStatement s = conn.prepareStatement("DROP TABLE \"" + TABLE_NAME + "\" IF EXISTS");
try {
s.executeUpdate();
@@ -99,32 +123,38 @@
s.close();
}
- s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME + "\" (id INT NOT NULL PRIMARY KEY, val INT, lastmod TIMESTAMP)");
+ s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME
+ + "\" (id INT NOT NULL PRIMARY KEY, val INT, LASTMOD timestamp)");
try {
s.executeUpdate();
} finally {
s.close();
}
- s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (0, 0, NOW())");
- try {
- s.executeUpdate();
- } finally {
- s.close();
- }
-
- s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (1, 42, NOW())");
- try {
- s.executeUpdate();
- } finally {
- s.close();
+ for (List<Integer> record : records) {
+ final String values = StringUtils.join(record, ", ");
+ s = conn
+ .prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (" + values + ", now())");
+ try {
+ s.executeUpdate();
+ } finally {
+ s.close();
+ }
}
conn.commit();
}
- public void testMerge() throws Exception {
- createTable();
+ public void testTextFileMerge() throws Exception {
+ runMergeTest(SqoopOptions.FileLayout.TextFile);
+ }
+
+ public void testAvroFileMerge() throws Exception {
+ runMergeTest(SqoopOptions.FileLayout.AvroDataFile);
+ }
+
+ public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception {
+ createTable(initRecords);
// Create a jar to use for the merging process; we'll load it
// into the current thread CL for when this runs. This needs
@@ -146,80 +176,33 @@
String jarFileName = jars.get(0);
// Now do the imports.
-
- Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
-
- options = getSqoopOptions(newConf());
- options.setTableName(TABLE_NAME);
- options.setNumMappers(1);
-
- // Do an import of this data into the "old" dataset.
- options.setTargetDir(new Path(warehouse, "merge-old").toString());
- options.setIncrementalMode(IncrementalMode.DateLastModified);
- options.setIncrementalTestColumn("LASTMOD");
-
- ImportTool importTool = new ImportTool();
- Sqoop importer = new Sqoop(importTool, options.getConf(), options);
- ret = Sqoop.runSqoop(importer, new String[0]);
- if (0 != ret) {
- fail("Initial import failed with exit code " + ret);
- }
+ importData(OLD_PATH, fileLayout);
// Check that we got records that meet our expected values.
- assertRecordStartsWith("0,0,", "merge-old");
- assertRecordStartsWith("1,42,", "merge-old");
-
- long prevImportEnd = System.currentTimeMillis();
+ checkData(OLD_PATH, initRecords, fileLayout);
Thread.sleep(25);
// Modify the data in the warehouse.
- PreparedStatement s = conn.prepareStatement("UPDATE \"" + TABLE_NAME + "\" SET val=43, lastmod=NOW() WHERE id=1");
- try {
- s.executeUpdate();
- conn.commit();
- } finally {
- s.close();
- }
-
- s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (3,313,NOW())");
- try {
- s.executeUpdate();
- conn.commit();
- } finally {
- s.close();
- }
+ createTable(newRecords);
Thread.sleep(25);
// Do another import, into the "new" dir.
- options = getSqoopOptions(newConf());
- options.setTableName(TABLE_NAME);
- options.setNumMappers(1);
- options.setTargetDir(new Path(warehouse, "merge-new").toString());
- options.setIncrementalMode(IncrementalMode.DateLastModified);
- options.setIncrementalTestColumn("LASTMOD");
- options.setIncrementalLastValue(new Timestamp(prevImportEnd).toString());
+ importData(NEW_PATH, fileLayout);
- importTool = new ImportTool();
- importer = new Sqoop(importTool, options.getConf(), options);
- ret = Sqoop.runSqoop(importer, new String[0]);
- if (0 != ret) {
- fail("Second import failed with exit code " + ret);
- }
-
- assertRecordStartsWith("1,43,", "merge-new");
- assertRecordStartsWith("3,313,", "merge-new");
+ checkData(NEW_PATH, newRecords, fileLayout);
// Now merge the results!
ClassLoaderStack.addJarFile(jarFileName, MERGE_CLASS_NAME);
-
+ Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
options = getSqoopOptions(newConf());
- options.setMergeOldPath(new Path(warehouse, "merge-old").toString());
- options.setMergeNewPath(new Path(warehouse, "merge-new").toString());
+ options.setMergeOldPath(new Path(warehouse, OLD_PATH).toString());
+ options.setMergeNewPath(new Path(warehouse, NEW_PATH).toString());
options.setMergeKeyCol("ID");
- options.setTargetDir(new Path(warehouse, "merge-final").toString());
+ options.setTargetDir(new Path(warehouse, FINAL_PATH).toString());
options.setClassName(MERGE_CLASS_NAME);
+ options.setExistingJarName(jarFileName);
MergeTool mergeTool = new MergeTool();
Sqoop merger = new Sqoop(mergeTool, options.getConf(), options);
@@ -228,17 +211,47 @@
fail("Merge failed with exit code " + ret);
}
- assertRecordStartsWith("0,0,", "merge-final");
- assertRecordStartsWith("1,43,", "merge-final");
- assertRecordStartsWith("3,313,", "merge-final");
+ checkData(FINAL_PATH, mergedRecords, fileLayout);
+ }
+
+ private void checkData(String dataDir, List<List<Integer>> records,
+ SqoopOptions.FileLayout fileLayout) throws Exception {
+ for (List<Integer> record : records) { // each expected record is asserted individually
+ assertRecordStartsWith(record, dataDir, fileLayout);
+ }
+ }
+
+ private boolean valueMatches(GenericRecord genericRecord, List<Integer> recordVals) {
+ return recordVals.get(0).equals(genericRecord.get(0)) // compare the field at position 0
+ && recordVals.get(1).equals(genericRecord.get(1)); // and the field at position 1 only
+ }
+
+  /** Run a single-mapper import of TABLE_NAME into 'targetDir' under the local warehouse dir. */
+  private void importData(String targetDir, SqoopOptions.FileLayout fileLayout) {
+    SqoopOptions options = getSqoopOptions(newConf());
+    options.setTableName(TABLE_NAME);
+    options.setNumMappers(1);
+    options.setFileLayout(fileLayout);
+    options.setDeleteMode(true);
+
+    Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+    options.setTargetDir(new Path(warehouse, targetDir).toString());
+
+    ImportTool importTool = new ImportTool();
+    Sqoop importer = new Sqoop(importTool, options.getConf(), options);
+    int ret = Sqoop.runSqoop(importer, new String[0]);
+    if (0 != ret) {
+      fail("Import into " + targetDir + " failed with exit code " + ret);
+    }
+  }
/**
* @return true if the file specified by path 'p' contains a line
* that starts with 'prefix'
*/
- protected boolean checkFileForLine(FileSystem fs, Path p, String prefix)
+ protected boolean checkTextFileForLine(FileSystem fs, Path p, List<Integer> record)
throws IOException {
+ final String prefix = StringUtils.join(record, ',');
BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
try {
while (true) {
@@ -258,11 +271,42 @@
return false;
}
+  /** @return true if Avro file 'p' holds a record that valueMatches accepts for 'record'. */
+  private boolean checkAvroFileForLine(FileSystem fs, Path p, List<Integer> record)
+      throws IOException {
+    SeekableInput in = new FsInput(p, new Configuration());
+    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+    FileReader<GenericRecord> reader = DataFileReader.openReader(in, datumReader);
+    try {
+      reader.sync(0);
+      boolean found = false;
+      while (!found && reader.hasNext()) {
+        found = valueMatches(reader.next(), record);
+      }
+      return found;
+    } finally {
+      reader.close(); // the reader was previously left open on every path
+    }
+  }
+
+  protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout,
+      List<Integer> record) throws IOException {
+    switch (fileLayout) {
+      case TextFile:
+        return checkTextFileForLine(fs, p, record);
+      case AvroDataFile:
+        return checkAvroFileForLine(fs, p, record);
+      default:
+        return false; // other layouts are not checked, as before
+    }
+  }
+
/**
* Return true if there's a file in 'dirName' with a line that starts with
* 'prefix'.
*/
- protected boolean recordStartsWith(String prefix, String dirName)
+ protected boolean recordStartsWith(List<Integer> record, String dirName,
+ SqoopOptions.FileLayout fileLayout)
throws Exception {
Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR);
Path targetPath = new Path(warehousePath, dirName);
@@ -277,7 +321,7 @@
for (FileStatus stat : files) {
Path p = stat.getPath();
if (p.getName().startsWith("part-")) {
- if (checkFileForLine(fs, p, prefix)) {
+ if (checkFileForLine(fs, p, fileLayout, record)) {
// We found the line. Nothing further to do.
return true;
}
@@ -287,11 +331,10 @@
return false;
}
- protected void assertRecordStartsWith(String prefix, String dirName)
- throws Exception {
- if (!recordStartsWith(prefix, dirName)) {
- fail("No record found that starts with " + prefix + " in " + dirName);
+ protected void assertRecordStartsWith(List<Integer> record, String dirName,
+ SqoopOptions.FileLayout fileLayout) throws Exception {
+ if (!recordStartsWith(record, dirName, fileLayout)) {
+ fail("No record found that starts with [" + StringUtils.join(record, ", ") + "] in " + dirName);
}
}
}
-