blob: a5046ac23c3608d3d0c0428e6548d4c9cf012f61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.avro.registry.confluent;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.formats.avro.AvroToRowDataConverters;
import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.formats.avro.utils.TestDataGenerator;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Random;
import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
 * Round-trip tests for {@link AvroRowDataSerializationSchema} and {@link
 * AvroRowDataDeserializationSchema} when backed by a Confluent schema registry.
 */
class RegistryAvroRowDataSeDeSchemaTest {

    /** Full writer schema generated from the {@link Address} specific record. */
    private static final Schema ADDRESS_SCHEMA = Address.getClassSchema();

    /** A narrower, compatible schema keeping only the first two address fields. */
    private static final Schema ADDRESS_SCHEMA_COMPATIBLE =
            new Schema.Parser()
                    .parse(
                            ""
                                    + "{\"namespace\": \"org.apache.flink.formats.avro.generated\",\n"
                                    + " \"type\": \"record\",\n"
                                    + " \"name\": \"Address\",\n"
                                    + " \"fields\": [\n"
                                    + " {\"name\": \"num\", \"type\": \"int\"},\n"
                                    + " {\"name\": \"street\", \"type\": \"string\"}\n"
                                    + " ]\n"
                                    + "}");

    /** Registry subject every test registers and reads under. */
    private static final String SUBJECT = "address-value";

    /** Shared in-memory registry stub; wiped per test in {@link #after()}. */
    private static SchemaRegistryClient client;

    /** Fresh random record generated for each test case. */
    private Address address;

    @BeforeAll
    static void beforeClass() {
        client = new MockSchemaRegistryClient();
    }

    @BeforeEach
    void before() {
        address = TestDataGenerator.generateRandomAddress(new Random());
    }

    @AfterEach
    void after() throws IOException, RestClientException {
        // Drop the subject so every test starts against an empty registry.
        client.deleteSubject(SUBJECT);
    }

    @Test
    void testRowDataWriteReadWithFullSchema() throws Exception {
        roundTrip(ADDRESS_SCHEMA);
    }

    @Test
    void testRowDataWriteReadWithCompatibleSchema() throws Exception {
        roundTrip(ADDRESS_SCHEMA_COMPATIBLE);
        // The narrower schema must have been auto-registered exactly once.
        assertThat(client.getAllVersions(SUBJECT)).hasSize(1);
    }

    @Test
    void testRowDataWriteReadWithPreRegisteredSchema() throws Exception {
        client.register(SUBJECT, ADDRESS_SCHEMA);
        roundTrip(ADDRESS_SCHEMA);
        // Re-using the pre-registered schema must not add a second version.
        assertThat(client.getAllVersions(SUBJECT)).hasSize(1);
    }

    @Test
    void testRowDataReadWithNonRegistryAvro() throws Exception {
        DataType dataType = AvroSchemaConverter.convertToDataType(ADDRESS_SCHEMA.toString());
        RowType rowType = (RowType) dataType.getLogicalType();
        AvroRowDataDeserializationSchema deserializer =
                createDeserializer(rowType, ADDRESS_SCHEMA);
        deserializer.open(null);
        client.register(SUBJECT, ADDRESS_SCHEMA);
        // Plain Avro bytes lack the registry magic byte, so decoding must fail.
        byte[] plainAvroBytes = writeRecord(address, ADDRESS_SCHEMA);
        assertThatThrownBy(() -> deserializer.deserialize(plainAvroBytes))
                .isInstanceOf(IOException.class)
                .hasCause(new IOException("Unknown data format. Magic number does not match"));
    }

    /**
     * Serializes the random address through a registry-aware serializer using {@code schema},
     * reads it back, and verifies the decoded row field-by-field.
     */
    private void roundTrip(Schema schema) throws Exception {
        DataType dataType = AvroSchemaConverter.convertToDataType(schema.toString());
        RowType rowType = (RowType) dataType.getLogicalType();
        Schema readerSchema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());

        AvroRowDataSerializationSchema serializer = createSerializer(rowType, schema);
        AvroRowDataDeserializationSchema deserializer = createDeserializer(rowType, readerSchema);
        serializer.open(null);
        deserializer.open(null);

        // A null payload deserializes to null rather than throwing.
        assertThat(deserializer.deserialize(null)).isNull();

        byte[] payload = serializer.serialize(address2RowData(address));
        RowData decoded = deserializer.deserialize(payload);

        assertThat(decoded.getArity()).isEqualTo(schema.getFields().size());
        assertThat(decoded.getInt(0)).isEqualTo(address.getNum());
        assertThat(decoded.getString(1).toString()).isEqualTo(address.getStreet());
        if (schema != ADDRESS_SCHEMA_COMPATIBLE) {
            // Only the full schema carries city / state / zip.
            assertThat(decoded.getString(2).toString()).isEqualTo(address.getCity());
            assertThat(decoded.getString(3).toString()).isEqualTo(address.getState());
            assertThat(decoded.getString(4).toString()).isEqualTo(address.getZip());
        }
    }

    // ------------------------------------------------------------------------
    //  Helpers
    // ------------------------------------------------------------------------

    /** Builds a registry-aware serializer writing {@code avroSchema} under {@link #SUBJECT}. */
    private static AvroRowDataSerializationSchema createSerializer(
            RowType rowType, Schema avroSchema) {
        ConfluentSchemaRegistryCoder coder = new ConfluentSchemaRegistryCoder(SUBJECT, client);
        return new AvroRowDataSerializationSchema(
                rowType,
                new RegistryAvroSerializationSchema<>(GenericRecord.class, avroSchema, () -> coder),
                RowDataToAvroConverters.createConverter(rowType));
    }

    /** Builds a registry-aware deserializer reading {@code avroSchema} under {@link #SUBJECT}. */
    private static AvroRowDataDeserializationSchema createDeserializer(
            RowType rowType, Schema avroSchema) {
        ConfluentSchemaRegistryCoder coder = new ConfluentSchemaRegistryCoder(SUBJECT, client);
        return new AvroRowDataDeserializationSchema(
                new RegistryAvroDeserializationSchema<>(GenericRecord.class, avroSchema, () -> coder),
                AvroToRowDataConverters.createRowConverter(rowType),
                InternalTypeInfo.of(rowType));
    }

    /** Maps the specific-record {@link Address} onto a five-field {@link GenericRowData}. */
    private static RowData address2RowData(Address address) {
        GenericRowData row = new GenericRowData(5);
        row.setField(0, address.getNum());
        row.setField(1, new BinaryStringData(address.getStreet().toString()));
        row.setField(2, new BinaryStringData(address.getCity().toString()));
        row.setField(3, new BinaryStringData(address.getState().toString()));
        row.setField(4, new BinaryStringData(address.getZip().toString()));
        return row;
    }
}