| /*------------------------------------------------------------------------- |
| * |
| * nodeHash.c |
| * Routines to hash relations for hashjoin |
| * |
| * Portions Copyright (c) 2006-2008, Greenplum inc |
| * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates. |
| * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/executor/nodeHash.c |
| * |
| * See note on parallelism in nodeHashjoin.c. |
| * |
| *------------------------------------------------------------------------- |
| */ |
| /* |
| * INTERFACE ROUTINES |
| * MultiExecHash - generate an in-memory hash table of the relation |
| * ExecInitHash - initialize node and subnodes |
| * ExecEndHash - shutdown node and subnodes |
| */ |
| |
| #include "postgres.h" |
| |
| #include <math.h> |
| #include <limits.h> |
| |
| #include "access/hash.h" |
| #include "access/htup_details.h" |
| #include "access/parallel.h" |
| #include "catalog/pg_statistic.h" |
| #include "commands/tablespace.h" |
| #include "executor/execdebug.h" |
| #include "executor/hashjoin.h" |
| #include "executor/nodeHash.h" |
| #include "executor/nodeHashjoin.h" |
| #include "executor/nodeRuntimeFilter.h" |
| #include "miscadmin.h" |
| #include "pgstat.h" |
| #include "port/atomics.h" |
| #include "port/pg_bitutils.h" |
| #include "utils/dynahash.h" |
| #include "utils/guc.h" |
| #include "utils/lsyscache.h" |
| #include "utils/faultinjector.h" |
| #include "utils/syscache.h" |
| |
| #include "cdb/cdbexplain.h" |
| #include "cdb/cdbutil.h" |
| #include "cdb/cdbvars.h" |
| |
| #include "utils/memutils.h" |
| #include "utils/syscache.h" |
| |
| #include "lib/bloomfilter.h" |
| |
| static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); |
| static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); |
| static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable); |
| static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable); |
| static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, |
| int mcvsToUse); |
| static void ExecHashSkewTableInsert(HashState *hashState, |
| HashJoinTable hashtable, |
| TupleTableSlot *slot, |
| uint32 hashvalue, |
| int bucketNumber); |
| static void ExecHashRemoveNextSkewBucket(HashState *hashState, HashJoinTable hashtable); |
| |
| static void ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf); |
| static void |
| ExecHashTableExplainBatches(HashJoinTable hashtable, |
| StringInfo buf, |
| int ibatch_begin, |
| int ibatch_end, |
| const char *title); |
| static void *dense_alloc(HashJoinTable hashtable, Size size); |
| static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, |
| size_t size, |
| dsa_pointer *shared); |
| static void MultiExecPrivateHash(HashState *node); |
| static void MultiExecParallelHash(HashState *node); |
| static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, |
| int bucketno); |
| static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, |
| HashJoinTuple tuple); |
| static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head, |
| HashJoinTuple tuple, |
| dsa_pointer tuple_shared); |
| static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch); |
| static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable); |
| static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable); |
| static void ExecParallelHashRepartitionRest(HashJoinTable hashtable); |
| static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, |
| dsa_pointer *shared); |
| static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, |
| int batchno, |
| size_t size); |
| static void ExecParallelHashMergeCounters(HashJoinTable hashtable); |
| static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); |
| static void AddTupleValuesIntoRF(HashState *node, TupleTableSlot *slot); |
| static void PushdownRuntimeFilter(HashState *node); |
| static void FreeRuntimeFilter(HashState *node); |
| static void ResetRuntimeFilter(HashState *node); |
| |
| /* ---------------------------------------------------------------- |
| * ExecHash |
| * |
| * stub for pro forma compliance |
| * ---------------------------------------------------------------- |
| */ |
| static TupleTableSlot * |
| ExecHash(PlanState *pstate) |
| { |
| elog(ERROR, "Hash node does not support ExecProcNode call convention"); |
| return NULL; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * MultiExecHash |
| * |
| * build hash table for hashjoin, doing partitioning if more |
| * than one batch is required. |
| * ---------------------------------------------------------------- |
| */ |
| Node * |
| MultiExecHash(HashState *node) |
| { |
| bool parallel = node->parallel_state != NULL; |
| |
| /* must provide our own instrumentation support */ |
| if (node->ps.instrument) |
| InstrStartNode(node->ps.instrument); |
| |
| if (node->parallel_state != NULL) |
| MultiExecParallelHash(node); |
| else |
| MultiExecPrivateHash(node); |
| |
| RFBuildFinishCallback(node->rfstate, parallel); |
| |
| /* must provide our own instrumentation support */ |
| if (node->ps.instrument) |
| InstrStopNode(node->ps.instrument, node->hashtable->partialTuples); |
| |
| /* |
| * We do not return the hash table directly because it's not a subtype of |
| * Node, and so would violate the MultiExecProcNode API. Instead, our |
| * parent Hashjoin node is expected to know how to fish it out of our node |
| * state. Ugly but not really worth cleaning up, since Hashjoin knows |
| * quite a bit more about Hash besides that. |
| */ |
| return NULL; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * MultiExecPrivateHash |
| * |
| * parallel-oblivious version, building a backend-private |
| * hash table and (if necessary) batch files. |
| * ---------------------------------------------------------------- |
| */ |
| static void |
| MultiExecPrivateHash(HashState *node) |
| { |
| PlanState *outerNode; |
| List *hashkeys; |
| HashJoinTable hashtable; |
| TupleTableSlot *slot; |
| ExprContext *econtext; |
| uint32 hashvalue; |
| |
| /* |
| * get state info from node |
| */ |
| outerNode = outerPlanState(node); |
| hashtable = node->hashtable; |
| |
| /* |
| * set expression context |
| */ |
| hashkeys = node->hashkeys; |
| econtext = node->ps.ps_ExprContext; |
| |
| SIMPLE_FAULT_INJECTOR("multi_exec_hash_large_vmem"); |
| |
| /* |
| * Get all tuples from the node below the Hash node and insert into the |
| * hash table (or temp files). |
| */ |
| for (;;) |
| { |
| slot = ExecProcNode(outerNode); |
| if (TupIsNull(slot)) |
| { |
| if (gp_enable_runtime_filter_pushdown && node->filters) |
| PushdownRuntimeFilter(node); |
| break; |
| } |
| |
| if (gp_enable_runtime_filter_pushdown && node->filters) |
| AddTupleValuesIntoRF(node, slot); |
| |
| /* We have to compute the hash value */ |
| econtext->ecxt_outertuple = slot; |
| bool hashkeys_null = false; |
| if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys, |
| false, hashtable->keepNulls, |
| &hashvalue, &hashkeys_null)) |
| { |
| int bucketNumber; |
| |
| bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue); |
| if (bucketNumber != INVALID_SKEW_BUCKET_NO) |
| { |
| /* It's a skew tuple, so put it into that hash table */ |
| ExecHashSkewTableInsert(node, hashtable, slot, hashvalue, |
| bucketNumber); |
| hashtable->skewTuples += 1; |
| } |
| else |
| { |
| /* Not subject to skew optimization, so insert normally */ |
| ExecHashTableInsert(node, hashtable, slot, hashvalue); |
| } |
| hashtable->totalTuples += 1; |
| } |
| |
| if (hashkeys_null) |
| { |
| node->hs_hashkeys_null = true; |
| if (node->hs_quit_if_hashkeys_null) |
| { |
| ExecSquelchNode(outerNode, false); |
| return; |
| } |
| } |
| } |
| |
| /* Now we have set up all the initial batches & primary overflow batches. */ |
| hashtable->nbatch_outstart = hashtable->nbatch; |
| |
| /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ |
| if (hashtable->nbuckets != hashtable->nbuckets_optimal) |
| ExecHashIncreaseNumBuckets(hashtable); |
| |
| /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ |
| hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); |
| if (hashtable->spaceUsed > hashtable->spacePeak) |
| hashtable->spacePeak = hashtable->spaceUsed; |
| |
| hashtable->partialTuples = hashtable->totalTuples; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * MultiExecParallelHash |
| * |
| * parallel-aware version, building a shared hash table and |
| * (if necessary) batch files using the combined effort of |
| * a set of co-operating backends. |
| * |
| * node: the Hash node state. node->hashtable->parallel_state is |
| * dereferenced unconditionally, so it must be set. |
| * |
| * Returns nothing. The function returns early, with |
| * node->hs_hashkeys_null set, when pstate->phs_lasj_has_null is |
| * observed after the build step; otherwise it refreshes the |
| * backend-local curbatch, nbuckets, log2_nbuckets and totalTuples |
| * from the shared state before returning. |
| * ---------------------------------------------------------------- |
| */ |
| static void |
| MultiExecParallelHash(HashState *node) |
| { |
| ParallelHashJoinState *pstate; |
| PlanState *outerNode; |
| List *hashkeys; |
| HashJoinTable hashtable; |
| TupleTableSlot *slot; |
| ExprContext *econtext; |
| uint32 hashvalue; |
| Barrier *build_barrier; |
| int i; |
| |
| /* |
| * get state info from node |
| */ |
| outerNode = outerPlanState(node); |
| hashtable = node->hashtable; |
| |
| /* |
| * set expression context |
| */ |
| hashkeys = node->hashkeys; |
| econtext = node->ps.ps_ExprContext; |
| |
| /* |
| * Synchronize the parallel hash table build. At this stage we know that |
| * the shared hash table has been or is being set up by |
| * ExecHashTableCreate(), but we don't know if our peers have returned |
| * from there or are here in MultiExecParallelHash(), and if so how far |
| * through they are. To find out, we check the build_barrier phase and |
| * then jump to the right step in the build algorithm. |
| */ |
| pstate = hashtable->parallel_state; |
| build_barrier = &pstate->build_barrier; |
| Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATE); |
| switch (BarrierPhase(build_barrier)) |
| { |
| case PHJ_BUILD_ALLOCATE: |
| |
| /* |
| * Either I just allocated the initial hash table in |
| * ExecHashTableCreate(), or someone else is doing that. Either |
| * way, wait for everyone to arrive here so we can proceed. |
| */ |
| BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE); |
| /* Fall through. */ |
| |
| case PHJ_BUILD_HASH_INNER: |
| |
| /* |
| * It's time to begin hashing, or if we just arrived here then |
| * hashing is already underway, so join in that effort. While |
| * hashing we have to be prepared to help increase the number of |
| * batches or buckets at any time, and if we arrived here when |
| * that was already underway we'll have to help complete that work |
| * immediately so that it's safe to access batches and buckets |
| * below. |
| */ |
| if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) != |
| PHJ_GROW_BATCHES_ELECT) |
| ExecParallelHashIncreaseNumBatches(hashtable); |
| if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) != |
| PHJ_GROW_BUCKETS_ELECT) |
| ExecParallelHashIncreaseNumBuckets(hashtable); |
| ExecParallelHashEnsureBatchAccessors(hashtable); |
| ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
| for (;;) |
| { |
| bool hashkeys_null = false; |
| |
| /* |
| * CBDB_PARALLEL: the shared flag is already set, so a sibling must |
| * have found a NULL value. Stop pulling tuples from the subplan. |
| */ |
| if (pstate->phs_lasj_has_null) |
| { |
| node->hs_hashkeys_null = true; |
| ExecSquelchNode(outerNode, false); |
| break; |
| } |
| |
| slot = ExecProcNode(outerNode); |
| if (TupIsNull(slot)) |
| break; |
| |
| econtext->ecxt_outertuple = slot; |
| if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys, |
| false, hashtable->keepNulls, |
| &hashvalue, &hashkeys_null)) |
| ExecParallelHashTableInsert(hashtable, slot, hashvalue); |
| hashtable->partialTuples++; /* counted for every tuple read, inserted or not */ |
| |
| if (node->hs_quit_if_hashkeys_null && hashkeys_null) |
| { |
| /* CBDB_PARALLEL: |
| * If we are LASJ and found a NULL value ourselves, or sibling |
| * processes have found NULL values, quit and tell the siblings |
| * to quit if possible. |
| * |
| * phs_lasj_has_null is read and written without a lock, here and |
| * elsewhere. It is treated as an atomic boolean value, and we |
| * want to avoid adding more locks to the HashJoin |
| * implementation. |
| * If other processes miss the flag here while some others set it |
| * at the same time, they just carry on and may see it at the next |
| * hash batch. |
| * If we miss it across all batches, we will learn of it when |
| * PHJ_BUILD_HASH_INNER ends, with the help of build_barrier. |
| * If we never participated in building the hash table, we check |
| * it once the hash table creation job is finished. |
| */ |
| pstate->phs_lasj_has_null = true; |
| pg_write_barrier(); |
| node->hs_hashkeys_null = true; |
| ExecSquelchNode(outerNode, false); |
| break; |
| } |
| } |
| |
| /* CBDB_PARALLEL: No need to flush tuples if phs_lasj_has_null. */ |
| /* |
| * Make sure that any tuples we wrote to disk are visible to |
| * others before anyone tries to load them. |
| */ |
| if (!pstate->phs_lasj_has_null) |
| { |
| for (i = 0; i < hashtable->nbatch; ++i) |
| sts_end_write(hashtable->batches[i].inner_tuples); |
| } |
| |
| /* |
| * Update shared counters. We need an accurate total tuple count |
| * to control the empty table optimization. |
| */ |
| ExecParallelHashMergeCounters(hashtable); |
| |
| BarrierDetach(&pstate->grow_buckets_barrier); |
| BarrierDetach(&pstate->grow_batches_barrier); |
| |
| /* |
| * Wait for everyone to finish building and flushing files and |
| * counters. |
| */ |
| if (BarrierArriveAndWait(build_barrier, |
| WAIT_EVENT_HASH_BUILD_HASH_INNER)) |
| { |
| /* |
| * Elect one backend to disable any further growth. Batches |
| * are now fixed. While building them we made sure they'd fit |
| * in our memory budget when we load them back in later (or we |
| * tried to do that and gave up because we detected extreme |
| * skew). |
| */ |
| pstate->growth = PHJ_GROWTH_DISABLED; |
| /* In case we didn't find null values ourselves. */ |
| if (pstate->phs_lasj_has_null) |
| { |
| node->hs_hashkeys_null = true; |
| return; |
| } |
| } |
| } |
| |
| /* |
| * In case we didn't participate in PHJ_BUILD_HASH_INNER: re-check the |
| * shared NULL flag after a full memory barrier. |
| */ |
| pg_memory_barrier(); |
| if (pstate->phs_lasj_has_null) |
| { |
| node->hs_hashkeys_null = true; |
| return; |
| } |
| |
| /* |
| * We're not yet attached to a batch. We all agree on the dimensions and |
| * number of inner tuples (for the empty table optimization). |
| */ |
| hashtable->curbatch = -1; |
| hashtable->nbuckets = pstate->nbuckets; |
| hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); |
| hashtable->totalTuples = pstate->total_tuples; |
| |
| /* |
| * Unless we're completely done and the batch state has been freed, make |
| * sure we have accessors. |
| */ |
| if (BarrierPhase(build_barrier) < PHJ_BUILD_FREE) |
| ExecParallelHashEnsureBatchAccessors(hashtable); |
| |
| /* |
| * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE |
| * case, which will bring the build phase to PHJ_BUILD_RUN (if it isn't |
| * there already). |
| */ |
| Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER || |
| BarrierPhase(build_barrier) == PHJ_BUILD_RUN || |
| BarrierPhase(build_barrier) == PHJ_BUILD_FREE); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecInitHash |
| * |
| * Init routine for Hash node |
| * ---------------------------------------------------------------- |
| */ |
| HashState * |
| ExecInitHash(Hash *node, EState *estate, int eflags) |
| { |
| HashState *hashstate; |
| |
| /* check for unsupported flags */ |
| Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
| |
| /* |
| * create state structure |
| */ |
| hashstate = makeNode(HashState); |
| hashstate->ps.plan = (Plan *) node; |
| hashstate->ps.state = estate; |
| hashstate->ps.ExecProcNode = ExecHash; |
| hashstate->hashtable = NULL; |
| hashstate->hashkeys = NIL; /* will be set by parent HashJoin */ |
| |
| hashstate->rfstate = NULL; |
| |
| /* |
| * Miscellaneous initialization |
| * |
| * create expression context for node |
| */ |
| ExecAssignExprContext(estate, &hashstate->ps); |
| |
| /* |
| * initialize child nodes |
| */ |
| outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags); |
| |
| /* |
| * initialize our result slot and type. No need to build projection |
| * because this node doesn't do projections. |
| */ |
| ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple); |
| hashstate->ps.ps_ProjInfo = NULL; |
| |
| /* |
| * initialize child expressions |
| */ |
| Assert(node->plan.qual == NIL); |
| hashstate->hashkeys = |
| ExecInitExprList(node->hashkeys, (PlanState *) hashstate); |
| |
| return hashstate; |
| } |
| |
| /* --------------------------------------------------------------- |
| * ExecEndHash |
| * |
| * clean up routine for Hash node |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecEndHash(HashState *node) |
| { |
| PlanState *outerPlan; |
| |
| /* |
| * free exprcontext |
| */ |
| ExecFreeExprContext(&node->ps); |
| |
| /* |
| * shut down the subplan |
| */ |
| outerPlan = outerPlanState(node); |
| ExecEndNode(outerPlan); |
| |
| if (node->filters) |
| FreeRuntimeFilter(node); |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * ExecHashTableCreate |
| * |
| * create an empty hashtable data structure for hashjoin. |
| * |
| * state: Hash node state; supplies the plan node and parallel_state |
| * hjstate: parent HashJoin state, stored in hashtable->hjstate |
| * hashOperators: OIDs of the hash operators, one per hash key |
| * hashCollations: collation OIDs, walked in step with hashOperators |
| * keepNulls: stored in hashtable->keepNulls |
| * operatorMemKB: memory budget handed to ExecChooseHashTableSize() |
| * |
| * Returns the newly allocated hash table control block. Raises an |
| * error if no hash functions can be found for one of the operators. |
| * ---------------------------------------------------------------- |
| */ |
| HashJoinTable |
| ExecHashTableCreate(HashState *state, HashJoinState *hjstate, |
| List *hashOperators, List *hashCollations, |
| bool keepNulls, uint64 operatorMemKB) |
| { |
| Hash *node; |
| HashJoinTable hashtable; |
| Plan *outerNode; |
| size_t space_allowed; |
| int nbuckets; |
| int nbatch; |
| double rows; |
| int num_skew_mcvs; |
| int log2_nbuckets; |
| int nkeys; |
| int i; |
| ListCell *ho; |
| ListCell *hc; |
| MemoryContext oldcxt; |
| |
| /* |
| * Get information about the size of the relation to be hashed (it's the |
| * "outer" subtree of this node, but the inner relation of the hashjoin). |
| * Compute the appropriate size of the hash table. |
| */ |
| node = (Hash *) state->ps.plan; |
| outerNode = outerPlan(node); |
| |
| /* |
| * If this is shared hash table with a partial plan, then we can't use |
| * outerNode->plan_rows to estimate its size. We need an estimate of the |
| * total number of rows across all copies of the partial plan. |
| */ |
| rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows; |
| |
| ExecChooseHashTableSize(rows, outerNode->plan_width, |
| OidIsValid(node->skewTable), |
| operatorMemKB, |
| state->parallel_state != NULL, |
| state->parallel_state != NULL ? |
| state->parallel_state->nparticipants - 1 : 0, |
| &space_allowed, |
| &nbuckets, &nbatch, &num_skew_mcvs); |
| |
| /* nbuckets must be a power of 2 */ |
| log2_nbuckets = my_log2(nbuckets); |
| Assert(nbuckets == (1 << log2_nbuckets)); |
| |
| /* |
| * Initialize the hash table control block. |
| * |
| * The hashtable control block is just palloc'd from the executor's |
| * per-query memory context. Everything else should be kept inside the |
| * subsidiary hashCxt, batchCxt or spillCxt. |
| */ |
| hashtable = palloc_object(HashJoinTableData); |
| hashtable->nbuckets = nbuckets; |
| hashtable->nbuckets_original = nbuckets; |
| hashtable->nbuckets_optimal = nbuckets; |
| hashtable->log2_nbuckets = log2_nbuckets; |
| hashtable->log2_nbuckets_optimal = log2_nbuckets; |
| hashtable->buckets.unshared = NULL; |
| hashtable->keepNulls = keepNulls; |
| hashtable->skewEnabled = false; |
| hashtable->skewBucket = NULL; |
| hashtable->skewBucketLen = 0; |
| hashtable->nSkewBuckets = 0; |
| hashtable->skewBucketNums = NULL; |
| hashtable->nbatch = nbatch; |
| hashtable->curbatch = 0; |
| hashtable->nbatch_original = nbatch; |
| hashtable->nbatch_outstart = nbatch; |
| hashtable->growEnabled = true; |
| hashtable->totalTuples = 0; |
| hashtable->partialTuples = 0; |
| hashtable->skewTuples = 0; |
| hashtable->innerBatchFile = NULL; |
| hashtable->outerBatchFile = NULL; |
| hashtable->spaceUsed = 0; |
| hashtable->spacePeak = 0; |
| hashtable->spaceAllowed = space_allowed; |
| hashtable->spaceUsedSkew = 0; |
| hashtable->spaceAllowedSkew = |
| hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100; |
| hashtable->stats = NULL; |
| hashtable->eagerlyReleased = false; |
| hashtable->hjstate = hjstate; |
| hashtable->first_pass = true; |
| hashtable->work_set = NULL; |
| |
| hashtable->chunks = NULL; |
| hashtable->current_chunk = NULL; |
| hashtable->parallel_state = state->parallel_state; |
| hashtable->area = state->ps.state->es_query_dsa; |
| hashtable->batches = NULL; |
| |
| #ifdef HJDEBUG |
| printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n", |
| hashtable, nbatch, nbuckets); |
| #endif |
| |
| /* |
| * Create temporary memory contexts in which to keep the hashtable working |
| * storage. See notes in executor/hashjoin.h. |
| * |
| * batchCxt and spillCxt are created as children of hashCxt. |
| */ |
| hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext, |
| "HashTableContext", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt, |
| "HashBatchContext", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| /* |
| * CDB: track temp buf file allocations in separate context (created |
| * under CurrentMemoryContext, not under hashCxt) |
| */ |
| hashtable->bfCxt = AllocSetContextCreate(CurrentMemoryContext, |
| "hbbfcxt", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| hashtable->spillCxt = AllocSetContextCreate(hashtable->hashCxt, |
| "HashSpillContext", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| /* Allocate data that will live for the life of the hashjoin */ |
| |
| oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); |
| |
| /* |
| * Get info about the hash functions to be used for each hash key. Also |
| * remember whether the join operators are strict. |
| */ |
| nkeys = list_length(hashOperators); |
| hashtable->outer_hashfunctions = palloc_array(FmgrInfo, nkeys); |
| hashtable->inner_hashfunctions = palloc_array(FmgrInfo, nkeys); |
| hashtable->hashStrict = palloc_array(bool, nkeys); |
| hashtable->collations = palloc_array(Oid, nkeys); |
| i = 0; |
| forboth(ho, hashOperators, hc, hashCollations) |
| { |
| Oid hashop = lfirst_oid(ho); |
| Oid left_hashfn; |
| Oid right_hashfn; |
| |
| if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn)) |
| elog(ERROR, "could not find hash function for hash operator %u", |
| hashop); |
| fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]); |
| fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]); |
| hashtable->hashStrict[i] = op_strict(hashop); |
| hashtable->collations[i] = lfirst_oid(hc); |
| i++; |
| } |
| |
| if (nbatch > 1 && hashtable->parallel_state == NULL) |
| { |
| MemoryContext oldctx; |
| |
| /* |
| * allocate and initialize the file arrays in spillCxt (not needed for |
| * parallel case which uses shared tuplestores instead of raw files) |
| */ |
| oldctx = MemoryContextSwitchTo(hashtable->spillCxt); |
| |
| hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch); |
| hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch); |
| |
| MemoryContextSwitchTo(oldctx); |
| |
| /* The files will not be opened until needed... */ |
| /* ... but make sure we have temp tablespaces established for them */ |
| PrepareTempTablespaces(); |
| } |
| |
| MemoryContextSwitchTo(oldcxt); /* back to the context that was current on entry */ |
| |
| if (hashtable->parallel_state) |
| { |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| Barrier *build_barrier; |
| Barrier *sync_barrier; |
| |
| /* |
| * Attach to the build barrier. The corresponding detach operation is |
| * in ExecHashTableDetach. Note that we won't attach to the |
| * batch_barrier for batch 0 yet. We'll attach later and start it out |
| * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and |
| * then loaded while hashing (the standard hybrid hash join |
| * algorithm), and we'll coordinate that using build_barrier. |
| */ |
| build_barrier = &pstate->build_barrier; |
| sync_barrier = &pstate->sync_barrier; |
| |
| BarrierAttach(build_barrier); |
| |
| if (((Hash *) state->ps.plan)->sync_barrier) |
| BarrierArriveAndWait(sync_barrier, WAIT_EVENT_PARALLEL_FINISH); |
| |
| /* |
| * So far we have no idea whether there are any other participants, |
| * and if so, what phase they are working on. The only thing we care |
| * about at this point is whether someone has already created the |
| * SharedHashJoinBatch objects and the hash table for batch 0. One |
| * backend will be elected to do that now if necessary. |
| */ |
| if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT && |
| BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT)) |
| { |
| pstate->nbatch = nbatch; |
| pstate->space_allowed = space_allowed; |
| pstate->growth = PHJ_GROWTH_OK; |
| |
| /* Set up the shared state for coordinating batches. */ |
| ExecParallelHashJoinSetUpBatches(hashtable, nbatch); |
| |
| /* |
| * Allocate batch 0's hash table up front so we can load it |
| * directly while hashing. |
| */ |
| pstate->nbuckets = nbuckets; |
| ExecParallelHashTableAlloc(hashtable, 0); |
| } |
| |
| /* |
| * The next Parallel Hash synchronization point is in |
| * MultiExecParallelHash(), which will progress it all the way to |
| * PHJ_BUILD_RUN. The caller must not return control from this |
| * executor node between now and then. |
| */ |
| } |
| else |
| { |
| /* |
| * Prepare context for the first-scan space allocations; allocate the |
| * hashbucket array therein, and set each bucket "empty". |
| */ |
| MemoryContextSwitchTo(hashtable->batchCxt); |
| |
| hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets); |
| |
| /* |
| * Set up for skew optimization, if possible and there's a need for |
| * more than one batch. (In a one-batch join, there's no point in |
| * it.) |
| */ |
| if (nbatch > 1) |
| ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); |
| |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| return hashtable; |
| } |
| |
| |
| /* |
| * Compute appropriate size for hashtable given the estimated size of the |
| * relation to be hashed (number of rows and average row width). |
| * |
| * This is exported so that the planner's costsize.c can use it. |
| */ |
| |
| /* Target bucket loading (tuples per bucket) */ |
| /* |
| * CDB: we now use gp_hashjoin_tuples_per_bucket |
| * #define NTUP_PER_BUCKET 1 |
| */ |
| |
| void |
| ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, |
| uint64 operatorMemKB, |
| bool try_combined_hash_mem, |
| int parallel_workers, |
| size_t *space_allowed, |
| int *numbuckets, |
| int *numbatches, |
| int *num_skew_mcvs) |
| { |
| int tupsize; |
| double inner_rel_bytes; |
| size_t hash_table_bytes; |
| size_t bucket_bytes; |
| size_t max_pointers; |
| int nbatch = 1; |
| int nbuckets; |
| double dbuckets; |
| |
| /* Force a plausible relation size if no info */ |
| if (ntuples <= 0.0) |
| ntuples = 1000.0; |
| |
| /* |
| * Estimate tupsize based on footprint of tuple in hashtable... note this |
| * does not allow for any palloc overhead. The manipulations of spaceUsed |
| * don't count palloc overhead either. |
| */ |
| tupsize = ExecHashRowSize(tupwidth); |
| inner_rel_bytes = ntuples * tupsize; |
| |
| /* |
| * Compute in-memory hashtable size limit from GUCs. |
| */ |
| hash_table_bytes = operatorMemKB * 1024L; |
| |
| /* |
| * Parallel Hash tries to use the combined hash_mem of all workers to |
| * avoid the need to batch. If that won't work, it falls back to hash_mem |
| * per worker and tries to process batches in parallel. |
| */ |
| if (try_combined_hash_mem && parallel_workers > 0) |
| { |
| /* Careful, this could overflow size_t */ |
| double newlimit; |
| |
| /* CBDB_PARALLEL_FIXME: if we enable pg style parallel some day, we should reconsider it. */ |
| newlimit = (double) hash_table_bytes * (double) parallel_workers; |
| newlimit = Min(newlimit, (double) SIZE_MAX); |
| hash_table_bytes = (size_t) newlimit; |
| } |
| |
| *space_allowed = hash_table_bytes; |
| |
| /* |
| * If skew optimization is possible, estimate the number of skew buckets |
| * that will fit in the memory allowed, and decrement the assumed space |
| * available for the main hash table accordingly. |
| * |
| * We make the optimistic assumption that each skew bucket will contain |
| * one inner-relation tuple. If that turns out to be low, we will recover |
| * at runtime by reducing the number of skew buckets. |
| * |
| * hashtable->skewBucket will have up to 8 times as many HashSkewBucket |
| * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash |
| * will round up to the next power of 2 and then multiply by 4 to reduce |
| * collisions. |
| */ |
| if (useskew) |
| { |
| size_t bytes_per_mcv; |
| size_t skew_mcvs; |
| |
| /*---------- |
| * Compute number of MCVs we could hold in hash_table_bytes |
| * |
| * Divisor is: |
| * size of a hash tuple + |
| * worst-case size of skewBucket[] per MCV + |
| * size of skewBucketNums[] entry + |
| * size of skew bucket struct itself |
| *---------- |
| */ |
| bytes_per_mcv = tupsize + |
| (8 * sizeof(HashSkewBucket *)) + |
| sizeof(int) + |
| SKEW_BUCKET_OVERHEAD; |
| skew_mcvs = hash_table_bytes / bytes_per_mcv; |
| |
| /* |
| * Now scale by SKEW_HASH_MEM_PERCENT (we do it in this order so as |
| * not to worry about size_t overflow in the multiplication) |
| */ |
| skew_mcvs = (skew_mcvs * SKEW_HASH_MEM_PERCENT) / 100; |
| |
| /* Now clamp to integer range */ |
| skew_mcvs = Min(skew_mcvs, INT_MAX); |
| |
| *num_skew_mcvs = (int) skew_mcvs; |
| |
| /* Reduce hash_table_bytes by the amount needed for the skew table */ |
| if (skew_mcvs > 0) |
| hash_table_bytes -= skew_mcvs * bytes_per_mcv; |
| } |
| else |
| *num_skew_mcvs = 0; |
| |
| /* |
| * Set nbuckets to achieve an average bucket load of gp_hashjoin_tuples_per_bucket when |
| * memory is filled, assuming a single batch; but limit the value so that |
| * the pointer arrays we'll try to allocate do not exceed hash_table_bytes |
| * nor MaxAllocSize. |
| * |
| * Note that both nbuckets and nbatch must be powers of 2 to make |
| * ExecHashGetBucketAndBatch fast. |
| */ |
| max_pointers = hash_table_bytes / sizeof(HashJoinTuple); |
| max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple)); |
| /* If max_pointers isn't a power of 2, must round it down to one */ |
| max_pointers = pg_prevpower2_size_t(max_pointers); |
| |
| /* Also ensure we avoid integer overflow in nbatch and nbuckets */ |
| /* (this step is redundant given the current value of MaxAllocSize) */ |
| max_pointers = Min(max_pointers, INT_MAX / 2 + 1); |
| |
| dbuckets = ceil(ntuples / gp_hashjoin_tuples_per_bucket); |
| dbuckets = Min(dbuckets, max_pointers); |
| nbuckets = (int) dbuckets; |
| /* don't let nbuckets be really small, though ... */ |
| nbuckets = Max(nbuckets, 1024); |
| /* ... and force it to be a power of 2. */ |
| nbuckets = pg_nextpower2_32(nbuckets); |
| |
| /* |
| * If there's not enough space to store the projected number of tuples and |
| * the required bucket headers, we will need multiple batches. |
| */ |
| bucket_bytes = sizeof(HashJoinTuple) * nbuckets; |
| if (inner_rel_bytes + bucket_bytes > hash_table_bytes) |
| { |
| /* We'll need multiple batches */ |
| size_t sbuckets; |
| double dbatch; |
| int minbatch; |
| size_t bucket_size; |
| |
| /* |
| * If Parallel Hash with combined hash_mem would still need multiple |
| * batches, we'll have to fall back to regular hash_mem budget. |
| */ |
| if (try_combined_hash_mem) |
| { |
| ExecChooseHashTableSize(ntuples, tupwidth, useskew, |
| operatorMemKB, |
| false, parallel_workers, |
| space_allowed, |
| numbuckets, |
| numbatches, |
| num_skew_mcvs); |
| return; |
| } |
| |
| /* |
| * Estimate the number of buckets we'll want to have when hash_mem is |
| * entirely full. Each bucket will contain a bucket pointer plus |
| * gp_hashjoin_tuples_per_bucket tuples, whose projected size already includes |
| * overhead for the hash code, pointer to the next tuple, etc. |
| */ |
| bucket_size = (tupsize * gp_hashjoin_tuples_per_bucket + sizeof(HashJoinTuple)); |
| if (hash_table_bytes < bucket_size) |
| sbuckets = 1; |
| else |
| sbuckets = pg_nextpower2_size_t(hash_table_bytes / bucket_size); |
| sbuckets = Min(sbuckets, max_pointers); |
| nbuckets = (int) sbuckets; |
| nbuckets = pg_nextpower2_32(nbuckets); |
| bucket_bytes = nbuckets * sizeof(HashJoinTuple); |
| |
| /* |
| * Buckets are simple pointers to hashjoin tuples, while tupsize |
| * includes the pointer, hash code, and MinimalTupleData. So buckets |
| * should never really exceed 25% of hash_mem (even for |
| * gp_hashjoin_tuples_per_bucket=1); except maybe for hash_mem values that are not |
| * 2^N bytes, where we might get more because of doubling. So let's |
| * look for 50% here. |
| */ |
| Assert(bucket_bytes <= hash_table_bytes / 2); |
| |
| /* Calculate required number of batches. */ |
| dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes)); |
| dbatch = Min(dbatch, max_pointers); |
| minbatch = (int) dbatch; |
| nbatch = pg_nextpower2_32(Max(2, minbatch)); |
| |
| /* |
| * Check to see if we're capping the number of workfiles we allow per |
| * query |
| */ |
| if (gp_workfile_limit_files_per_query > 0) |
| { |
| int nbatch_lower = nbatch; |
| |
| /* |
| * We create two files per batch during spilling - one for outer |
| * and one of inner side. Lower the nbatch if necessary to fit |
| * under that limit. Don't go below two batches, because in that |
| * case we're basically disabling spilling. |
| */ |
| while ((nbatch_lower * 2 > gp_workfile_limit_files_per_query) && (nbatch_lower > 2)) |
| { |
| nbatch_lower >>= 1; |
| } |
| |
| Assert(nbatch_lower <= nbatch); |
| if (nbatch_lower != nbatch) |
| { |
| /* |
| * ExecChooseHashTableSize() is a hot function which is called not only by the |
| * executor but also by the planner, which invokes it when calculating the cost of |
| * each join path. The number of join paths grows exponentially with the number of |
| * tables, so use elog(DEBUG1) rather than elog(LOG) to avoid flooding the log. |
| */ |
| elog(DEBUG1, "HashJoin: Too many batches computed: nbatch=%d. gp_workfile_limit_files_per_query=%d, using nbatch=%d instead", |
| nbatch, gp_workfile_limit_files_per_query, nbatch_lower); |
| nbatch = nbatch_lower; |
| } |
| } |
| } |
| else |
| { |
| /* We expect the hashtable to fit in memory, we want to use |
| * more buckets if we have memory to spare */ |
| double dbuckets_lower; |
| double dbuckets_upper; |
| |
| /* divide our tuple row-count estimate by the number of |
| * tuples we'd like in a bucket: this produces a small bucket |
| * count independent of our work_mem setting */ |
| dbuckets_lower = (double)ntuples / (double)gp_hashjoin_tuples_per_bucket; |
| |
| /* if we have work_mem to spare, we'd like to use it -- so |
| * divide up our memory evenly (see the spill case above) */ |
| dbuckets_upper = (double)hash_table_bytes / ((double)tupsize * gp_hashjoin_tuples_per_bucket); |
| |
| /* we'll use our "lower" work_mem independent guess as a lower |
| * limit; but if we've got memory to spare we'll take the mean |
| * of the lower-limit and the upper-limit */ |
| if (dbuckets_upper > dbuckets_lower) |
| dbuckets = (dbuckets_lower + dbuckets_upper)/2.0; |
| else |
| dbuckets = dbuckets_lower; |
| |
| dbuckets = ceil(dbuckets); |
| dbuckets = Min(dbuckets, max_pointers); |
| |
| /* |
| * Both nbuckets and nbatch must be powers of 2 to make |
| * ExecHashGetBucketAndBatch fast. We already fixed nbatch; now inflate |
| * nbuckets to the next larger power of 2. We also force nbuckets to not |
| * be real small, by starting the search at 2^10. (Note: above we made |
| * sure that nbuckets is not more than INT_MAX / 2, so this loop cannot |
| * overflow, nor can the final shift to recalculate nbuckets.) |
| */ |
| nbuckets = Max((int) dbuckets, 1024); |
| nbuckets = 1 << my_log2(nbuckets); |
| |
| nbatch = 1; |
| } |
| |
| Assert(nbuckets > 0); |
| Assert(nbatch > 0); |
| |
| Assert(nbuckets > 0); |
| Assert(nbatch > 0); |
| |
| *numbuckets = nbuckets; |
| *numbatches = nbatch; |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * ExecHashTableDestroy |
| * |
| * destroy a hash table |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecHashTableDestroy(HashState *hashState, HashJoinTable hashtable) |
| { |
| int i; |
| |
| Assert(hashtable); |
| Assert(!hashtable->eagerlyReleased); |
| |
| /* |
| * Make sure all the temp files are closed. |
| * GPDB supports rescan of hashjoin, the batch0 can still have temp files. |
| */ |
| if (hashtable->innerBatchFile != NULL) |
| { |
| for (i = 0; i < hashtable->nbatch; i++) |
| { |
| if (hashtable->innerBatchFile[i]) |
| BufFileClose(hashtable->innerBatchFile[i]); |
| if (hashtable->outerBatchFile[i]) |
| BufFileClose(hashtable->outerBatchFile[i]); |
| hashtable->innerBatchFile[i] = NULL; |
| hashtable->outerBatchFile[i] = NULL; |
| } |
| } |
| |
| if (hashtable->work_set != NULL) |
| { |
| workfile_mgr_close_set(hashtable->work_set); |
| hashtable->work_set = NULL; |
| } |
| |
| /* Release working memory (batchCxt is a child, so it goes away too) */ |
| MemoryContextDelete(hashtable->hashCxt); |
| |
| /* |
| * If HashJoin find that the tuple it will return is NULL, it may squelch itself and its children. |
| * But there are some statistics(e.g. nbuckets, nbatch) maintained by the hash table being required during an explain analyze. |
| * Squelch HashJoin will call this function so that we can't simply call pfree and set hash table to NULL here, |
| * otherwise the statistics will be lost. |
| */ |
| } |
| |
| /* |
| * ExecHashIncreaseNumBatches |
| * increase the original number of batches in order to reduce |
| * current memory consumption |
| */ |
| static void |
| ExecHashIncreaseNumBatches(HashJoinTable hashtable) |
| { |
| int oldnbatch = hashtable->nbatch; |
| int curbatch = hashtable->curbatch; |
| int nbatch; |
| long ninmemory; |
| long nfreed; |
| HashMemoryChunk oldchunks; |
| Size spaceUsedBefore = hashtable->spaceUsed; |
| Size spaceFreed = 0; |
| HashJoinTableStats *stats = hashtable->stats; |
| |
| /* do nothing if we've decided to shut off growth */ |
| if (!hashtable->growEnabled) |
| return; |
| |
| /* safety check to avoid overflow */ |
| if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2))) |
| return; |
| |
| /* A reusable hash table can only respill during first pass */ |
| AssertImply(hashtable->hjstate->reuse_hashtable, hashtable->first_pass); |
| |
| nbatch = oldnbatch * 2; |
| Assert(nbatch > 1); |
| |
| #ifdef HJDEBUG |
| printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n", |
| hashtable, nbatch, hashtable->spaceUsed); |
| #endif |
| |
| if (hashtable->innerBatchFile == NULL) |
| { |
| MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt); |
| |
| /* we had no file arrays before */ |
| hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch); |
| hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch); |
| |
| MemoryContextSwitchTo(oldcxt); |
| |
| /* time to establish the temp tablespaces, too */ |
| PrepareTempTablespaces(); |
| } |
| else |
| { |
| /* enlarge arrays and zero out added entries */ |
| hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch); |
| hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch); |
| } |
| |
| /* EXPLAIN ANALYZE batch statistics */ |
| if (stats && stats->nbatchstats < nbatch) |
| { |
| Size sz = nbatch * sizeof(stats->batchstats[0]); |
| |
| stats->batchstats = |
| (HashJoinBatchStats *) repalloc(stats->batchstats, sz); |
| sz = (nbatch - stats->nbatchstats) * sizeof(stats->batchstats[0]); |
| memset(stats->batchstats + stats->nbatchstats, 0, sz); |
| stats->nbatchstats = nbatch; |
| } |
| hashtable->nbatch = nbatch; |
| |
| /* |
| * Scan through the existing hash table entries and dump out any that are |
| * no longer of the current batch. |
| */ |
| ninmemory = nfreed = 0; |
| |
| /* If know we need to resize nbuckets, we can do it while rebatching. */ |
| if (hashtable->nbuckets_optimal != hashtable->nbuckets) |
| { |
| /* we never decrease the number of buckets */ |
| Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); |
| |
| hashtable->nbuckets = hashtable->nbuckets_optimal; |
| hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; |
| |
| hashtable->buckets.unshared = |
| repalloc_array(hashtable->buckets.unshared, |
| HashJoinTuple, hashtable->nbuckets); |
| } |
| |
| /* |
| * We will scan through the chunks directly, so that we can reset the |
| * buckets now and not have to keep track which tuples in the buckets have |
| * already been processed. We will free the old chunks as we go. |
| */ |
| memset(hashtable->buckets.unshared, 0, |
| sizeof(HashJoinTuple) * hashtable->nbuckets); |
| oldchunks = hashtable->chunks; |
| hashtable->chunks = NULL; |
| |
| /* so, let's scan through the old chunks, and all tuples in each chunk */ |
| while (oldchunks != NULL) |
| { |
| HashMemoryChunk nextchunk = oldchunks->next.unshared; |
| |
| /* position within the buffer (up to oldchunks->used) */ |
| size_t idx = 0; |
| |
| /* process all tuples stored in this chunk (and then free it) */ |
| while (idx < oldchunks->used) |
| { |
| HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx); |
| MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); |
| int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len); |
| int bucketno; |
| int batchno; |
| |
| ninmemory++; |
| ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
| &bucketno, &batchno); |
| |
| if (batchno == curbatch) |
| { |
| /* keep tuple in memory - copy it into the new chunk */ |
| HashJoinTuple copyTuple; |
| |
| copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize); |
| memcpy(copyTuple, hashTuple, hashTupleSize); |
| |
| /* and add it back to the appropriate bucket */ |
| copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
| hashtable->buckets.unshared[bucketno] = copyTuple; |
| } |
| else |
| { |
| /* dump it out */ |
| Assert(batchno > curbatch); |
| ExecHashJoinSaveTuple(NULL, |
| HJTUPLE_MINTUPLE(hashTuple), |
| hashTuple->hashvalue, |
| hashtable, |
| &hashtable->innerBatchFile[batchno], |
| hashtable->bfCxt); |
| |
| hashtable->spaceUsed -= hashTupleSize; |
| spaceFreed += hashTupleSize; |
| if (stats) |
| stats->batchstats[batchno].spillspace_in += hashTupleSize; |
| |
| nfreed++; |
| } |
| |
| /* next tuple in this chunk */ |
| idx += MAXALIGN(hashTupleSize); |
| |
| /* allow this loop to be cancellable */ |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| /* we're done with this chunk - free it and proceed to the next one */ |
| pfree(oldchunks); |
| oldchunks = nextchunk; |
| } |
| |
| #ifdef HJDEBUG |
| printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n", |
| hashtable, nfreed, ninmemory, hashtable->spaceUsed); |
| #endif |
| |
| /* Update work_mem high-water mark and amount spilled. */ |
| if (stats) |
| { |
| stats->workmem_max = Max(stats->workmem_max, spaceUsedBefore); |
| stats->batchstats[curbatch].spillspace_out += spaceFreed; |
| stats->batchstats[curbatch].spillrows_out += nfreed; |
| } |
| |
| /* |
| * If we dumped out either all or none of the tuples in the table, disable |
| * further expansion of nbatch. This situation implies that we have |
| * enough tuples of identical hashvalues to overflow spaceAllowed. |
| * Increasing nbatch will not fix it since there's no way to subdivide the |
| * group any more finely. We have to just gut it out and hope the server |
| * has enough RAM. |
| */ |
| if (nfreed == 0 || nfreed == ninmemory) |
| { |
| hashtable->growEnabled = false; |
| #ifdef HJDEBUG |
| printf("Hashjoin %p: disabling further increase of nbatch\n", |
| hashtable); |
| #endif |
| } |
| } |
| |
| /* |
| * ExecParallelHashIncreaseNumBatches |
| * Every participant attached to grow_batches_barrier must run this |
| * function when it observes growth == PHJ_GROWTH_NEED_MORE_BATCHES. |
| */ |
| static void |
| ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) |
| { |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| |
| Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER); |
| |
| /* |
| * It's unlikely, but we need to be prepared for new participants to show |
| * up while we're in the middle of this operation so we need to switch on |
| * barrier phase here. |
| */ |
| switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier))) |
| { |
| case PHJ_GROW_BATCHES_ELECT: |
| |
| /* |
| * Elect one participant to prepare to grow the number of batches. |
| * This involves reallocating or resetting the buckets of batch 0 |
| * in preparation for all participants to begin repartitioning the |
| * tuples. |
| */ |
| if (BarrierArriveAndWait(&pstate->grow_batches_barrier, |
| WAIT_EVENT_HASH_GROW_BATCHES_ELECT)) |
| { |
| dsa_pointer_atomic *buckets; |
| ParallelHashJoinBatch *old_batch0; |
| int new_nbatch; |
| int i; |
| |
| /* Move the old batch out of the way. */ |
| old_batch0 = hashtable->batches[0].shared; |
| pstate->old_batches = pstate->batches; |
| pstate->old_nbatch = hashtable->nbatch; |
| pstate->batches = InvalidDsaPointer; |
| |
| /* Free this backend's old accessors. */ |
| ExecParallelHashCloseBatchAccessors(hashtable); |
| |
| /* Figure out how many batches to use. */ |
| if (hashtable->nbatch == 1) |
| { |
| /* |
| * We are going from single-batch to multi-batch. We need |
| * to switch from one large combined memory budget to the |
| * regular hash_mem budget. |
| */ |
| pstate->space_allowed = get_hash_memory_limit(); |
| |
| /* |
| * The combined hash_mem of all participants wasn't |
| * enough. Therefore one batch per participant would be |
| * approximately equivalent and would probably also be |
| * insufficient. So try two batches per participant, |
| * rounded up to a power of two. |
| */ |
| new_nbatch = pg_nextpower2_32(pstate->nparticipants * 2); |
| } |
| else |
| { |
| /* |
| * We were already multi-batched. Try doubling the number |
| * of batches. |
| */ |
| new_nbatch = hashtable->nbatch * 2; |
| } |
| |
| /* Allocate new larger generation of batches. */ |
| Assert(hashtable->nbatch == pstate->nbatch); |
| ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch); |
| Assert(hashtable->nbatch == pstate->nbatch); |
| |
| /* Replace or recycle batch 0's bucket array. */ |
| if (pstate->old_nbatch == 1) |
| { |
| double dtuples; |
| double dbuckets; |
| int new_nbuckets; |
| |
| /* |
| * We probably also need a smaller bucket array. How many |
| * tuples do we expect per batch, assuming we have only |
| * half of them so far? Normally we don't need to change |
| * the bucket array's size, because the size of each batch |
| * stays the same as we add more batches, but in this |
| * special case we move from a large batch to many smaller |
| * batches and it would be wasteful to keep the large |
| * array. |
| */ |
| dtuples = (old_batch0->ntuples * 2.0) / new_nbatch; |
| dbuckets = ceil(dtuples / gp_hashjoin_tuples_per_bucket); |
| dbuckets = Min(dbuckets, |
| MaxAllocSize / sizeof(dsa_pointer_atomic)); |
| new_nbuckets = (int) dbuckets; |
| new_nbuckets = Max(new_nbuckets, 1024); |
| new_nbuckets = pg_nextpower2_32(new_nbuckets); |
| dsa_free(hashtable->area, old_batch0->buckets); |
| hashtable->batches[0].shared->buckets = |
| dsa_allocate(hashtable->area, |
| sizeof(dsa_pointer_atomic) * new_nbuckets); |
| buckets = (dsa_pointer_atomic *) |
| dsa_get_address(hashtable->area, |
| hashtable->batches[0].shared->buckets); |
| for (i = 0; i < new_nbuckets; ++i) |
| dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); |
| pstate->nbuckets = new_nbuckets; |
| } |
| else |
| { |
| /* Recycle the existing bucket array. */ |
| hashtable->batches[0].shared->buckets = old_batch0->buckets; |
| buckets = (dsa_pointer_atomic *) |
| dsa_get_address(hashtable->area, old_batch0->buckets); |
| for (i = 0; i < hashtable->nbuckets; ++i) |
| dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer); |
| } |
| |
| /* Move all chunks to the work queue for parallel processing. */ |
| pstate->chunk_work_queue = old_batch0->chunks; |
| |
| /* Disable further growth temporarily while we're growing. */ |
| pstate->growth = PHJ_GROWTH_DISABLED; |
| } |
| else |
| { |
| /* All other participants just flush their tuples to disk. */ |
| ExecParallelHashCloseBatchAccessors(hashtable); |
| } |
| /* Fall through. */ |
| |
| case PHJ_GROW_BATCHES_REALLOCATE: |
| /* Wait for the above to be finished. */ |
| BarrierArriveAndWait(&pstate->grow_batches_barrier, |
| WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE); |
| /* Fall through. */ |
| |
| case PHJ_GROW_BATCHES_REPARTITION: |
| /* Make sure that we have the current dimensions and buckets. */ |
| ExecParallelHashEnsureBatchAccessors(hashtable); |
| ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
| /* Then partition, flush counters. */ |
| ExecParallelHashRepartitionFirst(hashtable); |
| ExecParallelHashRepartitionRest(hashtable); |
| ExecParallelHashMergeCounters(hashtable); |
| /* Wait for the above to be finished. */ |
| BarrierArriveAndWait(&pstate->grow_batches_barrier, |
| WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION); |
| /* Fall through. */ |
| |
| case PHJ_GROW_BATCHES_DECIDE: |
| |
| /* |
| * Elect one participant to clean up and decide whether further |
| * repartitioning is needed, or should be disabled because it's |
| * not helping. |
| */ |
| if (BarrierArriveAndWait(&pstate->grow_batches_barrier, |
| WAIT_EVENT_HASH_GROW_BATCHES_DECIDE)) |
| { |
| ParallelHashJoinBatch *old_batches; |
| bool space_exhausted = false; |
| bool extreme_skew_detected = false; |
| |
| /* Make sure that we have the current dimensions and buckets. */ |
| ExecParallelHashEnsureBatchAccessors(hashtable); |
| ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
| |
| old_batches = dsa_get_address(hashtable->area, pstate->old_batches); |
| |
| /* Are any of the new generation of batches exhausted? */ |
| for (int i = 0; i < hashtable->nbatch; ++i) |
| { |
| ParallelHashJoinBatch *batch; |
| ParallelHashJoinBatch *old_batch; |
| int parent; |
| |
| batch = hashtable->batches[i].shared; |
| if (batch->space_exhausted || |
| batch->estimated_size > pstate->space_allowed) |
| space_exhausted = true; |
| |
| parent = i % pstate->old_nbatch; |
| old_batch = NthParallelHashJoinBatch(old_batches, parent); |
| if (old_batch->space_exhausted || |
| batch->estimated_size > pstate->space_allowed) |
| { |
| /* |
| * Did this batch receive ALL of the tuples from its |
| * parent batch? That would indicate that further |
| * repartitioning isn't going to help (the hash values |
| * are probably all the same). |
| */ |
| if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples) |
| extreme_skew_detected = true; |
| } |
| } |
| |
| /* Don't keep growing if it's not helping or we'd overflow. */ |
| if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2) |
| pstate->growth = PHJ_GROWTH_DISABLED; |
| else if (space_exhausted) |
| pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; |
| else |
| pstate->growth = PHJ_GROWTH_OK; |
| |
| /* Free the old batches in shared memory. */ |
| dsa_free(hashtable->area, pstate->old_batches); |
| pstate->old_batches = InvalidDsaPointer; |
| } |
| /* Fall through. */ |
| |
| case PHJ_GROW_BATCHES_FINISH: |
| /* Wait for the above to complete. */ |
| BarrierArriveAndWait(&pstate->grow_batches_barrier, |
| WAIT_EVENT_HASH_GROW_BATCHES_FINISH); |
| } |
| } |
| |
| /* |
| * Repartition the tuples currently loaded into memory for inner batch 0 |
| * because the number of batches has been increased. Some tuples are retained |
| * in memory and some are written out to a later batch. |
| */ |
| static void |
| ExecParallelHashRepartitionFirst(HashJoinTable hashtable) |
| { |
| dsa_pointer chunk_shared; |
| HashMemoryChunk chunk; |
| |
| Assert(hashtable->nbatch == hashtable->parallel_state->nbatch); |
| |
| while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared))) |
| { |
| size_t idx = 0; |
| |
| /* Repartition all tuples in this chunk. */ |
| while (idx < chunk->used) |
| { |
| HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx); |
| MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); |
| HashJoinTuple copyTuple; |
| dsa_pointer shared; |
| int bucketno; |
| int batchno; |
| |
| ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
| &bucketno, &batchno); |
| |
| Assert(batchno < hashtable->nbatch); |
| if (batchno == 0) |
| { |
| /* It still belongs in batch 0. Copy to a new chunk. */ |
| copyTuple = |
| ExecParallelHashTupleAlloc(hashtable, |
| HJTUPLE_OVERHEAD + tuple->t_len, |
| &shared); |
| copyTuple->hashvalue = hashTuple->hashvalue; |
| memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len); |
| ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
| copyTuple, shared); |
| } |
| else |
| { |
| size_t tuple_size = |
| MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); |
| |
| /* It belongs in a later batch. */ |
| hashtable->batches[batchno].estimated_size += tuple_size; |
| sts_puttuple(hashtable->batches[batchno].inner_tuples, |
| &hashTuple->hashvalue, tuple); |
| } |
| |
| /* Count this tuple. */ |
| ++hashtable->batches[0].old_ntuples; |
| ++hashtable->batches[batchno].ntuples; |
| |
| idx += MAXALIGN(HJTUPLE_OVERHEAD + |
| HJTUPLE_MINTUPLE(hashTuple)->t_len); |
| } |
| |
| /* Free this chunk. */ |
| dsa_free(hashtable->area, chunk_shared); |
| |
| CHECK_FOR_INTERRUPTS(); |
| } |
| } |
| |
| /* |
| * Help repartition inner batches 1..n. |
| */ |
| static void |
| ExecParallelHashRepartitionRest(HashJoinTable hashtable) |
| { |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| int old_nbatch = pstate->old_nbatch; |
| SharedTuplestoreAccessor **old_inner_tuples; |
| ParallelHashJoinBatch *old_batches; |
| int i; |
| |
| /* Get our hands on the previous generation of batches. */ |
| old_batches = (ParallelHashJoinBatch *) |
| dsa_get_address(hashtable->area, pstate->old_batches); |
| old_inner_tuples = palloc0_array(SharedTuplestoreAccessor *, old_nbatch); |
| for (i = 1; i < old_nbatch; ++i) |
| { |
| ParallelHashJoinBatch *shared = |
| NthParallelHashJoinBatch(old_batches, i); |
| |
| old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared), |
| hashtable->hjstate->worker_id, |
| &pstate->fileset); |
| } |
| |
| /* Join in the effort to repartition them. */ |
| for (i = 1; i < old_nbatch; ++i) |
| { |
| MinimalTuple tuple; |
| uint32 hashvalue; |
| |
| /* Scan one partition from the previous generation. */ |
| sts_begin_parallel_scan(old_inner_tuples[i]); |
| while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue))) |
| { |
| size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); |
| int bucketno; |
| int batchno; |
| |
| /* Decide which partition it goes to in the new generation. */ |
| ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, |
| &batchno); |
| |
| hashtable->batches[batchno].estimated_size += tuple_size; |
| ++hashtable->batches[batchno].ntuples; |
| ++hashtable->batches[i].old_ntuples; |
| |
| /* Store the tuple its new batch. */ |
| sts_puttuple(hashtable->batches[batchno].inner_tuples, |
| &hashvalue, tuple); |
| |
| CHECK_FOR_INTERRUPTS(); |
| } |
| sts_end_parallel_scan(old_inner_tuples[i]); |
| } |
| |
| pfree(old_inner_tuples); |
| } |
| |
| /* |
| * Transfer the backend-local per-batch counters to the shared totals. |
| */ |
| static void |
| ExecParallelHashMergeCounters(HashJoinTable hashtable) |
| { |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| int i; |
| |
| LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
| pstate->total_tuples = 0; |
| for (i = 0; i < hashtable->nbatch; ++i) |
| { |
| ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i]; |
| |
| batch->shared->size += batch->size; |
| batch->shared->estimated_size += batch->estimated_size; |
| batch->shared->ntuples += batch->ntuples; |
| batch->shared->old_ntuples += batch->old_ntuples; |
| batch->size = 0; |
| batch->estimated_size = 0; |
| batch->ntuples = 0; |
| batch->old_ntuples = 0; |
| pstate->total_tuples += batch->shared->ntuples; |
| } |
| LWLockRelease(&pstate->lock); |
| } |
| |
| /* |
| * ExecHashIncreaseNumBuckets |
| * increase the original number of buckets in order to reduce |
| * number of tuples per bucket |
| */ |
| static void |
| ExecHashIncreaseNumBuckets(HashJoinTable hashtable) |
| { |
| HashMemoryChunk chunk; |
| |
| /* do nothing if not an increase (it's called increase for a reason) */ |
| if (hashtable->nbuckets >= hashtable->nbuckets_optimal) |
| return; |
| |
| #ifdef HJDEBUG |
| printf("Hashjoin %p: increasing nbuckets %d => %d\n", |
| hashtable, hashtable->nbuckets, hashtable->nbuckets_optimal); |
| #endif |
| |
| hashtable->nbuckets = hashtable->nbuckets_optimal; |
| hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; |
| |
| Assert(hashtable->nbuckets > 1); |
| Assert(hashtable->nbuckets <= (INT_MAX / 2)); |
| Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets)); |
| |
| /* |
| * Just reallocate the proper number of buckets - we don't need to walk |
| * through them - we can walk the dense-allocated chunks (just like in |
| * ExecHashIncreaseNumBatches, but without all the copying into new |
| * chunks) |
| */ |
| hashtable->buckets.unshared = |
| repalloc_array(hashtable->buckets.unshared, |
| HashJoinTuple, hashtable->nbuckets); |
| |
| memset(hashtable->buckets.unshared, 0, |
| hashtable->nbuckets * sizeof(HashJoinTuple)); |
| |
| /* scan through all tuples in all chunks to rebuild the hash table */ |
| for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared) |
| { |
| /* process all tuples stored in this chunk */ |
| size_t idx = 0; |
| |
| while (idx < chunk->used) |
| { |
| HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx); |
| int bucketno; |
| int batchno; |
| |
| ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
| &bucketno, &batchno); |
| |
| /* add the tuple to the proper bucket */ |
| hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
| hashtable->buckets.unshared[bucketno] = hashTuple; |
| |
| /* advance index past the tuple */ |
| idx += MAXALIGN(HJTUPLE_OVERHEAD + |
| HJTUPLE_MINTUPLE(hashTuple)->t_len); |
| } |
| |
| /* allow this loop to be cancellable */ |
| CHECK_FOR_INTERRUPTS(); |
| } |
| } |
| |
| static void |
| ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) |
| { |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| int i; |
| HashMemoryChunk chunk; |
| dsa_pointer chunk_s; |
| |
| Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER); |
| |
| /* |
| * It's unlikely, but we need to be prepared for new participants to show |
| * up while we're in the middle of this operation so we need to switch on |
| * barrier phase here. |
| */ |
| switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier))) |
| { |
| case PHJ_GROW_BUCKETS_ELECT: |
| /* Elect one participant to prepare to increase nbuckets. */ |
| if (BarrierArriveAndWait(&pstate->grow_buckets_barrier, |
| WAIT_EVENT_HASH_GROW_BUCKETS_ELECT)) |
| { |
| size_t size; |
| dsa_pointer_atomic *buckets; |
| |
| /* Double the size of the bucket array. */ |
| pstate->nbuckets *= 2; |
| size = pstate->nbuckets * sizeof(dsa_pointer_atomic); |
| hashtable->batches[0].shared->size += size / 2; |
| dsa_free(hashtable->area, hashtable->batches[0].shared->buckets); |
| hashtable->batches[0].shared->buckets = |
| dsa_allocate(hashtable->area, size); |
| buckets = (dsa_pointer_atomic *) |
| dsa_get_address(hashtable->area, |
| hashtable->batches[0].shared->buckets); |
| for (i = 0; i < pstate->nbuckets; ++i) |
| dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); |
| |
| /* Put the chunk list onto the work queue. */ |
| pstate->chunk_work_queue = hashtable->batches[0].shared->chunks; |
| |
| /* Clear the flag. */ |
| pstate->growth = PHJ_GROWTH_OK; |
| } |
| /* Fall through. */ |
| |
| case PHJ_GROW_BUCKETS_REALLOCATE: |
| /* Wait for the above to complete. */ |
| BarrierArriveAndWait(&pstate->grow_buckets_barrier, |
| WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE); |
| /* Fall through. */ |
| |
| case PHJ_GROW_BUCKETS_REINSERT: |
| /* Reinsert all tuples into the hash table. */ |
| ExecParallelHashEnsureBatchAccessors(hashtable); |
| ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
| while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s))) |
| { |
| size_t idx = 0; |
| |
| while (idx < chunk->used) |
| { |
| HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx); |
| dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx; |
| int bucketno; |
| int batchno; |
| |
| ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
| &bucketno, &batchno); |
| Assert(batchno == 0); |
| |
| /* add the tuple to the proper bucket */ |
| ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
| hashTuple, shared); |
| |
| /* advance index past the tuple */ |
| idx += MAXALIGN(HJTUPLE_OVERHEAD + |
| HJTUPLE_MINTUPLE(hashTuple)->t_len); |
| } |
| |
| /* allow this loop to be cancellable */ |
| CHECK_FOR_INTERRUPTS(); |
| } |
| BarrierArriveAndWait(&pstate->grow_buckets_barrier, |
| WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT); |
| } |
| } |
| |
| /* |
| * ExecHashTableInsert |
| * insert a tuple into the hash table depending on the hash value |
| * it may just go to a temp file for later batches |
| * |
| * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual |
| * tuple; the minimal case in particular is certain to happen while reloading |
| * tuples from batch files. We could save some cycles in the regular-tuple |
| * case by not forcing the slot contents into minimal form; not clear if it's |
| * worth the messiness required. |
| * |
| * Returns true if the tuple belonged to this batch and was inserted to |
| * the in-memory hash table, or false if it belonged to a later batch and |
| * was pushed to a temp file. |
| */ |
| bool |
| ExecHashTableInsert(HashState *hashState, HashJoinTable hashtable, |
| TupleTableSlot *slot, |
| uint32 hashvalue) |
| { |
| bool shouldFree; |
| MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
| int bucketno; |
| int batchno; |
| PlanState *ps = &hashState->ps; |
| |
| ExecHashGetBucketAndBatch(hashtable, hashvalue, |
| &bucketno, &batchno); |
| |
| /* |
| * decide whether to put the tuple in the hash table or a temp file |
| */ |
| if (batchno == hashtable->curbatch) |
| { |
| /* |
| * put the tuple in hash table |
| */ |
| HashJoinTuple hashTuple; |
| int hashTupleSize; |
| double ntuples = (hashtable->totalTuples - hashtable->skewTuples); |
| |
| /* Create the HashJoinTuple */ |
| hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; |
| hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize); |
| |
| hashTuple->hashvalue = hashvalue; |
| memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
| |
| /* |
| * We always reset the tuple-matched flag on insertion. This is okay |
| * even when reloading a tuple from a batch file, since the tuple |
| * could not possibly have been matched to an outer tuple before it |
| * went into the batch file. |
| */ |
| HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); |
| |
| /* Push it onto the front of the bucket's list */ |
| hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
| hashtable->buckets.unshared[bucketno] = hashTuple; |
| |
| /* |
| * Increase the (optimal) number of buckets if we just exceeded the |
| * NTUP_PER_BUCKET threshold, but only when there's still a single |
| * batch. |
| */ |
| if (hashtable->nbatch == 1 && |
| ntuples > (hashtable->nbuckets_optimal * gp_hashjoin_tuples_per_bucket)) |
| { |
| /* Guard against integer overflow and alloc size overflow */ |
| if (hashtable->nbuckets_optimal <= INT_MAX / 2 && |
| hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple)) |
| { |
| hashtable->nbuckets_optimal *= 2; |
| hashtable->log2_nbuckets_optimal += 1; |
| } |
| } |
| |
| /* Account for space used, and back off if we've used too much */ |
| hashtable->spaceUsed += hashTupleSize; |
| if (hashtable->spaceUsed > hashtable->spacePeak) |
| hashtable->spacePeak = hashtable->spaceUsed; |
| if (hashtable->spaceUsed + |
| hashtable->nbuckets_optimal * sizeof(HashJoinTuple) |
| > hashtable->spaceAllowed) |
| { |
| ExecHashIncreaseNumBatches(hashtable); |
| |
| if (ps && ps->instrument) |
| { |
| ps->instrument->workfileCreated = true; |
| } |
| } |
| } |
| else |
| { |
| /* |
| * put the tuple into a temp file for later batches |
| */ |
| Assert(batchno > hashtable->curbatch); |
| ExecHashJoinSaveTuple(ps, tuple, |
| hashvalue, |
| hashtable, |
| &hashtable->innerBatchFile[batchno], |
| hashtable->bfCxt); |
| } |
| |
| if (shouldFree) |
| heap_free_minimal_tuple(tuple); |
| |
| return (batchno == hashtable->curbatch); |
| } |
| |
| /* |
| * ExecParallelHashTableInsert |
| * insert a tuple into a shared hash table or shared batch tuplestore |
| */ |
| void |
| ExecParallelHashTableInsert(HashJoinTable hashtable, |
| TupleTableSlot *slot, |
| uint32 hashvalue) |
| { |
| bool shouldFree; |
| MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
| dsa_pointer shared; |
| int bucketno; |
| int batchno; |
| |
| retry: |
| ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); |
| |
| if (batchno == 0) |
| { |
| HashJoinTuple hashTuple; |
| |
| /* Try to load it into memory. */ |
| Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) == |
| PHJ_BUILD_HASH_INNER); |
| hashTuple = ExecParallelHashTupleAlloc(hashtable, |
| HJTUPLE_OVERHEAD + tuple->t_len, |
| &shared); |
| if (hashTuple == NULL) |
| goto retry; |
| |
| /* Store the hash value in the HashJoinTuple header. */ |
| hashTuple->hashvalue = hashvalue; |
| memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
| HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); |
| |
| /* Push it onto the front of the bucket's list */ |
| ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
| hashTuple, shared); |
| } |
| else |
| { |
| size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); |
| |
| Assert(batchno > 0); |
| |
| /* Try to preallocate space in the batch if necessary. */ |
| if (hashtable->batches[batchno].preallocated < tuple_size) |
| { |
| if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size)) |
| goto retry; |
| } |
| |
| Assert(hashtable->batches[batchno].preallocated >= tuple_size); |
| hashtable->batches[batchno].preallocated -= tuple_size; |
| sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue, |
| tuple); |
| } |
| ++hashtable->batches[batchno].ntuples; |
| |
| if (shouldFree) |
| heap_free_minimal_tuple(tuple); |
| } |
| |
| /* |
| * Insert a tuple into the current hash table. Unlike |
| * ExecParallelHashTableInsert, this version is not prepared to send the tuple |
| * to other batches or to run out of memory, and should only be called with |
| * tuples that belong in the current batch once growth has been disabled. |
| */ |
| void |
| ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, |
| TupleTableSlot *slot, |
| uint32 hashvalue) |
| { |
| bool shouldFree; |
| MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
| HashJoinTuple hashTuple; |
| dsa_pointer shared; |
| int batchno; |
| int bucketno; |
| |
| ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); |
| Assert(batchno == hashtable->curbatch); |
| hashTuple = ExecParallelHashTupleAlloc(hashtable, |
| HJTUPLE_OVERHEAD + tuple->t_len, |
| &shared); |
| hashTuple->hashvalue = hashvalue; |
| memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
| HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); |
| ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
| hashTuple, shared); |
| |
| if (shouldFree) |
| heap_free_minimal_tuple(tuple); |
| } |
| |
| /* |
| * ExecHashGetHashValue |
| * Compute the hash value for a tuple |
| * |
| * The tuple to be tested must be in econtext->ecxt_outertuple (thus Vars in |
| * the hashkeys expressions need to have OUTER_VAR as varno). If outer_tuple |
| * is false (meaning it's the HashJoin's inner node, Hash), econtext, |
| * hashkeys, and slot need to be from Hash, with hashkeys/slot referencing and |
| * being suitable for tuples from the node below the Hash. Conversely, if |
| * outer_tuple is true, econtext is from HashJoin, and hashkeys/slot need to |
| * be appropriate for tuples from HashJoin's outer node. |
| * |
| * A true result means the tuple's hash value has been successfully computed |
| * and stored at *hashvalue. A false result means the tuple cannot match |
| * because it contains a null attribute, and hence it should be discarded |
| * immediately. (If keep_nulls is true then false is never returned.) |
| * hashkeys_null indicates all the hashkeys are null. |
| */ |
| bool |
| ExecHashGetHashValue(HashState *hashState, HashJoinTable hashtable, |
| ExprContext *econtext, |
| List *hashkeys, |
| bool outer_tuple, |
| bool keep_nulls, |
| uint32 *hashvalue, |
| bool *hashkeys_null) |
| { |
| uint32 hashkey = 0; |
| FmgrInfo *hashfunctions; |
| List *keyvalues = NIL; |
| ListCell *hk; |
| int i = 0; |
| MemoryContext oldContext; |
| bool result = true; |
| bool build_runtime_filter; |
| |
| Assert(hashkeys_null); |
| |
| (*hashkeys_null) = true; |
| |
| /* |
| * We reset the eval context each time to reclaim any memory leaked in the |
| * hashkey expressions. |
| */ |
| ResetExprContext(econtext); |
| |
| oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
| |
| if (outer_tuple) |
| hashfunctions = hashtable->outer_hashfunctions; |
| else |
| hashfunctions = hashtable->inner_hashfunctions; |
| |
| build_runtime_filter = !outer_tuple && hashState->rfstate != NULL && |
| !hashState->rfstate->build_suspend; |
| |
| foreach(hk, hashkeys) |
| { |
| ExprState *keyexpr = (ExprState *) lfirst(hk); |
| Datum keyval; |
| uint32 hkey = 0; |
| bool isNull = false; |
| |
| /* combine successive hashkeys by rotating */ |
| hashkey = pg_rotate_left32(hashkey, 1); |
| |
| /* |
| * Get the join attribute value of the tuple |
| */ |
| keyval = ExecEvalExpr(keyexpr, econtext, &isNull); |
| |
| if (!isNull) |
| { |
| *hashkeys_null = false; |
| } |
| |
| /* |
| * If the attribute is NULL, and the join operator is strict, then |
| * this tuple cannot pass the join qual so we can reject it |
| * immediately (unless we're scanning the outside of an outer join, in |
| * which case we must not reject it). Otherwise we act like the |
| * hashcode of NULL is zero (this will support operators that act like |
| * IS NOT DISTINCT, though not any more-random behavior). We treat |
| * the hash support function as strict even if the operator is not. |
| * |
| * Note: currently, all hashjoinable operators must be strict since |
| * the hash index AM assumes that. However, it takes so little extra |
| * code here to allow non-strict that we may as well do it. |
| */ |
| if (isNull) |
| { |
| if (hashtable->hashStrict[i] && !keep_nulls) |
| { |
| result = false; |
| } |
| /* else, leave hashkey unmodified, equivalent to hashcode 0 */ |
| } |
| else if (result) |
| { |
| /* Compute the hash function */ |
| hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval)); |
| hashkey ^= hkey; |
| } |
| |
| if (build_runtime_filter) |
| { |
| Datum *dp = palloc(sizeof(Datum)); |
| |
| /* consider use raw value or hash value */ |
| *dp = hashState->rfstate->raw_value[i] ? keyval : hkey; |
| keyvalues = lappend(keyvalues, dp); |
| } |
| |
| i++; |
| } |
| |
| if (build_runtime_filter && result) |
| RFAddTupleValues(hashState->rfstate, keyvalues); |
| |
| MemoryContextSwitchTo(oldContext); |
| |
| *hashvalue = hashkey; |
| return result; |
| } |
| |
| /* |
| * ExecHashGetBucketAndBatch |
| * Determine the bucket number and batch number for a hash value |
| * |
| * Note: on-the-fly increases of nbatch must not change the bucket number |
| * for a given hash code (since we don't move tuples to different hash |
| * chains), and must only cause the batch number to remain the same or |
| * increase. Our algorithm is |
| * bucketno = hashvalue MOD nbuckets |
| * batchno = ROR(hashvalue, log2_nbuckets) MOD nbatch |
| * where nbuckets and nbatch are both expected to be powers of 2, so we can |
| * do the computations by shifting and masking. (This assumes that all hash |
| * functions are good about randomizing all their output bits, else we are |
| * likely to have very skewed bucket or batch occupancy.) |
| * |
| * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic |
| * bucket count growth. Once we start batching, the value is fixed and does |
| * not change over the course of the join (making it possible to compute batch |
| * number the way we do here). |
| * |
| * nbatch is always a power of 2; we increase it only by doubling it. This |
| * effectively adds one more bit to the top of the batchno. In very large |
| * joins, we might run out of bits to add, so we do this by rotating the hash |
| * value. This causes batchno to steal bits from bucketno when the number of |
| * virtual buckets exceeds 2^32. It's better to have longer bucket chains |
| * than to lose the ability to divide batches. |
| */ |
| void |
| ExecHashGetBucketAndBatch(HashJoinTable hashtable, |
| uint32 hashvalue, |
| int *bucketno, |
| int *batchno) |
| { |
| uint32 nbuckets = (uint32) hashtable->nbuckets; |
| uint32 nbatch = (uint32) hashtable->nbatch; |
| |
| if (nbatch > 1) |
| { |
| *bucketno = hashvalue & (nbuckets - 1); |
| *batchno = pg_rotate_right32(hashvalue, |
| hashtable->log2_nbuckets) & (nbatch - 1); |
| } |
| else |
| { |
| *bucketno = hashvalue & (nbuckets - 1); |
| *batchno = 0; |
| } |
| } |
| |
| /* |
| * ExecScanHashBucket |
| * scan a hash bucket for matches to the current outer tuple |
| * |
| * The current outer tuple must be stored in econtext->ecxt_outertuple. |
| * |
| * On success, the inner tuple is stored into hjstate->hj_CurTuple and |
| * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot |
| * for the latter. |
| */ |
| bool |
| ExecScanHashBucket(HashState *hashState, HashJoinState *hjstate, |
| ExprContext *econtext) |
| { |
| /* |
| * Greenplum specific behavior. |
| * Using hashqualclauses to support hash join on 'IS NOT DISTINCT FROM' |
| * as well as '='. |
| */ |
| ExprState *hjclauses = hjstate->hashqualclauses; |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| HashJoinTuple hashTuple = hjstate->hj_CurTuple; |
| uint32 hashvalue = hjstate->hj_CurHashValue; |
| |
| /* |
| * hj_CurTuple is the address of the tuple last returned from the current |
| * bucket, or NULL if it's time to start scanning a new bucket. |
| * |
| * If the tuple hashed to a skew bucket then scan the skew bucket |
| * otherwise scan the standard hashtable bucket. |
| */ |
| if (hashTuple != NULL) |
| hashTuple = hashTuple->next.unshared; |
| else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) |
| hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; |
| else |
| hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; |
| |
| while (hashTuple != NULL) |
| { |
| if (hashTuple->hashvalue == hashvalue) |
| { |
| TupleTableSlot *inntuple; |
| |
| /* insert hashtable's tuple into exec slot so ExecQual sees it */ |
| inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), |
| hjstate->hj_HashTupleSlot, |
| false); /* do not pfree */ |
| econtext->ecxt_innertuple = inntuple; |
| |
| if (ExecQualAndReset(hjclauses, econtext)) |
| { |
| hjstate->hj_CurTuple = hashTuple; |
| return true; |
| } |
| } |
| |
| hashTuple = hashTuple->next.unshared; |
| } |
| |
| /* |
| * no match |
| */ |
| return false; |
| } |
| |
| /* |
| * ExecParallelScanHashBucket |
| * scan a hash bucket for matches to the current outer tuple |
| * |
| * The current outer tuple must be stored in econtext->ecxt_outertuple. |
| * |
| * On success, the inner tuple is stored into hjstate->hj_CurTuple and |
| * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot |
| * for the latter. |
| */ |
| bool |
| ExecParallelScanHashBucket(HashState *hashState, HashJoinState *hjstate, |
| ExprContext *econtext) |
| { |
| /* |
| * Greenplum specific behavior. |
| * Using hashqualclauses to support hash join on 'IS NOT DISTINCT FROM' |
| * as well as '='. |
| */ |
| ExprState *hjclauses = hjstate->hashqualclauses; |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| HashJoinTuple hashTuple = hjstate->hj_CurTuple; |
| uint32 hashvalue = hjstate->hj_CurHashValue; |
| |
| /* |
| * hj_CurTuple is the address of the tuple last returned from the current |
| * bucket, or NULL if it's time to start scanning a new bucket. |
| */ |
| if (hashTuple != NULL) |
| hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); |
| else |
| hashTuple = ExecParallelHashFirstTuple(hashtable, |
| hjstate->hj_CurBucketNo); |
| |
| while (hashTuple != NULL) |
| { |
| if (hashTuple->hashvalue == hashvalue) |
| { |
| TupleTableSlot *inntuple; |
| |
| /* insert hashtable's tuple into exec slot so ExecQual sees it */ |
| inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), |
| hjstate->hj_HashTupleSlot, |
| false); /* do not pfree */ |
| econtext->ecxt_innertuple = inntuple; |
| |
| if (ExecQualAndReset(hjclauses, econtext)) |
| { |
| hjstate->hj_CurTuple = hashTuple; |
| return true; |
| } |
| } |
| |
| hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); |
| } |
| |
| /* |
| * no match |
| */ |
| return false; |
| } |
| |
| /* |
| * ExecPrepHashTableForUnmatched |
| * set up for a series of ExecScanHashTableForUnmatched calls |
| */ |
| void |
| ExecPrepHashTableForUnmatched(HashJoinState *hjstate) |
| { |
| /*---------- |
| * During this scan we use the HashJoinState fields as follows: |
| * |
| * hj_CurBucketNo: next regular bucket to scan |
| * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums) |
| * hj_CurTuple: last tuple returned, or NULL to start next bucket |
| *---------- |
| */ |
| hjstate->hj_CurBucketNo = 0; |
| hjstate->hj_CurSkewBucketNo = 0; |
| hjstate->hj_CurTuple = NULL; |
| } |
| |
| /* |
| * Decide if this process is allowed to run the unmatched scan. If so, the |
| * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned. |
| * Otherwise the batch is detached and false is returned. |
| */ |
| bool |
| ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate) |
| { |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| int curbatch = hashtable->curbatch; |
| ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; |
| |
| Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE); |
| |
| /* |
| * It would not be deadlock-free to wait on the batch barrier, because it |
| * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have |
| * already emitted tuples. Therefore, we'll hold a wait-free election: |
| * only one process can continue to the next phase, and all others detach |
| * from this batch. They can still go any work on other batches, if there |
| * are any. |
| */ |
| if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier)) |
| { |
| /* This process considers the batch to be done. */ |
| hashtable->batches[hashtable->curbatch].done = true; |
| |
| /* Make sure any temporary files are closed. */ |
| sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); |
| sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); |
| |
| /* |
| * Track largest batch we've seen, which would normally happen in |
| * ExecHashTableDetachBatch(). |
| */ |
| hashtable->spacePeak = |
| Max(hashtable->spacePeak, |
| batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); |
| hashtable->curbatch = -1; |
| return false; |
| } |
| |
| /* Now we are alone with this batch. */ |
| Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN); |
| |
| /* |
| * Has another process decided to give up early and command all processes |
| * to skip the unmatched scan? |
| */ |
| if (batch->skip_unmatched) |
| { |
| hashtable->batches[hashtable->curbatch].done = true; |
| ExecHashTableDetachBatch(hashtable); |
| return false; |
| } |
| |
| /* Now prepare the process local state, just as for non-parallel join. */ |
| ExecPrepHashTableForUnmatched(hjstate); |
| |
| return true; |
| } |
| |
| /* |
| * ExecScanHashTableForUnmatched |
| * scan the hash table for unmatched inner tuples |
| * |
| * On success, the inner tuple is stored into hjstate->hj_CurTuple and |
| * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot |
| * for the latter. |
| */ |
| bool |
| ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) |
| { |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| HashJoinTuple hashTuple = hjstate->hj_CurTuple; |
| |
| for (;;) |
| { |
| /* |
| * hj_CurTuple is the address of the tuple last returned from the |
| * current bucket, or NULL if it's time to start scanning a new |
| * bucket. |
| */ |
| if (hashTuple != NULL) |
| hashTuple = hashTuple->next.unshared; |
| else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) |
| { |
| hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; |
| hjstate->hj_CurBucketNo++; |
| } |
| else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) |
| { |
| int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo]; |
| |
| hashTuple = hashtable->skewBucket[j]->tuples; |
| hjstate->hj_CurSkewBucketNo++; |
| } |
| else |
| break; /* finished all buckets */ |
| |
| while (hashTuple != NULL) |
| { |
| if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) |
| { |
| TupleTableSlot *inntuple; |
| |
| /* insert hashtable's tuple into exec slot */ |
| inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), |
| hjstate->hj_HashTupleSlot, |
| false); /* do not pfree */ |
| econtext->ecxt_innertuple = inntuple; |
| |
| /* |
| * Reset temp memory each time; although this function doesn't |
| * do any qual eval, the caller will, so let's keep it |
| * parallel to ExecScanHashBucket. |
| */ |
| ResetExprContext(econtext); |
| |
| hjstate->hj_CurTuple = hashTuple; |
| return true; |
| } |
| |
| hashTuple = hashTuple->next.unshared; |
| } |
| |
| /* allow this loop to be cancellable */ |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| /* |
| * no more unmatched tuples |
| */ |
| return false; |
| } |
| |
| /* |
| * ExecParallelScanHashTableForUnmatched |
| * scan the hash table for unmatched inner tuples, in parallel join |
| * |
| * On success, the inner tuple is stored into hjstate->hj_CurTuple and |
| * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot |
| * for the latter. |
| */ |
| bool |
| ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, |
| ExprContext *econtext) |
| { |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| HashJoinTuple hashTuple = hjstate->hj_CurTuple; |
| |
| for (;;) |
| { |
| /* |
| * hj_CurTuple is the address of the tuple last returned from the |
| * current bucket, or NULL if it's time to start scanning a new |
| * bucket. |
| */ |
| if (hashTuple != NULL) |
| hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); |
| else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) |
| hashTuple = ExecParallelHashFirstTuple(hashtable, |
| hjstate->hj_CurBucketNo++); |
| else |
| break; /* finished all buckets */ |
| |
| while (hashTuple != NULL) |
| { |
| if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) |
| { |
| TupleTableSlot *inntuple; |
| |
| /* insert hashtable's tuple into exec slot */ |
| inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), |
| hjstate->hj_HashTupleSlot, |
| false); /* do not pfree */ |
| econtext->ecxt_innertuple = inntuple; |
| |
| /* |
| * Reset temp memory each time; although this function doesn't |
| * do any qual eval, the caller will, so let's keep it |
| * parallel to ExecScanHashBucket. |
| */ |
| ResetExprContext(econtext); |
| |
| hjstate->hj_CurTuple = hashTuple; |
| return true; |
| } |
| |
| hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); |
| } |
| |
| /* allow this loop to be cancellable */ |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| /* |
| * no more unmatched tuples |
| */ |
| return false; |
| } |
| |
| /* |
| * ExecHashTableReset |
| * |
| * reset hash table header for new batch |
| */ |
| void |
| ExecHashTableReset(HashState *hashState, HashJoinTable hashtable) |
| { |
| MemoryContext oldcxt; |
| int nbuckets = hashtable->nbuckets; |
| |
| Assert(!hashtable->eagerlyReleased); |
| |
| /* |
| * Release all the hash buckets and tuples acquired in the prior pass, and |
| * reinitialize the context for a new pass. |
| */ |
| MemoryContextReset(hashtable->batchCxt); |
| oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); |
| |
| /* Reallocate and reinitialize the hash bucket headers. */ |
| hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets); |
| |
| hashtable->spaceUsed = 0; |
| hashtable->totalTuples = 0; |
| |
| MemoryContextSwitchTo(oldcxt); |
| |
| /* Forget the chunks (the memory was freed by the context reset above). */ |
| hashtable->chunks = NULL; |
| } |
| |
| /* |
| * ExecHashTableResetMatchFlags |
| * Clear all the HeapTupleHeaderHasMatch flags in the table |
| */ |
| void |
| ExecHashTableResetMatchFlags(HashJoinTable hashtable) |
| { |
| HashJoinTuple tuple; |
| int i; |
| |
| /* Reset all flags in the main table ... */ |
| for (i = 0; i < hashtable->nbuckets; i++) |
| { |
| for (tuple = hashtable->buckets.unshared[i]; tuple != NULL; |
| tuple = tuple->next.unshared) |
| HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); |
| } |
| |
| /* ... and the same for the skew buckets, if any */ |
| for (i = 0; i < hashtable->nSkewBuckets; i++) |
| { |
| int j = hashtable->skewBucketNums[i]; |
| HashSkewBucket *skewBucket = hashtable->skewBucket[j]; |
| |
| for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared) |
| HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); |
| } |
| } |
| |
| |
| void |
| ExecReScanHash(HashState *node) |
| { |
| PlanState *outerPlan = outerPlanState(node); |
| |
| /* |
| * if chgParam of subnode is not null then plan will be re-scanned by |
| * first ExecProcNode. |
| */ |
| if (outerPlan->chgParam == NULL) |
| ExecReScan(outerPlan); |
| |
| if (gp_enable_runtime_filter_pushdown && node->filters) |
| ResetRuntimeFilter(node); |
| } |
| |
| |
| /* |
| * ExecHashTableExplainInit |
| * Called after ExecHashTableCreate to set up EXPLAIN ANALYZE reporting. |
| */ |
| void |
| ExecHashTableExplainInit(HashState *hashState, HashJoinState *hjstate, |
| HashJoinTable hashtable) |
| { |
| MemoryContext oldcxt; |
| int nbatch = Max(hashtable->nbatch, 1); |
| |
| /* Switch to a memory context that survives until ExecutorEnd. */ |
| oldcxt = MemoryContextSwitchTo(hjstate->js.ps.state->es_query_cxt); |
| |
| /* Request a callback at end of query. */ |
| hjstate->js.ps.cdbexplainfun = ExecHashTableExplainEnd; |
| |
| /* Create workarea and attach it to the HashJoinTable. */ |
| hashtable->stats = (HashJoinTableStats *)palloc0(sizeof(*hashtable->stats)); |
| hashtable->stats->endedbatch = -1; |
| |
| /* Create per-batch statistics array. */ |
| hashtable->stats->batchstats = |
| (HashJoinBatchStats *)palloc0(nbatch * sizeof(hashtable->stats->batchstats[0])); |
| hashtable->stats->nbatchstats = nbatch; |
| |
| /* Restore caller's memory context. */ |
| MemoryContextSwitchTo(oldcxt); |
| } /* ExecHashTableExplainInit */ |
| |
| |
| /* |
| * ExecHashTableExplainEnd |
| * Called before ExecutorEnd to finish EXPLAIN ANALYZE reporting. |
| */ |
| static void |
| ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf) |
| { |
| HashJoinState *hjstate = (HashJoinState *)planstate; |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| HashJoinTableStats *stats; |
| Instrumentation *jinstrument = hjstate->js.ps.instrument; |
| int total_buckets; |
| int i; |
| |
| if (!hashtable || |
| !hashtable->stats || |
| hashtable->nbatch < 1 || |
| !jinstrument || |
| !jinstrument->need_cdb) |
| return; |
| |
| stats = hashtable->stats; |
| |
| Assert(stats->batchstats); |
| |
| if (!hashtable->eagerlyReleased) |
| { |
| HashState *hashState = (HashState *) innerPlanState(hjstate); |
| |
| /* Report on batch in progress, in case the join is being ended early. */ |
| ExecHashTableExplainBatchEnd(hashState, hashtable); |
| } |
| |
| /* Report actual work_mem high water mark. */ |
| jinstrument->workmemused = Max(jinstrument->workmemused, stats->workmem_max); |
| |
| /* How much work_mem would suffice to hold all inner tuples in memory? */ |
| if (hashtable->nbatch > 1) |
| { |
| uint64 workmemwanted = 0; |
| |
| /* Space actually taken by hash rows in completed batches... */ |
| for (i = 0; i <= stats->endedbatch; i++) |
| workmemwanted += stats->batchstats[i].hashspace_final; |
| |
| /* ... plus workfile size for original batches not reached, plus... */ |
| for (; i < hashtable->nbatch_original; i++) |
| workmemwanted += stats->batchstats[i].innerfilesize; |
| |
| /* ... rows spilled to unreached oflo batches, in case quitting early */ |
| for (; i < stats->nbatchstats; i++) |
| workmemwanted += stats->batchstats[i].spillspace_in; |
| |
| /* |
| * Sometimes workfiles are used even though all the data would fit |
| * in work_mem. For example, if the planner overestimated the inner |
| * rel size, it might have instructed us to use more initial batches |
| * than were actually needed, causing unnecessary workfile I/O. To |
| * avoid this I/O, the user would have to increase work_mem based on |
| * the planner's estimate rather than our runtime observations. For |
| * now, we don't try to second-guess the planner; just keep quiet. |
| */ |
| if (workmemwanted > PlanStateOperatorMemKB(planstate) * 1024L) |
| jinstrument->workmemwanted = |
| Max(jinstrument->workmemwanted, workmemwanted); |
| } |
| |
| /* Report workfile I/O statistics. */ |
| /* CBDB_PARALLEL_FIXME: ExecHashTableExplainBatches if parallel_aware? */ |
| if (hashtable->nbatch > 1 && !planstate->plan->parallel_aware) |
| { |
| ExecHashTableExplainBatches(hashtable, buf, 0, 1, "Initial"); |
| ExecHashTableExplainBatches(hashtable, |
| buf, |
| 1, |
| hashtable->nbatch_original, |
| "Initial"); |
| ExecHashTableExplainBatches(hashtable, |
| buf, |
| hashtable->nbatch_original, |
| hashtable->nbatch_outstart, |
| "Overflow"); |
| ExecHashTableExplainBatches(hashtable, |
| buf, |
| hashtable->nbatch_outstart, |
| hashtable->nbatch, |
| "Secondary Overflow"); |
| } |
| |
| /* Report hash chain statistics. */ |
| total_buckets = stats->nonemptybatches * hashtable->nbuckets; |
| if (total_buckets > 0) |
| { |
| appendStringInfo(buf, |
| "Hash chain length" |
| " %.1f avg, %.0f max, using %d of %d buckets.", |
| cdbexplain_agg_avg(&stats->chainlength), |
| stats->chainlength.vmax, |
| stats->chainlength.vcnt, |
| total_buckets); |
| if (hashtable->nbatch > stats->nonemptybatches) |
| appendStringInfo(buf, |
| " Skipped %d empty batches.", |
| hashtable->nbatch - stats->nonemptybatches); |
| } |
| } /* ExecHashTableExplainEnd */ |
| |
| |
| /* |
| * ExecHashTableExplainBatches |
| * Report summary of EXPLAIN ANALYZE stats for a set of batches. |
| */ |
| static void |
| ExecHashTableExplainBatches(HashJoinTable hashtable, |
| StringInfo buf, |
| int ibatch_begin, |
| int ibatch_end, |
| const char *title) |
| { |
| HashJoinTableStats *stats = hashtable->stats; |
| CdbExplain_Agg irdbytes; |
| CdbExplain_Agg iwrbytes; |
| CdbExplain_Agg ordbytes; |
| CdbExplain_Agg owrbytes; |
| int i; |
| |
| if (ibatch_begin >= ibatch_end) |
| return; |
| |
| Assert(ibatch_begin >= 0 && |
| ibatch_end <= hashtable->nbatch && |
| hashtable->nbatch <= stats->nbatchstats && |
| stats->batchstats != NULL); |
| |
| cdbexplain_agg_init0(&irdbytes); |
| cdbexplain_agg_init0(&iwrbytes); |
| cdbexplain_agg_init0(&ordbytes); |
| cdbexplain_agg_init0(&owrbytes); |
| |
| /* Add up the batch stats. */ |
| for (i = ibatch_begin; i < ibatch_end; i++) |
| { |
| HashJoinBatchStats *bs = &stats->batchstats[i]; |
| |
| cdbexplain_agg_upd(&irdbytes, (double)bs->irdbytes, i); |
| cdbexplain_agg_upd(&iwrbytes, (double)bs->iwrbytes, i); |
| cdbexplain_agg_upd(&ordbytes, (double)bs->ordbytes, i); |
| cdbexplain_agg_upd(&owrbytes, (double)bs->owrbytes, i); |
| } |
| |
| if (iwrbytes.vcnt + irdbytes.vcnt + owrbytes.vcnt + ordbytes.vcnt > 0) |
| { |
| if (ibatch_begin == ibatch_end - 1) |
| appendStringInfo(buf, |
| "%s batch %d:\n", |
| title, |
| ibatch_begin); |
| else |
| appendStringInfo(buf, |
| "%s batches %d..%d:\n", |
| title, |
| ibatch_begin, |
| ibatch_end - 1); |
| } |
| |
| /* Inner bytes read from workfile */ |
| if (irdbytes.vcnt > 0) |
| { |
| appendStringInfo(buf, |
| " Read %.0fK bytes from inner workfile", |
| ceil(irdbytes.vsum / 1024)); |
| if (irdbytes.vcnt > 1) |
| appendStringInfo(buf, |
| ": %.0fK avg x %d nonempty batches" |
| ", %.0fK max", |
| ceil(cdbexplain_agg_avg(&irdbytes)/1024), |
| irdbytes.vcnt, |
| ceil(irdbytes.vmax / 1024)); |
| appendStringInfoString(buf, ".\n"); |
| } |
| |
| /* Inner rel bytes spilled to workfile */ |
| if (iwrbytes.vcnt > 0) |
| { |
| appendStringInfo(buf, |
| " Wrote %.0fK bytes to inner workfile", |
| ceil(iwrbytes.vsum / 1024)); |
| if (iwrbytes.vcnt > 1) |
| appendStringInfo(buf, |
| ": %.0fK avg x %d overflowing batches" |
| ", %.0fK max", |
| ceil(cdbexplain_agg_avg(&iwrbytes)/1024), |
| iwrbytes.vcnt, |
| ceil(iwrbytes.vmax / 1024)); |
| appendStringInfoString(buf, ".\n"); |
| } |
| |
| /* Outer bytes read from workfile */ |
| if (ordbytes.vcnt > 0) |
| { |
| appendStringInfo(buf, |
| " Read %.0fK bytes from outer workfile", |
| ceil(ordbytes.vsum / 1024)); |
| if (ordbytes.vcnt > 1) |
| appendStringInfo(buf, |
| ": %.0fK avg x %d nonempty batches" |
| ", %.0fK max", |
| ceil(cdbexplain_agg_avg(&ordbytes)/1024), |
| ordbytes.vcnt, |
| ceil(ordbytes.vmax / 1024)); |
| appendStringInfoString(buf, ".\n"); |
| } |
| |
| /* Outer rel bytes spilled to workfile */ |
| if (owrbytes.vcnt > 0) |
| { |
| appendStringInfo(buf, |
| " Wrote %.0fK bytes to outer workfile", |
| ceil(owrbytes.vsum / 1024)); |
| if (owrbytes.vcnt > 1) |
| appendStringInfo(buf, |
| ": %.0fK avg x %d overflowing batches" |
| ", %.0fK max", |
| ceil(cdbexplain_agg_avg(&owrbytes)/1024), |
| owrbytes.vcnt, |
| ceil(owrbytes.vmax / 1024)); |
| appendStringInfoString(buf, ".\n"); |
| } |
| } /* ExecHashTableExplainBatches */ |
| |
| |
| /* |
|  * ExecHashTableExplainBatchEnd |
|  *      Gather EXPLAIN ANALYZE statistics when a batch finishes. |
|  * |
|  * The call is a no-op when the current batch has already been reported, |
|  * when curbatch is not below nbatch, or when hashtable->first_pass is |
|  * false.  Otherwise the work_mem high-water mark and the batch's final |
|  * hash table size are recorded.  For a non-parallel hash table, workfile |
|  * byte counts (only if nbatch > 1) and hash chain lengths are collected |
|  * as well. |
|  * |
|  * hashState is not referenced by this routine. |
|  */ |
| void |
| ExecHashTableExplainBatchEnd(HashState *hashState, HashJoinTable hashtable) |
| { |
|     int         batchno = hashtable->curbatch; |
|     HashJoinTableStats *stats = hashtable->stats; |
|     HashJoinBatchStats *cur = &stats->batchstats[batchno]; |
| |
|     Assert(!hashtable->eagerlyReleased); |
| |
|     /* Bail out if this batch was reported before or is not eligible. */ |
|     if (stats->endedbatch == batchno) |
|         return; |
|     if (batchno >= hashtable->nbatch) |
|         return; |
|     if (!hashtable->first_pass) |
|         return; |
| |
|     /* Remember that this batch has now been reported. */ |
|     stats->endedbatch = batchno; |
| |
|     /* Track the largest amount of work_mem in use at any one time. */ |
|     if (hashtable->spaceUsed > stats->workmem_max) |
|         stats->workmem_max = hashtable->spaceUsed; |
| |
|     /* Size the hash table ended up with for this batch. */ |
|     cur->hashspace_final = hashtable->spaceUsed; |
| |
|     /* |
|      * Parallel hash join keeps its tuples in shared tuplestores; neither the |
|      * buffile I/O numbers nor the chain statistics are gathered for it. |
|      */ |
|     if (hashtable->parallel_state != NULL) |
|         return; |
| |
|     /* Buffile I/O statistics only exist when there is more than one batch. */ |
|     if (hashtable->nbatch > 1) |
|     { |
|         uint64      outer_written = 0; |
|         uint64      inner_written = 0; |
|         int         later; |
| |
|         Assert(stats->batchstats && |
|                hashtable->nbatch <= stats->nbatchstats); |
| |
|         /* |
|          * Batch 0 reads its inner tuples from the in-memory hash table and |
|          * its outer tuples from the outer relation, so nothing is read from |
|          * batch files; innerfilesize starts at 0 and outerBatchFile[0] |
|          * starts as NULL.  Later batches read both sides from batch files. |
|          */ |
| |
|         /* Bytes read from the inner buffile for this batch. */ |
|         cur->irdbytes = cur->innerfilesize; |
| |
|         /* Bytes read from the outer buffile for this batch, if it exists. */ |
|         if (hashtable->outerBatchFile != NULL && |
|             hashtable->outerBatchFile[batchno] != NULL) |
|             cur->ordbytes = BufFileSize(hashtable->outerBatchFile[batchno]); |
| |
|         /* |
|          * Batch 0 writes every tuple that does not belong to it into the |
|          * temp files of later batches.  For later batches the number of |
|          * batches may have been increased, so tuples can be forwarded to |
|          * still-later files while a batch is reloaded.  Either way, charge |
|          * the growth of each remaining batch file to the current batch. |
|          */ |
|         for (later = batchno + 1; later < hashtable->nbatch; later++) |
|         { |
|             HashJoinBatchStats *bs = &stats->batchstats[later]; |
|             uint64      outer_now = 0; |
|             uint64      inner_now = 0; |
| |
|             /* Outer side: growth since the size recorded last time. */ |
|             if (hashtable->outerBatchFile != NULL && |
|                 hashtable->outerBatchFile[later] != NULL) |
|                 outer_now = BufFileSize(hashtable->outerBatchFile[later]); |
| |
|             Assert(outer_now >= bs->outerfilesize); |
|             outer_written += outer_now - bs->outerfilesize; |
|             bs->outerfilesize = outer_now; |
| |
|             /* Inner side: same bookkeeping. */ |
|             if (hashtable->innerBatchFile != NULL && |
|                 hashtable->innerBatchFile[later] != NULL) |
|                 inner_now = BufFileSize(hashtable->innerBatchFile[later]); |
| |
|             Assert(inner_now >= bs->innerfilesize); |
|             inner_written += inner_now - bs->innerfilesize; |
|             bs->innerfilesize = inner_now; |
|         } |
| |
|         cur->owrbytes = outer_written; |
|         cur->iwrbytes = inner_written; |
|     } |
| |
|     /* Hash chain statistics: one sample per non-empty bucket. */ |
|     stats->nonemptybatches++; |
|     for (int bucketno = 0; bucketno < hashtable->nbuckets; bucketno++) |
|     { |
|         HashJoinTuple cursor = hashtable->buckets.unshared[bucketno]; |
|         int         length = 0; |
| |
|         if (cursor == NULL) |
|             continue; |
| |
|         while (cursor != NULL) |
|         { |
|             length++; |
|             cursor = cursor->next.unshared; |
|         } |
|         cdbexplain_agg_upd(&stats->chainlength, length, bucketno); |
|     } |
| } /* ExecHashTableExplainBatchEnd */ |
| |
| |
| /* |
| * ExecHashBuildSkewHash |
| * |
| * Set up for skew optimization if we can identify the most common values |
| * (MCVs) of the outer relation's join key. We make a skew hash bucket |
| * for the hash value of each MCV, up to the number of slots allowed |
| * based on available memory. |
| * |
| * hashtable: hash table that receives the skew bucket arrays. |
| * node: Hash plan node; its skewTable, skewColumn and skewInherit fields |
| * identify the pg_statistic entry to look up. |
| * mcvsToUse: upper bound on the number of MCVs to use; it is clamped to the |
| * number of values actually present in the statistics slot. |
| * |
| * The function returns without enabling skew optimization when no skew table |
| * was identified, when mcvsToUse is not positive, when no statistics tuple |
| * or MCV slot is found, or when the summed MCV frequencies fall below |
| * SKEW_MIN_OUTER_FRACTION. |
| */ |
| static void |
| ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) |
| { |
| HeapTupleData *statsTuple; |
| AttStatsSlot sslot; |
| |
| /* Do nothing if planner didn't identify the outer relation's join key */ |
| if (!OidIsValid(node->skewTable)) |
| return; |
| /* Also, do nothing if we don't have room for at least one skew bucket */ |
| if (mcvsToUse <= 0) |
| return; |
| |
| /* |
| * Try to find the MCV statistics for the outer relation's join key. |
| */ |
| statsTuple = SearchSysCache3(STATRELATTINH, |
| ObjectIdGetDatum(node->skewTable), |
| Int16GetDatum(node->skewColumn), |
| BoolGetDatum(node->skewInherit)); |
| if (!HeapTupleIsValid(statsTuple)) |
| return; |
| |
| /* Ask for both the MCV values and their frequencies. */ |
| if (get_attstatsslot(&sslot, statsTuple, |
| STATISTIC_KIND_MCV, InvalidOid, |
| ATTSTATSSLOT_VALUES | ATTSTATSSLOT_NUMBERS)) |
| { |
| double frac; |
| int nbuckets; |
| FmgrInfo *hashfunctions; |
| int i; |
| |
| /* Never use more MCVs than the statistics slot provides. */ |
| if (mcvsToUse > sslot.nvalues) |
| mcvsToUse = sslot.nvalues; |
| |
| /* |
| * Calculate the expected fraction of outer relation that will |
| * participate in the skew optimization. If this isn't at least |
| * SKEW_MIN_OUTER_FRACTION, don't use skew optimization. |
| */ |
| frac = 0; |
| for (i = 0; i < mcvsToUse; i++) |
| frac += sslot.numbers[i]; |
| if (frac < SKEW_MIN_OUTER_FRACTION) |
| { |
| /* Release the slot and the syscache reference before bailing out. */ |
| free_attstatsslot(&sslot); |
| ReleaseSysCache(statsTuple); |
| return; |
| } |
| |
| /* |
| * Okay, set up the skew hashtable. |
| * |
| * skewBucket[] is an open addressing hashtable with a power of 2 size |
| * that is greater than the number of MCV values. (This ensures there |
| * will be at least one null entry, so searches will always |
| * terminate.) |
| * |
| * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or |
| * MaxAllocSize/sizeof(void *)/8, but that is not currently possible |
| * since we limit pg_statistic entries to much less than that. |
| */ |
| nbuckets = pg_nextpower2_32(mcvsToUse + 1); |
| /* use two more bits just to help avoid collisions */ |
| nbuckets <<= 2; |
| |
| hashtable->skewEnabled = true; |
| hashtable->skewBucketLen = nbuckets; |
| |
| /* |
| * We allocate the bucket memory in the hashtable's batch context. It |
| * is only needed during the first batch, and this ensures it will be |
| * automatically removed once the first batch is done. |
| */ |
| hashtable->skewBucket = (HashSkewBucket **) |
| MemoryContextAllocZero(hashtable->batchCxt, |
| nbuckets * sizeof(HashSkewBucket *)); |
| hashtable->skewBucketNums = (int *) |
| MemoryContextAllocZero(hashtable->batchCxt, |
| mcvsToUse * sizeof(int)); |
| |
| /* |
| * Charge both arrays to the overall and to the skew-specific space |
| * counters, and keep the peak figure up to date. |
| */ |
| hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *) |
| + mcvsToUse * sizeof(int); |
| hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *) |
| + mcvsToUse * sizeof(int); |
| if (hashtable->spaceUsed > hashtable->spacePeak) |
| hashtable->spacePeak = hashtable->spaceUsed; |
| |
| /* |
| * Create a skew bucket for each MCV hash value. |
| * |
| * Note: it is very important that we create the buckets in order of |
| * decreasing MCV frequency. If we have to remove some buckets, they |
| * must be removed in reverse order of creation (see notes in |
| * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to |
| * be removed first. |
| */ |
| hashfunctions = hashtable->outer_hashfunctions; |
| |
| for (i = 0; i < mcvsToUse; i++) |
| { |
| uint32 hashvalue; |
| int bucket; |
| |
| /* Only the first hash function and collation are applied to the MCV. */ |
| hashvalue = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[0], |
| hashtable->collations[0], |
| sslot.values[i])); |
| |
| /* |
| * While we have not hit a hole in the hashtable and have not hit |
| * the desired bucket, we have collided with some previous hash |
| * value, so try the next bucket location. NB: this code must |
| * match ExecHashGetSkewBucket. |
| */ |
| bucket = hashvalue & (nbuckets - 1); |
| while (hashtable->skewBucket[bucket] != NULL && |
| hashtable->skewBucket[bucket]->hashvalue != hashvalue) |
| bucket = (bucket + 1) & (nbuckets - 1); |
| |
| /* |
| * If we found an existing bucket with the same hashvalue, leave |
| * it alone. It's okay for two MCVs to share a hashvalue. |
| */ |
| if (hashtable->skewBucket[bucket] != NULL) |
| continue; |
| |
| /* Okay, create a new skew bucket for this hashvalue. */ |
| hashtable->skewBucket[bucket] = (HashSkewBucket *) |
| MemoryContextAlloc(hashtable->batchCxt, |
| sizeof(HashSkewBucket)); |
| hashtable->skewBucket[bucket]->hashvalue = hashvalue; |
| hashtable->skewBucket[bucket]->tuples = NULL; |
| /* Record the slot in creation order so it can be removed in reverse. */ |
| hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket; |
| hashtable->nSkewBuckets++; |
| hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; |
| hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD; |
| if (hashtable->spaceUsed > hashtable->spacePeak) |
| hashtable->spacePeak = hashtable->spaceUsed; |
| } |
| |
| free_attstatsslot(&sslot); |
| } |
| |
| ReleaseSysCache(statsTuple); |
| } |
| |
| /* |
|  * ExecHashGetSkewBucket |
|  * |
|  *      Look up the skew bucket that holds 'hashvalue'. |
|  * |
|  * Returns the index of the matching entry in hashtable->skewBucket[], or |
|  * INVALID_SKEW_BUCKET_NO when skew optimization is disabled (which is the |
|  * case once the initial batch is done) or when no active skew bucket |
|  * carries this hash value. |
|  */ |
| int |
| ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue) |
| { |
|     int         mask; |
|     int         slot; |
| |
|     /* Skew optimization switched off: there is nothing to find. */ |
|     if (!hashtable->skewEnabled) |
|         return INVALID_SKEW_BUCKET_NO; |
| |
|     /* skewBucketLen is a power of 2, so masking acts as a modulo. */ |
|     mask = hashtable->skewBucketLen - 1; |
| |
|     /* |
|      * Probe linearly from the home slot.  An occupied slot with a different |
|      * hash value is a collision, so move on to the next location; an empty |
|      * slot ends the search. |
|      */ |
|     for (slot = hashvalue & mask; |
|          hashtable->skewBucket[slot] != NULL; |
|          slot = (slot + 1) & mask) |
|     { |
|         if (hashtable->skewBucket[slot]->hashvalue == hashvalue) |
|             return slot; |
|     } |
| |
|     /* Reached a hole: this hash value has no skew bucket. */ |
|     return INVALID_SKEW_BUCKET_NO; |
| } |
| |
| /* |
|  * ExecHashSkewTableInsert |
|  * |
|  *      Copy the tuple held in 'slot' into skew bucket 'bucketNumber'. |
|  * |
|  * The tuple is copied into memory from hashtable->batchCxt and pushed onto |
|  * the front of the bucket's tuple list.  Afterwards skew buckets are |
|  * removed for as long as skew space exceeds spaceAllowedSkew, and the |
|  * number of batches is increased if total space exceeds spaceAllowed. |
|  * |
|  * This should generally match up with the current-batch case in |
|  * ExecHashTableInsert. |
|  */ |
| static void |
| ExecHashSkewTableInsert(HashState *hashState, |
|                         HashJoinTable hashtable, |
|                         TupleTableSlot *slot, |
|                         uint32 hashvalue, |
|                         int bucketNumber) |
| { |
|     bool        shouldFree; |
|     MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
|     int         entrySize = HJTUPLE_OVERHEAD + tuple->t_len; |
|     HashJoinTuple entry; |
|     HashSkewBucket *skew; |
| |
|     /* Build the HashJoinTuple: header, then a copy of the minimal tuple. */ |
|     entry = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, |
|                                                entrySize); |
|     entry->hashvalue = hashvalue; |
|     memcpy(HJTUPLE_MINTUPLE(entry), tuple, tuple->t_len); |
|     HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(entry)); |
| |
|     /* Link the new entry in at the head of the skew bucket's list. */ |
|     skew = hashtable->skewBucket[bucketNumber]; |
|     entry->next.unshared = skew->tuples; |
|     skew->tuples = entry; |
|     Assert(entry != entry->next.unshared); |
| |
|     /* Update space accounting, including the peak. */ |
|     hashtable->spaceUsed += entrySize; |
|     hashtable->spaceUsedSkew += entrySize; |
|     if (hashtable->spaceUsed > hashtable->spacePeak) |
|         hashtable->spacePeak = hashtable->spaceUsed; |
| |
|     /* Back off by dropping skew buckets while skew space is over budget. */ |
|     while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew) |
|         ExecHashRemoveNextSkewBucket(hashState, hashtable); |
| |
|     /* The overall space limit must be respected as well. */ |
|     if (hashtable->spaceUsed > hashtable->spaceAllowed) |
|         ExecHashIncreaseNumBatches(hashtable); |
| |
|     /* Release the fetched tuple if the slot handed us a private copy. */ |
|     if (shouldFree) |
|         heap_free_minimal_tuple(tuple); |
| } |
| |
| /* |
| * ExecHashRemoveNextSkewBucket |
| * |
| * Remove the least valuable skew bucket by pushing its tuples into |
| * the main hash table. |
| * |
| * The bucket removed is the most recently created one, i.e. the last entry |
| * of skewBucketNums[]. Tuples whose batch is the current batch are copied |
| * into the main hash table; the others are written to the inner batch file |
| * of their batch. hashState supplies the PlanState that is passed to |
| * ExecHashJoinSaveTuple. When the last skew bucket is gone, skew |
| * optimization is disabled and the skew arrays are freed. |
| */ |
| static void |
| ExecHashRemoveNextSkewBucket(HashState *hashState, HashJoinTable hashtable) |
| { |
| PlanState *ps = &hashState->ps; |
| int bucketToRemove; |
| HashSkewBucket *bucket; |
| uint32 hashvalue; |
| int bucketno; |
| int batchno; |
| HashJoinTuple hashTuple; |
| |
| /* Locate the bucket to remove */ |
| bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1]; |
| bucket = hashtable->skewBucket[bucketToRemove]; |
| |
| /* |
| * Calculate which bucket and batch the tuples belong to in the main |
| * hashtable. They all have the same hash value, so it's the same for all |
| * of them. Also note that it's not possible for nbatch to increase while |
| * we are processing the tuples. |
| */ |
| hashvalue = bucket->hashvalue; |
| ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); |
| |
| /* Process all tuples in the bucket */ |
| hashTuple = bucket->tuples; |
| while (hashTuple != NULL) |
| { |
| /* Fetch the successor first: hashTuple is freed below. */ |
| HashJoinTuple nextHashTuple = hashTuple->next.unshared; |
| MinimalTuple tuple; |
| Size tupleSize; |
| |
| /* |
| * This code must agree with ExecHashTableInsert. We do not use |
| * ExecHashTableInsert directly as ExecHashTableInsert expects a |
| * TupleTableSlot while we already have HashJoinTuples. |
| */ |
| tuple = HJTUPLE_MINTUPLE(hashTuple); |
| tupleSize = HJTUPLE_OVERHEAD + tuple->t_len; |
| |
| /* Decide whether to put the tuple in the hash table or a temp file */ |
| if (batchno == hashtable->curbatch) |
| { |
| /* Move the tuple to the main hash table */ |
| HashJoinTuple copyTuple; |
| |
| /* |
| * We must copy the tuple into the dense storage, else it will not |
| * be found by, eg, ExecHashIncreaseNumBatches. |
| */ |
| copyTuple = (HashJoinTuple) dense_alloc(hashtable, tupleSize); |
| memcpy(copyTuple, hashTuple, tupleSize); |
| pfree(hashTuple); |
| |
| /* Push the copy onto the front of its main-table bucket chain. */ |
| copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
| hashtable->buckets.unshared[bucketno] = copyTuple; |
| |
| /* We have reduced skew space, but overall space doesn't change */ |
| hashtable->spaceUsedSkew -= tupleSize; |
| } |
| else |
| { |
| /* Put the tuple into a temp file for later batches */ |
| Assert(batchno > hashtable->curbatch); |
| ExecHashJoinSaveTuple(ps, tuple, |
| hashvalue, |
| hashtable, |
| &hashtable->innerBatchFile[batchno], hashtable->bfCxt); |
| pfree(hashTuple); |
| /* The tuple left memory, so both space counters shrink. */ |
| hashtable->spaceUsed -= tupleSize; |
| hashtable->spaceUsedSkew -= tupleSize; |
| } |
| |
| hashTuple = nextHashTuple; |
| |
| /* allow this loop to be cancellable */ |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| /* |
| * Free the bucket struct itself and reset the hashtable entry to NULL. |
| * |
| * NOTE: this is not nearly as simple as it looks on the surface, because |
| * of the possibility of collisions in the hashtable. Suppose that hash |
| * values A and B collide at a particular hashtable entry, and that A was |
| * entered first so B gets shifted to a different table entry. If we were |
| * to remove A first then ExecHashGetSkewBucket would mistakenly start |
| * reporting that B is not in the hashtable, because it would hit the NULL |
| * before finding B. However, we always remove entries in the reverse |
| * order of creation, so this failure cannot happen. |
| */ |
| hashtable->skewBucket[bucketToRemove] = NULL; |
| hashtable->nSkewBuckets--; |
| pfree(bucket); |
| hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD; |
| hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD; |
| |
| /* |
| * If we have removed all skew buckets then give up on skew optimization. |
| * Release the arrays since they aren't useful any more. |
| */ |
| if (hashtable->nSkewBuckets == 0) |
| { |
| hashtable->skewEnabled = false; |
| pfree(hashtable->skewBucket); |
| pfree(hashtable->skewBucketNums); |
| hashtable->skewBucket = NULL; |
| hashtable->skewBucketNums = NULL; |
| /* Whatever is still booked as skew space is given back in one go. */ |
| hashtable->spaceUsed -= hashtable->spaceUsedSkew; |
| hashtable->spaceUsedSkew = 0; |
| } |
| } |
| |
| /* |
|  * ExecHashEstimate |
|  * |
|  *      Reserve DSM space for the hash instrumentation area: one |
|  *      HashInstrumentation per worker after the SharedHashInfo header, |
|  *      plus one TOC key. |
|  * |
|  * Nothing is reserved when the node is not instrumented or when the |
|  * parallel context has no workers. |
|  */ |
| void |
| ExecHashEstimate(HashState *node, ParallelContext *pcxt) |
| { |
|     if (node->ps.instrument && pcxt->nworkers != 0) |
|     { |
|         size_t      per_worker; |
|         size_t      total; |
| |
|         /* mul_size/add_size are used for both steps of the size computation */ |
|         per_worker = mul_size(pcxt->nworkers, sizeof(HashInstrumentation)); |
|         total = add_size(per_worker, offsetof(SharedHashInfo, hinstrument)); |
| |
|         shm_toc_estimate_chunk(&pcxt->estimator, total); |
|         shm_toc_estimate_keys(&pcxt->estimator, 1); |
|     } |
| } |
| |
| /* |
|  * ExecHashInitializeDSM |
|  * |
|  *      Allocate and zero the shared area in which every worker records |
|  *      instrumentation data about its hash table, and publish it in the |
|  *      TOC under this plan node's id. |
|  * |
|  * Nothing is done when the node is not instrumented or when the parallel |
|  * context has no workers. |
|  */ |
| void |
| ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt) |
| { |
|     size_t      nbytes; |
| |
|     /* Not instrumenting, or no workers: no shared area is needed. */ |
|     if (!node->ps.instrument || pcxt->nworkers == 0) |
|         return; |
| |
|     /* Header up to hinstrument[], followed by one slot per worker. */ |
|     nbytes = offsetof(SharedHashInfo, hinstrument) + |
|         pcxt->nworkers * sizeof(HashInstrumentation); |
| |
|     node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, |
|                                                             nbytes); |
| |
|     /* Every per-worker slot has to start out zeroed. */ |
|     memset(node->shared_info, 0, nbytes); |
|     node->shared_info->num_workers = pcxt->nworkers; |
| |
|     shm_toc_insert(pcxt->toc, |
|                    node->ps.plan->plan_node_id, |
|                    node->shared_info); |
| } |
| |
| /* |
|  * ExecHashInitializeWorker |
|  * |
|  *      Locate the DSM space for hash table instrumentation data that we'll |
|  *      write to at shutdown time, and point node->hinstrument at this |
|  *      worker's slot. |
|  * |
|  * node:  Hash node state being initialized in a worker. |
|  * pwcxt: parallel worker context whose TOC holds the shared area. |
|  * |
|  * Nothing is done when the node is not instrumented, when no hash table is |
|  * attached to the node, or when the hash table has no owning hash join |
|  * state.  In those cases node->hinstrument is left untouched. |
|  */ |
| void |
| ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt) |
| { |
|     SharedHashInfo *shared_info; |
|     HashJoinState *hjstate; |
| |
|     /* don't need this if not instrumenting */ |
|     if (!node->ps.instrument) |
|         return; |
| |
|     /* |
|      * node->hashtable is not guaranteed to be set (ExecShutdownHash tests it |
|      * for NULL too), so check it before reaching through it for the owning |
|      * join state. |
|      */ |
|     if (node->hashtable == NULL) |
|         return; |
| |
|     hjstate = node->hashtable->hjstate; |
|     if (!hjstate) |
|         return; |
| |
|     /* |
|      * Find our entry in the shared area, and set up a pointer to it so that |
|      * we'll accumulate stats there when shutting down or rebuilding the hash |
|      * table. |
|      */ |
|     shared_info = (SharedHashInfo *) |
|         shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); |
| |
|     /* worker_id is 1-based here; slot 0 belongs to worker_id 1. */ |
|     Assert(hjstate->worker_id >= 1); |
|     node->hinstrument = &shared_info->hinstrument[hjstate->worker_id - 1]; |
| } |
| |
| /* |
|  * ExecShutdownHash |
|  * |
|  *      Collect EXPLAIN stats if needed.  They are saved into DSM memory when |
|  *      ExecHashInitializeWorker has set node->hinstrument, and into locally |
|  *      allocated storage otherwise. |
|  * |
|  * In the parallel case this has to happen here rather than in |
|  * ExecEndHash(), because the latter runs after we've detached from the DSM |
|  * segment. |
|  */ |
| void |
| ExecShutdownHash(HashState *node) |
| { |
|     if (node->hinstrument == NULL) |
|     { |
|         /* No save area yet: without instrumentation there is nothing to do. */ |
|         if (!node->ps.instrument) |
|             return; |
| |
|         /* EXPLAIN'ing, so allocate zeroed local save space. */ |
|         node->hinstrument = palloc0_object(HashInstrumentation); |
|     } |
| |
|     /* Fold in the numbers of the current (final) hash table, if any. */ |
|     if (node->hashtable) |
|         ExecHashAccumInstrumentation(node->hinstrument, node->hashtable); |
| } |
| |
| /* |
|  * ExecHashRetrieveInstrumentation |
|  * |
|  *      Copy the workers' instrumentation data out of the DSM segment before |
|  *      it is detached, so that EXPLAIN can still access it. |
|  * |
|  * Does nothing when the node has no shared instrumentation area. |
|  */ |
| void |
| ExecHashRetrieveInstrumentation(HashState *node) |
| { |
|     SharedHashInfo *dsm_info = node->shared_info; |
|     SharedHashInfo *local_info; |
|     size_t      nbytes; |
| |
|     if (dsm_info == NULL) |
|         return; |
| |
|     /* Header up to hinstrument[], followed by one slot per worker. */ |
|     nbytes = offsetof(SharedHashInfo, hinstrument) + |
|         dsm_info->num_workers * sizeof(HashInstrumentation); |
| |
|     /* Swap node->shared_info for a copy in backend-local memory. */ |
|     local_info = palloc(nbytes); |
|     memcpy(local_info, dsm_info, nbytes); |
|     node->shared_info = local_info; |
| } |
| |
| /* |
|  * ExecHashAccumInstrumentation |
|  * |
|  *      Merge the numbers of 'hashtable' into 'instrument', which is expected |
|  *      to have started out zeroed. |
|  * |
|  * Successive hash table instances within one plan node are merged this way. |
|  * Each field keeps the maximum seen so far.  The largest nbuckets and the |
|  * largest nbatch may stem from different instances, which risks reporting |
|  * unrelated numbers side by side; not reporting the largest values would |
|  * carry the bigger risk of misdiagnosing a performance issue.  The same |
|  * reasoning applies to spacePeak.  nbuckets_original and nbatch_original |
|  * should be equal across instances, but there is little value in relying on |
|  * that, so they are treated like the rest. |
|  */ |
| void |
| ExecHashAccumInstrumentation(HashInstrumentation *instrument, |
|                              HashJoinTable hashtable) |
| { |
|     /* Bucket counts: current and as originally planned. */ |
|     if (hashtable->nbuckets > instrument->nbuckets) |
|         instrument->nbuckets = hashtable->nbuckets; |
|     if (hashtable->nbuckets_original > instrument->nbuckets_original) |
|         instrument->nbuckets_original = hashtable->nbuckets_original; |
| |
|     /* Batch counts: current and as originally planned. */ |
|     if (hashtable->nbatch > instrument->nbatch) |
|         instrument->nbatch = hashtable->nbatch; |
|     if (hashtable->nbatch_original > instrument->nbatch_original) |
|         instrument->nbatch_original = hashtable->nbatch_original; |
| |
|     /* Peak memory consumption. */ |
|     if (hashtable->spacePeak > instrument->space_peak) |
|         instrument->space_peak = hashtable->spacePeak; |
| } |
| |
| /* |
|  * dense_alloc |
|  * |
|  *      Hand out 'size' bytes (rounded up with MAXALIGN) of tuple storage |
|  *      from the hash table's chunk list and return a pointer to them. |
|  * |
|  * Requests above HASH_CHUNK_THRESHOLD get a dedicated chunk.  Other |
|  * requests are carved out of the chunk at the head of hashtable->chunks, |
|  * and a fresh HASH_CHUNK_SIZE chunk is started when the head chunk is |
|  * missing or too full. |
|  */ |
| static void * |
| dense_alloc(HashJoinTable hashtable, Size size) |
| { |
|     HashMemoryChunk head = hashtable->chunks; |
|     HashMemoryChunk fresh; |
| |
|     /* The caller's size may not be aligned yet. */ |
|     size = MAXALIGN(size); |
| |
|     if (size > HASH_CHUNK_THRESHOLD) |
|     { |
|         /* Oversized tuple: it gets a chunk of exactly its own size. */ |
|         fresh = (HashMemoryChunk) |
|             MemoryContextAlloc(hashtable->batchCxt, |
|                                HASH_CHUNK_HEADER_SIZE + size); |
|         fresh->maxlen = size; |
|         fresh->used = size; |
|         fresh->ntuples = 1; |
| |
|         if (head == NULL) |
|         { |
|             /* Empty list: the new chunk becomes the head. */ |
|             fresh->next.unshared = head; |
|             hashtable->chunks = fresh; |
|         } |
|         else |
|         { |
|             /* |
|              * Link it in behind the head chunk, so the free space left in |
|              * the "current" chunk stays usable. |
|              */ |
|             fresh->next = head->next; |
|             head->next.unshared = fresh; |
|         } |
| |
|         return HASH_CHUNK_DATA(fresh); |
|     } |
| |
|     if (head != NULL && (head->maxlen - head->used) >= size) |
|     { |
|         /* The head chunk has room: carve the tuple out of it. */ |
|         char       *ptr = HASH_CHUNK_DATA(head) + head->used; |
| |
|         head->used += size; |
|         head->ntuples += 1; |
| |
|         return ptr; |
|     } |
| |
|     /* No chunk yet, or not enough room: start a new standard-size chunk. */ |
|     fresh = (HashMemoryChunk) |
|         MemoryContextAlloc(hashtable->batchCxt, |
|                            HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE); |
|     fresh->maxlen = HASH_CHUNK_SIZE; |
|     fresh->used = size; |
|     fresh->ntuples = 1; |
| |
|     /* It goes to the front of the list and becomes the current chunk. */ |
|     fresh->next.unshared = head; |
|     hashtable->chunks = fresh; |
| |
|     return HASH_CHUNK_DATA(fresh); |
| } |
| |
| /* |
| * Allocate space for a tuple in shared dense storage. This is equivalent to |
| * dense_alloc but for Parallel Hash using shared memory. |
| * |
| * While loading a tuple into shared memory, we might run out of memory and |
| * decide to repartition, or determine that the load factor is too high and |
| * decide to expand the bucket array, or discover that another participant has |
| * commanded us to help do that. Return NULL if number of buckets or batches |
| * has changed, indicating that the caller must retry (considering the |
| * possibility that the tuple no longer belongs in the same batch). |
| * |
| * hashtable: this backend's view of the shared hash table. |
| * size: number of bytes wanted; it is rounded up with MAXALIGN here. |
| * shared: output; on success receives the dsa_pointer of the returned space. |
| * It is not written on the paths that return NULL. |
| */ |
| static HashJoinTuple |
| ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, |
| dsa_pointer *shared) |
| { |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| dsa_pointer chunk_shared; |
| HashMemoryChunk chunk; |
| Size chunk_size; |
| HashJoinTuple result; |
| int curbatch = hashtable->curbatch; |
| |
| size = MAXALIGN(size); |
| |
| /* |
| * Fast path: if there is enough space in this backend's current chunk, |
| * then we can allocate without any locking. |
| */ |
| chunk = hashtable->current_chunk; |
| if (chunk != NULL && |
| size <= HASH_CHUNK_THRESHOLD && |
| chunk->maxlen - chunk->used >= size) |
| { |
| |
| chunk_shared = hashtable->current_chunk_shared; |
| Assert(chunk == dsa_get_address(hashtable->area, chunk_shared)); |
| /* Shared and local addresses of the next free byte in the chunk. */ |
| *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used; |
| result = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + chunk->used); |
| chunk->used += size; |
| |
| Assert(chunk->used <= chunk->maxlen); |
| Assert(result == dsa_get_address(hashtable->area, *shared)); |
| |
| return result; |
| } |
| |
| /* Slow path: try to allocate a new chunk. */ |
| LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
| |
| /* |
| * Check if we need to help increase the number of buckets or batches. |
| */ |
| if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || |
| pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
| { |
| /* Snapshot the request while the lock is still held. */ |
| ParallelHashGrowth growth = pstate->growth; |
| |
| /* Forget the current chunk and drop the lock before helping. */ |
| hashtable->current_chunk = NULL; |
| LWLockRelease(&pstate->lock); |
| |
| /* Another participant has commanded us to help grow. */ |
| if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) |
| ExecParallelHashIncreaseNumBatches(hashtable); |
| else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
| ExecParallelHashIncreaseNumBuckets(hashtable); |
| |
| /* The caller must retry. */ |
| return NULL; |
| } |
| |
| /* Oversized tuples get their own chunk. */ |
| if (size > HASH_CHUNK_THRESHOLD) |
| chunk_size = size + HASH_CHUNK_HEADER_SIZE; |
| else |
| chunk_size = HASH_CHUNK_SIZE; |
| |
| /* Check if it's time to grow batches or buckets. */ |
| if (pstate->growth != PHJ_GROWTH_DISABLED) |
| { |
| Assert(curbatch == 0); |
| Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER); |
| |
| /* |
| * Check if our space limit would be exceeded. To avoid choking on |
| * very large tuples or very low hash_mem setting, we'll always allow |
| * each backend to allocate at least one chunk. |
| */ |
| if (hashtable->batches[0].at_least_one_chunk && |
| hashtable->batches[0].shared->size + |
| chunk_size > pstate->space_allowed) |
| { |
| /* Request more batches; the caller retries after getting NULL. */ |
| pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; |
| hashtable->batches[0].shared->space_exhausted = true; |
| LWLockRelease(&pstate->lock); |
| |
| return NULL; |
| } |
| |
| /* Check if our load factor limit would be exceeded. */ |
| if (hashtable->nbatch == 1) |
| { |
| /* Fold this backend's private batch 0 tuple count into the shared one. */ |
| hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples; |
| hashtable->batches[0].ntuples = 0; |
| /* Guard against integer overflow and alloc size overflow */ |
| if (hashtable->batches[0].shared->ntuples + 1 > |
| hashtable->nbuckets * gp_hashjoin_tuples_per_bucket && |
| hashtable->nbuckets < (INT_MAX / 2) && |
| hashtable->nbuckets * 2 <= |
| MaxAllocSize / sizeof(dsa_pointer_atomic)) |
| { |
| /* Request more buckets; the caller retries after getting NULL. */ |
| pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS; |
| LWLockRelease(&pstate->lock); |
| |
| return NULL; |
| } |
| } |
| } |
| |
| /* We are cleared to allocate a new chunk. */ |
| chunk_shared = dsa_allocate(hashtable->area, chunk_size); |
| hashtable->batches[curbatch].shared->size += chunk_size; |
| hashtable->batches[curbatch].at_least_one_chunk = true; |
| |
| /* Set up the chunk. */ |
| chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared); |
| *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE; |
| chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE; |
| chunk->used = size; |
| |
| /* |
| * Push it onto the list of chunks, so that it can be found if we need to |
| * increase the number of buckets or batches (batch 0 only) and later for |
| * freeing the memory (all batches). |
| */ |
| chunk->next.shared = hashtable->batches[curbatch].shared->chunks; |
| hashtable->batches[curbatch].shared->chunks = chunk_shared; |
| |
| /* A dedicated chunk for an oversized tuple never becomes the current chunk. */ |
| if (size <= HASH_CHUNK_THRESHOLD) |
| { |
| /* |
| * Make this the current chunk so that we can use the fast path to |
| * fill the rest of it up in future calls. |
| */ |
| hashtable->current_chunk = chunk; |
| hashtable->current_chunk_shared = chunk_shared; |
| } |
| LWLockRelease(&pstate->lock); |
| |
| /* The tuple occupies the start of the new chunk's data area. */ |
| Assert(HASH_CHUNK_DATA(chunk) == dsa_get_address(hashtable->area, *shared)); |
| result = (HashJoinTuple) HASH_CHUNK_DATA(chunk); |
| |
| return result; |
| } |
| |
| /* |
| * One backend needs to set up the shared batch state including tuplestores. |
| * Other backends will ensure they have correctly configured accessors by |
| * calling ExecParallelHashEnsureBatchAccessors(). |
| * |
| * hashtable: this backend's hash table; it must not have an accessor array |
| * yet (asserted below). |
| * nbatch: number of batches to create; stored in both the shared state and |
| * the local hash table. |
| */ |
| static void |
| ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) |
| { |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| ParallelHashJoinBatch *batches; |
| MemoryContext oldcxt; |
| int i; |
| |
| Assert(hashtable->batches == NULL); |
| |
| /* Allocate space. */ |
| pstate->batches = |
| dsa_allocate0(hashtable->area, |
| EstimateParallelHashJoinBatch(hashtable) * nbatch); |
| pstate->nbatch = nbatch; |
| batches = dsa_get_address(hashtable->area, pstate->batches); |
| |
| /* |
| * Use hash join spill memory context to allocate accessors, including |
| * buffers for the temporary files. |
| */ |
| oldcxt = MemoryContextSwitchTo(hashtable->spillCxt); |
| |
| /* Allocate this backend's accessor array. */ |
| hashtable->nbatch = nbatch; |
| hashtable->batches = |
| palloc0_array(ParallelHashJoinBatchAccessor, hashtable->nbatch); |
| |
| /* Set up the shared state, tuplestores and backend-local accessors. */ |
| for (i = 0; i < hashtable->nbatch; ++i) |
| { |
| ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; |
| ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); |
| char name[MAXPGPATH]; |
| |
| /* |
| * All members of shared were zero-initialized. We just need to set |
| * up the Barrier. |
| */ |
| BarrierInit(&shared->batch_barrier, 0); |
| if (i == 0) |
| { |
| /* Batch 0 doesn't need to be loaded. */ |
| /* Step its barrier forward until it reaches PHJ_BATCH_PROBE. */ |
| BarrierAttach(&shared->batch_barrier); |
| while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBE) |
| BarrierArriveAndWait(&shared->batch_barrier, 0); |
| BarrierDetach(&shared->batch_barrier); |
| } |
| |
| /* Initialize accessor state. All members were zero-initialized. */ |
| accessor->shared = shared; |
| |
| /* Initialize the shared tuplestores. */ |
| /* Inner side: named "i<batch>of<nbatch>". */ |
| snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch); |
| accessor->inner_tuples = |
| sts_initialize(ParallelHashJoinBatchInner(shared), |
| pstate->nparticipants, |
| hashtable->hjstate->worker_id, |
| sizeof(uint32), |
| SHARED_TUPLESTORE_SINGLE_PASS, |
| &pstate->fileset, |
| name); |
| /* Outer side: named "o<batch>of<nbatch>". */ |
| snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch); |
| accessor->outer_tuples = |
| sts_initialize(ParallelHashJoinBatchOuter(shared, |
| pstate->nparticipants), |
| pstate->nparticipants, |
| hashtable->hjstate->worker_id, |
| sizeof(uint32), |
| SHARED_TUPLESTORE_SINGLE_PASS, |
| &pstate->fileset, |
| name); |
| } |
| |
| /* Restore the caller's memory context. */ |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| /* |
|  * ExecParallelHashCloseBatchAccessors |
|  * |
|  *      Release this backend's current array of |
|  *      ParallelHashJoinBatchAccessor objects.  Writing and parallel |
|  *      scanning are ended on both tuplestores of every batch first, then |
|  *      the array is freed and hashtable->batches is reset to NULL. |
|  */ |
| static void |
| ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable) |
| { |
|     int         batchno = 0; |
| |
|     while (batchno < hashtable->nbatch) |
|     { |
|         ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[batchno]; |
| |
|         /* No file may stay open: finish writes first, then the scans. */ |
|         sts_end_write(accessor->inner_tuples); |
|         sts_end_write(accessor->outer_tuples); |
|         sts_end_parallel_scan(accessor->inner_tuples); |
|         sts_end_parallel_scan(accessor->outer_tuples); |
| |
|         batchno++; |
|     } |
| |
|     pfree(hashtable->batches); |
|     hashtable->batches = NULL; |
| } |
| |
| /* |
|  * ExecParallelHashEnsureBatchAccessors |
|  * |
|  *      Bring this backend's accessor array in line with the shared set of |
|  *      batches.  An existing array is kept when its batch count equals the |
|  *      shared one; otherwise it is closed and rebuilt, attaching to the |
|  *      inner and outer tuplestores of every batch. |
|  */ |
| static void |
| ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) |
| { |
|     ParallelHashJoinState *pstate = hashtable->parallel_state; |
|     ParallelHashJoinBatch *batches; |
|     MemoryContext saved_cxt; |
|     int         batchno; |
| |
|     /* Accessors exist and the batch count is unchanged: nothing to do. */ |
|     if (hashtable->batches != NULL && |
|         hashtable->nbatch == pstate->nbatch) |
|         return; |
| |
|     /* Accessors exist but are stale: drop them before rebuilding. */ |
|     if (hashtable->batches != NULL) |
|         ExecParallelHashCloseBatchAccessors(hashtable); |
| |
|     /* |
|      * The batch-tracking array must still be there: had we joined after the |
|      * build barrier reached the PHJ_BUILD_FREE phase, we should have given |
|      * up sooner. |
|      */ |
|     Assert(DsaPointerIsValid(pstate->batches)); |
| |
|     /* |
|      * Accessors, including the buffers for the temporary files, live in the |
|      * hash join spill memory context. |
|      */ |
|     saved_cxt = MemoryContextSwitchTo(hashtable->spillCxt); |
| |
|     /* Size the backend-local accessor array after the shared batch count. */ |
|     hashtable->nbatch = pstate->nbatch; |
|     hashtable->batches = |
|         palloc0_array(ParallelHashJoinBatchAccessor, hashtable->nbatch); |
| |
|     /* Base of the pseudo-array of ParallelHashJoinBatch objects. */ |
|     batches = (ParallelHashJoinBatch *) |
|         dsa_get_address(hashtable->area, pstate->batches); |
| |
|     /* Fill in one accessor per batch and attach to its tuplestores. */ |
|     for (batchno = 0; batchno < hashtable->nbatch; batchno++) |
|     { |
|         ParallelHashJoinBatchAccessor *acc = &hashtable->batches[batchno]; |
|         ParallelHashJoinBatch *shared = |
|             NthParallelHashJoinBatch(batches, batchno); |
| |
|         acc->shared = shared; |
|         acc->preallocated = 0; |
|         acc->done = false; |
|         acc->outer_eof = false; |
| |
|         acc->inner_tuples = |
|             sts_attach(ParallelHashJoinBatchInner(shared), |
|                        hashtable->hjstate->worker_id, |
|                        &pstate->fileset); |
|         acc->outer_tuples = |
|             sts_attach(ParallelHashJoinBatchOuter(shared, |
|                                                   pstate->nparticipants), |
|                        hashtable->hjstate->worker_id, |
|                        &pstate->fileset); |
|     } |
| |
|     MemoryContextSwitchTo(saved_cxt); |
| } |
| |
| /* |
| * Allocate an empty shared memory hash table for a given batch. |
| */ |
| void |
| ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno) |
| { |
| ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared; |
| dsa_pointer_atomic *buckets; |
| int nbuckets = hashtable->parallel_state->nbuckets; |
| int i; |
| |
| batch->buckets = |
| dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets); |
| buckets = (dsa_pointer_atomic *) |
| dsa_get_address(hashtable->area, batch->buckets); |
| for (i = 0; i < nbuckets; ++i) |
| dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); |
| } |
| |
| /* |
| * If we are currently attached to a shared hash join batch, detach. If we |
| * are last to detach, clean up. |
| */ |
| void |
| ExecHashTableDetachBatch(HashJoinTable hashtable) |
| { |
| if (hashtable->parallel_state != NULL && |
| hashtable->curbatch >= 0) |
| { |
| int curbatch = hashtable->curbatch; |
| ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; |
| bool attached = true; |
| |
| /* Make sure any temporary files are closed. */ |
| sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); |
| sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); |
| |
| /* After attaching we always get at least to PHJ_BATCH_PROBE. */ |
| Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE || |
| BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN); |
| |
| /* |
| * If we're abandoning the PHJ_BATCH_PROBE phase early without having |
| * reached the end of it, it means the plan doesn't want any more |
| * tuples, and it is happy to abandon any tuples buffered in this |
| * process's subplans. For correctness, we can't allow any process to |
| * execute the PHJ_BATCH_SCAN phase, because we will never have the |
| * complete set of match bits. Therefore we skip emitting unmatched |
| * tuples in all backends (if this is a full/right join), as if those |
| * tuples were all due to be emitted by this process and it has |
| * abandoned them too. |
| */ |
| if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE && |
| !hashtable->batches[curbatch].outer_eof) |
| { |
| /* |
| * This flag may be written to by multiple backends during |
| * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN |
| * phase so requires no extra locking. |
| */ |
| batch->skip_unmatched = true; |
| } |
| |
| /* |
| * Even if we aren't doing a full/right outer join, we'll step through |
| * the PHJ_BATCH_SCAN phase just to maintain the invariant that |
| * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free. |
| */ |
| if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE) |
| attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier); |
| |
| /* |
| * CBDB_PARALLEL: Parallel Hash Left Anti Semi (Not-In) Join(parallel-aware) |
| * If phs_lasj_has_null is true, that means we have found null when building hash table, |
| * there were no batches to detach. |
| */ |
| if (!hashtable->parallel_state->phs_lasj_has_null && attached && BarrierArriveAndDetach(&batch->batch_barrier)) |
| { |
| /* |
| * We are not longer attached to the batch barrier, but we're the |
| * process that was chosen to free resources and it's safe to |
| * assert the current phase. The ParallelHashJoinBatch can't go |
| * away underneath us while we are attached to the build barrier, |
| * making this access safe. |
| */ |
| Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE); |
| |
| /* Free shared chunks and buckets. */ |
| while (DsaPointerIsValid(batch->chunks)) |
| { |
| HashMemoryChunk chunk = |
| dsa_get_address(hashtable->area, batch->chunks); |
| dsa_pointer next = chunk->next.shared; |
| |
| dsa_free(hashtable->area, batch->chunks); |
| batch->chunks = next; |
| } |
| if (DsaPointerIsValid(batch->buckets)) |
| { |
| dsa_free(hashtable->area, batch->buckets); |
| batch->buckets = InvalidDsaPointer; |
| } |
| } |
| |
| /* |
| * Track the largest batch we've been attached to. Though each |
| * backend might see a different subset of batches, explain.c will |
| * scan the results from all backends to find the largest value. |
| */ |
| hashtable->spacePeak = |
| Max(hashtable->spacePeak, |
| batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); |
| |
| /* Remember that we are not attached to a batch. */ |
| hashtable->curbatch = -1; |
| } |
| } |
| |
| /* |
| * Detach from all shared resources. If we are last to detach, clean up. |
| */ |
| void |
| ExecHashTableDetach(HashJoinTable hashtable) |
| { |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| |
| /* |
| * If we're involved in a parallel query, we must either have gotten all |
| * the way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE. |
| */ |
| Assert(!pstate || |
| BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN); |
| |
| if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN) |
| { |
| int i; |
| |
| /* Make sure any temporary files are closed. */ |
| if (hashtable->batches) |
| { |
| for (i = 0; i < hashtable->nbatch; ++i) |
| { |
| sts_end_write(hashtable->batches[i].inner_tuples); |
| sts_end_write(hashtable->batches[i].outer_tuples); |
| sts_end_parallel_scan(hashtable->batches[i].inner_tuples); |
| sts_end_parallel_scan(hashtable->batches[i].outer_tuples); |
| } |
| } |
| |
| /* If we're last to detach, clean up shared memory. */ |
| if (BarrierArriveAndDetach(&pstate->build_barrier)) |
| { |
| /* |
| * Late joining processes will see this state and give up |
| * immediately. |
| */ |
| Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE); |
| |
| if (DsaPointerIsValid(pstate->batches)) |
| { |
| dsa_free(hashtable->area, pstate->batches); |
| pstate->batches = InvalidDsaPointer; |
| } |
| } |
| } |
| hashtable->parallel_state = NULL; |
| } |
| |
| /* |
| * Get the first tuple in a given bucket identified by number. |
| */ |
| static inline HashJoinTuple |
| ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno) |
| { |
| HashJoinTuple tuple; |
| dsa_pointer p; |
| |
| Assert(hashtable->parallel_state); |
| p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]); |
| tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p); |
| |
| return tuple; |
| } |
| |
| /* |
| * Get the next tuple in the same bucket as 'tuple'. |
| */ |
| static inline HashJoinTuple |
| ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple) |
| { |
| HashJoinTuple next; |
| |
| Assert(hashtable->parallel_state); |
| next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared); |
| |
| return next; |
| } |
| |
| /* |
| * Insert a tuple at the front of a chain of tuples in DSA memory atomically. |
| */ |
| static inline void |
| ExecParallelHashPushTuple(dsa_pointer_atomic *head, |
| HashJoinTuple tuple, |
| dsa_pointer tuple_shared) |
| { |
| for (;;) |
| { |
| tuple->next.shared = dsa_pointer_atomic_read(head); |
| if (dsa_pointer_atomic_compare_exchange(head, |
| &tuple->next.shared, |
| tuple_shared)) |
| break; |
| } |
| } |
| |
| /* |
| * Prepare to work on a given batch. |
| */ |
| void |
| ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno) |
| { |
| Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer); |
| |
| hashtable->curbatch = batchno; |
| hashtable->buckets.shared = (dsa_pointer_atomic *) |
| dsa_get_address(hashtable->area, |
| hashtable->batches[batchno].shared->buckets); |
| hashtable->nbuckets = hashtable->parallel_state->nbuckets; |
| hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); |
| hashtable->current_chunk = NULL; |
| hashtable->current_chunk_shared = InvalidDsaPointer; |
| hashtable->batches[batchno].at_least_one_chunk = false; |
| } |
| |
| /* |
| * Take the next available chunk from the queue of chunks being worked on in |
| * parallel. Return NULL if there are none left. Otherwise return a pointer |
| * to the chunk, and set *shared to the DSA pointer to the chunk. |
| */ |
| static HashMemoryChunk |
| ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared) |
| { |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| HashMemoryChunk chunk; |
| |
| LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
| if (DsaPointerIsValid(pstate->chunk_work_queue)) |
| { |
| *shared = pstate->chunk_work_queue; |
| chunk = (HashMemoryChunk) |
| dsa_get_address(hashtable->area, *shared); |
| pstate->chunk_work_queue = chunk->next.shared; |
| } |
| else |
| chunk = NULL; |
| LWLockRelease(&pstate->lock); |
| |
| return chunk; |
| } |
| |
| /* |
| * Increase the space preallocated in this backend for a given inner batch by |
| * at least a given amount. This allows us to track whether a given batch |
| * would fit in memory when loaded back in. Also increase the number of |
| * batches or buckets if required. |
| * |
| * This maintains a running estimation of how much space will be taken when we |
| * load the batch back into memory by simulating the way chunks will be handed |
| * out to workers. It's not perfectly accurate because the tuples will be |
| * packed into memory chunks differently by ExecParallelHashTupleAlloc(), but |
| * it should be pretty close. It tends to overestimate by a fraction of a |
| * chunk per worker since all workers gang up to preallocate during hashing, |
| * but workers tend to reload batches alone if there are enough to go around, |
| * leaving fewer partially filled chunks. This effect is bounded by |
| * nparticipants. |
| * |
| * Return false if the number of batches or buckets has changed, and the |
| * caller should reconsider which batch a given tuple now belongs in and call |
| * again. |
| */ |
| static bool |
| ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size) |
| { |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno]; |
| size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE); |
| |
| Assert(batchno > 0); |
| Assert(batchno < hashtable->nbatch); |
| Assert(size == MAXALIGN(size)); |
| |
| LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
| |
| /* Has another participant commanded us to help grow? */ |
| if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || |
| pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
| { |
| ParallelHashGrowth growth = pstate->growth; |
| |
| LWLockRelease(&pstate->lock); |
| if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) |
| ExecParallelHashIncreaseNumBatches(hashtable); |
| else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
| ExecParallelHashIncreaseNumBuckets(hashtable); |
| |
| return false; |
| } |
| |
| if (pstate->growth != PHJ_GROWTH_DISABLED && |
| batch->at_least_one_chunk && |
| (batch->shared->estimated_size + want + HASH_CHUNK_HEADER_SIZE |
| > pstate->space_allowed)) |
| { |
| /* |
| * We have determined that this batch would exceed the space budget if |
| * loaded into memory. Command all participants to help repartition. |
| */ |
| batch->shared->space_exhausted = true; |
| pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; |
| LWLockRelease(&pstate->lock); |
| |
| return false; |
| } |
| |
| batch->at_least_one_chunk = true; |
| batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE; |
| batch->preallocated = want; |
| LWLockRelease(&pstate->lock); |
| |
| return true; |
| } |
| |
| /* |
| * Calculate the limit on how much memory can be used by Hash and similar |
| * plan types. This is work_mem times hash_mem_multiplier, and is |
| * expressed in bytes. |
| * |
| * Exported for use by the planner, as well as other hash-like executor |
| * nodes. This is a rather random place for this, but there is no better |
| * place. |
| * |
| * GPDB_14_MERGE_FIXME: Postgres uses work_mem to control the memory usage of a query operation(e.g. sort and hash table), |
| * while Greenplum use statement_mem to control the memory used by a statement. |
| * Although work_mem is marked as deprecated, there are many places of our code using it, |
| * which introduces confusion for developers. We will unify these two configurations someday. |
| * Besides work_mem, Postgres also uses hash_mem_multiplier to increase the memory usage of hash table |
| * to avoid spilling to disk. Currently, it's used by get_hash_memory_limit to control the memory usage, |
| * and no other places actually need it to multiply with work_mem. |
| */ |
| size_t |
| get_hash_memory_limit(void) |
| { |
| double mem_limit; |
| |
| /* Do initial calculation in double arithmetic */ |
| mem_limit = (double) work_mem * hash_mem_multiplier * 1024.0; |
| |
| /* Clamp in case it doesn't fit in size_t */ |
| mem_limit = Min(mem_limit, (double) SIZE_MAX); |
| |
| return (size_t) mem_limit; |
| } |
| |
| /* |
| * Convert AttrFilter to ScanKeyData and send these runtime filters to the |
| * target node(seqscan). |
| */ |
| void |
| PushdownRuntimeFilter(HashState *node) |
| { |
| ListCell *lc; |
| List *scankeys; |
| ScanKey sk; |
| AttrFilter *attr_filter; |
| |
| foreach (lc, node->filters) |
| { |
| scankeys = NIL; |
| |
| attr_filter = lfirst(lc); |
| if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) |
| continue; |
| |
| /* bloom filter */ |
| sk = (ScanKey)palloc0(sizeof(ScanKeyData)); |
| sk->sk_flags = SK_BLOOM_FILTER; |
| sk->sk_attno = attr_filter->lattno; |
| sk->sk_subtype = INT8OID; |
| sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); |
| scankeys = lappend(scankeys, sk); |
| |
| /* range filter */ |
| sk = (ScanKey)palloc0(sizeof(ScanKeyData)); |
| sk->sk_flags = 0; |
| sk->sk_attno = attr_filter->lattno; |
| sk->sk_strategy = BTGreaterEqualStrategyNumber; |
| sk->sk_subtype = INT8OID; |
| sk->sk_argument = attr_filter->min; |
| scankeys = lappend(scankeys, sk); |
| |
| sk = (ScanKey)palloc0(sizeof(ScanKeyData)); |
| sk->sk_flags = 0; |
| sk->sk_attno = attr_filter->lattno; |
| sk->sk_strategy = BTLessEqualStrategyNumber; |
| sk->sk_subtype = INT8OID; |
| sk->sk_argument = attr_filter->max; |
| scankeys = lappend(scankeys, sk); |
| |
| /* append new runtime filters to target node */ |
| SeqScanState *sss = castNode(SeqScanState, attr_filter->target); |
| sss->filters = list_concat(sss->filters, scankeys); |
| } |
| } |
| |
| static void |
| AddTupleValuesIntoRF(HashState *node, TupleTableSlot *slot) |
| { |
| Datum val; |
| bool isnull; |
| ListCell *lc; |
| AttrFilter *attr_filter; |
| |
| foreach (lc, node->filters) |
| { |
| attr_filter = (AttrFilter *) lfirst(lc); |
| |
| val = slot_getattr(slot, attr_filter->rattno, &isnull); |
| if (isnull) |
| continue; |
| |
| attr_filter->empty = false; |
| |
| if ((int64_t)val < (int64_t)attr_filter->min) |
| attr_filter->min = val; |
| |
| if ((int64_t)val > (int64_t)attr_filter->max) |
| attr_filter->max = val; |
| |
| if (attr_filter->blm_filter) |
| bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum)); |
| } |
| } |
| |
| void |
| FreeRuntimeFilter(HashState *node) |
| { |
| ListCell *lc; |
| AttrFilter *attr_filter; |
| |
| if (!node->filters) |
| return; |
| |
| foreach (lc, node->filters) |
| { |
| attr_filter = lfirst(lc); |
| if (attr_filter->blm_filter) |
| bloom_free(attr_filter->blm_filter); |
| } |
| |
| list_free_deep(node->filters); |
| node->filters = NIL; |
| } |
| |
| void |
| ResetRuntimeFilter(HashState *node) |
| { |
| ListCell *lc; |
| AttrFilter *attr_filter; |
| SeqScanState *sss; |
| |
| if (!node->filters) |
| return; |
| |
| foreach (lc, node->filters) |
| { |
| attr_filter = lfirst(lc); |
| attr_filter->empty = true; |
| |
| if (IsA(attr_filter->target, SeqScanState)) |
| { |
| sss = castNode(SeqScanState, attr_filter->target); |
| if (sss->filters) |
| { |
| list_free_deep(sss->filters); |
| sss->filters = NIL; |
| } |
| } |
| |
| if (attr_filter->blm_filter) |
| bloom_free(attr_filter->blm_filter); |
| |
| attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows, |
| work_mem, |
| random()); |
| |
| StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)"); |
| StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)"); |
| attr_filter->min = LONG_MAX; |
| attr_filter->max = LONG_MIN; |
| } |
| } |