blob: 3551410b8d04d6a71e422556d895b61531c3267a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* nodeWindowNew.c
* Routines to handle window nodes.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
/* XXX include list is speculative -- bhagenbuch */
#include "access/heapam.h"
#include "catalog/catquery.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "catalog/pg_window.h"
#include "cdb/cdbvars.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "executor/nodeWindow.h"
#include "optimizer/clauses.h"
#include "nodes/makefuncs.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
#include "parser/parse_oper.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/debugutils.h"
#include "utils/int8.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/numeric.h"
#include "utils/syscache.h"
#include "utils/tuplestorenew.h"
/* The initial size (in bytes) for an entry in the frame buffer. */
#define FRAMEBUFFER_ENTRY_SIZE 1024

/*
 * HAS_ONLY_TRANS_FUNC -- true if 'funcstate' (a WindowStatePerFunction)
 * describes an aggregate-derived window function that has a transition
 * function but neither an inverse preliminary function nor a preliminary
 * function.
 *
 * The argument is parenthesized so that pointer expressions such as
 * 'states + i' expand correctly.  It is still evaluated several times, so
 * do not pass an argument with side effects.
 */
#define HAS_ONLY_TRANS_FUNC(funcstate) \
	((funcstate)->isAgg && \
	 !OidIsValid((funcstate)->invprelimfn_oid) && \
	 !OidIsValid((funcstate)->prelimfn_oid) && \
	 OidIsValid((funcstate)->transfn_oid))
/*
 * WindowValue -- one buffered value as read back from a frame buffer entry.
 *
 * Instances are filled in by deserializeValue()/deserializeFuncs().  For a
 * pass-by-reference type, 'value' is not a private copy: it points into
 * the serialized entry it was read from.
 */
typedef struct WindowValue
{
	/*
	 * Total number of not-NULL arguments for the function associated with
	 * this value.
	 *
	 * This is only used for aggregate functions that have inverse
	 * preliminary functions.  For such functions, when all argument values
	 * in the current frame are NULL, the result must be NULL as well; this
	 * counter is how that case is detected.
	 */
	uint64		numNotNulls;
	/* The value itself; set to 0 when valueIsNull is true. */
	Datum		value;
	bool		valueIsNull;
} WindowValue;
/*
 * A higher-level structure to represent an entry in the frame buffer.
 */
typedef struct FrameBufferEntry
{
	/*
	 * Keys for RANGE frames: one Datum/null flag per ORDER BY column of the
	 * level (numSortCols entries).
	 *
	 * This is only used for RANGE frames.  For ROWS frames these pointers
	 * are NULL.
	 */
	Datum	   *keys;
	bool	   *nulls;

	/*
	 * A list of intermediate function values buffered in the frame buffer.
	 * Each element is a WindowValue; a function's values are located via
	 * its serial_index.
	 */
	List	   *func_values;
} FrameBufferEntry;
/*
 * WindowStatePerLevelData - per-level working state
 *
 * One instance exists per key level (wstate->level_state[] is indexed by
 * level).  It holds the level's frame definition, the sort-key metadata
 * needed to store RANGE keys in the frame buffer, and the frame buffer
 * together with the readers positioned on its frame edges.
 */
typedef struct WindowStatePerLevelData
{
	Index		level;			/* index of WindowKey for this level */
	FmgrInfo   *eqfunctions;	/* equality fns for partial key */
	FmgrInfo   *ltfunctions;	/* less-than functions for partial key */
	bool		need_peercount;

	/*
	 * Group/peer/rank bookkeeping for this level.  NOTE(review): these are
	 * maintained outside this excerpt (presumably advanceKeyLevelState());
	 * confirm exact semantics there.
	 */
	int64		group_index;
	int64		prior_non_peer_count;
	int64		peer_index;
	int64		peer_count;
	int64		rank;
	int64		dense_rank;
	int64		prior_rank;
	int64		prior_dense_rank;

	/* List of functions in this key level */
	List	   *level_funcs;

	/* The framing clause */
	WindowFrame *frame;

	/* Indicate if all functions have trivial frames. */
	bool		trivial_frames_only;

	/* if the user didn't specify a frame, we use the default */
	bool		default_frame;

	/* Indicate if the frame is a ROWS frame or a RANGE frame. */
	bool		is_rows;

	/*
	 * Indicate if the frame clause for this level has an edge of
	 * DELAY_BOUND type.
	 */
	bool		has_delay_bound;

	/*
	 * Indicate if all functions in this level are aggregate functions that
	 * have no preliminary functions or inverse preliminary functions.
	 */
	bool		has_only_trans_funcs;

	/* user can specify ascending or descending order, record it here */
	bool	   *col_sort_asc;

	/*
	 * The partial ORDER BY keys.  There is at least one sort key in this
	 * array.  It is kept for convenience, since RANGE-based framing stores
	 * the ORDER BY keys in the frame buffer (see serializeEntry()).
	 */
	int			numSortCols;
	AttrNumber *sortColIdx;		/* attribute numbers read via slot_getattr */
	Oid		   *sortOperators;

	/* column type data, one entry per sort column */
	Oid		   *col_types;
	int2	   *col_typlens;
	bool	   *col_typbyvals;

	/*
	 * Frame is contradictory and no tuples will fall into the frame, for
	 * example: BETWEEN 10 FOLLOWING AND 10 PRECEDING.
	 */
	bool		empty_frame;

	/* XXX: merge the following 4 fields into 2 */
	/* state for rows frame is simple (for now) */
	long int	trail_rows;
	long int	lead_rows;
	/* state for range frame is more complex */
	Datum		trail_range;
	Datum		lead_range;

	/*
	 * The number of rows between the trailing edge and the current_row,
	 * and between the leading edge and the current_row.
	 *
	 * For preceding edges, these numbers are 0 or negative.
	 */
	long int	num_trail_rows;
	long int	num_lead_rows;

	/* state for frame edges (for both ROWS and RANGE frames) */
	ExprState  *trail_expr;
	ExprState  *lead_expr;

	/* state for RANGE-frame edge computation */
	ExprState  *trail_range_expr;
	ExprState  *lead_range_expr;

	/* RANGE frame only; see comments on init_bound_frame_edge_expr */
	ExprState  *trail_range_eq_expr;
	ExprState  *lead_range_eq_expr;

	/*
	 * The frame buffer that holds the intermediate results for the
	 * functions in this key level.
	 */
	struct WindowFrameBufferData *frame_buffer;

	/*
	 * These two readers point to the trailing and leading edges of this
	 * frame, respectively.  They are accessors on frame_buffer's
	 * tuplestore, created in createFrameBuffers().
	 */
	NTupleStoreAccessor *trail_reader;
	NTupleStoreAccessor *lead_reader;

	/*
	 * Indicate if lead_reader is pointing to the exact edge position.
	 * With this flag, we don't need to do the tuple comparison again
	 * when checking whether there is sufficient data to generate an
	 * output.
	 */
	bool		lead_ready;

	/*
	 * Indicate if funcstate->aggTransValue actually contains the aggregate
	 * value.
	 */
	bool		agg_filled;

	/*
	 * Reusable FrameBufferEntry buffers, to avoid pallocs/pfrees.
	 */
	FrameBufferEntry *curr_entry_buf;
	FrameBufferEntry *trail_entry_buf;
	FrameBufferEntry *lead_entry_buf;
} WindowStatePerLevelData;
/*
 * WindowStatePerFunctionData
 *
 * Per-function working state for one window function of a key level
 * ('wlevel').  Ordinary window functions use win_value/windowfn; functions
 * derived from aggregates use the trans/final/prelim function fields and
 * the transition-value fields.
 */
typedef struct WindowStatePerFunctionData
{
	WindowRefExprState *wrxstate;
	WindowRef  *wref;
	WindowStatePerLevelData *wlevel;

	bool		isAgg;
	bool		allowframe;

	/* Indicate if this function requires a range cumulative frame. */
	bool		cumul_frame;

	bool		winpeercount;

	int			numargs;		/* explicit, caller-supplied args */

	int16		resulttypeLen;
	bool		resulttypeByVal;

	/* Ordinary window function */
	Datum		win_value;
	bool		win_value_is_null;
	Oid			windowfn_oid;
	FmgrInfo	windowfn;

	/* Aggregate-derived window function */
	Oid			transfn_oid;
	Oid			finalfn_oid;
	Oid			prelimfn_oid;
	Oid			invtransfn_oid;
	Oid			invprelimfn_oid;
	FmgrInfo	transfn;
	FmgrInfo	finalfn;
	FmgrInfo	prelimfn;
	FmgrInfo	invtransfn;
	FmgrInfo	invprelimfn;
	Datum		aggInitValue;
	bool		aggInitValueIsNull;
	int16		aggTranstypeLen;
	bool		aggTranstypeByVal;

	/* the intermediate transition value */
	Datum		aggTransValue;
	bool		aggTransValueIsNull;
	bool		aggNoTransValue;

	/* the final transition value for output tuples */
	Datum		final_aggTransValue;
	bool		final_aggTransValueIsNull;
	bool		final_aggNoTransValue;

	/*
	 * The final transition value may be a copy of a value stored in the
	 * frame buffer, or the above intermediate in-memory transition value.
	 * Only the copied value should be freed; this flag tells whether
	 * final_aggTransValue must be freed.
	 */
	bool		final_aggShouldFree;

	/* frame does not require buffering and complexity of invokeWindowFuncs() */
	bool		trivial_frame;

	/*
	 * The index of this function's intermediate value among the values
	 * serialized together into a frame buffer entry (assigned by
	 * serializeFuncs()).
	 *
	 * The initial value is -1.
	 */
	int			serial_index;

	/*
	 * The total number of not-NULL arguments seen for this function so far.
	 * It is stored in the frame buffer only for functions that have an
	 * inverse preliminary function.
	 */
	uint64		numNotNulls;
} WindowStatePerFunctionData;
/*
 * Selectors for the trailing/leading ROWS/RANGE frame quantities.
 * NOTE(review): their users are outside this excerpt.
 */
#define FRAME_TRAIL_ROWS 0
#define FRAME_TRAIL_RANGE 1
#define FRAME_LEAD_ROWS 2
#define FRAME_LEAD_RANGE 3

/*
 * Frame-edge classification macros.  'e' is a pointer to a frame edge with
 * a 'kind' member.  Every macro parameter is parenthesized so that pointer
 * expressions (e.g. 'edges + 1') expand correctly; arguments may be
 * evaluated more than once, so avoid side effects.
 */

/* Edge is a (possibly delayed) PRECEDING or FOLLOWING bound. */
#define EDGE_IS_BOUND(e) \
	((e)->kind == WINDOW_BOUND_PRECEDING || \
	 (e)->kind == WINDOW_BOUND_FOLLOWING || \
	 (e)->kind == WINDOW_DELAYED_BOUND_FOLLOWING || \
	 (e)->kind == WINDOW_DELAYED_BOUND_PRECEDING)

/* Edge is CURRENT ROW. */
#define EDGE_IS_CURRENT_ROW(e) \
	((e)->kind == WINDOW_CURRENT_ROW)

/* Edge is a delayed PRECEDING or FOLLOWING bound. */
#define EDGE_IS_DELAYED(e) \
	((e)->kind == WINDOW_DELAYED_BOUND_FOLLOWING || \
	 (e)->kind == WINDOW_DELAYED_BOUND_PRECEDING)

/* Edge is a (possibly delayed) FOLLOWING bound. */
#define EDGE_IS_BOUND_FOLLOWING(e) \
	((e)->kind == WINDOW_BOUND_FOLLOWING || \
	 (e)->kind == WINDOW_DELAYED_BOUND_FOLLOWING)

/* Edge is a (possibly delayed) PRECEDING bound. */
#define EDGE_IS_BOUND_PRECEDING(e) \
	((e)->kind == WINDOW_BOUND_PRECEDING || \
	 (e)->kind == WINDOW_DELAYED_BOUND_PRECEDING)

/* Same test as EDGE_IS_DELAYED. */
#define EDGE_IS_DELAYED_BOUND(e) \
	((e)->kind == WINDOW_DELAYED_BOUND_PRECEDING || \
	 (e)->kind == WINDOW_DELAYED_BOUND_FOLLOWING)

/*
 * EDGE_EQ_CURRENT_ROW -- true if edge 'e' (the leading edge when 'is_lead',
 * else the trailing edge) coincides with the current row: either it is
 * CURRENT ROW, or it is a bound whose ROWS offset is 0, or (RANGE frames)
 * whose range equality expression evaluates true via exec_eq_exprstate().
 */
#define EDGE_EQ_CURRENT_ROW(level_state, wstate, e, is_lead) \
	(EDGE_IS_CURRENT_ROW(e) || \
	 ((EDGE_IS_BOUND_FOLLOWING(e) || \
	   EDGE_IS_BOUND_PRECEDING(e)) && \
	  ((!(is_lead) && \
		(((level_state)->is_rows && (level_state)->trail_rows == 0) || \
		 (!(level_state)->is_rows && exec_eq_exprstate((wstate), \
								(level_state)->trail_range_eq_expr)))) || \
	   ((is_lead) && \
		(((level_state)->is_rows && (level_state)->lead_rows == 0) || \
		 (!(level_state)->is_rows && exec_eq_exprstate((wstate), \
								(level_state)->lead_range_eq_expr)))))))
/****************************************************
 * Window Input Buffer and its APIs
 ***************************************************/

/*
 * WindowInputBuffer: this buffer holds the input tuples of the partition
 * currently being processed by the window node.  It also stores the first
 * tuple of the next partition, which is always the last tuple in the
 * buffer.
 */
typedef struct WindowInputBufferData
{
	NTupleStore *tuplestore;
	NTupleStoreAccessor *writer;

	/* The reader that always points to the previous current_row */
	NTupleStoreAccessor *current_row_reader;

	/* The reader used to read tuples from the buffer */
	NTupleStoreAccessor *reader;

	/*
	 * Indicate if the last tuple in the buffer is the tuple that breaks
	 * the partition key.
	 */
	bool		part_break;

	/* The total number of tuples in the store. */
	int64		num_tuples;
} WindowInputBufferData;

static void resetInputBuffer(WindowState *wstate);
static void freeInputBuffer(WindowState *wstate);
static void trimInputBuffer(WindowInputBuffer buffer);
/********************************************************
 * Window Frame Buffer and its APIs
 *******************************************************/

/*
 * WindowFrameBuffer: this buffer holds intermediate aggregate values
 * or input arguments in order to generate the final aggregates for
 * the next output row.
 *
 * This buffer trims old values once they fall outside the lower bound
 * of the frame it was given.  That frame can be a RANGE frame or a ROWS
 * frame.  Note that for a RANGE frame the actual number of values in the
 * buffer may vary over time.
 *
 * There is one buffer per key level.  Key levels with the same list of
 * functions could in principle share one buffer.
 */
typedef struct WindowFrameBufferData
{
	NTupleStore *tuplestore;
	NTupleStoreAccessor *writer;

	/*
	 * This reader is used to scan through the buffer to compute the final
	 * result.
	 */
	NTupleStoreAccessor *reader;

	/* Indicate if this buffer is for a ROWS frame (true) or a RANGE frame. */
	bool		is_rows;

	/*
	 * The number of rows before and after the current_row.
	 * Note that 'num_rows_after' counts the current_row.
	 *
	 * These two counters are used to adjust frame edges for
	 * frames with expressions.
	 *
	 * Currently, these two counters are not modified by a trim operation.
	 */
	long int	num_rows_before;
	long int	num_rows_after;

	/*
	 * The trailing and leading number of rows from current_row if this
	 * frame is a ROWS frame.
	 */
	long int	trail_rows;
	long int	lead_rows;

	/*
	 * The trailing and leading range from current_row if this frame is a
	 * RANGE frame.
	 */
	Datum		trail_range;
	Datum		lead_range;

	/* The reader that always points to the current_row. */
	NTupleStoreAccessor *current_row_reader;

	/*
	 * The accessor that marks the point before which all values in the
	 * buffer can be trimmed.
	 */
	NTupleStoreAccessor *trim_reader;

	/*
	 * Back pointer to the owning level state, where information about the
	 * window functions can be found (set by createFrameBuffers()).
	 */
	WindowStatePerLevel level_state;
} WindowFrameBufferData;

typedef WindowFrameBufferData *WindowFrameBuffer;
/* Frame buffer API (each prototype is declared exactly once). */
static WindowFrameBuffer createRangeFrameBuffer(Datum trail_range,
												Datum lead_range,
												int bytes);
static WindowFrameBuffer createRowsFrameBuffer(long int trail_rows,
											   long int lead_rows,
											   int bytes);
static WindowFrameBuffer resetFrameBuffer(WindowFrameBuffer buffer);
static void appendToFrameBuffer(WindowStatePerLevel level_state,
								WindowState *wstate,
								bool last_peer);
static void trimFrameBuffer(WindowFrameBuffer buffer);
static void incrementCurrentRow(WindowFrameBuffer buffer, WindowState *wstate);
static bool hasEnoughDataInRange(WindowFrameBuffer buffer,
								 WindowStatePerLevel level_state,
								 WindowState *wstate,
								 Datum trail_range,
								 Datum lead_range);
static bool hasEnoughDataInRows(WindowFrameBuffer buffer,
								WindowStatePerLevel level_state,
								WindowState *wstate,
								long int trail_rows,
								long int lead_rows);
static void computeFrameValue(WindowStatePerLevel level_state,
							  WindowState *wstate,
							  NTupleStoreAccessor *trail_reader,
							  NTupleStoreAccessor *lead_reader);
static void createFrameBuffers(WindowState *wstate);
static void resetFrameBuffers(WindowState *wstate);
static void resetTransValues(WindowStatePerLevel level_state,
							 WindowState *wstate);
static void freeFrameBuffer(WindowFrameBuffer buffer);
static void freeFrameBuffers(WindowState *wstate);

/* the functions for the Window node */
static WindowState *makeWindowState(Window *window, EState *estate);
static void initializePartition(WindowState *wstate);
static bool checkOutputReady(WindowState *wstate);
static TupleTableSlot *fetchTupleSlotThroughBuf(WindowState *wstate);
static int	initFcinfo(WindowRefExprState *wrxstate, FunctionCallInfoData *fcinfo,
					   WindowStatePerFunction funcstate, ExprContext *econtext,
					   bool check_nulls);
static void adjustEdges(WindowFrameBuffer buffer, WindowState *wstate);
static TupleTableSlot *fetchCurrentRow(WindowState *wstate);
static void processTupleSlot(WindowState *wstate, TupleTableSlot *slot, bool last_peer);
static bool getCurrentValue(NTupleStoreAccessor *reader,
							WindowStatePerLevel level_state,
							FrameBufferEntry *entry_buf);
static bool cmp_deformed_tuple(Datum *a, bool *a_nulls, Datum *b, bool *b_nulls,
							   bool *asc_cols, int ncols,
							   FmgrInfo *ltfuncs, FmgrInfo *eqfuncs,
							   MemoryContext evalContext, bool is_equal);
static FmgrInfo *get_ltfuncs(TupleDesc tupdesc, int numCols, AttrNumber *matchColIdx);
static void advanceKeyLevelState(WindowState *wstate, int min_level);
static void init_frames(WindowState *wstate);
static void deform_window_tuple(TupleTableSlot *slot, int nattrs, AttrNumber *attnums,
								Datum *values, bool *nulls, WindowState *wstate);
static void add_tuple_to_trans(WindowStatePerFunction funcstate, WindowState *wstate,
							   ExprContext *econtext, bool check_nulls);
static bool hasTuplesInFrame(WindowStatePerLevel level_state,
							 WindowState *wstate);
static Datum last_value_internal(WindowRefExprState *wrxstate, bool *isnull);
static Datum first_value_internal(WindowRefExprState *wrxstate, bool *isnull);
static FrameBufferEntry *createFrameBufferEntry(WindowStatePerLevel level_state);
static void freeFrameBufferEntry(FrameBufferEntry *entry);
static void advanceEdgeForRange(WindowStatePerLevel level_state,
								WindowState *wstate,
								WindowFrameEdge *edge,
								ExprState *edge_expr,
								ExprState *edge_range_expr,
								NTupleStoreAccessor *edge_reader,
								bool is_lead_edge);
static void forwardEdgeForRange(WindowStatePerLevel level_state,
								WindowState *wstate,
								WindowFrameEdge *edge,
								ExprState *edge_expr,
								ExprState *edge_range_expr,
								NTupleStoreAccessor *edge_reader,
								bool is_lead_edge);
static bool checkLastRowForEdge(WindowStatePerLevel level_state,
								WindowState *wstate,
								WindowFrameEdge *edge,
								NTupleStoreAccessor *edge_reader,
								Datum new_edge_value,
								bool new_edge_value_isnull,
								bool is_lead_edge);
static ExprState *make_eq_exprstate(WindowState *wstate,
									Expr *expr1, Expr *expr2);
static bool exec_eq_exprstate(WindowState *wstate, ExprState *eq_exprstate);
static void setEmptyFrame(WindowStatePerLevel level_state,
						  WindowState *wstate);
/*
 * initFrameBuffer -- give a freshly allocated frame buffer its storage.
 *
 * Creates the tuplestore (passing 'bytes' through to ntuplestore_create()),
 * then one write accessor followed by the read accessors 'reader',
 * 'current_row_reader' and 'trim_reader', and zeroes the row counters.
 */
static void
initFrameBuffer(WindowFrameBuffer buffer, int bytes)
{
	NTupleStore *store = ntuplestore_create(bytes);

	buffer->tuplestore = store;

	/* The single write accessor... */
	buffer->writer = ntuplestore_create_accessor(store, true);

	/* ...and the read-only ones, in the order the rest of the code expects. */
	buffer->reader = ntuplestore_create_accessor(store, false);
	buffer->current_row_reader = ntuplestore_create_accessor(store, false);
	buffer->trim_reader = ntuplestore_create_accessor(store, false);

	buffer->num_rows_after = 0;
	buffer->num_rows_before = 0;
}
/*
 * createRangeFrameBuffer -- allocate a zeroed WindowFrameBuffer for a RANGE
 * frame with the given trailing/leading ranges, and initialize its storage
 * with a 'bytes' budget.
 */
static WindowFrameBuffer
createRangeFrameBuffer(Datum trail_range, Datum lead_range, int bytes)
{
	WindowFrameBuffer range_buf;

	range_buf = (WindowFrameBuffer) palloc0(sizeof(*range_buf));

	range_buf->is_rows = false;
	range_buf->lead_range = lead_range;
	range_buf->trail_range = trail_range;

	initFrameBuffer(range_buf, bytes);

	return range_buf;
}
/*
* createRowsFrameBuffer -- create a new WindowFrameBuffer of the ROWS type.
*/
static WindowFrameBuffer
createRowsFrameBuffer(long int trail_rows, long int lead_rows, int bytes)
{
WindowFrameBuffer buffer =
(WindowFrameBuffer) palloc0(sizeof(WindowFrameBufferData));
buffer->is_rows = true;
buffer->trail_rows = trail_rows;
buffer->lead_rows = lead_rows;
initFrameBuffer(buffer, bytes);
return buffer;
}
/*
 * createFrameBuffers -- create one frame buffer per key level of 'wstate'.
 *
 * Half of the operator memory (PlanStateOperatorMemKB() * 1024 bytes) is
 * split evenly among the levels.  Each level gets a ROWS or RANGE buffer
 * according to is_rows, plus its own trail/lead readers on that buffer's
 * tuplestore.  Levels must not already own a buffer.
 */
static void
createFrameBuffers(WindowState *wstate)
{
	int			bytes;
	int			i;

	/* Nothing to create when there are no key levels. */
	if (wstate->numlevels <= 0)
		return;

	bytes = ((PlanStateOperatorMemKB((PlanState *) wstate) * 1024L)/2) / wstate->numlevels;

	for (i = 0; i < wstate->numlevels; i++)
	{
		WindowStatePerLevel lvl = &wstate->level_state[i];
		WindowFrameBuffer fb;

		Assert(lvl->frame_buffer == NULL);

		fb = lvl->is_rows
			? createRowsFrameBuffer(lvl->trail_rows, lvl->lead_rows, bytes)
			: createRangeFrameBuffer(lvl->trail_range, lvl->lead_range, bytes);
		lvl->frame_buffer = fb;

		/* Edge readers live on the buffer's store but belong to the level. */
		lvl->trail_reader = ntuplestore_create_accessor(fb->tuplestore, false);
		lvl->lead_reader = ntuplestore_create_accessor(fb->tuplestore, false);

		fb->level_state = lvl;
	}
}
/*
 * resetFrameBuffer -- empty a window frame buffer and rebuild its accessors.
 *
 * All four accessors (writer, reader, current_row_reader, trim_reader) are
 * destroyed, the tuplestore is reset, the accessors are created again and
 * the row counters are zeroed.  'buffer' must not be NULL; the same pointer
 * is returned.
 */
static WindowFrameBuffer
resetFrameBuffer(WindowFrameBuffer buffer)
{
	NTupleStore *store;

	Assert(buffer != NULL);
	store = buffer->tuplestore;

	/* Drop every accessor before the store is emptied. */
	ntuplestore_destroy_accessor(buffer->writer);
	ntuplestore_destroy_accessor(buffer->reader);
	ntuplestore_destroy_accessor(buffer->current_row_reader);
	ntuplestore_destroy_accessor(buffer->trim_reader);

	ntuplestore_reset(store);

	/* Recreate them in the original order. */
	buffer->writer = ntuplestore_create_accessor(store, true);
	buffer->reader = ntuplestore_create_accessor(store, false);
	buffer->current_row_reader = ntuplestore_create_accessor(store, false);
	buffer->trim_reader = ntuplestore_create_accessor(store, false);

	buffer->num_rows_after = 0;
	buffer->num_rows_before = 0;

	return buffer;
}
/*
 * resetFrameBuffers -- reset the frame buffer of every key level.
 *
 * For a level that owns a buffer, its trail/lead readers are destroyed,
 * the buffer is reset, and fresh readers are created on it.  For every
 * level, the edge row counts and the lead_ready flag are cleared.
 */
static void
resetFrameBuffers(WindowState *wstate)
{
	int			i;

	for (i = 0; i < wstate->numlevels; i++)
	{
		WindowStatePerLevel lvl = &wstate->level_state[i];

		if (lvl->frame_buffer != NULL)
		{
			NTupleStore *store;

			if (lvl->trail_reader != NULL)
				ntuplestore_destroy_accessor(lvl->trail_reader);
			if (lvl->lead_reader != NULL)
				ntuplestore_destroy_accessor(lvl->lead_reader);

			lvl->frame_buffer = resetFrameBuffer(lvl->frame_buffer);
			store = lvl->frame_buffer->tuplestore;

			lvl->trail_reader = ntuplestore_create_accessor(store, false);
			lvl->lead_reader = ntuplestore_create_accessor(store, false);
		}

		lvl->num_trail_rows = 0;
		lvl->num_lead_rows = 0;
		lvl->lead_ready = false;
	}
}
/*
 * freeFrameBuffer -- release a WindowFrameBuffer and everything it owns.
 *
 * Destroys all four accessors created by initFrameBuffer() (writer, reader,
 * current_row_reader and trim_reader), then the tuplestore, then the
 * buffer struct itself.  A NULL buffer is ignored.
 *
 * Accessors that other code created on this buffer's tuplestore (such as
 * the level's trail/lead readers) must be destroyed by their owner before
 * calling this.
 */
static void
freeFrameBuffer(WindowFrameBuffer buffer)
{
	if (buffer == NULL)
		return;

	ntuplestore_destroy_accessor(buffer->writer);
	ntuplestore_destroy_accessor(buffer->reader);
	ntuplestore_destroy_accessor(buffer->current_row_reader);
	/* trim_reader is created alongside the others; release it the same way. */
	ntuplestore_destroy_accessor(buffer->trim_reader);
	ntuplestore_destroy(buffer->tuplestore);

	pfree(buffer);
}
/*
 * freeFrameBuffers -- release the frame buffers of all key levels.
 *
 * For each level that owns a frame buffer, the level's trail/lead readers
 * are destroyed first (they are accessors on the buffer's tuplestore),
 * then the buffer itself is freed.  The reader and buffer pointers are
 * reset to NULL so that no dangling pointers remain in the level state.
 */
static void
freeFrameBuffers(WindowState *wstate)
{
	int			level;

	for (level = 0; level < wstate->numlevels; level++)
	{
		WindowStatePerLevel level_state = &wstate->level_state[level];

		if (level_state->frame_buffer == NULL)
			continue;

		/* destroy the level's accessors before their tuplestore goes away */
		if (level_state->trail_reader)
			ntuplestore_destroy_accessor(level_state->trail_reader);
		level_state->trail_reader = NULL;

		if (level_state->lead_reader)
			ntuplestore_destroy_accessor(level_state->lead_reader);
		level_state->lead_reader = NULL;

		freeFrameBuffer(level_state->frame_buffer);
		level_state->frame_buffer = NULL;
	}
}
/*
 * ensureSpace -- make sure '*p_serial_entry' can hold 'written_len' more
 * bytes starting at 'written_pos'.
 *
 * 'written_pos' must point into the array '*p_serial_entry', whose current
 * allocation size is '*p_max_size'.  When the array is too small, it is
 * grown in multiples of FRAMEBUFFER_ENTRY_SIZE with a single repalloc(),
 * which may move it; '*p_serial_entry' and '*p_max_size' are updated.
 *
 * Returns the position corresponding to 'written_pos' in the (possibly
 * relocated) array; callers must continue writing from the returned
 * pointer rather than from 'written_pos'.
 */
static inline char *
ensureSpace(char **p_serial_entry, Size *p_max_size,
			char *written_pos, Size written_len)
{
	Size		current_len = written_pos - *p_serial_entry;
	Size		needed = current_len + written_len;

	if (needed > *p_max_size)
	{
		Size		new_size = *p_max_size;

		/*
		 * Work out the final size first and reallocate once.  Calling
		 * repalloc() for every FRAMEBUFFER_ENTRY_SIZE step may copy the
		 * whole array over and over for large values.
		 */
		while (needed > new_size)
			new_size += FRAMEBUFFER_ENTRY_SIZE;

		*p_serial_entry = repalloc(*p_serial_entry, new_size);
		*p_max_size = new_size;
	}

	return (*p_serial_entry + current_len);
}
/*
 * serializeValue -- append one Datum to a growable char array.
 *
 * Writing starts at byte offset 'start_pos' of '*p_serial_entry'.  The
 * layout is a bool null flag, followed (only when not null) by either the
 * raw Datum for a pass-by-value type, or the MAXALIGNed bytes of the
 * referenced value for a pass-by-reference type.  The alignment matters
 * because deserializeValue() hands out pointers into the array instead of
 * copying.  The array is enlarged via ensureSpace() when needed.
 *
 * Returns the position just past the written bytes.  'p_len' is not used.
 */
static char *
serializeValue(Datum value, bool isnull,
			   bool byvalue, int16 typelen,
			   char **p_serial_entry, Size *p_max_size, Size *p_len,
			   Size start_pos)
{
	char	   *pos;

	/* Every value begins with its null flag. */
	pos = ensureSpace(p_serial_entry, p_max_size,
					  (*p_serial_entry) + start_pos, sizeof(bool));
	memcpy(pos, &isnull, sizeof(bool));
	pos += sizeof(bool);

	/* A NULL carries no payload. */
	if (isnull)
		return pos;

	if (!byvalue)
	{
		Size		nbytes = datumGetSize(value, byvalue, typelen);
		Size		pad = ((char *) MAXALIGN(pos)) - pos;

		pos = ensureSpace(p_serial_entry, p_max_size, pos, nbytes + pad);
		pos += pad;
		Assert(DatumGetPointer(value) != NULL);
		memcpy(pos, DatumGetPointer(value), nbytes);
		return pos + nbytes;
	}

	pos = ensureSpace(p_serial_entry, p_max_size, pos, sizeof(Datum));
	memcpy(pos, &value, sizeof(Datum));
	return pos + sizeof(Datum);
}
/*
 * serializeFuncs -- serialize the buffered values of all window functions
 * of a key level into '*p_serial_entry', starting at byte offset
 * 'start_pos'.
 *
 * Functions with trivial frames are skipped.  For the others, in
 * level_funcs order, this writes:
 *   - winpeercount functions: win_value (type CUME_DIST_PRELIM_TYPE);
 *   - LEAD/LAG and FIRST/LAST: aggTransValue, typed as argument 1;
 *   - functions with an inverse preliminary or a preliminary function:
 *     numNotNulls (only when an inverse preliminary function exists),
 *     then aggTransValue;
 *   - all other functions: each argument, evaluated in 'econtext' using
 *     its per-tuple memory context.
 *
 * Each serialized function's serial_index is set to its ordinal position
 * (0, 1, ...) among the serialized functions.  The array may be enlarged
 * (and moved); on return '*p_len' is the total number of bytes used,
 * counting the first 'start_pos' bytes.
 */
static void
serializeFuncs(WindowStatePerLevel level_state,
			   ExprContext *econtext,
			   char **p_serial_entry, Size *p_max_size, Size *p_len,
			   Size start_pos)
{
	ListCell   *lc;
	char	   *written_pos = (*p_serial_entry) + start_pos;
	int			serial_index = -1;

	/* the value for each function. */
	foreach(lc, level_state->level_funcs)
	{
		WindowStatePerFunction funcstate = (WindowStatePerFunction) lfirst(lc);

		if (funcstate->trivial_frame)
		{
			/* ignore trivial window functions */
			continue;
		}
		else if (funcstate->winpeercount)
		{
			/* functions that require peer counts. */
			int16		typelen;
			bool		byvalue;

			get_typlenbyval(CUME_DIST_PRELIM_TYPE,
							&typelen, &byvalue);
			written_pos =
				serializeValue(funcstate->win_value,
							   funcstate->win_value_is_null,
							   byvalue, typelen,
							   p_serial_entry, p_max_size,
							   p_len, (written_pos - *p_serial_entry));
		}
		else if (IS_LEAD_LAG(funcstate->wrxstate->winkind) ||
				 IS_FIRST_LAST(funcstate->wrxstate->winkind))
		{
			WindowRefExprState *wrxstate = funcstate->wrxstate;
			int			argno = 1;

			/* checked inline so no local is left unused in non-assert builds */
			Assert(2 <= list_length(wrxstate->args) &&
				   list_length(wrxstate->args) <= 4);

			written_pos =
				serializeValue(funcstate->aggTransValue,
							   funcstate->aggTransValueIsNull,
							   wrxstate->argtypbyval[argno],
							   wrxstate->argtyplen[argno],
							   p_serial_entry, p_max_size,
							   p_len, (written_pos - *p_serial_entry));
		}

		/*
		 * the function which has an inverse preliminary function or
		 * preliminary function.
		 */
		else if (OidIsValid(funcstate->invprelimfn_oid) ||
				 OidIsValid(funcstate->prelimfn_oid))
		{
			/*
			 * Store the number of not-NULL arguments when the function has
			 * an inverse preliminary function.
			 */
			if (OidIsValid(funcstate->invprelimfn_oid))
			{
				written_pos = ensureSpace(p_serial_entry, p_max_size,
										  written_pos, sizeof(uint64));
				memcpy(written_pos, &funcstate->numNotNulls, sizeof(uint64));
				written_pos += sizeof(uint64);
			}

			written_pos =
				serializeValue(funcstate->aggTransValue,
							   funcstate->aggTransValueIsNull,
							   funcstate->aggTranstypeByVal,
							   funcstate->aggTranstypeLen,
							   p_serial_entry, p_max_size,
							   p_len, (written_pos - *p_serial_entry));
		}
		else
		{
			/* the general case: store the input arguments */
			WindowRefExprState *winref_state = funcstate->wrxstate;
			ListCell   *ref_lc;
			Oid			typid;
			bool		byval;
			int16		arglen;

			foreach(ref_lc, winref_state->args)
			{
				ExprState  *argstate = (ExprState *) lfirst(ref_lc);
				Datum		value;
				bool		isnull;
				MemoryContext oldctx;

				typid = exprType((Node *) argstate->expr);
				get_typlenbyval(typid, &arglen, &byval);

				oldctx = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
				value = ExecEvalExpr(argstate, econtext, &isnull, NULL);
				MemoryContextSwitchTo(oldctx);

				written_pos =
					serializeValue(value, isnull,
								   byval, arglen,
								   p_serial_entry, p_max_size,
								   p_len, (written_pos - *p_serial_entry));
			}
		}

		/*
		 * NOTE(review): serial_index advances by one per function, while
		 * deserializeFuncs() reads a general-case function's arguments at
		 * serial_index + argno.  For a general-case function with more than
		 * one argument, consecutive functions' indexes appear to overlap;
		 * confirm against createFrameBufferEntry() before relying on it.
		 */
		serial_index++;
		funcstate->serial_index = serial_index;
	}

	*p_len = written_pos - *p_serial_entry;
	Assert(*p_len <= *p_max_size);
}
/*
 * deserializeValue -- read one serialized Datum from a char array.
 *
 * Reads, starting at 'read_pos', a value in the format written by
 * serializeValue(): a bool null flag, then (if not null) either a raw
 * Datum for a pass-by-value type or a MAXALIGNed pass-by-reference value.
 * The result goes into '*value', which the caller allocates.
 *
 * A pass-by-reference Datum is _not_ copied: value->value points directly
 * into the given char array, so it is only valid while that array is.
 * A NULL value leaves value->value set to 0.
 *
 * Returns the position just past the value that was read.
 */
static char *
deserializeValue(char *read_pos, WindowValue *value,
				 bool byvalue, int16 typelen)
{
	memcpy(&(value->valueIsNull), read_pos, sizeof(bool));
	read_pos += sizeof(bool);

	if (!value->valueIsNull)
	{
		if (byvalue)
		{
			memcpy(&(value->value), read_pos, sizeof(Datum));
			read_pos += sizeof(Datum);
		}
		else
		{
			/* declared up front: PostgreSQL code is C89-style */
			Size		valueLen;

			/*
			 * Make read_pos properly aligned, since it is the start address
			 * of a Datum that is passed by reference.
			 */
			read_pos = (char *) MAXALIGN(read_pos);
			value->value = PointerGetDatum(read_pos);
			valueLen = datumGetSize(value->value, byvalue, typelen);
			read_pos += valueLen;
		}
	}
	else
		value->value = (Datum) 0;

	return read_pos;
}
/*
 * deserializeFuncs -- read back the function values of a key level that
 * serializeFuncs() wrote, starting at 'start_pos'.
 *
 * The values are stored in place into the WindowValue elements of
 * entry_buf->func_values (located through each function's serial_index),
 * and that list is returned.  Pass-by-reference values point into the
 * serialized bytes (see deserializeValue()).  'len' is the number of bytes
 * expected to be consumed; it is only checked in assert-enabled builds.
 */
static List *
deserializeFuncs(WindowStatePerLevel level_state,
				 FrameBufferEntry *entry_buf,
				 char *start_pos,
				 Size len)
{
	List	   *func_values = entry_buf->func_values;
	ListCell   *lc;
	char	   *read_pos = start_pos;

	/* the value for each function. */
	foreach(lc, level_state->level_funcs)
	{
		WindowStatePerFunction funcstate = (WindowStatePerFunction) lfirst(lc);
		WindowValue *value;

		/* skip window functions with trivial frames */
		if (funcstate->trivial_frame)
			continue;
		/* functions that require peer counts. */
		else if (funcstate->winpeercount)
		{
			int16		typelen;
			bool		byvalue;

			get_typlenbyval(CUME_DIST_PRELIM_TYPE,
							&typelen, &byvalue);
			value = list_nth(func_values, funcstate->serial_index);
			read_pos = deserializeValue(read_pos, value, byvalue, typelen);
		}
		else if (IS_LEAD_LAG(funcstate->wrxstate->winkind) ||
				 IS_FIRST_LAST(funcstate->wrxstate->winkind))
		{
			WindowRefExprState *wrxstate = funcstate->wrxstate;
			int			argno = 1;

			/* checked inline so no local is left unused in non-assert builds */
			Assert(2 <= list_length(wrxstate->args) &&
				   list_length(wrxstate->args) <= 4);

			value = list_nth(func_values, funcstate->serial_index);
			read_pos = deserializeValue(read_pos, value,
										wrxstate->argtypbyval[argno],
										wrxstate->argtyplen[argno]);
		}
		else if (OidIsValid(funcstate->invprelimfn_oid) ||
				 OidIsValid(funcstate->prelimfn_oid))
		{
			/*
			 * The function has an inverse preliminary function or a
			 * preliminary function: its transition value was stored.
			 */
			value = list_nth(func_values, funcstate->serial_index);

			/*
			 * The number of not-NULL arguments is stored only when the
			 * function has an inverse preliminary function (mirrors
			 * serializeFuncs()).
			 */
			if (OidIsValid(funcstate->invprelimfn_oid))
			{
				memcpy(&(value->numNotNulls), read_pos, sizeof(uint64));
				read_pos += sizeof(uint64);
			}

			read_pos = deserializeValue(read_pos, value,
										funcstate->aggTranstypeByVal,
										funcstate->aggTranstypeLen);
		}
		else
		{
			/* the general case: one stored value per input argument */
			WindowRefExprState *winref_state = funcstate->wrxstate;
			ListCell   *ref_lc;
			Oid			typid;
			bool		byval;
			int16		arglen;
			int			argno = 0;

			foreach(ref_lc, winref_state->args)
			{
				ExprState  *argstate = (ExprState *) lfirst(ref_lc);

				typid = exprType((Node *) argstate->expr);
				get_typlenbyval(typid, &arglen, &byval);
				value = list_nth(func_values, funcstate->serial_index + argno);
				read_pos = deserializeValue(read_pos, value, byval, arglen);
				argno++;
			}
		}
	}

	Assert((Size) (read_pos - start_pos) == len);

	return func_values;
}
/*
 * serializeEntry -- build the serialized form of one frame buffer entry for
 * a key level in the growable char array '*p_serial_entry'.
 *
 * For RANGE frames (is_rows false) the level's ORDER BY keys are written
 * first, read from econtext->ecxt_scantuple, one per sort column.  They use
 * the same encoding as every other value (serializeValue(): null flag,
 * then a Datum or a MAXALIGNed by-reference copy).  The function values
 * follow (serializeFuncs()).
 *
 * The array must be non-NULL and MAXALIGNed; it is enlarged (and may move)
 * if the data do not fit.  On return '*p_len' is the number of bytes used.
 */
static void
serializeEntry(WindowStatePerLevel level_state,
			   ExprContext *econtext,
			   char **p_serial_entry,
			   Size *p_max_size,
			   Size *p_len)
{
	char	   *written_pos;
	TupleTableSlot *slot = econtext->ecxt_scantuple;

	*p_len = 0;
	Assert(*p_serial_entry != NULL);

	/* We rely on the address of the char array being maxaligned. */
	Assert(*p_serial_entry == (char *) MAXALIGN(*p_serial_entry));

	written_pos = *p_serial_entry;

	if (!level_state->is_rows)
	{
		int			key_no;

		/*
		 * Copy the keys.  Reusing serializeValue() keeps the key encoding
		 * identical to the one used for function values.
		 */
		for (key_no = 0; key_no < level_state->numSortCols; key_no++)
		{
			AttrNumber	attnum = level_state->sortColIdx[key_no];
			Datum		key;
			bool		isnull;

			key = slot_getattr(slot, attnum, &isnull);

			written_pos =
				serializeValue(key, isnull,
							   level_state->col_typbyvals[key_no],
							   level_state->col_typlens[key_no],
							   p_serial_entry, p_max_size,
							   p_len, (written_pos - *p_serial_entry));
		}
	}

	/* Copy function values */
	serializeFuncs(level_state,
				   econtext,
				   p_serial_entry,
				   p_max_size,
				   p_len,
				   (written_pos - *p_serial_entry));
}
/*
 * deserializeEntry -- decode an entry produced by serializeEntry().
 *
 * For RANGE levels, the per-key null flags and key values are read into
 * entry_buf->nulls[] and entry_buf->keys[].  Pass-by-reference keys are not
 * copied: entry_buf->keys[i] points directly into serial_entry.  The bytes
 * after the keys are handed to deserializeFuncs(), whose result becomes
 * entry_buf->func_values.
 *
 * serial_entry must be MAXALIGNed; len is the total entry length in bytes.
 * Returns entry_buf.
 */
static FrameBufferEntry *
deserializeEntry(WindowStatePerLevel level_state,
				 FrameBufferEntry *entry_buf,
				 char *serial_entry, Size len)
{
	char	   *cursor = serial_entry;
	int			col;

	/* Key padding is computed relative to a MAXALIGNed base. */
	Assert(serial_entry == (char *) MAXALIGN(serial_entry));

	if (!level_state->is_rows)
	{
		for (col = 0; col < level_state->numSortCols; col++)
		{
			Size		nbytes;

			memcpy(&(entry_buf->nulls[col]), cursor, sizeof(bool));
			cursor += sizeof(bool);

			if (entry_buf->nulls[col])
				continue;

			if (level_state->col_typbyvals[col])
			{
				memcpy(&(entry_buf->keys[col]), cursor, sizeof(Datum));
				nbytes = sizeof(Datum);
			}
			else
			{
				/* Skip the alignment padding serializeEntry() left. */
				cursor = (char *) MAXALIGN(cursor);
				nbytes = datumGetSize(PointerGetDatum(cursor),
									  level_state->col_typbyvals[col],
									  level_state->col_typlens[col]);
				entry_buf->keys[col] = PointerGetDatum(cursor);
			}
			cursor += nbytes;
		}
	}

	/* Everything after the keys belongs to the function values. */
	entry_buf->func_values =
		deserializeFuncs(level_state, entry_buf, cursor,
						 len - (Size) (cursor - serial_entry));
	return entry_buf;
}
/*
* adjustEdgesAfterAppend -- adjust the trailing and leading edges for
* a given level_state after appending a value into its frame buffer.
*
* appendToFrameBuffer() calls this right after writing a new entry.
* Edges that are delayed while the frame is empty are left untouched.
* For ROWS levels the edge readers are moved by counting rows
* (num_trail_rows / num_lead_rows); for RANGE levels bounded edges are
* moved with forwardEdgeForRange(), with econtext->ecxt_scantuple
* temporarily switched to wstate->curslot and restored afterwards.
* Several paths update level_state->lead_ready, which
* hasEnoughDataInRange() consults.
*
* last_peer: NOTE(review) presumably true when the appended row is the
* last of its peer group -- confirm against callers.
*/
static void
adjustEdgesAfterAppend(WindowStatePerLevel level_state,
WindowState *wstate,
bool last_peer)
{
WindowFrameBuffer buffer = level_state->frame_buffer;
ExprContext *econtext = wstate->ps.ps_ExprContext;
/* Saved so it can be restored after forwardEdgeForRange() calls. */
TupleTableSlot *inserting_tuple = econtext->ecxt_scantuple;
/* If the current_row_reader for the buffer is not set, set it to point to
* the last row.
*/
if (!ntuplestore_acc_tell(buffer->current_row_reader, NULL))
ntuplestore_acc_seek_last(buffer->current_row_reader);
/*
* Adjust the trailing edge if it is "ROWS x FOLLOWING" or
* it is a delayed edge.  Skipped when the frame is empty and the
* trailing edge is delayed.
*/
if (!(level_state->empty_frame &&
EDGE_IS_DELAYED(level_state->frame->trail)))
{
if (level_state->is_rows &&
EDGE_IS_BOUND(level_state->frame->trail) &&
level_state->trail_rows > 0)
{
/*
* trail_reader not positioned yet: place it on the newest entry
* unless last_peer is set.  Otherwise advance it one entry while
* fewer than trail_rows - 1 rows have been counted.
*/
if (!ntuplestore_acc_tell(level_state->trail_reader, NULL))
{
if (!last_peer)
ntuplestore_acc_seek_last(level_state->trail_reader);
}
else if (level_state->num_trail_rows < level_state->trail_rows - 1)
{
ntuplestore_acc_advance(level_state->trail_reader, 1);
level_state->num_trail_rows++;
}
}
else if (!level_state->is_rows &&
(EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail) ||
EDGE_IS_DELAYED(level_state->frame->trail)))
{
/* RANGE: move the trailing edge by value, evaluated on curslot. */
econtext->ecxt_scantuple = wstate->curslot;
forwardEdgeForRange(level_state, wstate,
level_state->frame->trail,
level_state->trail_expr,
level_state->trail_range_expr,
level_state->trail_reader,
false);
econtext->ecxt_scantuple = inserting_tuple;
}
}
/* Same skip rule for the leading edge. */
if (!(level_state->empty_frame &&
EDGE_IS_DELAYED(level_state->frame->lead)))
{
/*
* If the leading edge is "x PRECEDING" and lead_reader already has a
* position, the leading edge is in place: mark it ready and return.
*/
if (EDGE_IS_BOUND_PRECEDING(level_state->frame->lead) &&
ntuplestore_acc_tell(level_state->lead_reader, NULL))
{
level_state->lead_ready = true;
return;
}
/*
* Adjust the leading edge when the edge is "x FOLLOWING" or
* "CURRENT ROW".
*/
if (level_state->is_rows)
{
/* Still counting up to lead_rows; lead_reader is not placed yet. */
if (level_state->num_lead_rows < level_state->lead_rows)
{
Assert(!ntuplestore_acc_tell(level_state->lead_reader, NULL));
if (!last_peer)
level_state->num_lead_rows++;
}
else if ((EDGE_IS_BOUND(level_state->frame->lead) ||
EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead, true)) &&
level_state->num_lead_rows == level_state->lead_rows)
{
/* If the leading edge is not set, set it here. */
if (!ntuplestore_acc_tell(level_state->lead_reader, NULL))
ntuplestore_acc_seek_last(level_state->lead_reader);
}
else if (!EDGE_IS_BOUND(level_state->frame->lead) &&
!EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead, true))
{
/*
* Leading edge is neither bounded nor CURRENT ROW: follow the
* newest entry, counting each successful advance.
*/
if (!ntuplestore_acc_tell(level_state->lead_reader, NULL))
ntuplestore_acc_seek_last(level_state->lead_reader);
else
{
bool found = ntuplestore_acc_advance(level_state->lead_reader, 1);
if (found)
level_state->num_lead_rows++;
}
}
}
else
{
if (EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead,
true))
{
/* RANGE ... CURRENT ROW: lead_reader follows current_row_reader. */
bool found;
NTupleStorePos pos;
found = ntuplestore_acc_tell(buffer->current_row_reader, &pos);
Assert(found);
ntuplestore_acc_seek(level_state->lead_reader, &pos);
level_state->lead_ready = true;
}
else if (EDGE_IS_BOUND(level_state->frame->lead))
{
/* RANGE bounded edge: move it by value, evaluated on curslot. */
econtext->ecxt_scantuple = wstate->curslot;
forwardEdgeForRange(level_state, wstate,
level_state->frame->lead,
level_state->lead_expr,
level_state->lead_range_expr,
level_state->lead_reader,
true);
econtext->ecxt_scantuple = inserting_tuple;
}
else
{
/* Unbounded leading edge: step forward; ready only on last peer. */
ntuplestore_acc_advance(level_state->lead_reader, 1);
level_state->lead_ready = last_peer;
}
}
}
}
/*
 * appendToFrameBuffer -- serialize the current row's keys and function
 * values for this level and append them to the level's frame buffer.
 *
 * The entry is built in wstate->serial_array (serializeEntry() may enlarge
 * it and wstate->max_size) and written through the buffer's writer
 * accessor.  The trailing/leading edges are then adjusted and
 * num_rows_after is incremented.
 */
static void
appendToFrameBuffer(WindowStatePerLevel level_state,
					WindowState *wstate,
					bool last_peer)
{
	WindowFrameBuffer fb = level_state->frame_buffer;
	Size		entry_len;

	Assert(fb->is_rows == level_state->is_rows);

	/*
	 * serializeEntry() skips over key alignment padding without writing it,
	 * so clear the scratch array first.  Bytes added by ensureSpace() beyond
	 * the current max_size are not covered by this clear.
	 */
	MemSet(wstate->serial_array, 0, wstate->max_size);
	serializeEntry(level_state, wstate->ps.ps_ExprContext,
				   &(wstate->serial_array), &(wstate->max_size), &entry_len);

	ntuplestore_acc_put_data(fb->writer, (void *) (wstate->serial_array),
							 entry_len);

	adjustEdgesAfterAppend(level_state, wstate, last_peer);
	fb->num_rows_after++;
}
/*
 * trimFrameBuffer -- discard old entries from the buffer's tuplestore.
 *
 * The cut-off is wherever buffer->trim_reader currently points; the trim
 * itself is done by ntuplestore_trim() (presumably dropping entries up to
 * that position).  If trim_reader has no valid position, nothing happens.
 */
static void
trimFrameBuffer(WindowFrameBuffer buffer)
{
	NTupleStorePos trim_pos;

	Assert(buffer != NULL);

	if (ntuplestore_acc_tell(buffer->trim_reader, &trim_pos))
		ntuplestore_trim(buffer->tuplestore, &trim_pos);
}
/*
 * getCurrentValue -- decode the frame-buffer entry at an accessor's current
 * position.
 *
 * The entry is deserialized into 'entry_buf', which the caller allocates
 * (e.g. via createFrameBufferEntry()).  Pass-by-reference keys in entry_buf
 * point into the accessor's data.
 *
 * Returns false, leaving entry_buf untouched, if the accessor is not at a
 * valid position; returns true otherwise.
 */
static bool
getCurrentValue(NTupleStoreAccessor *reader,
				WindowStatePerLevel level_state,
				FrameBufferEntry *entry_buf)
{
	void	   *data;
	int			len;

	if (!ntuplestore_acc_current_data(reader, &data, &len))
		return false;

	/* deserializeEntry() fills entry_buf in place and returns it. */
	(void) deserializeEntry(level_state, entry_buf, data, len);
	return true;
}
/*
* incrementCurrentRow -- increment the current_row in the buffer.
*
* Advances buffer->current_row_reader by one entry, increments
* num_rows_before and decrements num_rows_after (never below zero).  For
* ROWS levels, num_trail_rows and num_lead_rows are decremented.  Unless
* the level has only trivial frames, adjustEdges() repositions the edges.
*
* When the level has no delayed bound, the buffer is then trimmed:
* trim_reader is placed one entry before the earlier of current_row_reader
* and trail_reader, or invalidated (so trimFrameBuffer() does nothing)
* when trail_reader has no position.
*/
static void
incrementCurrentRow(WindowFrameBuffer buffer,
WindowState *wstate)
{
WindowStatePerLevel level_state = buffer->level_state;
/* Increment the current_row_reader */
ntuplestore_acc_advance(buffer->current_row_reader, 1);
buffer->num_rows_before++;
if (buffer->num_rows_after > 0)
buffer->num_rows_after--;
if (level_state->is_rows)
{
/*
* NOTE(review): presumably these counts are relative to the current
* row, so moving the current row forward shrinks them by one.
*/
level_state->num_trail_rows--;
level_state->num_lead_rows--;
}
if (!buffer->level_state->trivial_frames_only)
/* Adjust all leading and trailing edges */
adjustEdges(buffer, wstate);
/* Set the trim_reader, and trim the buffer */
if (!level_state->has_delay_bound)
{
NTupleStorePos pos;
/* Start from the current row (or an invalid position if there is none). */
bool found = ntuplestore_acc_tell(buffer->current_row_reader, &pos);
if (found)
ntuplestore_acc_seek(buffer->trim_reader, &pos);
else
ntuplestore_acc_set_invalid(buffer->trim_reader);
/*
* If trail_reader is valid and lies before the current row (or there
* is no current row), trim from trail_reader instead, so entries
* still inside the frame are kept.
*/
if (ntuplestore_acc_tell(level_state->trail_reader, NULL) &&
((ntuplestore_acc_tell(buffer->trim_reader, NULL) &&
ntuplestore_acc_is_before(level_state->trail_reader,
buffer->trim_reader)) ||
!ntuplestore_acc_tell(buffer->trim_reader, NULL)))
{
found = ntuplestore_acc_tell(level_state->trail_reader, &pos);
Assert(found);
ntuplestore_acc_seek(buffer->trim_reader, &pos);
}
/* No trailing position: do not trim; otherwise back off one entry. */
if (!ntuplestore_acc_tell(level_state->trail_reader, NULL))
ntuplestore_acc_set_invalid(buffer->trim_reader);
else
ntuplestore_acc_advance(buffer->trim_reader, -1);
trimFrameBuffer(buffer);
}
}
/*
 * hasEnoughDataInRange -- return true if the frame buffer holds enough data
 * to evaluate a RANGE frame for the current row.
 *
 * An empty frame is always ready.  A leading edge that is neither bounded
 * nor equivalent to CURRENT ROW (presumably UNBOUNDED FOLLOWING) is never
 * ready here.  Otherwise the answer is level_state->lead_ready, as set by
 * the edge-adjustment code.
 *
 * 'buffer', 'trail_range' and 'lead_range' are currently unused.
 */
static bool
hasEnoughDataInRange(WindowFrameBuffer buffer,
					 WindowStatePerLevel level_state,
					 WindowState *wstate,
					 Datum trail_range, Datum lead_range)
{
	bool		lead_is_finite;

	if (level_state->empty_frame)
		return true;

	lead_is_finite =
		EDGE_IS_BOUND(level_state->frame->lead) ||
		EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead, true);

	return lead_is_finite && level_state->lead_ready;
}
/*
 * hasEnoughDataInRows -- return true if the frame buffer holds enough rows
 * to evaluate a ROWS frame for the current row.
 *
 * An empty frame is always ready.  A leading edge that is neither bounded
 * nor equivalent to CURRENT ROW is never ready here.  Otherwise the frame is
 * ready when both offsets are zero, or when at least 'lead_rows' rows have
 * been counted past the current row (level_state->num_lead_rows).
 *
 * 'buffer' is currently unused.
 */
static bool
hasEnoughDataInRows(WindowFrameBuffer buffer,
					WindowStatePerLevel level_state,
					WindowState *wstate,
					long int trail_rows, long int lead_rows)
{
	if (level_state->empty_frame)
		return true;

	if (!(EDGE_IS_BOUND(level_state->frame->lead) ||
		  EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead, true)))
		return false;

	return (trail_rows == 0 && lead_rows == 0) ||
		level_state->num_lead_rows >= lead_rows;
}
/*
 * createFrameBufferEntry -- allocate an empty, zeroed FrameBufferEntry
 * shaped for this key level.
 *
 * RANGE levels (!is_rows) get keys[] and nulls[] arrays of numSortCols
 * elements; ROWS levels leave them NULL.  Every function in level_funcs
 * without a trivial frame contributes WindowValue slots to func_values,
 * in level_funcs order:
 *   - one slot for peer-count, LEAD/LAG and FIRST/LAST functions and for
 *     functions with a preliminary or inverse preliminary function;
 *   - one slot per argument otherwise.
 * NOTE(review): this presumably must mirror the layout used by
 * serializeFuncs()/deserializeFuncs() (which index by serial_index).
 */
static FrameBufferEntry *
createFrameBufferEntry(WindowStatePerLevel level_state)
{
	FrameBufferEntry *entry = (FrameBufferEntry *) palloc0(sizeof(FrameBufferEntry));
	ListCell   *lc;

	if (!level_state->is_rows)
	{
		entry->keys = (Datum *) palloc0(level_state->numSortCols * sizeof(Datum));
		entry->nulls = (bool *) palloc0(level_state->numSortCols * sizeof(bool));
	}
	entry->func_values = NIL;

	foreach(lc, level_state->level_funcs)
	{
		WindowStatePerFunction funcstate = (WindowStatePerFunction) lfirst(lc);
		int			nslots;
		int			i;

		/* Functions with trivial frames do not use the frame buffer. */
		if (funcstate->trivial_frame)
			continue;

		if (funcstate->winpeercount ||
			(IS_LEAD_LAG(funcstate->wrxstate->winkind)) ||
			(IS_FIRST_LAST(funcstate->wrxstate->winkind)) ||
			(OidIsValid(funcstate->invprelimfn_oid)) ||
			(OidIsValid(funcstate->prelimfn_oid)))
			nslots = 1;
		else
			nslots = list_length(funcstate->wrxstate->args);

		for (i = 0; i < nslots; i++)
		{
			WindowValue *slot = (WindowValue *) palloc0(sizeof(WindowValue));

			entry->func_values = lappend(entry->func_values, slot);
		}
	}

	return entry;
}
/*
 * freeFrameBufferEntry -- release the storage owned by an entry from
 * createFrameBufferEntry(): the keys[]/nulls[] arrays and the func_values
 * list together with its WindowValue elements.
 *
 * The FrameBufferEntry struct itself is NOT freed here.  The NULL checks
 * are required because keys/nulls are never allocated for ROWS levels and
 * PostgreSQL's pfree() does not accept NULL.
 */
static void
freeFrameBufferEntry(FrameBufferEntry *entry)
{
	Datum	   *keys = entry->keys;
	bool	   *nulls = entry->nulls;

	if (keys != NULL)
		pfree(keys);
	if (nulls != NULL)
		pfree(nulls);
	if (entry->func_values != NIL)
		list_free_deep(entry->func_values);
}
/*
* hasTuplesInFrame - Check if there are tuples between the trailing
* and leading frame edge for the current_row.
*
* This function returns true if there are such tuples. Otherwise,
* return false.
*
* level_state->trail_reader appears to point at the entry just before the
* trailing edge (it is advanced by one before the position comparisons
* below).  It may be moved while checking; every path that moves it
* restores it (re-seek to the saved position, or re-invalidate) before
* returning.  The RANGE "x PRECEDING ... x PRECEDING" branch reads the
* entry under lead_reader into level_state->curr_entry_buf.
*/
static bool
hasTuplesInFrame(WindowStatePerLevel level_state,
WindowState *wstate)
{
bool has_tuples = true;
NTupleStorePos orig_pos;
bool trail_valid;
ExprContext *econtext = wstate->ps.ps_ExprContext;
/* An empty frame never has tuples. */
if (level_state->empty_frame)
return false;
/*
* If both trail_reader and lead_reader point to the same
* position, there are no tuples within the frame edges.
*/
if (ntuplestore_acc_tell(level_state->trail_reader, NULL) &&
ntuplestore_acc_tell(level_state->lead_reader, NULL) &&
(!ntuplestore_acc_is_before(level_state->lead_reader,
level_state->trail_reader) &&
!ntuplestore_acc_is_before(level_state->trail_reader,
level_state->lead_reader)))
return false;
/* Save the position for the trail_reader */
trail_valid = ntuplestore_acc_tell(level_state->trail_reader, &orig_pos);
if (trail_valid)
ntuplestore_acc_advance(level_state->trail_reader, 1);
else
{
/*
* When the leading edge and trailing edge are both RANGE x PRECEDING, and
* the trail_reader points to an invalid position but the lead_reader
* points to a valid position, it is possible that there are no
* tuples in the current frame.
*/
if (!level_state->is_rows &&
EDGE_IS_BOUND_PRECEDING(level_state->frame->lead) &&
EDGE_IS_BOUND_PRECEDING(level_state->frame->trail) &&
ntuplestore_acc_tell(level_state->lead_reader, NULL) &&
!ntuplestore_acc_tell(level_state->trail_reader, NULL))
{
/*
* Evaluate the trailing-edge value in per-tuple memory and compare
* the keys of the entry under lead_reader against it with
* cmp_deformed_tuple(); the frame has tuples unless that reports
* is_less.  NOTE(review): the exact comparison direction depends on
* cmp_deformed_tuple()'s contract.  trail_reader has not been moved
* on this path, so there is nothing to restore.
*/
bool is_less = true;
MemoryContext oldctx;
Datum trail_edge_value;
bool trail_edge_value_isnull;
bool has_entry;
FrameBufferEntry *entry = level_state->curr_entry_buf;
oldctx = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
trail_edge_value =
ExecEvalExpr(level_state->trail_range_expr, econtext,
&(trail_edge_value_isnull), NULL);
has_entry = getCurrentValue(level_state->lead_reader, level_state, entry);
Assert(has_entry);
is_less = cmp_deformed_tuple(entry->keys, entry->nulls,
&trail_edge_value,
&trail_edge_value_isnull,
level_state->col_sort_asc,
level_state->numSortCols,
level_state->ltfunctions,
level_state->eqfunctions,
wstate->cmpcontext,
false);
MemoryContextSwitchTo(oldctx);
return (!is_less);
}
/*
* Unless the trailing edge is a FOLLOWING offset (that is not
* equivalent to CURRENT ROW), the frame starts at the first
* buffered entry.
*/
if (!EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail) ||
EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->trail, false))
ntuplestore_acc_seek_first(level_state->trail_reader);
}
/*
* When both trail_reader and lead_reader points to an entry
* in the frame buffer, we compare their position order.
*/
if (ntuplestore_acc_tell(level_state->trail_reader, NULL) &&
ntuplestore_acc_tell(level_state->lead_reader, NULL) &&
ntuplestore_acc_is_before( level_state->lead_reader,
level_state->trail_reader))
has_tuples = false;
/*
* When the trail_reader does not point to any entry in the frame
* buffer, and the trailing edge is x FOLLOWING, it is possible that
* there are no tuples within frame edges for the current_row.
*/
else if (!ntuplestore_acc_tell(level_state->trail_reader, NULL) &&
EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail))
{
if (level_state->is_rows)
{
/* ROWS: not enough rows counted past the current row yet. */
if (level_state->num_lead_rows < level_state->trail_rows)
has_tuples = false;
}
else
{
/*
* RANGE with a non-zero FOLLOWING offset: the frame has tuples
* only if the last row lies within both edges, as decided by
* checkLastRowForEdge() on the evaluated edge values.  The
* leading edge counts as satisfied when there is no
* lead_range_expr.
*/
if (!ntuplestore_acc_tell(level_state->trail_reader, NULL) &&
!exec_eq_exprstate(wstate, level_state->trail_range_eq_expr))
{
Datum new_edge_value = 0;
bool new_edge_value_isnull = false;
bool in_trail_edge = false;
bool in_lead_edge = false;
MemoryContext oldctx;
has_tuples = false;
/* Check if the last row is within the edges */
oldctx = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
new_edge_value =
ExecEvalExpr(level_state->trail_range_expr, econtext,
&(new_edge_value_isnull), NULL);
MemoryContextSwitchTo(oldctx);
in_trail_edge =
checkLastRowForEdge(level_state, wstate,
level_state->frame->trail,
level_state->trail_reader,
new_edge_value,
new_edge_value_isnull,
false);
in_lead_edge = true;
if (level_state->lead_range_expr != NULL)
{
oldctx = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
new_edge_value =
ExecEvalExpr(level_state->lead_range_expr, econtext,
&(new_edge_value_isnull), NULL);
MemoryContextSwitchTo(oldctx);
in_lead_edge =
checkLastRowForEdge(level_state, wstate,
level_state->frame->lead,
level_state->lead_reader,
new_edge_value,
new_edge_value_isnull,
true);
}
if (in_trail_edge && in_lead_edge)
has_tuples = true;
}
}
}
/*
* When the lead_reader does not point to any entry in the frame buffer,
* and the leading edge is x PRECEDING, it means that there are
* no tuples within the frame edges for the current_row.
*/
else if (!ntuplestore_acc_tell(level_state->lead_reader, NULL) &&
(EDGE_IS_BOUND_PRECEDING(level_state->frame->lead) &&
((level_state->is_rows && level_state->lead_rows != 0) ||
(!level_state->is_rows && !exec_eq_exprstate(wstate,
level_state->lead_range_eq_expr)))))
has_tuples = false;
/* Restore trail_reader */
if (trail_valid)
ntuplestore_acc_seek(level_state->trail_reader, &orig_pos);
else
ntuplestore_acc_set_invalid(level_state->trail_reader);
return has_tuples;
}
/*
 * freeTransValue -- release a transition value and mark it as absent.
 *
 * Does nothing at all unless 'shouldFree' is true.  Otherwise the datum is
 * pfree'd if it is pass-by-reference, present (*noTransValue false),
 * non-null and has a non-NULL pointer.  Afterwards, *transValue,
 * *transValueIsNull and *noTransValue are reset to 0 / true / true.
 */
static void
freeTransValue(Datum *transValue,
			   bool transTypeByVal,
			   bool *transValueIsNull,
			   bool *noTransValue,
			   bool shouldFree)
{
	if (shouldFree)
	{
		bool		owns_memory = !transTypeByVal &&
			!(*noTransValue) &&
			!(*transValueIsNull) &&
			DatumGetPointer(*transValue) != NULL;

		if (owns_memory)
			pfree(DatumGetPointer(*transValue));

		*transValue = (Datum) 0;
		*transValueIsNull = true;
		*noTransValue = true;
	}
}
/*
* computeTransValuesThroughScan -- compute transition values
* for those functions in the given level whose aggregate values
* can only be computed through scanning through multiple entries
* in the frame buffer.
*/
static void
computeTransValuesThroughScan(WindowStatePerLevel level_state,
WindowState *wstate)
{
/* Indicate if the current frame contains tuples at all. */
bool has_tuples = true;
ListCell *lc;
bool has_curr_entry = false;
FrameBufferEntry *curr_entry = level_state->curr_entry_buf;
WindowValue *curr_value = NULL;
ExprContext *econtext = wstate->ps.ps_ExprContext;
FunctionCallInfoData fcinfo;
NTupleStorePos orig_pos;
has_tuples = hasTuplesInFrame(level_state, wstate);
/* Remember the original trail_reader position. */
ntuplestore_acc_tell(level_state->trail_reader, &orig_pos);
/* Since the trail_reader points to the value before the trailing
* edge, we advance the trail_reader by one.
*/
if (ntuplestore_acc_tell(level_state->trail_reader, NULL))
ntuplestore_acc_advance(level_state->trail_reader, 1);
else
{
if (!ntuplestore_acc_tell(level_state->lead_reader, NULL) &&
(EDGE_IS_BOUND_PRECEDING(level_state->frame->lead) &&
((level_state->is_rows && level_state->lead_rows != 0) ||
(!level_state->is_rows && !exec_eq_exprstate(wstate,
level_state->lead_range_eq_expr)))))
has_tuples = false;
else
ntuplestore_acc_seek_first(level_state->trail_reader);
}
foreach(lc, level_state->level_funcs)
{
WindowStatePerFunction funcstate = (WindowStatePerFunction)
lfirst(lc);
/* Ignore those functions that do not require scanning
* multiple entries in the frame buffer.
*/
if (funcstate->trivial_frame ||
funcstate->winpeercount ||
(funcstate->isAgg && OidIsValid(funcstate->invprelimfn_oid)) ||
!funcstate->isAgg)
continue;
freeTransValue(&funcstate->final_aggTransValue,
funcstate->aggTranstypeByVal,
&funcstate->final_aggTransValueIsNull,
&funcstate->final_aggNoTransValue,
funcstate->final_aggShouldFree);
funcstate->final_aggTransValue =
datumCopyWithMemManager(0, funcstate->aggInitValue,
funcstate->aggTranstypeByVal,
funcstate->aggTranstypeLen,
&(wstate->mem_manager));
funcstate->final_aggTransValueIsNull = funcstate->aggInitValueIsNull;
funcstate->final_aggNoTransValue = funcstate->aggInitValueIsNull;
funcstate->final_aggShouldFree = !funcstate->aggInitValueIsNull;
}
if (has_tuples)
{
bool include_last_agg = false;
while (ntuplestore_acc_tell(level_state->trail_reader, NULL))
{
if (ntuplestore_acc_tell(level_state->lead_reader, NULL) &&
ntuplestore_acc_is_before(level_state->lead_reader,
level_state->trail_reader))
break;
has_curr_entry = getCurrentValue(level_state->trail_reader,
level_state, curr_entry);
Assert(has_curr_entry);
foreach(lc, level_state->level_funcs)
{
WindowStatePerFunction funcstate = (WindowStatePerFunction)
lfirst(lc);
FmgrInfo *fmgr_info;
/* Ignore those functions that do not require scanning
* multiple entries in the frame buffer.
*/
if (funcstate->trivial_frame ||
funcstate->winpeercount ||
(funcstate->isAgg &&
OidIsValid(funcstate->invprelimfn_oid)) ||
!funcstate->isAgg)
continue;
if (OidIsValid(funcstate->prelimfn_oid))
{
curr_value =
(WindowValue *)list_nth(curr_entry->func_values,
funcstate->serial_index);
Assert(curr_value);
fcinfo.arg[1] = curr_value->value;
fcinfo.argnull[1] = curr_value->valueIsNull;
fmgr_info = &(funcstate->prelimfn);
}
else
{
int argno;
for (argno=1; argno <= funcstate->numargs; argno++)
{
curr_value = (WindowValue *)
list_nth(curr_entry->func_values,
funcstate->serial_index + (argno-1));
Assert(curr_value);
fcinfo.arg[argno] = curr_value->value;
fcinfo.argnull[argno] = curr_value->valueIsNull;
}
fmgr_info = &(funcstate->transfn);
}
funcstate->final_aggTransValue =
invoke_agg_trans_func(fmgr_info,
fmgr_info->fn_nargs - 1,
funcstate->final_aggTransValue,
&funcstate->final_aggNoTransValue,
&(funcstate->final_aggTransValueIsNull),
funcstate->aggTranstypeByVal,
funcstate->aggTranstypeLen,
&fcinfo, (void *)wstate,
econtext->ecxt_per_tuple_memory,
&(wstate->mem_manager));
funcstate->final_aggShouldFree = true;
}
ntuplestore_acc_advance(level_state->trail_reader, 1);
}
/*
* Add the funcstate->aggTransValue if it is in the current
* frame.
*/
if (EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->trail, false))
include_last_agg = true;
if (!EDGE_IS_BOUND(level_state->frame->lead) ||
EDGE_IS_BOUND_FOLLOWING(level_state->frame->lead) ||
EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead, true))
include_last_agg = true;
if (!ntuplestore_acc_tell(level_state->lead_reader, NULL) &&
level_state->agg_filled && include_last_agg)
{
foreach(lc, level_state->level_funcs)
{
WindowStatePerFunction funcstate = (WindowStatePerFunction)
lfirst(lc);
/* Ignore those functions that do not require scanning
* multiple entries in the frame buffer.
*/
if (funcstate->trivial_frame ||
funcstate->winpeercount ||
(funcstate->isAgg && OidIsValid(funcstate->invprelimfn_oid)) ||
!funcstate->isAgg)
continue;
if (OidIsValid(funcstate->prelimfn_oid))
{
fcinfo.arg[1] = funcstate->aggTransValue;
fcinfo.argnull[1] = funcstate->aggTransValueIsNull;
funcstate->final_aggTransValue =
invoke_agg_trans_func(&(funcstate->prelimfn),
funcstate->prelimfn.fn_nargs - 1,
funcstate->final_aggTransValue,
&funcstate->final_aggNoTransValue,
&(funcstate->final_aggTransValueIsNull),
funcstate->aggTranstypeByVal,
funcstate->aggTranstypeLen,
&fcinfo, (void *)wstate,
econtext->ecxt_per_tuple_memory,
&(wstate->mem_manager));
funcstate->final_aggShouldFree = true;
}
else
{
TupleTableSlot *slot = econtext->ecxt_scantuple;
bool found;
found = ntuplestore_acc_current_tupleslot(wstate->input_buffer->writer,
wstate->spare);
Assert(found);
econtext->ecxt_scantuple = wstate->spare;
add_tuple_to_trans(funcstate, wstate, econtext, false);
/* Reset back to its orginial value */
econtext->ecxt_scantuple = slot;
}
}
}
}
/* Reset the trail_reader to its original position. */
if (!ntuplestore_acc_seek(level_state->trail_reader, &orig_pos))
ntuplestore_acc_set_invalid(level_state->trail_reader);
}
/*
 * computeFrameValue -- compute transition values for each function
 * in a given key level from the data stored in its frame buffer.
 *
 * The 'trail_reader' and 'lead_reader' point to the positions of
 * the trailing and leading edges.  NOTE(review): the body reads
 * level_state->trail_reader / level_state->lead_reader rather than these
 * parameters; confirm callers always pass the same accessors.
 *
 * Handling per function:
 *   - trivial-frame and cumulative-frame functions are skipped.
 *   - Peer-count functions take their value from the entry under
 *     buffer->current_row_reader.  When no stored value is available they
 *     use add_tuple_to_trans() on the row under
 *     wstate->input_buffer->writer.
 *   - Aggregates with an inverse preliminary function start from the
 *     stored value at the leading edge (or aggTransValue).  They then apply
 *     the inverse preliminary function with the trailing-edge value.
 *   - Other aggregates are computed by computeTransValuesThroughScan().
 *   - Non-aggregate window functions are invoked directly via windowfn.
 */
static void
computeFrameValue(WindowStatePerLevel level_state,
				  WindowState *wstate,
				  NTupleStoreAccessor *trail_reader,
				  NTupleStoreAccessor *lead_reader)
{
	ListCell   *lc;
	ExprContext *econtext = wstate->ps.ps_ExprContext;
	WindowFrameBuffer buffer = level_state->frame_buffer;
	FunctionCallInfoData fcinfo;
	FrameBufferEntry *trail_entry = level_state->trail_entry_buf;
	FrameBufferEntry *lead_entry = level_state->lead_entry_buf;
	FrameBufferEntry *curr_entry = level_state->curr_entry_buf;
	bool		has_trail_entry = false;
	bool		has_lead_entry = false;
	bool		has_curr_entry = false;
	WindowValue *curr_value = NULL;
	bool		read_value = false;			/* current-row entry read yet? */
	bool		read_edge_value = false;	/* edge entries read yet? */
	bool		require_scanning = false;

	Assert(level_state->is_rows == buffer->is_rows);

	foreach(lc, level_state->level_funcs)
	{
		WindowStatePerFunction funcstate = (WindowStatePerFunction) lfirst(lc);

		/* Functions with trivial frames are not computed from the buffer. */
		if (funcstate->trivial_frame)
			continue;

		/* Ignore functions that are cumulative aggregate functions. */
		if (funcstate->cumul_frame)
			continue;

		/* functions that require peer counts. */
		else if (funcstate->winpeercount)
		{
			/*
			 * If we didn't read the value pointed by current_row_reader,
			 * read it now.
			 */
			if (!read_value)
			{
				read_value = true;
				has_curr_entry = getCurrentValue(buffer->current_row_reader,
												 level_state, curr_entry);
			}
			if (has_curr_entry)
				curr_value = (WindowValue *) list_nth(curr_entry->func_values,
													  funcstate->serial_index);
			if (curr_value != NULL)
			{
				funcstate->win_value = curr_value->value;
				funcstate->win_value_is_null = curr_value->valueIsNull;
			}
			else
			{
				TupleTableSlot *slot = econtext->ecxt_scantuple;

				ntuplestore_acc_current_tupleslot(wstate->input_buffer->writer,
												  wstate->spare);
				econtext->ecxt_scantuple = wstate->spare;
				add_tuple_to_trans(funcstate, wstate, econtext, false);
				/* Reset back to its original value */
				econtext->ecxt_scantuple = slot;
			}
		}
		/* Aggregate functions with inverse preliminary functions. */
		else if (funcstate->isAgg && OidIsValid(funcstate->invprelimfn_oid))
		{
			WindowValue *trail_value = NULL;
			WindowValue *lead_value = NULL;
			uint64		trail_num_not_nulls = 0;
			uint64		lead_num_not_nulls = 0;

			/* Read the edge entries once; they are shared by all functions. */
			if (!read_edge_value)
			{
				read_edge_value = true;
				if (level_state->is_rows)
				{
					if (!EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail) ||
						level_state->num_lead_rows >= level_state->num_trail_rows)
					{
						has_trail_entry =
							getCurrentValue(level_state->trail_reader,
											level_state, trail_entry);
						has_lead_entry =
							getCurrentValue(level_state->lead_reader,
											level_state, lead_entry);
					}
				}
				else
				{
					has_trail_entry = getCurrentValue(level_state->trail_reader,
													  level_state, trail_entry);

					/*
					 * If the leading edge is UNBOUNDED FOLLOWING, the value
					 * in the leading edge is funcstate->aggTransValue. We
					 * don't need to read from the buffer.
					 */
					if (level_state->frame->lead->kind !=
						WINDOW_UNBOUND_FOLLOWING)
					{
						has_lead_entry =
							getCurrentValue(level_state->lead_reader,
											level_state, lead_entry);
					}
				}
			}

			if (!level_state->empty_frame && has_trail_entry)
			{
				Assert(list_length(trail_entry->func_values) >
					   funcstate->serial_index);
				trail_value =
					(WindowValue *) list_nth(trail_entry->func_values,
											 funcstate->serial_index);
			}
			if (!level_state->empty_frame && has_lead_entry)
			{
				Assert(list_length(lead_entry->func_values) >
					   funcstate->serial_index);
				lead_value =
					(WindowValue *) list_nth(lead_entry->func_values,
											 funcstate->serial_index);
			}

			/* Argument for the inverse preliminary function. */
			if (trail_value != NULL)
			{
				fcinfo.arg[1] = trail_value->value;
				fcinfo.argnull[1] = trail_value->valueIsNull;
				trail_num_not_nulls = trail_value->numNotNulls;
			}
			else
			{
				fcinfo.arg[1] = funcstate->aggInitValue;
				fcinfo.argnull[1] = funcstate->aggInitValueIsNull;
				trail_num_not_nulls = 0;
			}

			freeTransValue(&funcstate->final_aggTransValue,
						   funcstate->aggTranstypeByVal,
						   &funcstate->final_aggTransValueIsNull,
						   &funcstate->final_aggNoTransValue,
						   funcstate->final_aggShouldFree);

			if (lead_value != NULL || trail_value != NULL)
			{
				bool		has_tuples = hasTuplesInFrame(level_state, wstate);

				/* Start from a private copy of the leading-edge value. */
				if (lead_value != NULL)
				{
					funcstate->final_aggTransValue =
						datumCopyWithMemManager(0, lead_value->value,
												funcstate->aggTranstypeByVal,
												funcstate->aggTranstypeLen,
												&(wstate->mem_manager));
					funcstate->final_aggTransValueIsNull =
						lead_value->valueIsNull;
					funcstate->final_aggNoTransValue = false;
					funcstate->final_aggShouldFree = true;
					lead_num_not_nulls = lead_value->numNotNulls;
				}
				else
				{
					funcstate->final_aggTransValue =
						datumCopyWithMemManager(0, funcstate->aggTransValue,
												funcstate->aggTranstypeByVal,
												funcstate->aggTranstypeLen,
												&(wstate->mem_manager));
					funcstate->final_aggTransValueIsNull =
						funcstate->aggTransValueIsNull;
					funcstate->final_aggNoTransValue = false;
					funcstate->final_aggShouldFree = true;
					lead_num_not_nulls = funcstate->numNotNulls;
				}

				/*
				 * The counts are unsigned, so this inequality is exactly what
				 * the former "lead - trail > 0" test computed.
				 */
				if (has_tuples &&
					lead_num_not_nulls != trail_num_not_nulls)
				{
					funcstate->final_aggTransValue =
						invoke_agg_trans_func(&(funcstate->invprelimfn),
											  funcstate->invprelimfn.fn_nargs - 1,
											  funcstate->final_aggTransValue,
											  &funcstate->final_aggNoTransValue,
											  &(funcstate->final_aggTransValueIsNull),
											  funcstate->aggTranstypeByVal,
											  funcstate->aggTranstypeLen,
											  &fcinfo, (void *) wstate,
											  econtext->ecxt_per_tuple_memory,
											  &(wstate->mem_manager));
				}
				else
				{
					/*
					 * Fall back to the initial value.  Release the copy made
					 * above first: it is marked final_aggShouldFree, and
					 * overwriting it with a non-owned value would leak it.
					 */
					freeTransValue(&funcstate->final_aggTransValue,
								   funcstate->aggTranstypeByVal,
								   &funcstate->final_aggTransValueIsNull,
								   &funcstate->final_aggNoTransValue,
								   funcstate->final_aggShouldFree);
					funcstate->final_aggTransValue = funcstate->aggInitValue;
					funcstate->final_aggTransValueIsNull =
						funcstate->aggInitValueIsNull;
					funcstate->final_aggNoTransValue =
						funcstate->aggInitValueIsNull;
					funcstate->final_aggShouldFree = false;
				}
			}
			else
			{
				/*
				 * Check if the frame overlaps the current row.
				 * This is done by checking if trail is identical to
				 * the current row when the frame is
				 * RANGE BETWEEN <following> and <following>, or
				 * if lead is identical to the current row when the frame is
				 * RANGE BETWEEN <preceding> and <preceding>.
				 *
				 * In the case the frame overlaps, take aggTransValue,
				 * otherwise aggInitValue, as the final value.
				 */
				if ((EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail) &&
					 ((level_state->is_rows && level_state->trail_rows != 0) ||
					  (!level_state->is_rows &&
					   !exec_eq_exprstate(wstate, level_state->trail_range_eq_expr)))) ||
					(EDGE_IS_BOUND_PRECEDING(level_state->frame->lead) &&
					 ((level_state->is_rows && level_state->lead_rows != 0) ||
					  (!level_state->is_rows &&
					   !exec_eq_exprstate(wstate, level_state->lead_range_eq_expr)))))
				{
					funcstate->final_aggTransValue = funcstate->aggInitValue;
					funcstate->final_aggTransValueIsNull = funcstate->aggInitValueIsNull;
					funcstate->final_aggNoTransValue = funcstate->aggInitValueIsNull;
					funcstate->final_aggShouldFree = false;
				}
				else
				{
					funcstate->final_aggTransValue = funcstate->aggTransValue;
					funcstate->final_aggTransValueIsNull = funcstate->aggTransValueIsNull;
					funcstate->final_aggNoTransValue = funcstate->aggNoTransValue;
					funcstate->final_aggShouldFree = false;
				}
			}
		}
		else if (funcstate->isAgg && OidIsValid(funcstate->prelimfn_oid))
		{
			require_scanning = true;
		}
		else
		{
			/*
			 * Here are the general cases. The frame buffer stores the input
			 * arguments. For window aggregate functions, we scan through
			 * values between the trailing edge and the leading edge, and
			 * add them into the transition value. For ordinary window
			 * functions, we pass the frame buffer to its function handler
			 * to generate the output.
			 */
			if (!funcstate->isAgg)
			{
				/* Separate call info; the outer fcinfo is for aggregates. */
				FunctionCallInfoData win_fcinfo;
				int			nargs;

				nargs = initFcinfo(funcstate->wrxstate, &win_fcinfo, funcstate,
								   econtext, false);
				InitFunctionCallInfoData(win_fcinfo,
										 &funcstate->windowfn,
										 nargs,
										 (void *) funcstate->wrxstate,
										 NULL);
				funcstate->win_value = FunctionCallInvoke(&win_fcinfo);
				funcstate->win_value_is_null = win_fcinfo.isnull;
			}
			else
				require_scanning = true;
		}
	}

	/*
	 * Compute transition values for functions whose values have to be computed
	 * by scanning through multiple entries in the frame buffer. We will
	 * read each entry once.
	 */
	if (require_scanning)
		computeTransValuesThroughScan(level_state, wstate);
}
/*
 * initWindowFuncState -- build wstate->func_state from wstate->wrxstates.
 *
 * Every WindowRefExprState gets its refno (its list position, starting at 0)
 * and a back-pointer to wstate.  A WindowRef that is equal() to an earlier,
 * non-volatile one shares that earlier entry (wrxstate->funcno points at it).
 * Otherwise a new WindowStatePerFunctionData is filled in from pg_proc plus
 * either pg_aggregate (aggregate-derived functions) or pg_window (ordinary
 * window functions).  ACL_EXECUTE is checked for the current user on the
 * window function itself, and for the function's owner on the
 * implementation functions.
 *
 * Returns immediately when wstate->wrxstates is NULL.  The node argument is
 * not referenced in this function.
 */
static void
initWindowFuncState(WindowState *wstate, Window *node)
{
int numrefs;
int refno;
ListCell *lc;
if (wstate->wrxstates == NULL)
return;
wstate->numfuncs = 0;
numrefs = list_length(wstate->wrxstates);
Insist(numrefs > 0);
/* One slot per window ref; unused slots stay zero-filled (palloc0). */
wstate->func_state = palloc0(sizeof(WindowStatePerFunctionData) * numrefs);
/* Initialize per window ref (both ordinary and aggregate derived). */
refno = -1;
foreach(lc, wstate->wrxstates)
{
WindowRefExprState *wrxstate = (WindowRefExprState *) lfirst(lc);
WindowRef *winref = (WindowRef *) wrxstate->xprstate.expr;
WindowStatePerFunction funcstate;
Oid inputTypes[FUNC_MAX_ARGS];
Form_pg_proc proform;
HeapTuple heap_tuple;
ListCell *lcarg;
int i, funcno;
Oid winOwner,
winResType;		/* assigned from prorettype but not otherwise used here */
bool isAgg,
isWin,
isSet;
AclResult aclresult;
cqContext *pcqCtx;
refno++; /* First one is 0 */
/* Register the parent window state with the window ref state node. */
wrxstate->refno = refno;
wrxstate->windowstate = wstate;
/*
 * Look for a previous duplicate window function.  Slots are probed by
 * refno, so slots belonging to refs that were themselves duplicates
 * hold a NULL wref and never match.
 */
for (funcno = 0; funcno < refno; funcno++)
{
WindowRef *w = wstate->func_state[funcno].wref;
if (equal(winref, w) &&
!contain_volatile_functions((Node *) w))
break;
}
if (funcno < refno)
{
/* Found a match to an existing entry, so just mark it */
wrxstate->funcno = funcno;
continue;
}
/* No match so this window ref represents a new function and we
 * need to fill in its function state.
 *
 * Here funcno == refno, so the new entry goes into slot refno.  When an
 * earlier ref was a duplicate, its slot stays zero-filled and sits below
 * numfuncs.  NOTE(review): loops bounded by numfuncs will visit such
 * zeroed slots -- confirm that is harmless everywhere.
 */
funcstate = &wstate->func_state[funcno];
funcstate->wrxstate = wrxstate;
funcstate->wref = winref;
funcstate->serial_index = -1;
/* A ref whose level is not one of this node's key levels gets no level
 * state and is flagged as having a trivial frame.
 */
if (winref->winlevel >= wstate->numlevels)
{
funcstate->trivial_frame = true;
funcstate->wlevel = NULL;
}
else
{
WindowStatePerLevel level_state =
&wstate->level_state[winref->winlevel];
/* When this level uses a RANGE frame, that frame may not suit
 * row_number.  For example, in
 *   row_number() over (order by cn), sum(qty) over (order by cn)
 * both functions point to the level "order by cn", which gets the
 * default frame "range between unbounded preceding and current row".
 * That frame is not valid for row_number, so row_number and sum must
 * not share the level.  We simply detach row_number from this key
 * level and give it a trivial frame instead.
 */
if (!level_state->is_rows &&
winref->winfnoid == ROW_NUMBER_OID)
{
funcstate->wlevel = NULL;
funcstate->trivial_frame = true;
}
else
{
funcstate->wlevel = level_state;
level_state->level_funcs = lappend(level_state->level_funcs,
funcstate);
}
}
wrxstate->funcno = funcno;
wstate->numfuncs = funcno + 1;
/* May we call the function? Initially all window functions are
 * built-in, however, at this point we haven't learned whether
 * we're working on an ordinary window function or an aggregate.
 * This check is for the current user (GetUserId()).
 */
aclresult = pg_proc_aclcheck(winref->winfnoid, GetUserId(),
ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC,
get_func_name(winref->winfnoid));
/* Collect information about the window function's pg_proc entry. */
pcqCtx = caql_beginscan(
NULL,
cql("SELECT * FROM pg_proc "
" WHERE oid = :1 ",
ObjectIdGetDatum(winref->winfnoid)));
heap_tuple = caql_getnext(pcqCtx);
insist_log(HeapTupleIsValid(heap_tuple),
"cache lookup failed for window function proc %u",
winref->winfnoid);
proform = (Form_pg_proc) GETSTRUCT(heap_tuple);
isAgg = proform->proisagg;
isWin = proform->proiswin;
isSet = proform->proretset;
winResType = proform->prorettype;
winOwner = proform->proowner;
/* All needed pg_proc fields were copied out above; the scan can end. */
caql_endscan(pcqCtx);
/* Exactly one of aggregate / window function; set-returning not allowed. */
Assert( isAgg != isWin );
Assert( !isSet );
/*
 * Get actual datatypes of the inputs. These could be different from
 * the declared input types, when the function accepts ANY, ANYARRAY
 * or ANYELEMENT.
 *
 * NOTE(review): no bound check against FUNC_MAX_ARGS here; presumably
 * the parser already limits the argument count -- confirm.
 */
i = 0;
foreach(lcarg, winref->args)
{
inputTypes[i++] = exprType((Node *) lfirst(lcarg));
}
funcstate->numargs = list_length(winref->args);
funcstate->isAgg = isAgg;
/* The rest depends on the type (agg-derived or ordinary). */
if ( isAgg )
{
HeapTuple agg_tuple;
Form_pg_aggregate aggform;
#define NUM_AGG_FNS_TO_CHECK 5
int c;
Oid to_check[NUM_AGG_FNS_TO_CHECK];
Oid aggtranstype;
Expr *transfnexpr,
*finalfnexpr,
*invtransfnexpr,
*invprelimfnexpr,
*prelimfnexpr;
Datum textInitVal;
cqContext *aggcqCtx;
/* Aggregate-derived functions must live on a real key level. */
Insist(winref->winlevel < wstate->numlevels);
aggcqCtx = caql_beginscan(
NULL,
cql("SELECT * FROM pg_aggregate "
" WHERE aggfnoid = :1 ",
ObjectIdGetDatum(winref->winfnoid)));
agg_tuple = caql_getnext(aggcqCtx);
insist_log(HeapTupleIsValid(agg_tuple), "cache lookup failed for aggregate %u",
winref->winfnoid);
aggform = (Form_pg_aggregate) GETSTRUCT(agg_tuple);
/* Get the implementation functions and related information
 * we need to handle aggregate-derived functions.
 *
 * TODO - Omit ones we're not interested in.
 */
to_check[0] = funcstate->transfn_oid = aggform->aggtransfn;
to_check[1] = funcstate->finalfn_oid = aggform->aggfinalfn;
to_check[2] = funcstate->invtransfn_oid = aggform->agginvtransfn;
to_check[3] = funcstate->invprelimfn_oid = aggform->agginvprelimfn;
to_check[4] = funcstate->prelimfn_oid = aggform->aggprelimfn;
/* Check that the aggregate owner has needed permissions.
 * Unset (InvalidOid) implementation functions are skipped.
 */
for (c = 0; c < NUM_AGG_FNS_TO_CHECK; c++)
{
if ( ! OidIsValid(to_check[c]) )
continue;
aclresult = pg_proc_aclcheck(to_check[c], winOwner, ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC,
get_func_name(to_check[c]));
}
aggtranstype = resolve_polymorphic_transtype(aggform->aggtranstype,
winref->winfnoid,
inputTypes);
/*
 * Build expression trees using actual argument & result types.
 * c.f., ExecInitAgg(). We should really reduce this to a
 * few common functions.
 */
build_aggregate_fnexprs(inputTypes, funcstate->numargs,
aggtranstype, winref->restype,
funcstate->transfn_oid,
funcstate->finalfn_oid,
funcstate->prelimfn_oid,
funcstate->invtransfn_oid,
funcstate->invprelimfn_oid,
&transfnexpr, &finalfnexpr, &prelimfnexpr,
&invtransfnexpr, &invprelimfnexpr);
/* The transition function is always looked up; the others only if set. */
fmgr_info(funcstate->transfn_oid, &funcstate->transfn);
funcstate->transfn.fn_expr = (Node *)transfnexpr;
if (OidIsValid(funcstate->finalfn_oid))
{
fmgr_info(funcstate->finalfn_oid, &funcstate->finalfn);
funcstate->finalfn.fn_expr = (Node *) finalfnexpr;
}
if (OidIsValid(funcstate->invtransfn_oid))
{
fmgr_info(funcstate->invtransfn_oid, &funcstate->invtransfn);
funcstate->invtransfn.fn_expr = (Node *) invtransfnexpr;
}
if (OidIsValid(funcstate->prelimfn_oid))
{
fmgr_info(funcstate->prelimfn_oid, &funcstate->prelimfn);
funcstate->prelimfn.fn_expr = (Node *) prelimfnexpr;
}
if (OidIsValid(funcstate->invprelimfn_oid))
{
fmgr_info(funcstate->invprelimfn_oid, &funcstate->invprelimfn);
funcstate->invprelimfn.fn_expr = (Node *) invprelimfnexpr;
}
get_typlenbyval(winref->restype,
&funcstate->resulttypeLen,
&funcstate->resulttypeByVal);
get_typlenbyval(aggtranstype,
&funcstate->aggTranstypeLen,
&funcstate->aggTranstypeByVal);
/*
 * initval is potentially null, so don't try to access it as a
 * struct field. Must do it the hard way with caql_getattr
 */
textInitVal = caql_getattr(aggcqCtx,
Anum_pg_aggregate_agginitval,
&funcstate->aggInitValueIsNull);
if (funcstate->aggInitValueIsNull)
funcstate->aggInitValue = (Datum) 0;
else
funcstate->aggInitValue = GetAggInitVal(textInitVal,
aggtranstype);
/*
 * If the transfn is strict and the initval is NULL, make sure input
 * type and transtype are the same (or at least binary-compatible), so
 * that it's OK to use the first input value as the initial
 * aggTransValue. This should have been checked at agg definition time,
 * but just in case...
 */
if (funcstate->transfn.fn_strict && funcstate->aggInitValueIsNull)
{
if (funcstate->numargs < 1 ||
!IsBinaryCoercible(inputTypes[0], aggtranstype))
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate %u needs to have compatible "
"input type and transition type",
winref->winfnoid)));
}
/* The scan stays open until here because caql_getattr() reads from it. */
caql_endscan(aggcqCtx);
wrxstate->winkind = WINKIND_AGGREGATE;
}
else if (isWin)
{
HeapTuple win_tuple;
Form_pg_window winform;
Oid windowfn_oid = InvalidOid;
Const *refptr;
ExprState *xtrastate;
cqContext *wincqCtx;
wincqCtx = caql_beginscan(
NULL,
cql("SELECT * FROM pg_window "
" WHERE winfnoid = :1 ",
ObjectIdGetDatum(winref->winfnoid)));
win_tuple = caql_getnext(wincqCtx);
if (!HeapTupleIsValid(win_tuple))
elog(ERROR, "cache lookup failed for window function %u",
winref->winfnoid);
winform = (Form_pg_window) GETSTRUCT(win_tuple);
/*
 * Lookup ordinary window implementation function info.
 * IMMEDIATE and ROWKEY stages use winfunc; PRELIMINARY uses winprefunc.
 * Any other stage leaves windowfn_oid invalid (caught by the Assert).
 */
switch ( winref->winstage)
{
case WINSTAGE_IMMEDIATE:
case WINSTAGE_ROWKEY:
windowfn_oid = winform->winfunc;
break;
case WINSTAGE_PRELIMINARY:
windowfn_oid = winform->winprefunc;
break;
}
wrxstate->winkind = winform->winkind;
Assert(OidIsValid(windowfn_oid));
/* Check that window function owner has permission to call the
 * implementation function.
 *
 * XXX initially all implementation functions are builtin and
 * anyone can call them, but good to follow form.
 */
aclresult = pg_proc_aclcheck(windowfn_oid,
winOwner,
ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC,
get_func_name(windowfn_oid));
funcstate->windowfn_oid = windowfn_oid;
funcstate->allowframe = winform->winallowframe;
/* Inject extra argument: an INTERNAL Const holding a pointer to this
 * wrxstate, prepended so it becomes argument 0 of the implementation
 * function.
 *
 * NOTE(review): constlen is 4 although the value is a pointer, which is
 * 8 bytes on 64-bit builds -- confirm nothing relies on constlen for
 * this by-value Const.
 */
refptr = makeNode(Const);
refptr->consttype = INTERNALOID;
refptr->constlen = 4;
refptr->constvalue = PointerGetDatum(wrxstate);
refptr->constisnull = false;
refptr->constbyval = true;
xtrastate = ExecInitExpr((Expr *) refptr, (PlanState *) wstate);
wrxstate->args = lcons(xtrastate, wrxstate->args);
fmgr_info(funcstate->windowfn_oid, &funcstate->windowfn);
/*
 * Initialize byval and typlen for framed funcs. Of course we're
 * creating an entry here for the inserted refptr but we do so
 * for consistency.  Argument argno > 0 maps to inputTypes[argno - 1]
 * because of the injected argument 0.
 */
if (funcstate->allowframe || IS_LEAD_LAG(wrxstate->winkind) ||
IS_FIRST_LAST(wrxstate->winkind))
{
ListCell *lc;
int numargs = list_length(wrxstate->args);
int argno = 0;
wrxstate->argtypbyval = palloc(sizeof(bool) * numargs);
wrxstate->argtyplen = palloc(sizeof(int16) * numargs);
foreach(lc, wrxstate->args)
{
if (argno == 0)
{
/* internal arg
 * NOTE(review): this stores a type OID in a length field; it is
 * harmless only while argtypbyval[0] stays true -- verify.
 */
wrxstate->argtypbyval[argno] = true;
wrxstate->argtyplen[argno] = INTERNALOID;
}
else
{
Oid typid = inputTypes[argno - 1];
int16 len;
bool byval;
get_typlenbyval(typid, &len, &byval);
wrxstate->argtypbyval[argno] = byval;
wrxstate->argtyplen[argno] = len;
}
argno++;
}
}
/*
 * Only set the peer count if we're referencing an actual
 * window key level and if the peer count is required. This
 * ensures that subsequent iterations do not overwrite the peer
 * count flag.
 */
if (PointerIsValid(funcstate->wlevel) && winform->winpeercount)
{
funcstate->wlevel->need_peercount = winform->winpeercount;
funcstate->winpeercount = winform->winpeercount;
wstate->need_peercount = winform->winpeercount;
}
/* No value computed yet: start out as NULL. */
funcstate->win_value = 0;
funcstate->win_value_is_null = true;
caql_endscan(wincqCtx);
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("inappropriate use of function as window function")));
}
}
}
/*
 * initWindowStatePerLevel -- fill in wstate->level_state[] from the Window
 * node's window keys, one level per key, in list order.
 *
 * For every level this records:
 *   - the sort columns and sort operators.  A key that has no sort columns
 *     borrows those of the most recent earlier key that had some, which
 *     makes it easier to find the keys stored in the frame buffer later;
 *   - equality and less-than comparison functions for those columns;
 *   - the frame.  When the key has none, the default
 *       RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 *     is built here rather than in the parser.  If the parser added it,
 *     a query such as
 *       SELECT sum(...) OVER (w1), RANK() OVER (w1) ...
 *       WINDOW w1 AS (ORDER BY foo);
 *     would be stored as a view with an explicit framing clause, and the
 *     parser would then reject that window clause for RANK;
 *   - the type OID, length and by-value flag of each sort column, taken
 *     from the child plan's result tuple descriptor.
 */
static void
initWindowStatePerLevel(WindowState *wstate, Window *node)
{
	TupleDesc	desc = ExecGetResultType(wstate->ps.lefttree);
	WindowKey  *last_sorted_key = NULL;	/* latest key with sort columns */
	int			next_level = 0;
	ListCell   *cell;

	foreach(cell, node->windowKeys)
	{
		WindowKey  *key = (WindowKey *) lfirst(cell);
		WindowStatePerLevel lvl = &wstate->level_state[next_level];
		bool		borrow_sort;
		int			c;

		next_level++;

		lvl->has_delay_bound = false;
		lvl->has_only_trans_funcs = false;

		/* Sort columns: this key's own, or inherited when it has none. */
		borrow_sort = (key->numSortCols == 0 && last_sorted_key != NULL);
		lvl->numSortCols = borrow_sort ? last_sorted_key->numSortCols
			: key->numSortCols;
		lvl->sortColIdx = (AttrNumber *)
			palloc0(lvl->numSortCols * sizeof(AttrNumber));
		lvl->sortOperators = (Oid *)
			palloc0(lvl->numSortCols * sizeof(Oid));

		if (borrow_sort)
		{
			memcpy(lvl->sortColIdx, last_sorted_key->sortColIdx,
				   last_sorted_key->numSortCols * sizeof(AttrNumber));
			memcpy(lvl->sortOperators, last_sorted_key->sortOperators,
				   last_sorted_key->numSortCols * sizeof(Oid));
		}
		else
		{
			memcpy(lvl->sortColIdx, key->sortColIdx,
				   key->numSortCols * sizeof(AttrNumber));
			memcpy(lvl->sortOperators, key->sortOperators,
				   key->numSortCols * sizeof(Oid));
		}

		if (key->numSortCols > 0)
			last_sorted_key = key;

		/* Comparison functions over the (possibly inherited) sort columns. */
		lvl->eqfunctions = execTuplesMatchPrepare(desc,
												  lvl->numSortCols,
												  lvl->sortColIdx);
		lvl->ltfunctions = get_ltfuncs(desc,
									   lvl->numSortCols,
									   lvl->sortColIdx);

		/* Frame: the key's own, or the default built here. */
		if (key->frame != NULL)
		{
			lvl->frame = key->frame;
			lvl->is_rows = key->frame->is_rows;
		}
		else
		{
			WindowFrame *dflt = makeNode(WindowFrame);

			dflt->trail = makeNode(WindowFrameEdge);
			dflt->lead = makeNode(WindowFrameEdge);

			/*
			 * RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.  makeNode()
			 * zero-fills (palloc0), so the remaining fields such as exclude
			 * start out zeroed.
			 */
			dflt->is_rows = false;
			dflt->trail->kind = WINDOW_UNBOUND_PRECEDING;
			dflt->lead->kind = WINDOW_CURRENT_ROW;

			lvl->default_frame = true;
			lvl->frame = dflt;
			lvl->is_rows = false;
		}

		Assert(PointerIsValid(lvl->frame->trail));
		Assert(PointerIsValid(lvl->frame->lead));

		/* Per-column type information for the sort columns. */
		lvl->col_types = palloc(sizeof(Oid) * lvl->numSortCols);
		lvl->col_typlens = palloc(sizeof(int2) * lvl->numSortCols);
		lvl->col_typbyvals = palloc(sizeof(bool) * lvl->numSortCols);

		for (c = 0; c < lvl->numSortCols; c++)
		{
			Form_pg_attribute att = desc->attrs[lvl->sortColIdx[c] - 1];

			lvl->col_types[c] = att->atttypid;
			lvl->col_typlens[c] = att->attlen;
			lvl->col_typbyvals[c] = att->attbyval;
		}
	}
}
/*
 * makeWindowState -- allocate and minimally initialize a WindowState for
 * the given Window plan node.
 *
 * One zero-filled WindowStatePerLevelData is allocated per entry of
 * window->windowKeys; level_state stays NULL when there are none.  Two
 * child contexts of CurrentMemoryContext are created ("TransContext" and
 * "CmpContext").  serial_array starts as a zero-filled buffer of
 * FRAMEBUFFER_ENTRY_SIZE bytes, and max_size records that capacity.
 */
static WindowState *
makeWindowState(Window *window, EState *estate)
{
	WindowState *ws = makeNode(WindowState);
	int			nlevels = list_length(window->windowKeys);

	/* Link back to the plan node and executor state. */
	ws->ps.plan = (Plan *) window;
	ws->ps.state = estate;

	/* Function bookkeeping starts out empty. */
	ws->wrxstates = NIL;
	ws->eqfunctions = NULL;
	ws->numfuncs = 0;
	ws->func_state = NULL;
	ws->row_index = 0;

	/* Per-key-level state. */
	ws->numlevels = nlevels;
	if (nlevels <= 0)
		ws->level_state = NULL;
	else
		ws->level_state = (WindowStatePerLevel)
			palloc0(nlevels * sizeof(WindowStatePerLevelData));

	ws->transcontext = AllocSetContextCreate(CurrentMemoryContext,
											 "TransContext",
											 ALLOCSET_DEFAULT_MINSIZE,
											 ALLOCSET_DEFAULT_INITSIZE,
											 ALLOCSET_DEFAULT_MAXSIZE);
	ws->cmpcontext = AllocSetContextCreate(CurrentMemoryContext,
										   "CmpContext",
										   ALLOCSET_DEFAULT_MINSIZE,
										   ALLOCSET_DEFAULT_INITSIZE,
										   ALLOCSET_DEFAULT_MAXSIZE);

	/*
	 * Initial serialization buffer.  The original note described this as
	 * 1K, "sufficient for most cases" -- NOTE(review): confirm against
	 * FRAMEBUFFER_ENTRY_SIZE.
	 */
	ws->serial_array = palloc0(FRAMEBUFFER_ENTRY_SIZE);
	ws->max_size = FRAMEBUFFER_ENTRY_SIZE;

	return ws;
}
/*
 * initializePartition -- [re]initialize state before processing a new
 * partition.
 *
 * row_index is set to -1, meaning "before the first row of the partition",
 * so that the first increment makes it 0.  Every key level's counters are
 * cleared and its rank values restart at 1.  Finally the per-partition
 * input buffer is reinitialized through resetInputBuffer().
 */
static void
initializePartition(WindowState *wstate)
{
	int			i;

	wstate->row_index = -1;

	for (i = 0; i < wstate->numlevels; i++)
	{
		WindowStatePerLevel lvl = wstate->level_state + i;

		/* Group and peer bookkeeping. */
		lvl->group_index = 0;
		lvl->prior_non_peer_count = 0;
		lvl->peer_index = 0;
		lvl->peer_count = 0;

		/* Ranks restart at 1, both current and prior. */
		lvl->rank = 1;
		lvl->dense_rank = 1;
		lvl->prior_rank = 1;
		lvl->prior_dense_rank = 1;
	}

	resetInputBuffer(wstate);
}
/*
 * trimInputBuffer -- discard the input-buffer tuples that precede the
 * current_row_reader's position.
 *
 * Does nothing when that reader has no position.
 */
static void
trimInputBuffer(WindowInputBuffer buffer)
{
	NTupleStorePos cur_pos;

	Assert(buffer != NULL);

	if (ntuplestore_acc_tell(buffer->current_row_reader, &cur_pos))
		ntuplestore_trim(buffer->tuplestore, &cur_pos);
}
/*
 * Reset an input buffer after a partition boundary has been passed. We
 * must keep the last tuple fetched -- it is the tuple which caused the
 * key break -- and put it in the new buffer.
 *
 * Of course, we could get called to initialise the buffer as well.
 *
 * Existing buffer: when part_break is set, the tuple under the writer
 * accessor is copied into wstate->curslot (through wstate->spare) BEFORE
 * the accessors are destroyed.  Then the writer, current_row_reader and
 * reader accessors are destroyed, the tuplestore is reset, the buffer and
 * slot flags are cleared, and resetFrameBuffers() is called.
 *
 * No buffer yet: a new one is allocated whose tuplestore gets half of this
 * operator's memory quota (PlanStateOperatorMemKB * 1024 / 2 bytes), and
 * createFrameBuffers() is called.
 *
 * In both cases every level's transition values are reset, new accessors
 * are created, and a saved break tuple is re-inserted as the first tuple
 * of the fresh buffer.
 */
static void
resetInputBuffer(WindowState *wstate)
{
int level;
bool save_firsttuple = false;
if (wstate->input_buffer)
{
save_firsttuple = wstate->input_buffer->part_break;
if (save_firsttuple)
{
/* Copy the break tuple out before its storage is reset below. */
#ifdef USE_ASSERT_CHECKING
bool found =
#endif
ntuplestore_acc_current_tupleslot(wstate->input_buffer->writer,
wstate->spare);
Assert(found);
wstate->curslot = ExecCopySlot(wstate->curslot, wstate->spare);
}
ntuplestore_destroy_accessor(wstate->input_buffer->writer);
ntuplestore_destroy_accessor(wstate->input_buffer->current_row_reader);
ntuplestore_destroy_accessor(wstate->input_buffer->reader);
ntuplestore_reset(wstate->input_buffer->tuplestore);
wstate->input_buffer->part_break = false;
wstate->input_buffer->num_tuples = 0;
wstate->cur_slot_is_new = false;
wstate->cur_slot_part_break = false;
wstate->cur_slot_key_break = wstate->numlevels;
resetFrameBuffers(wstate);
}
else
{
wstate->input_buffer = (WindowInputBuffer)palloc0(sizeof(WindowInputBufferData));
/* Half of the operator memory quota, converted from KB to bytes. */
wstate->input_buffer->tuplestore =
ntuplestore_create((PlanStateOperatorMemKB((PlanState *) wstate) * 1024L)/2); /* correct? */
createFrameBuffers(wstate);
}
/*
 * Reset transition values. This has to be done even if we are
 * creating new buffer because the trans values might remain in
 * case of rescanning.
 */
for (level = 0; level < wstate->numlevels; level++)
{
WindowStatePerLevel level_state = &wstate->level_state[level];
resetTransValues(level_state, wstate);
}
/* One accessor created with true (the writer, judging by its use), two with false. */
wstate->input_buffer->writer =
ntuplestore_create_accessor(wstate->input_buffer->tuplestore, true);
wstate->input_buffer->current_row_reader =
ntuplestore_create_accessor(wstate->input_buffer->tuplestore, false);
wstate->input_buffer->reader =
ntuplestore_create_accessor(wstate->input_buffer->tuplestore, false);
if (save_firsttuple)
{
/* The break tuple becomes the first row of the new partition. */
ntuplestore_acc_put_tupleslot(wstate->input_buffer->writer,
wstate->curslot);
wstate->input_buffer->num_tuples++;
ntuplestore_acc_seek_first(wstate->input_buffer->current_row_reader);
ntuplestore_acc_seek_first(wstate->input_buffer->reader);
wstate->cur_slot_is_new = true;
wstate->row_index++;
}
}
/*
 * freeInputBuffer -- release wstate->input_buffer and its tuplestore.
 *
 * Destroys all three accessors that resetInputBuffer() creates (writer,
 * current_row_reader and reader), then the tuplestore, then the buffer
 * struct itself.  wstate->input_buffer is left NULL, so a later
 * resetInputBuffer() call takes its "allocate a new buffer" path.
 * Calling this when no buffer exists is a no-op.
 */
static void
freeInputBuffer(WindowState *wstate)
{
	WindowInputBuffer buf = wstate->input_buffer;

	if (buf == NULL)
		return;

	ntuplestore_destroy_accessor(buf->writer);
	ntuplestore_destroy_accessor(buf->current_row_reader);
	/*
	 * The reader accessor was previously never destroyed here, although
	 * resetInputBuffer() creates it and destroys it on every reset.
	 */
	ntuplestore_destroy_accessor(buf->reader);
	ntuplestore_destroy(buf->tuplestore);

	pfree(buf);
	wstate->input_buffer = NULL;
}
/*
 * advancePeerCount -- when curr_level is at or beyond min_level, close the
 * current peer group of level_state.
 *
 * The finished group's size (peer_count + 1) is added to
 * prior_non_peer_count, and peer_index / peer_count restart at 0.  A
 * min_level of -1 leaves the state untouched.
 */
static void
advancePeerCount(WindowStatePerLevel level_state,
				 int curr_level, int min_level)
{
	if (min_level == -1 || curr_level < min_level)
		return;

	level_state->prior_non_peer_count += level_state->peer_count + 1;
	level_state->peer_index = 0;
	level_state->peer_count = 0;
}
/*
 * advanceKeyLevelState -- advance the "primitive" values that drive window
 * function results for every key level from min_level upward.
 *
 * On entry the level values are as set by partition initialization or as
 * left by the previous call.  Per-partition state (row_index) has not yet
 * been advanced by nextRow(), our caller.  For each affected level:
 * rank / dense_rank are saved as prior_rank / prior_dense_rank,
 * group_index is incremented, rank becomes row_index + 1, and dense_rank
 * becomes group_index + 1.  A min_level of -1 advances nothing.
 *
 * These values are read-only outside this function, initializePartition()
 * and nextRow().  Key levels are independent; nextRow() decides which
 * levels need advancing.
 */
static void
advanceKeyLevelState(WindowState *wstate, int min_level)
{
	int64		row_idx;
	int			lvl;

	Assert(-1 <= min_level && min_level < wstate->numlevels);

	if (min_level == -1)
		return;

	row_idx = wstate->row_index;

	for (lvl = (min_level > 0 ? min_level : 0); lvl < wstate->numlevels; lvl++)
	{
		WindowStatePerLevel ls = &wstate->level_state[lvl];

		/* The values still describe the previous row; keep them as prior. */
		ls->prior_rank = ls->rank;
		ls->prior_dense_rank = ls->dense_rank;

		/* Step to the first peer of the next group. */
		ls->group_index++;
		ls->rank = 1 + row_idx;
		ls->dense_rank = 1 + ls->group_index;
	}
}
/*
 * initFcinfo -- evaluate a window ref's argument expressions into fcinfo.
 *
 * Values and null flags go into fcinfo->arg[] / fcinfo->argnull[].  For
 * aggregate-derived functions filling starts at slot 1 (slot 0 is reserved
 * for the transition value); otherwise it starts at slot 0.  All arguments
 * are evaluated in econtext's per-tuple memory context.  When check_nulls
 * is true and none of the arguments was NULL, funcstate->numNotNulls is
 * incremented.
 *
 * Returns the index one past the last slot filled.
 */
static int
initFcinfo(WindowRefExprState *wrxstate, FunctionCallInfoData *fcinfo,
		   WindowStatePerFunction funcstate, ExprContext *econtext,
		   bool check_nulls)
{
	int			next_slot = funcstate->isAgg ? 1 : 0;
	bool		saw_null = false;
	MemoryContext saved_ctx;
	ListCell   *cell;

	saved_ctx = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

	foreach(cell, wrxstate->args)
	{
		ExprState  *arg_expr = (ExprState *) lfirst(cell);
		bool	   *isnull_out = &fcinfo->argnull[next_slot];

		fcinfo->arg[next_slot] = ExecEvalExpr(arg_expr, econtext,
											  isnull_out, NULL);
		if (*isnull_out)
			saw_null = true;
		next_slot++;
	}

	MemoryContextSwitchTo(saved_ctx);

	if (check_nulls && !saw_null)
		funcstate->numNotNulls++;

	return next_slot;
}
/*
 * add_tuple_to_trans -- feed the current input row (as seen by econtext)
 * to one window function.
 *
 * Aggregates: the transition function is applied through
 * invoke_agg_trans_func(), updating aggTransValue and its flags.
 *
 * Ordinary window functions: for LEAD/LAG and LAST_VALUE every row, and
 * for FIRST_VALUE only while no value has been stored yet, a copy of user
 * argument 1 is kept in aggTransValue (slot 0 holds the injected internal
 * pointer argument).  Any other window function is invoked directly in the
 * per-tuple context, and its result is stored in win_value.
 */
static void
add_tuple_to_trans(WindowStatePerFunction funcstate, WindowState *wstate,
				   ExprContext *econtext, bool check_nulls)
{
	FunctionCallInfoData fcinfo;
	WindowRefExprState *wrx = funcstate->wrxstate;
	int			nargs;

	nargs = initFcinfo(wrx, &fcinfo, funcstate, econtext, check_nulls);

	if (funcstate->isAgg)
	{
		funcstate->aggTransValue =
			invoke_agg_trans_func(&funcstate->transfn,
								  funcstate->numargs,
								  funcstate->aggTransValue,
								  &funcstate->aggNoTransValue,
								  &funcstate->aggTransValueIsNull,
								  funcstate->aggTranstypeByVal,
								  funcstate->aggTranstypeLen,
								  &fcinfo, (void *) wstate,
								  econtext->ecxt_per_tuple_memory,
								  &wstate->mem_manager);
		return;
	}

	InitFunctionCallInfoData(fcinfo,
							 &funcstate->windowfn,
							 nargs,
							 (void *) wrx,
							 NULL);

	if (IS_LEAD_LAG(wrx->winkind) ||
		IS_LAST_VALUE(wrx->winkind) ||
		(IS_FIRST_VALUE(wrx->winkind) && funcstate->aggNoTransValue))
	{
		const int	value_slot = 1;	/* first user-supplied argument */

		funcstate->aggTransValue =
			datumCopyWithMemManager(0, fcinfo.arg[value_slot],
									wrx->argtypbyval[value_slot],
									wrx->argtyplen[value_slot],
									&wstate->mem_manager);
		funcstate->aggTransValueIsNull = fcinfo.argnull[value_slot];
		funcstate->aggNoTransValue = false;
	}
	else
	{
		MemoryContext saved_ctx;

		saved_ctx = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
		funcstate->win_value = FunctionCallInvoke(&fcinfo);
		funcstate->win_value_is_null = fcinfo.isnull;
		MemoryContextSwitchTo(saved_ctx);
	}
}
/*
 * invokeTrivialFuncs -- refresh results of functions that need no frame
 * scan for the current row.
 *
 * For each entry of wstate->func_state[0 .. numfuncs-1]:
 *   - trivial_frame: wstate->curslot is fed to add_tuple_to_trans().  For
 *     aggregates the live transition value is then exposed as the final
 *     value, without ownership (final_aggShouldFree = false);
 *   - cumul_frame without HAS_ONLY_TRANS_FUNC: the previous final value is
 *     released first.  If the level's frame-buffer current_row_reader is
 *     positioned, the final value is a private copy of that buffer entry's
 *     stored value (final_aggShouldFree = true).  Otherwise it is the live
 *     transition value, without ownership;
 *   - anything else: *found is set to true.  *found is never cleared here.
 */
static void
invokeTrivialFuncs(WindowState *wstate, bool *found)
{
	ExprContext *econtext = wstate->ps.ps_ExprContext;
	int			idx;

	for (idx = 0; idx < wstate->numfuncs; idx++)
	{
		WindowStatePerFunction fs = &wstate->func_state[idx];
		WindowStatePerLevel lvl = fs->wlevel;
		WindowFrameBuffer fbuf;
		FrameBufferEntry *entry;
		WindowValue *wv;
		bool		has_entry;

		if (fs->trivial_frame)
		{
			econtext->ecxt_scantuple = wstate->curslot;
			add_tuple_to_trans(fs, wstate, econtext, false);

			if (fs->isAgg)
			{
				fs->final_aggTransValue = fs->aggTransValue;
				fs->final_aggTransValueIsNull = fs->aggTransValueIsNull;
				fs->final_aggNoTransValue = fs->aggNoTransValue;
				fs->final_aggShouldFree = false;
			}
			continue;
		}

		if (!fs->cumul_frame || HAS_ONLY_TRANS_FUNC(fs))
		{
			*found = true;
			continue;
		}

		Assert(lvl != NULL);
		fbuf = lvl->frame_buffer;
		Assert(fbuf != NULL);

		/* Drop the final value left over from the previous row. */
		freeTransValue(&fs->final_aggTransValue,
					   fs->aggTranstypeByVal,
					   &fs->final_aggTransValueIsNull,
					   &fs->final_aggNoTransValue,
					   fs->final_aggShouldFree);

		if (!ntuplestore_acc_tell(fbuf->current_row_reader, NULL))
		{
			/* Reader unset: the current row's value is the live one. */
			fs->final_aggTransValue = fs->aggTransValue;
			fs->final_aggTransValueIsNull = fs->aggTransValueIsNull;
			fs->final_aggNoTransValue = fs->aggNoTransValue;
			fs->final_aggShouldFree = false;
			continue;
		}

		/* Reader set: fetch the accumulated value stored in the buffer. */
		entry = lvl->curr_entry_buf;
		Assert(fs->serial_index != -1);
		has_entry = getCurrentValue(fbuf->current_row_reader, lvl, entry);
		Assert(has_entry);

		wv = (WindowValue *) list_nth(entry->func_values, fs->serial_index);
		Assert(wv != NULL);

		fs->final_aggTransValue =
			datumCopyWithMemManager(0, wv->value,
									fs->aggTranstypeByVal,
									fs->aggTranstypeLen,
									&wstate->mem_manager);
		fs->final_aggTransValueIsNull = wv->valueIsNull;
		fs->final_aggNoTransValue = false;
		fs->final_aggShouldFree = true;
	}
}
/*
 * cmp_deformed_tuple -- compare two deformed key arrays column by column.
 *
 * is_equal == true: returns true iff every column is equal, using eqfuncs.
 * Two NULLs count as equal; one NULL and one non-NULL count as unequal.
 * Columns are scanned right to left, since an inequality is most likely
 * to show up there first.
 *
 * is_equal == false: returns true iff a is ordered strictly before b, using
 * ltfuncs, scanning left to right (major key first).  asc_cols[i] == false
 * means column i sorts DESC, so the operands of "<" are swapped for it.
 * NULLs are assumed to sort last (NULLS FIRST is not yet supported by the
 * window ORDER BY).  When every column compares equal -- including columns
 * where both sides are NULL, or when ncols == 0 -- a is NOT before b and
 * the result is false.  (Previously such inputs returned true, which
 * disagreed with the non-NULL equal-keys case.)
 *
 * evalContext is reset and used for the comparison calls.
 */
static bool
cmp_deformed_tuple(Datum *a, bool *a_nulls, Datum *b, bool *b_nulls,
				   bool *asc_cols, int ncols, FmgrInfo *ltfuncs,
				   FmgrInfo *eqfuncs, MemoryContext evalContext, bool is_equal)
{
	MemoryContext oldContext;
	FmgrInfo   *funcs = is_equal ? eqfuncs : ltfuncs;

	/*
	 * Equality holds until some column differs.  "Less than" does not hold
	 * until some column proves it.
	 */
	bool		result = is_equal;
	int			i,
				j;

	/* Reset and switch into the temp context. */
	MemoryContextReset(evalContext);
	oldContext = MemoryContextSwitchTo(evalContext);

	i = is_equal ? ncols - 1 : 0;
	for (j = 0; j < ncols; j++, is_equal ? i-- : i++)
	{
		Datum		attr1 = a[i];
		Datum		attr2 = b[i];
		bool		isNull1 = a_nulls[i];
		bool		isNull2 = b_nulls[i];
		bool		res;

		if (isNull1 != isNull2)
		{
			/*
			 * Exactly one side is NULL.  For equality that settles it.  For
			 * ordering under NULLS LAST, a is first exactly when b is NULL.
			 */
			result = is_equal ? false : !isNull1;
			break;
		}
		if (isNull1)
			continue;			/* both NULL: equal, decide on next column */

		/* For ordering, honour a DESC column by swapping the operands. */
		if (!is_equal && !asc_cols[i])
		{
			Datum		tmp = attr1;

			attr1 = attr2;
			attr2 = tmp;
		}

		/* Apply the type-specific function */
		res = DatumGetBool(FunctionCall2(&funcs[i], attr1, attr2));

		if (is_equal)
		{
			if (!res)
			{
				/* we wanted equality but didn't get it */
				result = false;
				break;
			}
			continue;
		}

		if (res)
		{
			/* keys are ordered major to minor: first strict "<" decides */
			result = true;
			break;
		}

		/* Not less: if equal, the next column decides; else a is greater. */
		if (DatumGetBool(FunctionCall2(&eqfuncs[i], attr1, attr2)))
			continue;

		result = false;
		break;
	}

	MemoryContextSwitchTo(oldContext);
	return result;
}
/*
 * deform_window_tuple -- like heap_deform_tuple(), but only for the
 * nattrs attribute numbers listed in attnums.
 *
 * values[k] / nulls[k] receive attribute attnums[k].  Each value is copied
 * through datumCopyWithMemManager() with wstate->transcontext as the
 * current context, because the values may need to outlive the slot.  The
 * caller provides the values and nulls arrays and is responsible for
 * freeing the copied values.
 */
static void
deform_window_tuple(TupleTableSlot *slot, int nattrs, AttrNumber *attnums,
					Datum *values, bool *nulls, WindowState *wstate)
{
	TupleDesc	tupdesc = slot->tts_tupleDescriptor;
	MemoryContext saved_ctx;
	int			k;

	saved_ctx = MemoryContextSwitchTo(wstate->transcontext);

	for (k = 0; k < nattrs; k++)
	{
		AttrNumber	col = attnums[k];
		Form_pg_attribute att = tupdesc->attrs[col - 1];
		bool		isnull;
		Datum		raw;

		raw = slot_getattr(slot, col, &isnull);
		values[k] = datumCopyWithMemManager(0, raw,
											att->attbyval, att->attlen,
											&wstate->mem_manager);
		nulls[k] = isnull;
	}

	MemoryContextSwitchTo(saved_ctx);
}
/*
 * Adjust the trailing and leading edge for ROWS-based framing.
 *
 * Each edge is skipped when the level has an empty frame and that edge is
 * delayed (EDGE_IS_DELAYED).
 *
 * Trailing edge (only when it is bound, or per EDGE_EQ_CURRENT_ROW is the
 * current row): while fewer than trail_rows steps have been counted, the
 * trail reader is advanced by one.  If the reader is not yet positioned, it
 * is instead seeked to the first entry, except for a bound-FOLLOWING edge
 * with a non-zero trail_rows.
 *
 * Leading edge: a bound-PRECEDING edge that has counted exactly lead_rows
 * is seeked to the first entry.  Otherwise the lead reader is advanced by
 * one when the edge is neither bound nor the current row, or while fewer
 * than lead_rows steps have been counted.
 */
static void
adjustEdgesForRows(WindowFrameBuffer buffer,
WindowState *wstate)
{
WindowStatePerLevel level_state = buffer->level_state;
if (!(level_state->empty_frame &&
EDGE_IS_DELAYED(level_state->frame->trail)))
{
if (EDGE_IS_BOUND(level_state->frame->trail) ||
EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->trail, false))
{
/* NOTE(review): "<= trail_rows - 1" assumes a signed count type. */
if (level_state->num_trail_rows <= level_state->trail_rows - 1)
{
bool found = false;
bool was_valid = true;
if (ntuplestore_acc_tell(level_state->trail_reader, NULL))
found = ntuplestore_acc_advance(level_state->trail_reader, 1);
else
{
/* Reader not positioned yet. */
was_valid = false;
if (!EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail) ||
level_state->trail_rows == 0)
found = ntuplestore_acc_seek_first(level_state->trail_reader);
}
/* Only increment num_trail_rows when we found the next tuple,
 * or, for a bound-FOLLOWING edge, when the reader was already
 * positioned and agg_filled is set.
 */
if (found ||
(was_valid && level_state->agg_filled &&
EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail)))
level_state->num_trail_rows++;
}
}
}
if (!(level_state->empty_frame &&
EDGE_IS_DELAYED(level_state->frame->lead)))
{
if (EDGE_IS_BOUND_PRECEDING(level_state->frame->lead) &&
level_state->num_lead_rows == level_state->lead_rows)
{
ntuplestore_acc_seek_first(level_state->lead_reader);
}
else if ((!EDGE_IS_BOUND(level_state->frame->lead) &&
!EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead, true)) ||
level_state->num_lead_rows < level_state->lead_rows)
{
bool prev_set = ntuplestore_acc_tell(level_state->lead_reader, NULL);
bool found = ntuplestore_acc_advance(level_state->lead_reader, 1);
/* Count the step if it moved, or if the reader was already set and
 * agg_filled is set.
 */
if (found || (prev_set && level_state->agg_filled))
level_state->num_lead_rows++;
}
}
}
/*
 * forwardEdgeForRange -- move the edge forward from its previous
 * position in the frame buffer until it points to the key value
 * whose next value is greater than the current edge value for the
 * leading edge, and whose next value is greater than or equal to
 * the current edge value for the trailing edge.
 *
 * The new edge value is computed by evaluating edge_range_expr in the
 * per-tuple context; the edge_expr parameter is not used here.  If the
 * scan runs off the end of the buffer, checkLastRowForEdge() is consulted,
 * and the reader is left at the last entry unless that returned true for a
 * leading edge.  For a leading edge, level_state->lead_ready ends up true
 * only when an entry beyond the edge was found.
 *
 * NOTE(review): cmp_deformed_tuple() is called with numSortCols columns
 * but only a single Datum/null flag for the edge value; this assumes RANGE
 * bound offsets always have exactly one sort column -- confirm.
 */
static void
forwardEdgeForRange(WindowStatePerLevel level_state,
WindowState *wstate,
WindowFrameEdge *edge,
ExprState *edge_expr,
ExprState *edge_range_expr,
NTupleStoreAccessor *edge_reader,
bool is_lead_edge)
{
ExprContext *econtext = wstate->ps.ps_ExprContext;
bool has_entry=false;
FrameBufferEntry *entry = level_state->curr_entry_buf;
Datum new_edge_value = 0;
bool new_edge_value_isnull = true;
MemoryContext oldctx;
bool is_less = true;
bool is_equal = true;
bool lastrow_edge = false;
Assert(EDGE_IS_BOUND(edge));
if (is_lead_edge)
level_state->lead_ready = false;
oldctx = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
/*
 * Compute the new edge value. Note that this value could be
 * NULL.
 */
new_edge_value = ExecEvalExpr(edge_range_expr, econtext,
&(new_edge_value_isnull), NULL);
MemoryContextSwitchTo(oldctx);
/* Start from the beginning of the buffer if the reader has no position. */
if (!ntuplestore_acc_tell(edge_reader, NULL))
ntuplestore_acc_seek_first(edge_reader);
/* Trailing edge: step over entries whose key < edge value.
 * Leading edge: also step over entries whose key == edge value.
 */
while ((is_equal && is_lead_edge) || is_less)
{
has_entry = getCurrentValue(edge_reader, level_state, entry);
if (!has_entry)
break;
is_less = cmp_deformed_tuple(entry->keys, entry->nulls,
&new_edge_value,
&new_edge_value_isnull,
level_state->col_sort_asc,
level_state->numSortCols,
level_state->ltfunctions,
level_state->eqfunctions,
wstate->cmpcontext,
false);
if (!is_less)
{
is_equal =
cmp_deformed_tuple(entry->keys, entry->nulls,
&new_edge_value,
&new_edge_value_isnull,
level_state->col_sort_asc,
level_state->numSortCols,
level_state->ltfunctions,
level_state->eqfunctions,
wstate->cmpcontext,
true);
}
if ((is_equal && is_lead_edge) || is_less)
ntuplestore_acc_advance(edge_reader, 1);
}
if (has_entry)
{
/* Stopped on the first entry past the edge; step back onto the edge. */
ntuplestore_acc_advance(edge_reader, -1);
if (is_lead_edge)
level_state->lead_ready = true;
}
else
{
/* Ran past the last buffered entry. */
lastrow_edge = checkLastRowForEdge(level_state, wstate,
edge, edge_reader,
new_edge_value,
new_edge_value_isnull,
is_lead_edge);
if (!lastrow_edge || !is_lead_edge)
ntuplestore_acc_seek_last(edge_reader);
}
}
/*
 * adjustEdgesForRange -- adjust both the trailing and leading edge
 * to the given frame buffer for its level state.
 *
 * This function advances the leading reader until it rests on the position
 * at the frame buffer after which all key values are greater than
 * the leading edge value. The trailing reader rests on the position
 * at the frame buffer which key value is the greatest value in the
 * buffer that is less than the trailing edge value.
 *
 * As in the ROWS case, an edge is skipped when the frame is empty and that
 * edge is delayed.  A CURRENT ROW edge is derived from the buffer's
 * current_row_reader; a bound edge goes through forwardEdgeForRange(); an
 * unbounded leading edge is simply advanced by one.
 */
static void
adjustEdgesForRange(WindowFrameBuffer buffer,
WindowState *wstate)
{
WindowStatePerLevel level_state = buffer->level_state;
if (!(level_state->empty_frame &&
EDGE_IS_DELAYED(level_state->frame->trail)))
{
if (EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->trail, false))
{
NTupleStorePos pos;
/* Trail sits one entry before the current row, or at the last
 * entry when the current-row reader has no position.
 */
if (ntuplestore_acc_tell(buffer->current_row_reader, &pos))
{
ntuplestore_acc_seek(level_state->trail_reader, &pos);
ntuplestore_acc_advance(level_state->trail_reader, -1);
}
else
{
ntuplestore_acc_seek_last(level_state->trail_reader);
}
}
else if (EDGE_IS_BOUND(level_state->frame->trail))
{
forwardEdgeForRange(level_state, wstate,
level_state->frame->trail,
level_state->trail_expr,
level_state->trail_range_expr,
level_state->trail_reader,
false);
}
}
if (!(level_state->empty_frame &&
EDGE_IS_DELAYED(level_state->frame->lead)))
{
if (EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead, true))
{
NTupleStorePos pos;
/* Lead sits on the current row when it is known (lead_ready),
 * otherwise it is advanced by one and not yet ready.
 */
if (ntuplestore_acc_tell(buffer->current_row_reader, &pos))
{
ntuplestore_acc_seek(level_state->lead_reader, &pos);
level_state->lead_ready = true;
}
else
{
ntuplestore_acc_advance(level_state->lead_reader, 1);
level_state->lead_ready = false;
}
}
else if (EDGE_IS_BOUND(level_state->frame->lead))
{
forwardEdgeForRange(level_state, wstate,
level_state->frame->lead,
level_state->lead_expr,
level_state->lead_range_expr,
level_state->lead_reader,
true);
}
else
{
ntuplestore_acc_advance(level_state->lead_reader, 1);
level_state->lead_ready = false;
}
}
}
/*
 * adjustEdges -- position the trailing and leading edge readers of the
 * given frame buffer for its key level.
 *
 * ROWS-based levels are delegated to adjustEdgesForRows; every other
 * level is RANGE-based and delegated to adjustEdgesForRange.
 */
static void
adjustEdges(WindowFrameBuffer buffer,
			WindowState *wstate)
{
	if (buffer->level_state->is_rows)
	{
		adjustEdgesForRows(buffer, wstate);
		return;
	}

	adjustEdgesForRange(buffer, wstate);
}
/*
 * invokeNonTrivialFuncs -- compute transition values for non-trivial
 * window functions.
 *
 * For every key level, in level order, the frame value is computed over
 * the span between that level's trailing and leading readers.
 */
static void
invokeNonTrivialFuncs(WindowState *wstate)
{
	int			idx = 0;

	while (idx < wstate->numlevels)
	{
		WindowStatePerLevel lvl = &wstate->level_state[idx++];

		computeFrameValue(lvl, wstate, lvl->trail_reader, lvl->lead_reader);
	}
}
/*
 * finaliseFuncs -- publish each window function's result for the current
 * row into econtext->ecxt_aggvalues / econtext->ecxt_aggnulls.
 *
 * - Ordinary window functions expose win_value / win_value_is_null.
 * - Aggregates without a final function expose their final transition
 *   value directly.
 * - Aggregates with a final function have it applied to the final
 *   transition value. A strict final function is not called on a NULL
 *   input; the result is then NULL.
 *
 * Final functions run in the per-tuple memory context.
 */
static void
finaliseFuncs(WindowState *wstate)
{
	ExprContext *econtext = wstate->ps.ps_ExprContext;
	Datum	   *results = econtext->ecxt_aggvalues;
	bool	   *result_nulls = econtext->ecxt_aggnulls;
	MemoryContext saved_ctx;
	int			i;

	saved_ctx = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

	for (i = 0; i < wstate->numfuncs; i++)
	{
		WindowStatePerFunction fs = &wstate->func_state[i];

		if (!fs->isAgg)
		{
			/* ordinary window function: value was computed already */
			results[i] = fs->win_value;
			result_nulls[i] = fs->win_value_is_null;
		}
		else if (!OidIsValid(fs->finalfn_oid))
		{
			/* aggregate without a final function */
			results[i] = fs->final_aggTransValue;
			result_nulls[i] = fs->final_aggTransValueIsNull;
		}
		else if (fs->finalfn.fn_strict && fs->final_aggTransValueIsNull)
		{
			/* don't call a strict function with NULL inputs */
			results[i] = (Datum) 0;
			result_nulls[i] = true;
		}
		else
		{
			FunctionCallInfoData fcinfo;

			InitFunctionCallInfoData(fcinfo, &(fs->finalfn), 1,
									 (void *) wstate, NULL);
			fcinfo.arg[0] = fs->final_aggTransValue;
			fcinfo.argnull[0] = fs->final_aggTransValueIsNull;
			results[i] = FunctionCallInvoke(&fcinfo);
			result_nulls[i] = fcinfo.isnull;
		}
	}

	MemoryContextSwitchTo(saved_ctx);
}
/*
 * invokeWindowFuncs -- compute every window function's value for the
 * current row.
 *
 * We must evaluate functions in a 'frame aware' way.
 *
 * UNBOUNDED PRECEDING frame edges are managed by passing the transition
 * value from the last call to the function. In this way, we trivially
 * evaluate all rows in the frame that have already passed. It gets more
 * complex for RANGE UNBOUNDED PRECEDING frames, because all peers must emit
 * the same value -- so we remember this in our state.
 *
 * Bounded PRECEDING and FOLLOWING edges are harder. There are two cases.
 *
 * With ROWS bounded frames, we must detect our position from the current
 * row. If we are earlier in the tuple count than the frame edge, those
 * tuples must be removed from the transition value using the function's
 * inverse method. If no inverse exists, something more complex is done
 * (see below). Once inside the frame, only the leading frame edge matters
 * (it may be the current row or some rows away from it).
 *
 * With RANGE bounded frames, the edge is determined with the equality
 * function(s) of the key level. Before and after the frame edges we do the
 * same as for ROWS frames, but all peers must emit the same value.
 *
 * If no inverse function exists for a function, the frame must be
 * re-evaluated every time it moves.
 */
static void
invokeWindowFuncs(WindowState *wstate)
{
	bool		needs_frame_mgmt = false;

	invokeTrivialFuncs(wstate, &needs_frame_mgmt);

	/* Only do the extra work when some function/frame actually needs it. */
	if (needs_frame_mgmt)
		invokeNonTrivialFuncs(wstate);

	finaliseFuncs(wstate);
}
/*
 * range_is_negative -- return true if the value of a RANGE frame
 * parameter is negative.
 *
 * 'value' is the Datum obtained by evaluating 'edge_expr'. A Datum is an
 * unsigned integer type, so comparing it with 0 directly can never detect
 * a negative parameter. The sign must be read through the accessor that
 * matches the expression's result type.
 *
 * Integer and floating-point parameters are checked. For any other type
 * (e.g. numeric, interval), or when no expression is available, the sign
 * is not inspected and false is returned, as before.
 */
static bool
range_is_negative(ExprState *edge_expr,
				  Datum value)
{
	if (edge_expr == NULL || edge_expr->expr == NULL)
		return false;

	switch (exprType((Node *) edge_expr->expr))
	{
		case INT2OID:
			return DatumGetInt16(value) < 0;
		case INT4OID:
			return DatumGetInt32(value) < 0;
		case INT8OID:
			return DatumGetInt64(value) < 0;
		case FLOAT4OID:
			return DatumGetFloat4(value) < 0;
		case FLOAT8OID:
			return DatumGetFloat8(value) < 0;
		default:
			/* XXX the sign of other types (numeric, interval...) is not checked */
			return false;
	}
}
/*
 * get_delay_edge -- evaluate the frame parameter expression of a
 * DELAY_BOUND edge against the current row and return its value.
 *
 * Raises an ERROR if there is no current row, if the parameter evaluates
 * to NULL, or if it is negative. For ROWS frames the value is negated when
 * the edge is "x PRECEDING", so callers receive a signed row offset
 * relative to the current row.
 *
 * The expression is evaluated in the per-tuple memory context.
 */
static Datum
get_delay_edge(WindowFrameEdge *edge,
			   ExprState *edge_expr,
			   bool is_rows,
			   WindowState *wstate)
{
	Datum		edge_param = 0;
	bool		isnull = true;
	ExprContext *econtext = wstate->ps.ps_ExprContext;
	MemoryContext oldctx;

	Assert(EDGE_IS_DELAYED(edge));

	econtext->ecxt_scantuple = wstate->curslot;
	if (TupIsNull(wstate->curslot))
		ereport(ERROR,
				(errcode(ERROR_INVALID_WINDOW_FRAME_PARAMETER),
				 errmsg("%s parameter cannot be NULL",
						(is_rows ? "ROWS" : "RANGE"))));

	oldctx = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
	edge_param = ExecEvalExpr(edge_expr,
							  econtext,
							  &isnull,
							  NULL);
	MemoryContextSwitchTo(oldctx);

	if (isnull)
		ereport(ERROR,
				(errcode(ERROR_INVALID_WINDOW_FRAME_PARAMETER),
				 errmsg("%s parameter cannot be NULL",
						(is_rows ? "ROWS" : "RANGE"))));

	/*
	 * Datum is unsigned, so "edge_param < 0" can never be true. Callers store
	 * ROWS parameters in a signed long int, so check the sign of the value
	 * exactly as they will see it.
	 */
	if (is_rows && (long int) edge_param < 0)
		ereport(ERROR,
				(errcode(ERROR_INVALID_WINDOW_FRAME_PARAMETER),
				 errmsg("%s parameter cannot be negative",
						(is_rows ? "ROWS" : "RANGE")),
				 errOmitLocation(true)));

	/* Check if the range expression is negative. */
	if (!is_rows && range_is_negative(edge_expr, edge_param))
		ereport(ERROR,
				(errcode(ERROR_INVALID_WINDOW_FRAME_PARAMETER),
				 errmsg("%s parameter cannot be negative",
						(is_rows ? "ROWS" : "RANGE")),
				 errOmitLocation(true)));

	if (is_rows && EDGE_IS_BOUND_PRECEDING(edge))
		edge_param = 0 - edge_param;

	return edge_param;
}
/*
* checkLastRowForEdge -- check whether the last row of the current
* partition in the input buffer is within the given frame edge of the
* current_row.
*
* This function is only used for RANGE-frames.
*
* The row examined is the one at the input buffer's writer position. When
* the input buffer has seen a partition break, the row just before the
* writer position is examined instead. That row is copied into
* wstate->spare and its sort keys are deformed into
* level_state->curr_entry_buf. They are then compared with new_edge_value
* using cmp_deformed_tuple. In this file, a last argument of false yields
* the "less" result and true yields the "equal" result.
*
* Result:
* - The equality test runs only for a BOUND FOLLOWING edge whose keys are
*   not "less", or a BOUND PRECEDING edge whose keys are "less". If it
*   succeeds, true is returned.
* - Otherwise, for BOUND FOLLOWING/PRECEDING edges, a leading edge returns
*   true when the keys are "less", and a trailing edge returns true when
*   they are not.
* - All other cases return false.
*
* NOTE(review): assuming the "less" test is false for equal keys, a BOUND
* PRECEDING leading edge whose last row equals the edge value gets no
* equality test and returns false -- confirm this is intended.
* NOTE(review): edge_reader is not used by this function.
*/
static bool
checkLastRowForEdge(WindowStatePerLevel level_state,
WindowState *wstate,
WindowFrameEdge *edge,
NTupleStoreAccessor *edge_reader,
Datum new_edge_value,
bool new_edge_value_isnull,
bool is_lead_edge)
{
FrameBufferEntry *entry = level_state->curr_entry_buf;
bool is_less = true;
bool is_equal = true;
Assert(!level_state->is_rows);
/* Check if the last row is the edge. */
if (wstate->input_buffer->part_break)
{
/* If the last tuple breaks the partition key, the tuple to check
* is the second-to-last tuple in the input buffer.
*/
bool found = ntuplestore_acc_advance(wstate->input_buffer->writer, -1);
Assert(found);
ntuplestore_acc_current_tupleslot(wstate->input_buffer->writer,
wstate->spare);
/* Put the writer pointer back to its original place. */
found = ntuplestore_acc_advance(wstate->input_buffer->writer, 1);
Assert(found);
}
else
{
ntuplestore_acc_current_tupleslot(wstate->input_buffer->writer,
wstate->spare);
}
/* Extract the level's sort keys of that row into the entry buffer. */
deform_window_tuple(wstate->spare,
level_state->numSortCols, level_state->sortColIdx,
entry->keys, entry->nulls,
wstate);
is_less = cmp_deformed_tuple(entry->keys, entry->nulls,
&new_edge_value, &new_edge_value_isnull,
level_state->col_sort_asc,
level_state->numSortCols,
level_state->ltfunctions,
level_state->eqfunctions,
wstate->cmpcontext,
false);
/* Only test for equality where the "less" result leaves it possible. */
if ((EDGE_IS_BOUND_FOLLOWING(edge) && !is_less) ||
(EDGE_IS_BOUND_PRECEDING(edge) && is_less))
{
is_equal =
cmp_deformed_tuple(entry->keys, entry->nulls,
&new_edge_value, &new_edge_value_isnull,
level_state->col_sort_asc,
level_state->numSortCols,
level_state->ltfunctions,
level_state->eqfunctions,
wstate->cmpcontext,
true);
}
else
is_equal = false;
if (is_equal)
return true;
if (is_lead_edge)
{
/* Leading edge: the last row is inside when it is "less". */
if ((EDGE_IS_BOUND_FOLLOWING(edge) && is_less) ||
(EDGE_IS_BOUND_PRECEDING(edge) && is_less))
{
return true;
}
}
else
{
/* Trailing edge: the last row is inside when it is not "less". */
if ((EDGE_IS_BOUND_FOLLOWING(edge) && !is_less) ||
(EDGE_IS_BOUND_PRECEDING(edge) && !is_less))
return true;
}
return false;
}
/*
* advanceEdgeForRange -- advance forward/backward a given RANGE-frame
* edge from its previous position to its new position in the frame
* buffer.
*
* The new edge value is computed by evaluating edge_range_expr in the
* per-tuple memory context; it may be NULL.
*
* Movement direction depends on the edge:
* - BOUND FOLLOWING: the reader moves forward while entries compare
*   "less" than or "equal" to the new edge value.
* - BOUND PRECEDING: the reader moves backward while entries compare
*   not "less" or "equal".
* Afterwards the position is corrected by one where needed. The trailing
* edge is finally backed off by one entry, except when the early return
* in the PRECEDING run-off case below is taken.
*
* For the leading edge, level_state->lead_ready is cleared on entry and
* set again only when the scan stopped on a buffer entry.
*
* NOTE(review): edge_expr is not used by this function.
*/
static void
advanceEdgeForRange(WindowStatePerLevel level_state,
WindowState *wstate,
WindowFrameEdge *edge,
ExprState *edge_expr,
ExprState *edge_range_expr,
NTupleStoreAccessor *edge_reader,
bool is_lead_edge)
{
ExprContext *econtext = wstate->ps.ps_ExprContext;
bool has_entry=false;
FrameBufferEntry *entry = level_state->curr_entry_buf;
Datum new_edge_value = 0;
bool new_edge_value_isnull = true;
bool is_less = false;
bool is_equal = false;
bool lastrow_edge = false;
MemoryContext oldctx;
if (is_lead_edge)
level_state->lead_ready = false;
oldctx = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
/*
* Compute the new edge value. Note that this value can be
* NULL.
*/
new_edge_value = ExecEvalExpr(edge_range_expr, econtext,
&(new_edge_value_isnull), NULL);
MemoryContextSwitchTo(oldctx);
/*
* The reader has no valid position: for a PRECEDING edge, start from the
* last buffer entry if this is the leading edge or if the last input row
* is within the edge.
*/
if (!ntuplestore_acc_tell(edge_reader, NULL))
{
lastrow_edge = checkLastRowForEdge(level_state, wstate,
edge, edge_reader,
new_edge_value,
new_edge_value_isnull,
is_lead_edge);
if ((is_lead_edge || lastrow_edge) && EDGE_IS_BOUND_PRECEDING(edge))
ntuplestore_acc_seek_last(edge_reader);
}
if (!EDGE_EQ_CURRENT_ROW(level_state, wstate, edge, is_lead_edge))
{
/* "equal" result of the entry examined before the current one */
bool prev_is_equal = false;
do
{
has_entry = getCurrentValue(edge_reader, level_state, entry);
if (!has_entry)
break;
is_less = cmp_deformed_tuple(entry->keys, entry->nulls,
&new_edge_value,
&new_edge_value_isnull,
level_state->col_sort_asc,
level_state->numSortCols,
level_state->ltfunctions,
level_state->eqfunctions,
wstate->cmpcontext,
false);
prev_is_equal = is_equal;
is_equal =
cmp_deformed_tuple(entry->keys, entry->nulls,
&new_edge_value,
&new_edge_value_isnull,
level_state->col_sort_asc,
level_state->numSortCols,
level_state->ltfunctions,
level_state->eqfunctions,
wstate->cmpcontext,
true);
/* Step in the edge's direction while the entry is still inside. */
if ((EDGE_IS_BOUND_FOLLOWING(edge) && (is_less || is_equal)) ||
(EDGE_IS_BOUND_PRECEDING(edge) && (!is_less || is_equal)))
{
if (EDGE_IS_BOUND_FOLLOWING(edge))
ntuplestore_acc_advance(edge_reader, 1);
else
ntuplestore_acc_advance(edge_reader, -1);
}
} while ((EDGE_IS_BOUND_FOLLOWING(edge) && (is_less || is_equal)) ||
(EDGE_IS_BOUND_PRECEDING(edge) && (!is_less || is_equal)));
if (has_entry)
{
if (EDGE_IS_BOUND_FOLLOWING(edge))
{
/*
* When the edge is following, edge_reader points to
* the position whose value is after the new edge value.
* We need to back off one position if the value in its
* previous position is equal to the new edge value, or
* this is the leading edge.
*/
if (prev_is_equal || is_lead_edge)
ntuplestore_acc_advance(edge_reader, -1);
}
else
{
/*
* When the edge is preceding, edge_reader points to
* the position whose value is before the new edge value.
* We need to back off one if the value in its next
* position is equal to the new edge value. We also
* back off one position if this edge is the trailing
* edge.
*/
if (prev_is_equal || !is_lead_edge)
ntuplestore_acc_advance(edge_reader, 1);
}
if (is_lead_edge)
level_state->lead_ready = true;
}
else
{
/* The scan ran off the frame buffer. */
ntuplestore_acc_set_invalid(edge_reader);
if (!EDGE_IS_BOUND_FOLLOWING(edge))
{
/*
* When the leading edge is "x preceding", check the first value
* in the buffer. If this value is equal to the new edge value,
* point the lead_reader to this value.
*
* Note: this return also skips the trailing-edge back-off below.
*/
if (is_lead_edge && is_equal)
ntuplestore_acc_seek_first(edge_reader);
return;
}
Assert(EDGE_IS_BOUND_FOLLOWING(edge));
/*
* If the last value in the frame buffer is equal to
* the new edge value, simply set the reader to
* the last value. Otherwise, we check with
* the last row in the input buffer.
*/
if (is_equal)
ntuplestore_acc_seek_last(edge_reader);
else
{
lastrow_edge = checkLastRowForEdge(level_state, wstate,
edge, edge_reader,
new_edge_value,
new_edge_value_isnull,
is_lead_edge);
if (!lastrow_edge && is_lead_edge)
ntuplestore_acc_seek_last(edge_reader);
}
}
}
/* For the trailing edge, we also back off one. */
if (!is_lead_edge)
{
bool found = false;
found = ntuplestore_acc_advance(edge_reader, -1);
/*
* The back-off failed (presumably the reader was invalid or already at
* the first entry -- confirm against ntuplestore_acc_advance).
*/
if (!found)
{
if (EDGE_EQ_CURRENT_ROW(level_state, wstate, edge, is_lead_edge))
{
if (!ntuplestore_acc_tell(level_state->frame_buffer->current_row_reader,
NULL))
ntuplestore_acc_seek_last(edge_reader);
}
else if (EDGE_IS_BOUND_FOLLOWING(edge))
{
lastrow_edge = checkLastRowForEdge(level_state, wstate,
edge, edge_reader,
new_edge_value,
new_edge_value_isnull,
is_lead_edge);
if (lastrow_edge)
ntuplestore_acc_seek_last(edge_reader);
}
}
}
}
/*
 * adjustDelayBoundEdgeForRange -- re-evaluate a DELAY_BOUND edge of a
 * RANGE frame for the current row and reposition its reader.
 *
 * The freshly evaluated parameter is stored in *p_request_value. The edge
 * reader is first placed on the current row, or invalidated when the
 * current row reader has no valid position. advanceEdgeForRange then
 * moves it to its new position. Only single-column ORDER BY levels are
 * expected here.
 */
static void
adjustDelayBoundEdgeForRange(WindowStatePerLevel level_state,
							 WindowState *wstate,
							 WindowFrameEdge *edge,
							 ExprState *edge_expr,
							 ExprState *edge_range_expr,
							 NTupleStoreAccessor *edge_reader,
							 Datum *p_request_value,
							 bool is_lead_edge)
{
	NTupleStorePos cur_pos;
	bool		ok;

	Assert(level_state->has_delay_bound && !level_state->is_rows);
	Assert(EDGE_IS_DELAYED(edge));
	Assert(level_state->numSortCols == 1);

	*p_request_value = get_delay_edge(edge, edge_expr,
									  level_state->is_rows, wstate);

	/* Start the search from the current row, if there is one. */
	ok = ntuplestore_acc_tell(level_state->frame_buffer->current_row_reader,
							  &cur_pos);
	if (!ok)
		ntuplestore_acc_set_invalid(edge_reader);
	else
	{
		ok = ntuplestore_acc_seek(edge_reader, &cur_pos);
		Assert(ok);
	}

	advanceEdgeForRange(level_state, wstate, edge, edge_expr,
						edge_range_expr, edge_reader, is_lead_edge);
}
/*
* adjustDelayBoundEdgeForRows -- evaluate the given DELAY_BOUND edge in
* a ROWS-type frame and adjust the corresponding edge to the frame buffer
* based on this new edge value.
*
* *p_request_num_rows receives the requested signed row offset from
* get_delay_edge (negative for PRECEDING edges). *p_num_rows receives how
* many rows the edge actually lies from the current row within the frame
* buffer.
* - FOLLOWING edge: the reader is advanced from the current row when
*   enough rows follow it; otherwise the reader is invalidated and the
*   available count is recorded (plus one when level_state->agg_filled).
* - Other edges (presumably PRECEDING -- confirm): the reader is moved
*   back from the current row, or from the last entry when the current
*   row reader has no valid position.
* For the trailing edge, the reader is finally backed off by one, with
* special cases for "0" offsets and invalid positions.
*/
static void
adjustDelayBoundEdgeForRows(WindowStatePerLevel level_state,
WindowState *wstate,
WindowFrameEdge *edge,
ExprState *edge_expr,
NTupleStoreAccessor *edge_reader,
long int *p_request_num_rows,
long int *p_num_rows,
bool is_lead_edge)
{
WindowFrameBuffer frame_buffer = level_state->frame_buffer;
bool found;
Assert(level_state->has_delay_bound && level_state->is_rows);
Assert(EDGE_IS_DELAYED(edge));
Assert(list_length(level_state->level_funcs) >= 1);
*p_request_num_rows =
get_delay_edge(edge, edge_expr, level_state->is_rows, wstate);
*p_num_rows = 0;
/* Reset the edge_reader position in the frame buffer. */
if (EDGE_IS_BOUND_FOLLOWING(edge))
{
NTupleStorePos curr_row_pos;
if (ntuplestore_acc_tell(frame_buffer->current_row_reader, &curr_row_pos))
{
/*
* Set edge_reader to current_row and advance it to
* *p_request_num_rows.
*/
found = ntuplestore_acc_seek(edge_reader, &curr_row_pos);
Assert(found);
if (frame_buffer->num_rows_after - 1 >= *p_request_num_rows)
{
found = ntuplestore_acc_advance(edge_reader, *p_request_num_rows);
Assert(found);
*p_num_rows = *p_request_num_rows;
}
else
{
/* Not enough rows buffered after the current row yet. */
ntuplestore_acc_set_invalid(edge_reader);
*p_num_rows = frame_buffer->num_rows_after - 1;
if (level_state->agg_filled)
(*p_num_rows)++;
}
}
else
{
ntuplestore_acc_set_invalid(edge_reader);
*p_num_rows = 0;
}
}
else
{
NTupleStorePos curr_row_pos;
/*
* Signed (non-positive) step count for the reader.
* NOTE(review): narrowed from long int to int here.
*/
int ntuples = *p_request_num_rows;
*p_num_rows = 0;
if (ntuplestore_acc_tell(frame_buffer->current_row_reader, &curr_row_pos))
{
found = ntuplestore_acc_seek(edge_reader, &curr_row_pos);
Assert(found);
}
else
{
/*
* No valid current row: start from the last buffer entry.
* NOTE(review): the first assignment to found is overwritten
* by the tell() result on the next line.
*/
found = ntuplestore_acc_seek_last(edge_reader);
found = ntuplestore_acc_tell(edge_reader, &curr_row_pos);
if (level_state->agg_filled)
ntuples++;
}
if (found)
{
if (frame_buffer->num_rows_before >= 0 - *p_request_num_rows)
{
found = ntuplestore_acc_advance(edge_reader, ntuples);
*p_num_rows = *p_request_num_rows;
}
else
{
/* Not enough rows buffered before the current row. */
ntuplestore_acc_set_invalid(edge_reader);
*p_num_rows = 0 - frame_buffer->num_rows_before;
}
}
}
/* For the trailing edge, we also back off one. */
if (!is_lead_edge)
{
/*
* If the frame is "0 preceding/following" and the current_row_reader
* is pointing to an invalid position, we simply set the
* trailing edge to the last entry in the frame buffer.
*/
if (*p_request_num_rows == 0 &&
!ntuplestore_acc_tell(frame_buffer->current_row_reader, NULL))
{
ntuplestore_acc_seek_last(edge_reader);
}
/*
* If the trailing edge points to an invalid position and the
* edge is "x following", we want to set the current trailing
* edge to the end of the buffer.
*/
else if (*p_num_rows > 0 &&
!ntuplestore_acc_tell(edge_reader, NULL))
{
ntuplestore_acc_seek_last(edge_reader);
(*p_num_rows)--;
}
else
ntuplestore_acc_advance(edge_reader, -1);
}
}
/*
* fetchCurrentRow -- retrieve the next current_row from the input buffer.
*
* The current_row_reader always points to the previous current_row
* position. When that reader cannot be advanced (there is no input buffer
* yet, or no further row is buffered), the next tuple is read from the
* outer plan and appended to the input buffer. On the very first tuple,
* initializePartition is called to set the buffer up.
*
* Returns wstate->curslot holding the new current row, or NULL when the
* outer plan is exhausted. It also:
* - sets cur_slot_is_new, cur_slot_part_break and cur_slot_key_break by
*   comparing the new row with the previous one (priorslot);
* - increments peer_index of every key level before the first level whose
*   keys differ;
* - re-initializes the partition on a partition break, and otherwise
*   trims the input buffer;
* - re-evaluates the DELAY_BOUND edges of every level that has them,
*   repositions that level's edge readers, and recomputes empty_frame.
*/
static TupleTableSlot *
fetchCurrentRow(WindowState *wstate)
{
bool found = false;
WindowInputBuffer buffer = wstate->input_buffer;
Window *window = (Window *)wstate->ps.plan;
bool has_prior_tuple = false;
ExprContext *econtext = wstate->ps.ps_ExprContext;
int level;
if (buffer != NULL)
{
/* read the previous slot */
ntuplestore_acc_current_tupleslot(buffer->current_row_reader,
wstate->priorslot);
has_prior_tuple = true;
found = ntuplestore_acc_advance(buffer->current_row_reader, 1);
}
if (found)
wstate->cur_slot_is_new = false;
else
{
/* Fetch the first tuple from the outer plan */
TupleTableSlot *slot = ExecProcNode(outerPlanState(wstate));
if (TupIsNull(slot))
return NULL;
Gpmon_M_Incr(GpmonPktFromWindowState(wstate), GPMON_QEXEC_M_ROWSIN);
CheckSendPlanStateGpmonPkt(&wstate->ps);
if (buffer == NULL)
{
initializePartition(wstate);
}
ntuplestore_acc_put_tupleslot(wstate->input_buffer->writer, slot);
wstate->input_buffer->num_tuples++;
if (buffer == NULL)
{
/* The first tuple will not break any keys. */
wstate->cur_slot_key_break = wstate->numlevels;
ntuplestore_acc_seek_first(wstate->input_buffer->current_row_reader);
ntuplestore_acc_seek_first(wstate->input_buffer->reader);
}
else
{
ntuplestore_acc_seek_last(wstate->input_buffer->current_row_reader);
ntuplestore_acc_seek_last(wstate->input_buffer->reader);
}
buffer = wstate->input_buffer;
/* Give the work slots a descriptor the first time a tuple is seen. */
if (wstate->curslot->tts_tupleDescriptor == NULL)
ExecSetSlotDescriptor(wstate->curslot,
slot->tts_tupleDescriptor);
if (wstate->priorslot->tts_tupleDescriptor == NULL)
ExecSetSlotDescriptor(wstate->priorslot,
slot->tts_tupleDescriptor);
if (wstate->spare->tts_tupleDescriptor == NULL)
ExecSetSlotDescriptor(wstate->spare,
slot->tts_tupleDescriptor);
wstate->cur_slot_is_new = true;
}
found = ntuplestore_acc_current_tupleslot(buffer->current_row_reader,
wstate->curslot);
Assert(found);
wstate->cur_slot_part_break = false;
wstate->cur_slot_key_break = -1;
/* Check if the partition key breaks */
if (has_prior_tuple)
{
wstate->cur_slot_part_break =
(window->numPartCols &&
(!execTuplesMatch(wstate->curslot, wstate->priorslot,
window->numPartCols, window->partColIdx,
wstate->eqfunctions,
econtext->ecxt_per_tuple_memory)));
if (wstate->cur_slot_part_break)
wstate->input_buffer->part_break = true;
else
{
/* NOTE: this 'level' shadows the function-level variable. */
int level = 0;
bool match = true;
/* Process ordering partial key that breaks.
* We also increment the peer_index.
*/
for (level=0; level<wstate->numlevels; level++)
{
WindowStatePerLevel lvl = &wstate->level_state[level];
match = execTuplesMatch(wstate->curslot, wstate->priorslot,
lvl->numSortCols,
lvl->sortColIdx,
lvl->eqfunctions,
econtext->ecxt_per_tuple_memory);
if (!match)
{
wstate->cur_slot_key_break = level;
break;
}
lvl->peer_index++;
}
}
}
wstate->row_index++;
if (wstate->cur_slot_key_break != -1 &&
wstate->cur_slot_key_break < wstate->numlevels)
advanceKeyLevelState(wstate, wstate->cur_slot_key_break);
/* If this tuple breaks the partition key, re-initialize
* the state.
*/
if (wstate->cur_slot_part_break)
initializePartition(wstate);
else
trimInputBuffer(buffer);
/* Evaluate the DELAY_BOUND edges, including LEAD/LAG offset expressions,
* and change the framing clauses accordingly.
*
* We also adjust the trailing edge and the leading edge to the frame
* buffer based on these new edge values.
*/
for (level=0; level<wstate->numlevels; level++)
{
WindowStatePerLevel level_state = &wstate->level_state[level];
if (!level_state->has_delay_bound)
continue;
if (EDGE_IS_DELAYED(level_state->frame->lead))
{
if (level_state->is_rows)
adjustDelayBoundEdgeForRows(level_state, wstate,
level_state->frame->lead,
level_state->lead_expr,
level_state->lead_reader,
&(level_state->lead_rows),
&(level_state->num_lead_rows),
true);
else
adjustDelayBoundEdgeForRange(level_state, wstate,
level_state->frame->lead,
level_state->lead_expr,
level_state->lead_range_expr,
level_state->lead_reader,
&(level_state->lead_range),
true);
}
if (EDGE_IS_DELAYED(level_state->frame->trail))
{
if (level_state->is_rows)
{
/*
* When both ROWS edges are identical, reuse the leading edge's
* result instead of evaluating the parameter a second time.
*/
if (equal(level_state->frame->lead, level_state->frame->trail))
{
NTupleStorePos pos;
bool found = false;
level_state->num_trail_rows = level_state->num_lead_rows;
level_state->trail_rows = level_state->lead_rows;
found = ntuplestore_acc_tell(level_state->lead_reader, &pos);
if (found)
{
ntuplestore_acc_seek(level_state->trail_reader, &pos);
/* We back off one for the trailing edge. */
ntuplestore_acc_advance(level_state->trail_reader, -1);
}
else
{
if (level_state->num_trail_rows > 0 ||
level_state->trail_rows == 0)
{
ntuplestore_acc_seek_last(level_state->trail_reader);
if (level_state->num_trail_rows > 0)
level_state->num_trail_rows--;
}
else
ntuplestore_acc_set_invalid(level_state->trail_reader);
}
}
else
adjustDelayBoundEdgeForRows(level_state, wstate,
level_state->frame->trail,
level_state->trail_expr,
level_state->trail_reader,
&(level_state->trail_rows),
&(level_state->num_trail_rows),
false);
}
else
{
adjustDelayBoundEdgeForRange(level_state, wstate,
level_state->frame->trail,
level_state->trail_expr,
level_state->trail_range_expr,
level_state->trail_reader,
&(level_state->trail_range),
false);
}
}
/* set empty_frame flag */
setEmptyFrame(level_state, wstate);
}
return wstate->curslot;
}
/*
* fetchTupleSlotThroughBuf -- fetch a tuple through the reader of the
* input buffer.
*
* The reader is expected to rest on the last buffered tuple (asserted
* below). If so, that tuple is copied into priorslot. The next tuple is
* then pulled from the outer plan, appended to the input buffer, and
* copied into wstate->spare; the reader is moved to it and
* cur_slot_is_new is set.
*
* Returns wstate->spare, or NULL when the outer plan is exhausted. Also
* sets cur_slot_part_break / cur_slot_key_break by comparing spare with
* priorslot, marks input_buffer->part_break on a partition break, and
* increments peer_count of every key level before the first mismatch.
*
* NOTE(review): the break check runs whenever priorslot is non-empty,
* even if priorslot was not refreshed in this call; has_prior_tuple is
* computed but never consulted -- confirm this is intended.
*/
static TupleTableSlot *
fetchTupleSlotThroughBuf(WindowState *wstate)
{
WindowInputBuffer buffer = wstate->input_buffer;
bool found = false;
Window *window = (Window *)wstate->ps.plan;
ExprContext *econtext = wstate->ps.ps_ExprContext;
bool has_prior_tuple = false;
/* Read the previous tupleslot if any */
if (ntuplestore_acc_tell(buffer->reader, NULL))
{
found = ntuplestore_acc_current_tupleslot(buffer->reader,
wstate->priorslot);
has_prior_tuple = found;
}
if (found)
{
/* Advance the reader */
found = ntuplestore_acc_advance(buffer->reader, 1);
Assert(!found);
}
if (!found)
{
TupleTableSlot *slot = ExecProcNode(outerPlanState(wstate));
if (TupIsNull(slot))
return NULL;
Gpmon_M_Incr(GpmonPktFromWindowState(wstate), GPMON_QEXEC_M_ROWSIN);
CheckSendPlanStateGpmonPkt(&wstate->ps);
/* Put the new tuple into the input buffer */
ntuplestore_acc_put_tupleslot(buffer->writer, slot);
buffer->num_tuples++;
ntuplestore_acc_seek_last(buffer->reader);
ntuplestore_acc_current_tupleslot(buffer->writer,
wstate->spare);
wstate->cur_slot_is_new = true;
}
wstate->cur_slot_part_break = false;
wstate->cur_slot_key_break = -1;
if (!TupIsNull(wstate->priorslot))
{
wstate->cur_slot_part_break =
(window->numPartCols &&
(!execTuplesMatch(wstate->spare, wstate->priorslot,
window->numPartCols, window->partColIdx,
wstate->eqfunctions,
econtext->ecxt_per_tuple_memory)));
if (wstate->cur_slot_part_break)
buffer->part_break = true;
else
{
int level = 0;
bool match;
/* Process ordering partial key that breaks.
* We also increment the peer_count.
*/
for (level=0; level<wstate->numlevels; level++)
{
WindowStatePerLevel lvl = &wstate->level_state[level];
match = execTuplesMatch(wstate->spare, wstate->priorslot,
lvl->numSortCols,
lvl->sortColIdx,
lvl->eqfunctions,
econtext->ecxt_per_tuple_memory);
if (!match)
{
wstate->cur_slot_key_break = level;
break;
}
lvl->peer_count++;
}
}
}
return wstate->spare;
}
/*
* ExecWindow -- produce the next output row of the Window node.
*
* 1. Fetch the next current_row (fetchCurrentRow). When the input is
*    exhausted, release resources with ExecEagerFreeWindow and return NULL.
* 2. If that row was just read from the outer plan, feed it to the
*    per-level transition state (processTupleSlot).
* 3. For each key level with non-trivial frames (levels with an empty
*    frame and no DELAY_BOUND edge are skipped), move the frame buffer's
*    current row. This happens only when more than one tuple is buffered,
*    and the level is ROWS-based or the order key broke at this level or
*    a lower-numbered one.
* 4. Read ahead (fetchTupleSlotThroughBuf + processTupleSlot) until
*    checkOutputReady reports enough data, or the input ends. last_peer
*    is set once the input ends or the partition breaks.
* 5. Compute the window function values and project the result row. A
*    NULL projection result also triggers ExecEagerFreeWindow.
*/
TupleTableSlot *
ExecWindow(WindowState *wstate)
{
bool output_ready;
TupleTableSlot *next_tupleslot;
TupleTableSlot *resultSlot;
ExprContext *econtext;
int level;
ExprDoneCond isDone;
bool last_peer = false;
econtext = wstate->ps.ps_ExprContext;
/* Fetch the current_row */
econtext->ecxt_scantuple = fetchCurrentRow(wstate);
econtext->ecxt_outertuple =
econtext->ecxt_scantuple; /* XXX really need this? */
if (TupIsNull(econtext->ecxt_scantuple))
{
ExecEagerFreeWindow(wstate);
return NULL; /* we are done */
}
/* Process the current_row if it has not been processed. */
if (wstate->cur_slot_is_new)
{
processTupleSlot(wstate, wstate->curslot, false);
wstate->cur_slot_is_new = false;
}
/* Increment the current_row in the frame buffer for each
* key level.
*/
for(level=0; level<wstate->numlevels; level++)
{
WindowStatePerLevel level_state = &wstate->level_state[level];
if (level_state->empty_frame &&
!level_state->has_delay_bound)
continue;
if (!level_state->trivial_frames_only)
{
if ((wstate->input_buffer->num_tuples > 1) &&
(level_state->is_rows ||
(wstate->cur_slot_key_break != -1 &&
level >= wstate->cur_slot_key_break)))
{
econtext->ecxt_scantuple = wstate->curslot;
incrementCurrentRow(level_state->frame_buffer, wstate);
}
}
}
/* Read ahead until every level has enough rows to produce output. */
output_ready = checkOutputReady(wstate);
while (!output_ready)
{
next_tupleslot = fetchTupleSlotThroughBuf(wstate);
if (TupIsNull(next_tupleslot) || wstate->cur_slot_part_break)
last_peer = true;
processTupleSlot(wstate, next_tupleslot, last_peer);
if (!TupIsNull(next_tupleslot))
output_ready = checkOutputReady(wstate);
else
output_ready = true;
}
ResetExprContext(econtext);
econtext->ecxt_scantuple = wstate->curslot;
econtext->ecxt_outertuple = wstate->curslot;
invokeWindowFuncs(wstate);
/*
* Form the result tuple using ExecProject(), and return it.
*/
resultSlot = ExecProject(wstate->ps.ps_ProjInfo, &isDone);
if (!TupIsNull(resultSlot))
{
Gpmon_M_Incr_Rows_Out(GpmonPktFromWindowState(wstate));
CheckSendPlanStateGpmonPkt(&wstate->ps);
}
else
{
ExecEagerFreeWindow(wstate);
}
return resultSlot;
}
/*
* resetTransValue -- reset the transition value for
* a given function.
*
* Aggregates: the current transition value is released via
* freeTransValue and, if it is still a non-NULL by-reference value, also
* pfree'd. It is then replaced by the aggregate's initial value, copied
* with wstate->mem_manager unless that value is NULL. The NULL and
* "no transition value" flags are reset from the initial value's nullness.
*
* Window functions: aggTransValue is released via freeTransValue (only
* when argument by-value flags are available), and win_value is reset to
* NULL.
*/
static void
resetTransValue(WindowStatePerFunction funcstate,
WindowState *wstate)
{
if (funcstate->isAgg)
{
freeTransValue(&funcstate->aggTransValue,
funcstate->aggTranstypeByVal,
&funcstate->aggTransValueIsNull,
&funcstate->aggNoTransValue,
true);
/*
* NOTE(review): if freeTransValue already released the value without
* clearing it or its NULL flag, this pfree would be a double free;
* the value may also have been allocated via mem_manager rather than
* palloc. Confirm against freeTransValue's implementation.
*/
if (!funcstate->aggTranstypeByVal && !funcstate->aggTransValueIsNull &&
DatumGetPointer(funcstate->aggTransValue) != NULL)
pfree (DatumGetPointer(funcstate->aggTransValue));
if (funcstate->aggInitValueIsNull)
funcstate->aggTransValue = funcstate->aggInitValue;
else
{
funcstate->aggTransValue =
datumCopyWithMemManager(0, funcstate->aggInitValue,
funcstate->aggTranstypeByVal,
funcstate->aggTranstypeLen,
&(wstate->mem_manager));
}
funcstate->aggTransValueIsNull = funcstate->aggInitValueIsNull;
funcstate->aggNoTransValue = funcstate->aggInitValueIsNull;
}
else
{
/*
* NOTE(review): argno 1 selects the by-value flag of the second
* argument; confirm that this is the argument whose value is kept in
* aggTransValue.
*/
int argno = 1;
WindowRefExprState *wrxstate = funcstate->wrxstate;
if (wrxstate->argtypbyval)
freeTransValue(&funcstate->aggTransValue,
wrxstate->argtypbyval[argno],
&funcstate->aggTransValueIsNull,
&funcstate->aggNoTransValue,
true);
funcstate->win_value = 0;
funcstate->win_value_is_null = true;
}
}
/*
 * resetTransValues -- reset the transition value of every function that
 * belongs to the given key level, in list order (see resetTransValue).
 */
static void
resetTransValues(WindowStatePerLevel level_state,
				 WindowState *wstate)
{
	ListCell   *cell;

	foreach(cell, level_state->level_funcs)
		resetTransValue((WindowStatePerFunction) lfirst(cell), wstate);
}
/*
* processTupleSlot -- process a new tuple.
*
* This function passes the given tuple to each function in every
* key level to compute an intermediate result.
*
* We only handle non-trivial window functions here.
*
* For each level that has non-trivial frames:
* 1. Decide whether the values accumulated so far must be written to the
*    frame buffer (write_pre_value), and if so append them
*    (appendToFrameBuffer, with ecxt_scantuple = priorslot). Then reset
*    the transition values that cannot be carried over.
* 2. Unless the tuple breaks the partition or is NULL (end of input),
*    advance peer counts or add the tuple to each non-trivial function's
*    transition value.
*
* 'last_peer' indicates whether the tuple to be inserted is the last
* tuple from the input or partition (ExecWindow sets it at end of input
* or on a partition break). It is passed to appendToFrameBuffer, and it
* suppresses the LEAD/LAG and FIRST/LAST resets below.
*/
static void
processTupleSlot(WindowState *wstate, TupleTableSlot *slot, bool last_peer)
{
int level;
ExprContext *econtext = wstate->ps.ps_ExprContext;
for (level = 0; level < wstate->numlevels; level++)
{
WindowStatePerLevel level_state = &wstate->level_state[level];
ListCell *lc;
bool adv_peercount = false;
/*
* For non-delayed frames, we can simply ignore this level. However,
* for delayed frames, we still need to store the intermediate results
* because they may be needed later.
*/
if (level_state->empty_frame &&
!level_state->has_delay_bound)
continue;
if (!level_state->trivial_frames_only)
{
/*
* Flush only when more than one tuple is buffered and this tuple is
* freshly read. ROWS levels and levels with only transition
* functions flush for every such tuple. Other levels flush at a
* partition break, at end of input (NULL slot), or when the order
* key broke at this level or a lower-numbered one.
*/
bool write_pre_value = false;
if (wstate->input_buffer->num_tuples > 1 &&
wstate->cur_slot_is_new)
{
if (level_state->is_rows || level_state->has_only_trans_funcs)
write_pre_value = true;
else if (wstate->cur_slot_part_break || TupIsNull(slot))
write_pre_value = true;
else if ((wstate->cur_slot_key_break != -1 &&
level >= wstate->cur_slot_key_break))
write_pre_value = true;
}
/* If this tuple breaks the order by key in this level,
* we write out previous aggregate values if any.
*/
if (write_pre_value)
{
/*
* If there is a function that requires peer counts,
* we need to compute its preliminary value before
* appending it to the frame buffer.
*/
if (level_state->need_peercount)
{
foreach(lc, level_state->level_funcs)
{
WindowStatePerFunction funcstate =
(WindowStatePerFunction) lfirst(lc);
if (funcstate->winpeercount)
add_tuple_to_trans(funcstate, wstate, econtext, false);
}
}
/* Set econtext->ecxt_scantuple because the range
* frame needs this for order keys.
*/
econtext->ecxt_scantuple = wstate->priorslot;
appendToFrameBuffer(level_state, wstate, last_peer);
/* Reset the transition value for those functions which
* have preliminary functions, but not inverse preliminary
* functions.
*
* NOTE(review): the else-if branch dereferences
* funcstate->wrxstate for aggregates as well; this assumes
* wrxstate is always set -- confirm.
*/
foreach (lc, level_state->level_funcs)
{
WindowStatePerFunction funcstate = (WindowStatePerFunction) lfirst(lc);
if (funcstate->isAgg &&
!funcstate->cumul_frame &&
!OidIsValid(funcstate->invprelimfn_oid) &&
OidIsValid(funcstate->prelimfn_oid))
{
resetTransValue(funcstate, wstate);
}
else if ((IS_LEAD_LAG(funcstate->wrxstate->winkind) ||
IS_FIRST_LAST(funcstate->wrxstate->winkind)) &&
!last_peer)
{
resetTransValue(funcstate, wstate);
}
}
}
/* If this slot causes the partition key to break, we don't add
* this tuple to the transition value.
*/
if (wstate->cur_slot_part_break || TupIsNull(slot))
{
if (write_pre_value)
level_state->agg_filled = false;
continue;
}
foreach(lc, level_state->level_funcs)
{
WindowStatePerFunction funcstate =
(WindowStatePerFunction) lfirst(lc);
/* NOTE: shadows the outer econtext; both hold the same value. */
ExprContext *econtext = wstate->ps.ps_ExprContext;
if (funcstate->trivial_frame)
continue;
if (HAS_ONLY_TRANS_FUNC(funcstate))
continue;
/*
* Increment the values for peer counts. There may be more than
* one such function in one key level. We should catch this
* in the earlier stage, but in case we didn't, we only do
* this once. Any later peer-count function in this level
* falls through to the else branch instead.
*/
if (!adv_peercount && funcstate->winpeercount)
{
if (wstate->cur_slot_is_new &&
wstate->cur_slot_key_break != -1 &&
wstate->cur_slot_key_break < wstate->numlevels)
{
advancePeerCount(level_state, level, wstate->cur_slot_key_break);
}
adv_peercount = true;
}
else
{
/* Add this tuple to its transition value */
econtext->ecxt_scantuple = slot;
econtext->ecxt_outertuple = slot; /* XXX really need this? */
add_tuple_to_trans(funcstate, wstate, econtext, true);
level_state->agg_filled = true;
}
}
}
}
}
/*
 * checkOutputReady -- decide whether enough input has been buffered to
 * compute the next output row.
 *
 * Returns true immediately once the input buffer has seen a partition
 * break. Otherwise every key level with non-trivial frames must report
 * that its frame buffer holds enough data around the current row:
 * hasEnoughDataInRows for ROWS levels, hasEnoughDataInRange for RANGE
 * levels. Levels with only trivial frames are not consulted.
 *
 * Side effect: econtext->ecxt_scantuple is set to the current row.
 */
static bool
checkOutputReady(WindowState *wstate)
{
	int			lvl;

	wstate->ps.ps_ExprContext->ecxt_scantuple = wstate->curslot;

	if (wstate->input_buffer->part_break)
		return true;

	for (lvl = 0; lvl < wstate->numlevels; lvl++)
	{
		WindowStatePerLevel ls = &wstate->level_state[lvl];
		bool		enough;

		if (ls->trivial_frames_only)
			continue;

		enough = ls->is_rows
			? hasEnoughDataInRows(ls->frame_buffer, ls, wstate,
								  ls->trail_rows, ls->lead_rows)
			: hasEnoughDataInRange(ls->frame_buffer, ls, wstate,
								   ls->trail_range, ls->lead_range);
		if (!enough)
			return false;
	}

	return true;
}
/*
 * coerceType -- coerce a frame parameter expression to new_type.
 *
 * When expr already has type new_type it is returned unchanged. Otherwise
 * the result of an explicit coerce_to_target_type() call (keeping expr's
 * typmod) is returned; that result is NULL when no coercion path exists,
 * so callers must check for NULL.
 */
static Expr *
coerceType(Expr *expr, Oid new_type)
{
	Oid			src_type = exprType((Node *) expr);

	if (src_type == new_type)
		return expr;

	return (Expr *) coerce_to_target_type(NULL,
										  (Node *) expr,
										  src_type,
										  new_type,
										  exprTypmod((Node *) expr),
										  COERCION_EXPLICIT,
										  COERCE_IMPLICIT_CAST,
										  -1);
}
/*
 * init_bound_frame_edge_expr -- prepare the expressions that locate one
 * bounded RANGE frame edge.
 *
 *   edge     : the frame edge; must satisfy EDGE_IS_BOUND
 *   desc     : tuple descriptor of the child plan's output
 *   attnum   : 1-based attribute number of the ORDER BY column used
 *   is_trail : true for the trailing edge, false for the leading edge
 *
 * Fills in, for the chosen edge, in level_state:
 *   trail_expr / lead_expr      -- the bound value expression, coerced to
 *                                  the other edge's type if that edge was
 *                                  initialized first
 *   trail_range / lead_range    -- its value, unless the edge is delayed
 *   trail_range_expr / lead_range_expr
 *                               -- "var +/- bound", cast back to the
 *                                  ORDER BY column's type if needed
 *   trail_range_eq_expr / lead_range_eq_expr
 *                               -- "(var +/- bound) = var"
 *
 * "-" is used for PRECEDING on an ascending key or FOLLOWING on a
 * descending key; "+" otherwise.
 */
static void
init_bound_frame_edge_expr(WindowFrameEdge *edge, TupleDesc desc,
						   AttrNumber attnum,
						   bool is_trail,
						   WindowStatePerLevel level_state,
						   WindowState *wstate)
{
	ExprState  *n;
	Expr	   *expr;
	Expr	   *varexpr;
	Oid			exprrestype;
	Oid			ltype,
				rtype;
	HeapTuple	tup;
	ExprContext *econtext = wstate->ps.ps_ExprContext;
	bool		isNull;
	char	   *oprname;
	Form_pg_operator opr;
	int32		vartypmod = desc->attrs[attnum - 1]->atttypmod;
	cqContext  *pcqCtx;

	Insist(EDGE_IS_BOUND(edge));

	ltype = desc->attrs[attnum - 1]->atttypid;
	rtype = exprType(edge->val);
	varexpr = (Expr *) makeVar(0, attnum, ltype, vartypmod, 0);
	expr = (Expr *) edge->val;

	/*
	 * Set trailing or leading expression state. If the other edge's
	 * expression already exists, coerce this one to the same type.
	 */
	if (is_trail)
	{
		if (level_state->lead_expr != NULL)
		{
			expr = coerceType(expr, exprType((Node *) (level_state->lead_expr->expr)));
			if (expr == NULL)
				ereport(ERROR,
						(errcode(ERRCODE_DATA_EXCEPTION),
						 errmsg("can't coerce trailing frame bound to type of leading frame bound"),
						 errhint("specify the leading and trailing frame bounds as the same type"),
						 errOmitLocation(true)));
		}
		n = ExecInitExpr(expr, (PlanState *) wstate);
		level_state->trail_expr = n;
		if (!EDGE_IS_DELAYED(edge))
			level_state->trail_range = ExecEvalExpr(n, econtext, &isNull, NULL);
	}
	else
	{
		if (level_state->trail_expr != NULL)
		{
			expr = coerceType(expr, exprType((Node *) (level_state->trail_expr->expr)));
			if (expr == NULL)
				ereport(ERROR,
						(errcode(ERRCODE_DATA_EXCEPTION),
						 errmsg("can't coerce leading frame bound to type of trailing frame bound"),
						 errhint("specify the leading and trailing frame bounds as the same type"),
						 errOmitLocation(true)));
		}
		n = ExecInitExpr((Expr *) expr, (PlanState *) wstate);
		level_state->lead_expr = n;
		if (!EDGE_IS_DELAYED(edge))
			level_state->lead_range = ExecEvalExpr(n, econtext, &isNull, NULL);
	}

	if ((EDGE_IS_BOUND_PRECEDING(edge) &&
		 level_state->col_sort_asc[0]) ||
		(EDGE_IS_BOUND_FOLLOWING(edge) &&
		 !level_state->col_sort_asc[0]))
	{
		oprname = "-";
	}
	else
	{
		oprname = "+";
	}

	pcqCtx = caql_beginscan(
		NULL,
		cql("SELECT * FROM pg_operator "
			" WHERE oprname = :1 "
			" AND oprleft = :2 "
			" AND oprright = :3 "
			" AND oprnamespace = :4 ",
			CStringGetDatum(oprname),
			ObjectIdGetDatum(ltype),
			ObjectIdGetDatum(rtype),
			ObjectIdGetDatum(PG_CATALOG_NAMESPACE)));

	tup = caql_getnext(pcqCtx);

	/*
	 * If we didn't find an operator, it's probably because the user has
	 * specified a RANGE parameter which does not have an implicit cast
	 * to the sorting column. So, see what operator the parser found
	 * via oper() and arrange for the LHS to be cast to that for the purpose
	 * of the expression evaluation.
	 */
	if (!HeapTupleIsValid(tup))
	{
		HeapTuple	tup2;
		List	   *oprlist;

		/* The exact-match scan found nothing; close it before falling back. */
		caql_endscan(pcqCtx);

		oprlist = lappend(NIL, makeString(oprname));
		tup2 = oper(NULL, oprlist, ltype, rtype, true, -1);
		list_free_deep(oprlist);

		/* oper() was called with noError = true, so it may return NULL. */
		if (!HeapTupleIsValid(tup2))
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_FUNCTION),
					 errmsg("operator does not exist: %s %s %s",
							format_type_be(ltype), oprname,
							format_type_be(rtype)),
					 errhint("The RANGE frame bound must be usable with the ORDER BY column."),
					 errOmitLocation(true)));

		opr = (Form_pg_operator) GETSTRUCT(tup2);

		/* this is why we're here */
		Insist(ltype != opr->oprleft);

		varexpr = (Expr *) coerce_to_target_type(NULL, (Node *) varexpr,
												 ltype, opr->oprleft, vartypmod,
												 COERCION_EXPLICIT,
												 COERCE_IMPLICIT_CAST,
												 -1);
		exprrestype = opr->oprresult;
		expr = make_opclause(HeapTupleGetOid(tup2), exprrestype,
							 false, varexpr,
							 (Expr *) edge->val);
		((OpExpr *) expr)->opfuncid = opr->oprcode;
		ReleaseOperator(tup2);
	}
	else
	{
		opr = ((Form_pg_operator) GETSTRUCT(tup));
		exprrestype = opr->oprresult;
		expr = make_opclause(HeapTupleGetOid(tup), exprrestype,
							 false, varexpr,
							 (Expr *) edge->val);
		((OpExpr *) expr)->opfuncid = opr->oprcode;
		caql_endscan(pcqCtx);
	}

	/*
	 * If the frame edge operation returns a different type
	 * to the input type, we must coerce it back.
	 */
	if (ltype != exprrestype)
	{
		expr =
			(Expr *) coerce_to_target_type(NULL,
										   (Node *) expr,
										   exprrestype,
										   ltype, vartypmod,
										   COERCION_EXPLICIT,
										   COERCE_IMPLICIT_CAST,
										   -1);
	}

	Insist(PointerIsValid(expr));

	n = ExecInitExpr(expr, (PlanState *) wstate);
	if (is_trail)
		level_state->trail_range_expr = n;
	else
		level_state->lead_range_expr = n;

	/*
	 * Construct immediate clause like:
	 *	 var +/- range = var
	 * where var is ORDER BY value and range is frame range value,
	 * so "var +/- range" means frame edge created above (expr).
	 * Now, if the left hand equals to the right hand, the frame
	 * edge must be on the current row.
	 * The created expression is used to determine if the frame
	 * edge is on the current row or not.
	 */
	n = make_eq_exprstate(wstate, expr, varexpr);
	if (is_trail)
		level_state->trail_range_eq_expr = n;
	else
		level_state->lead_range_eq_expr = n;
}
/*
 * make_eq_exprstate
 *		Build and initialize the clause "expr1 = expr2".
 *
 * If the result types of expr1 and expr2 differ, an explicit cast to
 * expr1's type is installed on expr2 (keeping expr2's typmod). An error is
 * raised if no such cast exists, or (via equality_oper) if the type has no
 * equality operator.
 */
static ExprState *
make_eq_exprstate(WindowState *wstate, Expr *expr1, Expr *expr2)
{
	Oid			restype1 = exprType((Node *) expr1);
	Oid			restype2 = exprType((Node *) expr2);
	Expr	   *eq_expr;
	Operator	eq_optup;
	Oid			eq_opid;

	if (restype1 != restype2)
	{
		Expr	   *coerced;

		coerced = (Expr *) coerce_to_target_type(NULL,
												 (Node *) expr2,
												 restype2,
												 restype1,
												 exprTypmod((Node *) expr2),
												 COERCION_EXPLICIT,
												 COERCE_IMPLICIT_CAST,
												 -1);

		/*
		 * coerce_to_target_type() returns NULL when no coercion path
		 * exists; report that instead of dereferencing NULL below.
		 */
		if (coerced == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_CANNOT_COERCE),
					 errmsg("cannot cast type %s to %s",
							format_type_be(restype2),
							format_type_be(restype1)),
					 errOmitLocation(true)));
		expr2 = coerced;
	}

	eq_optup = equality_oper(restype1, false);
	Assert(exprType((Node *) expr1) == exprType((Node *) expr2));

	eq_opid = oprid(eq_optup);
	eq_expr = make_opclause(eq_opid, BOOLOID, false, expr1, expr2);
	((OpExpr *) eq_expr)->opfuncid = oprfuncid(eq_optup);
	ReleaseSysCache(eq_optup);

	return ExecInitExpr(eq_expr, (PlanState *) wstate);
}
/*
 * exec_eq_exprstate
 *		Evaluate an equality clause built by make_eq_exprstate() against
 *		the current row and return its boolean result; a NULL result is
 *		reported as false.
 *
 * The scan tuple is pointed at wstate->curslot so that Vars inside the
 * clause read the current row, and evaluation runs in the per-tuple
 * memory context.
 */
static bool
exec_eq_exprstate(WindowState *wstate, ExprState *eq_exprstate)
{
	ExprContext *ctx = wstate->ps.ps_ExprContext;
	MemoryContext saved;
	Datum		d;
	bool		is_null;

	Assert(IsA(eq_exprstate->expr, OpExpr));

	ctx->ecxt_scantuple = wstate->curslot;

	saved = MemoryContextSwitchTo(ctx->ecxt_per_tuple_memory);
	d = ExecEvalExpr(eq_exprstate, ctx, &is_null, NULL);
	MemoryContextSwitchTo(saved);

	if (is_null)
		return false;
	return DatumGetBool(d);
}
/*
 * setEmptyFrame -- set level_state->empty_frame when the framing clause
 * can never contain any tuples.
 *
 * Only frames whose two edges are both non-delayed bound PRECEDING, or both
 * non-delayed bound FOLLOWING, are examined:
 *   ROWS : empty when lead_rows < trail_rows (PRECEDING offsets are stored
 *          negated by init_frames, so one comparison covers both cases);
 *   RANGE: for PRECEDING, empty when trail_range < lead_range;
 *          for FOLLOWING, empty when trail_range > lead_range.
 * In every other case empty_frame is false.
 */
static void
setEmptyFrame(WindowStatePerLevel level_state,
			  WindowState *wstate)
{
	WindowFrame *frame = level_state->frame;

	level_state->empty_frame = false;

	/*
	 * init_frames() allows level_state->frame to be NULL (it skips frame
	 * setup in that case). There is nothing to check then, and the code
	 * below would dereference NULL.
	 */
	if (frame == NULL)
		return;

	Assert(!(frame->trail->kind == WINDOW_UNBOUND_FOLLOWING ||
			 frame->lead->kind == WINDOW_UNBOUND_PRECEDING ||
			 (EDGE_IS_BOUND_FOLLOWING(frame->trail) &&
			  EDGE_IS_BOUND_PRECEDING(frame->lead))));

	if (!EDGE_IS_DELAYED_BOUND(frame->trail) &&
		!EDGE_IS_DELAYED_BOUND(frame->lead) &&
		((EDGE_IS_BOUND_PRECEDING(frame->trail) &&
		  EDGE_IS_BOUND_PRECEDING(frame->lead)) ||
		 (EDGE_IS_BOUND_FOLLOWING(frame->trail) &&
		  EDGE_IS_BOUND_FOLLOWING(frame->lead))))
	{
		bool		is_pre =
			(EDGE_IS_BOUND_PRECEDING(frame->trail) &&
			 EDGE_IS_BOUND_PRECEDING(frame->lead));

		if (frame->is_rows)
		{
			if (level_state->lead_rows < level_state->trail_rows)
				level_state->empty_frame = true;
		}
		else
		{
			Oid			ineq_ordfuncid;
			Oid			eq_ordfuncid;
			Datum		ineq_datum;
			Datum		eq_datum;
			FmgrInfo	ineq_fcinfo;
			FmgrInfo	eq_fcinfo;
			Operator	ineq_optup;
			Operator	eq_optup;

			ineq_optup = ordering_oper(exprType((Node *) level_state->trail_expr->expr),
									   false);
			ineq_ordfuncid = oprfuncid(ineq_optup);
			ReleaseSysCache(ineq_optup);
			fmgr_info(ineq_ordfuncid, &ineq_fcinfo);

			eq_optup = equality_oper(exprType((Node *) level_state->trail_expr->expr),
									 false);
			eq_ordfuncid = oprfuncid(eq_optup);
			ReleaseSysCache(eq_optup);
			fmgr_info(eq_ordfuncid, &eq_fcinfo);

			/* ineq_datum: trail ordered before lead; eq_datum: trail equals lead */
			ineq_datum = FunctionCall2(&ineq_fcinfo, level_state->trail_range,
									   level_state->lead_range);
			eq_datum = FunctionCall2(&eq_fcinfo, level_state->trail_range,
									 level_state->lead_range);

			if (is_pre)
			{
				if (DatumGetBool(ineq_datum) && !DatumGetBool(eq_datum))
					level_state->empty_frame = true;
			}
			else
			{
				if (!DatumGetBool(ineq_datum) && !DatumGetBool(eq_datum))
					level_state->empty_frame = true;
			}
		}
	}
}
/*
* Initialise frame specific state.
*/
static void
init_frames(WindowState *wstate)
{
int level = -1;
ListCell *lc;
int col_no;
for (level = 0; level < wstate->numlevels; level++)
{
WindowStatePerLevel level_state = &wstate->level_state[level];
int ncols = level_state->numSortCols;
WindowFrame *frame = level_state->frame;
level_state->col_sort_asc = palloc(sizeof(bool) * ncols);
foreach (lc, level_state->level_funcs)
{
WindowStatePerFunction funcstate =
(WindowStatePerFunction)lfirst(lc);
if (!frame || (!funcstate->isAgg && !funcstate->allowframe &&
!IS_LEAD_LAG(funcstate->wrxstate->winkind) &&
!funcstate->winpeercount))
{
funcstate->trivial_frame = true;
}
/*
* We say the function is trivial to invoke if its frame is defined
* as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW with no
* exclusion clause OR it is a window function without a frame.
*/
if (funcstate->isAgg && frame && frame->is_rows &&
frame->trail->kind == WINDOW_UNBOUND_PRECEDING &&
frame->lead->kind == WINDOW_CURRENT_ROW &&
frame->exclude == WINDOW_EXCLUSION_NULL)
{
funcstate->trivial_frame = true;
}
else if (funcstate->isAgg && frame && !frame->is_rows &&
frame->trail->kind == WINDOW_UNBOUND_PRECEDING &&
frame->lead->kind == WINDOW_CURRENT_ROW &&
frame->exclude == WINDOW_EXCLUSION_NULL)
{
funcstate->cumul_frame = true;
}
}
/* what sort order did the user specify for each key? */
for (col_no = 0; col_no < ncols; col_no++)
{
Oid sortop = level_state->sortOperators[col_no];
char *oprname;
int fetchCount;
oprname = caql_getcstring_plus(
NULL,
&fetchCount,
NULL,
cql("SELECT oprname FROM pg_operator "
" WHERE oid = :1 ",
ObjectIdGetDatum(sortop)));
Insist(fetchCount);
if (strcmp(oprname, "<") == 0)
level_state->col_sort_asc[col_no] = true;
else
level_state->col_sort_asc[col_no] = false;
pfree(oprname);
}
/* now, initialize the actual frame */
if (frame)
{
if (frame->is_rows)
{
ExprContext *econtext = wstate->ps.ps_ExprContext;
if (EDGE_IS_BOUND(frame->trail) &&
level_state->frame->trail->val != NULL)
{
long int rows_param = 0;
bool isnull = true;
Expr *expr = (Expr *)level_state->frame->trail->val;
expr = coerceType(expr, INT4OID);
level_state->trail_expr =
ExecInitExpr((Expr *)expr,
(PlanState *) wstate);
if (!EDGE_IS_DELAYED_BOUND(frame->trail))
{
rows_param = ExecEvalExpr(level_state->trail_expr,
econtext,
&isnull,
NULL);
Insist(!isnull);
}
if (frame->trail->kind == WINDOW_BOUND_PRECEDING)
rows_param = -rows_param;
level_state->trail_rows = rows_param;
}
else
level_state->trail_rows = 0L;
if (EDGE_IS_BOUND(frame->lead) &&
level_state->frame->lead->val != NULL)
{
long int rows_param = 0;
bool isnull = true;
Expr *expr = (Expr *)level_state->frame->lead->val;
expr = coerceType(expr, INT4OID);
level_state->lead_expr =
ExecInitExpr((Expr *)expr,
(PlanState *) wstate);
if (!EDGE_IS_DELAYED_BOUND(frame->lead))
{
rows_param = ExecEvalExpr(level_state->lead_expr,
econtext,
&isnull,
NULL);
Insist(!isnull);
}
if (frame->lead->kind == WINDOW_BOUND_PRECEDING)
rows_param = -rows_param;
level_state->lead_rows = rows_param;
}
else
level_state->lead_rows = 0L;
}
else
{
TupleDesc desc = ExecGetResultType(wstate->ps.lefttree);
/* we only need the subtraction function for bound frames */
if (EDGE_IS_BOUND(frame->trail))
{
init_bound_frame_edge_expr(frame->trail, desc,
level_state->sortColIdx[0],
true, level_state, wstate);
}
if (EDGE_IS_BOUND(frame->lead))
{
init_bound_frame_edge_expr(frame->lead, desc,
level_state->sortColIdx[0],
false, level_state, wstate);
}
}
}
/* look for empty frames */
setEmptyFrame(level_state, wstate);
}
/*
* Set trivial_frames_only in each level state if its functions
* all have trivial frames.
*
* Also initialize the transition values for functions
* in this level.
*/
for (level = 0; level < wstate->numlevels; level++)
{
bool trivial_frames_only = true;
WindowStatePerLevel level_state = &wstate->level_state[level];
foreach(lc, level_state->level_funcs)
{
WindowStatePerFunction funcstate = (WindowStatePerFunction)
lfirst(lc);
if (!funcstate->trivial_frame)
{
trivial_frames_only = false;
break;
}
}
level_state->trivial_frames_only = trivial_frames_only;
resetTransValues(level_state, wstate);
}
/*
* Set has_delay_bound flag in each level state if the frame clause
* has a edge of DELAY_BOUND type.
*/
for (level = 0; level < wstate->numlevels; level++)
{
WindowStatePerLevel level_state = &wstate->level_state[level];
level_state->has_delay_bound =
(EDGE_IS_DELAYED(level_state->frame->lead) ||
EDGE_IS_DELAYED(level_state->frame->trail));
}
/*
* Set has_only_trans_funcs in each level state if all functions
* in this level are aggregate functions that have no preliminary
* functions or inverse preliminary functions.
*/
for (level = 0; level < wstate->numlevels; level++)
{
WindowStatePerLevel level_state = &wstate->level_state[level];
bool has_only_trans_funcs = false;
int funcno = 0;
if (level_state->is_rows)
continue;
foreach(lc, level_state->level_funcs)
{
WindowStatePerFunction funcstate =
(WindowStatePerFunction) lfirst(lc);
if (HAS_ONLY_TRANS_FUNC(funcstate))
{
Assert(funcno == 0 || has_only_trans_funcs);
has_only_trans_funcs = true;
/*
* XXX disable the general case for now. Some work
* needs to be done when adjusting edges after
* appending values into the frame buffer.
*/
ereport(ERROR,
(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
errmsg("aggregate functions with no prelimfn or "
"invprelimfn are not yet supported as window functions"),
errOmitLocation(true)));
}
funcno++;
}
level_state->has_only_trans_funcs = has_only_trans_funcs;
}
/*
* Allocate one FrameBufferEntry buffer for each level state, so that
* we don't need to do pallocs/pfrees every time we read an entry
* from the frame buffer.
*/
for (level = 0; level < wstate->numlevels; level++)
{
WindowStatePerLevel level_state = &wstate->level_state[level];
level_state->curr_entry_buf = createFrameBufferEntry(level_state);
level_state->trail_entry_buf = createFrameBufferEntry(level_state);
level_state->lead_entry_buf = createFrameBufferEntry(level_state);
}
}
/*
 * get_ltfuncs -- return a palloc'd array of numCols FmgrInfo entries, one
 * per column of tupdesc named by the 1-based attribute numbers in
 * matchColIdx. Each entry holds the function of that column type's
 * ordering operator, as found by ordering_oper().
 */
static FmgrInfo *
get_ltfuncs(TupleDesc tupdesc, int numCols, AttrNumber *matchColIdx)
{
	FmgrInfo   *result = (FmgrInfo *) palloc(numCols * sizeof(FmgrInfo));
	int			col;

	for (col = 0; col < numCols; col++)
	{
		Oid			coltype = tupdesc->attrs[matchColIdx[col] - 1]->atttypid;
		Operator	lt_tup = ordering_oper(coltype, false);

		fmgr_info(oprfuncid(lt_tup), &result[col]);
		ReleaseSysCache(lt_tup);
	}

	return result;
}
/* -----------------
 * ExecInitWindow
 *
 * Creates the run-time information for the window node produced by the
 * planner and initializes its outer subtree.
 *
 * Returns the new WindowState. The node has no inner subtree, and
 * EXEC_FLAG_BACKWARD / EXEC_FLAG_MARK are not supported (both asserted).
 * -----------------
 */
WindowState *
ExecInitWindow(Window *node, EState *estate, int eflags)
{
	WindowState *wstate;
	Plan	   *outerPlan;
	ExprContext *econtext;
	int			numrefs;
	TupleDesc	desc;

	/* Check for unsupported flags. */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	/* Create state structure. */
	wstate = makeWindowState(node, estate);

	/* Create expression context. */
	ExecAssignExprContext(estate, &wstate->ps);

	/*
	 * Number of tuple slots this node creates: the result slot plus the
	 * curslot, priorslot and spare slots allocated just below. The value is
	 * reported by ExecCountSlotsWindow(), so keep the two in sync.
	 */
#define WINDOW_NSLOTS 4

	/* Initialize tuple table. */
	ExecInitResultTupleSlot(estate, &wstate->ps);
	wstate->curslot = ExecInitExtraTupleSlot(estate);
	wstate->priorslot = ExecInitExtraTupleSlot(estate);
	wstate->spare = ExecInitExtraTupleSlot(estate);

	/* Initialize child expressions.
	 *
	 * ExecInitExpr adds WindowRefExprState nodes it encounters to the
	 * wrxstates list.
	 */
	wstate->ps.targetlist = (List *)
		ExecInitExpr((Expr *) node->plan.targetlist,
					 (PlanState *) wstate);

	/* One result slot per WindowRef registered by the call above. */
	numrefs = list_length(wstate->wrxstates);

	/* Initialize child nodes. */
	outerPlan = outerPlan(node);
	outerPlanState(wstate) = ExecInitNode(outerPlan, estate, eflags);
	Assert(innerPlan(node) == NULL);

	/* Initialize result tuple type and projection info. */
	ExecAssignResultTypeFromTL(&wstate->ps);
	ExecAssignProjectionInfo(&wstate->ps, NULL);

	/* Tuple descriptor of the rows produced by the child plan. */
	desc = ExecGetResultType(wstate->ps.lefttree);

	/* Precompute fmgr lookup data for partition key equality function. */
	if (node->numPartCols > 0)
	{
		wstate->eqfunctions =
			execTuplesMatchPrepare(desc,
								   node->numPartCols,
								   node->partColIdx);
	}
	else
		wstate->eqfunctions = NULL;

	initWindowStatePerLevel(wstate, node);

	/* Allocate result storage and working storage per window ref.
	 * (Later we may shrink this if we notice duplicate function
	 * calls, but allocate for worst case.) Note that we borrow
	 * aggregate result storage, since there are no non-window
	 * aggregates that might use it.
	 */
	econtext = wstate->ps.ps_ExprContext;
	econtext->ecxt_aggvalues = (Datum*) palloc0(sizeof(Datum) * numrefs);
	econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numrefs);

	wstate->need_peercount = false;
	initWindowFuncState(wstate, node);

	/* Memory manager callbacks (cxt_alloc/cxt_free) bound to transcontext. */
	wstate->mem_manager.alloc = cxt_alloc;
	wstate->mem_manager.free = cxt_free;
	wstate->mem_manager.manager = wstate->transcontext;

	/* Frame initialisation can take place now */
	init_frames(wstate);

	initGpmonPktForWindow((Plan *)node, &wstate->ps.gpmon_pkt, estate);

	return wstate;
}
/*
 * ExecCountSlotsWindow -- tuple table slots needed by this node (see
 * WINDOW_NSLOTS in ExecInitWindow) plus those of its subplans.
 */
int
ExecCountSlotsWindow(Window *node)
{
	int			nslots = WINDOW_NSLOTS;

	nslots += ExecCountSlotsNode(outerPlan(node));
	nslots += ExecCountSlotsNode(innerPlan(node));

	return nslots;
}
/*
 * ExecEndWindow -- release everything held by a window node and shut down
 * its subplan.
 *
 * Frees the per-level frame buffer entry scratch buffers, eagerly frees
 * the node's working state, clears its tuple slots, deletes its private
 * memory contexts, frees its per-node arrays, and ends the outer subplan.
 */
void
ExecEndWindow(WindowState *node)
{
	int			level;

	for (level = 0; level < node->numlevels; level++)
	{
		WindowStatePerLevel level_state = &node->level_state[level];

		freeFrameBufferEntry(level_state->curr_entry_buf);
		level_state->curr_entry_buf = NULL;
		freeFrameBufferEntry(level_state->trail_entry_buf);
		level_state->trail_entry_buf = NULL;
		freeFrameBufferEntry(level_state->lead_entry_buf);
		level_state->lead_entry_buf = NULL;
	}

	ExecEagerFreeWindow(node);

	/* Free the exprcontext */
	ExecFreeExprContext(&node->ps);

	/* clean out the tuple table */
	ExecClearTuple(node->curslot);
	ExecClearTuple(node->spare);
	ExecClearTuple(node->priorslot);
	ExecClearTuple(node->ps.ps_ResultTupleSlot);

	if (node->transcontext != NULL)
		MemoryContextDelete(node->transcontext);
	if (node->cmpcontext != NULL)
		MemoryContextDelete(node->cmpcontext);

	/*
	 * pfree() does not accept NULL; guard this array the same way as
	 * level_state and func_state below.
	 */
	if (node->serial_array != NULL)
		pfree(node->serial_array);
	if (node->numlevels > 0)
		pfree(node->level_state);
	if (node->func_state != NULL)
		pfree(node->func_state);

	/* shut down subplans */
	ExecEndNode(outerPlanState(node));

	EndPlanStateGpmonPkt(&node->ps);
}
/*
 * ExecReScanWindow
 *
 * Called when a higher-level node asks the window node to rescan: the
 * frame buffers are reset, working state is eagerly freed, and the outer
 * subplan is rescanned. exprCtxt may be NULL; it is passed through
 * unchanged to ExecReScan().
 */
void
ExecReScanWindow(WindowState *node, ExprContext *exprCtxt)
{
	PlanState  *child;

	Assert(node);

	resetFrameBuffers(node);
	ExecEagerFreeWindow(node);

	child = outerPlanState(node);
	Assert(child);
	ExecReScan(child, exprCtxt);
}
/*
* window_dummy - dummy execution routine for window functions
*
* This function is listed as the implementation (prosrc field) of pg_proc
* entries for window functions. Its only purpose is to throw an error
* if someone mistakenly executes such a function in the normal way.
*/
Datum
window_dummy(PG_FUNCTION_ARGS)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("function %s may only be called as a window function",
format_procedure(fcinfo->flinfo->fn_oid)),
errhint("To call a function as a window function use an OVER clause."),
errOmitLocation(true))
);
return (Datum) 0; /* keep compiler quiet */
}
/*
* Implements the gp_execution_segment() function to return the contentid
* of the current executing segment.
*/
Datum
mpp_execution_segment(PG_FUNCTION_ARGS)
{
PG_RETURN_INT32(GetQEIndex());
}
/*
* Implements the gp_execution_dbid() function to return the dbid of the
* current executing segment.
*/
Datum
gp_execution_dbid(PG_FUNCTION_ARGS)
{
/* DON'T WANNA SUPPORT THIS FUNCTION */
PG_RETURN_INT32(0);
}
/*
 * ROW_NUMBER() OVER (...) --> BIGINT
 *
 *   row_number_immed(internal) --> bigint
 *
 * The window state's row_index counts from zero; ROW_NUMBER() counts from
 * one.
 */
Datum
row_number_immed(PG_FUNCTION_ARGS)
{
	WindowRefExprState *wrx = (WindowRefExprState *) PG_GETARG_POINTER(0);
	int64		rownum = 1 + wrx->windowstate->row_index;

	PG_RETURN_INT64(rownum);
}
/*
 * RANK() OVER (... ORDER BY s) --> BIGINT
 *
 *   rank_immed(internal) --> bigint
 *
 * Returns the current rank kept in the WindowRef's key level. The value
 * is only read here; advancing it between rows is the framework's job.
 */
Datum
rank_immed(PG_FUNCTION_ARGS)
{
	WindowRefExprState *wrx = (WindowRefExprState *) PG_GETARG_POINTER(0);
	WindowRef  *wref = (WindowRef *) wrx->xprstate.expr;
	WindowStatePerLevel lvl = &wrx->windowstate->level_state[wref->winlevel];
	int64		rank = lvl->rank;

	PG_RETURN_INT64(rank);
}
/*
 * DENSE_RANK() OVER (... ORDER BY s) --> BIGINT
 *
 *   dense_rank_immed(internal) --> bigint
 *
 * Returns the current dense rank kept in the WindowRef's key level. The
 * value is only read here; advancing it between rows is the framework's
 * job.
 */
Datum
dense_rank_immed(PG_FUNCTION_ARGS)
{
	WindowRefExprState *wrx = (WindowRefExprState *) PG_GETARG_POINTER(0);
	WindowRef  *wref = (WindowRef *) wrx->xprstate.expr;
	WindowStatePerLevel lvl = &wrx->windowstate->level_state[wref->winlevel];
	int64		dense = lvl->dense_rank;

	PG_RETURN_INT64(dense);
}
/*
 * PERCENT_RANK() OVER (... ORDER BY s) --> FLOAT8
 *
 *   rank_immed(internal) --> bigint
 *   percent_rank_final(bigint,bigint) --> float8
 *
 * Final function: (rank - 1) / (partition row count - 1).
 * Arguments below 1 are rejected as an inappropriate call. A rank of 1 in
 * a one-row partition yields 0.0. A quotient above DBL_MAX (for example
 * rank > 1 in a one-row partition) raises an overflow error.
 */
Datum
percent_rank_final(PG_FUNCTION_ARGS)
{
	int64		rank = PG_GETARG_INT64(0);		/* rank in partition */
	int64		nrows = PG_GETARG_INT64(1);		/* partition row count */
	double		pct;

	if (rank < 1 || nrows < 1)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_EXCEPTION),
				 errmsg("arguments invalid or inconsistent"),
				 errhint("inappropriate call to window function implementation"),
				 errOmitLocation(true)));

	if (rank == 1 && nrows == 1)
		PG_RETURN_FLOAT8(0.0);

	/* Do division in double, then check for overflow */
	pct = (double) (rank - 1) / (double) (nrows - 1);
	if (pct > DBL_MAX)
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("value out of range: overflow"),
				 errOmitLocation(true)));

	PG_RETURN_FLOAT8(pct);
}
/*
 * CUME_DIST() OVER (... ORDER BY s) --> FLOAT8
 *
 *   cume_dist_prelim(internal) --> bigint
 *   cume_dist_final(bigint,bigint) --> float8
 *
 * Preliminary function: the number of rows ordered before the current
 * peer group (prior_non_peer_count) plus the level's peer_count plus one.
 */
Datum
cume_dist_prelim(PG_FUNCTION_ARGS)
{
	WindowRefExprState *wrx = (WindowRefExprState *) PG_GETARG_POINTER(0);
	WindowRef  *wref = (WindowRef *) wrx->xprstate.expr;
	WindowStatePerLevel lvl = &wrx->windowstate->level_state[wref->winlevel];
	int64		upto;

	upto = lvl->prior_non_peer_count + (lvl->peer_count + 1);

	PG_RETURN_INT64(upto);
}
Datum
cume_dist_final(PG_FUNCTION_ARGS)
{
int64 arg1 = PG_GETARG_INT64(0); /* prior_non_peer_count + peer_count */
int64 arg2 = PG_GETARG_INT64(1); /* partition row count */
double result;
if (arg1 < 1 || arg2 < 1)
{
ereport(ERROR,
(errcode(ERRCODE_DATA_EXCEPTION),
errmsg("arguments invalid or inconsistent"),
errhint("inappropriate call to window function implementation")));
result = 0.0; /* quieten GCC */
}
else if ( arg1 == 1 && arg2 == 1 )
{
result = 1.0; /* or is it 0.0? */
}
else
{
/* Do division in double, then check for overflow */
result = (double) arg1 / (double) arg2;
if ( result > DBL_MAX )
{
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
errmsg("value out of range: overflow")));
}
}
PG_RETURN_FLOAT8((float8)result);
}
/*
 * NTILE(BIGINT) OVER (... ORDER BY s) --> BIGINT
 *
 * Implement NTILE for the given WindowRefExprState.
 *
 * ntile_prelim_int(internal,int) --> bigint[]
 * ntile_prelim_bigint(internal,bigint) --> bigint[]
 * ntile_prelim_numeric(internal,numeric) --> bigint[]
 * ntile_final(bigint[],bigint) --> bigint
 *
 * Each prelim function packs {num_tiles, row_index} into a two-element
 * bigint array; ntile_final combines that with the partition row count.
 */
/*
 * Helper defined in src/backend/utils/adt/numeric.c. Used below by
 * ntile_prelim_numeric() to truncate its numeric argument to int8.
 */
extern int64 numeric_to_pos_int8_trunc(Numeric num);
/*
 * do_ntile_prelim -- shared body of the ntile_prelim_* functions.
 *
 * Rejects a non-positive tile count, then returns a two-element bigint
 * array holding {num_tiles, the window state's current row_index}.
 */
static ArrayType *
do_ntile_prelim(WindowRefExprState *ref_state, int64 num_tiles)
{
	Datum		elems[2];

	if (num_tiles <= 0)
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("argument value out of range"),
				 errhint("NTILE expects a positive integer argument.")));

	elems[0] = Int64GetDatumFast(num_tiles);						/* ntile argument */
	elems[1] = Int64GetDatumFast(ref_state->windowstate->row_index);	/* row index */

	return construct_array(elems, 2, INT8OID, 8, true, 'd');
}
Datum
ntile_prelim_int(PG_FUNCTION_ARGS)
{
WindowRefExprState *rstate = (WindowRefExprState *) PG_GETARG_POINTER(0);
int64 ntile_arg = (int64)PG_GETARG_INT32(1);
PG_RETURN_ARRAYTYPE_P(do_ntile_prelim(rstate, ntile_arg));
}
/* ntile_prelim_bigint(internal,bigint) --> bigint[] */
Datum
ntile_prelim_bigint(PG_FUNCTION_ARGS)
{
	WindowRefExprState *wrx = (WindowRefExprState *) PG_GETARG_POINTER(0);
	int64		tiles = PG_GETARG_INT64(1);

	PG_RETURN_ARRAYTYPE_P(do_ntile_prelim(wrx, tiles));
}
/*
 * ntile_prelim_numeric(internal,numeric) --> bigint[]
 *
 * The numeric tile count is truncated to int8 first; anything below 1 is
 * mapped to 0 so that do_ntile_prelim() reports the out-of-range error.
 */
Datum
ntile_prelim_numeric(PG_FUNCTION_ARGS)
{
	WindowRefExprState *wrx = (WindowRefExprState *) PG_GETARG_POINTER(0);
	Numeric		arg = PG_GETARG_NUMERIC(1);
	int64		tiles = numeric_to_pos_int8_trunc(arg);

	tiles = (tiles < 1) ? 0 : tiles;

	PG_RETURN_ARRAYTYPE_P(do_ntile_prelim(wrx, tiles));
}
/*
* To implement the semantics of NTILE correctly, tuples in the same bucket
* must be clustered together according to the input ordering --
* i.e., 1, 1, 1, 2, 2, 2, 3, 3, 4, 4.
*
* It's tempting to just do ((N - 1) mod M) + 1 but this will not meet
* our requirement. Alternatively, we could just divide the partition
* row count into N buckets and see if the row index fit into an individual
* bucket. That would give us clustered results but it wouldn't handle the
* situation where the number of rows in the partition did not divide evenly
* into the number of buckets (notice that in the above example, these
* larger-by-one buckets are the leading buckets).
*
* So, instead we identify the threshold (prefix_size) to which buckets will
* have one more row and we divide the rows between them (row_index/max_size).
* After the threshold, the next bucket will be number 1 + spares and
* we divide the values evenly between each bucket.
*
* Note that row_index counts from 0.
*/
Datum
ntile_final(PG_FUNCTION_ARGS)
{
int64 result;
Datum *work;
int len;
int64 row_index, num_tiles, min_size, max_size, prefix_size, spares;
ArrayType *pair = PG_GETARG_ARRAYTYPE_P(0); /* row_index, num_tiles */
int64 partition_row_count = PG_GETARG_INT64(1); /* partition row count */
/* we expect the input to be bigint[2] */
deconstruct_array(pair, INT8OID, 8, true, 'd', &work, NULL, &len);
if (len != 2)
elog(ERROR, "expected 2-element int8 array");
num_tiles = DatumGetInt64(work[0]);
row_index = DatumGetInt64(work[1]);
min_size = partition_row_count / num_tiles;
spares = partition_row_count % num_tiles;
max_size = min_size + 1;
prefix_size = spares * max_size;
if (row_index < prefix_size)
result = 1 + row_index / max_size;
else
result = 1 + spares +
(row_index - prefix_size) / min_size;
PG_RETURN_INT64(result);
}
/*
 * lead_lag_internal -- shared implementation of LEAD() and LAG().
 *
 * Argument 0 is the WindowRefExprState. Argument 2, when present, is the
 * bigint offset: it must be non-NULL and non-negative (default 1). Here
 * the offset is only validated; the row returned is determined by the
 * level's lead reader and frame state.
 *
 * The value is taken, in order of preference, from:
 *   1. the frame buffer entry at the lead reader (when the trailing edge is
 *      not bound FOLLOWING, or num_lead_rows has reached trail_rows);
 *   2. the function's aggTransValue, when it is filled and the lead edge
 *      is on the current row, or is bound FOLLOWING with num_lead_rows
 *      reaching trail_rows;
 *   3. when lead_rows < 0 or num_lead_rows < lead_rows: the 4th argument
 *      (default value) if given, else NULL;
 *   4. otherwise NULL.
 *
 * The result is cached in funcstate->win_value / win_value_is_null and
 * returned; *isnull receives the null flag.
 */
static Datum
lead_lag_internal(PG_FUNCTION_ARGS, bool is_lead, bool *isnull)
{
	int64		offset;
	WindowRefExprState *wrxstate =
		(WindowRefExprState *) PG_GETARG_POINTER(0);
	WindowState *wstate = wrxstate->windowstate;
	ExprContext *econtext = wstate->ps.ps_ExprContext;
	WindowStatePerFunction funcstate = &wstate->func_state[wrxstate->funcno];
	WindowStatePerLevel level_state = funcstate->wlevel;
	FrameBufferEntry *lead_entry = level_state->lead_entry_buf;
	bool		has_lead_entry = false;

	Insist(PG_NARGS() >= 2);

	if (PG_NARGS() > 2)
	{
		if (PG_ARGISNULL(2))
		{
			if (is_lead)
				elog(ERROR, "LEAD offset cannot be NULL");
			else
				elog(ERROR, "LAG offset cannot be NULL");
		}
		offset = PG_GETARG_INT64(2);
	}
	else
		offset = 1;				/* default */

	if (offset < 0)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("%s offset cannot be negative",
						is_lead ? "LEAD" : "LAG"),
				 errOmitLocation(true)));

	Assert(level_state->is_rows);

	if (!EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail) ||
		level_state->num_lead_rows >= level_state->trail_rows)
		has_lead_entry = getCurrentValue(level_state->lead_reader,
										 level_state, lead_entry);

	if (has_lead_entry)
	{
		WindowValue *lead_value;

		lead_value = (WindowValue *) list_nth(lead_entry->func_values,
											 funcstate->serial_index);
		funcstate->win_value = lead_value->value;
		funcstate->win_value_is_null = lead_value->valueIsNull;
	}
	/* The value could be stored in aggTransValue. */
	else if (level_state->agg_filled &&
			 (EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead, true) ||
			  (EDGE_IS_BOUND_FOLLOWING(level_state->frame->lead) &&
			   level_state->num_lead_rows >= level_state->trail_rows)))
	{
		funcstate->win_value = funcstate->aggTransValue;
		funcstate->win_value_is_null = funcstate->aggTransValueIsNull;
	}
	else
	{
		/*
		 * Use distinct names here: the original code declared a local
		 * "fcinfo" that shadowed the function-call argument, and an inner
		 * "wrxstate" that shadowed the outer one.
		 */
		WindowRefExprState *func_wrx = funcstate->wrxstate;
		int			nargs = list_length(func_wrx->args);

		Assert(nargs <= 4);

		if ((level_state->lead_rows < 0 ||
			 level_state->num_lead_rows < level_state->lead_rows) &&
			nargs == 4)
		{
			/* Use the default value (4th argument). */
			ExprState  *defstate = (ExprState *) list_nth(func_wrx->args, 3);
			bool		def_isnull = false;

			funcstate->win_value = ExecEvalExpr(defstate, econtext,
												&def_isnull, NULL);
			funcstate->win_value_is_null = def_isnull;
		}
		else
		{
			funcstate->win_value = 0;
			funcstate->win_value_is_null = true;
		}
	}

	*isnull = funcstate->win_value_is_null;
	return funcstate->win_value;
}
/*
 * lead_generic -- SQL-callable LEAD(col, offset, value_expr); all work is
 * delegated to lead_lag_internal() with is_lead = true.
 */
Datum
lead_generic(PG_FUNCTION_ARGS)
{
	bool		isnull;
	Datum		value = lead_lag_internal(fcinfo, true, &isnull);

	if (!isnull)
		PG_RETURN_DATUM(value);

	PG_RETURN_NULL();
}
/*
 * lag_generic -- SQL-callable LAG(col, offset, value_expr); all work is
 * delegated to lead_lag_internal() with is_lead = false.
 */
Datum
lag_generic(PG_FUNCTION_ARGS)
{
	bool		isnull;
	Datum		value = lead_lag_internal(fcinfo, false, &isnull);

	if (!isnull)
		PG_RETURN_DATUM(value);

	PG_RETURN_NULL();
}
/*
 * last_value_internal -- compute LAST_VALUE() for the current row.
 *
 * When hasTuplesInFrame() reports tuples in the frame, the result comes
 * from, in order of preference:
 *   1. the frame buffer entry at the level's lead reader, if
 *      getCurrentValue() finds one (for ROWS frames only attempted when the
 *      trailing edge is not bound FOLLOWING, or num_lead_rows has reached
 *      trail_rows);
 *   2. the function's aggTransValue, if the transition value is filled and
 *      include_last_agg was set (trailing edge on the current row, or lead
 *      edge unbound, bound FOLLOWING, or on the current row).
 * Otherwise the result is NULL.
 *
 * The chosen value is cached in funcstate->win_value / win_value_is_null
 * and returned; *isnull receives the null flag.
 *
 * If the transition value is not filled and ntuplestore_acc_tell() fails
 * for the lead reader, the reader is moved to the last entry for the
 * lookup and then reset with ntuplestore_acc_set_invalid() before
 * returning.
 */
static Datum
last_value_internal(WindowRefExprState *wrxstate, bool *isnull)
{
	WindowState *wstate = wrxstate->windowstate;
	WindowStatePerFunction funcstate = &wstate->func_state[wrxstate->funcno];
	WindowStatePerLevel level_state = funcstate->wlevel;
	FrameBufferEntry *lead_entry = level_state->lead_entry_buf;
	bool		has_lead_entry = false;
	WindowValue *lead_value = NULL;
	bool		lead_valid = true;
	bool		has_tuples = true;
	bool		include_last_agg = false;

	has_tuples = hasTuplesInFrame(level_state, wstate);

	if (has_tuples)
	{
		/* Decide whether the transition value may stand in for the last row. */
		if (EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->trail, false))
			include_last_agg = true;

		if (!EDGE_IS_BOUND(level_state->frame->lead) ||
			EDGE_IS_BOUND_FOLLOWING(level_state->frame->lead) ||
			EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead, true))
			include_last_agg = true;

		/*
		 * No transition value and no reader position: temporarily point the
		 * lead reader at the last entry; it is reset before returning.
		 */
		if (!level_state->agg_filled &&
			!ntuplestore_acc_tell(level_state->lead_reader, NULL))
		{
			lead_valid = false;
			ntuplestore_acc_seek_last(level_state->lead_reader);
		}

		if (level_state->is_rows)
		{
			if (!EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail) ||
				level_state->num_lead_rows >= level_state->trail_rows)
				has_lead_entry = getCurrentValue(level_state->lead_reader,
												 level_state, lead_entry);
		}
		else
		{
			has_lead_entry = getCurrentValue(level_state->lead_reader,
											 level_state, lead_entry);
		}

		if (has_lead_entry)
			lead_value = (WindowValue *)list_nth(lead_entry->func_values,
												 funcstate->serial_index);
	}

	if (lead_value != NULL)
	{
		funcstate->win_value = lead_value->value;
		funcstate->win_value_is_null = lead_value->valueIsNull;
	}
	else
	{
		if (has_tuples && include_last_agg && level_state->agg_filled)
		{
			funcstate->win_value = funcstate->aggTransValue;
			funcstate->win_value_is_null = funcstate->aggTransValueIsNull;
		}
		else
		{
			funcstate->win_value = 0;
			funcstate->win_value_is_null = true;
		}
	}

	*isnull = funcstate->win_value_is_null;

	/* Undo the temporary seek performed above. */
	if (!lead_valid)
		ntuplestore_acc_set_invalid(level_state->lead_reader);

	return funcstate->win_value;
}
Datum
last_value_generic(PG_FUNCTION_ARGS)
{
Datum d;
bool isnull = false;
d = last_value_internal((WindowRefExprState *)PG_GETARG_POINTER(0),
&isnull);
if (isnull)
PG_RETURN_NULL();
return d;
}
/*
 * first_value_internal
 *		Compute FIRST_VALUE() for the window function described by wrxstate.
 *
 * If hasTuplesInFrame() reports an empty frame, returns NULL immediately.
 * Otherwise the trail_reader is repositioned onto the first row of the
 * frame: advanced by one if it had a position, or (if it had none) moved to
 * the first entry when the trailing edge is not a bound FOLLOWING edge or
 * equals the current row.  The result is this function's entry in
 * func_values at that position; if no entry is read, aggTransValue is used
 * when agg_filled is set, and NULL otherwise.
 *
 * The trail_reader is restored to the position saved on entry (or marked
 * invalid if that seek fails) before returning.  The result is cached in
 * funcstate->win_value / win_value_is_null and its null flag is stored in
 * *isnull.
 */
static Datum
first_value_internal(WindowRefExprState *wrxstate, bool *isnull)
{
WindowState *wstate = wrxstate->windowstate;
WindowStatePerFunction funcstate = &wstate->func_state[wrxstate->funcno];
WindowStatePerLevel level_state = funcstate->wlevel;
FrameBufferEntry *trail_entry = level_state->trail_entry_buf;
bool has_trail_entry = false;
WindowValue *trail_value = NULL;
NTupleStorePos orig_pos;
bool trail_valid;
bool has_tuples = true;
has_tuples = hasTuplesInFrame(level_state, wstate);
/* Empty frame: FIRST_VALUE is NULL, and the reader is left untouched. */
if (!has_tuples)
{
funcstate->win_value = 0;
funcstate->win_value_is_null = true;
*isnull = true;
return funcstate->win_value;
}
/* Save the position for the trail_reader */
trail_valid = ntuplestore_acc_tell(level_state->trail_reader, &orig_pos);
/* Since the trail_reader points to the value that is right before
* the trailing edge, we advance the trail_reader by one.
*/
if (trail_valid)
ntuplestore_acc_advance(level_state->trail_reader, 1);
else
{
/* No saved position: start from the first buffered entry when the
 * trailing edge is not FOLLOWING or coincides with the current row.
 */
if (!EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail) ||
EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->trail, false))
ntuplestore_acc_seek_first(level_state->trail_reader);
}
if (level_state->is_rows)
{
/*
 * ROWS frame whose trailing edge is a bound FOLLOWING edge: only read
 * the buffer once num_lead_rows has reached trail_rows.
 */
if (!EDGE_IS_BOUND_FOLLOWING(level_state->frame->trail) ||
level_state->num_lead_rows >= level_state->trail_rows)
has_trail_entry = getCurrentValue(level_state->trail_reader,
level_state, trail_entry);
}
else
{
has_trail_entry = getCurrentValue(level_state->trail_reader,
level_state, trail_entry);
}
/* Pick this function's value out of the buffered entry. */
if (has_trail_entry)
trail_value = (WindowValue *)list_nth(trail_entry->func_values,
funcstate->serial_index);
if (trail_value != NULL)
{
funcstate->win_value = trail_value->value;
funcstate->win_value_is_null = trail_value->valueIsNull;
}
else
{
bool include_last_agg = false;
if (EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->trail, false))
include_last_agg = true;
if (EDGE_EQ_CURRENT_ROW(level_state, wstate, level_state->frame->lead, true))
include_last_agg = true;
/*
 * NOTE(review): has_tuples is always true at this point (the empty-frame
 * case returned early above), so this condition reduces to agg_filled and
 * include_last_agg / trail_valid have no effect.  last_value_internal
 * combines the analogous terms with &&; confirm whether || is intended.
 */
if ((has_tuples || include_last_agg || trail_valid) &&
level_state->agg_filled)
{
funcstate->win_value = funcstate->aggTransValue;
funcstate->win_value_is_null = funcstate->aggTransValueIsNull;
}
else
{
funcstate->win_value = 0;
funcstate->win_value_is_null = true;
}
}
/*
 * Reset the trail_reader to its original position.
 * NOTE(review): orig_pos is passed to the seek even when trail_valid was
 * false; this relies on ntuplestore_acc_tell() leaving orig_pos in a state
 * that ntuplestore_acc_seek() rejects -- confirm in tuplestorenew.c.
 */
if (!ntuplestore_acc_seek(level_state->trail_reader, &orig_pos))
ntuplestore_acc_set_invalid(level_state->trail_reader);
*isnull = funcstate->win_value_is_null;
return funcstate->win_value;
}
Datum
first_value_generic(PG_FUNCTION_ARGS)
{
Datum d;
bool isnull = false;
d = first_value_internal((WindowRefExprState *)PG_GETARG_POINTER(0),
&isnull);
if (isnull)
PG_RETURN_NULL();
return d;
}
/*
 * initGpmonPktForWindow
 *		Initialize the gpmon packet for a Window plan node.
 *
 * The packet is tagged PMNT_Window and seeded with the planner's row
 * estimate (planNode->plan_rows cast to int64).
 */
void
initGpmonPktForWindow(Plan *planNode, gpmon_packet_t *gpmon_pkt, EState *estate)
{
	int64		estimated_rows;

	Assert(planNode != NULL && gpmon_pkt != NULL && IsA(planNode, Window));
	Assert(GPMON_WINDOW_TOTAL <= (int)GPMON_QEXEC_M_COUNT);

	estimated_rows = (int64) planNode->plan_rows;
	InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate, PMNT_Window,
						 estimated_rows, NULL);
}
/*
 * ExecEagerFreeWindow
 *		Release the Window node's buffers.
 *
 * The input buffer is freed only when it is non-NULL; the frame buffers are
 * always handed to freeFrameBuffers().
 */
void
ExecEagerFreeWindow(WindowState *node)
{
	if (node->input_buffer)
		freeInputBuffer(node);

	freeFrameBuffers(node);
}