SQOOP-3178: Incremental Merging for Parquet File Format
(Sandish Kumar HN via Anna Szonyi)
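
This change teaches Sqoop's merge job (used by incremental imports with
--merge-key) to handle Parquet datasets: a new mapper/reducer pair reads the
old and new datasets through AvroParquetInputFormat, reconciles records by the
merge key, and writes the result through a Kite Parquet dataset. As an
illustrative sketch only (the paths, table name, and class name below are
placeholders, not part of this change), the new code path can be exercised by
a standalone merge:

    sqoop merge --class-name MergeTable \
      --jar-file /tmp/sqoop-codegen/MergeTable.jar \
      --new-data /user/foo/merge_new \
      --onto /user/foo/merge_old \
      --target-dir /user/foo/merge_final \
      --merge-key ID

or implicitly by an incremental import such as
"sqoop import ... --incremental lastmodified --merge-key ID --as-parquetfile".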
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeGenericRecordExportMapper.java b/src/java/org/apache/sqoop/mapreduce/MergeGenericRecordExportMapper.java
new file mode 100644
index 0000000..31d56a5
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeGenericRecordExportMapper.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+public class MergeGenericRecordExportMapper<K, V>
+ extends AutoProgressMapper<K, V, Text, MergeRecord> {
+
+ protected MapWritable columnTypes = new MapWritable();
+ private String keyColName;
+ private boolean isNewDatasetSplit;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);
+
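+ // Classify this split as coming from the new or the old dataset by its path prefix.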
+ InputSplit inputSplit = context.getInputSplit();
+ FileSplit fileSplit = (FileSplit) inputSplit;
+ Path splitPath = fileSplit.getPath();
+
+ if (splitPath.toString().startsWith(conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
+ this.isNewDatasetSplit = true;
+ } else if (splitPath.toString().startsWith(conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
+ this.isNewDatasetSplit = false;
+ } else {
+ throw new IOException(
+ "File " + splitPath + " is not under new path " + conf.get(MergeJob.MERGE_NEW_PATH_KEY)
+ + " or old path " + conf.get(MergeJob.MERGE_OLD_PATH_KEY));
+ }
+ super.setup(context);
+ }
+
+ protected void processRecord(SqoopRecord sqoopRecord, Context context) throws IOException, InterruptedException {
+ MergeRecord mergeRecord = new MergeRecord(sqoopRecord, isNewDatasetSplit);
+ Map<String, Object> fieldMap = sqoopRecord.getFieldMap();
+ if (null == fieldMap) {
+ throw new IOException("No field map in record " + sqoopRecord);
+ }
+ Object keyObj = fieldMap.get(keyColName);
+ if (null == keyObj) {
+ throw new IOException(
+ "Cannot join values on null key. Did you specify a key column that exists?");
+ } else {
+ context.write(new Text(keyObj.toString()), mergeRecord);
+ }
+ }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index 8b1cba3..c6be189 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -19,19 +19,21 @@
package org.apache.sqoop.mapreduce;
import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -43,6 +45,16 @@
import org.apache.sqoop.avro.AvroUtil;
import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
import org.apache.sqoop.util.Jars;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+import parquet.avro.AvroParquetInputFormat;
+import parquet.avro.AvroSchemaConverter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.schema.MessageType;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.mapreduce.JobBase;
@@ -67,6 +79,8 @@
*/
public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
+ public static final String PARQUET_AVRO_SCHEMA = "parquetjob.avro.schema";
+
public MergeJob(final SqoopOptions opts) {
super(opts, null, null, null);
}
@@ -130,6 +144,11 @@
FileType fileType = ExportJobBase.getFileType(jobConf, oldPath);
switch (fileType) {
+ case PARQUET_FILE:
+ Path finalPath = new Path(options.getTargetDir());
+ finalPath = FileSystemUtil.makeQualified(finalPath, jobConf);
+ configueParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
+ break;
case AVRO_DATA_FILE:
configueAvroMergeJob(conf, job, oldPath, newPath);
break;
@@ -179,6 +198,55 @@
job.setReducerClass(MergeAvroReducer.class);
AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
}
+
+ private void configueParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
+ Path finalPath) throws IOException {
+ try {
+ FileSystem fileSystem = finalPath.getFileSystem(conf);
+ LOG.info("Trying to merge parquet files");
+ job.setOutputKeyClass(GenericRecord.class);
+ job.setMapperClass(MergeParquetMapper.class);
+ job.setReducerClass(MergeParquetReducer.class);
+ job.setOutputValueClass(NullWritable.class);
+
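+ // Read the Parquet footers of both datasets to recover the file schema; the old and
+ // new datasets are assumed to share a schema, so the first footer's schema is used.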
+ List<Footer> footers = new ArrayList<Footer>();
+ FileStatus oldPathFileStatus = fileSystem.getFileStatus(oldPath);
+ FileStatus newPathFileStatus = fileSystem.getFileStatus(newPath);
+ footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathFileStatus, true));
+ footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathFileStatus, true));
+
+ MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+ AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+ Schema avroSchema = avroSchemaConverter.convert(schema);
+
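+ // On the first merge, create a Kite Parquet dataset at the target directory;
+ // afterwards, overwrite the dataset that already exists there.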
+ if (!fileSystem.exists(finalPath)) {
+ Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
+ DatasetKeyOutputFormat.configure(job).overwrite(dataset);
+ } else {
+ DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
+ }
+
+ job.setInputFormatClass(AvroParquetInputFormat.class);
+ AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
+
+ conf.set(PARQUET_AVRO_SCHEMA, avroSchema.toString());
+ Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
+
+ job.setOutputFormatClass(outClass);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static Dataset createDataset(Schema schema, String uri) {
+ DatasetDescriptor descriptor =
+ new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
+ return Datasets.create(uri, descriptor, GenericRecord.class);
+ }
}
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java b/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java
new file mode 100644
index 0000000..8a5a7ca
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeParquetMapper.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.mapred.Pair;
+
+import org.apache.sqoop.avro.AvroUtil;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+
+public class MergeParquetMapper
+ extends MergeGenericRecordExportMapper<GenericRecord, GenericRecord> {
+
+ private Map<String, Pair<String, String>> sqoopRecordFields = new HashMap<String, Pair<String, String>>();
+ private SqoopRecord sqoopRecordImpl;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ final String userClassName = conf.get(MergeJob.MERGE_SQOOP_RECORD_KEY);
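+ // Instantiate the generated SqoopRecord class and index its fields by lower-cased
+ // name so Avro fields can be matched case-insensitively in toSqoopRecord().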
+ try {
+ final Class<?> clazz = Class.forName(userClassName, true,
+ Thread.currentThread().getContextClassLoader());
+ sqoopRecordImpl = (SqoopRecord) ReflectionUtils.newInstance(clazz, conf);
+ for (final Field field : clazz.getDeclaredFields()) {
+ final String fieldName = field.getName();
+ final String fieldTypeName = field.getType().getName();
+ sqoopRecordFields.put(fieldName.toLowerCase(), new Pair<String, String>(fieldName,
+ fieldTypeName));
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Cannot find the user record class with class name"
+ + userClassName, e);
+ }
+ }
+
+ @Override
+ protected void map(GenericRecord key, GenericRecord val, Context context)
+ throws IOException, InterruptedException {
+ processRecord(toSqoopRecord(val), context);
+ }
+
+ private SqoopRecord toSqoopRecord(GenericRecord genericRecord) throws IOException {
+ Schema avroSchema = genericRecord.getSchema();
+ for (Schema.Field field : avroSchema.getFields()) {
+ Pair<String, String> sqoopRecordField = sqoopRecordFields.get(field.name().toLowerCase());
+ if (null == sqoopRecordField) {
+ throw new IOException("Cannot find field '" + field.name() + "' in fields of user class"
+ + sqoopRecordImpl.getClass().getName() + ". Fields are: "
+ + Arrays.deepToString(sqoopRecordFields.values().toArray()));
+ }
+ Object avroObject = genericRecord.get(field.name());
+ Object fieldVal = AvroUtil.fromAvro(avroObject, field.schema(), sqoopRecordField.value());
+ sqoopRecordImpl.setField(sqoopRecordField.key(), fieldVal);
+ }
+ return sqoopRecordImpl;
+ }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
new file mode 100644
index 0000000..293ffc9
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.sqoop.avro.AvroUtil;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+
+public class MergeParquetReducer extends Reducer<Text, MergeRecord, GenericRecord, NullWritable> {
+
+ private Schema schema = null;
+ private boolean bigDecimalFormatString = true;
+ private Map<String, Pair<String, String>> sqoopRecordFields = new HashMap<String, Pair<String, String>>();
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ schema = new Schema.Parser().parse(context.getConfiguration().get(MergeJob.PARQUET_AVRO_SCHEMA));
+ bigDecimalFormatString = context.getConfiguration().getBoolean(
+ ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+ }
+
+ @Override
+ public void reduce(Text key, Iterable<MergeRecord> vals, Context context)
+ throws IOException, InterruptedException {
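+ // A record from the new dataset always wins; a record from the old dataset is
+ // kept only when no new record arrived for this key.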
+ SqoopRecord bestRecord = null;
+ try {
+ for (MergeRecord mergeRecord : vals) {
+ if (null == bestRecord && !mergeRecord.isNewRecord()) {
+ // Use an old record if we don't have a new record.
+ bestRecord = (SqoopRecord) mergeRecord.getSqoopRecord().clone();
+ } else if (mergeRecord.isNewRecord()) {
+ bestRecord = (SqoopRecord) mergeRecord.getSqoopRecord().clone();
+ }
+ }
+ } catch (CloneNotSupportedException cnse) {
+ throw new IOException(cnse);
+ }
+
+ if (null != bestRecord) {
+ GenericRecord outKey = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
+ bigDecimalFormatString);
+ context.write(outKey, NullWritable.get());
+ }
+ }
+}
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 78c7758..807ec8c 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -48,6 +48,7 @@
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.metastore.JobStorage;
import com.cloudera.sqoop.metastore.JobStorageFactory;
+import org.apache.sqoop.orm.ClassWriter;
import com.cloudera.sqoop.orm.TableClassName;
import com.cloudera.sqoop.util.AppendUtils;
import com.cloudera.sqoop.util.ClassLoaderStack;
@@ -460,7 +461,14 @@
options.setTargetDir(destDir.toString());
// Local job tracker needs jars in the classpath.
- loadJars(options.getConf(), context.getJarFile(), context.getTableName());
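+ // Parquet imports generate the record class under a "codegen_" prefixed identifier,
+ // so the jar must be loaded with that class name rather than the raw table name.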
+ if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+ loadJars(options.getConf(), context.getJarFile(), ClassWriter.toJavaIdentifier("codegen_" +
+ context.getTableName()));
+ } else {
+ loadJars(options.getConf(), context.getJarFile(), context.getTableName());
+ }
MergeJob mergeJob = new MergeJob(options);
if (mergeJob.runMergeJob()) {
diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java
index 114e934..9639f84 100644
--- a/src/test/com/cloudera/sqoop/TestMerge.java
+++ b/src/test/com/cloudera/sqoop/TestMerge.java
@@ -54,7 +54,11 @@
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.Datasets;
+import static org.apache.avro.generic.GenericData.Record;
import static org.junit.Assert.fail;
/**
@@ -96,7 +100,7 @@
}
public static final String TABLE_NAME = "MergeTable";
- private static final String OLD_PATH = "merge-old";
+ private static final String OLD_PATH = "merge_old";
private static final String NEW_PATH = "merge_new";
private static final String FINAL_PATH = "merge_final";
@@ -159,6 +163,11 @@
runMergeTest(SqoopOptions.FileLayout.AvroDataFile);
}
+ @Test
+ public void testParquetFileMerge() throws Exception {
+ runMergeTest(SqoopOptions.FileLayout.ParquetFile);
+ }
+
public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception {
createTable(initRecords);
@@ -293,6 +302,26 @@
return false;
}
+ private boolean checkParquetFileForLine(FileSystem fileSystem, Path path, List<Integer> record)
+ throws IOException {
+ Dataset<Record> parquetRecords = Datasets.load("dataset:" + path.getParent(), Record.class);
+ DatasetReader<Record> datasetReader = null;
+ try {
+ datasetReader = parquetRecords.newReader();
+ for (GenericRecord genericRecord : datasetReader) {
+ if (valueMatches(genericRecord, record)) {
+ return true;
+ }
+ }
+ } finally {
+ if (datasetReader != null) {
+ datasetReader.close();
+ }
+ }
+
+ return false;
+ }
+
protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout,
List<Integer> record) throws IOException {
boolean result = false;
@@ -303,6 +333,9 @@
case AvroDataFile:
result = checkAvroFileForLine(fs, p, record);
break;
+ case ParquetFile:
+ result = checkParquetFileForLine(fs, p, record);
+ break;
}
return result;
}
@@ -326,7 +359,7 @@
for (FileStatus stat : files) {
Path p = stat.getPath();
- if (p.getName().startsWith("part-")) {
+ if (p.getName().startsWith("part-") || p.getName().endsWith(".parquet")) {
if (checkFileForLine(fs, p, fileLayout, record)) {
// We found the line. Nothing further to do.
return true;