| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.avro.message; |
| |
| import com.google.common.collect.MapMaker; |
| import org.apache.avro.Schema; |
| import org.apache.avro.SchemaNormalization; |
| import org.apache.avro.generic.GenericData; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.util.Map; |
| |
| /** |
| * A {@link MessageDecoder} that reads a binary-encoded datum. This checks for |
| * the datum header and decodes the payload with the schema that corresponds to |
| * the 8-byte schema fingerprint. |
| * <p> |
| * Instances can decode message payloads for known {@link Schema schemas}, which |
| * are schemas added using {@link #addSchema(Schema)}, schemas resolved by the |
| * {@link SchemaStore} passed to the constructor, or the expected schema passed |
| * to the constructor. Messages encoded using an unknown schema will cause |
| * instances to throw a {@link MissingSchemaException}. |
| * <p> |
| * It is safe to continue using instances of this class after {@link #decode} |
| * throws {@link BadHeaderException} or {@link MissingSchemaException}. |
| * <p> |
| * This class is thread-safe. |
| */ |
| public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> { |
| |
| private static final ThreadLocal<byte[]> HEADER_BUFFER = |
| new ThreadLocal<byte[]>() { |
| @Override |
| protected byte[] initialValue() { |
| return new byte[10]; |
| } |
| }; |
| |
| private static final ThreadLocal<ByteBuffer> FP_BUFFER = |
| new ThreadLocal<ByteBuffer>() { |
| @Override |
| protected ByteBuffer initialValue() { |
| byte[] header = HEADER_BUFFER.get(); |
| return ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN); |
| } |
| }; |
| |
| private final GenericData model; |
| private final Schema readSchema; |
| private final SchemaStore resolver; |
| |
| private final Map<Long, RawMessageDecoder<D>> codecByFingerprint = |
| new MapMaker().makeMap(); |
| |
| /** |
| * Creates a new {@link BinaryMessageEncoder} that uses the given |
| * {@link GenericData data model} to construct datum instances described by |
| * the {@link Schema schema}. |
| * <p> |
| * The {@code readSchema} is as used the expected schema (read schema). Datum |
| * instances created by this class will are described by the expected schema. |
| * <p> |
| * The schema used to decode incoming buffers is determined by the schema |
| * fingerprint encoded in the message header. This class can decode messages |
| * that were encoded using the {@code readSchema} and other schemas that are |
| * added using {@link #addSchema(Schema)}. |
| * |
| * @param model the {@link GenericData data model} for datum instances |
| * @param readSchema the {@link Schema} used to construct datum instances |
| */ |
| public BinaryMessageDecoder(GenericData model, Schema readSchema) { |
| this(model, readSchema, null); |
| } |
| |
| /** |
| * Creates a new {@link BinaryMessageEncoder} that uses the given |
| * {@link GenericData data model} to construct datum instances described by |
| * the {@link Schema schema}. |
| * <p> |
| * The {@code readSchema} is used as the expected schema (read schema). Datum |
| * instances created by this class will are described by the expected schema. |
| * <p> |
| * The schema used to decode incoming buffers is determined by the schema |
| * fingerprint encoded in the message header. This class can decode messages |
| * that were encoded using the {@code readSchema}, other schemas that are |
| * added using {@link #addSchema(Schema)}, or schemas returned by the |
| * {@code resolver}. |
| * |
| * @param model the {@link GenericData data model} for datum instances |
| * @param readSchema the {@link Schema} used to construct datum instances |
| * @param resolver a {@link SchemaStore} used to find schemas by fingerprint |
| */ |
| public BinaryMessageDecoder(GenericData model, Schema readSchema, |
| SchemaStore resolver) { |
| this.model = model; |
| this.readSchema = readSchema; |
| this.resolver = resolver; |
| addSchema(readSchema); |
| } |
| |
| /** |
| * Adds a {@link Schema} that can be used to decode buffers. |
| * |
| * @param writeSchema a {@link Schema} to use when decoding buffers |
| */ |
| public void addSchema(Schema writeSchema) { |
| long fp = SchemaNormalization.parsingFingerprint64(writeSchema); |
| codecByFingerprint.put(fp, |
| new RawMessageDecoder<D>(model, writeSchema, readSchema)); |
| } |
| |
| private RawMessageDecoder<D> getDecoder(long fp) { |
| RawMessageDecoder<D> decoder = codecByFingerprint.get(fp); |
| if (decoder != null) { |
| return decoder; |
| } |
| |
| if (resolver != null) { |
| Schema writeSchema = resolver.findByFingerprint(fp); |
| if (writeSchema != null) { |
| addSchema(writeSchema); |
| return codecByFingerprint.get(fp); |
| } |
| } |
| |
| throw new MissingSchemaException( |
| "Cannot resolve schema for fingerprint: " + fp); |
| } |
| |
| @Override |
| public D decode(InputStream stream, D reuse) throws IOException { |
| byte[] header = HEADER_BUFFER.get(); |
| try { |
| if (!readFully(stream, header)) { |
| throw new BadHeaderException("Not enough header bytes"); |
| } |
| } catch (IOException e) { |
| throw new IOException("Failed to read header and fingerprint bytes", e); |
| } |
| |
| if (! (BinaryMessageEncoder.V1_HEADER[0] == header[0]) |
| && BinaryMessageEncoder.V1_HEADER[1] == header[1]) { |
| throw new BadHeaderException(String.format( |
| "Unrecognized header bytes: 0x%h%h", |
| header[0], header[1])); |
| } |
| |
| RawMessageDecoder<D> decoder = getDecoder(FP_BUFFER.get().getLong(2)); |
| |
| return decoder.decode(stream, reuse); |
| } |
| |
| /** |
| * Reads a buffer from a stream, making multiple read calls if necessary. |
| * |
| * @param stream an InputStream to read from |
| * @param bytes a buffer |
| * @return true if the buffer is complete, false otherwise (stream ended) |
| * @throws IOException |
| */ |
| private boolean readFully(InputStream stream, byte[] bytes) |
| throws IOException { |
| int pos = 0; |
| int bytesRead; |
| while ((bytes.length - pos) > 0 && |
| (bytesRead = stream.read(bytes, pos, bytes.length - pos)) > 0) { |
| pos += bytesRead; |
| } |
| return (pos == bytes.length); |
| } |
| } |