AVRO-1704: Java: Add support for single-message encoding.
diff --git a/CHANGES.txt b/CHANGES.txt
index 0dfbdfc..a94ed17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@
 
   NEW FEATURES
 
+    AVRO-1704: Java: Add support for single-message encoding. (blue)
+
   OPTIMIZATIONS
 
   IMPROVEMENTS
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java b/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java
new file mode 100644
index 0000000..38c0001
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.AvroRuntimeException;
+
+/**
+ * Exception thrown by a {@link MessageDecoder} when a message header is not
+ * recognized.
+ * <p>
+ * This usually indicates that the encoded bytes were not an Avro message.
+ */
+public class BadHeaderException extends AvroRuntimeException {
+  public BadHeaderException(String message) {
+    super(message);
+  }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java
new file mode 100644
index 0000000..11a7336
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.collect.MapMaker;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericData;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Map;
+
+/**
+ * A {@link MessageDecoder} that reads a binary-encoded datum. This checks for
+ * the datum header and decodes the payload with the schema that corresponds to
+ * the 8-byte schema fingerprint.
+ * <p>
+ * Instances can decode message payloads for known {@link Schema schemas}, which
+ * are schemas added using {@link #addSchema(Schema)}, schemas resolved by the
+ * {@link SchemaStore} passed to the constructor, or the expected schema passed
+ * to the constructor. Messages encoded using an unknown schema will cause
+ * instances to throw a {@link MissingSchemaException}.
+ * <p>
+ * It is safe to continue using instances of this class after {@link #decode}
+ * throws {@link BadHeaderException} or {@link MissingSchemaException}.
+ * <p>
+ * This class is thread-safe.
+ */
+public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
+
+  private static final ThreadLocal<byte[]> HEADER_BUFFER =
+      new ThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+          return new byte[10];
+        }
+      };
+
+  private static final ThreadLocal<ByteBuffer> FP_BUFFER =
+      new ThreadLocal<ByteBuffer>() {
+        @Override
+        protected ByteBuffer initialValue() {
+          byte[] header = HEADER_BUFFER.get();
+          return ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN);
+        }
+      };
+
+  private final GenericData model;
+  private final Schema readSchema;
+  private final SchemaStore resolver;
+
+  private final Map<Long, RawMessageDecoder<D>> codecByFingerprint =
+      new MapMaker().makeMap();
+
+  /**
+   * Creates a new {@link BinaryMessageEncoder} that uses the given
+   * {@link GenericData data model} to construct datum instances described by
+   * the {@link Schema schema}.
+   * <p>
+   * The {@code readSchema} is as used the expected schema (read schema). Datum
+   * instances created by this class will are described by the expected schema.
+   * <p>
+   * The schema used to decode incoming buffers is determined by the schema
+   * fingerprint encoded in the message header. This class can decode messages
+   * that were encoded using the {@code readSchema} and other schemas that are
+   * added using {@link #addSchema(Schema)}.
+   *
+   * @param model the {@link GenericData data model} for datum instances
+   * @param readSchema the {@link Schema} used to construct datum instances
+   */
+  public BinaryMessageDecoder(GenericData model, Schema readSchema) {
+    this(model, readSchema, null);
+  }
+
+  /**
+   * Creates a new {@link BinaryMessageEncoder} that uses the given
+   * {@link GenericData data model} to construct datum instances described by
+   * the {@link Schema schema}.
+   * <p>
+   * The {@code readSchema} is used as the expected schema (read schema). Datum
+   * instances created by this class will are described by the expected schema.
+   * <p>
+   * The schema used to decode incoming buffers is determined by the schema
+   * fingerprint encoded in the message header. This class can decode messages
+   * that were encoded using the {@code readSchema}, other schemas that are
+   * added using {@link #addSchema(Schema)}, or schemas returned by the
+   * {@code resolver}.
+   *
+   * @param model the {@link GenericData data model} for datum instances
+   * @param readSchema the {@link Schema} used to construct datum instances
+   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+   */
+  public BinaryMessageDecoder(GenericData model, Schema readSchema,
+                              SchemaStore resolver) {
+    this.model = model;
+    this.readSchema = readSchema;
+    this.resolver = resolver;
+    addSchema(readSchema);
+  }
+
+  /**
+   * Adds a {@link Schema} that can be used to decode buffers.
+   *
+   * @param writeSchema a {@link Schema} to use when decoding buffers
+   */
+  public void addSchema(Schema writeSchema) {
+    long fp = SchemaNormalization.parsingFingerprint64(writeSchema);
+    codecByFingerprint.put(fp,
+        new RawMessageDecoder<D>(model, writeSchema, readSchema));
+  }
+
+  private RawMessageDecoder<D> getDecoder(long fp) {
+    RawMessageDecoder<D> decoder = codecByFingerprint.get(fp);
+    if (decoder != null) {
+      return decoder;
+    }
+
+    if (resolver != null) {
+      Schema writeSchema = resolver.findByFingerprint(fp);
+      if (writeSchema != null) {
+        addSchema(writeSchema);
+        return codecByFingerprint.get(fp);
+      }
+    }
+
+    throw new MissingSchemaException(
+        "Cannot resolve schema for fingerprint: " + fp);
+  }
+
+  @Override
+  public D decode(InputStream stream, D reuse) throws IOException {
+    byte[] header = HEADER_BUFFER.get();
+    try {
+      if (!readFully(stream, header)) {
+        throw new BadHeaderException("Not enough header bytes");
+      }
+    } catch (IOException e) {
+      throw new IOException("Failed to read header and fingerprint bytes", e);
+    }
+
+    if (! (BinaryMessageEncoder.V1_HEADER[0] == header[0])
+        && BinaryMessageEncoder.V1_HEADER[1] == header[1]) {
+      throw new BadHeaderException(String.format(
+          "Unrecognized header bytes: 0x%h%h",
+          header[0], header[1]));
+    }
+
+    RawMessageDecoder<D> decoder = getDecoder(FP_BUFFER.get().getLong(2));
+
+    return decoder.decode(stream, reuse);
+  }
+
+  /**
+   * Reads a buffer from a stream, making multiple read calls if necessary.
+   *
+   * @param stream an InputStream to read from
+   * @param bytes a buffer
+   * @return true if the buffer is complete, false otherwise (stream ended)
+   * @throws IOException
+   */
+  private boolean readFully(InputStream stream, byte[] bytes)
+      throws IOException {
+    int pos = 0;
+    int bytesRead;
+    while ((bytes.length - pos) > 0 &&
+        (bytesRead = stream.read(bytes, pos, bytes.length - pos)) > 0) {
+      pos += bytesRead;
+    }
+    return (pos == bytes.length);
+  }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java
new file mode 100644
index 0000000..3cf3d0c
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.primitives.Bytes;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericData;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * A {@link MessageEncoder} that adds a header and 8-byte schema fingerprint to
+ * each datum encoded as binary.
+ * <p>
+ * This class is thread-safe.
+ */
+public class BinaryMessageEncoder<D> implements MessageEncoder<D> {
+
+  static final byte[] V1_HEADER = new byte[] {(byte) 0xC3, (byte) 0x01};
+
+  private final RawMessageEncoder<D> writeCodec;
+
+  /**
+   * Creates a new {@link BinaryMessageEncoder} that uses the given
+   * {@link GenericData data model} to deconstruct datum instances described by
+   * the {@link Schema schema}.
+   * <p>
+   * Buffers returned by {@link #encode(D)} are copied and will not be modified
+   * by future calls to {@code encode}.
+   *
+   * @param model the {@link GenericData data model} for datum instances
+   * @param schema the {@link Schema} for datum instances
+   */
+  public BinaryMessageEncoder(GenericData model, Schema schema) {
+    this(model, schema, true);
+  }
+
+  /**
+   * Creates a new {@link BinaryMessageEncoder} that uses the given
+   * {@link GenericData data model} to deconstruct datum instances described by
+   * the {@link Schema schema}.
+   * <p>
+   * If {@code shouldCopy} is true, then buffers returned by {@link #encode(D)}
+   * are copied and will not be modified by future calls to {@code encode}.
+   * <p>
+   * If {@code shouldCopy} is false, then buffers returned by {@code encode}
+   * wrap a thread-local buffer that can be reused by future calls to
+   * {@code encode}, but may not be. Callers should only set {@code shouldCopy}
+   * to false if the buffer will be copied before the current thread's next call
+   * to {@code encode}.
+   *
+   * @param model the {@link GenericData data model} for datum instances
+   * @param schema the {@link Schema} for datum instances
+   * @param shouldCopy whether to copy buffers before returning encoded results
+   */
+  public BinaryMessageEncoder(GenericData model, Schema schema,
+                              boolean shouldCopy) {
+    this.writeCodec = new V1MessageEncoder<D>(model, schema, shouldCopy);
+  }
+
+  @Override
+  public ByteBuffer encode(D datum) throws IOException {
+    return writeCodec.encode(datum);
+  }
+
+  @Override
+  public void encode(D datum, OutputStream stream) throws IOException {
+    writeCodec.encode(datum, stream);
+  }
+
+  /**
+   * This is a RawDatumEncoder that adds the V1 header to the outgoing buffer.
+   * BinaryDatumEncoder wraps this class to avoid confusion over what it does.
+   * It should not have an "is a" relationship with RawDatumEncoder because it
+   * adds the extra bytes.
+   */
+  private static class V1MessageEncoder<D> extends RawMessageEncoder<D> {
+    private final byte[] headerBytes;
+
+    V1MessageEncoder(GenericData model, Schema schema, boolean shouldCopy) {
+      super(model, schema, shouldCopy);
+      this.headerBytes = getWriteHeader(schema);
+    }
+
+    @Override
+    public void encode(D datum, OutputStream stream) throws IOException {
+      stream.write(headerBytes);
+      super.encode(datum, stream);
+    }
+
+    private static byte[] getWriteHeader(Schema schema) {
+      try {
+        byte[] fp = SchemaNormalization
+            .parsingFingerprint("CRC-64-AVRO", schema);
+        return Bytes.concat(V1_HEADER, fp);
+      } catch (NoSuchAlgorithmException e) {
+        throw new AvroRuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java
new file mode 100644
index 0000000..bc86d12
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.util.ReusableByteArrayInputStream;
+import org.apache.avro.util.ReusableByteBufferInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Deserializes a single datum from a ByteBuffer, byte array, or InputStream.
+ * @param <D> a datum class
+ */
+public interface MessageDecoder<D> {
+
+  /**
+   * Deserialize a single datum from an InputStream.
+   *
+   * @param stream stream to read from
+   * @return a datum read from the stream
+   * @throws BadHeaderException If the payload's header is not recognized.
+   * @throws MissingSchemaException If the payload's schema cannot be found.
+   * @throws IOException
+   */
+  D decode(InputStream stream) throws IOException;
+
+  /**
+   * Deserialize a single datum from an InputStream.
+   *
+   * @param stream stream to read from
+   * @param reuse a datum instance to reuse, avoiding instantiation if possible
+   * @return a datum read from the stream
+   * @throws BadHeaderException If the payload's header is not recognized.
+   * @throws MissingSchemaException If the payload's schema cannot be found.
+   * @throws IOException
+   */
+  D decode(InputStream stream, D reuse) throws IOException;
+
+  /**
+   * Deserialize a single datum from a ByteBuffer.
+   *
+   * @param encoded a ByteBuffer containing an encoded datum
+   * @return a datum read from the stream
+   * @throws BadHeaderException If the payload's header is not recognized.
+   * @throws MissingSchemaException If the payload's schema cannot be found.
+   * @throws IOException
+   */
+  D decode(ByteBuffer encoded) throws IOException;
+
+  /**
+   * Deserialize a single datum from a ByteBuffer.
+   *
+   * @param encoded a ByteBuffer containing an encoded datum
+   * @param reuse a datum instance to reuse, avoiding instantiation if possible
+   * @return a datum read from the stream
+   * @throws BadHeaderException If the payload's header is not recognized.
+   * @throws MissingSchemaException If the payload's schema cannot be found.
+   * @throws IOException
+   */
+  D decode(ByteBuffer encoded, D reuse) throws IOException;
+
+  /**
+   * Deserialize a single datum from a byte array.
+   *
+   * @param encoded a byte array containing an encoded datum
+   * @return a datum read from the stream
+   * @throws BadHeaderException If the payload's header is not recognized.
+   * @throws MissingSchemaException If the payload's schema cannot be found.
+   * @throws IOException
+   */
+  D decode(byte[] encoded) throws IOException;
+
+  /**
+   * Deserialize a single datum from a byte array.
+   *
+   * @param encoded a byte array containing an encoded datum
+   * @param reuse a datum instance to reuse, avoiding instantiation if possible
+   * @return a datum read from the stream
+   * @throws BadHeaderException If the payload's header is not recognized.
+   * @throws MissingSchemaException If the payload's schema cannot be found.
+   * @throws IOException
+   */
+  D decode(byte[] encoded, D reuse) throws IOException;
+
+  /**
+   * Base class for {@link MessageEncoder} implementations that provides default
+   * implementations for most of the {@code DatumEncoder} API.
+   * <p>
+   * Implementations provided by this base class are thread-safe.
+   *
+   * @param <D> a datum class
+   */
+  abstract class BaseDecoder<D> implements MessageDecoder<D> {
+
+    private static final ThreadLocal<ReusableByteArrayInputStream>
+        BYTE_ARRAY_IN = new ThreadLocal<ReusableByteArrayInputStream>() {
+          @Override
+          protected ReusableByteArrayInputStream initialValue() {
+            return new ReusableByteArrayInputStream();
+          }
+        };
+
+    private static final ThreadLocal<ReusableByteBufferInputStream>
+        BYTE_BUFFER_IN = new ThreadLocal<ReusableByteBufferInputStream>() {
+          @Override
+          protected ReusableByteBufferInputStream initialValue() {
+            return new ReusableByteBufferInputStream();
+          }
+        };
+
+    @Override
+    public D decode(InputStream stream) throws IOException {
+      return decode(stream, null);
+    }
+
+    @Override
+    public D decode(ByteBuffer encoded) throws IOException {
+      return decode(encoded, null);
+    }
+
+    @Override
+    public D decode(byte[] encoded) throws IOException {
+      return decode(encoded, null);
+    }
+
+    @Override
+    public D decode(ByteBuffer encoded, D reuse) throws IOException {
+      ReusableByteBufferInputStream in = BYTE_BUFFER_IN.get();
+      in.setByteBuffer(encoded);
+      return decode(in, reuse);
+    }
+
+    @Override
+    public D decode(byte[] encoded, D reuse) throws IOException {
+      ReusableByteArrayInputStream in = BYTE_ARRAY_IN.get();
+      in.setByteArray(encoded, 0, encoded.length);
+      return decode(in, reuse);
+    }
+
+  }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java
new file mode 100644
index 0000000..60bfb79
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializes an individual datum as a ByteBuffer or to an OutputStream.
+ * @param <D> a datum class
+ */
+public interface MessageEncoder<D> {
+
+  /**
+   * Serialize a single datum to a ByteBuffer.
+   *
+   * @param datum a datum
+   * @return a ByteBuffer containing the serialized datum
+   * @throws IOException
+   */
+  ByteBuffer encode(D datum) throws IOException;
+
+  /**
+   * Serialize a single datum to an OutputStream.
+   *
+   * @param datum a datum
+   * @param stream an OutputStream to serialize the datum to
+   * @throws IOException
+   */
+  void encode(D datum, OutputStream stream) throws IOException;
+
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java b/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java
new file mode 100644
index 0000000..a3b89fd
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.AvroRuntimeException;
+
+/**
+ * Exception thrown by a {@link MessageDecoder} when the message is encoded
+ * using an unknown {@link org.apache.avro.Schema}.
+ * <p>
+ * Using a {@link SchemaStore} to provide schemas to the decoder can avoid this
+ * problem.
+ */
+public class MissingSchemaException extends AvroRuntimeException {
+  public MissingSchemaException(String message) {
+    super(message);
+  }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java
new file mode 100644
index 0000000..52a7c2e
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link MessageDecoder} that deserializes from raw datum bytes.
+ * <p>
+ * This class uses the schema passed to its constructor when decoding buffers.
+ * To decode buffers that have different schemas, use
+ * {@link BinaryMessageEncoder} and {@link BinaryMessageDecoder}.
+ * <p>
+ * This will not throw {@link BadHeaderException} because it expects no header,
+ * and will not throw {@link MissingSchemaException} because it always uses the
+ * read schema from its constructor.
+ * <p>
+ * This class is thread-safe.
+ */
+public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
+
+  private static final ThreadLocal<BinaryDecoder> DECODER =
+      new ThreadLocal<BinaryDecoder>();
+
+  private final Schema writeSchema;
+  private final Schema readSchema;
+  private final DatumReader<D> reader;
+
+  /**
+   * Creates a new {@link RawMessageDecoder} that uses the given
+   * {@link GenericData data model} to construct datum instances described by
+   * the {@link Schema schema}.
+   * <p>
+   * The {@code schema} is used as both the expected schema (read schema) and
+   * for the schema of payloads that are decoded (written schema).
+   *
+   * @param model the {@link GenericData data model} for datum instances
+   * @param schema the {@link Schema} used to construct datum instances and to
+   *               decode buffers.
+   */
+  public RawMessageDecoder(GenericData model, Schema schema) {
+    this(model, schema, schema);
+  }
+
+  /**
+   * Creates a new {@link RawMessageDecoder} that uses the given
+   * {@link GenericData data model} to construct datum instances described by
+   * the {@link Schema readSchema}.
+   * <p>
+   * The {@code readSchema} is used for the expected schema and the
+   * {@code writeSchema} is the schema used to decode buffers. The
+   * {@code writeSchema} must be the schema that was used to encode all buffers
+   * decoded by this class.
+   *
+   * @param model the {@link GenericData data model} for datum instances
+   * @param readSchema the {@link Schema} used to construct datum instances
+   * @param writeSchema the {@link Schema} used to decode buffers
+   */
+  public RawMessageDecoder(GenericData model, Schema writeSchema,
+                           Schema readSchema) {
+    this.writeSchema = writeSchema;
+    this.readSchema = readSchema;
+    this.reader = model.createDatumReader(this.writeSchema, this.readSchema);
+  }
+
+  @Override
+  public D decode(InputStream stream, D reuse) {
+    BinaryDecoder decoder = DecoderFactory.get()
+        .directBinaryDecoder(stream, DECODER.get());
+    DECODER.set(decoder);
+    try {
+      return reader.read(reuse, decoder);
+    } catch (IOException e) {
+      throw new AvroRuntimeException("Decoding datum failed", e);
+    }
+  }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java
new file mode 100644
index 0000000..07ed861
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link MessageEncoder} that encodes only a datum's bytes, without additional
+ * information (such as a schema fingerprint).
+ * <p>
+ * This class is thread-safe.
+ */
+public class RawMessageEncoder<D> implements MessageEncoder<D> {
+
+  private static final ThreadLocal<BufferOutputStream> TEMP =
+      new ThreadLocal<BufferOutputStream>() {
+        @Override
+        protected BufferOutputStream initialValue() {
+          return new BufferOutputStream();
+        }
+      };
+
+  private static final ThreadLocal<BinaryEncoder> ENCODER =
+      new ThreadLocal<BinaryEncoder>();
+
+  private final Schema writeSchema;
+  private final boolean copyOutputBytes;
+  private final DatumWriter<D> writer;
+
+  /**
+   * Creates a new {@link RawMessageEncoder} that uses the given
+   * {@link GenericData data model} to deconstruct datum instances described by
+   * the {@link Schema schema}.
+   * <p>
+   * Buffers returned by {@link #encode(D)} are copied and will not be modified
+   * by future calls to {@code encode}.
+   *
+   * @param model the {@link GenericData data model} for datum instances
+   * @param schema the {@link Schema} for datum instances
+   */
+  public RawMessageEncoder(GenericData model, Schema schema) {
+    this(model, schema, true);
+  }
+
+  /**
+   * Creates a new {@link RawMessageEncoder} that uses the given
+   * {@link GenericData data model} to deconstruct datum instances described by
+   * the {@link Schema schema}.
+   * <p>
+   * If {@code shouldCopy} is true, then buffers returned by {@link #encode(D)}
+   * are copied and will not be modified by future calls to {@code encode}.
+   * <p>
+   * If {@code shouldCopy} is false, then buffers returned by {@code encode}
+   * wrap a thread-local buffer that can be reused by future calls to
+   * {@code encode}, but may not be. Callers should only set {@code shouldCopy}
+   * to false if the buffer will be copied before the current thread's next call
+   * to {@code encode}.
+   *
+   * @param model the {@link GenericData data model} for datum instances
+   * @param schema the {@link Schema} for datum instances
+   * @param shouldCopy whether to copy buffers before returning encoded results
+   */
+  public RawMessageEncoder(GenericData model, Schema schema, boolean shouldCopy) {
+    this.writeSchema = schema;
+    this.copyOutputBytes = shouldCopy;
+    this.writer = model.createDatumWriter(this.writeSchema);
+  }
+
+  @Override
+  public ByteBuffer encode(D datum) throws IOException {
+    BufferOutputStream temp = TEMP.get();
+    temp.reset();
+
+    encode(datum, temp);
+
+    if (copyOutputBytes) {
+      return temp.toBufferWithCopy();
+    } else {
+      return temp.toBufferWithoutCopy();
+    }
+  }
+
+  @Override
+  public void encode(D datum, OutputStream stream) throws IOException {
+    BinaryEncoder encoder = EncoderFactory.get()
+        .directBinaryEncoder(stream, ENCODER.get());
+    ENCODER.set(encoder);
+    writer.write(datum, encoder);
+    encoder.flush();
+  }
+
+  private static class BufferOutputStream extends ByteArrayOutputStream {
+    BufferOutputStream() {
+    }
+
+    ByteBuffer toBufferWithoutCopy() {
+      return ByteBuffer.wrap(buf, 0, count);
+    }
+
+    ByteBuffer toBufferWithCopy() {
+      return ByteBuffer.wrap(toByteArray());
+    }
+  }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java b/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java
new file mode 100644
index 0000000..6e89b52
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.collect.MapMaker;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import java.util.Map;
+
+/**
+ * Interface for classes that can provide avro schemas by fingerprint.
+ */
+public interface SchemaStore {
+
+  /**
+   * Retrieves a fingerprint by its AVRO-CRC-64 fingerprint.
+   * @param fingerprint an AVRO-CRC-64 fingerprint long
+   * @return a Schema with the given fingerprint, or null
+   */
+  Schema findByFingerprint(long fingerprint);
+
+  /**
+   * A map-based cache of schemas by AVRO-CRC-64 fingerprint.
+   * <p>
+   * This class is thread-safe.
+   */
+  class Cache implements SchemaStore {
+    private final Map<Long, Schema> schemas = new MapMaker().makeMap();
+
+    /**
+     * Adds a schema to this cache that can be retrieved using its AVRO-CRC-64
+     * fingerprint.
+     *
+     * @param schema a {@link Schema}
+     */
+    public void addSchema(Schema schema) {
+      long fp = SchemaNormalization.parsingFingerprint64(schema);
+      schemas.put(fp, schema);
+    }
+
+    @Override
+    public Schema findByFingerprint(long fingerprint) {
+      return schemas.get(fingerprint);
+    }
+  }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java
new file mode 100644
index 0000000..6fd2ae4
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.util;
+
+import java.io.ByteArrayInputStream;
+
+public class ReusableByteArrayInputStream extends ByteArrayInputStream {
+  public ReusableByteArrayInputStream() {
+    super(new byte[0]);
+  }
+
+  public void setByteArray(byte[] buf, int offset, int length) {
+    this.buf = buf;
+    this.pos = offset;
+    this.count = Math.min(offset + length, buf.length);
+    this.mark = offset;
+  }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java
new file mode 100644
index 0000000..eff7fdc
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class ReusableByteBufferInputStream extends InputStream {
+
+  private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
+  private ByteBuffer buffer = EMPTY_BUFFER;
+  private int mark = 0;
+
+  public void setByteBuffer(ByteBuffer buf) {
+    // do not modify the buffer that is passed in
+    this.buffer = buf.duplicate();
+    this.mark = buf.position();
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (buffer.hasRemaining()) {
+      return buffer.get() & 0xff;
+    } else {
+      return -1;
+    }
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (buffer.remaining() <= 0) {
+      return -1;
+    }
+    // allow IndexOutOfBoundsException to be thrown by ByteBuffer#get
+    int bytesToRead = Math.min(len, buffer.remaining());
+    buffer.get(b, off, bytesToRead);
+    return bytesToRead;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    if (n <= 0) {
+      // n may be negative and results in skipping 0 bytes, according to javadoc
+      return 0;
+    }
+
+    // this catches n > Integer.MAX_VALUE
+    int bytesToSkip = n > buffer.remaining() ? buffer.remaining() : (int) n;
+    buffer.position(buffer.position() + bytesToSkip);
+    return bytesToSkip;
+  }
+
+  @Override
+  public synchronized void mark(int readLimit) {
+    // readLimit is ignored. there is no requirement to implement readLimit, it
+    // is a way for implementations to avoid buffering too much. since all data
+    // for this stream is held in memory, this has no need for such a limit.
+    this.mark = buffer.position();
+  }
+
+  @Override
+  public synchronized void reset() throws IOException {
+    buffer.position(mark);
+  }
+
+  @Override
+  public boolean markSupported() {
+    return true;
+  }
+}
diff --git a/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java b/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java
new file mode 100644
index 0000000..47656b8
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.avro.message;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class TestBinaryMessageEncoding {
+  public static final Schema SCHEMA_V1 = SchemaBuilder.record("TestRecord")
+      .fields()
+      .requiredInt("id")
+      .optionalString("msg")
+      .endRecord();
+
+  public static final GenericRecordBuilder V1_BUILDER =
+      new GenericRecordBuilder(SCHEMA_V1);
+
+  public static final List<Record> V1_RECORDS = Arrays.asList(
+      V1_BUILDER.set("id", 1).set("msg", "m-1").build(),
+      V1_BUILDER.set("id", 2).set("msg", "m-2").build(),
+      V1_BUILDER.set("id", 4).set("msg", "m-4").build(),
+      V1_BUILDER.set("id", 6).set("msg", "m-6").build()
+  );
+
+  public static final Schema SCHEMA_V2 = SchemaBuilder.record("TestRecord")
+      .fields()
+      .requiredLong("id")
+      .name("message").aliases("msg").type().optional().stringType()
+      .optionalDouble("data")
+      .endRecord();
+
+  public static final GenericRecordBuilder V2_BUILDER =
+      new GenericRecordBuilder(SCHEMA_V2);
+
+  public static final List<Record> V2_RECORDS = Arrays.asList(
+      V2_BUILDER.set("id", 3L).set("message", "m-3").set("data", 12.3).build(),
+      V2_BUILDER.set("id", 5L).set("message", "m-5").set("data", 23.4).build(),
+      V2_BUILDER.set("id", 7L).set("message", "m-7").set("data", 34.5).build(),
+      V2_BUILDER.set("id", 8L).set("message", "m-8").set("data", 35.6).build()
+  );
+
+  @Test
+  public void testByteBufferRoundTrip() throws Exception {
+    MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+        GenericData.get(), SCHEMA_V2);
+    MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+        GenericData.get(), SCHEMA_V2);
+
+    Record copy = decoder.decode(encoder.encode(V2_RECORDS.get(0)));
+
+    Assert.assertTrue("Copy should not be the same object",
+        copy != V2_RECORDS.get(0));
+    Assert.assertEquals("Record should be identical after round-trip",
+        V2_RECORDS.get(0), copy);
+  }
+
+  @Test
+  public void testSchemaEvolution() throws Exception {
+    List<ByteBuffer> buffers = Lists.newArrayList();
+    List<Record> records = Ordering.usingToString().sortedCopy(
+        Iterables.concat(V1_RECORDS, V2_RECORDS));
+
+    MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+        GenericData.get(), SCHEMA_V1);
+    MessageEncoder<Record> v2Encoder = new BinaryMessageEncoder<Record>(
+        GenericData.get(), SCHEMA_V2);
+
+    for (Record record : records) {
+      if (record.getSchema() == SCHEMA_V1) {
+        buffers.add(v1Encoder.encode(record));
+      } else {
+        buffers.add(v2Encoder.encode(record));
+      }
+    }
+
+    Set<Record> allAsV2 = Sets.newHashSet(V2_RECORDS);
+    allAsV2.add(
+        V2_BUILDER.set("id", 1L).set("message", "m-1").clear("data").build());
+    allAsV2.add(
+        V2_BUILDER.set("id", 2L).set("message", "m-2").clear("data").build());
+    allAsV2.add(
+        V2_BUILDER.set("id", 4L).set("message", "m-4").clear("data").build());
+    allAsV2.add(
+        V2_BUILDER.set("id", 6L).set("message", "m-6").clear("data").build());
+
+    BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+        GenericData.get(), SCHEMA_V2);
+    v2Decoder.addSchema(SCHEMA_V1);
+
+    Set<Record> decodedUsingV2 = Sets.newHashSet();
+    for (ByteBuffer buffer : buffers) {
+      decodedUsingV2.add(v2Decoder.decode(buffer));
+    }
+
+    Assert.assertEquals(allAsV2, decodedUsingV2);
+  }
+
+  @Test(expected = MissingSchemaException.class)
+  public void testCompatibleReadFailsWithoutSchema() throws Exception {
+    MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+        GenericData.get(), SCHEMA_V1);
+    BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+        GenericData.get(), SCHEMA_V2);
+
+    ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(3));
+
+    v2Decoder.decode(v1Buffer);
+  }
+
+  @Test
+  public void testCompatibleReadWithSchema() throws Exception {
+    MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+        GenericData.get(), SCHEMA_V1);
+    BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+        GenericData.get(), SCHEMA_V2);
+    v2Decoder.addSchema(SCHEMA_V1);
+
+    ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(3));
+
+    Record record = v2Decoder.decode(v1Buffer);
+
+    Assert.assertEquals(
+        V2_BUILDER.set("id", 6L).set("message", "m-6").clear("data").build(),
+        record);
+  }
+
+  @Test
+  public void testCompatibleReadWithSchemaFromLookup() throws Exception {
+    MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>(
+        GenericData.get(), SCHEMA_V1);
+
+    SchemaStore.Cache schemaCache = new SchemaStore.Cache();
+    schemaCache.addSchema(SCHEMA_V1);
+    BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>(
+        GenericData.get(), SCHEMA_V2, schemaCache);
+
+    ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(2));
+
+    Record record = v2Decoder.decode(v1Buffer);
+
+    Assert.assertEquals(
+        V2_BUILDER.set("id", 4L).set("message", "m-4").clear("data").build(),
+        record);
+  }
+
+  @Test
+  public void testBufferReuse() throws Exception {
+    // This test depends on the serialized version of record 1 being smaller or
+    // the same size as record 0 so that the reused ByteArrayOutputStream won't
+    // expand its internal buffer.
+    MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+        GenericData.get(), SCHEMA_V1, false);
+
+    ByteBuffer b0 = encoder.encode(V1_RECORDS.get(0));
+    ByteBuffer b1 = encoder.encode(V1_RECORDS.get(1));
+
+    Assert.assertEquals(b0.array(), b1.array());
+
+    MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+        GenericData.get(), SCHEMA_V1);
+    Assert.assertEquals("Buffer was reused, decode(b0) should be record 1",
+        V1_RECORDS.get(1), decoder.decode(b0));
+  }
+
+  @Test
+  public void testBufferCopy() throws Exception {
+    MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+        GenericData.get(), SCHEMA_V1);
+
+    ByteBuffer b0 = encoder.encode(V1_RECORDS.get(0));
+    ByteBuffer b1 = encoder.encode(V1_RECORDS.get(1));
+
+    Assert.assertNotEquals(b0.array(), b1.array());
+
+    MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+        GenericData.get(), SCHEMA_V1);
+    // bytes are not changed by reusing the encoder
+    Assert.assertEquals("Buffer was copied, decode(b0) should be record 0",
+        V1_RECORDS.get(0), decoder.decode(b0));
+  }
+
+  @Test(expected = AvroRuntimeException.class)
+  public void testByteBufferMissingPayload() throws Exception {
+    MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+        GenericData.get(), SCHEMA_V2);
+    MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+        GenericData.get(), SCHEMA_V2);
+
+    ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
+
+    buffer.limit(12);
+
+    decoder.decode(buffer);
+  }
+
+  @Test(expected = BadHeaderException.class)
+  public void testByteBufferMissingFullHeader() throws Exception {
+    MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>(
+        GenericData.get(), SCHEMA_V2);
+    MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>(
+        GenericData.get(), SCHEMA_V2);
+
+    ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0));
+
+    buffer.limit(8);
+
+    decoder.decode(buffer);
+  }
+
+}
diff --git a/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java b/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
index fd77f2d..25d918f 100644
--- a/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
+++ b/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
@@ -19,6 +19,7 @@
 package org.apache.avro;
 
 import com.google.common.collect.MapMaker;
+import com.google.common.primitives.Bytes;
 
 class GuavaClasses {
   /*
@@ -27,5 +28,6 @@
    */
   static {
     MapMaker.class.getName();
+    Bytes.class.getName();
   }
 }