| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| #include "GroupAttr.h" |
| #include "AllRelExpr.h" |
| #include "RelPackedRows.h" |
| #include "Generator.h" |
| #include "GenExpGenerator.h" |
| #include "ExpCriDesc.h" |
| #include "ComTdbUnPackRows.h" |
| #include "ex_queue.h" |
| |
| // PhysUnPackRows::preCodeGen() ------------------------------------------- |
| // Perform local query rewrites such as for the creation and |
| // population of intermediate tables, for accessing partitioned |
| // data. Rewrite the value expressions after minimizing the dataflow |
| // using the transitive closure of equality predicates. |
| // |
| // PhysUnPackRows::preCodeGen() - is basically the same as the RelExpr:: |
| // preCodeGen() except that here we replace the VEG references in the |
| // unPackExpr() and packingFactor(), as well as the selectionPred(). |
| // |
| // Parameters: |
| // |
| // Generator *generator |
| // IN/OUT : A pointer to the generator object which contains the state, |
| // and tools (e.g. expression generator) to generate code for |
| // this node. |
| // |
| // ValueIdSet &externalInputs |
| // IN : The set of external Inputs available to this node. |
| // |
| // |
| RelExpr * PhysUnPackRows::preCodeGen(Generator * generator, |
| const ValueIdSet & externalInputs, |
| ValueIdSet &pulledNewInputs) |
| { |
| if (nodeIsPreCodeGenned()) |
| return this; |
| |
| // Resolve the VEGReferences and VEGPredicates, if any, that appear |
| // in the Characteristic Inputs, in terms of the externalInputs. |
| // |
| getGroupAttr()->resolveCharacteristicInputs(externalInputs); |
| |
| // My Characteristic Inputs become the external inputs for my children. |
| // |
| ValueIdSet childPulledInputs; |
| |
| child(0) = child(0)->preCodeGen(generator, externalInputs, pulledNewInputs); |
| if (! child(0).getPtr()) |
| return NULL; |
| |
| // process additional input value ids the child wants |
| getGroupAttr()->addCharacteristicInputs(childPulledInputs); |
| pulledNewInputs += childPulledInputs; |
| |
| // The VEG expressions in the selection predicates and the characteristic |
| // outputs can reference any expression that is either a potential output |
| // or a characteristic input for this RelExpr. Supply these values for |
| // rewriting the VEG expressions. |
| // |
| ValueIdSet availableValues; |
| getInputValuesFromParentAndChildren(availableValues); |
| |
| // The unPackExpr() and packingFactor() expressions have access |
| // to only the Input Values. These can come from the parent or be |
| // the outputs of the child. |
| // |
| unPackExpr(). |
| replaceVEGExpressions(availableValues, |
| getGroupAttr()->getCharacteristicInputs()); |
| |
| packingFactor(). |
| replaceVEGExpressions(availableValues, |
| getGroupAttr()->getCharacteristicInputs()); |
| |
| if (rowwiseRowset()) |
| { |
| if (rwrsInputSizeExpr()) |
| ((ItemExpr*)rwrsInputSizeExpr())-> |
| replaceVEGExpressions(availableValues, |
| getGroupAttr()->getCharacteristicInputs()); |
| |
| if (rwrsMaxInputRowlenExpr()) |
| ((ItemExpr*)rwrsMaxInputRowlenExpr())-> |
| replaceVEGExpressions(availableValues, |
| getGroupAttr()->getCharacteristicInputs()); |
| |
| if (rwrsBufferAddrExpr()) |
| ((ItemExpr*)rwrsBufferAddrExpr())-> |
| replaceVEGExpressions(availableValues, |
| getGroupAttr()->getCharacteristicInputs()); |
| } |
| |
| // The selectionPred has access to only the output values generated by |
| // UnPackRows and input values from the parent. |
| // |
| getInputAndPotentialOutputValues(availableValues); |
| |
| // Rewrite the selection predicates. |
| // |
| NABoolean replicatePredicates = TRUE; |
| selectionPred().replaceVEGExpressions |
| (availableValues, |
| getGroupAttr()->getCharacteristicInputs(), |
| FALSE, // no key predicates here |
| 0 /* no need for idempotence here */, |
| replicatePredicates |
| ); |
| |
| // Replace VEG references in the outputs and remove redundant |
| // outputs. |
| // |
| getGroupAttr()->resolveCharacteristicOutputs |
| (availableValues, |
| getGroupAttr()->getCharacteristicInputs()); |
| |
| Lng32 memLimit = (Lng32)CmpCommon::getDefaultNumeric(MEMORY_LIMIT_ROWSET_IN_MB); |
| if (memLimit > 0) |
| { |
| Lng32 rowLength = getGroupAttr()->getCharacteristicOutputs().getRowLength(); |
| Lng32 rowsetSize = getGroupAttr()->getOutputLogPropList()[0]->getResultCardinality().value() ; |
| Lng32 memNeededMB = (rowLength * rowsetSize)/(1024 * 1024); |
| if (memLimit < memNeededMB) |
| { |
| *CmpCommon::diags() << DgSqlCode(-30050) << DgInt0(memNeededMB) |
| << DgInt1(memLimit) << DgInt2(rowLength) |
| << DgInt3(rowsetSize); |
| GenExit(); |
| return NULL; |
| } |
| } |
| |
| generator->oltOptInfo()->setMultipleRowsReturned(TRUE); |
| |
| markAsPreCodeGenned(); |
| |
| return this; |
| } // PhysUnPackRows::preCodeGen |
| |
| |
| short |
| PhysUnPackRows::codeGen(Generator *generator) |
| { |
| // Get handles on expression generator, map table, and heap allocator |
| // |
| ExpGenerator *expGen = generator->getExpGenerator(); |
| Space *space = generator->getSpace(); |
| |
| // Allocate a new map table for this operation |
| // |
| MapTable *localMapTable = generator->appendAtEnd(); |
| |
| // Generate the child and capture the task definition block and a description |
| // of the reply composite row layout and the explain information. |
| // |
| child(0)->codeGen(generator); |
| |
| ComTdb *childTdb = (ComTdb*)(generator->getGenObj()); |
| |
| ex_cri_desc *childCriDesc = generator->getCriDesc(Generator::UP); |
| |
| ExplainTuple *childExplainTuple = generator->getExplainTuple(); |
| |
| // Make all of my child's outputs map to ATP 1. Since they are |
| // not needed above, they will not be in the work ATP (0). |
| // (Later, they will be removed from the map table) |
| // |
| localMapTable->setAllAtp(1); |
| |
| // Generate the given and returned composite row descriptors. |
| // unPackRows adds a tupp (for the generated outputs) to the |
| // row given by the parent. The workAtp will have the 2 more |
| // tupps (1 for the generated outputs and another for the |
| // indexValue) than the given. |
| // |
| ex_cri_desc *givenCriDesc = generator->getCriDesc(Generator::DOWN); |
| |
| ex_cri_desc *returnedCriDesc = |
| new(space) ex_cri_desc(givenCriDesc->noTuples() + 1, space); |
| |
| ex_cri_desc *workCriDesc = |
| new(space) ex_cri_desc(givenCriDesc->noTuples() + 2, space); |
| |
| |
| // unPackCols is the next to the last Tp in Atp 0, the work ATP. |
| // and the last Tp in the returned ATP. |
| // |
| const Int32 unPackColsAtpIndex = workCriDesc->noTuples() - 2; |
| const Int32 unPackColsAtp = 0; |
| |
| // The length of the new tuple which will contain the columns |
| // generated by unPackRows |
| // |
| ULng32 unPackColsTupleLen; |
| |
| // The Tuple Desc describing the tuple containing the new unPacked columns |
| // It is generated when the expression is generated. |
| // |
| ExpTupleDesc *unPackColsTupleDesc = 0; |
| |
| // indexValue is the last Tp in Atp 0, the work ATP. |
| // |
| const Int32 indexValueAtpIndex = workCriDesc->noTuples() - 1; |
| const Int32 indexValueAtp = 0; |
| |
| // The length of the work tuple which will contain the value |
| // of the index. This should always be sizeof(int). |
| // |
| ULng32 indexValueTupleLen = 0; |
| |
| // The Tuple Desc describing the tuple containing the new unPacked columns |
| // It is generated when the expression is generated. |
| // |
| ExpTupleDesc *indexValueTupleDesc = 0; |
| |
| ValueIdList indexValueList; |
| |
| if (indexValue() != NULL_VALUE_ID) |
| { |
| indexValueList.insert(indexValue()); |
| |
| expGen->processValIdList(indexValueList, |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, |
| indexValueTupleLen, |
| indexValueAtp, |
| indexValueAtpIndex, |
| &indexValueTupleDesc, |
| ExpTupleDesc::SHORT_FORMAT); |
| |
| GenAssert(indexValueTupleLen == sizeof(Int32), |
| "UnPackRows::codeGen: Internal Error"); |
| } |
| |
| // If a packingFactor exists, generate a move expression for this. |
| // It is assumed that the packingFactor expression evaluates to a |
| // 4 byte integer. |
| // |
| ex_expr *packingFactorExpr = 0; |
| ULng32 packingFactorTupleLen; |
| |
| if(packingFactor().entries() > 0) { |
| expGen->generateContiguousMoveExpr(packingFactor(), |
| -1, |
| unPackColsAtp, |
| unPackColsAtpIndex, |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, |
| packingFactorTupleLen, |
| &packingFactorExpr); |
| |
| GenAssert(packingFactorTupleLen == sizeof(Int32), |
| "UnPackRows::codeGen: Internal Error"); |
| } |
| |
| // Generate the UnPack expressions. |
| // |
| // characteristicOutputs() - refers to the list of expressions |
| // to be move to another tuple. |
| // |
| // 0 - Do not add conv. nodes. |
| // |
| // unPackColsAtp - this expression will move data to the |
| // unPackColsAtp (0) ATP. |
| // |
| // unPackColsAtpIndex - within the unPackColsAtp (0) ATP, the destination |
| // for this move expression will be the unPackColsAtpIndex TP. This should |
| // be the next to the last TP of the work ATP. (The indexValue will be in |
| // the last position) |
| // |
| // SQLARK_EXPLODED_FORMAT - generate the move expression to construct |
| // the destination tuple in EXPLODED FORMAT. |
| // |
| // unPackColsTupleLen - This is an output which will contain the length |
| // of the destination Tuple. |
| // |
| // &unPackColsExpr - The address of the pointer to the expression |
| // which will be generated. |
| // |
| // &unPackColsTupleDesc - The address of the tuple descriptor which is |
| // generated. This describes the destination tuple of the move expression. |
| // |
| // SHORT_FORMAT - generate the unPackColsTupleDesc in the SHORT FORMAT. |
| // |
| ex_expr *unPackColsExpr = 0; |
| |
| expGen-> |
| genGuardedContigMoveExpr(selectionPred(), |
| getGroupAttr()->getCharacteristicOutputs(), |
| 0, // No Convert Nodes added |
| unPackColsAtp, |
| unPackColsAtpIndex, |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, |
| unPackColsTupleLen, |
| &unPackColsExpr, |
| &unPackColsTupleDesc, |
| ExpTupleDesc::SHORT_FORMAT); |
| |
| workCriDesc->setTupleDescriptor(unPackColsAtpIndex, |
| unPackColsTupleDesc); |
| |
| returnedCriDesc->setTupleDescriptor(unPackColsAtpIndex, |
| unPackColsTupleDesc); |
| |
| |
| // expressions for rowwise rowset implementation. |
| ex_expr * rwrsInputSizeExpr = 0; |
| ex_expr * rwrsMaxInputRowlenExpr = 0; |
| ex_expr * rwrsBufferAddrExpr = 0; |
| ULng32 rwrsInputSizeExprLen = 0; |
| ULng32 rwrsMaxInputRowlenExprLen = 0; |
| ULng32 rwrsBufferAddrExprLen = 0; |
| |
| const Int32 rwrsAtp = 1; |
| const Int32 rwrsAtpIndex = workCriDesc->noTuples() - 2; |
| ExpTupleDesc * rwrsTupleDesc = 0; |
| ValueIdList rwrsVidList; |
| if (rowwiseRowset()) |
| { |
| rwrsVidList.insert(this->rwrsInputSizeExpr()->getValueId()); |
| expGen->generateContiguousMoveExpr(rwrsVidList, |
| 0 /*don't add conv nodes*/, |
| rwrsAtp, rwrsAtpIndex, |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, |
| rwrsInputSizeExprLen, |
| &rwrsInputSizeExpr, |
| &rwrsTupleDesc,ExpTupleDesc::SHORT_FORMAT); |
| |
| rwrsVidList.clear(); |
| rwrsVidList.insert(this->rwrsMaxInputRowlenExpr()->getValueId()); |
| expGen->generateContiguousMoveExpr(rwrsVidList, |
| 0 /*don't add conv nodes*/, |
| rwrsAtp, rwrsAtpIndex, |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, |
| rwrsMaxInputRowlenExprLen, |
| &rwrsMaxInputRowlenExpr, |
| &rwrsTupleDesc,ExpTupleDesc::SHORT_FORMAT); |
| |
| rwrsVidList.clear(); |
| rwrsVidList.insert(this->rwrsBufferAddrExpr()->getValueId()); |
| expGen->generateContiguousMoveExpr(rwrsVidList, |
| 0 /*don't add conv nodes*/, |
| rwrsAtp, rwrsAtpIndex, |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, |
| rwrsBufferAddrExprLen, |
| &rwrsBufferAddrExpr, |
| &rwrsTupleDesc,ExpTupleDesc::SHORT_FORMAT); |
| |
| expGen->assignAtpAndAtpIndex(rwrsOutputVids(), |
| unPackColsAtp, unPackColsAtpIndex); |
| } |
| |
| // Move the generated maptable entries, to the localMapTable, |
| // so that all other entries can later be removed. |
| // |
| for(ValueId outputValId = getGroupAttr()->getCharacteristicOutputs().init(); |
| getGroupAttr()->getCharacteristicOutputs().next(outputValId); |
| getGroupAttr()->getCharacteristicOutputs().advance(outputValId)) { |
| |
| generator->addMapInfoToThis(localMapTable, outputValId, |
| generator->getMapInfo(outputValId)-> |
| getAttr()); |
| |
| // Indicate that code was generated for this map table entry. |
| // |
| generator->getMapInfoFromThis(localMapTable, outputValId)->codeGenerated(); |
| } |
| |
| NABoolean tolerateNonFatalError = FALSE; |
| |
| if (isRowsetIterator() && (generator->getTolerateNonFatalError())) { |
| tolerateNonFatalError = TRUE; |
| setTolerateNonFatalError(RelExpr::NOT_ATOMIC_); |
| } |
| |
| |
| // Allocate the UnPack TDB |
| // |
| ComTdbUnPackRows *unPackTdb = NULL; |
| |
| if (rowwiseRowset()) |
| { |
| unPackTdb = |
| new (space) ComTdbUnPackRows(NULL, //childTdb, |
| rwrsInputSizeExpr, |
| rwrsMaxInputRowlenExpr, |
| rwrsBufferAddrExpr, |
| rwrsAtpIndex, |
| givenCriDesc, |
| returnedCriDesc, |
| workCriDesc, |
| 16, |
| 1024, |
| (Cardinality) getGroupAttr()-> |
| getOutputLogPropList()[0]-> |
| getResultCardinality().value(), |
| 2, 20000); |
| } |
| else |
| { |
| |
| // Base the initial queue size on the est. cardinality. |
| // UnPackRows does not do dyn queue resize, so passed in |
| // queue size values represent initial (and final) queue |
| // sizes (not max queue sizes). |
| // |
| ULng32 rowsetSize = |
| getGroupAttr()->getOutputLogPropList()[0]->getResultCardinality().value(); |
| double memoryLimitPerInstance = |
| (ActiveSchemaDB()->getDefaults().getAsLong(EXE_MEMORY_FOR_UNPACK_ROWS_IN_MB) * 1024 * 1024)/2; // At runtime (ExUnpackRowsTcb::ExUnpackRowsTcb) we allocate twice the size of the queue. So ensure it fits in to half the specified limit. |
| |
| rowsetSize = (rowsetSize < 1024 ? 1024 : rowsetSize); |
| |
| double estimatedMemory = (Int64)rowsetSize * (Int64)unPackColsTupleLen; |
| |
| if (estimatedMemory > memoryLimitPerInstance) |
| { |
| estimatedMemory = memoryLimitPerInstance; |
| rowsetSize = estimatedMemory / unPackColsTupleLen; |
| rowsetSize = MAXOF(rowsetSize, 1); |
| } |
| |
| queue_index upQueueSize = ex_queue::roundUp2Power((queue_index)rowsetSize); |
| |
| unPackTdb = |
| new (space) ComTdbUnPackRows(childTdb, |
| packingFactorExpr, |
| unPackColsExpr, |
| unPackColsTupleLen, |
| unPackColsAtpIndex, |
| indexValueAtpIndex, |
| givenCriDesc, |
| returnedCriDesc, |
| workCriDesc, |
| 16, |
| upQueueSize, |
| (Cardinality) getGroupAttr()-> |
| getOutputLogPropList()[0]-> |
| getResultCardinality().value(), |
| isRowsetIterator(), |
| tolerateNonFatalError); |
| } |
| |
| generator->initTdbFields(unPackTdb); |
| |
| // Remove child's outputs from mapTable, They are not needed |
| // above. |
| // |
| generator->removeAll(localMapTable); |
| |
| //Turn off override of queue sizes for unpack. Once set they should remain. |
| generator->setMakeOnljLeftQueuesBig(FALSE); |
| // Add the explain Information for this node to the EXPLAIN |
| // Fragment. Set the explainTuple pointer in the generator so |
| // the parent of this node can get a handle on this explainTuple. |
| // |
| if(!generator->explainDisabled()) { |
| generator->setExplainTuple(addExplainInfo(unPackTdb, |
| childExplainTuple, |
| 0, |
| generator)); |
| } |
| |
| // Restore the Cri Desc's and set the return object. |
| // |
| generator->setCriDesc(givenCriDesc, Generator::DOWN); |
| generator->setCriDesc(returnedCriDesc, Generator::UP); |
| generator->setGenObj(this, unPackTdb); |
| |
| |
| return 0; |
| } |