| commit | 542bf26cdd120e8fbcdb38c827a80f20e5eec2ea | [log] [tgz] |
|---|---|---|
| author | cygnusdark <53288632+cygnusdark@users.noreply.github.com> | Tue Jun 27 15:02:49 2023 +0800 |
| committer | GitHub <noreply@github.com> | Tue Jun 27 15:02:49 2023 +0800 |
| tree | c6f7118ec8be169f07296e738d28b8d48cfa19eb | |
| parent | 1adebb3128ebbb90629aeb2e849d3d74cc94d573 [diff] |
Fix issue 149 by appointing a unique uid to each sink operator. (#150)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 84d5b57..1699d52 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -117,7 +117,7 @@ DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag); int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism()); - sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table); + sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table); } }