blob: b592c319d4d99c4c0726f415daf91f1c3c1650ae [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
**************************************************************************
*
* = File: ScmCostMethod.C
* Description: Cost estimation interface object for Simple Cost Model
* Language: C++
*
* Last Modified:
* Modified by:
* Purpose: Simple Cost Vector Reduction
*
*
*
**************************************************************************
*/
#include "GroupAttr.h"
#include "AllRelExpr.h"
#include "RelPackedRows.h"
#include "RelSequence.h"
#include "RelSample.h"
#include "AllItemExpr.h"
#include "ItemSample.h"
#include "opt.h"
#include "EstLogProp.h"
#include "DefaultConstants.h"
#include "ItemOther.h"
#include "ScanOptimizer.h"
#include "SimpleScanOptimizer.h"
#include "NAFileSet.h"
#include "SchemaDB.h"
#include "CostMethod.h"
#include "Cost.h"
#include "NodeMap.h"
#include "HDFSHook.h"
#include "CmpStatement.h"
#include "sqludr.h"
#include <math.h>
#ifndef NDEBUG
static THREAD_P FILE* pfp = NULL;
#endif // NDEBUG
// -----------------------------------------------------------------------
// CostMethod::scmComputeOperatorCost()
// -----------------------------------------------------------------------
Cost*
CostMethod::scmComputeOperatorCost(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
Cost* cost;
try {
cost = scmComputeOperatorCostInternal(op, pws, countOfStreams);
} catch(...) {
// cleanUp() must be called before this function is called again
// because wrong results may occur the next time scmComputeOperatorCost()
// is called and because the SharedPtr objects must be set to zero.
// Failure to call cleanUp() will very likely cause problems.
cleanUp();
throw; // rethrow the exception
}
cleanUp();
return cost;
} // CostMethod::scmComputeOperatorCost()
// -----------------------------------------------------------------------
// CostMethod::scmComputeOperatorCostInternal() (derived class must redefine)
// -----------------------------------------------------------------------
Cost*
CostMethod::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams )
{
countOfStreams = 1;
#ifndef NDEBUG
if ( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON )
fprintf(stdout," %s : is not yet implemented, plan may not be optimal\n", className_);
#endif // NDEBUG
Cost* costPtr =
scmCost(1e32 /*tcProc */, csZero, csZero, csZero, csZero, csOne,
csZero, csZero, csZero, csZero);
return costPtr;
} // CostMethod::scmComputeOperatorCostInternal()
//<pb>
//==============================================================================
// scmComputePlanCost() produces a final cumulative cost for an entire
// subtree rooted at a specified physical operator.
//
// Input:
// op -- specified physical operator.
//
// myContext -- context associated with specified physical operator
//
// pws -- plan workspace associated with specified physical operator.
//
// planNumber -- used to get appropriate child contexts.
//
// Output:
// none
//
// Return:
// Pointer to cumulative final cost.
//
//==============================================================================
Cost*
CostMethod::scmComputePlanCost( RelExpr* op,
const PlanWorkSpace* pws,
Lng32 planNumber
)
{
const Context* myContext = pws->getContext();
//----------------------------------------------------------------------
// Defensive programming.
//----------------------------------------------------------------------
CMPASSERT( op != NULL );
CMPASSERT( myContext != NULL );
CMPASSERT( pws != NULL );
//--------------------------------------------------------------------------
// Grab parent's cost (independent of its children) directly from the plan
// work space. This cost should contain result of scmComputeOperatorCost().
//--------------------------------------------------------------------------
// Need to cast constness away since getFinalOperatorCost cannot
// be made const
Cost* parentCost = ((PlanWorkSpace *)pws)->getFinalOperatorCost( planNumber );
//----------------------------------------------------------------------------
// For leaf nodes (i.e. those having no children), return a copy of parent's
// cost as the final cost.
//----------------------------------------------------------------------------
if ( op->getArity() == 0 )
{
return parentCost;
}
// The following code assumes that each operator will have maximum
// of two children. And requires modification if that is not true.
// For example UNION operator.
CostPtr leftChildCost = NULL;
CostPtr rightChildCost = NULL;
if (op->getArity() == 1)
{
getChildCostForUnaryOp(op,
myContext,
pws,
planNumber,
leftChildCost);
}
else if (op->getArity() == 2)
{
getChildCostsForBinaryOp( op
, myContext
, pws
, planNumber
, leftChildCost
, rightChildCost);
}
else
ABORT("CostMethod::scmComputePlanCost(): More than two children");
Cost* planCost = scmRollUp( parentCost
, leftChildCost
, rightChildCost
, myContext->getReqdPhysicalProperty()
);
if (leftChildCost)
delete leftChildCost;
if (rightChildCost)
delete rightChildCost;
delete parentCost;
return planCost;
} // CostMethod::computePlanCost()
//==============================================================================
// Roll up children cost and parent cost into a cumulative cost.
// This is the default method and may be overridden for specific operators.
//
// Input:
// parentCost -- Cost of parent independent of its child.
//
// leftChildCost -- cumulative cost of the left child.
//
// rightChildCost -- cumulative cost of the right child.
//
// rpp -- Parent's required physical properties needed by lower level
// roll-up routines.
//
// Output:
// none
//
// Return:
// Rolled up cost.
//
//==============================================================================
Cost*
CostMethod::scmRollUp( Cost* const parentCost
, Cost* const leftChildCost
, Cost* const rightChildCost
, const ReqdPhysicalProperty* const rpp
)
{
//----------------------------------------------------------------------
// Defensive programming.
//----------------------------------------------------------------------
CMPASSERT( parentCost != NULL );
SimpleCostVector parentVector = parentCost->getScmCplr();
SimpleCostVector leftChildVector, rightChildVector;
if (leftChildCost != NULL)
leftChildVector = leftChildCost->getScmCplr();
if (rightChildCost != NULL)
rightChildVector = rightChildCost->getScmCplr();
SimpleCostVector cumCostVector;
if (leftChildCost == NULL)
cumCostVector = parentVector;
else
{
if (rightChildCost == NULL)
cumCostVector = parentVector + leftChildVector;
else
cumCostVector = parentVector + (leftChildVector + rightChildVector);
}
return new STMTHEAP Cost(&cumCostVector);
} // CostMethod::scmRollUp()
//==============================================================================
// Wrapper for SCM Cost constructor.
// Used by SCM only.
//
// Return:
// Cost
//
//==============================================================================
Cost *
CostMethod::scmCost(CostScalar tuplesProcessed,
CostScalar tuplesProduced,
CostScalar tuplesSent,
CostScalar ioRand,
CostScalar ioSeq,
CostScalar noOfProbes,
CostScalar input1RowSize,
CostScalar input2RowSize,
CostScalar outputRowSize,
CostScalar probeRowSize)
{
// assert if called by OCM .
DCMPASSERT(CmpCommon::getDefault(SIMPLE_COST_MODEL) == DF_ON);
SimpleCostVector scmLR ( csZero, /* CPUTime */
csZero, /* IOTime */
csZero, /* MSGTime */
csZero, /* idle time */
tuplesProcessed, /* tcProc */
tuplesProduced, /* tcProd */
tuplesSent, /* tcSent */
ioRand, /* ioRand */
ioSeq, /* ioSeq */
noOfProbes ); /* num probes */
// This is ok for now, as cpuCount will almost always be >= countOfStreams.
// The reason this is commented out for now is because exchange costing
// does not set countOfStreams_ and this leads to problems. Doing this in
// each operator separately will circumvent this problem, but make the code
// ugly, so it is best to comment it out for now. Does not have any negative
// impact on plans.
/* if (cpuCount < countOfStreams_)
{
scmLR = scmLR.scaleByValue(countOfStreams_/cpuCount);
}
*/
NABoolean scmDebugOn = (CmpCommon::getDefault(NCM_PRINT_ROWSIZE) == DF_ON);
if (scmDebugOn == TRUE)
{
// debug mode, build another SimpleCostVector with rowsize information
// for debugging purposes only
// FOR INTERNAL USE ONLY.
const SimpleCostVector scmDebug ( csZero,
csZero,
csZero,
csZero,
input1RowSize, /* input1 rowsize */
input2RowSize, /* input2 rowsize */
outputRowSize, /* output rowsize */
probeRowSize, /* probe rowsize */
csZero,
csZero );
return new STMTHEAP Cost(&scmLR, &scmDebug);
}
else
{
// Normal mode
return new STMTHEAP Cost(&scmLR);
}
} // CostMethod::scmCost
//==============================================================================
// Wrapper for SCM Cost constructor.
// Used by SCM only.
//
// Return:
// Cost
//
//==============================================================================
Cost *
ScanOptimizer::scmCost(CostScalar tuplesProcessed,
CostScalar tuplesProduced,
CostScalar tuplesSent,
CostScalar ioRand,
CostScalar ioSeq,
CostScalar noOfProbes,
CostScalar input1RowSize,
CostScalar input2RowSize,
CostScalar outputRowSize,
CostScalar probeRowSize)
{
// assert if called by OCM .
DCMPASSERT(CmpCommon::getDefault(SIMPLE_COST_MODEL) == DF_ON);
SimpleCostVector scmLR ( csZero, /* CPUTime */
csZero, /* IOTime */
csZero, /* MSGTime */
csZero, /* idle time */
tuplesProcessed, /* tcProc */
tuplesProduced, /* tcProd */
tuplesSent, /* tcSent */
ioRand, /* ioRand */
ioSeq, /* ioSeq */
noOfProbes ); /* num probes */
NABoolean scmDebugOn = (CmpCommon::getDefault(NCM_PRINT_ROWSIZE) == DF_ON);
if (scmDebugOn == TRUE)
{
// debug mode, build another SimpleCostVector with rowsize information
// for debugging purposes only
// FOR INTERNAL USE ONLY.
const SimpleCostVector scmDebug ( csZero,
csZero,
csZero,
csZero,
input1RowSize, /* input1 rowsize */
input2RowSize, /* input2 rowsize */
outputRowSize, /* output rowsize */
probeRowSize, /* probe rowsize */
csZero,
csZero );
return new STMTHEAP Cost(&scmLR, &scmDebug);
}
else
{
// Normal mode
return new STMTHEAP Cost(&scmLR);
}
} // ScanOptimizer::scmCost
//==============================================================================
// Scale the cost to account for rowsizes. In case of large rowsizes,
// tuple counts alone may not be enough to capture the true costs.
//
// Inputs:
// rowSize -- size of the row.
// Output:
// none.
//
// Return:
// RowSize factor
//
//==============================================================================
CostScalar
CostMethod::scmRowSizeFactor( CostScalar rowSize, ncmRowSizeFactorType rowSizeFactorType )
{
CostScalar rowSizeFactor;
switch(rowSizeFactorType)
{
case TUPLES_ROWSIZE_FACTOR:
rowSizeFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_TUPLES_ROWSIZE_FACTOR);
break;
case SEQ_IO_ROWSIZE_FACTOR:
rowSizeFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_SEQ_IO_ROWSIZE_FACTOR);
break;
case RAND_IO_ROWSIZE_FACTOR:
rowSizeFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_RAND_IO_ROWSIZE_FACTOR);
break;
default:
rowSizeFactor = 0.0;
break;
}
// Defensive programming
if (rowSize <= 1)
return 1.0;
return MAXOF(pow(rowSize.getValue(), rowSizeFactor.getValue()), 1.0);
} // CostMethod::scmRowSizeFactor
//==============================================================================
// Scale the cost to account for rowsizes. In case of large rowsizes,
// tuple counts alone may not be enough to capture the true costs.
//
// Inputs:
// rowSize -- size of the row.
// Output:
// none.
//
// Return:
// RowSize factor
//
//==============================================================================
CostScalar
ScanOptimizer::scmRowSizeFactor( CostScalar rowSize, ncmRowSizeFactorType rowSizeFactorType )
{
CostScalar rowSizeFactor;
switch(rowSizeFactorType)
{
case TUPLES_ROWSIZE_FACTOR:
rowSizeFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_TUPLES_ROWSIZE_FACTOR);
break;
case SEQ_IO_ROWSIZE_FACTOR:
rowSizeFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_SEQ_IO_ROWSIZE_FACTOR);
break;
case RAND_IO_ROWSIZE_FACTOR:
rowSizeFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_RAND_IO_ROWSIZE_FACTOR);
break;
default:
rowSizeFactor = 0.0;
break;
}
// Defensive programming
if (rowSize <= 1)
return 1.0;
return MAXOF(pow(rowSize.getValue(), rowSizeFactor.getValue()), 1.0);
} // ScanOptimizer::scmRowSizeFactor
//<pb>
/**********************************************************************/
/* */
/* CostMethodRelRoot */
/* */
/**********************************************************************/
Cost*
CostMethodRelRoot::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
// -------------------------------------------------------------
// Save off estimated degree of parallelism. Always 1 for root.
// -------------------------------------------------------------
countOfStreams = 1;
// ---------------------------------------------------------------------
// In SCM Root operator cost is ignored. so, we just return an empty
// Cost object.
return new HEAP Cost();
} // CostMethodRelRoot::scmComputeOperatorCostInternal()
//<pb>
/**********************************************************************/
/* */
/* CostMethodFixedCostPerRow */
/* */
/**********************************************************************/
Cost*
CostMethodFixedCostPerRow::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op, pws->getContext());
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
// ---------------------------------------------------------------------
// In SCM Fixed cost per row is ignored as this cost doesn't affect plan.
// So, we just return an empty
// Cost object.
return new HEAP Cost();
} // CostMethodFixedCostPerRow::scmComputeOperatorCostInternal()
//<pb>
/**********************************************************************/
/* */
/* CostMethodFileScan */
/* */
/**********************************************************************/
Cost*
CostMethodFileScan::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
// call old computeOperatorCostInternal method.
return computeOperatorCostInternal( op, pws->getContext(), countOfStreams );
} // CostMethodFileScan::scmComputeOperatorCostInternal()
//<pb>
/**********************************************************************/
/* */
/* CostMethodDP2Scan */
/* */
/**********************************************************************/
Cost*
CostMethodDP2Scan::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
FileScan *fs = (FileScan *) op;
if (!fs->isHiveTable())
// call old computeOperatorCostInternalx method.
return computeOperatorCostInternal( op, myContext, countOfStreams );
// ---------------------------------------------------------------------
// Try to do a very vanilla cost computation for Hive scans for now
// ---------------------------------------------------------------------
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
Cost* hiveScanCost = NULL;
CostScalar tuplesProcessed = csZero;
CostScalar tuplesProduced = csZero;
// Hbase table case
if (fs->isHbaseTable()) {
CostScalar outputRowSize = 100 /* getEstimatedRecordLength*/;
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
hiveScanCost = scmCost(100,
100,
csZero,
csZero,
csZero,
noOfProbesPerStream_,
csZero,
csZero,
outputRowSize,
csZero);
} else {
// Hive table case
HHDFSStatsBase hdfsStats(/* HHDFSTableStats * tableStats */ NULL);
fs->getHiveSearchKey()->accumulateSelectedStats(hdfsStats);
CostScalar tuplesProcessed = (CostScalar)hdfsStats.getEstimatedRowCount();
CostScalar tuplesProduced = myRowCount_ / partFunc_->getCountOfPartitions();
// ---------------------------------------------------------------------
// tupleProduced can never be more than tupleProcessed.
// This thing can happen sometimes especially for partial group bys since
// cardinality estimates do not distinguish between partial
// and full group bys.
// ---------------------------------------------------------------------
tuplesProduced = MINOF(tuplesProcessed, tuplesProduced);
CostScalar outputRowSize = (CostScalar)hdfsStats.getEstimatedRecordLength();
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
tuplesProcessed *= outputRowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
// ---------------------------------------------------------------------
// Synthesize and return the cost object.
// ---------------------------------------------------------------------
hiveScanCost = scmCost(tuplesProcessed,
tuplesProduced,
csZero,
csZero,
csZero,
noOfProbesPerStream_,
csZero,
csZero,
outputRowSize,
csZero);
}
// ---------------------------------------------------------------------
// For debugging.
// ---------------------------------------------------------------------
#ifndef NDEBUG
NABoolean printCost =
( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON );
if ( printCost )
{
pfp = stdout;
fprintf(pfp,"HIVESCAN::scmComputeOperatorCostInternal()\n");
fprintf(pfp,"tuplesProcessed=%g,myRowCount=%g\n",
tuplesProcessed.toDouble(),myRowCount_.toDouble());
hiveScanCost->getScmCplr().print(pfp);
fprintf(pfp, "Elapsed time: ");
fprintf(pfp,"%f", hiveScanCost->
convertToElapsedTime(myContext->getReqdPhysicalProperty()).
value());
fprintf(pfp,"\n");
}
#endif
return hiveScanCost;
} // CostMethodDP2Scan::scmComputeOperatorCostInternal()
// SimpleFileScanOptimizer::scmComputeCostVectors()
// Computes the cost vectors for this scan using the simple costing model.
// Computes:
// - number of tuples processed
// - number of tuples produced
// - sequential IOs
//
// OUTPUTS: lastRow SimpleCostVectors of a new Cost object are populated.
//
Cost *
SimpleFileScanOptimizer::scmComputeCostVectors()
{
// if the table is Hbase, then call scmComputeCostVectorsForHbase()
if (getIndexDesc()->getPrimaryTableDesc()->getNATable()->isHbaseTable())
return scmComputeCostVectorsForHbase();
const LogPhysPartitioningFunction *logPhysPartFunc =
getContext().getPlan()->getPhysicalProperty()->getPartitioningFunction()->
castToLogPhysPartitioningFunction();
NABoolean syncAccess = FALSE;
CostScalar numActivePartitions;
CostScalar tuplesProcessed, tuplesProduced, tuplesSent = csZero;
numActivePartitions = getEstNumActivePartitionsAtRuntime();
if (logPhysPartFunc != NULL)
syncAccess = logPhysPartFunc->getSynchronousAccess();
if (syncAccess)
{
tuplesProcessed = getSingleSubsetSize();
tuplesProduced = getResultSetCardinality();
}
else
{
tuplesProcessed = (getSingleSubsetSize()/numActivePartitions).getCeiling();
tuplesProduced = getResultSetCardinalityPerScan();
}
setProbes(1);
setTuplesProcessed(getSingleSubsetSize());
CostScalar numBlocks = getNumBlocksForRows(tuplesProcessed);
//CostScalar numBlocks = estimateSeqKBReadPerScan()/getBlockSizeInKb();
// Store in ScanOptimizer object. FileScan will later grab this
// value.
setNumberOfBlocksToReadPerAccess(numBlocks);
// Factor in row sizes.
CostScalar rowSize = recordSizeInKb_ * csOneKiloBytes;
CostScalar outputRowSize = getRelExpr().getGroupAttr()->getRecordLength();
CostScalar rowSizeFactor = scmRowSizeFactor(rowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
tuplesProcessed *= rowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
CostScalar seqIORowSizeFactor = scmRowSizeFactor(rowSize, SEQ_IO_ROWSIZE_FACTOR);
numBlocks *= seqIORowSizeFactor;
// fix Bugzilla #1110.
setEstRowsAccessed(getSingleSubsetSize());
Cost* scanCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, numBlocks, csOne,
rowSize, csZero, outputRowSize, csZero);
return scanCost;
} // scmComputeCostVectors
// SimpleFileScanOptimizer::scmComputeCostVectorsMultiProbes()
// Computes the cost vectors for this multi-probe scan using the simple costing model.
// Computes:
// - number of tuples processed for all probes
// - number of tuples produced for all probes
// - sequential IOs for all probes
// - number of probes for all probes
// currently assumes that probes are not ordered. Need to implement
// ordered probes case later.
//
// OUTPUTS: lastRow SimpleCostVectors of a new Cost object are populated.
//
Cost *
SimpleFileScanOptimizer::scmComputeCostVectorsMultiProbes()
{
if ( getIndexDesc()->getPrimaryTableDesc()->getNATable()->isHbaseTable() AND
(CmpCommon::getDefault(NCM_HBASE_COSTING) == DF_ON) )
return scmComputeCostVectorsMultiProbesForHbase();
CostScalar numOuterProbes = (getContext().getInputLogProp())->getResultCardinality();
CostScalar numActivePartitions = getNumActivePartitions();
CostScalar ioSeq, ioRand, numRandIOs;
NABoolean isUnique = getSearchKey()->isUnique();
NABoolean isAnIndexJoin;
ValueIdSet charInputs = getRelExpr().getGroupAttr()->getCharacteristicInputs();
const ReqdPhysicalProperty* rpp = getContext().getReqdPhysicalProperty();
NABoolean ocbJoin = FALSE;
if (rpp != NULL && rpp->getOcbEnabledCostingRequirement())
ocbJoin = TRUE;
// Effective Total Row Count is the size of the bounding subset of
// all probes. Typically this will be all the rows of the table,
// but if all probes are restricted to a subset of rows (e.g. the
// key predicate contains leading constants) then the effective row
// count will be less than the total row count.
estimateEffTotalRowCount(totalRowCount_, effectiveTotalRowCount_);
CostScalar effectiveTotalRowCount = (effectiveTotalRowCount_/numActivePartitions).getCeiling();
CostScalar effectiveTotalBlocks = getNumBlocksForRows(effectiveTotalRowCount).getCeiling();
CostScalar cacheSize = ActiveSchemaDB()->getDefaults().getAsDouble(NCM_CACHE_SIZE_IN_BLOCKS); //getDP2CacheSizeInBlocks(getBlockSizeInKb());
categorizeMultiProbes(&isAnIndexJoin);
CostScalar numProbes = (probes_/numActivePartitions).getCeiling();
CostScalar numUniqueProbes = (uniqueProbes_/numActivePartitions).getCeiling();
CostScalar numSuccessfulProbes = (successfulProbes_/numActivePartitions).getCeiling();
CostScalar numUniqueSuccessfulProbes = ((successfulProbes_ - duplicateSuccProbes_)/numActivePartitions).getCeiling();
NABoolean njSeqIoFix = (CmpCommon::getDefault(NCM_NJ_SEQIO_FIX) == DF_ON);
CostScalar numBlocksPerSuccessfulProbe = csZero;
if (njSeqIoFix)
numBlocksPerSuccessfulProbe = blksPerSuccProbe_;
else
numBlocksPerSuccessfulProbe = (blksPerSuccProbe_/numActivePartitions).getCeiling();
CostScalar numBlocksAccessedByUniqueProbes = MINOF(numUniqueProbes, effectiveTotalBlocks);
if (ocbJoin)
{
// In OCB, the probe side is broadcast to all partitions of the right side.
numProbes = numOuterProbes;
numUniqueProbes = numOuterProbes;
}
// The probes are pushed down from the nested join to the right child.
// Initialize tuplesProcessed to the number of probes.
CostScalar tuplesProcessed = numProbes;
// These values are for all probes.
CostScalar accessedRows = (getDataRows()/numActivePartitions).getCeiling();
CostScalar selectedRows = getResultSetCardinalityPerScan();
CostScalar tuplesProduced = MINOF(selectedRows, accessedRows);
CostScalar outputJoinCard = tuplesProduced;
const IndexDesc *iDesc = getIndexDesc();
CostScalar indexLevels = MAXOF(iDesc->getIndexLevels() - 1, 1);
CostScalar a = numProbes;
if (effectiveTotalBlocks <= cacheSize)
{
numRandIOs = MINOF(a, effectiveTotalBlocks);
}
else
{
if (a <= cacheSize)
{
numRandIOs = a;
}
else
{
numRandIOs = cacheSize + (a - cacheSize) * (effectiveTotalBlocks - cacheSize) / effectiveTotalBlocks;
}
}
if (isAnIndexJoin)
{
// Join between Index (left child) and Table (right child),
// involves random IOs.
tuplesProcessed += accessedRows;
ioRand = numRandIOs;
}
else
{
// The right child is the base table or the covering index being probed.
if (getInOrderProbesFlag() OR
ocbJoin OR
(effectiveTotalBlocks <= cacheSize) OR // Whole table fits in cache
((numUniqueSuccessfulProbes * blksPerSuccProbe_)
+ getFailedProbes() <= cacheSize) // all blocks accessed fits cache
)
{
if (isUnique)
tuplesProcessed += numSuccessfulProbes;
else
tuplesProcessed += accessedRows;
// Incoming probes are in the same order as the clustering key,
// no full table scan. For every probe, a subset of the table
// (or covered index) is processed.
if (isUnique && numBlocksAccessedByUniqueProbes <= cacheSize)
{
ioRand = numBlocksAccessedByUniqueProbes;
}
else if (numBlocksPerSuccessfulProbe <= cacheSize)
{
ioRand = numRandIOs;
ioSeq = numUniqueSuccessfulProbes * (numBlocksPerSuccessfulProbe - 1);
}
else
{
ioRand = numRandIOs;
ioSeq = numSuccessfulProbes * (numBlocksPerSuccessfulProbe - 1);
}
}
else
{
// Begin Temp code
// My changes to 5450/5425 exposed a bug where RandomIo NJ preferred
// because Ocr plan was not tried in ETL workload. I am partially
// rolling back my changes 5425, but will fix the root cause asap.
// get all predicates
ValueIdSet allPreds;
allPreds = getSingleSubsetPreds();
// get all predicates' base columns
ValueIdSet allReferencedBaseCols;
allPreds.findAllReferencedBaseCols(allReferencedBaseCols);
// try to find a predicate that matches
// 1st nonconstant key prefix column
NABoolean foundKey = FALSE;
CollIndex x = 0;
ColAnalysis *colA = NULL;
const ValueIdList *currentIndexSortKey = &(iDesc->getOrderOfKeyValues());
for (x = 0; x < (*currentIndexSortKey).entries() && !foundKey; x++)
{
ValueId firstkey = (*currentIndexSortKey)[x];
// firstkey with a constant predicate does not count in
// making this NJ better than a HJ. keep going.
ItemExpr *cv;
NABoolean isaConstant = FALSE;
ValueId firstkeyCol;
colA = firstkey.baseColAnalysis(&isaConstant, firstkeyCol);
if (isaConstant)
continue; // try next prefix column
if (!colA) // no column analysis
break; // we can't go further, break out of the loop.
if (colA->getConstValue(cv,FALSE/*useRefAConstExpr*/))
continue; // try next prefix column
// any predicate on first nonconstant prefix key column?
if (allReferencedBaseCols.containsTheGivenValue(firstkeyCol))
// nonconstant prefix key matches predicate
foundKey = TRUE;
else
break; // predicate is not a key predicate, cost it high
}
if ((NOT (iDesc->isClusteringIndex()) &&
(getSearchKey()->getKeyPredicates().entries() > 0)))
foundKey = TRUE;
// end Temp code
// re-check risky NJ Heuristics from JoinToTSJRule::topMatch()
NABoolean allowNJ = TRUE;
Lng32 ratio = (ActiveSchemaDB()->getDefaults()).getAsLong
(HJ_SCAN_TO_NJ_PROBE_SPEED_RATIO);
// innerS is size of data scanned for inner table in HJ plan
CostScalar innerS = effectiveTotalRowCount_ * getRecordSizeInKb() * 1024;
// We must allow NJ if #outerRows <= 1 because
// HashJoinRule::topMatch disables HJ for that case.
// if innerS < #outerRows * ratio, prefer HJ over NJ.
if (ratio > 0 && numOuterProbes > 1 &&
(innerS < numOuterProbes * ratio) )
allowNJ = FALSE;
// if probes are partially ordered and max cardinality of probes
// < 10000 * probes, then cost this NJ cheaper than random order probes.
// Otherwise we suspect card estimation of probes and accessedRows,
// so do not want take chance with NJ plan.
// This check is controlled by CQD NCM_NJ_PROBES_MAXCARD_FACTOR(10000).
// get max card factor and probes estimated max cardinality
CostScalar maxCardFactor = (ActiveSchemaDB()->getDefaults()).getAsDouble(NCM_NJ_PROBES_MAXCARD_FACTOR);
CostScalar maxCardOfProbes = (getContext().getInputLogProp())->getMaxCardEst();
if ((getPartialOrderProbesFlag() &&
maxCardOfProbes != -1 &&
maxCardOfProbes / numOuterProbes < maxCardFactor &&
allowNJ) || foundKey)
{
if (isUnique)
tuplesProcessed += numSuccessfulProbes;
else
tuplesProcessed += accessedRows;
if (isUnique && numBlocksAccessedByUniqueProbes <= cacheSize)
{
ioRand = numBlocksAccessedByUniqueProbes;
}
else
{
ioRand = numRandIOs;
ioSeq = numSuccessfulProbes * (numBlocksPerSuccessfulProbe - 1);
}
}
else
{
// No appropriate index, full table (or covering index) scan,
// involves sequential IOs. For every probe, the whole right side
// is accessed. For covering indexes, the row size will be typically
// smaller than the row size of the base table, so the number of blocks
// will less, making it the cheaper alternative.
tuplesProcessed += (numProbes * effectiveTotalRowCount);
ioSeq = numProbes * effectiveTotalBlocks;
ioRand = numRandIOs;
}
}
}
// set the field before it is being mutiplied by the row size factor
setTuplesProcessed(tuplesProcessed*numActivePartitions);
// Store in ScanOptimizer object. FileScan will later grab this value.
ioSeq = MAXOF(ioSeq, 0);
setNumberOfBlocksToReadPerAccess(ioSeq.minCsOne());
// Factor in row sizes.
CostScalar rowSize = recordSizeInKb_ * csOneKiloBytes;
CostScalar probeRowSize = charInputs.getRowLength();
CostScalar outputRowSize = getRelExpr().getGroupAttr()->getRecordLength();
CostScalar rowSizeFactor = scmRowSizeFactor(rowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(probeRowSize + outputRowSize);
tuplesProcessed *= rowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
CostScalar seqIORowSizeFactor = scmRowSizeFactor(rowSize, SEQ_IO_ROWSIZE_FACTOR);
CostScalar randIORowSizeFactor = scmRowSizeFactor(rowSize, RAND_IO_ROWSIZE_FACTOR);
ioSeq *= seqIORowSizeFactor;
ioRand *= randIORowSizeFactor;
LogPhysPartitioningFunction *logPhysPartFunc =
(LogPhysPartitioningFunction *) // cast away const
getContext().getPlan()->getPhysicalProperty()->getPartitioningFunction()->
castToLogPhysPartitioningFunction();
if (logPhysPartFunc != NULL)
{
PartitioningFunction* logPartFunc = logPhysPartFunc->getLogPartitioningFunction();
CostScalar numParts = logPartFunc->getCountOfPartitions();
CostScalar serialNJFactor = ActiveSchemaDB()->getDefaults().getAsDouble(NCM_SERIAL_NJ_FACTOR);
if (isAnIndexJoin && numParts == 1)
{
tuplesProcessed *= serialNJFactor;
tuplesProduced *= serialNJFactor;
ioRand *= serialNJFactor;
ioSeq *= serialNJFactor;
}
}
if (isProbeCacheApplicable())
{
CostScalar pcCostAdjFactor = getProbeCacheCostAdjFactor();
tuplesProduced *= pcCostAdjFactor;
ioRand *= pcCostAdjFactor;
ioSeq *= pcCostAdjFactor;
}
Cost* scanCostMultiProbes =
scmCost(tuplesProcessed, tuplesProduced, csZero, ioRand, ioSeq, numProbes,
rowSize, csZero, outputRowSize, probeRowSize);
// temporary fix to cost index join cheaper than base table scan.
// when we allow salted indexes, then this can be removed
if (isAnIndexJoin &&
getIndexDesc()->getPrimaryTableDesc()->getNATable()->isHbaseTable())
{
CostScalar redFactor = CostScalar(1.0) /
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_IND_JOIN_COST_ADJ_FACTOR);
scanCostMultiProbes->cpScmlr().scaleByValue(redFactor);
}
// temporary fix to cost index scan cheaper than base table scan.
// when we allow salted indexes, then this can be removed
if ( !(getIndexDesc()->isClusteringIndex()) &&
getIndexDesc()->getPrimaryTableDesc()->getNATable()->isHbaseTable())
{
CostScalar redFactor = CostScalar(1.0) /
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_IND_SCAN_COST_ADJ_FACTOR);
scanCostMultiProbes->cpScmlr().scaleByValue(redFactor);
}
// fix Bugzilla #1110.
setEstRowsAccessed(getDataRows());
return scanCostMultiProbes;
} // SimpleFileScanOptimizer::scmComputeCostVectorsMultiProbes(...)
// Compute the Cost for this single subset Scan using the simple cossting model.
//
// Attempts to find an existing basic cost object which can be reused.
//
// Computes or reuses the last row cost vector.
//
// OUTPUTS:
// Return - A NON-NULL Cost* representing the Cost for this scan node
//
// Side-Affects - computes and sets the numberOfBlocksToReadPerAccess_
// data member of ScanOptimizer. This value will be captured by the
// FileScan node and passed to DP2 by the executor, DP2 uses it to
// decide whether it will do read ahead or not.
//
Cost*
SimpleFileScanOptimizer::scmComputeCostForSingleSubset()
{
// Determine if this scan is receiving multiple probes. If so,
// use the MultiProbe Scan Optimizer methods. Other wise use the
// single probe methods.
//
Cost *scanCost;
CostScalar repeatCount =
getContext().getPlan()->getPhysicalProperty()->
getDP2CostThatDependsOnSPP()->getRepeatCountForOperatorsInDP2();
NABoolean multiProbeScan = repeatCount.isGreaterThanOne() OR
(getContext().getInputLogProp()->getColStats().entries() > 0);
//Bugzilla 1110: either method called below will call setEstRowsAccessed()
//to set the estimated rows access for this object (SimpleFileScanOptimizer)
if ( multiProbeScan )
{
scanCost = scmComputeCostVectorsMultiProbes();
}
else
{
scanCost = scmComputeCostVectors();
}
CostScalar skewFactor = 1.0;
if (CURRSTMT_OPTDEFAULTS->incorporateSkewInCosting())
{
// ompute multiplicative factor = probesAtBusiestStream/ProbesPerScan_
// Multiply the last row cost by the factor
// Note that the last row cost is per Partition
CostScalar probesAtBusyStream
= getContext().getPlan()->getPhysicalProperty()->
getDP2CostThatDependsOnSPP()->
getProbesAtBusiestStream();
CostScalar probesPerScan = scanCost->getScmCplr().getNumProbes();
// probes must be > 0 if not assert.
DCMPASSERT(probesPerScan.isGreaterThanZero());
probesPerScan = MAXOF(probesPerScan, 1);
skewFactor = (probesAtBusyStream/probesPerScan).minCsOne();
if (CmpCommon::getDefault(NCM_SKEW_COST_ADJ_FOR_PROBES) == DF_ON)
scanCost->cpScmlr().scaleByValue(skewFactor);
}
return scanCost;
} // SimpleFileScanOptimizer::SCMComputeCostForSingleSubset()
Cost*
FileScanOptimizer::scmComputeCostForSingleSubset()
{
SimpleFileScanOptimizer *sfso =
new (STMTHEAP) SimpleFileScanOptimizer(getFileScan(),
getResultSetCardinality(),
getContext(),
getExternalInputs());
SearchKey *searchKey = NULL;
MdamKey *mdamKey = NULL;
Cost* c =
sfso->optimize(searchKey, mdamKey);//scmComputeCostForSingleSubset();
// transfer various counters from sfso to this (the FileScan optimizer)
setProbes(sfso->getProbes());
setSuccessfulProbes(sfso->getSuccessfulProbes());
setUniqueProbes(sfso->getUniqueProbes());
setDuplicateSuccProbes(sfso->getDuplicateSuccProbes());
setTuplesProcessed(sfso->getTuplesProcessed());
setEstRowsAccessed(sfso->getEstRowsAccessed());
setSingleSubsetSize(sfso->getSingleSubsetSize());
return c;
} // FileScanOptimizer::ScmComputeCostForSingleSubset()
// FileScanOptimizer::scmComputeMDAMForHbase()
// Compute MDAM cost for Hbase Scan
// Computes:
// -- number of tuples processed by Region Server
// -- number of tuples produced by Region Server
// -- sequential IOs by Region Server
// -- random IOs by Region Server
//
// -- number of tuples processed by Hbase client
// -- number of tuples produced by Hbase client
// -- number of tuples received by Hbase client
//
// OUTPUTS: SimpleCostVectors of a new Cost object are populated.
Cost*
FileScanOptimizer::scmComputeMDAMCostForHbase(
CostScalar &totalRows,
CostScalar & seeks,
CostScalar & seq_kb_read,
CostScalar& incomingProbes)
{
// Hbase Scan cost :
// Cost incurred at client side (Master or ESP)
// +
// Cost incurred at Server side (Hbase Region Server a.k.a HRS)
//
// Server side cost : CPU cost + IO cost
// CPU cost = Tuples processed +
// Tuples produced (same as processed for now, will be different
// when predicates pushed down to HRS
// IO cost = Seq IO cost + random IO cost
// Seq IO cost = number of blocks need to be read to retrive total rows
// qualified by all disjuncts
// random IO cost = seeks incurred by all disjuncts of all key columns
// (Random IO is a function of UEC of key cols with preds missing)
//
// Client side cost : CPU cost + Msg Cost
// CPU cost = Tuples processed for all disjuncts +
// Tuples produced after applying executor predicates for all disjuncts
// Msg cost = Tuples received from HRS for all disjuncts
// estimate HRS cost
CostScalar tcProcInHRS, tcProdInHRS, seqIOsInHRS, randomIOsInHRS = csZero;
tcProcInHRS = totalRows;
tcProdInHRS = totalRows; // will be different when predicates pushed down to HRS
seqIOsInHRS = seq_kb_read / getIndexDesc()->getBlockSizeInKb();
randomIOsInHRS = seeks;
// estimate Hbase Client Side (HCS)cost
CostScalar tcProcInHCS, tcProdInHCS, tcRcvdByHCS = csZero;
tcProcInHCS = tcProdInHRS;
tcProdInHCS = MINOF(getResultSetCardinality(), totalRows);
tcRcvdByHCS = tcProdInHRS;
// factor in row sizes;
CostScalar rowSize = getIndexDesc()->getRecordSizeInKb() * csOneKiloBytes;
CostScalar outputRowSize = getRelExpr().getGroupAttr()->getRecordLength();
CostScalar rowSizeFactor = scmRowSizeFactor(rowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
CostScalar seqIORowSizeFactor = scmRowSizeFactor(rowSize, SEQ_IO_ROWSIZE_FACTOR);
CostScalar randIORowSizeFactor = scmRowSizeFactor(rowSize, RAND_IO_ROWSIZE_FACTOR);
// for HRS
tcProcInHRS *= rowSizeFactor;
tcProdInHRS *= rowSizeFactor;
seqIOsInHRS *= seqIORowSizeFactor;
randomIOsInHRS *= randIORowSizeFactor;
// for HCS
tcProcInHCS *= rowSizeFactor;
tcProdInHCS *= outputRowSizeFactor;
tcRcvdByHCS *= rowSizeFactor;
// normalize it by #region servers for HRS
CollIndex HRSPartitions = getEstNumActivePartitionsAtRuntimeForHbaseRegions();
tcProcInHRS = (tcProcInHRS / HRSPartitions).getCeiling();
tcProdInHRS = (tcProdInHRS / HRSPartitions).getCeiling();
seqIOsInHRS = (seqIOsInHRS / HRSPartitions).getCeiling();
randomIOsInHRS = (randomIOsInHRS / HRSPartitions).getCeiling();
// normalize it by DoP for HCS
CollIndex HCSPartitions = getEstNumActivePartitionsAtRuntime();
tcProcInHCS = (tcProcInHCS / HCSPartitions).getCeiling();
tcProdInHCS = (tcProdInHCS / HCSPartitions).getCeiling();
tcRcvdByHCS = (tcRcvdByHCS / HCSPartitions).getCeiling();
CostScalar tuplesProcessed = tcProcInHRS + tcProcInHCS;
CostScalar tuplesProduced = tcProdInHRS + tcProdInHCS;
CostScalar probesPerPartition = (incomingProbes/HRSPartitions).getCeiling();
Cost* hbaseMdamCost =
scmCost(tuplesProcessed, tuplesProduced, tcRcvdByHCS, randomIOsInHRS,
seqIOsInHRS, probesPerPartition, rowSize, csZero, outputRowSize, csZero);
return hbaseMdamCost;
}
// SimpleFileScanOptimizer::scmComputeCostVectorsForHbase()
// Compute operator cost for Hbase Scan.
// Computes:
// -- number of tuples processed by Region Server
// -- number of tuples produced by Region Server
// -- sequential IOs by Region Server
//
// -- number of tuples processed by Hbase client
// -- number of tuples produced by Hbase client
// -- number of tuples received by Hbase client
//
// OUTPUTS: SimpleCostVectors of a new Cost object are populated.
Cost *
SimpleFileScanOptimizer::scmComputeCostVectorsForHbase()
{
// Hbase Scan cost :
// Cost incurred at client side (Master or ESP)
// +
// Cost incurred at Server side (Hbase Region Server a.k.a HRS)
//
// Server side cost : CPU cost + IO cost
// CPU cost = Tuples processed +
// Tuples produced (same as processed for now, will be different
// when predicates pushed down to HRS
// Seq IO cost = number of blocks need to be read get "Tuples processed"
//
// Client side cost : CPU cost + Msg Cost
// CPU cost = Tuples processed +
// Tuples produced after applying executor predicates
// Msg cost = Tuples received from HRS
// estimate HRS cost
CostScalar tcProcInHRS, tcProdInHRS, seqIOsInHRS = csZero;
tcProcInHRS = getSingleSubsetSize();
tcProdInHRS = tcProcInHRS; // will be different when predicates pushed down to HRS
seqIOsInHRS = getNumBlocksForRows(tcProcInHRS);
// estimate Hbase Client Side (HCS)cost
CostScalar tcProcInHCS, tcProdInHCS, tcRcvdByHCS = csZero;
tcProcInHCS = tcProdInHRS;
tcProdInHCS = getResultSetCardinality();
tcRcvdByHCS = tcProdInHRS;
// heuristics to favor serial plans for small queries
NABoolean costParPlanSameAsSer = FALSE;
CostScalar parPlanRcLowerLimit = 2.0 *
ActiveSchemaDB()->getDefaults().getAsDouble(NUMBER_OF_ROWS_PARALLEL_THRESHOLD);
if ( tcRcvdByHCS <= parPlanRcLowerLimit AND
(CmpCommon::getDefault(NCM_HBASE_COSTING) == DF_ON) )
costParPlanSameAsSer = TRUE;
// factor in row sizes;
CostScalar rowSize = recordSizeInKb_ * csOneKiloBytes;
CostScalar outputRowSize = getRelExpr().getGroupAttr()->getRecordLength();
CostScalar rowSizeFactor = scmRowSizeFactor(rowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
CostScalar seqIORowSizeFactor = scmRowSizeFactor(rowSize, SEQ_IO_ROWSIZE_FACTOR);
// for HRS
tcProcInHRS *= rowSizeFactor;
tcProdInHRS *= rowSizeFactor;
seqIOsInHRS *= seqIORowSizeFactor;
// for HCS
tcProcInHCS *= rowSizeFactor;
tcProdInHCS *= outputRowSizeFactor;
tcRcvdByHCS *= rowSizeFactor;
// some book keeping
setProbes(1);
setTuplesProcessed(getSingleSubsetSize());
setEstRowsAccessed(getSingleSubsetSize());
setNumberOfBlocksToReadPerAccess(seqIOsInHRS);
// normalize it by #region servers for HRS
CollIndex HRSPartitions = getEstNumActivePartitionsAtRuntimeForHbaseRegions();
tcProcInHRS = (tcProcInHRS / HRSPartitions).getCeiling();
tcProdInHRS = (tcProdInHRS / HRSPartitions).getCeiling();
seqIOsInHRS = (seqIOsInHRS / HRSPartitions).getCeiling();
// normalize it by DoP for HCS
CollIndex HCSPartitions = getEstNumActivePartitionsAtRuntime();
if (costParPlanSameAsSer)
HCSPartitions = 1;
tcProcInHCS = (tcProcInHCS / HCSPartitions).getCeiling();
tcProdInHCS = (tcProdInHCS / HCSPartitions).getCeiling();
tcRcvdByHCS = (tcRcvdByHCS / HCSPartitions).getCeiling();
// compute HRS cost + HCS cost
// Total CPU cost
CostScalar tuplesProcessed = tcProcInHRS + tcProcInHCS;
CostScalar tuplesProduced = tcProdInHRS + tcProdInHCS;
// Total IO cost = seqIOsInHRS
// Total Msg cost = tcRcvdByHCS
Cost* hbaseScanCost =
scmCost(tuplesProcessed, tuplesProduced, tcRcvdByHCS, csZero,
seqIOsInHRS, csOne, rowSize, csZero, outputRowSize, csZero);
// temporary fix to cost index scan cheaper than base table scan.
// when we allow salted indexes, then this can be removed
if ( !(getIndexDesc()->isClusteringIndex()) )
{
CostScalar redFactor = CostScalar(1.0) /
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_IND_SCAN_COST_ADJ_FACTOR);
hbaseScanCost->cpScmlr().scaleByValue(redFactor);
}
return hbaseScanCost;
}
// SimpleFileScanOptimizer::scmComputeCostVectorsMultiProbesForHbase()
// Computes the cost for multi-probe scan using NCM for Hbase Scan.
// Computes:
// -- number of tuples processed by Region Server for all probes
// -- number of tuples produced by Region Server for all probes
// -- sequential IOs by Region Server for all probes
// -- random IOs by Region Server for all probes
//
// -- number of tuples processed by Hbase client
// -- number of tuples produced by Hbase client
// -- number of tuples received by Hbase client
//
// OUTPUTS: SimpleCostVectors of a new Cost object are populated.
Cost *
SimpleFileScanOptimizer::scmComputeCostVectorsMultiProbesForHbase()
{
// Hbase Scan cost :
// Cost incurred at client side (Master or ESP)
// +
// Cost incurred at Server side (Hbase Region Server a.k.a HRS)
//
// Server side cost : CPU cost + IO cost
// CPU cost = Tuples processed +
// Tuples produced (same as processed for now, will be different
// when predicates pushed down to HRS
// IO cost = Seq IO cost + Random IO cost
// Seq IO cost = number of blocks need to be read get "Tuples processed"
// Random IO cost = number of seeks needed to get "Tuples processed"
//
// Client side cost : CPU cost + Msg Cost
// CPU cost = Tuples processed +
// Tuples produced after applying executor predicates
// Msg cost = Tuples received from HRS
// estimate HRS cost
CostScalar tcProcInHRS, tcProdInHRS, seqIOsInHRS, randIOsInHRS = csZero;
NABoolean isUnique = getSearchKey()->isUnique();
NABoolean isAnIndexJoin;
// helping data members needed to compute cost vectors
estimateEffTotalRowCount(totalRowCount_, effectiveTotalRowCount_);
CostScalar effectiveTotalBlocks =
getNumBlocksForRows(effectiveTotalRowCount_).getCeiling();
categorizeMultiProbes(&isAnIndexJoin);
CostScalar numProbes = probes_;
CostScalar numUniqueProbes = uniqueProbes_;
CostScalar numSuccessfulProbes = successfulProbes_;
CostScalar numUniqueSuccessfulProbes = (successfulProbes_ - duplicateSuccProbes_);
CostScalar estBolcksByUniqProbes = MINOF(uniqueProbes_, effectiveTotalBlocks);
const CostScalar uniqBlocks = numUniqueSuccessfulProbes * blksPerSuccProbe_;
CostScalar lowerBoundBlockCount = uniqBlocks + getFailedProbes();
tcProcInHRS = numProbes;
CostScalar accessedRows = getDataRows();
tcProdInHRS = getResultSetCardinality();
CollIndex HRSPartitions = getEstNumActivePartitionsAtRuntimeForHbaseRegions();
// 52 blocks cache per region
// Assumption : heap size of RS = 8GB
// data block cache = 20% * 8GB => 1.6GB
// cache in blocks = 1.6GB / 64KB (hbase block size) => 26214 blocks
// Assume on avg RS services 500 regions = 26214/500 ~= 52 blocks per region
CostScalar cacheSize = ActiveSchemaDB()->getDefaults().getAsDouble(NCM_CACHE_SIZE_IN_BLOCKS);
const CostScalar cacheSizeForAll(cacheSize * HRSPartitions);
CostScalar cacheHitProbability = (cacheSizeForAll/uniqBlocks).maxCsOne();
// logic for estimating randomIOs
if (isUnique)
{
// each probe brings no more than one data block
if (estBolcksByUniqProbes <= cacheSizeForAll)
randIOsInHRS = estBolcksByUniqProbes;
else
{
CostScalar cacheMissProbabilityForUniqueProbe =
((estBolcksByUniqProbes - cacheSizeForAll)
/estBolcksByUniqProbes).minCsZero();
randIOsInHRS = cacheSizeForAll + // First probes to fill the cache
(getProbes() - cacheSizeForAll) * // rest could miss
cacheMissProbabilityForUniqueProbe;
}
}
else
{
// SearchKey is not unique. In this case each successful probe could bring several
// (or even all) // blocks of inner table.
if (uniqBlocks <= cacheSizeForAll)
randIOsInHRS = estBolcksByUniqProbes;
else
// table does not fit in cache, extra seeks needed.
randIOsInHRS = getUniqueProbes() +
getDuplicateSuccProbes() * (csOne - cacheHitProbability);
}
// logic for estimating seqIOs
// we also modify tcProcInHRS
if ( (effectiveTotalBlocks <= cacheSizeForAll) OR // Whole table fits in cache
(lowerBoundBlockCount <= cacheSizeForAll) OR // all blocks accessed fits cache
// Duplicate probe finds data in cache
((getBlksPerSuccProbe() <= cacheSizeForAll) AND getInOrderProbesFlag())
)
{
// Full cache benefit
seqIOsInHRS = MINOF(effectiveTotalBlocks, lowerBoundBlockCount);
if (isUnique)
tcProcInHRS += numSuccessfulProbes;
else
tcProcInHRS += accessedRows;
}
else if ((getBlksPerSuccProbe() > cacheSize) &&
getInOrderProbesFlag())
{
seqIOsInHRS = lowerBoundBlockCount;
tcProcInHRS += accessedRows;
}
else if (isLeadingKeyColCovered())
{
const CostScalar extraDataBlocks =
(getDuplicateSuccProbes() * getBlksPerSuccProbe()) * (csOne - cacheHitProbability);
seqIOsInHRS = lowerBoundBlockCount + extraDataBlocks;
tcProcInHRS += accessedRows;
}
else
{
// worst case scenario
tcProcInHRS += (numProbes * effectiveTotalRowCount_);
seqIOsInHRS = numProbes * effectiveTotalBlocks;
}
tcProdInHRS = accessedRows; // will be different when predicates pushed down to HRS
// estimate Hbase Client Side (HCS)cost
CostScalar tcProcInHCS, tcProdInHCS, tcRcvdByHCS = csZero;
tcProcInHCS = tcProdInHRS;
tcProdInHCS = getResultSetCardinality();
tcRcvdByHCS = tcProdInHRS;
// heuristics to favor serial plans for small queries
NABoolean costParPlanSameAsSer = FALSE;
CostScalar parPlanRcLowerLimit = 2.0 *
ActiveSchemaDB()->getDefaults().getAsDouble(NUMBER_OF_ROWS_PARALLEL_THRESHOLD);
if ( tcRcvdByHCS <= parPlanRcLowerLimit )
costParPlanSameAsSer = TRUE;
// factor in row sizes;
CostScalar rowSize = recordSizeInKb_ * csOneKiloBytes;
CostScalar outputRowSize = getRelExpr().getGroupAttr()->getRecordLength();
CostScalar rowSizeFactor = scmRowSizeFactor(rowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
CostScalar seqIORowSizeFactor = scmRowSizeFactor(rowSize, SEQ_IO_ROWSIZE_FACTOR);
CostScalar randIORowSizeFactor = scmRowSizeFactor(rowSize, RAND_IO_ROWSIZE_FACTOR);
// some book keeping
// set the field before it is being mutiplied by the row size factor
setTuplesProcessed(tcProcInHRS+tcProcInHCS);
setEstRowsAccessed(getDataRows());
setNumberOfBlocksToReadPerAccess(seqIOsInHRS);
// for HRS
tcProcInHRS *= rowSizeFactor;
tcProdInHRS *= rowSizeFactor;
seqIOsInHRS *= seqIORowSizeFactor;
randIOsInHRS *= randIORowSizeFactor;
// for HCS
tcProcInHCS *= rowSizeFactor;
tcProdInHCS *= outputRowSizeFactor;
tcRcvdByHCS *= rowSizeFactor;
// normalize it by #region servers for HRS
tcProcInHRS = (tcProcInHRS / HRSPartitions).getCeiling();
tcProdInHRS = (tcProdInHRS / HRSPartitions).getCeiling();
seqIOsInHRS = (seqIOsInHRS / HRSPartitions).getCeiling();
randIOsInHRS = (randIOsInHRS / HRSPartitions).getCeiling();
// normalize it by DoP for HCS
CollIndex HCSPartitions = getEstNumActivePartitionsAtRuntime();
if (costParPlanSameAsSer)
HCSPartitions = 1;
tcProcInHCS = (tcProcInHCS / HCSPartitions).getCeiling();
tcProdInHCS = (tcProdInHCS / HCSPartitions).getCeiling();
// compute HRS cost + HCS cost
// Total CPU cost
CostScalar tuplesProcessed = tcProcInHRS + tcProcInHCS;
CostScalar tuplesProduced = tcProdInHRS + tcProdInHCS;
// Total IO cost = seqIOsInHRS + randIOsInHRS
// Total Msg cost = tcRcvdByHCS
Cost* hbaseMultiProbeScanCost =
scmCost(tuplesProcessed, tuplesProduced, tcRcvdByHCS, randIOsInHRS,
seqIOsInHRS, csOne, rowSize, csZero, outputRowSize, csZero);
if (isProbeCacheApplicable())
{
CostScalar pcCostAdjFactor = getProbeCacheCostAdjFactor();
hbaseMultiProbeScanCost->cpScmlr().scaleByValue(pcCostAdjFactor);
}
// cost un-split index_scan cheaper than base table scan cost if index_scan
// selectivity < NCM_IND_SCAN_SELECTIVITY
if ( !(getIndexDesc()->isClusteringIndex()) && HRSPartitions == 1)
{
CostScalar sel = accessedRows / totalRowCount_;
CostScalar indScanSelThreshold =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_IND_SCAN_SELECTIVITY);
if ( indScanSelThreshold != -1 AND sel < indScanSelThreshold )
hbaseMultiProbeScanCost->cpScmlr().scaleByValue(sel);
}
// to do : get number of index partitions:
/*
if (isAnIndexJoin && num_index_partitions == 1)
{
CostScalar sel = probes_ / totalRowCount_;
CostScalar indJoinSelThreshold =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_IND_JOIN_SELECTIVITY);
if ( sel < indJoinSelThreshold )
hbaseMultiProbeScanCost->cpScmlr().scaleByValue(sel);
}
*/
return hbaseMultiProbeScanCost;
}
/**********************************************************************/
/* */
/* CostMethodExchange */
/* */
/**********************************************************************/
//<pb>
//==============================================================================
// Compute operator cost for a specified Exchange operator.
//
// Input:
// op -- pointer to specified Exchange operator.
//
// myContext -- pointer to optimization context for this Exchange
// operator.
//
// Output:
// countOfStreams -- degree of parallelism for this Exchange (i.e. number of
// consumers for this Exchange.)
//
// Return:
// Pointer to computed cost object for this exchange operator.
//
//==============================================================================
Cost*
CostMethodExchange::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
//----------------------------------------------------------------------
// Defensive programming.
//----------------------------------------------------------------------
CMPASSERT( op != NULL );
CMPASSERT( myContext != NULL );
//-----------------------------------
// Downcast to an Exchange operator.
//-----------------------------------
Exchange* exch = (Exchange*)op;
isOpBelowRoot_ = (*CURRSTMT_OPTGLOBALS->memo)[myContext->getGroupId()]->isBelowRoot();
const CostScalar & numOfProbes =
( myContext->getInputLogProp()->getResultCardinality() ).minCsOne();
const ReqdPhysicalProperty* rpp = myContext->getReqdPhysicalProperty();
sppForMe_ = (PhysicalProperty *) myContext->getPlan()->getPhysicalProperty();
//------------------------------------------------------------------------
// If we have not synthesized physical properties, we are going down
// the tree, return an empty cost for now. Is this OK??
//------------------------------------------------------------------------
if ( NOT sppForMe_ )
return new STMTHEAP Cost();
//------------------------------------------------------------
// Get the physical properties for the child of the Exchange.
//------------------------------------------------------------
const PhysicalProperty* sppForChild =
myContext->getPhysicalPropertyOfSolutionForChild(0);
sppForChild_ = (PhysicalProperty*) sppForChild;
numOfProbes_ = (CostScalar )numOfProbes;
NABoolean executeInDP2 = sppForChild->executeInDP2();
const PartitioningFunction* const childPartFunc =
sppForChild->getPartitioningFunction();
PartitioningFunction* const myPartFunc =
myContext->getPlan()->getPhysicalProperty()->getPartitioningFunction();
const PartitioningFunction* const bottomPartFunc = exch->getBottomPartitioningFunction();
//--------------------------------------------------------------------------
// Compute number of consumer processes associated with this Exchange.
//--------------------------------------------------------------------------
const CostScalar& numOfConsumers = ((NodeMap *)(myPartFunc->getNodeMap()))->
getNumActivePartitions();
const CostScalar& numOfPartitions = ((NodeMap *)(childPartFunc->getNodeMap()))->
getEstNumActivePartitionsAtRuntime();
const CostScalar& numOfProducers = MINOF( numOfPartitions ,sppForChild->getCurrentCountOfCPUs() );
//---------------------------------------------
// Determine number of rows produced by child.
//---------------------------------------------
EstLogPropSharedPtr inputLP = myContext->getInputLogProp();
CMPASSERT( inputLP );
EstLogPropSharedPtr childOutputLP = exch->child(0).outputLogProp(inputLP);
const CostScalar& totalRowCount = (childOutputLP->getResultCardinality()).getCeiling();
//---------------------------------------------------------------
//---------------------------------------------------------------
// Exchange operator's number of streams is parent's degree of
// parallelism (i.e. number of concurrently executing consumers).
//---------------------------------------------------------------
countOfStreams = Lng32(numOfConsumers.getValue());
//---------------------------------------------
// set output resultset cardinality as active streams, this value
// will be used in ColStatDescList::getCardOfBusiestStream() if
// partitioning key is a random number.
//---------------------------------------------
long randomFix = ActiveSchemaDB()->getDefaults().getAsLong(COMP_INT_26);
if (NOT executeInDP2 && (randomFix != 0))
{
CostScalar estDop = totalRowCount;
CostScalar maxCard = childOutputLP->getMaxCardEst();
// use the max cardinality, somewhat corrected for risk by taking
// geometric mean of card and maxCard.
if (maxCard != -1)
{
estDop = sqrt((estDop * maxCard).getValue());
estDop.getFloor();
}
// if estDop > countOfStreams, then take countOfStreams
estDop = MINOF((CostScalar)countOfStreams, estDop);
myPartFunc->setActiveStreams(estDop);
}
//---------------------------------------------------------------------
// Compute the number of "up" tuples sent.
// "up" tuples sent are the number of tuples sent from the child to the parent.
//---------------------------------------------------------------------
//------------------------------------------------------------
// Get default values needed for subsequent Exchange costing.
//------------------------------------------------------------
CostScalar messageSpacePerRecordInKb;
CostScalar messageHeaderInKb;
CostScalar messageBufferSizeInKb;
getDefaultValues(executeInDP2,
exch,
messageSpacePerRecordInKb,
messageHeaderInKb,
messageBufferSizeInKb);
CostScalar espFixUpWeight = ActiveSchemaDB()->getDefaults().getAsDouble(NCM_ESP_FIXUP_WEIGHT);
NABoolean espStartFix = (CmpCommon::getDefault(NCM_ESP_STARTUP_FIX) == DF_ON);
CostScalar espFixupCost = csZero;
if (espStartFix)
espFixupCost = espFixUpWeight * numOfConsumers;
else
espFixupCost = espFixUpWeight * (numOfProducers + numOfConsumers);
CostScalar tuplesProcessed = csZero;
CostScalar tuplesProduced = csZero;
CostScalar upTuplesSent = csZero;
CostScalar origUpTuplesSent = csZero;
NADefaults &defs1 = ActiveSchemaDB()->getDefaults();
if (NOT executeInDP2)
{
exch->computeBufferLength(myContext,
numOfConsumers,
numOfProducers,
upMessageBufferLength_,
downMessageBufferLength_);
upTuplesSent = scmComputeUpTuplesSent(myContext,
exch,
myPartFunc,
childPartFunc,
numOfConsumers,
numOfProducers);
origUpTuplesSent = upTuplesSent;
}
else
{
upMessageBufferLength_= messageBufferSizeInKb;
downMessageBufferLength_= csOne;
if (childPartFunc)
{
const LogPhysPartitioningFunction *lpf = childPartFunc->castToLogPhysPartitioningFunction();
if (lpf != NULL AND lpf->getUsePapa())
{
//PAPA (SplitTop)
upTuplesSent = totalRowCount/numOfConsumers;
// Is merging of sorted streams possibly needed?
// Add in tuplesProcessed to account for the merging of sorted streams.
isMergeNeeded_ = (sppForMe_->getSortOrderType() != DP2_SOT) &&
(NOT sppForMe_->getSortKey().isEmpty());
// get weight for merging of sorted streams by each ESP
CostScalar mergeFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_EXCH_MERGE_FACTOR);
if ( isMergeNeeded_ AND numOfProducers > numOfConsumers )
{
tuplesProcessed = upTuplesSent * mergeFactor;
}
// Fix for the serial plan issue with split tops.
// This was an issue seen with TPCH Q1, the serial plan looks
// like a very good plan, but for some reason performs poorly. Bob W.
// thinks it may be the "wave pattern" issue. Adjust costs by
// adding "fixup" costs to compensate.
if ( (CmpCommon::getDefault(COMP_BOOL_202) == DF_OFF) AND
(numOfConsumers == csOne) AND
(numOfProducers > numOfConsumers) )
// (rpp->getPlanExecutionLocation() == EXECUTE_IN_MASTER) ) //isOpBelowRoot_)
upTuplesSent += espFixupCost;
}
}
}
if (NOT executeInDP2)
{
// Fixup Costs for esp_exchanges.
upTuplesSent += espFixupCost;
}
CostScalar inputRowSize = exch->child(0).getGroupAttr()->getRecordLength();
CostScalar rowSizeFactor = scmRowSizeFactor(inputRowSize);
tuplesProcessed *= rowSizeFactor;
upTuplesSent *= rowSizeFactor;
NABoolean ndcsFixOff = (CmpCommon::getDefault(NCM_EXCH_NDCS_FIX) == DF_OFF);
if (ndcsFixOff)
origUpTuplesSent = upTuplesSent;
CostScalar adj1 = defs1.getAsLong(COMP_INT_62);
CostScalar parAdj = defs1.getAsDouble(NCM_PAR_ADJ_FACTOR);
if (adj1 == csZero)
adj1=1000000;
if ( (numOfConsumers == csOne) AND
(numOfProducers > numOfConsumers) AND
isOpBelowRoot_ AND
origUpTuplesSent > adj1 &&
parAdj > 0)
{
upTuplesSent = parAdj ;
}
// adjust the Last Row / First Row Cost for an exchange
// operator on top of a Partial GroupBy Leaf node
const RelExpr * myImmediateChild = myContext->
getPhysicalExprOfSolutionForChild(0);
const RelExpr * myGrandChild = myContext->
getPhysicalExprOfSolutionForGrandChild(0,0);
ValueIdSet immediateChildPartKey =
childPartFunc->getPartitioningKey();
const PhysicalProperty* sppForGrandChild =
myContext->
getPhysicalPropertyOfSolutionForGrandChild(0,0);
PartitioningFunction * grandChildPartFunc =
(sppForGrandChild?
sppForGrandChild->getPartitioningFunction():
NULL);
ValueIdSet grandChildPartKey;
if(grandChildPartFunc)
grandChildPartKey =
grandChildPartFunc->
getPartitioningKey();
NABoolean myChildIsExchange = FALSE;
NABoolean myChildIsSortOnTopOfHashPartGbyLeaf = FALSE;
if (myImmediateChild)
{
if ((myImmediateChild->getOperatorType() == REL_SORT) &&
myGrandChild &&
(myGrandChild->getOperatorType() == REL_HASHED_GROUPBY) &&
(((GroupByAgg*)myGrandChild)->isAPartialGroupByLeaf()) &&
(CmpCommon::getDefault(COMP_BOOL_103) == DF_OFF))
myChildIsSortOnTopOfHashPartGbyLeaf = TRUE;
if ((myImmediateChild->getOperatorType() == REL_EXCHANGE) &&
(CmpCommon::getDefault(COMP_BOOL_186) == DF_ON))
myChildIsExchange = TRUE;
}
// childToConsider will be either the immediate child of this
// exchange node, or the grand child. It will be the grand child
// in case there is another exchange below this exchange i.e.
// this exchange is on top of a PA. The grand child is only used
// in case we want to influence the cost for partial grouping in
// dp2. By default we don't adjust the cost for partial grouping
// in dp2, but if COMP_BOOL_186 is ON then we allow cost adjustments
// for exchange on top partial grouping in DP2.
const RelExpr * childToConsider =
((myChildIsExchange || myChildIsSortOnTopOfHashPartGbyLeaf)?
myGrandChild:myImmediateChild);
ValueIdSet bottomPartKey =
((myChildIsExchange)?grandChildPartKey:immediateChildPartKey);
if (childToConsider &&
(!executeInDP2) &&
((childToConsider->getOperatorType() == REL_HASHED_GROUPBY)||
(childToConsider->getOperatorType() == REL_ORDERED_GROUPBY))&&
(((GroupByAgg*)childToConsider)->isAPartialGroupByLeaf()))
{
ValueIdSet childGroupingColumns =
((GroupByAgg*)childToConsider)->groupExpr();
NABoolean childMatchesPartitioning = FALSE;
if (childGroupingColumns.contains(bottomPartKey))
childMatchesPartitioning = TRUE;
if (!childMatchesPartitioning &&
(CmpCommon::getDefault(NCM_PAR_GRPBY_ADJ) == DF_ON))
{
CostScalar grpByAdjFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(ROBUST_PAR_GRPBY_EXCHANGE_FCTR);
tuplesProcessed *= grpByAdjFactor;
upTuplesSent *= grpByAdjFactor;
}
}
Cost* exchangeCost =
scmCost(tuplesProcessed, tuplesProduced, upTuplesSent, csZero, csZero, numOfProbes,
inputRowSize, csZero, inputRowSize, csZero);
return exchangeCost;
} // CostMethodExchange::scmComputeOperatorCostInternal()
//<pb>
//==============================================================================
// Compute number of tuples sent from child of a specified exchange operator
// up to its parent.
//
// Input:
// parentContext -- pointer to optimization context for specified
// Exchange operator.
//
// exch -- pointer to specified Exchange operator.
//
//
// parentPartFunc -- pointer to parent's partitioning function.
//
// childPartFunc -- pointer to child's partitioning function.
//
// numOfConsumers -- number parent processes actually receiving
// messages.
// Return:
// Number of tuples sent from child up to parent.
//
//==============================================================================
CostScalar
CostMethodExchange::scmComputeUpTuplesSent(
const Context* parentContext,
Exchange* exch,
const PartitioningFunction* parentPartFunc,
const PartitioningFunction* childPartFunc,
const CostScalar & numOfConsumers,
const CostScalar & numOfProducers) const
{
//----------------------------------------------------------------------
// Defensive programming.
//----------------------------------------------------------------------
CMPASSERT( parentContext != NULL );
CMPASSERT( exch != NULL );
CMPASSERT( parentPartFunc != NULL );
CMPASSERT( childPartFunc != NULL );
//---------------------------------------------
// Determine number of rows produced by child.
//---------------------------------------------
EstLogPropSharedPtr inputLP = parentContext->getInputLogProp();
CMPASSERT( inputLP );
EstLogPropSharedPtr childOutputLP = exch->child(0).outputLogProp(inputLP);
const CostScalar& totalRowCount = (childOutputLP->getResultCardinality()).getCeiling();
const CostScalar& rowCountPerConsumer = (totalRowCount/numOfConsumers).getCeiling();
const CostScalar& rowCountPerProducer = (totalRowCount/numOfProducers).getCeiling();
// Determine number of probes and whether there are any outer references
// (i.e. probe values)
const CostScalar& noOfProbes = ( inputLP->getResultCardinality() ).minCsOne();
ValueIdSet externalInputs( exch->getGroupAttr()->getCharacteristicInputs() );
ValueIdSet outerRefs;
externalInputs.getOuterReferences( outerRefs );
CostScalar upRowsPerConsumer, upRowsPerProducer;
// Calculate number of rows each consumer will receive.
//------------------------------------------------------------------------
if (CURRSTMT_OPTDEFAULTS->incorporateSkewInCosting())
{
upRowsPerConsumer = childOutputLP->
getCardOfBusiestStream(parentPartFunc,
(Lng32)numOfConsumers.getValue(),
exch->getGroupAttr(),
(Lng32)numOfConsumers.getValue());
// assume number of consumers = number of CPUs;
// it is only used for round robin pf. where skew is not an issue
upRowsPerProducer = childOutputLP->
getCardOfBusiestStream(childPartFunc,
(Lng32)numOfProducers.getValue(),
exch->getGroupAttr(),
(Lng32)numOfProducers.getValue());
}
else
{
upRowsPerConsumer = rowCountPerConsumer;
upRowsPerProducer = rowCountPerProducer;
}
CostScalar upRowsPerBusiestStream = MAXOF(upRowsPerConsumer, upRowsPerProducer);
//------------------------------------------------------------------------
// For broadcast replication, each child will send all rows to all consumers.
// For no broadcast replication underneath a materialize that is not
// passing the probes through, each consumer will read the rows of all
// the children. This is similar to broadcast replication, so what we
// want to do here is the same - multiply the number of rows by the
// by the number of consumers.
//------------------------------------------------------------------------
if (parentPartFunc->isAReplicateViaBroadcastPartitioningFunction()
OR (parentPartFunc->isAReplicateNoBroadcastPartitioningFunction()
AND noOfProbes == 1 // 1 probe, but
AND outerRefs.isEmpty() // no probe values - must be under a materialize
)
)
{
//---------------------------------------------------------------------
// All producers send all rows to all consumers.
//---------------------------------------------------------------------
upRowsPerBusiestStream *= numOfConsumers;
}
return upRowsPerBusiestStream;
} // CostMethodExchange::scmComputeUpTuplesSent()
//<pb>
// -----------------------------------------------------------------------
// CostMethodSort scmComputeOperatorCostInternal().
// -----------------------------------------------------------------------
Cost*
CostMethodSort::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
sort_ = (Sort*) op;
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
CostScalar rowCount(csZero);
if ((CURRSTMT_OPTDEFAULTS->incorporateSkewInCosting()) &&
(partFunc_ != NULL) )
{
rowCount =
myLogProp_->getCardOfBusiestStream(partFunc_,
countOfStreams_,
ga_,
countOfAvailableCPUs_);
}
else
{
// Row count a single probe on one instance of this sort is processing.
rowCount = myRowCount_ / countOfStreams_;
}
// need to do nlog(n) operations to sort n rows.
// log function will give us the natural logarithm.
// So, to get log to a different base, we have to compute log(n)/log(base)
CostScalar tuplesProcessed = rowCount * log(rowCount.value());
CostScalar logBase = ActiveSchemaDB()->getDefaults().getAsDouble(NCM_NUM_SORT_RUNS);
tuplesProcessed /= log(logBase.getValue());
CostScalar tuplesProduced = rowCount;
CostScalar inputRowSize = sort_->child(0).getGroupAttr()->getRecordLength();
CostScalar rowSizeFactor = scmRowSizeFactor(inputRowSize);
// Compute sort overflow costs.
// tuplesProcessed += scmComputeOverflowCost(rowCount, inputRowSize);
CostScalar overflowCost = scmComputeOverflowCost(rowCount, inputRowSize);
// Factor in rowsize.
tuplesProcessed *= rowSizeFactor;
tuplesProduced *= rowSizeFactor;
// Find out the number of cpus and number of fragments per cpu.
// long cpuCount, fragmentsPerCPU;
// determineCpuCountAndFragmentsPerCpu( cpuCount, fragmentsPerCPU );
// We are using tuplesSent as a placeholder for overflow costs as it would be
// easier for debugging by differentiating between regular tuples processed
// and overflow costs, rather than lumping them all togther in
// tuplesProcessed. tuplesSent is zero for all operators except exchange,
// so it makes sense to use this instead of adding a new field.
// It does not make a difference to the overall cost computation.
Cost* sortCost =
scmCost(tuplesProcessed, tuplesProduced, overflowCost, csZero, csZero, noOfProbesPerStream_,
inputRowSize, csZero, inputRowSize, csZero);
#ifndef NDEBUG
if ( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON )
{
pfp = stdout;
fprintf(pfp,"SORT::scmComputeOperatorCostInternal()\n");
sortCost->print(pfp);
fprintf(pfp, "Elapsed time: ");
fprintf(pfp,"%f", sortCost->
convertToElapsedTime(
myContext->getReqdPhysicalProperty()).
value());
fprintf(pfp,"\n");
}
#endif
return sortCost;
} // CostMethodSort::scmComputeOperatorCostInternal()
//==============================================================================
// Compute the overflow cost of the sort.
// tuple counts alone may not be enough to capture the true costs.
//
// Inputs:
// numInputTuples -- input cardinality
// inputRowSize -- size of the input row.
// Output:
// none.
//
// Return:
// Overflow cost
//
//==============================================================================
CostScalar
CostMethodSort::scmComputeOverflowCost( CostScalar numInputTuples, CostScalar inputRowSize )
{
NABoolean sortOverFlowCosting =
(CmpCommon::getDefault(NCM_SORT_OVERFLOW_COSTING) == DF_ON);
if (sortOverFlowCosting == FALSE)
return 0;
CostScalar inputRowSizeFactor = scmRowSizeFactor(inputRowSize);
CostScalar sortBufferSizeInBytes = ActiveSchemaDB()->getDefaults().getAsDouble(GEN_SORT_MAX_BUFFER_SIZE);
CostScalar numSortBuffers = ActiveSchemaDB()->getDefaults().getAsDouble(GEN_SORT_MAX_NUM_BUFFERS);
CostScalar memoryAvailable = sortBufferSizeInBytes * numSortBuffers;
CostScalar memoryUsed = numInputTuples * inputRowSize;
// Pointers to the sort keys (32 bytes) are stored in an array. The array
// size is limited by the 128MB segment allocation limit.
// Therefore, the number of sort key pointers is limited to 128/32=4MB
// If this changes, maxSortKeys will need to be updated.
CostScalar maxSortKeys = CostScalar(4.0) * csOneKiloBytes * csOneKiloBytes;
if (memoryUsed <= memoryAvailable && numInputTuples <= maxSortKeys)
return 0;
// If overflow occurs during sort, the whole table is overflowed to disk.
// The factor of 2 comes from having to write overflow tuples to disk and
// read back the overflow tuples from disk.
CostScalar overflow = CostScalar(2.0) * numInputTuples * inputRowSizeFactor;
return overflow;
} // CostMethodSort::scmComputeOverflowCost
//<pb>
/**********************************************************************/
/* */
/* CostMethodSortGroupBy */
/* */
/**********************************************************************/
//<pb>
//==============================================================================
// Compute operator cost for a specified SortGroupBy operator.
//
// Input:
// op -- pointer to specified SortGroupBy operator.
//
// myContext -- pointer to optimization context for this SortGroupBy
// operator.
//
// Output:
// countOfStreams -- degree of parallelism for this SortGroupBy operator.
//
// Return:
// Pointer to computed cost object for this SortGroupBy operator.
//
//==============================================================================
Cost*
CostMethodSortGroupBy::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
// ---------------------------------------------------------------------
// Added on 7/16/97: If we're on our way down the tree and this group
// by is being considered for execution in DP2, generate a zero cost
// object first and come back to cost it later when we're on our way up.
// Set the count of streams to an invalid value (-1) to force us to
// recost on the way back up.
// ---------------------------------------------------------------------
if(rpp_->executeInDP2() AND
(NOT context_->getPlan()->getPhysicalProperty()))
{
countOfStreams = -1;
return generateZeroCostObject();
}
// ---------------------------------------------------------------------
// Make sure rowcount is at least group count to prevent absurdity in
// results.
// ---------------------------------------------------------------------
rowCountPerStream_ = MAXOF(rowCountPerStream_,groupCountPerStream_);
// The input to the SortGroupBy is always sorted, either the input is
// already naturally sorted or there is an explicit Sort operator below it.
CostScalar tuplesProcessed = rowCountPerStream_;
CostScalar tuplesProduced = myRowCountPerStream_ ;
// ---------------------------------------------------------------------
// tupleProduced can never be more than tupleProcessed.
// This thing can happen sometimes especially for partial group bys since
// cardinality estimates do not distinguish between partial
// and full group bys.
// ---------------------------------------------------------------------
tuplesProduced = MINOF(tuplesProcessed, tuplesProduced);
// Make the SortGroupBy cheaper than the HashGroupBy if the input is
// naturally sorted. If there is an explicit Sort below, then the overall cost
// of the SortGroupBy could be more than the HashGroupBy.
// Assume that SortGroupBy is 30% faster than HashGroupBy if the input is already sorted.
double sortToHashGroupFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_SGB_TO_HGB_FACTOR);
tuplesProcessed *= sortToHashGroupFactor;
CostScalar inputRowSize = gb_->child(0).getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar outputRowSize = myVis().getRowLength();
CostScalar inputRowSizeFactor = scmRowSizeFactor(inputRowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
tuplesProcessed *= inputRowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
// If this is a partial Group By leaf in an ESP adjust it's cost
GroupByAgg * groupByNode = (GroupByAgg *) op;
PhysicalProperty * sppForMe = (PhysicalProperty *) myContext->
getPlan()->getPhysicalProperty();
if(groupByNode->isAPartialGroupByLeaf() &&
((sppForMe && sppForMe->executeInESPOnly()) ||
(CmpCommon::getDefault(COMP_BOOL_186) == DF_ON)))
{
// don't adjust if Group Columns contain partition columns.
const PartitioningFunction* const myPartFunc =
sppForMe->getPartitioningFunction();
ValueIdSet myPartKey = myPartFunc->getPartitioningKey();
ValueIdSet myGroupingColumns = groupByNode->groupExpr();
NABoolean myGroupingMatchesPartitioning = FALSE;
if (myPartKey.entries() &&
myGroupingColumns.contains(myPartKey))
myGroupingMatchesPartitioning = TRUE;
if (!myGroupingMatchesPartitioning &&
(CmpCommon::getDefault(NCM_PAR_GRPBY_ADJ) == DF_ON))
{
CostScalar grpByAdjFactor = ActiveSchemaDB()->getDefaults().getAsDouble(ROBUST_PAR_GRPBY_LEAF_FCTR);
tuplesProcessed *= grpByAdjFactor;
tuplesProduced *= grpByAdjFactor;
}
}
// ---------------------------------------------------------------------
// Synthesize and return the cost object.
// ---------------------------------------------------------------------
Cost* sortGroupByCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
inputRowSize, csZero, outputRowSize, csZero);
// ---------------------------------------------------------------------
// For debugging.
// ---------------------------------------------------------------------
#ifndef NDEBUG
NABoolean printCost =
( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON );
if ( printCost )
{
pfp = stdout;
fprintf(pfp,"SORTGROUPBY::scmComputeOperatorCostInternal()\n");
fprintf(pfp,"child0RowCount=%g,groupCount=%g,myRowCount=%g\n",
child0RowCount_.toDouble(),groupCount_.toDouble(),myRowCount_.toDouble());
sortGroupByCost->getScmCplr().print(pfp);
fprintf(pfp, "Elapsed time: ");
fprintf(pfp,"%f", sortGroupByCost->
convertToElapsedTime(myContext->getReqdPhysicalProperty()).
value());
fprintf(pfp,"\n");
}
#endif
return sortGroupByCost;
} // CostMethodSortGroupBy::scmComputeOperatorCostInternal().
/**********************************************************************/
/* */
/* CostMethodShortCutGroupBY */
/* */
/**********************************************************************/
// -----------------------------------------------------------------------
// CostMethodShortCutGroupBy::scmComputeOperatorCostInternal().
// -----------------------------------------------------------------------
Cost*
CostMethodShortCutGroupBy::scmComputeOperatorCostInternal
(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// ---------------------------------------------------------------------
// CostScalars to be computed.
// ---------------------------------------------------------------------
CostScalar cpu(csZero);
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
// ---------------------------------------------------------------------
// Added on 7/16/97: If we're on our way down the tree and this group
// by is being considered for execution in DP2, generate a zero cost
// object first and come back to cost it later when we're on our way up.
// Set the count of streams to an invalid value (0) to force us to
// recost on the way back up.
// ---------------------------------------------------------------------
if(rpp_->executeInDP2() AND
(NOT context_->getPlan()->getPhysicalProperty()))
{
countOfStreams = 0;
return generateZeroCostObject();
}
// ---------------------------------------------------------------------
// ShortCutGroupBy execution can be short-circuited after we found one
// row to have satisfied the any-true aggregate expression.
// This short circuit can even lead to the cancellation of the execution
// of the operator's child.
// Assume that on average you process 50% of child rows before a
// short_circuit. If no short_circuit it is same as regular sortGroupBy,
// no harm either.
// ---------------------------------------------------------------------
// aggrVis should contain only one expr which is rooted by ANYTRUE op.
// ---------------------------------------------------------------------
const ValueIdSet& aggrVis = gb_->aggregateExpr();
CMPASSERT(NOT aggrVis.isEmpty());
ValueId ExprVid = aggrVis.init();
// coverity[check_return]
aggrVis.next(ExprVid);
const ItemExpr * itemExpr = ExprVid.getItemExpr();
OperatorTypeEnum optype = itemExpr->getOperatorType();
CMPASSERT(optype==ITM_MIN OR
optype==ITM_MAX OR
optype==ITM_ANY_TRUE OR
optype==ITM_ANY_TRUE_MAX);
CostScalar tuplesProcessed = csZero;
CostScalar tuplesProduced = csZero;
if((optype==ITM_ANY_TRUE) OR (optype==ITM_ANY_TRUE_MAX))
{
tuplesProcessed = (rowCountPerStream_ * 0.5);
tuplesProduced = myRowCountPerStream_ ;
}
else // MIN/MAX optimization
{
//it only passes along a single row
CMPASSERT(op->getFirstNRows()==1);
tuplesProcessed = tuplesProduced = 1;
}
CostScalar inputRowSize = gb_->child(0).getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar outputRowSize = myVis().getRowLength();
CostScalar inputRowSizeFactor = scmRowSizeFactor(inputRowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
tuplesProcessed *= inputRowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
// ------------------------------------------------
// Synthesize and return the cost object.
// ------------------------------------------------
Cost* shortCutGBCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
inputRowSize, csZero, outputRowSize, csZero);
// ---------------------------------------------------------------------
// For debugging.
// ---------------------------------------------------------------------
#ifndef NDEBUG
NABoolean printCost =
( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON );
if ( printCost )
{
pfp = stdout;
fprintf(pfp,"ShortCutGroupBy::scmComputeOperatorCostInternal()\n");
fprintf(pfp,"childRowCount=%g",child0RowCount_.toDouble());
shortCutGBCost->getScmCplr().print(pfp);
fprintf(pfp, "Elapsed time: ");
fprintf(pfp,"%f", shortCutGBCost->convertToElapsedTime(myContext->getReqdPhysicalProperty()).value());
fprintf(pfp,"\n");
}
#endif
return shortCutGBCost;
} // ShortCutGroupBy::scmComputeOperatorCostInternal().
/**********************************************************************/
/* */
/* CostMethodHashGroupBy */
/* */
/**********************************************************************/
//<pb>
//==============================================================================
// Compute operator cost for a specified HashGroupBy operator.
//
// Input:
// op -- pointer to specified HashGroupBy operator.
//
// myContext -- pointer to optimization context for this HashGroupBy
// operator.
//
// Output:
// countOfStreams -- degree of parallelism for this HashGroupBy operator.
//
// Return:
// Pointer to computed cost object for this HashGroupBy operator.
//
//==============================================================================
Cost*
CostMethodHashGroupBy::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
//-------------------
// Preparatory work.
//-------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
//-------------------------------------------
// Save off estimated degree of parallelism.
//-------------------------------------------
countOfStreams = countOfStreams_;
//----------------------------------------------------------------------
// Added on 7/16/97: If we're on our way down the tree and this group
// by is being considered for execution in DP2, generate a zero cost
// object first and come back to cost it later when we're on our way up.
// Set the count of streams to an invalid value (-1) to force us to
// recost on the way back up.
//----------------------------------------------------------------------
if(rpp_->executeInDP2() AND
(NOT context_->getPlan()->getPhysicalProperty()))
{
countOfStreams = -1;
return generateZeroCostObject();
}
// ---------------------------------------------------------------------
// Make sure rowcount is at least group count to prevent absurdity in
// results.
// ---------------------------------------------------------------------
rowCountPerStream_ = MAXOF(rowCountPerStream_,groupCountPerStream_);
CostScalar tuplesProcessed = rowCountPerStream_;
CostScalar tuplesProduced = myRowCountPerStream_;
// ---------------------------------------------------------------------
// tupleProduced can never be more than tupleProcessed.
// This thing can happen sometimes especially for partial group bys since
// cardinality estimates do not distinguish between partial
// and full group bys.
// ---------------------------------------------------------------------
tuplesProduced = MINOF(tuplesProcessed, tuplesProduced);
// Special things for Dp2 (push down) hash groupby.
// I think this is not required that is why it is commented.
if(rpp_->executeInDP2())
{
CostScalar groupCountInMemory =
ActiveSchemaDB()->getDefaults().getAsLong(MAX_DP2_HASHBY_GROUPS);
groupCountInMemory = MINOF(groupCountInMemory, groupCountPerStream_);
//----------------------------------
// Average number of rows per group.
//----------------------------------
CostScalar rowsPerGroup = (rowCountPerStream_ / groupCountPerStream_);
CostScalar rowsNotTouched =
rowCountPerStream_ - groupCountInMemory * rowsPerGroup;
// tuplesProduced = groupCountInMemory + rowsNotTouched;
if (isUnderNestedJoin_)
{
tuplesProduced *= countOfStreams;
tuplesProcessed *= countOfStreams;
// tuplesProduced = groupCountInMemory + rowsNotTouched;
// tuplesProcessed += groupCountInMemory * rowsPerGroup;
}
}
CostScalar inputRowSize = gb_->child(0).getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar outputRowSize = myVis().getRowLength();
CostScalar inputRowSizeFactor = scmRowSizeFactor(inputRowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
// tuplesProcessed += scmComputeOverflowCost(tuplesProcessed, inputRowSize,
// tuplesProduced, outputRowSize);
GroupByAgg * groupByNode = (GroupByAgg *) op;
CostScalar overflowCost;
if (rpp_->executeInDP2() || groupByNode->isAPartialGroupByLeaf())
overflowCost = csZero;
else
overflowCost = scmComputeOverflowCost(tuplesProcessed, inputRowSize,
tuplesProduced, outputRowSize);
tuplesProcessed *= inputRowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
// If this is a partial Group By leaf in an ESP adjust it's cost
PhysicalProperty * sppForMe = (PhysicalProperty *) myContext->
getPlan()->getPhysicalProperty();
if(groupByNode->isAPartialGroupByLeaf() &&
((sppForMe && sppForMe->executeInESPOnly()) ||
(CmpCommon::getDefault(COMP_BOOL_186) == DF_ON)))
{
// don't adjust if Group Columns contain partition columns.
const PartitioningFunction* const myPartFunc =
sppForMe->getPartitioningFunction();
ValueIdSet myPartKey = myPartFunc->getPartitioningKey();
ValueIdSet myGroupingColumns = groupByNode->groupExpr();
NABoolean myGroupingMatchesPartitioning = FALSE;
if (myPartKey.entries() &&
myGroupingColumns.contains(myPartKey))
myGroupingMatchesPartitioning = TRUE;
if (!myGroupingMatchesPartitioning &&
(CmpCommon::getDefault(NCM_PAR_GRPBY_ADJ) == DF_ON))
{
CostScalar grpByAdjFactor = ActiveSchemaDB()->getDefaults().getAsDouble(ROBUST_PAR_GRPBY_LEAF_FCTR);
tuplesProcessed *= grpByAdjFactor;
overflowCost *= grpByAdjFactor;
tuplesProduced *= grpByAdjFactor;
}
}
//----------------------------------------
// Synthesize and return the cost object.
//----------------------------------------
// We are using tuplesSent as a placeholder for overflow costs as it would be
// easier for debugging by differentiating between regular tuples processed
// and overflow costs, rather than lumping them all togther in
// tuplesProcessed. tuplesSent is zero for all operators except exchange,
// so it makes sense to use this instead of adding a new field.
// It does not make a difference to the overall cost computation.
Cost* hashGroupByCost =
scmCost(tuplesProcessed, tuplesProduced, overflowCost, csZero, csZero, noOfProbesPerStream_,
inputRowSize, csZero, outputRowSize, csZero);
#ifndef NDEBUG
NABoolean printCost =
( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON );
if ( printCost )
{
pfp = stdout;
fprintf(pfp,"HASHGROUPBY::scmComputeOperatorCostInternal()\n");
fprintf(pfp,"child0RowCount=%g,groupCount=%g,myRowCount=%g\n",
child0RowCount_.toDouble(),groupCount_.toDouble(),myRowCount_.toDouble());
hashGroupByCost->getScmCplr().print(pfp);
fprintf(pfp, "Elapsed time: ");
fprintf(pfp,"%f", hashGroupByCost->
convertToElapsedTime(myContext->getReqdPhysicalProperty()).value());
fprintf(pfp,"\n");
}
#endif
return hashGroupByCost;
}
// CostMethodHashGroupBy::scmComputeOperatorCostInternal()
//<pb>
//==============================================================================
// Compute the overflow cost of the hash group by.
// tuple counts alone may not be enough to capture the true costs.
//
// Inputs:
// numInputTuples -- input cardinality
// inputRowSize -- size of the input row.
// numOutputTuples -- output cardinality
// outputRowSize -- size of the output row.
// Output:
// none.
//
// Return:
// Overflow cost
//
//==============================================================================
CostScalar
CostMethodHashGroupBy::scmComputeOverflowCost( CostScalar numInputTuples, CostScalar inputRowSize, CostScalar numOutputTuples, CostScalar outputRowSize )
{
NABoolean hashGroupByOverFlowCosting =
(CmpCommon::getDefault(NCM_HGB_OVERFLOW_COSTING) == DF_ON);
if (hashGroupByOverFlowCosting == FALSE)
return 0;
CostScalar inputRowSizeFactor = scmRowSizeFactor(inputRowSize);
// Use the OCM value (200 MB) for memory limit per cpu/stream in bytes.
// The value in the defaults table is 204800, in KB.
CostScalar memoryAvailableInKB = ActiveSchemaDB()->getDefaults().getAsDouble(BMO_MEMORY_SIZE);
// Convert to bytes as all computations here are in bytes.
CostScalar memoryAvailable = memoryAvailableInKB * csOneKiloBytes;
CostScalar memoryUsed = numOutputTuples * outputRowSize;
if (memoryUsed <= memoryAvailable)
return 0;
CostScalar fractionOverflow = CostScalar(1.0) - memoryAvailable/memoryUsed;
// The factor of 2 comes from having to write overflow tuples to disk and
// read back the overflow tuples from disk.
CostScalar overflow = CostScalar(2.0) * fractionOverflow *
numInputTuples * inputRowSizeFactor;
return overflow;
} // CostMethodHashGroupBy::scmComputeOverflowCost
/**********************************************************************/
/* */
/* CostMethodHashJoin */
/* */
/**********************************************************************/
// -----------------------------------------------------------------------
// CostMethodHashJoin::scmComputeOperatorCostInternal().
// -----------------------------------------------------------------------
Cost*
CostMethodHashJoin::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
hj_ = (HashJoin*) op;
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
CostScalar outputJoinCard, tuplesProcessed, commutativeTuplesProcessed;
CostScalar overflowCost, commutativeOverflowCost;
//COMP_INT_80 = 0 means fix is OFF.
// = 1 means only 1736 fix is ON.
// = 2 means only 1769 fix is ON.
// >= 3 means 1736 and 1769 fixes are ON.
Lng32 compInt80 = (ActiveSchemaDB()->getDefaults()).getAsLong(COMP_INT_80);
// fix for 1736
// cost of serial HJ is less than cost of parallel one because
// values of equiJnRowCountPerStream_ and myRowCount_ are different.
// Fix is to use the same value for both (equiJnRowCountPerStream_)
if (compInt80 == 1 OR compInt80 >= 3)
outputJoinCard = equiJnRowCountPerStream_;
else { // will be removed after perf testing
if (CURRSTMT_OPTDEFAULTS->incorporateSkewInCosting() && countOfStreams > 1)
{
/*
CostScalar child0CardOfFreqValue =
((child0RowCountPerStream_ * countOfStreams - child0RowCount_)/
(countOfStreams - 1));
CostScalar child1CardOfFreqValue =
((child1RowCountPerStream_ * countOfStreams - child1RowCount_)/
(countOfStreams - 1));
outputJoinCard = MAXOF(child0CardOfFreqValue * child1CardOfFreqValue,
(myRowCount_/countOfStreams).getCeiling());
outputJoinCard = MINOF(outputJoinCard, myRowCount_);
*/
outputJoinCard = equiJnRowCountPerStream_;
}
else
{
outputJoinCard = (myRowCount_/countOfStreams).getCeiling();
}
} // will be removed after perf testing
CostScalar probeTuplesPerStream = child0RowCountPerStream_;
CostScalar buildTuplesPerStream = child1RowCountPerStream_;
CostScalar broadcastTuplesPerStream;
if (ActiveSchemaDB()->getDefaults().getAsLong(COMP_INT_95) == 0)
broadcastTuplesPerStream = child1RowCountPerStream_;
else
broadcastTuplesPerStream = child1RowCount_;
CostScalar probeTuples = child0RowCount_;
CostScalar buildTuples = child1RowCount_;
CostScalar probeRowSize = child0RowLength_;
CostScalar buildRowSize = child1RowLength_;
CostScalar outputRowSize = hj_->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar probeSize = probeTuples * probeRowSize;
CostScalar buildSize = buildTuples * buildRowSize;
CostScalar probeRowSizeFactor = scmRowSizeFactor(probeRowSize);
CostScalar buildRowSizeFactor = scmRowSizeFactor(buildRowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
// this is not true when going down the tree because child1PartFunc_
// is always NULL even for Type1 plans.
NABoolean type2Join = ((child1PartFunc_ == NULL) OR
child1PartFunc_->isAReplicateViaBroadcastPartitioningFunction());
NABoolean buildSizeSmaller = FALSE;
if (buildSize <= probeSize)
buildSizeSmaller = TRUE;
// In SCM, we always build with the smaller side if possible.
// If the build side is larger than the probe side, we will not pick this
// plan.
// In certain cases like (left or right) outer joins, there is no choice in
// picking the left and left side of a join.
if (type2Join)
{
tuplesProcessed = broadcastTuplesPerStream * buildRowSizeFactor +
probeTuplesPerStream * probeRowSizeFactor;
overflowCost =
scmComputeOverflowCost(broadcastTuplesPerStream, buildRowSize,
probeTuplesPerStream, probeRowSize);
commutativeTuplesProcessed = probeTuples * probeRowSizeFactor +
buildTuplesPerStream * buildRowSizeFactor;
commutativeOverflowCost =
scmComputeOverflowCost(probeTuples, probeRowSize,
buildTuplesPerStream, buildRowSize);
if (NOT hasEquiJoinPred_)
{
if ( compInt80 < 2) { // will be removed after perf testing
// cross product type2 join.
tuplesProcessed += (probeTuplesPerStream * probeRowSizeFactor *
broadcastTuplesPerStream * buildRowSizeFactor);
commutativeTuplesProcessed += (buildTuplesPerStream * buildRowSizeFactor *
probeTuples * probeRowSizeFactor);
} // will be removed after perf testing
}
}
else
{
// Regular type1 hash join.
tuplesProcessed = buildTuplesPerStream * buildRowSizeFactor +
probeTuplesPerStream * probeRowSizeFactor;
overflowCost =
scmComputeOverflowCost(buildTuplesPerStream, buildRowSize,
probeTuplesPerStream, probeRowSize);
commutativeTuplesProcessed = probeTuplesPerStream * probeRowSizeFactor +
buildTuplesPerStream * buildRowSizeFactor;
commutativeOverflowCost =
scmComputeOverflowCost(probeTuplesPerStream, probeRowSize,
buildTuplesPerStream, buildRowSize);
}
NABoolean commutativeCostSame =
(tuplesProcessed + overflowCost == commutativeTuplesProcessed + commutativeOverflowCost);
NABoolean commutativeCostCheaper =
(tuplesProcessed + overflowCost > commutativeTuplesProcessed + commutativeOverflowCost);
if (buildSizeSmaller)
{
// Ensure that this is cheaper than the commutative operation.
if (commutativeCostSame)
{
tuplesProcessed = tuplesProcessed * 0.999;
overflowCost = overflowCost * 0.999;
}
else if (commutativeCostCheaper)
{
tuplesProcessed = commutativeTuplesProcessed;
overflowCost = commutativeOverflowCost;
}
}
else
{
// Ensure that this is NOT cheaper than the commutative operation.
if (commutativeCostSame)
{
tuplesProcessed = tuplesProcessed / 0.999;
overflowCost = overflowCost / 0.999;
}
else if (NOT commutativeCostCheaper)
{
tuplesProcessed = commutativeTuplesProcessed;
overflowCost = commutativeOverflowCost;
}
}
CostScalar tuplesProduced = outputJoinCard * outputRowSizeFactor;
//----------------------------------------
// Synthesize and return the cost object.
//----------------------------------------
// We are using tuplesSent as a placeholder for overflow costs as it would be
// easier for debugging by differentiating between regular tuples processed
// and overflow costs, rather than lumping them all togther in
// tuplesProcessed. tuplesSent is zero for all operators except exchange,
// so it makes sense to use this instead of adding a new field.
// It does not make a difference to the overall cost computation.
Cost* hashJoinCost =
scmCost(tuplesProcessed, tuplesProduced, overflowCost, csZero, csZero, noOfProbesPerStream_,
probeRowSize, buildRowSize, outputRowSize, csZero);
#ifndef NDEBUG
NABoolean printCost =
( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON );
if ( printCost )
{
pfp = stdout;
fprintf(pfp,"HASHJOIN::scmComputeOperatorCostInternal()\n");
fprintf(pfp,"child0RowCount=%g,child1RowCount=%g,myRowCount=%g\n",
child0RowCount_.toDouble(),child1RowCount_.toDouble(),myRowCount_.toDouble());
hashJoinCost->getScmCplr().print(pfp);
fprintf(pfp, "Elapsed time: ");
fprintf(pfp,"%f", hashJoinCost->
convertToElapsedTime(myContext->getReqdPhysicalProperty()).value());
fprintf(pfp,"\n");
}
#endif
return hashJoinCost;
}
//==============================================================================
// Compute the overflow cost of the hash join.
// tuple counts alone may not be enough to capture the true costs.
//
// Inputs:
// numBuildTuples -- build side cardinality
// buildRowSize -- size of the build side row.
// numProbeTuples -- probe side cardinality
// probeRowSize -- size of the probe side row.
// Output:
// none.
//
// Return:
// Overflow cost
//
//==============================================================================
CostScalar
CostMethodHashJoin::scmComputeOverflowCost( CostScalar numBuildTuples, CostScalar buildRowSize, CostScalar numProbeTuples, CostScalar probeRowSize )
{
NABoolean hashJoinOverFlowCosting =
(CmpCommon::getDefault(NCM_HJ_OVERFLOW_COSTING) == DF_ON);
if (hashJoinOverFlowCosting == FALSE)
return 0;
CostScalar probeRowSizeFactor = scmRowSizeFactor(probeRowSize);
CostScalar buildRowSizeFactor = scmRowSizeFactor(buildRowSize);
// Use the OCM value (200 MB) for memory limit per cpu/stream in bytes.
// The value in the defaults table is 204800, in KB.
CostScalar memoryAvailableInKB = ActiveSchemaDB()->getDefaults().getAsDouble(BMO_MEMORY_SIZE);
// Convert to bytes as all computations here are in bytes.
CostScalar memoryAvailable = memoryAvailableInKB * csOneKiloBytes;
CostScalar memoryUsed = numBuildTuples * buildRowSize;
if (memoryUsed <= memoryAvailable)
return 0;
CostScalar fractionOverflow = CostScalar(1.0) - memoryAvailable/memoryUsed;
// The factor of 2 comes from having to write overflow tuples to disk and
// read back the overflow tuples from disk.
CostScalar buildOverflow = CostScalar(2.0) * fractionOverflow *
numBuildTuples * buildRowSizeFactor;
// The factor of 2 comes from having to write overflow tuples to disk and
// read back the overflow tuples from disk.
CostScalar probeOverflow = CostScalar(2.0) * fractionOverflow *
numProbeTuples * probeRowSizeFactor;
return (buildOverflow + probeOverflow);
} // CostMethodHashJoin::scmComputeOverflowCost
//<pb>
//**********************************************************************/
/* */
/* CostMethodMergeJoin */
/* */
/**********************************************************************/
// -----------------------------------------------------------------------
// CostMethodMergeJoin::scmComputeOperatorCostInternal().
// -----------------------------------------------------------------------
Cost*
CostMethodMergeJoin::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
mj_ = (MergeJoin*) op;
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
CostScalar outputJoinCard = equiJnRowCountPerStream_;
CostScalar leftTuples = child0RowCountPerStream_;
CostScalar rightTuples = child1RowCountPerStream_;
CostScalar tuplesProcessed, tuplesProduced;
// Length of a row from the left table.
GroupAttributes* child0GA = mj_->child(0).getGroupAttr();
CostScalar leftRowSize = child0GA->getRecordLength();
// Length of a row from the right table.
GroupAttributes* child1GA = mj_->child(1).getGroupAttr();
CostScalar rightRowSize = child1GA->getRecordLength();
CostScalar outputRowSize = mj_->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar leftRowSizeFactor = scmRowSizeFactor(leftRowSize);
CostScalar rightRowSizeFactor = scmRowSizeFactor(rightRowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
// The inputs to the MergeJoin are always sorted, either the inputs are
// already naturally sorted or there are explicit Sort operators below.
// MergeJoins are 40% faster than HashJoins if the inputs are already sorted.
// This was based on lots of experiments and analysis of costs and execution
// times.
double mergeToHashJoinFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_MJ_TO_HJ_FACTOR);
tuplesProcessed = (rightTuples * rightRowSizeFactor +
leftTuples * leftRowSizeFactor) * mergeToHashJoinFactor;
// COMP_BOOL_97 OFF means MJ fix is ON (default case).
if ( CmpCommon::getDefault(COMP_BOOL_97) == DF_OFF )
tuplesProduced = outputJoinCard * outputRowSizeFactor;
else // will be removed after perf testing
tuplesProduced = outputJoinCard * outputRowSizeFactor * mergeToHashJoinFactor;
//----------------------------------------
// Synthesize and return the cost object.
//----------------------------------------
Cost* mergeJoinCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
leftRowSize, rightRowSize, outputRowSize, csZero);
#ifndef NDEBUG
NABoolean printCost =
( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON );
if ( printCost )
{
pfp = stdout;
fprintf(pfp,"MERGEJOIN::scmComputeOperatorCostInternal()\n");
fprintf(pfp,"child0RowCount=%g,child1RowCount=%g,myRowCount=%g\n",
child0RowCount_.toDouble(),child1RowCount_.toDouble(),myRowCount_.toDouble());
mergeJoinCost->getScmCplr().print(pfp);
fprintf(pfp, "Elapsed time: ");
fprintf(pfp,"%f", mergeJoinCost->
convertToElapsedTime(myContext->getReqdPhysicalProperty()).value());
fprintf(pfp,"\n");
}
#endif
return mergeJoinCost;
}
//<pb>
/**********************************************************************/
/* */
/* CostMethodNestedJoin */
/* */
/**********************************************************************/
// -----------------------------------------------------------------------
// CostMethodNestedJoin::scmComputeOperatorCostInternal().
// -----------------------------------------------------------------------
Cost*
CostMethodNestedJoin::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
CostScalar tuplesProcessed, tuplesProduced;
CostScalar outputJoinCard = (myRowCount_/countOfStreams).getCeiling();
CostScalar leftTuples;
// I have chosen the left side arbitrarily, the right side will be considered
// by the optimizer during consideration of the commutative permutation.
CostScalar broadcastTuples = child0RowCount_;// child0RowCountPerStream_ * countOfStreams;
if ((child0PartFunc_ == NULL) OR
child0PartFunc_->isAReplicateViaBroadcastPartitioningFunction())
leftTuples = broadcastTuples;
else
leftTuples = child0RowCountPerStream_;
// Length of a row from the left table.
GroupAttributes* child0GA = nj_->child(0).getGroupAttr();
CostScalar leftRowSize = child0GA->getRecordLength();
// Length of a row from the right table.
GroupAttributes* child1GA = nj_->child(1).getGroupAttr();
CostScalar rightRowSize = child1GA->getRecordLength();
CostScalar outputRowSize = nj_->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar leftRowSizeFactor = scmRowSizeFactor(leftRowSize);
CostScalar rightRowSizeFactor = scmRowSizeFactor(rightRowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
// The tuples (probes) from the left side are processed by the nested join
// and sent down to the right side.
// These are sent back up from the right side to the nested join, this
// accounts for the factor of 2 in the formula.
tuplesProcessed = leftTuples * leftRowSizeFactor * 2;
// The nested join operator produces "outputJoinCard" number of rows.
tuplesProduced = outputJoinCard * outputRowSizeFactor;
//----------------------------------------
// Synthesize and return the cost object.
//----------------------------------------
Cost* nestedJoinCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
leftRowSize, rightRowSize, outputRowSize, csZero);
#ifndef NDEBUG
NABoolean printCost =
( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON );
if ( printCost )
{
pfp = stdout;
fprintf(pfp,"NESTEDJOIN::scmComputeOperatorCostInternal()\n");
fprintf(pfp,"child0RowCount=%g,child1RowCount=%g,myRowCount=%g\n",
child0RowCount_.toDouble(),child1RowCount_.toDouble(),myRowCount_.toDouble());
nestedJoinCost->getScmCplr().print(pfp);
fprintf(pfp, "Elapsed time: ");
fprintf(pfp,"%f", nestedJoinCost->
convertToElapsedTime(myContext->getReqdPhysicalProperty()).value());
fprintf(pfp,"\n");
}
#endif
return nestedJoinCost;
}
/**********************************************************************/
/* */
/* CostMethodNestedJoinFlow */
/* */
/**********************************************************************/
// -----------------------------------------------------------------------
// CostMethodNestedJoinFlow::scmComputeOperatorCostInternal().
// -----------------------------------------------------------------------
Cost*
CostMethodNestedJoinFlow::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
// Length of a row from the left table.
GroupAttributes* child0GA = nj_->child(0).getGroupAttr();
CostScalar leftRowSize = child0GA->getRecordLength();
CostScalar outputRowSize = nj_->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar leftRowSizeFactor = scmRowSizeFactor(leftRowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
// NestedJoinFlow doesn't do anything by itself except
// sends left child rows to right child
CostScalar tuplesProcessed = child0RowCountPerStream_ * leftRowSizeFactor;
// Passes rows from right child to its parent.
CostScalar tuplesProduced = (myRowCount_/countOfStreams).getCeiling() * outputRowSizeFactor;
//----------------------------------------
// Synthesize and return the cost object.
//----------------------------------------
Cost* nestedJoinFlowCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
leftRowSize, csZero, outputRowSize, csZero);
#ifndef NDEBUG
NABoolean printCost =
( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON );
if ( printCost )
{
pfp = stdout;
fprintf(pfp,"NESTEDJOINFLOW::scmComputeOperatorCostInternal()\n");
fprintf(pfp,"child0RowCount=%g,child1RowCount=%g,myRowCount=%g\n",
child0RowCount_.toDouble(),child1RowCount_.toDouble(),myRowCount_.toDouble());
nestedJoinFlowCost->getScmCplr().print(pfp);
fprintf(pfp, "Elapsed time: ");
fprintf(pfp,"%f", nestedJoinFlowCost->
convertToElapsedTime(myContext->getReqdPhysicalProperty()).value());
fprintf(pfp,"\n");
}
#endif
return nestedJoinFlowCost;
}
// CostMethodTranspose::scmComputeOperatorCostInternal() -------------------------
// Compute the cost of this Transpose node given the optimization context.
//
// Parameters
//
// RelExpr *op
// IN - The PhysTranpose node which is being costed.
//
// Context *myContext
// IN - The optimization context within which to cost this node.
//
// long& countOfStreams
// OUT - Estimated degree of parallelism for returned preliminary cost.
//
Cost *
CostMethodTranspose::scmComputeOperatorCostInternal(RelExpr *op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// Just to make sure things are working as expected
//
CMPASSERT(op->getOperatorType() == REL_TRANSPOSE);
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
EstLogPropSharedPtr inputLP = myContext->getInputLogProp();
EstLogPropSharedPtr childOutputLP = op->child(0).outputLogProp( inputLP );
const CostScalar & child0RowCount = childOutputLP->getResultCardinality();
CostScalar tuplesProcessed = (child0RowCount/countOfStreams).getCeiling();
CostScalar tuplesProduced = (myRowCount_/countOfStreams).getCeiling();
GroupAttributes* child0GA = op->child(0).getGroupAttr();
CostScalar inputRowSize = child0GA->getRecordLength();
CostScalar outputRowSize = op->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar inputRowSizeFactor = scmRowSizeFactor(inputRowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
tuplesProcessed *= inputRowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
// ------------------------------------------------
// Synthesize and return the cost object.
// ------------------------------------------------
Cost* trnsPoseCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
inputRowSize, csZero, outputRowSize, csZero);
return trnsPoseCost;
} // CostMethodTranspose::scmComputeOperatorInternal()
/**********************************************************************/
/* */
/* CostMethodSample */
/* */
/**********************************************************************/
// Compute common costing parameters.
//
// CostMethodSample::scmComputeOperatorCostInternal() ---------------------
// Compute the cost of this Sample node given the optimization context.
//
// Parameters
//
// RelExpr *op
// IN - The PhysSample node which is being costed.
//
// const PlanWorkSpace* pws
// IN - Optimization context within which to cost this node.
//
// long& countOfStreams
// OUT - Estimated degree of parallelism for returned preliminary cost.
//
Cost *
CostMethodSample::scmComputeOperatorCostInternal(RelExpr *op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// Just to make sure things are working as expected
//
DCMPASSERT(op->getOperatorType() == REL_SAMPLE);
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
EstLogPropSharedPtr inputLP = myContext->getInputLogProp();
EstLogPropSharedPtr childOutputLP = op->child(0).outputLogProp( inputLP );
const CostScalar & child0RowCount = childOutputLP->getResultCardinality();
CostScalar tuplesProcessed = (child0RowCount/countOfStreams).getCeiling();
CostScalar tuplesProduced = (myRowCount_/countOfStreams).getCeiling();
GroupAttributes* child0GA = op->child(0).getGroupAttr();
CostScalar inputRowSize = child0GA->getRecordLength();
CostScalar outputRowSize = op->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar inputRowSizeFactor = scmRowSizeFactor(inputRowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
tuplesProcessed *= inputRowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
// ------------------------------------------------
// Synthesize and return the cost object.
// ------------------------------------------------
Cost* sampleCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
inputRowSize, csZero, outputRowSize, csZero);
return sampleCost;
} // CostMethodSample::scmComputeOperatorCostInternal()
Cost *
CostMethodRelSequence::scmComputeOperatorCostInternal(RelExpr *op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// Just to make sure things are working as expected
//
DCMPASSERT(op->getOperatorType() == REL_SEQUENCE);
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
EstLogPropSharedPtr inputLP = myContext->getInputLogProp();
EstLogPropSharedPtr childOutputLP = op->child(0).outputLogProp( inputLP );
const CostScalar & child0RowCount = childOutputLP->getResultCardinality();
CostScalar tuplesProcessed = (child0RowCount/countOfStreams).getCeiling();
CostScalar tuplesProduced = (myRowCount_/countOfStreams).getCeiling();
GroupAttributes* child0GA = op->child(0).getGroupAttr();
CostScalar inputRowSize = child0GA->getRecordLength();
CostScalar outputRowSize = op->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar inputRowSizeFactor = scmRowSizeFactor(inputRowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
tuplesProcessed *= inputRowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
// ------------------------------------------------
// Synthesize and return the cost object.
// ------------------------------------------------
Cost* seqCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
inputRowSize, csZero, outputRowSize, csZero);
return seqCost;
} // CostMethodRelSequence::scmComputeOperatorInternal()
//-------------------------------------------------------------------------
// CostMethodCompoundStmt::scmComputeOperatorInternal()
// Compute the cost of this Compound Statement node, given the optimization
// context.
//
// Parameters
//
// RelExpr *op
// IN - The PhysTranpose node which is being costed.
//
// Context *myContext
// IN - The optimization context within which to cost this node.
//-------------------------------------------------------------------------
Cost *
CostMethodCompoundStmt::scmComputeOperatorCostInternal(RelExpr *op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// Just to make sure things are working as expected
//
DCMPASSERT(op->getOperatorType() == REL_COMPOUND_STMT);
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
EstLogPropSharedPtr inputLP = myContext->getInputLogProp();
EstLogPropSharedPtr child0OutputLP = op->child(0).outputLogProp( inputLP );
EstLogPropSharedPtr child1OutputLP = op->child(1).outputLogProp( inputLP );
const CostScalar & child0RowCount = child0OutputLP->getResultCardinality();
const CostScalar & child1RowCount = child1OutputLP->getResultCardinality();
GroupAttributes* child0GA = op->child(0).getGroupAttr();
CostScalar input0RowSize = child0GA->getRecordLength();
GroupAttributes* child1GA = op->child(1).getGroupAttr();
CostScalar input1RowSize = child1GA->getRecordLength();
CostScalar outputRowSize = op->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar input0RowSizeFactor = scmRowSizeFactor(input0RowSize);
CostScalar input1RowSizeFactor = scmRowSizeFactor(input1RowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
CostScalar tuplesProcessed =
child0RowCount * input0RowSizeFactor + child1RowCount * input1RowSizeFactor;
// per stream basis
tuplesProcessed = (tuplesProcessed/countOfStreams).getCeiling();
CostScalar tuplesProduced = (myRowCount_/countOfStreams).getCeiling() * outputRowSizeFactor;
// ------------------------------------------------
// Synthesize and return the cost object.
// ------------------------------------------------
Cost* csCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
input0RowSize, input1RowSize, outputRowSize, csZero);
return csCost;
} // CostMethodCompoundStmt::scmComputeOperatorInternal()
Cost *
CostMethodStoredProc::scmComputeOperatorCostInternal(RelExpr *op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Stored procedures never run in parallel.
// -----------------------------------------
countOfStreams = 1;
CostScalar outputRowSize = op->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
CostScalar tuplesProduced = myRowCount_ * outputRowSizeFactor;
// ------------------------------------------------
// Synthesize and return the cost object.
// ------------------------------------------------
Cost* spCost =
scmCost(csZero, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
csZero, csZero, outputRowSize, csZero);
return spCost;
} // CostMethodStoredProc::scmComputeOperatorInternal()
Cost *
CostMethodTuple::scmComputeOperatorCostInternal(RelExpr *op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,pws->getContext());
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
CostScalar outputRowSize = op->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
// ---------------------------------------------------------------------
// The Tuple operator returns exactly one row for each probe it gets.
// Thus, its total row count should just be probeCount.
// ---------------------------------------------------------------------
CostScalar tuplesProduced = myRowCount_ * noOfProbesPerStream_ * outputRowSizeFactor;
// ------------------------------------------------
// Synthesize and return the cost object.
// ------------------------------------------------
Cost* tupCost =
scmCost(csZero, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
csZero, csZero, outputRowSize, csZero);
return tupCost;
} // CostMethodTuple::scmComputeOperatorInternal()
Cost *
CostMethodUnPackRows::scmComputeOperatorCostInternal(RelExpr *op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
// Just to make sure things are working as expected
//
DCMPASSERT(op->getOperatorType() == REL_UNPACKROWS);
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,pws->getContext());
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
CostScalar outputRowSize = op->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
CostScalar tuplesProduced = (myRowCount_/countOfStreams).getCeiling() * outputRowSizeFactor;
// ------------------------------------------------
// Synthesize and return the cost object.
// ------------------------------------------------
Cost* upackCost =
scmCost(csZero, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
csZero, csZero, outputRowSize, csZero);
return upackCost;
} // CostMethodUnPackRows::scmComputeOperatorInternal()
// -----------------------------------------------------------------------
// CostMethodMergeUnion::computeOperatorCostInternal().
// -----------------------------------------------------------------------
Cost *
CostMethodMergeUnion::scmComputeOperatorCostInternal(RelExpr *op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
EstLogPropSharedPtr inputLP = myContext->getInputLogProp();
EstLogPropSharedPtr child0OutputLP = op->child(0).outputLogProp( inputLP );
EstLogPropSharedPtr child1OutputLP = op->child(1).outputLogProp( inputLP );
const CostScalar & child0RowCount = child0OutputLP->getResultCardinality();
const CostScalar & child1RowCount = child1OutputLP->getResultCardinality();
GroupAttributes* child0GA = op->child(0).getGroupAttr();
CostScalar input0RowSize = child0GA->getRecordLength();
GroupAttributes* child1GA = op->child(1).getGroupAttr();
CostScalar input1RowSize = child1GA->getRecordLength();
CostScalar outputRowSize = op->getGroupAttr()->getCharacteristicOutputs().getRowLength();
CostScalar input0RowSizeFactor = scmRowSizeFactor(input0RowSize);
CostScalar input1RowSizeFactor = scmRowSizeFactor(input1RowSize);
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
CostScalar tuplesProcessed =
child0RowCount * input0RowSizeFactor + child1RowCount * input1RowSizeFactor;
// per stream basis
tuplesProcessed = (tuplesProcessed/countOfStreams).getCeiling();
CostScalar tuplesProduced = (myRowCount_/countOfStreams).getCeiling() * outputRowSizeFactor;
// ------------------------------------------------
// Synthesize and return the cost object.
// ------------------------------------------------
Cost* muCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, csZero, csZero, noOfProbesPerStream_,
input0RowSize, input1RowSize, outputRowSize, csZero);
return muCost;
} // CostMethodMergeUnion::scmComputeOperatorInternal()
//<pb>
// -------------------------------------------------------------------
// Cost methods for write DML operations
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// This method is a stub for obsolete old cost model
// -------------------------------------------------------------------
Cost*
CostMethodHbaseUpdateOrDelete::computeOperatorCostInternal(RelExpr* op,
const Context * context,
Lng32& countOfStreams)
{
CMPASSERT(false); // should never be called
return NULL;
}
// -----------------------------------------------------------------------
// CostMethodHbaseUpdateOrDelete::allKeyColumnsHaveHistogramStatistics()
//
// Returns TRUE if all key columns have histograms, FALSE if not.
// -----------------------------------------------------------------------
NABoolean CostMethodHbaseUpdateOrDelete::allKeyColumnsHaveHistogramStatistics(
const IndexDescHistograms & histograms,
const IndexDesc * CIDesc
) const
{
// Check if all key columns have histogram statistics
NABoolean statsForAllKeyCols = TRUE;
for ( CollIndex j = 0; j < CIDesc->getIndexKey().entries(); j++ )
{
if (histograms.isEmpty())
{
statsForAllKeyCols = FALSE;
break;
}
else if (!histograms.getColStatsPtrForColumn((CIDesc->getIndexKey()) [j]))
{
// If we get a null pointer when we try to retrieve a
// ColStats for a column of this table, then no histogram
// data was created for that column.
statsForAllKeyCols = FALSE;
break;
}
}
return statsForAllKeyCols;
} // CostMethodHbaseUpdateOrDelete::allKeyColumnsHaveHistogramStatistics()
// -----------------------------------------------------------------------
// CostMethodHbaseUpdateOrDelete::numRowsToScanWhenAllKeyColumnsHaveHistograms()
//
// Returns an estimate of the number of rows that will be scanned as a
// result of applying key predicates. Assumes that histograms exist for
// all key columns.
// -----------------------------------------------------------------------
CostScalar
CostMethodHbaseUpdateOrDelete::numRowsToScanWhenAllKeyColumnsHaveHistograms(
IndexDescHistograms & histograms,
const ColumnOrderList & keyPredsByCol,
const CostScalar & activePartitions,
const IndexDesc * CIDesc
) const
{
// Determine if there are single subset predicates:
CollIndex singleSubsetPrefixOrder;
NABoolean itIsSingleSubset =
keyPredsByCol.getSingleSubsetOrder( singleSubsetPrefixOrder );
NABoolean thereAreSingleSubsetPreds = FALSE;
if ( singleSubsetPrefixOrder > 0 )
{
thereAreSingleSubsetPreds = TRUE;
}
else
{
// singleSubsetPrefixOrder==0 means either there
// is an equal, an IN, or there are no key preds in the
// first column.
// singleSubsetPrefixOrder==0 AND itIsSingleSubset
// means there is an EQUAL or there are no key preds
// in the first column, check for existance of
// predicates in this case:
if ( itIsSingleSubset // this FALSE for an IN predicate
AND keyPredsByCol[0] != NULL
)
{
thereAreSingleSubsetPreds = TRUE;
}
}
CMPASSERT(NOT histograms.isEmpty());
// Apply those key predicates that reference key columns
// before the first missing key to the histograms:
const SelectivityHint * selHint = CIDesc->getPrimaryTableDesc()->getSelectivityHint();
const CardinalityHint * cardHint = CIDesc->getPrimaryTableDesc()->getCardinalityHint();
if ( thereAreSingleSubsetPreds || selHint || cardHint )
{
// ---------------------------------------------------------
// There are some key predicates, so apply them
// to the histograms and get the total rows:
// ---------------------------------------------------------
// Get all the key preds for the key columns up to the first
// key column with no key preds (if any)
ValueIdSet singleSubsetPrefixPredicates;
for ( CollIndex i = 0; i <= singleSubsetPrefixOrder; i++ )
{
const ValueIdSet *predsPtr = keyPredsByCol[i];
CMPASSERT( predsPtr != NULL ); // it must contain preds
singleSubsetPrefixPredicates.insert( *predsPtr );
} // for every key col in the sing. subset prefix
RelExpr * dummyExpr = new (STMTHEAP) RelExpr(ITM_FIRST_ITEM_OP,
NULL,
NULL,
STMTHEAP);
histograms.applyPredicates( singleSubsetPrefixPredicates, *dummyExpr, selHint, cardHint);
} // if there are key predicates
// If there is no key predicates, a full table scan will be generated.
// Otherwise, key predicates will be applied to the histograms.
// Now, compute the number of rows after key preds are applied,
// and accounting for asynchronous parallelism:
CostScalar numRowsToScan =
((histograms.getRowCount()/activePartitions).getCeiling()).minCsOne();
return numRowsToScan;
} // CostMethodHbaseUpdateOrDelete::numRowsToScanWhenAllKeyColumnsHaveHistograms()
// -----------------------------------------------------------------------
// CostMethodHbaseUpdateOrDelete::computeIOCostsForCursorOperation().
// -----------------------------------------------------------------------
void CostMethodHbaseUpdateOrDelete::computeIOCostsForCursorOperation(
CostScalar & randomIOs, // out
CostScalar & sequentialIOs, // out
const IndexDesc * CIDesc,
const CostScalar & numRowsToScan,
NABoolean probesInOrder
) const
{
const CostScalar & kbPerBlock = CIDesc->getBlockSizeInKb();
// if rowsize is bigger than blocksize, rowsPerBlock will be 1.
const CostScalar rowsPerBlock =
((kbPerBlock * csOneKiloBytes) /
CIDesc->getNAFileSet()->getRecordLength()).getCeiling();
CostScalar totalIndexBlocks(csZero);
if (probesInOrder)
{
// If the probes are in order, assume that each successive
// probe refers to the next record in the table, i.e. the
// probes are "highly inclusive", or in other words, there
// are no gaps in the records to be updated. So, assuming
// this, the number of blocks that need to be read is
// the # of probes divided by the rows per block. This is
// guaranteed to be correct if we are updating most of the
// rows, we can only go wrong if we are updating a small
// number of dispersed rows. This seems unlikely, and anyway
// even if it's true we won't be that far off. If the rows
// are highly inclusive, we could also assume that since the
// blocks will be contiguous that there will only by one seek.
// But, we'd be way off in the case where the assumption
// doesn't hold so we won't do it for now. What we need is
// some way to determine the "inclusiveness factor".
sequentialIOs = (numRowsToScan / rowsPerBlock).getCeiling();
// The # of index blocks to read is based on the number of data
// blocks that must be read
totalIndexBlocks =
CIDesc->getEstimatedIndexBlocksLowerBound(sequentialIOs);
randomIOs = totalIndexBlocks;
}
else // probes not in order
{
// Assume all IOs are random. This is a bit pessimistic
// because at some point much of the file will be in cache,
// so one could argue that as the number of rows updated
// or deleted grows large the number of random IOs should
// decrease. We'll leave that to future work.
sequentialIOs = csZero;
totalIndexBlocks =
CIDesc->getEstimatedIndexBlocksLowerBound(numRowsToScan);
randomIOs = numRowsToScan + totalIndexBlocks;
}
} // CostMethodHbaseUpdateOrDelete::computeIOCostsForCursorOperation()
// ----QUICKSEARCH FOR HbaseUpdate........................................
/**********************************************************************/
/* */
/* CostMethodHbaseUpdate */
/* */
/**********************************************************************/
//*******************************************************************
// This method computes the cost vector of the HbaseUpdate operation
//*******************************************************************
Cost*
CostMethodHbaseUpdate::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
// TODO: Write this method; the code below is a copy of the Delete
// method which we'll use for the moment. This is better than just
// a simple constant stub; we will get parallel Update plans with
// this code, for example, that we won't get with constant cost.
// The theory of operation of Update is somewhat different (since it
// might, for example, do a Delete + an Insert, or might do an Hbase
// Update -- is that decided before we get here?), so this code will
// underestimate the cost in general.
const Context * myContext = pws->getContext();
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
const InputPhysicalProperty* ippForMe =
myContext->getInputPhysicalProperty();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
HbaseUpdate* updOp = (HbaseUpdate *)op; // downcast
CMPASSERT(partFunc_ != NULL);
// Later, if and when we start using NodeMaps to track active regions for
// Trafodion tables in HBase (or native HBase tables), we can use the
// following to get active partitions.
//CostScalar activePartitions =
// (CostScalar)
// (((NodeMap *)(partFunc_->getNodeMap()))->getNumActivePartitions());
// But for now, we do the following:
CostScalar activePartitions = (CostScalar)(partFunc_->getCountOfPartitions());
const IndexDesc* CIDesc = updOp->getIndexDesc();
const CostScalar & recordSizeInKb = CIDesc->getRecordSizeInKb();
CostScalar tuplesProcessed(csZero);
CostScalar tuplesProduced(csZero);
CostScalar tuplesSent(csZero); // we use tuplesSent to model sending rowIDs to Hbase
CostScalar randomIOs(csZero);
CostScalar sequentialIOs(csZero);
CostScalar countOfAsynchronousStreams = activePartitions;
// figure out if the probes are in order - if they are, then when
// scanning, I/O will tend to be sequential
NABoolean probesInOrder = FALSE;
if (ippForMe != NULL) // input physical properties exist?
{
// See if the probes are in order.
// For delete, a partial order is ok.
NABoolean partiallyInOrderOK = TRUE;
NABoolean probesForceSynchronousAccess = FALSE;
ValueIdList targetSortKey = CIDesc->getOrderOfKeyValues();
ValueIdSet sourceCharInputs =
updOp->getGroupAttr()->getCharacteristicInputs();
ValueIdSet targetCharInputs;
// The char inputs are still in terms of the source. Map them to the target.
// Note: The source char outputs in the ipp have already been mapped to
// the target. CharOutputs are a set, meaning they do not have duplicates
// But we could have cases where two columns of the target are matched to the
// same source column, example: Sol: 10-040416-5166, where we have
// INSERT INTO b6table1
// ( SELECT f, h_to_f, f, 8.4
// FROM btre211
// );
// Hence we use lists here instead of sets.
// Check to see if there are any duplicates in the source Characteristics inputs
// if no, we shall perform set operations, as these are faster
ValueIdList bottomValues = updOp->updateToSelectMap().getBottomValues();
ValueIdSet bottomValuesSet(bottomValues);
NABoolean useListInsteadOfSet = FALSE;
CascadesGroup* group1 = (*CURRSTMT_OPTGLOBALS->memo)[updOp->getGroupId()];
GenericUpdate* upOperator = (GenericUpdate *) group1->getFirstLogExpr();
if (((upOperator->getTableName().getSpecialType() == ExtendedQualName::NORMAL_TABLE ) || (upOperator->getTableName().getSpecialType() == ExtendedQualName::GHOST_TABLE )) &&
(bottomValuesSet.entries() != bottomValues.entries() ) )
{
ValueIdList targetInputList;
// from here get all the bottom values that appear in the sourceCharInputs
bottomValues.findCommonElements(sourceCharInputs );
bottomValuesSet = bottomValues;
// we can use the bottomValues only if these contain some duplicate columns of
// characteristics inputs, otherwise we shall use the characteristics inputs.
if (bottomValuesSet == sourceCharInputs)
{
useListInsteadOfSet = TRUE;
updOp->updateToSelectMap().rewriteValueIdListUpWithIndex(
targetInputList,
bottomValues);
targetCharInputs = targetInputList;
}
}
if (!useListInsteadOfSet)
{
updOp->updateToSelectMap().rewriteValueIdSetUp(
targetCharInputs,
sourceCharInputs);
}
// If a target key column is covered by a constant on the source side,
// then we need to remove that column from the target sort key
removeConstantsFromTargetSortKey(&targetSortKey,
&(updOp->updateToSelectMap()));
NABoolean orderedNJ = TRUE;
// Don't call ordersMatch if njOuterOrder_ is null.
if (ippForMe->getAssumeSortedForCosting())
orderedNJ = FALSE;
else
// if leading keys are not same then don't try ordered NJ.
orderedNJ =
isOrderedNJFeasible(*(ippForMe->getNjOuterOrder()), targetSortKey);
if (orderedNJ AND
ordersMatch(ippForMe,
CIDesc,
&targetSortKey,
targetCharInputs,
partiallyInOrderOK,
probesForceSynchronousAccess))
{
probesInOrder = TRUE;
if (probesForceSynchronousAccess)
{
// The probes form a complete order across all partitions and
// the clustering key and partitioning key are the same. So, the
// only asynchronous I/O we will see will be due to ESPs. So,
// limit the count of streams in DP2 by the count of streams in ESP.
// Get the logPhysPartitioningFunction, which we will use
// to get the logical partitioning function. If it's NULL,
// it means the table was not partitioned at all, so we don't
// need to limit anything since there already is no asynch I/O.
// TODO: lppf is always null in Trafodion; figure out what to do instead...
const LogPhysPartitioningFunction* lppf =
partFunc_->castToLogPhysPartitioningFunction();
if (lppf != NULL)
{
PartitioningFunction* logPartFunc =
lppf->getLogPartitioningFunction();
// Get the number of ESPs:
CostScalar numParts = logPartFunc->getCountOfPartitions();
countOfAsynchronousStreams = MINOF(numParts,
countOfAsynchronousStreams);
} // lppf != NULL
} // probesForceSynchronousAccess
} // probes are in order
} // if input physical properties exist
CostScalar currentCpus =
(CostScalar)myContext->getPlan()->getPhysicalProperty()->getCurrentCountOfCPUs();
CostScalar activeCpus = MINOF(countOfAsynchronousStreams, currentCpus);
CostScalar streamsPerCpu =
(countOfAsynchronousStreams / activeCpus).getCeiling();
CostScalar noOfProbesPerPartition(csOne);
CostScalar numRowsToDelete(csOne);
CostScalar numRowsToScan(csOne);
CostScalar commonComputation;
// Determine # of rows to scan and to delete
if (updOp->getSearchKey() && updOp->getSearchKey()->isUnique() &&
(noOfProbes_ == 1))
{
// unique access
activePartitions = csOne;
countOfAsynchronousStreams = csOne;
activeCpus = csOne;
streamsPerCpu = csOne;
numRowsToScan = csOne;
// assume the 1 row always satisfies any executor predicates so
// we'll always do the Delete
numRowsToDelete = csOne;
}
else
{
// non-unique access
numRowsToDelete =
((myRowCount_ / activePartitions).getCeiling()).minCsOne();
noOfProbesPerPartition =
((noOfProbes_ / countOfAsynchronousStreams).getCeiling()).minCsOne();
// need to compute the number of rows that satisfy the key predicates
// to compute the I/Os that must be performed
// need to create a new histogram, since the one from input logical
// prop. has the histogram for the table after all executor preds are
// applied (i.e. the result cardinality)
IndexDescHistograms histograms(*CIDesc,CIDesc->getIndexKey().entries());
// retrieve all of the key preds in key column order
ColumnOrderList keyPredsByCol(CIDesc->getIndexKey());
updOp->getSearchKey()->getKeyPredicatesByColumn(keyPredsByCol);
if ( NOT allKeyColumnsHaveHistogramStatistics( histograms, CIDesc ) )
{
// All key columns do not have histogram data, the best we can
// do is use the number of rows that satisfy all predicates
// (i.e. the number of rows we will be updating)
numRowsToScan = numRowsToDelete;
}
else
{
numRowsToScan = numRowsToScanWhenAllKeyColumnsHaveHistograms(
histograms,
keyPredsByCol,
activePartitions,
CIDesc
);
if (numRowsToScan < numRowsToDelete) // sanity test
{
// we will scan at least as many rows as we delete
numRowsToScan = numRowsToDelete;
}
}
}
// Notes: At execution time, several different TCBs can be created
// for a delete. We can class them three ways: Unique, Subset, and
// Rowset. Representative examples of the three classes are:
//
// ExHbaseUMDtrafUniqueTaskTcb
// ExHbaseUMDtrafSubsetTaskTcb
// ExHbaseAccessSQRowsetTcb
//
// The theory of operation of each of these differs somewhat.
//
// For the Unique variant, we use an HBase "get" to obtain a row, apply
// a predicate to it, then do an HBase "delete" to delete it if the
// predicate is true. (If there is no predicate, we'll simply do a
// "checkAndDelete" so there would be no "get" cost.)
//
// For the Subset variant, we use an HBase "scan" to obtain a sequence
// of rows, apply a predicate to each, then do an HBase "delete" on
// each row that passes the predicate.
//
// For the Rowset variant, we simply pass all the input keys to
// HBase in batches in HBase "deleteRows" calls. (In Explain plans,
// this TCB shows up as "trafodion_delete_vsbb", while the first two
// show up as "trafodion_delete".) There is no "get" cost. In plans
// with this TCB, there is a separate Scan TCB to obtain the keys,
// which then flow to this Rowset TCB via a tuple flow or nested join.
// (Such a separate Scan might exist with the first two TCBs also,
// e.g., when an index is used to decide which rows to delete.)
// The messaging cost to HBase is also reduced since multiple delete
// keys are sent per HBase interaction.
//
// Unfortunately the decisions as to which TCB will be used are
// currently made in the generator code and so aren't easily
// available to us here. For the moment then, we make no attempt
// to distinguish a separate "get" cost, nor do we take into account
// possible reduced message cost in the Rowset case. Should this
// choice be refactored in the future to push it into the Optimizer,
// then we can do a better job here. We did attempt to distinguish
// the unique case here from the others, but even there our criteria
// are not quite the same as in the generator. So at best, this attempt
// simply sharpens the cost estimate in this one particular case.
// Compute the I/O cost
computeIOCostsForCursorOperation(
randomIOs /* out */,
sequentialIOs /* out */,
CIDesc,
numRowsToScan,
probesInOrder
);
// Compute the tuple cost
tuplesProduced = numRowsToDelete;
tuplesProcessed = numRowsToScan;
tuplesSent = numRowsToDelete;
CostScalar rowSize = updOp->getIndexDesc()->getRecordLength();
CostScalar rowSizeFactor = scmRowSizeFactor(rowSize);
CostScalar outputRowSize = updOp->getGroupAttr()->getRecordLength();
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
tuplesProcessed *= rowSizeFactor;
tuplesSent *= rowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
// ---------------------------------------------------------------------
// Synthesize and return cost object.
// ---------------------------------------------------------------------
CostScalar probeRowSize = updOp->getIndexDesc()->getKeyLength();
Cost * updateCost =
scmCost(tuplesProcessed, tuplesProduced, tuplesSent, randomIOs, sequentialIOs, noOfProbes_,
rowSize, csZero, outputRowSize, probeRowSize);
#ifndef NDEBUG
if ( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON )
{
pfp = stdout;
fprintf(pfp, "HbaseUpdate::scmComputeOperatorCostInternal()\n");
updateCost->getScmCplr().print(pfp);
fprintf(pfp, "HBase Update elapsed time: ");
fprintf(pfp,"%f", updateCost->
convertToElapsedTime(
myContext->getReqdPhysicalProperty()).
value());
fprintf(pfp,"\n");
fprintf(pfp,"CountOfStreams returned %d\n",countOfStreams);
}
#endif
return updateCost;
}
// ----QUICKSEARCH FOR HbaseDelete........................................
/**********************************************************************/
/* */
/* CostMethodHbaseDelete */
/* */
/**********************************************************************/
//*******************************************************************
// This method computes the cost vector of the HbaseDelete operation
//*******************************************************************
Cost*
CostMethodHbaseDelete::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context * myContext = pws->getContext();
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
const InputPhysicalProperty* ippForMe =
myContext->getInputPhysicalProperty();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
HbaseDelete* delOp = (HbaseDelete *)op; // downcast
CMPASSERT(partFunc_ != NULL);
// Later, if and when we start using NodeMaps to track active regions for
// Trafodion tables in HBase (or native HBase tables), we can use the
// following to get active partitions.
//CostScalar activePartitions =
// (CostScalar)
// (((NodeMap *)(partFunc_->getNodeMap()))->getNumActivePartitions());
// But for now, we do the following:
CostScalar activePartitions = (CostScalar)(partFunc_->getCountOfPartitions());
const IndexDesc* CIDesc = delOp->getIndexDesc();
const CostScalar & recordSizeInKb = CIDesc->getRecordSizeInKb();
CostScalar tuplesProcessed(csZero);
CostScalar tuplesProduced(csZero);
CostScalar tuplesSent(csZero); // we use tuplesSent to model sending rowIDs to Hbase
CostScalar randomIOs(csZero);
CostScalar sequentialIOs(csZero);
CostScalar countOfAsynchronousStreams = activePartitions;
// figure out if the probes are in order - if they are, then when
// scanning, I/O will tend to be sequential
NABoolean probesInOrder = FALSE;
if (ippForMe != NULL) // input physical properties exist?
{
// See if the probes are in order.
// For delete, a partial order is ok.
NABoolean partiallyInOrderOK = TRUE;
NABoolean probesForceSynchronousAccess = FALSE;
ValueIdList targetSortKey = CIDesc->getOrderOfKeyValues();
ValueIdSet sourceCharInputs =
delOp->getGroupAttr()->getCharacteristicInputs();
ValueIdSet targetCharInputs;
// The char inputs are still in terms of the source. Map them to the target.
// Note: The source char outputs in the ipp have already been mapped to
// the target. CharOutputs are a set, meaning they do not have duplicates
// But we could have cases where two columns of the target are matched to the
// same source column, example: Sol: 10-040416-5166, where we have
// INSERT INTO b6table1
// ( SELECT f, h_to_f, f, 8.4
// FROM btre211
// );
// Hence we use lists here instead of sets.
// Check to see if there are any duplicates in the source Characteristics inputs
// if no, we shall perform set operations, as these are faster
ValueIdList bottomValues = delOp->updateToSelectMap().getBottomValues();
ValueIdSet bottomValuesSet(bottomValues);
NABoolean useListInsteadOfSet = FALSE;
CascadesGroup* group1 = (*CURRSTMT_OPTGLOBALS->memo)[delOp->getGroupId()];
GenericUpdate* upOperator = (GenericUpdate *) group1->getFirstLogExpr();
if (((upOperator->getTableName().getSpecialType() == ExtendedQualName::NORMAL_TABLE ) || (upOperator->getTableName().getSpecialType() == ExtendedQualName::GHOST_TABLE )) &&
(bottomValuesSet.entries() != bottomValues.entries() ) )
{
ValueIdList targetInputList;
// from here get all the bottom values that appear in the sourceCharInputs
bottomValues.findCommonElements(sourceCharInputs );
bottomValuesSet = bottomValues;
// we can use the bottomValues only if these contain some duplicate columns of
// characteristics inputs, otherwise we shall use the characteristics inputs.
if (bottomValuesSet == sourceCharInputs)
{
useListInsteadOfSet = TRUE;
delOp->updateToSelectMap().rewriteValueIdListUpWithIndex(
targetInputList,
bottomValues);
targetCharInputs = targetInputList;
}
}
if (!useListInsteadOfSet)
{
delOp->updateToSelectMap().rewriteValueIdSetUp(
targetCharInputs,
sourceCharInputs);
}
// If a target key column is covered by a constant on the source side,
// then we need to remove that column from the target sort key
removeConstantsFromTargetSortKey(&targetSortKey,
&(delOp->updateToSelectMap()));
NABoolean orderedNJ = TRUE;
// Don't call ordersMatch if njOuterOrder_ is null.
if (ippForMe->getAssumeSortedForCosting())
orderedNJ = FALSE;
else
// if leading keys are not same then don't try ordered NJ.
orderedNJ =
isOrderedNJFeasible(*(ippForMe->getNjOuterOrder()), targetSortKey);
if (orderedNJ AND
ordersMatch(ippForMe,
CIDesc,
&targetSortKey,
targetCharInputs,
partiallyInOrderOK,
probesForceSynchronousAccess))
{
probesInOrder = TRUE;
if (probesForceSynchronousAccess)
{
// The probes form a complete order across all partitions and
// the clustering key and partitioning key are the same. So, the
// only asynchronous I/O we will see will be due to ESPs. So,
// limit the count of streams in DP2 by the count of streams in ESP.
// Get the logPhysPartitioningFunction, which we will use
// to get the logical partitioning function. If it's NULL,
// it means the table was not partitioned at all, so we don't
// need to limit anything since there already is no asynch I/O.
// TODO: lppf is always null in Trafodion; figure out what to do instead...
const LogPhysPartitioningFunction* lppf =
partFunc_->castToLogPhysPartitioningFunction();
if (lppf != NULL)
{
PartitioningFunction* logPartFunc =
lppf->getLogPartitioningFunction();
// Get the number of ESPs:
CostScalar numParts = logPartFunc->getCountOfPartitions();
countOfAsynchronousStreams = MINOF(numParts,
countOfAsynchronousStreams);
} // lppf != NULL
} // probesForceSynchronousAccess
} // probes are in order
} // if input physical properties exist
CostScalar currentCpus =
(CostScalar)myContext->getPlan()->getPhysicalProperty()->getCurrentCountOfCPUs();
CostScalar activeCpus = MINOF(countOfAsynchronousStreams, currentCpus);
CostScalar streamsPerCpu =
(countOfAsynchronousStreams / activeCpus).getCeiling();
CostScalar noOfProbesPerPartition(csOne);
CostScalar numRowsToDelete(csOne);
CostScalar numRowsToScan(csOne);
CostScalar commonComputation;
// Determine # of rows to scan and to delete
if (delOp->getSearchKey() && delOp->getSearchKey()->isUnique() &&
(noOfProbes_ == 1))
{
// unique access
activePartitions = csOne;
countOfAsynchronousStreams = csOne;
activeCpus = csOne;
streamsPerCpu = csOne;
numRowsToScan = csOne;
// assume the 1 row always satisfies any executor predicates so
// we'll always do the Delete
numRowsToDelete = csOne;
}
else
{
// non-unique access
numRowsToDelete =
((myRowCount_ / activePartitions).getCeiling()).minCsOne();
noOfProbesPerPartition =
((noOfProbes_ / countOfAsynchronousStreams).getCeiling()).minCsOne();
// need to compute the number of rows that satisfy the key predicates
// to compute the I/Os that must be performed
// need to create a new histogram, since the one from input logical
// prop. has the histogram for the table after all executor preds are
// applied (i.e. the result cardinality)
IndexDescHistograms histograms(*CIDesc,CIDesc->getIndexKey().entries());
// retrieve all of the key preds in key column order
ColumnOrderList keyPredsByCol(CIDesc->getIndexKey());
delOp->getSearchKey()->getKeyPredicatesByColumn(keyPredsByCol);
if ( NOT allKeyColumnsHaveHistogramStatistics( histograms, CIDesc ) )
{
// All key columns do not have histogram data, the best we can
// do is use the number of rows that satisfy all predicates
// (i.e. the number of rows we will be updating)
numRowsToScan = numRowsToDelete;
}
else
{
numRowsToScan = numRowsToScanWhenAllKeyColumnsHaveHistograms(
histograms,
keyPredsByCol,
activePartitions,
CIDesc
);
if (numRowsToScan < numRowsToDelete) // sanity test
{
// we will scan at least as many rows as we delete
numRowsToScan = numRowsToDelete;
}
}
}
// Notes: At execution time, several different TCBs can be created
// for a delete. We can class them three ways: Unique, Subset, and
// Rowset. Representative examples of the three classes are:
//
// ExHbaseUMDtrafUniqueTaskTcb
// ExHbaseUMDtrafSubsetTaskTcb
// ExHbaseAccessSQRowsetTcb
//
// The theory of operation of each of these differs somewhat.
//
// For the Unique variant, we use an HBase "get" to obtain a row, apply
// a predicate to it, then do an HBase "delete" to delete it if the
// predicate is true. (If there is no predicate, we'll simply do a
// "checkAndDelete" so there would be no "get" cost.)
//
// For the Subset variant, we use an HBase "scan" to obtain a sequence
// of rows, apply a predicate to each, then do an HBase "delete" on
// each row that passes the predicate.
//
// For the Rowset variant, we simply pass all the input keys to
// HBase in batches in HBase "deleteRows" calls. (In Explain plans,
// this TCB shows up as "trafodion_delete_vsbb", while the first two
// show up as "trafodion_delete".) There is no "get" cost. In plans
// with this TCB, there is a separate Scan TCB to obtain the keys,
// which then flow to this Rowset TCB via a tuple flow or nested join.
// (Such a separate Scan might exist with the first two TCBs also,
// e.g., when an index is used to decide which rows to delete.)
// The messaging cost to HBase is also reduced since multiple delete
// keys are sent per HBase interaction.
//
// Unfortunately the decisions as to which TCB will be used are
// currently made in the generator code and so aren't easily
// available to us here. For the moment then, we make no attempt
// to distinguish a separate "get" cost, nor do we take into account
// possible reduced message cost in the Rowset case. Should this
// choice be refactored in the future to push it into the Optimizer,
// then we can do a better job here. We did attempt to distinguish
// the unique case here from the others, but even there our criteria
// are not quite the same as in the generator. So at best, this attempt
// simply sharpens the cost estimate in this one particular case.
// Compute the I/O cost
computeIOCostsForCursorOperation(
randomIOs /* out */,
sequentialIOs /* out */,
CIDesc,
numRowsToScan,
probesInOrder
);
// Compute the tuple cost
tuplesProduced = numRowsToDelete;
tuplesProcessed = numRowsToScan;
tuplesSent = numRowsToDelete;
CostScalar rowSize = delOp->getIndexDesc()->getRecordLength();
CostScalar rowSizeFactor = scmRowSizeFactor(rowSize);
CostScalar outputRowSize = delOp->getGroupAttr()->getRecordLength();
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
tuplesProcessed *= rowSizeFactor;
tuplesSent *= rowSizeFactor;
tuplesProduced *= outputRowSizeFactor;
// ---------------------------------------------------------------------
// Synthesize and return cost object.
// ---------------------------------------------------------------------
CostScalar probeRowSize = delOp->getIndexDesc()->getKeyLength();
Cost * deleteCost =
scmCost(tuplesProcessed, tuplesProduced, tuplesSent, randomIOs, sequentialIOs, noOfProbes_,
rowSize, csZero, outputRowSize, probeRowSize);
#ifndef NDEBUG
if ( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON )
{
pfp = stdout;
fprintf(pfp, "HbaseDelete::scmComputeOperatorCostInternal()\n");
deleteCost->getScmCplr().print(pfp);
fprintf(pfp, "HBase Delete elapsed time: ");
fprintf(pfp,"%f", deleteCost->
convertToElapsedTime(
myContext->getReqdPhysicalProperty()).
value());
fprintf(pfp,"\n");
fprintf(pfp,"CountOfStreams returned %d\n",countOfStreams);
}
#endif
return deleteCost;
} // CostMethodHbaseDelete::scmComputeOperatorCostInternal()
// ----QUICKSEARCH FOR HbaseInsert ........................................
/**********************************************************************/
/* */
/* CostMethodHbaseInsert */
/* */
/**********************************************************************/
//**************************************************************
// This method computes the cost vector of the HbaseInsert operation
//**************************************************************
Cost*
CostMethodHbaseInsert::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
// TODO: For now assume we are always doing an HBase insert.
// Is it possible we go through this code path for Hive inserts?
// If so, figure out what to do.
HbaseInsert* insOp = (HbaseInsert *)op; // downcast
// compute some details
const Context * myContext = pws->getContext();
Cost *costPtr = computeOperatorCostInternal(op, myContext, countOfStreams);
CostScalar noOfProbesPerStream(csOne);
// the number of rows to insert (this is "per-stream" costing).
CostScalar numOfProbesPerStream =
(noOfProbes_ / countOfAsynchronousStreams_).minCsOne();
CostScalar tuplesProcessed = numOfProbesPerStream;
CostScalar tuplesProduced = numOfProbesPerStream;
CostScalar ioRand = csZero; // we don't bother estimating this
CostScalar ioSeq = csZero; // we don't bother estimating this
// Factor in row sizes.
CostScalar rowSize = ((IndexDesc *)insOp->getIndexDesc())->getNAFileSet()->getRecordLength();
CostScalar rowSizeFactor = scmRowSizeFactor(rowSize);
tuplesProcessed *= rowSizeFactor;
tuplesProduced *= rowSizeFactor;
// there doesn't seem to be an estRowsAccessed_ member or
// related methods in hbaseInsert at the moment... add it
// when the need becomes apparent
//insOp->setEstRowsAccessed(noOfProbes_);
//----------------------------------------
// Synthesize and return the cost object.
//----------------------------------------
Cost *hbaseInsertCost =
scmCost(tuplesProcessed, tuplesProduced, csZero, ioRand, ioSeq, noOfProbesPerStream_,
rowSize, csZero, rowSize, csZero);
#ifndef NDEBUG
NABoolean printCost =
(CmpCommon::getDefault(OPTIMIZER_PRINT_COST) == DF_ON);
if (printCost)
{
pfp = stdout;
fprintf(pfp, "HbaseInsert::scmComputeOperatorCostInternal()\n");
hbaseInsertCost->getScmCplr().print(pfp);
fprintf(pfp, "Hbase Insert elapsed time: ");
fprintf(pfp, "%f", hbaseInsertCost->convertToElapsedTime(myContext->getReqdPhysicalProperty()).value());
fprintf(pfp, "\n");
fprintf(pfp,"CountOfStreams returned %d\n",countOfStreams);
}
#endif
// We use the call to computeOperatorCostInternal() to compute the various costs, but we
// we do not need the cost object computed by this method, we generate and return
// a different New Cost Model (NCM) cost object.
delete costPtr;
return hbaseInsertCost;
}
/**********************************************************************/
// End cost methods for WRITE DML operations
/**********************************************************************/
//<pb>
//**************************************************************
// This method computes the cost vector of the IsolatedScalarUDF operation
//**************************************************************
Cost*
CostMethodIsolatedScalarUDF::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// -----------------------------------------
// Save off estimated degree of parallelism.
// -----------------------------------------
countOfStreams = countOfStreams_;
IsolatedScalarUDF *udf = (IsolatedScalarUDF *) op;
// Make sure we are an UDF.
CMPASSERT( udf != NULL );
CMPASSERT( op->getOperatorType() == REL_ISOLATED_SCALAR_UDF );
// Get the size of the inputs.
RowSize inputRowBytes = udf->getGroupAttr()->getInputVarLength();
// -----------------------------------------
// Determine the number of input Rows/Probes
// -----------------------------------------
CostScalar noOfProbes =
( myContext->getInputLogProp()->getResultCardinality() ).minCsOne();
noOfProbes -= csOne; // subtract one to account for the first row.
// Make sure we have a RoutineDesc.
CMPASSERT( udf->getRoutineDesc() != NULL );
SimpleCostVector &initialCostV = udf->getRoutineDesc()->getEffInitialRowCostVector();
SimpleCostVector &normalCostV = udf->getRoutineDesc()->getEffNormalRowCostVector();
// Gather the different cost numbers
CostScalar initialCpuCost = initialCostV.getCPUTime();
CostScalar initialMsgCost = initialCostV.getMessageTime();
CostScalar initialIOCost = initialCostV.getIOTime();
CostScalar normalCpuCost = normalCostV.getCPUTime();
CostScalar normalMsgCost = normalCostV.getMessageTime();
CostScalar normalIOCost = normalCostV.getIOTime();
CostScalar fanOut = udf->getRoutineDesc()->getEffFanOut();
CostScalar inputRowSize = inputRowBytes;
CostScalar inputRowSizeFactor = scmRowSizeFactor(inputRowSize);
CostScalar outputRowSize = udf->getGroupAttr()->getRecordLength();
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
// Normalize the rowCount (inputProbes) over the number of streams.
CostScalar rowCount = (noOfProbes/countOfStreams);
// Cost for First probe.
CostScalar tuplesProcessed = initialCpuCost * inputRowSizeFactor;
CostScalar tuplesProduced = initialIOCost * outputRowSizeFactor;
CostScalar tuplesSent = initialMsgCost * inputRowSizeFactor;
// Cost for subsequent probes
tuplesProcessed += rowCount * normalCpuCost * inputRowSizeFactor;
tuplesProduced += rowCount * normalCpuCost * fanOut * normalIOCost * outputRowSizeFactor;
// Assume we produce 2 messages to the UDF server per probe.
// per output row(fanOut).
tuplesSent += rowCount * normalMsgCost * 2 * fanOut * inputRowSizeFactor;
Cost* tableRoutineCost = scmCost(tuplesProcessed, tuplesProduced,
tuplesSent, csZero, csZero, csZero,
inputRowSize, csZero, outputRowSize, csZero);
return tableRoutineCost;
}
//**************************************************************
// This method computes the cost vector of the TableMappingUDF operation
//**************************************************************
Cost*
CostMethodTableMappingUDF::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
EstLogPropSharedPtr inputLP = myContext->getInputLogProp();
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// Save off estimated degree of parallelism.
countOfStreams = countOfStreams_;
TableMappingUDF *udf = (TableMappingUDF *) op;
// Make sure we are an UDF.
CMPASSERT( udf != NULL );
CMPASSERT( op->castToTableMappingUDF() );
const TMUDFPlanWorkSpace *tmudfPWS = static_cast<const TMUDFPlanWorkSpace *>(pws);
const tmudr::UDRPlanInfo *planInfo = tmudfPWS->getUDRPlanInfo();
EstLogPropSharedPtr outputLP = op->getGroupAttr()->outputLogProp( inputLP );
CostScalar outputRowsPerStream = outputLP->getResultCardinality() / countOfStreams;
// Get the size of the scalar inputs and output row
RowSize inputRowSize = udf->getGroupAttr()->getInputVarLength();
RowSize outputRowSize = op->getGroupAttr()->getRecordLength();
// -----------------------------------------
// Determine the number of input Rows/Probes
// -----------------------------------------
CostScalar noOfProbes =
( inputLP->getResultCardinality() ).minCsOne();
// per stream Rows from child
CostScalar tuplesProcessed = csZero;
for (int i=0; i < op->getArity(); i++)
{
EstLogPropSharedPtr childOutputLP = op->child(i).outputLogProp( inputLP );
CostScalar rowsFromChildPerStream =
childOutputLP->getResultCardinality() / countOfStreams;
RowSize childOutputRowBytes = udf->child(i).getGroupAttr()->getInputVarLength();
CostScalar childOutputRowSizeFactor = scmRowSizeFactor(childOutputRowBytes);
tuplesProcessed += rowsFromChildPerStream * childOutputRowSizeFactor;
}
// tuples produced
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
CostScalar tuplesProduced = outputRowsPerStream * outputRowSizeFactor;
// We send all child output rows to the UDR server and receive all
// the output rows back, so all processed and produced tuples are also sent
CostScalar tuplesSent = csZero;
// For isolated TMUDFs, add the cost of sending all child outputs
// to the UDR server and sending all the results back from the UDR server,
// this is in addition to any cost estimates the UDF writer provided.
// For now, add this unconditionally, since all TMUDFs go through tdm_udrserv.
// if (udf->getInvocationInfo()->getIsolationType() ==
// tmudr::UDRInvocationInfo::ISOLATED)
tuplesSent = tuplesProcessed + tuplesProduced;
if (planInfo->getCostPerRow() > 0)
{
// The UDF writer specified a cost per row (per stream) in nanoseconds,
// convert that to internal units and create a cost by interpreting the
// UDF cost per row as tuples produced
CostScalar rowNanosecFactor =
ActiveSchemaDB()->getDefaults().getAsDouble(NCM_UDR_NANOSEC_FACTOR);
tuplesProduced = outputRowsPerStream * rowNanosecFactor * planInfo->getCostPerRow();
// in this case, set tuplesProcessed to 0, since that cost is
// included in the per output tuple cost specified by the UDF writer
tuplesProcessed = csZero;
// Note that we still add cost for sending tuples back and forth,
// if applicable, in tuplesSent.
}
Cost* tableRoutineCost = scmCost(tuplesProcessed, tuplesProduced,
tuplesSent, csZero, csZero, csOne,
inputRowSize, csZero, outputRowSize, csZero);
return tableRoutineCost;
}
//**************************************************************
// This method computes the cost vector of the FastExtract operation
//**************************************************************
Cost*
CostMethodFastExtract::scmComputeOperatorCostInternal(RelExpr* op,
const PlanWorkSpace* pws,
Lng32& countOfStreams)
{
const Context* myContext = pws->getContext();
EstLogPropSharedPtr inputLP = myContext->getInputLogProp();
// ---------------------------------------------------------------------
// Preparatory work.
// ---------------------------------------------------------------------
cacheParameters(op,myContext);
estimateDegreeOfParallelism();
// Save off estimated degree of parallelism.
countOfStreams = countOfStreams_;
// Get the size of the inputs.
RowSize inputRowBytes = op->getGroupAttr()->getInputVarLength();
// -----------------------------------------
// Determine the number of input Rows/Probes
// -----------------------------------------
CostScalar noOfProbes =
( inputLP->getResultCardinality() ).minCsOne();
noOfProbes -= csOne; // subtract one to account for the first row.
CostScalar inputRowSize = inputRowBytes;
CostScalar inputRowSizeFactor = scmRowSizeFactor(inputRowSize);
CostScalar outputRowSize = op->getGroupAttr()->getRecordLength();
CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
// per stream Rows from child
CostScalar rowsFromChildPerStream ;
EstLogPropSharedPtr childOutputLP = op->child(0).outputLogProp( inputLP );
rowsFromChildPerStream = childOutputLP->getResultCardinality() / countOfStreams;
// Cost for subsequent probes
CostScalar tuplesProcessed = rowsFromChildPerStream * inputRowSizeFactor;
CostScalar tuplesProduced = rowsFromChildPerStream * outputRowSizeFactor;
// Assume we produce 2 messages to the UDF server per probe.
// per output row(fanOut).
CostScalar tuplesSent = tuplesProcessed;
Cost* fastExtractCost = scmCost(tuplesProcessed, tuplesProduced,
tuplesSent, csZero, csZero, csZero,
inputRowSize, csZero, outputRowSize, csZero);
return fastExtractCost;
}