blob: 8156987d0df99f718c6357f8d7a0099f6352a24f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* nodeDynamicIndexscan.c
* Support routines for scanning one or more indexes that
* are determined at runtime.
*
* DynamicIndexScan node scans each index one after the other.
* For each index, it opens the index, scans the index, and returns
* relevant tuples.
*
*/
#include "postgres.h"
#include "executor/executor.h"
#include "executor/instrument.h"
#include "nodes/execnodes.h"
#include "executor/execIndexscan.h"
#include "executor/nodeIndexscan.h"
#include "executor/execDynamicScan.h"
#include "executor/nodeDynamicIndexscan.h"
#include "cdb/cdbpartition.h"
#include "parser/parsetree.h"
#include "access/genam.h"
#include "catalog/catquery.h"
#include "nodes/nodeFuncs.h"
#include "utils/memutils.h"
#include "cdb/cdbvars.h"
/* Number of slots required for DynamicIndexScan */
#define DYNAMICINDEXSCAN_NSLOTS 2
/*
* Free resources from a partition.
*/
static inline void
CleanupOnePartition(IndexScanState *indexState);
/*
* Account for the number of tuple slots required for DynamicIndexScan
*/
int
ExecCountSlotsDynamicIndexScan(DynamicIndexScan *node)
{
return DYNAMICINDEXSCAN_NSLOTS;
}
/*
* Initialize ScanState in DynamicIndexScan.
*/
DynamicIndexScanState *
ExecInitDynamicIndexScan(DynamicIndexScan *node, EState *estate, int eflags)
{
/* check for unsupported flags */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
DynamicIndexScanState *dynamicIndexScanState = makeNode(DynamicIndexScanState);
dynamicIndexScanState->indexScanState.ss.scan_state = SCAN_INIT;
/*
* This context will be reset per-partition to free up per-partition
* copy of LogicalIndexInfo
*/
dynamicIndexScanState->partitionMemoryContext = AllocSetContextCreate(CurrentMemoryContext,
"DynamicIndexScanPerPartition",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
IndexScanState *indexState = &(dynamicIndexScanState->indexScanState);
InitCommonIndexScanState((IndexScanState *)dynamicIndexScanState, (IndexScan *)node, estate, eflags);
/* We don't support eager free in DynamicIndexScan */
indexState->ss.ps.delayEagerFree = true;
InitRuntimeKeysContext(indexState);
initGpmonPktForDynamicIndexScan((Plan *)node, &indexState->ss.ps.gpmon_pkt, estate);
return dynamicIndexScanState;
}
static bool
DynamicIndexScan_ReMapColumns(DynamicIndexScanState *scanState, Oid newOid)
{
Assert(OidIsValid(newOid));
IndexScan *indexScan = (IndexScan *) scanState->indexScanState.ss.ps.plan;
if (!isDynamicScan(&indexScan->scan))
{
return false;
}
Oid oldOid = scanState->columnLayoutOid;
if (!OidIsValid(oldOid))
{
/* Very first partition */
oldOid = rel_partition_get_root(newOid);
}
Assert(OidIsValid(oldOid));
if (oldOid == newOid)
{
/*
* If we have only one partition and we are rescanning
* then we can have this scenario.
*/
return false;
}
AttrNumber *attMap = DynamicScan_GetColumnMapping(oldOid, newOid);
scanState->columnLayoutOid = newOid;
if (attMap)
{
IndexScan_MapLogicalIndexInfo(indexScan->logicalIndexInfo, attMap, indexScan->scan.scanrelid);
change_varattnos_of_a_varno((Node*)indexScan->scan.plan.targetlist, attMap, indexScan->scan.scanrelid);
change_varattnos_of_a_varno((Node*)indexScan->indexqual, attMap, indexScan->scan.scanrelid);
pfree(attMap);
return true;
}
else
{
return false;
}
}
/*
* This function initializes a part and returns true if a new index has been prepared for scanning.
*/
static bool
initNextIndexToScan(DynamicIndexScanState *node)
{
IndexScanState *indexState = &(node->indexScanState);
DynamicIndexScan *dynamicIndexScan = (DynamicIndexScan *)node->indexScanState.ss.ps.plan;
/* Load new index when the scanning of the previous index is done. */
if (indexState->ss.scan_state == SCAN_INIT ||
indexState->ss.scan_state == SCAN_DONE)
{
/* This is the oid of a partition of the table (*not* index) */
Oid *pid = hash_seq_search(&node->pidxStatus);
if (pid == NULL)
{
/* Return if all parts have been scanned. */
node->shouldCallHashSeqTerm = false;
return false;
}
/* Collect number of partitions scanned in EXPLAIN ANALYZE */
if(NULL != indexState->ss.ps.instrument)
{
Instrumentation *instr = indexState->ss.ps.instrument;
instr->numPartScanned ++;
}
DynamicIndexScan_ReMapColumns(node, *pid);
/*
* The is the oid of the partition of an *index*. Note: a partitioned table
* has a root and a set of partitions (may be multi-level). An index
* on a partitioned table also has a root and a set of index partitions.
* We started at table level, and now we are fetching the oid of an index
* partition.
*/
Oid pindex = getPhysicalIndexRelid(dynamicIndexScan->logicalIndexInfo,
*pid);
Assert(OidIsValid(pindex));
Relation currentRelation = OpenScanRelationByOid(*pid);
indexState->ss.ss_currentRelation = currentRelation;
for (int i=0; i < DYNAMICINDEXSCAN_NSLOTS; i++)
{
indexState->ss.ss_ScanTupleSlot[i].tts_tableOid = *pid;
}
ExecAssignScanType(&indexState->ss, RelationGetDescr(currentRelation));
ScanState *scanState = (ScanState *)node;
MemoryContextReset(node->partitionMemoryContext);
MemoryContext oldCxt = MemoryContextSwitchTo(node->partitionMemoryContext);
/* Initialize child expressions */
scanState->ps.qual = (List *)ExecInitExpr((Expr *)scanState->ps.plan->qual, (PlanState*)scanState);
scanState->ps.targetlist = (List *)ExecInitExpr((Expr *)scanState->ps.plan->targetlist, (PlanState*)scanState);
ExecAssignScanProjectionInfo(scanState);
EState *estate = indexState->ss.ps.state;
indexState->iss_RelationDesc =
OpenIndexRelation(estate, pindex, *pid);
/*
* build the index scan keys from the index qualification
*/
ExecIndexBuildScanKeys((PlanState *) indexState,
indexState->iss_RelationDesc,
dynamicIndexScan->indexqual,
dynamicIndexScan->indexstrategy,
dynamicIndexScan->indexsubtype,
&indexState->iss_ScanKeys,
&indexState->iss_NumScanKeys,
&indexState->iss_RuntimeKeys,
&indexState->iss_NumRuntimeKeys,
NULL,
NULL);
MemoryContextSwitchTo(oldCxt);
ExprContext *econtext = indexState->iss_RuntimeContext; /* context for runtime keys */
if (indexState->iss_NumRuntimeKeys != 0)
{
ExecIndexEvalRuntimeKeys(econtext,
indexState->iss_RuntimeKeys,
indexState->iss_NumRuntimeKeys);
}
indexState->iss_RuntimeKeysReady = true;
/*
* Initialize result tuple type and projection info.
*/
TupleDesc td = indexState->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
if (td)
{
pfree(td);
td = NULL;
}
ExecAssignResultTypeFromTL(&indexState->ss.ps);
ExecAssignScanProjectionInfo(&indexState->ss);
indexState->iss_ScanDesc = index_beginscan(currentRelation,
indexState->iss_RelationDesc,
estate->es_snapshot,
indexState->iss_NumScanKeys,
indexState->iss_ScanKeys);
indexState->ss.scan_state = SCAN_SCAN;
}
return true;
}
/*
* setPidIndex
* Set the hash table of Oids to scan.
*/
static void
setPidIndex(DynamicIndexScanState *node)
{
Assert(node->pidxIndex == NULL);
IndexScanState *indexState = (IndexScanState *)node;
EState *estate = indexState->ss.ps.state;
DynamicIndexScan *plan = (DynamicIndexScan *)indexState->ss.ps.plan;
Assert(estate->dynamicTableScanInfo != NULL);
/*
* Ensure that the dynahash exists even if the partition selector
* didn't choose any partition for current scan node [MPP-24169].
*/
InsertPidIntoDynamicTableScanInfo(plan->scan.partIndex, InvalidOid, InvalidPartitionSelectorId);
Assert(NULL != estate->dynamicTableScanInfo->pidIndexes);
Assert(estate->dynamicTableScanInfo->numScans >= plan->scan.partIndex);
node->pidxIndex = estate->dynamicTableScanInfo->pidIndexes[plan->scan.partIndex - 1];
Assert(node->pidxIndex != NULL);
if (optimizer_partition_selection_log)
{
LogSelectedPartitionOids(node->pidxIndex);
}
}
/*
* Execution of DynamicIndexScan
*/
TupleTableSlot *
ExecDynamicIndexScan(DynamicIndexScanState *node)
{
Assert(node);
IndexScanState *indexState = &(node->indexScanState);
TupleTableSlot *slot = NULL;
/*
* If this is called the first time, find the pid index that contains all unique
* partition pids for this node to scan.
*/
if (node->pidxIndex == NULL)
{
setPidIndex(node);
Assert(node->pidxIndex != NULL);
hash_seq_init(&node->pidxStatus, node->pidxIndex);
node->shouldCallHashSeqTerm = true;
}
/*
* Scan index to find next tuple to return. If the current index
* is exhausted, close it and open the next index for scan.
*/
while (TupIsNull(slot) &&
initNextIndexToScan(node))
{
slot = ExecScan(&indexState->ss, (ExecScanAccessMtd) IndexNext);
if (!TupIsNull(slot))
{
/* Report output rows to Gpmon */
Gpmon_M_Incr_Rows_Out(GpmonPktFromDynamicIndexScanState(node));
CheckSendPlanStateGpmonPkt(&indexState->ss.ps);
}
else
{
CleanupOnePartition(indexState);
}
}
return slot;
}
/*
* Release resources for one part (this includes closing the index and
* the relation).
*/
static inline void
CleanupOnePartition(IndexScanState *indexState)
{
Assert(NULL != indexState);
/* Reset index state and release locks. */
ExecClearTuple(indexState->ss.ps.ps_ResultTupleSlot);
ExecClearTuple(indexState->ss.ss_ScanTupleSlot);
if ((indexState->ss.scan_state & SCAN_SCAN) != 0)
{
Assert(indexState->iss_ScanDesc != NULL);
Assert(indexState->iss_RelationDesc != NULL);
Assert(indexState->ss.ss_currentRelation != NULL);
index_endscan(indexState->iss_ScanDesc);
indexState->iss_ScanDesc = NULL;
index_close(indexState->iss_RelationDesc, NoLock);
indexState->iss_RelationDesc = NULL;
ExecCloseScanRelation(indexState->ss.ss_currentRelation);
indexState->ss.ss_currentRelation = NULL;
}
indexState->ss.scan_state = SCAN_INIT;
}
/*
* Ends current scan by closing relations, and ending hash
* iteration
*/
static void
DynamicIndexScanEndCurrentScan(DynamicIndexScanState *node)
{
IndexScanState *indexState = &(node->indexScanState);
CleanupOnePartition(indexState);
if (node->shouldCallHashSeqTerm)
{
hash_seq_term(&node->pidxStatus);
node->shouldCallHashSeqTerm = false;
}
}
/*
* Release resources of DynamicIndexScan
*/
void
ExecEndDynamicIndexScan(DynamicIndexScanState *node)
{
DynamicIndexScanEndCurrentScan(node);
IndexScanState *indexState = &(node->indexScanState);
FreeRuntimeKeysContext((IndexScanState *) node);
EndPlanStateGpmonPkt(&indexState->ss.ps);
MemoryContextDelete(node->partitionMemoryContext);
}
/*
* Allow rescanning an index.
*/
void
ExecDynamicIndexReScan(DynamicIndexScanState *node, ExprContext *exprCtxt)
{
DynamicIndexScanEndCurrentScan(node);
/* Force reloading the hash table */
node->pidxIndex = NULL;
/* Context for runtime keys */
ExprContext *econtext = node->indexScanState.iss_RuntimeContext;
if (econtext)
{
/*
* If we are being passed an outer tuple, save it for runtime key
* calc. We also need to link it into the "regular" per-tuple
* econtext, so it can be used during indexqualorig evaluations.
*/
if (exprCtxt != NULL)
{
econtext->ecxt_outertuple = exprCtxt->ecxt_outertuple;
ExprContext *stdecontext = node->indexScanState.ss.ps.ps_ExprContext;
stdecontext->ecxt_outertuple = exprCtxt->ecxt_outertuple;
}
/*
* Reset the runtime-key context so we don't leak memory as each outer
* tuple is scanned. Note this assumes that we will recalculate *all*
* runtime keys on each call.
*/
ResetExprContext(econtext);
}
Gpmon_M_Incr(GpmonPktFromDynamicIndexScanState(node), GPMON_DYNAMICINDEXSCAN_RESCAN);
CheckSendPlanStateGpmonPkt(&node->indexScanState.ss.ps);
}
/*
* Method for reporting DynamicIndexScan progress to gpperfmon
*/
void
initGpmonPktForDynamicIndexScan(Plan *planNode, gpmon_packet_t *gpmon_pkt, EState *estate)
{
Assert(planNode != NULL && gpmon_pkt != NULL && IsA(planNode, DynamicIndexScan));
{
RangeTblEntry *rte = rt_fetch(((Scan *)planNode)->scanrelid, estate->es_range_table);
char schema_rel_name[SCAN_REL_NAME_BUF_SIZE] = {0};
Assert(GPMON_DYNAMICINDEXSCAN_TOTAL <= (int)GPMON_QEXEC_M_COUNT);
InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate, PMNT_DynamicIndexScan,
(int64) planNode->plan_rows, GetScanRelNameGpmon(rte->relid, schema_rel_name));
}
}