blob: 41bfc202b21102c5200236056d625e37bdf5da2b [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
#include "GroupAttr.h"
#include "AllRelExpr.h"
#include "RelSequence.h"
#include "Generator.h"
#include "GenExpGenerator.h"
#include "ExpCriDesc.h"
#include "ComTdbSequence.h"
#include "NumericType.h"
#include "ItmFlowControlFunction.h"
#include "ExplainTupleMaster.h"
#include "ComDefs.h"
#include "HashBufferHeader.h"
ItemExpr *
addConvNode(ItemExpr *childExpr,
ValueIdMap *mapping,
CollHeap *wHeap)
{
if(childExpr->getOperatorType() != ITM_CONVERT &&
!childExpr->isASequenceFunction()) {
ValueId topValue;
mapping->mapValueIdUp(topValue,
childExpr->getValueId());
if(topValue == childExpr->getValueId()) {
// add the convert node
ItemExpr *newChild = new(wHeap) Convert (childExpr);
newChild->synthTypeAndValueId(TRUE);
mapping->addMapEntry(newChild->getValueId(),
childExpr->getValueId());
return newChild;
} else {
return topValue.getItemExpr();
}
}
return childExpr;
}
// getHistoryAttributes
//
// Helper function that traverses the set of root sequence functions
// supplied by the compiler and constructs the set of all of the
// attributes that must be materialized in the history row.
//
void PhysSequence::getHistoryAttributes(const ValueIdSet &sequenceFunctions,
const ValueIdSet &outputFromChild,
ValueIdSet &historyAttributes,
NABoolean addConvNodes,
CollHeap *wHeap,
ValueIdMap *origAttributes) const
{
if(addConvNodes && !origAttributes) {
origAttributes = new (wHeap) ValueIdMap();
}
ValueIdSet children;
for(ValueId valId = sequenceFunctions.init();
sequenceFunctions.next(valId);
sequenceFunctions.advance(valId)) {
if(valId.getItemExpr()->isASequenceFunction()) {
ItemExpr *itmExpr = valId.getItemExpr();
switch(itmExpr->getOperatorType())
{
// The child needs to be in the history row.
//
case ITM_OLAP_LEAD:
case ITM_OLAP_LAG:
case ITM_OFFSET:
case ITM_ROWS_SINCE:
case ITM_THIS:
case ITM_NOT_THIS:
// If the child needs to be in the history buffer, then
// add a Convert node to force the value to be moved to the
// history buffer.
if (addConvNodes)
{
itmExpr->child(0) =
addConvNode(itmExpr->child(0), origAttributes, wHeap);
}
historyAttributes += itmExpr->child(0)->getValueId();
break;
// The sequence function needs to be in the history row.
//
case ITM_RUNNING_SUM:
case ITM_RUNNING_COUNT:
case ITM_RUNNING_MIN:
case ITM_RUNNING_MAX:
case ITM_LAST_NOT_NULL:
historyAttributes += itmExpr->getValueId();
break;
/*
// after PhysSequence precode gen OLAP sum and count are already transform,ed into running
// this is used during optimization phase--
case ITM_OLAP_SUM:
case ITM_OLAP_COUNT:
case ITM_OLAP_RANK:
case ITM_OLAP_DRANK:
if (addConvNodes)
{
itmExpr->child(0) =
addConvNode(itmExpr->child(0), origAttributes, wHeap);
}
historyAttributes += itmExpr->child(0)->getValueId();
//historyAttributes += itmExpr->getValueId();
break;
*/
// The child and sequence function need to be in the history row.
//
case ITM_OLAP_MIN:
case ITM_OLAP_MAX:
case ITM_MOVING_MIN:
case ITM_MOVING_MAX:
// If the child needs to be in the history buffer, then
// add a Convert node to force the value to be moved to the
// history buffer.
if (addConvNodes)
{
itmExpr->child(0) =
addConvNode(itmExpr->child(0), origAttributes, wHeap);
}
historyAttributes += itmExpr->child(0)->getValueId();
historyAttributes += itmExpr->getValueId();
break;
case ITM_RUNNING_CHANGE:
if (itmExpr->child(0)->getOperatorType() == ITM_ITEM_LIST)
{
// child is a multi-valued expression
//
ExprValueId treePtr = itmExpr->child(0);
ItemExprTreeAsList changeValues(&treePtr,
ITM_ITEM_LIST,
RIGHT_LINEAR_TREE);
CollIndex nc = changeValues.entries();
ItemExpr *newChild = NULL;
if(addConvNodes) {
newChild = addConvNode(changeValues[nc-1], origAttributes, wHeap);
historyAttributes += newChild->getValueId();
} else {
historyAttributes += changeValues[nc-1]->getValueId();
}
// add each item in the list
//
for (CollIndex i = nc; i > 0; i--)
{
if(addConvNodes) {
ItemExpr *conv
= addConvNode(changeValues[i-1], origAttributes, wHeap);
newChild = new(wHeap) ItemList(conv, newChild);
newChild->synthTypeAndValueId(TRUE);
historyAttributes += conv->getValueId();
} else {
historyAttributes += changeValues[i-1]->getValueId();
}
}
if(addConvNodes) {
itmExpr->child(0) = newChild;
}
}
else
{
// If the child needs to be in the history buffer, then
// add a Convert node to force the value to be moved to the
// history buffer.
if (addConvNodes)
{
itmExpr->child(0) =
addConvNode(itmExpr->child(0), origAttributes, wHeap);
}
historyAttributes += itmExpr->child(0)->getValueId();
}
historyAttributes += itmExpr->getValueId();
break;
default:
CMPASSERT(0);
}
}
// Gather all the children, and if not empty, recurse down to the
// next level of the tree.
//
for(Lng32 i = 0; i < valId.getItemExpr()->getArity(); i++)
{
if (!outputFromChild.contains(valId.getItemExpr()->child(i)->getValueId()))
//!valId.getItemExpr()->child(i)->nodeIsPreCodeGenned())
{
children += valId.getItemExpr()->child(i)->getValueId();
}
}
}
if (NOT children.isEmpty())
{
getHistoryAttributes( children,
outputFromChild,
historyAttributes,
addConvNodes,
wHeap,
origAttributes);
}
} // PhysSequence::getHistoryAttributes
// PhysSequence::computeHistoryAttributes
//
// Helper function to compute the attribute for the history buffer based
// on the items projected from the child and the computed history items.
// Also, adds the attribute information the the map table.
//
void
PhysSequence::computeHistoryAttributes(Generator *generator,
MapTable *localMapTable,
Attributes **attrs,
const ValueIdSet &historyIds) const
{
// Get a local handle on some of the generator objects.
//
CollHeap *wHeap = generator->wHeap();
// Populate the attribute vector with the flattened list of sequence
// functions and/or sequence function arguments that must be in the
// history row. Add convert nodes for the items that are not sequence
// functions to force them to be moved into the history row.
//
if(NOT historyIds.isEmpty())
{
Int32 i = 0;
ValueId valId;
for (valId = historyIds.init();
historyIds.next(valId);
historyIds.advance(valId))
{
// If this is not a sequence function, then insert a convert
// node.
//
if(!valId.getItemExpr()->isASequenceFunction())
{
// Get a handle on the original expression and erase
// the value ID.
//
ItemExpr *origExpr = valId.getItemExpr();
origExpr->setValueId(NULL_VALUE_ID);
origExpr->markAsUnBound();
// Construct the cast expression with the original expression
// as the child -- must have undone the child value ID to
// avoid recursion later.
//
ItemExpr *castExpr = new(wHeap)
Cast(origExpr, &(valId.getType()));
// Replace the expression for the original value ID and the
// synthesize the types and value ID for the new expression.
//
valId.replaceItemExpr(castExpr);
castExpr->synthTypeAndValueId(TRUE);
}
attrs[i++] = (generator->addMapInfoToThis(localMapTable, valId, 0))->getAttr();
}
}
} // PhysSequence::computeHistoryAttributes
// computeHistoryBuffer
//
// Helper function that traverses the set of root sequence functions
// supplied by the compiler and dynamically determines the size
// of the history buffer.
//
void PhysSequence::computeHistoryRows(const ValueIdSet &sequenceFunctions,//historyIds
Lng32 &computedHistoryRows,
Lng32 &unableToCalculate,
NABoolean &unboundedFollowing,
Lng32 &minFollowingRows,
const ValueIdSet &outputFromChild)
{
ValueIdSet children;
ValueIdSet historyAttributes;
Lng32 value = 0;
for(ValueId valId = sequenceFunctions.init();
sequenceFunctions.next(valId);
sequenceFunctions.advance(valId))
{
if(valId.getItemExpr()->isASequenceFunction())
{
ItemExpr *itmExpr = valId.getItemExpr();
switch(itmExpr->getOperatorType())
{
// THIS and NOT THIS are not dynamically computed
//
case ITM_THIS:
case ITM_NOT_THIS:
break;
// The RUNNING functions and LastNotNull all need to go back just one row.
//
case ITM_RUNNING_SUM:
case ITM_RUNNING_COUNT:
case ITM_RUNNING_MIN:
case ITM_RUNNING_MAX:
case ITM_RUNNING_CHANGE:
case ITM_LAST_NOT_NULL:
computedHistoryRows = MAXOF(computedHistoryRows, 2);
break;
case ITM_OLAP_LEAD:
value = ((ItmLeadOlapFunction*)itmExpr)->getOffset();
///set to unable to compute for now-- will change later to compte values from frameStart_ and frameEnd_
case ITM_OLAP_SUM:
case ITM_OLAP_COUNT:
case ITM_OLAP_MIN:
case ITM_OLAP_MAX:
case ITM_OLAP_RANK:
case ITM_OLAP_DRANK:
{
if ( !outputFromChild.contains(itmExpr->getValueId()))
{
ItmSeqOlapFunction * olap = (ItmSeqOlapFunction*)itmExpr;
if (olap->isFrameStartUnboundedPreceding()) //(olap->getframeStart() == - INT_MAX)
{
computedHistoryRows = MAXOF(computedHistoryRows, 2);
}
else
{
computedHistoryRows = MAXOF(computedHistoryRows, ABS(olap->getframeStart()) + 2);
}
if (!olap->isFrameEndUnboundedFollowing()) //(olap->getframeEnd() != INT_MAX)
{
computedHistoryRows = MAXOF(computedHistoryRows, ABS(olap->getframeEnd()) + 1);
}
if (olap->isFrameEndUnboundedFollowing()) //(olap->getframeEnd() == INT_MAX)
{
unboundedFollowing = TRUE;
if (olap->getframeStart() > 0)
{
minFollowingRows = ((minFollowingRows > olap->getframeStart()) ?
minFollowingRows : olap->getframeStart());
}
} else if (olap->getframeEnd() > 0)
{
minFollowingRows = ((minFollowingRows > olap->getframeEnd()) ?
minFollowingRows : olap->getframeEnd());
}
}
}
break;
// If 'rows since', we cannot determine how much history is needed.
case ITM_ROWS_SINCE:
unableToCalculate = 1;
break;
// The MOVING and OFFSET functions need to go back as far as the value
// of their second child.
//
// The second argument can be:
// Constant: for these, we can use the constant value to set the upper bound
// for the history buffer.
// ItmScalarMinMax(child0, child1) (with operType = ITM_SCALAR_MIN)
// - if child0 or child1 is a constant, then we can use either one
// to set the upper bound.
case ITM_MOVING_MIN:
case ITM_MOVING_MAX:
case ITM_OFFSET:
case ITM_OLAP_LAG:
for(Lng32 i = 1; i < itmExpr->getArity(); i++)
{
if (itmExpr->child(i)->getOperatorType() != ITM_NOTCOVERED)
{
ItemExpr * exprPtr = itmExpr->child(i);
NABoolean negate;
ConstValue *cv = exprPtr->castToConstValue(negate);
if (cv AND cv->canGetExactNumericValue())
{
Lng32 scale;
Int64 value64 = cv->getExactNumericValue(scale);
if(scale == 0 && value64 >= 0 && value64 < INT_MAX)
{
value64 = (negate ? -value64 : value64);
value = MAXOF((Lng32)value64, value);
}
}
else
{
if (exprPtr->getOperatorType() == ITM_SCALAR_MIN)
{
for(Lng32 j = 0; j < exprPtr->getArity(); j++)
{
if (exprPtr->child(j)->getOperatorType()
!= ITM_NOTCOVERED)
{
ItemExpr * exprPtr1 = exprPtr->child(j);
NABoolean negate1;
ConstValue *cv1 = exprPtr1->castToConstValue(negate1);
if (cv1 AND cv1->canGetExactNumericValue())
{
Lng32 scale1;
Int64 value64_1 = cv1->getExactNumericValue(scale1);
if(scale1 == 0 && value64_1 >= 0 && value64_1 < INT_MAX)
{
value64_1 = (negate1 ? -value64_1 : value64_1);
value = MAXOF((Lng32)value64_1, value);
}
}
}
}
}
} // end of inner else
}// end of if
}// end of for
// Check if the value is greater than zero.
// If it is, then save the value, but first
// increment the returned ConstValue by one.
// Otherwise, the offset or moving value was unable
// to be calculated.
if (value > 0)
{
value++;
computedHistoryRows = MAXOF(computedHistoryRows, value);
value = 0;
}
else
unableToCalculate = 1;
break;
default:
CMPASSERT(0);
}
}
// Gather all the children, and if not empty, recurse down to the
// next level of the tree.
//
for(Lng32 i = 0; i < valId.getItemExpr()->getArity(); i++) {
if (//valId.getItemExpr()->child(i)->getOperatorType() != ITM_NOTCOVERED //old stuff
!outputFromChild.contains(valId.getItemExpr()->child(i)->getValueId()))
{
children += valId.getItemExpr()->child(i)->getValueId();
}
}
}
if (NOT children.isEmpty())
{
computeHistoryRows(children,
computedHistoryRows,
unableToCalculate,
unboundedFollowing,
minFollowingRows,
outputFromChild);
}
} // PhysSequence::computeHistoryRows
// PhysSequence::generateChildProjectExpression
//
// Helper function to compute the child project expression. Also, sets up the
// local map table so that the moved items will be referenced from their new
// location. Returns the resulting project expression.
//
#ifdef OLD
ex_expr *
PhysSequence::generateChildProjectExpression(Generator *generator,
MapTable *mapTable,
MapTable *localMapTable,
const ValueIdSet &childProjectIds) const
{
ex_expr * projectExpr = NULL;
if(NOT childProjectIds.isEmpty())
{
// Generate the clauses for the expression
//
generator->getExpGenerator()->generateSetExpr(childProjectIds,
ex_expr::exp_ARITH_EXPR,
&projectExpr);
// Add the projected values to the local map table.
//
ValueId valId;
for(valId = childProjectIds.init();
childProjectIds.next(valId);
childProjectIds.advance(valId))
{
// Get the attribute information from the convert destination.
//
Attributes *newAttr = mapTable->getMapInfo(valId)->getAttr();
// Add the original value to the local map table with the
// attribute information from the convert desination.
//
MapInfo *mapInfo = localMapTable->addMapInfoToThis
(valId.getItemExpr()->child(0)->getValueId(), newAttr);
// Nothing more needs to be done for this item.
//
mapInfo->codeGenerated();
}
}
return projectExpr;
} // PhysSequence::generateChildProjectExpression
#endif
void
RelSequence::addCancelExpr(CollHeap *wHeap)
{
ItemExpr *cPred = NULL;
if (this->partition().entries() > 0)
{
return;
}
if(cancelExpr().entries() > 0)
{
return;
}
for(ValueId valId = selectionPred().init();
selectionPred().next(valId);
selectionPred().advance(valId))
{
ItemExpr *pred = valId.getItemExpr();
// Look for preds that select a prefix of the sequence.
// Rank() < const; Rank <= const; const > Rank; const >= Rank
ItemExpr *op1 = NULL;
ItemExpr *op2 = NULL;
if(pred->getOperatorType() == ITM_LESS ||
pred->getOperatorType() == ITM_LESS_EQ)
{
op1 = pred->child(0);
op2 = pred->child(1);
}
else if (pred->getOperatorType() == ITM_GREATER ||
pred->getOperatorType() == ITM_GREATER_EQ)
{
op1 = pred->child(1);
op2 = pred->child(0);
}
NABoolean negate;
if (op1 && op2 &&
(op2->getOperatorType() == ITM_CONSTANT ||
op2->getOperatorType() == ITM_DYN_PARAM) &&
(op1->getOperatorType() == ITM_OLAP_RANK ||
op1->getOperatorType() == ITM_OLAP_DRANK ||
(op1->getOperatorType() == ITM_OLAP_COUNT &&
op1->child(0)->getOperatorType() == ITM_CONSTANT &&
!op1->child(0)->castToConstValue(negate)->isNull())))
{
cPred = new(wHeap) UnLogic(ITM_NOT, pred);
//break at first occurence
break;
}
}
if(cPred)
{
cPred->synthTypeAndValueId(TRUE);
cancelExpr().insert(cPred->getValueId());
}
}
void PhysSequence::addCheckPartitionChangeExpr( Generator *generator,
NABoolean addConvNodes,
ValueIdMap *origAttributes)
{
if(!(partition().entries() > 0))
{
return;
}
CollHeap * wHeap = generator->wHeap();
if(addConvNodes && !origAttributes)
{
origAttributes = new (wHeap) ValueIdMap();
}
ItemExpr * checkPartChng= NULL;
for (CollIndex ix = 0; ix < partition().entries(); ix++)
{
ItemExpr *iePtr = partition().at(ix).getItemExpr();
ItemExpr * convIePtr = addConvNode(iePtr, origAttributes,wHeap);
movePartIdsExpr() += convIePtr->getValueId();
ItemExpr * comp = new (wHeap) BiRelat(ITM_EQUAL,
iePtr,
convIePtr,
TRUE);
if (!checkPartChng)
{
checkPartChng = comp;
}
else
{
checkPartChng = new (wHeap) BiLogic( ITM_AND,
checkPartChng,
comp);
}
}
checkPartChng->convertToValueIdSet(checkPartitionChangeExpr(),
generator->getBindWA(),
ITM_AND);
}
// PhysSequence::preCodeGen() -------------------------------------------
// Perform local query rewrites such as for the creation and
// population of intermediate tables, for accessing partitioned
// data. Rewrite the value expressions after minimizing the dataflow
// using the transitive closure of equality predicates.
//
// PhysSequence::preCodeGen() - is basically the same as the RelExpr::
// preCodeGen() except that here we replace the VEG references in the
// sortKey() list, as well as the selectionPred().
//
// Parameters:
//
// Generator *generator
// IN/OUT : A pointer to the generator object which contains the state,
// and tools (e.g. expression generator) to generate code for
// this node.
//
// ValueIdSet &externalInputs
// IN : The set of external Inputs available to this node.
//
//
RelExpr * PhysSequence::preCodeGen(Generator * generator,
const ValueIdSet & externalInputs,
ValueIdSet &pulledNewInputs)
{
if (nodeIsPreCodeGenned())
return this;
// Resolve the VEGReferences and VEGPredicates, if any, that appear
// in the Characteristic Inputs, in terms of the externalInputs.
//
getGroupAttr()->resolveCharacteristicInputs(externalInputs);
// My Characteristic Inputs become the external inputs for my children.
//
ValueIdSet childPulledInputs;
child(0) = child(0)->preCodeGen(generator, externalInputs, pulledNewInputs);
if (! child(0).getPtr())
return NULL;
// process additional input value ids the child wants
getGroupAttr()->addCharacteristicInputs(childPulledInputs);
pulledNewInputs += childPulledInputs;
// The VEG expressions in the selection predicates and the characteristic
// outputs can reference any expression that is either a potential output
// or a characteristic input for this RelExpr. Supply these values for
// rewriting the VEG expressions.
//
ValueIdSet availableValues;
getInputValuesFromParentAndChildren(availableValues);
// The sequenceFunctions() have access to only the Input
// Values. These can come from the parent or be the outputs of the
// child.
//
sequenceFunctions().
replaceVEGExpressions(availableValues,
getGroupAttr()->getCharacteristicInputs());
sequencedColumns().
replaceVEGExpressions(availableValues,
getGroupAttr()->getCharacteristicInputs());
requiredOrder().
replaceVEGExpressions(availableValues,
getGroupAttr()->getCharacteristicInputs());
cancelExpr().
replaceVEGExpressions(availableValues,
getGroupAttr()->getCharacteristicInputs());
partition().
replaceVEGExpressions(availableValues,
getGroupAttr()->getCharacteristicInputs());
//checkPartitionChangeExpr().
// replaceVEGExpressions(availableValues,
// getGroupAttr()->getCharacteristicInputs());
// The selectionPred has access to only the output values generated by
// Sequence and input values from the parent.
//
getInputAndPotentialOutputValues(availableValues);
// Rewrite the selection predicates.
//
NABoolean replicatePredicates = TRUE;
selectionPred().replaceVEGExpressions
(availableValues,
getGroupAttr()->getCharacteristicInputs(),
FALSE, // no key predicates here
0 /* no need for idempotence here */,
replicatePredicates
);
// Replace VEG references in the outputs and remove redundant
// outputs.
//
getGroupAttr()->resolveCharacteristicOutputs
(availableValues,
getGroupAttr()->getCharacteristicInputs());
addCancelExpr(generator->wHeap());
///addCheckPartitionChangeExpr(generator->wHeap());
transformOlapFunctions(generator->wHeap());
if ( getUnboundedFollowing() ) {
// Count this Seq as a BMO and add its needed memory to the total needed
generator->incrNumBMOs();
if ((ActiveSchemaDB()->getDefaults()).
getAsDouble(BMO_MEMORY_LIMIT_PER_NODE_IN_MB) > 0)
generator->incrBMOsMemory(getEstimatedRunTimeMemoryUsage(generator, TRUE));
}
else
generator->incrNBMOsMemoryPerNode(getEstimatedRunTimeMemoryUsage(generator, TRUE));
markAsPreCodeGenned();
return this;
} // PhysSequence::preCodeGen
short
PhysSequence::codeGen(Generator *generator)
{
// Get a local handle on some of the generator objects.
//
CollHeap *wHeap = generator->wHeap();
Space *space = generator->getSpace();
ExpGenerator *expGen = generator->getExpGenerator();
MapTable *mapTable = generator->getMapTable();
// Allocate a new map table for this node. This must be done
// before generating the code for my child so that this local
// map table will be sandwiched between the map tables already
// generated and the map tables generated by my offspring.
//
// Only the items available as output from this node will
// be put in the local map table. Before exiting this function, all of
// my offsprings map tables will be removed. Thus, none of the outputs
// from nodes below this node will be visible to nodes above it except
// those placed in the local map table and those that already exist in
// my ancestors map tables. This is the standard mechanism used in the
// generator for managing the access to item expressions.
//
MapTable *localMapTable = generator->appendAtEnd();
// Since this operation doesn't modify the row on the way down the tree,
// go ahead and generate the child subtree. Capture the given composite row
// descriptor and the child's returned TDB and composite row descriptor.
//
ex_cri_desc * givenCriDesc = generator->getCriDesc(Generator::DOWN);
child(0)->codeGen(generator);
ComTdb *childTdb = (ComTdb*)generator->getGenObj();
ex_cri_desc * childCriDesc = generator->getCriDesc(Generator::UP);
ExplainTuple *childExplainTuple = generator->getExplainTuple();
// Make all of my child's outputs map to ATP 1. The child row is only
// accessed in the project expression and it will be the second ATP
// (ATP 1) passed to this expression.
//
localMapTable->setAllAtp(1);
// My returned composite row has an additional tupp.
//
Int32 numberTuples = givenCriDesc->noTuples() + 1;
ex_cri_desc * returnCriDesc
= new (space) ex_cri_desc(numberTuples, space);
// For now, the history buffer row looks just the return row. Later,
// it may be useful to add an additional tupp for sequence function
// itermediates that are not needed above this node -- thus, this
// ATP is kept separate from the returned ATP.
//
const Int32 historyAtp = 0;
const Int32 historyAtpIndex = numberTuples-1;
ex_cri_desc *historyCriDesc = new (space) ex_cri_desc(numberTuples, space);
ExpTupleDesc *historyDesc = 0;
//seperate the read and retur expressions
seperateReadAndReturnItems(wHeap);
// The history buffer consists of items projected directly from the
// child, the root sequence functions, the value arguments of the
// offset functions, and running sequence functions. These elements must
// be materialized in the history buffer in order to be able to compute
// the outputs of this node -- the items projected directly from the child
// (projectValues) and the root sequence functions (sequenceFunctions).
//
// Compute the set of sequence function items that must be materialized
// int the history buffer. -- sequenceItems
//
// Compute the set of items in the history buffer: the union of the
// projected values and the value arguments. -- historyIds
//
// Compute the set of items in the history buffer that are computed:
// the difference between all the elements in the history buffer
// and the projected items. -- computedHistoryIds
//
// KB---will need to return atp with 3 tups only 0,1 and 2
// 2 -->values from history buffer after ther are moved to it
addCheckPartitionChangeExpr(generator, TRUE);
ValueIdSet historyIds;
historyIds += movePartIdsExpr();
historyIds += sequencedColumns();
ValueIdSet outputFromChild = child(0)->getGroupAttr()->getCharacteristicOutputs();
getHistoryAttributes(readSeqFunctions(),outputFromChild, historyIds, TRUE, wHeap);
// remove LEAD from readSeqFunctions() and remeber it in leadFuncs
// Its child has CONVERT added in getHistoryAttributes() call.
ValueIdSet leadFuncs;
ValueIdSet leadFuncChildren;
readSeqFunctions().findAllOpType(ITM_OLAP_LEAD, leadFuncs);
leadFuncs.findAllChildren(leadFuncChildren);
readSeqFunctions() -= leadFuncs;
readSeqFunctions() += leadFuncChildren;
// Add in the top level sequence functions.
historyIds += readSeqFunctions();
// remove LEAD in old form from the return function
ValueIdSet leadFuncsInReturn;
returnSeqFunctions().findAllOpType(ITM_OLAP_LEAD, leadFuncsInReturn);
returnSeqFunctions() -= leadFuncsInReturn;
// add in the LEAD in new form
returnSeqFunctions() += leadFuncs;
getHistoryAttributes(returnSeqFunctions(),outputFromChild, historyIds, TRUE, wHeap);
// Add in the top level functions.
historyIds += returnSeqFunctions();
// Layout the work tuple format which consists of the projected
// columns and the computed sequence functions. First, compute
// the number of attributes in the tuple.
//
ULng32 numberAttributes
= ((NOT historyIds.isEmpty()) ? historyIds.entries() : 0);
// Allocate an attribute pointer vector from the working heap.
//
Attributes **attrs = new(wHeap) Attributes*[numberAttributes];
// Fill in the attributes vector for the history buffer including
// adding the entries to the map table. Also, compute the value ID
// set for the elements to project from the child row.
//
//??????????re-visit this function??
computeHistoryAttributes(generator,
localMapTable,
attrs,
historyIds);
// Create the tuple descriptor for the history buffer row and
// assign the offsets to the attributes. For now, this layout is
// identical to the returned row. Set the tuple descriptors for
// the return and history rows.
//
ULng32 historyRecLen;
expGen->processAttributes(numberAttributes,
attrs,
ExpTupleDesc::SQLARK_EXPLODED_FORMAT,
historyRecLen,
historyAtp,
historyAtpIndex,
&historyDesc,
ExpTupleDesc::SHORT_FORMAT);
NADELETEBASIC(attrs, wHeap);
returnCriDesc->setTupleDescriptor(historyAtpIndex, historyDesc);
historyCriDesc->setTupleDescriptor(historyAtpIndex, historyDesc);
// If there are any sequence function items, generate the sequence
// function expressions.
//
ex_expr * readSeqExpr = NULL;
if(NOT readSeqFunctions().isEmpty())
{
ValueIdSet seqVals = readSeqFunctions();
seqVals += sequencedColumns();
seqVals += movePartIdsExpr();
expGen->generateSequenceExpression(seqVals,
readSeqExpr);
}
ex_expr *checkPartChangeExpr = NULL;
if (!checkPartitionChangeExpr().isEmpty()) {
ItemExpr * newCheckPartitionChangeTree=
checkPartitionChangeExpr().rebuildExprTree(ITM_AND,TRUE,TRUE);
expGen->generateExpr(newCheckPartitionChangeTree->getValueId(),
ex_expr::exp_SCAN_PRED,
&checkPartChangeExpr);
}
//unsigned long rowLength;
ex_expr * returnExpr = NULL;
if(NOT returnSeqFunctions().isEmpty())
{
expGen->generateSequenceExpression(returnSeqFunctions(),
returnExpr);
}
// Generate expression to evaluate predicate on the output
//
ex_expr *postPred = 0;
if (! selectionPred().isEmpty()) {
ItemExpr * newPredTree =
selectionPred().rebuildExprTree(ITM_AND,TRUE,TRUE);
expGen->generateExpr(newPredTree->getValueId(), ex_expr::exp_SCAN_PRED,
&postPred);
}
// Reset ATP's to zero for parent.
//
localMapTable->setAllAtp(0);
// Generate expression to evaluate the cancel expression
//
ex_expr *cancelExpression = 0;
if (! cancelExpr().isEmpty()) {
ItemExpr * newCancelExprTree =
cancelExpr().rebuildExprTree(ITM_AND,TRUE,TRUE);
expGen->generateExpr(newCancelExprTree->getValueId(), ex_expr::exp_SCAN_PRED,
&cancelExpression);
}
//
// For overflow
//
// ( The following are meaningless if ! unlimitedHistoryRows() )
NABoolean noOverflow =
CmpCommon::getDefault(EXE_BMO_DISABLE_OVERFLOW) == DF_ON ;
NABoolean logDiagnostics =
CmpCommon::getDefault(EXE_DIAGNOSTIC_EVENTS) == DF_ON ;
NABoolean possibleMultipleCalls = generator->getRightSideOfFlow() ;
short scratchTresholdPct =
(short) CmpCommon::getDefaultLong(SCRATCH_FREESPACE_THRESHOLD_PERCENT);
// determione the memory usage (amount of memory as percentage from total
// physical memory used to initialize data structures)
unsigned short memUsagePercent =
(unsigned short) getDefault(BMO_MEMORY_USAGE_PERCENT);
short memPressurePct = (short)getDefault(GEN_MEM_PRESSURE_THRESHOLD);
historyRecLen = ROUND8(historyRecLen);
Lng32 maxNumberOfOLAPBuffers;
Lng32 maxRowsInOLAPBuffer;
Lng32 minNumberOfOLAPBuffers;
Lng32 numberOfWinOLAPBuffers;
Lng32 olapBufferSize;
computeHistoryParams(historyRecLen,
maxRowsInOLAPBuffer,
minNumberOfOLAPBuffers,
numberOfWinOLAPBuffers,
maxNumberOfOLAPBuffers,
olapBufferSize);
ComTdbSequence *sequenceTdb
= new(space) ComTdbSequence(readSeqExpr,
returnExpr,
postPred,
cancelExpression,
getMinFollowingRows(),
historyRecLen,
historyAtpIndex,
childTdb,
givenCriDesc,
returnCriDesc,
(queue_index)getDefault(GEN_SEQFUNC_SIZE_DOWN),
(queue_index)getDefault(GEN_SEQFUNC_SIZE_UP),
getDefault(GEN_SEQFUNC_NUM_BUFFERS),
getDefault(GEN_SEQFUNC_BUFFER_SIZE),
olapBufferSize,
maxNumberOfOLAPBuffers,
numHistoryRows(),
getUnboundedFollowing(),
logDiagnostics,
possibleMultipleCalls,
scratchTresholdPct,
memUsagePercent,
memPressurePct,
maxRowsInOLAPBuffer,
minNumberOfOLAPBuffers,
numberOfWinOLAPBuffers,
noOverflow,
checkPartChangeExpr);
generator->initTdbFields(sequenceTdb);
// update the estimated value of HistoryRowLength with actual value
//setEstHistoryRowLength(historyIds.getRowLength());
double sequenceMemEst = generator->getEstMemPerInst(getKey());
generator->addToTotalEstimatedMemory(sequenceMemEst);
if(!generator->explainDisabled()) {
generator->
setExplainTuple(addExplainInfo(sequenceTdb,
childExplainTuple,
0,
generator));
}
sequenceTdb->setScratchIOVectorSize((Int16)getDefault(SCRATCH_IO_VECTOR_SIZE_HASH));
sequenceTdb->setOverflowMode(generator->getOverflowMode());
sequenceTdb->setBmoMinMemBeforePressureCheck((Int16)getDefault(EXE_BMO_MIN_SIZE_BEFORE_PRESSURE_CHECK_IN_MB));
if(generator->getOverflowMode() == ComTdb::OFM_SSD )
sequenceTdb->setBMOMaxMemThresholdMB((UInt16)(ActiveSchemaDB()->
getDefaults()).
getAsLong(SSD_BMO_MAX_MEM_THRESHOLD_IN_MB));
else
sequenceTdb->setBMOMaxMemThresholdMB((UInt16)(ActiveSchemaDB()->
getDefaults()).
getAsLong(EXE_MEMORY_AVAILABLE_IN_MB));
// The CQD EXE_MEM_LIMIT_PER_BMO_IN_MB has precedence over the mem quota sys
NADefaults &defs = ActiveSchemaDB()->getDefaults();
UInt16 mmu = (UInt16)(defs.getAsDouble(EXE_MEM_LIMIT_PER_BMO_IN_MB));
UInt16 numBMOsInFrag = (UInt16)generator->getFragmentDir()->getNumBMOs();
Lng32 numStreams;
double bmoMemoryUsagePerNode = generator->getEstMemPerNode(getKey(), numStreams);
double memQuota = 0;
double memQuotaRatio;
if (mmu != 0)
sequenceTdb->setMemoryQuotaMB(mmu);
else {
// Apply quota system if either one the following two is true:
// 1. the memory limit feature is turned off and more than one BMOs
// 2. the memory limit feature is turned on
NABoolean mlimitPerNode = defs.getAsDouble(BMO_MEMORY_LIMIT_PER_NODE_IN_MB) > 0;
if ( mlimitPerNode || numBMOsInFrag > 1 ) {
double memQuota =
computeMemoryQuota(generator->getEspLevel() == 0,
mlimitPerNode,
generator->getBMOsMemoryLimitPerNode().value(),
generator->getTotalNumBMOs(),
generator->getTotalBMOsMemoryPerNode().value(),
numBMOsInFrag,
bmoMemoryUsagePerNode,
numStreams,
memQuotaRatio
);
}
Lng32 seqMemoryLowbound = defs.getAsLong(EXE_MEMORY_LIMIT_LOWER_BOUND_SEQUENCE);
Lng32 memoryUpperbound = defs.getAsLong(BMO_MEMORY_LIMIT_UPPER_BOUND);
if ( memQuota < seqMemoryLowbound ) {
memQuota = seqMemoryLowbound;
memQuotaRatio = BMOQuotaRatio::MIN_QUOTA;
}
else if (memQuota > memoryUpperbound)
memQuota = memoryUpperbound;
sequenceTdb->setMemoryQuotaMB( UInt16(memQuota) );
}
generator->setCriDesc(givenCriDesc, Generator::DOWN);
generator->setCriDesc(returnCriDesc, Generator::UP);
generator->setGenObj(this, sequenceTdb);
return 0;
}
void PhysSequence::seperateReadAndReturnItems(
//ValueIdSet & readPhaseSet,
//ValueIdSet & returnPhaseSet,
CollHeap *wHeap)
{
ValueIdSet outputFromChild = child(0)->getGroupAttr()->getCharacteristicOutputs();
ValueIdSet seqFuncs = sequenceFunctions();
for(ValueId valId = seqFuncs.init();
seqFuncs.next(valId);
seqFuncs.advance(valId))
{
computeReadNReturnItems(valId,
valId,
//sequenceFunctions(),
//returnSeqFunctions(),
outputFromChild,
wHeap);
}
}
void PhysSequence::transformOlapFunctions(CollHeap *wHeap)
{
for(ValueId valId = sequenceFunctions().init();
sequenceFunctions().next(valId);
sequenceFunctions().advance(valId))
{
ItemExpr * itmExpr = valId.getItemExpr();
//NAType *itmType = itmExpr->getValueId().getType().newCopy(wHeap);
if (itmExpr->isOlapFunction())
{
NAType *itmType = itmExpr->getValueId().getType().newCopy(wHeap);
itmExpr = ((ItmSeqOlapFunction*)itmExpr)->transformOlapFunction(wHeap);
if ( itmExpr->getOperatorType() == ITM_OLAP_LEAD ) {
computeAndSetMinFollowingRows(((ItmLeadOlapFunction*)itmExpr)->getOffset());
}
CMPASSERT(itmExpr);
if(itmExpr->getValueId() != valId)
{
itmExpr = new (wHeap) Cast(itmExpr, itmType);
itmExpr->synthTypeAndValueId(TRUE);
valId.replaceItemExpr(itmExpr);
itmExpr->getValueId().changeType(itmType);//????
}
}
itmExpr->transformOlapFunctions(wHeap);
}
}
void PhysSequence::computeHistoryParams(Lng32 histRecLength,
Lng32 &maxRowsInOLAPBuffer,
Lng32 &minNumberOfOLAPBuffers,
Lng32 &numberOfWinOLAPBuffers,
Lng32 &maxNumberOfOLAPBuffers,
Lng32 &olapBufferSize)
{
Lng32 maxFWAdditionalBuffers = getDefault(OLAP_MAX_FIXED_WINDOW_EXTRA_BUFFERS);
maxNumberOfOLAPBuffers = getDefault(OLAP_MAX_NUMBER_OF_BUFFERS);
// For testing we may force a smaller max # rows in a buffer
Lng32 forceMaxRowsInOLAPBuffer = getDefault(OLAP_MAX_ROWS_IN_OLAP_BUFFER);
minNumberOfOLAPBuffers = 0;
numberOfWinOLAPBuffers = 0;
olapBufferSize = getDefault(OLAP_BUFFER_SIZE);
Lng32 olapAvailableBufferSize = olapBufferSize
- ROUND8(sizeof(HashBufferHeader)); // header
// also consider the trailer reserved for DP2's checksum, for overflow only
if ( getUnboundedFollowing() ) olapAvailableBufferSize -= 8 ;
if ( histRecLength > olapAvailableBufferSize)
{ // history row exceeds size limit fir the overflow
if (getUnboundedFollowing())
{
*CmpCommon::diags() << DgSqlCode(-4390)
<< DgInt0(olapAvailableBufferSize);
GenExit();
return ;
}
else
{
olapAvailableBufferSize = histRecLength;
// Buffer needs to accomodate the header, but not the trailer (no O/F)
olapBufferSize = histRecLength + ROUND8(sizeof(HashBufferHeader)) ;
}
}
// Calculate the max # rows in a buffer
maxRowsInOLAPBuffer = olapAvailableBufferSize / histRecLength ;
// For testing - we may override the above max # rows
if ( forceMaxRowsInOLAPBuffer > 0 &&
forceMaxRowsInOLAPBuffer < maxRowsInOLAPBuffer )
maxRowsInOLAPBuffer = forceMaxRowsInOLAPBuffer ;
minNumberOfOLAPBuffers = numHistoryRows_ / maxRowsInOLAPBuffer;
if ( numHistoryRows_ % maxRowsInOLAPBuffer >0)
{
minNumberOfOLAPBuffers++;
}
if (getUnboundedFollowing())
{
numberOfWinOLAPBuffers = minFollowingRows_/maxRowsInOLAPBuffer ;
if (minFollowingRows_ % maxRowsInOLAPBuffer >0)
{
numberOfWinOLAPBuffers++;
}
if (numberOfWinOLAPBuffers +1 > minNumberOfOLAPBuffers)
{
minNumberOfOLAPBuffers = numberOfWinOLAPBuffers +1 ;
}
//produce an error here if maxNumberOfOLAPBuffers < minNumberOfOLAPBuffers
}
else
{
maxNumberOfOLAPBuffers = minNumberOfOLAPBuffers + maxFWAdditionalBuffers;
}
}
CostScalar PhysSequence::getEstimatedRunTimeMemoryUsage(Generator *generator, NABoolean perNode, Lng32 *numStreams)
{
// input param is not used as this operator does not participate in the
// quota system.
ValueIdSet outputFromChild = child(0).getGroupAttr()->getCharacteristicOutputs();
//TODO: Line below dumps core at times
//const CostScalar maxCard = child(0).getGroupAttr()->getResultMaxCardinalityForEmptyInput();
const CostScalar maxCard = 0;
const CostScalar rowCount = numHistoryRows();
//ValueIdSet historyIds;
//getHistoryAttributes(sequenceFunctions(),outputFromChild, historyIds);
//historyIds += sequenceFunctions();
const Lng32 historyBufferWidthInBytes = getEstHistoryRowLength(); //historyIds.getRowLength();
const double historyBufferSizeInBytes = rowCount.value() *
historyBufferWidthInBytes;
// totalMemory is per CPU at this point of time.
double totalMemory = historyBufferSizeInBytes;
CostScalar estMemPerNode;
CostScalar estMemPerInst;
const PhysicalProperty* const phyProp = getPhysicalProperty();
Lng32 numOfStreams = 1;
if (phyProp != NULL)
{
PartitioningFunction * partFunc = phyProp -> getPartitioningFunction() ;
numOfStreams = partFunc->getCountOfPartitions();
if (numOfStreams <= 0)
numOfStreams = 1;
// totalMemory is for all CPUs at this point of time.
totalMemory *= numOfStreams;
}
if (numStreams != NULL)
*numStreams = numOfStreams;
estMemPerNode = totalMemory /= MINOF(MAXOF(gpClusterInfo->getTotalNumberOfCPUs(), 1), numOfStreams);
estMemPerInst = totalMemory /= numOfStreams;
OperBMOQuota *operBMOQuota = new (generator->wHeap()) OperBMOQuota(getKey(), numOfStreams,
estMemPerNode, estMemPerInst, rowCount, maxCard);
generator->getBMOQuotaMap()->insert(operBMOQuota);
if (perNode)
return estMemPerNode;
else
return estMemPerInst;
}
ExplainTuple*
PhysSequence::addSpecificExplainInfo(ExplainTupleMaster *explainTuple,
ComTdb * tdb,
Generator *generator)
{
NAString buffer = "num_history_rows: ";
char buf[20];
sprintf(buf, "%d ", numHistoryRows());
buffer += buf;
const Lng32 bufferWidth = ((ComTdbSequence *)tdb)->getRecLength();
buffer += " history_row_size: ";
sprintf(buf, "%d ", bufferWidth);
buffer += buf;
buffer += " est_history_row_size: ";
sprintf(buf, "%d ", getEstHistoryRowLength());
buffer += buf;
//Int16 int16Val = ((ComTdbSequence *)tdb)->getUnboundedFollowing();
Int16 int16Val = ((ComTdbSequence *)tdb)->isUnboundedFollowing();
buffer += " unbounded_following: ";
sprintf(buf, "%d ", int16Val);
buffer += buf;
Int32 int32Val;
int32Val = ((ComTdbSequence *)tdb)->getMaxRowsInOLAPBuffer();
buffer += " max_rows_in_olap_buffer: ";
sprintf(buf, "%d ", int32Val);
buffer += buf;
int32Val = ((ComTdbSequence *)tdb)->getMinNumberOfOLAPBuffers();
buffer += " min_number_of_olap_buffer: ";
sprintf(buf, "%d ", int32Val);
buffer += buf;
int32Val = ((ComTdbSequence *)tdb)->getMaxNumberOfOLAPBuffers();
buffer += " max_number_of_olap_buffer: ";
sprintf(buf, "%d ", int32Val);
buffer += buf;
int32Val = ((ComTdbSequence *)tdb)->getOLAPBufferSize();
buffer += " olap_buffer_size: ";
sprintf(buf, "%d ", int32Val);
buffer += buf;
int32Val = ((ComTdbSequence *)tdb)->getNumberOfWinOLAPBuffers();
buffer += " number_of_win_olap_buffers: ";
sprintf(buf, "%d ", int32Val);
buffer += buf;
int16Val = ! ((ComTdbSequence *)tdb)->isNoOverflow();
buffer += " overflow_enabled: ";
sprintf(buf, "%d ", int16Val);
buffer += buf;
int32Val = ((ComTdbSequence *)tdb)->getMinFollowing();
buffer += " minimum_following: ";
sprintf(buf, "%d ", int32Val);
buffer += buf;
explainTuple->setDescription(buffer);
return(explainTuple);
}
void PhysSequence::computeReadNReturnItems( ValueId topSeqVid,
ValueId vid,
const ValueIdSet &outputFromChild,
CollHeap *wHeap)
{
ItemExpr * itmExpr = vid.getItemExpr();
if (outputFromChild.contains(vid))
{
return;
}
// Make sure both the return and returning function
// contain the same LEAD function. later on in
// Sequence::codeGen() near seperateReadAndReturnItems(),
// the function is separated as follows.
//
// LEAD in old form: LEAD(c)
// LEAD in new form: LEAD(CONVERT(c))
//
// After separation:
//
// readFunction: COVNERT(C)
// returnFunction: LEAD(CONVERT(C))
//
// In particular, both functions refer to CONVERT(C)
// with the same valueId.
//
if ( itmExpr->getOperatorType() == ITM_OLAP_LEAD )
{
readSeqFunctions() += topSeqVid;
returnSeqFunctions() += topSeqVid;
return;
}
//test if itm_minus and then if negative offset ....
if ( itmExpr->getOperatorType() == ITM_OFFSET &&
((ItmSeqOffset *)itmExpr)->getOffsetConstantValue() < 0)
{
readSeqFunctions() -= topSeqVid;
returnSeqFunctions() += topSeqVid;
readSeqFunctions() += itmExpr->child(0)->castToItemExpr()->getValueId();
return;
}
if (itmExpr->getOperatorType() == ITM_MINUS)
{
ItemExpr * chld0 = itmExpr->child(0)->castToItemExpr();
if ( chld0->getOperatorType() == ITM_OFFSET &&
((ItmSeqOffset *)chld0)->getOffsetConstantValue() <0)
{
readSeqFunctions() -= topSeqVid;
returnSeqFunctions() += topSeqVid;
readSeqFunctions() += chld0->child(0)->castToItemExpr()->getValueId();
ItemExpr * chld1 = itmExpr->child(1)->castToItemExpr();
if (chld1->getOperatorType() == ITM_OFFSET &&
((ItmSeqOffset *)chld1)->getOffsetConstantValue() < 0)
{
readSeqFunctions() += chld1->child(0)->castToItemExpr()->getValueId();
}
else
{
readSeqFunctions() += chld1->getValueId();
}
return;
}
}
if (itmExpr->getOperatorType() == ITM_OLAP_MIN ||
itmExpr->getOperatorType() == ITM_OLAP_MAX)
{
ItmSeqOlapFunction * olap = (ItmSeqOlapFunction *)itmExpr;
if (olap->getframeEnd()>0)
{
readSeqFunctions() -= topSeqVid;
returnSeqFunctions() += topSeqVid;
ItemExpr *newChild = new(wHeap) Convert (itmExpr->child(0)->castToItemExpr());
newChild->synthTypeAndValueId(TRUE);
itmExpr->child(0) = newChild;
readSeqFunctions() += newChild->getValueId();
return;
}
}
if (itmExpr->getOperatorType() == ITM_SCALAR_MIN ||
itmExpr->getOperatorType() == ITM_SCALAR_MAX)
{
ItemExpr * chld0 = itmExpr->child(0)->castToItemExpr();
ItemExpr * chld1 = itmExpr->child(1)->castToItemExpr();
if ((chld0->getOperatorType() == ITM_OLAP_MIN && chld1->getOperatorType() == ITM_OLAP_MIN )||
(chld0->getOperatorType() == ITM_OLAP_MAX && chld1->getOperatorType() == ITM_OLAP_MAX ))
{
ItmSeqOlapFunction * olap0 = (ItmSeqOlapFunction *)chld0;
ItmSeqOlapFunction * olap1 = (ItmSeqOlapFunction *)chld1;
if ( olap1->getframeEnd()>0)
{
CMPASSERT(olap0->getframeEnd()==0);
readSeqFunctions() -= topSeqVid;
returnSeqFunctions() += topSeqVid;
readSeqFunctions() += olap0->getValueId();
ItemExpr *newChild = new(wHeap) Convert (olap1->child(0)->castToItemExpr());
newChild->synthTypeAndValueId(TRUE);
olap1->child(0) = newChild;
readSeqFunctions() += newChild->getValueId();
}
else
{
CMPASSERT(olap1->getframeEnd()==0);
readSeqFunctions() -= topSeqVid;
returnSeqFunctions() += topSeqVid;
readSeqFunctions() += olap1->getValueId();
ItemExpr *newChild = new(wHeap) Convert (olap0->child(0)->castToItemExpr());
newChild->synthTypeAndValueId(TRUE);
olap0->child(0) = newChild;
readSeqFunctions() += newChild->getValueId();
}
return;
}
}
for (Int32 i= 0 ; i < itmExpr->getArity(); i++)
{
ItemExpr * chld= itmExpr->child(i);
computeReadNReturnItems(topSeqVid,
chld->getValueId(),
outputFromChild,
wHeap);
}
}//void PhysSequence::computeReadNReturnItems(ItemExpr * other)