| /*------------------------------------------------------------------------- |
| * relation.c |
| * PostgreSQL logical replication relation mapping cache |
| * |
| * Copyright (c) 2016-2023, PostgreSQL Global Development Group |
| * |
| * IDENTIFICATION |
| * src/backend/replication/logical/relation.c |
| * |
| * NOTES |
| * Routines in this file mainly have to do with mapping the properties |
| * of local replication target relations to the properties of their |
| * remote counterpart. |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/genam.h" |
| #include "access/table.h" |
| #include "catalog/namespace.h" |
| #include "catalog/pg_am_d.h" |
| #include "catalog/pg_subscription_rel.h" |
| #include "executor/executor.h" |
| #include "nodes/makefuncs.h" |
| #include "replication/logicalrelation.h" |
| #include "replication/worker_internal.h" |
| #include "utils/inval.h" |
| |
| |
| static MemoryContext LogicalRepRelMapContext = NULL; |
| |
| static HTAB *LogicalRepRelMap = NULL; |
| |
| /* |
| * Partition map (LogicalRepPartMap) |
| * |
| * When a partitioned table is used as replication target, replicated |
| * operations are actually performed on its leaf partitions, which requires |
| * the partitions to also be mapped to the remote relation. Parent's entry |
| * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because |
| * individual partitions may have different attribute numbers, which means |
| * attribute mappings to remote relation's attributes must be maintained |
| * separately for each partition. |
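| * |
| * For example, when a column has been dropped from the parent before a new |
| * partition is created, the parent retains a dropped-column slot that the |
| * partition lacks, so the same user column can have different attribute |
| * numbers in the two relations and the parent's attribute map cannot simply |
| * be reused. |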
| */ |
| static MemoryContext LogicalRepPartMapContext = NULL; |
| static HTAB *LogicalRepPartMap = NULL; |
| typedef struct LogicalRepPartMapEntry |
| { |
| Oid partoid; /* LogicalRepPartMap's key */ |
| LogicalRepRelMapEntry relmapentry; |
| } LogicalRepPartMapEntry; |
| |
| static Oid FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel, |
| AttrMap *attrMap); |
| |
| /* |
| * Relcache invalidation callback for our relation map cache. |
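| * |
| * The callback is invoked with InvalidOid when the whole relcache is |
| * invalidated (e.g. after an invalidation-message queue overflow), in which |
| * case all entries are marked invalid. |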
| */ |
| static void |
| logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) |
| { |
| LogicalRepRelMapEntry *entry; |
| |
| /* Just to be sure. */ |
| if (LogicalRepRelMap == NULL) |
| return; |
| |
| if (reloid != InvalidOid) |
| { |
| HASH_SEQ_STATUS status; |
| |
| hash_seq_init(&status, LogicalRepRelMap); |
| |
| /* TODO, use inverse lookup hashtable? */ |
| while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) |
| { |
| if (entry->localreloid == reloid) |
| { |
| entry->localrelvalid = false; |
| hash_seq_term(&status); |
| break; |
| } |
| } |
| } |
| else |
| { |
| /* invalidate all cache entries */ |
| HASH_SEQ_STATUS status; |
| |
| hash_seq_init(&status, LogicalRepRelMap); |
| |
| while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) |
| entry->localrelvalid = false; |
| } |
| } |
| |
| /* |
| * Initialize the relation map cache. |
| */ |
| static void |
| logicalrep_relmap_init(void) |
| { |
| HASHCTL ctl; |
| |
| if (!LogicalRepRelMapContext) |
| LogicalRepRelMapContext = |
| AllocSetContextCreate(CacheMemoryContext, |
| "LogicalRepRelMapContext", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| /* Initialize the relation hash table. */ |
| ctl.keysize = sizeof(LogicalRepRelId); |
| ctl.entrysize = sizeof(LogicalRepRelMapEntry); |
| ctl.hcxt = LogicalRepRelMapContext; |
| |
| LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl, |
| HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
| |
| /* Watch for invalidation events. */ |
| CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb, |
| (Datum) 0); |
| } |
| |
| /* |
| * Free the entry of a relation map cache. |
| */ |
| static void |
| logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) |
| { |
| LogicalRepRelation *remoterel; |
| |
| remoterel = &entry->remoterel; |
| |
| pfree(remoterel->nspname); |
| pfree(remoterel->relname); |
| |
| if (remoterel->natts > 0) |
| { |
| int i; |
| |
| for (i = 0; i < remoterel->natts; i++) |
| pfree(remoterel->attnames[i]); |
| |
| pfree(remoterel->attnames); |
| pfree(remoterel->atttyps); |
| } |
| bms_free(remoterel->attkeys); |
| |
| if (entry->attrmap) |
| free_attrmap(entry->attrmap); |
| } |
| |
| /* |
| * Add new entry or update existing entry in the relation map cache. |
| * |
| * Called when new relation mapping is sent by the publisher to update |
| * our expected view of incoming data from said publisher. |
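| * |
| * (The publisher sends such a mapping in a Relation protocol message, |
| * normally before the first change for a relation and again after the |
| * relation's schema has changed, so an existing entry may be overwritten |
| * here.) |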
| */ |
| void |
| logicalrep_relmap_update(LogicalRepRelation *remoterel) |
| { |
| MemoryContext oldctx; |
| LogicalRepRelMapEntry *entry; |
| bool found; |
| int i; |
| |
| if (LogicalRepRelMap == NULL) |
| logicalrep_relmap_init(); |
| |
| /* |
| * HASH_ENTER returns the existing entry if present or creates a new one. |
| */ |
| entry = hash_search(LogicalRepRelMap, &remoterel->remoteid, |
| HASH_ENTER, &found); |
| |
| if (found) |
| logicalrep_relmap_free_entry(entry); |
| |
| memset(entry, 0, sizeof(LogicalRepRelMapEntry)); |
| |
| /* Make cached copy of the data */ |
| oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); |
| entry->remoterel.remoteid = remoterel->remoteid; |
| entry->remoterel.nspname = pstrdup(remoterel->nspname); |
| entry->remoterel.relname = pstrdup(remoterel->relname); |
| entry->remoterel.natts = remoterel->natts; |
| entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); |
| entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); |
| for (i = 0; i < remoterel->natts; i++) |
| { |
| entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); |
| entry->remoterel.atttyps[i] = remoterel->atttyps[i]; |
| } |
| entry->remoterel.replident = remoterel->replident; |
| entry->remoterel.attkeys = bms_copy(remoterel->attkeys); |
| MemoryContextSwitchTo(oldctx); |
| } |
| |
| /* |
| * Find the remote relation's attribute index by attribute name. |
| * |
| * Returns -1 if not found. |
| */ |
| static int |
| logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname) |
| { |
| int i; |
| |
| for (i = 0; i < remoterel->natts; i++) |
| { |
| if (strcmp(remoterel->attnames[i], attname) == 0) |
| return i; |
| } |
| |
| return -1; |
| } |
| |
| /* |
| * Report error with names of the missing local relation column(s), if any. |
| */ |
| static void |
| logicalrep_report_missing_attrs(LogicalRepRelation *remoterel, |
| Bitmapset *missingatts) |
| { |
| if (!bms_is_empty(missingatts)) |
| { |
| StringInfoData missingattsbuf; |
| int missingattcnt = 0; |
| int i; |
| |
| initStringInfo(&missingattsbuf); |
| |
| i = -1; |
| while ((i = bms_next_member(missingatts, i)) >= 0) |
| { |
| missingattcnt++; |
| if (missingattcnt == 1) |
| appendStringInfo(&missingattsbuf, _("\"%s\""), |
| remoterel->attnames[i]); |
| else |
| appendStringInfo(&missingattsbuf, _(", \"%s\""), |
| remoterel->attnames[i]); |
| } |
| |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg_plural("logical replication target relation \"%s.%s\" is missing replicated column: %s", |
| "logical replication target relation \"%s.%s\" is missing replicated columns: %s", |
| missingattcnt, |
| remoterel->nspname, |
| remoterel->relname, |
| missingattsbuf.data))); |
| } |
| } |
| |
| /* |
| * Check if replica identity matches and mark the updatable flag. |
| * |
| * We allow a stricter replica identity (fewer columns) on the subscriber, as |
| * that will not stop us from finding a unique tuple. For example, if the |
| * publisher has identity (id,timestamp) and the subscriber just (id), this |
| * will not be a problem, but in the opposite scenario it will. |
| * |
| * We just mark the relation entry as not updatable here if the local |
| * replica identity is found to be insufficient for applying |
| * updates/deletes (inserts don't care!) and leave it to |
| * check_relation_updatable() to throw the actual error if needed. |
| */ |
| static void |
| logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) |
| { |
| Bitmapset *idkey; |
| LogicalRepRelation *remoterel = &entry->remoterel; |
| int i; |
| |
| entry->updatable = true; |
| |
| idkey = RelationGetIndexAttrBitmap(entry->localrel, |
| INDEX_ATTR_BITMAP_IDENTITY_KEY); |
| /* fallback to PK if no replica identity */ |
| if (idkey == NULL) |
| { |
| idkey = RelationGetIndexAttrBitmap(entry->localrel, |
| INDEX_ATTR_BITMAP_PRIMARY_KEY); |
| |
| /* |
| * If no replica identity index and no PK, the published table must |
| * have replica identity FULL. |
| */ |
| if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL) |
| entry->updatable = false; |
| } |
| |
| i = -1; |
| while ((i = bms_next_member(idkey, i)) >= 0) |
| { |
| int attnum = i + FirstLowInvalidHeapAttributeNumber; |
| |
| if (!AttrNumberIsForUserDefinedAttr(attnum)) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("logical replication target relation \"%s.%s\" uses " |
| "system columns in REPLICA IDENTITY index", |
| remoterel->nspname, remoterel->relname))); |
| |
| attnum = AttrNumberGetAttrOffset(attnum); |
| |
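| /* |
| * The local key column must be mapped to a remote column that is itself |
| * part of the remote relation's key; otherwise this replica identity |
| * cannot be relied on to apply updates and deletes. |
| */ |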
| if (entry->attrmap->attnums[attnum] < 0 || |
| !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys)) |
| { |
| entry->updatable = false; |
| break; |
| } |
| } |
| } |
| |
| /* |
| * Open the local relation associated with the remote one. |
| * |
| * Rebuilds the Relcache mapping if it was invalidated by local DDL. |
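| * |
| * The local relation is locked with the given lockmode and remains locked |
| * on return; the caller is expected to close it via logicalrep_rel_close(). |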
| */ |
| LogicalRepRelMapEntry * |
| logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) |
| { |
| LogicalRepRelMapEntry *entry; |
| bool found; |
| LogicalRepRelation *remoterel; |
| |
| if (LogicalRepRelMap == NULL) |
| logicalrep_relmap_init(); |
| |
| /* Search for existing entry. */ |
| entry = hash_search(LogicalRepRelMap, &remoteid, |
| HASH_FIND, &found); |
| |
| if (!found) |
| elog(ERROR, "no relation map entry for remote relation ID %u", |
| remoteid); |
| |
| remoterel = &entry->remoterel; |
| |
| /* Ensure we don't leak a relcache refcount. */ |
| if (entry->localrel) |
| elog(ERROR, "remote relation ID %u is already open", remoteid); |
| |
| /* |
| * When opening and locking a relation, pending invalidation messages are |
| * processed which can invalidate the relation. Hence, if the entry is |
| * currently considered valid, try to open the local relation by OID and |
| * see if invalidation ensues. |
| */ |
| if (entry->localrelvalid) |
| { |
| entry->localrel = try_table_open(entry->localreloid, lockmode); |
| if (!entry->localrel) |
| { |
| /* Table was renamed or dropped. */ |
| entry->localrelvalid = false; |
| } |
| else if (!entry->localrelvalid) |
| { |
| /* Note we release the no-longer-useful lock here. */ |
| table_close(entry->localrel, lockmode); |
| entry->localrel = NULL; |
| } |
| } |
| |
| /* |
| * If the entry has been marked invalid since we last had lock on it, |
| * re-open the local relation by name and rebuild all derived data. |
| */ |
| if (!entry->localrelvalid) |
| { |
| Oid relid; |
| TupleDesc desc; |
| MemoryContext oldctx; |
| int i; |
| Bitmapset *missingatts; |
| |
| /* Release the no-longer-useful attrmap, if any. */ |
| if (entry->attrmap) |
| { |
| free_attrmap(entry->attrmap); |
| entry->attrmap = NULL; |
| } |
| |
| /* Try to find and lock the relation by name. */ |
| relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname, |
| remoterel->relname, -1), |
| lockmode, true); |
| if (!OidIsValid(relid)) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("logical replication target relation \"%s.%s\" does not exist", |
| remoterel->nspname, remoterel->relname))); |
| entry->localrel = table_open(relid, NoLock); |
| entry->localreloid = relid; |
| |
| /* Check for supported relkind. */ |
| CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind, |
| remoterel->nspname, remoterel->relname); |
| |
| /* |
| * Build the mapping of local attribute numbers to remote attribute |
| * numbers and validate that we don't miss any replicated columns as |
| * that would result in potentially unwanted data loss. |
| */ |
| desc = RelationGetDescr(entry->localrel); |
| oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); |
| entry->attrmap = make_attrmap(desc->natts); |
| MemoryContextSwitchTo(oldctx); |
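| |
| /* |
| * Each attrmap->attnums[i] is set below to the 0-based remote column |
| * index that local column i receives its value from, or to -1 if the |
| * local column is dropped, generated, or has no remote counterpart. |
| * For example, a local table (a, b) fed from a remote table (b, a) ends |
| * up with attnums = {1, 0}. |
| */ |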
| |
| /* check and report missing attrs, if any */ |
| missingatts = bms_add_range(NULL, 0, remoterel->natts - 1); |
| for (i = 0; i < desc->natts; i++) |
| { |
| int attnum; |
| Form_pg_attribute attr = TupleDescAttr(desc, i); |
| |
| if (attr->attisdropped || attr->attgenerated) |
| { |
| entry->attrmap->attnums[i] = -1; |
| continue; |
| } |
| |
| attnum = logicalrep_rel_att_by_name(remoterel, |
| NameStr(attr->attname)); |
| |
| entry->attrmap->attnums[i] = attnum; |
| if (attnum >= 0) |
| missingatts = bms_del_member(missingatts, attnum); |
| } |
| |
| logicalrep_report_missing_attrs(remoterel, missingatts); |
| |
| /* be tidy */ |
| bms_free(missingatts); |
| |
| /* |
| * Set if the table's replica identity is enough to apply |
| * update/delete. |
| */ |
| logicalrep_rel_mark_updatable(entry); |
| |
| /* |
| * Finding a usable index is an infrequent task. It occurs when an |
| * operation is first performed on the relation, or after invalidation |
| * of the relation cache entry (such as ANALYZE or CREATE/DROP index |
| * on the relation). |
| */ |
| entry->localindexoid = FindLogicalRepLocalIndex(entry->localrel, remoterel, |
| entry->attrmap); |
| |
| entry->localrelvalid = true; |
| } |
| |
| if (entry->state != SUBREL_STATE_READY) |
| entry->state = GetSubscriptionRelState(MySubscription->oid, |
| entry->localreloid, |
| &entry->statelsn); |
| |
| return entry; |
| } |
| |
| /* |
| * Close the previously opened logical relation. |
| */ |
| void |
| logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) |
| { |
| table_close(rel->localrel, lockmode); |
| rel->localrel = NULL; |
| } |
| |
| /* |
| * Partition cache: look up partition LogicalRepRelMapEntry's |
| * |
| * Unlike relation map cache, this is keyed by partition OID, not remote |
| * relation OID, because we only have to use this cache in the case where |
| * partitions are not directly mapped to any remote relation, such as when |
| * replication is occurring with one of their ancestors as target. |
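| * |
| * (That is the case when the replication target on the subscriber is itself |
| * a partitioned table and incoming changes are routed to its leaf |
| * partitions.) |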
| */ |
| |
| /* |
| * Relcache invalidation callback |
| */ |
| static void |
| logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) |
| { |
| LogicalRepPartMapEntry *entry; |
| |
| /* Just to be sure. */ |
| if (LogicalRepPartMap == NULL) |
| return; |
| |
| if (reloid != InvalidOid) |
| { |
| HASH_SEQ_STATUS status; |
| |
| hash_seq_init(&status, LogicalRepPartMap); |
| |
| /* TODO, use inverse lookup hashtable? */ |
| while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) |
| { |
| if (entry->relmapentry.localreloid == reloid) |
| { |
| entry->relmapentry.localrelvalid = false; |
| hash_seq_term(&status); |
| break; |
| } |
| } |
| } |
| else |
| { |
| /* invalidate all cache entries */ |
| HASH_SEQ_STATUS status; |
| |
| hash_seq_init(&status, LogicalRepPartMap); |
| |
| while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) |
| entry->relmapentry.localrelvalid = false; |
| } |
| } |
| |
| /* |
| * Reset the entries in the partition map that refer to remoterel. |
| * |
| * Called when new relation mapping is sent by the publisher to update our |
| * expected view of incoming data from said publisher. |
| * |
| * Note that we don't update the remoterel information in the entry here; |
| * the information is updated in logicalrep_partition_open() instead, to |
| * avoid unnecessary work. |
| */ |
| void |
| logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel) |
| { |
| HASH_SEQ_STATUS status; |
| LogicalRepPartMapEntry *part_entry; |
| LogicalRepRelMapEntry *entry; |
| |
| if (LogicalRepPartMap == NULL) |
| return; |
| |
| hash_seq_init(&status, LogicalRepPartMap); |
| while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL) |
| { |
| entry = &part_entry->relmapentry; |
| |
| if (entry->remoterel.remoteid != remoterel->remoteid) |
| continue; |
| |
| logicalrep_relmap_free_entry(entry); |
| |
| memset(entry, 0, sizeof(LogicalRepRelMapEntry)); |
| } |
| } |
| |
| /* |
| * Initialize the partition map cache. |
| */ |
| static void |
| logicalrep_partmap_init(void) |
| { |
| HASHCTL ctl; |
| |
| if (!LogicalRepPartMapContext) |
| LogicalRepPartMapContext = |
| AllocSetContextCreate(CacheMemoryContext, |
| "LogicalRepPartMapContext", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| /* Initialize the relation hash table. */ |
| ctl.keysize = sizeof(Oid); /* partition OID */ |
| ctl.entrysize = sizeof(LogicalRepPartMapEntry); |
| ctl.hcxt = LogicalRepPartMapContext; |
| |
| LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl, |
| HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
| |
| /* Watch for invalidation events. */ |
| CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb, |
| (Datum) 0); |
| } |
| |
| /* |
| * logicalrep_partition_open |
| * |
| * Returned entry reuses most of the values of the root table's entry, save |
| * the attribute map, which can be different for the partition. However, |
| * we must physically copy all the data, in case the root table's entry |
| * gets freed/rebuilt. |
| * |
| * Note there's no logicalrep_partition_close, because the caller closes the |
| * component relation. |
| */ |
| LogicalRepRelMapEntry * |
| logicalrep_partition_open(LogicalRepRelMapEntry *root, |
| Relation partrel, AttrMap *map) |
| { |
| LogicalRepRelMapEntry *entry; |
| LogicalRepPartMapEntry *part_entry; |
| LogicalRepRelation *remoterel = &root->remoterel; |
| Oid partOid = RelationGetRelid(partrel); |
| AttrMap *attrmap = root->attrmap; |
| bool found; |
| MemoryContext oldctx; |
| |
| if (LogicalRepPartMap == NULL) |
| logicalrep_partmap_init(); |
| |
| /* Search for existing entry. */ |
| part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap, |
| &partOid, |
| HASH_ENTER, &found); |
| |
| entry = &part_entry->relmapentry; |
| |
| if (found && entry->localrelvalid) |
| return entry; |
| |
| /* Switch to longer-lived context. */ |
| oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext); |
| |
| if (!found) |
| { |
| memset(part_entry, 0, sizeof(LogicalRepPartMapEntry)); |
| part_entry->partoid = partOid; |
| } |
| |
| /* Release the no-longer-useful attrmap, if any; it is rebuilt below. */ |
| if (entry->attrmap) |
| { |
| free_attrmap(entry->attrmap); |
| entry->attrmap = NULL; |
| } |
| |
| if (!entry->remoterel.remoteid) |
| { |
| int i; |
| |
| /* Remote relation is copied as-is from the root entry. */ |
| entry = &part_entry->relmapentry; |
| entry->remoterel.remoteid = remoterel->remoteid; |
| entry->remoterel.nspname = pstrdup(remoterel->nspname); |
| entry->remoterel.relname = pstrdup(remoterel->relname); |
| entry->remoterel.natts = remoterel->natts; |
| entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); |
| entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); |
| for (i = 0; i < remoterel->natts; i++) |
| { |
| entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); |
| entry->remoterel.atttyps[i] = remoterel->atttyps[i]; |
| } |
| entry->remoterel.replident = remoterel->replident; |
| entry->remoterel.attkeys = bms_copy(remoterel->attkeys); |
| } |
| |
| entry->localrel = partrel; |
| entry->localreloid = partOid; |
| |
| /* |
| * If the partition's attributes don't match the root relation's, we'll |
| * need to make a new attrmap which maps partition attribute numbers to |
| * remoterel's, instead of the original which maps root relation's |
| * attribute numbers to remoterel's. |
| * |
| * Note that 'map' which comes from the tuple routing data structure |
| * contains 1-based attribute numbers (of the parent relation). However, |
| * the map in 'entry', a logical replication data structure, contains |
| * 0-based attribute numbers (of the remote relation). |
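| * |
| * For example, if the partition's column at offset 1 corresponds to the |
| * parent's third column (map->attnums[1] == 3), and the parent's third |
| * column maps to remote column 2 (attrmap->attnums[2] == 2), the new map |
| * gets entry->attrmap->attnums[1] = 2. |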
| */ |
| if (map) |
| { |
| AttrNumber attno; |
| |
| entry->attrmap = make_attrmap(map->maplen); |
| for (attno = 0; attno < entry->attrmap->maplen; attno++) |
| { |
| AttrNumber root_attno = map->attnums[attno]; |
| |
| /* 0 means it's a dropped attribute. See comments atop AttrMap. */ |
| if (root_attno == 0) |
| entry->attrmap->attnums[attno] = -1; |
| else |
| entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1]; |
| } |
| } |
| else |
| { |
| /* Lacking copy_attmap, do this the hard way. */ |
| entry->attrmap = make_attrmap(attrmap->maplen); |
| memcpy(entry->attrmap->attnums, attrmap->attnums, |
| attrmap->maplen * sizeof(AttrNumber)); |
| } |
| |
| /* Set if the table's replica identity is enough to apply update/delete. */ |
| logicalrep_rel_mark_updatable(entry); |
| |
| /* state and statelsn are left set to 0. */ |
| MemoryContextSwitchTo(oldctx); |
| |
| /* |
| * Finding a usable index is an infrequent task. It occurs when an |
| * operation is first performed on the relation, or after invalidation of |
| * the relation cache entry (such as ANALYZE or CREATE/DROP index on the |
| * relation). |
| * |
| * We also prefer to run this code in oldctx so that we do not leak anything |
| * into LogicalRepPartMapContext (and hence CacheMemoryContext). |
| */ |
| entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel, |
| entry->attrmap); |
| |
| entry->localrelvalid = true; |
| |
| return entry; |
| } |
| |
| /* |
| * Returns the oid of an index that can be used by the apply worker to scan |
| * the relation. |
| * |
| * We expect to call this function when REPLICA IDENTITY FULL is defined for |
| * the remote relation. |
| * |
| * If no suitable index is found, returns InvalidOid. |
| */ |
| static Oid |
| FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap) |
| { |
| List *idxlist = RelationGetIndexList(localrel); |
| ListCell *lc; |
| |
| foreach(lc, idxlist) |
| { |
| Oid idxoid = lfirst_oid(lc); |
| bool isUsableIdx; |
| Relation idxRel; |
| IndexInfo *idxInfo; |
| |
| idxRel = index_open(idxoid, AccessShareLock); |
| idxInfo = BuildIndexInfo(idxRel); |
| isUsableIdx = IsIndexUsableForReplicaIdentityFull(idxInfo, attrmap); |
| index_close(idxRel, AccessShareLock); |
| |
| /* Return the first eligible index found */ |
| if (isUsableIdx) |
| return idxoid; |
| } |
| |
| return InvalidOid; |
| } |
| |
| /* |
| * Returns true if the index is usable for replica identity full. |
| * |
| * The index must be btree, non-partial, and the leftmost field must be a |
| * column (not an expression) that references the remote relation column. |
| * These limitations help to keep the index scan similar to PK/RI index |
| * scans. |
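| * |
| * For example, a plain btree index whose first column is a replicated local |
| * column qualifies, whereas a partial index, or one whose leftmost key is an |
| * expression, does not. |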
| * |
| * attrmap is a map of local attributes to remote ones. We can consult this |
| * map to check whether the local index attribute has a corresponding remote |
| * attribute. |
| * |
| * Note that the limitations on index scans for replica identity full are |
| * only a subset of the limitations on PK/RI index scans. For example, we |
| * support columns that are marked as [NULL], and we are not interested in |
| * the [NOT DEFERRABLE] aspect of constraints here. It works for us because |
| * we always compare the tuples for non-PK/RI index scans. See |
| * RelationFindReplTupleByIndex(). |
| * |
| * XXX: There are no fundamental problems with supporting non-btree indexes. |
| * We mostly need to relax the limitations in RelationFindReplTupleByIndex(). |
| * For partial indexes, the required changes are likely to be larger. If |
| * none of the tuples satisfy the expression for the index scan, we fall back |
| * to sequential execution, which might not be a good idea in some cases. |
| */ |
| bool |
| IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo, AttrMap *attrmap) |
| { |
| AttrNumber keycol; |
| |
| /* The index must be a Btree index */ |
| if (indexInfo->ii_Am != BTREE_AM_OID) |
| return false; |
| |
| /* The index must not be a partial index */ |
| if (indexInfo->ii_Predicate != NIL) |
| return false; |
| |
| Assert(indexInfo->ii_NumIndexAttrs >= 1); |
| |
| /* The leftmost index field must not be an expression */ |
| keycol = indexInfo->ii_IndexAttrNumbers[0]; |
| if (!AttributeNumberIsValid(keycol)) |
| return false; |
| |
| /* |
| * And the leftmost index field must reference the remote relation column. |
| * This is because if it doesn't, the sequential scan is favorable over |
| * index scan in most cases. |
| */ |
| if (attrmap->maplen <= AttrNumberGetAttrOffset(keycol) || |
| attrmap->attnums[AttrNumberGetAttrOffset(keycol)] < 0) |
| return false; |
| |
| return true; |
| } |
| |
| /* |
| * Get the replica identity index or, if it is not defined, the primary key. |
| * |
| * If neither is defined, returns InvalidOid. |
| */ |
| Oid |
| GetRelationIdentityOrPK(Relation rel) |
| { |
| Oid idxoid; |
| |
| idxoid = RelationGetReplicaIndex(rel); |
| |
| if (!OidIsValid(idxoid)) |
| idxoid = RelationGetPrimaryKeyIndex(rel); |
| |
| return idxoid; |
| } |
| |
| /* |
| * Returns the index oid if we can use an index for subscriber. Otherwise, |
| * returns InvalidOid. |
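| * |
| * The preference order is: the replica identity index, then the primary |
| * key, and, only when the remote relation has replica identity FULL, any |
| * other suitable index found by FindUsableIndexForReplicaIdentityFull(). |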
| */ |
| static Oid |
| FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel, |
| AttrMap *attrMap) |
| { |
| Oid idxoid; |
| |
| /* |
| * We never need an index oid for partitioned tables; we always rely on the |
| * leaf partition's index. |
| */ |
| if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
| return InvalidOid; |
| |
| /* |
| * Simple case, we already have a primary key or a replica identity index. |
| */ |
| idxoid = GetRelationIdentityOrPK(localrel); |
| if (OidIsValid(idxoid)) |
| return idxoid; |
| |
| if (remoterel->replident == REPLICA_IDENTITY_FULL) |
| { |
| /* |
| * We are looking for one more opportunity for using an index. If |
| * there are any indexes defined on the local relation, try to pick a |
| * suitable index. |
| * |
| * The index selection safely assumes that all the columns are going |
| * to be available for the index scan given that remote relation has |
| * replica identity full. |
| * |
| * Note that we are not using the planner to find the cheapest method |
| * to scan the relation as that would require us to either use lower |
| * level planner functions which would be a maintenance burden in the |
| * long run or use the full-fledged planner which could cause |
| * overhead. |
| */ |
| return FindUsableIndexForReplicaIdentityFull(localrel, attrMap); |
| } |
| |
| return InvalidOid; |
| } |