| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source; |
| |
| import org.apache.seatunnel.api.table.catalog.CatalogTable; |
| import org.apache.seatunnel.api.table.catalog.ConstraintKey; |
| import org.apache.seatunnel.api.table.catalog.PrimaryKey; |
| import org.apache.seatunnel.common.utils.SeaTunnelException; |
| import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; |
| import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; |
| import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory; |
| import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter; |
| import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; |
| import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask; |
| import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; |
| import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; |
| import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; |
| import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.enumerator.PostgresChunkSplitter; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotFetchTask; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.wal.PostgresWalFetchTask; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresSchema; |
| import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.TableDiscoveryUtils; |
| import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; |
| |
| import io.debezium.connector.postgresql.PostgresConnectorConfig; |
| import io.debezium.connector.postgresql.connection.PostgresConnection; |
| import io.debezium.jdbc.JdbcConnection; |
| import io.debezium.relational.RelationalDatabaseConnectorConfig; |
| import io.debezium.relational.TableId; |
| import io.debezium.relational.history.TableChanges; |
| |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| |
| import static org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresConnectionUtils.newPostgresValueConverterBuilder; |
| |
| public class PostgresDialect implements JdbcDataSourceDialect { |
| |
| private static final long serialVersionUID = 1L; |
| private final PostgresSourceConfig sourceConfig; |
| |
| private transient PostgresSchema postgresSchema; |
| private PostgresWalFetchTask postgresWalFetchTask; |
| |
| private final Map<TableId, CatalogTable> tableMap; |
| |
| public PostgresDialect( |
| PostgresSourceConfigFactory configFactory, List<CatalogTable> catalogTables) { |
| this.sourceConfig = configFactory.create(0); |
| this.tableMap = CatalogTableUtils.convertTables(catalogTables); |
| } |
| |
| @Override |
| public String getName() { |
| return DatabaseIdentifier.POSTGRESQL; |
| } |
| |
| @Override |
| public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { |
| // todo: need to check the case sensitive of the database |
| return true; |
| } |
| |
| @Override |
| public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) { |
| PostgresConnectorConfig conf = |
| (PostgresConnectorConfig) sourceConfig.getDbzConnectorConfig(); |
| return new PostgresConnection( |
| conf.getJdbcConfig(), |
| newPostgresValueConverterBuilder(conf, sourceConfig.getServerTimeZone())); |
| } |
| |
| @Override |
| public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) { |
| return new PostgresChunkSplitter(sourceConfig, this); |
| } |
| |
| @Override |
| public JdbcConnectionPoolFactory getPooledDataSourceFactory() { |
| return new PostgresPooledDataSourceFactory(); |
| } |
| |
| @Override |
| public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) { |
| PostgresSourceConfig postgresSourceConfig = (PostgresSourceConfig) sourceConfig; |
| try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) { |
| return TableDiscoveryUtils.listTables( |
| jdbcConnection, postgresSourceConfig.getTableFilters()); |
| } catch (SQLException e) { |
| throw new SeaTunnelException("Error to discover tables: " + e.getMessage(), e); |
| } |
| } |
| |
| @Override |
| public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) { |
| if (postgresSchema == null) { |
| postgresSchema = new PostgresSchema(sourceConfig.getDbzConnectorConfig(), tableMap); |
| } |
| return postgresSchema.getTableSchema(jdbc, tableId); |
| } |
| |
| @Override |
| public PostgresSourceFetchTaskContext createFetchTaskContext( |
| SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) { |
| |
| RelationalDatabaseConnectorConfig dbzConnectorConfig = |
| taskSourceConfig.getDbzConnectorConfig(); |
| |
| PostgresConnection jdbcConnection = |
| new PostgresConnection( |
| dbzConnectorConfig.getJdbcConfig(), |
| newPostgresValueConverterBuilder( |
| (PostgresConnectorConfig) dbzConnectorConfig, |
| taskSourceConfig.getServerTimeZone())); |
| |
| List<TableChanges.TableChange> tableChangeList = new ArrayList<>(); |
| // TODO: support save table schema |
| if (sourceSplitBase instanceof SnapshotSplit) { |
| SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase; |
| tableChangeList.add(queryTableSchema(jdbcConnection, snapshotSplit.getTableId())); |
| } else { |
| IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase; |
| for (TableId tableId : incrementalSplit.getTableIds()) { |
| tableChangeList.add(queryTableSchema(jdbcConnection, tableId)); |
| } |
| } |
| |
| return new PostgresSourceFetchTaskContext( |
| taskSourceConfig, this, jdbcConnection, tableChangeList); |
| } |
| |
| @Override |
| public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) { |
| if (sourceSplitBase.isSnapshotSplit()) { |
| return new PostgresSnapshotFetchTask(sourceSplitBase.asSnapshotSplit()); |
| } else { |
| postgresWalFetchTask = new PostgresWalFetchTask(sourceSplitBase.asIncrementalSplit()); |
| return postgresWalFetchTask; |
| } |
| } |
| |
| @Override |
| public void commitChangeLogOffset(Offset offset) throws Exception { |
| if (postgresWalFetchTask != null) { |
| postgresWalFetchTask.commitCurrentOffset((LsnOffset) offset); |
| } |
| } |
| |
| @Override |
| public Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId) { |
| return Optional.ofNullable(tableMap.get(tableId).getTableSchema().getPrimaryKey()); |
| } |
| |
| @Override |
| public List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) { |
| return tableMap.get(tableId).getTableSchema().getConstraintKeys(); |
| } |
| } |