[hotfix][table-common] Avoid unnecessary casting when creating type information in sources and sinks
This it is not a compatible change. But given that those interfaces are still relatively new and
not many people have changed to the new sources/sinks. We should do this change now or never
and avoid @SuppressWarning in almost all implementations.
diff --git a/flink-formats-kafka/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java b/flink-formats-kafka/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
index 9a13425..8b3c2bc 100644
--- a/flink-formats-kafka/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
+++ b/flink-formats-kafka/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
@@ -73,7 +73,7 @@
DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
- (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+ context.createTypeInformation(producedDataType);
return new AvroRowDataDeserializationSchema(
ConfluentRegistryAvroDeserializationSchema.forGeneric(
AvroSchemaConverter.convertToSchema(rowType),
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index 57952b9..1fe5da1 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -56,7 +56,6 @@
public static final String IDENTIFIER = "json";
- @SuppressWarnings("unchecked")
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
@@ -75,7 +74,7 @@
DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
- (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+ context.createTypeInformation(producedDataType);
return new JsonRowDataDeserializationSchema(
rowType,
rowDataTypeInfo,
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
index c5d649d..0afc96c 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
@@ -67,7 +67,6 @@
.noDefaultValue()
.withDescription("Only read changelog rows which match the specific table (by comparing the \"table\" meta field in the record).");
- @SuppressWarnings("unchecked")
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
@@ -84,7 +83,7 @@
DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
- (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+ context.createTypeInformation(producedDataType);
return CanalJsonDeserializationSchema
.builder(rowType, rowDataTypeInfo)
.setIgnoreParseErrors(ignoreParseErrors)
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
index d81f7a1..794b1a4 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
@@ -64,7 +64,6 @@
public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;
- @SuppressWarnings("unchecked")
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
@@ -80,7 +79,7 @@
DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
- (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+ context.createTypeInformation(producedDataType);
return new DebeziumJsonDeserializationSchema(
rowType,
rowDataTypeInfo,
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
index e3df043..f51e5e2 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
@@ -54,7 +54,6 @@
public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;
- @SuppressWarnings("unchecked")
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
@@ -69,7 +68,7 @@
DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
- (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+ context.createTypeInformation(producedDataType);
return new MaxwellJsonDeserializationSchema(
rowType,
rowDataTypeInfo,