[Hotfix][Postgres-CDC/OpenGauss-CDC] Fix read data missing when restore (#6785)

1. The job was not added new tables into `publication` before start to snapshot read.
2. Delete of database logs did not use checkpoint offset.
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
index 689490a..58f59f4 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.connectors.cdc.base.dialect;
 
-import org.apache.seatunnel.api.state.CheckpointListener;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 
@@ -33,8 +33,7 @@
  *
  * @param <C> The source config of data source.
  */
-public interface DataSourceDialect<C extends SourceConfig>
-        extends Serializable, CheckpointListener {
+public interface DataSourceDialect<C extends SourceConfig> extends Serializable {
 
     /** Get the name of dialect. */
     String getName();
@@ -57,9 +56,6 @@
     /**
      * We have an empty default implementation here because most dialects do not have to implement
      * the method.
-     *
-     * @see CheckpointListener#notifyCheckpointComplete(long)
      */
-    @Override
-    default void notifyCheckpointComplete(long checkpointId) throws Exception {}
+    default void commitChangeLogOffset(Offset offset) throws Exception {}
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 829f687..b5fc443 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -25,6 +25,7 @@
 import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
 import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
 import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
@@ -72,6 +73,8 @@
 
     private final DataSourceDialect<C> dataSourceDialect;
 
+    private transient volatile Offset snapshotChangeLogOffset;
+
     private final AtomicBoolean needSendSplitRequest = new AtomicBoolean(false);
 
     public IncrementalSourceReader(
@@ -113,7 +116,7 @@
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        dataSourceDialect.notifyCheckpointComplete(checkpointId);
+        dataSourceDialect.commitChangeLogOffset(snapshotChangeLogOffset);
     }
 
     @Override
@@ -238,7 +241,9 @@
         unfinishedSplits.addAll(finishedUnackedSplits.values());
 
         if (isIncrementalSplitPhase(unfinishedSplits)) {
-            return snapshotCheckpointDataType(unfinishedSplits);
+            IncrementalSplit incrementalSplit = unfinishedSplits.get(0).asIncrementalSplit();
+            snapshotChangeLogOffset = incrementalSplit.getStartupOffset();
+            return snapshotCheckpointDataType(incrementalSplit);
         }
 
         return unfinishedSplits;
@@ -253,12 +258,7 @@
         return stateSplits.size() == 1 && stateSplits.get(0).isIncrementalSplit();
     }
 
-    private List<SourceSplitBase> snapshotCheckpointDataType(List<SourceSplitBase> stateSplits) {
-        if (!isIncrementalSplitPhase(stateSplits)) {
-            throw new IllegalStateException(
-                    "The splits should be incremental split when snapshot  checkpoint datatype");
-        }
-        IncrementalSplit incrementalSplit = stateSplits.get(0).asIncrementalSplit();
+    private List<SourceSplitBase> snapshotCheckpointDataType(IncrementalSplit incrementalSplit) {
         // Snapshot current datatype to checkpoint
         SeaTunnelDataType<T> checkpointDataType = debeziumDeserializationSchema.getProducedType();
         IncrementalSplit newIncrementalSplit =
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
index aa39c9f..cf6b624 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
@@ -25,6 +25,7 @@
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
 import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
 import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
@@ -33,6 +34,7 @@
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.enumerator.PostgresChunkSplitter;
+import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotFetchTask;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.wal.PostgresWalFetchTask;
@@ -161,9 +163,9 @@
     }
 
     @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    public void commitChangeLogOffset(Offset offset) throws Exception {
         if (postgresWalFetchTask != null) {
-            postgresWalFetchTask.commitCurrentOffset();
+            postgresWalFetchTask.commitCurrentOffset((LsnOffset) offset);
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
index d5d725e..3cb614c 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
@@ -47,7 +47,6 @@
 import io.debezium.connector.postgresql.TypeRegistry;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.ReplicationConnection;
-import io.debezium.connector.postgresql.spi.SlotCreationResult;
 import io.debezium.connector.postgresql.spi.SlotState;
 import io.debezium.connector.postgresql.spi.Snapshotter;
 import io.debezium.data.Envelope;
@@ -180,28 +179,31 @@
                 snapshotter.init(connectorConfig, offsetContext.asOffsetState(), slotInfo);
             }
 
-            SlotCreationResult slotCreatedInfo = null;
             if (snapshotter.shouldStream()) {
-                final boolean doSnapshot = snapshotter.shouldSnapshot();
-                createReplicationConnection(
-                        doSnapshot, connectorConfig.maxRetries(), connectorConfig.retryDelay());
                 // we need to create the slot before we start streaming if it doesn't exist
                 // otherwise we can't stream back changes happening while the snapshot is taking
                 // place
-                if (slotInfo == null) {
+                if (this.replicationConnection == null) {
+                    this.replicationConnection =
+                            createReplicationConnection(
+                                    this.taskContext,
+                                    snapshotter.shouldSnapshot(),
+                                    connectorConfig.maxRetries(),
+                                    connectorConfig.retryDelay());
                     try {
-                        slotCreatedInfo =
-                                replicationConnection.createReplicationSlot().orElse(null);
+                        // create the slot if it doesn't exist, otherwise update slot to add new
+                        // table(job restore and add table)
+                        replicationConnection.createReplicationSlot().orElse(null);
                     } catch (SQLException ex) {
                         String message = "Creation of replication slot failed";
                         if (ex.getMessage().contains("already exists")) {
                             message +=
                                     "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
+                            log.warn(message);
+                        } else {
+                            throw new DebeziumException(message, ex);
                         }
-                        throw new DebeziumException(message, ex);
                     }
-                } else {
-                    slotCreatedInfo = null;
                 }
             }
 
@@ -265,20 +267,6 @@
                 engineHistory);
     }
 
-    public void createReplicationConnection(
-            boolean doSnapshot, int maxRetries, Duration retryDelay) {
-        if (this.replicationConnection != null) {
-            return;
-        }
-        synchronized (this) {
-            if (this.replicationConnection == null) {
-                this.replicationConnection =
-                        createReplicationConnection(
-                                this.taskContext, doSnapshot, maxRetries, retryDelay);
-            }
-        }
-    }
-
     @Override
     public PostgresSourceConfig getSourceConfig() {
         return (PostgresSourceConfig) sourceConfig;
@@ -350,7 +338,9 @@
     public void close() {
         try {
             this.dataConnection.close();
-            this.replicationConnection.close();
+            if (this.replicationConnection != null) {
+                this.replicationConnection.close();
+            }
         } catch (Exception e) {
             log.warn("Failed to close connection", e);
         }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/wal/PostgresWalFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/wal/PostgresWalFetchTask.java
index e8a7312..7fd9906 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/wal/PostgresWalFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/wal/PostgresWalFetchTask.java
@@ -20,6 +20,7 @@
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
 import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext;
 
 import io.debezium.connector.postgresql.PostgresOffsetContext;
@@ -73,12 +74,11 @@
         streamingChangeEventSource.execute(changeEventSourceContext, offsetContext);
     }
 
-    public void commitCurrentOffset() {
-        if (streamingChangeEventSource != null && offsetContext != null) {
+    public void commitCurrentOffset(LsnOffset offset) {
+        if (streamingChangeEventSource != null && offset != null) {
 
             // only extracting and storing the lsn of the last commit
-            Long commitLsn =
-                    (Long) offsetContext.getOffset().get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
+            Long commitLsn = offset.getLsn().asLong();
             if (commitLsn != null
                     && (lastCommitLsn == null
                             || Lsn.valueOf(commitLsn).compareTo(Lsn.valueOf(lastCommitLsn)) > 0)) {