blob: c448a91b887025a27266c187c57e68ec5196a626 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.avro.registry.confluent;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.SchemaCoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import javax.annotation.Nullable;
import java.util.Map;
/**
 * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that
 * uses Confluent Schema Registry.
 *
 * <p>Instances are obtained through the static {@code forGeneric(...)} and {@code forSpecific(...)}
 * factories; the constructor is private. Every shorter overload fills in the default cache
 * capacity and/or a {@code null} registry config map and delegates to the corresponding
 * four-argument factory.
 *
 * @param <T> type of record it produces
 */
public class ConfluentRegistryAvroDeserializationSchema<T>
        extends RegistryAvroDeserializationSchema<T> {

    private static final long serialVersionUID = -1671641202177852775L;

    /** Cache capacity (number of schema versions) used when the caller does not specify one. */
    private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;

    /**
     * Creates an Avro deserialization schema.
     *
     * @param recordClazz class to which deserialize. Should be either {@link SpecificRecord} or
     *     {@link GenericRecord}.
     * @param reader reader's Avro schema. Should be provided if recordClazz is {@link
     *     GenericRecord}
     * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent
     *     Schema Registry
     */
    private ConfluentRegistryAvroDeserializationSchema(
            Class<T> recordClazz,
            @Nullable Schema reader,
            SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
        super(recordClazz, reader, schemaCoderProvider);
    }

    /**
     * Builds the schema coder provider shared by all factory methods of this class.
     *
     * @param url URL of schema registry to connect
     * @param identityMapCapacity maximum number of cached schema versions
     * @param registryConfigs map with additional schema registry configs, may be {@code null}
     * @return provider handed to the superclass constructor
     */
    private static SchemaCoder.SchemaCoderProvider cachedProvider(
            String url, int identityMapCapacity, @Nullable Map<String, ?> registryConfigs) {
        // The first constructor argument is always null for the factories in this class
        // (presumably the registry subject, which is not needed for reading -- TODO confirm).
        return new CachedSchemaCoderProvider(null, url, identityMapCapacity, registryConfigs);
    }

    // ------------------------------------------------------------------------
    //  Factories producing GenericRecord
    // ------------------------------------------------------------------------

    /**
     * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link
     * GenericRecord} using the provided reader schema and looks up the writer schema in the
     * Confluent Schema Registry.
     *
     * <p>Uses the default capacity of 1000 cached schema versions and no additional registry
     * configs.
     *
     * @param schema schema of produced records
     * @param url URL of schema registry to connect
     * @return deserialization schema that produces {@link GenericRecord}s
     */
    public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
            Schema schema, String url) {
        return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY, null);
    }

    /**
     * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link
     * GenericRecord} using the provided reader schema and looks up the writer schema in the
     * Confluent Schema Registry.
     *
     * <p>No additional registry configs are applied.
     *
     * @param schema schema of produced records
     * @param url URL of schema registry to connect
     * @param identityMapCapacity maximum number of cached schema versions
     * @return deserialization schema that produces {@link GenericRecord}s
     */
    public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
            Schema schema, String url, int identityMapCapacity) {
        return forGeneric(schema, url, identityMapCapacity, null);
    }

    /**
     * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link
     * GenericRecord} using the provided reader schema and looks up the writer schema in the
     * Confluent Schema Registry.
     *
     * <p>Uses the default capacity of 1000 cached schema versions.
     *
     * @param schema schema of produced records
     * @param url URL of schema registry to connect
     * @param registryConfigs map with additional schema registry configs (for example SSL
     *     properties)
     * @return deserialization schema that produces {@link GenericRecord}s
     */
    public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
            Schema schema, String url, @Nullable Map<String, ?> registryConfigs) {
        return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY, registryConfigs);
    }

    /**
     * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link
     * GenericRecord} using the provided reader schema and looks up the writer schema in the
     * Confluent Schema Registry.
     *
     * <p>This is the terminal {@code forGeneric} overload; all others delegate here.
     *
     * @param schema schema of produced records
     * @param url URL of schema registry to connect
     * @param identityMapCapacity maximum number of cached schema versions
     * @param registryConfigs map with additional schema registry configs (for example SSL
     *     properties)
     * @return deserialization schema that produces {@link GenericRecord}s
     */
    public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
            Schema schema,
            String url,
            int identityMapCapacity,
            @Nullable Map<String, ?> registryConfigs) {
        final SchemaCoder.SchemaCoderProvider coderProvider =
                cachedProvider(url, identityMapCapacity, registryConfigs);
        return new ConfluentRegistryAvroDeserializationSchema<GenericRecord>(
                GenericRecord.class, schema, coderProvider);
    }

    // ------------------------------------------------------------------------
    //  Factories producing SpecificRecord subclasses
    // ------------------------------------------------------------------------

    /**
     * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro
     * schema and looks up the writer schema in the Confluent Schema Registry.
     *
     * <p>Uses the default capacity of 1000 cached schema versions and no additional registry
     * configs.
     *
     * @param tClass class of record to be produced
     * @param url URL of schema registry to connect
     * @return deserialization schema that produces instances of {@code tClass}
     */
    public static <T extends SpecificRecord>
            ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass, String url) {
        return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY, null);
    }

    /**
     * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro
     * schema and looks up the writer schema in the Confluent Schema Registry.
     *
     * <p>No additional registry configs are applied.
     *
     * @param tClass class of record to be produced
     * @param url URL of schema registry to connect
     * @param identityMapCapacity maximum number of cached schema versions
     * @return deserialization schema that produces instances of {@code tClass}
     */
    public static <T extends SpecificRecord>
            ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
                    Class<T> tClass, String url, int identityMapCapacity) {
        return forSpecific(tClass, url, identityMapCapacity, null);
    }

    /**
     * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro
     * schema and looks up the writer schema in the Confluent Schema Registry.
     *
     * <p>Uses the default capacity of 1000 cached schema versions.
     *
     * @param tClass class of record to be produced
     * @param url URL of schema registry to connect
     * @param registryConfigs map with additional schema registry configs (for example SSL
     *     properties)
     * @return deserialization schema that produces instances of {@code tClass}
     */
    public static <T extends SpecificRecord>
            ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
                    Class<T> tClass, String url, @Nullable Map<String, ?> registryConfigs) {
        return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY, registryConfigs);
    }

    /**
     * Creates {@link AvroDeserializationSchema} that produces classes that were generated from Avro
     * schema and looks up the writer schema in the Confluent Schema Registry.
     *
     * <p>This is the terminal {@code forSpecific} overload; all others delegate here. No reader
     * schema is passed to the constructor ({@code null}).
     *
     * @param tClass class of record to be produced
     * @param url URL of schema registry to connect
     * @param identityMapCapacity maximum number of cached schema versions
     * @param registryConfigs map with additional schema registry configs (for example SSL
     *     properties)
     * @return deserialization schema that produces instances of {@code tClass}
     */
    public static <T extends SpecificRecord>
            ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
                    Class<T> tClass,
                    String url,
                    int identityMapCapacity,
                    @Nullable Map<String, ?> registryConfigs) {
        final SchemaCoder.SchemaCoderProvider coderProvider =
                cachedProvider(url, identityMapCapacity, registryConfigs);
        return new ConfluentRegistryAvroDeserializationSchema<T>(tClass, null, coderProvider);
    }
}