| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <unistd.h> |
| #include "postgres.h" |
| #include "fmgr.h" |
| #include "miscadmin.h" |
| #include "access/hash.h" |
| #include "catalog/catquery.h" |
| #include "catalog/pg_aggregate.h" |
| #include "catalog/pg_proc.h" |
| #include "cdb/cdbexplain.h" |
| #include "cdb/cdbvars.h" |
| #include "executor/execHHashagg.h" |
| #include "executor/instrument.h" |
| #include "nodes/pg_list.h" |
| #include "optimizer/clauses.h" |
| #include "optimizer/tlist.h" |
| #include "parser/parse_agg.h" |
| #include "parser/parse_coerce.h" |
| #include "parser/parse_expr.h" |
| #include "parser/parse_oper.h" |
| #include "utils/acl.h" |
| #include "utils/array.h" |
| #include "utils/builtins.h" |
| #include "utils/lsyscache.h" |
| #include "utils/numeric.h" |
| |
| #include "vagg.h" |
| #include "vtype.h" |
| #include "execVQual.h" |
| #include "tuplebatch.h" |
| #include "vexecutor.h" |
| |
| #define HHA_MSG_LVL DEBUG2 |
| |
| extern int BATCHSIZE; |
| |
/*
 * copy from src/backend/executor/execHHashagg.c
 * Memory-accounting helpers for the hash aggregation table.
 */

/* Bytes reserved: entry-buffer cells plus everything the group pool allocated. */
#define GET_BUFFER_SIZE(hashtable) \
	((hashtable)->entry_buf.nfull_total * (hashtable)->entry_buf.cellbytes + \
	mpool_total_bytes_allocated((hashtable)->group_buf))

/* Bytes actually in use (the group pool may reserve more than it uses). */
#define GET_USED_BUFFER_SIZE(hashtable) \
	((hashtable)->entry_buf.nfull_total * (hashtable)->entry_buf.cellbytes + \
	mpool_bytes_used((hashtable)->group_buf))

/* Error out when hash-table metadata alone already exceeds the memory budget. */
#define SANITY_CHECK_METADATA_SIZE(hashtable) \
	do { \
		if ((hashtable)->mem_for_metadata >= (hashtable)->max_mem) \
			ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR), \
							errmsg(ERRMSG_GP_INSUFFICIENT_STATEMENT_MEMORY)));\
	} while (0)

/* Used buffer bytes plus metadata bytes. */
#define GET_TOTAL_USED_SIZE(hashtable) \
	(GET_USED_BUFFER_SIZE(hashtable) + (hashtable)->mem_for_metadata)

/* True while the table is still under its memory budget. */
#define HAVE_FREESPACE(hashtable) \
	(GET_TOTAL_USED_SIZE(hashtable) < (hashtable)->max_mem)
| |
| |
/*
 * implement the SUM aggregate functions.
 * With the different column type of aggregate function, the transfer
 * data type is different too.
 *
 * this MACRO only used by the int2 and int4: their SUM transition type
 * is int64, carried directly (by value) in vectransdata->data, so no
 * allocation is ever needed and isalloc stays false.
 *
 * Expands to v<type>_accum(transvalue, vector), which:
 *   - resets the running value to 0 on the first call for a group,
 *   - sums every non-null, non-skipped element of the batch (either the
 *     rows linked under the current group header, or the whole batch
 *     when there is no grouping),
 *   - clears the transvalue null flag once at least one row contributed.
 */
#define _VACCUM_NUMERIC(type) \
PG_FUNCTION_INFO_V1(v##type##_accum); \
Datum v##type##_accum(PG_FUNCTION_ARGS) \
{ \
	VectorizedAggData *vectransdata = \
		(VectorizedAggData *) PG_GETARG_BYTEA_P(0); \
	BatchAggGroupData *groupdata = vectransdata->groupData; \
	GroupData *curHeader = NULL; \
	v##type *vvalue = (v##type *)DatumGetPointer(PG_GETARG_DATUM(1)); \
	int idx = 0;\
	int64 sum = 0; \
	bool allnull = true; \
	Assert(NULL != vectransdata); \
	if(NULL != groupdata) \
	{ \
		/* grouped input: aggregate only the current group's rows */ \
		curHeader = &(groupdata->group_header[groupdata->group_idx]); \
	} \
	if(vectransdata->isnovalue) \
	{ \
		/* first call for this group: start from zero */ \
		vectransdata->data = 0; \
		vectransdata->isnull = true; \
		vectransdata->isnovalue = false; \
	} \
	if(curHeader) \
	{ \
		/* walk the linked row-index list belonging to this group */ \
		for(idx = curHeader->idx; idx != -1; idx = groupdata->idx_list[idx]) \
		{ \
			Assert(NULL != vvalue->isnull); \
			if(vvalue->isnull[idx]) \
				continue; \
			if(NULL != vectransdata->skip && vectransdata->skip[idx]) \
				continue; \
			allnull = false; \
			sum = sum + vvalue->values[idx];\
		} \
	} \
	else \
	{ \
		/* no grouping: scan the entire batch */ \
		for(idx = 0; idx < vvalue->dim; idx++) \
		{ \
			Assert(NULL != vvalue->isnull); \
			if(vvalue->isnull[idx]) \
				continue; \
			if(NULL != vectransdata->skip && vectransdata->skip[idx]) \
				continue; \
			allnull = false; \
			sum = sum + vvalue->values[idx];\
		} \
	} \
	if(!allnull) \
		vectransdata->isnull = false; \
	vectransdata->data += sum; \
	return PointerGetDatum(vectransdata); \
}
| |
/*
 * for sum(int8)
 * the transfer data type of sum(int8) is numeric
 * (an int64 accumulator could overflow, so each batch's partial sum is
 * folded into a palloc'ed numeric via numeric_add).
 */
PG_FUNCTION_INFO_V1(vint8_accum);
Datum vint8_accum(PG_FUNCTION_ARGS)
{
	/* wrapper carrying the numeric transition value plus batch/group info */
	VectorizedAggData *vectransdata =
		(VectorizedAggData *) PG_GETARG_BYTEA_P(0);
	BatchAggGroupData *groupdata = vectransdata->groupData;
	GroupData *curHeader = NULL;
	vint8 *vvalue = (vint8 *)DatumGetPointer(PG_GETARG_DATUM(1));
	int idx = 0;
	int64 sum = 0;			/* batch-local partial sum, safe in int64 for one batch */
	Datum newval;
	bool allnull = true;
	Assert(NULL != vectransdata);
	if(NULL != groupdata)
	{
		/* grouped input: aggregate only the current group's rows */
		curHeader = &(groupdata->group_header[groupdata->group_idx]);
	}
	if( vectransdata->isnull || vectransdata->isnovalue)
	{
		/* first call (or still all-null): start from numeric zero */
		vectransdata->data = DirectFunctionCall1(int8_numeric, Int64GetDatum(0));
		vectransdata->isnull = true;
		vectransdata->isnovalue = false;
		/* numeric is palloc'ed, so the final value must be copied out */
		vectransdata->isalloc = true;
	}

	if(curHeader)
	{
		/* walk the linked row-index list belonging to this group */
		for(idx = curHeader->idx; idx != -1; idx = groupdata->idx_list[idx])
		{
			Assert(NULL != vvalue->isnull);
			if(vvalue->isnull[idx])
				continue;
			if(NULL != vectransdata->skip && vectransdata->skip[idx])
				continue;
			sum = sum + DatumGetInt64(vvalue->values[idx]);
			allnull = false;
		}
	}
	else
	{
		/* no grouping: scan the entire batch */
		for(idx = 0; idx < vvalue->dim; idx++)
		{
			Assert(NULL != vvalue->isnull);
			if(vvalue->isnull[idx])
				continue;
			if(NULL != vectransdata->skip && vectransdata->skip[idx])
				continue;
			sum = sum + DatumGetInt64(vvalue->values[idx]);
			allnull = false;
		}
	}

	if(!allnull)
	{
		vectransdata->isnull = false;
		vectransdata->isalloc = true;
		/* OK to do the addition. */
		newval = DirectFunctionCall1(int8_numeric, Int64GetDatum(sum));
		vectransdata->data = DirectFunctionCall2(numeric_add,
												 vectransdata->data, newval);
	}
	return PointerGetDatum(vectransdata);
}
| |
| /* |
| * for sum(float4) |
| * the transfer data type of sum(float4) is float4 |
| */ |
| PG_FUNCTION_INFO_V1(vfloat4_accum); |
| Datum vfloat4_accum(PG_FUNCTION_ARGS) |
| { |
| VectorizedAggData *vectransdata = |
| (VectorizedAggData *) PG_GETARG_BYTEA_P(0); |
| BatchAggGroupData *groupdata = vectransdata->groupData; |
| GroupData *curHeader = NULL; |
| vfloat4 *vvalue = (vfloat4 *)DatumGetPointer(PG_GETARG_DATUM(1)); |
| int idx = 0; |
| float4 sum = 0; |
| bool allnull = true; |
| Datum newval; |
| Assert(NULL != vectransdata); |
| if(NULL != groupdata) |
| { |
| curHeader = &(groupdata->group_header[groupdata->group_idx]); |
| } |
| if( vectransdata->isnull || vectransdata->isnovalue) |
| { |
| vectransdata->data = Float4GetDatum(0); |
| vectransdata->isnull = true; |
| vectransdata->isnovalue = false; |
| } |
| |
| if(curHeader) |
| { |
| for(idx = curHeader->idx; idx != -1; idx = groupdata->idx_list[idx]) |
| { |
| Assert(NULL != vvalue->isnull); |
| if(vvalue->isnull[idx]) |
| continue; |
| if(NULL != vectransdata->skip && vectransdata->skip[idx]) |
| continue; |
| sum = sum + DatumGetFloat4(vvalue->values[idx]); |
| allnull = false; |
| } |
| } |
| else |
| { |
| for(idx = 0; idx < vvalue->dim; idx++) |
| { |
| Assert(NULL != vvalue->isnull); |
| if(vvalue->isnull[idx]) |
| continue; |
| if(NULL != vectransdata->skip && vectransdata->skip[idx]) |
| continue; |
| sum = sum + DatumGetFloat4(vvalue->values[idx]); |
| allnull = false; |
| } |
| } |
| |
| if(!allnull) |
| vectransdata->isnull = false; |
| |
| /* OK to do the addition. */ |
| sum = sum + DatumGetFloat4(vectransdata->data); |
| vectransdata->data = Float4GetDatum(sum); |
| return PointerGetDatum(vectransdata); |
| } |
| |
| /* |
| * for sum(float8) |
| * the transfer data type of sum(float8) is float8 |
| */ |
| PG_FUNCTION_INFO_V1(vfloat8_accum); |
| Datum vfloat8_accum(PG_FUNCTION_ARGS) |
| { |
| VectorizedAggData *vectransdata = |
| (VectorizedAggData *) PG_GETARG_BYTEA_P(0); |
| BatchAggGroupData *groupdata = vectransdata->groupData; |
| GroupData *curHeader = NULL; |
| vfloat8 *vvalue = (vfloat8 *)DatumGetPointer(PG_GETARG_DATUM(1)); |
| int idx = 0; |
| float8 sum = 0; |
| bool allnull = true; |
| Datum newval; |
| Assert(NULL != vectransdata); |
| if(NULL != groupdata) |
| { |
| curHeader = &(groupdata->group_header[groupdata->group_idx]); |
| } |
| if( vectransdata->isnull || vectransdata->isnovalue) |
| { |
| vectransdata->data = Float8GetDatum(0); |
| vectransdata->isnull = true; |
| vectransdata->isnovalue = false; |
| } |
| |
| if(curHeader) |
| { |
| for(idx = curHeader->idx; idx != -1; idx = groupdata->idx_list[idx]) |
| { |
| Assert(NULL != vvalue->isnull); |
| if(vvalue->isnull[idx]) |
| continue; |
| if(NULL != vectransdata->skip && vectransdata->skip[idx]) |
| continue; |
| sum = sum + DatumGetFloat8(vvalue->values[idx]); |
| allnull = false; |
| } |
| } |
| else |
| { |
| for(idx = 0; idx < vvalue->dim; idx++) |
| { |
| Assert(NULL != vvalue->isnull); |
| if(vvalue->isnull[idx]) |
| continue; |
| if(NULL != vectransdata->skip && vectransdata->skip[idx]) |
| continue; |
| sum = sum + DatumGetFloat8(vvalue->values[idx]); |
| allnull = false; |
| } |
| } |
| |
| if(!allnull) |
| vectransdata->isnull = false; |
| |
| /* OK to do the addition. */ |
| sum = sum + DatumGetFloat8(vectransdata->data); |
| vectransdata->data = Float8GetDatumFast(sum); |
| return PointerGetDatum(vectransdata); |
| } |
| |
/*
 * implement the AVG aggregate transition functions.
 *
 * Expands to v<type>_avg_accum().  The transition value is an
 * IntFloatAvgTransdata (running row count plus float8 sum) that is
 * allocated on the first call for a group and carried inside the
 * VectorizedAggData envelope while the batch is processed.
 *
 * XTYPE names the DatumGet<XTYPE> accessor matching the element type.
 */
#define _VAVG_NUMERIC(type, XTYPE) \
PG_FUNCTION_INFO_V1(v##type##_avg_accum); \
Datum v##type##_avg_accum(PG_FUNCTION_ARGS) \
{ \
	VectorizedAggData *vectransdata = \
		(VectorizedAggData *) PG_GETARG_BYTEA_P(0); \
	IntFloatAvgTransdata *transdata = NULL;\
	BatchAggGroupData *groupdata = vectransdata->groupData; \
	GroupData *curHeader = NULL; \
	v##type *vvalue = (v##type *)DatumGetPointer(PG_GETARG_DATUM(1)); \
	int idx = 0;\
	int64 count = 0; \
	float8 sum = 0; \
	Assert(NULL != vectransdata); \
	if(NULL != groupdata) \
	{ \
		/* grouped input: aggregate only the current group's rows */ \
		curHeader = &(groupdata->group_header[groupdata->group_idx]); \
	} \
	if(vectransdata->isnull || vectransdata->isnovalue) \
	{ \
		/* first call for this group: allocate a fresh transition value */ \
		transdata = \
			(IntFloatAvgTransdata *) palloc(sizeof(IntFloatAvgTransdata)); \
		SET_VARSIZE(transdata, sizeof(IntFloatAvgTransdata)); \
		transdata->count = 0; \
		transdata->sum = 0; \
		vectransdata->isnull = false; \
		vectransdata->isnovalue = false; \
		vectransdata->isalloc = true;\
		vectransdata->data = PointerGetDatum(transdata); \
	} \
	else \
	{ \
		transdata= (IntFloatAvgTransdata*)DatumGetPointer(vectransdata->data);\
	} \
	if(curHeader) \
	{ \
		/* walk the linked row-index list belonging to this group */ \
		for(idx = curHeader->idx; idx != -1; idx = groupdata->idx_list[idx]) \
		{ \
			Assert(NULL != vvalue->isnull); \
			if(vvalue->isnull[idx]) \
				continue; \
			if(NULL != vectransdata->skip && vectransdata->skip[idx]) \
				continue; \
			sum = sum + DatumGet##XTYPE(vvalue->values[idx]);\
			count++; \
		} \
	} \
	else \
	{ \
		/* no grouping: scan the entire batch */ \
		for(idx = 0; idx < vvalue->dim; idx++) \
		{ \
			/* keep the same sanity check as every other batch loop */ \
			Assert(NULL != vvalue->isnull); \
			if(vvalue->isnull[idx]) \
				continue; \
			if(NULL != vectransdata->skip && vectransdata->skip[idx]) \
				continue; \
			sum = sum + DatumGet##XTYPE(vvalue->values[idx]);\
			count++; \
		} \
	} \
	transdata->count += count; \
	transdata->sum += sum; \
	return PointerGetDatum(vectransdata); \
}
| |
| |
/*
 * implement the COUNT(column) aggregate functions.
 *
 * Expands to v<type>_inc(), which counts the non-null, non-skipped rows
 * of the batch (or of the current group) and adds the count to the int64
 * transition value carried in vectransdata->data.
 */
#define _VINC_NUMERIC(type) \
PG_FUNCTION_INFO_V1(v##type##_inc); \
Datum v##type##_inc(PG_FUNCTION_ARGS) \
{ \
	VectorizedAggData *vectransdata = \
		(VectorizedAggData *) PG_GETARG_BYTEA_P(0); \
	BatchAggGroupData *groupdata = vectransdata->groupData; \
	GroupData *curHeader = NULL; \
	int64 tval = 0; \
	v##type* vvalue = (v##type *)DatumGetPointer(PG_GETARG_DATUM(1)); \
	int idx; \
	Assert(NULL != vectransdata); \
	if(NULL != groupdata) \
	{ \
		/* grouped input: count only the current group's rows */ \
		curHeader = &(groupdata->group_header[groupdata->group_idx]); \
	} \
	if( vectransdata->isnull || vectransdata->isnovalue) \
	{ \
		/* first call for this group: start counting from zero */ \
		vectransdata->data = Int64GetDatum(0); \
		vectransdata->isnull = false; \
		vectransdata->isnovalue = false; \
	} \
	if(curHeader) \
	{ \
		Assert(NULL != vvalue->isnull); \
		for(idx = curHeader->idx; idx != -1; idx = groupdata->idx_list[idx]) \
		{ \
			if(vvalue->isnull[idx]) \
				continue; \
			if(NULL != vectransdata->skip && vectransdata->skip[idx]) \
				continue; \
			tval++; \
		} \
	} \
	else \
	{ \
		Assert(NULL != vvalue->isnull); \
		for(idx = 0; idx < vvalue->dim; idx++) \
		{ \
			if(vvalue->isnull[idx]) \
				continue; \
			if(NULL != vectransdata->skip && vectransdata->skip[idx]) \
				continue; \
			tval++; \
		} \
	} \
	/* go through the Datum conversion macros, as vec_inc_any does, instead \
	 * of doing raw arithmetic on a Datum (only correct by accident on \
	 * pass-by-value int64 platforms) */ \
	vectransdata->data = Int64GetDatum(DatumGetInt64(vectransdata->data) + tval); \
	return PointerGetDatum(vectransdata); \
}
| |
| |
| /* |
| * implement the COUNT(*) aggregate functions. |
| * NOTE:we create an new aggregate functions 'veccount' to implement the |
| * vectorized couter aggregate, In the CHECK phase(vcheck.c), if we found |
| * an count(*), we use veccount(*) to replace it. |
| */ |
| PG_FUNCTION_INFO_V1(vec_inc_any); |
| Datum vec_inc_any(PG_FUNCTION_ARGS) |
| { |
| VectorizedAggData *vectransdata = |
| (VectorizedAggData *) PG_GETARG_BYTEA_P(0); |
| BatchAggGroupData *groupdata = vectransdata->groupData; |
| GroupData *curHeader = NULL; |
| int64 tval = 0; |
| int idx; |
| Assert(NULL != vectransdata); |
| if(NULL != groupdata) |
| { |
| curHeader = &(groupdata->group_header[groupdata->group_idx]); |
| } |
| if( vectransdata->isnull || vectransdata->isnovalue) |
| { |
| vectransdata->data = Int64GetDatum(0); |
| vectransdata->isnull = false; |
| vectransdata->isnovalue = false; |
| } |
| if(curHeader) |
| { |
| for(idx = curHeader->idx; idx != -1; idx = groupdata->idx_list[idx]) |
| { |
| if(NULL != vectransdata->skip && vectransdata->skip[idx]) |
| continue; |
| tval++; |
| } |
| } |
| else |
| { |
| for(idx = 0; idx < vectransdata->nrows; idx++) |
| { |
| if(NULL != vectransdata->skip && vectransdata->skip[idx]) |
| continue; |
| tval++; |
| } |
| } |
| vectransdata->data = Int64GetDatum(DatumGetInt64(vectransdata->data) + tval); |
| return PointerGetDatum(vectransdata); |
| } |
| |
/* SUM transitions for the narrow integer types (int64 transfer value). */
_VACCUM_NUMERIC(int2)
_VACCUM_NUMERIC(int4)

/* For each element type, instantiate both AVG and COUNT(col) transitions. */
#define VACCUM_NUMERIC(type, XTYPE) \
	_VAVG_NUMERIC(type, XTYPE) \
	_VINC_NUMERIC(type)

VACCUM_NUMERIC(int2, Int16)
VACCUM_NUMERIC(int4, Int32)
VACCUM_NUMERIC(int8, Int64)
VACCUM_NUMERIC(float4, Float4)
VACCUM_NUMERIC(float8, Float8)
| |
| |
/*
 * copy from src/backend/executor/nodeagg.c
 *
 * Advance all the aggregates for one input tuple. The input tuple
 * has been stored in tmpcontext->ecxt_scantuple, so that it is accessible
 * to ExecEvalExpr. pergroup is the array of per-group structs to use
 * (this might be in a hashtable entry).
 *
 * Vectorized variant: instead of scalar argument values, each fcinfo
 * argument slot is loaded with a pointer to the whole column batch taken
 * from the pre-projected slot in vstate->aggslot[aggno].
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
advance_vaggregates(AggState *aggstate, AggStatePerGroup pergroup,
					MemoryManagerContainer *mem_manager)
{
	int aggno;
	TupleBatch tb;
	VectorizedState *vstate = ((PlanState*)aggstate)->vectorized;

	Assert(NULL != vstate);

	for (aggno = 0; aggno < aggstate->numaggs; aggno++)
	{
		AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
		AggStatePerGroup pergroupstate = &pergroup[aggno];
		Aggref *aggref = peraggstate->aggref;
		PercentileExpr *perc = peraggstate->perc;
		int i;
		TupleTableSlot *slot;
		int nargs;

		/* We can apply the transition function immediately */
		FunctionCallInfoData fcinfo;

		/* An entry is driven by either an Aggref or a PercentileExpr. */
		if (aggref)
			nargs = list_length(aggref->args);
		else
		{
			Assert (perc);
			nargs = list_length(perc->args);
		}

		/* Evaluate the current input expressions for this aggregate */
		Assert(NULL != vstate->aggslot);
		slot = vstate->aggslot[aggno];

		Assert(NULL != slot);

		tb = (TupleBatch)slot->PRIVATE_tb;

		/* Load values into fcinfo */
		/* Start from 1, since the 0th arg will be the transition value */
		if (aggref)
		{
			for (i = 0; i < nargs; i++)
			{
				/* propagate the batch skip mask into each column vector */
				tb->datagroup[i]->skipref = tb->skip;
				fcinfo.arg[i + 1] = PointerGetDatum(tb->datagroup[i]);
				fcinfo.argnull[i + 1] = false;
			}

		}
		else
		{
			/*
			 * In case of percentile functions, put everything into
			 * fcinfo's argument since there should be the required
			 * attributes as arguments in the tuple.
			 *
			 * NOTE(review): unlike the aggref branch, skipref is not set
			 * here, so percentile inputs appear not to honor the batch
			 * skip mask — confirm whether that is intentional.
			 */
			int natts;

			Assert(perc);
			natts = slot->tts_tupleDescriptor->natts;
			for (i = 0; i < natts; i++)
			{
				fcinfo.arg[i + 1] = PointerGetDatum(tb->datagroup[i]);
				fcinfo.argnull[i + 1] = false;
			}
		}

		advance_transition_function(aggstate, peraggstate, pergroupstate,
									&fcinfo, mem_manager);
	} /* aggno loop */
}
| |
| /* |
| * copy from src/backend/executor/nodeagg.c |
| * Function: setGroupAggs |
| * |
| * Set the groupaggs buffer in the hashtable to point to the right place |
| * in the given hash entry. |
| */ |
| static inline void |
| setGroupAggs(HashAggTable *hashtable, MemTupleBinding *mt_bind, HashAggEntry *entry) |
| { |
| Assert(mt_bind != NULL); |
| |
| if (entry != NULL) |
| { |
| int tup_len = memtuple_get_size((MemTuple)entry->tuple_and_aggs, mt_bind); |
| hashtable->groupaggs->tuple = (MemTuple)entry->tuple_and_aggs; |
| hashtable->groupaggs->aggs = (AggStatePerGroup) |
| ((char *)entry->tuple_and_aggs + MAXALIGN(tup_len)); |
| } |
| } |
| |
/*
 * copy from src/backend/executor/execHHashagg.c
 *
 * Calculate the hash value for the given input tuple.
 *
 * Per grouping column, hash the attribute value with the column's hash
 * function into hashkey_buf, then hash the whole buffer with hash_any.
 *
 * This based on but different from get_hash_value from the dynahash
 * API. Use a different name to underline that we don't use dynahash.
 */
static uint32
calc_hash_value(AggState* aggstate, TupleTableSlot *inputslot)
{
	Agg *agg;
	ExprContext *econtext;
	MemoryContext oldContext;
	int i;
	FmgrInfo* info = aggstate->hashfunctions;
	HashAggTable *hashtable = aggstate->hhashtable;

	agg = (Agg*)aggstate->ss.ps.plan;
	econtext = aggstate->tmpcontext;	/* short-lived, per-input-tuple */

	/* hash-function calls may palloc; do it in per-tuple memory */
	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

	for (i = 0; i < agg->numCols; i++, info++)
	{
		AttrNumber att = agg->grpColIdx[i];
		bool isnull = false;
		Datum value = slot_getattr(inputslot, att, &isnull);

		if (!isnull)
		{
			hashtable->hashkey_buf[i] = DatumGetUInt32(FunctionCall1(info, value));
		}

		else
			/* all NULL keys share one arbitrary constant, so NULLs group together */
			hashtable->hashkey_buf[i] = 0xdeadbeef;

	}

	MemoryContextSwitchTo(oldContext);
	return (uint32) hash_any((unsigned char *) hashtable->hashkey_buf, agg->numCols * sizeof(HashKey));
}
| |
/*
 * Vectorized Data
 *
 * Implementing an aggregate function usually has three phases:
 * 1) Set the initial values.
 * 2) Process the data, communicating through a "transfer data" value.
 * 3) Finalize the aggregate function to obtain the final result.
 * To implement vectorized aggregate functions we refactor phase 2.
 * When an aggregate function can be vectorized, we do not use the
 * "transfer data" directly; instead we create a VectorizedAggData that
 * encapsulates the "transfer data" and also carries the grouping data,
 * so the vectorized aggregate functions can fetch the grouping data and
 * process it.  Because the "transfer data" is passed as a Datum, and we
 * pass VectorizedAggData as a Datum too, the two can be exchanged
 * transparently.  When phase 2 is over, we strip the VectorizedAggData
 * and convert it back to the plain "transfer data".
 * Because the result of phase 2 is a scalar, the non-vectorized finalize
 * aggregate functions can be used directly.
 */
| |
| /* |
| * Init Vectorized data, in order to transfer group data into the agg functions |
| */ |
| VectorizedAggData * |
| InitAggVectorizedData(AggState *aggstate) |
| { |
| int aggno; |
| |
| VectorizedAggData *trans = |
| (VectorizedAggData *) palloc0(sizeof(VectorizedAggData) * aggstate->numaggs); |
| |
| for (aggno = 0; aggno < aggstate->numaggs; aggno++) |
| { |
| SET_VARSIZE(&trans[aggno], sizeof(VectorizedAggData)); |
| } |
| |
| return trans; |
| } |
| |
| /* |
| * Use the new transfer data, it can send the group data to agg functions. |
| */ |
| static inline void |
| AddAggVectorizedData(AggState *aggstate, |
| AggStatePerGroup pergroup, VectorizedAggData *trans, |
| BatchAggGroupData *groupdata, bool *skip, int nrows) |
| { |
| int aggno; |
| |
| for (aggno = 0; aggno < aggstate->numaggs; aggno++) |
| { |
| AggStatePerGroup pergroupstate = &pergroup[aggno]; |
| trans[aggno].data = pergroupstate->transValue; |
| trans[aggno].isnull = pergroupstate->transValueIsNull; |
| trans[aggno].isnovalue = pergroupstate->noTransValue; |
| trans[aggno].groupData = groupdata; |
| trans[aggno].skip = skip; |
| trans[aggno].nrows = nrows; |
| trans[aggno].isalloc = false; |
| pergroupstate->transValueIsNull = false; |
| pergroupstate->noTransValue = false; |
| pergroupstate->transValue = PointerGetDatum(&(trans[aggno])); |
| } |
| |
| return; |
| } |
| |
| /* |
| * set skip array to filter some tuples |
| */ |
| static inline void |
| SetAggVectorizedSkip(AggState *aggstate, VectorizedAggData *trans, bool *skip, int nrows) |
| { |
| int aggno; |
| |
| for (aggno = 0; aggno < aggstate->numaggs; aggno++) |
| { |
| trans[aggno].skip = skip; |
| trans[aggno].nrows = nrows; |
| } |
| |
| return; |
| } |
| |
| /* |
| * remove the vectorized data struct |
| */ |
| static inline void |
| RemoveAggVectorizedData(AggState *aggstate, |
| AggStatePerGroup pergroup) |
| { |
| int aggno; |
| MemoryManagerContainer *mem_manager = &(aggstate->mem_manager); |
| |
| for (aggno = 0; aggno < aggstate->numaggs; aggno++) |
| { |
| AggStatePerGroup pergroupstate = &pergroup[aggno]; |
| AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; |
| VectorizedAggData *trans = |
| (VectorizedAggData *) DatumGetPointer(pergroupstate->transValue); |
| |
| pergroupstate->transValueIsNull = trans->isnull; |
| pergroupstate->noTransValue = trans->isnovalue; |
| if(trans->isalloc) |
| pergroupstate->transValue = datumCopyWithMemManager(0, |
| trans->data, |
| peraggstate->transtypeByVal, |
| peraggstate->transtypeLen, |
| mem_manager); |
| else |
| pergroupstate->transValue = trans->data; |
| } |
| |
| return; |
| } |
| |
| |
| /* |
| * copied from src/backend/executor/execHHashagg.c |
| * agg_hash_table_stat_upd |
| * collect hash chain statistics for EXPLAIN ANALYZE |
| */ |
| static void |
| agg_hash_table_stat_upd(HashAggTable *ht) |
| { |
| unsigned int i; |
| |
| char hostname[SEGMENT_IDENTITY_NAME_LENGTH]; |
| gethostname(hostname,SEGMENT_IDENTITY_NAME_LENGTH); |
| for (i = 0; i < ht->nbuckets; i++) |
| { |
| HashAggEntry *entry = ht->buckets[i]; |
| int chainlength = 0; |
| |
| if (entry) |
| { |
| for (chainlength = 0; entry; chainlength++) |
| entry = entry->next; |
| |
| cdbexplain_agg_upd(&ht->chainlength, chainlength, i,hostname); |
| } |
| } |
| } /* agg_hash_table_stat_upd */ |
| |
/*
 * copied from src/backend/executor/nodeAgg.c
 *
 * agg_retrieve_scalar
 *   Compute the scalar aggregates.
 *
 * Consumes the whole outer plan batch-by-batch, advancing all aggregates
 * per batch, then finalizes and applies the HAVING qual.  Returns the
 * single projected result tuple, or NULL if the qual fails.
 */
static TupleTableSlot *
agg_retrieve_scalar(AggState *aggstate)
{
	AggStatePerAgg peragg = aggstate->peragg;
	AggStatePerGroup pergroup = aggstate->pergroup ;
	VectorizedState *vstate = ((PlanState*)aggstate)->vectorized;
	VectorizedAggData *trans = (VectorizedAggData*)vstate->transdata;

	initialize_aggregates(aggstate, peragg, pergroup, &(aggstate->mem_manager));

	/*
	 * In fact if there is no Group By clause, VectorizedData is not useful,
	 * we add it in order to keep compatibility.
	 */
	AddAggVectorizedData(aggstate, pergroup, trans, NULL, NULL, 0);

	/*
	 * We loop through input tuples, and compute the aggregates.
	 */
	while (!aggstate->agg_done)
	{
		ExprContext *tmpcontext = aggstate->tmpcontext;
		/* Reset the per-input-tuple context */
		ResetExprContext(tmpcontext);
		PlanState *outerPlan = outerPlanState(aggstate);
		TupleTableSlot *outerslot = ExecProcNode(outerPlan);
		TupleBatch tb = NULL;
		if (TupIsNull(outerslot))
		{
			aggstate->agg_done = true;
			break;
		}
		Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_QEXEC_M_ROWSIN);
		CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);

		tmpcontext->ecxt_scantuple = outerslot;

		tb = (TupleBatch)outerslot->PRIVATE_tb;

		/* Point every aggregate at this batch's qual-skip mask. */
		SetAggVectorizedSkip(aggstate, trans, tb->skip, tb->nrows);

		/*
		 * In fact if there is no Group By clause, we can do the
		 * projection in the advance_vaggregates.
		 * we add it in order to keep compatibility.
		 */
		int aggno;
		for (aggno = 0; aggno < aggstate->numaggs; aggno++)
		{
			AggStatePerAgg peraggstate = &aggstate->peragg[aggno];

			/* Evaluate the current input expressions for this aggregate */
			vstate->aggslot[aggno] = ExecVProject(peraggstate->evalproj, NULL);
		}

		advance_vaggregates(aggstate, pergroup, &(aggstate->mem_manager));
	}

	/* Convert the VectorizedAggData wrappers back to plain transfer values. */
	RemoveAggVectorizedData(aggstate, pergroup);

	finalize_aggregates(aggstate, pergroup);


	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
	Agg *node = (Agg*)aggstate->ss.ps.plan;
	econtext->grouping = node->grouping;
	econtext->group_id = node->rollupGSTimes;
	/* Check the qual (HAVING clause). */
	if (ExecQual(aggstate->ss.ps.qual, econtext, false))
	{
		Gpmon_M_Incr_Rows_Out(GpmonPktFromAggState(aggstate));
		CheckSendPlanStateGpmonPkt(&aggstate->ss.ps);

		/*
		 * Form and return a projection tuple using the aggregate results
		 * and the representative input tuple.
		 */
		return ExecProject(aggstate->ss.ps.ps_ProjInfo, NULL);
	}
	return NULL;
}
| |
| /* |
| * ExecAgg for non-hashed case. |
| */ |
| static TupleTableSlot * |
| agg_retrieve_direct(AggState *aggstate) |
| { |
| if (aggstate->agg_done) |
| { |
| return NULL; |
| } |
| |
| switch(aggstate->aggType) |
| { |
| case AggTypeScalar: |
| return agg_retrieve_scalar(aggstate); |
| |
| case AggTypeGroup: |
| case AggTypeIntermediateRollup: |
| case AggTypeFinalRollup: |
| default: |
| insist_log(false, "invalid Agg node: type %d", aggstate->aggType); |
| } |
| return NULL; |
| } |
| |
| |
| /* copy from src/backend/executor/execHHashagg.c*/ |
| static bool |
| agg_hash_initial_1pass(AggState *aggstate) |
| { |
| HashAggTable *hashtable = aggstate->hhashtable; |
| ExprContext *tmpcontext = aggstate->tmpcontext; /* per input tuple context */ |
| TupleTableSlot *outerslot = NULL; |
| bool streaming = ((Agg *) aggstate->ss.ps.plan)->streaming; |
| bool tuple_remaining = true; |
| MemTupleBinding *mt_bind = aggstate->hashslot->tts_mt_bind; |
| VectorizedState *vstate = ((PlanState*)aggstate)->vectorized; |
| VectorizedAggData *trans = (VectorizedAggData*)vstate->transdata; |
| |
| Assert(hashtable); |
| AssertImply(!streaming, hashtable->state == HASHAGG_BEFORE_FIRST_PASS); |
| elog(HHA_MSG_LVL, |
| "HashAgg: initial pass -- beginning to load hash table"); |
| |
| /* NOTE: Now we cannot use GUC parameters directly because vexecutor is a plug-in module */ |
| /* If we found cached workfiles, initialize and load the batch data here */ |
| if (gp_workfile_caching && aggstate->cached_workfiles_found) |
| { |
| elog(HHA_MSG_LVL, "Found existing SFS, reloading data from %s", hashtable->work_set->path); |
| /* Initialize all structures as if we just spilled everything */ |
| hashtable->spill_set = read_spill_set(aggstate); |
| aggstate->hhashtable->is_spilling = true; |
| aggstate->cached_workfiles_loaded = true; |
| |
| elog(gp_workfile_caching_loglevel, "HashAgg reusing cached workfiles, initiating Squelch walker"); |
| PlanState *outerNode = outerPlanState(aggstate); |
| ExecSquelchNode(outerNode); |
| |
| /* tuple table initialization */ |
| ScanState *scanstate = & aggstate->ss; |
| PlanState *outerPlan = outerPlanState(scanstate); |
| TupleDesc tupDesc = ExecGetResultType(outerPlan); |
| |
| if (aggstate->ss.ps.instrument) |
| { |
| aggstate->ss.ps.instrument->workfileReused = true; |
| } |
| |
| /* Initialize hashslot by cloning input slot. */ |
| ExecSetSlotDescriptor(aggstate->hashslot, tupDesc); |
| ExecStoreAllNullTuple(aggstate->hashslot); |
| mt_bind = aggstate->hashslot->tts_mt_bind; |
| |
| |
| return tuple_remaining; |
| } |
| |
| /* |
| * Check if an input tuple has been read, but not processed |
| * because of lack of space before streaming the results |
| * in the last call. |
| */ |
| if (aggstate->hashslot->tts_tupleDescriptor != NULL && |
| hashtable->prev_slot != NULL) |
| { |
| outerslot = hashtable->prev_slot; |
| hashtable->prev_slot = NULL; |
| } |
| else |
| { |
| outerslot = ExecProcNode(outerPlanState(aggstate)); |
| } |
| |
| /* |
| * Process outer-plan tuples, until we exhaust the outer plan. |
| */ |
| hashtable->pass = 0; |
| |
| while(true) |
| { |
| HashKey hashkey; |
| bool isNew; |
| HashAggEntry *entry; |
| TupleBatch tb; |
| int i = 0; |
| BatchAggGroupData *agg_groupdata; |
| GroupData *cur_header = NULL; |
| |
| /* no more tuple. Done */ |
| if (TupIsNull(outerslot)) |
| { |
| tuple_remaining = false; |
| break; |
| } |
| |
| Assert(NULL != vstate->batchGroupData); |
| Assert(NULL != vstate->groupData); |
| Assert(NULL != vstate->indexList); |
| memset(vstate->batchGroupData, 0, sizeof(BatchAggGroupData)); |
| memset(vstate->groupData, 0, sizeof(GroupData) * BATCHSIZE); |
| memset(vstate->indexList, -1, sizeof(int) * BATCHSIZE); |
| |
| agg_groupdata = vstate->batchGroupData; |
| agg_groupdata->group_header = vstate->groupData; |
| agg_groupdata->idx_list = vstate->indexList; |
| |
| tb = (TupleBatch)outerslot->PRIVATE_tb; |
| |
| if(NULL == tb || tb->nrows == 0) |
| { |
| tuple_remaining = false; |
| break; |
| } |
| |
| /* |
| * we have to convert the vectorized tuple to non-vectorized tuple, |
| * and process it one by one. |
| * the initialize value of i is -1, because outerslot may have no |
| * valid tuple(all tuple can not pass the qualification), then -1 |
| * indicate that no valid tuple. if there have valid tuple, we will |
| * reset the i to correct value. |
| */ |
| |
| for (i = -1; i < tb->nrows; i++) |
| { |
| if(!VirtualNodeProc(outerslot)) |
| break; |
| |
| /* set the correct value */ |
| i = tb->iter - 1; |
| |
| Gpmon_M_Incr(GpmonPktFromAggState(aggstate), GPMON_QEXEC_M_ROWSIN); |
| |
| if (aggstate->hashslot->tts_tupleDescriptor == NULL) |
| { |
| int size; |
| |
| /* Initialize hashslot by cloning input slot. */ |
| ExecSetSlotDescriptor(aggstate->hashslot, outerslot->tts_tupleDescriptor); |
| ExecStoreAllNullTuple(aggstate->hashslot); |
| mt_bind = aggstate->hashslot->tts_mt_bind; |
| |
| size = ((Agg *)aggstate->ss.ps.plan)->numCols * sizeof(HashKey); |
| |
| hashtable->hashkey_buf = (HashKey *)palloc0(size); |
| hashtable->mem_for_metadata += size; |
| } |
| |
| /* set up for advance_aggregates call */ |
| tmpcontext->ecxt_scantuple = outerslot; |
| |
| /* Find or (if there's room) build a hash table entry for the |
| * input tuple's group. */ |
| hashkey = calc_hash_value(aggstate, outerslot); |
| entry = lookup_agg_hash_entry(aggstate, (void *)outerslot, |
| INPUT_RECORD_TUPLE, 0, hashkey, 0, &isNew); |
| |
| if (entry == NULL) |
| { |
| if (GET_TOTAL_USED_SIZE(hashtable) > hashtable->mem_used) |
| hashtable->mem_used = GET_TOTAL_USED_SIZE(hashtable); |
| |
| if (hashtable->num_ht_groups <= 1) |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| ERRMSG_GP_INSUFFICIENT_STATEMENT_MEMORY)); |
| |
| /* |
| * If stream_bottom is on, we store outerslot into hashslot, so that |
| * we can process it later. |
| */ |
| if (streaming) |
| { |
| Assert(tuple_remaining); |
| hashtable->prev_slot = outerslot; |
| break; |
| } |
| |
| /* CDB: Report statistics for EXPLAIN ANALYZE. */ |
| if (!hashtable->is_spilling && aggstate->ss.ps.instrument) |
| agg_hash_table_stat_upd(hashtable); |
| |
| spill_hash_table(aggstate); |
| |
| entry = lookup_agg_hash_entry(aggstate, (void *)outerslot, |
| INPUT_RECORD_TUPLE, 0, hashkey, 0, &isNew); |
| } |
| |
| if (isNew) |
| { |
| int tup_len = memtuple_get_size((MemTuple)entry->tuple_and_aggs, mt_bind); |
| setGroupAggs(hashtable, mt_bind, entry); |
| MemSet((char *)entry->tuple_and_aggs + MAXALIGN(tup_len), 0, |
| aggstate->numaggs * sizeof(AggStatePerGroupData)); |
| initialize_aggregates(aggstate, aggstate->peragg, hashtable->groupaggs->aggs, |
| &(aggstate->mem_manager)); |
| } |
| |
| //find current group_header if exists, just O(n) find |
| for (int j = 0; j < agg_groupdata->group_cnt; j++) |
| { |
| cur_header = NULL; |
| if (agg_groupdata->group_header[j].entry == entry) |
| { |
| cur_header = &(agg_groupdata->group_header[j]); |
| break; |
| } |
| } |
| |
| if (cur_header == NULL) |
| { |
| // add a new group header |
| agg_groupdata->group_header[agg_groupdata->group_cnt].idx = i; |
| agg_groupdata->group_header[agg_groupdata->group_cnt].entry = entry; |
| agg_groupdata->group_cnt++; |
| } |
| else |
| { |
| //group header already exists, just insert the current tuple to the "neck" |
| agg_groupdata->idx_list[i] = cur_header->idx; |
| cur_header->idx = i; |
| } |
| } |
| |
| /* |
| * if i == -1, it indicate that all tuple in outerslot is invalid, |
| * we need not to process it. |
| */ |
| if(-1 != i) |
| { |
| tmpcontext->ecxt_scantuple = outerslot; |
| |
| /* To avoid wasteful duplication of work, we do the projection here */ |
| int aggno; |
| for (aggno = 0; aggno < aggstate->numaggs; aggno++) |
| { |
| AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; |
| |
| /* Evaluate the current input expressions for this aggregate */ |
| vstate->aggslot[aggno] = ExecVProject(peraggstate->evalproj, NULL); |
| } |
| |
| /* we have known the group counts, so we process it one by one. */ |
| for (int i = 0; i < agg_groupdata->group_cnt; i++) { |
| GroupData *cur_header = &(agg_groupdata->group_header[i]); |
| agg_groupdata->group_idx = i; |
| |
| //set hashtable->groupaggs to the agg_hash_entry |
| setGroupAggs(hashtable, aggstate->hashslot->tts_mt_bind, cur_header->entry); |
| |
| /* HACK... */ |
| AddAggVectorizedData(aggstate, hashtable->groupaggs->aggs, trans, agg_groupdata, tb->skip, tb->nrows); |
| advance_vaggregates(aggstate, hashtable->groupaggs->aggs, &(aggstate->mem_manager)); |
| RemoveAggVectorizedData(aggstate, hashtable->groupaggs->aggs); |
| |
| } |
| |
| /* it is batch count now */ |
| hashtable->num_tuples++; |
| |
| /* Reset per-input-tuple context after each tuple */ |
| ResetExprContext(tmpcontext); |
| |
| if (streaming && !HAVE_FREESPACE(hashtable)) |
| { |
| Assert(tuple_remaining); |
| ExecClearTuple(aggstate->hashslot); |
| break; |
| } |
| } |
| |
| /* Read the next tuple */ |
| outerslot = ExecProcNode(outerPlanState(aggstate)); |
| } |
| |
| if (GET_TOTAL_USED_SIZE(hashtable) > hashtable->mem_used) |
| hashtable->mem_used = GET_TOTAL_USED_SIZE(hashtable); |
| |
| if (hashtable->is_spilling) |
| { |
| int freed_size = 0; |
| |
| /* |
| * Split out the rest of groups in the hashtable if spilling has already |
| * happened. This is because none of these groups can be immediately outputted |
| * any more. |
| */ |
| spill_hash_table(aggstate); |
| freed_size = suspendSpillFiles(hashtable->spill_set); |
| hashtable->mem_for_metadata -= freed_size; |
| |
| if (aggstate->ss.ps.instrument) |
| { |
| aggstate->ss.ps.instrument->workfileCreated = true; |
| } |
| } |
| |
| /* CDB: Report statistics for EXPLAIN ANALYZE. */ |
| if (!hashtable->is_spilling && aggstate->ss.ps.instrument) |
| agg_hash_table_stat_upd(hashtable); |
| |
| AssertImply(tuple_remaining, streaming); |
| if(tuple_remaining) |
| elog(HHA_MSG_LVL, "HashAgg: streaming out the intermediate results."); |
| |
| return tuple_remaining; |
| } |
| |
| /* |
| * copy from src/backend/executor/nodeAgg.c |
| * ExecAggExplainEnd |
| * Called before ExecutorEnd to finish EXPLAIN ANALYZE reporting. |
| */ |
| static void |
| ExecAggExplainEnd(PlanState *planstate, struct StringInfoData *buf) |
| { |
| AggState *aggstate = (AggState *)planstate; |
| |
| /* Report executor memory used by our memory context. */ |
| planstate->instrument->execmemused += |
| (double)MemoryContextGetPeakSpace(aggstate->aggcontext); |
| } /* ExecAggExplainEnd */ |
| |
| /* |
| * ExecAgg - copy from src/backend/executor/nodeAgg.c |
| */ |
| |
TupleTableSlot *
ExecVAgg(AggState *node)
{
	/* Aggregation already finished: free resources and signal end-of-data. */
	if (node->agg_done)
	{
		ExecEagerFreeAgg(node);
		return NULL;
	}


	if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
	{
		TupleTableSlot *tuple = NULL;
		bool streaming = ((Agg *) node->ss.ps.plan)->streaming;

		/*
		 * ExecAgg processing for hashed aggregation -- returns the
		 * next result tuple or NULL. When returning NULL also sets
		 * aggstate to prevent future calls.
		 */

		/*
		 * First call: create the hash table and run the initial pass over
		 * the input, then pick the starting state for the machine below.
		 */
		if (node->hhashtable == NULL)
		{
			bool tupremain;

			node->hhashtable = create_agg_hash_table(node);
			tupremain = agg_hash_initial_1pass(node);

			/*
			 * In streaming mode, leftover input tuples mean more streaming
			 * iterations are needed; otherwise the input is exhausted.
			 */
			if ( streaming )
			{
				if ( tupremain )
					node->hhashtable->state = HASHAGG_STREAMING;
				else
					node->hhashtable->state = HASHAGG_END_OF_PASSES;
			}
			else
				node->hhashtable->state = HASHAGG_BETWEEN_PASSES;
		}



		/* On each call we either return a tuple corresponding to a hash
		 * entry (consuming the entry) or fall through to a state machine
		 * that tries to make additional hash entries available and continue
		 * the loop. (This may result in reaching the "exit" state and
		 * returning a NULL tuple).
		 */
		for (;;)
		{
			/* While not spilling, drain finished groups from the hash table. */
			if (!node->hhashtable->is_spilling)
			{

				tuple = agg_retrieve_hash_table(node);
				node->agg_done = false; /* Not done 'til batches used up. */

				if (tuple != NULL)
					return tuple;
			}

			switch (node->hhashtable->state)
			{
				case HASHAGG_BETWEEN_PASSES:
					Assert(!streaming);
					/* Try to load another batch of spilled groups. */
					if (agg_hash_next_pass(node))
					{
						node->hhashtable->state = HASHAGG_BETWEEN_PASSES;
						continue;
					}
					node->hhashtable->state = HASHAGG_END_OF_PASSES;
					/*
					 * pass through. Be sure that the next case statement
					 * is HASHAGG_END_OF_PASSES.
					 */

				case HASHAGG_END_OF_PASSES:
					node->agg_done = true;
					if (gp_workfile_caching && node->workfiles_created)
					{
						/*
						 * HashAgg closes each spill file after it is done with
						 * them. Since we got here on the regular path, all
						 * files should be closed.
						 */
						Assert(node->hhashtable->work_set);
						Assert(node->hhashtable->spill_set == NULL);
						agg_hash_close_state_file(node->hhashtable);
						agg_hash_mark_spillset_complete(node);
					}
					ExecEagerFreeAgg(node);
					return NULL;

				case HASHAGG_STREAMING:
					Assert(streaming);
					/*expand agg_hash_stream */
					/* Reset the table and aggregate the next chunk of input. */
					reset_agg_hash_table(node);
					if (!agg_hash_initial_1pass(node))
						node->hhashtable->state = HASHAGG_END_OF_PASSES;
					continue;

				case HASHAGG_BEFORE_FIRST_PASS:
				default:
					elog(ERROR,"hybrid hash aggregation sequencing error");
			}
		}
	}
	else
	{
		/* Non-hashed strategies (plain/sorted) use the direct path. */
		return agg_retrieve_direct(node);
	}
}
| |
| /* |
| * getAggType |
| * Get the aggType for the given Agg node. |
| * |
| * We should really store the type in the Agg struct, and let the planner set the |
| * correct type. As an intermediate step, we compute the type here. |
| */ |
| static int |
| getAggType(Agg *node) |
| { |
| int aggType = AggTypeScalar; |
| if (node->numCols > 0 && |
| (node->lastAgg && |
| (node->inputHasGrouping && node->numNullCols > 0))) |
| { |
| aggType = AggTypeFinalRollup; |
| } |
| else if (node->numCols > 0 && |
| (node->inputHasGrouping && node->numNullCols > 0)) |
| { |
| aggType = AggTypeIntermediateRollup; |
| } |
| else if (node->numCols > 0) |
| { |
| aggType = AggTypeGroup; |
| } |
| else |
| { |
| insist_log(node->aggstrategy == AGG_PLAIN, "wrong Agg strategy: %d", node->aggstrategy); |
| } |
| |
| return aggType; |
| } |
| |
| |
| extern Datum ExecEvalVar(ExprState *exprstate, ExprContext *econtext, |
| bool *isNull, ExprDoneCond *isDone); |
| |
| /* |
| * Copied from src/backend/executor/nodeAgg.c |
| * |
| * We need convert some vectorized tupleDesc to non-vectorzied tupleDesc. |
| */ |
| AggState * |
| VExecInitAgg(Agg *node, EState *estate, int eflags) |
| { |
| AggState *aggstate; |
| AggStatePerAgg peragg; |
| Plan *outerPlan; |
| ExprContext *econtext; |
| int numaggs, |
| aggno; |
| ListCell *l; |
| List *nagglist = NULL; |
| |
| /* check for unsupported flags */ |
| Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
| |
| /* |
| * create state structure |
| */ |
| aggstate = makeNode(AggState); |
| aggstate->ss.ps.plan = (Plan *) node; |
| aggstate->ss.ps.state = estate; |
| |
| aggstate->aggs = NIL; |
| aggstate->numaggs = 0; |
| aggstate->eqfunctions = NULL; |
| aggstate->hashfunctions = NULL; |
| aggstate->peragg = NULL; |
| aggstate->agg_done = false; |
| aggstate->pergroup = NULL; |
| aggstate->grp_firstTuple = NULL; |
| aggstate->hashtable = NULL; |
| agg_hash_reset_workfile_state(aggstate); |
| |
| /* |
| * Create expression contexts. We need two, one for per-input-tuple |
| * processing and one for per-output-tuple processing. We cheat a little |
| * by using ExecAssignExprContext() to build both. |
| */ |
| ExecAssignExprContext(estate, &aggstate->ss.ps); |
| aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; |
| ExecAssignExprContext(estate, &aggstate->ss.ps); |
| |
| /* |
| * We also need a long-lived memory context for holding hashtable data |
| * structures and transition values. NOTE: the details of what is stored |
| * in aggcontext and what is stored in the regular per-query memory |
| * context are driven by a simple decision: we want to reset the |
| * aggcontext in ExecReScanAgg to recover no-longer-wanted space. |
| */ |
| aggstate->aggcontext = |
| AllocSetContextCreate(CurrentMemoryContext, |
| "AggContext", |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE); |
| |
| #define AGG_NSLOTS 3 |
| |
| /* |
| * tuple table initialization |
| */ |
| aggstate->ss.ss_ScanTupleSlot = ExecInitExtraTupleSlot(estate); |
| ExecInitResultTupleSlot(estate, &aggstate->ss.ps); |
| aggstate->hashslot = ExecInitExtraTupleSlot(estate); |
| |
| /* |
| * initialize child expressions |
| * |
| * Note: ExecInitExpr finds Aggrefs for us, and also checks that no aggs |
| * contain other agg calls in their arguments. This would make no sense |
| * under SQL semantics anyway (and it's forbidden by the spec). Because |
| * that is true, we don't need to worry about evaluating the aggs in any |
| * particular order. |
| */ |
| /* |
| * if there are aggregate functions in the target list, and the argument |
| * of the aggregate functions are expression(such as operators),when we |
| * initialize the expression, the argument should set to be vectorized |
| * and the result of aggregate functions should set to be non-vectorzied. |
| * we initialize the targetlist two times here, the first time, we get |
| * the non-vectorized target list, then we reset the aggregate in |
| * aggstate and get the vectorized aggregate functions. |
| */ |
| /* get the non-vectorized expressions */ |
| aggstate->ss.ps.plan->vectorized = false; |
| aggstate->ss.ps.targetlist = (List *) |
| ExecInitExpr((Expr *) node->plan.targetlist, |
| (PlanState *) aggstate); |
| aggstate->ss.ps.plan->vectorized = true; |
| |
| /* |
| * reset the aggregate functions, |
| * NOTE: we have to save the aggregate function list, |
| * because the the aggregate functions in the non-vectrozied |
| * target list too |
| */ |
| nagglist = aggstate->aggs; |
| aggstate->aggs = NULL; |
| aggstate->numaggs = 0; |
| |
| /* |
| * it is the second time to initialize the expression, we don't care |
| * about the result of ExecInitExpr, it is only used to initialize the |
| * vectorized argument of the aggregate functions. |
| */ |
| ExecInitExpr((Expr *) node->plan.targetlist, |
| (PlanState *) aggstate); |
| |
| aggstate->ss.ps.qual = (List *) |
| ExecInitExpr((Expr *) node->plan.qual, |
| (PlanState *) aggstate); |
| |
| /* |
| * CDB: Offer extra info for EXPLAIN ANALYZE. |
| */ |
| if (estate->es_instrument) |
| { |
| /* Allocate string buffer. */ |
| aggstate->ss.ps.cdbexplainbuf = makeStringInfo(); |
| |
| /* Request a callback at end of query. */ |
| aggstate->ss.ps.cdbexplainfun = ExecAggExplainEnd; |
| } |
| |
| /* |
| * initialize child nodes |
| */ |
| outerPlan = outerPlan(node); |
| if (IsA(outerPlan, ExternalScan)) { |
| /* |
| * Hack to indicate to PXF when there is an external scan |
| */ |
| if (list_length(aggstate->aggs) == 1) { |
| AggrefExprState *aggrefstate = (AggrefExprState *) linitial(aggstate->aggs); |
| Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr; |
| //Only dealing with one agg |
| if (aggref->aggfnoid == COUNT_ANY_OID || aggref->aggfnoid == COUNT_STAR_OID) { |
| eflags |= EXEC_FLAG_EXTERNAL_AGG_COUNT; |
| } |
| } |
| } |
| outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags); |
| |
| /* |
| * initialize source tuple type. |
| */ |
| ExecAssignScanTypeFromOuterPlan(&aggstate->ss); |
| TupleDesc tdesc = ((ScanState*)aggstate)->ss_ScanTupleSlot->tts_tupleDescriptor; |
| BackportTupleDescriptor((PlanState*)aggstate, tdesc); |
| ExecSetSlotDescriptor(((ScanState*)aggstate)->ss_ScanTupleSlot, tdesc); |
| /* |
| * Initialize result tuple type and projection info. |
| */ |
| ExecAssignResultTypeFromTL(&aggstate->ss.ps); |
| TupleDesc ttupDesc = CreateTupleDescCopy(((PlanState*)aggstate)->ps_ResultTupleSlot->tts_tupleDescriptor); |
| |
| BackportTupleDescriptor((PlanState*)aggstate, ttupDesc); |
| ExecAssignResultType((PlanState*)aggstate, ttupDesc); |
| |
| ExecAssignProjectionInfo(&aggstate->ss.ps, NULL); |
| |
| /* |
| * get the count of aggregates in targetlist and quals |
| */ |
| numaggs = aggstate->numaggs; |
| Assert(numaggs == list_length(aggstate->aggs) + list_length(aggstate->percs)); |
| if (numaggs <= 0) |
| { |
| /* |
| * This is not an error condition: we might be using the Agg node just |
| * to do hash-based grouping. Even in the regular case, |
| * constant-expression simplification could optimize away all of the |
| * Aggrefs in the targetlist and qual. So keep going, but force local |
| * copy of numaggs positive so that palloc()s below don't choke. |
| */ |
| numaggs = 1; |
| } |
| |
| /* |
| * If we are grouping, precompute fmgr lookup data for inner loop. We need |
| * both equality and hashing functions to do it by hashing, but only |
| * equality if not hashing. |
| */ |
| if (node->numCols > 0) |
| { |
| TupleDesc desc = ExecGetScanType(&aggstate->ss); |
| desc = CreateTupleDescCopy(desc); |
| BackportTupleDescriptor((PlanState*)aggstate, desc); |
| |
| if (node->aggstrategy == AGG_HASHED) |
| { |
| execTuplesHashPrepare(desc, |
| node->numCols, |
| node->grpColIdx, |
| &aggstate->eqfunctions, |
| &aggstate->hashfunctions); |
| } |
| else |
| aggstate->eqfunctions = |
| execTuplesMatchPrepare(desc, |
| node->numCols, |
| node->grpColIdx); |
| } |
| |
| /* |
| * Set up aggregate-result storage in the output expr context, and also |
| * allocate my private per-agg working storage |
| */ |
| econtext = aggstate->ss.ps.ps_ExprContext; |
| econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs); |
| econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs); |
| |
| peragg = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs); |
| aggstate->peragg = peragg; |
| |
| if (node->aggstrategy == AGG_HASHED) |
| { |
| aggstate->hash_needed = get_agg_hash_collist(aggstate); |
| } |
| else |
| { |
| AggStatePerGroup pergroup; |
| |
| pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs); |
| aggstate->pergroup = pergroup; |
| } |
| |
| /* |
| * set the NO of aggregate functions, it is used in the non-vectorzied |
| * target list. |
| */ |
| aggno = -1; |
| foreach(l, nagglist) |
| { |
| AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l); |
| Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr; |
| AggStatePerAgg peraggstate; |
| int i; |
| |
| /* Planner should have assigned aggregate to correct level */ |
| Assert(aggref->agglevelsup == 0); |
| |
| /* Look for a previous duplicate aggregate */ |
| for (i = 0; i <= aggno; i++) |
| { |
| if (equal(aggref, peragg[i].aggref) && |
| !contain_volatile_functions((Node *) aggref)) |
| break; |
| } |
| if (i <= aggno) |
| { |
| /* Found a match to an existing entry, so just mark it */ |
| aggrefstate->aggno = i; |
| continue; |
| } |
| |
| /* Nope, so assign a new PerAgg record */ |
| peragg[++aggno].aggref = aggref; |
| |
| /* Mark Aggref state node with assigned index in the result array */ |
| aggrefstate->aggno = aggno; |
| } |
| |
| /* |
| * Perform lookups of aggregate function info, and initialize the |
| * unchanging fields of the per-agg data. We also detect duplicate |
| * aggregates (for example, "SELECT sum(x) ... HAVING sum(x) > 0"). When |
| * duplicates are detected, we only make an AggStatePerAgg struct for the |
| * first one. The clones are simply pointed at the same result entry by |
| * giving them duplicate aggno values. |
| */ |
| aggno = -1; |
| foreach(l, aggstate->aggs) |
| { |
| AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l); |
| Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr; |
| AggStatePerAgg peraggstate; |
| List *inputTargets = NIL; |
| List *inputSortClauses = NIL; |
| Oid *inputTypes = NULL; |
| int numInputs; |
| int numArguments; |
| int numSortCols; |
| List *sortlist; |
| HeapTuple aggTuple; |
| Form_pg_aggregate aggform; |
| Oid aggtranstype; |
| AclResult aclresult; |
| Oid transfn_oid = InvalidOid, |
| finalfn_oid = InvalidOid; |
| Expr *transfnexpr, |
| *finalfnexpr, |
| *prelimfnexpr; |
| Datum textInitVal; |
| int i; |
| ListCell *lc; |
| cqContext *pcqCtx; |
| |
| /* Planner should have assigned aggregate to correct level */ |
| Assert(aggref->agglevelsup == 0); |
| |
| /* Look for a previous duplicate aggregate */ |
| for (i = 0; i <= aggno; i++) |
| { |
| if (equal(aggref, peragg[i].aggref) && |
| !contain_volatile_functions((Node *) aggref)) |
| break; |
| } |
| if (i <= aggno) |
| { |
| /* Found a match to an existing entry, so just mark it */ |
| aggrefstate->aggno = i; |
| continue; |
| } |
| |
| /* Nope, so assign a new PerAgg record */ |
| peraggstate = &peragg[++aggno]; |
| |
| /* Mark Aggref state node with assigned index in the result array */ |
| aggrefstate->aggno = aggno; |
| |
| /* Fill in the peraggstate data */ |
| peraggstate->aggrefstate = aggrefstate; |
| peraggstate->aggref = aggref; |
| numArguments = list_length(aggref->args); |
| peraggstate->numArguments = numArguments; |
| |
| /* |
| * Use these information from ExecInitExpr for per agg info. |
| */ |
| inputTargets = aggrefstate->inputTargets; |
| inputSortClauses = aggrefstate->inputSortClauses; |
| numInputs = list_length(inputTargets); |
| numSortCols = list_length(inputSortClauses); |
| |
| peraggstate->numSortCols = numSortCols; |
| peraggstate->numInputs = numInputs; |
| |
| /* MPP has some restrictions. */ |
| Assert(!(aggref->aggdistinct && aggref->aggorder)); |
| Assert(numArguments == 1 || !aggref->aggdistinct); |
| |
| /* Get actual datatypes of the inputs. These could be different from |
| * the agg's declared input types, when the agg accepts ANY, ANYARRAY |
| * or ANYELEMENT. The result will have argument types at 0 through |
| * numArguments-1 and sort key types mixed in or at numArguments through |
| * numInputs. |
| */ |
| inputTypes = (Oid*)palloc0(sizeof(Oid)*(numInputs)); |
| i = 0; |
| foreach(lc, inputTargets) |
| { |
| TargetEntry *tle = (TargetEntry *) lfirst(lc); |
| inputTypes[i++] = exprType((Node*)tle->expr); |
| } |
| |
| pcqCtx = caql_beginscan( |
| NULL, |
| cql("SELECT * FROM pg_aggregate " |
| " WHERE aggfnoid = :1 ", |
| ObjectIdGetDatum(aggref->aggfnoid))); |
| |
| aggTuple = caql_getnext(pcqCtx); |
| |
| if (!HeapTupleIsValid(aggTuple)) |
| elog(ERROR, "cache lookup failed for aggregate %u", |
| aggref->aggfnoid); |
| aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); |
| |
| /* Check permission to call aggregate function */ |
| aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), |
| ACL_EXECUTE); |
| if (aclresult != ACLCHECK_OK) |
| aclcheck_error(aclresult, ACL_KIND_PROC, |
| get_func_name(aggref->aggfnoid)); |
| |
| switch ( aggref->aggstage) /* MPP */ |
| { |
| case AGGSTAGE_NORMAL: /* Single-stage aggregation */ |
| peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; |
| peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; |
| break; |
| |
| case AGGSTAGE_PARTIAL:/* Two-stage aggregation -- preliminary stage */ |
| /* the perliminary stage for two-stage aggregation */ |
| peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; |
| peraggstate->finalfn_oid = finalfn_oid = InvalidOid; |
| break; |
| |
| case AGGSTAGE_INTERMEDIATE: |
| peraggstate->transfn_oid = transfn_oid = aggform->aggprelimfn; |
| peraggstate->finalfn_oid = finalfn_oid = InvalidOid; |
| break; |
| |
| case AGGSTAGE_FINAL: /* Two-stage aggregation - final stage */ |
| peraggstate->transfn_oid = transfn_oid = aggform->aggprelimfn; |
| peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; |
| break; |
| } |
| |
| peraggstate->prelimfn_oid = aggform->aggprelimfn; |
| |
| /* Check that aggregate owner has permission to call component fns */ |
| { |
| Oid aggOwner; |
| int fetchCount; |
| |
| aggOwner = caql_getoid_plus( |
| NULL, |
| &fetchCount, |
| NULL, |
| cql("SELECT proowner FROM pg_proc " |
| " WHERE oid = :1 ", |
| ObjectIdGetDatum(aggref->aggfnoid))); |
| |
| if (!fetchCount) |
| elog(ERROR, "cache lookup failed for function %u", |
| aggref->aggfnoid); |
| |
| aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, |
| ACL_EXECUTE); |
| if (aclresult != ACLCHECK_OK) |
| aclcheck_error(aclresult, ACL_KIND_PROC, |
| get_func_name(transfn_oid)); |
| if (OidIsValid(finalfn_oid)) |
| { |
| aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, |
| ACL_EXECUTE); |
| if (aclresult != ACLCHECK_OK) |
| aclcheck_error(aclresult, ACL_KIND_PROC, |
| get_func_name(finalfn_oid)); |
| } |
| if (OidIsValid(peraggstate->prelimfn_oid)) |
| { |
| aclresult = pg_proc_aclcheck(peraggstate->prelimfn_oid, aggOwner, |
| ACL_EXECUTE); |
| if (aclresult != ACLCHECK_OK) |
| aclcheck_error(aclresult, ACL_KIND_PROC, |
| get_func_name(peraggstate->prelimfn_oid)); |
| } |
| } |
| |
| /* check if the transition type is polymorphic and if so resolve it */ |
| aggtranstype = resolve_polymorphic_transtype(aggform->aggtranstype, |
| aggref->aggfnoid, |
| inputTypes); |
| |
| /* build expression trees using actual argument & result types */ |
| build_aggregate_fnexprs(inputTypes, |
| numArguments, |
| aggtranstype, |
| aggref->aggtype, |
| transfn_oid, |
| finalfn_oid, |
| InvalidOid /* prelim */, |
| InvalidOid /* invtrans */, |
| InvalidOid /* invprelim */, |
| &transfnexpr, |
| &finalfnexpr, |
| &prelimfnexpr, |
| NULL, |
| NULL); |
| |
| fmgr_info(transfn_oid, &peraggstate->transfn); |
| peraggstate->transfn.fn_expr = (Node *) transfnexpr; |
| |
| if (OidIsValid(finalfn_oid)) |
| { |
| fmgr_info(finalfn_oid, &peraggstate->finalfn); |
| peraggstate->finalfn.fn_expr = (Node *) finalfnexpr; |
| } |
| |
| if (OidIsValid(peraggstate->prelimfn_oid)) |
| { |
| fmgr_info(peraggstate->prelimfn_oid, &peraggstate->prelimfn); |
| peraggstate->prelimfn.fn_expr = (Node *) prelimfnexpr; |
| } |
| |
| get_typlenbyval(aggref->aggtype, |
| &peraggstate->resulttypeLen, |
| &peraggstate->resulttypeByVal); |
| get_typlenbyval(aggtranstype, |
| &peraggstate->transtypeLen, |
| &peraggstate->transtypeByVal); |
| |
| /* |
| * initval is potentially null, so don't try to access it as a struct |
| * field. Must do it the hard way with caql_getattr |
| */ |
| textInitVal = caql_getattr(pcqCtx, |
| Anum_pg_aggregate_agginitval, |
| &peraggstate->initValueIsNull); |
| |
| if (peraggstate->initValueIsNull) |
| peraggstate->initValue = (Datum) 0; |
| else |
| peraggstate->initValue = GetAggInitVal(textInitVal, |
| aggtranstype); |
| |
| /* |
| * If the transfn is strict and the initval is NULL, make sure input |
| * type and transtype are the same (or at least binary-compatible), so |
| * that it's OK to use the first input value as the initial |
| * transValue. This should have been checked at agg definition time, |
| * but just in case... |
| */ |
| if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) |
| { |
| if (numArguments < 1 || |
| !IsBinaryCoercible(inputTypes[0], aggtranstype)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), |
| errmsg("aggregate %u needs to have compatible input type and transition type", |
| aggref->aggfnoid))); |
| } |
| |
| /* |
| * Get a tupledesc corresponding to the inputs (including sort |
| * expressions) of the agg. |
| */ |
| peraggstate->evaldesc = ExecTypeFromTL(inputTargets, false); |
| |
| /* Create slot we're going to do argument evaluation in */ |
| peraggstate->evalslot = ExecInitExtraTupleSlot(estate); |
| ExecSetSlotDescriptor(peraggstate->evalslot, peraggstate->evaldesc); |
| |
| /* Set up projection info for evaluation */ |
| peraggstate->evalproj = ExecBuildProjectionInfo(aggrefstate->args, |
| aggstate->tmpcontext, |
| peraggstate->evalslot, |
| NULL); |
| |
| /* |
| * If we're doing either DISTINCT or ORDER BY, then we have a list of |
| * SortGroupClause nodes; fish out the data in them and stick them |
| * into arrays. |
| * |
| * Note that by construction, if there is a DISTINCT clause then the |
| * ORDER BY clause is a prefix of it (see transformDistinctClause). |
| */ |
| if (aggref->aggdistinct) |
| { |
| TargetEntry *tle; |
| SortClause *sc; |
| Oid eq_function; |
| |
| /* |
| * GPDB 4 doesh't implement DISTINCT aggs for aggs having more than |
| * than one argument, nor does it allow an ordered aggregate to |
| * specify distinct, but PG 9 does. The SQL standard allows the |
| * one-arg-for-DISTINCT restriction, but we really we ought to |
| * implement it the way PG 9 does eventually. |
| * |
| * For now we use the scalar equalfn field of AggStatePerAggData |
| * for DQAs instead of treating DQAs more generally. |
| */ |
| if (numArguments != 1) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("DISTINCT is supported only for single-argument aggregates"))); |
| |
| eq_function = equality_oper_funcid(inputTypes[0]); |
| fmgr_info(eq_function, &(peraggstate->equalfn)); |
| |
| tle = (TargetEntry*)linitial(inputTargets); |
| tle->ressortgroupref = 1; |
| |
| sc = makeNode(SortClause); |
| sc->tleSortGroupRef = tle->ressortgroupref; |
| sc->sortop = ordering_oper_opid(inputTypes[0]); |
| |
| sortlist = list_make1(sc); |
| numSortCols = 1; |
| } |
| else if ( aggref->aggorder ) |
| { |
| sortlist = aggref->aggorder->sortClause; |
| numSortCols = list_length(sortlist); |
| } |
| else |
| { |
| sortlist = NULL; |
| numSortCols = 0; |
| } |
| |
| |
| peraggstate->numSortCols = numSortCols; |
| |
| if (numSortCols > 0) |
| { |
| /* |
| * We don't implement DISTINCT or ORDER BY aggs in the HASHED case |
| * (yet) |
| */ |
| Assert(node->aggstrategy != AGG_HASHED); |
| |
| /* If we have only one input, we need its len/byval info. */ |
| if (numInputs == 1) |
| { |
| get_typlenbyval(inputTypes[0], |
| &peraggstate->inputtypeLen, |
| &peraggstate->inputtypeByVal); |
| } |
| |
| /* Extract the sort information for use later */ |
| peraggstate->sortColIdx = |
| (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber)); |
| peraggstate->sortOperators = |
| (Oid *) palloc(numSortCols * sizeof(Oid)); |
| |
| i = 0; |
| foreach(lc, sortlist) |
| { |
| SortClause *sortcl = (SortClause *) lfirst(lc); |
| TargetEntry *tle = get_sortgroupclause_tle(sortcl, |
| inputTargets); |
| |
| /* the parser should have made sure of this */ |
| Assert(OidIsValid(sortcl->sortop)); |
| |
| peraggstate->sortColIdx[i] = tle->resno; |
| peraggstate->sortOperators[i] = sortcl->sortop; |
| i++; |
| } |
| Assert(i == numSortCols); |
| } |
| |
| if (aggref->aggdistinct) |
| { |
| Oid eqfunc; |
| |
| Assert(numArguments == 1); |
| Assert(numSortCols == 1); |
| |
| /* |
| * We need the equal function for the DISTINCT comparison we will |
| * make. |
| */ |
| eqfunc = equality_oper_funcid(inputTypes[0]); |
| fmgr_info(eqfunc, &peraggstate->equalfn); |
| } |
| |
| caql_endscan(pcqCtx); |
| } |
| |
| /* |
| * Process percentile expressions. These are treated separately from |
| * Aggref expressions at the moment as we cannot change the catalog, but |
| * this will be incorporated into the existing Agggref architecture |
| * when we can change the catalog. The operation for percentile functions |
| * is very similar to the Aggref operation except that there is no |
| * function oid for transition function. We manually manupilate |
| * FmgrInfo without the oid. |
| * In case the Agg handles PercentileExpr, there shouldn't be Aggref |
| * in conjunction with PercentileExpr in the target list (and havingQual), |
| * or vice versa, from the current design of percentile functions. |
| * However, we don't assert anything to keep that assumption, for the |
| * later extensibility. |
| */ |
| foreach (l, aggstate->percs) |
| { |
| PercentileExprState *percstate = (PercentileExprState *) lfirst(l); |
| PercentileExpr *perc = (PercentileExpr *) percstate->xprstate.expr; |
| AggStatePerAgg peraggstate; |
| FmgrInfo *transfn; |
| int numArguments; |
| int i; |
| Oid trans_argtypes[FUNC_MAX_ARGS]; |
| ListCell *lc; |
| Expr *dummy_expr; |
| |
| /* Look for a previous duplicate aggregate */ |
| for (i = 0; i <= aggno; i++) |
| { |
| /* |
| * In practice, percentile expression doesn't contain |
| * volatile functions since everything is evaluated and |
| * becomes Var during the preprocess such as ordering operations. |
| * However, adding a check for volatile may be robust and |
| * consistent with Aggref initialization. |
| */ |
| if (equal(perc, peragg[i].perc) && |
| !contain_volatile_functions((Node *) perc)) |
| break; |
| } |
| if (i <= aggno) |
| { |
| /* Found a match to an existing entry, so just mark it */ |
| percstate->aggno = i; |
| continue; |
| } |
| |
| /* Nope, so assign a new PerAgg record */ |
| peraggstate = &peragg[++aggno]; |
| |
| /* Mark Aggref state node with assigned index in the result array */ |
| percstate->aggno = aggno; |
| |
| /* Fill in the peraggstate data */ |
| peraggstate->percstate = percstate; |
| peraggstate->perc = perc; |
| /* |
| * numArguments = arg + ORDER BY + pc + tc |
| * See notes on percentile_cont_trans() and ExecInitExpr() for |
| * PercentileExpr. |
| */ |
| numArguments = list_length(perc->args) + list_length(perc->sortClause) + 2; |
| peraggstate->numArguments = numArguments; |
| |
| /* |
| * Set up transfn. In general, we should use fmgr_info, but we don't |
| * have the catalog function (thus no oid for functions) due to the |
| * difficulity of changing the catalog at the moment. This should |
| * be cleaned when we can change the catalog. |
| */ |
| transfn = &peraggstate->transfn; |
| transfn->fn_nargs = list_length(perc->args) + 1; |
| transfn->fn_strict = false; |
| transfn->fn_retset = false; |
| transfn->fn_mcxt = CurrentMemoryContext; |
| transfn->fn_addr = perc->perckind == PERC_DISC ? |
| percentile_disc_trans : percentile_cont_trans; |
| transfn->fn_oid = InvalidOid; |
| |
| /* |
| * trans type is the same as result type, as they don't have final func. |
| */ |
| trans_argtypes[0] = perc->perctype; |
| i = 1; |
| /* |
| * Literal arguments. |
| */ |
| foreach (lc, perc->args) |
| { |
| Node *arg = lfirst(lc); |
| trans_argtypes[i++] = exprType(arg); |
| } |
| /* |
| * ORDER BY arguments. |
| */ |
| foreach (lc, perc->sortTargets) |
| { |
| TargetEntry *tle = lfirst(lc); |
| trans_argtypes[i++] = exprType((Node *) tle->expr); |
| } |
| /* |
| * Peer count and total count. |
| */ |
| trans_argtypes[i++] = INT8OID; |
| trans_argtypes[i++] = INT8OID; |
| /* |
| * Build FuncExpr for the transition function. |
| */ |
| build_aggregate_fnexprs(trans_argtypes, |
| i, |
| perc->perctype, |
| perc->perctype, |
| InvalidOid, |
| InvalidOid, |
| InvalidOid, |
| InvalidOid, |
| InvalidOid, |
| (Expr **) &transfn->fn_expr, |
| &dummy_expr, NULL, NULL, NULL); |
| |
| get_typlenbyval(perc->perctype, |
| &peraggstate->resulttypeLen, |
| &peraggstate->resulttypeByVal); |
| get_typlenbyval(perc->perctype, |
| &peraggstate->transtypeLen, |
| &peraggstate->transtypeByVal); |
| |
| /* |
| * Hard code for the known information. |
| */ |
| peraggstate->initValueIsNull = true; |
| peraggstate->initValue = (Datum) 0; |
| |
| peraggstate->finalfn_oid = InvalidOid; |
| peraggstate->prelimfn_oid = InvalidOid; |
| |
| /* |
| * Get a tupledesc corresponding to the inputs (including sort |
| * expressions) of the agg. |
| */ |
| peraggstate->evaldesc = ExecTypeFromTL(percstate->tlist, false); |
| |
| /* Create slot we're going to do argument evaluation in */ |
| peraggstate->evalslot = ExecInitExtraTupleSlot(estate); |
| ExecSetSlotDescriptor(peraggstate->evalslot, peraggstate->evaldesc); |
| |
| /* Set up projection info for evaluation */ |
| peraggstate->evalproj = ExecBuildProjectionInfo(percstate->args, |
| aggstate->tmpcontext, |
| peraggstate->evalslot, |
| NULL); |
| } |
| |
| /* Update numaggs to match number of unique aggregates found */ |
| aggstate->numaggs = aggno + 1; |
| |
| /* MPP */ |
| aggstate->hhashtable = NULL; |
| |
| /* ROLLUP */ |
| aggstate->perpassthru = NULL; |
| |
| if (node->inputHasGrouping) |
| { |
| AggStatePerGroup perpassthru; |
| |
| perpassthru = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs); |
| aggstate->perpassthru = perpassthru; |
| |
| } |
| |
| aggstate->num_attrs = 0; |
| |
| aggstate->aggType = getAggType(node); |
| |
| /* Set the default memory manager */ |
| aggstate->mem_manager.alloc = cxt_alloc; |
| aggstate->mem_manager.free = cxt_free; |
| aggstate->mem_manager.manager = aggstate->aggcontext; |
| aggstate->mem_manager.realloc_ratio = 1; |
| |
| initGpmonPktForAgg((Plan *)node, &aggstate->ss.ps.gpmon_pkt, estate); |
| |
| return aggstate; |
| } |
| |