| /*------------------------------------------------------------------------- |
| * |
| * memquota.c |
| * Routines related to memory quota for queries. |
| * |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| *-------------------------------------------------------------------------*/ |
| |
| #include "cdb/memquota.h" |
| #include "access/heapam.h" |
| #include "catalog/pg_attribute_encoding.h" |
| #include "catalog/pg_class.h" |
| #include "catalog/pg_resqueue.h" |
| #include "cdb/cdbllize.h" |
| #include "cdb/cdbparquetam.h" |
| #include "cdb/cdbpartition.h" |
| #include "cdb/cdbvars.h" |
| #include "commands/queue.h" |
| #include "commands/tablecmds.h" |
| #include "executor/executor.h" |
| #include "executor/execdesc.h" |
| #include "miscadmin.h" |
| #include "optimizer/clauses.h" |
| #include "parser/parsetree.h" |
| #include "storage/lwlock.h" |
| #include "utils/guc_tables.h" |
| #include "utils/lsyscache.h" |
| #include "utils/relcache.h" |
| #include "utils/resscheduler.h" |
| |
| /** |
| * This contains information that will be used memory quota assignment. |
| */ |
| typedef struct MemQuotaContext |
| { |
| plan_tree_base_prefix base; /* Required prefix for plan_tree_walker/mutator */ |
| uint64 numNonMemIntensiveOperators; /* number of non-blocking operators */ |
| uint64 numMemIntensiveOperators; /* number of blocking operators */ |
| uint64 queryMemKB; |
| PlannedStmt *plannedStmt; /* pointer to the planned statement */ |
| uint64 parquetOpReservedMemKB; /* memory reserved for parquet related operators */ |
| |
| /* hash table of root Oids of parquet tables for Planner's plan */ |
| HTAB * parquetRootOids; |
| } MemQuotaContext; |
| |
| /** |
| * Forward declarations. |
| */ |
| static bool PrelimWalker(Node *node, MemQuotaContext *context); |
| static bool IsAggMemoryIntensive(Agg *agg); |
| static bool IsMemoryIntensiveOperator(Node *node, PlannedStmt *stmt); |
| static bool IsParquetScanOperator(Node *node, PlannedStmt *stmt); |
| static bool IsParquetInsertOperator(Node *node, PlannedStmt *stmt); |
| static uint64 MemoryReservedForParquetInsertForPlannerPlan(PlannedStmt *stmt); |
| static List* GetScannedColumnsForTable(Node *node, Oid rel_oid); |
| static uint64 MemoryReservedForParquetScan(Node *node, PlannedStmt *stmt, HTAB *parquetRootOids); |
| static uint64 MemoryReservedForParquetInsert(Oid rel_oid); |
| static Oid GetPartOidForParquetScan(Node *node, Oid rootOid, uint64 *maxMemRequired, bool needRemap); |
| static Oid GetPartOidForParquetInsert(Oid root_oid, uint64 *maxMemRequired); |
| static HTAB* createOidHTAB(); |
| |
struct OperatorGroupNode;

/*
 * OperatorGroupNode
 * Store the information regarding an operator group.
 *
 * An operator group is a subtree of the plan delimited by blocking
 * operators (see IsRootOperatorInGroup); memory is distributed per group.
 */
typedef struct OperatorGroupNode
{
	/* The id for this group */
	uint32 groupId;

	/*
	 * The number of non-memory-intensive operators and memory-intensive
	 * operators in the group.
	 */
	uint64 numNonMemIntenseOps;
	uint64 numMemIntenseOps;

	/*
	 * The maximal number of non-memory-intensive and memory-intensive
	 * operators in all child groups of this group which might be active
	 * concurrently.
	 */
	uint64 maxNumConcNonMemIntenseOps;
	uint64 maxNumConcMemIntenseOps;

	/* The list of child groups (of OperatorGroupNode*) */
	List *childGroups;

	/* The parent group (NULL for the root of the group tree) */
	struct OperatorGroupNode *parentGroup;

	/* The memory limit for this group and its child groups, in KB */
	uint64 groupMemKB;

	/* memory reserved for parquet related operators in this group, in KB */
	uint64 parquetOpReservedMemKB;

	/* hash table of root Oids of parquet tables for Planner's plan */
	HTAB * parquetRootOids;

} OperatorGroupNode;
| |
| /* |
| * PolicyEagerFreeContext |
| * Store the intermediate states during the tree walking for the optimize |
| * memory distribution policy. |
| */ |
| typedef struct PolicyEagerFreeContext |
| { |
| plan_tree_base_prefix base; |
| OperatorGroupNode *groupTree; /* the root of the group tree */ |
| OperatorGroupNode *groupNode; /* the current group node in the group tree */ |
| uint32 nextGroupId; /* the group id for a new group node */ |
| uint64 queryMemKB; /* the query memory limit */ |
| PlannedStmt *plannedStmt; /* pointer to the planned statement */ |
| } PolicyEagerFreeContext; |
| |
| /** |
| * GUCs |
| */ |
| int gp_resqueue_memory_policy_auto_fixed_mem; |
| bool gp_resqueue_print_operator_memory_limits; |
| |
| /** |
| * create a HTAB for Oids |
| */ |
| static HTAB* |
| createOidHTAB() |
| { |
| HASHCTL hashCtl; |
| MemSet(&hashCtl, 0, sizeof(HASHCTL)); |
| hashCtl.keysize = sizeof(Oid); |
| hashCtl.entrysize = sizeof(Oid); |
| hashCtl.hash = oid_hash; |
| hashCtl.hcxt = CurrentMemoryContext; |
| |
| return hash_create("Parquet Table Root Oids", |
| INITIAL_NUM_PIDS, |
| &hashCtl, |
| HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION); |
| } |
| |
| /** |
| * Is an agg operator memory intensive? The following cases mean it is: |
| * 1. If agg strategy is hashed |
| * 2. If targetlist or qual contains a DQA |
| * 3. If there is an ordered aggregated. |
| */ |
| static bool IsAggMemoryIntensive(Agg *agg) |
| { |
| Assert(agg); |
| |
| /* Case 1 */ |
| if (agg->aggstrategy == AGG_HASHED) |
| { |
| return true; |
| } |
| |
| AggClauseCounts aggInfo; |
| |
| /* Zero it out */ |
| MemSet(&aggInfo, 0, sizeof(aggInfo)); |
| |
| Plan *plan = (Plan *) &(agg->plan); |
| count_agg_clauses((Node *) plan->targetlist, &aggInfo); |
| count_agg_clauses((Node *) plan->qual, &aggInfo); |
| |
| /* Case 2 */ |
| if (aggInfo.numDistinctAggs >0) |
| { |
| return true; |
| } |
| |
| /* Case 3 */ |
| if (aggInfo.aggOrder != NIL ) |
| { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /* |
| * IsAggBlockingOperator |
| * Return true if an Agg node is a blocking operator. |
| * |
| * Agg is a blocking operator when it is |
| * 1. Scalar Agg |
| * 2. Second stage HashAgg when streaming is on. |
| * 3. First and Second stage HashAgg when streaming is off. |
| */ |
| static bool |
| IsAggBlockingOperator(Agg *agg) |
| { |
| if (agg->aggstrategy == AGG_PLAIN) |
| { |
| return true; |
| } |
| |
| return (agg->aggstrategy == AGG_HASHED && !agg->streaming); |
| } |
| |
| /* |
| * isMaterialBlockingOperator |
| * Return true if a Material node is a blocking operator. |
| * |
| * Material node is a blocking operator when cdb_strict is on. |
| */ |
| static bool |
| IsMaterialBlockingOperator(Material *material) |
| { |
| return material->cdb_strict; |
| } |
| |
| /* |
| * IsBlockingOperator |
| * Return true when the given plan node is a blocking operator. |
| */ |
| static bool |
| IsBlockingOperator(Node *node) |
| { |
| switch(nodeTag(node)) |
| { |
| case T_BitmapIndexScan: |
| case T_Hash: |
| case T_Sort: |
| return true; |
| |
| case T_Material: |
| return IsMaterialBlockingOperator((Material *)node); |
| |
| case T_Agg: |
| return IsAggBlockingOperator((Agg *)node); |
| |
| default: |
| return false; |
| } |
| } |
| |
| /** |
| * Is a result node memory intensive? It is if it contains function calls. |
| */ |
| bool |
| IsResultMemoryIntesive(Result *res) |
| { |
| return contains_node_type(NULL, (Node *) ((Plan *) res)->targetlist, T_FuncExpr); |
| } |
| |
| /** |
| * Is an operator memory intensive? |
| */ |
| static bool |
| IsMemoryIntensiveOperator(Node *node, PlannedStmt *stmt) |
| { |
| Assert(is_plan_node(node)); |
| switch(nodeTag(node)) |
| { |
| case T_Material: |
| case T_Sort: |
| case T_ShareInputScan: |
| case T_Hash: |
| case T_BitmapIndexScan: |
| case T_FunctionScan: |
| case T_Window: |
| case T_TableFunctionScan: |
| return true; |
| case T_Agg: |
| { |
| Agg *agg = (Agg *) node; |
| return IsAggMemoryIntensive(agg); |
| } |
| case T_Result: |
| { |
| Result *res = (Result *) node; |
| return IsResultMemoryIntesive(res); |
| } |
| default: |
| return false; |
| } |
| } |
| |
| /** |
| * Calculate memory required for a parquet table scan |
| * |
| * node: Plan node |
| * stmt: Plan statement |
| * return: memory in byte that is reserved for parquet table scan |
| */ |
| static uint64 |
| MemoryReservedForParquetScan(Node *node, PlannedStmt *stmt, HTAB *parquetRootOids) |
| { |
| Assert(NULL != node); |
| Assert(NULL != stmt); |
| Assert(NULL != parquetRootOids); |
| |
| Oid rel_oid = getrelid(((Scan *) node)->scanrelid, stmt->rtable); |
| uint64 memoryReservedForParquetScan = 0; |
| |
| /* For dynamic table scan, we need to iterate through all the part tables and find out the |
| * parquet part table that has the maximum memory requirement. */ |
| if (IsA(node, DynamicTableScan)) |
| { |
| #if USE_ASSERT_CHECKING |
| Oid part_oid = |
| #endif |
| GetPartOidForParquetScan(node, rel_oid, &memoryReservedForParquetScan, true /* need remap */); |
| Assert(InvalidOid != part_oid); |
| } |
| /* Planner's plan for parquet table scan */ |
| else if(IsA(node, ParquetScan)) |
| { |
| Oid root_oid = rel_partition_get_master(rel_oid); |
| if (InvalidOid == root_oid) |
| { |
| /* it is not a partitioned table */ |
| List *scanned_columns_list = GetScannedColumnsForTable(node, rel_oid); |
| memoryReservedForParquetScan = memReservedForParquetScan(rel_oid, scanned_columns_list); |
| list_free(scanned_columns_list); |
| } |
| /* it is a partitioned table */ |
| else |
| { |
| /** |
| * GPSQL-2477: we should assign parquet required memory to one |
| * partitioned table scan for Planner's plan. |
| */ |
| bool foundPtr = false; |
| hash_search(parquetRootOids, &root_oid, HASH_ENTER, &foundPtr); |
| |
| if (foundPtr) |
| { |
| /* |
| * the root_oid is already in the hash table which means the |
| * parquet required memory has been assigned to some other ParquetScan. |
| * Assign 100K to this operator. |
| */ |
| memoryReservedForParquetScan = (uint64) gp_resqueue_memory_policy_auto_fixed_mem * 1024; |
| } |
| else |
| { |
| /* |
| * this is the first time we encounter ParquetScan for a partitioned parquet table. |
| * Assign the maximum memory required for this table to this operator. |
| */ |
| #if USE_ASSERT_CHECKING |
| Oid part_oid = |
| #endif |
| GetPartOidForParquetScan(node, root_oid, &memoryReservedForParquetScan, false /* no need remap */); |
| Assert(InvalidOid != part_oid); |
| } |
| } |
| } |
| else |
| { |
| List *scanned_columns_list = GetScannedColumnsForTable(node, rel_oid); |
| memoryReservedForParquetScan = memReservedForParquetScan(rel_oid, scanned_columns_list); |
| list_free(scanned_columns_list); |
| } |
| return memoryReservedForParquetScan; |
| } |
| |
| /** |
| * Calculate memory required for parquet table insert |
| * |
| * rel_oid: target table's Oid, it can be a root table of a partitioned table |
| * return: memory in byte that is reserved for parquet table insert |
| * It will return 0 if the target table is not a parquet table or |
| * no parquet part table exists |
| */ |
| static uint64 |
| MemoryReservedForParquetInsert(Oid rel_oid) |
| { |
| Assert(InvalidOid != rel_oid); |
| uint64 memoryReserved = 0; |
| /* partitioned case */ |
| if (rel_is_partitioned(rel_oid)) |
| { |
| GetPartOidForParquetInsert(rel_oid, &memoryReserved); |
| } |
| /* non-partition case, parquet table */ |
| else if(relstorage_is_aoparquet(get_rel_relstorage(rel_oid))) |
| { |
| memoryReserved = memReservedForParquetInsert(rel_oid); |
| } |
| |
| return memoryReserved; |
| } |
| |
| /** |
| * Is an operator parquet scan? |
| */ |
| static bool |
| IsParquetScanOperator(Node *node, PlannedStmt *stmt) |
| { |
| Assert(NULL != node); |
| Assert(NULL != stmt); |
| Assert(is_plan_node(node)); |
| switch(nodeTag(node)) |
| { |
| case T_ParquetScan: |
| return true; |
| case T_TableScan: |
| { |
| Oid rel_oid = getrelid(((Scan *) node)->scanrelid, stmt->rtable); |
| if (relstorage_is_aoparquet(get_rel_relstorage(rel_oid))) |
| { |
| return true; |
| } |
| else |
| { |
| return false; |
| } |
| } |
| case T_DynamicTableScan: |
| { |
| uint64 memRequired = 0; |
| Oid rel_oid = getrelid(((Scan *) node)->scanrelid, stmt->rtable); |
| if (InvalidOid == GetPartOidForParquetScan(node, rel_oid, &memRequired, true /* need remap */)) |
| { |
| return false; |
| } |
| else |
| { |
| return true; |
| } |
| } |
| default: |
| return false; |
| } |
| } |
| |
| /** |
| * Is an operator parquet Insert? |
| */ |
| static bool |
| IsParquetInsertOperator(Node *node, PlannedStmt *stmt) |
| { |
| Assert(NULL != node); |
| Assert(NULL != stmt); |
| Assert(is_plan_node(node)); |
| if (IsA(node, DML)) |
| { |
| if (CMD_INSERT == stmt->commandType) |
| { |
| Oid rel_oid = getrelid(((Scan *) node)->scanrelid, stmt->rtable); |
| /* partitioned case */ |
| if (rel_is_partitioned(rel_oid)) |
| { |
| uint64 memRequired = 0; |
| if (InvalidOid == GetPartOidForParquetInsert(rel_oid, &memRequired)) |
| { |
| return false; |
| } |
| else |
| { |
| return true; |
| } |
| } |
| /* non-partition case, parquet table */ |
| else if (relstorage_is_aoparquet(get_rel_relstorage(rel_oid))) |
| { |
| return true; |
| } |
| else |
| { |
| return false; |
| } |
| } |
| else |
| { |
| return false; |
| } |
| } |
| else |
| { |
| return false; |
| } |
| } |
| |
| /** |
| * Compute memory required for Parquet table insert if the plan |
| * is produced by Planner. Because unlike ORCA, Planner does not put |
| * insert in its plan nodes. We collect memory by traversing |
| * the resultRelations and sum up the memory needed for each relation. |
| * |
| * stmt: Plan statement |
| * return: memory in byte that is reserved for parquet table insert |
| * It will return 0 if no parquet table exists. |
| */ |
| static uint64 |
| MemoryReservedForParquetInsertForPlannerPlan(PlannedStmt *stmt) |
| { |
| Assert(PLANGEN_PLANNER == stmt->planGen); |
| Assert(CMD_INSERT == stmt->commandType); |
| uint64 memRequired = 0; |
| ListCell *lc = NULL; |
| foreach(lc, stmt->resultRelations) |
| { |
| Oid rel_oid = getrelid(lfirst_int(lc), stmt->rtable); |
| memRequired += MemoryReservedForParquetInsert(rel_oid); |
| } |
| return memRequired; |
| } |
| |
| /* |
| * Get list of columns need to be scanned for a parquet table |
| * |
| * The return list is palloc-ed in the current memory context, and the caller |
| * is required to free it. |
| * |
| * node: Plan node |
| * rel_oid: target table's Oid |
| * return: List of columns that need to be scanned |
| */ |
| static List* |
| GetScannedColumnsForTable(Node *node, Oid rel_oid) |
| { |
| Plan *plan = (Plan *) &((Scan *) node)->plan; |
| int num_col = get_relnatts(rel_oid); |
| bool *proj = (bool *)palloc0(sizeof(bool) * num_col); |
| GetNeededColumnsForScan((Node *)plan->targetlist, proj, num_col); |
| GetNeededColumnsForScan((Node *)plan->qual, proj, num_col); |
| List *result = NIL; |
| for(int col_no = 0; col_no < num_col; col_no++) |
| { |
| if(proj[col_no]) |
| { |
| result = lappend_int(result, col_no + 1); |
| } |
| } |
| pfree(proj); |
| |
| /* In some cases (e.g.: count(*)), no column is specified. |
| * We always scan the first column. |
| */ |
| if (0 == list_length(result)) |
| { |
| result = lappend_int(result, 1); |
| } |
| return result; |
| } |
| |
| /* |
| * Iterate through all part tables and try to find the parquet part table that |
| * has the maximum memory requirement. |
| * |
| * Return InvalidOid if no parquet part table exists. |
| * |
| * node: Plan node |
| * root_oid: Oid of root table |
| * maxMemRequired: Output argument for memory in byte that is reserved for the part table scan |
| * |
| */ |
| static Oid GetPartOidForParquetScan(Node *node, Oid rootOid, uint64 *maxMemRequired, bool needRemap) |
| { |
| Assert(NULL != maxMemRequired); |
| Assert(rel_is_partitioned(rootOid)); |
| Oid maxRowgroupsizePartTableOid = InvalidOid; |
| *maxMemRequired = 0; |
| |
| PartitionNode *pn = get_parts(rootOid, 0 /* level */, 0 /* parent */, false /* inctemplate */, |
| CurrentMemoryContext, true /*include_subparts*/); |
| Assert(pn); |
| |
| List *scanned_columns_list = GetScannedColumnsForTable(node, rootOid); |
| |
| /* |
| * If we have dropped columns in root table, we need to remap |
| * the attribute numbers for part tables to get accurate columns |
| * to be scanned. |
| */ |
| Relation rootRel = heap_open(rootOid, AccessShareLock); |
| TupleDesc rootTupDesc = rootRel->rd_att; |
| |
| List *lRelOids = all_partition_relids(pn); |
| ListCell *lc_part_oid = NULL; |
| foreach (lc_part_oid, lRelOids) |
| { |
| Oid part_oid = lfirst_oid(lc_part_oid); |
| if (relstorage_is_aoparquet(get_rel_relstorage(part_oid))) |
| { |
| /* remap scanned columns for part table if needed */ |
| Relation partRel = heap_open(part_oid, AccessShareLock); |
| TupleDesc partTupDesc = partRel->rd_att; |
| AttrNumber *attMap = varattnos_map(rootTupDesc, partTupDesc); |
| List *remap_scanned_columns_list = NIL; |
| if (attMap && needRemap) |
| { |
| ListCell *cell = NULL; |
| foreach(cell, scanned_columns_list) |
| { |
| AttrNumber remap_att_num = attMap[lfirst_int(cell)-1]; |
| remap_scanned_columns_list = lappend_int(remap_scanned_columns_list, remap_att_num); |
| } |
| } |
| /* no mapping needed */ |
| else |
| { |
| remap_scanned_columns_list = scanned_columns_list; |
| } |
| heap_close(partRel, AccessShareLock); |
| |
| uint64 memRequired = memReservedForParquetScan(part_oid, remap_scanned_columns_list); |
| if (memRequired > *maxMemRequired) |
| { |
| *maxMemRequired = memRequired; |
| maxRowgroupsizePartTableOid = part_oid; |
| } |
| |
| /* clean up */ |
| if (attMap && needRemap) |
| { |
| list_free(remap_scanned_columns_list); |
| pfree(attMap); |
| } |
| } |
| } |
| heap_close(rootRel, AccessShareLock); |
| list_free(scanned_columns_list); |
| |
| return maxRowgroupsizePartTableOid; |
| } |
| |
| /* |
| * Iterate through all part tables and try to find the parquet part table that |
| * has the maximum memory requirement for insert. |
| * |
| * Return InvalidOid if no parquet part table exists. |
| * |
| * root_oid: Oid of root table |
| * maxMemRequired: Output argument for memory in byte that is reserved for the part table insert |
| * |
| */ |
| static Oid |
| GetPartOidForParquetInsert(Oid root_oid, uint64 *maxMemRequired) |
| { |
| Assert(NULL != maxMemRequired); |
| Assert(rel_is_partitioned(root_oid)); |
| Oid maxRowgroupsizePartTableOid = InvalidOid; |
| *maxMemRequired = 0; |
| |
| PartitionNode *pn = get_parts(root_oid, 0 /* level */, 0 /* parent */, false /* inctemplate */, |
| CurrentMemoryContext, true /*include_subparts*/); |
| Assert(pn); |
| |
| List *lRelOids = all_partition_relids(pn); |
| ListCell *lc_part_oid = NULL; |
| foreach (lc_part_oid, lRelOids) |
| { |
| Oid part_oid = lfirst_oid(lc_part_oid); |
| if (relstorage_is_aoparquet(get_rel_relstorage(part_oid))) |
| { |
| uint64 memRequired = memReservedForParquetInsert(part_oid); |
| if (memRequired > *maxMemRequired) |
| { |
| *maxMemRequired = memRequired; |
| maxRowgroupsizePartTableOid = part_oid; |
| } |
| } |
| } |
| return maxRowgroupsizePartTableOid; |
| } |
| |
| /* |
| * IsRootOperatorInGroup |
| * Return true if the given node is the root operator in an operator group. |
| * |
| * A node can be a root operator in a group if it satisfies the following three |
| * conditions: |
| * (1) a Plan node. |
| * (2) a Blocking operator. |
| * (3) not rescan required (no external parameters). |
| */ |
| static bool |
| IsRootOperatorInGroup(Node *node) |
| { |
| return (is_plan_node(node) && IsBlockingOperator(node) && ((Plan *)(node))->extParam == NULL); |
| } |
| |
| /** |
| * This walker counts the number of memory intensive and non-memory intensive operators |
| * in a plan. |
| */ |
| |
| static bool PrelimWalker(Node *node, MemQuotaContext *context) |
| { |
| if (node == NULL) |
| { |
| return false; |
| } |
| |
| Assert(node); |
| Assert(context); |
| if (is_plan_node(node)) |
| { |
| if (IsParquetScanOperator(node, context->plannedStmt)) |
| { |
| if (NULL == context->parquetRootOids) |
| { |
| context->parquetRootOids = createOidHTAB(); |
| } |
| uint64 memResevedForParquetScan = MemoryReservedForParquetScan(node, context->plannedStmt, context->parquetRootOids); |
| uint64 memResevedForParquetScanKB = (uint64) ( (double) memResevedForParquetScan / 1024); |
| context->parquetOpReservedMemKB += memResevedForParquetScanKB; |
| /* Assign the memory required to the operator so we do not need to compute it again */ |
| Plan *planNode = (Plan *) node; |
| planNode->operatorMemKB = memResevedForParquetScanKB; |
| } |
| else if (IsParquetInsertOperator(node, context->plannedStmt)) |
| { |
| /* get memory required for parquet table insert */ |
| Oid rel_oid = getrelid(((Scan *) node)->scanrelid, context->plannedStmt->rtable); |
| uint64 memResevedForParquetInsert = MemoryReservedForParquetInsert(rel_oid); |
| uint64 memResevedForParquetInsertKB = (uint64) ( (double) memResevedForParquetInsert / 1024); |
| context->parquetOpReservedMemKB += memResevedForParquetInsertKB; |
| /* Assign the memory required to the operator so we do not need to compute it again */ |
| Plan *planNode = (Plan *) node; |
| planNode->operatorMemKB = memResevedForParquetInsertKB; |
| } |
| else if (IsMemoryIntensiveOperator(node, context->plannedStmt)) |
| { |
| context->numMemIntensiveOperators++; |
| } |
| else |
| { |
| context->numNonMemIntensiveOperators++; |
| } |
| } |
| return plan_tree_walker(node, PrelimWalker, context); |
| } |
| |
| |
| /** |
| * What should be query mem such that memory intensive operators get a certain |
| * minimum amount of memory. Return value is in KB. |
| */ |
| uint64 StatementMemForNoSpillKB(PlannedStmt *stmt, uint64 minOperatorMemKB) |
| { |
| Assert(stmt); |
| Assert(minOperatorMemKB > 0); |
| |
| const uint64 nonMemIntenseOpMemKB = (uint64) gp_resqueue_memory_policy_auto_fixed_mem; |
| |
| MemQuotaContext ctx; |
| exec_init_plan_tree_base(&ctx.base, stmt); |
| ctx.queryMemKB = (uint64) (stmt->query_mem / 1024); |
| ctx.numMemIntensiveOperators = 0; |
| ctx.numNonMemIntensiveOperators = 0; |
| ctx.parquetOpReservedMemKB = 0; |
| ctx.plannedStmt = stmt; |
| ctx.parquetRootOids = NULL; |
| |
| #ifdef USE_ASSERT_CHECKING |
| bool result = |
| #endif |
| PrelimWalker((Node *) stmt->planTree, &ctx); |
| |
| Assert(!result); |
| Assert(ctx.numMemIntensiveOperators + ctx.numNonMemIntensiveOperators > 0); |
| |
| /** |
| * Right now, the inverse is straightforward. |
| * TODO: Siva - employ binary search to find the right value. |
| */ |
| uint64 requiredStatementMemKB = ctx.numNonMemIntensiveOperators * nonMemIntenseOpMemKB |
| + ctx.numMemIntensiveOperators * minOperatorMemKB |
| + ctx.parquetOpReservedMemKB; |
| |
| return requiredStatementMemKB; |
| } |
| |
| /* |
| * CreateOperatorGroup |
| * create a new operator group with a specified id. |
| */ |
| static OperatorGroupNode * |
| CreateOperatorGroupNode(uint32 groupId, OperatorGroupNode *parentGroup) |
| { |
| OperatorGroupNode *node = palloc0(sizeof(OperatorGroupNode)); |
| node->groupId = groupId; |
| node->parentGroup = parentGroup; |
| node->parquetOpReservedMemKB = 0; |
| node->parquetRootOids = NULL; |
| |
| return node; |
| } |
| |
| /* |
| * IncrementOperatorCount |
| * Increment the count of operators in the current group based |
| * on the type of the operator. |
| */ |
| static void |
| IncrementOperatorCount(Node *node, OperatorGroupNode *groupNode, PlannedStmt *stmt) |
| { |
| Assert(node != NULL); |
| Assert(groupNode != NULL); |
| |
| if (IsParquetScanOperator(node, stmt)) |
| { |
| if (NULL == groupNode->parquetRootOids) |
| { |
| groupNode->parquetRootOids = createOidHTAB(); |
| } |
| uint64 memResevedForParquetScan = MemoryReservedForParquetScan(node, stmt, groupNode->parquetRootOids); |
| uint64 memResevedForParquetScanKB = (uint64) ((double) memResevedForParquetScan / 1024); |
| groupNode->parquetOpReservedMemKB += memResevedForParquetScanKB; |
| /* Assign the memory required to the operator so we do not need to compute it again */ |
| Plan *planNode = (Plan *) node; |
| planNode->operatorMemKB = memResevedForParquetScanKB; |
| |
| } |
| else if (IsParquetInsertOperator(node, stmt)) |
| { |
| /* get memory required for parquet table insert */ |
| Oid rel_oid = getrelid(((Scan *) node)->scanrelid, stmt->rtable); |
| uint64 memResevedForParquetInsert = MemoryReservedForParquetInsert(rel_oid); |
| uint64 memResevedForParquetInsertKB = (uint64) ( (double) memResevedForParquetInsert / 1024); |
| groupNode->parquetOpReservedMemKB += memResevedForParquetInsertKB; |
| /* Assign the memory required to the operator so we do not need to compute it again */ |
| Plan *planNode = (Plan *) node; |
| planNode->operatorMemKB = memResevedForParquetInsertKB; |
| } |
| else if (IsMemoryIntensiveOperator(node, stmt)) |
| { |
| groupNode->numMemIntenseOps++; |
| } |
| else |
| { |
| groupNode->numNonMemIntenseOps++; |
| } |
| } |
| |
| /* |
| * GetParentOperatorNode |
| * Return the parent operator group for a given group. |
| */ |
| static OperatorGroupNode * |
| GetParentOperatorGroup(OperatorGroupNode *groupNode) |
| { |
| Assert(groupNode != NULL); |
| return groupNode->parentGroup; |
| } |
| |
| |
| /* |
| * CreateOperatorGroupForOperator |
| * create an operator group for a given operator node if the given operator node |
| * is a potential root of an operator group. |
| */ |
| static OperatorGroupNode * |
| CreateOperatorGroupForOperator(Node *node, |
| PolicyEagerFreeContext *context) |
| { |
| Assert(is_plan_node(node)); |
| |
| OperatorGroupNode *groupNode = context->groupNode; |
| |
| /* |
| * If the group tree has not been built, we create the first operator |
| * group here. |
| */ |
| if (context->groupTree == NULL) |
| { |
| groupNode = CreateOperatorGroupNode(context->nextGroupId, NULL); |
| Assert(groupNode != NULL); |
| |
| context->groupTree = groupNode; |
| context->groupTree->groupMemKB = context->queryMemKB; |
| context->nextGroupId++; |
| } |
| /* |
| * If this node is a potential root of an operator group, this means that |
| * the current group ends, and a new group starts. we create a new operator |
| * group. |
| */ |
| else if (IsRootOperatorInGroup(node)) |
| { |
| Assert(groupNode != NULL); |
| |
| OperatorGroupNode *parentGroupNode = groupNode; |
| groupNode = CreateOperatorGroupNode(context->nextGroupId, groupNode); |
| if (parentGroupNode != NULL) |
| { |
| parentGroupNode->childGroups = lappend(parentGroupNode->childGroups, groupNode); |
| } |
| context->nextGroupId++; |
| } |
| |
| return groupNode; |
| } |
| |
| /* |
| * FindOperatorGroupForOperator |
| * find the operator group for a given operator node that is the root operator |
| * of the returning group. |
| */ |
| static OperatorGroupNode * |
| FindOperatorGroupForOperator(Node *node, |
| PolicyEagerFreeContext *context) |
| { |
| Assert(is_plan_node(node)); |
| Assert(context->groupTree != NULL); |
| |
| OperatorGroupNode *groupNode = context->groupNode; |
| |
| /* |
| * If this is the beginning of the walk (or the current group is NULL), |
| * this operator node belongs to the first operator group. |
| */ |
| if (groupNode == NULL) |
| { |
| groupNode = context->groupTree; |
| context->nextGroupId++; |
| } |
| |
| /* |
| * If this operator is a potential root of an operator group, this means |
| * the current group ends, and a new group start. We find the group that |
| * has this node as its root. |
| */ |
| else if (IsRootOperatorInGroup(node)) |
| { |
| Assert(context->groupNode != NULL); |
| |
| ListCell *lc = NULL; |
| OperatorGroupNode *childGroup = NULL; |
| foreach(lc, context->groupNode->childGroups) |
| { |
| childGroup = (OperatorGroupNode *)lfirst(lc); |
| if (childGroup->groupId == context->nextGroupId) |
| { |
| break; |
| } |
| } |
| |
| Assert(childGroup != NULL); |
| groupNode = childGroup; |
| context->nextGroupId++; |
| } |
| |
| return groupNode; |
| } |
| |
| #if 0 |
| /* |
| * FindOperatorGroupForNode |
| * find the operator group for a given node. |
| * |
| * If this is during prelim walker phase, this function creates such a group |
| * if it does not exist. Otherwise, this function finds such a group in |
| * the group tree. |
| * |
| * This function also returns the parentGroupNode pointer, and a boolean |
| * indicating if the given node is the first node in the group. |
| */ |
| static OperatorGroupNode * |
| FindOperatorGroupForNode(Node *node, |
| PolicyEagerFreeContext *context, |
| bool isPrelimWalker, |
| OperatorGroupNode **parentGroupNodeP, |
| bool *isFirstInGroupP) |
| { |
| Assert(is_plan_node(node)); |
| |
| OperatorGroupNode *groupNode = context->groupNode; |
| |
| if (context->groupNode == NULL) |
| { |
| if (isPrelimWalker) |
| { |
| Assert(context->groupTree == NULL); |
| context->groupTree = CreateOperatorGroupNode(context->nextGroupId, NULL); |
| groupNode = context->groupTree; |
| |
| /* The memory limit for the first group is the query memory limit. */ |
| groupNode->groupMemKB = context->queryMemKB; |
| } |
| else |
| { |
| Assert(context->groupTree != NULL); |
| groupNode = context->groupTree; |
| Assert(groupNode->groupMemKB > 0); |
| } |
| |
| context->nextGroupId++; |
| *isFirstInGroupP = true; |
| } |
| |
| /* |
| * If this node is a blocking operator and this node does not contain an extParam, |
| * it means that the current group ends, and a new group starts. |
| */ |
| if (!*isFirstInGroupP && |
| IsRootOperatorInGroup(node)) |
| { |
| Assert(context->groupNode != NULL); |
| if (isPrelimWalker) |
| { |
| groupNode = CreateOperatorGroupNode(context->nextGroupId, context->groupNode); |
| context->groupNode->childGroups = lappend(context->groupNode->childGroups, groupNode); |
| } |
| else |
| { |
| ListCell *lc = NULL; |
| OperatorGroupNode *childGroup = NULL; |
| foreach(lc, context->groupNode->childGroups) |
| { |
| childGroup = (OperatorGroupNode *)lfirst(lc); |
| if (childGroup->groupId == context->nextGroupId) |
| { |
| break; |
| } |
| } |
| |
| Assert(childGroup != NULL); |
| groupNode = childGroup; |
| } |
| |
| context->nextGroupId++; |
| *isFirstInGroupP = true; |
| *parentGroupNodeP = context->groupNode; |
| } |
| |
| return groupNode; |
| } |
| #endif |
| |
| /* |
| * ComputeAvgMemKBForMemIntenseOp |
| * Compute the average memory limit for each memory-intensive operators |
| * in a given group. |
| * |
| * If there is no memory-intensive operators in this group, return 0. |
| */ |
| static uint64 |
| ComputeAvgMemKBForMemIntenseOp(OperatorGroupNode *groupNode) |
| { |
| if (groupNode->numMemIntenseOps == 0) |
| { |
| return 0; |
| } |
| |
| const uint64 nonMemIntenseOpMemKB = (uint64)gp_resqueue_memory_policy_auto_fixed_mem; |
| |
| return (((double)groupNode->groupMemKB |
| - (double)groupNode->numNonMemIntenseOps * nonMemIntenseOpMemKB |
| - (double) groupNode->parquetOpReservedMemKB ) / |
| groupNode->numMemIntenseOps); |
| } |
| |
| /* |
| * ComputeMemLimitForChildGroups |
| * compute the query memory limit for all child groups of a given |
| * parent group if it has not been computed before. |
| */ |
| static void |
| ComputeMemLimitForChildGroups(OperatorGroupNode *parentGroupNode) |
| { |
| Assert(parentGroupNode != NULL); |
| |
| uint64 totalNumMemIntenseOps = 0; |
| uint64 totalNumNonMemIntenseOps = 0; |
| |
| ListCell *lc; |
| foreach(lc, parentGroupNode->childGroups) |
| { |
| OperatorGroupNode *childGroup = (OperatorGroupNode *)lfirst(lc); |
| |
| /* |
| * If the memory limit has been computed, then we are done. |
| */ |
| if (childGroup->groupMemKB != 0) |
| { |
| return; |
| } |
| |
| totalNumMemIntenseOps += |
| Max(childGroup->maxNumConcMemIntenseOps, childGroup->numMemIntenseOps); |
| totalNumNonMemIntenseOps += |
| Max(childGroup->maxNumConcNonMemIntenseOps, childGroup->numNonMemIntenseOps); |
| } |
| |
| const uint64 nonMemIntenseOpMemKB = (uint64)gp_resqueue_memory_policy_auto_fixed_mem; |
| |
| foreach(lc, parentGroupNode->childGroups) |
| { |
| OperatorGroupNode *childGroup = (OperatorGroupNode *)lfirst(lc); |
| |
| Assert(childGroup->groupMemKB == 0); |
| |
| if (parentGroupNode->groupMemKB < totalNumNonMemIntenseOps * nonMemIntenseOpMemKB |
| + parentGroupNode->parquetOpReservedMemKB) |
| { |
| if (parentGroupNode->parquetOpReservedMemKB > 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INSUFFICIENT_RESOURCES), |
| errmsg("insufficient memory reserved for statement"), |
| errhint("Increase statement memory or reduce the number of Parquet tables to be scanned."))); |
| } |
| else |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INSUFFICIENT_RESOURCES), |
| errmsg("insufficient memory reserved for statement"))); |
| } |
| |
| } |
| |
| double memIntenseOpMemKB = 0; |
| if (totalNumMemIntenseOps > 0) |
| { |
| memIntenseOpMemKB = |
| ((double)(parentGroupNode->groupMemKB |
| - totalNumNonMemIntenseOps * nonMemIntenseOpMemKB |
| - parentGroupNode->parquetOpReservedMemKB)) / |
| ((double)totalNumMemIntenseOps); |
| } |
| |
| Assert(parentGroupNode->groupMemKB > 0); |
| |
| /* |
| * MPP-23130 |
| * In memory policy eager free, scaleFactor is used to balance memory allocated to |
| * each child group based on the number of memory-intensive and non-memory-intensive |
| * operators they have. The calculation of scaleFactor is as follows: |
| * scaleFactor = (memIntensiveOpMem * |
| * Max(childGroup->maxNumConcMemIntenseOps, childGroup->numMemIntenseOps) |
| * + nonMemIntenseOpMemKB * |
| * Max(childGroup->maxNumConcNonMemIntenseOps, childGroup->numNonMemIntenseOps)) |
| * / parentGroupNode->groupMemKB |
| * Child group's memory: childGroup->groupMemKB = scaleFactor * parentGroupNode->groupMemKB, |
| * which is the denominator of the scaleFactor formula. |
| */ |
| childGroup->groupMemKB = (uint64) (memIntenseOpMemKB * |
| Max(childGroup->maxNumConcMemIntenseOps, childGroup->numMemIntenseOps) + |
| nonMemIntenseOpMemKB * |
| Max(childGroup->maxNumConcNonMemIntenseOps, childGroup->numNonMemIntenseOps) + |
| childGroup->parquetOpReservedMemKB); |
| } |
| } |
| |
| /* |
| * PolicyEagerFreePrelimWalker |
| * Walk the plan tree to build a group tree by dividing the plan tree |
| * into several groups, each of which has a block operator as its border |
| * node (except for the leaves of the leave groups). At the same time, |
| * we collect some stats information about operators in each group. |
| */ |
| static bool |
| PolicyEagerFreePrelimWalker(Node *node, PolicyEagerFreeContext *context) |
| { |
| if (node == NULL) |
| { |
| return false; |
| } |
| |
| Assert(node); |
| Assert(context); |
| |
| OperatorGroupNode *parentGroupNode = NULL; |
| bool isTopPlanNode = false; |
| |
| if (is_plan_node(node)) |
| { |
| if (context->groupTree == NULL) |
| { |
| isTopPlanNode = true; |
| } |
| |
| context->groupNode = CreateOperatorGroupForOperator(node, context); |
| Assert(context->groupNode != NULL); |
| |
| IncrementOperatorCount(node, context->groupNode, context->plannedStmt); |
| |
| /* |
| * We also increment the parent group's counter if this node |
| * is the root node in a new group. |
| */ |
| parentGroupNode = GetParentOperatorGroup(context->groupNode); |
| if (IsRootOperatorInGroup(node) && parentGroupNode != NULL) |
| { |
| IncrementOperatorCount(node, parentGroupNode, context->plannedStmt); |
| } |
| } |
| |
| bool result = plan_tree_walker(node, PolicyEagerFreePrelimWalker, context); |
| Assert(!result); |
| |
| /* |
| * If this node is the top nodoe in a group, at this point, we should have all info about |
| * its child groups. We then calculate the maximum number of potential concurrently |
| * active memory-intensive operators and non-memory-intensive operators in all |
| * child groups. |
| */ |
| if (isTopPlanNode || IsRootOperatorInGroup(node)) |
| { |
| Assert(context->groupNode != NULL); |
| |
| uint64 maxNumConcNonMemIntenseOps = 0; |
| uint64 maxNumConcMemIntenseOps = 0; |
| uint64 childrenParquetOpReservedMem = 0; |
| ListCell *lc; |
| foreach(lc, context->groupNode->childGroups) |
| { |
| OperatorGroupNode *childGroup = (OperatorGroupNode *)lfirst(lc); |
| maxNumConcNonMemIntenseOps += |
| Max(childGroup->maxNumConcNonMemIntenseOps, childGroup->numNonMemIntenseOps); |
| maxNumConcMemIntenseOps += |
| Max(childGroup->maxNumConcMemIntenseOps, childGroup->numMemIntenseOps); |
| childrenParquetOpReservedMem += childGroup->parquetOpReservedMemKB; |
| } |
| |
| Assert(context->groupNode->maxNumConcNonMemIntenseOps == 0 && |
| context->groupNode->maxNumConcMemIntenseOps == 0); |
| context->groupNode->maxNumConcNonMemIntenseOps = maxNumConcNonMemIntenseOps; |
| context->groupNode->maxNumConcMemIntenseOps = maxNumConcMemIntenseOps; |
| context->groupNode->parquetOpReservedMemKB += childrenParquetOpReservedMem; |
| /* Reset the groupNode to point to its parentGroupNode */ |
| context->groupNode = GetParentOperatorGroup(context->groupNode); |
| } |
| |
| return result; |
| } |
| |
| /* |
| * PolicyEagerFreeAssignWalker |
| * Walk the plan tree and assign the memory to each plan node. |
| */ |
| static bool |
| PolicyEagerFreeAssignWalker(Node *node, PolicyEagerFreeContext *context) |
| { |
| if (node == NULL) |
| { |
| return false; |
| } |
| |
| Assert(node); |
| Assert(context); |
| |
| const uint64 nonMemIntenseOpMemKB = (uint64)gp_resqueue_memory_policy_auto_fixed_mem; |
| |
| if (is_plan_node(node)) |
| { |
| Plan *planNode = (Plan *)node; |
| |
| context->groupNode = FindOperatorGroupForOperator(node, context); |
| Assert(context->groupNode != NULL); |
| |
| /* |
| * If this is the root node in a group, compute the new query limit for |
| * all child groups of the parent group. |
| */ |
| if (IsRootOperatorInGroup(node) && |
| GetParentOperatorGroup(context->groupNode) != NULL) |
| { |
| ComputeMemLimitForChildGroups(GetParentOperatorGroup(context->groupNode)); |
| } |
| |
| if (IsParquetScanOperator(node, context->plannedStmt) || IsParquetInsertOperator(node, context->plannedStmt)) |
| { |
| /* do nothing as we already assigned the memory to the operator */ |
| } |
| else if (!IsMemoryIntensiveOperator(node, context->plannedStmt)) |
| { |
| planNode->operatorMemKB = nonMemIntenseOpMemKB; |
| } |
| else |
| { |
| /* |
| * Evenly distribute the remaining memory among all memory-intensive |
| * operators. |
| */ |
| uint64 memKB = ComputeAvgMemKBForMemIntenseOp(context->groupNode); |
| |
| Assert(planNode->operatorMemKB == 0); |
| |
| planNode->operatorMemKB = memKB; |
| |
| OperatorGroupNode *parentGroupNode = GetParentOperatorGroup(context->groupNode); |
| |
| /* |
| * If this is the root node in the group, we also calculate the memory |
| * for this node as it appears in the parent group. The final memory limit |
| * for this node is the minimal value of the two. |
| */ |
| if (IsRootOperatorInGroup(node) && |
| parentGroupNode != NULL) |
| { |
| uint64 memKBInParentGroup = ComputeAvgMemKBForMemIntenseOp(parentGroupNode); |
| |
| if (memKBInParentGroup < planNode->operatorMemKB) |
| { |
| planNode->operatorMemKB = memKBInParentGroup; |
| } |
| } |
| } |
| } |
| |
| bool result = plan_tree_walker(node, PolicyEagerFreeAssignWalker, context); |
| Assert(!result); |
| |
| /* |
| * If this node is the root in a group, we reset some values in the context. |
| */ |
| if (IsRootOperatorInGroup(node)) |
| { |
| Assert(context->groupNode != NULL); |
| context->groupNode = GetParentOperatorGroup(context->groupNode); |
| } |
| |
| return result; |
| } |
| |
| /* |
| * AssignOperatorMemoryKB |
| * Main entry point for memory quota OPTIMIZE. This function distributes the memory |
| * among all operators in a more optimized way than the AUTO policy. |
| * |
| * This function considers not all memory-intensive operators will be active concurrently, |
| * and distributes the memory accordingly. |
| */ |
| void |
| AssignOperatorMemoryKB(PlannedStmt *stmt, uint64 memAvailableBytes) |
| { |
| PolicyEagerFreeContext ctx; |
| exec_init_plan_tree_base(&ctx.base, stmt); |
| ctx.groupTree = NULL; |
| ctx.groupNode = NULL; |
| ctx.nextGroupId = 0; |
| ctx.queryMemKB = memAvailableBytes / 1024; |
| ctx.plannedStmt = stmt; |
| |
| /* If it is a planner's plan for parquet insert, we need to reserve |
| * the memory before traverse the plan nodes. Because unlike ORCA, |
| * insert is not in the plan nodes. So we compute the memory required |
| * for parquet table insert and subtract it from the available query memory. |
| */ |
| if (PLANGEN_PLANNER == stmt->planGen && CMD_INSERT == stmt->commandType) |
| { |
| uint64 memRequiredForParquetInsert = MemoryReservedForParquetInsertForPlannerPlan(stmt); |
| uint64 memRequiredForParquetInsertKB = (uint64) ( (double) memRequiredForParquetInsert / 1024); |
| if (ctx.queryMemKB <= memRequiredForParquetInsertKB) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INSUFFICIENT_RESOURCES), |
| errmsg("insufficient memory reserved for statement"), |
| errhint("Increase statement memory for parquet table insert."))); |
| } |
| ctx.queryMemKB -= memRequiredForParquetInsertKB; |
| } |
| |
| #ifdef USE_ASSERT_CHECKING |
| bool result = |
| #endif |
| PolicyEagerFreePrelimWalker((Node *) stmt->planTree, &ctx); |
| |
| Assert(!result); |
| |
| /* |
| * Reset groupNode and nextGroupId so that we can start from the |
| * beginning of the group tree. |
| */ |
| ctx.groupNode = NULL; |
| ctx.nextGroupId = 0; |
| |
| /* |
| * Check if memory exceeds the limit in the root group |
| */ |
| const uint64 nonMemIntenseOpMemKB = (uint64)gp_resqueue_memory_policy_auto_fixed_mem; |
| if (ctx.groupTree->groupMemKB < ctx.groupTree->numNonMemIntenseOps * nonMemIntenseOpMemKB |
| + ctx.groupTree->parquetOpReservedMemKB) |
| { |
| if(ctx.groupTree->parquetOpReservedMemKB > 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INSUFFICIENT_RESOURCES), |
| errmsg("insufficient memory reserved for statement"), |
| errhint("Increase statement memory or reduce the number of Parquet tables to be scanned."))); |
| } |
| else |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INSUFFICIENT_RESOURCES), |
| errmsg("insufficient memory reserved for statement"))); |
| } |
| } |
| |
| #ifdef USE_ASSERT_CHECKING |
| result = |
| #endif |
| PolicyEagerFreeAssignWalker((Node *) stmt->planTree, &ctx); |
| |
| Assert(!result); |
| } |
| |
| |
| |