blob: 130bb841fae98a67040776ada65800e0121167ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*-------------------------------------------------------------------------
*
* tuplestore.c
* Generalized routines for temporary tuple storage.
*
* This module handles temporary storage of tuples for purposes such
* as Materialize nodes, hashjoin batch files, etc. It is essentially
* a dumbed-down version of tuplesort.c; it does no sorting of tuples
* but can only store and regurgitate a sequence of tuples. However,
* because no sort is required, it is allowed to start reading the sequence
* before it has all been written. This is particularly useful for cursors,
* because it allows random access within the already-scanned portion of
* a query without having to process the underlying scan to completion.
* A temporary file is used to handle the data if it exceeds the
* space limit specified by the caller.
*
* The (approximate) amount of memory allowed to the tuplestore is specified
* in kilobytes by the caller. We absorb tuples and simply store them in an
* in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
* maxKBytes, we dump all the tuples into a temp file and then read from that
* when needed.
*
* When the caller requests random access to the data, we write the temp file
* in a format that allows either forward or backward scan. Otherwise, only
* forward scan is allowed. But rewind and markpos/restorepos are allowed
* in any case.
*
* Because we allow reading before writing is complete, there are two
* interesting positions in the temp file: the current read position and
* the current write position. At any given instant, the temp file's seek
* position corresponds to one of these, and the other one is remembered in
* the Tuplestore's state.
*
*
* Portions Copyright (c) 2007-2009, Greenplum inc
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.29.2.1 2007/08/02 17:48:54 neilc Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "executor/instrument.h" /* struct Instrumentation */
#include "storage/buffile.h"
#include "utils/memutils.h"
#include "utils/tuplestore.h"
#include "utils/debugbreak.h"
#include "cdb/cdbvars.h" /* currentSliceId */
/*
* Data structure describing the current position within a tuple store.
*
* A Tuplestorestate embeds one of these as its default position, and
* tuplestore_begin_pos() hands out palloc'd copies of it. The *_pos entry
* points operate on whichever instance the caller passes in.
*/
struct TuplestorePos
{
/*
* In state WRITEFILE, the current file seek position is the write point,
* and the read position is remembered in readpos_xxx; in state READFILE,
* the current file seek position is the read point, and the write
* position is remembered in writepos_xxx. (The write position is the
* same as EOF, but since BufFileSeek doesn't currently implement
* SEEK_END, we have to remember it explicitly.)
*
* Special case: if we are in WRITEFILE state and eof_reached is true,
* then the read position is implicitly equal to the write position (and
* hence to the file seek position); this way we need not update the
* readpos_xxx variables on each write.
*/
bool eof_reached; /* read reached EOF (always valid) */
int current; /* next array index (valid if INMEM) */
int64 readpos_offset; /* offset (valid if WRITEFILE and not eof) */
int64 writepos_offset; /* offset (valid if READFILE) */
/* markpos_xxx holds marked position for mark and restore */
int64 markpos_current; /* saved "current" (wider than "current" itself, which is an int) */
int64 markpos_file; /* NOTE(review): described as saved "readpos_file", but no such field exists and the routines visible in this file never touch it -- presumably a leftover; confirm */
int64 markpos_offset; /* saved "readpos_offset" */
};
/*
* Possible states of a Tuplestore object. These denote the states that
* persist between calls of Tuplestore routines.
*
* A store begins in TSS_INMEM (or directly in a file state when created by
* tuplestore_begin_heap_file_readerwriter). tuplestore_puttuple_common moves
* it to TSS_WRITEFILE once the tuples are dumped to a temp file, and the
* put/get routines flip between TSS_WRITEFILE and TSS_READFILE depending on
* whether the file's seek position is currently the write or the read point.
*/
typedef enum
{
TSS_INMEM, /* Tuples still fit in memory */
TSS_WRITEFILE, /* Writing to temp file */
TSS_READFILE /* Reading from temp file */
} TupStoreStatus;
/*
* Private state of a Tuplestore operation.
*
* Allocated (zero-filled) by tuplestore_begin_common and released by
* tuplestore_end.
*/
struct Tuplestorestate
{
TupStoreStatus status; /* enumerated value as shown above */
bool randomAccess; /* did caller request random access? (also decides whether a trailing length word is written/read) */
bool interXact; /* keep open through transactions? */
long availMem; /* remaining memory available, in bytes; negative means over budget (see LACKMEM) */
BufFile *myfile; /* underlying file, or NULL if none */
/*
* These function pointers decouple the routines that must know what kind
* of tuple we are handling from the routines that don't need to know it.
* They are set up by the tuplestore_begin_xxx routines.
*
* (Although tuplestore.c currently only supports heap tuples, I've copied
* this part of tuplesort.c so that extension to other kinds of objects
* will be easy if it's ever needed.)
*
* Function to copy a supplied input tuple into palloc'd space. (NB: we
* assume that a single pfree() is enough to release the tuple later, so
* the representation must be "flat" in one palloc chunk.) state->availMem
* must be decreased by the amount of space used.
*/
void *(*copytup) (Tuplestorestate *state, TuplestorePos *pos, void *tup);
/*
* Function to write a stored tuple onto tape. The representation of the
* tuple on tape need not be the same as it is in memory; requirements on
* the tape representation are given below. After writing the tuple,
* pfree() it, and increase state->availMem by the amount of memory space
* thereby released.
*/
void (*writetup) (Tuplestorestate *state, TuplestorePos *pos, void *tup);
/*
* Function to read a stored tuple from tape back into memory. 'len' is
* the already-read length of the stored tuple. Create and return a
* palloc'd copy, and decrease state->availMem by the amount of memory
* space consumed.
*/
void *(*readtup) (Tuplestorestate *state, TuplestorePos *pos, uint32 len);
/*
* This array holds pointers to tuples in memory if we are in state INMEM.
* In states WRITEFILE and READFILE it's not used.
*/
void **memtuples; /* array of pointers to palloc'd tuples */
int memtupcount; /* number of tuples currently present */
int memtupsize; /* allocated length of memtuples array */
TuplestorePos pos; /* built-in position used by the entry points that take no explicit TuplestorePos */
/*
* These variables are used to keep track of the current position.
*
* In state WRITEFILE, the current file seek position is the write point,
* and the read position is remembered in readpos_xxx; in state READFILE,
* the current file seek position is the read point, and the write
* position is remembered in writepos_xxx. (The write position is the
* same as EOF, but since BufFileSeek doesn't currently implement
* SEEK_END, we have to remember it explicitly.)
*
* Special case: if we are in WRITEFILE state and eof_reached is true,
* then the read position is implicitly equal to the write position (and
* hence to the file seek position); this way we need not update the
* readpos_xxx variables on each write.
*
* NOTE(review): the fields below duplicate those of TuplestorePos. In the
* routines visible in this file only tuplestore_begin_common assigns
* eof_reached and current; all position logic goes through a
* TuplestorePos instead. Presumably leftovers from before 'pos' was
* introduced -- confirm before relying on them.
*/
bool eof_reached; /* read reached EOF (always valid) */
int current; /* next array index (valid if INMEM) */
int64 readpos_offset; /* offset (valid if WRITEFILE and not eof) */
int64 writepos_offset; /* offset (valid if READFILE) */
/* markpos_xxx holds marked position for mark and restore */
int64 markpos_current; /* saved "current" */
int64 markpos_file; /* saved "readpos_file" */
int64 markpos_offset; /* saved "readpos_offset" */
/*
* CDB: EXPLAIN ANALYZE reporting interface and statistics.
*/
struct Instrumentation *instrument; /* set by tuplestore_set_instrument; NULL if not reporting */
long allowedMem; /* total memory allowed, in bytes */
long availMemMin; /* availMem low water mark (bytes); refreshed by FREEMEM */
int64 spilledBytes; /* memory used for spilled tuples (summed chunk sizes of tuples written by writetup_heap) */
};
/*
* Dispatch through the per-store function pointers. Note that each macro
* evaluates its 'state' argument twice.
*/
#define COPYTUP(state,pos, tup) ((*(state)->copytup) (state, pos, tup))
#define WRITETUP(state,pos, tup) ((*(state)->writetup) (state, pos, tup))
#define READTUP(state,pos,len) ((*(state)->readtup) (state, pos, len))
/* True once more memory has been charged than the budget allows. */
#define LACKMEM(state) ((state)->availMem < 0)
/* Charge 'amt' bytes against the budget (does not touch the low-water mark). */
#define USEMEM(state,amt) ((state)->availMem -= (amt))
/*
 * FREEMEM - credit 'amt' bytes back to the store's memory budget.
 *
 * Before the credit is applied, availMemMin is lowered to the current
 * availMem if that is smaller, so the low-water mark captures the tightest
 * point reached so far.  Calling with amt == 0 merely refreshes the mark.
 */
static inline void
FREEMEM(Tuplestorestate *state, int amt)
{
    long        avail_before = state->availMem;

    if (avail_before < state->availMemMin)
        state->availMemMin = avail_before;

    state->availMem = avail_before + amt;
}
/*--------------------
*
* NOTES about on-tape representation of tuples:
*
* We require the first "unsigned int" of a stored tuple to be the total size
* on-tape of the tuple, including itself (so it is never zero).
* The remainder of the stored tuple
* may or may not match the in-memory representation of the tuple ---
* any conversion needed is the job of the writetup and readtup routines.
*
* If state->randomAccess is true, then the stored representation of
* the tuple must be followed by another "unsigned int" that is a copy of the
* length --- so the total tape space used is actually sizeof(unsigned int)
* more than the stored length value. This allows read-backwards. When
* state->randomAccess is not set, the write/read routines omit the extra
* length word.
*
* writetup is expected to write both length words as well as the tuple
* data. When readtup is called, the tape is positioned just after the
* front length word; readtup must read the tuple data and advance past
* the back length word (if present).
*
* The write/read routines can make use of the tuple description data
* stored in the Tuplestorestate record, if needed. They are also expected
* to adjust state->availMem by the amount of memory space (not tape space!)
* released or consumed. There is no error return from either writetup
* or readtup; they should ereport() on failure.
*
*
* NOTES about memory consumption calculations:
*
* We count space allocated for tuples against the maxKBytes limit,
* plus the space used by the variable-size array memtuples.
* Fixed-size space (primarily the BufFile I/O buffer) is not counted.
* We don't worry about the size of the read pointer array, either.
*
* Note that we count actual space used (as shown by GetMemoryChunkSpace)
* rather than the originally-requested size. This is important since
* palloc can add substantial overhead. It's not a complete answer since
* we won't count any wasted space in palloc allocation blocks, but it's
* a lot better than what we were doing before 7.3.
*
*--------------------
*/
/*
* Forward declarations for the file-local helpers defined further down.
*/
/* common constructor shared by the tuplestore_begin_xxx routines */
static Tuplestorestate *tuplestore_begin_common(bool randomAccess,
bool interXact,
int maxKBytes);
/* append an already-copied tuple, spilling to a temp file when needed */
static void tuplestore_puttuple_common(Tuplestorestate *state, TuplestorePos *pos, void *tuple);
/* move all in-memory tuples to the temp file */
static void dumptuples(Tuplestorestate *state, TuplestorePos *pos);
/* read a leading/trailing length word from the temp file */
static uint32 getlen(Tuplestorestate *state, TuplestorePos *pos, bool eofOK);
/* heap-tuple implementations of the copytup/writetup/readtup hooks */
static void *copytup_heap(Tuplestorestate *state, TuplestorePos *pos, void *tup);
static void writetup_heap(Tuplestorestate *state, TuplestorePos *pos, void *tup);
static void *readtup_heap(Tuplestorestate *state, TuplestorePos *pos, uint32 len);
/*
 * tuplestore_begin_pos
 *
 * Initialize a tuple store position structure.
 *
 * Allocates a new TuplestorePos in the current memory context, fills it
 * with a byte-for-byte copy of the store's built-in position (state->pos),
 * and returns it through *pos.
 */
void
tuplestore_begin_pos(Tuplestorestate* state, TuplestorePos **pos)
{
    TuplestorePos *snapshot;

    Assert(state != NULL);

    snapshot = (TuplestorePos *) palloc0(sizeof(*snapshot));
    memcpy(snapshot, &state->pos, sizeof(*snapshot));

    *pos = snapshot;
}
/*
 * tuplestore_begin_common
 *
 * Shared constructor behind the tuplestore_begin_xxx routines.
 *
 * Returns a zero-filled Tuplestorestate in TSS_INMEM state with no file,
 * a memory budget of maxKBytes kilobytes, and an initial 1024-slot
 * memtuples array whose chunk space is already charged to the budget.
 * The tuple-type hooks (copytup/writetup/readtup) are left for the caller
 * to fill in.
 */
static Tuplestorestate *
tuplestore_begin_common(bool randomAccess, bool interXact, int maxKBytes)
{
    Tuplestorestate *ts;
    long        budget = maxKBytes * 1024L;

    ts = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));

    /* caller-selected behaviour */
    ts->randomAccess = randomAccess;
    ts->interXact = interXact;

    /* start out purely in memory */
    ts->status = TSS_INMEM;
    ts->myfile = NULL;

    /* memory budget; the low-water mark starts at the full allowance */
    ts->allowedMem = budget;
    ts->availMem = budget;
    ts->availMemMin = budget;

    /* tuple pointer array; its size is only an initial guess */
    ts->memtupcount = 0;
    ts->memtupsize = 1024;
    ts->memtuples = (void **) palloc(ts->memtupsize * sizeof(void *));
    USEMEM(ts, GetMemoryChunkSpace(ts->memtuples));

    /* built-in read position starts at the first tuple */
    ts->pos.current = 0;
    ts->pos.eof_reached = false;

    /* mirror fields kept directly in the state struct */
    ts->current = 0;
    ts->eof_reached = false;

    return ts;
}
/*
 * tuplestore_begin_heap
 *
 * Create a new tuplestore for heap tuples.  Stores for other kinds of
 * tuples are conceivable but not presently implemented.
 *
 * randomAccess: if true, both forward and backward accesses to the
 * tuple store are allowed.
 *
 * interXact: if true, the files used for on-disk storage persist beyond the
 * end of the current transaction.  NOTE: It's the caller's responsibility to
 * create such a tuplestore in a memory context that will also survive
 * transaction boundaries, and to ensure the tuplestore is closed when it's
 * no longer wanted.
 *
 * maxKBytes: how much data to store in memory (any data beyond this
 * amount is paged to disk).  When in doubt, use work_mem.
 */
Tuplestorestate *
tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
{
    Tuplestorestate *ts = tuplestore_begin_common(randomAccess,
                                                  interXact,
                                                  maxKBytes);

    /* plug in the heap-tuple specific hooks */
    ts->readtup = readtup_heap;
    ts->writetup = writetup_heap;
    ts->copytup = copytup_heap;

    return ts;
}
/*
 * tuplestore_begin_heap_file_readerwriter
 *
 * Create a heap tuplestore that is file-backed from the outset.
 *
 * The store is built with randomAccess and interXact both true and a 1 kB
 * memory budget, then attached to the file returned by
 * BufFileCreateTemp_ReaderWriter(fileName, isWriter).  A writer starts in
 * TSS_WRITEFILE state, a reader in TSS_READFILE state.
 */
Tuplestorestate *
tuplestore_begin_heap_file_readerwriter(const char* fileName, bool isWriter)
{
    Tuplestorestate *ts = tuplestore_begin_common(true, true, 1);

    ts->myfile = BufFileCreateTemp_ReaderWriter(fileName, isWriter);

    if (isWriter)
        ts->status = TSS_WRITEFILE;
    else
        ts->status = TSS_READFILE;

    /* plug in the heap-tuple specific hooks */
    ts->readtup = readtup_heap;
    ts->writetup = writetup_heap;
    ts->copytup = copytup_heap;

    return ts;
}
/*
 * tuplestore_set_instrument
 *
 * Optional: call after tuplestore_begin_xxx() to have the store report
 * statistics for EXPLAIN ANALYZE (tuplestore_end fills them in).
 *
 * Only the pointer is stored; the caller must keep 'instrument' valid for
 * as long as the Tuplestorestate exists.
 */
void
tuplestore_set_instrument(Tuplestorestate *state,
                          struct Instrumentation *instrument)
{
    state->instrument = instrument;
}
/*
 * tuplestore_end
 *
 * Release resources and clean up: report statistics if instrumentation was
 * attached, close the temp file if there is one, free every tuple still
 * held in memory along with the pointer array, and finally free the state
 * itself.
 */
void
tuplestore_end(Tuplestorestate *state)
{
    /* CDB: Report statistics to EXPLAIN ANALYZE. */
    if (state->instrument)
    {
        double      used;

        /* A zero-byte FREEMEM just brings the low-water mark up to date */
        FREEMEM(state, 0);

        /* How close did we come to the work_mem limit? */
        used = state->allowedMem - state->availMemMin;
        state->instrument->workmemused =
            Max(state->instrument->workmemused, used);

        /* How much work_mem would be enough to hold all tuples in memory? */
        if (state->spilledBytes > 0)
        {
            double      wanted;

            wanted = state->allowedMem - state->availMem + state->spilledBytes;
            state->instrument->workmemwanted =
                Max(state->instrument->workmemwanted, wanted);
        }
    }

    if (state->myfile)
        BufFileClose(state->myfile);

    if (state->memtuples)
    {
        void      **slot = state->memtuples;
        void      **stop = slot + state->memtupcount;

        for (; slot < stop; slot++)
        {
            if (*slot)
                pfree(*slot);
        }
        pfree(state->memtuples);
    }

    pfree(state);
}
/*
 * tuplestore_ateof_pos
 *
 * Returns the eof_reached flag of the given position.  'state' is not
 * consulted.
 */
bool
tuplestore_ateof_pos(Tuplestorestate *state, TuplestorePos *pos)
{
    bool        at_eof = pos->eof_reached;

    return at_eof;
}
/*
 * tuplestore_ateof
 *
 * Returns the eof_reached flag of the store's built-in position.
 */
bool
tuplestore_ateof(Tuplestorestate *state)
{
    TuplestorePos *own_pos = &state->pos;

    return own_pos->eof_reached;
}
/*
 * Accept one tuple and append it to the tuplestore.
 *
 * Note that the input tuple is always copied; the caller need not save it.
 *
 * If the read status is currently "AT EOF" then it remains so (the read
 * pointer advances along with the write pointer); otherwise the read
 * pointer is unchanged.  This is for the convenience of nodeMaterial.c.
 *
 * tuplestore_puttupleslot_pos() collects the data straight from a
 * TupleTableSlot: the slot's contents are copied out as a MemTuple, the
 * copy's chunk space is charged to the store, and the copy is appended.
 */
void
tuplestore_puttupleslot_pos(Tuplestorestate *state, TuplestorePos *pos,
                            TupleTableSlot *slot)
{
    /* Form a MemTuple in working memory and account for it */
    MemTuple    copy = ExecCopySlotMemTuple(slot);

    USEMEM(state, GetMemoryChunkSpace(copy));

    tuplestore_puttuple_common(state, pos, (void *) copy);
}
/*
 * tuplestore_puttupleslot
 *
 * Same as tuplestore_puttupleslot_pos, using the store's built-in position.
 */
void
tuplestore_puttupleslot(Tuplestorestate *state, TupleTableSlot *slot)
{
    TuplestorePos *own_pos = &state->pos;

    tuplestore_puttupleslot_pos(state, own_pos, slot);
}
/*
 * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
 * deprecated, but not worth getting rid of in view of the number of callers.
 *
 * The tuple is duplicated through the store's copytup hook and the copy is
 * appended; the caller keeps ownership of the original.
 */
void
tuplestore_puttuple_pos(Tuplestorestate *state, TuplestorePos *pos, HeapTuple tuple)
{
    /* Copy the tuple.  (Must do this even in WRITEFILE case.) */
    void       *copy = COPYTUP(state, pos, tuple);

    tuplestore_puttuple_common(state, pos, copy);
}
/*
 * tuplestore_puttuple
 *
 * Same as tuplestore_puttuple_pos, using the store's built-in position.
 */
void
tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
{
    TuplestorePos *own_pos = &state->pos;

    tuplestore_puttuple_pos(state, own_pos, tuple);
}
/*
* tuplestore_puttuple_common - append an already-copied tuple to the store.
*
* 'tuple' is either kept in memtuples[] (INMEM state) or handed to WRITETUP
* (file states). Only the TuplestorePos passed in as 'pos' has its read
* position kept consistent across a state change; nothing here updates any
* other TuplestorePos instance.
*/
static void
tuplestore_puttuple_common(Tuplestorestate *state, TuplestorePos *pos, void *tuple)
{
switch (state->status)
{
case TSS_INMEM:
/*
* Grow the array as needed. Note that we try to grow the array
* when there is still one free slot remaining --- if we fail,
* there'll still be room to store the incoming tuple, and then
* we'll switch to tape-based operation.
*/
if (state->memtupcount >= state->memtupsize - 1)
{
/*
* See grow_memtuples() in tuplesort.c for the rationale
* behind these two tests.
*/
if (state->availMem > (long) (state->memtupsize * sizeof(void *)) &&
(Size) (state->memtupsize * 2) < MaxAllocSize / sizeof(void *))
{
/* Swap the old array's accounting for the new array's */
FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
state->memtupsize *= 2;
state->memtuples = (void **)
repalloc(state->memtuples,
state->memtupsize * sizeof(void *));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
}
}
/* Stash the tuple in the in-memory array */
state->memtuples[state->memtupcount++] = tuple;
/* If eof_reached, keep read position in sync */
if (pos->eof_reached)
pos->current = state->memtupcount;
/*
* Done if we still fit in available memory and have array slots.
*/
if (state->memtupcount < state->memtupsize && !LACKMEM(state))
return;
/*
* Nope; time to switch to tape-based operation.
*/
{
/* Temp file name prefix carries the current slice id */
char tmpprefix[50];
snprintf(tmpprefix, 50, "slice%d_tuplestore", currentSliceId);
state->myfile = BufFileCreateTemp(tmpprefix, state->interXact);
}
/* Status must be WRITEFILE before dumping: the file now sits at the write point */
state->status = TSS_WRITEFILE;
dumptuples(state, pos);
break;
case TSS_WRITEFILE:
/* File is already positioned at the write point */
WRITETUP(state, pos, tuple);
break;
case TSS_READFILE:
/*
* Switch from reading to writing.
*
* Remember where the reader was (unless it is at EOF, in which case
* the read position is implicitly the write position), then move the
* file to the remembered write position.
*/
if (!pos->eof_reached)
BufFileTell(state->myfile,
&pos->readpos_offset);
if (BufFileSeek(state->myfile,
pos->writepos_offset,
SEEK_SET) != 0)
elog(ERROR, "seek to EOF failed");
state->status = TSS_WRITEFILE;
WRITETUP(state, pos, tuple);
break;
default:
elog(ERROR, "invalid tuplestore state");
break;
}
}
/*
* Fetch the next tuple in either forward or back direction.
* Returns NULL if no more tuples. If should_free is set, the
* caller must pfree the returned tuple when done with it.
*
* NB: *should_free is not assigned on every path (e.g. the early NULL
* return in the WRITEFILE case), so callers should initialize it; the
* callers in this file set it to false beforehand.
*
* Backward fetches require the store to have been created with
* randomAccess (asserted below).
*/
static void *
tuplestore_gettuple(Tuplestorestate *state, TuplestorePos *pos, bool forward,
bool *should_free)
{
uint32 tuplen;
void *tup;
Assert(forward || state->randomAccess);
switch (state->status)
{
case TSS_INMEM:
/* Tuples are returned straight out of memtuples[]; the store keeps ownership */
*should_free = false;
if (forward)
{
if (pos->current < state->memtupcount)
return state->memtuples[pos->current++];
pos->eof_reached = true;
return NULL;
}
else
{
if (pos->current <= 0)
return NULL;
/*
* if all tuples are fetched already then we return last
* tuple, else - tuple before last returned.
*/
if (pos->eof_reached)
pos->eof_reached = false;
else
{
pos->current--; /* last returned tuple */
if (pos->current <= 0)
return NULL;
}
return state->memtuples[pos->current - 1];
}
break;
case TSS_WRITEFILE:
/* Skip state change if we'll just return NULL */
if (pos->eof_reached && forward)
return NULL;
/*
* Switch from writing to reading.
*
* Save the write point first; if the reader is at EOF its position
* already equals the write point, so no seek is needed.
*/
BufFileTell(state->myfile,
&pos->writepos_offset);
if (!pos->eof_reached)
if (BufFileSeek(state->myfile,
pos->readpos_offset,
SEEK_SET) != 0)
elog(ERROR, "seek failed");
state->status = TSS_READFILE;
/* FALL THRU into READFILE case */
case TSS_READFILE:
/* Tuples read from the file are fresh palloc'd copies owned by the caller */
*should_free = true;
if (forward)
{
/* getlen returns 0 at a clean end of file (eofOK is true here) */
if ((tuplen = getlen(state, pos, true)) != 0)
{
tup = READTUP(state, pos, tuplen);
/* CDB XXX XXX XXX XXX */
/* MPP-1347: EXPLAIN ANALYZE shows runaway memory usage.
* Readtup does a usemem, but the free happens in
* ExecStoreTuple. Do a free so state->availMem
* doesn't go massively negative to screw up
* stats. It would be better to interrogate the
* heap for actual memory usage than use this
* homemade accounting.
*/
FREEMEM(state, GetMemoryChunkSpace(tup));
/* CDB XXX XXX XXX XXX */
return tup;
}
else
{
pos->eof_reached = true;
return NULL;
}
}
/*
* Backward.
*
* if all tuples are fetched already then we return last tuple,
* else - tuple before last returned.
*
* Back up to fetch previously-returned tuple's ending length
* word. If seek fails, assume we are at start of file.
*
* NOTE(review): insist_log is invoked with a constant false condition
* right below. If, as its other uses in this file suggest, it raises an
* error when the condition is false, the rest of this path never runs --
* confirm against insist_log's definition. Unlike the forward path, the
* code below does not FREEMEM the tuple returned by READTUP.
*/
insist_log(false, "Backward scanning of tuplestores are not supported at this time");
if (BufFileSeek(state->myfile, -(long) sizeof(uint32) /* offset */,
SEEK_CUR) != 0)
return NULL;
tuplen = getlen(state, pos, false);
if (pos->eof_reached)
{
pos->eof_reached = false;
/* We will return the tuple returned before returning NULL */
}
else
{
/*
* Back up to get ending length word of tuple before it.
*/
if (BufFileSeek(state->myfile,
-(long) (tuplen + 2 * sizeof(uint32)) /* offset */,
SEEK_CUR) != 0)
{
/*
* If that fails, presumably the prev tuple is the first
* in the file. Back up so that it becomes next to read
* in forward direction (not obviously right, but that is
* what in-memory case does).
*/
if (BufFileSeek(state->myfile,
-(long) (tuplen + sizeof(uint32)) /* offset */,
SEEK_CUR) != 0)
elog(ERROR, "bogus tuple length in backward scan");
return NULL;
}
tuplen = getlen(state, pos, false);
}
/*
* Now we have the length of the prior tuple, back up and read it.
* Note: READTUP expects we are positioned after the initial
* length word of the tuple, so back up to that point.
*/
if (BufFileSeek(state->myfile,
-(long) tuplen /* offset */,
SEEK_CUR) != 0)
elog(ERROR, "bogus tuple length in backward scan");
tup = READTUP(state, pos, tuplen);
return tup;
default:
elog(ERROR, "invalid tuplestore state");
return NULL; /* keep compiler quiet */
}
}
/*
 * tuplestore_gettupleslot_pos - exported function to fetch a MemTuple
 *
 * Fetches the next tuple in the requested direction for the given
 * position.  On success the tuple is stored into 'slot' (the slot is told
 * whether it must free the tuple) and true is returned; otherwise the slot
 * is cleared and false is returned.
 */
bool
tuplestore_gettupleslot_pos(Tuplestorestate *state, TuplestorePos *pos, bool forward,
                            TupleTableSlot *slot)
{
    bool        must_free = false;
    MemTuple    fetched;

    fetched = (MemTuple) tuplestore_gettuple(state, pos, forward, &must_free);

    if (!fetched)
    {
        ExecClearTuple(slot);
        return false;
    }

    ExecStoreMemTuple(fetched, slot, must_free);
    return true;
}
/*
 * tuplestore_gettupleslot
 *
 * Same as tuplestore_gettupleslot_pos, using the store's built-in position.
 */
bool
tuplestore_gettupleslot(Tuplestorestate *state, bool forward, TupleTableSlot *slot)
{
    TuplestorePos *own_pos = &state->pos;

    return tuplestore_gettupleslot_pos(state, own_pos, forward, slot);
}
/*
 * tuplestore_advance_pos - exported function to adjust position without
 * fetching
 *
 * Implemented by fetching the tuple and discarding it (freeing it when the
 * fetch says the caller owns it).  Returns true if a tuple was skipped,
 * false if there was none in that direction.
 *
 * We could optimize this case to avoid palloc/pfree overhead, but for the
 * moment it doesn't seem worthwhile.
 */
bool
tuplestore_advance_pos(Tuplestorestate *state, TuplestorePos *pos, bool forward)
{
    bool        must_free = false;
    void       *skipped = tuplestore_gettuple(state, pos, forward, &must_free);

    if (!skipped)
        return false;

    if (must_free)
        pfree(skipped);

    return true;
}
/*
 * tuplestore_advance
 *
 * Same as tuplestore_advance_pos, using the store's built-in position.
 */
bool
tuplestore_advance(Tuplestorestate *state, bool forward)
{
    TuplestorePos *own_pos = &state->pos;

    return tuplestore_advance_pos(state, own_pos, forward);
}
/*
 * dumptuples - remove tuples from memory and write to tape
 *
 * While writing, the file offsets that correspond to the array indexes
 * pos->current and pos->markpos_current are captured into readpos_offset
 * and markpos_offset; otherwise the dump would lose the current read
 * position and the mark.  An index equal to memtupcount (i.e. just past the
 * last tuple) is handled too, which is why the offsets are checked once
 * more after the final tuple has been written.
 */
static void
dumptuples(Tuplestorestate *state, TuplestorePos *pos)
{
    int         idx = 0;
    int         wrote;

    do
    {
        /* The file is now positioned where tuple number 'idx' starts */
        if (pos->current == idx)
            BufFileTell(state->myfile, &pos->readpos_offset);
        if (pos->markpos_current == idx)
            BufFileTell(state->myfile, &pos->markpos_offset);

        wrote = (idx < state->memtupcount);
        if (wrote)
        {
            WRITETUP(state, pos, state->memtuples[idx]);
            idx++;
        }
    } while (wrote);

    state->memtupcount = 0;
}
/*
 * tuplestore_flush - flush the underlying temp file, if the store has one.
 *
 * Does nothing while the store has no file.
 */
void
tuplestore_flush(Tuplestorestate *state)
{
    if (state->myfile)
        BufFileFlush(state->myfile);
}
/*
 * tuplestore_rescan_pos - rewind and replay the scan
 *
 * Resets the given position to the first tuple and clears its EOF flag.
 * What "first tuple" means depends on the store's state: array index 0 in
 * memory, a remembered read offset of 0 while writing, or an actual seek
 * to offset 0 while reading.
 */
void
tuplestore_rescan_pos(Tuplestorestate *state, TuplestorePos *pos)
{
    if (state->status == TSS_INMEM)
    {
        pos->eof_reached = false;
        pos->current = 0;
    }
    else if (state->status == TSS_WRITEFILE)
    {
        pos->eof_reached = false;
        pos->readpos_offset = 0L;
    }
    else if (state->status == TSS_READFILE)
    {
        pos->eof_reached = false;
        if (BufFileSeek(state->myfile, 0L /* offset */, SEEK_SET) != 0)
            elog(ERROR, "seek to start failed");
    }
    else
    {
        elog(ERROR, "invalid tuplestore state");
    }
}
/*
 * tuplestore_rescan
 *
 * Same as tuplestore_rescan_pos, using the store's built-in position.
 */
void
tuplestore_rescan(Tuplestorestate *state)
{
    TuplestorePos *own_pos = &state->pos;

    tuplestore_rescan_pos(state, own_pos);
}
/*
 * tuplestore_markpos_pos - saves current position in the tuple sequence
 *
 * In memory the mark is the array index; with a file it is the read offset,
 * which is either the file's seek position (READFILE, or WRITEFILE with the
 * reader at EOF) or the remembered readpos_offset (WRITEFILE otherwise).
 */
void
tuplestore_markpos_pos(Tuplestorestate *state, TuplestorePos *pos)
{
    if (state->status == TSS_INMEM)
    {
        pos->markpos_current = pos->current;
        return;
    }

    if (state->status == TSS_WRITEFILE)
    {
        if (!pos->eof_reached)
            pos->markpos_offset = pos->readpos_offset;
        else
        {
            /* Need to record the implicit read position */
            BufFileTell(state->myfile, &pos->markpos_offset);
        }
        return;
    }

    if (state->status == TSS_READFILE)
    {
        BufFileTell(state->myfile, &pos->markpos_offset);
        return;
    }

    elog(ERROR, "invalid tuplestore state");
}
/*
 * tuplestore_markpos
 *
 * Same as tuplestore_markpos_pos, using the store's built-in position.
 */
void
tuplestore_markpos(Tuplestorestate *state)
{
    TuplestorePos *own_pos = &state->pos;

    tuplestore_markpos_pos(state, own_pos);
}
/*
 * tuplestore_restorepos_pos - restores current position in tuple sequence
 * to last saved position
 *
 * Clears the EOF flag and moves the read position back to the mark: the
 * saved array index in memory, the saved offset as the remembered read
 * position while writing, or a seek to the saved offset while reading.
 */
void
tuplestore_restorepos_pos(Tuplestorestate *state, TuplestorePos *pos)
{
    if (state->status == TSS_INMEM)
    {
        pos->eof_reached = false;
        pos->current = pos->markpos_current;
    }
    else if (state->status == TSS_WRITEFILE)
    {
        pos->eof_reached = false;
        pos->readpos_offset = pos->markpos_offset;
    }
    else if (state->status == TSS_READFILE)
    {
        pos->eof_reached = false;
        if (BufFileSeek(state->myfile, pos->markpos_offset, SEEK_SET) != 0)
            elog(ERROR, "tuplestore_restorepos failed");
    }
    else
    {
        elog(ERROR, "invalid tuplestore state");
    }
}
/*
 * tuplestore_restorepos
 *
 * Same as tuplestore_restorepos_pos, using the store's built-in position.
 */
void
tuplestore_restorepos(Tuplestorestate *state)
{
    TuplestorePos *own_pos = &state->pos;

    tuplestore_restorepos_pos(state, own_pos);
}
/*
 * Tape interface routines
 */

/*
 * getlen - read one uint32 length word from the store's file.
 *
 * Returns the word when a full one could be read.  On a short read, both
 * insist_log checks are evaluated (a partial word, or EOF when eofOK is
 * false, is reported through them) and 0 is returned.
 */
static uint32
getlen(Tuplestorestate *state, TuplestorePos *pos, bool eofOK)
{
    uint32      lenword;
    size_t      got;

    got = BufFileRead(state->myfile, (void *) &lenword, sizeof(lenword));

    if (got != sizeof(lenword))
    {
        insist_log(got == 0, "unexpected end of tape");
        insist_log(eofOK, "unexpected end of data");
        return 0;
    }

    return lenword;
}
/*
 * Routines specialized for the heap-tuple case
 *
 * A stored tuple is either a MemTuple or a HeapTuple; the routines tell the
 * two apart with is_heaptuple_memtuple() / is_len_memtuplen().  The tuple
 * image written to the file begins with its own length word, so no separate
 * leading length word is written.
 */

/*
 * copytup_heap - duplicate an input tuple for storage in the tuplestore.
 *
 * Returns a copy of 'tup' made with memtuple_copy_to() or
 * heaptuple_copy_to(), depending on the tuple's kind (presumably a fresh
 * palloc'd chunk when the destination argument is NULL -- the copy is later
 * passed to GetMemoryChunkSpace() and pfree() by writetup_heap).
 *
 * As the copytup contract requires, the space occupied by the copy is
 * charged against state->availMem.  Without that charge, tuples added
 * through tuplestore_puttuple() would never count towards the memory limit,
 * yet writetup_heap would still credit their space back when spilling them.
 */
static void *
copytup_heap(Tuplestorestate *state, TuplestorePos *pos, void *tup)
{
    void       *copy;

    if (is_heaptuple_memtuple((HeapTuple) tup))
        copy = memtuple_copy_to((MemTuple) tup, NULL, NULL, NULL);
    else
        copy = heaptuple_copy_to((HeapTuple) tup, NULL, NULL);

    /* Account for the actual chunk size, not just the tuple length */
    USEMEM(state, GetMemoryChunkSpace(copy));

    return copy;
}
/*
 * writetup_heap - write one stored tuple to the file and release it.
 *
 * The tuple image is written as-is, using the size reported for its kind
 * (MemTuple or HeapTuple).  When the store was created with randomAccess, a
 * trailing copy of that size follows.  Afterwards the tuple's chunk space
 * is added to spilledBytes, credited back to availMem, and the tuple is
 * pfree'd.
 */
static void
writetup_heap(Tuplestorestate *state, TuplestorePos *pos, void *tup)
{
    uint32      nbytes = 0;
    Size        chunk = 0;

    if (!is_heaptuple_memtuple((HeapTuple) tup))
    {
        Assert(!is_heaptuple_splitter((HeapTuple) tup));
        nbytes = heaptuple_get_size((HeapTuple) tup);
    }
    else
    {
        nbytes = memtuple_get_size((MemTuple) tup, NULL);
    }

    if (BufFileWrite(state->myfile, (void *) tup, nbytes) != (size_t) nbytes)
        elog(ERROR, "write failed");

    /* need trailing length word? */
    if (state->randomAccess &&
        BufFileWrite(state->myfile, (void *) &nbytes,
                     sizeof(nbytes)) != sizeof(nbytes))
        elog(ERROR, "write failed");

    /* The chunk size must be sampled before the tuple is freed */
    chunk = GetMemoryChunkSpace(tup);
    state->spilledBytes += chunk;
    FREEMEM(state, chunk);

    pfree(tup);
}
/*
 * readtup_heap - read one stored tuple back from the file.
 *
 * 'len' is the leading length word that has already been consumed.  It
 * determines both the tuple kind (is_len_memtuplen) and the size of the
 * in-memory image: the decoded MemTuple size, or len + HEAPTUPLESIZE for a
 * HeapTuple.  The image is palloc'd and charged to availMem, its first word
 * is re-created from 'len', and the remainder is read from the file.  For a
 * HeapTuple the t_data pointer is then re-pointed just past the header
 * struct.  With randomAccess, the trailing length word is read and
 * discarded.
 */
static void *
readtup_heap(Tuplestorestate *state, TuplestorePos *pos, uint32 len)
{
    int         memtup = is_len_memtuplen(len) ? 1 : 0;
    void       *tup = NULL;
    uint32      tuplen = 0;
    uint32      trailer = 0;
    size_t      rest;

    if (memtup)
        tuplen = memtuple_size_from_uint32(len);
    else
    {
        /* len is HeapTuple.t_len. The record size includes rest of the HeapTuple fields */
        tuplen = len + HEAPTUPLESIZE;
    }

    tup = (void *) palloc(tuplen);
    USEMEM(state, GetMemoryChunkSpace(tup));

    /* Restore the leading length word, which the caller already read */
    if (memtup)
        memtuple_set_mtlen((MemTuple) tup, NULL, len);
    else
        ((HeapTuple) tup)->t_len = tuplen - HEAPTUPLESIZE;

    /* read in the tuple proper */
    rest = tuplen - sizeof(uint32);
    if (BufFileRead(state->myfile, (void *) ((char *) tup + sizeof(uint32)),
                    rest) != rest)
    {
        insist_log(false, "unexpected end of data");
    }

    /* The t_data pointer read from the file is stale; fix it up */
    if (!memtup)
        ((HeapTuple) tup)->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);

    /* need trailing length word? */
    if (state->randomAccess &&
        BufFileRead(state->myfile, (void *) &trailer,
                    sizeof(trailer)) != sizeof(trailer))
    {
        insist_log(false, "unexpected end of data");
    }

    return (void *) tup;
}