blob: 7c6bf1a180b1a83a92e6cecf531d5523f02c32ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.avro.message;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecordBuilder;
import org.junit.Assert;
import org.junit.Test;
public class TestBinaryMessageEncoding {
private static final Schema SCHEMA_V1 = SchemaBuilder.record("TestRecord").fields().requiredInt("id")
.optionalString("msg").endRecord();
private static final GenericRecordBuilder V1_BUILDER = new GenericRecordBuilder(SCHEMA_V1);
private static final List<Record> V1_RECORDS = Arrays.asList(V1_BUILDER.set("id", 1).set("msg", "m-1").build(),
V1_BUILDER.set("id", 2).set("msg", "m-2").build(), V1_BUILDER.set("id", 4).set("msg", "m-4").build(),
V1_BUILDER.set("id", 6).set("msg", "m-6").build());
private static final Schema SCHEMA_V2 = SchemaBuilder.record("TestRecord").fields().requiredLong("id").name("message")
.aliases("msg").type().optional().stringType().optionalDouble("data").endRecord();
private static final GenericRecordBuilder V2_BUILDER = new GenericRecordBuilder(SCHEMA_V2);
private static final List<Record> V2_RECORDS = Arrays.asList(
V2_BUILDER.set("id", 3L).set("message", "m-3").set("data", 12.3).build(),
V2_BUILDER.set("id", 5L).set("message", "m-5").set("data", 23.4).build(),
V2_BUILDER.set("id", 7L).set("message", "m-7").set("data", 34.5).build(),
V2_BUILDER.set("id", 8L).set("message", "m-8").set("data", 35.6).build());
@Test
public void testByteBufferRoundTrip() throws Exception {
MessageEncoder<Record> encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V2);
MessageDecoder<Record> decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V2);
Record copy = decoder.decode(encoder.encode(V2_RECORDS.get(0)));
Assert.assertNotSame("Copy should not be the same object", copy, V2_RECORDS.get(0));
Assert.assertEquals("Record should be identical after round-trip", V2_RECORDS.get(0), copy);
}
@Test
public void testSchemaEvolution() throws Exception {
List<ByteBuffer> buffers = new ArrayList<>();
List<Record> records = new ArrayList<>();
records.addAll(V1_RECORDS);
records.addAll(V2_RECORDS);
MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V1);
MessageEncoder<Record> v2Encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V2);
for (Record record : records) {
if (record.getSchema().equals(SCHEMA_V1)) {
buffers.add(v1Encoder.encode(record));
} else {
buffers.add(v2Encoder.encode(record));
}
}
Set<Record> allAsV2 = new HashSet<>(V2_RECORDS);
allAsV2.add(V2_BUILDER.set("id", 1L).set("message", "m-1").clear("data").build());
allAsV2.add(V2_BUILDER.set("id", 2L).set("message", "m-2").clear("data").build());
allAsV2.add(V2_BUILDER.set("id", 4L).set("message", "m-4").clear("data").build());
allAsV2.add(V2_BUILDER.set("id", 6L).set("message", "m-6").clear("data").build());
BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V2);
v2Decoder.addSchema(SCHEMA_V1);
Set<Record> decodedUsingV2 = new HashSet<>();
for (ByteBuffer buffer : buffers) {
decodedUsingV2.add(v2Decoder.decode(buffer));
}
Assert.assertEquals(allAsV2, decodedUsingV2);
}
@Test(expected = MissingSchemaException.class)
public void testCompatibleReadFailsWithoutSchema() throws Exception {
MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V1);
BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V2);
ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(3));
v2Decoder.decode(v1Buffer);
}
@Test
public void testCompatibleReadWithSchema() throws Exception {
MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V1);
BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V2);
v2Decoder.addSchema(SCHEMA_V1);
ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(3));
Record record = v2Decoder.decode(v1Buffer);
Assert.assertEquals(V2_BUILDER.set("id", 6L).set("message", "m-6").clear("data").build(), record);
}
@Test
public void testCompatibleReadWithSchemaFromLookup() throws Exception {
MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V1);
SchemaStore.Cache schemaCache = new SchemaStore.Cache();
schemaCache.addSchema(SCHEMA_V1);
BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V2, schemaCache);
ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(2));
Record record = v2Decoder.decode(v1Buffer);
Assert.assertEquals(V2_BUILDER.set("id", 4L).set("message", "m-4").clear("data").build(), record);
}
@Test
public void testIdenticalReadWithSchemaFromLookup() throws Exception {
MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V1);
SchemaStore.Cache schemaCache = new SchemaStore.Cache();
schemaCache.addSchema(SCHEMA_V1);
// The null readSchema should not throw an NPE, but trigger the
// BinaryMessageEncoder to use the write schema as read schema
BinaryMessageDecoder<Record> genericDecoder = new BinaryMessageDecoder<>(GenericData.get(), null, schemaCache);
ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(2));
Record record = genericDecoder.decode(v1Buffer);
Assert.assertEquals(V1_RECORDS.get(2), record);
}
@Test
public void testBufferReuse() throws Exception {
// This test depends on the serialized version of record 1 being smaller or
// the same size as record 0 so that the reused ByteArrayOutputStream won't
// expand its internal buffer.
MessageEncoder<Record> encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V1, false);
ByteBuffer b0 = encoder.encode(V1_RECORDS.get(0));
ByteBuffer b1 = encoder.encode(V1_RECORDS.get(1));
Assert.assertEquals(b0.array(), b1.array());
MessageDecoder<Record> decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V1);
Assert.assertEquals("Buffer was reused, decode(b0) should be record 1", V1_RECORDS.get(1), decoder.decode(b0));
}
@Test
public void testBufferCopy() throws Exception {
MessageEncoder<Record> encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V1);
ByteBuffer b0 = encoder.encode(V1_RECORDS.get(0));
ByteBuffer b1 = encoder.encode(V1_RECORDS.get(1));
Assert.assertNotEquals(b0.array(), b1.array());
MessageDecoder<Record> decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V1);
// bytes are not changed by reusing the encoder
Assert.assertEquals("Buffer was copied, decode(b0) should be record 0", V1_RECORDS.get(0), decoder.decode(b0));
}
@Test(expected = AvroRuntimeException.class)
public void testByteBufferMissingPayload() throws Exception {
MessageEncoder<Record> encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V2);
MessageDecoder<Record> decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V2);
ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
buffer.limit(12);
decoder.decode(buffer);
}
@Test(expected = BadHeaderException.class)
public void testByteBufferMissingFullHeader() throws Exception {
MessageEncoder<Record> encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V2);
MessageDecoder<Record> decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V2);
ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
buffer.limit(8);
decoder.decode(buffer);
}
@Test(expected = BadHeaderException.class)
public void testByteBufferBadMarkerByte() throws Exception {
MessageEncoder<Record> encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V2);
MessageDecoder<Record> decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V2);
ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
buffer.array()[0] = 0x00;
decoder.decode(buffer);
}
@Test(expected = BadHeaderException.class)
public void testByteBufferBadVersionByte() throws Exception {
MessageEncoder<Record> encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V2);
MessageDecoder<Record> decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V2);
ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
buffer.array()[1] = 0x00;
decoder.decode(buffer);
}
@Test(expected = MissingSchemaException.class)
public void testByteBufferUnknownSchema() throws Exception {
MessageEncoder<Record> encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA_V2);
MessageDecoder<Record> decoder = new BinaryMessageDecoder<>(GenericData.get(), SCHEMA_V2);
ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
buffer.array()[4] = 0x00;
decoder.decode(buffer);
}
}