blob: 11a7336c0813375c4b42fb8aa035451803bebc16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.avro.message;
import com.google.common.collect.MapMaker;
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.generic.GenericData;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
/**
* A {@link MessageDecoder} that reads a binary-encoded datum. This checks for
* the datum header and decodes the payload with the schema that corresponds to
* the 8-byte schema fingerprint.
* <p>
* Instances can decode message payloads for known {@link Schema schemas}, which
* are schemas added using {@link #addSchema(Schema)}, schemas resolved by the
* {@link SchemaStore} passed to the constructor, or the expected schema passed
* to the constructor. Messages encoded using an unknown schema will cause
* instances to throw a {@link MissingSchemaException}.
* <p>
* It is safe to continue using instances of this class after {@link #decode}
* throws {@link BadHeaderException} or {@link MissingSchemaException}.
* <p>
* This class is thread-safe.
*/
public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
private static final ThreadLocal<byte[]> HEADER_BUFFER =
new ThreadLocal<byte[]>() {
@Override
protected byte[] initialValue() {
return new byte[10];
}
};
private static final ThreadLocal<ByteBuffer> FP_BUFFER =
new ThreadLocal<ByteBuffer>() {
@Override
protected ByteBuffer initialValue() {
byte[] header = HEADER_BUFFER.get();
return ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN);
}
};
private final GenericData model;
private final Schema readSchema;
private final SchemaStore resolver;
private final Map<Long, RawMessageDecoder<D>> codecByFingerprint =
new MapMaker().makeMap();
/**
* Creates a new {@link BinaryMessageEncoder} that uses the given
* {@link GenericData data model} to construct datum instances described by
* the {@link Schema schema}.
* <p>
* The {@code readSchema} is as used the expected schema (read schema). Datum
* instances created by this class will are described by the expected schema.
* <p>
* The schema used to decode incoming buffers is determined by the schema
* fingerprint encoded in the message header. This class can decode messages
* that were encoded using the {@code readSchema} and other schemas that are
* added using {@link #addSchema(Schema)}.
*
* @param model the {@link GenericData data model} for datum instances
* @param readSchema the {@link Schema} used to construct datum instances
*/
public BinaryMessageDecoder(GenericData model, Schema readSchema) {
this(model, readSchema, null);
}
/**
* Creates a new {@link BinaryMessageEncoder} that uses the given
* {@link GenericData data model} to construct datum instances described by
* the {@link Schema schema}.
* <p>
* The {@code readSchema} is used as the expected schema (read schema). Datum
* instances created by this class will are described by the expected schema.
* <p>
* The schema used to decode incoming buffers is determined by the schema
* fingerprint encoded in the message header. This class can decode messages
* that were encoded using the {@code readSchema}, other schemas that are
* added using {@link #addSchema(Schema)}, or schemas returned by the
* {@code resolver}.
*
* @param model the {@link GenericData data model} for datum instances
* @param readSchema the {@link Schema} used to construct datum instances
* @param resolver a {@link SchemaStore} used to find schemas by fingerprint
*/
public BinaryMessageDecoder(GenericData model, Schema readSchema,
SchemaStore resolver) {
this.model = model;
this.readSchema = readSchema;
this.resolver = resolver;
addSchema(readSchema);
}
/**
* Adds a {@link Schema} that can be used to decode buffers.
*
* @param writeSchema a {@link Schema} to use when decoding buffers
*/
public void addSchema(Schema writeSchema) {
long fp = SchemaNormalization.parsingFingerprint64(writeSchema);
codecByFingerprint.put(fp,
new RawMessageDecoder<D>(model, writeSchema, readSchema));
}
private RawMessageDecoder<D> getDecoder(long fp) {
RawMessageDecoder<D> decoder = codecByFingerprint.get(fp);
if (decoder != null) {
return decoder;
}
if (resolver != null) {
Schema writeSchema = resolver.findByFingerprint(fp);
if (writeSchema != null) {
addSchema(writeSchema);
return codecByFingerprint.get(fp);
}
}
throw new MissingSchemaException(
"Cannot resolve schema for fingerprint: " + fp);
}
@Override
public D decode(InputStream stream, D reuse) throws IOException {
byte[] header = HEADER_BUFFER.get();
try {
if (!readFully(stream, header)) {
throw new BadHeaderException("Not enough header bytes");
}
} catch (IOException e) {
throw new IOException("Failed to read header and fingerprint bytes", e);
}
if (! (BinaryMessageEncoder.V1_HEADER[0] == header[0])
&& BinaryMessageEncoder.V1_HEADER[1] == header[1]) {
throw new BadHeaderException(String.format(
"Unrecognized header bytes: 0x%h%h",
header[0], header[1]));
}
RawMessageDecoder<D> decoder = getDecoder(FP_BUFFER.get().getLong(2));
return decoder.decode(stream, reuse);
}
/**
* Reads a buffer from a stream, making multiple read calls if necessary.
*
* @param stream an InputStream to read from
* @param bytes a buffer
* @return true if the buffer is complete, false otherwise (stream ended)
* @throws IOException
*/
private boolean readFully(InputStream stream, byte[] bytes)
throws IOException {
int pos = 0;
int bytesRead;
while ((bytes.length - pos) > 0 &&
(bytesRead = stream.read(bytes, pos, bytes.length - pos)) > 0) {
pos += bytesRead;
}
return (pos == bytes.length);
}
}