/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;
import static java.util.Arrays.asList;
import static org.apache.avro.SchemaCompatibility.Incompatibility;
import static org.apache.avro.SchemaCompatibility.SchemaCompatibilityResult;
import static org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import static org.apache.avro.SchemaCompatibility.SchemaIncompatibilityType;
import static org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
import static org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility;
import static org.apache.avro.TestSchemas.A_DINT_B_DINT_RECORD1;
import static org.apache.avro.TestSchemas.A_DINT_RECORD1;
import static org.apache.avro.TestSchemas.A_INT_B_DINT_RECORD1;
import static org.apache.avro.TestSchemas.A_INT_B_INT_RECORD1;
import static org.apache.avro.TestSchemas.A_INT_RECORD1;
import static org.apache.avro.TestSchemas.A_LONG_RECORD1;
import static org.apache.avro.TestSchemas.BOOLEAN_SCHEMA;
import static org.apache.avro.TestSchemas.BYTES_SCHEMA;
import static org.apache.avro.TestSchemas.BYTES_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.DOUBLE_SCHEMA;
import static org.apache.avro.TestSchemas.DOUBLE_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.EMPTY_RECORD1;
import static org.apache.avro.TestSchemas.EMPTY_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.ENUM1_ABC_SCHEMA;
import static org.apache.avro.TestSchemas.ENUM1_AB_SCHEMA;
import static org.apache.avro.TestSchemas.ENUM1_AB_SCHEMA_DEFAULT;
import static org.apache.avro.TestSchemas.ENUM1_AB_SCHEMA_NAMESPACE_1;
import static org.apache.avro.TestSchemas.ENUM1_AB_SCHEMA_NAMESPACE_2;
import static org.apache.avro.TestSchemas.ENUM1_BC_SCHEMA;
import static org.apache.avro.TestSchemas.ENUM_ABC_ENUM_DEFAULT_A_RECORD;
import static org.apache.avro.TestSchemas.ENUM_ABC_ENUM_DEFAULT_A_SCHEMA;
import static org.apache.avro.TestSchemas.ENUM_ABC_FIELD_DEFAULT_B_ENUM_DEFAULT_A_RECORD;
import static org.apache.avro.TestSchemas.ENUM_AB_ENUM_DEFAULT_A_RECORD;
import static org.apache.avro.TestSchemas.ENUM_AB_ENUM_DEFAULT_A_SCHEMA;
import static org.apache.avro.TestSchemas.ENUM_AB_FIELD_DEFAULT_A_ENUM_DEFAULT_B_RECORD;
import static org.apache.avro.TestSchemas.FIXED_4_BYTES;
import static org.apache.avro.TestSchemas.FLOAT_SCHEMA;
import static org.apache.avro.TestSchemas.FLOAT_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.INT_ARRAY_SCHEMA;
import static org.apache.avro.TestSchemas.INT_FLOAT_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.INT_LIST_RECORD;
import static org.apache.avro.TestSchemas.INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.INT_LONG_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.INT_MAP_SCHEMA;
import static org.apache.avro.TestSchemas.INT_SCHEMA;
import static org.apache.avro.TestSchemas.INT_STRING_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.INT_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.LONG_ARRAY_SCHEMA;
import static org.apache.avro.TestSchemas.LONG_LIST_RECORD;
import static org.apache.avro.TestSchemas.LONG_MAP_SCHEMA;
import static org.apache.avro.TestSchemas.LONG_SCHEMA;
import static org.apache.avro.TestSchemas.LONG_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.NS_RECORD1;
import static org.apache.avro.TestSchemas.NS_RECORD2;
import static org.apache.avro.TestSchemas.WITH_NS;
import static org.apache.avro.TestSchemas.WITHOUT_NS;
import static org.apache.avro.TestSchemas.NULL_SCHEMA;
import static org.apache.avro.TestSchemas.ReaderWriter;
import static org.apache.avro.TestSchemas.STRING_ARRAY_SCHEMA;
import static org.apache.avro.TestSchemas.STRING_INT_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.STRING_SCHEMA;
import static org.apache.avro.TestSchemas.STRING_UNION_SCHEMA;
import static org.apache.avro.TestSchemas.assertSchemaContains;
import static org.apache.avro.TestSchemas.list;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.EnumSymbol;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Unit-tests for SchemaCompatibility.
*/
public class TestSchemaCompatibility {
private static final Logger LOG = LoggerFactory.getLogger(TestSchemaCompatibility.class);
// -----------------------------------------------------------------------------------------------
private static final Schema WRITER_SCHEMA = Schema.createRecord(list(
new Schema.Field("oldfield1", INT_SCHEMA, null, null), new Schema.Field("oldfield2", STRING_SCHEMA, null, null)));
@Test
public void testValidateSchemaPairMissingField() {
final List<Field> readerFields = list(new Schema.Field("oldfield1", INT_SCHEMA, null, null));
final Schema reader = Schema.createRecord(readerFields);
final SchemaCompatibility.SchemaPairCompatibility expectedResult = new SchemaCompatibility.SchemaPairCompatibility(
SchemaCompatibility.SchemaCompatibilityResult.compatible(), reader, WRITER_SCHEMA,
SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
// Test omitting a field.
assertEquals(expectedResult, checkReaderWriterCompatibility(reader, WRITER_SCHEMA));
}
@Test
public void testValidateSchemaPairMissingSecondField() {
final List<Schema.Field> readerFields = list(new Schema.Field("oldfield2", STRING_SCHEMA, null, null));
final Schema reader = Schema.createRecord(readerFields);
final SchemaCompatibility.SchemaPairCompatibility expectedResult = new SchemaCompatibility.SchemaPairCompatibility(
SchemaCompatibility.SchemaCompatibilityResult.compatible(), reader, WRITER_SCHEMA,
SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
// Test omitting other field.
assertEquals(expectedResult, checkReaderWriterCompatibility(reader, WRITER_SCHEMA));
}
@Test
public void testValidateSchemaPairAllFields() {
final List<Schema.Field> readerFields = list(new Schema.Field("oldfield1", INT_SCHEMA, null, null),
new Schema.Field("oldfield2", STRING_SCHEMA, null, null));
final Schema reader = Schema.createRecord(readerFields);
final SchemaCompatibility.SchemaPairCompatibility expectedResult = new SchemaCompatibility.SchemaPairCompatibility(
SchemaCompatibility.SchemaCompatibilityResult.compatible(), reader, WRITER_SCHEMA,
SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
// Test with all fields.
assertEquals(expectedResult, checkReaderWriterCompatibility(reader, WRITER_SCHEMA));
}
@Test
public void testValidateSchemaNewFieldWithDefault() {
final List<Schema.Field> readerFields = list(new Schema.Field("oldfield1", INT_SCHEMA, null, null),
new Schema.Field("newfield1", INT_SCHEMA, null, 42));
final Schema reader = Schema.createRecord(readerFields);
final SchemaCompatibility.SchemaPairCompatibility expectedResult = new SchemaCompatibility.SchemaPairCompatibility(
SchemaCompatibility.SchemaCompatibilityResult.compatible(), reader, WRITER_SCHEMA,
SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
// Test new field with default value.
assertEquals(expectedResult, checkReaderWriterCompatibility(reader, WRITER_SCHEMA));
}
@Test
public void testValidateSchemaNewField() {
final List<Schema.Field> readerFields = list(new Schema.Field("oldfield1", INT_SCHEMA, null, null),
new Schema.Field("newfield1", INT_SCHEMA, null, null));
final Schema reader = Schema.createRecord(readerFields);
SchemaPairCompatibility compatibility = checkReaderWriterCompatibility(reader, WRITER_SCHEMA);
// Test new field without default value.
assertEquals(SchemaCompatibility.SchemaCompatibilityType.INCOMPATIBLE, compatibility.getType());
assertEquals(SchemaCompatibility.SchemaCompatibilityResult.incompatible(
SchemaIncompatibilityType.READER_FIELD_MISSING_DEFAULT_VALUE, reader, WRITER_SCHEMA, "newfield1",
asList("", "fields", "1")), compatibility.getResult());
assertEquals(String.format(
"Data encoded using writer schema:%n%s%n" + "will or may fail to decode using reader schema:%n%s%n",
WRITER_SCHEMA.toString(true), reader.toString(true)), compatibility.getDescription());
assertEquals(reader, compatibility.getReader());
assertEquals(WRITER_SCHEMA, compatibility.getWriter());
}
@Test
public void testValidateArrayWriterSchema() {
final Schema validReader = Schema.createArray(STRING_SCHEMA);
final Schema invalidReader = Schema.createMap(STRING_SCHEMA);
final SchemaCompatibility.SchemaPairCompatibility validResult = new SchemaCompatibility.SchemaPairCompatibility(
SchemaCompatibility.SchemaCompatibilityResult.compatible(), validReader, STRING_ARRAY_SCHEMA,
SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
final SchemaCompatibility.SchemaPairCompatibility invalidResult = new SchemaCompatibility.SchemaPairCompatibility(
SchemaCompatibility.SchemaCompatibilityResult.incompatible(
SchemaIncompatibilityType.TYPE_MISMATCH, invalidReader, STRING_ARRAY_SCHEMA,
"reader type: MAP not compatible with writer type: ARRAY", Collections.singletonList("")),
invalidReader, STRING_ARRAY_SCHEMA,
String.format(
"Data encoded using writer schema:%n%s%n" + "will or may fail to decode using reader schema:%n%s%n",
STRING_ARRAY_SCHEMA.toString(true), invalidReader.toString(true)));
assertEquals(validResult, checkReaderWriterCompatibility(validReader, STRING_ARRAY_SCHEMA));
assertEquals(invalidResult, checkReaderWriterCompatibility(invalidReader, STRING_ARRAY_SCHEMA));
}
@Test
public void testValidatePrimitiveWriterSchema() {
final Schema validReader = Schema.create(Schema.Type.STRING);
final SchemaCompatibility.SchemaPairCompatibility validResult = new SchemaCompatibility.SchemaPairCompatibility(
SchemaCompatibility.SchemaCompatibilityResult.compatible(), validReader, STRING_SCHEMA,
SchemaCompatibility.READER_WRITER_COMPATIBLE_MESSAGE);
final SchemaCompatibility.SchemaPairCompatibility invalidResult = new SchemaCompatibility.SchemaPairCompatibility(
SchemaCompatibility.SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.TYPE_MISMATCH, INT_SCHEMA,
STRING_SCHEMA, "reader type: INT not compatible with writer type: STRING", Collections.singletonList("")),
INT_SCHEMA, STRING_SCHEMA,
String.format(
"Data encoded using writer schema:%n%s%n" + "will or may fail to decode using reader schema:%n%s%n",
STRING_SCHEMA.toString(true), INT_SCHEMA.toString(true)));
assertEquals(validResult, checkReaderWriterCompatibility(validReader, STRING_SCHEMA));
assertEquals(invalidResult, checkReaderWriterCompatibility(INT_SCHEMA, STRING_SCHEMA));
}
/**
 * A reader union schema must be able to resolve every writer union branch; a
 * reader union that covers only a subset of the writer branches is incompatible.
 */
@Test
public void testUnionReaderWriterSubsetIncompatibility() {
final Schema unionWriter = Schema.createUnion(list(INT_SCHEMA, STRING_SCHEMA, LONG_SCHEMA));
final Schema unionReader = Schema.createUnion(list(INT_SCHEMA, STRING_SCHEMA));
final SchemaPairCompatibility result = checkReaderWriterCompatibility(unionReader, unionWriter);
assertEquals(SchemaCompatibilityType.INCOMPATIBLE, result.getType());
}
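/**
 * Companion sketch to the test above (the method name here is new and purely
 * illustrative): a reader union need not contain a writer branch verbatim as
 * long as the branch can be resolved via promotion. The pair used here (reader
 * [LONG] vs. writer [INT]) is also asserted compatible in
 * COMPATIBLE_READER_WRITER_TEST_CASES below.
 */
@Test
public void testUnionReaderPromotionCompatibility() {
final SchemaPairCompatibility result = checkReaderWriterCompatibility(LONG_UNION_SCHEMA, INT_UNION_SCHEMA);
assertEquals(SchemaCompatibilityType.COMPATIBLE, result.getType());
}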
// -----------------------------------------------------------------------------------------------
/**
 * Collection of reader/writer schema pairs that are compatible.
 */
public static final List<ReaderWriter> COMPATIBLE_READER_WRITER_TEST_CASES = list(
new ReaderWriter(BOOLEAN_SCHEMA, BOOLEAN_SCHEMA),
new ReaderWriter(INT_SCHEMA, INT_SCHEMA),
new ReaderWriter(LONG_SCHEMA, INT_SCHEMA), new ReaderWriter(LONG_SCHEMA, LONG_SCHEMA),
// The Avro spec allows INT/LONG to be promoted to FLOAT/DOUBLE.
// This is arguable, as the promotion can lose precision.
new ReaderWriter(FLOAT_SCHEMA, INT_SCHEMA), new ReaderWriter(FLOAT_SCHEMA, LONG_SCHEMA),
new ReaderWriter(DOUBLE_SCHEMA, LONG_SCHEMA),
new ReaderWriter(DOUBLE_SCHEMA, INT_SCHEMA), new ReaderWriter(DOUBLE_SCHEMA, FLOAT_SCHEMA),
new ReaderWriter(STRING_SCHEMA, STRING_SCHEMA),
new ReaderWriter(BYTES_SCHEMA, BYTES_SCHEMA),
new ReaderWriter(INT_ARRAY_SCHEMA, INT_ARRAY_SCHEMA), new ReaderWriter(LONG_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
new ReaderWriter(INT_MAP_SCHEMA, INT_MAP_SCHEMA), new ReaderWriter(LONG_MAP_SCHEMA, INT_MAP_SCHEMA),
new ReaderWriter(ENUM1_AB_SCHEMA, ENUM1_AB_SCHEMA), new ReaderWriter(ENUM1_ABC_SCHEMA, ENUM1_AB_SCHEMA),
new ReaderWriter(ENUM1_AB_SCHEMA_DEFAULT, ENUM1_ABC_SCHEMA),
new ReaderWriter(ENUM1_AB_SCHEMA, ENUM1_AB_SCHEMA_NAMESPACE_1),
new ReaderWriter(ENUM1_AB_SCHEMA_NAMESPACE_1, ENUM1_AB_SCHEMA),
new ReaderWriter(ENUM1_AB_SCHEMA_NAMESPACE_1, ENUM1_AB_SCHEMA_NAMESPACE_2),
// String-to/from-bytes, introduced in Avro 1.7.7
new ReaderWriter(STRING_SCHEMA, BYTES_SCHEMA), new ReaderWriter(BYTES_SCHEMA, STRING_SCHEMA),
// Tests involving unions:
new ReaderWriter(EMPTY_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
new ReaderWriter(FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA), new ReaderWriter(FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
new ReaderWriter(FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
new ReaderWriter(FLOAT_UNION_SCHEMA, INT_LONG_UNION_SCHEMA), new ReaderWriter(INT_UNION_SCHEMA, INT_UNION_SCHEMA),
new ReaderWriter(INT_STRING_UNION_SCHEMA, STRING_INT_UNION_SCHEMA),
new ReaderWriter(INT_UNION_SCHEMA, EMPTY_UNION_SCHEMA), new ReaderWriter(LONG_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
new ReaderWriter(LONG_UNION_SCHEMA, INT_UNION_SCHEMA), new ReaderWriter(FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
new ReaderWriter(DOUBLE_UNION_SCHEMA, INT_UNION_SCHEMA), new ReaderWriter(FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
new ReaderWriter(DOUBLE_UNION_SCHEMA, LONG_UNION_SCHEMA),
new ReaderWriter(FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
new ReaderWriter(DOUBLE_UNION_SCHEMA, FLOAT_UNION_SCHEMA),
new ReaderWriter(STRING_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
new ReaderWriter(STRING_UNION_SCHEMA, BYTES_UNION_SCHEMA),
new ReaderWriter(BYTES_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
new ReaderWriter(BYTES_UNION_SCHEMA, STRING_UNION_SCHEMA),
new ReaderWriter(DOUBLE_UNION_SCHEMA, INT_FLOAT_UNION_SCHEMA),
// Readers capable of reading all branches of a union are compatible
new ReaderWriter(FLOAT_SCHEMA, INT_FLOAT_UNION_SCHEMA), new ReaderWriter(LONG_SCHEMA, INT_LONG_UNION_SCHEMA),
new ReaderWriter(DOUBLE_SCHEMA, INT_FLOAT_UNION_SCHEMA),
new ReaderWriter(DOUBLE_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
// Special case of singleton unions:
new ReaderWriter(FLOAT_SCHEMA, FLOAT_UNION_SCHEMA), new ReaderWriter(INT_UNION_SCHEMA, INT_SCHEMA),
new ReaderWriter(INT_SCHEMA, INT_UNION_SCHEMA),
// Fixed types
new ReaderWriter(FIXED_4_BYTES, FIXED_4_BYTES),
// Tests involving records:
new ReaderWriter(EMPTY_RECORD1, EMPTY_RECORD1), new ReaderWriter(EMPTY_RECORD1, A_INT_RECORD1),
new ReaderWriter(A_INT_RECORD1, A_INT_RECORD1), new ReaderWriter(A_DINT_RECORD1, A_INT_RECORD1),
new ReaderWriter(A_DINT_RECORD1, A_DINT_RECORD1), new ReaderWriter(A_INT_RECORD1, A_DINT_RECORD1),
new ReaderWriter(A_LONG_RECORD1, A_INT_RECORD1),
new ReaderWriter(A_INT_RECORD1, A_INT_B_INT_RECORD1), new ReaderWriter(A_DINT_RECORD1, A_INT_B_INT_RECORD1),
new ReaderWriter(A_INT_B_DINT_RECORD1, A_INT_RECORD1), new ReaderWriter(A_DINT_B_DINT_RECORD1, EMPTY_RECORD1),
new ReaderWriter(A_DINT_B_DINT_RECORD1, A_INT_RECORD1),
new ReaderWriter(A_INT_B_INT_RECORD1, A_DINT_B_DINT_RECORD1),
new ReaderWriter(INT_LIST_RECORD, INT_LIST_RECORD), new ReaderWriter(LONG_LIST_RECORD, LONG_LIST_RECORD),
new ReaderWriter(LONG_LIST_RECORD, INT_LIST_RECORD),
new ReaderWriter(NULL_SCHEMA, NULL_SCHEMA),
new ReaderWriter(ENUM_AB_ENUM_DEFAULT_A_RECORD, ENUM_ABC_ENUM_DEFAULT_A_RECORD),
new ReaderWriter(ENUM_AB_FIELD_DEFAULT_A_ENUM_DEFAULT_B_RECORD, ENUM_ABC_FIELD_DEFAULT_B_ENUM_DEFAULT_A_RECORD),
// This is comparing two records that have an inner array of records with
// different namespaces.
new ReaderWriter(NS_RECORD1, NS_RECORD2), new ReaderWriter(WITHOUT_NS, WITH_NS));
// -----------------------------------------------------------------------------------------------
/**
 * The incompatible reader/writer pairs have been moved to dedicated test
 * classes, one class per error case, to make failures easier to pinpoint. The
 * helper methods that validate incompatibility remain here.
 */
public static void validateIncompatibleSchemas(Schema reader, Schema writer,
SchemaIncompatibilityType incompatibility, String message, String location) {
validateIncompatibleSchemas(reader, writer, Collections.singletonList(incompatibility),
Collections.singletonList(message), Collections.singletonList(location));
}
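// A hedged usage sketch (hypothetical values, not taken from the dedicated test
// classes): a per-error-case test would typically call the overload above with
// the expected incompatibility type, message and location, e.g.
//
//   validateIncompatibleSchemas(INT_SCHEMA, STRING_SCHEMA,
//       SchemaIncompatibilityType.TYPE_MISMATCH,
//       "reader type: INT not compatible with writer type: STRING", "/");
//
// The exact message and location strings depend on how SchemaCompatibility
// reports the mismatch.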
// -----------------------------------------------------------------------------------------------
public static void validateIncompatibleSchemas(Schema reader, Schema writer,
List<SchemaIncompatibilityType> incompatibilityTypes, List<String> messages, List<String> locations) {
SchemaPairCompatibility compatibility = checkReaderWriterCompatibility(reader, writer);
SchemaCompatibilityResult compatibilityResult = compatibility.getResult();
assertEquals(reader, compatibility.getReader());
assertEquals(writer, compatibility.getWriter());
assertEquals(SchemaCompatibilityType.INCOMPATIBLE, compatibilityResult.getCompatibility());
assertEquals(incompatibilityTypes.size(), compatibilityResult.getIncompatibilities().size());
for (int i = 0; i < incompatibilityTypes.size(); i++) {
Incompatibility incompatibility = compatibilityResult.getIncompatibilities().get(i);
assertSchemaContains(incompatibility.getReaderFragment(), reader);
assertSchemaContains(incompatibility.getWriterFragment(), writer);
assertEquals(incompatibilityTypes.get(i), incompatibility.getType());
assertEquals(messages.get(i), incompatibility.getMessage());
assertEquals(locations.get(i), incompatibility.getLocation());
}
String description = String.format(
"Data encoded using writer schema:%n%s%n" + "will or may fail to decode using reader schema:%n%s%n",
writer.toString(true), reader.toString(true));
assertEquals(description, compatibility.getDescription());
}
// -----------------------------------------------------------------------------------------------
/**
* Tests reader/writer compatibility validation.
*/
@Test
public void testReaderWriterCompatibility() {
for (ReaderWriter readerWriter : COMPATIBLE_READER_WRITER_TEST_CASES) {
final Schema reader = readerWriter.getReader();
final Schema writer = readerWriter.getWriter();
LOG.debug("Testing compatibility of reader {} with writer {}.", reader, writer);
final SchemaPairCompatibility result = checkReaderWriterCompatibility(reader, writer);
assertEquals(String.format("Expecting reader %s to be compatible with writer %s, but tested incompatible.",
reader, writer), SchemaCompatibilityType.COMPATIBLE, result.getType());
}
}
// -----------------------------------------------------------------------------------------------
/**
* Descriptor for a test case that encodes a datum according to a given writer
* schema, then decodes it according to reader schema and validates the decoded
* value.
*/
private static final class DecodingTestCase {
/**
* Writer schema used to encode the datum.
*/
private final Schema mWriterSchema;
/**
* Datum to encode according to the specified writer schema.
*/
private final Object mDatum;
/**
* Reader schema used to decode the datum encoded using the writer schema.
*/
private final Schema mReaderSchema;
/**
* Expected datum value when using the reader schema to decode from the writer
* schema.
*/
private final Object mDecodedDatum;
public DecodingTestCase(final Schema writerSchema, final Object datum, final Schema readerSchema,
final Object decoded) {
mWriterSchema = writerSchema;
mDatum = datum;
mReaderSchema = readerSchema;
mDecodedDatum = decoded;
}
public Schema getReaderSchema() {
return mReaderSchema;
}
public Schema getWriterSchema() {
return mWriterSchema;
}
public Object getDatum() {
return mDatum;
}
public Object getDecodedDatum() {
return mDecodedDatum;
}
}
// -----------------------------------------------------------------------------------------------
public static final List<DecodingTestCase> DECODING_COMPATIBILITY_TEST_CASES = list(
new DecodingTestCase(INT_SCHEMA, 1, INT_SCHEMA, 1), new DecodingTestCase(INT_SCHEMA, 1, LONG_SCHEMA, 1L),
new DecodingTestCase(INT_SCHEMA, 1, FLOAT_SCHEMA, 1.0f), new DecodingTestCase(INT_SCHEMA, 1, DOUBLE_SCHEMA, 1.0d),
// This is currently accepted but loses precision: IEEE 754 single-precision
// floats have only a 24-bit significand (see the note after this list).
new DecodingTestCase(INT_SCHEMA, (1 << 24) + 1, FLOAT_SCHEMA, (float) ((1 << 24) + 1)),
// new DecodingTestCase(LONG_SCHEMA, 1L, INT_SCHEMA, 1), // should work in best-effort mode!
new DecodingTestCase(ENUM1_AB_SCHEMA, new EnumSymbol(ENUM1_AB_SCHEMA, "A"), ENUM1_ABC_SCHEMA,
new EnumSymbol(ENUM1_ABC_SCHEMA, "A")),
new DecodingTestCase(ENUM1_ABC_SCHEMA, new EnumSymbol(ENUM1_ABC_SCHEMA, "A"), ENUM1_AB_SCHEMA,
new EnumSymbol(ENUM1_AB_SCHEMA, "A")),
new DecodingTestCase(ENUM1_ABC_SCHEMA, new EnumSymbol(ENUM1_ABC_SCHEMA, "B"), ENUM1_BC_SCHEMA,
new EnumSymbol(ENUM1_BC_SCHEMA, "B")),
new DecodingTestCase(ENUM_ABC_ENUM_DEFAULT_A_SCHEMA, new EnumSymbol(ENUM_ABC_ENUM_DEFAULT_A_SCHEMA, "C"),
ENUM_AB_ENUM_DEFAULT_A_SCHEMA, new EnumSymbol(ENUM_AB_ENUM_DEFAULT_A_SCHEMA, "A")),
new DecodingTestCase(INT_STRING_UNION_SCHEMA, "the string", STRING_SCHEMA, new Utf8("the string")),
new DecodingTestCase(INT_STRING_UNION_SCHEMA, "the string", STRING_UNION_SCHEMA, new Utf8("the string")));
/**
* Tests the reader/writer compatibility at decoding time.
*/
@Test
public void testReaderWriterDecodingCompatibility() throws Exception {
for (DecodingTestCase testCase : DECODING_COMPATIBILITY_TEST_CASES) {
final Schema readerSchema = testCase.getReaderSchema();
final Schema writerSchema = testCase.getWriterSchema();
final Object datum = testCase.getDatum();
final Object expectedDecodedDatum = testCase.getDecodedDatum();
LOG.debug("Testing incompatibility of reader {} with writer {}.", readerSchema, writerSchema);
LOG.debug("Encode datum {} with writer {}.", datum, writerSchema);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
final DatumWriter<Object> datumWriter = new GenericDatumWriter<>(writerSchema);
datumWriter.write(datum, encoder);
encoder.flush();
LOG.debug("Decode datum {} whose writer is {} with reader {}.", datum, writerSchema, readerSchema);
final byte[] bytes = baos.toByteArray();
final Decoder decoder = DecoderFactory.get().resolvingDecoder(writerSchema, readerSchema,
DecoderFactory.get().binaryDecoder(bytes, null));
final DatumReader<Object> datumReader = new GenericDatumReader<>(readerSchema);
final Object decodedDatum = datumReader.read(null, decoder);
assertEquals(String.format(
"Expecting decoded value %s when decoding value %s whose writer schema is %s "
+ "using reader schema %s, but value was %s.",
expectedDecodedDatum, datum, writerSchema, readerSchema, decodedDatum), expectedDecodedDatum, decodedDatum);
}
}
private Schema readSchemaFromResources(String name) throws IOException {
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(name)) {
final String result = new BufferedReader(new InputStreamReader(inputStream)).lines()
.collect(Collectors.joining("\n"));
return new Schema.Parser().parse(result);
}
}
@Test
public void checkResolvingDecoder() throws IOException {
final Schema locationSchema = readSchemaFromResources("schema-location.json");
final Schema writeSchema = readSchemaFromResources("schema-location-write.json");
// In the read schema the "long" field has been removed
// and a new field called "long_r2" has been added.
// The new field should decode as null.
final Schema readSchema = readSchemaFromResources("schema-location-read.json");
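// Rough sketch (inferred from the assertions below, not the actual resource
// contents) of what these schemas are assumed to look like:
//   schema-location.json        - record with float fields "lat" and "long"
//   schema-location-write.json  - record with a "location" field: a map of the above
//   schema-location-read.json   - same, but the inner record drops "long" and
//                                 adds a nullable "long_r2"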
// Create some test data
GenericData.Record record = new GenericData.Record(writeSchema);
GenericData.Record location = new GenericData.Record(locationSchema);
location.put("lat", 52.995143f);
location.put("long", -1.539054f);
HashMap<String, Record> locations = new HashMap<>();
locations.put("l1", location);
record.put("location", locations);
// Write the record to bytes
byte[] payload;
try (ByteArrayOutputStream bbos = new ByteArrayOutputStream()) {
DatumWriter<GenericData.Record> datumWriter = new GenericDatumWriter<>(writeSchema);
Encoder enc = EncoderFactory.get().binaryEncoder(bbos, null);
datumWriter.write(record, enc);
enc.flush();
payload = bbos.toByteArray();
}
// Read the record back: decode it with the write schema (which has the "long" field)
// and project it onto the read schema (which has "long_r2" instead).
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<>();
reader.setSchema(writeSchema);
reader.setExpected(readSchema);
// Get the object we're looking for
GenericData.Record r = reader.read(null, decoder);
HashMap<Utf8, GenericData.Record> locs = (HashMap<Utf8, GenericData.Record>) r.get("location");
GenericData.Record loc = locs.get(new Utf8("l1"));
assertNotNull(loc.get("lat"));
// This is a new field, and should be null
assertNull(loc.get("long_r2"));
}
}