/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* cdbgroup.c
* Routines to aid in planning grouping queries for parallel
* execution. This is, essentially, an extension of the file
* optimizer/plan/planner.c, although some of its functions are not
* externalized.
*
* Portions Copyright (c) 2005-2008, Greenplum inc
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <limits.h>
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/print.h"
#include "optimizer/clauses.h"
#include "optimizer/cost.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "optimizer/planshare.h"
#include "optimizer/prep.h"
#include "optimizer/subselect.h"
#include "optimizer/tlist.h"
#include "optimizer/var.h"
#include "parser/parsetree.h"
#include "parser/parse_clause.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
#include "parser/parse_oper.h"
#include "parser/parse_relation.h"
#include "utils/lsyscache.h"
#include "utils/selfuncs.h"
#include "utils/syscache.h"
#include "catalog/pg_aggregate.h"
#include "cdb/cdbllize.h"
#include "cdb/cdbpathtoplan.h" /* cdbpathtoplan_create_flow() */
#include "cdb/cdbpath.h"
#include "cdb/cdbpullup.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbhash.h" /* isGreenplumDbHashable() */
#include "cdb/cdbsetop.h"
#include "cdb/cdbgroup.h"
extern void UpdateScatterClause(Query *query, List *newtlist);
/*
* MppGroupPrep represents a strategy by which to precondition the
* argument to a parallel aggregation plan.
*
* MPP_GRP_PREP_NONE
* Use the input plan as is.
* MPP_GRP_PREP_HASH_GROUPS
* Redistribute the input to collocate groups.
* MPP_GRP_PREP_HASH_DISTINCT
* Redistribute the input on the sole distinct expression used as
* an aggregate argument.
* MPP_GRP_PREP_FOCUS_QE
* Focus a partitioned input on the utility QE.
* MPP_GRP_PREP_FOCUS_QD
* Focus a partitioned input on the QD.
* MPP_GRP_PREP_BROADCAST
* Broadcast to convert a partitioned input into a replicated one.
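*
* Roughly speaking, these preps correspond to the Motion placed beneath
* the aggregation: the HASH_* preps become a Redistribute Motion hashed
* on the grouping key or DQA argument, the FOCUS_* preps become a Gather
* Motion to a single QE or to the QD, and BROADCAST (as the name says)
* a Broadcast Motion. See, e.g., make_one_stage_agg_plan below.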
*/
typedef enum MppGroupPrep
{
MPP_GRP_PREP_NONE = 0,
MPP_GRP_PREP_HASH_GROUPS,
MPP_GRP_PREP_HASH_DISTINCT,
MPP_GRP_PREP_FOCUS_QE,
MPP_GRP_PREP_FOCUS_QD,
MPP_GRP_PREP_BROADCAST
} MppGroupPrep;
/*
* MppGroupType represents a strategy by which to implement parallel
* aggregation on a possibly preconditioned input plan.
*
* MPP_GRP_TYPE_NONE
* No parallel plan found.
* MPP_GRP_TYPE_BASEPLAN
* Use the sequential plan as is.
* MPP_GRP_TYPE_PLAIN_2STAGE,
* Ungrouped, 2-stage aggregation.
* MPP_GRP_TYPE_GROUPED_2STAGE,
* Grouped, 2-stage aggregation.
* MPP_GRP_TYPE_PLAIN_DQA_2STAGE,
* Ungrouped, multi-stage aggregation with DQA (DISTINCT-qualified
* aggregate) pruning.
* MPP_GRP_TYPE_GROUPED_DQA_2STAGE,
* Grouped, multi-stage aggregation with DQA pruning.
*
* TODO: Add types for min-max optimization, when ready:
*
* MPP_GRP_TYPE_MINMAX
* Use the sequential min-max optimization plan as is.
* MPP_GRP_TYPE_MINMAX_2STAGE
* Use a 2-stage variant of min-max aggregation.
*/
typedef enum MppGroupType
{
MPP_GRP_TYPE_NONE = 0,
MPP_GRP_TYPE_BASEPLAN,
MPP_GRP_TYPE_PLAIN_2STAGE,
MPP_GRP_TYPE_GROUPED_2STAGE,
MPP_GRP_TYPE_PLAIN_DQA_2STAGE,
MPP_GRP_TYPE_GROUPED_DQA_2STAGE,
} MppGroupType;
/* Summary values detailing the post-Motion part of a coplan for
* three-phase aggregation. The code is as follows:
* S = Sort
* G = GroupAgg
* H = HashAgg
* P = PlainAgg
*
* So GSH means (GroupAgg (Sort (HashAgg X))).
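*
* Spelling out each value with that key (rightmost letter is closest
* to the input):
* GGS = (GroupAgg (GroupAgg (Sort X)))
* GSH = (GroupAgg (Sort (HashAgg X)))
* SHH = (Sort (HashAgg (HashAgg X)))
* HH = (HashAgg (HashAgg X))
* PGS = (PlainAgg (GroupAgg (Sort X)))
* PH = (PlainAgg (HashAgg X))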
*/
typedef enum DqaCoplanType
{
DQACOPLAN_GGS,
DQACOPLAN_GSH,
DQACOPLAN_SHH,
DQACOPLAN_HH,
DQACOPLAN_PGS,
DQACOPLAN_PH
} DqaCoplanType;
typedef enum DqaJoinStrategy
{
DqaJoinUndefined = 0,
DqaJoinNone, /* No join required for solitary DQA argument. */
DqaJoinCross, /* Scalar aggregation uses cross product. */
DqaJoinHash, /* Hash join (possibly with subsequent sort) */
DqaJoinMerge, /* Merge join */
/* These last are abstract and will be replaced
* by DqaJoinHash or DqaJoinMerge once planning
* is complete.
*/
DqaJoinSorted, /* Sorted output required. */
DqaJoinCheapest, /* No sort requirement. */
} DqaJoinStrategy;
/* DQA coplan information */
typedef struct DqaInfo
{
Node *distinctExpr; /* By reference from agg_counts for convenience. */
AttrNumber base_index; /* Index of attribute in base plan targetlist */
bool can_hash;
double num_rows; /* Estimated cardinality of grouping key, dqa arg */
Plan *coplan; /* Coplan for this (later this and all prior) coplan */
Query *parse; /* Plausible root->parse for the coplan. */
bool distinctkey_collocate; /* Whether the input plan collocates on this
* distinct key */
/* These fields are for costing and planning. Before constructing
* the coplan for this DQA argument, determine cheapest way to get
* the answer and cheapest way to get the answer in grouping key
* order.
*/
bool use_hashed_preliminary;
Cost cost_sorted;
DqaCoplanType coplan_type_sorted;
Cost cost_cheapest;
DqaCoplanType coplan_type_cheapest;
} DqaInfo;
/* Information about the overall plan for a one- or two-phase aggregation,
* or for one coplan of a three-phase aggregation.
*/
typedef struct AggPlanInfo
{
/*
* The input is either represented as a Path or a Plan and a Path.
* If input_plan is given, use this plan instead of creating one
* through input_path.
*/
Path *input_path;
Plan *input_plan;
/* These are the ordinary fields characterizing an aggregation */
CdbPathLocus input_locus;
MppGroupPrep group_prep;
MppGroupType group_type;
CdbPathLocus output_locus;
bool distinctkey_collocate; /* Whether the input plan collocates on the
* distinct key */
/* These are extra for 3-phase plans */
DqaJoinStrategy join_strategy;
bool use_sharing;
/* These summarize the status of the structure's cost value. */
bool valid;
Cost plan_cost;
} AggPlanInfo;
typedef struct MppGroupContext
{
MppGroupPrep prep;
MppGroupType type;
List *tlist; /* The preprocessed targetlist of the original query. */
Node *havingQual; /* The preprocessed having qual of the original query. */
Path *best_path;
Path *cheapest_path;
Plan *subplan;
AggClauseCounts *agg_counts;
double tuple_fraction;
double *p_dNumGroups; /* Group count estimate shared up the call tree. */
CanonicalGroupingSets *canonical_grpsets;
int64 grouping; /* the GROUPING value */
bool is_grpext; /* identify if this is a grouping extension query */
List *sub_tlist; /* Derived (in cdb_grouping_planner) input targetlist. */
int numGroupCols;
AttrNumber *groupColIdx;
int numDistinctCols;
AttrNumber *distinctColIdx;
DqaInfo *dqaArgs;
bool use_hashed_grouping;
CdbPathLocus input_locus;
CdbPathLocus output_locus;
/* Indicate whether the input plan collocates on the distinct key if any.
* It is used for one or two-phase aggregation. For three-phase aggregation,
* distinctkey_collocate inside DqaInfo is used.
*/
bool distinctkey_collocate;
List *current_pathkeys;
/* Indicate if root->parse has been changed during planning. Carry in pointer
* to root for miscellaneous globals.
*/
bool querynode_changed;
PlannerInfo *root;
/* Work space for aggregate/tlist deconstruction and reconstruction */
Index final_varno; /* input */
bool use_irefs_tlist; /* input */
bool use_dqa_pruning; /* input */
List *prefs_tlist; /* Aggref attributes for prelim_tlist */
List *irefs_tlist; /* Aggref attributes for optional inter_tlist */
List *frefs_tlist; /* Aggref attributes for optional join tlists */
List *dqa_tlist; /* DQA argument attributes for prelim_tlist */
List **dref_tlists; /* Array of DQA Aggref tlists (dqa_tlist order) */
List *grps_tlist; /* Grouping attributes for prelim_tlist */
List *fin_tlist; /* Final tlist cache. */
List *fin_hqual; /* Final having qual cache. */
Index split_aggref_sortgroupref; /* for TargetEntrys made in split_aggref */
Index outer_varno; /* work */
Index inner_varno; /* work */
int *dqa_offsets; /* work */
List *top_tlist; /* work - the target list to finalize */
/* 3-phase DQA decisions */
DqaJoinStrategy join_strategy;
bool use_sharing;
List *wagSortClauses; /* List of List; within-agg multi sort level */
} MppGroupContext;
/* Constants for aggregation approaches.
*/
#define AGG_NONE 0x00
#define AGG_1PHASE 0x01
#define AGG_2PHASE 0x02
#define AGG_2PHASE_DQA 0x04
#define AGG_3PHASE 0x08
#define AGG_SINGLEPHASE (AGG_1PHASE)
#define AGG_MULTIPHASE (AGG_2PHASE | AGG_2PHASE_DQA | AGG_3PHASE)
#define AGG_ALL (AGG_SINGLEPHASE | AGG_MULTIPHASE)
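/* An aggregation approach is explored only if its bit survives both the
* allowed_agg mask (driven by GUCs and properties of the input plan) and
* the possible_agg mask (driven by the shape of the query); see the
* computation of consider_agg in cdb_grouping_planner below.
*/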
/* Constants for DQA pruning:
*/
static const Index grp_varno = 1; /* var refers to grps_tlist */
static const Index ref_varno = 2; /* var refers to prefs_tlist or relatives */
static const Index dqa_base_varno = 3; /* refers to one of the dref_tlists */
/* Coefficients for cost calculation adjustments: These are candidate GUCs
* or, perhaps, replacements for the gp_eager_... series. We wouldn't
* need these if our statistics and cost calculations were correct, but
* as of 3.2, they are not.
*
* Early testing suggested that (1.0, 0.45, 1.7) was about right, but the
* risk of introducing skew in the initial redistribution of a 1-phase plan
* is great (especially given the 3.2 tendency to way underestimate the
* cardinality of joins), so we penalize 1-phase and normalize to the
* 2-phase cost (approximately).
*/
static const double gp_coefficient_1phase_agg = 20.0; /* penalty */
static const double gp_coefficient_2phase_agg = 1.0; /* normalized */
static const double gp_coefficient_3phase_agg = 3.3; /* increase to offset systematic underestimate */
/* Forward declarations */
static Plan * make_one_stage_agg_plan(PlannerInfo *root, MppGroupContext *ctx);
static Plan * make_two_stage_agg_plan(PlannerInfo *root, MppGroupContext *ctx);
static Plan * make_three_stage_agg_plan(PlannerInfo *root, MppGroupContext *ctx);
static Plan * make_plan_for_one_dqa(PlannerInfo *root, MppGroupContext *ctx,
int dqa_index, Plan* result_plan,
Query** coquery_p);
static Plan * join_dqa_coplan(PlannerInfo *root, MppGroupContext *ctx, Plan *plan, int dqa_index);
static int compareDqas(const void *larg, const void *rarg);
static void planDqaJoinOrder(PlannerInfo *root, MppGroupContext *ctx,
double input_rows);
static List *make_subplan_tlist(List *tlist, Node *havingQual,
List *grp_clauses, int *pnum_gkeys, AttrNumber **pcols_gkeys,
List *dqa_args, int *pnum_dqas, AttrNumber **pcols_dqas);
static List *describe_subplan_tlist(List *sub_tlist,
List *tlist, Node *havingQual,
List *grp_clauses, int *pnum_gkeys, AttrNumber **pcols_gkeys,
List *dqa_args, int *pnum_dqas, AttrNumber **pcols_dqas);
static void generate_multi_stage_tlists(MppGroupContext* ctx,
List **p_prelim_tlist,
List **p_inter_tlist,
List **p_final_tlist,
List **p_final_qual);
static void prepare_dqa_pruning_tlists(MppGroupContext *ctx);
static void generate_dqa_pruning_tlists(MppGroupContext *ctx,
int dqa_index,
List **p_prelim_tlist,
List **p_inter_tlist,
List **p_final_tlist,
List **p_final_qual);
static void deconstruct_agg_info(MppGroupContext *ctx);
static void reconstruct_agg_info(MppGroupContext *ctx,
List **p_prelim_tlist,
List **p_inter_tlist,
List **p_final_tlist,
List **p_final_qual);
static void reconstruct_coplan_info(MppGroupContext *ctx,
int dqa_index,
List **p_prelim_tlist,
List **p_inter_tlist,
List **p_final_tlist);
static Expr *deconstruct_expr(Expr *expr, MppGroupContext *ctx);
static Node* deconstruct_expr_mutator(Node *node, MppGroupContext *ctx);
static Node *split_aggref(Aggref *aggref, MppGroupContext *ctx);
static List *make_vars_tlist(List *tlist, Index varno, AttrNumber offset);
static Plan* add_subqueryscan(PlannerInfo* root, List **p_pathkeys,
Index varno, Query *subquery, Plan *subplan);
static List *seq_tlist_concat(List *tlist1, List *tlist2);
static Node *finalize_split_expr(Node *expr, MppGroupContext *ctx);
static Node* finalize_split_expr_mutator(Node *node, MppGroupContext *ctx);
static Oid lookup_agg_transtype(Aggref *aggref);
static bool hash_safe_type(Oid type);
static bool sorting_prefixes_grouping(PlannerInfo *root);
static bool gp_hash_safe_grouping(PlannerInfo *root);
static Cost cost_common_agg(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info, Plan *dummy);
static Cost cost_1phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info);
static Cost cost_2phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info);
static Cost cost_3phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info);
static void set_cost_of_join_strategies(MppGroupContext *ctx, Cost *hashjoin_cost, Cost *mergejoin_cost);
static void initAggPlanInfo(AggPlanInfo *info, Path *input_path, Plan *input_plan);
static void set_coplan_strategies(PlannerInfo *root, MppGroupContext *ctx, DqaInfo *dqaArg, Path *input);
static Cost incremental_sort_cost(double rows, int width, int numKeyCols);
static Cost incremental_agg_cost(double rows, int width, AggStrategy strategy,
int numGroupCols, double numGroups,
int numAggs, int transSpace);
static Cost incremental_motion_cost(double sendrows, double recvrows);
/*---------------------------------------------
* WITHIN/Percentile stuff
*---------------------------------------------*/
/*
* WithinAggContext is a variable set used throughout plan_within_agg_persort().
*/
typedef struct
{
bool use_deduplicate; /* true to choose deduplicate strategy */
AttrNumber pc_pos; /* resno for peer count in outer tlist */
AttrNumber tc_pos; /* resno for total count in inner tlist */
List *current_pathkeys; /* pathkeys tracking */
List *inner_pathkeys; /* pathkeys for inner plan */
List *rtable; /* outer/inner RTE of the output */
} WithinAggContext;
static bool choose_deduplicate(PlannerInfo *root, List *sortExprs,
Plan *input_plan, double *numGroups);
static Plan *wrap_plan_index(PlannerInfo *root, Plan *plan, Query *query,
List **p_pathkeys, Index varno, const char *alias_name, Query **query_p);
static void rebuild_simple_rel_and_rte(PlannerInfo *root);
static Plan *make_parallel_or_sequential_agg(PlannerInfo *root,
AggClauseCounts *agg_count, GroupContext *group_context,
List **current_pathkeys_p);
static Node *deconstruct_within_agg(Node *node, MppGroupContext *ctx);
static Node *deconstruct_within_agg_mutator(Node *node, MppGroupContext *ctx);
static List *fetch_percentiles(Query *parse, List *sortClause);
static Plan *make_deduplicate_plan(PlannerInfo *root, GroupContext *group_context,
List *groupClause, List *sortClause, double numGroups,
AttrNumber *pc_pos_p, List **current_pathkeys_p, Plan *subplan);
static Plan *within_agg_make_baseplan(PlannerInfo *root,
GroupContext *group_context,
WithinAggContext *wag_context,
List *sortClause,
Plan *result_plan);
static Plan *within_agg_add_outer_sort(PlannerInfo *root,
WithinAggContext *wag_context,
List *sortClause,
Plan *outer_plan);
static Plan *within_agg_construct_inner(PlannerInfo *root,
GroupContext *group_context,
WithinAggContext *wag_context,
Plan *inner_plan);
static Plan *within_agg_join_plans(PlannerInfo *root,
GroupContext *group_context,
WithinAggContext *wag_context,
Plan *outer_plan,
Plan *inner_plan);
static Plan *within_agg_final_agg(PlannerInfo *root,
GroupContext *group_context,
WithinAggContext *wag_context,
List *sortClause,
Plan *result_plan);
static Plan *plan_within_agg_persort(PlannerInfo *root, GroupContext *group_context,
List *sortClause, List *current_pathkeys, Plan *base_plan);
/*
* add_motion_to_dqa_child
* Add a Redistribute motion to a dqa child plan if the plan is not already
* distributed on the grouping columns
*/
static Plan *add_motion_to_dqa_child(Plan *plan, PlannerInfo *root, bool *motion_added);
/*
* Function: cdb_grouping_planner
*
* This is basically an extension of the function grouping_planner() from
* planner.c. It (conditionally) replaces the logic in the "normal case"
* (the aggregation/grouping branch) of the main "if" statement.
*
* The result is a Plan for one-, two-, or three-phase grouping/aggregation
* (possibly including a top-level join in case of DQA pruning) or NULL.
*
* A NULL result means that the ordinary sequential planner will produce
* a correct (and preferred) result, so the "normal case" code should be
* used.
*
* A non-NULL result is taken as a partially formed Plan to be processed
* by the trailing sort/distinct/limit logic of grouping_planner(). In
* other words, after any associated changes to the local environment (see
* the calling code), the resulting plan should be treated as if from the
* "normal case" or the function optimize_minmax_aggregates().
*
* The pointer at location pcurrent_pathkeys is updated to refer to the
* ordering pathkey or NIL, if none. The parse tree at root->parse may
* be modified in place to reflect changes in the context (e.g. current
* range table).
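*
* For example (informally), a query such as
*
* SELECT dept, count(DISTINCT emp_id), sum(salary)
* FROM emp GROUP BY dept;
*
* has a single DQA, so the code below may consider a 1-phase plan
* (redistribute on dept, then aggregate), a 2-phase DQA plan
* (redistribute on the DISTINCT argument first), and a 3-phase
* DQA-pruning plan, choosing the cheapest according to the cost
* functions later in this file. Table and column names here are
* illustrative only.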
*/
Plan *
cdb_grouping_planner(PlannerInfo* root,
AggClauseCounts *agg_counts,
GroupContext *group_context)
{
MppGroupContext ctx;
Plan * result_plan = NULL;
List * sub_tlist = NIL;
bool has_groups = root->parse->groupClause != NIL;
bool has_aggs = agg_counts->numAggs > 0;
bool has_ordered_aggs = list_length(agg_counts->aggOrder) > 0;
ListCell *lc;
bool is_grpext = false;
unsigned char consider_agg = AGG_NONE;
AggPlanInfo plan_1p;
AggPlanInfo plan_2p;
AggPlanInfo plan_3p;
AggPlanInfo *plan_info = NULL;
Assert( !has_groups || root->group_pathkeys != NULL );
memset(&ctx, 0, sizeof(ctx));
*(group_context->querynode_changed) = false;
/* We always use sequential plans for distinct-qualified rollup queries,
* so don't waste time working on alternatives.
*/
is_grpext = is_grouping_extension(group_context->canonical_grpsets);
if ( is_grpext && agg_counts->numDistinctAggs > 0)
return NULL;
/*
* First choose a one-stage plan. Since there's always a way to do this,
* it serves as our default choice.
*/
if (group_context->subplan == NULL)
{
Path *input_path = group_context->cheapest_path;
/* Should we prefer the "best" path? Only for vector aggregation
* of input already sorted and collocated on the grouping key.
*/
if ( has_groups &&
pathkeys_contained_in(root->group_pathkeys, group_context->best_path->pathkeys) &&
cdbpathlocus_collocates(group_context->best_path->locus, root->group_pathkeys, false /*exact_match*/) )
{
input_path = group_context->best_path;
}
initAggPlanInfo(&plan_1p, input_path, group_context->subplan);
}
else
{
initAggPlanInfo(&plan_1p, group_context->best_path, group_context->subplan);
plan_1p.input_locus = group_context->best_path->locus;
}
if ( ! CdbPathLocus_IsPartitioned(plan_1p.input_locus) )
{
/* Can use base plan with no motion yielding same locus. */
plan_1p.group_prep = MPP_GRP_PREP_NONE;
plan_1p.output_locus = plan_1p.input_locus;
plan_1p.distinctkey_collocate = true;
}
else if ( has_groups ) /* and not single or replicated */
{
if (root->group_pathkeys != NULL &&
cdbpathlocus_collocates(plan_1p.input_locus, root->group_pathkeys, false /*exact_match*/) )
{
plan_1p.group_prep = MPP_GRP_PREP_NONE;
plan_1p.output_locus = plan_1p.input_locus; /* may be less discriminating than group locus */
plan_1p.distinctkey_collocate = true;
}
else
{
if (gp_hash_safe_grouping(root))
{
plan_1p.group_prep = MPP_GRP_PREP_HASH_GROUPS;
CdbPathLocus_MakeHashed(&plan_1p.output_locus, root->group_pathkeys);
}
else
{
plan_1p.group_prep = MPP_GRP_PREP_FOCUS_QE;
CdbPathLocus_MakeSingleQE(&plan_1p.output_locus);
}
}
}
else if ( has_aggs ) /* and not grouped and not single or replicated */
{
plan_1p.group_prep = MPP_GRP_PREP_FOCUS_QE;
CdbPathLocus_MakeSingleQE(&plan_1p.output_locus);
}
/*
* If the sequential planner can handle the situation with no Motion
* involved, let it do so. Don't bother to investigate the 2-stage
* approach.
*
* If the GUC enable_groupagg is set to off and this is a DQA
* query, we won't use the sequential plan. This is because
* the sequential plan for a DQA query always uses GroupAgg.
*/
if ( plan_1p.group_prep == MPP_GRP_PREP_NONE )
{
if (enable_groupagg || agg_counts->numDistinctAggs == 0)
{
*(group_context->pcurrent_pathkeys) = NIL;
return NULL;
}
}
/*
* When an input plan is given, use it, including its target
* list. When an input target list (and no plan) is given,
* use it for the plan to be created. When neither is given,
* generate a phase 1 target list for the plan to be created.
* Also note the location of any grouping attributes in the
* target list (numGroupCols, groupColIdx).
*
* Also make sure there's a target entry with a non-zero
* sortgroupref for each DQA argument and note the location
* of the attributes (numDistinctCols, distinctColIdx).
*/
if ( group_context->subplan != NULL)
{
sub_tlist = group_context->subplan->targetlist;
}
else if ( group_context->sub_tlist != NULL )
{
sub_tlist = group_context->sub_tlist;
sub_tlist = describe_subplan_tlist(sub_tlist,
group_context->tlist,
root->parse->havingQual,
root->parse->groupClause,
&(group_context->numGroupCols),
&(group_context->groupColIdx),
agg_counts->dqaArgs,
&(group_context->numDistinctCols),
&(group_context->distinctColIdx));
}
else
{
sub_tlist = make_subplan_tlist(group_context->tlist,
root->parse->havingQual,
root->parse->groupClause,
&(group_context->numGroupCols),
&(group_context->groupColIdx),
agg_counts->dqaArgs,
&(group_context->numDistinctCols),
&(group_context->distinctColIdx));
/* Where we need to and we can, add column names to the sub_tlist
* entries to make EXPLAIN output look nice. Note that we could dig
* further than this (if we come up empty handed) by probing the range
* table (root->parse->rtable), but this covers the ordinary cases.
*/
foreach(lc, sub_tlist)
{
TargetEntry *tle = (TargetEntry*)lfirst(lc);
/*
* The first entry in group_context->tlist whose expression is equal() to
* tle's expression may not be the desired one; when ressortgroupref > 0,
* we must also require that the ressortgroupref values match.
*/
if ( IsA(tle->expr, Var) && tle->resname == NULL )
{
TargetEntry *vartle =
tlist_member_with_ressortgroupref((Node*)tle->expr,
group_context->tlist, tle->ressortgroupref);
if ( vartle != NULL && vartle->resname != NULL )
tle->resname = pstrdup(vartle->resname);
}
}
}
/* At this point, we're committed to producing a one-, two- or
* three-stage plan with motion. Determine which aggregation approaches to explore.
* Per MPP-2378, we don't insist on has_aggs for multi-phase
* plans.
*/
{
unsigned char allowed_agg;
unsigned char possible_agg;
allowed_agg = AGG_ALL;
if ( ! root->config->gp_enable_multiphase_agg )
allowed_agg &= AGG_SINGLEPHASE;
/* This prohibition could be relaxed if we tracked missing
* preliminary functions per DQA and were willing to plan
* some DQAs as single and some as multiple phases. Not
* currently, however.
*/
if ( agg_counts->missing_prelimfunc )
allowed_agg &= ~ AGG_MULTIPHASE;
/*
* Ordered aggregates need to run the transition function on the
* values in sorted order, which in turn translates into single
* phase aggregation.
*/
if ( has_ordered_aggs )
allowed_agg &= ~ AGG_MULTIPHASE;
/* We are currently unwilling to redistribute a gathered
* intermediate across the cluster. This might change
* one day.
*/
if ( ! CdbPathLocus_IsPartitioned(plan_1p.input_locus ) )
allowed_agg &= AGG_SINGLEPHASE;
if ( ! root->config->gp_enable_agg_distinct )
allowed_agg &= ~ AGG_2PHASE_DQA;
if ( ! root->config->gp_enable_dqa_pruning )
allowed_agg &= ~ AGG_3PHASE;
possible_agg = AGG_SINGLEPHASE;
if(gp_hash_safe_grouping(root))
{
switch ( list_length(agg_counts->dqaArgs) )
{
case 0:
possible_agg |= AGG_2PHASE;
break;
case 1:
possible_agg |= AGG_2PHASE_DQA | AGG_3PHASE;
break;
default: /* > 1 */
possible_agg |= AGG_3PHASE;
break;
}
}
if ( is_grpext )
possible_agg &= ~ (AGG_2PHASE_DQA | AGG_3PHASE);
consider_agg = allowed_agg & possible_agg;
}
Assert( consider_agg & AGG_1PHASE ); /* Always possible! */
if ( consider_agg & ( AGG_2PHASE | AGG_2PHASE_DQA ) )
{
/* XXX initAggPlanInfo(&plan_2p, group_context->cheapest_path); */
initAggPlanInfo(&plan_2p, group_context->best_path,
group_context->subplan); /* but why? */
/* Common 2-phase setup. */
if ( has_groups )
{
plan_2p.group_type = MPP_GRP_TYPE_GROUPED_2STAGE;
CdbPathLocus_MakeHashed(&plan_2p.output_locus, root->group_pathkeys);
}
else
{
plan_2p.group_type = MPP_GRP_TYPE_PLAIN_2STAGE;
CdbPathLocus_MakeSingleQE(&plan_2p.output_locus);
}
if ( consider_agg & AGG_2PHASE_DQA )
{
List *distinct_pathkeys;
/* Either have DQA or not! */
Assert(! (consider_agg & AGG_2PHASE) );
Insist( IsA(agg_counts->dqaArgs, List) &&
list_length((List*)agg_counts->dqaArgs) == 1 );
distinct_pathkeys = cdb_make_pathkey_for_expr(root,
linitial(agg_counts->dqaArgs),
list_make1(makeString("=")));
distinct_pathkeys = list_make1(distinct_pathkeys);
if ( ! cdbpathlocus_collocates(plan_2p.input_locus, distinct_pathkeys, false /*exact_match*/))
{
plan_2p.group_prep = MPP_GRP_PREP_HASH_DISTINCT;
CdbPathLocus_MakeHashed(&plan_2p.input_locus, distinct_pathkeys);
}
else
{
plan_2p.group_prep = MPP_GRP_PREP_HASH_DISTINCT;
plan_2p.output_locus = plan_2p.input_locus;
plan_2p.distinctkey_collocate = true;
}
}
}
if ( consider_agg & AGG_3PHASE )
{
initAggPlanInfo(&plan_3p, group_context->cheapest_path,
group_context->subplan);
if ( has_groups )
{
plan_3p.group_type = MPP_GRP_TYPE_GROUPED_DQA_2STAGE;
CdbPathLocus_MakeHashed(&plan_3p.output_locus, root->group_pathkeys);
}
else
{
plan_3p.group_type = MPP_GRP_TYPE_PLAIN_DQA_2STAGE;
CdbPathLocus_MakeSingleQE(&plan_3p.output_locus);
}
}
/*
* Package up byproducts for the actual planner.
*/
ctx.prep = plan_1p.group_prep;
ctx.type = plan_1p.group_type;
ctx.tlist = group_context->tlist;
ctx.havingQual = root->parse->havingQual;
ctx.sub_tlist = sub_tlist;
ctx.numGroupCols = group_context->numGroupCols;
ctx.groupColIdx = group_context->groupColIdx;
ctx.numDistinctCols = group_context->numDistinctCols;
ctx.distinctColIdx = group_context->distinctColIdx;
ctx.use_hashed_grouping = group_context->use_hashed_grouping;
ctx.best_path = group_context->best_path;
ctx.cheapest_path = group_context->cheapest_path;
ctx.subplan = group_context->subplan;
ctx.input_locus = plan_1p.input_locus;
ctx.output_locus = plan_1p.output_locus;
ctx.distinctkey_collocate = plan_1p.distinctkey_collocate;
ctx.agg_counts = agg_counts;
ctx.tuple_fraction = group_context->tuple_fraction;
ctx.p_dNumGroups = group_context->p_dNumGroups;
ctx.canonical_grpsets = group_context->canonical_grpsets;
ctx.grouping = group_context->grouping;
ctx.is_grpext = is_grpext;
ctx.current_pathkeys = NIL; /* Initialize to be tidy. */
ctx.querynode_changed = false;
ctx.root = root;
/* If we're to consider 3-phase plans, do some preparation.
*/
if ( ctx.numDistinctCols > 0 && (consider_agg & AGG_3PHASE) )
{
int i;
/* Collect row count estimates and other info for the partial
* results of grouping over combined grouping and distinct (DQA)
* keys. Order the output array of DqaInfo structures (in the
* context) according to how they should be joined.
*/
planDqaJoinOrder(root, &ctx, plan_3p.input_path->parent->rows);
/* Plan the post-Motion portions of each coplan in two ways: one to
* produce the result in the cheapest way and one to produce the
* result ordered by the grouping key in the cheapest way. (For use
* by make_plan_for_one_dqa called by make_three_stage_agg_plan.)
*/
for ( i = 0; i < ctx.numDistinctCols; i++ )
{
List *distinct_pathkeys;
set_coplan_strategies(root, &ctx, &ctx.dqaArgs[i], plan_3p.input_path);
/* Determine if the input plan already collocates on the distinct
* key.
*/
distinct_pathkeys = cdb_make_pathkey_for_expr(root,
ctx.dqaArgs[i].distinctExpr,
list_make1(makeString("=")));
distinct_pathkeys = list_make1(distinct_pathkeys);
if (cdbpathlocus_collocates(plan_3p.input_locus, distinct_pathkeys, false /*exact_match*/))
{
ctx.dqaArgs[i].distinctkey_collocate = true;
}
list_free(distinct_pathkeys);
}
}
plan_info = NULL; /* Most cost-effective, feasible plan. */
if ( consider_agg & AGG_1PHASE )
{
cost_1phase_aggregation(root, &ctx, &plan_1p);
if ( gp_dev_notice_agg_cost )
elog(NOTICE, "1-phase cost: %.6f", plan_1p.plan_cost);
if ( plan_info == NULL || plan_info->plan_cost > plan_1p.plan_cost )
plan_info = &plan_1p;
}
if ( consider_agg & ( AGG_2PHASE | AGG_2PHASE_DQA ) )
{
cost_2phase_aggregation(root, &ctx, &plan_2p);
if ( gp_dev_notice_agg_cost )
elog(NOTICE, "2-phase cost: %.6f", plan_2p.plan_cost);
if ( plan_info == NULL || plan_info->plan_cost > plan_2p.plan_cost )
plan_info = &plan_2p;
}
if ( consider_agg & AGG_3PHASE )
{
cost_3phase_aggregation(root, &ctx, &plan_3p);
if ( gp_dev_notice_agg_cost )
elog(NOTICE, "3-phase cost: %.6f", plan_3p.plan_cost);
if ( plan_info == NULL || !enable_groupagg || plan_info->plan_cost > plan_3p.plan_cost )
plan_info = &plan_3p;
}
Insist( plan_info != NULL );
ctx.prep = plan_info->group_prep;
ctx.type = plan_info->group_type;
ctx.input_locus = plan_info->input_locus;
ctx.output_locus = plan_info->output_locus;
ctx.distinctkey_collocate = plan_info->distinctkey_collocate;
ctx.join_strategy = plan_info->join_strategy;
ctx.use_sharing = plan_info->use_sharing;
/* Call appropriate planner. */
if (ctx.type == MPP_GRP_TYPE_BASEPLAN)
{
if (ctx.prep != MPP_GRP_PREP_NONE)
result_plan = make_one_stage_agg_plan(root, &ctx);
else
result_plan = NULL; /* allow sequential planner to do the work. */
}
else if (ctx.type == MPP_GRP_TYPE_PLAIN_2STAGE ||
ctx.type == MPP_GRP_TYPE_GROUPED_2STAGE)
result_plan = make_two_stage_agg_plan(root, &ctx);
else if (ctx.type == MPP_GRP_TYPE_PLAIN_DQA_2STAGE ||
ctx.type == MPP_GRP_TYPE_GROUPED_DQA_2STAGE)
result_plan = make_three_stage_agg_plan(root, &ctx);
else
ereport(ERROR,
(errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("no parallel plan for aggregation")));
if (!is_grpext && result_plan != NULL &&
IsA(result_plan, Agg))
((Agg *)result_plan)->lastAgg = true;
*(group_context->querynode_changed) = ctx.querynode_changed;
*(group_context->pcurrent_pathkeys) = ctx.current_pathkeys;
return result_plan;
}
/*
* Function make_one_stage_agg_plan
*
* Construct a one-stage aggregation plan by redistributing the result
* of the input plan appropriately
*/
static Plan *
make_one_stage_agg_plan(PlannerInfo *root,
MppGroupContext *ctx)
{
Query *parse = root->parse;
List *tlist = ctx->tlist;
List *sub_tlist = ctx->sub_tlist;
int numGroupCols = ctx->numGroupCols;
AttrNumber *groupColIdx = ctx->groupColIdx;
Path *best_path = ctx->best_path;
Path *cheapest_path = ctx->cheapest_path;
Path *path = NULL;
bool use_hashed_grouping = ctx->use_hashed_grouping;
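/* Clamp the (double) group-count estimate to the [0, LONG_MAX] range
* expected by make_agg's numGroups argument.
*/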
long numGroups = (*(ctx->p_dNumGroups) < 0) ? 0 :
(*(ctx->p_dNumGroups) > LONG_MAX) ? LONG_MAX :
(long)*(ctx->p_dNumGroups);
List *groupExprs = NIL;
List *current_pathkeys;
QualCost tlist_cost;
int i;
Plan *result_plan;
bool is_sorted;
/*
* The argument to the "lower" Agg node will use a "flattened" tlist
* (having just the (levelsup==0) vars mentioned in the SELECT targetlist
* and HAVING qual, plus entries for any GROUP BY expressions that are not
* simple Vars). This is the same sub_tlist as that used for 1-stage
* aggregation in grouping_planner.
*/
/* Create the base plan. If the best path is in grouping key order and
* we don't plan to move it around and this is a vector aggregation, we
* should use best path. In other cases, however, use cheapest.
*/
if (ctx->subplan == NULL)
{
is_sorted = pathkeys_contained_in(root->group_pathkeys, best_path->pathkeys);
path = cheapest_path;
if ( is_sorted && ctx->prep == MPP_GRP_PREP_NONE && numGroupCols > 0 )
path = best_path;
result_plan = create_plan(root, path);
current_pathkeys = path->pathkeys;
/* Instead of the flat target list produced above, use the sub_tlist
* constructed in cdb_grouping_planner. Add a Result node if the
* base plan can't project. (This may be unnecessary, but, if so,
* the Result node will be removed later.)
*/
result_plan = plan_pushdown_tlist(result_plan, sub_tlist);
Assert(result_plan->flow);
/* Account for the cost of evaluation of the sub_tlist. */
cost_qual_eval(&tlist_cost, sub_tlist, root);
result_plan->startup_cost += tlist_cost.startup;
result_plan->total_cost +=
tlist_cost.startup +
tlist_cost.per_tuple * result_plan->plan_rows;
}
else
{
result_plan = ctx->subplan;
current_pathkeys = ctx->current_pathkeys;
}
/* Precondition the input by adjusting its locus prior to adding
* the Agg or Group node to the base plan, if needed.
*/
switch ( ctx->prep )
{
case MPP_GRP_PREP_NONE:
break;
case MPP_GRP_PREP_HASH_GROUPS:
Assert(numGroupCols > 0);
for ( i = 0; i < numGroupCols; i++)
{
TargetEntry *tle = get_tle_by_resno(sub_tlist, groupColIdx[i]);
groupExprs = lappend(groupExprs, copyObject(tle->expr));
}
result_plan = (Plan*)make_motion_hash(root, result_plan, groupExprs);
result_plan->total_cost +=
incremental_motion_cost(result_plan->plan_rows,
result_plan->plan_rows);
current_pathkeys = NIL; /* No longer sorted. */
break;
case MPP_GRP_PREP_FOCUS_QE:
result_plan = (Plan*)make_motion_gather_to_QE(result_plan, (current_pathkeys != NIL));
result_plan->total_cost +=
incremental_motion_cost(result_plan->plan_rows,
result_plan->plan_rows * root->config->cdbpath_segments);
break;
case MPP_GRP_PREP_FOCUS_QD:
result_plan = (Plan*)make_motion_gather_to_QD(result_plan, (current_pathkeys != NIL));
result_plan->total_cost +=
incremental_motion_cost(result_plan->plan_rows,
result_plan->plan_rows * root->config->cdbpath_segments);
break;
case MPP_GRP_PREP_HASH_DISTINCT:
case MPP_GRP_PREP_BROADCAST:
ereport(ERROR,
(errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("no parallel plan for aggregation")));
break; /* Never */
}
Assert(result_plan->flow);
/*
* Insert AGG or GROUP node if needed, plus an explicit sort step
* if necessary.
*
* HAVING clause, if any, becomes qual of the Agg or Group node.
*/
if (!ctx->is_grpext && use_hashed_grouping)
{
/* Hashed aggregate plan --- no sort needed */
result_plan = (Plan *) make_agg(root,
tlist,
(List *) parse->havingQual,
AGG_HASHED, false,
numGroupCols,
groupColIdx,
numGroups,
0, /* num_nullcols */
0, /* input_grouping */
ctx->grouping,
0, /* rollup_gs_times */
ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace,
result_plan);
/* Hashed aggregation produces randomly-ordered results */
current_pathkeys = NIL;
}
else if (parse->hasAggs || parse->groupClause)
{
if (!ctx->is_grpext)
{
/* Plain aggregate plan --- sort if needed */
AggStrategy aggstrategy;
if (parse->groupClause)
{
if (!pathkeys_contained_in(root->group_pathkeys,
current_pathkeys))
{
result_plan = (Plan *)
make_sort_from_groupcols(root,
parse->groupClause,
groupColIdx,
false,
result_plan);
current_pathkeys = root->group_pathkeys;
mark_sort_locus(result_plan);
}
aggstrategy = AGG_SORTED;
/*
* The AGG node will not change the sort ordering of its
* groups, so current_pathkeys describes the result too.
*/
}
else
{
aggstrategy = AGG_PLAIN;
/* Result will be only one row anyway; no sort order */
current_pathkeys = NIL;
}
result_plan = (Plan *) make_agg(root,
tlist,
(List *) parse->havingQual,
aggstrategy, false,
numGroupCols,
groupColIdx,
numGroups,
0, /* num_nullcols */
0, /* input_grouping */
ctx->grouping,
0, /* rollup_gs_times */
ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace,
result_plan);
}
else
{
result_plan = plan_grouping_extension(root, path, ctx->tuple_fraction,
ctx->use_hashed_grouping,
&tlist, sub_tlist,
true, false,
(List *) parse->havingQual,
&numGroupCols,
&groupColIdx,
ctx->agg_counts,
ctx->canonical_grpsets,
ctx->p_dNumGroups,
&(ctx->querynode_changed),
&current_pathkeys,
result_plan);
}
}
else if (root->hasHavingQual)
{
/* No aggregates, and no GROUP BY, but a HAVING qual is a
* degenerate case discussed in grouping_planner. We can
* just throw away the plan-so-far and let the caller handle
* the whole enchilada.
*/
return NULL;
}
/*
* Decorate the top node with a Flow node if it doesn't have one yet.
* (In such cases we require the next-to-top node to have a Flow node
* from which we can obtain the distribution info.)
*/
if (!result_plan->flow)
{
Assert(!IsA(result_plan, Motion));
result_plan->flow = pull_up_Flow(result_plan,
result_plan->lefttree,
(current_pathkeys != NIL));
}
/* Marshal implicit results. Return explicit result. */
ctx->current_pathkeys = current_pathkeys;
return result_plan;
}
/*
* Function make_two_stage_agg_plan
*
* Construct a two-stage aggregation plan.
*/
static Plan *
make_two_stage_agg_plan(PlannerInfo *root,
MppGroupContext *ctx)
{
Query *parse = root->parse;
List *prelim_tlist = NIL;
List *final_tlist = NIL;
List *final_qual = NIL;
List *distinctExpr = NIL;
List *groupExprs = NIL;
List *current_pathkeys;
Plan *result_plan;
QualCost tlist_cost;
AggStrategy aggstrategy;
int i;
int numGroupCols;
AttrNumber *groupColIdx;
AttrNumber *prelimGroupColIdx;
Path *path = ctx->best_path; /* no use for ctx->cheapest_path */
long numGroups = (*(ctx->p_dNumGroups) < 0) ? 0 :
(*(ctx->p_dNumGroups) > LONG_MAX) ? LONG_MAX :
(long)*(ctx->p_dNumGroups);
/* Copy these from context rather than using them directly because we may
* scribble on them in plan_grouping_extension(). It would be good to
* clean this up, but not today.
*/
numGroupCols = ctx->numGroupCols;
groupColIdx = ctx->groupColIdx;
/* Create the base plan which will serve as the outer plan (argument)
* of the partial Agg node.
*/
if (ctx->subplan == NULL)
{
result_plan = create_plan(root, path);
current_pathkeys = path->pathkeys;
/* Instead of the flat target list produced by create_plan above, use
* the sub_tlist constructed in cdb_grouping_planner. This consists
* of just the (levelsup==0) vars mentioned in the SELECT and HAVING
* clauses plus entries for any GROUP BY expressions that are not
* simple Vars. (This is the same sub_tlist as used in 1-stage
* aggregation and in normal aggregation in grouping_planner).
*
* If the base plan is of a type that can't project, add a Result
* node to carry the new target list, else install it directly.
* (Though the result node may not always be necessary, it is safe,
* and superfluous Result nodes are removed later.)
*/
result_plan = plan_pushdown_tlist(result_plan, ctx->sub_tlist);
/* Account for the cost of evaluation of the sub_tlist. */
cost_qual_eval(&tlist_cost, ctx->sub_tlist, root);
result_plan->startup_cost += tlist_cost.startup;
result_plan->total_cost +=
tlist_cost.startup +
tlist_cost.per_tuple * result_plan->plan_rows;
}
else
{
result_plan = ctx->subplan;
current_pathkeys = ctx->current_pathkeys;
}
/* At this point result_plan produces the input relation for two-stage
* aggregation.
*
* Begin by preconditioning the input, if necessary, to collocate on
* non-distinct values of a single DISTINCT argument.
*/
switch ( ctx->prep )
{
case MPP_GRP_PREP_NONE:
break;
case MPP_GRP_PREP_HASH_DISTINCT:
Assert(list_length( ctx->agg_counts->dqaArgs) == 1 );
Assert( ctx->agg_counts->dqaArgs != NIL);
if (!ctx->distinctkey_collocate)
{
distinctExpr = list_make1(linitial(ctx->agg_counts->dqaArgs));
distinctExpr = copyObject(distinctExpr);
result_plan = (Plan*)make_motion_hash(root, result_plan, distinctExpr);
result_plan->total_cost +=
incremental_motion_cost(result_plan->plan_rows,
result_plan->plan_rows);
current_pathkeys = NIL; /* No longer sorted. */
}
break;
case MPP_GRP_PREP_FOCUS_QD:
case MPP_GRP_PREP_FOCUS_QE:
case MPP_GRP_PREP_HASH_GROUPS:
case MPP_GRP_PREP_BROADCAST:
ereport(ERROR,
(errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("unexpected call for two-stage aggregation")));
break; /* Never */
}
/*
* Get the target lists for the preliminary and final aggregations and
* the qual (HAVING clause) for the final aggregation based on the target
* list of the base plan. Grouping attributes go on front of preliminary
* target list.
*/
generate_multi_stage_tlists(ctx,
&prelim_tlist,
NULL,
&final_tlist,
&final_qual);
/*
* Since the grouping attributes, if any, are on the front and in order
* on the preliminary targetlist, we need a different vector of grouping
* attribute numbers: (1, 2, 3, ...). Later, we'll need this vector for
* the intermediate Motion/Sort and for the second-stage Agg node.
*/
prelimGroupColIdx = NULL;
if ( numGroupCols > 0 )
{
prelimGroupColIdx = (AttrNumber*)palloc(numGroupCols * sizeof(AttrNumber));
for ( i = 0; i < numGroupCols; i++ )
prelimGroupColIdx[i] = i+1;
}
/*
* Add the Preliminary Agg Node.
*
* When this aggregate is a ROLLUP, we add a sequence of preliminary Agg nodes.
*/
/* Determine the aggregation strategy to use. */
if ( ctx->use_hashed_grouping )
{
aggstrategy = AGG_HASHED;
current_pathkeys = NIL;
}
else
{
if (parse->groupClause)
{
if (!ctx->is_grpext && !pathkeys_contained_in(root->group_pathkeys,
current_pathkeys))
{
/* TODO -- Investigate WHY we might sort here!
*
* Good reasons would be that one of the grouping
* expressions isn't "hashable" or that too many groups
* are anticipated.
*
* A bad reason would be that the final result will be in
* order of the grouping key. (Redistribution will remove
* the ordering.)
*/
result_plan = (Plan *)
make_sort_from_groupcols(root,
parse->groupClause,
groupColIdx,
false,
result_plan);
current_pathkeys = root->group_pathkeys;
mark_sort_locus(result_plan);
}
aggstrategy = AGG_SORTED;
/* The AGG node will not change the sort ordering of its
* groups, so current_pathkeys describes the result too.
*/
}
else
{
aggstrategy = AGG_PLAIN;
current_pathkeys = NIL; /* One row, no sort order */
}
}
if (!ctx->is_grpext)
{
result_plan = (Plan *) make_agg(root,
prelim_tlist,
NIL, /* no havingQual */
aggstrategy, root->config->gp_hashagg_streambottom,
numGroupCols,
groupColIdx,
numGroups,
0, /* num_nullcols */
0, /* input_grouping */
0, /* grouping */
0, /* rollup_gs_times */
ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace,
result_plan);
/* May lose useful locus and sort. Unlikely, but could do better. */
mark_plan_strewn(result_plan);
current_pathkeys = NIL;
}
else
{
result_plan = plan_grouping_extension(root, path, ctx->tuple_fraction,
ctx->use_hashed_grouping,
&prelim_tlist, ctx->sub_tlist,
true, true,
NIL, /* no havingQual */
&numGroupCols,
&groupColIdx,
ctx->agg_counts,
ctx->canonical_grpsets,
ctx->p_dNumGroups,
&(ctx->querynode_changed),
&current_pathkeys,
result_plan);
/* Since we add Grouping and GroupId as additional grouping columns,
* we need to add them into prelimGroupColIdx. */
if (prelimGroupColIdx != NULL)
prelimGroupColIdx = (AttrNumber *)
repalloc(prelimGroupColIdx,
numGroupCols * sizeof(AttrNumber));
else
prelimGroupColIdx = (AttrNumber *)
palloc0(numGroupCols * sizeof(AttrNumber));
Assert(numGroupCols >= 2);
prelimGroupColIdx[numGroupCols-1] = groupColIdx[numGroupCols-1];
prelimGroupColIdx[numGroupCols-2] = groupColIdx[numGroupCols-2];
}
/*
* Add Intermediate Motion to Gather or Hash on Groups
*/
switch ( ctx->type )
{
case MPP_GRP_TYPE_GROUPED_2STAGE:
groupExprs = NIL;
Assert(numGroupCols > 0);
for ( i = 0; i < numGroupCols; i++)
{
TargetEntry *tle;
/* skip Grouping/GroupId columns */
if (ctx->is_grpext && (i == numGroupCols-1 || i == numGroupCols-2))
continue;
tle = get_tle_by_resno(prelim_tlist, prelimGroupColIdx[i]);
groupExprs = lappend(groupExprs, copyObject(tle->expr));
}
result_plan = (Plan*)make_motion_hash(root, result_plan, groupExprs);
result_plan->total_cost +=
incremental_motion_cost(result_plan->plan_rows,
result_plan->plan_rows);
break;
case MPP_GRP_TYPE_PLAIN_2STAGE:
result_plan = (Plan*)make_motion_gather_to_QE(result_plan, false);
result_plan->total_cost +=
incremental_motion_cost(result_plan->plan_rows,
result_plan->plan_rows * root->config->cdbpath_segments);
break;
case MPP_GRP_TYPE_NONE:
case MPP_GRP_TYPE_BASEPLAN:
case MPP_GRP_TYPE_GROUPED_DQA_2STAGE:
case MPP_GRP_TYPE_PLAIN_DQA_2STAGE:
ereport(ERROR,
(errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("unexpected use of 2-stage aggregation")));
break; /* Never */
}
/*
* Add Sort on Groups if needed for AGG_SORTED strategy
*/
if (aggstrategy == AGG_SORTED)
{
result_plan = (Plan *)
make_sort_from_groupcols(root,
parse->groupClause,
prelimGroupColIdx,
ctx->is_grpext,
result_plan);
current_pathkeys = root->group_pathkeys;
mark_sort_locus(result_plan);
}
result_plan = add_second_stage_agg(root,
true,
prelim_tlist,
final_tlist,
final_qual,
aggstrategy,
numGroupCols,
prelimGroupColIdx,
0, /* num_nullcols */
0, /* input_grouping */
ctx->grouping,
0, /* rollup_gs_times */
*ctx->p_dNumGroups,
ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace,
"partial_aggregation",
&current_pathkeys,
result_plan,
!ctx->is_grpext,
true);
if (ctx->is_grpext)
{
ListCell *lc;
bool found = false;
((Agg *)result_plan)->inputHasGrouping = true;
/*
* We want to make sure that the targetlist of result plan contains
* either GROUP_ID or a targetentry to represent the value of
* GROUP_ID from the subplans. This is because we may need this
* entry to determine whether a tuple will be output repeatedly by
* the later Repeat node. In the current grouping extension
* planner, if there is no explicit GROUP_ID entry, its value is
* carried by the last entry in the targetlist of the subplan.
*/
foreach (lc, result_plan->targetlist)
{
TargetEntry *te = (TargetEntry *)lfirst(lc);
/*
* Find out if GROUP_ID is in the final targetlist. It should
* point to the last attribute in the subplan targetlist.
*/
if (IsA(te->expr, Var))
{
Var *var = (Var *)te->expr;
if (var->varattno == list_length(prelim_tlist))
{
found = true;
break;
}
}
}
if (!found)
{
/* Add a new target entry to the targetlist which points to the
* GROUP_ID attribute in the subplan. Mark this entry
* as Junk.
*/
TargetEntry *te = get_tle_by_resno(prelim_tlist,
list_length(prelim_tlist));
Expr *expr;
TargetEntry *new_te;
expr = (Expr *)makeVar(1,
te->resno,
exprType((Node *)te->expr),
exprTypmod((Node *)te->expr),
0);
new_te = makeTargetEntry(expr,
list_length(result_plan->targetlist) + 1,
"group_id",
true);
result_plan->targetlist = lappend(result_plan->targetlist,
new_te);
}
}
/* Marshal implicit results. Return explicit result. */
ctx->current_pathkeys = current_pathkeys;
ctx->querynode_changed = true;
return result_plan;
}
/*
* Function make_three_stage_agg_plan
*
* Construct a three-stage aggregation plan involving DQAs (DISTINCT-qualified
* aggregate functions).
*
* Such a plan will always involve the following three aggregation phases:
*
* - preliminary -- remove duplicate (grouping key, DQA argument) values
* from an arbitrarily partitioned input; pre-aggregate plain aggregate
* functions.
*
* - intermediate -- remove duplicate (grouping key, DQA argument) values
* from an input partitioned on the grouping key; pre-aggregate the
* pre-aggregated results of preliminary plain aggregate functions.
*
* - final -- apply ordinary aggregation to DQA arguments (now distinct
* within their group) and final aggregation to the pre-aggregated results
* of the previous phase.
*
* In addition, if there is more than one DQA in the query, the plan will
* join the results of the individual three-phase aggregations into the
* final result.
*
* The preliminary aggregation phase occurs prior to the collocating
* motion and is planned independently on the theory that any ordering
* will be disrupted by the motion. There are cases where the ordering
* could survive the motion, but they are unexploited for now.
*
* The intermediate and final aggregation phases...
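*
* As an informal example, a query like
*
* SELECT g, count(DISTINCT a), count(DISTINCT b) FROM t GROUP BY g;
*
* has two DQAs, so two coplans are built (one per DQA argument) by
* make_plan_for_one_dqa and then joined back together by join_dqa_coplan,
* on the grouping key (or as a cross product in the scalar, ungrouped
* case). Table and column names here are illustrative only.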
*/
static Plan *
make_three_stage_agg_plan(PlannerInfo *root, MppGroupContext *ctx)
{
List *current_pathkeys;
Plan *result_plan;
QualCost tlist_cost;
Path *path = ctx->best_path; /* no use for ctx->cheapest_path */
/* We assume that we are called only when
* - there are no grouping extensions (like ROLLUP),
* - the input is partitioned and needs no preparatory Motion,
* - the required transformation involves DQAs.
*/
Assert ( !is_grouping_extension(ctx->canonical_grpsets) );
Assert ( ctx->prep == MPP_GRP_PREP_NONE );
Assert ( ctx->type == MPP_GRP_TYPE_GROUPED_DQA_2STAGE
|| ctx->type == MPP_GRP_TYPE_PLAIN_DQA_2STAGE );
/* Create the base plan which will serve as the outer plan (argument)
* of the partial Agg node(s).
*/
if (ctx->subplan == NULL)
{
result_plan = create_plan(root, path);
current_pathkeys = path->pathkeys;
/* Instead of the flat target list produced above, use the sub_tlist
* constructed in cdb_grouping_planner. Add a Result node if the
* base plan can't project. (This may be unnecessary, but, if so,
* the Result node will be removed later.)
*/
result_plan = plan_pushdown_tlist(result_plan, ctx->sub_tlist);
Assert(result_plan->flow);
/* Account for the cost of evaluation of the sub_tlist. */
cost_qual_eval(&tlist_cost, ctx->sub_tlist, root);
result_plan->startup_cost += tlist_cost.startup;
result_plan->total_cost +=
tlist_cost.startup +
tlist_cost.per_tuple * result_plan->plan_rows;
}
else
{
result_plan = ctx->subplan;
current_pathkeys = ctx->current_pathkeys;
}
/* Use caller specified join_strategy: None, Cross, Hash, or Merge. */
prepare_dqa_pruning_tlists(ctx);
if ( list_length(ctx->agg_counts->dqaArgs) == 1 )
{
/* Note: single-DQA plans don't require a join and are handled
* specially by make_plan_for_one_dqa so we can return the result
* directly.
*/
Query *query;
result_plan = make_plan_for_one_dqa(root, ctx, 0,
result_plan, &query);
memcpy(root->parse, query, sizeof(Query));
pfree(query);
}
else
{
/* Multi-DQA plans are trickier because of the need to consider input
* sharing and the need to join the coplans back together.
*/
List *share_partners;
int i;
List *rtable = NIL;
if ( ctx->use_sharing )
{
share_partners = share_plan(root, result_plan, ctx->numDistinctCols);
}
else
{
share_partners = NIL;
share_partners = lappend(share_partners, result_plan);
for ( i = 1; i < ctx->numDistinctCols; i++ )
{
share_partners = lappend(share_partners, copyObject(result_plan));
}
}
/* Construct a coplan for each distinct DQA argument. */
for ( i = 0; i < ctx->numDistinctCols; i++ )
{
char buffer[50];
int j;
ListCell *l;
Alias *eref;
Plan *coplan;
Query *coquery;
coplan = (Plan*)list_nth(share_partners,i);
coplan = make_plan_for_one_dqa(root, ctx, i,
coplan, &coquery);
eref = makeNode(Alias);
sprintf(buffer, "dqa_coplan_%d", i+1);
eref->aliasname = pstrdup(buffer);
eref->colnames = NIL;
j = 1;
foreach (l, coplan->targetlist)
{
TargetEntry *tle = (TargetEntry*)lfirst(l);
Value *colname = get_tle_name(tle, coquery->rtable, buffer);
eref->colnames = lappend(eref->colnames, colname);
j++;
}
rtable = lappend(rtable,
package_plan_as_rte(coquery, coplan, eref, NIL));
ctx->dqaArgs[i].coplan = add_subqueryscan(root, NULL, i+1, coquery, coplan);
}
/* Begin with the first coplan, then join in each succeeding coplan. */
result_plan = ctx->dqaArgs[0].coplan;
for ( i = 1; i < ctx->numDistinctCols; i++ )
{
result_plan = join_dqa_coplan(root, ctx, result_plan, i);
}
/* Finalize the last join plan so it has the correct target list
* and having qual.
*/
ctx->top_tlist = result_plan->targetlist;
result_plan->targetlist = (List*) finalize_split_expr((Node*) ctx->fin_tlist, ctx);
result_plan->qual = (List*) finalize_split_expr((Node*) ctx->fin_hqual, ctx);
/*
* Reconstruct the flow since the targetlist for the result_plan may have
* changed.
*/
result_plan->flow = pull_up_Flow(result_plan,
result_plan->lefttree,
true);
/* Need to adjust root. Is this enough? I think so. */
root->parse->rtable = rtable;
root->parse->targetList = copyObject(result_plan->targetlist);
}
/* Rebuild arrays for RelOptInfo and RangeTblEntry for the PlannerInfo
* since the underlying range tables have been transformed.
*/
rebuild_simple_rel_and_rte(root);
return result_plan;
}
/* Helper for qsort in planDqaJoinOrder. */
int compareDqas(const void *larg, const void *rarg)
{
double lft = ((DqaInfo*)larg)->num_rows;
double rgt = ((DqaInfo*)rarg)->num_rows;
return (lft < rgt)? -1 : (lft == rgt)? 0 : 1;
}
/* Collect per distinct DQA argument information for use in single- and
* multiple-DQA planning and cache it in the context as a new array of
* DqaInfo structures anchored at ctx->dqaArgs. The order of elements
* in the array determines join order for a multi-DQA plan.
*
* Note: The original list of distinct DQA arguments was collected by
* count_agg_clauses() earlier in planning. Later, make_subplan_tlist
* used it to guarantee that the DQA arguments have target entries with
* non-zero sortgroupref values and to generate vector ctx->distinctColIdx
* to locate those entries. Here, however, we use that vector to locate
* the DQA arguments and reorder the vector to agree with join order.
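*
* The join order itself comes from sorting the DqaInfo array with
* compareDqas, i.e. by increasing estimated row count of the partial
* (grouping key, DQA argument) result, so the smallest partial result
* is joined first.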
*/
void planDqaJoinOrder(PlannerInfo *root, MppGroupContext *ctx,
double input_rows)
{
int i;
DqaInfo *args;
Node *distinctExpr;
Assert( ctx->numDistinctCols == list_length(ctx->agg_counts->dqaArgs) );
/* Collect row count estimates for the partial results. */
if ( ctx->numDistinctCols == 0 )
{
ctx->dqaArgs = NULL;
return;
}
args = (DqaInfo*)palloc( ctx->numDistinctCols * sizeof(DqaInfo));
for ( i = 0; i < ctx->numDistinctCols; i++)
{
TargetEntry *dtle;
List *x;
int j;
/* Like PG and the SQL standard, we assume that a DQA may take only
* a single argument -- no REGR_SXY(DISTINCT X,Y). This is what allows
* distinctExpr to be an expression rather than a list of expressions.
*/
dtle = get_tle_by_resno(ctx->sub_tlist, ctx->distinctColIdx[i]);
distinctExpr = (Node*) dtle->expr;
x = NIL;
for ( j = 0; j < ctx->numGroupCols ; j++ )
{
TargetEntry *tle;
tle = get_tle_by_resno(ctx->sub_tlist,ctx->groupColIdx[j]);
x = lappend(x, tle->expr);
}
x = lappend(x, distinctExpr);
args[i].distinctExpr = distinctExpr; /* no copy */
args[i].base_index = dtle->resno;
args[i].num_rows = estimate_num_groups(root, x, input_rows);
args[i].can_hash = hash_safe_type(exprType(distinctExpr));
list_free(x);
}
qsort(args, ctx->numDistinctCols, sizeof(DqaInfo), compareDqas);
/* Reorder ctx->distinctColIdx to agree with join order. */
for ( i = 0; i < ctx->numDistinctCols; i++ )
{
ctx->distinctColIdx[i] = args[i].base_index;
}
ctx->dqaArgs = args;
}
/* Function make_plan_for_one_dqa
*
* Subroutine of make_three_stage_agg_plan: constructs a coplan for
* the specified DQA index [0..numDistinctCols-1] which selects a DqaInfo
* entry from the context.
*
* In multi-DQA plans, coplans have minimal targetlists (just grouping
* keys, DQA arguments, and results of single aggregate functions). In
* case this is a single-DQA (join-less) plan, the coplan target list is
* "finalized" to produce the result requested by the user (which may
* include expressions over the minimal list in the targetlist and/or
* having qual).
*
* A Query (including range table) which approximates a query for the
* returned plan is stored back into *coquery_p, if coquery_p is not NULL.
*/
static Plan *
make_plan_for_one_dqa(PlannerInfo *root, MppGroupContext *ctx, int dqa_index,
Plan* result_plan, Query **coquery_p)
{
DqaCoplanType coplan_type;
List *prelim_tlist = NIL;
List *inter_tlist = NIL;
List *final_tlist = NIL;
List *final_qual = NIL;
List *groupExprs = NIL;
List *current_pathkeys;
AggStrategy aggstrategy;
AttrNumber *prelimGroupColIdx;
AttrNumber *inputGroupColIdx;
List *extendedGroupClause;
Query *original_parse;
bool groups_sorted = false;
long numGroups;
int i, n;
DqaInfo *dqaArg = &ctx->dqaArgs[dqa_index];
bool sort_coplans = ( ctx->join_strategy == DqaJoinMerge );
bool groupkeys_collocate = cdbpathlocus_collocates(ctx->input_locus, root->group_pathkeys, false /*exact_match*/);
bool need_inter_agg = false;
bool dqaduphazard = false;
bool stream_bottom_agg = root->config->gp_hashagg_streambottom; /* Take hint */
/* Planning will perturb root->parse, so we copy it's content aside
* so we can restore it later. We flat copy instead of resetting
* because code in the stack may have a local variable set to the
* value of root->parse.
*/
original_parse = makeNode(Query);
memcpy(original_parse, root->parse, sizeof(Query));
/* Our caller, make_three_stage_agg_plan, pushed ctx->sub_tlist onto
* result_plan. This contains all the keys and arguments for the
* whole query. While it would be possible to generate a smaller
* targetlist to use for this single DQA it is probably not worth
* the complexity. Just use sub_tlist as given.
*
* The DQA argument of interest is attribute dqaArg->base_index.
*
* Get the target lists for the preliminary, intermediate and final
* aggregations and the qual (HAVING clause) for the final aggregation
* based on the target list of the base plan. Grouping attributes go on
* front of preliminary and intermediate target lists.
*/
generate_dqa_pruning_tlists(ctx,
dqa_index,
&prelim_tlist,
&inter_tlist,
&final_tlist,
&final_qual);
/*
* For the first aggregation phases the original grouping attributes
* (maybe zero of them) must be extended to include the DQA argument
* attribute (exactly one of them) to be pruned.
*
* The grouping attributes and a single DQA argument are on the front and
* in order on the preliminary and intermediate targetlists so we need a
* new vector of grouping attributes, prelimGroupColIdx = (1, 2, 3, ...),
* for use in these aggregations. The vector inputGroupColIdx plays a
* similar role for sub_tlist.
*
* The initial-phase group clause, extendedGroupClause, is the one in
* the query (assumed to have no grouping extensions) augmented by a
* GroupClause node for the DQA argument. This is where the sort
* operator for the DQA argument is selected.
*/
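/*
 * Schematically (example values only): with numGroupCols == 2,
 * groupColIdx = {3, 5}, and the DQA argument at sub_tlist resno 7
 * (dqaArg->base_index == 7), the block below produces
 * inputGroupColIdx = {3, 5, 7} and prelimGroupColIdx = {1, 2, 3}.
 */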
{
GroupClause* gc;
TargetEntry *tle;
prelimGroupColIdx = inputGroupColIdx = NULL;
n = ctx->numGroupCols + 1; /* add the DQA argument as a grouping key */
Assert( n > 0 );
prelimGroupColIdx = (AttrNumber*)palloc(n * sizeof(AttrNumber));
inputGroupColIdx = (AttrNumber*)palloc(n * sizeof(AttrNumber));
for ( i = 0; i < n; i++ )
prelimGroupColIdx[i] = i+1;
for ( i = 0; i < ctx->numGroupCols; i++ )
inputGroupColIdx[i] = ctx->groupColIdx[i];
inputGroupColIdx[ctx->numGroupCols] = dqaArg->base_index;
gc = makeNode(GroupClause);
tle = get_tle_by_resno(ctx->sub_tlist, dqaArg->base_index);
gc->tleSortGroupRef = tle->ressortgroupref;
gc->sortop = ordering_oper_opid(exprType((Node*)dqaArg->distinctExpr));
extendedGroupClause = list_copy(root->parse->groupClause);
extendedGroupClause = lappend(extendedGroupClause,gc);
}
/*
* Determine the first-phase aggregation strategy to use. Prefer hashing
* to sorting because the benefit of the sort will be lost by the Motion
* to follow.
*/
if ( dqaArg->use_hashed_preliminary )
{
aggstrategy = AGG_HASHED;
current_pathkeys = NIL;
}
else
{
/* Here we need to sort! The input pathkeys won't contain the
* DQA argument, so just do it.
*/
result_plan = (Plan *)
make_sort_from_groupcols(root,
extendedGroupClause,
inputGroupColIdx,
false,
result_plan);
current_pathkeys = root->group_pathkeys;
mark_sort_locus(result_plan);
aggstrategy = AGG_SORTED;
/* The AGG node will not change the sort ordering of its
* groups, so current_pathkeys describes the result too.
*/
}
/*
* Preliminary Aggregation: With the pre-existing distribution, group
* by the combined grouping key and DQA argument. In the case of the
* first coplan, this phase also pre-aggregates any non-DQAs. This
* eliminates duplicate values of the DQA argument on each QE.
*/
numGroups = (dqaArg->num_rows < 0) ? 0 :
(dqaArg->num_rows > LONG_MAX) ? LONG_MAX :
(long)dqaArg->num_rows;
/*
* If the data is distributed on the distinct-qualified aggregate's key
* and there is no grouping key, then we prefer not to stream the bottom agg.
*/
if (dqaArg->distinctkey_collocate && ctx->numGroupCols == 0)
{
stream_bottom_agg = false;
}
result_plan = (Plan *) make_agg(root,
prelim_tlist,
NIL, /* no havingQual */
aggstrategy, stream_bottom_agg,
ctx->numGroupCols + 1,
inputGroupColIdx,
numGroups,
0, /* num_nullcols */
0, /* input_grouping */
0, /* grouping */
0, /* rollup_gs_times */
ctx->agg_counts->numAggs - ctx->agg_counts->numDistinctAggs + 1,
ctx->agg_counts->transitionSpace, /* worst case */
result_plan);
dqaduphazard = (aggstrategy == AGG_HASHED && stream_bottom_agg);
result_plan->flow = pull_up_Flow(result_plan, result_plan->lefttree, (aggstrategy == AGG_SORTED));
current_pathkeys = NIL;
/*
* Intermediate Motion: Gather or Hash on Groups to get colocation
* on the grouping key. Note that this may bring duplicate values
* of the DQA argument together on the QEs.
*/
switch ( ctx->type )
{
case MPP_GRP_TYPE_GROUPED_DQA_2STAGE:
if (!groupkeys_collocate)
{
groupExprs = NIL;
Assert(ctx->numGroupCols > 0);
for ( i = 0; i < ctx->numGroupCols; i++)
{
TargetEntry *tle;
tle = get_tle_by_resno(prelim_tlist, prelimGroupColIdx[i]);
groupExprs = lappend(groupExprs, copyObject(tle->expr));
}
result_plan = (Plan*)make_motion_hash(root, result_plan, groupExprs);
result_plan->total_cost +=
incremental_motion_cost(result_plan->plan_rows,
result_plan->plan_rows);
}
break;
case MPP_GRP_TYPE_PLAIN_DQA_2STAGE:
/* Assert that this is only called for a plain DQA like select count(distinct x) from foo */
Assert(ctx->numGroupCols == 0); /* No group-by */
Assert(n == 1);
/* If already collocated on DQA arg, don't redistribute */
if (!dqaArg->distinctkey_collocate)
{
TargetEntry *tle = get_tle_by_resno(ctx->sub_tlist, dqaArg->base_index);
Assert(tle);
groupExprs = lappend(NIL, copyObject(tle->expr));
result_plan = (Plan*)make_motion_hash(root, result_plan, groupExprs);
result_plan->total_cost +=
incremental_motion_cost(result_plan->plan_rows,
result_plan->plan_rows);
}
break;
case MPP_GRP_TYPE_NONE:
case MPP_GRP_TYPE_BASEPLAN:
case MPP_GRP_TYPE_GROUPED_2STAGE:
case MPP_GRP_TYPE_PLAIN_2STAGE:
ereport(ERROR,
(errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("unexpected use of DQA pruned 2-phase aggregation")));
break; /* Never */
}
current_pathkeys = NIL;
groups_sorted = false;
if ( sort_coplans )
{
coplan_type = dqaArg->coplan_type_sorted;
}
else
{
coplan_type = dqaArg->coplan_type_cheapest;
}
if ( dqaduphazard ||
(!dqaArg->distinctkey_collocate && !groupkeys_collocate) )
{
/* Intermediate Aggregation: Grouping key values are colocated so group
* by the combined grouping key and DQA argument while intermediate-
* aggregating any non-DQAs. This once again (and finally) eliminates
* duplicate values of the DQA argument on each QE.
*/
need_inter_agg = true;
switch (coplan_type)
{
case DQACOPLAN_GGS:
case DQACOPLAN_PGS:
aggstrategy = AGG_SORTED;
/* pre-sort required on combined grouping key and DQA argument */
result_plan = (Plan *)
make_sort_from_groupcols(root,
extendedGroupClause,
prelimGroupColIdx,
false,
result_plan);
groups_sorted = true;
current_pathkeys = root->group_pathkeys;
mark_sort_locus(result_plan);
break;
case DQACOPLAN_GSH:
case DQACOPLAN_SHH:
case DQACOPLAN_HH:
case DQACOPLAN_PH:
aggstrategy = AGG_HASHED;
groups_sorted = false;
break;
}
result_plan = add_second_stage_agg(root,
true,
prelim_tlist,
inter_tlist,
NULL,
aggstrategy,
ctx->numGroupCols + 1,
prelimGroupColIdx,
0, /* num_nullcols */
0, /* input_grouping */
0, /* grouping */
0, /* rollup_gs_times */
dqaArg->num_rows,
ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace,
"partial_aggregation",
&current_pathkeys,
result_plan,
true, false);
}
/* Final Aggregation: Group by the grouping key, aggregate the now
* distinct values of the DQA argument using non-distinct-qualified
* aggregation, final aggregate the intermediate values of any non-DQAs.
*/
switch (coplan_type)
{
case DQACOPLAN_GSH:
/* pre-sort required on grouping key */
result_plan = (Plan *)
make_sort_from_groupcols(root,
root->parse->groupClause,
prelimGroupColIdx,
false,
result_plan);
groups_sorted = true;
current_pathkeys = root->group_pathkeys;
mark_sort_locus(result_plan);
/* Fall through. */
case DQACOPLAN_GGS:
aggstrategy = AGG_SORTED;
break;
case DQACOPLAN_SHH:
case DQACOPLAN_HH:
aggstrategy = AGG_HASHED;
groups_sorted = false;
break;
case DQACOPLAN_PGS:
case DQACOPLAN_PH:
/* plainagg */
aggstrategy = AGG_PLAIN;
groups_sorted = false;
break;
}
/**
* In the case where there is no grouping key, we need to gather up all the rows in a single segment to compute the final aggregate.
*/
if ( ctx->type == MPP_GRP_TYPE_PLAIN_DQA_2STAGE)
{
/* Assert that this is only called for a plain DQA like select count(distinct x) from foo */
Assert(ctx->numGroupCols == 0); /* No grouping columns */
Assert(n == 1);
result_plan = (Plan*)make_motion_gather_to_QE(result_plan, false);
result_plan->total_cost +=
incremental_motion_cost(result_plan->plan_rows,
result_plan->plan_rows * root->config->cdbpath_segments);
}
result_plan = add_second_stage_agg(root,
true,
need_inter_agg ? inter_tlist : prelim_tlist,
final_tlist,
final_qual,
aggstrategy,
ctx->numGroupCols,
prelimGroupColIdx,
0, /* num_nullcols */
0, /* input_grouping */
ctx->grouping,
0, /* rollup_gs_times */
*ctx->p_dNumGroups,
ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace,
"partial_aggregation",
&current_pathkeys,
result_plan,
true,
false);
/* Final sort */
switch (coplan_type)
{
case DQACOPLAN_SHH:
/* post-sort required */
result_plan = (Plan *)
make_sort_from_groupcols(root,
root->parse->groupClause,
prelimGroupColIdx,
false,
result_plan);
groups_sorted = true;
current_pathkeys = root->group_pathkeys;
mark_sort_locus(result_plan);
break;
case DQACOPLAN_GGS:
case DQACOPLAN_GSH:
case DQACOPLAN_HH:
case DQACOPLAN_PGS:
case DQACOPLAN_PH:
break;
}
/* Marshal implicit results. Return explicit result. */
if ( groups_sorted )
{
/* The following settings work correctly though they seem wrong.
* Though we changed the query tree, we say that we did not so that
* planner.c will notice the useful sort order we have produced.
* We also reset the current pathkeys to the original group keys.
* (Though our target list may differ, its attribute-wise ordering
* is on the group keys.)
*/
ctx->current_pathkeys = root->group_pathkeys; /* current_pathkeys are wrong! */
ctx->querynode_changed = false;
}
else
{
ctx->current_pathkeys = NIL;
ctx->querynode_changed = true;
}
/* If requested, copy our modified Query (at root->parse) for caller. */
if ( coquery_p != NULL )
{
*coquery_p = makeNode(Query);
memcpy(*coquery_p, root->parse, sizeof(Query));
}
/* Restore the original referent of root->parse. */
memcpy(root->parse, original_parse, sizeof(Query));
pfree(original_parse);
return result_plan;
}
static Plan *
join_dqa_coplan(PlannerInfo *root, MppGroupContext *ctx, Plan *outer, int dqa_index)
{
Plan *join_plan = NULL;
Plan *inner = ctx->dqaArgs[dqa_index].coplan;
List *join_tlist = NIL;
List *tlist = NIL;
Index outer_varno = 1;
Index inner_varno = dqa_index + 1;
Index varno = 1;
int i, ng, nd;
/* Make the target list for this join. The outer and inner target lists
* will look like
* (<g'> <D0'> ... <Dn-1'> <F'>) and (<g'> <Dn>)
* or
* (<g'> <D0> <F>) and (<g'> <Dn>)
* The join target list should look like
* (<g'> <D0'> ... <Dn'> <F'>)
*/
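/*
 * For instance, with GROUP BY g and the DQAs COUNT(DISTINCT x) and
 * COUNT(DISTINCT y), joining the second coplan (dqa_index == 1) onto the
 * first yields, roughly, the join target list
 *     (g, count(distinct x), count(distinct y), <regular agg results>)
 * built from Vars over the outer (varno 1) and inner (varno dqa_index + 1)
 * inputs.
 */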
/* Use varno 1 for grouping key. */
join_tlist = make_vars_tlist(ctx->grps_tlist, varno, 0);
ng = list_length(join_tlist); /* (<g'>) */
nd = ng + list_length(ctx->dref_tlists[0]);/* (<g'> <D0'>) */
for ( i = 0; i <= dqa_index; i++ )
{
tlist = make_vars_tlist(ctx->dref_tlists[i], varno+i, ng);
join_tlist = seq_tlist_concat(join_tlist, tlist); /* (... <Di'>) */
}
tlist = make_vars_tlist(ctx->frefs_tlist, varno, nd);
join_tlist = seq_tlist_concat(join_tlist, tlist); /* (... <F'>) */
/* Make the join which will be either a cartesian product (in case of
* scalar aggregation) or a merge or hash join (in case of grouped
* aggregation).
*/
if ( ctx->numGroupCols > 0 ) /* MergeJoin: 1x1 */
{
List *joinclause = NIL;
List *hashclause = NIL;
AttrNumber attrno;
Insist( ctx->join_strategy == DqaJoinMerge || ctx->join_strategy == DqaJoinHash );
/* Make the join clause -- a conjunction of IS NOT DISTINCT FROM
* predicates on the attributes of the grouping key.
*/
for ( attrno = 1; attrno <= ctx->numGroupCols; attrno++ )
{
Expr *qual;
Var *outer_var;
Var *inner_var;
TargetEntry *tle = get_tle_by_resno(outer->targetlist, attrno);
Assert( tle && IsA(tle->expr, Var) );
outer_var = (Var*)copyObject(tle->expr);
outer_var->varno = outer_varno;
outer_var->varnoold = outer_varno;
inner_var = (Var*)copyObject(tle->expr);
inner_var->varno = inner_varno;
inner_var->varnoold = inner_varno;
/* outer should always be on the left */
qual = make_op(NULL, list_make1(makeString("=")),
(Node*) outer_var,
(Node*) inner_var, -1);
if ( ctx->join_strategy == DqaJoinHash )
{
hashclause = lappend(hashclause, copyObject(qual));
}
qual->type = T_DistinctExpr;
qual = make_notclause(qual);
joinclause = lappend(joinclause, qual);
}
if ( ctx->join_strategy == DqaJoinHash )
{
/* Make the hash join. */
bool motion_added_outer = false;
bool motion_added_inner = false;
outer = add_motion_to_dqa_child(outer, root, &motion_added_outer);
inner = add_motion_to_dqa_child(inner, root, &motion_added_inner);
bool prefetch_inner = motion_added_outer || motion_added_inner;
if (motion_added_outer || motion_added_inner)
{
ctx->current_pathkeys = NULL;
}
Hash *hash_plan = make_hash(inner);
join_plan = (Plan*)make_hashjoin(join_tlist,
NIL, /* joinclauses */
NIL, /* otherclauses */
hashclause, /* hashclauses */
joinclause, /* hashqualclauses */
outer, (Plan*)hash_plan,
JOIN_INNER);
((Join *) join_plan)->prefetch_inner = prefetch_inner;
}
else
{
/* Make the merge join noting that the outer plan produces rows
* distinct in the join key. (So does the inner, for that matter,
* but the MJ algorithm is only sensitive to the outer.)
*/
join_plan = (Plan*)make_mergejoin(join_tlist,
NIL, NIL,
joinclause,
outer, inner,
JOIN_INNER);
((MergeJoin*)join_plan)->unique_outer = true;
}
}
else /* NestLoop: Cartesian product: 1x1 */
{
Insist(ctx->join_strategy == DqaJoinCross);
join_plan = (Plan*)make_nestloop(join_tlist,
NIL, NIL,
outer, inner,
JOIN_INNER);
((NestLoop*)join_plan)->singleton_outer = true;
}
join_plan->startup_cost = outer->startup_cost + inner->startup_cost;
join_plan->plan_rows = outer->plan_rows;
join_plan->plan_width = outer->plan_width + inner->plan_width; /* too high for MJ */
join_plan->total_cost = outer->total_cost + inner->total_cost;
join_plan->total_cost += cpu_tuple_cost * join_plan->plan_rows;
join_plan->flow = pull_up_Flow(join_plan, join_plan->lefttree, true);
return join_plan;
}
/*
* Function make_subplan_tlist (for multi-phase aggregation)
*
* The input to the "lower" Agg node will use a "flattened" tlist (having
* just the (levelsup==0) vars mentioned in the targetlist and HAVING qual).
* This is similar to the target list produced by make_subplanTargetList
* and used for 1-stage aggregation in grouping_planner.
*
* The result target list contains entries for all the simple Var attributes
* of the original SELECT and HAVING clauses, plus entries for any GROUP BY
* expressions and DQA arguments that are not simple Vars.
*
* The implicit results are
*
* - the number of grouping attributes and a vector of their positions
* (which are equal to their resno's) in the target list delivered through
* pointers pnum_gkeys and pcols_gkeys, and
*
* - the number of distinct arguments to DISTINCT-qualified aggregate
* functions and a vector of their positions (which are equal to their
* resno's) in the target list delivered through pointers pnum_dqas and
* pcols_dqas. These arguments are guaranteed (by the call to function
* augment_subplan_tlist) to appear as attributes of the subplan target
* list.
*
* There are no similar results for sort and distinct attributes since
* they don't necessarily appear in the subplan target list.
*/
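/*
 * As a hypothetical example, for
 *     SELECT a + b, SUM(c) FROM t GROUP BY a + b HAVING MAX(d) > 0
 * the flattened result holds the simple Vars a, b, c, and d plus an added
 * entry for the non-Var grouping expression a + b; *pnum_gkeys is 1 and
 * *pcols_gkeys holds the resno of that a + b entry.
 */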
List *make_subplan_tlist(List *tlist, Node *havingQual,
List *grp_clauses,
int *pnum_gkeys, AttrNumber **pcols_gkeys,
List *dqa_args,
int *pnum_dqas, AttrNumber **pcols_dqas)
{
List *sub_tlist;
List *extravars;
int num_gkeys;
AttrNumber *cols_gkeys;
Assert( dqa_args != NIL? pnum_dqas != NULL && pcols_dqas != NULL: true );
sub_tlist = flatten_tlist(tlist);
extravars = pull_var_clause(havingQual, false);
sub_tlist = add_to_flat_tlist(sub_tlist, extravars, false /* resjunk */);
list_free(extravars);
num_gkeys = num_distcols_in_grouplist(grp_clauses);
if (num_gkeys > 0)
{
int keyno = 0;
ListCell *l;
List *tles;
cols_gkeys = (AttrNumber*) palloc(sizeof(AttrNumber) * num_gkeys);
tles = get_sortgroupclauses_tles(grp_clauses, tlist);
foreach (l, tles)
{
Node *expr;
TargetEntry *tle, *sub_tle = NULL;
ListCell *sl;
tle = (TargetEntry*) lfirst(l);
expr = (Node*)tle->expr;
/* Find or make a matching sub_tlist entry. */
foreach(sl, sub_tlist)
{
sub_tle = (TargetEntry *) lfirst(sl);
if (equal(expr, sub_tle->expr) && (sub_tle->ressortgroupref == 0))
break;
}
if (!sl)
{
sub_tle = makeTargetEntry((Expr*) expr,
list_length(sub_tlist) + 1,
NULL,
false);
sub_tlist = lappend(sub_tlist, sub_tle);
}
/* Set its group reference and save its resno */
sub_tle->ressortgroupref = tle->ressortgroupref;
cols_gkeys[keyno++] = sub_tle->resno;
}
*pnum_gkeys = num_gkeys;
*pcols_gkeys = cols_gkeys;
}
else
{
*pnum_gkeys = 0;
*pcols_gkeys = NULL;
}
if ( dqa_args != NIL )
sub_tlist = augment_subplan_tlist(sub_tlist, dqa_args, pnum_dqas, pcols_dqas, true);
return sub_tlist; /* Possibly modified by appending expression entries. */
}
/* Function augment_subplan_tlist
*
* Called from make_subplan_tlist, not directly.
*
* Make a target list like the input that includes sortgroupref'd entries
* for the expressions in exprs. Note that the entries in the input expression
* list must be distinct.
*
* New entries corresponding to the expressions in the input exprs list
* (if any) are added to the argument list. Existing entries are modified
* (if necessary) in place.
*
* Return the (modified) input targetlist.
*
* Implicitly return an array of resno values for exprs in (pnum, *pcols), if
* return_resno is true.
*/
List *augment_subplan_tlist(List *tlist, List *exprs, int *pnum, AttrNumber **pcols,
bool return_resno)
{
int num;
AttrNumber *cols = NULL;
num = list_length(exprs); /* Known to be distinct. */
if (num > 0)
{
int keyno = 0;
ListCell *lx, *lt;
TargetEntry *tle, *matched_tle;
Index max_sortgroupref = 0;
foreach (lt, tlist)
{
tle = (TargetEntry*)lfirst(lt);
if ( tle->ressortgroupref > max_sortgroupref )
max_sortgroupref = tle->ressortgroupref;
}
if (return_resno)
cols = (AttrNumber*) palloc(sizeof(AttrNumber) * num);
foreach (lx, exprs)
{
Node *expr = (Node*)lfirst(lx);
matched_tle = NULL;
foreach (lt, tlist)
{
tle = (TargetEntry*)lfirst(lt);
if ( equal(expr, tle->expr) )
{
matched_tle = tle;
break;
}
}
if ( matched_tle == NULL )
{
matched_tle = makeTargetEntry((Expr*) expr,
list_length(tlist) + 1,
NULL,
false);
tlist = lappend(tlist, matched_tle);
}
if ( matched_tle->ressortgroupref == 0 )
matched_tle->ressortgroupref = ++max_sortgroupref;
if (return_resno)
cols[keyno++] = matched_tle->resno;
}
if (return_resno)
{
*pnum = num;
*pcols = cols;
}
}
else
{
if (return_resno)
{
*pnum = 0;
*pcols = NULL;
}
}
/* Note that result is a copy, possibly modified by appending expression
* targetlist entries and/or updating sortgroupref values.
*/
return tlist;
}
/*
* Function describe_subplan_tlist (for single-phase aggregation)
*
* Implicitly return extra information about a supplied target list with having
* qual and corresponding subplan target list for single-phase aggregation.
* This does, essentially, what make_subplan_tlist does, but for a precalculated
* subplan target list. In particular
*
* - it constructs the grouping key -> subplan target list resno map.
* - it may extend the subplan targetlist for DQAs and record entries
* in the DQA argument -> subplan target list resno map.
*
* In the latter case, the subplan target list may be extended, so return it.
* This function is for the case when a subplan target list (not a whole plan)
* is supplied to cdb_grouping_planner.
*/
List *describe_subplan_tlist(List *sub_tlist,
List *tlist, Node *havingQual,
List *grp_clauses, int *pnum_gkeys, AttrNumber **pcols_gkeys,
List *dqa_args, int *pnum_dqas, AttrNumber **pcols_dqas)
{
int nkeys;
AttrNumber *cols;
nkeys = num_distcols_in_grouplist(grp_clauses);
if ( nkeys > 0 )
{
List *tles;
ListCell *lc;
int keyno = 0;
cols = (AttrNumber*)palloc0(sizeof(AttrNumber)*nkeys);
tles = get_sortgroupclauses_tles(grp_clauses, tlist);
foreach (lc, tles)
{
TargetEntry *tle;
TargetEntry *sub_tle;
tle = (TargetEntry*)lfirst(lc);
sub_tle = tlist_member((Node*)tle->expr, sub_tlist);
Assert(tle->ressortgroupref != 0);
Assert(tle->ressortgroupref == sub_tle->ressortgroupref);
Assert(keyno < nkeys);
cols[keyno++] = sub_tle->resno;
}
*pnum_gkeys = nkeys;
*pcols_gkeys = cols;
}
else
{
*pnum_gkeys = 0;
*pcols_gkeys = NULL;
}
if ( dqa_args != NIL )
sub_tlist = augment_subplan_tlist(sub_tlist, dqa_args, pnum_dqas, pcols_dqas, true);
return sub_tlist;
}
/*
* Generate targetlist for a SubqueryScan node to wrap the stage-one
* Agg node (partial aggregation) of a 2-Stage aggregation sequence.
*
* varno: varno to use in generated Vars
* input_tlist: targetlist of this node's input node
*
* Result is a "flat" (all simple Var node) targetlist in which
* varattno and resno match and are sequential.
*
* This function also returns a map from the original targetlist
* entries to the new targetlist entries, using resno values. The index
* positions of resno_map represent the original resnos, while the
* array elements hold the new resnos. The array is palloc'd here,
* with length list_length(input_tlist).
*/
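/*
 * For example, a three-entry input_tlist yields three Vars (varno, 1),
 * (varno, 2), (varno, 3) and resno_map = {1, 2, 3}; since input resnos are
 * asserted to be sequential, the map is effectively an identity mapping.
 */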
List *
generate_subquery_tlist(Index varno, List *input_tlist,
bool keep_resjunk, int **p_resno_map)
{
List *tlist = NIL;
int resno = 1;
ListCell *j;
TargetEntry *tle;
Node *expr;
*p_resno_map = (int *)palloc0(list_length(input_tlist) * sizeof(int));
foreach(j, input_tlist)
{
TargetEntry *inputtle = (TargetEntry *) lfirst(j);
Assert(inputtle->resno == resno && inputtle->resno >= 1);
/* Don't pull up constants, always use a Var to reference the input. */
expr = (Node *) makeVar(varno,
inputtle->resno,
exprType((Node *) inputtle->expr),
exprTypmod((Node *) inputtle->expr),
0);
(*p_resno_map)[inputtle->resno - 1] = resno;
tle = makeTargetEntry((Expr *) expr,
(AttrNumber) resno++,
(inputtle->resname == NULL) ?
NULL :
pstrdup(inputtle->resname),
keep_resjunk ? inputtle->resjunk : false);
tle->ressortgroupref = inputtle->ressortgroupref;
tlist = lappend(tlist, tle);
}
return tlist;
}
/*
* Function: cdbpathlocus_collocates
*
* Is a relation with the given locus guaranteed to collocate tuples with
* non-distinct values of the key? The key is a list of pathkeys (each of
* which is a list of PathKeyItem*).
*
* Collocation is guaranteed if the locus specifies a single process or
* if the result is partitioned on a subset of the keys that must be
* collocated.
*
* We ignore other sorts of collocation, e.g., replication or partitioning
* on a range since these cannot occur at the moment (MPP 2.3).
*/
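/*
 * For example, a locus hashed on (a) collocates rows for pathkeys over
 * (a, b), since all rows of a group land on one segment, whereas a locus
 * hashed on (a, b) does not guarantee collocation for pathkeys over (a)
 * alone.  A bottleneck locus (Entry or SingleQE) always collocates.
 */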
bool cdbpathlocus_collocates(CdbPathLocus locus, List *pathkeys, bool exact_match)
{
ListCell *lc;
List *exprs = NIL;
if ( CdbPathLocus_IsBottleneck(locus) )
return true;
if ( !CdbPathLocus_IsHashed(locus) )
return false; /* Or would HashedOJ be OK, too? */
if (exact_match && list_length(pathkeys) != list_length(locus.partkey))
{
return false;
}
/* Extract a list of expressions from the pathkeys. Since the locus
* presumably knows all about attribute equivalence classes, we use
* only the first item in each input path key.
*/
foreach( lc, pathkeys )
{
PathKeyItem *item;
List *lst = (List*)lfirst(lc);
Assert( list_length(lst) > 0 );
item = (PathKeyItem*)linitial(lst);
exprs = lappend(exprs, item->key);
}
/* Check for containment of locus in exprs. */
return cdbpathlocus_is_hashed_on_exprs(locus, exprs);
}
/*
* Function: cdbpathlocus_from_flow
*
* Generate a locus from a flow. Since the information needed to produce
* canonical path keys is unavailable, this function will never return a
* hashed locus.
*/
CdbPathLocus cdbpathlocus_from_flow(Flow *flow)
{
CdbPathLocus locus;
CdbPathLocus_MakeNull(&locus);
if (!flow)
return locus;
switch (flow->flotype)
{
case FLOW_SINGLETON:
if (flow->segindex == -1)
CdbPathLocus_MakeEntry(&locus);
else
CdbPathLocus_MakeSingleQE(&locus);
break;
case FLOW_REPLICATED:
CdbPathLocus_MakeReplicated(&locus);
break;
case FLOW_PARTITIONED:
CdbPathLocus_MakeStrewn(&locus);
break;
case FLOW_UNDEFINED:
default:
Insist(0);
}
return locus;
}
/*
* Generate 3 target lists for a sequence of consecutive Agg nodes.
*
* This is intended for a sequence of consecutive Agg nodes used in
* a ROLLUP. '*p_tlist3' is for the upper Agg node, '*p_tlist2' is
* for any Agg node in the middle, and '*p_tlist1' is for the
* bottom Agg node.
*
* '*p_tlist1' and '*p_tlist2' have similar target lists. '*p_tlist3'
* is constructed based on tlist and outputs from *p_tlist2 or
* '*p_tlist1' if twostage is true.
*
* NB This function is called externally (from plangroupext.c) and not
* used in this file! Beware: the API is now legacy here!
*/
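/*
 * Roughly, for each Aggref in '*p_tlist1' the middle list gets an
 * AGGSTAGE_INTERMEDIATE Aggref over a Var (varno 1) referencing the
 * corresponding lower output, while non-Aggref entries become simple
 * pass-through Vars; e.g. a bottom-level sum(x) is continued as an
 * intermediate sum over its partial result in the middle Agg.
 */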
void generate_three_tlists(List *tlist,
bool twostage,
List *sub_tlist,
Node *havingQual,
int numGroupCols,
AttrNumber *groupColIdx,
List **p_tlist1,
List **p_tlist2,
List **p_tlist3,
List **p_final_qual)
{
ListCell *lc;
int resno = 1;
MppGroupContext ctx; /* Just for API matching! */
/* Similar to the final tlist entries in two-stage aggregation,
* we use consistent varno in the middle tlist entries.
*/
int middle_varno = 1;
/* Generate the top and bottom tlists by calling the multi-phase
* aggregation code in cdbgroup.c.
*/
ctx.tlist = tlist;
ctx.sub_tlist = sub_tlist;
ctx.havingQual = havingQual;
ctx.numGroupCols = numGroupCols;
ctx.groupColIdx = groupColIdx;
ctx.numDistinctCols = 0;
ctx.distinctColIdx = NULL;
generate_multi_stage_tlists(&ctx,
p_tlist1,
NULL,
p_tlist3,
p_final_qual);
/*
* Read target entries in '*p_tlist1' one by one, and construct
* the entries for '*p_tlist2'.
*/
foreach (lc, *p_tlist1)
{
TargetEntry *tle = (TargetEntry *)lfirst(lc);
Expr *new_expr;
TargetEntry *new_tle;
if (IsA(tle->expr, Aggref))
{
Aggref *aggref = (Aggref *)tle->expr;
Aggref *new_aggref = makeNode(Aggref);
new_aggref->aggfnoid = aggref->aggfnoid;
new_aggref->aggtype = aggref->aggtype;
new_aggref->args =
list_make1((Expr*)makeVar(middle_varno, tle->resno, aggref->aggtype, -1, 0));
new_aggref->agglevelsup = 0;
new_aggref->aggstar = false;
new_aggref->aggdistinct = false; /* handled in preliminary aggregation */
new_aggref->aggstage = AGGSTAGE_INTERMEDIATE;
new_expr = (Expr *)new_aggref;
}
else
{
/* Just make a new Var. */
new_expr = (Expr *)makeVar(middle_varno,
tle->resno,
exprType((Node *)tle->expr),
exprTypmod((Node *)tle->expr),
0);
}
new_tle = makeTargetEntry(new_expr, resno,
(tle->resname == NULL) ?
NULL :
pstrdup(tle->resname),
false);
new_tle->ressortgroupref = tle->ressortgroupref;
*p_tlist2 = lappend(*p_tlist2, new_tle);
resno++;
}
/*
* This may be called inside a two-stage aggregation. In this case,
* We want to make sure all entries in the '*p_tlist3' are visible.
*/
foreach (lc, *p_tlist3)
{
TargetEntry *tle = (TargetEntry *)lfirst(lc);
if (twostage)
tle->resjunk = false;
/* We also set aggstage to AGGSTAGE_INTERMEDIATE if this is in
* a two-stage aggregation, because the agg node in
* the second stage aggregation will do the finalize.
*/
if (twostage && IsA(tle->expr, Aggref))
{
Aggref *aggref = (Aggref *)tle->expr;
aggref->aggstage = AGGSTAGE_INTERMEDIATE;
}
}
}
/*
* Function: generate_multi_stage_tlists
*
* Generate target lists and having qual for multi-stage aggregation.
*
* Input is
*
* From the ctx argument:
* tlist - the preprocessed target list of the original query
* sub_tlist - the reduced target list to use as input to the aggregation
* (If use_dqa_pruning, then all DQA arguments must appear in
* this list and must have non-zero sortgrouprefs.)
* havingQual - the preprocessed having qual of the original query
* (in list-of-conjunct-Exprs form)
* numGroupCols - number of grouping attributes (no grouping extensions)
* groupColIdx - resnos (= attr numbers) of the grouping attributes
* numDistinctCols - number of DISTINCT-qualified argument attributes
* distinctColIdx - resnos (= attr numbers) of the DQA argument attributes
*
* From use_dqa_pruning:
* Do we want to construct the tlists to support pruning DQAs?
*
* Output is pointers to
*
* prelim_tlist - the target list of the preliminary Agg node.
* inter_tlist - an optional intermediate target list for an Agg node
* used in multi-phase DQA pruning (p_inter_tlist non-null).
* final_tlist - the target list of the final Agg node.
* final_qual - the qual of the final Agg node.
*/
void generate_multi_stage_tlists(MppGroupContext *ctx,
List **p_prelim_tlist,
List **p_inter_tlist,
List **p_final_tlist,
List **p_final_qual)
{
/* Use consistent varno in final and intermediate tlist entries. It will
* refer to the sole RTE (a Subquery RTE) of a SubqueryScan. */
ctx->final_varno = 1;
/* Do we need to build an intermediate tlist in irefs_tlist? */
ctx->use_irefs_tlist = ( p_inter_tlist != NULL );
/* Don't do DQA pruning. Use prepare/generate_dqa_pruning_tlists! */
ctx->use_dqa_pruning = false;
deconstruct_agg_info(ctx);
reconstruct_agg_info(ctx,
p_prelim_tlist, p_inter_tlist,
p_final_tlist, p_final_qual);
}
/*
* Function: prepare_dqa_pruning_tlists
*
* Performs the first phase of generate_multi_stage_tlists, but waits for
* a subsequent call to generate_dqa_pruning_tlists to actually produce the
* target lists. (This allows for the fact that DQA pruning may involve
* several "coplans", each with its own target list requirements. This
* function lays the groundwork for all such target lists.)
*/
void prepare_dqa_pruning_tlists(MppGroupContext *ctx)
{
/* Use consistent varno in final and intermediate tlist entries. It will
* refer to the sole RTE (a Subquery RTE) of a SubqueryScan. */
ctx->final_varno = 1;
/* Do we need to build an intermediate tlist in irefs_tlist? */
ctx->use_irefs_tlist = true;
/* Do we want to do DQA pruning (in case there are any DISTINCT-qualified
* aggregate functions)? */
ctx->use_dqa_pruning = true;
deconstruct_agg_info(ctx);
}
/*
* Function: generate_dqa_pruning_tlists
*
* Performs the last phase of generate_multi_stage_tlists in the context of
* DQA pruning.
*/
void generate_dqa_pruning_tlists(MppGroupContext *ctx,
int dqa_index,
List **p_prelim_tlist,
List **p_inter_tlist,
List **p_final_tlist,
List **p_final_qual)
{
Assert( p_inter_tlist != NULL ); /* optional elsewhere, required here. */
Assert( ctx->use_dqa_pruning );
if ( ctx->numDistinctCols == 1 )
{
/* Finalized results for single-DQA (join-less) plan. */
reconstruct_agg_info(ctx,
p_prelim_tlist,
p_inter_tlist,
p_final_tlist,
p_final_qual);
}
else
{
/* Minimal results for multi-DQA (join) plan. */
reconstruct_coplan_info(ctx,
dqa_index,
p_prelim_tlist,
p_inter_tlist,
p_final_tlist);
*p_final_qual = NIL;
}
}
/* Function: deconstruct_agg_info
*
* Top-level deconstruction of the target list and having qual of an
* aggregate Query into intermediate structures that will later guide
* reconstruction of the various target lists and expressions involved
* in a multi-phase aggregation plan, possibly with DISTINCT-qualified
* aggregate functions (DQAs).
*/
void deconstruct_agg_info(MppGroupContext *ctx)
{
int i;
ListCell *lc;
/* Initialize temporaries to hold the parts of the preliminary target
* list under construction. */
ctx->grps_tlist = NIL;
ctx->dqa_tlist = NIL;
ctx->prefs_tlist = NIL;
ctx->irefs_tlist = NIL;
ctx->frefs_tlist = NIL;
ctx->dref_tlists = NULL;
ctx->fin_tlist = NIL;
ctx->fin_hqual = NIL;
/*
* Begin constructing the target list for the preliminary Agg node
* by placing targets for the grouping attributes on the grps_tlist
* temporary. Make sure ressortgroupref matches the original. Copying
* the expression may be overkill, but it is safe.
*/
for ( i = 0; i < ctx->numGroupCols; i++ )
{
TargetEntry *sub_tle, *prelim_tle;
sub_tle = get_tle_by_resno(ctx->sub_tlist, ctx->groupColIdx[i]);
prelim_tle = makeTargetEntry(copyObject(sub_tle->expr),
list_length(ctx->grps_tlist) + 1,
(sub_tle->resname == NULL) ?
NULL :
pstrdup(sub_tle->resname),
false);
prelim_tle->ressortgroupref = sub_tle->ressortgroupref;
prelim_tle->resjunk = false;
ctx->grps_tlist = lappend(ctx->grps_tlist, prelim_tle);
}
/*
* Continue to construct the target list for the preliminary Agg node
* by placing targets for the argument attribute of each DQA on the
* dqa_tlist temporary. Make sure ressortgroupref matches the original.
*/
for ( i = 0; i < ctx->numDistinctCols; i++ )
{
TargetEntry *sub_tle, *prelim_tle;
sub_tle = get_tle_by_resno(ctx->sub_tlist, ctx->distinctColIdx[i]);
prelim_tle = makeTargetEntry(copyObject(sub_tle->expr),
list_length(ctx->dqa_tlist) + 1,
(sub_tle->resname == NULL) ?
NULL :
pstrdup(sub_tle->resname),
false);
prelim_tle->ressortgroupref = sub_tle->ressortgroupref;
prelim_tle->resjunk = false;
ctx->dqa_tlist = lappend(ctx->dqa_tlist, prelim_tle);
}
/* Initialize the array of Aggref target lists corresponding to the
* DQA argument target list just constructed.
*/
ctx->dref_tlists = (List **)palloc0(ctx->numDistinctCols * sizeof(List*));
/*
* Derive the final target list with entries corresponding to the input
* target list, but referring to the attributes of the preliminary target
* list rather than to the input attributes. Note that this involves
* augmenting the prefs_tlist temporary as we encounter new Aggref nodes.
*/
foreach (lc, ctx->tlist)
{
TargetEntry *tle, *final_tle;
Expr *expr;
tle = (TargetEntry*)lfirst(lc);
ctx->split_aggref_sortgroupref = tle->ressortgroupref; /* for deconstruction subroutines */
expr = deconstruct_expr(tle->expr, ctx);
ctx->split_aggref_sortgroupref = 0;
final_tle = makeTargetEntry(expr,
tle->resno,
(tle->resname == NULL) ?
NULL :
pstrdup(tle->resname),
tle->resjunk);
final_tle->ressortgroupref = tle->ressortgroupref;
ctx->fin_tlist = lappend(ctx->fin_tlist, final_tle);
}
/*
* Derive the final qual while augmenting the preliminary target list. */
ctx->fin_hqual = (List*)deconstruct_expr((Expr*)ctx->havingQual, ctx);
/* Now cache some values to avoid repeated recalculation by subroutines. */
/* Use consistent varno in final, intermediate and join tlist entries.
* final refers to the sole RTE (a Subquery RTE) of a SubqueryScan.
* outer and inner to the respective inputs to a join.
*/
ctx->final_varno = 1;
ctx->outer_varno = OUTER;
ctx->inner_varno = INNER;
/* Target lists used in multi-phase planning at or above the level
* of individual DQA coplans have one of the forms
*
* [G][D0...Dn][R]
* where
* G represents the grouping key attributes (if any)
* Di represents the results of the DQAs that take the i-th
* unique DQA argument (if any)
* R represents the results of regular aggregate functions (if any)
*
* The offset at index position i is the number of attributes that
* precede the first for the i-th DQA (index origin 0). The last
* is the offset of the first attribute following the DQA attributes.
*/
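/*
 * For example (lengths are illustrative): with one grouping column and two
 * distinct DQA arguments whose dref_tlists have lengths 2 and 1, the loop
 * below yields dqa_offsets = {1, 3, 4}; D0 starts after the grouping key,
 * D1 after D0's two Aggref results, and the regular aggregate results (R)
 * start at offset 4.
 */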
ctx->dqa_offsets = palloc(sizeof(int) * (1 + ctx->numDistinctCols));
ctx->dqa_offsets[0] = ctx->numGroupCols;
for ( i = 0; i < ctx->numDistinctCols; i++ )
{
ctx->dqa_offsets[i+1] = ctx->dqa_offsets[i]
+ list_length(ctx->dref_tlists[i]);
}
}
/* Function: reconstruct_agg_info
*
* Construct the preliminary, optional intermediate, and final target lists
* and the final having qual for the aggregation plan. If we are doing
* DQA pruning, this function is appropriate only for the cases of 0 or 1
* DQA.
*
* During processing we set ctx->top_tlist to be the flat target list
* containing only the grouping key and the results of individual aggregate
* functions. This list is transient -- it drives the production of the
* final target list and having qual through finalize_split_expr.
*/
void reconstruct_agg_info(MppGroupContext *ctx,
List **p_prelim_tlist,
List **p_inter_tlist,
List **p_final_tlist,
List **p_final_qual)
{
List *prelim_tlist = NIL;
List *inter_tlist = NIL;
List *final_tlist = NIL;
/* Grouping keys */
prelim_tlist = ctx->grps_tlist;
if ( p_inter_tlist != NULL )
inter_tlist = make_vars_tlist(ctx->grps_tlist, ctx->final_varno, 0);
final_tlist = make_vars_tlist(ctx->grps_tlist, ctx->final_varno, 0);
/* If applicable, single DQA argument, corresponding DQAs */
if ( ctx->use_dqa_pruning )
{
if ( list_length(ctx->dqa_tlist) == 1 )
{
int n = list_length(prelim_tlist);
TargetEntry *tle = (TargetEntry*)linitial(ctx->dqa_tlist);
tle->resno = n+1;
prelim_tlist = lappend(prelim_tlist, tle);
if ( p_inter_tlist != NULL )
{
inter_tlist = list_concat(inter_tlist,
make_vars_tlist(ctx->dqa_tlist,
ctx->final_varno, n));
}
final_tlist = seq_tlist_concat(final_tlist, ctx->dref_tlists[0]);
}
else if ( list_length(ctx->dqa_tlist) != 0 )
{
/* Shouldn't use this function for multi-DQA pruning. */
elog(ERROR,"Unexpected use of DISTINCT-qualified aggregate pruning");
}
}
/* Aggrefs */
prelim_tlist = seq_tlist_concat(prelim_tlist, ctx->prefs_tlist);
if ( p_inter_tlist != NULL )
{
inter_tlist = seq_tlist_concat(inter_tlist, ctx->irefs_tlist);
}
final_tlist = seq_tlist_concat(final_tlist, ctx->frefs_tlist);
/* Set implicit results */
*p_prelim_tlist = prelim_tlist;
if ( p_inter_tlist != NULL )
*p_inter_tlist = inter_tlist;
ctx->top_tlist = final_tlist;
*p_final_tlist = (List*) finalize_split_expr((Node*) ctx->fin_tlist, ctx);
*p_final_qual = (List*) finalize_split_expr((Node*) ctx->fin_hqual, ctx);
}
/* Function: reconstruct_coplan_info
*
* Construct the preliminary, intermediate, and final target lists
* for the DQA pruning aggregation coplan specified by dqa_index.
*
* Note: Similar to reconstruct_agg_info, but stops short of finalization
* and is sensitive to dqa_index. Ordinarily this function would
* be used only for multiple-DQA planning.
*/
void reconstruct_coplan_info(MppGroupContext *ctx,
int dqa_index,
List **p_prelim_tlist,
List **p_inter_tlist,
List **p_final_tlist)
{
List *prelim_tlist = NIL;
List *inter_tlist = NIL;
List *final_tlist = NIL;
int n;
TargetEntry *tle;
/* Grouping keys */
prelim_tlist = copyObject(ctx->grps_tlist);
if ( p_inter_tlist != NULL )
inter_tlist = make_vars_tlist(ctx->grps_tlist, ctx->final_varno, 0);
final_tlist = make_vars_tlist(ctx->grps_tlist, ctx->final_varno, 0);
/* Single DQA argument, corresponding DQAs */
Assert ( ctx->use_dqa_pruning );
n = list_length(prelim_tlist);
tle = (TargetEntry*)list_nth(ctx->dqa_tlist, dqa_index);
tle->resno = n+1;
prelim_tlist = lappend(prelim_tlist, tle);
if ( p_inter_tlist != NULL )
{
List *x = list_make1(tle);
inter_tlist = list_concat(inter_tlist,
make_vars_tlist(x, ctx->final_varno, n));
list_free(x);
}
final_tlist = seq_tlist_concat(final_tlist, ctx->dref_tlists[dqa_index]);
/* Plain Aggrefs go only on the first coplan! */
if ( dqa_index == 0 )
{
prelim_tlist = seq_tlist_concat(prelim_tlist, ctx->prefs_tlist);
if ( p_inter_tlist != NULL )
{
inter_tlist = seq_tlist_concat(inter_tlist, ctx->irefs_tlist);
}
final_tlist = seq_tlist_concat(final_tlist, ctx->frefs_tlist);
}
/* Set implicit results */
*p_prelim_tlist = prelim_tlist;
if ( p_inter_tlist != NULL )
{
*p_inter_tlist = inter_tlist;
}
*p_final_tlist = final_tlist;
}
/*
* Function: deconstruct_expr
*
* Prepare an expression for execution within 2-stage aggregation.
* This involves adding targets as needed to the target list of the
* first (partial) aggregation and referring to this target list from
* the modified expression for use in the second (final) aggregation.
*/
Expr *deconstruct_expr(Expr *expr, MppGroupContext *ctx)
{
return (Expr*)deconstruct_expr_mutator((Node*)expr, ctx);
}
/*
* Function: deconstruct_expr_mutator
*
* Work for deconstruct_expr.
*/
Node* deconstruct_expr_mutator(Node *node, MppGroupContext *ctx)
{
TargetEntry *tle;
if (node == NULL)
return NULL;
if (IsA(node, Aggref))
{
Aggref *aggref = (Aggref*)node;
return split_aggref(aggref, ctx);
}
/* If the given expression is a grouping expression, replace it with
* a Var node referring to the (lower) preliminary aggregation's
* target list.
*/
tle = tlist_member_with_ressortgroupref(node, ctx->grps_tlist, ctx->split_aggref_sortgroupref);
if ( tle != NULL )
{
return (Node*) makeVar(grp_varno, tle->resno,
exprType((Node*)tle->expr),
exprTypmod((Node*)tle->expr), 0);
}
return expression_tree_mutator(node, deconstruct_expr_mutator, (void*)ctx);
}
/*
* Function: split_aggref
*
* Find or add a partial-stage Aggref expression for the argument in the
* preliminary target list under construction. Return the final-stage
* Aggref with a single Var node argument referring to the partial-stage
* Aggref. In case of a DQA argument reduction, however, there is no
* partial-stage Aggref and the final-stage Aggref differs from the original
* in that (1) it does not specify DISTINCT and (2) it refers to its arguments
* via Vars on the lower range.
*
* For the normal 2-phase case:
*
* Note that the result type of the partial-stage Aggref will be the
* transition type of the aggregate function.
*
* At execution, the function used to compute the transition type in the
* lower aggregation will be the normal accumulator function for the
* aggregate function. The final function, however, will not be used.
* The result will be the ending transition value.
*
* At execution, the function used to compute the transition type in the
* upper aggregation will be the prelim function (known to exist, else
* we would have rejected 2-stage aggregation as a strategy) on input
* values of the transition type. The normal accumulator function for the
* aggregate function will not be used. The normal final function will
* be used to convert the ending transition value to the result type.
*/
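/*
 * For instance, an ordinary AVG(x) is split, roughly, into a partial-stage
 * Aggref whose result type is the aggregate's transition type and a
 * final-stage Aggref whose single argument is a Var referencing that
 * partial result; only the final stage applies the final function.  A DQA
 * such as COUNT(DISTINCT x), with use_dqa_pruning on, instead becomes a
 * non-DISTINCT count over a Var bound to the pruned argument attribute.
 */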
Node *split_aggref(Aggref *aggref, MppGroupContext *ctx)
{
ListCell *cell;
Node *final_node;
Oid transtype = InvalidOid;
AttrNumber attrno = OUTER;
TargetEntry *prelim_tle = NULL;
Assert(aggref != NULL && aggref->agglevelsup == 0);
if ( aggref->aggdistinct && ctx->use_dqa_pruning )
{
Index arg_attno;
Index dqa_attno;
TargetEntry *dqa_tle = NULL;
TargetEntry *arg_tle;
List *dref_tlist = NIL;
/* First find the DQA argument. Since this is a DQA, its argument
* list must contain a single expression that matches one of the
* target expressions in ctx->dqa_tlist.
*/
arg_tle = NULL;
if ( list_length(aggref->args) == 1 ) /* safer than Assert */
{
arg_tle = tlist_member(linitial(aggref->args), ctx->dqa_tlist);
}
if (arg_tle == NULL)
elog(ERROR,"Unexpected use of DISTINCT-qualified aggregation");
arg_attno = arg_tle->resno; /* [1..numDistinctCols] */
/* We may have seen a DQA just like this one already. Look for
* one in the distinct Aggref target list to date.
*/
dref_tlist = ctx->dref_tlists[arg_attno - 1];
dqa_attno = 1;
foreach( cell, dref_tlist )
{
TargetEntry *tle = (TargetEntry*)lfirst(cell);
Aggref *ref = (Aggref*)tle->expr;
/* Check similarity, avoid aggtype and aggstage
* (which we control) and don't bother with agglevelsup
* (which is always 0 here) or aggdistinct.
*/
if ( aggref->aggfnoid == ref->aggfnoid
&& aggref->aggstar == ref->aggstar
&& equal(aggref->args, ref->args) )
{
dqa_tle = tle;
break;
}
dqa_attno++;
}
if ( dqa_tle == NULL )
{
/* Didn't find a target for the DQA Aggref so make a new one.
*/
Var *arg_var;
Aggref *dqa_aggref;
arg_var = makeVar(ctx->final_varno, ctx->numGroupCols + 1,
exprType(linitial(aggref->args)),
exprTypmod(linitial(aggref->args)),
0);
dqa_aggref = makeNode(Aggref);
memcpy(dqa_aggref, aggref, sizeof(Aggref)); /* flat copy */
dqa_aggref->args = list_make1(arg_var);
dqa_aggref->aggdistinct = false;
dqa_tle = makeTargetEntry((Expr*)dqa_aggref, dqa_attno, NULL, false);
dref_tlist = lappend(dref_tlist, dqa_tle);
}
ctx->dref_tlists[arg_attno-1] = dref_tlist;
/* Make the "final" target for the DQA case, a reference to the
* DQA Aggref we just found or constructed.
*/
final_node = (Node*) makeVar(dqa_base_varno + arg_attno - 1,
dqa_attno,
exprType((Node*)arg_tle->expr),
exprTypmod((Node*)arg_tle->expr),
0);
}
else /* Ordinary Aggref -or- DQA but ctx->use_dqa_pruning is off. */
{
Aggref *pref;
Aggref *iref;
Aggref *fref;
/*
* We may have seen an Aggref just like this one already. Look for
* the preliminary form of such in the preliminary Aggref target
* list to date.
*/
foreach( cell, ctx->prefs_tlist )
{
TargetEntry *tle = (TargetEntry*)lfirst(cell);
Aggref *ref = (Aggref*)tle->expr;
/* Check similarity, avoid aggtype and aggstage
* (which we control) and don't bother with agglevelsup
* (which is always 0 here).
*/
if ( aggref->aggfnoid == ref->aggfnoid
&& aggref->aggstar == ref->aggstar
&& aggref->aggdistinct == ref->aggdistinct
&& equal(aggref->args, ref->args) )
{
prelim_tle = tle;
transtype = ref->aggtype;
attrno = prelim_tle->resno;
break;
}
}
/*
* If no existing preliminary Aggref target matched, add one that does.
*/
if ( prelim_tle == NULL )
{
TargetEntry *final_tle;
Var *args;
/* Get type information for the Aggref */
transtype = lookup_agg_transtype(aggref);
/* Make a new preliminary Aggref wrapped as a new target entry.
* Like the input Aggref, the preliminary refers to the lower
* range. */
pref = (Aggref*)copyObject(aggref);
pref->aggtype = transtype;
pref->aggstage = AGGSTAGE_PARTIAL;
attrno = 1 + list_length(ctx->prefs_tlist);
prelim_tle = makeTargetEntry((Expr*)pref, attrno, NULL, false);
prelim_tle->ressortgroupref = ctx->split_aggref_sortgroupref;
ctx->prefs_tlist = lappend(ctx->prefs_tlist, prelim_tle);
args = makeVar(ctx->final_varno,
ctx->numGroupCols
+ (ctx->use_dqa_pruning ? 1 : 0)
+ attrno,
transtype, -1, 0);
if ( ctx->use_irefs_tlist )
{
TargetEntry *inter_tle;
iref = makeNode(Aggref);
iref->aggfnoid = pref->aggfnoid;
iref->aggtype = transtype;
iref->args = list_make1((Expr*)copyObject(args));
iref->agglevelsup = 0;
iref->aggstar = false;
iref->aggdistinct = false;
iref->aggstage = AGGSTAGE_INTERMEDIATE;
inter_tle = makeTargetEntry((Expr*)iref, attrno, NULL, false);
inter_tle->ressortgroupref = ctx->split_aggref_sortgroupref;
ctx->irefs_tlist = lappend(ctx->irefs_tlist, inter_tle);
}
/* Make a new final Aggref. */
fref = makeNode(Aggref);
fref->aggfnoid = aggref->aggfnoid;
fref->aggtype = aggref->aggtype;
fref->args = list_make1((Expr*)args);
fref->agglevelsup = 0;
fref->aggstar = false;
fref->aggdistinct = false; /* handled in preliminary aggregation */
fref->aggstage = AGGSTAGE_FINAL;
final_tle = makeTargetEntry((Expr*)fref, attrno, NULL, false);
final_tle->ressortgroupref = ctx->split_aggref_sortgroupref;
ctx->frefs_tlist = lappend(ctx->frefs_tlist, final_tle);
}
final_node = (Node*)makeVar(ref_varno, attrno, aggref->aggtype, -1, 0);
}
return final_node;
}
/* Function: make_vars_tlist
*
* Make a targetlist similar to the given length n tlist but consisting of
* simple Var nodes with the given varno and varattno in offset + [1..N].
*/
List *make_vars_tlist(List *tlist, Index varno, AttrNumber offset)
{
List *new_tlist = NIL;
AttrNumber attno = offset;
ListCell *lc;
foreach (lc, tlist)
{
Var *new_var;
TargetEntry *new_tle;
TargetEntry *tle = (TargetEntry*)lfirst(lc);
attno++;
new_var = makeVar(varno, attno,
exprType((Node*)tle->expr),
exprTypmod((Node*)tle->expr), 0);
new_tle = makeTargetEntry((Expr*)new_var,
attno, /* resno always matches attno */
(tle->resname == NULL) ? NULL : pstrdup(tle->resname),
false);
new_tle->ressortgroupref = tle->ressortgroupref;
new_tlist = lappend(new_tlist, new_tle);
}
return new_tlist;
}
/* Function: seq_tlist_concat
*
* Concatenates tlist2 to the end of tlist1 adjusting the resno values
* of tlist2 so that the resulting entries have resno = position+1.
* The resno values of tlist1 must be dense from 1 to the length of
* the list. (They are sequential by position, though this is not
* strictly required.)
*
* May modify tlist1 in place (to adjust last link and length). Does not
* modify tlist2, but the result shares structure below the TargetEntry
* nodes.
*/
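/*
 * For example, concatenating a two-entry tlist2 onto a three-entry tlist1
 * produces a five-entry list whose copied tlist2 entries get resno 4 and 5.
 */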
List *seq_tlist_concat(List *tlist1, List *tlist2)
{
ListCell *lc;
AttrNumber high_attno = list_length(tlist1);
foreach (lc, tlist2)
{
TargetEntry *tle= (TargetEntry*)lfirst(lc);
TargetEntry *new_tle = (TargetEntry*)makeNode(TargetEntry);
memcpy(new_tle, tle, sizeof(TargetEntry));
new_tle->resno = ++high_attno;
tlist1 = lappend(tlist1, new_tle);
}
return tlist1;
}
/* Function finalize_split_expr
*
* Note: Only called on the top of the "join" tree, so all D_i are
* included in attribute offset calculations.
*/
Node *finalize_split_expr(Node *expr, MppGroupContext *ctx)
{
return finalize_split_expr_mutator(expr, ctx);
}
/* Mutator subroutine for finalize_split_expr() replaces pseudo Var nodes
* produced by split_aggref() with the similarly typed expression found in
* the top-level targetlist, ctx->top_tlist, being finalized.
*
* For example, a pseudo Var node that represents the 3rd DQA for the
* 2nd DQA argument will be replaced by the targetlist expression that
* corresponds to that DQA.
*/
Node* finalize_split_expr_mutator(Node *node, MppGroupContext *ctx)
{
if (node == NULL)
return NULL;
if (IsA(node, Var))
{
AttrNumber attrno=(AttrNumber)0;
TargetEntry *tle;
Var *pseudoVar = (Var*)node;
if ( pseudoVar->varno == grp_varno )
{
attrno = pseudoVar->varattno;
}
else if ( pseudoVar->varno == ref_varno )
{
if ( ctx->use_dqa_pruning )
{
attrno = ctx->dqa_offsets[ctx->numDistinctCols]
+ pseudoVar->varattno;
}
else
{
attrno = ctx->numGroupCols + pseudoVar->varattno;
}
}
else if ( pseudoVar->varno >= dqa_base_varno && ctx->use_dqa_pruning )
{
int i = pseudoVar->varno - dqa_base_varno;
attrno = ctx->dqa_offsets[i] + pseudoVar->varattno;
}
else
{
elog(ERROR,"Unexpected failure of multi-phase aggregation planning");
}
tle = (TargetEntry*) list_nth(ctx->top_tlist, attrno - 1);
return (Node*) tle->expr;
}
return expression_tree_mutator(node,
finalize_split_expr_mutator,
(void*)ctx);
}
/* Function lookup_agg_transtype
*
* Return the transition type Oid of the given aggregate function or throw
* an error, if none.
*/
Oid lookup_agg_transtype(Aggref *aggref)
{
Oid aggid = aggref->aggfnoid;
Oid result;
int fetchCount;
/* XXX: would have been get_agg_transtype() */
result = caql_getoid_plus(
NULL,
&fetchCount,
NULL,
cql("SELECT aggtranstype FROM pg_aggregate "
" WHERE aggfnoid = :1 ",
ObjectIdGetDatum(aggid)));
if (!fetchCount)
elog(ERROR, "cache lookup failed for aggregate %u", aggid);
return result;
}
/* Function: adapt_flow_to_targetlist
*
* Sometimes we replace the targetlist (especially the subplan of an Agg).
* In this case, we need to ensure that any hash expressions in the
* node's flow remain in the targetlist, or else null out the hash, leaving,
* in effect, a strewn flow.
*/
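/*
 * For example, if the flow hashes on (a + b) but the new targetlist no
 * longer carries an entry for a + b, hashExpr is nulled out and the flow
 * effectively degrades to a strewn distribution.
 */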
void adapt_flow_to_targetlist(Plan *plan)
{
ListCell *c;
Flow *flow;
Assert(plan != NULL);
flow = plan->flow;
Assert(flow);
foreach(c, flow->hashExpr)
{
Node *expr = (Node*)lfirst(c);
if (!tlist_member(expr, plan->targetlist))
{
flow->hashExpr = NIL;
break;
}
}
}
/**
* Update the scatter clause before a query tree's targetlist is about to
* be modified. The scatter clause of a query tree will correspond to
* old targetlist entries. If the query tree is modified and the targetlist
* is to be modified, we must call this method to ensure that the scatter clause
* is kept in sync with the new targetlist.
*/
void UpdateScatterClause(Query *query, List *newtlist)
{
Assert(query);
Assert(query->targetList);
Assert(newtlist);
if (query->scatterClause
&& list_nth(query->scatterClause, 0) != NULL /* scattered randomly */
)
{
Assert(list_length(query->targetList) == list_length(newtlist));
List *scatterClause = NIL;
ListCell *lc = NULL;
foreach (lc, query->scatterClause)
{
Expr *o = (Expr *) lfirst(lc);
Assert(o);
TargetEntry *tle = tlist_member((Node *) o, query->targetList);
Assert(tle);
TargetEntry *ntle = list_nth(newtlist, tle->resno - 1);
scatterClause = lappend(scatterClause, copyObject(ntle->expr));
}
query->scatterClause = scatterClause;
}
}
/*
* Function: add_second_stage_agg
*
* Add an Agg/Group node on top of an Agg/Group node. We add a SubqueryScan
* node on top of the existing Agg/Group node before adding the new Agg/Group
* node.
*
* Params:
* is_agg -- indicate to add an Agg or a Group node.
* lower_tlist -- the targetlist for the existing Agg/Group node.
* upper_tlist -- the targetlist for the new Agg/Group node.
*/
Plan *
add_second_stage_agg(PlannerInfo *root,
bool is_agg,
List *lower_tlist,
List *upper_tlist,
List *upper_qual,
AggStrategy aggstrategy,
int numGroupCols,
AttrNumber *prelimGroupColIdx,
int num_nullcols,
uint64 input_grouping,
uint64 grouping,
int rollup_gs_times,
double numGroups,
int numAggs,
int transSpace,
const char *alias,
List **p_current_pathkeys,
Plan *result_plan,
bool use_root,
bool adjust_scatter)
{
Query *parse = root->parse;
Query *subquery;
List *newrtable;
RangeTblEntry *newrte;
RangeTblRef *newrtref;
Plan *agg_node;
/*
* Add a SubqueryScan node to renumber the range of the query.
*
* The result of the preliminary aggregation (represented by lower_tlist)
* may contain targets with no representatives in the range of its outer
* relation. We resolve this by treating the preliminary aggregation as
* a subquery.
*
* However, this breaks the correspondence between the Plan tree and
* the Query tree that is assumed by the later call to set_plan_references
* as well as by the deparse processing used (e.g.) in EXPLAIN.
*
* So we also push the Query node from the root structure down into a new
* subquery RTE and scribble over the original Query node to make it into
* a simple SELECT * FROM a Subquery RTE.
*
* Note that the Agg phase we add below will refer to the attributes of
* the result of this new SubqueryScan plan node. It is up to the caller
* to set up upper_tlist and upper_qual accordingly.
*/
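/*
 * The resulting plan fragment is, schematically,
 *
 *     Agg (upper_tlist, upper_qual)
 *       -> SubqueryScan "alias"
 *            -> result_plan (the existing Agg/Group producing lower_tlist)
 *
 * with root->parse rewritten in place to a simple SELECT over the new
 * subquery RTE.
 */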
/* Flat-copy the root query into a newly allocated Query node and adjust
* its target list and having qual to match the lower (existing) Agg
* plan we're about to make into a SubqueryScan.
*/
subquery = copyObject(parse);
subquery->targetList = copyObject(lower_tlist);
subquery->havingQual = NULL;
/* Subquery attributes shouldn't be marked as junk, else they'll be
* skipped by addRangeTableEntryForSubquery. */
{
ListCell *cell;
foreach ( cell, subquery->targetList )
{
TargetEntry *tle = (TargetEntry *)lfirst(cell);
tle->resjunk = false;
if ( tle->resname == NULL )
{
if ( use_root && IsA(tle->expr, Var) )
{
Var *var = (Var*)tle->expr;
RangeTblEntry *rte = rt_fetch(var->varno, root->parse->rtable);
tle->resname = pstrdup(get_rte_attribute_name(rte, var->varattno));
}
else
{
const char *fmt = "unnamed_attr_%d";
char buf[32]; /* big enuf for fmt */
sprintf(buf,fmt,tle->resno);
tle->resname = pstrdup(buf);
}
}
}
}
/* Construct a range table entry referring to it. */
newrte = addRangeTableEntryForSubquery(NULL,
subquery,
makeAlias(alias, NULL),
TRUE);
newrtable = list_make1(newrte);
/* Modify the root query in place to look like its range table is
* a simple Subquery. */
parse->querySource = QSRC_PLANNER; /* but remember it's really ours */
parse->rtable = newrtable;
parse->jointree = makeNode(FromExpr);
newrtref = makeNode(RangeTblRef);
newrtref->rtindex = 1;
parse->jointree->fromlist = list_make1(newrtref);
parse->jointree->quals = NULL;
parse->rowMarks = NIL;
/* <EXECUTE s> uses parse->targetList to derive the portal's tupDesc,
* so when use_root is true, the caller owns the responsibility to make
* sure it ends up in an appropriate form at the end of planning.
*/
if ( use_root )
{
if (adjust_scatter)
{
UpdateScatterClause(parse, upper_tlist);
}
parse->targetList = copyObject(upper_tlist); /* Match range. */
}
result_plan = add_subqueryscan(root, p_current_pathkeys, 1, subquery, result_plan);
/* Add an Agg node */
/* convert current_numGroups to long int */
long lNumGroups = (long) Min(numGroups, (double) LONG_MAX);
agg_node = (Plan *)make_agg(root,
upper_tlist,
upper_qual,
aggstrategy, false,
numGroupCols,
prelimGroupColIdx,
lNumGroups,
num_nullcols,
input_grouping,
grouping,
rollup_gs_times,
numAggs,
transSpace,
result_plan);
/*
* Agg will not change the sort order unless it is hashed.
*/
agg_node->flow = pull_up_Flow(agg_node,
agg_node->lefttree,
(*p_current_pathkeys != NIL)
&& aggstrategy != AGG_HASHED );
/*
* Since the rtable has changed, we had better recreate a RelOptInfo entry for it.
*/
if (root->simple_rel_array)
pfree(root->simple_rel_array);
root->simple_rel_array_size = list_length(parse->rtable) + 1;
root->simple_rel_array = (RelOptInfo **)
palloc0(root->simple_rel_array_size * sizeof(RelOptInfo *));
build_simple_rel(root, 1, RELOPT_BASEREL);
return agg_node;
}
/*
* Add a SubqueryScan node to the input plan and maintain the given
* pathkeys by making adjustments to them and to the equivalence class
* information in root.
*
* Note that submerging a plan into a subquery scan will require changes
* to the range table and to any expressions above the new scan node.
* This is the caller's responsibility since the nature of the changes
* depends on the context in which the subquery is used.
*/
Plan* add_subqueryscan(PlannerInfo* root, List **p_pathkeys,
Index varno, Query *subquery, Plan *subplan)
{
List *subplan_tlist;
int *resno_map;
subplan_tlist = generate_subquery_tlist(varno, subquery->targetList,
false, &resno_map);
subplan = (Plan*)make_subqueryscan(root, subplan_tlist,
NIL,
varno, /* scanrelid (= varno) */
subplan,
subquery->rtable);
mark_passthru_locus(subplan, true, true);
/* Reconstruct equi_key_list since the rtable has changed.
* XXX we leak the old one.
*/
root->equi_key_list = construct_equivalencekey_list(root->equi_key_list,
resno_map,
subquery->targetList,
subplan_tlist);
if ( p_pathkeys != NULL && *p_pathkeys != NULL )
{
*p_pathkeys = reconstruct_pathkeys(root, *p_pathkeys, resno_map,
subquery->targetList, subplan_tlist);
}
pfree(resno_map);
return subplan;
}
/*
* hash_safe_type - is the given type hashable?
*
* Modelled on function hash_safe_grouping in planner.c, we assume
* the type is hashable if its equality operator is marked hashjoinable.
*/
static bool
hash_safe_type(Oid type)
{
Operator optup;
bool oprcanhash;
optup = equality_oper(type, true);
if (!optup)
return false;
oprcanhash = ((Form_pg_operator) GETSTRUCT(optup))->oprcanhash;
ReleaseOperator(optup);
if (!oprcanhash)
return false;
return true;
}
/*
* sorting_prefixes_grouping - is the result ordered on a grouping key prefix?
*
* If so, then we might prefer a pre-ordered grouping result to one that would
* need sorting after the fact.
*/
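/*
 * Illustrative example: with GROUP BY a, b and ORDER BY a, the sort
 * pathkeys (a) are a prefix of the group pathkeys (a, b), so a grouping
 * result already ordered on (a, b) needs no extra sort for the ORDER BY.
 */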
static bool
sorting_prefixes_grouping(PlannerInfo *root)
{
return root->sort_pathkeys != NIL
&& pathkeys_contained_in(root->sort_pathkeys, root->group_pathkeys);
}
/*
 * gp_hash_safe_grouping - are the grouping key types GP-hashable, so they
 * can be used by redistribution motion nodes?
*/
static bool
gp_hash_safe_grouping(PlannerInfo *root)
{
List *grouptles;
ListCell *glc;
grouptles = get_sortgroupclauses_tles(root->parse->groupClause,
root->parse->targetList);
foreach(glc, grouptles)
{
TargetEntry *tle = (TargetEntry *)lfirst(glc);
bool canhash;
canhash = isGreenplumDbHashable(exprType((Node *)tle->expr));
if (!canhash)
return false;
}
return true;
}
/*
* reconstruct_pathkeys
*
* Reconstruct the given pathkeys based on the given mapping from the original
* targetlist to a new targetlist.
*/
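/*
 * For example (illustrative): if a pathkey item matched the expression at
 * resno 3 of orig_tlist and resno_map[2] == 5, the item is re-pointed at
 * the expression of resno 5 in new_tlist before re-canonicalizing.
 */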
List *
reconstruct_pathkeys(PlannerInfo *root, List *pathkeys, int *resno_map,
List *orig_tlist, List *new_tlist)
{
List *new_pathkeys;
ListCell *lc;
foreach (lc, pathkeys)
{
List *keys = (List *)lfirst(lc);
ListCell *key_lc;
TargetEntry *new_tle;
Assert(IsA(keys, List));
foreach(key_lc, keys)
{
PathKeyItem *item = (PathKeyItem *)lfirst(key_lc);
int resno = 0;
ListCell *tle_lc;
Assert(IsA(item, PathKeyItem));
foreach(tle_lc, orig_tlist)
{
TargetEntry *tle = (TargetEntry *)lfirst(tle_lc);
Assert(IsA(tle, TargetEntry));
if (equal(tle->expr, item->key))
{
resno = tle->resno;
break;
}
}
if (resno > 0)
{
new_tle = get_tle_by_resno(new_tlist, resno_map[resno-1]);
Assert(new_tle != NULL);
item->key = (Node *)new_tle->expr;
}
}
}
new_pathkeys = canonicalize_pathkeys(root, pathkeys);
return new_pathkeys;
}
/* cost_common_agg -- Estimate the cost of executing the common subquery
* for an aggregation plan. Assumes that the AggPlanInfo contains the
* correct Path as input_path.
*
* Returns the total cost and, more importantly, populates the given
* dummy Plan node with cost information
*/
Cost cost_common_agg(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info, Plan *dummy)
{
QualCost tlist_cost;
Cost startup_cost;
Cost total_cost;
double input_rows;
int input_width;
int n;
Assert(dummy != NULL);
input_rows = info->input_path->parent->rows;
input_width = info->input_path->parent->width;
/* Path input width isn't correct for ctx->sub_tlist so we guess. */
n = 32 * list_length(ctx->sub_tlist);
input_width = ( input_width < n )? input_width: n;
/* Estimate cost of evaluation of the sub_tlist. */
cost_qual_eval(&tlist_cost, ctx->sub_tlist, root);
startup_cost = info->input_path->startup_cost + tlist_cost.startup;
total_cost = info->input_path->total_cost + tlist_cost.startup +
tlist_cost.per_tuple * input_rows;
memset(dummy, 0, sizeof(Plan));
dummy->type = info->input_path->type;
dummy->startup_cost = startup_cost;
dummy->total_cost = total_cost;
dummy->plan_rows = input_rows;
dummy->plan_width = input_width;
return dummy->total_cost;
}
/* Function cost_1phase_aggregation
*
* May be used for 1 phase aggregation costing with or without DQAs.
* Corresponds to make_one_stage_agg_plan and must be maintained in sync
* with it.
*/
Cost cost_1phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info)
{
Plan input_dummy;
bool is_sorted;
double input_rows;
long numGroups = (*(ctx->p_dNumGroups) < 0) ? 0 :
(*(ctx->p_dNumGroups) > LONG_MAX) ? LONG_MAX :
(long)*(ctx->p_dNumGroups);
cost_common_agg(root, ctx, info, &input_dummy);
input_rows = input_dummy.plan_rows;
is_sorted = pathkeys_contained_in(root->group_pathkeys, info->input_path->pathkeys);
/* Collocation cost (Motion). */
switch ( info->group_prep )
{
case MPP_GRP_PREP_HASH_GROUPS:
is_sorted = false;
input_dummy.total_cost +=
incremental_motion_cost(input_dummy.plan_rows,
input_dummy.plan_rows);
break;
case MPP_GRP_PREP_FOCUS_QE:
case MPP_GRP_PREP_FOCUS_QD:
input_dummy.total_cost +=
incremental_motion_cost(input_dummy.plan_rows,
input_dummy.plan_rows * root->config->cdbpath_segments);
input_dummy.plan_rows = input_dummy.plan_rows * root->config->cdbpath_segments;
break;
default:
break;
}
/* NB: We don't need to calculate grouping extension costs here because
* grouping extensions are planned elsewhere.
*/
if ( ctx->use_hashed_grouping )
{
/* HashAgg */
Assert( ctx->numDistinctCols == 0 );
add_agg_cost(NULL, &input_dummy,
ctx->sub_tlist, (List*)root->parse->havingQual,
AGG_HASHED, false,
ctx->numGroupCols, ctx->groupColIdx,
numGroups, 0, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
}
else
{
if ( ctx->numGroupCols == 0 )
{
/* PlainAgg */
add_agg_cost(NULL, &input_dummy,
ctx->sub_tlist, (List*)root->parse->havingQual,
AGG_PLAIN, false,
0, NULL,
1, 0, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
}
else
{
/* GroupAgg */
if ( ! is_sorted )
{
add_sort_cost(NULL, &input_dummy, ctx->numGroupCols, NULL, NULL);
}
add_agg_cost(NULL, &input_dummy,
ctx->sub_tlist, (List*)root->parse->havingQual,
AGG_SORTED, false,
ctx->numGroupCols, ctx->groupColIdx,
numGroups, 0, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
}
		/* Beware: AGG_PLAIN and AGG_SORTED may be used with DQAs; however,
		 * the function cost_agg doesn't distinguish DQAs, so it
		 * consistently underestimates the cost in these cases.
*/
if ( ctx->numDistinctCols > 0 )
{
Path path_dummy;
double ngrps = *(ctx->p_dNumGroups);
double nsorts = ngrps * ctx->numDistinctCols;
double avgsize = input_dummy.plan_rows / ngrps;
cost_sort(&path_dummy, NULL, NIL, 0.0, avgsize, 32);
input_dummy.total_cost += nsorts * path_dummy.total_cost;
}
}
info->plan_cost = root->config->gp_eager_one_phase_agg ? (Cost)0.0 : input_dummy.total_cost;
info->valid = true;
info->join_strategy = DqaJoinNone;
info->use_sharing = false;
info->plan_cost *= gp_coefficient_1phase_agg;
return info->plan_cost;
}
/* Function cost_2phase_aggregation
*
* May be used for 2 phase costing with 0 or 1 DQAs.
* Corresponds to make_two_stage_agg_plan and must be maintained in sync
* with it.
*/
Cost cost_2phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info)
{
Plan input_dummy;
bool is_sorted;
long numGroups = (*(ctx->p_dNumGroups) < 0) ? 0 :
(*(ctx->p_dNumGroups) > LONG_MAX) ? LONG_MAX :
(long)*(ctx->p_dNumGroups);
double input_rows;
double streaming_fudge = 1.3;
cost_common_agg(root, ctx, info, &input_dummy);
input_rows = input_dummy.plan_rows;
is_sorted = pathkeys_contained_in(root->group_pathkeys, info->input_path->pathkeys);
/* Precondition Input */
switch ( info->group_prep )
{
case MPP_GRP_PREP_HASH_DISTINCT:
input_dummy.total_cost +=
incremental_motion_cost(input_dummy.plan_rows,
input_dummy.plan_rows);
is_sorted = false;
break;
case MPP_GRP_PREP_NONE:
break;
default:
ereport(ERROR,
(errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("unexpected call for two-stage aggregation")));
break; /* Never */
}
/* Preliminary Aggregation */
if ( ctx->use_hashed_grouping )
{
/* Preliminary HashAgg*/
add_agg_cost(NULL, &input_dummy,
NIL, NIL, /* Don't know preliminary tlist, qual IS NIL */
AGG_HASHED, root->config->gp_hashagg_streambottom,
ctx->numGroupCols, ctx->groupColIdx,
numGroups, 0, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
if ( gp_hashagg_streambottom )
{
input_dummy.plan_rows *= streaming_fudge;
}
}
else
{
if ( ctx->numGroupCols == 0 )
{
/* Preliminary PlainAgg*/
add_agg_cost(NULL, &input_dummy,
NIL, NIL, /* Don't know preliminary tlist, qual IS NIL */
AGG_PLAIN, false,
0, NULL,
1, 0, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
}
else
{
/* Preliminary GroupAgg */
if ( ! is_sorted )
{
add_sort_cost(NULL, &input_dummy, ctx->numGroupCols, NULL, NULL);
}
add_agg_cost(NULL, &input_dummy,
NIL, NIL, /* Don't know preliminary tlist, qual IS NIL */
AGG_SORTED, false,
ctx->numGroupCols, ctx->groupColIdx,
numGroups, 0, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
}
		/* Beware: AGG_PLAIN and AGG_SORTED may be used with DQAs; however,
		 * the function cost_agg doesn't distinguish DQAs, so it
		 * consistently underestimates the cost in these cases.
*/
if ( ctx->numDistinctCols > 0 )
{
Path path_dummy;
Cost run_cost;
double ngrps = *(ctx->p_dNumGroups);
double avgsize = input_rows / ngrps;
Assert(ctx->numDistinctCols == 1);
cost_sort(&path_dummy, NULL, NIL, input_dummy.total_cost, avgsize, 32);
run_cost = path_dummy.total_cost - path_dummy.startup_cost;
input_dummy.total_cost += path_dummy.startup_cost + ngrps * run_cost;
}
}
/* Collocate groups */
switch ( info->group_type )
{
case MPP_GRP_TYPE_GROUPED_2STAGE: /* Redistribute */
input_dummy.total_cost +=
incremental_motion_cost(input_dummy.plan_rows,
input_dummy.plan_rows);
break;
case MPP_GRP_TYPE_PLAIN_2STAGE: /* Gather */
input_dummy.total_cost +=
incremental_motion_cost(input_dummy.plan_rows,
input_dummy.plan_rows *root->config->cdbpath_segments);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("unexpected call for two-stage aggregation")));
break; /* Never */
}
/* Final Aggregation */
if ( ctx->use_hashed_grouping )
{
/* HashAgg*/
add_agg_cost(NULL, &input_dummy,
NIL, NIL, /* Don't know tlist or qual */
AGG_HASHED, false,
ctx->numGroupCols, ctx->groupColIdx,
numGroups, 0, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
}
else
{
if ( ctx->numGroupCols == 0 )
{
/* PlainAgg*/
add_agg_cost(NULL, &input_dummy,
NIL, NIL, /* Don't know tlist or qual */
AGG_PLAIN, false,
0, NULL,
1, 0, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
}
else
{
/* GroupAgg */
add_sort_cost(NULL, &input_dummy, ctx->numGroupCols, NULL, NULL);
add_agg_cost(NULL, &input_dummy,
NIL, NIL, /* Don't know tlist or qual */
AGG_SORTED, false,
ctx->numGroupCols, ctx->groupColIdx,
numGroups, 0, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
}
}
info->plan_cost = root->config->gp_eager_two_phase_agg ? (Cost)0.0 : input_dummy.total_cost;
info->valid = true;
info->join_strategy = DqaJoinNone;
info->use_sharing = false;
info->plan_cost *= gp_coefficient_2phase_agg;
return info->plan_cost;
}
/* Function cost_3phase_aggregation
*
* Only used for costing with 2 or more DQAs.
* Corresponds to make_three_stage_agg_plan and must be maintained in sync
* with it.
*
 * This function assumes the environment established by planDqaJoinOrder()
* and set_coplan_strategies().
*/
Cost cost_3phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info)
{
Plan dummy;
double input_rows;
Cost total_cost;
Cost share_cost;
Cost reexec_cost;
int i;
bool use_sharing = false;
DqaJoinStrategy join_strategy = DqaJoinUndefined;
Cost input_cost = 0.0;
Cost cost_coplan_cheapest = 0.0;
Cost cost_coplan_sorted = 0.0;
Cost cost_hashjoin = 0.0;
Cost cost_mergejoin = 0.0;
Cost cost_crossjoin = 0.0;
cost_common_agg(root, ctx, info, &dummy);
input_rows = dummy.plan_rows;
total_cost = dummy.total_cost;
Assert( ctx->numDistinctCols == list_length(ctx->agg_counts->dqaArgs) );
/* Note that join order has already been established by an earlier
* call to planDqaJoinOrder. Here we just use that order, but we
* need to decide on the join type.
*/
if ( list_length(ctx->agg_counts->dqaArgs) < 2 )
{
join_strategy = DqaJoinNone;
}
else if ( ctx->numGroupCols == 0 )
{
join_strategy = DqaJoinCross;
}
else if ( sorting_prefixes_grouping(root) )
{
/* Cheapest of merge join of sorted input or sorted hash join */
join_strategy = DqaJoinSorted;
}
else
{
/* Cheapest of merge join of sorted input or hash join */
join_strategy = DqaJoinCheapest;
}
/* Determine whether to use input sharing. */
if ( ctx->numDistinctCols < 2 )
{
reexec_cost = total_cost;
use_sharing = false;
}
else
{
/* Decide based on apparent costs.
* XXX Do we need to override this if there are volatile functions
* in the common plan? Is this known, or do we need to search?
*/
share_cost = cost_share_plan(&dummy, root, ctx->numDistinctCols);
reexec_cost = ctx->numDistinctCols * total_cost;
use_sharing = share_cost < reexec_cost;
}
input_cost = use_sharing ? share_cost : reexec_cost;
/* Get costs for the ctx->numDistinctCols coplans. */
cost_coplan_cheapest = cost_coplan_sorted = 0;
for ( i = 0; i < ctx->numDistinctCols; i++ )
{
DqaInfo *arg = ctx->dqaArgs + i;
cost_coplan_cheapest += arg->cost_cheapest;
cost_coplan_sorted += arg->cost_sorted;
}
/* Get costs to join the coplan results. */
switch ( join_strategy )
{
case DqaJoinNone:
break;
case DqaJoinCross:
cost_crossjoin = (ctx->numDistinctCols - 1) * 2 * cpu_tuple_cost;
break;
case DqaJoinSorted:
case DqaJoinCheapest:
set_cost_of_join_strategies(ctx, &cost_hashjoin, &cost_mergejoin);
if ( join_strategy == DqaJoinSorted )
cost_hashjoin += incremental_sort_cost(*ctx->p_dNumGroups, 100, ctx->numDistinctCols);
cost_hashjoin += cost_coplan_cheapest;
cost_mergejoin += cost_coplan_sorted;
if ( cost_hashjoin > 0.0 && cost_hashjoin <= cost_mergejoin )
{
join_strategy = DqaJoinHash;
}
else
{
join_strategy = DqaJoinMerge;
}
break;
default:
elog(ERROR, "unexpected join strategy during DQA planning");
}
	/* Compare costs and choose the cheapest. */
switch ( join_strategy )
{
case DqaJoinNone:
total_cost = input_cost + cost_coplan_cheapest;
break;
case DqaJoinCross:
total_cost = input_cost + cost_coplan_cheapest + cost_crossjoin;
break;
case DqaJoinHash:
total_cost = input_cost + cost_coplan_cheapest + cost_hashjoin;
break;
case DqaJoinMerge:
total_cost = input_cost + cost_coplan_cheapest + cost_mergejoin;
break;
default:
elog(ERROR, "unexpected join strategy during DQA planning");
}
info->plan_cost = root->config->gp_eager_dqa_pruning ? (Cost)0.0 : total_cost;
info->valid = true;
info->join_strategy = join_strategy;
info->use_sharing = use_sharing;
info->plan_cost *= gp_coefficient_3phase_agg;
return info->plan_cost;
}
/* Estimate the costs of
* 1. HashJoin of cheapest inputs, and
* 2. MergeJoin of sorted input.
*
 * If the result should be ordered, add the cost of sorting (1)'s result
 * before comparing it with (2); otherwise compare (1) with (2) directly.
*/
void set_cost_of_join_strategies(MppGroupContext *ctx, Cost *hashjoin_cost, Cost *mergejoin_cost)
{
Cost hj_cost;
Cost mj_cost;
List *mergeclauses = NIL;
List *hashclauses = NIL;
double rows;
int gk_width;
int outer_width;
bool try_hashed = true;
AttrNumber attrno;
Index outer_varno = 1;
int i;
rows = *ctx->p_dNumGroups;
/* Widths are wild speculation, but good enough, we hope. */
gk_width = 32 * ctx->numGroupCols;
outer_width = 32; /* DQA transition values for first DQA arg. */
outer_width += 64; /* Ordinary aggregate transition values. */
/* We need join clauses for costing. */
for( i = 0; i < ctx->numGroupCols; i++ )
{
Expr *qual;
Var *outer_var;
Var *inner_var;
AttrNumber resno = ctx->groupColIdx[i];
Index inner_varno = 1 + (i + 1);
TargetEntry *tle = get_tle_by_resno(ctx->sub_tlist, resno);
Assert( tle != NULL );
outer_var = makeVar(outer_varno, resno,
exprType((Node *)tle->expr),
exprTypmod((Node *)tle->expr), 0);
inner_var = makeVar(inner_varno, resno,
exprType((Node *)tle->expr),
exprTypmod((Node *)tle->expr), 0);
/* outer should always be on the left */
qual = make_op(NULL, list_make1(makeString("=")),
(Node*) outer_var,
(Node*) inner_var, -1);
/* If the grouping column is not hashable, do not try hashing. */
if (!hash_safe_type(exprType((Node *)tle->expr)))
try_hashed = false;
if ( try_hashed )
{
hashclauses = lappend(hashclauses, copyObject(qual));
}
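		/*
		 * Convert the remaining qual into NOT (outer IS DISTINCT FROM inner)
		 * so that NULL grouping keys compare as equal in the merge clauses.
		 */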
qual->type = T_DistinctExpr;
qual = make_notclause(qual);
mergeclauses = lappend(mergeclauses, qual);
}
/* Estimate the incremental join costs. */
hj_cost = mj_cost = 0;
for ( attrno = 1; attrno < ctx->numDistinctCols; attrno++ )
{
int dqa_width = 32;
int inner_width = gk_width + dqa_width;
mj_cost += incremental_mergejoin_cost(rows, mergeclauses, ctx->root);
if ( try_hashed )
hj_cost += incremental_hashjoin_cost(rows, inner_width, outer_width, hashclauses, ctx->root);
outer_width += dqa_width;
}
*mergejoin_cost = mj_cost;
*hashjoin_cost = try_hashed ? hj_cost : 0.0;
}
/* Set up basic structure content. Caller to fill in.
*/
static
void initAggPlanInfo(AggPlanInfo *info, Path *input_path, Plan *input_plan)
{
info->input_path = input_path;
info->input_plan = input_plan;
if (input_path != NULL)
info->input_locus = input_path->locus;
else
CdbPathLocus_MakeNull(&info->input_locus);
info->group_type = MPP_GRP_TYPE_BASEPLAN;
info->group_prep = MPP_GRP_PREP_NONE;
CdbPathLocus_MakeNull(&info->output_locus);
info->distinctkey_collocate = false;
info->valid = false;
info->plan_cost = 0;
info->join_strategy = DqaJoinUndefined;
info->use_sharing = false;
}
/* set_coplan_strategies
*
* Determine and cache in the given DqaInfo structure the cheapest
* strategy that computes the answer and the cheapest strategy that
* computes the answer in grouping key order.
*
* Below, the result cardinalities are shown as <-n- where
*
* x (input_rows) is the input cardinality which is usually about
* equal to #segments * #distinct(grouping key, DQA arg)
*
* d (darg_rows) is #distinct(grouping key, DQA arg)
*
* g (group_rows) is #distinct(grouping key)
*
* The coplan through the Motion that collocates tuples on the
* grouping key is planned independently and will be one of
*
* <- Motion <-x- HashAgg <-t- R
* <- Motion <-x- GroupAgg <- Sort <-t- R
*
* which is encoded in DqaInfo by the flag use_hashed_preliminary.
*
* The possible post-Motion strategies are encoded as enum values of
* type DqaCoplanType and indicate all the required plan nodes.
*
* Vector aggregation strategies that produce a result ordered on the
* grouping key are:
*
* DQACOPLAN_GGS: <-g- GroupAgg <-d- GroupAgg <- Sort <-x-
* DQACOPLAN_GSH: <-g- GroupAgg <- Sort <-d- HashAgg <-x-
* DQACOPLAN_SHH: <- Sort <-g- HashAgg <-d- HashAgg <-x-
*
 * In addition, the vector aggregation strategy
*
* DQACOPLAN_HH: <-g- HashAgg <-d- HashAgg <-x- R
*
* produces an unordered result.
*
* Scalar aggregation strategies are:
*
* DQACOPLAN_PGS: <-1- PlainAgg <-d- GroupAgg <- Sort <-x- R
 * DQACOPLAN_PH: <-1- PlainAgg <-d- HashAgg <-x- R
*
*/
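/*
 * Illustrative example (not computed by the planner): with
 * x = 4,000,000 input rows, d = 400,000 distinct (grouping key, DQA arg)
 * pairs and g = 10,000 groups, DQACOPLAN_HH hash-aggregates x -> d -> g
 * rows with no sort; if an ordered result is required, DQACOPLAN_SHH adds
 * only a final Sort of g rows, whereas DQACOPLAN_GGS must sort all x rows
 * up front.
 */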
void set_coplan_strategies(PlannerInfo *root, MppGroupContext *ctx, DqaInfo *dqaArg, Path *input)
{
double input_rows = input->parent->rows;
int input_width = input->parent->width;
double darg_rows = dqaArg->num_rows;
double group_rows = *ctx->p_dNumGroups;
long numGroups = (group_rows < 0) ? 0 :
(group_rows > LONG_MAX) ? LONG_MAX :
(long)group_rows;
bool can_hash_group_key = ctx->agg_counts->canHashAgg;
bool can_hash_dqa_arg = dqaArg->can_hash;
bool use_hashed_preliminary = false;
Cost sort_input = incremental_sort_cost(input_rows, input_width,
ctx->numGroupCols+1);
Cost sort_dargs = incremental_sort_cost(darg_rows, input_width,
ctx->numGroupCols);
Cost sort_groups = incremental_sort_cost(group_rows, input_width,
ctx->numGroupCols);
Cost gagg_input = incremental_agg_cost(input_rows, input_width,
AGG_SORTED, ctx->numGroupCols+1,
numGroups, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
Cost gagg_dargs = incremental_agg_cost(darg_rows, input_width,
AGG_SORTED, ctx->numGroupCols,
numGroups, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
Cost hagg_input = incremental_agg_cost(input_rows, input_width,
AGG_HASHED, ctx->numGroupCols+1,
numGroups, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
Cost hagg_dargs = incremental_agg_cost(darg_rows, input_width,
AGG_HASHED, ctx->numGroupCols,
numGroups, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
Cost cost_base;
Cost cost_sorted;
Cost cost_cheapest;
DqaCoplanType type_sorted;
DqaCoplanType type_cheapest;
Cost trial;
/* Preliminary aggregation */
use_hashed_preliminary = ( can_hash_group_key || ctx->numGroupCols == 0 )
&& can_hash_dqa_arg;
if ( use_hashed_preliminary )
{
cost_base = hagg_input;
}
else
{
cost_base = sort_input + gagg_input;
}
/* Collocating motion */
cost_base += incremental_motion_cost(darg_rows, darg_rows);
/* Post-motion processing is more complex. */
if ( ctx->numGroupCols == 0 ) /* scalar agg */
{
Cost pagg_dargs = incremental_agg_cost(darg_rows, input_width,
AGG_PLAIN, 0,
1, ctx->agg_counts->numAggs,
ctx->agg_counts->transitionSpace);
type_sorted = type_cheapest = DQACOPLAN_PGS;
cost_sorted = cost_cheapest = sort_input + gagg_input + pagg_dargs;
trial = hagg_input + pagg_dargs;
if (trial < cost_cheapest )
{
cost_cheapest = trial;
type_cheapest = DQACOPLAN_PH;
}
}
else /* vector agg */
{
type_sorted = type_cheapest = DQACOPLAN_GGS;
cost_sorted = cost_cheapest = sort_input + gagg_input + gagg_dargs;
if ( can_hash_dqa_arg )
{
			trial = hagg_input + sort_dargs + gagg_dargs;
if ( trial < cost_cheapest )
{
cost_cheapest = trial;
type_cheapest = DQACOPLAN_GSH;
}
if ( trial < cost_sorted )
{
cost_sorted = trial;
type_sorted = DQACOPLAN_GSH;
}
}
if ( can_hash_group_key && can_hash_dqa_arg )
{
trial = hagg_input + hagg_dargs;
if ( trial < cost_cheapest )
{
cost_cheapest = trial;
type_cheapest = DQACOPLAN_HH;
}
trial += sort_groups;
if ( trial < cost_sorted )
{
cost_sorted = trial;
type_sorted = DQACOPLAN_SHH;
}
}
}
dqaArg->use_hashed_preliminary = use_hashed_preliminary;
dqaArg->cost_sorted = cost_base + cost_sorted;
dqaArg->coplan_type_sorted = type_sorted;
dqaArg->cost_cheapest = cost_base + cost_cheapest;
dqaArg->coplan_type_cheapest = type_cheapest;
dqaArg->distinctkey_collocate = false;
}
/* incremental_sort_cost -- helper for set_coplan_strategies
*/
Cost incremental_sort_cost(double rows, int width, int numKeyCols)
{
Plan dummy;
memset(&dummy, 0, sizeof(dummy));
dummy.plan_rows = rows;
dummy.plan_width = width;
add_sort_cost(NULL, &dummy, numKeyCols, NULL, NULL);
return dummy.total_cost;
}
/* incremental_agg_cost -- helper for set_coplan_strategies
*/
Cost incremental_agg_cost(double rows, int width, AggStrategy strategy,
int numGroupCols, double numGroups,
int numAggs, int transSpace)
{
Plan dummy;
memset(&dummy, 0, sizeof(dummy));
dummy.plan_rows = rows;
dummy.plan_width = width;
add_agg_cost(NULL, &dummy,
NULL, NULL,
strategy, false,
numGroupCols, NULL,
numGroups, 0, numAggs, transSpace);
return dummy.total_cost;
}
/* incremental_motion_cost -- helper for set_coplan_strategies
*/
Cost incremental_motion_cost(double sendrows, double recvrows)
{
Cost cost_per_row = (gp_motion_cost_per_row > 0.0)
? gp_motion_cost_per_row
: 2.0 * cpu_tuple_cost;
return cost_per_row * 0.5 * (sendrows + recvrows);
}
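/*
 * Illustrative arithmetic: with gp_motion_cost_per_row unset and
 * cpu_tuple_cost = 0.01, redistributing 1000 rows (1000 sent, 1000
 * received) costs 2.0 * 0.01 * 0.5 * (1000 + 1000) = 20.0.
 */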
/*
* choose_deduplicate
 * numGroups is an output parameter that receives the estimated number of
 * unique groups.
 *
 * Ideally, we would estimate all the costs of what will actually happen and
 * choose between the naive and the de-duplicate plan. However, that is
 * nearly impossible, because parts of the plan are decided by other
 * components such as cdb_grouping_planner(), including whether a
 * Redistribute is needed. In addition, the estimated number of groups in
 * the multi-column case (i.e. GROUP BY + ORDER BY) is often far from the
 * truth. We could improve the code by adding more cost calculations, but
 * for now it is reasonable to estimate just the major costs of the initial
 * sort/aggregate.
*/
static bool
choose_deduplicate(PlannerInfo *root, List *sortExprs,
Plan *input_plan, double *numGroups)
{
double num_distinct;
double input_rows = input_plan->plan_rows;
Path dummy_path;
Cost naive_cost, dedup_cost;
int32 width;
AggStrategy aggstrategy;
int numGroupCols;
naive_cost = 0;
dedup_cost = 0;
width = input_plan->plan_width;
	/* Account for the extra int8 column that will hold the count. */
width += get_typavgwidth(INT8OID, -1);
numGroupCols = list_length(sortExprs);
/*
* First, calculate cost of naive case.
*/
cost_sort(&dummy_path, root, NIL,
input_plan->total_cost,
input_plan->plan_rows,
input_plan->plan_width);
naive_cost = dummy_path.total_cost;
/*
	 * Next, calculate the cost of the de-duplicate case.
	 * The first aggregate counts the number of duplicates for
	 * each unique sort key; then we add the cost of sorting the
	 * aggregate's output.
*/
num_distinct = estimate_num_groups(root, sortExprs, input_rows);
aggstrategy = AGG_HASHED;
cost_agg(&dummy_path, root, aggstrategy, 1, numGroupCols, num_distinct,
input_plan->startup_cost, input_plan->total_cost,
input_plan->plan_rows, input_plan->plan_width,
0, 0, false);
dummy_path.total_cost +=
incremental_motion_cost(num_distinct,
num_distinct * root->config->cdbpath_segments);
cost_sort(&dummy_path, root, NIL,
dummy_path.total_cost,
num_distinct,
width);
dedup_cost = dummy_path.total_cost;
if (numGroups)
*numGroups = num_distinct;
	/* We need the calculations above (e.g. *numGroups) even if the GUC overrides the cost-based choice. */
if (pg_strcasecmp(gp_idf_deduplicate_str, "force") == 0)
return true;
if (pg_strcasecmp(gp_idf_deduplicate_str, "none") == 0)
return false;
return dedup_cost < naive_cost;
}
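/*
 * Illustrative example: for 1,000,000 input rows with only 10,000 distinct
 * (GROUP BY key, ORDER BY key) combinations, the naive plan sorts all
 * 1,000,000 rows, while the de-duplicate plan hash-aggregates them down to
 * 10,000 rows (plus a count column) and moves and sorts only those, so
 * dedup_cost usually wins by a wide margin.
 */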
/*
* wrap_plan_index
*
 * A wrapper around wrap_plan. The column name aliases are retrieved from
 * the sub-plan's target list automatically. The main additional operation
 * here is to fix up the varno of each Var in both the new target list and
 * hashExpr (qual is not set by wrap_plan). wrap_plan creates a Query whose
 * only RangeTblEntry has index 1, which is usually right, but we sometimes
 * need to wrap a plan in a subquery scan that forms part of an upper join,
 * in which case the varno is not necessarily 1.
 *
 * If varno > 1, the Var references are updated accordingly. As a result,
 * (*query_p)->rtable is no longer valid, since a proper rtable would need
 * RangeTblEntries up to varno. Still, we leave it in the rtable field so
 * the caller can pick up the RTE from there.
 *
 * Note that the plan and the query share an identical target list, which
 * represents the one on the generated plan; the caller still needs to
 * update any other target list such as the upper parse->targetList. This
 * routine does not handle that.
 *
 * This sits somewhere between wrap_plan() and add_subqueryscan(); ideally
 * we would have a more general mechanism.
 *
 * This also updates the locus if given, and root->group_pathkeys, to match
 * the subquery target list.
*/
static Plan *
wrap_plan_index(PlannerInfo *root, Plan *plan, Query *query,
List **p_pathkeys, Index varno, const char *alias_name,
Query **query_p)
{
ListCell *l;
plan = wrap_plan(root, plan, query, p_pathkeys,
alias_name, NIL, query_p);
Assert(varno > 0);
if (varno != 1)
{
foreach (l, plan->flow->hashExpr)
{
Var *var = lfirst(l);
if (IsA(var, Var))
{
/* fix varno, which is set to 1 in wrap_plan */
Assert(var->varno == 1);
var->varno = var->varnoold = varno;
}
}
/*
* Currently, plan and new parse tree shares target list.
* If this breaks, we'll need to update parse's target list as well.
*/
Assert(plan->targetlist == (*query_p)->targetList);
foreach (l, plan->targetlist)
{
TargetEntry *tle = lfirst(l);
Var *var = (Var *) tle->expr;
if (IsA(var, Var))
{
/* fix varno, which is set to 1 in wrap_plan */
Assert(var->varno == 1);
var->varno = var->varnoold = varno;
}
}
Assert(IsA(plan, SubqueryScan));
((SubqueryScan *) plan)->scan.scanrelid = varno;
Assert(plan->qual == NIL);
}
/*
* Update group_pathkeys in order to represent this subquery.
*/
root->group_pathkeys =
make_pathkeys_for_groupclause(root->parse->groupClause, plan->targetlist);
return plan;
}
/*
 * rebuild_simple_rel_and_rte
 *
 * Rebuild the RelOptInfo and RangeTblEntry arrays of the PlannerInfo when
 * the underlying range tables have been transformed. They can be rebuilt
 * from parse->rtable, and will be used later by processes such as
 * distinctClause planning. We never pfree the original arrays, since they
 * are potentially shared with another PlannerInfo from which this one was
 * copied.
*/
static void
rebuild_simple_rel_and_rte(PlannerInfo *root)
{
int i;
int array_size;
ListCell *l;
array_size = list_length(root->parse->rtable) + 1;
root->simple_rel_array_size = array_size;
root->simple_rel_array =
(RelOptInfo **) palloc0(sizeof(RelOptInfo *) * array_size);
root->simple_rte_array =
(RangeTblEntry **) palloc0(sizeof(RangeTblEntry *) * array_size);
i = 1;
foreach (l, root->parse->rtable)
{
(void) build_simple_rel(root, i, RELOPT_BASEREL);
root->simple_rte_array[i] = lfirst(l);
i++;
}
}
/*
* make_parallel_or_sequential_agg
*
 * This is the common pattern for creating either a multi-phase aggregate via
 * cdb_grouping_planner or a single Agg via make_agg. The code was taken from
 * grouping_planner, but simplified somewhat for use under narrower
 * assumptions: we assume there are no GROUPING SETS and that there is at
 * least one aggregate. This function also assumes the caller has already set
 * up the subplan as group_context->subplan.
*
* The caller may not pass current_pathkeys_p if it's not interested.
*/
static Plan *
make_parallel_or_sequential_agg(PlannerInfo *root, AggClauseCounts *agg_counts,
GroupContext *group_context, List **current_pathkeys_p)
{
Plan *result_plan;
List *current_pathkeys;
/*
* current_pathkeys_p can be NULL, which means the caller isn't interested in
* the pathkeys. Still, we are.
*/
if (current_pathkeys_p)
current_pathkeys = *current_pathkeys_p;
else
current_pathkeys = NIL;
result_plan = cdb_grouping_planner(root, agg_counts, group_context);
if (!result_plan)
{
/*
* If cdb_grouping_planner doesn't return a plan,
* it means the plan should fall back to sequential.
* In that case, multi-phase aggregate plan is not used.
* Here it's much simpler than grouping_planner,
* since we are sure we have at least one aggregate function
* and no GROUPING SETS.
*/
AggStrategy aggstrategy;
result_plan = group_context->subplan;
if (group_context->use_hashed_grouping)
{
/*
* HashAggregate case
*/
aggstrategy = AGG_HASHED;
current_pathkeys = NIL;
}
else
{
/*
* GroupAggregate case
*/
if (root->parse->groupClause)
{
if (!pathkeys_contained_in(root->group_pathkeys,
current_pathkeys))
{
result_plan = (Plan *)
make_sort_from_groupcols(root,
root->parse->groupClause,
group_context->groupColIdx,
false,
result_plan);
mark_sort_locus(result_plan);
}
aggstrategy = AGG_SORTED;
current_pathkeys =
make_pathkeys_for_groupclause(root->parse->groupClause, result_plan->targetlist);
current_pathkeys = canonicalize_pathkeys(root, current_pathkeys);
}
else
{
/*
* No GROUP BY case
*/
aggstrategy = AGG_PLAIN;
current_pathkeys = NIL;
}
}
/*
* Now make a single Agg node.
*/
result_plan = (Plan *) make_agg(root,
root->parse->targetList,
(List *) root->parse->havingQual,
aggstrategy,
false,
group_context->numGroupCols,
group_context->groupColIdx,
*group_context->p_dNumGroups,
0, /* num_nullcols */
0, /* input_grouping */
0, /* grouping */
0, /* rollup_gs_times */
agg_counts->numAggs,
agg_counts->transitionSpace,
result_plan);
mark_passthru_locus(result_plan, true, current_pathkeys != NIL);
}
else
current_pathkeys = *group_context->pcurrent_pathkeys;
if (current_pathkeys_p)
*current_pathkeys_p = current_pathkeys;
return result_plan;
}
/*
* deconstruct_within_agg
 *	deconstruct within-aggregate and normal aggregate expressions into sub-pieces.
 *
 * In the split-DQA-join pattern, we want to deconstruct individual aggregate
 * or percentile expressions, group them, and relocate them to the appropriate
 * sub-plan target list. The information about which expression goes to which
 * sub-plan is stored in the MppGroupContext structure, and the function
 * returns a Var that refers to the sub-plan expression with which the
 * original expression is replaced.
*
* See also similar deconstruct_expr() for the general aggregate case.
*/
static Node *
deconstruct_within_agg(Node *node, MppGroupContext *ctx)
{
return deconstruct_within_agg_mutator(node, ctx);
}
/*
* deconstruct_within_agg_mutator
*
* The workhorse of deconstruct_within_agg()
*/
static Node *
deconstruct_within_agg_mutator(Node *node, MppGroupContext *ctx)
{
TargetEntry *tle;
if (node == NULL)
return NULL;
if (IsA(node, Aggref) || IsA(node, PercentileExpr))
{
Index tlistno;
AttrNumber attno;
List *sortclauses, *dref_tlist;
ListCell *l;
Node *final_node;
/*
* Here we may see normal aggregates, not only percentiles.
		 * If normal aggs are involved, ctx->wagSortClauses should contain
		 * NIL elements for them.
*/
if (IsA(node, PercentileExpr))
sortclauses = ((PercentileExpr *) node)->sortClause;
else
sortclauses = NIL;
/*
* Find the right sub-plan which this expression should go.
*/
tlistno = 0;
foreach (l, ctx->wagSortClauses)
{
/* Note NIL can be equal to NIL, too. */
if (equal(sortclauses, lfirst(l)))
break;
tlistno++;
}
/* Not found? Should not happen... */
if (!l)
elog(ERROR, "unexpected use of aggregate");
dref_tlist = ctx->dref_tlists[tlistno];
/*
* If the same expression exists at the same level, recycle it.
* Otherwise, create a new expression.
*/
tle = tlist_member(node, dref_tlist);
attno = list_length(dref_tlist) + 1;
if (!tle)
{
/*
			 * Don't copy the node; share it with the tlist, so that later
			 * operations can modify the var references in the tlist.
*/
tle = makeTargetEntry((Expr *) node,
attno, NULL, false);
dref_tlist = lappend(dref_tlist, tle);
}
else
attno = tle->resno;
ctx->dref_tlists[tlistno] = dref_tlist;
final_node = (Node *) makeVar(dqa_base_varno + tlistno,
attno, exprType((Node *) tle->expr),
exprTypmod((Node *) tle->expr), 0);
return final_node;
}
/*
* If the given expression is a grouping expression, replace it with
* a Var node referring to the (lower) preliminary aggregation's
* target list.
*/
tle = tlist_member(node, ctx->grps_tlist);
if (tle != NULL)
return (Node *) makeVar(grp_varno, tle->resno,
exprType((Node *) tle->expr),
exprTypmod((Node *) tle->expr), 0);
return expression_tree_mutator(node, deconstruct_within_agg_mutator, ctx);
}
/*
* Returns percentile expressions associated with the specified sortClause.
*
 * Because we want any changes to the percentile expressions in the target
 * list or the HAVING list to be applied identically, we extract those
 * expressions every time they are needed. Consistency is worth more here
 * than the few cycles we could save by copying those nodes once.
*/
static List *
fetch_percentiles(Query *parse, List *sortClause)
{
List *nodes, *result;
ListCell *l;
nodes = list_concat(extract_nodes(NULL, (Node *) parse->targetList, T_PercentileExpr),
extract_nodes(NULL, (Node *) parse->havingQual, T_PercentileExpr));
nodes = list_concat(nodes,
extract_nodes(NULL, (Node *) parse->scatterClause, T_PercentileExpr));
result = NIL;
foreach (l, nodes)
{
PercentileExpr *p = lfirst(l);
if (equal(sortClause, p->sortClause))
result = lappend(result, p);
}
return result;
}
/*
* make_deduplicate_plan
*
 * Build a plan that removes duplicate rows using the GROUP BY keys plus the
 * (WITHIN) ORDER BY key. It may or may not use a two-phase aggregate
 * internally. Note that the output plan has a different target list than the
 * non-deduplicate plan, since the aggregate cannot carry un-grouped columns.
*/
static Plan *
make_deduplicate_plan(PlannerInfo *root,
GroupContext *group_context,
List *groupClause,
List *sortClause,
double numGroups,
AttrNumber *pc_pos_p,
List **current_pathkeys_p,
Plan *subplan)
{
Plan *result_plan;
Aggref *aggref;
GroupContext ctx;
ListCell *l1, *l2;
TargetEntry *pc_tle;
List *tlist;
int numGroupCols;
AttrNumber *groupColIdx;
List *pathkeys = NIL;
bool querynode_changed = false;
AggClauseCounts agg_counts;
bool use_hashed_grouping;
Query *original_parse, *parse;
List *original_group_pathkeys;
List *sub_tlist = group_context->sub_tlist;
Expr *tvexpr; /* ORDER BY expression */
const Index Outer = 1;
/*
	 * It would be possible to just concatenate groupClause and sortClause,
	 * but it is semantically cleaner to convert sortClause into groupClause
	 * entries. In particular, we want to use make_pathkeys_for_groupclause
	 * later, which does not handle sortClause.
*
* Copy input groupClause, since we change it.
*/
groupClause = copyObject(groupClause);
foreach (l1, sortClause)
{
SortClause *sc = copyObject(lfirst(l1));
sc->type = T_GroupClause;
groupClause = lappend(groupClause, sc);
}
groupColIdx = get_grouplist_colidx(groupClause, sub_tlist, &numGroupCols);
/*
* Make target list derived from sub_tlist. Note that we filter out
* ungrouped columns which will be bogus after the aggregate.
*/
tlist = NIL;
foreach (l1, sub_tlist)
{
TargetEntry *tle = lfirst(l1);
TargetEntry *newtle;
/*
* Check if this target is a part of grouping columns.
*/
foreach (l2, groupClause)
{
GroupClause *gc = lfirst(l2);
if (gc->tleSortGroupRef == tle->ressortgroupref)
break;
}
/* Found, so add it */
if (l2)
{
newtle = flatCopyTargetEntry(tle);
newtle->resno = list_length(tlist) + 1;
tlist = lappend(tlist, newtle);
}
}
/*
	 * Count the ORDER BY expression (rather than *), so that NULL input
	 * is ignored by the count. We must not eliminate the NULL input rows
	 * themselves, though, since a result must still be returned for each
	 * group even if the group has nothing but NULLs.
*/
tvexpr = (Expr *)
get_sortgroupclause_expr(linitial(sortClause), sub_tlist);
/*
* Append peer count expression to target list.
*/
*pc_pos_p = list_length(tlist) + 1;
aggref = makeAggrefByOid(AGGFNOID_COUNT_ANY, list_make1(tvexpr));
pc_tle = makeTargetEntry((Expr *) aggref,
*pc_pos_p, "peer_count", false);
tlist = lappend(tlist, pc_tle);
MemSet(&agg_counts, 0, sizeof(AggClauseCounts));
count_agg_clauses((Node *) tlist, &agg_counts);
use_hashed_grouping = choose_hashed_grouping(root,
group_context->tuple_fraction,
group_context->cheapest_path,
NULL,
numGroups,
&agg_counts);
use_hashed_grouping = agg_counts.canHashAgg;
ctx.best_path = group_context->best_path;
ctx.cheapest_path = group_context->cheapest_path;
ctx.subplan = subplan;
ctx.sub_tlist = NIL;
ctx.tlist = tlist;
ctx.use_hashed_grouping = use_hashed_grouping;
ctx.tuple_fraction = 0.1;
ctx.canonical_grpsets = NULL;
ctx.grouping = 0;
ctx.numGroupCols = numGroupCols;
ctx.groupColIdx = groupColIdx;
ctx.numDistinctCols = 0;
ctx.distinctColIdx = NULL;
ctx.p_dNumGroups = &numGroups;
ctx.pcurrent_pathkeys = &pathkeys;
ctx.querynode_changed = &querynode_changed;
original_parse = root->parse;
original_group_pathkeys = root->group_pathkeys;
root->parse = parse = copyObject(root->parse);
root->parse->groupClause = groupClause;
root->parse->targetList = tlist;
root->parse->havingQual = NULL;
root->parse->scatterClause = NIL;
root->group_pathkeys = make_pathkeys_for_groupclause(groupClause, tlist);
/*
* Make a multi-phase or simple agg plan.
*/
result_plan = make_parallel_or_sequential_agg(root,
&agg_counts,
&ctx,
current_pathkeys_p);
root->parse = original_parse;
root->group_pathkeys = original_group_pathkeys;
/*
	 * Wrap this in a SubqueryScan in any case, so that the outcome of the
	 * de-duplication can be treated as a simple subquery relation.
*/
result_plan = wrap_plan_index(root,
result_plan,
parse,
current_pathkeys_p,
Outer,
"deduplicate",
&parse);
/*
	 * Update the references based on the aggregate target list. This ensures
	 * that the target list of the parse tree points at the top SubqueryScan's Vars.
*/
root->parse->targetList = (List *)
cdbpullup_expr((Expr *) root->parse->targetList, tlist, NIL, Outer);
root->parse->havingQual = (Node *)
cdbpullup_expr((Expr *) root->parse->havingQual, tlist, NIL, Outer);
root->parse->scatterClause = (List *)
cdbpullup_expr((Expr *) root->parse->scatterClause, tlist, NIL, Outer);
root->parse->rtable = parse->rtable;
	/*
	 * Rebuild the RelOptInfo and RangeTblEntry arrays for the PlannerInfo,
	 * since the underlying range tables have been transformed.
	 */
rebuild_simple_rel_and_rte(root);
return result_plan;
}
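/*
 * Illustrative effect of de-duplication: for a group g with ORDER BY values
 * (10, 10, 10, 20, NULL), the de-duplicated output is
 * (g, 10, peer_count=3), (g, 20, peer_count=1), (g, NULL, peer_count=0);
 * summing peer_count later recovers the non-NULL row count of 4.
 */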
/*
* within_agg_make_baseplan
*
* Creates the scaffold plan which will be shared between the outer and
* inner plans. It may choose de-duplicate plan based on costs and GUC.
* The output plan is always a closed SubqueryScan. This will set
* pc_pos in wag_context.
*/
static Plan *
within_agg_make_baseplan(PlannerInfo *root,
GroupContext *group_context,
WithinAggContext *wag_context,
List *sortClause,
Plan *result_plan)
{
List *sub_tlist = group_context->sub_tlist;
double dedup_numGroups;
List *dedup_key_exprs;
/*
* The GROUP BY keys are the normal grouping keys + sort key.
*/
dedup_key_exprs = list_concat(
get_sortgrouplist_exprs(root->parse->groupClause, sub_tlist),
get_sortgrouplist_exprs(sortClause, sub_tlist));
/*
* Decide whether deduplicate is useful or not.
*/
wag_context->use_deduplicate = choose_deduplicate(root,
dedup_key_exprs,
result_plan,
&dedup_numGroups);
/*
	 * Create the base subplan for the upper join. Whether or not we take the
	 * de-duplicate path, the target list of result_plan gets an extra target
	 * entry for the peer count.
*/
if (wag_context->use_deduplicate)
{
/*
		 * The de-duplicate optimization. We collapse identical rows
		 * and record how many rows were collapsed, so that the
		 * percentile function can still account for the original rows.
		 * It's similar to run-length encoding.
*
* root->parse is updated inside to represent this subquery.
*/
result_plan = make_deduplicate_plan(root,
group_context,
root->parse->groupClause,
sortClause,
dedup_numGroups,
&wag_context->pc_pos,
&wag_context->current_pathkeys,
result_plan);
}
else
{
Query *subquery;
Expr *tv_expr;
NullTest *nt;
CaseWhen *casearg;
CaseExpr *pc_expr;
TargetEntry *pc_tle;
/*
		 * The naive case. We wrap this plan in a SubqueryScan regardless,
		 * because the underlying plan might be a SharedInputScan whose
		 * target list must not be modified, and to keep the semantics
		 * aligned with the de-duplicate case.
*/
result_plan = wrap_plan_index(root,
result_plan,
root->parse,
&wag_context->current_pathkeys,
1,
"nondeduplicate",
&subquery);
root->parse->targetList = (List *)
cdbpullup_expr((Expr *) root->parse->targetList, sub_tlist, NIL, 1);
root->parse->havingQual = (Node *)
cdbpullup_expr((Expr *) root->parse->havingQual, sub_tlist, NIL, 1);
root->parse->scatterClause = (List *)
cdbpullup_expr((Expr *) root->parse->scatterClause, sub_tlist, NIL, 1);
root->parse->rtable = subquery->rtable;
/*
		 * We use zero as the peer count if tv is NULL; the inner plan
		 * should count only the non-NULL values.
*
* pc = CASE WHEN tv IS NOT NULL THEN 1 ELSE 0 END
*/
tv_expr = (Expr *)
get_sortgroupclause_expr(linitial(sortClause), result_plan->targetlist);
nt = makeNode(NullTest);
nt->arg = tv_expr;
nt->nulltesttype = IS_NOT_NULL;
casearg = makeNode(CaseWhen);
casearg->expr = (Expr *) nt;
casearg->result = (Expr *)
makeConst(INT8OID, -1, 8, Int64GetDatum(1), false, true);
pc_expr = makeNode(CaseExpr);
pc_expr->casetype = INT8OID;
pc_expr->arg = NULL;
pc_expr->args = list_make1(casearg);
pc_expr->defresult = (Expr *)
makeConst(INT8OID, -1, 8, Int64GetDatum(0), false, true);
pc_tle = makeTargetEntry((Expr *) pc_expr,
list_length(result_plan->targetlist) + 1,
pstrdup("peer_count"),
false);
wag_context->pc_pos = pc_tle->resno;
result_plan->targetlist = lappend(result_plan->targetlist, pc_tle);
}
/*
	 * result_plan is a SubqueryScan here, whichever path we took. Its locus
	 * should represent this subquery.
*/
Assert(IsA(result_plan, SubqueryScan));
return result_plan;
}
/*
* within_agg_add_outer_sort
*
* Adds Sort to the outer plan, with Motion if desired. This is very
* straightforward and does not contain within-aggregate specific stuff.
* current_pathkeys in wag_context may be updated.
*/
static Plan *
within_agg_add_outer_sort(PlannerInfo *root,
WithinAggContext *wag_context,
List *sortClause,
Plan *outer_plan)
{
List *sort_pathkeys;
Query *outer_parse;
const Index Outer = 1;
if (!root->parse->groupClause)
{
/*
		 * Plain aggregate case. Gather tuples onto a single qExec.
*/
sort_pathkeys =
make_pathkeys_for_sortclauses(sortClause, outer_plan->targetlist);
sort_pathkeys = canonicalize_pathkeys(root, sort_pathkeys);
/*
* Check the sort redundancy.
*/
if (!pathkeys_contained_in(sort_pathkeys, wag_context->current_pathkeys))
{
outer_plan = (Plan *)
make_sort_from_sortclauses(root, sortClause, outer_plan);
mark_sort_locus(outer_plan);
wag_context->current_pathkeys = sort_pathkeys;
}
if (outer_plan->flow->flotype != FLOW_SINGLETON)
{
outer_plan = (Plan *) make_motion_gather_to_QE(outer_plan, true);
outer_plan->total_cost +=
incremental_motion_cost(outer_plan->plan_rows,
outer_plan->plan_rows * root->config->cdbpath_segments);
}
}
else
{
CdbPathLocus current_locus;
List *groupSortClauses;
Assert(root->group_pathkeys);
/*
* Create current locus.
*/
current_locus = cdbpathlocus_from_flow(outer_plan->flow);
if (CdbPathLocus_IsPartitioned(current_locus) &&
outer_plan->flow->hashExpr)
current_locus = cdbpathlocus_from_exprs(root, outer_plan->flow->hashExpr);
/*
* Add a redistribute motion if the group key doesn't collocate.
* group_pathkeys should have been fixed to reflect the latest targetlist.
		 * best_path->locus is wrong here since we have already inserted a SubqueryScan.
*/
if (!cdbpathlocus_collocates(current_locus, root->group_pathkeys, false /*exact_match*/))
{
List *groupExprs;
groupExprs = get_sortgrouplist_exprs(root->parse->groupClause,
outer_plan->targetlist);
outer_plan = (Plan *)
make_motion_hash(root, outer_plan, groupExprs);
outer_plan->total_cost +=
incremental_motion_cost(outer_plan->plan_rows,
outer_plan->plan_rows);
/*
* Invalidate pathkeys; the result is not sorted any more.
*/
wag_context->current_pathkeys = NULL;
}
/*
* Now we can add sort node.
*/
groupSortClauses = list_concat(copyObject(root->parse->groupClause),
sortClause);
sort_pathkeys =
make_pathkeys_for_sortclauses(groupSortClauses, outer_plan->targetlist);
sort_pathkeys = canonicalize_pathkeys(root, sort_pathkeys);
if (!pathkeys_contained_in(sort_pathkeys, wag_context->current_pathkeys))
{
outer_plan = (Plan *) make_sort_from_sortclauses(root,
groupSortClauses,
outer_plan);
mark_sort_locus(outer_plan);
/*
* Update current pathkeys.
*/
wag_context->current_pathkeys = sort_pathkeys;
}
}
/*
* Prepare parse tree for wrapping it by SubqueryScan.
*/
outer_parse = copyObject(root->parse);
root->parse->targetList = (List *)
cdbpullup_expr((Expr *) root->parse->targetList, outer_plan->targetlist, NIL, 1);
root->parse->havingQual = (Node *)
cdbpullup_expr((Expr *) root->parse->havingQual, outer_plan->targetlist, NIL, 1);
root->parse->scatterClause = (List *)
cdbpullup_expr((Expr *) root->parse->scatterClause, outer_plan->targetlist, NIL, 1);
/*
* Wrap plan by subquery as the outer of upcoming join.
*/
outer_plan = wrap_plan_index(root,
outer_plan,
outer_parse,
&wag_context->current_pathkeys,
Outer,
"outer_plan",
&outer_parse);
Assert(list_length(wag_context->rtable) == 0);
wag_context->rtable = list_concat(wag_context->rtable, outer_parse->rtable);
return outer_plan;
}
/*
* within_agg_construct_inner
*
 * Constructs the inner plan, which calculates the total non-NULL row count
 * of the input. The aggregate is actually not count() but sum() of the
 * peer count, which already accounts for NULL input in both the naive and
 * the de-duplicate cases. Updates wag_context->tc_pos; the output plan is
 * a SubqueryScan ready for the join.
*/
static Plan *
within_agg_construct_inner(PlannerInfo *root,
GroupContext *group_context,
WithinAggContext *wag_context,
Plan *inner_plan)
{
ListCell *l;
int idx;
int numGroupCols;
Path input_path;
TargetEntry *pc_tle;
Expr *tc_expr;
GroupContext ctx;
List *tlist;
double numGroups = *group_context->p_dNumGroups;
bool use_hashed_grouping;
bool querynode_changed = false;
List *pathkeys = NIL;
AggClauseCounts agg_counts;
AttrNumber *grpColIdx;
Query *original_parse;
List *original_group_pathkeys;
Query *parse;
const Index Inner = 2;
grpColIdx = get_grouplist_colidx(root->parse->groupClause,
inner_plan->targetlist, &numGroupCols);
/* build grouping key columns */
tlist = NIL;
foreach_with_count (l, root->parse->groupClause, idx)
{
GroupClause *gc = (GroupClause *) lfirst(l);
TargetEntry *tle, *newtle;
tle = get_sortgroupclause_tle(gc, inner_plan->targetlist);
newtle = flatCopyTargetEntry(tle);
newtle->resno = (AttrNumber) idx + 1;
tlist = lappend(tlist, newtle);
}
/*
* Sum up the peer count to count the total number of rows per group.
*/
pc_tle = get_tle_by_resno(inner_plan->targetlist, wag_context->pc_pos);
tc_expr = (Expr *) makeAggrefByOid(AGGFNOID_SUM_BIGINT,
list_make1(pc_tle->expr));
tc_expr = (Expr *) coerce_to_bigint(NULL, (Node *) tc_expr, "sum_to_bigint");
wag_context->tc_pos = list_length(tlist) + 1;
tlist = lappend(tlist, makeTargetEntry((Expr *) tc_expr,
wag_context->tc_pos,
"total_count",
false));
/*
	 * best_path is no longer appropriate here, after having built SubqueryScans.
	 * Build a dummy Path to reflect the underlying plan; the only piece of
	 * information cdb_grouping_planner needs from it is the locus.
*/
memcpy(&input_path, group_context->best_path, sizeof(Path));
/*
	 * Re-create the locus from the flow. Unfortunately cdbpathlocus_from_flow()
	 * doesn't return a hashed locus in the repartitioned case, so we need to
	 * call cdbpathlocus_from_exprs() if hashExpr is available.
*/
input_path.locus = cdbpathlocus_from_flow(inner_plan->flow);
if (CdbPathLocus_IsPartitioned(input_path.locus) &&
inner_plan->flow->hashExpr)
input_path.locus = cdbpathlocus_from_exprs(root, inner_plan->flow->hashExpr);
input_path.pathkeys = wag_context->inner_pathkeys;
MemSet(&agg_counts, 0, sizeof(AggClauseCounts));
count_agg_clauses((Node *) tlist, &agg_counts);
/*
	 * Re-evaluate the hash vs. sort strategy. Things have changed
	 * since the last decision in grouping_planner(), as the base plan
	 * may now be sorted.
*/
use_hashed_grouping = choose_hashed_grouping(root,
group_context->tuple_fraction,
&input_path,
&input_path,
numGroups,
&agg_counts);
ctx.best_path = &input_path;
ctx.cheapest_path = &input_path;
ctx.subplan = inner_plan;
ctx.sub_tlist = NIL;
ctx.tlist = tlist;
ctx.use_hashed_grouping = use_hashed_grouping;
ctx.tuple_fraction = 0.1;
ctx.canonical_grpsets = NULL;
ctx.grouping = 0;
ctx.numGroupCols = numGroupCols;
ctx.groupColIdx = grpColIdx;
ctx.numDistinctCols = 0;
ctx.distinctColIdx = NULL;
ctx.p_dNumGroups = &numGroups;
ctx.pcurrent_pathkeys = &pathkeys;
ctx.querynode_changed = &querynode_changed;
/*
* Save the parse tree, for cdb_grouping_planner will modify it.
*/
original_parse = root->parse;
original_group_pathkeys = root->group_pathkeys;
root->parse = parse = copyObject(root->parse);
root->parse->targetList = tlist;
root->parse->havingQual = NULL;
root->parse->scatterClause = NIL;
root->group_pathkeys =
make_pathkeys_for_sortclauses(parse->groupClause, tlist);
/*
* Make a multi-phase or simple aggregate plan.
*/
inner_plan = make_parallel_or_sequential_agg(root,
&agg_counts,
&ctx,
&wag_context->inner_pathkeys);
/*
* Wrap plan by subquery as the inner of upcoming join.
*/
inner_plan = wrap_plan_index(root,
inner_plan,
parse,
&wag_context->inner_pathkeys,
Inner,
"inner_plan",
&parse);
wag_context->rtable = list_concat(wag_context->rtable, parse->rtable);
/* outer + inner = 2 */
Assert(list_length(wag_context->rtable) == 2);
/*
* Restore the original info. Note that group_pathkeys is updated
* in wrap_plan_index(), so don't move this before it.
*/
root->parse = original_parse;
root->group_pathkeys = original_group_pathkeys;
return inner_plan;
}
/*
* within_agg_join_plans
*
 * Joins the outer and inner plans and filters out unrelated rows. We use a
 * MergeJoin because the outer is already sorted and we want to keep that
 * order for the aggregate. MergeJoin requires the inner plan to be
 * re-scannable, so we add a Sort or Material node on the inner side. The
 * join should produce exactly as many rows as the outer plan, since the
 * join clause is on the GROUP BY keys and the inner is grouped by them.
*/
static Plan *
within_agg_join_plans(PlannerInfo *root,
GroupContext *group_context,
WithinAggContext *wag_context,
Plan *outer_plan,
Plan *inner_plan)
{
Plan *result_plan;
ListCell *l;
int idx;
List *join_tlist;
List *join_clause;
const Index Outer = 1, Inner = 2;
List *extravars;
Var *pc_var, *tc_var;
/*
* Up to now, these should've been prepared.
*/
Assert(wag_context->pc_pos > 0);
Assert(wag_context->tc_pos > 0);
/*
	 * Build join clauses on the grouping columns.
*/
join_clause = NIL;
foreach_with_count (l, root->parse->groupClause, idx)
{
GroupClause *gc = (GroupClause*) lfirst(l);
TargetEntry *tle;
Var *outer_var, *inner_var;
Expr *qual;
/*
* Construct outer group keys.
*/
tle = get_sortgroupclause_tle(gc, outer_plan->targetlist);
Assert(tle && IsA(tle->expr, Var));
outer_var = makeVar(Outer, tle->resno,
exprType((Node *) tle->expr),
exprTypmod((Node *) tle->expr), 0);
/*
* Construct inner group keys.
*/
tle = get_tle_by_resno(inner_plan->targetlist, idx + 1);
Assert(tle && IsA(tle->expr, Var));
inner_var = makeVar(Inner, tle->resno,
exprType((Node *) tle->expr),
exprTypmod((Node *) tle->expr), 0);
/*
* Make join clause for group keys.
*/
qual = make_op(NULL, list_make1(makeString("=")),
(Node *) outer_var, (Node *) inner_var, -1);
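		/*
		 * Convert the '=' into NOT (outer IS DISTINCT FROM inner) so that
		 * NULL grouping keys join as equal.
		 */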
qual->type = T_DistinctExpr;
qual = make_notclause(qual);
join_clause = lappend(join_clause, qual);
}
/*
* This is similar to make_subplanTargetList(), but things are much simpler.
* Note that this makes sure that expressions like SRF are going to be
* in the upper aggregate target list rather than in this join target list.
*/
join_tlist = flatten_tlist(root->parse->targetList);
extravars = pull_var_clause(root->parse->havingQual, false);
join_tlist = add_to_flat_tlist(join_tlist, extravars, false);
foreach (l, root->parse->groupClause)
{
GroupClause *gc = lfirst(l);
TargetEntry *gc_tle, *join_tle;
/*
		 * We need the grouping expressions in the target list. If they are
		 * in the target list already, we restore the grouping reference,
		 * since extracting vars drops that information. Otherwise, we
		 * simply append the entry to the target list.
*/
gc_tle = get_sortgroupclause_tle(gc, root->parse->targetList);
join_tle = tlist_member((Node *) gc_tle->expr, join_tlist);
if (join_tle)
{
join_tle->ressortgroupref = gc->tleSortGroupRef;
join_tle->resname = gc_tle->resname;
}
else
{
join_tle = flatCopyTargetEntry(gc_tle);
join_tlist = lappend(join_tlist, join_tle);
}
}
/*
	 * Make sure that the peer count and the total count are in the
	 * target list of the join. They will be needed by the percentile
	 * functions in the upper final aggregate.
*/
pc_var = makeVar(Outer, wag_context->pc_pos, INT8OID, -1, 0);
tc_var = makeVar(Inner, wag_context->tc_pos, INT8OID, -1, 0);
join_tlist = add_to_flat_tlist(join_tlist,
list_make2((void *) pc_var, (void *) tc_var),
false);
/*
	 * Ideally we would determine whether the inner plan is suitable for the
	 * merge join by checking whether it is re-scannable, but we don't have
	 * such infrastructure, so here we assume the inner plan is neither
	 * re-scannable nor sorted. If this is a grouping query, we add a Sort
	 * node; otherwise we just add a Materialize node.
*/
if (root->parse->groupClause && !wag_context->inner_pathkeys)
{
AttrNumber *grpColIdx;
grpColIdx = get_grouplist_colidx(root->parse->groupClause,
inner_plan->targetlist, NULL);
inner_plan = (Plan *)
make_sort_from_groupcols(root,
root->parse->groupClause,
grpColIdx,
false,
inner_plan);
mark_sort_locus(inner_plan);
}
else
{
inner_plan = (Plan *) make_material(inner_plan);
mark_passthru_locus(inner_plan, true, true);
}
/*
	 * All set. Join the two plans.
	 * We use a cartesian product if there are no join clauses, which means
	 * no grouping happens.
*/
if (list_length(join_clause) > 0)
result_plan = (Plan *) make_mergejoin(join_tlist,
NIL,
NIL,
join_clause,
outer_plan,
inner_plan,
JOIN_INNER);
else
result_plan = (Plan *) make_nestloop(join_tlist,
NIL,
NIL,
outer_plan,
inner_plan,
JOIN_INNER);
result_plan->startup_cost = outer_plan->startup_cost + inner_plan->startup_cost;
result_plan->plan_rows = outer_plan->plan_rows;
result_plan->total_cost = outer_plan->total_cost + inner_plan->total_cost;
result_plan->total_cost += cpu_tuple_cost * result_plan->plan_rows;
mark_passthru_locus(result_plan, true, true);
return result_plan;
}
/*
* within_agg_final_agg
*
 * The final stage of plan_within_agg_persort(). The input plans are already
 * joined, and the total count from the inner plan is now available. The
 * peer count and the total count are attached to each PercentileExpr so that
 * the percentile function can access those values in the executor.
*/
static Plan *
within_agg_final_agg(PlannerInfo *root,
GroupContext *group_context,
WithinAggContext *wag_context,
List *sortClause,
Plan *result_plan)
{
ListCell *l;
List *percentiles;
Var *pc_var, *tc_var;
AttrNumber *grpColIdx;
int numGroupCols;
AggClauseCounts agg_counts;
AggStrategy aggstrategy;
const Index Outer = 1, Inner = 2;
/*
* Sanity check. These should've been prepared up to now.
*/
Assert(wag_context->pc_pos > 0);
Assert(wag_context->tc_pos > 0);
/*
* Attach the peer count and the total count expressions to
* PercentileExpr, which will be needed in the executor.
*/
percentiles = fetch_percentiles(root->parse, sortClause);
pc_var = makeVar(Outer, wag_context->pc_pos, INT8OID, -1, 0);
tc_var = makeVar(Inner, wag_context->tc_pos, INT8OID, -1, 0);
foreach (l, percentiles)
{
PercentileExpr *perc = lfirst(l);
perc->pcExpr = (Expr *) pc_var;
perc->tcExpr = (Expr *) tc_var;
}
MemSet(&agg_counts, 0, sizeof(AggClauseCounts));
count_agg_clauses((Node *) root->parse->targetList, &agg_counts);
/*
* Prepare GROUP BY clause for the final aggregate.
* Make sure the column indices point to the topmost target list.
*/
grpColIdx = get_grouplist_colidx(root->parse->groupClause,
result_plan->targetlist, &numGroupCols);
aggstrategy = root->parse->groupClause ? AGG_SORTED : AGG_PLAIN;
result_plan = (Plan *) make_agg(root,
root->parse->targetList,
(List *) root->parse->havingQual,
aggstrategy,
false,
numGroupCols,
grpColIdx,
*group_context->p_dNumGroups,
0, /* num_nullcols */
0, /* input_grouping */
0, /* grouping */
0, /* rollup_gs_times */
1, /* numAggs */
agg_counts.transitionSpace,
result_plan);
/*
* Stop copying sorts into the flow, since the target list doesn't have them anymore.
*/
mark_passthru_locus(result_plan, true, false);
/*
* current_pathkeys is not needed anymore, but reset it just in case.
*/
wag_context->current_pathkeys = NIL;
return result_plan;
}
/*
* plan_within_agg_persort
* Make a series of plans to calculate percentiles that share an identical
* sort clause.
*
* The main flow of this planner is:
* - Build the base plan
* - Sort the input by the grouping keys and the target value
* - Split the input into two:
*   - one for the outer side
*   - the other for the inner side, aggregated to count the actual number
*     of rows
* - Join the two trees to bring the inner row count together with the
*   sorted outer rows
* - Aggregate the result of the join to perform the final aggregation
*
* The first step is to figure out whether we can optimize the input by
* de-duplicating it. If so, we create a sub-plan with an aggregate that
* reduces the rows to one per GROUP + ORDER combination and adds the peer
* count for each such unit. If we don't take the de-duplication path, we
* append the constant 1 as the peer count to the target list so that the
* semantics match the de-duplicated plan. In either case, the input plan
* is wrapped in a SubqueryScan so as not to confuse the upper plan. The
* peer count takes NULLs into account, so that the percentile function can
* operate on only non-NULL target values.
*
* The next step is the outer plan: sort the input tuples by the group
* clause and the target value. During this process, the input tuples are
* distributed by the group clause. We need to keep this order until the
* final aggregate so that the final percentile function can tell the
* ordering of the input.
*
* Then we look at the inner side. It basically calculates the number of
* rows per group. Since we know the peer count for each distinct target
* value in a group (in either the naive or the de-duplicated approach), we
* actually take sum() rather than count() to calculate the total number of
* non-NULL target values.
*
* Now we can join the two plans. The inner total count is attached to the
* target value for each group here. Note that we use a Merge Join to keep
* the outer order, and the inner side must be re-scannable as the Merge
* Join requires.
*
* Finally, the rows are aggregated per group. Since PercentileExpr needs
* to know the peer count and the total count in conjunction with its
* original argument and the target value (as the ORDER BY clause), and we
* now know where those values are, here we attach those expressions to the
* PercentileExpr nodes. These additional expressions are similar to
* AggOrder in Aggref, and will be combined with the original argument
* expression in the executor for evaluation.
*
* The final result of this function is a plan ending in an Agg node whose
* target list includes each percentile result. Note that, as an
* encapsulation, this function should not change the semantics of the
* input target list.
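*
* As a rough illustration (a hypothetical query; the plan shape is
* simplified and omits SubqueryScans and Motions), a query like
*
*   SELECT dept, percentile_cont(0.5) WITHIN GROUP (ORDER BY salary)
*   FROM emp GROUP BY dept;
*
* is planned along the lines of
*
*   Agg                              -- final percentile evaluation per dept
*     -> Merge Join                  -- attach the total count to each row
*          -> Sort (dept, salary)           -- outer: ordered rows + peer count
*          -> Agg (sum of peer counts)      -- inner: total count per dept
*               (both sides are fed from the shared base plan)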
*/
static Plan *
plan_within_agg_persort(PlannerInfo *root,
GroupContext *group_context,
List *sortClause,
List *current_pathkeys,
Plan *result_plan)
{
WithinAggContext wag_context;
Plan *outer_plan, *inner_plan;
List *partners;
ListCell *l;
memset(&wag_context, 0, sizeof(WithinAggContext));
wag_context.current_pathkeys = current_pathkeys;
/*
* Group clause expressions should be in ascending order,
* because our MergeJoin is not able to handle descending-ordered
* child plans. It would be desirable to improve MergeJoin, but that
* requires a fair amount of work.
*/
foreach (l, root->parse->groupClause)
{
GroupClause *gc = lfirst(l);
Node *gcexpr;
Oid gctype;
/*
* We assume only flattened grouping expressions here.
*/
Assert(IsA(gc, GroupClause));
gcexpr = get_sortgroupclause_expr(gc, group_context->sub_tlist);
gctype = exprType(gcexpr);
gc->sortop = ordering_oper_opid(gctype);
}
/*
* Make a common plan shared by the outer and inner plans. It may become
* a de-duplicating plan.
*/
result_plan = within_agg_make_baseplan(root,
group_context,
&wag_context,
sortClause,
result_plan);
Assert(wag_context.pc_pos > 0);
Assert(IsA(result_plan, SubqueryScan));
/*
* Split the tree into outer and inner plans, which will be joined later.
* The split comes before the Sort, so that both the outer and the inner
* plans run in parallel. We observed that in most cases splitting here
* requires a Redistribute Motion on both sides, which allows more parallelism.
*/
partners = share_plan(root, result_plan, 2);
outer_plan = list_nth(partners, 0);
inner_plan = list_nth(partners, 1);
/*
* Keep the pre-SubqueryScan pathkeys for the inner plan.
*/
wag_context.inner_pathkeys = wag_context.current_pathkeys;
/*
* Add a Sort node on top of the join's outer plan if necessary.
*/
outer_plan = within_agg_add_outer_sort(root, &wag_context, sortClause, outer_plan);
Assert(IsA(outer_plan, SubqueryScan));
Assert(list_length(wag_context.rtable) == 1);
/*
* Construct the join's inner plan, which returns only the number of rows.
* The inner side always creates a target list that looks like
* G1, G2, ..., count(*) TP
*/
inner_plan = within_agg_construct_inner(root,
group_context,
&wag_context,
inner_plan);
Assert(IsA(inner_plan, SubqueryScan));
Assert(wag_context.tc_pos > 0);
Assert(list_length(wag_context.rtable) == 2);
/*
* Merge-join to find the interesting one or two rows per group per
* percentile.
*/
result_plan = within_agg_join_plans(root,
group_context,
&wag_context,
outer_plan,
inner_plan);
#ifdef NOT_USED
if (true)
{
/*
* For debugging purposes.
* This helps to see what the intermediate result looks like.
*/
root->parse->targetList = copyObject(result_plan->targetlist);
root->parse->rtable = wag_context.rtable;
return result_plan;
}
#endif
/*
* Finally, put an Agg node on top to calculate the exact values from the few filtered rows.
*/
result_plan = within_agg_final_agg(root,
group_context,
&wag_context,
sortClause,
result_plan);
root->parse->targetList = copyObject(result_plan->targetlist);
root->parse->rtable = wag_context.rtable;
rebuild_simple_rel_and_rte(root);
return result_plan;
}
/*
* within_agg_planner
*
* This is the entry point of the within-aggregate planner.
* It makes a specialized plan for within-aggregates. If we find that the
* query has normal aggregates among the within-aggregates, or that the
* within-aggregates have different sortClauses, we generate multiple
* sub-plans, one per sortClause, and combine the results by joining those
* sub-plan trees. For the normal aggregates, we call cdb_grouping_planner()
* to plan the tree, and treat it as one of the split sub-plans.
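*
* For illustration (a hypothetical query), something like
*
*   SELECT dept,
*          count(*),
*          percentile_cont(0.5) WITHIN GROUP (ORDER BY salary),
*          percentile_cont(0.9) WITHIN GROUP (ORDER BY bonus)
*   FROM emp GROUP BY dept;
*
* is split into three sub-plans: the NIL sort group for the normal
* aggregate count(*), one group for the percentile ordered by salary,
* and one for the percentile ordered by bonus. The sub-plan trees are
* then joined on dept to assemble the requested target list.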
*/
Plan *
within_agg_planner(PlannerInfo *root,
AggClauseCounts *agg_counts,
GroupContext *group_context)
{
List *aggnodes, *percnodes;
ListCell *l;
List **aggreflist;
List **sortlist;
int numsortlist;
int numGroupCols, numDistinctCols;
AttrNumber *grpColIdx, *distinctColIdx;
int i;
List *sub_tlist;
AttrNumber next_resno;
List *current_pathkeys;
Plan *result_plan;
aggnodes = extract_nodes(NULL, (Node *) root->parse->targetList, T_Aggref);
aggnodes = list_concat(aggnodes,
extract_nodes(NULL, (Node *) root->parse->havingQual, T_Aggref));
aggnodes = list_concat(aggnodes,
extract_nodes(NULL, (Node *) root->parse->scatterClause, T_Aggref));
percnodes = extract_nodes(NULL, (Node *) root->parse->targetList, T_PercentileExpr);
percnodes = list_concat(percnodes,
extract_nodes(NULL, (Node *) root->parse->havingQual, T_PercentileExpr));
percnodes = list_concat(percnodes,
extract_nodes(NULL, (Node *) root->parse->scatterClause, T_PercentileExpr));
/* Allocate the maximum possible number of lists */
numsortlist = list_length(percnodes) + 1;
/* initialize each element with NIL */
aggreflist = (List **) palloc0(sizeof(List *) * numsortlist);
sortlist = (List **) palloc0(sizeof(List *) * numsortlist);
numsortlist = 0; /* Use this as a counter */
sub_tlist = group_context->sub_tlist;
next_resno = list_length(sub_tlist) + 1;
/*
* WITHIN aggregates are not supported with grouping extensions.
* However, parse->groupClause may contain a non-flattened GroupClause list.
* We simply flatten it with reconstruct_group_clause, under the assumption
* that grouping-extension cases have already been rejected.
*/
Assert(!is_grouping_extension(group_context->canonical_grpsets));
grpColIdx = get_grouplist_colidx(
root->parse->groupClause, sub_tlist, &numGroupCols);
root->parse->groupClause =
reconstruct_group_clause(root->parse->groupClause,
sub_tlist,
grpColIdx,
numGroupCols);
numDistinctCols = agg_counts->numDistinctAggs;
distinctColIdx = (AttrNumber *) palloc(sizeof(AttrNumber) * numDistinctCols);
numDistinctCols = 0; /* Use this as a counter */
/*
* Collect Aggref nodes to process them separately from percentiles.
* Note we represent this special case by a NIL sortClause (sortlist).
*/
if (aggnodes)
{
sortlist[numsortlist] = NIL;
foreach (l, aggnodes)
{
Aggref *aggref = lfirst(l);
if (aggref->aggdistinct)
{
Node *arg;
TargetEntry *sub_tle;
Assert(list_length(aggref->args) == 1);
arg = linitial(aggref->args);
sub_tle = tlist_member(arg, sub_tlist);
if (!sub_tle)
{
sub_tle = makeTargetEntry((Expr *) arg,
next_resno++,
"<expr>",
true);
sub_tlist = lappend(sub_tlist, sub_tle);
}
distinctColIdx[numDistinctCols++] = sub_tle->resno;
}
aggreflist[numsortlist] = lappend(aggreflist[numsortlist], aggref);
}
numsortlist++;
}
/*
* Collect percentile nodes and classify them into groups by sortClause.
* During this process, if sub_tlist lacks a target entry for the
* sortClause, it is added.
*/
foreach (l, percnodes)
{
PercentileExpr *perc = lfirst(l);
List *sortClause;
ListCell *sl;
sortClause = perc->sortClause;
Assert(sortClause);
/*
* We need to add these entries to the parser's target list. This is
* basically the parser's task, but the list is kept separate
* to stay clear of the ungrouped-columns check, and
* this is the right place to add them back to the parser's tlist.
*/
foreach (sl, sortClause)
{
SortClause *sc = lfirst(sl);
TargetEntry *tle, *sub_tle;
tle = get_sortgroupclause_tle(sc, perc->sortTargets);
sub_tle = tlist_member((Node *) tle->expr, sub_tlist);
if (!sub_tle)
{
sub_tle = makeTargetEntry(tle->expr,
next_resno++,
tle->resname,
tle->resjunk);
sub_tlist = lappend(sub_tlist, sub_tle);
}
sc->tleSortGroupRef = assignSortGroupRef(sub_tle, sub_tlist);
tle->ressortgroupref = sc->tleSortGroupRef;
}
/* Find identical sortClause. */
for (i = 0; i < numsortlist && !equal(sortClause, sortlist[i]); i++);
if (i == numsortlist)
{
/* Not found, so add a new one. */
sortlist[numsortlist] = sortClause;
numsortlist++;
}
/*
* Add this percentile to the group associated with the sortClause.
*/
aggreflist[i] = lappend(aggreflist[i], perc);
}
/* Make sure group_context->sub_tlist has a pointer to what we made */
group_context->sub_tlist = sub_tlist;
/*
* Make the scaffold. We always take best_path here because
* it is not clear which path to use for the upcoming complex plans.
*/
Assert(sub_tlist != NIL);
result_plan = create_plan(root, group_context->best_path);
result_plan = plan_pushdown_tlist(result_plan, sub_tlist);
Assert(result_plan->flow);
current_pathkeys = group_context->best_path->pathkeys;
/*
* numsortlist is actually the number of sub-plans.
*/
Insist(numsortlist > 0);
/*
* The approach is very close to the one for DQAs. If the plan consists of
* only one sort group, we construct a straightforward plan without
* mangling the target list. If there is more than one, including the
* normal-aggregate group, we split each sort group (normal aggregates go
* into the NIL sort group) into sub pieces, construct separate plans, and
* join them to produce a final plan that matches the desired target list.
*/
if (numsortlist == 1)
{
/*
* Simply plan a tree and return it. We don't clutter the target list.
*/
result_plan = plan_within_agg_persort(root,
group_context,
*sortlist,
current_pathkeys,
result_plan);
}
else
{
/*
* The pattern for multiple sort groups is similar to the multi-DQA one.
* We reuse much of its infrastructure to avoid reinventing the wheel.
*/
List *base_plans;
MppGroupContext mgctx;
List *rtable;
base_plans = share_plan(root, result_plan, numsortlist);
Assert(numsortlist > 0);
MemSet(&mgctx, 0, sizeof(MppGroupContext));
/*
* XXX: Need support for DqaJoinMerge for types used in the GROUP BY clause
* which are not mergejoinable.
*/
mgctx.join_strategy = numGroupCols > 0 ? DqaJoinHash : DqaJoinCross;
mgctx.numGroupCols = numGroupCols;
/*
* This code is from deconstruct_agg_info. What it does is collect the
* grouping keys and build a simple list containing only those key
* expressions, which will be used in each individual plan tree as the
* leading columns (and later in the JOIN clause).
*/
mgctx.grps_tlist = NIL;
for (i = 0; i < numGroupCols; i++)
{
TargetEntry *sub_tle, *prelim_tle;
sub_tle = get_tle_by_resno(sub_tlist, grpColIdx[i]);
prelim_tle = flatCopyTargetEntry(sub_tle);
prelim_tle->resno = list_length(mgctx.grps_tlist) + 1;
mgctx.grps_tlist = lappend(mgctx.grps_tlist, prelim_tle);
}
mgctx.dref_tlists = (List **) palloc0(numsortlist * sizeof(List *));
/*
* Within-aggregate specific: used in within_aggregate_expr().
* Each sub-plan tree is identified by its sort clause.
*/
mgctx.wagSortClauses = NIL;
for (i = 0; i < numsortlist; i++)
mgctx.wagSortClauses = lappend(mgctx.wagSortClauses, sortlist[i]);
/*
* Prepare the final tlist to restore the original list. The main work
* is done in deconstruct_within_agg(), which determines which sub-plan tree
* each expression actually comes from, and stores that information in mgctx.
*/
foreach (l, root->parse->targetList)
{
TargetEntry *tle, *final_tle;
tle = (TargetEntry *) lfirst(l);
final_tle = flatCopyTargetEntry(tle);
final_tle->expr =
(Expr *) deconstruct_within_agg((Node *) tle->expr, &mgctx);
mgctx.fin_tlist = lappend(mgctx.fin_tlist, final_tle);
}
/*
* HAVING clause: same treatment as the target list. We would like to
* optimize this by pushing each expression down to the individual plan
* trees, but we don't do that and just follow the same approach as DQA for now.
*/
foreach (l, (List *) root->parse->havingQual)
{
Expr *qual, *fin_hqual;
qual = lfirst(l);
fin_hqual = (Expr *) deconstruct_within_agg((Node *) qual, &mgctx);
mgctx.fin_hqual = lappend(mgctx.fin_hqual, fin_hqual);
}
/*
* Offsets. Used in join_dqa_coplan
*/
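/*
* For illustration (hypothetical numbers): with numGroupCols = 2 and
* dref_tlists of lengths 1 and 2, the loop below yields
* dqa_offsets = {2, 3, 5}; consecutive entries bracket the columns each
* sub-plan contributes after the grouping keys in the joined target list.
*/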
mgctx.dqa_offsets = (int *) palloc(sizeof(int) * (numsortlist + 1));
mgctx.dqa_offsets[0] = numGroupCols;
for (i = 0; i < numsortlist; i++)
{
mgctx.dqa_offsets[i + 1] =
mgctx.dqa_offsets[i] + list_length(mgctx.dref_tlists[i]);
}
/*
* Now plan each tree. Store the plans in an array and join them later.
* Don't forget to save the rtable representing each subquery.
*/
rtable = NIL;
mgctx.dqaArgs = (DqaInfo *) palloc(numsortlist * sizeof(DqaInfo));
/* keep the original query */
for (i = 0; i < numsortlist; i++)
{
Plan *coplan;
Query *coquery;
char queryname[256];
PlannerInfo root_copy;
size_t sz;
/*
* The base plan is created by best_path.
*/
current_pathkeys = group_context->best_path->pathkeys;
/*
* We use a different instance of PlannerInfo for each cycle, notably
* because cdb_grouping_planner frees simple_rel_array.
* See also plan_append_aggs_with_rewrite.
*/
memcpy(&root_copy, root, sizeof(PlannerInfo));
sz = root->simple_rel_array_size * sizeof(RelOptInfo *);
root_copy.simple_rel_array =
(RelOptInfo **) palloc(sz);
memcpy(root_copy.simple_rel_array, root->simple_rel_array, sz);
/*
* The Query should be copied deeply, because the planner modifies it.
*/
coquery = copyObject(root->parse);
coquery->targetList = seq_tlist_concat(copyObject(mgctx.grps_tlist), mgctx.dref_tlists[i]);
/*
* Clear havingQual and scatterClause, since they will be handled only
* at the top of the joins, and never in an individual aggregate.
*/
coquery->havingQual = NULL;
coquery->scatterClause = NIL;
root_copy.parse = coquery;
if (sortlist[i])
{
/*
* Run the specialized planner for aggregate expressions
* gathered by an identical ORDER BY clause.
*/
coplan = plan_within_agg_persort(&root_copy,
group_context,
sortlist[i],
current_pathkeys,
list_nth(base_plans, i));
}
else
{
/*
* Run normal grouping planner for normal aggs.
*/
GroupContext local_group_context;
memcpy(&local_group_context, group_context, sizeof(GroupContext));
local_group_context.subplan = list_nth(base_plans, i);
local_group_context.tlist = coquery->targetList;
/* These fields are not set in grouping_planner */
local_group_context.numGroupCols = numGroupCols;
local_group_context.groupColIdx = grpColIdx;
local_group_context.numDistinctCols = numDistinctCols;
local_group_context.distinctColIdx = distinctColIdx;
/*
* Make a multi-phase or simple aggregate plan.
*
* agg_counts contains only normal-aggregate information
* (it has no within-aggregate information), so it is safe to use it
* as passed here.
*/
coplan = make_parallel_or_sequential_agg(&root_copy,
agg_counts,
&local_group_context,
&current_pathkeys);
}
snprintf(queryname, sizeof(queryname), "wag_coplan_%d", i + 1);
mgctx.dqaArgs[i].coplan =
wrap_plan_index(&root_copy, coplan, coquery,
NULL, i + 1, queryname,
&coquery);
rtable = list_concat(rtable, coquery->rtable);
}
/* Begin with the first coplan, then join in each succeeding coplan. */
result_plan = mgctx.dqaArgs[0].coplan;
for (i = 1; i < numsortlist; i++)
result_plan = join_dqa_coplan(root, &mgctx, result_plan, i);
/* Prepare for finalize_split_expr. */
mgctx.top_tlist = result_plan->targetlist;
/* Now set use_dqa_pruning to true. */
mgctx.use_dqa_pruning = true;
result_plan->targetlist = (List *) finalize_split_expr((Node *) mgctx.fin_tlist, &mgctx);
result_plan->qual = (List *) finalize_split_expr((Node *) mgctx.fin_hqual, &mgctx);
UpdateScatterClause(root->parse, result_plan->targetlist);
/*
* Reconstruct the flow since the targetlist for the result_plan may have
* changed.
*/
result_plan->flow = pull_up_Flow(result_plan,
result_plan->lefttree,
true);
/* Need to adjust root->parse for upper plan. */
root->parse->rtable = rtable;
root->parse->targetList = copyObject(result_plan->targetlist);
rebuild_simple_rel_and_rte(root);
}
/* Tell the upper planner that we changed the node */
*(group_context->pcurrent_pathkeys) = NIL;
*(group_context->querynode_changed) = true;
return result_plan;
}
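/*
* add_motion_to_dqa_child
*
* Put a hashed Redistribute Motion on top of a DQA child plan when the
* plan's current locus does not collocate exactly with the GROUP BY
* pathkeys (see the MPP-22413 note below). *motion_added reports to the
* caller whether a Motion was actually added.
*/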
Plan *add_motion_to_dqa_child(Plan *plan, PlannerInfo *root, bool *motion_added)
{
Plan *result = plan;
*motion_added = false;
List *pathkeys = make_pathkeys_for_groupclause(root->parse->groupClause, plan->targetlist);
CdbPathLocus locus = cdbpathlocus_from_flow(plan->flow);
if (CdbPathLocus_IsPartitioned(locus) && NIL != plan->flow->hashExpr)
{
locus = cdbpathlocus_from_exprs(root, plan->flow->hashExpr);
}
if (!cdbpathlocus_collocates(locus, pathkeys, true /*exact_match*/))
{
/* MPP-22413: join requires exact distribution match for collocation purposes,
* which may not be provided by the underlying group by, as computing the
* group by only requires relaxed distribution collocation
*/
List *groupExprs = get_sortgrouplist_exprs(root->parse->groupClause,
plan->targetlist);
result = (Plan *) make_motion_hash(root, plan, groupExprs);
result->total_cost += incremental_motion_cost(plan->plan_rows, plan->plan_rows);
*motion_added = true;
}
return result;
}