blob: 0012876e034c4b62fe51e111e899ca7360fc599d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.EnumSymbol;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
/**
 * Round-trip tests that encode a generic record with one schema (the writer)
 * and decode it with a different schema (the reader).
 *
 * <p>
 * Every test follows the same three steps: build a record for the writer
 * schema, binary-encode it, and decode the bytes while handing both the reader
 * and the writer schema to a {@link GenericDatumReader}. The assertions then
 * pin either the decoded value or the {@link AvroTypeException} that is raised.
 */
public class TestReadingWritingDataInEvolvedSchemas {

  private static final String RECORD_A = "RecordA";
  private static final String FIELD_A = "fieldA";
  private static final char LATIN_SMALL_LETTER_O_WITH_DIARESIS = '\u00F6';

  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  // ---- Single-field record schemas; they differ only in the type of FIELD_A.

  private static final Schema DOUBLE_RECORD = SchemaBuilder.record(RECORD_A) //
      .fields() //
      .name(FIELD_A).type().doubleType().noDefault() //
      .endRecord();

  private static final Schema FLOAT_RECORD = SchemaBuilder.record(RECORD_A) //
      .fields() //
      .name(FIELD_A).type().floatType().noDefault() //
      .endRecord();

  private static final Schema LONG_RECORD = SchemaBuilder.record(RECORD_A) //
      .fields() //
      .name(FIELD_A).type().longType().noDefault() //
      .endRecord();

  private static final Schema INT_RECORD = SchemaBuilder.record(RECORD_A) //
      .fields() //
      .name(FIELD_A).type().intType().noDefault() //
      .endRecord();

  // Note: the branches are declared widest-first (double, float, long, int).
  private static final Schema UNION_INT_LONG_FLOAT_DOUBLE_RECORD = SchemaBuilder.record(RECORD_A) //
      .fields() //
      .name(FIELD_A).type().unionOf().doubleType().and().floatType().and().longType().and().intType().endUnion()
      .noDefault() //
      .endRecord();

  private static final Schema STRING_RECORD = SchemaBuilder.record(RECORD_A) //
      .fields() //
      .name(FIELD_A).type().stringType().noDefault() //
      .endRecord();

  private static final Schema BYTES_RECORD = SchemaBuilder.record(RECORD_A) //
      .fields() //
      .name(FIELD_A).type().bytesType().noDefault() //
      .endRecord();

  private static final Schema UNION_STRING_BYTES_RECORD = SchemaBuilder.record(RECORD_A) //
      .fields() //
      .name(FIELD_A).type().unionOf().stringType().and().bytesType().endUnion()
      .noDefault() //
      .endRecord();

  private static final Schema ENUM_AB_RECORD = SchemaBuilder.record(RECORD_A) //
      .fields() //
      .name(FIELD_A).type().enumeration("Enum1").symbols("A", "B").noDefault() //
      .endRecord();

  private static final Schema ENUM_ABC_RECORD = SchemaBuilder.record(RECORD_A) //
      .fields() //
      .name(FIELD_A).type().enumeration("Enum1").symbols("A", "B", "C").noDefault() //
      .endRecord();

  // ---- Numeric values written through the numeric union.

  @Test
  public void doubleWrittenWithUnionSchemaIsConvertedToDoubleSchema() throws Exception {
    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
    Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0);
    byte[] encoded = encodeGenericBlob(record);
    Record decoded = decodeGenericBlob(DOUBLE_RECORD, writer, encoded);
    assertEquals(42.0, decoded.get(FIELD_A));
  }

  @Test
  public void doubleWrittenWithUnionSchemaIsNotConvertedToFloatSchema() throws Exception {
    expectedException.expect(AvroTypeException.class);
    expectedException.expectMessage("Found double, expecting float");
    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
    Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0);
    byte[] encoded = encodeGenericBlob(record);
    decodeGenericBlob(FLOAT_RECORD, writer, encoded);
  }

  @Test
  public void floatWrittenWithUnionSchemaIsNotConvertedToLongSchema() throws Exception {
    expectedException.expect(AvroTypeException.class);
    expectedException.expectMessage("Found float, expecting long");
    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
    Record record = defaultRecordWithSchema(writer, FIELD_A, 42.0f);
    byte[] encoded = encodeGenericBlob(record);
    decodeGenericBlob(LONG_RECORD, writer, encoded);
  }

  @Test
  public void longWrittenWithUnionSchemaIsNotConvertedToIntSchema() throws Exception {
    expectedException.expect(AvroTypeException.class);
    expectedException.expectMessage("Found long, expecting int");
    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
    Record record = defaultRecordWithSchema(writer, FIELD_A, 42L);
    byte[] encoded = encodeGenericBlob(record);
    decodeGenericBlob(INT_RECORD, writer, encoded);
  }

  @Test
  public void intWrittenWithUnionSchemaIsConvertedToAllNumberSchemas() throws Exception {
    Schema writer = UNION_INT_LONG_FLOAT_DOUBLE_RECORD;
    Record record = defaultRecordWithSchema(writer, FIELD_A, 42);
    byte[] encoded = encodeGenericBlob(record);
    assertEquals(42.0, decodeGenericBlob(DOUBLE_RECORD, writer, encoded).get(FIELD_A));
    assertEquals(42.0f, decodeGenericBlob(FLOAT_RECORD, writer, encoded).get(FIELD_A));
    assertEquals(42L, decodeGenericBlob(LONG_RECORD, writer, encoded).get(FIELD_A));
    assertEquals(42, decodeGenericBlob(INT_RECORD, writer, encoded).get(FIELD_A));
  }

  // ---- String <-> bytes, written through the string/bytes union.

  @Test
  public void asciiStringWrittenWithUnionSchemaIsConvertedToBytesSchema() throws Exception {
    Schema writer = UNION_STRING_BYTES_RECORD;
    Record record = defaultRecordWithSchema(writer, FIELD_A, "42");
    byte[] encoded = encodeGenericBlob(record);
    ByteBuffer actual = (ByteBuffer) decodeGenericBlob(BYTES_RECORD, writer, encoded).get(FIELD_A);
    assertArrayEquals("42".getBytes("UTF-8"), contentOf(actual));
  }

  @Test
  public void utf8StringWrittenWithUnionSchemaIsConvertedToBytesSchema() throws Exception {
    String goeran = String.format("G%sran", LATIN_SMALL_LETTER_O_WITH_DIARESIS);
    Schema writer = UNION_STRING_BYTES_RECORD;
    Record record = defaultRecordWithSchema(writer, FIELD_A, goeran);
    byte[] encoded = encodeGenericBlob(record);
    ByteBuffer actual = (ByteBuffer) decodeGenericBlob(BYTES_RECORD, writer, encoded).get(FIELD_A);
    assertArrayEquals(goeran.getBytes("UTF-8"), contentOf(actual));
  }

  @Test
  public void asciiBytesWrittenWithUnionSchemaIsConvertedToStringSchema() throws Exception {
    Schema writer = UNION_STRING_BYTES_RECORD;
    ByteBuffer buf = ByteBuffer.wrap("42".getBytes("UTF-8"));
    Record record = defaultRecordWithSchema(writer, FIELD_A, buf);
    byte[] encoded = encodeGenericBlob(record);
    CharSequence read = (CharSequence) decodeGenericBlob(STRING_RECORD, writer, encoded).get(FIELD_A);
    assertEquals("42", read.toString());
  }

  @Test
  public void utf8BytesWrittenWithUnionSchemaIsConvertedToStringSchema() throws Exception {
    String goeran = String.format("G%sran", LATIN_SMALL_LETTER_O_WITH_DIARESIS);
    Schema writer = UNION_STRING_BYTES_RECORD;
    // Write the value as bytes (not as a String) so that the bytes branch of
    // the union is the one being read back with the string schema.
    ByteBuffer buf = ByteBuffer.wrap(goeran.getBytes("UTF-8"));
    Record record = defaultRecordWithSchema(writer, FIELD_A, buf);
    byte[] encoded = encodeGenericBlob(record);
    CharSequence read = (CharSequence) decodeGenericBlob(STRING_RECORD, writer, encoded).get(FIELD_A);
    assertEquals(goeran, read.toString());
  }

  // ---- Enums with added symbols.

  @Test
  public void enumRecordCanBeReadWithExtendedEnumSchema() throws Exception {
    Schema writer = ENUM_AB_RECORD;
    Record record = defaultRecordWithSchema(writer, FIELD_A, new EnumSymbol(writer, "A"));
    byte[] encoded = encodeGenericBlob(record);
    Record decoded = decodeGenericBlob(ENUM_ABC_RECORD, writer, encoded);
    assertEquals("A", decoded.get(FIELD_A).toString());
  }

  @Test
  public void enumRecordWithExtendedSchemaCanBeReadWithOriginalEnumSchemaIfOnlyOldValues() throws Exception {
    Schema writer = ENUM_ABC_RECORD;
    Record record = defaultRecordWithSchema(writer, FIELD_A, new EnumSymbol(writer, "A"));
    byte[] encoded = encodeGenericBlob(record);
    Record decoded = decodeGenericBlob(ENUM_AB_RECORD, writer, encoded);
    assertEquals("A", decoded.get(FIELD_A).toString());
  }

  @Test
  public void enumRecordWithExtendedSchemaCanNotBeReadIfNewValuesAreUsed() throws Exception {
    expectedException.expect(AvroTypeException.class);
    expectedException.expectMessage("No match for C");
    Schema writer = ENUM_ABC_RECORD;
    Record record = defaultRecordWithSchema(writer, FIELD_A, new EnumSymbol(writer, "C"));
    byte[] encoded = encodeGenericBlob(record);
    decodeGenericBlob(ENUM_AB_RECORD, writer, encoded);
  }

  // ---- Records with added or removed fields.

  @Test
  public void recordWrittenWithExtendedSchemaCanBeReadWithOriginalSchemaButLossOfData() throws Exception {
    Schema writer = SchemaBuilder.record(RECORD_A) //
        .fields() //
        .name("newTopField").type().stringType().noDefault() //
        .name(FIELD_A).type().intType().noDefault() //
        .endRecord();
    Record record = defaultRecordWithSchema(writer, FIELD_A, 42);
    record.put("newTopField", "not decoded");
    byte[] encoded = encodeGenericBlob(record);
    Record decoded = decodeGenericBlob(INT_RECORD, writer, encoded);
    assertEquals(42, decoded.get(FIELD_A));
    assertNull(decoded.get("newTopField"));
  }

  @Test
  public void readerWithoutDefaultValueThrowsException() throws Exception {
    expectedException.expect(AvroTypeException.class);
    expectedException.expectMessage("missing required field newField");
    Schema reader = SchemaBuilder.record(RECORD_A) //
        .fields() //
        .name("newField").type().intType().noDefault() //
        .name(FIELD_A).type().intType().noDefault() //
        .endRecord();
    Record record = defaultRecordWithSchema(INT_RECORD, FIELD_A, 42);
    byte[] encoded = encodeGenericBlob(record);
    decodeGenericBlob(reader, INT_RECORD, encoded);
  }

  @Test
  public void readerWithDefaultValueIsApplied() throws Exception {
    Schema reader = SchemaBuilder.record(RECORD_A) //
        .fields() //
        .name("newFieldWithDefault").type().intType().intDefault(314) //
        .name(FIELD_A).type().intType().noDefault() //
        .endRecord();
    Record record = defaultRecordWithSchema(INT_RECORD, FIELD_A, 42);
    byte[] encoded = encodeGenericBlob(record);
    Record decoded = decodeGenericBlob(reader, INT_RECORD, encoded);
    assertEquals(42, decoded.get(FIELD_A));
    assertEquals(314, decoded.get("newFieldWithDefault"));
  }

  // ---- Helpers.

  /**
   * Creates a generic record for {@code schema} with a single field populated.
   *
   * @param schema the record schema
   * @param key    name of the field to set
   * @param value  value stored under {@code key}
   * @return the new record
   */
  private static <T> Record defaultRecordWithSchema(Schema schema, String key, T value) {
    Record data = new GenericData.Record(schema);
    data.put(key, value);
    return data;
  }

  /**
   * Binary-encodes {@code data} using the schema the record itself carries.
   *
   * @param data the record to serialize
   * @return the encoded bytes
   * @throws IOException if the encoder fails to write
   */
  private static byte[] encodeGenericBlob(GenericRecord data) throws IOException {
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(data.getSchema());
    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().binaryEncoder(outStream, null);
    writer.write(data, encoder);
    // The encoder buffers internally; flush before taking the snapshot.
    encoder.flush();
    return outStream.toByteArray();
  }

  /**
   * Decodes a binary blob that was written with {@code schemaOfBlob} into a
   * record shaped by {@code expectedSchema}.
   *
   * @param expectedSchema the reader schema
   * @param schemaOfBlob   the writer schema the blob was encoded with
   * @param blob           the encoded record, may be {@code null}
   * @return the decoded record, or {@code null} when {@code blob} is
   *         {@code null}
   * @throws IOException if decoding fails
   */
  private static Record decodeGenericBlob(Schema expectedSchema, Schema schemaOfBlob, byte[] blob) throws IOException {
    if (blob == null) {
      return null;
    }
    GenericDatumReader<Record> reader = new GenericDatumReader<Record>();
    reader.setExpected(expectedSchema);
    reader.setSchema(schemaOfBlob);
    Decoder decoder = DecoderFactory.get().binaryDecoder(blob, null);
    return reader.read(null, decoder);
  }

  /**
   * Copies the readable content (position up to limit) of {@code buffer}.
   *
   * <p>
   * Used instead of {@link ByteBuffer#array()}, which exposes the whole backing
   * array regardless of position, limit and array offset. The copy is taken
   * from a duplicate so the position of {@code buffer} is left untouched.
   *
   * @param buffer the buffer to read
   * @return a new array holding exactly the remaining bytes of {@code buffer}
   */
  private static byte[] contentOf(ByteBuffer buffer) {
    byte[] content = new byte[buffer.remaining()];
    buffer.duplicate().get(content);
    return content;
  }
}