| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * workfile_mgr.c |
| * Implementation of workfile manager and workfile caching. |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include <postgres.h> |
| #include <unistd.h> |
| #include <sys/stat.h> |
| |
| #include "catalog/pg_proc.h" |
| #include "cdb/cdbvars.h" |
| #include "cdb/cdbsrlz.h" |
| #include "libpq/libpq.h" |
| #include "miscadmin.h" |
| #include "nodes/print.h" |
| #include "optimizer/walkers.h" |
| #include "postmaster/primary_mirror_mode.h" |
| #include "utils/atomic.h" |
| #include "utils/builtins.h" |
| #include "utils/debugbreak.h" |
| #include "utils/faultinjector.h" |
| #include "utils/lsyscache.h" |
| #include "utils/memutils.h" |
| #include "utils/workfile_mgr.h" |
| |
/*
 * Placeholder suffix appended to the workset directory name. The resulting
 * path is handed to gp_mkdtemp() when the directory is created.
 */
#define WORKFILE_SET_MASK  "XXXXXXXXXX"

/* Kind of temporary file used to store the plan */
#define WORKFILE_PLAN_FILE_TYPE BUFFILE
| |
| /* Information needed to populate a new workfile_set structure */ |
| typedef struct workset_info |
| { |
| ExecWorkFileType file_type; |
| workfile_set_snapshot snapshot; |
| NodeTag nodeType; |
| TimestampTz session_start_time; |
| uint64 operator_work_mem; |
| char *dir_path; |
| bool can_be_reused; |
| bool on_disk; |
| } workset_info; |
| |
| /* Forward declarations */ |
| static workfile_set *workfile_mgr_lookup_set(PlanState *ps); |
| static bool workfile_mgr_is_reusable(PlanState *ps); |
| static bool workfile_mgr_is_cacheable_plan(PlanState *ps); |
| static workfile_set_hashkey_t workfile_mgr_hash_key(workfile_set_plan *plan); |
| static workfile_set_plan *workfile_mgr_serialize_plan(PlanState *ps); |
| static void workfile_mgr_free_plan(workfile_set_plan *splan); |
| static void workfile_mgr_save_plan(workfile_set *work_set, workfile_set_plan *sf_plan); |
| static bool workfile_set_equivalent(const void *virtual_resource, const void *physical_resource); |
| static bool workfile_mgr_compare_plan(workfile_set *work_set, workfile_set_plan *sf_plan); |
| static void workfile_mgr_populate_set(const void *resource, const void *param); |
| static void workfile_mgr_cleanup_set(const void *resource); |
| static void workfile_mgr_delete_set_directory(char *workset_path); |
| static void workfile_mgr_unlink_directory(const char *dirpath); |
| static StringInfo get_name_from_nodeType(const NodeTag node_type); |
| static uint64 get_operator_work_mem(PlanState *ps); |
| static CdbVisitOpt PlanNonCacheableWalker(PlanState *ps, void *context); |
| static bool ExprNonCacheableWalker(Node *expr, void *ctx); |
| static bool isFuncCacheable(Oid fn_oid); |
| static CacheEntry *acquire_entry_retry(Cache *cache, workset_info *populate_param); |
| static char *create_workset_directory(NodeTag node_type, int slice_id); |
| |
| |
| /* Workfile manager cache is stored here, once attached to */ |
| Cache *workfile_mgr_cache = NULL; |
| |
| /* Workfile error type */ |
| WorkfileError workfileError = WORKFILE_ERROR_UNKNOWN; |
| |
| /* |
| * Initialize the cache in shared memory, or attach to an existing one |
| * |
| */ |
| void |
| workfile_mgr_cache_init(void) |
| { |
| CacheCtl cacheCtl; |
| MemSet(&cacheCtl, 0, sizeof(CacheCtl)); |
| |
| cacheCtl.maxSize = gp_workfile_max_entries; |
| cacheCtl.cacheName = "Workfile Manager Cache"; |
| cacheCtl.entrySize = sizeof(workfile_set); |
| cacheCtl.keySize = sizeof(((workfile_set *)0)->key); |
| cacheCtl.keyOffset = GPDB_OFFSET(workfile_set, key); |
| |
| cacheCtl.hash = int32_hash; |
| cacheCtl.keyCopy = (HashCopyFunc) memcpy; |
| cacheCtl.match = (HashCompareFunc) memcmp; |
| cacheCtl.equivalentEntries = workfile_set_equivalent; |
| cacheCtl.cleanupEntry = workfile_mgr_cleanup_set; |
| cacheCtl.populateEntry = workfile_mgr_populate_set; |
| |
| cacheCtl.baseLWLockId = FirstWorkfileMgrLock; |
| cacheCtl.numPartitions = NUM_WORKFILEMGR_PARTITIONS; |
| |
| workfile_mgr_cache = Cache_Create(&cacheCtl); |
| Assert(NULL != workfile_mgr_cache); |
| |
| /* |
| * Initialize the WorkfileDiskspace and WorkfileQueryspace APIs |
| * to track disk space usage |
| */ |
| WorkfileDiskspace_Init(); |
| } |
| |
| /* |
| * Returns pointer to the workfile manager cache |
| */ |
| Cache * |
| workfile_mgr_get_cache(void) |
| { |
| Assert(NULL != workfile_mgr_cache); |
| return workfile_mgr_cache; |
| } |
| |
| /* |
| * compute the size of shared memory for the workfile manager |
| */ |
| Size |
| workfile_mgr_shmem_size(void) |
| { |
| return Cache_SharedMemSize(gp_workfile_max_entries, sizeof(workfile_set)) + |
| WorkfileDiskspace_ShMemSize() + WorkfileQueryspace_ShMemSize(); |
| } |
| |
| |
| |
| /* |
| * Retrieves the operator name. |
| * Result is palloc-ed in the current memory context. |
| */ |
| static StringInfo |
| get_name_from_nodeType(const NodeTag node_type) |
| { |
| StringInfo operator_name = makeStringInfo(); |
| |
| switch ( node_type ) |
| { |
| case T_AggState: |
| appendStringInfoString(operator_name,"Agg"); |
| break; |
| case T_HashJoinState: |
| appendStringInfoString(operator_name,"HashJoin"); |
| break; |
| case T_MaterialState: |
| appendStringInfoString(operator_name,"Material"); |
| break; |
| case T_SortState: |
| appendStringInfoString(operator_name,"Sort"); |
| break; |
| case T_Invalid: |
| /* When spilling from a builtin function, we don't have a valid node type */ |
| appendStringInfoString(operator_name,"BuiltinFunction"); |
| break; |
| default: |
| Assert(false && "Operator not supported by the workfile manager"); |
| } |
| |
| return operator_name; |
| } |
| |
| /* |
| * Create a new file set |
| * type is the WorkFileType for the files: BUFFILE or BFZ |
| * can_be_reused: if set to false, then we don't insert this set into the cache, |
| * since the caller is telling us there is no point. This can happen for |
| * example when spilling during index creation. |
| * ps is the PlanState for the subtree rooted at the operator |
| * snapshot contains snapshot information for the current transaction |
| * |
| */ |
| workfile_set * |
| workfile_mgr_create_set(enum ExecWorkFileType type, bool can_be_reused, PlanState *ps, workfile_set_snapshot snapshot) |
| { |
| Assert(NULL != workfile_mgr_cache); |
| |
| Plan *plan = NULL; |
| if (ps != NULL) |
| { |
| plan = ps->plan; |
| } |
| |
| AssertImply(can_be_reused, plan != NULL); |
| |
| NodeTag node_type = T_Invalid; |
| if (ps != NULL) |
| { |
| node_type = ps->type; |
| } |
| char *dir_path = create_workset_directory(node_type, currentSliceId); |
| |
| /* Create parameter info for the populate function */ |
| workset_info set_info; |
| set_info.file_type = type; |
| set_info.snapshot = snapshot; |
| set_info.nodeType = node_type; |
| set_info.can_be_reused = can_be_reused && workfile_mgr_is_reusable(ps); |
| set_info.dir_path = dir_path; |
| set_info.session_start_time = GetCurrentTimestamp(); |
| set_info.operator_work_mem = get_operator_work_mem(ps); |
| set_info.on_disk = true; |
| |
| CacheEntry *newEntry = NULL; |
| |
| PG_TRY(); |
| { |
| newEntry = acquire_entry_retry(workfile_mgr_cache, &set_info); |
| } |
| PG_CATCH(); |
| { |
| /* Failed to acquire new entry, cache full. Clean up the directory we created. */ |
| workfile_mgr_delete_set_directory(dir_path); |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| /* Path has now been copied to the workfile_set. We can free it */ |
| pfree(dir_path); |
| |
| /* Complete initialization of the entry with post-acquire actions */ |
| Assert(NULL != newEntry); |
| workfile_set *work_set = CACHE_ENTRY_PAYLOAD(newEntry); |
| Assert(work_set != NULL); |
| if (work_set->can_be_reused) |
| { |
| Assert(plan != NULL); |
| Assert(nodeTag(plan) >= T_Plan && nodeTag(plan) < T_PlanInvalItem); |
| |
| workfile_set_plan *s_plan = workfile_mgr_serialize_plan(ps); |
| work_set->key = workfile_mgr_hash_key(s_plan); |
| workfile_mgr_save_plan(work_set, s_plan); |
| workfile_mgr_free_plan(s_plan); |
| } |
| |
| elog(gp_workfile_caching_loglevel, "new spill file set. key=0x%x can_be_reused=%d prefix=%s opMemKB=" INT64_FORMAT, |
| work_set->key, work_set->can_be_reused, work_set->path, work_set->metadata.operator_work_mem); |
| |
| return work_set; |
| } |
| |
| /* |
| * Creates the workset directory and returns the path. |
| * Throws an error if path or directory cannot be created. |
| * |
| * Returns the name of the directory created. |
| * The name returned is palloc-ed in the current memory context. |
| * |
| */ |
| static char * |
| create_workset_directory(NodeTag node_type, int slice_id) |
| { |
| /* Create base directory here. We need database relative path */ |
| StringInfo tmp_dirpath = makeStringInfo(); |
| |
| appendStringInfo(tmp_dirpath, |
| "%s/%s", |
| getCurrentTempFilePath, |
| PG_TEMP_FILES_DIR); |
| |
| if (tmp_dirpath->len > MAXPGPATH) |
| { |
| ereport(ERROR, (errmsg("cannot generate path %s/%s", |
| getCurrentTempFilePath, |
| PG_TEMP_FILES_DIR))); |
| } |
| |
| mkdir(tmp_dirpath->data, S_IRWXU); |
| pfree(tmp_dirpath->data); |
| pfree(tmp_dirpath); |
| |
| /* Create workset directory here */ |
| StringInfo operator_name = get_name_from_nodeType(node_type); |
| StringInfo workset_path_masked = makeStringInfo(); |
| |
| appendStringInfo(workset_path_masked, |
| "%s/%s/%s_%s_Slice%d.%s", |
| getCurrentTempFilePath, |
| PG_TEMP_FILES_DIR, |
| WORKFILE_SET_PREFIX, |
| operator_name->data, |
| slice_id, |
| WORKFILE_SET_MASK); |
| |
| if (workset_path_masked->len > MAXPGPATH) |
| { |
| ereport(ERROR, (errmsg("cannot generate path %s/%s/%s_%s_Slice%d.%s", |
| getCurrentTempFilePath, |
| PG_TEMP_FILES_DIR, |
| WORKFILE_SET_PREFIX, |
| operator_name->data, |
| slice_id, |
| WORKFILE_SET_MASK))); |
| } |
| |
| char *workset_path_unmasked = gp_mkdtemp(workset_path_masked->data); |
| if (workset_path_unmasked == NULL) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("could not create spill file directory: %s", workset_path_masked->data))); |
| } |
| |
| char *final_path = (char *) palloc0(MAXPGPATH); |
| |
| /* Initialize result path. Strip prefix from path since bfz/fd add the getCurrentTempFilePath to it */ |
| strncpy(final_path, |
| workset_path_unmasked + strlen(getCurrentTempFilePath) + 1, |
| MAXPGPATH); |
| |
| if ( strlen(workset_path_unmasked + strlen(getCurrentTempFilePath) + 1) |
| > MAXPGPATH ) |
| { |
| ereport(ERROR, (errmsg("cannot generate path %s", |
| workset_path_unmasked + strlen(getCurrentTempFilePath) + 1))); |
| } |
| |
| pfree(workset_path_masked->data); |
| pfree(workset_path_masked); |
| pfree(operator_name->data); |
| pfree(operator_name); |
| |
| return final_path; |
| } |
| |
| /* |
| * SharedCache callback. Populates a newly acquired workfile_set before |
| * returning it to the caller. |
| */ |
| static void |
| workfile_mgr_populate_set(const void *resource, const void *param) |
| { |
| Assert(NULL != resource); |
| Assert(NULL != param); |
| |
| workfile_set *work_set = (workfile_set *) resource; |
| workset_info *set_info = (workset_info *) param; |
| |
| work_set->metadata.operator_work_mem = set_info->operator_work_mem; |
| work_set->set_plan = NULL; |
| |
| if (!set_info->on_disk) |
| { |
| /* This is for a "virtual" work_set, used for look-ups. No need to populate further */ |
| Assert(NULL == set_info->dir_path); |
| work_set->on_disk = false; |
| } |
| else |
| { |
| |
| work_set->complete = false; |
| work_set->no_files = 0; |
| work_set->size = 0L; |
| work_set->in_progress_size = 0L; |
| work_set->node_type = set_info->nodeType; |
| work_set->metadata.type = set_info->file_type; |
| work_set->metadata.bfz_compress_type = gp_workfile_compress_algorithm; |
| work_set->metadata.snapshot = set_info->snapshot; |
| work_set->metadata.num_leaf_files = 0; |
| work_set->slice_id = currentSliceId; |
| work_set->session_id = gp_session_id; |
| work_set->command_count = gp_command_count; |
| work_set->session_start_time = set_info->session_start_time; |
| |
| /* If workfile caching is disabled, nothing should be re-used, so override whatever the caller says */ |
| work_set->can_be_reused = gp_workfile_caching && set_info->can_be_reused; |
| |
| Assert(strlen(set_info->dir_path) < MAXPGPATH); |
| strncpy(work_set->path, set_info->dir_path, MAXPGPATH); |
| work_set->on_disk = true; |
| } |
| } |
| |
| /* |
| * Determine operatorMemKB for this operator. |
| * For HashJoin, this is given by the right child, for everyone else it is the actual node. |
| * |
| * If PlanState is NULL (e.g. when spilling from a built-in function), return 0. |
| */ |
| static uint64 |
| get_operator_work_mem(PlanState *ps) |
| { |
| if (NULL == ps) |
| { |
| return 0; |
| } |
| |
| PlanState *psOp = ps; |
| if (IsA(ps,HashJoinState)) |
| { |
| Assert(IsA(ps->righttree, HashState)); |
| psOp = ps->righttree; |
| } |
| |
| return PlanStateOperatorMemKB(psOp); |
| } |
| |
| /* |
| * Look up file set in cache given a certain plan. Check if it's usable |
| * for the current query. If not found, or not re-usable, return NULL. |
| */ |
| workfile_set * |
| workfile_mgr_find_set(PlanState *ps) |
| { |
| Assert(NULL != ps); |
| |
| /* Check if there is any point in looking this up in the cache. Some subplans are just not reusable */ |
| if (!workfile_mgr_is_reusable(ps)) |
| { |
| return NULL; |
| } |
| |
| /* Look up plan in cache */ |
| workfile_set *work_set = workfile_mgr_lookup_set(ps); |
| if (work_set == NULL) |
| { |
| return NULL; |
| } |
| |
| /* Check to see if we can reuse what we found */ |
| if (workfile_mgr_can_reuse(work_set, ps)) |
| { |
| return work_set; |
| } |
| else |
| { |
| elog(gp_workfile_caching_loglevel, "Found matching work_set but cannot reuse"); |
| return NULL; |
| } |
| } |
| |
| /* |
| * Check to see if a plan is can ever generate reusable workfiles. |
| * For example, we cannot store workfiles for queries containing parameters |
| * or external table scans. |
| */ |
| static bool |
| workfile_mgr_is_reusable(PlanState *ps) |
| { |
| Assert(NULL != ps); |
| |
| Plan *plan = ps->plan; |
| |
| /* Don't allow caching of workfiles for parameterized queries */ |
| bool parameterized = (plan->allParam != NULL) || (plan->extParam != NULL); |
| if (parameterized) |
| { |
| elog(gp_workfile_caching_loglevel, "Parameterized plan not considered for workfile caching"); |
| return false; |
| } |
| |
| bool cacheable_plan = workfile_mgr_is_cacheable_plan(ps); |
| if (!cacheable_plan) |
| { |
| elog(gp_workfile_caching_loglevel, "Plan not considered for workfile caching"); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /* |
| * Walker function to test if a subtree plan contains any operators that cannot |
| * be cached: |
| * - node evaluates a non-immutable function |
| * - node has an expression in the target or qual list that evaluates |
| * a non-immutable function |
| * - external table scan |
| * - share input scan (because of synchronization issues) |
| * |
| * Returns CdbVisit_Failure if it finds an offending node |
| */ |
| static CdbVisitOpt |
| PlanNonCacheableWalker(PlanState *ps, |
| void *context) |
| { |
| if (IsA(ps, FunctionScanState)) |
| { |
| Expr *fn_expr = ((FunctionScanState *) ps)->funcexpr->expr; |
| /* |
| * ExecMakeTableFunctionResult() says that the funcexpr associated |
| * with this node can be either FuncExpr or a generic Expr. |
| * Use the generic expression walker to look for non-cacheable |
| * functions, as it supports both cases. |
| */ |
| if (ExprNonCacheableWalker((Node *) fn_expr, NULL /* ctx */)) |
| { |
| return CdbVisit_Failure; |
| } |
| } |
| |
| if (IsA(ps, TableFunctionState)) |
| { |
| Oid fn_oid = ((TableFunctionState *) ps)->fcache->func.fn_oid; |
| if (!isFuncCacheable(fn_oid)) |
| { |
| return CdbVisit_Failure; |
| } |
| } |
| |
| if (IsA(ps, ExternalScanState) || IsA(ps,ShareInputScanState)) |
| { |
| return CdbVisit_Failure; |
| } |
| |
| /* Check qual and target list of the node for any non-cacheable functions */ |
| List *qual = ps->plan->qual; |
| List *tlist = ps->plan->targetlist; |
| if (ExprNonCacheableWalker((Node *) tlist, NULL /* ctx */) || |
| ExprNonCacheableWalker((Node *) qual, NULL /* ctx */)) |
| { |
| return CdbVisit_Failure; |
| } |
| |
| /* Continue walk */ |
| return CdbVisit_Walk; |
| } |
| |
| |
| /* |
| * Walker function to check if an expression list can be cached. |
| * This is used for both the projection and qual lists. |
| * |
| * Returns true if any non-cacheable functions are found, false otherwise. |
| */ |
| static bool |
| ExprNonCacheableWalker(Node *expr, void *ctx) |
| { |
| Assert(ctx == NULL); |
| |
| if (expr == NULL) |
| { |
| return false; |
| } |
| else if (IsA(expr, FuncExpr)) |
| { |
| Oid fn_oid = ((FuncExpr *) expr)->funcid; |
| if (!isFuncCacheable(fn_oid)) |
| { |
| /* Found expression using non-cacheable function. We're done, end walker */ |
| return true; |
| } |
| } |
| |
| return expression_tree_walker(expr, ExprNonCacheableWalker, ctx); |
| } |
| |
| /* |
| * Checks if the results of a function can be cached. |
| * Only IMMUTABLE functions can be cached. |
| * |
| * Returns true if function results can be cached, false otherwise. |
| */ |
| static bool |
| isFuncCacheable(Oid fn_oid) |
| { |
| char fn_provolatile = func_volatile(fn_oid); |
| return (fn_provolatile == PROVOLATILE_IMMUTABLE); |
| } |
| |
| /* |
| * Check if a plan contains any operators that cannot be cached |
| * |
| * Returns true if subplan can be cached, false otherwise |
| */ |
| static bool |
| workfile_mgr_is_cacheable_plan(PlanState *ps) |
| { |
| Assert(NULL != ps); |
| |
| CdbVisitOpt status = planstate_walk_node(ps, PlanNonCacheableWalker, NULL); |
| |
| /* status == CdbVisit_Failure means we found a volatile node */ |
| return (status != CdbVisit_Failure); |
| } |
| |
| /* |
| * Look up file set the cache given a certain PlanState. |
| * Return NULL if not found. |
| */ |
| static workfile_set * |
| workfile_mgr_lookup_set(PlanState *ps) |
| { |
| Assert(NULL != ps); |
| Assert(NULL != workfile_mgr_cache); |
| Assert(NULL != ps->plan); |
| Assert(nodeTag(ps->plan) >= T_Plan && nodeTag(ps->plan) < T_PlanInvalItem); |
| |
| /* Create parameter info for the populate function */ |
| workset_info set_info; |
| set_info.dir_path = NULL; |
| set_info.operator_work_mem = get_operator_work_mem(ps); |
| set_info.on_disk = false; |
| |
| CacheEntry *localEntry = acquire_entry_retry(workfile_mgr_cache, &set_info); |
| Assert(localEntry != NULL); |
| |
| workfile_set *local_work_set = (workfile_set *) CACHE_ENTRY_PAYLOAD(localEntry); |
| |
| /* Populate the rest of the entries needed for look-up |
| * Allocate the serialized plan in the TopMemoryContext since this memory |
| * context is still available when calling the transaction callback at the |
| * time when the transaction aborts. |
| */ |
| MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext); |
| workfile_set_plan *s_plan = workfile_mgr_serialize_plan(ps); |
| MemoryContextSwitchTo(oldcxt); |
| |
| Assert(s_plan != NULL); |
| local_work_set->set_plan = s_plan; |
| local_work_set->key = workfile_mgr_hash_key(s_plan); |
| |
| CacheEntry *cachedEntry = Cache_Lookup(workfile_mgr_cache, localEntry); |
| |
| /* Release local entry and free up plan memory. We don't need it anymore */ |
| Cache_Release(workfile_mgr_cache, localEntry); |
| |
| workfile_set *work_set = NULL; |
| if (NULL != cachedEntry) |
| { |
| work_set = (workfile_set *) CACHE_ENTRY_PAYLOAD(cachedEntry); |
| } |
| |
| return work_set; |
| } |
| |
| /* |
| * Acquire an entry from the cache. If the cache is full (reached gp_workfile_max_entries), |
| * trigger evictions and try again. |
| * If the cache remains full after max_retries, give up and error out. |
| * |
| * populate_param is the parameter to be passed to Cache_AcquireEntry. It |
| * will be used to populate the entry before being returned. |
| */ |
| static CacheEntry * |
| acquire_entry_retry(Cache *cache, workset_info *populate_param) |
| { |
| CacheEntry *localEntry = Cache_AcquireEntry(cache, populate_param); |
| |
| int crt_retry = 0; |
| while (NULL == localEntry && crt_retry < MAX_EVICT_ATTEMPTS) |
| { |
| /* |
| * We reached maximum number of entries in the cache. Evict something |
| * then try again. |
| */ |
| int64 size_evicted = workfile_mgr_evict(MIN_EVICT_SIZE); |
| elog(gp_workfile_caching_loglevel, "Hit cache entries full, evicted " INT64_FORMAT " bytes", size_evicted); |
| |
| localEntry = Cache_AcquireEntry(cache, populate_param); |
| crt_retry++; |
| } |
| |
| if (NULL == localEntry) |
| { |
| /* Could not acquire another entry from the cache - we filled it up */ |
| elog(ERROR, "could not create workfile manager entry: exceeded number of concurrent spilling queries"); |
| } |
| |
| return localEntry; |
| } |
| |
| /* |
| * Thoroughly checks if an existing workfile_set can be used for the current |
| * subplan. |
| */ |
| bool |
| workfile_mgr_can_reuse(workfile_set *work_set, PlanState *ps) |
| { |
| Assert(NULL != work_set); |
| Assert(NULL != ps); |
| |
| uint64 operatorMemKB = get_operator_work_mem(ps); |
| Assert(operatorMemKB > 0); |
| |
| if (operatorMemKB < work_set->metadata.operator_work_mem) |
| { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /* |
| * Clears entire contents of workfile cache |
| * |
| * If seg_id == UNDEF_SEGMENT run on all segments, otherwise run only |
| * on segment seg_id. |
| * |
| * Returns the number of entries removed |
| */ |
| int32 |
| workfile_mgr_clear_cache(int seg_id) |
| { |
| int no_cleared = 0; |
| if (seg_id == UNDEF_SEGMENT || GetQEIndex() == seg_id) |
| { |
| Cache *cache = workfile_mgr_get_cache(); |
| no_cleared = Cache_Clear(cache); |
| } |
| |
| return no_cleared; |
| } |
| |
| /* |
| * Physically delete a spill set. Path must not include database prefix. |
| */ |
| static void |
| workfile_mgr_delete_set_directory(char *workset_path) |
| { |
| /* Add filespace prefix to path */ |
| char *reldirpath = (char*)palloc(PATH_MAX); |
| if (snprintf(reldirpath, PATH_MAX, "%s/%s", getCurrentTempFilePath, workset_path) > PATH_MAX) |
| { |
| ereport(ERROR, (errmsg("cannot generate path %s/%s", getCurrentTempFilePath, |
| workset_path))); |
| } |
| |
| Assert(reldirpath != NULL); |
| |
| workfile_mgr_unlink_directory(reldirpath); |
| pfree(reldirpath); |
| } |
| |
| /* |
| * Physically delete a spill file set. Path is assumed to be database relative. |
| */ |
| static void |
| workfile_mgr_unlink_directory(const char *dirpath) |
| { |
| |
| elog(gp_workfile_caching_loglevel, "deleting spill file set directory %s", dirpath); |
| |
| int res = rmtree(dirpath,true); |
| |
| if (!res) |
| { |
| int error_level = ERROR; |
| |
| /* If we are already in an abort transaction, don't throw an exception */ |
| if (IsAbortInProgress()) |
| { |
| error_level = WARNING; |
| } |
| ereport(error_level, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("could not remove spill file directory"))); |
| } |
| |
| } |
| |
| /* |
| * Workfile-manager specific function to clean up before releasing a |
| * workfile set from the cache. |
| * |
| */ |
| static void |
| workfile_mgr_cleanup_set(const void *resource) |
| { |
| workfile_set *work_set = (workfile_set *) resource; |
| |
| /* |
| * We have to make this callback function return cleanly ALL the |
| * time. It shouldn't throw an exception. |
| * We must try to clean up as much as we can in the callback, and |
| * then never be called again. |
| * This means holding interrupts, catching and handling all exceptions. |
| */ |
| |
| if (work_set->on_disk) |
| { |
| ereport(gp_workfile_caching_loglevel, |
| (errmsg("workfile mgr cleanup deleting set: key=0x%0xd, size=" INT64_FORMAT |
| " in_progress_size=" INT64_FORMAT " path=%s", |
| work_set->key, |
| work_set->size, |
| work_set->in_progress_size, |
| work_set->path), |
| errprintstack(true))); |
| |
| Assert(NULL == work_set->set_plan); |
| |
| PG_TRY(); |
| { |
| |
| #ifdef FAULT_INJECTOR |
| FaultInjector_InjectFaultIfSet( |
| WorkfileCleanupSet, |
| DDLNotSpecified, |
| "", /* databaseName */ |
| "" /* tableName */ |
| ); |
| #endif |
| |
| /* Prevent interrupts while cleaning up */ |
| HOLD_INTERRUPTS(); |
| |
| workfile_mgr_delete_set_directory(work_set->path); |
| |
| /* Now we can allow interrupts again */ |
| RESUME_INTERRUPTS(); |
| |
| } |
| PG_CATCH(); |
| { |
| elog(LOG, "Cleaning up workfile set directory path=%s failed. Proceeding", |
| work_set->path); |
| |
| /* We're not re-throwing the error. Otherwise we'll end up having |
| * to clean up again, probably failing again. |
| */ |
| } |
| PG_END_TRY(); |
| |
| /* |
| * The most accurate size of a workset is recorded in work_set->in_progress_size. |
| * work_set->size is only updated when we close a file, so it lags behind |
| */ |
| |
| Assert(work_set->in_progress_size >= work_set->size); |
| int64 size_to_delete = work_set->in_progress_size; |
| |
| elog(gp_workfile_caching_loglevel, "Subtracting " INT64_FORMAT " from workfile diskspace", size_to_delete); |
| |
| /* |
| * When subtracting the size of this workset from our accounting, |
| * only update the per-query counter if we created the workset. |
| * In that case, the state is ACQUIRED, otherwise is CACHED or DELETED |
| */ |
| CacheEntry *cacheEntry = CACHE_ENTRY_HEADER(resource); |
| bool update_query_space = (cacheEntry->state == CACHE_ENTRY_ACQUIRED); |
| |
| WorkfileDiskspace_Commit(0, size_to_delete, update_query_space); |
| } |
| else |
| { |
| /* Non-physical workfile set, we need to free up the plan memory */ |
| if (NULL != work_set->set_plan->serialized_plan) |
| { |
| pfree(work_set->set_plan->serialized_plan); |
| } |
| |
| if (NULL != work_set->set_plan) |
| { |
| pfree(work_set->set_plan); |
| } |
| } |
| } |
| |
| /* |
| * Close a spill file set. If we're planning to re-use it, insert it in the |
| * cache. If not, let the cleanup routine delete the files and free up memory. |
| */ |
| void |
| workfile_mgr_close_set(workfile_set *work_set) |
| { |
| Assert(work_set!=NULL); |
| |
| elog(gp_workfile_caching_loglevel, "closing workfile set: can_be_reused=%d complete=%d location: %s, size=" INT64_FORMAT |
| " in_progress_size=" INT64_FORMAT, |
| work_set->can_be_reused, work_set->complete, work_set->path, |
| work_set->size, work_set->in_progress_size); |
| |
| CacheEntry *cache_entry = CACHE_ENTRY_HEADER(work_set); |
| |
| if (Cache_IsCached(cache_entry)) |
| { |
| /* Workset came from cache. Just release it, nothing to do */ |
| Cache_Release(workfile_mgr_cache, cache_entry); |
| return; |
| } |
| |
| if (work_set->complete && work_set->can_be_reused) |
| { |
| cache_entry->size = work_set->size; |
| |
| /* We want to keep this one around. Insert into cache */ |
| Cache_Insert(workfile_mgr_cache, cache_entry); |
| Cache_Release(workfile_mgr_cache, cache_entry); |
| return; |
| } |
| |
| |
| /* |
| * Fall-through case: We need to delete this work_set, as it's not reusable. |
| */ |
| Assert(!work_set->complete || !work_set->can_be_reused); |
| Cache_Release(workfile_mgr_cache, cache_entry); |
| } |
| |
| /* |
| * Mark a workfile_set as complete. This means it should be cached upon closing, |
| * as it can be re-used. |
| * If the operator is canceled or fails after this, the workfile set can |
| * still be re-used. |
| * This should be done after all the data has been flushed to disk. |
| * After this point, assume anyone can read and re-use this set. |
| */ |
| |
| void workfile_mgr_mark_complete(workfile_set *work_set) |
| { |
| Assert(work_set != NULL); |
| Assert(!work_set->complete); |
| |
| work_set->complete = true; |
| } |
| |
| /* |
| * This function is called at transaction commit or abort to delete closed |
| * workfiles. |
| */ |
| void |
| workfile_mgr_cleanup(void) |
| { |
| Assert(NULL != workfile_mgr_cache); |
| Cache_SurrenderClientEntries(workfile_mgr_cache); |
| } |
| |
| /* |
| * Create a hash value based on a workfile_set_plan's signature. |
| */ |
| static workfile_set_hashkey_t |
| workfile_mgr_hash_key(workfile_set_plan *plan) |
| { |
| int key_len = plan->serialized_plan_len; |
| return tag_hash(plan->serialized_plan, key_len); |
| } |
| |
| /* |
| * Serializes a given plan node for hashing and matching. |
| * The serialized plan is palloc'd in the current memory context. |
| */ |
| static workfile_set_plan * |
| workfile_mgr_serialize_plan(PlanState *ps) |
| { |
| Assert(ps); |
| Plan *plan = ps->plan; |
| workfile_set_plan *splan = NULL; |
| splan = (workfile_set_plan *) palloc0(sizeof(workfile_set_plan)); |
| |
| Assert(nodeTag(plan) >= T_Plan && nodeTag(plan) < T_PlanInvalItem); |
| |
| /* serialize plan, without outputting the variable fields */ |
| outfast_workfile_mgr_init(ps->state->es_range_table); |
| |
| char *serialized_plan = NULL; |
| int plan_len = 0; |
| PG_TRY(); |
| { |
| serialized_plan = nodeToBinaryStringFast(plan, &plan_len); |
| Assert(plan_len > 0); |
| } |
| PG_CATCH(); |
| { |
| outfast_workfile_mgr_end(); |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| outfast_workfile_mgr_end(); |
| |
| Assert(serialized_plan); |
| splan->serialized_plan = serialized_plan; |
| splan->serialized_plan_len = plan_len; |
| |
| return splan; |
| } |
| |
| /* |
| * Free up a workfile manager plan structure. |
| */ |
| static void |
| workfile_mgr_free_plan(workfile_set_plan *sf_plan) |
| { |
| Assert(sf_plan != NULL); |
| pfree(sf_plan->serialized_plan); |
| pfree(sf_plan); |
| } |
| |
| /* |
| * Save the serialized plan to a file in the workfile set. |
| * It will be used to do full plan matching before reusing. |
| */ |
| static void |
| workfile_mgr_save_plan(workfile_set *work_set, workfile_set_plan *sf_plan) |
| { |
| Assert(work_set); |
| Assert(sf_plan); |
| |
| ExecWorkFile *plan_file = workfile_mgr_create_fileno(work_set, WORKFILE_NUM_ALL_PLAN); |
| insist_log(plan_file != NULL, "Could not create temporary work file: %m"); |
| |
| elog(gp_workfile_caching_loglevel, "Saving query plan to file %s", ExecWorkFile_GetFileName(plan_file)); |
| |
| |
| bool res = ExecWorkFile_Write(plan_file, sf_plan->serialized_plan, |
| sf_plan->serialized_plan_len); |
| if(!res) |
| { |
| workfile_mgr_report_error(); |
| } |
| |
| workfile_mgr_close_file(work_set, plan_file, true); |
| } |
| |
| /* |
| * Callback function to test if two cache resources are equivalent. |
| */ |
| static bool |
| workfile_set_equivalent(const void *virtual_resource, const void *physical_resource) |
| { |
| Assert(NULL != virtual_resource); |
| Assert(NULL != physical_resource); |
| |
| workfile_set *virtual_workset = (workfile_set *) virtual_resource; |
| workfile_set *physical_workset = (workfile_set *) physical_resource; |
| |
| if (virtual_workset->key != physical_workset->key) |
| { |
| return false; |
| } |
| |
| if (virtual_workset->metadata.operator_work_mem < physical_workset->metadata.operator_work_mem) |
| { |
| /* |
| * Found a potential match, but the work_mem with which it was spilled |
| * is too high, so we cannot load it with our current work_mem. Skip it. |
| */ |
| return false; |
| } |
| |
| Assert(!virtual_workset->on_disk && physical_workset->on_disk && "comparing two physical or two virtual worksets not supported"); |
| Assert(NULL != virtual_workset->set_plan); |
| |
| return workfile_mgr_compare_plan(physical_workset, virtual_workset->set_plan); |
| |
| } |
| |
| /* |
| * Do a byte-by-byte comparison between a given plan and a saved one. |
| * |
| * Returns true if identical, false otherwise |
| * |
| */ |
| static bool |
| workfile_mgr_compare_plan(workfile_set *work_set, workfile_set_plan *sf_plan) |
| { |
| Assert(NULL != work_set); |
| Assert(NULL != sf_plan); |
| |
| ExecWorkFile *plan_file = workfile_mgr_open_fileno(work_set, WORKFILE_NUM_ALL_PLAN); |
| elog(gp_workfile_caching_loglevel, "Loading and comparing query plan from file %s", |
| ExecWorkFile_GetFileName(plan_file)); |
| |
| if (plan_file == NULL) |
| { |
| elog(gp_workfile_caching_loglevel, "could not open plan file for matching for set %s", |
| work_set->path); |
| return false; |
| } |
| |
| char buffer[BLCKSZ]; |
| uint64 plan_offset = 0; |
| bool match = false; |
| |
| while (true) |
| { |
| uint64 size_read = ExecWorkFile_Read(plan_file, buffer, sizeof(buffer)); |
| |
| if (plan_offset + size_read > sf_plan->serialized_plan_len) |
| { |
| /* Disk plan is larger than new plan. No match */ |
| break; |
| } |
| |
| if (size_read < sizeof(buffer) && |
| plan_offset + size_read < sf_plan->serialized_plan_len) |
| { |
| /* Disk plan is smaller than new plan. No match */ |
| break; |
| } |
| |
| /* We have enough data in memory to compare */ |
| char *plan_pointer = ((char *) sf_plan->serialized_plan ) + plan_offset; |
| if ( memcmp(buffer, plan_pointer, size_read) != 0) |
| { |
| break; |
| } |
| |
| /* Reached the end of both streams, with no miss-match */ |
| if (size_read < sizeof(buffer)) |
| { |
| match = true; |
| break; |
| } |
| plan_offset += size_read; |
| } |
| |
| workfile_mgr_close_file(work_set, plan_file, true); |
| return match; |
| } |
| |
| /* |
| * Runs the eviction algorithm to identify victims and evicts them. It attempts |
| * to evict victims with cumulative size >= desiredSize |
| * |
| * Returns the actual cumulative size of all the sets evicted |
| */ |
| int64 |
| workfile_mgr_evict(int64 size_requested) |
| { |
| Assert(size_requested > 0); |
| Assert(NULL != workfile_mgr_cache); |
| |
| int64 size_evicted = Cache_Evict(workfile_mgr_cache, size_requested); |
| |
| elog(gp_workfile_caching_loglevel, "Eviction: requested=" INT64_FORMAT " evicted=" INT64_FORMAT, |
| size_requested, size_evicted); |
| |
| return size_evicted; |
| } |
| |
| /* |
| * Updates the in-progress size of a workset while it is being created. |
| */ |
| void |
| workfile_update_in_progress_size(ExecWorkFile *workfile, int64 size) |
| { |
| if (NULL != workfile->work_set) |
| { |
| workfile->work_set->in_progress_size += size; |
| Assert(workfile->work_set->in_progress_size >= 0); |
| } |
| } |
| |
| /* |
| * Reports corresponding error message when the query or segment size limit is exceeded. |
| */ |
| void |
| workfile_mgr_report_error(void) |
| { |
| char* message = NULL; |
| |
| switch(workfileError) |
| { |
| case WORKFILE_ERROR_LIMIT_PER_QUERY: |
| message = "workfile per query size limit exceeded"; |
| break; |
| case WORKFILE_ERROR_LIMIT_PER_SEGMENT: |
| message = "workfile per segment size limit exceeded"; |
| break; |
| case WORKFILE_ERROR_LIMIT_FILES_PER_QUERY: |
| message = "number of workfiles per query limit exceeded"; |
| break; |
| default: |
| message = "could not write to temporary file"; |
| break; |
| } |
| |
| ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_RESOURCES), |
| errmsg("%s", message))); |
| } |
| |
| /* EOF */ |