blob: f891a5722a50dbb5d3fe0d97d8ad80f9e7df61fe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* nodeExternalscan.c
* Support routines for scans of external relations (on flat files for example)
*
*-------------------------------------------------------------------------
*/
/*
 * INTERFACE ROUTINES
 *		ExecExternalScan		scans an external relation sequentially.
 *		ExternalNext			retrieves the next tuple in sequential order.
 *		ExecInitExternalScan	creates and initializes an external scan node.
 *		ExecEndExternalScan		releases any storage allocated.
 *		ExecStopExternalScan	closes external resources before end of data (EOD).
 *		ExecExternalReScan		rescans the relation.
 */
#include "postgres.h"
#include "fmgr.h"
#include "access/fileam.h"
#include "access/filesplit.h"
#include "access/heapam.h"
#include "access/plugstorage.h"
#include "cdb/cdbvars.h"
#include "executor/execdebug.h"
#include "executor/nodeExternalscan.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "parser/parsetree.h"
#include "optimizer/var.h"
static TupleTableSlot *ExternalNext(ExternalScanState *node);
/* ----------------------------------------------------------------
 *						Scan Support
 * ----------------------------------------------------------------
 */
/* ----------------------------------------------------------------
 *		ExternalNext
 *
 *		Workhorse for ExecExternalScan: fetches the next tuple of the
 *		external table into the node's scan tuple slot.
 *
 *		Built-in formatters go through external_getnext_init() and
 *		external_getnext(); pluggable-storage (PLUG) formatters go through
 *		the getnext_init/getnext functions cached in the scan descriptor.
 *
 *		Returns the node's scan slot.  When the access method reports no
 *		tuple, the slot is cleared and, unless delayEagerFree is set, the
 *		underlying scan is ended right away via ExecEagerFreeExternalScan.
 *		Raises ERROR for an invalid formatter type or a missing PLUG
 *		getnext_init/getnext function.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExternalNext(ExternalScanState *node)
{
	FileScanDesc scandesc;
	EState	   *estate = NULL;
	ScanDirection direction;
	TupleTableSlot *slot = NULL;
	ExternalSelectDesc externalSelectDesc = NULL;
	bool		returnTuple = false;

	/*
	 * get information from the estate and scan state
	 */
	estate = node->ss.ps.state;
	scandesc = node->ess_ScanDesc;
	direction = estate->es_direction;
	slot = node->ss.ss_ScanTupleSlot;

	/*
	 * get the next tuple from the file access methods
	 */
	if (scandesc->fs_formatter_type == ExternalTableType_Invalid)
	{
		elog(ERROR, "invalid formatter type for external table: %s", __func__);
	}
	else if (scandesc->fs_formatter_type != ExternalTableType_PLUG)
	{
		externalSelectDesc = external_getnext_init(&(node->ss.ps), node);
		returnTuple = external_getnext(scandesc, direction, externalSelectDesc,
									   &(node->ss), slot);
	}
	else
	{
		FmgrInfo   *getnextInitFunc;
		FmgrInfo   *getnextFunc;

		Assert(scandesc->fs_formatter_name);

		getnextInitFunc = scandesc->fs_ps_scan_funcs.getnext_init;
		if (getnextInitFunc)
		{
			/*
			 * The "orc" formatter is called with a NULL select descriptor;
			 * every other pluggable formatter gets one from its getnext_init.
			 *
			 * This used to compare the first four bytes of both names
			 * through int pointers.  That violates strict aliasing, may be a
			 * misaligned load, and reads past the end of names shorter than
			 * three characters.  strcmp() yields the same answer (an exact
			 * match against "orc") without undefined behavior.
			 */
			if (strcmp(scandesc->fs_formatter_name, "orc") != 0)
			{
				externalSelectDesc =
					InvokePlugStorageFormatGetNextInit(getnextInitFunc,
													   &(node->ss.ps),
													   node);
			}
		}
		else
		{
			elog(ERROR, "%s_getnext_init function was not found",
				 scandesc->fs_formatter_name);
		}

		getnextFunc = scandesc->fs_ps_scan_funcs.getnext;
		if (getnextFunc)
		{
			returnTuple = InvokePlugStorageFormatGetNext(getnextFunc,
														 scandesc,
														 direction,
														 externalSelectDesc,
														 &(node->ss),
														 slot);
		}
		else
		{
			elog(ERROR, "%s_getnext function was not found",
				 scandesc->fs_formatter_name);
		}
	}

	/*
	 * The access methods fill 'slot' themselves: it is passed in above and
	 * nothing here stores into it.  We only post-process a produced tuple,
	 * or clear the slot and release the scan when nothing was produced.
	 */
	if (returnTuple)
	{
		/*
		 * Perfmon row counting (Gpmon_M_Incr_Rows_Out) is no longer
		 * supported.
		 */

		/*
		 * CDB: Label each row with a synthetic ctid if needed for subquery
		 * dedup.
		 */
		if (node->cdb_want_ctid &&
			!TupIsNull(slot))
		{
			slot_set_ctid_from_fake(slot, &node->cdb_fake_ctid);
		}
	}
	else
	{
		ExecClearTuple(slot);
		/* End the scan early unless a rewind/backward/mark may still need it. */
		if (!node->ss.ps.delayEagerFree)
		{
			ExecEagerFreeExternalScan(node);
		}
	}

	/*
	 * The select descriptor is built on every call, so release it here.
	 * NOTE(review): this means one allocation per fetched tuple.
	 */
	if (externalSelectDesc)
	{
		pfree(externalSelectDesc);
	}
	return slot;
}
/* ----------------------------------------------------------------
 *		ExecExternalScan(node)
 *
 *		Returns the next qualifying tuple of the external relation.
 *		The generic ExecScan() driver is handed ExternalNext as its
 *		access method, which pulls tuples from the external source in
 *		sequential order.
 * ----------------------------------------------------------------
 */
TupleTableSlot *
ExecExternalScan(ExternalScanState *node)
{
	ScanState  *scanState = &node->ss;
	ExecScanAccessMtd fetchNext = (ExecScanAccessMtd) ExternalNext;

	return ExecScan(scanState, fetchNext);
}
/*
 * Tuple table slots used by an external scan node: one result slot and one
 * scan slot (initialized in ExecInitExternalScan, counted by
 * ExecCountSlotsExternalScan).
 */
#define EXTSCAN_NSLOTS 2

/* ----------------------------------------------------------------
 *		ExecInitExternalScan
 *
 *		Builds the executor state for an ExternalScan plan node.  It sets
 *		up the expression context, target list and qual, and the tuple
 *		slots.  It opens the external relation and collects this segment's
 *		file splits.  Finally it begins the file scan with the table's
 *		formatter (built-in or pluggable storage).
 *
 *		Raises ERROR if the formatter type is invalid or a pluggable
 *		format provides no beginscan function.
 * ----------------------------------------------------------------
 */
ExternalScanState *
ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags)
{
	ExternalScanState *scanState;
	PlanState  *planState;
	Relation	rel;
	ResultRelSegFileInfo *errSegInfo = NULL;
	FileScanDesc scanDesc = NULL;
	int			fmtType = ExternalTableType_Invalid;
	char	   *fmtName = NULL;

	/* An external scan is always a leaf node. */
	Assert(outerPlan(node) == NULL);
	Assert(innerPlan(node) == NULL);

	/* Allocate the state node and link it to its plan and executor state. */
	scanState = makeNode(ExternalScanState);
	planState = &scanState->ss.ps;
	planState->plan = (Plan *) node;
	planState->state = estate;

	/* Expression context first, then the expressions evaluated in it. */
	ExecAssignExprContext(estate, planState);
	planState->targetlist = (List *)
		ExecInitExpr((Expr *) node->scan.plan.targetlist,
					 (PlanState *) scanState);
	planState->qual = (List *)
		ExecInitExpr((Expr *) node->scan.plan.qual,
					 (PlanState *) scanState);

	/* Synthetic ctids are wanted only if a Var references the ctid column. */
	scanState->cdb_want_ctid = contain_ctid_var_reference(&node->scan);
	ItemPointerSetInvalid(&scanState->cdb_fake_ctid);

	/* The EXTSCAN_NSLOTS tuple table slots: result slot, then scan slot. */
	ExecInitResultTupleSlot(estate, planState);
	ExecInitScanTupleSlot(estate, &scanState->ss);

	/* Open the relation named by the scanrelid'th range table entry. */
	rel = ExecOpenScanExternalRelation(estate, node->scan.scanrelid);

	/* On an executor, pick this segment's entry of err_aosegfileinfos. */
	if (Gp_role == GP_ROLE_EXECUTE && node->err_aosegfileinfos)
	{
		errSegInfo = (ResultRelSegFileInfo *)
			list_nth(node->err_aosegfileinfos, GetQEIndex());
	}

	scanState->ss.splits =
		GetFileSplitsOfSegment(estate->es_plannedstmt->scantable_splits,
							   rel->rd_id,
							   GetQEIndex());

	/* Determine the table's formatter, then begin the scan with it. */
	getExternalTableTypeInList(node->fmtType, node->fmtOpts,
							   &fmtType, &fmtName);

	if (fmtType == ExternalTableType_Invalid)
	{
		elog(ERROR, "invalid formatter type for external table: %s", __func__);
	}
	else if (fmtType == ExternalTableType_PLUG)
	{
		Oid			beginScanOid;

		Assert(fmtName);
		beginScanOid = LookupPlugStorageValidatorFunc(fmtName, "beginscan");
		if (!OidIsValid(beginScanOid))
		{
			elog(ERROR, "%s_beginscan function was not found", fmtName);
		}
		else
		{
			FmgrInfo	beginScanFn;

			fmgr_info(beginScanOid, &beginScanFn);
			scanDesc = InvokePlugStorageFormatBeginScan(&beginScanFn,
														node,
														&(scanState->ss),
														rel,
														fmtType,
														fmtName);
		}
	}
	else
	{
		scanDesc = external_beginscan(node, rel, errSegInfo,
									  fmtType, fmtName);
	}

	scanState->ss.ss_currentRelation = rel;
	scanState->ess_ScanDesc = scanDesc;
	ExecAssignScanType(&scanState->ss, RelationGetDescr(rel));

	/* Result tuple type and projection info. */
	ExecAssignResultTypeFromTL(planState);
	ExecAssignScanProjectionInfo(&scanState->ss);

	/*
	 * A node that may be rewound, scanned backward or marked must not free
	 * its scan eagerly.
	 */
	planState->delayEagerFree =
		(eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)) != 0;

	/*
	 * Record the EXEC_FLAG_EXTERNAL_AGG_COUNT bit; per the original comment
	 * this is meant to notify the underlying storage level (consumer not
	 * visible here).
	 */
	scanState->parent_agg_type = (eflags & EXEC_FLAG_EXTERNAL_AGG_COUNT);

	initGpmonPktForExternalScan((Plan *) node, &planState->gpmon_pkt, estate);

	return scanState;
}
/*
 * ExecCountSlotsExternalScan
 *
 * Returns the number of tuple table slots needed by this node's subtree:
 * whatever the (normally absent) child plans need, plus this node's own
 * EXTSCAN_NSLOTS.
 */
int
ExecCountSlotsExternalScan(ExternalScan *node)
{
	int			slotCount = ExecCountSlotsNode(outerPlan(node));

	slotCount += ExecCountSlotsNode(innerPlan(node));
	return slotCount + EXTSCAN_NSLOTS;
}
/* ----------------------------------------------------------------
 *		ExecEndExternalScan
 *
 *		Frees the resources held by an external scan node: its
 *		expression context, the contents of its tuple slots, the file scan
 *		(ended and its descriptor freed), the scanned relation, and the
 *		gpmon packet state.
 * ----------------------------------------------------------------
 */
void
ExecEndExternalScan(ExternalScanState *node)
{
	/*
	 * Free the exprcontext
	 */
	ExecFreeExprContext(&node->ss.ps);

	/*
	 * clean out the tuple table
	 */
	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
	ExecClearTuple(node->ss.ss_ScanTupleSlot);

	/*
	 * End the file scan and free its descriptor.  As with the relation
	 * below, tolerate a descriptor that was never set up or was already
	 * released.  ExecEagerFreeExternalScan asserts and dereferences a
	 * non-NULL descriptor.  Clear the pointer so nothing can reach the
	 * freed memory afterwards.
	 */
	if (node->ess_ScanDesc != NULL)
	{
		ExecEagerFreeExternalScan(node);
		pfree(node->ess_ScanDesc);
		node->ess_ScanDesc = NULL;
	}

	/*
	 * close the external relation.
	 *
	 * MPP-8040: make sure we don't close it if it hasn't completed setup, or
	 * if we've already closed it.
	 */
	if (node->ss.ss_currentRelation)
	{
		Relation	relation = node->ss.ss_currentRelation;

		node->ss.ss_currentRelation = NULL;
		ExecCloseScanRelation(relation);
	}

	EndPlanStateGpmonPkt(&node->ss.ps);
}
/* ----------------------------------------------------------------
 *		ExecStopExternalScan
 *
 *		Stops the external scan before its data source is exhausted (for
 *		example, once a LIMIT is satisfied).  Built-in formatters are
 *		stopped with external_stopscan(); pluggable-storage formatters
 *		through the stopscan function cached in the scan descriptor.
 *		This routine itself neither frees the descriptor nor closes the
 *		relation; that is left to ExecEndExternalScan.
 *
 *		Raises ERROR for an invalid formatter type or a missing PLUG
 *		stopscan function.
 * ----------------------------------------------------------------
 */
void
ExecStopExternalScan(ExternalScanState *node)
{
	FileScanDesc desc = node->ess_ScanDesc;

	if (desc->fs_formatter_type == ExternalTableType_Invalid)
	{
		elog(ERROR, "invalid formatter type for external table: %s", __func__);
	}
	else if (desc->fs_formatter_type == ExternalTableType_PLUG)
	{
		FmgrInfo   *stopFn = desc->fs_ps_scan_funcs.stopscan;

		if (stopFn == NULL)
		{
			elog(ERROR, "%s_stopscan function was not found",
				 desc->fs_formatter_name);
		}
		else
		{
			InvokePlugStorageFormatStopScan(stopFn, desc);
		}
	}
	else
	{
		external_stopscan(desc);
	}
}
/* ----------------------------------------------------------------
 *						Join Support
 * ----------------------------------------------------------------
 */
/* ----------------------------------------------------------------
 *		ExecExternalReScan
 *
 *		Rescans the relation.
 *
 *		When this is a PlanQual re-scan (an es_evTuple is set for this
 *		relation), only the corresponding es_evTupleNull flag is reset and
 *		the file scan is left untouched.  Otherwise the synthetic ctid
 *		counter is reset and the scan is restarted with external_rescan()
 *		or the pluggable format's rescan function.  exprCtxt is unused.
 *
 *		Raises ERROR for an invalid formatter type or a missing PLUG
 *		rescan function.
 * ----------------------------------------------------------------
 */
void
ExecExternalReScan(ExternalScanState *node, ExprContext *exprCtxt)
{
	EState	   *estate;
	Index		scanrelid;
	FileScanDesc fileScan;

	estate = node->ss.ps.state;

	/*
	 * The plan is an ExternalScan; read scanrelid through its embedded Scan
	 * rather than punning it as a SeqScan, matching the rest of this file.
	 */
	scanrelid = ((ExternalScan *) node->ss.ps.plan)->scan.scanrelid;

	/* If this is re-scanning of PlanQual ... */
	if (estate->es_evTuple != NULL &&
		estate->es_evTuple[scanrelid - 1] != NULL)
	{
		estate->es_evTupleNull[scanrelid - 1] = false;
		return;
	}

	Gpmon_M_Incr(GpmonPktFromExtScanState(node), GPMON_EXTSCAN_RESCAN);
	CheckSendPlanStateGpmonPkt(&node->ss.ps);

	fileScan = node->ess_ScanDesc;

	/* Restart synthetic ctid numbering for the new pass. */
	ItemPointerSet(&node->cdb_fake_ctid, 0, 0);

	if (fileScan->fs_formatter_type == ExternalTableType_Invalid)
	{
		elog(ERROR, "invalid formatter type for external table: %s", __func__);
	}
	else if (fileScan->fs_formatter_type != ExternalTableType_PLUG)
	{
		external_rescan(fileScan);
	}
	else
	{
		FmgrInfo   *rescanFunc;

		Assert(fileScan->fs_formatter_name);
		rescanFunc = fileScan->fs_ps_scan_funcs.rescan;
		if (rescanFunc)
		{
			InvokePlugStorageFormatReScan(rescanFunc, fileScan);
		}
		else
		{
			elog(ERROR, "%s_rescan function was not found",
				 fileScan->fs_formatter_name);
		}
	}
}
/*
 * initGpmonPktForExternalScan
 *
 * Initializes the gpmon packet of an ExternalScan plan node as a
 * PMNT_ExternalScan entry.  The packet carries planNode->plan_rows and a
 * relation name obtained from GetScanRelNameGpmon() for the range table
 * entry at scan.scanrelid.
 */
void
initGpmonPktForExternalScan(Plan *planNode, gpmon_packet_t *gpmon_pkt, EState *estate)
{
	ExternalScan *extScan;
	RangeTblEntry *rte;
	char		relNameBuf[SCAN_REL_NAME_BUF_SIZE] = {0};

	Assert(planNode != NULL && gpmon_pkt != NULL && IsA(planNode, ExternalScan));
	Assert(GPMON_EXTSCAN_TOTAL <= (int) GPMON_QEXEC_M_COUNT);

	extScan = (ExternalScan *) planNode;
	rte = rt_fetch(extScan->scan.scanrelid, estate->es_range_table);

	InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate, PMNT_ExternalScan,
						 (int64) planNode->plan_rows,
						 GetScanRelNameGpmon(rte->relid, relNameBuf));
}
/*
 * ExecEagerFreeExternalScan
 *
 * Ends the node's file scan.  Built-in formatters use external_endscan();
 * pluggable-storage formatters use the endscan function cached in the scan
 * descriptor.  The descriptor itself is not freed here.
 *
 * Called from ExternalNext once no more tuples are returned (unless
 * delayEagerFree is set) and again from ExecEndExternalScan.
 * NOTE(review): the same descriptor can therefore be ended twice; the
 * endscan routines presumably tolerate that — confirm.
 *
 * Raises ERROR for an invalid formatter type or a missing PLUG endscan
 * function.
 */
void
ExecEagerFreeExternalScan(ExternalScanState *node)
{
	FileScanDesc desc;

	Assert(node->ess_ScanDesc != NULL);
	desc = node->ess_ScanDesc;

	if (desc->fs_formatter_type == ExternalTableType_Invalid)
	{
		elog(ERROR, "invalid formatter type for external table: %s", __func__);
	}
	else if (desc->fs_formatter_type == ExternalTableType_PLUG)
	{
		FmgrInfo   *endFn = desc->fs_ps_scan_funcs.endscan;

		if (endFn == NULL)
		{
			elog(ERROR, "%s_endscan function was not found",
				 desc->fs_formatter_name);
		}
		else
		{
			InvokePlugStorageFormatEndScan(endFn, desc);
		}
	}
	else
	{
		external_endscan(desc);
	}
}