[FLINK-24397][connectors/kafka] Remove TableSchema usage from Kafka table connector
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
index ab0fa13..a391ec1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
@@ -436,7 +436,7 @@
 
         // adjust physical arity with value format's metadata
         final int adjustedPhysicalArity =
-                producedDataType.getChildren().size() - metadataKeys.size();
+                DataType.getFieldDataTypes(producedDataType).size() - metadataKeys.size();
 
         // adjust value format projection to include value format's metadata columns at the end
         final int[] adjustedValueProjection =
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index cf23b3c..b2d5880 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -31,7 +31,6 @@
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
@@ -162,7 +161,10 @@
         validateTableSourceOptions(tableOptions);
 
         validatePKConstraints(
-                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
+                context.getObjectIdentifier(),
+                context.getPrimaryKeyIndexes(),
+                context.getCatalogTable().getOptions(),
+                valueDecodingFormat);
 
         final StartupOptions startupOptions = getStartupOptions(tableOptions);
 
@@ -175,8 +177,7 @@
                 KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
                 partitionDiscoveryInterval.orElse(-1L).toString());
 
-        final DataType physicalDataType =
-                context.getCatalogTable().getSchema().toPhysicalRowDataType();
+        final DataType physicalDataType = context.getPhysicalRowDataType();
 
         final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
 
@@ -222,10 +223,12 @@
         KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions);
 
         validatePKConstraints(
-                context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
+                context.getObjectIdentifier(),
+                context.getPrimaryKeyIndexes(),
+                context.getCatalogTable().getOptions(),
+                valueEncodingFormat);
 
-        final DataType physicalDataType =
-                context.getCatalogTable().getSchema().toPhysicalRowDataType();
+        final DataType physicalDataType = context.getPhysicalRowDataType();
 
         final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
 
@@ -310,12 +313,15 @@
     }
 
     private static void validatePKConstraints(
-            ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
-        if (catalogTable.getSchema().getPrimaryKey().isPresent()
+            ObjectIdentifier tableName,
+            int[] primaryKeyIndexes,
+            Map<String, String> options,
+            Format format) {
+        if (primaryKeyIndexes.length > 0
                 && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
-            Configuration options = Configuration.fromMap(catalogTable.getOptions());
+            Configuration configuration = Configuration.fromMap(options);
             String formatName =
-                    options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
+                    configuration.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
             throw new ValidationException(
                     String.format(
                             "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index 6c0d88e..8682e40 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -26,9 +26,9 @@
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
@@ -115,8 +115,11 @@
 
         // Validate the option data type.
         helper.validateExcept(PROPERTIES_PREFIX);
-        TableSchema schema = context.getCatalogTable().getSchema();
-        validateSource(tableOptions, keyDecodingFormat, valueDecodingFormat, schema);
+        validateSource(
+                tableOptions,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                context.getPrimaryKeyIndexes());
 
         Tuple2<int[], int[]> keyValueProjections =
                 createKeyValueProjections(context.getCatalogTable());
@@ -126,7 +129,7 @@
         StartupMode earliest = StartupMode.EARLIEST;
 
         return new KafkaDynamicSource(
-                schema.toPhysicalRowDataType(),
+                context.getPhysicalRowDataType(),
                 keyDecodingFormat,
                 new DecodingFormatWrapper(valueDecodingFormat),
                 keyValueProjections.f0,
@@ -157,8 +160,11 @@
 
         // Validate the option data type.
         helper.validateExcept(PROPERTIES_PREFIX);
-        TableSchema schema = context.getCatalogTable().getSchema();
-        validateSink(tableOptions, keyEncodingFormat, valueEncodingFormat, schema);
+        validateSink(
+                tableOptions,
+                keyEncodingFormat,
+                valueEncodingFormat,
+                context.getPrimaryKeyIndexes());
 
         Tuple2<int[], int[]> keyValueProjections =
                 createKeyValueProjections(context.getCatalogTable());
@@ -175,8 +181,8 @@
         // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
         // it will use hash partition if key is set else in round-robin behaviour.
         return new KafkaDynamicSink(
-                schema.toPhysicalRowDataType(),
-                schema.toPhysicalRowDataType(),
+                context.getPhysicalRowDataType(),
+                context.getPhysicalRowDataType(),
                 keyEncodingFormat,
                 new EncodingFormatWrapper(valueEncodingFormat),
                 keyValueProjections.f0,
@@ -192,8 +198,8 @@
                 tableOptions.get(TRANSACTIONAL_ID_PREFIX));
     }
 
-    private Tuple2<int[], int[]> createKeyValueProjections(CatalogTable catalogTable) {
-        TableSchema schema = catalogTable.getSchema();
+    private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
+        ResolvedSchema schema = catalogTable.getResolvedSchema();
         // primary key should validated earlier
         List<String> keyFields = schema.getPrimaryKey().get().getColumns();
         DataType physicalDataType = schema.toPhysicalRowDataType();
@@ -213,17 +219,23 @@
     // --------------------------------------------------------------------------------------------
 
     private static void validateSource(
-            ReadableConfig tableOptions, Format keyFormat, Format valueFormat, TableSchema schema) {
+            ReadableConfig tableOptions,
+            Format keyFormat,
+            Format valueFormat,
+            int[] primaryKeyIndexes) {
         validateTopic(tableOptions);
         validateFormat(keyFormat, valueFormat, tableOptions);
-        validatePKConstraints(schema);
+        validatePKConstraints(primaryKeyIndexes);
     }
 
     private static void validateSink(
-            ReadableConfig tableOptions, Format keyFormat, Format valueFormat, TableSchema schema) {
+            ReadableConfig tableOptions,
+            Format keyFormat,
+            Format valueFormat,
+            int[] primaryKeyIndexes) {
         validateTopic(tableOptions);
         validateFormat(keyFormat, valueFormat, tableOptions);
-        validatePKConstraints(schema);
+        validatePKConstraints(primaryKeyIndexes);
         validateSinkBufferFlush(tableOptions);
     }
 
@@ -256,8 +268,8 @@
         }
     }
 
-    private static void validatePKConstraints(TableSchema schema) {
-        if (!schema.getPrimaryKey().isPresent()) {
+    private static void validatePKConstraints(int[] schema) {
+        if (schema.length == 0) {
             throw new ValidationException(
                     "'upsert-kafka' tables require to define a PRIMARY KEY constraint. "
                             + "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. "
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java
index cb338a6..4ea4072 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtilTest.java
@@ -20,8 +20,6 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 
@@ -32,6 +30,10 @@
 
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
@@ -43,25 +45,18 @@
 
     @Test
     public void testFormatProjection() {
-        final TableSchema schema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("id", DataTypes.INT()))
-                        .add(TableColumn.metadata("timestamp", DataTypes.TIMESTAMP(3)))
-                        .add(
-                                TableColumn.computed(
-                                        "timestamp_converted",
-                                        DataTypes.STRING(),
-                                        "CAST(`timestamp` AS STRING)"))
-                        .add(TableColumn.physical("name", DataTypes.STRING()))
-                        .add(TableColumn.physical("age", DataTypes.INT()))
-                        .add(TableColumn.physical("address", DataTypes.STRING()))
-                        .build();
+        final DataType dataType =
+                DataTypes.ROW(
+                        FIELD("id", INT()),
+                        FIELD("name", STRING()),
+                        FIELD("age", INT()),
+                        FIELD("address", STRING()));
+
         final Map<String, String> options = createTestOptions();
         options.put("key.fields", "address; name");
         options.put("value.fields-include", "EXCEPT_KEY");
 
         final Configuration config = Configuration.fromMap(options);
-        final DataType dataType = schema.toPhysicalRowDataType();
 
         assertArrayEquals(new int[] {3, 1}, createKeyFormatProjection(config, dataType));
         assertArrayEquals(new int[] {0, 2}, createValueFormatProjection(config, dataType));
@@ -69,12 +64,10 @@
 
     @Test
     public void testMissingKeyFormatProjection() {
-        final TableSchema schema =
-                TableSchema.builder().add(TableColumn.physical("id", DataTypes.INT())).build();
+        final DataType dataType = ROW(FIELD("id", INT()));
         final Map<String, String> options = createTestOptions();
 
         final Configuration config = Configuration.fromMap(options);
-        final DataType dataType = schema.toPhysicalRowDataType();
 
         try {
             createKeyFormatProjection(config, dataType);
@@ -91,16 +84,11 @@
 
     @Test
     public void testInvalidKeyFormatFieldProjection() {
-        final TableSchema schema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("id", DataTypes.INT()))
-                        .add(TableColumn.physical("name", DataTypes.STRING()))
-                        .build();
+        final DataType dataType = ROW(FIELD("id", INT()), FIELD("name", STRING()));
         final Map<String, String> options = createTestOptions();
         options.put("key.fields", "non_existing");
 
         final Configuration config = Configuration.fromMap(options);
-        final DataType dataType = schema.toPhysicalRowDataType();
 
         try {
             createKeyFormatProjection(config, dataType);
@@ -120,18 +108,13 @@
 
     @Test
     public void testInvalidKeyFormatPrefixProjection() {
-        final TableSchema schema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("k_part_1", DataTypes.INT()))
-                        .add(TableColumn.physical("part_2", DataTypes.STRING()))
-                        .add(TableColumn.physical("name", DataTypes.STRING()))
-                        .build();
+        final DataType dataType =
+                ROW(FIELD("k_part_1", INT()), FIELD("part_2", STRING()), FIELD("name", STRING()));
         final Map<String, String> options = createTestOptions();
         options.put("key.fields", "k_part_1;part_2");
         options.put("key.fields-prefix", "k_");
 
         final Configuration config = Configuration.fromMap(options);
-        final DataType dataType = schema.toPhysicalRowDataType();
 
         try {
             createKeyFormatProjection(config, dataType);
@@ -148,17 +131,12 @@
 
     @Test
     public void testInvalidValueFormatProjection() {
-        final TableSchema schema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("k_id", DataTypes.INT()))
-                        .add(TableColumn.physical("id", DataTypes.STRING()))
-                        .build();
+        final DataType dataType = ROW(FIELD("k_id", INT()), FIELD("id", STRING()));
         final Map<String, String> options = createTestOptions();
         options.put("key.fields", "k_id");
         options.put("key.fields-prefix", "k_");
 
         final Configuration config = Configuration.fromMap(options);
-        final DataType dataType = schema.toPhysicalRowDataType();
 
         try {
             createValueFormatProjection(config, dataType);