blob: 50d270bafe2a309bf1c3fa243f81e04eb4de7ce8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.avro.registry.confluent.debezium;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link DebeziumAvroFormatFactory}. */
class DebeziumAvroFormatFactoryTest {
private static final ResolvedSchema SCHEMA =
ResolvedSchema.of(
Column.physical("a", DataTypes.STRING()),
Column.physical("b", DataTypes.INT()),
Column.physical("c", DataTypes.BOOLEAN()));
private static final RowType ROW_TYPE =
(RowType) SCHEMA.toPhysicalRowDataType().getLogicalType();
private static final String SUBJECT = "test-debezium-avro";
private static final String REGISTRY_URL = "http://localhost:8081";
@Test
void testSeDeSchema() {
final Map<String, String> options = getAllOptions();
final Map<String, String> registryConfigs = new HashMap<>();
registryConfigs.put("basic.auth.user.info", "something1");
registryConfigs.put("basic.auth.credentials.source", "something2");
DebeziumAvroDeserializationSchema expectedDeser =
new DebeziumAvroDeserializationSchema(
ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), REGISTRY_URL, registryConfigs);
DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
assertThat(actualDeser).isEqualTo(expectedDeser);
DebeziumAvroSerializationSchema expectedSer =
new DebeziumAvroSerializationSchema(
ROW_TYPE, REGISTRY_URL, SUBJECT, registryConfigs);
SerializationSchema<RowData> actualSer = createSerializationSchema(options);
assertThat(actualSer).isEqualTo(expectedSer);
}
private Map<String, String> getAllOptions() {
final Map<String, String> options = new HashMap<>();
options.put("connector", TestDynamicTableFactory.IDENTIFIER);
options.put("target", "MyTarget");
options.put("buffer-size", "1000");
options.put("format", DebeziumAvroFormatFactory.IDENTIFIER);
options.put("debezium-avro-confluent.url", REGISTRY_URL);
options.put("debezium-avro-confluent.subject", SUBJECT);
options.put("debezium-avro-confluent.basic-auth.user-info", "something1");
options.put("debezium-avro-confluent.basic-auth.credentials-source", "something2");
return options;
}
private static DeserializationSchema<RowData> createDeserializationSchema(
Map<String, String> options) {
final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
return scanSourceMock.valueFormat.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
}
private static SerializationSchema<RowData> createSerializationSchema(
Map<String, String> options) {
final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
return sinkMock.valueFormat.createRuntimeEncoder(
new SinkRuntimeProviderContext(false), SCHEMA.toPhysicalRowDataType());
}
}