| /*------------------------------------------------------------------------- |
| * relation.c |
| * PostgreSQL logical replication relation mapping cache |
| * |
| * Copyright (c) 2016-2021, PostgreSQL Global Development Group |
| * |
| * IDENTIFICATION |
| * src/backend/replication/logical/relation.c |
| * |
| * NOTES |
| * Routines in this file mainly have to do with mapping the properties |
| * of local replication target relations to the properties of their |
| * remote counterpart. |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/table.h" |
| #include "catalog/namespace.h" |
| #include "catalog/pg_subscription_rel.h" |
| #include "executor/executor.h" |
| #include "nodes/makefuncs.h" |
| #include "replication/logicalrelation.h" |
| #include "replication/worker_internal.h" |
| #include "utils/inval.h" |
| |
| |
| static MemoryContext LogicalRepRelMapContext = NULL; |
| |
| static HTAB *LogicalRepRelMap = NULL; |
| |
| /* |
| * Partition map (LogicalRepPartMap) |
| * |
| * When a partitioned table is used as replication target, replicated |
| * operations are actually performed on its leaf partitions, which requires |
| * the partitions to also be mapped to the remote relation. Parent's entry |
| * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because |
| * individual partitions may have different attribute numbers, which means |
| * attribute mappings to remote relation's attributes must be maintained |
| * separately for each partition. |
| */ |
| static MemoryContext LogicalRepPartMapContext = NULL; |
| static HTAB *LogicalRepPartMap = NULL; |
| typedef struct LogicalRepPartMapEntry |
| { |
| Oid partoid; /* LogicalRepPartMap's key */ |
| LogicalRepRelMapEntry relmapentry; |
| } LogicalRepPartMapEntry; |
| |
| /* |
| * Relcache invalidation callback for our relation map cache. |
| */ |
| static void |
| logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) |
| { |
| LogicalRepRelMapEntry *entry; |
| |
| /* Just to be sure. */ |
| if (LogicalRepRelMap == NULL) |
| return; |
| |
| if (reloid != InvalidOid) |
| { |
| HASH_SEQ_STATUS status; |
| |
| hash_seq_init(&status, LogicalRepRelMap); |
| |
| /* TODO, use inverse lookup hashtable? */ |
| while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) |
| { |
| if (entry->localreloid == reloid) |
| { |
| entry->localrelvalid = false; |
| hash_seq_term(&status); |
| break; |
| } |
| } |
| } |
| else |
| { |
| /* invalidate all cache entries */ |
| HASH_SEQ_STATUS status; |
| |
| hash_seq_init(&status, LogicalRepRelMap); |
| |
| while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) |
| entry->localrelvalid = false; |
| } |
| } |
| |
| /* |
| * Initialize the relation map cache. |
| */ |
| static void |
| logicalrep_relmap_init(void) |
| { |
| HASHCTL ctl; |
| |
| if (!LogicalRepRelMapContext) |
| LogicalRepRelMapContext = |
| AllocSetContextCreate(CacheMemoryContext, |
| "LogicalRepRelMapContext", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| /* Initialize the relation hash table. */ |
| ctl.keysize = sizeof(LogicalRepRelId); |
| ctl.entrysize = sizeof(LogicalRepRelMapEntry); |
| ctl.hcxt = LogicalRepRelMapContext; |
| |
| LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl, |
| HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
| |
| /* Watch for invalidation events. */ |
| CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb, |
| (Datum) 0); |
| } |
| |
| /* |
| * Free the entry of a relation map cache. |
| */ |
| static void |
| logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) |
| { |
| LogicalRepRelation *remoterel; |
| |
| remoterel = &entry->remoterel; |
| |
| pfree(remoterel->nspname); |
| pfree(remoterel->relname); |
| |
| if (remoterel->natts > 0) |
| { |
| int i; |
| |
| for (i = 0; i < remoterel->natts; i++) |
| pfree(remoterel->attnames[i]); |
| |
| pfree(remoterel->attnames); |
| pfree(remoterel->atttyps); |
| } |
| bms_free(remoterel->attkeys); |
| |
| if (entry->attrmap) |
| pfree(entry->attrmap); |
| } |
| |
| /* |
| * Add new entry or update existing entry in the relation map cache. |
| * |
| * Called when new relation mapping is sent by the publisher to update |
| * our expected view of incoming data from said publisher. |
| */ |
| void |
| logicalrep_relmap_update(LogicalRepRelation *remoterel) |
| { |
| MemoryContext oldctx; |
| LogicalRepRelMapEntry *entry; |
| bool found; |
| int i; |
| |
| if (LogicalRepRelMap == NULL) |
| logicalrep_relmap_init(); |
| |
| /* |
| * HASH_ENTER returns the existing entry if present or creates a new one. |
| */ |
| entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid, |
| HASH_ENTER, &found); |
| |
| if (found) |
| logicalrep_relmap_free_entry(entry); |
| |
| memset(entry, 0, sizeof(LogicalRepRelMapEntry)); |
| |
| /* Make cached copy of the data */ |
| oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); |
| entry->remoterel.remoteid = remoterel->remoteid; |
| entry->remoterel.nspname = pstrdup(remoterel->nspname); |
| entry->remoterel.relname = pstrdup(remoterel->relname); |
| entry->remoterel.natts = remoterel->natts; |
| entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); |
| entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); |
| for (i = 0; i < remoterel->natts; i++) |
| { |
| entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); |
| entry->remoterel.atttyps[i] = remoterel->atttyps[i]; |
| } |
| entry->remoterel.replident = remoterel->replident; |
| entry->remoterel.attkeys = bms_copy(remoterel->attkeys); |
| MemoryContextSwitchTo(oldctx); |
| } |
| |
| /* |
| * Find attribute index in TupleDesc struct by attribute name. |
| * |
| * Returns -1 if not found. |
| */ |
| static int |
| logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname) |
| { |
| int i; |
| |
| for (i = 0; i < remoterel->natts; i++) |
| { |
| if (strcmp(remoterel->attnames[i], attname) == 0) |
| return i; |
| } |
| |
| return -1; |
| } |
| |
| /* |
| * Report error with names of the missing local relation column(s), if any. |
| */ |
| static void |
| logicalrep_report_missing_attrs(LogicalRepRelation *remoterel, |
| Bitmapset *missingatts) |
| { |
| if (!bms_is_empty(missingatts)) |
| { |
| StringInfoData missingattsbuf; |
| int missingattcnt = 0; |
| int i; |
| |
| initStringInfo(&missingattsbuf); |
| |
| while ((i = bms_first_member(missingatts)) >= 0) |
| { |
| missingattcnt++; |
| if (missingattcnt == 1) |
| appendStringInfo(&missingattsbuf, _("\"%s\""), |
| remoterel->attnames[i]); |
| else |
| appendStringInfo(&missingattsbuf, _(", \"%s\""), |
| remoterel->attnames[i]); |
| } |
| |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg_plural("logical replication target relation \"%s.%s\" is missing replicated column: %s", |
| "logical replication target relation \"%s.%s\" is missing replicated columns: %s", |
| missingattcnt, |
| remoterel->nspname, |
| remoterel->relname, |
| missingattsbuf.data))); |
| } |
| } |
| |
| /* |
| * Open the local relation associated with the remote one. |
| * |
| * Rebuilds the Relcache mapping if it was invalidated by local DDL. |
| */ |
| LogicalRepRelMapEntry * |
| logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) |
| { |
| LogicalRepRelMapEntry *entry; |
| bool found; |
| LogicalRepRelation *remoterel; |
| |
| if (LogicalRepRelMap == NULL) |
| logicalrep_relmap_init(); |
| |
| /* Search for existing entry. */ |
| entry = hash_search(LogicalRepRelMap, (void *) &remoteid, |
| HASH_FIND, &found); |
| |
| if (!found) |
| elog(ERROR, "no relation map entry for remote relation ID %u", |
| remoteid); |
| |
| remoterel = &entry->remoterel; |
| |
| /* Ensure we don't leak a relcache refcount. */ |
| if (entry->localrel) |
| elog(ERROR, "remote relation ID %u is already open", remoteid); |
| |
| /* |
| * When opening and locking a relation, pending invalidation messages are |
| * processed which can invalidate the relation. Hence, if the entry is |
| * currently considered valid, try to open the local relation by OID and |
| * see if invalidation ensues. |
| */ |
| if (entry->localrelvalid) |
| { |
| entry->localrel = try_table_open(entry->localreloid, lockmode, false); |
| if (!entry->localrel) |
| { |
| /* Table was renamed or dropped. */ |
| entry->localrelvalid = false; |
| } |
| else if (!entry->localrelvalid) |
| { |
| /* Note we release the no-longer-useful lock here. */ |
| table_close(entry->localrel, lockmode); |
| entry->localrel = NULL; |
| } |
| } |
| |
| /* |
| * If the entry has been marked invalid since we last had lock on it, |
| * re-open the local relation by name and rebuild all derived data. |
| */ |
| if (!entry->localrelvalid) |
| { |
| Oid relid; |
| Bitmapset *idkey; |
| TupleDesc desc; |
| MemoryContext oldctx; |
| int i; |
| Bitmapset *missingatts; |
| |
| /* Release the no-longer-useful attrmap, if any. */ |
| if (entry->attrmap) |
| { |
| pfree(entry->attrmap); |
| entry->attrmap = NULL; |
| } |
| |
| /* Try to find and lock the relation by name. */ |
| relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname, |
| remoterel->relname, -1), |
| lockmode, true); |
| if (!OidIsValid(relid)) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("logical replication target relation \"%s.%s\" does not exist", |
| remoterel->nspname, remoterel->relname))); |
| entry->localrel = table_open(relid, NoLock); |
| entry->localreloid = relid; |
| |
| /* Check for supported relkind. */ |
| CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind, |
| remoterel->nspname, remoterel->relname); |
| |
| /* |
| * Build the mapping of local attribute numbers to remote attribute |
| * numbers and validate that we don't miss any replicated columns as |
| * that would result in potentially unwanted data loss. |
| */ |
| desc = RelationGetDescr(entry->localrel); |
| oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); |
| entry->attrmap = make_attrmap(desc->natts); |
| MemoryContextSwitchTo(oldctx); |
| |
| /* check and report missing attrs, if any */ |
| missingatts = bms_add_range(NULL, 0, remoterel->natts - 1); |
| for (i = 0; i < desc->natts; i++) |
| { |
| int attnum; |
| Form_pg_attribute attr = TupleDescAttr(desc, i); |
| |
| if (attr->attisdropped || attr->attgenerated) |
| { |
| entry->attrmap->attnums[i] = -1; |
| continue; |
| } |
| |
| attnum = logicalrep_rel_att_by_name(remoterel, |
| NameStr(attr->attname)); |
| |
| entry->attrmap->attnums[i] = attnum; |
| if (attnum >= 0) |
| missingatts = bms_del_member(missingatts, attnum); |
| } |
| |
| logicalrep_report_missing_attrs(remoterel, missingatts); |
| |
| /* be tidy */ |
| bms_free(missingatts); |
| |
| /* |
| * Check that replica identity matches. We allow for stricter replica |
| * identity (fewer columns) on subscriber as that will not stop us |
| * from finding unique tuple. IE, if publisher has identity |
| * (id,timestamp) and subscriber just (id) this will not be a problem, |
| * but in the opposite scenario it will. |
| * |
| * Don't throw any error here just mark the relation entry as not |
| * updatable, as replica identity is only for updates and deletes but |
| * inserts can be replicated even without it. |
| */ |
| entry->updatable = true; |
| idkey = RelationGetIndexAttrBitmap(entry->localrel, |
| INDEX_ATTR_BITMAP_IDENTITY_KEY); |
| /* fallback to PK if no replica identity */ |
| if (idkey == NULL) |
| { |
| idkey = RelationGetIndexAttrBitmap(entry->localrel, |
| INDEX_ATTR_BITMAP_PRIMARY_KEY); |
| |
| /* |
| * If no replica identity index and no PK, the published table |
| * must have replica identity FULL. |
| */ |
| if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL) |
| entry->updatable = false; |
| } |
| |
| i = -1; |
| while ((i = bms_next_member(idkey, i)) >= 0) |
| { |
| int attnum = i + FirstLowInvalidHeapAttributeNumber; |
| |
| if (!AttrNumberIsForUserDefinedAttr(attnum)) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("logical replication target relation \"%s.%s\" uses " |
| "system columns in REPLICA IDENTITY index", |
| remoterel->nspname, remoterel->relname))); |
| |
| attnum = AttrNumberGetAttrOffset(attnum); |
| |
| if (entry->attrmap->attnums[attnum] < 0 || |
| !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys)) |
| { |
| entry->updatable = false; |
| break; |
| } |
| } |
| |
| entry->localrelvalid = true; |
| } |
| |
| if (entry->state != SUBREL_STATE_READY) |
| entry->state = GetSubscriptionRelState(MySubscription->oid, |
| entry->localreloid, |
| &entry->statelsn); |
| |
| return entry; |
| } |
| |
| /* |
| * Close the previously opened logical relation. |
| */ |
| void |
| logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) |
| { |
| table_close(rel->localrel, lockmode); |
| rel->localrel = NULL; |
| } |
| |
| /* |
| * Partition cache: look up partition LogicalRepRelMapEntry's |
| * |
| * Unlike relation map cache, this is keyed by partition OID, not remote |
| * relation OID, because we only have to use this cache in the case where |
| * partitions are not directly mapped to any remote relation, such as when |
| * replication is occurring with one of their ancestors as target. |
| */ |
| |
| /* |
| * Relcache invalidation callback |
| */ |
| static void |
| logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) |
| { |
| LogicalRepPartMapEntry *entry; |
| |
| /* Just to be sure. */ |
| if (LogicalRepPartMap == NULL) |
| return; |
| |
| if (reloid != InvalidOid) |
| { |
| HASH_SEQ_STATUS status; |
| |
| hash_seq_init(&status, LogicalRepPartMap); |
| |
| /* TODO, use inverse lookup hashtable? */ |
| while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) |
| { |
| if (entry->relmapentry.localreloid == reloid) |
| { |
| entry->relmapentry.localrelvalid = false; |
| hash_seq_term(&status); |
| break; |
| } |
| } |
| } |
| else |
| { |
| /* invalidate all cache entries */ |
| HASH_SEQ_STATUS status; |
| |
| hash_seq_init(&status, LogicalRepPartMap); |
| |
| while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) |
| entry->relmapentry.localrelvalid = false; |
| } |
| } |
| |
| /* |
| * Initialize the partition map cache. |
| */ |
| static void |
| logicalrep_partmap_init(void) |
| { |
| HASHCTL ctl; |
| |
| if (!LogicalRepPartMapContext) |
| LogicalRepPartMapContext = |
| AllocSetContextCreate(CacheMemoryContext, |
| "LogicalRepPartMapContext", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| /* Initialize the relation hash table. */ |
| ctl.keysize = sizeof(Oid); /* partition OID */ |
| ctl.entrysize = sizeof(LogicalRepPartMapEntry); |
| ctl.hcxt = LogicalRepPartMapContext; |
| |
| LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl, |
| HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
| |
| /* Watch for invalidation events. */ |
| CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb, |
| (Datum) 0); |
| } |
| |
| /* |
| * logicalrep_partition_open |
| * |
| * Returned entry reuses most of the values of the root table's entry, save |
| * the attribute map, which can be different for the partition. However, |
| * we must physically copy all the data, in case the root table's entry |
| * gets freed/rebuilt. |
| * |
| * Note there's no logicalrep_partition_close, because the caller closes the |
| * component relation. |
| */ |
| LogicalRepRelMapEntry * |
| logicalrep_partition_open(LogicalRepRelMapEntry *root, |
| Relation partrel, AttrMap *map) |
| { |
| LogicalRepRelMapEntry *entry; |
| LogicalRepPartMapEntry *part_entry; |
| LogicalRepRelation *remoterel = &root->remoterel; |
| Oid partOid = RelationGetRelid(partrel); |
| AttrMap *attrmap = root->attrmap; |
| bool found; |
| MemoryContext oldctx; |
| |
| if (LogicalRepPartMap == NULL) |
| logicalrep_partmap_init(); |
| |
| /* Search for existing entry. */ |
| part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap, |
| (void *) &partOid, |
| HASH_ENTER, &found); |
| |
| entry = &part_entry->relmapentry; |
| |
| if (found && entry->localrelvalid) |
| return entry; |
| |
| /* Switch to longer-lived context. */ |
| oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext); |
| |
| if (!found) |
| { |
| memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); |
| part_entry->partoid = partOid; |
| } |
| |
| if (!entry->remoterel.remoteid) |
| { |
| int i; |
| |
| /* Remote relation is copied as-is from the root entry. */ |
| entry = &part_entry->relmapentry; |
| entry->remoterel.remoteid = remoterel->remoteid; |
| entry->remoterel.nspname = pstrdup(remoterel->nspname); |
| entry->remoterel.relname = pstrdup(remoterel->relname); |
| entry->remoterel.natts = remoterel->natts; |
| entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); |
| entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); |
| for (i = 0; i < remoterel->natts; i++) |
| { |
| entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); |
| entry->remoterel.atttyps[i] = remoterel->atttyps[i]; |
| } |
| entry->remoterel.replident = remoterel->replident; |
| entry->remoterel.attkeys = bms_copy(remoterel->attkeys); |
| } |
| |
| entry->localrel = partrel; |
| entry->localreloid = partOid; |
| |
| /* |
| * If the partition's attributes don't match the root relation's, we'll |
| * need to make a new attrmap which maps partition attribute numbers to |
| * remoterel's, instead of the original which maps root relation's |
| * attribute numbers to remoterel's. |
| * |
| * Note that 'map' which comes from the tuple routing data structure |
| * contains 1-based attribute numbers (of the parent relation). However, |
| * the map in 'entry', a logical replication data structure, contains |
| * 0-based attribute numbers (of the remote relation). |
| */ |
| if (map) |
| { |
| AttrNumber attno; |
| |
| entry->attrmap = make_attrmap(map->maplen); |
| for (attno = 0; attno < entry->attrmap->maplen; attno++) |
| { |
| AttrNumber root_attno = map->attnums[attno]; |
| |
| /* 0 means it's a dropped attribute. See comments atop AttrMap. */ |
| if (root_attno == 0) |
| entry->attrmap->attnums[attno] = -1; |
| else |
| entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1]; |
| } |
| } |
| else |
| { |
| /* Lacking copy_attmap, do this the hard way. */ |
| entry->attrmap = make_attrmap(attrmap->maplen); |
| memcpy(entry->attrmap->attnums, attrmap->attnums, |
| attrmap->maplen * sizeof(AttrNumber)); |
| } |
| |
| entry->updatable = root->updatable; |
| |
| entry->localrelvalid = true; |
| |
| /* state and statelsn are left set to 0. */ |
| MemoryContextSwitchTo(oldctx); |
| |
| return entry; |
| } |