| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /* |
| * execHHashagg.c |
| * GPDB additions to support hybrid hash aggregation algorithm. |
| * This file could be merged into nodeAgg.c. The separation is |
| * only to help isolate Greenplum Database-only code from future merges with |
| * PG code. Note, however, that nodeAgg.c is also modified to |
| * make use of this code. |
| * |
| * Portions Copyright (c) 2006-2007, Greenplum |
| * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * IDENTIFICATION |
| * $Id$ |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include <unistd.h> |
| |
| #include "miscadmin.h" /* work_mem */ |
| #include "executor/executor.h" |
| #include "nodes/execnodes.h" |
| #include "executor/tuptable.h" |
| #include "executor/instrument.h" /* Instrumentation */ |
| #include "executor/execHHashagg.h" |
| #include "executor/execWorkfile.h" |
| #include "storage/bfz.h" |
| #include "utils/datum.h" |
| #include "utils/memutils.h" |
| #include "utils/lsyscache.h" |
| #include "utils/elog.h" |
| #include "cdb/memquota.h" |
| #include "utils/workfile_mgr.h" |
| |
| #include "access/hash.h" |
| |
| #include "cdb/cdbcellbuf.h" |
| #include "cdb/cdbexplain.h" |
| #include "cdb/cdbvars.h" |
| #include "postmaster/primary_mirror_mode.h" |
| |
| |
| #define HHA_MSG_LVL DEBUG2 |
| |
| |
/* Encapsulate data related to a batch file. */
| struct BatchFileInfo |
| { |
| int64 total_bytes; |
| int64 ntuples; |
| ExecWorkFile *wfile; |
| }; |
| |
| #define BATCHFILE_METADATA \ |
| (sizeof(BatchFileInfo) + sizeof(bfz_t) + sizeof(struct bfz_freeable_stuff)) |
| #define FREEABLE_BATCHFILE_METADATA (sizeof(struct bfz_freeable_stuff)) |
| /* |
| * Number of batchfile metadata to reserve during spilling in order to have |
| * enough memory to open them at reuse. |
| */ |
| #define NO_RESERVED_BATCHFILE_METADATA 256 |
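
/*
 * A quick sizing note (illustrative): with 256 reserved entries, the
 * reservation comes to
 *     NO_RESERVED_BATCHFILE_METADATA * (BATCHFILE_METADATA - FREEABLE_BATCHFILE_METADATA)
 * bytes, which create_agg_hash_table() subtracts from the operator's
 * memory quota when an existing spill set cannot be reused.
 */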
| |
| /* Used for padding */ |
| static char padding_dummy[MAXIMUM_ALIGNOF]; |
| |
| /* |
| * Represent different types for input records to be inserted |
| * into the hash table. |
| */ |
| typedef enum InputRecordType |
| { |
| INPUT_RECORD_TUPLE = 0, |
| INPUT_RECORD_GROUP_AND_AGGS, |
| } InputRecordType; |
| |
| #define GET_BUFFER_SIZE(hashtable) \ |
| ((hashtable)->entry_buf.nfull_total * (hashtable)->entry_buf.cellbytes + \ |
| mpool_total_bytes_allocated((hashtable)->group_buf)) |
| |
| #define GET_USED_BUFFER_SIZE(hashtable) \ |
| ((hashtable)->entry_buf.nfull_total * (hashtable)->entry_buf.cellbytes + \ |
| mpool_bytes_used((hashtable)->group_buf)) |
| |
| #define SANITY_CHECK_METADATA_SIZE(hashtable) \ |
| do { \ |
| if ((hashtable)->mem_for_metadata >= (hashtable)->max_mem) \ |
| ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR), \ |
| errmsg(ERRMSG_GP_INSUFFICIENT_STATEMENT_MEMORY)));\ |
| } while (0) |
| |
| #define GET_TOTAL_USED_SIZE(hashtable) \ |
| (GET_USED_BUFFER_SIZE(hashtable) + (hashtable)->mem_for_metadata) |
| |
| #define HAVE_FREESPACE(hashtable) \ |
| (GET_TOTAL_USED_SIZE(hashtable) < (hashtable)->max_mem) |
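
/*
 * Memory accounting model used by the macros above (a sketch): the total
 * charged against the operator's quota is
 *
 *   GET_TOTAL_USED_SIZE = entry_buf bytes + group_buf bytes in use
 *                         + mem_for_metadata
 *
 * and the hash table has free space only while this total stays below
 * max_mem. The entry-creation paths and the streaming check consult
 * these macros before growing the table.
 */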
| |
| /* Methods that handle batch files */ |
| static SpillSet *createSpillSet(unsigned branching_factor, unsigned parent_hash_bit); |
| static SpillSet *read_spill_set(AggState *aggstate); |
| static int closeSpillFile(AggState *aggstate, SpillSet *spill_set, int file_no); |
| static int closeSpillFiles(AggState *aggstate, SpillSet *spill_set); |
| static int suspendSpillFiles(SpillSet *spill_set); |
| static int32 writeHashEntry(AggState *aggstate, |
| BatchFileInfo *file_info, |
| HashAggEntry *entry); |
| static void *readHashEntry(AggState *aggstate, |
| BatchFileInfo *file_info, |
| HashKey *p_hashkey, |
| int32 *p_input_size); |
| |
| /* Methods for hash table */ |
| static uint32 calc_hash_value(AggState* aggstate, TupleTableSlot *inputslot); |
| static void spill_hash_table(AggState *aggstate); |
| static void init_agg_hash_iter(HashAggTable* ht); |
| static HashAggEntry *lookup_agg_hash_entry(AggState *aggstate, void *input_record, |
| InputRecordType input_type, int32 input_size, |
| uint32 hashkey, unsigned parent_hash_bit, bool *p_isnew); |
| static void agg_hash_table_stat_upd(HashAggTable *ht); |
| static void reset_agg_hash_table(AggState *aggstate); |
| static bool agg_hash_reload(AggState *aggstate); |
| static inline void *mpool_cxt_alloc(void *manager, Size len); |
| |
| /* Methods for state file */ |
| static void create_state_file(HashAggTable *hashtable); |
| static void agg_hash_save_spillfile_info(ExecWorkFile *state_file, SpillFile *spill_file); |
| static bool agg_hash_load_spillfile_info(ExecWorkFile *state_file, char **spill_file_name, unsigned *batch_hash_bit); |
| static void agg_hash_write_string(ExecWorkFile *ewf, const char *str, size_t len); |
| static char *agg_hash_read_string(ExecWorkFile *ewf); |
| |
| static inline void *mpool_cxt_alloc(void *manager, Size len) |
| { |
| return mpool_alloc((MPool *)manager, len); |
| } |
| |
/* Function: calc_hash_value
 *
 * Calculate the hash value for the given input tuple.
 *
 * This is based on, but different from, get_hash_value in the dynahash
 * API. We use a different name to underline that we don't use dynahash.
 */
| uint32 |
| calc_hash_value(AggState* aggstate, TupleTableSlot *inputslot) |
| { |
| Agg *agg; |
| ExprContext *econtext; |
| MemoryContext oldContext; |
| int i; |
| FmgrInfo* info = aggstate->hashfunctions; |
| HashAggTable *hashtable = aggstate->hhashtable; |
| |
| agg = (Agg*)aggstate->ss.ps.plan; |
| econtext = aggstate->tmpcontext; /* short-lived, per-input-tuple */ |
| |
| oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
| |
| for (i = 0; i < agg->numCols; i++, info++) |
| { |
| AttrNumber att = agg->grpColIdx[i]; |
| bool isnull = false; |
| Datum value = slot_getattr(inputslot, att, &isnull); |
| |
		if (!isnull)	/* treat all NULLs as having the same fixed hash code */
| { |
| hashtable->hashkey_buf[i] = DatumGetUInt32(FunctionCall1(info, value)); |
| } |
| |
| else |
| hashtable->hashkey_buf[i] = 0xdeadbeef; |
| |
| } |
| |
| MemoryContextSwitchTo(oldContext); |
| return (uint32) hash_any((unsigned char *) hashtable->hashkey_buf, agg->numCols * sizeof(HashKey)); |
| } |
| |
| /* Function: adjustInputGroup |
| * |
| * Adjust the datum pointers stored in the byte array of an input group. |
| */ |
| static inline void |
| adjustInputGroup(AggState *aggstate, |
| void *input_group, |
| MemTupleBinding *mt_bind) |
| { |
| int32 tuple_size; |
| void *datum; |
| AggStatePerGroup pergroup; |
| AggStatePerAgg peragg = aggstate->peragg; |
| int aggno; |
| |
| tuple_size = memtuple_get_size((MemTuple)input_group, mt_bind); |
| pergroup = (AggStatePerGroup) ((char *)input_group + |
| MAXALIGN(tuple_size)); |
| Assert(pergroup != NULL); |
| datum = (char *)input_group + MAXALIGN(tuple_size) + |
| aggstate->numaggs * sizeof(AggStatePerGroupData); |
| |
| for (aggno = 0; aggno < aggstate->numaggs; aggno++) |
| { |
| AggStatePerAgg peraggstate = &peragg[aggno]; |
| AggStatePerGroup pergroupstate = &pergroup[aggno]; |
| |
| if (!peraggstate->transtypeByVal && |
| !pergroupstate->transValueIsNull) |
| { |
| Size datum_size; |
| pergroupstate->transValue = PointerGetDatum(datum); |
| datum_size = datumGetSize(pergroupstate->transValue, |
| peraggstate->transtypeByVal, |
| peraggstate->transtypeLen); |
| Assert(MAXALIGN(datum_size) - datum_size <= MAXIMUM_ALIGNOF); |
| datum = (char *)datum + MAXALIGN(datum_size); |
| } |
| } |
| } |
| |
| /* Function: getEmptyHashAggEntry |
| * |
| * Obtain a new empty HashAggEntry. |
| */ |
| static inline HashAggEntry * |
| getEmptyHashAggEntry(AggState *aggstate) |
| { |
| HashAggEntry *entry; |
| CdbCellBuf *entry_buf = &(aggstate->hhashtable->entry_buf); |
| |
| entry = (HashAggEntry *)CdbCellBuf_AppendCell(entry_buf); |
| |
| return entry; |
| } |
| |
| /* Function: makeHashAggEntryForInput |
| * |
| * Allocate a new hash agg entry for the given input tuple and hash key |
| * of the given AggState. This includes installing the grouping key heap tuple. |
| * |
| * It is the caller's responsibility to link the entry into the hash table |
| * and to initialize the per group data. |
| * |
 * If not enough memory is available, this function returns NULL.
| */ |
| static HashAggEntry * |
| makeHashAggEntryForInput(AggState *aggstate, TupleTableSlot *inputslot, uint32 hashvalue) |
| { |
| HashAggEntry *entry; |
| MemoryContext oldcxt; |
| HashAggTable *hashtable = aggstate->hhashtable; |
| TupleTableSlot *hashslot = aggstate->hashslot; |
| Datum *values = slot_get_values(aggstate->hashslot); |
| bool *isnull = slot_get_isnull(aggstate->hashslot); |
| ListCell *lc; |
| uint32 tup_len; |
| uint32 aggs_len; |
| uint32 len; |
| |
| /* |
| * Extract the grouping columns from the inputslot, and store them into |
| * hashslot. The first integer in aggstate->hash_needed is the largest |
| * Var number for all grouping columns. |
| */ |
| foreach (lc, aggstate->hash_needed) |
| { |
| const int n = lfirst_int(lc); |
| values[n-1] = slot_getattr(inputslot, n, &(isnull[n-1])); |
| } |
| |
| tup_len = 0; |
| aggs_len = aggstate->numaggs * sizeof(AggStatePerGroupData); |
| |
| oldcxt = MemoryContextSwitchTo(hashtable->entry_cxt); |
| |
| entry = getEmptyHashAggEntry(aggstate); |
| entry->tuple_and_aggs = NULL; |
| entry->hashvalue = hashvalue; |
| entry->is_primodial = !(hashtable->is_spilling); |
| entry->next = NULL; |
| |
| /* |
| * Copy memtuple into group_buf. Remember to always allocate |
| * enough space before calling ExecCopySlotMemTupleTo() because |
| * this function will call palloc() to allocate bigger space if |
| * the given one is not big enough, which is what we want to avoid. |
| */ |
| entry->tuple_and_aggs = (void *)memtuple_form_to(hashslot->tts_mt_bind, |
| values, |
| isnull, |
| entry->tuple_and_aggs, |
| &tup_len, false); |
| Assert(tup_len > 0 && entry->tuple_and_aggs == NULL); |
| |
| if (GET_TOTAL_USED_SIZE(hashtable) + MAXALIGN(MAXALIGN(tup_len) + aggs_len) >= |
| hashtable->max_mem) |
| return NULL; |
| |
| entry->tuple_and_aggs = mpool_alloc(hashtable->group_buf, |
| MAXALIGN(MAXALIGN(tup_len) + aggs_len)); |
| len = tup_len; |
| entry->tuple_and_aggs = (void *)memtuple_form_to(hashslot->tts_mt_bind, |
| values, |
| isnull, |
| entry->tuple_and_aggs, |
| &len, false); |
| Assert(len == tup_len && entry->tuple_and_aggs != NULL); |
| |
| MemoryContextSwitchTo(oldcxt); |
| return entry; |
| } |
| |
| /* |
| * Function: makeHashAggEntryForGroup |
| * |
| * Allocate a new hash agg entry for the given byte array representing |
| * group keys and aggregate values. This function will initialize the |
| * per group data by pointing to the data stored on the given byte |
| * array. |
| * |
| * This function assumes that the given byte array contains both a |
| * memtuple that represents grouping keys, and their aggregate values, |
| * stored in the format defined in writeHashEntry(). |
| * |
| * It is the caller's responsibility to link the entry into the hash table. |
| * |
 * If not enough memory is available, this function returns NULL.
| */ |
| static HashAggEntry * |
| makeHashAggEntryForGroup(AggState *aggstate, void *tuple_and_aggs, |
| int32 input_size, uint32 hashvalue) |
| { |
| HashAggEntry *entry; |
| HashAggTable *hashtable = aggstate->hhashtable; |
| MemTupleBinding *mt_bind = aggstate->hashslot->tts_mt_bind; |
| void *copy_tuple_and_aggs; |
| |
| MemoryContext oldcxt; |
| |
| if (GET_TOTAL_USED_SIZE(hashtable) + input_size >= hashtable->max_mem) |
| return NULL; |
| |
| copy_tuple_and_aggs = mpool_alloc(hashtable->group_buf, input_size); |
| memcpy(copy_tuple_and_aggs, tuple_and_aggs, input_size); |
| |
| oldcxt = MemoryContextSwitchTo(hashtable->entry_cxt); |
| |
| entry = getEmptyHashAggEntry(aggstate); |
| entry->hashvalue = hashvalue; |
| entry->is_primodial = !(hashtable->is_spilling); |
| entry->tuple_and_aggs = copy_tuple_and_aggs; |
| entry->next = NULL; |
| |
| /* Initialize per group data */ |
| adjustInputGroup(aggstate, entry->tuple_and_aggs, mt_bind); |
| |
| MemoryContextSwitchTo(oldcxt); |
| |
| return entry; |
| } |
| |
| /* |
| * Function: setGroupAggs |
| * |
| * Set the groupaggs buffer in the hashtable to point to the right place |
| * in the given hash entry. |
| */ |
| static inline void |
| setGroupAggs(HashAggTable *hashtable, MemTupleBinding *mt_bind, HashAggEntry *entry) |
| { |
| Assert(mt_bind != NULL); |
| |
| if (entry != NULL) |
| { |
| int tup_len = memtuple_get_size((MemTuple)entry->tuple_and_aggs, mt_bind); |
| hashtable->groupaggs->tuple = (MemTuple)entry->tuple_and_aggs; |
| hashtable->groupaggs->aggs = (AggStatePerGroup) |
| ((char *)entry->tuple_and_aggs + MAXALIGN(tup_len)); |
| } |
| } |
| |
| /* |
| * Function: lookup_agg_hash_entry |
| * |
| * Returns a pointer to the old or new hash table entry corresponding |
| * to the input record, or NULL if there is no such an entry (and |
| * the table is full). |
| * |
| * The input record can be one of the following two types: |
| * TableTupleSlot -- representing an input tuple |
| * a byte array -- representing a contiguous space that contains |
| * both group keys and aggregate values. |
| * 'input_size' represents how many bytes this byte array has. |
| * |
| * If an entry is returned and isNew is non-NULL, (*p_isnew) is set to true |
| * or false depending on whether the returned entry is new. Note that |
| * a new entry will have *initialized* per-group data (Aggref states). |
| */ |
| static HashAggEntry * |
| lookup_agg_hash_entry(AggState *aggstate, |
| void *input_record, |
| InputRecordType input_type, int32 input_size, |
| uint32 hashkey, unsigned parent_hash_bit, bool *p_isnew) |
| { |
| HashAggEntry *entry; |
| HashAggTable *hashtable = aggstate->hhashtable; |
| MemTupleBinding *mt_bind = aggstate->hashslot->tts_mt_bind; |
| ExprContext *tmpcontext = aggstate->tmpcontext; /* per input tuple context */ |
| Agg *agg = (Agg*)aggstate->ss.ps.plan; |
| MemoryContext oldcxt; |
| unsigned int bucket_idx; |
| uint64 bloomval; /* bloom filter value */ |
| |
| Assert(mt_bind != NULL); |
| |
| if (p_isnew != NULL) |
| *p_isnew = false; |
| |
| oldcxt = MemoryContextSwitchTo(tmpcontext->ecxt_per_tuple_memory); |
| bucket_idx = (hashkey >> parent_hash_bit) % (hashtable->nbuckets); |
| bloomval = ((uint64)1) << ((hashkey >> 23) & 0x3f); |
| entry = (0 == (hashtable->bloom[bucket_idx] & bloomval) ? NULL : |
| hashtable->buckets[bucket_idx]); |
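
	/*
	 * Each bucket keeps a 64-bit bloom filter over the hash values of its
	 * entries: bloomval above selects one bit using hash bits 23..28. If
	 * the key's bit is not set in the bucket's filter, the bucket cannot
	 * contain the key and we skip walking the chain entirely.
	 */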
| |
| /* |
| * Search entry chain for the bucket. If such an entry found in the |
| * chain, move it to the front of the chain. Otherwise, if there |
| * are any space left, create a new entry, and insert it in |
| * the front of the chain. |
| */ |
| while (entry != NULL) |
| { |
| MemTuple mtup = (MemTuple) entry->tuple_and_aggs; |
| int i; |
| bool match = true; |
| |
| if (hashkey != entry->hashvalue) |
| { |
| entry = entry->next; |
| continue; |
| } |
| |
| for (i = 0; match && i < agg->numCols; i++) |
| { |
| AttrNumber att = agg->grpColIdx[i]; |
| Datum input_datum = 0; |
| Datum entry_datum = 0; |
| bool input_isNull = false; |
| bool entry_isNull = false; |
| |
| switch(input_type) |
| { |
| case INPUT_RECORD_TUPLE: |
| input_datum = slot_getattr((TupleTableSlot *)input_record, att, &input_isNull); |
| break; |
| case INPUT_RECORD_GROUP_AND_AGGS: |
| input_datum = memtuple_getattr((MemTuple)input_record, mt_bind, att, &input_isNull); |
| break; |
| default: |
| insist_log(false, "invalid record type %d", input_type); |
| } |
| |
| entry_datum = memtuple_getattr(mtup, mt_bind, att, &entry_isNull); |
| |
| if ( !input_isNull && !entry_isNull && |
| (DatumGetBool(FunctionCall2(&aggstate->eqfunctions[i], |
| input_datum, |
| entry_datum)) ) ) |
| continue; /* Both non-NULL and equal. */ |
| match = (input_isNull && entry_isNull);/* NULLs match in group keys. */ |
| } |
| |
| /* Break if found an existing matching entry. */ |
| if (match) |
| break; |
| |
| entry = entry->next; |
| } |
| |
| if (entry == NULL) |
| { |
| /* Create a new matching entry. */ |
| switch(input_type) |
| { |
| case INPUT_RECORD_TUPLE: |
| entry = makeHashAggEntryForInput(aggstate, (TupleTableSlot *)input_record, hashkey); |
| break; |
| case INPUT_RECORD_GROUP_AND_AGGS: |
| entry = makeHashAggEntryForGroup(aggstate, input_record, input_size, hashkey); |
| break; |
| default: |
| insist_log(false, "invalid record type %d", input_type); |
| } |
| |
| if (entry != NULL) |
| { |
| entry->next = hashtable->buckets[bucket_idx]; |
| hashtable->buckets[bucket_idx] = entry; |
| hashtable->bloom[bucket_idx] |= bloomval; |
| |
| hashtable->num_ht_groups++; |
| |
			if (p_isnew != NULL)
				*p_isnew = true;	/* created a new entry */
| } |
| /* |
| else no matching entry, and no room to create one. |
| */ |
| } |
| |
| (void) MemoryContextSwitchTo(oldcxt); |
| |
| return entry; |
| } |
| |
| /* Function: calcHashAggTableSizes |
| * |
| * Check if the current memory quota is enough to handle the aggregation |
| * in the hash-based fashion. |
| */ |
| #define OVERHEAD_PER_ENTRY ((double)sizeof(uint32))/gp_hashagg_groups_per_bucket |
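/*
 * OVERHEAD_PER_ENTRY above is the amortized per-entry share of the
 * hash-bucket overhead, assuming each bucket holds
 * gp_hashagg_groups_per_bucket entries on average.
 */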
| |
| bool |
| calcHashAggTableSizes(double memquota, /* Memory quota in bytes. */ |
| double ngroups, /* Est # of groups. */ |
| int numaggs, /* Est # of aggregate functions */ |
| int keywidth, /* Est per entry size of hash key. */ |
| int transpace, /* Est per entry size of by-ref values. */ |
| bool force, /* true => succeed even if work_mem too small */ |
| HashAggTableSizes *out_hats) |
| { |
| bool expectSpill = false; |
| |
| int entrywidth = sizeof(HashAggEntry) |
| + numaggs * sizeof(AggStatePerGroupData) |
| + keywidth |
| + transpace; |
| |
| double nbuckets; |
| |
| /* Hash Entries */ |
| double nentries; |
| double entrysize = OVERHEAD_PER_ENTRY + entrywidth; |
| |
| double nbatches = 0; |
| double batchfile_buffer_size = BATCHFILE_METADATA; |
| |
| elog(HHA_MSG_LVL, "HashAgg: ngroups = %g, numaggs = %d, keywidth = %d, transpace = %d", |
| ngroups, numaggs, keywidth, transpace); |
| elog(HHA_MSG_LVL, "HashAgg: memquota = %g, entrysize = %g, batchfile_buffer_size = %g", |
| memquota, entrysize, batchfile_buffer_size); |
| |
| /* |
| * When all groups can not fit in the memory, we compute |
| * the number of batches to store spilled groups. Currently, we always |
| * set the number of batches to gp_hashagg_default_nbatches. |
| */ |
| if (memquota < ngroups*entrysize) |
| { |
| /* Set nbatches to its default value. */ |
| nbatches = gp_hashagg_default_nbatches; |
| expectSpill = true; |
| |
		/* If the memory quota is smaller than the overhead for batch files,
		 * return false. Note that we will always keep at most (nbatches + 1)
		 * batches in memory.
		 */
| if (memquota < (nbatches + 1) * batchfile_buffer_size) |
| { |
| elog(HHA_MSG_LVL, "HashAgg: not enough memory for the overhead of batch files."); |
| return false; |
| } |
| } |
| |
| nentries = floor((memquota - nbatches * batchfile_buffer_size) / entrysize); |
| |
| /* Allocate at least a few hash entries regardless of memquota. */ |
| nentries = Max(nentries, gp_hashagg_groups_per_bucket); |
| |
| nbuckets = ceil(nentries/gp_hashagg_groups_per_bucket); |
| |
	/* Round nbuckets up to the next power of 2. */
| nbuckets = (((unsigned)1) << ((unsigned)ceil(log(nbuckets) / log(2)))); |
| |
| /* |
| * Always set nbuckets greater than gp_hashagg_default_nbatches since |
| * the spilling relies on this fact to choose which files to spill |
| * groups to. |
| */ |
| if (nbuckets < gp_hashagg_default_nbatches) |
| nbuckets = gp_hashagg_default_nbatches; |
| |
| if (nbatches > UINT_MAX || nentries > UINT_MAX || nbuckets > UINT_MAX) |
| { |
| if (force) |
| { |
| insist_log(false, "too many passes or hash entries"); |
| } |
| else |
| { |
| elog(HHA_MSG_LVL, "HashAgg: number of passes or hash entries bigger than int type!"); |
| return false; /* Too many groups. */ |
| } |
| } |
| |
| if (out_hats) |
| { |
| out_hats->nbuckets = (unsigned)nbuckets; |
| out_hats->nentries = (unsigned)nentries; |
| out_hats->nbatches = (unsigned)nbatches; |
| out_hats->hashentry_width = entrysize; |
| out_hats->spill = expectSpill; |
| out_hats->workmem_initial = (unsigned)(nbatches * batchfile_buffer_size); |
| out_hats->workmem_per_entry = (unsigned)entrysize; |
| } |
| |
| elog(HHA_MSG_LVL, "HashAgg: nbuckets = %d, nentries = %d, nbatches = %d", |
| (int)nbuckets, (int)nentries, (int)nbatches); |
| elog(HHA_MSG_LVL, "HashAgg: expected memory footprint = %d", |
| (int)( nentries*entrywidth + nbuckets*sizeof(HashAggEntry*) + nbatches*batchfile_buffer_size)); |
| |
| return true; |
| } |
| |
| /* Function: est_hash_tuple_size |
| * |
| * Estimate the average memory requirement for the grouping key HeapTuple |
| * in a hash table entry. |
| * |
| * This function purports to know (perhaps too much) about the format in |
 * which a tuple representing a grouping key (non-aggregated attributes,
| * actually) will be stored. But it's all guess work, so no sense being |
| * too fussy. |
| */ |
| static Size est_hash_tuple_size(TupleTableSlot *hashslot, List *hash_needed) |
| { |
| Form_pg_attribute *attrs; |
| Form_pg_attribute attr; |
| int natts; |
| ListCell *lc; |
| Size len; |
| TupleDesc tupleDescriptor; |
| |
| tupleDescriptor = hashslot->tts_tupleDescriptor; |
| attrs = tupleDescriptor->attrs; |
| natts = tupleDescriptor->natts; |
| |
| len = offsetof(HeapTupleHeaderData, t_bits); |
| len += BITMAPLEN(natts); |
| |
| if (tupleDescriptor->tdhasoid) |
| len += sizeof(Oid); |
| |
| len = MAXALIGN(len); |
| |
| /* Add data sizes to len. */ |
| foreach( lc, hash_needed ) |
| { |
| int varNumber = lfirst_int(lc) - 1; |
| attr = attrs[varNumber]; |
| |
| Assert( !attr->attisdropped ); |
| |
| len = att_align(len, attr->attalign); |
| len += get_typavgwidth(attr->atttypid, attr->atttypmod); |
| } |
| |
| len = MAXALIGN(HEAPTUPLESIZE + len); |
| |
| elog(HHA_MSG_LVL, "HashAgg: estimated hash tuple size is %d", (int)len); |
| |
| return len; |
| } |
| |
| /* Function: create_agg_hash_table |
| * |
| * Creates and initializes a hash table for the given AggState. Should be |
| * called after the rest of the AggState is initialized. The resulting table |
| * should be installed in the AggState. |
| * |
 * The main control structure for the hash table is allocated in the memory
 * context aggstate->aggcontext, as are the bucket array, the bloom filter,
 * and the items related to overflow files.
| */ |
| HashAggTable * |
| create_agg_hash_table(AggState *aggstate) |
| { |
| HashAggTable *hashtable; |
| Agg *agg = (Agg *)aggstate->ss.ps.plan; |
| MemoryContext oldcxt; |
| |
| oldcxt = MemoryContextSwitchTo(aggstate->aggcontext); |
| hashtable = (HashAggTable *)palloc0(sizeof(HashAggTable)); |
| |
| hashtable->entry_cxt = AllocSetContextCreate(aggstate->aggcontext, |
| "HashAggTableContext", |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE); |
| |
| bool can_reuse_workfiles = false; |
| workfile_set *work_set = NULL; |
| if (gp_workfile_caching) |
| { |
		/* Look up the workfile manager for an existing (cached) spill set; mark here if found */
| work_set = workfile_mgr_find_set(&aggstate->ss.ps); |
| /* |
| * Workaround for case when reusing an existing spill set would |
| * use too much metadata memory and might cause respilling: |
| * don't allow reusing for these sets for now. |
| */ |
| can_reuse_workfiles = (work_set != NULL) && |
| work_set->metadata.num_leaf_files <= NO_RESERVED_BATCHFILE_METADATA; |
| } |
| |
| uint64 operatorMemKB = PlanStateOperatorMemKB( (PlanState *) aggstate); |
| if (gp_workfile_caching && ! can_reuse_workfiles) |
| { |
| uint64 reservedMem = NO_RESERVED_BATCHFILE_METADATA * (BATCHFILE_METADATA - FREEABLE_BATCHFILE_METADATA); |
| operatorMemKB = operatorMemKB - reservedMem / 1024; |
| elog(gp_workfile_caching_loglevel, "HashAgg: reserved " INT64_FORMAT "KB for spilling", reservedMem / 1024); |
| } |
| |
| if (!calcHashAggTableSizes(1024.0 * (double) operatorMemKB, |
| (double)agg->numGroups, |
| aggstate->numaggs, |
| Min(est_hash_tuple_size(aggstate->ss.ss_ScanTupleSlot, |
| aggstate->hash_needed), |
| agg->plan.plan_width), |
| agg->transSpace, |
| true, |
| &(hashtable->hats))) |
| { |
| elog(ERROR, ERRMSG_GP_INSUFFICIENT_STATEMENT_MEMORY); |
| } |
| |
| if (can_reuse_workfiles) |
| { |
| aggstate->cached_workfiles_found = true; |
| hashtable->work_set = work_set; |
| /* Initialize hashtable parameters from the cached workfile */ |
| hashtable->hats.nbatches = work_set->metadata.num_leaf_files; |
| hashtable->hats.nbuckets = work_set->metadata.buckets; |
| hashtable->num_batches = work_set->metadata.num_leaf_files; |
| } |
| |
| /* Initialize the hash buckets */ |
| hashtable->nbuckets = hashtable->hats.nbuckets; |
| hashtable->total_buckets = hashtable->nbuckets; |
| hashtable->buckets = (HashAggEntry **)palloc0(hashtable->nbuckets * sizeof(HashAggEntry *)); |
| hashtable->bloom = (uint64 *)palloc0(hashtable->nbuckets * sizeof(uint64)); |
| |
| MemoryContextSwitchTo(hashtable->entry_cxt); |
| |
| /* Initialize buffer for hash entries */ |
| CdbCellBuf_InitEasy(&(hashtable->entry_buf), sizeof(HashAggEntry)); |
| hashtable->group_buf = mpool_create(hashtable->entry_cxt, |
| "GroupsAndAggs Context"); |
| hashtable->groupaggs = (GroupKeysAndAggs *)palloc0(sizeof(GroupKeysAndAggs)); |
| |
| /* Set hashagg's memory manager */ |
| aggstate->mem_manager.alloc = mpool_cxt_alloc; |
| aggstate->mem_manager.free = NULL; |
| aggstate->mem_manager.manager = hashtable->group_buf; |
| aggstate->mem_manager.realloc_ratio = 2; |
| |
| MemoryContextSwitchTo(oldcxt); |
| |
| hashtable->max_mem = 1024.0 * operatorMemKB; |
| hashtable->mem_for_metadata = sizeof(HashAggTable) |
| + hashtable->nbuckets * sizeof(HashAggEntry *) |
| + hashtable->nbuckets * sizeof(uint64) |
| + sizeof(GroupKeysAndAggs); |
| hashtable->mem_wanted = hashtable->mem_for_metadata; |
| hashtable->mem_used = hashtable->mem_for_metadata; |
| |
| hashtable->prev_slot = NULL; |
| |
| MemSet(padding_dummy, 0, MAXIMUM_ALIGNOF); |
| |
| init_agg_hash_iter(hashtable); |
| |
| return hashtable; |
| } |
| |
| /* Function: agg_hash_initial_pass |
| * |
| * Performs ExecAgg initialization for the first pass of the hashed case: |
| * - reads the input tuples, |
| * - builds a hash table with an entry per group, |
| * - spills all groups in the hash table to several overflow batches |
| * to be processed during later passes. |
| * |
| * Note that overflowed groups are distributed to batches in such |
| * a way that groups with matching grouping keys will be in the same |
| * batch. |
| * |
 * When called, CurrentMemoryContext should be the per-query context.
 *
 * Returns true if input tuples remain to be processed (possible only in
 * streaming mode), and false once the input has been exhausted.
 */
| bool |
| agg_hash_initial_pass(AggState *aggstate) |
| { |
| HashAggTable *hashtable = aggstate->hhashtable; |
| ExprContext *tmpcontext = aggstate->tmpcontext; /* per input tuple context */ |
| TupleTableSlot *outerslot = NULL; |
| bool streaming = ((Agg *) aggstate->ss.ps.plan)->streaming; |
| bool tuple_remaining = true; |
| MemTupleBinding *mt_bind = aggstate->hashslot->tts_mt_bind; |
| |
| Assert(hashtable); |
| AssertImply(!streaming, hashtable->state == HASHAGG_BEFORE_FIRST_PASS); |
| elog(HHA_MSG_LVL, |
| "HashAgg: initial pass -- beginning to load hash table"); |
| |
| /* If we found cached workfiles, initialize and load the batch data here */ |
| if (gp_workfile_caching && aggstate->cached_workfiles_found) |
| { |
| elog(HHA_MSG_LVL, "Found existing SFS, reloading data from %s", hashtable->work_set->path); |
| /* Initialize all structures as if we just spilled everything */ |
| hashtable->spill_set = read_spill_set(aggstate); |
| aggstate->hhashtable->is_spilling = true; |
| aggstate->cached_workfiles_loaded = true; |
| |
| elog(gp_workfile_caching_loglevel, "HashAgg reusing cached workfiles, initiating Squelch walker"); |
| PlanState *outerNode = outerPlanState(aggstate); |
| ExecSquelchNode(outerNode); |
| |
| /* tuple table initialization */ |
| ScanState *scanstate = & aggstate->ss; |
| PlanState *outerPlan = outerPlanState(scanstate); |
| TupleDesc tupDesc = ExecGetResultType(outerPlan); |
| |
| if (aggstate->ss.ps.instrument) |
| { |
| aggstate->ss.ps.instrument->workfileReused = true; |
| } |
| |
| /* Initialize hashslot by cloning input slot. */ |
| ExecSetSlotDescriptor(aggstate->hashslot, tupDesc); |
| ExecStoreAllNullTuple(aggstate->hashslot); |
| mt_bind = aggstate->hashslot->tts_mt_bind; |
| |
| |
| return tuple_remaining; |
| } |
| |
| /* |
| * Check if an input tuple has been read, but not processed |
| * because of lack of space before streaming the results |
| * in the last call. |
| */ |
| if (aggstate->hashslot->tts_tupleDescriptor != NULL && |
| hashtable->prev_slot != NULL) |
| { |
| outerslot = hashtable->prev_slot; |
| hashtable->prev_slot = NULL; |
| } |
| |
| else |
| { |
| outerslot = ExecProcNode(outerPlanState(aggstate)); |
| } |
| |
| /* |
| * Process outer-plan tuples, until we exhaust the outer plan. |
| */ |
| hashtable->pass = 0; |
| |
| while(true) |
| { |
| HashKey hashkey; |
| bool isNew; |
| HashAggEntry *entry; |
| |
		/* no more tuples; done */
| if (TupIsNull(outerslot)) |
| { |
| tuple_remaining = false; |
| break; |
| } |
| |
| Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_QEXEC_M_ROWSIN); |
| |
| if (aggstate->hashslot->tts_tupleDescriptor == NULL) |
| { |
| int size; |
| |
| /* Initialize hashslot by cloning input slot. */ |
| ExecSetSlotDescriptor(aggstate->hashslot, outerslot->tts_tupleDescriptor); |
| ExecStoreAllNullTuple(aggstate->hashslot); |
| mt_bind = aggstate->hashslot->tts_mt_bind; |
| |
| size = ((Agg *)aggstate->ss.ps.plan)->numCols * sizeof(HashKey); |
| |
| hashtable->hashkey_buf = (HashKey *)palloc0(size); |
| hashtable->mem_for_metadata += size; |
| } |
| |
| /* set up for advance_aggregates call */ |
| tmpcontext->ecxt_scantuple = outerslot; |
| |
| /* Find or (if there's room) build a hash table entry for the |
| * input tuple's group. */ |
| hashkey = calc_hash_value(aggstate, outerslot); |
| entry = lookup_agg_hash_entry(aggstate, (void *)outerslot, |
| INPUT_RECORD_TUPLE, 0, hashkey, 0, &isNew); |
| |
| if (entry == NULL) |
| { |
| if (GET_TOTAL_USED_SIZE(hashtable) > hashtable->mem_used) |
| hashtable->mem_used = GET_TOTAL_USED_SIZE(hashtable); |
| |
| if (hashtable->num_ht_groups <= 1) |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
					 errmsg(ERRMSG_GP_INSUFFICIENT_STATEMENT_MEMORY)));
| |
| /* |
| * If stream_bottom is on, we store outerslot into hashslot, so that |
| * we can process it later. |
| */ |
| if (streaming) |
| { |
| Assert(tuple_remaining); |
| hashtable->prev_slot = outerslot; |
| break; |
| } |
| |
| /* CDB: Report statistics for EXPLAIN ANALYZE. */ |
| if (!hashtable->is_spilling && aggstate->ss.ps.instrument) |
| agg_hash_table_stat_upd(hashtable); |
| |
| spill_hash_table(aggstate); |
| |
| entry = lookup_agg_hash_entry(aggstate, (void *)outerslot, |
| INPUT_RECORD_TUPLE, 0, hashkey, 0, &isNew); |
| } |
| |
| setGroupAggs(hashtable, mt_bind, entry); |
| |
| if (isNew) |
| { |
| int tup_len = memtuple_get_size((MemTuple)entry->tuple_and_aggs, mt_bind); |
| MemSet((char *)entry->tuple_and_aggs + MAXALIGN(tup_len), 0, |
| aggstate->numaggs * sizeof(AggStatePerGroupData)); |
| initialize_aggregates(aggstate, aggstate->peragg, hashtable->groupaggs->aggs, |
| &(aggstate->mem_manager)); |
| } |
| |
| /* Advance the aggregates */ |
| advance_aggregates(aggstate, hashtable->groupaggs->aggs, &(aggstate->mem_manager)); |
| |
| hashtable->num_tuples++; |
| |
| /* Reset per-input-tuple context after each tuple */ |
| ResetExprContext(tmpcontext); |
| |
| if (streaming && !HAVE_FREESPACE(hashtable)) |
| { |
| Assert(tuple_remaining); |
| ExecClearTuple(aggstate->hashslot); |
| break; |
| } |
| |
| /* Read the next tuple */ |
| outerslot = ExecProcNode(outerPlanState(aggstate)); |
| } |
| |
| if (GET_TOTAL_USED_SIZE(hashtable) > hashtable->mem_used) |
| hashtable->mem_used = GET_TOTAL_USED_SIZE(hashtable); |
| |
| if (hashtable->is_spilling) |
| { |
| int freed_size = 0; |
| |
| /* |
| * Split out the rest of groups in the hashtable if spilling has already |
| * happened. This is because none of these groups can be immediately outputted |
| * any more. |
| */ |
| spill_hash_table(aggstate); |
| freed_size = suspendSpillFiles(hashtable->spill_set); |
| hashtable->mem_for_metadata -= freed_size; |
| |
| if (aggstate->ss.ps.instrument) |
| { |
| aggstate->ss.ps.instrument->workfileCreated = true; |
| } |
| } |
| |
| /* CDB: Report statistics for EXPLAIN ANALYZE. */ |
| if (!hashtable->is_spilling && aggstate->ss.ps.instrument) |
| agg_hash_table_stat_upd(hashtable); |
| |
| AssertImply(tuple_remaining, streaming); |
| if(tuple_remaining) |
| elog(HHA_MSG_LVL, "HashAgg: streaming out the intermediate results."); |
| |
| return tuple_remaining; |
| } |
| |
/* Create a spill set for the given branching_factor (a power of two)
 * and parent hash bit.
 *
 * Each spill file in the set records parent_hash_bit as its batch_hash_bit,
 * the position of the lowest hash bit used to distribute entries among the
 * files of this set. This distributes hash key values evenly across the set.
 */
| static SpillSet * |
| createSpillSet(unsigned branching_factor, unsigned parent_hash_bit) |
| { |
| int i; |
| SpillSet *spill_set; |
| |
| /* Allocate and initialize the SpillSet. */ |
| spill_set = palloc(sizeof(SpillSet) + (branching_factor-1) * sizeof (SpillFile)); |
| spill_set->parent_spill_file = NULL; |
| spill_set->level = 0; |
| spill_set->num_spill_files = branching_factor; |
| |
| /* Allocate and initialize its SpillFiles. */ |
| for ( i = 0; i < branching_factor; i++ ) |
| { |
| SpillFile *spill_file = &spill_set->spill_files[i]; |
| |
| spill_file->file_info = NULL; |
| spill_file->spill_set = NULL; |
| spill_file->parent_spill_set = spill_set; |
| spill_file->index_in_parent = i; |
| spill_file->respilled = false; |
| spill_file->batch_hash_bit = parent_hash_bit; |
| } |
| |
| elog(HHA_MSG_LVL, "HashAgg: created a new spill set with batch_hash_bit=%u, num_spill_files=%u", |
| parent_hash_bit, branching_factor); |
| |
| return spill_set; |
| } |
| |
| /* |
 * Free the space for a given SpillSet, and return the number of bytes freed.
| */ |
| static inline int |
| freeSpillSet(SpillSet *spill_set) |
| { |
| int freedspace = 0; |
| if (spill_set == NULL) |
| { |
| return freedspace; |
| } |
| |
| elog(gp_workfile_caching_loglevel, "freeing up SpillSet with %d files", spill_set->num_spill_files); |
| |
| freedspace += sizeof(SpillSet) + (spill_set->num_spill_files - 1) * sizeof (SpillFile); |
| pfree(spill_set); |
| |
| return freedspace; |
| } |
| |
| |
/* Get the spill file for the given file number within the given spill set.
 *
 * If the temporary work file for this spill file has not been created yet,
 * it is created here. The buffer space required for this temporary file is
 * returned through 'p_alloc_size'.
 */
| static SpillFile * |
| getSpillFile(workfile_set *work_set, SpillSet *set, int file_no, int *p_alloc_size) |
| { |
| SpillFile *spill_file; |
| |
| *p_alloc_size = 0; |
| |
| /* Find the right spill file */ |
| Assert(set != NULL); |
| spill_file = &set->spill_files[file_no]; |
| |
| if (spill_file->file_info == NULL) |
| { |
| |
| spill_file->file_info = (BatchFileInfo *)palloc(sizeof(BatchFileInfo)); |
| spill_file->file_info->total_bytes = 0; |
| spill_file->file_info->ntuples = 0; |
| /* Initialize to NULL in case the create function below throws an exception */ |
| spill_file->file_info->wfile = NULL; |
| spill_file->file_info->wfile = workfile_mgr_create_file(work_set); |
| |
| elog(HHA_MSG_LVL, "HashAgg: create %d level batch file %d with compression %d", |
| set->level, file_no, work_set->metadata.bfz_compress_type); |
| |
| *p_alloc_size = BATCHFILE_METADATA; |
| } |
| |
| return spill_file; |
| } |
| |
| /* |
| * suspendSpillFiles -- temporary suspend all spill files so that we |
| * can have more space for the hash table. |
| */ |
| static int |
| suspendSpillFiles(SpillSet *spill_set) |
| { |
| int file_no; |
| int freed_size = 0; |
| |
| if (spill_set == NULL || |
| spill_set->num_spill_files == 0) |
| return 0; |
| |
| for (file_no = 0; file_no < spill_set->num_spill_files; file_no++) |
| { |
| SpillFile *spill_file = &spill_set->spill_files[file_no]; |
| |
| if (spill_file->file_info && |
| spill_file->file_info->wfile != NULL) |
| { |
| ExecWorkFile_Suspend(spill_file->file_info->wfile); |
| |
| freed_size += FREEABLE_BATCHFILE_METADATA; |
| |
| elog(HHA_MSG_LVL, "HashAgg: %s contains " INT64_FORMAT " entries (" |
| INT64_FORMAT " bytes)", |
| spill_file->file_info->wfile->fileName, |
| spill_file->file_info->ntuples, spill_file->file_info->total_bytes); |
| } |
| } |
| |
| return freed_size; |
| } |
| |
| /* |
| * closeSpillFile -- close a given spill file and return its freed buffer |
| * space. All files under its spill_set are also closed. |
| */ |
| static int |
| closeSpillFile(AggState *aggstate, SpillSet *spill_set, int file_no) |
| { |
| int freedspace = 0; |
| SpillFile *spill_file; |
| HashAggTable *hashtable = aggstate->hhashtable; |
| |
| Assert(spill_set != NULL && file_no < spill_set->num_spill_files); |
| |
| spill_file = &spill_set->spill_files[file_no]; |
| |
| if (spill_file->file_info && |
| gp_workfile_caching && |
| aggstate->workfiles_created && |
| !spill_file->respilled) |
| { |
| Assert(hashtable->state_file); |
| /* closing "leaf" spill file; save it's name to the state file for re-using */ |
| agg_hash_save_spillfile_info(hashtable->state_file, spill_file); |
| hashtable->work_set->metadata.num_leaf_files++; |
| } |
| |
| if (spill_file->spill_set != NULL) |
| { |
| freedspace += closeSpillFiles(aggstate, spill_file->spill_set); |
| } |
| |
| if (spill_file->file_info && |
| spill_file->file_info->wfile != NULL) |
| { |
| workfile_mgr_close_file(hashtable->work_set, spill_file->file_info->wfile); |
| spill_file->file_info->wfile = NULL; |
| freedspace += (BATCHFILE_METADATA - sizeof(BatchFileInfo)); |
| |
| } |
| if (spill_file->file_info) |
| { |
| pfree(spill_file->file_info); |
| spill_file->file_info = NULL; |
| freedspace += sizeof(BatchFileInfo); |
| } |
| |
| return freedspace; |
| } |
| |
| /* |
| * closeSpillFiles -- close all spill files for a given spill set to |
| * save the buffer space, and return how much space freed. |
| */ |
| static int |
| closeSpillFiles(AggState *aggstate, SpillSet *spill_set) |
| { |
| int file_no; |
| int freedspace = 0; |
| |
| if (spill_set == NULL || |
| spill_set->num_spill_files == 0) |
| return 0; |
| |
| for (file_no = 0; file_no < spill_set->num_spill_files; file_no++) |
| { |
| freedspace += closeSpillFile(aggstate, spill_set, file_no); |
| } |
| |
| return freedspace; |
| } |
| |
| /* |
| * Obtain the spill set to which the overflown entries are spilled. |
| * |
| * If such a spill set does not exist, it is created here. |
| * |
| * The statistics in the hashtable is also updated. |
| */ |
| static SpillSet * |
| obtain_spill_set(HashAggTable *hashtable) |
| { |
| SpillSet **p_spill_set; |
| unsigned set_hash_bit = 0; |
| |
| if (hashtable->curr_spill_file != NULL) |
| { |
| SpillSet *parent_spill_set; |
| p_spill_set = &(hashtable->curr_spill_file->spill_set); |
| parent_spill_set = hashtable->curr_spill_file->parent_spill_set; |
| Assert(parent_spill_set != NULL); |
| unsigned parent_hash_bit = hashtable->curr_spill_file->batch_hash_bit; |
| set_hash_bit = parent_hash_bit + |
| (unsigned)ceil(log(parent_spill_set->num_spill_files)/log(2)); |
| } |
| else |
| p_spill_set = &(hashtable->spill_set); |
| |
| if (*p_spill_set == NULL) |
| { |
| /* |
| * The optimizer may estimate that there is no need for a spill. However, |
| * it is wrong in this case. We need to set nbatches to its rightful value. |
| */ |
| if (hashtable->hats.nbatches == 0) |
| { |
| elog(DEBUG2, "Not all groups fit into memory; writing to disk"); |
| hashtable->hats.nbatches = gp_hashagg_default_nbatches; |
| } |
| |
| *p_spill_set = createSpillSet(hashtable->hats.nbatches, set_hash_bit); |
| |
| hashtable->num_overflows++; |
| hashtable->mem_for_metadata += |
| sizeof(SpillSet) + |
| (hashtable->hats.nbatches - 1) * sizeof(SpillFile); |
| |
| SANITY_CHECK_METADATA_SIZE(hashtable); |
| |
| if (hashtable->curr_spill_file != NULL) |
| { |
| hashtable->curr_spill_file->spill_set->level = |
| hashtable->curr_spill_file->parent_spill_set->level + 1; |
| hashtable->curr_spill_file->spill_set->parent_spill_file = |
| hashtable->curr_spill_file; |
| } |
| } |
| |
| return *p_spill_set; |
| } |
| |
| /* |
| * read_spill_set |
| * Read a previously written spill file set. |
| * |
 * The statistics in the hashtable are also updated.
| */ |
| static SpillSet * |
| read_spill_set(AggState *aggstate) |
| { |
| Assert(aggstate != NULL); |
| Assert(aggstate->hhashtable != NULL); |
| Assert(aggstate->hhashtable->spill_set == NULL); |
| Assert(aggstate->hhashtable->curr_spill_file == NULL); |
| Assert(aggstate->hhashtable->work_set); |
| Assert(aggstate->hhashtable->work_set->metadata.num_leaf_files == aggstate->hhashtable->num_batches); |
| Assert(aggstate->hhashtable->work_set->metadata.buckets == aggstate->hhashtable->nbuckets); |
| |
| workfile_set *work_set = aggstate->hhashtable->work_set; |
| uint32 alloc_size = 0; |
| HashAggTable *hashtable = aggstate->hhashtable; |
| |
| /* |
| * Create spill set. Initialize each batch hash bit with 0. We'll set them to the right |
| * value individually below. |
| */ |
| int default_hash_bit = 0; |
| SpillSet *spill_set = createSpillSet(work_set->metadata.num_leaf_files, default_hash_bit); |
| |
| /* |
| * Read metadata file to determine number and name of work files in the set |
| * Format of state file: |
| * - [name_of_leaf_workfile | batch_hash_bit] x N |
| */ |
| hashtable->state_file = workfile_mgr_open_fileno(work_set, WORKFILE_NUM_HASHAGG_METADATA); |
| Assert(hashtable->state_file != NULL); |
| |
| /* |
| * Read, allocate and open all spill files. |
| * The spill files are opened in reverse order when saving tuples, |
| * so re-open them in the same order. |
| */ |
| uint32 no_filenames_read = 0; |
| while(true) |
| { |
| |
| char *batch_filename = NULL; |
| unsigned read_batch_hashbit; |
| bool more_spillfiles = agg_hash_load_spillfile_info(hashtable->state_file, &batch_filename, &read_batch_hashbit); |
| if (!more_spillfiles) |
| { |
| break; |
| } |
| |
| uint32 current_spill_file_no = work_set->metadata.num_leaf_files - no_filenames_read - 1; |
		Assert(no_filenames_read < work_set->metadata.num_leaf_files);
| |
| SpillFile *spill_file = &spill_set->spill_files[current_spill_file_no]; |
| Assert(spill_file->index_in_parent == current_spill_file_no); |
| spill_file->batch_hash_bit = read_batch_hashbit; |
| |
| spill_file->file_info = (BatchFileInfo *)palloc(sizeof(BatchFileInfo)); |
| |
| spill_file->file_info->wfile = |
| ExecWorkFile_Open(batch_filename, BFZ, false /* delOnClose */, |
| work_set->metadata.bfz_compress_type); |
| |
| Assert(spill_file->file_info->wfile != NULL); |
| Assert(batch_filename != NULL); |
| pfree(batch_filename); |
| |
| spill_file->file_info->total_bytes = ExecWorkFile_GetSize(spill_file->file_info->wfile); |
		/* Made-up value, since we don't know the real tuple count at this point */
| spill_file->file_info->ntuples = 1; |
| |
| elog(HHA_MSG_LVL, "HashAgg: OPEN %d level batch file %d with compression %d", |
| spill_set->level, no_filenames_read, work_set->metadata.bfz_compress_type); |
| /* |
| * bfz_open automatically frees up the freeable_stuff structure. |
| * Subtract that from the allocated size here. |
| */ |
| alloc_size += BATCHFILE_METADATA - FREEABLE_BATCHFILE_METADATA; |
| |
| no_filenames_read++; |
| } |
| |
| Assert(work_set->metadata.num_leaf_files == no_filenames_read); |
| |
| /* Update statistics */ |
| hashtable->num_overflows++; |
| hashtable->mem_for_metadata += |
| sizeof(SpillSet) + |
| (hashtable->hats.nbatches - 1) * sizeof(SpillFile); |
| #ifdef USE_ASSERT_CHECKING |
| SANITY_CHECK_METADATA_SIZE(hashtable); |
| #endif |
| |
| hashtable->mem_for_metadata += alloc_size; |
| if (alloc_size > 0) |
| { |
| Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_AGG_SPILLBATCH); |
| CheckSendPlanStateGpmonPkt(&aggstate->ss.ps); |
| } |
| |
| |
| return spill_set; |
| } |
| |
/* Spill all entries from the hash table to file in order to make room
 * for new hash entries.
 *
 * We want to maximize sequential writes to each spill file. Since both
 * the number of buckets and the number of batches (#batches) are powers
 * of 2, we simply write buckets 0, #batches, 2 * #batches, ... to batch 0;
 * buckets 1, #batches + 1, 2 * #batches + 1, ... to batch 1; and so on.
 * For example, with 4 batches and 16 buckets, batch 0 receives buckets
 * 0, 4, 8 and 12.
 */
| static void |
| spill_hash_table(AggState *aggstate) |
| { |
| HashAggTable *hashtable = aggstate->hhashtable; |
| SpillSet *spill_set; |
| SpillFile *spill_file; |
| int bucket_no; |
| int file_no; |
| MemoryContext oldcxt; |
| uint64 old_num_spill_groups = hashtable->num_spill_groups; |
| |
| spill_set = obtain_spill_set(hashtable); |
| |
| oldcxt = MemoryContextSwitchTo(hashtable->entry_cxt); |
| |
	/* Use the hash table's existing workfile_set, or create a new one as needed */
| if (hashtable->work_set == NULL) |
| { |
| hashtable->work_set = workfile_mgr_create_set(BFZ, true /* can_be_reused */, &aggstate->ss.ps, NULL_SNAPSHOT); |
| hashtable->work_set->metadata.buckets = hashtable->nbuckets; |
| if (gp_workfile_caching) |
| { |
| create_state_file(hashtable); |
| } |
| aggstate->workfiles_created = true; |
| } |
| |
| /* Book keeping. */ |
| hashtable->is_spilling = true; |
| |
| Assert(hashtable->nbuckets > spill_set->num_spill_files); |
| |
| /* |
| * Write each spill file. Write the last spill file first, since it will |
| * be processed the last. |
| */ |
| for (file_no = spill_set->num_spill_files - 1; file_no >= 0; file_no--) |
| { |
| int alloc_size = 0; |
| |
| spill_file = getSpillFile(hashtable->work_set, spill_set, file_no, &alloc_size); |
| Assert(spill_file != NULL); |
| |
| hashtable->mem_for_metadata += alloc_size; |
| if (alloc_size > 0) |
| { |
| #ifdef USE_ASSERT_CHECKING |
| SANITY_CHECK_METADATA_SIZE(hashtable); |
| #endif |
| hashtable->num_batches++; |
| |
| Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_AGG_SPILLBATCH); |
| Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_AGG_CURRSPILLPASS_BATCH); |
| |
| CheckSendPlanStateGpmonPkt(&aggstate->ss.ps); |
| } |
| |
| for (bucket_no = file_no; bucket_no < hashtable->nbuckets; |
| bucket_no += spill_set->num_spill_files) |
| { |
| HashAggEntry *entry = hashtable->buckets[bucket_no]; |
| |
| /* Ignore empty chains. */ |
| if (entry == NULL) continue; |
| |
| /* Write all entries in the hash chain. */ |
| while ( entry != NULL ) |
| { |
| HashAggEntry *spill_entry = entry; |
| entry = spill_entry->next; |
| |
| if (spill_entry != NULL) |
| { |
| int32 written_bytes; |
| |
| written_bytes = writeHashEntry(aggstate, spill_file->file_info, spill_entry); |
| spill_file->file_info->ntuples++; |
| spill_file->file_info->total_bytes += written_bytes; |
| |
| hashtable->num_spill_groups++; |
| |
| Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_AGG_SPILLTUPLE); |
| Gpmon_M_Add(GpmonPktFromAggState(aggstate), GPMON_AGG_SPILLBYTE, written_bytes); |
| |
| Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_AGG_CURRSPILLPASS_TUPLE); |
| Gpmon_M_Add(GpmonPktFromAggState(aggstate), GPMON_AGG_CURRSPILLPASS_BYTE, written_bytes); |
| } |
| } |
| |
| hashtable->buckets[bucket_no] = NULL; |
| } |
| } |
| |
| /* Reset the buffer */ |
| CdbCellBuf_Reset(&(hashtable->entry_buf)); |
| mpool_reset(hashtable->group_buf); |
| |
| elog(HHA_MSG_LVL, "HashAgg: spill " INT64_FORMAT " groups", |
| hashtable->num_spill_groups - old_num_spill_groups); |
| |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| /* |
 * Create and open the state file holding metadata. Used for workfile reuse.
| */ |
| static void |
| create_state_file(HashAggTable *hashtable) |
| { |
| Assert(hashtable != NULL); |
| hashtable->state_file = workfile_mgr_create_fileno(hashtable->work_set, WORKFILE_NUM_HASHAGG_METADATA); |
| Assert(hashtable->state_file != NULL); |
| } |
| |
| /* |
 * Close the state file holding spill set metadata. Used for workfile reuse.
| */ |
| void |
| agg_hash_close_state_file(HashAggTable *hashtable) |
| { |
| if (hashtable->state_file != NULL) |
| { |
| workfile_mgr_close_file(hashtable->work_set, hashtable->state_file); |
| hashtable->state_file = NULL; |
| } |
| } |
| |
| /* |
 * writeHashEntry -- write a hash entry to a batch file.
| * |
| * The hash entry is serialized here, including the tuple that contains |
| * grouping keys and aggregate values. |
| * |
| * readHashEntry() should expect to retrieve hash entries in the |
| * format defined in this function. |
| */ |
| static int32 |
| writeHashEntry(AggState *aggstate, BatchFileInfo *file_info, |
| HashAggEntry *entry) |
| { |
| MemTupleBinding *mt_bind = aggstate->hashslot->tts_mt_bind; |
| int32 tuple_agg_size = 0; |
| int32 total_size = 0; |
| AggStatePerGroup pergroup; |
| int aggno; |
| AggStatePerAgg peragg = aggstate->peragg; |
| |
| Assert(file_info != NULL); |
| Assert(file_info->wfile != NULL); |
| |
| ExecWorkFile_Write(file_info->wfile, (void *)(&(entry->hashvalue)), sizeof(entry->hashvalue)); |
| |
| tuple_agg_size = memtuple_get_size((MemTuple)entry->tuple_and_aggs, mt_bind); |
| pergroup = (AggStatePerGroup) ((char *)entry->tuple_and_aggs + MAXALIGN(tuple_agg_size)); |
| tuple_agg_size = MAXALIGN(tuple_agg_size) + |
| aggstate->numaggs * sizeof(AggStatePerGroupData); |
| total_size = MAXALIGN(tuple_agg_size); |
| |
| for (aggno = 0; aggno < aggstate->numaggs; aggno++) |
| { |
| AggStatePerAgg peraggstate = &peragg[aggno]; |
| AggStatePerGroup pergroupstate = &pergroup[aggno]; |
| |
| if (!peraggstate->transtypeByVal && |
| !pergroupstate->transValueIsNull) |
| { |
| Size datum_size = datumGetSize(pergroupstate->transValue, |
| peraggstate->transtypeByVal, |
| peraggstate->transtypeLen); |
| total_size += MAXALIGN(datum_size); |
| } |
| } |
| |
| ExecWorkFile_Write(file_info->wfile, (char *)&total_size, sizeof(total_size)); |
| ExecWorkFile_Write(file_info->wfile, entry->tuple_and_aggs, tuple_agg_size); |
| Assert(MAXALIGN(tuple_agg_size) - tuple_agg_size <= MAXIMUM_ALIGNOF); |
| if (MAXALIGN(tuple_agg_size) - tuple_agg_size > 0) |
| { |
| ExecWorkFile_Write(file_info->wfile, padding_dummy, MAXALIGN(tuple_agg_size) - tuple_agg_size); |
| } |
| |
| for (aggno = 0; aggno < aggstate->numaggs; aggno++) |
| { |
| AggStatePerAgg peraggstate = &peragg[aggno]; |
| AggStatePerGroup pergroupstate = &pergroup[aggno]; |
| |
| if (!peraggstate->transtypeByVal && |
| !pergroupstate->transValueIsNull) |
| { |
| Size datum_size = datumGetSize(pergroupstate->transValue, |
| peraggstate->transtypeByVal, |
| peraggstate->transtypeLen); |
| ExecWorkFile_Write(file_info->wfile, |
| DatumGetPointer(pergroupstate->transValue), datum_size); |
| Assert(MAXALIGN(datum_size) - datum_size <= MAXIMUM_ALIGNOF); |
| if (MAXALIGN(datum_size) - datum_size > 0) |
| { |
| ExecWorkFile_Write(file_info->wfile, |
| padding_dummy, MAXALIGN(datum_size) - datum_size); |
| } |
| } |
| } |
| |
| return (total_size + sizeof(total_size) + sizeof(entry->hashvalue)); |
| } |
| |
| /* |
| * agg_hash_table_stat_upd |
| * collect hash chain statistics for EXPLAIN ANALYZE |
| */ |
| static void |
| agg_hash_table_stat_upd(HashAggTable *ht) |
| { |
| unsigned int i; |
| |
| char hostname[SEGMENT_IDENTITY_NAME_LENGTH]; |
	gethostname(hostname, SEGMENT_IDENTITY_NAME_LENGTH);
| for (i = 0; i < ht->nbuckets; i++) |
| { |
| HashAggEntry *entry = ht->buckets[i]; |
| int chainlength = 0; |
| |
| if (entry) |
| { |
| for (chainlength = 0; entry; chainlength++) |
| entry = entry->next; |
| |
| cdbexplain_agg_upd(&ht->chainlength, chainlength, i,hostname); |
| } |
| } |
| } /* agg_hash_table_stat_upd */ |
| |
| /* Function: init_agg_hash_iter |
| * |
| * Initialize the HashAggTable's (one and only) entry iterator. */ |
| void init_agg_hash_iter(HashAggTable* hashtable) |
| { |
| Assert( hashtable != NULL && hashtable->buckets != NULL && hashtable->nbuckets > 0 ); |
| |
| hashtable->curr_bucket_idx = -1; |
| hashtable->next_entry = NULL; |
| } |
| |
| /* Function: agg_hash_iter |
| * |
| * Returns a pointer to the next HashAggEntry on the given HashAggTable's |
| * iterator and advances the iterator. Returns NULL when there are no more |
| * entries. Be sure to call init_agg_hash_iter before the first call here. |
| * |
| * During the iteration, this function also writes out entries that belong |
| * to batch files which will be processed later. |
| */ |
| HashAggEntry * |
| agg_hash_iter(AggState *aggstate) |
| { |
| HashAggTable* hashtable = aggstate->hhashtable; |
| HashAggEntry *entry = hashtable->next_entry; |
| SpillSet *spill_set = hashtable->spill_set; |
| MemoryContext oldcxt; |
| |
| Assert( hashtable != NULL && hashtable->buckets != NULL && hashtable->nbuckets > 0 ); |
| |
| if (hashtable->curr_spill_file != NULL) |
| spill_set = hashtable->curr_spill_file->spill_set; |
| |
| oldcxt = MemoryContextSwitchTo(hashtable->entry_cxt); |
| |
| while (entry == NULL && |
| hashtable->nbuckets > ++ hashtable->curr_bucket_idx) |
| { |
| entry = hashtable->buckets[hashtable->curr_bucket_idx]; |
| if (entry != NULL) |
| { |
| Assert(entry->is_primodial); |
| break; |
| } |
| } |
| |
| if (entry != NULL) |
| { |
| hashtable->num_output_groups++; |
| hashtable->next_entry = entry->next; |
| entry->next = NULL; |
| } |
| |
| MemoryContextSwitchTo(oldcxt); |
| |
| return entry; |
| } |
| |
| /* |
 * Read the serialized form of a hash entry from the given batch file.
| * |
| * This function returns a byte array starting with a MemTuple which |
| * represents grouping keys, and being followed by its aggregate values. |
| * The complete format can be found at writeHashEntry(). The size |
| * of the byte array is also returned. |
| * |
| * The byte array is allocated inside the per-tuple memory context. |
| */ |
| static void * |
| readHashEntry(AggState *aggstate, BatchFileInfo *file_info, |
| HashKey *p_hashkey, int32 *p_input_size) |
| { |
| void *tuple_and_aggs = NULL; |
| MemoryContext oldcxt; |
| |
| Assert(file_info != NULL && file_info->wfile != NULL); |
| |
| *p_input_size = 0; |
| |
| if (ExecWorkFile_Read(file_info->wfile, (char *)p_hashkey, sizeof(HashKey)) != sizeof(HashKey)) |
| { |
| return NULL; |
| } |
| |
| if (ExecWorkFile_Read(file_info->wfile, (char *)p_input_size, sizeof(int32)) != |
| sizeof(int32)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("could not read from temporary file: %m"))); |
| } |
| |
| tuple_and_aggs = ExecWorkFile_ReadFromBuffer(file_info->wfile, *p_input_size); |
| if (tuple_and_aggs == NULL) |
| { |
| oldcxt = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); |
| tuple_and_aggs = palloc(*p_input_size); |
| int32 read_size = ExecWorkFile_Read(file_info->wfile, tuple_and_aggs, *p_input_size); |
| if (read_size != *p_input_size) |
| ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("could not read from temporary file, requesting %d bytes, read %d bytes: %m", |
| *p_input_size, read_size))); |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| return tuple_and_aggs; |
| } |
| |
/* Function: agg_hash_stream
 *
 * Call agg_hash_initial_pass (again) to load more input tuples
 * into the hash table. Used only for the streaming lower phase
 * of a multiphase hashed aggregation, to avoid spilling to
 * file.
 *
 * Returns true if all input tuples have been consumed; otherwise
 * returns false (call again).
 */
| bool |
| agg_hash_stream(AggState *aggstate) |
| { |
| Assert( ((Agg *) aggstate->ss.ps.plan)->streaming ); |
| |
| elog(HHA_MSG_LVL, |
| "HashAgg: streaming"); |
| |
| reset_agg_hash_table(aggstate); |
| |
| return agg_hash_initial_pass(aggstate); |
| } |
| |
| /* |
| * Function: agg_hash_reload |
| * |
| * Load spilled groups into the hash table. As in the initial |
| * pass, when the hash table does not have space for new groups, all |
| * groups in the hash table are spilled to overflow batch files. |
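| * |
| * Outline of one reload pass (see the loop below): |
| *   1. restart the batch workfile and read entries one at a time; |
| *   2. look up or insert each group via lookup_agg_hash_entry(); |
| *   3. on overflow, spill the hash table and retry the insert; |
| *   4. for groups already present, merge the spilled transition values |
| *      into the in-memory ones with the preliminary (combine) function. |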
| */ |
| static bool |
| agg_hash_reload(AggState *aggstate) |
| { |
| HashAggTable *hashtable = aggstate->hhashtable; |
| MemTupleBinding *mt_bind = aggstate->hashslot->tts_mt_bind; |
| ExprContext *tmpcontext = aggstate->tmpcontext; /* per input tuple context */ |
| bool has_tuples = false; |
| SpillFile *spill_file = hashtable->curr_spill_file; |
| int reloaded_hash_bit; |
| |
| /* |
| * Record the start value for mem_for_metadata, since its value |
| * has already been accumulated into mem_wanted. Any more memory |
| * added to mem_for_metadata after this point will be added to |
| * mem_wanted at the end of loading hashtable. |
| */ |
| uint64 start_mem_for_metadata = hashtable->mem_for_metadata; |
| |
| Assert(spill_file != NULL && spill_file->parent_spill_set != NULL); |
| |
| hashtable->is_spilling = false; |
| hashtable->num_reloads++; |
| hashtable->total_buckets += hashtable->nbuckets; |
| |
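| /* |
| * Illustration with made-up numbers: if this spill file's parent set |
| * routed groups starting at batch_hash_bit 0 across 32 batch files, |
| * the parent consumed ceil(log2(32)) = 5 hash bits, so any respill |
| * during this reload routes on hash bits starting at 0 + 5 = 5. |
| */ |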
| reloaded_hash_bit = spill_file->batch_hash_bit + |
| (unsigned)ceil(log(spill_file->parent_spill_set->num_spill_files)/log(2)); |
| |
| |
| if (spill_file->file_info != NULL && |
| spill_file->file_info->wfile != NULL) |
| { |
| ExecWorkFile_Restart(spill_file->file_info->wfile); |
| hashtable->mem_for_metadata += FREEABLE_BATCHFILE_METADATA; |
| } |
| |
| while(true) |
| { |
| HashKey hashkey; |
| HashAggEntry *entry; |
| bool isNew = false; |
| int input_size = 0; |
| |
| void *input = readHashEntry(aggstate, spill_file->file_info, &hashkey, &input_size); |
| |
| if (input != NULL) |
| { |
| spill_file->file_info->ntuples--; |
| Assert(spill_file->parent_spill_set != NULL); |
| /* The following asserts the mapping between a hashkey bucket and the index in parent. |
| * This assertion does not hold for reloaded workfiles, since there the index in |
| * parent is different. */ |
| AssertImply(!aggstate->cached_workfiles_loaded, |
| (hashkey >> spill_file->batch_hash_bit) % |
| spill_file->parent_spill_set->num_spill_files == |
| spill_file->index_in_parent); |
| |
| Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_AGG_CURRSPILLPASS_READTUPLE); |
| Gpmon_M_Add(GpmonPktFromAggState(aggstate), GPMON_AGG_CURRSPILLPASS_READBYTE, input_size); |
| } |
| |
| else |
| { |
| /* Check that we processed all tuples, but only when not reusing cached workfiles */ |
| AssertImply(!aggstate->cached_workfiles_loaded, spill_file->file_info->ntuples == 0); |
| break; |
| } |
| |
| has_tuples = true; |
| |
| /* set up for advance_aggregates call */ |
| tmpcontext->ecxt_scantuple = aggstate->hashslot; |
| |
| entry = lookup_agg_hash_entry(aggstate, input, INPUT_RECORD_GROUP_AND_AGGS, input_size, |
| hashkey, reloaded_hash_bit, &isNew); |
| |
| if (entry == NULL) |
| { |
| Assert(!aggstate->cached_workfiles_loaded && "no re-spilling allowed when re-using cached workfiles"); |
| Assert(hashtable->curr_spill_file != NULL); |
| Assert(hashtable->curr_spill_file->parent_spill_set != NULL); |
| |
| if (GET_TOTAL_USED_SIZE(hashtable) > hashtable->mem_used) |
| hashtable->mem_used = GET_TOTAL_USED_SIZE(hashtable); |
| |
| if (hashtable->num_ht_groups <= 1) |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| ERRMSG_GP_INSUFFICIENT_STATEMENT_MEMORY)); |
| |
| /* CDB: Report statistics for EXPLAIN ANALYZE. */ |
| if (!hashtable->is_spilling && aggstate->ss.ps.instrument) |
| agg_hash_table_stat_upd(hashtable); |
| |
| elog(gp_workfile_caching_loglevel, "HashAgg: respill occurring in agg_hash_reload while loading batch data"); |
| |
| spill_hash_table(aggstate); |
| |
| entry = lookup_agg_hash_entry(aggstate, input, INPUT_RECORD_GROUP_AND_AGGS, input_size, |
| hashkey, reloaded_hash_bit, &isNew); |
| } |
| |
| if (!isNew) |
| { |
| int aggno; |
| AggStatePerGroup input_pergroupstate = (AggStatePerGroup) |
| ((char *)input + MAXALIGN(memtuple_get_size((MemTuple) input, mt_bind))); |
| |
| setGroupAggs(hashtable, mt_bind, entry); |
| |
| adjustInputGroup(aggstate, input, mt_bind); |
| |
| /* Advance the aggregates for the group by applying the preliminary (combine) function. */ |
| for (aggno = 0; aggno < aggstate->numaggs; aggno++) |
| { |
| AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; |
| AggStatePerGroup pergroupstate = &hashtable->groupaggs->aggs[aggno]; |
| FunctionCallInfoData fcinfo; |
| |
| /* Set the input aggregate values */ |
| fcinfo.arg[1] = input_pergroupstate[aggno].transValue; |
| fcinfo.argnull[1] = input_pergroupstate[aggno].transValueIsNull; |
| |
| pergroupstate->transValue = |
| invoke_agg_trans_func(&(peraggstate->prelimfn), |
| peraggstate->prelimfn.fn_nargs - 1, |
| pergroupstate->transValue, |
| &(pergroupstate->noTransValue), |
| &(pergroupstate->transValueIsNull), |
| peraggstate->transtypeByVal, |
| peraggstate->transtypeLen, |
| &fcinfo, (void *)aggstate, |
| aggstate->tmpcontext->ecxt_per_tuple_memory, |
| &(aggstate->mem_manager)); |
| Assert(peraggstate->transtypeByVal || |
| (pergroupstate->transValueIsNull || |
| PointerIsValid(DatumGetPointer(pergroupstate->transValue)))); |
| |
| } |
| } |
| |
| /* Reset per-input-tuple context after each tuple */ |
| ResetExprContext(tmpcontext); |
| } |
| |
| CheckSendPlanStateGpmonPkt(&aggstate->ss.ps); |
| |
| if (hashtable->is_spilling) |
| { |
| int freed_size = 0; |
| |
| /* |
| * Spill the rest of the groups in the hashtable if spilling has already |
| * happened, since none of these groups can be output immediately |
| * anymore. |
| */ |
| spill_hash_table(aggstate); |
| freed_size = suspendSpillFiles(hashtable->curr_spill_file->spill_set); |
| hashtable->mem_for_metadata -= freed_size; |
| elog(gp_workfile_caching_loglevel, "loaded hashtable from file %s and then respilled. we should delete file from work_set now", |
| hashtable->curr_spill_file->file_info->wfile->fileName); |
| hashtable->curr_spill_file->respilled = true; |
| } |
| |
| else |
| { |
| /* |
| * Update the workmemwanted value when the hashtable is not spilling. |
| * At this point, we know that groups in this hashtable will not appear |
| * again later. |
| */ |
| Assert(hashtable->mem_for_metadata >= start_mem_for_metadata); |
| hashtable->mem_wanted += |
| ((hashtable->mem_for_metadata - start_mem_for_metadata) + |
| GET_BUFFER_SIZE(hashtable)); |
| } |
| |
| /* CDB: Report statistics for EXPLAIN ANALYZE. */ |
| if (!hashtable->is_spilling && aggstate->ss.ps.instrument) |
| agg_hash_table_stat_upd(hashtable); |
| |
| return has_tuples; |
| } |
| |
| /* |
| * Function: reCalcNumberBatches |
| * |
| * Recalculate the number of batches based on the statistics we collected |
| * for a given spill file. This function caps the number of |
| * batches at the default -- gp_hashagg_default_nbatches. |
| * |
| * Note that we may over-estimate the number of batches, but that is |
| * still better than under-estimating it. |
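| * |
| * Worked example (made-up numbers): with max_mem = 1 MB, |
| * mem_for_metadata = 100 KB, and total_bytes = 5 MB, the first |
| * estimate is ceil(5120 KB / 900 KB) = 6 batches. Assuming the |
| * re-estimate after reserving 6 * BATCHFILE_METADATA of buffer space |
| * still yields 6, that is rounded up to a power of 2 (8) and then |
| * capped at gp_hashagg_default_nbatches. |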
| */ |
| static void |
| reCalcNumberBatches(HashAggTable *hashtable, SpillFile *spill_file) |
| { |
| unsigned nbatches; |
| double metadata_size; |
| uint64 total_bytes; |
| |
| Assert(spill_file->file_info != NULL); |
| Assert(hashtable->max_mem > hashtable->mem_for_metadata); |
| |
| total_bytes = spill_file->file_info->total_bytes + |
| spill_file->file_info->ntuples * sizeof(HashAggEntry); |
| |
| nbatches = |
| (total_bytes - 1) / |
| (hashtable->max_mem - hashtable->mem_for_metadata) + 1; |
| |
| /* Also need to deduct the batch file buffer size */ |
| metadata_size = hashtable->mem_for_metadata + |
| nbatches * BATCHFILE_METADATA; |
| |
| if (metadata_size < hashtable->max_mem) |
| nbatches = (total_bytes - 1) / |
| (hashtable->max_mem - metadata_size) + 1; |
| else |
| nbatches = 4; |
| |
| /* We never respill to a single batch file. */ |
| if (nbatches == 1) |
| nbatches = 4; |
| |
| /* Round the number of batches up to the next power of 2 */ |
| nbatches = (((unsigned)1) << (unsigned)ceil(log(nbatches) / log(2))); |
| |
| /* |
| * Cap the number of batches at gp_hashagg_default_nbatches. |
| */ |
| if (nbatches > gp_hashagg_default_nbatches) |
| nbatches = gp_hashagg_default_nbatches; |
| |
| if (hashtable->mem_for_metadata + |
| nbatches * BATCHFILE_METADATA > hashtable->max_mem) |
| ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| ERRMSG_GP_INSUFFICIENT_STATEMENT_MEMORY)); |
| |
| hashtable->hats.nbatches = nbatches; |
| } |
| |
| /* |
| * Function: agg_hash_next_pass |
| * |
| * This function identifies a batch file to be processed next. |
| * |
| * When there are no batch files left, this function returns false. |
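| * |
| * Batch files form a tree: a batch that respilled owns a child spill |
| * set. The code below walks this tree depth-first, resuming from the |
| * current spill file: it descends into a child spill set when one |
| * exists, otherwise advances to the next sibling, and frees each |
| * exhausted spill set before continuing at the parent's next file. |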
| */ |
| bool |
| agg_hash_next_pass(AggState *aggstate) |
| { |
| HashAggTable *hashtable = aggstate->hhashtable; |
| SpillSet *spill_set; |
| int file_no; |
| bool more = false; |
| |
| Assert( !((Agg *) aggstate->ss.ps.plan)->streaming ); |
| |
| if (hashtable->spill_set == NULL) |
| return false; |
| |
| reset_agg_hash_table(aggstate); |
| |
| elog(HHA_MSG_LVL, "HashAgg: output " INT64_FORMAT " groups.", hashtable->num_output_groups); |
| |
| if (hashtable->curr_spill_file == NULL) |
| { |
| spill_set = hashtable->spill_set; |
| file_no = 0; |
| } |
| else if (hashtable->curr_spill_file->spill_set != NULL) |
| { |
| spill_set = hashtable->curr_spill_file->spill_set; |
| file_no = 0; |
| } |
| else |
| { |
| spill_set = hashtable->curr_spill_file->parent_spill_set; |
| file_no = hashtable->curr_spill_file->index_in_parent; |
| |
| /* |
| * Close the current spill file since it is finished; its buffer space |
| * can be freed for reuse. |
| */ |
| hashtable->mem_for_metadata -= closeSpillFile(aggstate, spill_set, file_no); |
| file_no++; |
| } |
| |
| /* Find the next SpillSet */ |
| while (spill_set != NULL) |
| { |
| SpillFile *parent_spillfile; |
| int freespace = 0; |
| |
| while (file_no < spill_set->num_spill_files) |
| { |
| if (spill_set->spill_files[file_no].file_info == NULL) |
| { |
| /* Gap in spill_files array, skip it */ |
| file_no++; |
| continue; |
| } |
| |
| Assert(spill_set->spill_files[file_no].file_info != NULL); |
| if (spill_set->spill_files[file_no].file_info->ntuples == 0) |
| { |
| /* Batch file with no tuples in it, close it and skip it */ |
| Assert(spill_set->spill_files[file_no].file_info->total_bytes == 0); |
| elog(HHA_MSG_LVL, "Skipping and closing empty batch file HashAgg_Slice%d_Batch_l%d_f%d", |
| currentSliceId, |
| spill_set->level, file_no); |
| /* |
| * Close this spill file since it is empty; its buffer space |
| * can be freed for reuse. |
| */ |
| hashtable->mem_for_metadata -= closeSpillFile(aggstate, spill_set, file_no); |
| |
| file_no++; |
| continue; |
| } |
| |
| /* Valid spill file, we're done */ |
| break; |
| } |
| |
| if (file_no < spill_set->num_spill_files) |
| break; |
| |
| /* If this spill set is the root, break the loop. */ |
| if (spill_set->parent_spill_file == NULL) |
| { |
| freespace = freeSpillSet(spill_set); |
| hashtable->mem_for_metadata -= freespace; |
| spill_set = NULL; |
| break; |
| } |
| |
| parent_spillfile = spill_set->parent_spill_file; |
| spill_set = spill_set->parent_spill_file->parent_spill_set; |
| |
| freespace += freeSpillSet(parent_spillfile->spill_set); |
| hashtable->mem_for_metadata -= freespace; |
| parent_spillfile->spill_set = NULL; |
| |
| file_no = parent_spillfile->index_in_parent + 1; |
| |
| /* Close parent_spillfile */ |
| hashtable->mem_for_metadata -= closeSpillFile(aggstate, spill_set, |
| parent_spillfile->index_in_parent); |
| } |
| |
| if (spill_set != NULL) |
| { |
| Assert(file_no < spill_set->num_spill_files); |
| |
| hashtable->curr_spill_file = &(spill_set->spill_files[file_no]); |
| |
| /* |
| * Reset the number of batches if respilling is required. |
| * Note that we may over-estimate the number of batches, but that is |
| * still better than under-estimating it. |
| */ |
| reCalcNumberBatches(hashtable, hashtable->curr_spill_file); |
| |
| elog(HHA_MSG_LVL, "HashAgg: processing %d level batch file %d", |
| spill_set->level, file_no); |
| |
| more = agg_hash_reload(aggstate); |
| } |
| else |
| { |
| hashtable->curr_spill_file = NULL; |
| hashtable->spill_set = NULL; |
| } |
| |
| /* Report statistics for EXPLAIN ANALYZE. */ |
| if (!more && aggstate->ss.ps.instrument) |
| { |
| Instrumentation *instr = aggstate->ss.ps.instrument; |
| StringInfo hbuf = aggstate->ss.ps.cdbexplainbuf; |
| |
| instr->workmemwanted = Max(instr->workmemwanted, hashtable->mem_wanted); |
| instr->workmemused = hashtable->mem_used; |
| |
| appendStringInfo(hbuf, |
| INT64_FORMAT " groups total in %d batches", |
| hashtable->num_output_groups, |
| hashtable->num_batches); |
| |
| if (!aggstate->cached_workfiles_loaded) |
| { |
| appendStringInfo(hbuf, |
| "; %d overflows" |
| "; " INT64_FORMAT " spill groups", |
| hashtable->num_overflows, |
| hashtable->num_spill_groups); |
| } |
| |
| appendStringInfo(hbuf, ".\n"); |
| |
| /* Hash chain statistics */ |
| if (hashtable->chainlength.vcnt > 0) |
| appendStringInfo(hbuf, |
| "Hash chain length %.1f avg, %.0f max," |
| " using %d of " INT64_FORMAT " buckets.\n", |
| cdbexplain_agg_avg(&hashtable->chainlength), |
| hashtable->chainlength.vmax, |
| hashtable->chainlength.vcnt, |
| hashtable->total_buckets); |
| } |
| |
| |
| return more; |
| } |
| |
| /* Function: reset_agg_hash_table |
| * |
| * Clear the hash table content anchored by the bucket array. |
| */ |
| void reset_agg_hash_table(AggState *aggstate) |
| { |
| HashAggTable *hashtable = aggstate->hhashtable; |
| |
| elog(HHA_MSG_LVL, |
| "HashAgg: resetting " INT64_FORMAT "-entry hash table", |
| hashtable->num_ht_groups); |
| |
| MemSet(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashAggEntry*)); |
| MemSet(hashtable->bloom, 0, hashtable->nbuckets * sizeof(uint64)); |
| hashtable->num_ht_groups = 0; |
| |
| CdbCellBuf_Reset(&(hashtable->entry_buf)); |
| mpool_reset(hashtable->group_buf); |
| |
| init_agg_hash_iter(hashtable); |
| |
| Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_AGG_SPILLPASS); |
| /* Current-pass read-tuple counters must be reset before the current-pass spill-tuple counters */ |
| Gpmon_M_Reset(GpmonPktFromAggState(aggstate), GPMON_AGG_CURRSPILLPASS_READTUPLE); |
| Gpmon_M_Reset(GpmonPktFromAggState(aggstate), GPMON_AGG_CURRSPILLPASS_READBYTE); |
| Gpmon_M_Reset(GpmonPktFromAggState(aggstate), GPMON_AGG_CURRSPILLPASS_TUPLE); |
| Gpmon_M_Reset(GpmonPktFromAggState(aggstate), GPMON_AGG_CURRSPILLPASS_BYTE); |
| Gpmon_M_Reset(GpmonPktFromAggState(aggstate), GPMON_AGG_CURRSPILLPASS_BATCH); |
| |
| CheckSendPlanStateGpmonPkt(&aggstate->ss.ps); |
| } |
| |
| /* Function: destroy_agg_hash_table |
| * |
| * Release the resources anchored by the hash table. OK to call even |
| * if the hash table isn't set up. |
| */ |
| void destroy_agg_hash_table(AggState *aggstate) |
| { |
| Agg *agg = (Agg*)aggstate->ss.ps.plan; |
| |
| if ( agg->aggstrategy == AGG_HASHED && aggstate->hhashtable != NULL ) |
| { |
| elog(HHA_MSG_LVL, |
| "HashAgg: destroying hash table -- ngroup=" INT64_FORMAT " ntuple=" INT64_FORMAT, |
| aggstate->hhashtable->num_ht_groups, |
| aggstate->hhashtable->num_tuples); |
| |
| reset_agg_hash_table(aggstate); |
| |
| /* destroy_batches(aggstate->hhashtable); */ |
| pfree(aggstate->hhashtable->buckets); |
| pfree(aggstate->hhashtable->bloom); |
| if (aggstate->hhashtable->hashkey_buf) |
| pfree(aggstate->hhashtable->hashkey_buf); |
| |
| closeSpillFiles(aggstate, aggstate->hhashtable->spill_set); |
| |
| if (NULL != aggstate->hhashtable->work_set) |
| { |
| agg_hash_close_state_file(aggstate->hhashtable); |
| workfile_mgr_close_set(aggstate->hhashtable->work_set); |
| } |
| |
| agg_hash_reset_workfile_state(aggstate); |
| |
| mpool_delete(aggstate->hhashtable->group_buf); |
| |
| pfree(aggstate->hhashtable); |
| aggstate->hhashtable = NULL; |
| } |
| } |
| |
| /* |
| * Reset workfile caching state |
| */ |
| void |
| agg_hash_reset_workfile_state(AggState *aggstate) |
| { |
| aggstate->workfiles_created = false; |
| aggstate->cached_workfiles_found = false; |
| aggstate->cached_workfiles_loaded = false; |
| } |
| |
| void |
| agg_hash_mark_spillset_complete(AggState *aggstate) |
| { |
| |
| Assert(aggstate != NULL); |
| Assert(aggstate->hhashtable != NULL); |
| Assert(aggstate->hhashtable->work_set != NULL); |
| |
| workfile_set *work_set = aggstate->hhashtable->work_set; |
| bool workset_metadata_too_big = work_set->metadata.num_leaf_files > NO_RESERVED_BATCHFILE_METADATA; |
| |
| if (workset_metadata_too_big) |
| { |
| work_set->can_be_reused = false; |
| elog(gp_workfile_caching_loglevel, "HashAgg: spill set contains too many files: %d. Not caching", |
| work_set->metadata.num_leaf_files); |
| } |
| |
| workfile_mgr_mark_complete(work_set); |
| |
| } |
| |
| /* |
| * Save spill file information to the state file. |
| * Format is: [name_length|name|hash_bit]. |
| * The same format must be expected in agg_hash_load_spillfile_info |
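| * |
| * Sketch of one record: for a file named "b1" saved with hash bit 3, |
| * the bytes are [size_t len = 2]['b','1'][unsigned 3]; note that no |
| * terminating null is written for the name (see agg_hash_write_string). |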
| */ |
| static void |
| agg_hash_save_spillfile_info(ExecWorkFile *state_file, SpillFile *spill_file) |
| { |
| |
| if (WorkfileDiskspace_IsFull()) |
| { |
| /* |
| * We exceeded the amount of diskspace for spilling. Don't try to |
| * write anything anymore, as we're in the cleanup stage. |
| */ |
| return; |
| } |
| |
| agg_hash_write_string(state_file, |
| spill_file->file_info->wfile->fileName, |
| strlen(spill_file->file_info->wfile->fileName)); |
| ExecWorkFile_Write(state_file, |
| (char *) &spill_file->batch_hash_bit, |
| sizeof(spill_file->batch_hash_bit)); |
| } |
| |
| /* |
| * Load spill file information from the state file. |
| * Format is: [name_length|name|hash_bit]. |
| * This must match the format written by agg_hash_save_spillfile_info. |
| * Sets spill_file_name to point to the read file name, which is palloc-ed in |
| * the current context. |
| * Returns FALSE at EOF, TRUE otherwise. |
| */ |
| static bool |
| agg_hash_load_spillfile_info(ExecWorkFile *state_file, char **spill_file_name, unsigned *batch_hash_bit) |
| { |
| *spill_file_name = agg_hash_read_string(state_file); |
| if (*spill_file_name == NULL) |
| { |
| /* EOF. No more file names to read. */ |
| return false; |
| } |
| unsigned read_hash_bit; |
| #ifdef USE_ASSERT_CHECKING |
| int res = |
| #endif |
| ExecWorkFile_Read(state_file, (char *) &read_hash_bit, sizeof(read_hash_bit)); |
| |
| Assert(res == sizeof(read_hash_bit)); |
| |
| *batch_hash_bit = read_hash_bit; |
| return true; |
| } |
| |
| /* Write a string to a bfz file |
| * Format: [length|data] |
| * This must be the same format used in agg_hash_read_string |
| */ |
| static void |
| agg_hash_write_string(ExecWorkFile *ewf, const char *str, size_t len) |
| { |
| Assert(ewf != NULL); |
| ExecWorkFile_Write(ewf, (char *) &len, sizeof(len)); |
| |
| /* Terminating null character is not written to disk */ |
| ExecWorkFile_Write(ewf, (char *) str, len); |
| } |
| |
| /* Read a string from a bfz file |
| * Format: [length|data] |
| * This must be the same format used in agg_hash_write_string |
| * Returns the palloc-ed string in the current context, or NULL if an error occurs. |
| */ |
| static char * |
| agg_hash_read_string(ExecWorkFile *ewf) |
| { |
| Assert(ewf != NULL); |
| size_t slen = 0; |
| |
| int res = ExecWorkFile_Read(ewf, (char *) &slen, sizeof(slen)); |
| if (res != sizeof(slen)) |
| { |
| return NULL; |
| } |
| |
| char *read_string = palloc(slen+1); |
| res = ExecWorkFile_Read(ewf, read_string, slen); |
| if (res < slen) |
| { |
| pfree(read_string); |
| return NULL; |
| } |
| |
| read_string[slen]='\0'; |
| return read_string; |
| } |
| |
| /* EOF */ |