blob: cc3e6fc1c1c74e95486b7d3f3094a34b5f244a2e [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
#include "GroupAttr.h"
#include "AllRelExpr.h"
#include "RelPackedRows.h"
#include "Generator.h"
#include "GenExpGenerator.h"
#include "ExpCriDesc.h"
#include "ComTdbUnPackRows.h"
#include "ex_queue.h"
// PhysUnPackRows::preCodeGen() -------------------------------------------
// Perform local query rewrites such as for the creation and
// population of intermediate tables, for accessing partitioned
// data. Rewrite the value expressions after minimizing the dataflow
// using the transitive closure of equality predicates.
//
// PhysUnPackRows::preCodeGen() - is basically the same as the RelExpr::
// preCodeGen() except that here we replace the VEG references in the
// unPackExpr() and packingFactor(), as well as the selectionPred().
//
// Parameters:
//
// Generator *generator
// IN/OUT : A pointer to the generator object which contains the state,
// and tools (e.g. expression generator) to generate code for
// this node.
//
// ValueIdSet &externalInputs
// IN : The set of external Inputs available to this node.
//
//
RelExpr * PhysUnPackRows::preCodeGen(Generator * generator,
const ValueIdSet & externalInputs,
ValueIdSet &pulledNewInputs)
{
if (nodeIsPreCodeGenned())
return this;
// Resolve the VEGReferences and VEGPredicates, if any, that appear
// in the Characteristic Inputs, in terms of the externalInputs.
//
getGroupAttr()->resolveCharacteristicInputs(externalInputs);
// My Characteristic Inputs become the external inputs for my children.
//
ValueIdSet childPulledInputs;
child(0) = child(0)->preCodeGen(generator, externalInputs, pulledNewInputs);
if (! child(0).getPtr())
return NULL;
// process additional input value ids the child wants
getGroupAttr()->addCharacteristicInputs(childPulledInputs);
pulledNewInputs += childPulledInputs;
// The VEG expressions in the selection predicates and the characteristic
// outputs can reference any expression that is either a potential output
// or a characteristic input for this RelExpr. Supply these values for
// rewriting the VEG expressions.
//
ValueIdSet availableValues;
getInputValuesFromParentAndChildren(availableValues);
// The unPackExpr() and packingFactor() expressions have access
// to only the Input Values. These can come from the parent or be
// the outputs of the child.
//
unPackExpr().
replaceVEGExpressions(availableValues,
getGroupAttr()->getCharacteristicInputs());
packingFactor().
replaceVEGExpressions(availableValues,
getGroupAttr()->getCharacteristicInputs());
if (rowwiseRowset())
{
if (rwrsInputSizeExpr())
((ItemExpr*)rwrsInputSizeExpr())->
replaceVEGExpressions(availableValues,
getGroupAttr()->getCharacteristicInputs());
if (rwrsMaxInputRowlenExpr())
((ItemExpr*)rwrsMaxInputRowlenExpr())->
replaceVEGExpressions(availableValues,
getGroupAttr()->getCharacteristicInputs());
if (rwrsBufferAddrExpr())
((ItemExpr*)rwrsBufferAddrExpr())->
replaceVEGExpressions(availableValues,
getGroupAttr()->getCharacteristicInputs());
}
// The selectionPred has access to only the output values generated by
// UnPackRows and input values from the parent.
//
getInputAndPotentialOutputValues(availableValues);
// Rewrite the selection predicates.
//
NABoolean replicatePredicates = TRUE;
selectionPred().replaceVEGExpressions
(availableValues,
getGroupAttr()->getCharacteristicInputs(),
FALSE, // no key predicates here
0 /* no need for idempotence here */,
replicatePredicates
);
// Replace VEG references in the outputs and remove redundant
// outputs.
//
getGroupAttr()->resolveCharacteristicOutputs
(availableValues,
getGroupAttr()->getCharacteristicInputs());
Lng32 memLimit = (Lng32)CmpCommon::getDefaultNumeric(MEMORY_LIMIT_ROWSET_IN_MB);
if (memLimit > 0)
{
Lng32 rowLength = getGroupAttr()->getCharacteristicOutputs().getRowLength();
Lng32 rowsetSize = getGroupAttr()->getOutputLogPropList()[0]->getResultCardinality().value() ;
Lng32 memNeededMB = (rowLength * rowsetSize)/(1024 * 1024);
if (memLimit < memNeededMB)
{
*CmpCommon::diags() << DgSqlCode(-30050) << DgInt0(memNeededMB)
<< DgInt1(memLimit) << DgInt2(rowLength)
<< DgInt3(rowsetSize);
GenExit();
return NULL;
}
}
generator->oltOptInfo()->setMultipleRowsReturned(TRUE);
markAsPreCodeGenned();
return this;
} // PhysUnPackRows::preCodeGen
short
PhysUnPackRows::codeGen(Generator *generator)
{
// Get handles on expression generator, map table, and heap allocator
//
ExpGenerator *expGen = generator->getExpGenerator();
Space *space = generator->getSpace();
// Allocate a new map table for this operation
//
MapTable *localMapTable = generator->appendAtEnd();
// Generate the child and capture the task definition block and a description
// of the reply composite row layout and the explain information.
//
child(0)->codeGen(generator);
ComTdb *childTdb = (ComTdb*)(generator->getGenObj());
ex_cri_desc *childCriDesc = generator->getCriDesc(Generator::UP);
ExplainTuple *childExplainTuple = generator->getExplainTuple();
// Make all of my child's outputs map to ATP 1. Since they are
// not needed above, they will not be in the work ATP (0).
// (Later, they will be removed from the map table)
//
localMapTable->setAllAtp(1);
// Generate the given and returned composite row descriptors.
// unPackRows adds a tupp (for the generated outputs) to the
// row given by the parent. The workAtp will have the 2 more
// tupps (1 for the generated outputs and another for the
// indexValue) than the given.
//
ex_cri_desc *givenCriDesc = generator->getCriDesc(Generator::DOWN);
ex_cri_desc *returnedCriDesc =
new(space) ex_cri_desc(givenCriDesc->noTuples() + 1, space);
ex_cri_desc *workCriDesc =
new(space) ex_cri_desc(givenCriDesc->noTuples() + 2, space);
// unPackCols is the next to the last Tp in Atp 0, the work ATP.
// and the last Tp in the returned ATP.
//
const Int32 unPackColsAtpIndex = workCriDesc->noTuples() - 2;
const Int32 unPackColsAtp = 0;
// The length of the new tuple which will contain the columns
// generated by unPackRows
//
ULng32 unPackColsTupleLen;
// The Tuple Desc describing the tuple containing the new unPacked columns
// It is generated when the expression is generated.
//
ExpTupleDesc *unPackColsTupleDesc = 0;
// indexValue is the last Tp in Atp 0, the work ATP.
//
const Int32 indexValueAtpIndex = workCriDesc->noTuples() - 1;
const Int32 indexValueAtp = 0;
// The length of the work tuple which will contain the value
// of the index. This should always be sizeof(int).
//
ULng32 indexValueTupleLen = 0;
// The Tuple Desc describing the tuple containing the new unPacked columns
// It is generated when the expression is generated.
//
ExpTupleDesc *indexValueTupleDesc = 0;
ValueIdList indexValueList;
if (indexValue() != NULL_VALUE_ID)
{
indexValueList.insert(indexValue());
expGen->processValIdList(indexValueList,
ExpTupleDesc::SQLARK_EXPLODED_FORMAT,
indexValueTupleLen,
indexValueAtp,
indexValueAtpIndex,
&indexValueTupleDesc,
ExpTupleDesc::SHORT_FORMAT);
GenAssert(indexValueTupleLen == sizeof(Int32),
"UnPackRows::codeGen: Internal Error");
}
// If a packingFactor exists, generate a move expression for this.
// It is assumed that the packingFactor expression evaluates to a
// 4 byte integer.
//
ex_expr *packingFactorExpr = 0;
ULng32 packingFactorTupleLen;
if(packingFactor().entries() > 0) {
expGen->generateContiguousMoveExpr(packingFactor(),
-1,
unPackColsAtp,
unPackColsAtpIndex,
ExpTupleDesc::SQLARK_EXPLODED_FORMAT,
packingFactorTupleLen,
&packingFactorExpr);
GenAssert(packingFactorTupleLen == sizeof(Int32),
"UnPackRows::codeGen: Internal Error");
}
// Generate the UnPack expressions.
//
// characteristicOutputs() - refers to the list of expressions
// to be move to another tuple.
//
// 0 - Do not add conv. nodes.
//
// unPackColsAtp - this expression will move data to the
// unPackColsAtp (0) ATP.
//
// unPackColsAtpIndex - within the unPackColsAtp (0) ATP, the destination
// for this move expression will be the unPackColsAtpIndex TP. This should
// be the next to the last TP of the work ATP. (The indexValue will be in
// the last position)
//
// SQLARK_EXPLODED_FORMAT - generate the move expression to construct
// the destination tuple in EXPLODED FORMAT.
//
// unPackColsTupleLen - This is an output which will contain the length
// of the destination Tuple.
//
// &unPackColsExpr - The address of the pointer to the expression
// which will be generated.
//
// &unPackColsTupleDesc - The address of the tuple descriptor which is
// generated. This describes the destination tuple of the move expression.
//
// SHORT_FORMAT - generate the unPackColsTupleDesc in the SHORT FORMAT.
//
ex_expr *unPackColsExpr = 0;
expGen->
genGuardedContigMoveExpr(selectionPred(),
getGroupAttr()->getCharacteristicOutputs(),
0, // No Convert Nodes added
unPackColsAtp,
unPackColsAtpIndex,
ExpTupleDesc::SQLARK_EXPLODED_FORMAT,
unPackColsTupleLen,
&unPackColsExpr,
&unPackColsTupleDesc,
ExpTupleDesc::SHORT_FORMAT);
workCriDesc->setTupleDescriptor(unPackColsAtpIndex,
unPackColsTupleDesc);
returnedCriDesc->setTupleDescriptor(unPackColsAtpIndex,
unPackColsTupleDesc);
// expressions for rowwise rowset implementation.
ex_expr * rwrsInputSizeExpr = 0;
ex_expr * rwrsMaxInputRowlenExpr = 0;
ex_expr * rwrsBufferAddrExpr = 0;
ULng32 rwrsInputSizeExprLen = 0;
ULng32 rwrsMaxInputRowlenExprLen = 0;
ULng32 rwrsBufferAddrExprLen = 0;
const Int32 rwrsAtp = 1;
const Int32 rwrsAtpIndex = workCriDesc->noTuples() - 2;
ExpTupleDesc * rwrsTupleDesc = 0;
ValueIdList rwrsVidList;
if (rowwiseRowset())
{
rwrsVidList.insert(this->rwrsInputSizeExpr()->getValueId());
expGen->generateContiguousMoveExpr(rwrsVidList,
0 /*don't add conv nodes*/,
rwrsAtp, rwrsAtpIndex,
ExpTupleDesc::SQLARK_EXPLODED_FORMAT,
rwrsInputSizeExprLen,
&rwrsInputSizeExpr,
&rwrsTupleDesc,ExpTupleDesc::SHORT_FORMAT);
rwrsVidList.clear();
rwrsVidList.insert(this->rwrsMaxInputRowlenExpr()->getValueId());
expGen->generateContiguousMoveExpr(rwrsVidList,
0 /*don't add conv nodes*/,
rwrsAtp, rwrsAtpIndex,
ExpTupleDesc::SQLARK_EXPLODED_FORMAT,
rwrsMaxInputRowlenExprLen,
&rwrsMaxInputRowlenExpr,
&rwrsTupleDesc,ExpTupleDesc::SHORT_FORMAT);
rwrsVidList.clear();
rwrsVidList.insert(this->rwrsBufferAddrExpr()->getValueId());
expGen->generateContiguousMoveExpr(rwrsVidList,
0 /*don't add conv nodes*/,
rwrsAtp, rwrsAtpIndex,
ExpTupleDesc::SQLARK_EXPLODED_FORMAT,
rwrsBufferAddrExprLen,
&rwrsBufferAddrExpr,
&rwrsTupleDesc,ExpTupleDesc::SHORT_FORMAT);
expGen->assignAtpAndAtpIndex(rwrsOutputVids(),
unPackColsAtp, unPackColsAtpIndex);
}
// Move the generated maptable entries, to the localMapTable,
// so that all other entries can later be removed.
//
for(ValueId outputValId = getGroupAttr()->getCharacteristicOutputs().init();
getGroupAttr()->getCharacteristicOutputs().next(outputValId);
getGroupAttr()->getCharacteristicOutputs().advance(outputValId)) {
generator->addMapInfoToThis(localMapTable, outputValId,
generator->getMapInfo(outputValId)->
getAttr());
// Indicate that code was generated for this map table entry.
//
generator->getMapInfoFromThis(localMapTable, outputValId)->codeGenerated();
}
NABoolean tolerateNonFatalError = FALSE;
if (isRowsetIterator() && (generator->getTolerateNonFatalError())) {
tolerateNonFatalError = TRUE;
setTolerateNonFatalError(RelExpr::NOT_ATOMIC_);
}
// Allocate the UnPack TDB
//
ComTdbUnPackRows *unPackTdb = NULL;
if (rowwiseRowset())
{
unPackTdb =
new (space) ComTdbUnPackRows(NULL, //childTdb,
rwrsInputSizeExpr,
rwrsMaxInputRowlenExpr,
rwrsBufferAddrExpr,
rwrsAtpIndex,
givenCriDesc,
returnedCriDesc,
workCriDesc,
16,
1024,
(Cardinality) getGroupAttr()->
getOutputLogPropList()[0]->
getResultCardinality().value(),
2, 20000);
}
else
{
// Base the initial queue size on the est. cardinality.
// UnPackRows does not do dyn queue resize, so passed in
// queue size values represent initial (and final) queue
// sizes (not max queue sizes).
//
ULng32 rowsetSize =
getGroupAttr()->getOutputLogPropList()[0]->getResultCardinality().value();
double memoryLimitPerInstance =
(ActiveSchemaDB()->getDefaults().getAsLong(EXE_MEMORY_FOR_UNPACK_ROWS_IN_MB) * 1024 * 1024)/2; // At runtime (ExUnpackRowsTcb::ExUnpackRowsTcb) we allocate twice the size of the queue. So ensure it fits in to half the specified limit.
rowsetSize = (rowsetSize < 1024 ? 1024 : rowsetSize);
double estimatedMemory = (Int64)rowsetSize * (Int64)unPackColsTupleLen;
if (estimatedMemory > memoryLimitPerInstance)
{
estimatedMemory = memoryLimitPerInstance;
rowsetSize = estimatedMemory / unPackColsTupleLen;
rowsetSize = MAXOF(rowsetSize, 1);
}
queue_index upQueueSize = ex_queue::roundUp2Power((queue_index)rowsetSize);
unPackTdb =
new (space) ComTdbUnPackRows(childTdb,
packingFactorExpr,
unPackColsExpr,
unPackColsTupleLen,
unPackColsAtpIndex,
indexValueAtpIndex,
givenCriDesc,
returnedCriDesc,
workCriDesc,
16,
upQueueSize,
(Cardinality) getGroupAttr()->
getOutputLogPropList()[0]->
getResultCardinality().value(),
isRowsetIterator(),
tolerateNonFatalError);
}
generator->initTdbFields(unPackTdb);
// Remove child's outputs from mapTable, They are not needed
// above.
//
generator->removeAll(localMapTable);
//Turn off override of queue sizes for unpack. Once set they should remain.
generator->setMakeOnljLeftQueuesBig(FALSE);
// Add the explain Information for this node to the EXPLAIN
// Fragment. Set the explainTuple pointer in the generator so
// the parent of this node can get a handle on this explainTuple.
//
if(!generator->explainDisabled()) {
generator->setExplainTuple(addExplainInfo(unPackTdb,
childExplainTuple,
0,
generator));
}
// Restore the Cri Desc's and set the return object.
//
generator->setCriDesc(givenCriDesc, Generator::DOWN);
generator->setCriDesc(returnedCriDesc, Generator::UP);
generator->setGenObj(this, unPackTdb);
return 0;
}