| /*-------------------------------------------------------------------------- |
| * |
| * aocsam_handler.c |
| * Append only columnar access methods handler |
| * |
| * Portions Copyright (c) 2009-2010, Greenplum Inc. |
| * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates. |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/access/aocs/aocsam_handler.c |
| * |
| *-------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include "access/aomd.h" |
| #include "access/appendonlywriter.h" |
| #include "access/heapam.h" |
| #include "access/multixact.h" |
| #include "access/reloptions.h" |
| #include "access/tableam.h" |
| #include "access/tsmapi.h" |
| #include "access/xact.h" |
| #include "catalog/aoseg.h" |
| #include "catalog/catalog.h" |
| #include "catalog/heap.h" |
| #include "catalog/index.h" |
| #include "catalog/pg_am.h" |
| #include "catalog/pg_appendonly.h" |
| #include "catalog/storage.h" |
| #include "catalog/storage_xlog.h" |
| #include "cdb/cdbaocsam.h" |
| #include "cdb/cdbvars.h" |
| #include "commands/defrem.h" |
| #include "commands/progress.h" |
| #include "commands/vacuum.h" |
| #include "executor/executor.h" |
| #include "nodes/makefuncs.h" |
| #include "nodes/nodeFuncs.h" |
| #include "pgstat.h" |
| #include "storage/lmgr.h" |
| #include "storage/procarray.h" |
| #include "storage/smgr.h" |
| #include "utils/builtins.h" |
| #include "utils/faultinjector.h" |
| #include "utils/lsyscache.h" |
| #include "utils/pg_rusage.h" |
| #include "utils/guc.h" |
| |
| #define IS_BTREE(r) ((r)->rd_rel->relam == BTREE_AM_OID) |
| |
| extern BlockNumber system_nextsampleblock(SampleScanState *node, BlockNumber nblocks); |
| |
| /* |
| * Used for bitmapHeapScan. Also look at the comment in cdbaocsam.h regarding |
| * AOCSScanDescIdentifier. |
| * |
| * In BitmapHeapScans, it is needed to keep track of two distict fetch |
| * descriptors. One for direct fetches, and another one for recheck fetches. The |
| * distinction allows for a different set of columns to be populated in each |
| * case. During initialiazation of this structure, it is required to populate |
| * the proj array accordingly. It is later, during the actual fetching of the |
| * tuple, that the corresponding fetch descriptor will be lazily initialized. |
| * |
| * Finally, in this struct, state between next_block and next_tuple calls is |
| * kept, in order to minimize the work that is done in the latter. |
| */ |
| typedef struct AOCSBitmapScanData |
| { |
| TableScanDescData rs_base; /* AM independent part of the descriptor */ |
| |
| enum AOCSScanDescIdentifier descIdentifier; |
| |
| Snapshot appendOnlyMetaDataSnapshot; |
| |
| enum |
| { |
| NO_RECHECK, |
| RECHECK |
| } whichDesc; |
| |
| struct { |
| struct AOCSFetchDescData *bitmapFetch; |
| bool *proj; |
| } bitmapScanDesc[2]; |
| |
int rs_cindex; /* current tuple's index in tbmres->offsets, or -1 */
| } *AOCSBitmapScan; |
| |
| /* |
| * Per-relation backend-local DML state for DML or DML-like operations. |
| */ |
| typedef struct AOCODMLState |
| { |
| Oid relationOid; |
| AOCSInsertDesc insertDesc; |
| AOCSDeleteDesc deleteDesc; |
| AOCSUniqueCheckDesc uniqueCheckDesc; |
| /* |
| * CBDB_PARALLEL |
| * head: the Head of multiple segment files insertion list. |
| * insertMultiFiles: number of seg files to be inserted into. |
| * used_segment_files: used to avoid used files when asking |
| * for a new segment file. |
| */ |
| dlist_head head; |
| int insertMultiFiles; |
| List* used_segment_files; |
| } AOCODMLState; |
| |
| static void reset_state_cb(void *arg); |
| |
| /* |
| * A repository for per-relation backend-local DML states. Contains: |
| * a quick look up member for the common case (only 1 relation) |
| * a hash table which keeps per relation information |
| * a memory context that should be long lived enough and is |
| * responsible for reseting the state via its reset cb |
| */ |
| typedef struct AOCODMLStates |
| { |
| AOCODMLState *last_used_state; |
| HTAB *state_table; |
| |
| MemoryContext stateCxt; |
| MemoryContextCallback cb; |
| } AOCODMLStates; |
| |
| static AOCODMLStates aocoDMLStates; |
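
/*
* Typical lifecycle (illustrative sketch, following the functions defined
* below): aoco_dml_init() creates the hash table entry for a relation,
* get_insert_descriptor()/get_delete_descriptor() lazily attach the actual
* descriptors from the tuple-level callbacks, and aoco_dml_finish()
* finalizes the descriptors and removes the entry again.
*/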
| |
| /* |
| * There are two cases that we are called from, during context destruction |
| * after a successful completion and after a transaction abort. Only in the |
| * second case we should not have cleaned up the DML state and the entries in |
| * the hash table. We need to reset our global state. The actual clean up is |
| * taken care elsewhere. |
| */ |
| static void |
| reset_state_cb(void *arg) |
| { |
| aocoDMLStates.state_table = NULL; |
| aocoDMLStates.last_used_state = NULL; |
| aocoDMLStates.stateCxt = NULL; |
| } |
| |
| |
| /* |
| * Initialize the backend local AOCODMLStates object for this backend for the |
| * current DML or DML-like command (if not already initialized). |
| * |
| * This function should be called with a current memory context whose life |
| * span is enough to last until the end of this command execution. |
| */ |
| static void |
init_aoco_dml_states(void)
| { |
| HASHCTL hash_ctl; |
| |
| if (!aocoDMLStates.state_table) |
| { |
| Assert(aocoDMLStates.stateCxt == NULL); |
| aocoDMLStates.stateCxt = AllocSetContextCreate( |
| CurrentMemoryContext, |
| "AppendOnly DML State Context", |
| ALLOCSET_SMALL_SIZES); |
| |
| aocoDMLStates.cb.func = reset_state_cb; |
| aocoDMLStates.cb.arg = NULL; |
| MemoryContextRegisterResetCallback(aocoDMLStates.stateCxt, |
| &aocoDMLStates.cb); |
| |
| memset(&hash_ctl, 0, sizeof(hash_ctl)); |
| hash_ctl.keysize = sizeof(Oid); |
| hash_ctl.entrysize = sizeof(AOCODMLState); |
| hash_ctl.hcxt = aocoDMLStates.stateCxt; |
| aocoDMLStates.state_table = |
| hash_create("AppendOnly DML state", 128, &hash_ctl, |
| HASH_CONTEXT | HASH_ELEM | HASH_BLOBS); |
| } |
| } |
| |
| /* |
| * Create and insert a state entry for a relation. The actual descriptors will |
| * be created lazily when/if needed. |
| * |
| * Should be called exactly once per relation. |
| */ |
| static inline void |
| init_dml_state(const Oid relationOid) |
| { |
| AOCODMLState *state; |
| bool found; |
| |
| Assert(aocoDMLStates.state_table); |
| |
| state = (AOCODMLState *) hash_search(aocoDMLStates.state_table, |
| &relationOid, |
| HASH_ENTER, |
| &found); |
| |
| state->insertDesc = NULL; |
| state->deleteDesc = NULL; |
| state->uniqueCheckDesc = NULL; |
| state->insertMultiFiles = 0; |
| state->used_segment_files = NIL; |
| dlist_init(&state->head); |
| |
| Assert(!found); |
| |
| aocoDMLStates.last_used_state = state; |
| } |
| |
| /* |
| * Retrieve the state information for a relation. |
| * It is required that the state has been created before hand. |
| */ |
| static inline AOCODMLState * |
| find_dml_state(const Oid relationOid) |
| { |
| AOCODMLState *state; |
| Assert(aocoDMLStates.state_table); |
| |
| if (aocoDMLStates.last_used_state && |
| aocoDMLStates.last_used_state->relationOid == relationOid) |
| return aocoDMLStates.last_used_state; |
| |
| state = (AOCODMLState *) hash_search(aocoDMLStates.state_table, |
| &relationOid, |
| HASH_FIND, |
| NULL); |
| |
| Assert(state); |
| |
| aocoDMLStates.last_used_state = state; |
| return state; |
| } |
| |
| /* |
| * Remove the state information for a relation. |
| * It is required that the state has been created before hand. |
| * |
| * Should be called exactly once per relation. |
| */ |
| static inline void |
| remove_dml_state(const Oid relationOid) |
| { |
| AOCODMLState *state; |
| Assert(aocoDMLStates.state_table); |
| |
| state = (AOCODMLState *) hash_search(aocoDMLStates.state_table, |
| &relationOid, |
| HASH_REMOVE, |
| NULL); |
| |
| if (!state) |
| return; |
| |
| if (aocoDMLStates.last_used_state && |
| aocoDMLStates.last_used_state->relationOid == relationOid) |
| aocoDMLStates.last_used_state = NULL; |
| |
| return; |
| } |
| |
| /* |
| * Although the operation param is superfluous at the momment, the signature of |
| * the function is such for balance between the init and finish. |
| * |
| * This function should be called exactly once per relation. |
| */ |
| void |
| aoco_dml_init(Relation relation, CmdType operation) |
| { |
| init_aoco_dml_states(); |
| init_dml_state(RelationGetRelid(relation)); |
| } |
| |
| /* |
| * This function should be called exactly once per relation. |
| */ |
| void |
| aoco_dml_finish(Relation relation, CmdType operation) |
| { |
| AOCODMLState *state; |
| bool had_delete_desc = false; |
| |
| Oid relationOid = RelationGetRelid(relation); |
| |
| Assert(aocoDMLStates.state_table); |
| |
| state = (AOCODMLState *) hash_search(aocoDMLStates.state_table, |
| &relationOid, |
| HASH_FIND, |
| NULL); |
| |
| if (!state) |
| return; |
| |
| if (state->deleteDesc) |
| { |
| aocs_delete_finish(state->deleteDesc); |
| state->deleteDesc = NULL; |
| |
| /* |
| * Bump up the modcount. If we inserted something (meaning that |
| * this was an UPDATE), we can skip this, as the insertion bumped |
| * up the modcount already. |
| */ |
| if (!state->insertDesc) |
| AORelIncrementModCount(relation); |
| |
| had_delete_desc = true; |
| } |
| |
| if (state->insertDesc) |
| { |
| Assert(state->insertDesc->aoi_rel == relation); |
| aocs_insert_finish(state->insertDesc, &state->head); |
| state->insertDesc = NULL; |
| state->insertMultiFiles = 0; |
| pfree(state->used_segment_files); |
| state->used_segment_files = NIL; |
| } |
| |
| if (state->uniqueCheckDesc) |
| { |
| /* clean up the block directory */ |
| AppendOnlyBlockDirectory_End_forUniqueChecks(state->uniqueCheckDesc->blockDirectory); |
| pfree(state->uniqueCheckDesc->blockDirectory); |
| state->uniqueCheckDesc->blockDirectory = NULL; |
| |
| /* |
| * If this fetch is a part of an UPDATE, then we have been reusing the |
| * visimapDelete used by the delete half of the UPDATE, which would have |
| * already been cleaned up above. Clean up otherwise. |
| */ |
| if (!had_delete_desc) |
| { |
| AppendOnlyVisimap_Finish_forUniquenessChecks(state->uniqueCheckDesc->visimap); |
| pfree(state->uniqueCheckDesc->visimap); |
| } |
| else |
| { |
| /* |
| * Github issue: https://github.com/apache/cloudberry/issues/557 |
| * |
| * For partition tables, it's possible to update across partitions. |
| * And it does have deleteDesc and uniqueCheckDesc if there were. |
| * Some partitions have visimap but some not, clean them if possible. |
| */ |
| if (state->uniqueCheckDesc->visimap) |
| { |
| AppendOnlyVisimapStore_Finish(&state->uniqueCheckDesc->visimap->visimapStore, AccessShareLock); |
| } |
| } |
| state->uniqueCheckDesc->visimap = NULL; |
| state->uniqueCheckDesc->visiMapDelete = NULL; |
| |
| pfree(state->uniqueCheckDesc); |
| state->uniqueCheckDesc = NULL; |
| } |
| |
| remove_dml_state(relationOid); |
| } |
| |
| /* |
| * Retrieve the insertDescriptor for a relation. Initialize it if needed. |
| */ |
| static AOCSInsertDesc |
| get_insert_descriptor(const Relation relation) |
| { |
| AOCODMLState *state; |
| AOCSInsertDesc next = NULL; |
| MemoryContext oldcxt; |
| |
| state = find_dml_state(RelationGetRelid(relation)); |
| oldcxt = MemoryContextSwitchTo(aocoDMLStates.stateCxt); |
| if (state->insertDesc == NULL) |
| { |
| |
| /* |
| * CBDB_PARALLEL: |
| * Should not enable insertMultiFiles if the table is created by own transaction |
| * or in utility mode. |
| */ |
| if (Gp_role != GP_ROLE_UTILITY && |
| gp_appendonly_insert_files > 1 && |
| !ShouldUseReservedSegno(relation, CHOOSE_MODE_WRITE)) |
| state->insertMultiFiles = gp_appendonly_insert_files; |
| |
| state->insertDesc = aocs_insert_init(relation, |
| ChooseSegnoForWrite(relation)); |
| |
| state->used_segment_files = list_make1_int(state->insertDesc->cur_segno); |
| dlist_init(&state->head); |
| dlist_push_tail(&state->head, &state->insertDesc->node); |
| |
| } |
| |
| /* switch insertDesc */ |
| if (state->insertMultiFiles && state->insertDesc->range == gp_appendonly_insert_files_tuples_range) |
| { |
| state->insertDesc->range = 0; |
| |
| if (list_length(state->used_segment_files) < state->insertMultiFiles) |
| { |
| next = aocs_insert_init(relation, ChooseSegnoForWriteMultiFile(relation, state->used_segment_files)); |
| dlist_push_tail(&state->head, &next->node); |
| state->used_segment_files = lappend_int(state->used_segment_files, next->cur_segno); |
| } |
| |
| if (!dlist_has_next(&state->head, &state->insertDesc->node)) |
| next = (AOCSInsertDesc)dlist_container(AOCSInsertDescData, node, dlist_head_node(&state->head)); |
| else |
| next = (AOCSInsertDesc)dlist_container(AOCSInsertDescData, node, dlist_next_node(&state->head, &state->insertDesc->node)); |
| |
| state->insertDesc = next; |
| } |
| /* |
| * If we have a unique index, insert a placeholder block directory row to |
| * entertain uniqueness checks from concurrent inserts. See |
| * AppendOnlyBlockDirectory_InsertPlaceholder() for details. |
| * |
| * Note: For AOCO tables, we need to only insert a placeholder block |
| * directory row for the 1st non-dropped column. This is because |
| * during a uniqueness check, only the first non-dropped column's block |
| * directory entry is consulted. (See AppendOnlyBlockDirectory_CoversTuple()) |
| */ |
| if (relationHasUniqueIndex(relation) && !state->insertDesc->placeholderInserted) |
| { |
| int firstNonDroppedColumn = -1; |
| int64 firstRowNum; |
| DatumStreamWrite *dsw; |
| BufferedAppend *bufferedAppend; |
| int64 fileOffset; |
| AOCSInsertDesc insertDesc; |
| |
| |
for (int i = 0; i < relation->rd_att->natts; i++)
{
if (!relation->rd_att->attrs[i].attisdropped)
{
firstNonDroppedColumn = i;
break;
}
}
| Assert(firstNonDroppedColumn != -1); |
| |
| insertDesc = state->insertDesc; |
| dsw = insertDesc->ds[firstNonDroppedColumn]; |
| firstRowNum = dsw->blockFirstRowNum; |
| bufferedAppend = &dsw->ao_write.bufferedAppend; |
| fileOffset = BufferedAppendNextBufferPosition(bufferedAppend); |
| |
| AppendOnlyBlockDirectory_InsertPlaceholder(&insertDesc->blockDirectory, |
| firstRowNum, |
| fileOffset, |
| firstNonDroppedColumn); |
| insertDesc->placeholderInserted = true; |
| } |
| MemoryContextSwitchTo(oldcxt); |
| return state->insertDesc; |
| } |
| |
| |
| /* |
| * Retrieve the deleteDescriptor for a relation. Initialize it if needed. |
| */ |
| static AOCSDeleteDesc |
| get_delete_descriptor(const Relation relation, bool forUpdate) |
| { |
| AOCODMLState *state; |
| |
| state = find_dml_state(RelationGetRelid(relation)); |
| |
| if (state->deleteDesc == NULL) |
| { |
| MemoryContext oldcxt; |
| |
| oldcxt = MemoryContextSwitchTo(aocoDMLStates.stateCxt); |
| state->deleteDesc = aocs_delete_init(relation); |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| return state->deleteDesc; |
| } |
| |
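/*
* Retrieve the unique-check descriptor for a relation, creating it on first
* use (mirroring get_insert_descriptor() and get_delete_descriptor() above).
*/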
| static AOCSUniqueCheckDesc |
| get_or_create_unique_check_desc(Relation relation, Snapshot snapshot) |
| { |
| AOCODMLState *state = find_dml_state(RelationGetRelid(relation)); |
| |
| if (!state->uniqueCheckDesc) |
| { |
| MemoryContext oldcxt; |
| AOCSUniqueCheckDesc uniqueCheckDesc; |
| |
| oldcxt = MemoryContextSwitchTo(aocoDMLStates.stateCxt); |
| uniqueCheckDesc = palloc0(sizeof(AOCSUniqueCheckDescData)); |
| |
| /* Initialize the block directory */ |
| uniqueCheckDesc->blockDirectory = palloc0(sizeof(AppendOnlyBlockDirectory)); |
| AppendOnlyBlockDirectory_Init_forUniqueChecks(uniqueCheckDesc->blockDirectory, |
| relation, |
| relation->rd_att->natts, /* numColGroups */ |
| snapshot); |
| |
| /* |
| * If this is part of an UPDATE, we need to reuse the visimapDelete |
| * support structure from the delete half of the update. This is to |
| * avoid spurious conflicts when the key's previous and new value are |
| * identical. Using it ensures that we can recognize any tuples deleted |
| * by us prior to this insert, within this command. |
| * |
| * Note: It is important that we reuse the visimapDelete structure and |
| * not the visimap structure. This is because, when a uniqueness check |
| * is performed as part of an UPDATE, visimap changes aren't persisted |
| * yet (they are persisted at dml_finish() time, see |
| * AppendOnlyVisimapDelete_Finish()). So, if we use the visimap |
| * structure, we would not necessarily see all the changes. |
| */ |
| if (state->deleteDesc) |
| { |
| uniqueCheckDesc->visiMapDelete = &state->deleteDesc->visiMapDelete; |
| uniqueCheckDesc->visimap = NULL; |
| } |
| else |
| { |
| /* COPY/INSERT: Initialize the visimap */ |
| uniqueCheckDesc->visimap = palloc0(sizeof(AppendOnlyVisimap)); |
| AppendOnlyVisimap_Init_forUniqueCheck(uniqueCheckDesc->visimap, |
| relation, |
| snapshot); |
| } |
| |
| state->uniqueCheckDesc = uniqueCheckDesc; |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| return state->uniqueCheckDesc; |
| } |
| |
| /* |
| * AO_COLUMN access method uses virtual tuples |
| */ |
| static const TupleTableSlotOps * |
| aoco_slot_callbacks(Relation relation) |
| { |
| return &TTSOpsVirtual; |
| } |
| |
| struct ExtractcolumnContext |
| { |
| bool *cols; |
| AttrNumber natts; |
| bool found; |
| }; |
| |
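/*
* Expression tree walker that marks every column referenced by a Var node
* in the context's boolean array. A whole-row reference (varattno == 0)
* marks all columns and terminates the walk early.
*/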
| static bool |
| extractcolumns_walker(Node *node, struct ExtractcolumnContext *ecCtx) |
| { |
| if (node == NULL) |
| return false; |
| |
| if (IsA(node, Var)) |
| { |
| Var *var = (Var *)node; |
| |
| if (IS_SPECIAL_VARNO(var->varno)) |
| return false; |
| |
| if (var->varattno > 0 && var->varattno <= ecCtx->natts) |
| { |
| ecCtx->cols[var->varattno -1] = true; |
| ecCtx->found = true; |
| } |
| /* |
| * If all attributes are included, |
| * set all entries in mask to true. |
| */ |
| else if (var->varattno == 0) |
| { |
| for (AttrNumber attno = 0; attno < ecCtx->natts; attno++) |
| ecCtx->cols[attno] = true; |
| ecCtx->found = true; |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| return expression_tree_walker(node, extractcolumns_walker, (void *)ecCtx); |
| } |
| |
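/*
* Fill a caller-provided boolean projection mask from the Vars found in an
* expression tree. Returns true if at least one column was marked.
*
* Illustrative sketch (hypothetical three-column table): for a qual such as
* "a > 1 AND c = 5", the walker marks cols[0] and cols[2], leaves cols[1]
* false, and the function returns true.
*/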
| bool |
| extractcolumns_from_node(Node *expr, bool *cols, AttrNumber natts) |
| { |
| struct ExtractcolumnContext ecCtx; |
| |
| ecCtx.cols = cols; |
| ecCtx.natts = natts; |
| ecCtx.found = false; |
| |
| extractcolumns_walker(expr, &ecCtx); |
| |
| return ecCtx.found; |
| } |
| |
| static TableScanDesc |
| aoco_beginscan_extractcolumns(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key, |
| ParallelTableScanDesc parallel_scan, |
| PlanState *ps, uint32 flags) |
| { |
| AOCSScanDesc aoscan; |
| AttrNumber natts = RelationGetNumberOfAttributes(rel); |
| List *targetlist = ps->plan->targetlist; |
| List *qual = ps->plan->qual; |
| bool *cols; |
| bool found = false; |
| |
| cols = palloc0((natts + 1) * sizeof(*cols)); |
| |
| found |= extractcolumns_from_node((Node *)targetlist, cols, natts); |
| found |= extractcolumns_from_node((Node *)qual, cols, natts); |
| |
| /* |
| * In some cases (for example, count(*)), targetlist and qual may be null, |
| * extractcolumns_walker will return immediately, so no columns are specified. |
| * We always scan the first column. |
| */ |
| if (!found) |
| cols[0] = true; |
| |
| aoscan = aocs_beginscan(rel, |
| snapshot, |
| parallel_scan, |
| cols, |
| flags); |
| |
| pfree(cols); |
| |
| if (gp_enable_predicate_pushdown) |
| ps->qual = aocs_predicate_pushdown_prepare(aoscan, qual, ps->qual, ps->ps_ExprContext, ps); |
| |
| return (TableScanDesc)aoscan; |
| } |
| |
| static TableScanDesc |
| aoco_beginscan_extractcolumns_bm(Relation rel, Snapshot snapshot, |
| List *targetlist, List *qual, |
| List *bitmapqualorig, |
| uint32 flags) |
| { |
| AOCSBitmapScan aocsBitmapScan; |
| AttrNumber natts = RelationGetNumberOfAttributes(rel); |
| bool *proj; |
| bool *projRecheck; |
| bool found; |
| |
| aocsBitmapScan = palloc0(sizeof(*aocsBitmapScan)); |
| aocsBitmapScan->descIdentifier = AOCSBITMAPSCANDATA; |
| |
| aocsBitmapScan->rs_base.rs_rd = rel; |
| aocsBitmapScan->rs_base.rs_snapshot = snapshot; |
| aocsBitmapScan->rs_base.rs_flags = flags; |
| |
| proj = palloc0(natts * sizeof(*proj)); |
| projRecheck = palloc0(natts * sizeof(*projRecheck)); |
| |
| if (snapshot == SnapshotAny) |
| aocsBitmapScan->appendOnlyMetaDataSnapshot = GetTransactionSnapshot(); |
| else |
| aocsBitmapScan->appendOnlyMetaDataSnapshot = snapshot; |
| |
| found = extractcolumns_from_node((Node *)targetlist, proj, natts); |
| found |= extractcolumns_from_node((Node *)qual, proj, natts); |
| |
| memcpy(projRecheck, proj, natts * sizeof(*projRecheck)); |
| if (extractcolumns_from_node((Node *)bitmapqualorig, projRecheck, natts)) |
| { |
| /* |
| * At least one column needs to be projected in non-recheck case. |
| * Otherwise, the AO_COLUMN fetch code may skip visimap checking because |
| * there are no columns to be scanned and we may get wrong results. |
| */ |
| if (!found) |
| proj[0] = true; |
| } |
| else if (!found) |
| { |
| /* XXX can we have no columns to project at all? */ |
| proj[0] = projRecheck[0] = true; |
| } |
| |
| aocsBitmapScan->bitmapScanDesc[NO_RECHECK].proj = proj; |
| aocsBitmapScan->bitmapScanDesc[RECHECK].proj = projRecheck; |
| |
| return (TableScanDesc)aocsBitmapScan; |
| } |
| |
| /* |
| * This function intentionally ignores key and nkeys |
| */ |
| static TableScanDesc |
| aoco_beginscan(Relation relation, |
| Snapshot snapshot, |
| int nkeys, struct ScanKeyData *key, |
| ParallelTableScanDesc pscan, |
| uint32 flags) |
| { |
| AOCSScanDesc aoscan; |
| |
| aoscan = aocs_beginscan(relation, |
| snapshot, |
| pscan, |
| NULL, |
| flags); |
| |
| return (TableScanDesc) aoscan; |
| } |
| |
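/*
* End a scan, dispatching on the descriptor identifier: both plain AOCS
* scans and bitmap scans arrive through this callback.
*/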
| static void |
| aoco_endscan(TableScanDesc scan) |
| { |
| AOCSScanDesc aocsScanDesc; |
| AOCSBitmapScan aocsBitmapScan; |
| |
| aocsScanDesc = (AOCSScanDesc) scan; |
| if (aocsScanDesc->descIdentifier == AOCSSCANDESCDATA) |
| { |
| aocs_endscan(aocsScanDesc); |
| return; |
| } |
| |
| Assert(aocsScanDesc->descIdentifier == AOCSBITMAPSCANDATA); |
| aocsBitmapScan = (AOCSBitmapScan) scan; |
| |
| if (aocsBitmapScan->bitmapScanDesc[NO_RECHECK].bitmapFetch) |
| aocs_fetch_finish(aocsBitmapScan->bitmapScanDesc[NO_RECHECK].bitmapFetch); |
| if (aocsBitmapScan->bitmapScanDesc[RECHECK].bitmapFetch) |
| aocs_fetch_finish(aocsBitmapScan->bitmapScanDesc[RECHECK].bitmapFetch); |
| |
| pfree(aocsBitmapScan->bitmapScanDesc[NO_RECHECK].proj); |
| pfree(aocsBitmapScan->bitmapScanDesc[RECHECK].proj); |
| } |
| |
| static void |
| aoco_rescan(TableScanDesc scan, ScanKey key, |
| bool set_params, bool allow_strat, |
| bool allow_sync, bool allow_pagemode) |
| { |
| AOCSScanDesc aoscan = (AOCSScanDesc) scan; |
| |
| if (aoscan->descIdentifier == AOCSSCANDESCDATA) |
| aocs_rescan(aoscan); |
| } |
| |
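/*
* Fetch the next tuple in the given direction into the (virtual) slot.
* Returns false when the scan is exhausted.
*/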
| static bool |
| aoco_getnextslot(TableScanDesc scan, ScanDirection direction, TupleTableSlot *slot) |
| { |
| AOCSScanDesc aoscan = (AOCSScanDesc)scan; |
| |
| ExecClearTuple(slot); |
| if (aocs_getnext(aoscan, direction, slot)) |
| { |
| ExecStoreVirtualTuple(slot); |
| pgstat_count_heap_getnext(aoscan->rs_base.rs_rd); |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
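/*
* Advertise scan capabilities: ao_column supports column-oriented scans and
* vectorized execution.
*/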
| static uint32 |
| aoco_scan_flags(Relation rel) |
| { |
| return SCAN_SUPPORT_COLUMN_ORIENTED_SCAN | SCAN_SUPPORT_VECTORIZATION; |
| } |
| |
| static Size |
| aoco_parallelscan_estimate(Relation rel) |
| { |
| return sizeof(ParallelBlockTableScanDescData); |
| } |
| |
| /* |
| * AOCO only uses part fields of ParallelBlockTableScanDesc. |
| */ |
| static Size |
| aoco_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan) |
| { |
| ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan; |
| |
| bpscan->base.phs_relid = RelationGetRelid(rel); |
| bpscan->phs_nblocks = 0; /* init, will be updated later by table_parallelscan_initialize */ |
| pg_atomic_init_u64(&bpscan->phs_nallocated, 0); |
/* We don't need phs_mutex and phs_startblock for AO, but initialize them anyway. */
| SpinLockInit(&bpscan->phs_mutex); |
| bpscan->phs_startblock = InvalidBlockNumber; |
| return sizeof(ParallelBlockTableScanDescData); |
| } |
| |
| static void |
| aoco_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan) |
| { |
| ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan; |
| |
| pg_atomic_write_u64(&bpscan->phs_nallocated, 0); |
| } |
| |
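/*
* Begin an index fetch. Only the base structure is set up here; the
* expensive fetch descriptor is allocated lazily on the first call to
* aoco_index_fetch_tuple().
*/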
| static IndexFetchTableData * |
| aoco_index_fetch_begin(Relation rel) |
| { |
| IndexFetchAOCOData *aocoscan = palloc0(sizeof(IndexFetchAOCOData)); |
| |
| aocoscan->xs_base.rel = rel; |
| |
/* The other aocoscan fields are initialized lazily on the first fetch */
| |
| return &aocoscan->xs_base; |
| } |
| |
| static void |
| aoco_index_fetch_reset(IndexFetchTableData *scan) |
| { |
| /* |
| * Unlike Heap, we don't release the resources (fetch descriptor and its |
| * members) here because it is more like a global data structure shared |
| * across scans, rather than an iterator to yield a granularity of data. |
| * |
| * Additionally, should be aware of that no matter whether allocation or |
| * release on fetch descriptor, it is considerably expensive. |
| */ |
| return; |
| } |
| |
| static void |
| aoco_index_fetch_end(IndexFetchTableData *scan) |
| { |
| IndexFetchAOCOData *aocoscan = (IndexFetchAOCOData *) scan; |
| |
| if (aocoscan->aocofetch) |
| { |
| aocs_fetch_finish(aocoscan->aocofetch); |
| pfree(aocoscan->aocofetch); |
| aocoscan->aocofetch = NULL; |
| } |
| |
| if (aocoscan->proj) |
| { |
| pfree(aocoscan->proj); |
| aocoscan->proj = NULL; |
| } |
| pfree(aocoscan); |
| } |
| |
| static bool |
| aoco_index_fetch_tuple(struct IndexFetchTableData *scan, |
| ItemPointer tid, |
| Snapshot snapshot, |
| TupleTableSlot *slot, |
| bool *call_again, bool *all_dead) |
| { |
| IndexFetchAOCOData *aocoscan = (IndexFetchAOCOData *) scan; |
| bool found = false; |
| |
| if (!aocoscan->aocofetch) |
| { |
| Snapshot appendOnlyMetaDataSnapshot; |
| int natts; |
| |
/* Initialize the projection info; assume the whole row */
| Assert(!aocoscan->proj); |
| natts = RelationGetNumberOfAttributes(scan->rel); |
| aocoscan->proj = palloc(natts * sizeof(*aocoscan->proj)); |
| MemSet(aocoscan->proj, true, natts * sizeof(*aocoscan->proj)); |
| |
| appendOnlyMetaDataSnapshot = snapshot; |
| if (appendOnlyMetaDataSnapshot == SnapshotAny) |
| { |
| /* |
| * the append-only meta data should never be fetched with |
| * SnapshotAny as bogus results are returned. |
| */ |
| appendOnlyMetaDataSnapshot = GetTransactionSnapshot(); |
| } |
| |
| aocoscan->aocofetch = aocs_fetch_init(aocoscan->xs_base.rel, |
| snapshot, |
| appendOnlyMetaDataSnapshot, |
| aocoscan->proj); |
| } |
| |
| /* |
| * There is no reason to expect changes on snapshot between tuple |
| * fetching calls after fech_init is called, treat it as a |
| * programming error in case of occurrence. |
| */ |
| Assert(aocoscan->aocofetch->snapshot == snapshot); |
| |
| ExecClearTuple(slot); |
| |
| if (aocs_fetch(aocoscan->aocofetch, (AOTupleId *) tid, slot)) |
| { |
| ExecStoreVirtualTuple(slot); |
| found = true; |
| } |
| |
| /* |
| * Currently, we don't determine this parameter. By contract, it is to be |
| * set to true iff we can determine that this row is dead to all |
| * transactions. Failure to set this will lead to use of a garbage value |
| * in certain code, such as that for unique index checks. |
| * This is typically used for HOT chains, which we don't support. |
| */ |
| if (all_dead) |
| *all_dead = false; |
| |
/*
* Currently, we don't determine this parameter. By contract, it is to be
* set to true iff there is another tuple for the tid, so that we can prompt
* the caller to call index_fetch_tuple() again for the same tid.
* This is typically used for HOT chains, which we don't support.
*/
| if (call_again) |
| *call_again = false; |
| |
| return found; |
| } |
| |
| /* |
| * Check if a visible tuple exists given the tid and a snapshot. This is |
| * currently used to determine uniqueness checks. |
| * |
| * We determine existence simply by checking if a *visible* block directory |
| * entry covers the given tid. |
| * |
| * There is no need to fetch the tuple (we actually can't reliably do so as |
| * we might encounter a placeholder row in the block directory) |
| * |
| * If no visible block directory entry exists, we are done. If it does, we need |
| * to further check the visibility of the tuple itself by consulting the visimap. |
| * Now, the visimap check can be skipped if the tuple was found to have been |
| * inserted by a concurrent in-progress transaction, in which case we return |
| * true and have the xwait machinery kick in. |
| */ |
| static bool |
| aoco_index_unique_check(Relation rel, |
| ItemPointer tid, |
| Snapshot snapshot, |
| bool *all_dead) |
| { |
| AOCSUniqueCheckDesc uniqueCheckDesc; |
| AOTupleId *aoTupleId = (AOTupleId *) tid; |
| bool visible; |
| |
| #ifdef USE_ASSERT_CHECKING |
| int segmentFileNum = AOTupleIdGet_segmentFileNum(aoTupleId); |
| int64 rowNum = AOTupleIdGet_rowNum(aoTupleId); |
| |
| Assert(segmentFileNum != InvalidFileSegNumber); |
| Assert(rowNum != InvalidAORowNum); |
| /* |
| * Since this can only be called in the context of a unique index check, the |
| * snapshots that are supplied can only be non-MVCC snapshots: SELF and DIRTY. |
| */ |
| Assert(snapshot->snapshot_type == SNAPSHOT_SELF || |
| snapshot->snapshot_type == SNAPSHOT_DIRTY); |
| #endif |
| |
| /* |
| * Currently, we don't determine this parameter. By contract, it is to be |
| * set to true iff we can determine that this row is dead to all |
| * transactions. Failure to set this will lead to use of a garbage value |
| * in certain code, such as that for unique index checks. |
| * This is typically used for HOT chains, which we don't support. |
| */ |
| if (all_dead) |
| *all_dead = false; |
| |
| /* |
| * FIXME: for when we want CREATE UNIQUE INDEX CONCURRENTLY to work |
| * Unique constraint violation checks with SNAPSHOT_SELF are currently |
| * required to support CREATE UNIQUE INDEX CONCURRENTLY. Currently, the |
| * sole placeholder row inserted at first insert might not be visible to |
| * the snapshot, if it was already updated by its actual first row. So, |
| * we would need to flush a placeholder row at the beginning of each new |
| * in-memory minipage. Currently, CREATE INDEX CONCURRENTLY isn't |
| * supported, so we assume such a check satisfies SNAPSHOT_SELF. |
| */ |
| if (snapshot->snapshot_type == SNAPSHOT_SELF) |
| return true; |
| |
| uniqueCheckDesc = get_or_create_unique_check_desc(rel, snapshot); |
| |
| /* First, scan the block directory */ |
| if (!AppendOnlyBlockDirectory_UniqueCheck(uniqueCheckDesc->blockDirectory, |
| aoTupleId, |
| snapshot)) |
| return false; |
| |
| /* |
| * If the xmin or xmax are set for the dirty snapshot, after the block |
| * directory is scanned with the snapshot, it means that there is a |
| * concurrent in-progress transaction inserting the tuple. So, return true |
| * and have the xwait machinery kick in. |
| */ |
| Assert(snapshot->snapshot_type == SNAPSHOT_DIRTY); |
| if (TransactionIdIsValid(snapshot->xmin) || TransactionIdIsValid(snapshot->xmax)) |
| return true; |
| |
| /* Now, perform a visibility check against the visimap infrastructure */ |
| visible = AppendOnlyVisimap_UniqueCheck(uniqueCheckDesc->visiMapDelete, |
| uniqueCheckDesc->visimap, |
| aoTupleId, |
| snapshot); |
| |
| /* |
| * Since we disallow deletes and updates running in parallel with inserts, |
| * there is no way that the dirty snapshot has it's xmin and xmax populated |
| * after the visimap has been scanned with it. |
| * |
| * Note: we disallow it by grabbing an ExclusiveLock on the QD (See |
| * CdbTryOpenTable()). So if we are running in utility mode, there is no |
| * such restriction. |
| */ |
| AssertImply(Gp_role != GP_ROLE_UTILITY, |
| (!TransactionIdIsValid(snapshot->xmin) && !TransactionIdIsValid(snapshot->xmax))); |
| |
| return visible; |
| } |
| |
| static void |
| aoco_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, |
| int options, BulkInsertState bistate) |
| { |
| |
| AOCSInsertDesc insertDesc; |
| |
| insertDesc = get_insert_descriptor(relation); |
| |
| aocs_insert(insertDesc, slot); |
| |
| pgstat_count_heap_insert(relation, 1); |
| } |
| |
| /* |
| * We don't support speculative inserts on appendoptimized tables, i.e. we don't |
| * support INSERT ON CONFLICT DO NOTHING or INSERT ON CONFLICT DO UPDATE. Thus, |
| * the following functions are left unimplemented. |
| */ |
| |
| static void |
| aoco_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, |
| CommandId cid, int options, |
| BulkInsertState bistate, uint32 specToken) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("speculative insert is not supported on appendoptimized relations"))); |
| } |
| |
| static void |
| aoco_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, |
| uint32 specToken, bool succeeded) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("speculative insert is not supported on appendoptimized relations"))); |
| } |
| |
| /* |
| * aoco_multi_insert - insert multiple tuples into an ao relation |
| * |
| * This is like aoco_tuple_insert(), but inserts multiple tuples in one |
| * operation. Typicaly used by COPY. This is preferrable than calling |
| * aoco_tuple_insert() in a loop because ... WAL?? |
| */ |
| static void |
| aoco_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, |
| CommandId cid, int options, BulkInsertState bistate) |
| { |
AOCSInsertDesc insertDesc;
AOCODMLState *state;

insertDesc = get_insert_descriptor(relation);
state = find_dml_state(RelationGetRelid(relation));
| |
| for (int i = 0; i < ntuples; i++) |
| { |
| slot_getallattrs(slots[i]); |
| /* |
| * For bulk insert, we may switch insertDesc |
| * on the fly. |
| */ |
| if (state->insertMultiFiles && state->insertDesc->range == gp_appendonly_insert_files_tuples_range) |
| { |
| insertDesc = get_insert_descriptor(relation); |
| } |
| |
| aocs_insert_values(insertDesc, slots[i]->tts_values, slots[i]->tts_isnull, (AOTupleId *) &slots[i]->tts_tid); |
| } |
| |
| pgstat_count_heap_insert(relation, ntuples); |
| } |
| |
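/*
* Delete a tuple by recording it in the visibility map. The delete
* descriptor is created lazily and finalized in aoco_dml_finish().
*/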
| static TM_Result |
| aoco_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, |
| Snapshot snapshot, Snapshot crosscheck, bool wait, |
| TM_FailureData *tmfd, bool changingPart) |
| { |
| AOCSDeleteDesc deleteDesc; |
| TM_Result result; |
| |
| deleteDesc = get_delete_descriptor(relation, false); |
| result = aocs_delete(deleteDesc, (AOTupleId *) tid); |
| if (result == TM_Ok) |
| pgstat_count_heap_delete(relation); |
| else if (result == TM_SelfModified) |
| { |
| /* |
| * The visibility map entry has been set and it was in this command. |
| * |
| * Our caller might want to investigate tmfd to decide on appropriate |
| * action. Set it here to match expectations. The uglyness here is |
| * preferrable to having to inspect the relation's am in the caller. |
| */ |
| tmfd->cmax = cid; |
| } |
| |
| return result; |
| } |
| |
| |
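/*
* UPDATE is implemented as a delete followed by an insert; AO/CO tables
* have no in-place or HOT updates, so all indexes always need updating.
*/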
| static TM_Result |
| aoco_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, |
| CommandId cid, Snapshot snapshot, Snapshot crosscheck, |
| bool wait, TM_FailureData *tmfd, |
| LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes) |
| { |
| AOCSInsertDesc insertDesc; |
| AOCSDeleteDesc deleteDesc; |
| TM_Result result; |
| |
| insertDesc = get_insert_descriptor(relation); |
| deleteDesc = get_delete_descriptor(relation, true); |
| |
| /* Update the tuple with table oid */ |
| slot->tts_tableOid = RelationGetRelid(relation); |
| |
| #ifdef FAULT_INJECTOR |
| FaultInjector_InjectFaultIfSet( |
| "appendonly_update", |
| DDLNotSpecified, |
| "", //databaseName |
| RelationGetRelationName(insertDesc->aoi_rel)); |
| /* tableName */ |
| #endif |
| |
| result = aocs_delete(deleteDesc, (AOTupleId *) otid); |
| if (result != TM_Ok) |
| return result; |
| |
| aocs_insert(insertDesc, slot); |
| |
| pgstat_count_heap_update(relation, false, false); |
| /* No HOT updates with AO tables. */ |
| *update_indexes = TU_All; |
| |
| return result; |
| } |
| |
| /* |
| * This API is called for a variety of purposes, which are either not supported |
| * for AO/CO tables or not supported for GPDB in general: |
| * |
| * (1) UPSERT: ExecOnConflictUpdate() calls this, but clearly upsert is not |
| * supported for AO/CO tables. |
| * |
| * (2) DELETE and UPDATE triggers: GetTupleForTrigger() calls this, but clearly |
| * these trigger types are not supported for AO/CO tables. |
| * |
* (3) Logical replication: RelationFindReplTupleByIndex() and
* RelationFindReplTupleSeq() call this, but clearly we don't support logical
* replication yet for GPDB.
| * |
| * (4) For DELETEs/UPDATEs, when a state of TM_Updated is returned from |
| * table_tuple_delete() and table_tuple_update() respectively, this API is invoked. |
| * However, that is impossible for AO/CO tables as an AO/CO tuple cannot be |
| * deleted/updated while another transaction is updating it (see CdbTryOpenTable()). |
| * |
| * (5) Row-level locking (SELECT FOR ..): ExecLockRows() calls this but a plan |
| * containing the LockRows plan node is never generated for AO/CO tables. In fact, |
| * we lock at the table level instead. |
| */ |
| static TM_Result |
| aoco_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, |
| TupleTableSlot *slot, CommandId cid, LockTupleMode mode, |
| LockWaitPolicy wait_policy, uint8 flags, |
| TM_FailureData *tmfd) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("tuple locking is not supported on appendoptimized tables"))); |
| } |
| |
| static void |
| aoco_finish_bulk_insert(Relation relation, int options) |
| { |
| aoco_dml_finish(relation, CMD_INSERT); |
| } |
| |
| |
| |
| |
| /* ------------------------------------------------------------------------ |
| * Callbacks for non-modifying operations on individual tuples for heap AM |
| * ------------------------------------------------------------------------ |
| */ |
| |
| static bool |
| aoco_fetch_row_version(Relation relation, |
| ItemPointer tid, |
| Snapshot snapshot, |
| TupleTableSlot *slot) |
| { |
| /* |
| * This is a generic interface. It is currently used in three distinct |
| * cases, only one of which is currently invoking it for AO tables. |
| * This is DELETE RETURNING. In order to return the slot via the tid for |
| * AO tables one would have to scan the block directory and the visibility |
| * map. A block directory is not guarranteed to exist. Even if it exists, a |
| * state would have to be created and dropped for every tuple look up since |
| * this interface does not allow for the state to be passed around. This is |
| * a very costly operation to be performed per tuple lookup. Furthermore, if |
| * a DELETE operation is currently on the fly, the corresponding visibility |
| * map entries will not have been finalized into a visibility map tuple. |
| * |
| * Error out with feature not supported. Given that this is a generic |
| * interface, we can not really say which feature is that, although we do |
| * know that is DELETE RETURNING. |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("feature not supported on appendoptimized relations"))); |
| } |
| |
| static void |
| aoco_get_latest_tid(TableScanDesc sscan, |
| ItemPointer tid) |
| { |
| /* |
| * Tid scans are not supported for appendoptimized relation. This function |
| * should not have been called in the first place, but if it is called, |
| * better to error out. |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("feature not supported on appendoptimized relations"))); |
| } |
| |
| static bool |
| aoco_tuple_tid_valid(TableScanDesc scan, ItemPointer tid) |
| { |
| /* |
| * Tid scans are not supported for appendoptimized relation. This function |
| * should not have been called in the first place, but if it is called, |
| * better to error out. |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("feature not supported on appendoptimized relations"))); |
| } |
| |
| static bool |
| aoco_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, |
| Snapshot snapshot) |
| { |
| /* |
| * AO_COLUMN table dose not support unique and tidscan yet. |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("feature not supported on appendoptimized relations"))); |
| } |
| |
| static TransactionId |
| aoco_index_delete_tuples(Relation rel, |
| TM_IndexDeleteOp *delstate) |
| { |
| /* |
| * This API is only useful for hot standby snapshot conflict resolution |
| * (for eg. see btree_xlog_delete()), in the context of index page-level |
| * vacuums (aka page-level cleanups). This operation is only done when |
| * IndexScanDesc->kill_prior_tuple is true, which is never for AO/CO tables |
| * (we always return all_dead = false in the index_fetch_tuple() callback |
| * as we don't support HOT) |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("feature not supported on appendoptimized relations"))); |
| } |
| |
| |
| /* ------------------------------------------------------------------------ |
| * DDL related callbacks for ao_column AM. |
| * ------------------------------------------------------------------------ |
| */ |
| static void |
| aoco_relation_set_new_filenode(Relation rel, |
| const RelFileLocator *newrnode, |
| char persistence, |
| TransactionId *freezeXid, |
| MultiXactId *minmulti) |
| { |
| SMgrRelation srel; |
| |
| /* |
| * Append-optimized tables do not contain transaction information in |
| * tuples. |
| */ |
| *freezeXid = *minmulti = InvalidTransactionId; |
| |
| /* |
| * No special treatment is needed for new AO_ROW/COLUMN relation. Create |
| * the underlying disk file storage for the relation. No clean up is |
| * needed, RelationCreateStorage() is transactional. |
| * |
| * Segment files will be created when / if needed. |
| */ |
| srel = RelationCreateStorage(*newrnode, persistence, true, SMGR_AO, rel); |
| |
| /* |
| * If required, set up an init fork for an unlogged table so that it can |
| * be correctly reinitialized on restart. An immediate sync is required |
| * even if the page has been logged, because the write did not go through |
| * shared_buffers and therefore a concurrent checkpoint may have moved the |
| * redo pointer past our xlog record. Recovery may as well remove it |
| * while replaying, for example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE |
| * record. Therefore, logging is necessary even if wal_level=minimal. |
| */ |
| if (persistence == RELPERSISTENCE_UNLOGGED) |
| { |
| Assert(rel->rd_rel->relkind == RELKIND_RELATION || |
| rel->rd_rel->relkind == RELKIND_MATVIEW || |
| rel->rd_rel->relkind == RELKIND_TOASTVALUE || |
| rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE); |
| smgrcreate(srel, INIT_FORKNUM, false); |
| log_smgrcreate(newrnode, INIT_FORKNUM, SMGR_AO); |
| smgrimmedsync(srel, INIT_FORKNUM); |
| } |
| |
| smgrclose(srel); |
| } |
| |
/* Helper routine to open a rel and call heap_truncate_one_rel() on it */
| static void |
| heap_truncate_one_relid(Oid relid) |
| { |
| if (OidIsValid(relid)) |
| { |
| Relation rel = relation_open(relid, AccessExclusiveLock); |
| heap_truncate_one_rel(rel); |
| relation_close(rel, NoLock); |
| } |
| } |
| |
| static void |
| aoco_relation_nontransactional_truncate(Relation rel) |
| { |
| Oid aoseg_relid = InvalidOid; |
| Oid aoblkdir_relid = InvalidOid; |
| Oid aovisimap_relid = InvalidOid; |
| |
| ao_truncate_one_rel(rel); |
| |
| /* Also truncate the aux tables */ |
| GetAppendOnlyEntryAuxOids(rel, |
| &aoseg_relid, |
| &aoblkdir_relid, NULL, |
| &aovisimap_relid, NULL); |
| |
| heap_truncate_one_relid(aoseg_relid); |
| heap_truncate_one_relid(aoblkdir_relid); |
| heap_truncate_one_relid(aovisimap_relid); |
| } |
| |
| static void |
| aoco_relation_copy_data(Relation rel, const RelFileLocator *newrnode) |
| { |
| SMgrRelation dstrel; |
| |
| /* |
| * Use the "AO-specific" (non-shared buffers backed storage) SMGR |
| * implementation |
| */ |
| dstrel = smgropen(*newrnode, rel->rd_backend, SMGR_AO, rel); |
| |
| /* |
| * Create and copy all forks of the relation, and schedule unlinking of |
| * old physical files. |
| * |
| * NOTE: any conflict in relfilenode value will be caught in |
| * RelationCreateStorage(). |
| */ |
| RelationCreateStorage(*newrnode, rel->rd_rel->relpersistence, true, SMGR_AO, rel); |
| |
| copy_append_only_data(rel->rd_locator, *newrnode, RelationGetSmgr(rel), dstrel, rel->rd_backend, rel->rd_rel->relpersistence); |
| |
| /* |
| * For append-optimized tables, no forks other than the main fork should |
| * exist with the exception of unlogged tables. For unlogged AO tables, |
| * INIT_FORK must exist. |
| */ |
| if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED) |
| { |
| Assert (smgrexists(RelationGetSmgr(rel), INIT_FORKNUM)); |
| |
| /* |
| * INIT_FORK is empty, creating it is sufficient, no need to copy |
| * contents from source to destination. |
| */ |
| smgrcreate(dstrel, INIT_FORKNUM, false); |
| |
| log_smgrcreate(newrnode, INIT_FORKNUM, SMGR_AO); |
| } |
| |
| /* drop old relation, and close new one */ |
| RelationDropStorage(rel); |
| smgrclose(dstrel); |
| } |
| |
| static void |
| aoco_vacuum_rel(Relation onerel, VacuumParams *params, |
| BufferAccessStrategy bstrategy) |
| { |
| /* |
| * We VACUUM an AO_COLUMN table through multiple phases. vacuum_rel() |
| * orchestrates the phases and calls itself again for each phase, so we |
| * get here for every phase. ao_vacuum_rel() is a wrapper of dedicated |
| * ao_vacuum_rel_*() functions for the specific phases. |
| */ |
| ao_vacuum_rel(onerel, params, bstrategy); |
| |
| return; |
| } |
| |
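/*
* CLUSTER support: scan the old relation, sort the live tuples with a
* btree-keyed tuplesort, and rewrite them into the new relation in sorted
* order.
*/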
| static void |
| aoco_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, |
| Relation OldIndex, bool use_sort, |
| TransactionId OldestXmin, |
| TransactionId *xid_cutoff, |
| MultiXactId *multi_cutoff, |
| double *num_tuples, |
| double *tups_vacuumed, |
| double *tups_recently_dead) |
| { |
| TupleDesc oldTupDesc; |
| TupleDesc newTupDesc; |
| int natts; |
| Datum *values; |
| bool *isnull; |
| TransactionId FreezeXid; |
| MultiXactId MultiXactCutoff; |
| Tuplesortstate *tuplesort; |
| PGRUsage ru0; |
| |
| AOTupleId aoTupleId; |
| AOCSInsertDesc idesc = NULL; |
| int write_seg_no; |
| AOCSScanDesc scan = NULL; |
| TupleTableSlot *slot; |
| |
| pg_rusage_init(&ru0); |
| |
| /* |
| * Curently AO storage lacks cost model for IndexScan, thus IndexScan |
| * is not functional. In future, probably, this will be fixed and CLUSTER |
| * command will support this. Though, random IO over AO on TID stream |
| * can be impractical anyway. |
| * Here we are sorting data on on the lines of heap tables, build a tuple |
| * sort state and sort the entire AO table using the index key, rewrite |
| * the table, one tuple at a time, in order as returned by tuple sort state. |
| */ |
| if (OldIndex == NULL || !IS_BTREE(OldIndex)) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cannot cluster append-optimized table \"%s\"", RelationGetRelationName(OldHeap)), |
| errdetail("Append-optimized tables can only be clustered against a B-tree index"))); |
| |
| /* |
| * Their tuple descriptors should be exactly alike, but here we only need |
| * assume that they have the same number of columns. |
| */ |
| oldTupDesc = RelationGetDescr(OldHeap); |
| newTupDesc = RelationGetDescr(NewHeap); |
| Assert(newTupDesc->natts == oldTupDesc->natts); |
| |
| /* Preallocate values/isnull arrays to deform heap tuples after sort */ |
| natts = newTupDesc->natts; |
| values = (Datum *) palloc(natts * sizeof(Datum)); |
| isnull = (bool *) palloc(natts * sizeof(bool)); |
| |
| /* |
| * If the OldHeap has a toast table, get lock on the toast table to keep |
| * it from being vacuumed. This is needed because autovacuum processes |
| * toast tables independently of their main tables, with no lock on the |
| * latter. If an autovacuum were to start on the toast table after we |
| * compute our OldestXmin below, it would use a later OldestXmin, and then |
| * possibly remove as DEAD toast tuples belonging to main tuples we think |
| * are only RECENTLY_DEAD. Then we'd fail while trying to copy those |
| * tuples. |
| * |
| * We don't need to open the toast relation here, just lock it. The lock |
| * will be held till end of transaction. |
| */ |
| if (OldHeap->rd_rel->reltoastrelid) |
| LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock); |
| |
| /* use_wal off requires smgr_targblock be initially invalid */ |
| Assert(RelationGetTargetBlock(NewHeap) == InvalidBlockNumber); |
| |
| /* |
| * Compute sane values for FreezeXid and CutoffMulti with regular |
| * VACUUM machinery to avoidconfising existing CLUSTER code. |
| */ |
| vacuum_set_xid_limits(OldHeap, 0, 0, 0, 0, |
| &OldestXmin, &FreezeXid, NULL, &MultiXactCutoff, |
| NULL); |
| |
| /* |
| * FreezeXid will become the table's new relfrozenxid, and that mustn't go |
| * backwards, so take the max. |
| */ |
| if (TransactionIdPrecedes(FreezeXid, OldHeap->rd_rel->relfrozenxid)) |
| FreezeXid = OldHeap->rd_rel->relfrozenxid; |
| |
| /* |
| * MultiXactCutoff, similarly, shouldn't go backwards either. |
| */ |
| if (MultiXactIdPrecedes(MultiXactCutoff, OldHeap->rd_rel->relminmxid)) |
| MultiXactCutoff = OldHeap->rd_rel->relminmxid; |
| |
| /* return selected values to caller */ |
| *xid_cutoff = FreezeXid; |
| *multi_cutoff = MultiXactCutoff; |
| |
| tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex, |
| maintenance_work_mem, NULL, false); |
| |
| |
| /* Log what we're doing */ |
| ereport(DEBUG2, |
| (errmsg("clustering \"%s.%s\" using sequential scan and sort", |
| get_namespace_name(RelationGetNamespace(OldHeap)), |
| RelationGetRelationName(OldHeap)))); |
| |
| /* Scan through old table to convert data into tuples for sorting */ |
| slot = table_slot_create(OldHeap, NULL); |
| |
| scan = aocs_beginscan(OldHeap, GetActiveSnapshot(), |
| NULL /* parallel_scan */, |
| NULL /* proj */, |
| 0 /* flags */); |
| |
| while (aocs_getnext(scan, ForwardScanDirection, slot)) |
| { |
| Datum *slot_values; |
| bool *slot_isnull; |
| HeapTuple tuple; |
| CHECK_FOR_INTERRUPTS(); |
| |
| slot_getallattrs(slot); |
| slot_values = slot->tts_values; |
| slot_isnull = slot->tts_isnull; |
| |
| tuple = heap_form_tuple(oldTupDesc, slot_values, slot_isnull); |
| |
| *num_tuples += 1; |
| tuplesort_putheaptuple(tuplesort, tuple); |
| heap_freetuple(tuple); |
| } |
| |
| ExecDropSingleTupleTableSlot(slot); |
| aocs_endscan(scan); |
| |
| |
| /* |
| * Сomplete the sort, then read out all tuples |
| * from the tuplestore and write them to the new relation. |
| */ |
| |
| tuplesort_performsort(tuplesort); |
| |
| write_seg_no = ChooseSegnoForWrite(NewHeap); |
| |
| idesc = aocs_insert_init(NewHeap, write_seg_no); |
| |
| /* Insert sorted heap tuples into new storage */ |
| for (;;) |
| { |
| HeapTuple tuple; |
| |
| CHECK_FOR_INTERRUPTS(); |
| |
| tuple = tuplesort_getheaptuple(tuplesort, true); |
| if (tuple == NULL) |
| break; |
| |
| heap_deform_tuple(tuple, oldTupDesc, values, isnull); |
| aocs_insert_values(idesc, values, isnull, &aoTupleId); |
| } |
| |
| tuplesort_end(tuplesort); |
| |
| /* Finish and deallocate insertion */ |
| aocs_insert_finish(idesc, NULL); |
| } |
| |
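/*
* ANALYZE block sampling: for AO/CO tables the "block" number handed to us
* is interpreted as a target tuple id rather than a physical block; the
* actual positioning happens in aoco_scan_analyze_next_tuple().
*/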
| static bool |
| aoco_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, |
| BufferAccessStrategy bstrategy) |
| { |
| AOCSScanDesc aoscan = (AOCSScanDesc) scan; |
| aoscan->targetTupleId = blockno; |
| |
| return true; |
| } |
| |
| static bool |
| aoco_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, |
| double *liverows, double *deadrows, |
| TupleTableSlot *slot) |
| { |
| AOCSScanDesc aoscan = (AOCSScanDesc) scan; |
| bool ret = false; |
| |
/* Skip tuples that precede the sampling target */
| while (aoscan->targetTupleId > aoscan->nextTupleId) |
| { |
| aoco_getnextslot(scan, ForwardScanDirection, slot); |
| aoscan->nextTupleId++; |
| } |
| |
| if (aoscan->targetTupleId == aoscan->nextTupleId) |
| { |
| ret = aoco_getnextslot(scan, ForwardScanDirection, slot); |
| aoscan->nextTupleId++; |
| |
| if (ret) |
| *liverows += 1; |
| else |
*deadrows += 1; /* encountered an invisible tuple */
| } |
| |
| return ret; |
| } |
| |
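/*
* Scan the base relation and feed every qualifying tuple to the index build
* callback. When this is the first index on the relation, the block
* directory is built alongside the scan (see below).
*/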
| static double |
| aoco_index_build_range_scan(Relation heapRelation, |
| Relation indexRelation, |
| IndexInfo *indexInfo, |
| bool allow_sync, |
| bool anyvisible, |
| bool progress, |
| BlockNumber start_blockno, |
| BlockNumber numblocks, |
| IndexBuildCallback callback, |
| void *callback_state, |
| TableScanDesc scan) |
| { |
| AOCSScanDesc aocoscan; |
| bool is_system_catalog; |
| bool checking_uniqueness; |
| Datum values[INDEX_MAX_KEYS]; |
| bool isnull[INDEX_MAX_KEYS]; |
| double reltuples; |
| ExprState *predicate; |
| TupleTableSlot *slot; |
| EState *estate; |
| ExprContext *econtext; |
| Snapshot snapshot; |
| AOCSFileSegInfo **seginfo = NULL; |
| int32 segfile_count = 0; |
| int64 total_blockcount = 0; |
| BlockNumber lastBlock = start_blockno; |
| int64 blockcounts = 0; |
| #if 0 |
| bool need_create_blk_directory = false; |
| List *tlist = NIL; |
| List *qual = indexInfo->ii_Predicate; |
| #endif |
| Oid blkdirrelid; |
| Oid blkidxrelid; |
| int64 previous_blkno = -1; |
| |
| /* |
| * sanity checks |
| */ |
| Assert(OidIsValid(indexRelation->rd_rel->relam)); |
| |
| /* Remember if it's a system catalog */ |
| is_system_catalog = IsSystemRelation(heapRelation); |
| |
| /* Appendoptimized catalog tables are not supported. */ |
| Assert(!is_system_catalog); |
| /* Appendoptimized tables have no data on master unless we are in singlenode mode. */ |
| if (IS_QUERY_DISPATCHER() && Gp_role != GP_ROLE_UTILITY) |
| return 0; |
| |
| /* See whether we're verifying uniqueness/exclusion properties */ |
| checking_uniqueness = (indexInfo->ii_Unique || |
| indexInfo->ii_ExclusionOps != NULL); |
| |
| /* |
| * "Any visible" mode is not compatible with uniqueness checks; make sure |
| * only one of those is requested. |
| */ |
| Assert(!(anyvisible && checking_uniqueness)); |
| |
| /* |
| * Need an EState for evaluation of index expressions and partial-index |
| * predicates. Also a slot to hold the current tuple. |
| */ |
| estate = CreateExecutorState(); |
| econtext = GetPerTupleExprContext(estate); |
| slot = table_slot_create(heapRelation, NULL); |
| |
| /* Arrange for econtext's scan tuple to be the tuple under test */ |
| econtext->ecxt_scantuple = slot; |
| |
| /* Set up execution state for predicate, if any. */ |
| predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate); |
| |
| if (!scan) |
| { |
| /* |
| * Serial index build. |
| * |
| * XXX: We always use SnapshotAny here. An MVCC snapshot and oldest xmin |
| * calculation is necessary to support indexes built CONCURRENTLY. |
| */ |
| snapshot = SnapshotAny; |
| |
| scan = table_beginscan_strat(heapRelation, /* relation */ |
| snapshot, /* snapshot */ |
| 0, /* number of keys */ |
| NULL, /* scan key */ |
| true, /* buffer access strategy OK */ |
| allow_sync); /* syncscan OK? */ |
| } |
| else |
| { |
| /* |
| * Parallel index build. |
| * |
| * Parallel case never registers/unregisters own snapshot. Snapshot |
| * is taken from parallel heap scan, and is SnapshotAny or an MVCC |
| * snapshot, based on same criteria as serial case. |
| */ |
| Assert(!IsBootstrapProcessingMode()); |
| Assert(allow_sync); |
| snapshot = scan->rs_snapshot; |
| } |
| |
| aocoscan = (AOCSScanDesc) scan; |
| |
| /* |
| * If block directory is empty, it must also be built along with the index. |
| */ |
| GetAppendOnlyEntryAuxOids(heapRelation, NULL, |
| &blkdirrelid, &blkidxrelid, NULL, NULL); |
| /* |
| * Note that block directory is created during creation of the first |
| * index. If it is found empty, it means the block directory was created |
| * by this create index transaction. The caller (DefineIndex) must have |
| * acquired sufficiently strong lock on the appendoptimized table such |
| * that index creation as well as insert from concurrent transactions are |
| * blocked. We can rest assured of exclusive access to the block |
| * directory relation. |
| */ |
| Relation blkdir = relation_open(blkdirrelid, AccessShareLock); |
| if (RelationGetNumberOfBlocks(blkdir) == 0) |
| { |
| /* |
| * Allocate blockDirectory in scan descriptor to let the access method |
| * know that it needs to also build the block directory while |
| * scanning. |
| */ |
| Assert(aocoscan->blockDirectory == NULL); |
| aocoscan->blockDirectory = palloc0(sizeof(AppendOnlyBlockDirectory)); |
| } |
| relation_close(blkdir, NoLock); |
| |
| |
| /* Publish number of blocks to scan */ |
| if (progress) |
| { |
| |
| /* CBDB_FIXME: fixme after block directory support cherry-picked */ |
| #if 0 |
| FileSegTotals *fileSegTotals; |
| BlockNumber totalBlocks; |
| |
| /* XXX: How can we report for builds with parallel scans? */ |
| Assert(!aocoscan->rs_base.rs_parallel); |
| |
| /* |
| * We will need to scan the entire table if we need to create a block |
| * directory, otherwise we need to scan only the columns projected. So, |
| * calculate the total blocks accordingly. |
| */ |
| if (need_create_blk_directory) |
| fileSegTotals = GetAOCSSSegFilesTotals(heapRelation, |
| aocoscan->appendOnlyMetaDataSnapshot); |
| else |
| fileSegTotals = GetAOCSSSegFilesTotalsWithProj(heapRelation, |
| aocoscan->appendOnlyMetaDataSnapshot, |
| aocoscan->columnScanInfo.proj_atts, |
| aocoscan->columnScanInfo.num_proj_atts); |
| |
| Assert(fileSegTotals->totalbytes >= 0); |
| totalBlocks = RelationGuessNumberOfBlocksFromSize(fileSegTotals->totalbytes); |
| #endif |
| FileSegTotals *fileSegTotals; |
| fileSegTotals = GetAOCSSSegFilesTotals(heapRelation, |
| aocoscan->appendOnlyMetaDataSnapshot); |
| |
| pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_TOTAL, |
| RelationGuessNumberOfBlocksFromSize(fileSegTotals->totalbytes)); |
| } |
| |
| /* set our scan endpoints */ |
| if (allow_sync) |
| { |
| /* syncscan can only be requested on whole relation */ |
| Assert(start_blockno == 0); |
| Assert(numblocks == InvalidBlockNumber); |
| } |
| |
| reltuples = 0; |
| |
| /* |
| * Scan all tuples in the base relation. |
| */ |
| while (aoco_getnextslot(&aocoscan->rs_base, ForwardScanDirection, slot)) |
| { |
| bool tupleIsAlive; |
| AOTupleId *aoTupleId; |
| BlockNumber currblockno = ItemPointerGetBlockNumber(&slot->tts_tid); |
| |
| CHECK_FOR_INTERRUPTS(); |
| if (currblockno != lastBlock) |
| { |
| lastBlock = currblockno; |
| ++blockcounts; |
| } |
| |
| /* |
| * GPDB_12_MERGE_FIXME: How to properly do a partial scan? Currently, |
| * we scan the whole table, and throw away tuples that are not in the |
| * range. That's clearly very inefficient. |
| */ |
| if (currblockno < start_blockno || |
| (numblocks != InvalidBlockNumber && currblockno >= (start_blockno + numblocks))) |
| continue; |
| |
| /* Report scan progress, if asked to. */ |
| if (progress) |
| { |
| int64 current_blkno = |
| RelationGuessNumberOfBlocksFromSize(aocoscan->totalBytesRead); |
| |
| /* XXX: How can we report for builds with parallel scans? */ |
| Assert(!aocoscan->rs_base.rs_parallel); |
| |
| /* As soon as a new block starts, report it as scanned */ |
| if (current_blkno != previous_blkno) |
| { |
| pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE, |
| current_blkno); |
| previous_blkno = current_blkno; |
| } |
| } |
| |
| aoTupleId = (AOTupleId *) &slot->tts_tid; |
| /* |
| * We didn't perform the check to see if the tuple was deleted in |
| * aocs_getnext(), since we passed it SnapshotAny. See aocs_getnext() |
| * for details. We need to do this to avoid spurious conflicts with |
| * deleted tuples for unique index builds. |
| */ |
| if (AppendOnlyVisimap_IsVisible(&aocoscan->visibilityMap, aoTupleId)) |
| { |
| tupleIsAlive = true; |
| reltuples += 1; |
| } |
| else |
| tupleIsAlive = false; /* excluded from unique-checking */ |
| |
| MemoryContextReset(econtext->ecxt_per_tuple_memory); |
| |
| /* |
| * In a partial index, discard tuples that don't satisfy the |
| * predicate. |
| */ |
| if (predicate != NULL) |
| { |
| if (!ExecQual(predicate, econtext)) |
| continue; |
| } |
| |
| /* |
| * For the current heap tuple, extract all the attributes we use in |
| * this index, and note which are null. This also performs evaluation |
| * of any expressions needed. |
| */ |
| FormIndexDatum(indexInfo, |
| slot, |
| estate, |
| values, |
| isnull); |
| |
| /* |
| * You'd think we should go ahead and build the index tuple here, but |
| * some index AMs want to do further processing on the data first. So |
| * pass the values[] and isnull[] arrays, instead. |
| */ |
| |
| /* Call the AM's callback routine to process the tuple */ |
| /* |
| * GPDB: the callback is modified to accept ItemPointer as argument |
| * instead of HeapTuple. That allows the callback to be reused for |
| * appendoptimized tables. |
| */ |
| callback(indexRelation, &slot->tts_tid, values, isnull, tupleIsAlive, |
| callback_state); |
| } |
| |
| if (progress) |
| { |
| pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE, |
| total_blockcount); |
| if (seginfo) |
| { |
| FreeAllAOCSSegFileInfo(seginfo, segfile_count); |
| pfree(seginfo); |
| } |
| } |
| |
| table_endscan(scan); |
| |
| ExecDropSingleTupleTableSlot(slot); |
| |
| FreeExecutorState(estate); |
| |
| /* These may have been pointing to the now-gone estate */ |
| indexInfo->ii_ExpressionsState = NIL; |
| indexInfo->ii_PredicateState = NULL; |
| |
| return reltuples; |
| } |
| |
| static void |
| aoco_index_validate_scan(Relation heapRelation, |
| Relation indexRelation, |
| IndexInfo *indexInfo, |
| Snapshot snapshot, |
| ValidateIndexState *state) |
| { |
| elog(ERROR, "not implemented yet"); |
| } |
| |
| /* ------------------------------------------------------------------------ |
| * Miscellaneous callbacks for the heap AM |
| * ------------------------------------------------------------------------ |
| */ |
| |
| /* |
| * This pretends that all the space is taken by the main fork. |
| * Returns the compressed size. |
| */ |
| static uint64 |
| aoco_relation_size(Relation rel, ForkNumber forkNumber) |
| { |
| AOCSFileSegInfo **allseg; |
| Snapshot snapshot; |
| uint64 totalbytes = 0; |
| int totalseg; |
| |
| if (forkNumber != MAIN_FORKNUM) |
| return totalbytes; |
| |
| snapshot = RegisterSnapshot(GetLatestSnapshot()); |
| allseg = GetAllAOCSFileSegInfo(rel, snapshot, &totalseg, NULL); |
| for (int seg = 0; seg < totalseg; seg++) |
| { |
| for (int attr = 0; attr < RelationGetNumberOfAttributes(rel); attr++) |
| { |
| AOCSVPInfoEntry *entry; |
| |
| /* |
| * AWAITING_DROP segments might be missing information for some |
| * (newly-added) columns. |
| */ |
| if (attr < allseg[seg]->vpinfo.nEntry) |
| { |
| entry = getAOCSVPEntry(allseg[seg], attr); |
| /* Always return the compressed size */ |
| totalbytes += entry->eof; |
| } |
| |
| CHECK_FOR_INTERRUPTS(); |
| } |
| } |
| |
| if (allseg) |
| { |
| FreeAllAOCSSegFileInfo(allseg, totalseg); |
| pfree(allseg); |
| } |
| UnregisterSnapshot(snapshot); |
| |
| return totalbytes; |
| } |
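| |
| /* |
| * Illustrative arithmetic (not part of the code path above): for a |
| * 3-column relation with two segments whose per-column EOFs are |
| * |
| * seg 1: 100, 200, 300 bytes |
| * seg 2: 400, 500, 600 bytes |
| * |
| * the nested loop sums 100+200+300+400+500+600 = 2100 bytes, i.e. the |
| * physical (compressed) footprint of the main fork. |
| */ |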
| |
| /* |
| * For each AO segment, get the starting heap block number and the number of |
| * heap blocks (together termed a BlockSequence). The starting heap block |
| * number is always deterministic given a segment number. See AOtupleId. |
| * |
| * The number of heap blocks can be determined from the last row number present |
| * in the segment. See appendonlytid.h for details. |
| */ |
| static BlockSequence * |
| aoco_relation_get_block_sequences(Relation rel, int *numSequences) |
| { |
| Snapshot snapshot; |
| Oid segrelid; |
| int nsegs; |
| BlockSequence *sequences; |
| AOCSFileSegInfo **seginfos; |
| |
| Assert(RelationIsValid(rel)); |
| Assert(numSequences); |
| |
| snapshot = RegisterSnapshot(GetCatalogSnapshot(InvalidOid)); |
| |
| seginfos = GetAllAOCSFileSegInfo(rel, snapshot, &nsegs, &segrelid); |
| sequences = (BlockSequence *) palloc(sizeof(BlockSequence) * nsegs); |
| *numSequences = nsegs; |
| |
| /* |
| * For each aoseg, the sequence starts at a fixed heap block number and |
| * contains up to the highest numbered heap block corresponding to the |
| * lastSequence value of that segment. |
| */ |
| for (int i = 0; i < nsegs; i++) |
| AOSegment_PopulateBlockSequence(&sequences[i], segrelid, seginfos[i]->segno); |
| |
| UnregisterSnapshot(snapshot); |
| |
| if (seginfos != NULL) |
| { |
| FreeAllAOCSSegFileInfo(seginfos, nsegs); |
| pfree(seginfos); |
| } |
| |
| return sequences; |
| } |
| |
| /* |
| * Populate the BlockSequence corresponding to the AO segment in which the |
| * logical heap block 'blkNum' falls. |
| */ |
| static void |
| aoco_relation_get_block_sequence(Relation rel, |
| BlockNumber blkNum, |
| BlockSequence *sequence) |
| { |
| Oid segrelid; |
| |
| GetAppendOnlyEntryAuxOids(rel, &segrelid, NULL, NULL, NULL, NULL); |
| AOSegment_PopulateBlockSequence(sequence, segrelid, AOSegmentGet_segno(blkNum)); |
| } |
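| |
| /* |
| * Illustrative sketch (not part of the handler): the deterministic mapping |
| * between segment numbers and logical heap block numbers that the two |
| * functions above rely on. The bit widths below are assumptions inferred |
| * from the 15-bit offset limit noted elsewhere in this file; appendonlytid.h |
| * is the authoritative reference. |
| * |
| * // Assumed layout: a 32-bit BlockNumber carries the segment number in |
| * // its high 7 bits, so each segment owns a fixed range of 2^25 blocks. |
| * static inline BlockNumber |
| * sketch_first_block_of_segno(int segno) |
| * { |
| * return ((BlockNumber) segno) << 25; |
| * } |
| * |
| * static inline int |
| * sketch_segno_of_block(BlockNumber blkno) |
| * { |
| * return (int) (blkno >> 25); // cf. AOSegmentGet_segno() |
| * } |
| * |
| * Under these assumptions, the BlockSequence for segment N starts at |
| * sketch_first_block_of_segno(N) and spans as many blocks as the segment's |
| * last row number implies (2^15 rows per logical block). |
| */ |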
| |
| static bool |
| aoco_relation_needs_toast_table(Relation rel) |
| { |
| /* |
| * AO_COLUMN has never used TOAST storage; starting with Cloudberry 7, no |
| * toast table is created for it. |
| */ |
| return false; |
| } |
| |
| /* ------------------------------------------------------------------------ |
| * Planner related callbacks for the heap AM |
| * ------------------------------------------------------------------------ |
| */ |
| static void |
| aoco_estimate_rel_size(Relation rel, int32 *attr_widths, |
| BlockNumber *pages, double *tuples, |
| double *allvisfrac) |
| { |
| FileSegTotals *fileSegTotals; |
| Snapshot snapshot; |
| |
| *pages = 1; |
| *tuples = 1; |
| *allvisfrac = 0; |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| return; |
| |
| snapshot = RegisterSnapshot(GetLatestSnapshot()); |
| fileSegTotals = GetAOCSSSegFilesTotals(rel, snapshot); |
| |
| *tuples = (double)fileSegTotals->totaltuples; |
| |
| /* Quick exit if empty */ |
| if (*tuples == 0) |
| { |
| UnregisterSnapshot(snapshot); |
| *pages = 0; |
| return; |
| } |
| |
| Assert(fileSegTotals->totalbytesuncompressed > 0); |
| *pages = RelationGuessNumberOfBlocksFromSize( |
| (uint64)fileSegTotals->totalbytesuncompressed); |
| |
| UnregisterSnapshot(snapshot); |
| |
| /* |
| * Do not bother scanning the visimap aux table for now; investigate |
| * whether that is really needed. |
| * |
| * Refer to the comments at the end of function |
| * appendonly_estimate_rel_size(). |
| */ |
| |
| return; |
| } |
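| |
| /* |
| * Illustrative arithmetic (assumption: RelationGuessNumberOfBlocksFromSize |
| * divides a byte count by BLCKSZ, rounding up). With the default 8 kB |
| * BLCKSZ, 100 MB of uncompressed data maps to 12800 planner pages: |
| * |
| * uint64 totalbytes = UINT64CONST(100) * 1024 * 1024; // 104857600 |
| * BlockNumber pages = (totalbytes + BLCKSZ - 1) / BLCKSZ; // 12800 |
| * |
| * The planner only needs a heap-equivalent page count; AOCS storage is not |
| * block-structured on disk. |
| */ |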
| |
| /* ------------------------------------------------------------------------ |
| * Executor related callbacks for the heap AM |
| * ------------------------------------------------------------------------ |
| */ |
| static bool |
| aoco_scan_bitmap_next_block(TableScanDesc scan, |
| TBMIterateResult *tbmres) |
| { |
| AOCSBitmapScan aocsBitmapScan = (AOCSBitmapScan)scan; |
| |
| /* Make sure we never cross 15-bit offset number [MPP-24326] */ |
| Assert(tbmres->ntuples <= INT16_MAX + 1); |
| |
| /* |
| * Start scanning from the beginning of the offsets array (or at the |
| * first "offset number" if it's a lossy page). |
| * |
| * In nodeBitmapHeapscan.c's BitmapHeapNext(), tbmres is not reset when |
| * table_scan_bitmap_next_block() returns false; the executor then calls |
| * aoco_scan_bitmap_next_tuple() on the skipped page anyway and relies on |
| * it returning false. That works as-is, but resetting rs_cindex before |
| * any early return guards against a stale value. |
| */ |
| aocsBitmapScan->rs_cindex = 0; |
| |
| /* If tbmres contains no tuples, skip this block. */ |
| if (tbmres->ntuples == 0) |
| return false; |
| |
| /* |
| * Decide which descriptor to use for fetching the data. |
| */ |
| aocsBitmapScan->whichDesc = (tbmres->recheck) ? RECHECK : NO_RECHECK; |
| |
| return true; |
| } |
| |
| static bool |
| aoco_scan_bitmap_next_tuple(TableScanDesc scan, |
| TBMIterateResult *tbmres, |
| TupleTableSlot *slot) |
| { |
| AOCSBitmapScan aocsBitmapScan = (AOCSBitmapScan)scan; |
| AOCSFetchDesc aocoFetchDesc; |
| OffsetNumber pseudoOffset; |
| ItemPointerData pseudoTid; |
| AOTupleId aoTid; |
| int numTuples; |
| |
| aocoFetchDesc = aocsBitmapScan->bitmapScanDesc[aocsBitmapScan->whichDesc].bitmapFetch; |
| if (aocoFetchDesc == NULL) |
| { |
| aocoFetchDesc = aocs_fetch_init(aocsBitmapScan->rs_base.rs_rd, |
| aocsBitmapScan->rs_base.rs_snapshot, |
| aocsBitmapScan->appendOnlyMetaDataSnapshot, |
| aocsBitmapScan->bitmapScanDesc[aocsBitmapScan->whichDesc].proj); |
| aocsBitmapScan->bitmapScanDesc[aocsBitmapScan->whichDesc].bitmapFetch = aocoFetchDesc; |
| } |
| |
| ExecClearTuple(slot); |
| |
| /* ntuples == -1 indicates a lossy page */ |
| numTuples = (tbmres->ntuples == -1) ? INT16_MAX + 1 : tbmres->ntuples; |
| while (aocsBitmapScan->rs_cindex < numTuples) |
| { |
| /* |
| * If it's a lossy page, iterate through all possible "offset numbers". |
| * Otherwise iterate through the array of "offset numbers". |
| */ |
| if (tbmres->ntuples == -1) |
| { |
| /* |
| * +1 to convert index to offset, since TID offsets are not zero |
| * based. |
| */ |
| pseudoOffset = aocsBitmapScan->rs_cindex + 1; |
| } |
| else |
| pseudoOffset = tbmres->offsets[aocsBitmapScan->rs_cindex]; |
| |
| aocsBitmapScan->rs_cindex++; |
| |
| /* |
| * Okay to fetch the tuple |
| */ |
| ItemPointerSet(&pseudoTid, tbmres->blockno, pseudoOffset); |
| tbm_convert_appendonly_tid_out(&pseudoTid, &aoTid); |
| |
| if (aocs_fetch(aocoFetchDesc, &aoTid, slot)) |
| { |
| /* OK to return this tuple */ |
| ExecStoreVirtualTuple(slot); |
| pgstat_count_heap_fetch(aocsBitmapScan->rs_base.rs_rd); |
| |
| return true; |
| } |
| } |
| |
| /* Done with this block */ |
| return false; |
| } |
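| |
| /* |
| * Illustrative sketch (not part of the handler): what the loop above |
| * amounts to for a lossy page. With ntuples == -1 every possible offset |
| * is probed and aocs_fetch() rejects offsets holding no visible tuple: |
| * |
| * for (OffsetNumber off = 1; off <= INT16_MAX + 1; off++) |
| * { |
| * ItemPointerData tid; |
| * AOTupleId aotid; |
| * |
| * ItemPointerSet(&tid, tbmres->blockno, off); |
| * tbm_convert_appendonly_tid_out(&tid, &aotid); |
| * if (aocs_fetch(fetchDesc, &aotid, slot)) |
| * break; // visible tuple found; return it |
| * } |
| * |
| * Because rs_cindex persists across calls, the real code resumes where |
| * the previous call stopped instead of restarting the probe loop. |
| */ |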
| |
| static bool |
| aoco_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate) |
| { |
| AOCSScanDesc aoscan = (AOCSScanDesc) scan; |
| TsmRoutine *tsm = scanstate->tsmroutine; |
| BlockNumber blockno = 0; |
| |
| if (aoscan->totalTuples == 0) |
| { |
| AOCSFileSegInfo **seginfo; |
| int segfile_count; |
| int64 total_tupcount = 0; |
| |
| seginfo = GetAllAOCSFileSegInfo(aoscan->rs_base.rs_rd, NULL, &segfile_count, NULL); |
| for (int seginfo_no = 0; seginfo_no < segfile_count; seginfo_no++) |
| { |
| total_tupcount += seginfo[seginfo_no]->total_tupcount; |
| } |
| |
| aoscan->totalTuples = total_tupcount; |
| |
| if (seginfo) |
| { |
| FreeAllAOCSSegFileInfo(seginfo, segfile_count); |
| pfree(seginfo); |
| } |
| } |
| |
| if (tsm->NextSampleBlock) |
| { |
| blockno = tsm->NextSampleBlock(scanstate, aoscan->totalTuples); |
| } |
| else |
| { |
| /* |
| * Sampling is done up front here in next_block because next_tuple |
| * operates on a single tuple, not on a block, and so cannot do the |
| * sampling itself. The logic is structured this way because |
| * variable-length varblocks cannot be accessed randomly: tuple |
| * visibility can only be determined by walking through a block. |
| */ |
| blockno = system_nextsampleblock(scanstate, aoscan->totalTuples); |
| } |
| |
| if (!BlockNumberIsValid(blockno)) |
| return false; |
| |
| aoscan->fetchTupleId = blockno; |
| return true; |
| } |
| |
| static bool |
| aoco_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, |
| TupleTableSlot *slot) |
| { |
| AOCSScanDesc aoscan = (AOCSScanDesc) scan; |
| bool ret = false; |
| |
| /* Skip tuples that are not the sampling target. */ |
| while (aoscan->fetchTupleId > aoscan->nextTupleId) |
| { |
| aoco_getnextslot(scan, ForwardScanDirection, slot); |
| aoscan->nextTupleId++; |
| } |
| |
| if (aoscan->fetchTupleId == aoscan->nextTupleId) |
| { |
| ret = aoco_getnextslot(scan, ForwardScanDirection, slot); |
| aoscan->nextTupleId++; |
| } |
| |
| return ret; |
| } |
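| |
| /* |
| * Illustrative trace of how the two sampling callbacks cooperate. Since |
| * an AOCS "block" is treated as a single tuple here, next_block picks a |
| * target tuple id and next_tuple advances the scan to it. If the sampler |
| * chooses tuple ids 3 and 7: |
| * |
| * fetchTupleId = 3, nextTupleId = 0: |
| * // next_tuple reads and discards tuples 0, 1, 2, then returns tuple 3 |
| * fetchTupleId = 7, nextTupleId = 4: |
| * // next_tuple reads and discards tuples 4, 5, 6, then returns tuple 7 |
| * |
| * The skipped tuples are still deformed, which is the price of not being |
| * able to seek randomly within variable-length varblocks. |
| */ |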
| |
| static void |
| aoco_swap_relation_files(Oid relid1, Oid relid2, |
| TransactionId frozenXid pg_attribute_unused(), |
| MultiXactId cutoffMulti pg_attribute_unused()) |
| { |
| SwapAppendonlyEntries(relid1, relid2); |
| } |
| |
| static void |
| aoco_validate_column_encoding_clauses(List *aocoColumnEncoding) |
| { |
| validateAOCOColumnEncodingClauses(aocoColumnEncoding); |
| } |
| |
| static List * |
| aoco_transform_column_encoding_clauses(Relation rel, List *aocoColumnEncoding, |
| bool validate, |
| bool optionFromType pg_attribute_unused()) |
| { |
| ListCell *lc; |
| DefElem *dl; |
| bool foundCompressType = false; |
| bool foundCompressTypeNone = false; |
| char *cmplevel = NULL; |
| bool foundBlockSize = false; |
| bool hasAttrs; |
| char *arg; |
| List *retList = list_copy(aocoColumnEncoding); |
| DefElem *el; |
| const StdRdOptions *ao_opts = currentAOStorageOptions(); |
| |
| int32 blocksize = -1; |
| int16 compresslevel = 0; |
| char *compresstype = NULL; |
| NameData compresstype_nd; |
| |
| /* |
| * A partitioned table's relam may be an AO access method, but partitioned |
| * tables have no entry in pg_appendonly, so encoding options must not be |
| * fetched from there for them. See function transformColumnEncoding for |
| * details. |
| */ |
| hasAttrs = rel && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE; |
| |
| foreach(lc, aocoColumnEncoding) |
| { |
| dl = (DefElem *) lfirst(lc); |
| if (pg_strncasecmp(dl->defname, SOPT_CHECKSUM, strlen(SOPT_CHECKSUM)) == 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("\"%s\" is not a column specific option", |
| SOPT_CHECKSUM))); |
| } |
| } |
| |
| foreach(lc, aocoColumnEncoding) |
| { |
| el = lfirst(lc); |
| |
| if (pg_strcasecmp("compresstype", el->defname) == 0) |
| { |
| foundCompressType = true; |
| arg = defGetString(el); |
| if (pg_strcasecmp("none", arg) == 0) |
| foundCompressTypeNone = true; |
| } |
| else if (pg_strcasecmp("compresslevel", el->defname) == 0) |
| { |
| cmplevel = defGetString(el); |
| } |
| else if (pg_strcasecmp("blocksize", el->defname) == 0) |
| foundBlockSize = true; |
| } |
| |
| /* |
| * If rel is not NULL, this function is called for ADD COLUMN. The table's |
| * settings in pg_appendonly are preferred over the default options in the |
| * GUC gp_default_storage_options. |
| */ |
| if (hasAttrs) |
| { |
| GetAppendOnlyEntryAttributes(RelationGetRelid(rel), |
| &blocksize, |
| &compresslevel, |
| NULL, |
| &compresstype_nd); |
| compresstype = NameStr(compresstype_nd); |
| } |
| |
| if (!foundCompressType && hasAttrs && compresstype[0]) |
| { |
| el = makeDefElem("compresstype", (Node *) makeString(pstrdup(compresstype)), -1); |
| retList = lappend(retList, el); |
| if (compresslevel == 0 && ao_opts->compresslevel != 0) |
| compresslevel = ao_opts->compresslevel; |
| |
| if (compresslevel != 0) |
| { |
| el = makeDefElem("compresslevel", |
| (Node *) makeInteger(compresslevel), |
| -1); |
| retList = lappend(retList, el); |
| } |
| } |
| else if (!foundCompressType && cmplevel == NULL) |
| { |
| /* No compression option specified, use current defaults. */ |
| arg = ao_opts->compresstype[0] ? |
| pstrdup(ao_opts->compresstype) : "none"; |
| el = makeDefElem("compresstype", (Node *) makeString(arg), -1); |
| retList = lappend(retList, el); |
| el = makeDefElem("compresslevel", |
| (Node *) makeInteger(ao_opts->compresslevel), |
| -1); |
| retList = lappend(retList, el); |
| } |
| else if (!foundCompressType && cmplevel) |
| { |
| if (strcmp(cmplevel, "0") == 0) |
| { |
| /* |
| * User wants to disable compression by specifying |
| * compresslevel=0. |
| */ |
| el = makeDefElem("compresstype", (Node *) makeString("none"), -1); |
| retList = lappend(retList, el); |
| } |
| else |
| { |
| /* |
| * User wants to enable compression by specifying a non-zero |
| * compresslevel. Therefore, choose the default compresstype if |
| * configured, otherwise fall back to AO_DEFAULT_COMPRESSTYPE. |
| */ |
| if (ao_opts->compresstype[0] && |
| strcmp(ao_opts->compresstype, "none") != 0) |
| { |
| arg = pstrdup(ao_opts->compresstype); |
| } |
| else |
| { |
| arg = AO_DEFAULT_COMPRESSTYPE; |
| } |
| el = makeDefElem("compresstype", (Node *) makeString(arg), -1); |
| retList = lappend(retList, el); |
| } |
| } |
| else if (foundCompressType && cmplevel == NULL) |
| { |
| if (foundCompressTypeNone) |
| { |
| /* |
| * User wants to disable compression by specifying |
| * compresstype=none. |
| */ |
| el = makeDefElem("compresslevel", (Node *) makeInteger(0), -1); |
| retList = lappend(retList, el); |
| } |
| else |
| { |
| /* |
| * Valid compresstype specified. Use default |
| * compresslevel if it's non-zero, otherwise use 1. |
| */ |
| el = makeDefElem("compresslevel", |
| (Node *) makeInteger(ao_opts->compresslevel > 0 ? |
| ao_opts->compresslevel : 1), |
| -1); |
| retList = lappend(retList, el); |
| } |
| } |
| |
| if (foundBlockSize == false) |
| { |
| if (blocksize <= 0) |
| blocksize = ao_opts->blocksize; |
| el = makeDefElem("blocksize", (Node *) makeInteger(blocksize), -1); |
| retList = lappend(retList, el); |
| } |
| /* |
| * The following two calls validate that the encoding clause is well |
| * formed. |
| */ |
| if (validate) |
| { |
| Datum d; |
| |
| d = transformRelOptions(PointerGetDatum(NULL), |
| retList, |
| NULL, NULL, |
| true, false); |
| (void) default_reloptions(d, true, RELOPT_KIND_APPENDOPTIMIZED); |
| } |
| |
| return retList; |
| } |
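| |
| /* |
| * Worked example of the transformation above (the option values are |
| * illustrative, not the shipped defaults). Suppose the current storage |
| * options carry compresstype=zstd, compresslevel=1, blocksize=32768 and |
| * the user writes: |
| * |
| * ENCODING (compresslevel=5) |
| * |
| * No compresstype was given and cmplevel is non-zero, so the configured |
| * default compresstype (zstd here) is chosen, falling back to |
| * AO_DEFAULT_COMPRESSTYPE when none is configured. Since no blocksize was |
| * given either, the stored one is used, yielding roughly: |
| * |
| * compresslevel=5, compresstype=zstd, blocksize=32768 |
| * |
| * which transformRelOptions()/default_reloptions() then validate. |
| */ |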
| |
| /* ------------------------------------------------------------------------ |
| * Definition of the AO_COLUMN table access method. |
| * |
| * NOTE: While there is a lot of functionality shared with the appendoptimized |
| * access method, it is best for the handler methods to remain static in order |
| * to honour the contract of the access method interface. |
| * ------------------------------------------------------------------------ |
| */ |
| static TableAmRoutine ao_column_methods = { |
| .type = T_TableAmRoutine, |
| .slot_callbacks = aoco_slot_callbacks, |
| |
| /* |
| * GPDB: the column information for a scan must be extracted before |
| * calling beginscan; this cannot happen inside beginscan itself because |
| * the needed information is not available at that point. It is the |
| * caller's responsibility to choose between calling |
| * aoco_beginscan_extractcolumns and aoco_beginscan. |
| */ |
| .scan_begin_extractcolumns = aoco_beginscan_extractcolumns, |
| |
| /* |
| * GPDB: Like above but for bitmap scans. |
| */ |
| .scan_begin_extractcolumns_bm = aoco_beginscan_extractcolumns_bm, |
| |
| .scan_begin = aoco_beginscan, |
| .scan_end = aoco_endscan, |
| .scan_rescan = aoco_rescan, |
| .scan_getnextslot = aoco_getnextslot, |
| .scan_flags = aoco_scan_flags, |
| |
| .parallelscan_estimate = aoco_parallelscan_estimate, |
| .parallelscan_initialize = aoco_parallelscan_initialize, |
| .parallelscan_reinitialize = aoco_parallelscan_reinitialize, |
| |
| .index_fetch_begin = aoco_index_fetch_begin, |
| .index_fetch_reset = aoco_index_fetch_reset, |
| .index_fetch_end = aoco_index_fetch_end, |
| .index_fetch_tuple = aoco_index_fetch_tuple, |
| .index_unique_check = aoco_index_unique_check, |
| |
| .tuple_insert = aoco_tuple_insert, |
| .tuple_insert_speculative = aoco_tuple_insert_speculative, |
| .tuple_complete_speculative = aoco_tuple_complete_speculative, |
| .multi_insert = aoco_multi_insert, |
| .tuple_delete = aoco_tuple_delete, |
| .tuple_update = aoco_tuple_update, |
| .tuple_lock = aoco_tuple_lock, |
| .finish_bulk_insert = aoco_finish_bulk_insert, |
| |
| .tuple_fetch_row_version = aoco_fetch_row_version, |
| .tuple_get_latest_tid = aoco_get_latest_tid, |
| .tuple_tid_valid = aoco_tuple_tid_valid, |
| .tuple_satisfies_snapshot = aoco_tuple_satisfies_snapshot, |
| .index_delete_tuples = aoco_index_delete_tuples, |
| |
| .relation_set_new_filelocator = aoco_relation_set_new_filenode, |
| .relation_nontransactional_truncate = aoco_relation_nontransactional_truncate, |
| .relation_copy_data = aoco_relation_copy_data, |
| .relation_copy_for_cluster = aoco_relation_copy_for_cluster, |
| .relation_vacuum = aoco_vacuum_rel, |
| .scan_analyze_next_block = aoco_scan_analyze_next_block, |
| .scan_analyze_next_tuple = aoco_scan_analyze_next_tuple, |
| .index_build_range_scan = aoco_index_build_range_scan, |
| .index_validate_scan = aoco_index_validate_scan, |
| |
| .relation_size = aoco_relation_size, |
| .relation_get_block_sequences = aoco_relation_get_block_sequences, |
| .relation_get_block_sequence = aoco_relation_get_block_sequence, |
| .relation_needs_toast_table = aoco_relation_needs_toast_table, |
| |
| .relation_estimate_size = aoco_estimate_rel_size, |
| |
| .scan_bitmap_next_block = aoco_scan_bitmap_next_block, |
| .scan_bitmap_next_tuple = aoco_scan_bitmap_next_tuple, |
| .scan_sample_next_block = aoco_scan_sample_next_block, |
| .scan_sample_next_tuple = aoco_scan_sample_next_tuple, |
| .acquire_sample_rows = acquire_sample_rows, |
| |
| .amoptions = ao_amoptions, |
| .swap_relation_files = aoco_swap_relation_files, |
| .validate_column_encoding_clauses = aoco_validate_column_encoding_clauses, |
| .transform_column_encoding_clauses = aoco_transform_column_encoding_clauses, |
| }; |
| |
| Datum |
| ao_column_tableam_handler(PG_FUNCTION_ARGS) |
| { |
| PG_RETURN_POINTER(&ao_column_methods); |
| } |
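| |
| /* |
| * A table reaches this handler through its access method. Assuming the |
| * method is registered in pg_am under the name "ao_column" (as the |
| * identifiers above suggest), usage would look like: |
| * |
| * CREATE TABLE t (a int, b text) |
| * USING ao_column |
| * WITH (compresstype=zstd, compresslevel=5); |
| * |
| * Every table AM callback invoked on such a table then dispatches through |
| * the ao_column_methods struct returned here. |
| */ |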