[HUDI-4611] Fix the duplicate creation of config in HoodieFlinkStreamer (#6369)
Co-authored-by: linfey <linfey2021@gmail.com>
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 013753b..29f55f7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -69,12 +69,12 @@
TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();
kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg));
+ Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
// Read from kafka source
RowType rowType =
- (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
+ (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
- Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
int parallelism = env.getParallelism();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index e5e30c1..4b93fae 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -116,10 +116,6 @@
new Path(cfg.propsFilePath), cfg.configs).getProps();
}
- public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
- return new FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(cfg)).getSourceSchema();
- }
-
public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
return new FilebasedSchemaProvider(conf).getSourceSchema();