[hotfix][table-common] Avoid unnecessary casting when creating type information in sources and sinks

This it is not a compatible change. But given that those interfaces are still relatively new and
not many people have changed to the new sources/sinks. We should do this change now or never
and avoid @SuppressWarning in almost all implementations.
diff --git a/flink-formats-kafka/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java b/flink-formats-kafka/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
index 9a13425..8b3c2bc 100644
--- a/flink-formats-kafka/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
+++ b/flink-formats-kafka/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
@@ -73,7 +73,7 @@
 					DataType producedDataType) {
 				final RowType rowType = (RowType) producedDataType.getLogicalType();
 				final TypeInformation<RowData> rowDataTypeInfo =
-						(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+						context.createTypeInformation(producedDataType);
 				return new AvroRowDataDeserializationSchema(
 						ConfluentRegistryAvroDeserializationSchema.forGeneric(
 								AvroSchemaConverter.convertToSchema(rowType),
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index 57952b9..1fe5da1 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -56,7 +56,6 @@
 
 	public static final String IDENTIFIER = "json";
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
 			DynamicTableFactory.Context context,
@@ -75,7 +74,7 @@
 					DataType producedDataType) {
 				final RowType rowType = (RowType) producedDataType.getLogicalType();
 				final TypeInformation<RowData> rowDataTypeInfo =
-						(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+						context.createTypeInformation(producedDataType);
 				return new JsonRowDataDeserializationSchema(
 						rowType,
 						rowDataTypeInfo,
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
index c5d649d..0afc96c 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
@@ -67,7 +67,6 @@
 		.noDefaultValue()
 		.withDescription("Only read changelog rows which match the specific table (by comparing the \"table\" meta field in the record).");
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
 			DynamicTableFactory.Context context,
@@ -84,7 +83,7 @@
 					DynamicTableSource.Context context, DataType producedDataType) {
 				final RowType rowType = (RowType) producedDataType.getLogicalType();
 				final TypeInformation<RowData> rowDataTypeInfo =
-					(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+						context.createTypeInformation(producedDataType);
 				return CanalJsonDeserializationSchema
 					.builder(rowType, rowDataTypeInfo)
 					.setIgnoreParseErrors(ignoreParseErrors)
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
index d81f7a1..794b1a4 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
@@ -64,7 +64,6 @@
 
 	public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
 			DynamicTableFactory.Context context,
@@ -80,7 +79,7 @@
 					DynamicTableSource.Context context, DataType producedDataType) {
 				final RowType rowType = (RowType) producedDataType.getLogicalType();
 				final TypeInformation<RowData> rowDataTypeInfo =
-					(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+						context.createTypeInformation(producedDataType);
 				return new DebeziumJsonDeserializationSchema(
 					rowType,
 					rowDataTypeInfo,
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
index e3df043..f51e5e2 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
@@ -54,7 +54,6 @@
 
 	public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
 			DynamicTableFactory.Context context,
@@ -69,7 +68,7 @@
 					DynamicTableSource.Context context, DataType producedDataType) {
 				final RowType rowType = (RowType) producedDataType.getLogicalType();
 				final TypeInformation<RowData> rowDataTypeInfo =
-					(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
+						context.createTypeInformation(producedDataType);
 				return new MaxwellJsonDeserializationSchema(
 					rowType,
 					rowDataTypeInfo,