/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*-------------------------------------------------------------------------
*
* nodeShareInputScan.c
*
* Portions Copyright (c) 2007-2008, Greenplum inc
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
/*
 * INTERFACE ROUTINES
 * ExecCountSlotsShareInputScan
 * ExecInitShareInputScan
 * ExecShareInputScan
 * ExecEndShareInputScan
 * ExecShareInputScanMarkPos
 * ExecShareInputScanRestrPos
 * ExecShareInputScanReScan
 */
#include "postgres.h"
#include "access/xact.h"
#include "cdb/cdbvars.h"
#include "executor/executor.h"
#include "executor/nodeShareInputScan.h"
#include "utils/tuplestorenew.h"
#include "miscadmin.h"
#include "utils/debugbreak.h"
#include "utils/tuplesort.h"
#include "postmaster/primary_mirror_mode.h"
typedef struct ShareInput_Lk_Context
{
int readyfd;
int donefd;
int zcnt;
bool del_ready;
bool del_done;
char lkname_ready[MAXPGPATH];
char lkname_done[MAXPGPATH];
} ShareInput_Lk_Context;
static TupleTableSlot *ShareInputNext(ShareInputScanState *node);
static void writer_wait_for_acks(ShareInput_Lk_Context *pctxt, int share_id, int xslice);
/* ------------------------------------------------------------------
* ExecShareInputScan
* ------------------------------------------------------------------
*/
TupleTableSlot *ExecShareInputScan(ShareInputScanState *node)
{
return ExecScan(&node->ss, (ExecScanAccessMtd) ShareInputNext);
}
/*
 * init_tuplestore_state
 *    Initialize the tuplestore state for the shared node if the state
 *    has not been initialized yet.
 */
static void
init_tuplestore_state(ShareInputScanState *node)
{
Assert(node->ts_state == NULL);
EState *estate = node->ss.ps.state;
ShareInputScan *sisc = (ShareInputScan *)node->ss.ps.plan;
ShareNodeEntry *snEntry = ExecGetShareNodeEntry(estate, sisc->share_id, false);
PlanState *snState = NULL;
ShareType share_type = sisc->share_type;
if(snEntry)
{
snState = (PlanState *) snEntry->shareState;
if(snState)
{
ExecProcNode(snState);
}
else
{
Assert(share_type == SHARE_MATERIAL_XSLICE || share_type == SHARE_SORT_XSLICE);
}
}
if(share_type == SHARE_MATERIAL_XSLICE)
{
char rwfile_prefix[100];
shareinput_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix), sisc->share_id);
node->ts_state = palloc0(sizeof(GenericTupStore));
node->ts_state->matstore = ntuplestore_create_readerwriter(rwfile_prefix, 0, false);
node->ts_pos = (void *) ntuplestore_create_accessor(node->ts_state->matstore, false);
ntuplestore_acc_seek_bof((NTupleStoreAccessor *)node->ts_pos);
}
else if(share_type == SHARE_MATERIAL)
{
/* The materialstate->ts_state structure should have been initialized already, during init of material node */
node->ts_state = ((MaterialState *)snState)->ts_state;
Assert(NULL != node->ts_state->matstore);
node->ts_pos = (void *) ntuplestore_create_accessor(node->ts_state->matstore, false);
ntuplestore_acc_seek_bof((NTupleStoreAccessor *)node->ts_pos);
}
else if(share_type == SHARE_SORT_XSLICE)
{
char rwfile_prefix[100];
shareinput_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix), sisc->share_id);
node->ts_state = palloc0(sizeof(GenericTupStore));
if(gp_enable_mk_sort)
{
node->ts_state->sortstore_mk = tuplesort_begin_heap_file_readerwriter_mk(
& node->ss,
rwfile_prefix, false,
NULL, 0, NULL, NULL, PlanStateOperatorMemKB((PlanState *) node), true);
tuplesort_begin_pos_mk(node->ts_state->sortstore_mk, (TuplesortPos_mk **)(&node->ts_pos));
tuplesort_rescan_pos_mk(node->ts_state->sortstore_mk, (TuplesortPos_mk *)node->ts_pos);
}
else
{
node->ts_state->sortstore = tuplesort_begin_heap_file_readerwriter(
rwfile_prefix, false,
NULL, 0, NULL, NULL, PlanStateOperatorMemKB((PlanState *) node), true);
tuplesort_begin_pos(node->ts_state->sortstore, (TuplesortPos **)(&node->ts_pos));
tuplesort_rescan_pos(node->ts_state->sortstore, (TuplesortPos *)node->ts_pos);
}
}
else
{
Assert(sisc->share_type == SHARE_SORT);
Assert(snState != NULL);
if(gp_enable_mk_sort)
{
node->ts_state = ((SortState *)snState)->tuplesortstate;
Assert(NULL != node->ts_state->sortstore_mk);
tuplesort_begin_pos_mk(node->ts_state->sortstore_mk, (TuplesortPos_mk **)(&node->ts_pos));
tuplesort_rescan_pos_mk(node->ts_state->sortstore_mk, (TuplesortPos_mk *)node->ts_pos);
}
else
{
node->ts_state = ((SortState *)snState)->tuplesortstate;
Assert(NULL != node->ts_state->sortstore);
tuplesort_begin_pos(node->ts_state->sortstore, (TuplesortPos **)(&node->ts_pos));
tuplesort_rescan_pos(node->ts_state->sortstore, (TuplesortPos *)node->ts_pos);
}
}
Assert(NULL != node->ts_state);
Assert(NULL != node->ts_state->matstore || NULL != node->ts_state->sortstore || NULL != node->ts_state->sortstore_mk);
}
/* ------------------------------------------------------------------
* ShareInputNext
* Retrieve a tuple from the ShareInputScan
* ------------------------------------------------------------------
*/
TupleTableSlot *
ShareInputNext(ShareInputScanState *node)
{
EState *estate;
ScanDirection dir;
bool forward;
TupleTableSlot *slot;
ShareInputScan * sisc = (ShareInputScan *) node->ss.ps.plan;
ShareType share_type = sisc->share_type;
/*
* get state info from node
*/
estate = node->ss.ps.state;
dir = estate->es_direction;
forward = ScanDirectionIsForward(dir);
/* If this is the first call, initialize the tuplestore state. */
if(node->ts_state == NULL)
{
elog(DEBUG1, "SISC (shareid=%d, slice=%d): No tuplestore yet, initializing tuplestore",
sisc->share_id, currentSliceId);
init_tuplestore_state(node);
}
slot = node->ss.ps.ps_ResultTupleSlot;
while(1)
{
bool gotOK = false;
if(share_type == SHARE_MATERIAL || share_type == SHARE_MATERIAL_XSLICE)
{
ntuplestore_acc_advance((NTupleStoreAccessor *) node->ts_pos, forward ? 1 : -1);
gotOK = ntuplestore_acc_current_tupleslot((NTupleStoreAccessor *) node->ts_pos, slot);
}
else
{
if(gp_enable_mk_sort)
{
gotOK = tuplesort_gettupleslot_pos_mk(node->ts_state->sortstore_mk, (TuplesortPos_mk *)node->ts_pos, forward, slot);
}
else
{
gotOK = tuplesort_gettupleslot_pos(node->ts_state->sortstore, (TuplesortPos *)node->ts_pos, forward, slot);
}
}
if(!gotOK)
return NULL;
Gpmon_M_Incr_Rows_Out(GpmonPktFromShareInputState(node));
CheckSendPlanStateGpmonPkt(&node->ss.ps);
return slot;
}
Assert(!"should not be here");
return NULL;
}
/* ------------------------------------------------------------------
* ExecInitShareInputScan
* ------------------------------------------------------------------
*/
ShareInputScanState *
ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags)
{
ShareInputScanState *sisstate;
Plan *outerPlan;
TupleDesc tupDesc;
Assert(innerPlan(node) == NULL);
/* create state data structure */
sisstate = makeNode(ShareInputScanState);
sisstate->ss.ps.plan = (Plan *) node;
sisstate->ss.ps.state = estate;
sisstate->ts_state = NULL;
sisstate->ts_pos = NULL;
sisstate->ts_markpos = NULL;
sisstate->share_lk_ctxt = NULL;
sisstate->freed = false;
/*
 * Init the child node.
 * If outerPlan is NULL, this is a no-op (so that the shared input node
 * is only initialized once).
 */
outerPlan = outerPlan(node);
outerPlanState(sisstate) = ExecInitNode(outerPlan, estate, eflags);
sisstate->ss.ps.targetlist = (List *)
ExecInitExpr((Expr *) node->plan.targetlist, (PlanState *) sisstate);
Assert(node->plan.qual == NULL);
sisstate->ss.ps.qual = NULL;
/* Misc initialization
*
* Create expression context
*/
ExecAssignExprContext(estate, &sisstate->ss.ps);
/* tuple table init */
ExecInitResultTupleSlot(estate, &sisstate->ss.ps);
sisstate->ss.ss_ScanTupleSlot = ExecInitExtraTupleSlot(estate);
/*
* init tuple type.
*/
ExecAssignResultTypeFromTL(&sisstate->ss.ps);
{
bool hasoid;
if (!ExecContextForcesOids(&sisstate->ss.ps, &hasoid))
hasoid = false;
tupDesc = ExecTypeFromTL(node->plan.targetlist, hasoid);
}
ExecAssignScanType(&sisstate->ss, tupDesc);
sisstate->ss.ps.ps_ProjInfo = NULL;
/*
 * If this is an intra-slice share node, increment the reference count to
 * tell the underlying node not to free itself before this node is ready
 * to be freed. The fCreate flag passed to ExecGetShareNodeEntry is true
 * because at this point the entry does not exist yet; it will be
 * initialized later, during the underlying node's initialization.
 */
if (node->share_type == SHARE_MATERIAL || node->share_type == SHARE_SORT)
{
ShareNodeEntry *snEntry = ExecGetShareNodeEntry(estate, node->share_id, true);
snEntry->refcount++;
}
initGpmonPktForShareInputScan((Plan *)node, &sisstate->ss.ps.gpmon_pkt, estate);
return sisstate;
}
void
ExecSliceDependencyShareInputScan(ShareInputScanState *node)
{
ShareInputScan * sisc = (ShareInputScan *) node->ss.ps.plan;
elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): exec dependency on slice %d, driver_slice is %d",
sisc->share_id, currentSliceId,
currentSliceId, sisc->driver_slice);
EState *estate = node->ss.ps.state;
if(sisc->driver_slice >= 0 && sisc->driver_slice == currentSliceId)
{
node->share_lk_ctxt = shareinput_reader_waitready(sisc->share_id,
estate->es_plannedstmt->planGen);
}
}
/* ------------------------------------------------------------------
* ExecCountSlotsShareInputScan
* ------------------------------------------------------------------
*/
int
ExecCountSlotsShareInputScan(ShareInputScan* node)
{
#define SHAREINPUT_NSLOTS 2
return ExecCountSlotsNode(outerPlan((Plan *) node))
+ SHAREINPUT_NSLOTS;
}
/* ------------------------------------------------------------------
* ExecEndShareInputScan
* ------------------------------------------------------------------
*/
void ExecEndShareInputScan(ShareInputScanState *node)
{
/* clean up tuple table */
ExecClearTuple(node->ss.ss_ScanTupleSlot);
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
ShareInputScan * sisc = (ShareInputScan *) node->ss.ps.plan;
if(node->share_lk_ctxt)
shareinput_reader_notifydone(node->share_lk_ctxt, sisc->share_id);
ExecEagerFreeShareInputScan(node);
/*
 * Shut down the subplan. The first scanner of the underlying shared input
 * does the shutdown; all other scanners are no-ops because their
 * outerPlanState is NULL.
 */
ExecEndNode(outerPlanState(node));
EndPlanStateGpmonPkt(&node->ss.ps);
}
/* ------------------------------------------------------------------
 * ExecShareInputScanMarkPos
* ------------------------------------------------------------------
*/
void ExecShareInputScanMarkPos(ShareInputScanState *node)
{
ShareInputScan *sisc = (ShareInputScan *) node->ss.ps.plan;
Assert(NULL != node->ts_state);
Assert(NULL != node->ts_state->matstore || NULL != node->ts_state->sortstore || NULL != node->ts_state->sortstore_mk);
if(sisc->share_type == SHARE_MATERIAL || sisc->share_type == SHARE_MATERIAL_XSLICE)
{
Assert(node->ts_pos);
if(node->ts_markpos == NULL)
{
node->ts_markpos = palloc(sizeof(NTupleStorePos));
}
ntuplestore_acc_tell((NTupleStoreAccessor *)node->ts_pos, (NTupleStorePos *) node->ts_markpos);
}
else if(sisc->share_type == SHARE_SORT || sisc->share_type == SHARE_SORT_XSLICE)
{
if(gp_enable_mk_sort)
{
tuplesort_markpos_pos_mk(node->ts_state->sortstore_mk, (TuplesortPos_mk *) node->ts_pos);
}
else
{
tuplesort_markpos_pos(node->ts_state->sortstore, (TuplesortPos *) node->ts_pos);
}
}
else
Assert(!"ExecShareInputScanMarkPos: invalid share type");
}
/* ------------------------------------------------------------------
 * ExecShareInputScanRestrPos
* ------------------------------------------------------------------
*/
void ExecShareInputScanRestrPos(ShareInputScanState *node)
{
ShareInputScan *sisc = (ShareInputScan *) node->ss.ps.plan;
Assert(NULL != node->ts_state);
Assert(NULL != node->ts_state->matstore || NULL != node->ts_state->sortstore || NULL != node->ts_state->sortstore_mk);
if(sisc->share_type == SHARE_MATERIAL || sisc->share_type == SHARE_MATERIAL_XSLICE)
{
Assert(node->ts_pos && node->ts_markpos);
ntuplestore_acc_seek((NTupleStoreAccessor *) node->ts_pos, (NTupleStorePos *) node->ts_markpos);
}
else if(sisc->share_type == SHARE_SORT || sisc->share_type == SHARE_SORT_XSLICE)
{
if(gp_enable_mk_sort)
{
tuplesort_restorepos_pos_mk(node->ts_state->sortstore_mk, (TuplesortPos_mk *) node->ts_pos);
}
else
{
tuplesort_restorepos_pos(node->ts_state->sortstore, (TuplesortPos *) node->ts_pos);
}
}
else
Assert(!"ExecShareInputScanRestrPos: invalid share type");
Gpmon_M_Incr(GpmonPktFromShareInputState(node), GPMON_SHAREINPUT_RESTOREPOS);
CheckSendPlanStateGpmonPkt(&node->ss.ps);
}
/* ------------------------------------------------------------------
* ExecShareInputScanReScan
* ------------------------------------------------------------------
*/
void ExecShareInputScanReScan(ShareInputScanState *node, ExprContext *exprCtxt)
{
/* if first time call, need to initialize the tuplestore state */
if(node->ts_state == NULL)
{
init_tuplestore_state(node);
}
ShareInputScan *sisc = (ShareInputScan *) node->ss.ps.plan;
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
Assert(NULL != node->ts_pos);
if(sisc->share_type == SHARE_MATERIAL || sisc->share_type == SHARE_MATERIAL_XSLICE)
{
Assert(NULL != node->ts_state->matstore);
ntuplestore_acc_seek_bof((NTupleStoreAccessor *) node->ts_pos);
}
else if (sisc->share_type == SHARE_SORT || sisc->share_type == SHARE_SORT_XSLICE)
{
if(gp_enable_mk_sort)
{
Assert(NULL != node->ts_state->sortstore_mk);
tuplesort_rescan_pos_mk(node->ts_state->sortstore_mk, (TuplesortPos_mk *) node->ts_pos);
}
else
{
Assert(NULL != node->ts_state->sortstore);
tuplesort_rescan_pos(node->ts_state->sortstore, (TuplesortPos *) node->ts_pos);
}
}
else
{
Assert(!"ExecShareInputScanReScan: invalid share type ");
}
Gpmon_M_Incr(GpmonPktFromShareInputState(node), GPMON_SHAREINPUT_RESCAN);
CheckSendPlanStateGpmonPkt(&node->ss.ps);
}
/*************************************************************************
 * XXX
 * We need some IPC mechanism for the shareinput reader-wait/writer-notify
 * handshake. A semaphore is the first thing that comes to mind, but it
 * turns out postgres is very picky about how semaphores are used, and we
 * do not want to mess with that.
 *
 * Here we use a FIFO (named pipe). mkfifo is POSIX.1 and should be
 * available on any reasonable Unix-like system.
 *
 * We open the FIFO with O_RDWR, so the process holds both a reader and a
 * writer end. That also means a write will not block, but a read will
 * block until a writer writes something.
 *
 * At first, the postgres File abstraction was used to manage the FIFO.
 * That turns out to be incorrect: when postgres runs out of file
 * descriptors, it closes some of them using an LRU algorithm and reopens
 * them later when the File is used again. The FIFO here is used for
 * synchronization, so silently closing and reopening it is simply wrong.
 * Instead we use the file descriptor directly, and register an
 * XactCallback to clean up the resources at the end of the transaction
 * (commit or abort).
 *
 * XXX However, it would be better to have this kind of machinery
 * abstracted out by the system.
 **************************************************************************/
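/*
 * A minimal sketch of the O_RDWR trick described above (illustrative only,
 * not used by this module; "fifoname" is a hypothetical path):
 *
 *		mkfifo(fifoname, 0600);
 *		int fd = open(fifoname, O_RDWR, 0600);	-- never blocks waiting for
 *												-- a peer: we hold both ends
 *		char c;
 *		write(fd, "a", 1);						-- a one-byte write does not block
 *		read(fd, &c, 1);						-- blocks until a byte arrives
 */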
#include "fcntl.h"
#include "unistd.h"
#include "sys/types.h"
#include "sys/stat.h"
#include "storage/fd.h"
#include "cdb/cdbselect.h"
void shareinput_create_bufname_prefix(char* p, int size, int share_id)
{
snprintf(p, size, "%s_SIRW_%d_%d_%d_%d",
PG_TEMP_FILE_PREFIX,
GetQEIndex(), gp_session_id, gp_command_count, share_id);
}
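/*
 * For example, assuming PG_TEMP_FILE_PREFIX is "pgsql_tmp" and hypothetical
 * values QE index 2, session 1001, command 7 and share_id 0, the generated
 * prefix would be "pgsql_tmp_SIRW_2_1001_7_0".
 */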
/* Here we use the absolute path name as the lock name. See fd.c
* for how the name is created (GP_TEMP_FILE_DIR and make_database_relative).
*/
static void sisc_lockname(char* p, int size, int share_id, const char* name)
{
if (snprintf(p, size,
"%s/%s/%s_gpcdb2.sisc_%d_%d_%d_%d_%s",
getCurrentTempFilePath, PG_TEMP_FILES_DIR, PG_TEMP_FILE_PREFIX,
GetQEIndex(), gp_session_id, gp_command_count, share_id, name
) >= size)
{
ereport(ERROR, (errmsg("cannot generate path %s/%s/%s_gpcdb2.sisc_%d_%d_%d_%d_%s",
getCurrentTempFilePath, PG_TEMP_FILES_DIR, PG_TEMP_FILE_PREFIX,
GetQEIndex(), gp_session_id, gp_command_count, share_id, name)));
}
}
static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt)
{
int err;
elog(DEBUG1, "shareinput_clean_lk_ctxt cleanup lk ctxt %p", lk_ctxt);
if(lk_ctxt->readyfd >= 0)
{
err = gp_retry_close(lk_ctxt->readyfd);
insist_log(!err, "shareinput_clean_lk_ctxt cannot close readyfd: %m");
lk_ctxt->readyfd = -1;
}
if(lk_ctxt->donefd >= 0)
{
err = gp_retry_close(lk_ctxt->donefd);
insist_log(!err, "shareinput_clean_lk_ctxt cannot close donefd: %m");
lk_ctxt->donefd = -1;
}
if(lk_ctxt->del_ready && lk_ctxt->lkname_ready[0])
{
err = unlink(lk_ctxt->lkname_ready);
insist_log(!err, "shareinput_clean_lk_ctxt cannot unlink \"%s\": %m", lk_ctxt->lkname_ready);
lk_ctxt->del_ready = false;
}
if(lk_ctxt->del_done && lk_ctxt->lkname_done[0])
{
err = unlink(lk_ctxt->lkname_done);
insist_log(!err, "shareinput_clean_lk_ctxt cannot unlink \"%s\": %m", lk_ctxt->lkname_done);
lk_ctxt->del_done = false;
}
gp_free2 (lk_ctxt, sizeof(ShareInput_Lk_Context));
}
static void XCallBack_ShareInput_FIFO(XactEvent ev, void* vp)
{
ShareInput_Lk_Context *lk_ctxt = (ShareInput_Lk_Context *) vp;
shareinput_clean_lk_ctxt(lk_ctxt);
}
static void create_tmp_fifo(const char *fifoname)
{
#ifdef WIN32
elog(ERROR, "mkfifo not supported on win32");
#else
int err = mkfifo(fifoname, 0600);
if(err < 0)
{
/* The first try may fail because the pgsql_tmp dir has not been created yet. */
char tmpdir[MAXPGPATH];
if (snprintf(tmpdir, MAXPGPATH, "%s/%s", getCurrentTempFilePath, PG_TEMP_FILES_DIR) >= MAXPGPATH)
{
ereport(ERROR, (errmsg("cannot create dir path %s/%s", getCurrentTempFilePath, PG_TEMP_FILES_DIR)));
}
mkdir(tmpdir, S_IRWXU);
/* then try it again */
err = mkfifo(fifoname, 0600);
if(err < 0 && errno != EEXIST)
elog(ERROR, "could not create temporary fifo \"%s\": %m", fifoname);
}
#endif
}
/*
 * Like all other reads/writes in postgres, we may be interrupted, so a
 * retry loop is needed. A return of 0 is also retried: the FIFO is opened
 * O_RDWR, so this process always holds a writer end and read can never
 * report a true EOF.
 */
static int retry_read(int fd, char *buf, int rsize)
{
int sz;
Assert(rsize > 0);
read_retry:
sz = read(fd, buf, rsize);
if (sz > 0)
return sz;
else if(sz == 0 || errno == EINTR)
goto read_retry;
else
elog(ERROR, "could not read from fifo: %m");
Assert(!"Never be here");
return 0;
}
static int retry_write(int fd, char *buf, int wsize)
{
int sz;
Assert(wsize > 0);
write_retry:
sz = write(fd, buf, wsize);
if(sz > 0)
return sz;
else if(sz == 0 || errno == EINTR)
goto write_retry;
else
elog(ERROR, "could not write to fifo: %m");
Assert(!"Never be here");
return 0;
}
/*
 * Readiness ('a') synchronization.
 *
 * For readiness, the shared node (writer) writes xslice copies of 'a' into
 * the pipe. For each share there is just one ready writer. Once the writer
 * starts writing, it needs to write all xslice copies of 'a' even if it is
 * interrupted; that is, it must not call CHECK_FOR_INTERRUPTS in between.
 *
 * The sharer (reader) needs to check for readiness with select() first,
 * because read blocks. Otherwise, if the writer is cancelled before it
 * writes, the reader would block forever. Once the writer has written at
 * least one 'a', it will write all xslice copies, so once select succeeds,
 * the read will eventually succeed. Once a sharer gets its 'a', it writes
 * a 'b' back to the writer.
 *
 * Done ('b' and 'z') synchronization.
 *
 * For done, the writer is the only reading process. Sharers will not block
 * when writing, but the writer may block when reading, so it must call
 * select() before reading. Because there is only one reading process,
 * nobody can steal a character from the pipe; therefore, if select
 * succeeds, the read will not block forever.
 *
 * One thing to note is that some 'z' may come back before all the 'b's
 * have come back, so notifyready needs to handle that.
 *
 * For optimizer-generated plans, we skip the 'b' synchronization. The writer
 * does not wait for readers to acknowledge the "ready" handshake anymore, as
 * that can cause deadlocks (OPT-2690).
 */
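/*
 * A rough timeline of the handshake described above, with one writer and
 * N cross-slice readers (illustrative only):
 *
 *		writer                                  each reader
 *		------                                  -----------
 *		write 'a' x N      --ready fifo-->      select(); read one 'a'
 *		select(); read N   <--done fifo--       write 'b'
 *		  x 'b' (planner plans only)
 *		        ...readers consume the tuples from disk...
 *		select(); read N   <--done fifo--       write 'z'
 *		  x 'z'
 *
 * A 'z' that arrives while the writer is still collecting 'b's is counted
 * in zcnt and credited later in shareinput_writer_waitdone.
 */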
/*
* shareinput_reader_waitready
*
* Called by the reader (consumer) to wait for the writer (producer) to produce
* all the tuples and write them to disk.
*
* This is a blocking operation.
*/
void *
shareinput_reader_waitready(int share_id, PlanGenerator planGen)
{
mpp_fd_set rset;
struct timeval tval;
int n;
char a;
ShareInput_Lk_Context *pctxt = gp_malloc(sizeof(ShareInput_Lk_Context));
if(!pctxt)
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("Share input reader failed: out of memory")));
pctxt->readyfd = -1;
pctxt->donefd = -1;
pctxt->zcnt = 0;
pctxt->del_ready = false;
pctxt->del_done = false;
pctxt->lkname_ready[0] = '\0';
pctxt->lkname_done[0] = '\0';
RegisterXactCallbackOnce(XCallBack_ShareInput_FIFO, pctxt);
sisc_lockname(pctxt->lkname_ready, MAXPGPATH, share_id, "ready");
create_tmp_fifo(pctxt->lkname_ready);
pctxt->readyfd = open(pctxt->lkname_ready, O_RDWR, 0600);
if(pctxt->readyfd < 0)
elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_ready);
sisc_lockname(pctxt->lkname_done, MAXPGPATH, share_id, "done");
create_tmp_fifo(pctxt->lkname_done);
pctxt->donefd = open(pctxt->lkname_done, O_RDWR, 0600);
if(pctxt->donefd < 0)
elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done);
while(1)
{
CHECK_FOR_INTERRUPTS();
MPP_FD_ZERO(&rset);
MPP_FD_SET(pctxt->readyfd, &rset);
tval.tv_sec = 1;
tval.tv_usec = 0;
n = select(pctxt->readyfd+1, (fd_set *) &rset, NULL, NULL, &tval);
if(n==1)
{
#if USE_ASSERT_CHECKING
int rwsize =
#endif
retry_read(pctxt->readyfd, &a, 1);
Assert(rwsize == 1 && a == 'a');
elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake",
share_id, currentSliceId);
if (planGen == PLANGEN_PLANNER)
{
/* For planner-generated plans, we send ack back after receiving the handshake */
elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer",
share_id, currentSliceId);
#if USE_ASSERT_CHECKING
rwsize =
#endif
retry_write(pctxt->donefd, "b", 1);
Assert(rwsize == 1);
}
break;
}
else if(n==0)
{
elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once",
share_id, currentSliceId);
}
else
{
int save_errno = errno;
elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready try again, errno %d ... ",
share_id, currentSliceId, save_errno);
}
}
return (void *) pctxt;
}
/*
* shareinput_writer_notifyready
*
* Called by the writer (producer) once it is done producing all tuples and
* writing them to disk. It notifies all the readers (consumers) that tuples
* are ready to be read from disk.
*
 * For planner-generated plans we wait for acks from all the readers before
 * proceeding. It is a blocking operation.
*
* For optimizer-generated plans we don't wait for acks, we proceed immediately.
* It is a non-blocking operation.
*/
void *
shareinput_writer_notifyready(int share_id, int xslice, PlanGenerator planGen)
{
int n;
ShareInput_Lk_Context *pctxt = gp_malloc(sizeof(ShareInput_Lk_Context));
if(!pctxt)
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("Shareinput Writer failed: out of memory")));
pctxt->readyfd = -1;
pctxt->donefd = -1;
pctxt->zcnt = 0;
pctxt->del_ready = false;
pctxt->del_done = false;
pctxt->lkname_ready[0] = '\0';
pctxt->lkname_done[0] = '\0';
RegisterXactCallbackOnce(XCallBack_ShareInput_FIFO, pctxt);
sisc_lockname(pctxt->lkname_ready, MAXPGPATH, share_id, "ready");
create_tmp_fifo(pctxt->lkname_ready);
pctxt->del_ready = true;
pctxt->readyfd = open(pctxt->lkname_ready, O_RDWR, 0600);
if(pctxt->readyfd < 0)
elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_ready);
sisc_lockname(pctxt->lkname_done, MAXPGPATH, share_id, "done");
create_tmp_fifo(pctxt->lkname_done);
pctxt->del_done = true;
pctxt->donefd = open(pctxt->lkname_done, O_RDWR, 0600);
if(pctxt->donefd < 0)
elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done);
for(n=0; n<xslice; ++n)
{
#if USE_ASSERT_CHECKING
int rwsize =
#endif
retry_write(pctxt->readyfd, "a", 1);
Assert(rwsize == 1);
}
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): wrote notify_ready to %d xslice readers",
share_id, currentSliceId, xslice);
if (planGen == PLANGEN_PLANNER)
{
/* For planner-generated plans, we wait for acks from all the readers */
writer_wait_for_acks(pctxt, share_id, xslice);
}
return (void *) pctxt;
}
/*
* writer_wait_for_acks
*
 * After sending the handshake to all the readers, the writer waits for an
 * ack from each of them.
*
* This is a blocking operation.
*/
static void
writer_wait_for_acks(ShareInput_Lk_Context *pctxt, int share_id, int xslice)
{
int ack_needed = xslice;
mpp_fd_set rset;
struct timeval tval;
char b;
while(ack_needed > 0)
{
CHECK_FOR_INTERRUPTS();
MPP_FD_ZERO(&rset);
MPP_FD_SET(pctxt->donefd, &rset);
tval.tv_sec = 1;
tval.tv_usec = 0;
int numReady = select(pctxt->donefd+1, (fd_set *) &rset, NULL, NULL, &tval);
if(numReady==1)
{
#if USE_ASSERT_CHECKING
int rwsize =
#endif
retry_read(pctxt->donefd, &b, 1);
Assert(rwsize == 1);
if(b == 'z')
{
++pctxt->zcnt;
}
else
{
Assert(b == 'b');
--ack_needed;
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): notify ready succeed 1, xslice remaining %d",
share_id, currentSliceId, ack_needed);
}
}
else if(numReady==0)
{
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): Notify ready time out once ... ",
share_id, currentSliceId);
}
else
{
int save_errno = errno;
elog(LOG, "SISC WRITER (shareid=%d, slice=%d): notify still wait for an answer, errno %d",
share_id, currentSliceId, save_errno);
}
}
}
/*
* shareinput_reader_notifydone
*
* Called by the reader (consumer) to notify the writer (producer) that
* it is done reading tuples from disk.
*
* This is a non-blocking operation.
*/
void
shareinput_reader_notifydone(void *ctxt, int share_id)
{
ShareInput_Lk_Context *pctxt = (ShareInput_Lk_Context *) ctxt;
#if USE_ASSERT_CHECKING
int rwsize =
#endif
retry_write(pctxt->donefd, "z", 1);
Assert(rwsize == 1);
UnregisterXactCallbackOnce(XCallBack_ShareInput_FIFO, (void *) ctxt);
shareinput_clean_lk_ctxt(pctxt);
}
/*
* shareinput_writer_waitdone
*
 * Called by the writer (producer) to wait for the "done" notification from
 * all readers (consumers).
*
* This is a blocking operation.
*/
void
shareinput_writer_waitdone(void *ctxt, int share_id, int nsharer_xslice)
{
ShareInput_Lk_Context *pctxt = (ShareInput_Lk_Context *) ctxt;
mpp_fd_set rset;
struct timeval tval;
int numReady;
char z;
int ack_needed = nsharer_xslice - pctxt->zcnt;
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): waiting for DONE message from %d readers",
share_id, currentSliceId, ack_needed);
while(ack_needed > 0)
{
CHECK_FOR_INTERRUPTS();
/*
 * The writer does not wait for the readers' "done" notifications if the
 * transaction is aborting. Readers may fail to send the notification in
 * two cases:
 *
 * 1. The transaction is aborted due to interrupts or exceptions, e.g., the
 * user cancels the query, or a division by zero occurs on some segment.
 *
 * 2. A logic error in a reader causes it to exit unexpectedly, e.g., a
 * segmentation fault.
 */
if (IsAbortInProgress())
{
break;
}
MPP_FD_ZERO(&rset);
MPP_FD_SET(pctxt->donefd, &rset);
tval.tv_sec = 1;
tval.tv_usec = 0;
numReady = select(pctxt->donefd+1, (fd_set *) &rset, NULL, NULL, &tval);
if(numReady==1)
{
#if USE_ASSERT_CHECKING
int rwsize =
#endif
retry_read(pctxt->donefd, &z, 1);
Assert(rwsize == 1 && z == 'z');
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): wait done get 1 notification",
share_id, currentSliceId);
--ack_needed;
}
else if(numReady==0)
{
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): wait done timeout once",
share_id, currentSliceId);
}
else
{
int save_errno = errno;
elog(LOG, "SISC WRITER (shareid=%d, slice=%d): wait done time out once, errno %d",
share_id, currentSliceId, save_errno);
}
}
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): Writer received all %d reader done notifications",
share_id, currentSliceId, nsharer_xslice - pctxt->zcnt);
UnregisterXactCallbackOnce(XCallBack_ShareInput_FIFO, (void *) ctxt);
shareinput_clean_lk_ctxt(ctxt);
}
void
initGpmonPktForShareInputScan(Plan *planNode, gpmon_packet_t *gpmon_pkt, EState *estate)
{
Assert(planNode != NULL && gpmon_pkt != NULL && IsA(planNode, ShareInputScan));
{
Assert(GPMON_SHAREINPUT_TOTAL <= (int)GPMON_QEXEC_M_COUNT);
InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate, PMNT_SharedScan,
(int64)planNode->plan_rows,
NULL);
}
}
/*
 * During eager free, ShareInputScan decrements the reference count in its
 * ShareNodeEntry when it is an intra-slice share node. The reference count
 * tells the underlying Material/Sort node not to free its content too
 * eagerly, as this node still needs to read its tuples. Once this node is
 * freed, the underlying node can free its content.
 * We maintain this reference count only in the intra-slice case, because
 * inter-slice share nodes have their own pointer to the buffer and there
 * is no way to track such references across Motions anyway.
 */
void
ExecEagerFreeShareInputScan(ShareInputScanState *node)
{
/*
* no need to call tuplestore end. Underlying ShareInput will take
* care of releasing tuplestore resources
*/
/*
* XXX Do we need to pfree the tuplestore_state and pos?
* XXX nodeMaterial.c does not, need to find out why
*/
ShareInputScan * sisc = (ShareInputScan *) node->ss.ps.plan;
if(sisc->share_type == SHARE_MATERIAL || sisc->share_type == SHARE_MATERIAL_XSLICE)
{
if(node->ts_pos != NULL)
ntuplestore_destroy_accessor((NTupleStoreAccessor *) node->ts_pos);
if(node->ts_markpos != NULL)
pfree(node->ts_markpos);
if(NULL != node->ts_state && NULL != node->ts_state->matstore)
{
/* If this is the reader side of a cross-slice share, we can safely destroy our tuplestore */
if(ntuplestore_is_readerwriter_reader(node->ts_state->matstore))
{
ntuplestore_destroy(node->ts_state->matstore);
}
}
}
/*
 * Reset our copy of the pointer to the ts_state. The tuplestore can still
 * be accessed by the other consumers, but we no longer hold a pointer to it.
 */
node->ts_state = NULL;
node->ts_pos = NULL;
node->ts_markpos = NULL;
/* This can be called more than once */
if (!node->freed &&
(sisc->share_type == SHARE_MATERIAL || sisc->share_type == SHARE_SORT))
{
/*
* Decrement reference count when it's intra-slice. We don't need
* two-pass tree descending because ShareInputScan should always appear
* before the underlying Material/Sort node.
*/
EState *estate = node->ss.ps.state;
ShareNodeEntry *snEntry = ExecGetShareNodeEntry(estate, sisc->share_id, false);
Assert(snEntry && snEntry->refcount > 0);
snEntry->refcount--;
}
node->freed = true;
}