| /*------------------------------------------------------------------------- |
| * |
| * nodeHashjoin.c |
| * Routines to handle hash join nodes |
| * |
| * Portions Copyright (c) 2005-2008, Greenplum inc |
| * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates. |
| * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/executor/nodeHashjoin.c |
| * |
| * HASH JOIN |
| * |
| * This is based on the "hybrid hash join" algorithm described briefly on the |
| * following page |
| * |
| * https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join |
| * |
| * and in detail in the referenced paper: |
| * |
| * "An Adaptive Hash Join Algorithm for Multiuser Environments" |
| * Hansjörg Zeller; Jim Gray (1990). Proceedings of the 16th VLDB conference. |
| * Brisbane: 186–197. |
| * |
| * If the inner side tuples of a hash join do not fit in memory, the hash join |
| * can be executed in multiple batches. |
| * |
| * If the statistics on the inner side relation are accurate, the planner |
| * chooses a multi-batch strategy and estimates the number of batches. |
| * |
| * The query executor measures the real size of the hashtable and increases the |
| * number of batches if the hashtable grows too large. |
| * |
| * The number of batches is always a power of two, so an increase in the number |
| * of batches doubles it. |
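| * |
| * To see why doubling is cheap, note that a tuple's batch number is taken |
| * from bits of its hash value, roughly (the authoritative computation is |
| * ExecHashGetBucketAndBatch() in nodeHash.c): |
| * |
| *     batchno = <hash bits above the bucket bits> & (nbatch - 1); |
| * |
| * Since nbatch is a power of two, doubling it exposes one more hash bit: |
| * each tuple either keeps its batch number or moves to (batchno + old |
| * nbatch), so tuples only ever move "forward" to later batches. |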
| * |
| * Serial hash join measures batch size lazily -- waiting until it is loading a |
| * batch to determine if it will fit in memory. While inserting tuples into the |
| * hashtable, if adding a tuple would push the hashtable past work_mem, serial |
| * hash join doubles the number of batches, dumps tuples out of the hashtable, |
| * and reassigns them either to other batch files or to the current batch, |
| * which stays resident in the hashtable. |
| * |
| * Parallel hash join, on the other hand, completes all changes to the number |
| * of batches during the build phase. If it increases the number of batches, it |
| * dumps out all the tuples from all batches and reassigns them to entirely new |
| * batch files. Then it checks every batch to ensure it will fit in the space |
| * budget for the query. |
| * |
| * In both parallel and serial hash join, the executor currently makes a best |
| * effort. If a particular batch will not fit in memory, it tries doubling the |
| * number of batches. If, after a batch increase, there is a batch that |
| * retained all or none of its tuples, the executor disables growth in the |
| * number of batches globally. After growth is disabled, any batch that would |
| * previously have triggered an increase in the number of batches instead |
| * simply exceeds the space allowed. |
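| * |
| * For example, if every tuple in a batch carries the same hash value |
| * (duplicate keys), a split moves either all of its tuples or none of them, |
| * and no number of doublings can shrink the batch; detecting that case is |
| * what turns growth off. |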
| * |
| * PARALLELISM |
| * |
| * Hash joins can participate in parallel query execution in several ways. A |
| * parallel-oblivious hash join is one where the node is unaware that it is |
| * part of a parallel plan. In this case, a copy of the inner plan is used to |
| * build a copy of the hash table in every backend, and the outer plan could |
| * either be built from a partial or complete path, so that the results of the |
| * hash join are correspondingly either partial or complete. A parallel-aware |
| * hash join is one that behaves differently, coordinating work between |
| * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel |
| * Hash Join always appears with a Parallel Hash node. |
| * |
| * Parallel-aware hash joins use the same per-backend state machine to track |
| * progress through the hash join algorithm as parallel-oblivious hash joins. |
| * In a parallel-aware hash join, there is also a shared state machine that |
| * co-operating backends use to synchronize their local state machines and |
| * program counters. The shared state machine is managed with a Barrier IPC |
| * primitive. When all attached participants arrive at a barrier, the phase |
| * advances and all waiting participants are released. |
| * |
| * When a participant begins working on a parallel hash join, it must first |
| * figure out how much progress has already been made, because participants |
| * don't wait for each other to begin. For this reason there are switch |
| * statements at key points in the code where we have to synchronize our local |
| * state machine with the phase, and then jump to the correct part of the |
| * algorithm so that we can get started. |
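| * |
| * In outline, each such synchronization point looks like this (a simplified |
| * sketch of the pattern in ExecParallelHashJoinNewBatch() below, with |
| * illustrative phase names; BarrierArriveAndWait() returns true in exactly |
| * one of the arriving participants): |
| * |
| *     switch (BarrierAttach(&barrier)) |
| *     { |
| *         case PHASE_ELECT: |
| *             if (BarrierArriveAndWait(&barrier, wait_event)) |
| *                 ... sole elected participant performs the setup ... |
| *             ... fall through ... |
| *         case PHASE_WORK: |
| *             ... everyone helps with the work for this phase ... |
| *     } |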
| * |
| * One barrier called build_barrier is used to coordinate the hashing phases. |
| * The phase is represented by an integer which begins at zero and increments |
| * one by one, but in the code it is referred to by symbolic names as follows. |
| * An asterisk indicates a phase that is performed by a single arbitrarily |
| * chosen process. |
| * |
| * PHJ_BUILD_ELECT -- initial state |
| * PHJ_BUILD_ALLOCATE* -- one sets up the batches and table 0 |
| * PHJ_BUILD_HASH_INNER -- all hash the inner rel |
| * PHJ_BUILD_HASH_OUTER -- (multi-batch only) all hash the outer |
| * PHJ_BUILD_RUN -- building done, probing can begin |
| * PHJ_BUILD_FREE* -- all work complete, one frees batches |
| * |
| * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may |
| * be used repeatedly as required to coordinate expansions in the number of |
| * batches or buckets. Their phases are as follows: |
| * |
| * PHJ_GROW_BATCHES_ELECT -- initial state |
| * PHJ_GROW_BATCHES_REALLOCATE* -- one allocates new batches |
| * PHJ_GROW_BATCHES_REPARTITION -- all repartition |
| * PHJ_GROW_BATCHES_DECIDE* -- one detects skew and cleans up |
| * PHJ_GROW_BATCHES_FINISH -- finished one growth cycle |
| * |
| * PHJ_GROW_BUCKETS_ELECT -- initial state |
| * PHJ_GROW_BUCKETS_REALLOCATE* -- one allocates new buckets |
| * PHJ_GROW_BUCKETS_REINSERT -- all insert tuples |
| * |
| * If the planner got the number of batches and buckets right, those won't be |
| * necessary, but on the other hand we might finish up needing to expand the |
| * buckets or batches multiple times while hashing the inner relation to stay |
| * within our memory budget and load factor target. For that reason it's a |
| * separate pair of barriers using circular phases. |
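| * |
| * ("Circular" means that the grow barriers' phases are interpreted modulo |
| * the number of phases listed above, so the same barrier can coordinate any |
| * number of growth cycles.) |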
| * |
| * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins, |
| * because we need to divide the outer relation into batches up front in order |
| * to be able to process batches entirely independently. In contrast, the |
| * parallel-oblivious algorithm simply throws tuples 'forward' to 'later' |
| * batches whenever it encounters them while scanning and probing, which it |
| * can do because it processes batches in serial order. |
| * |
| * Once PHJ_BUILD_RUN is reached, backends then split up and process |
| * different batches, or gang up and work together on probing batches if there |
| * aren't enough to go around. For each batch there is a separate barrier |
| * with the following phases: |
| * |
| * PHJ_BATCH_ELECT -- initial state |
| * PHJ_BATCH_ALLOCATE* -- one allocates buckets |
| * PHJ_BATCH_LOAD -- all load the hash table from disk |
| * PHJ_BATCH_PROBE -- all probe |
| * PHJ_BATCH_SCAN* -- one does right/right-anti/full unmatched scan |
| * PHJ_BATCH_FREE* -- one frees memory |
| * |
| * Batch 0 is a special case, because it starts out in phase |
| * PHJ_BATCH_PROBE; populating batch 0's hash table is done during |
| * PHJ_BUILD_HASH_INNER so we can skip loading. |
| * |
| * Initially we try to plan for a single-batch hash join using the combined |
| * hash_mem of all participants to create a large shared hash table. If that |
| * turns out either at planning or execution time to be impossible then we |
| * fall back to regular hash_mem sized hash tables. |
| * |
| * To avoid deadlocks, we never wait for any barrier unless it is known that |
| * all other backends attached to it are actively executing the node or have |
| * finished. Practically, that means that we never emit a tuple while attached |
| * to a barrier, unless the barrier has reached a phase that means that no |
| * process will wait on it again. We emit tuples while attached to the build |
| * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase |
| * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN |
| * respectively without waiting, using BarrierArriveAndDetach() and |
| * BarrierArriveAndDetachExceptLast() respectively. The last to detach |
| * receives a different return value so that it knows that it's safe to |
| * clean up. Any straggler process that attaches after that phase is reached |
| * will see that it's too late to participate or access the relevant shared |
| * memory objects. |
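| * |
| * In code form, the probe-to-scan hand-off is roughly (a sketch; the |
| * authoritative sequence lives in ExecParallelPrepHashTableForUnmatched() |
| * and ExecHashTableDetachBatch()): |
| * |
| *     if (BarrierArriveAndDetachExceptLast(&batch_barrier)) |
| *         ... we were last: still attached, safe to scan and clean up ... |
| *     else |
| *         ... detached: another backend finishes this batch; move on ... |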
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/htup_details.h" |
| #include "access/parallel.h" |
| #include "executor/executor.h" |
| #include "executor/hashjoin.h" |
| #include "executor/instrument.h" /* Instrumentation */ |
| #include "executor/nodeHash.h" |
| #include "executor/nodeHashjoin.h" |
| #include "executor/nodeRuntimeFilter.h" |
| #include "miscadmin.h" |
| #include "pgstat.h" |
| #include "utils/guc.h" |
| #include "utils/fmgroids.h" |
| #include "utils/memutils.h" |
| #include "utils/sharedtuplestore.h" |
| |
| #include "cdb/cdbvars.h" |
| #include "miscadmin.h" /* work_mem */ |
| #include "utils/faultinjector.h" |
| |
| /* |
| * States of the ExecHashJoin state machine |
| */ |
| #define HJ_BUILD_HASHTABLE 1 |
| #define HJ_NEED_NEW_OUTER 2 |
| #define HJ_SCAN_BUCKET 3 |
| #define HJ_FILL_OUTER_TUPLE 4 |
| #define HJ_FILL_INNER_TUPLES 5 |
| #define HJ_NEED_NEW_BATCH 6 |
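| |
| /* |
| * Rough life cycle: HJ_BUILD_HASHTABLE runs once; the node then loops |
| * through HJ_NEED_NEW_OUTER -> HJ_SCAN_BUCKET (-> HJ_FILL_OUTER_TUPLE) |
| * until the current batch's outer tuples run out, visits |
| * HJ_FILL_INNER_TUPLES if the inner side must be null-filled, and moves |
| * to HJ_NEED_NEW_BATCH to start over on the next batch. |
| */ |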
| |
| /* Returns true if doing null-fill on outer relation */ |
| #define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL) |
| /* Returns true if doing null-fill on inner relation */ |
| #define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL) |
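| |
| /* |
| * For example, a LEFT, ANTI, LASJ_NOTIN or FULL join sets |
| * hj_NullInnerTupleSlot, so unmatched outer tuples are emitted with nulls |
| * for the inner side; a RIGHT, RIGHT_ANTI or FULL join sets |
| * hj_NullOuterTupleSlot for the mirror-image case. See the jointype switch |
| * in ExecInitHashJoin(). |
| */ |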
| |
| static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, |
| HashJoinState *hjstate, |
| uint32 *hashvalue); |
| static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, |
| HashJoinState *hjstate, |
| uint32 *hashvalue); |
| static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, |
| BufFile *file, |
| uint32 *hashvalue, |
| TupleTableSlot *tupleSlot); |
| static bool ExecHashJoinNewBatch(HashJoinState *hjstate); |
| static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); |
| static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate); |
| |
| #ifdef USE_ASSERT_CHECKING |
| static bool isNotDistinctJoin(List *qualList); |
| #endif |
| static void ReleaseHashTable(HashJoinState *node); |
| static void SpillCurrentBatch(HashJoinState *node); |
| static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate); |
| static void ExecEagerFreeHashJoin(HashJoinState *node); |
| static void CreateRuntimeFilter(HashJoinState* hjstate); |
| static bool IsEqualOp(Expr *expr); |
| static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno); |
| static bool CheckTargetNode(PlanState *node, |
| AttrNumber attno, |
| AttrNumber *lattno); |
| static List *FindTargetNodes(HashJoinState *hjstate, |
| AttrNumber attno, |
| AttrNumber *lattno); |
| static AttrFilter *CreateAttrFilter(PlanState *target, |
| AttrNumber lattno, |
| AttrNumber rattno, |
| double plan_rows); |
| extern bool Test_print_prefetch_joinqual; |
| |
| |
| /* ---------------------------------------------------------------- |
| * ExecHashJoinImpl |
| * |
| * This function implements the Hybrid Hashjoin algorithm. It is marked |
| * with an always-inline attribute so that ExecHashJoin() and |
| * ExecParallelHashJoin() can inline it. Compilers that respect the |
| * attribute should create versions specialized for parallel == true and |
| * parallel == false with unnecessary branches removed. |
| * |
| * Note: the relation we build the hash table on is the "inner" one; |
| * the other is the "outer" one. |
| * ---------------------------------------------------------------- |
| */ |
| static pg_attribute_always_inline TupleTableSlot * |
| ExecHashJoinImpl(PlanState *pstate, bool parallel) |
| { |
| HashJoinState *node = castNode(HashJoinState, pstate); |
| PlanState *outerNode; |
| HashState *hashNode; |
| ExprState *joinqual; |
| ExprState *otherqual; |
| ExprContext *econtext; |
| HashJoinTable hashtable; |
| TupleTableSlot *outerTupleSlot; |
| uint32 hashvalue; |
| int batchno; |
| ParallelHashJoinState *parallel_state; |
| EState *estate; |
| |
| /* |
| * get information from HashJoin node |
| */ |
| estate = node->js.ps.state; |
| joinqual = node->js.joinqual; |
| otherqual = node->js.ps.qual; |
| hashNode = (HashState *) innerPlanState(node); |
| outerNode = outerPlanState(node); |
| hashtable = node->hj_HashTable; |
| econtext = node->js.ps.ps_ExprContext; |
| parallel_state = hashNode->parallel_state; |
| /* CBDB_PARALLEL_FIXME: when parallel is true but parallel_state is NULL, we silently fall back to non-parallel behavior. */ |
| parallel = parallel && (parallel_state != NULL); |
| |
| /* |
| * Reset per-tuple memory context to free any expression evaluation |
| * storage allocated in the previous tuple cycle. |
| */ |
| ResetExprContext(econtext); |
| |
| /* |
| * The executor tries to squelch nodes in its subtree after a node returns a |
| * NULL tuple. If chgParam is not NULL, squelching is not safe. And if the |
| * outer node merely returned an empty tuple, squelching the outer node would |
| * be premature. To fix that properly, we should add delayEagerFree logic to |
| * the Limit node, so that it does not call ExecSquelchNode() when the node |
| * might get rescanned later. |
| */ |
| if (outerNode->chgParam != NULL) |
| node->delayEagerFree = true; |
| /* |
| * run the hash join state machine |
| */ |
| for (;;) |
| { |
| /* We must never use an eagerly released hash table */ |
| Assert(hashtable == NULL || !hashtable->eagerlyReleased); |
| /* |
| * It's possible to iterate this loop many times before returning a |
| * tuple, in some pathological cases such as needing to move much of |
| * the current batch to a later batch. So let's check for interrupts |
| * each time through. |
| */ |
| CHECK_FOR_INTERRUPTS(); |
| |
| switch (node->hj_JoinState) |
| { |
| case HJ_BUILD_HASHTABLE: |
| |
| /* |
| * First time through: build hash table for inner relation. |
| */ |
| Assert(hashtable == NULL); |
| |
| /* |
| * If the outer relation turns out to be completely empty, we may be |
| * able to skip building the hash table entirely; the comment below |
| * spells out the conditions. However, prefetch_inner is set (see |
| * createplan.c) if we have *any* motion below us, and in that case we |
| * must build the hash table first to avoid a deadlock. If we don't |
| * have any motion, it doesn't matter. |
| * |
| * See motion_sanity_walker() for details on how a deadlock may occur. |
| */ |
| if (!node->prefetch_inner) |
| { |
| /* |
| * If the outer relation is completely empty, and it's not |
| * right/right-anti/full join, we can quit without building the hash |
| * table. However, for an inner join it is only a win to |
| * check this when the outer relation's startup cost is less |
| * than the projected cost of building the hash table. |
| * Otherwise it's best to build the hash table first and see |
| * if the inner relation is empty. (When it's a left join, we |
| * should always make this check, since we aren't going to be |
| * able to skip the join on the strength of an empty inner |
| * relation anyway.) |
| * |
| * If we are rescanning the join, we make use of information |
| * gained on the previous scan: don't bother to try the |
| * prefetch if the previous scan found the outer relation |
| * nonempty. This is not 100% reliable since with new |
| * parameters the outer relation might yield different |
| * results, but it's a good heuristic. |
| * |
| * The only way to make the check is to try to fetch a tuple |
| * from the outer plan node. If we succeed, we have to stash |
| * it away for later consumption by ExecHashJoinOuterGetTuple. |
| */ |
| if (HJ_FILL_INNER(node)) |
| { |
| /* no way to avoid building the hash table */ |
| node->hj_FirstOuterTupleSlot = NULL; |
| } |
| else if (parallel) |
| { |
| /* |
| * The empty-outer optimization is not implemented for |
| * shared hash tables, because no one participant can |
| * determine that there are no outer tuples, and it's not |
| * yet clear that it's worth the synchronization overhead |
| * of reaching consensus to figure that out. So we have |
| * to build the hash table. |
| */ |
| node->hj_FirstOuterTupleSlot = NULL; |
| } |
| else if (HJ_FILL_OUTER(node) || |
| (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost && |
| !node->hj_OuterNotEmpty)) |
| { |
| node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode); |
| if (TupIsNull(node->hj_FirstOuterTupleSlot)) |
| { |
| node->hj_OuterNotEmpty = false; |
| return NULL; |
| } |
| else |
| node->hj_OuterNotEmpty = true; |
| } |
| else |
| node->hj_FirstOuterTupleSlot = NULL; |
| } |
| else |
| node->hj_FirstOuterTupleSlot = NULL; |
| |
| /* |
| * Create the hash table. If using Parallel Hash, then |
| * whoever gets here first will create the hash table and any |
| * later arrivals will merely attach to it. |
| */ |
| hashtable = ExecHashTableCreate(hashNode, |
| node, |
| node->hj_HashOperators, |
| node->hj_Collations, |
| /* |
| * hashNode->hs_keepnull is required to support using IS NOT DISTINCT |
| * FROM as a hash condition. For example, in ORCA: |
| * `explain SELECT t2.a FROM t2 INTERSECT (SELECT t1.a FROM t1);` |
| */ |
| HJ_FILL_INNER(node) || hashNode->hs_keepnull, |
| PlanStateOperatorMemKB((PlanState *) hashNode)); |
| node->hj_HashTable = hashtable; |
| |
| /* |
| * CDB: Offer extra info for EXPLAIN ANALYZE. |
| */ |
| if ((estate->es_instrument & INSTRUMENT_CDB)) |
| ExecHashTableExplainInit(hashNode, node, hashtable); |
| |
| /* |
| * When doing a LASJ_NOTIN join, we want to quit as soon as we find a |
| * NULL key on the inner side. |
| */ |
| hashNode->hs_quit_if_hashkeys_null = (node->js.jointype == JOIN_LASJ_NOTIN); |
| |
| /* |
| * Execute the Hash node, to build the hash table. If using |
| * Parallel Hash, then we'll try to help hashing unless we |
| * arrived too late. |
| */ |
| hashNode->hashtable = hashtable; |
| (void) MultiExecProcNode((PlanState *) hashNode); |
| |
| #ifdef HJDEBUG |
| elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples by executing subplan for batch 0", hashtable->totalTuples); |
| #endif |
| |
| /* |
| * If LASJ_NOTIN and a NULL was found on the inner side, no outer tuple |
| * can pass the NOT IN condition, so the join is done. |
| */ |
| if (node->js.jointype == JOIN_LASJ_NOTIN && hashNode->hs_hashkeys_null) |
| return NULL; |
| |
| /* |
| * If the inner relation is completely empty, and we're not |
| * doing a left outer join, we can quit without scanning the |
| * outer relation. |
| */ |
| if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node)) |
| { |
| if (parallel) |
| { |
| /* |
| * Advance the build barrier to PHJ_BUILD_RUN before |
| * proceeding so we can negotiate resource cleanup. |
| */ |
| Barrier *build_barrier = ¶llel_state->build_barrier; |
| |
| while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN) |
| BarrierArriveAndWait(build_barrier, 0); |
| } |
| return NULL; |
| } |
| |
| /* |
| * Prefetch JoinQual or NonJoinQual to prevent motion hazard. |
| * |
| * See ExecPrefetchQual() for details. |
| */ |
| if (node->prefetch_joinqual) |
| { |
| ExecPrefetchQual(&node->js, true); |
| node->prefetch_joinqual = false; |
| } |
| |
| if (node->prefetch_qual) |
| { |
| ExecPrefetchQual(&node->js, false); |
| node->prefetch_qual = false; |
| } |
| |
| /* |
| * We just scanned the entire inner side and built the hashtable |
| * (and its overflow batches). Check here and remember if the inner |
| * side is empty. |
| */ |
| node->hj_InnerEmpty = (hashtable->totalTuples == 0); |
| |
| /* |
| * need to remember whether nbatch has increased since we |
| * began scanning the outer relation |
| */ |
| hashtable->nbatch_outstart = hashtable->nbatch; |
| |
| /* |
| * Reset OuterNotEmpty for scan. (It's OK if we fetched a |
| * tuple above, because ExecHashJoinOuterGetTuple will |
| * immediately set it again.) |
| */ |
| node->hj_OuterNotEmpty = false; |
| |
| if (parallel) |
| { |
| Barrier *build_barrier; |
| Barrier *outer_motion_barrier = ¶llel_state->outer_motion_barrier; |
| |
| build_barrier = ¶llel_state->build_barrier; |
| Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER || |
| BarrierPhase(build_barrier) == PHJ_BUILD_RUN || |
| BarrierPhase(build_barrier) == PHJ_BUILD_FREE); |
| if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER) |
| { |
| /* |
| * If multi-batch, we need to hash the outer relation |
| * up front. |
| */ |
| if (hashtable->nbatch > 1) |
| ExecParallelHashJoinPartitionOuter(node); |
| /* |
| * CBDB_PARALLEL |
| * If the outer side has a motion beneath it, we need to wait for all |
| * siblings before entering the next phase. |
| */ |
| if (((HashJoin *)node->js.ps.plan)->outer_motionhazard) |
| BarrierArriveAndWait(outer_motion_barrier, WAIT_EVENT_PARALLEL_FINISH); |
| |
| BarrierArriveAndWait(build_barrier, |
| WAIT_EVENT_HASH_BUILD_HASH_OUTER); |
| } |
| else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE) |
| { |
| /* |
| * If we attached so late that the job is finished and |
| * the batch state has been freed, we can return |
| * immediately. |
| */ |
| return NULL; |
| } |
| |
| /* Each backend should now select a batch to work on. */ |
| Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN); |
| hashtable->curbatch = -1; |
| node->hj_JoinState = HJ_NEED_NEW_BATCH; |
| |
| continue; |
| } |
| else |
| node->hj_JoinState = HJ_NEED_NEW_OUTER; |
| |
| /* FALL THRU */ |
| |
| case HJ_NEED_NEW_OUTER: |
| |
| /* For a rescannable hash table we might need to reload batch 0 during rescan */ |
| if (hashtable->curbatch == -1 && !hashtable->first_pass) |
| { |
| hashtable->curbatch = 0; |
| if (!ExecHashJoinReloadHashTable(node)) |
| return NULL; |
| } |
| |
| /* |
| * We don't have an outer tuple, try to get the next one |
| */ |
| if (parallel) |
| outerTupleSlot = |
| ExecParallelHashJoinOuterGetTuple(outerNode, node, |
| &hashvalue); |
| else |
| outerTupleSlot = |
| ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue); |
| |
| if (TupIsNull(outerTupleSlot)) |
| { |
| /* end of batch, or maybe whole join */ |
| if (HJ_FILL_INNER(node)) |
| { |
| /* set up to scan for unmatched inner tuples */ |
| if (parallel) |
| { |
| /* |
| * In a parallel join, only one process is currently allowed |
| * to handle each batch's unmatched tuples. |
| */ |
| if (ExecParallelPrepHashTableForUnmatched(node)) |
| node->hj_JoinState = HJ_FILL_INNER_TUPLES; |
| else |
| node->hj_JoinState = HJ_NEED_NEW_BATCH; |
| } |
| else |
| { |
| ExecPrepHashTableForUnmatched(node); |
| node->hj_JoinState = HJ_FILL_INNER_TUPLES; |
| } |
| } |
| else |
| node->hj_JoinState = HJ_NEED_NEW_BATCH; |
| continue; |
| } |
| |
| econtext->ecxt_outertuple = outerTupleSlot; |
| node->hj_MatchedOuter = false; |
| |
| /* |
| * Find the corresponding bucket for this tuple in the main |
| * hash table or skew hash table. |
| */ |
| node->hj_CurHashValue = hashvalue; |
| ExecHashGetBucketAndBatch(hashtable, hashvalue, |
| &node->hj_CurBucketNo, &batchno); |
| node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable, |
| hashvalue); |
| node->hj_CurTuple = NULL; |
| |
| /* |
| * The tuple might not belong to the current batch (where |
| * "current batch" includes the skew buckets if any). |
| */ |
| if (batchno != hashtable->curbatch && |
| node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO) |
| { |
| bool shouldFree; |
| MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot, |
| &shouldFree); |
| |
| /* |
| * Need to postpone this outer tuple to a later batch. |
| * Save it in the corresponding outer-batch file. |
| */ |
| Assert(parallel_state == NULL); |
| Assert(batchno > hashtable->curbatch); |
| ExecHashJoinSaveTuple(&node->js.ps, mintuple, |
| hashvalue, |
| hashtable, |
| &hashtable->outerBatchFile[batchno], |
| hashtable->bfCxt); |
| |
| if (shouldFree) |
| heap_free_minimal_tuple(mintuple); |
| |
| /* Loop around, staying in HJ_NEED_NEW_OUTER state */ |
| continue; |
| } |
| |
| /* OK, let's scan the bucket for matches */ |
| node->hj_JoinState = HJ_SCAN_BUCKET; |
| |
| /* FALL THRU */ |
| |
| case HJ_SCAN_BUCKET: |
| |
| /* |
| * OPT-3325: Handle NULLs in the outer side of LASJ_NOTIN |
| * - if tuple is NULL and inner is not empty, drop outer tuple |
| * - if tuple is NULL and inner is empty, keep going as we'll |
| * find no match for this tuple in the inner side |
| */ |
| if (node->js.jointype == JOIN_LASJ_NOTIN && |
| !node->hj_InnerEmpty && |
| isJoinExprNull(node->hj_OuterHashKeys, econtext)) |
| { |
| node->hj_MatchedOuter = true; |
| node->hj_JoinState = HJ_NEED_NEW_OUTER; |
| continue; |
| } |
| |
| /* |
| * Scan the selected hash bucket for matches to current outer |
| */ |
| if (parallel) |
| { |
| if (!ExecParallelScanHashBucket(hashNode, node, econtext)) |
| { |
| /* out of matches; check for possible outer-join fill */ |
| node->hj_JoinState = HJ_FILL_OUTER_TUPLE; |
| continue; |
| } |
| } |
| else |
| { |
| if (!ExecScanHashBucket(hashNode, node, econtext)) |
| { |
| /* out of matches; check for possible outer-join fill */ |
| node->hj_JoinState = HJ_FILL_OUTER_TUPLE; |
| continue; |
| } |
| } |
| |
| /* |
| * We've got a match, but still need to test non-hashed quals. |
| * ExecScanHashBucket already set up all the state needed to |
| * call ExecQual. |
| * |
| * If we pass the qual, then save state for next call and have |
| * ExecProject form the projection, store it in the tuple |
| * table, and return the slot. |
| * |
| * Only the joinquals determine tuple match status, but all |
| * quals must pass to actually return the tuple. |
| */ |
| if (joinqual == NULL || ExecQual(joinqual, econtext)) |
| { |
| node->hj_MatchedOuter = true; |
| |
| /* |
| * This is really only needed if HJ_FILL_INNER(node), but |
| * we'll avoid the branch and just set it always. |
| */ |
| if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple))) |
| HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); |
| |
| /* In an antijoin, we never return a matched tuple */ |
| if (node->js.jointype == JOIN_ANTI || |
| node->js.jointype == JOIN_LASJ_NOTIN) |
| { |
| node->hj_JoinState = HJ_NEED_NEW_OUTER; |
| continue; |
| } |
| |
| /* |
| * If we only need to consider the first matching inner |
| * tuple, then advance to next outer tuple after we've |
| * processed this one. |
| */ |
| if (node->js.single_match) |
| node->hj_JoinState = HJ_NEED_NEW_OUTER; |
| |
| /* |
| * In a right-antijoin, we never return a matched tuple. |
| * If it's not an inner_unique join, we need to stay on |
| * the current outer tuple to continue scanning the inner |
| * side for matches. |
| */ |
| if (node->js.jointype == JOIN_RIGHT_ANTI) |
| continue; |
| |
| if (otherqual == NULL || ExecQual(otherqual, econtext)) |
| return ExecProject(node->js.ps.ps_ProjInfo); |
| else |
| InstrCountFiltered2(node, 1); |
| } |
| else |
| InstrCountFiltered1(node, 1); |
| break; |
| |
| case HJ_FILL_OUTER_TUPLE: |
| |
| /* |
| * The current outer tuple has run out of matches, so check |
| * whether to emit a dummy outer-join tuple. Whether we emit |
| * one or not, the next state is NEED_NEW_OUTER. |
| */ |
| node->hj_JoinState = HJ_NEED_NEW_OUTER; |
| |
| if (!node->hj_MatchedOuter && |
| HJ_FILL_OUTER(node)) |
| { |
| /* |
| * Generate a fake join tuple with nulls for the inner |
| * tuple, and return it if it passes the non-join quals. |
| */ |
| econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot; |
| |
| if (otherqual == NULL || ExecQual(otherqual, econtext)) |
| return ExecProject(node->js.ps.ps_ProjInfo); |
| else |
| InstrCountFiltered2(node, 1); |
| } |
| break; |
| |
| case HJ_FILL_INNER_TUPLES: |
| |
| /* |
| * We have finished a batch, but we are doing |
| * right/right-anti/full join, so any unmatched inner tuples |
| * in the hashtable have to be emitted before we continue to |
| * the next batch. |
| */ |
| if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext) |
| : ExecScanHashTableForUnmatched(node, econtext))) |
| { |
| /* no more unmatched tuples */ |
| node->hj_JoinState = HJ_NEED_NEW_BATCH; |
| continue; |
| } |
| |
| /* |
| * Generate a fake join tuple with nulls for the outer tuple, |
| * and return it if it passes the non-join quals. |
| */ |
| econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot; |
| |
| if (otherqual == NULL || ExecQual(otherqual, econtext)) |
| return ExecProject(node->js.ps.ps_ProjInfo); |
| else |
| InstrCountFiltered2(node, 1); |
| break; |
| |
| case HJ_NEED_NEW_BATCH: |
| |
| /* |
| * Try to advance to next batch. Done if there are no more. |
| */ |
| if (parallel) |
| { |
| if (!ExecParallelHashJoinNewBatch(node)) |
| return NULL; /* end of parallel-aware join */ |
| } |
| else |
| { |
| if (!ExecHashJoinNewBatch(node)) |
| return NULL; /* end of parallel-oblivious join */ |
| } |
| node->hj_JoinState = HJ_NEED_NEW_OUTER; |
| break; |
| |
| default: |
| elog(ERROR, "unrecognized hashjoin state: %d", |
| (int) node->hj_JoinState); |
| } |
| } |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecHashJoin |
| * |
| * Parallel-oblivious version. |
| * ---------------------------------------------------------------- |
| */ |
| static TupleTableSlot * /* return: a tuple or NULL */ |
| ExecHashJoin(PlanState *pstate) |
| { |
| TupleTableSlot *result; |
| |
| /* |
| * On sufficiently smart compilers this should be inlined with the |
| * parallel-aware branches removed. |
| */ |
| result = ExecHashJoinImpl(pstate, false); |
| |
| if (TupIsNull(result) && !((HashJoinState *) pstate)->reuse_hashtable |
| && !((HashJoinState *) pstate)->delayEagerFree) |
| { |
| /* |
| * CDB: We'll read no more from inner subtree. To keep our |
| * sibling QEs from being starved, tell source QEs not to |
| * clog up the pipeline with our never-to-be-consumed |
| * data. |
| */ |
| ExecSquelchNode(pstate, false); |
| } |
| |
| return result; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecParallelHashJoin |
| * |
| * Parallel-aware version. |
| * ---------------------------------------------------------------- |
| */ |
| static TupleTableSlot * /* return: a tuple or NULL */ |
| ExecParallelHashJoin(PlanState *pstate) |
| { |
| TupleTableSlot *result; |
| |
| /* |
| * On sufficiently smart compilers this should be inlined with the |
| * parallel-oblivious branches removed. |
| */ |
| result = ExecHashJoinImpl(pstate, true); |
| |
| if (TupIsNull(result) && !((HashJoinState *) pstate)->reuse_hashtable |
| && !((HashJoinState *) pstate)->delayEagerFree) |
| { |
| /* |
| * CDB: We'll read no more from inner subtree. To keep our |
| * sibling QEs from being starved, tell source QEs not to |
| * clog up the pipeline with our never-to-be-consumed |
| * data. |
| */ |
| ExecSquelchNode(pstate, false); |
| } |
| |
| return result; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecInitHashJoin |
| * |
| * Init routine for HashJoin node. |
| * ---------------------------------------------------------------- |
| */ |
| HashJoinState * |
| ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) |
| { |
| HashJoinState *hjstate; |
| PlanState *outerState; |
| HashState *hstate; |
| Plan *outerNode; |
| Hash *hashNode; |
| TupleDesc outerDesc, |
| innerDesc; |
| const TupleTableSlotOps *ops; |
| |
| /* check for unsupported flags */ |
| Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
| |
| /* |
| * create state structure |
| */ |
| hjstate = makeNode(HashJoinState); |
| hjstate->js.ps.plan = (Plan *) node; |
| hjstate->js.ps.state = estate; |
| hjstate->reuse_hashtable = (eflags & EXEC_FLAG_REWIND) != 0; |
| |
| /* |
| * If eflags contains EXEC_FLAG_REWIND, this node is not safe for |
| * eager free. |
| */ |
| hjstate->delayEagerFree = (eflags & EXEC_FLAG_REWIND) != 0; |
| /* |
| * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker() |
| * where this function may be replaced with a parallel version, if we |
| * managed to launch a parallel query. |
| */ |
| if (node->join.plan.parallel_aware) |
| { |
| hjstate->js.ps.ExecProcNode = ExecParallelHashJoin; |
| } |
| else |
| { |
| hjstate->js.ps.ExecProcNode = ExecHashJoin; |
| } |
| hjstate->js.jointype = node->join.jointype; |
| |
| /* |
| * Miscellaneous initialization |
| * |
| * create expression context for node |
| */ |
| ExecAssignExprContext(estate, &hjstate->js.ps); |
| |
| if (node->hashqualclauses != NIL) |
| { |
| /* CDB: This must be an IS NOT DISTINCT join! */ |
| Assert(isNotDistinctJoin(node->hashqualclauses)); |
| hjstate->hj_nonequijoin = true; |
| } |
| else |
| hjstate->hj_nonequijoin = false; |
| |
| /* |
| * MPP-3300: we only pre-build the hashtable if we need to (this relaxes |
| * the fix for MPP-989). |
| */ |
| hjstate->prefetch_inner = node->join.prefetch_inner; |
| hjstate->prefetch_joinqual = node->join.prefetch_joinqual; |
| hjstate->prefetch_qual = node->join.prefetch_qual; |
| |
| if (Test_print_prefetch_joinqual && hjstate->prefetch_joinqual) |
| elog(NOTICE, |
| "prefetch join qual in slice %d of plannode %d", |
| currentSliceId, ((Plan *) node)->plan_node_id); |
| |
| /* |
| * Reuse the GUC Test_print_prefetch_joinqual to output debug information |
| * for prefetching the non-join qual. |
| */ |
| if (Test_print_prefetch_joinqual && hjstate->prefetch_qual) |
| elog(NOTICE, |
| "prefetch non join qual in slice %d of plannode %d", |
| currentSliceId, ((Plan *) node)->plan_node_id); |
| |
| /* |
| * initialize child nodes |
| * |
| * Note: we could suppress the REWIND flag for the inner input, which |
| * would amount to betting that the hash will be a single batch. Not |
| * clear if this would be a win or not. |
| */ |
| outerNode = outerPlan(node); |
| hashNode = (Hash *) innerPlan(node); |
| |
| /* |
| * XXX The following order is significant. We init the Hash node first, then |
| * the outer node; this is the same order in which they are executed (in the |
| * sense of which is exec'ed first). Until we have a better way to uncouple |
| * them, share input needs this to be true. If the order is wrong, when both |
| * the hash and outer nodes have share input and both have a subquery node, |
| * share input will fail because the estate of the nodes cannot be set up |
| * correctly. |
| */ |
| innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags); |
| innerDesc = ExecGetResultType(innerPlanState(hjstate)); |
| ((HashState *) innerPlanState(hjstate))->hs_keepnull = hjstate->hj_nonequijoin; |
| |
| outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags); |
| outerDesc = ExecGetResultType(outerPlanState(hjstate)); |
| |
| /* |
| * Initialize result slot, type and projection. |
| */ |
| ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual); |
| ExecAssignProjectionInfo(&hjstate->js.ps, NULL); |
| |
| /* |
| * tuple table initialization |
| */ |
| ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL); |
| hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc, |
| ops); |
| |
| /* |
| * detect whether we need only consider the first matching inner tuple |
| */ |
| hjstate->js.single_match = (node->join.inner_unique || |
| node->join.jointype == JOIN_SEMI); |
| |
| /* set up null tuples for outer joins, if needed */ |
| switch (node->join.jointype) |
| { |
| case JOIN_INNER: |
| case JOIN_SEMI: |
| break; |
| case JOIN_LEFT: |
| case JOIN_ANTI: |
| case JOIN_LASJ_NOTIN: |
| hjstate->hj_NullInnerTupleSlot = |
| ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual); |
| break; |
| case JOIN_RIGHT: |
| case JOIN_RIGHT_ANTI: |
| hjstate->hj_NullOuterTupleSlot = |
| ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual); |
| break; |
| case JOIN_FULL: |
| hjstate->hj_NullOuterTupleSlot = |
| ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual); |
| hjstate->hj_NullInnerTupleSlot = |
| ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual); |
| break; |
| default: |
| elog(ERROR, "unrecognized join type: %d", |
| (int) node->join.jointype); |
| } |
| |
| /* |
| * now for some voodoo. our temporary tuple slot is actually the result |
| * tuple slot of the Hash node (which is our inner plan). we can do this |
| * because Hash nodes don't return tuples via ExecProcNode() -- instead |
| * the hash join node uses ExecScanHashBucket() to get at the contents of |
| * the hash table. -cim 6/9/91 |
| */ |
| { |
| HashState *hashstate = (HashState *) innerPlanState(hjstate); |
| TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot; |
| |
| hjstate->hj_HashTupleSlot = slot; |
| } |
| |
| /* |
| * initialize child expressions |
| */ |
| hjstate->js.ps.qual = |
| ExecInitQual(node->join.plan.qual, (PlanState *) hjstate); |
| hjstate->js.joinqual = |
| ExecInitQual(node->join.joinqual, (PlanState *) hjstate); |
| hjstate->hashclauses = |
| ExecInitQual(node->hashclauses, (PlanState *) hjstate); |
| |
| if (node->hashqualclauses != NIL) |
| { |
| hjstate->hashqualclauses = |
| ExecInitQual(node->hashqualclauses, (PlanState *) hjstate); |
| } |
| else |
| { |
| hjstate->hashqualclauses = hjstate->hashclauses; |
| } |
| |
| /* |
| * initialize hash-specific info |
| */ |
| hjstate->hj_HashTable = NULL; |
| hjstate->hj_FirstOuterTupleSlot = NULL; |
| |
| hjstate->hj_CurHashValue = 0; |
| hjstate->hj_CurBucketNo = 0; |
| hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO; |
| hjstate->hj_CurTuple = NULL; |
| |
| hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys, |
| (PlanState *) hjstate); |
| hjstate->hj_HashOperators = node->hashoperators; |
| hjstate->hj_Collations = node->hashcollations; |
| |
| hjstate->hj_JoinState = HJ_BUILD_HASHTABLE; |
| hjstate->hj_MatchedOuter = false; |
| hjstate->hj_OuterNotEmpty = false; |
| hjstate->worker_id = -1; |
| |
| /* Set up the relationship between the HashJoin, Hash and RuntimeFilter nodes. */ |
| hstate = (HashState *) innerPlanState(hjstate); |
| outerState = outerPlanState(hjstate); |
| if (IsA(outerState, RuntimeFilterState)) |
| { |
| RuntimeFilterState *rfstate = (RuntimeFilterState *) outerState; |
| rfstate->hjstate = hjstate; |
| hstate->rfstate = rfstate; |
| ExecInitRuntimeFilterFinish(rfstate, hstate->ps.plan->plan_rows); |
| } |
| |
| if (Gp_role == GP_ROLE_EXECUTE |
| && gp_enable_runtime_filter_pushdown |
| && !estate->useMppParallelMode) |
| CreateRuntimeFilter(hjstate); |
| |
| return hjstate; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecEndHashJoin |
| * |
| * clean up routine for HashJoin node |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecEndHashJoin(HashJoinState *node) |
| { |
| /* |
| * Free hash table |
| */ |
| if (node->hj_HashTable) |
| { |
| if (!node->hj_HashTable->eagerlyReleased) |
| { |
| HashState *hashState = (HashState *) innerPlanState(node); |
| |
| ExecHashTableDestroy(hashState, node->hj_HashTable); |
| } |
| pfree(node->hj_HashTable); |
| node->hj_HashTable = NULL; |
| } |
| |
| /* |
| * Free the exprcontext |
| */ |
| ExecFreeExprContext(&node->js.ps); |
| |
| /* |
| * clean out the tuple table |
| */ |
| ExecClearTuple(node->js.ps.ps_ResultTupleSlot); |
| ExecClearTuple(node->hj_OuterTupleSlot); |
| ExecClearTuple(node->hj_HashTupleSlot); |
| |
| /* |
| * clean up subtrees |
| */ |
| ExecEndNode(outerPlanState(node)); |
| ExecEndNode(innerPlanState(node)); |
| } |
| |
| /* |
| * ExecHashJoinOuterGetTuple |
| * |
| * get the next outer tuple for a parallel-oblivious hashjoin: either by |
| * executing the outer plan node in the first pass, or from the temp |
| * files for the hashjoin batches. |
| * |
| * Returns a null slot if no more outer tuples (within the current batch). |
| * |
| * On success, the tuple's hash value is stored at *hashvalue --- this is |
| * either originally computed, or re-read from the temp file. |
| */ |
| static TupleTableSlot * |
| ExecHashJoinOuterGetTuple(PlanState *outerNode, |
| HashJoinState *hjstate, |
| uint32 *hashvalue) |
| { |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| int curbatch = hashtable->curbatch; |
| TupleTableSlot *slot; |
| ExprContext *econtext; |
| HashState *hashState = (HashState *) innerPlanState(hjstate); |
| |
| /* Read tuples from outer relation only if it's the first batch */ |
| if (curbatch == 0) |
| { |
| /* |
| * Check to see if first outer tuple was already fetched by |
| * ExecHashJoin() and not used yet. |
| */ |
| slot = hjstate->hj_FirstOuterTupleSlot; |
| if (!TupIsNull(slot)) |
| hjstate->hj_FirstOuterTupleSlot = NULL; |
| else |
| slot = ExecProcNode(outerNode); |
| |
| while (!TupIsNull(slot)) |
| { |
| bool hashkeys_null = false; |
| bool keep_nulls = HJ_FILL_OUTER(hjstate) || |
| hjstate->hj_nonequijoin; |
| |
| /* |
| * We have to compute the tuple's hash value. |
| */ |
| econtext = hjstate->js.ps.ps_ExprContext; |
| econtext->ecxt_outertuple = slot; |
| if (ExecHashGetHashValue(hashState, hashtable, econtext, |
| hjstate->hj_OuterHashKeys, |
| true, /* outer tuple */ |
| keep_nulls, |
| hashvalue, |
| &hashkeys_null)) |
| { |
| /* remember outer relation is not empty for possible rescan */ |
| hjstate->hj_OuterNotEmpty = true; |
| |
| return slot; |
| } |
| |
| /* |
| * That tuple couldn't match because of a NULL, so discard it and |
| * continue with the next one. |
| */ |
| slot = ExecProcNode(outerNode); |
| } |
| |
| #ifdef HJDEBUG |
| elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples for batch %d", hashtable->totalTuples, curbatch); |
| #endif |
| } |
| else if (curbatch < hashtable->nbatch) |
| { |
| BufFile *file = hashtable->outerBatchFile[curbatch]; |
| |
| /* |
| * In outer-join cases, we could get here even though the batch file |
| * is empty. |
| */ |
| if (file == NULL) |
| return NULL; |
| |
| /* |
| * For batches > 0, we can be reading a great many outer tuples from disk |
| * and probing them against the hashtable. If we don't find any |
| * matches, we'll keep coming back here to read tuples from disk and |
| * returning them (MPP-23213). Break this long tight loop here. |
| */ |
| CHECK_FOR_INTERRUPTS(); |
| |
| if (QueryFinishPending) |
| return NULL; |
| |
| slot = ExecHashJoinGetSavedTuple(hjstate, |
| file, |
| hashvalue, |
| hjstate->hj_OuterTupleSlot); |
| if (!TupIsNull(slot)) |
| return slot; |
| |
| #ifdef HJDEBUG |
| elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples for batch %d", hashtable->totalTuples, curbatch); |
| #endif |
| } |
| |
| /* End of this batch */ |
| return NULL; |
| } |
| |
| /* |
| * ExecHashJoinOuterGetTuple variant for the parallel case. |
| */ |
| static TupleTableSlot * |
| ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, |
| HashJoinState *hjstate, |
| uint32 *hashvalue) |
| { |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| int curbatch = hashtable->curbatch; |
| TupleTableSlot *slot; |
| HashState *hashState = (HashState *) innerPlanState(hjstate); |
| |
| /* |
| * In the Parallel Hash case we only run the outer plan directly for |
| * single-batch hash joins. Otherwise we have to go to batch files, even |
| * for batch 0. |
| */ |
| if (curbatch == 0 && hashtable->nbatch == 1) |
| { |
| slot = ExecProcNode(outerNode); |
| |
| while (!TupIsNull(slot)) |
| { |
| ExprContext *econtext = hjstate->js.ps.ps_ExprContext; |
| bool hashkeys_null = false; |
| bool keep_nulls; |
| |
| keep_nulls = HJ_FILL_OUTER(hjstate) || |
| hjstate->hj_nonequijoin; |
| |
| econtext->ecxt_outertuple = slot; |
| if (ExecHashGetHashValue(hashState, |
| hashtable, econtext, |
| hjstate->hj_OuterHashKeys, |
| true, /* outer tuple */ |
| keep_nulls, |
| hashvalue, |
| &hashkeys_null)) |
| return slot; |
| |
| /* |
| * That tuple couldn't match because of a NULL, so discard it and |
| * continue with the next one. |
| */ |
| slot = ExecProcNode(outerNode); |
| } |
| } |
| else if (curbatch < hashtable->nbatch) |
| { |
| MinimalTuple tuple; |
| |
| tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples, |
| hashvalue); |
| if (tuple != NULL) |
| { |
| ExecForceStoreMinimalTuple(tuple, |
| hjstate->hj_OuterTupleSlot, |
| false); |
| slot = hjstate->hj_OuterTupleSlot; |
| return slot; |
| } |
| else |
| ExecClearTuple(hjstate->hj_OuterTupleSlot); |
| } |
| |
| /* End of this batch */ |
| hashtable->batches[curbatch].outer_eof = true; |
| |
| return NULL; |
| } |
| |
| /* |
| * ExecHashJoinNewBatch |
| * switch to a new hashjoin batch |
| * |
| * Returns true if successful, false if there are no more batches. |
| */ |
| static bool |
| ExecHashJoinNewBatch(HashJoinState *hjstate) |
| { |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| HashState *hashState = (HashState *) innerPlanState(hjstate); |
| int nbatch; |
| int curbatch; |
| |
| SIMPLE_FAULT_INJECTOR("exec_hashjoin_new_batch"); |
| |
| nbatch = hashtable->nbatch; |
| curbatch = hashtable->curbatch; |
| |
| if (curbatch >= nbatch) |
| return false; |
| |
| if (curbatch >= 0 && hashtable->stats) |
| ExecHashTableExplainBatchEnd(hashState, hashtable); |
| |
| if (curbatch > 0) |
| { |
| /* |
| * We no longer need the previous outer batch file; close it right |
| * away to free disk space. |
| */ |
| if (hashtable->outerBatchFile[curbatch]) |
| BufFileClose(hashtable->outerBatchFile[curbatch]); |
| hashtable->outerBatchFile[curbatch] = NULL; |
| } |
| else /* we just finished the first batch */ |
| { |
| /* |
| * Reset some of the skew optimization state variables, since we no |
| * longer need to consider skew tuples after the first batch. The |
| * memory context reset we are about to do will release the skew |
| * hashtable itself. |
| */ |
| hashtable->skewEnabled = false; |
| hashtable->skewBucket = NULL; |
| hashtable->skewBucketNums = NULL; |
| hashtable->nSkewBuckets = 0; |
| hashtable->spaceUsedSkew = 0; |
| } |
| |
| /* |
| * If we want to keep the hash table around for re-scan, then write |
| * the current batch's state to disk before moving to the next one. |
| * It's possible that we increase the number of batches later, so that |
| * by the time we reload this file, some of the tuples we wrote here |
| * will logically belong to a later file. ExecHashJoinReloadHashTable |
| * will move such tuples when the file is reloaded. |
| * |
| * If we have already re-scanned, we might still have the old file |
| * around, in which case there's no need to write it again. |
| * XXX: Currently, we actually always re-create it, see comments in |
| * ExecHashJoinReloadHashTable. |
| */ |
| if (nbatch > 1 && hjstate->reuse_hashtable && |
| hashtable->innerBatchFile[curbatch] == NULL) |
| { |
| SpillCurrentBatch(hjstate); |
| } |
| |
| /* |
| * We can always skip over any batches that are completely empty on both |
| * sides. We can sometimes skip over batches that are empty on only one |
| * side, but there are exceptions: |
| * |
| * 1. In a left/full outer join, we have to process outer batches even if |
| * the inner batch is empty. Similarly, in a right/right-anti/full outer |
| * join, we have to process inner batches even if the outer batch is |
| * empty. |
| * |
| * 2. If we have increased nbatch since the initial estimate, we have to |
| * scan inner batches since they might contain tuples that need to be |
| * reassigned to later inner batches. |
| * |
| * 3. Similarly, if we have increased nbatch since starting the outer |
| * scan, we have to rescan outer batches in case they contain tuples that |
| * need to be reassigned. |
| */ |
| curbatch++; |
| while (curbatch < nbatch && |
| (hashtable->outerBatchFile[curbatch] == NULL || |
| hashtable->innerBatchFile[curbatch] == NULL)) |
| { |
| /* |
| * For rescannable hash tables, we must complete respilling on the first batch. |
| * |
| * Consider case 2: the inner workfile is not null. We are on the first pass |
| * (before ReScan was called). I.e., we are processing a join for the base |
| * case of a recursive CTE. If the base case does not have tuples for batch |
| * k (i.e., the outer workfile for batch k is null), and we never increased |
| * the initial number of batches, then we will skip the inner batchfile (case 2). |
| * |
| * However, one iteration of recursive CTE is no guarantee that the future outer |
| * batch will also not match batch k on the inner. Therefore, we may have a |
| * non-null outer batch k on some future iteration. |
| * |
| * If, while loading batch k on a future iteration, the inner workfile |
| * triggers a re-spill, we will be forced to increase the number of batches. |
| * This will produce wrong results, as we will not write any inner tuples (we |
| * consider inner workfiles read-only after a rescan call). |
| * |
| * So, without this guard, all of the following conditions would have to hold |
| * for a wrong result to be produced: |
| * |
| * 1. Outer batchfile for batch k is null |
| * 2. Inner batchfile for batch k not null |
| * 3. No resizing of nbatch for batch (0...(k-1)) |
| * 4. Inner batchfile for batch k is too big to fit in memory |
| */ |
| if (hjstate->reuse_hashtable) |
| break; |
| |
| if (hashtable->outerBatchFile[curbatch] && |
| HJ_FILL_OUTER(hjstate)) |
| break; /* must process due to rule 1 */ |
| if (hashtable->innerBatchFile[curbatch] && |
| HJ_FILL_INNER(hjstate)) |
| break; /* must process due to rule 1 */ |
| if (hashtable->innerBatchFile[curbatch] && |
| nbatch != hashtable->nbatch_original) |
| break; /* must process due to rule 2 */ |
| if (hashtable->outerBatchFile[curbatch] && |
| nbatch != hashtable->nbatch_outstart) |
| break; /* must process due to rule 3 */ |
| /* We can ignore this batch. */ |
| /* Release associated temp files right away. */ |
| if (hashtable->innerBatchFile[curbatch] && !hjstate->reuse_hashtable) |
| BufFileClose(hashtable->innerBatchFile[curbatch]); |
| hashtable->innerBatchFile[curbatch] = NULL; |
| if (hashtable->outerBatchFile[curbatch]) |
| BufFileClose(hashtable->outerBatchFile[curbatch]); |
| hashtable->outerBatchFile[curbatch] = NULL; |
| curbatch++; |
| } |
| |
| hashtable->curbatch = curbatch; /* CDB: update before returning, even if |
| * there is no more data, so the stats |
| * logic can see whether the join ran to |
| * completion */ |
| |
| if (curbatch >= nbatch) |
| return false; /* no more batches */ |
| |
| if (!ExecHashJoinReloadHashTable(hjstate)) |
| { |
| /* We couldn't load the batch, so we cannot continue */ |
| return false; |
| } |
| |
| /* |
| * Rewind outer batch file (if present), so that we can start reading it. |
| */ |
| if (hashtable->outerBatchFile[curbatch] != NULL) |
| { |
| if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET)) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not rewind hash-join temporary file"))); |
| } |
| |
| return true; |
| } |
| |
| /* |
| * Choose a batch to work on, and attach to it. Returns true if successful, |
| * false if there are no more batches. |
| */ |
| static bool |
| ExecParallelHashJoinNewBatch(HashJoinState *hjstate) |
| { |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| int start_batchno; |
| int batchno; |
| Barrier *batch0_barrier = NULL; |
| ParallelHashJoinState *pstate = hashtable->parallel_state; |
| |
| /* |
| * If we were already attached to a batch, remember not to bother checking |
| * it again, and detach from it (possibly freeing the hash table if we are |
| * last to detach). |
| */ |
| if (hashtable->curbatch >= 0) |
| { |
| hashtable->batches[hashtable->curbatch].done = true; |
| ExecHashTableDetachBatch(hashtable); |
| } |
| |
| /* |
| * Search for a batch that isn't done. We use an atomic counter to start |
| * our search at a different batch in every participant when there are |
| * more batches than participants. |
| */ |
| batchno = start_batchno = |
| pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) % |
| hashtable->nbatch; |
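| |
| /* |
| * For example, if the counter starts at zero, the first four participants |
| * to arrive get start positions 0, 1, 2 and 3 (mod nbatch), and each then |
| * walks the batches cyclically from its own starting point, so they tend |
| * to begin work on different batches. |
| */ |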
| do |
| { |
| uint32 hashvalue; |
| MinimalTuple tuple; |
| TupleTableSlot *slot; |
| |
| if (!hashtable->batches[batchno].done) |
| { |
| SharedTuplestoreAccessor *inner_tuples; |
| Barrier *batch_barrier = |
| &hashtable->batches[batchno].shared->batch_barrier; |
| int phase = BarrierAttach(batch_barrier); |
| |
| if (hashtable->nbatch == 1 && batchno == 0 && ((HashJoin *)hjstate->js.ps.plan)->batch0_barrier) |
| { |
| Assert(phase == PHJ_BATCH_PROBE); |
| |
| batch0_barrier = &pstate->batch0_barrier; |
| BarrierArriveAndWait(batch0_barrier, WAIT_EVENT_PARALLEL_FINISH); |
| } |
| |
| switch (phase) |
| { |
| |
| case PHJ_BATCH_ELECT: |
| |
| /* One backend allocates the hash table. */ |
| if (BarrierArriveAndWait(batch_barrier, |
| WAIT_EVENT_HASH_BATCH_ELECT)) |
| ExecParallelHashTableAlloc(hashtable, batchno); |
| /* Fall through. */ |
| |
| case PHJ_BATCH_ALLOCATE: |
| /* Wait for allocation to complete. */ |
| BarrierArriveAndWait(batch_barrier, |
| WAIT_EVENT_HASH_BATCH_ALLOCATE); |
| /* Fall through. */ |
| |
| case PHJ_BATCH_LOAD: |
| /* Start (or join in) loading tuples. */ |
| ExecParallelHashTableSetCurrentBatch(hashtable, batchno); |
| inner_tuples = hashtable->batches[batchno].inner_tuples; |
| sts_begin_parallel_scan(inner_tuples); |
| while ((tuple = sts_parallel_scan_next(inner_tuples, |
| &hashvalue))) |
| { |
| ExecForceStoreMinimalTuple(tuple, |
| hjstate->hj_HashTupleSlot, |
| false); |
| slot = hjstate->hj_HashTupleSlot; |
| ExecParallelHashTableInsertCurrentBatch(hashtable, slot, |
| hashvalue); |
| } |
| sts_end_parallel_scan(inner_tuples); |
| BarrierArriveAndWait(batch_barrier, |
| WAIT_EVENT_HASH_BATCH_LOAD); |
| /* Fall through. */ |
| |
| case PHJ_BATCH_PROBE: |
| |
| /* |
| * This batch is ready to probe. Return control to |
| * caller. We stay attached to batch_barrier so that the |
| * hash table stays alive until everyone's finished |
| * probing it, but no participant is allowed to wait at |
| * this barrier again (or else a deadlock could occur). |
| * All attached participants must eventually detach from |
| * the barrier and one worker must advance the phase so |
| * that the final phase is reached. |
| */ |
| ExecParallelHashTableSetCurrentBatch(hashtable, batchno); |
| sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); |
| |
| return true; |
| case PHJ_BATCH_SCAN: |
| |
| /* |
| * In principle, we could help scan for unmatched tuples, |
| * since that phase is already underway (the thing we |
| * can't do under current deadlock-avoidance rules is wait |
| * for others to arrive at PHJ_BATCH_SCAN, because |
| * PHJ_BATCH_PROBE emits tuples, but in this case we just |
| * got here without waiting). That is not yet done. For |
| * now, we just detach and go around again. We have to |
| * use ExecHashTableDetachBatch() because there's a small |
| * chance we'll be the last to detach, and then we're |
| * responsible for freeing memory. |
| */ |
| ExecParallelHashTableSetCurrentBatch(hashtable, batchno); |
| hashtable->batches[batchno].done = true; |
| ExecHashTableDetachBatch(hashtable); |
| break; |
| |
| case PHJ_BATCH_FREE: |
| |
| /* |
| * Already done. Detach and go around again (if any |
| * remain). |
| */ |
| BarrierDetach(batch_barrier); |
| hashtable->batches[batchno].done = true; |
| hashtable->curbatch = -1; |
| break; |
| |
| default: |
| elog(ERROR, "unexpected batch phase %d", |
| BarrierPhase(batch_barrier)); |
| } |
| } |
| batchno = (batchno + 1) % hashtable->nbatch; |
| } while (batchno != start_batchno); |
| |
| return false; |
| } |
| |
| /* |
| * ExecHashJoinSaveTuple |
| * save a tuple to a batch file. |
| * |
| * The data recorded in the file for each tuple is its hash value, |
| * then the tuple in MinimalTuple format. |
| * |
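| * Concretely, each record on disk is laid out as |
| * |
| *     uint32 hashvalue |
| *     MinimalTuple (t_len bytes, beginning with its own uint32 t_len word) |
| * |
| * which is what allows ExecHashJoinGetSavedTuple() to read the hash value |
| * and the tuple length in a single call. |
| * |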
| * fileptr points to a batch file in one of the hashtable arrays. |
| * |
| * The batch files (and their buffers) are allocated in the spill context |
| * created for the hashtable. |
| */ |
| void |
| ExecHashJoinSaveTuple(PlanState *ps, MinimalTuple tuple, uint32 hashvalue, |
| HashJoinTable hashtable, BufFile **fileptr, |
| MemoryContext bfCxt) |
| { |
| BufFile *file = *fileptr; |
| |
| if (hashtable->work_set == NULL) |
| { |
| MemoryContext oldcxt; |
| |
| /* |
| * First time spilling. |
| */ |
| if (hashtable->hjstate->js.ps.instrument) |
| { |
| hashtable->hjstate->js.ps.instrument->workfileCreated = true; |
| } |
| |
| oldcxt = MemoryContextSwitchTo(bfCxt); |
| hashtable->work_set = workfile_mgr_create_set("HashJoin", NULL, true /* hold pin */); |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| if (file == NULL) |
| { |
| MemoryContext oldcxt; |
| |
| oldcxt = MemoryContextSwitchTo(bfCxt); |
| |
| /* First write to this batch file, so create it */ |
| Assert(hashtable->work_set != NULL); |
| file = BufFileCreateTempInSet("HashJoin", false /* interXact */, |
| hashtable->work_set); |
| BufFilePledgeSequential(file); /* allow compression */ |
| *fileptr = file; |
| |
| elog(gp_workfile_caching_loglevel, "create batch file %s", |
| BufFileGetFilename(file)); |
| |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| BufFileWrite(file, &hashvalue, sizeof(uint32)); |
| BufFileWrite(file, tuple, tuple->t_len); |
| } |
| |
| /* |
| * ExecHashJoinGetSavedTuple |
| * read the next tuple from a batch file. Return NULL if no more. |
| * |
| * On success, *hashvalue is set to the tuple's hash value, and the tuple |
| * itself is stored in the given slot. |
| */ |
| static TupleTableSlot * |
| ExecHashJoinGetSavedTuple(HashJoinState *hjstate, |
| BufFile *file, |
| uint32 *hashvalue, |
| TupleTableSlot *tupleSlot) |
| { |
| uint32 header[2]; |
| size_t nread; |
| MinimalTuple tuple; |
| |
| /* |
| * We check for interrupts here because this is typically taken as an |
| * alternative code path to an ExecProcNode() call, which would include |
| * such a check. |
| */ |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* |
| * Since both the hash value and the MinimalTuple length word are uint32, |
| * we can read them both in one BufFileRead() call without any type |
| * cheating. |
| */ |
| nread = BufFileReadMaybeEOF(file, header, sizeof(header), true); |
| if (nread == 0) /* end of file */ |
| { |
| ExecClearTuple(tupleSlot); |
| return NULL; |
| } |
| *hashvalue = header[0]; |
| tuple = (MinimalTuple) palloc(header[1]); |
| tuple->t_len = header[1]; |
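| /* t_len counts the whole tuple, including the length word just read */ |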
| BufFileReadExact(file, |
| (char *) tuple + sizeof(uint32), |
| header[1] - sizeof(uint32)); |
| ExecForceStoreMinimalTuple(tuple, tupleSlot, true); |
| return tupleSlot; |
| } |
| |
| |
| void |
| ExecReScanHashJoin(HashJoinState *node) |
| { |
| PlanState *outerPlan = outerPlanState(node); |
| PlanState *innerPlan = innerPlanState(node); |
| |
| /* |
| * In a multi-batch join, we currently have to do rescans the hard way, |
| * primarily because batch temp files may have already been released. But |
| * if it's a single-batch join, and there is no parameter change for the |
| * inner subnode, then we can just re-use the existing hash table without |
| * rebuilding it. |
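| * |
| * For example (illustrative): when this join sits on the inner side of a |
| * nestloop whose rescan parameters only affect our outer subtree, the |
| * hash table built on the first pass can simply be probed again. |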
| */ |
| if (node->hj_HashTable != NULL) |
| { |
| node->hj_HashTable->first_pass = false; |
| |
| if (node->js.ps.righttree->chgParam == NULL && |
| !node->hj_HashTable->eagerlyReleased) |
| { |
| /* |
| * Okay to reuse the hash table; needn't rescan inner, either. |
| * |
| * However, if it's a right/right-anti/full join, we'd better |
| * reset the inner-tuple match flags contained in the table. |
| */ |
| if (HJ_FILL_INNER(node)) |
| ExecHashTableResetMatchFlags(node->hj_HashTable); |
| |
| /* |
| * Also, we need to reset our state about the emptiness of the |
| * outer relation, so that the new scan of the outer will update |
| * it correctly if it turns out to be empty this time. (There's no |
| * harm in clearing it now because ExecHashJoin won't need the |
| * info. In the other cases, where the hash table doesn't exist |
| * or we are destroying it, we leave this state alone because |
| * ExecHashJoin will need it the first time through.) |
| */ |
| node->hj_OuterNotEmpty = false; |
| |
| /* ExecHashJoin can skip the BUILD_HASHTABLE step */ |
| node->hj_JoinState = HJ_NEED_NEW_OUTER; |
| |
| if (node->hj_HashTable->nbatch > 1) |
| { |
| /* Force reloading batch 0 upon next ExecHashJoin */ |
| node->hj_HashTable->curbatch = -1; |
| } |
| else |
| { |
| /* MPP-1600: reset the batch number */ |
| node->hj_HashTable->curbatch = 0; |
| } |
| } |
| else |
| { |
| /* must destroy and rebuild hash table */ |
| if (!node->hj_HashTable->eagerlyReleased) |
| { |
| HashState *hashNode = castNode(HashState, innerPlan); |
| |
| Assert(hashNode->hashtable == node->hj_HashTable); |
| /* accumulate stats from old hash table, if wanted */ |
| /* (this should match ExecShutdownHash) */ |
| if (hashNode->ps.instrument && !hashNode->hinstrument) |
| hashNode->hinstrument = (HashInstrumentation *) |
| palloc0(sizeof(HashInstrumentation)); |
| if (hashNode->hinstrument) |
| ExecHashAccumInstrumentation(hashNode->hinstrument, |
| hashNode->hashtable); |
| /* for safety, be sure to clear child plan node's pointer too */ |
| hashNode->hashtable = NULL; |
| |
| ExecHashTableDestroy(hashNode, node->hj_HashTable); |
| } |
| pfree(node->hj_HashTable); |
| node->hj_HashTable = NULL; |
| node->hj_JoinState = HJ_BUILD_HASHTABLE; |
| |
| /* |
| * if chgParam of subnode is not null then plan will be re-scanned |
| * by first ExecProcNode. |
| */ |
| if (innerPlan->chgParam == NULL) |
| ExecReScan(innerPlan); |
| } |
| } |
| |
| /* Always reset intra-tuple state */ |
| node->hj_CurHashValue = 0; |
| node->hj_CurBucketNo = 0; |
| node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO; |
| node->hj_CurTuple = NULL; |
| |
| node->hj_MatchedOuter = false; |
| node->hj_FirstOuterTupleSlot = NULL; |
| |
| /* |
| * if chgParam of subnode is not null then plan will be re-scanned by |
| * first ExecProcNode. |
| */ |
| if (outerPlan->chgParam == NULL) |
| ExecReScan(outerPlan); |
| } |
| |
| /** |
| * Release the hash table's memory, while preserving bookkeeping such as |
| * memory usage statistics that may still be needed by EXPLAIN ANALYZE. |
| * A hash table that has been released cannot perform any useful function |
| * anymore. |
| */ |
| static void |
| ReleaseHashTable(HashJoinState *node) |
| { |
| if (node->hj_HashTable) |
| { |
| HashState *hashState = (HashState *) innerPlanState(node); |
| |
| /* This hashtable should not have been released already! */ |
| Assert(!node->hj_HashTable->eagerlyReleased); |
| if (node->hj_HashTable->stats) |
| { |
| /* Report on batch in progress. */ |
| ExecHashTableExplainBatchEnd(hashState, node->hj_HashTable); |
| } |
| ExecHashTableDestroy(hashState, node->hj_HashTable); |
| node->hj_HashTable->eagerlyReleased = true; |
| } |
| |
| /* Always reset intra-tuple state */ |
| node->hj_CurHashValue = 0; |
| node->hj_CurBucketNo = 0; |
| node->hj_CurTuple = NULL; |
| |
| node->hj_JoinState = HJ_NEED_NEW_OUTER; |
| node->hj_MatchedOuter = false; |
| node->hj_FirstOuterTupleSlot = NULL; |
| |
| } |
| |
| #ifdef USE_ASSERT_CHECKING |
| /* Is this an IS-NOT-DISTINCT-join qual list (as opposed to an equijoin)? |
| * |
| * XXX We perform an abbreviated test based on the assumptions that |
| * these are the only possibilities and that all conjuncts are |
| * alike in this regard. |
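| * |
| * For example (a sketch of the assumed shape): "a IS NOT DISTINCT FROM b" |
| * reaches us as BoolExpr(NOT_EXPR, args = [DistinctExpr(a, b)]), which is |
| * exactly what the test below looks for in the first conjunct. |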
| */ |
| static bool |
| isNotDistinctJoin(List *qualList) |
| { |
| ListCell *lc; |
| |
| foreach(lc, qualList) |
| { |
| BoolExpr *bex = (BoolExpr *) lfirst(lc); |
| DistinctExpr *dex; |
| |
| if (IsA(bex, BoolExpr) && bex->boolop == NOT_EXPR) |
| { |
| dex = (DistinctExpr *) linitial(bex->args); |
| |
| if (IsA(dex, DistinctExpr)) |
| return true; /* We assume the rest follow suit! */ |
| } |
| } |
| return false; |
| } |
| #endif |
| |
| static void |
| ExecEagerFreeHashJoin(HashJoinState *node) |
| { |
| if (node->hj_HashTable != NULL && |
| !node->hj_HashTable->eagerlyReleased && |
| !node->hj_HashTable->parallel_state) |
| { |
| ReleaseHashTable(node); |
| } |
| } |
| |
| void |
| ExecSquelchHashJoin(HashJoinState *node, bool force) |
| { |
| if (!node->js.ps.squelched) |
| { |
| ExecEagerFreeHashJoin(node); |
| node->js.ps.squelched = true; |
| } |
| ExecSquelchNode(outerPlanState(node), force); |
| ExecSquelchNode(innerPlanState(node), force); |
| } |
| |
| |
| /* |
| * In our hybrid hash join we spill either when we increase the number of |
| * batches or when we re-spill. As we go, we normally destroy the batch |
| * file of the batch that we have already processed. But if we need to |
| * support re-scanning the outer tuples without also re-scanning the inner |
| * side, we must instead save the current batch's contents for the next |
| * re-scan. |
| */ |
| static void |
| SpillCurrentBatch(HashJoinState *node) |
| { |
| HashJoinTable hashtable = node->hj_HashTable; |
| int curbatch = hashtable->curbatch; |
| HashJoinTuple tuple; |
| int i; |
| |
| Assert(hashtable->innerBatchFile[curbatch] == NULL); |
| |
| for (i = 0; i < hashtable->nbuckets; i++) |
| { |
| /* |
| * We needn't consider parallel hash joins here; they use shared |
| * tuplestores instead of raw batch files. |
| */ |
| tuple = hashtable->buckets.unshared[i]; |
| |
| while (tuple != NULL) |
| { |
| ExecHashJoinSaveTuple(NULL, HJTUPLE_MINTUPLE(tuple), |
| tuple->hashvalue, |
| hashtable, |
| &hashtable->innerBatchFile[curbatch], |
| hashtable->bfCxt); |
| tuple = tuple->next.unshared; |
| } |
| } |
| } |
| |
| static bool |
| ExecHashJoinReloadHashTable(HashJoinState *hjstate) |
| { |
| HashState *hashState = (HashState *) innerPlanState(hjstate); |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| TupleTableSlot *slot; |
| uint32 hashvalue; |
| int curbatch = hashtable->curbatch; |
| int nmoved = 0; |
| #if 0 |
| int orignbatch = hashtable->nbatch; |
| #endif |
| |
| /* |
| * Reload the hash table with the new inner batch (which could be empty) |
| */ |
| ExecHashTableReset(hashState, hashtable); |
| |
| if (hashtable->innerBatchFile[curbatch] != NULL) |
| { |
| /* Rewind batch file */ |
| if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0, SEEK_SET) != 0) |
| { |
| ereport(ERROR, (errcode_for_file_access(), |
| errmsg("could not access temporary file"))); |
| } |
| |
| for (;;) |
| { |
| CHECK_FOR_INTERRUPTS(); |
| |
| if (QueryFinishPending) |
| return false; |
| |
| slot = ExecHashJoinGetSavedTuple(hjstate, |
| hashtable->innerBatchFile[curbatch], |
| &hashvalue, |
| hjstate->hj_HashTupleSlot); |
| if (!slot) |
| break; |
| |
| /* |
| * NOTE: some tuples may be sent to future batches. Also, it is |
| * possible for hashtable->nbatch to be increased here! |
| */ |
| if (!ExecHashTableInsert(hashState, hashtable, slot, hashvalue)) |
| nmoved++; |
| } |
| |
| /* |
| * after we build the hash table, the inner batch file is no longer |
| * needed |
| */ |
| if (hjstate->js.ps.instrument && hjstate->js.ps.instrument->need_cdb) |
| { |
| Assert(hashtable->stats); |
| hashtable->stats->batchstats[curbatch].innerfilesize = |
| BufFileSize(hashtable->innerBatchFile[curbatch]); |
| } |
| |
| SIMPLE_FAULT_INJECTOR("workfile_hashjoin_failure"); |
| |
| /* |
| * If we want to re-use the hash table after a re-scan, don't delete the |
| * batch file yet. But if some tuples were sent to later batches, so that |
| * the batch file could not be loaded into memory as is, delete it now, so |
| * that it will be recreated with just the remaining tuples after this |
| * batch has been processed. |
| * |
| * XXX: Currently, we actually always close the file, and recreate it |
| * afterwards, even if there are no changes. That's because the workfile |
| * API doesn't support appending to a file that's already been read from. |
| * FIXME: could fix that now |
| */ |
| #if 0 |
| if (!hjstate->reuse_hashtable || nmoved > 0 || hashtable->nbatch != orignbatch) |
| #endif |
| { |
| BufFileClose(hashtable->innerBatchFile[curbatch]); |
| hashtable->innerBatchFile[curbatch] = NULL; |
| } |
| } |
| |
| return true; |
| } |
| |
| void |
| ExecShutdownHashJoin(HashJoinState *node) |
| { |
| if (node->hj_HashTable) |
| { |
| /* |
| * Detach from shared state before DSM memory goes away. This makes |
| * sure that we don't have any pointers into DSM memory by the time |
| * ExecEndHashJoin runs. |
| */ |
| ExecHashTableDetachBatch(node->hj_HashTable); |
| ExecHashTableDetach(node->hj_HashTable); |
| } |
| } |
| |
| static void |
| ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate) |
| { |
| PlanState *outerState = outerPlanState(hjstate); |
| ExprContext *econtext = hjstate->js.ps.ps_ExprContext; |
| HashJoinTable hashtable = hjstate->hj_HashTable; |
| TupleTableSlot *slot; |
| uint32 hashvalue; |
| int i; |
| HashState *hashState = (HashState *) innerPlanState(hjstate); |
| |
| Assert(hjstate->hj_FirstOuterTupleSlot == NULL); |
| |
| /* Execute outer plan, writing all tuples to shared tuplestores. */ |
| for (;;) |
| { |
| bool hashkeys_null = false; |
| bool keep_nulls; |
| |
| slot = ExecProcNode(outerState); |
| if (TupIsNull(slot)) |
| break; |
| econtext->ecxt_outertuple = slot; |
| |
| keep_nulls = HJ_FILL_OUTER(hjstate) || |
| hjstate->hj_nonequijoin; |
| if (ExecHashGetHashValue(hashState, hashtable, econtext, |
| hjstate->hj_OuterHashKeys, |
| true, /* outer tuple */ |
| keep_nulls, |
| &hashvalue, |
| &hashkeys_null)) |
| { |
| int batchno; |
| int bucketno; |
| bool shouldFree; |
| MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
| |
| ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, |
| &batchno); |
| sts_puttuple(hashtable->batches[batchno].outer_tuples, |
| &hashvalue, mintup); |
| |
| if (shouldFree) |
| heap_free_minimal_tuple(mintup); |
| } |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| /* Make sure all outer partitions are readable by any backend. */ |
| for (i = 0; i < hashtable->nbatch; ++i) |
| sts_end_write(hashtable->batches[i].outer_tuples); |
| } |
| |
| void |
| ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt) |
| { |
| shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState)); |
| shm_toc_estimate_keys(&pcxt->estimator, 1); |
| } |
| |
| void |
| ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) |
| { |
| int plan_node_id = state->js.ps.plan->plan_node_id; |
| HashState *hashNode; |
| ParallelHashJoinState *pstate; |
| EState *estate = state->js.ps.state; |
| /* |
| * Disable shared hash table mode if we failed to create a real DSM |
| * segment, because that means that we don't have a DSA area to work with. |
| */ |
| if (pcxt->seg == NULL) |
| return; |
| |
| ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); |
| |
| /* |
| * Set up the state needed to coordinate access to the shared hash |
| * table(s), using the plan node ID as the toc key. |
| */ |
| pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState)); |
| shm_toc_insert(pcxt->toc, plan_node_id, pstate); |
| |
| /* |
| * Set up the shared hash join state with no batches initially. |
| * ExecHashTableCreate() will prepare at least one later and set nbatch |
| * and space_allowed. |
| */ |
| pstate->nbatch = 0; |
| pstate->space_allowed = 0; |
| pstate->batches = InvalidDsaPointer; |
| pstate->old_batches = InvalidDsaPointer; |
| pstate->nbuckets = 0; |
| pstate->growth = PHJ_GROWTH_OK; |
| pstate->chunk_work_queue = InvalidDsaPointer; |
| pg_atomic_init_u32(&pstate->distributor, 0); |
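| /* |
| * In the non-MPP case the leader backend participates in the join |
| * alongside the workers (as in upstream PostgreSQL), hence the + 1. |
| */ |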
| if (estate->useMppParallelMode) |
| pstate->nparticipants = pcxt->nworkers; |
| else |
| pstate->nparticipants = pcxt->nworkers + 1; |
| |
| pstate->total_tuples = 0; |
| LWLockInitialize(&pstate->lock, |
| LWTRANCHE_PARALLEL_HASH_JOIN); |
| BarrierInit(&pstate->build_barrier, 0); |
| BarrierInit(&pstate->grow_batches_barrier, 0); |
| BarrierInit(&pstate->grow_buckets_barrier, 0); |
| |
| BarrierInit(&pstate->sync_barrier, pcxt->nworkers); |
| BarrierInit(&pstate->batch0_barrier, pcxt->nworkers); |
| |
| if (((HashJoin *)state->js.ps.plan)->outer_motionhazard) |
| BarrierInit(&pstate->outer_motion_barrier, pcxt->nworkers); |
| |
| pstate->phs_lasj_has_null = false; |
| |
| /* Set up the space we'll use for shared temporary files. */ |
| SharedFileSetInit(&pstate->fileset, pcxt->seg); |
| state->worker_id = 0; /* First worker process */ |
| |
| /* Initialize the shared state in the hash node. */ |
| hashNode = (HashState *) innerPlanState(state); |
| hashNode->parallel_state = pstate; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecHashJoinReInitializeDSM |
| * |
| * Reset shared state before beginning a fresh scan. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt) |
| { |
| int plan_node_id = state->js.ps.plan->plan_node_id; |
| ParallelHashJoinState *pstate; |
| |
| /* Nothing to do if we failed to create a DSM segment. */ |
| if (pcxt->seg == NULL) |
| return; |
| |
| pstate = shm_toc_lookup(pcxt->toc, plan_node_id, false); |
| |
| /* |
| * It would be possible to reuse the shared hash table in single-batch |
| * cases by resetting and then fast-forwarding build_barrier to |
| * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but |
| * currently shared hash tables are already freed by now (by the last |
| * participant to detach from the batch). We could consider keeping it |
| * around for single-batch joins. We'd also need to adjust |
| * finalize_plan() so that it doesn't record a dummy dependency for |
| * Parallel Hash nodes, preventing the rescan optimization. For now we |
| * don't try. |
| */ |
| |
| /* Detach, freeing any remaining shared memory. */ |
| if (state->hj_HashTable != NULL) |
| { |
| ExecHashTableDetachBatch(state->hj_HashTable); |
| ExecHashTableDetach(state->hj_HashTable); |
| } |
| |
| /* Clear any shared batch files. */ |
| SharedFileSetDeleteAll(&pstate->fileset); |
| |
| /* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */ |
| BarrierInit(&pstate->build_barrier, 0); |
| } |
| |
| void |
| ExecHashJoinInitializeWorker(HashJoinState *state, |
| ParallelWorkerContext *pwcxt) |
| { |
| HashState *hashNode; |
| EState *estate = state->js.ps.state; |
| int plan_node_id = state->js.ps.plan->plan_node_id; |
| ParallelHashJoinState *pstate = |
| shm_toc_lookup(pwcxt->toc, plan_node_id, false); |
| |
| /* Attach to the space for shared temporary files. */ |
| SharedFileSetAttach(&pstate->fileset, pwcxt->seg); |
| |
| /* Attach to the shared state in the hash node. */ |
| hashNode = (HashState *) innerPlanState(state); |
| hashNode->parallel_state = pstate; |
| if (estate->useMppParallelMode) |
| state->worker_id = pwcxt->worker_id; |
| else |
| { |
| Assert(ParallelWorkerNumber >= 0); |
| state->worker_id = ParallelWorkerNumber + 1; |
| ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); |
| } |
| } |
| |
| /* |
| * Find "inner var = outer var" in hj->hashclauses and create runtime filter |
| * for it. |
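| * |
| * Illustrative example (a sketch, not taken from a real plan): for |
| * SELECT ... FROM fact JOIN dim ON fact.k = dim.k, the build side (dim) |
| * populates a Bloom filter on k, and the probe side's SeqScan on fact |
| * can use it to discard non-matching rows early. |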
| */ |
| void |
| CreateRuntimeFilter(HashJoinState* hjstate) |
| { |
| AttrNumber lattno, rattno; |
| Expr *expr; |
| JoinType jointype; |
| HashJoin *hj; |
| HashState *hstate; |
| AttrFilter *attr_filter; |
| ListCell *lc; |
| ListCell *lc2; |
| List *targets; |
| |
| /* |
| * A build-side Bloom filter tells us whether a row is definitely not in |
| * the build side, which lets us eliminate or accept probe rows early, |
| * depending on the join type. |
| * |
| * Left Outer Join and Full Outer Join output all probe rows regardless, |
| * so a build-side Bloom filter would only enable early output; the same |
| * holds for Left Antijoin, which outputs a row exactly when there is no |
| * match. We don't implement early output for now, so the filter is only |
| * applicable to inner, right, and semi joins. |
| */ |
| jointype = hjstate->js.jointype; |
| if (jointype != JOIN_INNER && |
| jointype != JOIN_RIGHT && |
| jointype != JOIN_SEMI) |
| return; |
| |
| hstate = castNode(HashState, innerPlanState(hjstate)); |
| hstate->filters = NIL; |
| |
| /* |
| * check and initialize the runtime filter for all hash conds in |
| * hj->hashclauses |
| */ |
| hj = castNode(HashJoin, hjstate->js.ps.plan); |
| foreach (lc, hj->hashclauses) |
| { |
| expr = (Expr *)lfirst(lc); |
| |
| if (!IsEqualOp(expr)) |
| continue; |
| |
| lattno = -1; |
| rattno = -1; |
| if (!CheckEqualArgs(expr, &lattno, &rattno)) |
| continue; |
| |
| if (lattno < 1 || rattno < 1) |
| continue; |
| |
| targets = FindTargetNodes(hjstate, lattno, &lattno); |
| if (lattno == -1 || targets == NULL) |
| continue; |
| |
| foreach(lc2, targets) |
| { |
| PlanState *target = lfirst(lc2); |
| Assert(IsA(target, SeqScanState)); |
| |
| attr_filter = CreateAttrFilter(target, lattno, rattno, |
| hstate->ps.plan->plan_rows); |
| if (attr_filter->blm_filter) |
| hstate->filters = lappend(hstate->filters, attr_filter); |
| else |
| pfree(attr_filter); |
| } |
| } |
| } |
| |
| static bool |
| IsEqualOp(Expr *expr) |
| { |
| Oid funcid = InvalidOid; |
| |
| if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) |
| return false; |
| |
| if (IsA(expr, OpExpr)) |
| funcid = ((OpExpr *)expr)->opfuncid; |
| else if (IsA(expr, FuncExpr)) |
| funcid = ((FuncExpr *)expr)->funcid; |
| else |
| return false; |
| |
| if (funcid == F_INT2EQ || funcid == F_INT4EQ || funcid == F_INT8EQ |
| || funcid == F_INT24EQ || funcid == F_INT42EQ |
| || funcid == F_INT28EQ || funcid == F_INT82EQ |
| || funcid == F_INT48EQ || funcid == F_INT84EQ |
| ) |
| return true; |
| |
| return false; |
| } |
| |
| /* |
| * A runtime filter can be pushed down only when: |
| * 1. the hash clause is an equality operator; |
| * 2. both arguments are plain Var nodes; |
| * 3. the data types are integers. |
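| * |
| * E.g. (illustrative): "t1.a = t2.b" with both sides integer Vars |
| * qualifies; "t1.a + 1 = t2.b", or a comparison on text columns, does |
| * not. |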
| */ |
| static bool |
| CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) |
| { |
| Var *var; |
| List *args; |
| |
| if (lattno == NULL || rattno == NULL) |
| return false; |
| |
| if (IsA(expr, OpExpr)) |
| args = ((OpExpr *)expr)->args; |
| else if (IsA(expr, FuncExpr)) |
| args = ((FuncExpr *)expr)->args; |
| else |
| return false; |
| |
| if (!args || list_length(args) != 2) |
| return false; |
| |
| /* check the first arg */ |
| if (!IsA(linitial(args), Var)) |
| return false; |
| |
| var = linitial(args); |
| if (var->varno == INNER_VAR) |
| *rattno = var->varattno; |
| else if (var->varno == OUTER_VAR) |
| *lattno = var->varattno; |
| else |
| return false; |
| |
| /* check the second arg */ |
| if (!IsA(lsecond(args), Var)) |
| return false; |
| |
| var = lsecond(args); |
| if (var->varno == INNER_VAR) |
| *rattno = var->varattno; |
| else if (var->varno == OUTER_VAR) |
| *lattno = var->varattno; |
| else |
| return false; |
| |
| return true; |
| } |
| |
| static bool |
| CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno) |
| { |
| Var *var; |
| TargetEntry *te; |
| |
| if (!IsA(node, SeqScanState)) |
| return false; |
| |
| te = (TargetEntry *)list_nth(node->plan->targetlist, attno - 1); |
| if (!IsA(te->expr, Var)) |
| return false; |
| |
| var = castNode(Var, te->expr); |
| |
| /* system column is not allowed */ |
| if (var->varattno <= 0) |
| return false; |
| |
| *lattno = var->varattno; |
| |
| return true; |
| } |
| |
| /* |
| * Only plan shapes like the following are allowed: |
| * HashJoin |
| * ... a series of HashJoin nodes |
| * HashJoin |
| * SeqScan <- target |
| */ |
| static List * |
| FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) |
| { |
| Var *var; |
| PlanState *child, *parent; |
| TargetEntry *te; |
| List *targetNodes; |
| |
| parent = (PlanState *)hjstate; |
| child = outerPlanState(hjstate); |
| Assert(child); |
| |
| *lattno = -1; |
| targetNodes = NIL; |
| while (true) |
| { |
| /* target is seqscan */ |
| if ((IsA(parent, HashJoinState) || IsA(parent, ResultState)) && IsA(child, SeqScanState)) |
| { |
| /* |
| * hashjoin |
| * seqscan |
| * or |
| * hashjoin |
| * result |
| * seqscan |
| */ |
| if (!CheckTargetNode(child, attno, lattno)) |
| return NULL; |
| |
| targetNodes = lappend(targetNodes, child); |
| return targetNodes; |
| } |
| else if (IsA(parent, AppendState) && child == NULL) |
| { |
| /* |
| * append |
| * seqscan on t1_prt_1 |
| * seqscan on t1_prt_2 |
| * ... |
| */ |
| AppendState *as = castNode(AppendState, parent); |
| for (int i = 0; i < as->as_nplans; i++) |
| { |
| child = as->appendplans[i]; |
| if (!CheckTargetNode(child, attno, lattno)) |
| return NULL; |
| |
| targetNodes = lappend(targetNodes, child); |
| } |
| |
| return targetNodes; |
| } |
| |
| /* |
| * hashjoin |
| * result (hash filter) |
| * seqscan on t1, t1 is replicated table |
| * or |
| * hashjoin |
| * append |
| * seqscan on t1_prt_1 |
| * seqscan on t1_prt_2 |
| * ... |
| */ |
| if (!IsA(child, HashJoinState) && !IsA(child, ResultState) && !IsA(child, AppendState)) |
| return NULL; |
| |
| /* child is hashjoin, result or append node */ |
| te = (TargetEntry *)list_nth(child->plan->targetlist, attno - 1); |
| if (!IsA(te->expr, Var)) |
| return NULL; |
| |
| var = castNode(Var, te->expr); |
| if (var->varno == INNER_VAR) |
| return NULL; |
| |
| attno = var->varattno; |
| |
| /* find at child node */ |
| parent = child; |
| child = outerPlanState(parent); |
| } |
| |
| return NULL; |
| } |
| |
| static AttrFilter* |
| CreateAttrFilter(PlanState *target, AttrNumber lattno, AttrNumber rattno, |
| double plan_rows) |
| { |
| AttrFilter *attr_filter = palloc0(sizeof(AttrFilter)); |
| attr_filter->empty = true; |
| attr_filter->target = target; |
| |
| attr_filter->lattno = lattno; |
| attr_filter->rattno = rattno; |
| |
| attr_filter->blm_filter = bloom_create_aggresive(plan_rows, work_mem, random()); |
| |
| StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)"); |
| StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)"); |
| attr_filter->min = LONG_MAX; |
| attr_filter->max = LONG_MIN; |
| |
| return attr_filter; |
| } |