| /*------------------------------------------------------------------------- |
| * |
| * tcn.c |
| * triggered change notification support for PostgreSQL |
| * |
| * Portions Copyright (c) 2011-2023, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * |
| * IDENTIFICATION |
| * contrib/tcn/tcn.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/htup_details.h" |
| #include "commands/async.h" |
| #include "commands/trigger.h" |
| #include "executor/spi.h" |
| #include "lib/stringinfo.h" |
| #include "utils/rel.h" |
| #include "utils/syscache.h" |
| |
| PG_MODULE_MAGIC; |
| |
| /* |
| * Copy from s (for source) to r (for result), wrapping with q (quote) |
| * characters and doubling any quote characters found. |
| */ |
| static void |
| strcpy_quoted(StringInfo r, const char *s, const char q) |
| { |
| appendStringInfoCharMacro(r, q); |
| while (*s) |
| { |
| if (*s == q) |
| appendStringInfoCharMacro(r, q); |
| appendStringInfoCharMacro(r, *s); |
| s++; |
| } |
| appendStringInfoCharMacro(r, q); |
| } |
| |
| /* |
| * triggered_change_notification |
| * |
| * This trigger function will send a notification of data modification with |
| * primary key values. The channel will be "tcn" unless the trigger is |
| * created with a parameter, in which case that parameter will be used. |
| */ |
| PG_FUNCTION_INFO_V1(triggered_change_notification); |
| |
| Datum |
| triggered_change_notification(PG_FUNCTION_ARGS) |
| { |
| TriggerData *trigdata = (TriggerData *) fcinfo->context; |
| Trigger *trigger; |
| int nargs; |
| HeapTuple trigtuple; |
| Relation rel; |
| TupleDesc tupdesc; |
| char *channel; |
| char operation; |
| StringInfo payload = makeStringInfo(); |
| bool foundPK; |
| |
| List *indexoidlist; |
| ListCell *indexoidscan; |
| |
| /* make sure it's called as a trigger */ |
| if (!CALLED_AS_TRIGGER(fcinfo)) |
| ereport(ERROR, |
| (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), |
| errmsg("triggered_change_notification: must be called as trigger"))); |
| |
| /* and that it's called after the change */ |
| if (!TRIGGER_FIRED_AFTER(trigdata->tg_event)) |
| ereport(ERROR, |
| (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), |
| errmsg("triggered_change_notification: must be called after the change"))); |
| |
| /* and that it's called for each row */ |
| if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event)) |
| ereport(ERROR, |
| (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), |
| errmsg("triggered_change_notification: must be called for each row"))); |
| |
| if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) |
| operation = 'I'; |
| else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) |
| operation = 'U'; |
| else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) |
| operation = 'D'; |
| else |
| { |
| elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation"); |
| operation = 'X'; /* silence compiler warning */ |
| } |
| |
| trigger = trigdata->tg_trigger; |
| nargs = trigger->tgnargs; |
| if (nargs > 1) |
| ereport(ERROR, |
| (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), |
| errmsg("triggered_change_notification: must not be called with more than one parameter"))); |
| |
| if (nargs == 0) |
| channel = "tcn"; |
| else |
| channel = trigger->tgargs[0]; |
| |
| /* get tuple data */ |
| trigtuple = trigdata->tg_trigtuple; |
| rel = trigdata->tg_relation; |
| tupdesc = rel->rd_att; |
| |
| foundPK = false; |
| |
| /* |
| * Get the list of index OIDs for the table from the relcache, and look up |
| * each one in the pg_index syscache until we find one marked primary key |
| * (hopefully there isn't more than one such). |
| */ |
| indexoidlist = RelationGetIndexList(rel); |
| |
| foreach(indexoidscan, indexoidlist) |
| { |
| Oid indexoid = lfirst_oid(indexoidscan); |
| HeapTuple indexTuple; |
| Form_pg_index index; |
| |
| indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); |
| if (!HeapTupleIsValid(indexTuple)) /* should not happen */ |
| elog(ERROR, "cache lookup failed for index %u", indexoid); |
| index = (Form_pg_index) GETSTRUCT(indexTuple); |
| /* we're only interested if it is the primary key and valid */ |
| if (index->indisprimary && index->indisvalid) |
| { |
| int indnkeyatts = index->indnkeyatts; |
| |
| if (indnkeyatts > 0) |
| { |
| int i; |
| |
| foundPK = true; |
| |
| strcpy_quoted(payload, RelationGetRelationName(rel), '"'); |
| appendStringInfoCharMacro(payload, ','); |
| appendStringInfoCharMacro(payload, operation); |
| |
| for (i = 0; i < indnkeyatts; i++) |
| { |
| int colno = index->indkey.values[i]; |
| Form_pg_attribute attr = TupleDescAttr(tupdesc, colno - 1); |
| |
| appendStringInfoCharMacro(payload, ','); |
| strcpy_quoted(payload, NameStr(attr->attname), '"'); |
| appendStringInfoCharMacro(payload, '='); |
| strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\''); |
| } |
| |
| Async_Notify(channel, payload->data); |
| } |
| ReleaseSysCache(indexTuple); |
| break; |
| } |
| ReleaseSysCache(indexTuple); |
| } |
| |
| list_free(indexoidlist); |
| |
| if (!foundPK) |
| ereport(ERROR, |
| (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), |
| errmsg("triggered_change_notification: must be called on a table with a primary key"))); |
| |
| return PointerGetDatum(NULL); /* after trigger; value doesn't matter */ |
| } |