SAMZA-2560: Generate SamzaSQLRelRecord using avro schema of input event instead of cached-schema. (#1401)
* Generate SamzaSQLRelRecord using schema of input avro-record rather than the cached-schema.
* Fix checkstyle violations.
* Address review comments.
* Address review comments
Add unit tests for non-nullable union conversion with nested records.
* Fix rat failures.
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index 9afad9c..b7cee00 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -125,11 +125,18 @@
.collect(Collectors.toList()));
}
- private static SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord, Schema schema) {
+ private static SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) {
List<Object> fieldValues = new ArrayList<>();
List<String> fieldNames = new ArrayList<>();
if (avroRecord != null) {
- fetchFieldNamesAndValuesFromIndexedRecord(avroRecord, fieldNames, fieldValues, schema);
+ fieldNames.addAll(
+ avroRecord.getSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toList()));
+ fieldValues.addAll(avroRecord.getSchema()
+ .getFields()
+ .stream()
+ .map(f -> convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()),
+ getNonNullUnionSchema(avroRecord.getSchema().getField(f.name()).schema())))
+ .collect(Collectors.toList()));
} else {
String msg = "Avro Record is null";
LOG.error(msg);
@@ -224,7 +231,7 @@
}
switch (schema.getType()) {
case RECORD:
- return convertToRelRecord((IndexedRecord) avroObj, schema);
+ return convertToRelRecord((IndexedRecord) avroObj);
case ARRAY: {
ArrayList<Object> retVal = new ArrayList<>();
List<Object> avroArray;
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 9d8201c..9dc4c1e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -81,11 +81,13 @@
this.streamId = String.format("%s-%s", systemName, streamName);
samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
- Validate.notEmpty(samzaRelConverterName, String.format("System %s is not supported. Please check if the system name is provided correctly.", systemName));
+ Validate.notEmpty(samzaRelConverterName, String.format("System %s is not supported."
+ + "Please check if the system name is provided correctly.", systemName));
if (isRemoteTable()) {
samzaRelTableKeyConverterName = streamConfigs.get(CFG_SAMZA_REL_TABLE_KEY_CONVERTER);
- Validate.notEmpty(samzaRelTableKeyConverterName, String.format("System %s is not supported. Please check if the system name is provided correctly.", systemName));
+ Validate.notEmpty(samzaRelTableKeyConverterName, String.format("System %s is not supported. "
+ + "Please check if the system name is provided correctly.", systemName));
} else {
samzaRelTableKeyConverterName = "";
}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
index ec519f2..dccdd7d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
@@ -50,6 +50,7 @@
import org.apache.samza.operators.KV;
import org.apache.samza.sql.avro.schemas.AddressRecord;
import org.apache.samza.sql.avro.schemas.ComplexRecord;
+import org.apache.samza.sql.avro.schemas.ComplexUnion;
import org.apache.samza.sql.avro.schemas.Kind;
import org.apache.samza.sql.avro.schemas.MyFixed;
import org.apache.samza.sql.avro.schemas.PhoneNumber;
@@ -76,9 +77,11 @@
private final AvroRelConverter simpleRecordAvroRelConverter;
private final AvroRelConverter complexRecordAvroRelConverter;
private final AvroRelConverter nestedRecordAvroRelConverter;
+ private final AvroRelConverter complexUnionAvroRelConverter;
private final AvroRelSchemaProvider simpleRecordSchemaProvider;
private final AvroRelSchemaProvider complexRecordSchemaProvider;
private final AvroRelSchemaProvider nestedRecordSchemaProvider;
+ private final AvroRelSchemaProvider complexUnionSchemaProvider;
private int id = 1;
private boolean boolValue = true;
@@ -102,6 +105,7 @@
SystemStream ss1 = new SystemStream("test", "complexRecord");
SystemStream ss2 = new SystemStream("test", "simpleRecord");
SystemStream ss3 = new SystemStream("test", "nestedRecord");
+ SystemStream ss4 = new SystemStream("test", "complexUnion");
props.put(
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss1.getSystem(), ss1.getStream()),
ComplexRecord.SCHEMA$.toString());
@@ -111,15 +115,20 @@
props.put(
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss3.getSystem(), ss3.getStream()),
Profile.SCHEMA$.toString());
+ props.put(
+ String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, ss4.getSystem(), ss4.getStream()),
+ ComplexUnion.SCHEMA$.toString());
ConfigBasedAvroRelSchemaProviderFactory factory = new ConfigBasedAvroRelSchemaProviderFactory();
complexRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss1, new MapConfig(props));
simpleRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss2, new MapConfig(props));
nestedRecordSchemaProvider = (AvroRelSchemaProvider) factory.create(ss3, new MapConfig(props));
+ complexUnionSchemaProvider = (AvroRelSchemaProvider) factory.create(ss3, new MapConfig(props));
complexRecordAvroRelConverter = new AvroRelConverter(ss1, complexRecordSchemaProvider, new MapConfig());
simpleRecordAvroRelConverter = new AvroRelConverter(ss2, simpleRecordSchemaProvider, new MapConfig());
nestedRecordAvroRelConverter = new AvroRelConverter(ss3, nestedRecordSchemaProvider, new MapConfig());
+ complexUnionAvroRelConverter = new AvroRelConverter(ss4, complexUnionSchemaProvider, new MapConfig());
fixedBytes.bytes(DEFAULT_TRACKING_ID_BYTES);
}
@@ -232,6 +241,42 @@
}
@Test
+ public void testComplexUnionConversionShouldWorkWithBothStringAndIntTypes() throws Exception {
+ // ComplexUnion is a nested avro non-nullable union-type with both String and Integer type
+ // Test the complex-union conversion for String type.
+ GenericData.Record record = new GenericData.Record(ComplexUnion.SCHEMA$);
+ record.put("non_nullable_union_value", testStrValue);
+
+ ComplexUnion complexUnion = new ComplexUnion();
+ complexUnion.non_nullable_union_value = testStrValue;
+
+ byte[] serializedData = bytesFromGenericRecord(record);
+ GenericRecord genericRecord = genericRecordFromBytes(serializedData, ComplexUnion.SCHEMA$);
+ SamzaSqlRelMessage message = complexUnionAvroRelConverter.convertToRelMessage(new KV<>("key", genericRecord));
+
+ Assert.assertEquals(testStrValue, message.getSamzaSqlRelRecord().getField("non_nullable_union_value").get().toString());
+
+ serializedData = encodeAvroSpecificRecord(ComplexUnion.class, complexUnion);
+ genericRecord = genericRecordFromBytes(serializedData, ComplexUnion.SCHEMA$);
+ Assert.assertEquals(testStrValue, genericRecord.get("non_nullable_union_value").toString());
+
+ // Testing the complex-union conversion for Integer type
+ record.put("non_nullable_union_value", Integer.valueOf(123));
+
+ complexUnion.non_nullable_union_value = Integer.valueOf(123);
+
+ serializedData = bytesFromGenericRecord(record);
+ genericRecord = genericRecordFromBytes(serializedData, ComplexUnion.SCHEMA$);
+ message = complexUnionAvroRelConverter.convertToRelMessage(new KV<>("key", genericRecord));
+ Assert.assertEquals(Integer.valueOf(123), message.getSamzaSqlRelRecord().getField("non_nullable_union_value").get());
+
+ serializedData = encodeAvroSpecificRecord(ComplexUnion.class, complexUnion);
+ genericRecord = genericRecordFromBytes(serializedData, ComplexUnion.SCHEMA$);
+ Assert.assertEquals(Integer.valueOf(123), genericRecord.get("non_nullable_union_value"));
+
+ }
+
+ @Test
public void testNestedRecordConversion() throws IOException {
GenericData.Record record = new GenericData.Record(Profile.SCHEMA$);
record.put("id", 1);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.avsc
new file mode 100644
index 0000000..d94417b
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.avsc
@@ -0,0 +1,32 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+ "name": "ComplexUnion",
+ "version" : 1,
+ "namespace": "org.apache.samza.sql.avro.schemas",
+ "type": "record",
+ "fields": [
+ {
+ "name": "non_nullable_union_value",
+ "doc": "union Value.",
+ "type": ["int", "string"]
+ }
+ ]
+}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.java
new file mode 100644
index 0000000..dd746e1
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexUnion.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.avro.schemas;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class ComplexUnion extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ComplexUnion\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"non_nullable_union_value\",\"type\":[\"int\",\"string\"],\"doc\":\"union Value.\"}],\"version\":1}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+ /** union Value. */
+ @Deprecated public java.lang.Object non_nullable_union_value;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use <code>newBuilder()</code>.
+ */
+ public ComplexUnion() {}
+
+ /**
+ * All-args constructor.
+ */
+ public ComplexUnion(java.lang.Object non_nullable_union_value) {
+ this.non_nullable_union_value = non_nullable_union_value;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return non_nullable_union_value;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: non_nullable_union_value = (java.lang.Object)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'non_nullable_union_value' field.
+ * union Value. */
+ public java.lang.Object getNonNullableUnionValue() {
+ return non_nullable_union_value;
+ }
+
+ /**
+ * Sets the value of the 'non_nullable_union_value' field.
+ * union Value. * @param value the value to set.
+ */
+ public void setNonNullableUnionValue(java.lang.Object value) {
+ this.non_nullable_union_value = value;
+ }
+
+ /** Creates a new ComplexUnion RecordBuilder */
+ public static org.apache.samza.sql.avro.schemas.ComplexUnion.Builder newBuilder() {
+ return new org.apache.samza.sql.avro.schemas.ComplexUnion.Builder();
+ }
+
+ /** Creates a new ComplexUnion RecordBuilder by copying an existing Builder */
+ public static org.apache.samza.sql.avro.schemas.ComplexUnion.Builder newBuilder(org.apache.samza.sql.avro.schemas.ComplexUnion.Builder other) {
+ return new org.apache.samza.sql.avro.schemas.ComplexUnion.Builder(other);
+ }
+
+ /** Creates a new ComplexUnion RecordBuilder by copying an existing ComplexUnion instance */
+ public static org.apache.samza.sql.avro.schemas.ComplexUnion.Builder newBuilder(org.apache.samza.sql.avro.schemas.ComplexUnion other) {
+ return new org.apache.samza.sql.avro.schemas.ComplexUnion.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for ComplexUnion instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<ComplexUnion>
+ implements org.apache.avro.data.RecordBuilder<ComplexUnion> {
+
+ private java.lang.Object non_nullable_union_value;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.samza.sql.avro.schemas.ComplexUnion.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.samza.sql.avro.schemas.ComplexUnion.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.non_nullable_union_value)) {
+ this.non_nullable_union_value = data().deepCopy(fields()[0].schema(), other.non_nullable_union_value);
+ fieldSetFlags()[0] = true;
+ }
+ }
+
+ /** Creates a Builder by copying an existing ComplexUnion instance */
+ private Builder(org.apache.samza.sql.avro.schemas.ComplexUnion other) {
+ super(org.apache.samza.sql.avro.schemas.ComplexUnion.SCHEMA$);
+ if (isValidValue(fields()[0], other.non_nullable_union_value)) {
+ this.non_nullable_union_value = data().deepCopy(fields()[0].schema(), other.non_nullable_union_value);
+ fieldSetFlags()[0] = true;
+ }
+ }
+
+ /** Gets the value of the 'non_nullable_union_value' field */
+ public java.lang.Object getNonNullableUnionValue() {
+ return non_nullable_union_value;
+ }
+
+ /** Sets the value of the 'non_nullable_union_value' field */
+ public org.apache.samza.sql.avro.schemas.ComplexUnion.Builder setNonNullableUnionValue(java.lang.Object value) {
+ validate(fields()[0], value);
+ this.non_nullable_union_value = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'non_nullable_union_value' field has been set */
+ public boolean hasNonNullableUnionValue() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'non_nullable_union_value' field */
+ public org.apache.samza.sql.avro.schemas.ComplexUnion.Builder clearNonNullableUnionValue() {
+ non_nullable_union_value = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ @Override
+ public ComplexUnion build() {
+ try {
+ ComplexUnion record = new ComplexUnion();
+ record.non_nullable_union_value = fieldSetFlags()[0] ? this.non_nullable_union_value : (java.lang.Object) defaultValue(fields()[0]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+}