| /*------------------------------------------------------------------------- |
| * |
| * test_decoding.c |
| * example logical decoding output plugin |
| * |
| * Copyright (c) 2012-2023, PostgreSQL Global Development Group |
| * |
| * IDENTIFICATION |
| * contrib/test_decoding/test_decoding.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include "catalog/pg_type.h" |
| |
| #include "replication/logical.h" |
| #include "replication/origin.h" |
| |
| #include "utils/builtins.h" |
| #include "utils/lsyscache.h" |
| #include "utils/memutils.h" |
| #include "utils/rel.h" |
| |
| PG_MODULE_MAGIC; |
| |
| typedef struct |
| { |
| MemoryContext context; |
| bool include_xids; |
| bool include_timestamp; |
| bool skip_empty_xacts; |
| bool only_local; |
| } TestDecodingData; |
| |
| /* |
| * Maintain the per-transaction level variables to track whether the |
| * transaction and or streams have written any changes. In streaming mode the |
| * transaction can be decoded in streams so along with maintaining whether the |
| * transaction has written any changes, we also need to track whether the |
| * current stream has written any changes. This is required so that if user |
| * has requested to skip the empty transactions we can skip the empty streams |
| * even though the transaction has written some changes. |
| */ |
| typedef struct |
| { |
| bool xact_wrote_changes; |
| bool stream_wrote_changes; |
| } TestDecodingTxnData; |
| |
| static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, |
| bool is_init); |
| static void pg_decode_shutdown(LogicalDecodingContext *ctx); |
| static void pg_decode_begin_txn(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn); |
| static void pg_output_begin(LogicalDecodingContext *ctx, |
| TestDecodingData *data, |
| ReorderBufferTXN *txn, |
| bool last_write); |
| static void pg_decode_commit_txn(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, XLogRecPtr commit_lsn); |
| static void pg_decode_change(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, Relation relation, |
| ReorderBufferChange *change); |
| static void pg_decode_truncate(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| int nrelations, Relation relations[], |
| ReorderBufferChange *change); |
| static bool pg_decode_filter(LogicalDecodingContext *ctx, |
| RepOriginId origin_id); |
| static void pg_decode_message(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, XLogRecPtr lsn, |
| bool transactional, const char *prefix, |
| Size sz, const char *message); |
| static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, |
| TransactionId xid, |
| const char *gid); |
| static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn); |
| static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr prepare_lsn); |
| static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr commit_lsn); |
| static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr prepare_end_lsn, |
| TimestampTz prepare_time); |
| static void pg_decode_stream_start(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn); |
| static void pg_output_stream_start(LogicalDecodingContext *ctx, |
| TestDecodingData *data, |
| ReorderBufferTXN *txn, |
| bool last_write); |
| static void pg_decode_stream_stop(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn); |
| static void pg_decode_stream_abort(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr abort_lsn); |
| static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr prepare_lsn); |
| static void pg_decode_stream_commit(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr commit_lsn); |
| static void pg_decode_stream_change(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| Relation relation, |
| ReorderBufferChange *change); |
| static void pg_decode_stream_message(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, XLogRecPtr lsn, |
| bool transactional, const char *prefix, |
| Size sz, const char *message); |
| static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| int nrelations, Relation relations[], |
| ReorderBufferChange *change); |
| |
| void |
| _PG_init(void) |
| { |
| /* other plugins can perform things here */ |
| } |
| |
| /* specify output plugin callbacks */ |
| void |
| _PG_output_plugin_init(OutputPluginCallbacks *cb) |
| { |
| cb->startup_cb = pg_decode_startup; |
| cb->begin_cb = pg_decode_begin_txn; |
| cb->change_cb = pg_decode_change; |
| cb->truncate_cb = pg_decode_truncate; |
| cb->commit_cb = pg_decode_commit_txn; |
| cb->filter_by_origin_cb = pg_decode_filter; |
| cb->shutdown_cb = pg_decode_shutdown; |
| cb->message_cb = pg_decode_message; |
| cb->filter_prepare_cb = pg_decode_filter_prepare; |
| cb->begin_prepare_cb = pg_decode_begin_prepare_txn; |
| cb->prepare_cb = pg_decode_prepare_txn; |
| cb->commit_prepared_cb = pg_decode_commit_prepared_txn; |
| cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn; |
| cb->stream_start_cb = pg_decode_stream_start; |
| cb->stream_stop_cb = pg_decode_stream_stop; |
| cb->stream_abort_cb = pg_decode_stream_abort; |
| cb->stream_prepare_cb = pg_decode_stream_prepare; |
| cb->stream_commit_cb = pg_decode_stream_commit; |
| cb->stream_change_cb = pg_decode_stream_change; |
| cb->stream_message_cb = pg_decode_stream_message; |
| cb->stream_truncate_cb = pg_decode_stream_truncate; |
| } |
| |
| |
| /* initialize this plugin */ |
| static void |
| pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, |
| bool is_init) |
| { |
| ListCell *option; |
| TestDecodingData *data; |
| bool enable_streaming = false; |
| |
| data = palloc0(sizeof(TestDecodingData)); |
| data->context = AllocSetContextCreate(ctx->context, |
| "text conversion context", |
| ALLOCSET_DEFAULT_SIZES); |
| data->include_xids = true; |
| data->include_timestamp = false; |
| data->skip_empty_xacts = false; |
| data->only_local = false; |
| |
| ctx->output_plugin_private = data; |
| |
| opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; |
| opt->receive_rewrites = false; |
| |
| foreach(option, ctx->output_plugin_options) |
| { |
| DefElem *elem = lfirst(option); |
| |
| Assert(elem->arg == NULL || IsA(elem->arg, String)); |
| |
| if (strcmp(elem->defname, "include-xids") == 0) |
| { |
| /* if option does not provide a value, it means its value is true */ |
| if (elem->arg == NULL) |
| data->include_xids = true; |
| else if (!parse_bool(strVal(elem->arg), &data->include_xids)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("could not parse value \"%s\" for parameter \"%s\"", |
| strVal(elem->arg), elem->defname))); |
| } |
| else if (strcmp(elem->defname, "include-timestamp") == 0) |
| { |
| if (elem->arg == NULL) |
| data->include_timestamp = true; |
| else if (!parse_bool(strVal(elem->arg), &data->include_timestamp)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("could not parse value \"%s\" for parameter \"%s\"", |
| strVal(elem->arg), elem->defname))); |
| } |
| else if (strcmp(elem->defname, "force-binary") == 0) |
| { |
| bool force_binary; |
| |
| if (elem->arg == NULL) |
| continue; |
| else if (!parse_bool(strVal(elem->arg), &force_binary)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("could not parse value \"%s\" for parameter \"%s\"", |
| strVal(elem->arg), elem->defname))); |
| |
| if (force_binary) |
| opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; |
| } |
| else if (strcmp(elem->defname, "skip-empty-xacts") == 0) |
| { |
| |
| if (elem->arg == NULL) |
| data->skip_empty_xacts = true; |
| else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("could not parse value \"%s\" for parameter \"%s\"", |
| strVal(elem->arg), elem->defname))); |
| } |
| else if (strcmp(elem->defname, "only-local") == 0) |
| { |
| |
| if (elem->arg == NULL) |
| data->only_local = true; |
| else if (!parse_bool(strVal(elem->arg), &data->only_local)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("could not parse value \"%s\" for parameter \"%s\"", |
| strVal(elem->arg), elem->defname))); |
| } |
| else if (strcmp(elem->defname, "include-rewrites") == 0) |
| { |
| |
| if (elem->arg == NULL) |
| continue; |
| else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("could not parse value \"%s\" for parameter \"%s\"", |
| strVal(elem->arg), elem->defname))); |
| } |
| else if (strcmp(elem->defname, "stream-changes") == 0) |
| { |
| if (elem->arg == NULL) |
| continue; |
| else if (!parse_bool(strVal(elem->arg), &enable_streaming)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("could not parse value \"%s\" for parameter \"%s\"", |
| strVal(elem->arg), elem->defname))); |
| } |
| else |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("option \"%s\" = \"%s\" is unknown", |
| elem->defname, |
| elem->arg ? strVal(elem->arg) : "(null)"))); |
| } |
| } |
| |
| ctx->streaming &= enable_streaming; |
| } |
| |
| /* cleanup this plugin's resources */ |
| static void |
| pg_decode_shutdown(LogicalDecodingContext *ctx) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| |
| /* cleanup our own resources via memory context reset */ |
| MemoryContextDelete(data->context); |
| } |
| |
| /* BEGIN callback */ |
| static void |
| pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| TestDecodingTxnData *txndata = |
| MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); |
| |
| txndata->xact_wrote_changes = false; |
| txn->output_plugin_private = txndata; |
| |
| /* |
| * If asked to skip empty transactions, we'll emit BEGIN at the point |
| * where the first operation is received for this transaction. |
| */ |
| if (data->skip_empty_xacts) |
| return; |
| |
| pg_output_begin(ctx, data, txn, true); |
| } |
| |
| static void |
| pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) |
| { |
| OutputPluginPrepareWrite(ctx, last_write); |
| if (data->include_xids) |
| appendStringInfo(ctx->out, "BEGIN %u", txn->xid); |
| else |
| appendStringInfoString(ctx->out, "BEGIN"); |
| OutputPluginWrite(ctx, last_write); |
| } |
| |
| /* COMMIT callback */ |
| static void |
| pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, |
| XLogRecPtr commit_lsn) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| TestDecodingTxnData *txndata = txn->output_plugin_private; |
| bool xact_wrote_changes = txndata->xact_wrote_changes; |
| |
| pfree(txndata); |
| txn->output_plugin_private = NULL; |
| |
| if (data->skip_empty_xacts && !xact_wrote_changes) |
| return; |
| |
| OutputPluginPrepareWrite(ctx, true); |
| if (data->include_xids) |
| appendStringInfo(ctx->out, "COMMIT %u", txn->xid); |
| else |
| appendStringInfoString(ctx->out, "COMMIT"); |
| |
| if (data->include_timestamp) |
| appendStringInfo(ctx->out, " (at %s)", |
| timestamptz_to_str(txn->xact_time.commit_time)); |
| |
| OutputPluginWrite(ctx, true); |
| } |
| |
| /* BEGIN PREPARE callback */ |
| static void |
| pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| TestDecodingTxnData *txndata = |
| MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); |
| |
| txndata->xact_wrote_changes = false; |
| txn->output_plugin_private = txndata; |
| |
| /* |
| * If asked to skip empty transactions, we'll emit BEGIN at the point |
| * where the first operation is received for this transaction. |
| */ |
| if (data->skip_empty_xacts) |
| return; |
| |
| pg_output_begin(ctx, data, txn, true); |
| } |
| |
| /* PREPARE callback */ |
| static void |
| pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, |
| XLogRecPtr prepare_lsn) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| TestDecodingTxnData *txndata = txn->output_plugin_private; |
| |
| /* |
| * If asked to skip empty transactions, we'll emit PREPARE at the point |
| * where the first operation is received for this transaction. |
| */ |
| if (data->skip_empty_xacts && !txndata->xact_wrote_changes) |
| return; |
| |
| OutputPluginPrepareWrite(ctx, true); |
| |
| appendStringInfo(ctx->out, "PREPARE TRANSACTION %s", |
| quote_literal_cstr(txn->gid)); |
| |
| if (data->include_xids) |
| appendStringInfo(ctx->out, ", txid %u", txn->xid); |
| |
| if (data->include_timestamp) |
| appendStringInfo(ctx->out, " (at %s)", |
| timestamptz_to_str(txn->xact_time.prepare_time)); |
| |
| OutputPluginWrite(ctx, true); |
| } |
| |
| /* COMMIT PREPARED callback */ |
| static void |
| pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, |
| XLogRecPtr commit_lsn) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| |
| OutputPluginPrepareWrite(ctx, true); |
| |
| appendStringInfo(ctx->out, "COMMIT PREPARED %s", |
| quote_literal_cstr(txn->gid)); |
| |
| if (data->include_xids) |
| appendStringInfo(ctx->out, ", txid %u", txn->xid); |
| |
| if (data->include_timestamp) |
| appendStringInfo(ctx->out, " (at %s)", |
| timestamptz_to_str(txn->xact_time.commit_time)); |
| |
| OutputPluginWrite(ctx, true); |
| } |
| |
| /* ROLLBACK PREPARED callback */ |
| static void |
| pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr prepare_end_lsn, |
| TimestampTz prepare_time) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| |
| OutputPluginPrepareWrite(ctx, true); |
| |
| appendStringInfo(ctx->out, "ROLLBACK PREPARED %s", |
| quote_literal_cstr(txn->gid)); |
| |
| if (data->include_xids) |
| appendStringInfo(ctx->out, ", txid %u", txn->xid); |
| |
| if (data->include_timestamp) |
| appendStringInfo(ctx->out, " (at %s)", |
| timestamptz_to_str(txn->xact_time.commit_time)); |
| |
| OutputPluginWrite(ctx, true); |
| } |
| |
| /* |
| * Filter out two-phase transactions. |
| * |
| * Each plugin can implement its own filtering logic. Here we demonstrate a |
| * simple logic by checking the GID. If the GID contains the "_nodecode" |
| * substring, then we filter it out. |
| */ |
| static bool |
| pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, |
| const char *gid) |
| { |
| if (strstr(gid, "_nodecode") != NULL) |
| return true; |
| |
| return false; |
| } |
| |
| static bool |
| pg_decode_filter(LogicalDecodingContext *ctx, |
| RepOriginId origin_id) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| |
| if (data->only_local && origin_id != InvalidRepOriginId) |
| return true; |
| return false; |
| } |
| |
| /* |
| * Print literal `outputstr' already represented as string of type `typid' |
| * into stringbuf `s'. |
| * |
| * Some builtin types aren't quoted, the rest is quoted. Escaping is done as |
| * if standard_conforming_strings were enabled. |
| */ |
| static void |
| print_literal(StringInfo s, Oid typid, char *outputstr) |
| { |
| const char *valptr; |
| |
| switch (typid) |
| { |
| case INT2OID: |
| case INT4OID: |
| case INT8OID: |
| case OIDOID: |
| case FLOAT4OID: |
| case FLOAT8OID: |
| case NUMERICOID: |
| /* NB: We don't care about Inf, NaN et al. */ |
| appendStringInfoString(s, outputstr); |
| break; |
| |
| case BITOID: |
| case VARBITOID: |
| appendStringInfo(s, "B'%s'", outputstr); |
| break; |
| |
| case BOOLOID: |
| if (strcmp(outputstr, "t") == 0) |
| appendStringInfoString(s, "true"); |
| else |
| appendStringInfoString(s, "false"); |
| break; |
| |
| default: |
| appendStringInfoChar(s, '\''); |
| for (valptr = outputstr; *valptr; valptr++) |
| { |
| char ch = *valptr; |
| |
| if (SQL_STR_DOUBLE(ch, false)) |
| appendStringInfoChar(s, ch); |
| appendStringInfoChar(s, ch); |
| } |
| appendStringInfoChar(s, '\''); |
| break; |
| } |
| } |
| |
| /* print the tuple 'tuple' into the StringInfo s */ |
| static void |
| tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls) |
| { |
| int natt; |
| |
| /* print all columns individually */ |
| for (natt = 0; natt < tupdesc->natts; natt++) |
| { |
| Form_pg_attribute attr; /* the attribute itself */ |
| Oid typid; /* type of current attribute */ |
| Oid typoutput; /* output function */ |
| bool typisvarlena; |
| Datum origval; /* possibly toasted Datum */ |
| bool isnull; /* column is null? */ |
| |
| attr = TupleDescAttr(tupdesc, natt); |
| |
| /* |
| * don't print dropped columns, we can't be sure everything is |
| * available for them |
| */ |
| if (attr->attisdropped) |
| continue; |
| |
| /* |
| * Don't print system columns, oid will already have been printed if |
| * present. |
| */ |
| if (attr->attnum < 0) |
| continue; |
| |
| typid = attr->atttypid; |
| |
| /* get Datum from tuple */ |
| origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull); |
| |
| if (isnull && skip_nulls) |
| continue; |
| |
| /* print attribute name */ |
| appendStringInfoChar(s, ' '); |
| appendStringInfoString(s, quote_identifier(NameStr(attr->attname))); |
| |
| /* print attribute type */ |
| appendStringInfoChar(s, '['); |
| appendStringInfoString(s, format_type_be(typid)); |
| appendStringInfoChar(s, ']'); |
| |
| /* query output function */ |
| getTypeOutputInfo(typid, |
| &typoutput, &typisvarlena); |
| |
| /* print separator */ |
| appendStringInfoChar(s, ':'); |
| |
| /* print data */ |
| if (isnull) |
| appendStringInfoString(s, "null"); |
| else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) |
| appendStringInfoString(s, "unchanged-toast-datum"); |
| else if (!typisvarlena) |
| print_literal(s, typid, |
| OidOutputFunctionCall(typoutput, origval)); |
| else |
| { |
| Datum val; /* definitely detoasted Datum */ |
| |
| val = PointerGetDatum(PG_DETOAST_DATUM(origval)); |
| print_literal(s, typid, OidOutputFunctionCall(typoutput, val)); |
| } |
| } |
| } |
| |
| /* |
| * callback for individual changed tuples |
| */ |
| static void |
| pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, |
| Relation relation, ReorderBufferChange *change) |
| { |
| TestDecodingData *data; |
| TestDecodingTxnData *txndata; |
| Form_pg_class class_form; |
| TupleDesc tupdesc; |
| MemoryContext old; |
| |
| data = ctx->output_plugin_private; |
| txndata = txn->output_plugin_private; |
| |
| /* output BEGIN if we haven't yet */ |
| if (data->skip_empty_xacts && !txndata->xact_wrote_changes) |
| { |
| pg_output_begin(ctx, data, txn, false); |
| } |
| txndata->xact_wrote_changes = true; |
| |
| class_form = RelationGetForm(relation); |
| tupdesc = RelationGetDescr(relation); |
| |
| /* Avoid leaking memory by using and resetting our own context */ |
| old = MemoryContextSwitchTo(data->context); |
| |
| OutputPluginPrepareWrite(ctx, true); |
| |
| appendStringInfoString(ctx->out, "table "); |
| appendStringInfoString(ctx->out, |
| quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), |
| class_form->relrewrite ? |
| get_rel_name(class_form->relrewrite) : |
| NameStr(class_form->relname))); |
| appendStringInfoChar(ctx->out, ':'); |
| |
| switch (change->action) |
| { |
| case REORDER_BUFFER_CHANGE_INSERT: |
| appendStringInfoString(ctx->out, " INSERT:"); |
| if (change->data.tp.newtuple == NULL) |
| appendStringInfoString(ctx->out, " (no-tuple-data)"); |
| else |
| tuple_to_stringinfo(ctx->out, tupdesc, |
| &change->data.tp.newtuple->tuple, |
| false); |
| break; |
| case REORDER_BUFFER_CHANGE_UPDATE: |
| appendStringInfoString(ctx->out, " UPDATE:"); |
| if (change->data.tp.oldtuple != NULL) |
| { |
| appendStringInfoString(ctx->out, " old-key:"); |
| tuple_to_stringinfo(ctx->out, tupdesc, |
| &change->data.tp.oldtuple->tuple, |
| true); |
| appendStringInfoString(ctx->out, " new-tuple:"); |
| } |
| |
| if (change->data.tp.newtuple == NULL) |
| appendStringInfoString(ctx->out, " (no-tuple-data)"); |
| else |
| tuple_to_stringinfo(ctx->out, tupdesc, |
| &change->data.tp.newtuple->tuple, |
| false); |
| break; |
| case REORDER_BUFFER_CHANGE_DELETE: |
| appendStringInfoString(ctx->out, " DELETE:"); |
| |
| /* if there was no PK, we only know that a delete happened */ |
| if (change->data.tp.oldtuple == NULL) |
| appendStringInfoString(ctx->out, " (no-tuple-data)"); |
| /* In DELETE, only the replica identity is present; display that */ |
| else |
| tuple_to_stringinfo(ctx->out, tupdesc, |
| &change->data.tp.oldtuple->tuple, |
| true); |
| break; |
| default: |
| Assert(false); |
| } |
| |
| MemoryContextSwitchTo(old); |
| MemoryContextReset(data->context); |
| |
| OutputPluginWrite(ctx, true); |
| } |
| |
| static void |
| pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, |
| int nrelations, Relation relations[], ReorderBufferChange *change) |
| { |
| TestDecodingData *data; |
| TestDecodingTxnData *txndata; |
| MemoryContext old; |
| int i; |
| |
| data = ctx->output_plugin_private; |
| txndata = txn->output_plugin_private; |
| |
| /* output BEGIN if we haven't yet */ |
| if (data->skip_empty_xacts && !txndata->xact_wrote_changes) |
| { |
| pg_output_begin(ctx, data, txn, false); |
| } |
| txndata->xact_wrote_changes = true; |
| |
| /* Avoid leaking memory by using and resetting our own context */ |
| old = MemoryContextSwitchTo(data->context); |
| |
| OutputPluginPrepareWrite(ctx, true); |
| |
| appendStringInfoString(ctx->out, "table "); |
| |
| for (i = 0; i < nrelations; i++) |
| { |
| if (i > 0) |
| appendStringInfoString(ctx->out, ", "); |
| |
| appendStringInfoString(ctx->out, |
| quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace), |
| NameStr(relations[i]->rd_rel->relname))); |
| } |
| |
| appendStringInfoString(ctx->out, ": TRUNCATE:"); |
| |
| if (change->data.truncate.restart_seqs |
| || change->data.truncate.cascade) |
| { |
| if (change->data.truncate.restart_seqs) |
| appendStringInfoString(ctx->out, " restart_seqs"); |
| if (change->data.truncate.cascade) |
| appendStringInfoString(ctx->out, " cascade"); |
| } |
| else |
| appendStringInfoString(ctx->out, " (no-flags)"); |
| |
| MemoryContextSwitchTo(old); |
| MemoryContextReset(data->context); |
| |
| OutputPluginWrite(ctx, true); |
| } |
| |
| static void |
| pg_decode_message(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, |
| const char *prefix, Size sz, const char *message) |
| { |
| OutputPluginPrepareWrite(ctx, true); |
| appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", |
| transactional, prefix, sz); |
| appendBinaryStringInfo(ctx->out, message, sz); |
| OutputPluginWrite(ctx, true); |
| } |
| |
| static void |
| pg_decode_stream_start(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| TestDecodingTxnData *txndata = txn->output_plugin_private; |
| |
| /* |
| * Allocate the txn plugin data for the first stream in the transaction. |
| */ |
| if (txndata == NULL) |
| { |
| txndata = |
| MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); |
| txndata->xact_wrote_changes = false; |
| txn->output_plugin_private = txndata; |
| } |
| |
| txndata->stream_wrote_changes = false; |
| if (data->skip_empty_xacts) |
| return; |
| pg_output_stream_start(ctx, data, txn, true); |
| } |
| |
| static void |
| pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) |
| { |
| OutputPluginPrepareWrite(ctx, last_write); |
| if (data->include_xids) |
| appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid); |
| else |
| appendStringInfoString(ctx->out, "opening a streamed block for transaction"); |
| OutputPluginWrite(ctx, last_write); |
| } |
| |
| static void |
| pg_decode_stream_stop(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| TestDecodingTxnData *txndata = txn->output_plugin_private; |
| |
| if (data->skip_empty_xacts && !txndata->stream_wrote_changes) |
| return; |
| |
| OutputPluginPrepareWrite(ctx, true); |
| if (data->include_xids) |
| appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid); |
| else |
| appendStringInfoString(ctx->out, "closing a streamed block for transaction"); |
| OutputPluginWrite(ctx, true); |
| } |
| |
| static void |
| pg_decode_stream_abort(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr abort_lsn) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| |
| /* |
| * stream abort can be sent for an individual subtransaction but we |
| * maintain the output_plugin_private only under the toptxn so if this is |
| * not the toptxn then fetch the toptxn. |
| */ |
| ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn); |
| TestDecodingTxnData *txndata = toptxn->output_plugin_private; |
| bool xact_wrote_changes = txndata->xact_wrote_changes; |
| |
| if (rbtxn_is_toptxn(txn)) |
| { |
| Assert(txn->output_plugin_private != NULL); |
| pfree(txndata); |
| txn->output_plugin_private = NULL; |
| } |
| |
| if (data->skip_empty_xacts && !xact_wrote_changes) |
| return; |
| |
| OutputPluginPrepareWrite(ctx, true); |
| if (data->include_xids) |
| appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid); |
| else |
| appendStringInfoString(ctx->out, "aborting streamed (sub)transaction"); |
| OutputPluginWrite(ctx, true); |
| } |
| |
| static void |
| pg_decode_stream_prepare(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr prepare_lsn) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| TestDecodingTxnData *txndata = txn->output_plugin_private; |
| |
| if (data->skip_empty_xacts && !txndata->xact_wrote_changes) |
| return; |
| |
| OutputPluginPrepareWrite(ctx, true); |
| |
| if (data->include_xids) |
| appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u", |
| quote_literal_cstr(txn->gid), txn->xid); |
| else |
| appendStringInfo(ctx->out, "preparing streamed transaction %s", |
| quote_literal_cstr(txn->gid)); |
| |
| if (data->include_timestamp) |
| appendStringInfo(ctx->out, " (at %s)", |
| timestamptz_to_str(txn->xact_time.prepare_time)); |
| |
| OutputPluginWrite(ctx, true); |
| } |
| |
| static void |
| pg_decode_stream_commit(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| XLogRecPtr commit_lsn) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| TestDecodingTxnData *txndata = txn->output_plugin_private; |
| bool xact_wrote_changes = txndata->xact_wrote_changes; |
| |
| pfree(txndata); |
| txn->output_plugin_private = NULL; |
| |
| if (data->skip_empty_xacts && !xact_wrote_changes) |
| return; |
| |
| OutputPluginPrepareWrite(ctx, true); |
| |
| if (data->include_xids) |
| appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid); |
| else |
| appendStringInfoString(ctx->out, "committing streamed transaction"); |
| |
| if (data->include_timestamp) |
| appendStringInfo(ctx->out, " (at %s)", |
| timestamptz_to_str(txn->xact_time.commit_time)); |
| |
| OutputPluginWrite(ctx, true); |
| } |
| |
| /* |
| * In streaming mode, we don't display the changes as the transaction can abort |
| * at a later point in time. We don't want users to see the changes until the |
| * transaction is committed. |
| */ |
| static void |
| pg_decode_stream_change(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, |
| Relation relation, |
| ReorderBufferChange *change) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| TestDecodingTxnData *txndata = txn->output_plugin_private; |
| |
| /* output stream start if we haven't yet */ |
| if (data->skip_empty_xacts && !txndata->stream_wrote_changes) |
| { |
| pg_output_stream_start(ctx, data, txn, false); |
| } |
| txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; |
| |
| OutputPluginPrepareWrite(ctx, true); |
| if (data->include_xids) |
| appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); |
| else |
| appendStringInfoString(ctx->out, "streaming change for transaction"); |
| OutputPluginWrite(ctx, true); |
| } |
| |
| /* |
| * In streaming mode, we don't display the contents for transactional messages |
| * as the transaction can abort at a later point in time. We don't want users to |
| * see the message contents until the transaction is committed. |
| */ |
| static void |
| pg_decode_stream_message(LogicalDecodingContext *ctx, |
| ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, |
| const char *prefix, Size sz, const char *message) |
| { |
| OutputPluginPrepareWrite(ctx, true); |
| |
| if (transactional) |
| { |
| appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu", |
| transactional, prefix, sz); |
| } |
| else |
| { |
| appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:", |
| transactional, prefix, sz); |
| appendBinaryStringInfo(ctx->out, message, sz); |
| } |
| |
| OutputPluginWrite(ctx, true); |
| } |
| |
| /* |
| * In streaming mode, we don't display the detailed information of Truncate. |
| * See pg_decode_stream_change. |
| */ |
| static void |
| pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, |
| int nrelations, Relation relations[], |
| ReorderBufferChange *change) |
| { |
| TestDecodingData *data = ctx->output_plugin_private; |
| TestDecodingTxnData *txndata = txn->output_plugin_private; |
| |
| if (data->skip_empty_xacts && !txndata->stream_wrote_changes) |
| { |
| pg_output_stream_start(ctx, data, txn, false); |
| } |
| txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; |
| |
| OutputPluginPrepareWrite(ctx, true); |
| if (data->include_xids) |
| appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid); |
| else |
| appendStringInfoString(ctx->out, "streaming truncate for transaction"); |
| OutputPluginWrite(ctx, true); |
| } |