SQOOP-1094: Add Avro support to merge tool

(Yibing Shi via Jarek Jarcec Cecho)
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index ee3cf62..9036076 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -18,14 +18,26 @@
 package org.apache.sqoop.avro;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.sqoop.lib.BlobRef;
 import org.apache.sqoop.lib.ClobRef;
 import org.apache.sqoop.orm.ClassWriter;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
@@ -184,4 +196,35 @@
     }
   }
 
+  /**
+   * Get the schema of AVRO files stored in a directory
+   */
+  public static Schema getAvroSchema(Path path, Configuration conf)
+      throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    Path fileToTest;
+    if (fs.isDirectory(path)) {
+      FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          String name = p.getName();
+          return !name.startsWith("_") && !name.startsWith(".");
+        }
+      });
+      if (fileStatuses.length == 0) {
+        return null;
+      }
+      fileToTest = fileStatuses[0].getPath();
+    } else {
+      fileToTest = path;
+    }
+
+    SeekableInput input = new FsInput(fileToTest, conf);
+    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+    FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);
+
+    Schema result = fileReader.getSchema();
+    fileReader.close();
+    return result;
+  }
 }
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroJob.java b/src/java/org/apache/sqoop/mapreduce/AvroJob.java
index bb4755c..65695d8 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroJob.java
@@ -42,6 +42,11 @@
     return Schema.parse(job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA)));
   }
 
+  /** Set a job's output key schema. */
+  public static void setOutputSchema(Configuration job, Schema s) {
+    job.set(OUTPUT_SCHEMA, s.toString());
+  }
+
   /** Return a job's output key schema. */
   public static Schema getOutputSchema(Configuration job) {
     return Schema.parse(job.get(OUTPUT_SCHEMA));
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeAvroMapper.java b/src/java/org/apache/sqoop/mapreduce/MergeAvroMapper.java
new file mode 100644
index 0000000..a2277bf
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeAvroMapper.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.avro.AvroUtil;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Mapper for the merge program which operates on AVRO data files.
+ */
+public class MergeAvroMapper
+ extends
+    MergeMapperBase<AvroWrapper<GenericRecord>, NullWritable> {
+
+  private Map<String, Pair<String, String>> sqoopRecordFields = new HashMap<String, Pair<String, String>>();
+  private SqoopRecord sqoopRecordImpl;
+
+  @Override
+  protected void setup(Context context) throws InterruptedException, IOException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    final String userClassName = conf.get(MergeJob.MERGE_SQOOP_RECORD_KEY);
+    try {
+      final Class<? extends Object> clazz = Class.forName(userClassName, true,
+          Thread.currentThread().getContextClassLoader());
+      sqoopRecordImpl = (SqoopRecord) ReflectionUtils.newInstance(clazz, conf);
+      for (final Field field : clazz.getDeclaredFields()) {
+        final String fieldName = field.getName();
+        final String fieldTypeName = field.getType().getName();
+        sqoopRecordFields.put(fieldName.toLowerCase(), new Pair<String, String>(fieldName,
+            fieldTypeName));
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Cannot find the user record class with class name"
+          + userClassName, e);
+    }
+  }
+
+  @Override
+  public void map(AvroWrapper<GenericRecord> key, NullWritable val, Context c)
+      throws IOException, InterruptedException {
+    processRecord(toSqoopRecord(key.datum()), c);
+  }
+
+  private SqoopRecord toSqoopRecord(GenericRecord genericRecord) throws IOException {
+    Schema avroSchema = genericRecord.getSchema();
+    for (Schema.Field field : avroSchema.getFields()) {
+      Pair<String, String> sqoopRecordField = sqoopRecordFields.get(field.name().toLowerCase());
+      if (null == sqoopRecordField) {
+        throw new IOException("Cannot find field '" + field.name() + "' in fields of user class"
+            + sqoopRecordImpl.getClass().getName() + ". Fields are: "
+            + Arrays.deepToString(sqoopRecordFields.values().toArray()));
+      }
+      Object avroObject = genericRecord.get(field.name());
+      Object fieldVal = AvroUtil.fromAvro(avroObject, field.schema(), sqoopRecordField.value());
+      sqoopRecordImpl.setField(sqoopRecordField.key(), fieldVal);
+    }
+    return sqoopRecordImpl;
+  }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeAvroReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeAvroReducer.java
new file mode 100644
index 0000000..2e85f51
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeAvroReducer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.avro.AvroUtil;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+public class MergeAvroReducer extends MergeReducerBase<AvroWrapper<GenericRecord>, NullWritable> {
+  private AvroWrapper<GenericRecord> wrapper;
+  private Schema schema;
+  private boolean bigDecimalFormatString;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    wrapper = new AvroWrapper<GenericRecord>();
+    schema = AvroJob.getOutputSchema(context.getConfiguration());
+    bigDecimalFormatString = context.getConfiguration().getBoolean(
+        ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+  }
+
+  @Override
+  protected void writeRecord(SqoopRecord record, Context context)
+      throws IOException, InterruptedException {
+    GenericRecord outKey = AvroUtil.toGenericRecord(record.getFieldMap(), schema,
+        bigDecimalFormatString);
+    wrapper.datum(outKey);
+    context.write(wrapper, NullWritable.get());
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index 4e2a916..5b6c4df 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -20,9 +20,19 @@
 
 import java.io.IOException;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -30,7 +40,10 @@
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.avro.AvroUtil;
+import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
 import org.apache.sqoop.util.Jars;
+
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.mapreduce.JobBase;
 
@@ -114,20 +127,26 @@
 
       FileOutputFormat.setOutputPath(job, new Path(options.getTargetDir()));
 
-      if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
-        job.setInputFormatClass(SequenceFileInputFormat.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setMapperClass(MergeRecordMapper.class);
-      } else {
-        job.setMapperClass(MergeTextMapper.class);
-        job.setOutputFormatClass(RawKeyTextOutputFormat.class);
+      FileType fileType = ExportJobBase.getFileType(jobConf, oldPath);
+      switch (fileType) {
+        case AVRO_DATA_FILE:
+          configueAvroMergeJob(conf, job, oldPath, newPath);
+          break;
+        case SEQUENCE_FILE:
+          job.setInputFormatClass(SequenceFileInputFormat.class);
+          job.setOutputFormatClass(SequenceFileOutputFormat.class);
+          job.setMapperClass(MergeRecordMapper.class);
+          job.setReducerClass(MergeReducer.class);
+          break;
+        default:
+          job.setMapperClass(MergeTextMapper.class);
+          job.setOutputFormatClass(RawKeyTextOutputFormat.class);
+          job.setReducerClass(MergeReducer.class);
       }
 
       jobConf.set("mapred.output.key.class", userClassName);
       job.setOutputValueClass(NullWritable.class);
 
-      job.setReducerClass(MergeReducer.class);
-
       // Set the intermediate data types.
       job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(MergeRecord.class);
@@ -142,6 +161,23 @@
       throw new IOException(cnfe);
     }
   }
+
+  private void configueAvroMergeJob(Configuration conf, Job job, Path oldPath, Path newPath)
+      throws IOException {
+    LOG.info("Trying to merge avro files");
+    final Schema oldPathSchema = AvroUtil.getAvroSchema(oldPath, conf);
+    final Schema newPathSchema = AvroUtil.getAvroSchema(newPath, conf);
+    if (oldPathSchema == null || newPathSchema == null || !oldPathSchema.equals(newPathSchema)) {
+      throw new IOException("Invalid schema for input directories. Schema for old data: ["
+          + oldPathSchema + "]. Schema for new data: [" + newPathSchema + "]");
+    }
+    LOG.debug("Avro Schema:" + oldPathSchema);
+    job.setInputFormatClass(AvroInputFormat.class);
+    job.setOutputFormatClass(AvroOutputFormat.class);
+    job.setMapperClass(MergeAvroMapper.class);
+    job.setReducerClass(MergeAvroReducer.class);
+    AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
+  }
 }
 
 
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeReducer.java
index cafff8a..6192cdb 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeReducer.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeReducer.java
@@ -29,28 +29,12 @@
  * a new one if possible; otherwise, an old one.
  */
 public class MergeReducer
-    extends Reducer<Text, MergeRecord, SqoopRecord, NullWritable> {
+ extends MergeReducerBase<SqoopRecord, NullWritable> {
 
   @Override
-  public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
+  protected void writeRecord(SqoopRecord record, Context c)
       throws IOException, InterruptedException {
-    SqoopRecord bestRecord = null;
-    try {
-      for (MergeRecord val : vals) {
-        if (null == bestRecord && !val.isNewRecord()) {
-          // Use an old record if we don't have a new record.
-          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
-        } else if (val.isNewRecord()) {
-          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
-        }
-      }
-    } catch (CloneNotSupportedException cnse) {
-      throw new IOException(cnse);
-    }
-
-    if (null != bestRecord) {
-      c.write(bestRecord, NullWritable.get());
-    }
+    c.write(record, NullWritable.get());
   }
 }
 
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeReducerBase.java b/src/java/org/apache/sqoop/mapreduce/MergeReducerBase.java
new file mode 100644
index 0000000..4af498f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeReducerBase.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+public abstract class MergeReducerBase<KEYOUT, VALUEOUT> extends
+    Reducer<Text, MergeRecord, KEYOUT, VALUEOUT> {
+
+  @Override
+  public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
+      throws IOException, InterruptedException {
+    SqoopRecord bestRecord = null;
+    try {
+      for (MergeRecord val : vals) {
+        if (null == bestRecord && !val.isNewRecord()) {
+          // Use an old record if we don't have a new record.
+          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
+        } else if (val.isNewRecord()) {
+          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
+        }
+      }
+    } catch (CloneNotSupportedException cnse) {
+      throw new IOException(cnse);
+    }
+
+    if (null != bestRecord) {
+      writeRecord(bestRecord, c);
+    }
+  }
+
+  abstract protected void writeRecord(SqoopRecord record, Context c)
+      throws IOException, InterruptedException;
+}
diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java
index 3821aa1..1709419 100644
--- a/src/test/com/cloudera/sqoop/TestMerge.java
+++ b/src/test/com/cloudera/sqoop/TestMerge.java
@@ -25,18 +25,12 @@
 import java.sql.Timestamp;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-
+import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
-
 import com.cloudera.sqoop.testutil.CommonArgs;
 import com.cloudera.sqoop.testutil.HsqldbTestServer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
+import com.cloudera.sqoop.SqoopOptions.FileLayout;
 import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
@@ -44,6 +38,20 @@
 import com.cloudera.sqoop.tool.ImportTool;
 import com.cloudera.sqoop.tool.MergeTool;
 import com.cloudera.sqoop.util.ClassLoaderStack;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Test that the merge tool works.
@@ -58,6 +66,19 @@
 
   public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge";
 
+  private static final List<List<Integer>> initRecords = Arrays
+      .asList(Arrays.asList(new Integer(0), new Integer(0)),
+          Arrays.asList(new Integer(1), new Integer(42)));
+
+  private static final List<List<Integer>> newRecords = Arrays.asList(
+      Arrays.asList(new Integer(1), new Integer(43)),
+      Arrays.asList(new Integer(3), new Integer(313)));
+
+  private static final List<List<Integer>> mergedRecords = Arrays.asList(
+      Arrays.asList(new Integer(0), new Integer(0)),
+      Arrays.asList(new Integer(1), new Integer(43)),
+      Arrays.asList(new Integer(3), new Integer(313)));
+
   @Override
   public void setUp() {
     super.setUp();
@@ -71,6 +92,9 @@
   }
 
   public static final String TABLE_NAME = "MergeTable";
+  private static final String OLD_PATH = "merge-old";
+  private static final String NEW_PATH = "merge_new";
+  private static final String FINAL_PATH = "merge_final";
 
   public Configuration newConf() {
     Configuration conf = new Configuration();
@@ -91,7 +115,7 @@
     return options;
   }
 
-  protected void createTable() throws SQLException {
+  protected void createTable(List<List<Integer>> records) throws SQLException {
     PreparedStatement s = conn.prepareStatement("DROP TABLE \"" + TABLE_NAME + "\" IF EXISTS");
     try {
       s.executeUpdate();
@@ -99,32 +123,38 @@
       s.close();
     }
 
-    s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME + "\" (id INT NOT NULL PRIMARY KEY, val INT, lastmod TIMESTAMP)");
+    s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME
+        + "\" (id INT NOT NULL PRIMARY KEY, val INT, LASTMOD timestamp)");
     try {
       s.executeUpdate();
     } finally {
       s.close();
     }
 
-    s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (0, 0, NOW())");
-    try {
-      s.executeUpdate();
-    } finally {
-      s.close();
-    }
-
-    s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (1, 42, NOW())");
-    try {
-      s.executeUpdate();
-    } finally {
-      s.close();
+    for (List<Integer> record : records) {
+      final String values = StringUtils.join(record, ", ");
+      s = conn
+          .prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (" + values + ", now())");
+      try {
+        s.executeUpdate();
+      } finally {
+        s.close();
+      }
     }
 
     conn.commit();
   }
 
-  public void testMerge() throws Exception {
-    createTable();
+  public void testTextFileMerge() throws Exception {
+    runMergeTest(SqoopOptions.FileLayout.TextFile);
+  }
+
+  public void testAvroFileMerge() throws Exception {
+    runMergeTest(SqoopOptions.FileLayout.AvroDataFile);
+  }
+
+  public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception {
+    createTable(initRecords);
 
     // Create a jar to use for the merging process; we'll load it
     // into the current thread CL for when this runs. This needs
@@ -146,80 +176,33 @@
     String jarFileName = jars.get(0);
 
     // Now do the imports.
-
-    Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
-
-    options = getSqoopOptions(newConf());
-    options.setTableName(TABLE_NAME);
-    options.setNumMappers(1);
-
-    // Do an import of this data into the "old" dataset.
-    options.setTargetDir(new Path(warehouse, "merge-old").toString());
-    options.setIncrementalMode(IncrementalMode.DateLastModified);
-    options.setIncrementalTestColumn("LASTMOD");
-
-    ImportTool importTool = new ImportTool();
-    Sqoop importer = new Sqoop(importTool, options.getConf(), options);
-    ret = Sqoop.runSqoop(importer, new String[0]);
-    if (0 != ret) {
-      fail("Initial import failed with exit code " + ret);
-    }
+    importData(OLD_PATH, fileLayout);
 
     // Check that we got records that meet our expected values.
-    assertRecordStartsWith("0,0,", "merge-old");
-    assertRecordStartsWith("1,42,", "merge-old");
-
-    long prevImportEnd = System.currentTimeMillis();
+    checkData(OLD_PATH, initRecords, fileLayout);
 
     Thread.sleep(25);
 
     // Modify the data in the warehouse.
-    PreparedStatement s = conn.prepareStatement("UPDATE \"" + TABLE_NAME + "\" SET val=43, lastmod=NOW() WHERE id=1");
-    try {
-      s.executeUpdate();
-      conn.commit();
-    } finally {
-      s.close();
-    }
-
-    s = conn.prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (3,313,NOW())");
-    try {
-      s.executeUpdate();
-      conn.commit();
-    } finally {
-      s.close();
-    }
+    createTable(newRecords);
 
     Thread.sleep(25);
 
     // Do another import, into the "new" dir.
-    options = getSqoopOptions(newConf());
-    options.setTableName(TABLE_NAME);
-    options.setNumMappers(1);
-    options.setTargetDir(new Path(warehouse, "merge-new").toString());
-    options.setIncrementalMode(IncrementalMode.DateLastModified);
-    options.setIncrementalTestColumn("LASTMOD");
-    options.setIncrementalLastValue(new Timestamp(prevImportEnd).toString());
+    importData(NEW_PATH, fileLayout);
 
-    importTool = new ImportTool();
-    importer = new Sqoop(importTool, options.getConf(), options);
-    ret = Sqoop.runSqoop(importer, new String[0]);
-    if (0 != ret) {
-      fail("Second import failed with exit code " + ret);
-    }
-
-    assertRecordStartsWith("1,43,", "merge-new");
-    assertRecordStartsWith("3,313,", "merge-new");
+    checkData(NEW_PATH, newRecords, fileLayout);
 
     // Now merge the results!
     ClassLoaderStack.addJarFile(jarFileName, MERGE_CLASS_NAME);
-
+    Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
     options = getSqoopOptions(newConf());
-    options.setMergeOldPath(new Path(warehouse, "merge-old").toString());
-    options.setMergeNewPath(new Path(warehouse, "merge-new").toString());
+    options.setMergeOldPath(new Path(warehouse, OLD_PATH).toString());
+    options.setMergeNewPath(new Path(warehouse, NEW_PATH).toString());
     options.setMergeKeyCol("ID");
-    options.setTargetDir(new Path(warehouse, "merge-final").toString());
+    options.setTargetDir(new Path(warehouse, FINAL_PATH).toString());
     options.setClassName(MERGE_CLASS_NAME);
+    options.setExistingJarName(jarFileName);
 
     MergeTool mergeTool = new MergeTool();
     Sqoop merger = new Sqoop(mergeTool, options.getConf(), options);
@@ -228,17 +211,47 @@
       fail("Merge failed with exit code " + ret);
     }
 
-    assertRecordStartsWith("0,0,", "merge-final");
-    assertRecordStartsWith("1,43,", "merge-final");
-    assertRecordStartsWith("3,313,", "merge-final");
+    checkData(FINAL_PATH, mergedRecords, fileLayout);
+  }
+
+  private void checkData(String dataDir, List<List<Integer>> records,
+      SqoopOptions.FileLayout fileLayout) throws Exception {
+    for (List<Integer> record : records) {
+      assertRecordStartsWith(record, dataDir, fileLayout);
+    }
+  }
+
+  private boolean valueMatches(GenericRecord genericRecord, List<Integer> recordVals) {
+    return recordVals.get(0).equals(genericRecord.get(0))
+        && recordVals.get(1).equals(genericRecord.get(1));
+  }
+
+  private void importData(String targetDir, SqoopOptions.FileLayout fileLayout) {
+    SqoopOptions options;
+    options = getSqoopOptions(newConf());
+    options.setTableName(TABLE_NAME);
+    options.setNumMappers(1);
+    options.setFileLayout(fileLayout);
+    options.setDeleteMode(true);
+
+    Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+    options.setTargetDir(new Path(warehouse, targetDir).toString());
+
+    ImportTool importTool = new ImportTool();
+    Sqoop importer = new Sqoop(importTool, options.getConf(), options);
+    int ret = Sqoop.runSqoop(importer, new String[0]);
+    if (0 != ret) {
+      fail("Initial import failed with exit code " + ret);
+    }
   }
 
   /**
    * @return true if the file specified by path 'p' contains a line
    * that starts with 'prefix'
    */
-  protected boolean checkFileForLine(FileSystem fs, Path p, String prefix)
+  protected boolean checkTextFileForLine(FileSystem fs, Path p, List<Integer> record)
       throws IOException {
+    final String prefix = StringUtils.join(record, ',');
     BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
     try {
       while (true) {
@@ -258,11 +271,42 @@
     return false;
   }
 
+  private boolean checkAvroFileForLine(FileSystem fs, Path p, List<Integer> record)
+      throws IOException {
+    SeekableInput in = new FsInput(p, new Configuration());
+    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+    FileReader<GenericRecord> reader = DataFileReader.openReader(in, datumReader);
+    reader.sync(0);
+
+    while (reader.hasNext()) {
+      if (valueMatches(reader.next(), record)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout,
+      List<Integer> record) throws IOException {
+    boolean result = false;
+    switch (fileLayout) {
+      case TextFile:
+        result = checkTextFileForLine(fs, p, record);
+        break;
+      case AvroDataFile:
+        result = checkAvroFileForLine(fs, p, record);
+        break;
+    }
+    return result;
+  }
+
   /**
    * Return true if there's a file in 'dirName' with a line that starts with
    * 'prefix'.
    */
-  protected boolean recordStartsWith(String prefix, String dirName)
+  protected boolean recordStartsWith(List<Integer> record, String dirName,
+      SqoopOptions.FileLayout fileLayout)
       throws Exception {
     Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR);
     Path targetPath = new Path(warehousePath, dirName);
@@ -277,7 +321,7 @@
     for (FileStatus stat : files) {
       Path p = stat.getPath();
       if (p.getName().startsWith("part-")) {
-        if (checkFileForLine(fs, p, prefix)) {
+        if (checkFileForLine(fs, p, fileLayout, record)) {
           // We found the line. Nothing further to do.
           return true;
         }
@@ -287,11 +331,10 @@
     return false;
   }
 
-  protected void assertRecordStartsWith(String prefix, String dirName)
-      throws Exception {
-    if (!recordStartsWith(prefix, dirName)) {
-      fail("No record found that starts with " + prefix + " in " + dirName);
+  protected void assertRecordStartsWith(List<Integer> record, String dirName,
+      SqoopOptions.FileLayout fileLayout) throws Exception {
+    if (!recordStartsWith(record, dirName, fileLayout)) {
+      fail("No record found that starts with [" + StringUtils.join(record, ", ") + "] in " + dirName);
     }
   }
 }
-