| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * tuplesort.c |
| * Generalized tuple sorting routines. |
| * |
| * This module handles sorting of heap tuples, index tuples, or single |
| * Datums (and could easily support other kinds of sortable objects, |
| * if necessary). It works efficiently for both small and large amounts |
| * of data. Small amounts are sorted in-memory using qsort(). Large |
| * amounts are sorted using temporary files and a standard external sort |
| * algorithm. |
| * |
| * See Knuth, volume 3, for more than you want to know about the external |
| * sorting algorithm. We divide the input into sorted runs using replacement |
| * selection, in the form of a priority tree implemented as a heap |
| * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase |
| * merge, Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D |
| * are implemented by logtape.c, which avoids space wastage by recycling |
| * disk space as soon as each block is read from its "tape". |
| * |
| * We do not form the initial runs using Knuth's recommended replacement |
| * selection data structure (Algorithm 5.4.1R), because it uses a fixed |
| * number of records in memory at all times. Since we are dealing with |
| * tuples that may vary considerably in size, we want to be able to vary |
| * the number of records kept in memory to ensure full utilization of the |
| * allowed sort memory space. So, we keep the tuples in a variable-size |
| * heap, with the next record to go out at the top of the heap. Like |
| * Algorithm 5.4.1R, each record is stored with the run number that it |
| * must go into, and we use (run number, key) as the ordering key for the |
| * heap. When the run number at the top of the heap changes, we know that |
| * no more records of the prior run are left in the heap. |
| * |
| * The approximate amount of memory allowed for any one sort operation |
| * is specified in kilobytes by the caller (most pass work_mem). Initially, |
| * we absorb tuples and simply store them in an unsorted array as long as |
| * we haven't exceeded workMem. If we reach the end of the input without |
| * exceeding workMem, we sort the array using qsort() and subsequently return |
| * tuples just by scanning the tuple array sequentially. If we do exceed |
| * workMem, we construct a heap using Algorithm H and begin to emit tuples |
| * into sorted runs in temporary tapes, emitting just enough tuples at each |
| * step to get back within the workMem limit. Whenever the run number at |
| * the top of the heap changes, we begin a new run with a new output tape |
| * (selected per Algorithm D). After the end of the input is reached, |
| * we dump out remaining tuples in memory into a final run (or two), |
| * then merge the runs using Algorithm D. |
| * |
| * When merging runs, we use a heap containing just the frontmost tuple from |
| * each source run; we repeatedly output the smallest tuple and insert the |
| * next tuple from its source tape (if any). When the heap empties, the merge |
| * is complete. The basic merge algorithm thus needs very little memory --- |
| * only M tuples for an M-way merge, and M is constrained to a small number. |
| * However, we can still make good use of our full workMem allocation by |
| * pre-reading additional tuples from each source tape. Without prereading, |
| * our access pattern to the temporary file would be very erratic; on average |
| * we'd read one block from each of M source tapes during the same time that |
| * we're writing M blocks to the output tape, so there is no sequentiality of |
| * access at all, defeating the read-ahead methods used by most Unix kernels. |
| * Worse, the output tape gets written into a very random sequence of blocks |
| * of the temp file, ensuring that things will be even worse when it comes |
| * time to read that tape. A straightforward merge pass thus ends up doing a |
| * lot of waiting for disk seeks. We can improve matters by prereading from |
| * each source tape sequentially, loading about workMem/M bytes from each tape |
| * in turn. Then we run the merge algorithm, writing but not reading until |
| * one of the preloaded tuple series runs out. Then we switch back to preread |
| * mode, fill memory again, and repeat. This approach helps to localize both |
| * read and write accesses. |
| * |
| * When the caller requests random access to the sort result, we form |
| * the final sorted run on a logical tape which is then "frozen", so |
| * that we can access it randomly. When the caller does not need random |
| * access, we return from tuplesort_performsort() as soon as we are down |
| * to one run per logical tape. The final merge is then performed |
| * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this |
| * saves one cycle of writing all the data out to disk and reading it in. |
| * |
| * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the |
| * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according |
| * to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that |
| * tape drives are expensive beasts, and in particular that there will always |
| * be many more runs than tape drives. In our implementation a "tape drive" |
| * doesn't cost much more than a few Kb of memory buffers, so we can afford |
| * to have lots of them. In particular, if we can have as many tape drives |
| * as sorted runs, we can eliminate any repeated I/O at all. In the current |
| * code we determine the number of tapes M on the basis of workMem: we want |
| * workMem/M to be large enough that we read a fair amount of data each time |
| * we preread from a tape, so as to maintain the locality of access described |
| * above. Nonetheless, with large workMem we can have many tapes. |
| * |
| * |
| * Portions Copyright (c) 2007-2008, Greenplum inc |
| * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * IDENTIFICATION |
| * $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.70 2006/10/04 00:30:04 momjian Exp $ |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/heapam.h" |
| #include "access/nbtree.h" |
| #include "access/tuptoaster.h" |
| #include "catalog/pg_type.h" |
| #include "catalog/pg_amop.h" |
| #include "catalog/pg_operator.h" |
| #include "executor/instrument.h" /* Instrumentation */ |
| #include "lib/stringinfo.h" /* StringInfo */ |
| #include "executor/nodeSort.h" /* gpmon */ |
| #include "miscadmin.h" |
| #include "utils/datum.h" |
| #include "executor/execWorkfile.h" |
| #include "utils/logtape.h" |
| #include "utils/lsyscache.h" |
| #include "utils/memutils.h" |
| #include "utils/pg_rusage.h" |
| #include "utils/syscache.h" |
| #include "utils/tuplesort.h" |
| #include "utils/pg_locale.h" |
| #include "utils/builtins.h" |
| #include "utils/tuplesort_mk.h" |
| #include "utils/string_wrapper.h" |
| #include "utils/faultinjector.h" |
| |
| #include "cdb/cdbvars.h" |
| |
| /* |
| * Possible states of a Tuplesort object. These denote the states that |
| * persist between calls of Tuplesort routines. |
| */ |
| typedef enum |
| { |
| TSS_INITIAL, /* Loading tuples; still within memory limit */ |
| TSS_BUILDRUNS, /* Loading tuples; writing to tape */ |
| TSS_SORTEDINMEM, /* Sort completed entirely in memory */ |
| TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */ |
| TSS_FINALMERGE /* Performing final merge on-the-fly */ |
| } TupSortStatus; |
| |
| /* |
| * Parameters for calculation of number of tapes to use --- see inittapes() |
| * and tuplesort_merge_order(). |
| * |
| * In this calculation we assume that each tape will cost us about 3 blocks |
| * worth of buffer space (which is an underestimate for very large data |
| * volumes, but it's probably close enough --- see logtape.c). |
| * |
| * MERGE_BUFFER_SIZE is how much data we'd like to read from each input |
| * tape during a preread cycle (see discussion at top of file). |
| */ |
| #define MINORDER 6 /* minimum merge order */ |
| #define MAXORDER 250 /* maximum merge order */ |
| #define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3) |
| #define MERGE_BUFFER_SIZE (BLCKSZ * 32) |
| |
| // #define PRINT_SPILL_AND_MEMORY_MESSAGES |
| |
| /* |
| * Current position of Tuplesort operation. |
| */ |
| struct TuplesortPos_mk |
| { |
| /* |
| * These variables are used after completion of sorting to keep track of |
| * the next tuple to return. (In the tape case, the tape's current read |
| * position is also critical state.) |
| */ |
| int current; /* array index (only used if SORTEDINMEM) */ |
| bool eof_reached; /* reached EOF (needed for cursors) */ |
| |
| /* markpos_xxx holds marked position for mark and restore */ |
| union { |
| LogicalTapePos tapepos; |
| long mempos; |
| } markpos; |
| bool markpos_eof; |
| |
| LogicalTape* cur_work_tape; /* current tape that I am working on */ |
| }; |
| |
| /* Merge reader (read back from runs) context for mk_heap */ |
| typedef struct TupsortMergeReadCtxt |
| { |
| Tuplesortstate_mk *tsstate; |
| TuplesortPos_mk pos; |
| |
| /* Buffer for preread */ |
| MKEntry *p; |
| int allocsz; |
| int cnt; |
| int cur; |
| |
| int mem_allowed; |
| int mem_used; |
| } TupsortMergeReadCtxt; |
| |
| |
| /* |
| * Private state of a Tuplesort operation. |
| */ |
| struct Tuplesortstate_mk |
| { |
| TupSortStatus status; /* enumerated value as shown above */ |
| int nKeys; /* number of columns in sort key */ |
| bool randomAccess; /* did caller request random access? */ |
| |
| long memAllowed; |
| |
| int maxTapes; /* number of tapes (Knuth's T) */ |
| int tapeRange; /* maxTapes-1 (Knuth's P) */ |
| MemoryContext sortcontext; /* memory context holding all sort data */ |
| LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */ |
| |
| ScanState *ss; |
| |
| /* Representation of all spill file names, for spill file reuse */ |
| workfile_set *work_set; |
| |
| /* |
| * MUST be set |
| * |
| * Function to copy a supplied input tuple into palloc'd space and set up |
| * its SortTuple representation (ie, set tuple/datum1/isnull1). Also, |
| * state->availMem must be decreased by the amount of space used for the |
| * tuple copy (note the SortTuple struct itself is not counted). |
| */ |
| void (*copytup) (Tuplesortstate_mk *state, MKEntry *e, void *tup); |
| |
| /* |
| * MUST be set |
| * |
| * Function to write a stored tuple onto tape. The representation of the |
| * tuple on tape need not be the same as it is in memory; requirements on |
| * the tape representation are given below. After writing the tuple, |
| * pfree() the out-of-line data (not the SortTuple struct!), and increase |
| * state->availMem by the amount of memory space thereby released. |
| */ |
| long (*writetup) (Tuplesortstate_mk *state, LogicalTape *lt, MKEntry *e); |
| |
| /* |
| * MUST be set |
| * |
| * Function to read a stored tuple from tape back into memory. 'len' is |
| * the already-read length of the stored tuple. Create a palloc'd copy, |
| * initialize tuple/datum1/isnull1 in the target SortTuple struct, and |
| * decrease state->availMem by the amount of memory space consumed. |
| */ |
| void (*readtup) (Tuplesortstate_mk *state, TuplesortPos_mk *pos, MKEntry *e, |
| LogicalTape *lt, uint32 len); |
| |
| /* |
| * This array holds the tuples now in sort memory. If we are in state |
| * INITIAL, the tuples are in no particular order; if we are in state |
| * SORTEDINMEM, the tuples are in final sorted order; |
| */ |
| MKEntry *entries; |
| long entry_allocsize; |
| long entry_count; |
| |
| /* |
| * BUILDRUNS and FINALMERGE: Heap for producing and merging runs |
| */ |
| MKHeap *mkheap; |
| MKHeapReader *mkhreader; |
| TupsortMergeReadCtxt *mkhreader_ctxt; |
| int mkhreader_allocsize; |
| |
| /* |
| * MK context, holds info needed by compare/prepare functions |
| */ |
| MKContext mkctxt; |
| |
| /* |
| * A flag to indicate whether the stats for this tuplesort |
| * has been finalized. |
| */ |
| bool statsFinalized; |
| |
| int currentRun; |
| |
| /* |
| * Unless otherwise noted, all pointer variables below are pointers to |
| * arrays of length maxTapes, holding per-tape data. |
| */ |
| |
| /* |
| * These variables are only used during merge passes. mergeactive[i] is |
| * true if we are reading an input run from (actual) tape number i and |
| * have not yet exhausted that run. mergenext[i] is the memtuples index |
| * of the next pre-read tuple (next to be loaded into the heap) for tape |
| * i, or 0 if we are out of pre-read tuples. mergelast[i] similarly |
| * points to the last pre-read tuple from each tape. mergeavailslots[i] |
| * is the number of unused memtuples[] slots reserved for tape i, and |
| * mergeavailmem[i] is the amount of unused space allocated for tape i. |
| * mergefreelist and mergefirstfree keep track of unused locations in the |
| * memtuples[] array. The memtuples[].tupindex fields link together |
| * pre-read tuples for each tape as well as recycled locations in |
| * mergefreelist. It is OK to use 0 as a null link in these lists, because |
| * memtuples[0] is part of the merge heap and is never a pre-read tuple. |
| */ |
| bool *mergeactive; /* active input run source? */ |
| int *mergenext; /* first preread tuple for each source */ |
| int *mergelast; /* last preread tuple for each source */ |
| int *mergeavailslots; /* slots left for prereading each tape */ |
| long *mergeavailmem; /* availMem for prereading each tape */ |
| int mergefreelist; /* head of freelist of recycled slots */ |
| int mergefirstfree; /* first slot never used in this merge */ |
| |
| /* |
| * Variables for Algorithm D. Note that destTape is a "logical" tape |
| * number, ie, an index into the tp_xxx[] arrays. Be careful to keep |
| * "logical" and "actual" tape numbers straight! |
| */ |
| int Level; /* Knuth's l */ |
| int destTape; /* current output tape (Knuth's j, less 1) */ |
| int *tp_fib; /* Target Fibonacci run counts (A[]) */ |
| int *tp_runs; /* # of real runs on each tape */ |
| int *tp_dummy; /* # of dummy runs for each tape (D[]) */ |
| int *tp_tapenum; /* Actual tape numbers (TAPE[]) */ |
| int activeTapes; /* # of active input tapes in merge pass */ |
| |
| LogicalTape *result_tape; /* actual tape of finished output */ |
| TuplesortPos_mk pos; /* current postion */ |
| |
| /* |
| * Tuple desc and binding, for MemTuple. |
| */ |
| TupleDesc tupDesc; |
| MemTupleBinding *mt_bind; |
| ScanKey cmpScanKey; |
| |
| /* |
| * These variables are specific to the IndexTuple case; they are set by |
| * tuplesort_begin_index and used only by the IndexTuple routines. |
| */ |
| Relation indexRel; |
| |
| /* |
| * These variables are specific to the Datum case; they are set by |
| * tuplesort_begin_datum and used only by the DatumTuple routines. |
| */ |
| Oid datumType; |
| Oid sortOperator; |
| bool nullfirst; |
| |
| /* we need typelen and byval in order to know how to copy the Datums. */ |
| int datumTypeLen; |
| bool datumTypeByVal; |
| |
| /* |
| * CDB: EXPLAIN ANALYZE reporting interface and statistics. |
| */ |
| struct Instrumentation *instrument; |
| struct StringInfoData *explainbuf; |
| uint64 totalTupleBytes; |
| uint64 totalNumTuples; |
| uint64 numTuplesInMem; |
| uint64 memUsedBeforeSpill; /* memory that is used by Sort at the time of spilling */ |
| long arraySizeBeforeSpill; /* the value for entry_allocsize at the time of spilling */ |
| |
| /* |
| * File for dump/load logical tape set. Used by sharing sort across slice |
| */ |
| char *tapeset_file_prefix; |
| /* |
| * State file used to load a logical tape set. Used by sharing sort across slice |
| */ |
| ExecWorkFile *tapeset_state_file; |
| |
| /* Gpmon */ |
| gpmon_packet_t *gpmon_pkt; |
| int *gpmon_sort_tick; |
| }; |
| |
| static bool is_sortstate_rwfile(Tuplesortstate_mk *state) |
| { |
| return state->tapeset_state_file != NULL; |
| } |
| #ifdef USE_ASSERT_CHECKING |
| static bool is_under_sort_ctxt(Tuplesortstate_mk *state) |
| { |
| return CurrentMemoryContext == state->sortcontext; |
| } |
| #endif |
| |
| /** |
| * Any strings that are STRXFRM_INPUT_LENGTH_LIMIT or larger will store only the |
| * first STRXFRM_INPUT_LENGTH_LIMIT bytes of the transformed string. |
| * |
| * Note that this is actually less transformed data in some cases than if the string were |
| * just a little smaller than STRXFRM_INPUT_LENGTH_LIMIT. We can probably make the |
| * transition more gradual but will still want this fading off -- for long strings |
| * that differ within the first STRXFRM_INPUT_LENGTH_LIMIT bytes then the prefix |
| * will be sufficient for all but equal cases -- in which case a longer prefix does |
| * not help (we must resort to datum for comparison) |
| * |
| * If less than the whole transformed size is stored then the transformed string itself is also |
| * not copied -- so for large strings we must resort to datum-based comparison. |
| */ |
| #define STRXFRM_INPUT_LENGTH_LIMIT (512) |
| #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup)) |
| #define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup)) |
| #define READTUP(state,pos,stup,tape,len) ((*(state)->readtup) (state, pos, stup, tape, len)) |
| |
| static inline bool LACKMEM_WITH_ESTIMATE(Tuplesortstate_mk *state) |
| { |
| /* Assert: lackmem is only effective during initial run build because |
| * we don't maintain the estimate after that. */ |
| Assert(state->status == TSS_INITIAL); |
| |
| return MemoryContextGetCurrentSpace(state->sortcontext) + state->mkctxt.estimatedExtraForPrep > state->memAllowed; |
| } |
| |
| /* |
| * NOTES about on-tape representation of tuples: |
| * |
| * We require the first "unsigned int" of a stored tuple to be the total size |
| * on-tape of the tuple, including itself (so it is never zero; an all-zero |
| * unsigned int is used to delimit runs). The remainder of the stored tuple |
| * may or may not match the in-memory representation of the tuple --- |
| * any conversion needed is the job of the writetup and readtup routines. |
| * |
| * If state->randomAccess is true, then the stored representation of the |
| * tuple must be followed by another "unsigned int" that is a copy of the |
| * length --- so the total tape space used is actually sizeof(unsigned int) |
| * more than the stored length value. This allows read-backwards. When |
| * randomAccess is not true, the write/read routines may omit the extra |
| * length word. |
| * |
| * writetup is expected to write both length words as well as the tuple |
| * data. When readtup is called, the tape is positioned just after the |
| * front length word; readtup must read the tuple data and advance past |
| * the back length word (if present). |
| * |
| * The write/read routines can make use of the tuple description data |
| * stored in the Tuplesortstate_mk record, if needed. They are also expected |
| * to adjust state->availMem by the amount of memory space (not tape space!) |
| * released or consumed. There is no error return from either writetup |
| * or readtup; they should ereport() on failure. |
| * |
| * |
| * NOTES about memory consumption calculations: |
| * |
| * We count space allocated for tuples against the workMem limit, plus |
| * the space used by the variable-size memtuples array. Fixed-size space |
| * is not counted; it's small enough to not be interesting. |
| * |
| * Note that we count actual space used (as shown by GetMemoryChunkSpace) |
| * rather than the originally-requested size. This is important since |
| * palloc can add substantial overhead. It's not a complete answer since |
| * we won't count any wasted space in palloc allocation blocks, but it's |
| * a lot better than what we were doing before 7.3. |
| */ |
| |
| static Tuplesortstate_mk *tuplesort_begin_common(ScanState * ss, int workMem, bool randomAccess, bool allocmemtuple); |
| static void puttuple_common(Tuplesortstate_mk *state, MKEntry *e); |
| static void selectnewtape_mk(Tuplesortstate_mk *state); |
| static void mergeruns(Tuplesortstate_mk *state); |
| static void beginmerge(Tuplesortstate_mk *state); |
| static uint32 getlen(Tuplesortstate_mk *state, TuplesortPos_mk *pos, LogicalTape *lt, bool eofOK); |
| static void markrunend(Tuplesortstate_mk *state, int tapenum); |
| |
| static void copytup_heap(Tuplesortstate_mk *state, MKEntry *e, void *tup); |
| static long writetup_heap(Tuplesortstate_mk *state, LogicalTape *lt, MKEntry *e); |
| static void freetup_heap(MKEntry *e); |
| static void readtup_heap(Tuplesortstate_mk *state, TuplesortPos_mk *pos, MKEntry *e, |
| LogicalTape *lt, uint32 len); |
| |
| static void copytup_index(Tuplesortstate_mk *state, MKEntry *e, void *tup); |
| static long writetup_index(Tuplesortstate_mk *state, LogicalTape *lt, MKEntry *e); |
| static void freetup_index(MKEntry *e); |
| static void readtup_index(Tuplesortstate_mk *state, TuplesortPos_mk *pos, MKEntry *e, |
| LogicalTape *lt, uint32 len); |
| |
| static void freetup_noop(MKEntry *e); |
| |
| static void copytup_datum(Tuplesortstate_mk *state, MKEntry *e, void *tup); |
| static long writetup_datum(Tuplesortstate_mk *state, LogicalTape *lt, MKEntry *e); |
| static void freetup_datum(MKEntry *e); |
| static void readtup_datum(Tuplesortstate_mk *state, TuplesortPos_mk *pos, MKEntry *e, |
| LogicalTape *lt, uint32 len); |
| |
| static void tupsort_prepare_char(MKEntry *a, bool isChar); |
| static int tupsort_compare_char(MKEntry *v1, MKEntry *v2, MKLvContext *lvctxt, MKContext *mkContext); |
| |
| static Datum tupsort_fetch_datum_mtup(MKEntry *a, MKContext *mkctxt, MKLvContext *lvctxt, bool *isNullOut); |
| static Datum tupsort_fetch_datum_itup(MKEntry *a, MKContext *mkctxt, MKLvContext *lvctxt, bool *isNullOut); |
| |
| static int32 estimateMaxPrepareSizeForEntry(MKEntry *a, struct MKContext *mkctxt); |
| static int32 estimatePrepareSpaceForChar(struct MKContext *mkContext, MKEntry *e, Datum d, bool isCHAR); |
| |
| static void tuplesort_inmem_limit_insert(Tuplesortstate_mk *state, MKEntry *e); |
| static void tuplesort_inmem_nolimit_insert(Tuplesortstate_mk * state, MKEntry * e); |
| static void tuplesort_heap_insert(Tuplesortstate_mk *state, MKEntry *e); |
| static void tuplesort_limit_sort(Tuplesortstate_mk *state); |
| |
| static void tupsort_refcnt(void *vp, int ref); |
| |
| /* Declare the following as extern so that older dtrace will not complain */ |
| extern void inittapes_mk(Tuplesortstate_mk *state, const char* rwfile_prefix); |
| extern void dumptuples_mk(Tuplesortstate_mk *state, bool alltuples); |
| extern void mergeonerun_mk(Tuplesortstate_mk *state); |
| |
| extern void mkheap_verify_heap(MKHeap *heap, int top); |
| |
| /* |
| * tuplesort_begin_xxx |
| * |
| * Initialize for a tuple sort operation. |
| * |
| * After calling tuplesort_begin, the caller should call tuplesort_putXXX |
| * zero or more times, then call tuplesort_performsort when all the tuples |
| * have been supplied. After performsort, retrieve the tuples in sorted |
| * order by calling tuplesort_getXXX until it returns false/NULL. (If random |
| * access was requested, rescan, markpos, and restorepos can also be called.) |
| * Call tuplesort_end to terminate the operation and release memory/disk space. |
| * |
| * Each variant of tuplesort_begin has a workMem parameter specifying the |
| * maximum number of kilobytes of RAM to use before spilling data to disk. |
| * (The normal value of this parameter is work_mem, but some callers use |
| * other values.) Each variant also has a randomAccess parameter specifying |
| * whether the caller needs non-sequential access to the sort result. |
| * |
| * CDB: During EXPLAIN ANALYZE, after tuplesort_begin_xxx() the caller should |
| * use tuplesort_set_instrument() (q.v.) to enable statistical reporting. |
| */ |
| |
| static Tuplesortstate_mk * |
| tuplesort_begin_common(ScanState * ss, int workMem, bool randomAccess, bool allocmemtuple) |
| { |
| Tuplesortstate_mk *state; |
| MemoryContext sortcontext; |
| MemoryContext oldcontext; |
| |
| /* |
| * Create a working memory context for this sort operation. All data |
| * needed by the sort will live inside this context. |
| */ |
| sortcontext = AllocSetContextCreate(CurrentMemoryContext, |
| "TupleSort", |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE); |
| |
| /* |
| * Make the Tuplesortstate_mk within the per-sort context. This way, we |
| * don't need a separate pfree() operation for it at shutdown. |
| */ |
| oldcontext = MemoryContextSwitchTo(sortcontext); |
| |
| /* |
| * palloc0, need to zero out memeories, so we do not need to set |
| * every var of the tuplestate to 0 below. |
| */ |
| state = (Tuplesortstate_mk *) palloc0(sizeof(Tuplesortstate_mk)); |
| |
| state->status = TSS_INITIAL; |
| state->randomAccess = randomAccess; |
| state->memAllowed = workMem * 1024L; |
| |
| state->work_set = NULL; |
| state->ss = ss; |
| |
| state->sortcontext = sortcontext; |
| |
| if(allocmemtuple) |
| { |
| int i; |
| state->entry_allocsize = 1024; |
| state->entries = (MKEntry *) palloc0(state->entry_allocsize * sizeof(MKEntry)); |
| |
| for(i=0; i<state->entry_allocsize; ++i) |
| mke_blank(state->entries+i); |
| |
| /* workMem must be large enough for the minimal memtuples array */ |
| if (LACKMEM_WITH_ESTIMATE(state)) |
| elog(ERROR, "insufficient memory allowed for sort"); |
| } |
| |
| /* |
| * maxTapes, tapeRange, and Algorithm D variables will be initialized by |
| * inittapes(), if needed |
| */ |
| MemoryContextSwitchTo(oldcontext); |
| |
| Assert(!state->statsFinalized); |
| |
| return state; |
| } |
| |
| |
| /* |
| * Initialize some extra CDB attributes for the sort, including limit |
| * and uniqueness. Should do this after begin_heap. |
| * |
| */ |
| void |
| cdb_tuplesort_init_mk(Tuplesortstate_mk *state, |
| int64 offset, int64 limit, int unique, int sort_flags, |
| int64 maxdistinct) |
| { |
| UnusedArg(sort_flags); |
| UnusedArg(maxdistinct); |
| |
| if(limit) |
| { |
| int64 uselimit = offset + limit; |
| |
| /* Only use limit for less than 10 million */ |
| if(uselimit < 10000000) |
| { |
| state->mkctxt.limit = (int32) uselimit; |
| state->mkctxt.limitmask = -1; |
| } |
| } |
| |
| if(unique) |
| state->mkctxt.unique = true; |
| } |
| |
| /* make a copy of current state pos */ |
| void tuplesort_begin_pos_mk(Tuplesortstate_mk *st, TuplesortPos_mk **pos) |
| { |
| TuplesortPos_mk *st_pos; |
| Assert(st); |
| |
| st_pos = (TuplesortPos_mk *) palloc(sizeof(TuplesortPos_mk)); |
| memcpy(st_pos, &(st->pos), sizeof(TuplesortPos_mk)); |
| |
| if(st->tapeset) |
| st_pos->cur_work_tape = LogicalTapeSetDuplicateTape(st->tapeset, st->result_tape); |
| |
| *pos = st_pos; |
| } |
| |
| void |
| create_mksort_context( |
| MKContext *mkctxt, |
| int nkeys, |
| MKFetchDatumForPrepare fetchForPrep, MKFreeTuple freeTupleFn, TupleDesc tupdesc, bool tbyv, int tlen, |
| Oid *sortOperators, |
| AttrNumber *attNums, |
| ScanKey sk) |
| { |
| int i; |
| |
| Assert(mkctxt); |
| mkctxt->total_lv = nkeys; |
| mkctxt->lvctxt = (MKLvContext *) palloc0(sizeof(MKLvContext) * nkeys); |
| |
| AssertEquivalent(sortOperators == NULL, sk != NULL); |
| AssertEquivalent(fetchForPrep==NULL, tupdesc==NULL); |
| |
| mkctxt->fetchForPrep = fetchForPrep; |
| mkctxt->tupdesc = tupdesc; |
| if (tupdesc) |
| mkctxt->mt_bind = create_memtuple_binding(tupdesc); |
| |
| mkctxt->cpfr = tupsort_cpfr; |
| mkctxt->freeTup = freeTupleFn; |
| mkctxt->estimatedExtraForPrep = 0; |
| |
| lc_guess_strxfrm_scaling_factor(&mkctxt->strxfrmScaleFactor, &mkctxt->strxfrmConstantFactor); |
| |
| for(i=0; i<nkeys; ++i) |
| { |
| RegProcedure sortFunction; |
| MKLvContext *sinfo = mkctxt->lvctxt+i; |
| |
| if (sortOperators) |
| { |
| Assert(sortOperators[i] != 0); |
| |
| /* Select a sort function */ |
| SelectSortFunction(sortOperators[i], &sortFunction, &sinfo->sortfnkind); |
| fmgr_info(sortFunction, &sinfo->fmgrinfo); |
| |
| /* Hacking in null first/null last. Untill parser implements null first/last, |
| * we will keep the old postgres behaviour. |
| */ |
| if(sinfo->sortfnkind == SORTFUNC_REVLT || sinfo->sortfnkind == SORTFUNC_REVCMP) |
| sinfo->nullfirst = true; |
| else |
| sinfo->nullfirst = false; |
| } |
| else |
| { |
| sinfo->sortfnkind = SORTFUNC_CMP; |
| sinfo->nullfirst = false; |
| sinfo->fmgrinfo = sk[i].sk_func; /* structural copy */ |
| } |
| |
| AssertImply(attNums, attNums[i] != 0); |
| /* Per lv context info. Need later for prep */ |
| sinfo->attno = attNums ? attNums[i] : i+1; |
| |
| sinfo->lvtype = MKLV_TYPE_NONE; |
| |
| if (tupdesc) |
| { |
| sinfo->typByVal = tupdesc->attrs[sinfo->attno-1]->attbyval; |
| sinfo->typLen = tupdesc->attrs[sinfo->attno-1]->attlen; |
| |
| if (sinfo->fmgrinfo.fn_addr == btint4cmp) |
| sinfo->lvtype = MKLV_TYPE_INT32; |
| if (!lc_collate_is_c()) |
| { |
| if(sinfo->fmgrinfo.fn_addr == bpcharcmp) |
| sinfo->lvtype = MKLV_TYPE_CHAR; |
| else if (sinfo->fmgrinfo.fn_addr == bttextcmp) |
| sinfo->lvtype = MKLV_TYPE_TEXT; |
| } |
| } |
| else |
| { |
| sinfo->typByVal = tbyv; |
| sinfo->typLen = tlen; |
| } |
| sinfo->mkctxt = mkctxt; |
| } |
| } |
| |
| Tuplesortstate_mk * |
| tuplesort_begin_heap_mk(ScanState *ss, |
| TupleDesc tupDesc, |
| int nkeys, |
| Oid *sortOperators, AttrNumber *attNums, |
| int workMem, bool randomAccess) |
| { |
| |
| Tuplesortstate_mk *state = tuplesort_begin_common(ss, workMem, randomAccess, true); |
| MemoryContext oldcontext; |
| |
| oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| |
| AssertArg(nkeys > 0); |
| |
| if (trace_sort) |
| PG_TRACE3(tuplesort__begin, nkeys, workMem, randomAccess); |
| |
| state->nKeys = nkeys; |
| state->copytup = copytup_heap; |
| state->writetup = writetup_heap; |
| state->readtup = readtup_heap; |
| |
| state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ |
| state->mt_bind = create_memtuple_binding(tupDesc); |
| state->cmpScanKey = NULL; |
| |
| create_mksort_context( |
| &state->mkctxt, |
| nkeys, |
| tupsort_fetch_datum_mtup, |
| freetup_heap, |
| tupDesc, 0, 0, |
| sortOperators, attNums, |
| NULL); |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| return state; |
| } |
| |
| Tuplesortstate_mk * |
| tuplesort_begin_heap_file_readerwriter_mk(ScanState *ss, |
| const char *rwfile_prefix, bool isWriter, |
| TupleDesc tupDesc, |
| int nkeys, |
| Oid *sortOperators, AttrNumber *attNums, |
| int workMem, bool randomAccess) |
| { |
| Tuplesortstate_mk *state; |
| char statedump[MAXPGPATH]; |
| char full_prefix[MAXPGPATH]; |
| |
| Assert(randomAccess); |
| |
| int len = snprintf(statedump, sizeof(statedump), "%s/%s_sortstate", |
| PG_TEMP_FILES_DIR, |
| rwfile_prefix); |
| insist_log(len <= MAXPGPATH - 1, "could not generate temporary file name"); |
| |
| len = snprintf(full_prefix, sizeof(full_prefix), "%s/%s", |
| PG_TEMP_FILES_DIR, |
| rwfile_prefix); |
| insist_log(len <= MAXPGPATH - 1, "could not generate temporary file name"); |
| |
| |
| if(isWriter) |
| { |
| /* |
| * Writer is a ordinary tuplesort, except the underlying buf file are named by |
| * rwfile_prefix. |
| */ |
| state = tuplesort_begin_heap_mk(ss, tupDesc, nkeys, sortOperators, attNums, workMem, randomAccess); |
| |
| state->tapeset_file_prefix = MemoryContextStrdup(state->sortcontext, full_prefix); |
| |
| state->tapeset_state_file = ExecWorkFile_Create(statedump, |
| BUFFILE, |
| true /* delOnClose */ , |
| 0 /* compressType */ ); |
| Assert(state->tapeset_state_file != NULL); |
| |
| return state; |
| } |
| else |
| { |
| |
| /* |
| * For reader, we really don't know anything about sort op, attNums, etc. |
| * All the readers cares are the data on the logical tape set. The state |
| * of the logical tape set has been dumped, so we load it back and that is |
| * it. |
| */ |
| MemoryContext oldctxt; |
| state = tuplesort_begin_common(ss, workMem, randomAccess, false); |
| state->status = TSS_SORTEDONTAPE; |
| state->randomAccess = true; |
| |
| state->readtup = readtup_heap; |
| |
| oldctxt = MemoryContextSwitchTo(state->sortcontext); |
| |
| state->tapeset_file_prefix = MemoryContextStrdup(state->sortcontext, full_prefix); |
| |
| state->tapeset_state_file = ExecWorkFile_Open(statedump, |
| BUFFILE, |
| false /* delOnClose */, |
| 0 /* compressType */); |
| ExecWorkFile *tapefile = ExecWorkFile_Open(full_prefix, |
| BUFFILE, |
| false /* delOnClose */, |
| 0 /* compressType */); |
| |
| state->tapeset = LoadLogicalTapeSetState(state->tapeset_state_file, tapefile); |
| state->currentRun = 0; |
| state->result_tape = LogicalTapeSetGetTape(state->tapeset, 0); |
| |
| state->pos.eof_reached =false; |
| state->pos.markpos.tapepos.blkNum = 0; |
| state->pos.markpos.tapepos.offset = 0; |
| state->pos.markpos.mempos = 0; |
| state->pos.markpos_eof = false; |
| state->pos.cur_work_tape = NULL; |
| |
| MemoryContextSwitchTo(oldctxt); |
| return state; |
| } |
| } |
| |
| Tuplesortstate_mk * |
| tuplesort_begin_index_mk(Relation indexRel, |
| bool enforceUnique, |
| int workMem, bool randomAccess) |
| { |
| Tuplesortstate_mk *state = tuplesort_begin_common(NULL, workMem, randomAccess, true); |
| MemoryContext oldcontext; |
| TupleDesc tupdesc; |
| |
| oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| |
| if (trace_sort) |
| PG_TRACE3(tuplesort__begin, enforceUnique, workMem, randomAccess); |
| |
| state->nKeys = RelationGetNumberOfAttributes(indexRel); |
| tupdesc = RelationGetDescr(indexRel); |
| |
| state->copytup = copytup_index; |
| state->writetup = writetup_index; |
| state->readtup = readtup_index; |
| |
| state->indexRel = indexRel; |
| state->cmpScanKey = _bt_mkscankey_nodata(indexRel); |
| |
| create_mksort_context( |
| &state->mkctxt, |
| state->nKeys, |
| tupsort_fetch_datum_itup, |
| freetup_index, |
| tupdesc, 0, 0, |
| NULL, NULL, state->cmpScanKey); |
| |
| state->mkctxt.enforceUnique = enforceUnique; |
| MemoryContextSwitchTo(oldcontext); |
| |
| return state; |
| } |
| |
| Tuplesortstate_mk * |
| tuplesort_begin_datum_mk(ScanState * ss, |
| Oid datumType, |
| Oid sortOperator, |
| int workMem, bool randomAccess) |
| { |
| Tuplesortstate_mk *state = tuplesort_begin_common(ss, workMem, randomAccess, true); |
| MemoryContext oldcontext; |
| int16 typlen; |
| bool typbyval; |
| |
| oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| |
| if (trace_sort) |
| PG_TRACE3(tuplesort__begin, datumType, workMem, randomAccess); |
| |
| state->nKeys = 1; /* always a one-column sort */ |
| |
| state->copytup = copytup_datum; |
| state->writetup = writetup_datum; |
| state->readtup = readtup_datum; |
| |
| state->datumType = datumType; |
| |
| /* lookup necessary attributes of the datum type */ |
| get_typlenbyval(datumType, &typlen, &typbyval); |
| state->datumTypeLen = typlen; |
| state->datumTypeByVal = typbyval; |
| |
| state->sortOperator = sortOperator; |
| state->cmpScanKey = NULL; |
| create_mksort_context( |
| &state->mkctxt, |
| 1, |
| NULL, /* tupsort_prepare_datum, */ |
| typbyval ? freetup_noop : freetup_datum, |
| NULL, typbyval, typlen, |
| &sortOperator, NULL, NULL); |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| return state; |
| } |
| |
| /* |
| * tuplesort_end |
| * |
| * Release resources and clean up. |
| * |
| * NOTE: after calling this, any pointers returned by tuplesort_getXXX are |
| * pointing to garbage. Be careful not to attempt to use or free such |
| * pointers afterwards! |
| */ |
| void |
| tuplesort_end_mk(Tuplesortstate_mk *state) |
| { |
| long spaceUsed; |
| |
| if (state->tapeset) |
| spaceUsed = LogicalTapeSetBlocks(state->tapeset); |
| else |
| spaceUsed = (MemoryContextGetCurrentSpace(state->sortcontext) + 1024) / 1024; |
| |
| /* |
| * Delete temporary "tape" files, if any. |
| * |
| * Note: want to include this in reported total cost of sort, hence need |
| * for two #ifdef TRACE_SORT sections. |
| */ |
| if (state->tapeset) |
| { |
| LogicalTapeSetClose(state->tapeset, state->work_set); |
| state->tapeset = NULL; |
| |
| if (state->tapeset_state_file) |
| { |
| workfile_mgr_close_file(state->work_set, state->tapeset_state_file); |
| } |
| } |
| |
| |
| if (state->work_set) |
| { |
| workfile_mgr_close_set(state->work_set); |
| } |
| |
| tuplesort_finalize_stats_mk(state); |
| |
| if (trace_sort) |
| PG_TRACE2(tuplesort__end, state->tapeset ? 1 : 0, spaceUsed); |
| |
| /* |
| * Free the per-sort memory context, thereby releasing all working memory, |
| * including the Tuplesortstate_mk struct itself. |
| */ |
| MemoryContextDelete(state->sortcontext); |
| } |
| |
| /* |
| * tuplesort_finalize_stats_mk |
| * |
| * Finalize the EXPLAIN ANALYZE stats. |
| */ |
| void |
| tuplesort_finalize_stats_mk(Tuplesortstate_mk *state) |
| { |
| if (state->instrument && !state->statsFinalized) |
| { |
| Size maxSpaceUsedOnSort = MemoryContextGetPeakSpace(state->sortcontext); |
| |
| /* Report executor memory used by our memory context. */ |
| state->instrument->execmemused += (double)maxSpaceUsedOnSort; |
| |
| if (state->instrument->workmemused < maxSpaceUsedOnSort) |
| { |
| state->instrument->workmemused = maxSpaceUsedOnSort; |
| } |
| |
| if (state->numTuplesInMem < state->totalNumTuples) |
| { |
| uint64 mem_for_metadata = sizeof(Tuplesortstate_mk) + |
| state->arraySizeBeforeSpill * sizeof(MKEntry); |
| double tupleRatio = ((double)state->totalNumTuples) / ((double)state->numTuplesInMem); |
| |
| /* |
| * The memwanted is summed up of the following: |
| * (1) metadata |
| * (2) the array size |
| * (3) the prorated number of bytes for all tuples, estimated from |
| * the memUsedBeforeSpill. Note that because of our memory allocation |
| * algorithm, the used memory for tuples may be much larger than |
| * the actual bytes needed for tuples. |
| * (4) the prorated number of bytes for extra space needed. |
| */ |
| uint64 memwanted = |
| sizeof(Tuplesortstate_mk) /* (1) */ + |
| state->totalNumTuples * sizeof(MKEntry) /* (2) */ + |
| (uint64)(tupleRatio * (double)(state->memUsedBeforeSpill - mem_for_metadata)) /* (3) */ + |
| (uint64)(tupleRatio * (double)state->mkctxt.estimatedExtraForPrep) /* (4) */ ; |
| |
| state->instrument->workmemwanted = |
| Max(state->instrument->workmemwanted, memwanted); |
| } |
| |
| state->statsFinalized = true; |
| } |
| } |
| |
| |
| |
| /* |
| * tuplesort_set_instrument |
| * |
| * May be called after tuplesort_begin_xxx() to enable reporting of |
| * statistics and events for EXPLAIN ANALYZE. |
| * |
| * The 'instr' and 'explainbuf' ptrs are retained in the 'state' object for |
| * possible use anytime during the sort, up to and including tuplesort_end(). |
| * The caller must ensure that the referenced objects remain allocated and |
| * valid for the life of the Tuplesortstate_mk object; or if they are to be |
| * freed early, disconnect them by calling again with NULL pointers. |
| */ |
| void |
| tuplesort_set_instrument_mk(Tuplesortstate_mk *state, |
| struct Instrumentation *instrument, |
| struct StringInfoData *explainbuf) |
| { |
| state->instrument = instrument; |
| state->explainbuf = explainbuf; |
| } /* tuplesort_set_instrument */ |
| |
| /* |
| * Accept one tuple while collecting input data for sort. |
| * |
| * Note that the input data is always copied; the caller need not save it. |
| */ |
| void |
| tuplesort_puttupleslot_mk(Tuplesortstate_mk *state, TupleTableSlot *slot) |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| MKEntry e; |
| mke_blank(&e); |
| |
| COPYTUP(state, &e, (void *) slot); |
| puttuple_common(state, &e); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* |
| * Accept one index tuple while collecting input data for sort. |
| * |
| * Note that the input tuple is always copied; the caller need not save it. |
| */ |
| void |
| tuplesort_putindextuple_mk(Tuplesortstate_mk *state, IndexTuple tuple) |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| MKEntry e; |
| mke_blank(&e); |
| |
| COPYTUP(state, &e, (void *) tuple); |
| puttuple_common(state, &e); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* |
| * Accept one Datum while collecting input data for sort. |
| * |
| * If the Datum is pass-by-ref type, the value will be copied. |
| */ |
| void |
| tuplesort_putdatum_mk(Tuplesortstate_mk *state, Datum val, bool isNull) |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| MKEntry e; |
| mke_blank(&e); |
| |
| /* |
| * If it's a pass-by-reference value, copy it into memory we control, and |
| * decrease availMem. Then call the common code. |
| */ |
| if (isNull || state->datumTypeByVal) |
| { |
| e.d = val; |
| if(isNull) |
| mke_set_null(&e, state->nullfirst); |
| else |
| mke_set_not_null(&e); |
| } |
| else |
| { |
| mke_set_not_null(&e); |
| e.d = datumCopy(val, false, state->datumTypeLen); |
| state->totalTupleBytes += state->datumTypeLen; |
| } |
| |
| puttuple_common(state, &e); |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* |
| * grow_unsorted_array |
| * Grow the unsorted array to allow more entries to be inserted later. |
| |
| * If there are no enough memory available for the growth, this function |
| * returns false. Otherwsie, returns true. |
| */ |
| static bool |
| grow_unsorted_array(Tuplesortstate_mk *state) |
| { |
| /* |
| * We want to grow the array twice as large as the previous one. However, |
| * when we are close to the memory limit, we do not want to do so. Otherwise, |
| * too much memory got allocated for the metadata, not to the tuple itself. |
| * We estimate the maximum number of entries that is possible under the |
| * current memory limit by considering both metadata and tuple size. |
| */ |
| if (state->memAllowed < MemoryContextGetCurrentSpace(state->sortcontext)) |
| return false; |
| |
| uint64 availMem = state->memAllowed - MemoryContextGetCurrentSpace(state->sortcontext); |
| uint64 avgTupSize = (uint64)(((double)state->totalTupleBytes) / ((double)state->totalNumTuples)); |
| Assert(avgTupSize >= 0); |
| uint64 avgExtraForPrep = (uint64) (((double)state->mkctxt.estimatedExtraForPrep) / ((double)state->totalNumTuples)); |
| |
| if ((availMem / (sizeof(MKEntry) + avgTupSize + avgExtraForPrep)) == 0) |
| return false; |
| |
| int maxNumEntries = state->entry_allocsize + (availMem / (sizeof(MKEntry) + avgTupSize + avgExtraForPrep)); |
| int newNumEntries = Min(maxNumEntries, state->entry_allocsize * 2); |
| |
| state->entries = (MKEntry *)repalloc(state->entries, newNumEntries * sizeof(MKEntry)); |
| for (int entryNo = state->entry_allocsize; entryNo < newNumEntries; entryNo++) |
| mke_blank(state->entries + entryNo); |
| |
| state->entry_allocsize = newNumEntries; |
| |
| return true; |
| } |
| |
| |
| /* |
| * Shared code for tuple and datum cases. |
| */ |
| static void |
| puttuple_common(Tuplesortstate_mk *state, MKEntry *e) |
| { |
| Assert(is_under_sort_ctxt(state)); |
| state->totalNumTuples++; |
| |
| if(state->gpmon_pkt) |
| Gpmon_M_Incr(state->gpmon_pkt, GPMON_QEXEC_M_ROWSIN); |
| |
| bool growSucceed = true; |
| |
| switch (state->status) |
| { |
| case TSS_INITIAL: |
| |
| /* |
| * Save the tuple into the unsorted array. First, grow the array |
| * as needed. Note that we try to grow the array when there is |
| * still one free slot remaining --- if we fail, there'll still be |
| * room to store the incoming tuple, and then we'll switch to |
| * tape-based operation. |
| */ |
| if (!state->mkheap && state->entry_count >= state->entry_allocsize - 1) |
| { |
| growSucceed = grow_unsorted_array(state); |
| } |
| |
| /* full sort? */ |
| if (state->mkctxt.limit == 0) |
| { |
| tuplesort_inmem_nolimit_insert(state,e); |
| } |
| else |
| { |
| /* Limit sort insert */ |
| tuplesort_inmem_limit_insert(state,e); |
| } |
| |
| /* If out of work_mem, switch to diskmode */ |
| if(!growSucceed) |
| { |
| if (state->mkheap != NULL) |
| { |
| /* Corner case. LIMIT == amount of entries in memory. |
| * In this case, we failed to grow array, but we just created |
| * the heap. No need to spill in this case, we'll just use the in-memory heap. */ |
| Assert(state->mkctxt.limit != 0); |
| Assert(state->numTuplesInMem == state->mkheap->count); |
| } |
| else |
| { |
| Assert(state->mkheap == NULL); |
| Assert(state->entry_count > 0); |
| state->arraySizeBeforeSpill = state->entry_allocsize; |
| state->memUsedBeforeSpill = MemoryContextGetPeakSpace(state->sortcontext); |
| inittapes_mk(state, is_sortstate_rwfile(state) ? state->tapeset_file_prefix : NULL); |
| Assert(state->status == TSS_BUILDRUNS); |
| } |
| |
| if (state->instrument) |
| { |
| state->instrument->workfileCreated = true; |
| } |
| } |
| |
| break; |
| |
| case TSS_BUILDRUNS: |
| |
| /* |
| * Insert the tuple into the heap |
| */ |
| Assert(state->mkheap); |
| tuplesort_heap_insert(state, e); |
| break; |
| default: |
| elog(ERROR, "invalid tuplesort state"); |
| break; |
| } |
| } |
| |
| /* |
| * All tuples have been provided; finish the sort. |
| */ |
| void |
| tuplesort_performsort_mk(Tuplesortstate_mk *state) |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| |
| if (trace_sort) |
| PG_TRACE(tuplesort__perform__sort); |
| |
| switch (state->status) |
| { |
| case TSS_INITIAL: |
| /* |
| * We were able to accumulate all the tuples within the allowed |
| * amount of memory. Just qsort 'em and we're done. |
| */ |
| if(state->mkctxt.limit == 0) |
| mk_qsort(state->entries, state->entry_count, &state->mkctxt); |
| else |
| tuplesort_limit_sort(state); |
| |
| state->pos.current = 0; |
| state->pos.eof_reached = false; |
| state->pos.markpos.mempos = 0; |
| state->pos.markpos_eof = false; |
| state->status = TSS_SORTEDINMEM; |
| |
| /* Not shareinput sort, we are done. */ |
| if(!is_sortstate_rwfile(state)) |
| break; |
| |
| /* Shareinput sort, need to put this stuff onto disk */ |
| inittapes_mk(state, state->tapeset_file_prefix); |
| /* Fall through */ |
| case TSS_BUILDRUNS: |
| |
| #ifdef PRINT_SPILL_AND_MEMORY_MESSAGES |
| elog(INFO, "Done building runs. Mem peak is now %ld", (long)MemoryContextGetPeakSpace(state->sortcontext)); |
| #endif // PRINT_SPILL_AND_MEMORY_MESSAGES |
| |
| dumptuples_mk(state, true); |
| |
| #ifdef PRINT_SPILL_AND_MEMORY_MESSAGES |
| elog(INFO, "Done tuple dump. Mem peak is now %ld", (long)MemoryContextGetPeakSpace(state->sortcontext)); |
| #endif // PRINT_SPILL_AND_MEMORY_MESSAGES |
| |
| HOLD_INTERRUPTS(); |
| /* MPP-18288: Do not change this log message, it is used to test mksort query cancellation */ |
| elog(DEBUG1,"ExecSort: mksort starting merge runs >>======== "); |
| RESUME_INTERRUPTS(); |
| |
| mergeruns(state); |
| |
| HOLD_INTERRUPTS(); |
| /* MPP-18288: Do not change this log message, it is used to test mksort query cancellation */ |
| elog(DEBUG1, "ExecSort: mksort finished merge runs ++++++++>> "); |
| RESUME_INTERRUPTS(); |
| |
| state->pos.eof_reached = false; |
| state->pos.markpos.tapepos.blkNum = 0; |
| state->pos.markpos.tapepos.offset = 0; |
| state->pos.markpos_eof = false; |
| |
| /* |
| * If we're planning to reuse the spill files from this sort, |
| * save metadata here and mark work_set complete. |
| */ |
| if (gp_workfile_caching && state->work_set) |
| { |
| tuplesort_write_spill_metadata_mk(state); |
| |
| /* We don't know how to handle TSS_FINALMERGE yet */ |
| Assert(state->status == TSS_SORTEDONTAPE); |
| Assert(state->work_set); |
| |
| workfile_mgr_mark_complete(state->work_set); |
| } |
| |
| break; |
| |
| default: |
| elog(ERROR, "Invalid tuplesort state"); |
| break; |
| } |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| void tuplesort_flush_mk(Tuplesortstate_mk *state) |
| { |
| Assert(state->status == TSS_SORTEDONTAPE); |
| Assert(state->tapeset && state->tapeset_state_file); |
| Assert(state->pos.cur_work_tape == NULL); |
| elog(gp_workfile_caching_loglevel, "tuplesort_mk: writing logical tape state to file"); |
| LogicalTapeFlush(state->tapeset, state->result_tape, state->tapeset_state_file); |
| ExecWorkFile_Flush(state->tapeset_state_file); |
| } |
| |
| /* |
| * Internal routine to fetch the next tuple in either forward or back |
| * direction into *stup. Returns FALSE if no more tuples. |
| * If *should_free is set, the caller must pfree stup.tuple when done with it. |
| */ |
| static bool |
| tuplesort_gettuple_common_pos(Tuplesortstate_mk *state, TuplesortPos_mk *pos, |
| bool forward, MKEntry *e, bool *should_free) |
| { |
| uint32 tuplen; |
| LogicalTape *work_tape; |
| bool fOK; |
| |
| Assert(is_under_sort_ctxt(state)); |
| |
| switch (state->status) |
| { |
| case TSS_SORTEDINMEM: |
| Assert(forward || state->randomAccess); |
| *should_free = false; |
| |
| if (forward) |
| { |
| if ( state->mkctxt.unique) |
| { |
| /** |
| * request to skip duplicates. qsort has already replaced all but one of each |
| * values with empty so skip the empties |
| * When limit heap sort was used, the limit heap sort should have done this |
| */ |
| while ( pos->current < state->entry_count && |
| mke_is_empty( &state->entries[pos->current])) |
| { |
| pos->current++; |
| } |
| } |
| |
| if (pos->current < state->entry_count) |
| { |
| *e = state->entries[pos->current]; |
| pos->current++; |
| return true; |
| } |
| pos->eof_reached = true; |
| return false; |
| } |
| else |
| { |
| if (pos->current <= 0) |
| return false; |
| |
| /* |
| * if all tuples are fetched already then we return last |
| * tuple, else - tuple before last returned. |
| */ |
| if (pos->eof_reached) |
| pos->eof_reached = false; |
| else |
| { |
| pos->current--; /* pos->current points to one above the last returned tuple */ |
| if (pos->current <= 0) |
| return false; |
| } |
| |
| if ( state->mkctxt.unique) |
| { |
| /** |
| * request to skip duplicates. qsort has already replaced all but one of each |
| * values with empty so skip the empties |
| * When limit heap sort was used, the limit heap sort should have done this |
| */ |
| while ( pos->current - 1 >= 0 && |
| mke_is_empty( &state->entries[pos->current - 1])) |
| { |
| pos->current--; |
| } |
| if ( pos->current <= 0) |
| return false; |
| } |
| |
| *e = state->entries[pos->current-1]; |
| |
| return true; |
| } |
| break; |
| |
| case TSS_SORTEDONTAPE: |
| AssertEquivalent((pos == &state->pos), (pos->cur_work_tape == NULL)); |
| Assert(forward || state->randomAccess); |
| *should_free = true; |
| work_tape = pos->cur_work_tape == NULL ? state->result_tape : pos->cur_work_tape; |
| |
| if (forward) |
| { |
| if (pos->eof_reached) |
| return false; |
| if ((tuplen = getlen(state, pos, work_tape, true)) != 0) |
| { |
| READTUP(state, pos, e, work_tape, tuplen); |
| return true; |
| } |
| else |
| { |
| pos->eof_reached = true; |
| return false; |
| } |
| } |
| |
| /* |
| * Backward. |
| * |
| * if all tuples are fetched already then we return last tuple, |
| * else - tuple before last returned. |
| */ |
| /* |
| * Seek position is pointing just past the zero tuplen at the |
| * end of file; back up to fetch last tuple's ending length |
| * word. If seek fails we must have a completely empty file. |
| */ |
| fOK = LogicalTapeBackspace(state->tapeset, work_tape, 2*sizeof(uint32)); |
| if(!fOK) |
| return false; |
| |
| if (pos->eof_reached) |
| { |
| pos->eof_reached = false; |
| } |
| else |
| { |
| tuplen = getlen(state, pos, work_tape, false); |
| |
| /* |
| * Back up to get ending length word of tuple before it. |
| */ |
| fOK = LogicalTapeBackspace(state->tapeset, work_tape, tuplen + 2 *sizeof(uint32)); |
| |
| if (!fOK) |
| { |
| /* |
| * If that fails, presumably the prev tuple is the first |
| * in the file. Back up so that it becomes next to read |
| * in forward direction (not obviously right, but that is |
| * what in-memory case does). |
| */ |
| fOK = LogicalTapeBackspace(state->tapeset, work_tape, tuplen + 2 *sizeof(uint32)); |
| if(!fOK) |
| elog(ERROR, "bogus tuple length in backward scan"); |
| |
| return false; |
| } |
| } |
| |
| tuplen = getlen(state, pos, work_tape, false); |
| |
| /* |
| * Now we have the length of the prior tuple, back up and read it. |
| * Note: READTUP expects we are positioned after the initial |
| * length word of the tuple, so back up to that point. |
| */ |
| fOK = LogicalTapeBackspace(state->tapeset, work_tape, tuplen); |
| if (!fOK) |
| elog(ERROR, "bogus tuple length in backward scan"); |
| |
| READTUP(state, pos, e, work_tape, tuplen); |
| return true; |
| |
| case TSS_FINALMERGE: |
| Assert(forward); |
| Assert(pos == &state->pos && pos->cur_work_tape == NULL); |
| |
| *should_free = true; |
| mkheap_putAndGet(state->mkheap, e); |
| return !mke_is_empty(e); |
| |
| default: |
| elog(ERROR, "invalid tuplesort state"); |
| return false; /* keep compiler quiet */ |
| } |
| } |
| |
| /* |
| * Fetch the next tuple in either forward or back direction. |
| * If successful, put tuple in slot and return TRUE; else, clear the slot |
| * and return FALSE. |
| */ |
| bool |
| tuplesort_gettupleslot_mk(Tuplesortstate_mk *state, bool forward, |
| TupleTableSlot *slot) |
| { |
| return tuplesort_gettupleslot_pos_mk(state, &state->pos, forward, slot); |
| } |
| |
| bool |
| tuplesort_gettupleslot_pos_mk(Tuplesortstate_mk *state, TuplesortPos_mk *pos, |
| bool forward, TupleTableSlot *slot) |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| MKEntry e; |
| |
| bool should_free = false; |
| bool fOK; |
| |
| mke_set_empty(&e); |
| fOK = tuplesort_gettuple_common_pos(state, pos, forward, &e, &should_free); |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| if (fOK) |
| { |
| Assert(!mke_is_empty(&e)); |
| ExecStoreMemTuple(e.ptr, slot, should_free); |
| |
| #ifdef USE_ASSERT_CHECKING |
| if (should_free && state->mkheap != NULL && state->mkheap->count > 0) |
| { |
| Assert(e.ptr != NULL && |
| (e.ptr != state->mkheap->lvtops->ptr)); |
| } |
| |
| #endif |
| |
| if(state->gpmon_pkt) |
| Gpmon_M_Incr_Rows_Out(state->gpmon_pkt); |
| |
| return true; |
| } |
| |
| ExecClearTuple(slot); |
| return false; |
| } |
| |
| /* |
| * Fetch the next index tuple in either forward or back direction. |
| * Returns NULL if no more tuples. If *should_free is set, the |
| * caller must pfree the returned tuple when done with it. |
| */ |
| IndexTuple |
| tuplesort_getindextuple_mk(Tuplesortstate_mk *state, bool forward, |
| bool *should_free) |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| MKEntry e; |
| bool fOK = tuplesort_gettuple_common_pos(state, &state->pos, forward, &e, should_free); |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| if(fOK) |
| return (IndexTuple) (e.ptr); |
| return NULL; |
| } |
| |
| /* |
| * Fetch the next Datum in either forward or back direction. |
| * Returns FALSE if no more datums. |
| * |
| * If the Datum is pass-by-ref type, the returned value is freshly palloc'd |
| * and is now owned by the caller. |
| */ |
| bool |
| tuplesort_getdatum_mk(Tuplesortstate_mk *state, bool forward, |
| Datum *val, bool *isNull) |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| MKEntry e; |
| bool should_free = false; |
| |
| bool fOK = tuplesort_gettuple_common_pos(state, &state->pos, forward, &e, &should_free); |
| |
| if(fOK) |
| { |
| *isNull = mke_is_null(&e); |
| if(*isNull || state->datumTypeByVal || should_free) |
| *val = e.d; |
| else |
| *val = datumCopy(e.d, false, state->datumTypeLen); |
| } |
| |
| MemoryContextSwitchTo(oldcontext); |
| return fOK; |
| } |
| |
| /* |
| * tuplesort_merge_order - report merge order we'll use for given memory |
| * (note: "merge order" just means the number of input tapes in the merge). |
| * |
| * This is exported for use by the planner. allowedMem is in bytes. |
| */ |
| int |
| tuplesort_merge_order(long allowedMem) |
| { |
| int mOrder; |
| |
| /* |
| * We need one tape for each merge input, plus another one for the output, |
| * and each of these tapes needs buffer space. In addition we want |
| * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't |
| * count). |
| * |
| * Note: you might be thinking we need to account for the memtuples[] |
| * array in this calculation, but we effectively treat that as part of the |
| * MERGE_BUFFER_SIZE workspace. |
| */ |
| mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) / |
| (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD); |
| |
| mOrder = Max(mOrder, MINORDER); |
| mOrder = Min(mOrder, MAXORDER); |
| |
| return mOrder; |
| } |
| |
| /* |
| * inittapes - initialize for tape sorting. |
| * |
| * This is called only if we have found we don't have room to sort in memory. |
| */ |
| void |
| inittapes_mk(Tuplesortstate_mk *state, const char* rwfile_prefix) |
| { |
| int maxTapes; |
| int j; |
| long tapeSpace; |
| |
| Assert(is_under_sort_ctxt(state)); |
| |
| /* Compute number of tapes to use: merge order plus 1 */ |
| maxTapes = tuplesort_merge_order(state->memAllowed) + 1; |
| |
| #ifdef PRINT_SPILL_AND_MEMORY_MESSAGES |
| elog(INFO, "Spilling after %d", (int) state->entry_count); |
| #endif // PRINT_SPILL_AND_MEMORY_MESSAGES |
| /* |
| * We must have at least 2*maxTapes slots in the memtuples[] array, else |
| * we'd not have room for merge heap plus preread. It seems unlikely that |
| * this case would ever occur, but be safe. |
| */ |
| maxTapes = Min(maxTapes, state->entry_count / 2); |
| maxTapes = Max(maxTapes, 1); |
| |
| #ifdef PRINT_SPILL_AND_MEMORY_MESSAGES |
| elog(INFO, " maxtapes %d Mem peak is now %ld", maxTapes, (long)MemoryContextGetPeakSpace(state->sortcontext)); |
| #endif // PRINT_SPILL_AND_MEMORY_MESSAGES |
| |
| /* XXX XXX: with losers, only need 1x slots because we don't need a merge heap */ |
| |
| state->maxTapes = maxTapes; |
| state->tapeRange = maxTapes - 1; |
| |
| if (trace_sort) |
| PG_TRACE1(tuplesort__switch__external, maxTapes); |
| |
| /* |
| * Decrease availMem to reflect the space needed for tape buffers; but |
| * don't decrease it to the point that we have no room for tuples. (That |
| * case is only likely to occur if sorting pass-by-value Datums; in all |
| * other scenarios the memtuples[] array is unlikely to occupy more than |
| * half of allowedMem. In the pass-by-value case it's not important to |
| * account for tuple space, so we don't care if LACKMEM becomes |
| * inaccurate.) |
| */ |
| tapeSpace = maxTapes * TAPE_BUFFER_OVERHEAD; |
| |
| Assert(state->work_set == NULL); |
| PlanState *ps = NULL; |
| bool can_be_reused = false; |
| if (state->ss != NULL) |
| { |
| ps = &state->ss->ps; |
| Sort *node = (Sort *) ps->plan; |
| if (node->share_type == SHARE_NOTSHARED) |
| { |
| /* Only attempt to cache when not shared under a ShareInputScan */ |
| can_be_reused = true; |
| } |
| } |
| |
| /* |
| * Create the tape set and allocate the per-tape data arrays. |
| */ |
| if(!rwfile_prefix) |
| { |
| state->work_set = workfile_mgr_create_set(BUFFILE, can_be_reused, ps, NULL_SNAPSHOT); |
| state->tapeset_state_file = workfile_mgr_create_fileno(state->work_set, WORKFILE_NUM_MKSORT_METADATA); |
| |
| ExecWorkFile *tape_file = workfile_mgr_create_fileno(state->work_set, WORKFILE_NUM_MKSORT_TAPESET); |
| state->tapeset = LogicalTapeSetCreate_File(tape_file, maxTapes); |
| } |
| else |
| { |
| /* We are shared XSLICE, use given prefix to create files so that consumers can find them */ |
| ExecWorkFile *tape_file = ExecWorkFile_Create(rwfile_prefix, |
| BUFFILE, |
| true /* delOnClose */, |
| 0 /* compressType */); |
| |
| state->tapeset = LogicalTapeSetCreate_File(tape_file, maxTapes); |
| } |
| |
| state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool)); |
| state->mergenext = (int *) palloc0(maxTapes * sizeof(int)); |
| state->mergelast = (int *) palloc0(maxTapes * sizeof(int)); |
| state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int)); |
| state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long)); |
| state->tp_fib = (int *) palloc0(maxTapes * sizeof(int)); |
| state->tp_runs = (int *) palloc0(maxTapes * sizeof(int)); |
| state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int)); |
| state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int)); |
| |
| /* |
| * Convert the unsorted contents of memtuples[] into a heap. Each tuple is |
| * marked as belonging to run number zero. |
| * |
| * NOTE: we pass false for checkIndex since there's no point in comparing |
| * indexes in this step, even though we do intend the indexes to be part |
| * of the sort key... |
| */ |
| Assert(state->mkheap == NULL); |
| Assert(state->status == TSS_INITIAL || state->status == TSS_SORTEDINMEM); |
| |
| #ifdef PRINT_SPILL_AND_MEMORY_MESSAGES |
| elog(INFO, " about to make heap mem peak is now %ld", (long)MemoryContextGetPeakSpace(CurrentMemoryContext)); |
| #endif // PRINT_SPILL_AND_MEMORY_MESSAGES |
| |
| if(state->status == TSS_INITIAL) |
| { |
| /* |
| * We are now building heap from array for run-building using |
| * replacement-selection algorithm. Such run-building heap need |
| * to be a MIN-HEAP, but for limit sorting, we use a MAX-HEAP. The way we |
| * convert MIN-HEAP to MAX-HEAP is by setting the limitmask to -1, and then |
| * using the limitmask in mkheap_compare() in tuplesort_mkheap.c. So, to restore |
| * the MAX-HEAP to a MIN-HEAP, we can just revert the limitmask to 0. |
| * This is needed for MPP-19310 and MPP-19857 |
| */ |
| state->mkctxt.limitmask = 0; |
| |
| state->mkheap = mkheap_from_array(state->entries, state->entry_allocsize, state->entry_count, &state->mkctxt); |
| state->entries = NULL; |
| state->entry_allocsize = 0; |
| state->entry_count = 0; |
| } |
| |
| state->currentRun = 0; |
| |
| /* |
| * Initialize variables of Algorithm D (step D1). |
| */ |
| for (j = 0; j < maxTapes; j++) |
| { |
| state->tp_fib[j] = 1; |
| state->tp_runs[j] = 0; |
| state->tp_dummy[j] = 1; |
| state->tp_tapenum[j] = j; |
| } |
| state->tp_fib[state->tapeRange] = 0; |
| state->tp_dummy[state->tapeRange] = 0; |
| |
| state->Level = 1; |
| state->destTape = 0; |
| |
| state->status = TSS_BUILDRUNS; |
| #ifdef PRINT_SPILL_AND_MEMORY_MESSAGES |
| elog(INFO, " build-run ready mem peak is now %ld", (long)MemoryContextGetPeakSpace(state->sortcontext)); |
| #endif // PRINT_SPILL_AND_MEMORY_MESSAGES |
| |
| } |
| |
| /* |
| * selectnewtape -- select new tape for new initial run. |
| * |
| * This is called after finishing a run when we know another run |
| * must be started. This implements steps D3, D4 of Algorithm D. |
| */ |
| static void |
| selectnewtape_mk(Tuplesortstate_mk *state) |
| { |
| int j; |
| int a; |
| |
| /* Step D3: advance j (destTape) */ |
| if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1]) |
| { |
| state->destTape++; |
| return; |
| } |
| if (state->tp_dummy[state->destTape] != 0) |
| { |
| state->destTape = 0; |
| return; |
| } |
| |
| /* Step D4: increase level */ |
| state->Level++; |
| a = state->tp_fib[0]; |
| for (j = 0; j < state->tapeRange; j++) |
| { |
| state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j]; |
| state->tp_fib[j] = a + state->tp_fib[j + 1]; |
| } |
| state->destTape = 0; |
| } |
| |
| /* |
| * mergeruns -- merge all the completed initial runs. |
| * |
| * This implements steps D5, D6 of Algorithm D. All input data has |
| * already been written to initial runs on tape (see dumptuples). |
| */ |
| static void |
| mergeruns(Tuplesortstate_mk *state) |
| { |
| int tapenum, |
| svTape, |
| svRuns, |
| svDummy; |
| LogicalTape *lt = NULL; |
| |
| Assert(state->status == TSS_BUILDRUNS); |
| |
| #ifdef FAULT_INJECTOR |
| /* |
| * MPP-18288: We're injecting an interrupt here. We have to hold interrupts |
| * while we're injecting it to make sure the interrupt is not handled |
| * within the fault injector itself. |
| */ |
| HOLD_INTERRUPTS(); |
| FaultInjector_InjectFaultIfSet( |
| ExecSortMKSortMergeRuns, |
| DDLNotSpecified, |
| "", // databaseName |
| ""); // tableName |
| RESUME_INTERRUPTS(); |
| #endif |
| |
| /* |
| * If we produced only one initial run (quite likely if the total data |
| * volume is between 1X and 2X workMem), we can just use that tape as the |
| * finished output, rather than doing a useless merge. (This obvious |
| * optimization is not in Knuth's algorithm.) |
| */ |
| if (state->currentRun == 1) |
| { |
| state->result_tape = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->destTape]); |
| /* must freeze and rewind the finished output tape */ |
| LogicalTapeFreeze(state->tapeset, state->result_tape); |
| state->status = TSS_SORTEDONTAPE; |
| return; |
| } |
| |
| /* End of step D2: rewind all output tapes to prepare for merging */ |
| for (tapenum = 0; tapenum < state->tapeRange; tapenum++) |
| { |
| lt = LogicalTapeSetGetTape(state->tapeset, tapenum); |
| LogicalTapeRewind(state->tapeset, lt, false); |
| } |
| |
| /* Clear gpmon for respilling data */ |
| if(state->gpmon_pkt) |
| { |
| Gpmon_M_Incr(state->gpmon_pkt, GPMON_SORT_SPILLPASS); |
| Gpmon_M_Reset(state->gpmon_pkt, GPMON_SORT_CURRSPILLPASS_TUPLE); |
| Gpmon_M_Reset(state->gpmon_pkt, GPMON_SORT_CURRSPILLPASS_BYTE); |
| } |
| |
| for (;;) |
| { |
| /* |
| * At this point we know that tape[T] is empty. If there's just one |
| * (real or dummy) run left on each input tape, then only one merge |
| * pass remains. If we don't have to produce a materialized sorted |
| * tape, we can stop at this point and do the final merge on-the-fly. |
| */ |
| |
| /* If workfile caching is enabled, always do the final merging |
| * and store the sorted result on disk, instead of stopping before the |
| * last merge iteration. |
| * This can cause some slowdown compared to no workfile caching, but |
| * it enables us to re-use the mechanism to dump and restore logical |
| * tape set information as-is. |
| */ |
| if (!state->randomAccess && !gp_workfile_caching) |
| { |
| bool allOneRun = true; |
| |
| Assert(state->tp_runs[state->tapeRange] == 0); |
| for (tapenum = 0; tapenum < state->tapeRange; tapenum++) |
| { |
| if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1) |
| { |
| allOneRun = false; |
| break; |
| } |
| } |
| if (allOneRun) |
| { |
| /* Tell logtape.c we won't be writing anymore */ |
| LogicalTapeSetForgetFreeSpace(state->tapeset); |
| /* Initialize for the final merge pass */ |
| beginmerge(state); |
| state->status = TSS_FINALMERGE; |
| return; |
| } |
| } |
| |
| /* Step D5: merge runs onto tape[T] until tape[P] is empty */ |
| while (state->tp_runs[state->tapeRange - 1] || |
| state->tp_dummy[state->tapeRange - 1]) |
| { |
| bool allDummy = true; |
| |
| for (tapenum = 0; tapenum < state->tapeRange; tapenum++) |
| { |
| if (state->tp_dummy[tapenum] == 0) |
| { |
| allDummy = false; |
| break; |
| } |
| } |
| |
| if (allDummy) |
| { |
| state->tp_dummy[state->tapeRange]++; |
| for (tapenum = 0; tapenum < state->tapeRange; tapenum++) |
| state->tp_dummy[tapenum]--; |
| } |
| else |
| mergeonerun_mk(state); |
| } |
| |
| /* Step D6: decrease level */ |
| if (--state->Level == 0) |
| break; |
| |
| /* rewind output tape T to use as new input */ |
| lt = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->tapeRange]); |
| LogicalTapeRewind(state->tapeset, lt, false); |
| |
| /* rewind used-up input tape P, and prepare it for write pass */ |
| lt = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->tapeRange - 1]); |
| LogicalTapeRewind(state->tapeset, lt, true); |
| |
| state->tp_runs[state->tapeRange - 1] = 0; |
| |
| /* |
| * reassign tape units per step D6; note we no longer care about A[] |
| */ |
| svTape = state->tp_tapenum[state->tapeRange]; |
| svDummy = state->tp_dummy[state->tapeRange]; |
| svRuns = state->tp_runs[state->tapeRange]; |
| for (tapenum = state->tapeRange; tapenum > 0; tapenum--) |
| { |
| state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1]; |
| state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1]; |
| state->tp_runs[tapenum] = state->tp_runs[tapenum - 1]; |
| } |
| state->tp_tapenum[0] = svTape; |
| state->tp_dummy[0] = svDummy; |
| state->tp_runs[0] = svRuns; |
| } |
| |
| /* |
| * Done. Knuth says that the result is on TAPE[1], but since we exited |
| * the loop without performing the last iteration of step D6, we have not |
| * rearranged the tape unit assignment, and therefore the result is on |
| * TAPE[T]. We need to do it this way so that we can freeze the final |
| * output tape while rewinding it. The last iteration of step D6 would be |
| * a waste of cycles anyway... |
| */ |
| state->result_tape = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->tapeRange]); |
| LogicalTapeFreeze(state->tapeset, state->result_tape); |
| state->status = TSS_SORTEDONTAPE; |
| } |
| |
| /* |
| * Merge one run from each input tape, except ones with dummy runs. |
| * |
| * This is the inner loop of Algorithm D step D5. We know that the |
| * output tape is TAPE[T]. |
| */ |
| void |
| mergeonerun_mk(Tuplesortstate_mk *state) |
| { |
| int destTape = state->tp_tapenum[state->tapeRange]; |
| MKEntry e; |
| |
| LogicalTape *lt = NULL; |
| |
| /* |
| * Start the merge by loading one tuple from each active source tape into |
| * the heap. We can also decrease the input run/dummy run counts. |
| */ |
| beginmerge(state); |
| |
| /* |
| * Execute merge by repeatedly extracting lowest tuple in heap, writing it |
| * out, and replacing it with next tuple from same tape (if there is |
| * another one). |
| */ |
| lt = LogicalTapeSetGetTape(state->tapeset, destTape); |
| |
| Assert(state->mkheap); |
| |
| while (mkheap_putAndGet(state->mkheap, &e) >= 0) |
| WRITETUP(state, lt, &e); |
| |
| /* |
| * When the heap empties, we're done. Write an end-of-run marker on the |
| * output tape, and increment its count of real runs. |
| */ |
| markrunend(state, destTape); |
| state->tp_runs[state->tapeRange]++; |
| |
| if (trace_sort) |
| PG_TRACE1(tuplesort__mergeonerun, state->activeTapes); |
| } |
| |
| static bool tupsort_preread(TupsortMergeReadCtxt *ctxt) |
| { |
| uint32 tuplen; |
| |
| Assert(ctxt->mem_allowed > 0); |
| |
| if(!ctxt->pos.cur_work_tape || ctxt->pos.eof_reached) |
| return false; |
| |
| ctxt->mem_used = 0; |
| |
| Assert(ctxt->p && ctxt->allocsz > 0); |
| Assert(ctxt->mem_allowed > 0); |
| |
| for(ctxt->cnt=0; |
| ctxt->cnt<ctxt->allocsz && ctxt->mem_used < ctxt->mem_allowed; |
| ++ctxt->cnt) |
| { |
| tuplen = getlen(ctxt->tsstate, &ctxt->pos, ctxt->pos.cur_work_tape, true); |
| if(tuplen != 0) |
| { |
| MKEntry *e = ctxt->p + ctxt->cnt; |
| READTUP(ctxt->tsstate, &ctxt->pos, e, ctxt->pos.cur_work_tape, tuplen); |
| ctxt->mem_used += tuplen; |
| } |
| else |
| { |
| ctxt->pos.eof_reached = true; |
| break; |
| } |
| } |
| |
| ctxt->cur = 0; |
| return ctxt->cnt > ctxt->cur; |
| } |
| |
| static bool tupsort_mergeread(void *pvctxt, MKEntry *e) |
| { |
| TupsortMergeReadCtxt *ctxt = (TupsortMergeReadCtxt *) pvctxt; |
| |
| Assert(ctxt->mem_allowed > 0); |
| |
| if(ctxt->cur < ctxt->cnt) |
| { |
| *e = ctxt->p[ctxt->cur++]; |
| return true; |
| } |
| |
| if(!tupsort_preread(ctxt)) |
| return false; |
| |
| Assert(ctxt->cur == 0 && ctxt->cnt > 0); |
| *e = ctxt->p[ctxt->cur++]; |
| return true; |
| } |
| |
| /* |
| * beginmerge - initialize for a merge pass |
| * |
| * We decrease the counts of real and dummy runs for each tape, and mark |
| * which tapes contain active input runs in mergeactive[]. Then, load |
| * as many tuples as we can from each active input tape, and finally |
| * fill the merge heap with the first tuple from each active tape. |
| */ |
| static void |
| beginmerge(Tuplesortstate_mk *state) |
| { |
| int activeTapes; |
| int tapenum; |
| int srcTape; |
| int totalSlots; |
| int slotsPerTape; |
| long spacePerTape; |
| |
| int i; |
| |
| MemoryContext oldctxt; |
| |
| /* Heap should be empty here */ |
| Assert(mkheap_empty(state->mkheap)); |
| |
| /* Adjust run counts and mark the active tapes */ |
| memset(state->mergeactive, 0, |
| state->maxTapes * sizeof(*state->mergeactive)); |
| activeTapes = 0; |
| for (tapenum = 0; tapenum < state->tapeRange; tapenum++) |
| { |
| if (state->tp_dummy[tapenum] > 0) |
| state->tp_dummy[tapenum]--; |
| else |
| { |
| Assert(state->tp_runs[tapenum] > 0); |
| state->tp_runs[tapenum]--; |
| srcTape = state->tp_tapenum[tapenum]; |
| state->mergeactive[srcTape] = true; |
| activeTapes++; |
| } |
| } |
| state->activeTapes = activeTapes; |
| |
| /* Clear merge-pass state variables */ |
| memset(state->mergenext, 0, |
| state->maxTapes * sizeof(*state->mergenext)); |
| memset(state->mergelast, 0, |
| state->maxTapes * sizeof(*state->mergelast)); |
| state->mergefreelist = 0; /* nothing in the freelist */ |
| state->mergefirstfree = activeTapes; /* 1st slot avail for preread */ |
| |
| /* |
| * Initialize space allocation to let each active input tape have an equal |
| * share of preread space. |
| */ |
| Assert(activeTapes > 0); |
| totalSlots = (state->mkheap == NULL) ? state->entry_allocsize : state->mkheap->alloc_size; |
| slotsPerTape = (totalSlots - state->mergefirstfree) / activeTapes; |
| slotsPerTape = Max(slotsPerTape, 128); |
| spacePerTape = state->memAllowed / activeTapes; |
| |
| |
| oldctxt = MemoryContextSwitchTo(state->sortcontext); |
| if(state->mkheap) |
| { |
| mkheap_destroy(state->mkheap); |
| state->mkheap = NULL; |
| } |
| |
| if(state->mkhreader) |
| { |
| Assert(state->mkhreader_ctxt); |
| for(i=0; i<state->mkhreader_allocsize; ++i) |
| { |
| TupsortMergeReadCtxt *mkhr_ctxt = state->mkhreader_ctxt+i; |
| AssertEquivalent(mkhr_ctxt->p!=NULL, mkhr_ctxt->allocsz>0); |
| if(mkhr_ctxt->p) |
| pfree(mkhr_ctxt->p); |
| } |
| |
| pfree(state->mkhreader_ctxt); |
| pfree(state->mkhreader); |
| } |
| |
| state->mkhreader = palloc0(sizeof(MKHeapReader) * activeTapes); |
| state->mkhreader_ctxt = palloc0(sizeof(TupsortMergeReadCtxt) * activeTapes); |
| |
| for(i=0; i<activeTapes; ++i) |
| { |
| TupsortMergeReadCtxt *mkhr_ctxt = state->mkhreader_ctxt+i; |
| state->mkhreader[i].reader = tupsort_mergeread; |
| state->mkhreader[i].mkhr_ctxt = mkhr_ctxt; |
| mkhr_ctxt->tsstate = state; |
| } |
| |
| state->mkhreader_allocsize = activeTapes; |
| |
| for (i=0, srcTape = 0; srcTape < state->maxTapes; srcTape++) |
| { |
| if (state->mergeactive[srcTape]) |
| { |
| TupsortMergeReadCtxt *mkhr_ctxt = state->mkhreader_ctxt+i; |
| |
| mkhr_ctxt->pos.cur_work_tape = LogicalTapeSetGetTape(state->tapeset, srcTape); |
| mkhr_ctxt->pos.eof_reached = false; |
| mkhr_ctxt->mem_allowed = spacePerTape; |
| Assert(mkhr_ctxt->mem_allowed > 0); |
| mkhr_ctxt->mem_used = 0; |
| mkhr_ctxt->cur = 0; |
| mkhr_ctxt->cnt = 0; |
| |
| Assert(mkhr_ctxt->p == NULL); |
| Assert(slotsPerTape > 0); |
| mkhr_ctxt->p = (MKEntry *) palloc(sizeof(MKEntry) * slotsPerTape); |
| mkhr_ctxt->allocsz = slotsPerTape; |
| |
| ++i; |
| } |
| } |
| |
| Assert(i==activeTapes); |
| |
| state->mkheap = mkheap_from_reader(state->mkhreader, i, &state->mkctxt); |
| Assert(state->mkheap); |
| |
| MemoryContextSwitchTo(oldctxt); |
| } |
| |
| |
| /* |
| * dumptuples - remove tuples from heap and write to tape |
| * |
| * This is used during initial-run building, but not during merging. |
| * |
| * When alltuples = false, dump only enough tuples to get under the |
| * availMem limit (and leave at least one tuple in the heap in any case, |
| * since puttuple assumes it always has a tuple to compare to). We also |
| * insist there be at least one free slot in the memtuples[] array. |
| * |
| * When alltuples = true, dump everything currently in memory. |
| * (This case is only used at end of input data.) |
| * |
| * If we empty the heap, close out the current run and return (this should |
| * only happen at end of input data). If we see that the tuple run number |
| * at the top of the heap has changed, start a new run. |
| */ |
| void |
| dumptuples_mk(Tuplesortstate_mk *state, bool alltuples) |
| { |
| int i; |
| int bDumped = 0; |
| LogicalTape *lt = NULL; |
| MKEntry e; |
| |
| Assert(is_under_sort_ctxt(state)); |
| |
| if(alltuples && !state->mkheap) |
| { |
| /* Dump a sorted array. Must be shared input that sends up here */ |
| |
| /* ShareInput or sort: The sort may sort nothing, we still |
| * need to handle it here |
| */ |
| if(state->entry_count != 0) |
| { |
| lt = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->destTape]); |
| |
| if ( state->mkctxt.unique) |
| { |
| for(i=0; i<state->entry_count; ++i) |
| if ( ! mke_is_empty(&state->entries[i])) /* can be empty because the qsort may have marked duplicates */ |
| WRITETUP(state, lt, &state->entries[i]); |
| } |
| else |
| { |
| for(i=0; i<state->entry_count; ++i) |
| WRITETUP(state, lt, &state->entries[i]); |
| } |
| } |
| |
| markrunend(state, state->tp_tapenum[state->destTape]); |
| state->currentRun++; |
| state->tp_runs[state->destTape]++; |
| state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ |
| return; |
| } |
| |
| /* OK. Normal case */ |
| Assert(state->mkheap); |
| while (1) |
| { |
| if (mkheap_empty(state->mkheap)) |
| { |
| markrunend(state, state->tp_tapenum[state->destTape]); |
| state->currentRun++; |
| state->tp_runs[state->destTape]++; |
| state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ |
| |
| break; |
| } |
| |
| if (!bDumped) |
| bDumped = 1; |
| |
| /* |
| * Dump the heap's frontmost entry, and sift up to remove it from the |
| * heap. |
| */ |
| Assert(!mkheap_empty(state->mkheap)); |
| lt = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->destTape]); |
| |
| mke_set_empty(&e); |
| mke_set_run(&e, state->currentRun+1); |
| mkheap_putAndGet(state->mkheap, &e); |
| |
| Assert(!mke_is_empty(&e)); |
| |
| WRITETUP(state, lt, &e); |
| |
| /* |
| * If the heap is empty *or* top run number has changed, we've |
| * finished the current run. |
| */ |
| if(mkheap_empty(state->mkheap) || !mkheap_run_match(state->mkheap, state->currentRun)) |
| { |
| #ifdef USE_ASSERT_CHECKING |
| mkheap_verify_heap(state->mkheap, 0); |
| #endif |
| markrunend(state, state->tp_tapenum[state->destTape]); |
| state->currentRun++; |
| state->tp_runs[state->destTape]++; |
| state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ |
| |
| if (trace_sort) |
| PG_TRACE3(tuplesort__dumptuples, state->entry_count, state->currentRun, state->destTape); |
| |
| /* |
| * Done if heap is empty, else prepare for new run. |
| */ |
| if (mkheap_empty(state->mkheap)) |
| break; |
| |
| Assert(mkheap_run_match(state->mkheap, state->currentRun)); |
| selectnewtape_mk(state); |
| } |
| } |
| |
| if(state->gpmon_pkt) |
| tuplesort_checksend_gpmonpkt(state->gpmon_pkt, state->gpmon_sort_tick); |
| } |
| |
| /* |
| * Put pos at the begining of the tuplesort. Create pos->work_tape if necessary |
| */ |
| void |
| tuplesort_rescan_pos_mk(Tuplesortstate_mk *state, TuplesortPos_mk *pos) |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| |
| Assert(state->randomAccess); |
| |
| switch (state->status) |
| { |
| case TSS_SORTEDINMEM: |
| pos->current = 0; |
| pos->eof_reached = false; |
| pos->markpos.mempos = 0; |
| pos->markpos_eof = false; |
| pos->cur_work_tape = NULL; |
| break; |
| case TSS_SORTEDONTAPE: |
| if(pos == &state->pos) |
| { |
| Assert(pos->cur_work_tape == NULL); |
| LogicalTapeRewind(state->tapeset, state->result_tape, false); |
| } |
| else |
| { |
| if(pos->cur_work_tape == NULL) |
| pos->cur_work_tape = state->result_tape; |
| |
| LogicalTapeRewind(state->tapeset, pos->cur_work_tape, false); |
| } |
| |
| pos->eof_reached = false; |
| pos->markpos.tapepos.blkNum = 0L; |
| pos->markpos.tapepos.offset = 0; |
| pos->markpos_eof = false; |
| break; |
| default: |
| elog(ERROR, "invalid tuplesort state"); |
| break; |
| } |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* |
| * tuplesort_rescan - rewind and replay the scan |
| */ |
| |
| void tuplesort_rescan_mk(Tuplesortstate_mk *state) |
| { |
| tuplesort_rescan_pos_mk(state, &state->pos); |
| } |
| |
| /* |
| * tuplesort_markpos - saves current position in the merged sort file |
| */ |
| void |
| tuplesort_markpos_pos_mk(Tuplesortstate_mk *state, TuplesortPos_mk *pos) |
| { |
| LogicalTape *work_tape = NULL; |
| MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| |
| Assert(state->randomAccess); |
| |
| switch (state->status) |
| { |
| case TSS_SORTEDINMEM: |
| pos->markpos.mempos = pos->current; |
| pos->markpos_eof = pos->eof_reached; |
| break; |
| case TSS_SORTEDONTAPE: |
| AssertEquivalent(pos == &state->pos, pos->cur_work_tape == NULL); |
| work_tape = pos->cur_work_tape == NULL ? state->result_tape : pos->cur_work_tape; |
| LogicalTapeTell(state->tapeset, work_tape, &pos->markpos.tapepos); |
| pos->markpos_eof = pos->eof_reached; |
| break; |
| default: |
| elog(ERROR, "invalid tuplesort state"); |
| break; |
| } |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| void |
| tuplesort_markpos_mk(Tuplesortstate_mk *state) |
| { |
| tuplesort_markpos_pos_mk(state, &state->pos); |
| } |
| |
| /* |
| * tuplesort_restorepos - restores current position in merged sort file to |
| * last saved position |
| */ |
| void |
| tuplesort_restorepos_pos_mk(Tuplesortstate_mk *state, TuplesortPos_mk *pos) |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); |
| |
| Assert(state->randomAccess); |
| |
| switch (state->status) |
| { |
| case TSS_SORTEDINMEM: |
| pos->current = pos->markpos.mempos; |
| pos->eof_reached = pos->markpos_eof; |
| break; |
| case TSS_SORTEDONTAPE: |
| AssertEquivalent(pos == &state->pos, pos->cur_work_tape == NULL); |
| { |
| LogicalTape *work_tape = pos->cur_work_tape == NULL ? state->result_tape : pos->cur_work_tape; |
| bool fSeekOK = LogicalTapeSeek(state->tapeset, work_tape, &pos->markpos.tapepos); |
| |
| if(!fSeekOK) |
| elog(ERROR, "tuplesort_restorepos failed"); |
| pos->eof_reached = pos->markpos_eof; |
| } |
| break; |
| default: |
| elog(ERROR, "invalid tuplesort state"); |
| break; |
| } |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| void |
| tuplesort_restorepos_mk(Tuplesortstate_mk *state) |
| { |
| tuplesort_restorepos_pos_mk(state, &state->pos); |
| } |
| |
| /* |
| * Tape interface routines |
| */ |
| static uint32 |
| getlen(Tuplesortstate_mk *state, TuplesortPos_mk *pos, LogicalTape *lt, bool eofOK) |
| { |
| uint32 len; |
| size_t readSize; |
| |
| Assert(lt); |
| readSize = LogicalTapeRead(state->tapeset, lt, (void *)&len, sizeof(len)); |
| |
| if(readSize != sizeof(len)) |
| { |
| Assert(!"Catch me"); |
| elog(ERROR, "unexpected end of tape"); |
| } |
| |
| if (len == 0 && !eofOK) |
| elog(ERROR, "unexpected end of data"); |
| return len; |
| } |
| |
| static void |
| markrunend(Tuplesortstate_mk *state, int tapenum) |
| { |
| uint32 len = 0; |
| LogicalTape *lt = LogicalTapeSetGetTape(state->tapeset, tapenum); |
| |
| LogicalTapeWrite(state->tapeset, lt, (void *) &len, sizeof(len)); |
| } |
| |
| |
| /* |
| * Inline-able copy of FunctionCall2() to save some cycles in sorting. |
| */ |
| static inline Datum |
| myFunctionCall2(FmgrInfo *flinfo, Datum arg1, Datum arg2) |
| { |
| FunctionCallInfoData fcinfo; |
| Datum result; |
| |
| InitFunctionCallInfoData(fcinfo, flinfo, 2, NULL, NULL); |
| |
| fcinfo.arg[0] = arg1; |
| fcinfo.arg[1] = arg2; |
| fcinfo.argnull[0] = false; |
| fcinfo.argnull[1] = false; |
| |
| result = FunctionCallInvoke(&fcinfo); |
| |
| /* Check for null result, since caller is clearly not expecting one */ |
| if (fcinfo.isnull) |
| elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid); |
| |
| return result; |
| } |
| |
| /* |
| * Apply a sort function (by now converted to fmgr lookup form) |
| * and return a 3-way comparison result. This takes care of handling |
| * NULLs and sort ordering direction properly. |
| */ |
| static inline int32 |
| inlineApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind, |
| Datum datum1, bool isNull1, |
| Datum datum2, bool isNull2) |
| { |
| switch (kind) |
| { |
| case SORTFUNC_LT: |
| if (isNull1) |
| { |
| if (isNull2) |
| return 0; |
| return 1; /* NULL sorts after non-NULL */ |
| } |
| if (isNull2) |
| return -1; |
| if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2))) |
| return -1; /* a < b */ |
| if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1))) |
| return 1; /* a > b */ |
| return 0; |
| |
| case SORTFUNC_REVLT: |
| /* We reverse the ordering of NULLs, but not the operator */ |
| if (isNull1) |
| { |
| if (isNull2) |
| return 0; |
| return -1; /* NULL sorts before non-NULL */ |
| } |
| if (isNull2) |
| return 1; |
| if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2))) |
| return -1; /* a < b */ |
| if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1))) |
| return 1; /* a > b */ |
| return 0; |
| |
| case SORTFUNC_CMP: |
| if (isNull1) |
| { |
| if (isNull2) |
| return 0; |
| return 1; /* NULL sorts after non-NULL */ |
| } |
| if (isNull2) |
| return -1; |
| return DatumGetInt32(myFunctionCall2(sortFunction, |
| datum1, datum2)); |
| |
| case SORTFUNC_REVCMP: |
| if (isNull1) |
| { |
| if (isNull2) |
| return 0; |
| return -1; /* NULL sorts before non-NULL */ |
| } |
| if (isNull2) |
| return 1; |
| return -DatumGetInt32(myFunctionCall2(sortFunction, |
| datum1, datum2)); |
| |
| default: |
| elog(ERROR, "unrecognized SortFunctionKind: %d", (int) kind); |
| return 0; /* can't get here, but keep compiler quiet */ |
| } |
| } |
| |
| static void |
| copytup_heap(Tuplesortstate_mk *state, MKEntry *e, void *tup) |
| { |
| /* |
| * We expect the passed "tup" to be a TupleTableSlot, and form a |
| * MinimalTuple using the exported interface for that. |
| */ |
| TupleTableSlot *slot = (TupleTableSlot *) tup; |
| |
| slot_getallattrs(slot); |
| e->ptr = (void *) memtuple_form_to(state->mt_bind, |
| slot_get_values(slot), |
| slot_get_isnull(slot), |
| NULL, NULL, false |
| ); |
| |
| state->totalTupleBytes += memtuple_get_size((MemTuple)e->ptr, NULL); |
| |
| Assert(state->mt_bind); |
| } |
| |
| /* |
| * Since MinimalTuple already has length in its first word, we don't need |
| * to write that separately. |
| */ |
| static long |
| writetup_heap(Tuplesortstate_mk *state, LogicalTape *lt, MKEntry *e) |
| { |
| uint32 tuplen = memtuple_get_size(e->ptr, NULL); |
| long ret = tuplen; |
| LogicalTapeWrite(state->tapeset, lt, e->ptr, tuplen); |
| |
| if (state->randomAccess) /* need trailing length word? */ |
| { |
| LogicalTapeWrite(state->tapeset, lt, (void *) &tuplen, sizeof(tuplen)); |
| ret += sizeof(tuplen); |
| } |
| |
| if (state->gpmon_pkt) |
| { |
| Gpmon_M_Incr(state->gpmon_pkt, GPMON_SORT_SPILLTUPLE); |
| Gpmon_M_Add(state->gpmon_pkt, GPMON_SORT_SPILLBYTE, tuplen); |
| Gpmon_M_Incr(state->gpmon_pkt, GPMON_SORT_CURRSPILLPASS_TUPLE); |
| Gpmon_M_Add(state->gpmon_pkt, GPMON_SORT_CURRSPILLPASS_BYTE, tuplen); |
| } |
| |
| pfree(e->ptr); |
| e->ptr = NULL; |
| |
| return ret; |
| } |
| |
| static void |
| freetup_heap(MKEntry *e) |
| { |
| pfree(e->ptr); |
| e->ptr = NULL; |
| } |
| |
| static void |
| readtup_heap(Tuplesortstate_mk *state, TuplesortPos_mk *pos, MKEntry *e, LogicalTape *lt, uint32 len) |
| { |
| uint32 tuplen; |
| size_t readSize; |
| |
| Assert(is_under_sort_ctxt(state)); |
| |
| MemSet(e, 0, sizeof(MKEntry)); |
| e->ptr = palloc(memtuple_size_from_uint32(len)); |
| |
| memtuple_set_mtlen((MemTuple)e->ptr, NULL, len); |
| |
| Assert(lt); |
| readSize = LogicalTapeRead(state->tapeset, lt, |
| (void *) ((char *) e->ptr + sizeof(uint32)), |
| memtuple_size_from_uint32(len) - sizeof(uint32)); |
| |
| if (readSize != (size_t) (memtuple_size_from_uint32(len) - sizeof(uint32))) |
| elog(ERROR, "unexpected end of data"); |
| |
| if (state->randomAccess) /* need trailing length word? */ |
| { |
| readSize = LogicalTapeRead(state->tapeset, lt, (void *)&tuplen, sizeof(tuplen)); |
| |
| if(readSize != sizeof(tuplen)) |
| elog(ERROR, "unexpected end of data"); |
| } |
| |
| /* For shareinput on sort, the reader will not set mt_bind. In this case, |
| * we will not call compare. |
| */ |
| AssertImply(!state->mt_bind, state->status == TSS_SORTEDONTAPE); |
| } |
| |
| static void |
| copytup_index(Tuplesortstate_mk *state, MKEntry *e, void *tup) |
| { |
| IndexTuple tuple = (IndexTuple) tup; |
| uint32 tuplen = IndexTupleSize(tuple); |
| |
| /* copy the tuple into sort storage */ |
| e->ptr = palloc(tuplen); |
| memcpy(e->ptr, tuple, tuplen); |
| |
| state->totalTupleBytes += tuplen; |
| } |
| |
| static long |
| writetup_index(Tuplesortstate_mk *state, LogicalTape *lt, MKEntry *e) |
| { |
| IndexTuple tuple = (IndexTuple) e->ptr; |
| uint32 tuplen = IndexTupleSize(tuple) + sizeof(tuplen); |
| long ret = tuplen; |
| |
| LogicalTapeWrite(state->tapeset, lt, (void *) &tuplen, sizeof(tuplen)); |
| LogicalTapeWrite(state->tapeset, lt, (void *) tuple, IndexTupleSize(tuple)); |
| |
| if (state->randomAccess) /* need trailing length word? */ |
| { |
| LogicalTapeWrite(state->tapeset, lt, (void *) &tuplen, sizeof(tuplen)); |
| ret += sizeof(tuplen); |
| } |
| |
| pfree(tuple); |
| e->ptr = NULL; |
| return ret; |
| } |
| |
| static void |
| freetup_index(MKEntry *e) |
| { |
| pfree(e->ptr); |
| e->ptr = NULL; |
| } |
| |
| static void |
| readtup_index(Tuplesortstate_mk *state, TuplesortPos_mk *pos, MKEntry *e, |
| LogicalTape *lt, uint32 len) |
| { |
| uint32 tuplen = len - sizeof(uint32); |
| IndexTuple tuple = (IndexTuple) palloc(tuplen); |
| size_t readSize; |
| |
| Assert(lt); |
| |
| MemSet(e, 0, sizeof(MKEntry)); |
| readSize = LogicalTapeRead(state->tapeset, lt, (void *)tuple, tuplen); |
| |
| if(readSize != tuplen) |
| elog(ERROR, "unexpected end of data"); |
| |
| if (state->randomAccess) /* need trailing length word? */ |
| { |
| readSize = LogicalTapeRead(state->tapeset, lt, (void *)&tuplen, sizeof(tuplen)); |
| |
| if (readSize != sizeof(tuplen)) |
| elog(ERROR, "unexpected end of data"); |
| } |
| e->ptr = (void *) tuple; |
| } |
| |
| |
| static void |
| copytup_datum(Tuplesortstate_mk *state, MKEntry *e, void *tup) |
| { |
| /* Not currently needed */ |
| elog(ERROR, "copytup_datum() should not be called"); |
| } |
| |
| static long |
| writetup_datum(Tuplesortstate_mk *state, LogicalTape *lt, MKEntry *e) |
| { |
| void *waddr; |
| uint32 tuplen; |
| uint32 writtenlen; |
| bool needfree = false; |
| long ret; |
| |
| if (mke_is_null(e)) |
| { |
| waddr = NULL; |
| tuplen = 0; |
| } |
| else if (state->datumTypeByVal) |
| { |
| waddr = &e->d; |
| tuplen = sizeof(Datum); |
| } |
| else |
| { |
| waddr = DatumGetPointer(e->d); |
| tuplen = datumGetSize(e->d, false, state->datumTypeLen); |
| needfree = true; |
| Assert(tuplen != 0); |
| } |
| |
| writtenlen = tuplen + sizeof(uint32); |
| ret = writtenlen; |
| |
| LogicalTapeWrite(state->tapeset, lt, (void *) &writtenlen, sizeof(writtenlen)); |
| LogicalTapeWrite(state->tapeset, lt, waddr, tuplen); |
| |
| if (state->randomAccess) /* need trailing length word? */ |
| { |
| LogicalTapeWrite(state->tapeset, lt, (void *) &writtenlen, sizeof(writtenlen)); |
| ret += sizeof(writtenlen); |
| } |
| |
| if (needfree) |
| pfree(waddr); |
| |
| return ret; |
| } |
| |
| /** |
| * No-op free: does nothing! |
| */ |
| static void |
| freetup_noop(MKEntry *e) |
| { |
| |
| } |
| |
| /** |
| * Free the datum if non-null, should only be used for by-reference datums |
| */ |
| static void |
| freetup_datum(MKEntry *e) |
| { |
| if (!mke_is_null(e)) |
| { |
| void *waddr = DatumGetPointer(e->d); |
| pfree(waddr); |
| } |
| } |
| |
| |
| static void |
| readtup_datum(Tuplesortstate_mk *state, TuplesortPos_mk *pos, MKEntry *e, |
| LogicalTape *lt, uint32 len) |
| { |
| size_t readSize; |
| uint32 tuplen = len - sizeof(uint32); |
| |
| Assert(lt); |
| |
| MemSet(e, 0, sizeof(MKEntry)); |
| |
| if (tuplen == 0) |
| mke_set_null(e, state->nullfirst); |
| else if (state->datumTypeByVal) |
| { |
| mke_set_not_null(e); |
| |
| Assert(tuplen == sizeof(Datum)); |
| readSize = LogicalTapeRead(state->tapeset, lt, (void *)&e->d, tuplen); |
| |
| if (readSize != tuplen) |
| elog(ERROR, "unexpected end of data"); |
| } |
| else |
| { |
| void *raddr = palloc(tuplen); |
| |
| readSize = LogicalTapeRead(state->tapeset, lt, raddr, tuplen); |
| |
| if(readSize != tuplen) |
| elog(ERROR, "unexpected end of data"); |
| |
| mke_set_not_null(e); |
| e->d = PointerGetDatum(raddr); |
| } |
| |
| if (state->randomAccess) /* need trailing length word? */ |
| { |
| readSize = LogicalTapeRead(state->tapeset, lt, (void *)&tuplen, sizeof(tuplen)); |
| |
| if (readSize != sizeof(tuplen)) |
| elog(ERROR, "unexpected end of data"); |
| } |
| } |
| |
| typedef struct refcnt_locale_str |
| { |
| int ref; |
| short xfrm_pos; |
| char isPrefixOnly; |
| char data[1]; |
| } refcnt_locale_str; |
| |
| typedef struct refcnt_locale_str_k |
| { |
| int ref; |
| short xfrm_pos; |
| char isPrefixOnly; |
| char data[16000]; |
| } refcnt_locale_str_k; |
| |
| /** |
| * Compare the datums for the given level. It is assumed that each entry has been prepared |
| * for the given level. |
| */ |
| int tupsort_compare_datum(MKEntry *v1, MKEntry *v2, MKLvContext *lvctxt, MKContext *context) |
| { |
| Assert(!mke_is_null(v1)); |
| Assert(!mke_is_null(v2)); |
| |
| switch(lvctxt->lvtype) |
| { |
| case MKLV_TYPE_NONE: |
| return inlineApplySortFunction(&lvctxt->fmgrinfo, lvctxt->sortfnkind, |
| v1->d, false, |
| v2->d, false |
| ); |
| case MKLV_TYPE_INT32: |
| { |
| int32 i1 = DatumGetInt32(v1->d); |
| int32 i2 = DatumGetInt32(v2->d); |
| int result = (i1 < i2) ? -1 : ((i1 == i2) ? 0 : 1); |
| return (lvctxt->sortfnkind == SORTFUNC_CMP) ? result : -result; |
| } |
| default: |
| return tupsort_compare_char(v1, v2, lvctxt, context); |
| } |
| |
| Assert(!"Never reach here"); |
| return 0; |
| } |
| |
| void tupsort_cpfr(MKEntry *dst, MKEntry *src, MKLvContext *lvctxt) |
| { |
| Assert(dst); |
| if(mke_is_refc(dst)) |
| { |
| Assert(dst->d != 0); |
| Assert(!mke_is_copied(dst)); |
| tupsort_refcnt(DatumGetPointer(dst->d), -1); |
| } |
| |
| if(mke_is_copied(dst)) |
| { |
| Assert(dst->d != 0); |
| Assert(!mke_is_refc(dst)); |
| pfree(DatumGetPointer(dst->d)); |
| } |
| |
| mke_clear_refc_copied(dst); |
| |
| if(src) |
| { |
| *dst = *src; |
| if(!mke_is_null(src)) |
| { |
| if(mke_is_refc(src)) |
| tupsort_refcnt(DatumGetPointer(dst->d), 1); |
| else if(!lvctxt->typByVal) |
| { |
| Assert(src->d != 0); |
| dst->d = datumCopy(src->d, lvctxt->typByVal, lvctxt->typLen); |
| mke_set_copied(dst); |
| } |
| } |
| } |
| } |
| |
| static void tupsort_refcnt(void *vp, int ref) |
| { |
| refcnt_locale_str *p = (refcnt_locale_str *) vp; |
| |
| Assert(p && p->ref > 0); |
| Assert(ref == 1 || ref == -1); |
| |
| if(ref == 1) |
| ++p->ref; |
| else |
| { |
| if(--p->ref == 0) |
| pfree(p); |
| } |
| } |
| |
| static int tupsort_compare_char(MKEntry *v1, MKEntry *v2, MKLvContext *lvctxt, MKContext *mkContext) |
| { |
| int result = 0; |
| |
| refcnt_locale_str *p1 = (refcnt_locale_str *) DatumGetPointer(v1->d); |
| refcnt_locale_str *p2 = (refcnt_locale_str *) DatumGetPointer(v2->d); |
| |
| Assert(!mke_is_null(v1)); |
| Assert(!mke_is_null(v2)); |
| |
| Assert(!lc_collate_is_c()); |
| Assert(lvctxt->sortfnkind == SORTFUNC_CMP || lvctxt->sortfnkind == SORTFUNC_REVCMP); |
| Assert(mkContext->fetchForPrep); |
| |
| Assert(p1->ref > 0 && p2->ref > 0); |
| |
| if(p1 == p2) |
| { |
| Assert(p1->ref >= 2); |
| result = 0; |
| } |
| else |
| { |
| result = strcmp(p1->data + p1->xfrm_pos, p2->data + p2->xfrm_pos); |
| |
| if(result == 0) |
| { |
| if ( p1->isPrefixOnly || p2->isPrefixOnly) |
| { |
| /* |
| * only prefixes were equal so we must compare more of the strings |
| * |
| * do this by getting the true datum and calling the built-in comparison |
| * function |
| */ |
| Datum p1Original, p2Original; |
| bool p1IsNull, p2IsNull; |
| |
| p1Original = (mkContext->fetchForPrep)(v1, mkContext, lvctxt, &p1IsNull); |
| p2Original = (mkContext->fetchForPrep)(v2, mkContext, lvctxt, &p2IsNull); |
| |
| Assert(!p1IsNull); /* should not have been prepared if null */ |
| Assert(!p2IsNull); |
| |
| result = inlineApplySortFunction(&lvctxt->fmgrinfo, lvctxt->sortfnkind, |
| p1Original, false, p2Original, false ); |
| } |
| else |
| { |
| /* |
| * See varstr_cmp for comment on comparing str with locale. |
| * Essentially, for some locale, strcoll may return eq even if |
| * original str are different. |
| */ |
| result = strcmp(p1->data, p2->data); |
| } |
| } |
| |
| /* The values were equal -- de-dupe them */ |
| if(result == 0) |
| { |
| if(p1->ref >= p2->ref) |
| { |
| v2->d = v1->d; |
| tupsort_refcnt(p1, 1); |
| tupsort_refcnt(p2, -1); |
| } |
| else |
| { |
| v1->d = v2->d; |
| tupsort_refcnt(p2, 1); |
| tupsort_refcnt(p1, -1); |
| } |
| } |
| } |
| |
| return (lvctxt->sortfnkind == SORTFUNC_CMP) ? result : -result; |
| } |
| |
| static int32 estimateMaxPrepareSizeForEntry(MKEntry *a, struct MKContext *mkContext) |
| { |
| int result = 0; |
| int lv; |
| |
| Assert(mkContext->fetchForPrep != NULL); |
| |
| for ( lv = 0; lv < mkContext->total_lv; lv++) |
| { |
| MKLvContext *level = mkContext->lvctxt + lv; |
| MKLvType levelType = level->lvtype; |
| |
| if (levelType == MKLV_TYPE_CHAR || |
| levelType == MKLV_TYPE_TEXT ) |
| { |
| bool isnull; |
| Datum d = (mkContext->fetchForPrep)(a, mkContext, level, &isnull); |
| |
| if ( ! isnull ) |
| { |
| int amountThisDatum = estimatePrepareSpaceForChar(mkContext, a, d, levelType == MKLV_TYPE_CHAR); |
| if ( amountThisDatum > result ) |
| result = amountThisDatum; |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| static Datum tupsort_fetch_datum_mtup(MKEntry *a, MKContext *mkctxt, MKLvContext *lvctxt, bool *isNullOut) |
| { |
| Datum d = memtuple_getattr( |
| (MemTuple) a->ptr, |
| mkctxt->mt_bind, |
| lvctxt->attno, |
| isNullOut |
| ); |
| return d; |
| } |
| |
| static Datum tupsort_fetch_datum_itup(MKEntry *a, MKContext *mkctxt, MKLvContext *lvctxt, bool *isNullOut) |
| { |
| Datum d = index_getattr( |
| (IndexTuple) a->ptr, |
| lvctxt->attno, |
| mkctxt->tupdesc, |
| isNullOut |
| ); |
| return d; |
| } |
| |
| void tupsort_prepare(MKEntry *a, MKContext *mkctxt, int lv) |
| { |
| MKLvContext *lvctxt = mkctxt->lvctxt + lv; |
| bool isnull; |
| |
| Assert(mkctxt->fetchForPrep != NULL); |
| |
| tupsort_cpfr(a, NULL, lvctxt); |
| |
| a->d = (mkctxt->fetchForPrep)(a, mkctxt, lvctxt, &isnull); |
| |
| if(!isnull) |
| mke_set_not_null(a); |
| else |
| mke_set_null(a, lvctxt->nullfirst); |
| |
| if (lvctxt->lvtype == MKLV_TYPE_CHAR) |
| tupsort_prepare_char(a, true); |
| else if (lvctxt->lvtype == MKLV_TYPE_TEXT) |
| tupsort_prepare_char(a, false); |
| } |
| |
| /* "True" length (not counting trailing blanks) of a BpChar */ |
| static inline int bcTruelen(char *p, int len) |
| { |
| int i; |
| |
| for (i = len - 1; i >= 0; i--) |
| { |
| if (p[i] != ' ') |
| break; |
| } |
| return i + 1; |
| } |
| |
| /** |
| * should only be called for non-null Datum (caller must check the isnull flag from the fetch) |
| */ |
| static int32 estimatePrepareSpaceForChar(struct MKContext *mkContext, MKEntry *e, Datum d, bool isCHAR) |
| { |
| int len, transformedLength, retlen; |
| |
| if(isCHAR) |
| { |
| char *p; |
| void *toFree; |
| |
| varattrib_untoast_ptr_len(d, &p, &len, &toFree); |
| len = bcTruelen(p, len); |
| |
| if ( toFree ) |
| pfree(toFree); |
| } |
| else |
| { |
| /* since we don't need the data for checking the true length, just unpack enough to get the length |
| * this may avoid decompression |
| */ |
| len = varattrib_untoast_len(d); |
| } |
| |
| /* figure out how much space transformed version will take */ |
| transformedLength = len * mkContext->strxfrmScaleFactor + mkContext->strxfrmConstantFactor; |
| if ( len > STRXFRM_INPUT_LENGTH_LIMIT) |
| { |
| if ( transformedLength > STRXFRM_INPUT_LENGTH_LIMIT) |
| transformedLength = STRXFRM_INPUT_LENGTH_LIMIT; |
| /* we do not store the raw data for long input strings (in part because |
| * of compression producing a long input string from a much smaller datum) */ |
| len = 0; |
| } |
| |
| retlen = offsetof(refcnt_locale_str, data) + len + 1 + transformedLength + 1; |
| return retlen; |
| } |
| |
| /** |
| * Prepare a character string by copying the datum out to a null-terminated string and |
| * then also keeping a strxfrmed copy of it. |
| * |
| * If the string is long then the copied out value is not kept and only the prefix of the strxfrm |
| * is kept. |
| * |
| * Note that this must be in sync with the estimation function. |
| */ |
| static void tupsort_prepare_char(MKEntry *a, bool isCHAR) |
| { |
| char *p; |
| void *tofree = NULL; |
| int len; |
| int lenToStore; |
| int transformedLenToStore; |
| int retlen; |
| |
| refcnt_locale_str *ret; |
| refcnt_locale_str_k kstr; |
| int avail = sizeof(kstr.data); |
| bool storePrefixOnly; |
| |
| Assert(!lc_collate_is_c()); |
| |
| if(mke_is_null(a)) |
| return; |
| |
| varattrib_untoast_ptr_len(a->d, &p, &len, &tofree); |
| |
| if(isCHAR) |
| len = bcTruelen(p, len); |
| |
| if ( len > STRXFRM_INPUT_LENGTH_LIMIT) |
| { |
| /* too long? store prefix of strxfrm only and DON'T store copy of original string */ |
| storePrefixOnly = true; |
| lenToStore = 0; |
| } |
| else |
| { |
| storePrefixOnly = false; |
| lenToStore = len; |
| } |
| |
| /* This assertion is true because avail is larger than |
| * STRXFRM_INPUT_LENGTH_LIMIT, which is the max of lenToStore ... */ |
| Assert(lenToStore <= avail - 2); |
| Assert(lenToStore < 32768); /* so it will fit in a short (xfrm_pos field) */ |
| Assert(STRXFRM_INPUT_LENGTH_LIMIT < 32768); /* so it will fit in a short (xfrm_pos field) */ |
| |
| kstr.ref = 1; |
| kstr.xfrm_pos = lenToStore+1; |
| memcpy(kstr.data, p, lenToStore); |
| kstr.data[lenToStore] = '\0'; |
| |
| avail -= lenToStore + 1; |
| Assert(avail > 0); |
| |
| /* |
| * String transformation. |
| */ |
| if ( storePrefixOnly) |
| { |
| /* prefix only: we haven't copied from p into a null-terminated string so do that now |
| */ |
| char *possibleStr = kstr.data + kstr.xfrm_pos + STRXFRM_INPUT_LENGTH_LIMIT + 2; |
| char *str; |
| |
| if ( avail >= STRXFRM_INPUT_LENGTH_LIMIT + 1 + len + 1 ) |
| { |
| /* will segment this so first part is xformed string and next is the throw-away string |
| * to be passed to strxfrm */ |
| str = possibleStr; |
| } |
| else |
| { |
| str = palloc(len + 1); |
| } |
| memcpy(str, p, len); |
| str[len] = '\0'; |
| |
| /* |
| * transform, but limit length of transformed string |
| */ |
| Assert(avail >= STRXFRM_INPUT_LENGTH_LIMIT + 1); |
| transformedLenToStore = (int) gp_strxfrm(kstr.data+kstr.xfrm_pos, str, STRXFRM_INPUT_LENGTH_LIMIT + 1); |
| |
| if ( transformedLenToStore > STRXFRM_INPUT_LENGTH_LIMIT) |
| { |
| transformedLenToStore = STRXFRM_INPUT_LENGTH_LIMIT; |
| |
| /* this is required for linux -- when there is not enough room then linux |
| * does NOT write the \0 |
| */ |
| kstr.data[kstr.xfrm_pos + transformedLenToStore] = '\0'; |
| } |
| |
| if ( str != possibleStr ) |
| pfree(str); |
| } |
| else transformedLenToStore = (int) gp_strxfrm(kstr.data+kstr.xfrm_pos, kstr.data, avail); |
| |
| /* |
| * Copy or transform into the result as needed |
| */ |
| retlen = offsetof(refcnt_locale_str, data) + lenToStore + 1 + transformedLenToStore + 1; |
| |
| ret = (refcnt_locale_str *) palloc(retlen); |
| |
| if(transformedLenToStore < avail) |
| { |
| memcpy(ret, &kstr, retlen); |
| Assert(ret->ref == 1); |
| Assert(ret->data[ret->xfrm_pos - 1] == '\0'); |
| Assert(ret->data[ret->xfrm_pos + transformedLenToStore] == '\0'); |
| } |
| else |
| { |
| /* note that when avail (determined by refcnt_locale_str_k.data) is much larger than |
| * STRXFRM_INPUT_LENGTH_LIMIT then this code won't be hit. |
| */ |
| memcpy(ret, &kstr, offsetof(refcnt_locale_str, data) + lenToStore + 1); |
| avail = retlen - offsetof(refcnt_locale_str, data) - lenToStore - 1; |
| Assert(avail > transformedLenToStore); |
| Assert(ret->ref == 1); |
| Assert(ret->xfrm_pos == len + 1); |
| Assert(ret->data[len] == '\0'); |
| |
| transformedLenToStore = (int) gp_strxfrm(ret->data+ret->xfrm_pos, kstr.data, avail); |
| Assert(transformedLenToStore < avail); |
| Assert(ret->data[ret->xfrm_pos + transformedLenToStore] == '\0'); |
| } |
| |
| if(tofree) |
| pfree(tofree); |
| |
| /* data string is length zero in this case (could just not even store it and have xfrm_pos == 0 |
| * but that complicates code more) |
| */ |
| AssertImply(storePrefixOnly, ret->data[0] == '\0' ); |
| |
| /* |
| * finalize result |
| */ |
| ret->isPrefixOnly = storePrefixOnly ? 1 : 0; |
| a->d = PointerGetDatum(ret); |
| mke_set_refc(a); |
| } |
| |
| /* |
| * tuplesort_inmem_limit_insert |
| * Adds a tuple for sorting when we are doing LIMIT sort and we (still) fit in memory |
| * If we're in here, it means we haven't exceeded memory yet. |
| * Three cases: |
| * - we're below LIMIT tuples. Then add to unsorted array, increase memory |
| * - we just hit LIMIT tuples. Switch to using a heap instead of array, so |
| * we only keep top LIMIT tuples. |
| * - we're above LIMIT tuples. Then add to heap, don't increase memory usage |
| */ |
| static void |
| tuplesort_inmem_limit_insert(Tuplesortstate_mk *state, MKEntry *entry) |
| { |
| Assert(state->mkctxt.limit > 0); |
| Assert(is_under_sort_ctxt(state)); |
| Assert(!mke_is_empty(entry)); |
| |
| Assert(state->status == TSS_INITIAL); |
| |
| if (state->mkheap == NULL) |
| { |
| Assert(state->entry_count < state->entry_allocsize); |
| |
| /* We haven't created the heap yet. |
| * First add element to unsorted array so we don't lose it. |
| * Then, we have two cases: |
| * A. if we're under limit, we're done, iterate. |
| * B. We just hit the limit. Create heap. Add to heap. |
| */ |
| |
| state->entries[state->entry_count++] = *entry; |
| state->numTuplesInMem++; |
| |
| if (state->entry_count == state->mkctxt.limit) |
| { |
| /* B: just hit limit. Create heap from array */ |
| state->mkheap = mkheap_from_array(state->entries, |
| state->entry_count, |
| state->entry_count, |
| &state->mkctxt); |
| state->entries = NULL; |
| state->entry_allocsize = 0; |
| state->entry_count = 0; |
| } |
| } |
| else |
| { |
| /* We already have the heap (and it fits in memory). |
| * Simply add to the heap, keeping only top LIMIT elements. |
| */ |
| Assert(state->mkheap && !mkheap_empty(state->mkheap)); |
| Assert(state->mkheap->count == state->numTuplesInMem); |
| Assert(state->entry_count == 0); |
| (void) mkheap_putAndGet(state->mkheap, entry); |
| |
| Assert(!mke_is_empty(entry)); |
| Assert(entry->ptr); |
| pfree(entry->ptr); |
| entry->ptr = NULL; |
| } |
| } |
| |
| /* |
| * tuplesort_inmem_nolimit_insert |
| * Adds a tuple for sorting when we are regular (no LIMIT) sort and we (still) fit in memory |
| * |
| * Simply add to the unsorted in-memory array. We'll either qsort or spill this later. |
| */ |
| static void |
| tuplesort_inmem_nolimit_insert(Tuplesortstate_mk *state, MKEntry *entry) |
| { |
| Assert(state->status == TSS_INITIAL); |
| Assert(state->mkctxt.limit == 0); |
| Assert(is_under_sort_ctxt(state)); |
| Assert(!mke_is_empty(entry)); |
| Assert(state->entry_count < state->entry_allocsize); |
| |
| state->entries[state->entry_count] = *entry; |
| if ( state->mkctxt.fetchForPrep) |
| { |
| state->mkctxt.estimatedExtraForPrep += estimateMaxPrepareSizeForEntry(&state->entries[state->entry_count], |
| &state->mkctxt); |
| } |
| state->entry_count++; |
| state->numTuplesInMem++; |
| |
| } |
| |
| static void tuplesort_heap_insert(Tuplesortstate_mk *state, MKEntry *e) |
| { |
| int ins; |
| Assert(state->mkheap); |
| Assert(is_under_sort_ctxt(state)); |
| |
| ins = mkheap_putAndGet_run(state->mkheap, e, state->currentRun); |
| tupsort_cpfr(e, NULL, &state->mkctxt.lvctxt[mke_get_lv(e)]); |
| |
| Assert(ins >= 0); |
| |
| { |
| LogicalTape *lt = LogicalTapeSetGetTape(state->tapeset, state->tp_tapenum[state->destTape]); |
| |
| WRITETUP(state, lt, e); |
| |
| if(!mkheap_run_match(state->mkheap, state->currentRun)) |
| { |
| markrunend(state, state->tp_tapenum[state->destTape]); |
| state->currentRun++; |
| state->tp_runs[state->destTape]++; |
| state->tp_dummy[state->destTape]--; |
| |
| selectnewtape_mk(state); |
| } |
| } |
| } |
| |
| static void tuplesort_limit_sort(Tuplesortstate_mk *state) |
| { |
| Assert(state->mkctxt.limit > 0); |
| Assert(is_under_sort_ctxt(state)); |
| |
| if(!state->mkheap) |
| { |
| Assert(state->entry_count <= state->mkctxt.limit); |
| mk_qsort(state->entries, state->entry_count, &state->mkctxt); |
| return; |
| } |
| else |
| { |
| int i; |
| |
| #ifdef USE_ASSERT_CHECKING |
| mkheap_verify_heap(state->mkheap, 0); |
| #endif |
| state->entry_allocsize = mkheap_cnt(state->mkheap); |
| state->entries = palloc(state->entry_allocsize * sizeof (MKEntry)); |
| |
| /* Put these things to minimum */ |
| for(i=0; i<state->entry_allocsize; ++i) |
| mke_set_comp_max(state->entries+i); |
| |
| for(i=state->entry_allocsize-1; i>=0; --i) |
| mkheap_putAndGet(state->mkheap, state->entries+i); |
| |
| Assert(mkheap_cnt(state->mkheap) == 0); |
| state->entry_count = state->entry_allocsize; |
| } |
| |
| mkheap_destroy(state->mkheap); |
| state->mkheap = NULL; |
| } |
| |
| void |
| tuplesort_set_gpmon_mk(Tuplesortstate_mk *state, gpmon_packet_t *gpmon_pkt, int *gpmon_tick) |
| { |
| state->gpmon_pkt = gpmon_pkt; |
| state->gpmon_sort_tick = gpmon_tick; |
| } |
| |
| /* |
| * Write the metadata of a workfile set to disk |
| */ |
| void |
| tuplesort_write_spill_metadata_mk(Tuplesortstate_mk *state) |
| { |
| |
| if (state->status == TSS_SORTEDINMEM) |
| { |
| /* No spill files, nothing to save */ |
| return; |
| } |
| |
| /* We don't know how to handle TSS_FINALMERGE yet */ |
| Assert(state->status == TSS_SORTEDONTAPE); |
| |
| tuplesort_flush_mk(state); |
| } |
| |
| void tuplesort_set_spillfile_set_mk(Tuplesortstate_mk * state, workfile_set * sfs) |
| { |
| state->work_set = sfs; |
| } |
| |
| void tuplesort_read_spill_metadata_mk(Tuplesortstate_mk *state) |
| { |
| |
| Assert(state->work_set != NULL); |
| |
| MemoryContext oldctxt = MemoryContextSwitchTo(state->sortcontext); |
| |
| state->status = TSS_SORTEDONTAPE; |
| state->readtup = readtup_heap; |
| |
| /* Open saved spill file set and metadata. |
| * Initialize logical tape set to point to the right blocks. */ |
| state->tapeset_state_file = workfile_mgr_open_fileno(state->work_set, WORKFILE_NUM_MKSORT_METADATA); |
| ExecWorkFile *tape_file = workfile_mgr_open_fileno(state->work_set, WORKFILE_NUM_MKSORT_TAPESET); |
| |
| state->tapeset = LoadLogicalTapeSetState(state->tapeset_state_file, tape_file); |
| state->currentRun = 0; |
| state->result_tape = LogicalTapeSetGetTape(state->tapeset, 0); |
| |
| state->pos.eof_reached = false; |
| state->pos.markpos.tapepos.blkNum = 0; |
| state->pos.markpos.tapepos.offset = 0; |
| state->pos.markpos.mempos = 0; |
| state->pos.markpos_eof = false; |
| state->pos.cur_work_tape = NULL; |
| |
| MemoryContextSwitchTo(oldctxt); |
| } |
| |
| /* EOF */ |