/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader;

import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresUtils;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresErrorHandler;
import io.debezium.connector.postgresql.PostgresEventMetadataProvider;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresTopicSelector;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.Envelope;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresConnectionUtils.newPostgresValueConverterBuilder;
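
/**
 * The {@link JdbcSourceFetchTaskContext} implementation for PostgreSQL. It holds the Debezium
 * objects (schema, offset context, change event queue, dispatcher and connections) shared by the
 * snapshot and stream fetch tasks.
 */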
@Slf4j
public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private static final String CONTEXT_NAME = "postgres-cdc-connector-task";
private final PostgresConnection dataConnection;
@Getter private ReplicationConnection replicationConnection;
private final PostgresEventMetadataProvider metadataProvider;
@Getter private Snapshotter snapshotter;
private PostgresSchema databaseSchema;
private PostgresOffsetContext offsetContext;
private TopicSelector<TableId> topicSelector;
private JdbcSourceEventDispatcher dispatcher;
private ChangeEventQueue<DataChangeEvent> queue;
private PostgresErrorHandler errorHandler;
@Getter private PostgresTaskContext taskContext;
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
private PostgresConnection.PostgresValueConverterBuilder postgresValueConverterBuilder;
    private Collection<TableChanges.TableChange> engineHistory;

public PostgresSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
PostgresConnection dataConnection,
Collection<TableChanges.TableChange> engineHistory) {
super(sourceConfig, dataSourceDialect);
this.dataConnection = dataConnection;
this.metadataProvider = new PostgresEventMetadataProvider();
this.engineHistory = engineHistory;
this.postgresValueConverterBuilder =
newPostgresValueConverterBuilder(
getDbzConnectorConfig(), sourceConfig.getServerTimeZone());
}
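
    /**
     * Initializes the Debezium stateful objects (schema, offset context, change event queue,
     * dispatcher, replication connection and metrics) required before the given split can be
     * processed.
     */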
@Override
public void configure(SourceSplitBase sourceSplitBase) {
registerDatabaseHistory(sourceSplitBase);
        // initialize stateful objects
final PostgresConnectorConfig connectorConfig = getDbzConnectorConfig();
this.snapshotter = connectorConfig.getSnapshotter();
this.topicSelector = PostgresTopicSelector.create(connectorConfig);
final TypeRegistry typeRegistry = dataConnection.getTypeRegistry();
this.databaseSchema =
new PostgresSchema(
connectorConfig,
typeRegistry,
topicSelector,
postgresValueConverterBuilder.build(typeRegistry));
this.taskContext = new PostgresTaskContext(connectorConfig, databaseSchema, topicSelector);
try {
taskContext.refreshSchema(dataConnection, false);
} catch (SQLException e) {
throw new DebeziumException("load schema failed", e);
}
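        // restore the starting offset: the initial LSN offset for snapshot splits, or the
        // split's startup offset for incremental splits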
this.offsetContext =
loadStartingOffsetState(
new PostgresOffsetContext.Loader(connectorConfig), sourceSplitBase);
        // If in the snapshot read phase and exactly-once is enabled, the queue size must be
        // `Integer.MAX_VALUE` so that it can buffer all data of the current snapshot; otherwise,
        // use the configured queue size.
final int queueSize =
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
? Integer.MAX_VALUE
: getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
LoggingContext.PreviousContext previousContext =
taskContext.configureLoggingContext(CONTEXT_NAME);
try {
// Print out the server information
SlotState slotInfo = null;
try {
if (log.isInfoEnabled()) {
log.info(dataConnection.serverInfo().toString());
}
slotInfo =
dataConnection.getReplicationSlotState(
connectorConfig.slotName(),
connectorConfig.plugin().getPostgresPluginName());
} catch (SQLException e) {
                log.warn(
                        "Unable to load info of replication slot; Debezium will try to create the slot");
}
if (offsetContext == null) {
log.info("No previous offset found");
// if we have no initial offset, indicate that to Snapshotter by passing null
snapshotter.init(connectorConfig, null, slotInfo);
} else {
log.info("Found previous offset {}", offsetContext);
snapshotter.init(connectorConfig, offsetContext.asOffsetState(), slotInfo);
}
if (snapshotter.shouldStream()) {
                // We need to create the slot before we start streaming, if it doesn't exist;
                // otherwise we can't stream back changes that happen while the snapshot is
                // taking place.
if (this.replicationConnection == null) {
this.replicationConnection =
createReplicationConnection(
this.taskContext,
snapshotter.shouldSnapshot(),
connectorConfig.maxRetries(),
connectorConfig.retryDelay());
try {
                    // create the slot if it doesn't exist; otherwise update the slot to add
                    // newly captured tables (job restore with newly added tables)
replicationConnection.createReplicationSlot().orElse(null);
} catch (SQLException ex) {
String message = "Creation of replication slot failed";
if (ex.getMessage().contains("already exists")) {
message +=
"; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
log.warn(message);
} else {
throw new DebeziumException(message, ex);
}
}
}
}
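            // commit any transaction opened by the preceding metadata queries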
try {
dataConnection.commit();
} catch (SQLException e) {
throw new DebeziumException(e);
}
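            // the queue buffers change events produced by Debezium until the fetch task polls them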
this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(queueSize)
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(
() -> taskContext.configureLoggingContext(CONTEXT_NAME))
                        // do not buffer any element; we use signal events instead
                        // .buffering()
.build();
this.dispatcher =
new JdbcSourceEventDispatcher(
connectorConfig,
topicSelector,
databaseSchema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
schemaNameAdjuster);
this.snapshotChangeEventSourceMetrics =
new DefaultChangeEventSourceMetricsFactory()
.getSnapshotMetrics(taskContext, queue, metadataProvider);
this.errorHandler = new PostgresErrorHandler(connectorConfig.getLogicalName(), queue);
} finally {
previousContext.restore();
}
}
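
    /**
     * Queries the schema of each captured table and registers it with {@link
     * EmbeddedDatabaseHistory} so that Debezium can resolve table structures without an external
     * history store.
     */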
private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
List<TableChanges.TableChange> engineHistory = new ArrayList<>();
        // TODO: support saving the table schema
if (sourceSplitBase instanceof SnapshotSplit) {
SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
engineHistory.add(
dataSourceDialect.queryTableSchema(dataConnection, snapshotSplit.getTableId()));
} else {
IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
for (TableId tableId : incrementalSplit.getTableIds()) {
engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId));
}
}
EmbeddedDatabaseHistory.registerHistory(
sourceConfig
.getDbzConfiguration()
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
engineHistory);
}

    @Override
    public PostgresSourceConfig getSourceConfig() {
        return (PostgresSourceConfig) sourceConfig;
    }

    public PostgresConnection getDataConnection() {
        return dataConnection;
    }

    public SnapshotChangeEventSourceMetrics getSnapshotChangeEventSourceMetrics() {
        return snapshotChangeEventSourceMetrics;
    }

    @Override
    public PostgresConnectorConfig getDbzConnectorConfig() {
        return (PostgresConnectorConfig) super.getDbzConnectorConfig();
    }

    @Override
    public PostgresOffsetContext getOffsetContext() {
        return offsetContext;
    }

    @Override
    public ErrorHandler getErrorHandler() {
        return errorHandler;
    }

    @Override
    public PostgresSchema getDatabaseSchema() {
        return databaseSchema;
    }

    @Override
    public TableId getTableId(SourceRecord record) {
        Struct value = (Struct) record.value();
        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
        String schemaName = source.getString(SCHEMA_NAME_KEY);
        String tableName = source.getString(TABLE_NAME_KEY);
        return new TableId(null, schemaName, tableName);
    }

    @Override
    public SeaTunnelRowType getSplitType(Table table) {
        return PostgresUtils.getSplitType(table);
    }

    @Override
    public JdbcSourceEventDispatcher getDispatcher() {
        return dispatcher;
    }

    @Override
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return queue;
    }

    @Override
    public Tables.TableFilter getTableFilter() {
        return getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
    }

    @Override
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        return PostgresUtils.getLsnPosition(sourceRecord);
    }

    @Override
    public void close() {
        try {
            this.dataConnection.close();
            if (this.replicationConnection != null) {
                this.replicationConnection.close();
            }
        } catch (Exception e) {
            log.warn("Failed to close connection", e);
        }
    }

/** Loads the connector's persistent offset (if present) via the given loader. */
private PostgresOffsetContext loadStartingOffsetState(
PostgresOffsetContext.Loader loader, SourceSplitBase split) {
Offset offset =
split.isSnapshotSplit()
? LsnOffset.INITIAL_OFFSET
: split.asIncrementalSplit().getStartupOffset();
return loader.load(offset.getOffset());
}
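
    /**
     * Creates a replication connection, retrying up to {@code maxRetries} times with {@code
     * retryDelay} between attempts; throws a {@link ConnectException} once the retries are
     * exhausted.
     */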
public ReplicationConnection createReplicationConnection(
PostgresTaskContext taskContext,
boolean doSnapshot,
int maxRetries,
Duration retryDelay)
throws ConnectException {
final Metronome metronome = Metronome.parker(retryDelay, Clock.SYSTEM);
short retryCount = 0;
ReplicationConnection replicationConnection = null;
while (retryCount <= maxRetries) {
try {
return taskContext.createReplicationConnection(doSnapshot);
} catch (SQLException ex) {
retryCount++;
if (retryCount > maxRetries) {
log.error(
"Too many errors connecting to server. All {} retries failed.",
maxRetries);
throw new ConnectException(ex);
}
log.warn(
"Error connecting to server; will attempt retry {} of {} after {} "
+ "seconds. Exception message: {}",
retryCount,
maxRetries,
retryDelay.getSeconds(),
ex.getMessage());
try {
metronome.pause();
} catch (InterruptedException e) {
log.warn("Connection retry sleep interrupted by exception: " + e);
Thread.currentThread().interrupt();
}
}
}
return replicationConnection;
}
}