| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * cdbsetop.c |
| * Routines to aid in planning set-operation queries for parallel |
| * execution. This is, essentially, an extension of the file |
| * optimizer/prep/prepunion.c, although some functions are not |
| * externalized. |
| * |
| * Portions Copyright (c) 2005-2008, Greenplum inc |
| * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include "catalog/pg_type.h" |
| #include "nodes/makefuncs.h" |
| |
| #include "cdb/cdblink.h" /* getgpsegmentCount */ |
| #include "cdb/cdbllize.h" |
| #include "cdb/cdbmutate.h" |
| #include "cdb/cdbsetop.h" |
| #include "cdb/cdbvars.h" |
| #include "cdb/cdbpullup.h" |
| |
| #include "optimizer/tlist.h" /* tlist_member */ |
| #include "parser/parse_expr.h" /* exprType and exprTypmod */ |
| #include "parser/parsetree.h" /* get_tle_by_resno */ |
| |
| static Flow *copyFlow(Flow *model_flow, bool withExprs, bool withSort); |
| static List *makeHashExprsFromNonjunkTargets(List *targetList); |
| |
| #define ARRAYCOPY(to, from, sz) \ |
| do { \ |
| Size _size = (sz); \ |
| (to) = palloc(_size); \ |
| memcpy((to), (from), _size); \ |
| } while (0) |
| |
| /* |
| * Function: choose_setop_type |
| * |
| * Decide what type of plan to use for a set operation based on the loci of |
| * the node list input to the set operation. |
| * |
| * See the comments in cdbsetop.h for discussion of types of setop plan. |
| */ |
| GpSetOpType choose_setop_type(List *planlist) |
| { |
| ListCell *cell; |
| Plan *subplan = NULL; |
| bool ok_general = TRUE; |
| bool ok_partitioned = TRUE; |
| bool ok_replicated = TRUE; |
| bool ok_single_qd = TRUE; |
| bool ok_single_qe = TRUE; |
| bool has_partitioned = FALSE; |
| |
| Assert( Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY ); |
| |
| foreach(cell, planlist) |
| { |
| Flow *subplanflow; |
| subplan = (Plan*)lfirst(cell); |
| subplanflow = subplan->flow; |
| |
| Assert(is_plan_node((Node*)subplan)); |
| Assert(subplanflow != NULL); |
| switch ( subplanflow->locustype ) |
| { |
| case CdbLocusType_Hashed: |
| case CdbLocusType_HashedOJ: |
| case CdbLocusType_Strewn: |
| ok_general = ok_replicated = FALSE; |
| has_partitioned = TRUE; |
| break; |
| |
| case CdbLocusType_Entry: |
| ok_general = ok_partitioned = ok_replicated = ok_single_qe = FALSE; |
| break; |
| |
| case CdbLocusType_SingleQE: |
| ok_general = ok_replicated = FALSE; |
| break; |
| |
| case CdbLocusType_General: |
| break; |
| |
| case CdbLocusType_Null: |
| case CdbLocusType_Replicated: |
| default: |
| return PSETOP_NONE; |
| } |
| } |
| |
| if ( ok_general ) |
| return PSETOP_GENERAL; |
| else if ( ok_partitioned && has_partitioned ) |
| return PSETOP_PARALLEL_PARTITIONED; |
| else if ( ok_single_qe ) |
| return PSETOP_SEQUENTIAL_QE; |
| else if ( ok_single_qd ) |
| return PSETOP_SEQUENTIAL_QD; |
| |
| return PSETOP_NONE; |
| } |
| |
| |
| void adjust_setop_arguments(List *planlist, GpSetOpType setop_type) |
| { |
| ListCell *cell; |
| Plan *subplan; |
| Plan *adjusted_plan; |
| |
| foreach ( cell, planlist ) |
| { |
| Flow* subplanflow; |
| subplan = (Plan*)lfirst(cell); |
| subplanflow = subplan->flow; |
| |
| Assert(is_plan_node((Node*)subplan)); |
| Assert(subplanflow != NULL); |
| |
| adjusted_plan = subplan; |
| switch ( setop_type ) |
| { |
| case PSETOP_GENERAL: |
| /* This only occurs when all arguments are general. */ |
| break; |
| |
| case PSETOP_PARALLEL_PARTITIONED: |
| switch ( subplanflow->locustype ) |
| { |
| case CdbLocusType_Hashed: |
| case CdbLocusType_HashedOJ: |
| case CdbLocusType_Strewn: |
| Assert( subplanflow->flotype == FLOW_PARTITIONED ); |
| break; |
| case CdbLocusType_SingleQE: |
| case CdbLocusType_General: |
| Assert( subplanflow->flotype == FLOW_SINGLETON && subplanflow->segindex > -1 ); |
| /* The setop itself will run on an N-gang, so we need to |
| * arrange for the singleton input to be separately dispatched |
| * to a 1-gang and collect its result on one of our N QEs. |
| * Hence ... */ |
| adjusted_plan = (Plan *)make_motion_hash_all_targets(NULL, subplan); |
| break; |
| case CdbLocusType_Null: |
| case CdbLocusType_Entry: |
| case CdbLocusType_Replicated: |
| default: |
| ereport(ERROR, ( |
| errcode(ERRCODE_CDB_INTERNAL_ERROR), |
| errmsg("unexpected argument locus to set operation") )); |
| break; |
| } |
| break; |
| |
| case PSETOP_SEQUENTIAL_QD: |
| switch ( subplanflow->locustype ) |
| { |
| case CdbLocusType_Hashed: |
| case CdbLocusType_HashedOJ: |
| case CdbLocusType_Strewn: |
| Assert( subplanflow->flotype == FLOW_PARTITIONED ); |
| adjusted_plan = (Plan*)make_motion_gather_to_QD(subplan, false); |
| break; |
| |
| case CdbLocusType_SingleQE: |
| Assert( subplanflow->flotype == FLOW_SINGLETON && subplanflow->segindex == 0 ); |
| adjusted_plan = (Plan*)make_motion_gather_to_QD(subplan, false); |
| break; |
| |
| case CdbLocusType_Entry: |
| case CdbLocusType_General: |
| break; |
| |
| case CdbLocusType_Null: |
| case CdbLocusType_Replicated: |
| default: |
| ereport(ERROR, ( |
| errcode(ERRCODE_CDB_INTERNAL_ERROR), |
| errmsg("unexpected argument locus to set operation") )); |
| break; |
| } |
| break; |
| |
| case PSETOP_SEQUENTIAL_QE: |
| switch ( subplanflow->locustype ) |
| { |
| case CdbLocusType_Hashed: |
| case CdbLocusType_HashedOJ: |
| case CdbLocusType_Strewn: |
| Assert( subplanflow->flotype == FLOW_PARTITIONED ); |
| /* Gather to QE. No need to keep ordering. */ |
| adjusted_plan = (Plan*)make_motion_gather_to_QE(subplan, false); |
| break; |
| |
| case CdbLocusType_SingleQE: |
| Assert( subplanflow->flotype == FLOW_SINGLETON && subplanflow->segindex != -1 ); |
| break; |
| |
| case CdbLocusType_General: |
| break; |
| |
| case CdbLocusType_Entry: |
| case CdbLocusType_Null: |
| case CdbLocusType_Replicated: |
| default: |
| ereport(ERROR, ( |
| errcode(ERRCODE_CDB_INTERNAL_ERROR), |
| errmsg("unexpected argument locus to set operation") )); |
| break; |
| } |
| break; |
| |
| case PSETOP_PARALLEL_REPLICATED: |
| /* Only when all args are replicated. */ |
| ereport(ERROR, (errcode(ERRCODE_CDB_INTERNAL_ERROR), |
| errmsg("unexpected replicated intermediate result"), |
| errdetail("argument to set operation may not be replicated") )); |
| break; |
| |
| default: |
| /* Can't happen! */ |
| ereport(ERROR, ( |
| errcode(ERRCODE_CDB_INTERNAL_ERROR), |
| errmsg("unexpected arguments to set operation") )); |
| break; |
| } |
| |
| /* If we made changes, inject them into the argument list. */ |
| if ( subplan != adjusted_plan ) |
| { |
| subplan = adjusted_plan; |
| cell->data.ptr_value = subplan; |
| } |
| } |
| |
| return; |
| } |
| |
| |
| /* |
| * Copy a Flow node. Only the declarative part is preserved. Not, e.g., |
| * any required movement or transformation. Hash information is preserved |
| * only if withExprs is true. Sort specifications are preserved only |
| * if withSort is true. |
| * |
| * A NULL result indicates either a NULL argument or a problem. |
| */ |
| Flow *copyFlow(Flow *model_flow, bool withExprs, bool withSort) |
| { |
| Flow *new_flow = NULL; |
| |
| if (model_flow == NULL) |
| return NULL; |
| |
| new_flow = makeFlow(model_flow->flotype); |
| new_flow->locustype = model_flow->locustype; |
| |
| if (model_flow->flotype == FLOW_PARTITIONED) |
| { |
| /* Copy hash attribute definitions, if wanted and available. */ |
| if (withExprs && model_flow->hashExpr != NULL) |
| { |
| new_flow->hashExpr = copyObject(model_flow->hashExpr); |
| } |
| } |
| else if (model_flow->flotype == FLOW_SINGLETON) |
| { |
| /* Propagate segment definition. */ |
| new_flow->segindex = model_flow->segindex; |
| } |
| else if (model_flow->flotype != FLOW_REPLICATED) |
| { |
| /* Clean up and give up. This isn't one of our blessed types. */ |
| pfree(new_flow); |
| return NULL; |
| } |
| |
| /* Propogate sort attributes, if wanted and available. */ |
| if (withSort && model_flow->numSortCols > 0) |
| { |
| new_flow->numSortCols = model_flow->numSortCols; |
| ARRAYCOPY( |
| new_flow->sortColIdx, |
| model_flow->sortColIdx, |
| model_flow->numSortCols * sizeof(AttrNumber) ); |
| ARRAYCOPY( |
| new_flow->sortOperators, |
| model_flow->sortOperators, |
| model_flow->numSortCols * sizeof(Oid) ); |
| } |
| |
| return new_flow; |
| } |
| |
| |
| /* |
| * make_motion_gather_to_QD |
| * Add a Motion node atop the given subplan to gather the tuples |
| * from an input gang to the QD. This motion should only be applied to |
| * a non-replicated, non-root subplan. |
| */ |
| Motion* make_motion_gather_to_QD(Plan *subplan, bool keep_ordering) |
| { |
| return make_motion_gather(subplan, -1, keep_ordering); |
| } |
| |
| /* |
| * make_motion_gather_to_QE |
| * Add a Motion node atop the given subplan to gather tuples to |
| * a single QE. This motion should only be applied to a partitioned |
| * subplan. |
| */ |
| Motion* make_motion_gather_to_QE(Plan *subplan, bool keep_ordering) |
| { |
| return make_motion_gather(subplan, gp_singleton_segindex, keep_ordering); |
| } |
| |
| /* |
| * make_motion_gather |
| * Add a Motion node atop the given subplan to gather tuples to |
| * a single process. This motion should only be applied to a partitioned |
| * subplan. |
| */ |
| Motion* make_motion_gather(Plan *subplan, int segindex, bool keep_ordering) |
| { |
| Motion *motion; |
| |
| Assert(subplan->flow != NULL); |
| Assert(subplan->flow->flotype == FLOW_PARTITIONED || |
| (subplan->flow->flotype == FLOW_SINGLETON && subplan->flow->segindex == 0)); |
| |
| if ( keep_ordering && subplan->flow->numSortCols > 0 ) |
| { |
| Flow *flow = subplan->flow; |
| |
| motion = make_sorted_union_motion( |
| subplan, |
| segindex, |
| flow->numSortCols, /* Motion and Flow can share arrays. */ |
| flow->sortColIdx, |
| flow->sortOperators, |
| false /* useExecutorVarFormat */); |
| } |
| else |
| { |
| motion = make_union_motion( |
| subplan, |
| segindex, |
| false /* useExecutorVarFormat */); |
| } |
| |
| return motion; |
| } |
| |
| /* |
| * make_motion_hash_all_targets |
| * Add a Motion node atop the given subplan to hash collocate |
| * tuples non-distinct on the non-junk attributes. This motion |
| * should only be applied to a non-replicated, non-root subplan. |
| */ |
| Motion* make_motion_hash_all_targets(PlannerInfo *root, Plan *subplan) |
| { |
| List *hashexprs = makeHashExprsFromNonjunkTargets(subplan->targetlist); |
| return make_motion_hash(root, subplan, hashexprs); |
| } |
| |
| /* |
| * make_motion_hash |
| * Add a Motion node atop the given subplan to hash collocate |
| * tuples non-distinct on the values of the hash expressions. This |
| * motion should only be applied to a non-replicated, non-root subplan. |
| */ |
| Motion* make_motion_hash(PlannerInfo *root __attribute__((unused)) , Plan *subplan, List *hashexprs) |
| { |
| Motion *motion; |
| |
| Assert(subplan->flow != NULL); |
| |
| motion = make_hashed_motion( |
| subplan, |
| hashexprs, |
| false /* useExecutorVarFormat */); |
| |
| return motion; |
| } |
| |
| /* |
| * makeHashExprsFromNonjunkTargets |
| * Make a list of hash expressions over all non-resjunk targets in |
| * the targetlist are in the given target list. This will align |
| * with the sort attributes used as input to a SetOp or Unique |
| * operator. |
| * |
| * Returns the newly allocate expression list for a Motion node. |
| */ |
| List *makeHashExprsFromNonjunkTargets(List *targetlist) |
| { |
| ListCell *cell; |
| List *hashlist = NIL; |
| |
| foreach(cell, targetlist) |
| { |
| TargetEntry *tle = (TargetEntry *) lfirst(cell); |
| |
| if (!tle->resjunk) |
| { |
| hashlist = lappend(hashlist, copyObject(tle->expr)); |
| } |
| } |
| return hashlist; |
| |
| } |
| |
| /* |
| * Marks an Append plan with its locus based on the set operation |
| * type determined during examination of the arguments. |
| */ |
| void mark_append_locus(Plan *plan, GpSetOpType optype) |
| { |
| switch ( optype ) |
| { |
| case PSETOP_GENERAL: |
| mark_plan_general(plan); |
| break; |
| case PSETOP_PARALLEL_PARTITIONED: |
| mark_plan_strewn(plan); |
| break; |
| case PSETOP_PARALLEL_REPLICATED: |
| mark_plan_replicated(plan); |
| break; |
| case PSETOP_SEQUENTIAL_QD: |
| mark_plan_entry(plan); |
| break; |
| case PSETOP_SEQUENTIAL_QE: |
| mark_plan_singleQE(plan); |
| case PSETOP_NONE: |
| break; |
| } |
| } |
| |
| void mark_passthru_locus(Plan *plan, bool with_hash, bool with_sort) |
| { |
| Flow *flow; |
| Plan *subplan = NULL; |
| bool is_subquery = IsA(plan, SubqueryScan); |
| |
| Assert( is_plan_node((Node*)plan) && plan->flow == NULL ); |
| |
| if ( is_subquery ) |
| { |
| subplan = ((SubqueryScan*)plan)->subplan; |
| } |
| else |
| { |
| subplan = plan->lefttree; |
| } |
| |
| Assert( subplan != NULL && subplan->flow != NULL); |
| |
| flow = copyFlow(subplan->flow, with_hash && !is_subquery, with_sort); |
| |
| if ( is_subquery && with_hash && flow->flotype == FLOW_PARTITIONED ) |
| { |
| ListCell *c; |
| List *hash = NIL; |
| Index varno = ((Scan*)plan)->scanrelid; |
| |
| Flow *subplanflow = subplan->flow; |
| |
| /* Make sure all the expressions the flow thinks we're hashed on |
| * occur in the subplan targetlist. |
| */ |
| foreach( c, subplanflow->hashExpr ) |
| { |
| Node *x = (Node*)lfirst(c); |
| |
| Expr *exprNew = cdbpullup_expr((Expr *) x, subplan->targetlist, NULL, varno); |
| |
| hash = lappend(hash, exprNew); |
| } |
| |
| flow->hashExpr = hash; |
| } |
| |
| plan->flow = flow; |
| } |
| |
| |
| void mark_sort_locus(Plan *plan) |
| { |
| plan->flow = pull_up_Flow(plan, plan->lefttree, true); |
| } |
| |
| void mark_plan_general(Plan* plan) |
| { |
| Assert( is_plan_node((Node*)plan) && plan->flow == NULL ); |
| plan->flow = makeFlow(FLOW_SINGLETON); |
| plan->flow->segindex = 0; |
| plan->flow->locustype = CdbLocusType_General; |
| } |
| |
| void mark_plan_strewn(Plan* plan) |
| { |
| Assert( is_plan_node((Node*)plan) && plan->flow == NULL ); |
| plan->flow = makeFlow(FLOW_PARTITIONED); |
| plan->flow->locustype = CdbLocusType_Strewn; |
| } |
| |
| void mark_plan_replicated(Plan* plan) |
| { |
| Assert( is_plan_node((Node*)plan) && plan->flow == NULL ); |
| plan->flow = makeFlow(FLOW_REPLICATED); |
| plan->flow->locustype = CdbLocusType_Replicated; |
| } |
| |
| void mark_plan_entry(Plan* plan) |
| { |
| Assert( is_plan_node((Node*)plan) && plan->flow == NULL ); |
| plan->flow = makeFlow(FLOW_SINGLETON); |
| plan->flow->segindex = -1; |
| plan->flow->locustype = CdbLocusType_Entry; |
| } |
| |
| void mark_plan_singleQE(Plan* plan) |
| { |
| Assert( is_plan_node((Node*)plan) && plan->flow == NULL ); |
| plan->flow = makeFlow(FLOW_SINGLETON); |
| plan->flow->segindex = 0; |
| plan->flow->locustype = CdbLocusType_SingleQE; |
| } |