blob: b39cc1e283fb37c4357f9bf26d5f5291181c567a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.avro.registry.confluent.debezium;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/** Tests for {@link DebeziumAvroFormatFactory}. */
class DebeziumAvroFormatFactoryTest {
private static final ResolvedSchema SCHEMA =
ResolvedSchema.of(
Column.physical("a", DataTypes.STRING()),
Column.physical("b", DataTypes.INT()),
Column.physical("c", DataTypes.BOOLEAN()));
private static final String RECORD_SCHEMA =
" {\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"test\",\n"
+ " \"fields\": [\n"
+ " {\n"
+ " \"name\": \"before\",\n"
+ " \"type\": [\n"
+ " \"null\",\n"
+ " {\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"testSchema\",\n"
+ " \"fields\": [\n"
+ " {\n"
+ " \"name\": \"a\",\n"
+ " \"type\": [\n"
+ " \"null\",\n"
+ " \"string\"\n"
+ " ],\n"
+ " \"default\": null\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"b\",\n"
+ " \"type\": [\n"
+ " \"null\",\n"
+ " \"int\"\n"
+ " ],\n"
+ " \"default\": null\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"c\",\n"
+ " \"type\": [\n"
+ " \"null\",\n"
+ " \"boolean\"\n"
+ " ],\n"
+ " \"default\": null\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " ],\n"
+ " \"default\": null\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"after\",\n"
+ " \"type\": [\n"
+ " \"null\",\n"
+ " \"testSchema\"\n"
+ " ],\n"
+ " \"default\": null\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"op\",\n"
+ " \"type\": [\n"
+ " \"null\",\n"
+ " \"string\"\n"
+ " ],\n"
+ " \"default\": null\n"
+ " }\n"
+ " ]\n"
+ " }\n";
private static final String AVRO_SCHEMA = "[\n\"null\",\n" + RECORD_SCHEMA + "]";
private static final RowType ROW_TYPE =
(RowType) SCHEMA.toPhysicalRowDataType().getLogicalType();
private static final String SUBJECT = "test-debezium-avro";
private static final String REGISTRY_URL = "http://localhost:8081";
@Test
void testSeDeSchema() {
final Map<String, String> options = getAllOptions();
final Map<String, String> registryConfigs = getRegistryConfigs();
DebeziumAvroDeserializationSchema expectedDeser =
new DebeziumAvroDeserializationSchema(
ROW_TYPE,
InternalTypeInfo.of(ROW_TYPE),
REGISTRY_URL,
null,
registryConfigs);
DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
assertEquals(expectedDeser, actualDeser);
DebeziumAvroSerializationSchema expectedSer =
new DebeziumAvroSerializationSchema(
ROW_TYPE, REGISTRY_URL, SUBJECT, null, registryConfigs);
SerializationSchema<RowData> actualSer = createSerializationSchema(options);
assertEquals(expectedSer, actualSer);
}
@Test
public void testSeDeSchemaWithSchemaOption() {
final Map<String, String> options = getAllOptions();
options.put("debezium-avro-confluent.schema", AVRO_SCHEMA);
final Map<String, String> registryConfigs = getRegistryConfigs();
DebeziumAvroDeserializationSchema expectedDeser =
new DebeziumAvroDeserializationSchema(
ROW_TYPE,
InternalTypeInfo.of(ROW_TYPE),
REGISTRY_URL,
AVRO_SCHEMA,
registryConfigs);
DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
assertThat(actualDeser).isEqualTo(expectedDeser);
DebeziumAvroSerializationSchema expectedSer =
new DebeziumAvroSerializationSchema(
ROW_TYPE, REGISTRY_URL, SUBJECT, AVRO_SCHEMA, registryConfigs);
SerializationSchema<RowData> actualSer = createSerializationSchema(options);
assertThat(actualSer).isEqualTo(expectedSer);
}
@Test
public void testSeDeSchemaWithInvalidSchemaOption() {
final Map<String, String> options = getAllOptions();
options.put("debezium-avro-confluent.schema", RECORD_SCHEMA);
assertThrows(IllegalArgumentException.class, () -> createDeserializationSchema(options));
assertThrows(IllegalArgumentException.class, () -> createSerializationSchema(options));
String basicSchema = "[ \"null\", \"string\" ]";
options.put("debezium-avro-confluent.schema", basicSchema);
assertThrows(IllegalArgumentException.class, () -> createDeserializationSchema(options));
assertThrows(IllegalArgumentException.class, () -> createSerializationSchema(options));
String invalidSchema =
"[\"null\", "
+ "{ \"type\" : \"record\","
+ "\"name\" : \"debezium\","
+ "\"fields\": [{\n"
+ " \"default\": null,\n"
+ " \"name\": \"op\",\n"
+ " \"type\": [\n"
+ " \"null\",\n"
+ " \"string\"\n"
+ " ]\n"
+ " }]"
+ "}]";
options.put("debezium-avro-confluent.schema", invalidSchema);
assertThrows(IllegalArgumentException.class, () -> createDeserializationSchema(options));
assertThrows(IllegalArgumentException.class, () -> createSerializationSchema(options));
String invalidRecordSchema = AVRO_SCHEMA.replace("int", "string");
options.put("debezium-avro-confluent.schema", invalidRecordSchema);
assertThrows(IllegalArgumentException.class, () -> createDeserializationSchema(options));
assertThrows(IllegalArgumentException.class, () -> createSerializationSchema(options));
}
@NotNull
private Map<String, String> getRegistryConfigs() {
final Map<String, String> registryConfigs = new HashMap<>();
registryConfigs.put("basic.auth.user.info", "something1");
registryConfigs.put("basic.auth.credentials.source", "something2");
return registryConfigs;
}
private Map<String, String> getAllOptions() {
final Map<String, String> options = new HashMap<>();
options.put("connector", TestDynamicTableFactory.IDENTIFIER);
options.put("target", "MyTarget");
options.put("buffer-size", "1000");
options.put("format", DebeziumAvroFormatFactory.IDENTIFIER);
options.put("debezium-avro-confluent.url", REGISTRY_URL);
options.put("debezium-avro-confluent.subject", SUBJECT);
options.put("debezium-avro-confluent.basic-auth.user-info", "something1");
options.put("debezium-avro-confluent.basic-auth.credentials-source", "something2");
return options;
}
private static DeserializationSchema<RowData> createDeserializationSchema(
Map<String, String> options) {
final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
return scanSourceMock.valueFormat.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
}
private static SerializationSchema<RowData> createSerializationSchema(
Map<String, String> options) {
final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
return sinkMock.valueFormat.createRuntimeEncoder(
new SinkRuntimeProviderContext(false), SCHEMA.toPhysicalRowDataType());
}
}