| <!-- doc/src/sgml/logicaldecoding.sgml --> |
| <chapter id="logicaldecoding"> |
| <title>Logical Decoding</title> |
| <indexterm zone="logicaldecoding"> |
| <primary>Logical Decoding</primary> |
| </indexterm> |
| <para> |
| PostgreSQL provides infrastructure to stream the modifications performed |
| via SQL to external consumers. This functionality can be used for a |
| variety of purposes, including replication solutions and auditing. |
| </para> |
| |
| <para> |
| Changes are sent out in streams identified by logical replication slots. |
| </para> |
| |
| <para> |
| The format in which those changes are streamed is determined by the output |
| plugin used. An example plugin is provided in the PostgreSQL distribution. |
| Additional plugins can be |
| written to extend the choice of available formats without modifying any |
| core code. |
| Every output plugin has access to each individual new row produced |
| by <command>INSERT</command> and the new row version created |
| by <command>UPDATE</command>. Availability of old row versions for |
| <command>UPDATE</command> and <command>DELETE</command> depends on |
| the configured replica identity (see <xref linkend="sql-altertable-replica-identity"/>). |
| </para> |
| |
| <para> |
| Changes can be consumed either using the streaming replication protocol |
| (see <xref linkend="protocol-replication"/> and |
| <xref linkend="logicaldecoding-walsender"/>), or by calling functions |
| via SQL (see <xref linkend="logicaldecoding-sql"/>). It is also possible |
| to write additional methods of consuming the output of a replication slot |
| without modifying core code |
| (see <xref linkend="logicaldecoding-writer"/>). |
| </para> |
| |
| <sect1 id="logicaldecoding-example"> |
| <title>Logical Decoding Examples</title> |
| |
| <para> |
| The following example demonstrates controlling logical decoding using the |
| SQL interface. |
| </para> |
| |
| <para> |
| Before you can use logical decoding, you must set |
| <xref linkend="guc-wal-level"/> to <literal>logical</literal> and |
| <xref linkend="guc-max-replication-slots"/> to at least 1. Then, you |
| should connect to the target database (in the example |
| below, <literal>postgres</literal>) as a superuser. |
| </para> |
| |
| <programlisting> |
| postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding' |
| postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true); |
| slot_name | lsn |
| -----------------+----------- |
| regression_slot | 0/16B1970 |
| (1 row) |
| |
| postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots; |
| slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn |
| -----------------+---------------+-----------+----------+--------+-------------+----------------- |
| regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440 |
| (1 row) |
| |
| postgres=# -- There are no changes to see yet |
| postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); |
| lsn | xid | data |
| -----+-----+------ |
| (0 rows) |
| |
| postgres=# CREATE TABLE data(id serial primary key, data text); |
| CREATE TABLE |
| |
| postgres=# -- DDL isn't replicated, so all you'll see is the transaction |
| postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); |
| lsn | xid | data |
| -----------+-------+-------------- |
| 0/BA2DA58 | 10297 | BEGIN 10297 |
| 0/BA5A5A0 | 10297 | COMMIT 10297 |
| (2 rows) |
| |
| postgres=# -- Once changes are read, they're consumed and not emitted |
| postgres=# -- in a subsequent call: |
| postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); |
| lsn | xid | data |
| -----+-----+------ |
| (0 rows) |
| |
| postgres=# BEGIN; |
| postgres=*# INSERT INTO data(data) VALUES('1'); |
| postgres=*# INSERT INTO data(data) VALUES('2'); |
| postgres=*# COMMIT; |
| |
| postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); |
| lsn | xid | data |
| -----------+-------+--------------------------------------------------------- |
| 0/BA5A688 | 10298 | BEGIN 10298 |
| 0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1' |
| 0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2' |
| 0/BA5A8A8 | 10298 | COMMIT 10298 |
| (4 rows) |
| |
| postgres=# INSERT INTO data(data) VALUES('3'); |
| |
| postgres=# -- You can also peek ahead in the change stream without consuming changes |
| postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); |
| lsn | xid | data |
| -----------+-------+--------------------------------------------------------- |
| 0/BA5A8E0 | 10299 | BEGIN 10299 |
| 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3' |
| 0/BA5A990 | 10299 | COMMIT 10299 |
| (3 rows) |
| |
| postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again |
| postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); |
| lsn | xid | data |
| -----------+-------+--------------------------------------------------------- |
| 0/BA5A8E0 | 10299 | BEGIN 10299 |
| 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3' |
| 0/BA5A990 | 10299 | COMMIT 10299 |
| (3 rows) |
| |
| postgres=# -- Options can be passed to the output plugin to influence the formatting |
| postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on'); |
| lsn | xid | data |
| -----------+-------+--------------------------------------------------------- |
| 0/BA5A8E0 | 10299 | BEGIN 10299 |
| 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3' |
| 0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04) |
| (3 rows) |
| |
| postgres=# -- Remember to destroy a slot you no longer need to stop it consuming |
| postgres=# -- server resources: |
| postgres=# SELECT pg_drop_replication_slot('regression_slot'); |
| pg_drop_replication_slot |
| ----------------------- |
| |
| (1 row) |
| </programlisting> |
| |
| <para> |
| The following example shows how logical decoding is controlled over the |
| streaming replication protocol, using the |
| program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL |
| distribution. This requires that client authentication is set up to allow |
| replication connections |
| (see <xref linkend="streaming-replication-authentication"/>) and |
| that <varname>max_wal_senders</varname> is set sufficiently high to allow |
| an additional connection. |
| </para> |
| <programlisting> |
| $ pg_recvlogical -d postgres --slot=test --create-slot |
| $ pg_recvlogical -d postgres --slot=test --start -f - |
| <keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo> |
| $ psql -d postgres -c "INSERT INTO data(data) VALUES('4');" |
| $ fg |
| BEGIN 693 |
| table public.data: INSERT: id[integer]:4 data[text]:'4' |
| COMMIT 693 |
| <keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo> |
| $ pg_recvlogical -d postgres --slot=test --drop-slot |
| </programlisting> |
| |
| <para> |
| The following example shows the SQL interface that can be used to decode prepared |
| transactions. Before you use two-phase commit commands, you must set |
| <varname>max_prepared_transactions</varname> to at least 1. You must also have |
| set the two-phase parameter to 'true' while creating the slot using |
| <function>pg_create_logical_replication_slot</function>. |
| Note that the entire transaction is streamed after the commit if it |
| has not already been decoded. |
| </para> |
| <programlisting> |
| postgres=# BEGIN; |
| postgres=*# INSERT INTO data(data) VALUES('5'); |
| postgres=*# PREPARE TRANSACTION 'test_prepared1'; |
| |
| postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); |
| lsn | xid | data |
| -----------+-----+--------------------------------------------------------- |
| 0/1689DC0 | 529 | BEGIN 529 |
| 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5' |
| 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529 |
| (3 rows) |
| |
| postgres=# COMMIT PREPARED 'test_prepared1'; |
| postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL); |
| lsn | xid | data |
| -----------+-----+-------------------------------------------- |
| 0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529 |
| (1 row) |
| |
| postgres=# -- You can also roll back a prepared transaction |
| postgres=# BEGIN; |
| postgres=*# INSERT INTO data(data) VALUES('6'); |
| postgres=*# PREPARE TRANSACTION 'test_prepared2'; |
| postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL); |
| lsn | xid | data |
| -----------+-----+--------------------------------------------------------- |
| 0/168A180 | 530 | BEGIN 530 |
| 0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6' |
| 0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530 |
| (3 rows) |
| |
| postgres=# ROLLBACK PREPARED 'test_prepared2'; |
| postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL); |
| lsn | xid | data |
| -----------+-----+---------------------------------------------- |
| 0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530 |
| (1 row) |
| </programlisting> |
| </sect1> |
| |
| <sect1 id="logicaldecoding-explanation"> |
| <title>Logical Decoding Concepts</title> |
| <sect2> |
| <title>Logical Decoding</title> |
| |
| <indexterm> |
| <primary>Logical Decoding</primary> |
| </indexterm> |
| |
| <para> |
| Logical decoding is the process of extracting all persistent changes |
| to a database's tables into a coherent, easy to understand format which |
| can be interpreted without detailed knowledge of the database's internal |
| state. |
| </para> |
| |
| <para> |
| In <productname>PostgreSQL</productname>, logical decoding is implemented |
| by decoding the contents of the <link linkend="wal">write-ahead |
| log</link>, which describe changes on a storage level, into an |
| application-specific form such as a stream of tuples or SQL statements. |
| </para> |
| </sect2> |
| |
| <sect2 id="logicaldecoding-replication-slots"> |
| <title>Replication Slots</title> |
| |
| <indexterm> |
| <primary>replication slot</primary> |
| <secondary>logical replication</secondary> |
| </indexterm> |
| |
| <para> |
| In the context of logical replication, a slot represents a stream of |
| changes that can be replayed to a client in the order they were made on |
| the origin server. Each slot streams a sequence of changes from a single |
| database. |
| </para> |
| |
| <note> |
| <para><productname>PostgreSQL</productname> also has streaming replication slots |
| (see <xref linkend="streaming-replication"/>), but they are used somewhat |
| differently there. |
| </para> |
| </note> |
| |
| <para> |
| A replication slot has an identifier that is unique across all databases |
| in a <productname>PostgreSQL</productname> cluster. Slots persist |
| independently of the connection using them and are crash-safe. |
| </para> |
| |
| <para> |
| A logical slot will emit each change just once in normal operation. |
| The current position of each slot is persisted only at checkpoint, so in |
| the case of a crash the slot may return to an earlier LSN, which will |
| then cause recent changes to be sent again when the server restarts. |
| Logical decoding clients are responsible for avoiding ill effects from |
| handling the same message more than once. Clients may wish to record |
| the last LSN they saw when decoding and skip over any repeated data or |
| (when using the replication protocol) request that decoding start from |
| that LSN rather than letting the server determine the start point. |
| The Replication Progress Tracking feature is designed for this purpose; |
| refer to <link linkend="replication-origins">replication origins</link>. |
| </para> |
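| <para> |
| As a rough illustration of the client-side bookkeeping described above, the |
| following self-contained C sketch parses the textual LSN form shown in the |
| examples (such as <literal>0/BA5A8A8</literal>) and skips transactions whose |
| commit LSN has already been applied. The names <literal>Lsn</literal>, |
| <literal>Consumer</literal>, <literal>parse_lsn</literal>, and |
| <literal>consumer_handle</literal> are hypothetical and not part of any |
| PostgreSQL API, and tracking progress per commit LSN is an assumption of |
| this sketch. |
| </para> |

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for a 64-bit WAL position. */
typedef uint64_t Lsn;

/* Hypothetical consumer state: the commit LSN of the last applied transaction. */
typedef struct Consumer
{
    Lsn         last_applied;
} Consumer;

/*
 * Parse an LSN written as two hexadecimal numbers separated by a slash,
 * e.g. "0/16B1970". Returns false if the text is malformed.
 */
static bool
parse_lsn(const char *text, Lsn *out)
{
    unsigned int hi;
    unsigned int lo;
    int         consumed = 0;

    assert(text != NULL && out != NULL);
    if (sscanf(text, "%X/%X%n", &hi, &lo, &consumed) != 2)
        return false;
    if (text[consumed] != '\0')
        return false;           /* trailing garbage after the LSN */
    *out = ((Lsn) hi << 32) | lo;
    return true;
}

/*
 * Decide whether a decoded transaction should be applied. Returns true and
 * advances the recorded position for new data; returns false for a repeat,
 * which can be delivered again after a server crash.
 */
static bool
consumer_handle(Consumer *c, Lsn commit_lsn)
{
    assert(c != NULL);
    if (commit_lsn <= c->last_applied)
        return false;           /* already seen: skip */
    /* A real consumer would apply the transaction here. */
    c->last_applied = commit_lsn;
    return true;
}
```

| <para> |
| A real consumer would also have to store <literal>last_applied</literal> |
| durably, ideally in the same transaction that applies the changes, so that |
| the recorded position and the applied data cannot diverge. |
| </para> |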
| |
| <para> |
| Multiple independent slots may exist for a single database. Each slot has |
| its own state, allowing different consumers to receive changes from |
| different points in the database change stream. For most applications, a |
| separate slot will be required for each consumer. |
| </para> |
| |
| <para> |
| A logical replication slot knows nothing about the state of the |
| receiver(s). It's even possible to have multiple different receivers using |
| the same slot at different times; they'll just get the changes following |
| on from when the last receiver stopped consuming them. Only one receiver |
| may consume changes from a slot at any given time. |
| </para> |
| |
| <caution> |
| <para> |
| Replication slots persist across crashes and know nothing about the state |
| of their consumer(s). They will prevent removal of required resources |
| even when there is no connection using them. This consumes storage |
| because neither required WAL nor required rows from the system catalogs |
| can be removed by <command>VACUUM</command> as long as they are required by a replication |
| slot. In extreme cases this could cause the database to shut down to prevent |
| transaction ID wraparound (see <xref linkend="vacuum-for-wraparound"/>). |
| So if a slot is no longer required it should be dropped. |
| </para> |
| </caution> |
| </sect2> |
| |
| <sect2> |
| <title>Output Plugins</title> |
| <para> |
| Output plugins transform the data from the write-ahead log's internal |
| representation into the format the consumer of a replication slot desires. |
| </para> |
| </sect2> |
| |
| <sect2> |
| <title>Exported Snapshots</title> |
| <para> |
| When a new replication slot is created using the streaming replication |
| interface (see <xref linkend="protocol-replication-create-slot"/>), a |
| snapshot is exported |
| (see <xref linkend="functions-snapshot-synchronization"/>), which will show |
| exactly the state of the database after which all changes will be |
| included in the change stream. This can be used to create a new replica by |
| using <link linkend="sql-set-transaction"><literal>SET TRANSACTION |
| SNAPSHOT</literal></link> to read the state of the database at the moment |
| the slot was created. This transaction can then be used to dump the |
| database's state at that point in time, which afterwards can be updated |
| using the slot's contents without losing any changes. |
| </para> |
| <para> |
| Creation of a snapshot is not always possible. In particular, it will |
| fail when connected to a hot standby. Applications that do not require |
| snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</literal> |
| option. |
| </para> |
| </sect2> |
| </sect1> |
| |
| <sect1 id="logicaldecoding-walsender"> |
| <title>Streaming Replication Protocol Interface</title> |
| |
| <para> |
| The commands |
| <itemizedlist> |
| <listitem> |
| <para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable></literal></para> |
| </listitem> |
| |
| <listitem> |
| <para><literal>DROP_REPLICATION_SLOT <replaceable>slot_name</replaceable></literal> <optional> <literal>WAIT</literal> </optional></para> |
| </listitem> |
| |
| <listitem> |
| <para><literal>START_REPLICATION SLOT <replaceable>slot_name</replaceable> LOGICAL ...</literal></para> |
| </listitem> |
| </itemizedlist> |
| are used to create, drop, and stream changes from a replication |
| slot, respectively. These commands are only available over a replication |
| connection; they cannot be used via SQL. |
| See <xref linkend="protocol-replication"/> for details on these commands. |
| </para> |
| |
| <para> |
| The command <xref linkend="app-pgrecvlogical"/> can be used to control |
| logical decoding over a streaming replication connection. (It uses |
| these commands internally.) |
| </para> |
| </sect1> |
| |
| <sect1 id="logicaldecoding-sql"> |
| <title>Logical Decoding <acronym>SQL</acronym> Interface</title> |
| |
| <para> |
| See <xref linkend="functions-replication"/> for detailed documentation on |
| the SQL-level API for interacting with logical decoding. |
| </para> |
| |
| <para> |
| Synchronous replication (see <xref linkend="synchronous-replication"/>) is |
| only supported on replication slots used over the streaming replication interface. The |
| function interface and additional, non-core interfaces do not support |
| synchronous replication. |
| </para> |
| </sect1> |
| |
| <sect1 id="logicaldecoding-catalogs"> |
| <title>System Catalogs Related to Logical Decoding</title> |
| |
| <para> |
| The <link linkend="view-pg-replication-slots"><structname>pg_replication_slots</structname></link> |
| view and the |
| <link linkend="monitoring-pg-stat-replication-view"> |
| <structname>pg_stat_replication</structname></link> |
| view provide information about the current state of replication slots and |
| streaming replication connections respectively. These views apply to both physical and |
| logical replication. The |
| <link linkend="monitoring-pg-stat-replication-slots-view"> |
| <structname>pg_stat_replication_slots</structname></link> |
| view provides statistics information about the logical replication slots. |
| </para> |
| </sect1> |
| |
| <sect1 id="logicaldecoding-output-plugin"> |
| <title>Logical Decoding Output Plugins</title> |
| <para> |
| An example output plugin can be found in the |
| <link linkend="test-decoding"> |
| <filename>contrib/test_decoding</filename> |
| </link> |
| subdirectory of the PostgreSQL source tree. |
| </para> |
| <sect2 id="logicaldecoding-output-init"> |
| <title>Initialization Function</title> |
| <indexterm zone="logicaldecoding-output-init"> |
| <primary>_PG_output_plugin_init</primary> |
| </indexterm> |
| <para> |
| An output plugin is loaded by dynamically loading a shared library with |
| the output plugin's name as the library base name. The normal library |
| search path is used to locate the library. To provide the required output |
| plugin callbacks and to indicate that the library is actually an output |
| plugin it needs to provide a function named |
| <function>_PG_output_plugin_init</function>. This function is passed a |
| struct that needs to be filled with the callback function pointers for |
| individual actions. |
| <programlisting> |
| typedef struct OutputPluginCallbacks |
| { |
| LogicalDecodeStartupCB startup_cb; |
| LogicalDecodeBeginCB begin_cb; |
| LogicalDecodeChangeCB change_cb; |
| LogicalDecodeTruncateCB truncate_cb; |
| LogicalDecodeCommitCB commit_cb; |
| LogicalDecodeMessageCB message_cb; |
| LogicalDecodeFilterByOriginCB filter_by_origin_cb; |
| LogicalDecodeShutdownCB shutdown_cb; |
| LogicalDecodeFilterPrepareCB filter_prepare_cb; |
| LogicalDecodeBeginPrepareCB begin_prepare_cb; |
| LogicalDecodePrepareCB prepare_cb; |
| LogicalDecodeCommitPreparedCB commit_prepared_cb; |
| LogicalDecodeRollbackPreparedCB rollback_prepared_cb; |
| LogicalDecodeStreamStartCB stream_start_cb; |
| LogicalDecodeStreamStopCB stream_stop_cb; |
| LogicalDecodeStreamAbortCB stream_abort_cb; |
| LogicalDecodeStreamPrepareCB stream_prepare_cb; |
| LogicalDecodeStreamCommitCB stream_commit_cb; |
| LogicalDecodeStreamChangeCB stream_change_cb; |
| LogicalDecodeStreamMessageCB stream_message_cb; |
| LogicalDecodeStreamTruncateCB stream_truncate_cb; |
| } OutputPluginCallbacks; |
| |
| typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); |
| </programlisting> |
| The <function>begin_cb</function>, <function>change_cb</function> |
| and <function>commit_cb</function> callbacks are required, |
| while <function>startup_cb</function>, |
| <function>filter_by_origin_cb</function>, <function>truncate_cb</function>, |
| <function>message_cb</function>, and <function>shutdown_cb</function> are optional. |
| If <function>truncate_cb</function> is not set but a |
| <command>TRUNCATE</command> is to be decoded, the action will be ignored. |
| </para> |
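| <para> |
| The required and optional callbacks above can be pictured with a small model |
| that compiles without the PostgreSQL headers. It is only a sketch: |
| <literal>ModelCallbacks</literal> is a hypothetical, reduced stand-in for |
| <literal>OutputPluginCallbacks</literal>, every callback is given the same |
| dummy signature, and <literal>first_missing_required</literal> is an |
| illustrative helper, not a server function. |
| </para> |

```c
#include <assert.h>
#include <stddef.h>

/* Dummy signature; the real callbacks each have their own prototype. */
typedef void (*ModelCallback) (void);

/* Hypothetical, reduced stand-in for OutputPluginCallbacks. */
typedef struct ModelCallbacks
{
    ModelCallback startup_cb;           /* optional */
    ModelCallback begin_cb;             /* required */
    ModelCallback change_cb;            /* required */
    ModelCallback truncate_cb;          /* optional */
    ModelCallback commit_cb;            /* required */
    ModelCallback filter_by_origin_cb;  /* optional */
    ModelCallback shutdown_cb;          /* optional */
} ModelCallbacks;

static void model_begin(void) {}
static void model_change(void) {}
static void model_commit(void) {}

/*
 * Plays the role of _PG_output_plugin_init: fill in the callbacks this
 * plugin provides. The struct is assumed to be zeroed by the caller, so
 * optional callbacks that are not assigned stay NULL.
 */
static void
model_output_plugin_init(ModelCallbacks *cb)
{
    assert(cb != NULL);
    cb->begin_cb = model_begin;
    cb->change_cb = model_change;
    cb->commit_cb = model_commit;
}

/* Return the name of the first missing required callback, or NULL if none. */
static const char *
first_missing_required(const ModelCallbacks *cb)
{
    assert(cb != NULL);
    if (cb->begin_cb == NULL)
        return "begin_cb";
    if (cb->change_cb == NULL)
        return "change_cb";
    if (cb->commit_cb == NULL)
        return "commit_cb";
    return NULL;
}
```

| <para> |
| In an actual plugin the initialization function receives a pointer to the |
| real <literal>OutputPluginCallbacks</literal> struct and assigns functions |
| with the prototypes described in the sections below. |
| </para> |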
| |
| <para> |
| An output plugin may also define functions to support streaming of large, |
| in-progress transactions. The <function>stream_start_cb</function>, |
| <function>stream_stop_cb</function>, <function>stream_abort_cb</function>, |
| <function>stream_commit_cb</function>, <function>stream_change_cb</function>, |
| and <function>stream_prepare_cb</function> |
| are required, while <function>stream_message_cb</function> and |
| <function>stream_truncate_cb</function> are optional. |
| </para> |
| |
| <para> |
| An output plugin may also define functions to support two-phase commits, |
| which allows actions to be decoded on the <command>PREPARE TRANSACTION</command>. |
| The <function>begin_prepare_cb</function>, <function>prepare_cb</function>, |
| <function>stream_prepare_cb</function>, |
| <function>commit_prepared_cb</function> and <function>rollback_prepared_cb</function> |
| callbacks are required, while <function>filter_prepare_cb</function> is optional. |
| </para> |
| </sect2> |
| |
| <sect2 id="logicaldecoding-capabilities"> |
| <title>Capabilities</title> |
| |
| <para> |
| To decode, format and output changes, output plugins can use most of the |
| backend's normal infrastructure, including calling output functions. Read-only |
| access to relations is permitted as long as only relations are |
| accessed that either have been created by <command>initdb</command> in |
| the <literal>pg_catalog</literal> schema, or have been marked as user |
| provided catalog tables using |
| <programlisting> |
| ALTER TABLE user_catalog_table SET (user_catalog_table = true); |
| CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); |
| </programlisting> |
| Note that access to user catalog tables or regular system catalog tables |
| in the output plugins has to be done via the <literal>systable_*</literal> |
| scan APIs only. Access via the <literal>heap_*</literal> scan APIs will |
| error out. Additionally, any actions leading to transaction ID assignment |
| are prohibited. That, among others, includes writing to tables, performing |
| DDL changes, and calling <literal>pg_current_xact_id()</literal>. |
| </para> |
| </sect2> |
| |
| <sect2 id="logicaldecoding-output-mode"> |
| <title>Output Modes</title> |
| |
| <para> |
| Output plugin callbacks can pass data to the consumer in nearly arbitrary |
| formats. For some use cases, like viewing the changes via SQL, returning |
| data in a data type that can contain arbitrary data (e.g., <type>bytea</type>) is |
| cumbersome. If the output plugin only outputs textual data in the |
| server's encoding, it can declare that by |
| setting <literal>OutputPluginOptions.output_type</literal> |
| to <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> instead |
| of <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal> in |
| the <link linkend="logicaldecoding-output-plugin-startup">startup |
| callback</link>. In that case, all the data has to be in the server's encoding |
| so that a <type>text</type> datum can contain it. This is checked in assertion-enabled |
| builds. |
| </para> |
| </sect2> |
| |
| <sect2 id="logicaldecoding-output-plugin-callbacks"> |
| <title>Output Plugin Callbacks</title> |
| |
| <para> |
| An output plugin gets notified about changes that are happening via |
| various callbacks it needs to provide. |
| </para> |
| |
| <para> |
| Concurrent transactions are decoded in commit order, and only changes |
| belonging to a specific transaction are decoded between |
| the <literal>begin</literal> and <literal>commit</literal> |
| callbacks. Transactions that were rolled back explicitly or implicitly |
| never get |
| decoded. Successful savepoints are |
| folded into the transaction containing them in the order they were |
| executed within that transaction. A transaction that is prepared for |
| a two-phase commit using <command>PREPARE TRANSACTION</command> will |
| also be decoded if the output plugin callbacks needed for decoding |
| them are provided. It is possible that the current prepared transaction |
| which is being decoded is aborted concurrently via a |
| <command>ROLLBACK PREPARED</command> command. In that case, the logical |
| decoding of this transaction will be aborted too. All the changes of such |
| a transaction are skipped once the abort is detected and the |
| <function>prepare_cb</function> callback is invoked. Thus even in case of |
| a concurrent abort, enough information is provided to the output plugin |
| for it to properly deal with <command>ROLLBACK PREPARED</command> once |
| that is decoded. |
| </para> |
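| <para> |
| The ordering rule above can be sketched with a toy model: changes of |
| concurrent transactions are interleaved in the input, but they are emitted |
| grouped per transaction, in commit order, and changes of aborted |
| transactions are dropped. This is a simplified illustration and not the |
| server's implementation; <literal>WalRecord</literal>, |
| <literal>TxnBuffer</literal>, and <literal>decode_in_commit_order</literal> |
| are hypothetical names, and savepoints and prepared transactions are not |
| modeled. |
| </para> |

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef enum { REC_CHANGE, REC_COMMIT, REC_ABORT } RecKind;

/* Hypothetical, simplified log record. */
typedef struct WalRecord
{
    RecKind     kind;
    unsigned    xid;
    const char *payload;        /* only used for REC_CHANGE */
} WalRecord;

#define MAX_XACTS   8
#define MAX_CHANGES 16

/* Changes buffered for one in-progress transaction. */
typedef struct TxnBuffer
{
    int         in_use;
    unsigned    xid;
    int         nchanges;
    const char *changes[MAX_CHANGES];
} TxnBuffer;

/* Find the buffer for xid, optionally claiming a free one. */
static TxnBuffer *
find_txn(TxnBuffer *txns, unsigned xid, int create)
{
    TxnBuffer  *free_slot = NULL;

    for (int i = 0; i < MAX_XACTS; i++)
    {
        if (txns[i].in_use && txns[i].xid == xid)
            return &txns[i];
        if (!txns[i].in_use && free_slot == NULL)
            free_slot = &txns[i];
    }
    if (!create || free_slot == NULL)
        return NULL;
    free_slot->in_use = 1;
    free_slot->xid = xid;
    free_slot->nchanges = 0;
    return free_slot;
}

/* Append text to out if it still fits. */
static void
emit(char *out, size_t outsz, const char *text)
{
    size_t      used = strlen(out);
    size_t      len = strlen(text);

    if (used + len < outsz)
        memcpy(out + used, text, len + 1);
}

/* Buffer changes per transaction; emit a transaction only when it commits. */
static void
decode_in_commit_order(const WalRecord *recs, int nrecs, char *out, size_t outsz)
{
    TxnBuffer   txns[MAX_XACTS] = {0};
    char        line[32];

    assert(out != NULL && outsz > 0);
    out[0] = '\0';
    for (int i = 0; i < nrecs; i++)
    {
        TxnBuffer  *t = find_txn(txns, recs[i].xid, recs[i].kind != REC_ABORT);

        if (t == NULL)
            continue;           /* nothing buffered, or model capacity exceeded */
        if (recs[i].kind == REC_CHANGE)
        {
            if (t->nchanges < MAX_CHANGES)
                t->changes[t->nchanges++] = recs[i].payload;
            continue;
        }
        if (recs[i].kind == REC_COMMIT)
        {
            snprintf(line, sizeof line, "BEGIN %u\n", t->xid);
            emit(out, outsz, line);
            for (int c = 0; c < t->nchanges; c++)
            {
                emit(out, outsz, t->changes[c]);
                emit(out, outsz, "\n");
            }
            snprintf(line, sizeof line, "COMMIT %u\n", t->xid);
            emit(out, outsz, line);
        }
        t->in_use = 0;          /* commit or abort: forget the transaction */
    }
}
```

| <para> |
| In this model a transaction that commits first is emitted first, even if |
| its changes were written after those of a transaction that commits later. |
| </para> |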
| |
| <note> |
| <para> |
| Only transactions that have already safely been flushed to disk will be |
| decoded. That can lead to a <command>COMMIT</command> not immediately being decoded in a |
| directly following <literal>pg_logical_slot_get_changes()</literal> |
| when <varname>synchronous_commit</varname> is set |
| to <literal>off</literal>. |
| </para> |
| </note> |
| |
| <sect3 id="logicaldecoding-output-plugin-startup"> |
| <title>Startup Callback</title> |
| <para> |
| The optional <function>startup_cb</function> callback is called whenever |
| a replication slot is created or asked to stream changes, independent |
| of the number of changes that are ready to be put out. |
| <programlisting> |
| typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, |
| OutputPluginOptions *options, |
| bool is_init); |
| </programlisting> |
| The <literal>is_init</literal> parameter will be true when the |
| replication slot is being created and false |
| otherwise. <parameter>options</parameter> points to a struct of options |
| that output plugins can set: |
| <programlisting> |
| typedef struct OutputPluginOptions |
| { |
| OutputPluginOutputType output_type; |
| bool receive_rewrites; |
| } OutputPluginOptions; |
| </programlisting> |
| <literal>output_type</literal> has to either be set to |
| <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> |
| or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also |
| <xref linkend="logicaldecoding-output-mode"/>. |
| If <literal>receive_rewrites</literal> is true, the output plugin will |
| also be called for changes made by heap rewrites during certain DDL |
| operations. These are of interest to plugins that handle DDL |
| replication, but they require special handling. |
| </para> |
| |
| <para> |
| The startup callback should validate the options present in |
| <literal>ctx->output_plugin_options</literal>. If the output plugin |
| needs to have a state, it can |
| use <literal>ctx->output_plugin_private</literal> to store it. |
| </para> |
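| <para> |
| The option validation a startup callback is expected to perform might look |
| roughly like the following stand-alone sketch. It is not written against |
| the real plugin API: <literal>ModelOption</literal> and |
| <literal>ModelState</literal> are hypothetical stand-ins for the option list |
| in <literal>ctx->output_plugin_options</literal> and the private state in |
| <literal>ctx->output_plugin_private</literal>, and the function returns |
| false where a real plugin would raise an error. Treating an option given |
| without a value as enabled, and accepting only the four spellings shown, are |
| choices made for this sketch. |
| </para> |

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for one name/value option pair. */
typedef struct ModelOption
{
    const char *name;
    const char *value;          /* may be NULL when no value was given */
} ModelOption;

/* Hypothetical private plugin state filled in at startup. */
typedef struct ModelState
{
    bool        include_timestamp;
} ModelState;

/* Parse a boolean option value; a missing value counts as "on" here. */
static bool
parse_on_off(const char *value, bool *result)
{
    if (value == NULL || strcmp(value, "on") == 0 || strcmp(value, "true") == 0)
    {
        *result = true;
        return true;
    }
    if (strcmp(value, "off") == 0 || strcmp(value, "false") == 0)
    {
        *result = false;
        return true;
    }
    return false;               /* unrecognized value */
}

/*
 * Validate all options and record them in the plugin state. Unknown option
 * names and malformed values are rejected.
 */
static bool
model_startup(const ModelOption *opts, int nopts, ModelState *state)
{
    assert(state != NULL);
    state->include_timestamp = false;   /* default */
    for (int i = 0; i < nopts; i++)
    {
        if (strcmp(opts[i].name, "include-timestamp") == 0)
        {
            if (!parse_on_off(opts[i].value, &state->include_timestamp))
                return false;
        }
        else
            return false;       /* unknown option */
    }
    return true;
}
```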
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-shutdown"> |
| <title>Shutdown Callback</title> |
| |
| <para> |
| The optional <function>shutdown_cb</function> callback is called |
| whenever a formerly active replication slot is not used anymore and can |
| be used to deallocate resources private to the output plugin. The slot |
| isn't necessarily being dropped, streaming is just being stopped. |
| <programlisting> |
| typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-begin"> |
| <title>Transaction Begin Callback</title> |
| |
| <para> |
| The required <function>begin_cb</function> callback is called whenever the |
| start of a committed transaction has been decoded. Aborted transactions |
| and their contents never get decoded. |
| <programlisting> |
| typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn); |
| </programlisting> |
| The <parameter>txn</parameter> parameter contains meta information about |
| the transaction, like the time stamp at which it has been committed and |
| its XID. |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-commit"> |
| <title>Transaction End Callback</title> |
| |
| <para> |
| The required <function>commit_cb</function> callback is called whenever |
| a transaction commit has been |
| decoded. The <function>change_cb</function> callbacks for all modified |
| rows will have been called before this, if there have been any modified |
| rows. |
| <programlisting> |
| typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr commit_lsn); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-change"> |
| <title>Change Callback</title> |
| |
| <para> |
| The required <function>change_cb</function> callback is called for every |
| individual row modification inside a transaction, be it |
| an <command>INSERT</command>, <command>UPDATE</command>, |
| or <command>DELETE</command>. Even if the original command modified |
| several rows at once the callback will be called individually for each |
| row. The <function>change_cb</function> callback may access system or |
| user catalog tables to aid in the process of outputting the row |
| modification details. In case of decoding a prepared (but yet |
| uncommitted) transaction or decoding of an uncommitted transaction, this |
| change callback might also error out due to simultaneous rollback of |
| this very same transaction. In that case, the logical decoding of this |
| aborted transaction is stopped gracefully. |
| <programlisting> |
| typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| Relation relation, |
| ReorderBufferChange *change); |
| </programlisting> |
| The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters |
| have the same contents as for the <function>begin_cb</function> |
| and <function>commit_cb</function> callbacks, but additionally the |
| relation descriptor <parameter>relation</parameter> points to the |
| relation the row belongs to and a struct |
| <parameter>change</parameter> describing the row modification are passed |
| in. |
| </para> |
| |
| <note> |
| <para> |
| Only changes in user defined tables that are not unlogged |
| (see <xref linkend="sql-createtable-unlogged"/>) and not temporary |
| (see <xref linkend="sql-createtable-temporary"/>) can be extracted using |
| logical decoding. |
| </para> |
| </note> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-truncate"> |
| <title>Truncate Callback</title> |
| |
| <para> |
| The optional <function>truncate_cb</function> callback is called for a |
| <command>TRUNCATE</command> command. |
| <programlisting> |
| typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| int nrelations, |
| Relation relations[], |
| ReorderBufferChange *change); |
| </programlisting> |
| The parameters are analogous to the <function>change_cb</function> |
| callback. However, because <command>TRUNCATE</command> actions on |
| tables connected by foreign keys need to be executed together, this |
| callback receives an array of relations instead of just a single one. |
| See the description of the <xref linkend="sql-truncate"/> statement for |
| details. |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-filter-origin"> |
| <title>Origin Filter Callback</title> |
| |
| <para> |
| The optional <function>filter_by_origin_cb</function> callback |
| is called to determine whether data that has been replayed |
| from <parameter>origin_id</parameter> is of interest to the |
| output plugin. |
| <programlisting> |
| typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, |
| RepOriginId origin_id); |
| </programlisting> |
| The <parameter>ctx</parameter> parameter has the same contents |
| as for the other callbacks. No information but the origin is |
| available. To signal that changes originating on the passed-in |
| node are irrelevant, return <literal>true</literal>, causing them to be filtered |
| away; return <literal>false</literal> otherwise. The other callbacks will not be called |
| for transactions and changes that have been filtered away. |
| </para> |
| <para> |
| This is useful when implementing cascading or multidirectional |
| replication solutions. Filtering by origin makes it possible to |
| prevent replicating the same changes back and forth in such |
| setups. While transactions and changes also carry information |
| about the origin, filtering via this callback is noticeably |
| more efficient. |
| </para> |
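| <para> |
| As a rough illustration of the decision such a callback makes, the |
| following self-contained sketch filters away every change that carries a |
| replication origin and keeps only locally originated changes. It |
| deliberately avoids PostgreSQL headers: <literal>SketchOriginId</literal>, |
| <literal>SKETCH_INVALID_ORIGIN</literal> and the |
| <literal>only_local</literal> option are hypothetical stand-ins chosen for |
| this example, and a real callback would receive the |
| <parameter>ctx</parameter> parameter instead of an options struct. |
| </para> |

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins; a real plugin would use PostgreSQL's RepOriginId. */
typedef uint16_t SketchOriginId;
#define SKETCH_INVALID_ORIGIN ((SketchOriginId) 0)  /* assumed "no origin" value */

/* Hypothetical plugin option: forward only locally originated changes. */
typedef struct SketchPluginOptions
{
    bool only_local;
} SketchPluginOptions;

/*
 * Returns true when changes from origin_id should be filtered away and
 * false when they should be decoded, following the callback's convention.
 */
bool
sketch_filter_by_origin(const SketchPluginOptions *opts, SketchOriginId origin_id)
{
    if (opts->only_local && origin_id != SKETCH_INVALID_ORIGIN)
        return true;
    return false;
}
```

| <para> |
| With <literal>only_local</literal> set, a change replayed from any origin |
| is dropped, which is one way to keep two nodes from sending each other's |
| changes back and forth. |
| </para> |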
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-message"> |
| <title>Generic Message Callback</title> |
| |
| <para> |
| The optional <function>message_cb</function> callback is called whenever |
| a logical decoding message has been decoded. |
| <programlisting> |
| typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr message_lsn, |
| bool transactional, |
| const char *prefix, |
| Size message_size, |
| const char *message); |
| </programlisting> |
| The <parameter>txn</parameter> parameter contains meta information about |
| the transaction, like the time stamp at which it has been committed and |
| its XID. Note however that it can be NULL when the message is |
| non-transactional and the XID was not assigned yet in the transaction |
| which logged the message. The <parameter>message_lsn</parameter> parameter holds the WAL |
| location of the message. The <parameter>transactional</parameter> parameter indicates |
| whether the message was sent as transactional or not. Similar to the change |
| callback, in case of decoding a prepared (but not yet committed) |
| transaction or decoding of an uncommitted transaction, this message |
| callback might also error out due to simultaneous rollback of |
| this very same transaction. In that case, the logical decoding of this |
| aborted transaction is stopped gracefully. |
| |
| The <parameter>prefix</parameter> is an arbitrary null-terminated prefix |
| which can be used for identifying interesting messages for the current |
| plugin. Finally, the <parameter>message</parameter> parameter holds |
| the actual message, which is <parameter>message_size</parameter> bytes long. |
| </para> |
| <para> |
| Extra care should be taken to ensure that the prefix the output plugin |
| considers interesting is unique. Using the name of the extension or the |
| output plugin itself is often a good choice. |
| </para> |
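| <para> |
| The prefix check itself can be very small. The sketch below is only an |
| illustration under stated assumptions: the prefix string |
| <literal>my_output_plugin</literal> and the function name are hypothetical, |
| and the comparison is written as standalone C rather than as a complete |
| <function>message_cb</function> implementation. |
| </para> |

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical prefix; the text suggests the extension or plugin name. */
#define SKETCH_MESSAGE_PREFIX "my_output_plugin"

/* Returns true if a decoded message's prefix belongs to this plugin. */
bool
sketch_message_is_interesting(const char *prefix)
{
    /* Compare the whole string so a longer, similar prefix does not match. */
    return prefix != NULL && strcmp(prefix, SKETCH_MESSAGE_PREFIX) == 0;
}
```

| <para> |
| Comparing the complete prefix, rather than only its leading characters, |
| avoids accidentally consuming messages written for a differently named |
| plugin that happens to share the same beginning. |
| </para> |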
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-filter-prepare"> |
| <title>Prepare Filter Callback</title> |
| |
| <para> |
| The optional <function>filter_prepare_cb</function> callback |
| is called to determine whether data that is part of the current |
| two-phase commit transaction should be considered for decoding |
| at this prepare stage or later as a regular one-phase transaction at |
| <command>COMMIT PREPARED</command> time. To signal that |
| decoding should be skipped, return <literal>true</literal>; |
| <literal>false</literal> otherwise. When the callback is not |
| defined, <literal>false</literal> is assumed (i.e. no filtering, all |
| transactions using two-phase commit are decoded in two phases as well). |
| <programlisting> |
| typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, |
| TransactionId xid, |
| const char *gid); |
| </programlisting> |
| The <parameter>ctx</parameter> parameter has the same contents as for |
| the other callbacks. The parameters <parameter>xid</parameter> |
| and <parameter>gid</parameter> provide two different ways to identify |
| the transaction. The later <command>COMMIT PREPARED</command> or |
| <command>ROLLBACK PREPARED</command> carries both identifiers, |
| providing an output plugin the choice of what to use. |
| </para> |
| <para> |
| The callback may be invoked multiple times per transaction to decode |
| and must provide the same static answer for a given pair of |
| <parameter>xid</parameter> and <parameter>gid</parameter> every time |
| it is called. |
| </para> |
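| <para> |
| One simple way to satisfy the requirement of a stable answer is to make |
| the decision a pure function of the identifier. The following sketch |
| assumes a hypothetical naming convention in which transactions whose |
| <parameter>gid</parameter> starts with <literal>local_</literal> are not |
| decoded at prepare time; the prefix and the function name are invented for |
| this example, and the <parameter>ctx</parameter> and |
| <parameter>xid</parameter> parameters are omitted for brevity. |
| </para> |

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical convention: GIDs with this prefix skip two-phase decoding. */
#define SKETCH_SKIP_GID_PREFIX "local_"

/*
 * Returns true to skip decoding at prepare time, false to decode the
 * transaction in two phases. The result depends only on gid, so repeated
 * calls for the same transaction always agree.
 */
bool
sketch_filter_prepare(const char *gid)
{
    return strncmp(gid, SKETCH_SKIP_GID_PREFIX,
                   strlen(SKETCH_SKIP_GID_PREFIX)) == 0;
}
```

| <para> |
| Because the function consults no mutable state, it cannot return |
| different answers for the same transaction across invocations. |
| </para> |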
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-begin-prepare"> |
| <title>Transaction Begin Prepare Callback</title> |
| |
| <para> |
| The required <function>begin_prepare_cb</function> callback is called |
| whenever the start of a prepared transaction has been decoded. The |
| <parameter>gid</parameter> field, which is part of the |
| <parameter>txn</parameter> parameter, can be used in this callback to |
| check if the plugin has already received this <command>PREPARE</command>, |
| in which case it can either error out or skip the remaining changes of |
| the transaction. |
| <programlisting> |
| typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-prepare"> |
| <title>Transaction Prepare Callback</title> |
| |
| <para> |
| The required <function>prepare_cb</function> callback is called whenever |
| a transaction which is prepared for two-phase commit has been |
| decoded. The <function>change_cb</function> callback for all modified |
| rows will have been called before this, if there have been any modified |
| rows. The <parameter>gid</parameter> field, which is part of the |
| <parameter>txn</parameter> parameter, can be used in this callback. |
| <programlisting> |
| typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr prepare_lsn); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-commit-prepared"> |
| <title>Transaction Commit Prepared Callback</title> |
| |
| <para> |
| The required <function>commit_prepared_cb</function> callback is called |
| whenever a transaction <command>COMMIT PREPARED</command> has been decoded. |
| The <parameter>gid</parameter> field, which is part of the |
| <parameter>txn</parameter> parameter, can be used in this callback. |
| <programlisting> |
| typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr commit_lsn); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-rollback-prepared"> |
| <title>Transaction Rollback Prepared Callback</title> |
| |
| <para> |
| The required <function>rollback_prepared_cb</function> callback is called |
| whenever a transaction <command>ROLLBACK PREPARED</command> has been |
| decoded. The <parameter>gid</parameter> field, which is part of the |
| <parameter>txn</parameter> parameter, can be used in this callback. The |
| parameters <parameter>prepare_end_lsn</parameter> and |
| <parameter>prepare_time</parameter> can be used to check if the plugin |
| has received this <command>PREPARE TRANSACTION</command>, in which case |
| it can apply the rollback; otherwise, it can skip the rollback operation. The |
| <parameter>gid</parameter> alone is not sufficient because the downstream |
| node can have a prepared transaction with the same identifier. |
| <programlisting> |
| typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr prepare_end_lsn, |
| TimestampTz prepare_time); |
| </programlisting> |
| </para> |
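| <para> |
| The check described above can be sketched as a lookup that requires all |
| three values to match. This is an illustration only: the |
| <literal>SketchPreparedTxn</literal> bookkeeping array, the typedefs |
| standing in for <literal>XLogRecPtr</literal> and |
| <literal>TimestampTz</literal>, and the function name are assumptions made |
| for this example, not part of the plugin API. |
| </para> |

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-ins for XLogRecPtr and TimestampTz. */
typedef uint64_t SketchLsn;
typedef int64_t SketchTimestamp;

/* One prepared transaction the consumer remembers having received. */
typedef struct SketchPreparedTxn
{
    const char     *gid;
    SketchLsn       prepare_end_lsn;
    SketchTimestamp prepare_time;
} SketchPreparedTxn;

/*
 * Returns true only if a remembered prepare matches on gid, end LSN and
 * prepare time, i.e. the rollback refers to a PREPARE that was received.
 */
bool
sketch_should_apply_rollback(const SketchPreparedTxn *known, size_t nknown,
                             const char *gid, SketchLsn prepare_end_lsn,
                             SketchTimestamp prepare_time)
{
    for (size_t i = 0; i < nknown; i++)
    {
        if (strcmp(known[i].gid, gid) == 0 &&
            known[i].prepare_end_lsn == prepare_end_lsn &&
            known[i].prepare_time == prepare_time)
            return true;
    }
    return false;
}
```

| <para> |
| A rollback whose <parameter>gid</parameter> matches but whose LSN or time |
| differs is treated as belonging to an unrelated prepared transaction and |
| is skipped. |
| </para> |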
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-stream-start"> |
| <title>Stream Start Callback</title> |
| <para> |
| The <function>stream_start_cb</function> callback is called when opening |
| a block of streamed changes from an in-progress transaction. |
| <programlisting> |
| typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-stream-stop"> |
| <title>Stream Stop Callback</title> |
| <para> |
| The <function>stream_stop_cb</function> callback is called when closing |
| a block of streamed changes from an in-progress transaction. |
| <programlisting> |
| typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-stream-abort"> |
| <title>Stream Abort Callback</title> |
| <para> |
| The <function>stream_abort_cb</function> callback is called to abort |
| a previously streamed transaction. |
| <programlisting> |
| typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr abort_lsn); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-stream-prepare"> |
| <title>Stream Prepare Callback</title> |
| <para> |
| The <function>stream_prepare_cb</function> callback is called to prepare |
| a previously streamed transaction as part of a two-phase commit. |
| <programlisting> |
| typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr prepare_lsn); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-stream-commit"> |
| <title>Stream Commit Callback</title> |
| <para> |
| The <function>stream_commit_cb</function> callback is called to commit |
| a previously streamed transaction. |
| <programlisting> |
| typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr commit_lsn); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-stream-change"> |
| <title>Stream Change Callback</title> |
| <para> |
| The <function>stream_change_cb</function> callback is called when sending |
| a change in a block of streamed changes (demarcated by |
| <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls). |
| The actual changes are not displayed as the transaction can abort at a later |
| point in time and we don't decode changes for aborted transactions. |
| <programlisting> |
| typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| Relation relation, |
| ReorderBufferChange *change); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-stream-message"> |
| <title>Stream Message Callback</title> |
| <para> |
| The <function>stream_message_cb</function> callback is called when sending |
| a generic message in a block of streamed changes (demarcated by |
| <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls). |
| The message contents for transactional messages are not displayed as the transaction |
| can abort at a later point in time and we don't decode changes for aborted |
| transactions. |
| <programlisting> |
| typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr message_lsn, |
| bool transactional, |
| const char *prefix, |
| Size message_size, |
| const char *message); |
| </programlisting> |
| </para> |
| </sect3> |
| |
| <sect3 id="logicaldecoding-output-plugin-stream-truncate"> |
| <title>Stream Truncate Callback</title> |
| <para> |
| The <function>stream_truncate_cb</function> callback is called for a |
| <command>TRUNCATE</command> command in a block of streamed changes |
| (demarcated by <function>stream_start_cb</function> and |
| <function>stream_stop_cb</function> calls). |
| <programlisting> |
| typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| int nrelations, |
| Relation relations[], |
| ReorderBufferChange *change); |
| </programlisting> |
| The parameters are analogous to the <function>stream_change_cb</function> |
| callback. However, because <command>TRUNCATE</command> actions on |
| tables connected by foreign keys need to be executed together, this |
| callback receives an array of relations instead of just a single one. |
| See the description of the <xref linkend="sql-truncate"/> statement for |
| details. |
| </para> |
| </sect3> |
| |
| </sect2> |
| |
| <sect2 id="logicaldecoding-output-plugin-output"> |
| <title>Functions for Producing Output</title> |
| |
| <para> |
| To actually produce output, output plugins can write data to |
| the <literal>StringInfo</literal> output buffer |
| in <literal>ctx->out</literal> when inside |
| the <function>begin_cb</function>, <function>commit_cb</function>, |
| or <function>change_cb</function> callbacks. Before writing to the output |
| buffer, <function>OutputPluginPrepareWrite(ctx, last_write)</function> has |
| to be called, and after finishing writing to the |
| buffer, <function>OutputPluginWrite(ctx, last_write)</function> has to be |
| called to perform the write. The <parameter>last_write</parameter> |
| indicates whether a particular write was the callback's last write. |
| </para> |
| |
| <para> |
| The following example shows how to output data to the consumer of an |
| output plugin: |
| <programlisting> |
| OutputPluginPrepareWrite(ctx, true); |
| appendStringInfo(ctx->out, "BEGIN %u", txn->xid); |
| OutputPluginWrite(ctx, true); |
| </programlisting> |
| </para> |
| </sect2> |
| </sect1> |
| |
| <sect1 id="logicaldecoding-writer"> |
| <title>Logical Decoding Output Writers</title> |
| |
| <para> |
| It is possible to add more output methods for logical decoding. |
| For details, see |
| <filename>src/backend/replication/logical/logicalfuncs.c</filename>. |
| Essentially, three functions need to be provided: one to read WAL, one to |
| prepare writing output, and one to write the output |
| (see <xref linkend="logicaldecoding-output-plugin-output"/>). |
| </para> |
| </sect1> |
| |
| <sect1 id="logicaldecoding-synchronous"> |
| <title>Synchronous Replication Support for Logical Decoding</title> |
| <sect2> |
| <title>Overview</title> |
| |
| <para> |
| Logical decoding can be used to build |
| <link linkend="synchronous-replication">synchronous |
| replication</link> solutions with the same user interface as synchronous |
| replication for <link linkend="streaming-replication">streaming |
| replication</link>. To do this, the streaming replication interface |
| (see <xref linkend="logicaldecoding-walsender"/>) must be used to stream out |
| data. Clients have to send <literal>Standby status update (F)</literal> |
| (see <xref linkend="protocol-replication"/>) messages, just like streaming |
| replication clients do. |
| </para> |
| |
| <note> |
| <para> |
| A synchronous replica receiving changes via logical decoding will work in |
| the scope of a single database. Since, in contrast to |
| that, <parameter>synchronous_standby_names</parameter> currently is |
| server-wide, this technique will not work properly if more |
| than one database is actively used. |
| </para> |
| </note> |
| </sect2> |
| |
| <sect2 id="logicaldecoding-synchronous-caveats"> |
| <title>Caveats</title> |
| |
| <para> |
| In a synchronous replication setup, a deadlock can happen if the transaction |
| has locked [user] catalog tables exclusively. See |
| <xref linkend="logicaldecoding-capabilities"/> for information on user |
| catalog tables. This is because logical decoding of transactions can lock |
| catalog tables to access them. To avoid this, users must refrain from taking |
| an exclusive lock on [user] catalog tables. This can happen in the following |
| ways: |
| |
| <itemizedlist> |
| <listitem> |
| <para> |
| Issuing an explicit <command>LOCK</command> on <structname>pg_class</structname> |
| in a transaction. |
| </para> |
| </listitem> |
| |
| <listitem> |
| <para> |
| Performing <command>CLUSTER</command> on <structname>pg_class</structname> in |
| a transaction. |
| </para> |
| </listitem> |
| |
| <listitem> |
| <para> |
| Executing <command>PREPARE TRANSACTION</command> after a <command>LOCK</command> command |
| on <structname>pg_class</structname> while logical decoding of two-phase |
| transactions is allowed. |
| </para> |
| </listitem> |
| |
| <listitem> |
| <para> |
| Executing <command>PREPARE TRANSACTION</command> after a <command>CLUSTER</command> |
| command on <structname>pg_trigger</structname> while logical decoding of |
| two-phase transactions is allowed. This will lead to deadlock only when a |
| published table has a trigger. |
| </para> |
| </listitem> |
| |
| <listitem> |
| <para> |
| Executing <command>TRUNCATE</command> on a [user] catalog table in a |
| transaction. |
| </para> |
| </listitem> |
| </itemizedlist> |
| |
| Note that the commands that can cause deadlock apply not only to the system |
| catalog tables explicitly indicated above but also to any other [user] catalog |
| table. |
| </para> |
| </sect2> |
| </sect1> |
| |
| <sect1 id="logicaldecoding-streaming"> |
| <title>Streaming of Large Transactions for Logical Decoding</title> |
| |
| <para> |
| The basic output plugin callbacks (e.g., <function>begin_cb</function>, |
| <function>change_cb</function>, <function>commit_cb</function> and |
| <function>message_cb</function>) are only invoked when the transaction |
| actually commits. The changes are still decoded from the transaction |
| log, but are only passed to the output plugin at commit (and discarded |
| if the transaction aborts). |
| </para> |
| |
| <para> |
| This means that while the decoding happens incrementally, and may spill |
| to disk to keep memory usage under control, all the decoded changes have |
| to be transmitted when the transaction finally commits (or more precisely, |
| when the commit is decoded from the transaction log). Depending on the |
| size of the transaction and network bandwidth, the transfer time may |
| significantly increase the apply lag. |
| </para> |
| |
| <para> |
| To reduce the apply lag caused by large transactions, an output plugin |
| may provide additional callbacks to support incremental streaming of |
| in-progress transactions. There are multiple required streaming callbacks |
| (<function>stream_start_cb</function>, <function>stream_stop_cb</function>, |
| <function>stream_abort_cb</function>, <function>stream_commit_cb</function> |
| and <function>stream_change_cb</function>) and two optional callbacks |
| (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>). |
| </para> |
| |
| <para> |
| When streaming an in-progress transaction, the changes (and messages) are |
| streamed in blocks demarcated by <function>stream_start_cb</function> |
| and <function>stream_stop_cb</function> callbacks. Once all the decoded |
| changes are transmitted, the transaction can be committed using the |
| <function>stream_commit_cb</function> callback |
| (or possibly aborted using the <function>stream_abort_cb</function> callback). |
| If two-phase commits are supported, the transaction can be prepared using the |
| <function>stream_prepare_cb</function> callback, |
| committed with <command>COMMIT PREPARED</command> using the |
| <function>commit_prepared_cb</function> callback, or aborted using the |
| <function>rollback_prepared_cb</function> callback. |
| </para> |
| |
| <para> |
| One example sequence of streaming callback calls for one transaction may |
| look like this: |
| <programlisting> |
| stream_start_cb(...); <-- start of first block of changes |
| stream_change_cb(...); |
| stream_change_cb(...); |
| stream_message_cb(...); |
| stream_change_cb(...); |
| ... |
| stream_change_cb(...); |
| stream_stop_cb(...); <-- end of first block of changes |
| |
| stream_start_cb(...); <-- start of second block of changes |
| stream_change_cb(...); |
| stream_change_cb(...); |
| stream_change_cb(...); |
| ... |
| stream_message_cb(...); |
| stream_change_cb(...); |
| stream_stop_cb(...); <-- end of second block of changes |
| |
| stream_commit_cb(...); <-- commit of the streamed transaction |
| </programlisting> |
| </para> |
| |
| <para> |
| The actual sequence of callback calls may be more complicated, of course. |
| There may be blocks for multiple streamed transactions, some of the |
| transactions may get aborted, etc. |
| </para> |
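| <para> |
| A consumer may find it useful to check that the calls it sees for one |
| transaction follow the block structure shown above. The following sketch |
| is a minimal state machine written for this purpose; all names in it are |
| hypothetical, it covers only the callbacks appearing in the example |
| sequence, and it assumes that commit and abort arrive between blocks |
| rather than inside one. |
| </para> |

```c
#include <stdbool.h>

/* Hypothetical event codes, one per streaming callback in the example. */
typedef enum SketchStreamEvent
{
    SKETCH_STREAM_START,
    SKETCH_STREAM_CHANGE,
    SKETCH_STREAM_MESSAGE,
    SKETCH_STREAM_STOP,
    SKETCH_STREAM_COMMIT,
    SKETCH_STREAM_ABORT
} SketchStreamEvent;

typedef enum SketchStreamState
{
    SKETCH_BETWEEN_BLOCKS,
    SKETCH_IN_BLOCK,
    SKETCH_FINISHED
} SketchStreamState;

/*
 * Advances *state for one event. Returns false, leaving *state unchanged,
 * if the event is not expected in the current state.
 */
bool
sketch_stream_step(SketchStreamState *state, SketchStreamEvent event)
{
    switch (*state)
    {
        case SKETCH_BETWEEN_BLOCKS:
            if (event == SKETCH_STREAM_START)
            {
                *state = SKETCH_IN_BLOCK;
                return true;
            }
            if (event == SKETCH_STREAM_COMMIT || event == SKETCH_STREAM_ABORT)
            {
                *state = SKETCH_FINISHED;
                return true;
            }
            return false;
        case SKETCH_IN_BLOCK:
            if (event == SKETCH_STREAM_CHANGE || event == SKETCH_STREAM_MESSAGE)
                return true;
            if (event == SKETCH_STREAM_STOP)
            {
                *state = SKETCH_BETWEEN_BLOCKS;
                return true;
            }
            return false;
        case SKETCH_FINISHED:
            return false;
    }
    return false;
}
```

| <para> |
| Since blocks of several streamed transactions may be interleaved, a |
| consumer would keep one such state per transaction. |
| </para> |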
| |
| <para> |
| Similar to spill-to-disk behavior, streaming is triggered when the total |
| amount of changes decoded from the WAL (for all in-progress transactions) |
| exceeds the limit defined by the <varname>logical_decoding_work_mem</varname> setting. |
| At that point, the largest top-level transaction (measured by the amount of memory |
| currently used for decoded changes) is selected and streamed. However, in |
| some cases changes still have to be spilled to disk even if streaming is enabled, |
| because the memory threshold is exceeded before a complete tuple has been |
| decoded, e.g., when only the TOAST table insert has been decoded but not the |
| main table insert. |
| </para> |
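| <para> |
| The selection rule described above can be illustrated with a small |
| standalone function. This is a sketch of the rule as worded in this |
| section, not the server's implementation: the |
| <literal>SketchTxn</literal> struct, the function name, and the use of a |
| plain array of per-transaction byte counts are assumptions made for the |
| example. |
| </para> |

```c
#include <stddef.h>

/* Hypothetical per-transaction accounting; sizes are in bytes. */
typedef struct SketchTxn
{
    unsigned int xid;
    size_t       decoded_bytes;
} SketchTxn;

/*
 * Returns the index of the transaction using the most memory when the
 * total across all transactions exceeds limit_bytes, or -1 when the total
 * is within the limit (or there are no transactions).
 */
long
sketch_pick_txn_to_stream(const SketchTxn *txns, size_t ntxns, size_t limit_bytes)
{
    size_t total = 0;
    long   largest = -1;

    for (size_t i = 0; i < ntxns; i++)
    {
        total += txns[i].decoded_bytes;
        if (largest < 0 || txns[i].decoded_bytes > txns[largest].decoded_bytes)
            largest = (long) i;
    }
    return (total > limit_bytes) ? largest : -1;
}
```

| <para> |
| Note that the trigger is the combined size of all in-progress |
| transactions, while the transaction chosen is the single largest one. |
| </para> |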
| |
| <para> |
| Even when streaming large transactions, the changes are still applied in |
| commit order, preserving the same guarantees as the non-streaming mode. |
| </para> |
| |
| </sect1> |
| |
| <sect1 id="logicaldecoding-two-phase-commits"> |
| <title>Two-phase Commit Support for Logical Decoding</title> |
| |
| <para> |
| With the basic output plugin callbacks (e.g., <function>begin_cb</function>, |
| <function>change_cb</function>, <function>commit_cb</function> and |
| <function>message_cb</function>) two-phase commit commands like |
| <command>PREPARE TRANSACTION</command>, <command>COMMIT PREPARED</command> |
| and <command>ROLLBACK PREPARED</command> are not decoded. While the |
| <command>PREPARE TRANSACTION</command> is ignored, |
| <command>COMMIT PREPARED</command> is decoded as a <command>COMMIT</command> |
| and <command>ROLLBACK PREPARED</command> is decoded as a |
| <command>ROLLBACK</command>. |
| </para> |
| |
| <para> |
| To support the streaming of two-phase commands, an output plugin needs to |
| provide additional callbacks. There are multiple two-phase commit callbacks |
| that are required (<function>begin_prepare_cb</function>, |
| <function>prepare_cb</function>, <function>commit_prepared_cb</function>, |
| <function>rollback_prepared_cb</function> and |
| <function>stream_prepare_cb</function>) and an optional callback |
| (<function>filter_prepare_cb</function>). |
| </para> |
| |
| <para> |
| If the output plugin callbacks for decoding two-phase commit commands are |
| provided, then on <command>PREPARE TRANSACTION</command>, the changes of |
| that transaction are decoded, passed to the output plugin, and the |
| <function>prepare_cb</function> callback is invoked. This differs from the |
| basic decoding setup where changes are only passed to the output plugin |
| when a transaction is committed. The start of a prepared transaction is |
| indicated by the <function>begin_prepare_cb</function> callback. |
| </para> |
| |
| <para> |
| When a prepared transaction is rolled back using |
| <command>ROLLBACK PREPARED</command>, the |
| <function>rollback_prepared_cb</function> callback is invoked, and when the |
| prepared transaction is committed using <command>COMMIT PREPARED</command>, |
| the <function>commit_prepared_cb</function> callback is invoked. |
| </para> |
| |
| <para> |
| Optionally, the output plugin can define filtering rules via |
| <function>filter_prepare_cb</function> to decode only specific transactions |
| in two phases. This can be achieved by pattern matching on the |
| <parameter>gid</parameter> or via lookups using the |
| <parameter>xid</parameter>. |
| </para> |
| |
| <para> |
| Users who want to decode prepared transactions need to be careful about |
| the following points: |
| |
| <itemizedlist> |
| <listitem> |
| <para> |
| If the prepared transaction has locked [user] catalog tables exclusively, |
| then decoding the prepare can block until the main transaction is committed. |
| </para> |
| </listitem> |
| |
| <listitem> |
| <para> |
| A logical replication solution that builds distributed two-phase commit |
| using this feature can deadlock if the prepared transaction has locked |
| [user] catalog tables exclusively. To avoid this, users must refrain from |
| having locks on catalog tables (e.g., an explicit <command>LOCK</command> command) |
| in such transactions. |
| See <xref linkend="logicaldecoding-synchronous-caveats"/> for the details. |
| </para> |
| </listitem> |
| </itemizedlist> |
| </para> |
| |
| </sect1> |
| </chapter> |