[HUDI-4611] Fix the duplicate creation of config in HoodieFlinkStreamer (#6369)

Co-authored-by: linfey <linfey2021@gmail.com>
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 013753b..29f55f7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -69,12 +69,12 @@
     TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();
     kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg));
 
+    Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
     // Read from kafka source
     RowType rowType =
-        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
 
-    Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
     long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
     int parallelism = env.getParallelism();
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index e5e30c1..4b93fae 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -116,10 +116,6 @@
         new Path(cfg.propsFilePath), cfg.configs).getProps();
   }
 
-  public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
-    return new FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(cfg)).getSourceSchema();
-  }
-
   public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
     if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
       return new FilebasedSchemaProvider(conf).getSourceSchema();