blob: 17a6dce88c2119c20234b8b23beeca801b37d535 [file] [log] [blame]
// **********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// **********************************************************************
//
#include "ComOptIncludes.h"
#include "Generator.h"
#include "GenExpGenerator.h"
#include "GroupAttr.h"
#include "ExpCriDesc.h"
#include "DefaultConstants.h"
#include "ComTdbProbeCache.h"
#include "RelProbeCache.h"
#include "DefaultConstants.h"
#include "ExpSqlTupp.h" // for sizeof(tupp_descriptor)
#include "ComDefs.h" // to get common defines (ROUND8)
/////////////////////////////////////////////////////////////////////
//
// Contents:
//
// ProbeCache::codeGen()
//
//////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
//
// Layout at this node:
// (not including the mandatory consts and temps entries)
//
// Returned atp layout (to parent of ProbeCache):
//
// |-------------| |------------------------------|
// | input data | | input data | output row |
// | ( I tupps ) | | ( I tupps ) | ( 1 tupp ) |
// |-------------| |------------------------------|
// <- input ATP -> <-- returned ATP to parent ---->
//
// input data: the atp input to this node by its parent.
// output row: optional tupp for cached inner table result. Note
// that there is no output row (and no tupp) if
// ProbeCache is on the right of a semi-join or
// anti-semi-join.
//
//
// ATP layout of queue entries between ProbeCache node and its child:
//
// |-------------| |-----------------------------|
// | input data | | input data | output row |
// | ( I tupps ) | | ( I tupps )| ( J tupps ) |
// |-------------| |-----------------------------|
// <ATP to child > <-- returned ATP to parent --->
//
// input data: same as atp input from this node's parent.
// output row: optional inner table result returned from child.
// See note above about semi-join and anti-semi-join.
//
//
// Work atp layout:
//
// |----------------------------------------------------------|
// | Constants| Temps | Probe Hash| Probe Row | Inner Row |
// | (1 tupp) | (1 tupp) | (1 tupp) | (1 tupp) | (1 tupp) |
// |----------------------------------------------------------|
//
// Constants: the constants tupp from the given Atp.
// Temps: the temps tupp from the given Atp.
// Probe Hash: the target for the hash value of the probe input data.
// Probe Row: the target for the probe input encode expr.
// Inner Row: the target for the inner row's move expr,
// if ProbeCache is not on the right of a
// semi-join or anti-semi-join.
//
////////////////////////////////////////////////////////////////////////////
// Generates the ComTdbProbeCache for this ProbeCache node.
//
// Steps performed:
//   1. generate the child tree and pick up its tdb and explain tuple;
//   2. generate up to four runtime expressions (probe hash, probe encode,
//      inner row move, selection predicate);
//   3. size the parent queues, the probe cache and the inner row buffer,
//      shrinking them if the estimated buffer memory exceeds the
//      EXE_MEMORY_FOR_PROBE_CACHE_IN_MB limit;
//   4. create the tdb, add the memory estimate and explain info, and hand
//      the tdb and the returned cri desc back to the generator.
//
// Side effects on this node: numInnerTuples_ is always (re)assigned and
// numCachedProbes_ is overwritten when the memory limit is hit.
//
// Always returns 0.
short ProbeCache::codeGen(Generator *generator)
{
  ExpGenerator * exp_gen = generator->getExpGenerator();
  Space * space = generator->getSpace();

  // Remember the current last map table; everything appended after it is
  // removed again once the returned row has been described (see below).
  MapTable * last_map_table = generator->getLastMapTable();

  ex_cri_desc * given_desc
    = generator->getCriDesc(Generator::DOWN);

  // The returned cri desc has one more tupp than the given one; the extra,
  // last entry is used for the inner table row.
  ex_cri_desc * returned_desc
    = new(space) ex_cri_desc(given_desc->noTuples() + 1, space);

  // cri descriptor for work atp has 5 entries:
  // entry #0 for const
  // entry #1 for temp
  // entry #2 for hash value of probe input data in Probe Cache Manager
  // entry #3 for encoded probe input data in Probe Cache Manager
  // entry #4 for inner table row data in this operator's cache buffer
  Int32 work_atp = 1;
  ex_cri_desc * work_cri_desc = new(space) ex_cri_desc(5, space);
  unsigned short hashValIdx          = 2;
  unsigned short encodedProbeDataIdx = 3;
  unsigned short innerRowDataIdx     = 4;

  // generate code for child tree, and get its tdb and explain tuple.
  child(0)->codeGen(generator);
  ComTdb * child_tdb = (ComTdb *)(generator->getGenObj());
  ExplainTuple *childExplainTuple = generator->getExplainTuple();

  //////////////////////////////////////////////////////
  // Generate up to 4 runtime expressions.
  //////////////////////////////////////////////////////

  // Will use child's char. inputs (+ execution count) for the next
  // two runtime expressions.
  ValueIdList inputsToUse = child(0).getGroupAttr()->getCharacteristicInputs();
  inputsToUse.insert(generator->getOrAddStatementExecutionCount());

  // Expression #1 gets the hash value of the probe input data
  ValueIdList hvAsList;

  // Executor has hard-coded assumption that the result is long,
  // so add a Cast node to convert result to a long.
  ItemExpr *probeHashAsIe = new (generator->wHeap())
    HashDistPartHash(inputsToUse.rebuildExprTree(ITM_ITEM_LIST));
  probeHashAsIe->bindNode(generator->getBindWA());

  NumericType &nTyp = (NumericType &)probeHashAsIe->getValueId().getType();
  GenAssert(nTyp.isSigned() == FALSE,
            "Unexpected signed HashDistPartHash.");
  GenAssert(probeHashAsIe->getValueId().getType().supportsSQLnullLogical()
            == FALSE, "Unexpected nullable HashDistPartHash.");

  ItemExpr *hvAsIe = new (generator->wHeap()) Cast(
       probeHashAsIe,
       new (generator->wHeap())
         SQLInt(generator->wHeap(), FALSE, // false == unsigned.
                FALSE // false == not nullable.
               ));
  hvAsIe->bindNode(generator->getBindWA());
  hvAsList.insert(hvAsIe->getValueId());

  ex_expr *hvExpr = NULL;
  ULng32 hvLength;
  exp_gen->generateContiguousMoveExpr(
       hvAsList,
       0, // don't add convert node
       work_atp,
       hashValIdx,
       ExpTupleDesc::SQLARK_EXPLODED_FORMAT,
       hvLength,
       &hvExpr);
  GenAssert(hvLength == sizeof(Lng32),
            "Unexpected length of result of hash function.");

  // Expression #2 encodes the probe input data for storage in
  // the ProbeCacheManager.
  ValueIdList encodeInputAsList;
  CollIndex inputListIndex;
  for (inputListIndex = 0;
       inputListIndex < inputsToUse.entries();
       inputListIndex++) {
    ItemExpr *inputIe =
      (inputsToUse[inputListIndex].getValueDesc())->getItemExpr();

    if (inputIe->getValueId().getType().getVarLenHdrSize() > 0)
      {
        // This logic copied from Sort::codeGen().
        // Explode varchars by moving them to a fixed field
        // whose length is equal to the max length of varchar.
        // 5/8/98: add support for VARNCHAR
        // Note that the Cast is only added when the collation is not a
        // system collation.
        const CharType& char_type =
          (CharType&)(inputIe->getValueId().getType());
        if (!CollationInfo::isSystemCollation(char_type.getCollation()))
          {
            inputIe = new(generator->wHeap())
              Cast (inputIe,
                    (new(generator->wHeap())
                      SQLChar(
                        generator->wHeap(),
                        CharLenInfo(char_type.getStrCharLimit(),
                                    char_type.getDataStorageSize()),
                        char_type.supportsSQLnull(),
                        FALSE, FALSE, FALSE,
                        char_type.getCharSet(),
                        char_type.getCollation(),
                        char_type.getCoercibility()
                      )
                    )
                   );
          }
      }

    CompEncode * enode = new(generator->wHeap())
      CompEncode(inputIe, FALSE /* ascend/descend doesn't matter*/);
    enode->bindNode(generator->getBindWA());
    encodeInputAsList.insert(enode->getValueId());
  }

  ex_expr *encodeInputExpr = NULL;
  ULng32 encodedInputLength;
  exp_gen->generateContiguousMoveExpr(encodeInputAsList,
                                      0, //don't add conv nodes
                                      work_atp, encodedProbeDataIdx,
                                      ExpTupleDesc::SQLARK_EXPLODED_FORMAT,
                                      encodedInputLength, &encodeInputExpr);

  // Expression #3 moves the inner table data into a buffer pool.
  // This is also the tuple returned to ProbeCache's parent.
  ex_expr * innerRecExpr = NULL;
  ValueIdList innerTableAsList = getGroupAttr()->getCharacteristicOutputs();

  //////////////////////////////////////////////////////
  // Describe the returned row and add the returned
  // values to the map table.
  //////////////////////////////////////////////////////

  // determine internal format
  // (useCif is only referenced by the commented-out call below.)
  NABoolean useCif = FALSE;
  ExpTupleDesc::TupleDataFormat tupleFormat = generator->getInternalFormat();
  //tupleFormat = determineInternalFormat( innerTableAsList, this, useCif,generator);

  ULng32 innerRecLength = 0;
  ExpTupleDesc * innerRecTupleDesc = 0;
  MapTable * returnedMapTable = NULL;
  exp_gen->generateContiguousMoveExpr(innerTableAsList,
                                      -1, // do add conv nodes
                                      work_atp, innerRowDataIdx,
                                      tupleFormat,
                                      innerRecLength, &innerRecExpr,
                                      &innerRecTupleDesc, ExpTupleDesc::SHORT_FORMAT,
                                      &returnedMapTable);

  returned_desc->setTupleDescriptor(returned_desc->noTuples() - 1,
                                    innerRecTupleDesc);

  // remove all appended map tables and return the returnedMapTable
  generator->removeAll(last_map_table);
  generator->appendAtEnd(returnedMapTable);

  // This returnedMapTable will contain the value ids that are being returned
  // (the inner table probed).
  // Massage the atp and atp_index of the innerTableAsList.
  for (CollIndex i = 0; i < innerTableAsList.entries(); i++)
    {
      ValueId innerValId = innerTableAsList[i];
      Attributes *attrib =
        generator->getMapInfo(innerValId)->getAttr();

      // All reference to the returned values from this point on
      // will be at atp = 0, atp_index = last entry in returned desc.
      attrib->setAtp(0);
      attrib->setAtpIndex(returned_desc->noTuples() - 1);
    }

  // Expression #4 is a selection predicate, to be applied
  // before returning rows to the parent
  ex_expr * selectPred = NULL;
  if (!selectionPred().isEmpty())
    {
      ItemExpr * selPredTree =
        selectionPred().rebuildExprTree(ITM_AND,TRUE,TRUE);
      exp_gen->generateExpr(selPredTree->getValueId(),
                            ex_expr::exp_SCAN_PRED,
                            &selectPred);
    }

  //////////////////////////////////////////////////////
  // Prepare params for ComTdbProbeCache.
  //////////////////////////////////////////////////////
  queue_index pDownSize = (queue_index)getDefault(GEN_PROBE_CACHE_SIZE_DOWN);
  queue_index pUpSize   = (queue_index)getDefault(GEN_PROBE_CACHE_SIZE_UP);

  // Make sure that the ProbeCache queues can support the childs queues.
  // When a queue is grown to the child's size it is limited to 32768.
  if(pDownSize < child_tdb->getInitialQueueSizeDown()) {
    pDownSize = child_tdb->getInitialQueueSizeDown();
    pDownSize = MINOF(pDownSize, 32768);
  }
  if(pUpSize < child_tdb->getInitialQueueSizeUp()) {
    pUpSize = child_tdb->getInitialQueueSizeUp();
    pUpSize = MINOF(pUpSize, 32768);
  }

  ULng32 pcNumEntries = numCachedProbes_;

  // Number of entries in the probe cache cannot be less than
  // max parent down queue size. Before testing and adjusting the
  // max queue size, it is necessary to make sure it is a power of
  // two, rounding up if necessary. This is to match the logic in
  // ex_queue::resize.
  // The "bits" counter bounds the loop by the number of significant bits
  // in the size, so the shift of pdq2 cannot loop forever.
  queue_index pdq2 = 1;
  queue_index bits = pDownSize;
  while (bits && pdq2 < pDownSize) {
    bits = bits >> 1;
    pdq2 = pdq2 << 1;
  }
  if (pcNumEntries < pdq2)
    pcNumEntries = pdq2;

  numInnerTuples_ = getDefault(GEN_PROBE_CACHE_NUM_INNER);

  if (innerRecExpr == NULL)
    {
      // For semi-join and anti-semi-join, executor need not allocate
      // a buffer. Set the tdb's buffer size to 0 to be consistent.
      numInnerTuples_ = 0;
    }
  else if (numInnerTuples_ == 0)
    {
      // Handle special value, 0, which tells code gen to
      // decide on buffer size: i.e., large enough to accommodate
      // all parent up queue entries and all probe cache entries
      // having a different inner table row.
      // As we did for the down queue, make sure the up queue size
      // specified is a power of two.
      queue_index puq2 = 1;
      queue_index bits = pUpSize;
      while (bits && puq2 < pUpSize) {
        bits = bits >> 1;
        puq2 = puq2 << 1;
      }
      numInnerTuples_ = puq2 + pcNumEntries;
    }
  else
    {
      // Just use the default. ExSimpleSqlBuffer takes care
      // that there is room enough for one row.
    }

  // Memory limit for one ProbeCache instance, in bytes. The default is
  // converted to double *before* scaling from MB to bytes so that limits
  // of 2048 MB or more do not overflow 32-bit integer arithmetic.
  const double memoryLimitPerInstance =
    static_cast<double>(
      ActiveSchemaDB()->getDefaults().getAsLong(EXE_MEMORY_FOR_PROBE_CACHE_IN_MB))
    * 1024 * 1024;

  if (numInnerTuples_ > 0) {
    // Estimated size of the inner row buffer, in bytes. Computed in double
    // so that a large tuple count times a large row length cannot wrap.
    const double estimatedMemory =
      static_cast<double>(numInnerTuples_) * innerRecLength;

    if (estimatedMemory > memoryLimitPerInstance) {
      // The buffer would exceed the limit: compute how many inner rows fit,
      // round that up to a power of two, and then shrink the queues, the
      // number of cache entries and the number of inner tuples. Each of
      // these ends up being at most 4.
      numInnerTuples_ = memoryLimitPerInstance / innerRecLength;

      queue_index pUpSize_calc;
      pUpSize_calc = numInnerTuples_ ;
      queue_index pq2 = 1;
      queue_index bits = pUpSize_calc;
      while (bits && pq2 < pUpSize_calc) {
        bits = bits >> 1;
        pq2 = pq2 << 1;
      }

      pUpSize   = MINOF(pq2/2, 4);
      pDownSize = MINOF(pq2/2, 4);
      pcNumEntries = MINOF(numInnerTuples_, 4);
      numInnerTuples_  = pcNumEntries;
      numCachedProbes_ = pcNumEntries;
    }
  }

  ///////////////////////////////////////////////////////
  // Create and initialize the ComTdbProbeCache, generate
  // the explain tuple, set the new up queue cri desc
  // and give the generator this newly generated object.
  ///////////////////////////////////////////////////////
  ComTdbProbeCache *probeCacheTdb = new (space) ComTdbProbeCache (
       hvExpr,
       encodeInputExpr,
       innerRecExpr,
       selectPred,
       encodedInputLength,
       innerRecLength,
       pcNumEntries,
       returned_desc->noTuples() - 1,
       hashValIdx,
       encodedProbeDataIdx,
       innerRowDataIdx,
       child_tdb,
       given_desc,
       returned_desc,
       pDownSize,
       pUpSize,
       (Cardinality)
         (getInputCardinality() * getEstRowsUsed()).getValue(),
       numInnerTuples_) ;

  generator->initTdbFields(probeCacheTdb);

  // Make sure that the ProbeCache queues were not set too large
  // by initTdbFields(); if so, put back the sizes computed above while
  // keeping the tdb's resize limit and resize factor.
  if ((probeCacheTdb->getInitialQueueSizeDown() > pDownSize) ||
      (probeCacheTdb->getInitialQueueSizeUp() > pUpSize)) {
    probeCacheTdb->setQueueResizeParams(pDownSize,
                                        pUpSize,
                                        probeCacheTdb->getQueueResizeLimit(),
                                        probeCacheTdb->getQueueResizeFactor());
  }

  // Account for this operator's estimated memory in the generator's total.
  double probeCacheMemEst = getEstimatedRunTimeMemoryUsage(generator, probeCacheTdb);
  generator->addToTotalEstimatedMemory(probeCacheMemEst);

  if(!generator->explainDisabled()) {
    generator->setExplainTuple(
         addExplainInfo(probeCacheTdb, childExplainTuple, 0, generator));
  }

  generator->setCriDesc(returned_desc, Generator::UP);
  generator->setGenObj(this, probeCacheTdb);

  return 0;
}
// Estimates the run-time memory, in bytes, used by this ProbeCache.
//
// The per-instance estimate is the sum of:
//   - the probe cache: (characteristic input row length + 32 bytes of
//     per-probe overhead) times GEN_PROBE_CACHE_NUM_ENTRIES, and
//   - the inner row buffer: characteristic output row length times
//     GEN_PROBE_CACHE_NUM_INNER, or, when that default is 0, times
//     (GEN_PROBE_CACHE_NUM_ENTRIES + GEN_PROBE_CACHE_SIZE_UP).
//
// When a physical property is available, the per-instance estimate is
// capped at EXE_MEMORY_FOR_PROBE_CACHE_IN_MB and multiplied by the number
// of partitions (streams) to get the total across all instances.
//
// Parameters:
//   generator  - not used by this estimate.
//   perNode    - if TRUE the total is divided by
//                min(max(total number of CPUs, 1), number of streams);
//                otherwise it is divided by the number of streams.
//   numStreams - optional out parameter; when not NULL it receives the
//                number of streams used (at least 1).
//
// All products are evaluated in double precision. The operands are 32-bit
// integers, and multiplying them as integers could overflow before the
// result was widened (for example a memory limit of 2048 MB or more).
CostScalar ProbeCache::getEstimatedRunTimeMemoryUsage(Generator *generator, NABoolean perNode, Lng32 *numStreams)
{
  const Lng32 probeSize =
    getGroupAttr()->getCharacteristicInputs().getRowLength();
  const Lng32 numCacheEntries =
    ActiveSchemaDB()->getDefaults().getAsLong(GEN_PROBE_CACHE_NUM_ENTRIES);
  const Int32 perProbeOverhead = 32 ; // bytes

  // Promote to double first so the sum and product cannot wrap.
  const double cacheSize =
    (static_cast<double>(probeSize) + perProbeOverhead) * numCacheEntries; // in bytes

  const Lng32 resultSize =
    getGroupAttr()->getCharacteristicOutputs().getRowLength();
  Lng32 numBufferPoolEntries =
    ActiveSchemaDB()->getDefaults().getAsLong(GEN_PROBE_CACHE_NUM_INNER);
  if (numBufferPoolEntries == 0)
    {
      // 0 means the buffer size is derived from the number of cache
      // entries plus the up queue size.
      numBufferPoolEntries = numCacheEntries +
        ActiveSchemaDB()->getDefaults().getAsLong(GEN_PROBE_CACHE_SIZE_UP);
    }
  const double outputBufferSize =
    static_cast<double>(resultSize) * numBufferPoolEntries; // in bytes

  // totalMemory is the estimate for a single instance at this point.
  double totalMemory = cacheSize + outputBufferSize;

  Lng32 numOfStreams = 1;
  const PhysicalProperty* const phyProp = getPhysicalProperty();
  if (phyProp)
    {
      PartitioningFunction * partFunc = phyProp -> getPartitioningFunction() ;
      numOfStreams = partFunc->getCountOfPartitions();
      if (numOfStreams <= 0)
        numOfStreams = 1;

      // Limit in bytes; converted to double before scaling from MB.
      const double memoryLimitPerInstance =
        static_cast<double>(
          ActiveSchemaDB()->getDefaults().getAsLong(EXE_MEMORY_FOR_PROBE_CACHE_IN_MB))
        * 1024 * 1024;
      if (totalMemory > memoryLimitPerInstance)
        totalMemory = memoryLimitPerInstance;

      // From here on totalMemory is the total over all streams.
      totalMemory *= numOfStreams;
    }

  if (numStreams != NULL)
    *numStreams = numOfStreams;

  if (perNode)
    totalMemory /= MINOF(MAXOF(((NAClusterInfoLinux*)gpClusterInfo)->getTotalNumberOfCPUs(), 1), numOfStreams);
  else
    totalMemory /= numOfStreams;

  return totalMemory;
}
// Returns the estimated run-time memory, in bytes, summed over all streams.
//
// The tdb argument is not consulted: ProbeCache does not take part in the
// BMO quota system, so the estimate comes solely from the per-stream
// overload, scaled back up by the stream count that overload reports.
double ProbeCache::getEstimatedRunTimeMemoryUsage(Generator *generator, ComTdb * tdb)
{
  Lng32 streamCount = 1;
  CostScalar memoryEstimate =
    getEstimatedRunTimeMemoryUsage(generator, FALSE, &streamCount);

  // Per-stream estimate times number of streams gives the overall total.
  memoryEstimate = memoryEstimate * streamCount;

  return memoryEstimate.value();
}