[Hotfix][CDC] Fix thread-unsafe collection container in cdc enumerator (#5614)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
index ce2373b..a1e6729 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -32,12 +32,17 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
@@ -47,12 +52,12 @@
private final C sourceConfig;
private final List<TableId> alreadyProcessedTables;
- private final List<SnapshotSplit> remainingSplits;
+ private final Queue<SnapshotSplit> remainingSplits;
private final Map<String, SnapshotSplit> assignedSplits;
private final Map<String, SnapshotSplitWatermark> splitCompletedOffsets;
private boolean assignerCompleted;
private final int currentParallelism;
- private final LinkedList<TableId> remainingTables;
+ private final Deque<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;
private ChunkSplitter chunkSplitter;
@@ -115,12 +120,12 @@
this.context = context;
this.sourceConfig = context.getSourceConfig();
this.currentParallelism = currentParallelism;
- this.alreadyProcessedTables = alreadyProcessedTables;
- this.remainingSplits = remainingSplits;
- this.assignedSplits = assignedSplits;
- this.splitCompletedOffsets = splitCompletedOffsets;
+ this.alreadyProcessedTables = Collections.synchronizedList(alreadyProcessedTables);
+ this.remainingSplits = new ConcurrentLinkedQueue(remainingSplits);
+ this.assignedSplits = new ConcurrentHashMap<>(assignedSplits);
+ this.splitCompletedOffsets = new ConcurrentHashMap<>(splitCompletedOffsets);
this.assignerCompleted = assignerCompleted;
- this.remainingTables = new LinkedList<>(remainingTables);
+ this.remainingTables = new ConcurrentLinkedDeque<>(remainingTables);
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.dialect = dialect;
@@ -211,11 +216,15 @@
SnapshotPhaseState state =
new SnapshotPhaseState(
alreadyProcessedTables,
- remainingSplits,
+ remainingSplits.isEmpty()
+ ? Collections.emptyList()
+ : new ArrayList<>(remainingSplits),
assignedSplits,
splitCompletedOffsets,
assignerCompleted,
- remainingTables,
+ remainingTables.isEmpty()
+ ? Collections.emptyList()
+ : new ArrayList<>(remainingTables),
isTableIdCaseSensitive,
true);
// we need a complete checkpoint before mark this assigner to be completed, to wait for all