[FLINK-24397][connectors/kafka] Remove TableSchema usage from Kafka table connector
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
index ab0fa13..a391ec1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
@@ -436,7 +436,7 @@
// adjust physical arity with value format's metadata
final int adjustedPhysicalArity =
- producedDataType.getChildren().size() - metadataKeys.size();
+ DataType.getFieldDataTypes(producedDataType).size() - metadataKeys.size();
// adjust value format projection to include value format's metadata columns at the end
final int[] adjustedValueProjection =
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index cf23b3c..b2d5880 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -31,7 +31,6 @@
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
@@ -162,7 +161,10 @@
validateTableSourceOptions(tableOptions);
validatePKConstraints(
- context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
+ context.getObjectIdentifier(),
+ context.getPrimaryKeyIndexes(),
+ context.getCatalogTable().getOptions(),
+ valueDecodingFormat);
final StartupOptions startupOptions = getStartupOptions(tableOptions);
@@ -175,8 +177,7 @@
KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
partitionDiscoveryInterval.orElse(-1L).toString());
- final DataType physicalDataType =
- context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ final DataType physicalDataType = context.getPhysicalRowDataType();
final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
@@ -222,10 +223,12 @@
KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions);
validatePKConstraints(
- context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
+ context.getObjectIdentifier(),
+ context.getPrimaryKeyIndexes(),
+ context.getCatalogTable().getOptions(),
+ valueEncodingFormat);
- final DataType physicalDataType =
- context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ final DataType physicalDataType = context.getPhysicalRowDataType();
final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
@@ -310,12 +313,15 @@
}
private static void validatePKConstraints(
- ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
- if (catalogTable.getSchema().getPrimaryKey().isPresent()
+ ObjectIdentifier tableName,
+ int[] primaryKeyIndexes,
+ Map<String, String> options,
+ Format format) {
+ if (primaryKeyIndexes.length > 0
&& format.getChangelogMode().containsOnly(RowKind.INSERT)) {
- Configuration options = Configuration.fromMap(catalogTable.getOptions());
+ Configuration configuration = Configuration.fromMap(options);
String formatName =
- options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
+ configuration.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
throw new ValidationException(
String.format(
"The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index 6c0d88e..8682e40 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -26,9 +26,9 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
@@ -115,8 +115,11 @@
// Validate the option data type.
helper.validateExcept(PROPERTIES_PREFIX);
- TableSchema schema = context.getCatalogTable().getSchema();
- validateSource(tableOptions, keyDecodingFormat, valueDecodingFormat, schema);
+ validateSource(
+ tableOptions,
+ keyDecodingFormat,
+ valueDecodingFormat,
+ context.getPrimaryKeyIndexes());
Tuple2<int[], int[]> keyValueProjections =
createKeyValueProjections(context.getCatalogTable());
@@ -126,7 +129,7 @@
StartupMode earliest = StartupMode.EARLIEST;
return new KafkaDynamicSource(
- schema.toPhysicalRowDataType(),
+ context.getPhysicalRowDataType(),
keyDecodingFormat,
new DecodingFormatWrapper(valueDecodingFormat),
keyValueProjections.f0,
@@ -157,8 +160,11 @@
// Validate the option data type.
helper.validateExcept(PROPERTIES_PREFIX);
- TableSchema schema = context.getCatalogTable().getSchema();
- validateSink(tableOptions, keyEncodingFormat, valueEncodingFormat, schema);
+ validateSink(
+ tableOptions,
+ keyEncodingFormat,
+ valueEncodingFormat,
+ context.getPrimaryKeyIndexes());
Tuple2<int[], int[]> keyValueProjections =
createKeyValueProjections(context.getCatalogTable());
@@ -175,8 +181,8 @@
// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
// it will use hash partition if key is set else in round-robin behaviour.
return new KafkaDynamicSink(
- schema.toPhysicalRowDataType(),
- schema.toPhysicalRowDataType(),
+ context.getPhysicalRowDataType(),
+ context.getPhysicalRowDataType(),
keyEncodingFormat,
new EncodingFormatWrapper(valueEncodingFormat),
keyValueProjections.f0,
@@ -192,8 +198,8 @@
tableOptions.get(TRANSACTIONAL_ID_PREFIX));
}
- private Tuple2<int[], int[]> createKeyValueProjections(CatalogTable catalogTable) {
- TableSchema schema = catalogTable.getSchema();
+ private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
+ ResolvedSchema schema = catalogTable.getResolvedSchema();
// primary key should validated earlier
List<String> keyFields = schema.getPrimaryKey().get().getColumns();
DataType physicalDataType = schema.toPhysicalRowDataType();
@@ -213,17 +219,23 @@
// --------------------------------------------------------------------------------------------
private static void validateSource(
- ReadableConfig tableOptions, Format keyFormat, Format valueFormat, TableSchema schema) {
+ ReadableConfig tableOptions,
+ Format keyFormat,
+ Format valueFormat,
+ int[] primaryKeyIndexes) {
validateTopic(tableOptions);
validateFormat(keyFormat, valueFormat, tableOptions);
- validatePKConstraints(schema);
+ validatePKConstraints(primaryKeyIndexes);
}
private static void validateSink(
- ReadableConfig tableOptions, Format keyFormat, Format valueFormat, TableSchema schema) {
+ ReadableConfig tableOptions,
+ Format keyFormat,
+ Format valueFormat,
+ int[] primaryKeyIndexes) {
validateTopic(tableOptions);
validateFormat(keyFormat, valueFormat, tableOptions);
- validatePKConstraints(schema);
+ validatePKConstraints(primaryKeyIndexes);
validateSinkBufferFlush(tableOptions);
}
@@ -256,8 +268,8 @@
}
}
- private static void validatePKConstraints(TableSchema schema) {
- if (!schema.getPrimaryKey().isPresent()) {
+ private static void validatePKConstraints(int[] schema) {
+ if (schema.length == 0) {
throw new ValidationException(
"'upsert-kafka' tables require to define a PRIMARY KEY constraint. "
+ "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. "
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java
index cb338a6..4ea4072 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java
@@ -20,8 +20,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
@@ -32,6 +30,10 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThat;
@@ -43,25 +45,18 @@
@Test
public void testFormatProjection() {
- final TableSchema schema =
- TableSchema.builder()
- .add(TableColumn.physical("id", DataTypes.INT()))
- .add(TableColumn.metadata("timestamp", DataTypes.TIMESTAMP(3)))
- .add(
- TableColumn.computed(
- "timestamp_converted",
- DataTypes.STRING(),
- "CAST(`timestamp` AS STRING)"))
- .add(TableColumn.physical("name", DataTypes.STRING()))
- .add(TableColumn.physical("age", DataTypes.INT()))
- .add(TableColumn.physical("address", DataTypes.STRING()))
- .build();
+ final DataType dataType =
+ DataTypes.ROW(
+ FIELD("id", INT()),
+ FIELD("name", STRING()),
+ FIELD("age", INT()),
+ FIELD("address", STRING()));
+
final Map<String, String> options = createTestOptions();
options.put("key.fields", "address; name");
options.put("value.fields-include", "EXCEPT_KEY");
final Configuration config = Configuration.fromMap(options);
- final DataType dataType = schema.toPhysicalRowDataType();
assertArrayEquals(new int[] {3, 1}, createKeyFormatProjection(config, dataType));
assertArrayEquals(new int[] {0, 2}, createValueFormatProjection(config, dataType));
@@ -69,12 +64,10 @@
@Test
public void testMissingKeyFormatProjection() {
- final TableSchema schema =
- TableSchema.builder().add(TableColumn.physical("id", DataTypes.INT())).build();
+ final DataType dataType = ROW(FIELD("id", INT()));
final Map<String, String> options = createTestOptions();
final Configuration config = Configuration.fromMap(options);
- final DataType dataType = schema.toPhysicalRowDataType();
try {
createKeyFormatProjection(config, dataType);
@@ -91,16 +84,11 @@
@Test
public void testInvalidKeyFormatFieldProjection() {
- final TableSchema schema =
- TableSchema.builder()
- .add(TableColumn.physical("id", DataTypes.INT()))
- .add(TableColumn.physical("name", DataTypes.STRING()))
- .build();
+ final DataType dataType = ROW(FIELD("id", INT()), FIELD("name", STRING()));
final Map<String, String> options = createTestOptions();
options.put("key.fields", "non_existing");
final Configuration config = Configuration.fromMap(options);
- final DataType dataType = schema.toPhysicalRowDataType();
try {
createKeyFormatProjection(config, dataType);
@@ -120,18 +108,13 @@
@Test
public void testInvalidKeyFormatPrefixProjection() {
- final TableSchema schema =
- TableSchema.builder()
- .add(TableColumn.physical("k_part_1", DataTypes.INT()))
- .add(TableColumn.physical("part_2", DataTypes.STRING()))
- .add(TableColumn.physical("name", DataTypes.STRING()))
- .build();
+ final DataType dataType =
+ ROW(FIELD("k_part_1", INT()), FIELD("part_2", STRING()), FIELD("name", STRING()));
final Map<String, String> options = createTestOptions();
options.put("key.fields", "k_part_1;part_2");
options.put("key.fields-prefix", "k_");
final Configuration config = Configuration.fromMap(options);
- final DataType dataType = schema.toPhysicalRowDataType();
try {
createKeyFormatProjection(config, dataType);
@@ -148,17 +131,12 @@
@Test
public void testInvalidValueFormatProjection() {
- final TableSchema schema =
- TableSchema.builder()
- .add(TableColumn.physical("k_id", DataTypes.INT()))
- .add(TableColumn.physical("id", DataTypes.STRING()))
- .build();
+ final DataType dataType = ROW(FIELD("k_id", INT()), FIELD("id", STRING()));
final Map<String, String> options = createTestOptions();
options.put("key.fields", "k_id");
options.put("key.fields-prefix", "k_");
final Configuration config = Configuration.fromMap(options);
- final DataType dataType = schema.toPhysicalRowDataType();
try {
createValueFormatProjection(config, dataType);