[HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084)
diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
index 975151c..73711c70 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
@@ -53,10 +53,12 @@
this(record.get(), (record1) -> 0); // natural order
}
- @Override
- public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
- throws IOException {
- IndexedRecord insertValue = getInsertValue(schema).get();
+ /**
+ *
+ * Handle a possible delete - check for "D" in Op column and return empty row if found.
+ * @param insertValue The new row that is being "inserted".
+ */
+ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertValue) throws IOException {
boolean delete = false;
if (insertValue instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertValue;
@@ -65,4 +67,17 @@
return delete ? Option.empty() : Option.of(insertValue);
}
+
+ @Override
+ public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
+   Option<IndexedRecord> insertValue = super.getInsertValue(schema); // may be empty: check before get()
+   return insertValue.isPresent() ? handleDeleteOperation(insertValue.get()) : Option.empty();
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
+     throws IOException {
+   Option<IndexedRecord> insertValue = super.getInsertValue(schema); // may be empty: check before get()
+   return insertValue.isPresent() ? handleDeleteOperation(insertValue.get()) : Option.empty();
+ }
}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java
new file mode 100644
index 0000000..802096a
--- /dev/null
+++ b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.payload;
+
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestAWSDmsAvroPayload {
+ // Avro record schema used by every test below: an int "field1" and a string "Op" column.
+ private static final String AVRO_SCHEMA_STRING = "{\"type\": \"record\","
+ + "\"name\": \"events\"," + "\"fields\": [ "
+ + "{\"name\": \"field1\", \"type\" : \"int\"},"
+ + "{\"name\": \"Op\", \"type\": \"string\"}"
+ + "]}";
+
+ @Test
+ public void testInsert() {
+   // A row whose Op is "I" should come back from getInsertValue with both fields intact.
+   Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
+   GenericRecord record = new GenericData.Record(avroSchema);
+   record.put("field1", 0);
+   record.put("Op", "I");
+
+   AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record));
+
+   try {
+     Option<IndexedRecord> outputPayload = payload.getInsertValue(avroSchema);
+     assertTrue((int) outputPayload.get().get(0) == 0);
+     assertTrue(outputPayload.get().get(1).toString().equals("I"));
+   } catch (Exception e) {
+     fail("Unexpected exception", e); // keep the cause so the report shows what was thrown
+   }
+
+ }
+
+ @Test
+ public void testUpdate() {
+   // Combining a "U" row with the currently stored row should return the new row's values.
+   Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
+   GenericRecord newRecord = new GenericData.Record(avroSchema);
+   newRecord.put("field1", 1);
+   newRecord.put("Op", "U");
+
+   GenericRecord oldRecord = new GenericData.Record(avroSchema);
+   oldRecord.put("field1", 0);
+   oldRecord.put("Op", "I");
+
+   AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(newRecord));
+
+   try {
+     Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema);
+     assertTrue((int) outputPayload.get().get(0) == 1);
+     assertTrue(outputPayload.get().get(1).toString().equals("U"));
+   } catch (Exception e) {
+     fail("Unexpected exception", e); // keep the cause so the report shows what was thrown
+   }
+ }
+
+ @Test
+ public void testDelete() {
+   // Combining a "D" row with the currently stored row should yield an empty Option.
+   Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
+   GenericRecord deleteRecord = new GenericData.Record(avroSchema);
+   deleteRecord.put("field1", 2);
+   deleteRecord.put("Op", "D");
+
+   GenericRecord oldRecord = new GenericData.Record(avroSchema);
+   oldRecord.put("field1", 2);
+   oldRecord.put("Op", "U");
+
+   AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord));
+
+   try {
+     Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema);
+     // expect nothing to be committed to table
+     assertFalse(outputPayload.isPresent());
+   } catch (Exception e) {
+     fail("Unexpected exception", e); // keep the cause so the report shows what was thrown
+   }
+ }
+
+ @Test
+ public void testPreCombineWithDelete() {
+   // Pre-combining a "D" payload with an "I" payload for the same row should leave nothing to insert.
+   Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
+   GenericRecord deleteRecord = new GenericData.Record(avroSchema);
+   deleteRecord.put("field1", 4);
+   deleteRecord.put("Op", "D");
+
+   GenericRecord oldRecord = new GenericData.Record(avroSchema);
+   oldRecord.put("field1", 4);
+   oldRecord.put("Op", "I");
+
+   AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord));
+   AWSDmsAvroPayload insertPayload = new AWSDmsAvroPayload(Option.of(oldRecord));
+   try {
+     OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload);
+     Option<IndexedRecord> outputPayload = output.getInsertValue(avroSchema);
+     // expect nothing to be committed to table
+     assertFalse(outputPayload.isPresent());
+   } catch (Exception e) {
+     fail("Unexpected exception", e); // keep the cause so the report shows what was thrown
+   }
+ }
+}