| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader; |
| |
| import org.apache.seatunnel.api.table.type.SeaTunnelRowType; |
| import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; |
| import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; |
| import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; |
| import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; |
| import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext; |
| import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; |
| import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; |
| import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; |
| import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresUtils; |
| |
| import org.apache.kafka.connect.data.Struct; |
| import org.apache.kafka.connect.errors.ConnectException; |
| import org.apache.kafka.connect.source.SourceRecord; |
| |
| import io.debezium.DebeziumException; |
| import io.debezium.connector.base.ChangeEventQueue; |
| import io.debezium.connector.postgresql.PostgresConnectorConfig; |
| import io.debezium.connector.postgresql.PostgresErrorHandler; |
| import io.debezium.connector.postgresql.PostgresEventMetadataProvider; |
| import io.debezium.connector.postgresql.PostgresOffsetContext; |
| import io.debezium.connector.postgresql.PostgresSchema; |
| import io.debezium.connector.postgresql.PostgresTaskContext; |
| import io.debezium.connector.postgresql.PostgresTopicSelector; |
| import io.debezium.connector.postgresql.TypeRegistry; |
| import io.debezium.connector.postgresql.connection.PostgresConnection; |
| import io.debezium.connector.postgresql.connection.ReplicationConnection; |
| import io.debezium.connector.postgresql.spi.SlotState; |
| import io.debezium.connector.postgresql.spi.Snapshotter; |
| import io.debezium.data.Envelope; |
| import io.debezium.pipeline.DataChangeEvent; |
| import io.debezium.pipeline.ErrorHandler; |
| import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory; |
| import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; |
| import io.debezium.relational.Table; |
| import io.debezium.relational.TableId; |
| import io.debezium.relational.Tables; |
| import io.debezium.relational.history.TableChanges; |
| import io.debezium.schema.TopicSelector; |
| import io.debezium.util.Clock; |
| import io.debezium.util.LoggingContext; |
| import io.debezium.util.Metronome; |
| import lombok.Getter; |
| import lombok.extern.slf4j.Slf4j; |
| |
| import java.sql.SQLException; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| |
| import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY; |
| import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY; |
| import static org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresConnectionUtils.newPostgresValueConverterBuilder; |
| |
| @Slf4j |
| public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext { |
| |
| private static final String CONTEXT_NAME = "postgres-cdc-connector-task"; |
| |
| private final PostgresConnection dataConnection; |
| |
| @Getter private ReplicationConnection replicationConnection; |
| |
| private final PostgresEventMetadataProvider metadataProvider; |
| |
| @Getter private Snapshotter snapshotter; |
| private PostgresSchema databaseSchema; |
| private PostgresOffsetContext offsetContext; |
| private TopicSelector<TableId> topicSelector; |
| private JdbcSourceEventDispatcher dispatcher; |
| private ChangeEventQueue<DataChangeEvent> queue; |
| private PostgresErrorHandler errorHandler; |
| |
| @Getter private PostgresTaskContext taskContext; |
| |
| private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics; |
| |
| private PostgresConnection.PostgresValueConverterBuilder postgresValueConverterBuilder; |
| |
| private Collection<TableChanges.TableChange> engineHistory; |
| |
| public PostgresSourceFetchTaskContext( |
| JdbcSourceConfig sourceConfig, |
| JdbcDataSourceDialect dataSourceDialect, |
| PostgresConnection dataConnection, |
| Collection<TableChanges.TableChange> engineHistory) { |
| super(sourceConfig, dataSourceDialect); |
| this.dataConnection = dataConnection; |
| this.metadataProvider = new PostgresEventMetadataProvider(); |
| this.engineHistory = engineHistory; |
| this.postgresValueConverterBuilder = |
| newPostgresValueConverterBuilder( |
| getDbzConnectorConfig(), sourceConfig.getServerTimeZone()); |
| } |
| |
| @Override |
| public void configure(SourceSplitBase sourceSplitBase) { |
| registerDatabaseHistory(sourceSplitBase); |
| |
| // initial stateful objects |
| final PostgresConnectorConfig connectorConfig = getDbzConnectorConfig(); |
| this.snapshotter = connectorConfig.getSnapshotter(); |
| |
| this.topicSelector = PostgresTopicSelector.create(connectorConfig); |
| final TypeRegistry typeRegistry = dataConnection.getTypeRegistry(); |
| |
| this.databaseSchema = |
| new PostgresSchema( |
| connectorConfig, |
| typeRegistry, |
| topicSelector, |
| postgresValueConverterBuilder.build(typeRegistry)); |
| |
| this.taskContext = new PostgresTaskContext(connectorConfig, databaseSchema, topicSelector); |
| try { |
| taskContext.refreshSchema(dataConnection, false); |
| } catch (SQLException e) { |
| throw new DebeziumException("load schema failed", e); |
| } |
| this.offsetContext = |
| loadStartingOffsetState( |
| new PostgresOffsetContext.Loader(connectorConfig), sourceSplitBase); |
| |
| // If in the snapshot read phase and enable exactly-once, the queue needs to be set to a |
| // maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise, |
| // use the configuration queue size. |
| final int queueSize = |
| sourceSplitBase.isSnapshotSplit() && isExactlyOnce() |
| ? Integer.MAX_VALUE |
| : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize(); |
| |
| LoggingContext.PreviousContext previousContext = |
| taskContext.configureLoggingContext(CONTEXT_NAME); |
| try { |
| // Print out the server information |
| SlotState slotInfo = null; |
| try { |
| if (log.isInfoEnabled()) { |
| log.info(dataConnection.serverInfo().toString()); |
| } |
| slotInfo = |
| dataConnection.getReplicationSlotState( |
| connectorConfig.slotName(), |
| connectorConfig.plugin().getPostgresPluginName()); |
| } catch (SQLException e) { |
| log.warn( |
| "unable to load info of replication slot, Debezium will try to create the slot"); |
| } |
| |
| if (offsetContext == null) { |
| log.info("No previous offset found"); |
| // if we have no initial offset, indicate that to Snapshotter by passing null |
| snapshotter.init(connectorConfig, null, slotInfo); |
| } else { |
| log.info("Found previous offset {}", offsetContext); |
| snapshotter.init(connectorConfig, offsetContext.asOffsetState(), slotInfo); |
| } |
| |
| if (snapshotter.shouldStream()) { |
| // we need to create the slot before we start streaming if it doesn't exist |
| // otherwise we can't stream back changes happening while the snapshot is taking |
| // place |
| if (this.replicationConnection == null) { |
| this.replicationConnection = |
| createReplicationConnection( |
| this.taskContext, |
| snapshotter.shouldSnapshot(), |
| connectorConfig.maxRetries(), |
| connectorConfig.retryDelay()); |
| try { |
| // create the slot if it doesn't exist, otherwise update slot to add new |
| // table(job restore and add table) |
| replicationConnection.createReplicationSlot().orElse(null); |
| } catch (SQLException ex) { |
| String message = "Creation of replication slot failed"; |
| if (ex.getMessage().contains("already exists")) { |
| message += |
| "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each."; |
| log.warn(message); |
| } else { |
| throw new DebeziumException(message, ex); |
| } |
| } |
| } |
| } |
| |
| try { |
| dataConnection.commit(); |
| } catch (SQLException e) { |
| throw new DebeziumException(e); |
| } |
| |
| this.queue = |
| new ChangeEventQueue.Builder<DataChangeEvent>() |
| .pollInterval(connectorConfig.getPollInterval()) |
| .maxBatchSize(connectorConfig.getMaxBatchSize()) |
| .maxQueueSize(queueSize) |
| .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()) |
| .loggingContextSupplier( |
| () -> taskContext.configureLoggingContext(CONTEXT_NAME)) |
| // do not buffer any element, we use signal event |
| // .buffering() |
| .build(); |
| |
| this.dispatcher = |
| new JdbcSourceEventDispatcher( |
| connectorConfig, |
| topicSelector, |
| databaseSchema, |
| queue, |
| connectorConfig.getTableFilters().dataCollectionFilter(), |
| DataChangeEvent::new, |
| metadataProvider, |
| schemaNameAdjuster); |
| |
| this.snapshotChangeEventSourceMetrics = |
| new DefaultChangeEventSourceMetricsFactory() |
| .getSnapshotMetrics(taskContext, queue, metadataProvider); |
| |
| this.errorHandler = new PostgresErrorHandler(connectorConfig.getLogicalName(), queue); |
| } finally { |
| previousContext.restore(); |
| } |
| } |
| |
| private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) { |
| List<TableChanges.TableChange> engineHistory = new ArrayList<>(); |
| // TODO: support save table schema |
| if (sourceSplitBase instanceof SnapshotSplit) { |
| SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase; |
| engineHistory.add( |
| dataSourceDialect.queryTableSchema(dataConnection, snapshotSplit.getTableId())); |
| } else { |
| IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase; |
| for (TableId tableId : incrementalSplit.getTableIds()) { |
| engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId)); |
| } |
| } |
| |
| EmbeddedDatabaseHistory.registerHistory( |
| sourceConfig |
| .getDbzConfiguration() |
| .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME), |
| engineHistory); |
| } |
| |
| @Override |
| public PostgresSourceConfig getSourceConfig() { |
| return (PostgresSourceConfig) sourceConfig; |
| } |
| |
| public PostgresConnection getDataConnection() { |
| return dataConnection; |
| } |
| |
| public SnapshotChangeEventSourceMetrics getSnapshotChangeEventSourceMetrics() { |
| return snapshotChangeEventSourceMetrics; |
| } |
| |
| @Override |
| public PostgresConnectorConfig getDbzConnectorConfig() { |
| return (PostgresConnectorConfig) super.getDbzConnectorConfig(); |
| } |
| |
| @Override |
| public PostgresOffsetContext getOffsetContext() { |
| return offsetContext; |
| } |
| |
| @Override |
| public ErrorHandler getErrorHandler() { |
| return errorHandler; |
| } |
| |
| @Override |
| public PostgresSchema getDatabaseSchema() { |
| return databaseSchema; |
| } |
| |
| @Override |
| public TableId getTableId(SourceRecord record) { |
| Struct value = (Struct) record.value(); |
| Struct source = value.getStruct(Envelope.FieldName.SOURCE); |
| String schemaName = source.getString(SCHEMA_NAME_KEY); |
| String tableName = source.getString(TABLE_NAME_KEY); |
| return new TableId(null, schemaName, tableName); |
| } |
| |
| @Override |
| public SeaTunnelRowType getSplitType(Table table) { |
| return PostgresUtils.getSplitType(table); |
| } |
| |
| @Override |
| public JdbcSourceEventDispatcher getDispatcher() { |
| return dispatcher; |
| } |
| |
| @Override |
| public ChangeEventQueue<DataChangeEvent> getQueue() { |
| return queue; |
| } |
| |
| @Override |
| public Tables.TableFilter getTableFilter() { |
| return getDbzConnectorConfig().getTableFilters().dataCollectionFilter(); |
| } |
| |
| @Override |
| public Offset getStreamOffset(SourceRecord sourceRecord) { |
| return PostgresUtils.getLsnPosition(sourceRecord); |
| } |
| |
| @Override |
| public void close() { |
| try { |
| this.dataConnection.close(); |
| if (this.replicationConnection != null) { |
| this.replicationConnection.close(); |
| } |
| } catch (Exception e) { |
| log.warn("Failed to close connection", e); |
| } |
| } |
| |
| /** Loads the connector's persistent offset (if present) via the given loader. */ |
| private PostgresOffsetContext loadStartingOffsetState( |
| PostgresOffsetContext.Loader loader, SourceSplitBase split) { |
| Offset offset = |
| split.isSnapshotSplit() |
| ? LsnOffset.INITIAL_OFFSET |
| : split.asIncrementalSplit().getStartupOffset(); |
| return loader.load(offset.getOffset()); |
| } |
| |
| public ReplicationConnection createReplicationConnection( |
| PostgresTaskContext taskContext, |
| boolean doSnapshot, |
| int maxRetries, |
| Duration retryDelay) |
| throws ConnectException { |
| final Metronome metronome = Metronome.parker(retryDelay, Clock.SYSTEM); |
| short retryCount = 0; |
| ReplicationConnection replicationConnection = null; |
| while (retryCount <= maxRetries) { |
| try { |
| return taskContext.createReplicationConnection(doSnapshot); |
| } catch (SQLException ex) { |
| retryCount++; |
| if (retryCount > maxRetries) { |
| log.error( |
| "Too many errors connecting to server. All {} retries failed.", |
| maxRetries); |
| throw new ConnectException(ex); |
| } |
| |
| log.warn( |
| "Error connecting to server; will attempt retry {} of {} after {} " |
| + "seconds. Exception message: {}", |
| retryCount, |
| maxRetries, |
| retryDelay.getSeconds(), |
| ex.getMessage()); |
| try { |
| metronome.pause(); |
| } catch (InterruptedException e) { |
| log.warn("Connection retry sleep interrupted by exception: " + e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| return replicationConnection; |
| } |
| } |