blob: 1b57be4554e4b5312de0164b048e4dc705b16e30 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* cdbmutate.c
* Parallelize a PostgreSQL sequential plan tree.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "optimizer/clauses.h"
#include "parser/analyze.h" /* for createRandomDistribution() */
#include "parser/parsetree.h" /* for rt_fetch() */
#include "utils/lsyscache.h" /* for getatttypetypmod() */
#include "nodes/makefuncs.h" /* for makeVar() */
#include "parser/parse_expr.h" /* for expr_type() */
#include "parser/parse_oper.h" /* for compatible_oper_opid() */
#include "utils/relcache.h" /* RelationGetPartitioningKey() */
#include "optimizer/tlist.h" /* get_sortgroupclause_tle() */
#include "optimizer/predtest.h"
#include "optimizer/var.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "catalog/catalog.h"
#include "catalog/gp_policy.h"
#include "catalog/pg_type.h"
#include "catalog/pg_proc.h"
#include "utils/syscache.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbhash.h" /* isGreenplumDbHashable() */
#include "cdb/cdbllize.h"
#include "cdb/cdbmutate.h"
#include "cdb/cdbpartition.h"
#include "cdb/cdbplan.h"
#include "cdb/cdbpullup.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbtargeteddispatch.h"
#include "nodes/print.h"
#include "executor/executor.h"
/*
 * Enable the optimizer.  Defined elsewhere.  apply_motion() checks it before
 * rejecting UPDATEs that change distribution columns: only the planner path
 * (optimizer == false) refuses those.
 */
extern bool optimizer;
/*
 * An ApplyMotionState holds state for the recursive apply_motion_mutator().
 * It is defined locally in this file.  It is passed as the context argument
 * to apply_motion_mutator() and assignMotionID().
 */
typedef struct ApplyMotionState
{
plan_tree_base_prefix base; /* Required prefix for plan_tree_walker/mutator */
int nextMotionID; /* next motion id to hand out; starts at 1 so 0 means "unassigned" */
int sliceDepth; /* Motion / requested-movement boundaries above the current node; 0 = top slice */
int numInitPlanMotionNodes; /* running total of Motion nodes counted in init plans seen so far */
List *initPlans; /* every init-plan SubPlan encountered, in visit order */
} ApplyMotionState;
/*
 * External functions
 *
 * is_dummy_plan() is declared here rather than through a header.  It is
 * presumably defined in another planner module (not in this file).
 */
bool is_dummy_plan(Plan *plan);
/*
 * Forward Declarations
 *
 * Some definitions later in this file (make_motion, apply_motion_mutator,
 * assignMotionID, makeDefaultSegIdxArray) omit the "static" keyword.  They
 * still have internal linkage, because these earlier static declarations
 * are in scope.
 */
static void assignMotionID(Node *newnode, ApplyMotionState * context, Node *oldnode);
static int *makeDefaultSegIdxArray(int numSegs);
static Motion *
make_motion(Plan *lefttree,
bool sendSorted,
int numSortCols,
AttrNumber *sortColIdx,
Oid *sortOperators,
bool useExecutorVarFormat);
static Node *apply_motion_mutator(Node *node, ApplyMotionState * context);
static void request_explicit_motion(Plan *plan, Index resultRelationIdx, List *rtable);
static void request_explicit_motion_append(Append *append, List *resultRelationsIdx, List *rtable);
static AttrNumber find_segid_column(List *tlist, Index relid);
static bool
doesUpdateAffectPartitionCols(PlannerInfo *root,
Plan *plan,
Query *query);
/*
 * contains_nonconstant_walker
 *
 * Return true if the expression tree rooted at 'node' contains a volatile
 * function call.  A NULL node or a bare Const yields false.
 *
 * Despite the name, Vars and Params are NOT treated as non-constant here.
 * Volatility is the only property tested.  An expression therefore counts
 * as "constant" when it consists of constants and non-volatile functions.
 *
 * contain_volatile_functions() already examines the entire subtree, so no
 * further recursion is needed.  Recursing with this walker as well would
 * rescan every subtree again (quadratic work) without ever changing the
 * answer.
 *
 * 'context' is unused; it is kept so the signature matches the usual
 * expression-walker callback shape.
 */
static bool
contains_nonconstant_walker(Node *node, void *context)
{
	if (node == NULL)
		return false;
	if (IsA(node, Const))
		return false;
	return contain_volatile_functions(node);
}
/*
 * allConstantValuesClause
 *
 * Decide whether a Result node represents an all-constant row.  Two
 * conditions must hold:
 *   - the Result has no outer subplan (lefttree is NULL), and
 *   - contains_nonconstant_walker() flags none of its target list
 *     expressions.
 *
 * 'node' must be a Result (asserted).
 */
static bool
allConstantValuesClause(Plan *node)
{
	ListCell   *lc;

	Assert(IsA(node, Result));

	/* A Result that pulls rows from a subplan is never a constant row. */
	if (node->lefttree != NULL)
		return false;

	foreach(lc, node->targetlist)
	{
		Node	   *expr = (Node *) ((TargetEntry *) lfirst(lc))->expr;

		Assert(expr);
		if (contains_nonconstant_walker(expr, NULL))
			return false;
	}

	return true;
}
/*
 * directDispatchCalculateHash
 *
 * For a plan whose target list supplies the distribution key columns of
 * 'targetPolicy', try to compute the single content id the row hashes to.
 *
 * The distribution key columns are hashed in policy order, using an FNV-1
 * CdbHash sized for the planner's segment count.
 *
 * On success, plan->directDispatch.isDirectDispatch is set to true and
 * plan->directDispatch.contentIds becomes a one-element list with that
 * content id.  If a key column is not a Const in the target list, or has no
 * target list entry at all, isDirectDispatch is set to false and contentIds
 * is left untouched.
 */
static void
directDispatchCalculateHash(Plan *plan, GpPolicy *targetPolicy)
{
	CdbHash    *h;
	bool		directDispatch = true;
	int			i;

	h = makeCdbHash(GetPlannerSegmentNum(), HASH_FNV_1);
	cdbhashinit(h);

	/*
	 * The nested loops look expensive, and the target list was already
	 * walked by the caller.  They are still needed because the target list
	 * need not be in distribution-key order.  Tables are also rarely
	 * distributed by more than a handful of attributes.
	 */
	for (i = 0; i < targetPolicy->nattrs && directDispatch; i++)
	{
		ListCell   *cell;
		bool		found = false;

		foreach(cell, plan->targetlist)
		{
			TargetEntry *tle = (TargetEntry *) lfirst(cell);
			Const	   *c;

			Assert(tle->expr);
			if (tle->resno != targetPolicy->attrs[i])
				continue;

			found = true;
			if (!IsA(tle->expr, Const))
			{
				/* the planner could not simplify this */
				directDispatch = false;
				break;
			}

			c = (Const *) tle->expr;
			if (c->constisnull)
				cdbhashnull(h);
			else
			{
				Assert(isGreenplumDbHashable(c->consttype));
				cdbhash(h, c->constvalue,
						typeIsArrayType(c->consttype) ? ANYARRAYOID : c->consttype);
			}
			break;
		}

		/*
		 * A distribution key column with no target list entry cannot be
		 * hashed.  Silently skipping it would hash the wrong set of columns
		 * and could pick the wrong segment, so give up on direct dispatch.
		 */
		if (!found)
			directDispatch = false;
	}

	/* We now have the hash-partition that this row belongs to */
	plan->directDispatch.isDirectDispatch = directDispatch;
	if (directDispatch)
	{
		uint32		hashcode = cdbhashreduce(h);

		plan->directDispatch.contentIds = list_make1_int(hashcode);
		elog(DEBUG1, "sending single row constant insert to content %u", hashcode);
	}
}
/* ------------------------------------------------------------------------- *
 * Function apply_motion() and apply_motion_mutator() add motion nodes to
 * a top-level Plan tree as directed by the Flow nodes in the plan.
 *
 * In addition, if the argument Plan produces a partitioned flow, a Motion
 * appropriate to the command is added to the top.  For example, a Motion
 * can consolidate the tuple streams from a SELECT command into a single
 * tuple stream on the dispatcher.
 *
 * Per command type, the top of the plan is prepared as follows:
 *  - SELECT INTO: the result is repartitioned on the new table's
 *    distribution key.  Without a DISTRIBUTED BY clause, the key is built
 *    from the leading flow hash expressions that appear as non-junk target
 *    list columns.  The resulting policy is stored in query->intoPolicy.
 *  - SELECT: a partitioned result is focused to a single process.  When the
 *    query has an ORDER BY and the flow is sorted, a merge receive is used.
 *  - INSERT into a distributed table: rows are repartitioned on the
 *    table's distribution key.  A single-row all-constant Result may instead
 *    be hash-filtered and, when enabled, direct-dispatched.
 *  - INSERT into an entry-db table: the result is focused.
 *  - UPDATE/DELETE: for a distributed table, an explicit-redistribute motion
 *    is requested.  For a master-only table with a distributed flow, a
 *    gather is requested.
 *
 * root:  planner state; root->config->gp_enable_direct_dispatch is consulted.
 * plan:  top plan node; it must have a Flow that is not FLOW_REPLICATED.
 * query: the statement.  On return, its jointree, group/having/distinct/sort
 *        clauses, limits and set operations are cleared.  The target list is
 *        kept for prepared statements.
 *
 * Returns a deep copy of the argument Plan tree with added Motion nodes.
 * Raises ERROR (ERRCODE_CDB_FEATURE_NOT_YET) when the required motion cannot
 * be added, or when the planner is asked to UPDATE distribution columns.
 *
 * TODO Maybe add more context to argument list!
 * ------------------------------------------------------------------------- *
 */
Plan *
apply_motion(PlannerInfo *root, Plan *plan, Query *query)
{
	Plan	   *result;
	ListCell   *cell;
	GpPolicy   *targetPolicy = NULL;
	GpPolicyType targetPolicyType = POLICYTYPE_ENTRY;
	ApplyMotionState state;
	bool		needToAssignDirectDispatchContentIds = false;

	/* Initialize mutator context. */
	planner_init_plan_tree_base(&state.base, root);	/* error on attempt to descend into subplan plan */
	state.nextMotionID = 1;		/* Start at 1 so zero will mean "unassigned". */
	state.sliceDepth = 0;
	state.numInitPlanMotionNodes = 0;
	state.initPlans = NIL;

	Assert(is_plan_node((Node *) plan) && IsA(query, Query));
	Assert(plan->flow && plan->flow->flotype != FLOW_REPLICATED);

	/* Does query have a target relation? (INSERT/DELETE/UPDATE) */

	/*
	 * NOTE: This code makes the assumption that if we are working on a
	 * hierarchy of tables, all the tables are distributed, or all are on the
	 * entry DB.  Any mixture will fail.
	 */
	if (query->resultRelation > 0)
	{
		RangeTblEntry *rte = rt_fetch(query->resultRelation, query->rtable);

		Assert(rte->rtekind == RTE_RELATION);
		targetPolicy = GpPolicyFetch(CurrentMemoryContext, rte->relid);
		if (targetPolicy)
			targetPolicyType = targetPolicy->ptype;
	}

	switch (query->commandType)
	{
		case CMD_SELECT:
			if (query->intoClause)
			{
				List	   *hashExpr;

				if (query->intoPolicy != NULL)
				{
					targetPolicy = query->intoPolicy;

					Assert(query->intoPolicy->ptype == POLICYTYPE_PARTITIONED);
					Assert(query->intoPolicy->nattrs >= 0);
					Assert(query->intoPolicy->nattrs <= 1024);
				}
				else
				{
					ListCell   *exp1;
					int			maxattrs;

					/* User did not specify a DISTRIBUTED BY clause */

					/* Find out what the flow is partitioned on */
					hashExpr = plan->flow->hashExpr;

					/*
					 * Each flow hash expression contributes at most one
					 * attribute below.  A fixed 200-slot array therefore
					 * overflowed when the flow was hashed on more than 200
					 * expressions.  Size it for the larger of the two.
					 */
					maxattrs = Max(200, list_length(hashExpr));

					targetPolicy = palloc0(sizeof(GpPolicy) - sizeof(targetPolicy->attrs) + maxattrs * sizeof(targetPolicy->attrs[0]));
					targetPolicy->nattrs = 0;
					targetPolicy->bucketnum = GetRelOpt_bucket_num_fromRangeVar(query->intoClause->rel, GetDefaultPartitionNum());
					targetPolicy->ptype = POLICYTYPE_PARTITIONED;

					foreach(exp1, hashExpr)
					{
						AttrNumber	n;
						bool		found_expr = false;

						/* See if this Expr is a column of the result table */
						for (n = 1; n <= list_length(plan->targetlist); n++)
						{
							Var		   *new_var = NULL;
							TargetEntry *target = get_tle_by_resno(plan->targetlist, n);

							if (!target->resjunk)
							{
								Expr	   *var1 = (Expr *) lfirst(exp1);

								/*
								 * right side variable may be encapsulated by a
								 * relabel node. motion, however, does not care
								 * about relabel nodes.
								 */
								if (IsA(var1, RelabelType))
									var1 = ((RelabelType *) var1)->arg;

								/*
								 * If subplan expr is a Var, copy to preserve
								 * its EXPLAIN info.
								 */
								if (IsA(target->expr, Var))
								{
									new_var = copyObject(target->expr);
									new_var->varno = OUTER;
									new_var->varattno = n;
								}

								/*
								 * Make a Var that references the target list
								 * entry at this offset, using OUTER as the
								 * varno
								 */
								else
									new_var = makeVar(OUTER,
													  n,
													  exprType((Node *) target->expr),
													  exprTypmod((Node *) target->expr),
													  0);

								if (equal(var1, new_var))
								{
									/*
									 * If it is, use it to partition the
									 * result table, to avoid unnecessary
									 * redistribution of data
									 */
									Assert(targetPolicy->nattrs < maxattrs);
									targetPolicy->attrs[targetPolicy->nattrs++] = n;
									found_expr = true;
									break;
								}
							}
						}

						if (!found_expr)
							break;
					}
				}

				query->intoPolicy = targetPolicy;

				/*
				 * Make sure the top level flow is partitioned on the
				 * partitioning key of the target relation.  Since this is a
				 * SELECT INTO (basically the same as an INSERT) command, the
				 * target list will correspond to the attributes of the
				 * target relation in order.
				 */
				hashExpr = getExprListFromTargetList(plan->targetlist,
													 targetPolicy->nattrs,
													 targetPolicy->attrs,
													 true);
				if (!repartitionPlan(plan, false, false, hashExpr))
					ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
									errmsg("Cannot parallelize that SELECT INTO yet")
									));

				Assert(query->intoPolicy->ptype == POLICYTYPE_PARTITIONED);
			}

			if (plan->flow->flotype == FLOW_PARTITIONED && !query->intoClause)
			{
				/*
				 * Query result needs to be brought back to the QD. Ask for
				 * motion to a single QE. Later, apply_motion will override
				 * that to bring it to the QD instead.
				 *
				 * If the query has an ORDER BY clause, use Merge Receive to
				 * preserve the ordering.
				 *
				 * The plan has already been set up to ensure each qExec's
				 * result is properly ordered according to the ORDER BY
				 * specification. The existing ordering could be stronger
				 * than required; if so, omit the extra trailing columns from
				 * the Merge Receive key.
				 *
				 * An unordered result is ok if the ORDER BY ordering is
				 * degenerate (on constant exprs) or the result is known to
				 * have at most one row.
				 */
				if (query->sortClause &&
					plan->flow->numSortCols > 0)
				{
					if (plan->flow->numSortCols > list_length(query->sortClause))
						plan->flow->numSortCols = list_length(query->sortClause);
					Insist(focusPlan(plan, true, false));
				}
				/* Use UNION RECEIVE. Does not preserve ordering. */
				else
					Insist(focusPlan(plan, false, false));
			}

			needToAssignDirectDispatchContentIds = root->config->gp_enable_direct_dispatch && !query->intoClause;
			break;

		case CMD_INSERT:
			{
				switch (targetPolicyType)
				{
					case POLICYTYPE_PARTITIONED:
						{
							List	   *hashExpr = NIL;

							/*
							 * Make sure the top level flow is partitioned on
							 * the partitioning key of the target relation.
							 * Since this is an INSERT command, the target
							 * list will correspond to the attributes of the
							 * target relation in order.
							 */

							/* Is this plan an all-const insert ? */
							if (gp_enable_fast_sri && IsA(plan, Result))
							{
								bool		typesOK = true;
								PartitionNode *pn;
								RangeTblEntry *rte;
								Relation	rel;

								rte = rt_fetch(query->resultRelation, query->rtable);
								rel = relation_open(rte->relid, NoLock);

								/* 1: See if it's partitioned */
								pn = RelationBuildPartitionDesc(rel, false);

								if (pn && !partition_policies_equal(targetPolicy, pn))
								{
									/* 2: See if partitioning columns are constant */
									List	   *partatts = get_partition_attrs(pn);
									ListCell   *lc;
									bool		all_const = true;

									foreach(lc, partatts)
									{
										ListCell   *tlc;
										AttrNumber	attnum = lfirst_int(lc);
										bool		found = false;

										foreach(tlc, plan->targetlist)
										{
											TargetEntry *tle = (TargetEntry *) lfirst(tlc);

											Assert(tle->expr);
											if (tle->resno == attnum)
											{
												found = true;
												if (!IsA(tle->expr, Const))
													all_const = false;
												break;
											}
										}
										Assert(found);
									}

									/* 3: if not, mutate plan to make constant */
									if (!all_const)
										plan->targetlist = (List *)
											planner_make_plan_constant(root, (Node *) plan->targetlist, true);

									/* better be constant now */
									if (allConstantValuesClause(plan))
									{
										bool	   *nulls;
										Datum	   *values;
										EState	   *estate = CreateExecutorState();
										ResultRelInfo *rri;

										/*
										 * 4: build tuple, look up partitioning
										 * key.  Only the partitioning columns
										 * are filled in; both arrays start
										 * zeroed so the rest is well defined.
										 */
										nulls = palloc0(sizeof(bool) * rel->rd_att->natts);
										values = palloc0(sizeof(Datum) * rel->rd_att->natts);

										foreach(lc, partatts)
										{
											AttrNumber	attnum = lfirst_int(lc);
											ListCell   *tlc;

											foreach(tlc, plan->targetlist)
											{
												TargetEntry *tle = (TargetEntry *) lfirst(tlc);

												Assert(tle->expr);

												if (tle->resno == attnum)
												{
													Assert(IsA(tle->expr, Const));

													nulls[attnum - 1] = ((Const *) tle->expr)->constisnull;
													if (!nulls[attnum - 1])
														values[attnum - 1] = ((Const *) tle->expr)->constvalue;
												}
											}
										}
										estate->es_result_partitions = pn;
										estate->es_partition_state =
											createPartitionState(estate->es_result_partitions, 1 /* resultPartSize */ );

										rri = makeNode(ResultRelInfo);
										rri->ri_RangeTableIndex = 1;	/* dummy */
										rri->ri_RelationDesc = rel;

										estate->es_result_relations = rri;
										estate->es_num_result_relations = 1;
										estate->es_result_relation_info = rri;
										rri = values_get_partition(values, nulls, RelationGetDescr(rel),
																   estate);

										/* 5: get target policy for destination table */
										targetPolicy = RelationGetPartitioningKey(rri->ri_RelationDesc);

										/* a missing policy is rejected rather than dereferenced */
										if (targetPolicy == NULL ||
											targetPolicy->ptype != POLICYTYPE_PARTITIONED)
											elog(ERROR, "policy must be partitioned");

										ExecCloseIndices(rri);
										heap_close(rri->ri_RelationDesc, NoLock);
										FreeExecutorState(estate);
									}
								}
								relation_close(rel, NoLock);

								hashExpr = getExprListFromTargetList(plan->targetlist,
																	 targetPolicy->nattrs,
																	 targetPolicy->attrs,
																	 true);
								/* check the types. */
								foreach(cell, hashExpr)
								{
									Expr	   *elem = (Expr *) lfirst(cell);
									Oid			att_type = exprType((Node *) elem);

									Assert(att_type != InvalidOid);
									if (!isGreenplumDbHashable(att_type))
									{
										typesOK = false;
										break;
									}
								}

								/* all constants in values clause -- no need to repartition. */
								if (typesOK && allConstantValuesClause(plan))
								{
									Result	   *rNode = (Result *) plan;
									List	   *hList = NIL;
									int			i;

									/*
									 * If this table has child tables, we need
									 * to find out destination partition.
									 *
									 * See partition check above.
									 */

									/* build our list */
									for (i = 0; i < targetPolicy->nattrs; i++)
									{
										Assert(targetPolicy->attrs[i] > 0);

										hList = lappend_int(hList, targetPolicy->attrs[i]);
									}

									if (root->config->gp_enable_direct_dispatch)
									{
										directDispatchCalculateHash(plan, targetPolicy);

										/*
										 * we now either have a hash-code, or
										 * we've marked the plan non-directed.
										 */
									}

									rNode->hashFilter = true;
									rNode->hashList = hList;

									/* Build a partitioned flow */
									plan->flow->flotype = FLOW_PARTITIONED;
									plan->flow->locustype = CdbLocusType_Hashed;
									plan->flow->hashExpr = hashExpr;
								}
							}

							if (!hashExpr)
								hashExpr = getExprListFromTargetList(plan->targetlist,
																	 targetPolicy->nattrs,
																	 targetPolicy->attrs,
																	 true);

							if (!repartitionPlan(plan, false, false, hashExpr))
								ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
												errmsg("Cannot parallelize that INSERT yet")));
							break;
						}

					case POLICYTYPE_ENTRY:
						/* All's well if query result is already on the QD. */
						if (plan->flow->flotype == FLOW_SINGLETON &&
							plan->flow->segindex < 0)
							break;

						/*
						 * Query result needs to be brought back to the QD.
						 * Ask for motion to a single QE. Later, apply_motion
						 * will override that to bring it to the QD instead.
						 */
						if (!focusPlan(plan, false, false))
							ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
											errmsg("Cannot parallelize that INSERT yet")));
						break;

					default:
						Insist(0);
				}
			}
			break;

		case CMD_UPDATE:
		case CMD_DELETE:
			{
				/* Distributed target table? */
				if (targetPolicyType == POLICYTYPE_PARTITIONED)
				{
					/*
					 * The planner does not support updating any of the
					 * partitioning columns.  The optimizer supports updating
					 * partitioning columns.
					 */
					if (query->commandType == CMD_UPDATE &&
						!optimizer &&
						doesUpdateAffectPartitionCols(root, plan, query))
					{
						ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
										errmsg("Cannot parallelize an UPDATE statement that updates the distribution columns"),
										errOmitLocation(true)));
					}

					if (IsA(plan, Append))
					{
						/* updating a partitioned table */
						request_explicit_motion_append((Append *) plan, root->resultRelations, root->glob->finalrtable);
					}
					else
					{
						/* updating a non-partitioned table */
						int			relid;

						Assert(list_length(root->resultRelations) == 1);
						relid = list_nth_int(root->resultRelations, 0);
						Assert(relid > 0);
						request_explicit_motion(plan, relid, root->glob->finalrtable);
					}

					needToAssignDirectDispatchContentIds = root->config->gp_enable_direct_dispatch;
				}
				/* Target table is not distributed. Must be in entry db. */
				else
				{
					if (plan->flow->flotype == FLOW_PARTITIONED ||
						plan->flow->flotype == FLOW_REPLICATED ||
						(plan->flow->flotype == FLOW_SINGLETON && plan->flow->segindex != -1))
					{
						/*
						 * target table is master-only but flow is
						 * distributed: add a GatherMotion on top
						 */

						/* create a shallow copy of the plan flow */
						Flow	   *flow = plan->flow;

						plan->flow = (Flow *) palloc(sizeof(Flow));
						*(plan->flow) = *flow;

						/* save original flow information */
						plan->flow->flow_before_req_move = flow;

						/* request a GatherMotion node */
						plan->flow->req_move = MOVEMENT_FOCUS;
						plan->flow->hashExpr = NIL;
						plan->flow->segindex = 0;
					}
					else
					{
						/*
						 * Source is, presumably, a dispatcher singleton.
						 */
						plan->flow->req_move = MOVEMENT_NONE;
					}
					break;
				}
			}
			break;

		default:
			Insist(0);			/* Never see non-DML in here! */
	}

	result = (Plan *) apply_motion_mutator((Node *) plan, &state);

	if (needToAssignDirectDispatchContentIds)
	{
		/* figure out if we can run on a reduced set of nodes */
		AssignContentIdsToPlanData(query, result, root);
	}

	Assert(result->nMotionNodes == state.nextMotionID - 1);
	Assert(result->nInitPlans == list_length(state.initPlans));

	/* Assign slice numbers to the initplans. */
	foreach(cell, state.initPlans)
	{
		SubPlan    *subplan = (SubPlan *) lfirst(cell);

		Assert(IsA(subplan, SubPlan) && subplan->qDispSliceId == 0);
		subplan->qDispSliceId = state.nextMotionID++;
	}

#ifdef USE_ASSERT_CHECKING

	/*
	 * Check whether plan brings back the tuples to their original segments
	 * so they can be updated
	 */
	if (query->commandType == CMD_DELETE ||
		query->commandType == CMD_UPDATE)
	{
		if (targetPolicyType == POLICYTYPE_ENTRY)
		{
			/* target table is master-only */
			Assert(list_length(root->resultRelations) == 1);
			Assert(result->flow->flotype == FLOW_SINGLETON && result->flow->segindex == -1);
		}
		else if (result->nMotionNodes > state.numInitPlanMotionNodes)
		{
			/* target table is distributed and plan involves motion nodes */

			/*
			 * Currently, we require all DML plans with Motion to have an
			 * ExplicitRedistribute motion on top or (for partitioned tables)
			 * an Append node with ExplicitRedistribute child plans
			 */
			if (list_length(root->resultRelations) == 1)
			{
				/* non-partitioned table */
				Assert(IsA(result, Motion) && ((Motion *) result)->motionType == MOTIONTYPE_EXPLICIT);
			}
			else
			{
				/* updating a partitioned table */
				ListCell   *lcAppend;

				Assert(IsA(result, Append));
				foreach(lcAppend, ((Append *) result)->appendplans)
				{
					Plan	   *appendPlan = (Plan *) lfirst(lcAppend);

					Assert(IsA(appendPlan, Motion) &&
						   ((Motion *) appendPlan)->motionType == MOTIONTYPE_EXPLICIT);
				}
			}
		}
	}
#endif

	/*
	 * Discard subtrees of Query node that aren't needed for execution. Note
	 * the targetlist (query->targetList) is used in execution of prepared
	 * statements, so we leave that alone.
	 */
	query->jointree = NULL;
	query->groupClause = NIL;
	query->havingQual = NULL;
	query->distinctClause = NIL;
	query->sortClause = NIL;
	query->limitCount = NULL;
	query->limitOffset = NULL;
	query->setOperations = NULL;

	return result;
}
/*
 * Function apply_motion_mutator() is the workhorse for apply_motion().
 *
 * Returns a copy of 'node', made with plan_tree_mutator(), in which each
 * plan node whose Flow requests a movement (flow->req_move) has a Motion
 * placed on top of it:
 *   MOVEMENT_FOCUS       -> (sorted) union motion to flow->segindex
 *   MOVEMENT_BROADCAST   -> broadcast motion
 *   MOVEMENT_REPARTITION -> hashed motion on flow->hashExpr
 *   MOVEMENT_EXPLICIT    -> explicit-redistribute motion on flow->segidColIdx,
 *                           added only if a Motion was added beneath
 * Motion nodes that already exist get new ids.  In the top slice
 * (context->sliceDepth == 0), a singleton aimed at a segment is redirected
 * to the dispatcher (segindex -1).
 *
 * Each visited plan node gets nMotionNodes / nInitPlans set for its subtree.
 * Each init-plan SubPlan is appended to context->initPlans.  Non-plan
 * (expression) nodes go straight to plan_tree_mutator(), so that plans
 * nested inside them are still visited.
 */
Node *
apply_motion_mutator(Node *node, ApplyMotionState * context)
{
Node *newnode;
Plan *plan;
Flow *flow;
int saveNextMotionID;
int saveNumInitPlans;
int saveSliceDepth;
if (node == NULL)
return NULL;
/* An expression node might have subtrees containing plans to be mutated. */
if (!is_plan_node(node))
return plan_tree_mutator(node, apply_motion_mutator, context);
plan = (Plan *)node;
flow = plan->flow;
/*
 * Snapshot the counters on entry.  The differences computed at "done:"
 * become this subtree's nMotionNodes and nInitPlans.
 */
saveNextMotionID = context->nextMotionID;
saveNumInitPlans = list_length(context->initPlans);
/* Descending into a subquery or a new slice? */
saveSliceDepth = context->sliceDepth;
Assert( !IsA(node,Query) );
/*
 * Children of a Motion, or of a node that is about to get a Motion on top,
 * belong to a slice below this one.
 */
if (IsA(node, Motion))
context->sliceDepth++;
else if (flow && flow->req_move != MOVEMENT_NONE )
context->sliceDepth++;
/*
* Copy this node and mutate its children. Afterward, this node should be
* an exact image of the input node, except that contained nodes requiring
* parallelization will have had it applied.
*/
newnode = plan_tree_mutator(node, apply_motion_mutator, context);
context->sliceDepth = saveSliceDepth;
/* From here on, 'plan' and 'flow' refer to the copy, not the input node. */
plan = (Plan *)newnode;
flow = plan->flow;
/* Make one big list of all of the initplans. */
if (plan->initPlan)
{
ListCell *cell;
SubPlan *subplan;
int nMotion = 0;
foreach(cell, plan->initPlan)
{
PlannerInfo *root = (PlannerInfo *) context->base.node;
subplan = (SubPlan *)lfirst(cell);
Assert(IsA(subplan, SubPlan));
Assert(root);
Assert(planner_subplan_get_plan(root, subplan));
nMotion += planner_subplan_get_plan(root, subplan)->nMotionNodes;
context->initPlans = lappend(context->initPlans, subplan);
}
/* Keep track of the number of Motion nodes found in Init Plans. */
context->numInitPlanMotionNodes += nMotion;
}
/* Pre-existing Motion nodes must be renumbered. */
if (IsA(newnode, Motion))
{
Motion *motion = (Motion *)newnode;
#ifdef USE_ASSERT_CHECKING
/* Sanity check */
/* Sub plan must have flow */
Assert(flow && motion->plan.lefttree->flow);
Assert(flow->req_move == MOVEMENT_NONE && !flow->flow_before_req_move);
#endif
/* Assign unique node number to the new node. */
assignMotionID(newnode, context, node);
/* If top slice marked as singleton, make it a dispatcher singleton. */
if(motion->motionType == MOTIONTYPE_FIXED
&& motion->numOutputSegs == 1
&& motion->outputSegIdx[0] >= 0
&& context->sliceDepth == 0)
{
Assert(flow->flotype == FLOW_SINGLETON &&
flow->segindex == motion->outputSegIdx[0]);
flow->segindex = -1;
motion->outputSegIdx[0] = -1;
}
goto done;
}
/*
* If no Flow node, we're in a portion of the tree that was finished by
* the planner and already contains any needed Motion nodes.
*/
if (!flow)
goto done;
Assert(IsA(flow, Flow));
/* Add motion atop the copied node, if necessary. */
switch (flow->req_move)
{
case MOVEMENT_FOCUS:
/* If top slice is a singleton, let it execute on qDisp. */
if (flow->segindex >= 0 &&
context->sliceDepth == 0)
flow->segindex = -1;
if (flow->numSortCols > 0)
{
newnode = (Node *)make_sorted_union_motion(plan,
flow->segindex,
flow->numSortCols,
flow->sortColIdx,
flow->sortOperators,
true /* useExecutorVarFormat */);
}
else
{
newnode = (Node *)make_union_motion(plan,
flow->segindex,
true /* useExecutorVarFormat */);
}
break;
case MOVEMENT_BROADCAST:
newnode = (Node *) make_broadcast_motion(plan, true /* useExecutorVarFormat */);
break;
case MOVEMENT_REPARTITION:
newnode = (Node *)make_hashed_motion(plan,
flow->hashExpr,
true /* useExecutorVarFormat */
);
break;
case MOVEMENT_EXPLICIT:
/* add an ExplicitRedistribute motion node only if child plan nodes
* have a motion node
*/
/*
 * NOTE(review): numInitPlanMotionNodes is a running total for everything
 * visited so far, not only for this subtree.  Confirm that subtracting it
 * here gives the intended "motions added below this node" count.
 */
if (context->nextMotionID - context->numInitPlanMotionNodes - saveNextMotionID > 0)
{
/* motion node in child nodes: add a ExplicitRedistribute motion */
newnode = (Node *) make_explicit_motion(plan,
flow->segidColIdx,
true /* useExecutorVarFormat */
);
}
else
{
/* no motion nodes in child plan nodes - no need for ExplicitRedistribute:
* restore flow */
flow->req_move = MOVEMENT_NONE;
flow->flow_before_req_move = NULL;
}
break;
case MOVEMENT_NONE:
/* Update flow if reassigning singleton top slice to qDisp. */
if (flow->flotype == FLOW_SINGLETON &&
flow->segindex >= 0 &&
context->sliceDepth == 0)
{
flow->segindex = -1;
}
break;
default:
Insist(0);
break;
}
/*
* After adding Motion node, assign slice id and restore subplan's Flow.
*/
/*
 * Only a requested movement that saved the original Flow reaches the first
 * branch.  The new Motion's lefttree must be the copied 'plan'.
 */
if (flow->flow_before_req_move)
{
Assert(flow->req_move != MOVEMENT_NONE &&
IsA(newnode, Motion) &&
plan == ((Motion *)newnode)->plan.lefttree);
/* Assign unique node number to the new node. */
assignMotionID(newnode, context, NULL);
/* Replace the subplan's modified Flow with the original. */
plan->flow = flow->flow_before_req_move;
}
else
Assert(flow->req_move == MOVEMENT_NONE);
done:
/*
* Label the Plan node with the number of Motion nodes and Init Plans
* in this subtree, inclusive of the node itself. This info is used
* only in the top node of a query or subquery. Someday, find a better
* place to keep it.
*/
plan = (Plan *)newnode;
plan->nMotionNodes = context->nextMotionID - saveNextMotionID;
plan->nInitPlans = list_length(context->initPlans) - saveNumInitPlans;
return newnode;
} /* apply_motion_mutator */
/*
 * assignMotionID
 *
 * Give the Motion node 'newnode' the next motion id from 'context', then
 * advance context->nextMotionID.  If 'oldnode' (the Motion that 'newnode'
 * was copied from) already had a non-zero id, log the renumbering at DEBUG2
 * as a debugging hint.  Pass NULL for 'oldnode' when the Motion is brand new.
 */
void
assignMotionID(Node *newnode, ApplyMotionState * context, Node *oldnode)
{
	Motion	   *motion = (Motion *) newnode;
	int			previousID;

	Assert(IsA(newnode, Motion) && context != NULL);
	Assert(oldnode == NULL || (IsA(oldnode, Motion) && oldnode != newnode));

	motion->motionID = context->nextMotionID;
	context->nextMotionID++;

	if (oldnode == NULL)
		return;

	previousID = ((Motion *) oldnode)->motionID;
	if (previousID == 0)
		return;

	ereport(DEBUG2,
			(errmsg("Motion node renumbered: old=%d new=%d.",
					previousID,
					motion->motionID)));
}
/* --------------------------------------------------------------------
 * make_motion -- build a bare Motion node on top of 'lefttree'.
 *
 * The subplan's startup/total cost, row count and width are copied, as are
 * its extParam/allParam sets.  The target list is pulled up from it with
 * cdbpullup_targetlist().  The new node is dispatched in parallel.  It has
 * no qual, no righttree, no Flow and no output segments yet: callers must
 * complete it with add_slice_to_motion().
 *
 * sendSorted must agree with the sort arguments.  When true, numSortCols > 0
 * and sortColIdx/sortOperators are non-NULL; when false, they are 0/NULL.
 * The sort arrays are stored by reference, not copied.
 * useExecutorVarFormat is true when called after set_plan_references().
 * --------------------------------------------------------------------
 */
Motion *
make_motion(Plan *lefttree,
			bool sendSorted,
			int numSortCols,
			AttrNumber *sortColIdx,
			Oid *sortOperators,
			bool useExecutorVarFormat)
{
	Motion	   *motion = makeNode(Motion);
	Plan	   *mplan = &motion->plan;

	Assert(lefttree);
	Assert(!IsA(lefttree, Motion));
	AssertImply(sendSorted, (numSortCols > 0 && sortColIdx != NULL && sortOperators != NULL));
	AssertImply(!sendSorted, (numSortCols == 0 && sortColIdx == NULL && sortOperators == NULL));

	/* Inherit size and cost estimates from the subplan. */
	mplan->startup_cost = lefttree->startup_cost;
	mplan->total_cost = lefttree->total_cost;
	mplan->plan_rows = lefttree->plan_rows;
	mplan->plan_width = lefttree->plan_width;

	/* Shape of the node itself. */
	mplan->targetlist = cdbpullup_targetlist(lefttree, useExecutorVarFormat);
	mplan->qual = NIL;
	mplan->lefttree = lefttree;
	mplan->righttree = NULL;
	mplan->dispatch = DISPATCH_PARALLEL;

	/* Ordering preserved across the motion, if any. */
	motion->sendSorted = sendSorted;
	motion->numSortCols = numSortCols;
	motion->sortColIdx = sortColIdx;
	motion->sortOperators = sortOperators;

	mplan->extParam = bms_copy(lefttree->extParam);
	mplan->allParam = bms_copy(lefttree->allParam);

	/* Routing and Flow are filled in later by add_slice_to_motion(). */
	mplan->flow = NULL;
	motion->outputSegIdx = NULL;
	motion->numOutputSegs = 0;

	return motion;
}
/*
 * add_slice_to_motion
 *
 * Fill in the routing information of a Motion built by make_motion(), and
 * attach a Flow that describes the Motion's output.
 *
 * motionType:    MOTIONTYPE_HASH, MOTIONTYPE_FIXED or MOTIONTYPE_EXPLICIT.
 * hashExpr:      hash key expressions, stored as given.  For each one, the
 *                operand type of its "=" operator (the base type, for a
 *                domain) is appended to motion->hashDataTypes.
 * numOutputSegs: 0 for hash, broadcast and explicit motions; 1 for a focus.
 * outputSegIdx:  NULL when numOutputSegs is 0.  Otherwise a one-element
 *                array holding -1 or gp_singleton_segindex.
 *
 * For a hash motion, numOutputSegs/outputSegIdx are replaced by all planner
 * segments.  If the motion sends sorted output, its sort columns and
 * operators are copied into the new Flow.
 *
 * Raises ERROR if a hash key type has no "=" operator.  Also raises ERROR if
 * the motion type, or the output segment count of a fixed motion, is
 * invalid.  Previously, those invalid cases only asserted and left a NULL
 * Flow to be dereferenced later.
 */
void
add_slice_to_motion(Motion *motion,
					MotionType motionType, List *hashExpr,
					int numOutputSegs, int *outputSegIdx)
{
	/* sanity checks */
	/* check numOutputSegs and outputSegIdx are in sync. */
	AssertEquivalent(numOutputSegs == 0, outputSegIdx == NULL);

	/* numOutputSegs are either 0, for hash or broadcast, or 1, for focus */
	Assert(numOutputSegs == 0 || numOutputSegs == 1);
	AssertImply(motionType == MOTIONTYPE_HASH, numOutputSegs == 0);
	AssertImply(numOutputSegs == 1, (outputSegIdx[0] == -1 || outputSegIdx[0] == gp_singleton_segindex));

	motion->motionType = motionType;
	motion->hashExpr = hashExpr;
	motion->hashDataTypes = NULL;
	motion->numOutputSegs = numOutputSegs;
	motion->outputSegIdx = outputSegIdx;

	Assert(motion->plan.lefttree);

	/* Build list of hash key expression data types. */
	if (hashExpr)
	{
		List	   *eq = list_make1(makeString("="));
		ListCell   *cell;

		foreach(cell, hashExpr)
		{
			Node	   *expr = (Node *) lfirst(cell);
			Oid			typeoid = exprType(expr);
			Oid			eqopoid;
			Oid			lefttype;
			Oid			righttype;

			/* Get oid of the equality operator for this data type. */
			eqopoid = compatible_oper_opid(eq, typeoid, typeoid, true);
			if (eqopoid == InvalidOid)
				ereport(ERROR, (errcode(ERRCODE_CDB_INTERNAL_ERROR),
								errmsg("no equality operator for typid %u",
									   typeoid)));

			/* Get the equality operator's operand type. */
			op_input_types(eqopoid, &lefttype, &righttype);
			Assert(lefttype == righttype);

			/* If this type is a domain type, get its base type. */
			if (get_typtype(lefttype) == 'd')
				lefttype = getBaseType(lefttype);

			motion->hashDataTypes = lappend_oid(motion->hashDataTypes, lefttype);
		}
		list_free_deep(eq);
	}

	/* Attach a descriptive Flow. */
	switch (motion->motionType)
	{
		case MOTIONTYPE_HASH:
			motion->plan.flow = makeFlow(FLOW_PARTITIONED);
			motion->plan.flow->locustype = CdbLocusType_Hashed;
			motion->plan.flow->hashExpr = copyObject(motion->hashExpr);
			motion->numOutputSegs = GetPlannerSegmentNum();
			motion->outputSegIdx = makeDefaultSegIdxArray(GetPlannerSegmentNum());
			break;

		case MOTIONTYPE_FIXED:
			if (motion->numOutputSegs == 0)
			{
				/* broadcast */
				motion->plan.flow = makeFlow(FLOW_REPLICATED);
				motion->plan.flow->locustype = CdbLocusType_Replicated;
			}
			else if (motion->numOutputSegs == 1)
			{
				/* Focus motion */
				motion->plan.flow = makeFlow(FLOW_SINGLETON);
				motion->plan.flow->segindex = motion->outputSegIdx[0];
				motion->plan.flow->locustype = (motion->plan.flow->segindex < 0) ?
					CdbLocusType_Entry :
					CdbLocusType_SingleQE;
			}
			else
				elog(ERROR, "invalid number of output segments (%d) for fixed motion",
					 motion->numOutputSegs);
			break;

		case MOTIONTYPE_EXPLICIT:
			motion->plan.flow = makeFlow(FLOW_PARTITIONED);
			/* TODO: antova - Nov 18, 2010; add a special locus type for ExplicitRedistribute flows */
			motion->plan.flow->locustype = CdbLocusType_Strewn;
			break;

		default:
			elog(ERROR, "invalid motion type: %d", (int) motion->motionType);
			break;
	}

	Assert(motion->plan.flow);

	/* if the motion has a sort order, preserve it in each route */
	if (motion->sendSorted && motion->numSortCols > 0)
	{
		int			i;
		int			n = motion->numSortCols;

		motion->plan.flow->numSortCols = n;
		motion->plan.flow->sortColIdx = palloc(n * sizeof(AttrNumber));
		motion->plan.flow->sortOperators = palloc(n * sizeof(Oid));

		for (i = 0; i < n; i++)
		{
			motion->plan.flow->sortColIdx[i] = motion->sortColIdx[i];
			motion->plan.flow->sortOperators[i] = motion->sortOperators[i];
		}
	}
}
/*
 * make_union_motion
 *
 * Build an unsorted fixed Motion that gathers all rows of 'lefttree' into
 * the single segment 'destSegIndex'.
 */
Motion *
make_union_motion(Plan *lefttree, int destSegIndex, bool useExecutorVarFormat)
{
	int		   *destSegs = (int *) palloc(sizeof(int));
	Motion	   *result;

	destSegs[0] = destSegIndex;

	/* No sort keys: numSortCols = 0, sortColIdx = NULL, sortOperators = NULL. */
	result = make_motion(lefttree, false, 0, NULL, NULL, useExecutorVarFormat);
	add_slice_to_motion(result, MOTIONTYPE_FIXED, NULL, 1, destSegs);

	return result;
}
/*
 * make_sorted_union_motion
 *
 * Build a fixed Motion that merges the sorted streams of 'lefttree' into
 * the single segment 'destSegIndex'.  The sort keys are given by
 * sortColIdx/sortOperators (numSortCols entries).
 */
Motion *
make_sorted_union_motion(Plan *lefttree,
						 int destSegIndex,
						 int numSortCols, AttrNumber *sortColIdx, Oid *sortOperators, bool useExecutorVarFormat)
{
	int		   *destSegs = (int *) palloc(sizeof(int));
	Motion	   *result;

	destSegs[0] = destSegIndex;

	result = make_motion(lefttree,
						 true,
						 numSortCols,
						 sortColIdx,
						 sortOperators,
						 useExecutorVarFormat);
	add_slice_to_motion(result, MOTIONTYPE_FIXED, NULL, 1, destSegs);

	return result;
}
/*
 * make_hashed_motion
 *
 * Build a hash Motion that redistributes the rows of 'lefttree' across all
 * planner segments, using the hash keys in 'hashExpr'.
 */
Motion *
make_hashed_motion(Plan *lefttree,
				   List *hashExpr, bool useExecutorVarFormat)
{
	Motion	   *result = make_motion(lefttree,
									 false,
									 0,
									 NULL,
									 NULL,
									 useExecutorVarFormat);

	/* Hash motions take no explicit output segment list. */
	add_slice_to_motion(result, MOTIONTYPE_HASH, hashExpr, 0, NULL);

	return result;
}
/*
 * make_broadcast_motion
 *
 * Build a fixed Motion with no specific output segment, which
 * add_slice_to_motion() labels with a replicated Flow (a broadcast).
 */
Motion *
make_broadcast_motion(Plan *lefttree, bool useExecutorVarFormat)
{
	Motion	   *result = make_motion(lefttree,
									 false,
									 0,
									 NULL,
									 NULL,
									 useExecutorVarFormat);

	add_slice_to_motion(result, MOTIONTYPE_FIXED, NULL, 0, NULL);

	return result;
}
/*
 * make_explicit_motion
 *
 * Build an explicit-redistribute Motion over 'lefttree'.  segidColIdx is
 * the 1-based position, in the subplan's target list, of the column that
 * names each row's destination segment.
 */
Motion *
make_explicit_motion(Plan *lefttree, AttrNumber segidColIdx, bool useExecutorVarFormat)
{
	Motion	   *result;

	/* No sort keys: numSortCols, sortColIdx, sortOperators are 0/NULL. */
	result = make_motion(lefttree, false, 0, NULL, NULL, useExecutorVarFormat);

	Assert(segidColIdx > 0 && segidColIdx <= list_length(lefttree->targetlist));
	result->segidColIdx = segidColIdx;

	/* hashExpr = NULL, numOutputSegs = 0, outputSegIdx = NULL */
	add_slice_to_motion(result, MOTIONTYPE_EXPLICIT, NULL, 0, NULL);

	return result;
}
/* --------------------------------------------------------------------
 * makeDefaultSegIdxArray -- palloc an int array of numSegs elements,
 * where element k holds k (0 .. numSegs-1).
 *
 * This assumes that, by default, segindexes are numbered sequentially
 * starting at 0.
 * --------------------------------------------------------------------
 */
int *
makeDefaultSegIdxArray(int numSegs)
{
	int		   *segIdx = palloc(numSegs * sizeof(int));
	int			seg = 0;

	while (seg < numSegs)
	{
		segIdx[seg] = seg;
		seg++;
	}

	return segIdx;
}
/* --------------------------------------------------------------------
*
* Static Helper routines
* --------------------------------------------------------------------
*/
/* ----------------------------------------------------------------
 * getExprListFromTargetList
 *
 * Build a List with one expression per entry of colIdx[0 .. numCols-1].
 * Each colIdx value is a 1-based resno in 'tlist'; a missing entry raises
 * ERROR.
 *
 * When useExecutorVarFormat is true (i.e. after set_plan_references()),
 * each element is built by cdbpullup_make_expr(OUTER, resno, ...), which
 * yields a reference to the subplan's targetlist entry.  Otherwise each
 * element is a copyObject() of the target entry's expression.
 * ----------------------------------------------------------------
 */
List *
getExprListFromTargetList(List *tlist,
						  int numCols,
						  AttrNumber *colIdx,
						  bool useExecutorVarFormat)
{
	List	   *result = NIL;
	int			k;

	for (k = 0; k < numCols; k++)
	{
		AttrNumber	resno = colIdx[k];
		TargetEntry *tle = get_tle_by_resno(tlist, resno);
		void	   *expr;

		if (tle == NULL)
			elog(ERROR, "no tlist entry for key %d", resno);

		expr = useExecutorVarFormat
			? (void *) cdbpullup_make_expr(OUTER, resno, tle->expr, false)
			: (void *) copyObject(tle->expr);

		result = lappend(result, expr);
	}

	return result;
}
/*
 * isAnyColChangedByUpdate
 *		Return true if an UPDATE's targetlist might assign a new value to any
 *		of the columns attrs[0 .. nattrs-1] of range-table entry targetvarno.
 *
 * A column counts as unchanged only when its targetlist entry (by resno)
 * is a plain Var whose original varno/attno (varnoold/varoattno) are
 * targetvarno and the same column.  Anything else is treated as a
 * possible change.
 */
static bool
isAnyColChangedByUpdate(Index targetvarno,
						List *targetlist,
						int nattrs,
						AttrNumber *attrs)
{
	int			k;

	for (k = 0; k < nattrs; k++)
	{
		AttrNumber	attno = attrs[k];
		TargetEntry *tle = get_tle_by_resno(targetlist, attno);
		Var		   *var;

		Insist(tle);

		/* A computed expression may yield a different value. */
		if (!IsA(tle->expr, Var))
			return true;

		/* A Var that is not "the same column of the same rel" also counts. */
		var = (Var *) tle->expr;
		if (var->varnoold != targetvarno || var->varoattno != attno)
			return true;
	}

	return false;
}								/* isAnyColChangedByUpdate */
/*
 * request_explicit_motion
 *		Request an ExplicitRedistribute motion on top of 'plan'.
 *
 * plan->flow is replaced by a shallow copy of itself with req_move set to
 * MOVEMENT_EXPLICIT.  The original Flow is kept in flow_before_req_move.
 * The copy's segidColIdx is set to the targetlist position of the
 * gp_segment_id column of result relation 'resultRelationsIdx'.
 * 'rtable' is not used.
 */
void request_explicit_motion(Plan *plan, Index resultRelationsIdx, List *rtable)
{
	Flow	   *origFlow = plan->flow;
	Flow	   *newFlow = (Flow *) palloc(sizeof(*newFlow));
	AttrNumber	segidColIdx;

	/* Shallow copy; remember what the flow looked like before the request. */
	*newFlow = *origFlow;
	newFlow->flow_before_req_move = origFlow;
	newFlow->req_move = MOVEMENT_EXPLICIT;
	plan->flow = newFlow;

	/* The motion needs to know where the segment id lives in the tlist. */
	segidColIdx = find_segid_column(plan->targetlist, resultRelationsIdx);
	Assert(-1 != segidColIdx);
	newFlow->segidColIdx = segidColIdx;
}
/*
 * request_explicit_motion_append
 *		Request an ExplicitRedistribute motion above every child of an Append
 *		that updates a partitioned table.
 *
 * append->appendplans and resultRelationsIdx are walked in lockstep.  The
 * n-th child is paired with the n-th result relation index, and the two
 * lists must have the same length.  'rtable' is passed through unchanged.
 */
void request_explicit_motion_append(Append *append, List *resultRelationsIdx, List *rtable)
{
	ListCell   *planCell = NULL;
	ListCell   *relCell = NULL;

	Assert(IsA(append, Append));
	Assert(list_length(append->appendplans) == list_length(resultRelationsIdx));

	forboth(planCell, append->appendplans,
			relCell, resultRelationsIdx)
	{
		Plan	   *child = (Plan *) lfirst(planCell);
		int			rti = lfirst_int(relCell);

		Assert(rti > 0);
		request_explicit_motion(child, rti, rtable);
	}
}
/*
 * find_segid_column
 *		Return the resno of the first targetlist entry that is a Var for the
 *		gp_segment_id column of range-table entry 'relid', or -1 if none.
 *
 * A Var matches if either its current (varno/varattno) or its original
 * (varnoold/varoattno) reference is relid's gp_segment_id.  Non-Var
 * entries are skipped.
 */
AttrNumber find_segid_column(List *tlist, Index relid)
{
	ListCell   *cell;

	/* An empty (NIL) targetlist simply yields no match. */
	foreach(cell, tlist)
	{
		TargetEntry *tle = (TargetEntry *) lfirst(cell);
		Var		   *var = (Var *) tle->expr;
		bool		matchesCurrent;
		bool		matchesOriginal;

		if (!IsA(var, Var))
			continue;

		matchesCurrent = (var->varno == relid &&
						  var->varattno == GpSegmentIdAttributeNumber);
		matchesOriginal = (var->varnoold == relid &&
						   var->varoattno == GpSegmentIdAttributeNumber);

		if (matchesCurrent || matchesOriginal)
			return tle->resno;
	}

	/* No gp_segment_id column for this relation. */
	return -1;
}
/*
* doesUpdateAffectPartitionCols
*/
bool
doesUpdateAffectPartitionCols(PlannerInfo *root,
Plan *plan,
Query *query)
{
Append *append;
ListCell *rticell;
ListCell *plancell;
GpPolicy *policy;
if (list_length(root->resultRelations) == 1)
{
RangeTblEntry *rte;
Relation relation;
int resultRelation = linitial_int(root->resultRelations);
rte = rt_fetch(resultRelation, query->rtable);
Assert(rte->rtekind == RTE_RELATION);
/* Get a copy of the rel's GpPolicy from the relcache. */
relation = relation_open(rte->relid, NoLock);
policy = RelationGetPartitioningKey(relation);
relation_close(relation, NoLock);
/*
* Single target relation?
*/
return isAnyColChangedByUpdate(resultRelation,
plan->targetlist,
policy->nattrs,
policy->attrs);
}
/*
* Multiple target relations (inheritance)...
* Plan should have an Append node on top, with a subplan per target.
*/
append = (Append *)plan;
Insist(IsA(append, Append) &&
append->isTarget &&
list_length(append->appendplans) == list_length(root->resultRelations));
forboth(rticell, root->resultRelations,
plancell, append->appendplans)
{
int rti = lfirst_int(rticell);
Plan *subplan = (Plan *)lfirst(plancell);
RangeTblEntry *rte;
Relation relation;
rte = rt_fetch(rti, query->rtable);
Assert(rte->rtekind == RTE_RELATION);
/* Get a copy of the rel's GpPolicy from the relcache. */
relation = relation_open(rte->relid, NoLock);
policy = RelationGetPartitioningKey(relation);
relation_close(relation, NoLock);
if (isAnyColChangedByUpdate(rti, subplan->targetlist, policy->nattrs, policy->attrs))
return true;
}
return false;
} /* doesUpdateAffectPartitionCols */
/* ----------------------------------------------------------------------- *
* cdbmutate_warn_ctid_without_segid() warns the user if the plan refers to a
* partitioned table's ctid column without also referencing its
* gp_segment_id column.
* ----------------------------------------------------------------------- *
*/
/*
 * State for ctid_inventory_walker(): records whether ctid and/or
 * gp_segment_id of range-table entry 'relid' are referenced (by Vars with
 * varlevelsup == 0) in the expressions being walked.
 */
typedef struct ctid_inventory_context
{
	plan_tree_base_prefix base; /* Required prefix for plan_tree_walker/mutator */
	bool		uses_ctid;		/* set once a ctid reference to relid is seen */
	bool		uses_segid;		/* set once a gp_segment_id reference to relid is seen */
	Index		relid;			/* range-table index of the rel being inspected */
} ctid_inventory_context;
/*
 * ctid_inventory_walker
 *		plan_tree_walker callback that sets inv->uses_ctid / inv->uses_segid
 *		when it meets a Var for inv->relid's ctid or gp_segment_id system
 *		column at the current query level.  Always returns false from Var
 *		nodes so the walk continues.
 */
static bool
ctid_inventory_walker(Node *node, ctid_inventory_context *inv)
{
	Var		   *var;

	if (node == NULL)
		return false;

	/* Not a Var: keep descending. */
	if (!IsA(node, Var))
		return plan_tree_walker(node, ctid_inventory_walker, inv);

	var = (Var *) node;

	/* Only system columns of the target rel, not outer-level references. */
	if (var->varattno >= 0 ||
		var->varno != inv->relid ||
		var->varlevelsup != 0)
		return false;

	if (var->varattno == SelfItemPointerAttributeNumber)
		inv->uses_ctid = true;
	else if (var->varattno == GpSegmentIdAttributeNumber)
		inv->uses_segid = true;

	return false;
}
void
cdbmutate_warn_ctid_without_segid(struct PlannerInfo *root, struct RelOptInfo *rel)
{
ctid_inventory_context inv;
Relids relids_to_ignore;
ListCell *cell;
AttrNumber attno;
int ndx;
planner_init_plan_tree_base(&inv.base, root);
inv.uses_ctid = false;
inv.uses_segid = false;
inv.relid = rel->relid;
/* Rel not distributed? Then segment id doesn't matter. */
if (!rel->cdbpolicy ||
rel->cdbpolicy->ptype != POLICYTYPE_PARTITIONED)
return;
/* Ignore references occurring in the Query's final output targetlist. */
relids_to_ignore = bms_make_singleton(0);
/* Is 'ctid' referenced in join quals? */
attno = SelfItemPointerAttributeNumber;
Assert(attno >= rel->min_attr && attno <= rel->max_attr);
ndx = attno - rel->min_attr;
if (bms_nonempty_difference(rel->attr_needed[ndx], relids_to_ignore))
inv.uses_ctid = true;
/* Is 'gp_segment_id' referenced in join quals? */
attno = GpSegmentIdAttributeNumber;
Assert(attno >= rel->min_attr && attno <= rel->max_attr);
ndx = attno - rel->min_attr;
if (bms_nonempty_difference(rel->attr_needed[ndx], relids_to_ignore))
inv.uses_segid = true;
/* Examine the single-table quals on this rel. */
foreach(cell, rel->baserestrictinfo)
{
RestrictInfo *rinfo = (RestrictInfo *)lfirst(cell);
Assert(IsA(rinfo, RestrictInfo));
ctid_inventory_walker((Node *)rinfo->clause, &inv);
}
/* Notify client if found a reference to ctid only, without gp_segment_id */
if (inv.uses_ctid &&
!inv.uses_segid)
{
RangeTblEntry *rte = rt_fetch(rel->relid, root->parse->rtable);
const char *cmd;
int elevel;
/* Reject if UPDATE or DELETE. Otherwise just give info msg. */
switch (root->parse->commandType)
{
case CMD_UPDATE:
cmd = "UPDATE";
elevel = ERROR;
break;
case CMD_DELETE:
cmd = "DELETE";
elevel = ERROR;
break;
default:
cmd = "SELECT";
elevel = NOTICE;
}
ereport(elevel, (errmsg("%s uses system-defined column \"%s.ctid\" "
"without the necessary companion column "
"\"%s.gp_segment_id\"",
cmd,
rte->eref->aliasname,
rte->eref->aliasname),
errhint("To uniquely identify a row within a "
"distributed table, use the \"gp_segment_id\" "
"column together with the \"ctid\" column.")
));
}
bms_free(relids_to_ignore);
} /* cdbmutate_warn_ctid_without_segid */
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/dependency.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_depend.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
/*
 * Code that mutates the tree for share input
 *
 * After the planner, the plan is really a DAG. SISCs (ShareInputScan nodes)
 * will have valid pointers to the underlying share. However, other code
 * (setrefs etc) depends on the fact that the plan is a tree. We first mutate
 * the DAG to a tree.
 *
 * Next, we need to decide whether the share is cross-slice. If the share
 * is not cross-slice, we do not need the synchronization, and it is possible
 * to keep the Material/Sort in memory to save a sort.
 *
 * It is essential that we walk the tree in the same order in which
 * ExecProcNode starts execution; otherwise, a deadlock may arise.
 */
/*
 * Walk the tree for shareinput.
 *
 * The shareinput passes fix shared_as_id and underlying_share_id of nodes in
 * place. We do not want to use the ordinary tree walker, because it makes
 * copies etc. that are unnecessary here.
 *
 * SHAREINPUT_MUTATOR is the per-node callback used by shareinput_walker()
 * and leftdeep_walker().  It is called twice per plan node:
 *   - with fPop = false before the node's children are visited.  Returning
 *     true means "descend into the children".
 *   - with fPop = true after the children.  The return value is ignored.
 */
typedef bool (*SHAREINPUT_MUTATOR) (Node *node, ApplyShareInputContext *ctxt, bool fPop);
/*
 * shareinput_walker
 *		Visit the plan nodes under 'node', calling 'f' on each, in the order
 *		the shareinput passes rely on: the order in which ExecProcNode starts
 *		the nodes.  Getting this order wrong can lead to deadlock.
 *
 * For each plan node, f(node, ctxt, false) is called first.  The node's
 * children are visited only if that call returns true.  f(node, ctxt, true)
 * is then called unconditionally, after any children.  A List is walked
 * element by element; any other non-plan node is ignored.
 */
static void shareinput_walker(SHAREINPUT_MUTATOR f, Node *node, ApplyShareInputContext *ctxt)
{
	Plan *plan = NULL;
	bool recursive_down;

	if(node == NULL)
		return;

	/* A List (e.g. plan->initPlan, passed below) is walked in list order. */
	if(IsA(node, List))
	{
		List *l = (List *) node;
		ListCell *lc;
		foreach(lc, l)
		{
			Node* n = lfirst(lc);
			shareinput_walker(f, n, ctxt);
		}
		return;
	}

	if(!is_plan_node(node))
		return;

	plan = (Plan *) node;

	/* Pre-visit: the callback decides whether to descend. */
	recursive_down = (*f)(node, ctxt, false);

	if(recursive_down)
	{
		if(IsA(node, Append))
		{
			/* Append children in list order. */
			ListCell *cell;
			Append *app = (Append *) node;
			foreach(cell, app->appendplans)
				shareinput_walker(f, (Node *)lfirst(cell), ctxt);
		}
		else if(IsA(node, SubqueryScan))
		{
			SubqueryScan *subqscan = (SubqueryScan *) node;
			shareinput_walker(f, (Node *) subqscan->subplan, ctxt);
		}
		else if(IsA(node, BitmapAnd))
		{
			ListCell *cell;
			BitmapAnd *ba = (BitmapAnd *) node;
			foreach(cell, ba->bitmapplans)
				shareinput_walker(f, (Node *)lfirst(cell), ctxt);
		}
		else if(IsA(node, BitmapOr))
		{
			ListCell *cell;
			BitmapOr *bo = (BitmapOr *) node;
			foreach(cell, bo->bitmapplans)
				shareinput_walker(f, (Node *)lfirst(cell), ctxt);
		}
		else if(IsA(node, NestLoop))
		{
			/*
			 * Nest loop join is strange. Exec order depends on prefetch_inner:
			 * when set, the inner (right) side is walked before the outer.
			 */
			NestLoop *nl = (NestLoop *) node;
			if(nl->join.prefetch_inner)
			{
				shareinput_walker(f, (Node *)plan->righttree, ctxt);
				shareinput_walker(f, (Node *)plan->lefttree, ctxt);
			}
			else
			{
				shareinput_walker(f, (Node *)plan->lefttree, ctxt);
				shareinput_walker(f, (Node *)plan->righttree, ctxt);
			}
		}
		else if(IsA(node, HashJoin))
		{
			/* Hash join: the hash table is at inner, so walk the inner side first. */
			shareinput_walker(f, (Node *)plan->righttree, ctxt);
			shareinput_walker(f, (Node *)plan->lefttree, ctxt);
		}
		else if (IsA(node, MergeJoin))
		{
			/* Merge join: outer first when unique_outer is set, else inner first. */
			MergeJoin *mj = (MergeJoin *) node;
			if (mj->unique_outer)
			{
				shareinput_walker(f, (Node *)plan->lefttree, ctxt);
				shareinput_walker(f, (Node *)plan->righttree, ctxt);
			}
			else
			{
				shareinput_walker(f, (Node *)plan->righttree, ctxt);
				shareinput_walker(f, (Node *)plan->lefttree, ctxt);
			}
		}
		else if (IsA(node, Sequence))
		{
			/* A Sequence's children are its 'subplans', walked in list order. */
			ListCell *cell = NULL;
			Sequence *sequence = (Sequence *) node;
			foreach(cell, sequence->subplans)
			{
				shareinput_walker(f, (Node *)lfirst(cell), ctxt);
			}
		}
		else
		{
			/*
			 * Generic node: left, right, then the initPlan list.
			 * NOTE(review): only this branch walks plan->initPlan; initPlans
			 * attached to the node types handled above are not visited here.
			 */
			shareinput_walker(f, (Node *)plan->lefttree, ctxt);
			shareinput_walker(f, (Node *)plan->righttree, ctxt);
			shareinput_walker(f, (Node *)plan->initPlan, ctxt);
		}
	}

	/* Post-visit, whether or not the children were walked. */
	(*f)(node, ctxt, true);
}
/*
 * leftdeep_walker
 *		Like shareinput_walker(), but visits children in plain left-deep
 *		order (lefttree before righttree) regardless of join type.  Used by
 *		assign_plannode_id() to number plan nodes.
 *
 * f(node, ctxt, false) is called before a plan node's children and decides
 * whether to descend.  f(node, ctxt, true) is always called afterwards.
 * A List is walked element by element; other non-plan nodes are ignored.
 *
 * A Sequence node keeps its children in 'subplans' (shareinput_walker
 * walks them the same way).  Previously this walker fell through to the
 * lefttree/righttree branch for Sequence, so nodes below a Sequence were
 * never visited.
 */
static void leftdeep_walker(SHAREINPUT_MUTATOR f, Node *node, ApplyShareInputContext *ctxt)
{
	Plan	   *plan = NULL;
	bool		recursive_down;

	if (node == NULL)
		return;

	if (IsA(node, List))
	{
		List	   *l = (List *) node;
		ListCell   *lc;

		foreach(lc, l)
		{
			Node	   *n = lfirst(lc);

			leftdeep_walker(f, n, ctxt);
		}
		return;
	}

	if (!is_plan_node(node))
		return;

	plan = (Plan *) node;
	recursive_down = (*f)(node, ctxt, false);

	if (recursive_down)
	{
		if (IsA(node, Append))
		{
			ListCell   *cell;
			Append	   *app = (Append *) node;

			foreach(cell, app->appendplans)
				leftdeep_walker(f, (Node *) lfirst(cell), ctxt);
		}
		else if (IsA(node, BitmapAnd))
		{
			ListCell   *cell;
			BitmapAnd  *ba = (BitmapAnd *) node;

			foreach(cell, ba->bitmapplans)
				leftdeep_walker(f, (Node *) lfirst(cell), ctxt);
		}
		else if (IsA(node, BitmapOr))
		{
			ListCell   *cell;
			BitmapOr   *bo = (BitmapOr *) node;

			foreach(cell, bo->bitmapplans)
				leftdeep_walker(f, (Node *) lfirst(cell), ctxt);
		}
		else if (IsA(node, SubqueryScan))
		{
			SubqueryScan *subqscan = (SubqueryScan *) node;

			leftdeep_walker(f, (Node *) subqscan->subplan, ctxt);
		}
		else if (IsA(node, Sequence))
		{
			ListCell   *cell;
			Sequence   *sequence = (Sequence *) node;

			foreach(cell, sequence->subplans)
				leftdeep_walker(f, (Node *) lfirst(cell), ctxt);
		}
		else
		{
			leftdeep_walker(f, (Node *) plan->lefttree, ctxt);
			leftdeep_walker(f, (Node *) plan->righttree, ctxt);
			leftdeep_walker(f, (Node *) plan->initPlan, ctxt);
		}
	}

	(*f)(node, ctxt, true);
}
/*
 * DAG to Tree
 *
 * Per-node callback for apply_shareinput_dag_to_tree().  For each
 * ShareInputScan (whose lefttree must be a Material or Sort):
 *   - The first scan seen for a given child assigns that child a new
 *     share id (its position in ctxt->sharedNodes) and copies it to the
 *     scan.  It returns true so the walker descends into the child.
 *   - Later scans of the same child just take its share id and drop the
 *     child pointer (lefttree = NULL).  They return false so the shared
 *     subtree is not walked again.
 * Every other node, and every post-visit, returns true.
 */
static bool shareinput_mutator_dag_to_tree(Node *node, ApplyShareInputContext* ctxt, bool fPop)
{
	Plan	   *plan = (Plan *) node;
	Plan	   *child;

	if (fPop || !IsA(plan, ShareInputScan))
		return true;

	child = plan->lefttree;
	Assert(child);
	Assert(IsA(child, Material) || IsA(child, Sort));
	Assert(get_plan_share_id(plan) == SHARE_ID_NOT_ASSIGNED);
	Assert(plan->righttree == NULL);
	Assert(plan->initPlan == NULL);

	if (!list_member_ptr(ctxt->sharedNodes, child))
	{
		/* First sharer: register the child and give it the next id. */
		Assert(get_plan_share_id(child) == SHARE_ID_NOT_ASSIGNED);
		set_plan_share_id(child, list_length(ctxt->sharedNodes));
		set_plan_share_id(plan, get_plan_share_id(child));
		ctxt->sharedNodes = lappend(ctxt->sharedNodes, child);
		return true;
	}

	/* Not the first sharer: reuse the id and zap the duplicate edge. */
	Assert(get_plan_share_id(child) >= 0);
	set_plan_share_id(plan, get_plan_share_id(child));
	plan->lefttree = NULL;
	return false;
}
/*
 * apply_shareinput_dag_to_tree
 *		Turn the shareinput DAG rooted at 'plan' into a tree, in place, by
 *		walking it with shareinput_mutator_dag_to_tree().  Returns 'plan'.
 */
Plan *apply_shareinput_dag_to_tree(Plan *plan, ApplyShareInputContext *ctxt)
{
	Node	   *tree = (Node *) plan;

	shareinput_walker(shareinput_mutator_dag_to_tree, tree, ctxt);

	return plan;
}
/*
 * Motion-id stack helpers.  ctxt->motStack is a List used as a stack whose
 * head is the top.  Push prepends, pop removes the head, peek reads it.
 */
static void shareinput_pushmot(ApplyShareInputContext *ctxt, int motid)
{
	List	   *stack = ctxt->motStack;

	ctxt->motStack = lcons_int(motid, stack);
}
/*
 * Pop the top motion id off ctxt->motStack.
 *
 * list_delete_first() returns the updated list.  When it removes the last
 * cell it frees the List and returns NIL, so the result must be stored
 * back; otherwise motStack would point at freed memory.
 */
static void shareinput_popmot(ApplyShareInputContext *ctxt)
{
	ctxt->motStack = list_delete_first(ctxt->motStack);
}
/* Return the motion id on top of ctxt->motStack (the stack must be non-empty). */
static int shareinput_peekmot(ApplyShareInputContext *ctxt)
{
	List	   *stack = ctxt->motStack;

	return linitial_int(stack);
}
/*
 * Result of shareinput_find_sharenode(): a shared producer node (Material or
 * Sort) from ctxt->sharedNodes together with the matching entry of
 * ctxt->sliceMarks.  Callers initialize it to {NULL, 0}; the lookup leaves
 * it untouched when no node has the requested share id.
 */
typedef struct ShareNodeWithSliceMark
{
	Plan *plan;			/* the shared producer node */
	int slice_mark;		/* its sliceMarks entry (a motion id) */
} ShareNodeWithSliceMark;
/*
 * shareinput_find_sharenode
 *		Look up the shared node with the given share id in ctxt->sharedNodes.
 *
 * On a match, returns true and, if 'result' is non-NULL, stores the node and
 * its parallel ctxt->sliceMarks entry there.  Returns false if no node
 * matches (including when sharedNodes is empty).
 */
static bool shareinput_find_sharenode(ApplyShareInputContext *ctxt, int share_id, ShareNodeWithSliceMark *result)
{
	ListCell   *nodeCell;
	ListCell   *markCell;

	if (ctxt->sharedNodes == NULL)
		return false;

	forboth(nodeCell, ctxt->sharedNodes, markCell, ctxt->sliceMarks)
	{
		Plan	   *candidate = (Plan *) lfirst(nodeCell);

		Assert(IsA(candidate, Material) || IsA(candidate, Sort));

		if (get_plan_share_id(candidate) != share_id)
			continue;

		if (result != NULL)
		{
			result->plan = candidate;
			result->slice_mark = lfirst_int(markCell);
		}
		return true;
	}

	return false;
}
/*
 * First walk on shareinput xslice. It does the following:
 *
 * 1. Build the sharedNodes in context (every producer, i.e. every
 *    ShareInputScan that still has its child after DAG-to-tree).
 * 2. Build the sliceMarks in context (the motion id each producer runs
 *    under), and record that id as the producer's driver slice.
 * 3. Build the list of shares that are scanned on the QD (ctxt->qdShares):
 *    those with a singleton flow whose segindex is negative.
 *
 * The current slice is tracked with the motion-id stack: a Motion pushes
 * its id on entry and pops it on exit.
 */
static bool
shareinput_mutator_xslice_1(Node* node, ApplyShareInputContext *ctxt, bool fPop)
{
	Plan	   *plan = (Plan *) node;
	ShareInputScan *sisc;
	Plan	   *producer;
	int			curSlice;

	if (fPop)
	{
		if (IsA(plan, Motion))
			shareinput_popmot(ctxt);
		return false;
	}

	if (IsA(plan, Motion))
	{
		shareinput_pushmot(ctxt, ((Motion *) plan)->motionID);
		return true;
	}

	if (!IsA(plan, ShareInputScan))
		return true;

	sisc = (ShareInputScan *) plan;
	curSlice = shareinput_peekmot(ctxt);
	producer = plan->lefttree;

	Assert(sisc->plan.flow);

	if (sisc->plan.flow->flotype == FLOW_SINGLETON &&
		sisc->plan.flow->segindex < 0)
		ctxt->qdShares = list_append_unique_int(ctxt->qdShares, sisc->share_id);

	/* Only the scan that kept its child is the producer for this share. */
	if (producer != NULL)
	{
		Assert(shareinput_find_sharenode(ctxt, sisc->share_id, NULL) == false);
		Assert(get_plan_share_id(plan) == get_plan_share_id(producer));
		set_plan_driver_slice(producer, curSlice);
		ctxt->sharedNodes = lappend(ctxt->sharedNodes, producer);
		ctxt->sliceMarks = lappend_int(ctxt->sliceMarks, curSlice);
	}

	return true;
}
/*
 * Second pass on shareinput xslice.  For every consumer (a ShareInputScan
 * without a child) that runs in a different slice than its producer:
 *   - the producer's share type is switched to its cross-slice variant
 *     (for SHARE_MATERIAL / SHARE_SORT),
 *   - the producer's cross-slice sharer count is incremented, and
 *   - the consumer's driver_slice is set to its own slice.
 * A consumer whose producer cannot be found raises ERROR.
 */
static bool
shareinput_mutator_xslice_2(Node* node, ApplyShareInputContext *ctxt, bool fPop)
{
	Plan	   *plan = (Plan *) node;
	ShareInputScan *sisc;
	ShareNodeWithSliceMark producer = {NULL /* plan */, 0 /* slice_mark */};
	ShareType	stype;
	int			curSlice;
	int			producerSlice;

	if (fPop)
	{
		if (IsA(plan, Motion))
			shareinput_popmot(ctxt);
		return false;
	}

	if (IsA(plan, Motion))
	{
		shareinput_pushmot(ctxt, ((Motion *) plan)->motionID);
		return true;
	}

	/* Producers (scans that kept their child) were handled in pass one. */
	if (!IsA(plan, ShareInputScan) || plan->lefttree != NULL)
		return true;

	sisc = (ShareInputScan *) plan;
	curSlice = shareinput_peekmot(ctxt);

	shareinput_find_sharenode(ctxt, sisc->share_id, &producer);
	if (producer.plan == NULL)
		elog(ERROR, "could not find shared input node with id %d", sisc->share_id);

	producerSlice = get_plan_driver_slice(producer.plan);
	if (producerSlice == curSlice)
		return true;

	/* Consumer and producer live in different slices. */
	stype = get_plan_share_type(producer.plan);
	if (stype == SHARE_MATERIAL || stype == SHARE_SORT)
		set_plan_share_type_xslice(producer.plan);
	incr_plan_nsharer_xslice(producer.plan);
	sisc->driver_slice = curSlice;

	return true;
}
/*
 * Third pass:
 * 1. Mark each ShareInputScan cross-slice when its producer was marked
 *    cross-slice (SHARE_MATERIAL_XSLICE / SHARE_SORT_XSLICE).
 * 2. Build the list of QD slices (ctxt->qdSlices): every slice that
 *    contains a scan of a share listed in ctxt->qdShares.
 */
static bool
shareinput_mutator_xslice_3(Node *node, ApplyShareInputContext *ctxt, bool fPop)
{
	Plan	   *plan = (Plan *) node;
	ShareInputScan *sisc;
	ShareNodeWithSliceMark producer = {NULL, 0};
	ShareType	producerType;
	int			curSlice;

	if (fPop)
	{
		if (IsA(plan, Motion))
			shareinput_popmot(ctxt);
		return false;
	}

	if (IsA(plan, Motion))
	{
		shareinput_pushmot(ctxt, ((Motion *) plan)->motionID);
		return true;
	}

	if (!IsA(plan, ShareInputScan))
		return true;

	sisc = (ShareInputScan *) plan;
	curSlice = shareinput_peekmot(ctxt);

	shareinput_find_sharenode(ctxt, sisc->share_id, &producer);
	producerType = (producer.plan != NULL)
		? get_plan_share_type(producer.plan)
		: SHARE_NOTSHARED;

	/* Propagate the producer's cross-slice marking to this scan. */
	if (producerType == SHARE_MATERIAL_XSLICE)
	{
		Assert(sisc->share_type == SHARE_MATERIAL);
		set_plan_share_type_xslice(plan);
	}
	else if (producerType == SHARE_SORT_XSLICE)
	{
		Assert(sisc->share_type == SHARE_SORT);
		set_plan_share_type_xslice(plan);
	}
	else
	{
		Assert(sisc->share_type == producerType);
	}

	/* A slice that scans a QD-resident share must itself run on the QD. */
	if (list_member_int(ctxt->qdShares, sisc->share_id))
	{
		Assert(sisc->plan.flow);
		Assert(sisc->plan.flow->flotype == FLOW_SINGLETON);
		ctxt->qdSlices = list_append_unique_int(ctxt->qdSlices, curSlice);
	}

	return true;
}
/*
 * The fourth pass. If a shareinput is running on QD, then all slices in
 * this share must be on QD. Move them to QD.
 *
 * Every node whose slice id (the top of ctxt->motStack) appears in
 * ctxt->qdSlices, as built by the third pass, has its flow retargeted to
 * segindex -1.  That flow must be a singleton.
 *
 * Note the order: motId is peeked BEFORE a Motion pushes its own id.  A
 * Motion node is therefore tested against its parent's slice, while the
 * Motion's descendants are tested against the Motion's id.
 */
static bool
shareinput_mutator_xslice_4(Node *node, ApplyShareInputContext *ctxt, bool fPop)
{
	Plan *plan = (Plan *) node;
	int motId = shareinput_peekmot(ctxt);

	if(fPop)
	{
		if(IsA(plan, Motion))
			shareinput_popmot(ctxt);
		return false;
	}

	if(IsA(plan, Motion))
	{
		Motion *motion = (Motion *) plan;
		shareinput_pushmot(ctxt, motion->motionID);
		/* Do not return. Motion need to be adjusted as well */
	}

	/* Well, the following test can be optimized if we record the test
	 * result so we test just once for all node in one slice. But this
	 * code is not perf critical so be lazy.
	 */
	if(list_member_int(ctxt->qdSlices, motId))
	{
		if(plan->flow)
		{
			Assert(plan->flow->flotype == FLOW_SINGLETON);
			plan->flow->segindex = -1;
		}
	}
	return true;
}
/*
 * Parent-node stack helpers for plan numbering.  ctxt->planNodes is a List
 * used as a stack whose head is the plan node currently being visited.
 */
static void gpmon_pushplannode(ApplyShareInputContext *ctxt, Plan *plan)
{
	List	   *stack = ctxt->planNodes;

	ctxt->planNodes = lcons(plan, stack);
}
/*
 * Pop the top plan node off ctxt->planNodes.
 *
 * list_delete_first() returns the updated list.  When the root is popped,
 * it removes the last cell, frees the List and returns NIL, so the result
 * must be stored back; otherwise planNodes would point at freed memory.
 */
static void gpmon_popplannode(ApplyShareInputContext *ctxt)
{
	ctxt->planNodes = list_delete_first(ctxt->planNodes);
}
/* Return the plan node on top of ctxt->planNodes, or NULL if the stack is empty. */
static Plan *gpmon_peekplannode(ApplyShareInputContext *ctxt)
{
	if (ctxt->planNodes == NIL)
		return NULL;

	return (Plan *) linitial(ctxt->planNodes);
}
/*
 * assign_plannode_id_walker
 *		Walker callback that gives each plan node the next id from
 *		ctxt->nextPlanId (starting at nextPlanId + 1).  It records the id of
 *		the enclosing node as plan_parent_node_id, or -1 for the root of the
 *		walk.  The node is pushed on entry and popped on exit.
 */
static bool assign_plannode_id_walker(Node *node, ApplyShareInputContext *ctxt, bool fPop)
{
	Plan	   *plan = (Plan *) node;
	Plan	   *parent;

	if (fPop)
	{
		gpmon_popplannode(ctxt);
		return false;
	}

	ctxt->nextPlanId++;
	plan->plan_node_id = ctxt->nextPlanId;

	parent = gpmon_peekplannode(ctxt);
	plan->plan_parent_node_id = (parent != NULL) ? parent->plan_node_id : -1;

	gpmon_pushplannode(ctxt, plan);
	return true;
}
Plan *
apply_shareinput_xslice(Plan *plan, PlannerGlobal *glob)
{
ApplyShareInputContext *ctxt = &glob->share;
ListCell *lp;
ctxt->sharedNodes = NULL;
ctxt->sliceMarks = NULL;
ctxt->motStack = NULL;
ctxt->qdShares = NULL;
ctxt->qdSlices = NULL;
ctxt->planNodes = NIL;
ctxt->nextPlanId = 0;
shareinput_pushmot(ctxt, 0);
/*
* Walk the tree. See comment for each pass for what each pass will do.
* The context is used to carry information from one pass to another,
* as well as within a pass.
*/
/*
* A subplan might have a SharedScan consumer while the SharedScan
* producer is in the main plan, or vice versa. So in the first pass, we
* walk through all plans and collect all producer subplans into the
* context, before processing the consumers.
*/
foreach (lp, glob->subplans)
{
Plan *subplan = (Plan *) lfirst(lp);
shareinput_walker(shareinput_mutator_xslice_1, (Node *) subplan, ctxt);
}
shareinput_walker(shareinput_mutator_xslice_1, (Node *) plan, ctxt);
/* Now walk the tree again, and process all the consumers. */
foreach (lp, glob->subplans)
{
Plan *subplan = (Plan *) lfirst(lp);
shareinput_walker(shareinput_mutator_xslice_2, (Node *) subplan, ctxt);
}
shareinput_walker(shareinput_mutator_xslice_2, (Node *) plan, ctxt);
foreach (lp, glob->subplans)
{
Plan *subplan = (Plan *) lfirst(lp);
shareinput_walker(shareinput_mutator_xslice_3, (Node *) subplan, ctxt);
}
shareinput_walker(shareinput_mutator_xslice_3, (Node *) plan, ctxt);
foreach (lp, glob->subplans)
{
Plan *subplan = (Plan *) lfirst(lp);
shareinput_walker(shareinput_mutator_xslice_4, (Node *) subplan, ctxt);
}
shareinput_walker(shareinput_mutator_xslice_4, (Node *) plan, ctxt);
return plan;
}
/*
 * assign_plannode_id - Assign an id for each plan node. Used by gpmon.
 *
 * The main plan tree is numbered first, then each tree in stmt->subplans,
 * with one counter shared across all of them.  Each tree is walked with an
 * empty parent stack, so its root gets plan_parent_node_id = -1.
 */
void assign_plannode_id(PlannedStmt *stmt)
{
	ApplyShareInputContext ctxt;
	ListCell   *lc;

	/* Only planNodes and nextPlanId are used by the walker callback. */
	ctxt.planNodes = NIL;
	ctxt.nextPlanId = 0;

	leftdeep_walker(assign_plannode_id_walker, (Node *) stmt->planTree, &ctxt);

	foreach(lc, stmt->subplans)
	{
		Plan	   *subplan = (Plan *) lfirst(lc);

		Assert(subplan);
		ctxt.planNodes = NIL;
		leftdeep_walker(assign_plannode_id_walker, (Node *) subplan, &ctxt);
	}
}
/*
 * We can "zap" a Result node if:
 *   - its lefttree is an Append,
 *   - it has no initPlans, no resconstantqual and no hashFilter, and
 *   - every target list entry is a plain Var.
 */
static bool can_zap_result(Result *res)
{
	Plan	   *child = res->plan.lefttree;
	ListCell   *lc;

	if (child == NULL || !IsA(child, Append))
		return false;

	if (res->plan.initPlan != NIL || res->resconstantqual != NULL || res->hashFilter)
		return false;

	foreach(lc, res->plan.targetlist)
	{
		TargetEntry *tle = (TargetEntry *) lfirst(lc);

		if (!IsA(tle->expr, Var))
			return false;
	}

	return true;
}
/*
 * "Zap" eligible Result nodes in the given plan.  A zapped Result's quals
 * and targetlist are pushed down onto its Append child, the Append is
 * marked isZapped, and the Append is returned in place of the Result.
 *
 * The function recurses through SubPlans (via the planner's subplan list),
 * Lists, Append children, SubqueryScan subplans, and the lefttree,
 * righttree and initPlan of any other node.  Returns the (possibly
 * replaced) node; NULL input yields NULL.
 *
 * XXX Note the call to planner_subplan_put_plan which modifies the
 * subplan's plan. This is unfortunate. We should look for a
 * better way.
 */
Plan *zap_trivial_result(PlannerInfo *root, Plan *plan)
{
	if (plan == NULL)
		return NULL;

	switch (nodeTag(plan))
	{
		case T_SubPlan:
			{
				SubPlan    *sp = (SubPlan *) plan;
				Plan	   *target = planner_subplan_get_plan(root, sp);

				planner_subplan_put_plan(root, sp, zap_trivial_result(root, target));
				return plan;
			}

		case T_List:
			{
				ListCell   *lc;

				foreach(lc, (List *) plan)
					lfirst(lc) = zap_trivial_result(root, (Plan *) lfirst(lc));
				return plan;
			}

		case T_Append:
			{
				Append	   *app = (Append *) plan;

				app->appendplans = (List *) zap_trivial_result(root, (Plan *) app->appendplans);
				return plan;
			}

		case T_SubqueryScan:
			{
				SubqueryScan *subq = (SubqueryScan *) plan;

				subq->subplan = zap_trivial_result(root, subq->subplan);
				return plan;
			}

		case T_Result:
			{
				Append	   *app;

				plan->lefttree = zap_trivial_result(root, plan->lefttree);
				if (!can_zap_result((Result *) plan))
					return plan;

				/* Let the Append do the Result's projection and filtering. */
				app = (Append *) plan->lefttree;
				app->plan.targetlist = plan->targetlist;
				app->plan.qual = plan->qual;
				app->isZapped = true;
				return (Plan *) app;
			}

		default:
			break;
	}

	plan->lefttree = zap_trivial_result(root, plan->lefttree);
	plan->righttree = zap_trivial_result(root, plan->righttree);
	plan->initPlan = (List *) zap_trivial_result(root, (Plan *) plan->initPlan);
	return plan;
}
/*
 * Hash a const value with GPDB's hash function
 *
 * Uses an FNV-1 CdbHash over iSegments segments and returns the result of
 * cdbhashreduce().  A NULL constant is hashed with cdbhashnull().  An
 * array-typed constant is hashed as ANYARRAYOID regardless of its element
 * type.
 */
int32
cdbhash_const(Const *pconst, int iSegments)
{
	CdbHash    *hashState = makeCdbHash(iSegments, HASH_FNV_1);

	cdbhashinit(hashState);

	if (!pconst->constisnull)
	{
		Oid			hashType;

		Assert(isGreenplumDbHashable(pconst->consttype));
		hashType = typeIsArrayType(pconst->consttype) ? ANYARRAYOID : pconst->consttype;
		cdbhash(hashState, pconst->constvalue, hashType);
	}
	else
		cdbhashnull(hashState);

	return cdbhashreduce(hashState);
}
/*
 * Hash a list of const values with GPDB's hash function
 *
 * All constants in 'plConsts' (which must be non-empty) are combined, in
 * list order, into one FNV-1 CdbHash over iSegments segments.  The result
 * of cdbhashreduce() is returned.  NULLs and array types are treated as in
 * cdbhash_const().
 */
int32
cdbhash_const_list(List *plConsts, int iSegments)
{
	CdbHash    *hashState;
	ListCell   *lc;

	Assert(0 < list_length(plConsts));

	hashState = makeCdbHash(iSegments, HASH_FNV_1);
	cdbhashinit(hashState);

	foreach(lc, plConsts)
	{
		Const	   *pconst = (Const *) lfirst(lc);
		Oid			hashType;

		Assert(IsA(pconst, Const));

		if (pconst->constisnull)
		{
			cdbhashnull(hashState);
			continue;
		}

		Assert(isGreenplumDbHashable(pconst->consttype));
		hashType = typeIsArrayType(pconst->consttype) ? ANYARRAYOID : pconst->consttype;
		cdbhash(hashState, pconst->constvalue, hashType);
	}

	return cdbhashreduce(hashState);
}