SQOOP-3178: Incremental Merging for Parquet File Format

(Sandish Kumar HN via Anna Szonyi)
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeGenericRecordExportMapper.java b/src/java/org/apache/sqoop/mapreduce/MergeGenericRecordExportMapper.java
new file mode 100644
index 0000000..31d56a5
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeGenericRecordExportMapper.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+public class MergeGenericRecordExportMapper<K, V>
+    extends AutoProgressMapper<K, V, Text, MergeRecord> {
+
+  protected MapWritable columnTypes = new MapWritable();
+  private String keyColName;
+  private boolean isNewDatasetSplit;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);
+
+    InputSplit inputSplit = context.getInputSplit();
+    FileSplit fileSplit = (FileSplit) inputSplit;
+    Path splitPath = fileSplit.getPath();
+
+    if (splitPath.toString().startsWith(conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
+      this.isNewDatasetSplit = true;
+    } else if (splitPath.toString().startsWith(conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
+      this.isNewDatasetSplit = false;
+    } else {
+      throw new IOException(
+          "File " + splitPath + " is not under new path " + conf.get(MergeJob.MERGE_NEW_PATH_KEY)
+              + " or old path " + conf.get(MergeJob.MERGE_OLD_PATH_KEY));
+    }
+    super.setup(context);
+  }
+
+  protected void processRecord(SqoopRecord sqoopRecord, Context context) throws IOException, InterruptedException {
+    MergeRecord mergeRecord = new MergeRecord(sqoopRecord, isNewDatasetSplit);
+    Map<String, Object> fieldMap = sqoopRecord.getFieldMap();
+    if (null == fieldMap) {
+      throw new IOException("No field map in record " + sqoopRecord);
+    }
+    Object keyObj = fieldMap.get(keyColName);
+    if (null == keyObj) {
+      throw new IOException(
+          "Cannot join values on null key. " + "Did you specify a key column that exists?");
+    } else {
+      context.write(new Text(keyObj.toString()), mergeRecord);
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index 8b1cba3..c6be189 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -19,19 +19,21 @@
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -43,6 +45,16 @@
 import org.apache.sqoop.avro.AvroUtil;
 import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
 import org.apache.sqoop.util.Jars;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+import parquet.avro.AvroParquetInputFormat;
+import parquet.avro.AvroSchemaConverter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.schema.MessageType;
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.mapreduce.JobBase;
@@ -67,6 +79,8 @@
    */
   public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
 
+  public static final String PARQUET_AVRO_SCHEMA = "parquetjob.avro.schema";
+
   public MergeJob(final SqoopOptions opts) {
     super(opts, null, null, null);
   }
@@ -130,6 +144,11 @@
 
       FileType fileType = ExportJobBase.getFileType(jobConf, oldPath);
       switch (fileType) {
+        case PARQUET_FILE:
+          Path finalPath = new Path(options.getTargetDir());
+          finalPath = FileSystemUtil.makeQualified(finalPath, jobConf);
+          configueParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
+          break;
         case AVRO_DATA_FILE:
           configueAvroMergeJob(conf, job, oldPath, newPath);
           break;
@@ -179,6 +198,51 @@
     job.setReducerClass(MergeAvroReducer.class);
     AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
   }
+
+  private void configueParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
+      Path finalPath) throws IOException {
+    try {
+      FileSystem fileSystem = finalPath.getFileSystem(conf);
+      LOG.info("Trying to merge parquet files");
+      job.setOutputKeyClass(org.apache.avro.generic.GenericRecord.class);
+      job.setMapperClass(MergeParquetMapper.class);
+      job.setReducerClass(MergeParquetReducer.class);
+      job.setOutputValueClass(NullWritable.class);
+
+      List<Footer> footers = new ArrayList<Footer>();
+      FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
+      FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath);
+      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
+      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
+
+      MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+      AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+      Schema avroSchema = avroSchemaConverter.convert(schema);
+
+      if (!fileSystem.exists(finalPath)) {
+        Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
+        DatasetKeyOutputFormat.configure(job).overwrite(dataset);
+      } else {
+        DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
+      }
+
+      job.setInputFormatClass(AvroParquetInputFormat.class);
+      AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
+
+      conf.set(PARQUET_AVRO_SCHEMA, avroSchema.toString());
+      Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
+
+      job.setOutputFormatClass(outClass);
+    } catch (Exception cnfe) {
+      throw new IOException(cnfe);
+    }
+  }
+
+  public static Dataset createDataset(Schema schema, String uri) {
+    DatasetDescriptor descriptor =
+        new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
+    return Datasets.create(uri, descriptor, GenericRecord.class);
+  }
 }
 
 
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java b/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java
new file mode 100644
index 0000000..8a5a7ca
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.mapred.Pair;
+
+import org.apache.sqoop.avro.AvroUtil;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+
+public class MergeParquetMapper
+    extends  MergeGenericRecordExportMapper<GenericRecord, GenericRecord> {
+
+  private Map<String, Pair<String, String>> sqoopRecordFields = new HashMap<String, Pair<String, String>>();
+  private SqoopRecord sqoopRecordImpl;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    final String userClassName = conf.get(MergeJob.MERGE_SQOOP_RECORD_KEY);
+    try {
+      final Class<? extends Object> clazz = Class.forName(userClassName, true,
+          Thread.currentThread().getContextClassLoader());
+      sqoopRecordImpl = (SqoopRecord) ReflectionUtils.newInstance(clazz, conf);
+      for (final Field field : clazz.getDeclaredFields()) {
+        final String fieldName = field.getName();
+        final String fieldTypeName = field.getType().getName();
+        sqoopRecordFields.put(fieldName.toLowerCase(), new Pair<String, String>(fieldName,
+            fieldTypeName));
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Cannot find the user record class with class name"
+          + userClassName, e);
+    }
+  }
+
+  @Override
+  protected void map(GenericRecord key, GenericRecord val, Context context)
+      throws IOException, InterruptedException {
+    processRecord(toSqoopRecord(val), context);
+  }
+
+  private SqoopRecord toSqoopRecord(GenericRecord genericRecord) throws IOException {
+    Schema avroSchema = genericRecord.getSchema();
+    for (Schema.Field field : avroSchema.getFields()) {
+      Pair<String, String> sqoopRecordField = sqoopRecordFields.get(field.name().toLowerCase());
+      if (null == sqoopRecordField) {
+        throw new IOException("Cannot find field '" + field.name() + "' in fields of user class"
+            + sqoopRecordImpl.getClass().getName() + ". Fields are: "
+            + Arrays.deepToString(sqoopRecordFields.values().toArray()));
+      }
+      Object avroObject = genericRecord.get(field.name());
+      Object fieldVal = AvroUtil.fromAvro(avroObject, field.schema(), sqoopRecordField.value());
+      sqoopRecordImpl.setField(sqoopRecordField.key(), fieldVal);
+    }
+    return sqoopRecordImpl;
+  }
+
+}
\ No newline at end of file
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
new file mode 100644
index 0000000..293ffc9
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
@@ -0,0 +1,75 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.sqoop.avro.AvroUtil;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+
+public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord,NullWritable> {
+
+  private Schema schema = null;
+  private boolean bigDecimalFormatString = true;
+  private Map<String, Pair<String, String>> sqoopRecordFields = new HashMap<String, Pair<String, String>>();
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      schema = new Schema.Parser().parse(context.getConfiguration().get("parquetjob.avro.schema"));
+      bigDecimalFormatString = context.getConfiguration().getBoolean(
+          ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+    }
+
+    @Override
+    public void reduce(Text key, Iterable<MergeRecord> vals, Context context)
+        throws IOException, InterruptedException {
+      SqoopRecord bestRecord = null;
+      try {
+        for (MergeRecord mergeRecord : vals) {
+          if (null == bestRecord && !mergeRecord.isNewRecord()) {
+            // Use an old record if we don't have a new record.
+            bestRecord = (SqoopRecord) mergeRecord.getSqoopRecord().clone();
+          } else if (mergeRecord.isNewRecord()) {
+            bestRecord = (SqoopRecord) mergeRecord.getSqoopRecord().clone();
+          }
+        }
+      } catch (CloneNotSupportedException cnse) {
+        throw new IOException(cnse);
+      }
+
+      if (null != bestRecord) {
+        GenericRecord outKey = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
+            bigDecimalFormatString);
+        context.write(outKey, null);
+      }
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 78c7758..807ec8c 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -48,6 +48,7 @@
 import com.cloudera.sqoop.metastore.JobData;
 import com.cloudera.sqoop.metastore.JobStorage;
 import com.cloudera.sqoop.metastore.JobStorageFactory;
+import org.apache.sqoop.orm.ClassWriter;
 import com.cloudera.sqoop.orm.TableClassName;
 import com.cloudera.sqoop.util.AppendUtils;
 import com.cloudera.sqoop.util.ClassLoaderStack;
@@ -460,7 +461,12 @@
         options.setTargetDir(destDir.toString());
 
         // Local job tracker needs jars in the classpath.
-        loadJars(options.getConf(), context.getJarFile(), context.getTableName());
+        if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+          loadJars(options.getConf(), context.getJarFile(), ClassWriter.toJavaIdentifier("codegen_" +
+            context.getTableName()));
+        } else {
+          loadJars(options.getConf(), context.getJarFile(), context.getTableName());
+        }
 
         MergeJob mergeJob = new MergeJob(options);
         if (mergeJob.runMergeJob()) {
diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java
index 114e934..9639f84 100644
--- a/src/test/com/cloudera/sqoop/TestMerge.java
+++ b/src/test/com/cloudera/sqoop/TestMerge.java
@@ -54,7 +54,11 @@
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Test;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.Datasets;
 
+import static org.apache.avro.generic.GenericData.Record;
 import static org.junit.Assert.fail;
 
 /**
@@ -96,7 +100,7 @@
   }
 
   public static final String TABLE_NAME = "MergeTable";
-  private static final String OLD_PATH = "merge-old";
+  private static final String OLD_PATH = "merge_old";
   private static final String NEW_PATH = "merge_new";
   private static final String FINAL_PATH = "merge_final";
 
@@ -159,6 +163,11 @@
     runMergeTest(SqoopOptions.FileLayout.AvroDataFile);
   }
 
+  @Test
+  public void testParquetFileMerge() throws Exception {
+    runMergeTest(SqoopOptions.FileLayout.ParquetFile);
+  }
+
   public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception {
     createTable(initRecords);
 
@@ -293,6 +302,27 @@
     return false;
   }
 
+  private boolean checkParquetFileForLine(FileSystem fileSystem, Path path, List<Integer> record) throws IOException
+  {
+    Dataset<Record> parquetRecords = Datasets.load("dataset:" + path.getParent(), Record.class);
+    DatasetReader<Record> datasetReader = null;
+    try {
+      datasetReader = parquetRecords.newReader();
+      for (GenericRecord genericRecord : datasetReader) {
+        if (valueMatches(genericRecord, record)) {
+          return true;
+        }
+      }
+    }
+    finally {
+      if (datasetReader != null) {
+        datasetReader.close();
+      }
+    }
+
+    return false;
+  }
+
   protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout,
       List<Integer> record) throws IOException {
     boolean result = false;
@@ -303,6 +333,9 @@
       case AvroDataFile:
         result = checkAvroFileForLine(fs, p, record);
         break;
+      case ParquetFile:
+        result = checkParquetFileForLine(fs, p, record);
+        break;
     }
     return result;
   }
@@ -326,7 +359,7 @@
 
     for (FileStatus stat : files) {
       Path p = stat.getPath();
-      if (p.getName().startsWith("part-")) {
+      if (p.getName().startsWith("part-") || p.getName().endsWith(".parquet")) {
         if (checkFileForLine(fs, p, fileLayout, record)) {
           // We found the line. Nothing further to do.
           return true;