blob: d835bd3fc8ecb3ea48633a5f080f48f4bcf8c6ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.avro.message;
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.internal.ThreadLocalWithInitial;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * A {@link MessageDecoder} that reads a binary-encoded datum. This checks for
 * the datum header and decodes the payload with the schema that corresponds to
 * the 8-byte schema fingerprint.
 * <p>
 * Instances can decode message payloads for known {@link Schema schemas}, which
 * are schemas added using {@link #addSchema(Schema)}, schemas resolved by the
 * {@link SchemaStore} passed to the constructor, or the expected schema passed
 * to the constructor. Messages encoded using an unknown schema will cause
 * instances to throw a {@link MissingSchemaException}.
 * <p>
 * It is safe to continue using instances of this class after {@link #decode}
 * throws {@link BadHeaderException} or {@link MissingSchemaException}.
 * <p>
 * This class is thread-safe.
 */
public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {

  // Per-thread scratch buffer for the message header: 2 marker bytes followed by
  // the 8-byte schema fingerprint (read back with getLong(2) in decode).
  private static final ThreadLocal<byte[]> HEADER_BUFFER = ThreadLocalWithInitial.of(() -> new byte[10]);

  // Little-endian view over the SAME per-thread array as HEADER_BUFFER: the
  // initializer runs on the calling thread, so it wraps that thread's header
  // bytes and the fingerprint can be read without copying.
  private static final ThreadLocal<ByteBuffer> FP_BUFFER = ThreadLocalWithInitial.of(() -> {
    byte[] header = HEADER_BUFFER.get();
    return ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN);
  });

  private final GenericData model;
  private final Schema readSchema;
  private final SchemaStore resolver;

  // Decoders keyed by the parsing fingerprint of their write schema.
  private final Map<Long, RawMessageDecoder<D>> codecByFingerprint = new ConcurrentHashMap<>();

  /**
   * Creates a new {@link BinaryMessageDecoder} that uses the given
   * {@link GenericData data model} to construct datum instances described by the
   * {@link Schema schema}.
   * <p>
   * The {@code readSchema} is used as the expected schema (read schema). Datum
   * instances created by this class will be described by the expected schema.
   * <p>
   * If {@code readSchema} is {@code null}, the write schema of an incoming buffer
   * is used as read schema for that datum instance.
   * <p>
   * The schema used to decode incoming buffers is determined by the schema
   * fingerprint encoded in the message header. This class can decode messages
   * that were encoded using the {@code readSchema} (if any) and other schemas
   * that are added using {@link #addSchema(Schema)}.
   *
   * @param model      the {@link GenericData data model} for datum instances
   * @param readSchema the {@link Schema} used to construct datum instances
   */
  public BinaryMessageDecoder(GenericData model, Schema readSchema) {
    this(model, readSchema, null);
  }

  /**
   * Creates a new {@link BinaryMessageDecoder} that uses the given
   * {@link GenericData data model} to construct datum instances described by the
   * {@link Schema schema}.
   * <p>
   * The {@code readSchema} is used as the expected schema (read schema). Datum
   * instances created by this class will be described by the expected schema.
   * <p>
   * If {@code readSchema} is {@code null}, the write schema of an incoming buffer
   * is used as read schema for that datum instance.
   * <p>
   * The schema used to decode incoming buffers is determined by the schema
   * fingerprint encoded in the message header. This class can decode messages
   * that were encoded using the {@code readSchema} (if any), other schemas that
   * are added using {@link #addSchema(Schema)}, or schemas returned by the
   * {@code resolver}.
   *
   * @param model      the {@link GenericData data model} for datum instances
   * @param readSchema the {@link Schema} used to construct datum instances
   * @param resolver   a {@link SchemaStore} used to find schemas by fingerprint
   *                   (may be {@code null})
   */
  public BinaryMessageDecoder(GenericData model, Schema readSchema, SchemaStore resolver) {
    this.model = model;
    this.readSchema = readSchema;
    this.resolver = resolver;
    if (readSchema != null) {
      // Messages written with the read schema itself are always decodable.
      addSchema(readSchema);
    }
  }

  /**
   * Adds a {@link Schema} that can be used to decode buffers.
   * <p>
   * The decoder is registered under the parsing fingerprint of
   * {@code writeSchema}; a previously registered decoder with the same
   * fingerprint is replaced.
   *
   * @param writeSchema a {@link Schema} to use when decoding buffers
   */
  public void addSchema(Schema writeSchema) {
    long fp = SchemaNormalization.parsingFingerprint64(writeSchema);
    final Schema actualReadSchema = this.readSchema != null ? this.readSchema : writeSchema;
    codecByFingerprint.put(fp, new RawMessageDecoder<D>(model, writeSchema, actualReadSchema));
  }

  /**
   * Returns the decoder for the given write-schema fingerprint, consulting the
   * {@link SchemaStore} (if any) when the fingerprint is not yet known.
   *
   * @param fp the 64-bit schema fingerprint read from the message header
   * @return a decoder for {@code fp}; never {@code null}
   * @throws MissingSchemaException if no decoder can be found or resolved for
   *                                {@code fp}
   */
  private RawMessageDecoder<D> getDecoder(long fp) {
    RawMessageDecoder<D> decoder = codecByFingerprint.get(fp);
    if (decoder != null) {
      return decoder;
    }

    if (resolver != null) {
      Schema writeSchema = resolver.findByFingerprint(fp);
      if (writeSchema != null) {
        addSchema(writeSchema);
        // addSchema keys the decoder by the schema's own parsing fingerprint, so
        // this lookup can still miss if the store returned a schema that does not
        // match fp. Fall through to MissingSchemaException instead of handing a
        // null decoder back to decode().
        decoder = codecByFingerprint.get(fp);
        if (decoder != null) {
          return decoder;
        }
      }
    }

    throw new MissingSchemaException("Cannot resolve schema for fingerprint: " + fp);
  }

  /**
   * Reads the message header and fingerprint from {@code stream}, then decodes
   * the payload with the decoder registered for that fingerprint.
   *
   * @param stream the stream to read the message from
   * @param reuse  a datum instance to reuse, or {@code null}
   * @return the decoded datum
   * @throws BadHeaderException     if the stream ends before the header is
   *                                complete or the marker bytes do not match
   * @throws MissingSchemaException if the fingerprint cannot be resolved
   * @throws IOException            if reading from {@code stream} fails
   */
  @Override
  public D decode(InputStream stream, D reuse) throws IOException {
    byte[] header = HEADER_BUFFER.get();
    try {
      if (!readFully(stream, header)) {
        throw new BadHeaderException("Not enough header bytes");
      }
    } catch (IOException e) {
      throw new IOException("Failed to read header and fingerprint bytes", e);
    }

    if (BinaryMessageEncoder.V1_HEADER[0] != header[0] || BinaryMessageEncoder.V1_HEADER[1] != header[1]) {
      throw new BadHeaderException(String.format("Unrecognized header bytes: 0x%02X 0x%02X", header[0], header[1]));
    }

    // FP_BUFFER aliases this thread's header array; bytes 2..9 hold the
    // little-endian fingerprint.
    RawMessageDecoder<D> decoder = getDecoder(FP_BUFFER.get().getLong(2));
    return decoder.decode(stream, reuse);
  }

  /**
   * Reads a buffer from a stream, making multiple read calls if necessary.
   *
   * @param stream an InputStream to read from
   * @param bytes  a buffer
   * @return true if the buffer is complete, false otherwise (stream ended)
   * @throws IOException if reading from {@code stream} fails
   */
  private boolean readFully(InputStream stream, byte[] bytes) throws IOException {
    int pos = 0;
    int bytesRead;
    while ((bytes.length - pos) > 0 && (bytesRead = stream.read(bytes, pos, bytes.length - pos)) > 0) {
      pos += bytesRead;
    }
    return (pos == bytes.length);
  }
}