SQOOP-2846: Sqoop Export with update-key failing for avro data file

(VISHNU S NAIR via Jarek Jarcec Cecho)
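
Before this change, JdbcUpdateExportJob chose its mapper solely from
inputIsSequenceFiles(), so an update-mode export over Avro (or Parquet)
input fell through to TextExportMapper and the job failed. A minimal
reproduction of the failing invocation (connection string, paths, and
the table and key names are illustrative, not taken from the original
report):

    sqoop export \
      --connect jdbc:mysql://localhost/exampledb \
      --table foo \
      --export-dir /user/example/foo_avro \
      --update-key id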
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index c85602c..f911280 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -21,14 +21,23 @@
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.mapreduce.ExportJobBase;
@@ -40,6 +49,8 @@
  */
 public class JdbcUpdateExportJob extends ExportJobBase {
 
+  // SQOOP-2846: remember the detected input file type so the mapper and
+  // input format can be chosen to match Avro and Parquet input.
+  private FileType fileType;
+
   public static final Log LOG = LogFactory.getLog(
       JdbcUpdateExportJob.class.getName());
 
@@ -64,11 +75,21 @@
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
   }
 
+  // SQOOP-2846: dispatch on the input file type instead of assuming the
+  // input is either sequence files or text.
   @Override
   protected Class<? extends Mapper> getMapperClass() {
-    if (inputIsSequenceFiles()) {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getExportMapperClass();
+    }
+    switch (fileType) {
+    case SEQUENCE_FILE:
       return SequenceFileExportMapper.class;
-    } else {
+    case AVRO_DATA_FILE:
+      return AvroExportMapper.class;
+    case PARQUET_FILE:
+      return ParquetExportMapper.class;
+    case UNKNOWN:
+    default:
       return TextExportMapper.class;
     }
   }
@@ -143,5 +164,69 @@
       throw new IOException("Could not load OutputFormat", cnfe);
     }
   }
-}
 
+  // SQOOP-2846: record the input file type, then configure the input format
+  // to match it.
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol)
+      throws ClassNotFoundException, IOException {
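+    // Capture the file type up front; getMapperClass() and
+    // getInputFormatClass() rely on its value later during job setup.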
+    fileType = getInputFileType();
+
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+
+    if (isHCatJob) {
+      SqoopHCatUtilities.configureExportInputFormat(options, job, context.getConnManager(),
+          tableName,
+          job.getConfiguration());
+      return;
+    } else if (fileType == FileType.AVRO_DATA_FILE) {
+      LOG.debug("Configuring for Avro export");
+      configureGenericRecordExportInputFormat(job, tableName);
+    } else if (fileType == FileType.PARQUET_FILE) {
+      LOG.debug("Configuring for Parquet export");
+      configureGenericRecordExportInputFormat(job, tableName);
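+      // Parquet input is read through the Kite SDK, which addresses the
+      // export directory as a "dataset:" URI.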
+      FileSystem fs = FileSystem.get(job.getConfiguration());
+      String uri = "dataset:" + fs.makeQualified(getInputPath());
+      DatasetKeyInputFormat.configure(job).readFrom(uri);
+    }
+  }
+
+  // SQOOP-2846: shared input-format setup for Avro and Parquet exports.
+  private void configureGenericRecordExportInputFormat(Job job, String tableName)
+      throws IOException {
+    ConnManager connManager = context.getConnManager();
+    Map<String, Integer> columnTypeInts;
+    if (options.getCall() == null) {
+      columnTypeInts = connManager.getColumnTypes(tableName, options.getSqlQuery());
+    } else {
+      columnTypeInts = connManager.getColumnTypesForProcedure(options.getCall());
+    }
+    MapWritable columnTypes = new MapWritable();
+    for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
+      Text columnName = new Text(e.getKey());
+      Text columnText = new Text(connManager.toJavaType(tableName, e.getKey(), e.getValue()));
+      columnTypes.put(columnName, columnText);
+    }
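+    // Serialize the column-name -> Java-type map into the job configuration
+    // so the generic-record export mapper can convert Avro field values to
+    // the matching SQL column types.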
+    DefaultStringifier.store(job.getConfiguration(), columnTypes,
+        AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
+  }
+
+  // SQOOP-2846: pick the input format that matches the input file type.
+  @Override
+  protected Class<? extends InputFormat> getInputFormatClass() throws ClassNotFoundException {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getInputFormatClass();
+    }
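+    // Avro and Parquet need dedicated input formats; every other file type
+    // keeps the default from ExportJobBase.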
+    switch (fileType) {
+    case AVRO_DATA_FILE:
+      return AvroInputFormat.class;
+    case PARQUET_FILE:
+      return DatasetKeyInputFormat.class;
+    default:
+      return super.getInputFormatClass();
+    }
+  }
+}
diff --git a/src/test/com/cloudera/sqoop/TestAvroExport.java b/src/test/com/cloudera/sqoop/TestAvroExport.java
index f91cd48..02db6c0 100644
--- a/src/test/com/cloudera/sqoop/TestAvroExport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroExport.java
@@ -33,6 +33,7 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -164,10 +165,12 @@
     fields.add(buildAvroField("id", Schema.Type.INT));
     fields.add(buildAvroField("msg", Schema.Type.STRING));
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnAvroSchema() != null) {
-        fields.add(buildAvroField(forIdx(colNum++),
-            gen.getColumnAvroSchema()));
+    // SQOOP-2846: extraCols may be null when the caller needs no extra columns.
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnAvroSchema() != null) {
+          fields.add(buildAvroField(forIdx(colNum++), gen.getColumnAvroSchema()));
+        }
       }
     }
     Schema schema = Schema.createRecord("myschema", null, null, false);
@@ -178,9 +181,12 @@
   private void addExtraColumns(GenericRecord record, int rowNum,
       ColumnGenerator[] extraCols) {
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnAvroSchema() != null) {
-        record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+    // SQOOP-2846: guard against a null extraCols array.
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnAvroSchema() != null) {
+          record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+        }
       }
     }
   }
@@ -253,6 +259,39 @@
     }
   }
 
+  /**
+   * Creates the table to export into and inserts a single seed row so that
+   * the update-mode export has an existing record to modify (SQOOP-2846).
+   */
+  private void createTableWithInsert() throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement statement = conn.prepareStatement(getDropTableStatement(getTableName()),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+
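+    // Create the table, then seed one row whose msg value the update-mode
+    // export is expected to overwrite.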
+    StringBuilder sb = new StringBuilder();
+    sb.append("CREATE TABLE ");
+    sb.append(getTableName());
+    sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+    sb.append(")");
+    statement = conn.prepareStatement(sb.toString(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      Statement insertStatement = conn.createStatement();
+      try {
+        insertStatement.execute("INSERT INTO " + getTableName()
+            + " (ID, MSG) VALUES (0, 'testMsg')");
+      } finally {
+        insertStatement.close();
+      }
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+  }
+
   /** Verify that on a given row, a column has a given value.
    * @param id the id column specifying the row to test.
    */
@@ -418,6 +457,33 @@
     verifyExport(TOTAL_RECORDS);
   }
 
+  // SQOOP-2846: upsert export from an Avro data file. The test database
+  // manager does not support --update-mode allowinsert, so the export is
+  // expected to fail; the regression being guarded is that the job gets
+  // past reading the Avro input.
+  public void testAvroWithUpsert() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID", "--update-mode", "allowinsert" };
+    final int TOTAL_RECORDS = 2;
+    createAvroFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    try {
+      runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    } catch (Exception e) {
+      // Expected: upsert is not supported by the test database manager.
+    }
+  }
+
+  // SQOOP-2846: update-only export from an Avro data file; the seed row
+  // inserted by createTableWithInsert() must be overwritten with the
+  // exported msg value.
+  public void testAvroWithUpdateKey() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID" };
+    final int TOTAL_RECORDS = 1;
+    createAvroFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    verifyExport(getMsgPrefix() + "0");
+  }
+
   public void testMissingAvroFields()  throws IOException, SQLException {
     String[] argv = {};
     final int TOTAL_RECORDS = 1;
diff --git a/src/test/com/cloudera/sqoop/TestParquetExport.java b/src/test/com/cloudera/sqoop/TestParquetExport.java
index b938bf8..0c94248 100644
--- a/src/test/com/cloudera/sqoop/TestParquetExport.java
+++ b/src/test/com/cloudera/sqoop/TestParquetExport.java
@@ -32,6 +32,7 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -143,10 +144,11 @@
     fields.add(buildField("id", Schema.Type.INT));
     fields.add(buildField("msg", Schema.Type.STRING));
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnParquetSchema() != null) {
-        fields.add(buildParquetField(forIdx(colNum++),
-            gen.getColumnParquetSchema()));
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnParquetSchema() != null) {
+          fields.add(buildParquetField(forIdx(colNum++), gen.getColumnParquetSchema()));
+        }
       }
     }
     Schema schema = Schema.createRecord("myschema", null, null, false);
@@ -157,9 +159,11 @@
   private void addExtraColumns(GenericRecord record, int rowNum,
       ColumnGenerator[] extraCols) {
     int colNum = 0;
-    for (ColumnGenerator gen : extraCols) {
-      if (gen.getColumnParquetSchema() != null) {
-        record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+    if (null != extraCols) {
+      for (ColumnGenerator gen : extraCols) {
+        if (gen.getColumnParquetSchema() != null) {
+          record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+        }
       }
     }
   }
@@ -232,6 +236,38 @@
     }
   }
 
+  /**
+   * Creates the table to export into and inserts a single seed row so that
+   * the update-mode export has an existing record to modify (SQOOP-2846).
+   */
+  private void createTableWithInsert() throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement statement = conn.prepareStatement(getDropTableStatement(getTableName()),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+
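+    // Create the table, then seed one row whose msg value the update-mode
+    // export is expected to overwrite.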
+    StringBuilder sb = new StringBuilder();
+    sb.append("CREATE TABLE ");
+    sb.append(getTableName());
+    sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+    sb.append(")");
+    statement = conn.prepareStatement(sb.toString(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      Statement insertStatement = conn.createStatement();
+      try {
+        insertStatement.execute("INSERT INTO " + getTableName()
+            + " (ID, MSG) VALUES (0, 'testMsg')");
+      } finally {
+        insertStatement.close();
+      }
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+  }
+
   /** Verify that on a given row, a column has a given value.
    * @param id the id column specifying the row to test.
    */
@@ -369,6 +405,30 @@
     verifyExport(TOTAL_RECORDS);
   }
 
+  // SQOOP-2846: update-only export from a Parquet file; the seed row
+  // inserted by createTableWithInsert() must be overwritten with the
+  // exported msg value.
+  public void testParquetWithUpdateKey() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID" };
+    final int TOTAL_RECORDS = 1;
+    createParquetFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    verifyExport(getMsgPrefix() + "0");
+  }
+
+  // SQOOP-2846: upsert export from a Parquet file. The test database manager
+  // does not support --update-mode allowinsert, so the export is expected to
+  // fail; the regression being guarded is that the job gets past reading the
+  // Parquet input.
+  public void testParquetWithUpsert() throws IOException, SQLException {
+    String[] argv = { "--update-key", "ID", "--update-mode", "allowinsert" };
+    final int TOTAL_RECORDS = 2;
+    createParquetFile(0, TOTAL_RECORDS, null);
+    createTableWithInsert();
+    try {
+      runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    } catch (Exception e) {
+      // Expected: upsert is not supported by the test database manager.
+    }
+  }
+
   public void testMissingParquetFields()  throws IOException, SQLException {
     String[] argv = {};
     final int TOTAL_RECORDS = 1;
diff --git a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
index ed0dc31..786bd94 100644
--- a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
@@ -301,8 +301,31 @@
       statement.close();
     }
 
-    assertEquals("Invalid msg field for min value", getMsgPrefix() + maxVal,
-        maxMsg);
+    assertEquals("Invalid msg field for min value", getMsgPrefix() + maxVal, maxMsg);
+  }
+
+  /**
+   * Verifies an update-mode export (SQOOP-2846): the MSG column of the
+   * exported table must hold the given expected value.
+   */
+  protected void verifyExport(String expectedValue) throws IOException, SQLException {
+    Connection conn = getConnection();
+    LOG.info("Verifying export: " + getTableName());
+    // Read back the msg column of the exported row.
+    PreparedStatement statement = conn.prepareStatement("SELECT MSG FROM " + getTableName(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    String actualValue = null;
+    ResultSet rs = null;
+    try {
+      rs = statement.executeQuery();
+      try {
+        if (rs.next()) {
+          actualValue = rs.getString(1);
+        }
+      } finally {
+        rs.close();
+      }
+    } finally {
+      statement.close();
+    }
+
+    assertEquals("Got back unexpected row count", expectedValue, actualValue);
   }
 
   /**