blob: 90c1c1d502b4f832529436e9b5ab01e23735fb12 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* nodeParquetScan.c
*
* Created on: Jul 4, 2013
* Author: malili
*/
#include "postgres.h"
#include "access/heapam.h"
#include "cdb/cdbparquetam.h"
#include "cdb/cdbvars.h"
#include "executor/execdebug.h"
#include "executor/nodeParquetScan.h"
static TupleTableSlot *ParquetNext(ParquetScanState *node);
static void OpenParquetScanRelation(ParquetScanState *node);
static void CloseParquetScanRelation(ParquetScanState *node);
static TupleTableSlot *ParquetNext(ParquetScanState *node)
{
parquet_getnext(node->scandesc, node->ss.ps.state->es_direction, node->ss.ss_ScanTupleSlot);
return node->ss.ss_ScanTupleSlot;
}
/*
* Open/close underlying table.
* Resource acquire and release
*/
static void OpenParquetScanRelation(ParquetScanState *node)
{
Assert(node->ss.scan_state == SCAN_INIT || node->ss.scan_state == SCAN_DONE);
Assert(!node->scandesc);
node->scandesc = parquet_beginscan(
node->ss.ss_currentRelation,
node->ss.ps.state->es_snapshot,
NULL /* relationTupleDesc */,
node->proj);
node->ss.scan_state = SCAN_SCAN;
}
static void CloseParquetScanRelation(ParquetScanState *node)
{
Assert((node->ss.scan_state & SCAN_SCAN) != 0);
parquet_endscan(node->scandesc);
node->scandesc = NULL;
node->ss.scan_state = SCAN_INIT;
}
/* ----------------------------------------------------------------
* ExecInitParquetScan
* ----------------------------------------------------------------
*/
ParquetScanState *
ExecInitParquetScan(ParquetScan *node, EState *estate, int eflags){
ParquetScanState *state;
Relation currentRelation;
Assert(outerPlan(node) == NULL);
Assert(innerPlan(node) == NULL);
/*
* create state structure
*/
state = makeNode(ParquetScanState);
state->ss.ps.plan = (Plan *) node;
state->ss.ps.state = estate;
/*
* Miscellaneous initialization
*
* create expression context for node
*/
ExecAssignExprContext(estate, &state->ss.ps);
/*
* initialize child expressions
*/
state->ss.ps.targetlist = (List *)
ExecInitExpr((Expr *) node->scan.plan.targetlist,
(PlanState *) state);
state->ss.ps.qual = (List *)
ExecInitExpr((Expr *) node->scan.plan.qual,
(PlanState *) state);
#define ParquetSCAN_NSLOTS 2
/*
* tuple table initialization
*/
ExecInitResultTupleSlot(estate, &state->ss.ps);
ExecInitScanTupleSlot(estate, &state->ss);
/*
* get the relation object id from the relid'th entry in the range table
* and open that relation.
*/
currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid);
state->ss.ss_currentRelation = currentRelation;
ExecAssignScanType(&state->ss, RelationGetDescr(currentRelation));
/*
* Initialize result tuple type and projection info.
*/
ExecAssignResultTypeFromTL(&state->ss.ps);
ExecAssignScanProjectionInfo(&state->ss);
/* init aocs prj */
{
int i;
state->ncol = currentRelation->rd_att->natts;
state->proj = palloc0(sizeof(bool) * state->ncol);
GetNeededColumnsForScan((Node *) node->scan.plan.targetlist, state->proj, state->ncol);
GetNeededColumnsForScan((Node *) node->scan.plan.qual, state->proj, state->ncol);
/* At least project one col */
for(i=0; i<state->ncol; ++i)
{
if(state->proj[i])
break;
}
if(i==state->ncol)
state->proj[0] = true;
}
// initGpmonPktForParquetScan((Plan *)node, &state->ss.ps.gpmon_pkt, estate);
return state;
}
/* ----------------------------------------------------------------
* ExecParquetScan(node)
*
* Scans the relation sequentially and returns the next qualifying
* tuple.
* It calls the ExecScan() routine and passes it the access method
* which retrieve tuples sequentially.
* ----------------------------------------------------------------
*/
TupleTableSlot *
ExecParquetScan(ParquetScanState *node){
/*
* use AppendOnlyNext as access method
*/
TupleTableSlot *slot;
if((node->ss.scan_state & SCAN_SCAN) == 0)
OpenParquetScanRelation(node);
slot = ExecScan((ScanState *) node, (ExecScanAccessMtd) ParquetNext);
if (!TupIsNull(slot))
{
Gpmon_M_Incr_Rows_Out(GpmonPktFromParquetScanState(node));
CheckSendPlanStateGpmonPkt(&node->ss.ps);
}
Assert((node->ss.scan_state & SCAN_MARKPOS) == 0);
if(TupIsNull(slot))
CloseParquetScanRelation(node);
return slot;
}
/* ----------------------------------------------------------------
* ExecEndParquetScan
*
* frees any storage allocated through C routines.
* ----------------------------------------------------------------
*/
void
ExecEndParquetScan(ParquetScanState * node)
{
Relation relation;
/*
* get information from node
*/
relation = node->ss.ss_currentRelation;
/*
* Free the exprcontext
*/
ExecFreeExprContext(&node->ss.ps);
/*
* clean out the tuple table
*/
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
ExecClearTuple(node->ss.ss_ScanTupleSlot);
ExecEagerFreeParquetScan(node);
/*
* close the heap relation.
*/
ExecCloseScanRelation(relation);
EndPlanStateGpmonPkt(&node->ss.ps);
}
/* ----------------------------------------------------------------
* ExecParquetReScan
*
* Rescans the relation.
* ----------------------------------------------------------------
*/
void
ExecParquetReScan(ParquetScanState *node, ExprContext *exprCtxt)
{
EState *estate;
Index scanrelid;
estate = node->ss.ps.state;
scanrelid = ((ParquetScan *) node->ss.ps.plan)->scan.scanrelid;
/* If this is re-scanning of PlanQual ... */
if (estate->es_evTuple != NULL &&
estate->es_evTuple[scanrelid - 1] != NULL)
{
estate->es_evTupleNull[scanrelid - 1] = false;
return;
}
if((node->ss.scan_state & SCAN_SCAN) == 0)
OpenParquetScanRelation(node);
parquet_rescan(node->scandesc);
Gpmon_M_Incr(GpmonPktFromParquetScanState(node), GPMON_ParquetSCAN_RESCAN);
CheckSendPlanStateGpmonPkt(&node->ss.ps);
}
/* ----------------------------------------------------------------
* ExecEagerFreeParquetScan
* ----------------------------------------------------------------
*/
void
ExecEagerFreeParquetScan(ParquetScanState *node)
{
if((node->ss.scan_state & SCAN_SCAN) != 0)
CloseParquetScanRelation(node);
}
/* ----------------------------------------------------------------
* ExecCountSlotsParquetScan
* ----------------------------------------------------------------
*/
int
ExecCountSlotsParquetScan(ParquetScan *node)
{
return ExecCountSlotsNode(outerPlan(node)) +
ExecCountSlotsNode(innerPlan(node)) +
ParquetSCAN_NSLOTS;
}
/* ----------------------------------------------------------------
* initGpmonPktForParquetScan
* ----------------------------------------------------------------
void initGpmonPktForParquetScan(Plan *planNode, gpmon_packet_t *gpmon_pkt, EState *estate){
// Assert(planNode != NULL && gpmon_pkt != NULL && IsA(planNode, ParquetScan));
//
// {
// RangeTblEntry *rte = rt_fetch(((AOCSScan *)planNode)->scan.scanrelid,
// estate->es_range_table);
// char schema_rel_name[SCAN_REL_NAME_BUF_SIZE] = {0};
//
// Assert(GPMON_AOCSSCAN_TOTAL <= (int)GPMON_QEXEC_M_COUNT);
// InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate, PMNT_AppendOnlyColumnarScan,
// (int64)planNode->plan_rows,
// GetScanRelNameGpmon(rte->relid, schema_rel_name));
// }
}*/