[Hotfix][Postgres-CDC/OpenGauss-CDC] Fix read data missing when restore (#6785)
1. The job was not added new tables into `publication` before start to snapshot read.
2. Delete of database logs did not use checkpoint offset.
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
index 689490a..58f59f4 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.connectors.cdc.base.dialect;
-import org.apache.seatunnel.api.state.CheckpointListener;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -33,8 +33,7 @@
*
* @param <C> The source config of data source.
*/
-public interface DataSourceDialect<C extends SourceConfig>
- extends Serializable, CheckpointListener {
+public interface DataSourceDialect<C extends SourceConfig> extends Serializable {
/** Get the name of dialect. */
String getName();
@@ -57,9 +56,6 @@
/**
* We have an empty default implementation here because most dialects do not have to implement
* the method.
- *
- * @see CheckpointListener#notifyCheckpointComplete(long)
*/
- @Override
- default void notifyCheckpointComplete(long checkpointId) throws Exception {}
+ default void commitChangeLogOffset(Offset offset) throws Exception {}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 829f687..b5fc443 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
@@ -72,6 +73,8 @@
private final DataSourceDialect<C> dataSourceDialect;
+ private transient volatile Offset snapshotChangeLogOffset;
+
private final AtomicBoolean needSendSplitRequest = new AtomicBoolean(false);
public IncrementalSourceReader(
@@ -113,7 +116,7 @@
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- dataSourceDialect.notifyCheckpointComplete(checkpointId);
+ dataSourceDialect.commitChangeLogOffset(snapshotChangeLogOffset);
}
@Override
@@ -238,7 +241,9 @@
unfinishedSplits.addAll(finishedUnackedSplits.values());
if (isIncrementalSplitPhase(unfinishedSplits)) {
- return snapshotCheckpointDataType(unfinishedSplits);
+ IncrementalSplit incrementalSplit = unfinishedSplits.get(0).asIncrementalSplit();
+ snapshotChangeLogOffset = incrementalSplit.getStartupOffset();
+ return snapshotCheckpointDataType(incrementalSplit);
}
return unfinishedSplits;
@@ -253,12 +258,7 @@
return stateSplits.size() == 1 && stateSplits.get(0).isIncrementalSplit();
}
- private List<SourceSplitBase> snapshotCheckpointDataType(List<SourceSplitBase> stateSplits) {
- if (!isIncrementalSplitPhase(stateSplits)) {
- throw new IllegalStateException(
- "The splits should be incremental split when snapshot checkpoint datatype");
- }
- IncrementalSplit incrementalSplit = stateSplits.get(0).asIncrementalSplit();
+ private List<SourceSplitBase> snapshotCheckpointDataType(IncrementalSplit incrementalSplit) {
// Snapshot current datatype to checkpoint
SeaTunnelDataType<T> checkpointDataType = debeziumDeserializationSchema.getProducedType();
IncrementalSplit newIncrementalSplit =
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
index aa39c9f..cf6b624 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
@@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
@@ -33,6 +34,7 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.enumerator.PostgresChunkSplitter;
+import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.wal.PostgresWalFetchTask;
@@ -161,9 +163,9 @@
}
@Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ public void commitChangeLogOffset(Offset offset) throws Exception {
if (postgresWalFetchTask != null) {
- postgresWalFetchTask.commitCurrentOffset();
+ postgresWalFetchTask.commitCurrentOffset((LsnOffset) offset);
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
index d5d725e..3cb614c 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
@@ -47,7 +47,6 @@
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
-import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.Envelope;
@@ -180,28 +179,31 @@
snapshotter.init(connectorConfig, offsetContext.asOffsetState(), slotInfo);
}
- SlotCreationResult slotCreatedInfo = null;
if (snapshotter.shouldStream()) {
- final boolean doSnapshot = snapshotter.shouldSnapshot();
- createReplicationConnection(
- doSnapshot, connectorConfig.maxRetries(), connectorConfig.retryDelay());
// we need to create the slot before we start streaming if it doesn't exist
// otherwise we can't stream back changes happening while the snapshot is taking
// place
- if (slotInfo == null) {
+ if (this.replicationConnection == null) {
+ this.replicationConnection =
+ createReplicationConnection(
+ this.taskContext,
+ snapshotter.shouldSnapshot(),
+ connectorConfig.maxRetries(),
+ connectorConfig.retryDelay());
try {
- slotCreatedInfo =
- replicationConnection.createReplicationSlot().orElse(null);
+ // create the slot if it doesn't exist, otherwise update slot to add new
+ // table(job restore and add table)
+ replicationConnection.createReplicationSlot().orElse(null);
} catch (SQLException ex) {
String message = "Creation of replication slot failed";
if (ex.getMessage().contains("already exists")) {
message +=
"; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
+ log.warn(message);
+ } else {
+ throw new DebeziumException(message, ex);
}
- throw new DebeziumException(message, ex);
}
- } else {
- slotCreatedInfo = null;
}
}
@@ -265,20 +267,6 @@
engineHistory);
}
- public void createReplicationConnection(
- boolean doSnapshot, int maxRetries, Duration retryDelay) {
- if (this.replicationConnection != null) {
- return;
- }
- synchronized (this) {
- if (this.replicationConnection == null) {
- this.replicationConnection =
- createReplicationConnection(
- this.taskContext, doSnapshot, maxRetries, retryDelay);
- }
- }
- }
-
@Override
public PostgresSourceConfig getSourceConfig() {
return (PostgresSourceConfig) sourceConfig;
@@ -350,7 +338,9 @@
public void close() {
try {
this.dataConnection.close();
- this.replicationConnection.close();
+ if (this.replicationConnection != null) {
+ this.replicationConnection.close();
+ }
} catch (Exception e) {
log.warn("Failed to close connection", e);
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/wal/PostgresWalFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/wal/PostgresWalFetchTask.java
index e8a7312..7fd9906 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/wal/PostgresWalFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/wal/PostgresWalFetchTask.java
@@ -20,6 +20,7 @@
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext;
import io.debezium.connector.postgresql.PostgresOffsetContext;
@@ -73,12 +74,11 @@
streamingChangeEventSource.execute(changeEventSourceContext, offsetContext);
}
- public void commitCurrentOffset() {
- if (streamingChangeEventSource != null && offsetContext != null) {
+ public void commitCurrentOffset(LsnOffset offset) {
+ if (streamingChangeEventSource != null && offset != null) {
// only extracting and storing the lsn of the last commit
- Long commitLsn =
- (Long) offsetContext.getOffset().get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
+ Long commitLsn = offset.getLsn().asLong();
if (commitLsn != null
&& (lastCommitLsn == null
|| Lsn.valueOf(commitLsn).compareTo(Lsn.valueOf(lastCommitLsn)) > 0)) {