// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Ignite to PostgreSQL Replication
== CDC Replication to PostgreSQL
`IgniteToPostgreSqlCdcConsumer` is a CDC consumer that asynchronously replicates data from Apache Ignite to PostgreSQL.
It uses Apache Ignite’s Change Data Capture (CDC) mechanism to track data changes (`insert`, `update`, `delete`) in specified caches and apply them to PostgreSQL.
== Key Features
- Per-cache replication (only selected caches are replicated)
- `onlyPrimary` support (replicates only from primary nodes)
- Auto table creation in PostgreSQL if needed (`createTables=true`)
- Batch replication (`batchSize`)
- User-defined `DataSource`: the user is responsible for reliability and transactional guarantees
== Configuration
Spring XML configuration example (`ignite-to-postgres.xml`):
[source,xml]
----
<bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
<property name="consumer">
<bean class="org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer">
<property name="caches">
<list>
<value>T1</value>
<value>T2</value>
</list>
</property>
<property name="batchSize" value="1024" />
<property name="onlyPrimary" value="true" />
<property name="createTables" value="true" />
<property name="dataSource" ref="dataSource" />
</bean>
</property>
</bean>
----
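The same consumer can also be configured programmatically. The sketch below assumes the consumer exposes setters that mirror the Spring properties above (`setCaches`, `setBatchSize`, `setOnlyPrimary`, `setCreateTables`, `setDataSource`); the factory class name is hypothetical.
[source,java]
----
import java.util.Set;
import javax.sql.DataSource;

import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;

public class CdcConfigFactory {
    /** Builds a CdcConfiguration equivalent to the Spring XML above. */
    public static CdcConfiguration cdcConfiguration(DataSource dataSource) {
        IgniteToPostgreSqlCdcConsumer consumer = new IgniteToPostgreSqlCdcConsumer();

        // Assumed setters mirroring the Spring properties shown above.
        consumer.setCaches(Set.of("T1", "T2"));
        consumer.setBatchSize(1024);
        consumer.setOnlyPrimary(true);
        consumer.setCreateTables(true);
        consumer.setDataSource(dataSource);

        CdcConfiguration cfg = new CdcConfiguration();

        cfg.setConsumer(consumer);

        return cfg;
    }
}
----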
=== Configuration Options
The following settings can be used to configure the behavior of `IgniteToPostgreSqlCdcConsumer`:
[cols="1,2,1", options="header"]
|===
| Setting | Description | Default
| `dataSource` | JDBC `DataSource` used to connect to the target PostgreSQL database. Must be provided by the user. | _Required_
| `caches` | Set of Ignite cache names to replicate. Must be provided by the user. | _Required_
| `onlyPrimary` | If `true`, replicates only events originating from primary nodes. Useful to avoid duplicate updates when caches have backups. | `true`
| `batchSize` | Maximum number of statements per batch submitted to PostgreSQL. Determines how many rows are committed in a single `executeBatch()` call. | `1024`
| `createTables` | If `true`, missing target tables in PostgreSQL are created automatically during startup. | `false`
|===
We use `PreparedStatement` for batching with `autoCommit` set to `false`, committing manually after each batch execution.
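For illustration, the batching pattern looks roughly like the sketch below; the class, method, and row handling are hypothetical and only show how `addBatch()`, `executeBatch()`, and the manual `commit()` fit together.
[source,java]
----
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import javax.sql.DataSource;

class BatchUpsertSketch {
    /** Illustrative only: batched upserts with autoCommit disabled and a manual commit per batch. */
    static void writeBatch(DataSource dataSource, List<Object[]> rows) throws SQLException {
        String upsert = "INSERT INTO test_table (id, name, val, version) VALUES (?, ?, ?, ?) "
            + "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, val = EXCLUDED.val "
            + "WHERE test_table.version < EXCLUDED.version";

        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(upsert)) {
            conn.setAutoCommit(false); // commit explicitly after the batch

            for (Object[] row : rows) { // row = {id, name, val, versionBytes}
                stmt.setObject(1, row[0]);
                stmt.setObject(2, row[1]);
                stmt.setObject(3, row[2]);
                stmt.setBytes(4, (byte[])row[3]);

                stmt.addBatch();
            }

            stmt.executeBatch();

            conn.commit(); // one commit per batch execution
        }
    }
}
----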
[WARNING]
====
Choosing the `dataSource` is the user's responsibility. Consider:
- Required delivery guarantees (e.g., retry logic)
- High-availability PostgreSQL setup (replicas, failover, etc.)
====
== Example `dataSource`
[source,xml]
----
<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="org.postgresql.Driver"/>
<property name="url" value="jdbc:postgresql://localhost:5432/ignite_replica"/>
<property name="username" value="ignite_user"/>
<property name="password" value="secret"/>
<property name="initialSize" value="3"/>
<property name="maxTotal" value="10"/>
<property name="validationQuery" value="SELECT 1"/>
<property name="testOnBorrow" value="true"/>
</bean>
----
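If the pool is assembled in code rather than Spring XML, an equivalent setup with `BasicDataSource` looks like this (connection details are placeholders):
[source,java]
----
import org.apache.commons.dbcp2.BasicDataSource;

class DataSourceFactory {
    /** Builds a pool equivalent to the Spring bean above; connection details are placeholders. */
    static BasicDataSource dataSource() {
        BasicDataSource ds = new BasicDataSource();

        ds.setDriverClassName("org.postgresql.Driver");
        ds.setUrl("jdbc:postgresql://localhost:5432/ignite_replica");
        ds.setUsername("ignite_user");
        ds.setPassword("secret");
        ds.setInitialSize(3);
        ds.setMaxTotal(10);
        ds.setValidationQuery("SELECT 1");
        ds.setTestOnBorrow(true);

        return ds;
    }
}
----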
== Schema Conversion
The PostgreSQL table schema is generated from the `QueryEntity` configured for the Ignite cache.
Only *one `QueryEntity` per cache* is supported; it is used to generate the DDL and DML statements.
Schema creation occurs once on the first `CdcCacheEvent` if `createTables=true`.
=== Example: Schema from Ignite to PostgreSQL
[source,java]
----
class TestVal {
    private final String name;
    private final int val;

    TestVal(String name, int val) {
        this.name = name;
        this.val = val;
    }
}

QueryEntity qryEntity = new QueryEntity()
    .setTableName("test_table")
    .setKeyFieldName("id")
    .setValueType("demo.TestVal")
    .addQueryField("id", Integer.class.getName(), null)
    .addQueryField("name", String.class.getName(), null)
    .addQueryField("val", Integer.class.getName(), null);

ignite.getOrCreateCache(new CacheConfiguration<Integer, TestVal>("test_table")
    .setQueryEntities(List.of(qryEntity)));
----
→ PostgreSQL:
[source,sql]
----
CREATE TABLE test_table (
    id INT PRIMARY KEY,
    name VARCHAR,
    val INT
);
----
=== Composite Key Example
[source,java]
----
class TestKey {
    private final int id;
    private final String subId;

    TestKey(int id, String subId) {
        this.id = id;
        this.subId = subId;
    }
}

class TestVal {
    private final String name;
    private final int val;

    TestVal(String name, int val) {
        this.name = name;
        this.val = val;
    }
}

QueryEntity qryEntity = new QueryEntity()
    .setTableName("test_table")
    .setKeyFields(Set.of("id", "subId"))
    .setValueType("demo.TestVal")
    .addQueryField("id", Integer.class.getName(), null)
    .addQueryField("subId", String.class.getName(), null)
    .addQueryField("name", String.class.getName(), null)
    .addQueryField("val", Integer.class.getName(), null);

ignite.getOrCreateCache(new CacheConfiguration<TestKey, TestVal>("test_table")
    .setQueryEntities(List.of(qryEntity)));
----
→ PostgreSQL:
[source,sql]
----
CREATE TABLE test_table (
    id INT,
    subId VARCHAR,
    name VARCHAR,
    val INT,
    PRIMARY KEY (id, subId)
);
----
== Insert / Update / Delete Events
Insert, update, and delete operations are handled via `CdcEvent`.
=== Upsert with Version Conflict Resolution
Each insert/update is translated into an `INSERT ... ON CONFLICT DO UPDATE` query, with version-based conflict resolution.
[NOTE]
====
A `version` column is automatically added and stored as `BYTEA`.
This version is a 16-byte array based on `CacheEntryVersion` encoded in big-endian order:
- 4 bytes — `topologyVersion` (int)
- 8 bytes — `order` (long)
- 4 bytes — `nodeOrder` (int)
This allows PostgreSQL to compare versions lexicographically:
[source,sql]
----
INSERT INTO test_table (id, name, val, version)
VALUES (1, 'value', 5, E'\x...')
ON CONFLICT (id) DO UPDATE SET
    name = EXCLUDED.name,
    val = EXCLUDED.val
WHERE test_table.version < EXCLUDED.version;
----
====
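A sketch of how such a version value can be produced on the Java side, assuming the `topologyVersion()`, `order()`, and `nodeOrder()` accessors of `CacheEntryVersion`; the helper class is hypothetical and shown only to illustrate the byte layout.
[source,java]
----
import java.nio.ByteBuffer;

import org.apache.ignite.cdc.CacheEntryVersion;

class VersionEncoding {
    /**
     * 16-byte, big-endian layout described above:
     * 4 bytes topologyVersion, 8 bytes order, 4 bytes nodeOrder.
     */
    static byte[] toBytes(CacheEntryVersion ver) {
        return ByteBuffer.allocate(16)         // ByteBuffer is big-endian by default
            .putInt(ver.topologyVersion())     // 4 bytes
            .putLong(ver.order())              // 8 bytes
            .putInt(ver.nodeOrder())           // 4 bytes
            .array();
    }
}
----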
=== Delete Example
[source,sql]
----
DELETE FROM test_table WHERE id = 1;
----
== Java → PostgreSQL Type Mapping
[options="header"]
|===
| Java Type | PostgreSQL Type | Precision/Scale
| `java.lang.String` | `VARCHAR(precision)` | Precision only
| `java.lang.Integer` / `int` | `INT` | None
| `java.lang.Long` / `long` | `BIGINT` | None
| `java.lang.Boolean` / `boolean` | `BOOL` | None
| `java.lang.Double` / `double` | `NUMERIC(precision, scale)` | Precision & scale
| `java.lang.Float` / `float` | `NUMERIC(precision, scale)` | Precision & scale
| `java.math.BigDecimal` | `NUMERIC(precision, scale)` | Precision & scale
| `java.lang.Short` / `short` | `SMALLINT` | None
| `java.lang.Byte` / `byte` | `SMALLINT` | None
| `java.sql.Date` | `DATE` | None
| `java.sql.Time` | `TIME(precision)` | Precision only
| `java.sql.Timestamp` | `TIMESTAMP(precision)` | Precision only
| `java.util.Date` | `TIMESTAMP(precision)` | Precision only
| `java.util.UUID` | `UUID` | None
| `java.time.LocalDate` | `DATE` | None
| `java.time.LocalTime` | `TIME(precision)` | Precision only
| `java.time.LocalDateTime` | `TIMESTAMP(precision)` | Precision only
| `java.time.OffsetTime` | `VARCHAR(precision)` | Precision only
| `java.time.OffsetDateTime` | `TIMESTAMP WITH TIME ZONE` | None
| `byte[]` | `BYTEA` | None
|===
[NOTE]
====
- Precision and scale values configured for the `QueryEntity` fields are applied to the generated SQL types where supported (see the sketch after this note).
- If the Java type is not found in the predefined mapping, an exception is thrown.
====
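For example, precision and scale can be declared on `QueryEntity` fields with `setFieldsPrecision` and `setFieldsScale`. A minimal sketch with a hypothetical `price` field, assuming the consumer applies these values as in the table above (`name` becomes `VARCHAR(64)`, `price` becomes `NUMERIC(10, 2)`):
[source,java]
----
import java.math.BigDecimal;
import java.util.Map;

import org.apache.ignite.cache.QueryEntity;

class PrecisionScaleExample {
    /** QueryEntity with explicit precision and scale on two fields. */
    static QueryEntity entity() {
        return new QueryEntity()
            .setTableName("test_table")
            .setKeyFieldName("id")
            .setValueType("demo.TestVal")
            .addQueryField("id", Integer.class.getName(), null)
            .addQueryField("name", String.class.getName(), null)
            .addQueryField("price", BigDecimal.class.getName(), null)
            .setFieldsPrecision(Map.of("name", 64, "price", 10))
            .setFieldsScale(Map.of("price", 2));
    }
}
----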
== Limitations
- Only `BinaryObject` and primitive fields are supported
- `keepBinary` must be set to `true`
- Schema evolution is not supported; tables are created only once at startup when `createTables=true` is set