| // ********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // ********************************************************************** |
| // |
| #include "ComOptIncludes.h" |
| #include "Generator.h" |
| #include "GenExpGenerator.h" |
| #include "GroupAttr.h" |
| #include "ExpCriDesc.h" |
| #include "DefaultConstants.h" |
| #include "ComTdbProbeCache.h" |
| #include "RelProbeCache.h" |
| #include "DefaultConstants.h" |
| #include "ExpSqlTupp.h" // for sizeof(tupp_descriptor) |
| #include "ComDefs.h" // to get common defines (ROUND8) |
| |
| ///////////////////////////////////////////////////////////////////// |
| // |
| // Contents: |
| // |
| // ProbeCache::codeGen() |
| // |
| ////////////////////////////////////////////////////////////////////// |
| |
| /////////////////////////////////////////////////////////////////// |
| // |
| // Layout at this node: |
| // (not including the mandatory consts and temps entries) |
| // |
| // Returned atp layout (to parent of ProbeCache): |
| // |
| // |-------------| |------------------------------| |
| // | input data | | input data | output row | |
| // | ( I tupps ) | | ( I tupps ) | ( 1 tupp ) | |
| // |-------------| |------------------------------| |
| // <- input ATP -> <-- returned ATP to parent ----> |
| // |
| // input data: the atp input to this node by its parent. |
| // output row: optional tupp for cached inner table result. Note |
| // that there is no output row (and no tupp) if |
| // ProbeCache is on the right of a semi-join or |
| // anti-semi-join. |
| // |
| // |
| // ATP layout of queue entries between ProbeCache node and its child: |
| // |
| // |-------------| |-----------------------------| |
| // | input data | | input data | output row | |
| // | ( I tupps ) | | ( I tupps )| ( J tupps ) | |
| // |-------------| |-----------------------------| |
| // <ATP to child > <-- returned ATP to parent ---> |
| // |
| // input data: same as atp input from this node's parent. |
| // output row: optional inner table result returned from child. |
| // See note above about semi-join and anti-semi-join. |
| // |
| // |
| // Work atp layout: |
| // |
| // |----------------------------------------------------------| |
| // | Constants| Temps | Probe Hash| Probe Row | Inner Row | |
| // | (1 tupp) | (1 tupp) | (1 tupp) | (1 tupp) | (1 tupp) | |
| // |----------------------------------------------------------| |
| // |
| // Constants: the constants tupp from the given Atp. |
| // Temps: the temps tupp from the given Atp. |
| // Probe Hash: the target for the hash of the probe input hash |
| // Probe Row: the target for the probe input encode expr. |
| // Inner Row: the target for the inner row's move expr, |
| // if ProbeCache is not on the right of a |
| // semi-join or anti-semi-join. |
| // |
| //////////////////////////////////////////////////////////////////////////// |
| |
| |
| short ProbeCache::codeGen(Generator *generator) |
| { |
| ExpGenerator * exp_gen = generator->getExpGenerator(); |
| Space * space = generator->getSpace(); |
| |
| MapTable * last_map_table = generator->getLastMapTable(); |
| |
| ex_cri_desc * given_desc |
| = generator->getCriDesc(Generator::DOWN); |
| |
| ex_cri_desc * returned_desc |
| = new(space) ex_cri_desc(given_desc->noTuples() + 1, space); |
| |
| // cri descriptor for work atp has 5 entries: |
| // entry #0 for const |
| // entry #1 for temp |
| // entry #2 for hash value of probe input data in Probe Cache Manager |
| // entry #3 for encoded probe input data in Probe Cache Manager |
| // enrry #4 for inner table row data in this operator's cache buffer |
| Int32 work_atp = 1; |
| ex_cri_desc * work_cri_desc = new(space) ex_cri_desc(5, space); |
| unsigned short hashValIdx = 2; |
| unsigned short encodedProbeDataIdx = 3; |
| unsigned short innerRowDataIdx = 4; |
| |
| // generate code for child tree, and get its tdb and explain tuple. |
| child(0)->codeGen(generator); |
| ComTdb * child_tdb = (ComTdb *)(generator->getGenObj()); |
| ExplainTuple *childExplainTuple = generator->getExplainTuple(); |
| |
| |
| ////////////////////////////////////////////////////// |
| // Generate up to 4 runtime expressions. |
| ////////////////////////////////////////////////////// |
| |
| // Will use child's char. inputs (+ execution count) for the next |
| // two runtime expressions. |
| ValueIdList inputsToUse = child(0).getGroupAttr()->getCharacteristicInputs(); |
| |
| inputsToUse.insert(generator->getOrAddStatementExecutionCount()); |
| |
| // Expression #1 gets the hash value of the probe input data |
| ValueIdList hvAsList; |
| |
| // Executor has hard-coded assumption that the result is long, |
| // so add a Cast node to convert result to a long. |
| |
| ItemExpr *probeHashAsIe = new (generator->wHeap()) |
| HashDistPartHash(inputsToUse.rebuildExprTree(ITM_ITEM_LIST)); |
| |
| probeHashAsIe->bindNode(generator->getBindWA()); |
| |
| NumericType &nTyp = (NumericType &)probeHashAsIe->getValueId().getType(); |
| GenAssert(nTyp.isSigned() == FALSE, |
| "Unexpected signed HashDistPartHash."); |
| |
| GenAssert(probeHashAsIe->getValueId().getType().supportsSQLnullLogical() |
| == FALSE, "Unexpected nullable HashDistPartHash."); |
| |
| ItemExpr *hvAsIe = new (generator->wHeap()) Cast( |
| probeHashAsIe, |
| new (generator->wHeap()) |
| SQLInt(generator->wHeap(), FALSE, // false == unsigned. |
| FALSE // false == not nullable. |
| )); |
| |
| hvAsIe->bindNode(generator->getBindWA()); |
| |
| hvAsList.insert(hvAsIe->getValueId()); |
| |
| ex_expr *hvExpr = NULL; |
| ULng32 hvLength; |
| exp_gen->generateContiguousMoveExpr( |
| hvAsList, |
| 0, // don't add convert node |
| work_atp, |
| hashValIdx, |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, |
| hvLength, |
| &hvExpr); |
| |
| GenAssert(hvLength == sizeof(Lng32), |
| "Unexpected length of result of hash function."); |
| |
| // Expression #2 encodes the probe input data for storage in |
| // the ProbeCacheManager. |
| |
| ValueIdList encodeInputAsList; |
| |
| CollIndex inputListIndex; |
| for (inputListIndex = 0; |
| inputListIndex < inputsToUse.entries(); |
| inputListIndex++) { |
| |
| ItemExpr *inputIe = |
| (inputsToUse[inputListIndex].getValueDesc())->getItemExpr(); |
| |
| if (inputIe->getValueId().getType().getVarLenHdrSize() > 0) |
| { |
| // This logic copied from Sort::codeGen(). |
| // Explode varchars by moving them to a fixed field |
| // whose length is equal to the max length of varchar. |
| // 5/8/98: add support for VARNCHAR |
| |
| const CharType& char_type = |
| (CharType&)(inputIe->getValueId().getType()); |
| |
| if (!CollationInfo::isSystemCollation(char_type.getCollation())) |
| { |
| inputIe = new(generator->wHeap()) |
| Cast (inputIe, |
| (new(generator->wHeap()) |
| SQLChar( |
| generator->wHeap(), CharLenInfo(char_type.getStrCharLimit(), char_type.getDataStorageSize()), |
| char_type.supportsSQLnull(), |
| FALSE, FALSE, FALSE, |
| char_type.getCharSet(), |
| char_type.getCollation(), |
| char_type.getCoercibility() |
| ) |
| ) |
| ); |
| } |
| } |
| |
| CompEncode * enode = new(generator->wHeap()) |
| CompEncode(inputIe, FALSE /* ascend/descend doesn't matter*/); |
| |
| enode->bindNode(generator->getBindWA()); |
| encodeInputAsList.insert(enode->getValueId()); |
| } |
| |
| ex_expr *encodeInputExpr = NULL; |
| ULng32 encodedInputLength; |
| exp_gen->generateContiguousMoveExpr(encodeInputAsList, |
| 0, //don't add conv nodes |
| work_atp, encodedProbeDataIdx, |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, |
| encodedInputLength, &encodeInputExpr); |
| |
| |
| // Expression #3 moves the inner table data into a buffer pool. |
| // This is also the tuple returned to ProbeCache's parent. |
| |
| ex_expr * innerRecExpr = NULL; |
| ValueIdList innerTableAsList = getGroupAttr()->getCharacteristicOutputs(); |
| |
| ////////////////////////////////////////////////////// |
| // Describe the returned row and add the returned |
| // values to the map table. |
| ////////////////////////////////////////////////////// |
| |
| // determine internal format |
| NABoolean useCif = FALSE; |
| ExpTupleDesc::TupleDataFormat tupleFormat = generator->getInternalFormat(); |
| //tupleFormat = determineInternalFormat( innerTableAsList, this, useCif,generator); |
| |
| ULng32 innerRecLength = 0; |
| ExpTupleDesc * innerRecTupleDesc = 0; |
| MapTable * returnedMapTable = NULL; |
| |
| exp_gen->generateContiguousMoveExpr(innerTableAsList, |
| -1, // do add conv nodes |
| work_atp, innerRowDataIdx, |
| tupleFormat, |
| innerRecLength, &innerRecExpr, |
| &innerRecTupleDesc, ExpTupleDesc::SHORT_FORMAT, |
| &returnedMapTable); |
| |
| returned_desc->setTupleDescriptor(returned_desc->noTuples() - 1, |
| innerRecTupleDesc); |
| |
| // remove all appended map tables and return the returnedMapTable |
| generator->removeAll(last_map_table); |
| generator->appendAtEnd(returnedMapTable); |
| // This returnedMapTable will contain the value ids that are being returned |
| // (the inner table probed). |
| |
| // Massage the atp and atp_index of the innerTableAsList. |
| for (CollIndex i = 0; i < innerTableAsList.entries(); i++) |
| { |
| ValueId innerValId = innerTableAsList[i]; |
| |
| Attributes *attrib = |
| generator->getMapInfo(innerValId)->getAttr(); |
| |
| // All reference to the returned values from this point on |
| // will be at atp = 0, atp_index = last entry in returned desc. |
| attrib->setAtp(0); |
| attrib->setAtpIndex(returned_desc->noTuples() - 1); |
| } |
| |
| // Expression #4 is a selection predicate, to be applied |
| // before returning rows to the parent |
| |
| ex_expr * selectPred = NULL; |
| |
| if (!selectionPred().isEmpty()) |
| { |
| ItemExpr * selPredTree = |
| selectionPred().rebuildExprTree(ITM_AND,TRUE,TRUE); |
| exp_gen->generateExpr(selPredTree->getValueId(), |
| ex_expr::exp_SCAN_PRED, |
| &selectPred); |
| } |
| |
| ////////////////////////////////////////////////////// |
| // Prepare params for ComTdbProbeCache. |
| ////////////////////////////////////////////////////// |
| |
| queue_index pDownSize = (queue_index)getDefault(GEN_PROBE_CACHE_SIZE_DOWN); |
| queue_index pUpSize = (queue_index)getDefault(GEN_PROBE_CACHE_SIZE_UP); |
| |
| // Make sure that the ProbeCache queues can support the childs queues. |
| if(pDownSize < child_tdb->getInitialQueueSizeDown()) { |
| pDownSize = child_tdb->getInitialQueueSizeDown(); |
| pDownSize = MINOF(pDownSize, 32768); |
| } |
| if(pUpSize < child_tdb->getInitialQueueSizeUp()) { |
| pUpSize = child_tdb->getInitialQueueSizeUp(); |
| pUpSize = MINOF(pUpSize, 32768); |
| } |
| |
| ULng32 pcNumEntries = numCachedProbes_; |
| |
| // Number of entries in the probe cache cannot be less than |
| // max parent down queue size. Before testing and adjusting the |
| // max queue size, it is necessary to make sure it is a power of |
| // two, rounding up if necessary. This is to match the logic in |
| // ex_queue::resize. |
| |
| queue_index pdq2 = 1; |
| queue_index bits = pDownSize; |
| while (bits && pdq2 < pDownSize) { |
| bits = bits >> 1; |
| pdq2 = pdq2 << 1; |
| } |
| if (pcNumEntries < pdq2) |
| pcNumEntries = pdq2; |
| |
| numInnerTuples_ = getDefault(GEN_PROBE_CACHE_NUM_INNER); |
| |
| if (innerRecExpr == NULL) |
| { |
| // For semi-join and anti-semi-join, executor need not allocate |
| // a buffer. Set the tdb's buffer size to 0 to be consistent. |
| numInnerTuples_ = 0; |
| } |
| else if (numInnerTuples_ == 0) |
| { |
| // Handle special value, 0, which tells code gen to |
| // decided on buffer size: i.e., large enough to accomodate |
| // all parent up queue entries and all probe cache entries |
| // having a different inner table row. |
| |
| // As we did for the down queue, make sure the up queue size |
| // specified is a power of two. |
| |
| queue_index puq2 = 1; |
| queue_index bits = pUpSize; |
| while (bits && puq2 < pUpSize) { |
| bits = bits >> 1; |
| puq2 = puq2 << 1; |
| } |
| numInnerTuples_ = puq2 + pcNumEntries; |
| } |
| else |
| { |
| // Just use the default. ExSimpleSqlBuffer takes care |
| // that there is room enough for one row. |
| } |
| |
| double memoryLimitPerInstance = |
| ActiveSchemaDB()->getDefaults().getAsLong(EXE_MEMORY_FOR_PROBE_CACHE_IN_MB) * 1024 * 1024; |
| double estimatedMemory; |
| |
| if (numInnerTuples_ > 0) { |
| estimatedMemory = numInnerTuples_ * innerRecLength; |
| if (estimatedMemory > memoryLimitPerInstance) { |
| numInnerTuples_ = memoryLimitPerInstance / innerRecLength; |
| queue_index pUpSize_calc; |
| |
| pUpSize_calc = numInnerTuples_ ; |
| queue_index pq2 = 1; |
| queue_index bits = pUpSize_calc; |
| while (bits && pq2 < pUpSize_calc) { |
| bits = bits >> 1; |
| pq2 = pq2 << 1; |
| } |
| pUpSize = MINOF(pq2/2, 4); |
| pDownSize = MINOF(pq2/2, 4); |
| pcNumEntries = MINOF(numInnerTuples_, 4); |
| numInnerTuples_ = pcNumEntries; |
| numCachedProbes_ = pcNumEntries; |
| } |
| } |
| |
| |
| /////////////////////////////////////////////////////// |
| // Create and initialize the ComTdbProbeCache, generate |
| // the explain tuple, set the new up queue cri desc |
| // and give the generator this newly generated object. |
| /////////////////////////////////////////////////////// |
| |
| ComTdbProbeCache *probeCacheTdb = new (space) ComTdbProbeCache ( |
| hvExpr, |
| encodeInputExpr, |
| innerRecExpr, |
| selectPred, |
| encodedInputLength, |
| innerRecLength, |
| pcNumEntries, |
| returned_desc->noTuples() - 1, |
| hashValIdx, |
| encodedProbeDataIdx, |
| innerRowDataIdx, |
| child_tdb, |
| given_desc, |
| returned_desc, |
| pDownSize, |
| pUpSize, |
| (Cardinality) |
| (getInputCardinality() * getEstRowsUsed()).getValue(), |
| numInnerTuples_) ; |
| |
| generator->initTdbFields(probeCacheTdb); |
| |
| // Make sure that the ProbeCache queues were not set too large |
| if ((probeCacheTdb->getInitialQueueSizeDown() > pDownSize) || |
| (probeCacheTdb->getInitialQueueSizeUp() > pUpSize)) { |
| probeCacheTdb->setQueueResizeParams(pDownSize, |
| pUpSize, |
| probeCacheTdb->getQueueResizeLimit(), |
| probeCacheTdb->getQueueResizeFactor()); |
| } |
| |
| double probeCacheMemEst = getEstimatedRunTimeMemoryUsage(generator, probeCacheTdb); |
| generator->addToTotalEstimatedMemory(probeCacheMemEst); |
| Lng32 pcMemEstInKBPerNode = getEstimatedRunTimeMemoryUsage(generator, TRUE).value() / 1024; |
| if(!generator->explainDisabled()) { |
| generator->setExplainTuple( |
| addExplainInfo(probeCacheTdb, childExplainTuple, 0, generator)); |
| } |
| |
| generator->setCriDesc(returned_desc, Generator::UP); |
| |
| generator->setGenObj(this, probeCacheTdb); |
| |
| return 0; |
| } |
| |
| CostScalar ProbeCache::getEstimatedRunTimeMemoryUsage(Generator *generator, NABoolean perNode, Lng32 *numStreams) |
| { |
| const Lng32 probeSize = |
| getGroupAttr()->getCharacteristicInputs().getRowLength(); |
| const Lng32 numCacheEntries = |
| ActiveSchemaDB()->getDefaults().getAsLong(GEN_PROBE_CACHE_NUM_ENTRIES); |
| const Int32 perProbeOverhead = 32 ; // bytes |
| const double cacheSize = (probeSize + perProbeOverhead) * numCacheEntries; // in bytes |
| |
| const Lng32 resultSize = |
| getGroupAttr()->getCharacteristicOutputs().getRowLength(); |
| Lng32 numBufferPoolEntries = |
| ActiveSchemaDB()->getDefaults().getAsLong(GEN_PROBE_CACHE_NUM_INNER); |
| if (numBufferPoolEntries == 0) |
| { |
| numBufferPoolEntries = numCacheEntries + |
| ActiveSchemaDB()->getDefaults().getAsLong(GEN_PROBE_CACHE_SIZE_UP); |
| } |
| const double outputBufferSize = resultSize * numBufferPoolEntries; // in bytes |
| |
| // totalMemory is perNode at this point of time. |
| double totalMemory = cacheSize + outputBufferSize; |
| Lng32 numOfStreams = 1; |
| const PhysicalProperty* const phyProp = getPhysicalProperty(); |
| if (phyProp) |
| { |
| PartitioningFunction * partFunc = phyProp -> getPartitioningFunction() ; |
| numOfStreams = partFunc->getCountOfPartitions(); |
| if (numOfStreams <= 0) |
| numOfStreams = 1; |
| double memoryLimitPerInstance = |
| ActiveSchemaDB()->getDefaults().getAsLong(EXE_MEMORY_FOR_PROBE_CACHE_IN_MB) * 1024 * 1024; |
| if (totalMemory > memoryLimitPerInstance) |
| totalMemory = memoryLimitPerInstance; |
| totalMemory *= numOfStreams; |
| } |
| if (numStreams != NULL) |
| *numStreams = numOfStreams; |
| if (perNode) |
| totalMemory /= MINOF(MAXOF(((NAClusterInfoLinux*)gpClusterInfo)->getTotalNumberOfCPUs(), 1), numOfStreams); |
| else |
| totalMemory /= numOfStreams; |
| return totalMemory; |
| } |
| |
| double ProbeCache::getEstimatedRunTimeMemoryUsage(Generator *generator, ComTdb * tdb) |
| { |
| // tdb is ignored for ProbeCache because this operator |
| // does not participate in the BMO quota system. |
| Lng32 numOfStreams = 1; |
| CostScalar totalMemory = getEstimatedRunTimeMemoryUsage(generator, FALSE, &numOfStreams); |
| totalMemory = totalMemory * numOfStreams ; |
| return totalMemory.value(); |
| } |
| |