// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Ignite to PostgreSQL replication

== CDC replication to PostgreSQL

`IgniteToPostgreSqlCdcConsumer` is a CDC consumer that asynchronously replicates data from Apache Ignite to PostgreSQL.
It uses Apache Ignite's Change Data Capture (CDC) mechanism to track data changes (`insert`, `update`, `delete`) in the specified caches and apply them to PostgreSQL.
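
CDC events are only produced for caches stored in a persistent data region with CDC enabled on the source cluster. As context, here is a minimal sketch of that source-side setting using the standard Ignite configuration API (the class name `CdcSourceNode` is illustrative):

[source,java]
----
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class CdcSourceNode {
    public static void main(String[] args) {
        // CDC relies on WAL archiving, so the data region must be persistent and CDC-enabled.
        IgniteConfiguration cfg = new IgniteConfiguration()
            .setDataStorageConfiguration(new DataStorageConfiguration()
                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                    .setPersistenceEnabled(true)
                    .setCdcEnabled(true)));

        Ignition.start(cfg);
    }
}
----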

== Key Features

- Per-cache replication (only selected caches are replicated)
- `onlyPrimary` support (replicates only from primary nodes)
- Automatic table creation in PostgreSQL if needed (`createTables=true`)
- Batch replication (`batchSize`)
- User-defined `DataSource`: the user configures reliability and transactional guarantees

== Configuration

Spring XML configuration example (`ignite-to-postgres.xml`):

[source,xml]
----
<bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
    <property name="consumer">
        <bean class="org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer">
            <property name="caches">
                <list>
                    <value>T1</value>
                    <value>T2</value>
                </list>
            </property>
            <property name="batchSize" value="1024" />
            <property name="onlyPrimary" value="true" />
            <property name="createTables" value="true" />
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
</bean>
----
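
For reference, the same consumer can be assembled in Java instead of Spring XML. This is a sketch only: the setter names are assumed to mirror the XML property names above, the exact parameter types may differ, and the factory class name is illustrative.

[source,java]
----
import java.util.Set;

import javax.sql.DataSource;

import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;

public class PostgresCdcConfigFactory {
    /** Builds a CDC configuration equivalent to the XML example above. */
    public static CdcConfiguration cdcConfiguration(DataSource dataSrc) {
        IgniteToPostgreSqlCdcConsumer consumer = new IgniteToPostgreSqlCdcConsumer();

        consumer.setCaches(Set.of("T1", "T2"));
        consumer.setBatchSize(1024);
        consumer.setOnlyPrimary(true);
        consumer.setCreateTables(true);
        consumer.setDataSource(dataSrc);

        CdcConfiguration cdcCfg = new CdcConfiguration();

        cdcCfg.setConsumer(consumer);

        return cdcCfg;
    }
}
----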

=== Configuration Options

The following settings configure the behavior of `IgniteToPostgreSqlCdcConsumer`:

[cols="1,2,1", options="header"]
|===
| Setting | Description | Default
| `dataSource` | JDBC `DataSource` used to connect to the target PostgreSQL database. Must be provided by the user. | _Required_
| `caches` | Set of Ignite cache names to replicate. Must be provided by the user. | _Required_
| `onlyPrimary` | If `true`, replicates only events originating from the primary node. Useful to avoid duplicate updates when entries also have backup copies. | `true`
| `batchSize` | Maximum number of statements per batch submitted to PostgreSQL. Determines how many rows are committed in a single `executeBatch()` call. | `1024`
| `createTables` | If `true`, missing target tables in PostgreSQL are created automatically during startup. | `false`
|===

The consumer uses `PreparedStatement` batching with `autoCommit` set to `false` and commits manually after each batch execution.

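
The sketch below illustrates that commit discipline in plain JDBC. It is not the consumer's actual code: the table, columns, and upsert statement are hypothetical, and the `version` column is omitted for brevity.

[source,java]
----
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import javax.sql.DataSource;

public class BatchWriteSketch {
    /** Writes a batch of (id, name, val) rows and commits once per executed batch. */
    static void writeBatch(DataSource dataSrc, List<Object[]> rows) throws SQLException {
        String upsert = "INSERT INTO test_table (id, name, val) VALUES (?, ?, ?) " +
            "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, val = EXCLUDED.val";

        try (Connection conn = dataSrc.getConnection();
             PreparedStatement stmt = conn.prepareStatement(upsert)) {
            conn.setAutoCommit(false); // batches are committed explicitly

            for (Object[] row : rows) {
                stmt.setObject(1, row[0]);
                stmt.setObject(2, row[1]);
                stmt.setObject(3, row[2]);
                stmt.addBatch();
            }

            stmt.executeBatch();

            conn.commit(); // one manual commit per batch
        }
    }
}
----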

[WARNING]
====
Choosing the `dataSource` is the user's responsibility. Consider:

- Required delivery guarantees (e.g., retry logic)
- High-availability PostgreSQL setup (replicas, failover, etc.)
====

== Example `dataSource`

[source,xml]
----
<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="org.postgresql.Driver"/>
    <property name="url" value="jdbc:postgresql://localhost:5432/ignite_replica"/>
    <property name="username" value="ignite_user"/>
    <property name="password" value="secret"/>
    <property name="initialSize" value="3"/>
    <property name="maxTotal" value="10"/>
    <property name="validationQuery" value="SELECT 1"/>
    <property name="testOnBorrow" value="true"/>
</bean>
----
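
The same pool can be constructed programmatically; the setters below correspond one-to-one to the XML properties above (the factory class name is illustrative):

[source,java]
----
import org.apache.commons.dbcp2.BasicDataSource;

public class PostgresDataSourceFactory {
    /** Builds the same connection pool as the XML bean above. */
    public static BasicDataSource dataSource() {
        BasicDataSource dataSrc = new BasicDataSource();

        dataSrc.setDriverClassName("org.postgresql.Driver");
        dataSrc.setUrl("jdbc:postgresql://localhost:5432/ignite_replica");
        dataSrc.setUsername("ignite_user");
        dataSrc.setPassword("secret");
        dataSrc.setInitialSize(3);
        dataSrc.setMaxTotal(10);
        dataSrc.setValidationQuery("SELECT 1");
        dataSrc.setTestOnBorrow(true);

        return dataSrc;
    }
}
----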

== Schema Conversion

The table schema in PostgreSQL is generated from the `QueryEntity` configured on the Ignite cache.
Only **one `QueryEntity` per cache** is supported; it is used to generate the DDL and DML statements.

Schema creation occurs once, on the first `CdcCacheEvent`, if `createTables=true`.

=== Example: Schema from Ignite to PostgreSQL

[source,java]
----
class TestVal {
    private final String name;
    private final int val;

    TestVal(String name, int val) {
        this.name = name;
        this.val = val;
    }
}

QueryEntity qryEntity = new QueryEntity()
    .setTableName("test_table")
    .setKeyFieldName("id")
    .setValueType("demo.TestVal")
    .addQueryField("id", Integer.class.getName(), null)
    .addQueryField("name", String.class.getName(), null)
    .addQueryField("val", Integer.class.getName(), null);

ignite.getOrCreateCache(new CacheConfiguration<Integer, TestVal>("test_table")
    .setQueryEntities(List.of(qryEntity)));
----

→ PostgreSQL:

[source,sql]
----
CREATE TABLE test_table (
    id INT PRIMARY KEY,
    name VARCHAR,
    val INT
);
----

=== Composite Key Example

[source,java]
----
class TestKey {
    private final int id;
    private final String subId;

    TestKey(int id, String subId) {
        this.id = id;
        this.subId = subId;
    }
}

class TestVal {
    private final String name;
    private final int val;

    TestVal(String name, int val) {
        this.name = name;
        this.val = val;
    }
}

QueryEntity qryEntity = new QueryEntity()
    .setTableName("test_table")
    .setKeyFields(Set.of("id", "subId"))
    .setValueType("demo.TestVal")
    .addQueryField("id", Integer.class.getName(), null)
    .addQueryField("subId", String.class.getName(), null)
    .addQueryField("name", String.class.getName(), null)
    .addQueryField("val", Integer.class.getName(), null);

ignite.getOrCreateCache(new CacheConfiguration<TestKey, TestVal>("test_table")
    .setQueryEntities(List.of(qryEntity)));
----

→ PostgreSQL:

[source,sql]
----
CREATE TABLE test_table (
    id INT,
    subId VARCHAR,
    name VARCHAR,
    val INT,
    PRIMARY KEY (id, subId)
);
----

== Insert / Update / Delete Events

Insert, update, and delete operations are delivered to the consumer as `CdcEvent` instances.
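
For orientation, the sketch below shows how a consumer sees these events through Ignite's `CdcEvent` API: a removal arrives as an event with a `null` value, while inserts and updates carry the new value and its `CacheEntryVersion`. This helper only prints the operation type; it is not part of the PostgreSQL consumer.

[source,java]
----
import java.util.Iterator;

import org.apache.ignite.cdc.CdcEvent;

public class CdcEventInspector {
    /** Prints whether each change event is an upsert or a removal. */
    static void inspect(Iterator<CdcEvent> events) {
        while (events.hasNext()) {
            CdcEvent evt = events.next();

            if (evt.value() == null)
                System.out.println("DELETE key=" + evt.key() + " [cacheId=" + evt.cacheId() + ']');
            else
                System.out.println("UPSERT key=" + evt.key() + ", value=" + evt.value() +
                    " [version=" + evt.version() + ']');
        }
    }
}
----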

=== Upsert with Version Conflict Resolution

Each insert/update is translated into an `INSERT ... ON CONFLICT DO UPDATE` query, with version-based conflict resolution.

[NOTE]
====
A `version` column is automatically added and stored as `BYTEA`.

This version is a 16-byte array based on `CacheEntryVersion`, encoded in big-endian order (see the sketch after this note):

- 4 bytes — `topologyVersion` (int)
- 8 bytes — `order` (long)
- 4 bytes — `nodeOrder` (int)

This allows PostgreSQL to compare versions lexicographically:

[source,sql]
----
INSERT INTO test_table (id, name, val, version)
VALUES (1, 'value', 5, E'\x...')
ON CONFLICT (id) DO UPDATE SET
    name = EXCLUDED.name,
    val = EXCLUDED.val
WHERE test_table.version < EXCLUDED.version;
----
====
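
A sketch of the version encoding described above, packing the fields of a `CacheEntryVersion` into the 16-byte big-endian value (shown for illustration; not necessarily the exact implementation):

[source,java]
----
import java.nio.ByteBuffer;

import org.apache.ignite.cache.CacheEntryVersion;

public class VersionBytes {
    /** Encodes topologyVersion (int), order (long) and nodeOrder (int) as 16 big-endian bytes. */
    static byte[] toBytes(CacheEntryVersion ver) {
        return ByteBuffer.allocate(16) // ByteBuffer is big-endian by default.
            .putInt(ver.topologyVersion())
            .putLong(ver.order())
            .putInt(ver.nodeOrder())
            .array();
    }
}
----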

=== Delete Example

[source,sql]
----
DELETE FROM test_table WHERE id = 1;
----

== Java → PostgreSQL Type Mapping

[cols="1,1,1", options="header"]
|===
| Java Type | PostgreSQL Type | Precision/Scale
| `java.lang.String` | `VARCHAR(precision)` | Precision only
| `java.lang.Integer` / `int` | `INT` | None
| `java.lang.Long` / `long` | `BIGINT` | None
| `java.lang.Boolean` / `boolean` | `BOOL` | None
| `java.lang.Double` / `double` | `NUMERIC(precision, scale)` | Precision & scale
| `java.lang.Float` / `float` | `NUMERIC(precision, scale)` | Precision & scale
| `java.math.BigDecimal` | `NUMERIC(precision, scale)` | Precision & scale
| `java.lang.Short` / `short` | `SMALLINT` | None
| `java.lang.Byte` / `byte` | `SMALLINT` | None
| `java.sql.Date` | `DATE` | None
| `java.sql.Time` | `TIME(precision)` | Precision only
| `java.sql.Timestamp` | `TIMESTAMP(precision)` | Precision only
| `java.util.Date` | `TIMESTAMP(precision)` | Precision only
| `java.util.UUID` | `UUID` | None
| `java.time.LocalDate` | `DATE` | None
| `java.time.LocalTime` | `TIME(precision)` | Precision only
| `java.time.LocalDateTime` | `TIMESTAMP(precision)` | Precision only
| `java.time.OffsetTime` | `VARCHAR(precision)` | Precision only
| `java.time.OffsetDateTime` | `TIMESTAMP WITH TIME ZONE` | None
| `byte[]` | `BYTEA` | None
|===

[NOTE]
====
- Precision and scale configured for the `QueryEntity` fields are applied to the generated SQL types where supported (see the sketch below).
- If a Java type is not present in the predefined mapping, an exception is thrown.
====
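
Precision and scale for a field are part of the `QueryEntity` metadata (`setFieldsPrecision` / `setFieldsScale`). A sketch, assuming the schema generator picks these up as described above; the table, value type, and field names are hypothetical, and the resulting columns would follow the mapping table (e.g. `VARCHAR(64)` and `NUMERIC(19, 2)`):

[source,java]
----
QueryEntity qryEntity = new QueryEntity()
    .setTableName("accounts")
    .setKeyFieldName("id")
    .setValueType("demo.Account")
    .addQueryField("id", Integer.class.getName(), null)
    .addQueryField("name", String.class.getName(), null)
    .addQueryField("balance", java.math.BigDecimal.class.getName(), null)
    // Column sizes used when generating the DDL: name VARCHAR(64), balance NUMERIC(19, 2).
    .setFieldsPrecision(Map.of("name", 64, "balance", 19))
    .setFieldsScale(Map.of("balance", 2));
----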

== Limitations

- Only `BinaryObject` and primitive fields are supported
- `keepBinary` must be set to `true`
- Schema evolution is not supported; tables are only created at startup when `createTables=true` is set