/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*-------------------------------------------------------------------------
*
* nodeAgg.c
* Routines to handle aggregate nodes.
*
* ExecAgg evaluates each aggregate in the following steps:
*
* transvalue = initcond
* foreach input_tuple do
* transvalue = transfunc(transvalue, input_value(s))
* result = finalfunc(transvalue)
*
* If a finalfunc is not supplied then the result is just the ending
* value of transvalue.
*
* If transfunc is marked "strict" in pg_proc and initcond is NULL,
* then the first non-NULL input_value is assigned directly to transvalue,
* and transfunc isn't applied until the second non-NULL input_value.
* The agg's first input type and transtype must be the same in this case!
*
* If transfunc is marked "strict" then NULL input_values are skipped,
* keeping the previous transvalue. If transfunc is not strict then it
* is called for every input tuple and must deal with NULL initcond
* or NULL input_values for itself.
*
* If finalfunc is marked "strict" then it is not called when the
* ending transvalue is NULL, instead a NULL result is created
* automatically (this is just the usual handling of strict functions,
* of course). A non-strict finalfunc can make its own choice of
* what to return for a NULL ending transvalue.
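*
* As an illustrative sketch (not a function in this file), a strict,
* max-like transition function whose transtype equals its input type needs
* no NULL handling of its own: arg 0 is the current transvalue, arg 1 the
* new input, and with a NULL initcond the executor seeds the transvalue
* from the first non-NULL input before ever calling the function.
*
*		Datum
*		my_int8_max_trans(PG_FUNCTION_ARGS)
*		{
*			int64	cur = PG_GETARG_INT64(0);
*			int64	next = PG_GETARG_INT64(1);
*
*			PG_RETURN_INT64(cur > next ? cur : next);
*		}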
*
* We compute aggregate input expressions and run the transition functions
* in a temporary econtext (aggstate->tmpcontext). This is reset at
* least once per input tuple, so when the transvalue datatype is
* pass-by-reference, we have to be careful to copy it into a longer-lived
* memory context, and free the prior value to avoid memory leakage.
*
* Postgres stores transvalues in the memory context aggstate->aggcontext,
* which is also used for the hashtable structures in AGG_HASHED mode.
* MPP (in order to support hybrid hash aggregation) stores hash table
* entries and associated transition values in aggstate->aggcontext.
*
* The node's regular econtext (aggstate->ss.ps.ps_ExprContext)
* is used to run finalize functions and compute the output tuple;
* this context can be reset once per output tuple.
*
* Beginning in PostgreSQL 8.1, the executor's AggState node is passed as
* the fmgr "context" value in all transfunc and finalfunc calls. It is
* not really intended that the transition functions will look into the
* AggState node, but they can use code like
* if (fcinfo->context && IsA(fcinfo->context, AggState))
* to verify that they are being called by nodeAgg.c and not as ordinary
* SQL functions. The main reason a transition function might want to know
* that is that it can avoid palloc'ing a fixed-size pass-by-ref transition
* value on every call: it can instead just scribble on and return its left
* input. Ordinarily it is completely forbidden for functions to modify
* pass-by-ref inputs, but in the aggregate case we know the left input is
* either the initial transition value or a previous function result, and
* in either case its value need not be preserved. See int8inc() for an
* example. Notice that advance_transition_function() is coded to avoid a
* data copy step when the previous transition value pointer is returned.
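*
* Roughly (a sketch only; see int8inc() itself for the real thing,
* including its overflow check), such a function scribbles on its
* pass-by-ref input when called as an aggregate and copies otherwise:
*
*		if (fcinfo->context && IsA(fcinfo->context, AggState))
*		{
*			int64	   *state = (int64 *) PG_GETARG_POINTER(0);
*
*			*state += 1;
*			PG_RETURN_POINTER(state);
*		}
*		else
*			PG_RETURN_INT64(PG_GETARG_INT64(0) + 1);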
*
* In Greenplum 4.2.2, we added PercentileExpr support alongside Aggref.
* It is used to implement inverse distribution functions, namely
* percentile_cont, percentile_disc and median. Their semantics are almost
* the same as Aggref's: the aggregate processing is handled by an
* individual function and the expression node only returns a pre-computed
* result. PercentileExpr is used in the Agg node because we cannot change
* the catalog in this release; it may later be removed and integrated into
* standard Aggref itself.
*
* Portions Copyright (c) 2007-2008, Greenplum inc
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.146.2.2 2007/08/08 18:07:03 neilc Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "catalog/catquery.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/execHHashagg.h"
#include "executor/nodeAgg.h"
#include "lib/stringinfo.h" /* StringInfo */
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
#include "parser/parse_oper.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"
#include "utils/datum.h"
#include "cdb/cdbexplain.h"
#include "cdb/cdbvars.h" /* mpp_hybrid_hash_agg */
#define IS_HASHAGG(aggstate) (((Agg *) (aggstate)->ss.ps.plan)->aggstrategy == AGG_HASHED)
/*
* AggStatePerAggData -- per-aggregate working state
* AggStatePerGroupData - per-aggregate-per-group working state
*
* Definition moved to nodeAgg.c to provide visibility to execHHashagg.c
*/
/*
* To implement hashed aggregation, we need a hashtable that stores a
* representative tuple and an array of AggStatePerGroup structs for each
* distinct set of GROUP BY column values. We compute the hash key from
* the GROUP BY columns.
*/
typedef struct AggHashEntryData *AggHashEntry;
typedef struct AggHashEntryData
{
TupleHashEntryData shared; /* common header for hash table entries */
/* per-aggregate transition status array - must be last! */
AggStatePerGroupData pergroup[1]; /* VARIABLE LENGTH ARRAY */
} AggHashEntryData; /* VARIABLE LENGTH STRUCT */
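/*
* The space needed for one such entry holding numaggs transition values is
* roughly (see hash_agg_entry_size() below, which must stay in sync with
* this layout):
*
*		MAXALIGN(sizeof(AggHashEntryData) +
*				 (numaggs - 1) * sizeof(AggStatePerGroupData))
*
* plus hashtable overhead; pass-by-ref transition values and the
* representative tuple of each group are accounted for separately.
*/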
static void advance_transition_function(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate,
FunctionCallInfoData *fcinfo,
MemoryManagerContainer *mem_manager);
static void process_ordered_aggregate_single(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate,
Datum *resultVal, bool *resultIsNull);
static void finalize_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static void ExecAggExplainEnd(PlanState *planstate, struct StringInfoData *buf);
static int count_extra_agg_slots(Node *node);
static bool count_extra_agg_slots_walker(Node *node, int *count);
static TupleTableSlot *computeTupleWithFinalAggregate(AggState *aggstate,
TupleTableSlot *firstSlot);
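/*
* datumCopyWithMemManager
*		Copy a datum into storage obtained from the given memory manager.
*
* For pass-by-value types the datum is simply returned.  For pass-by-ref
* types, if the old value's (MAXALIGN'd) allocation is already large enough,
* the new value is copied over it in place; otherwise the old chunk is freed
* through the manager's free callback (if any) and a new chunk is requested,
* grown by realloc_ratio so that steadily growing values cause fewer
* reallocations.
*/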
Datum
datumCopyWithMemManager(Datum oldvalue, Datum value, bool typByVal, int typLen,
MemoryManagerContainer *mem_manager)
{
Datum res;
if (typByVal)
res = value;
else
{
Size realSize;
Size old_realSize = 0;
char *s;
if (DatumGetPointer(value) == NULL)
return PointerGetDatum(NULL);
if (DatumGetPointer(oldvalue) != NULL)
old_realSize = MAXALIGN(datumGetSize(oldvalue, typByVal, typLen));
realSize = datumGetSize(value, typByVal, typLen);
if (old_realSize == 0 || old_realSize < realSize)
{
int alloc_size = MAXALIGN(mem_manager->realloc_ratio * old_realSize);
if (alloc_size < realSize)
alloc_size = MAXALIGN(realSize);
if (mem_manager->free)
{
(*mem_manager->free)(mem_manager->manager, DatumGetPointer(oldvalue));
}
s = (char *) (*mem_manager->alloc)(mem_manager->manager, alloc_size);
}
else
s = (char *) DatumGetPointer(oldvalue);
memcpy(s, DatumGetPointer(value), realSize);
res = PointerGetDatum(s);
}
return res;
}
/*
* Initialize all aggregates for a new group of input values.
*
* When called, CurrentMemoryContext should be the per-query context.
*
* Note that memory allocation is done through the provided memory manager.
*/
void
initialize_aggregates(AggState *aggstate,
AggStatePerAgg peragg,
AggStatePerGroup pergroup,
MemoryManagerContainer *mem_manager)
{
int aggno;
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &peragg[aggno];
AggStatePerGroup pergroupstate = &pergroup[aggno];
/*
* Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
*/
if (peraggstate->numSortCols > 0)
{
/*
* In case of rescan, there could be an uncompleted sort
* operation; clean it up if so.
*/
if(gp_enable_mk_sort)
{
if (peraggstate->sortstate)
tuplesort_end_mk((Tuplesortstate_mk *) peraggstate->sortstate);
/*
* We use a plain Datum sorter when there's a single input column;
* otherwise sort the full tuple. (See comments for
* process_ordered_aggregate_single.)
*/
peraggstate->sortstate =
(peraggstate->numInputs == 1) ?
tuplesort_begin_datum_mk(& aggstate->ss,
peraggstate->evaldesc->attrs[0]->atttypid,
peraggstate->sortOperators[0],
PlanStateOperatorMemKB((PlanState *) aggstate), false) :
tuplesort_begin_heap_mk(& aggstate->ss,
peraggstate->evaldesc,
peraggstate->numSortCols,
peraggstate->sortOperators,
peraggstate->sortColIdx,
PlanStateOperatorMemKB((PlanState *) aggstate), false);
/*
* CDB: If EXPLAIN ANALYZE, let all of our tuplesort operations
* share our Instrumentation object and message buffer.
*/
if (aggstate->ss.ps.instrument)
tuplesort_set_instrument_mk((Tuplesortstate_mk *) peraggstate->sortstate,
aggstate->ss.ps.instrument,
aggstate->ss.ps.cdbexplainbuf);
}
else /* gp_enable_mk_sort is off */
{
if (peraggstate->sortstate)
tuplesort_end((Tuplesortstate *) peraggstate->sortstate);
/*
* We use a plain Datum sorter when there's a single input column;
* otherwise sort the full tuple. (See comments for
* process_ordered_aggregate_single.)
*/
peraggstate->sortstate =
(peraggstate->numInputs == 1) ?
tuplesort_begin_datum(peraggstate->evaldesc->attrs[0]->atttypid,
peraggstate->sortOperators[0],
PlanStateOperatorMemKB((PlanState *) aggstate), false) :
tuplesort_begin_heap(peraggstate->evaldesc,
peraggstate->numSortCols,
peraggstate->sortOperators,
peraggstate->sortColIdx,
PlanStateOperatorMemKB((PlanState *) aggstate), false);
/*
* CDB: If EXPLAIN ANALYZE, let all of our tuplesort operations
* share our Instrumentation object and message buffer.
*/
if (aggstate->ss.ps.instrument)
tuplesort_set_instrument((Tuplesortstate *) peraggstate->sortstate,
aggstate->ss.ps.instrument,
aggstate->ss.ps.cdbexplainbuf);
}
/* CDB: Set enhanced sort options. */
{
int64 limit = 0;
int64 offset = 0;
int unique = peragg->aggref->aggdistinct &&
( gp_enable_sort_distinct ? 1 : 0) ;
int sort_flags = gp_sort_flags; /* get the guc */
int maxdistinct = gp_sort_max_distinct; /* get guc */
if(gp_enable_mk_sort)
cdb_tuplesort_init_mk((Tuplesortstate_mk *) peraggstate->sortstate,
offset, limit, unique,
sort_flags, maxdistinct);
else
cdb_tuplesort_init((Tuplesortstate *) peraggstate->sortstate,
offset, limit, unique,
sort_flags, maxdistinct);
}
}
/*
* (Re)set transValue to the initial value.
*
* Note that when the initial value is pass-by-ref, we must copy it
* (into the aggcontext) since we will pfree the transValue later.
*/
if (peraggstate->initValueIsNull)
{
pergroupstate->transValue = peraggstate->initValue;
}
else
{
pergroupstate->transValue =
datumCopyWithMemManager(0,
peraggstate->initValue,
peraggstate->transtypeByVal,
peraggstate->transtypeLen,
mem_manager);
}
pergroupstate->transValueIsNull = peraggstate->initValueIsNull;
/*
* If the initial value for the transition state doesn't exist in the
* pg_aggregate table then we will let the first non-NULL value
* returned from the outer procNode become the initial value. (This is
* useful for aggregates like max() and min().) The noTransValue flag
* signals that we still need to do this.
*/
pergroupstate->noTransValue = peraggstate->initValueIsNull;
}
}
/*
* Given new input value(s), advance the transition function of an aggregate.
*
* The new values (and null flags) have been preloaded into argument positions
* 1 and up in fcinfo, so that we needn't copy them again to pass to the
* transition function. No other fields of fcinfo are assumed valid.
*
* It doesn't matter which memory context this is called in.
*/
static void
advance_transition_function(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate,
FunctionCallInfoData *fcinfo,
MemoryManagerContainer *mem_manager)
{
pergroupstate->transValue =
invoke_agg_trans_func(&(peraggstate->transfn),
peraggstate->numArguments,
pergroupstate->transValue,
&(pergroupstate->noTransValue),
&(pergroupstate->transValueIsNull),
peraggstate->transtypeByVal,
peraggstate->transtypeLen,
fcinfo, (void *)aggstate,
aggstate->tmpcontext->ecxt_per_tuple_memory,
mem_manager);
}
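/*
* invoke_agg_trans_func
*		Apply one transition-function call to transValue and return the new
*		transition value, maintaining the noTransvalue/transValueIsNull flags.
*
* This is the shared guts of advance_transition_function(); it is exported
* rather than static so that other executor code can call it as well.
*
* For a strict transfn: NULL inputs leave the transition value untouched,
* the first non-NULL input is copied directly into an uninitialized
* transvalue, and a NULL transvalue is simply propagated without calling the
* function.  Otherwise the transfn is invoked in the per-input-tuple context
* (tuplecontext), and a non-NULL pass-by-ref result that is not the old
* transvalue is copied out through the memory manager.
*/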
Datum
invoke_agg_trans_func(FmgrInfo *transfn, int numargs, Datum transValue,
bool *noTransvalue, bool *transValueIsNull,
bool transtypeByVal, int16 transtypeLen,
FunctionCallInfoData *fcinfo, void *funcctx,
MemoryContext tuplecontext,
MemoryManagerContainer *mem_manager)
{
MemoryContext oldContext;
Datum newVal;
int i;
if (transfn->fn_strict)
{
/*
* For a strict transfn, nothing happens when there's a NULL input; we
* just keep the prior transValue.
*/
for (i = 1; i <= numargs; i++)
{
if (fcinfo->argnull[i])
return transValue;
}
if (*noTransvalue)
{
/*
* transValue has not been initialized. This is the first non-NULL
* input value. We use it as the initial value for transValue. (We
* already checked that the agg's input type is binary-compatible
* with its transtype, so straight copy here is OK.)
*
* We must copy the datum into aggcontext if it is pass-by-ref.
* We do not need to pfree the old transValue, since it's NULL.
*/
newVal = datumCopyWithMemManager(transValue, fcinfo->arg[1], transtypeByVal,
transtypeLen, mem_manager);
*transValueIsNull = false;
*noTransvalue = false;
fcinfo->isnull = false;
return newVal;
}
if (*transValueIsNull)
{
/*
* Don't call a strict function with NULL inputs. Note it is
* possible to get here despite the above tests, if the transfn is
* strict *and* returned a NULL on a prior cycle. If that happens
* we will propagate the NULL all the way to the end.
*/
fcinfo->isnull = true;
return transValue;
}
}
/* We run the transition functions in per-input-tuple memory context */
oldContext = MemoryContextSwitchTo(tuplecontext);
/*
* OK to call the transition function
*/
InitFunctionCallInfoData(*fcinfo, transfn,
numargs + 1,
(void *) funcctx, NULL);
fcinfo->arg[0] = transValue;
fcinfo->argnull[0] = *transValueIsNull;
newVal = FunctionCallInvoke(fcinfo);
/*
* If pass-by-ref datatype, must copy the new value into aggcontext and
* pfree the prior transValue. But if transfn returned a pointer to its
* first input, we don't need to do anything.
*/
if (!transtypeByVal &&
DatumGetPointer(newVal) != DatumGetPointer(transValue))
{
if (!fcinfo->isnull)
{
newVal = datumCopyWithMemManager(transValue, newVal, transtypeByVal,
transtypeLen, mem_manager);
}
}
*transValueIsNull = fcinfo->isnull;
if (!fcinfo->isnull)
*noTransvalue = false;
MemoryContextSwitchTo(oldContext);
return newVal;
}
/*
* Advance all the aggregates for one input tuple. The input tuple
* has been stored in tmpcontext->ecxt_scantuple, so that it is accessible
* to ExecEvalExpr. pergroup is the array of per-group structs to use
* (this might be in a hashtable entry).
*
* When called, CurrentMemoryContext should be the per-query context.
*/
void
advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup,
MemoryManagerContainer *mem_manager)
{
int aggno;
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
Datum value;
bool isnull;
AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
AggStatePerGroup pergroupstate = &pergroup[aggno];
Aggref *aggref = peraggstate->aggref;
PercentileExpr *perc = peraggstate->perc;
int i;
TupleTableSlot *slot;
int nargs;
if (aggref)
nargs = list_length(aggref->args);
else
{
Assert (perc);
nargs = list_length(perc->args);
}
/* Evaluate the current input expressions for this aggregate */
slot = ExecProject(peraggstate->evalproj, NULL);
slot_getallattrs(slot);
if (peraggstate->numSortCols > 0)
{
/* DISTINCT and/or ORDER BY case */
Assert(slot->PRIVATE_tts_nvalid == peraggstate->numInputs);
Assert(!perc);
/*
* If the transfn is strict, we want to check for nullity before
* storing the row in the sorter, to save space if there are a lot
* of nulls. Note that we must only check numArguments columns,
* not numInputs, since nullity in columns used only for sorting
* is not relevant here.
*/
if (peraggstate->transfn.fn_strict)
{
for (i = 0; i < nargs; i++)
{
value = slot_getattr(slot, i+1, &isnull);
if (isnull)
break; /* arg loop */
}
if (i < nargs)
continue; /* aggno loop */
}
/* OK, put the tuple into the tuplesort object */
if (peraggstate->numInputs == 1)
{
value = slot_getattr(slot, 1, &isnull);
if (gp_enable_mk_sort)
tuplesort_putdatum_mk((Tuplesortstate_mk*) peraggstate->sortstate,
value,
isnull);
else
tuplesort_putdatum((Tuplesortstate*) peraggstate->sortstate,
value,
isnull);
}
else
{
if (gp_enable_mk_sort)
tuplesort_puttupleslot_mk((Tuplesortstate_mk*) peraggstate->sortstate,
slot);
else
tuplesort_puttupleslot((Tuplesortstate*) peraggstate->sortstate,
slot);
}
}
else
{
/* We can apply the transition function immediately */
FunctionCallInfoData fcinfo;
/* Load values into fcinfo */
/* Start from 1, since the 0th arg will be the transition value */
Assert(slot->PRIVATE_tts_nvalid >= nargs);
if (aggref)
{
for (i = 0; i < nargs; i++)
{
fcinfo.arg[i + 1] = slot_getattr(slot, i+1, &isnull);
fcinfo.argnull[i + 1] = isnull;
}
}
else
{
/*
* For percentile functions, load every attribute of the tuple
* into fcinfo's arguments, since the tuple is expected to carry
* exactly the attributes the function requires.
*/
int natts;
Assert(perc);
natts = slot->tts_tupleDescriptor->natts;
for (i = 0; i < natts; i++)
{
fcinfo.arg[i + 1] = slot_getattr(slot, i + 1, &isnull);
fcinfo.argnull[i + 1] = isnull;
}
}
advance_transition_function(aggstate, peraggstate, pergroupstate,
&fcinfo, mem_manager);
}
} /* aggno loop */
}
/*
* Run the transition function for a DISTINCT or ORDER BY aggregate
* with only one input. This is called after we have completed
* entering all the input values into the sort object. We complete the
* sort, read out the values in sorted order, and run the transition
* function on each value (applying DISTINCT if appropriate).
*
* Note that the strictness of the transition function was checked when
* entering the values into the sort, so we don't check it again here;
* we just apply standard SQL DISTINCT logic.
*
* The one-input case is handled separately from the multi-input case
* for performance reasons: for single by-value inputs, such as the
* common case of count(distinct id), the tuplesort_getdatum code path
* is around 300% faster. (The speedup for by-reference types is less
* but still noticeable.)
*
* When called, CurrentMemoryContext should be the per-query context.
*/
static void
process_ordered_aggregate_single(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate)
{
Datum oldVal = (Datum) 0;
bool oldIsNull = true;
bool haveOldVal = false;
MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
MemoryContext oldContext;
bool isDistinct = peraggstate->aggref->aggdistinct;
Datum *newVal;
bool *isNull;
FunctionCallInfoData fcinfo;
Assert(peraggstate->numInputs == 1);
if(gp_enable_mk_sort)
tuplesort_performsort_mk((Tuplesortstate_mk *) peraggstate->sortstate);
else
tuplesort_performsort((Tuplesortstate *) peraggstate->sortstate);
/* Load the column into argument 1 (arg 0 will be transition value) */
newVal = fcinfo.arg + 1;
isNull = fcinfo.argnull + 1;
/*
* Note: if input type is pass-by-ref, the datums returned by the sort are
* freshly palloc'd in the per-query context, so we must be careful to
* pfree them when they are no longer needed.
*/
while (
gp_enable_mk_sort ?
tuplesort_getdatum_mk((Tuplesortstate_mk *)peraggstate->sortstate, true, newVal, isNull)
:
tuplesort_getdatum((Tuplesortstate *)peraggstate->sortstate, true, newVal, isNull)
)
{
/*
* Clear and select the working context for evaluation of the equality
* function and transition function.
*/
MemoryContextReset(workcontext);
oldContext = MemoryContextSwitchTo(workcontext);
/*
* If DISTINCT mode, and not distinct from prior, skip it.
*/
if (isDistinct && *isNull )
{
/* per SQL, DISTINCT doesn't use nulls */
}
else if (isDistinct &&
haveOldVal &&
((oldIsNull && *isNull) ||
(!oldIsNull && !*isNull &&
DatumGetBool(FunctionCall2(&peraggstate->equalfn,
oldVal, *newVal)))))
{
/* equal to prior, so forget this one */
if (!peraggstate->inputtypeByVal && !*isNull)
pfree(DatumGetPointer(*newVal));
}
else
{
advance_transition_function(aggstate, peraggstate, pergroupstate,
&fcinfo, &(aggstate->mem_manager));
/* forget the old value, if any */
if (!oldIsNull && !peraggstate->inputtypeByVal)
pfree(DatumGetPointer(oldVal));
/* and remember the new one for subsequent equality checks */
oldVal = *newVal;
oldIsNull = *isNull;
haveOldVal = true;
}
MemoryContextSwitchTo(oldContext);
}
if (!oldIsNull && !peraggstate->inputtypeByVal)
pfree(DatumGetPointer(oldVal));
if(gp_enable_mk_sort)
tuplesort_end_mk((Tuplesortstate_mk *) peraggstate->sortstate);
else
tuplesort_end((Tuplesortstate *) peraggstate->sortstate);
peraggstate->sortstate = NULL;
}
/*
* Run the transition function for an ORDER BY aggregate with more than
* one input. In PG DISTINCT aggregates may also have multiple columns,
* but in GPDB, only ORDER BY aggregates do. This is called after we have
* completed entering all the input values into the sort object. We
* complete the sort, read out the values in sorted order, and run the
* transition function on each value.
*
* When called, CurrentMemoryContext should be the per-query context.
*/
static void
process_ordered_aggregate_multi(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate)
{
MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
FunctionCallInfoData fcinfo;
TupleTableSlot *slot = peraggstate->evalslot;
int numArguments = peraggstate->numArguments;
int i;
if(gp_enable_mk_sort)
tuplesort_performsort_mk((Tuplesortstate_mk *) peraggstate->sortstate);
else
tuplesort_performsort((Tuplesortstate *) peraggstate->sortstate);
ExecClearTuple(slot);
PG_TRY();
{
while (
gp_enable_mk_sort ?
tuplesort_gettupleslot_mk((Tuplesortstate_mk *)peraggstate->sortstate, true, slot)
:
tuplesort_gettupleslot((Tuplesortstate *)peraggstate->sortstate, true, slot)
)
{
/*
* Extract the first numArguments attributes as datums to pass to the transfn.
* (This will help execTuplesMatch too, so do it immediately.)
*/
slot_getsomeattrs(slot, numArguments);
/* Load values into fcinfo */
/* Start from 1, since the 0th arg will be the transition value */
for (i = 0; i < numArguments; i++)
{
fcinfo.arg[i + 1] = slot_get_values(slot)[i];
fcinfo.argnull[i + 1] = slot_get_isnull(slot)[i];
}
advance_transition_function(aggstate, peraggstate, pergroupstate,
&fcinfo, &(aggstate->mem_manager));
/* Reset context each time */
MemoryContextReset(workcontext);
ExecClearTuple(slot);
}
}
PG_CATCH();
{
/*
* The tuple is stored in a memory context that will be released during
* error handling. If we don't clear the slot here, we would attempt to
* clear it later, after that memory context has been released, which
* would be an error.
*/
ExecClearTuple(slot);
/* Carry on with error handling. */
PG_RE_THROW();
}
PG_END_TRY();
if(gp_enable_mk_sort)
tuplesort_end_mk((Tuplesortstate_mk *) peraggstate->sortstate);
else
tuplesort_end((Tuplesortstate *) peraggstate->sortstate);
peraggstate->sortstate = NULL;
}
/*
* finalize_aggregates
* Compute the final value for all aggregate functions.
*/
static void
finalize_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
{
AggStatePerAgg peragg = aggstate->peragg;
ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
Datum *aggvalues = econtext->ecxt_aggvalues;
bool *aggnulls = econtext->ecxt_aggnulls;
for (int aggno = 0; aggno < aggstate->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &peragg[aggno];
AggStatePerGroup pergroupstate = &pergroup[aggno];
if ( peraggstate->numSortCols > 0 )
{
if ( peraggstate->numInputs == 1 )
{
process_ordered_aggregate_single(aggstate, peraggstate, pergroupstate);
}
else
{
process_ordered_aggregate_multi(aggstate, peraggstate, pergroupstate);
}
}
finalize_aggregate(aggstate, peraggstate, pergroupstate,
&aggvalues[aggno], &aggnulls[aggno]);
}
}
/*
* Compute the final value of one aggregate.
*
* The finalfunction will be run, and the result delivered, in the
* output-tuple context; caller's CurrentMemoryContext does not matter.
*/
static void
finalize_aggregate(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate,
Datum *resultVal, bool *resultIsNull)
{
MemoryContext oldContext;
oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
/*
* Apply the agg's finalfn if one is provided, else return transValue.
*/
if (OidIsValid(peraggstate->finalfn_oid))
{
FunctionCallInfoData fcinfo;
InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
(void *) aggstate, NULL);
fcinfo.arg[0] = pergroupstate->transValue;
fcinfo.argnull[0] = pergroupstate->transValueIsNull;
if (fcinfo.flinfo->fn_strict && pergroupstate->transValueIsNull)
{
/* don't call a strict function with NULL inputs */
*resultVal = (Datum) 0;
*resultIsNull = true;
}
else
{
*resultVal = FunctionCallInvoke(&fcinfo);
*resultIsNull = fcinfo.isnull;
}
}
else
{
*resultVal = pergroupstate->transValue;
*resultIsNull = pergroupstate->transValueIsNull;
}
/*
* If result is pass-by-ref, make sure it is in the right context.
*/
if (!peraggstate->resulttypeByVal && !*resultIsNull &&
!MemoryContextContainsGenericAllocation(CurrentMemoryContext,
DatumGetPointer(*resultVal)))
*resultVal = datumCopy(*resultVal,
peraggstate->resulttypeByVal,
peraggstate->resulttypeLen);
MemoryContextSwitchTo(oldContext);
}
/*
* find_unaggregated_cols
* Construct a bitmapset of the column numbers of un-aggregated Vars
* appearing in our targetlist and qual (HAVING clause)
*/
static Bitmapset *
find_unaggregated_cols(AggState *aggstate)
{
Agg *node = (Agg *) aggstate->ss.ps.plan;
Bitmapset *colnos;
colnos = NULL;
(void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
&colnos);
(void) find_unaggregated_cols_walker((Node *) node->plan.qual,
&colnos);
return colnos;
}
static bool
find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
{
if (node == NULL)
return false;
if (IsA(node, Var))
{
Var *var = (Var *) node;
/* setrefs.c should have set the varno to 0 */
Assert(var->varno == 0);
Assert(var->varlevelsup == 0);
*colnos = bms_add_member(*colnos, var->varattno);
return false;
}
if (IsA(node, Aggref)) /* do not descend into aggregate exprs */
return false;
return expression_tree_walker(node, find_unaggregated_cols_walker,
(void *) colnos);
}
/*
* Create a list of the tuple columns that actually need to be stored
* in hashtable entries. The incoming tuples from the child plan node
* will contain grouping columns, other columns referenced in our
* targetlist and qual, columns used to compute the aggregate functions,
* and perhaps just junk columns we don't use at all. Only columns of the
* first two types need to be stored in the hashtable, and getting rid of
* the others can make the table entries significantly smaller. To avoid
* messing up Var numbering, we keep the same tuple descriptor for
* hashtable entries as the incoming tuples have, but set unwanted columns
* to NULL in the tuples that go into the table.
*
* To eliminate duplicates, we build a bitmapset of the needed columns,
* then convert it to an integer list (cheaper to scan at runtime).
* The list is in decreasing order so that the first entry is the largest;
* lookup_hash_entry depends on this to use slot_getsomeattrs correctly.
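*
* For example (with hypothetical column numbers): if the grouping columns
* are {2, 5} and the tlist/qual additionally reference column 7, the
* bitmapset {2, 5, 7} becomes the list (7, 5, 2), and a single
* slot_getsomeattrs(slot, 7) call deforms the input tuple far enough to
* cover every needed column.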
*
* Note: at present, searching the tlist/qual is not really necessary
* since the parser should disallow any unaggregated references to
* ungrouped columns. However, the search will be needed when we add
* support for SQL99 semantics that allow use of "functionally dependent"
* columns that haven't been explicitly grouped by.
*/
List *
get_agg_hash_collist(AggState *aggstate)
{
Agg *node = (Agg *) aggstate->ss.ps.plan;
Bitmapset *colnos;
List *collist;
int i;
/* Find Vars that will be needed in tlist and qual */
colnos = find_unaggregated_cols(aggstate);
/* Add in all the grouping columns */
for (i = 0; i < node->numCols; i++)
colnos = bms_add_member(colnos, node->grpColIdx[i]);
/* Convert to list, using lcons so largest element ends up first */
collist = NIL;
while ((i = bms_first_member(colnos)) >= 0)
collist = lcons_int(i, collist);
return collist;
}
/*
* Estimate per-hash-table-entry overhead for the planner.
*
* Note that the estimate does not include space for pass-by-reference
* transition data values, nor for the representative tuple of each group.
*/
Size
hash_agg_entry_size(int numAggs)
{
Size entrysize;
/* This must match build_hash_table */
entrysize = sizeof(AggHashEntryData) +
(numAggs - 1) *sizeof(AggStatePerGroupData);
entrysize = MAXALIGN(entrysize);
/* Account for hashtable overhead (assuming fill factor = 1) */
entrysize += 3 * sizeof(void *);
return entrysize;
}
/*
* ExecAgg -
*
* ExecAgg receives tuples from its outer subplan and aggregates over
* the appropriate attribute for each aggregate function use (Aggref
* node) appearing in the targetlist or qual of the node. The number
* of tuples to aggregate over depends on whether grouped or plain
* aggregation is selected. In grouped aggregation, we produce a result
* row for each group; in plain aggregation there's a single result row
* for the whole query. In either case, the value of each aggregate is
* stored in the expression context to be used when ExecProject evaluates
* the result tuple.
*
* XXX: Fix BTree code.
*
* Streaming bottom: forces the end of passes when the underlying node
* yields no more tuples.
*
* MPP-2614: A btree scan returns a null tuple at the end of the scan.
* However, if ExecProcNode is called again on the btree scan, it restarts
* from the beginning even though rescan was not called. This is a bug in
* the btree scan, but we mask it off here for v3.1. Really should fix the
* Btree code.
*/
TupleTableSlot *
ExecAgg(AggState *node)
{
if (node->agg_done)
{
ExecEagerFreeAgg(node);
return NULL;
}
if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
{
TupleTableSlot *tuple = NULL;
bool streaming = ((Agg *) node->ss.ps.plan)->streaming;
/*
* ExecAgg processing for hashed aggregation -- returns the
* next result tuple or NULL. When returning NULL, it also marks
* the aggstate as done to prevent further calls.
*/
if (node->hhashtable == NULL)
{
bool tupremain;
node->hhashtable = create_agg_hash_table(node);
tupremain = agg_hash_initial_pass(node);
if ( streaming )
{
if ( tupremain )
node->hhashtable->state = HASHAGG_STREAMING;
else
node->hhashtable->state = HASHAGG_END_OF_PASSES;
}
else
node->hhashtable->state = HASHAGG_BETWEEN_PASSES;
}
/* On each call we either return a tuple corresponding to a hash
* entry (consuming the entry) or fall through to a state machine
* that tries to make additional hash entries available and continue
* the loop. (This may result in reaching the "exit" state and
* returning a NULL tuple).
*/
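/*
* Rough sketch of the state transitions driven by the loop below (derived
* from the code; the state lives in node->hhashtable->state):
*
*	initial pass:   streaming, tuples remain   -> HASHAGG_STREAMING
*	                streaming, input exhausted -> HASHAGG_END_OF_PASSES
*	                not streaming              -> HASHAGG_BETWEEN_PASSES
*	BETWEEN_PASSES: agg_hash_next_pass() true  -> HASHAGG_BETWEEN_PASSES
*	                otherwise                  -> HASHAGG_END_OF_PASSES
*	STREAMING:      agg_hash_stream() false    -> HASHAGG_END_OF_PASSES
*	END_OF_PASSES:  set agg_done, clean up, and return NULL
*/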
for (;;)
{
if (!node->hhashtable->is_spilling)
{
tuple = agg_retrieve_hash_table(node);
node->agg_done = false; /* Not done 'til batches used up. */
if (tuple != NULL)
return tuple;
}
switch (node->hhashtable->state)
{
case HASHAGG_BETWEEN_PASSES:
Assert(!streaming);
if (agg_hash_next_pass(node))
{
node->hhashtable->state = HASHAGG_BETWEEN_PASSES;
continue;
}
node->hhashtable->state = HASHAGG_END_OF_PASSES;
/*
* Fall through. Be sure that the next case statement
* is HASHAGG_END_OF_PASSES.
*/
case HASHAGG_END_OF_PASSES:
node->agg_done = true;
if (gp_workfile_caching && node->workfiles_created)
{
/*
* HashAgg closes each spill file after it is done with
* them. Since we got here on the regular path, all
* files should be closed.
*/
Assert(node->hhashtable->work_set);
Assert(node->hhashtable->spill_set == NULL);
agg_hash_close_state_file(node->hhashtable);
agg_hash_mark_spillset_complete(node);
}
ExecEagerFreeAgg(node);
return NULL;
case HASHAGG_STREAMING:
Assert(streaming);
if ( !agg_hash_stream(node) )
node->hhashtable->state = HASHAGG_END_OF_PASSES;
continue;
case HASHAGG_BEFORE_FIRST_PASS:
default:
elog(ERROR,"hybrid hash aggregation sequencing error");
}
}
}
else
{
return agg_retrieve_direct(node);
}
}
/*
* agg_retrieve_scalar
* Compute the scalar aggregates.
*/
static TupleTableSlot *
agg_retrieve_scalar(AggState *aggstate)
{
AggStatePerAgg peragg = aggstate->peragg;
AggStatePerGroup pergroup = aggstate->pergroup ;
initialize_aggregates(aggstate, peragg, pergroup, &(aggstate->mem_manager));
/*
* We loop through input tuples, and compute the aggregates.
*/
while (!aggstate->agg_done)
{
ExprContext *tmpcontext = aggstate->tmpcontext;
/* Reset the per-input-tuple context */
ResetExprContext(tmpcontext);
PlanState *outerPlan = outerPlanState(aggstate);
TupleTableSlot *outerslot = ExecProcNode(outerPlan);
if (TupIsNull(outerslot))
{
aggstate->agg_done = true;
break;
}
Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_QEXEC_M_ROWSIN);
CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);
tmpcontext->ecxt_scantuple = outerslot;
advance_aggregates(aggstate, pergroup, &(aggstate->mem_manager));
}
finalize_aggregates(aggstate, pergroup);
ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
Agg *node = (Agg*)aggstate->ss.ps.plan;
econtext->grouping = node->grouping;
econtext->group_id = node->rollupGSTimes;
/* Check the qual (HAVING clause). */
if (ExecQual(aggstate->ss.ps.qual, econtext, false))
{
Gpmon_M_Incr_Rows_Out(GpmonPktFromAggState(aggstate));
CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);
/*
* Form and return a projection tuple using the aggregate results
* and the representative input tuple.
*/
return ExecProject(aggstate->ss.ps.ps_ProjInfo, NULL);
}
return NULL;
}
/*
* copyFirstTupleInGroup
* Make a copy of a given tuple as the first tuple in a group.
*/
static void
copyFirstTupleInGroup(AggState *aggstate, TupleTableSlot *outerslot)
{
TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot;
slot_getallattrs(outerslot);
aggstate->grp_firstTuple = memtuple_form_to(firstSlot->tts_mt_bind,
slot_get_values(outerslot),
slot_get_isnull(outerslot),
NULL /* mtup */,
NULL /* destlen */,
false /* inline_toast */);
}
/*
* processFirstTupleInGroup
* Process the first input tuple in a group: initialize the working state
* for the new group, and accumulate the tuple into the aggregates.
*/
static void
processFirstTupleInGroup(AggState *aggstate)
{
Assert(aggstate->grp_firstTuple != NULL);
TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot;
ExecStoreMemTuple(aggstate->grp_firstTuple,
firstSlot,
true);
aggstate->grp_firstTuple = NULL;
/* Clear the per-output-tuple context for each group */
ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
ResetExprContext(econtext);
MemoryContextResetAndDeleteChildren(aggstate->aggcontext);
/* Initialize working state for a new input tuple group */
AggStatePerAgg peragg = aggstate->peragg;
AggStatePerGroup pergroup = aggstate->pergroup;
initialize_aggregates(aggstate, peragg, pergroup, &(aggstate->mem_manager));
ExprContext *tmpcontext = aggstate->tmpcontext;
tmpcontext->ecxt_scantuple = firstSlot;
advance_aggregates(aggstate, pergroup, &(aggstate->mem_manager));
}
/*
* processNextTuple
* Process an input tuple. If this tuple belongs to the current group,
* its value is accumulated into the aggregates for the current group.
* Otherwise, this tuple indicates a new group starts. This function
* makes a copy for this tuple.
*
* This function returns true if a new group starts.
*/
static bool
processNextTuple(AggState *aggstate, TupleTableSlot *outerslot)
{
Assert(aggstate->grp_firstTuple == NULL);
TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot;
Assert(!TupIsNull(firstSlot));
Agg *node = (Agg*)aggstate->ss.ps.plan;
ExprContext *tmpcontext = aggstate->tmpcontext;
bool tuple_match = execTuplesMatch(firstSlot,
outerslot,
node->numCols - node->numNullCols,
node->grpColIdx,
aggstate->eqfunctions,
tmpcontext->ecxt_per_tuple_memory);
bool isNewGroup = false;
if (tuple_match)
{
tmpcontext->ecxt_scantuple = outerslot;
AggStatePerGroup pergroup = aggstate->pergroup;
advance_aggregates(aggstate, pergroup, &(aggstate->mem_manager));
}
else
{
copyFirstTupleInGroup(aggstate, outerslot);
isNewGroup = true;
}
return isNewGroup;
}
/*
* agg_retrieve_group
* Compute the aggregates for each group.
*
* This function assumes that input tuples arrive in groups. A new tuple
* with a different grouping key means the start of a new group.
*
* This function also computes Grouping() and Group_id() if they appear in
* the target list.
*/
static TupleTableSlot *
agg_retrieve_group(AggState *aggstate)
{
Assert(((Agg*)aggstate->ss.ps.plan)->aggstrategy == AGG_SORTED);
TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot;
PlanState *outerPlan = outerPlanState(aggstate);
TupleTableSlot *outerslot = NULL;
/* Read the first input tuple in a group if this is not done. */
if (TupIsNull(firstSlot) &&
aggstate->grp_firstTuple == NULL)
{
outerslot = ExecProcNode(outerPlan);
if (TupIsNull(outerslot))
{
aggstate->agg_done = true;
return NULL;
}
Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_QEXEC_M_ROWSIN);
CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);
copyFirstTupleInGroup(aggstate, outerslot);
}
/*
* We loop through input tuples. If a tuple has a different grouping key,
* a new group starts. At this point, we have seen all tuples for the
* previous group, and are able to compute and return the final aggregate
* value for that group.
*/
while (!aggstate->agg_done)
{
/*
* If grp_firstTuple is not null, it means that we have read the
* first tuple in a new group. Initialize the working state for
* the new group, and advance the aggregate state for this tuple.
*/
if (aggstate->grp_firstTuple != NULL)
{
processFirstTupleInGroup(aggstate);
}
Assert(!TupIsNull(firstSlot));
/*
* Read and process the next input tuple. If this tuple belongs to
* the same group, simply accumulate its value into the intermediate
* state, and go back to the beginning of the loop to process the
* next input tuple. Otherwise, a new group starts. We finalize
* the aggregate value for the current group, and return the value
* if it satisfies the qual.
*/
/* Reset the per-input-tuple context */
ExprContext *tmpcontext = aggstate->tmpcontext;
ResetExprContext(tmpcontext);
bool isNewGroup = false;
outerslot = ExecProcNode(outerPlan);
if (TupIsNull(outerslot))
{
aggstate->agg_done = true;
isNewGroup = true;
}
else
{
Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_QEXEC_M_ROWSIN);
CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);
isNewGroup = processNextTuple(aggstate, outerslot);
}
if (isNewGroup)
{
Assert(!TupIsNull(firstSlot));
TupleTableSlot *result =
computeTupleWithFinalAggregate(aggstate, firstSlot);
if (!TupIsNull(result))
{
return result;
}
}
}
return NULL;
}
/*
* isGroupTuple
* Return true when the given tuple belongs to a group that
* contributes to the final aggregates.
*
* This is only used in a ROLLUP Agg to distinguish tuples to be
* aggregated from those that are merely passed through.
*/
static bool
isGroupTuple(AggState *aggstate,
TupleTableSlot *outerslot)
{
Agg *node = (Agg*)aggstate->ss.ps.plan;
Assert(node->numCols - node->numNullCols >= 2);
uint64 inputGrouping = node->inputGrouping;
int groupingAttNo = node->grpColIdx[node->numCols - node->numNullCols - 2];
uint64 tupleGrouping =
tuple_grouping(outerslot, inputGrouping, groupingAttNo);
Assert(tupleGrouping <= inputGrouping);
return (tupleGrouping == inputGrouping);
}
/*
* preparePassThruTuple
* Prepare an input tuple to be output by the Agg node.
* If this Agg node is the last rollup Agg node, the input tuple
* needs to be finalized.
*
* This function returns NULL if the input tuple does not satisfy
* the qual; otherwise it returns a result tuple that is ready to be
* returned to the upper node.
*/
static TupleTableSlot *
preparePassThruTuple(AggState *aggstate,
TupleTableSlot *outerslot)
{
Agg *node = (Agg*)aggstate->ss.ps.plan;
Assert(node->numCols - node->numNullCols >= 2);
Assert(!TupIsNull(outerslot));
int groupingAttNo = node->grpColIdx[(node->numCols - node->numNullCols) - 2];
int groupIdAttNo = node->grpColIdx[(node->numCols - node->numNullCols) - 1];
TupleTableSlot *result = outerslot;
/*
* If this node is the final rollup, the pass-through input tuples
* need to be finalized before returning.
*/
if (aggstate->aggType == AggTypeFinalRollup)
{
AggStatePerAgg peragg = aggstate->peragg;
AggStatePerGroup perpassthru = aggstate->perpassthru;
ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
initialize_aggregates(aggstate, peragg, perpassthru, &(aggstate->mem_manager));
ExprContext *tmpcontext = aggstate->tmpcontext;
tmpcontext->ecxt_scantuple = outerslot;
advance_aggregates(aggstate, perpassthru, &(aggstate->mem_manager));
finalize_aggregates(aggstate, perpassthru);
econtext->ecxt_scantuple = outerslot;
Assert(node->inputHasGrouping);
econtext->grouping =
get_grouping_groupid(econtext->ecxt_scantuple, groupingAttNo);
econtext->group_id =
get_grouping_groupid(econtext->ecxt_scantuple, groupIdAttNo);
/*
* Check the qual. Form and return a projection tuple using the aggregate results
* and the representative input tuple. Note we do not support
* aggregates returning sets ...
*/
result = NULL;
if (ExecQual(aggstate->ss.ps.qual, econtext, false))
{
Gpmon_M_Incr_Rows_Out(GpmonPktFromAggState(aggstate));
CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);
result = ExecProject(aggstate->ss.ps.ps_ProjInfo, NULL);
}
}
return result;
}
/*
* computeTupleWithFinalAggregate
* Finalize the aggregate values and check whether the result
* satisfies the given qual. If so, form and return a projection
* tuple.
*/
static TupleTableSlot *
computeTupleWithFinalAggregate(AggState *aggstate,
TupleTableSlot *firstSlot)
{
AggStatePerGroup pergroup = aggstate->pergroup;
finalize_aggregates(aggstate, pergroup);
ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
econtext->ecxt_scantuple = firstSlot;
/*
* If this is part of a rollup, we need to mask the grouping attributes that
* are supposedly null in this Agg node.
*/
Agg *node = (Agg*)aggstate->ss.ps.plan;
if (node->numNullCols > 0)
{
slot_getallattrs(firstSlot);
bool *isnull = slot_get_isnull(firstSlot);
for (int attno = node->numCols - node->numNullCols; attno < node->numCols; attno++)
{
isnull[node->grpColIdx[attno] - 1] = true;
}
}
econtext->grouping = node->grouping;
econtext->group_id = node->rollupGSTimes;
/*
* In the second stage of a rollup, the grouping and group_id values in the
* target list come from the input tuples.
*/
if (aggstate->aggType == AggTypeGroup && node->inputHasGrouping)
{
int groupingAttNo = node->grpColIdx[node->numCols - node->numNullCols - 2];
int groupIdAttNo = node->grpColIdx[node->numCols-node->numNullCols - 1];
econtext->grouping =
get_grouping_groupid(econtext->ecxt_scantuple, groupingAttNo);
econtext->group_id =
get_grouping_groupid(econtext->ecxt_scantuple, groupIdAttNo);
}
/*
* Check the qual (HAVING clause). If it is satisfied, form and return
* a projection tuple using the aggregate results and the representative
* input tuple.
*/
TupleTableSlot *result = NULL;
if (ExecQual(aggstate->ss.ps.qual, econtext, false))
{
Gpmon_M_Incr_Rows_Out(GpmonPktFromAggState(aggstate));
CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);
result = ExecProject(aggstate->ss.ps.ps_ProjInfo, NULL);
}
return result;
}
/*
* agg_retrieve_rollup
* Compute aggregates for a group in a rollup.
*
* The difference between this Agg and the ordinary group agg is that
* (1) not all input tuples need to be aggregated. Some of the input tuples
* are result tuples from two or more levels lower in the rollup hierarchy.
* These tuples need to be passed up to the upper node (pass-through).
* (2) all input tuples are part of the result tuples for this Agg node.
*/
static TupleTableSlot *
agg_retrieve_rollup(AggState *aggstate)
{
Assert(((Agg*)aggstate->ss.ps.plan)->aggstrategy == AGG_SORTED);
Assert(((Agg*)aggstate->ss.ps.plan)->inputHasGrouping);
Assert(aggstate->aggType == AggTypeIntermediateRollup ||
aggstate->aggType == AggTypeFinalRollup);
TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot;
PlanState *outerPlan = outerPlanState(aggstate);
TupleTableSlot *outerslot = NULL;
/*
* Read input tuples until we find one that is the first tuple of a
* group to be aggregated. In the meantime, all input tuples still
* need to be returned (passed through).
*/
while (TupIsNull(firstSlot) &&
aggstate->grp_firstTuple == NULL)
{
/* Reset the per-input-tuple context */
ExprContext *tmpcontext = aggstate->tmpcontext;
ResetExprContext(tmpcontext);
outerslot = ExecProcNode(outerPlan);
if (TupIsNull(outerslot))
{
aggstate->agg_done = true;
return NULL;
}
Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_QEXEC_M_ROWSIN);
CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);
/*
* If this tuple needs to be aggregated, it is the first tuple
* in a group, and we make a copy of it. Otherwise, output this
* tuple as a pass-through tuple. The first tuple in a new group
* will be passed through below.
*/
if (isGroupTuple(aggstate, outerslot))
{
copyFirstTupleInGroup(aggstate, outerslot);
}
else
{
/*
* If this tuple needs to be passed through, do it now.
*/
TupleTableSlot *passThruTuple = preparePassThruTuple(aggstate, outerslot);
if (!TupIsNull(passThruTuple))
{
return passThruTuple;
}
}
}
/*
* We loop through input tuples. If a tuple has a different grouping key,
* a new group starts. At this point, we have seen all tuples for the
* previous group, and are able to compute and return the final aggregate
* value for that group.
*
* Note that input tuples also need to be pass-through.
*/
while (!aggstate->agg_done)
{
/*
* If grp_firstTuple is not null, it means that we have read the
* first tuple in a new group. Initialize the working state for
* the new group, and advance the aggregate state for this tuple.
*
* This tuple also needs to be outputted here.
*/
if (aggstate->grp_firstTuple != NULL)
{
processFirstTupleInGroup(aggstate);
/* Pass through the first tuple in this new group now. */
TupleTableSlot *passThruTuple = preparePassThruTuple(aggstate, firstSlot);
if (!TupIsNull(passThruTuple))
{
return passThruTuple;
}
}
/*
* Read and process the next input tuple. If this tuple is a pass-through
* tuple, return it. If this tuple needs to be aggregated, and belongs to
* the same group, simply accumulate its value into the intermediate
* state, and go back to the beginning of the loop to process the
* next input tuple. Otherwise, a new group starts. We finalize
* the aggregate value for the current group, and return the value
* if it satisfies the qual.
*/
/* Reset the per-input-tuple context */
ExprContext *tmpcontext = aggstate->tmpcontext;
ResetExprContext(tmpcontext);
bool isNewGroup = false;
outerslot = ExecProcNode(outerPlan);
if (TupIsNull(outerslot))
{
aggstate->agg_done = true;
isNewGroup = true;
}
else
{
Assert(!TupIsNull(outerslot));
Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_QEXEC_M_ROWSIN);
CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);
if (isGroupTuple(aggstate, outerslot))
{
isNewGroup = processNextTuple(aggstate, outerslot);
}
}
if (isNewGroup)
{
Assert(!TupIsNull(firstSlot));
TupleTableSlot *result = computeTupleWithFinalAggregate(aggstate, firstSlot);
if (!TupIsNull(result))
{
return result;
}
}
if (!TupIsNull(outerslot))
{
TupleTableSlot *passThruTuple = preparePassThruTuple(aggstate, outerslot);
if (!TupIsNull(passThruTuple))
{
return passThruTuple;
}
}
}
return NULL;
}
/*
* ExecAgg for non-hashed case.
*/
static TupleTableSlot *
agg_retrieve_direct(AggState *aggstate)
{
if (aggstate->agg_done)
{
return NULL;
}
switch(aggstate->aggType)
{
case AggTypeScalar:
return agg_retrieve_scalar(aggstate);
case AggTypeGroup:
return agg_retrieve_group(aggstate);
case AggTypeIntermediateRollup:
case AggTypeFinalRollup:
return agg_retrieve_rollup(aggstate);
default:
insist_log(false, "invalid Agg node: type %d", aggstate->aggType);
}
return NULL;
}
/*
* ExecAgg for hashed case: retrieve groups from hash table
*/
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
ExprContext *econtext;
ProjectionInfo *projInfo;
Datum *aggvalues;
bool *aggnulls;
AggStatePerAgg peragg;
AggStatePerGroup pergroup;
TupleTableSlot *firstSlot;
Agg *node = (Agg *) aggstate->ss.ps.plan;
bool input_has_grouping = node->inputHasGrouping;
bool is_final_rollup_agg =
(node->lastAgg ||
(input_has_grouping && node->numNullCols == 0));
/*
* get state info from node
*/
/* econtext is the per-output-tuple expression context */
econtext = aggstate->ss.ps.ps_ExprContext;
aggvalues = econtext->ecxt_aggvalues;
aggnulls = econtext->ecxt_aggnulls;
projInfo = aggstate->ss.ps.ps_ProjInfo;
peragg = aggstate->peragg;
firstSlot = aggstate->ss.ss_ScanTupleSlot;
if (aggstate->agg_done)
return NULL;
/*
* We loop retrieving groups until we find one satisfying
* aggstate->ss.ps.qual
*/
while (!aggstate->agg_done)
{
HashAggEntry *entry = agg_hash_iter(aggstate);
if (entry == NULL)
{
aggstate->agg_done = TRUE;
return NULL;
}
ResetExprContext(econtext);
/*
* Store the copied first input tuple in the tuple table slot reserved
* for it, so that it can be used in ExecProject.
*/
ExecStoreMemTuple((MemTuple)entry->tuple_and_aggs, firstSlot, false);
pergroup = (AggStatePerGroup)((char *)entry->tuple_and_aggs +
MAXALIGN(memtuple_get_size((MemTuple)entry->tuple_and_aggs,
aggstate->hashslot->tts_mt_bind)));
/*
* Finalize each aggregate calculation, and stash results in the
* per-output-tuple context.
*/
finalize_aggregates(aggstate, pergroup);
/*
* Use the representative input tuple for any references to
* non-aggregated input columns in the qual and tlist.
*/
econtext->ecxt_scantuple = firstSlot;
if (is_final_rollup_agg && input_has_grouping)
{
econtext->group_id =
get_grouping_groupid(econtext->ecxt_scantuple,
node->grpColIdx[node->numCols-node->numNullCols-1]);
econtext->grouping =
get_grouping_groupid(econtext->ecxt_scantuple,
node->grpColIdx[node->numCols-node->numNullCols-2]);
}
else
{
econtext->group_id = node->rollupGSTimes;
econtext->grouping = node->grouping;
}
/*
* Check the qual (HAVING clause); if the group does not match, ignore
* it and loop back to try to process another group.
*/
if (ExecQual(aggstate->ss.ps.qual, econtext, false))
{
/*
* Form and return a projection tuple using the aggregate results
* and the representative input tuple. Note we do not support
* aggregates returning sets ...
*/
Gpmon_M_Incr_Rows_Out(GpmonPktFromAggState(aggstate));
CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);
return ExecProject(projInfo, NULL);
}
}
/* No more groups */
return NULL;
}
/*
* getAggType
* Get the aggType for the given Agg node.
*
* We should really store the type in the Agg struct, and let the planner set the
* correct type. As an intermediate step, we compute the type here.
*/
static int
getAggType(Agg *node)
{
int aggType = AggTypeScalar;
if (node->numCols > 0 &&
(node->lastAgg &&
(node->inputHasGrouping && node->numNullCols > 0)))
{
aggType = AggTypeFinalRollup;
}
else if (node->numCols > 0 &&
(node->inputHasGrouping && node->numNullCols > 0))
{
aggType = AggTypeIntermediateRollup;
}
else if (node->numCols > 0)
{
aggType = AggTypeGroup;
}
else
{
insist_log(node->aggstrategy == AGG_PLAIN, "wrong Agg strategy: %d", node->aggstrategy);
}
return aggType;
}
/* -----------------
* ExecInitAgg
*
* Creates the run-time information for the agg node produced by the
* planner and initializes its outer subtree
* -----------------
*/
AggState *
ExecInitAgg(Agg *node, EState *estate, int eflags)
{
AggState *aggstate;
AggStatePerAgg peragg;
Plan *outerPlan;
ExprContext *econtext;
int numaggs,
aggno;
ListCell *l;
/* check for unsupported flags */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
/*
* create state structure
*/
aggstate = makeNode(AggState);
aggstate->ss.ps.plan = (Plan *) node;
aggstate->ss.ps.state = estate;
aggstate->aggs = NIL;
aggstate->numaggs = 0;
aggstate->eqfunctions = NULL;
aggstate->hashfunctions = NULL;
aggstate->peragg = NULL;
aggstate->agg_done = false;
aggstate->pergroup = NULL;
aggstate->grp_firstTuple = NULL;
aggstate->hashtable = NULL;
agg_hash_reset_workfile_state(aggstate);
/*
* Create expression contexts. We need two, one for per-input-tuple
* processing and one for per-output-tuple processing. We cheat a little
* by using ExecAssignExprContext() to build both.
*/
ExecAssignExprContext(estate, &aggstate->ss.ps);
aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
ExecAssignExprContext(estate, &aggstate->ss.ps);
/*
* We also need a long-lived memory context for holding hashtable data
* structures and transition values. NOTE: the details of what is stored
* in aggcontext and what is stored in the regular per-query memory
* context are driven by a simple decision: we want to reset the
* aggcontext in ExecReScanAgg to recover no-longer-wanted space.
*/
aggstate->aggcontext =
AllocSetContextCreate(CurrentMemoryContext,
"AggContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
#define AGG_NSLOTS 3
/*
* tuple table initialization
*/
aggstate->ss.ss_ScanTupleSlot = ExecInitExtraTupleSlot(estate);
ExecInitResultTupleSlot(estate, &aggstate->ss.ps);
aggstate->hashslot = ExecInitExtraTupleSlot(estate);
/*
* initialize child expressions
*
* Note: ExecInitExpr finds Aggrefs for us, and also checks that no aggs
* contain other agg calls in their arguments. This would make no sense
* under SQL semantics anyway (and it's forbidden by the spec). Because
* that is true, we don't need to worry about evaluating the aggs in any
* particular order.
*/
aggstate->ss.ps.targetlist = (List *)
ExecInitExpr((Expr *) node->plan.targetlist,
(PlanState *) aggstate);
aggstate->ss.ps.qual = (List *)
ExecInitExpr((Expr *) node->plan.qual,
(PlanState *) aggstate);
/*
* CDB: Offer extra info for EXPLAIN ANALYZE.
*/
if (estate->es_instrument)
{
/* Allocate string buffer. */
aggstate->ss.ps.cdbexplainbuf = makeStringInfo();
/* Request a callback at end of query. */
aggstate->ss.ps.cdbexplainfun = ExecAggExplainEnd;
}
/*
* initialize child nodes
*/
outerPlan = outerPlan(node);
outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
/*
* initialize source tuple type.
*/
ExecAssignScanTypeFromOuterPlan(&aggstate->ss);
/*
* Initialize result tuple type and projection info.
*/
ExecAssignResultTypeFromTL(&aggstate->ss.ps);
ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
/*
* get the count of aggregates in targetlist and quals
*/
numaggs = aggstate->numaggs;
Assert(numaggs == list_length(aggstate->aggs) + list_length(aggstate->percs));
if (numaggs <= 0)
{
/*
* This is not an error condition: we might be using the Agg node just
* to do hash-based grouping. Even in the regular case,
* constant-expression simplification could optimize away all of the
* Aggrefs in the targetlist and qual. So keep going, but force local
* copy of numaggs positive so that palloc()s below don't choke.
*/
numaggs = 1;
}
/*
* If we are grouping, precompute fmgr lookup data for inner loop. We need
* both equality and hashing functions to do it by hashing, but only
* equality if not hashing.
*/
if (node->numCols > 0)
{
if (node->aggstrategy == AGG_HASHED)
execTuplesHashPrepare(ExecGetScanType(&aggstate->ss),
node->numCols,
node->grpColIdx,
&aggstate->eqfunctions,
&aggstate->hashfunctions);
else
aggstate->eqfunctions =
execTuplesMatchPrepare(ExecGetScanType(&aggstate->ss),
node->numCols,
node->grpColIdx);
}
/*
* Set up aggregate-result storage in the output expr context, and also
* allocate my private per-agg working storage
*/
econtext = aggstate->ss.ps.ps_ExprContext;
econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
peragg = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
aggstate->peragg = peragg;
if (node->aggstrategy == AGG_HASHED)
{
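		/* Work out which input columns the hash aggregation needs to keep. */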
aggstate->hash_needed = get_agg_hash_collist(aggstate);
}
else
{
AggStatePerGroup pergroup;
pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs);
aggstate->pergroup = pergroup;
}
/*
* Perform lookups of aggregate function info, and initialize the
* unchanging fields of the per-agg data. We also detect duplicate
* aggregates (for example, "SELECT sum(x) ... HAVING sum(x) > 0"). When
* duplicates are detected, we only make an AggStatePerAgg struct for the
* first one. The clones are simply pointed at the same result entry by
* giving them duplicate aggno values.
*/
aggno = -1;
foreach(l, aggstate->aggs)
{
AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr;
AggStatePerAgg peraggstate;
List *inputTargets = NIL;
List *inputSortClauses = NIL;
Oid *inputTypes = NULL;
int numInputs;
int numArguments;
int numSortCols;
List *sortlist;
HeapTuple aggTuple;
Form_pg_aggregate aggform;
Oid aggtranstype;
AclResult aclresult;
Oid transfn_oid = InvalidOid,
finalfn_oid = InvalidOid;
Expr *transfnexpr,
*finalfnexpr,
*prelimfnexpr;
Datum textInitVal;
int i;
ListCell *lc;
cqContext *pcqCtx;
/* Planner should have assigned aggregate to correct level */
Assert(aggref->agglevelsup == 0);
/* Look for a previous duplicate aggregate */
for (i = 0; i <= aggno; i++)
{
if (equal(aggref, peragg[i].aggref) &&
!contain_volatile_functions((Node *) aggref))
break;
}
if (i <= aggno)
{
/* Found a match to an existing entry, so just mark it */
aggrefstate->aggno = i;
continue;
}
/* Nope, so assign a new PerAgg record */
peraggstate = &peragg[++aggno];
/* Mark Aggref state node with assigned index in the result array */
aggrefstate->aggno = aggno;
/* Fill in the peraggstate data */
peraggstate->aggrefstate = aggrefstate;
peraggstate->aggref = aggref;
numArguments = list_length(aggref->args);
peraggstate->numArguments = numArguments;
/*
	 * Use this information, gathered by ExecInitExpr, to fill in the
	 * per-agg state.
*/
inputTargets = aggrefstate->inputTargets;
inputSortClauses = aggrefstate->inputSortClauses;
numInputs = list_length(inputTargets);
numSortCols = list_length(inputSortClauses);
peraggstate->numSortCols = numSortCols;
peraggstate->numInputs = numInputs;
/* MPP has some restrictions. */
Assert(!(aggref->aggdistinct && aggref->aggorder));
Assert(numArguments == 1 || !aggref->aggdistinct);
		/*
		 * Get actual datatypes of the inputs.  These could be different
		 * from the agg's declared input types, when the agg accepts ANY,
		 * ANYARRAY or ANYELEMENT.  The result array has argument types in
		 * positions 0 through numArguments-1, with sort key types mixed in
		 * or occupying positions numArguments through numInputs-1.
		 */
inputTypes = (Oid*)palloc0(sizeof(Oid)*(numInputs));
i = 0;
foreach(lc, inputTargets)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
inputTypes[i++] = exprType((Node*)tle->expr);
}
pcqCtx = caql_beginscan(
NULL,
cql("SELECT * FROM pg_aggregate "
" WHERE aggfnoid = :1 ",
ObjectIdGetDatum(aggref->aggfnoid)));
aggTuple = caql_getnext(pcqCtx);
if (!HeapTupleIsValid(aggTuple))
elog(ERROR, "cache lookup failed for aggregate %u",
aggref->aggfnoid);
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
/* Check permission to call aggregate function */
aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC,
get_func_name(aggref->aggfnoid));
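		/*
		 * MPP multi-stage aggregation: the preliminary (partial) stage runs
		 * only the transition function; the intermediate and final stages
		 * consume partial transition values, so they use aggprelimfn as
		 * their transition function; the final function is applied only in
		 * the single-stage and final cases.
		 */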
switch ( aggref->aggstage) /* MPP */
{
case AGGSTAGE_NORMAL: /* Single-stage aggregation */
peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
break;
		case AGGSTAGE_PARTIAL: /* Two-stage aggregation -- preliminary stage */
peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
peraggstate->finalfn_oid = finalfn_oid = InvalidOid;
break;
case AGGSTAGE_INTERMEDIATE:
peraggstate->transfn_oid = transfn_oid = aggform->aggprelimfn;
peraggstate->finalfn_oid = finalfn_oid = InvalidOid;
break;
case AGGSTAGE_FINAL: /* Two-stage aggregation - final stage */
peraggstate->transfn_oid = transfn_oid = aggform->aggprelimfn;
peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
break;
}
peraggstate->prelimfn_oid = aggform->aggprelimfn;
/* Check that aggregate owner has permission to call component fns */
{
Oid aggOwner;
int fetchCount;
aggOwner = caql_getoid_plus(
NULL,
&fetchCount,
NULL,
cql("SELECT proowner FROM pg_proc "
" WHERE oid = :1 ",
ObjectIdGetDatum(aggref->aggfnoid)));
if (!fetchCount)
elog(ERROR, "cache lookup failed for function %u",
aggref->aggfnoid);
aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC,
get_func_name(transfn_oid));
if (OidIsValid(finalfn_oid))
{
aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC,
get_func_name(finalfn_oid));
}
if (OidIsValid(peraggstate->prelimfn_oid))
{
aclresult = pg_proc_aclcheck(peraggstate->prelimfn_oid, aggOwner,
ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC,
get_func_name(peraggstate->prelimfn_oid));
}
}
/* check if the transition type is polymorphic and if so resolve it */
aggtranstype = resolve_polymorphic_transtype(aggform->aggtranstype,
aggref->aggfnoid,
inputTypes);
/* build expression trees using actual argument & result types */
build_aggregate_fnexprs(inputTypes,
numArguments,
aggtranstype,
aggref->aggtype,
transfn_oid,
finalfn_oid,
InvalidOid /* prelim */,
InvalidOid /* invtrans */,
InvalidOid /* invprelim */,
&transfnexpr,
&finalfnexpr,
&prelimfnexpr,
NULL,
NULL);
fmgr_info(transfn_oid, &peraggstate->transfn);
peraggstate->transfn.fn_expr = (Node *) transfnexpr;
if (OidIsValid(finalfn_oid))
{
fmgr_info(finalfn_oid, &peraggstate->finalfn);
peraggstate->finalfn.fn_expr = (Node *) finalfnexpr;
}
if (OidIsValid(peraggstate->prelimfn_oid))
{
fmgr_info(peraggstate->prelimfn_oid, &peraggstate->prelimfn);
peraggstate->prelimfn.fn_expr = (Node *) prelimfnexpr;
}
get_typlenbyval(aggref->aggtype,
&peraggstate->resulttypeLen,
&peraggstate->resulttypeByVal);
get_typlenbyval(aggtranstype,
&peraggstate->transtypeLen,
&peraggstate->transtypeByVal);
/*
* initval is potentially null, so don't try to access it as a struct
* field. Must do it the hard way with caql_getattr
*/
textInitVal = caql_getattr(pcqCtx,
Anum_pg_aggregate_agginitval,
&peraggstate->initValueIsNull);
if (peraggstate->initValueIsNull)
peraggstate->initValue = (Datum) 0;
else
peraggstate->initValue = GetAggInitVal(textInitVal,
aggtranstype);
/*
* If the transfn is strict and the initval is NULL, make sure input
* type and transtype are the same (or at least binary-compatible), so
* that it's OK to use the first input value as the initial
* transValue. This should have been checked at agg definition time,
* but just in case...
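		 *
		 * (For example, the built-in max() aggregates are strict and have a
		 * NULL initcond, with a transition type equal to the input type.)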
*/
if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
{
if (numArguments < 1 ||
!IsBinaryCoercible(inputTypes[0], aggtranstype))
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate %u needs to have compatible input type and transition type",
aggref->aggfnoid)));
}
/*
* Get a tupledesc corresponding to the inputs (including sort
* expressions) of the agg.
*/
peraggstate->evaldesc = ExecTypeFromTL(inputTargets, false);
/* Create slot we're going to do argument evaluation in */
peraggstate->evalslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(peraggstate->evalslot, peraggstate->evaldesc);
/* Set up projection info for evaluation */
peraggstate->evalproj = ExecBuildProjectionInfo(aggrefstate->args,
aggstate->tmpcontext,
peraggstate->evalslot,
NULL);
/*
* If we're doing either DISTINCT or ORDER BY, then we have a list of
		 * SortClause nodes; fish out the data in them and stick them
* into arrays.
*
* Note that by construction, if there is a DISTINCT clause then the
* ORDER BY clause is a prefix of it (see transformDistinctClause).
*/
if (aggref->aggdistinct)
{
TargetEntry *tle;
SortClause *sc;
Oid eq_function;
/*
			 * GPDB 4 doesn't implement DISTINCT aggs for aggs having more
			 * than one argument, nor does it allow an ordered aggregate to
			 * specify DISTINCT, but PG 9 does.  The SQL standard allows the
			 * one-arg-for-DISTINCT restriction, but we really ought to
			 * implement it the way PG 9 does eventually.
*
* For now we use the scalar equalfn field of AggStatePerAggData
* for DQAs instead of treating DQAs more generally.
*/
if (numArguments != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("DISTINCT is supported only for single-argument aggregates")));
eq_function = equality_oper_funcid(inputTypes[0]);
fmgr_info(eq_function, &(peraggstate->equalfn));
tle = (TargetEntry*)linitial(inputTargets);
tle->ressortgroupref = 1;
sc = makeNode(SortClause);
sc->tleSortGroupRef = tle->ressortgroupref;
sc->sortop = ordering_oper_opid(inputTypes[0]);
sortlist = list_make1(sc);
numSortCols = 1;
}
else if ( aggref->aggorder )
{
sortlist = aggref->aggorder->sortClause;
numSortCols = list_length(sortlist);
}
else
{
sortlist = NULL;
numSortCols = 0;
}
peraggstate->numSortCols = numSortCols;
if (numSortCols > 0)
{
/*
* We don't implement DISTINCT or ORDER BY aggs in the HASHED case
* (yet)
*/
Assert(node->aggstrategy != AGG_HASHED);
/* If we have only one input, we need its len/byval info. */
if (numInputs == 1)
{
get_typlenbyval(inputTypes[0],
&peraggstate->inputtypeLen,
&peraggstate->inputtypeByVal);
}
/* Extract the sort information for use later */
peraggstate->sortColIdx =
(AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
peraggstate->sortOperators =
(Oid *) palloc(numSortCols * sizeof(Oid));
i = 0;
foreach(lc, sortlist)
{
SortClause *sortcl = (SortClause *) lfirst(lc);
TargetEntry *tle = get_sortgroupclause_tle(sortcl,
inputTargets);
/* the parser should have made sure of this */
Assert(OidIsValid(sortcl->sortop));
peraggstate->sortColIdx[i] = tle->resno;
peraggstate->sortOperators[i] = sortcl->sortop;
i++;
}
Assert(i == numSortCols);
}
if (aggref->aggdistinct)
{
Oid eqfunc;
Assert(numArguments == 1);
Assert(numSortCols == 1);
/*
* We need the equal function for the DISTINCT comparison we will
* make.
*/
eqfunc = equality_oper_funcid(inputTypes[0]);
fmgr_info(eqfunc, &peraggstate->equalfn);
}
caql_endscan(pcqCtx);
}
/*
	 * Process percentile expressions.  These are treated separately from
	 * Aggref expressions for now because we cannot change the catalog;
	 * they will be folded into the existing Aggref architecture once the
	 * catalog can be changed.  Handling a percentile function is very
	 * similar to handling an Aggref, except that there is no function OID
	 * for the transition function, so we fill in the FmgrInfo by hand,
	 * without an OID.
	 *
	 * Under the current design of percentile functions, an Agg node that
	 * handles PercentileExpr should not see Aggrefs in the same target
	 * list (or havingQual), and vice versa.  We don't assert that,
	 * however, so as to leave room for later extensibility.
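	 *
	 * (A typical expression handled here is, e.g.,
	 *		percentile_cont(0.5) WITHIN GROUP (ORDER BY x).)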
*/
foreach (l, aggstate->percs)
{
PercentileExprState *percstate = (PercentileExprState *) lfirst(l);
PercentileExpr *perc = (PercentileExpr *) percstate->xprstate.expr;
AggStatePerAgg peraggstate;
FmgrInfo *transfn;
int numArguments;
int i;
Oid trans_argtypes[FUNC_MAX_ARGS];
ListCell *lc;
Expr *dummy_expr;
/* Look for a previous duplicate aggregate */
for (i = 0; i <= aggno; i++)
{
/*
			 * In practice, a percentile expression doesn't contain volatile
			 * functions, since everything is evaluated and replaced by Vars
			 * during preprocessing (e.g. of the ordering operations).
			 * Still, checking for volatility here is robust and consistent
			 * with the Aggref initialization above.
*/
if (equal(perc, peragg[i].perc) &&
!contain_volatile_functions((Node *) perc))
break;
}
if (i <= aggno)
{
/* Found a match to an existing entry, so just mark it */
percstate->aggno = i;
continue;
}
/* Nope, so assign a new PerAgg record */
peraggstate = &peragg[++aggno];
/* Mark Aggref state node with assigned index in the result array */
percstate->aggno = aggno;
/* Fill in the peraggstate data */
peraggstate->percstate = percstate;
peraggstate->perc = perc;
/*
* numArguments = arg + ORDER BY + pc + tc
* See notes on percentile_cont_trans() and ExecInitExpr() for
* PercentileExpr.
*/
numArguments = list_length(perc->args) + list_length(perc->sortClause) + 2;
peraggstate->numArguments = numArguments;
/*
		 * Set up transfn.  Normally we would use fmgr_info, but these
		 * functions are not in the catalog (and thus have no OIDs) because
		 * of the difficulty of changing the catalog at the moment.  This
		 * should be cleaned up once we can change the catalog.
*/
transfn = &peraggstate->transfn;
transfn->fn_nargs = list_length(perc->args) + 1;
transfn->fn_strict = false;
transfn->fn_retset = false;
transfn->fn_mcxt = CurrentMemoryContext;
transfn->fn_addr = perc->perckind == PERC_DISC ?
percentile_disc_trans : percentile_cont_trans;
transfn->fn_oid = InvalidOid;
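		/* No pg_proc entry exists for this function, so there is no OID. */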
/*
		 * The transition type is the same as the result type, since
		 * percentile functions have no final function.
*/
trans_argtypes[0] = perc->perctype;
i = 1;
/*
* Literal arguments.
*/
foreach (lc, perc->args)
{
Node *arg = lfirst(lc);
trans_argtypes[i++] = exprType(arg);
}
/*
* ORDER BY arguments.
*/
foreach (lc, perc->sortTargets)
{
TargetEntry *tle = lfirst(lc);
trans_argtypes[i++] = exprType((Node *) tle->expr);
}
/*
* Peer count and total count.
*/
trans_argtypes[i++] = INT8OID;
trans_argtypes[i++] = INT8OID;
/*
* Build FuncExpr for the transition function.
*/
build_aggregate_fnexprs(trans_argtypes,
i,
perc->perctype,
perc->perctype,
InvalidOid,
InvalidOid,
InvalidOid,
InvalidOid,
InvalidOid,
(Expr **) &transfn->fn_expr,
&dummy_expr, NULL, NULL, NULL);
get_typlenbyval(perc->perctype,
&peraggstate->resulttypeLen,
&peraggstate->resulttypeByVal);
get_typlenbyval(perc->perctype,
&peraggstate->transtypeLen,
&peraggstate->transtypeByVal);
/*
* Hard code for the known information.
*/
peraggstate->initValueIsNull = true;
peraggstate->initValue = (Datum) 0;
peraggstate->finalfn_oid = InvalidOid;
peraggstate->prelimfn_oid = InvalidOid;
/*
* Get a tupledesc corresponding to the inputs (including sort
* expressions) of the agg.
*/
peraggstate->evaldesc = ExecTypeFromTL(percstate->tlist, false);
/* Create slot we're going to do argument evaluation in */
peraggstate->evalslot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(peraggstate->evalslot, peraggstate->evaldesc);
/* Set up projection info for evaluation */
peraggstate->evalproj = ExecBuildProjectionInfo(percstate->args,
aggstate->tmpcontext,
peraggstate->evalslot,
NULL);
}
/* Update numaggs to match number of unique aggregates found */
aggstate->numaggs = aggno + 1;
/* MPP */
aggstate->hhashtable = NULL;
/* ROLLUP */
aggstate->perpassthru = NULL;
if (node->inputHasGrouping)
{
AggStatePerGroup perpassthru;
perpassthru = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs);
aggstate->perpassthru = perpassthru;
}
aggstate->num_attrs = 0;
aggstate->aggType = getAggType(node);
/* Set the default memory manager */
aggstate->mem_manager.alloc = cxt_alloc;
aggstate->mem_manager.free = cxt_free;
aggstate->mem_manager.manager = aggstate->aggcontext;
aggstate->mem_manager.realloc_ratio = 1;
initGpmonPktForAgg((Plan *)node, &aggstate->ss.ps.gpmon_pkt, estate);
return aggstate;
}
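/*
 * GetAggInitVal
 *		Convert the textual agginitval from pg_aggregate into a Datum of
 *		the given transition type, using that type's input function.
 */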
Datum
GetAggInitVal(Datum textInitVal, Oid transtype)
{
Oid typinput,
typioparam;
char *strInitVal;
Datum initVal;
getTypeInputInfo(transtype, &typinput, &typioparam);
strInitVal = DatumGetCString(DirectFunctionCall1(textout, textInitVal));
initVal = OidInputFunctionCall(typinput, strInitVal,
typioparam, -1);
pfree(strInitVal);
return initVal;
}
/*
* Standard API to count tuple table slots used by an execution
* instance of an Agg node.
*
 * GPDB precomputes the tuple table size, but the use of projection means
 * aggregates use a slot.  Since the count is needed earlier than the
 * determination of the number of distinct aggregate calls (which happens
 * during initialization), we just count Aggref nodes.  This may be an
 * overcount (in case some aggregate calls are duplicated), but shouldn't
 * be too bad.
*/
int
ExecCountSlotsAgg(Agg *node)
{
int nextraslots = 0;
nextraslots += count_extra_agg_slots((Node*)node->plan.targetlist);
nextraslots += count_extra_agg_slots((Node*)node->plan.qual);
return ExecCountSlotsNode(outerPlan(node)) +
ExecCountSlotsNode(innerPlan(node)) +
nextraslots + /* may be high due to duplicate Aggref nodes. */
AGG_NSLOTS;
}
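/*
 * ExecEndAgg
 *		Shut down the Agg node: free both expression contexts, clean up the
 *		tuple table and the aggregation memory context, then recursively
 *		shut down the outer plan.
 */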
void
ExecEndAgg(AggState *node)
{
PlanState *outerPlan;
ExecEagerFreeAgg(node);
/*
* Free both the expr contexts.
*/
ExecFreeExprContext(&node->ss.ps);
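	/*
	 * tmpcontext was also built with ExecAssignExprContext; swap it into
	 * ps_ExprContext so it gets freed the same way.
	 */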
node->ss.ps.ps_ExprContext = node->tmpcontext;
ExecFreeExprContext(&node->ss.ps);
/* clean up tuple table */
ExecClearTuple(node->ss.ss_ScanTupleSlot);
if (node->num_attrs > 0)
{
pfree(node->replValues);
pfree(node->replIsnull);
pfree(node->doReplace);
}
MemoryContextDelete(node->aggcontext);
outerPlan = outerPlanState(node);
ExecEndNode(outerPlan);
EndPlanStateGpmonPkt(&node->ss.ps);
}
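/*
 * ExecReScanAgg
 *		Reset the Agg node so that aggregation is recomputed on the next
 *		ExecProcNode call, rescanning the child plan if necessary.
 */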
void
ExecReScanAgg(AggState *node, ExprContext *exprCtxt)
{
ExprContext *econtext = node->ss.ps.ps_ExprContext;
ExecEagerFreeAgg(node);
/*
* Release all temp storage. Note that with AGG_HASHED, the hash table
* is allocated in a sub-context of the aggcontext. We're going to
* rebuild the hash table from scratch, so we need to use
* MemoryContextResetAndDeleteChildren() to avoid leaking the old hash
* table's memory context header.
*/
MemoryContextResetAndDeleteChildren(node->aggcontext);
/* Re-initialize some variables */
node->agg_done = false;
ExecClearTuple(node->ss.ss_ScanTupleSlot);
/* Forget current agg values */
MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
if (!IS_HASHAGG(node))
{
/*
* Reset the per-group state (in particular, mark transvalues null)
*/
MemSet(node->pergroup, 0,
sizeof(AggStatePerGroupData) * node->numaggs);
}
if (((Agg *) node->ss.ps.plan)->inputHasGrouping)
{
/*
* Reset the per-passthru state (in particular, mark transvalues null)
*/
MemSet(node->perpassthru, 0,
sizeof(AggStatePerGroupData) * node->numaggs);
}
/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
*/
if (((PlanState *) node)->lefttree->chgParam == NULL)
ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
}
/*
* ExecAggExplainEnd
* Called before ExecutorEnd to finish EXPLAIN ANALYZE reporting.
*/
void
ExecAggExplainEnd(PlanState *planstate, struct StringInfoData *buf)
{
AggState *aggstate = (AggState *)planstate;
/* Report executor memory used by our memory context. */
planstate->instrument->execmemused +=
(double)MemoryContextGetPeakSpace(aggstate->aggcontext);
} /* ExecAggExplainEnd */
/*
* aggregate_dummy - dummy execution routine for aggregate functions
*
* This function is listed as the implementation (prosrc field) of pg_proc
* entries for aggregate functions. Its only purpose is to throw an error
* if someone mistakenly executes such a function in the normal way.
*
* Perhaps someday we could assign real meaning to the prosrc field of
* an aggregate?
*/
Datum
aggregate_dummy(PG_FUNCTION_ARGS)
{
elog(ERROR, "aggregate function %u called as normal function",
fcinfo->flinfo->fn_oid);
return (Datum) 0; /* keep compiler quiet */
}
/* resolve actual type of transition state, if polymorphic */
Oid
resolve_polymorphic_transtype(Oid aggtranstype, Oid aggfnoid,
Oid *inputTypes)
{
if (aggtranstype == ANYARRAYOID || aggtranstype == ANYELEMENTOID)
{
/* have to fetch the agg's declared input types... */
Oid *declaredArgTypes;
int agg_nargs;
(void) get_func_signature(aggfnoid, &declaredArgTypes, &agg_nargs);
aggtranstype = enforce_generic_type_consistency(inputTypes,
declaredArgTypes,
agg_nargs,
aggtranstype);
pfree(declaredArgTypes);
}
return aggtranstype;
}
/*
* tuple_grouping - return the GROUPING value for an input tuple.
*
* This is used for a ROLLUP.
*/
int64
tuple_grouping(TupleTableSlot *outerslot,
int input_grouping,
int grpingIdx)
{
Datum grping_datum;
int64 grping;
bool isnull;
	/* Simply return 0 if input_grouping is 0. */
if (input_grouping == 0)
return 0;
grping_datum = slot_getattr(outerslot, grpingIdx, &isnull);
Assert(!isnull);
grping = DatumGetInt64(grping_datum);
return grping;
}
/*
* get_grouping_groupid() -- return either grouping or group_id
* as given in 'grping_attno'.
*/
uint64
get_grouping_groupid(TupleTableSlot *slot, int grping_attno)
{
bool isnull;
uint64 grouping;
/* Obtain grouping or group_id from input */
Datum grping_datum = slot_getattr(slot,
grping_attno,
&isnull);
Assert(!isnull);
grouping = DatumGetInt64(grping_datum);
return grouping;
}
void
initGpmonPktForAgg(Plan *planNode, gpmon_packet_t *gpmon_pkt, EState *estate)
{
Assert(planNode != NULL && gpmon_pkt != NULL && IsA(planNode, Agg));
{
PerfmonNodeType type = PMNT_Invalid;
switch(((Agg*)planNode)->aggstrategy)
{
case AGG_PLAIN:
type = PMNT_Aggregate;
break;
case AGG_SORTED:
type = PMNT_GroupAggregate;
break;
case AGG_HASHED:
type = PMNT_HashAggregate;
break;
}
Assert(type != PMNT_Invalid);
Assert(GPMON_AGG_TOTAL <= (int)GPMON_QEXEC_M_COUNT);
InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate, type,
(int64)planNode->plan_rows,
NULL);
}
}
/*
* Combine the argument and sortkey expressions of an Aggref
* node into a single target list (of TargetEntry*) and, if
* needed, an associated sort key list (of SortClause*).
*
* The explicit result is a palloc'd target list incorporating
* the underlying expressions by reference. (Everything but
* the expressions is newly allocated.)
*
* The implicit result, if requested by passing a non-null
* pointer in sort_clauses, is a palloc'd sort key list.
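 *
 * For example, an ordered aggregate call like agg(x ORDER BY y) yields a
 * target list containing x and y, plus (if requested) a one-element sort
 * key list referring to y.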
*/
List *
combineAggrefArgs(Aggref *aggref, List **sort_clauses)
{
ListCell *lc;
TargetEntry *tle;
List *inputTargets = NIL;
List *inputSorts = NIL;
int i = 0;
/* In GPDB, can't have it both ways. */
Assert( !aggref->aggdistinct || aggref->aggorder == NULL );
/* Target list for cataloged aggregate arguments. */
foreach(lc, aggref->args)
{
TargetEntry *tle = makeNode(TargetEntry);
tle->expr = (Expr*)lfirst(lc);
tle->resno = ++i;
inputTargets = lappend(inputTargets, tle);
}
if ( aggref->aggorder != NULL )
{
/* Add targets and sort clauses for call supplied ordering. */
inputSorts = aggref->aggorder->sortClause;
if ( sort_clauses != NULL )
inputSorts = (List*)copyObject(inputSorts);
foreach(lc, inputSorts)
{
SortClause *sc = (SortClause*)lfirst(lc);
TargetEntry *newtle;
tle = get_sortgroupclause_tle(sc, aggref->aggorder->sortTargets);
/* XXX Is it worth looking for tle->expr in the tlist so far to avoid copy? */
newtle = makeNode(TargetEntry);
newtle->expr = tle->expr; /* by reference */
newtle->resno = ++i;
newtle->resname = tle->resname ? pstrdup(tle->resname) : NULL;
newtle->ressortgroupref = tle->ressortgroupref;
inputTargets = lappend(inputTargets, newtle);
}
}
else if ( aggref->aggdistinct )
{
SortClause *sc;
/* In GPDB, DISTINCT implies single argument. */
Assert( list_length(inputTargets) == 1 );
/* Add targets and sort clauses for implied DISTINCT ordering. */
tle = (TargetEntry*)linitial(inputTargets);
tle->ressortgroupref = 1;
if ( sort_clauses != NULL )
{
sc = makeNode(SortClause);
sc->tleSortGroupRef = tle->ressortgroupref;
inputSorts = list_make1(sc);
}
}
if ( sort_clauses != NULL )
*sort_clauses = inputSorts;
return inputTargets;
}
/*
 * Combine the argument and ordering expressions with the peer count
 * and the total count, to build the target list used to populate the
 * TupleTableSlot for this expression.  This is similar to an ordered
 * aggregate Aggref, the difference being that PercentileExpr accepts
 * those additional values as arguments to its transition function.
*/
List *
combinePercentileArgs(PercentileExpr *p)
{
List *tlist;
ListCell *l;
AttrNumber resno;
TargetEntry *tle;
tlist = NIL;
resno = 1;
foreach (l, p->args)
{
Expr *arg = lfirst(l);
tle = makeTargetEntry((Expr *) arg,
resno++,
NULL,
false);
tlist = lappend(tlist, tle);
}
/*
* Extract ordering expressions from sortTargets.
*/
foreach (l, p->sortClause)
{
SortClause *sc = lfirst(l);
TargetEntry *sc_tle;
sc_tle = get_sortgroupclause_tle(sc, p->sortTargets);
tle = flatCopyTargetEntry(sc_tle);
tle->resno = resno++;
tlist = lappend(tlist, tle);
}
/*
	 * Peer count expression.
*/
Assert(p->pcExpr);
tle = makeTargetEntry((Expr *) p->pcExpr,
resno++,
"peer_count",
false);
tlist = lappend(tlist, tle);
/*
	 * Total count expression.
*/
Assert(p->tcExpr);
tle = makeTargetEntry((Expr *) p->tcExpr,
resno++,
"total_count",
false);
tlist = lappend(tlist, tle);
return tlist;
}
/*
* Subroutines for ExecCountSlotsAgg.
*/
int
count_extra_agg_slots(Node *node)
{
int count = 0;
count_extra_agg_slots_walker(node, &count);
return count;
}
bool
count_extra_agg_slots_walker(Node *node, int *count)
{
if (node == NULL)
return false;
if (IsA(node, Aggref))
{
(*count)++;
}
else if (IsA(node, PercentileExpr))
{
(*count)++;
}
return expression_tree_walker(node, count_extra_agg_slots_walker, (void *) count);
}
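/*
 * ExecEagerFreeAgg
 *		Eagerly release memory held by the Agg node: per-aggregate sort
 *		states, the hash table (and the hash slot's tuple descriptor), and
 *		the copied first tuple of the current group.
 */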
void
ExecEagerFreeAgg(AggState *node)
{
/* Close any open tuplesorts */
for (int aggno = 0; aggno < node->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &node->peragg[aggno];
if (!peraggstate->sortstate)
{
continue;
}
if (gp_enable_mk_sort)
{
tuplesort_end_mk((Tuplesortstate_mk *) peraggstate->sortstate);
}
else
{
tuplesort_end((Tuplesortstate *) peraggstate->sortstate);
}
peraggstate->sortstate = NULL;
}
if (IS_HASHAGG(node))
{
destroy_agg_hash_table(node);
		/*
* Clean out the tuple descriptor.
*/
if (node->hashslot
&& node->hashslot->tts_tupleDescriptor)
{
ReleaseTupleDesc(node->hashslot->tts_tupleDescriptor);
node->hashslot->tts_tupleDescriptor = NULL;
}
}
/* Release first tuple of group, if we have made a copy. */
if (node->grp_firstTuple != NULL)
{
pfree(node->grp_firstTuple);
node->grp_firstTuple = NULL;
}
}