blob: f88a3e20a3f311214656c742196acd2c1b43ae08 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* tuplestorenew.c
* A better tuplestore
*/
#include "postgres.h"
#include "access/heapam.h"
#include "executor/instrument.h"
#include "executor/execWorkfile.h"
#include "utils/tuplestorenew.h"
#include "utils/memutils.h"
#include "cdb/cdbvars.h" /* currentSliceId */
typedef struct NTupleStorePageHeader
{
long blockn; /* block number */
long page_flag; /* flag */
void *prev_1; /* in mem list */
void *next_1; /* in mem list */
int pin_cnt; /* in mem pin count */
int data_bcnt; /* tail of payload data */
int slot_cnt; /* number of slot writen in the page */
int first_slot; /* first valid slot */
} NTupleStorePageHeader;
typedef struct NTupleStorePageSlotEntry
{
short data_start; /* data starts */
short size; /* size of the entry */
} NTupleStorePageSlotEntry;
#define NTS_MAX_ENTRY_SIZE (BLCKSZ - sizeof(NTupleStorePageHeader) - sizeof(NTupleStorePageSlotEntry))
/* Page. data grown incr, slot grow desc */
typedef struct NTupleStorePage
{
NTupleStorePageHeader header;
char data[NTS_MAX_ENTRY_SIZE];
NTupleStorePageSlotEntry slot[1];
} NTupleStorePage;
typedef struct NTupleStoreLobRef
{
int64 start;
Size size;
} NTupleStoreLobRef;
/* some convinient macro/inline functions */
/* page flag bits */
#define NTS_PAGE_DIRTY 1
static inline bool nts_page_is_dirty(NTupleStorePage * page)
{
return (page->header.page_flag & NTS_PAGE_DIRTY) != 0;
}
static inline void nts_page_set_dirty(NTupleStorePage *page, bool dirty)
{
if(dirty)
(page)->header.page_flag |= NTS_PAGE_DIRTY;
else
(page)->header.page_flag &= (~NTS_PAGE_DIRTY);
}
#define NTS_ALLIGN8(n) (((n)+7) & (~7))
/* page stuff. Note slot grow desc, so the minus array index stuff. */
static inline long nts_page_blockn(NTupleStorePage *page) { return page->header.blockn; }
static inline void nts_page_set_blockn(NTupleStorePage *page, long blockn) { page->header.blockn = blockn; }
static inline NTupleStorePage *nts_page_prev(NTupleStorePage *page) { return (NTupleStorePage *) page->header.prev_1; }
static inline void nts_page_set_prev(NTupleStorePage *page, NTupleStorePage* prev) { page->header.prev_1 = (void *) prev; }
static inline NTupleStorePage *nts_page_next(NTupleStorePage *page) { return (NTupleStorePage *) page->header.next_1; }
static inline void nts_page_set_next(NTupleStorePage *page, NTupleStorePage *next) { page->header.next_1 = (void *) next; }
static inline int nts_page_pin_cnt(NTupleStorePage *page) { return page->header.pin_cnt; }
static inline void nts_page_set_pin_cnt(NTupleStorePage *page, int pc) { page->header.pin_cnt = pc; }
static inline void nts_page_incr_pin_cnt(NTupleStorePage *page) { ++page->header.pin_cnt; }
static inline void nts_page_decr_pin_cnt(NTupleStorePage *page) { --page->header.pin_cnt; }
static inline int nts_page_slot_cnt(NTupleStorePage *page) { return page->header.slot_cnt; }
static inline void nts_page_set_slot_cnt(NTupleStorePage *page, int sc) { page->header.slot_cnt = sc; }
static inline void nts_page_incr_slot_cnt(NTupleStorePage *page) { ++page->header.slot_cnt; }
static inline int nts_page_data_bcnt(NTupleStorePage *page) { return page->header.data_bcnt; }
static inline void nts_page_set_data_bcnt(NTupleStorePage *page, int bc) { page->header.data_bcnt = bc; }
static inline int nts_page_first_valid_slotn(NTupleStorePage *page) { return page->header.first_slot; }
static inline void nts_page_set_first_valid_slotn(NTupleStorePage *page, int fs) { page->header.first_slot = fs; }
static inline int nts_page_valid_slot_cnt(NTupleStorePage *page)
{
return nts_page_slot_cnt(page) - nts_page_first_valid_slotn(page);
}
static inline NTupleStorePageSlotEntry *nts_page_slot_entry(NTupleStorePage *page, int slotn)
{
return &(page->slot[-slotn]);
}
static inline char *nts_page_slot_data_ptr(NTupleStorePage *page, int slotn)
{
return page->data + nts_page_slot_entry(page, slotn)->data_start;
}
static inline int nts_page_slot_data_len(NTupleStorePage *page, int slotn)
{
return nts_page_slot_entry(page, slotn)->size;
}
static inline NTupleStorePageSlotEntry *nts_page_next_slot_entry(NTupleStorePage *page)
{
return nts_page_slot_entry(page, nts_page_slot_cnt(page));
}
static inline char *nts_page_next_slot_data_ptr(NTupleStorePage *page)
{
return page->data + NTS_ALLIGN8(nts_page_data_bcnt(page));
}
static inline int nts_page_avail_bytes(NTupleStorePage *page)
{
int b = (char *) nts_page_slot_entry(page, nts_page_slot_cnt(page)) -
(char *) nts_page_next_slot_data_ptr(page);
if(b < 0)
return 0;
return b;
}
static inline void verify_nts_page_format_is_sane()
{
Assert(sizeof(NTupleStorePage) == BLCKSZ);
Assert(NTS_ALLIGN8(sizeof(NTupleStorePageHeader)) == sizeof(NTupleStorePageHeader));
Assert(offsetof(NTupleStorePage, data) == sizeof(NTupleStorePageHeader));
Assert(offsetof(NTupleStorePage, slot) == BLCKSZ-sizeof(NTupleStorePageSlotEntry));
}
static inline void update_page_slot_entry(NTupleStorePage *page, int len)
{
short data_start = (short) NTS_ALLIGN8( nts_page_data_bcnt(page) );
int w_len = len > 0 ? len : -len;
NTupleStorePageSlotEntry *slot = nts_page_next_slot_entry(page);
Assert(len < (int) NTS_MAX_ENTRY_SIZE);
Assert(len > 0 || len == -(int)sizeof(NTupleStoreLobRef));
slot->data_start = data_start;
slot->size = (short) len;
nts_page_incr_slot_cnt(page);
nts_page_set_data_bcnt(page, data_start+w_len);
nts_page_set_dirty(page, true);
}
/* tuple store type */
#define NTS_NOT_READERWRITER 1
#define NTS_IS_WRITER 2
#define NTS_IS_READER 3
struct NTupleStore
{
int page_max; /* max page the store can use */
int page_effective_max;
int page_cnt; /* page count */
int pin_cnt; /* pin count */
NTupleStorePage *first_page; /* first page */
NTupleStorePage *last_page; /* last page */
NTupleStorePage *first_free_page; /* free page, cached to save palloc */
long first_ondisk_blockn; /* first blockn that is written to disk */
int rwflag; /* if I am ordinary store, or a reader, or a writer of readerwriter (share input) */
bool cached_workfiles_found; /* true if found matching and usable cached workfiles */
bool cached_workfiles_loaded; /* set after loading cached workfiles */
bool workfiles_created; /* set if the operator created workfiles */
workfile_set *work_set; /* workfile set to use when using workfile manager */
ExecWorkFile *pfile; /* underlying backed file */
ExecWorkFile *plobfile; /* underlying backed file for lobs (entries does not fit one page) */
int64 lobbytes; /* number of bytes written to lob file */
List *accessors; /* all current accessors of the store */
bool fwacc; /* if I had already has a write acc */
/* instrumentation for explain analyze */
Instrumentation *instrument;
};
bool ntuplestore_is_readerwriter_reader(NTupleStore *nts) { return nts->rwflag == NTS_IS_READER; }
bool ntuplestore_is_readerwriter_writer(NTupleStore *nts) { return nts->rwflag == NTS_IS_WRITER; }
bool ntuplestore_is_readerwriter(NTupleStore *nts) { return nts->rwflag != NTS_NOT_READERWRITER; }
/* Accessor to the tuplestore.
*
* About the pos of an Accessor.
* An acessor has a pos, pos.blockn == -1 implies the postion is not valid.
* Else, pos.blockn should be the same as blockn of the current page.
* If the pos.blockn is valid, then pos.slotn should also be a valid slotn in the
* page, or
* 1. it may equals (first_valid - 1), which indicates it is positioned right before
* first valid slot,
* 2. it may equals page->header.slotn, which indicates it is positioned just after
* the last slot,
* the two positons are handy when we advance the accessor.
*/
struct NTupleStoreAccessor
{
NTupleStore *store;
NTupleStorePage *page;
bool isWriter;
char *tmp_lob;
int tmp_len;
NTupleStorePos pos;
};
static void ntuplestore_init_reader(NTupleStore *store, int maxBytes);
static void ntuplestore_create_spill_files(NTupleStore *nts);
void ntuplestore_setinstrument(NTupleStore *st, struct Instrumentation *instr)
{
st->instrument = instr;
}
static inline void init_page(NTupleStorePage *page)
{
nts_page_set_blockn(page, -1);
nts_page_set_prev(page, NULL);
nts_page_set_next(page, NULL);
nts_page_set_dirty(page, true);
nts_page_set_pin_cnt(page, 0);
nts_page_set_data_bcnt(page, 0);
nts_page_set_slot_cnt(page, 0);
nts_page_set_first_valid_slotn(page, 0);
}
static inline void nts_pin_page(NTupleStore* nts, NTupleStorePage *page)
{
Assert(nts && page);
nts_page_incr_pin_cnt(page);
++nts->pin_cnt;
}
static inline void nts_unpin_page(NTupleStore *nts, NTupleStorePage *page)
{
Assert(nts && nts->pin_cnt > 0);
Assert(page && nts_page_pin_cnt(page) > 0);
--nts->pin_cnt;
nts_page_decr_pin_cnt(page);
}
/* Prepend to list head. Pass in pointer of prev, and next so that the function
* can be reused later if the one page can be on server lists
*/
static NTupleStorePage *nts_prepend_to_dlist(
NTupleStorePage *head,
NTupleStorePage *page,
void **prev, void **next)
{
void **head_prev;
*prev = NULL;
if(!head)
{
(*next) = NULL;
return page;
}
(*next) = head;
head_prev = (void **) ((char *) head + ((char *) prev - (char *) page));
*head_prev = page;
return page;
}
#define NTS_PREPEND_1(head, page) (nts_prepend_to_dlist((head), (page), &((page)->header.prev_1), &((page)->header.next_1)))
/* Append page after curr */
static void nts_append_to_dlist(
NTupleStorePage *curr,
NTupleStorePage *page,
void **prev, void **next)
{
*prev = curr;
if(!curr)
*next = NULL;
else
{
void **curr_next = (void **) ((char *) curr + ((char *) next - (char *) page));
NTupleStorePage *nextpage = (NTupleStorePage *) (*curr_next);
*curr_next = page;
*next = nextpage;
if(nextpage)
{
void **next_prev = (void **) ((char *) nextpage + ((char *) prev - (char *) page));
Assert( *next_prev == curr );
*next_prev = page;
}
}
}
#define NTS_APPEND_1(curr, page) (nts_append_to_dlist((curr), (page), &((page)->header.prev_1), &((page)->header.next_1)))
static void nts_remove_page_from_dlist(NTupleStorePage *page, void **prev, void **next)
{
void ** prev_next = (void **) ((char *) (*prev) + ((char *) next - (char *) page));
void ** next_prev = (void **) ((char *) (*next) + ((char *) prev - (char *) page));
if(*prev)
*prev_next = *next;
if(*next)
*next_prev = *prev;
}
#define NTS_REMOVE_1(page) (nts_remove_page_from_dlist((page), &((page)->header.prev_1), &((page)->header.next_1)))
/* read a page by blockn. After read the page in not pined and not in any list */
static bool ntsReadBlock(NTupleStore *ts, int blockn, NTupleStorePage *page)
{
long diskblockn = blockn - ts->first_ondisk_blockn;
if(!ts->pfile)
return false;
Assert(ts->first_ondisk_blockn >= 0);
Assert(ts && diskblockn >= 0 && page);
if(ExecWorkFile_Seek(ts->pfile, diskblockn * BLCKSZ, SEEK_SET) != 0 ||
ExecWorkFile_Read(ts->pfile, page, BLCKSZ) != BLCKSZ)
{
return false;
}
Assert(nts_page_blockn(page) == blockn);
Assert(!nts_page_is_dirty(page));
nts_page_set_pin_cnt(page, 0);
nts_page_set_prev(page, NULL);
nts_page_set_next(page, NULL);
return true;
}
/* write a page */
static bool ntsWriteBlock(NTupleStore *ts, NTupleStorePage *page)
{
Assert(ts->rwflag != NTS_IS_READER);
Assert(nts_page_blockn(page) >= 0);
if(ts->first_ondisk_blockn == -1)
{
AssertImply(ts->rwflag == NTS_IS_WRITER, nts_page_blockn(page) == 0);
ts->first_ondisk_blockn = nts_page_blockn(page);
}
Assert(nts_page_blockn(page) >= ts->first_ondisk_blockn);
Assert(nts_page_is_dirty(page));
nts_page_set_dirty(page, false);
if(ExecWorkFile_Seek(ts->pfile, (nts_page_blockn(page) - ts->first_ondisk_blockn) * BLCKSZ, SEEK_SET) != 0 ||
!ExecWorkFile_Write(ts->pfile, page, BLCKSZ))
{
return false;
}
return true;
}
/* Put a page onto the free list. Do not increase nts->page_cnt */
static void nts_return_free_page(NTupleStore *nts, NTupleStorePage *page)
{
nts->first_free_page = NTS_PREPEND_1(nts->first_free_page, page);
}
static inline void *check_malloc(int size)
{
void *ptr = gp_malloc(size);
if(!ptr)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("NTupleStore failed to malloc: out of memory")));
return ptr;
}
/* Get a free page. Will shrink if necessary.
* The page returned still belong to the store (accounted by nts->page_cnt), but it is
* not on any list. Caller is responsible to put it back onto a list.
*
* Page is allocated/freed with gcc malloc/free, not palloc/pfree.
*/
static NTupleStorePage *nts_get_free_page(NTupleStore *nts)
{
NTupleStorePage *page = NULL;
NTupleStorePage *page_next = NULL;
int page_max = nts->page_max;
/* if have free page, just use it */
if(nts->first_free_page)
{
page = nts->first_free_page;
nts->first_free_page = nts_page_next(page);
init_page(page);
return page;
}
/* use 1/4 of max allowed if we are working with a disk file */
if(nts->pfile)
page_max >>= 2;
if(nts->page_cnt >= page_max)
{
if(!nts->pfile)
{
if (nts->work_set != NULL)
{
/* We have a usable workfile_set. Use that to generate temp files */
ntuplestore_create_spill_files(nts);
}
else
{
char tmpprefix[MAXPGPATH];
snprintf(tmpprefix, MAXPGPATH, "%s/slice%d_ntuplestore", PG_TEMP_FILES_DIR, currentSliceId);
nts->pfile = ExecWorkFile_CreateUnique(tmpprefix, BUFFILE, true /* delOnClose */, 0 /* compressType */ );
}
nts->workfiles_created = true;
if (nts->instrument)
{
nts->instrument->workfileCreated = true;
}
}
page = nts->first_page;
while(page)
{
page_next = nts_page_next(page);
if(page_next && nts_page_pin_cnt(page_next) == 0)
{
NTS_REMOVE_1(page_next);
if(nts_page_is_dirty(page_next))
{
Assert(nts->rwflag != NTS_IS_READER);
if(!ntsWriteBlock(nts, page_next))
{
workfile_mgr_report_error();
}
}
if(nts->page_cnt >= page_max)
{
gp_free2(page_next, sizeof(NTupleStorePage));
--nts->page_cnt;
}
else
break;
}
else
page = page_next;
}
if(page_next)
{
init_page(page_next);
return page_next;
}
/* failed to shrink */
if(nts->page_cnt >= page_max)
return NULL;
}
Assert(page_next == NULL && nts->page_cnt < page_max);
page = (NTupleStorePage *) check_malloc(sizeof(NTupleStorePage));
init_page(page);
++nts->page_cnt;
if(nts->instrument)
{
nts->instrument->workmemused = Max(nts->instrument->workmemused, nts->page_cnt * BLCKSZ);
if(nts->last_page)
{
long pagewanted = nts_page_blockn(nts->last_page) - nts_page_blockn(nts->first_page) + 1;
Assert(pagewanted >= 1);
nts->instrument->workmemwanted = Max(nts->instrument->workmemwanted, pagewanted * BLCKSZ);
}
}
return page;
}
/* Find a page in the store with blockn, return NULL if not found.
* It will load the page from disk if necessary. The page returned
* is not pin-ed.
*/
static NTupleStorePage *nts_load_page(NTupleStore* store, int blockn)
{
NTupleStorePage *page = NULL;
NTupleStorePage *page_next = NULL;
bool readOK;
/* fast track. Try last page */
if(store->last_page && nts_page_blockn(store->last_page) == blockn)
return store->last_page;
/* try the in mem list */
page = store->first_page;
while(page)
{
if(nts_page_blockn(page) == blockn)
return page;
page_next = nts_page_next(page);
if(page_next == NULL || nts_page_blockn(page_next) > blockn)
break;
page = page_next;
}
Assert(page);
/* not found. Need to load from disk */
page_next = nts_get_free_page(store);
if(!page_next)
return NULL;
readOK = ntsReadBlock(store, blockn, page_next);
if(!readOK)
{
nts_return_free_page(store, page_next);
return NULL;
}
NTS_APPEND_1(page, page_next);
return page_next;
}
/* Find the next page. This function is likely to be faster than
* nts_load_page(nts_page_blockn(page) + 1);
*/
static NTupleStorePage *nts_load_next_page(NTupleStore* store, NTupleStorePage *page)
{
int blockn = nts_page_blockn(page) + 1;
bool fOK;
NTupleStorePage *next = nts_page_next(page);
if(!next && store->rwflag != NTS_IS_READER)
return NULL;
if(!next || nts_page_blockn(next) > blockn)
{
next = nts_get_free_page(store);
if (next == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("Can not allocate a new page in the tuplestore.")));
fOK = ntsReadBlock(store, blockn, next);
if(!fOK)
{
Assert(store->rwflag == NTS_IS_READER);
nts_return_free_page(store, next);
return NULL;
}
NTS_APPEND_1(page, next);
}
Assert(next && nts_page_blockn(next) == blockn);
return next;
}
/* Find prev page. Likely to be faster than nts_load_page(nts_page_blockn(page) - 1) */
static NTupleStorePage *nts_load_prev_page(NTupleStore *store, NTupleStorePage *page)
{
int blockn = nts_page_blockn(page) - 1;
bool fOK;
NTupleStorePage *prev = NULL;
if(blockn < nts_page_blockn(store->first_page))
return NULL;
prev = nts_page_prev(page);
Assert(prev && nts_page_blockn(prev) <= blockn);
if(nts_page_blockn(prev) == blockn)
return prev;
else
{
NTupleStorePage *p = nts_get_free_page(store);
if(!p)
return NULL;
fOK = ntsReadBlock(store, blockn, p);
Assert(fOK);
NTS_APPEND_1(prev, p);
return p;
}
}
static void ntuplestore_cleanup(NTupleStore *ts, bool fNormal, bool canReportError)
{
NTupleStorePage *p = ts->first_page;
/* normal case: for each accessor, we mark it has no owning store.
* This do not need to, actually, cannot be called in error out case,
* because the memory context of ts->accessor has already been
* cleaned up
*/
if(fNormal)
{
ListCell *cell;
foreach(cell, ts->accessors)
{
NTupleStoreAccessor *acc = (NTupleStoreAccessor *) lfirst(cell);
acc->store = NULL;
acc->page = NULL;
}
}
while(p)
{
NTupleStorePage *pnext = nts_page_next(p);
gp_free2(p, sizeof(NTupleStorePage));
p = pnext;
}
p = ts->first_free_page;
while(p)
{
NTupleStorePage *pnext = nts_page_next(p);
gp_free2(p, sizeof(NTupleStorePage));
p = pnext;
}
if(ts->pfile)
{
workfile_mgr_close_file(ts->work_set, ts->pfile, canReportError);
ts->pfile = NULL;
}
if(ts->plobfile)
{
workfile_mgr_close_file(ts->work_set, ts->plobfile, canReportError);
ts->plobfile = NULL;
}
if (ts->work_set != NULL)
{
workfile_mgr_close_set(ts->work_set);
ts->work_set = NULL;
}
gp_free2(ts, sizeof(NTupleStore));
}
static void XCallBack_NTS(XactEvent event, void *nts)
{
ntuplestore_cleanup((NTupleStore *)nts, false, (event!=XACT_EVENT_ABORT));
}
NTupleStore *
ntuplestore_create(int maxBytes)
{
NTupleStore *store = (NTupleStore *) check_malloc(sizeof(NTupleStore));
store->pfile = NULL;
store->first_ondisk_blockn = 0;
store->plobfile = NULL;
store->lobbytes = 0;
store->work_set = NULL;
store->cached_workfiles_found = false;
store->cached_workfiles_loaded = false;
store->workfiles_created = false;
Assert(maxBytes >= 0);
store->page_max = maxBytes / BLCKSZ;
/* give me at least 16 pages */
if(store->page_max < 16)
store->page_max = 16;
store->first_page = (NTupleStorePage *) check_malloc(sizeof(NTupleStorePage));
init_page(store->first_page);
nts_page_set_blockn(store->first_page, 0);
store->page_cnt = 1;
store->pin_cnt = 0;
store->last_page = store->first_page;
nts_pin_page(store, store->first_page);
nts_pin_page(store, store->last_page);
store->first_free_page = NULL;
store->rwflag = NTS_NOT_READERWRITER;
store->accessors = NULL;
store->fwacc = false;
store->instrument = NULL;
RegisterXactCallbackOnce(XCallBack_NTS, (void *) store);
return store;
}
/*
* Initialize a ntuplestore that is shared across slice through a ShareInputScan node
*
* filename must be a unique name that identifies the share.
* filename does not include the pgsql_tmp/ prefix
*/
NTupleStore *
ntuplestore_create_readerwriter(const char *filename, int maxBytes, bool isWriter)
{
NTupleStore* store = NULL;
char filenameprefix[MAXPGPATH];
char filenamelob[MAXPGPATH];
snprintf(filenameprefix, sizeof(filenameprefix), "%s/%s", PG_TEMP_FILES_DIR, filename);
snprintf(filenamelob, sizeof(filenamelob), "%s_LOB", filenameprefix);
if(isWriter)
{
store = ntuplestore_create(maxBytes);
store->pfile = ExecWorkFile_Create(filenameprefix, BUFFILE,
true /*delOnClose */, 0 /* compressType */);
store->rwflag = NTS_IS_WRITER;
store->plobfile = ExecWorkFile_Create(filenamelob, BUFFILE,
true /* delOnClose */, 0 /* compressType */ );
store->lobbytes = 0;
}
else
{
store = (NTupleStore *) check_malloc(sizeof(NTupleStore));
store->work_set = NULL;
store->cached_workfiles_found = false;
store->cached_workfiles_loaded = false;
store->workfiles_created = false;
store->pfile = ExecWorkFile_Open(filenameprefix, BUFFILE,
false /* delOnClose */,
0 /* compressType */);
store->plobfile = ExecWorkFile_Open(filenamelob, BUFFILE,
false /* delOnClose */,
0 /* compressType */);
ntuplestore_init_reader(store, maxBytes);
RegisterXactCallbackOnce(XCallBack_NTS, (void *) store);
}
return store;
}
/*
* Initializes a ntuplestore based on existing files.
*
* spill_filename and spill_lob_filename are required to have pgsql_tmp/ part of the name
*/
static void
ntuplestore_init_reader(NTupleStore *store, int maxBytes)
{
Assert(NULL != store);
Assert(NULL != store->pfile);
Assert(NULL != store->plobfile);
store->first_ondisk_blockn = 0;
store->rwflag = NTS_IS_READER;
store->lobbytes = 0;
Assert(maxBytes >= 0);
store->page_max = maxBytes / BLCKSZ;
/* give me at least 16 pages */
if(store->page_max < 16)
store->page_max = 16;
store->first_page = (NTupleStorePage *) check_malloc(sizeof(NTupleStorePage));
init_page(store->first_page);
bool fOK = ntsReadBlock(store, 0, store->first_page);
if(!fOK)
{
/* empty file. We set blockn anyway */
nts_page_set_blockn(store->first_page, 0);
}
store->page_cnt = 1;
store->pin_cnt = 0;
nts_pin_page(store, store->first_page);
store->last_page = NULL;
store->first_free_page = NULL;
store->accessors = NULL;
store->fwacc = false;
store->instrument = NULL;
}
/*
* Create tuple store using the workfile manager to create spill files if needed.
* The workSet needs to be initialized by the caller.
*/
NTupleStore *
ntuplestore_create_workset(workfile_set *workSet, bool cachedWorkfilesFound, int maxBytes)
{
elog(gp_workfile_caching_loglevel, "Creating tuplestore with workset in directory %s", workSet->path);
NTupleStore *store = ntuplestore_create(maxBytes);
store->work_set = workSet;
store->cached_workfiles_found = cachedWorkfilesFound;
if (store->cached_workfiles_found)
{
Assert(store->work_set != NULL);
/* Reusing existing files. Load data from spill files here */
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(TopMemoryContext);
store->pfile = workfile_mgr_open_fileno(store->work_set, WORKFILE_NUM_TUPLESTORE_DATA);
store->plobfile = workfile_mgr_open_fileno(store->work_set, WORKFILE_NUM_TUPLESTORE_LOB);
MemoryContextSwitchTo(oldcxt);
ntuplestore_init_reader(store, maxBytes);
store->cached_workfiles_loaded = true;
}
else
{
/* Creating new workset */
store->rwflag = NTS_IS_WRITER;
}
return store;
}
void
ntuplestore_reset(NTupleStore *ts)
{
NTupleStorePage *p = ts->first_page;
Assert(list_length(ts->accessors) == 0);
Assert(!ts->fwacc);
Assert(ts->rwflag == NTS_NOT_READERWRITER || !"Reset NYI for reader writer");
while(p)
{
NTupleStorePage *next = nts_page_next(p);
ts->first_free_page = NTS_PREPEND_1(ts->first_free_page, p);
p = next;
}
ts->pin_cnt = 0;
Assert(ts->first_free_page != NULL);
ts->first_page = ts->first_free_page;
ts->first_free_page = nts_page_next(ts->first_page);
init_page(ts->first_page);
nts_page_set_blockn(ts->first_page, 0);
ts->last_page = ts->first_page;
nts_pin_page(ts, ts->first_page);
nts_pin_page(ts, ts->last_page);
ts->first_ondisk_blockn = 0;
if(ts->plobfile)
{
#ifdef USE_ASSERT_CHECKING
int errorcode =
#endif /* USE_ASSERT_CHECKING */
ExecWorkFile_Seek(ts->plobfile, 0 /* offset */, SEEK_SET);
Assert(errorcode == 0);
}
ts->lobbytes = 0;
}
void
ntuplestore_flush(NTupleStore *ts)
{
NTupleStorePage *p = ts->first_page;
Assert(ts->rwflag != NTS_IS_READER || !"Flush attempted for Reader");
Assert(ts->pfile);
while(p)
{
if(nts_page_is_dirty(p) && nts_page_slot_cnt(p) > 0)
{
if (!ntsWriteBlock(ts, p))
{
workfile_mgr_report_error();
}
}
p = nts_page_next(p);
}
ExecWorkFile_Flush(ts->pfile);
if (ts->plobfile != NULL)
{
ExecWorkFile_Flush(ts->plobfile);
}
}
void
ntuplestore_destroy(NTupleStore *ts)
{
UnregisterXactCallbackOnce(XCallBack_NTS, (void *) ts);
ntuplestore_cleanup(ts, true, true);
}
NTupleStoreAccessor*
ntuplestore_create_accessor(NTupleStore *ts, bool isWriter)
{
NTupleStoreAccessor *acc = (NTupleStoreAccessor *) palloc(sizeof(NTupleStoreAccessor));
acc->store = ts;
acc->isWriter = isWriter;
acc->tmp_lob = NULL;
acc->tmp_len = 0;
acc->page = NULL;
acc->pos.blockn = -1;
acc->pos.slotn = -1;
ntuplestore_acc_seek_first(acc);
ts->accessors = lappend(ts->accessors, acc);
AssertImply(isWriter, ts->rwflag != NTS_IS_READER);
AssertImply(isWriter, !ts->fwacc);
if(isWriter)
ts->fwacc = true;
return acc;
}
void ntuplestore_destroy_accessor(NTupleStoreAccessor *acc)
{
if(!acc->store)
{
Assert(!acc->page);
pfree(acc);
}
else
{
if(acc->isWriter)
{
Assert(acc->store->fwacc);
acc->store->fwacc = false;
}
if(acc->page)
nts_unpin_page(acc->store, acc->page);
acc->store->accessors = list_delete_ptr(acc->store->accessors, acc);
if (acc->tmp_lob)
pfree(acc->tmp_lob);
pfree(acc);
}
}
static long ntuplestore_put_lob(NTupleStore *nts, char* data, NTupleStoreLobRef *lobref)
{
if(!nts->plobfile)
{
if (nts->work_set != NULL)
{
/* We have a usable workfile_set. Use that to generate temp files */
ntuplestore_create_spill_files(nts);
}
else
{
char tmpprefix[MAXPGPATH];
Assert(nts->rwflag == NTS_NOT_READERWRITER);
Assert(nts->work_set == NULL);
Assert(nts->lobbytes == 0);
snprintf(tmpprefix, sizeof(tmpprefix), "%s/slice%d_ntuplestorelob", PG_TEMP_FILES_DIR, currentSliceId);
nts->plobfile = ExecWorkFile_CreateUnique(tmpprefix, BUFFILE,
true /*delOnClose */, 0 /* compressType */);
}
}
lobref->start = nts->lobbytes;
#if USE_ASSERT_CHECKING
bool res =
#endif
ExecWorkFile_Write(nts->plobfile, data, lobref->size);
Assert(res);
nts->lobbytes += lobref->size;
return lobref->size;
}
static long ntuplestore_get_lob(NTupleStore *nts, void *data, NTupleStoreLobRef *lobref)
{
Assert(lobref->start >= 0);
long ret = ExecWorkFile_Seek(nts->plobfile, lobref->start, SEEK_SET);
Assert(ret == 0);
ret = ExecWorkFile_Read(nts->plobfile, data, lobref->size);
Assert(ret == lobref->size);
return ret;
}
void ntuplestore_acc_put_tupleslot(NTupleStoreAccessor *tsa, TupleTableSlot *slot)
{
char *dest_ptr;
uint32 dest_len;
MemTuple mtup;
Assert(tsa->isWriter);
Assert(tsa->store && tsa->store->first_page && tsa->store->last_page);
/* first move the page to last page */
if(!tsa->page || tsa->page != tsa->store->last_page)
{
if(tsa->page)
nts_unpin_page(tsa->store, tsa->page);
tsa->page = tsa->store->last_page;
nts_pin_page(tsa->store, tsa->page);
}
dest_ptr = nts_page_next_slot_data_ptr(tsa->page);
dest_len = (uint32) nts_page_avail_bytes(tsa->page);
Assert(dest_len <= NTS_MAX_ENTRY_SIZE);
mtup = ExecCopySlotMemTupleTo(slot, NULL, dest_ptr, &dest_len);
if(mtup != NULL)
update_page_slot_entry(tsa->page, (int) dest_len);
else if(dest_len <= NTS_MAX_ENTRY_SIZE)
{
NTupleStorePage *page = nts_get_free_page(tsa->store);
if(!page)
elog(ERROR, "NTuplestore out of page");
Assert(nts_page_slot_cnt(page) == 0 && nts_page_data_bcnt(page) == 0);
dest_ptr = nts_page_next_slot_data_ptr(page);
dest_len = (unsigned int) nts_page_avail_bytes(page);
mtup = ExecCopySlotMemTupleTo(slot, NULL, dest_ptr, &dest_len);
Assert(mtup!=NULL);
update_page_slot_entry(page, (int) dest_len);
nts_page_set_blockn(page, nts_page_blockn(tsa->page) + 1);
NTS_APPEND_1(tsa->page, page);
nts_unpin_page(tsa->store, tsa->page);
nts_pin_page(tsa->store, page);
tsa->page = page;
nts_unpin_page(tsa->store, tsa->store->last_page);
tsa->store->last_page = page;
nts_pin_page(tsa->store, tsa->store->last_page);
}
else
{
char* lob = palloc(dest_len);
NTupleStoreLobRef lobref;
lobref.size = dest_len;
mtup = ExecCopySlotMemTupleTo(slot, NULL, lob, &dest_len);
Assert(mtup != NULL && dest_len == lobref.size);
ntuplestore_put_lob(tsa->store, lob, &lobref);
ntuplestore_acc_put_data(tsa, &lobref, -(int)sizeof(NTupleStoreLobRef));
pfree(lob);
return;
}
tsa->pos.blockn = nts_page_blockn(tsa->page);
tsa->pos.slotn = nts_page_slot_cnt(tsa->page) - 1;
}
void ntuplestore_acc_put_data(NTupleStoreAccessor *tsa, void *data, int len)
{
char *dest_ptr;
int dest_len;
int w_len = len > 0 ? len : -len;
Assert(tsa->isWriter);
Assert(tsa->store && tsa->store->first_page && tsa->store->last_page);
Assert(len > 0 || len == (-(int)sizeof(NTupleStoreLobRef)));
if (len > (int) NTS_MAX_ENTRY_SIZE)
{
NTupleStoreLobRef lobref;
lobref.size = len;
ntuplestore_put_lob(tsa->store, data, &lobref);
ntuplestore_acc_put_data(tsa, &lobref, -(int)sizeof(NTupleStoreLobRef));
return;
}
Assert(len <= (int) NTS_MAX_ENTRY_SIZE);
/* first move the page to last page */
if(!tsa->page || tsa->page != tsa->store->last_page)
{
if (tsa->page)
nts_unpin_page(tsa->store, tsa->page);
tsa->page = tsa->store->last_page;
nts_pin_page(tsa->store, tsa->page);
}
dest_ptr = nts_page_next_slot_data_ptr(tsa->page);
dest_len = nts_page_avail_bytes(tsa->page);
if(dest_len >= w_len)
{
memcpy(dest_ptr, data, w_len);
update_page_slot_entry(tsa->page, len);
}
else
{
NTupleStorePage *page = nts_get_free_page(tsa->store);
if(!page)
elog(ERROR, "NTuplestore out of page");
Assert(nts_page_slot_cnt(page) == 0 && nts_page_data_bcnt(page) == 0);
dest_ptr = nts_page_next_slot_data_ptr(page);
memcpy(dest_ptr, data, w_len);
update_page_slot_entry(page, len);
nts_page_set_blockn(page, nts_page_blockn(tsa->page) + 1);
NTS_APPEND_1(tsa->page, page);
nts_unpin_page(tsa->store, tsa->page);
nts_pin_page(tsa->store, page);
tsa->page = page;
nts_unpin_page(tsa->store, tsa->store->last_page);
tsa->store->last_page = page;
nts_pin_page(tsa->store, tsa->store->last_page);
}
tsa->pos.blockn = nts_page_blockn(tsa->page);
tsa->pos.slotn = nts_page_slot_cnt(tsa->page) - 1;
}
/* XXX Not done yet.
* Postgres does not allow hole in the BufFile, therefore, even if we are trimming
* a page, if, we are already in disk mode, we will have to flush the page (or write
* any junk, but we need to write a page). A better way is to may logical page blockn
* to a physical blockn.
*/
void ntuplestore_trim(NTupleStore *ts, NTupleStorePos *pos)
{
NTupleStorePage *page = nts_load_page(ts, pos->blockn);
Assert(page);
nts_page_set_first_valid_slotn(page, pos->slotn);
nts_unpin_page(ts, ts->first_page);
while(ts->first_page != page)
{
NTupleStorePage *next = nts_page_next(ts->first_page);
Assert(nts_page_pin_cnt(ts->first_page) == 0);
/* Flush dirty page anyway, to prevent holes in file */
if(nts_page_is_dirty(ts->first_page))
{
if(ts->pfile)
{
if (!ntsWriteBlock(ts, ts->first_page))
{
workfile_mgr_report_error();
}
}
}
ts->first_free_page = NTS_PREPEND_1(ts->first_free_page, ts->first_page);
ts->first_page = next;
}
Assert(ts->first_page);
nts_pin_page(ts, ts->first_page);
}
int ntuplestore_compare_pos(NTupleStore *ts, NTupleStorePos *pos1, NTupleStorePos *pos2)
{
if(pos1->blockn < pos2->blockn)
return -1;
if(pos1->blockn > pos2->blockn)
return 1;
if(pos1->slotn < pos2->slotn)
return -1;
if(pos1->slotn > pos2->slotn)
return 1;
return 0;
}
static void ntuplestore_acc_advance_in_page(NTupleStoreAccessor *tsa, int* pn)
{
if(*pn == 0)
return ;
if(*pn > 0) /* seeking forward */
{
Assert(tsa->pos.slotn + 1 >= nts_page_first_valid_slotn(tsa->page));
if((tsa->pos.slotn + *pn) < nts_page_slot_cnt(tsa->page))
{
tsa->pos.slotn += *pn;
*pn = 0;
}
else
*pn -= (nts_page_slot_cnt(tsa->page) - tsa->pos.slotn) - 1;
}
else /* seeking backward */
{
if(tsa->pos.slotn + *pn >= nts_page_first_valid_slotn(tsa->page))
{
tsa->pos.slotn += *pn;
*pn = 0;
}
else
*pn += (tsa->pos.slotn - nts_page_first_valid_slotn(tsa->page));
}
}
bool ntuplestore_acc_advance(NTupleStoreAccessor *tsa, int n)
{
if(!tsa->page)
return false;
ntuplestore_acc_advance_in_page(tsa, &n);
while(n != 0)
{
if(n > 0) /* seek forward */
{
NTupleStorePage *oldpage = tsa->page;
tsa->page = nts_load_next_page(tsa->store, tsa->page);
nts_unpin_page(tsa->store, oldpage);
if(!tsa->page)
{
tsa->pos.blockn = -1;
tsa->pos.slotn = -1;
return false;
}
Assert(nts_page_blockn(tsa->page) == tsa->pos.blockn + 1);
Assert(nts_page_valid_slot_cnt(tsa->page) > 0);
nts_pin_page(tsa->store, tsa->page);
tsa->pos.blockn = nts_page_blockn(tsa->page);
tsa->pos.slotn = -1;
}
else /* seeking backward */
{
NTupleStorePage *oldpage = tsa->page;
tsa->page = nts_load_prev_page(tsa->store, tsa->page);
nts_unpin_page(tsa->store, oldpage);
if(!tsa->page)
{
tsa->pos.blockn = -1;
tsa->pos.slotn = -1;
return false;
}
Assert(nts_page_blockn(tsa->page) == tsa->pos.blockn - 1);
Assert(nts_page_valid_slot_cnt(tsa->page) > 0);
nts_pin_page(tsa->store, tsa->page);
tsa->pos.blockn = nts_page_blockn(tsa->page);
tsa->pos.slotn = nts_page_slot_cnt(tsa->page);
}
ntuplestore_acc_advance_in_page(tsa, &n);
}
return true;
}
static bool ntuplestore_acc_current_data_internal(NTupleStoreAccessor *tsa, void **data, int *len)
{
if(!tsa->page || tsa->pos.slotn == -1)
return false;
*data = (void *) nts_page_slot_data_ptr(tsa->page, tsa->pos.slotn);
*len = nts_page_slot_data_len(tsa->page, tsa->pos.slotn);
return true;
}
bool ntuplestore_acc_current_tupleslot(NTupleStoreAccessor *tsa, TupleTableSlot *slot)
{
MemTuple tuple = NULL;
int len = 0;
bool fOK = ntuplestore_acc_current_data_internal(tsa, (void **) &tuple, &len);
if(!fOK)
{
ExecClearTuple(slot);
return false;
}
if(len < 0)
{
NTupleStoreLobRef *plobref = (NTupleStoreLobRef *) tuple;
Assert(len == -(int)sizeof(NTupleStoreLobRef));
tuple = (MemTuple) palloc(plobref->size);
len = ntuplestore_get_lob(tsa->store, tuple, plobref);
Assert(len == plobref->size);
ExecStoreMemTuple(tuple, slot, true);
return true;
}
ExecStoreMemTuple(tuple, slot, false);
return true;
}
bool ntuplestore_acc_current_data(NTupleStoreAccessor *tsa, void **data, int *len)
{
bool fOK = ntuplestore_acc_current_data_internal(tsa, (void **) data, len);
if(!fOK)
{
return false;
}
if(*len < 0)
{
NTupleStoreLobRef *plobref = (NTupleStoreLobRef *) (*data);
Assert(*len == -(int)sizeof(NTupleStoreLobRef));
if (tsa->tmp_len < plobref->size)
{
if (tsa->tmp_lob)
pfree(tsa->tmp_lob);
tsa->tmp_lob = palloc(plobref->size);
tsa->tmp_len = plobref->size;
}
*data = tsa->tmp_lob;
*len = ntuplestore_get_lob(tsa->store, *data, plobref);
Assert(*len == plobref->size);
return true;
}
return true;
}
bool ntuplestore_acc_tell(NTupleStoreAccessor *tsa, NTupleStorePos *pos)
{
AssertImply(tsa->pos.blockn==-1, tsa->pos.slotn==-1);
if (pos != NULL)
{
pos->blockn = -1;
pos->slotn = -1;
}
if(!tsa->page || tsa->pos.slotn == -1)
return false;
Assert(tsa->pos.blockn == nts_page_blockn(tsa->page));
if(tsa->pos.slotn >= nts_page_slot_cnt(tsa->page)
|| tsa->pos.slotn < nts_page_first_valid_slotn(tsa->page))
return false;
if(pos)
{
pos->blockn = tsa->pos.blockn;
pos->slotn = tsa->pos.slotn;
}
return true;
}
bool ntuplestore_acc_seek(NTupleStoreAccessor *tsa, NTupleStorePos *pos)
{
Assert(tsa && pos);
if(pos->slotn < 0)
{
Assert(pos->slotn == -1);
return false;
}
/* first seek page */
if(!tsa->page || nts_page_blockn(tsa->page) != pos->blockn)
{
if(tsa->page)
nts_unpin_page(tsa->store, tsa->page);
tsa->page = nts_load_page(tsa->store, pos->blockn);
if(!tsa->page)
return false;
nts_pin_page(tsa->store, tsa->page);
}
Assert(tsa->page && nts_page_blockn(tsa->page) == pos->blockn);
Assert(nts_page_slot_cnt(tsa->page) > pos->slotn && nts_page_first_valid_slotn(tsa->page) <= pos->slotn);
tsa->pos.blockn = pos->blockn;
tsa->pos.slotn = pos->slotn;
return true;
}
bool ntuplestore_acc_seek_first(NTupleStoreAccessor *tsa)
{
ntuplestore_acc_seek_bof(tsa);
return ntuplestore_acc_advance(tsa, 1);
}
bool ntuplestore_acc_seek_last(NTupleStoreAccessor *tsa)
{
ntuplestore_acc_seek_eof(tsa);
return ntuplestore_acc_advance(tsa, -1);
}
void ntuplestore_acc_set_invalid(NTupleStoreAccessor *tsa)
{
Assert(tsa);
if(tsa->page)
nts_unpin_page(tsa->store, tsa->page);
tsa->page = NULL;
tsa->pos.blockn = -1;
tsa->pos.slotn = -1;
}
void ntuplestore_acc_seek_bof(NTupleStoreAccessor *tsa)
{
Assert(tsa && tsa->store && tsa->store->first_page);
if(tsa->page)
nts_unpin_page(tsa->store, tsa->page);
tsa->page = tsa->store->first_page;
nts_pin_page(tsa->store, tsa->page);
tsa->pos.blockn = nts_page_blockn(tsa->page);
tsa->pos.slotn = nts_page_first_valid_slotn(tsa->page) - 1;
}
void ntuplestore_acc_seek_eof(NTupleStoreAccessor *tsa)
{
Assert(tsa && tsa->store && tsa->store->last_page);
if(tsa->page)
nts_unpin_page(tsa->store, tsa->page);
tsa->page = tsa->store->last_page;
nts_pin_page(tsa->store, tsa->page);
tsa->pos.blockn = nts_page_blockn(tsa->page);
tsa->pos.slotn = nts_page_slot_cnt(tsa->page);
}
int ntuplestore_count_slot(NTupleStore *nts, NTupleStorePos *pos1, NTupleStorePos *pos2)
{
NTupleStoreAccessor *tsa1 = ntuplestore_create_accessor(nts, false);
NTupleStoreAccessor *tsa2 = ntuplestore_create_accessor(nts, false);
bool fOK;
int ret;
fOK = ntuplestore_acc_seek(tsa1, pos1);
if(!fOK)
return -1;
fOK = ntuplestore_acc_seek(tsa2, pos2);
if(!fOK)
return -1;
ret = ntuplestore_count_slot_acc(nts, tsa1, tsa2);
ntuplestore_destroy_accessor(tsa1);
ntuplestore_destroy_accessor(tsa2);
return ret;
}
int ntuplestore_count_slot_acc(NTupleStore *nts, NTupleStoreAccessor *tsa1, NTupleStoreAccessor *tsa2)
{
int ret = 0;
bool fOK;
NTupleStorePos oldpos1 =
{
0, /* blockn */
0, /* slotn */
};
fOK = ntuplestore_acc_tell(tsa1, &oldpos1);
if(!fOK)
return -1;
fOK = ntuplestore_acc_tell(tsa2, NULL);
if(!fOK)
return -1;
if(ntuplestore_compare_pos(nts, &tsa1->pos, &tsa2->pos) > 0)
return -1;
while(tsa1->pos.blockn < tsa2->pos.blockn)
{
int tmp = 1000000; /* Larger than possible slots can be hold in one page */
NTupleStorePage *oldpage = tsa1->page;
ntuplestore_acc_advance_in_page(tsa1, &tmp);
ret += 1000000 - tmp;
tsa1->page = nts_load_next_page(nts, tsa1->page);
Assert(tsa1->page);
nts_unpin_page(nts, oldpage);
nts_pin_page(nts, tsa1->page);
tsa1->pos.blockn = nts_page_blockn(tsa1->page);
tsa1->pos.slotn = -1;
}
Assert(tsa1->pos.blockn == tsa2->pos.blockn);
ret += tsa2->pos.slotn - tsa1->pos.slotn;
fOK = ntuplestore_acc_seek(tsa1, &oldpos1);
Assert(fOK);
return ret;
}
/*
* Check if the tuple pointed by tsa1 is before the tuple pointed by tsa2.
*
* This function assumes that both tsa1 and tsa2 are pointing to a valid position.
*/
bool ntuplestore_acc_is_before(NTupleStoreAccessor *tsa1, NTupleStoreAccessor *tsa2)
{
Assert(ntuplestore_acc_tell(tsa1, NULL) &&
ntuplestore_acc_tell(tsa2, NULL));
if (tsa1->pos.blockn < tsa2->pos.blockn)
return true;
if (tsa1->pos.blockn > tsa2->pos.blockn)
return false;
if (tsa1->pos.slotn < tsa2->pos.slotn)
return true;
else
return false;
}
/*
* Use the associated workfile set to create spill files for this tuplestore.
*/
static void
ntuplestore_create_spill_files(NTupleStore *nts)
{
Assert(nts->work_set != NULL);
Assert(!nts->cached_workfiles_found);
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(TopMemoryContext);
nts->pfile = workfile_mgr_create_fileno(nts->work_set, WORKFILE_NUM_TUPLESTORE_DATA);
nts->plobfile = workfile_mgr_create_fileno(nts->work_set, WORKFILE_NUM_TUPLESTORE_LOB);
MemoryContextSwitchTo(oldcxt);
}
/*
* Returns true if this tuplestore created workfiles that can potentially be
* reused by other queries.
*/
bool
ntuplestore_created_reusable_workfiles(NTupleStore *nts)
{
Assert(nts);
return gp_workfile_caching && nts->workfiles_created && nts->work_set && nts->work_set->can_be_reused;
}
/*
* Mark the associated workfile set as complete, allowing it to be cached for reuse.
*/
void
ntuplestore_mark_workset_complete(NTupleStore *nts)
{
Assert(nts != NULL);
if (nts->work_set == NULL)
{
return;
}
if (nts->workfiles_created)
{
elog(gp_workfile_caching_loglevel, "Tuplestore: Marking workset as complete");
workfile_mgr_mark_complete(nts->work_set);
}
}
/* EOF */