AVRO-1704: Java: Add support for single-message encoding.
diff --git a/CHANGES.txt b/CHANGES.txt
index 0dfbdfc..a94ed17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@
NEW FEATURES
+ AVRO-1704: Java: Add support for single-message encoding. (blue)
+
OPTIMIZATIONS
IMPROVEMENTS
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java b/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java
new file mode 100644
index 0000000..38c0001
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.AvroRuntimeException;
+
+/**
+ * Exception thrown by a {@link MessageDecoder} when a message header is not
+ * recognized.
+ * <p>
+ * This usually indicates that the encoded bytes were not an Avro message.
+ */
+public class BadHeaderException extends AvroRuntimeException {
+ public BadHeaderException(String message) {
+ super(message);
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java
new file mode 100644
index 0000000..11a7336
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.collect.MapMaker;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericData;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Map;
+
+/**
+ * A {@link MessageDecoder} that reads a binary-encoded datum. This checks for
+ * the datum header and decodes the payload with the schema that corresponds to
+ * the 8-byte schema fingerprint.
+ * <p>
+ * Instances can decode message payloads for known {@link Schema schemas}, which
+ * are schemas added using {@link #addSchema(Schema)}, schemas resolved by the
+ * {@link SchemaStore} passed to the constructor, or the expected schema passed
+ * to the constructor. Messages encoded using an unknown schema will cause
+ * instances to throw a {@link MissingSchemaException}.
+ * <p>
+ * It is safe to continue using instances of this class after {@link #decode}
+ * throws {@link BadHeaderException} or {@link MissingSchemaException}.
+ * <p>
+ * This class is thread-safe.
+ */
+public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
+
+ private static final ThreadLocal<byte[]> HEADER_BUFFER =
+ new ThreadLocal<byte[]>() {
+ @Override
+ protected byte[] initialValue() {
+ return new byte[10];
+ }
+ };
+
+ private static final ThreadLocal<ByteBuffer> FP_BUFFER =
+ new ThreadLocal<ByteBuffer>() {
+ @Override
+ protected ByteBuffer initialValue() {
+ byte[] header = HEADER_BUFFER.get();
+ return ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN);
+ }
+ };
+
+ private final GenericData model;
+ private final Schema readSchema;
+ private final SchemaStore resolver;
+
+ private final Map<Long, RawMessageDecoder<D>> codecByFingerprint =
+ new MapMaker().makeMap();
+
+ /**
+ * Creates a new {@link BinaryMessageEncoder} that uses the given
+ * {@link GenericData data model} to construct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * The {@code readSchema} is as used the expected schema (read schema). Datum
+ * instances created by this class will are described by the expected schema.
+ * <p>
+ * The schema used to decode incoming buffers is determined by the schema
+ * fingerprint encoded in the message header. This class can decode messages
+ * that were encoded using the {@code readSchema} and other schemas that are
+ * added using {@link #addSchema(Schema)}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param readSchema the {@link Schema} used to construct datum instances
+ */
+ public BinaryMessageDecoder(GenericData model, Schema readSchema) {
+ this(model, readSchema, null);
+ }
+
+ /**
+ * Creates a new {@link BinaryMessageEncoder} that uses the given
+ * {@link GenericData data model} to construct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * The {@code readSchema} is used as the expected schema (read schema). Datum
+ * instances created by this class will are described by the expected schema.
+ * <p>
+ * The schema used to decode incoming buffers is determined by the schema
+ * fingerprint encoded in the message header. This class can decode messages
+ * that were encoded using the {@code readSchema}, other schemas that are
+ * added using {@link #addSchema(Schema)}, or schemas returned by the
+ * {@code resolver}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param readSchema the {@link Schema} used to construct datum instances
+ * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+ */
+ public BinaryMessageDecoder(GenericData model, Schema readSchema,
+ SchemaStore resolver) {
+ this.model = model;
+ this.readSchema = readSchema;
+ this.resolver = resolver;
+ addSchema(readSchema);
+ }
+
+ /**
+ * Adds a {@link Schema} that can be used to decode buffers.
+ *
+ * @param writeSchema a {@link Schema} to use when decoding buffers
+ */
+ public void addSchema(Schema writeSchema) {
+ long fp = SchemaNormalization.parsingFingerprint64(writeSchema);
+ codecByFingerprint.put(fp,
+ new RawMessageDecoder<D>(model, writeSchema, readSchema));
+ }
+
+ private RawMessageDecoder<D> getDecoder(long fp) {
+ RawMessageDecoder<D> decoder = codecByFingerprint.get(fp);
+ if (decoder != null) {
+ return decoder;
+ }
+
+ if (resolver != null) {
+ Schema writeSchema = resolver.findByFingerprint(fp);
+ if (writeSchema != null) {
+ addSchema(writeSchema);
+ return codecByFingerprint.get(fp);
+ }
+ }
+
+ throw new MissingSchemaException(
+ "Cannot resolve schema for fingerprint: " + fp);
+ }
+
+ @Override
+ public D decode(InputStream stream, D reuse) throws IOException {
+ byte[] header = HEADER_BUFFER.get();
+ try {
+ if (!readFully(stream, header)) {
+ throw new BadHeaderException("Not enough header bytes");
+ }
+ } catch (IOException e) {
+ throw new IOException("Failed to read header and fingerprint bytes", e);
+ }
+
+ if (! (BinaryMessageEncoder.V1_HEADER[0] == header[0])
+ && BinaryMessageEncoder.V1_HEADER[1] == header[1]) {
+ throw new BadHeaderException(String.format(
+ "Unrecognized header bytes: 0x%h%h",
+ header[0], header[1]));
+ }
+
+ RawMessageDecoder<D> decoder = getDecoder(FP_BUFFER.get().getLong(2));
+
+ return decoder.decode(stream, reuse);
+ }
+
+ /**
+ * Reads a buffer from a stream, making multiple read calls if necessary.
+ *
+ * @param stream an InputStream to read from
+ * @param bytes a buffer
+ * @return true if the buffer is complete, false otherwise (stream ended)
+ * @throws IOException
+ */
+ private boolean readFully(InputStream stream, byte[] bytes)
+ throws IOException {
+ int pos = 0;
+ int bytesRead;
+ while ((bytes.length - pos) > 0 &&
+ (bytesRead = stream.read(bytes, pos, bytes.length - pos)) > 0) {
+ pos += bytesRead;
+ }
+ return (pos == bytes.length);
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java
new file mode 100644
index 0000000..3cf3d0c
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.primitives.Bytes;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericData;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * A {@link MessageEncoder} that adds a header and 8-byte schema fingerprint to
+ * each datum encoded as binary.
+ * <p>
+ * This class is thread-safe.
+ */
+public class BinaryMessageEncoder<D> implements MessageEncoder<D> {
+
+ static final byte[] V1_HEADER = new byte[] {(byte) 0xC3, (byte) 0x01};
+
+ private final RawMessageEncoder<D> writeCodec;
+
+ /**
+ * Creates a new {@link BinaryMessageEncoder} that uses the given
+ * {@link GenericData data model} to deconstruct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * Buffers returned by {@link #encode(D)} are copied and will not be modified
+ * by future calls to {@code encode}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param schema the {@link Schema} for datum instances
+ */
+ public BinaryMessageEncoder(GenericData model, Schema schema) {
+ this(model, schema, true);
+ }
+
+ /**
+ * Creates a new {@link BinaryMessageEncoder} that uses the given
+ * {@link GenericData data model} to deconstruct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * If {@code shouldCopy} is true, then buffers returned by {@link #encode(D)}
+ * are copied and will not be modified by future calls to {@code encode}.
+ * <p>
+ * If {@code shouldCopy} is false, then buffers returned by {@code encode}
+ * wrap a thread-local buffer that can be reused by future calls to
+ * {@code encode}, but may not be. Callers should only set {@code shouldCopy}
+ * to false if the buffer will be copied before the current thread's next call
+ * to {@code encode}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param schema the {@link Schema} for datum instances
+ * @param shouldCopy whether to copy buffers before returning encoded results
+ */
+ public BinaryMessageEncoder(GenericData model, Schema schema,
+ boolean shouldCopy) {
+ this.writeCodec = new V1MessageEncoder<D>(model, schema, shouldCopy);
+ }
+
+ @Override
+ public ByteBuffer encode(D datum) throws IOException {
+ return writeCodec.encode(datum);
+ }
+
+ @Override
+ public void encode(D datum, OutputStream stream) throws IOException {
+ writeCodec.encode(datum, stream);
+ }
+
+ /**
+ * This is a RawDatumEncoder that adds the V1 header to the outgoing buffer.
+ * BinaryDatumEncoder wraps this class to avoid confusion over what it does.
+ * It should not have an "is a" relationship with RawDatumEncoder because it
+ * adds the extra bytes.
+ */
+ private static class V1MessageEncoder<D> extends RawMessageEncoder<D> {
+ private final byte[] headerBytes;
+
+ V1MessageEncoder(GenericData model, Schema schema, boolean shouldCopy) {
+ super(model, schema, shouldCopy);
+ this.headerBytes = getWriteHeader(schema);
+ }
+
+ @Override
+ public void encode(D datum, OutputStream stream) throws IOException {
+ stream.write(headerBytes);
+ super.encode(datum, stream);
+ }
+
+ private static byte[] getWriteHeader(Schema schema) {
+ try {
+ byte[] fp = SchemaNormalization
+ .parsingFingerprint("CRC-64-AVRO", schema);
+ return Bytes.concat(V1_HEADER, fp);
+ } catch (NoSuchAlgorithmException e) {
+ throw new AvroRuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java
new file mode 100644
index 0000000..bc86d12
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.util.ReusableByteArrayInputStream;
+import org.apache.avro.util.ReusableByteBufferInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Deserializes a single datum from a ByteBuffer, byte array, or InputStream.
+ * @param <D> a datum class
+ */
+public interface MessageDecoder<D> {
+
+ /**
+ * Deserialize a single datum from an InputStream.
+ *
+ * @param stream stream to read from
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(InputStream stream) throws IOException;
+
+ /**
+ * Deserialize a single datum from an InputStream.
+ *
+ * @param stream stream to read from
+ * @param reuse a datum instance to reuse, avoiding instantiation if possible
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(InputStream stream, D reuse) throws IOException;
+
+ /**
+ * Deserialize a single datum from a ByteBuffer.
+ *
+ * @param encoded a ByteBuffer containing an encoded datum
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(ByteBuffer encoded) throws IOException;
+
+ /**
+ * Deserialize a single datum from a ByteBuffer.
+ *
+ * @param encoded a ByteBuffer containing an encoded datum
+ * @param reuse a datum instance to reuse, avoiding instantiation if possible
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(ByteBuffer encoded, D reuse) throws IOException;
+
+ /**
+ * Deserialize a single datum from a byte array.
+ *
+ * @param encoded a byte array containing an encoded datum
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(byte[] encoded) throws IOException;
+
+ /**
+ * Deserialize a single datum from a byte array.
+ *
+ * @param encoded a byte array containing an encoded datum
+ * @param reuse a datum instance to reuse, avoiding instantiation if possible
+ * @return a datum read from the stream
+ * @throws BadHeaderException If the payload's header is not recognized.
+ * @throws MissingSchemaException If the payload's schema cannot be found.
+ * @throws IOException
+ */
+ D decode(byte[] encoded, D reuse) throws IOException;
+
+ /**
+ * Base class for {@link MessageEncoder} implementations that provides default
+ * implementations for most of the {@code DatumEncoder} API.
+ * <p>
+ * Implementations provided by this base class are thread-safe.
+ *
+ * @param <D> a datum class
+ */
+ abstract class BaseDecoder<D> implements MessageDecoder<D> {
+
+ private static final ThreadLocal<ReusableByteArrayInputStream>
+ BYTE_ARRAY_IN = new ThreadLocal<ReusableByteArrayInputStream>() {
+ @Override
+ protected ReusableByteArrayInputStream initialValue() {
+ return new ReusableByteArrayInputStream();
+ }
+ };
+
+ private static final ThreadLocal<ReusableByteBufferInputStream>
+ BYTE_BUFFER_IN = new ThreadLocal<ReusableByteBufferInputStream>() {
+ @Override
+ protected ReusableByteBufferInputStream initialValue() {
+ return new ReusableByteBufferInputStream();
+ }
+ };
+
+ @Override
+ public D decode(InputStream stream) throws IOException {
+ return decode(stream, null);
+ }
+
+ @Override
+ public D decode(ByteBuffer encoded) throws IOException {
+ return decode(encoded, null);
+ }
+
+ @Override
+ public D decode(byte[] encoded) throws IOException {
+ return decode(encoded, null);
+ }
+
+ @Override
+ public D decode(ByteBuffer encoded, D reuse) throws IOException {
+ ReusableByteBufferInputStream in = BYTE_BUFFER_IN.get();
+ in.setByteBuffer(encoded);
+ return decode(in, reuse);
+ }
+
+ @Override
+ public D decode(byte[] encoded, D reuse) throws IOException {
+ ReusableByteArrayInputStream in = BYTE_ARRAY_IN.get();
+ in.setByteArray(encoded, 0, encoded.length);
+ return decode(in, reuse);
+ }
+
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java
new file mode 100644
index 0000000..60bfb79
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializes an individual datum as a ByteBuffer or to an OutputStream.
+ * @param <D> a datum class
+ */
+public interface MessageEncoder<D> {
+
+ /**
+ * Serialize a single datum to a ByteBuffer.
+ *
+ * @param datum a datum
+ * @return a ByteBuffer containing the serialized datum
+ * @throws IOException
+ */
+ ByteBuffer encode(D datum) throws IOException;
+
+ /**
+ * Serialize a single datum to an OutputStream.
+ *
+ * @param datum a datum
+ * @param stream an OutputStream to serialize the datum to
+ * @throws IOException
+ */
+ void encode(D datum, OutputStream stream) throws IOException;
+
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java b/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java
new file mode 100644
index 0000000..a3b89fd
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.AvroRuntimeException;
+
+/**
+ * Exception thrown by a {@link MessageDecoder} when the message is encoded
+ * using an unknown {@link org.apache.avro.Schema}.
+ * <p>
+ * Using a {@link SchemaStore} to provide schemas to the decoder can avoid this
+ * problem.
+ */
+public class MissingSchemaException extends AvroRuntimeException {
+ public MissingSchemaException(String message) {
+ super(message);
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java
new file mode 100644
index 0000000..52a7c2e
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link MessageDecoder} that deserializes from raw datum bytes.
+ * <p>
+ * This class uses the schema passed to its constructor when decoding buffers.
+ * To decode buffers that have different schemas, use
+ * {@link BinaryMessageEncoder} and {@link BinaryMessageDecoder}.
+ * <p>
+ * This will not throw {@link BadHeaderException} because it expects no header,
+ * and will not throw {@link MissingSchemaException} because it always uses the
+ * read schema from its constructor.
+ * <p>
+ * This class is thread-safe.
+ */
+public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
+
+ private static final ThreadLocal<BinaryDecoder> DECODER =
+ new ThreadLocal<BinaryDecoder>();
+
+ private final Schema writeSchema;
+ private final Schema readSchema;
+ private final DatumReader<D> reader;
+
+ /**
+ * Creates a new {@link RawMessageDecoder} that uses the given
+ * {@link GenericData data model} to construct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * The {@code schema} is used as both the expected schema (read schema) and
+ * for the schema of payloads that are decoded (written schema).
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param schema the {@link Schema} used to construct datum instances and to
+ * decode buffers.
+ */
+ public RawMessageDecoder(GenericData model, Schema schema) {
+ this(model, schema, schema);
+ }
+
+ /**
+ * Creates a new {@link RawMessageDecoder} that uses the given
+ * {@link GenericData data model} to construct datum instances described by
+ * the {@link Schema readSchema}.
+ * <p>
+ * The {@code readSchema} is used for the expected schema and the
+ * {@code writeSchema} is the schema used to decode buffers. The
+ * {@code writeSchema} must be the schema that was used to encode all buffers
+ * decoded by this class.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param readSchema the {@link Schema} used to construct datum instances
+ * @param writeSchema the {@link Schema} used to decode buffers
+ */
+ public RawMessageDecoder(GenericData model, Schema writeSchema,
+ Schema readSchema) {
+ this.writeSchema = writeSchema;
+ this.readSchema = readSchema;
+ this.reader = model.createDatumReader(this.writeSchema, this.readSchema);
+ }
+
+ @Override
+ public D decode(InputStream stream, D reuse) {
+ BinaryDecoder decoder = DecoderFactory.get()
+ .directBinaryDecoder(stream, DECODER.get());
+ DECODER.set(decoder);
+ try {
+ return reader.read(reuse, decoder);
+ } catch (IOException e) {
+ throw new AvroRuntimeException("Decoding datum failed", e);
+ }
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java
new file mode 100644
index 0000000..07ed861
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link MessageEncoder} that encodes only a datum's bytes, without additional
+ * information (such as a schema fingerprint).
+ * <p>
+ * This class is thread-safe.
+ */
+public class RawMessageEncoder<D> implements MessageEncoder<D> {
+
+ private static final ThreadLocal<BufferOutputStream> TEMP =
+ new ThreadLocal<BufferOutputStream>() {
+ @Override
+ protected BufferOutputStream initialValue() {
+ return new BufferOutputStream();
+ }
+ };
+
+ private static final ThreadLocal<BinaryEncoder> ENCODER =
+ new ThreadLocal<BinaryEncoder>();
+
+ private final Schema writeSchema;
+ private final boolean copyOutputBytes;
+ private final DatumWriter<D> writer;
+
+ /**
+ * Creates a new {@link RawMessageEncoder} that uses the given
+ * {@link GenericData data model} to deconstruct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * Buffers returned by {@link #encode(D)} are copied and will not be modified
+ * by future calls to {@code encode}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param schema the {@link Schema} for datum instances
+ */
+ public RawMessageEncoder(GenericData model, Schema schema) {
+ this(model, schema, true);
+ }
+
+ /**
+ * Creates a new {@link RawMessageEncoder} that uses the given
+ * {@link GenericData data model} to deconstruct datum instances described by
+ * the {@link Schema schema}.
+ * <p>
+ * If {@code shouldCopy} is true, then buffers returned by {@link #encode(D)}
+ * are copied and will not be modified by future calls to {@code encode}.
+ * <p>
+ * If {@code shouldCopy} is false, then buffers returned by {@code encode}
+ * wrap a thread-local buffer that can be reused by future calls to
+ * {@code encode}, but may not be. Callers should only set {@code shouldCopy}
+ * to false if the buffer will be copied before the current thread's next call
+ * to {@code encode}.
+ *
+ * @param model the {@link GenericData data model} for datum instances
+ * @param schema the {@link Schema} for datum instances
+ * @param shouldCopy whether to copy buffers before returning encoded results
+ */
+ public RawMessageEncoder(GenericData model, Schema schema, boolean shouldCopy) {
+ this.writeSchema = schema;
+ this.copyOutputBytes = shouldCopy;
+ this.writer = model.createDatumWriter(this.writeSchema);
+ }
+
+ @Override
+ public ByteBuffer encode(D datum) throws IOException {
+ BufferOutputStream temp = TEMP.get();
+ temp.reset();
+
+ encode(datum, temp);
+
+ if (copyOutputBytes) {
+ return temp.toBufferWithCopy();
+ } else {
+ return temp.toBufferWithoutCopy();
+ }
+ }
+
+ @Override
+ public void encode(D datum, OutputStream stream) throws IOException {
+ BinaryEncoder encoder = EncoderFactory.get()
+ .directBinaryEncoder(stream, ENCODER.get());
+ ENCODER.set(encoder);
+ writer.write(datum, encoder);
+ encoder.flush();
+ }
+
+ private static class BufferOutputStream extends ByteArrayOutputStream {
+ BufferOutputStream() {
+ }
+
+ ByteBuffer toBufferWithoutCopy() {
+ return ByteBuffer.wrap(buf, 0, count);
+ }
+
+ ByteBuffer toBufferWithCopy() {
+ return ByteBuffer.wrap(toByteArray());
+ }
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java b/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java
new file mode 100644
index 0000000..6e89b52
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.collect.MapMaker;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import java.util.Map;
+
+/**
+ * Interface for classes that can provide avro schemas by fingerprint.
+ */
+public interface SchemaStore {
+
+ /**
+ * Retrieves a fingerprint by its AVRO-CRC-64 fingerprint.
+ * @param fingerprint an AVRO-CRC-64 fingerprint long
+ * @return a Schema with the given fingerprint, or null
+ */
+ Schema findByFingerprint(long fingerprint);
+
+ /**
+ * A map-based cache of schemas by AVRO-CRC-64 fingerprint.
+ * <p>
+ * This class is thread-safe.
+ */
+ class Cache implements SchemaStore {
+ private final Map<Long, Schema> schemas = new MapMaker().makeMap();
+
+ /**
+ * Adds a schema to this cache that can be retrieved using its AVRO-CRC-64
+ * fingerprint.
+ *
+ * @param schema a {@link Schema}
+ */
+ public void addSchema(Schema schema) {
+ long fp = SchemaNormalization.parsingFingerprint64(schema);
+ schemas.put(fp, schema);
+ }
+
+ @Override
+ public Schema findByFingerprint(long fingerprint) {
+ return schemas.get(fingerprint);
+ }
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java
new file mode 100644
index 0000000..6fd2ae4
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.util;
+
+import java.io.ByteArrayInputStream;
+
+public class ReusableByteArrayInputStream extends ByteArrayInputStream {
+ public ReusableByteArrayInputStream() {
+ super(new byte[0]);
+ }
+
+ public void setByteArray(byte[] buf, int offset, int length) {
+ this.buf = buf;
+ this.pos = offset;
+ this.count = Math.min(offset + length, buf.length);
+ this.mark = offset;
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java
new file mode 100644
index 0000000..eff7fdc
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class ReusableByteBufferInputStream extends InputStream {
+
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
+ private ByteBuffer buffer = EMPTY_BUFFER;
+ private int mark = 0;
+
+ public void setByteBuffer(ByteBuffer buf) {
+ // do not modify the buffer that is passed in
+ this.buffer = buf.duplicate();
+ this.mark = buf.position();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (buffer.hasRemaining()) {
+ return buffer.get() & 0xff;
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (buffer.remaining() <= 0) {
+ return -1;
+ }
+ // allow IndexOutOfBoundsException to be thrown by ByteBuffer#get
+ int bytesToRead = Math.min(len, buffer.remaining());
+ buffer.get(b, off, bytesToRead);
+ return bytesToRead;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (n <= 0) {
+ // n may be negative and results in skipping 0 bytes, according to javadoc
+ return 0;
+ }
+
+ // this catches n > Integer.MAX_VALUE
+ int bytesToSkip = n > buffer.remaining() ? buffer.remaining() : (int) n;
+ buffer.position(buffer.position() + bytesToSkip);
+ return bytesToSkip;
+ }
+
+ @Override
+ public synchronized void mark(int readLimit) {
+ // readLimit is ignored. there is no requirement to implement readLimit, it
+ // is a way for implementations to avoid buffering too much. since all data
+ // for this stream is held in memory, this has no need for such a limit.
+ this.mark = buffer.position();
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ buffer.position(mark);
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+}
diff --git a/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java b/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java
new file mode 100644
index 0000000..47656b8
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class TestBinaryMessageEncoding {
+ public static final Schema SCHEMA_V1 = SchemaBuilder.record("TestRecord")
+ .fields()
+ .requiredInt("id")
+ .optionalString("msg")
+ .endRecord();
+
+ public static final GenericRecordBuilder V1_BUILDER =
+ new GenericRecordBuilder(SCHEMA_V1);
+
+ public static final List<Record> V1_RECORDS = Arrays.asList(
+ V1_BUILDER.set("id", 1).set("msg", "m-1").build(),
+ V1_BUILDER.set("id", 2).set("msg", "m-2").build(),
+ V1_BUILDER.set("id", 4).set("msg", "m-4").build(),
+ V1_BUILDER.set("id", 6).set("msg", "m-6").build()
+ );
+
+ public static final Schema SCHEMA_V2 = SchemaBuilder.record("TestRecord")
+ .fields()
+ .requiredLong("id")
+ .name("message").aliases("msg").type().optional().stringType()
+ .optionalDouble("data")
+ .endRecord();
+
+ public static final GenericRecordBuilder V2_BUILDER =
+ new GenericRecordBuilder(SCHEMA_V2);
+
+ public static final List<Record> V2_RECORDS = Arrays.asList(
+ V2_BUILDER.set("id", 3L).set("message", "m-3").set("data", 12.3).build(),
+ V2_BUILDER.set("id", 5L).set("message", "m-5").set("data", 23.4).build(),
+ V2_BUILDER.set("id", 7L).set("message", "m-7").set("data", 34.5).build(),
+ V2_BUILDER.set("id", 8L).set("message", "m-8").set("data", 35.6).build()
+ );
+
+ @Test
+ public void testByteBufferRoundTrip() throws Exception {
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+ MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+
+ Record copy = decoder.decode(encoder.encode(V2_RECORDS.get(0)));
+
+ Assert.assertTrue("Copy should not be the same object",
+ copy != V2_RECORDS.get(0));
+ Assert.assertEquals("Record should be identical after round-trip",
+ V2_RECORDS.get(0), copy);
+ }
+
+ @Test
+ public void testSchemaEvolution() throws Exception {
+ List<ByteBuffer> buffers = Lists.newArrayList();
+ List<Record> records = Ordering.usingToString().sortedCopy(
+ Iterables.concat(V1_RECORDS, V2_RECORDS));
+
+ MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+ MessageEncoder<Record> v2Encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+
+ for (Record record : records) {
+ if (record.getSchema() == SCHEMA_V1) {
+ buffers.add(v1Encoder.encode(record));
+ } else {
+ buffers.add(v2Encoder.encode(record));
+ }
+ }
+
+ Set<Record> allAsV2 = Sets.newHashSet(V2_RECORDS);
+ allAsV2.add(
+ V2_BUILDER.set("id", 1L).set("message", "m-1").clear("data").build());
+ allAsV2.add(
+ V2_BUILDER.set("id", 2L).set("message", "m-2").clear("data").build());
+ allAsV2.add(
+ V2_BUILDER.set("id", 4L).set("message", "m-4").clear("data").build());
+ allAsV2.add(
+ V2_BUILDER.set("id", 6L).set("message", "m-6").clear("data").build());
+
+ BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+ v2Decoder.addSchema(SCHEMA_V1);
+
+ Set<Record> decodedUsingV2 = Sets.newHashSet();
+ for (ByteBuffer buffer : buffers) {
+ decodedUsingV2.add(v2Decoder.decode(buffer));
+ }
+
+ Assert.assertEquals(allAsV2, decodedUsingV2);
+ }
+
+ @Test(expected = MissingSchemaException.class)
+ public void testCompatibleReadFailsWithoutSchema() throws Exception {
+ MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+ BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+
+ ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(3));
+
+ v2Decoder.decode(v1Buffer);
+ }
+
+ @Test
+ public void testCompatibleReadWithSchema() throws Exception {
+ MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+ BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+ v2Decoder.addSchema(SCHEMA_V1);
+
+ ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(3));
+
+ Record record = v2Decoder.decode(v1Buffer);
+
+ Assert.assertEquals(
+ V2_BUILDER.set("id", 6L).set("message", "m-6").clear("data").build(),
+ record);
+ }
+
+ @Test
+ public void testCompatibleReadWithSchemaFromLookup() throws Exception {
+ MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+
+ SchemaStore.Cache schemaCache = new SchemaStore.Cache();
+ schemaCache.addSchema(SCHEMA_V1);
+ BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2, schemaCache);
+
+ ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(2));
+
+ Record record = v2Decoder.decode(v1Buffer);
+
+ Assert.assertEquals(
+ V2_BUILDER.set("id", 4L).set("message", "m-4").clear("data").build(),
+ record);
+ }
+
+ @Test
+ public void testBufferReuse() throws Exception {
+ // This test depends on the serialized version of record 1 being smaller or
+ // the same size as record 0 so that the reused ByteArrayOutputStream won't
+ // expand its internal buffer.
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1, false);
+
+ ByteBuffer b0 = encoder.encode(V1_RECORDS.get(0));
+ ByteBuffer b1 = encoder.encode(V1_RECORDS.get(1));
+
+ Assert.assertEquals(b0.array(), b1.array());
+
+ MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+ Assert.assertEquals("Buffer was reused, decode(b0) should be record 1",
+ V1_RECORDS.get(1), decoder.decode(b0));
+ }
+
+ @Test
+ public void testBufferCopy() throws Exception {
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+
+ ByteBuffer b0 = encoder.encode(V1_RECORDS.get(0));
+ ByteBuffer b1 = encoder.encode(V1_RECORDS.get(1));
+
+ Assert.assertNotEquals(b0.array(), b1.array());
+
+ MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V1);
+ // bytes are not changed by reusing the encoder
+ Assert.assertEquals("Buffer was copied, decode(b0) should be record 0",
+ V1_RECORDS.get(0), decoder.decode(b0));
+ }
+
+ @Test(expected = AvroRuntimeException.class)
+ public void testByteBufferMissingPayload() throws Exception {
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+ MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+
+ ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
+
+ buffer.limit(12);
+
+ decoder.decode(buffer);
+ }
+
+ @Test(expected = BadHeaderException.class)
+ public void testByteBufferMissingFullHeader() throws Exception {
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+ MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+ GenericData.get(), SCHEMA_V2);
+
+ ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
+
+ buffer.limit(8);
+
+ decoder.decode(buffer);
+ }
+
+}
diff --git a/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java b/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
index fd77f2d..25d918f 100644
--- a/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
+++ b/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
@@ -19,6 +19,7 @@
package org.apache.avro;
import com.google.common.collect.MapMaker;
+import com.google.common.primitives.Bytes;
class GuavaClasses {
/*
@@ -27,5 +28,6 @@
*/
static {
MapMaker.class.getName();
+ Bytes.class.getName();
}
}