[Hotfix][CDC] Fix thread-unsafe collection container in cdc enumerator (#5614)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
index ce2373b..a1e6729 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -32,12 +32,17 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /** Assigner for snapshot split. */
 public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
@@ -47,12 +52,12 @@
 
     private final C sourceConfig;
     private final List<TableId> alreadyProcessedTables;
-    private final List<SnapshotSplit> remainingSplits;
+    private final Queue<SnapshotSplit> remainingSplits;
     private final Map<String, SnapshotSplit> assignedSplits;
     private final Map<String, SnapshotSplitWatermark> splitCompletedOffsets;
     private boolean assignerCompleted;
     private final int currentParallelism;
-    private final LinkedList<TableId> remainingTables;
+    private final Deque<TableId> remainingTables;
     private final boolean isRemainingTablesCheckpointed;
 
     private ChunkSplitter chunkSplitter;
@@ -115,12 +120,12 @@
         this.context = context;
         this.sourceConfig = context.getSourceConfig();
         this.currentParallelism = currentParallelism;
-        this.alreadyProcessedTables = alreadyProcessedTables;
-        this.remainingSplits = remainingSplits;
-        this.assignedSplits = assignedSplits;
-        this.splitCompletedOffsets = splitCompletedOffsets;
+        this.alreadyProcessedTables = Collections.synchronizedList(alreadyProcessedTables);
+        this.remainingSplits = new ConcurrentLinkedQueue(remainingSplits);
+        this.assignedSplits = new ConcurrentHashMap<>(assignedSplits);
+        this.splitCompletedOffsets = new ConcurrentHashMap<>(splitCompletedOffsets);
         this.assignerCompleted = assignerCompleted;
-        this.remainingTables = new LinkedList<>(remainingTables);
+        this.remainingTables = new ConcurrentLinkedDeque<>(remainingTables);
         this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
         this.isTableIdCaseSensitive = isTableIdCaseSensitive;
         this.dialect = dialect;
@@ -211,11 +216,15 @@
         SnapshotPhaseState state =
                 new SnapshotPhaseState(
                         alreadyProcessedTables,
-                        remainingSplits,
+                        remainingSplits.isEmpty()
+                                ? Collections.emptyList()
+                                : new ArrayList<>(remainingSplits),
                         assignedSplits,
                         splitCompletedOffsets,
                         assignerCompleted,
-                        remainingTables,
+                        remainingTables.isEmpty()
+                                ? Collections.emptyList()
+                                : new ArrayList<>(remainingTables),
                         isTableIdCaseSensitive,
                         true);
         // we need a complete checkpoint before mark this assigner to be completed, to wait for all