/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*-------------------------------------------------------------------------
*
* nodeHash.c
* Routines to hash relations for hashjoin
*
* Portions Copyright (c) 2006-2008, Greenplum inc
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/executor/nodeHash.c,v 1.107.2.1 2007/06/01 15:58:01 tgl Exp $
*
*-------------------------------------------------------------------------
*/
/*
* INTERFACE ROUTINES
* MultiExecHash - generate an in-memory hash table of the relation
* ExecInitHash - initialize node and subnodes
* ExecEndHash - shutdown node and subnodes
*/
#include "postgres.h"
#include <unistd.h>
#include <math.h>
#include <limits.h>
#include "access/hash.h"
#include "executor/execdebug.h"
#include "executor/hashjoin.h"
#include "executor/instrument.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "parser/parse_expr.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "utils/debugbreak.h"
#include "utils/faultinjector.h"
#include "cdb/cdbexplain.h"
#include "cdb/cdbvars.h"
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf);
static void
ExecHashTableExplainBatches(HashJoinTable hashtable,
StringInfo buf,
int ibatch_begin,
int ibatch_end,
const char *title);
static void ExecHashTableReallocBatchData(HashJoinTable hashtable, int new_nbatch);
static int ExecChoosePrimeNBuckets(int nbuckets);
void ExecChooseHashTableSize(double ntuples, int tupwidth,
int *numbuckets,
int *numbatches,
uint64 operatorMemKB
);
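/*
 * BLOOMVAL maps a hash code to one of the 64 bits of a per-bucket Bloom
 * word: bits 13..18 of the hash code select the bit position. For
 * illustration, hk = 0x0004A000 gives ((hk >> 13) & 0x3f) == 37, so
 * BLOOMVAL(hk) == ((uint64) 1) << 37.
 */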
#define BLOOMVAL(hk) (((uint64)1) << (((hk) >> 13) & 0x3f))
/* Amount of metadata memory required per batch */
#define MD_MEM_PER_BATCH (sizeof(HashJoinBatchData *) + sizeof(HashJoinBatchData))
/* Amount of metadata memory required per bucket */
#define MD_MEM_PER_BUCKET (sizeof(HashJoinTuple) + sizeof(uint64))
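/*
 * For illustration (assuming 8-byte pointers): each batch costs one
 * pointer in hashtable->batches plus one HashJoinBatchData struct, and
 * each bucket costs one HashJoinTuple head pointer in hashtable->buckets
 * plus one uint64 Bloom word, i.e. 16 bytes per bucket.
 */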
/* ----------------------------------------------------------------
* ExecHash
*
* stub for pro forma compliance
* ----------------------------------------------------------------
*/
TupleTableSlot *
ExecHash(HashState *node)
{
elog(ERROR, "Hash node does not support ExecProcNode call convention");
return NULL;
}
/* ----------------------------------------------------------------
* MultiExecHash
*
* build hash table for hashjoin, doing partitioning if more
* than one batch is required.
* ----------------------------------------------------------------
*/
Node *
MultiExecHash(HashState *node)
{
PlanState *outerNode;
List *hashkeys;
HashJoinTable hashtable;
TupleTableSlot *slot;
ExprContext *econtext;
uint32 hashvalue = 0;
/* must provide our own instrumentation support */
if (node->ps.instrument)
InstrStartNode(node->ps.instrument);
/*
* get state info from node
*/
outerNode = outerPlanState(node);
hashtable = node->hashtable;
/*
* set expression context
*/
hashkeys = node->hashkeys;
econtext = node->ps.ps_ExprContext;
#ifdef FAULT_INJECTOR
FaultInjector_InjectFaultIfSet(
MultiExecHashLargeVmem,
DDLNotSpecified,
"", // databaseName
""); // tableName
#endif
/*
* get all inner tuples and insert into the hash table (or temp files)
*/
for (;;)
{
slot = ExecProcNode(outerNode);
if (TupIsNull(slot))
break;
Gpmon_M_Incr(GpmonPktFromHashState(node), GPMON_QEXEC_M_ROWSIN);
CheckSendPlanStateGpmonPkt(&node->ps);
/* We have to compute the hash value */
econtext->ecxt_innertuple = slot;
bool hashkeys_null = false;
if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys, node->hs_keepnull, &hashvalue, &hashkeys_null))
{
ExecHashTableInsert(node, hashtable, slot, hashvalue);
}
if (hashkeys_null)
{
node->hs_hashkeys_null = true;
if (node->hs_quit_if_hashkeys_null)
{
ExecSquelchNode(outerNode);
return NULL;
}
}
}
/* Now we have set up all the initial batches & primary overflow batches. */
hashtable->nbatch_outstart = hashtable->nbatch;
/* must provide our own instrumentation support */
if (node->ps.instrument)
InstrStopNode(node->ps.instrument, hashtable->totalTuples);
/*
* We do not return the hash table directly because it's not a subtype of
* Node, and so would violate the MultiExecProcNode API. Instead, our
* parent Hashjoin node is expected to know how to fish it out of our node
* state. Ugly but not really worth cleaning up, since Hashjoin knows
* quite a bit more about Hash besides that.
*/
return NULL;
}
/* ----------------------------------------------------------------
* ExecInitHash
*
* Init routine for Hash node
* ----------------------------------------------------------------
*/
HashState *
ExecInitHash(Hash *node, EState *estate, int eflags)
{
HashState *hashstate;
/* check for unsupported flags */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
/*
* create state structure
*/
hashstate = makeNode(HashState);
hashstate->ps.plan = (Plan *) node;
hashstate->ps.state = estate;
hashstate->hashtable = NULL;
hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
/*
* Miscellaneous initialization
*
* create expression context for node
*/
ExecAssignExprContext(estate, &hashstate->ps);
#define HASH_NSLOTS 1
/*
* initialize our result slot
*/
ExecInitResultTupleSlot(estate, &hashstate->ps);
/*
* initialize child expressions
*/
hashstate->ps.targetlist = (List *)
ExecInitExpr((Expr *) node->plan.targetlist,
(PlanState *) hashstate);
hashstate->ps.qual = (List *)
ExecInitExpr((Expr *) node->plan.qual,
(PlanState *) hashstate);
/*
* initialize child nodes
*/
outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
/*
* initialize tuple type. no need to initialize projection info because
* this node doesn't do projections
*/
ExecAssignResultTypeFromTL(&hashstate->ps);
hashstate->ps.ps_ProjInfo = NULL;
initGpmonPktForHash((Plan *)node, &hashstate->ps.gpmon_pkt, estate);
return hashstate;
}
int
ExecCountSlotsHash(Hash *node)
{
return ExecCountSlotsNode(outerPlan(node)) +
ExecCountSlotsNode(innerPlan(node)) +
HASH_NSLOTS;
}
/* ---------------------------------------------------------------
* ExecEndHash
*
* clean up routine for Hash node
* ----------------------------------------------------------------
*/
void
ExecEndHash(HashState *node)
{
PlanState *outerPlan;
/*
* free exprcontext
*/
ExecFreeExprContext(&node->ps);
/*
* shut down the subplan
*/
outerPlan = outerPlanState(node);
ExecEndNode(outerPlan);
EndPlanStateGpmonPkt(&node->ps);
}
/* ----------------------------------------------------------------
* ExecHashTableCreate
*
* create an empty hashtable data structure for hashjoin.
* ----------------------------------------------------------------
*/
HashJoinTable
ExecHashTableCreate(HashState *hashState, HashJoinState *hjstate, List *hashOperators, uint64 operatorMemKB, workfile_set * workfile_set)
{
HashJoinTable hashtable;
Plan *outerNode;
int nbuckets;
int nbatch;
int nkeys;
int i;
ListCell *ho;
MemoryContext oldcxt;
START_MEMORY_ACCOUNT(hashState->ps.plan->memoryAccount);
{
Hash *node = (Hash *) hashState->ps.plan;
/*
* Get information about the size of the relation to be hashed (it's the
* "outer" subtree of this node, but the inner relation of the hashjoin).
* Compute the appropriate size of the hash table.
*/
outerNode = outerPlan(node);
/*
* Initialize the hash table control block.
*
* The hashtable control block is just palloc'd from the executor's
* per-query memory context.
*/
hashtable = (HashJoinTable)palloc0(sizeof(HashJoinTableData));
hashtable->buckets = NULL;
hashtable->bloom = NULL;
hashtable->curbatch = 0;
hashtable->growEnabled = true;
hashtable->totalTuples = 0;
hashtable->batches = NULL;
hashtable->work_set = NULL;
hashtable->state_file = NULL;
hashtable->spaceAllowed = operatorMemKB * 1024L;
hashtable->stats = NULL;
hashtable->eagerlyReleased = false;
hashtable->hjstate = hjstate;
/*
* Create temporary memory contexts in which to keep the hashtable working
* storage. See notes in executor/hashjoin.h.
*/
hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
"HashTableContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
"HashBatchContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* CDB: track temp buf file allocations in a separate context */
hashtable->bfCxt = AllocSetContextCreate(CurrentMemoryContext,
"hbbfcxt",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
if (workfile_set != NULL)
{
hashtable->work_set = workfile_set;
ExecHashJoinLoadBucketsBatches(hashtable);
Assert(hjstate->nbatch_loaded_state == -1);
Assert(hjstate->cached_workfiles_batches_buckets_loaded);
hjstate->nbatch_loaded_state = hashtable->nbatch;
}
else
{
ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
&hashtable->nbuckets, &hashtable->nbatch, operatorMemKB);
}
nbuckets = hashtable->nbuckets;
nbatch = hashtable->nbatch;
hashtable->nbatch_original = nbatch;
hashtable->nbatch_outstart = nbatch;
#ifdef HJDEBUG
elog(LOG, "HJ: nbatch = %d, nbuckets = %d\n", nbatch, nbuckets);
#endif
/*
* Get info about the hash functions to be used for each hash key.
* Also remember whether the join operators are strict.
*/
nkeys = list_length(hashOperators);
hashtable->hashfunctions = (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
i = 0;
foreach(ho, hashOperators)
{
Oid hashop = lfirst_oid(ho);
Oid hashfn;
hashfn = get_op_hash_function(hashop);
if (!OidIsValid(hashfn))
elog(ERROR, "could not find hash function for hash operator %u",
hashop);
fmgr_info(hashfn, &hashtable->hashfunctions[i]);
hashtable->hashStrict[i] = op_strict(hashop);
i++;
}
/*
* Allocate data that will live for the life of the hashjoin
*/
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
#ifdef HJDEBUG
{
/* Memory needed to allocate hashtable->batches, which consists of nbatch pointers */
int md_batch_size = (nbatch * sizeof(hashtable->batches[0])) / (1024 * 1024);
/* Memory needed to allocate hashtable->batches entries, which consist of nbatch HashJoinBatchData structures */
int md_batch_data_size = (nbatch * sizeof(HashJoinBatchData)) / (1024 * 1024);
/* Memory needed to allocate hashtable->buckets, which consists of nbuckets HashJoinTuple structures*/
int md_buckets_size = (nbuckets * sizeof(HashJoinTuple)) / (1024 * 1024);
/* Memory needed to allocate hashtable->bloom, which consists of nbuckets int64 values */
int md_bloom_size = (nbuckets * sizeof(uint64)) / (1024 * 1024);
/* Total memory needed for the hashtable metadata */
int md_tot = md_batch_size + md_batch_data_size + md_buckets_size + md_bloom_size;
elog(LOG, "About to allocate HashTable. HT_MEMORY=%dMB Memory needed for metadata: MDBATCH_ARR=%dMB, MDBATCH_DATA=%dMB, MDBUCKETS_ARR=%dMB, MDBLOOM_ARR=%dMB, TOTAL=%dMB",
(int) (hashtable->spaceAllowed / (1024 * 1024)),
md_batch_size, md_batch_data_size, md_buckets_size, md_bloom_size, md_tot);
elog(LOG, "sizeof(hashtable->batches[0])=%d, sizeof(HashJoinBatchData)=%d, sizeof(HashJoinTuple)=%d, sizeof(uint64)=%d",
(int) sizeof(hashtable->batches[0]), (int) sizeof(HashJoinBatchData),
(int) sizeof(HashJoinTuple), (int) sizeof(uint64));
}
#endif
/* array of BatchData ptrs */
hashtable->batches =
(HashJoinBatchData **)palloc(nbatch * sizeof(hashtable->batches[0]));
/* one BatchData entry per initial batch */
for (i = 0; i < nbatch; i++)
hashtable->batches[i] =
(HashJoinBatchData *)palloc0(sizeof(HashJoinBatchData));
/*
* Prepare context for the first-scan space allocations; allocate the
* hashbucket array therein, and set each bucket "empty".
*/
MemoryContextSwitchTo(hashtable->batchCxt);
hashtable->buckets = (HashJoinTuple *)
palloc0(nbuckets * sizeof(HashJoinTuple));
if (gp_hashjoin_bloomfilter != 0)
hashtable->bloom = (uint64*) palloc0(nbuckets * sizeof(uint64));
MemoryContextSwitchTo(oldcxt);
}
END_MEMORY_ACCOUNT();
return hashtable;
}
/*
* Compute appropriate size for hashtable given the estimated size of the
* relation to be hashed (number of rows and average row width).
*
* This is exported so that the planner's costsize.c can use it.
*/
/* Target bucket loading (tuples per bucket) */
/*
* CDB: we now use gp_hashjoin_tuples_per_bucket
* #define NTUP_PER_BUCKET 10
*/
/* Prime numbers that we like to use as nbuckets values */
static const int hprimes[] = {
1033, 2063, 4111, 8219, 16417, 32779, 65539, 131111,
262151, 524341, 1048589, 2097211, 4194329, 8388619, 16777289, 33554473,
67108913, 134217773, 268435463, 536870951, 1073741831
};
/*
* We want nbuckets to be prime so as to avoid having bucket and batch
* numbers depend on only some bits of the hash code. Choose the next
* larger prime from the list in hprimes[]. (This also enforces that
* nbuckets is not very small, by the simple expedient of not putting any
* very small entries in hprimes[].)
*/
static int
ExecChoosePrimeNBuckets(int nbuckets)
{
for (int i = 0; i < (int) lengthof(hprimes); i++)
{
if (hprimes[i] >= nbuckets)
{
nbuckets = hprimes[i];
break;
}
}
return nbuckets;
}
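/*
 * For example, ExecChoosePrimeNBuckets(1500) returns 2063 and
 * ExecChoosePrimeNBuckets(100000) returns 131111; a request larger than
 * the last entry in hprimes[] is returned unchanged.
 */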
void
ExecChooseHashTableSize(double ntuples, int tupwidth,
int *numbuckets,
int *numbatches,
uint64 operatorMemKB)
{
int tupsize;
double inner_rel_bytes;
long hash_table_bytes;
int nbatch;
int nbuckets;
/* ntuples is a cluster-wide estimate; each segment should receive only its share */
if (Gp_role == GP_ROLE_EXECUTE)
{
int segment_num = GetQEGangNum();
Assert(segment_num > 0);
ntuples = ntuples / segment_num;
}
/* Force a plausible relation size if no info */
if (ntuples <= 0.0)
ntuples = 1000.0;
/*
* Estimate tupsize based on footprint of tuple in hashtable... note this
* does not allow for any palloc overhead. The manipulations of spaceUsed
* don't count palloc overhead either.
*/
tupsize = ExecHashRowSize(tupwidth);
inner_rel_bytes = ntuples * tupsize;
/*
* Target in-memory hashtable size is work_mem kilobytes.
*/
hash_table_bytes = operatorMemKB * 1024L;
/*
* Set nbuckets to achieve an average bucket load of gp_hashjoin_tuples_per_bucket when
* memory is filled. Set nbatch to the smallest power of 2 that appears
* sufficient.
*/
if (inner_rel_bytes > hash_table_bytes)
{
/* We'll need multiple batches */
long lbuckets;
double dbatch;
int minbatch;
lbuckets = (hash_table_bytes / tupsize) / gp_hashjoin_tuples_per_bucket;
lbuckets = Min(lbuckets, INT_MAX / 32);
/* Pick the closest prime number for the number of buckets */
nbuckets = ExecChoosePrimeNBuckets((int) lbuckets);
dbatch = ceil(inner_rel_bytes / hash_table_bytes);
dbatch = Min(dbatch, INT_MAX / 32);
minbatch = (int) dbatch;
nbatch = 2;
while (nbatch < minbatch)
{
nbatch <<= 1;
}
/* Check to see if we're capping the number of workfiles we allow per query */
if (gp_workfile_limit_files_per_query > 0)
{
int nbatch_lower = nbatch;
/*
* We create two files per batch during spilling - one for the
* outer side and one for the inner side. Lower the nbatch if necessary
* to fit under that limit. Don't go below two batches,
* because in that case we're basically disabling spilling.
*/
while ((nbatch_lower * 2 > gp_workfile_limit_files_per_query) && (nbatch_lower > 2))
{
nbatch_lower >>= 1;
}
Assert(nbatch_lower <= nbatch);
if (nbatch_lower != nbatch)
{
elog(LOG,"HashJoin: Too many batches computed: nbatch=%d. gp_workfile_limit_files_per_query=%d, using nbatch=%d instead",
nbatch, gp_workfile_limit_files_per_query, nbatch_lower);
nbatch = nbatch_lower;
}
}
/*
* Check to see if we're capping the amount of memory allowed for
* hashtable metadata (MPP-22417)
*/
if (gp_hashjoin_metadata_memory_percent > 0)
{
/* Compute how much memory we are willing to use for batch metadata */
long md_mem_for_batches = ((float) hash_table_bytes * (((float) gp_hashjoin_metadata_memory_percent) / 100))
- (nbuckets * MD_MEM_PER_BUCKET);
if (md_mem_for_batches < 0)
{
/* We are already out of metadata memory; we can't execute the query. Error out. */
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg(ERRMSG_GP_INSUFFICIENT_STATEMENT_MEMORY)));
}
if (nbatch * MD_MEM_PER_BATCH > md_mem_for_batches)
{
/*
* We're trying to allocate too many batches and they won't fit.
* Reduce the number of batches so that the metadata fits in the available memory.
*/
int nbatch_lower = nbatch;
while (nbatch_lower > 1 && nbatch_lower * MD_MEM_PER_BATCH > md_mem_for_batches)
{
nbatch_lower >>= 1;
}
Assert(nbatch_lower <= nbatch);
elog(LOG,"HashJoin: Too many batches computed: nbatch=%d. Memory required for metadata=%dMB, available=%dMB. Chose nbatch=%d instead",
nbatch, (int) (nbatch * MD_MEM_PER_BATCH) / (1024 * 1024),
(int) md_mem_for_batches / (1024 * 1024),
nbatch_lower);
nbatch = nbatch_lower;
}
}
}
else
{
/* We expect the hashtable to fit in memory; we want to use
* more buckets if we have memory to spare */
double dbuckets_lower;
double dbuckets_upper;
double dbuckets;
/* divide our tuple row-count estimate by the number of
* tuples we'd like in a bucket: this produces a small bucket
* count independent of our work_mem setting */
dbuckets_lower = (double)ntuples / (double)gp_hashjoin_tuples_per_bucket;
/* if we have work_mem to spare, we'd like to use it -- so
* divide up our memory evenly (see the spill case above) */
dbuckets_upper = (double)hash_table_bytes / ((double)tupsize * gp_hashjoin_tuples_per_bucket);
/* we'll use our "lower", work_mem-independent guess as a lower
* limit; but if we've got memory to spare we'll take the mean
* of the lower limit and the upper limit */
if (dbuckets_upper > dbuckets_lower)
dbuckets = (dbuckets_lower + dbuckets_upper)/2.0;
else
dbuckets = dbuckets_lower;
dbuckets = ceil(dbuckets);
dbuckets = Min(dbuckets, INT_MAX);
/* Pick the closest prime number for the number of buckets */
nbuckets = ExecChoosePrimeNBuckets((int) dbuckets);
nbatch = 1;
}
*numbuckets = nbuckets;
*numbatches = nbatch;
}
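/*
 * Worked example (a sketch; actual values depend on GUC settings):
 * suppose ntuples = 1e6, tupsize = 100 bytes, operatorMemKB = 32768
 * (32MB), and gp_hashjoin_tuples_per_bucket = 5. Then inner_rel_bytes =
 * 1e8 > 33554432, so we must spill: lbuckets = (33554432 / 100) / 5 =
 * 67108, which ExecChoosePrimeNBuckets rounds up to 131111; dbatch =
 * ceil(1e8 / 33554432) = 3, so nbatch is doubled from 2 up to the next
 * power of 2, namely 4. The workfile and metadata caps above may then
 * lower nbatch again.
 */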
/* ----------------------------------------------------------------
* ExecHashTableDestroy
*
* destroy a hash table
* ----------------------------------------------------------------
*/
void
ExecHashTableDestroy(HashState *hashState, HashJoinTable hashtable)
{
int i;
Assert(hashtable);
Assert(!hashtable->eagerlyReleased);
START_MEMORY_ACCOUNT(hashState->ps.plan->memoryAccount);
{
/*
* Make sure all the temp files are closed.
*/
for (i = 0; i < hashtable->nbatch; i++)
{
HashJoinBatchData *batch = hashtable->batches[i];
if (batch->innerside.workfile != NULL)
{
workfile_mgr_close_file(hashtable->work_set, batch->innerside.workfile);
batch->innerside.workfile = NULL;
}
if (batch->outerside.workfile != NULL)
{
workfile_mgr_close_file(hashtable->work_set, batch->outerside.workfile);
batch->outerside.workfile = NULL;
}
}
/* Close state file as well */
if (hashtable->state_file != NULL)
{
workfile_mgr_close_file(hashtable->work_set, hashtable->state_file);
hashtable->state_file = NULL;
}
if (hashtable->work_set != NULL)
{
workfile_mgr_close_set(hashtable->work_set);
hashtable->work_set = NULL;
}
/* Release working memory (batchCxt is a child, so it goes away too) */
MemoryContextDelete(hashtable->hashCxt);
hashtable->batches = NULL;
}
END_MEMORY_ACCOUNT();
}
/*
* ExecHashIncreaseNumBatches
* increase the original number of batches in order to reduce
* current memory consumption
*/
static void
ExecHashIncreaseNumBatches(HashJoinTable hashtable)
{
HashJoinBatchData *fullbatch = hashtable->batches[hashtable->curbatch];
int oldnbatch = hashtable->nbatch;
int curbatch = hashtable->curbatch;
int nbatch;
int i;
long ninmemory;
long nfreed;
Size spaceFreed = 0;
HashJoinTableStats *stats = hashtable->stats;
/* do nothing if we've decided to shut off growth */
if (!hashtable->growEnabled)
return;
/* safety check to avoid overflow */
if (oldnbatch > INT_MAX / 2)
return;
nbatch = oldnbatch * 2;
Assert(nbatch > 1);
#ifdef HJDEBUG
elog(LOG, "Increasing number of batches from %d to %d", oldnbatch, nbatch);
#endif
{
/* Print a warning for extremely bad skew cases. */
unsigned mintup = hashtable->batches[0]->innertuples;
unsigned maxtup = mintup;
for (i = 1; i < oldnbatch; ++i)
{
unsigned ntup = hashtable->batches[i]->innertuples;
maxtup = Max(ntup, maxtup);
mintup = Min(ntup, mintup);
}
if (maxtup > (mintup * 10))
elog(LOG, "Extreme skew in the inner side of Hashjoin, nbatch %d, mintuples %u, maxtuples %u", oldnbatch, mintup, maxtup);
}
ExecHashTableReallocBatchData(hashtable, nbatch);
Assert(hashtable->nbatch == nbatch);
/*
* Scan through the existing hash table entries and dump out any that are
* no longer of the current batch.
*/
ninmemory = nfreed = 0;
for (i = 0; i < hashtable->nbuckets; i++)
{
HashJoinTuple prevtuple;
HashJoinTuple tuple;
uint64 bloom = 0;
prevtuple = NULL;
tuple = hashtable->buckets[i];
while (tuple != NULL)
{
/* save link in case we delete */
HashJoinTuple nexttuple = tuple->next;
int bucketno;
int batchno;
ninmemory++;
ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
&bucketno, &batchno);
Assert(bucketno == i);
if (batchno == curbatch)
{
/* keep tuple */
prevtuple = tuple;
bloom |= BLOOMVAL(tuple->hashvalue);
}
else
{
Size spaceTuple;
/* dump it out */
Assert(batchno > curbatch);
Assert(batchno >= hashtable->hjstate->nbatch_loaded_state);
ExecHashJoinSaveTuple(NULL, HJTUPLE_MINTUPLE(tuple),
tuple->hashvalue,
hashtable,
&hashtable->batches[batchno]->innerside,
hashtable->bfCxt);
/* and remove from hash table */
if (prevtuple)
prevtuple->next = nexttuple;
else
hashtable->buckets[i] = nexttuple;
/* prevtuple doesn't change */
hashtable->totalTuples--;
spaceTuple = HJTUPLE_OVERHEAD + memtuple_get_size(HJTUPLE_MINTUPLE(tuple), NULL);
spaceFreed += spaceTuple;
if (stats)
stats->batchstats[batchno].spillspace_in += spaceTuple;
pfree(tuple);
nfreed++;
}
tuple = nexttuple;
}
if (gp_hashjoin_bloomfilter != 0)
hashtable->bloom[i] = bloom;
}
#ifdef HJDEBUG
elog(gp_workfile_caching_loglevel, "HJ batch %d: Freed %ld of %ld tuples, %lu of %lu bytes, space now %lu",
curbatch,
nfreed,
ninmemory,
(unsigned long)spaceFreed,
(unsigned long)fullbatch->innerspace,
(unsigned long)(fullbatch->innerspace - spaceFreed));
#endif
/* Update work_mem high-water mark and amount spilled. */
if (stats)
{
stats->workmem_max = Max(stats->workmem_max, fullbatch->innerspace);
stats->batchstats[curbatch].spillspace_out += spaceFreed;
stats->batchstats[curbatch].spillrows_out += nfreed;
}
/* Allow reuse of the space that has just been freed. */
fullbatch->innerspace -= spaceFreed;
fullbatch->innertuples -= nfreed;
/*
* If we dumped out either all or none of the tuples in the table, disable
* further expansion of nbatch. This situation implies that we have
* enough tuples of identical hashvalues to overflow spaceAllowed.
* Increasing nbatch will not fix it since there's no way to subdivide the
* group any more finely. We have to just gut it out and hope the server
* has enough RAM.
*/
if (nfreed == 0 || nfreed == ninmemory)
{
hashtable->growEnabled = false;
elog(LOG, "HJ: Disabling further increase of nbatch");
}
}
/*
* Re-allocate the batch data array when the number of batches increases
*/
static void
ExecHashTableReallocBatchData(HashJoinTable hashtable, int new_nbatch)
{
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
/*
* enlarge arrays and zero out added entries
*/
/* array of BatchData ptrs */
hashtable->batches = (HashJoinBatchData **)
repalloc(hashtable->batches, new_nbatch * sizeof(hashtable->batches[0]));
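/*
 * Note: repalloc may move the pointer array itself, but the
 * HashJoinBatchData structs it points to were palloc'd separately and
 * stay put, so batch pointers held elsewhere (e.g. fullbatch in
 * ExecHashIncreaseNumBatches) remain valid.
 */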
int old_nbatch = hashtable->nbatch;
int i;
/* one BatchData entry per additional batch */
for (i = old_nbatch; i < new_nbatch; i++)
{
hashtable->batches[i] =
(HashJoinBatchData *)palloc0(sizeof(HashJoinBatchData));
}
/* EXPLAIN ANALYZE batch statistics */
HashJoinTableStats *stats = hashtable->stats;
if (stats && stats->nbatchstats < new_nbatch)
{
Size sz = new_nbatch * sizeof(stats->batchstats[0]);
stats->batchstats =
(HashJoinBatchStats *)repalloc(stats->batchstats, sz);
sz = (new_nbatch - stats->nbatchstats) * sizeof(stats->batchstats[0]);
memset(stats->batchstats + stats->nbatchstats, 0, sz);
stats->nbatchstats = new_nbatch;
}
MemoryContextSwitchTo(oldcxt);
hashtable->nbatch = new_nbatch;
}
/*
* ExecHashTableInsert
* insert a tuple into the hash table depending on the hash value
* it may just go to a temp file for later batches
*
* Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
* tuple; the minimal case in particular is certain to happen while reloading
* tuples from batch files. We could save some cycles in the regular-tuple
* case by not forcing the slot contents into minimal form; not clear if it's
* worth the messiness required.
*/
void
ExecHashTableInsert(HashState *hashState, HashJoinTable hashtable,
TupleTableSlot *slot,
uint32 hashvalue)
{
MemTuple tuple = ExecFetchSlotMemTuple(slot, false);
HashJoinBatchData *batch;
int bucketno;
int batchno;
int hashTupleSize;
START_MEMORY_ACCOUNT(hashState->ps.plan->memoryAccount);
{
PlanState *ps = &hashState->ps;
ExecHashGetBucketAndBatch(hashtable, hashvalue,
&bucketno, &batchno);
batch = hashtable->batches[batchno];
hashTupleSize = HJTUPLE_OVERHEAD + memtuple_get_size(tuple, NULL);
/* Update batch size. */
batch->innertuples++;
batch->innerspace += hashTupleSize;
/*
* decide whether to put the tuple in the hash table or a temp file
*/
if (batchno == hashtable->curbatch)
{
/*
* put the tuple in hash table
*/
HashJoinTuple hashTuple;
hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
hashTupleSize);
hashTuple->hashvalue = hashvalue;
memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, memtuple_get_size(tuple, NULL));
hashTuple->next = hashtable->buckets[bucketno];
hashtable->buckets[bucketno] = hashTuple;
hashtable->totalTuples += 1;
if (gp_hashjoin_bloomfilter != 0)
hashtable->bloom[bucketno] |= BLOOMVAL(hashvalue);
/* Double the number of batches when too much data in hash table. */
if (batch->innerspace > hashtable->spaceAllowed ||
batch->innertuples > UINT_MAX/2)
{
ExecHashIncreaseNumBatches(hashtable);
if (ps && ps->instrument)
{
ps->instrument->workfileCreated = true;
}
/* Gpmon stuff */
if (ps)
{
Gpmon_M_Set(&ps->gpmon_pkt, GPMON_HASH_SPILLBATCH, hashtable->nbatch);
CheckSendPlanStateGpmonPkt(ps);
}
}
}
else
{
/*
* put the tuple into a temp file for later batches, but only when
* cached workfiles are not being reused.
*/
if (!hashtable->hjstate->cached_workfiles_found)
{
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(ps, tuple, hashvalue, hashtable, &batch->innerside, hashtable->bfCxt);
}
}
}
END_MEMORY_ACCOUNT();
}
/*
* ExecHashGetHashValue
* Compute the hash value for a tuple
*
* The tuple to be tested must be in either econtext->ecxt_outertuple or
* econtext->ecxt_innertuple. Vars in the hashkeys expressions reference
* either OUTER or INNER.
*
* A TRUE result means the tuple's hash value has been successfully computed
* and stored at *hashvalue. A FALSE result means the tuple cannot match
* because it contains a null attribute, and hence it should be discarded
* immediately. (If keep_nulls is true then FALSE is never returned.)
* *hashkeys_null is set to true when all the hashkeys are null.
*/
bool
ExecHashGetHashValue(HashState *hashState, HashJoinTable hashtable,
ExprContext *econtext,
List *hashkeys,
bool keep_nulls,
uint32 *hashvalue,
bool *hashkeys_null)
{
uint32 hashkey = 0;
ListCell *hk;
int i = 0;
MemoryContext oldContext;
bool result = true;
START_MEMORY_ACCOUNT(hashState->ps.plan->memoryAccount);
{
Assert(hashkeys_null);
(*hashkeys_null) = true;
/*
* We reset the eval context each time to reclaim any memory leaked in the
* hashkey expressions.
*/
ResetExprContext(econtext);
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
foreach(hk, hashkeys)
{
ExprState *keyexpr = (ExprState *) lfirst(hk);
Datum keyval;
bool isNull = false;
/* rotate hashkey left 1 bit at each step */
hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
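/*
 * Cumulative effect, for illustration: with two keys k1 and k2 the
 * result is rot1(hash(k1)) ^ hash(k2); each earlier key is rotated once
 * more per later key, so key order matters.
 */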
/*
* Get the join attribute value of the tuple
*/
keyval = ExecEvalExpr(keyexpr, econtext, &isNull, NULL);
if (!isNull)
{
*hashkeys_null = false;
}
/*
* If the attribute is NULL, and the join operator is strict, then
* this tuple cannot pass the join qual so we can reject it
* immediately (unless we're scanning the outside of an outer join,
* in which case we must not reject it). Otherwise we act like the
* hashcode of NULL is zero (this will support operators that act like
* IS NOT DISTINCT, though not any more-random behavior). We treat
* the hash support function as strict even if the operator is not.
*
* Note: currently, all hashjoinable operators must be strict since
* the hash index AM assumes that. However, it takes so little
* extra code here to allow non-strict that we may as well do it.
*/
if (isNull)
{
if (hashtable->hashStrict[i] && !keep_nulls)
{
result = false;
}
/* else, leave hashkey unmodified, equivalent to hashcode 0 */
}
else if (result)
{
/* Compute the hash function */
uint32 hkey;
hkey = DatumGetUInt32(FunctionCall1(&hashtable->hashfunctions[i],
keyval));
hashkey ^= hkey;
}
i++;
}
MemoryContextSwitchTo(oldContext);
*hashvalue = hashkey;
}
END_MEMORY_ACCOUNT();
return result;
}
/*
* ExecHashGetBucketAndBatch
* Determine the bucket number and batch number for a hash value
*
* Note: on-the-fly increases of nbatch must not change the bucket number
* for a given hash code (since we don't move tuples to different hash
* chains), and must only cause the batch number to remain the same or
* increase. Our algorithm is
* bucketno = hashvalue MOD nbuckets
* batchno = hash_uint32(hashvalue) MOD nbatch
* which gives reasonably independent bucket and batch numbers in the face
* of some rather poorly-implemented hash functions in hashfunc.c. (This
* will change in PG 8.3.)
*
* nbuckets doesn't change over the course of the join.
*
* nbatch is always a power of 2; we increase it only by doubling it. This
* effectively adds one more bit to the top of the batchno.
*/
void
ExecHashGetBucketAndBatch(HashJoinTable hashtable,
uint32 hashvalue,
int *bucketno,
int *batchno)
{
uint32 nbuckets = (uint32) hashtable->nbuckets;
uint32 nbatch = (uint32) hashtable->nbatch;
if (nbatch > 1)
{
*bucketno = hashvalue % nbuckets;
/* since nbatch is a power of 2, can do MOD by masking */
*batchno = hash_uint32(hashvalue) & (nbatch - 1);
}
else
{
*bucketno = hashvalue % nbuckets;
*batchno = 0;
}
}
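/*
 * For illustration: when nbatch doubles, the mask (nbatch - 1) gains one
 * high bit, so a tuple's batchno either stays the same or becomes
 * batchno + oldnbatch; it never decreases, which is what allows
 * ExecHashIncreaseNumBatches to dump tuples forward to later batches.
 */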
/*
* ExecScanHashBucket
* scan a hash bucket for matches to the current outer tuple
*
* The current outer tuple must be stored in econtext->ecxt_outertuple.
*/
HashJoinTuple
ExecScanHashBucket(HashState *hashState, HashJoinState *hjstate,
ExprContext *econtext)
{
List *hjclauses = hjstate->hashqualclauses;
HashJoinTable hashtable = hjstate->hj_HashTable;
HashJoinTuple hashTuple = hjstate->hj_CurTuple;
uint32 hashvalue = hjstate->hj_CurHashValue;
START_MEMORY_ACCOUNT(hashState->ps.plan->memoryAccount);
{
/*
* hj_CurTuple is NULL to start scanning a new bucket, or the address of
* the last tuple returned from the current bucket.
*/
if (hashTuple == NULL)
{
/* if the bloom filter bit is not set, there can be no match - don't even bother to scan */
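/*
 * Note: a set bit can be a false positive (another tuple in this bucket
 * may have mapped to the same bit), so any candidate found here is still
 * verified by the hashvalue comparison and ExecQual below.
 */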
if (gp_hashjoin_bloomfilter == 0 || 0 != (hashtable->bloom[hjstate->hj_CurBucketNo] & BLOOMVAL(hashvalue)))
hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
}
else
hashTuple = hashTuple->next;
while (hashTuple != NULL)
{
if (hashTuple->hashvalue == hashvalue)
{
TupleTableSlot *inntuple;
/* insert hashtable's tuple into exec slot so ExecQual sees it */
inntuple = ExecStoreMemTuple(HJTUPLE_MINTUPLE(hashTuple),
hjstate->hj_HashTupleSlot,
false); /* do not pfree */
econtext->ecxt_innertuple = inntuple;
/* reset temp memory each time to avoid leaks from qual expr */
ResetExprContext(econtext);
if (ExecQual(hjclauses, econtext, false))
{
hjstate->hj_CurTuple = hashTuple;
return hashTuple;
}
}
hashTuple = hashTuple->next;
}
}
END_MEMORY_ACCOUNT();
/*
* no match
*/
return NULL;
}
/*
* ExecHashTableReset
*
* reset hash table header for new batch
*/
void
ExecHashTableReset(HashState *hashState, HashJoinTable hashtable)
{
MemoryContext oldcxt;
int nbuckets = 0;
START_MEMORY_ACCOUNT(hashState->ps.plan->memoryAccount);
{
Assert(hashtable);
Assert(!hashtable->eagerlyReleased);
nbuckets = hashtable->nbuckets;
/*
* Release all the hash buckets and tuples acquired in the prior pass, and
* reinitialize the context for a new pass.
*/
MemoryContextReset(hashtable->batchCxt);
oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
/* Reallocate and reinitialize the hash bucket headers. */
hashtable->buckets = (HashJoinTuple *)
palloc0(nbuckets * sizeof(HashJoinTuple));
if (gp_hashjoin_bloomfilter != 0)
hashtable->bloom = (uint64*) palloc0(nbuckets * sizeof(uint64));
hashtable->batches[hashtable->curbatch]->innerspace = 0;
hashtable->batches[hashtable->curbatch]->innertuples = 0;
hashtable->totalTuples = 0;
MemoryContextSwitchTo(oldcxt);
}
END_MEMORY_ACCOUNT();
}
void
ExecReScanHash(HashState *node, ExprContext *exprCtxt)
{
/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
*/
if (((PlanState *) node)->lefttree->chgParam == NULL)
ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
}
/*
* ExecHashTableExplainInit
* Called after ExecHashTableCreate to set up EXPLAIN ANALYZE reporting.
*/
void
ExecHashTableExplainInit(HashState *hashState, HashJoinState *hjstate,
HashJoinTable hashtable)
{
MemoryContext oldcxt;
int nbatch = Max(hashtable->nbatch, 1);
START_MEMORY_ACCOUNT(hashState->ps.plan->memoryAccount);
{
/* Switch to a memory context that survives until ExecutorEnd. */
oldcxt = MemoryContextSwitchTo(hjstate->js.ps.state->es_query_cxt);
/* Request a callback at end of query. */
hjstate->js.ps.cdbexplainfun = ExecHashTableExplainEnd;
/* Create workarea and attach it to the HashJoinTable. */
hashtable->stats = (HashJoinTableStats *)palloc0(sizeof(*hashtable->stats));
hashtable->stats->endedbatch = -1;
/* Create per-batch statistics array. */
hashtable->stats->batchstats =
(HashJoinBatchStats *)palloc0(nbatch * sizeof(hashtable->stats->batchstats[0]));
hashtable->stats->nbatchstats = nbatch;
/* Restore caller's memory context. */
MemoryContextSwitchTo(oldcxt);
}
END_MEMORY_ACCOUNT();
} /* ExecHashTableExplainInit */
/*
* ExecHashTableExplainEnd
* Called before ExecutorEnd to finish EXPLAIN ANALYZE reporting.
*/
void
ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf)
{
HashJoinState *hjstate = (HashJoinState *)planstate;
HashJoinTable hashtable = hjstate->hj_HashTable;
HashJoinTableStats *stats;
Instrumentation *jinstrument = hjstate->js.ps.instrument;
int total_buckets;
int i;
if (!hashtable ||
!hashtable->stats ||
hashtable->nbatch < 1 ||
!jinstrument)
return;
stats = hashtable->stats;
/* Check that batchstats is not null: if nodeHash failed to palloc batchstats,
* it will throw. Postgres will catch and handle the error, but will still try
* to produce some EXPLAIN results; we must check here or we will segfault.
*/
if(stats->batchstats == NULL)
return;
if (!hashtable->eagerlyReleased)
{
HashState *hashState = (HashState *) innerPlanState(hjstate);
/* Report on batch in progress, in case the join is being ended early. */
ExecHashTableExplainBatchEnd(hashState, hashtable);
/* Report executor memory used by our memory context. */
jinstrument->execmemused +=
(double)MemoryContextGetPeakSpace(hashtable->hashCxt);
}
else
{
/*
* Memory has been eagerly released, so we can't get statistics
* from the memory context. Approximate them from the stats structure.
*/
Assert(gp_eager_hashtable_release);
jinstrument->execmemused +=
(double) stats->workmem_max;
}
/* Report actual work_mem high water mark. */
jinstrument->workmemused = Max(jinstrument->workmemused, stats->workmem_max);
/* How much work_mem would suffice to hold all inner tuples in memory? */
if (hashtable->nbatch > 1)
{
uint64 workmemwanted = 0;
/* Space actually taken by hash rows in completed batches... */
for (i = 0; i <= stats->endedbatch; i++)
workmemwanted += stats->batchstats[i].hashspace_final;
/* ... plus workfile size for original batches not reached, plus... */
for (; i < hashtable->nbatch_original; i++)
workmemwanted += stats->batchstats[i].innerfilesize;
/* ... rows spilled to unreached oflo batches, in case quitting early */
for (; i < stats->nbatchstats; i++)
workmemwanted += stats->batchstats[i].spillspace_in;
/*
* Sometimes workfiles are used even though all the data would fit
* in work_mem. For example, if the planner overestimated the inner
* rel size, it might have instructed us to use more initial batches
* than were actually needed, causing unnecessary workfile I/O. To
* avoid this I/O, the user would have to increase work_mem based on
* the planner's estimate rather than our runtime observations. For
* now, we don't try to second-guess the planner; just keep quiet.
*/
if (workmemwanted > PlanStateOperatorMemKB(planstate) * 1024L)
jinstrument->workmemwanted =
Max(jinstrument->workmemwanted, workmemwanted);
}
/* Report workfile I/O statistics. */
if (hashtable->nbatch > 1)
{
if (!hjstate->cached_workfiles_loaded)
{
ExecHashTableExplainBatches(hashtable, buf, 0, 1, "Initial");
ExecHashTableExplainBatches(hashtable,
buf,
1,
hashtable->nbatch_original,
"Initial");
ExecHashTableExplainBatches(hashtable,
buf,
hashtable->nbatch_original,
hashtable->nbatch_outstart,
"Overflow");
ExecHashTableExplainBatches(hashtable,
buf,
hashtable->nbatch_outstart,
hashtable->nbatch,
"Secondary Overflow");
}
else
{
ExecHashTableExplainBatches(hashtable, buf, 0, hashtable->nbatch, "Reuse");
}
}
/* Report hash chain statistics. */
total_buckets = stats->nonemptybatches * hashtable->nbuckets;
if (total_buckets > 0)
{
appendStringInfo(buf,
"Hash chain length"
" %.1f avg, %.0f max, using %d of %d buckets. ",
cdbexplain_agg_avg(&stats->chainlength),
stats->chainlength.vmax,
stats->chainlength.vcnt,
total_buckets);
if (hashtable->nbatch > stats->nonemptybatches)
appendStringInfo(buf,
"Skipped %d empty batches.",
hashtable->nbatch - stats->nonemptybatches);
appendStringInfoChar(buf, '\n');
}
} /* ExecHashTableExplainEnd */
/*
* ExecHashTableExplainBatches
* Report summary of EXPLAIN ANALYZE stats for a set of batches.
*/
void
ExecHashTableExplainBatches(HashJoinTable hashtable,
StringInfo buf,
int ibatch_begin,
int ibatch_end,
const char *title)
{
HashJoinTableStats *stats = hashtable->stats;
CdbExplain_Agg irdbytes;
CdbExplain_Agg iwrbytes;
CdbExplain_Agg ordbytes;
CdbExplain_Agg owrbytes;
int i;
if (ibatch_begin >= ibatch_end)
return;
Assert(ibatch_begin >= 0 &&
ibatch_end <= hashtable->nbatch &&
hashtable->nbatch <= stats->nbatchstats &&
stats->batchstats != NULL);
cdbexplain_agg_init0(&irdbytes);
cdbexplain_agg_init0(&iwrbytes);
cdbexplain_agg_init0(&ordbytes);
cdbexplain_agg_init0(&owrbytes);
/* Add up the batch stats. */
char hostname[SEGMENT_IDENTITY_NAME_LENGTH];
gethostname(hostname, SEGMENT_IDENTITY_NAME_LENGTH);
for (i = ibatch_begin; i < ibatch_end; i++)
{
HashJoinBatchStats *bs = &stats->batchstats[i];
cdbexplain_agg_upd(&irdbytes, (double)bs->irdbytes, i, hostname);
cdbexplain_agg_upd(&iwrbytes, (double)bs->iwrbytes, i, hostname);
cdbexplain_agg_upd(&ordbytes, (double)bs->ordbytes, i, hostname);
cdbexplain_agg_upd(&owrbytes, (double)bs->owrbytes, i, hostname);
}
if (iwrbytes.vcnt + irdbytes.vcnt + owrbytes.vcnt + ordbytes.vcnt > 0)
{
if (ibatch_begin == ibatch_end - 1)
appendStringInfo(buf,
"%s batch %d:\n",
title,
ibatch_begin);
else
appendStringInfo(buf,
"%s batches %d..%d:\n",
title,
ibatch_begin,
ibatch_end - 1);
}
/* Inner bytes read from workfile */
if (irdbytes.vcnt > 0)
{
appendStringInfo(buf,
" Read %.0fK bytes from inner workfile",
ceil(irdbytes.vsum / 1024));
if (irdbytes.vcnt > 1)
appendStringInfo(buf,
": %.0fK avg x %d nonempty batches"
", %.0fK max",
ceil(cdbexplain_agg_avg(&irdbytes)/1024),
irdbytes.vcnt,
ceil(irdbytes.vmax / 1024));
appendStringInfoString(buf, ".\n");
}
/* Inner rel bytes spilled to workfile */
if (iwrbytes.vcnt > 0)
{
appendStringInfo(buf,
" Wrote %.0fK bytes to inner workfile",
ceil(iwrbytes.vsum / 1024));
if (iwrbytes.vcnt > 1)
appendStringInfo(buf,
": %.0fK avg x %d overflowing batches"
", %.0fK max",
ceil(cdbexplain_agg_avg(&iwrbytes)/1024),
iwrbytes.vcnt,
ceil(iwrbytes.vmax / 1024));
appendStringInfoString(buf, ".\n");
}
/* Outer bytes read from workfile */
if (ordbytes.vcnt > 0)
{
appendStringInfo(buf,
" Read %.0fK bytes from outer workfile",
ceil(ordbytes.vsum / 1024));
if (ordbytes.vcnt > 1)
appendStringInfo(buf,
": %.0fK avg x %d nonempty batches"
", %.0fK max",
ceil(cdbexplain_agg_avg(&ordbytes)/1024),
ordbytes.vcnt,
ceil(ordbytes.vmax / 1024));
appendStringInfoString(buf, ".\n");
}
/* Outer rel bytes spilled to workfile */
if (owrbytes.vcnt > 0)
{
appendStringInfo(buf,
" Wrote %.0fK bytes to outer workfile",
ceil(owrbytes.vsum / 1024));
if (owrbytes.vcnt > 1)
appendStringInfo(buf,
": %.0fK avg x %d overflowing batches"
", %.0fK max",
ceil(cdbexplain_agg_avg(&owrbytes)/1024),
owrbytes.vcnt,
ceil(owrbytes.vmax / 1024));
appendStringInfoString(buf, ".\n");
}
} /* ExecHashTableExplainBatches */
/*
* ExecHashTableExplainBatchEnd
* Called at end of each batch to collect statistics for EXPLAIN ANALYZE.
*/
void
ExecHashTableExplainBatchEnd(HashState *hashState, HashJoinTable hashtable)
{
int curbatch = hashtable->curbatch;
HashJoinTableStats *stats = hashtable->stats;
HashJoinBatchStats *batchstats = &stats->batchstats[curbatch];
HashJoinBatchData *batch = NULL;
int i;
START_MEMORY_ACCOUNT(hashState->ps.plan->memoryAccount);
{
Assert(!hashtable->eagerlyReleased);
Assert(hashtable->batches);
batch = hashtable->batches[curbatch];
/* Already reported on this batch? */
if ( stats->endedbatch == curbatch
|| curbatch >= hashtable->nbatch)
return;
stats->endedbatch = curbatch;
/* Update high-water mark for work_mem actually used at one time. */
if (stats->workmem_max < batch->innerspace)
stats->workmem_max = batch->innerspace;
/* Final size of hash table for this batch */
batchstats->hashspace_final = batch->innerspace;
/* Collect workfile I/O statistics. */
if (hashtable->nbatch > 1)
{
uint64 owrbytes = 0;
uint64 iwrbytes = 0;
Assert(stats->batchstats &&
hashtable->nbatch <= stats->nbatchstats);
/* How much was read from inner workfile for current batch? */
batchstats->irdbytes = batchstats->innerfilesize;
/* How much was read from outer workfiles for current batch? */
if (hashtable->batches[curbatch]->outerside.workfile != NULL)
batchstats->ordbytes =
ExecWorkFile_Tell64(hashtable->batches[curbatch]->outerside.workfile);
/*
* How much was written to workfiles for the remaining batches?
* When cached workfiles are reused, nothing needs to be written for them.
*/
if (!hashtable->hjstate->cached_workfiles_loaded)
{
for (i = curbatch + 1; i < hashtable->nbatch; i++)
{
HashJoinBatchData *batch = hashtable->batches[i];
HashJoinBatchStats *bs = &stats->batchstats[i];
uint64 filebytes = 0;
if (batch->outerside.workfile != NULL)
filebytes = ExecWorkFile_Tell64(batch->outerside.workfile);
Assert(filebytes >= bs->outerfilesize);
owrbytes += filebytes - bs->outerfilesize;
bs->outerfilesize = filebytes;
filebytes = 0;
if (batch->innerside.workfile)
filebytes = ExecWorkFile_Tell64(batch->innerside.workfile);
Assert(filebytes >= bs->innerfilesize);
iwrbytes += filebytes - bs->innerfilesize;
bs->innerfilesize = filebytes;
}
batchstats->owrbytes = owrbytes;
batchstats->iwrbytes = iwrbytes;
}
} /* collect workfile I/O statistics */
/* Collect hash chain statistics. */
if (batch->innertuples > 0)
{
stats->nonemptybatches++;
char hostname[SEGMENT_IDENTITY_NAME_LENGTH];
gethostname(hostname, SEGMENT_IDENTITY_NAME_LENGTH);
for (i = 0; i < hashtable->nbuckets; i++)
{
HashJoinTuple hashtuple = hashtable->buckets[i];
int chainlength;
if (hashtuple)
{
for (chainlength = 0; hashtuple; hashtuple = hashtuple->next)
chainlength++;
cdbexplain_agg_upd(&stats->chainlength, chainlength, i, hostname);
}
}
}
}
END_MEMORY_ACCOUNT();
} /* ExecHashTableExplainBatchEnd */
void
initGpmonPktForHash(Plan *planNode, gpmon_packet_t *gpmon_pkt, EState *estate)
{
Assert(planNode != NULL && gpmon_pkt != NULL && IsA(planNode, Hash));
{
Assert(GPMON_HASH_TOTAL <= (int)GPMON_QEXEC_M_COUNT);
InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate, PMNT_Hash,
(int64)planNode->plan_rows,
NULL);
}
}