SQOOP-2846: Sqoop Export with update-key failing for avro data file
(VISHNU S NAIR via Jarek Jarcec Cecho)
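
Before this change, JdbcUpdateExportJob overrode getMapperClass() with only a
sequence-file check, so an update-key export over an Avro data file fell
through to TextExportMapper and failed. As a minimal sketch of the kind of
invocation this patch fixes (the connect string, table name, and export
directory are illustrative placeholders, not from the original report):

    import org.apache.sqoop.Sqoop;

    public class AvroUpdateExportExample {
      public static void main(String[] args) {
        // Update-mode export of an Avro data directory; before SQOOP-2846
        // this path always selected TextExportMapper.
        int ret = Sqoop.runTool(new String[] {
            "export",
            "--connect", "jdbc:mysql://localhost/testdb", // hypothetical
            "--table", "target_table",                    // hypothetical
            "--export-dir", "/user/example/avro-data",    // hypothetical
            "--update-key", "id",
        });
        System.exit(ret);
      }
    }
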
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index c85602c..f911280 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -21,14 +21,23 @@
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedHashSet;
+import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.mapreduce.ExportJobBase;
@@ -40,6 +49,8 @@
*/
public class JdbcUpdateExportJob extends ExportJobBase {
+  // SQOOP-2846: input file type, used to pick the right mapper and input format
+  private FileType fileType;
public static final Log LOG = LogFactory.getLog(
JdbcUpdateExportJob.class.getName());
@@ -64,11 +75,21 @@
super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
}
+  // SQOOP-2846: choose the export mapper based on the input file type
@Override
protected Class<? extends Mapper> getMapperClass() {
- if (inputIsSequenceFiles()) {
+ if (isHCatJob) {
+ return SqoopHCatUtilities.getExportMapperClass();
+ }
+ switch (fileType) {
+ case SEQUENCE_FILE:
return SequenceFileExportMapper.class;
- } else {
+ case AVRO_DATA_FILE:
+ return AvroExportMapper.class;
+ case PARQUET_FILE:
+ return ParquetExportMapper.class;
+ case UNKNOWN:
+ default:
return TextExportMapper.class;
}
}
@@ -143,5 +164,69 @@
throw new IOException("Could not load OutputFormat", cnfe);
}
}
-}
+  // SQOOP-2846: detect the input file type before configuring the input format
+ @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol)
+      throws ClassNotFoundException, IOException {
+
+ fileType = getInputFileType();
+
+ super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+
+ if (isHCatJob) {
+      SqoopHCatUtilities.configureExportInputFormat(options, job,
+          context.getConnManager(), tableName, job.getConfiguration());
+ return;
+ } else if (fileType == FileType.AVRO_DATA_FILE) {
+ LOG.debug("Configuring for Avro export");
+ configureGenericRecordExportInputFormat(job, tableName);
+ } else if (fileType == FileType.PARQUET_FILE) {
+ LOG.debug("Configuring for Parquet export");
+ configureGenericRecordExportInputFormat(job, tableName);
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+ String uri = "dataset:" + fs.makeQualified(getInputPath());
+ DatasetKeyInputFormat.configure(job).readFrom(uri);
+ }
+ }
+
+  // SQOOP-2846: publish the column name -> Java type map to the job configuration
+  private void configureGenericRecordExportInputFormat(Job job, String tableName)
+      throws IOException {
+ ConnManager connManager = context.getConnManager();
+ Map<String, Integer> columnTypeInts;
+ if (options.getCall() == null) {
+ columnTypeInts = connManager.getColumnTypes(tableName, options.getSqlQuery());
+ } else {
+ columnTypeInts = connManager.getColumnTypesForProcedure(options.getCall());
+ }
+ MapWritable columnTypes = new MapWritable();
+ for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
+ Text columnName = new Text(e.getKey());
+ Text columnText = new Text(connManager.toJavaType(tableName, e.getKey(), e.getValue()));
+ columnTypes.put(columnName, columnText);
+ }
+ DefaultStringifier.store(job.getConfiguration(), columnTypes,
+ AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
+ }
+
+  // SQOOP-2846: choose the input format based on the input file type
+ @Override
+ protected Class<? extends InputFormat> getInputFormatClass() throws ClassNotFoundException {
+ if (isHCatJob) {
+ return SqoopHCatUtilities.getInputFormatClass();
+ }
+ switch (fileType) {
+ case AVRO_DATA_FILE:
+ return AvroInputFormat.class;
+ case PARQUET_FILE:
+ return DatasetKeyInputFormat.class;
+ default:
+ return super.getInputFormatClass();
+ }
+ }
+}
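
The new configureGenericRecordExportInputFormat() mirrors JdbcExportJob: it
resolves each column's Java type through the ConnManager and serializes the
name-to-type map into the job configuration, where the Avro and Parquet export
mappers read it back. A minimal, self-contained sketch of that
DefaultStringifier round trip (the key string and column names below are
illustrative stand-ins, not Sqoop's actual constants):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DefaultStringifier;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;

    public class ColumnTypeMapRoundTrip {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Stand-in for AvroExportMapper.AVRO_COLUMN_TYPES_MAP; the real
        // constant's value is not reproduced here.
        final String key = "example.column.types.map";

        // Driver side: store the column name -> Java type map in the
        // configuration, as configureGenericRecordExportInputFormat() does.
        MapWritable columnTypes = new MapWritable();
        columnTypes.put(new Text("id"), new Text("Integer")); // hypothetical column
        columnTypes.put(new Text("msg"), new Text("String")); // hypothetical column
        DefaultStringifier.store(conf, columnTypes, key);

        // Mapper side: load the same map back from the configuration.
        MapWritable loaded =
            DefaultStringifier.load(conf, key, MapWritable.class);
        System.out.println("msg maps to " + loaded.get(new Text("msg")));
      }
    }
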
diff --git a/src/test/com/cloudera/sqoop/TestAvroExport.java b/src/test/com/cloudera/sqoop/TestAvroExport.java
index f91cd48..02db6c0 100644
--- a/src/test/com/cloudera/sqoop/TestAvroExport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroExport.java
@@ -33,6 +33,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
@@ -164,10 +165,12 @@
fields.add(buildAvroField("id", Schema.Type.INT));
fields.add(buildAvroField("msg", Schema.Type.STRING));
int colNum = 0;
- for (ColumnGenerator gen : extraCols) {
- if (gen.getColumnAvroSchema() != null) {
- fields.add(buildAvroField(forIdx(colNum++),
- gen.getColumnAvroSchema()));
+    // SQOOP-2846: extraCols may be null when no extra columns are requested
+ if (null != extraCols) {
+ for (ColumnGenerator gen : extraCols) {
+ if (gen.getColumnAvroSchema() != null) {
+ fields.add(buildAvroField(forIdx(colNum++), gen.getColumnAvroSchema()));
+ }
}
}
Schema schema = Schema.createRecord("myschema", null, null, false);
@@ -178,9 +181,12 @@
private void addExtraColumns(GenericRecord record, int rowNum,
ColumnGenerator[] extraCols) {
int colNum = 0;
- for (ColumnGenerator gen : extraCols) {
- if (gen.getColumnAvroSchema() != null) {
- record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+    // SQOOP-2846: extraCols may be null when no extra columns are requested
+ if (null != extraCols) {
+ for (ColumnGenerator gen : extraCols) {
+ if (gen.getColumnAvroSchema() != null) {
+ record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+ }
}
}
}
@@ -253,6 +259,39 @@
}
}
+ /**
+ * Create the table definition to export into, and insert one record so that
+ * updates can be verified. Issue [SQOOP-2846]
+ */
+ private void createTableWithInsert() throws SQLException {
+ Connection conn = getConnection();
+ PreparedStatement statement = conn.prepareStatement(getDropTableStatement(getTableName()),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE TABLE ");
+ sb.append(getTableName());
+ sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+ sb.append(")");
+ statement = conn.prepareStatement(sb.toString(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      Statement statement2 = conn.createStatement();
+      try {
+        statement2.execute("INSERT INTO " + getTableName()
+            + " (ID, MSG) VALUES (0, 'testMsg')");
+      } finally {
+        statement2.close();
+      }
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+ }
+
/** Verify that on a given row, a column has a given value.
* @param id the id column specifying the row to test.
*/
@@ -418,6 +457,33 @@
verifyExport(TOTAL_RECORDS);
}
+  // Test case for SQOOP-2846: upsert export from an Avro data file.
+ public void testAvroWithUpsert() throws IOException, SQLException {
+ String[] argv = { "--update-key", "ID", "--update-mode", "allowinsert" };
+ final int TOTAL_RECORDS = 2;
+ createAvroFile(0, TOTAL_RECORDS, null);
+ createTableWithInsert();
+    try {
+      runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "1")));
+    } catch (Exception e) {
+      // The test database's connection manager may not support upsert,
+      // so an exception here is acceptable.
+    }
+ }
+
+  // Test case for SQOOP-2846: update-key export from an Avro data file.
+ public void testAvroWithUpdateKey() throws IOException, SQLException {
+ String[] argv = { "--update-key", "ID" };
+ final int TOTAL_RECORDS = 1;
+ createAvroFile(0, TOTAL_RECORDS, null);
+ createTableWithInsert();
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "1")));
+ verifyExport(getMsgPrefix() + "0");
+ }
public void testMissingAvroFields() throws IOException, SQLException {
String[] argv = {};
final int TOTAL_RECORDS = 1;
diff --git a/src/test/com/cloudera/sqoop/TestParquetExport.java b/src/test/com/cloudera/sqoop/TestParquetExport.java
index b938bf8..0c94248 100644
--- a/src/test/com/cloudera/sqoop/TestParquetExport.java
+++ b/src/test/com/cloudera/sqoop/TestParquetExport.java
@@ -32,6 +32,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
@@ -143,10 +144,11 @@
fields.add(buildField("id", Schema.Type.INT));
fields.add(buildField("msg", Schema.Type.STRING));
int colNum = 0;
- for (ColumnGenerator gen : extraCols) {
- if (gen.getColumnParquetSchema() != null) {
- fields.add(buildParquetField(forIdx(colNum++),
- gen.getColumnParquetSchema()));
+ if (null != extraCols) {
+ for (ColumnGenerator gen : extraCols) {
+ if (gen.getColumnParquetSchema() != null) {
+ fields.add(buildParquetField(forIdx(colNum++), gen.getColumnParquetSchema()));
+ }
}
}
Schema schema = Schema.createRecord("myschema", null, null, false);
@@ -157,9 +159,11 @@
private void addExtraColumns(GenericRecord record, int rowNum,
ColumnGenerator[] extraCols) {
int colNum = 0;
- for (ColumnGenerator gen : extraCols) {
- if (gen.getColumnParquetSchema() != null) {
- record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+ if (null != extraCols) {
+ for (ColumnGenerator gen : extraCols) {
+ if (gen.getColumnParquetSchema() != null) {
+ record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+ }
}
}
}
@@ -232,6 +236,38 @@
}
}
+ /**
+ * Create the table definition to export into, and insert one record so that
+ * updates can be verified. Issue [SQOOP-2846]
+ */
+ private void createTableWithInsert() throws SQLException {
+ Connection conn = getConnection();
+ PreparedStatement statement = conn.prepareStatement(getDropTableStatement(getTableName()),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ try {
+ statement.executeUpdate();
+ conn.commit();
+ } finally {
+ statement.close();
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE TABLE ");
+ sb.append(getTableName());
+ sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+ sb.append(")");
+ statement = conn.prepareStatement(sb.toString(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      Statement statement2 = conn.createStatement();
+      try {
+        statement2.execute("INSERT INTO " + getTableName()
+            + " (ID, MSG) VALUES (0, 'testMsg')");
+      } finally {
+        statement2.close();
+      }
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+ }
+
/** Verify that on a given row, a column has a given value.
* @param id the id column specifying the row to test.
*/
@@ -369,6 +405,30 @@
verifyExport(TOTAL_RECORDS);
}
+ public void testParquetWithUpdateKey() throws IOException, SQLException {
+ String[] argv = { "--update-key", "ID" };
+ final int TOTAL_RECORDS = 1;
+ createParquetFile(0, TOTAL_RECORDS, null);
+ createTableWithInsert();
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "1")));
+ verifyExport(getMsgPrefix() + "0");
+ }
+
+  // Test case for SQOOP-2846: upsert export from a Parquet file.
+ public void testParquetWithUpsert() throws IOException, SQLException {
+ String[] argv = { "--update-key", "ID", "--update-mode", "allowinsert" };
+ final int TOTAL_RECORDS = 2;
+ createParquetFile(0, TOTAL_RECORDS, null);
+ createTableWithInsert();
+    try {
+      runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "1")));
+    } catch (Exception e) {
+      // The test database's connection manager may not support upsert,
+      // so an exception here is acceptable.
+    }
+ }
public void testMissingParquetFields() throws IOException, SQLException {
String[] argv = {};
final int TOTAL_RECORDS = 1;
diff --git a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
index ed0dc31..786bd94 100644
--- a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
@@ -301,8 +301,31 @@
statement.close();
}
- assertEquals("Invalid msg field for min value", getMsgPrefix() + maxVal,
- maxMsg);
+ assertEquals("Invalid msg field for min value", getMsgPrefix() + maxVal, maxMsg);
+ }
+
+  /**
+   * Verify that the export updated the existing row: the msg column must now
+   * hold the expected value. Issue [SQOOP-2846]
+   */
+ protected void verifyExport(String expectedValue) throws IOException, SQLException {
+ Connection conn = getConnection();
+ LOG.info("Verifying export: " + getTableName());
+    // Check that the msg column of the exported row holds the expected value.
+ PreparedStatement statement = conn.prepareStatement("SELECT MSG FROM " + getTableName(),
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ String actualValue = null;
+ ResultSet rs = null;
+ try {
+ rs = statement.executeQuery();
+ try {
+ rs.next();
+ actualValue = rs.getString(1);
+ } finally {
+ rs.close();
+ }
+ } finally {
+ statement.close();
+ }
+
+ assertEquals("Got back unexpected row count", expectedValue, actualValue);
}
/**