/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*-------------------------------------------------------------------------
 *
 * planwindow.c
 *	  Planning for window queries.
 *
 * Portions Copyright (c) 2007-2008, Greenplum inc
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $ID$
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "catalog/catquery.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "catalog/pg_window.h"
#include "nodes/makefuncs.h"
#include "nodes/print.h"
#include "optimizer/clauses.h"
#include "optimizer/cost.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
#include "optimizer/subselect.h"
#include "optimizer/tlist.h"
#include "optimizer/planshare.h"
#include "optimizer/var.h"
#include "parser/parse_clause.h"
#include "parser/parse_expr.h"
#include "parser/parse_oper.h"
#include "parser/parse_type.h"
#include "parser/parsetree.h"
#include "utils/debugbreak.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/selfuncs.h"

#include "cdb/cdbgroup.h"					/* cdbpathlocus_collocates() */
#include "cdb/cdbllize.h"                   /* pull_up_Flow() */
#include "cdb/cdbpath.h"
#include "cdb/cdbsetop.h"					/* make_motion... routines */
#include "cdb/cdbvars.h"

typedef enum CoplanType
{
	COPLAN_WINDOW,
	COPLAN_AGG
} CoplanType;

typedef struct Coplan
{
	CoplanType	type;
	Index		varno;
	AttrNumber	last_resno;
	int			num_aggs;
	List	   *targetnames;
	List	   *targetlist;
	int			rowkey_len;
	AttrNumber *rowkey_attrs;
	int			partkey_len;
	AttrNumber *partkey_attrs;
	int			trans_space;
} Coplan;

/* WindowInfo represents a set of coplans in the final plan for a window 
 * query.  The set contains at least one of a coplan for a Window node 
 * and a coplan for an Agg node.
 *
 * Each WindowInfo controls an array of SpecInfos all of which have the same
 * partitioning set and compatible ordering keys.  The SpecInfos are stored 
 * in WindowContext's array (nspecinfos, specinfos).  A WindowSpec controls
 * numspecindex of them beginning at firstspecindex.
 *
 * Stored in the WindowContext's array (nwindowinfos, windowinfos).
 */
typedef struct WindowInfo
{
	Index	firstspecindex;
	Index	numspecindex;
	List   *sortclause;
	int		partkey_len;
	int		rowkey_len;
	int     orderkeys_offset;
	bool	needpartkey;
	bool	needauxcount;
	
	AttrNumber *partkey_attrs;
	List	   *key_list;
	
	/* coplan assembly */
	
	struct Coplan  *window_coplan;
	struct Coplan  *agg_coplan;
} WindowInfo;


/* RefInfo holds a WindowRef and describes it in the context of the overall
 * query.  During planning, the contained WindowRef refers back to the
 * RefInfo via winindex - the position (counting from 0) of the RefInfo in
 * the WindowContext's list, refinfos.
 */
typedef struct RefInfo
{
	WindowRef  *ref;
	Bitmapset  *varset;
	Index		specindex; /* index of parent SpecInfo */
	bool		isagg;
	char		winkind;
	
	/* When isagg  */
	bool		hasinvtrans;
	bool		hasprelim;
	bool		hasinvprelim;
	
	/* When iswin = !isagg */
	bool		needcount;
	Oid			winpretype;
	Oid			winfinfunc;
	
	int			trans_space;
	
	Expr	   *resultexpr; /* Result expression for upper target list in parallel target list */
	AttrNumber	resno; /* Attribute number of result in sequential target list */
} RefInfo;

/* Macros to determine what kind of window function a RefInfo represents
 * should be used AFTER the context has been intialized by functions through
 * through assign_window_info().
 */
#define RefInfo_AggregateUnordered(rinfo, context) \
	( (rinfo)->isagg && !(rinfo)->needcount && (context)->specinfos[rinfo->specindex].order == NIL )

#define RefInfo_AggregateOrdered(rinfo, context) \
	( (rinfo)->isagg && !(rinfo)->needcount && (context)->specinfos[rinfo->specindex].order != NIL )

#define RefInfo_WindowDeferred(rinfo, context) \
	( !(rinfo)->isagg && (rinfo)->needcount )

#define RefInfo_WindowImmediate(rinfo, context) \
	( !(rinfo)->isagg && !(rinfo)->needcount )

/* SpecInfo represents a distinct, referenced WindowSpec from the query.
 * They are stored in the WindowContext's array (nspecinfos,specinfos).
 *
 * Later in planning, SpecInfo's with identical partitioning and compatible
 * ordering are grouped under a WindowInfo.
 */
typedef struct SpecInfo
{
	Index			specindex;		/* index of SpecInfo in context */
	Bitmapset	   *partset;		/* sortgrouprefs of partitioning keys */
	List		   *order;			/* ORDER BY clause */
	WindowFrame	   *frame;			/* framing clause */
	Bitmapset	   *refset;			/* indices of referencing RefInfo */
	Index			windowindex;	/* index of assigned Window node */
	int				keylevel;		/* key level of spec in Window node */
	List		   *partkey;		/* PARTITION BY clause - transient */
	/*
	 * Remaining SortClauses after removing keys that are
	 * duplicates of partitioning keys or other ORDER BY keys.
	 *
	 * This is used to sort SpecInfos and group SpecInfos into a
	 * WindowInfo.
	 */
	List           *unique_order; 
} SpecInfo;


/* WindowContext is the overall context for window planning.
 *
 */
typedef struct WindowContext
{
	List		   *upper_tlist;
	Bitmapset	   *upper_var_set;
	List		   *refinfos;
	
	int				nspecinfos;
	SpecInfo	   *specinfos;
	int				nwindowinfos;
	WindowInfo	   *windowinfos;

	int				rowkey_len;
	AttrNumber	   *rowkey_attrs;
	
	Index			coplan_count;
	
	bool			has_unordered_aggs;
	bool			has_deferred_window_fns;
	
	 /* Only for sequential plans */
	bool			original_range;
	Query		   *subquery;
	
	/* Map (varno,varattno) <--> (integer) for use in Bitmapsets recording
	 * use of vars from the window query's range table.
	 */
	int				max_varno;
	int			   *varattno_offsets;
	int			   *offset_varnos;
	int				offset_lim;
	
	/* Offset map for final targetlist translation for parallel plans. */
	AttrNumber	   *offset_upper_varattrnos;
	
	/* Target list for the "common subquery".
	 */
	List		   *lower_tlist;
	List		   *keyed_lower_tlist;
	Plan		   *subplan;
	CdbPathLocus	subplan_locus;
	List		   *subplan_pathkeys;

	/* List of PathKeys, one for each coplan. */
	List           *subplans_pathkeys;
	
		
	/* Map (ressortgrpref) --> (resno) in lower_tlist */
	int				max_sortref;
	AttrNumber	   *sortref_resno;
	
	/* Transient state for preprocess_window_tlist */
	RefInfo		   *cur_refinfo;
	List           *win_specs;
	List           *orig_tlist;
} WindowContext;

#define VarattnoAdjustment (-(FirstLowInvalidHeapAttributeNumber + 1))

/* Forward declarations (local) */

static WindowContext *newWindowContext(void);
static void deleteWindowContext(WindowContext *context);
static void build_sortref_index(List *tlist, int *max_sortref, AttrNumber **sortref_resno);
static void build_var_index(Query *parse, WindowContext *context);
static int index_of_var(Var * var, WindowContext *context);
static void preprocess_window_tlist(List *orig_tlist, WindowContext *context);
static Node * window_tlist_mutator(Node *node, WindowContext *context);
static bool window_tlist_vars_walker(Node *node, WindowContext *context);
static void inventory_window_specs(List *window_specs, WindowContext *context);
static int compare_order(List *a, List* b);
static int compare_edge(WindowFrameEdge *a, WindowFrameEdge *b);
static int compare_frame(WindowFrame *a, WindowFrame *b);
static int compare_spec_info_ptr(const void *arg1, const void *arg2);
static bool validateBound(Node *node, bool is_rows);
static WindowFrameEdge *adjustFrameBound(WindowFrameEdge *edge, bool is_rows);
static void make_lower_targetlist(Query *parse, WindowContext *context);
static void set_window_keys(WindowContext *context, int wind_index);
static void assign_window_info(WindowContext *context);
static bool is_order_prefix_of(List *sca, List *scb);
static void lookup_window_function(RefInfo *rinfo);
static Plan *plan_trivial_window_query(PlannerInfo *root, WindowContext *context, List **pathkeys_ptr);
static Plan *plan_sequential_window_query(PlannerInfo *root, WindowContext *context, List **pathkeys_ptr);
static Plan *plan_sequential_stage(PlannerInfo *root, WindowContext *context, 
		int hi_windex, int lo_windex,
		Plan *input_plan, CdbPathLocus *locus_ptr, List **pathkeys_ptr);
static Plan *assure_order(PlannerInfo *root, Plan *input_plan, List *sortclause, List **pathkeys_ptr);
static Expr * make_mergeclause(Index lftvarno, AttrNumber lftattrno, Node *rgtexpr);
static List *translate_upper_tlist_sequential(List *orig_tlist, List *window_tlist, WindowContext *context);
static Plan *plan_parallel_window_query(PlannerInfo *root, WindowContext *context, List **pathkeys_ptr);
static void plan_windowinfo_coplans(PlannerInfo *root, WindowContext *context, int window_index);
static List * plan_window_rtable(PlannerInfo* root, WindowContext *context);
static List *make_rowkey_targets(void);
static Coplan *makeCoplan(CoplanType type, WindowContext *context);
static Plan *assure_collocation_and_order(PlannerInfo *root, Plan *input_plan, 
		int partkey_len, AttrNumber *partkey_attrs, List *sortclause, 
		CdbPathLocus input_locus, CdbPathLocus *output_locus, List **pathkeys_ptr);
static AttrNumber addTargetToCoplan(Node *target, Coplan *coplan, WindowContext *context);
static Aggref* makeWindowAggref(WindowRef *winref);
static Aggref* makeAuxCountAggref(void);
static List *translate_upper_tlist_parallel(List *orig_tlist, WindowContext *context);
static Node *translate_upper_vars_sequential(Node *node, WindowContext *context);
static FromExpr *plan_window_jointree(PlannerInfo *root, WindowContext *context);
static Expr *make_window_join_term(Oid type, Index lftvarno, AttrNumber lftattrno, Index rgtvarno, AttrNumber rgtattrno, bool no_nulls);
static Oid *get_window_attr_types(List *targetlist, int nattrs, AttrNumber *attrs);
static Plan *add_join_to_wrapper(PlannerInfo *root, Plan *plan, Query *query, List *join_tlist,
								 unsigned partkey_len, 
								 const char *alias_name, List *col_names, 
								 Query *wrapper_query);
static Query *copy_common_subquery(Query *original, List *targetList);
static char *get_function_name(Oid proid, const char *dflt);


Plan *
window_planner(PlannerInfo *root, double tuple_fraction, List **pathkeys_ptr)
{
	Query		   *parse = root->parse;
	WindowContext  *context = newWindowContext();
	Plan		   *result_plan = NULL;

	/* Assert existence of windowing in query. */
	Assert(parse->targetList != NIL);
	Assert(parse->windowClause != NULL);
	Assert(parse->hasWindFuncs);	
	/* Assert no unsupported stuff */
	Assert(parse->setOperations == NULL);
	Assert(!parse->hasAggs);
	Assert(parse->groupClause == NIL);
	Assert(parse->havingQual == NULL);
	
#ifdef CDB_WINDOW_DISPLAY	
	elog_node_display(NOTICE, "Window  query target list", parse->targetList, true);
	elog_node_display(NOTICE, "Window  query window  clause", parse->windowClause, true);
#endif
	
	/* Create a map: (varno,varattno)  <--> (integer) for use in recording
	 * Var references in Bitmapsets.
	 */
	build_var_index(parse, context);

	/*
	 * Set context->win_specs and context->orig_tlist. These
	 * are needed in preprecess_window_tlist() to check NTILE function
	 * arguments.
	 */
	context->win_specs = parse->windowClause;
	context->orig_tlist = parse->targetList;
	
	/* Create a copy of the input target list for our use.  Save pointers
	 * to the contained WindowRef nodes on the side in a array of RefInfo
	 * structures.
	 */
	preprocess_window_tlist(parse->targetList, context);

	/* Create an array of SpecInfo structures corresponding to the distinct
	 * referenced WindowSpec nodes in the input. Refer back to this array
	 * from the array of RefInfo structures.
	 */
	inventory_window_specs(parse->windowClause, context);

	/* Generate the (lower) target list for the common subquery.  
	 * Importantly, the ressortgroupref values agree with those in
	 * the input (upper) target list (and all of them appear).
	 */
	make_lower_targetlist(root->parse, context);
	
	/* Divide the SpecInfos into (contiguous) groups to be handled by a
	 * single Window node and associated auxiliary stuff (in other words,
	 * by a single sort of the input) and assign a WindowInfo to represent
	 * each group.
	 */
	assign_window_info(context);
	
	Assert( context->upper_tlist != NULL );
	Assert( list_length(context->refinfos) > 0 );
	Assert( context->nspecinfos > 0 );
	
	
	if ( context->nwindowinfos == 1 && 
		!(context->has_unordered_aggs || context->has_deferred_window_fns) )
	{
		return plan_trivial_window_query(root, context, pathkeys_ptr);
	}
	else
	{
		if ( root->config->gp_enable_sequential_window_plans )
			return plan_sequential_window_query(root, context, pathkeys_ptr);
		else
			return plan_parallel_window_query(root, context, pathkeys_ptr);
	}

	/* TODO Check our API.
	 *
	 * Note: pathkeys may be an important implicit result. 
	 */
	
	deleteWindowContext(context);
	
	if ( result_plan == NULL )
	{
		ereport(ERROR,
			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			 errmsg("That windowing query is not supported yet.")));
	}
	
	return result_plan;
}

/* Create a palloc'd WindowContext structure for local use. */
static WindowContext *newWindowContext()
{
	WindowContext *context = palloc(sizeof(WindowContext));
	
	context->upper_tlist = NIL;
	context->upper_var_set = NULL;
	context->refinfos = NIL;
	context->nspecinfos = 0;
	context->specinfos = NULL;
	context->nwindowinfos = 0;
	context->windowinfos = NULL;
	context->rowkey_len = 0;
	context->rowkey_attrs = NULL;
	
	context->coplan_count = 0;
	context->has_unordered_aggs = false;
	context->has_deferred_window_fns = false;

	context->original_range = true;
	context->subquery = NULL;
	
	context->max_varno = 0;
	context->varattno_offsets = NULL;
	context->offset_varnos = NULL;
	context->offset_lim = 0;
	context->offset_upper_varattrnos = NULL;
	
	context->max_sortref = 0;
	context->sortref_resno = NULL;

	context->cur_refinfo = NULL;
	
	return context;
}

/* Delete palloc'd WindowContext and owned sub-structures. */
static void deleteWindowContext(WindowContext *context)
{
	ListCell *cell;
	
	if ( context == NULL )
		return;
	
	/* no need to free upper_tlist */
	
	bms_free(context->upper_var_set);
	
	foreach( cell, context->refinfos )
	{
		RefInfo *ref = (RefInfo *)lfirst(cell);
		/* no need to free actual ref */
		bms_free(ref->varset);
	}
	list_free_deep(context->refinfos);
	
	if ( context->specinfos != NULL )
		pfree(context->specinfos);
	
	if ( context->rowkey_attrs != NULL )
		pfree(context->rowkey_attrs);
	
	if ( context->windowinfos != NULL )
		pfree(context->windowinfos);
	
	if ( context->varattno_offsets != NULL )
		pfree(context->varattno_offsets);
	
	if ( context->offset_varnos != NULL )
		pfree(context->offset_varnos);
	
	if ( context->offset_upper_varattrnos != NULL )
		pfree(context->offset_upper_varattrnos);
	
	if ( context->sortref_resno != NULL )
		pfree(context->sortref_resno);
	
	/* TODO Clean up handling of 
	 *   lower_tlist, 
	 *   keyed_lower_tlist, 
	 *   subplan, 
	 *   subplan_pathkeys
	 * so we know whether to free them!
	 */
	
	/* don't free cur_refinfo */
	
	pfree(context);
}



/* Build an index structure to convert a (varno, varattno) to an integer
 * index and save it in the context.  The intended use of the index is
 * as follows:
 *
 *    varattno_offsets[varno] + varattno + VarattnoAdjustment  -->  index
 *
 *    offset_varnos[index]  -->  varno
 *    index - varattno_offsets[varno] - VarattnoAdjustment  --> varattno
 */
static void build_var_index(Query *parse, WindowContext *context)
{
	int i, j, n;
	int max_varno = list_length(parse->rtable);
	
	context->max_varno = max_varno;
	context->varattno_offsets = palloc0((max_varno+1)*sizeof(int));
	
	expression_tree_walker((Node*)parse->targetList,
							window_tlist_vars_walker, 
							(void*)context );
							
	for ( i = 0, n = 0; i <= max_varno; i++ )
	{
		int max_attno = context->varattno_offsets[i];
		context->varattno_offsets[i] = n;
		n += max_attno;
	}
	
	context->offset_lim = (n+1)*sizeof(int);
	context->offset_varnos = palloc0(context->offset_lim);
	
	for ( i = 1, j = 1; i <= max_varno; i++ )
	{
		for ( ; j <= context->varattno_offsets[i]; j++ )
			context->offset_varnos[j] = i;
	}
}

/* Subroutine for build_var_index() updates varattno_offsets vector to
 * record largest attribute number seen per varno (at the current level). 
 */
static bool window_tlist_vars_walker(Node *node, WindowContext *context)
{
	if ( node == NULL )
		return false;
	
	if ( IsA(node, Var) )
	{
		Var *var = (Var *) node;
		AttrNumber adjusted_attno;
		Assert( var->varno <= context->max_varno );
		if ( var->varlevelsup > 0 )
			return false;
		
		adjusted_attno = var->varattno + VarattnoAdjustment;
		if ( adjusted_attno > context->varattno_offsets[var->varno] )
			context->varattno_offsets[var->varno] = adjusted_attno;
		return false;
	}
	return expression_tree_walker(node, window_tlist_vars_walker, (void *)context);
}

/* Standard use of variable-to-offset index -- convert a Var on the lower
 * range to an integer, e.g., for use in a bit map. 
 */
static int index_of_var(Var * var, WindowContext *context)
{
	AttrNumber adjusted_attno;
	Assert( var->varno <= context->max_varno );
	
	adjusted_attno = var->varattno + VarattnoAdjustment;
	return context->varattno_offsets[var->varno] + adjusted_attno;
}


/* Build a map of sortgroupref to resno for the given target list.
 * Store its maximum sortref value and the index where specified.
 */
static void build_sortref_index(List *tlist, int *max_sortref, AttrNumber **sortref_resno)
{
	ListCell *lc;
	AttrNumber *index = NULL;
	int n = 0;
	
	foreach ( lc, tlist )
	{
		TargetEntry *tle = (TargetEntry *)lfirst(lc);
		if ( tle->ressortgroupref > n )
			n = tle->ressortgroupref;
	}

	index = (AttrNumber *)palloc0((n+1) * sizeof(AttrNumber));
	
	foreach ( lc, tlist )
	{
		TargetEntry *tle = (TargetEntry *)lfirst(lc);
		if ( tle->ressortgroupref != 0 )
			index[tle->ressortgroupref] = tle->resno;
	}
	
	*max_sortref = n;
	*sortref_resno = index;
}

/*
 * ntile_argument_walker
 *
 * Walk through the expressions in the NTILE argument. Return false
 * when the expression is a constant or the one that appears in
 * part_tlist.
 */
static bool
ntile_argument_walker(Node *node, List *part_tlist)
{
	ListCell *lc = NULL;
	
	if (node == NULL)
		return false;
	if (IsA(node, Const))
		return false;

	/*
	 * Check if node appears in part_tlist.
	 */
	foreach(lc, part_tlist)
	{
		TargetEntry *tle = (TargetEntry*)lfirst(lc);

		if (equal(tle->expr, node))
			return false;
	}
	
	if (IsA(node, Var))
		return true;
	
	return expression_tree_walker(node, ntile_argument_walker, part_tlist);
}

/*
 * check_ntile_argument
 *
 * Check expressions used in the NTILE function argument. These expressions
 * should be either a constant or expressions in PARTITION BY clauses.
 */
static void
check_ntile_argument(List *args, WindowSpec *win_spec, List *tlist)
{
	ListCell *lc;
	List *part_tlist = NIL;
	
	if (list_length(win_spec->partition) > 0)
	{
		/*
		 * Obtain the list of target entries for each expression in the
		 * PARTITION BY clause. The PARTITION BY expressions should appear
		 * in tlist.
		 */
		foreach (lc, win_spec->partition)
		{
			ListCell *tlist_lc = NULL;
			
			SortClause *sc = (SortClause *) lfirst(lc);
			Assert(IsA(sc, SortClause));

			foreach(tlist_lc, tlist)
			{
				TargetEntry *tle = (TargetEntry *)lfirst(tlist_lc);
				if (tle->ressortgroupref == sc->tleSortGroupRef)
				{
					part_tlist = lappend(part_tlist,
										 tle);
					break;
				}
			}

			if (tlist_lc == NULL)
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("PARTITION BY expression does not appear in the targetlist."),
								   errOmitLocation(true)));
		}
	}

	foreach (lc, args)
	{
		Node *node = lfirst(lc);

		if (ntile_argument_walker(node, part_tlist))
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("NTILE function argument expression should be in PARTITION BY."),
							   errOmitLocation(true)));
	}
}

/*  preprocess_window_tlist
 *
 * Stores a deep copy of the given target list in the context and takes
 * inventory of its content as follows:
 *
 *	upper_tlist		the deep copy
 *
 *	upper_var_set	Vars used outside of WindowRef nodes
 *
 *	ref_infos		per WindowRef list of RefInfo containing
 *		ref		the WindowRef in upper_tlist
 *		varset	Vars used in the WindowRefs explicit arguments
 */
static void preprocess_window_tlist(List *orig_tlist, WindowContext *context)
{
	context->cur_refinfo = NULL;
	context->upper_var_set = NULL;
	
	context->upper_tlist =  (List*)  
		expression_tree_mutator((Node*)orig_tlist,
								 window_tlist_mutator, 
								 (void*)context );
}

/* Subroutine for preprocess_window_tlist.
 *
 * Deep copy the given expression (presumed to be a part of the parse tree
 * target list for a window query) while checking validity of and recording
 * information about contained WindowRef nodes.
 */
static Node * window_tlist_mutator(Node *node, WindowContext *context)
{
	if ( node == NULL )
		return NULL;
		
	if ( IsA(node, Var) )
	{
		Var *var = (Var *)node;
		int idx = index_of_var(var, context);
		
		if ( context->cur_refinfo == NULL ) /* above all WindowRef */
		{
			if ( var->varlevelsup == 0 )
			{
				context->upper_var_set = 
					bms_add_member(context->upper_var_set, idx);
			}
		}
		else /* below a WindowRef */
		{
			if ( var->varlevelsup == 0 )
			{
				context->cur_refinfo->varset = 
					bms_add_member(context->cur_refinfo->varset, idx);
			}
			else
			{
				ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("Call to window function may not reference outer queries."),
							   errOmitLocation(true)));
			}
		}
		return (Node *) copyObject(node); /* Exact copy of the Var */
	}
	else if ( IsA(node, FuncExpr) )
	{
		/*
		 * If this function is part of the argument for a NTILE function,
		 * it is not allowed to be a volatilve function.
		 */
		if (context->cur_refinfo != NULL)
		{
			FuncExpr *expr = (FuncExpr *)node;

			if (IS_NTILE(context->cur_refinfo->winkind) &&
				(func_volatile(expr->funcid) == PROVOLATILE_VOLATILE))
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("NTILE function argument should not use volatile functions."),
								   errOmitLocation(true)));
		}
	}
	
	else if ( IsA(node, WindowRef) )
	{
		WindowRef *window_ref = (WindowRef *)node;
		WindowRef *new_ref = NULL;
		
		if ( context->cur_refinfo == NULL ) /* top-level WindowRef */
		{
			new_ref = copyObject(window_ref);

			context->cur_refinfo = palloc0(sizeof(RefInfo));			
			context->cur_refinfo->ref = new_ref;
			context->cur_refinfo->varset = NULL;
			lookup_window_function(context->cur_refinfo);
			
			new_ref->args = (List *)
				window_tlist_mutator((Node*)new_ref->args, context);

			/*
			 * If this function is a NTILE function, check if its argument
			 * contains expressions that are constant or those in the
			 * PARTITION BY clause.
			 */
			if (IS_NTILE(context->cur_refinfo->winkind))
			{
				WindowSpec *winspec = list_nth(context->win_specs, new_ref->winspec);
				
				check_ntile_argument(new_ref->args, winspec, context->orig_tlist);
			}
							
			/* Record the WindowRef and its Vars referenced set. */
			new_ref->winindex = list_length(context->refinfos);
			context->refinfos = lappend(context->refinfos, context->cur_refinfo);
			
			/* Clean up context. */
			context->cur_refinfo = NULL;
			
			return (Node*)new_ref;
		}
		else
		{
			ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Window function calls may not be nested."),
						   errOmitLocation(true)));
		}
	}
	
	return expression_tree_mutator(node, window_tlist_mutator, (void*) context);
}


/* inventory_window_specs
 * 
 * Take inventory of the supplied list of WindowSpecs in the given context. 
 * Add an array of distinct SpecInfo to the context.  Exclude SpecInfos that
 * are not referenced by any WindowRefs.  Afterward, the specindex field of
 * a RefInfo identifies the parent SpecInfo of its WindowRef.
 *
 * Note that preprocess_window_tlist() must already have run in order to
 * fill in the list refinfos list in context.
 */
static void inventory_window_specs(List *window_specs, WindowContext *context)
{
	ListCell *lcs, *lcp, *lcr;
	SpecInfo **index;
	int i, j, ndistinct;
	unsigned nspec, nextra;
	SpecInfo *specinfos;
	
	/* Make a preliminary pass over the refinfos to count how many
	 * extra specinfos we might need to add to those specified
	 * in the parse tree.
	 */
	nextra = 0;
	foreach ( lcr, context->refinfos )
	{
		RefInfo *rinfo = (RefInfo *)lfirst(lcr);
		if ( rinfo->winkind == WINKIND_LEAD || rinfo->winkind == WINKIND_LAG )
			nextra++;
	}
	
	/* Construct preliminary array of SpecInfo structs, one per WindowSpec
	 * plus possible extras as counted above.  Note that, at this point, 
	 * the order of the initial entries is the same as the entries in the
	 * Query's windowClause list.  Space for extras is allocated at the
	 * end and may be used as needed.
	 */
	nspec = list_length(window_specs);
	specinfos = palloc0((nspec+nextra) * sizeof(SpecInfo));
	i = 0;
	foreach ( lcs, window_specs )
	{
		WindowSpec *spec = (WindowSpec *) lfirst(lcs);
		Bitmapset *map = NULL;
		ListCell *lc;
		Bitmapset *orderset = NULL;
		
		foreach ( lcp, spec->partition )
		{
			SortClause *sc = (SortClause *) lfirst(lcp);
			map = bms_add_member(map, sc->tleSortGroupRef);
		}
		specinfos[i].specindex = i;
		specinfos[i].partset = map;
		specinfos[i].order = spec->order;
		specinfos[i].frame = spec->frame;
		specinfos[i].refset = NULL;
		specinfos[i].windowindex = 0;
		specinfos[i].keylevel = 0;
		specinfos[i].partkey = spec->partition;

		/* Construct unique_order for each SpecInfo by removing
		 * keys that are duplicates of partitioning keys and other
		 * ORDER BY keys.
		 */
		specinfos[i].unique_order = NIL;
		
		foreach (lc, specinfos[i].order)
		{
			SortClause *sc = (SortClause *)lfirst(lc);
			if (!bms_is_member(sc->tleSortGroupRef, specinfos[i].partset))
			{
				if (!bms_is_member(sc->tleSortGroupRef, orderset))
				{
					specinfos[i].unique_order =
						lappend(specinfos[i].unique_order, sc);
					orderset =
						bms_add_member(orderset, sc->tleSortGroupRef);
				}
			}
		}

		bms_free(orderset);

		i++;
	}
	
	/* Note which WindowRefs each WindowSpec covers by saving the indexes
	 * of their RefInfos.  Redirect WindowRefs with special framing needs
	 * to one of the extra SpecInfos allocated above.  In addition, note 
	 * in the context any occurence of a window function that requires an 
	 * auxiliary aggregate coplan. 
	 */
	i = 0;
	foreach ( lcr, context->refinfos )
	{
		RefInfo *rinfo = (RefInfo *)lfirst(lcr);
		WindowRef *ref = rinfo->ref;
		int sindex = ref->winspec;
		SpecInfo *sinfo = &specinfos[sindex];
		
		/* If Special Framing ... */
		if ( rinfo->winkind == WINKIND_LEAD || rinfo->winkind == WINKIND_LAG )
		{ 
			Node *offset = NULL;
			WindowFrame * frame = NULL;
			SpecInfo *xinfo = sinfo;
			
			/* Add a new "extra" SpecInfo for the specially framed window
			 * reference.  Note that we could be better by recognizing a
			 * previously added "extra" that meets our needs, but we
			 * don't do that yet.
			 */
			sindex = nspec++;
			sinfo = &specinfos[sindex];
			memcpy(sinfo, xinfo, sizeof(SpecInfo));
			sinfo->specindex = sindex;
			sinfo->refset = NULL; /* MPP-4836 */
			
			Assert( sinfo->frame == NULL );
			
			/* At the moment, the only special framing requirements are for
			 * LEAD and LAG.  These carry an offset as their second argument
			 * and, by now, the offset has be resolved to a Const node, if
			 * possible.
			 */	
			if ( list_length(ref->args) > 1 )
			{
				offset = (Node*)list_nth(ref->args,1);
			}
			else
			{
				Const *c = (Const*)makeNode(Const);
				c->consttype = INT8OID;
				c->constlen = 8;
				c->constvalue = 1;
				c->constisnull = false;
				c->constbyval = true;
				offset = (Node*)c;
			}
			
			frame = makeNode(WindowFrame);
			frame->system_generated = true;
			frame->is_between = true;
			frame->is_rows = true;
			frame->trail = makeNode(WindowFrameEdge);
			frame->lead = makeNode(WindowFrameEdge);

			switch ( rinfo->winkind )
			{
			case WINKIND_LAG:
				if ( IsA(offset,Const) )
					frame->trail->kind = frame->lead->kind = WINDOW_BOUND_PRECEDING;
				else
					frame->trail->kind = frame->lead->kind = WINDOW_DELAYED_BOUND_PRECEDING;
				break;
				
			case WINKIND_LEAD:
				if ( IsA(offset,Const) )
					frame->trail->kind = frame->lead->kind = WINDOW_BOUND_FOLLOWING;
				else
					frame->trail->kind = frame->lead->kind = WINDOW_DELAYED_BOUND_FOLLOWING;
				break;
				
			default:
				elog(ERROR, "internal window framing error");
			}
			
			frame->trail->val = frame->lead->val = offset;
			sinfo->frame = frame;
		}
		else if ( sinfo->frame != NULL )
		{
			if (sinfo->frame->lead &&
				sinfo->frame->lead->kind != WINDOW_DELAYED_BOUND_PRECEDING &&
				sinfo->frame->lead->kind != WINDOW_DELAYED_BOUND_FOLLOWING)
				sinfo->frame->lead = adjustFrameBound(sinfo->frame->lead, sinfo->frame->is_rows);
			if (sinfo->frame->trail &&
				sinfo->frame->trail->kind != WINDOW_DELAYED_BOUND_PRECEDING &&
				sinfo->frame->trail->kind != WINDOW_DELAYED_BOUND_FOLLOWING)
				sinfo->frame->trail = adjustFrameBound(sinfo->frame->trail, sinfo->frame->is_rows);
		}
		
		sinfo->refset = bms_add_member(sinfo->refset, i);
		
		rinfo->specindex = sinfo->specindex; /* in case this is an extra */
		
		if ( rinfo->isagg && sinfo->order == NIL )
			context->has_unordered_aggs = true;
		if ( rinfo->needcount && ! rinfo->isagg )
			context->has_deferred_window_fns = true;
		
		i++;
	}	
	
	/* Make an index to group by partition, order, and frame. */
	index = (SpecInfo**)palloc(nspec * sizeof(SpecInfo**));
	for ( i = 0; i < nspec; i++ )
		index[i] = &specinfos[i];
	qsort(index, nspec, sizeof(SpecInfo*), compare_spec_info_ptr);
	
	
	
	/* Identify distinct, referenced window specs and bubble up their
	 * refsets. Begin by finding the first referenced window spec and
	 * mark it to be the first  distinct representative window spec.  
	 * 
	 * Though not strictly necessary, we mark unreferenced window specs 
	 * with an index of nspec.
	 *
	 * In the second loop, j (via the index) indexes the current distinct 
	 * representative SpecInfo while i indexes the next candidate.
	 *
	 * Since processing moves forward through the index, we write the
	 * answer in its leading elements.
	 */
	for ( j = 0; j < nspec; j++ )
	{
		if ( ! bms_is_empty(index[j]->refset) )
			break;
		else
			index[j]->specindex = nspec; /* don't use */
	}
	Assert ( j < nspec ); /* there is a referenced specinfo */
	
	ndistinct = 0;
	index[j]->specindex = ndistinct;
	index[ndistinct++] = index[j];
	for ( i = j+1; i < nspec; i++ )
	{
		if ( compare_spec_info_ptr(&index[j],&index[i]) != 0 )
		{
			if ( bms_is_empty(index[i]->refset) )
			{
				index[i]->specindex = nspec; /* don't use */
				continue;
			}
			/* SpecInfo at index[i] is start of next run. */
			j = i;
			index[j]->specindex = ndistinct;
			index[ndistinct++] = index[j];
		}
		else
		{
			/* SpecInfo at index[i] is duplicate of last. */
			index[j]->refset = bms_union(index[j]->refset, index[i]->refset);
			index[i]->specindex = index[j]->specindex;
		}
	}
	
	/* Construct array of distinct windowspecs in the context. */
	context->nspecinfos = ndistinct;
	context->specinfos = palloc0(ndistinct * sizeof(SpecInfo));
	for ( j = 0; j < ndistinct; j++ )
	{
		Assert( index[j]->specindex == j );
		memcpy(&context->specinfos[j], index[j], sizeof(SpecInfo));
	}
	
	/* Adjust RefInfo specindex to index the parent SpecInfo in context. 
	 * Update summary info in the parent.
	 */
	foreach ( lcr, context->refinfos )
	{
		RefInfo *rinfo = (RefInfo *)lfirst(lcr);
		SpecInfo *sinfo = & specinfos[rinfo->specindex];
		
		Assert( sinfo->specindex < ndistinct );
		rinfo->specindex = sinfo->specindex;
	}		
	pfree(specinfos);
}


/* assign_window_info
 *
 * Assigns a WindowInfo to each group of compatible SpecInfo structures in
 * the context. The groups will be contiguous in array context->specinfos
 * due to the sort order established by inventory_window_specs.
 *
 * Ultimately each WindowInfo will correspond to a Window node and the
 * notion of "compatible" is based on the requirements of Window nodes.
 */
static void assign_window_info(WindowContext *context)
{
	int i, j, k, n;
	Index current;
	ListCell *lc;
	
	Assert( context->nspecinfos > 0 );
	
	/* First divide the previously produced, sorted list of distinct
	 * SpecInfo into groups with matching partitioning requirements 
	 * and so that each group member's ordering key is a prefix of 
	 * each following member's keys.
	 */
	context->specinfos[0].windowindex = current = 0;	
	for ( i = 1, j = 0; i < context->nspecinfos; j = i, i++ )
	{
		if ( ! bms_equal(context->specinfos[j].partset, context->specinfos[i].partset) 
			 || ! is_order_prefix_of(context->specinfos[j].unique_order, context->specinfos[i].unique_order) )
		{
			current++;
		}
		context->specinfos[i].windowindex = current;
	}
	n = current + 1;
	
	/* Next allocate a WindowInfo for each group and setup its requirements.
	 */
	context->nwindowinfos = n;
	context->windowinfos = (WindowInfo *)palloc0(n * sizeof(WindowInfo));
	for ( i = k = 0; i < n; i++ )
	{
		SpecInfo *final_sinfo = NULL;
		WindowInfo *winfo = &context->windowinfos[i];
		int sortgroupref = 0;
		
		j = k; /* j indexes first SpecInfo in this WindowInfo */
		
		for ( ; k < context->nspecinfos; k++ )
		{
			SpecInfo *sinfo = &context->specinfos[k];
			
			if ( sinfo->windowindex != i )
				break; /* k indexes first SpecInfo in next WindowInfo */
			final_sinfo = sinfo; /* has longest sort key so far */
		}
		
		Assert( final_sinfo != NULL );

		winfo->firstspecindex = j;
		winfo->numspecindex = k-j;

		winfo->sortclause = NIL;

		/* Append part keys into sortclause in the order defined by
		 * final_sinfo->partset. This guarantees that same part keys
		 * in all WindowInfos are in the same order.
		 */
		sortgroupref = bms_first_from(final_sinfo->partset, 0);
		while (sortgroupref >= 0)
		{
			foreach (lc, final_sinfo->partkey)
			{
				SortClause *sc = (SortClause *)lfirst(lc);
				
				if (sc->tleSortGroupRef == sortgroupref)
				{
					winfo->sortclause = lappend(winfo->sortclause, sc);
					break;
				}
			}

			Assert(lc != NULL);
			
			sortgroupref = bms_first_from(final_sinfo->partset, sortgroupref+1);
		}
		
		winfo->sortclause = list_concat(winfo->sortclause,
										final_sinfo->unique_order);
		winfo->partkey_len = list_length(final_sinfo->partkey);
		winfo->orderkeys_offset = list_length(final_sinfo->partkey);
	}
	
	/* Set up the partitioning and ordering key levels (WindowKeys)  */
	for ( i = 0; i < context->nwindowinfos; i++ )
	{	
		set_window_keys(context, i);
	}
	
	foreach ( lc, context->refinfos )
	{
		RefInfo *rinfo = (RefInfo*)lfirst(lc);
		SpecInfo *sinfo = &context->specinfos[rinfo->specindex];
		WindowInfo *winfo = &context->windowinfos[sinfo->windowindex];
		
		if ( sinfo->keylevel < 0 )
		{
			/* Unordered */
			Assert( sinfo->order == NIL );
			
			rinfo->ref->winlevel = list_length (winfo->key_list);
			if ( rinfo->isagg )
				winfo->needpartkey = true; /* unordered aggregation */
		}
		else
		{
			/* Ordered */
			Assert( sinfo->order != NIL );
			
			rinfo->ref->winlevel = sinfo->keylevel;
		}
		
		/* Check for window function in need of auxiliary count. */
		if ( !rinfo->isagg && rinfo->needcount )
		{
			winfo->needpartkey = true;
			winfo->needauxcount = true;
		}
	}
}


static bool is_order_prefix_of(List *sca, List *scb)
{
	ListCell *lca, *lcb;
	
	if ( list_length(sca) > list_length(scb) )
		return false;
	
	forboth(lca, sca, lcb, scb)
	{
		if ( !equal(lfirst(lca),lfirst(lcb)) )
			return false;
	}
	return true;
}



/* Comparision functions for local use */

static int compare_spec_info_ptr(const void *arg1, const void *arg2)
{
	int n;
	SpecInfo *a = *(SpecInfo **)arg1;
	SpecInfo *b = *(SpecInfo **)arg2;
	
	/* partition */
	n = bms_compare(a->partset, b->partset);
	if ( n != 0 ) 
		return n;
	
	/* order */
	n = compare_order(a->unique_order, b->unique_order);
	if ( n != 0 ) 
		return n;
	
	n = compare_order(a->order, b->order);
	if ( n != 0 ) 
		return n;

	/* frame */
	return compare_frame(a->frame, b->frame);
}

static int compare_order(List *a, List* b)
{
	ListCell *lca, *lcb;
	int na, nb;
	
	forboth ( lca, a, lcb, b )
	{
		SortClause *sca = (SortClause *)lfirst(lca);
		SortClause *scb = (SortClause *)lfirst(lcb);
		
		if ( sca->tleSortGroupRef < scb->tleSortGroupRef )
			return -1;
		else if ( sca->tleSortGroupRef > scb->tleSortGroupRef )
			return 1;
		else if ( sca->sortop < scb->sortop )
			return -1;
		else if ( sca->sortop > scb->sortop )
			return 1;
	}
	na = list_length(a);
	nb = list_length(b);
	if ( na < nb )
		return -1;
	else if ( na > nb )
		return 1;
	return 0;
}

static int compare_frame(WindowFrame *a, WindowFrame *b)
{
	int n;
	
	if ( a == b )
		return 0;
	else if ( a == NULL )
		return -1;
	else if ( b == NULL )
		return 1;
	
	if ( a->is_rows && ! b->is_rows )
		return -1;
	else if ( b->is_rows && ! a->is_rows )
		return 1;
	
	n = compare_edge(a->trail, b->trail);
	if ( n != 0 )
		return n;
		
	n = compare_edge(a->lead, b->lead);
	if ( n != 0 )
		return n;
	
	if ( a->exclude < b->exclude )
		return -1;
	else if ( a->exclude > b->exclude )
		return 1;
	
	return 0;
}

static int compare_edge(WindowFrameEdge *a, WindowFrameEdge *b)
{
	if ( a->kind < b->kind )
		return -1;
	else if ( a->kind > b->kind )
		return 1;
	else if ( equal(a->val, b->val) )
		return 0;
	/* When the bounds aren't equal (since they may be expressions) we
	 * just compare pointer values a->val and b->val.  The resulting order,
	 * though arbitrary, is consistent.
	 */
	else if ( ((void*)a->val) < ((void*)b->val) )
		return -1;

	return 1;
}


/* If the node is a Const with a value and we understand the value
 * well enough, make sure it is non-negative.
 *
 * The result is true, if the bound is constant and valid.
 * The result is false if we must delay checking until run time.
 * The function issues an error, if the bound is constant and invalid.
 */
static bool validateBound(Node *node, bool is_rows)
{
	Const *bound;
	Operator tup;
	Type typ;
	Datum zero;
	Oid funcoid;
	bool isNeg;
	
	if ( node == NULL || !IsA(node,Const) )
		return FALSE; /* Can't check here, wait until run time. */
	
	bound = (Const*)node;
	
	tup = ordering_oper(bound->consttype, TRUE);
	if ( !HeapTupleIsValid(tup) )
		return FALSE ; /* Can't check here, wait until run time. */
	
	typ = typeidType(bound->consttype);
	funcoid = oprfuncid(tup);
	zero = stringTypeDatum(typ, "0", exprTypmod(node));
	isNeg = OidFunctionCall2(funcoid, bound->constvalue, zero);
	
	if ( isNeg )						
		ereport(ERROR,
				(errcode(ERROR_INVALID_WINDOW_FRAME_PARAMETER),
				 errmsg("%s parameter cannot be negative", is_rows ? "ROWS" : "RANGE"),
						   errOmitLocation(true)));
	
	ReleaseOperator(tup);
	ReleaseType(typ);
	
	return TRUE;
}


/* Make any necessary adjustments to an ordinary window frame edge to
 * prepare it for later planning stages and for execution.
 *
 * Currently this is just resetting the window frame bound to delayed
 * if the value parameter can't be validated at planning time.  The
 * function issues an error if the value parameter is a negative 
 * constant.
 *
 * The function assumes the frame comes from the parser/rewriter so 
 * it will reject a delayed frame bound.  So don't use this to adjust 
 * edges of special frames such as those created for LAG/LEAD functions.
 */
static WindowFrameEdge *adjustFrameBound(WindowFrameEdge *edge, bool is_rows)
{
	WindowBoundingKind kind;
	
	if ( edge == NULL )
		return NULL;
		
	kind = edge->kind;
	
	if ( kind == WINDOW_BOUND_PRECEDING || kind == WINDOW_BOUND_FOLLOWING )
	{
		if ( validateBound(edge->val, is_rows) )
			;
		else
		{
			edge = copyObject(edge);
			if ( kind == WINDOW_BOUND_PRECEDING )
				edge->kind = WINDOW_DELAYED_BOUND_PRECEDING;
			else
				edge->kind = WINDOW_DELAYED_BOUND_FOLLOWING;
		}
	}
	else if ( edge->kind == WINDOW_BOUND_PRECEDING 
			|| edge->kind == WINDOW_BOUND_FOLLOWING
			|| edge->val != NULL )
	{
		elog(ERROR,"invalid window frame edge");
	}
	
	return edge;
}


/*---------------
 * make_lower_targetlist
 *
 * When window_planner inserts Window, Aggregate, or Result plan nodes
 * above the result of query_planner, we may want to pass a different
 * target list to query_planner than the outer plan nodes should have.
 * This routine generates the correct target list for the subplan.
 *
 * The initial target list passed from the parser already contains entries
 * for all PARTITION BY and ORDER BY expressions. We flatten all expressions
 * except these into their component variables; the other expressions
 * will be computed by the inserted nodes rather than by the subplan.
 * For example, given a query like
 *		SELECT a+b,SUM(c+d) OVER (ORDER BY a+b) FROM table;
 * we want to pass this targetlist to the subplan:
 *		a,b,c,d,a+b
 * where the a+b target will be used by the Sort/Window step, and the
 * other targets will be used for computing the final results.	(In the
 * above example we could theoretically suppress the a and b targets and
 * pass down only c,d,a+b, but it's not really worth the trouble to
 * eliminate simple var references from the subplan.  We will avoid doing
 * the extra computation to recompute a+b at the outer level; see
 * replace_vars_with_subplan_refs() in setrefs.c.)
 *
 * If we are grouping or aggregating, *and* there are no non-Var grouping
 * expressions, then the returned tlist is effectively dummy; we do not
 * need to force it to be evaluated, because all the Vars it contains
 * should be present in the output of query_planner anyway.
 *
 * The targetlist to be passed to the subplan goes in context->lower_tlist.
 *---------------
 */
static void
make_lower_targetlist(Query *parse,
					  WindowContext *context)
{
	List	   *tlist = parse->targetList;
	List	   *extravars = NIL;
	List	   *lower_tlist;
	bool		need_tlist_eval;
	int			i;
	SortClause *dummy;
	ListCell   *lc;

	Assert ( parse->hasWindFuncs );
	
	/* Start with a "flattened" tlist (having just the vars mentioned in 
	 * the targetlist or the window clause --- but no upper-level Vars; 
	 * they will be replaced  by Params later on).
	 */
	lower_tlist = flatten_tlist(tlist);
	
	/* Make sure Vars that may only occur in expression defining 
	 * delayed window frame edge bounds are represented.
	 */
	for ( i = 0; i < context->nspecinfos; i++ )
	{
		WindowFrame *f = context->specinfos[i].frame;
		if ( f == NULL )
			continue;
			
		if ( window_edge_is_delayed(f->trail) )
		{
			extravars = list_concat(extravars, 
							pull_var_clause(f->trail->val, false));
		}
		if ( window_edge_is_delayed(f->lead) )
		{
			extravars = list_concat(extravars, 
							pull_var_clause(f->lead->val, false));
		}
	}
	lower_tlist = add_to_flat_tlist(lower_tlist, extravars, false /* resjunk */);
	list_free(extravars);

	need_tlist_eval = false;
	
	/* Find or add target list entries for partitioning or ordering exprs
	 * mentioned in window specs to be fed by this targetlist. 
	 */
	dummy = makeNode(SortClause);
	for ( i = 0; i < context->nspecinfos; i++ )
	{
		TargetEntry *tle;
		int sortgroupref;
		List *extra_tles = NIL;
		
		bms_foreach ( sortgroupref, context->specinfos[i].partset )
		{
			dummy->tleSortGroupRef = sortgroupref;
			tle = get_sortgroupclause_tle(dummy, tlist);
			extra_tles = lappend(extra_tles, tle);
		}
		foreach ( lc, context->specinfos[i].order )
		{
			SortClause *sc = (SortClause *) lfirst(lc);
			tle = get_sortgroupclause_tle(sc, tlist);
			extra_tles = lappend(extra_tles, tle);
		}
			
		foreach ( lc, extra_tles )
		{
			ListCell *lct;
			TargetEntry *te;
			
			tle = (TargetEntry *)lfirst(lc);
			
			foreach (lct, lower_tlist)
			{
				te = (TargetEntry *)lfirst(lct);
				if (equal(tle->expr, te->expr) && 
					(te->ressortgroupref == tle->ressortgroupref ||
					 te->ressortgroupref == 0))
				{
					break;
				}
			}
			
			if ( lct != NULL )
			{
				/* Found one. */
				if ( te->ressortgroupref != tle->ressortgroupref )
				{
					Assert( te->ressortgroupref == 0 );
					te->ressortgroupref = tle->ressortgroupref;
				}
			}
			else
			{
				/* Need to add one. */
				te = copyObject(tle);
				te->resno = list_length(lower_tlist) + 1;
				lower_tlist = lappend(lower_tlist, te);
				/* NOTE that the addition must be an expr other than 
				 *  a simple Var.  Do we care? */
				need_tlist_eval = true;
			}
		}
		list_free(extra_tles);
	}
	pfree(dummy);
	dummy = NULL;
	
	build_sortref_index(lower_tlist, &context->max_sortref, &context->sortref_resno);

	context->lower_tlist = lower_tlist;
}


static void set_window_keys(WindowContext *context, int wind_index)
{
	WindowInfo *winfo;
	int i, j;
	int skoffset, nextsk;
	ListCell *lc;
	int nattrs;
	AttrNumber* sortattrs = NULL;
	Oid* sortops = NULL;
	
	/* results  */
	int partkey_len = 0;
	AttrNumber *partkey_attrs = NULL;
	List *window_keys = NIL;
	
	Assert( 0 <= wind_index && wind_index < context->nwindowinfos );
	
	winfo = &context->windowinfos[wind_index];

	/* Translate the sort clause to attribute numbers. */
	nattrs = list_length(winfo->sortclause);
	if ( nattrs > 0 )
	{
		sortattrs = (AttrNumber *)palloc(nattrs*sizeof(AttrNumber));
		sortops = (Oid *)palloc(nattrs*sizeof(Oid));
		i = 0;
		foreach ( lc, winfo->sortclause )
		{
			SortClause *sc = (SortClause *)lfirst(lc);
			sortattrs[i] = context->sortref_resno[sc->tleSortGroupRef];
			sortops[i] = sc->sortop;
			i++;
		}
	}
	
	/* Make a separate copy of just the partition key. */
	if ( winfo->partkey_len > 0 )
	{
		partkey_len = winfo->partkey_len;
		partkey_attrs = (AttrNumber *)palloc(partkey_len*sizeof(AttrNumber));
		for ( i = 0; i < partkey_len; i++ )
			partkey_attrs[i] = sortattrs[i];
	}
	
	/* Careful.  Within sort key, parition key may overlap order keys. */
	nextsk = skoffset = winfo->orderkeys_offset;
	
	/* Make a WindowKey per SpecInfo. */
	for ( i = 0; i < winfo->numspecindex; i++ )
	{
		WindowKey *wkey;
		SpecInfo *sinfo = &context->specinfos[winfo->firstspecindex + i];
		int keylen = list_length(sinfo->order);
		
		if ( keylen == 0 )
		{
			if ( sinfo->frame != NULL )
			{
				ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("invalid window specification"),
					 errhint("Only ordered windows may specify ROWS or RANGE framing."),
							   errOmitLocation(true)));
			}

			sinfo->keylevel = -1;  /* No ordering key, just partition key. */
			continue;
		}

		keylen = list_length(sinfo->unique_order);

		wkey = makeNode(WindowKey);
		wkey->numSortCols = (keylen + skoffset) - nextsk;
		
		Assert( wkey->numSortCols >= 0 );
		
		if (  wkey->numSortCols > 0 )
		{
			wkey->sortColIdx = (AttrNumber*)palloc(wkey->numSortCols * sizeof(AttrNumber));
			wkey->sortOperators = (Oid*)palloc(wkey->numSortCols * sizeof(Oid));
			
			for ( j = 0; j < wkey->numSortCols; j++ )
			{
				wkey->sortColIdx[j] = sortattrs[nextsk];
				wkey->sortOperators[j] = sortops[nextsk]; /* TODO SET THIS CORRECTLY!!! */
				nextsk++;
			}
		}

		else
		{
			SortClause *sc;
			
			/* Copy the first ORDER BY key into SortCols. */
			wkey->numSortCols = 1;
			wkey->sortColIdx = (AttrNumber *)palloc(sizeof(AttrNumber));
			wkey->sortOperators = (Oid*)palloc(sizeof(Oid));
			sc = (SortClause *)linitial(sinfo->order);
			wkey->sortColIdx[0] = context->sortref_resno[sc->tleSortGroupRef];
			wkey->sortOperators[0] = sc->sortop;
		}
		
		wkey->frame = copyObject(sinfo->frame);
		sinfo->keylevel = list_length(window_keys); /* WindowKey position. */
		window_keys = lappend(window_keys,  wkey);
	}
	
	winfo->partkey_attrs = partkey_attrs;
	winfo->key_list = window_keys;
}

/* Fill in the fields of the given RefInfo that characterize the window 
 * function represented by its WindowRef. 
 */
static void
lookup_window_function(RefInfo *rinfo)
{
	HeapTuple	tuple;
	Form_pg_proc proform;
	bool isagg, iswin;
	cqContext	*procqCtx;
	Oid transtype = InvalidOid;
	
	Oid fnoid = rinfo->ref->winfnoid;
	
	/* pg_proc */
	procqCtx = caql_beginscan(
			NULL,
			cql("SELECT * FROM pg_proc "
				" WHERE oid = :1 ",
				ObjectIdGetDatum(fnoid)));

	tuple = caql_getnext(procqCtx);

	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for function %u", fnoid);
	proform = (Form_pg_proc) GETSTRUCT(tuple);
	isagg = proform->proisagg;
	iswin = proform->proiswin;
	
	if ( (!isagg) && (!iswin) )
	{
		ereport(ERROR,
			(errcode(ERRCODE_SYNTAX_ERROR),
			 errmsg("can not call ordinary function, %s, as window function", NameStr(proform->proname)),
					   errOmitLocation(true)));
	}
	caql_endscan(procqCtx);
	
	Assert( isagg != iswin );
	
	if ( isagg )
	{
		Form_pg_aggregate		 aggform;
		cqContext				*aggcqCtx;

		aggcqCtx = caql_beginscan(
				NULL,
				cql("SELECT * FROM pg_aggregate "
					" WHERE aggfnoid = :1 ",
					ObjectIdGetDatum(fnoid)));
		
		tuple = caql_getnext(aggcqCtx);

		if (!HeapTupleIsValid(tuple))
			elog(ERROR, "cache lookup failed for aggregate function %u", fnoid);
		aggform = (Form_pg_aggregate) GETSTRUCT(tuple);
		
		rinfo->isagg = true;
		rinfo->winkind = WINKIND_AGGREGATE;
		rinfo->hasinvtrans = (aggform->agginvtransfn != InvalidOid);
		rinfo->hasprelim = (aggform->aggprelimfn != InvalidOid);
		rinfo->hasinvprelim = (aggform->agginvprelimfn != InvalidOid);
		transtype = aggform->aggtranstype;

		caql_endscan(aggcqCtx);
		
		/*
		 * If the transition type is pass-by-reference, note the estimated 
		 * size of the value itself, plus palloc overhead.
		 */
		if (!get_typbyval(transtype))
		{
			int32		avgwidth;

			avgwidth = get_typavgwidth(transtype, -1);
			avgwidth = MAXALIGN(avgwidth);

			rinfo->trans_space += avgwidth + 2 * sizeof(void *);
		}
		
	}
	else /* iswin */
	{
		Form_pg_window	 winform;
		cqContext		*wincqCtx;

		wincqCtx = caql_beginscan(
				NULL,
				cql("SELECT * FROM pg_window "
					" WHERE winfnoid = :1 ",
					ObjectIdGetDatum(fnoid)));

		tuple = caql_getnext(wincqCtx);

		if (!HeapTupleIsValid(tuple))
			elog(ERROR, "cache lookup failed for window function %u", fnoid);
		winform = (Form_pg_window) GETSTRUCT(tuple);

		rinfo->winkind = winform->winkind;
		rinfo->needcount = winform->wincount;
		rinfo->winpretype = winform->winpretype;
		rinfo->winfinfunc = winform->winfinfunc;		

		caql_endscan(wincqCtx);
	}
}

/*
 *
 */
static Plan *plan_common_subquery(PlannerInfo *root, List *lower_tlist,
								  List *order_hint,
								  CdbPathLocus *result_locus,
								  List **result_pathkeys
								  )
{
	Plan *result_plan = NULL;
	Path *cheapest_path = NULL;
	Path *sorted_path = NULL;
	Path *best_path = NULL;
	double num_groups = 0.0;
	QualCost tlist_cost;
	
	root->query_pathkeys = NIL;
	root->group_pathkeys = NIL;
	root->sort_pathkeys = NIL;
	
	/* If an order hint is specified, try for it.   */
	if ( order_hint != NIL )
	{
		root->query_pathkeys = make_pathkeys_for_sortclauses(order_hint, lower_tlist);
		root->sort_pathkeys = root->query_pathkeys;
	}


	/* Generate the best unsorted and presorted paths for this Query. */
	query_planner(root, lower_tlist, 0.0, 
				  &cheapest_path, &sorted_path, &num_groups);
					  
	if ( order_hint != NIL &&
		 sorted_path != NULL &&
		 (	CdbPathLocus_IsEntry(sorted_path->locus) ||
			CdbPathLocus_IsSingleQE(sorted_path->locus) ) )
	{
		best_path = sorted_path;
	}
	else
	{
		best_path = cheapest_path;
	}
	
	/* Make the plan. */
	result_plan = create_plan(root, best_path);
	result_plan = plan_pushdown_tlist(result_plan, lower_tlist);
	
	Assert(result_plan->flow);

	/* Account for the cost of evaluation of the lower tlist. */
	cost_qual_eval(&tlist_cost, lower_tlist, root);
	result_plan->startup_cost += tlist_cost.startup;
	result_plan->total_cost +=
		tlist_cost.startup +
		tlist_cost.per_tuple * result_plan->plan_rows;

	/* Set up results. */
	*result_locus = best_path->locus;
	*result_pathkeys = best_path->pathkeys;	
	return result_plan;
}


/* Assure that the given input plan meets the given distribution and sort order
 * requirements -- either by observing that it does or by adding Motion and/or 
 * Sort nodes to meet the requirements.
 *
 * Note that the function currently won't redistribute a plan that meets the
 * collocation requirement, e.g., since a singleton locus collocates on any 
 * partition key, the function won't "spread" such a plan's result across the 
 * segments.  Perhaps we should reconsider this.
 *
 * Returns the input plan, possibly with new nodes on top.  Input and output
 * must have flow declarations.
 *
 * The output locus for the result plan is also returned if output_locus is not NULL.
 */
Plan *assure_collocation_and_order(
		PlannerInfo *root,
		Plan *input_plan,
		int partkey_len,
		AttrNumber *partkey_attrs,
		List *sortclause,
		CdbPathLocus input_locus,
		CdbPathLocus *output_locus,  /*OUT*/
		List **pathkeys_ptr /*OUT*/
		)
{
	Plan *result_plan;
	List *sort_pathkeys = NULL;
	double motion_cost_per_row = (gp_motion_cost_per_row > 0.0) ?
					gp_motion_cost_per_row :
					2.0 * cpu_tuple_cost;

	Assert( input_plan && (input_plan->flow || IsA(input_plan, Motion)) );
	Assert( !CdbPathLocus_IsNull(input_locus));
	Assert( pathkeys_ptr && (*pathkeys_ptr == NIL || IsA(*pathkeys_ptr, List)) );
	
	result_plan = input_plan;
	if (output_locus != NULL)
		*output_locus = input_locus;
	
	if ( sortclause != NIL )
	{
		sort_pathkeys = make_pathkeys_for_sortclauses(sortclause, input_plan->targetlist);
		if ( root != NULL )
			sort_pathkeys = canonicalize_pathkeys(root, sort_pathkeys);
		Assert(sort_pathkeys != NULL);
	}
	
	if ( partkey_len == 0 ) /* Plan for single process locus. */
	{
		/* Assure sort order first */
		if(sort_pathkeys != NULL)
		{
			if(!pathkeys_contained_in(sort_pathkeys, *pathkeys_ptr))
			{
				result_plan = (Plan *) make_sort_from_sortclauses(root, sortclause, result_plan);
				*pathkeys_ptr = sort_pathkeys;
				mark_sort_locus(result_plan);
			}
		}

		/* bring to single locus */
		if( CdbPathLocus_IsPartitioned(input_locus))
		{
			result_plan = (Plan *) make_motion_gather_to_QE(result_plan, (*pathkeys_ptr != NULL));
			result_plan->total_cost += motion_cost_per_row * result_plan->plan_rows;
		}

		Assert(result_plan->flow);
	}
	else  /* Plan for hash distributed locus. */
	{
		List *dist_keys = NIL;
		List *dist_pathkeys = NIL;
		List *dist_exprs = NIL;
		ListCell *lc;
		int n;
		
		/* Get the required distribution path keys. */
		n = partkey_len;
		foreach (lc, sortclause)
		{
			if ( 0 >= n-- ) break;
			dist_keys = lappend(dist_keys, lfirst(lc));
		}
		dist_pathkeys = make_pathkeys_for_sortclauses(dist_keys, input_plan->targetlist);
		if ( root != NULL )
			dist_pathkeys = canonicalize_pathkeys(root, dist_pathkeys);
		
		/* Assure the required distribution. */
		if ( ! cdbpathlocus_collocates(input_locus, dist_pathkeys, false /*exact_match*/) )
		{
			foreach (lc, dist_keys)
			{
				SortClause *sc = (SortClause*)lfirst(lc);
				TargetEntry *tle =  get_sortgroupclause_tle(sc,input_plan->targetlist);
				dist_exprs = lappend(dist_exprs, tle->expr);
			}

			Insist(dist_exprs != NIL); /* since partkey is non-empty */
			
			result_plan = (Plan *) make_motion_hash(root, result_plan, dist_exprs);
			result_plan->total_cost += motion_cost_per_row * result_plan->plan_rows;
			*pathkeys_ptr = NIL; /* no longer sorted */
			Assert(result_plan->flow);

			/*
			 * Change output_locus based on the new distribution pathkeys.
			 */
			if (output_locus != NULL)
				CdbPathLocus_MakeHashed(output_locus, dist_pathkeys);
		}

		if(sortclause != NIL)
		{
			if(! pathkeys_contained_in(sort_pathkeys, *pathkeys_ptr))
			{
				result_plan = (Plan *) make_sort_from_sortclauses(root, sortclause, result_plan);
				*pathkeys_ptr = sort_pathkeys;
				mark_sort_locus(result_plan);
			}
		}
	}

	return result_plan;
}

/* Assure that the given input plan meets the sort order requirements 
 * by adding a Sort node, if necessary.
 *
 * Returns the input plan, possibly with a new Sort node on top.  
 * Input and output must have flow declarations.
 */
Plan *assure_order(
		PlannerInfo *root,
		Plan *input_plan,
		List *sortclause,
		List **pathkeys_ptr)
{
	Plan *result_plan;
	List *sort_pathkeys = NULL;

	Assert( input_plan && (input_plan->flow || IsA(input_plan, Motion)) );
	Assert( pathkeys_ptr && (*pathkeys_ptr == NIL || IsA(*pathkeys_ptr, List)) );
	
	result_plan = input_plan;

	if ( sortclause != NIL )
	{
		sort_pathkeys = make_pathkeys_for_sortclauses(sortclause, input_plan->targetlist);
		if ( root != NULL )
			sort_pathkeys = canonicalize_pathkeys(root, sort_pathkeys);
		Assert(sort_pathkeys != NULL);
	}

	if(sort_pathkeys != NULL)
	{
		if(!pathkeys_contained_in(sort_pathkeys, *pathkeys_ptr))
		{
			result_plan = (Plan *) make_sort_from_sortclauses(root, sortclause, result_plan);
			*pathkeys_ptr = sort_pathkeys;
			mark_sort_locus(result_plan);
		}
	}

	return result_plan;
}


Expr *make_mergeclause(Index varno, AttrNumber attrno, Node *expr)
{
	Oid type = exprType(expr);
	Node *lft = (Node*)makeVar(varno, attrno, type, -1, 0);
	Node *rgt = copyObject(expr);
	Expr *xpr = make_op(NULL, list_make1(makeString("=")), lft, rgt, -1);
	
	xpr->type = T_DistinctExpr;
	
	return make_notclause(xpr);
}


/* Plan a window query that can be implemented without any joins or 
 * input sharing, i.e., it involves one scan of the input plan result
 * and a single Window node.
 *
 * Returns plan and, implicitly, pathkeys.
 */
static Plan *plan_trivial_window_query(PlannerInfo *root, WindowContext *context, List **pathkeys_ptr)
{
	Plan *result_plan;
	WindowInfo *winfo;
	List *lower_tlist = context->lower_tlist;
	CdbPathLocus input_locus, output_locus;
	List *pathkeys = NIL;
	List *window_pathkeys = NIL;	
	
	Assert ( pathkeys_ptr );
	Assert (context->nwindowinfos == 1);
	winfo = context->windowinfos;


	result_plan = plan_common_subquery(root, lower_tlist,
								  winfo->sortclause, /* order hint */
								  &input_locus,
								  &pathkeys);
								  
	
	window_pathkeys = make_pathkeys_for_sortclauses(winfo->sortclause, lower_tlist);
	window_pathkeys = canonicalize_pathkeys(root, window_pathkeys);
	
	/* Assure needed colocation and order. */
	result_plan = assure_collocation_and_order(root,
											   result_plan,
											   winfo->partkey_len,
											   winfo->partkey_attrs,
											   winfo->sortclause,
											   input_locus,
											   NULL,
											   &pathkeys);
	
	/* Add the single Window node. */
	result_plan = (Plan*) make_window(root, context->upper_tlist, 
									  winfo->partkey_len, winfo->partkey_attrs,
									  winfo->key_list, /* XXX copy? */
									  result_plan);
	
	/* 
	 * Add a Flow node to the top node, if it doesn't have one yet.
	 * Retrieve distribution info from the next-to-top node which
	 * must have a Flow node.
	 */
	if (!result_plan->flow)
		result_plan->flow = pull_up_Flow(result_plan, 
										 result_plan->lefttree, 
										 (pathkeys != NIL));

	/* TODO Check our API.
	 *
	 * Note: locus and pathkeys may be an important implicit result. 
	 */
	CdbPathLocus_MakeSingleQE(&output_locus); /* We don't partition yet. */
	
	*pathkeys_ptr = pathkeys;
	return result_plan;
}


/* Construct a sequential plan for a non-trivial window query.  A non-trivial
 * window query is one that involves more than one partition/sort order or
 * that contains deferred window functions.
 *
 * A sequential plan implements the query as a sequence of stages, each of
 * which adds partial results corresponding to window functions to the 
 * relation under contruction.  At the top these partial results are
 * combined to produce the final result.
 * 
 * Aggregations (for deferred window functions or unordered aggregate window
 * functions) are immediately joined back into the relation under construction
 * via a locally generated MergeJoin, i.e., the join optimizer isn't involved.
 *
 * Returns plan and, implicitly, pathkeys.
 */
static Plan *plan_sequential_window_query(PlannerInfo *root, WindowContext *context, List **pathkeys_ptr)
{
	Plan *result_plan;
	CdbPathLocus locus;
	List *pathkeys = NIL;
	List *targetlist;
	List *sort_hint;
	int hi, lo;
	TargetEntry *tle;
	AttrNumber resno;
	ListCell *lc;
	QualCost tlist_cost;
		
	Assert ( pathkeys_ptr != NULL );
	Assert ( context->original_range );
	
	/* Plan common subquery. */
	sort_hint = context->windowinfos[context->nwindowinfos-1].sortclause;
	result_plan = plan_common_subquery(root, context->lower_tlist,
									   sort_hint,
									   &locus,
									   &pathkeys);
	
	/* Record common subquery information in context. 
	 *
	 * It's not clear that we need this for the sequential strategy.
	 * Set the input parse tree for plan_sequential_stage().
	 */
	context->subplan = result_plan;
	context->subplan_locus = locus;
	context->subplan_pathkeys = pathkeys;
	context->subquery = copy_common_subquery(root->parse, context->lower_tlist);
		
	/* Be tidy: no row key used in the sequential strategy. */
	context->rowkey_len = 0;
	context->rowkey_attrs = NULL;	
	context->keyed_lower_tlist = NULL;	
	
	/* Initialize an array to convert the index of a Var from the lower 
	 * range to its position (resno) in the target list we are producing.  
	 * This will be used later by translate_upper_tlist_sequential to mutate 
	 * Vars on the original range table (from the original target list) to 
	 * Vars on the final range table.
	 */	
	resno = 0;
	context->offset_upper_varattrnos = palloc0(context->offset_lim * sizeof(AttrNumber));
	
	foreach ( lc, context->lower_tlist )
	{
		tle = (TargetEntry *)lfirst(lc);
		resno++;
		
		/* This assumption later allows us to copy key attributes. */
		Assert( resno == tle->resno ); 
		
		if ( IsA(tle->expr, Var) )
		{
			Var *var = (Var*)tle->expr;
			context->offset_upper_varattrnos[index_of_var(var,context)] = resno;
		}
	}
	
	/* 
	 * A stage is a set of WindowInfos with the same partitioning.
	 *
	 * We construct plan stages from the last window info to the first.
	 * This is because the window info sort places non-partitioned windows 
	 * first and we prefer to process these last. 
	 */
	for ( hi = context->nwindowinfos - 1; hi >= 0; hi = lo - 1 )
	{
		int j;
		Bitmapset *partset = context->specinfos[context->windowinfos[hi].firstspecindex].partset;

		lo = hi;		
		for ( j = hi-1; j  >= 0; j-- )
		{
			WindowInfo *winfo = &context->windowinfos[j];
			
			if ( ! bms_equal( context->specinfos[winfo->firstspecindex].partset, partset ) )
				break;
				
			lo = j;
		}
		
		/* Plan a stage for WindowInfos from hi to lo. */
		result_plan = plan_sequential_stage(root, 
											context, 
											hi, lo, 
											result_plan, 
											&locus, 
											&pathkeys
											);
	}
			
	/* Mutate the upper target list to compute the final result. */
	targetlist = translate_upper_tlist_sequential(context->upper_tlist, 
												  result_plan->targetlist, 
												  context);
	
	/* Adjust root so the table expression portion of the parse tree looks 
	 * like the output of the final stage. */
	root->parse->rtable = context->subquery->rtable;
	root->parse->jointree = context->subquery->jointree;

	/*
	 * Since rtable may be changed, reconstruct RelOptInfo nodes for all base relations
	 * in the root query to match this change.
	 */
	if (root->simple_rel_array)
		pfree(root->simple_rel_array);
	root->simple_rel_array_size = list_length(root->parse->rtable) + 1;
	root->simple_rel_array = (RelOptInfo **)
		palloc0(root->simple_rel_array_size * sizeof(RelOptInfo *));
  if (root->simple_rte_array) pfree(root->simple_rte_array);
  root->simple_rte_array = (RangeTblEntry **)palloc0(
      sizeof(RangeTblEntry *) * root->simple_rel_array_size);
  int i = 1;
  ListCell *l;
  foreach (l, root->parse->rtable)
    root->simple_rte_array[i++] = lfirst(l);
  add_base_rels_to_query(root, (Node *)root->parse->jointree);

	/* XXX I don't think the quals are used later so no need to translate. 
	 *     They would either be empty or represent a join on the last
	 *     partitioning key processed by plan_sequential_stage.
	 *
	 * XXX There won't be any grouping stuff because of the semantic
	 *     transformation that pushes grouping below windowing.  There
	 *     may, however, be expression to translate in the limit/offset
	 *     clauses.  Distinct and sort use sort/group refs which are
	 *     preserved in the targetlist.
	 */

	/* Set target list and adjust costs since it may have expressions.  
	 *
	 * XXX Could track this and avoid, but not yet.
	 */
	root->parse->targetList = targetlist;
	result_plan = plan_pushdown_tlist(result_plan, targetlist);
	cost_qual_eval(&tlist_cost, targetlist, root);
	result_plan->startup_cost += tlist_cost.startup;
	result_plan->total_cost += tlist_cost.startup + tlist_cost.per_tuple * result_plan->plan_rows;

	Assert( result_plan->flow != NULL );

	*pathkeys_ptr = pathkeys;
	return result_plan;
}


/* Plan a single stage of a sequential window query plan.
 *
 * By construction, all the window functions computed by a stage have the 
 * same PARTITION BY specification.  They are represented by contiguous
 * WindowInfo structures (from lo_windex to hi_windex) in the WindowContext.
 *
 * A stage comprises a chain of Window nodes (with an initial motion and 
 * intervening sorts as required) and, if needed, an Agg node which is 
 * merge-joined to the head of the Window node chain on the common partition 
 * key.  By construction, this key is the high-order term of any sorts.
 *
 * Thus a simple (no Agg) stage looks like
 *
 *   input->Motion->Sort->Window->...->Sort->Window->
 * 
 * With aggregation, this becomes
 *
 *   input->Motion->Sort->Share->Window->...->Sort->Window-+->Join->
 *                            |                            |
 *                            +------------>Agg------------+
 *
 * The input plan is the window plan to date.  Some invariants are
 * - the lower target list is a prefix of the input plan target list
 * - the lower range table is a prefix of the input plan range table.
 * The input plan may have additional targets and range table entries
 * added by previous planning stages. 
 */
static Plan *plan_sequential_stage(PlannerInfo *root, 
								   WindowContext *context, 
								   int hi_windex, int lo_windex, 
								   Plan *input_plan,
								   CdbPathLocus *locus_ptr,
								   List **pathkeys_ptr )
{
	Plan *window_plan;
	Plan *agg_plan = NULL;
	WindowInfo *winfo;
	int i;
	ListCell *lc;
	Index win_varno = 1;
	Index agg_varno = 2;
	AttrNumber aux_attrno = 0;
	AttrNumber agg_attrno = 0;
	List * win_names = NIL; /* for Window sub-query RTEs */
	List * agg_names = NIL; /* for Agg sub-query RTE */
	List * join_tlist = NIL;
	Query *agg_subquery;
	bool hasaux, hasagg;
	CdbPathLocus input_locus = *locus_ptr;

	/* In-Out parameters */
	List *pathkeys = *pathkeys_ptr;
	
	/* elog(NOTICE, "Start of plan_sequential_stage()."); */

	/* Determine whether this stage requires external aggregation and an
	 * auxilliary aggregate. 
	 */
	{
		hasaux = false;
		hasagg = false;

		for ( i = lo_windex; i <= hi_windex; i++ )
		{
			winfo = &context->windowinfos[i];
			hasaux = hasaux || winfo->needauxcount;
			hasagg = hasagg || hasaux || winfo->needpartkey;
		}
	}
	
	/* Derive names for the targets in the initial Window subquery and,
	 * if we'll use a join, set up the beginning of the join targetlist. 
	 */
	i = 1;
	foreach ( lc, input_plan->targetlist )
	{
		Value *tle_name;
		TargetEntry *tle = (TargetEntry*)lfirst(lc);
		AttrNumber resno = i++;
		
		StringInfo buf = makeStringInfo();
		appendStringInfo(buf, "unnamed_attr_%d", resno);
		
		tle_name = get_tle_name(tle, context->subquery->rtable, buf->data);
		win_names = lappend(win_names, tle_name);
		
		pfree(buf->data);
		pfree(buf);
		buf = NULL;
		
		if ( hasagg )
		{
			Var *join_var = makeVar(win_varno, resno, 
									exprType((Node*)tle->expr), 
									exprTypmod((Node*)tle->expr), 0);
			
			char *cname = pstrdup(tle_name->val.str);
									
			TargetEntry *join_tle = makeTargetEntry((Expr*)join_var,
													resno,
													cname,
													false);
			join_tle->ressortgroupref = tle->ressortgroupref;
			join_tlist = lappend(join_tlist, join_tle);
		}
	}


	/*
	 * Assure collocation on the partition key and ordering for the first
	 * WindowInfo in the stage.
	 */
	winfo = &context->windowinfos[lo_windex];
	window_plan = assure_collocation_and_order(root,
											   input_plan,
											   winfo->partkey_len,
											   winfo->partkey_attrs,
											   winfo->sortclause,
											   input_locus,
											   locus_ptr,
											   &pathkeys);
	
	if ( hasagg )
	{
		AggStrategy strategy = AGG_PLAIN;
		long num_groups = 1;
		AttrNumber *grpcolidx = NULL;
		List *share_partners = NIL;
		List *tlist = NIL;
			
		/* elog(NOTICE, "Fn plan_sequential_stage(): Preparing for Agg."); */

		/* Since we'll be encountering some Agg targets.  Prepare for that 
		 * by sharing the input window plan locally (within the slice) so
		 * we can develop an Agg and a Window chain within the stage. 
		 */
		share_partners = share_plan(root, window_plan, 2);
		window_plan = list_nth(share_partners, 0);
		agg_plan = list_nth(share_partners, 1);
		list_free(share_partners);
		
		/* An agg plan will have a base subquery like the first window plan. */
		agg_subquery = copyObject(context->subquery);
		
		/* If there's a partition key, we'll group on that, else default
		 * to a plain aggregation.
		 */
		if ( winfo->partkey_len > 0 )
		{
			size_t sz;
			
			/* Estimate the number of parts in the partition.  Note that
			 * we use the global root for this, but retain only the estimate.
			 */
			{
				int i;
				double d;
				List *part_exprs = NIL;
			
				for ( i = 0; i < winfo->partkey_len; i++ )
				{
					TargetEntry *tle; 
					tle = get_tle_by_resno(context->lower_tlist,
										   winfo->partkey_attrs[i]);

					part_exprs = lappend(part_exprs, tle->expr);
				}
				d = estimate_num_groups(root, part_exprs, window_plan->plan_rows);
				num_groups = (d<0)? 0: (d>LONG_MAX)? LONG_MAX: (long)d;
				list_free(part_exprs);
			}

			/* Set up for grouped aggregation */
			strategy = AGG_SORTED;
			sz = winfo->partkey_len * sizeof(AttrNumber);
			grpcolidx = (AttrNumber*)palloc(sz);
			for ( i = 0; i < winfo->partkey_len; i++ )
			{
				TargetEntry *tle; 			
				tle = get_tle_by_resno(window_plan->targetlist, 
									   winfo->partkey_attrs[i]);

				tle = copyObject(tle);
				grpcolidx[i] = tle->resno;
				tle->resno = i+1;
				tlist = lappend(tlist, tle);
				agg_names = lappend(agg_names,
								    makeString(pstrdup(tle->resname? 
											   tle->resname:
											   "part_key")));
			}			
		}
		
		/* If any WindowRefs need a partition count (auxiliary aggregate)
		 * add a target for it and remember where it is.
		 */
		if ( hasaux )
		{
			TargetEntry *tle;
			Aggref *aggref = makeAuxCountAggref();
			aux_attrno = 1 + list_length(tlist);
			tle = makeTargetEntry((Expr*)aggref, aux_attrno, pstrdup("partition_count"), false);
			
			tlist = lappend(tlist, tle);
			agg_names = lappend(agg_names, makeString(pstrdup(tle->resname)));
		}
		
		/* Put the Agg node atop the Agg plan. It's targetlist is, however,
		 * incomplete at this point.  We'll fill it in as we go.
		 */
		agg_plan = (Plan *)make_agg(
					root,
					tlist, /* just partkey (if any) at this point */
					NIL, /* qual */
					strategy, false,
					winfo->partkey_len, 
					grpcolidx,
				    num_groups,
				    0, /* num_nullcols */
				    0, /* input_grouping */
				    0, /* grouping_id */
				    0, /* rollup_gs_times */
					0, /* numAggs */
					0, /* transSpace */
					agg_plan /* now just the shared input */
					);
		
		agg_plan->flow = pull_up_Flow(agg_plan, agg_plan->lefttree, true);
		
		/* Later we'll package this Agg plan as the second sub-query RTE
		 * in a fake Query representing a two-way join of Window sub-query 
		 * to Agg sub-query. We keep track of the attribute number (resno) 
		 * of the next Aggref target we'll add to the Agg plan.
		 */
		 agg_attrno = 1 + list_length(agg_plan->targetlist);
	}
	else
	{
		agg_subquery = NULL; /* tidy */
	}
	
	/* Put Window node (and any required Sort) atop the Window plan for each
	 * WindowInfo in the stage.
	 */
	for ( i = lo_windex; i <= hi_windex; i++ )
	{
		int j;
		winfo = &context->windowinfos[i];

		if ( i > lo_windex )
		{
			/* [Re]condition input for this window info.  At most this may
			 * add a Sort node, but the partitioning and the partitioning
			 * sort order will be preserved so we don't need to reSort the
			 * agg_plan (if any).
			 *
			 * XXX If we had a partitioned Sort operator, we could use
			 *     it here on a partitioned window plan.
			 */
			window_plan = assure_order(root,
									   window_plan,
									   winfo->sortclause,
									   &pathkeys);
		}
		 
		/* Initialize a Window node for this WindowInfo. 
		 */
		{
			size_t sz = winfo->partkey_len * sizeof(AttrNumber);
			AttrNumber *partkey = NULL;
			
			/* elog(NOTICE, "Fn plan_sequential_stage(): Adding Window node."); */

			if ( winfo->partkey_len > 0 )
			{
				partkey = (AttrNumber*)palloc(sz);
				memcpy(partkey, winfo->partkey_attrs, sz);
			}
			
			window_plan = (Plan *)make_window(
							root,
							copyObject(window_plan->targetlist),
							winfo->partkey_len,
							partkey,
							(List*) translate_upper_vars_sequential((Node*)winfo->key_list, context), /* XXX mutate windowKeys to translate any Var nodes in frame vals. */
							window_plan);
							
			window_plan->flow = pull_up_Flow(window_plan, window_plan->lefttree, true);
		}

		
		/* Add a target to the window plan (and, if needed, the agg plan)
		 * for each RefInfo controlled by each SpecInfo of the current
		 * WindowInfo.
		 */
		for ( j = 0; j < winfo->numspecindex; j++ )
		{
			ListCell *lc;
			SpecInfo *sinfo = &context->specinfos[j+winfo->firstspecindex];
			
			/* Per RefInfo */
			foreach ( lc, context->refinfos )
			{
				WindowRef *ref;
				Aggref *aggref;
				AttrNumber win_resno;
				Var *var;
				TargetEntry *tle;
				RefInfo *rinfo = (RefInfo *)lfirst(lc);
				
				if ( rinfo->specindex != sinfo->specindex )
					continue;
					
				ref = (WindowRef*)translate_upper_vars_sequential((Node*)rinfo->ref, context); /* XXX mutate ref's args to translate any Vars */
				win_resno = 1 + list_length(window_plan->targetlist);
				rinfo->resno = win_resno;


				/* Finish target based on function type. */
				if ( RefInfo_AggregateUnordered(rinfo, context) )
				{
					/* Plan unordered aggregate */
					char *agg_resname;
					AttrNumber agg_resno = agg_attrno++;
					
					/* Add target to window plan. */
					tle = makeTargetEntry((Expr*)makeNullConst(ref->restype, -1),
										  win_resno, 
										  pstrdup("dummy"), 
										  false);
					window_plan->targetlist = lappend(window_plan->targetlist, tle); 
					win_names = lappend(win_names, get_tle_name(tle, context->subquery->rtable, NULL));
				
					
					/* Add target to the aggregation plan */
					aggref = makeWindowAggref(ref);
					agg_resname = get_function_name(aggref->aggfnoid, "partition_aggregate");					

					tle = makeTargetEntry((Expr*)aggref, agg_resno, agg_resname, false);
					agg_plan->targetlist = lappend(agg_plan->targetlist, tle);
					agg_names = lappend(agg_names, makeString(pstrdup(tle->resname)));
					
					Assert( list_length(agg_names) == list_length(agg_plan->targetlist) );
					Assert( agg_resno == list_length(agg_plan->targetlist) );
					
					/* Result is reference to joined aggregate value. */
					var = makeVar(agg_varno, agg_resno, ref->restype, -1, false);
					tle = makeTargetEntry((Expr *)var, win_resno, 
										   get_function_name(ref->winfnoid, "window_function"), 
										   false);
					join_tlist = lappend(join_tlist, tle);
				}
				else if ( RefInfo_WindowDeferred(rinfo, context) )
				{
					/* Plan deferred window function */
					FuncExpr *func;
					Var *win_var;
					Var *aux_var;
					Oid counttype = 20; /* TODO count(*) type in pg_proc */
					Oid restype = ref->restype;
					
					/* Adjust WindowRef */
					ref->winstage = WINSTAGE_PRELIMINARY;
					ref->restype = rinfo->winpretype;
					
					/* Add target to window plan. */
					tle = makeTargetEntry((Expr*)ref, 
										  win_resno, 
										  get_function_name(ref->winfnoid, "window_function"), 
										  false);
					window_plan->targetlist = lappend(window_plan->targetlist, tle); 
					win_names = lappend(win_names, get_tle_name(tle, context->subquery->rtable, NULL));
					
					/* Result is finalization of partial window with joined auxiliary count. */
					win_var = makeVar(win_varno, win_resno, ref->restype, -1, false);					
					aux_var = makeVar(agg_varno, aux_attrno, counttype , -1, false);
					func = makeFuncExpr(rinfo->winfinfunc, restype, list_make2(win_var, aux_var), COERCE_DONTCARE);
					tle = makeTargetEntry((Expr*)func, win_resno,
										   get_function_name(ref->winfnoid, "window_function"),
										   false);
					join_tlist = lappend(join_tlist, tle);
				}
				else if ( RefInfo_WindowImmediate(rinfo, context) || 
						  RefInfo_AggregateOrdered(rinfo, context) )
				{
					/* Plan immediate window function or ordered window agg) */
					Var *win_var;
					
					/* Adjust WindowRef */
					ref->winstage = WINSTAGE_IMMEDIATE;

					/* Add target to window plan. */
					tle = makeTargetEntry((Expr*)ref, 
										  win_resno, 
										  get_function_name(ref->winfnoid, "window_function"), 
										  false);
					window_plan->targetlist = lappend(window_plan->targetlist, tle); 
					win_names = lappend(win_names, get_tle_name(tle, context->subquery->rtable, NULL));
				
					/* If the stage has a join, add a join target. */
					if ( hasagg )
					{
						win_var = makeVar(win_varno, win_resno, ref->restype, -1, false);
						tle = makeTargetEntry((Expr*)win_var, win_resno,
											  get_function_name(ref->winfnoid, "window_function"),
											  false);
						join_tlist = lappend(join_tlist, tle);
					}
				}
			}
		}
		
		/* Now the next Window node in this stage's chain is ready. 
		 * Package it as a subquery RTE for use in the phony query we'll 
		 * produce as the next level in the stage.  This will be either 
		 * a simple select from one subquery (in the case of an intermediate
		 * level or a top-level with no aggregation), or a join of this RTE
		 * and the Agg RTE produced at the end of this stage. 
		 */
		{
			Query *wquery;
			
			window_plan = wrap_plan(root, window_plan, 
									context->subquery, &pathkeys, 
									"coplan", win_names, 
									&wquery);
			
			context->subquery = wquery; /* the "input query" for the next time through */
			context->original_range = false; /* range is now our introduced range */
		}
	}
	
	
	if ( hasagg )
	{
		Plan *join_plan = NULL;	
		
		agg_plan = add_join_to_wrapper(root, agg_plan, agg_subquery, join_tlist,
									   winfo->partkey_len,
									   "coplan", agg_names,
									   context->subquery);
		agg_subquery = NULL;
		
		/* Finally, join the aggregation query to the main query.  
		 * This will be either be cartesian product (NestLoop) or 
		 * a many-to-one join on the partitioning key (MergeJoin).
		 * In both cases, the aggregation query should be the outer
		 * plan since this is more efficient for joining a single
		 * tuple (outer) to a sequence of consecutive tuples (inner).
		 *
		 * Make any necessary adjustments to root->parse (e.g., push 
		 * a Query into  its rangetable).
		 *
		 * Return the result in window_plan
		 */		
		 if ( winfo->partkey_len > 0 )
		 {
			List *mergeclauses = NIL;
			
			for ( i = 0; i < winfo->partkey_len; i++ )
			{
				TargetEntry *tle;
				Expr *mc;
				Node *rgt;
				
				tle = get_tle_by_resno(window_plan->targetlist, winfo->partkey_attrs[i]);
				rgt = (Node*)tle->expr;

				mc = make_mergeclause(agg_varno, i+1, copyObject(rgt));
				mergeclauses = lappend(mergeclauses, mc);
			}
			
			join_plan = (Plan *)make_mergejoin(join_tlist,
											   NIL, NIL,
											   mergeclauses,
											   agg_plan, window_plan,
											   JOIN_INNER);
			((MergeJoin*)join_plan)->unique_outer = true;
		 }
		 else
		 {
			/* Cartesian product: 1 x MANY. */
			join_plan = (Plan *)make_nestloop(join_tlist,
											   NIL, NIL,
											   agg_plan, window_plan,
											   JOIN_INNER);
			((NestLoop*)join_plan)->singleton_outer = true;
		 }
		join_plan->startup_cost = agg_plan->startup_cost + window_plan->startup_cost;
		join_plan->plan_rows = window_plan->plan_rows;
		join_plan->plan_width = agg_plan->plan_width + window_plan->plan_width;
		
		join_plan->total_cost = agg_plan->total_cost + window_plan->total_cost;
		join_plan->total_cost += cpu_tuple_cost * join_plan->plan_rows;
		
		join_plan->flow = pull_up_Flow(join_plan, join_plan->righttree, true);
		window_plan = join_plan;
	}	

	*pathkeys_ptr = pathkeys;

	/* elog(NOTICE, "End of plan_sequential_stage()."); */
	return window_plan;
}


/* Construct a parallel plan for a non-trivial window query.  A non-trivial
 * window query is one that involves more than one partition/sort order or
 * that contains deferred window functions.
 *
 * A parallel plan implements the query as a top-level join of a set of
 * independent "coplans" that compute portions of the final target list.
 * The coplans usually take their input from the common subquery via input
 * sharing.
 *
 * Returns plan and, implicitly, pathkeys.
 */
static Plan *plan_parallel_window_query(PlannerInfo *root, WindowContext *context, List **pathkeys_ptr)
{
	Plan *subplan;
	Plan *result_plan;
	List *result_pathkeys;
	CdbPathLocus input_locus;
	List *pathkeys = NIL;
	List *targetlist;
	List *rtable;
	FromExpr *jointree;
	
	int i;
	
	Assert ( pathkeys_ptr != NULL );

	subplan = plan_common_subquery(root, context->lower_tlist,
								  NULL, /* order hint */
								  &input_locus,
								  &pathkeys);
	
	/* If necessary, add row key to the lower plan. */
	if ( context->nwindowinfos > 1 )
	{
		AttrNumber resno;
		ListCell *lc;
		List *lower_tlist = copyObject(context->lower_tlist);
		List *rowkey_tlist = make_rowkey_targets();
		
		context->rowkey_len = list_length(rowkey_tlist);
		context->rowkey_attrs = palloc(context->rowkey_len * sizeof(AttrNumber));
			
		resno = list_length(lower_tlist); /* XXX safe? */
		i = 0;
		foreach ( lc, rowkey_tlist )
		{
			TargetEntry *tle = (TargetEntry *)lfirst(lc);
			tle->resno = ++resno;
			context->rowkey_attrs[i++] = resno;
		}
		lower_tlist = list_concat(lower_tlist, rowkey_tlist);
		context->keyed_lower_tlist = lower_tlist;
		
		subplan = (Plan*) make_window(root, lower_tlist, 
										  0, NULL, /* No paritioning */
										  NIL, /* No ordering */
										  subplan);
		if (!subplan->flow)
			subplan->flow = pull_up_Flow(subplan, 
											 subplan->lefttree, 
											 (pathkeys != NIL));
	}
	else
	{
		context->rowkey_len = 0;
		context->rowkey_attrs = NULL;	
		context->keyed_lower_tlist = context->lower_tlist;	
	}
	
	/* Record common subquery information in context. */
	context->subplan = subplan;
	context->subplan_locus = input_locus;
	context->subplan_pathkeys = pathkeys;
	
	/* Generate coplans for each WindowInfo */								  								  
	context->coplan_count = 0;
	
	for ( i = 0; i < context->nwindowinfos; i++ )
	{
		plan_windowinfo_coplans(root, context, i);
	}
	
	/* Construct the range table for the join query to be constructed. */
	rtable = plan_window_rtable(root, context);
	
	/* Mutate the upper target list to refer to the new range table. */
	targetlist = translate_upper_tlist_parallel(context->upper_tlist, context);
	
	/* Arrange to join the coplans back together. */
	jointree = plan_window_jointree(root,  context);
	
	/* XXX This is the evil part!
	 *
	 * Modify the input query to look like a new upper level join
	 * query.  Retain what's above  the jointree (ordering, etc)
	 * from the original.  This works because the targetlist is
	 * "conformable" with the original w.r.t. sort/group refs,
	 * resnos, types, etc.
	 *
	 * We don't need to descend into the range table to fix varlevelsup
	 * due to the new query level we're adding, because the range table
	 * has already been planned. (Right?)
	 */
	root->parse->targetList = targetlist;
	root->parse->rtable = rtable;
	root->parse->jointree = jointree;
	root->parse->windowClause = NIL;
	/*
	 * since we modify the upper level query, the in_info_list is not valid
	 * anymore, and needs to be released (MPP-21017)
	 */
	list_free(root->in_info_list);
	root->in_info_list = NIL;
	
	/* Plan the join.
	 */
	{
		Path *cheapest_path;
		Path *best_path;
		Path *sorted_path;
		double ngroups;
		QualCost tlist_cost;
						
		root->group_pathkeys = NIL;
		root->sort_pathkeys = 
			make_pathkeys_for_sortclauses(root->parse->sortClause, targetlist);
		root->query_pathkeys = root->sort_pathkeys;
		
		query_planner(root, targetlist, 0.0, &cheapest_path, &sorted_path, &ngroups);
		
		if ( sorted_path != NULL )
			best_path = sorted_path;
		else
			best_path = cheapest_path;
		
		result_plan = create_plan(root, best_path);
		result_pathkeys = best_path->pathkeys;
		
		/* Adjust target list (create_plan returns a flat one) and costs
		 * since our target list may have expressions.  (Could track this
		 * and avoid, but not yet.)
		 */
		result_plan = plan_pushdown_tlist(result_plan, targetlist);
		cost_qual_eval(&tlist_cost, targetlist, root);
		result_plan->startup_cost += tlist_cost.startup;
		result_plan->total_cost += tlist_cost.startup + tlist_cost.per_tuple * result_plan->plan_rows;

		Assert( result_plan->flow != NULL );
	}

	*pathkeys_ptr = result_pathkeys;
	return result_plan;
}

/* Plan the coplans for a single WindowInfo. Results are stored back in the
 * indicated WindowInfo (fields window_coplan and agg_coplan).  The context
 * is updated appropriately (e.g., the Var translation map is kept current).
 */
static void plan_windowinfo_coplans(PlannerInfo *root, WindowContext *context, int window_index)
{
	Coplan *window_coplan = NULL;
	Coplan *agg_coplan = NULL;
	int i;
	ListCell *lc;
	WindowInfo *winfo = context->windowinfos + window_index;
	
	/* There's always a window coplan. If there is more than one WindowInfo, 
	 * we must make sure they all contain the row key.
	 *
	 * XXX Except for the first (required) window coplan, we don't really
	 *     need a window coplan when there's only a widow aggregate over
	 *     an unpartitioned, unordered window.  Currently, we ignore
	 *     this optimization and always include one.
	 */
	window_coplan = makeCoplan(COPLAN_WINDOW, context);
	
	/* There's an agg coplan only if there are unordered aggregates or
	 * deferred strategy window functions.  The agg coplan will join to
	 * its window coplan on the partition key, if one exists, or by 
	 * cartesian product.
	 */
	if ( winfo->needpartkey || winfo->needauxcount )
		agg_coplan = makeCoplan(COPLAN_AGG, context);
	
	if ( window_index == 0 )
	{
		/* First window coplan is special.  It contains the entire lower
		 * target list along with the row key, if any.  Subsequent window 
		 * coplans are sparse.  They contain only those targets needed 
		 * locally by their WindowInfo.
		 *
		 * Note that, at this point, the upper and lower target lists refer 
		 * to the same range -- that of the common subquery.  But soon the 
		 * upper list will refer to the middle range we are about to build.  
		 *
		 * Here, we initialize the Var translation map used later by
		 * translate_upper_tlist_parallel to adjust upper Vars to reference 
		 * the new middle range. During processing we add an entry to this 
		 * map for the entries corresponding to plain Vars.  By construction, 
		 * this will include an entry for each distinct Var in the range.
		 */		
		context->offset_upper_varattrnos = palloc0(context->offset_lim * sizeof(AttrNumber));
		
		foreach ( lc, context->keyed_lower_tlist )
		{
			TargetEntry *tle = (TargetEntry *)lfirst(lc);
			AttrNumber resno = addTargetToCoplan((Node*)tle, window_coplan, context);
			
			/* This assumption used below to copy key attributes. */
			Assert( resno == tle->resno ); 
			
			if ( IsA(tle->expr, Var) )
			{
				Var *var = (Var*)tle->expr;
				context->offset_upper_varattrnos[index_of_var(var,context)] = resno;
			}
		}

		/* The attribute numbers of keys are the same as in the lower target 
		 * list, so we can just copy them. 
		 */
		if ( context->rowkey_len > 0 )
		{
			size_t sz = context->rowkey_len * sizeof(AttrNumber);
			window_coplan->rowkey_len = context->rowkey_len;
			window_coplan->rowkey_attrs = palloc(sz);
			memcpy(window_coplan->rowkey_attrs, context->rowkey_attrs, sz);
		}

		if ( winfo->partkey_len > 0 )
		{
			size_t sz = winfo->partkey_len * sizeof(AttrNumber);
			window_coplan->partkey_attrs = palloc(sz);
			memcpy(window_coplan->partkey_attrs, winfo->partkey_attrs, sz);
		}
			
		if ( (agg_coplan != NULL) && winfo->partkey_len > 0 )
		{
			size_t sz = winfo->partkey_len * sizeof(AttrNumber*);
			agg_coplan->partkey_len = winfo->partkey_len;
			agg_coplan->partkey_attrs = palloc(sz);
			for ( i = 0; i < winfo->partkey_len; i ++ )
			{
				TargetEntry *tle; 
				tle = get_tle_by_resno(context->lower_tlist, winfo->partkey_attrs[i]);
				agg_coplan->partkey_attrs[i] = 
					addTargetToCoplan((Node*)tle, agg_coplan, context);
			}
		}
	}
	else
	{
		/* Later window coplans are all similar and need contain only 
		 * targets specifically used by this WindowInfo's WindowRefs. */			
		
		/* Include input key, if any. */
		if ( context->rowkey_len > 0 )
		{
			size_t sz = winfo->rowkey_len * sizeof(AttrNumber);
			window_coplan->rowkey_len = context->rowkey_len;
			window_coplan->rowkey_attrs = palloc(sz);
			for ( i = 0; i < context->rowkey_len; i ++ )
			{
				TargetEntry *tle;
				
				
				tle = get_tle_by_resno(context->keyed_lower_tlist, 
									   context->rowkey_attrs[i]);
									 
				window_coplan->rowkey_attrs[i] = 
					addTargetToCoplan((Node*)tle, window_coplan, context);
			}
		}
			
		/* Include partition key, if needed. */
		if ( (agg_coplan != NULL) && winfo->partkey_len > 0 )
		{
			size_t sz = winfo->partkey_len * sizeof(AttrNumber);
			window_coplan->partkey_len = winfo->partkey_len;
			window_coplan->partkey_attrs = (AttrNumber*)palloc(sz);
			agg_coplan->partkey_len = winfo->partkey_len;
			agg_coplan->partkey_attrs = (AttrNumber*)palloc(sz);
			for ( i = 0; i < winfo->partkey_len; i++ )
			{
				TargetEntry *tle; 
			
				tle = get_tle_by_resno(context->keyed_lower_tlist, 
									   winfo->partkey_attrs[i]);

				window_coplan->partkey_attrs[i] = 
					addTargetToCoplan((Node*)tle, window_coplan, context);
				agg_coplan->partkey_attrs[i] = 
					addTargetToCoplan((Node*)tle, agg_coplan, context);
			}
		}
	}
	
	
	/* Now add targets for each RefInfo covered by this WindowInfo.
	 *
	 * TODO Reorganize context->refinfos into an array.  Use varsets
	 *      to access the refinfos. 
	 */
	foreach ( lc, context->refinfos )
	{
		WindowRef *ref;
		AttrNumber resno;
		RefInfo *rinfo = (RefInfo *)lfirst(lc);
		SpecInfo *sinfo = &context->specinfos[rinfo->specindex];
		
		if ( sinfo->windowindex != window_index )
			continue;
		
		ref = rinfo->ref;
		
		if ( RefInfo_AggregateUnordered(rinfo, context) ) // ( rinfo->isagg && sinfo->order == NIL )
		{
			/* Plan unordered aggregate */
			Aggref *aggref;
			Var *var;
			
			aggref = makeWindowAggref(ref);
			resno = addTargetToCoplan((Node*)aggref, agg_coplan, context);
			
			/* Add result expr to ref */
			var = makeVar(agg_coplan->varno, resno, ref->restype, -1, false);
			rinfo->resultexpr = (Expr*)var;
		}
		else if ( RefInfo_WindowDeferred(rinfo, context) ) // ( !rinfo->isagg && rinfo->needcount )
		{
			/* Plan deferred window function */
			FuncExpr *func;
			Var *arg1, *arg2;
			Oid counttype = 20; /* TODO count(*) type in pg_proc */
			Oid restype = ref->restype;
			
			ref->winstage = WINSTAGE_PRELIMINARY;
			ref->restype = rinfo->winpretype;

			resno = addTargetToCoplan((Node*)ref, window_coplan, context);			
			arg1 = makeVar(window_coplan->varno, resno, ref->restype, -1, false);
			
			resno = addTargetToCoplan((Node*)makeAuxCountAggref(), agg_coplan, context);
			arg2 = makeVar(agg_coplan->varno, resno, counttype , -1, false);
			
			/* Add result expr to ref */
			func = makeFuncExpr(rinfo->winfinfunc, restype, list_make2(arg1, arg2), COERCE_DONTCARE);
			rinfo->resultexpr = (Expr*)func;
		}
		else if ( RefInfo_WindowImmediate(rinfo, context) || 
				  RefInfo_AggregateOrdered(rinfo, context) ) // ( rinfo->isagg || !rinfo->needcount )
		{
			/* Plan immediate window function or ordered window agg) */
			ref->winstage = WINSTAGE_IMMEDIATE;
			resno = addTargetToCoplan((Node*)ref, window_coplan, context);
			
			/* Add result expr to ref */
			rinfo->resultexpr = (Expr*)makeVar(window_coplan->varno, resno, ref->restype, -1, false);
		}
		else
		{
			elog(ERROR,"internal error planning window function call");
		}
		
		if ( agg_coplan != NULL )
			agg_coplan->trans_space += rinfo->trans_space; /* may over count, just an estimate */
	}
	
	/* Store results. */
	winfo->window_coplan = window_coplan;
	winfo->agg_coplan = agg_coplan;
}


/* Return a target list to use as the join key for the separate window
 * coplans generated by a non-trivial window query.  The key consists
 * of a segment number and row number and must be evaluated within a
 * Window node.  
 *
 * The caller is responsible for assigning appropriate resno values 
 * in the targets.  (They are initialized, here, to zero!).
 */
static List *make_rowkey_targets()
{
	FuncExpr *seg;
	WindowRef *row;
	
	seg = makeFuncExpr(MPP_EXECUTION_SEGMENT_OID, 
					   MPP_EXECUTION_SEGMENT_TYPE, 
					   NIL, 
					   COERCE_DONTCARE);
								  
	row = makeNode(WindowRef);
	row->winfnoid = ROW_NUMBER_OID;
	row->restype = ROW_NUMBER_TYPE;
	row->args = NIL;
	row->winlevelsup = row->winspec = row->winindex = 0;
	row->winstage = WINSTAGE_ROWKEY; /* so setrefs doesn't get confused  */
	row->winlevel = 0;
	
	return list_make2(
		makeTargetEntry((Expr*)seg, 1, pstrdup("segment_join_key"), false),
		makeTargetEntry((Expr*)row, 1, pstrdup("row_join_key"), false) );
}


static Coplan *makeCoplan(CoplanType type, WindowContext *context)
{
	Index varno =  ++context->coplan_count;
	Coplan *coplan = palloc0(sizeof(Coplan));
	
	Assert ( type==COPLAN_WINDOW || type==COPLAN_AGG );
	Assert ( varno > 0 );
	
	coplan->type = type;
	coplan->varno = varno;
	coplan->num_aggs = 0;
	
	return coplan;
}

/* Return the atttribute number of the given target in the given coplan.
 *
 * The target may be a TargetEntry (in which case we copy it) or an Expr
 * (in which case we construct a TargetEntry and copy the Expr into it).
 * The result is the attribute number (resno) of the new target in the
 * coplan's target list.
 *
 * TODO Don't make a new target if an identical one exists already.
 * TODO API? 
 */
static AttrNumber addTargetToCoplan(Node *target, Coplan *coplan, WindowContext *context)
{
	char *name;
	TargetEntry *tle;
	AttrNumber resno = ++coplan->last_resno;
	
	if ( IsA(target,TargetEntry) )
	{
		tle = copyObject(target);
	}
	else
	{
		tle = makeTargetEntry(copyObject(target), resno, 
							  pstrdup("window_column"), false);
	}
	
	if ( IsA(target, Aggref) )
		coplan->num_aggs++;
	
	/* TODO Really should do better.  Need names for rangetable erefs. */
	if ( tle->resname != NULL )
		name = pstrdup(tle->resname);
	else
		name = pstrdup("coplan_target");
	
		
	tle->resno = resno;
	coplan->targetlist = lappend(coplan->targetlist, tle);
	coplan->targetnames = lappend(coplan->targetnames, makeString(name));
	
	Assert( resno == list_length(coplan->targetlist) );
	return resno;
}


static Aggref* makeWindowAggref(WindowRef *winref)
{
	Aggref *aggref = makeNode(Aggref);
	
	aggref->aggfnoid = winref->winfnoid;
	aggref->aggtype = winref->restype;
	aggref->args = copyObject(winref->args);
	aggref->agglevelsup = 0;
	aggref->aggstar = false; /* at this point in processing, doesn't matter */
	aggref->aggdistinct = winref->windistinct;
	aggref->aggstage = AGGSTAGE_NORMAL;

	return aggref;
}

static Aggref* makeAuxCountAggref()
{
	Aggref *aggref = makeNode(Aggref);
	
	aggref->aggfnoid = 2803; /* TODO count(*) oid define in pg_proc.h */
	aggref->aggtype = 20; /* TODO count(*) result type oid in pg_proc.h */
	aggref->args = NIL;
	aggref->agglevelsup = 0;
	aggref->aggstar = true; 
	aggref->aggdistinct = false; 
	aggref->aggstage = AGGSTAGE_NORMAL;
	
	return aggref;
}

/* Constructs a plan for a single coplan of a window query and returns it 
 * as a variant form of subquery RTE.  
 * 
 * The input subplan (and associated locus and pathkeys) should have the 
 * same result as the "common subquery", but need not be a plan for that 
 * query.  For example, it might be a ShareInputScan. The subplan is copied, 
 * not used directly. 
 *
 * XXX Perhaps copy should be optional based an additional argument?
 *
 * The generated RTE differs from an ordinary subquery RTE in that it
 * it holds the produced plan and associated pathkeys.  These are used
 * later in the optimizer short-circuit the recursive planning of the
 * RTE's subquery.
 *
 * TODO The spare fields that hold the plan aren't in, e.g., copyfuncs.c.
 *      Decide whether or not we need to add them!!!
 */

static RangeTblEntry *rte_for_coplan(
		PlannerInfo *root,
		Plan *input_plan, 
		CdbPathLocus subplan_locus,
		List *subplan_pathkeys,
		WindowInfo *winfo,
		Coplan *coplan)
{
	RangeTblEntry *rte;
	Plan *new_plan = input_plan;
	Alias *new_eref;
	
	/* Now new_plan is suitable input for a coplan of this winfo. 
	 * Add the coplan processing.
	 */
	switch ( coplan->type )
	{
		case COPLAN_WINDOW:
			new_plan = (Plan*) 
				make_window(root, coplan->targetlist, 
						winfo->partkey_len, winfo->partkey_attrs,
						winfo->key_list, /* XXX copy? */
						new_plan);
		break;
	case COPLAN_AGG:
		if ( coplan->partkey_len > 0 )
		{
			/* Grouped aggregation. */
			
			/* TODO Fix this cheesy estimate. */
			double d = new_plan->plan_rows / 100.0;
			long num_groups = (d < 0)? 0: ( d > LONG_MAX )? LONG_MAX: (long)d;
			
			new_plan = (Plan*)
				make_agg(root, coplan->targetlist, NULL,
						 AGG_SORTED, false,
						 winfo->partkey_len, winfo->partkey_attrs,
						 num_groups,
						 0, /* num_nullcols */
						 0, /* input_grouping */
						 0, /* grouping_id */
						 0, /* rollup_gs_times */
						 coplan->num_aggs, coplan->trans_space,
						 new_plan);
		}
		else
		{
			/* Plain aggregation, no grouping. */
			new_plan = (Plan*)
				make_agg(root, coplan->targetlist, NULL,
						 AGG_PLAIN, false,
						 0, NULL,
						 1, /* number of groups */
						 0, /* num_nullcols */
						 0, /* input_grouping */
						 0, /* grouping_id */
						 0, /* rollup_gs_times */
						 coplan->num_aggs, coplan->trans_space,
						 new_plan);
		}
		break;
	default:
		elog(ERROR,"invalid window coplan");
	}

	if (!new_plan->flow)
		new_plan->flow = pull_up_Flow(new_plan, 
				new_plan->lefttree, 
				(subplan_pathkeys != NIL));

	/* Need to specify eref for messages. */
	new_eref = makeNode(Alias);
	new_eref->aliasname = pstrdup("coplan");
	new_eref->colnames = coplan->targetnames;
	
	rte = package_plan_as_rte(root->parse, new_plan, new_eref, subplan_pathkeys);

	return rte;
}

/* Package a plan as a pre-planned subquery RTE.
 *
 * Note that the input query is often root->parse (since that is the
 * query from which this invocation of the planner usually takes its 
 * context), but may be a derived query, e.g., in the case of sequential 
 * window plans or multiple-DQA pruning (in cdbgroup.c).
 * 
 * Note also that the supplied plan's target list must be congruent with
 * the supplied query: its Var nodes must refer to RTEs in the range 
 * table of the Query node, it should conserve sort/group reference
 * values, and its SubqueryScan nodes should match up with the query's
 * Subquery RTEs.
 *
 * The result is a pre-planned subquery RTE which incorporates the given
 * plan, alias, and pathkeys (if any) directly.  The input query is not
 * modified.
 *
 * The caller must install the RTE in the range table of an appropriate query 
 * and the corresponding plan should reference its results through a 
 * SubqueryScan node.
 */
RangeTblEntry *package_plan_as_rte(Query *query, Plan *plan, Alias *eref, List *pathkeys)
{
	Query *subquery;
	RangeTblEntry *rte;
	
	
	Assert( query != NULL );
	Assert( plan != NULL );
	Assert( eref != NULL );
	Assert( plan->flow != NULL ); /* essential in a pre-planned RTE */
	
	/* Make a plausible subquery for the RTE we'll produce. */
	subquery = makeNode(Query);
	memcpy(subquery, query, sizeof(Query));
	
	subquery->querySource = QSRC_PLANNER;
	subquery->canSetTag = false;
	subquery->resultRelation = 0;
	subquery->intoClause = NULL;
	
	subquery->rtable = copyObject(subquery->rtable);

	subquery->targetList = copyObject(plan->targetlist);
	subquery->windowClause = NIL;
	
	subquery->distinctClause = NIL;
	subquery->sortClause = NIL;
	subquery->limitOffset = NULL;
	subquery->limitCount = NULL;
	
	Assert( subquery->setOperations == NULL );
	
	/* Package up the RTE. */
	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_SUBQUERY;
	rte->subquery = subquery;
	rte->eref = eref;
	rte->subquery_plan = plan;
	rte->subquery_rtable = subquery->rtable;
	rte->subquery_pathkeys = pathkeys;

	return rte;
}

/*
 * construct_share_plans - construct share plans for both window
 * coplans and aggregate coplans. When one WindowInfo contains
 * both a window coplan and an aggregate coplan, we share
 * more than just the given subplan. This function takes care
 * of that.
 *
 * For two different WindowInfos, the given subplan is shared.
 */
static List *
construct_share_plans(PlannerInfo *root, WindowContext *context)
{
	List *lower_share_nodes = NIL;
	List *share_nodes = NIL;
	int i;
	ListCell *lc;
	
	if (context->nwindowinfos > 1)
	{
		lower_share_nodes = share_plan(root, context->subplan,
									   context->nwindowinfos);
	}
	else
	{
		Assert(context->nwindowinfos == 1);
		lower_share_nodes = list_make1(context->subplan);
	}

	lc = list_head(lower_share_nodes);

	context->subplans_pathkeys = NIL;
	for (i = 0; i < context->nwindowinfos; i++)
	{
		WindowInfo *winfo = context->windowinfos + i;
		Plan *share_node;
		
		Assert(lc);
		share_node = (Plan *)lfirst(lc);

		if (winfo->window_coplan != NULL &&
			winfo->agg_coplan != NULL)
		{
			List *new_share_nodes = NIL;
			List *pathKeys = (List *)copyObject(context->subplan_pathkeys);
			
			Plan *new_plan =
				assure_collocation_and_order(root,
											 share_node,
											 winfo->partkey_len, 
											 winfo->partkey_attrs,
											 winfo->sortclause,
											 context->subplan_locus,
											 NULL,
											 &pathKeys);
			new_share_nodes = share_plan(root, new_plan, 2);
			share_nodes = list_concat(share_nodes, new_share_nodes);

			/* Append two copies of new pathkeys */
			context->subplans_pathkeys = lappend(context->subplans_pathkeys,
												 pathKeys);
			context->subplans_pathkeys = lappend(context->subplans_pathkeys,
												 pathKeys);
		}

		else if (winfo->window_coplan != NULL ||
				 winfo->agg_coplan != NULL)
		{
			List *pathKeys = (List *)copyObject(context->subplan_pathkeys);
			Plan *new_plan =
				assure_collocation_and_order(root,
											 share_node,
											 winfo->partkey_len, 
											 winfo->partkey_attrs,
											 winfo->sortclause,
											 context->subplan_locus,
											 NULL,
											 &pathKeys);
			share_nodes = lappend(share_nodes, new_plan);
			
			context->subplans_pathkeys = lappend(context->subplans_pathkeys,
												 pathKeys);
		}

		lc = lnext(lc);
	}

	return share_nodes;
}

/* 
 * plan_window_rtable is the stub for plan_window_rtable_dup/share.
 *
 * XXX TODO: Right now we either always dup the subplan or always share
 * the sub plan, depending on a guc gp_window_shareinput.
 * Should make this cost based
 */
List *plan_window_rtable(PlannerInfo *root, WindowContext *context)
{
	int i;
	RangeTblEntry *rte;
	List *sharedNodes = NIL;
	List *rtable = NIL;
	ListCell *lc = NULL;
	ListCell *pk_lc = NULL;

	sharedNodes = construct_share_plans(root, context);
	Assert(list_length(sharedNodes) == context->coplan_count);

	lc = list_head(sharedNodes);
	pk_lc = list_head(context->subplans_pathkeys);

	for ( i = 0; i < context->nwindowinfos; i++ )
	{
		WindowInfo *winfo = context->windowinfos + i; 
		if ( winfo->window_coplan != NULL )
		{
			Assert(lc);
			Assert(pk_lc);
			
			rte = rte_for_coplan(root, 
					(Plan *) lfirst(lc),
					context->subplan_locus,
					(List *)lfirst(pk_lc),
					winfo,
					winfo->window_coplan);
			lc = lnext(lc);
			pk_lc = lnext(pk_lc);
			rtable = lappend(rtable, rte);

		}

		if ( winfo->agg_coplan != NULL )
		{
			Assert(lc);
			Assert(pk_lc);
			rte = rte_for_coplan(root, 
					(Plan *) lfirst(lc),
					context->subplan_locus,
					(List *)lfirst(pk_lc),
					winfo,
					winfo->agg_coplan);
			lc = lnext(lc);
			pk_lc = lnext(pk_lc);
			rtable = lappend(rtable, rte);
		}
	}

	Assert( list_length(rtable) == context->coplan_count );
	return rtable;
}

/* Plan the FROM-WHERE clauses to rejoin the coplans.
 */
static FromExpr *plan_window_jointree(PlannerInfo *root, WindowContext *context)
{
	FromExpr *jointree;
	List *from = NIL;
	List *quals = NIL;
	RangeTblRef *rtr;
	Coplan *coplan;
	int i,j;
	Oid *rowkey_types = NULL;
	Coplan *base_window_coplan = NULL;
	
	
	for ( i = 0; i < context->nwindowinfos; i++ )
	{
		WindowInfo *winfo = &context->windowinfos[i];
		
		for ( j = 0,	coplan = winfo->window_coplan; 
			  j < 2; 
			  j++,		coplan = winfo->agg_coplan ) 
		{
			if ( coplan != NULL ) 
			{
				rtr = makeNode(RangeTblRef);
				rtr->rtindex = coplan->varno;
				from = lappend(from, rtr);
			}
		}

		if ( context->rowkey_len > 0 )
		{
			/* Join window_coplans on the row key */
			if ( i == 0 )
			{
				Assert( winfo->window_coplan != NULL );
				base_window_coplan = winfo->window_coplan;
				rowkey_types = get_window_attr_types(((Plan *)(context->subplan))->targetlist, context->rowkey_len, context->rowkey_attrs);
			}
			else if ( winfo->window_coplan != NULL )
			{
				/* Append qual to join base_window_coplan to 
				 * winfo->window_coplan on the row key.
				 */
				for ( j = 0; j < context->rowkey_len; j++ )
				{
					Expr *term = make_window_join_term(rowkey_types[j], 
													   base_window_coplan->varno,
													   base_window_coplan->rowkey_attrs[j],
													   winfo->window_coplan->varno,
													   winfo->window_coplan->rowkey_attrs[j],
													   true); /* true - no nulls */
					quals = lappend(quals, term);
				}
			}
		}
		
		if ( winfo->partkey_len > 0 && winfo->window_coplan != NULL && winfo->agg_coplan != NULL )
		{
			Oid *partkey_types;
			partkey_types = get_window_attr_types(context->lower_tlist, winfo->partkey_len, winfo->partkey_attrs);
			
			/* Append quals to join winfo->window_coplan to winfo->agg_coplan
			 * on the  the partitioning key. 
			 */
			for ( j = 0; j < winfo->partkey_len; j++ )
			{
				Expr *term = make_window_join_term(partkey_types[j], 
												   winfo->window_coplan->varno,
												   winfo->window_coplan->partkey_attrs[j],
												   winfo->agg_coplan->varno,
												   winfo->agg_coplan->partkey_attrs[j],
												   false); /* false - may contain nulls */
				quals = lappend(quals, term);
			}
			
			pfree(partkey_types);
		}
	}
	if ( rowkey_types != NULL )
		pfree(rowkey_types);
		
	jointree = makeNode(FromExpr);
	jointree->fromlist = from;
	jointree->quals = (Node*)quals;
	
	return jointree;
}

static Expr *make_window_join_term(Oid type, Index lftvarno, AttrNumber lftattrno, Index rgtvarno, AttrNumber rgtattrno, bool no_nulls)
{
	Var *lft = makeVar(lftvarno, lftattrno, type, -1, 0);
	Var *rgt = makeVar(rgtvarno, rgtattrno, type, -1, 0);
	Expr *xpr = make_op(NULL, list_make1(makeString("=")), (Node*)lft, (Node*)rgt, -1);
	
	if ( ! no_nulls )
	{
		/* Change the equality OpExpr into <NOT IS DISTINCT> */
		xpr->type = T_DistinctExpr;
		xpr = make_notclause(xpr);
	}

	return xpr;
}

static Oid *get_window_attr_types(List *targetlist, int nattrs, AttrNumber *attrs)
{
	int i;
	Oid *types = palloc0(nattrs*sizeof(Oid));
	
	for ( i = 0; i < nattrs; i++ )
	{
		TargetEntry *tle = get_tle_by_resno(targetlist, attrs[i]);
		types[i] = exprType((Node*)tle->expr);
	}
	return types;
}


/* Translate a targetlist on the lower range (of the common subquery) to
 * a targetlist on the upper range.
 *
 * TODO Avoid reevaluating sort/group expressions that have already been
 *      evaluated in the common subquery. 
 *
 * This translator is specific to parallel window plans!
 */

static Node * translate_upper_tlist_parallel_mutator(Node *node, WindowContext *context)
{
	if ( node == NULL )
		return NULL;
		
	if ( IsA(node, Var) )
	{
		Var *new_var;
		Var *var = (Var *)node;
		
		/* We don't descend into WindowRefs, so Var isn't in one. */
		if ( var->varlevelsup == 0 )
		{
			int idx = index_of_var(var, context);
			AttrNumber attrno = context->offset_upper_varattrnos[idx];
			new_var = makeVar(
						1, /* first RTE has all entries */
						attrno,
						var->vartype,
						var->vartypmod,
						0 /* levels up known 0 */);
		}
		else
			new_var = copyObject(var);

		return (Node *) new_var;
	}
	else if ( IsA(node, WindowRef) )
	{
		WindowRef *ref = (WindowRef *)node;
		RefInfo *rinfo = (RefInfo *)list_nth(context->refinfos, ref->winindex);
		return copyObject(rinfo->resultexpr); /* XXX why copy? */
	}
	
	return expression_tree_mutator(node, translate_upper_tlist_parallel_mutator, (void*) context);
}

static List *translate_upper_tlist_parallel(List *orig_tlist, WindowContext *context)
{
	List *new_tlist;
	
	new_tlist =  (List*)  
		expression_tree_mutator((Node*)orig_tlist,
								 translate_upper_tlist_parallel_mutator, 
								 (void*)context );
	
	return new_tlist;
}


/* This translator is specific to sequential window plans!
 */
typedef struct xut_context
{
	WindowContext *context;
	List *window_tlist;
} xut_context;

static Node * translate_upper_tlist_sequential_mutator(Node *node, xut_context *ctxt)
{
	WindowContext *context = ctxt->context;
	
	if ( node == NULL )
		return NULL;
		
	if ( IsA(node, Var) )
	{
		Var *new_var;
		Var *var = (Var *)node;
		
		/* We don't descend into WindowRefs, so Var isn't in one. */
		if ( var->varlevelsup == 0 )
		{
			int idx = index_of_var(var, context);
			AttrNumber attrno = context->offset_upper_varattrnos[idx];
			new_var = makeVar(
						1, /* first RTE has all entries */
						attrno,
						var->vartype,
						var->vartypmod,
						0 /* levels up known 0 */);
		}
		else
			new_var = copyObject(var);

		return (Node *) new_var;
	}
	else if ( IsA(node, WindowRef) )
	{
		WindowRef *ref = (WindowRef *)node;
		RefInfo *rinfo = (RefInfo *)list_nth(context->refinfos, ref->winindex);
		TargetEntry *tle = get_tle_by_resno(ctxt->window_tlist, rinfo->resno);
		return copyObject(tle->expr);
	}
	
	return expression_tree_mutator(node, translate_upper_tlist_sequential_mutator, (void*) ctxt);
}


static List *translate_upper_tlist_sequential(List *orig_tlist, List *window_tlist, WindowContext *context)
{
	xut_context ctxt;
	List *new_tlist;
	
	ctxt.context = context;
	ctxt.window_tlist = window_tlist;
		
	new_tlist =  (List*)  
		expression_tree_mutator((Node*)orig_tlist,
								 translate_upper_tlist_sequential_mutator, 
								 &ctxt);
	
	return new_tlist;
}


/* This translator is specific to sequential window plans. It converts Vars
 * the given tree (including top-level WindowRef node args, and WindowFrameEdge 
 * vals from the input range to the intermediate range used in sequential
 * planning.
 */
typedef struct xuv_context
{
	WindowContext *context;
	bool is_top_level;
} xuv_context;

static Node * translate_upper_vars_sequential_mutator(Node *node, xuv_context *ctxt)
{
	WindowContext *context = ctxt->context;
	
	if ( node == NULL )
		return NULL;
		
	if ( IsA(node, Var) )
	{
		Var *new_var;
		Var *var = (Var *)node;
		
		/* We don't descend into WindowRefs, so Var isn't in one. */
		if ( var->varlevelsup == 0 )
		{
			int idx = index_of_var(var, context);
			AttrNumber attrno = context->offset_upper_varattrnos[idx];
			new_var = makeVar(
						1, /* first RTE has all entries */
						attrno,
						var->vartype,
						var->vartypmod,
						0 /* levels up known 0 */);
		}
		else
			new_var = copyObject(var);

		return (Node *) new_var;
	}
	else if ( IsA(node, WindowRef) )
	{
		/* XXX We violate the spirit of the thing here by actually modifying
		 *     and returning the WindowRef itself.  We can't copy it because
		 *     its RefInfo's link must still find it.
		 */
		WindowRef *ref = (WindowRef *)node;
		
		if ( ! ctxt->is_top_level )
			elog(ERROR, "nested window reference invalid");		
		ctxt->is_top_level = false;
		
		ref->args = (List*)expression_tree_mutator((Node*)ref->args, translate_upper_vars_sequential_mutator, (void*) ctxt);
		
		ctxt->is_top_level = true;
		return (Node*)ref;
	}
	else if( IsA(node, WindowKey) )
	{
		WindowKey *key = (WindowKey*)node;
		WindowKey *newkey;
		
		if ( ! ctxt->is_top_level )
			elog(ERROR, "nested window key invalid");		
		ctxt->is_top_level = false;
		
		newkey = makeNode(WindowKey);
		memcpy(newkey, key, sizeof(WindowKey));
		if ( key->numSortCols > 0 )
		{
			size_t sz;
			
			sz = key->numSortCols * sizeof(AttrNumber);
			newkey->sortColIdx = (AttrNumber*)palloc(sz);
			memcpy(newkey->sortColIdx, key->sortColIdx, sz);
			
			sz = key->numSortCols * sizeof(Oid);
			newkey->sortOperators = (Oid*)palloc(sz);
			memcpy(newkey->sortOperators, key->sortOperators, sz);
		}
		
		newkey->frame = (WindowFrame*)expression_tree_mutator(
										(Node*)key->frame, 
										translate_upper_vars_sequential_mutator, 
										(void*) ctxt);
		
		ctxt->is_top_level = true;
		return (Node*)newkey;
	}
	
	return expression_tree_mutator(node, translate_upper_vars_sequential_mutator, (void*) ctxt);
}


static Node *translate_upper_vars_sequential(Node *node, WindowContext *context)
{
	Node *result;
	xuv_context ctxt;
	
	ctxt.context = context;
	ctxt.is_top_level = true;
	
	if ( context->original_range )
	{
		result = node;
	}
	else
	{
		result =  
			expression_tree_mutator(node,
									translate_upper_vars_sequential_mutator, 
									&ctxt);
	}
	
	return result;
}



/* Utility to get a name for a function to use as an eref. */
char *get_function_name(Oid proid, const char *dflt)
{
	char	   *result;

	if (!OidIsValid(proid))
	{
		result = pstrdup(dflt);
	}
	else
	{
		int			fetchCount;

		result = caql_getcstring_plus(
				NULL,
				&fetchCount,
				NULL,
				cql("SELECT proname FROM pg_proc "
					" WHERE oid = :1 ",
					ObjectIdGetDatum(proid)));
	
		if (!fetchCount)
			result = pstrdup(dflt);
	}

	return result;
}

/* Utility to get a name for a tle to use as an eref. */
Value *get_tle_name(TargetEntry *tle, List *rtable, const char *default_name)
{
	char *name = NULL;
	Node *expr = (Node*)tle->expr;
	
	if ( tle->resname != NULL )
	{
		name = pstrdup(tle->resname);
	}
	else if ( IsA(tle->expr, Var) && rtable != NULL )
	{
		Var *var = (Var*)tle->expr;
		RangeTblEntry *rte = rt_fetch(var->varno, rtable);
		name = pstrdup(get_rte_attribute_name(rte, var->varattno));
	}
	else if ( IsA(tle->expr, WindowRef) )
	{
		if ( default_name == NULL ) default_name = "window_func";
		name = get_function_name(((WindowRef*)expr)->winfnoid, default_name);
	}
	else if ( IsA(tle->expr, Aggref) )
	{
		if ( default_name == NULL ) default_name = "aggregate_func";
		name = get_function_name(((Aggref*)expr)->aggfnoid, default_name);
	}
	
	if ( name == NULL )
	{
		if (default_name == NULL ) default_name = "unnamed_attr";
		name = pstrdup(default_name);
	}
	
	return makeString(name);
}


/* Wrap a window plan under construction in a SubqueryScan thereby 
 * renumbering its attributes so that its targetlist becomes a list 
 * of Var nodes on a single entry range table (thus the only varno 
 * is 1).
 *
 * The input consists of
 * - the window Plan (with attached Flow) under development
 * - the pathkeys corresponding to the plan
 * - the Query tree corresponding to the plan
 * 
 * The output consists of
 * - the Plan modified by the addition of a SubqueryScan node
 *   (explicit result), and
 * - a Query tree representing simple select of all targets
 *   from the sub-query (in location query_p).
 *
 * Input query table expression gets pushed down into a sqry RTE.
 * 
 * The caller is responsible for translating pathkeys and locus, if needed.
 *
 * XXX Is it necessary to adjust varlevelsup in the plan and query trees?
 *     I don't think so.
 */
Plan *wrap_plan(PlannerInfo *root, Plan *plan, Query *query, 
				List **p_pathkeys, 
				const char *alias_name, List *col_names, 
				Query **query_p)	
{
	Query *subquery;
	Alias *eref;
	RangeTblEntry *rte;
	RangeTblRef *rtr;
	FromExpr *jointree;
	List *subq_tlist = NIL;
	Index varno = 1;
	int *resno_map;
	
	Assert( query_p != NULL );
	
	/*
	 * If NIL passed for col_names, generates it from subplan's targetlist.
	 */
	if (col_names == NIL)
	{
		ListCell	   *l;

		foreach (l, plan->targetlist)
		{
			TargetEntry	   *tle = lfirst(l);
			Value		   *col_name;

			col_name = get_tle_name(tle, query->rtable, NULL);
			col_names = lappend(col_names, col_name);
		}
	}
	/* Make the subquery RTE. Note that this will include a Query derived
	 * from the input Query.
	 */
	eref = makeNode(Alias);
	eref->aliasname = pstrdup(alias_name);
	eref->colnames = col_names;

	rte = package_plan_as_rte(query, plan, eref, p_pathkeys ? *p_pathkeys : NIL);
	
	/* Make the target list for the plan and for the wrapper Query
	 * that will correspond to it.
	 */
	subq_tlist = generate_subquery_tlist(varno, plan->targetlist, false, &resno_map);
	
	/* Make the plan. 
	 */
	plan = (Plan*)make_subqueryscan(root, subq_tlist, NIL, varno, plan, query->rtable);
	mark_passthru_locus(plan, true, true);
	
	/* Make the corresponding Query.
	 */
	subquery = makeNode(Query);
	
	subquery->commandType = CMD_SELECT;
	subquery->querySource = QSRC_PLANNER;
	subquery->canSetTag = false;
	subquery->intoClause = NULL;
	subquery->rtable = list_make1(rte);	
	rtr = makeNode(RangeTblRef);
	rtr->rtindex = varno;
	jointree = makeNode(FromExpr);
	jointree->fromlist = list_make1(rtr);
	subquery->jointree = jointree;
	subquery->targetList = subq_tlist;
	subquery->windowClause = copyObject(query->windowClause); /* XXX need translation for frame vals */

	/* The rest may default to zero ...
	
	subquery->utilityStmt = NULL;
	subquery->resultRelation = 0;
	subquery->intoClause = NULL;
	subquery->hasAggs = false;
	subquery->hasWindFuncs = false;
	subquery->hasSubLinks = false;
	subquery->returningList = NIL;
	subquery->groupClause = NIL;
	subquery->havingClause = NULL;
	subquery->distinctClause = NIL;
	subquery->sortClause = NIL;
	subquery->limitOffset = NULL;
	subquery->limitCount = NULL;
	subquery->rowMarks = NIL;
	subquery->setOperations = NULL;
	subquery->resultRelations = NIL;
	subquery->returningLists = NIL;
	
	... */

	/* Reconstruct equi_key_list since the rtable has changed.
	 * XXX we leak the old one.
	 */
	root->equi_key_list =
		construct_equivalencekey_list(root->equi_key_list,
									  resno_map,
									  ((SubqueryScan *)plan)->subplan->targetlist,
									  subq_tlist);

	/* Construct the new pathkeys */
	if (p_pathkeys != NULL)
		*p_pathkeys = reconstruct_pathkeys(root, *p_pathkeys, resno_map,
										   ((SubqueryScan *)plan)->subplan->targetlist,
										   subq_tlist);
	pfree(resno_map);
	
	*query_p = subquery;
	return plan;
}

/* Include the auxiliary aggregation plan into a wrapper query (a window 
 * query with a singleton range table and null quals) by adding a new 
 * subquery RTE representing the aggregation plan.  Add a corresponding
 * subquery scan node to the top of the aggregation plan.
 */
Plan *add_join_to_wrapper(
				PlannerInfo *root,
				Plan *plan, Query *query, /* the auxiliary aggregate to join */
				List *join_tlist,
				unsigned partkey_len, /* count of leading key columns of auxiliary */
				const char *alias_name, List *col_names, /* for auxiliary RTE */
				Query *wrapper_query /* the window query to modify */
				)
{
	Alias *eref;
	RangeTblEntry *rte;
	RangeTblRef *rtr;
	List *subq_tlist = NIL;
	Index varno = 2; /* Always */
	List *pathkeys = NIL;
	int *resno_map;
	
	Insist( list_length(wrapper_query->rtable) == 1 );
	Insist( wrapper_query->jointree->quals == NULL );
	
	/* Make the new subquery RTE. Note that this will include a Query derived
	 * from the input Query.
	 */
	eref = makeNode(Alias);
	eref->aliasname = pstrdup(alias_name);
	eref->colnames = col_names;

	rte = package_plan_as_rte(query, plan, eref, pathkeys);
	
	/* Make the target list for the plan and for the wrapper Query
	 * that will correspond to it.
	 */
	subq_tlist = generate_subquery_tlist(varno, plan->targetlist, false, &resno_map);
	
	/* Make the plan. 
	 */
	plan = (Plan*)make_subqueryscan(root, subq_tlist, NIL, varno, plan, query->rtable);
	mark_passthru_locus(plan, true, true);

	
	/* Add new RTE */
	wrapper_query->rtable = lappend(wrapper_query->rtable, rte);	
	
	rtr = makeNode(RangeTblRef);
	rtr->rtindex = varno;
	wrapper_query->jointree->fromlist = lappend(wrapper_query->jointree->fromlist, rtr);
	
	wrapper_query->targetList = copyObject(join_tlist);
	
	/* XXX Do we need to add a join qual? I don't think so. */	
	
	return plan;
}

/* Copy the given query but use the given target list instead of the one
 * in the query.  The given target list should be "flat" and conformable
 * to the range table of the query and any sort/group refs.  In particular,
 * it may not include aggregate or window functions.  Null out the "windowing 
 * and above" portions of the copy but retain the windowClause so that
 * EXPLAIN can use it.
 *
 * We expect the grouping specification to be null already (due to the
 * syntactic transformation that separates windowing from grouping)  so 
 * we assert that.
 * 
 * This produces the initial value of the windowing subquery
 * from root->parse for sequential window planning.
 */
Query *copy_common_subquery(Query *original, List *targetList)
{
	/* Flat-copy the input */
	Query *common = makeNode(Query);
	memcpy(common, original, sizeof(Query));

	/* Perhaps not strictly necessary, but its what we expect. */
	Assert (common->groupClause == NIL && common->havingQual == NULL);
	Assert (common->hasAggs == false);
	Assert (common->setOperations == NULL);
	
	/* Leave rtable and jointree alone. */
	
	/* New target list must assure no aggregation or windowing. */
	common->targetList = targetList;
	
	common->commandType = CMD_SELECT;
	common->querySource = QSRC_PLANNER;
	common->canSetTag = false;
	common->intoClause = NULL;
	common->windowClause = copyObject(original->windowClause);
	
	/* The rest is "blank". */
	common->resultRelation = 0;
	common->utilityStmt = NULL;
	common->intoClause = NULL;
	common->hasWindFuncs = false;
	common->hasSubLinks = false; /* XXX */
	common->returningList = NIL;
	common->distinctClause = NIL;
	common->sortClause = NIL;
	common->limitOffset = NULL;
	common->limitCount = NULL;
	common->rowMarks = NIL;
	common->resultRelations = NIL;
	common->returningLists = NIL;
	
	return common;
}

/*
 * Return true if a node contains WindowRefs.
 *
 * 'context' is not used in this function.
 */
bool
contain_windowref(Node *node, void *context)
{
	if (node == NULL)
		return false;
	
	if (IsA(node, WindowRef))
		return true;
	
	return expression_tree_walker(node, contain_windowref, NULL);
}

/*
 * Does the given window frame edge contains an expression that must be
 * evaluated at run time (i.e., may contain a Var)?
 */
bool
window_edge_is_delayed(WindowFrameEdge *edge)
{
	if (edge == NULL)
		return false;
	if ((edge->kind == WINDOW_DELAYED_BOUND_PRECEDING ||
		 edge->kind == WINDOW_DELAYED_BOUND_FOLLOWING) &&
			edge->val != NULL)
		return true;

	/* Non-delayed frame edge must not have Var */
	Assert(pull_var_clause((Node *) edge->val, false) == NIL);

	return false;
}
