/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*-------------------------------------------------------------------------
*
* tuplesort.c
* Generalized tuple sorting routines.
*
* This module handles sorting of heap tuples, index tuples, or single
* Datums (and could easily support other kinds of sortable objects,
* if necessary). It works efficiently for both small and large amounts
* of data. Small amounts are sorted in-memory using qsort(). Large
* amounts are sorted using temporary files and a standard external sort
* algorithm.
*
* See Knuth, volume 3, for more than you want to know about the external
* sorting algorithm. We divide the input into sorted runs using replacement
* selection, in the form of a priority tree implemented as a heap
* (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
* merge, Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D
* are implemented by logtape.c, which avoids space wastage by recycling
* disk space as soon as each block is read from its "tape".
*
* We do not form the initial runs using Knuth's recommended replacement
* selection data structure (Algorithm 5.4.1R), because it uses a fixed
* number of records in memory at all times. Since we are dealing with
* tuples that may vary considerably in size, we want to be able to vary
* the number of records kept in memory to ensure full utilization of the
* allowed sort memory space. So, we keep the tuples in a variable-size
* heap, with the next record to go out at the top of the heap. Like
* Algorithm 5.4.1R, each record is stored with the run number that it
* must go into, and we use (run number, key) as the ordering key for the
* heap. When the run number at the top of the heap changes, we know that
* no more records of the prior run are left in the heap.
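 *
 * As a sketch (using the SortTuple fields and COMPARETUP macro defined
 * below, though not the exact macro used), the run-building heap orders
 * entries by run number first, then by sort key:
 *
 *		if (a->tupindex != b->tupindex)			-- run number first
 *			return (a->tupindex < b->tupindex) ? -1 : 1;
 *		return COMPARETUP(state, a, b);			-- then the sort key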
*
* The approximate amount of memory allowed for any one sort operation
* is specified in kilobytes by the caller (most pass work_mem). Initially,
* we absorb tuples and simply store them in an unsorted array as long as
* we haven't exceeded workMem. If we reach the end of the input without
* exceeding workMem, we sort the array using qsort() and subsequently return
* tuples just by scanning the tuple array sequentially. If we do exceed
* workMem, we construct a heap using Algorithm H and begin to emit tuples
* into sorted runs in temporary tapes, emitting just enough tuples at each
* step to get back within the workMem limit. Whenever the run number at
* the top of the heap changes, we begin a new run with a new output tape
* (selected per Algorithm D). After the end of the input is reached,
* we dump out remaining tuples in memory into a final run (or two),
* then merge the runs using Algorithm D.
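 *
 * In state-machine form (a simplified sketch of the TupSortStatus values
 * defined below):
 *
 *		TSS_INITIAL   -> TSS_SORTEDINMEM	(input fit in workMem; qsort)
 *		TSS_INITIAL   -> TSS_BUILDRUNS		(exceeded workMem; heapify, spill)
 *		TSS_BUILDRUNS -> TSS_SORTEDONTAPE	(merged down to one frozen run)
 *		TSS_BUILDRUNS -> TSS_FINALMERGE		(!randomAccess; merge on the fly)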
*
* When merging runs, we use a heap containing just the frontmost tuple from
* each source run; we repeatedly output the smallest tuple and insert the
* next tuple from its source tape (if any). When the heap empties, the merge
* is complete. The basic merge algorithm thus needs very little memory ---
* only M tuples for an M-way merge, and M is constrained to a small number.
* However, we can still make good use of our full workMem allocation by
* pre-reading additional tuples from each source tape. Without prereading,
* our access pattern to the temporary file would be very erratic; on average
* we'd read one block from each of M source tapes during the same time that
* we're writing M blocks to the output tape, so there is no sequentiality of
* access at all, defeating the read-ahead methods used by most Unix kernels.
* Worse, the output tape gets written into a very random sequence of blocks
* of the temp file, ensuring that things will be even worse when it comes
* time to read that tape. A straightforward merge pass thus ends up doing a
* lot of waiting for disk seeks. We can improve matters by prereading from
* each source tape sequentially, loading about workMem/M bytes from each tape
* in turn. Then we run the merge algorithm, writing but not reading until
* one of the preloaded tuple series runs out. Then we switch back to preread
* mode, fill memory again, and repeat. This approach helps to localize both
* read and write accesses.
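 *
 * For example (illustrative numbers only): with workMem of 64MB and an
 * M = 16 way merge, each preread cycle pulls roughly 64MB/16 = 4MB
 * sequentially from a single tape before the merge resumes, instead of
 * reading single blocks from 16 tapes interleaved with output writes.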
*
* When the caller requests random access to the sort result, we form
* the final sorted run on a logical tape which is then "frozen", so
* that we can access it randomly. When the caller does not need random
* access, we return from tuplesort_performsort() as soon as we are down
* to one run per logical tape. The final merge is then performed
* on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
* saves one cycle of writing all the data out to disk and reading it in.
*
* Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
* grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
* to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that
* tape drives are expensive beasts, and in particular that there will always
* be many more runs than tape drives. In our implementation a "tape drive"
* doesn't cost much more than a few Kb of memory buffers, so we can afford
* to have lots of them. In particular, if we can have as many tape drives
* as sorted runs, we can eliminate any repeated I/O at all. In the current
* code we determine the number of tapes M on the basis of workMem: we want
* workMem/M to be large enough that we read a fair amount of data each time
* we preread from a tape, so as to maintain the locality of access described
* above. Nonetheless, with large workMem we can have many tapes.
*
*
* Portions Copyright (c) 2007-2008, Greenplum inc
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.70 2006/10/04 00:30:04 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/nbtree.h"
#include "catalog/catquery.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_operator.h"
#include "executor/instrument.h" /* Instrumentation */
#include "executor/nodeSort.h" /* Gpmon */
#include "lib/stringinfo.h" /* StringInfo */
#include "miscadmin.h"
#include "utils/datum.h"
#include "executor/execWorkfile.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"
#include "cdb/cdbvars.h"
#include "utils/dynahash.h" /* my_log2 */
/* GUC variable */
/*
* The objects we actually sort are SortTuple structs. These contain
* a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
* which is a separate palloc chunk --- we assume it is just one chunk and
* can be freed by a simple pfree(). SortTuples also contain the tuple's
* first key column in Datum/nullflag format, and an index integer.
*
* Storing the first key column lets us save heap_getattr or index_getattr
* calls during tuple comparisons. We could extract and save all the key
* columns not just the first, but this would increase code complexity and
* overhead, and wouldn't actually save any comparison cycles in the common
* case where the first key determines the comparison result. Note that
* for a pass-by-reference datatype, datum1 points into the "tuple" storage.
*
* When sorting single Datums, the data value is represented directly by
* datum1/isnull1. If the datatype is pass-by-reference and isnull1 is false,
* then datum1 points to a separately palloc'd data value that is also pointed
* to by the "tuple" pointer; otherwise "tuple" is NULL.
*
* While building initial runs, tupindex holds the tuple's run number. During
* merge passes, we re-use it to hold the input tape number that each tuple in
* the heap was read from, or to hold the index of the next tuple pre-read
* from the same tape in the case of pre-read entries. tupindex goes unused
* if the sort occurs entirely in memory.
*/
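/*
 * A sketch of the first-key fast path this enables in the comparators
 * below (helper name and shape are illustrative, not the exact code):
 *
 *		compare = ApplySortFunction(&sortfn, kind,
 *									a->datum1, a->isnull1,
 *									b->datum1, b->isnull1);
 *		if (compare != 0 || state->nKeys == 1)
 *			return compare;
 *		... only now decode the remaining columns from a->tuple ...
 */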
typedef struct
{
MemTuple tuple;
Datum datum1;
int tupindex; /* see notes above */
bool isnull1;
} SortTuple;
/*
* Possible states of a Tuplesort object. These denote the states that
* persist between calls of Tuplesort routines.
*/
typedef enum
{
TSS_INITIAL, /* Loading tuples; still within memory limit */
TSS_BUILDRUNS, /* Loading tuples; writing to tape */
TSS_SORTEDINMEM, /* Sort completed entirely in memory */
TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
TSS_FINALMERGE /* Performing final merge on-the-fly */
} TupSortStatus;
/*
* Parameters for calculation of number of tapes to use --- see inittapes()
* and tuplesort_merge_order().
*
* In this calculation we assume that each tape will cost us about 3 blocks
* worth of buffer space (which is an underestimate for very large data
* volumes, but it's probably close enough --- see logtape.c).
*
* MERGE_BUFFER_SIZE is how much data we'd like to read from each input
* tape during a preread cycle (see discussion at top of file).
*/
#define MINORDER 6 /* minimum merge order */
#define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3)
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
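/*
 * Sketch of how these constants combine in tuplesort_merge_order() (a
 * reconstruction consistent with the comment above, not necessarily the
 * exact function body):
 *
 *		mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
 *				 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
 *		mOrder = Max(mOrder, MINORDER);
 */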
/*
 * Current position of a Tuplesort operation.
 */
struct TuplesortPos
{
/*
* These variables are used after completion of sorting to keep track of
* the next tuple to return. (In the tape case, the tape's current read
* position is also critical state.)
*/
int current; /* array index (only used if SORTEDINMEM) */
bool eof_reached; /* reached EOF (needed for cursors) */
/* markpos_xxx holds marked position for mark and restore */
union {
LogicalTapePos tapepos;
long mempos;
} markpos;
bool markpos_eof;
LogicalTape* cur_work_tape; /* current tape that I am working on */
};
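/*
 * Illustrative mark/restore against this struct for the in-memory case
 * (a sketch; the tape case records markpos.tapepos instead):
 *
 *		pos->markpos.mempos = pos->current;		-- mark
 *		pos->markpos_eof = pos->eof_reached;
 *		...
 *		pos->current = pos->markpos.mempos;		-- restore
 *		pos->eof_reached = pos->markpos_eof;
 */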
/*
* Private state of a Tuplesort operation.
*/
struct Tuplesortstate
{
TupSortStatus status; /* enumerated value as shown above */
int nKeys; /* number of columns in sort key */
bool randomAccess; /* did caller request random access? */
long availMem; /* remaining memory available, in bytes */
long availMemMin; /* CDB: availMem low water mark (bytes) */
long availMemMin01; /* MPP-1559: initial low water mark */
long allowedMem; /* total memory allowed, in bytes */
int maxTapes; /* number of tapes (Knuth's T) */
int tapeRange; /* maxTapes-1 (Knuth's P) */
MemoryContext sortcontext; /* memory context holding all sort data */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
/*
* These function pointers decouple the routines that must know what kind
* of tuple we are sorting from the routines that don't need to know it.
* They are set up by the tuplesort_begin_xxx routines.
*
* Function to compare two tuples; result is per qsort() convention, ie:
* <0, 0, >0 according as a<b, a=b, a>b. The API must match
* qsort_arg_comparator.
*/
int (*comparetup) (const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
/*
* Function to copy a supplied input tuple into palloc'd space and set up
* its SortTuple representation (ie, set tuple/datum1/isnull1). Also,
* state->availMem must be decreased by the amount of space used for the
* tuple copy (note the SortTuple struct itself is not counted).
*/
void (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
/*
* Function to write a stored tuple onto tape. The representation of the
* tuple on tape need not be the same as it is in memory; requirements on
* the tape representation are given below. After writing the tuple,
* pfree() the out-of-line data (not the SortTuple struct!), and increase
* state->availMem by the amount of memory space thereby released.
*/
void (*writetup) (Tuplesortstate *state, LogicalTape *lt,
SortTuple *stup);
/*
* Function to read a stored tuple from tape back into memory. 'len' is
* the already-read length of the stored tuple. Create a palloc'd copy,
* initialize tuple/datum1/isnull1 in the target SortTuple struct, and
* decrease state->availMem by the amount of memory space consumed.
*/
void (*readtup) (Tuplesortstate *state, TuplesortPos *pos, SortTuple *stup,
LogicalTape *lt, uint32 len);
/*
* This array holds the tuples now in sort memory. If we are in state
* INITIAL, the tuples are in no particular order; if we are in state
* SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
* and FINALMERGE, the tuples are organized in "heap" order per Algorithm
* H. (Note that memtupcount only counts the tuples that are part of the
* heap --- during merge passes, memtuples[] entries beyond tapeRange are
* never in the heap and are used to hold pre-read tuples.) In state
* SORTEDONTAPE, the array is not used.
*/
SortTuple *memtuples; /* array of SortTuple structs */
long memtupcount; /* number of tuples currently present */
long tuparraysize; /* allocated length of memtuples array */
	int64 memtupLIMIT; /* cap on retained tuples (OFFSET+LIMIT) */ /*CDB*/
bool memtupblimited; /* true when hit the limit */
int64 dumpcount; /* count of dumps */ /*CDB*/
int64 discardcount; /* count of discards */ /*CDB*/
int64 totalNumTuples; /* count of all input tuples */ /*CDB*/
bool noduplicates; /* discard duplicate rows if true *//* CDB */
int mppsortflags; /* special sort flags*//* CDB */
int64 gpmaxdistinct; /* max number of distinct values */ /*CDB*/
bool standardsort; /* do regular sort if true *//* CDB */
/*
* A flag to indicate whether the stats for this tuplesort
* has been finalized.
*/
bool statsFinalized;
/*
* While building initial runs, this is the current output run number
* (starting at 0). Afterwards, it is the number of initial runs we made.
*/
int currentRun;
/*
* Unless otherwise noted, all pointer variables below are pointers to
* arrays of length maxTapes, holding per-tape data.
*/
/*
* These variables are only used during merge passes. mergeactive[i] is
* true if we are reading an input run from (actual) tape number i and
* have not yet exhausted that run. mergenext[i] is the memtuples index
* of the next pre-read tuple (next to be loaded into the heap) for tape
* i, or 0 if we are out of pre-read tuples. mergelast[i] similarly
* points to the last pre-read tuple from each tape. mergeavailslots[i]
* is the number of unused memtuples[] slots reserved for tape i, and
* mergeavailmem[i] is the amount of unused space allocated for tape i.
* mergefreelist and mergefirstfree keep track of unused locations in the
* memtuples[] array. The memtuples[].tupindex fields link together
* pre-read tuples for each tape as well as recycled locations in
* mergefreelist. It is OK to use 0 as a null link in these lists, because
* memtuples[0] is part of the merge heap and is never a pre-read tuple.
*/
bool *mergeactive; /* active input run source? */
int *mergenext; /* first preread tuple for each source */
int *mergelast; /* last preread tuple for each source */
int *mergeavailslots; /* slots left for prereading each tape */
long *mergeavailmem; /* availMem for prereading each tape */
int mergefreelist; /* head of freelist of recycled slots */
int mergefirstfree; /* first slot never used in this merge */
/*
* Variables for Algorithm D. Note that destTape is a "logical" tape
* number, ie, an index into the tp_xxx[] arrays. Be careful to keep
* "logical" and "actual" tape numbers straight!
*/
int Level; /* Knuth's l */
int destTape; /* current output tape (Knuth's j, less 1) */
int *tp_fib; /* Target Fibonacci run counts (A[]) */
int *tp_runs; /* # of real runs on each tape */
int *tp_dummy; /* # of dummy runs for each tape (D[]) */
int *tp_tapenum; /* Actual tape numbers (TAPE[]) */
int activeTapes; /* # of active input tapes in merge pass */
LogicalTape *result_tape; /* actual tape of finished output */
	TuplesortPos pos; /* current position */
/*
* These variables are specific to the MinimalTuple case; they are set by
* tuplesort_begin_heap and used only by the MinimalTuple routines.
*/
TupleDesc tupDesc;
ScanKey scanKeys; /* array of length nKeys */
SortFunctionKind *sortFnKinds; /* array of length nKeys */
MemTupleBinding *mt_bind;
/*
* These variables are specific to the IndexTuple case; they are set by
* tuplesort_begin_index and used only by the IndexTuple routines.
*/
Relation indexRel;
ScanKey indexScanKey;
bool enforceUnique; /* complain if we find duplicate tuples */
/*
* These variables are specific to the Datum case; they are set by
* tuplesort_begin_datum and used only by the DatumTuple routines.
*/
Oid datumType;
Oid sortOperator;
FmgrInfo sortOpFn; /* cached lookup data for sortOperator */
SortFunctionKind sortFnKind;
/* we need typelen and byval in order to know how to copy the Datums. */
int datumTypeLen;
bool datumTypeByVal;
/*
* CDB: EXPLAIN ANALYZE reporting interface and statistics.
*/
struct Instrumentation *instrument;
struct StringInfoData *explainbuf;
uint64 spilledBytes;
uint64 memUsedBeforeSpill; /* memory that is used at the time of spilling */
/*
* Resource snapshot for time of sort start.
*/
PGRUsage ru_start;
	/*
	 * File for dump/load of the logical tape set state. Used when sharing
	 * a sort across slices.
	 */
char *pfile_rwfile_prefix;
ExecWorkFile *pfile_rwfile_state;
/* gpmon */
gpmon_packet_t *gpmon_pkt;
int *gpmon_sort_tick;
};
static bool is_sortstate_rwfile(Tuplesortstate *state)
{
return state->pfile_rwfile_state != NULL;
}
#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,pos,stup,tape,len) ((*(state)->readtup) (state, pos, stup, tape, len))
#define LACKMEM(state) ((state)->availMem < 0)
static inline void USEMEM(Tuplesortstate *state, int amt)
{
state->availMem -= amt;
if(state->gpmon_pkt)
Gpmon_M_Add(state->gpmon_pkt, GPMON_SORT_MEMORY_BYTE, amt);
}
static inline void
FREEMEM(Tuplesortstate *state, int amt)
{
if (state->availMemMin > state->availMem)
state->availMemMin = state->availMem;
state->availMem += amt;
if(state->gpmon_pkt)
Gpmon_M_Add(state->gpmon_pkt, GPMON_SORT_MEMORY_BYTE, -amt);
}
/*
* NOTES about on-tape representation of tuples:
*
* We require the first "unsigned int" of a stored tuple to be the total size
* on-tape of the tuple, including itself (so it is never zero; an all-zero
* unsigned int is used to delimit runs). The remainder of the stored tuple
* may or may not match the in-memory representation of the tuple ---
* any conversion needed is the job of the writetup and readtup routines.
*
* If state->randomAccess is true, then the stored representation of the
* tuple must be followed by another "unsigned int" that is a copy of the
* length --- so the total tape space used is actually sizeof(unsigned int)
* more than the stored length value. This allows read-backwards. When
* randomAccess is not true, the write/read routines may omit the extra
* length word.
*
* writetup is expected to write both length words as well as the tuple
* data. When readtup is called, the tape is positioned just after the
* front length word; readtup must read the tuple data and advance past
* the back length word (if present).
*
* The write/read routines can make use of the tuple description data
* stored in the Tuplesortstate record, if needed. They are also expected
* to adjust state->availMem by the amount of memory space (not tape space!)
* released or consumed. There is no error return from either writetup
* or readtup; they should ereport() on failure.
*
*
* NOTES about memory consumption calculations:
*
* We count space allocated for tuples against the workMem limit, plus
* the space used by the variable-size memtuples array. Fixed-size space
* is not counted; it's small enough to not be interesting.
*
* Note that we count actual space used (as shown by GetMemoryChunkSpace)
* rather than the originally-requested size. This is important since
* palloc can add substantial overhead. It's not a complete answer since
* we won't count any wasted space in palloc allocation blocks, but it's
* a lot better than what we were doing before 7.3.
*/
static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess, bool allocmemtuple);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
static void inittapes(Tuplesortstate *state, const char* rwfile_prefix);
static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static void mergepreread(Tuplesortstate *state);
static void mergeprereadone(Tuplesortstate *state, int srcTape);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void tuplesort_sorted_insert(Tuplesortstate *state, SortTuple *tuple,
int tupleindex, bool checkIndex);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
int tupleindex, bool checkIndex);
static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex,
int i);
static unsigned int getlen(Tuplesortstate *state, TuplesortPos *pos, LogicalTape *lt, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_heap(Tuplesortstate *state, LogicalTape *lt, SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, TuplesortPos *pos, SortTuple *stup,
LogicalTape *lt, unsigned int len);
static int comparetup_index(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_index(Tuplesortstate *state, LogicalTape *lt, SortTuple *stup);
static void readtup_index(Tuplesortstate *state, TuplesortPos *pos, SortTuple *stup,
LogicalTape *lt, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_datum(Tuplesortstate *state, LogicalTape *lt, SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, TuplesortPos *pos, SortTuple *stup,
LogicalTape *lt, unsigned int len);
/*
* tuplesort_begin_xxx
*
* Initialize for a tuple sort operation.
*
* After calling tuplesort_begin, the caller should call tuplesort_putXXX
* zero or more times, then call tuplesort_performsort when all the tuples
* have been supplied. After performsort, retrieve the tuples in sorted
* order by calling tuplesort_getXXX until it returns false/NULL. (If random
* access was requested, rescan, markpos, and restorepos can also be called.)
* Call tuplesort_end to terminate the operation and release memory/disk space.
*
* Each variant of tuplesort_begin has a workMem parameter specifying the
* maximum number of kilobytes of RAM to use before spilling data to disk.
* (The normal value of this parameter is work_mem, but some callers use
* other values.) Each variant also has a randomAccess parameter specifying
* whether the caller needs non-sequential access to the sort result.
*
* CDB: During EXPLAIN ANALYZE, after tuplesort_begin_xxx() the caller should
* use tuplesort_set_instrument() (q.v.) to enable statistical reporting.
*/
static Tuplesortstate *
tuplesort_begin_common(int workMem, bool randomAccess, bool allocmemtuple)
{
Tuplesortstate *state;
MemoryContext sortcontext;
MemoryContext oldcontext;
/*
* Create a working memory context for this sort operation. All data
* needed by the sort will live inside this context.
*/
sortcontext = AllocSetContextCreate(CurrentMemoryContext,
"TupleSort",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/*
* Make the Tuplesortstate within the per-sort context. This way, we
* don't need a separate pfree() operation for it at shutdown.
*/
oldcontext = MemoryContextSwitchTo(sortcontext);
state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
if (trace_sort)
pg_rusage_init(&state->ru_start);
state->status = TSS_INITIAL;
state->randomAccess = randomAccess;
state->allowedMem = workMem * 1024L;
state->availMemMin = state->availMem = state->allowedMem;
state->availMemMin01 = state->availMemMin;
state->sortcontext = sortcontext;
state->tapeset = NULL;
state->memtupcount = 0;
state->tuparraysize = 1024; /* initial guess */
state->memtupLIMIT = 0; /*CDB*/
state->memtupblimited = false;
state->dumpcount = 0; /*CDB*/
state->discardcount = 0; /*CDB*/
state->totalNumTuples = 0; /*CDB*/
state->mppsortflags = 0; /* special sort flags*//* CDB */
state->standardsort = true; /* normal sort *//* CDB */
state->gpmaxdistinct = 20000; /* maximum distinct values *//* CDB */
if(allocmemtuple)
{
state->memtuples = (SortTuple *) palloc(state->tuparraysize * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
/* workMem must be large enough for the minimal memtuples array */
if (LACKMEM(state))
elog(ERROR, "insufficient memory allowed for sort");
}
else
{
state->memtuples = NULL;
}
state->currentRun = 0;
/*
* maxTapes, tapeRange, and Algorithm D variables will be initialized by
* inittapes(), if needed
*/
state->result_tape = NULL; /* flag that result tape has not been formed */
state->pos.cur_work_tape = NULL; /* flag no work tape associated with pos */
MemoryContextSwitchTo(oldcontext);
Assert(!state->statsFinalized);
return state;
}
/*
* Initialize some extra CDB attributes for the sort, including limit
* and uniqueness. Should do this after begin_heap.
*
*/
void
cdb_tuplesort_init(Tuplesortstate *state,
int64 offset, int64 limit, int unique, int sort_flags,
int64 maxdistinct)
{
	/* Set a limit on internal sorts. If the offset is non-zero but
	 * the limit is not set, then there is no limit. */
if (limit)
{
state->memtupLIMIT = offset + limit;
}
if (unique)
state->noduplicates = true;
state->mppsortflags = sort_flags;
state->gpmaxdistinct = maxdistinct;
/* do a standard sort unless performing limit or duplicate
* elimination */
state->standardsort =
(state->mppsortflags == 0) ||
((state->memtupLIMIT == 0)
&& (!state->noduplicates));
}
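/*
 * Example call (illustrative values): retain at most OFFSET 10 + LIMIT 100
 * = 110 tuples, no duplicate elimination; sort_flags = 1 keeps the
 * limit-sort "always on" (see puttuple_common):
 *
 *		cdb_tuplesort_init(state, 10, 100, 0, 1, 20000);
 */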
/* make a copy of the sort's current read position */
void tuplesort_begin_pos(Tuplesortstate *st, TuplesortPos **pos)
{
TuplesortPos *st_pos;
Assert(st);
st_pos = (TuplesortPos *) palloc0(sizeof(TuplesortPos));
memcpy(st_pos, &(st->pos), sizeof(TuplesortPos));
if(st->tapeset)
st_pos->cur_work_tape = LogicalTapeSetDuplicateTape(st->tapeset, st->result_tape);
*pos = st_pos;
}
Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,
int nkeys,
Oid *sortOperators, AttrNumber *attNums,
int workMem, bool randomAccess)
{
Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess, true);
MemoryContext oldcontext;
int i;
oldcontext = MemoryContextSwitchTo(state->sortcontext);
AssertArg(nkeys > 0);
if (trace_sort)
elog(LOG,
"begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
nkeys, workMem, randomAccess ? 't' : 'f');
state->nKeys = nkeys;
state->comparetup = comparetup_heap;
state->copytup = copytup_heap;
state->writetup = writetup_heap;
state->readtup = readtup_heap;
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
state->mt_bind = create_memtuple_binding(tupDesc);
state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
state->sortFnKinds = (SortFunctionKind *)
palloc0(nkeys * sizeof(SortFunctionKind));
for (i = 0; i < nkeys; i++)
{
RegProcedure sortFunction = 0;
AssertArg(sortOperators[i] != 0);
AssertArg(attNums[i] != 0);
/* select a function that implements the sort operator */
SelectSortFunction(sortOperators[i], &sortFunction,
&state->sortFnKinds[i]);
/*
* We needn't fill in sk_strategy or sk_subtype since these scankeys
* will never be passed to an index.
*/
ScanKeyInit(&state->scanKeys[i],
attNums[i],
InvalidStrategy,
sortFunction,
(Datum) 0);
}
MemoryContextSwitchTo(oldcontext);
return state;
}
Tuplesortstate *
tuplesort_begin_heap_file_readerwriter(
const char *rwfile_prefix, bool isWriter,
TupleDesc tupDesc,
int nkeys,
Oid *sortOperators, AttrNumber *attNums,
int workMem, bool randomAccess)
{
Tuplesortstate *state;
char statedump[MAXPGPATH];
char full_prefix[MAXPGPATH];
Assert(randomAccess);
int len = snprintf(statedump, sizeof(statedump), "%s/%s_sortstate", PG_TEMP_FILES_DIR, rwfile_prefix);
insist_log(len <= MAXPGPATH - 1, "could not generate temporary file name");
len = snprintf(full_prefix, sizeof(full_prefix), "%s/%s",
PG_TEMP_FILES_DIR,
rwfile_prefix);
insist_log(len <= MAXPGPATH - 1, "could not generate temporary file name");
if(isWriter)
{
		/*
		 * The writer is an ordinary tuplesort, except that the underlying
		 * buffile is named by rwfile_prefix.
		 */
state = tuplesort_begin_heap(tupDesc, nkeys, sortOperators, attNums, workMem, randomAccess);
state->pfile_rwfile_prefix = MemoryContextStrdup(state->sortcontext, full_prefix);
state->pfile_rwfile_state = ExecWorkFile_Create(statedump,
BUFFILE,
true /* delOnClose */ ,
0 /* compressType */ );
Assert(state->pfile_rwfile_state != NULL);
return state;
}
else
{
		/*
		 * The reader really doesn't know anything about the sort op,
		 * attNums, etc. All it cares about is the data on the logical tape
		 * set. The state of the logical tape set has been dumped, so we
		 * just load it back.
		 */
state = tuplesort_begin_common(workMem, randomAccess, false);
state->status = TSS_SORTEDONTAPE;
state->randomAccess = true;
state->readtup = readtup_heap;
state->pfile_rwfile_prefix = MemoryContextStrdup(state->sortcontext, full_prefix);
state->pfile_rwfile_state = ExecWorkFile_Open(statedump,
BUFFILE,
false /* delOnClose */,
0 /* compressType */);
ExecWorkFile *tapefile = ExecWorkFile_Open(full_prefix,
BUFFILE,
false /* delOnClose */,
0 /* compressType */);
state->tapeset = LoadLogicalTapeSetState(state->pfile_rwfile_state, tapefile);
state->currentRun = 0;
state->result_tape = LogicalTapeSetGetTape(state->tapeset, 0);
		state->pos.eof_reached = false;
state->pos.markpos.tapepos.blkNum = 0;
state->pos.markpos.tapepos.offset = 0;
state->pos.markpos.mempos = 0;
state->pos.markpos_eof = false;
state->pos.cur_work_tape = NULL;
return state;
}
}
Tuplesortstate *
tuplesort_begin_index(Relation indexRel,
bool enforceUnique,
int workMem, bool randomAccess)
{
Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess, true);
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(state->sortcontext);
if (trace_sort)
elog(LOG,
"begin index sort: unique = %c, workMem = %d, randomAccess = %c",
enforceUnique ? 't' : 'f',
workMem, randomAccess ? 't' : 'f');
state->nKeys = RelationGetNumberOfAttributes(indexRel);
state->comparetup = comparetup_index;
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
state->indexRel = indexRel;
/* see comments below about btree dependence of this code... */
state->indexScanKey = _bt_mkscankey_nodata(indexRel);
state->enforceUnique = enforceUnique;
MemoryContextSwitchTo(oldcontext);
return state;
}
Tuplesortstate *
tuplesort_begin_datum(Oid datumType,
Oid sortOperator,
int workMem, bool randomAccess)
{
Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess, true);
MemoryContext oldcontext;
RegProcedure sortFunction = 0;
int16 typlen;
bool typbyval;
oldcontext = MemoryContextSwitchTo(state->sortcontext);
if (trace_sort)
elog(LOG,
"begin datum sort: workMem = %d, randomAccess = %c",
workMem, randomAccess ? 't' : 'f');
state->nKeys = 1; /* always a one-column sort */
state->comparetup = comparetup_datum;
state->copytup = copytup_datum;
state->writetup = writetup_datum;
state->readtup = readtup_datum;
state->datumType = datumType;
state->sortOperator = sortOperator;
/* select a function that implements the sort operator */
SelectSortFunction(sortOperator, &sortFunction, &state->sortFnKind);
/* and look up the function */
fmgr_info(sortFunction, &state->sortOpFn);
/* lookup necessary attributes of the datum type */
get_typlenbyval(datumType, &typlen, &typbyval);
state->datumTypeLen = typlen;
state->datumTypeByVal = typbyval;
MemoryContextSwitchTo(oldcontext);
return state;
}
/*
* tuplesort_end
*
* Release resources and clean up.
*
* NOTE: after calling this, any pointers returned by tuplesort_getXXX are
* pointing to garbage. Be careful not to attempt to use or free such
* pointers afterwards!
*/
void
tuplesort_end(Tuplesortstate *state)
{
long spaceUsed;
if (state->tapeset)
spaceUsed = LogicalTapeSetBlocks(state->tapeset);
else
spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
/*
* Delete temporary "tape" files, if any.
*
* Note: want to include this in reported total cost of sort, hence need
* for two #ifdef TRACE_SORT sections.
*/
if (state->tapeset)
{
LogicalTapeSetClose(state->tapeset, NULL /* workset */);
if (state->pfile_rwfile_state)
{
workfile_mgr_close_file(NULL /* workset */, state->pfile_rwfile_state);
}
}
tuplesort_finalize_stats(state);
if (trace_sort)
{
if (state->tapeset)
elog(LOG, "external sort ended, %ld disk blocks used: %s",
spaceUsed, pg_rusage_show(&state->ru_start));
else
elog(LOG, "internal sort ended, %ld KB used: %s",
spaceUsed, pg_rusage_show(&state->ru_start));
}
/*
* Free the per-sort memory context, thereby releasing all working memory,
* including the Tuplesortstate struct itself.
*/
MemoryContextDelete(state->sortcontext);
}
/*
* tuplesort_finalize_stats
*
* Finalize the EXPLAIN ANALYZE stats.
*/
void
tuplesort_finalize_stats(Tuplesortstate *state)
{
if (state->instrument && !state->statsFinalized)
{
double workmemused;
/* How close did we come to the work_mem limit? */
FREEMEM(state, 0); /* update low-water mark */
workmemused = MemoryContextGetPeakSpace(state->sortcontext);
if (state->instrument->workmemused < workmemused)
state->instrument->workmemused = workmemused;
/* Report executor memory used by our memory context. */
state->instrument->execmemused +=
(double)MemoryContextGetPeakSpace(state->sortcontext);
state->statsFinalized = true;
}
}
/*
* tuplesort_set_instrument
*
* May be called after tuplesort_begin_xxx() to enable reporting of
* statistics and events for EXPLAIN ANALYZE.
*
* The 'instr' and 'explainbuf' ptrs are retained in the 'state' object for
* possible use anytime during the sort, up to and including tuplesort_end().
* The caller must ensure that the referenced objects remain allocated and
* valid for the life of the Tuplesortstate object; or if they are to be
* freed early, disconnect them by calling again with NULL pointers.
*/
void
tuplesort_set_instrument(Tuplesortstate *state,
struct Instrumentation *instrument,
struct StringInfoData *explainbuf)
{
state->instrument = instrument;
state->explainbuf = explainbuf;
} /* tuplesort_set_instrument */
void
tuplesort_set_gpmon(Tuplesortstate *state, gpmon_packet_t *pkt, int *tick)
{
state->gpmon_pkt = pkt;
state->gpmon_sort_tick = tick;
}
/*
* Grow the memtuples[] array, if possible within our memory constraint.
* Return TRUE if able to enlarge the array, FALSE if not.
*
* At each increment we double the size of the array. When we are short
* on memory we could consider smaller increases, but because availMem
* moves around with tuple addition/removal, this might result in thrashing.
* Small increases in the array size are likely to be pretty inefficient.
*/
static bool
grow_memtuples(Tuplesortstate *state)
{
/*
* We need to be sure that we do not cause LACKMEM to become true, else
* the space management algorithm will go nuts. We assume here that the
* memory chunk overhead associated with the memtuples array is constant
* and so there will be no unexpected addition to what we ask for. (The
* minimum array size established in tuplesort_begin_common is large
* enough to force palloc to treat it as a separate chunk, so this
* assumption should be good. But let's check it.)
*/
if (state->availMem <= (long) (state->tuparraysize * sizeof(SortTuple)))
return false;
/*
* On a 64-bit machine, allowedMem could be high enough to get us into
* trouble with MaxAllocSize, too.
*/
if ((Size) (state->tuparraysize * 2) >= MaxAllocSize / sizeof(SortTuple))
return false;
FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
state->tuparraysize *= 2;
state->memtuples = (SortTuple *)
repalloc(state->memtuples,
state->tuparraysize * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
if (LACKMEM(state))
elog(ERROR, "unexpected out-of-memory situation during sort");
return true;
}
/*
* Accept one tuple while collecting input data for sort.
*
* Note that the input data is always copied; the caller need not save it.
*/
void
tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
SortTuple stup;
/*
* Copy the given tuple into memory we control, and decrease availMem.
* Then call the common code.
*/
COPYTUP(state, &stup, (void *) slot);
puttuple_common(state, &stup);
MemoryContextSwitchTo(oldcontext);
}
/*
* Accept one index tuple while collecting input data for sort.
*
* Note that the input tuple is always copied; the caller need not save it.
*/
void
tuplesort_putindextuple(Tuplesortstate *state, IndexTuple tuple)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
SortTuple stup;
/*
* Copy the given tuple into memory we control, and decrease availMem.
* Then call the common code.
*/
COPYTUP(state, &stup, (void *) tuple);
puttuple_common(state, &stup);
MemoryContextSwitchTo(oldcontext);
}
/*
* Accept one Datum while collecting input data for sort.
*
* If the Datum is pass-by-ref type, the value will be copied.
*/
void
tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
SortTuple stup;
/*
* If it's a pass-by-reference value, copy it into memory we control, and
* decrease availMem. Then call the common code.
*/
if (isNull || state->datumTypeByVal)
{
stup.datum1 = val;
stup.isnull1 = isNull;
stup.tuple = NULL; /* No separate storage */
}
else
{
stup.datum1 = datumCopy(val, false, state->datumTypeLen);
stup.isnull1 = false;
stup.tuple = DatumGetPointer(stup.datum1);
USEMEM(state, GetMemoryChunkSpace(DatumGetPointer(stup.datum1)));
}
/*
* MPP-1561: always safe to set the index to zero, which matches
* the behavior of tuplesort_sorted_insert and inittapes.
*/
stup.tupindex = 0;
puttuple_common(state, &stup);
MemoryContextSwitchTo(oldcontext);
}
/*
* Shared code for tuple and datum cases.
*/
static void
puttuple_common(Tuplesortstate *state, SortTuple *tuple)
{
int do_standardsort = (state->memtupcount == 0) || state->standardsort;
state->totalNumTuples++;
/* gpmon */
if(state->gpmon_pkt)
Gpmon_M_Incr(state->gpmon_pkt, GPMON_QEXEC_M_ROWSIN);
switch (state->status)
{
case TSS_INITIAL:
/*
* Save the tuple into the unsorted array. First, grow the array
* as needed. Note that we try to grow the array when there is
* still one free slot remaining --- if we fail, there'll still be
* room to store the incoming tuple, and then we'll switch to
* tape-based operation.
*/
if (state->memtupcount >= state->tuparraysize - 1)
{
(void) grow_memtuples(state);
Assert(state->memtupcount < state->tuparraysize);
}
if (do_standardsort)
{
state->memtuples[state->memtupcount++] = *tuple;
}
else
{
/* sorting LIMIT or noduplicates: only one run */
tuplesort_sorted_insert(state, tuple, 0, false);
				/* check whether we have had enough discards to justify
				 * switching back to a full standard sort
				 */
/*
* don't bother if mppsortflags set to "always on"
*/
if ((state->mppsortflags > 1) &&
(state->totalNumTuples >= state->mppsortflags))
{
int64 discard_ratio =
(state->discardcount * 100)/state->totalNumTuples;
if (discard_ratio < 50)
{
state->standardsort = true;
}
/* MPP-1342: limit the number of distinct values */
if (state->memtupcount > state->gpmaxdistinct)
{
state->standardsort = true;
}
}
}
/*
* Done if we still fit in available memory and have array slots.
*/
if (state->memtupcount < state->tuparraysize && !LACKMEM(state))
return;
state->memUsedBeforeSpill = MemoryContextGetPeakSpace(state->sortcontext);
/*
* Nope; time to switch to tape-based operation.
*/
inittapes(state, is_sortstate_rwfile(state) ? state->pfile_rwfile_prefix : NULL);
/*
* Dump tuples until we are back under the limit.
*/
dumptuples(state, false);
break;
case TSS_BUILDRUNS:
/*
* Insert the tuple into the heap, with run number currentRun if
* it can go into the current run, else run number currentRun+1.
* The tuple can go into the current run if it is >= the first
* not-yet-output tuple. (Actually, it could go into the current
* run if it is >= the most recently output tuple ... but that
* would require keeping around the tuple we last output, and it's
* simplest to let writetup free each tuple as soon as it's
* written.)
*
* Note there will always be at least one tuple in the heap at
* this point; see dumptuples.
*/
Assert(state->memtupcount > 0);
if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
tuplesort_heap_insert(state, tuple, state->currentRun, true);
else
tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
/*
* If we are over the memory limit, dump tuples till we're under.
*/
dumptuples(state, false);
break;
default:
elog(ERROR, "invalid tuplesort state");
break;
}
}
/*
* All tuples have been provided; finish the sort.
*/
void
tuplesort_performsort(Tuplesortstate *state)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
if (trace_sort)
elog(LOG, "performsort starting: %s",
pg_rusage_show(&state->ru_start));
switch (state->status)
{
case TSS_INITIAL:
if(!is_sortstate_rwfile(state))
{
/*
* We were able to accumulate all the tuples within the allowed
* amount of memory. Just qsort 'em and we're done.
*/
if ((state->memtupcount > 1)
&& state->standardsort)
qsort_arg((void *) state->memtuples,
state->memtupcount,
sizeof(SortTuple),
(qsort_arg_comparator) state->comparetup,
(void *) state);
state->pos.current = 0;
state->pos.eof_reached = false;
state->pos.markpos.mempos = 0;
state->pos.markpos_eof = false;
state->status = TSS_SORTEDINMEM;
break;
}
else
{
inittapes(state, state->pfile_rwfile_prefix);
/* fall through */
}
case TSS_BUILDRUNS:
/*
* Finish tape-based sort. First, flush all tuples remaining in
* memory out to tape; then merge until we have a single remaining
* run (or, if !randomAccess, one run per tape). Note that
* mergeruns sets the correct state->status.
*/
dumptuples(state, true);
/* CDB: How much work_mem would be enough for in-memory sort? */
if (state->instrument)
{
				/*
				 * workmemwanted is the sum of:
				 * (1) metadata: the Tuplesortstate plus the tuple array,
				 *     sized as totalNumTuples rounded up to a power of two
				 *     (mirroring the doubling growth of memtuples[]);
				 * (2) the total bytes for all tuples (the bytes spilled).
				 */
int64 workmemwanted =
sizeof(Tuplesortstate) +
((uint64)(1 << my_log2(state->totalNumTuples))) * sizeof(SortTuple) +
state->spilledBytes;
state->instrument->workmemwanted =
Max(state->instrument->workmemwanted, workmemwanted);
}
mergeruns(state);
state->pos.eof_reached = false;
state->pos.markpos.tapepos.blkNum = 0L;
state->pos.markpos.tapepos.offset = 0;
state->pos.markpos_eof = false;
break;
default:
elog(ERROR, "invalid tuplesort state");
break;
}
if (trace_sort)
{
if (state->status == TSS_FINALMERGE)
elog(LOG, "performsort done (except %d-way final merge): %s",
state->activeTapes,
pg_rusage_show(&state->ru_start));
else
elog(LOG, "performsort done: %s",
pg_rusage_show(&state->ru_start));
}
/* MPP-1559 */
state->availMemMin01 = state->availMemMin;
MemoryContextSwitchTo(oldcontext);
}
void tuplesort_flush(Tuplesortstate *state)
{
Assert(state->status == TSS_SORTEDONTAPE);
Assert(state->tapeset && state->pfile_rwfile_state);
Assert(state->pos.cur_work_tape == NULL);
LogicalTapeFlush(state->tapeset, state->result_tape, state->pfile_rwfile_state);
ExecWorkFile_Flush(state->pfile_rwfile_state);
}
/*
* Internal routine to fetch the next tuple in either forward or back
* direction into *stup. Returns FALSE if no more tuples.
* If *should_free is set, the caller must pfree stup.tuple when done with it.
*/
static bool
tuplesort_gettuple_common_pos(Tuplesortstate *state, TuplesortPos *pos,
bool forward, SortTuple *stup, bool *should_free)
{
unsigned int tuplen;
LogicalTape *work_tape;
bool fOK;
switch (state->status)
{
case TSS_SORTEDINMEM:
Assert(forward || state->randomAccess);
*should_free = false;
if (forward)
{
if (pos->current < state->memtupcount)
{
*stup = state->memtuples[pos->current++];
return true;
}
pos->eof_reached = true;
return false;
}
else
{
if (pos->current <= 0)
return false;
				/*
				 * If all tuples have been fetched already, return the last
				 * tuple; otherwise return the tuple before the last one
				 * returned.
				 */
if (pos->eof_reached)
pos->eof_reached = false;
else
{
pos->current--; /* last returned tuple */
if (pos->current <= 0)
return false;
}
*stup = state->memtuples[pos->current - 1];
return true;
}
break;
case TSS_SORTEDONTAPE:
AssertEquivalent((pos == &state->pos), (pos->cur_work_tape == NULL));
Assert(forward || state->randomAccess);
*should_free = true;
work_tape = pos->cur_work_tape == NULL ? state->result_tape : pos->cur_work_tape;
if (forward)
{
if (pos->eof_reached)
return false;
if ((tuplen = getlen(state, pos, work_tape, true)) != 0)
{
READTUP(state, pos, stup, work_tape, tuplen);
return true;
}
else
{
pos->eof_reached = true;
return false;
}
}
			/*
			 * Backward.
			 *
			 * If all tuples have been fetched already, return the last
			 * tuple; otherwise return the tuple before the last one
			 * returned.
			 */
/*
* Seek position is pointing just past the zero tuplen at the
* end of file; back up to fetch last tuple's ending length
* word. If seek fails we must have a completely empty file.
*/
			fOK = LogicalTapeBackspace(state->tapeset, work_tape, 2 * sizeof(unsigned));
if(!fOK)
return false;
if (pos->eof_reached)
{
pos->eof_reached = false;
}
else
{
tuplen = getlen(state, pos, work_tape, false);
/*
* Back up to get ending length word of tuple before it.
*/
				fOK = LogicalTapeBackspace(state->tapeset, work_tape, tuplen + 2 * sizeof(unsigned));
if (!fOK)
{
/*
* If that fails, presumably the prev tuple is the first
* in the file. Back up so that it becomes next to read
* in forward direction (not obviously right, but that is
* what in-memory case does).
*/
					fOK = LogicalTapeBackspace(state->tapeset, work_tape, tuplen + 2 * sizeof(unsigned));
if(!fOK)
elog(ERROR, "bogus tuple length in backward scan");
return false;
}
}
tuplen = getlen(state, pos, work_tape, false);
/*
* Now we have the length of the prior tuple, back up and read it.
* Note: READTUP expects we are positioned after the initial
* length word of the tuple, so back up to that point.
*/
fOK = LogicalTapeBackspace(state->tapeset, work_tape, tuplen);
if (!fOK)
elog(ERROR, "bogus tuple length in backward scan");
READTUP(state, pos, stup, work_tape, tuplen);
return true;
case TSS_FINALMERGE:
Assert(forward);
Assert(pos == &state->pos && pos->cur_work_tape == NULL);
*should_free = true;
/*
* This code should match the inner loop of mergeonerun().
*/
if (state->memtupcount > 0)
{
int srcTape = state->memtuples[0].tupindex;
Size tuplen;
int tupIndex;
SortTuple *newtup;
*stup = state->memtuples[0];
/* returned tuple is no longer counted in our memory space */
if (stup->tuple)
{
tuplen = GetMemoryChunkSpace(stup->tuple);
FREEMEM(state, tuplen);
state->mergeavailmem[srcTape] += tuplen;
}
tuplesort_heap_siftup(state, false, 0);
if ((tupIndex = state->mergenext[srcTape]) == 0)
{
/*
* out of preloaded data on this tape, try to read more
*
* Unlike mergeonerun(), we only preload from the single
* tape that's run dry. See mergepreread() comments.
*/
mergeprereadone(state, srcTape);
/*
* if still no data, we've reached end of run on this tape
*/
if ((tupIndex = state->mergenext[srcTape]) == 0)
return true;
}
/* pull next preread tuple from list, insert in heap */
newtup = &state->memtuples[tupIndex];
state->mergenext[srcTape] = newtup->tupindex;
if (state->mergenext[srcTape] == 0)
state->mergelast[srcTape] = 0;
tuplesort_heap_insert(state, newtup, srcTape, false);
/* put the now-unused memtuples entry on the freelist */
newtup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
state->mergeavailslots[srcTape]++;
return true;
}
return false;
default:
elog(ERROR, "invalid tuplesort state");
return false; /* keep compiler quiet */
}
}
/*
* Fetch the next tuple in either forward or back direction.
* If successful, put tuple in slot and return TRUE; else, clear the slot
* and return FALSE.
*/
bool
tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
TupleTableSlot *slot)
{
return tuplesort_gettupleslot_pos(state, &state->pos, forward, slot);
}
bool
tuplesort_gettupleslot_pos(Tuplesortstate *state, TuplesortPos *pos,
bool forward, TupleTableSlot *slot)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
SortTuple stup;
bool should_free = false;
if (!tuplesort_gettuple_common_pos(state, pos, forward, &stup, &should_free))
stup.tuple = NULL;
MemoryContextSwitchTo(oldcontext);
if (stup.tuple)
{
ExecStoreMemTuple(stup.tuple, slot, should_free);
if (state->gpmon_pkt)
Gpmon_M_Incr_Rows_Out(state->gpmon_pkt);
return true;
}
ExecClearTuple(slot);
return false;
}
/*
* Fetch the next index tuple in either forward or back direction.
* Returns NULL if no more tuples. If *should_free is set, the
* caller must pfree the returned tuple when done with it.
*/
IndexTuple
tuplesort_getindextuple(Tuplesortstate *state, bool forward,
bool *should_free)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
SortTuple stup;
if (!tuplesort_gettuple_common_pos(state, &state->pos, forward, &stup, should_free))
stup.tuple = NULL;
MemoryContextSwitchTo(oldcontext);
return (IndexTuple) (stup.tuple);
}
/*
* Fetch the next Datum in either forward or back direction.
* Returns FALSE if no more datums.
*
* If the Datum is pass-by-ref type, the returned value is freshly palloc'd
* and is now owned by the caller.
*/
bool
tuplesort_getdatum(Tuplesortstate *state, bool forward,
Datum *val, bool *isNull)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
SortTuple stup;
bool should_free = false;
if (!tuplesort_gettuple_common_pos(state, &state->pos, forward, &stup, &should_free))
{
MemoryContextSwitchTo(oldcontext);
return false;
}
if (stup.isnull1 || state->datumTypeByVal)
{
*val = stup.datum1;
*isNull = stup.isnull1;
}
else
{
if (should_free)
*val = stup.datum1;
else
*val = datumCopy(stup.datum1, false, state->datumTypeLen);
*isNull = false;
}
MemoryContextSwitchTo(oldcontext);
return true;
}
/*
* inittapes - initialize for tape sorting.
*
* This is called only if we have found we don't have room to sort in memory.
*/
static void
inittapes(Tuplesortstate *state, const char* rwfile_prefix)
{
int maxTapes,
ntuples,
j;
long tapeSpace;
/* Compute number of tapes to use: merge order plus 1 */
maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
/*
* We must have at least 2*maxTapes slots in the memtuples[] array, else
* we'd not have room for merge heap plus preread. It seems unlikely that
* this case would ever occur, but be safe.
*/
maxTapes = Min(maxTapes, state->tuparraysize / 2);
/* XXX XXX: with losers, only need 1x slots because we don't need a merge heap */
state->maxTapes = maxTapes;
state->tapeRange = maxTapes - 1;
if (trace_sort)
elog(LOG, "switching to external sort with %d tapes: %s",
maxTapes, pg_rusage_show(&state->ru_start));
/*
* Decrease availMem to reflect the space needed for tape buffers; but
* don't decrease it to the point that we have no room for tuples. (That
* case is only likely to occur if sorting pass-by-value Datums; in all
* other scenarios the memtuples[] array is unlikely to occupy more than
* half of allowedMem. In the pass-by-value case it's not important to
* account for tuple space, so we don't care if LACKMEM becomes
* inaccurate.)
*/
tapeSpace = maxTapes * TAPE_BUFFER_OVERHEAD;
if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
USEMEM(state, tapeSpace);
/*
* Create the tape set and allocate the per-tape data arrays.
*/
	if (!rwfile_prefix)
	{
state->tapeset = LogicalTapeSetCreate(maxTapes, true /* del_on_close */);
}
else
{
		/* We are a shared XSLICE sort; use the given prefix to create files so that consumers can find them */
ExecWorkFile *tape_file = ExecWorkFile_Create(rwfile_prefix,
BUFFILE,
true /* delOnClose */,
0 /* compressType */);
state->tapeset = LogicalTapeSetCreate_File(tape_file, maxTapes);
}
state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
/*
* Convert the unsorted contents of memtuples[] into a heap. Each tuple is
* marked as belonging to run number zero.
*
* NOTE: we pass false for checkIndex since there's no point in comparing
* indexes in this step, even though we do intend the indexes to be part
* of the sort key...
*/
if (state->standardsort)
{
ntuples = state->memtupcount;
state->memtupcount = 0; /* make the heap empty */
for (j = 0; j < ntuples; j++)
{
/* Must copy source tuple to avoid possible overwrite */
SortTuple stup = state->memtuples[j];
tuplesort_heap_insert(state, &stup, 0, false);
}
Assert(state->memtupcount == ntuples);
}
state->currentRun = 0;
/*
* Initialize variables of Algorithm D (step D1).
*/
for (j = 0; j < maxTapes; j++)
{
state->tp_fib[j] = 1;
state->tp_runs[j] = 0;
state->tp_dummy[j] = 1;
state->tp_tapenum[j] = j;
}
state->tp_fib[state->tapeRange] = 0;
state->tp_dummy[state->tapeRange] = 0;
state->Level = 1;
state->destTape = 0;
state->status = TSS_BUILDRUNS;
}
/*
* selectnewtape -- select new tape for new initial run.
*
* This is called after finishing a run when we know another run
* must be started. This implements steps D3, D4 of Algorithm D.
*/
static void
selectnewtape(Tuplesortstate *state)
{
int j;
int a;
/* Step D3: advance j (destTape) */
if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
{
state->destTape++;
return;
}
if (state->tp_dummy[state->destTape] != 0)
{
state->destTape = 0;
return;
}
/* Step D4: increase level */
state->Level++;
a = state->tp_fib[0];
for (j = 0; j < state->tapeRange; j++)
{
state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
state->tp_fib[j] = a + state->tp_fib[j + 1];
}
state->destTape = 0;
}
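/*
 * Worked example (two input tapes, tapeRange = 2): successive executions
 * of step D4 advance the target distribution A[] through (1,1), (2,1),
 * (3,2), (5,3), (8,5), ... --- consecutive Fibonacci numbers, whence the
 * name "Fibonacci run counts" for tp_fib[].
 */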
/*
* mergeruns -- merge all the completed initial runs.
*
* This implements steps D5, D6 of Algorithm D. All input data has
* already been written to initial runs on tape (see dumptuples).
*/
static void
mergeruns(Tuplesortstate *state)
{
int tapenum,
svTape,
svRuns,
svDummy;
LogicalTape *lt = NULL;
Assert(state->status == TSS_BUILDRUNS);
/*
* If we produced only one initial run (quite likely if the total data
* volume is between 1X and 2X workMem), we can just use that tape as the
* finished output, rather than doing a useless merge. (This obvious
* optimization is not in Knuth's algorithm.)
*/
if (state->currentRun == 1)
{
state->result_tape = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->destTape]);
/* must freeze and rewind the finished output tape */
LogicalTapeFreeze(state->tapeset, state->result_tape);
state->status = TSS_SORTEDONTAPE;
return;
}
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
{
lt = LogicalTapeSetGetTape(state->tapeset, tapenum);
LogicalTapeRewind(state->tapeset, lt, false);
}
	/* Clear gpmon counters before re-spilling data */
if(state->gpmon_pkt)
{
Gpmon_M_Incr(state->gpmon_pkt, GPMON_SORT_SPILLPASS);
Gpmon_M_Reset(state->gpmon_pkt, GPMON_SORT_CURRSPILLPASS_TUPLE);
Gpmon_M_Reset(state->gpmon_pkt, GPMON_SORT_CURRSPILLPASS_BYTE);
}
for (;;)
{
/*
* At this point we know that tape[T] is empty. If there's just one
* (real or dummy) run left on each input tape, then only one merge
* pass remains. If we don't have to produce a materialized sorted
* tape, we can stop at this point and do the final merge on-the-fly.
*/
if (!state->randomAccess)
{
bool allOneRun = true;
Assert(state->tp_runs[state->tapeRange] == 0);
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
{
if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
{
allOneRun = false;
break;
}
}
if (allOneRun)
{
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
/* Initialize for the final merge pass */
beginmerge(state);
state->status = TSS_FINALMERGE;
return;
}
}
/* Step D5: merge runs onto tape[T] until tape[P] is empty */
while (state->tp_runs[state->tapeRange - 1] ||
state->tp_dummy[state->tapeRange - 1])
{
bool allDummy = true;
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
{
if (state->tp_dummy[tapenum] == 0)
{
allDummy = false;
break;
}
}
if (allDummy)
{
state->tp_dummy[state->tapeRange]++;
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
state->tp_dummy[tapenum]--;
}
else
mergeonerun(state);
}
/* Step D6: decrease level */
if (--state->Level == 0)
break;
/* rewind output tape T to use as new input */
lt = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->tapeRange]);
LogicalTapeRewind(state->tapeset, lt, false);
/* rewind used-up input tape P, and prepare it for write pass */
lt = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
LogicalTapeRewind(state->tapeset, lt, true);
state->tp_runs[state->tapeRange - 1] = 0;
/*
* reassign tape units per step D6; note we no longer care about A[]
*/
svTape = state->tp_tapenum[state->tapeRange];
svDummy = state->tp_dummy[state->tapeRange];
svRuns = state->tp_runs[state->tapeRange];
for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
{
state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
}
state->tp_tapenum[0] = svTape;
state->tp_dummy[0] = svDummy;
state->tp_runs[0] = svRuns;
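/*
 * To visualize this reassignment: with tapeRange = 3, if tp_tapenum was
 * {0, 1, 2, 3} before the pass, it is {3, 0, 1, 2} afterward -- the
 * just-filled output tape becomes input tape 0, and the emptied tape P
 * becomes the new output tape T.
 */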
}
/*
* Done. Knuth says that the result is on TAPE[1], but since we exited
* the loop without performing the last iteration of step D6, we have not
* rearranged the tape unit assignment, and therefore the result is on
* TAPE[T]. We need to do it this way so that we can freeze the final
* output tape while rewinding it. The last iteration of step D6 would be
* a waste of cycles anyway...
*/
state->result_tape = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->tapeRange]);
LogicalTapeFreeze(state->tapeset, state->result_tape);
state->status = TSS_SORTEDONTAPE;
}
/*
* Merge one run from each input tape, except ones with dummy runs.
*
* This is the inner loop of Algorithm D step D5. We know that the
* output tape is TAPE[T].
*/
static void
mergeonerun(Tuplesortstate *state)
{
int destTape = state->tp_tapenum[state->tapeRange];
int srcTape;
int tupIndex;
SortTuple *tup;
long priorAvail,
spaceFreed;
LogicalTape *lt = NULL;
/*
* Start the merge by loading one tuple from each active source tape into
* the heap. We can also decrease the input run/dummy run counts.
*/
beginmerge(state);
/*
* Execute merge by repeatedly extracting lowest tuple in heap, writing it
* out, and replacing it with next tuple from same tape (if there is
* another one).
*/
lt = LogicalTapeSetGetTape(state->tapeset, destTape);
while (state->memtupcount > 0)
{
/* write the tuple to destTape */
priorAvail = state->availMem;
srcTape = state->memtuples[0].tupindex;
WRITETUP(state, lt, &state->memtuples[0]);
/* writetup adjusted total free space, now fix per-tape space */
spaceFreed = state->availMem - priorAvail;
state->mergeavailmem[srcTape] += spaceFreed;
/* compact the heap */
tuplesort_heap_siftup(state, false, 0);
if ((tupIndex = state->mergenext[srcTape]) == 0)
{
/* out of preloaded data on this tape, try to read more */
mergepreread(state);
/* if still no data, we've reached end of run on this tape */
if ((tupIndex = state->mergenext[srcTape]) == 0)
continue;
}
/* pull next preread tuple from list, insert in heap */
tup = &state->memtuples[tupIndex];
state->mergenext[srcTape] = tup->tupindex;
if (state->mergenext[srcTape] == 0)
state->mergelast[srcTape] = 0;
tuplesort_heap_insert(state, tup, srcTape, false);
/* put the now-unused memtuples entry on the freelist */
tup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
state->mergeavailslots[srcTape]++;
}
/*
* When the heap empties, we're done. Write an end-of-run marker on the
* output tape, and increment its count of real runs.
*/
markrunend(state, destTape);
state->tp_runs[state->tapeRange]++;
if (trace_sort)
elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
pg_rusage_show(&state->ru_start));
}
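/*
 * A small trace of the loop above, assuming two active input tapes
 * holding runs (1,3,5) and (2,4): the heap starts as {1,2}; we pop 1,
 * write it, and replace it with 3 from the same tape; pop 2, replace
 * with 4; and so on, emitting 1,2,3,4,5 followed by the end-of-run
 * marker.
 */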
/*
* beginmerge - initialize for a merge pass
*
* We decrease the counts of real and dummy runs for each tape, and mark
* which tapes contain active input runs in mergeactive[]. Then, load
* as many tuples as we can from each active input tape, and finally
* fill the merge heap with the first tuple from each active tape.
*/
static void
beginmerge(Tuplesortstate *state)
{
int activeTapes;
int tapenum;
int srcTape;
int slotsPerTape;
long spacePerTape;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
/* Adjust run counts and mark the active tapes */
memset(state->mergeactive, 0,
state->maxTapes * sizeof(*state->mergeactive));
activeTapes = 0;
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
{
if (state->tp_dummy[tapenum] > 0)
state->tp_dummy[tapenum]--;
else
{
Assert(state->tp_runs[tapenum] > 0);
state->tp_runs[tapenum]--;
srcTape = state->tp_tapenum[tapenum];
state->mergeactive[srcTape] = true;
activeTapes++;
}
}
state->activeTapes = activeTapes;
/* Clear merge-pass state variables */
memset(state->mergenext, 0,
state->maxTapes * sizeof(*state->mergenext));
memset(state->mergelast, 0,
state->maxTapes * sizeof(*state->mergelast));
state->mergefreelist = 0; /* nothing in the freelist */
state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
/*
* Initialize space allocation to let each active input tape have an equal
* share of preread space.
*/
Assert(activeTapes > 0);
slotsPerTape = (state->tuparraysize - state->mergefirstfree) / activeTapes;
Assert(slotsPerTape > 0);
spacePerTape = state->availMem / activeTapes;
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
if (state->mergeactive[srcTape])
{
state->mergeavailslots[srcTape] = slotsPerTape;
state->mergeavailmem[srcTape] = spacePerTape;
}
}
/*
* Preread as many tuples as possible (and at least one) from each active
* tape
*/
mergepreread(state);
/* Load the merge heap with the first tuple from each input tape */
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
int tupIndex = state->mergenext[srcTape];
SortTuple *tup;
if (tupIndex)
{
tup = &state->memtuples[tupIndex];
state->mergenext[srcTape] = tup->tupindex;
if (state->mergenext[srcTape] == 0)
state->mergelast[srcTape] = 0;
tuplesort_heap_insert(state, tup, srcTape, false);
/* put the now-unused memtuples entry on the freelist */
tup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
state->mergeavailslots[srcTape]++;
}
}
}
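/*
 * As an arithmetic sketch of the share calculation above (figures are
 * illustrative only; the real values depend on work_mem and the tuple
 * array size): with 1000 memtuples[] slots, availMem = 8MB and 4 active
 * tapes, each tape is granted (1000 - 4) / 4 = 249 slots and 2MB of
 * preread memory.
 */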
/*
* mergepreread - load tuples from merge input tapes
*
* This routine exists to improve sequentiality of reads during a merge pass,
* as explained in the header comments of this file. Load tuples from each
* active source tape until the tape's run is exhausted or it has used up
* its fair share of available memory. In any case, we guarantee that there
* is at least one preread tuple available from each unexhausted input tape.
*
* We invoke this routine at the start of a merge pass for initial load,
* and then whenever any tape's preread data runs out. Note that we load
* as much data as possible from all tapes, not just the one that ran out.
* This is because logtape.c works best with a usage pattern that alternates
* between reading a lot of data and writing a lot of data, so whenever we
* are forced to read, we should fill working memory completely.
*
* In FINALMERGE state, we *don't* use this routine, but instead just preread
* from the single tape that ran dry. There's no read/write alternation in
* that state and so no point in scanning through all the tapes to fix one.
* (Moreover, there may be quite a lot of inactive tapes in that state, since
* we might have had many fewer runs than tapes. In a regular tape-to-tape
* merge we can expect most of the tapes to be active.)
*/
static void
mergepreread(Tuplesortstate *state)
{
int srcTape;
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
mergeprereadone(state, srcTape);
}
/*
* mergeprereadone - load tuples from one merge input tape
*
* Read tuples from the specified tape until it has used up its free memory
* or array slots; but ensure that we have at least one tuple, if any are
* to be had.
*/
static void
mergeprereadone(Tuplesortstate *state, int srcTape)
{
unsigned int tuplen;
SortTuple stup;
int tupIndex;
long priorAvail,
spaceUsed;
LogicalTape *srclt = NULL;
if (!state->mergeactive[srcTape])
return; /* tape's run is already exhausted */
priorAvail = state->availMem;
state->availMem = state->mergeavailmem[srcTape];
srclt = LogicalTapeSetGetTape(state->tapeset, srcTape);
while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
state->mergenext[srcTape] == 0)
{
/* read next tuple, if any */
Assert(state->pos.cur_work_tape == NULL);
if ((tuplen = getlen(state, &state->pos, srclt, true)) == 0)
{
state->mergeactive[srcTape] = false;
break;
}
READTUP(state, &state->pos, &stup, srclt, tuplen);
/* find a free slot in memtuples[] for it */
tupIndex = state->mergefreelist;
if (tupIndex)
state->mergefreelist = state->memtuples[tupIndex].tupindex;
else
{
tupIndex = state->mergefirstfree++;
Assert(tupIndex < state->tuparraysize);
}
state->mergeavailslots[srcTape]--;
/* store tuple, append to list for its tape */
stup.tupindex = 0;
state->memtuples[tupIndex] = stup;
if (state->mergelast[srcTape])
state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
else
state->mergenext[srcTape] = tupIndex;
state->mergelast[srcTape] = tupIndex;
}
/* update per-tape and global availmem counts */
spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
state->mergeavailmem[srcTape] = state->availMem;
state->availMem = priorAvail - spaceUsed;
}
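/*
 * Sketch of the per-tape preread list built above (slot numbers are
 * invented for illustration): preread tuples occupy memtuples[] slots
 * chained through their tupindex fields, e.g.
 *
 *    mergenext[srcTape] = 7, memtuples[7].tupindex = 12,
 *    mergelast[srcTape] = 12, memtuples[12].tupindex = 0
 *
 * A tupindex of zero terminates the chain; slot 0 itself is occupied by
 * the merge heap, so it can never appear in a preread list.
 */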
/*
* dumptuples - remove tuples from heap and write to tape
*
* This is used during initial-run building, but not during merging.
*
* When alltuples = false, dump only enough tuples to get under the
* availMem limit (and leave at least one tuple in the heap in any case,
* since puttuple assumes it always has a tuple to compare to). We also
* insist there be at least one free slot in the memtuples[] array.
*
* When alltuples = true, dump everything currently in memory.
* (This case is only used at end of input data.)
*
* If we empty the heap, close out the current run and return (this should
* only happen at end of input data). If we see that the tuple run number
* at the top of the heap has changed, start a new run.
*/
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
int bDumped = 0;
long spilledBytes = state->availMem;
LogicalTape *lt = NULL;
while (alltuples ||
(LACKMEM(state) && state->memtupcount > 1) ||
state->memtupcount >= state->tuparraysize)
{
/*
 * ShareInput or sort: the sort may contain no tuples at all; we
 * still need to close out an empty run here.
 */
if (state->memtupcount == 0)
{
markrunend(state, state->tp_tapenum[state->destTape]);
state->currentRun++;
state->tp_runs[state->destTape]++;
state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
break;
}
if (!bDumped)
bDumped = 1;
/*
* Dump the heap's frontmost entry, and sift up to remove it from the
* heap.
*/
Assert(state->memtupcount > 0);
lt = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->destTape]);
WRITETUP(state, lt, &state->memtuples[0]);
tuplesort_heap_siftup(state, true, 0);
/*
* If the heap is empty *or* top run number has changed, we've
* finished the current run.
*/
if (state->memtupcount == 0 || state->currentRun != state->memtuples[0].tupindex)
{
markrunend(state, state->tp_tapenum[state->destTape]);
state->currentRun++;
state->tp_runs[state->destTape]++;
state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
if (trace_sort)
elog(LOG, "finished writing%s run %d to tape %d: %s",
(state->memtupcount == 0) ? " final" : "",
state->currentRun, state->destTape,
pg_rusage_show(&state->ru_start));
/*
* Done if heap is empty, else prepare for new run.
*/
if (state->memtupcount == 0)
break;
Assert(state->currentRun == state->memtuples[0].tupindex);
selectnewtape(state);
}
}
if (bDumped)
state->dumpcount++;
/* CDB: Accumulate total size of spilled tuples. */
spilledBytes = state->availMem - spilledBytes;
if (spilledBytes > 0)
state->spilledBytes += spilledBytes;
if(state->gpmon_pkt)
tuplesort_checksend_gpmonpkt(state->gpmon_pkt, state->gpmon_sort_tick);
}
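/*
 * Example of the run-boundary test above: while dumping, the heap might
 * hold entries (run 0, key 42) and (run 1, key 7). Once the run-0 entry
 * is written and sifted out, the top of the heap belongs to run 1, so
 * currentRun != memtuples[0].tupindex and we close run 0 and select a
 * new destination tape.
 */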
/*
* Put pos at the beginning of the tuplesort. Create pos->cur_work_tape if necessary.
*/
void
tuplesort_rescan_pos(Tuplesortstate *state, TuplesortPos *pos)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
Assert(state->randomAccess);
switch (state->status)
{
case TSS_SORTEDINMEM:
pos->current = 0;
pos->eof_reached = false;
pos->markpos.mempos = 0;
pos->markpos_eof = false;
pos->cur_work_tape = NULL;
break;
case TSS_SORTEDONTAPE:
if(pos == &state->pos)
{
Assert(pos->cur_work_tape == NULL);
LogicalTapeRewind(state->tapeset, state->result_tape, false);
}
else
{
if(pos->cur_work_tape == NULL)
pos->cur_work_tape = state->result_tape;
LogicalTapeRewind(state->tapeset, pos->cur_work_tape, false);
}
pos->eof_reached = false;
pos->markpos.tapepos.blkNum = 0L;
pos->markpos.tapepos.offset = 0;
pos->markpos_eof = false;
break;
default:
elog(ERROR, "invalid tuplesort state");
break;
}
MemoryContextSwitchTo(oldcontext);
}
/*
* tuplesort_rescan - rewind and replay the scan
*/
void
tuplesort_rescan(Tuplesortstate *state)
{
tuplesort_rescan_pos(state, &state->pos);
}
/*
* tuplesort_markpos - saves current position in the merged sort file
*/
void
tuplesort_markpos_pos(Tuplesortstate *state, TuplesortPos *pos)
{
LogicalTape *work_tape = NULL;
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
Assert(state->randomAccess);
switch (state->status)
{
case TSS_SORTEDINMEM:
pos->markpos.mempos = pos->current;
pos->markpos_eof = pos->eof_reached;
break;
case TSS_SORTEDONTAPE:
AssertEquivalent(pos == &state->pos, pos->cur_work_tape == NULL);
work_tape = pos->cur_work_tape == NULL ? state->result_tape : pos->cur_work_tape;
LogicalTapeTell(state->tapeset, work_tape, &pos->markpos.tapepos);
pos->markpos_eof = pos->eof_reached;
break;
default:
elog(ERROR, "invalid tuplesort state");
break;
}
MemoryContextSwitchTo(oldcontext);
}
void
tuplesort_markpos(Tuplesortstate *state)
{
tuplesort_markpos_pos(state, &state->pos);
}
/*
* tuplesort_restorepos - restores current position in merged sort file to
* last saved position
*/
void
tuplesort_restorepos_pos(Tuplesortstate *state, TuplesortPos *pos)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
Assert(state->randomAccess);
switch (state->status)
{
case TSS_SORTEDINMEM:
pos->current = pos->markpos.mempos;
pos->eof_reached = pos->markpos_eof;
break;
case TSS_SORTEDONTAPE:
AssertEquivalent(pos == &state->pos, pos->cur_work_tape == NULL);
{
LogicalTape *work_tape = pos->cur_work_tape == NULL ? state->result_tape : pos->cur_work_tape;
bool fSeekOK = LogicalTapeSeek(state->tapeset, work_tape, &pos->markpos.tapepos);
if(!fSeekOK)
elog(ERROR, "tuplesort_restorepos failed");
pos->eof_reached = pos->markpos_eof;
}
break;
default:
elog(ERROR, "invalid tuplesort state");
break;
}
MemoryContextSwitchTo(oldcontext);
}
void
tuplesort_restorepos(Tuplesortstate *state)
{
tuplesort_restorepos_pos(state, &state->pos);
}
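/*
 * Typical mark/restore call sequence (a sketch, not taken from any
 * specific caller; merge-join-style rescans are the usual client):
 *
 *    tuplesort_markpos(state);        -- remember the current position
 *    ... read ahead some tuples ...
 *    tuplesort_restorepos(state);     -- rewind to the mark
 *
 * Both calls require the sort to have been created with randomAccess.
 */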
/*
* Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
*
* Compare two SortTuples. If checkIndex is true, use the tuple index
* as the front of the sort key; otherwise, compare on the sort key alone.
*/
#define HEAPCOMPARE(tup1,tup2) \
(checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
((tup1)->tupindex) - ((tup2)->tupindex) : \
COMPARETUP(state, tup1, tup2))
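/*
 * For example, with checkIndex true, (run 0, key 99) sorts before
 * (run 1, key 1): the run numbers differ, so the key comparison is never
 * consulted. This is what keeps tuples destined for a later run below
 * all tuples of the current run in the heap.
 */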
/*
* Insert a new tuple into an empty or existing heap, maintaining the
* heap invariant. Caller is responsible for ensuring there's room.
*
* Note: we assume *tuple is a temporary variable that can be scribbled on.
* For some callers, tuple actually points to a memtuples[] entry above the
* end of the heap. This is safe as long as it's not immediately adjacent
* to the end of the heap (ie, in the [memtupcount] array entry) --- if it
* is, it might get overwritten before being moved into the heap!
*/
static void
tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
int tupleindex, bool checkIndex)
{
SortTuple *memtuples;
int j;
int comparestat;
/*
* Save the tupleindex --- see notes above about writing on *tuple. It's a
* historical artifact that tupleindex is passed as a separate argument
* and not in *tuple, but it's notationally convenient so let's leave it
* that way.
*/
tuple->tupindex = tupleindex;
memtuples = state->memtuples;
Assert(state->memtupcount < state->tuparraysize);
/*
* Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
* using 1-based array indexes, not 0-based.
*/
j = state->memtupcount++;
while (j > 0)
{
int i = (j - 1) >> 1;
comparestat = HEAPCOMPARE(tuple, &memtuples[i]);
if (comparestat >= 0)
break;
memtuples[j] = memtuples[i];
j = i;
}
/* CDB: discard duplicates during run creation */
/*
if (0 && state->noduplicates && (0 == comparestat) &&
(state->status == TSS_BUILDRUNS))
{
HeapTuple htup = (HeapTuple) tuple->tuple;
FREEMEM(state, GetMemoryChunkSpace(htup));
heap_freetuple(htup);
}
else
*/
{
memtuples[j] = *tuple;
}
}
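/*
 * Sift-up trace for the 0-based indexing used above (parent of j is
 * (j-1)/2): inserting key 3 into the heap {5, 8, 9} starts at j = 3,
 * compares against memtuples[1] = 8 (3 < 8, so 8 moves down to slot 3),
 * then against memtuples[0] = 5 (3 < 5, so 5 moves down to slot 1), and
 * finally stores 3 at the root, giving {3, 5, 9, 8}.
 */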
static void
tuplesort_sorted_insert(Tuplesortstate *state, SortTuple *tuple,
int tupleindex, bool checkIndex)
{
SortTuple *memtuples;
SortTuple *freetup;
int comparestat;
int j;
int searchpoint = 0;
/*
* Save the tupleindex --- see notes above about writing on *tuple. It's a
* historical artifact that tupleindex is passed as a separate argument
* and not in *tuple, but it's notationally convenient so let's leave it
* that way.
*/
tuple->tupindex = tupleindex;
memtuples = state->memtuples;
Assert(state->memtupcount < state->tuparraysize);
/* compare to the last value first */
comparestat =
HEAPCOMPARE(tuple, &memtuples[state->memtupcount - 1]);
if (state->memtupblimited)
{
/* discard the last tuple if it exceeds the LIMIT */
if (comparestat >= 0)
{
freetup = tuple;
goto L_freetup;
}
else
{
state->memtupblimited = false;
}
}
if (state->noduplicates && (0 == comparestat))
{
freetup = tuple;
goto L_freetup;
}
/* j becomes the index of the first free slot in memtuples[];
 * memtupcount is incremented to include the value about to be stored
 */
j = state->memtupcount++;
if (comparestat >= 0)
{
memtuples[j] = *tuple;
goto L_checklimit;
}
if (j > 1)
{
int lefty = 0;
int righty = j-1;
righty--;
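/* the new tuple already compared below entry j-1 above, so exclude
 * that entry from the binary search range
 */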
while (1)
{
int middle;
if (lefty > righty)
break;
middle = (lefty + righty) / 2;
comparestat =
HEAPCOMPARE(tuple, &memtuples[middle]);
if (comparestat < 0)
{
/* if key < current entry then keep moving left
* (eliminate the right interval)
*/
righty = middle - 1;
}
else
{
/* check if can free a duplicate */
if (state->noduplicates && (0 == comparestat))
{
state->memtupcount--;
freetup = tuple;
goto L_freetup;
}
/*
 * If key >= current entry then keep moving right (eliminate the
 * left interval). Note that the return value for the estimate
 * gets bumped up to the current position, because we can start a
 * linear scan from this location.
 */
searchpoint = middle;
lefty = middle + 1;
}
} /* end while */
}
/* Note: only go to position j-1, because we haven't filled in
* memtuples[j] yet
*/
for (; searchpoint < j; searchpoint++)
{
comparestat =
HEAPCOMPARE(tuple, &memtuples[searchpoint]);
if (comparestat < 0)
{
break;
}
if (searchpoint >= (j-1))
break;
}
/*
 * If there is only a single tuple and the new tuple sorts before
 * memtuples[0], then searchpoint = 0, so memtuples[0] is moved to
 * memtuples[1]. If the new tuple sorts before the last tuple, then
 * searchpoint is set to j-1, so memtuples[j-1] is moved to
 * memtuples[j], which is correct.
 */
/* check if can free a duplicate */
if (state->noduplicates && (0 == comparestat))
{
state->memtupcount--;
freetup = tuple;
goto L_freetup;
}
/* move the other elements over by one */
{
void *src = &memtuples[searchpoint];
void *dst = &memtuples[searchpoint+1];
size_t len = (j-searchpoint) * sizeof(SortTuple);
memmove(dst, src, len);
memtuples[searchpoint] = *tuple;
}
L_checklimit:
/* CDB: always keep the new value if we have never dumped or there is
 * no limit; otherwise drop the last value once the count exceeds the
 * limit
 */
if ((state->memtupLIMIT)
&& (state->memtupcount > state->memtupLIMIT))
{
/* set blimited true if have limit and memtuples are sorted */
state->memtupblimited = true;
freetup = &memtuples[--state->memtupcount];
goto L_freetup;
}
return;
/* free up tuples if necessary */
L_freetup:
if (freetup->tuple != NULL)
{
FREEMEM(state, GetMemoryChunkSpace(freetup->tuple));
pfree(freetup->tuple);
}
state->discardcount++;
}
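/*
 * Insertion sketch for the routine above, assuming memtuples[] already
 * holds {10, 20, 30, 40} and key 25 arrives: 25 sorts before the last
 * entry, so the binary search narrows to the 20/30 boundary, the short
 * linear scan stops at slot 2, memmove() shifts {30, 40} right by one
 * slot, and 25 lands in slot 2, giving {10, 20, 25, 30, 40}.
 */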
/*
* The tuple at state->memtuples[0] has been removed from the heap.
* Decrement memtupcount, and sift up to maintain the heap invariant.
*/
static void
tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex, int i)
{
SortTuple *memtuples = state->memtuples;
SortTuple *tuple;
int n;
if (--state->memtupcount <= 0)
return;
n = state->memtupcount;
tuple = &memtuples[n]; /* tuple that must be reinserted */
/* i = 0; */ /* i is where the "hole" is */
for (;;)
{
int j = 2 * i + 1;
if (j >= n)
break;
if (j + 1 < n &&
HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
j++;
if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
break;
memtuples[i] = memtuples[j];
i = j;
}
memtuples[i] = *tuple;
}
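/*
 * Sift-down trace for the loop above (children of i are 2i+1 and 2i+2):
 * removing the root of {3, 5, 9, 8} leaves 8 to be reinserted into a
 * 3-entry heap. At i = 0 the smaller child is 5 (slot 1); 8 > 5, so 5
 * moves up. Slot 1's child index 2*1+1 = 3 is past the end, so 8
 * settles in slot 1, giving {5, 8, 9}.
 */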
/*
* Tape interface routines
*/
static unsigned int
getlen(Tuplesortstate *state, TuplesortPos *pos, LogicalTape *lt, bool eofOK)
{
unsigned int len;
size_t readSize;
Assert(lt);
readSize = LogicalTapeRead(state->tapeset, lt, (void *)&len, sizeof(len));
if(readSize != sizeof(len))
{
Assert(!"Catch me");
elog(ERROR, "unexpected end of tape");
}
if (len == 0 && !eofOK)
elog(ERROR, "unexpected end of data");
return len;
}
static void
markrunend(Tuplesortstate *state, int tapenum)
{
unsigned int len = 0;
LogicalTape *lt = LogicalTapeSetGetTape(state->tapeset, tapenum);
LogicalTapeWrite(state->tapeset, lt, (void *) &len, sizeof(len));
}
/*
* This routine selects an appropriate sorting function to implement
* a sort operator as efficiently as possible. The straightforward
* method is to use the operator's implementation proc --- ie, "<"
* comparison. However, that way often requires two calls of the function
* per comparison. If we can find a btree three-way comparator function
* associated with the operator, we can use it to do the comparisons
* more efficiently. We also support the possibility that the operator
* is ">" (descending sort), in which case we have to reverse the output
* of the btree comparator.
*
* Possibly this should live somewhere else (backend/catalog/, maybe?).
*/
void
SelectSortFunction(Oid sortOperator,
RegProcedure *sortFunction,
SortFunctionKind *kind)
{
CatCList *catlist;
int i;
HeapTuple tuple;
Oid opclass = InvalidOid;
/*
* Search pg_amop to see if the target operator is registered as the "<"
* or ">" operator of any btree opclass. It's possible that it might be
* registered both ways (eg, if someone were to build a "reverse sort"
* opclass for some reason); prefer the "<" case if so. If the operator is
* registered the same way in multiple opclasses, assume we can use the
* associated comparator function from any one.
*/
catlist = caql_begin_CacheList(
NULL,
cql("SELECT * FROM pg_amop "
" WHERE amopopr = :1 "
" ORDER BY amopopr, "
" amopclaid ",
ObjectIdGetDatum(sortOperator)));
for (i = 0; i < catlist->n_members; i++)
{
Form_pg_amop aform;
tuple = &catlist->members[i]->tuple;
aform = (Form_pg_amop) GETSTRUCT(tuple);
if (!opclass_is_btree(aform->amopclaid))
continue;
/* must be of default subtype, too */
if (OidIsValid(aform->amopsubtype))
continue;
if (aform->amopstrategy == BTLessStrategyNumber)
{
opclass = aform->amopclaid;
*kind = SORTFUNC_CMP;
break; /* done looking */
}
else if (aform->amopstrategy == BTGreaterStrategyNumber)
{
opclass = aform->amopclaid;
*kind = SORTFUNC_REVCMP;
/* keep scanning in hopes of finding a BTLess entry */
}
}
caql_end_CacheList(catlist);
if (OidIsValid(opclass))
{
/* Found a suitable opclass, get its default comparator function */
*sortFunction = get_opclass_proc(opclass, InvalidOid, BTORDER_PROC);
Assert(RegProcedureIsValid(*sortFunction));
return;
}
/* shouldn't get here if the parser did its job. See sort_op_can_sort() */
elog(ERROR, "operator %s cannot sort", get_opname(sortOperator));
}
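/*
 * As a usage sketch (referring to the standard integer catalogs):
 * looking up the int4 "<" operator should find it as the BTLess
 * strategy member of the default btree int4 opclass, so the caller
 * gets back SORTFUNC_CMP together with that opclass's three-way
 * comparison proc (btint4cmp).
 */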
/*
* Inline-able copy of FunctionCall2() to save some cycles in sorting.
*/
static inline Datum
myFunctionCall2(FmgrInfo *flinfo, Datum arg1, Datum arg2)
{
FunctionCallInfoData fcinfo;
Datum result;
InitFunctionCallInfoData(fcinfo, flinfo, 2, NULL, NULL);
fcinfo.arg[0] = arg1;
fcinfo.arg[1] = arg2;
fcinfo.argnull[0] = false;
fcinfo.argnull[1] = false;
result = FunctionCallInvoke(&fcinfo);
/* Check for null result, since caller is clearly not expecting one */
if (fcinfo.isnull)
elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
return result;
}
/*
* Apply a sort function (by now converted to fmgr lookup form)
* and return a 3-way comparison result. This takes care of handling
* NULLs and sort ordering direction properly.
*/
static inline int32
inlineApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
Datum datum1, bool isNull1,
Datum datum2, bool isNull2)
{
switch (kind)
{
case SORTFUNC_LT:
if (isNull1)
{
if (isNull2)
return 0;
return 1; /* NULL sorts after non-NULL */
}
if (isNull2)
return -1;
if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
return -1; /* a < b */
if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
return 1; /* a > b */
return 0;
case SORTFUNC_REVLT:
/* We reverse the ordering of NULLs, but not the operator */
if (isNull1)
{
if (isNull2)
return 0;
return -1; /* NULL sorts before non-NULL */
}
if (isNull2)
return 1;
if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
return -1; /* a < b */
if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
return 1; /* a > b */
return 0;
case SORTFUNC_CMP:
if (isNull1)
{
if (isNull2)
return 0;
return 1; /* NULL sorts after non-NULL */
}
if (isNull2)
return -1;
return DatumGetInt32(myFunctionCall2(sortFunction,
datum1, datum2));
case SORTFUNC_REVCMP:
if (isNull1)
{
if (isNull2)
return 0;
return -1; /* NULL sorts before non-NULL */
}
if (isNull2)
return 1;
return -DatumGetInt32(myFunctionCall2(sortFunction,
datum1, datum2));
default:
elog(ERROR, "unrecognized SortFunctionKind: %d", (int) kind);
return 0; /* can't get here, but keep compiler quiet */
}
}
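/*
 * NULL-handling summary for the cases above (sign of the result for
 * datum1 vs datum2):
 *
 *    kind                       NULL vs non-NULL    non-NULL vs NULL
 *    SORTFUNC_LT, _CMP                +1                  -1
 *    SORTFUNC_REVLT, _REVCMP          -1                  +1
 *
 * That is, NULLs sort after non-NULLs in the forward kinds and before
 * them in the reversed kinds; NULL vs NULL is always 0.
 */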
/*
* Non-inline ApplySortFunction() --- this is needed only to conform to
* C99's brain-dead notions about how to implement inline functions...
*/
int32
ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
Datum datum1, bool isNull1,
Datum datum2, bool isNull2)
{
return inlineApplySortFunction(sortFunction, kind,
datum1, isNull1,
datum2, isNull2);
}
/*
* Routines specialized for HeapTuple (actually MinimalTuple) case
*/
static int
comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
ScanKey scanKey = state->scanKeys;
int nkey;
int32 compare;
/* Allow interrupting long sorts */
CHECK_FOR_INTERRUPTS();
Assert(state->mt_bind);
compare = inlineApplySortFunction(&scanKey->sk_func, state->sortFnKinds[0],
a->datum1, a->isnull1,
b->datum1, b->isnull1
);
if(compare != 0)
return compare;
scanKey++;
for (nkey = 1; nkey < state->nKeys; nkey++, scanKey++)
{
AttrNumber attno = scanKey->sk_attno;
Datum datum1, datum2;
bool isnull1, isnull2;
datum1 = memtuple_getattr(a->tuple, state->mt_bind, attno, &isnull1);
datum2 = memtuple_getattr(b->tuple, state->mt_bind, attno, &isnull2);
compare = inlineApplySortFunction(&scanKey->sk_func, state->sortFnKinds[nkey],
datum1, isnull1,
datum2, isnull2);
if (compare != 0)
return compare;
}
/* CDB JC XXX XXX - need to merge aggregates or discard duplicates here */
return 0;
}
static void
copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
{
/*
* We expect the passed "tup" to be a TupleTableSlot, and form a
* MinimalTuple using the exported interface for that.
*/
TupleTableSlot *slot = (TupleTableSlot *) tup;
slot_getallattrs(slot);
stup->tuple = memtuple_form_to(state->mt_bind,
slot_get_values(slot),
slot_get_isnull(slot),
NULL, NULL, false
);
USEMEM(state, GetMemoryChunkSpace(stup->tuple));
Assert(state->mt_bind);
stup->datum1 = memtuple_getattr(stup->tuple, state->mt_bind, state->scanKeys[0].sk_attno, &stup->isnull1);
}
/*
* Since MinimalTuple already has length in its first word, we don't need
* to write that separately.
*/
static void
writetup_heap(Tuplesortstate *state, LogicalTape *lt, SortTuple *stup)
{
uint32 tuplen = memtuple_get_size(stup->tuple, NULL);
LogicalTapeWrite(state->tapeset, lt, (void *) stup->tuple, tuplen);
if (state->randomAccess) /* need trailing length word? */
LogicalTapeWrite(state->tapeset, lt, (void *) &tuplen, sizeof(tuplen));
if (state->gpmon_pkt)
{
Gpmon_M_Incr(state->gpmon_pkt, GPMON_SORT_SPILLTUPLE);
Gpmon_M_Add(state->gpmon_pkt, GPMON_SORT_SPILLBYTE, tuplen);
Gpmon_M_Incr(state->gpmon_pkt, GPMON_SORT_CURRSPILLPASS_TUPLE);
Gpmon_M_Add(state->gpmon_pkt, GPMON_SORT_CURRSPILLPASS_BYTE, tuplen);
}
FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
pfree(stup->tuple);
}
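/*
 * On-tape record layout produced above (MinimalTuple case): the tuple's
 * own first word doubles as the length header, so a record is
 *
 *    [tuple, length word included] [trailing length word if randomAccess]
 *
 * The trailing copy of the length is what lets a backward scan locate
 * the start of the previous tuple.
 */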
static void
readtup_heap(Tuplesortstate *state, TuplesortPos *pos, SortTuple *stup, LogicalTape *lt, uint32 len)
{
uint32 tuplen;
size_t readSize;
stup->tuple = (MemTuple) palloc(memtuple_size_from_uint32(len));
USEMEM(state, GetMemoryChunkSpace(stup->tuple));
memtuple_set_mtlen(stup->tuple, NULL, len);
Assert(lt);
readSize = LogicalTapeRead(state->tapeset, lt,
(void *) ((char *)stup->tuple + sizeof(uint32)),
memtuple_size_from_uint32(len) - sizeof(uint32));
if (readSize != (size_t) (memtuple_size_from_uint32(len) - sizeof(uint32)))
elog(ERROR, "unexpected end of data");
if (state->randomAccess) /* need trailing length word? */
{
readSize = LogicalTapeRead(state->tapeset, lt, (void *)&tuplen, sizeof(tuplen));
if(readSize != sizeof(tuplen))
elog(ERROR, "unexpected end of data");
}
/* For ShareInput on a sort, the reader will not set mt_bind. In that
 * case we never call the comparator.
 */
AssertImply(!state->mt_bind, state->status == TSS_SORTEDONTAPE);
if(state->mt_bind)
stup->datum1 = memtuple_getattr(stup->tuple, state->mt_bind, state->scanKeys[0].sk_attno, &stup->isnull1);
}
/*
* Routines specialized for IndexTuple case
*
* NOTE: actually, these are specialized for the btree case; it's not
* clear whether you could use them for a non-btree index. Possibly
* you'd need to make another set of routines if you needed to sort
* according to another kind of index.
*/
static int
comparetup_index(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
/*
* This is similar to _bt_tuplecompare(), but we have already done the
* index_getattr calls for the first column, and we need to keep track of
* whether any null fields are present. Also see the special treatment
* for equal keys at the end.
*/
ScanKey scanKey = state->indexScanKey;
IndexTuple tuple1;
IndexTuple tuple2;
int keysz;
TupleDesc tupDes;
bool equal_hasnull = false;
int nkey;
int32 compare;
/* Allow interrupting long sorts */
CHECK_FOR_INTERRUPTS();
/* Compare the leading sort key */
compare = inlineApplySortFunction(&scanKey->sk_func,
SORTFUNC_CMP,
a->datum1, a->isnull1,
b->datum1, b->isnull1);
if (compare != 0)
return compare;
/* they are equal, so we only need to examine one null flag */
if (a->isnull1)
equal_hasnull = true;
/* Compare additional sort keys */
tuple1 = (IndexTuple) a->tuple;
tuple2 = (IndexTuple) b->tuple;
keysz = state->nKeys;
tupDes = RelationGetDescr(state->indexRel);
scanKey++;
for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
{
Datum datum1,
datum2;
bool isnull1,
isnull2;
datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
/* see comments about NULLs handling in btbuild */
/* the comparison function is always of CMP type */
compare = inlineApplySortFunction(&scanKey->sk_func,
SORTFUNC_CMP,
datum1, isnull1,
datum2, isnull2);
if (compare != 0)
return compare; /* done when we find unequal attributes */
/* they are equal, so we only need to examine one null flag */
if (isnull1)
equal_hasnull = true;
}
/*
* If btree has asked us to enforce uniqueness, complain if two equal
* tuples are detected (unless there was at least one NULL field).
*
* It is sufficient to make the test here, because if two tuples are equal
* they *must* get compared at some stage of the sort --- otherwise the
* sort algorithm wouldn't have checked whether one must appear before the
* other.
*
* Some rather brain-dead implementations of qsort will sometimes call the
* comparison routine to compare a value to itself. (At this writing only
* QNX 4 is known to do such silly things; we don't support QNX anymore,
* but perhaps the behavior still exists elsewhere.) Don't raise a bogus
* error in that case.
*/
if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
ereport(ERROR,
(errcode(ERRCODE_UNIQUE_VIOLATION),
errmsg("could not create unique index"),
errdetail("Table contains duplicated values.")));
/*
* If key values are equal, we sort on ItemPointer. This does not affect
* validity of the finished index, but it offers cheap insurance against
* performance problems with bad qsort implementations that have trouble
* with large numbers of equal keys.
*/
{
BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
if (blk1 != blk2)
return (blk1 < blk2) ? -1 : 1;
}
{
OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
if (pos1 != pos2)
return (pos1 < pos2) ? -1 : 1;
}
return 0;
}
static void
copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
{
IndexTuple tuple = (IndexTuple) tup;
unsigned int tuplen = IndexTupleSize(tuple);
IndexTuple newtuple;
/* copy the tuple into sort storage */
newtuple = (IndexTuple) palloc(tuplen);
memcpy(newtuple, tuple, tuplen);
USEMEM(state, GetMemoryChunkSpace(newtuple));
stup->tuple = (void *) newtuple;
/* set up first-column key value */
stup->datum1 = index_getattr(newtuple,
1,
RelationGetDescr(state->indexRel),
&stup->isnull1);
}
static void
writetup_index(Tuplesortstate *state, LogicalTape *lt, SortTuple *stup)
{
IndexTuple tuple = (IndexTuple) stup->tuple;
unsigned int tuplen;
tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
LogicalTapeWrite(state->tapeset, lt, (void *) &tuplen, sizeof(tuplen));
LogicalTapeWrite(state->tapeset, lt, (void *) tuple, IndexTupleSize(tuple));
if (state->randomAccess) /* need trailing length word? */
LogicalTapeWrite(state->tapeset, lt, (void *) &tuplen, sizeof(tuplen));
FREEMEM(state, GetMemoryChunkSpace(tuple));
pfree(tuple);
}
static void
readtup_index(Tuplesortstate *state, TuplesortPos *pos, SortTuple *stup,
LogicalTape *lt, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
IndexTuple tuple = (IndexTuple) palloc(tuplen);
size_t readSize;
Assert(lt);
USEMEM(state, GetMemoryChunkSpace(tuple));
readSize = LogicalTapeRead(state->tapeset, lt, (void *)tuple, tuplen);
if(readSize != tuplen)
elog(ERROR, "unexpected end of data");
if (state->randomAccess) /* need trailing length word? */
{
readSize = LogicalTapeRead(state->tapeset, lt, (void *)&tuplen, sizeof(tuplen));
if (readSize != sizeof(tuplen))
elog(ERROR, "unexpected end of data");
}
stup->tuple = (void *) tuple;
/* set up first-column key value */
stup->datum1 = index_getattr(tuple,
1,
RelationGetDescr(state->indexRel),
&stup->isnull1);
}
/*
* Routines specialized for DatumTuple case
*/
static int
comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
/* Allow interrupting long sorts */
CHECK_FOR_INTERRUPTS();
return inlineApplySortFunction(&state->sortOpFn, state->sortFnKind,
a->datum1, a->isnull1,
b->datum1, b->isnull1);
}
static void
copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
{
/* Not currently needed */
elog(ERROR, "copytup_datum() should not be called");
}
static void
writetup_datum(Tuplesortstate *state, LogicalTape *lt, SortTuple *stup)
{
void *waddr;
unsigned int tuplen;
unsigned int writtenlen;
if (stup->isnull1)
{
waddr = NULL;
tuplen = 0;
}
else if (state->datumTypeByVal)
{
waddr = &stup->datum1;
tuplen = sizeof(Datum);
}
else
{
waddr = DatumGetPointer(stup->datum1);
tuplen = datumGetSize(stup->datum1, false, state->datumTypeLen);
Assert(tuplen != 0);
}
writtenlen = tuplen + sizeof(unsigned int);
LogicalTapeWrite(state->tapeset, lt, (void *) &writtenlen, sizeof(writtenlen));
LogicalTapeWrite(state->tapeset, lt, waddr, tuplen);
if (state->randomAccess) /* need trailing length word? */
LogicalTapeWrite(state->tapeset, lt, (void *) &writtenlen, sizeof(writtenlen));
if (stup->tuple)
{
FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
pfree(stup->tuple);
}
}
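/*
 * On-tape record layout produced above (Datum case):
 *
 *    [writtenlen] [payload] [writtenlen again if randomAccess]
 *
 * where the payload is empty for a NULL, the Datum itself for a
 * by-value type, and the pointed-to data otherwise. A NULL is thus
 * encoded simply as a record whose payload length is zero, which
 * readtup_datum() recognizes.
 */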
static void
readtup_datum(Tuplesortstate *state, TuplesortPos *pos, SortTuple *stup,
LogicalTape *lt, unsigned int len)
{
size_t readSize;
unsigned int tuplen = len - sizeof(unsigned int);
Assert(lt);
if (tuplen == 0)
{
/* it's NULL */
stup->datum1 = (Datum) 0;
stup->isnull1 = true;
stup->tuple = NULL;
}
else if (state->datumTypeByVal)
{
Assert(tuplen == sizeof(Datum));
readSize = LogicalTapeRead(state->tapeset, lt, (void *)&stup->datum1, tuplen);
if (readSize != tuplen)
elog(ERROR, "unexpected end of data");
stup->isnull1 = false;
stup->tuple = NULL;
}
else
{
void *raddr = palloc(tuplen);
readSize = LogicalTapeRead(state->tapeset, lt, raddr, tuplen);
if(readSize != tuplen)
elog(ERROR, "unexpected end of data");
stup->datum1 = PointerGetDatum(raddr);
stup->isnull1 = false;
stup->tuple = raddr;
USEMEM(state, GetMemoryChunkSpace(raddr));
}
if (state->randomAccess) /* need trailing length word? */
{
readSize = LogicalTapeRead(state->tapeset, lt, (void *)&tuplen, sizeof(tuplen));
if (readSize != sizeof(tuplen))
elog(ERROR, "unexpected end of data");
}
}
void
tuplesort_checksend_gpmonpkt(gpmon_packet_t *pkt, int *tick)
{
if(!pkt)
return;
if(gp_enable_gpperfmon)
{
if(*tick != gpmon_tick)
gpmon_send(pkt);
*tick = gpmon_tick;
}
}