| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * cdbgroup.c |
| * Routines to aid in planning grouping queries for parallel |
| * execution. This is, essentially, an extension of the file |
| * optimizer/prep/planner.c, although some functions are not |
| * externalized. |
| * |
| * Portions Copyright (c) 2005-2008, Greenplum inc |
| * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include <limits.h> |
| |
| #include "catalog/pg_operator.h" |
| #include "catalog/pg_type.h" |
| #include "executor/executor.h" |
| #include "executor/nodeAgg.h" |
| #include "miscadmin.h" |
| #include "nodes/makefuncs.h" |
| #include "nodes/print.h" |
| #include "optimizer/clauses.h" |
| #include "optimizer/cost.h" |
| #include "optimizer/pathnode.h" |
| #include "optimizer/paths.h" |
| #include "optimizer/planmain.h" |
| #include "optimizer/planner.h" |
| #include "optimizer/planshare.h" |
| #include "optimizer/prep.h" |
| #include "optimizer/subselect.h" |
| #include "optimizer/tlist.h" |
| #include "optimizer/var.h" |
| #include "parser/parsetree.h" |
| #include "parser/parse_clause.h" |
| #include "parser/parse_coerce.h" |
| #include "parser/parse_expr.h" |
| #include "parser/parse_oper.h" |
| #include "parser/parse_relation.h" |
| #include "utils/lsyscache.h" |
| #include "utils/selfuncs.h" |
| #include "utils/syscache.h" |
| #include "catalog/pg_aggregate.h" |
| |
| #include "cdb/cdbllize.h" |
| #include "cdb/cdbpathtoplan.h" /* cdbpathtoplan_create_flow() */ |
| #include "cdb/cdbpath.h" |
| #include "cdb/cdbpullup.h" |
| #include "cdb/cdbvars.h" |
| #include "cdb/cdbhash.h" /* isGreenplumDbHashable() */ |
| |
| #include "cdb/cdbsetop.h" |
| #include "cdb/cdbgroup.h" |
| |
| extern void UpdateScatterClause(Query *query, List *newtlist); |
| |
/*
 * MppGroupPrep represents a strategy by which to precondition the
 * input to a parallel aggregation plan, i.e. what Motion (if any) to
 * apply below the aggregation nodes.
 *
 * MPP_GRP_PREP_NONE
 *		Use the input plan as is; no Motion required.
 * MPP_GRP_PREP_HASH_GROUPS
 *		Redistribute the input on the grouping key to collocate groups.
 * MPP_GRP_PREP_HASH_DISTINCT
 *		Redistribute the input on the sole distinct expression used as
 *		an aggregate argument.
 * MPP_GRP_PREP_FOCUS_QE
 *		Focus a partitioned input on the utility QE.
 * MPP_GRP_PREP_FOCUS_QD
 *		Focus a partitioned input on the QD.
 * MPP_GRP_PREP_BROADCAST
 *		Broadcast to convert a partitioned input into a replicated one.
 */
typedef enum MppGroupPrep
{
	MPP_GRP_PREP_NONE = 0,
	MPP_GRP_PREP_HASH_GROUPS,
	MPP_GRP_PREP_HASH_DISTINCT,
	MPP_GRP_PREP_FOCUS_QE,
	MPP_GRP_PREP_FOCUS_QD,
	MPP_GRP_PREP_BROADCAST
} MppGroupPrep;
| |
/*
 * MppGroupType represents a strategy by which to implement parallel
 * aggregation on a possibly preconditioned input plan.
 *
 * MPP_GRP_TYPE_NONE
 *		No parallel plan found.
 * MPP_GRP_TYPE_BASEPLAN
 *		Use the sequential plan as is.
 * MPP_GRP_TYPE_PLAIN_2STAGE
 *		Ungrouped, 2-stage aggregation.
 * MPP_GRP_TYPE_GROUPED_2STAGE
 *		Grouped, 2-stage aggregation.
 * MPP_GRP_TYPE_PLAIN_DQA_2STAGE
 *		Ungrouped aggregation with distinct-qualified aggregate(s);
 *		planned by make_three_stage_agg_plan().
 * MPP_GRP_TYPE_GROUPED_DQA_2STAGE
 *		Grouped aggregation with distinct-qualified aggregate(s);
 *		planned by make_three_stage_agg_plan().
 *
 * TODO: Add types for min-max optimization, when ready:
 *
 * MPP_GRP_TYPE_MINMAX
 *		Use the sequential min-max optimization plan as is.
 * MPP_GRP_TYPE_MINMAX_2STAGE
 *		Use a 2-stage variant of min-max aggregation.
 */
typedef enum MppGroupType
{
	MPP_GRP_TYPE_NONE = 0,
	MPP_GRP_TYPE_BASEPLAN,
	MPP_GRP_TYPE_PLAIN_2STAGE,
	MPP_GRP_TYPE_GROUPED_2STAGE,
	MPP_GRP_TYPE_PLAIN_DQA_2STAGE,
	MPP_GRP_TYPE_GROUPED_DQA_2STAGE,
} MppGroupType;
| |
| |
/* Summary values detailing the post-Motion part of a coplan for
 * three-phase aggregation.  The letters code the plan nodes stacked
 * above the Motion, innermost last:
 *		S = Sort
 *		G = GroupAgg
 *		H = HashAgg
 *		P = PlainAgg
 *
 * So GSH means (GroupAgg (Sort (HashAgg X))).
 */
typedef enum DqaCoplanType
{
	DQACOPLAN_GGS,				/* GroupAgg-GroupAgg-Sort */
	DQACOPLAN_GSH,				/* GroupAgg-Sort-HashAgg */
	DQACOPLAN_SHH,				/* Sort-HashAgg-HashAgg */
	DQACOPLAN_HH,				/* HashAgg-HashAgg */
	DQACOPLAN_PGS,				/* PlainAgg-GroupAgg-Sort */
	DQACOPLAN_PH				/* PlainAgg-HashAgg */
} DqaCoplanType;
| |
/* Join strategy used to combine the coplans of a three-phase (DQA)
 * aggregation plan.
 */
typedef enum DqaJoinStrategy
{
	DqaJoinUndefined = 0,
	DqaJoinNone,				/* No join required for solitary DQA argument. */
	DqaJoinCross,				/* Scalar aggregation uses cross product. */
	DqaJoinHash,				/* Hash join (possibly with subsequent sort) */
	DqaJoinMerge,				/* Merge join */
	/* These last are abstract and will be replaced
	 * by DqaJoinHash or DqaJoinMerge once planning
	 * is complete.
	 */
	DqaJoinSorted,				/* Sorted output required. */
	DqaJoinCheapest,			/* No sort requirement. */
} DqaJoinStrategy;
| |
/* DQA coplan information: per-DQA-argument state used while planning
 * the coplans of a three-phase aggregation.
 */
typedef struct DqaInfo
{
	Node	   *distinctExpr;	/* By reference from agg_counts for convenience. */
	AttrNumber	base_index;		/* Index of attribute in base plan targetlist */
	bool		can_hash;		/* May this DQA argument be hash-aggregated? */
	double		num_rows;		/* Estimated cardinality of grouping key, dqa arg */
	Plan	   *coplan;			/* Coplan for this (later this and all prior) coplan */
	Query	   *parse;			/* Plausible root->parse for the coplan. */
	bool		distinctkey_collocate;	/* Whether the input plan collocates on this
										 * distinct key */

	/* These fields are for costing and planning.  Before constructing
	 * the coplan for this DQA argument, determine the cheapest way to get
	 * the answer and the cheapest way to get the answer in grouping key
	 * order.
	 */
	bool		use_hashed_preliminary;	/* Hash the preliminary phase? */
	Cost		cost_sorted;			/* Cheapest cost, output sorted */
	DqaCoplanType coplan_type_sorted;	/* Shape achieving cost_sorted */
	Cost		cost_cheapest;			/* Cheapest cost, any output order */
	DqaCoplanType coplan_type_cheapest;	/* Shape achieving cost_cheapest */
} DqaInfo;
| |
/* Information about the overall plan for a one- or two-phase aggregation,
 * or for one coplan of a three-phase aggregation.
 */
typedef struct AggPlanInfo
{
	/*
	 * The input is either represented as a Path or a Plan and a Path.
	 * If input_plan is given, use this plan instead of creating one
	 * through input_path.
	 */
	Path	   *input_path;
	Plan	   *input_plan;

	/* These are the ordinary fields characterizing an aggregation */
	CdbPathLocus input_locus;	/* Locus of the input */
	MppGroupPrep group_prep;	/* Preconditioning Motion strategy */
	MppGroupType group_type;	/* Aggregation implementation strategy */
	CdbPathLocus output_locus;	/* Locus of the aggregated result */
	bool		distinctkey_collocate;	/* Whether the input plan collocates on the
										 * distinct key */

	/* These are extra for 3-phase plans */
	DqaJoinStrategy join_strategy;	/* How to join the DQA coplans */
	bool		use_sharing;		/* Share the common input via ShareInputScan? */

	/* These summarize the status of the structure's cost value. */
	bool		valid;			/* Has plan_cost been computed? */
	Cost		plan_cost;
} AggPlanInfo;
| |
/* Working state passed among the routines of the parallel grouping
 * planner (cdb_grouping_planner and its helpers).
 */
typedef struct MppGroupContext
{
	MppGroupPrep prep;			/* Chosen preconditioning strategy */
	MppGroupType type;			/* Chosen aggregation strategy */

	List	   *tlist;			/* The preprocessed targetlist of the original query. */
	Node	   *havingQual;		/* The preprocessed having qual of the original query. */
	Path	   *best_path;
	Path	   *cheapest_path;
	Plan	   *subplan;
	AggClauseCounts *agg_counts;
	double		tuple_fraction;
	double	   *p_dNumGroups;	/* Group count estimate shared up the call tree. */
	CanonicalGroupingSets *canonical_grpsets;
	int64		grouping;		/* the GROUPING value */
	bool		is_grpext;		/* identify if this is a grouping extension query */

	List	   *sub_tlist;		/* Derived (in cdb_grouping_planner) input targetlist. */
	int			numGroupCols;
	AttrNumber *groupColIdx;
	int			numDistinctCols;
	AttrNumber *distinctColIdx;
	DqaInfo    *dqaArgs;		/* Array of per-DQA-argument planning info */
	bool		use_hashed_grouping;
	CdbPathLocus input_locus;
	CdbPathLocus output_locus;
	/* Indicate whether the input plan collocates on the distinct key, if any.
	 * It is used for one- or two-phase aggregation.  For three-phase
	 * aggregation, distinctkey_collocate inside DqaInfo is used.
	 */
	bool		distinctkey_collocate;
	List	   *current_pathkeys;

	/* Indicate if root->parse has been changed during planning.  Carry in
	 * pointer to root for miscellaneous globals.
	 */
	bool		querynode_changed;
	PlannerInfo *root;

	/* Work space for aggregate/tlist deconstruction and reconstruction */
	Index		final_varno;		/* input */
	bool		use_irefs_tlist;	/* input */
	bool		use_dqa_pruning;	/* input */
	List	   *prefs_tlist;	/* Aggref attributes for prelim_tlist */
	List	   *irefs_tlist;	/* Aggref attributes for optional inter_tlist */
	List	   *frefs_tlist;	/* Aggref attributes for optional join tlists */
	List	   *dqa_tlist;		/* DQA argument attributes for prelim_tlist */
	List	  **dref_tlists;	/* Array of DQA Aggref tlists (dqa_tlist order) */
	List	   *grps_tlist;		/* Grouping attributes for prelim_tlist */
	List	   *fin_tlist;		/* Final tlist cache. */
	List	   *fin_hqual;		/* Final having qual cache. */
	Index		split_aggref_sortgroupref;	/* for TargetEntrys made in split_aggref */
	Index		outer_varno;	/* work */
	Index		inner_varno;	/* work */
	int		   *dqa_offsets;	/* work */
	List	   *top_tlist;		/* work - the target list to finalize */

	/* 3-phase DQA decisions */
	DqaJoinStrategy join_strategy;
	bool		use_sharing;

	List	   *wagSortClauses;	/* List of List; within-agg multi sort level */
} MppGroupContext;
| |
/* Constants for aggregation approaches.  These are bit flags, combined
 * and masked to track which strategies are allowed and possible for a
 * given query (see consider_agg in cdb_grouping_planner).
 */
#define AGG_NONE		0x00

#define AGG_1PHASE		0x01
#define AGG_2PHASE		0x02
#define AGG_2PHASE_DQA	0x04
#define AGG_3PHASE		0x08

#define AGG_SINGLEPHASE	(AGG_1PHASE)
#define AGG_MULTIPHASE	(AGG_2PHASE | AGG_2PHASE_DQA | AGG_3PHASE)

#define AGG_ALL			(AGG_SINGLEPHASE | AGG_MULTIPHASE)
| |
/* Constants for DQA pruning:
 * Pseudo varnos used to tag Vars according to which working targetlist
 * they refer to during aggregate deconstruction/reconstruction.
 */
static const Index grp_varno = 1;		/* var refers to grps_tlist */
static const Index ref_varno = 2;		/* var refers to prefs_tlist or relatives */
static const Index dqa_base_varno = 3;	/* refers to one of the dref_tlists */

/* Coefficients for cost calculation adjustments:  These are candidate GUCs
 * or, perhaps, replacements for the gp_eager_... series.  We wouldn't
 * need these if our statistics and cost calculations were correct, but
 * as of 3.2, they are not.
 *
 * Early testing suggested that (1.0, 0.45, 1.7) was about right, but the
 * risk of introducing skew in the initial redistribution of a 1-phase plan
 * is great (especially given the 3.2 tendency to way underestimate the
 * cardinality of joins), so we penalize 1-phase and normalize to the
 * 2-phase cost (approximately).
 */
static const double gp_coefficient_1phase_agg = 20.0;	/* penalty */
static const double gp_coefficient_2phase_agg = 1.0;	/* normalized */
static const double gp_coefficient_3phase_agg = 3.3;	/* increase systematic under estimate */
| |
| /* Forward declarations */ |
| |
| static Plan * make_one_stage_agg_plan(PlannerInfo *root, MppGroupContext *ctx); |
| static Plan * make_two_stage_agg_plan(PlannerInfo *root, MppGroupContext *ctx); |
| static Plan * make_three_stage_agg_plan(PlannerInfo *root, MppGroupContext *ctx); |
| static Plan * make_plan_for_one_dqa(PlannerInfo *root, MppGroupContext *ctx, |
| int dqa_index, Plan* result_plan, |
| Query** coquery_p); |
| static Plan * join_dqa_coplan(PlannerInfo *root, MppGroupContext *ctx, Plan *plan, int dqa_index); |
| static int compareDqas(const void *larg, const void *rarg); |
| static void planDqaJoinOrder(PlannerInfo *root, MppGroupContext *ctx, |
| double input_rows); |
| static List *make_subplan_tlist(List *tlist, Node *havingQual, |
| List *grp_clauses, int *pnum_gkeys, AttrNumber **pcols_gkeys, |
| List *dqa_args, int *pnum_dqas, AttrNumber **pcols_dqas); |
| static List *describe_subplan_tlist(List *sub_tlist, |
| List *tlist, Node *havingQual, |
| List *grp_clauses, int *pnum_gkeys, AttrNumber **pcols_gkeys, |
| List *dqa_args, int *pnum_dqas, AttrNumber **pcols_dqas); |
| static void generate_multi_stage_tlists(MppGroupContext* ctx, |
| List **p_prelim_tlist, |
| List **p_inter_tlist, |
| List **p_final_tlist, |
| List **p_final_qual); |
| static void prepare_dqa_pruning_tlists(MppGroupContext *ctx); |
| static void generate_dqa_pruning_tlists(MppGroupContext *ctx, |
| int dqa_index, |
| List **p_prelim_tlist, |
| List **p_inter_tlist, |
| List **p_final_tlist, |
| List **p_final_qual); |
| static void deconstruct_agg_info(MppGroupContext *ctx); |
| static void reconstruct_agg_info(MppGroupContext *ctx, |
| List **p_prelim_tlist, |
| List **p_inter_tlist, |
| List **p_final_tlist, |
| List **p_final_qual); |
| static void reconstruct_coplan_info(MppGroupContext *ctx, |
| int dqa_index, |
| List **p_prelim_tlist, |
| List **p_inter_tlist, |
| List **p_final_tlist); |
| static Expr *deconstruct_expr(Expr *expr, MppGroupContext *ctx); |
| static Node* deconstruct_expr_mutator(Node *node, MppGroupContext *ctx); |
| static Node *split_aggref(Aggref *aggref, MppGroupContext *ctx); |
| static List *make_vars_tlist(List *tlist, Index varno, AttrNumber offset); |
| static Plan* add_subqueryscan(PlannerInfo* root, List **p_pathkeys, |
| Index varno, Query *subquery, Plan *subplan); |
| static List *seq_tlist_concat(List *tlist1, List *tlist2); |
| static Node *finalize_split_expr(Node *expr, MppGroupContext *ctx); |
| static Node* finalize_split_expr_mutator(Node *node, MppGroupContext *ctx); |
| static Oid lookup_agg_transtype(Aggref *aggref); |
| static bool hash_safe_type(Oid type); |
| static bool sorting_prefixes_grouping(PlannerInfo *root); |
| static bool gp_hash_safe_grouping(PlannerInfo *root); |
| |
| static Cost cost_common_agg(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info, Plan *dummy); |
| static Cost cost_1phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info); |
| static Cost cost_2phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info); |
| static Cost cost_3phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info); |
| static void set_cost_of_join_strategies(MppGroupContext *ctx, Cost *hashjoin_cost, Cost *mergejoin_cost); |
| static void initAggPlanInfo(AggPlanInfo *info, Path *input_path, Plan *input_plan); |
| static void set_coplan_strategies(PlannerInfo *root, MppGroupContext *ctx, DqaInfo *dqaArg, Path *input); |
| static Cost incremental_sort_cost(double rows, int width, int numKeyCols); |
| static Cost incremental_agg_cost(double rows, int width, AggStrategy strategy, |
| int numGroupCols, double numGroups, |
| int numAggs, int transSpace); |
| static Cost incremental_motion_cost(double sendrows, double recvrows); |
| |
| /*--------------------------------------------- |
| * WITHIN/Percentile stuff |
| *---------------------------------------------*/ |
| |
/*
 * WithinAggContext is a variable set used throughout plan_within_agg_persort().
 */
typedef struct
{
	bool		use_deduplicate;	/* true to choose deduplicate strategy */
	AttrNumber	pc_pos;				/* resno for peer count in outer tlist */
	AttrNumber	tc_pos;				/* resno for total count in inner tlist */
	List	   *current_pathkeys;	/* pathkeys tracking */
	List	   *inner_pathkeys;		/* pathkeys for inner plan */
	List	   *rtable;				/* outer/inner RTE of the output */
} WithinAggContext;
| |
| static bool choose_deduplicate(PlannerInfo *root, List *sortExprs, |
| Plan *input_plan, double *numGroups); |
| static Plan *wrap_plan_index(PlannerInfo *root, Plan *plan, Query *query, |
| List **p_pathkeys, Index varno, const char *alias_name, Query **query_p); |
| static void rebuild_simple_rel_and_rte(PlannerInfo *root); |
| static Plan *make_parallel_or_sequential_agg(PlannerInfo *root, |
| AggClauseCounts *agg_count, GroupContext *group_context, |
| List **current_pathkeys_p); |
| static Node *deconstruct_within_agg(Node *node, MppGroupContext *ctx); |
| static Node *deconstruct_within_agg_mutator(Node *node, MppGroupContext *ctx); |
| static List *fetch_percentiles(Query *parse, List *sortClause); |
| static Plan *make_deduplicate_plan(PlannerInfo *root, GroupContext *group_context, |
| List *groupClause, List *sortClause, double numGroups, |
| AttrNumber *pc_pos_p, List **current_pathkeys_p, Plan *subplan); |
| static Plan *within_agg_make_baseplan(PlannerInfo *root, |
| GroupContext *group_context, |
| WithinAggContext *wag_context, |
| List *sortClause, |
| Plan *result_plan); |
| static Plan *within_agg_add_outer_sort(PlannerInfo *root, |
| WithinAggContext *wag_context, |
| List *sortClause, |
| Plan *outer_plan); |
| static Plan *within_agg_construct_inner(PlannerInfo *root, |
| GroupContext *group_context, |
| WithinAggContext *wag_context, |
| Plan *inner_plan); |
| static Plan *within_agg_join_plans(PlannerInfo *root, |
| GroupContext *group_context, |
| WithinAggContext *wag_context, |
| Plan *outer_plan, |
| Plan *inner_plan); |
| static Plan *within_agg_final_agg(PlannerInfo *root, |
| GroupContext *group_context, |
| WithinAggContext *wag_context, |
| List *sortClause, |
| Plan *result_plan); |
| static Plan *plan_within_agg_persort(PlannerInfo *root, GroupContext *group_context, |
| List *sortClause, List *current_pathkeys, Plan *base_plan); |
| |
/*
 * add_motion_to_dqa_child
 *	 Add a Redistribute motion to a DQA child plan if the plan is not already
 *	 distributed on the grouping columns
 */
| static Plan *add_motion_to_dqa_child(Plan *plan, PlannerInfo *root, bool *motion_added); |
| |
| /* |
| * Function: cdb_grouping_planner |
| * |
| * This is basically an extension of the function grouping_planner() from |
| * planner.c. It (conditionally) replaces the logic in the "normal case" |
| * (the aggregation/grouping branch) of the main "if" statement. |
| * |
| * The result is a Plan for one-, two-, or three-phase grouping/aggregation |
| * (possibly including a top-level join in case of DQA pruning) or NULL. |
| * |
| * A NULL result means that the ordinary sequential planner will produce |
| * a correct (and preferred) result, so the "normal case" code should be |
| * used. |
| * |
 * A non-NULL result is taken as a partially formed Plan to be processed
 * by the trailing sort/distinct/limit logic of grouping_planner().  In
 * other words, after any associated changes to the local environment (see
 * the calling code), the resulting plan should be treated as if from the
 * "normal case" or the function optimize_minmax_aggregates().
| * |
| * The pointer at location pcurrent_pathkeys is updated to refer to the |
| * ordering pathkey or NIL, if none. The parse tree at root->parse may |
| * be modified in place to reflect changes in the context (e.g. current |
| * range table). |
| */ |
| |
| Plan * |
| cdb_grouping_planner(PlannerInfo* root, |
| AggClauseCounts *agg_counts, |
| GroupContext *group_context) |
| { |
| MppGroupContext ctx; |
| Plan * result_plan = NULL; |
| List * sub_tlist = NIL; |
| bool has_groups = root->parse->groupClause != NIL; |
| bool has_aggs = agg_counts->numAggs > 0; |
| bool has_ordered_aggs = list_length(agg_counts->aggOrder) > 0; |
| ListCell *lc; |
| |
| bool is_grpext = false; |
| unsigned char consider_agg = AGG_NONE; |
| AggPlanInfo plan_1p; |
| AggPlanInfo plan_2p; |
| AggPlanInfo plan_3p; |
| AggPlanInfo *plan_info = NULL; |
| |
| Assert( !has_groups || root->group_pathkeys != NULL ); |
| |
| memset(&ctx, 0, sizeof(ctx)); |
| |
| *(group_context->querynode_changed) = false; |
| |
| /* We always use sequential plans for distinct-qualified rollup queries, |
| * so don't waste time working on alternatives. |
| */ |
| is_grpext = is_grouping_extension(group_context->canonical_grpsets); |
| if ( is_grpext && agg_counts->numDistinctAggs > 0) |
| return NULL; |
| |
| /* |
| * First choose a one-stage plan. Since there's always a way to do this, |
| * it serves as our default choice. |
| */ |
| if (group_context->subplan == NULL) |
| { |
| Path *input_path = group_context->cheapest_path; |
| |
| /* Should we prefer the "best" path? Only for vector aggregation |
| * of input already sorted and collocated on the grouping key. |
| */ |
| if ( has_groups && |
| pathkeys_contained_in(root->group_pathkeys, group_context->best_path->pathkeys) && |
| cdbpathlocus_collocates(group_context->best_path->locus, root->group_pathkeys, false /*exact_match*/) ) |
| { |
| input_path = group_context->best_path; |
| } |
| |
| initAggPlanInfo(&plan_1p, input_path, group_context->subplan); |
| } |
| |
| else |
| { |
| initAggPlanInfo(&plan_1p, group_context->best_path, group_context->subplan); |
| plan_1p.input_locus = group_context->best_path->locus; |
| } |
| |
| if ( ! CdbPathLocus_IsPartitioned(plan_1p.input_locus) ) |
| { |
| /* Can use base plan with no motion yielding same locus. */ |
| plan_1p.group_prep = MPP_GRP_PREP_NONE; |
| plan_1p.output_locus = plan_1p.input_locus; |
| plan_1p.distinctkey_collocate = true; |
| } |
| else if ( has_groups ) /* and not single or replicated */ |
| { |
| if (root->group_pathkeys != NULL && |
| cdbpathlocus_collocates(plan_1p.input_locus, root->group_pathkeys, false /*exact_match*/) ) |
| { |
| plan_1p.group_prep = MPP_GRP_PREP_NONE; |
| plan_1p.output_locus = plan_1p.input_locus; /* may be less discriminating that group locus */ |
| plan_1p.distinctkey_collocate = true; |
| } |
| else |
| { |
| if (gp_hash_safe_grouping(root)) |
| { |
| plan_1p.group_prep = MPP_GRP_PREP_HASH_GROUPS; |
| CdbPathLocus_MakeHashed(&plan_1p.output_locus, root->group_pathkeys); |
| } |
| else |
| { |
| plan_1p.group_prep = MPP_GRP_PREP_FOCUS_QE; |
| CdbPathLocus_MakeSingleQE(&plan_1p.output_locus); |
| } |
| } |
| } |
| else if ( has_aggs ) /* and not grouped and not single or replicated */ |
| { |
| plan_1p.group_prep = MPP_GRP_PREP_FOCUS_QE; |
| CdbPathLocus_MakeSingleQE(&plan_1p.output_locus); |
| } |
| |
| /* |
| * If the sequential planner can handle the situation with no Motion |
| * involved, let it do so. Don't bother to investigate the 2-stage |
| * approach. |
| * |
| * If the GUC enable_groupagg is set to off and this is a DQA |
| * query, we won't use the sequential plan. This is because |
| * the sequential plan for a DQA query always uses GroupAgg. |
| */ |
| if ( plan_1p.group_prep == MPP_GRP_PREP_NONE ) |
| { |
| if (enable_groupagg || agg_counts->numDistinctAggs == 0) |
| { |
| *(group_context->pcurrent_pathkeys) = NIL; |
| return NULL; |
| } |
| } |
| |
| /* |
| * When an input plan is given, use it, including its target |
| * list. When an input target list (and no plan) is given, |
| * use it for the plan to be created. When neither is given, |
| * generate a phase 1 target list for the plan to be created. |
| * Also note the location of any grouping attributes in the |
| * target list (numGroupCols, groupColIdx). |
| * |
| * Also make sure there's a target entry with a non-zero |
| * sortgroupref for each DQA argument and note the location |
| * of the attributes (numDistinctCols, distinctColIdx). |
| */ |
| if ( group_context->subplan != NULL) |
| { |
| sub_tlist = group_context->subplan->targetlist; |
| } |
| else if ( group_context->sub_tlist != NULL ) |
| { |
| sub_tlist = group_context->sub_tlist; |
| sub_tlist = describe_subplan_tlist(sub_tlist, |
| group_context->tlist, |
| root->parse->havingQual, |
| root->parse->groupClause, |
| &(group_context->numGroupCols), |
| &(group_context->groupColIdx), |
| agg_counts->dqaArgs, |
| &(group_context->numDistinctCols), |
| &(group_context->distinctColIdx)); |
| } |
| else |
| { |
| sub_tlist = make_subplan_tlist(group_context->tlist, |
| root->parse->havingQual, |
| root->parse->groupClause, |
| &(group_context->numGroupCols), |
| &(group_context->groupColIdx), |
| agg_counts->dqaArgs, |
| &(group_context->numDistinctCols), |
| &(group_context->distinctColIdx)); |
| |
| /* Where we need to and we can, add column names to the sub_tlist |
| * entries to make EXPLAIN output look nice. Note that we could dig |
| * further than this (if we come up empty handed) by probing the range |
| * table (root->parse->rtable), but this covers the ordinary cases. |
| */ |
| foreach(lc, sub_tlist) |
| { |
| TargetEntry *tle = (TargetEntry*)lfirst(lc); |
| |
| /* |
| * the first tlist whose expression is equal() to the expression of tle |
| * may not be the desired one. We should also check if ressortgroupref is |
| * same if ressortgroupref > 0 |
| */ |
| if ( IsA(tle->expr, Var) && tle->resname == NULL ) |
| { |
| TargetEntry *vartle = |
| tlist_member_with_ressortgroupref((Node*)tle->expr, |
| group_context->tlist, tle->ressortgroupref); |
| if ( vartle != NULL && vartle->resname != NULL ) |
| tle->resname = pstrdup(vartle->resname); |
| } |
| } |
| } |
| |
| /* At this point, we're committed to producing a one- , two- or |
| * three-stage plan with motion. Determine what aggregation approaches to explore. |
| * Per MPP-2378, we don't insist on has_aggs for multi-phase |
| * plans. |
| */ |
| { |
| unsigned char allowed_agg; |
| unsigned char possible_agg; |
| |
| allowed_agg = AGG_ALL; |
| |
| if ( ! root->config->gp_enable_multiphase_agg ) |
| allowed_agg &= AGG_SINGLEPHASE; |
| |
| /* This prohibition could be relaxed if we tracked missing |
| * preliminary functions per DQA and were willing to plan |
| * some DQAs as single and some as multiple phases. Not |
| * currently, however. |
| */ |
| if ( agg_counts->missing_prelimfunc ) |
| allowed_agg &= ~ AGG_MULTIPHASE; |
| |
| /* |
| * Ordered aggregates need to run the transition function on the |
| * values in sorted order, which in turn translates into single |
| * phase aggregation. |
| */ |
| if ( has_ordered_aggs ) |
| allowed_agg &= ~ AGG_MULTIPHASE; |
| |
| /* We are currently unwilling to redistribute a gathered |
| * intermediate across the cluster. This might change |
| * one day. |
| */ |
| if ( ! CdbPathLocus_IsPartitioned(plan_1p.input_locus ) ) |
| allowed_agg &= AGG_SINGLEPHASE; |
| |
| |
| if ( ! root->config->gp_enable_agg_distinct ) |
| allowed_agg &= ~ AGG_2PHASE_DQA; |
| |
| if ( ! root->config->gp_enable_dqa_pruning ) |
| allowed_agg &= ~ AGG_3PHASE; |
| |
| possible_agg = AGG_SINGLEPHASE; |
| |
| if(gp_hash_safe_grouping(root)) |
| { |
| switch ( list_length(agg_counts->dqaArgs) ) |
| { |
| case 0: |
| possible_agg |= AGG_2PHASE; |
| break; |
| case 1: |
| possible_agg |= AGG_2PHASE_DQA | AGG_3PHASE; |
| break; |
| default: /* > 1 */ |
| possible_agg |= AGG_3PHASE; |
| break; |
| } |
| } |
| if ( is_grpext ) |
| possible_agg &= ~ (AGG_2PHASE_DQA | AGG_3PHASE); |
| |
| consider_agg = allowed_agg & possible_agg; |
| } |
| Assert( consider_agg & AGG_1PHASE ); /* Always possible! */ |
| |
| if ( consider_agg & ( AGG_2PHASE | AGG_2PHASE_DQA ) ) |
| { |
| /* XXX initAggPlanInfo(&plan_2p, group_context->cheapest_path); */ |
| initAggPlanInfo(&plan_2p, group_context->best_path, |
| group_context->subplan); /* but why? */ |
| |
| /* Common 2-phase setup. */ |
| if ( has_groups ) |
| { |
| plan_2p.group_type = MPP_GRP_TYPE_GROUPED_2STAGE; |
| CdbPathLocus_MakeHashed(&plan_2p.output_locus, root->group_pathkeys); |
| } |
| else |
| { |
| plan_2p.group_type = MPP_GRP_TYPE_PLAIN_2STAGE; |
| CdbPathLocus_MakeSingleQE(&plan_2p.output_locus); |
| } |
| |
| if ( consider_agg & AGG_2PHASE_DQA ) |
| { |
| List *distinct_pathkeys; |
| |
| /* Either have DQA or not! */ |
| Assert(! (consider_agg & AGG_2PHASE) ); |
| |
| Insist( IsA(agg_counts->dqaArgs, List) && |
| list_length((List*)agg_counts->dqaArgs) == 1 ); |
| distinct_pathkeys = cdb_make_pathkey_for_expr(root, |
| linitial(agg_counts->dqaArgs), |
| list_make1(makeString("="))); |
| distinct_pathkeys = list_make1(distinct_pathkeys); |
| |
| if ( ! cdbpathlocus_collocates(plan_2p.input_locus, distinct_pathkeys, false /*exact_match*/)) |
| { |
| plan_2p.group_prep = MPP_GRP_PREP_HASH_DISTINCT; |
| CdbPathLocus_MakeHashed(&plan_2p.input_locus, distinct_pathkeys); |
| } |
| else |
| { |
| plan_2p.group_prep = MPP_GRP_PREP_HASH_DISTINCT; |
| plan_2p.output_locus = plan_2p.input_locus; |
| plan_2p.distinctkey_collocate = true; |
| } |
| } |
| } |
| |
| if ( consider_agg & AGG_3PHASE ) |
| { |
| initAggPlanInfo(&plan_3p, group_context->cheapest_path, |
| group_context->subplan); |
| |
| if ( has_groups ) |
| { |
| plan_3p.group_type = MPP_GRP_TYPE_GROUPED_DQA_2STAGE; |
| CdbPathLocus_MakeHashed(&plan_3p.output_locus, root->group_pathkeys); |
| } |
| else |
| { |
| plan_3p.group_type = MPP_GRP_TYPE_PLAIN_DQA_2STAGE; |
| CdbPathLocus_MakeSingleQE(&plan_3p.output_locus); |
| } |
| } |
| |
| /* |
| * Package up byproducts for the actual planner. |
| */ |
| ctx.prep = plan_1p.group_prep; |
| ctx.type = plan_1p.group_type; |
| ctx.tlist = group_context->tlist; |
| ctx.havingQual = root->parse->havingQual; |
| ctx.sub_tlist = sub_tlist; |
| ctx.numGroupCols = group_context->numGroupCols; |
| ctx.groupColIdx = group_context->groupColIdx; |
| ctx.numDistinctCols = group_context->numDistinctCols; |
| ctx.distinctColIdx = group_context->distinctColIdx; |
| ctx.use_hashed_grouping = group_context->use_hashed_grouping; |
| ctx.best_path = group_context->best_path; |
| ctx.cheapest_path = group_context->cheapest_path; |
| ctx.subplan = group_context->subplan; |
| ctx.input_locus = plan_1p.input_locus; |
| ctx.output_locus = plan_1p.output_locus; |
| ctx.distinctkey_collocate = plan_1p.distinctkey_collocate; |
| ctx.agg_counts = agg_counts; |
| ctx.tuple_fraction = group_context->tuple_fraction; |
| ctx.p_dNumGroups = group_context->p_dNumGroups; |
| ctx.canonical_grpsets = group_context->canonical_grpsets; |
| ctx.grouping = group_context->grouping; |
| ctx.is_grpext = is_grpext; |
| ctx.current_pathkeys = NIL; /* Initialize to be tidy. */ |
| ctx.querynode_changed = false; |
| ctx.root = root; |
| |
| /* If we're to consider 3-phase plans, do some preparation. |
| */ |
| if ( ctx.numDistinctCols > 0 && (consider_agg & AGG_3PHASE) ) |
| { |
| int i; |
| |
| /* Collect row count estimates and other info for the partial |
| * results of grouping over combined grouping and distinct (DQA) |
| * keys. Order the output array of DqaInfo structures (in the |
| * context) according to how they should be joined. |
| */ |
| planDqaJoinOrder(root, &ctx, plan_3p.input_path->parent->rows); |
| |
| /* Plan the post-Motion portions of each coplan in two ways: one to |
| * produce the result in the cheapest way and one to produce the |
| * result ordered by the grouping key in the cheapest way. (For use |
| * by make_plan_for_one_dqa called by make_three_stage_agg_plan.) |
| */ |
| for ( i = 0; i < ctx.numDistinctCols; i++ ) |
| { |
| List *distinct_pathkeys; |
| |
| set_coplan_strategies(root, &ctx, &ctx.dqaArgs[i], plan_3p.input_path); |
| |
| /* Determine if the input plan already collocates on the distinct |
| * key. |
| */ |
| distinct_pathkeys = cdb_make_pathkey_for_expr(root, |
| ctx.dqaArgs[i].distinctExpr, |
| list_make1(makeString("="))); |
| distinct_pathkeys = list_make1(distinct_pathkeys); |
| |
| if (cdbpathlocus_collocates(plan_3p.input_locus, distinct_pathkeys, false /*exact_match*/)) |
| { |
| ctx.dqaArgs[i].distinctkey_collocate = true; |
| } |
| |
| list_free(distinct_pathkeys); |
| } |
| } |
| |
| |
| plan_info = NULL; /* Most cost-effective, feasible plan. */ |
| |
| if ( consider_agg & AGG_1PHASE ) |
| { |
| cost_1phase_aggregation(root, &ctx, &plan_1p); |
| if ( gp_dev_notice_agg_cost ) |
| elog(NOTICE, "1-phase cost: %.6f", plan_1p.plan_cost); |
| if ( plan_info == NULL || plan_info->plan_cost > plan_1p.plan_cost ) |
| plan_info = &plan_1p; |
| } |
| |
| if ( consider_agg & ( AGG_2PHASE | AGG_2PHASE_DQA ) ) |
| { |
| cost_2phase_aggregation(root, &ctx, &plan_2p); |
| if ( gp_dev_notice_agg_cost ) |
| elog(NOTICE, "2-phase cost: %.6f", plan_2p.plan_cost); |
| if ( plan_info == NULL || plan_info->plan_cost > plan_2p.plan_cost ) |
| plan_info = &plan_2p; |
| } |
| |
| if ( consider_agg & AGG_3PHASE ) |
| { |
| cost_3phase_aggregation(root, &ctx, &plan_3p); |
| if ( gp_dev_notice_agg_cost ) |
| elog(NOTICE, "3-phase cost: %.6f", plan_3p.plan_cost); |
| if ( plan_info == NULL || !enable_groupagg || plan_info->plan_cost > plan_3p.plan_cost ) |
| plan_info = &plan_3p; |
| } |
| |
| Insist( plan_info != NULL ); |
| |
| ctx.prep = plan_info->group_prep; |
| ctx.type = plan_info->group_type; |
| ctx.input_locus = plan_info->input_locus; |
| ctx.output_locus = plan_info->output_locus; |
| ctx.distinctkey_collocate = plan_info->distinctkey_collocate; |
| ctx.join_strategy = plan_info->join_strategy; |
| ctx.use_sharing = plan_info->use_sharing; |
| |
| |
| /* Call appropriate planner. */ |
| if (ctx.type == MPP_GRP_TYPE_BASEPLAN) |
| { |
| if (ctx.prep != MPP_GRP_PREP_NONE) |
| result_plan = make_one_stage_agg_plan(root, &ctx); |
| else |
| result_plan = NULL; /* allow sequential planner to do the work. */ |
| } |
| else if (ctx.type == MPP_GRP_TYPE_PLAIN_2STAGE || |
| ctx.type == MPP_GRP_TYPE_GROUPED_2STAGE) |
| result_plan = make_two_stage_agg_plan(root, &ctx); |
| else if (ctx.type == MPP_GRP_TYPE_PLAIN_DQA_2STAGE || |
| ctx.type == MPP_GRP_TYPE_GROUPED_DQA_2STAGE) |
| result_plan = make_three_stage_agg_plan(root, &ctx); |
| else |
| ereport(ERROR, |
| (errcode(ERRCODE_CDB_INTERNAL_ERROR), |
| errmsg("no parallel plan for aggregation"))); |
| |
| if (!is_grpext && result_plan != NULL && |
| IsA(result_plan, Agg)) |
| ((Agg *)result_plan)->lastAgg = true; |
| |
| *(group_context->querynode_changed) = ctx.querynode_changed; |
| *(group_context->pcurrent_pathkeys) = ctx.current_pathkeys; |
| return result_plan; |
| } |
| |
| /* |
| * Function make_one_stage_agg_plan |
| * |
| * Construct a one-stage aggregation plan by redistributing the result |
| * of the input plan appropriately |
| */ |
static Plan *
make_one_stage_agg_plan(PlannerInfo *root,
                        MppGroupContext *ctx)
{
    Query       *parse = root->parse;
    List        *tlist = ctx->tlist;
    List        *sub_tlist = ctx->sub_tlist;
    int          numGroupCols = ctx->numGroupCols;
    AttrNumber  *groupColIdx = ctx->groupColIdx;
    Path        *best_path = ctx->best_path;
    Path        *cheapest_path = ctx->cheapest_path;
    Path        *path = NULL;
    bool         use_hashed_grouping = ctx->use_hashed_grouping;
    /* Clamp the planner's double-valued group-count estimate into the
     * long range expected by make_agg(): negative -> 0, too big -> LONG_MAX.
     */
    long         numGroups = (*(ctx->p_dNumGroups) < 0) ? 0 :
                     (*(ctx->p_dNumGroups) > LONG_MAX) ? LONG_MAX :
                     (long)*(ctx->p_dNumGroups);

    List       *groupExprs = NIL;
    List       *current_pathkeys;
    QualCost    tlist_cost;
    int         i;

    Plan       *result_plan;
    bool        is_sorted;

    /*
     * The argument to the "lower" Agg node will use a "flattened" tlist
     * (having just the (levelsup==0) vars mentioned in the SELECT targetlist
     * and HAVING qual plus entries for any GROUP BY expressions that are not
     * simple Vars.  This is the same sub_tlist as that used for 1-stage
     * aggregation in grouping_planner.
     */

    /* Create the base plan.  If the best path is in grouping key order and
     * we don't plan to move it around and this is a vector aggregation, we
     * should use best path.  In other cases, however, use cheapest.
     */
    if (ctx->subplan == NULL)
    {
        is_sorted = pathkeys_contained_in(root->group_pathkeys, best_path->pathkeys);
        path = cheapest_path;
        if ( is_sorted && ctx->prep == MPP_GRP_PREP_NONE && numGroupCols > 0 )
            path = best_path;
        result_plan = create_plan(root, path);
        current_pathkeys = path->pathkeys;

        /* Instead of the flat target list produced above, use the sub_tlist
         * constructed in cdb_grouping_planner.  Add a Result node if the
         * base plan can't project. (This may be unnecessary, but, if so,
         * the Result node will be removed later.)
         */
        result_plan = plan_pushdown_tlist(result_plan, sub_tlist);

        Assert(result_plan->flow);

        /* Account for the cost of evaluation of the sub_tlist. */
        cost_qual_eval(&tlist_cost, sub_tlist, root);
        result_plan->startup_cost += tlist_cost.startup;
        result_plan->total_cost +=
            tlist_cost.startup +
            tlist_cost.per_tuple * result_plan->plan_rows;
    }
    else
    {
        /* Caller supplied a prebuilt input plan; adopt it and its ordering. */
        result_plan = ctx->subplan;
        current_pathkeys = ctx->current_pathkeys;
    }

    /* Precondition the input by adjusting its locus prior to adding
     * the Agg or Group node to the base plan, if needed.
     */
    switch ( ctx->prep )
    {
    case MPP_GRP_PREP_NONE:
            break;

    case MPP_GRP_PREP_HASH_GROUPS:
            /* Redistribute by hash of the grouping expressions so that all
             * rows of a group arrive at the same segment.
             */
            Assert(numGroupCols > 0);
            for ( i = 0; i < numGroupCols; i++)
            {
                TargetEntry *tle = get_tle_by_resno(sub_tlist, groupColIdx[i]);
                groupExprs = lappend(groupExprs, copyObject(tle->expr));
            }

            result_plan = (Plan*)make_motion_hash(root, result_plan, groupExprs);
            result_plan->total_cost +=
                incremental_motion_cost(result_plan->plan_rows,
                                        result_plan->plan_rows);
            current_pathkeys = NIL; /* No longer sorted. */
            break;

    case MPP_GRP_PREP_FOCUS_QE:
            /* Gather all rows to a single QE; merge-preserve order if the
             * input is sorted (current_pathkeys != NIL).
             */
            result_plan = (Plan*)make_motion_gather_to_QE(result_plan, (current_pathkeys != NIL));
            result_plan->total_cost +=
                incremental_motion_cost(result_plan->plan_rows,
                                        result_plan->plan_rows * root->config->cdbpath_segments);
            break;

    case MPP_GRP_PREP_FOCUS_QD:
            /* As above, but gather to the QD (dispatcher) process. */
            result_plan = (Plan*)make_motion_gather_to_QD(result_plan, (current_pathkeys != NIL));
            result_plan->total_cost +=
                incremental_motion_cost(result_plan->plan_rows,
                                        result_plan->plan_rows * root->config->cdbpath_segments);
            break;

    case MPP_GRP_PREP_HASH_DISTINCT:
    case MPP_GRP_PREP_BROADCAST:
            /* These preparations are never valid for one-stage plans. */
            ereport(ERROR,
                (errcode(ERRCODE_CDB_INTERNAL_ERROR),
                 errmsg("no parallel plan for aggregation")));
            break; /* Never */
    }

    Assert(result_plan->flow);

    /*
     * Insert AGG or GROUP node if needed, plus an explicit sort step
     * if necessary.
     *
     * HAVING clause, if any, becomes qual of the Agg or Group node.
     */
    if (!ctx->is_grpext && use_hashed_grouping)
    {
        /* Hashed aggregate plan --- no sort needed */
        result_plan = (Plan *) make_agg(root,
                                        tlist,
                                        (List *) parse->havingQual,
                                        AGG_HASHED, false,
                                        numGroupCols,
                                        groupColIdx,
                                        numGroups,
                                        0, /* num_nullcols */
                                        0, /* input_grouping */
                                        ctx->grouping,
                                        0, /* rollup_gs_times */
                                        ctx->agg_counts->numAggs,
                                        ctx->agg_counts->transitionSpace,
                                        result_plan);
        /* Hashed aggregation produces randomly-ordered results */
        current_pathkeys = NIL;
    }
    else if (parse->hasAggs || parse->groupClause)
    {
        if (!ctx->is_grpext)
        {
            /* Plain aggregate plan --- sort if needed */
            AggStrategy aggstrategy;

            if (parse->groupClause)
            {
                if (!pathkeys_contained_in(root->group_pathkeys,
                                           current_pathkeys))
                {
                    /* Input isn't ordered on the grouping key; add a Sort. */
                    result_plan = (Plan *)
                        make_sort_from_groupcols(root,
                                                 parse->groupClause,
                                                 groupColIdx,
                                                 false,
                                                 result_plan);
                    current_pathkeys = root->group_pathkeys;
                    mark_sort_locus(result_plan);
                }
                aggstrategy = AGG_SORTED;

                /*
                 * The AGG node will not change the sort ordering of its
                 * groups, so current_pathkeys describes the result too.
                 */
            }
            else
            {
                aggstrategy = AGG_PLAIN;
                /* Result will be only one row anyway; no sort order */
                current_pathkeys = NIL;
            }

            result_plan = (Plan *) make_agg(root,
                                            tlist,
                                            (List *) parse->havingQual,
                                            aggstrategy, false,
                                            numGroupCols,
                                            groupColIdx,
                                            numGroups,
                                            0, /* num_nullcols */
                                            0, /* input_grouping */
                                            ctx->grouping,
                                            0, /* rollup_gs_times */
                                            ctx->agg_counts->numAggs,
                                            ctx->agg_counts->transitionSpace,
                                            result_plan);
        }

        else
        {
            /* Grouping extension (e.g. ROLLUP): delegate plan construction.
             * Note this may scribble on tlist, numGroupCols and groupColIdx,
             * and may update *ctx->p_dNumGroups and current_pathkeys.
             */
            result_plan = plan_grouping_extension(root, path, ctx->tuple_fraction,
                                                  ctx->use_hashed_grouping,
                                                  &tlist, sub_tlist,
                                                  true, false,
                                                  (List *) parse->havingQual,
                                                  &numGroupCols,
                                                  &groupColIdx,
                                                  ctx->agg_counts,
                                                  ctx->canonical_grpsets,
                                                  ctx->p_dNumGroups,
                                                  &(ctx->querynode_changed),
                                                  &current_pathkeys,
                                                  result_plan);
        }
    }
    else if (root->hasHavingQual)
    {
        /* No aggregates, and no GROUP BY, but a HAVING qual is a
         * degenerate case discussed in grouping_planner.  We can
         * just throw away the plan-so-far and let the caller handle
         * the whole enchilada.
         */
        return NULL;
    }

    /*
     * Decorate the top node with a Flow node if it doesn't have one yet.
     * (In such cases we require the next-to-top node to have a Flow node
     * from which we can obtain the distribution info.)
     */
    if (!result_plan->flow)
    {
        Assert(!IsA(result_plan, Motion));
        result_plan->flow = pull_up_Flow(result_plan,
                                         result_plan->lefttree,
                                         (current_pathkeys != NIL));
    }

    /* Marshal implicit results.  Return explicit result. */
    ctx->current_pathkeys = current_pathkeys;
    return result_plan;
}
| |
| /* |
| * Function make_two_stage_agg_plan |
| * |
| * Construct a two-stage aggregation plan. |
| */ |
static Plan *
make_two_stage_agg_plan(PlannerInfo *root,
                        MppGroupContext *ctx)
{
    Query       *parse = root->parse;
    List        *prelim_tlist = NIL;
    List        *final_tlist = NIL;
    List        *final_qual = NIL;
    List        *distinctExpr = NIL;
    List        *groupExprs = NIL;
    List        *current_pathkeys;
    Plan        *result_plan;
    QualCost    tlist_cost;
    AggStrategy aggstrategy;
    int         i;
    int         numGroupCols;
    AttrNumber *groupColIdx;
    AttrNumber *prelimGroupColIdx;
    Path       *path = ctx->best_path; /* no use for ctx->cheapest_path */
    /* Clamp the double-valued group-count estimate into long range for
     * make_agg().
     */
    long        numGroups = (*(ctx->p_dNumGroups) < 0) ? 0 :
                    (*(ctx->p_dNumGroups) > LONG_MAX) ? LONG_MAX :
                    (long)*(ctx->p_dNumGroups);

    /* Copy these from context rather than using them directly because we may
     * scribble on them in plan_grouping_extension().  It would be good to
     * clean this up, but not today.
     */
    numGroupCols = ctx->numGroupCols;
    groupColIdx = ctx->groupColIdx;

    /* Create the base plan which will serve as the outer plan (argument)
     * of the partial Agg node.
     */
    if (ctx->subplan == NULL)
    {
        result_plan = create_plan(root, path);
        current_pathkeys = path->pathkeys;

        /* Instead of the flat target list produced by create_plan above, use
         * the sub_tlist constructed in cdb_grouping_planner.  This consists
         * of just the (levelsup==0) vars mentioned in the SELECT and HAVING
         * clauses plus entries for any GROUP BY expressions that are not
         * simple Vars.  (This is the same sub_tlist as used in 1-stage
         * aggregation and in normal aggregation in grouping_planner).
         *
         * If the base plan is of a type that can't project, add a Result
         * node to carry the new target list, else install it directly.
         * (Though the result node may not always be necessary, it is safe,
         * and superfluous Result nodes are removed later.)
         */
        result_plan = plan_pushdown_tlist(result_plan, ctx->sub_tlist);

        /* Account for the cost of evaluation of the sub_tlist. */
        cost_qual_eval(&tlist_cost, ctx->sub_tlist, root);
        result_plan->startup_cost += tlist_cost.startup;
        result_plan->total_cost +=
            tlist_cost.startup +
            tlist_cost.per_tuple * result_plan->plan_rows;
    }
    else
    {
        /* Caller supplied a prebuilt input plan; adopt it and its ordering. */
        result_plan = ctx->subplan;
        current_pathkeys = ctx->current_pathkeys;
    }

    /* At this point result_plan produces the input relation for two-stage
     * aggregation.
     *
     * Begin by preconditioning the input, if necessary, to collocate on
     * non-distinct values of a single DISTINCT argument.
     */
    switch ( ctx->prep )
    {
    case MPP_GRP_PREP_NONE:
            break;

    case MPP_GRP_PREP_HASH_DISTINCT:
            /* Exactly one DQA argument is expected here; redistribute on it
             * unless the input is already collocated on the distinct key.
             */
            Assert(list_length( ctx->agg_counts->dqaArgs) == 1 );
            Assert( ctx->agg_counts->dqaArgs != NIL);
            if (!ctx->distinctkey_collocate)
            {
                distinctExpr = list_make1(linitial(ctx->agg_counts->dqaArgs));
                distinctExpr = copyObject(distinctExpr);
                result_plan = (Plan*)make_motion_hash(root, result_plan, distinctExpr);
                result_plan->total_cost +=
                    incremental_motion_cost(result_plan->plan_rows,
                                            result_plan->plan_rows);
                current_pathkeys = NIL; /* No longer sorted. */
            }

            break;

    case MPP_GRP_PREP_FOCUS_QD:
    case MPP_GRP_PREP_FOCUS_QE:
    case MPP_GRP_PREP_HASH_GROUPS:
    case MPP_GRP_PREP_BROADCAST:
            /* These preparations belong to other plan shapes. */
            ereport(ERROR,
                (errcode(ERRCODE_CDB_INTERNAL_ERROR),
                 errmsg("unexpected call for two-stage aggregation")));
            break; /* Never */
    }

    /*
     * Get the target lists for the preliminary and final aggregations and
     * the qual (HAVING clause) for the final aggregation based on the target
     * list of the base plan. Grouping attributes go on front of preliminary
     * target list.
     */
    generate_multi_stage_tlists(ctx,
                        &prelim_tlist,
                        NULL,
                        &final_tlist,
                        &final_qual);

    /*
     * Since the grouping attributes, if any, are on the front and in order
     * on the preliminary targetlist, we need a different vector of grouping
     * attribute numbers: (1, 2, 3, ...).  Later, we'll need
     */
    prelimGroupColIdx = NULL;
    if ( numGroupCols > 0 )
    {
        prelimGroupColIdx = (AttrNumber*)palloc(numGroupCols * sizeof(AttrNumber));
        for ( i = 0; i < numGroupCols; i++ )
            prelimGroupColIdx[i] = i+1;
    }

    /*
     * Add the Preliminary Agg Node.
     *
     * When this aggregate is a ROLLUP, we add a sequence of preliminary Agg node.
     */
    /* Determine the aggregation strategy to use. */
    if ( ctx->use_hashed_grouping )
    {
        aggstrategy = AGG_HASHED;
        current_pathkeys = NIL;
    }
    else
    {
        if (parse->groupClause)
        {
            if (!ctx->is_grpext && !pathkeys_contained_in(root->group_pathkeys,
                                                          current_pathkeys))
            {
                /* TODO -- Investigate WHY we might sort here!
                 *
                 * Good reasons would be that one of the grouping
                 * expressions isn't "hashable" or that too may groups
                 * are anticipated.
                 *
                 * A bad reason would be that the final result will be in
                 * order of the grouping key. (Redistribution will remove
                 * the ordering.)
                 */
                result_plan = (Plan *)
                    make_sort_from_groupcols(root,
                                             parse->groupClause,
                                             groupColIdx,
                                             false,
                                             result_plan);
                current_pathkeys = root->group_pathkeys;
                mark_sort_locus(result_plan);
            }
            aggstrategy = AGG_SORTED;
            /* The AGG node will not change the sort ordering of its
             * groups, so current_pathkeys describes the result too.
             */
        }
        else
        {
            aggstrategy = AGG_PLAIN;
            current_pathkeys = NIL; /* One row, no sort order */
        }
    }

    if (!ctx->is_grpext)
    {
        /* Preliminary (first-stage) Agg: no HAVING qual here -- that is
         * applied only at the final stage.
         */
        result_plan = (Plan *) make_agg(root,
                                        prelim_tlist,
                                        NIL, /* no havingQual */
                                        aggstrategy, root->config->gp_hashagg_streambottom,
                                        numGroupCols,
                                        groupColIdx,
                                        numGroups,
                                        0, /* num_nullcols */
                                        0, /* input_grouping */
                                        0, /* grouping */
                                        0, /* rollup_gs_times */
                                        ctx->agg_counts->numAggs,
                                        ctx->agg_counts->transitionSpace,
                                        result_plan);
        /* May lose useful locus and sort. Unlikely, but could do better. */
        mark_plan_strewn(result_plan);
        current_pathkeys = NIL;
    }

    else
    {
        result_plan = plan_grouping_extension(root, path, ctx->tuple_fraction,
                                              ctx->use_hashed_grouping,
                                              &prelim_tlist, ctx->sub_tlist,
                                              true, true,
                                              NIL, /* no havingQual */
                                              &numGroupCols,
                                              &groupColIdx,
                                              ctx->agg_counts,
                                              ctx->canonical_grpsets,
                                              ctx->p_dNumGroups,
                                              &(ctx->querynode_changed),
                                              &current_pathkeys,
                                              result_plan);
        /* Since we add Grouping as an additional grouping column,
         * we need to add it into prelimGroupColIdx. */
        if (prelimGroupColIdx != NULL)
            prelimGroupColIdx = (AttrNumber *)
                repalloc(prelimGroupColIdx,
                         numGroupCols * sizeof(AttrNumber));
        else
            prelimGroupColIdx = (AttrNumber *)
                palloc0(numGroupCols * sizeof(AttrNumber));

        /* The last two grouping columns are the Grouping/GroupId columns
         * added by the grouping extension; copy their positions over.
         */
        Assert(numGroupCols >= 2);
        prelimGroupColIdx[numGroupCols-1] = groupColIdx[numGroupCols-1];
        prelimGroupColIdx[numGroupCols-2] = groupColIdx[numGroupCols-2];
    }

    /*
     * Add Intermediate Motion to Gather or Hash on Groups
     */
    switch ( ctx->type )
    {
    case MPP_GRP_TYPE_GROUPED_2STAGE:
        /* Redistribute partial results by hash of the grouping keys so the
         * final Agg sees every partial for a group on one segment.
         */
        groupExprs = NIL;
        Assert(numGroupCols > 0);
        for ( i = 0; i < numGroupCols; i++)
        {
            TargetEntry *tle;

            /* skip Grouping/GroupId columns */
            if (ctx->is_grpext && (i == numGroupCols-1 || i == numGroupCols-2))
                continue;

            tle = get_tle_by_resno(prelim_tlist, prelimGroupColIdx[i]);
            groupExprs = lappend(groupExprs, copyObject(tle->expr));
        }
        result_plan = (Plan*)make_motion_hash(root, result_plan, groupExprs);
        result_plan->total_cost +=
            incremental_motion_cost(result_plan->plan_rows,
                                    result_plan->plan_rows);
        break;

    case MPP_GRP_TYPE_PLAIN_2STAGE:
        /* No grouping keys: gather all partials to a single QE. */
        result_plan = (Plan*)make_motion_gather_to_QE(result_plan, false);
        result_plan->total_cost +=
            incremental_motion_cost(result_plan->plan_rows,
                                    result_plan->plan_rows * root->config->cdbpath_segments);
        break;

    case MPP_GRP_TYPE_NONE:
    case MPP_GRP_TYPE_BASEPLAN:
    case MPP_GRP_TYPE_GROUPED_DQA_2STAGE:
    case MPP_GRP_TYPE_PLAIN_DQA_2STAGE:
        ereport(ERROR,
            (errcode(ERRCODE_CDB_INTERNAL_ERROR),
             errmsg("unexpected use of 2-stage aggregation")));
        break; /* Never */
    }

    /*
     * Add Sort on Groups if needed for AGG_SORTED strategy
     */
    if (aggstrategy == AGG_SORTED)
    {
        result_plan = (Plan *)
            make_sort_from_groupcols(root,
                                     parse->groupClause,
                                     prelimGroupColIdx,
                                     ctx->is_grpext,
                                     result_plan);
        current_pathkeys = root->group_pathkeys;
        mark_sort_locus(result_plan);
    }

    /* Final (second-stage) Agg combines the partials and applies the
     * HAVING qual (final_qual).
     */
    result_plan = add_second_stage_agg(root,
                                       true,
                                       prelim_tlist,
                                       final_tlist,
                                       final_qual,
                                       aggstrategy,
                                       numGroupCols,
                                       prelimGroupColIdx,
                                       0, /* num_nullcols */
                                       0, /* input_grouping */
                                       ctx->grouping,
                                       0, /* rollup_gs_times */
                                       *ctx->p_dNumGroups,
                                       ctx->agg_counts->numAggs,
                                       ctx->agg_counts->transitionSpace,
                                       "partial_aggregation",
                                       &current_pathkeys,
                                       result_plan,
                                       !ctx->is_grpext,
                                       true);

    if (ctx->is_grpext)
    {
        ListCell   *lc;
        bool found = false;

        ((Agg *)result_plan)->inputHasGrouping = true;

        /*
         * We want to make sure that the targetlist of result plan contains
         * either GROUP_ID or a targetentry to represent the value of
         * GROUP_ID from the subplans. This is because we may need this
         * entry to determine if a tuple will be outputted repeatly, by
         * the later Repeat node. In the current grouping extension
         * planner, if there is no GROUP_ID entry, then it must be the last
         * entry in the targetlist of the subplan.
         */
        foreach (lc, result_plan->targetlist)
        {
            TargetEntry *te = (TargetEntry *)lfirst(lc);

            /*
             * Find out if GROUP_ID in the final targetlist. It should
             * point to the last attribute in the subplan targetlist.
             */
            if (IsA(te->expr, Var))
            {
                Var *var = (Var *)te->expr;
                if (var->varattno == list_length(prelim_tlist))
                {
                    found = true;
                    break;
                }
            }
        }

        if (!found)
        {
            /* Add a new target entry in the targetlist which point to
             * GROUP_ID attribute in the subplan. Mark this entry
             * as Junk.
             */
            TargetEntry *te = get_tle_by_resno(prelim_tlist,
                                               list_length(prelim_tlist));
            Expr *expr;
            TargetEntry *new_te;

            expr = (Expr *)makeVar(1,
                                   te->resno,
                                   exprType((Node *)te->expr),
                                   exprTypmod((Node *)te->expr),
                                   0);
            new_te = makeTargetEntry(expr,
                                     list_length(result_plan->targetlist) + 1,
                                     "group_id",
                                     true);
            result_plan->targetlist = lappend(result_plan->targetlist,
                                              new_te);
        }
    }

    /* Marshal implicit results.  Return explicit result. */
    ctx->current_pathkeys = current_pathkeys;
    /* Signal to the caller that the query tree was rewritten. */
    ctx->querynode_changed = true;
    return result_plan;
}
| |
| |
| /* |
| * Function make_three_stage_agg_plan |
| * |
| * Construct a three-stage aggregation plan involving DQAs (DISTINCT-qualified |
| * aggregate functions. |
| * |
| * Such a plan will always involve the following three aggregation phases: |
| * |
| * - preliminary -- remove duplicate (grouping key, DQA argument) values |
| * from an arbitrarily partitioned input; pre-aggregate plain aggregate |
| * functions. |
| * |
| * - intermediate -- remove duplicate (grouping key, DQA argument) values |
| * from an input partitioned on the grouping key; pre-aggregate the |
| * pre-aggregated results of preliminary plain aggregate functions. |
| * |
| * - final -- apply ordinary aggregation to DQA arguments (now distinct |
| * within their group) and final aggregation to the pre-aggregated results |
| * of the previous phase. |
| * |
| * In addition, if there is more than one DQA in the query, the plan will |
| * join the results of the individual three-phase aggregations into the |
| * final result. |
| * |
| * The preliminary aggregation phase occurs prior to the collocating |
| * motion and is planned independently on the theory that any ordering |
| * will be disrupted by the motion. There are cases where this isn't |
| * necessarily the case, but they are unexploited for now. |
| * |
| * The intermediate and final aggregation phases... |
| */ |
static Plan *
make_three_stage_agg_plan(PlannerInfo *root, MppGroupContext *ctx)
{
    List        *current_pathkeys;
    Plan        *result_plan;
    QualCost    tlist_cost;
    Path        *path = ctx->best_path; /* no use for ctx->cheapest_path */

    /* We assume that we are called only when
     * - there are no grouping extensions (like ROLLUP),
     * - the input is partitioned and needs no preparatory Motion,
     * - the required transformation involves DQAs.
     */
    Assert ( !is_grouping_extension(ctx->canonical_grpsets) );
    Assert ( ctx->prep == MPP_GRP_PREP_NONE );
    Assert ( ctx->type == MPP_GRP_TYPE_GROUPED_DQA_2STAGE
            || ctx->type == MPP_GRP_TYPE_PLAIN_DQA_2STAGE );

    /* Create the base plan which will serve as the outer plan (argument)
     * of the partial Agg node(s).
     */
    if (ctx->subplan == NULL)
    {
        result_plan = create_plan(root, path);
        current_pathkeys = path->pathkeys;

        /* Instead of the flat target list produced above, use the sub_tlist
         * constructed in cdb_grouping_planner.  Add a Result node if the
         * base plan can't project. (This may be unnecessary, but, if so,
         * the Result node will be removed later.)
         */
        result_plan = plan_pushdown_tlist(result_plan, ctx->sub_tlist);

        Assert(result_plan->flow);

        /* Account for the cost of evaluation of the sub_tlist. */
        cost_qual_eval(&tlist_cost, ctx->sub_tlist, root);
        result_plan->startup_cost += tlist_cost.startup;
        result_plan->total_cost +=
            tlist_cost.startup +
            tlist_cost.per_tuple * result_plan->plan_rows;
    }
    else
    {
        /* Caller supplied a prebuilt input plan; adopt it and its ordering. */
        result_plan = ctx->subplan;
        current_pathkeys = ctx->current_pathkeys;
    }

    /* Use caller specified join_strategy: None, Cross, Hash, or Merge. */

    prepare_dqa_pruning_tlists(ctx);

    if ( list_length(ctx->agg_counts->dqaArgs) == 1 )
    {
        /* Note: single-DQA plans don't require a join and are handled
         * specially by make_plan_for_one_dqa so we can return the result
         * directly.
         */
        Query *query;

        result_plan = make_plan_for_one_dqa(root, ctx, 0,
                                            result_plan, &query);
        /* Flat-copy the rewritten Query node back over root->parse so code
         * holding a pointer to root->parse sees the updated contents.
         */
        memcpy(root->parse, query, sizeof(Query));

        pfree(query);
    }
    else
    {
        /* Multi-DQA plans are trickier because of the need to consider input
         * sharing and the need to join the coplans back together.
         */
        List *share_partners;
        int i;
        List *rtable = NIL;

        if ( ctx->use_sharing )
        {
            /* Share one scan of the input among all coplans. */
            share_partners = share_plan(root, result_plan, ctx->numDistinctCols);
        }
        else
        {
            /* No sharing: the first coplan uses the input plan as-is and
             * each additional coplan gets a full copy of it.
             */
            share_partners = NIL;
            share_partners = lappend(share_partners, result_plan);
            for ( i = 1; i < ctx->numDistinctCols; i++ )
            {
                share_partners = lappend(share_partners, copyObject(result_plan));
            }
        }

        /* Construct a coplan for each distinct DQA argument. */
        for ( i = 0; i < ctx->numDistinctCols; i++ )
        {
            char buffer[50];
            int j;
            ListCell *l;
            Alias *eref;
            Plan *coplan;
            Query *coquery;

            coplan = (Plan*)list_nth(share_partners,i);
            coplan = make_plan_for_one_dqa(root, ctx, i,
                                           coplan, &coquery);

            /* Wrap the coplan as a range table entry named dqa_coplan_<i+1>
             * so the coplans can be joined as subqueries.
             */
            eref = makeNode(Alias);
            sprintf(buffer, "dqa_coplan_%d", i+1);
            eref->aliasname = pstrdup(buffer);
            eref->colnames = NIL;
            j = 1;
            foreach (l, coplan->targetlist)
            {
                TargetEntry *tle = (TargetEntry*)lfirst(l);
                Value *colname = get_tle_name(tle, coquery->rtable, buffer);
                eref->colnames = lappend(eref->colnames, colname);
                j++;
            }

            rtable = lappend(rtable,
                             package_plan_as_rte(coquery, coplan, eref, NIL));
            ctx->dqaArgs[i].coplan = add_subqueryscan(root, NULL, i+1, coquery, coplan);
        }

        /* Begin with the first coplan, then join in each suceeding coplan. */
        result_plan = ctx->dqaArgs[0].coplan;
        for ( i = 1; i < ctx->numDistinctCols; i++ )
        {
            result_plan = join_dqa_coplan(root, ctx, result_plan, i);
        }

        /* Finalize the last join plan so it has the correct target list
         * and having qual.
         */
        ctx->top_tlist = result_plan->targetlist;

        result_plan->targetlist = (List*) finalize_split_expr((Node*) ctx->fin_tlist, ctx);
        result_plan->qual = (List*) finalize_split_expr((Node*) ctx->fin_hqual, ctx);

        /*
         * Reconstruct the flow since the targetlist for the result_plan may have
         * changed.
         */
        result_plan->flow = pull_up_Flow(result_plan,
                                         result_plan->lefttree,
                                         true);

        /* Need to adjust root.  Is this enuf?  I think so. */
        root->parse->rtable = rtable;
        root->parse->targetList = copyObject(result_plan->targetlist);
    }
    // Rebuild arrays for RelOptInfo and RangeTblEntry for the PlannerInfo
    // since the underlying range tables have been transformed
    rebuild_simple_rel_and_rte(root);

    return result_plan;
}
| |
| |
| /* Helper for qsort in planDqaJoinOrder. */ |
| int compareDqas(const void *larg, const void *rarg) |
| { |
| double lft = ((DqaInfo*)larg)->num_rows; |
| double rgt = ((DqaInfo*)rarg)->num_rows; |
| return (lft < rgt)? -1 : (lft == rgt)? 0 : 1; |
| } |
| |
| /* Collect per distinct DQA argument information for use in single- and |
| * multiple-DQA planning and cache it in the context as a new array of |
| * DqaInfo structures anchored at ctx->dqaArgs. The order of elements |
| * in the array determines join order for a multi-DQA plan. |
| * |
| * Note: The original list of distinct DQA arguments was collected by |
| * the count_agg_clauses earlier in planning. Later, make_subplan_tlist |
| * used it to guarantee that the DQA arguments have target entries with |
| * non-zero sortgroupref values and to generate vector ctx->distinctColIdx |
| * to locate those entries. Here, however, we use that vector to locate |
| * the DQA arguments and reorder the vector to agree with join order. |
| */ |
| void planDqaJoinOrder(PlannerInfo *root, MppGroupContext *ctx, |
| double input_rows) |
| { |
| int i; |
| DqaInfo *args; |
| Node *distinctExpr; |
| |
| Assert( ctx->numDistinctCols == list_length(ctx->agg_counts->dqaArgs) ); |
| |
| /* Collect row count estimates for the partial results. */ |
| if ( ctx->numDistinctCols == 0 ) |
| { |
| ctx->dqaArgs = NULL; |
| return; |
| } |
| |
| args = (DqaInfo*)palloc( ctx->numDistinctCols * sizeof(DqaInfo)); |
| |
| for ( i = 0; i < ctx->numDistinctCols; i++) |
| { |
| TargetEntry *dtle; |
| List *x; |
| int j; |
| |
| /* Like PG and the SQL standard, we assume that a DQA may take only |
| * a single argument -- no REGR_SXY(DISTINCT X,Y). This is what allows |
| * distinctExpr to be an expression rather than a list of expressions. |
| */ |
| dtle = get_tle_by_resno(ctx->sub_tlist, ctx->distinctColIdx[i]); |
| distinctExpr = (Node*) dtle->expr; |
| |
| x = NIL; |
| for ( j = 0; j < ctx->numGroupCols ; j++ ) |
| { |
| TargetEntry *tle; |
| |
| tle = get_tle_by_resno(ctx->sub_tlist,ctx->groupColIdx[j]); |
| x = lappend(x, tle->expr); |
| } |
| x = lappend(x, distinctExpr); |
| |
| args[i].distinctExpr = distinctExpr; /* no copy */ |
| args[i].base_index = dtle->resno; |
| args[i].num_rows = estimate_num_groups(root, x, input_rows); |
| args[i].can_hash = hash_safe_type(exprType(distinctExpr)); |
| |
| list_free(x); |
| } |
| qsort(args, ctx->numDistinctCols, sizeof(DqaInfo), compareDqas); |
| |
| /* Reorder ctx->distinctColIdx to agree with join order. */ |
| for ( i = 0; i < ctx->numDistinctCols; i++ ) |
| { |
| ctx->distinctColIdx[i] = args[i].base_index; |
| } |
| |
| ctx->dqaArgs = args; |
| } |
| |
| |
| /* Function make_plan_for_one_dqa |
| * |
| * Subroutine for make_three_stage_agg_plan constructs a coplan for |
| * the specified DQA index [0..numDistinctCols-1] which selects a DqaInfo |
| * entry from the context. |
| * |
| * In multi-DQA plans, coplans have minimal targetlists (just grouping |
| * keys, DQA arguments, and results of single aggregate functions). In |
| * case this is a single-DQA (join-less) plan, the coplan target list is |
| * "finalized" to produce the result requested by the user (which may |
| * include expressions over the minimal list in the targetlist and/or |
| * having qual). |
| * |
| * A Query (including range table) which approximates a query for the |
| * returned plan is stored back into *coquery_p, if coquery_p is not NULL. |
| */ |
static Plan *
make_plan_for_one_dqa(PlannerInfo *root, MppGroupContext *ctx, int dqa_index,
					  Plan* result_plan, Query **coquery_p)
{
	DqaCoplanType coplan_type;
	List *prelim_tlist = NIL;
	List *inter_tlist = NIL;
	List *final_tlist = NIL;
	List *final_qual = NIL;
	List *groupExprs = NIL;
	List *current_pathkeys;
	AggStrategy aggstrategy;
	AttrNumber *prelimGroupColIdx;
	AttrNumber *inputGroupColIdx;
	List *extendedGroupClause;
	Query *original_parse;
	bool groups_sorted = false;
	long numGroups;
	int i, n;
	DqaInfo *dqaArg = &ctx->dqaArgs[dqa_index];
	bool sort_coplans = ( ctx->join_strategy == DqaJoinMerge );
	bool groupkeys_collocate = cdbpathlocus_collocates(ctx->input_locus, root->group_pathkeys, false /*exact_match*/);
	bool need_inter_agg = false;
	bool dqaduphazard = false;
	bool stream_bottom_agg = root->config->gp_hashagg_streambottom; /* Take hint */

	/* Planning will perturb root->parse, so we copy its content aside
	 * so we can restore it later. We flat copy instead of resetting
	 * because code in the stack may have a local variable set to the
	 * value of root->parse.
	 */
	original_parse = makeNode(Query);
	memcpy(original_parse, root->parse, sizeof(Query));

	/* Our caller, make_three_stage_agg_plan, pushed ctx->sub_tlist onto
	 * result_plan. This contains all the keys and arguments for the
	 * whole query. While it would be possible to generate a smaller
	 * targetlist to use for this single DQA it is probably not worth
	 * the complexity. Just use sub_tlist as given.
	 *
	 * The DQA argument of interest is attribute dqaArg->baseIndex.
	 *
	 * Get the target lists for the preliminary, intermediate and final
	 * aggregations and the qual (HAVING clause) for the final aggregation
	 * based on the target list of the base plan. Grouping attributes go on
	 * front of preliminary and intermediate target lists.
	 */
	generate_dqa_pruning_tlists(ctx,
								dqa_index,
								&prelim_tlist,
								&inter_tlist,
								&final_tlist,
								&final_qual);

	/*
	 * For the first aggregation phases the original grouping attributes
	 * (maybe zero of them) must be extended to include the DQA argument
	 * attribute (exactly one of them) to be pruned.
	 *
	 * The grouping attributes and a single DQA argument are on the front and
	 * in order on the preliminary and intermediate targetlists so we need a
	 * new vector of grouping attributes, prelimGroupColIdx = (1, 2, 3, ...),
	 * for use in these aggregations. The vector inputGroupColIdx plays a
	 * similar role for sub_tlist.
	 *
	 * The initial-phase group clause, extendedGroupClause, is the one in
	 * the query (assumed to have no grouping extensions) augmented by a
	 * GroupClause node for the DQA argument. This is where the sort
	 * operator for the DQA argument is selected.
	 */
	{
		GroupClause* gc;
		TargetEntry *tle;

		prelimGroupColIdx = inputGroupColIdx = NULL;

		n = ctx->numGroupCols + 1; /* add the DQA argument as a grouping key */
		Assert( n > 0 );

		prelimGroupColIdx = (AttrNumber*)palloc(n * sizeof(AttrNumber));
		inputGroupColIdx = (AttrNumber*)palloc(n * sizeof(AttrNumber));

		/* Preliminary/intermediate keys are simply sequential resnos ... */
		for ( i = 0; i < n; i++ )
			prelimGroupColIdx[i] = i+1;
		/* ... while input keys are the original grouping resnos plus the
		 * DQA argument's resno appended at the end. */
		for ( i = 0; i < ctx->numGroupCols; i++ )
			inputGroupColIdx[i] = ctx->groupColIdx[i];
		inputGroupColIdx[ctx->numGroupCols] = dqaArg->base_index;

		/* Build the extra GroupClause for the DQA argument; its sort
		 * operator is the default ordering operator for the argument's
		 * type. */
		gc = makeNode(GroupClause);
		tle = get_tle_by_resno(ctx->sub_tlist, dqaArg->base_index);
		gc->tleSortGroupRef = tle->ressortgroupref;
		gc->sortop = ordering_oper_opid(exprType((Node*)dqaArg->distinctExpr));

		extendedGroupClause = list_copy(root->parse->groupClause);
		extendedGroupClause = lappend(extendedGroupClause,gc);
	}

	/*
	 * Determine the first-phase aggregation strategy to use. Prefer hashing
	 * to sorting because the benefit of the sort will be lost by the Motion
	 * to follow.
	 */
	if ( dqaArg->use_hashed_preliminary )
	{
		aggstrategy = AGG_HASHED;
		current_pathkeys = NIL;
	}
	else
	{
		/* Here we need to sort! The input pathkeys won't contain the
		 * DQA argument, so just do it.
		 */
		result_plan = (Plan *)
			make_sort_from_groupcols(root,
									 extendedGroupClause,
									 inputGroupColIdx,
									 false,
									 result_plan);
		current_pathkeys = root->group_pathkeys;
		mark_sort_locus(result_plan);
		aggstrategy = AGG_SORTED;
		/* The AGG node will not change the sort ordering of its
		 * groups, so current_pathkeys describes the result too.
		 */
	}

	/*
	 * Preliminary Aggregation: With the pre-existing distribution, group
	 * by the combined grouping key and DQA argument. In the case of the
	 * first coplan, this phase also pre-aggregates any non-DQAs. This
	 * eliminates duplicate values of the DQA argument on each QE.
	 */
	/* Clamp the (double) row estimate into the long range expected by
	 * make_agg; negative means "unknown", so use 0. */
	numGroups = (dqaArg->num_rows < 0) ? 0 :
		(dqaArg->num_rows > LONG_MAX) ? LONG_MAX :
		(long)dqaArg->num_rows;

	/*
	 * If the data is distributed on the distinct qualified aggregate's key
	 * and there is no grouping key, then we prefer to not stream the bottom agg
	 */
	if (dqaArg->distinctkey_collocate && ctx->numGroupCols == 0)
	{
		stream_bottom_agg = false;
	}

	result_plan = (Plan *) make_agg(root,
									prelim_tlist,
									NIL, /* no havingQual */
									aggstrategy, stream_bottom_agg,
									ctx->numGroupCols + 1,
									inputGroupColIdx,
									numGroups,
									0, /* num_nullcols */
									0, /* input_grouping */
									0, /* grouping */
									0, /* rollup_gs_times */
									ctx->agg_counts->numAggs - ctx->agg_counts->numDistinctAggs + 1,
									ctx->agg_counts->transitionSpace, /* worst case */
									result_plan);

	/* A streaming hashed preliminary agg may emit more than one partial
	 * row per group, so duplicate DQA argument values can survive this
	 * phase; remember that so we add an intermediate agg below. */
	dqaduphazard = (aggstrategy == AGG_HASHED && stream_bottom_agg);

	result_plan->flow = pull_up_Flow(result_plan, result_plan->lefttree, (aggstrategy == AGG_SORTED));

	/* Any ordering produced above is not propagated further: a Motion may
	 * follow, which would destroy it. */
	current_pathkeys = NIL;

	/*
	 * Intermediate Motion: Gather or Hash on Groups to get colocation
	 * on the grouping key. Note that this may bring duplicate values
	 * of the DQA argument together on the QEs.
	 */
	switch ( ctx->type )
	{
	case MPP_GRP_TYPE_GROUPED_DQA_2STAGE:
		/* Redistribute on the grouping key, unless the input is already
		 * collocated on it. */
		if (!groupkeys_collocate)
		{
			groupExprs = NIL;
			Assert(ctx->numGroupCols > 0);
			for ( i = 0; i < ctx->numGroupCols; i++)
			{
				TargetEntry *tle;

				tle = get_tle_by_resno(prelim_tlist, prelimGroupColIdx[i]);
				groupExprs = lappend(groupExprs, copyObject(tle->expr));
			}
			result_plan = (Plan*)make_motion_hash(root, result_plan, groupExprs);
			result_plan->total_cost +=
				incremental_motion_cost(result_plan->plan_rows,
										result_plan->plan_rows);
		}

		break;

	case MPP_GRP_TYPE_PLAIN_DQA_2STAGE:
		/* Assert that this is only called for a plain DQA like select count(distinct x) from foo */

		Assert(ctx->numGroupCols == 0); /* No group-by */
		Assert(n == 1);

		/* If already collocated on DQA arg, don't redistribute */
		if (!dqaArg->distinctkey_collocate)
		{
			TargetEntry *tle = get_tle_by_resno(ctx->sub_tlist, dqaArg->base_index);
			Assert(tle);
			groupExprs = lappend(NIL, copyObject(tle->expr));

			result_plan = (Plan*)make_motion_hash(root, result_plan, groupExprs);
			result_plan->total_cost +=
				incremental_motion_cost(result_plan->plan_rows,
										result_plan->plan_rows);
		}
		break;

	case MPP_GRP_TYPE_NONE:
	case MPP_GRP_TYPE_BASEPLAN:
	case MPP_GRP_TYPE_GROUPED_2STAGE:
	case MPP_GRP_TYPE_PLAIN_2STAGE:
		ereport(ERROR,
				(errcode(ERRCODE_CDB_INTERNAL_ERROR),
				 errmsg("unexpected use of DQA pruned 2-phase aggregation")));
		break; /* Never */
	}
	/* No ordering survives the Motion (or none was produced). */
	current_pathkeys = NIL;

	groups_sorted = false;

	/* The coplan type dictates the agg strategies (and any extra sorts)
	 * used for the remaining phases. */
	if ( sort_coplans )
	{
		coplan_type = dqaArg->coplan_type_sorted;
	}
	else
	{
		coplan_type = dqaArg->coplan_type_cheapest;
	}

	if ( dqaduphazard ||
		 (!dqaArg->distinctkey_collocate && !groupkeys_collocate) )
	{
		/* Intermediate Aggregation: Grouping key values are colocated so group
		 * by the combined grouping key and DQA argument while intermediate-
		 * aggregating any non-DQAs.  This once again (and finally) eliminates
		 * duplicate values of the DQA argument on each QE.
		 */
		need_inter_agg = true;

		switch (coplan_type)
		{
		case DQACOPLAN_GGS:
		case DQACOPLAN_PGS:
			aggstrategy = AGG_SORTED;

			/* pre-sort required on combined grouping key and DQA argument */
			result_plan = (Plan *)
				make_sort_from_groupcols(root,
										 extendedGroupClause,
										 prelimGroupColIdx,
										 false,
										 result_plan);
			groups_sorted = true;
			current_pathkeys = root->group_pathkeys;
			mark_sort_locus(result_plan);
			break;

		case DQACOPLAN_GSH:
		case DQACOPLAN_SHH:
		case DQACOPLAN_HH:
		case DQACOPLAN_PH:
			aggstrategy = AGG_HASHED;
			groups_sorted = false;
			break;
		}

		result_plan = add_second_stage_agg(root,
										   true,
										   prelim_tlist,
										   inter_tlist,
										   NULL,
										   aggstrategy,
										   ctx->numGroupCols + 1,
										   prelimGroupColIdx,
										   0, /* num_nullcols */
										   0, /* input_grouping */
										   0, /* grouping */
										   0, /* rollup_gs_times */
										   dqaArg->num_rows,
										   ctx->agg_counts->numAggs,
										   ctx->agg_counts->transitionSpace,
										   "partial_aggregation",
										   &current_pathkeys,
										   result_plan,
										   true, false);
	}

	/* Final Aggregation: Group by the grouping key, aggregate the now
	 * distinct values of the DQA argument using non-distinct-qualified
	 * aggregation, final aggregate the intermediate values of any non-DQAs.
	 */

	switch (coplan_type)
	{
	case DQACOPLAN_GSH:
		/* pre-sort required on grouping key */
		result_plan = (Plan *)
			make_sort_from_groupcols(root,
									 root->parse->groupClause,
									 prelimGroupColIdx,
									 false,
									 result_plan);
		groups_sorted = true;
		current_pathkeys = root->group_pathkeys;
		mark_sort_locus(result_plan);
		/* Fall through. */

	case DQACOPLAN_GGS:
		aggstrategy = AGG_SORTED;
		break;

	case DQACOPLAN_SHH:
	case DQACOPLAN_HH:
		aggstrategy = AGG_HASHED;
		groups_sorted = false;
		break;

	case DQACOPLAN_PGS:
	case DQACOPLAN_PH:
		/* plainagg */
		aggstrategy = AGG_PLAIN;
		groups_sorted = false;
		break;
	}

	/**
	 * In the case where there is no grouping key, we need to gather up all the rows in a single segment to compute the final aggregate.
	 */
	if ( ctx->type == MPP_GRP_TYPE_PLAIN_DQA_2STAGE)
	{
		/* Assert that this is only called for a plain DQA like select count(distinct x) from foo */
		Assert(ctx->numGroupCols == 0); /* No grouping columns */
		Assert(n == 1);

		result_plan = (Plan*)make_motion_gather_to_QE(result_plan, false);
		result_plan->total_cost +=
			incremental_motion_cost(result_plan->plan_rows,
									result_plan->plan_rows * root->config->cdbpath_segments);
	}

	result_plan = add_second_stage_agg(root,
									   true,
									   need_inter_agg ? inter_tlist : prelim_tlist,
									   final_tlist,
									   final_qual,
									   aggstrategy,
									   ctx->numGroupCols,
									   prelimGroupColIdx,
									   0, /* num_nullcols */
									   0, /* input_grouping */
									   ctx->grouping,
									   0, /* rollup_gs_times */
									   *ctx->p_dNumGroups,
									   ctx->agg_counts->numAggs,
									   ctx->agg_counts->transitionSpace,
									   "partial_aggregation",
									   &current_pathkeys,
									   result_plan,
									   true,
									   false);

	/* Final sort */
	switch (coplan_type)
	{
	case DQACOPLAN_SHH:
		/* post-sort required */
		result_plan = (Plan *)
			make_sort_from_groupcols(root,
									 root->parse->groupClause,
									 prelimGroupColIdx,
									 false,
									 result_plan);
		groups_sorted = true;
		current_pathkeys = root->group_pathkeys;
		mark_sort_locus(result_plan);
		break;

	case DQACOPLAN_GGS:
	case DQACOPLAN_GSH:
	case DQACOPLAN_HH:
	case DQACOPLAN_PGS:
	case DQACOPLAN_PH:
		break;
	}

	/* Marshal implicit results.  Return explicit result. */
	if ( groups_sorted )
	{
		/* The following settings work correctly though they seem wrong.
		 * Though we changed the query tree, we say that we did not so that
		 * planner.c will notice the useful sort order we have produced.
		 * We also reset the current pathkeys to the original group keys.
		 * (Though our target list may differ, its attribute-wise ordering
		 * is on the group keys.)
		 */
		ctx->current_pathkeys = root->group_pathkeys; /* current_pathkeys are wrong! */
		ctx->querynode_changed = false;
	}
	else
	{
		ctx->current_pathkeys = NIL;
		ctx->querynode_changed = true;
	}

	/* If requested, copy our modified Query (at root->parse) for caller. */
	if ( coquery_p != NULL )
	{
		*coquery_p = makeNode(Query);
		memcpy(*coquery_p, root->parse, sizeof(Query));
	}

	/* Restore the original referent of root->parse. */
	memcpy(root->parse, original_parse, sizeof(Query));
	pfree(original_parse);

	return result_plan;
}
| |
| |
/*
 * join_dqa_coplan
 *
 * Join the plan accumulated so far (outer) with the coplan for the DQA at
 * dqa_index (inner).  With grouping columns present, the join is a merge
 * or hash join on the grouping key (per ctx->join_strategy); with no
 * grouping columns each side is a single row and a nestloop cartesian
 * product is used.  Returns the join plan with crude cost/row estimates
 * and a flow pulled up from the outer side.
 */
static Plan *
join_dqa_coplan(PlannerInfo *root, MppGroupContext *ctx, Plan *outer, int dqa_index)
{
	Plan *join_plan = NULL;
	Plan *inner = ctx->dqaArgs[dqa_index].coplan;
	List *join_tlist = NIL;
	List *tlist = NIL;
	Index outer_varno = 1;
	Index inner_varno = dqa_index + 1;
	Index varno = 1;
	int i, ng, nd;

	/* Make the target list for this join.  The outer and inner target lists
	 * will look like
	 *		(<g'> <D0'> ... <Dn-1'> <F'>) and (<g'> <Dn>)
	 * or
	 *		(<g'> <D0> <F>) and (<g'> <Dn>)
	 * The join target list should look like
	 *		(<g'> <D0'> ... <Dn'> <F'>)
	 */
	/* Use varno 1 for grouping key. */
	join_tlist = make_vars_tlist(ctx->grps_tlist, varno, 0);

	ng = list_length(join_tlist); /* (<g'>) */
	nd = ng + list_length(ctx->dref_tlists[0]);/* (<g'> <D0'>) */

	/* Append DQA result columns <D0'> ... <Dn'>, one group per coplan. */
	for ( i = 0; i <= dqa_index; i++ )
	{
		tlist = make_vars_tlist(ctx->dref_tlists[i], varno+i, ng);
		join_tlist = seq_tlist_concat(join_tlist, tlist); /* (... <Di'>) */
	}

	/* Finally the non-DQA aggregate columns <F'>. */
	tlist = make_vars_tlist(ctx->frefs_tlist, varno, nd);
	join_tlist = seq_tlist_concat(join_tlist, tlist); /* (... <F'>) */

	/* Make the join which will be either a cartesian product (in case of
	 * scalar aggregation) or a merge or hash join (in case of grouped
	 * aggregation.)
	 */
	if ( ctx->numGroupCols > 0 ) /* MergeJoin: 1x1 */
	{
		List *joinclause = NIL;
		List *hashclause = NIL;
		AttrNumber attrno;

		Insist( ctx->join_strategy == DqaJoinMerge || ctx->join_strategy == DqaJoinHash );

		/* Make the join clause -- a conjunction of IS NOT DISTINCT FROM
		 * predicates on the attributes of the grouping key.
		 */
		for ( attrno = 1; attrno <= ctx->numGroupCols; attrno++ )
		{
			Expr *qual;
			Var *outer_var;
			Var *inner_var;
			TargetEntry *tle = get_tle_by_resno(outer->targetlist, attrno);

			Assert( tle && IsA(tle->expr, Var) );

			outer_var = (Var*)copyObject(tle->expr);
			outer_var->varno = outer_varno;
			outer_var->varnoold = outer_varno;

			inner_var = (Var*)copyObject(tle->expr);
			inner_var->varno = inner_varno;
			inner_var->varnoold = inner_varno;

			/* outer should always be on the left */
			qual = make_op(NULL, list_make1(makeString("=")),
						   (Node*) outer_var,
						   (Node*) inner_var, -1);

			/* Hash join gets the plain equality as its hash clause ... */
			if ( ctx->join_strategy == DqaJoinHash )
			{
				hashclause = lappend(hashclause, copyObject(qual));
			}

			/* ... while the join qual itself is NOT (x IS DISTINCT FROM y),
			 * i.e. IS NOT DISTINCT FROM, so NULL group keys match.  The
			 * OpExpr node is retagged in place as a DistinctExpr (the two
			 * node types share the same layout). */
			qual->type = T_DistinctExpr;
			qual = make_notclause(qual);
			joinclause = lappend(joinclause, qual);
		}

		if ( ctx->join_strategy == DqaJoinHash )
		{
			/* Make the hash join. */
			bool motion_added_outer = false;
			bool motion_added_inner = false;

			outer = add_motion_to_dqa_child(outer, root, &motion_added_outer);
			inner = add_motion_to_dqa_child(inner, root, &motion_added_inner);

			/* If either side was moved, prefetch the inner side and drop
			 * any pathkey claim. */
			bool prefetch_inner = motion_added_outer || motion_added_inner;
			if (motion_added_outer || motion_added_inner)
			{
				ctx->current_pathkeys = NULL;
			}

			Hash *hash_plan = make_hash(inner);

			join_plan = (Plan*)make_hashjoin(join_tlist,
											 NIL, /* joinclauses */
											 NIL, /* otherclauses */
											 hashclause, /* hashclauses */
											 joinclause, /* hashqualclauses */
											 outer, (Plan*)hash_plan,
											 JOIN_INNER);
			((Join *) join_plan)->prefetch_inner = prefetch_inner;
		}
		else
		{
			/* Make the merge join noting that the outer plan produces rows
			 * distinct in the join key.  (So does the inner, for that matter,
			 * but the MJ algorithm is only sensitive to the outer.)
			 */

			join_plan = (Plan*)make_mergejoin(join_tlist,
											  NIL, NIL,
											  joinclause,
											  outer, inner,
											  JOIN_INNER);
			((MergeJoin*)join_plan)->unique_outer = true;
		}
	}
	else /* NestLoop: Cartesian product: 1x1 */
	{
		Insist(ctx->join_strategy == DqaJoinCross);

		join_plan = (Plan*)make_nestloop(join_tlist,
										 NIL, NIL,
										 outer, inner,
										 JOIN_INNER);
		((NestLoop*)join_plan)->singleton_outer = true;
	}

	/* Rough cost/size estimates: row count from the outer side (one row
	 * per group), widths summed, costs added plus a per-row CPU charge. */
	join_plan->startup_cost = outer->startup_cost + inner->startup_cost;
	join_plan->plan_rows = outer->plan_rows;
	join_plan->plan_width = outer->plan_width + inner->plan_width; /* too high for MJ */
	join_plan->total_cost = outer->total_cost + inner->total_cost;
	join_plan->total_cost += cpu_tuple_cost * join_plan->plan_rows;

	join_plan->flow = pull_up_Flow(join_plan, join_plan->lefttree, true);

	return join_plan;
}
| |
| |
| /* |
| * Function make_subplan_tlist (for multi-phase aggregation) |
| * |
| * The input to the "lower" Agg node will use a "flattened" tlist (having |
| * just the (levelsup==0) vars mentioned in the targetlist and HAVING qual. |
| * This is similar to the target list produced by make_subplanTargetList |
| * and used for 1-stage aggregation in grouping_planner. |
| * |
| * The result target list contains entries for all the simple Var attributes |
| * of the original SELECT and HAVING clauses, plus entries for any GROUP BY |
| * expressions and DQA arguments that are not simple Vars. |
| * |
| * The implicit results are |
| * |
| * - the number of grouping attributes and a vector of their positions |
| * (which are equal to their resno's) in the target list delivered through |
| * pointers pnum_gkeys and pcols_gkeys, and |
| * |
| * - the number of distinct arguments to DISTINCT-qualified aggregate |
| * function and a vector of their positions (which are equal to their |
| * resno's) in the target list delivered through pointers pnum_dqas and |
| * pcols_dqas. These arguments are guaranteed (by the call to function |
| * augment_subplan_tlist) to appear as attributes of the subplan target |
| * list. |
| * |
| * There are no similar results for sort and distinct attributes since |
| * they don't necessarily appear in the subplan target list. |
| */ |
| List *make_subplan_tlist(List *tlist, Node *havingQual, |
| List *grp_clauses, |
| int *pnum_gkeys, AttrNumber **pcols_gkeys, |
| List *dqa_args, |
| int *pnum_dqas, AttrNumber **pcols_dqas) |
| { |
| List *sub_tlist; |
| List *extravars; |
| |
| int num_gkeys; |
| AttrNumber *cols_gkeys; |
| |
| Assert( dqa_args != NIL? pnum_dqas != NULL && pcols_dqas != NULL: true ); |
| |
| sub_tlist = flatten_tlist(tlist); |
| extravars = pull_var_clause(havingQual, false); |
| sub_tlist = add_to_flat_tlist(sub_tlist, extravars, false /* resjunk */); |
| list_free(extravars); |
| |
| num_gkeys = num_distcols_in_grouplist(grp_clauses); |
| if (num_gkeys > 0) |
| { |
| int keyno = 0; |
| ListCell *l; |
| List *tles; |
| |
| cols_gkeys = (AttrNumber*) palloc(sizeof(AttrNumber) * num_gkeys); |
| |
| tles = get_sortgroupclauses_tles(grp_clauses, tlist); |
| |
| foreach (l, tles) |
| { |
| Node *expr; |
| TargetEntry *tle, *sub_tle = NULL; |
| ListCell *sl; |
| |
| tle = (TargetEntry*) lfirst(l); |
| |
| expr = (Node*)tle->expr; |
| |
| /* Find or make a matching sub_tlist entry. */ |
| foreach(sl, sub_tlist) |
| { |
| sub_tle = (TargetEntry *) lfirst(sl); |
| if (equal(expr, sub_tle->expr) && (sub_tle->ressortgroupref == 0)) |
| break; |
| } |
| if (!sl) |
| { |
| sub_tle = makeTargetEntry((Expr*) expr, |
| list_length(sub_tlist) + 1, |
| NULL, |
| false); |
| sub_tlist = lappend(sub_tlist, sub_tle); |
| } |
| |
| /* Set its group reference and save its resno */ |
| sub_tle->ressortgroupref = tle->ressortgroupref; |
| cols_gkeys[keyno++] = sub_tle->resno; |
| } |
| *pnum_gkeys = num_gkeys; |
| *pcols_gkeys = cols_gkeys; |
| } |
| else |
| { |
| *pnum_gkeys = 0; |
| *pcols_gkeys = NULL; |
| } |
| |
| if ( dqa_args != NIL ) |
| sub_tlist = augment_subplan_tlist(sub_tlist, dqa_args, pnum_dqas, pcols_dqas, true); |
| |
| return sub_tlist; /* Possibly modified by appending expression entries. */ |
| } |
| |
| |
| /* Function augment_subplan_tlist |
| * |
| * Called from make_subplan_tlist, not directly. |
| * |
| * Make a target list like the input that includes sortgroupref'd entries |
| * for the expressions in exprs. Note that the entries in the input expression |
| * list must be distinct. |
| * |
| * New entries corresponding to the expressions in the input exprs list |
| * (if any) are added to the argument list. Existing entries are modified |
| * (if necessary) in place. |
| * |
| * Return the (modified) input targetlist. |
| * |
| * Implicitly return an array of resno values for exprs in (pnum, *pcols), if |
| * return_resno is true. |
| */ |
| List *augment_subplan_tlist(List *tlist, List *exprs, int *pnum, AttrNumber **pcols, |
| bool return_resno) |
| { |
| int num; |
| AttrNumber *cols = NULL; |
| |
| num = list_length(exprs); /* Known to be distinct. */ |
| if (num > 0) |
| { |
| int keyno = 0; |
| ListCell *lx, *lt; |
| TargetEntry *tle, *matched_tle; |
| Index max_sortgroupref = 0; |
| |
| foreach (lt, tlist) |
| { |
| tle = (TargetEntry*)lfirst(lt); |
| if ( tle->ressortgroupref > max_sortgroupref ) |
| max_sortgroupref = tle->ressortgroupref; |
| } |
| |
| if (return_resno) |
| cols = (AttrNumber*) palloc(sizeof(AttrNumber) * num); |
| |
| foreach (lx, exprs) |
| { |
| Node *expr = (Node*)lfirst(lx); |
| matched_tle = NULL; |
| |
| foreach (lt, tlist) |
| { |
| tle = (TargetEntry*)lfirst(lt); |
| |
| if ( equal(expr, tle->expr) ) |
| { |
| matched_tle = tle; |
| break; |
| } |
| } |
| |
| if ( matched_tle == NULL ) |
| { |
| matched_tle = makeTargetEntry((Expr*) expr, |
| list_length(tlist) + 1, |
| NULL, |
| false); |
| tlist = lappend(tlist, matched_tle); |
| } |
| |
| if ( matched_tle->ressortgroupref == 0 ) |
| matched_tle->ressortgroupref = ++max_sortgroupref; |
| |
| if (return_resno) |
| cols[keyno++] = matched_tle->resno; |
| } |
| |
| if (return_resno) |
| { |
| *pnum = num; |
| *pcols = cols; |
| } |
| } |
| else |
| { |
| if (return_resno) |
| { |
| *pnum = 0; |
| *pcols = NULL; |
| } |
| } |
| |
| /* Note that result is a copy, possibly modified by appending expression |
| * targetlist entries and/or updating sortgroupref values. |
| */ |
| return tlist; |
| } |
| |
| /* |
| * Function describe_subplan_tlist (for single-phase aggregation) |
| * |
| * Implicitly return extra information about a supplied target list with having |
 * qual and corresponding subplan target list for single-phase aggregation.
| * This does, essentially, what make_subplan_tlist does, but for a precalculated |
| * subplan target list. In particular |
| * |
| * - it constructs the grouping key -> subplan target list resno map. |
| * - it may extend the subplan targetlist for DQAs and record entries |
| * in the DQA argument -> subplan target list resno map. |
| * |
 * In the latter case, the subplan target list may be extended, so return it.
| * This function is for the case when a subplan target list (not a whole plan) |
| * is supplied to cdb_grouping_planner. |
| */ |
| List *describe_subplan_tlist(List *sub_tlist, |
| List *tlist, Node *havingQual, |
| List *grp_clauses, int *pnum_gkeys, AttrNumber **pcols_gkeys, |
| List *dqa_args, int *pnum_dqas, AttrNumber **pcols_dqas) |
| { |
| int nkeys; |
| AttrNumber *cols; |
| |
| nkeys = num_distcols_in_grouplist(grp_clauses); |
| if ( nkeys > 0 ) |
| { |
| List *tles; |
| ListCell *lc; |
| int keyno = 0; |
| |
| cols = (AttrNumber*)palloc0(sizeof(AttrNumber)*nkeys); |
| |
| tles = get_sortgroupclauses_tles(grp_clauses, tlist); |
| |
| foreach (lc, tles) |
| { |
| TargetEntry *tle; |
| TargetEntry *sub_tle; |
| |
| tle = (TargetEntry*)lfirst(lc); |
| sub_tle = tlist_member((Node*)tle->expr, sub_tlist); |
| Assert(tle->ressortgroupref != 0); |
| Assert(tle->ressortgroupref == sub_tle->ressortgroupref); |
| Assert(keyno < nkeys); |
| |
| cols[keyno++] = sub_tle->resno; |
| } |
| *pnum_gkeys = nkeys; |
| *pcols_gkeys = cols; |
| } |
| else |
| { |
| *pnum_gkeys = 0; |
| *pcols_gkeys = NULL; |
| } |
| |
| if ( dqa_args != NIL ) |
| sub_tlist = augment_subplan_tlist(sub_tlist, dqa_args, pnum_dqas, pcols_dqas, true); |
| |
| return sub_tlist; |
| } |
| |
| |
| /* |
| * Generate targetlist for a SubqueryScan node to wrap the stage-one |
| * Agg node (partial aggregation) of a 2-Stage aggregation sequence. |
| * |
| * varno: varno to use in generated Vars |
| * input_tlist: targetlist of this node's input node |
| * |
| * Result is a "flat" (all simple Var node) targetlist in which |
| * varattno and resno match and are sequential. |
| * |
| * This function also returns a map between the original targetlist |
| * entry to new target list entry using resno values. The index |
| * positions for resno_map represent the original resnos, while the |
| * array elements represent the new resnos. The array is allocated |
| * by the caller, which should have length of list_length(input_tlist). |
| */ |
| List * |
| generate_subquery_tlist(Index varno, List *input_tlist, |
| bool keep_resjunk, int **p_resno_map) |
| { |
| List *tlist = NIL; |
| int resno = 1; |
| ListCell *j; |
| TargetEntry *tle; |
| Node *expr; |
| |
| *p_resno_map = (int *)palloc0(list_length(input_tlist) * sizeof(int)); |
| |
| foreach(j, input_tlist) |
| { |
| TargetEntry *inputtle = (TargetEntry *) lfirst(j); |
| |
| Assert(inputtle->resno == resno && inputtle->resno >= 1); |
| |
| /* Don't pull up constants, always use a Var to reference the input. */ |
| expr = (Node *) makeVar(varno, |
| inputtle->resno, |
| exprType((Node *) inputtle->expr), |
| exprTypmod((Node *) inputtle->expr), |
| 0); |
| |
| (*p_resno_map)[inputtle->resno - 1] = resno; |
| |
| tle = makeTargetEntry((Expr *) expr, |
| (AttrNumber) resno++, |
| (inputtle->resname == NULL) ? |
| NULL : |
| pstrdup(inputtle->resname), |
| keep_resjunk ? inputtle->resjunk : false); |
| tle->ressortgroupref = inputtle->ressortgroupref; |
| tlist = lappend(tlist, tle); |
| } |
| |
| return tlist; |
| } |
| |
| |
| /* |
| * Function: cdbpathlocus_collocates |
| * |
| * Is a relation with the given locus guaranteed to collocate tuples with |
| * non-distinct values of the key. The key is a list of pathkeys (each of |
| * which is a list of PathKeyItem*). |
| * |
| * Collocation is guaranteed if the locus specifies a single process or |
| * if the result is partitioned on a subset of the keys that must be |
| * collocated. |
| * |
 * We ignore other sorts of collocation, e.g., replication or partitioning
| * on a range since these cannot occur at the moment (MPP 2.3). |
| */ |
| bool cdbpathlocus_collocates(CdbPathLocus locus, List *pathkeys, bool exact_match) |
| { |
| ListCell *lc; |
| List *exprs = NIL; |
| |
| if ( CdbPathLocus_IsBottleneck(locus) ) |
| return true; |
| |
| if ( !CdbPathLocus_IsHashed(locus) ) |
| return false; /* Or would HashedOJ ok, too? */ |
| |
| if (exact_match && list_length(pathkeys) != list_length(locus.partkey)) |
| { |
| return false; |
| } |
| |
| /* Extract a list of expressions from the pathkeys. Since the locus |
| * presumably knows all about attribute equivalence classes, we use |
| * only the first item in each input path key. |
| */ |
| foreach( lc, pathkeys ) |
| { |
| PathKeyItem *item; |
| List *lst = (List*)lfirst(lc); |
| Assert( list_length(lst) > 0 ); |
| item = (PathKeyItem*)linitial(lst); |
| exprs = lappend(exprs, item->key); |
| } |
| |
| /* Check for containment of locus in exprs. */ |
| return cdbpathlocus_is_hashed_on_exprs(locus, exprs); |
| } |
| |
| |
| /* |
| * Function: cdbpathlocus_from_flow |
| * |
| * Generate a locus from a flow. Since the information needed to produce |
| * canonical path keys is unavailable, this function will never return a |
| * hashed locus. |
| */ |
| CdbPathLocus cdbpathlocus_from_flow(Flow *flow) |
| { |
| CdbPathLocus locus; |
| |
| CdbPathLocus_MakeNull(&locus); |
| |
| if (!flow) |
| return locus; |
| |
| switch (flow->flotype) |
| { |
| case FLOW_SINGLETON: |
| if (flow->segindex == -1) |
| CdbPathLocus_MakeEntry(&locus); |
| else |
| CdbPathLocus_MakeSingleQE(&locus); |
| break; |
| case FLOW_REPLICATED: |
| CdbPathLocus_MakeReplicated(&locus); |
| break; |
| case FLOW_PARTITIONED: |
| CdbPathLocus_MakeStrewn(&locus); |
| break; |
| case FLOW_UNDEFINED: |
| default: |
| Insist(0); |
| } |
| return locus; |
| } |
| |
| /* |
| * Generate 3 target lists for a sequence of consecutive Agg nodes. |
| * |
| * This is intended for a sequence of consecutive Agg nodes used in |
| * a ROLLUP. '*p_tlist3' is for the upper Agg node, and '*p_tlist2' is |
| * for any Agg node in the middle, and '*p_tlist1' is for the |
| * bottom Agg node. |
| * |
| * '*p_tlist1' and '*p_tlist2' have similar target lists. '*p_tlist3' |
| * is constructed based on tlist and outputs from *p_tlist2 or |
| * '*p_tlist1' if twostage is true. |
| * |
| * NB This function is called externally (from plangroupext.c) and not |
| * used in this file! Beware: the API is now legacy here! |
| */ |
void generate_three_tlists(List *tlist,
						   bool twostage,
						   List *sub_tlist,
						   Node *havingQual,
						   int numGroupCols,
						   AttrNumber *groupColIdx,
						   List **p_tlist1,
						   List **p_tlist2,
						   List **p_tlist3,
						   List **p_final_qual)
{
	ListCell *lc;
	int resno = 1;

	MppGroupContext ctx; /* Just for API matching! */

	/* Similar to the final tlist entries in two-stage aggregation,
	 * we use consistent varno in the middle tlist entries.
	 */
	int middle_varno = 1;

	/* Generate the top and bottom tlists by calling the multi-phase
	 * aggregation code in cdbgroup.c.  Only the fields read by
	 * generate_multi_stage_tlists are filled in; the rest of ctx is
	 * left uninitialized.
	 */
	ctx.tlist = tlist;
	ctx.sub_tlist = sub_tlist;
	ctx.havingQual = havingQual;
	ctx.numGroupCols = numGroupCols;
	ctx.groupColIdx = groupColIdx;
	ctx.numDistinctCols = 0;
	ctx.distinctColIdx = NULL;

	generate_multi_stage_tlists(&ctx,
								p_tlist1,
								NULL,
								p_tlist3,
								p_final_qual);

	/*
	 * Read target entries in '*p_tlist1' one by one, and construct
	 * the entries for '*p_tlist2'.
	 */
	foreach (lc, *p_tlist1)
	{
		TargetEntry *tle = (TargetEntry *)lfirst(lc);
		Expr *new_expr;
		TargetEntry *new_tle;

		if (IsA(tle->expr, Aggref))
		{
			/* An aggregate becomes an intermediate-stage Aggref whose
			 * single argument is a Var referencing the matching output
			 * column of the bottom Agg node.
			 */
			Aggref *aggref = (Aggref *)tle->expr;
			Aggref *new_aggref = makeNode(Aggref);

			new_aggref->aggfnoid = aggref->aggfnoid;
			new_aggref->aggtype = aggref->aggtype;
			/* NB: the Var carries the aggregate's result type with typmod -1. */
			new_aggref->args =
				list_make1((Expr*)makeVar(middle_varno, tle->resno, aggref->aggtype, -1, 0));
			new_aggref->agglevelsup = 0;
			new_aggref->aggstar = false;
			new_aggref->aggdistinct = false; /* handled in preliminary aggregation */
			new_aggref->aggstage = AGGSTAGE_INTERMEDIATE;

			new_expr = (Expr *)new_aggref;
		}

		else
		{
			/* Just make a new Var. */
			new_expr = (Expr *)makeVar(middle_varno,
									   tle->resno,
									   exprType((Node *)tle->expr),
									   exprTypmod((Node *)tle->expr),
									   0);

		}

		new_tle = makeTargetEntry(new_expr, resno,
								  (tle->resname == NULL) ?
								  NULL :
								  pstrdup(tle->resname),
								  false);
		new_tle->ressortgroupref = tle->ressortgroupref;
		*p_tlist2 = lappend(*p_tlist2, new_tle);

		resno++;
	}

	/*
	 * This may be called inside a two-stage aggregation.  In this case,
	 * We want to make sure all entries in the '*p_tlist3' are visible.
	 */
	foreach (lc, *p_tlist3)
	{
		TargetEntry *tle = (TargetEntry *)lfirst(lc);

		if (twostage)
			tle->resjunk = false;

		/* We also set aggstage to AGGSTAGE_INTERMEDIATE if this is in
		 * a two-stage aggregation, because the agg node in
		 * the second stage aggregation will do the finalize.
		 */
		if (twostage && IsA(tle->expr, Aggref))
		{
			Aggref *aggref = (Aggref *)tle->expr;
			aggref->aggstage = AGGSTAGE_INTERMEDIATE;
		}
	}
}
| |
| |
| /* |
| * Function: generate_multi_stage_tlists |
| * |
| * Generate target lists and having qual for multi-stage aggregation. |
| * |
| * Input is |
| * |
| * From the ctx argument: |
| * tlist - the preprocessed target list of the original query |
| * sub_tlist - the reduced target list to use as input to the aggregation |
 *              (If use_dqa_pruning, all DQA arguments must appear in
| * this list and must have non-zero sortgrouprefs.) |
 *	havingQual - the preprocessed having qual of the original query
| * (in list-of-conjunct-Exprs form) |
| * numGroupCols - number of grouping attributes (no grouping extensions) |
| * groupColIdx - resnos (= attr numbers) of the grouping attributes |
| * numDistinctCols - number of DISTINCT-qualified argument attributes |
| * distinctColIdx - resnos (= attr numbers) of the DQA argument attributes |
| * |
| * From use_dqa_pruning: |
| * Do we want to construct the tlists to support pruning DQAs? |
| * |
| * Output is pointers to |
| * |
| * prelim_tlist - the target list of the preliminary Agg node. |
| * inter_tlist - an optional intermediate target list for an Agg node |
| * used in multi-phase DQA pruning (p_inter_tlist non-null). |
| * final_tlist - the target list of the final Agg node. |
| * final_qual - the qual of the final Agg node. |
| */ |
| void generate_multi_stage_tlists(MppGroupContext *ctx, |
| List **p_prelim_tlist, |
| List **p_inter_tlist, |
| List **p_final_tlist, |
| List **p_final_qual) |
| { |
| /* Use consistent varno in final and intermediate tlist entries. It will |
| * refer to the sole RTE (a Subquery RTE) of a SubqueryScan. */ |
| ctx->final_varno = 1; |
| |
| /* Do we need to build an intermediate tlist in irefs_tlist? */ |
| ctx->use_irefs_tlist = ( p_inter_tlist != NULL ); |
| |
| /* Don't do DQA pruning. Use prepare/generate_dqa_pruning_tlists! */ |
| ctx->use_dqa_pruning = false; |
| |
| deconstruct_agg_info(ctx); |
| reconstruct_agg_info(ctx, |
| p_prelim_tlist, p_inter_tlist, |
| p_final_tlist, p_final_qual); |
| } |
| |
| |
| /* |
| * Function: prepare_dqa_pruning_tlists |
| * |
| * Performs the first phase of generate_multi_phase_tlist, but waits for |
| * a subquent call to generate_dqa_pruning_tlists to actually produce the |
| * target list. (This allows for the fact that DQA pruning may involve |
| * several "coplans" each with its own target list requirements. This |
| * function lays the groundwork for all such target lists. |
| */ |
| void prepare_dqa_pruning_tlists(MppGroupContext *ctx) |
| { |
| /* Use consistent varno in final and intermediate tlist entries. It will |
| * refer to the sole RTE (a Subquery RTE) of a SubqueryScan. */ |
| ctx->final_varno = 1; |
| |
| /* Do we need to build an intermediate tlist in irefs_tlist? */ |
| ctx->use_irefs_tlist = true; |
| |
| /* Do we want to do DQA pruning (in case there are any DISTINCT-qualified |
| * aggregate functions)? */ |
| ctx->use_dqa_pruning = true; |
| |
| deconstruct_agg_info(ctx); |
| } |
| |
| /* |
| * Function: generate_dqa_pruning_tlists |
| * |
| * Performs the last phase of generate_multi_phase_tlist in the context of |
| * DQA pruning. |
| */ |
| void generate_dqa_pruning_tlists(MppGroupContext *ctx, |
| int dqa_index, |
| List **p_prelim_tlist, |
| List **p_inter_tlist, |
| List **p_final_tlist, |
| List **p_final_qual) |
| { |
| Assert( p_inter_tlist != NULL ); /* optional elsewhere, required here. */ |
| Assert( ctx->use_dqa_pruning ); |
| |
| if ( ctx->numDistinctCols == 1 ) |
| { |
| /* Finalized results for single-DQA (join-less) plan. */ |
| reconstruct_agg_info(ctx, |
| p_prelim_tlist, |
| p_inter_tlist, |
| p_final_tlist, |
| p_final_qual); |
| } |
| else |
| { |
| /* Minimal results for multi-DQA (join) plan. */ |
| reconstruct_coplan_info(ctx, |
| dqa_index, |
| p_prelim_tlist, |
| p_inter_tlist, |
| p_final_tlist); |
| *p_final_qual = NIL; |
| } |
| } |
| |
/* Function: deconstruct_agg_info
 *
 * Top-level deconstruction of the target list and having qual of an
 * aggregate Query into intermediate structures (held in ctx) that will
 * later guide reconstruction of the various target lists and expressions
 * involved in a multi-phase aggregation plan, possibly with
 * DISTINCT-qualified aggregate functions (DQAs).
 *
 * On return, ctx holds:
 *   grps_tlist  - targets for the grouping key attributes
 *   dqa_tlist   - targets for the distinct DQA argument attributes
 *   dref_tlists - per-DQA-argument lists of (de-DISTINCTed) Aggrefs,
 *                 populated lazily by split_aggref during deconstruction
 *   prefs/irefs/frefs_tlist - partial/intermediate/final Aggref targets,
 *                 also populated by split_aggref
 *   fin_tlist   - final target list (refers to the above via pseudo Vars)
 *   fin_hqual   - final having qual, likewise deconstructed
 *   dqa_offsets - attribute offsets of each DQA group in the combined
 *                 [G][D0...Dn][R] layout (see comment below)
 */
void deconstruct_agg_info(MppGroupContext *ctx)
{
	int i;
	ListCell *lc;

	/* Initialize temporaries to hold the parts of the preliminary target
	 * list under construction. */
	ctx->grps_tlist = NIL;
	ctx->dqa_tlist = NIL;
	ctx->prefs_tlist = NIL;
	ctx->irefs_tlist = NIL;
	ctx->frefs_tlist = NIL;
	ctx->dref_tlists = NULL;
	ctx->fin_tlist = NIL;
	ctx->fin_hqual = NIL;

	/*
	 * Begin constructing the target list for the preliminary Agg node
	 * by placing targets for the grouping attributes on the grps_tlist
	 * temporary. Make sure ressortgroupref matches the original. Copying
	 * the expression may be overkill, but it is safe.
	 */
	for ( i = 0; i < ctx->numGroupCols; i++ )
	{
		TargetEntry *sub_tle, *prelim_tle;

		sub_tle = get_tle_by_resno(ctx->sub_tlist, ctx->groupColIdx[i]);
		/* resno is the position in grps_tlist, not the original resno. */
		prelim_tle = makeTargetEntry(copyObject(sub_tle->expr),
									list_length(ctx->grps_tlist) + 1,
									(sub_tle->resname == NULL) ?
										NULL :
										pstrdup(sub_tle->resname),
									false);
		prelim_tle->ressortgroupref = sub_tle->ressortgroupref;
		prelim_tle->resjunk = false;
		ctx->grps_tlist = lappend(ctx->grps_tlist, prelim_tle);
	}

	/*
	 * Continue to construct the target list for the preliminary Agg node
	 * by placing targets for the argument attribute of each DQA on the
	 * dqa_tlist temporary. Make sure ressortgroupref matches the original.
	 */
	for ( i = 0; i < ctx->numDistinctCols; i++ )
	{
		TargetEntry *sub_tle, *prelim_tle;

		sub_tle = get_tle_by_resno(ctx->sub_tlist, ctx->distinctColIdx[i]);
		prelim_tle = makeTargetEntry(copyObject(sub_tle->expr),
									list_length(ctx->dqa_tlist) + 1,
									(sub_tle->resname == NULL) ?
										NULL :
										pstrdup(sub_tle->resname),
									false);
		prelim_tle->ressortgroupref = sub_tle->ressortgroupref;
		prelim_tle->resjunk = false;
		ctx->dqa_tlist = lappend(ctx->dqa_tlist, prelim_tle);
	}

	/* Initialize the array of Aggref target lists corresponding to the
	 * DQA argument target list just constructed.  Entries are added by
	 * split_aggref as DQAs are encountered during deconstruction below.
	 */
	ctx->dref_tlists = (List **)palloc0(ctx->numDistinctCols * sizeof(List*));

	/*
	 * Derive the final target list with entries corresponding to the input
	 * target list, but referring to the attributes of the preliminary target
	 * list rather than to the input attributes. Note that this involves
	 * augmenting the prefs_tlist temporary as we encounter new Aggref nodes.
	 */
	foreach (lc, ctx->tlist)
	{
		TargetEntry *tle, *final_tle;
		Expr *expr;

		tle = (TargetEntry*)lfirst(lc);
		ctx->split_aggref_sortgroupref = tle->ressortgroupref; /* for deconstruction subroutines */
		expr = deconstruct_expr(tle->expr, ctx);
		ctx->split_aggref_sortgroupref = 0;
		final_tle = makeTargetEntry(expr,
									tle->resno,
									(tle->resname == NULL) ?
										NULL :
										pstrdup(tle->resname),
									tle->resjunk);
		final_tle->ressortgroupref = tle->ressortgroupref;
		ctx->fin_tlist = lappend(ctx->fin_tlist, final_tle);
	}

	/*
	 * Derive the final qual while augmenting the preliminary target list. */
	ctx->fin_hqual = (List*)deconstruct_expr((Expr*)ctx->havingQual, ctx);


	/* Now cache some values to avoid repeated recalculation by subroutines. */

	/* Use consistent varno in final, intermediate and join tlist entries.
	 * final refers to the sole RTE (a Subquery RTE) of a SubqueryScan.
	 * outer and inner to the respective inputs to a join.
	 */
	ctx->final_varno = 1;
	ctx->outer_varno = OUTER;
	ctx->inner_varno = INNER;

	/* Target lists used in multi-phase planning at or above the level
	 * of individual DQA coplans have one of the forms
	 *
	 *      [G][D0...Dn][R]
	 * where
	 *      G represents the grouping key attributes (if any)
	 *      Di represents the results of the DQAs that take the i-th
	 *         unique DQA argument (if any)
	 *      R represents the results of regular aggregate functions (if any)
	 *
	 * The offset at index position i is the number of attributes that
	 * precede the first for the i-th DQA (index origin 0).  The last
	 * is the offset of the first attribute following the DQA attributes.
	 *
	 * Note this must come after the deconstruction above, since it is
	 * the deconstruction that populates the dref_tlists.
	 */
	ctx->dqa_offsets = palloc(sizeof(int) * (1 + ctx->numDistinctCols));
	ctx->dqa_offsets[0] = ctx->numGroupCols;
	for ( i = 0; i < ctx->numDistinctCols; i++ )
	{
		ctx->dqa_offsets[i+1] = ctx->dqa_offsets[i]
							  + list_length(ctx->dref_tlists[i]);
	}
}
| |
/* Function: reconstruct_agg_info
 *
 * Construct the preliminary, optional intermediate, and final target lists
 * and the final having qual for the aggregation plan.  If we are doing
 * DQA pruning, this function is appropriate only for the cases of 0 or 1
 * DQA.  Must follow deconstruct_agg_info on the same context.
 *
 * During processing we set ctx->top_tlist to be the flat target list
 * containing only the grouping key and the results of individual aggregate
 * functions.  This list is transient -- it drives the production of the
 * final target list and having qual through finalize_split_expr.
 */
void reconstruct_agg_info(MppGroupContext *ctx,
						  List **p_prelim_tlist,
						  List **p_inter_tlist,
						  List **p_final_tlist,
						  List **p_final_qual)
{
	List *prelim_tlist = NIL;
	List *inter_tlist = NIL;
	List *final_tlist = NIL;

	/* Grouping keys: lead all three target lists.  The intermediate and
	 * final lists reference them via simple Vars. */

	prelim_tlist = ctx->grps_tlist;
	if ( p_inter_tlist != NULL )
		inter_tlist = make_vars_tlist(ctx->grps_tlist, ctx->final_varno, 0);
	final_tlist = make_vars_tlist(ctx->grps_tlist, ctx->final_varno, 0);

	/* If applicable, single DQA argument, corresponding DQAs */

	if ( ctx->use_dqa_pruning )
	{
		if ( list_length(ctx->dqa_tlist) == 1 )
		{
			/* Append the single DQA argument right after the grouping
			 * keys; renumber its resno to match its new position. */
			int n = list_length(prelim_tlist);
			TargetEntry *tle = (TargetEntry*)linitial(ctx->dqa_tlist);
			tle->resno = n+1;

			prelim_tlist = lappend(prelim_tlist, tle);
			if ( p_inter_tlist != NULL )
			{
				inter_tlist = list_concat(inter_tlist,
										  make_vars_tlist(ctx->dqa_tlist,
														  ctx->final_varno, n));
			}
			final_tlist = seq_tlist_concat(final_tlist, ctx->dref_tlists[0]);
		}
		else if ( list_length(ctx->dqa_tlist) != 0 )
		{
			/* Shouldn't use this function for multi-DQA pruning. */
			elog(ERROR,"Unexpected use of DISTINCT-qualified aggregate pruning");
		}
	}

	/* Aggrefs: partial forms in the preliminary list, intermediate forms
	 * (if wanted) in the intermediate list, final forms in the top list. */

	prelim_tlist = seq_tlist_concat(prelim_tlist, ctx->prefs_tlist);
	if ( p_inter_tlist != NULL )
	{
		inter_tlist = seq_tlist_concat(inter_tlist, ctx->irefs_tlist);
	}
	final_tlist = seq_tlist_concat(final_tlist, ctx->frefs_tlist);

	/* Set implicit results */

	*p_prelim_tlist = prelim_tlist;
	if ( p_inter_tlist != NULL )
		*p_inter_tlist = inter_tlist;

	/* top_tlist drives the replacement of pseudo Vars during
	 * finalization of the deconstructed tlist and having qual. */
	ctx->top_tlist = final_tlist;

	*p_final_tlist = (List*) finalize_split_expr((Node*) ctx->fin_tlist, ctx);
	*p_final_qual = (List*) finalize_split_expr((Node*) ctx->fin_hqual, ctx);
}
| |
/* Function: reconstruct_coplan_info
 *
 * Construct the preliminary, intermediate, and final target lists
 * for the DQA pruning aggregation coplan specified by dqa_index.
 *
 * Note: Similar to reconstruct_agg_info but stops short of finalization
 * and is sensitive to dqa_index.  Ordinarily this function would
 * be used only for multiple-DQA planning.
 */
void reconstruct_coplan_info(MppGroupContext *ctx,
							 int dqa_index,
							 List **p_prelim_tlist,
							 List **p_inter_tlist,
							 List **p_final_tlist)
{
	List *prelim_tlist = NIL;
	List *inter_tlist = NIL;
	List *final_tlist = NIL;

	int n;
	TargetEntry *tle;

	/* Grouping keys.  Unlike reconstruct_agg_info, copy grps_tlist
	 * because several coplans share it. */

	prelim_tlist = copyObject(ctx->grps_tlist);
	if ( p_inter_tlist != NULL )
		inter_tlist = make_vars_tlist(ctx->grps_tlist, ctx->final_varno, 0);
	final_tlist = make_vars_tlist(ctx->grps_tlist, ctx->final_varno, 0);

	/* Single DQA argument (the one selected by dqa_index),
	 * and its corresponding DQAs */

	Assert ( ctx->use_dqa_pruning );

	n = list_length(prelim_tlist);
	tle = (TargetEntry*)list_nth(ctx->dqa_tlist, dqa_index);
	tle->resno = n+1;		/* renumber to its position in this coplan */

	prelim_tlist = lappend(prelim_tlist, tle);
	if ( p_inter_tlist != NULL )
	{
		/* Wrap tle in a transient single-element list just to reuse
		 * make_vars_tlist; free the list cell (not tle) afterward. */
		List *x = list_make1(tle);
		inter_tlist = list_concat(inter_tlist,
								  make_vars_tlist(x, ctx->final_varno, n));
		list_free(x);
	}
	final_tlist = seq_tlist_concat(final_tlist, ctx->dref_tlists[dqa_index]);


	/* Plain Aggrefs go only on the first coplan! */
	if ( dqa_index == 0 )
	{
		prelim_tlist = seq_tlist_concat(prelim_tlist, ctx->prefs_tlist);
		if ( p_inter_tlist != NULL )
		{
			inter_tlist = seq_tlist_concat(inter_tlist, ctx->irefs_tlist);
		}
		final_tlist = seq_tlist_concat(final_tlist, ctx->frefs_tlist);
	}

	/* Set implicit results */

	*p_prelim_tlist = prelim_tlist;
	if ( p_inter_tlist != NULL )
	{
		*p_inter_tlist = inter_tlist;
	}
	*p_final_tlist = final_tlist;
}
| |
| |
| /* |
| * Function: deconstruct_expr |
| * |
| * Prepare an expression for execution within 2-stage aggregation. |
| * This involves adding targets as needed to the target list of the |
| * first (partial) aggregation and referring to this target list from |
| * the modified expression for use in the second (final) aggregation. |
| */ |
| Expr *deconstruct_expr(Expr *expr, MppGroupContext *ctx) |
| { |
| return (Expr*)deconstruct_expr_mutator((Node*)expr, ctx); |
| } |
| |
| /* |
| * Function: deconstruct_expr_mutator |
| * |
| * Work for deconstruct_expr. |
| */ |
| Node* deconstruct_expr_mutator(Node *node, MppGroupContext *ctx) |
| { |
| TargetEntry *tle; |
| |
| if (node == NULL) |
| return NULL; |
| |
| if (IsA(node, Aggref)) |
| { |
| Aggref *aggref = (Aggref*)node; |
| return split_aggref(aggref, ctx); |
| } |
| |
| /* If the given expression is a grouping expression, replace it with |
| * a Var node referring to the (lower) preliminary aggregation's |
| * target list. |
| */ |
| tle = tlist_member_with_ressortgroupref(node, ctx->grps_tlist, ctx->split_aggref_sortgroupref); |
| if ( tle != NULL ) |
| { |
| return (Node*) makeVar(grp_varno, tle->resno, |
| exprType((Node*)tle->expr), |
| exprTypmod((Node*)tle->expr), 0); |
| } |
| |
| return expression_tree_mutator(node, deconstruct_expr_mutator, (void*)ctx); |
| } |
| |
| |
| /* |
| * Function: split_aggref |
| * |
| * Find or add a partial-stage Aggref expression for the argument in the |
| * preliminary target list under construction. Return the final-stage |
| * Aggref with a single Var node argument referring to the partial-stage |
| * Aggref. In case of a DQA argument reduction, however, there is no |
| * partial-stage Aggref and the final-stage Aggref differs from the original |
| * in that (1) it does not specify DISTINCT and (2) it refers to its arguments |
| * via Vars on the lower range. |
| * |
| * For the normal 2-phase case: |
| * |
| * Note that he result type of the partial-stage Aggref will be the |
| * transition type of the aggregate function. |
| * |
| * At execution, the function used to compute the transition type in the |
| * lower aggregation will be the normal accumulator function for the |
| * aggregate function. The final function, however, will not be used. |
| * The result will be the ending transition value. |
| * |
| * At execution, the function used to compute the transition type in the |
| * upper aggregation will be the prelim function (known to exist, else |
| * we would have rejected 2-stage aggregation as a strategy) on input |
| * values of the transition type. The normal accumulator function for the |
| * aggregate function will not be used. The normal final function will |
| * be used to convert the ending transition value to the result type. |
| * aggregation |
| */ |
| Node *split_aggref(Aggref *aggref, MppGroupContext *ctx) |
| { |
| ListCell *cell; |
| Node *final_node; |
| Oid transtype = InvalidOid; |
| AttrNumber attrno = OUTER; |
| TargetEntry *prelim_tle = NULL; |
| |
| Assert(aggref != NULL && aggref->agglevelsup == 0); |
| |
| if ( aggref->aggdistinct && ctx->use_dqa_pruning ) |
| { |
| Index arg_attno; |
| Index dqa_attno; |
| TargetEntry *dqa_tle = NULL; |
| TargetEntry *arg_tle; |
| List *dref_tlist = NIL; |
| |
| /* First find the DQA argument. Since this is a DQA, its argument |
| * list must contain a single expression that matches one of the |
| * target expressions in ctx->dqa_tlist. |
| */ |
| arg_tle = NULL; |
| if ( list_length(aggref->args) == 1 ) /* safer than Assert */ |
| { |
| arg_tle = tlist_member(linitial(aggref->args), ctx->dqa_tlist); |
| } |
| if (arg_tle == NULL) |
| elog(ERROR,"Unexpected use of DISTINCT-qualified aggregation"); |
| arg_attno = arg_tle->resno; /* [1..numDistinctCols] */ |
| |
| /* We may have seen a DQA just like this one already. Look for |
| * one in the distinct Aggref target list to date. |
| */ |
| dref_tlist = ctx->dref_tlists[arg_attno - 1]; |
| dqa_attno = 1; |
| foreach( cell, dref_tlist ) |
| { |
| TargetEntry *tle = (TargetEntry*)lfirst(cell); |
| Aggref *ref = (Aggref*)tle->expr; |
| |
| /* Check similarity, avoid aggtype and aggstage |
| * (which we control) and don't bother with agglevelsup |
| * (which is always 0 here) or aggdistinct. |
| */ |
| if ( aggref->aggfnoid == ref->aggfnoid |
| && aggref->aggstar == ref->aggstar |
| && equal(aggref->args, ref->args) ) |
| { |
| dqa_tle = tle; |
| break; |
| } |
| dqa_attno++; |
| } |
| |
| if ( dqa_tle == NULL ) |
| { |
| /* Didn't find a target for the DQA Aggref so make a new one. |
| */ |
| Var *arg_var; |
| Aggref *dqa_aggref; |
| |
| arg_var = makeVar(ctx->final_varno, ctx->numGroupCols + 1, |
| exprType(linitial(aggref->args)), |
| exprTypmod(linitial(aggref->args)), |
| 0); |
| |
| dqa_aggref = makeNode(Aggref); |
| memcpy(dqa_aggref, aggref, sizeof(Aggref)); /* flat copy */ |
| dqa_aggref->args = list_make1(arg_var); |
| dqa_aggref->aggdistinct = false; |
| |
| dqa_tle = makeTargetEntry((Expr*)dqa_aggref, dqa_attno, NULL, false); |
| dref_tlist = lappend(dref_tlist, dqa_tle); |
| } |
| ctx->dref_tlists[arg_attno-1] = dref_tlist; |
| |
| /* Make the "final" target for the DQA case, a reference to the |
| * DQA Aggref we just found or constructed. |
| */ |
| final_node = (Node*) makeVar(dqa_base_varno + arg_attno - 1, |
| dqa_attno, |
| exprType((Node*)arg_tle->expr), |
| exprTypmod((Node*)arg_tle->expr), |
| 0); |
| } |
| else /* Ordinary Aggref -or- DQA but ctx->use_dqa_pruning is off. */ |
| { |
| Aggref *pref; |
| Aggref *iref; |
| Aggref *fref; |
| |
| /* |
| * We may have seen an Aggref just like this one already. Look for |
| * the preliminary form of such in the preliminary Aggref target |
| * list to date. |
| */ |
| foreach( cell, ctx->prefs_tlist ) |
| { |
| TargetEntry *tle = (TargetEntry*)lfirst(cell); |
| Aggref *ref = (Aggref*)tle->expr; |
| |
| /* Check similarity, avoid aggtype and aggstage |
| * (which we control) and don't bother with agglevelsup |
| * (which is always 0 here). |
| */ |
| if ( aggref->aggfnoid == ref->aggfnoid |
| && aggref->aggstar == ref->aggstar |
| && aggref->aggdistinct == ref->aggdistinct |
| && equal(aggref->args, ref->args) ) |
| { |
| prelim_tle = tle; |
| transtype = ref->aggtype; |
| attrno = prelim_tle->resno; |
| break; |
| } |
| } |
| |
| /* |
| * If no existing preliminary Aggref target matched, add one that does. |
| */ |
| if ( prelim_tle == NULL ) |
| { |
| TargetEntry *final_tle; |
| Var *args; |
| |
| /* Get type information for the Aggref */ |
| transtype = lookup_agg_transtype(aggref); |
| |
| /* Make a new preliminary Aggref wrapped as a new target entry. |
| * Like the input Aggref, the preliminary refers to the lower |
| * range. */ |
| pref = (Aggref*)copyObject(aggref); |
| pref->aggtype = transtype; |
| pref->aggstage = AGGSTAGE_PARTIAL; |
| |
| attrno = 1 + list_length(ctx->prefs_tlist); |
| prelim_tle = makeTargetEntry((Expr*)pref, attrno, NULL, false); |
| prelim_tle->ressortgroupref = ctx->split_aggref_sortgroupref; |
| ctx->prefs_tlist = lappend(ctx->prefs_tlist, prelim_tle); |
| |
| args = makeVar(ctx->final_varno, |
| ctx->numGroupCols |
| + (ctx->use_dqa_pruning ? 1 : 0) |
| + attrno, |
| transtype, -1, 0); |
| |
| if ( ctx->use_irefs_tlist ) |
| { |
| TargetEntry *inter_tle; |
| |
| iref = makeNode(Aggref); |
| iref->aggfnoid = pref->aggfnoid; |
| iref->aggtype = transtype; |
| iref->args = list_make1((Expr*)copyObject(args)); |
| iref->agglevelsup = 0; |
| iref->aggstar = false; |
| iref->aggdistinct = false; |
| iref->aggstage = AGGSTAGE_INTERMEDIATE; |
| |
| inter_tle = makeTargetEntry((Expr*)iref, attrno, NULL, false); |
| inter_tle->ressortgroupref = ctx->split_aggref_sortgroupref; |
| ctx->irefs_tlist = lappend(ctx->irefs_tlist, inter_tle); |
| } |
| |
| /* Make a new final Aggref. */ |
| fref = makeNode(Aggref); |
| |
| fref->aggfnoid = aggref->aggfnoid; |
| fref->aggtype = aggref->aggtype; |
| fref->args = list_make1((Expr*)args); |
| fref->agglevelsup = 0; |
| fref->aggstar = false; |
| fref->aggdistinct = false; /* handled in preliminary aggregation */ |
| fref->aggstage = AGGSTAGE_FINAL; |
| final_tle = makeTargetEntry((Expr*)fref, attrno, NULL, false); |
| final_tle->ressortgroupref = ctx->split_aggref_sortgroupref; |
| ctx->frefs_tlist = lappend(ctx->frefs_tlist, final_tle); |
| } |
| |
| final_node = (Node*)makeVar(ref_varno, attrno, aggref->aggtype, -1, 0); |
| } |
| |
| return final_node; |
| } |
| |
| |
| /* Function: make_vars_tlist |
| * |
| * Make a targetlist similar to the given length n tlist but consisting of |
| * simple Var nodes with the given varno and varattno in offset + [1..N]. |
| */ |
| List *make_vars_tlist(List *tlist, Index varno, AttrNumber offset) |
| { |
| List *new_tlist = NIL; |
| AttrNumber attno = offset; |
| ListCell *lc; |
| |
| foreach (lc, tlist) |
| { |
| Var *new_var; |
| TargetEntry *new_tle; |
| |
| TargetEntry *tle = (TargetEntry*)lfirst(lc); |
| |
| attno++; |
| |
| new_var = makeVar(varno, attno, |
| exprType((Node*)tle->expr), |
| exprTypmod((Node*)tle->expr), 0); |
| |
| new_tle = makeTargetEntry((Expr*)new_var, |
| attno, /* resno always matches attnr */ |
| (tle->resname == NULL) ? NULL : pstrdup(tle->resname), |
| false); |
| new_tle->ressortgroupref = tle->ressortgroupref; |
| |
| new_tlist = lappend(new_tlist, new_tle); |
| } |
| return new_tlist; |
| } |
| |
| /* Function: seq_tlist_concat |
| * |
| * Concatenates tlist2 to the end of tlist1 adjusting the resno values |
| * of tlist2 so that the resulting entries have resno = position+1. |
| * The resno values of tlist1 must be dense from 1 to the length of |
| * the list. (They are sequential by position, though this is not |
| * strictly required. |
| * |
| * May modify tlist1 in place (to adjust last link and length). Does not |
| * modify tlist2, but the result shares structure below the TargetEntry |
| * nodes. |
| */ |
| List *seq_tlist_concat(List *tlist1, List *tlist2) |
| { |
| ListCell *lc; |
| AttrNumber high_attno = list_length(tlist1); |
| |
| foreach (lc, tlist2) |
| { |
| TargetEntry *tle= (TargetEntry*)lfirst(lc); |
| TargetEntry *new_tle = (TargetEntry*)makeNode(TargetEntry); |
| memcpy(new_tle, tle, sizeof(TargetEntry)); |
| new_tle->resno = ++high_attno; |
| tlist1 = lappend(tlist1, new_tle); |
| } |
| return tlist1; |
| } |
| |
| /* Function finalize_split_expr |
| * |
| * Note: Only called on the top of the "join" tree, so all D_i are |
| * included in attribute offset calculations. |
| */ |
| Node *finalize_split_expr(Node *expr, MppGroupContext *ctx) |
| { |
| return finalize_split_expr_mutator(expr, ctx); |
| } |
| |
/* Mutator subroutine for finalize_split_expr() replaces pseudo Var nodes
 * produced by split_aggref() with the similarly typed expression found in
 * the top-level targetlist, ctx->top_tlist, being finalized.
 *
 * For example, a pseudo Var node that represents the 3rd DQA for the
 * 2nd DQA argument will be replaced by the targetlist expression that
 * corresponds to that DQA.
 *
 * The pseudo varnos are decoded against the [G][D0...Dn][R] layout
 * cached in ctx->dqa_offsets (see deconstruct_agg_info):
 *   grp_varno            - grouping key; varattno is already absolute
 *   ref_varno            - regular aggregate result; follows the grouping
 *                          keys and (when pruning) all DQA results
 *   dqa_base_varno + i   - result of a DQA over the i-th DQA argument
 */
Node* finalize_split_expr_mutator(Node *node, MppGroupContext *ctx)
{
	if (node == NULL)
		return NULL;

	if (IsA(node, Var))
	{
		AttrNumber attrno=(AttrNumber)0;
		TargetEntry *tle;

		Var *pseudoVar = (Var*)node;

		if ( pseudoVar->varno == grp_varno )
		{
			/* Grouping keys lead top_tlist; varattno is the position. */
			attrno = pseudoVar->varattno;
		}
		else if ( pseudoVar->varno == ref_varno )
		{
			if ( ctx->use_dqa_pruning )
			{
				/* Regular aggregates follow all DQA result groups. */
				attrno = ctx->dqa_offsets[ctx->numDistinctCols]
						+ pseudoVar->varattno;
			}
			else
			{
				/* No DQA groups; they directly follow the grouping keys. */
				attrno = ctx->numGroupCols + pseudoVar->varattno;
			}
		}
		else if ( pseudoVar->varno >= dqa_base_varno && ctx->use_dqa_pruning )
		{
			/* i-th DQA argument's results start at dqa_offsets[i]. */
			int i = pseudoVar->varno - dqa_base_varno;
			attrno = ctx->dqa_offsets[i] + pseudoVar->varattno;
		}
		else
		{
			elog(ERROR,"Unexpected failure of multi-phase aggregation planning");
		}

		tle = (TargetEntry*) list_nth(ctx->top_tlist, attrno - 1);

		return (Node*) tle->expr;
	}

	return expression_tree_mutator(node,
								   finalize_split_expr_mutator,
								   (void*)ctx);
}
| |
| |
/* Function lookup_agg_transtype
 *
 * Return the transition type Oid of the given aggregate function, or throw
 * an error if the aggregate is not found in pg_aggregate.
 */
Oid lookup_agg_transtype(Aggref *aggref)
{
	Oid			aggid = aggref->aggfnoid;
	Oid			result;
	int			fetchCount;

	/* XXX: would have been get_agg_transtype() */
	result = caql_getoid_plus(
			NULL,
			&fetchCount,
			NULL,
			cql("SELECT aggtranstype FROM pg_aggregate "
				" WHERE aggfnoid = :1 ",
				ObjectIdGetDatum(aggid)));

	if (!fetchCount)
		elog(ERROR, "cache lookup failed for aggregate %u", aggid);

	return result;
}
| |
| /* Function: adapt_flow_to_targetlist |
| * |
| * Sometimes we replace the targetlist (especially subplan of an Agg). |
| * In this case, we need to assure that any hash expressions in the |
| * node's flow remain in the targetlist OR null out the hash leaving, |
| * in effect, a strewn flow. |
| */ |
| void adapt_flow_to_targetlist(Plan *plan) |
| { |
| ListCell *c; |
| Flow *flow; |
| |
| Assert(plan != NULL); |
| flow = plan->flow; |
| |
| Assert(flow); |
| |
| foreach(c, flow->hashExpr) |
| { |
| Node *expr = (Node*)lfirst(c); |
| if (!tlist_member(expr, plan->targetlist)) |
| { |
| flow->hashExpr = NIL; |
| break; |
| } |
| } |
| } |
| |
| /** |
| * Update the scatter clause before a query tree's targetlist is about to |
| * be modified. The scatter clause of a query tree will correspond to |
| * old targetlist entries. If the query tree is modified and the targetlist |
| * is to be modified, we must call this method to ensure that the scatter clause |
| * is kept in sync with the new targetlist. |
| */ |
| void UpdateScatterClause(Query *query, List *newtlist) |
| { |
| Assert(query); |
| Assert(query->targetList); |
| Assert(newtlist); |
| |
| if (query->scatterClause |
| && list_nth(query->scatterClause, 0) != NULL /* scattered randomly */ |
| ) |
| { |
| Assert(list_length(query->targetList) == list_length(newtlist)); |
| List *scatterClause = NIL; |
| ListCell *lc = NULL; |
| foreach (lc, query->scatterClause) |
| { |
| Expr *o = (Expr *) lfirst(lc); |
| Assert(o); |
| TargetEntry *tle = tlist_member((Node *) o, query->targetList); |
| Assert(tle); |
| TargetEntry *ntle = list_nth(newtlist, tle->resno - 1); |
| scatterClause = lappend(scatterClause, copyObject(ntle->expr)); |
| } |
| query->scatterClause = scatterClause; |
| } |
| } |
| |
| /* |
| * Function: add_second_stage_agg |
| * |
| * Add an Agg/Group node on top of an Agg/Group node. We add a SubqueryScan |
| * node on top of the existing Agg/Group node before adding the new Agg/Group |
| * node. |
| * |
| * Params: |
| * is_agg -- indicate to add an Agg or a Group node. |
| * prelim_tlist -- the targetlist for the existing Agg/Group node. |
| * final_tlist -- the targetlist for the new Agg/Group node. |
| */ |
| Plan * |
| add_second_stage_agg(PlannerInfo *root, |
| bool is_agg, |
| List *lower_tlist, |
| List *upper_tlist, |
| List *upper_qual, |
| AggStrategy aggstrategy, |
| int numGroupCols, |
| AttrNumber *prelimGroupColIdx, |
| int num_nullcols, |
| uint64 input_grouping, |
| uint64 grouping, |
| int rollup_gs_times, |
| double numGroups, |
| int numAggs, |
| int transSpace, |
| const char *alias, |
| List **p_current_pathkeys, |
| Plan *result_plan, |
| bool use_root, |
| bool adjust_scatter) |
| { |
| Query *parse = root->parse; |
| Query *subquery; |
| List *newrtable; |
| RangeTblEntry *newrte; |
| RangeTblRef *newrtref; |
| Plan *agg_node; |
| |
| /* |
| * Add a SubqueryScan node to renumber the range of the query. |
| * |
| * The result of the preliminary aggregation (represented by lower_tlist) |
| * may contain targets with no representatives in the range of its outer |
| * relation. We resolve this be treating the preliminary aggregation as |
| * a subquery. |
| * |
| * However, this breaks the correspondence between the Plan tree and |
| * the Query tree that is assumed by the later call to set_plan_references |
| * as well as by the deparse processing used (e.g.) in EXPLAIN. |
| * |
| * So we also push the Query node from the root structure down into a new |
| * subquery RTE and scribble over the original Query node to make it into |
| * a simple SELECT * FROM a Subquery RTE. |
| * |
| * Note that the Agg phase we add below will refer to the attributes of |
| * the result of this new SubqueryScan plan node. It is up to the caller |
| * to set up upper_tlist and upper_qual accordingly. |
| */ |
| |
| /* Flat-copy the root query into a newly allocated Query node and adjust |
| * its target list and having qual to match the lower (existing) Agg |
| * plan we're about to make into a SubqueryScan. |
| */ |
| subquery = copyObject(parse); |
| |
| subquery->targetList = copyObject(lower_tlist); |
| subquery->havingQual = NULL; |
| |
| /* Subquery attributes shouldn't be marked as junk, else they'll be |
| * skipped by addRangeTableEntryForSubquery. */ |
| { |
| ListCell *cell; |
| |
| foreach ( cell, subquery->targetList ) |
| { |
| TargetEntry *tle = (TargetEntry *)lfirst(cell); |
| tle->resjunk = false; |
| if ( tle->resname == NULL ) |
| { |
| if ( use_root && IsA(tle->expr, Var) ) |
| { |
| Var *var = (Var*)tle->expr; |
| RangeTblEntry *rte = rt_fetch(var->varno, root->parse->rtable); |
| tle->resname = pstrdup(get_rte_attribute_name(rte, var->varattno)); |
| } |
| else |
| { |
| const char *fmt = "unnamed_attr_%d"; |
| char buf[32]; /* big enuf for fmt */ |
| sprintf(buf,fmt,tle->resno); |
| tle->resname = pstrdup(buf); |
| } |
| } |
| } |
| } |
| |
| /* Construct a range table entry referring to it. */ |
| newrte = addRangeTableEntryForSubquery(NULL, |
| subquery, |
| makeAlias(alias, NULL), |
| TRUE); |
| newrtable = list_make1(newrte); |
| |
| /* Modify the root query in place to look like its range table is |
| * a simple Subquery. */ |
| parse->querySource = QSRC_PLANNER; /* but remember it's really ours */ |
| parse->rtable = newrtable; |
| parse->jointree = makeNode(FromExpr); |
| newrtref = makeNode(RangeTblRef); |
| newrtref->rtindex = 1; |
| parse->jointree->fromlist = list_make1(newrtref); |
| parse->jointree->quals = NULL; |
| parse->rowMarks = NIL; |
| |
| /* <EXECUTE s> uses parse->targetList to derive the portal's tupDesc, |
| * so when use_root is true, the caller owns the responsibility to make |
| * sure it ends up in an appropriate form at the end of planning. |
| */ |
| if ( use_root ) |
| { |
| if (adjust_scatter) |
| { |
| UpdateScatterClause(parse, upper_tlist); |
| } |
| parse->targetList = copyObject(upper_tlist); /* Match range. */ |
| } |
| |
| result_plan = add_subqueryscan(root, p_current_pathkeys, 1, subquery, result_plan); |
| |
| /* Add an Agg node */ |
| /* convert current_numGroups to long int */ |
| long lNumGroups = (long) Min(numGroups, (double) LONG_MAX); |
| |
| agg_node = (Plan *)make_agg(root, |
| upper_tlist, |
| upper_qual, |
| aggstrategy, false, |
| numGroupCols, |
| prelimGroupColIdx, |
| lNumGroups, |
| num_nullcols, |
| input_grouping, |
| grouping, |
| rollup_gs_times, |
| numAggs, |
| transSpace, |
| result_plan); |
| |
| /* |
| * Agg will not change the sort order unless it is hashed. |
| */ |
| agg_node->flow = pull_up_Flow(agg_node, |
| agg_node->lefttree, |
| (*p_current_pathkeys != NIL) |
| && aggstrategy != AGG_HASHED ); |
| |
| /* |
| * Since the rtable has changed, we had better recreate a RelOptInfo entry for it. |
| */ |
| if (root->simple_rel_array) |
| pfree(root->simple_rel_array); |
| root->simple_rel_array_size = list_length(parse->rtable) + 1; |
| root->simple_rel_array = (RelOptInfo **) |
| palloc0(root->simple_rel_array_size * sizeof(RelOptInfo *)); |
| build_simple_rel(root, 1, RELOPT_BASEREL); |
| |
| return agg_node; |
| } |
| |
| |
| /* |
| * Add a SubqueryScan node to the input plan and maintain the given |
| * pathkeys by making adjustments to them and to the equivalence class |
| * information in root. |
| * |
| * Note that submerging a plan into a subquery scan will require changes |
| * to the range table and to any expressions above the new scan node. |
| * This is the caller's responsibility since the nature of the changes |
| * depends on the context in which the subquery is used. |
| */ |
| Plan* add_subqueryscan(PlannerInfo* root, List **p_pathkeys, |
| Index varno, Query *subquery, Plan *subplan) |
| { |
| List *subplan_tlist; |
| int *resno_map; |
| |
| subplan_tlist = generate_subquery_tlist(varno, subquery->targetList, |
| false, &resno_map); |
| |
| subplan = (Plan*)make_subqueryscan(root, subplan_tlist, |
| NIL, |
| varno, /* scanrelid (= varno) */ |
| subplan, |
| subquery->rtable); |
| |
| mark_passthru_locus(subplan, true, true); |
| |
| /* Reconstruct equi_key_list since the rtable has changed. |
| * XXX we leak the old one. |
| */ |
| root->equi_key_list = construct_equivalencekey_list(root->equi_key_list, |
| resno_map, |
| subquery->targetList, |
| subplan_tlist); |
| if ( p_pathkeys != NULL && *p_pathkeys != NULL ) |
| { |
| *p_pathkeys = reconstruct_pathkeys(root, *p_pathkeys, resno_map, |
| subquery->targetList, subplan_tlist); |
| } |
| |
| pfree(resno_map); |
| |
| return subplan; |
| } |
| |
| |
| /* |
| * hash_safe_type - is the given type hashable? |
| * |
| * Modelled on function hash_safe_grouping in planner.c, we assume |
| * the type is hashable if its equality operator is marked hashjoinable. |
| */ |
| static bool |
| hash_safe_type(Oid type) |
| { |
| Operator optup; |
| bool oprcanhash; |
| |
| optup = equality_oper(type, true); |
| if (!optup) |
| return false; |
| oprcanhash = ((Form_pg_operator) GETSTRUCT(optup))->oprcanhash; |
| ReleaseOperator(optup); |
| if (!oprcanhash) |
| return false; |
| |
| return true; |
| } |
| |
| /* |
| * sorting_prefixes_grouping - is the result ordered on a grouping key prefix? |
| * |
| * If so, then we might prefer a pre-ordered grouping result to one that would |
| * need sorting after the fact. |
| */ |
| static bool |
| sorting_prefixes_grouping(PlannerInfo *root) |
| { |
| return root->sort_pathkeys != NIL |
| && pathkeys_contained_in(root->sort_pathkeys, root->group_pathkeys); |
| } |
| |
| /* |
| * gp_hash_safe_grouping - are grouping operators GP hashable for |
| * redistribution motion nodes? |
| */ |
| static bool |
| gp_hash_safe_grouping(PlannerInfo *root) |
| { |
| List *grouptles; |
| ListCell *glc; |
| |
| grouptles = get_sortgroupclauses_tles(root->parse->groupClause, |
| root->parse->targetList); |
| foreach(glc, grouptles) |
| { |
| TargetEntry *tle = (TargetEntry *)lfirst(glc); |
| bool canhash; |
| canhash = isGreenplumDbHashable(exprType((Node *)tle->expr)); |
| if (!canhash) |
| return false; |
| } |
| return true; |
| } |
| |
| /* |
| * reconstruct_pathkeys |
| * |
| * Reconstruct the given pathkeys based on the given mapping from the original |
| * targetlist to a new targetlist. |
| */ |
| List * |
| reconstruct_pathkeys(PlannerInfo *root, List *pathkeys, int *resno_map, |
| List *orig_tlist, List *new_tlist) |
| { |
| List *new_pathkeys; |
| |
| ListCell *lc; |
| foreach (lc, pathkeys) |
| { |
| List *keys = (List *)lfirst(lc); |
| ListCell *key_lc; |
| TargetEntry *new_tle; |
| |
| Assert(IsA(keys, List)); |
| foreach(key_lc, keys) |
| { |
| PathKeyItem *item = (PathKeyItem *)lfirst(key_lc); |
| int resno = 0; |
| ListCell *tle_lc; |
| Assert(IsA(item, PathKeyItem)); |
| |
| foreach(tle_lc, orig_tlist) |
| { |
| TargetEntry *tle = (TargetEntry *)lfirst(tle_lc); |
| Assert(IsA(tle, TargetEntry)); |
| if (equal(tle->expr, item->key)) |
| { |
| resno = tle->resno; |
| break; |
| } |
| } |
| if (resno > 0) |
| { |
| new_tle = get_tle_by_resno(new_tlist, resno_map[resno-1]); |
| Assert(new_tle != NULL); |
| item->key = (Node *)new_tle->expr; |
| } |
| } |
| } |
| |
| new_pathkeys = canonicalize_pathkeys(root, pathkeys); |
| |
| return new_pathkeys; |
| } |
| |
| |
| /* cost_common_agg -- Estimate the cost of executing the common subquery |
| * for an aggregation plan. Assumes that the AggPlanInfo contains the |
| * correct Path as input_path. |
| * |
| * Returns the total cost and, more importantly, populates the given |
| * dummy Plan node with cost information |
| */ |
| Cost cost_common_agg(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info, Plan *dummy) |
| { |
| QualCost tlist_cost; |
| Cost startup_cost; |
| Cost total_cost; |
| double input_rows; |
| int input_width; |
| int n; |
| |
| Assert(dummy != NULL); |
| |
| input_rows = info->input_path->parent->rows; |
| input_width = info->input_path->parent->width; |
| /* Path input width isn't correct for ctx->sub_tlist so we guess. */ |
| n = 32 * list_length(ctx->sub_tlist); |
| input_width = ( input_width < n )? input_width: n; |
| |
| /* Estimate cost of evaluation of the sub_tlist. */ |
| cost_qual_eval(&tlist_cost, ctx->sub_tlist, root); |
| startup_cost = info->input_path->startup_cost + tlist_cost.startup; |
| total_cost = info->input_path->total_cost + tlist_cost.startup + |
| tlist_cost.per_tuple * input_rows; |
| |
| memset(dummy, 0, sizeof(Plan)); |
| dummy->type = info->input_path->type; |
| dummy->startup_cost = startup_cost; |
| dummy->total_cost = total_cost; |
| dummy->plan_rows = input_rows; |
| dummy->plan_width = input_width; |
| |
| return dummy->total_cost; |
| } |
| |
| |
| |
| /* Function cost_1phase_aggregation |
| * |
| * May be used for 1 phase aggregation costing with or without DQAs. |
| * Corresponds to make_one_stage_agg_plan and must be maintained in sync |
| * with it. |
| */ |
Cost cost_1phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info)
{
	Plan input_dummy;
	bool is_sorted;
	double input_rows;
	/* Clamp the planner's double group-count estimate into long range. */
	long numGroups = (*(ctx->p_dNumGroups) < 0) ? 0 :
					 (*(ctx->p_dNumGroups) > LONG_MAX) ? LONG_MAX :
					 (long)*(ctx->p_dNumGroups);

	/* Cost of the common input subquery; populates input_dummy. */
	cost_common_agg(root, ctx, info, &input_dummy);
	input_rows = input_dummy.plan_rows;

	/* Is the input already ordered on the grouping key? */
	is_sorted = pathkeys_contained_in(root->group_pathkeys, info->input_path->pathkeys);

	/* Collocation cost (Motion). */
	switch ( info->group_prep )
	{
	case MPP_GRP_PREP_HASH_GROUPS:
		/* Redistribution destroys any existing sort order. */
		is_sorted = false;
		input_dummy.total_cost +=
			incremental_motion_cost(input_dummy.plan_rows,
									input_dummy.plan_rows);
		break;
	case MPP_GRP_PREP_FOCUS_QE:
	case MPP_GRP_PREP_FOCUS_QD:
		/* Gathering: the receiver sees #segments times the per-segment rows. */
		input_dummy.total_cost +=
			incremental_motion_cost(input_dummy.plan_rows,
									input_dummy.plan_rows * root->config->cdbpath_segments);
		input_dummy.plan_rows = input_dummy.plan_rows * root->config->cdbpath_segments;
		break;
	default:
		break;
	}

	/* NB: We don't need to calculate grouping extension costs here because
	 * grouping extensions are planned elsewhere.
	 */
	if ( ctx->use_hashed_grouping )
	{
		/* HashAgg */
		Assert( ctx->numDistinctCols == 0 );

		add_agg_cost(NULL, &input_dummy,
					 ctx->sub_tlist, (List*)root->parse->havingQual,
					 AGG_HASHED, false,
					 ctx->numGroupCols, ctx->groupColIdx,
					 numGroups, 0, ctx->agg_counts->numAggs,
					 ctx->agg_counts->transitionSpace);
	}
	else
	{
		if ( ctx->numGroupCols == 0 )
		{
			/* PlainAgg */
			add_agg_cost(NULL, &input_dummy,
						 ctx->sub_tlist, (List*)root->parse->havingQual,
						 AGG_PLAIN, false,
						 0, NULL,
						 1, 0, ctx->agg_counts->numAggs,
						 ctx->agg_counts->transitionSpace);
		}
		else
		{
			/* GroupAgg */
			if ( ! is_sorted )
			{
				/* Charge an extra sort to establish grouping order. */
				add_sort_cost(NULL, &input_dummy, ctx->numGroupCols, NULL, NULL);
			}
			add_agg_cost(NULL, &input_dummy,
						 ctx->sub_tlist, (List*)root->parse->havingQual,
						 AGG_SORTED, false,
						 ctx->numGroupCols, ctx->groupColIdx,
						 numGroups, 0, ctx->agg_counts->numAggs,
						 ctx->agg_counts->transitionSpace);
		}

		/* Beware: AGG_PLAIN and AGG_GROUPED may be used with DQAs, however,
		 * the function cost_agg doesn't distinguish DQAs so it
		 * consistently under estimates the cost in these cases.
		 */
		if ( ctx->numDistinctCols > 0 )
		{
			Path path_dummy;
			double ngrps = *(ctx->p_dNumGroups);
			/* Each group performs one sort per DQA over its members. */
			double nsorts = ngrps * ctx->numDistinctCols;
			double avgsize = input_dummy.plan_rows / ngrps;
			cost_sort(&path_dummy, NULL, NIL, 0.0, avgsize, 32);
			input_dummy.total_cost += nsorts * path_dummy.total_cost;
		}
	}
	/* Eager mode zeroes the cost so this strategy always wins comparison. */
	info->plan_cost = root->config->gp_eager_one_phase_agg ? (Cost)0.0 : input_dummy.total_cost;
	info->valid = true;
	info->join_strategy = DqaJoinNone;
	info->use_sharing = false;

	info->plan_cost *= gp_coefficient_1phase_agg;
	return info->plan_cost;
}
| |
| |
| /* Function cost_2phase_aggregation |
| * |
| * May be used for 2 phase costing with 0 or 1 DQAs. |
| * Corresponds to make_two_stage_agg_plan and must be maintained in sync |
| * with it. |
| */ |
| Cost cost_2phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info) |
| { |
| Plan input_dummy; |
| bool is_sorted; |
| long numGroups = (*(ctx->p_dNumGroups) < 0) ? 0 : |
| (*(ctx->p_dNumGroups) > LONG_MAX) ? LONG_MAX : |
| (long)*(ctx->p_dNumGroups); |
| double input_rows; |
| double streaming_fudge = 1.3; |
| |
| cost_common_agg(root, ctx, info, &input_dummy); |
| input_rows = input_dummy.plan_rows; |
| |
| is_sorted = pathkeys_contained_in(root->group_pathkeys, info->input_path->pathkeys); |
| |
| /* Precondition Input */ |
| |
| switch ( info->group_prep ) |
| { |
| case MPP_GRP_PREP_HASH_DISTINCT: |
| input_dummy.total_cost += |
| incremental_motion_cost(input_dummy.plan_rows, |
| input_dummy.plan_rows); |
| is_sorted = false; |
| break; |
| case MPP_GRP_PREP_NONE: |
| break; |
| default: |
| ereport(ERROR, |
| (errcode(ERRCODE_CDB_INTERNAL_ERROR), |
| errmsg("unexpected call for two-stage aggregation"))); |
| break; /* Never */ |
| } |
| |
| /* Preliminary Aggregation */ |
| |
| if ( ctx->use_hashed_grouping ) |
| { |
| /* Preliminary HashAgg*/ |
| add_agg_cost(NULL, &input_dummy, |
| NIL, NIL, /* Don't know preliminary tlist, qual IS NIL */ |
| AGG_HASHED, root->config->gp_hashagg_streambottom, |
| ctx->numGroupCols, ctx->groupColIdx, |
| numGroups, 0, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| |
| if ( gp_hashagg_streambottom ) |
| { |
| input_dummy.plan_rows *= streaming_fudge; |
| } |
| } |
| else |
| { |
| if ( ctx->numGroupCols == 0 ) |
| { |
| /* Preliminary PlainAgg*/ |
| add_agg_cost(NULL, &input_dummy, |
| NIL, NIL, /* Don't know preliminary tlist, qual IS NIL */ |
| AGG_PLAIN, false, |
| 0, NULL, |
| 1, 0, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| } |
| else |
| { |
| /* Preliminary GroupAgg */ |
| if ( ! is_sorted ) |
| { |
| add_sort_cost(NULL, &input_dummy, ctx->numGroupCols, NULL, NULL); |
| } |
| add_agg_cost(NULL, &input_dummy, |
| NIL, NIL, /* Don't know preliminary tlist, qual IS NIL */ |
| AGG_SORTED, false, |
| ctx->numGroupCols, ctx->groupColIdx, |
| numGroups, 0, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| } |
| /* Beware: AGG_PLAIN and AGG_GROUPED may be used with DQAs, however, |
| * the function cost_agg doesn't distinguish DQAs so it |
| * consistently under estimates the cost in these cases. |
| */ |
| if ( ctx->numDistinctCols > 0 ) |
| { |
| Path path_dummy; |
| Cost run_cost; |
| double ngrps = *(ctx->p_dNumGroups); |
| double avgsize = input_rows / ngrps; |
| |
| Assert(ctx->numDistinctCols == 1); |
| |
| cost_sort(&path_dummy, NULL, NIL, input_dummy.total_cost, avgsize, 32); |
| run_cost = path_dummy.total_cost - path_dummy.startup_cost; |
| input_dummy.total_cost += path_dummy.startup_cost + ngrps * run_cost; |
| } |
| |
| } |
| |
| /* Collocate groups */ |
| switch ( info->group_type ) |
| { |
| case MPP_GRP_TYPE_GROUPED_2STAGE: /* Redistribute */ |
| input_dummy.total_cost += |
| incremental_motion_cost(input_dummy.plan_rows, |
| input_dummy.plan_rows); |
| break; |
| case MPP_GRP_TYPE_PLAIN_2STAGE: /* Gather */ |
| input_dummy.total_cost += |
| incremental_motion_cost(input_dummy.plan_rows, |
| input_dummy.plan_rows *root->config->cdbpath_segments); |
| break; |
| default: |
| ereport(ERROR, |
| (errcode(ERRCODE_CDB_INTERNAL_ERROR), |
| errmsg("unexpected call for two-stage aggregation"))); |
| break; /* Never */ |
| } |
| |
| /* Final Aggregation */ |
| |
| if ( ctx->use_hashed_grouping ) |
| { |
| /* HashAgg*/ |
| add_agg_cost(NULL, &input_dummy, |
| NIL, NIL, /* Don't know tlist or qual */ |
| AGG_HASHED, false, |
| ctx->numGroupCols, ctx->groupColIdx, |
| numGroups, 0, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| } |
| else |
| { |
| if ( ctx->numGroupCols == 0 ) |
| { |
| /* PlainAgg*/ |
| add_agg_cost(NULL, &input_dummy, |
| NIL, NIL, /* Don't know tlist or qual */ |
| AGG_PLAIN, false, |
| 0, NULL, |
| 1, 0, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| } |
| else |
| { |
| /* GroupAgg */ |
| add_sort_cost(NULL, &input_dummy, ctx->numGroupCols, NULL, NULL); |
| add_agg_cost(NULL, &input_dummy, |
| NIL, NIL, /* Don't know tlist or qual */ |
| AGG_SORTED, false, |
| ctx->numGroupCols, ctx->groupColIdx, |
| numGroups, 0, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| } |
| } |
| |
| info->plan_cost = root->config->gp_eager_two_phase_agg ? (Cost)0.0 : input_dummy.total_cost; |
| info->valid = true; |
| info->join_strategy = DqaJoinNone; |
| info->use_sharing = false; |
| |
| info->plan_cost *= gp_coefficient_2phase_agg; |
| return info->plan_cost; |
| } |
| |
| |
| /* Function cost_3phase_aggregation |
| * |
| * Only used for costing with 2 or more DQAs. |
| * Corresponds to make_three_stage_agg_plan and must be maintained in sync |
| * with it. |
| * |
| * This function assumes the enviroment established by planDqaJoinOrder() |
| * and set_coplan_strategies(). |
| */ |
Cost cost_3phase_aggregation(PlannerInfo *root, MppGroupContext *ctx, AggPlanInfo *info)
{
	Plan dummy;
	double input_rows;
	Cost total_cost;
	/* share_cost is assigned only when numDistinctCols >= 2; it is read
	 * only when use_sharing is true, which can only happen on that path. */
	Cost share_cost;
	Cost reexec_cost;
	int i;
	bool use_sharing = false;
	DqaJoinStrategy join_strategy = DqaJoinUndefined;
	Cost input_cost = 0.0;
	Cost cost_coplan_cheapest = 0.0;
	Cost cost_coplan_sorted = 0.0;
	Cost cost_hashjoin = 0.0;
	Cost cost_mergejoin = 0.0;
	Cost cost_crossjoin = 0.0;

	/* Cost of the common input subquery; populates dummy. */
	cost_common_agg(root, ctx, info, &dummy);

	input_rows = dummy.plan_rows;
	total_cost = dummy.total_cost;

	Assert( ctx->numDistinctCols == list_length(ctx->agg_counts->dqaArgs) );

	/* Note that join order has already been established by an earlier
	 * call to planDqaJoinOrder.  Here we just use that order, but we
	 * need to decide on the join type.
	 */
	if ( list_length(ctx->agg_counts->dqaArgs) < 2 )
	{
		/* Single DQA: coplan results need no join at all. */
		join_strategy = DqaJoinNone;
	}
	else if ( ctx->numGroupCols == 0 )
	{
		/* Scalar aggregation: single-row coplan results, cross join. */
		join_strategy = DqaJoinCross;
	}
	else if ( sorting_prefixes_grouping(root) )
	{
		/* Cheapest of merge join of sorted input or sorted hash join */
		join_strategy = DqaJoinSorted;
	}
	else
	{
		/* Cheapest of merge join of sorted input or hash join */
		join_strategy = DqaJoinCheapest;
	}

	/* Determine whether to use input sharing. */
	if ( ctx->numDistinctCols < 2 )
	{
		/* Only one consumer, so re-execution "cost" is just one execution. */
		reexec_cost = total_cost;
		use_sharing = false;
	}
	else
	{
		/* Decide based on apparent costs.
		 * XXX Do we need to override this if there are volatile functions
		 * in the common plan?  Is this known, or do we need to search?
		 */
		share_cost = cost_share_plan(&dummy, root, ctx->numDistinctCols);
		reexec_cost = ctx->numDistinctCols * total_cost;
		use_sharing = share_cost < reexec_cost;
	}
	input_cost = use_sharing ? share_cost : reexec_cost;

	/* Get costs for the ctx->numDistinctCols coplans. */
	cost_coplan_cheapest = cost_coplan_sorted = 0;
	for ( i = 0; i < ctx->numDistinctCols; i++ )
	{
		/* Per-DQA costs were cached earlier by set_coplan_strategies. */
		DqaInfo *arg = ctx->dqaArgs + i;

		cost_coplan_cheapest += arg->cost_cheapest;
		cost_coplan_sorted += arg->cost_sorted;
	}

	/* Get costs to join the coplan results. */
	switch ( join_strategy )
	{
	case DqaJoinNone:
		break;

	case DqaJoinCross:
		/* One pass of per-tuple cost per join step over single-row inputs. */
		cost_crossjoin = (ctx->numDistinctCols - 1) * 2 * cpu_tuple_cost;
		break;

	case DqaJoinSorted:
	case DqaJoinCheapest:
		set_cost_of_join_strategies(ctx, &cost_hashjoin, &cost_mergejoin);

		/* A sorted result from a hash join requires a final sort. */
		if ( join_strategy == DqaJoinSorted )
			cost_hashjoin += incremental_sort_cost(*ctx->p_dNumGroups, 100, ctx->numDistinctCols);

		cost_hashjoin += cost_coplan_cheapest;
		cost_mergejoin += cost_coplan_sorted;

		/* cost_hashjoin == 0.0 signals hashing wasn't applicable. */
		if ( cost_hashjoin > 0.0 && cost_hashjoin <= cost_mergejoin )
		{
			join_strategy = DqaJoinHash;
		}
		else
		{
			join_strategy = DqaJoinMerge;
		}
		break;

	default:
		elog(ERROR, "unexpected join strategy during DQA planning");
	}

	/* Compare costs choose cheapest. */
	switch ( join_strategy )
	{
	case DqaJoinNone:
		total_cost = input_cost + cost_coplan_cheapest;
		break;

	case DqaJoinCross:
		total_cost = input_cost + cost_coplan_cheapest + cost_crossjoin;
		break;

	case DqaJoinHash:
		total_cost = input_cost + cost_coplan_cheapest + cost_hashjoin;
		break;

	case DqaJoinMerge:
		/* NOTE(review): merge join pairs with the cheapest coplans here,
		 * though cost_mergejoin above was charged the sorted coplans --
		 * confirm against make_three_stage_agg_plan. */
		total_cost = input_cost + cost_coplan_cheapest + cost_mergejoin;
		break;

	default:
		elog(ERROR, "unexpected join strategy during DQA planning");
	}

	/* Eager mode zeroes the cost so this strategy always wins comparison. */
	info->plan_cost = root->config->gp_eager_dqa_pruning ? (Cost)0.0 : total_cost;
	info->valid = true;
	info->join_strategy = join_strategy;
	info->use_sharing = use_sharing;

	info->plan_cost *= gp_coefficient_3phase_agg;
	return info->plan_cost;
}
| |
| |
| /* Estimate the costs of |
| * 1. HashJoin of cheapest inputs, and |
| * 2. MergeJoin of sorted input. |
| * |
| * If result should be ordered, compare a Sort of 1 with 2. |
| * Else compare 1 with 2. |
| */ |
void set_cost_of_join_strategies(MppGroupContext *ctx, Cost *hashjoin_cost, Cost *mergejoin_cost)
{
	Cost hj_cost;
	Cost mj_cost;
	List *mergeclauses = NIL;
	List *hashclauses = NIL;

	double rows;
	int gk_width;
	int outer_width;
	/* Hashing stays viable only while every grouping column is hashable. */
	bool try_hashed = true;
	AttrNumber attrno;
	Index outer_varno = 1;
	int i;

	rows = *ctx->p_dNumGroups;

	/* Widths are wild speculation, but good enough, we hope. */
	gk_width = 32 * ctx->numGroupCols;
	outer_width = 32; /* DQA transition values for first DQA arg. */
	outer_width += 64; /* Ordinary aggregate transition values. */

	/* We need join clauses for costing.  Build one equality qual per
	 * grouping column, between the outer coplan (varno 1) and the
	 * inner coplan (varno 2, 3, ...). */
	for( i = 0; i < ctx->numGroupCols; i++ )
	{
		Expr *qual;
		Var *outer_var;
		Var *inner_var;
		AttrNumber resno = ctx->groupColIdx[i];
		Index inner_varno = 1 + (i + 1);
		TargetEntry *tle = get_tle_by_resno(ctx->sub_tlist, resno);

		Assert( tle != NULL );

		outer_var = makeVar(outer_varno, resno,
							exprType((Node *)tle->expr),
							exprTypmod((Node *)tle->expr), 0);

		inner_var = makeVar(inner_varno, resno,
							exprType((Node *)tle->expr),
							exprTypmod((Node *)tle->expr), 0);

		/* outer should always be on the left */
		qual = make_op(NULL, list_make1(makeString("=")),
					   (Node*) outer_var,
					   (Node*) inner_var, -1);

		/* If the grouping column is not hashable, do not try hashing. */
		if (!hash_safe_type(exprType((Node *)tle->expr)))
			try_hashed = false;

		if ( try_hashed )
		{
			hashclauses = lappend(hashclauses, copyObject(qual));
		}

		/* Merge clauses use NOT (a IS DISTINCT FROM b), which also
		 * matches NULL grouping keys, unlike plain equality. */
		qual->type = T_DistinctExpr;
		qual = make_notclause(qual);
		mergeclauses = lappend(mergeclauses, qual);
	}

	/* Estimate the incremental join costs.  Each of the numDistinctCols-1
	 * joins adds one DQA result column to the accumulating outer side. */
	hj_cost = mj_cost = 0;
	for ( attrno = 1; attrno < ctx->numDistinctCols; attrno++ )
	{
		int dqa_width = 32;
		int inner_width = gk_width + dqa_width;

		mj_cost += incremental_mergejoin_cost(rows, mergeclauses, ctx->root);
		if ( try_hashed )
			hj_cost += incremental_hashjoin_cost(rows, inner_width, outer_width, hashclauses, ctx->root);

		outer_width += dqa_width;
	}

	*mergejoin_cost = mj_cost;
	/* Zero signals to the caller that hashing was not applicable. */
	*hashjoin_cost = try_hashed ? hj_cost : 0.0;
}
| |
| /* Set up basic structure content. Caller to fill in. |
| */ |
| static |
| void initAggPlanInfo(AggPlanInfo *info, Path *input_path, Plan *input_plan) |
| { |
| info->input_path = input_path; |
| info->input_plan = input_plan; |
| |
| if (input_path != NULL) |
| info->input_locus = input_path->locus; |
| else |
| CdbPathLocus_MakeNull(&info->input_locus); |
| |
| info->group_type = MPP_GRP_TYPE_BASEPLAN; |
| info->group_prep = MPP_GRP_PREP_NONE; |
| CdbPathLocus_MakeNull(&info->output_locus); |
| info->distinctkey_collocate = false; |
| |
| info->valid = false; |
| info->plan_cost = 0; |
| info->join_strategy = DqaJoinUndefined; |
| info->use_sharing = false; |
| } |
| |
| |
| /* set_coplan_strategies |
| * |
| * Determine and cache in the given DqaInfo structure the cheapest |
| * strategy that computes the answer and the cheapest strategy that |
| * computes the answer in grouping key order. |
| * |
| * Below, the result cardinalities are shown as <-n- where |
| * |
| * x (input_rows) is the input cardinality which is usually about |
| * equal to #segments * #distinct(grouping key, DQA arg) |
| * |
| * d (darg_rows) is #distinct(grouping key, DQA arg) |
| * |
| * g (group_rows) is #distinct(grouping key) |
| * |
| * The coplan through the Motion that collocates tuples on the |
| * grouping key is planned independently and will be one of |
| * |
| * <- Motion <-x- HashAgg <-t- R |
| * <- Motion <-x- GroupAgg <- Sort <-t- R |
| * |
| * which is encoded in DqaInfo by the flag use_hashed_preliminary. |
| * |
| * The possible post-Motion strategies are encoded as enum values of |
| * type DqaCoplanType and indicate all the required plan nodes. |
| * |
| * Vector aggregation strategies that produce a result ordered on the |
| * grouping key are: |
| * |
| * DQACOPLAN_GGS: <-g- GroupAgg <-d- GroupAgg <- Sort <-x- |
| * DQACOPLAN_GSH: <-g- GroupAgg <- Sort <-d- HashAgg <-x- |
| * DQACOPLAN_SHH: <- Sort <-g- HashAgg <-d- HashAgg <-x- |
| * |
| * In addition, the vector aggreagation strategy |
| * |
| * DQACOPLAN_HH: <-g- HashAgg <-d- HashAgg <-x- R |
| * |
| * produces an unordered result. |
| * |
| * Scalar aggregation strategies are: |
| * |
| * DQACOPLAN_PGS: <-1- PlainAgg <-d- GroupAgg <- Sort <-x- R |
| * DQACOPLAN_PH: <-1- PlainAgg <-d- HashedAgg <-x- R |
| * |
| */ |
| void set_coplan_strategies(PlannerInfo *root, MppGroupContext *ctx, DqaInfo *dqaArg, Path *input) |
| { |
| double input_rows = input->parent->rows; |
| int input_width = input->parent->width; |
| double darg_rows = dqaArg->num_rows; |
| double group_rows = *ctx->p_dNumGroups; |
| long numGroups = (group_rows < 0) ? 0 : |
| (group_rows > LONG_MAX) ? LONG_MAX : |
| (long)group_rows; |
| bool can_hash_group_key = ctx->agg_counts->canHashAgg; |
| bool can_hash_dqa_arg = dqaArg->can_hash; |
| bool use_hashed_preliminary = false; |
| |
| Cost sort_input = incremental_sort_cost(input_rows, input_width, |
| ctx->numGroupCols+1); |
| Cost sort_dargs = incremental_sort_cost(darg_rows, input_width, |
| ctx->numGroupCols); |
| Cost sort_groups = incremental_sort_cost(group_rows, input_width, |
| ctx->numGroupCols); |
| Cost gagg_input = incremental_agg_cost(input_rows, input_width, |
| AGG_SORTED, ctx->numGroupCols+1, |
| numGroups, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| Cost gagg_dargs = incremental_agg_cost(darg_rows, input_width, |
| AGG_SORTED, ctx->numGroupCols, |
| numGroups, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| Cost hagg_input = incremental_agg_cost(input_rows, input_width, |
| AGG_HASHED, ctx->numGroupCols+1, |
| numGroups, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| Cost hagg_dargs = incremental_agg_cost(darg_rows, input_width, |
| AGG_HASHED, ctx->numGroupCols, |
| numGroups, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| Cost cost_base; |
| Cost cost_sorted; |
| Cost cost_cheapest; |
| DqaCoplanType type_sorted; |
| DqaCoplanType type_cheapest; |
| Cost trial; |
| |
| /* Preliminary aggregation */ |
| use_hashed_preliminary = ( can_hash_group_key || ctx->numGroupCols == 0 ) |
| && can_hash_dqa_arg; |
| if ( use_hashed_preliminary ) |
| { |
| cost_base = hagg_input; |
| } |
| else |
| { |
| cost_base = sort_input + gagg_input; |
| } |
| |
| /* Collocating motion */ |
| cost_base += incremental_motion_cost(darg_rows, darg_rows); |
| |
| /* Post-motion processing is more complex. */ |
| |
| if ( ctx->numGroupCols == 0 ) /* scalar agg */ |
| { |
| Cost pagg_dargs = incremental_agg_cost(darg_rows, input_width, |
| AGG_PLAIN, 0, |
| 1, ctx->agg_counts->numAggs, |
| ctx->agg_counts->transitionSpace); |
| |
| type_sorted = type_cheapest = DQACOPLAN_PGS; |
| cost_sorted = cost_cheapest = sort_input + gagg_input + pagg_dargs; |
| |
| trial = hagg_input + pagg_dargs; |
| if (trial < cost_cheapest ) |
| { |
| cost_cheapest = trial; |
| type_cheapest = DQACOPLAN_PH; |
| } |
| } |
| else /* vector agg */ |
| { |
| type_sorted = type_cheapest = DQACOPLAN_GGS; |
| cost_sorted = cost_cheapest = sort_input + gagg_input + gagg_dargs; |
| |
| if ( can_hash_dqa_arg ) |
| { |
| trial = hagg_input + sort_dargs + gagg_input; |
| |
| if ( trial < cost_cheapest ) |
| { |
| cost_cheapest = trial; |
| type_cheapest = DQACOPLAN_GSH; |
| } |
| |
| if ( trial < cost_sorted ) |
| { |
| cost_sorted = trial; |
| type_sorted = DQACOPLAN_GSH; |
| } |
| } |
| |
| if ( can_hash_group_key && can_hash_dqa_arg ) |
| { |
| trial = hagg_input + hagg_dargs; |
| |
| if ( trial < cost_cheapest ) |
| { |
| cost_cheapest = trial; |
| type_cheapest = DQACOPLAN_HH; |
| } |
| |
| trial += sort_groups; |
| |
| if ( trial < cost_sorted ) |
| { |
| cost_sorted = trial; |
| type_sorted = DQACOPLAN_SHH; |
| } |
| } |
| } |
| |
| dqaArg->use_hashed_preliminary = use_hashed_preliminary; |
| dqaArg->cost_sorted = cost_base + cost_sorted; |
| dqaArg->coplan_type_sorted = type_sorted; |
| dqaArg->cost_cheapest = cost_base + cost_cheapest; |
| dqaArg->coplan_type_cheapest = type_cheapest; |
| dqaArg->distinctkey_collocate = false; |
| } |
| |
| |
| /* incremental_sort_cost -- helper for set_coplan_strategies |
| */ |
| Cost incremental_sort_cost(double rows, int width, int numKeyCols) |
| { |
| Plan dummy; |
| |
| memset(&dummy, 0, sizeof(dummy)); |
| dummy.plan_rows = rows; |
| dummy.plan_width = width; |
| |
| add_sort_cost(NULL, &dummy, numKeyCols, NULL, NULL); |
| |
| return dummy.total_cost; |
| } |
| |
| /* incremental_agg_cost -- helper for set_coplan_strategies |
| */ |
| Cost incremental_agg_cost(double rows, int width, AggStrategy strategy, |
| int numGroupCols, double numGroups, |
| int numAggs, int transSpace) |
| { |
| Plan dummy; |
| |
| memset(&dummy, 0, sizeof(dummy)); |
| dummy.plan_rows = rows; |
| dummy.plan_width = width; |
| |
| add_agg_cost(NULL, &dummy, |
| NULL, NULL, |
| strategy, false, |
| numGroupCols, NULL, |
| numGroups, 0, numAggs, transSpace); |
| |
| return dummy.total_cost; |
| } |
| |
| |
| /* incremental_motion_cost -- helper for set_coplan_strategies |
| */ |
| Cost incremental_motion_cost(double sendrows, double recvrows) |
| { |
| Cost cost_per_row = (gp_motion_cost_per_row > 0.0) |
| ? gp_motion_cost_per_row |
| : 2.0 * cpu_tuple_cost; |
| return cost_per_row * 0.5 * (sendrows + recvrows); |
| } |
| |
| /* |
| * choose_deduplicate |
| * numGroups is an output parameter that is estimated number of unique groups. |
| * |
| * Ideally, we could estimate all the costs based on what will happen and |
| * choose either of a naive or a de-duplicate plan. However, it is nearly |
| * impossible because there are plans decided by other components such as |
| * cdb_grouping_planner() and many branches including Redistribute is |
| * needed or not. In addition, estimated number of groups in multi-column |
| * case (i.e. GROUP BY + ORDER BY) is far from the truth. We could improve |
| * the code by adding more cost calculations, but it is reasonable to |
| * estimate the major costs of initial sort/aggregate for now. |
| */ |
static bool
choose_deduplicate(PlannerInfo *root, List *sortExprs,
				   Plan *input_plan, double *numGroups)
{
	double		num_distinct;
	double		input_rows = input_plan->plan_rows;
	Path		dummy_path;
	Cost		naive_cost, dedup_cost;
	int32		width;
	AggStrategy aggstrategy;
	int			numGroupCols;

	naive_cost = 0;
	dedup_cost = 0;
	width = input_plan->plan_width;
	/* Add int8 column anyway which holds count */
	width += get_typavgwidth(INT8OID, -1);
	numGroupCols = list_length(sortExprs);

	/*
	 * First, calculate cost of naive case: sort the whole input.
	 */
	cost_sort(&dummy_path, root, NIL,
			  input_plan->total_cost,
			  input_plan->plan_rows,
			  input_plan->plan_width);
	naive_cost = dummy_path.total_cost;

	/*
	 * Next, calculate cost of deduplicate.
	 * The first aggregate calculates number of duplicate for
	 * each unique sort key, then we add cost of sort after
	 * the aggregate.  A gather motion is charged between the two.
	 */
	num_distinct = estimate_num_groups(root, sortExprs, input_rows);
	aggstrategy = AGG_HASHED;
	cost_agg(&dummy_path, root, aggstrategy, 1, numGroupCols, num_distinct,
			 input_plan->startup_cost, input_plan->total_cost,
			 input_plan->plan_rows, input_plan->plan_width,
			 0, 0, false);
	dummy_path.total_cost +=
		incremental_motion_cost(num_distinct,
								num_distinct * root->config->cdbpath_segments);
	/* Sort the (smaller) deduplicated result, counts included in width. */
	cost_sort(&dummy_path, root, NIL,
			  dummy_path.total_cost,
			  num_distinct,
			  width);
	dedup_cost = dummy_path.total_cost;

	/* Report the distinct-group estimate even when a GUC forces the answer. */
	if (numGroups)
		*numGroups = num_distinct;

	/* we need some calculates above even if the flag is off */
	if (pg_strcasecmp(gp_idf_deduplicate_str, "force") == 0)
		return true;
	if (pg_strcasecmp(gp_idf_deduplicate_str, "none") == 0)
		return false;

	return dedup_cost < naive_cost;
}
| |
| /* |
| * wrap_plan_index |
| * |
| * A wrapper of wrap_plan. The column name aliases are retrieved from |
| * sub plan's target list automatically. The main additional operation here |
| * is to modify varno on each vars in either the new target list and |
| * hashExpr (qual is not set by wrap_plan.) While wrap_plan creates |
| * Query with the initial RangeTblEntry with index 1 and this is right, |
| * we sometimes need to wrap plan by a subquery scan as a part of upper join. |
| * In that case, varno is not necessarily 1. |
| * |
| * If varno > 1, the vars' references are updated. Thus, |
| * (*query_p)->rtable is not valid anymore, since rtable should have all |
| * RangeTblEntry up to varno. Still, we leave it in rtable field |
| * for the caller to use the rte from rtable. |
| * |
| * Note that the target list of the plan and query is identical and |
| * represents the "one on generated plan," which means the caller still needs |
| * to update other target list like upper parser->targetList. This routines |
| * does not handle it. |
| * |
| * This is in the middle of wrap_plan() and add_subqueryscan(). We actually |
| * need a more sophisticated and all-round way. |
| * |
| * This also updates locus if given, and root->group_pathkeys to match |
| * the subquery targetlist. |
| */ |
static Plan *
wrap_plan_index(PlannerInfo *root, Plan *plan, Query *query,
				List **p_pathkeys, Index varno, const char *alias_name,
				Query **query_p)
{
	ListCell   *l;

	/* Wrap the plan in a subquery scan; wrap_plan always uses varno 1. */
	plan = wrap_plan(root, plan, query, p_pathkeys,
					 alias_name, NIL, query_p);
	Assert(varno > 0);
	if (varno != 1)
	{
		/* Re-point the flow's hash distribution Vars at the real varno. */
		foreach (l, plan->flow->hashExpr)
		{
			Var *var = lfirst(l);

			if (IsA(var, Var))
			{
				/* fix varno, which is set to 1 in wrap_plan */
				Assert(var->varno == 1);
				var->varno = var->varnoold = varno;
			}
		}

		/*
		 * Currently, plan and new parse tree shares target list.
		 * If this breaks, we'll need to update parse's target list as well.
		 */
		Assert(plan->targetlist == (*query_p)->targetList);
		foreach (l, plan->targetlist)
		{
			TargetEntry *tle = lfirst(l);
			Var *var = (Var *) tle->expr;

			/* Non-Var expressions are left untouched. */
			if (IsA(var, Var))
			{
				/* fix varno, which is set to 1 in wrap_plan */
				Assert(var->varno == 1);
				var->varno = var->varnoold = varno;
			}
		}
		Assert(IsA(plan, SubqueryScan));
		/* The scan must reference the caller's range-table slot. */
		((SubqueryScan *) plan)->scan.scanrelid = varno;
		Assert(plan->qual == NIL);
	}

	/*
	 * Update group_pathkeys in order to represent this subquery.
	 */
	root->group_pathkeys =
		make_pathkeys_for_groupclause(root->parse->groupClause, plan->targetlist);
	return plan;
}
| |
| /* |
| * rebuilt_simple_rel_and_rte |
| * |
| * Rebuild arrays for RelOptInfo and RangeTblEntry for the PlannerInfo when |
| * the underlying range tables are transformed. They can be rebuilt from |
| * parse->rtable, and will be used in later than here by processes like |
| * distinctClause planning. We never pfree the original array, since |
| * it's potentially used by other PlannerInfo which this is copied from. |
| */ |
| static void |
| rebuild_simple_rel_and_rte(PlannerInfo *root) |
| { |
| int i; |
| int array_size; |
| ListCell *l; |
| |
| array_size = list_length(root->parse->rtable) + 1; |
| root->simple_rel_array_size = array_size; |
| root->simple_rel_array = |
| (RelOptInfo **) palloc0(sizeof(RelOptInfo *) * array_size); |
| root->simple_rte_array = |
| (RangeTblEntry **) palloc0(sizeof(RangeTblEntry *) * array_size); |
| i = 1; |
| foreach (l, root->parse->rtable) |
| { |
| (void) build_simple_rel(root, i, RELOPT_BASEREL); |
| root->simple_rte_array[i] = lfirst(l); |
| i++; |
| } |
| } |
| |
| /* |
| * make_parallel_or_sequential_agg |
| * |
| * This is the common pattern to create multi-phase aggregate by cdb_grouping_planner |
| * or single agg by make_agg. The code was stolen from grouping_planner, but a little |
| * simplified under some assumptions to be used in limited condition. Here we assume |
| * we don't have GROUPING SETS and there is at least one aggregate. This function |
| * assumes the caller has already set up subplan as group_context->subplan. |
| * |
| * The caller may not pass current_pathkeys_p if it's not interested. |
| */ |
static Plan *
make_parallel_or_sequential_agg(PlannerInfo *root, AggClauseCounts *agg_counts,
								GroupContext *group_context, List **current_pathkeys_p)
{
	Plan	   *result_plan;
	List	   *current_pathkeys;


	/*
	 * current_pathkeys_p can be NULL, which means the caller isn't interested in
	 * the pathkeys. Still, we are.
	 */
	if (current_pathkeys_p)
		current_pathkeys = *current_pathkeys_p;
	else
		current_pathkeys = NIL;

	/* First, try to build a multi-phase (parallel) aggregate plan. */
	result_plan = cdb_grouping_planner(root, agg_counts, group_context);
	if (!result_plan)
	{
		/*
		 * If cdb_grouping_planner doesn't return a plan,
		 * it means the plan should fall back to sequential.
		 * In that case, multi-phase aggregate plan is not used.
		 * Here it's much simpler than grouping_planner,
		 * since we are sure we have at least one aggregate function
		 * and no GROUPING SETS.
		 */
		AggStrategy aggstrategy;

		result_plan = group_context->subplan;
		if (group_context->use_hashed_grouping)
		{
			/*
			 * HashAggregate case
			 */
			aggstrategy = AGG_HASHED;
			/* hashed output carries no meaningful sort order */
			current_pathkeys = NIL;
		}
		else
		{
			/*
			 * GroupAggregate case
			 */
			if (root->parse->groupClause)
			{
				/* Sort the input unless it's already ordered on the group keys. */
				if (!pathkeys_contained_in(root->group_pathkeys,
										   current_pathkeys))
				{
					result_plan = (Plan *)
						make_sort_from_groupcols(root,
												 root->parse->groupClause,
												 group_context->groupColIdx,
												 false,
												 result_plan);
					mark_sort_locus(result_plan);
				}
				aggstrategy = AGG_SORTED;
				/* AGG_SORTED preserves the grouping order of its input. */
				current_pathkeys =
					make_pathkeys_for_groupclause(root->parse->groupClause, result_plan->targetlist);
				current_pathkeys = canonicalize_pathkeys(root, current_pathkeys);
			}
			else
			{
				/*
				 * No GROUP BY case
				 */
				aggstrategy = AGG_PLAIN;
				current_pathkeys = NIL;
			}
		}
		/*
		 * Now make a single Agg node.
		 */
		result_plan = (Plan *) make_agg(root,
										root->parse->targetList,
										(List *) root->parse->havingQual,
										aggstrategy,
										false,
										group_context->numGroupCols,
										group_context->groupColIdx,
										*group_context->p_dNumGroups,
										0, /* num_nullcols */
										0, /* input_grouping */
										0, /* grouping */
										0, /* rollup_gs_times */
										agg_counts->numAggs,
										agg_counts->transitionSpace,
										result_plan);
		mark_passthru_locus(result_plan, true, current_pathkeys != NIL);
	}
	else
		current_pathkeys = *group_context->pcurrent_pathkeys;

	/* Report the resulting sort order back to the caller, if requested. */
	if (current_pathkeys_p)
		*current_pathkeys_p = current_pathkeys;

	return result_plan;
}
| |
| /* |
| * deconstruct_within_agg |
| * deconstruct within-aggregate and normal aggregate expressions to sub pieces. |
| * |
| * In the split-DQA-join pattern, we want to deconstruct individual aggregate |
| * or percentile expressions, group them, and relocate them to the appropriate |
| * sub-plan target list. The information about which expression goes to which |
| * sub-plan is stored in MppGroupContext structure, and the function returns |
| * Var that points to the expression on sub-plan which the original expression |
| * is replaced with. |
| * |
| * See also similar deconstruct_expr() for the general aggregate case. |
| */ |
static Node *
deconstruct_within_agg(Node *node, MppGroupContext *ctx)
{
	/* All the work happens in the recursive mutator below. */
	return deconstruct_within_agg_mutator(node, ctx);
}
| |
| /* |
| * deconstruct_within_agg_mutator |
| * |
| * The workhorse of deconstruct_within_agg() |
| */ |
static Node *
deconstruct_within_agg_mutator(Node *node, MppGroupContext *ctx)
{
	TargetEntry *tle;

	if (node == NULL)
		return NULL;

	if (IsA(node, Aggref) || IsA(node, PercentileExpr))
	{
		Index tlistno;
		AttrNumber attno;
		List *sortclauses, *dref_tlist;
		ListCell *l;
		Node *final_node;

		/*
		 * Here we may see normal aggregates, not only percentiles.
		 * If normal aggs are involved, ctx->wagSortClauses should have
		 * NIL elements for it.
		 */
		if (IsA(node, PercentileExpr))
			sortclauses = ((PercentileExpr *) node)->sortClause;
		else
			sortclauses = NIL;

		/*
		 * Find the right sub-plan which this expression should go.
		 * Sub-plans are keyed by their WITHIN sort clauses.
		 */
		tlistno = 0;
		foreach (l, ctx->wagSortClauses)
		{
			/* Note NIL can be equal to NIL, too. */
			if (equal(sortclauses, lfirst(l)))
				break;
			tlistno++;
		}
		/* Not found? Should not happen... */
		if (!l)
			elog(ERROR, "unexpected use of aggregate");
		dref_tlist = ctx->dref_tlists[tlistno];
		/*
		 * If the same expression exists at the same level, recycle it.
		 * Otherwise, create a new expression.
		 */
		tle = tlist_member(node, dref_tlist);
		attno = list_length(dref_tlist) + 1;	/* resno if we must append */
		if (!tle)
		{
			/*
			 * Don't copy node, share it with tlist, for later operation
			 * can modify the var reference in tlist.
			 */
			tle = makeTargetEntry((Expr *) node,
								  attno, NULL, false);
			dref_tlist = lappend(dref_tlist, tle);
		}
		else
			attno = tle->resno;		/* reuse the existing entry's position */
		ctx->dref_tlists[tlistno] = dref_tlist;

		/* Replace the expression with a Var into its sub-plan's RT slot. */
		final_node = (Node *) makeVar(dqa_base_varno + tlistno,
									  attno, exprType((Node *) tle->expr),
									  exprTypmod((Node *) tle->expr), 0);
		return final_node;
	}

	/*
	 * If the given expression is a grouping expression, replace it with
	 * a Var node referring to the (lower) preliminary aggregation's
	 * target list.
	 */
	tle = tlist_member(node, ctx->grps_tlist);
	if (tle != NULL)
		return (Node *) makeVar(grp_varno, tle->resno,
								exprType((Node *) tle->expr),
								exprTypmod((Node *) tle->expr), 0);

	/* Otherwise, recurse into the node's children. */
	return expression_tree_mutator(node, deconstruct_within_agg_mutator, ctx);
}
| |
| /* |
| * Returns percentile expressions associated with the specified sortClause. |
| * |
| * Because we want to keep percentile expressions in target list or having list |
| * to make any changes to them identically, we extract those expressions |
| * every time we need. It is more valuable here to keep consistent than to reduce |
| * a few cycles by copying those nodes. |
| */ |
| static List * |
| fetch_percentiles(Query *parse, List *sortClause) |
| { |
| List *nodes, *result; |
| ListCell *l; |
| |
| nodes = list_concat(extract_nodes(NULL, (Node *) parse->targetList, T_PercentileExpr), |
| extract_nodes(NULL, (Node *) parse->havingQual, T_PercentileExpr)); |
| nodes = list_concat(nodes, |
| extract_nodes(NULL, (Node *) parse->scatterClause, T_PercentileExpr)); |
| result = NIL; |
| foreach (l, nodes) |
| { |
| PercentileExpr *p = lfirst(l); |
| |
| if (equal(sortClause, p->sortClause)) |
| result = lappend(result, p); |
| } |
| |
| return result; |
| } |
| |
| /* |
| * make_deduplicate_plan |
| * |
| * Build a plan that removes duplicate rows with GROUP BY + ORDER BY (in WITHIN) |
| * clause. It may or may not use two phase aggregate inside. Note the output |
| * plan has different target list than non-deduplicate plan, since the aggregate |
| * process cannot process un-grouped columns. |
| */ |
static Plan *
make_deduplicate_plan(PlannerInfo *root,
					  GroupContext *group_context,
					  List *groupClause,
					  List *sortClause,
					  double numGroups,
					  AttrNumber *pc_pos_p,
					  List **current_pathkeys_p,
					  Plan *subplan)
{
	Plan *result_plan;
	Aggref *aggref;
	GroupContext ctx;
	ListCell *l1, *l2;
	TargetEntry *pc_tle;
	List *tlist;
	int numGroupCols;
	AttrNumber *groupColIdx;
	List *pathkeys = NIL;
	bool querynode_changed = false;
	AggClauseCounts agg_counts;
	bool use_hashed_grouping;

	Query *original_parse, *parse;
	List *original_group_pathkeys;

	List *sub_tlist = group_context->sub_tlist;
	Expr *tvexpr; /* ORDER BY expression */
	const Index Outer = 1;	/* range table index of the wrapped subquery */

	/*
	 * It is doable to just concatenate groupClause and sortClause,
	 * but it is more semantic to convert sortClause to groupClause.
	 * Especially we want to use make_pathkeys_for_groupclause later where
	 * sortClause is not handled.
	 *
	 * Copy input groupClause, since we change it.
	 */
	groupClause = copyObject(groupClause);
	foreach (l1, sortClause)
	{
		SortClause *sc = copyObject(lfirst(l1));

		/* retag the copy; this relies on SortClause/GroupClause layout compatibility */
		sc->type = T_GroupClause;
		groupClause = lappend(groupClause, sc);
	}

	groupColIdx = get_grouplist_colidx(groupClause, sub_tlist, &numGroupCols);

	/*
	 * Make target list derived from sub_tlist. Note that we filter out
	 * ungrouped columns which will be bogus after the aggregate.
	 */
	tlist = NIL;
	foreach (l1, sub_tlist)
	{
		TargetEntry *tle = lfirst(l1);
		TargetEntry *newtle;

		/*
		 * Check if this target is a part of grouping columns.
		 */
		foreach (l2, groupClause)
		{
			GroupClause *gc = lfirst(l2);

			if (gc->tleSortGroupRef == tle->ressortgroupref)
				break;
		}
		/* Found, so add it */
		if (l2)
		{
			newtle = flatCopyTargetEntry(tle);
			newtle->resno = list_length(tlist) + 1;
			tlist = lappend(tlist, newtle);
		}
	}

	/*
	 * Count the ORDER BY expression rather than "*", so that NULL inputs
	 * are ignored by the count.  We still must not eliminate NULL rows
	 * themselves, since a result should be returned for each group even
	 * if the group contains nothing but NULLs.
	 */
	tvexpr = (Expr *)
		get_sortgroupclause_expr(linitial(sortClause), sub_tlist);
	/*
	 * Append peer count expression to target list.
	 */
	*pc_pos_p = list_length(tlist) + 1;
	aggref = makeAggrefByOid(AGGFNOID_COUNT_ANY, list_make1(tvexpr));
	pc_tle = makeTargetEntry((Expr *) aggref,
							 *pc_pos_p, "peer_count", false);
	tlist = lappend(tlist, pc_tle);

	MemSet(&agg_counts, 0, sizeof(AggClauseCounts));
	count_agg_clauses((Node *) tlist, &agg_counts);

	use_hashed_grouping = choose_hashed_grouping(root,
												 group_context->tuple_fraction,
												 group_context->cheapest_path,
												 NULL,
												 numGroups,
												 &agg_counts);
	/*
	 * NOTE(review): the assignment below discards choose_hashed_grouping()'s
	 * cost-based answer and forces hashing whenever the aggregates allow it,
	 * making the call above a dead store -- confirm this override is
	 * intentional.
	 */
	use_hashed_grouping = agg_counts.canHashAgg;

	/* Set up the grouping context for the deduplicating aggregate. */
	ctx.best_path = group_context->best_path;
	ctx.cheapest_path = group_context->cheapest_path;
	ctx.subplan = subplan;
	ctx.sub_tlist = NIL;
	ctx.tlist = tlist;
	ctx.use_hashed_grouping = use_hashed_grouping;
	ctx.tuple_fraction = 0.1;
	ctx.canonical_grpsets = NULL;
	ctx.grouping = 0;
	ctx.numGroupCols = numGroupCols;
	ctx.groupColIdx = groupColIdx;
	ctx.numDistinctCols = 0;
	ctx.distinctColIdx = NULL;
	ctx.p_dNumGroups = &numGroups;
	ctx.pcurrent_pathkeys = &pathkeys;
	ctx.querynode_changed = &querynode_changed;

	/*
	 * Save the original parse tree and group pathkeys; we temporarily
	 * replace them with a copy describing the deduplicating subquery.
	 */
	original_parse = root->parse;
	original_group_pathkeys = root->group_pathkeys;

	root->parse = parse = copyObject(root->parse);
	root->parse->groupClause = groupClause;
	root->parse->targetList = tlist;
	root->parse->havingQual = NULL;
	root->parse->scatterClause = NIL;
	root->group_pathkeys = make_pathkeys_for_groupclause(groupClause, tlist);

	/*
	 * Make a multi-phase or simple agg plan.
	 */
	result_plan = make_parallel_or_sequential_agg(root,
												  &agg_counts,
												  &ctx,
												  current_pathkeys_p);

	/* Restore the caller's parse tree and pathkeys. */
	root->parse = original_parse;
	root->group_pathkeys = original_group_pathkeys;

	/*
	 * Add SubqueryScan to wrap this anyway, so that
	 * the outcome of deduplicate can be treated as a simple subquery relation.
	 */
	result_plan = wrap_plan_index(root,
								  result_plan,
								  parse,
								  current_pathkeys_p,
								  Outer,
								  "deduplicate",
								  &parse);

	/*
	 * Update the reference based on the aggregate target list. This ensures
	 * the target list of parse tree is pointing the top SubqueryScan's Vars.
	 */
	root->parse->targetList = (List *)
		cdbpullup_expr((Expr *) root->parse->targetList, tlist, NIL, Outer);
	root->parse->havingQual = (Node *)
		cdbpullup_expr((Expr *) root->parse->havingQual, tlist, NIL, Outer);
	root->parse->scatterClause = (List *)
		cdbpullup_expr((Expr *) root->parse->scatterClause, tlist, NIL, Outer);
	root->parse->rtable = parse->rtable;

	// Rebuild arrays for RelOptInfo and RangeTblEntry for the PlannerInfo
	// since the underlying range tables have been transformed
	rebuild_simple_rel_and_rte(root);

	return result_plan;
}
| |
| /* |
| * within_agg_make_baseplan |
| * |
| * Creates the scaffold plan which will be shared between the outer and |
| * inner plans. It may choose de-duplicate plan based on costs and GUC. |
| * The output plan is always a closed SubqueryScan. This will set |
| * pc_pos in wag_context. |
| */ |
static Plan *
within_agg_make_baseplan(PlannerInfo *root,
						 GroupContext *group_context,
						 WithinAggContext *wag_context,
						 List *sortClause,
						 Plan *result_plan)
{
	List *sub_tlist = group_context->sub_tlist;
	double dedup_numGroups;
	List *dedup_key_exprs;

	/*
	 * The GROUP BY keys are the normal grouping keys + sort key.
	 */
	dedup_key_exprs = list_concat(
			get_sortgrouplist_exprs(root->parse->groupClause, sub_tlist),
			get_sortgrouplist_exprs(sortClause, sub_tlist));

	/*
	 * Decide whether deduplicate is useful or not, based on cost
	 * estimates and the gp_idf_deduplicate GUC.
	 */
	wag_context->use_deduplicate = choose_deduplicate(root,
													  dedup_key_exprs,
													  result_plan,
													  &dedup_numGroups);

	/*
	 * Create the base subplan for the upper join. We may take the
	 * deduplicate way or not, but either way the target list of result_plan
	 * gets an extra target entry for the peer count.
	 */
	if (wag_context->use_deduplicate)
	{
		/*
		 * The deduplicate optimization. We reduce identical rows
		 * and record the number of reduced rows, so that
		 * percentile function can see the original rows.
		 * It's similar to the run-length encoding.
		 *
		 * root->parse is updated inside to represent this subquery.
		 */
		result_plan = make_deduplicate_plan(root,
											group_context,
											root->parse->groupClause,
											sortClause,
											dedup_numGroups,
											&wag_context->pc_pos,
											&wag_context->current_pathkeys,
											result_plan);
	}
	else
	{
		Query *subquery;
		Expr *tv_expr;
		NullTest *nt;
		CaseWhen *casearg;
		CaseExpr *pc_expr;
		TargetEntry *pc_tle;

		/*
		 * The naive case. Wrapping this plan with SubqueryScan anyway
		 * is demanded as the underlying plan might be SharedInputScan where
		 * the target list should not be modified, and in order to align
		 * the semantics with the de-duplicate case.
		 */
		result_plan = wrap_plan_index(root,
									  result_plan,
									  root->parse,
									  &wag_context->current_pathkeys,
									  1,
									  "nondeduplicate",
									  &subquery);
		/* Re-point the parse tree at the new SubqueryScan's outputs. */
		root->parse->targetList = (List *)
			cdbpullup_expr((Expr *) root->parse->targetList, sub_tlist, NIL, 1);
		root->parse->havingQual = (Node *)
			cdbpullup_expr((Expr *) root->parse->havingQual, sub_tlist, NIL, 1);
		root->parse->scatterClause = (List *)
			cdbpullup_expr((Expr *) root->parse->scatterClause, sub_tlist, NIL, 1);
		root->parse->rtable = subquery->rtable;

		/*
		 * We make zero as the peer count if tv is NULL. The inner
		 * should count up how many non-NULL there is.
		 *
		 * pc = CASE WHEN tv IS NOT NULL THEN 1 ELSE 0 END
		 */
		tv_expr = (Expr *)
			get_sortgroupclause_expr(linitial(sortClause), result_plan->targetlist);
		nt = makeNode(NullTest);
		nt->arg = tv_expr;
		nt->nulltesttype = IS_NOT_NULL;
		casearg = makeNode(CaseWhen);
		casearg->expr = (Expr *) nt;
		casearg->result = (Expr *)
			makeConst(INT8OID, -1, 8, Int64GetDatum(1), false, true);
		pc_expr = makeNode(CaseExpr);
		pc_expr->casetype = INT8OID;
		pc_expr->arg = NULL;
		pc_expr->args = list_make1(casearg);
		pc_expr->defresult = (Expr *)
			makeConst(INT8OID, -1, 8, Int64GetDatum(0), false, true);
		/* Append the peer count as the last target entry. */
		pc_tle = makeTargetEntry((Expr *) pc_expr,
								 list_length(result_plan->targetlist) + 1,
								 pstrdup("peer_count"),
								 false);
		wag_context->pc_pos = pc_tle->resno;
		result_plan->targetlist = lappend(result_plan->targetlist, pc_tle);
	}

	/*
	 * result_plan is a SubqueryScan here whichever path we took; its locus
	 * was updated when it was wrapped, to represent this subquery.
	 */
	Assert(IsA(result_plan, SubqueryScan));

	return result_plan;
}
| |
| /* |
| * within_agg_add_outer_sort |
| * |
| * Adds Sort to the outer plan, with Motion if desired. This is very |
| * straightforward and does not contain within-aggregate specific stuff. |
| * current_pathkeys in wag_context may be updated. |
| */ |
static Plan *
within_agg_add_outer_sort(PlannerInfo *root,
						  WithinAggContext *wag_context,
						  List *sortClause,
						  Plan *outer_plan)
{
	List *sort_pathkeys;
	Query *outer_parse;
	const Index Outer = 1;	/* range table index of the wrapped subquery */


	if (!root->parse->groupClause)
	{
		/*
		 * Plain aggregate case. Gather tuples into QD.
		 */
		sort_pathkeys =
			make_pathkeys_for_sortclauses(sortClause, outer_plan->targetlist);
		sort_pathkeys = canonicalize_pathkeys(root, sort_pathkeys);

		/*
		 * Check the sort redundancy; add a Sort only if the input is not
		 * already ordered on the WITHIN sort keys.
		 */
		if (!pathkeys_contained_in(sort_pathkeys, wag_context->current_pathkeys))
		{
			outer_plan = (Plan *)
				make_sort_from_sortclauses(root, sortClause, outer_plan);
			mark_sort_locus(outer_plan);
			wag_context->current_pathkeys = sort_pathkeys;
		}
		if (outer_plan->flow->flotype != FLOW_SINGLETON)
		{
			/*
			 * NOTE(review): 'true' presumably requests an order-preserving
			 * gather -- confirm against make_motion_gather_to_QE().
			 */
			outer_plan = (Plan *) make_motion_gather_to_QE(outer_plan, true);
			outer_plan->total_cost +=
				incremental_motion_cost(outer_plan->plan_rows,
										outer_plan->plan_rows * root->config->cdbpath_segments);
		}
	}
	else
	{
		CdbPathLocus current_locus;
		List *groupSortClauses;

		Assert(root->group_pathkeys);

		/*
		 * Create current locus. Derive a hashed locus from the flow's hash
		 * expressions when available, since cdbpathlocus_from_flow() alone
		 * does not produce one for a partitioned flow.
		 */
		current_locus = cdbpathlocus_from_flow(outer_plan->flow);
		if (CdbPathLocus_IsPartitioned(current_locus) &&
			outer_plan->flow->hashExpr)
			current_locus = cdbpathlocus_from_exprs(root, outer_plan->flow->hashExpr);

		/*
		 * Add a redistribute motion if the group key doesn't collocate.
		 * group_pathkeys should have been fixed to reflect the latest targetlist.
		 * best_path->locus is wrong here since we put SubqueryScan already.
		 */
		if (!cdbpathlocus_collocates(current_locus, root->group_pathkeys, false /*exact_match*/))
		{
			List *groupExprs;

			groupExprs = get_sortgrouplist_exprs(root->parse->groupClause,
												 outer_plan->targetlist);
			outer_plan = (Plan *)
				make_motion_hash(root, outer_plan, groupExprs);
			outer_plan->total_cost +=
				incremental_motion_cost(outer_plan->plan_rows,
										outer_plan->plan_rows);
			/*
			 * Invalidate pathkeys; the result is not sorted any more.
			 */
			wag_context->current_pathkeys = NULL;
		}
		/*
		 * Now we can add sort node: group keys first, then the WITHIN
		 * sort keys.
		 */
		groupSortClauses = list_concat(copyObject(root->parse->groupClause),
									   sortClause);
		sort_pathkeys =
			make_pathkeys_for_sortclauses(groupSortClauses, outer_plan->targetlist);
		sort_pathkeys = canonicalize_pathkeys(root, sort_pathkeys);

		if (!pathkeys_contained_in(sort_pathkeys, wag_context->current_pathkeys))
		{
			outer_plan = (Plan *) make_sort_from_sortclauses(root,
															 groupSortClauses,
															 outer_plan);
			mark_sort_locus(outer_plan);
			/*
			 * Update current pathkeys.
			 */
			wag_context->current_pathkeys = sort_pathkeys;
		}
	}

	/*
	 * Prepare parse tree for wrapping it by SubqueryScan.
	 */
	outer_parse = copyObject(root->parse);

	/* Re-point the parse tree at the outputs of the plan we just built. */
	root->parse->targetList = (List *)
		cdbpullup_expr((Expr *) root->parse->targetList, outer_plan->targetlist, NIL, 1);
	root->parse->havingQual = (Node *)
		cdbpullup_expr((Expr *) root->parse->havingQual, outer_plan->targetlist, NIL, 1);
	root->parse->scatterClause = (List *)
		cdbpullup_expr((Expr *) root->parse->scatterClause, outer_plan->targetlist, NIL, 1);
	/*
	 * Wrap plan by subquery as the outer of upcoming join.
	 */
	outer_plan = wrap_plan_index(root,
								 outer_plan,
								 outer_parse,
								 &wag_context->current_pathkeys,
								 Outer,
								 "outer_plan",
								 &outer_parse);
	/* The outer subquery's RTE becomes the first entry of the join rtable. */
	Assert(list_length(wag_context->rtable) == 0);
	wag_context->rtable = list_concat(wag_context->rtable, outer_parse->rtable);

	return outer_plan;
}
| |
| /* |
| * within_agg_construct_inner |
| * |
| * Constructs the inner plan which calculates the total non-NULL row count |
| * of the input. The aggregate function is actually not count(), but sum() |
| * of the peer count which has considered NULL input in both of naive and |
| * de-duplicate cases. Updates wag_context->tc_pos and the output plan |
| * should be SubqueryScan ready for the join. |
| */ |
static Plan *
within_agg_construct_inner(PlannerInfo *root,
						   GroupContext *group_context,
						   WithinAggContext *wag_context,
						   Plan *inner_plan)
{
	ListCell *l;
	int idx;
	int numGroupCols;
	Path input_path;
	TargetEntry *pc_tle;
	Expr *tc_expr;
	GroupContext ctx;
	List *tlist;
	double numGroups = *group_context->p_dNumGroups;
	bool use_hashed_grouping;
	bool querynode_changed = false;
	List *pathkeys = NIL;
	AggClauseCounts agg_counts;
	AttrNumber *grpColIdx;
	Query *original_parse;
	List *original_group_pathkeys;
	Query *parse;
	const Index Inner = 2;	/* range table index of the wrapped subquery */

	grpColIdx = get_grouplist_colidx(root->parse->groupClause,
									 inner_plan->targetlist, &numGroupCols);
	/* build grouping key columns */
	tlist = NIL;
	foreach_with_count (l, root->parse->groupClause, idx)
	{
		GroupClause *gc = (GroupClause *) lfirst(l);
		TargetEntry *tle, *newtle;

		tle = get_sortgroupclause_tle(gc, inner_plan->targetlist);
		newtle = flatCopyTargetEntry(tle);
		newtle->resno = (AttrNumber) idx + 1;
		tlist = lappend(tlist, newtle);
	}
	/*
	 * Sum up the peer count to count the total number of rows per group.
	 */
	pc_tle = get_tle_by_resno(inner_plan->targetlist, wag_context->pc_pos);
	tc_expr = (Expr *) makeAggrefByOid(AGGFNOID_SUM_BIGINT,
									   list_make1(pc_tle->expr));
	tc_expr = (Expr *) coerce_to_bigint(NULL, (Node *) tc_expr, "sum_to_bigint");
	/* record where the total count lands, for the upper join/aggregate */
	wag_context->tc_pos = list_length(tlist) + 1;
	tlist = lappend(tlist, makeTargetEntry((Expr *) tc_expr,
										   wag_context->tc_pos,
										   "total_count",
										   false));

	/*
	 * best_path is not appropriate here after building some SubqueryScan.
	 * Build up a dummy Path to reflect the underlying plan, but
	 * needed information is only locus in cdb_grouping_planner.
	 */
	memcpy(&input_path, group_context->best_path, sizeof(Path));

	/*
	 * Create locus back from flow. Unfortunately cdbpathlocus_from_flow()
	 * doesn't return hashed locus in repartitioned case, so we need to
	 * call from_exprs() again if it's available.
	 */
	input_path.locus = cdbpathlocus_from_flow(inner_plan->flow);
	if (CdbPathLocus_IsPartitioned(input_path.locus) &&
		inner_plan->flow->hashExpr)
		input_path.locus = cdbpathlocus_from_exprs(root, inner_plan->flow->hashExpr);
	input_path.pathkeys = wag_context->inner_pathkeys;

	MemSet(&agg_counts, 0, sizeof(AggClauseCounts));
	count_agg_clauses((Node *) tlist, &agg_counts);

	/*
	 * Evaluate possibility for hash/sort strategy. Things have been changed
	 * since the last decision in grouping_planner(), as the base plan
	 * may now be sorted.
	 */
	use_hashed_grouping = choose_hashed_grouping(root,
												 group_context->tuple_fraction,
												 &input_path,
												 &input_path,
												 numGroups,
												 &agg_counts);

	/* Set up the grouping context for the total-count aggregate. */
	ctx.best_path = &input_path;
	ctx.cheapest_path = &input_path;
	ctx.subplan = inner_plan;
	ctx.sub_tlist = NIL;
	ctx.tlist = tlist;
	ctx.use_hashed_grouping = use_hashed_grouping;
	ctx.tuple_fraction = 0.1;
	ctx.canonical_grpsets = NULL;
	ctx.grouping = 0;
	ctx.numGroupCols = numGroupCols;
	ctx.groupColIdx = grpColIdx;
	ctx.numDistinctCols = 0;
	ctx.distinctColIdx = NULL;
	ctx.p_dNumGroups = &numGroups;
	ctx.pcurrent_pathkeys = &pathkeys;
	ctx.querynode_changed = &querynode_changed;

	/*
	 * Save the parse tree, for cdb_grouping_planner will modify it.
	 */
	original_parse = root->parse;
	original_group_pathkeys = root->group_pathkeys;
	root->parse = parse = copyObject(root->parse);
	root->parse->targetList = tlist;
	root->parse->havingQual = NULL;
	root->parse->scatterClause = NIL;
	root->group_pathkeys =
		make_pathkeys_for_sortclauses(parse->groupClause, tlist);

	/*
	 * Make a multi-phase or simple aggregate plan.
	 */
	inner_plan = make_parallel_or_sequential_agg(root,
												 &agg_counts,
												 &ctx,
												 &wag_context->inner_pathkeys);

	/*
	 * Wrap plan by subquery as the inner of upcoming join.
	 */
	inner_plan = wrap_plan_index(root,
								 inner_plan,
								 parse,
								 &wag_context->inner_pathkeys,
								 Inner,
								 "inner_plan",
								 &parse);
	wag_context->rtable = list_concat(wag_context->rtable, parse->rtable);
	/* outer + inner = 2 */
	Assert(list_length(wag_context->rtable) == 2);

	/*
	 * Restore the original info. Note that group_pathkeys is updated
	 * in wrap_plan_index(), so don't move this before it.
	 */
	root->parse = original_parse;
	root->group_pathkeys = original_group_pathkeys;

	return inner_plan;
}
| |
| /* |
| * within_agg_join_plans |
| * |
| * Joins the outer and inner plans and filters out unrelated rows. We use MergeJoin |
| * as the outer is already sorted and want to keep the order to the aggregate. |
| * It requires the inner plan to be able to be re-scannable, so we add sort |
| * or material node to the inner plan. The join should produce exact same number |
| * of the outer plan as the join clause is in GROUP BY keys and the inner is |
| * grouped with them. |
| */ |
static Plan *
within_agg_join_plans(PlannerInfo *root,
					  GroupContext *group_context,
					  WithinAggContext *wag_context,
					  Plan *outer_plan,
					  Plan *inner_plan)
{
	Plan *result_plan;
	ListCell *l;
	int idx;
	List *join_tlist;
	List *join_clause;
	const Index Outer = 1, Inner = 2;	/* range table indexes of the two sides */
	List *extravars;
	Var *pc_var, *tc_var;

	/*
	 * Up to now, these should've been prepared: the peer count position
	 * (outer side) and the total count position (inner side).
	 */
	Assert(wag_context->pc_pos > 0);
	Assert(wag_context->tc_pos > 0);

	/*
	 * Build join clauses over the grouping columns.
	 */
	join_clause = NIL;
	foreach_with_count (l, root->parse->groupClause, idx)
	{
		GroupClause *gc = (GroupClause*) lfirst(l);
		TargetEntry *tle;
		Var *outer_var, *inner_var;
		Expr *qual;

		/*
		 * Construct outer group keys.
		 */
		tle = get_sortgroupclause_tle(gc, outer_plan->targetlist);
		Assert(tle && IsA(tle->expr, Var));
		outer_var = makeVar(Outer, tle->resno,
							exprType((Node *) tle->expr),
							exprTypmod((Node *) tle->expr), 0);

		/*
		 * Construct inner group keys. The inner target list places the
		 * grouping columns first, in groupClause order.
		 */
		tle = get_tle_by_resno(inner_plan->targetlist, idx + 1);
		Assert(tle && IsA(tle->expr, Var));
		inner_var = makeVar(Inner, tle->resno,
							exprType((Node *) tle->expr),
							exprTypmod((Node *) tle->expr), 0);

		/*
		 * Make join clause for group keys. We build "outer = inner" and
		 * retag it into NOT (outer IS DISTINCT FROM inner), so that NULL
		 * group keys compare as equal, matching GROUP BY semantics.
		 */
		qual = make_op(NULL, list_make1(makeString("=")),
					   (Node *) outer_var, (Node *) inner_var, -1);
		qual->type = T_DistinctExpr;
		qual = make_notclause(qual);
		join_clause = lappend(join_clause, qual);
	}

	/*
	 * This is similar to make_subplanTargetList(), but things are much simpler.
	 * Note that this makes sure that expressions like SRF are going to be
	 * in the upper aggregate target list rather than in this join target list.
	 */
	join_tlist = flatten_tlist(root->parse->targetList);
	extravars = pull_var_clause(root->parse->havingQual, false);
	join_tlist = add_to_flat_tlist(join_tlist, extravars, false);

	foreach (l, root->parse->groupClause)
	{
		GroupClause *gc = lfirst(l);
		TargetEntry *gc_tle, *join_tle;

		/*
		 * We need the grouping expressions in the target list. If they are
		 * in the target list already, we remember the grouping reference
		 * since extracting vars drops that information. Otherwise, we
		 * simply append the entry to the target list.
		 */
		gc_tle = get_sortgroupclause_tle(gc, root->parse->targetList);
		join_tle = tlist_member((Node *) gc_tle->expr, join_tlist);
		if (join_tle)
		{
			join_tle->ressortgroupref = gc->tleSortGroupRef;
			join_tle->resname = gc_tle->resname;
		}
		else
		{
			join_tle = flatCopyTargetEntry(gc_tle);
			join_tlist = lappend(join_tlist, join_tle);
		}
	}

	/*
	 * Make sure that the peer count and the total count are in the
	 * target list of the join. They will be needed in the upper
	 * final aggregate by the percentile functions.
	 */
	pc_var = makeVar(Outer, wag_context->pc_pos, INT8OID, -1, 0);
	tc_var = makeVar(Inner, wag_context->tc_pos, INT8OID, -1, 0);
	join_tlist = add_to_flat_tlist(join_tlist,
								   list_make2((void *) pc_var, (void *) tc_var),
								   false);

	/*
	 * It is ideal to tell if the inner plan is fine to merge-join by
	 * examining it as re-scannable plan, but it seems we don't have
	 * such infrastructure, so here we assume the inner plan is not
	 * re-scannable and not sorted. If it is a grouping query,
	 * we add sort node, otherwise just put a materialize node.
	 */
	if (root->parse->groupClause && !wag_context->inner_pathkeys)
	{
		AttrNumber *grpColIdx;

		grpColIdx = get_grouplist_colidx(root->parse->groupClause,
										 inner_plan->targetlist, NULL);
		inner_plan = (Plan *)
			make_sort_from_groupcols(root,
									 root->parse->groupClause,
									 grpColIdx,
									 false,
									 inner_plan);
		mark_sort_locus(inner_plan);
	}
	else
	{
		/* make the inner re-scannable without changing its order */
		inner_plan = (Plan *) make_material(inner_plan);
		mark_passthru_locus(inner_plan, true, true);
	}

	/*
	 * All set. Join two plans.
	 * We choose cartesian product if there is no join clauses, meaning
	 * no grouping happens.
	 */
	if (list_length(join_clause) > 0)
		result_plan = (Plan *) make_mergejoin(join_tlist,
											  NIL,
											  NIL,
											  join_clause,
											  outer_plan,
											  inner_plan,
											  JOIN_INNER);
	else
		result_plan = (Plan *) make_nestloop(join_tlist,
											 NIL,
											 NIL,
											 outer_plan,
											 inner_plan,
											 JOIN_INNER);

	/*
	 * Cost the join by hand: the inner yields one row per group, so the
	 * join output cardinality equals the outer's.
	 */
	result_plan->startup_cost = outer_plan->startup_cost + inner_plan->startup_cost;
	result_plan->plan_rows = outer_plan->plan_rows;
	result_plan->total_cost = outer_plan->total_cost + inner_plan->total_cost;
	result_plan->total_cost += cpu_tuple_cost * result_plan->plan_rows;
	mark_passthru_locus(result_plan, true, true);

	return result_plan;
}
| |
| /* |
| * within_agg_final_agg |
| * |
| * The final stage of plan_within_agg_persort(). The input plans are already |
| * joined and the total count from the inner plan is available now. The |
| * peer count and the total count is attached to the PercentileExpr so that |
| * it will get those values in the percentile function in the executor. |
| */ |
| static Plan * |
| within_agg_final_agg(PlannerInfo *root, |
| GroupContext *group_context, |
| WithinAggContext *wag_context, |
| List *sortClause, |
| Plan *result_plan) |
| { |
| ListCell *l; |
| List *percentiles; |
| Var *pc_var, *tc_var; |
| AttrNumber *grpColIdx; |
| int numGroupCols; |
| AggClauseCounts agg_counts; |
| AggStrategy aggstrategy; |
| const Index Outer = 1, Inner = 2; |
| |
| /* |
| * Sanity check. These should've been prepared up to now. |
| */ |
| Assert(wag_context->pc_pos > 0); |
| Assert(wag_context->tc_pos > 0); |
| |
| /* |
| * Attach the peer count and the total count expressions to |
| * PercentileExpr, which will be needed in the executor. |
| */ |
| percentiles = fetch_percentiles(root->parse, sortClause); |
| pc_var = makeVar(Outer, wag_context->pc_pos, INT8OID, -1, 0); |
| tc_var = makeVar(Inner, wag_context->tc_pos, INT8OID, -1, 0); |
| foreach (l, percentiles) |
| { |
| PercentileExpr *perc = lfirst(l); |
| |
| perc->pcExpr = (Expr *) pc_var; |
| perc->tcExpr = (Expr *) tc_var; |
| } |
| |
| MemSet(&agg_counts, 0, sizeof(AggClauseCounts)); |
| count_agg_clauses((Node *) root->parse->targetList, &agg_counts); |
| |
| /* |
| * Prepare GROUP BY clause for the final aggregate. |
| * Make sure the column indices point to the topmost target list. |
| */ |
| grpColIdx = get_grouplist_colidx(root->parse->groupClause, |
| result_plan->targetlist, &numGroupCols); |
| aggstrategy = root->parse->groupClause ? AGG_SORTED : AGG_PLAIN; |
| result_plan = (Plan *) make_agg(root, |
| root->parse->targetList, |
| (List *) root->parse->havingQual, |
| aggstrategy, |
| false, |
| numGroupCols, |
| grpColIdx, |
| *group_context->p_dNumGroups, |
| 0, /* num_nullcols */ |
| 0, /* input_grouping */ |
| 0, /* grouping */ |
| 0, /* rollup_gs_times */ |
| 1, /* numAggs */ |
| agg_counts.transitionSpace, |
| result_plan); |
| |
| /* |
| * Stop copying sorts in flow, for the targetlist doesn't have them anymore. |
| */ |
| mark_passthru_locus(result_plan, true, false); |
| |
| /* |
| * current_pathkeys is not needed anymore, but just in case. |
| */ |
| wag_context->current_pathkeys = NIL; |
| |
| return result_plan; |
| } |
| |
| /* |
| * plan_within_agg_persort |
| * Make a series of plans to calculate percentile per identical sort clause. |
| * |
| * The main stream of this planner is: |
| * - Build the base plan |
| * - Sort the input by group and target value |
| * - Split the input into two |
| * - One for the outer |
| * - The other for the inner, aggregate to count actual row number |
| * - Join two trees to get the inner row count to the sorted outer together |
| * - Aggregate the result of join to perform the final aggregate |
| * |
| * The first step is to figure out if we could optimize the input by |
| * de-duplicate table. If so, we create a sub-plan with aggregate to |
| * reduce the rows into GROUP + ORDER unit and add the peer count in this unit. |
| * If we don't take de-duplicate plan, we append constant 1 as the peer count |
| * to the target list to match the semantics with de-duplicate plan. In any |
| * case, the input plan is wrapped by a SubqueryScan in order not to confuse |
| * upper plan. The peer count takes account into NULL, so that the percentile |
| * function can operate on only non-NULL target value. |
| * |
| * The next step is for the outer plan; sort the input tuples by group clause |
| * and the target value. During this process, the input tuples are distributed |
| * by the group clause. We need to keep this order until the final aggregate |
| * so that the final percentile function can tell ordering of the input. |
| * |
| * Then we look at the inner side. It basically calculates number of rows |
| * per group. Since we know the peer count per each distinct target value in |
| * a group in either of naive or deduplicate approach, we actually take sum() |
| * rather than count() to calculate the total number of non-NULL target values. |
| * |
| * Now, we can join two plans. The inner total count is going to be with |
| * the target value for each group here. Note that we use Merge Join to |
| * keep the outer order and the inner should have capability to be rescanned |
| * per its requirement. |
| * |
| * Finally, the rows are aggregated per group. Since PercentileExpr needs |
| * to know the peer count and the total count in conjunction with its |
| * original argument and the target value (as the ORDER BY clause,) and we now |
| * know where those values are, here we attach those expressions to |
| * PercentileExpr nodes. These additional expressions are similar to |
| * AggOrder in Aggref, which will be combined with the original argument |
| * expression in the executor to be evaluated. |
| * |
| * The final result of this function is a plan ending Agg node with target |
| * list including each percentile result. Note this function should not |
| * change the semantics of input target list as an encapsulation. |
| */ |
static Plan *
plan_within_agg_persort(PlannerInfo *root,
						GroupContext *group_context,
						List *sortClause,
						List *current_pathkeys,
						Plan *result_plan)
{
	WithinAggContext wag_context;
	Plan *outer_plan, *inner_plan;
	List *partners;
	ListCell *l;

	/* Start from a zeroed per-sort context; remember the input ordering. */
	memset(&wag_context, 0, sizeof(WithinAggContext));
	wag_context.current_pathkeys = current_pathkeys;

	/*
	 * Group clause expressions should be in ascending order,
	 * because our MergeJoin is not able to handle descending-ordered
	 * child plans. It is desirable to improve MergeJoin, but it requires
	 * amount of work. Here we simply force an ascending sort operator
	 * for each grouping key's type.
	 */
	foreach (l, root->parse->groupClause)
	{
		GroupClause *gc = lfirst(l);
		Node *gcexpr;
		Oid gctype;

		/*
		 * We assume only flattened grouping expressions here
		 * (grouping-extension cases have been denied by the caller).
		 */
		Assert(IsA(gc, GroupClause));
		gcexpr = get_sortgroupclause_expr(gc, group_context->sub_tlist);
		gctype = exprType(gcexpr);
		gc->sortop = ordering_oper_opid(gctype);
	}

	/*
	 * Make a common plan shared by outer and inner plan. It may become
	 * a de-duplicate plan. This also sets wag_context.pc_pos (the peer
	 * count position) and wraps the result in a SubqueryScan.
	 */
	result_plan = within_agg_make_baseplan(root,
										   group_context,
										   &wag_context,
										   sortClause,
										   result_plan);
	Assert(wag_context.pc_pos > 0);
	Assert(IsA(result_plan, SubqueryScan));

	/*
	 * Split the tree into outer and inner which will be joined later.
	 * It comes before Sort, so that the both of outer and inner run
	 * in parallel. We observed in most cases splitting it here requires
	 * RedistributeMotion in both sides, which allows more parallel way.
	 */
	partners = share_plan(root, result_plan, 2);
	outer_plan = list_nth(partners, 0);
	inner_plan = list_nth(partners, 1);

	/*
	 * Keep the pathkeys for the inner as pre-Subquery one.
	 */
	wag_context.inner_pathkeys = wag_context.current_pathkeys;

	/*
	 * Add sort if necessary on top of outer plan of join. The outer side
	 * must be ordered by group keys plus the target value for the final
	 * aggregate to see tuples in order.
	 */
	outer_plan = within_agg_add_outer_sort(root, &wag_context, sortClause, outer_plan);
	Assert(IsA(outer_plan, SubqueryScan));
	Assert(list_length(wag_context.rtable) == 1);

	/*
	 * Construct inner plan of join that returns only number of rows.
	 * The inner side always create target list looking like
	 *   G1, G2, ..., count(*) TP
	 * This sets wag_context.tc_pos (the total count position).
	 */
	inner_plan = within_agg_construct_inner(root,
											group_context,
											&wag_context,
											inner_plan);
	Assert(IsA(inner_plan, SubqueryScan));
	Assert(wag_context.tc_pos > 0);
	Assert(list_length(wag_context.rtable) == 2);

	/*
	 * merge join to find interesting one or two rows per group per
	 * percentile.
	 */
	result_plan = within_agg_join_plans(root,
										group_context,
										&wag_context,
										outer_plan,
										inner_plan);

#ifdef NOT_USED
	if (true)
	{
		/*
		 * For debug purpose.
		 * This helps to see what's the intermediate result.
		 */
		root->parse->targetList = copyObject(result_plan->targetlist);
		root->parse->rtable = wag_context.rtable;
		return result_plan;
	}
#endif

	/*
	 * Finally put Agg node to calculate exact value with filtered few rows.
	 */
	result_plan = within_agg_final_agg(root,
									   group_context,
									   &wag_context,
									   sortClause,
									   result_plan);

	/*
	 * Propagate the new target list and range table back into the Query
	 * and rebuild the planner's per-relation arrays to match.
	 */
	root->parse->targetList = copyObject(result_plan->targetlist);
	root->parse->rtable = wag_context.rtable;
	rebuild_simple_rel_and_rte(root);

	return result_plan;
}
| |
| /* |
| * within_agg_planner |
| * |
| * This is the entry point of within-aggregate planner. |
| * Makes a specialized plan for within aggregates. If we find the query |
| * has normal aggregates among the within-aggregates, or within-aggregates |
| * have different sortClause, we generate multiple sub-plans regarding with |
| * the sortClause and combine the results by joining those sub-plan trees. |
| * For the normal aggregates, we call cdb_grouping_planner() to plan the tree, |
| * and treat it as one of the split sub-plans. |
| */ |
Plan *
within_agg_planner(PlannerInfo *root,
				   AggClauseCounts *agg_counts,
				   GroupContext *group_context)
{
	List *aggnodes, *percnodes;
	ListCell *l;
	List **aggreflist;		/* per-sort-group lists of Aggref/PercentileExpr */
	List **sortlist;		/* distinct sortClauses; NIL slot = normal aggs */
	int numsortlist;
	int numGroupCols, numDistinctCols;
	AttrNumber *grpColIdx, *distinctColIdx;
	int i;
	List *sub_tlist;
	AttrNumber next_resno;
	List *current_pathkeys;
	Plan *result_plan;

	/*
	 * Collect all plain aggregates and all percentile (WITHIN) expressions
	 * from the target list, HAVING qual and scatter clause.
	 */
	aggnodes = extract_nodes(NULL, (Node *) root->parse->targetList, T_Aggref);
	aggnodes = list_concat(aggnodes,
						   extract_nodes(NULL, (Node *) root->parse->havingQual, T_Aggref));
	aggnodes = list_concat(aggnodes,
						   extract_nodes(NULL, (Node *) root->parse->scatterClause, T_Aggref));
	percnodes = extract_nodes(NULL, (Node *) root->parse->targetList, T_PercentileExpr);
	percnodes = list_concat(percnodes,
							extract_nodes(NULL, (Node *) root->parse->havingQual, T_PercentileExpr));
	percnodes = list_concat(percnodes,
							extract_nodes(NULL, (Node *) root->parse->scatterClause, T_PercentileExpr));
	/* Allocate maximum number of list: one per percentile plus one for aggs */
	numsortlist = list_length(percnodes) + 1;
	/* initialize each element with NIL */
	aggreflist = (List **) palloc0(sizeof(List *) * numsortlist);
	sortlist = (List **) palloc0(sizeof(List *) * numsortlist);
	numsortlist = 0; /* Use this as a counter */

	sub_tlist = group_context->sub_tlist;
	next_resno = list_length(sub_tlist) + 1;

	/*
	 * WITHIN aggregates are not supported in the grouping extensions.
	 * However, parse->groupClause may have non-flattened GroupClause list.
	 * We simply flatten it by reconstruct_group_clause under the assumption
	 * that we have denied grouping extension cases.
	 */
	Assert(!is_grouping_extension(group_context->canonical_grpsets));
	grpColIdx = get_grouplist_colidx(
			root->parse->groupClause, sub_tlist, &numGroupCols);
	root->parse->groupClause =
		reconstruct_group_clause(root->parse->groupClause,
								 sub_tlist,
								 grpColIdx,
								 numGroupCols);
	/* Allocate the worst case; refilled below as an actual counter. */
	numDistinctCols = agg_counts->numDistinctAggs;
	distinctColIdx = (AttrNumber *) palloc(sizeof(AttrNumber) * numDistinctCols);
	numDistinctCols = 0; /* Use this as a counter */

	/*
	 * Collect aggref nodes to process them separately from percentiles.
	 * Note we represent this special case by NIL for sortClause (sortlist.)
	 */
	if (aggnodes)
	{
		sortlist[numsortlist] = NIL;
		foreach (l, aggnodes)
		{
			Aggref *aggref = lfirst(l);

			if (aggref->aggdistinct)
			{
				Node *arg;
				TargetEntry *sub_tle;

				/*
				 * DISTINCT aggs take exactly one argument; make sure it is
				 * present in sub_tlist and remember its column index.
				 */
				Assert(list_length(aggref->args) == 1);
				arg = linitial(aggref->args);
				sub_tle = tlist_member(arg, sub_tlist);
				if (!sub_tle)
				{
					sub_tle = makeTargetEntry((Expr *) arg,
											  next_resno++,
											  "<expr>",
											  true);
					sub_tlist = lappend(sub_tlist, sub_tle);
				}
				distinctColIdx[numDistinctCols++] = sub_tle->resno;
			}
			aggreflist[numsortlist] = lappend(aggreflist[numsortlist], aggref);
		}
		numsortlist++;
	}

	/*
	 * Collect percentile nodes and classify them into some groups by sortClause.
	 * During this process, if sub_tlist lacks target entry for the
	 * sortClause, it is added.
	 */
	foreach (l, percnodes)
	{
		PercentileExpr *perc = lfirst(l);
		List *sortClause;
		ListCell *sl;

		sortClause = perc->sortClause;
		Assert(sortClause);

		/*
		 * We need to add tlist to the parse's tlist. This is
		 * basically parser's task, but the list is separated
		 * to keep away from ungroup'ed columns check and
		 * this is the right place to add back to the parser's tlist.
		 */
		foreach (sl, sortClause)
		{
			SortClause *sc = lfirst(sl);
			TargetEntry *tle, *sub_tle;

			tle = get_sortgroupclause_tle(sc, perc->sortTargets);
			sub_tle = tlist_member((Node *) tle->expr, sub_tlist);
			if (!sub_tle)
			{
				sub_tle = makeTargetEntry(tle->expr,
										  next_resno++,
										  tle->resname,
										  tle->resjunk);
				sub_tlist = lappend(sub_tlist, sub_tle);
			}
			/* Keep the sort group references consistent across both lists. */
			sc->tleSortGroupRef = assignSortGroupRef(sub_tle, sub_tlist);
			tle->ressortgroupref = sc->tleSortGroupRef;
		}
		/* Find identical sortClause. */
		for (i = 0; i < numsortlist && !equal(sortClause, sortlist[i]); i++);

		if (i == numsortlist)
		{
			/* Not found, so add a new one. */
			sortlist[numsortlist] = sortClause;
			numsortlist++;
		}

		/*
		 * Add this percentile to the group associated with the sortClause.
		 */
		aggreflist[i] = lappend(aggreflist[i], perc);
	}
	/* Make sure group_context->sub_tlist has pointer to what we made */
	group_context->sub_tlist = sub_tlist;

	/*
	 * Make the scaffold. We always take best_path here because
	 * it is not clear which to use for upcoming complex plans.
	 */
	Assert(sub_tlist != NIL);
	result_plan = create_plan(root, group_context->best_path);
	result_plan = plan_pushdown_tlist(result_plan, sub_tlist);
	Assert(result_plan->flow);
	current_pathkeys = group_context->best_path->pathkeys;

	/*
	 * numsortlist is actually the number of sub-plans.
	 */
	Insist(numsortlist > 0);

	/*
	 * The approach is very close to the one for DQA. If the plan consists
	 * only one sort group, then construct straightforward plan without
	 * mangling target list. If more than one, including normal aggregate,
	 * we split each sort group (normal aggregate goes to NIL sort group) into
	 * sub pieces and construct separate plans, and join them to get the final
	 * plan to match the desired target list.
	 */
	if (numsortlist == 1)
	{
		/*
		 * Simply plan a tree and return it. We don't clutter the target list.
		 */
		result_plan = plan_within_agg_persort(root,
											  group_context,
											  *sortlist,
											  current_pathkeys,
											  result_plan);
	}
	else
	{
		/*
		 * The pattern on multi-level sort is similar to multi dqa.
		 * We use its infrastructure much to avoid reinventing wheel.
		 */
		List *base_plans;
		MppGroupContext mgctx;
		List *rtable;

		/* Share the common scaffold once per sort group. */
		base_plans = share_plan(root, result_plan, numsortlist);
		Assert(numsortlist > 0);

		MemSet(&mgctx, 0, sizeof(MppGroupContext));

		/*
		 * XXX: Need support of DqaJoinMerge for types used in GROUP BY clause
		 * which are not mergejoinable.
		 */
		mgctx.join_strategy = numGroupCols > 0 ? DqaJoinHash : DqaJoinCross;
		mgctx.numGroupCols = numGroupCols;

		/*
		 * This code is from deconstruct_agg_info. What it does is to
		 * collect grouping keys and make a simple list which contain
		 * only those key expressions, which will be used in each individual
		 * plan tree as leading columns (and later JOIN clause).
		 */
		mgctx.grps_tlist = NIL;
		for (i = 0; i < numGroupCols; i++)
		{
			TargetEntry *sub_tle, *prelim_tle;

			sub_tle = get_tle_by_resno(sub_tlist, grpColIdx[i]);
			prelim_tle = flatCopyTargetEntry(sub_tle);
			prelim_tle->resno = list_length(mgctx.grps_tlist) + 1;
			mgctx.grps_tlist = lappend(mgctx.grps_tlist, prelim_tle);
		}
		mgctx.dref_tlists = (List **) palloc0(numsortlist * sizeof(List *));

		/*
		 * Within-aggregate special. Used in within_aggregate_expr().
		 * Each sub-plan tree is identified by the sort clause.
		 */
		mgctx.wagSortClauses = NIL;
		for (i = 0; i < numsortlist; i++)
			mgctx.wagSortClauses = lappend(mgctx.wagSortClauses, sortlist[i]);

		/*
		 * Prepare the final tlist to restore the original list. The main work
		 * goes into deconstruct_within_agg(), which determines which sub-plan tree
		 * this expression is actually coming from, and store that information in mgctx.
		 */
		foreach (l, root->parse->targetList)
		{
			TargetEntry *tle, *final_tle;

			tle = (TargetEntry *) lfirst(l);
			final_tle = flatCopyTargetEntry(tle);
			final_tle->expr =
				(Expr *) deconstruct_within_agg((Node *) tle->expr, &mgctx);
			mgctx.fin_tlist = lappend(mgctx.fin_tlist, final_tle);
		}

		/*
		 * HAVING clause, same as target list. We wish we could optimize this
		 * as pushing each expression down to the individual plan tree, but
		 * we don't do it and just follow the same notion of DQA for now.
		 */
		foreach (l, (List *) root->parse->havingQual)
		{
			Expr *qual, *fin_hqual;

			qual = lfirst(l);
			fin_hqual = (Expr *) deconstruct_within_agg((Node *) qual, &mgctx);
			mgctx.fin_hqual = lappend(mgctx.fin_hqual, fin_hqual);
		}

		/*
		 * Offsets. Used in join_dqa_coplan. Element i+1 is the running
		 * column offset past sub-plan i's expressions.
		 */
		mgctx.dqa_offsets = (int *) palloc(sizeof(int) * (numsortlist + 1));
		mgctx.dqa_offsets[0] = numGroupCols;
		for (i = 0; i < numsortlist; i++)
		{
			mgctx.dqa_offsets[i + 1] =
				mgctx.dqa_offsets[i] + list_length(mgctx.dref_tlists[i]);
		}

		/*
		 * Now plan each tree. Store them to array and later join them.
		 * Don't forget to save rtable representing each subquery.
		 */
		rtable = NIL;
		mgctx.dqaArgs = (DqaInfo *) palloc(numsortlist * sizeof(DqaInfo));
		/* keep the original query */
		for (i = 0; i < numsortlist; i++)
		{
			Plan *coplan;
			Query *coquery;
			char queryname[256];
			PlannerInfo root_copy;
			size_t sz;

			/*
			 * The base plan is created by best_path.
			 */
			current_pathkeys = group_context->best_path->pathkeys;
			/*
			 * We use different instance of PlannerInfo for each cycle
			 * especially cdb_grouping_planner frees simple_rel_array.
			 * See also plan_append_aggs_with_rewrite.
			 */
			memcpy(&root_copy, root, sizeof(PlannerInfo));
			sz = root->simple_rel_array_size * sizeof(RelOptInfo *);
			root_copy.simple_rel_array =
				(RelOptInfo **) palloc(sz);
			memcpy(root_copy.simple_rel_array, root->simple_rel_array, sz);

			/*
			 * Query should be copied deeply for the planner changes it.
			 * The co-plan's target list is the grouping keys followed by
			 * this sort group's aggregate expressions.
			 */
			coquery = copyObject(root->parse);
			coquery->targetList = seq_tlist_concat(copyObject(mgctx.grps_tlist), mgctx.dref_tlists[i]);
			/*
			 * Clear havingQual and scatterClause, since they will be handled only
			 * the top of joins, and never in individual aggregate.
			 */
			coquery->havingQual = NULL;
			coquery->scatterClause = NIL;
			root_copy.parse = coquery;

			if (sortlist[i])
			{
				/*
				 * Run the specialized planner for aggregate expressions
				 * gathered by an identical ORDER BY clause.
				 */
				coplan = plan_within_agg_persort(&root_copy,
												 group_context,
												 sortlist[i],
												 current_pathkeys,
												 list_nth(base_plans, i));
			}
			else
			{
				/*
				 * Run normal grouping planner for normal aggs.
				 */
				GroupContext local_group_context;

				memcpy(&local_group_context, group_context, sizeof(GroupContext));
				local_group_context.subplan = list_nth(base_plans, i);
				local_group_context.tlist = coquery->targetList;
				/* These fields are not set in grouping_planner */
				local_group_context.numGroupCols = numGroupCols;
				local_group_context.groupColIdx = grpColIdx;
				local_group_context.numDistinctCols = numDistinctCols;
				local_group_context.distinctColIdx = distinctColIdx;

				/*
				 * Make a multi-phase or simple aggregate plan.
				 *
				 * agg_counts contain only normal aggregate information
				 * (without within-aggs information), so it's safe to use it
				 * as it's passed here.
				 */
				coplan = make_parallel_or_sequential_agg(&root_copy,
														 agg_counts,
														 &local_group_context,
														 &current_pathkeys);
			}

			/* Wrap each co-plan as a named subquery and collect its rtable. */
			snprintf(queryname, sizeof(queryname), "wag_coplan_%d", i + 1);
			mgctx.dqaArgs[i].coplan =
				wrap_plan_index(&root_copy, coplan, coquery,
								NULL, i + 1, queryname,
								&coquery);
			rtable = list_concat(rtable, coquery->rtable);
		}

		/* Begin with the first coplan, then join in each succeeding coplan. */
		result_plan = mgctx.dqaArgs[0].coplan;
		for (i = 1; i < numsortlist; i++)
			result_plan = join_dqa_coplan(root, &mgctx, result_plan, i);

		/* Prepare for finalize_split_expr. */
		mgctx.top_tlist = result_plan->targetlist;
		/* Now to set true. */
		mgctx.use_dqa_pruning = true;
		/* Rewrite the final tlist/qual to reference the joined co-plans. */
		result_plan->targetlist = (List *) finalize_split_expr((Node *) mgctx.fin_tlist, &mgctx);
		result_plan->qual = (List *) finalize_split_expr((Node *) mgctx.fin_hqual, &mgctx);

		UpdateScatterClause(root->parse, result_plan->targetlist);
		/*
		 * Reconstruct the flow since the targetlist for the result_plan may have
		 * changed.
		 */
		result_plan->flow = pull_up_Flow(result_plan,
										 result_plan->lefttree,
										 true);
		/* Need to adjust root->parse for upper plan. */
		root->parse->rtable = rtable;
		root->parse->targetList = copyObject(result_plan->targetlist);
		rebuild_simple_rel_and_rte(root);
	}

	/* Tell the upper planner that we changed the node */
	*(group_context->pcurrent_pathkeys) = NIL;
	*(group_context->querynode_changed) = true;

	return result_plan;
}
| |
| Plan *add_motion_to_dqa_child(Plan *plan, PlannerInfo *root, bool *motion_added) |
| { |
| Plan *result = plan; |
| *motion_added = false; |
| |
| List *pathkeys = make_pathkeys_for_groupclause(root->parse->groupClause, plan->targetlist); |
| CdbPathLocus locus = cdbpathlocus_from_flow(plan->flow); |
| if (CdbPathLocus_IsPartitioned(locus) && NIL != plan->flow->hashExpr) |
| { |
| locus = cdbpathlocus_from_exprs(root, plan->flow->hashExpr); |
| } |
| |
| if (!cdbpathlocus_collocates(locus, pathkeys, true /*exact_match*/)) |
| { |
| /* MPP-22413: join requires exact distribution match for collocation purposes, |
| * which may not be provided by the underlying group by, as computing the |
| * group by only requires relaxed distribution collocation |
| */ |
| List *groupExprs = get_sortgrouplist_exprs(root->parse->groupClause, |
| plan->targetlist); |
| result = (Plan *) make_motion_hash(root, plan, groupExprs); |
| result->total_cost += incremental_motion_cost(plan->plan_rows, plan->plan_rows); |
| *motion_added = true; |
| } |
| |
| return result; |
| } |
| |