blob: 425a0b65c27c4c0c5ecabb1e5a77758dcb4340ec [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
**************************************************************************
*
* File: PartFunc.cpp
* Description: Partitioning Function
* Created: 11/16/1994
* Language: C++
*
*
*
*
**************************************************************************
*/
// -----------------------------------------------------------------------
#include "PartReq.h"
#include "PhyProp.h"
#include "ItemColRef.h"
#include "ItemLog.h"
#include "ItemFunc.h"
#include "ItemOther.h"
#include "ItemArith.h"
#include "opt.h"
#include "str.h"
#include "NumericType.h"
#include "MiscType.h"
#include "NAFileSet.h"
#include "SearchKey.h"
#include "GroupAttr.h"
#include "Generator.h"
// To test hash2 part number generated by a direct call to
// ExHDHash::hash() and compare the result, just uncomment out
// the following two lines.
//#include "Generator.h"
#include "exp_function.h"
// ***********************************************************************
// A function having an external linkage to allow display() to
// be called on a tree object. This is a workaround for bugs/missing
// functionality in ObjectCenter that cause display() to become
// an undefined symbol.
// ***********************************************************************
void displayPartitioningFunction(const PartitioningFunction& pf)
{
pf.display();
}
void displayPartitioningFunction(const PartitioningFunction* pf)
{
if (pf)
pf->display();
}
void displayPartitionBoundaries(const RangePartitionBoundaries& pb)
{
pb.display();
}
void displayPartitionBoundaries(const RangePartitionBoundaries* pb)
{
if (pb)
pb->display();
}
// ***********************************************************************
// PartitioningFunction
// ***********************************************************************
//------------------------------------------------------------------------
// PartitioningFunction destructor.
//------------------------------------------------------------------------
PartitioningFunction::~PartitioningFunction()
{
  // Release the node map held by this object, if one is attached.
  if (nodeMap_)
    delete nodeMap_;
}
// -----------------------------------------------------------------------
// Methods to perform type-safe pointer casts.
// -----------------------------------------------------------------------
// Base-class defaults for the castToXxx() family: every one of them
// returns NULL here. The derived classes visible further down in this
// file (single partition, replicate, hash) redefine their own cast
// method to return "this".
const LogPhysPartitioningFunction*
PartitioningFunction::castToLogPhysPartitioningFunction() const
{
  return NULL;
}

const SinglePartitionPartitioningFunction*
PartitioningFunction::castToSinglePartitionPartitioningFunction() const
{
  return NULL;
}

const ReplicateViaBroadcastPartitioningFunction*
PartitioningFunction::castToReplicateViaBroadcastPartitioningFunction() const
{
  return NULL;
}

const ReplicateNoBroadcastPartitioningFunction*
PartitioningFunction::castToReplicateNoBroadcastPartitioningFunction() const
{
  return NULL;
}

const HashPartitioningFunction*
PartitioningFunction::castToHashPartitioningFunction() const
{
  return NULL;
}

const TableHashPartitioningFunction*
PartitioningFunction::castToTableHashPartitioningFunction() const
{
  return NULL;
}

const HashDistPartitioningFunction*
PartitioningFunction::castToHashDistPartitioningFunction() const
{
  return NULL;
}

const Hash2PartitioningFunction*
PartitioningFunction::castToHash2PartitioningFunction() const
{
  return NULL;
}

const RangePartitioningFunction*
PartitioningFunction::castToRangePartitioningFunction() const
{
  return NULL;
}

const RoundRobinPartitioningFunction*
PartitioningFunction::castToRoundRobinPartitioningFunction() const
{
  return NULL;
}

// The two definitions below used to be followed by a stray ';' (an
// empty declaration at namespace scope); it has been removed.
const SkewedDataPartitioningFunction*
PartitioningFunction::castToSkewedDataPartitioningFunction() const
{
  return NULL;
}

const HivePartitioningFunction*
PartitioningFunction::castToHivePartitioningFunction() const
{
  return NULL;
}
// ---------------------------------------------------------------------
// Method to test if the partitioning key contains any approximate
// numeric type columns. Necessary because in some cases certain
// parallel operations do not function properly if the partitioning
// key of the table contains approximate numeric columns.
// ---------------------------------------------------------------------
NABoolean PartitioningFunction::partKeyContainsFloatColumn() const
{
  // Nothing to inspect when there is no partitioning key.
  if (getPartitioningKey().isEmpty())
    return FALSE;

  // Walk a local copy of the partitioning key column set.
  ValueIdSet keyCols = getPartitioningKey();
  for (ValueId col = keyCols.init(); keyCols.next(col); keyCols.advance(col))
    {
      const NAType& colType = col.getType();

      // Only numeric columns can be approximate numerics.
      if (colType.getTypeQualifier() != NA_NUMERIC_TYPE)
        continue;

      // A numeric column that is not exact makes the answer TRUE.
      if (NOT ((NumericType&)colType).isExact())
        return TRUE;
    }

  return FALSE;
}
// -----------------------------------------------------------------------
// PartitioningFunction::getNodeMap()
// Return base class node map by default.
// -----------------------------------------------------------------------
const NodeMap*
PartitioningFunction::getNodeMap() const
{
  // The base class simply hands out the node map it stores.
  return nodeMap_;
}
// use any existing nodemap from my req or my child (or synthesize one) that
// matches my partition count requirement
void
PartitioningFunction::useNodeMapFromReqOrChild(PartitioningRequirement *req,
                                               PartitioningFunction *childPF,
                                               NABoolean forESP)
{
  const ULng32 numParts = (ULng32)getCountOfPartitions();

  // Case 1: we are a copy of a partitioning function that realizes a
  // fully specified partitioning requirement and our own node map already
  // has the right number of entries. It is good enough; keep it.
  if (req->isRequirementFullySpecified() &&
      (CmpCommon::getDefault(COMP_BOOL_87) != DF_ON) &&
      getNodeMap() &&
      (getNodeMap()->getNumEntries() == numParts))
    {
      return;
    }

  NodeMap *chosenMap = NULL;

  if ((CmpCommon::getDefault(COMP_BOOL_87) != DF_ON) &&
      (numParts == (ULng32)childPF->getCountOfPartitions()) &&
      childPF->getNodeMap() &&
      (childPF->getNodeMap()->getNumEntries() == numParts))
    {
      // Case 2: we are a copy of a partitioning function that realizes a
      // fuzzy partitioning requirement. childPF is the partitioning
      // function of the synthesized physical property of the child of an
      // Exchange, and its node map matches our partition count, so a copy
      // of it will do.
      chosenMap = childPF->getNodeMap()->copy();
    }
  else
    {
      // Case 3 (last resort): synthesize a node map from the child's node
      // map and the desired number of ESPs. synthesizeLogicalMap() assumes
      // the lower and upper ESPs are associated via grouping. That
      // assumption does not hold for the communication pattern between
      // them, but it still yields a reasonable node map for the upper ESPs.
      chosenMap = ((NodeMap*)childPF->getNodeMap())
                    ->synthesizeLogicalMap(numParts, forESP);
    }

  CMPASSERT(chosenMap);

  // Mark every partition entry we are going to use as active.
  for (CollIndex partIx = 0; partIx < numParts; partIx++)
    {
      chosenMap->setPartitionState(partIx, NodeMapEntry::ACTIVE);
    }
  CMPASSERT(chosenMap->getNumActivePartitions() == numParts);

  replaceNodeMap(chosenMap);
}
// -----------------------------------------------------------------------
// PartitioningFunction::replaceNodeMap()
// Replace node map with a newly specified node map.
// -----------------------------------------------------------------------
void
PartitioningFunction::replaceNodeMap(NodeMap* nodeMap)
{
  if (nodeMap_)
    {
#ifndef NDEBUG
      // Debug-only check for NATable caching: fire if this object has a
      // heap (collHeap() != 0, i.e. it is not on the system heap) and
      // that heap is not the statement heap. That would be the case for
      // an object living on a NATable heap, and the partitioning function
      // of a cached NATable object must not be changed.
      if ((collHeap()) && (collHeap() != CmpCommon::statementHeap()))
        CMPASSERT(FALSE);
#endif // NDEBUG

      // Fix for solution 10-040120-2524: deleting the old node map
      // unconditionally caused problems because the object was also
      // being used elsewhere. Delete it only when it is on the system
      // heap; a map on the statement heap goes away together with that
      // heap.
      const NABoolean oldMapOnSystemHeap = !nodeMap_->collHeap();
      if (oldMapOnSystemHeap)
        delete nodeMap_;
    }

  nodeMap_ = nodeMap;
}
// -----------------------------------------------------------------------
// PartitioningFunction::copy()
// Virtual copy constructor returns a copy of myself.
// -----------------------------------------------------------------------
PartitioningFunction*
PartitioningFunction::copy() const
{
  // Calling copy() on the base class is illegal; abort.
  CMPABORT;

  return NULL;
}
// -----------------------------------------------------------------------
// PartitioningFunction::normalizePartitioningKeys()
// Rewrite the partitioning keys of the partitioning function in
// terms of the VEGReference for the VEG to which the partitioning
// key column belongs.
// -----------------------------------------------------------------------
void
PartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  // Normalize the key columns and the key predicates in place.
  partitioningKeyColumns_.normalizeNode(normWARef);
  partitioningKeyPredicates_.normalizeNode(normWARef);

  // The two optional expressions are replaced by whatever
  // normalizeNode() returns for them.
  if (partitioningExpression_ != NULL)
    {
      partitioningExpression_ =
        partitioningExpression_->normalizeNode(normWARef);
    }

  if (partitionSelectionExpr_ != NULL)
    {
      partitionSelectionExpr_ =
        partitionSelectionExpr_->normalizeNode(normWARef);
    }
}
// --------------------------------------------------------------------
// A method that is used by optimizer for comparing partitioning
// function with the random number partitioning function i.e. it only
// compares the partitioning func type and the number of partitions.
// It does not compare partitioning key.
// --------------------------------------------------------------------
NABoolean
PartitioningFunction::isKnownReplicaPartFunc() const
{
  // A skewed-data partitioning function whose skew property is
  // broadcasted counts as a replica function.
  if (isASkewedDataPartitioningFunction() &&
      ((SkewedDataPartitioningFunction*)this)->getSkewProperty().isBroadcasted())
    return TRUE;

  // So do both flavors of replication.
  if (isAReplicateViaBroadcastPartitioningFunction())
    return TRUE;

  if (isAReplicateNoBroadcastPartitioningFunction())
    return TRUE;

  return FALSE;
}
COMPARE_RESULT
PartitioningFunction::comparePartFuncsForUnion
                           (const PartitioningFunction &other) const
{
  const NABoolean iAmReplica    = isKnownReplicaPartFunc();
  const NABoolean otherIsReplica = other.isKnownReplicaPartFunc();

  // A replica function on either side is never compatible.
  if (iAmReplica || otherIsReplica)
    return INCOMPATIBLE;

  // Otherwise only the number of partitions is compared; the
  // partitioning key is not looked at.
  return (getCountOfPartitions() == other.getCountOfPartitions())
           ? SAME
           : INCOMPATIBLE;
}
// -----------------------------------------------------------------------
// PartitioningFunction::comparePartFuncToFunc()
// Partitioning function comparison method for hash, round robin, replication,
// and single_partition partitioning functions i.e. everything except regular
// range partitioning functions.
// "Other" must be a partitioning function.
// -----------------------------------------------------------------------
COMPARE_RESULT
PartitioningFunction::comparePartFuncToFunc
                           (const PartitioningFunction &other) const
{
  // All three attributes must agree: function type, partition count and
  // partitioning key. They are checked in that order.
  if (getPartitioningFunctionType() != other.getPartitioningFunctionType())
    return INCOMPATIBLE;

  if (getCountOfPartitions() != other.getCountOfPartitions())
    return INCOMPATIBLE;

  if (getPartitioningKey() == other.getPartitioningKey())
    return SAME;

  return INCOMPATIBLE;
}
// File-local helper: for a logphys partitioning function return its
// physical partitioning function; for anything else return NULL.
static inline
PartitioningFunction* getPhysPartFunc(const PartitioningFunction* x)
{
  if (NOT x->isALogPhysPartitioningFunction())
    return NULL;

  return x->castToLogPhysPartitioningFunction()
           ->getPhysPartitioningFunction();
}
NABoolean
PartitioningFunction::isAGroupingOf(const PartitioningFunction &other,
                                    Lng32* maxPartsPerGroup) const
{
  // The optional output is always set to 1 by this default version.
  if (maxPartsPerGroup)
    *maxPartsPerGroup = 1;

  // Conservative default: only claim a grouping when the two
  // partitioning functions compare as the same, since equal functions
  // are always a grouping of each other.
  const COMPARE_RESULT cmp = comparePartFuncToFunc(other);
  return (cmp == SAME);
}
PartitioningRequirement*
PartitioningFunction::makePartitioningRequirement()
{
  // The base-class version must not be called; derived classes
  // redefine makePartitioningRequirement().
  CMPABORT;

  return NULL;
}
// -----------------------------------------------------------------------
// PartitioningFunction::scaleNumberOfPartitions
// -----------------------------------------------------------------------
PartitioningFunction *
PartitioningFunction::scaleNumberOfPartitions(
     Lng32 &suggestedNewNumberOfPartitions,
     PartitionGroupingDistEnum partGroupDist)
{
  // Reaching this default means the derived class did not override
  // scaleNumberOfPartitions(), i.e. it does not know how to change its
  // number of partitions. The suggestion is therefore ignored and the
  // current count reported back, except when scaling down to one
  // partition.
  if (suggestedNewNumberOfPartitions == 1)
    {
      return new(CmpCommon::statementHeap())
        SinglePartitionPartitioningFunction();
    }

  // Discard any expression generated earlier; it may no longer be
  // correct.
  storeExpression(NULL);

  const NABoolean countDiffers =
    (suggestedNewNumberOfPartitions != getCountOfPartitions());
  if (countDiffers)
    {
      // Install a fresh node map sized for the suggested count.
      NodeMap* freshMap =
        new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions);
      replaceNodeMap(freshMap);
    }

  // Tell the caller what the count actually is.
  suggestedNewNumberOfPartitions = getCountOfPartitions();
  return this;
}
// -----------------------------------------------------------------------
// PartitioningFunction::copyAndRemap()
// -----------------------------------------------------------------------
PartitioningFunction*
PartitioningFunction::copyAndRemap
                         (ValueIdMap& map, NABoolean mapItUp) const
{
  // With an empty partitioning key no copy is made; this object itself
  // is returned.
  const NABoolean keyIsEmpty = (getPartitioningKey().entries() == 0);
  if (keyIsEmpty)
    return (PartitioningFunction*)this;

  // Otherwise clone via the virtual copy constructor and remap the clone
  // using this object as the source.
  PartitioningFunction* remapped = copy();
  remapped->remapIt(this, map, mapItUp);
  return remapped;
}
// -----------------------------------------------------------------------
// PartitioningFunction::remapIt()
// -----------------------------------------------------------------------
void PartitioningFunction::remapIt
(const PartitioningFunction* opf,
ValueIdMap& map, NABoolean mapItUp)
{
// Clear because rewrite insists on it being so.
partitioningKeyColumns_.clear();
partitioningKeyPredicates_.clear();
partitionInputValues_.clear();
partitionInputValuesLayout_.clear();
if (mapItUp)
{
map.rewriteValueIdSetUp(partitioningKeyColumns_,
opf->partitioningKeyColumns_);
map.rewriteValueIdSetUp(partitioningKeyPredicates_,
opf->partitioningKeyPredicates_);
// Note the pivs don't need mapping, they just need to be copied.
map.rewriteValueIdSetUp(partitionInputValues_,
opf->partitionInputValues_);
map.rewriteValueIdListUp(partitionInputValuesLayout_,
opf->partitionInputValuesLayout_);
}
else
{
map.rewriteValueIdSetDown(opf->partitioningKeyColumns_,
partitioningKeyColumns_);
map.rewriteValueIdSetDown(opf->partitioningKeyPredicates_,
partitioningKeyPredicates_);
// Note the pivs don't need mapping, they just need to be copied.
map.rewriteValueIdSetDown(opf->partitionInputValues_,
partitionInputValues_);
map.rewriteValueIdListDown(opf->partitionInputValuesLayout_,
partitionInputValuesLayout_);
}
if (partitioningExpression_)
partitioningExpression_ = partitioningExpression_->
mapAndRewrite(map,NOT mapItUp).getItemExpr();
} // PartitioningFunction::remapIt()
// -----------------------------------------------------------------------
// PartitioningFunction::shouldUseSynchronousAccess()
// -----------------------------------------------------------------------
NABoolean PartitioningFunction::shouldUseSynchronousAccess(
     const ReqdPhysicalProperty* rpp,
     const EstLogPropSharedPtr& inputLogProp,
     GroupAttributes* ga) const
{
  // Default for every partitioning function except range and single
  // partition. Synchronous access only makes sense for range partitioned
  // tables, so the answer is FALSE unless the user is forcing it (by
  // disabling asynchronous access or via the shape).

  // Synchronous access cannot be forced for non-range partitioned tables
  // when a logical order and/or arrangement is required; doing so could
  // produce incorrect results.
  if (rpp->getLogicalOrderOrArrangementFlag())
    return FALSE;

  const LogicalPartitioningRequirement *logPartReq =
    rpp->getLogicalPartRequirement();

  // No synchronous access when the user forces a PAPA node without
  // forcing the number of PAs.
  if ((logPartReq != NULL) AND
      logPartReq->getMustUsePapa() AND
      (logPartReq->getNumClientsReq() == ANY_NUMBER_OF_PARTITIONS))
    return FALSE;

  // The defaults table says asynchronous access is not ok: force
  // synchronous access.
  if (CmpCommon::getDefault(ATTEMPT_ASYNCHRONOUS_ACCESS) == DF_OFF)
    return TRUE;

  // Otherwise synchronous access is forced only when a specific number
  // of clients is requested and it is smaller than our partition count.
  if (logPartReq == NULL)
    return FALSE;

  if (logPartReq->getNumClientsReq() == ANY_NUMBER_OF_PARTITIONS)
    return FALSE;

  if (logPartReq->getNumClientsReq() < getCountOfPartitions())
    return TRUE;

  return FALSE;
}
// -----------------------------------------------------------------------
// Virtual functions that must be redefined for derived classes.
// -----------------------------------------------------------------------
Lng32
PartitioningFunction::getCountOfPartitions() const
{
  // Derived classes redefine getCountOfPartitions(); the base-class
  // version aborts.
  CMPABORT;

  return 1;
}
NABoolean
PartitioningFunction::canProducePartitioningKeyPredicates() const
{
  // TRUE is the most common case, so it is the default answer.
  return TRUE;
}
// The three accessors below all assert that the partitioning key
// predicates have been created before handing out a stored member.

const ValueIdSet&
PartitioningFunction::getPartitioningKeyPredicates() const
{
  CMPASSERT(partKeyPredsCreated_);

  return partitioningKeyPredicates_;
}

const ValueIdSet&
PartitioningFunction::getPartitionInputValues() const
{
  CMPASSERT(partKeyPredsCreated_);

  return partitionInputValues_;
}

const ValueIdList&
PartitioningFunction::getPartitionInputValuesLayout() const
{
  CMPASSERT(partKeyPredsCreated_);

  return partitionInputValuesLayout_;
}
void
PartitioningFunction::createPartitioningKeyPredicates()
{
  // Derived classes redefine createPartitioningKeyPredicates(); the
  // base-class version aborts.
  CMPABORT;
}

void
PartitioningFunction::replacePivs(const ValueIdList& newPivs,
                                  const ValueIdSet& newPartKeyPreds)
{
  // Derived classes redefine replacePivs(); the base-class version
  // aborts.
  CMPABORT;
}
PartitioningFunction*
PartitioningFunction::createPartitioningFunctionForIndexDesc
                                      (IndexDesc *idesc) const
{
  // Derived classes redefine createPartitioningFunctionForIndexDesc();
  // the base-class version aborts.
  CMPABORT;

  return NULL;
}
ItemExpr*
PartitioningFunction::createPartitioningExpression()
{
  // Derived classes redefine createPartitioningExpression(); the
  // base-class version aborts.
  CMPABORT;

  return NULL;
}
void
PartitioningFunction::createPartSelectionExprFromSearchKey(
     const ValueId beginPartSelId,
     const ValueId endPartSelId,
     ValueIdList &partSelectionValIds) const
{
  // Default: just add the begin id followed by the end id to the
  // caller's list.
  partSelectionValIds.insert(beginPartSelId);
  partSelectionValIds.insert(endPartSelId);
}
// -----------------------------------------------------------------------
// PartitioningFunction::preCodeGen()
// Rewrite the partitioning keys of the partitioning function that
// are expressed using VEGReferences in terms of the available values.
// -----------------------------------------------------------------------
void PartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
ValueIdSet noExternalInputs;
partitioningKeyColumns_.replaceVEGExpressions
(availableValues, noExternalInputs,
FALSE, NULL, TRUE);
if (partitioningExpression_)
{
partitioningExpression_ = partitioningExpression_->replaceVEGExpressions(
availableValues,
noExternalInputs,
FALSE,
NULL,
TRUE); // deep copy because copy constructor made a shallow copy
partitioningExpression_->synthTypeAndValueId(TRUE);
}
} // PartitioningFunction::preCodeGen()
const NAString
PartitioningFunction::getText() const
{
  // The base-class version aborts; the return below only keeps the
  // function well-formed.
  CMPABORT;

  return NAString("some type of partitioning function",
                  CmpCommon::statementHeap());
}
void PartitioningFunction::setupForStatement()
{
if(setupForStatement_)
return;
setupForStatement_ = TRUE;
resetAfterStatement_ = FALSE;
}
void
PartitioningFunction::resetAfterStatement()
{
  // A second call after a reset has nothing left to do.
  if (resetAfterStatement_)
    return;

  // Empty the value id containers.
  partitioningKeyColumns_.clear();
  partitioningKeyPredicates_.clear();
  partitionInputValues_.clear();
  partitionInputValuesLayout_.clear();
  partitionSelectionExprInputs_.clear();

  // Put the flags and pointers back to their cleared values.
  partKeyPredsCreated_     = FALSE;
  assignPartition_         = FALSE;
  partitioningExpression_  = NULL;
  dataConversionErrorFlag_ = NULL;

  // Record that the reset happened and that setup is needed again.
  resetAfterStatement_ = TRUE;
  setupForStatement_   = FALSE;
}
// -----------------------------------------------------------------------
// Method for debugging
// -----------------------------------------------------------------------
// Writes a description of this partitioning function to ofd: the title
// with the function type, the number of partitions, each non-empty value
// id container, the unparsed partitioning expression (if any) and the
// partition-assignment flag, framed by two separator lines.
//   ofd    - output stream to write to
//   indent - handed to BUMP_INDENT(); output lines use NEW_INDENT
//   title  - label printed in front of the function type
void PartitioningFunction::print(FILE* ofd, const char* indent,
                                 const char* title) const
{
  BUMP_INDENT(indent);

  fprintf(ofd, "%s--Partitioning-Function----------------\n", NEW_INDENT);
  fprintf(ofd, "%s%s (%d)\n",
          NEW_INDENT, title, getPartitioningFunctionType());
  fprintf(ofd, "%snumber of partitions (%d)\n",
          NEW_INDENT, getCountOfPartitions());

  if (NOT partitioningKeyColumns_.isEmpty())
    partitioningKeyColumns_.print(ofd,
                                  NEW_INDENT,
                                  "Partitioning Key Columns");

  if (partKeyPredsCreated())
    fprintf(ofd, "%sPartition Key Predicates Created\n", NEW_INDENT);

  if (NOT partitioningKeyPredicates_.isEmpty())
    partitioningKeyPredicates_.print(ofd,
                                     NEW_INDENT,
                                     "Partitioning Key Predicates");

  if (NOT partitionInputValues_.isEmpty())
    partitionInputValues_.print(ofd, NEW_INDENT,
                                "Partition Input Values");

  if (NOT partitionInputValuesLayout_.isEmpty())
    partitionInputValuesLayout_.print(ofd, NEW_INDENT,
                                      "Partition Input Values Layout");

  if (getPartitioningExpression())
    {
      NAString partExpr("Partitioning Expression\n",
                        CmpCommon::statementHeap());
      getPartitioningExpression()->
        unparse(partExpr, DEFAULT_PHASE, EXPLAIN_FORMAT);

      // The unparsed text is data, not a format string: print it through
      // "%s" so that a '%' inside the expression text is written out
      // verbatim instead of being interpreted as a conversion.
      fprintf(ofd, "%s", partExpr.data());
    }

  if (assignPartition())
    fprintf(ofd, "%sDoes Partition Assignment\n", NEW_INDENT);

  fprintf(ofd, "%s--Partitioning-Function----------------\n", NEW_INDENT);
} // PartitioningFunction::print()
void
PartitioningFunction::display() const
{
  // Delegate to print() with its default arguments.
  print();
}
// ***********************************************************************
// SinglePartitionPartitioningFunction
// ***********************************************************************
// Nothing to clean up in this class.
SinglePartitionPartitioningFunction::~SinglePartitionPartitioningFunction()
{
}

Lng32
SinglePartitionPartitioningFunction::getCountOfPartitions() const
{
  // The count is the fixed constant EXACTLY_ONE_PARTITION.
  return EXACTLY_ONE_PARTITION;
}
PartitioningRequirement*
SinglePartitionPartitioningFunction::makePartitioningRequirement()
{
  // Build a RequireExactlyOnePartition on the statement heap, handing it
  // this partitioning function.
  PartitioningRequirement* onePartReq =
    new (CmpCommon::statementHeap()) RequireExactlyOnePartition(this);
  return onePartReq;
}
// Type-safe cast: this object is a single partition partitioning
// function, so the cast yields "this".
const SinglePartitionPartitioningFunction*
SinglePartitionPartitioningFunction::castToSinglePartitionPartitioningFunction() const
{
  return this;
}
PartitioningFunction*
SinglePartitionPartitioningFunction::copy() const
{
  // Virtual copy constructor: copy-construct a new object on the
  // statement heap.
  PartitioningFunction* duplicate =
    new (CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction(*this, CmpCommon::statementHeap());
  return duplicate;
}
void
SinglePartitionPartitioningFunction::createPartitioningKeyPredicates()
{
  // A single partition has no partitioning key predicates, so an empty
  // set is stored.
  storePartitioningKeyPredicates(ValueIdSet());
}

void
SinglePartitionPartitioningFunction::replacePivs(
     const ValueIdList& newPivs,
     const ValueIdSet& newPartKeyPreds)
{
  // Intentionally empty: a single partition has no pivs to replace.
}
// -----------------------------------------------------------------------
// SinglePartitionPartitioningFunction::createPartitioningExpression()
// -----------------------------------------------------------------------
ItemExpr*
SinglePartitionPartitioningFunction::createPartitioningExpression()
{
  // Reuse the expression if it was already constructed.
  ItemExpr* cachedExpr = getExpression();
  if (cachedExpr)
    return cachedExpr;

  // -------------------------------------------------------------------
  // Build Cast(SystemLiteral(0) AS SQLInt(..., FALSE, FALSE)), i.e. an
  // expression of the form ConstValue(0), then allocate ValueIds and
  // synthesize its type.
  // -------------------------------------------------------------------
  SystemLiteral* zeroLiteral =
    new (CmpCommon::statementHeap()) SystemLiteral(0);
  SQLInt* castTargetType =
    new (CmpCommon::statementHeap())
      SQLInt(CmpCommon::statementHeap(), FALSE, FALSE);

  ItemExpr* constZeroExpr =
    new (CmpCommon::statementHeap()) Cast(zeroLiteral, castTargetType);
  constZeroExpr->synthTypeAndValueId();

  // Remember the expression for later calls.
  storeExpression(constZeroExpr);
  return constZeroExpr;
}
NABoolean
SinglePartitionPartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  // The optional output is set to the partition count of "other".
  if (maxPartsPerGroup)
    *maxPartsPerGroup = other.getCountOfPartitions();

  // A single partition is a grouping of nearly anything. The exception
  // is the replication partitioning "function": combining its partitions
  // would yield multiple copies of the data, which differs from the
  // single partition.
  if (other.castToReplicateViaBroadcastPartitioningFunction() != NULL)
    return FALSE;

  if (other.castToReplicateNoBroadcastPartitioningFunction() != NULL)
    return FALSE;

  return TRUE;
}
// -----------------------------------------------------------------------
// SinglePartitionPartitioningFunction::shouldUseSynchronousAccess()
// -----------------------------------------------------------------------
NABoolean
SinglePartitionPartitioningFunction::shouldUseSynchronousAccess(
     const ReqdPhysicalProperty* rpp,
     const EstLogPropSharedPtr& inputLogProp,
     GroupAttributes* ga) const
{
  // An unpartitioned table needs neither synchronous nor asynchronous
  // access, so the answer is FALSE regardless of the arguments.
  return FALSE;
}
// -----------------------------------------------------------------------
// Push down checking: testing condition C1.2: all tables are not
// partitioned.
// -----------------------------------------------------------------------
NABoolean
SinglePartitionPartitioningFunction::partFuncAndFuncPushDownCompatible(
     const PartitioningFunction& other) const
{
  // Compatible only when "other" is a single partition partitioning
  // function as well.
  const NABoolean otherIsSinglePart =
    (other.castToSinglePartitionPartitioningFunction() != 0);

  return otherIsSinglePart ? TRUE : FALSE;
}
const NAString
SinglePartitionPartitioningFunction::getText() const
{
  // Fixed description text for this partitioning function.
  return NAString("exactly 1 partition");
}
void
SinglePartitionPartitioningFunction::print(FILE* ofd,
                                           const char* indent,
                                           const char* title) const
{
  // The caller's title is not used; the class name is printed instead.
  PartitioningFunction::print(ofd,
                              indent,
                              "SinglePartitionPartitioningFunction");
}
// ***********************************************************************
// ReplicateViaBroadcastPartitioningFunction
// ***********************************************************************
// Nothing to clean up in this class.
ReplicateViaBroadcastPartitioningFunction::
~ReplicateViaBroadcastPartitioningFunction()
{
}

Lng32
ReplicateViaBroadcastPartitioningFunction::getCountOfPartitions() const
{
  // The count is kept in numberOfPartitions_.
  return numberOfPartitions_;
}

// Type-safe cast: yields "this" for this class.
const ReplicateViaBroadcastPartitioningFunction*
ReplicateViaBroadcastPartitioningFunction::
castToReplicateViaBroadcastPartitioningFunction() const
{
  return this;
}
PartitioningFunction *
ReplicateViaBroadcastPartitioningFunction::scaleNumberOfPartitions(
     Lng32 &suggestedNewNumberOfPartitions,
     PartitionGroupingDistEnum partGroupDist)
{
  // Scaling down to one partition yields a new single partition
  // partitioning function instead of this object.
  const NABoolean scaleToOne = (suggestedNewNumberOfPartitions == 1);
  if (scaleToOne)
    {
      return new(CmpCommon::statementHeap())
        SinglePartitionPartitioningFunction();
    }

  // Drop any expression generated earlier; it may no longer be correct.
  storeExpression(NULL);

  // Accept the suggested count as is.
  numberOfPartitions_ = suggestedNewNumberOfPartitions;
  return this;
}
PartitioningRequirement*
ReplicateViaBroadcastPartitioningFunction::makePartitioningRequirement()
{
  // Build a RequireReplicateViaBroadcast on the statement heap, handing
  // it this partitioning function.
  PartitioningRequirement* broadcastReq =
    new (CmpCommon::statementHeap()) RequireReplicateViaBroadcast(this);
  return broadcastReq;
}

PartitioningFunction*
ReplicateViaBroadcastPartitioningFunction::copy() const
{
  // Virtual copy constructor: copy-construct a new object on the
  // statement heap.
  PartitioningFunction* duplicate =
    new (CmpCommon::statementHeap())
      ReplicateViaBroadcastPartitioningFunction(*this);
  return duplicate;
}
void
ReplicateViaBroadcastPartitioningFunction::createPartitioningKeyPredicates()
{
  // The partitioning key predicates of a replication "function" are the
  // empty set, so every partition returns all the data. Store an empty
  // set.
  storePartitioningKeyPredicates(ValueIdSet());
}

void
ReplicateViaBroadcastPartitioningFunction::replacePivs(
     const ValueIdList& newPivs,
     const ValueIdSet& newPartKeyPreds)
{
  // Intentionally empty: the pivs of a replication "function" are the
  // empty set.
}
ItemExpr*
ReplicateViaBroadcastPartitioningFunction::createPartitioningExpression()
{
  // No partitioning expression is built for this class.
  return NULL;
}
NABoolean
ReplicateViaBroadcastPartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  // The optional output is always set to 1.
  if (maxPartsPerGroup)
    *maxPartsPerGroup = 1;

  // A replication partitioning function is a grouping of nothing but
  // itself: a grouping is created by merging two partitions of "other"
  // zero or more times.
  const COMPARE_RESULT cmp = comparePartFuncToFunc(other);
  return (cmp == SAME);
}
// Returns the text "broadcast <n> times", where <n> is the number of
// partitions of this function.
const NAString ReplicateViaBroadcastPartitioningFunction::getText() const
{
  char ntimes[20];

  // Bounded formatting: snprintf can never write past the end of the
  // buffer, whatever the width of the formatted number.
  snprintf(ntimes, sizeof(ntimes), "%d", numberOfPartitions_);

  return NAString("broadcast ") + ntimes + " times";
}
void
ReplicateViaBroadcastPartitioningFunction::print(FILE* ofd,
                                                 const char* indent,
                                                 const char* title) const
{
  // The caller's title is not used; the class name is printed instead.
  PartitioningFunction::print(ofd,
                              indent,
                              "ReplicateViaBroadcastPartitioningFunction");
}
// ***********************************************************************
// ReplicateNoBroadcastPartitioningFunction
// ***********************************************************************
// Nothing to clean up in this class.
ReplicateNoBroadcastPartitioningFunction::
~ReplicateNoBroadcastPartitioningFunction()
{
}

Lng32
ReplicateNoBroadcastPartitioningFunction::getCountOfPartitions() const
{
  // The count is kept in numberOfPartitions_.
  return numberOfPartitions_;
}

// Type-safe cast: yields "this" for this class.
const ReplicateNoBroadcastPartitioningFunction*
ReplicateNoBroadcastPartitioningFunction::
castToReplicateNoBroadcastPartitioningFunction() const
{
  return this;
}
PartitioningFunction *
ReplicateNoBroadcastPartitioningFunction::scaleNumberOfPartitions(
     Lng32 &suggestedNewNumberOfPartitions,
     PartitionGroupingDistEnum partGroupDist)
{
  // Scaling down to one partition yields a new single partition
  // partitioning function instead of this object.
  const NABoolean scaleToOne = (suggestedNewNumberOfPartitions == 1);
  if (scaleToOne)
    {
      return new(CmpCommon::statementHeap())
        SinglePartitionPartitioningFunction();
    }

  // Drop any expression generated earlier; it may no longer be correct.
  storeExpression(NULL);

  // Accept the suggested count as is.
  numberOfPartitions_ = suggestedNewNumberOfPartitions;
  return this;
}
PartitioningRequirement*
ReplicateNoBroadcastPartitioningFunction::makePartitioningRequirement()
{
  // Build a RequireReplicateNoBroadcast on the statement heap, handing
  // it this partitioning function.
  PartitioningRequirement* noBroadcastReq =
    new (CmpCommon::statementHeap()) RequireReplicateNoBroadcast(this);
  return noBroadcastReq;
}

PartitioningFunction*
ReplicateNoBroadcastPartitioningFunction::copy() const
{
  // Virtual copy constructor: copy-construct a new object on the
  // statement heap.
  PartitioningFunction* duplicate =
    new (CmpCommon::statementHeap())
      ReplicateNoBroadcastPartitioningFunction(*this);
  return duplicate;
}
void
ReplicateNoBroadcastPartitioningFunction::createPartitioningKeyPredicates()
{
  // The partitioning key predicates of a replication "function" are the
  // empty set, so every partition returns all the data. Store an empty
  // set.
  storePartitioningKeyPredicates(ValueIdSet());
}

void
ReplicateNoBroadcastPartitioningFunction::replacePivs(
     const ValueIdList& newPivs,
     const ValueIdSet& newPartKeyPreds)
{
  // Intentionally empty: the pivs of a replication "function" are the
  // empty set.
}
ItemExpr*
ReplicateNoBroadcastPartitioningFunction::createPartitioningExpression()
{
  // No partitioning expression is built for this class.
  return NULL;
}
NABoolean
ReplicateNoBroadcastPartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  // The optional output is always set to 1.
  if (maxPartsPerGroup)
    *maxPartsPerGroup = 1;

  // A replication partitioning function is a grouping of nothing but
  // itself: a grouping is created by merging two partitions of "other"
  // zero or more times.
  const COMPARE_RESULT cmp = comparePartFuncToFunc(other);
  return (cmp == SAME);
}
// Returns a text of the form "replicate no broadcast <n> times", where
// <n> is numberOfPartitions_.
const NAString ReplicateNoBroadcastPartitioningFunction::getText() const
{
  char ntimes[20];

  // bounded formatting: never write past the end of ntimes
  snprintf(ntimes, sizeof(ntimes), "%d", numberOfPartitions_);

  return NAString("replicate no broadcast ") + ntimes + " times";
}
// Debug print: delegates to the base class. The title argument is not
// used; the class name is always passed as the title instead.
void ReplicateNoBroadcastPartitioningFunction::print(FILE* ofd,
                                                     const char* indent,
                                                     const char* title) const
{
  PartitioningFunction::print(ofd,
                              indent,
                              "ReplicateNoBroadcastPartitioningFunction");
}
// ***********************************************************************
// HashPartitioningFunction
// ***********************************************************************
// Destructor; the body is empty.
HashPartitioningFunction::~HashPartitioningFunction()
{
}
// Returns the value of numberOfHashPartitions_.
Lng32
HashPartitioningFunction::getCountOfPartitions() const
{
  return numberOfHashPartitions_;
}
// Down cast helper: for this class the result is always the object itself.
const HashPartitioningFunction *
HashPartitioningFunction::castToHashPartitioningFunction() const
{
  return this;
}
// Allocates, on the statement heap, a RequireHash requirement built from
// this partitioning function.
PartitioningRequirement *
HashPartitioningFunction::makePartitioningRequirement()
{
  PartitioningRequirement *requirement =
    new (CmpCommon::statementHeap()) RequireHash(this);
  return requirement;
}
// Returns a duplicate of this object. The duplicate is allocated on the
// statement heap, which is also handed to the copying constructor.
PartitioningFunction *
HashPartitioningFunction::copy() const
{
  PartitioningFunction *duplicate =
    new (CmpCommon::statementHeap())
      HashPartitioningFunction(*this, CmpCommon::statementHeap());
  return duplicate;
}
// -----------------------------------------------------------------------
// PartitioningFunction::createBetweenPartitioningKeyPredicates()
//
// Builds the two range predicates
//     partNumExpr >= :loPart  AND  partNumExpr < :hiPart
// (the upper bound uses <= when useHash2Split is set) and stores them as
// the partitioning key predicates of this function.
//
// pivLoName, pivHiName  names of the two host variables that are created
//                       as partition input values (PIVs). They must be
//                       non-NULL if no PIVs have been stored yet; they
//                       are not used otherwise.
// partNumExpr           expression that yields the partition number. If
//                       NULL, createPartitioningExpression() is used.
// useHash2Split         if set, both PIVs are wrapped in Hash2Distrib
//                       together with the original HASH2 partition count.
//
// Nothing happens if the key predicates were already created and no
// explicit partNumExpr is passed in.
// -----------------------------------------------------------------------
void PartitioningFunction::createBetweenPartitioningKeyPredicates(
     const char * pivLoName,
     const char * pivHiName,
     ItemExpr   * partNumExpr,
     NABoolean    useHash2Split)
{
  if (NOT partKeyPredsCreated() || partNumExpr != NULL)
    {
      ItemExpr * rootPtr;
      ItemExpr * loPart;
      ItemExpr * hiPart;
      ValueIdSet setOfPartKeyPredicates;

      // by default we use the partitioning function's expression
      // to compute the partition number
      if (partNumExpr == NULL)
        partNumExpr = createPartitioningExpression();

      // compute part input values if not already done so
      if (partitionInputValues_.isEmpty())
        {
          ValueIdList partInputValues;

          // must specify PIV names if they need to be created
          CMPASSERT(pivLoName && pivHiName);

          // the partition input values are two integer values: lo and hi part #
          loPart = new (CmpCommon::statementHeap())
            HostVar(pivLoName,
                    new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE,FALSE),
                    TRUE);
          hiPart = new (CmpCommon::statementHeap())
            HostVar(pivHiName,
                    new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE,FALSE),
                    TRUE);
          loPart->synthTypeAndValueId();
          hiPart->synthTypeAndValueId();
          partInputValues.insert(loPart->getValueId());
          partInputValues.insert(hiPart->getValueId());
          storePartitionInputValues(partInputValues);
        }
      else
        {
          // reuse the existing PIVs: entry 0 is the low, entry 1 the high value
          loPart = getPartitionInputValuesLayout()[0].getItemExpr();
          hiPart = getPartitionInputValuesLayout()[1].getItemExpr();
        }

      // -----------------------------------------------------------------
      // The partitioning key predicate for a hash partitioning selects
      // a range of hash partitions with the following predicate:
      // partNumExpr >= :loPart AND partNumExpr < :hiPart
      // where the hash function is the partitioning function generated
      // by the createPartitioningExpression() method and the host variables
      // are generated here as partition input values.
      // -----------------------------------------------------------------
      if (useHash2Split)
        {
          // For a HASH2 function, the PIVs are expressed as min and max
          // hash values to be retrieved. These need to be converted to
          // partition numbers when we use them here. Note that we need to
          // use the original number of HASH2 partitions here. For example,
          // if we have a table salted with 64 salt buckets and we use
          // 8 ESPs (8 partitions), the predicate needs to select 8 different
          // salt values (original partitions) for each ESP.
          CMPASSERT(isAHash2PartitioningFunction());
          UInt32 numOfOrigPartns = castToHash2PartitioningFunction()->
            getCountOfOrigHashPartitions();
          char numPartsString[30];

          // numOfOrigPartns is unsigned, so format it with %u
          snprintf(numPartsString,sizeof(numPartsString),"%u",numOfOrigPartns);
          NAString numPartsLiteral(numPartsString);
          ConstValue *numPartns = new (CmpCommon::statementHeap())
            ConstValue(new (CmpCommon::statementHeap())
                         SQLInt(CmpCommon::statementHeap(), FALSE,
                                FALSE),
                       (void *) &numOfOrigPartns,
                       (Lng32) sizeof(numOfOrigPartns),
                       &numPartsLiteral,
                       CmpCommon::statementHeap());

          loPart = new (CmpCommon::statementHeap())
            Hash2Distrib(loPart, numPartns);
          hiPart = new (CmpCommon::statementHeap())
            Hash2Distrib(hiPart, numPartns);
        }

      // lower bound
      rootPtr = new (CmpCommon::statementHeap())
        BiRelat(ITM_GREATER_EQ,
                partNumExpr,
                loPart,
                TRUE);
      rootPtr->synthTypeAndValueId();
      setOfPartKeyPredicates += rootPtr->getValueId();

      // upper bound (inclusive for the HASH2 split, exclusive otherwise)
      rootPtr = new (CmpCommon::statementHeap())
        BiRelat((useHash2Split ? ITM_LESS_EQ : ITM_LESS),
                partNumExpr,
                hiPart,
                TRUE);
      rootPtr->synthTypeAndValueId();
      setOfPartKeyPredicates += rootPtr->getValueId();

      // Store the set of key predicates in the partitioning attributes.
      storePartitioningKeyPredicates(setOfPartKeyPredicates);
    }
} // PartitioningFunction::createBetweenPartitioningKeyPredicates()
// Creates the lo/hi range predicates for this hash partitioning function
// by delegating to createBetweenPartitioningKeyPredicates() with the
// system host variable names below.
void HashPartitioningFunction::createPartitioningKeyPredicates()
{
  const char *loPivName = "_sys_HostVarLoHashPart";
  const char *hiPivName = "_sys_HostVarHiHashPart";

  createBetweenPartitioningKeyPredicates(loPivName, hiPivName);
}
// -----------------------------------------------------------------------
// HashPartitioningFunction::replacePivs()
// Replaces the stored partition input values and partitioning key
// predicates with the ones passed in.
// -----------------------------------------------------------------------
void HashPartitioningFunction::replacePivs(
     const ValueIdList& newPivs,
     const ValueIdSet& newPartKeyPreds)
{
  // first the PIVs, then the predicates that refer to them
  storePartitionInputValues(newPivs);
  storePartitioningKeyPredicates(newPartKeyPreds);
} // HashPartitioningFunction::replacePivs()
// Compares this function with "other". Anything the base class does not
// report as SAME is INCOMPATIBLE; otherwise the result of the key
// comparison is returned.
COMPARE_RESULT
HashPartitioningFunction::comparePartFuncToFunc
                             (const PartitioningFunction &other) const
{
  if (PartitioningFunction::comparePartFuncToFunc(other) == SAME)
    return comparePartKeyToKey(other);

  return INCOMPATIBLE;
}
// Compares the partitioning keys of this function and "other".
//
// Returns SAME if both functions have the same partitioning function
// type, the same number of key columns, pairwise equal key columns in the
// same order, and pairwise equal types of the original key columns.
// Returns INCOMPATIBLE in every other case.
COMPARE_RESULT
HashPartitioningFunction::comparePartKeyToKey
                             (const PartitioningFunction &other) const
{
  if (getPartitioningFunctionType() != other.getPartitioningFunctionType())
    return INCOMPATIBLE;

  // the type check above is what justifies this cast
  const HashPartitioningFunction &otherHash =
    (const HashPartitioningFunction &) other;

  const CollIndex numKeyCols = keyColumnList_.entries();

  if (numKeyCols != otherHash.keyColumnList_.entries())
    return INCOMPATIBLE;

  // both the columns and the types of the original columns must match,
  // position by position
  for (CollIndex col = 0; col < numKeyCols; col++)
    {
      if (keyColumnList_[col] != otherHash.keyColumnList_[col] OR
          NOT (originalKeyColumnList_[col].getType() ==
               otherHash.originalKeyColumnList_[col].getType()))
        return INCOMPATIBLE;
    }

  return SAME;
}
// -----------------------------------------------------------------------
// HashPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
// Right now we assume that the split function of a hash partitioning
// function is such that no two functions are a grouping of each other.
// The only exception is identity, of course. We use the base class'
// implementation.
//
// With more knowledge about the split function (e.g. by knowing it's
// a simple modulus), one could guarantee that a 4-way hash-partitioning
// scheme is actually a grouping of an 8-way scheme. This is not really
// necessary and therefore not done here.
// -----------------------------------------------------------------------
// HashPartitioningFunction::createPartitioningExpression()
//
// Returns the stored partitioning expression if there is one. Otherwise
// an expression of the form
//
//     Modulus( <hashing expr>( cast(key1), ..., cast(keyN) ),
//              <count of partitions> )
//
// is built, type synthesized, stored and returned. Each key column is
// cast to the type of the corresponding original key column.
// -----------------------------------------------------------------------
ItemExpr* HashPartitioningFunction::createPartitioningExpression()
{
  ItemExpr *storedExpr = getExpression();

  if (storedExpr)          // already constructed?
    return storedExpr;     // reuse it!

  // cast every key column to the exact type of its original key column
  ValueIdList castedKeyCols;
  const CollIndex numKeyCols = keyColumnList_.entries();

  for (CollIndex col = 0; col < numKeyCols; col++)
    {
      ItemExpr *casted =
        getCastedItemExpre(keyColumnList_[col].getItemExpr(),
                           originalKeyColumnList_[col].getType(),
                           CmpCommon::statementHeap());

      casted->synthTypeAndValueId();
      castedKeyCols.insert(casted->getValueId());
    }

  // hash the list of casted key columns ...
  ItemExpr *hashExpr =
    buildHashingExpressionForExpr(
         castedKeyCols.rebuildExprTree(ITM_ITEM_LIST));

  // ... and take the hash value modulo the number of partitions.
  // once we support late binding, and/or changing the number of target
  // partitions at run time, the number of partitions will have
  // to become some sort of an input value $$$$
  SystemLiteral *partCount =
    new (CmpCommon::statementHeap()) SystemLiteral(getCountOfPartitions());

  ItemExpr *partFunc =
    new (CmpCommon::statementHeap()) Modulus(hashExpr, partCount);

  partFunc->synthTypeAndValueId();
  storeExpression(partFunc);

  return partFunc;
} // HashPartitioningFunction::createPartitioningExpression()
// Wraps expr in a Hash item expression allocated on the statement heap.
ItemExpr *
HashPartitioningFunction::buildHashingExpressionForExpr(ItemExpr* expr) const
{
  ItemExpr *hashExpr = new (CmpCommon::statementHeap()) Hash(expr);
  return hashExpr;
}
// Computes the hash of len bytes at data by calling FastHash() directly.
// The flags argument is not used here. NULL values and VARCHAR data
// types are not handled.
UInt32 HashPartitioningFunction::computeHashValue(char* data,
                                                  UInt32 flags,
                                                  Int32 len)
{
  UInt32 hashValue = FastHash(data, len);
  return hashValue;
}
// Returns child 0 of the stored partitioning expression, which is
// asserted to be an ITM_HASH node. Returns NULL if no partitioning
// expression has been stored.
ItemExpr * HashPartitioningFunction::getHashingExpression() const
{
  ItemExpr* partExpr = getExpression();

  if (partExpr == NULL)
    return NULL;

  ItemExpr* hashExpr = NULL;

  hashExpr = partExpr->child(0);
  CMPASSERT(hashExpr AND
            hashExpr->getOperatorType() == ITM_HASH);

  return hashExpr;
}
// -----------------------------------------------------------------------
// HashPartitioningFunction::remapIt()
// -----------------------------------------------------------------------
void HashPartitioningFunction::remapIt
(const PartitioningFunction* opf,
ValueIdMap& map, NABoolean mapItUp)
{
PartitioningFunction::remapIt(opf, map,mapItUp);
// If we have arrived here, the original partitioning function (*opf)
// MUST be a HashPartitioningFunction().
CMPASSERT(opf->castToHashPartitioningFunction());
// Clear because rewrite insists on it being so.
keyColumnList_.clear();
if (mapItUp)
{
map.rewriteValueIdListUp(
keyColumnList_,
opf->castToHashPartitioningFunction()->keyColumnList_);
}
else
{
map.rewriteValueIdListDown(
opf->castToHashPartitioningFunction()->keyColumnList_,
keyColumnList_);
}
// do NOT map the originalKeyColumnList_, that's why it's called ORIGINAL
} // HashPartitioningFunction::remapIt()
// Changes the number of hash partitions to the suggested value.
//
// A suggestion of exactly one partition yields a newly allocated
// SinglePartitionPartitioningFunction and leaves this object untouched.
// Otherwise the stored expression is discarded, a new NodeMap is
// installed if the count actually changes, the count is updated and
// this object is returned. partGroupDist is not used here.
PartitioningFunction * HashPartitioningFunction::scaleNumberOfPartitions(
     Lng32 &suggestedNewNumberOfPartitions,
     PartitionGroupingDistEnum partGroupDist)
{
  if (suggestedNewNumberOfPartitions == 1)
    return new (CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // An expression generated for the old count may no longer be correct.
  storeExpression(NULL);

  const NABoolean countChanges =
    (suggestedNewNumberOfPartitions != getCountOfPartitions());

  if (countChanges)
    replaceNodeMap(
         new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions));

  numberOfHashPartitions_ = suggestedNewNumberOfPartitions;

  return this;
}
// -----------------------------------------------------------------------
// HashPartitioningFunction::preCodeGen()
// Rewrite the partitioning keys of the partitioning function that
// are expressed using VEGReferences in terms of the available values.
// -----------------------------------------------------------------------
void HashPartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
ValueIdSet noExternalInputs;
PartitioningFunction::preCodeGen(availableValues);
keyColumnList_.replaceVEGExpressions(availableValues, noExternalInputs,
FALSE, NULL, TRUE);
} // HashPartitioningFunction::preCodeGen()
// -----------------------------------------------------------------------
// Method for debugging.
// Returns "<hashType> partitioned <n> ways on (<key columns>)", where
// <n> is numberOfHashPartitions_. If that text contains "randomNum",
// "round robin partitioned" is returned instead.
// -----------------------------------------------------------------------
const NAString HashPartitioningFunction::getTextImp(const char* hashType) const
{
  char nparts[20];
  NAString result(hashType, CmpCommon::statementHeap());

  result.append(" partitioned ");

  // bounded formatting: never write past the end of nparts
  snprintf(nparts, sizeof(nparts), "%d", numberOfHashPartitions_);
  result += nparts;
  result += " ways on (";
  getKeyColumnList().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  result += ")";

  if (result.contains("randomNum"))
    result = NAString("round robin partitioned");

  return result;
}
// Returns the descriptive text produced by getTextImp() for the hash
// type name "hash".
const NAString HashPartitioningFunction::getText() const
{
  const char *hashTypeName = "hash";
  return getTextImp(hashTypeName);
}
// Debug print: delegates to the base class. The title argument is not
// used; the class name is always passed as the title instead.
void HashPartitioningFunction::print(FILE* ofd,
                                     const char* indent,
                                     const char* title) const
{
  PartitioningFunction::print(ofd,
                              indent,
                              "HashPartitioningFunction");
} // HashPartitioningFunction::print()
// Builds, on the given heap, the expression
//     Cast( ConstValue(<double value of ev>), oType )
// i.e. the double value of the encoded skew value, as a non-nullable
// double precision constant, cast to oType.
static
ItemExpr*
getCastedSkewValueExpr(const EncodedValue& ev, const NAType& oType, CollHeap* heap)
{
  double x = ev.getDblValue();

  // double precision type without SQL NULL for the constant
  SQLDoublePrecision *doubleType =
    new (heap) SQLDoublePrecision(heap, FALSE /* no SQL NULL*/);

  ConstValue *skewConst =
    new (heap) ConstValue(doubleType, (char*)&x, sizeof(x));

  return new (heap) Cast(skewConst, &oType);
}
// ***********************************************************************
// TableHashPartitioningFunction
// - The externalized Hash Partitioning functions.
// ***********************************************************************
// -----------------------------------------------------------------------
// TableHashPartitioningFunction Destructor; the body is empty.
// -----------------------------------------------------------------------
TableHashPartitioningFunction::~TableHashPartitioningFunction()
{
}
void TableHashPartitioningFunction::setupForStatement()
{
if(setupForStatement_)
return;
PartitioningFunction::setupForStatement();
setupForStatement_ = TRUE;
resetAfterStatement_ = FALSE;
}
void TableHashPartitioningFunction::resetAfterStatement()
{
if(resetAfterStatement_)
return;
PartitioningFunction::resetAfterStatement();
keyColumnList_.clear();
originalKeyColumnList_.clear();
numberOfPartitions_ = numberOfOrigHashPartitions_;
setupForStatement_ = FALSE;
resetAfterStatement_ = TRUE;
}
// -----------------------------------------------------------------------
// TableHashPartitioningFunction safe down cast: for this class the
// result is always the object itself.
// -----------------------------------------------------------------------
const TableHashPartitioningFunction *
TableHashPartitioningFunction::castToTableHashPartitioningFunction() const
{ return this; }
// Returns the value of numberOfPartitions_.
Lng32 TableHashPartitioningFunction::getCountOfPartitions() const
{ return numberOfPartitions_; }
// Not supported at this level: this implementation invokes CMPABORT.
// Classes that need it must redefine
// PartitioningFunction::makePartitioningRequirement().
PartitioningRequirement *
TableHashPartitioningFunction::makePartitioningRequirement()
{
  CMPABORT;

  // return statement that follows the abort macro
  return NULL;
}
// -----------------------------------------------------------------------
// -----------------------------------------------------------------------
// Creates the two inputs of the partition selection expression, unless
// they already exist: a "number of partitions" and a "partition number"
// host variable, both system-supplied, of type int not null. Their names
// are made unique by appending the address of this object.
//
// The inputs are recorded in partitionSelectionExprInputs() in the order
// [0] = partition number, [1] = number of partitions.
void
TableHashPartitioningFunction::createPartitionSelectionExprInputs()
{
  if (partitionSelectionExprInputs().entries() != 0)
    return;

  CollHeap *heap = CmpCommon::statementHeap();

  // Use a host var to provide access to numParts, this will be
  // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen()
  //
  char hvFabricatedName[50];

  // bounded formatting; %p expects a void pointer
  snprintf(hvFabricatedName, sizeof(hvFabricatedName),
           "_sys_hostVarNumParts_%p", static_cast<void *>(this));

  ItemExpr *numParts = new (heap)
    HostVar(hvFabricatedName,
            // int not null
            new (heap) SQLInt(heap, FALSE, FALSE),
            // is system-supplied
            TRUE);
  numParts->synthTypeAndValueId();

  // Use a host var to provide access to partNum, this will be
  // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen()
  //
  snprintf(hvFabricatedName, sizeof(hvFabricatedName),
           "_sys_hostVarPartNo_%p", static_cast<void *>(this));

  ItemExpr *partNum = new (heap)
    HostVar(hvFabricatedName,
            // int not null
            new (heap) SQLInt(heap, FALSE, FALSE),
            // is system-supplied
            TRUE);
  partNum->synthTypeAndValueId();

  // Record these hostvars as the inputs to the partitionSelectionExpr.
  //
  partitionSelectionExprInputs().insert(partNum->getValueId());
  partitionSelectionExprInputs().insert(numParts->getValueId());
}
// -----------------------------------------------------------------------
// TableHashPartitioningFunction::normalizePartitioningKeys()
// Runs the base class normalization and then normalizes the key column
// list. The original key column list is left alone on purpose.
// -----------------------------------------------------------------------
void
TableHashPartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  PartitioningFunction::normalizePartitioningKeys(normWARef);

  // Only keyColumnList_ is normalized. Normalizing the original key
  // column list would introduce VEGies which could cause data type
  // changes.
  keyColumnList_.normalizeNode(normWARef);
}
// -----------------------------------------------------------------------
// TableHashPartitioningFunction::createPartitioningKeyPredicates()
// Since hash dist partitioning can not create any Key Predicates,
// this method simply creates the partition input values (PIVs).
// -----------------------------------------------------------------------
void TableHashPartitioningFunction::createPartitioningKeyPredicates()
{
if (NOT partKeyPredsCreated())
createBetweenPartitioningKeyPredicates("_sys_HostVarLoHashPart",
"_sys_HostVarHiHashPart");
// Create the partition selection input values (needed by hash2).
createPartitionSelectionExprInputs();
} // TableHashPartitioningFunction::createPartitioningKeyPredicates()
// -----------------------------------------------------------------------
// For salted tables the hash partition value is stored in the _SALT_
// column, so a range predicate can be generated directly on that column.
// saltCol is the value id whose item expression is used as the partition
// number expression of the predicates.
// -----------------------------------------------------------------------
void TableHashPartitioningFunction::createPartitioningKeyPredicatesForSaltedTable(
     ValueId saltCol)
{
  // For now this may only be called after the regular
  // createPartitioningKeyPredicates() method.
  CMPASSERT(partKeyPredsCreated());

  // No PIV names are passed (both NULL), which the assertion above is
  // meant to allow.
  ItemExpr *saltExpr = saltCol.getItemExpr();

  createBetweenPartitioningKeyPredicates(NULL,
                                         NULL,
                                         saltExpr,
                                         isAHash2PartitioningFunction());

  // createPartitionSelectionExprInputs() does not need to be called here.
} // TableHashPartitioningFunction::createPartitioningKeyPredicatesForSaltedTable()
// -----------------------------------------------------------------------
// TableHashPartitioningFunction::replacePivs()
// Replaces the stored partition input values and partitioning key
// predicates with the ones passed in.
// -----------------------------------------------------------------------
void TableHashPartitioningFunction::replacePivs(
     const ValueIdList& newPivs,
     const ValueIdSet& newPartKeyPreds)
{
  // first the PIVs, then the predicates that refer to them
  storePartitionInputValues(newPivs);
  storePartitioningKeyPredicates(newPartKeyPreds);
} // TableHashPartitioningFunction::replacePivs()
// Not supported at this level: this implementation invokes CMPABORT.
// Classes that need it must redefine
// PartitioningFunction::createPartitioningFunctionForIndexDesc().
PartitioningFunction *
TableHashPartitioningFunction::createPartitioningFunctionForIndexDesc(
     IndexDesc *idesc) const
{
  CMPABORT;

  // return statement that follows the abort macro
  return NULL;
}
// -----------------------------------------------------------------------
// TableHashPartitioningFunction::remapIt()
// -----------------------------------------------------------------------
void
TableHashPartitioningFunction::remapIt(const PartitioningFunction* opf,
ValueIdMap& map,
NABoolean mapItUp)
{
PartitioningFunction::remapIt(opf, map,mapItUp);
// If we have arrived here, the original partitioning function (*opf)
// MUST be a TableHashPartitioningFunction().
//
const TableHashPartitioningFunction *oth =
opf->castToTableHashPartitioningFunction();
CMPASSERT(oth);
// Clear because rewrite insists on it being so.
//
keyColumnList_.clear();
if (mapItUp)
map.rewriteValueIdListUp(keyColumnList_, oth->keyColumnList_);
else
map.rewriteValueIdListDown(oth->keyColumnList_, keyColumnList_);
// do NOT map the originalKeyColumnList_, that's why it's called ORIGINAL
} // TableHashPartitioningFunction::remapIt()
// -----------------------------------------------------------------------
// TableHashPartitioningFunction::preCodeGen()
// Rewrite the partitioning keys of the partitioning function that
// are expressed using VEGReferences in terms of the available values.
// -----------------------------------------------------------------------
void
TableHashPartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
ValueIdSet noExternalInputs;
PartitioningFunction::preCodeGen(availableValues);
keyColumnList_.replaceVEGExpressions(availableValues, noExternalInputs,
FALSE, NULL, TRUE);
} // TableHashPartitioningFunction::preCodeGen()
// -----------------------------------------------------------------------
// TableHashPartitioningFunction::createPartitioningExpression()
// Builds (or reuses) the partitioning expression without the VARCHAR
// cast, by calling this class' createPartitioningExpressionImp(FALSE).
// -----------------------------------------------------------------------
ItemExpr* TableHashPartitioningFunction::createPartitioningExpression()
{
  ItemExpr *partExpr =
    TableHashPartitioningFunction::createPartitioningExpressionImp(FALSE);
  return partExpr;
} // TableHashPartitioningFunction::createPartitioningExpression()
// Builds, stores and returns the partitioning expression of this
// function. If an expression is already stored and was built with the
// same doVarCharCast value, that expression is returned instead.
//
// Every key column is wrapped in a Narrow to the type of the matching
// original key column, and the list of Narrows is handed to
// buildPartitioningExpression(). All Narrows share one data conversion
// error flag, which is created and stored on first use.
//
// doVarCharCast  if TRUE, character key columns are narrowed to the
//                equivalent VARCHAR type instead: for VEG predicates and
//                references the largest character type among the base
//                columns found in the VEG, for index and base columns
//                the column's own type.
ItemExpr*
TableHashPartitioningFunction::createPartitioningExpressionImp(NABoolean doVarCharCast)
{
  // already constructed with the same doVarCharCast argument?
  if (getExpression() && doVarCharCast == doVarCharCast_ )
    return getExpression(); // reuse it!

  doVarCharCast_ = doVarCharCast;

  ValueIdList typedKeyCols;
  const ValueIdList &keyColumnList = getKeyColumnList();
  const ValueIdList &originalKeyColumnList = getOriginalKeyColumnList();
  CollHeap *heap = CmpCommon::statementHeap();

  for (CollIndex i = 0; i < keyColumnList.entries(); i++)
    {
      const NAType &oType = originalKeyColumnList[i].getType();
      ItemExpr *c;
      ItemExpr *dataConversionErrorFlag = getConvErrorExpr();

      if (dataConversionErrorFlag == 0)
        {
          dataConversionErrorFlag =
            new (heap) HostVar("_sys_repartConvErrorFlg",
                               new (heap) SQLInt(heap, TRUE,FALSE),
                               TRUE);
          storeConvErrorExpr(dataConversionErrorFlag);
        }

      // Note that we always generate a Narrow operator here, even
      // if it's not necessary. Because keyColumnList[i] may be a
      // VEGReference it may not be possible to determine its
      // exact type at this time. We could eliminate some cases of
      // Narrow in the preCodeGen phase but this is not done yet.
      NAType* finalTypePtr = oType.newCopy(heap);

      if ( oType.getTypeQualifier() == NA_CHARACTER_TYPE &&
           doVarCharCast == TRUE ) {
        ValueId vId = originalKeyColumnList[i];
        ValueIdSet vidSet;
        CharType *maxCharType = NULL;

        switch (vId.getItemExpr()->getOperatorType()) {
        case ITM_VEG_PREDICATE:
        case ITM_VEG_REFERENCE:
          {
            // pick the largest character type among the base columns
            vId.getItemExpr()->findAll(ITM_BASECOLUMN, vidSet, TRUE, TRUE);
            for (ValueId x=vidSet.init(); vidSet.next(x); vidSet.advance(x))
              {
                const CharType &ctype = (CharType&)(x.getType());
                if ( maxCharType == NULL OR
                     maxCharType->getNominalSize() < ctype.getNominalSize() OR
                     (maxCharType->getNominalSize() == ctype.getNominalSize() AND
                      maxCharType->getStrCharLimit() < ctype.getStrCharLimit())
                   )
                  maxCharType = &(CharType&)x.getType() ;
              }

            // If no base column was found, maxCharType is still NULL;
            // keep the copy of the original type in that case instead
            // of dereferencing a NULL pointer.
            if (maxCharType != NULL)
              finalTypePtr = maxCharType->equivalentVarCharType(heap);
          }
          break;
        case ITM_INDEXCOLUMN:
        case ITM_BASECOLUMN:
          // For skewbuster+OCR nested join, the partition key column
          // is not a VEG or VEGref since the equality predicates have
          // been pushed down to the inner table. Here we cast the
          // column type to equivalent VarCHAR, regardless of the length
          // of the other column in the join predicate. The optimization
          // works fine because we just want to trim the trailing
          // spaces and then hash the trimmed value.
          finalTypePtr = ((CharType&)(vId.getType())).equivalentVarCharType(heap);
          break;
        default:
          break;
        }
      }

      // leave the statement heap to delete the SQLVarChar object, if created
      c = new (heap) Narrow(keyColumnList[i].getItemExpr(),
                            dataConversionErrorFlag,
                            finalTypePtr,
                            ITM_NARROW,
                            FALSE /* reverseDataErrorConversionFlag */,
                            TRUE /* match child nullability */);
      c->synthTypeAndValueId();
      typedKeyCols.insert(c->getValueId());
    }

  ItemExpr * partFunc = buildPartitioningExpression(typedKeyCols);

#if 0
  //===================================================================
  // Test the classification expression
  //===================================================================
  ItemExpr* isSkewExpr = createClassificationExpressionForSkewedValues();
  if ( isSkewExpr ) {
    ValueIdList exprs;
    exprs.insert(isSkewExpr->getValueId());
    char resultBuf[2000];
    ex_expr::exp_return_type evalReturnCode = exprs.evalAtCompileTime
      (0, ExpTupleDesc::SQLARK_EXPLODED_FORMAT, resultBuf, 2000);
  }
#endif

  // once we support late binding, and/or changing the number of target
  // partitions at run time, the number of partitions will have
  // to become some sort of an input value $$$$
  partFunc->synthTypeAndValueId();
  storeExpression(partFunc);

#if 0 // test
  buildHashListForSkewedValues();
#endif

  return partFunc;
} // TableHashPartitioningFunction::createPartitioningExpressionImp()
// -----------------------------------------------------------------------
// TableHashPartitioningFunction::createPartitionSelectionExpr()
// Creates the partition selection expression. 'Partition selection'
// means that an expression determines the partition to access, as
// opposed to the File System determining the range of partitions from
// a set of partitioning key predicates. Partition selection is
// currently used for Hash Dist and Round Robin Partitioning, the File
// System is used for Range Partitioning.
//
// The expression is cached in partitionSelectionExpr() and its inputs
// in partitionSelectionExprInputs(); a cached expression is returned
// as is. NULL is returned if the partition search key is not unique.
// This method is redefined for TableHashPartitioningFunction and
// RoundRobinPartitioningFunction.
// -----------------------------------------------------------------------
ItemExpr *
TableHashPartitioningFunction::
createPartitionSelectionExpr(const SearchKey *partSearchKey,
                             const ValueIdSet &availableValues)
{
  // Reuse the cached version if there is one.
  //
  if (partitionSelectionExpr())
    return partitionSelectionExpr();

  // For now, a partition selection expression is only supported when
  // the partition search key is unique (identifies exactly one
  // partition).
  //
  if (NOT partSearchKey->isUnique())
    return NULL;

  CollHeap *heap = CmpCommon::statementHeap();
  const ValueIdList &keyColumns = partSearchKey->getKeyColumns();
  const ValueIdList &keyValues = partSearchKey->getBeginKeyValues();
  ValueIdList newValues;

  createPartitionSelectionExprInputs();

  // entry 1 of the inputs is the "number of partitions" host variable
  const ValueIdList &selectionInputs = partitionSelectionExprInputs();
  ItemExpr *numParts = selectionInputs[1].getItemExpr();

  // Build the list of key values, each one cast to the exact type of
  // the original key column, since the Hash function is sensitive to
  // the type of its inputs. The original key columns must be used
  // because the keyColumnList may have been changed.
  //
  ValueIdList typedKeyValues;
  const ValueIdList &originalKeyColumnList = getOriginalKeyColumnList();

  for (CollIndex keyIx = 0; keyIx < keyColumns.entries(); keyIx++)
    {
      const NAType &origType = originalKeyColumnList[keyIx].getType();

      // all Narrows share one conversion error flag, created on demand
      ItemExpr *convErrorFlag = getConvErrorExpr();

      if (convErrorFlag == 0)
        {
          convErrorFlag =
            new (heap) HostVar("_sys_repartConvErrorFlg",
                               new (heap) SQLInt(heap, TRUE,FALSE),
                               TRUE);
          storeConvErrorExpr(convErrorFlag);
        }

      // A Narrow operator is always generated here, even if it's not
      // necessary. Because the key value may be a VEGReference it may
      // not be possible to determine its exact type at this time. Some
      // cases of Narrow could be eliminated in the preCodeGen phase but
      // this is not done yet.
      //
      NAType *targetType = origType.newCopy(heap);
      ItemExpr *narrowedValue =
        new (heap) Narrow(keyValues[keyIx].getItemExpr(),
                          convErrorFlag,
                          targetType);

      narrowedValue->synthTypeAndValueId();
      typedKeyValues.insert(narrowedValue->getValueId());
    }

  // Construct and cache the partitionSelectionExpr for Table Hash.
  //
  partitionSelectionExpr() =
    buildPartitioningSelectionExpr(typedKeyValues, numParts);

  // Bind the expression
  //
  partitionSelectionExpr()->synthTypeAndValueId();

  // PreCodeGen the expression (this maybe should go in
  // TableHashPartitioningFunction::preCodeGen(), but preCodeGen() is
  // typically called before this expression is generated).
  //
  partitionSelectionExpr()->replaceVEGExpressions(availableValues,
                                                  availableValues);

  return partitionSelectionExpr();
} // TableHashPartitioningFunction::createPartitionSelectionExpr()
// -----------------------------------------------------------------------
// Makes a new partition search key that indicates that
// PA_PARTITION_GROUPING is being done. A search key that groups hashed
// partitions can not be generated. For TableHashPartitioning, a flag in
// the search key indicates that PA_PARTITION_GROUPING is being done and
// the begin/end key values of the search key are set to the partition
// input values of the partitioning function.
//
// The key is built from the partitioning key predicates plus
// additionalPreds, with the partition input values added to availInputs.
// For HBase tables a search key on the index key is built instead.
// -----------------------------------------------------------------------
SearchKey *
TableHashPartitioningFunction::createSearchKey(const IndexDesc *indexDesc,
                                               ValueIdSet availInputs,
                                               ValueIdSet additionalPreds) const
{
  ValueIdSet allPreds(getPartitioningKeyPredicates());
  ValueIdSet noNonKeyColumns;   // stays empty

  availInputs += getPartitionInputValues();
  allPreds += additionalPreds;

  const NABoolean onHbaseTable =
    indexDesc->getPrimaryTableDesc()->getNATable()->isHbaseTable();

  if (NOT onHbaseTable)
    {
      // Use the special constructor that builds a search key for a
      // TableHashPartitioningFunction.
      //
      return new (CmpCommon::statementHeap())
        SearchKey(indexDesc->getPartitioningKey(),
                  indexDesc->getOrderOfPartitioningKeyValues(),
                  availInputs,
                  allPreds,
                  this,
                  noNonKeyColumns,
                  indexDesc);
    }

  // The HbaseAccess executor operator doesn't have a separate
  // place to handle partitioning key predicates. They can only be
  // used as regular key or executor predicates. For salted tables
  // this gives a chance to read a range of salt values through a
  // begin/end key.
  return new (CmpCommon::statementHeap())
    SearchKey(indexDesc->getIndexKey(),
              indexDesc->getOrderOfKeyValues(),
              availInputs,
              TRUE,
              allPreds,
              noNonKeyColumns,
              indexDesc);
} // TableHashPartitioningFunction::createSearchKey()
// Wraps expr in a HashDistPartHash item expression allocated on the
// statement heap.
ItemExpr *
TableHashPartitioningFunction::buildHashingExpressionForExpr(ItemExpr* expr)
  const
{
  CollHeap *stmtHeap = CmpCommon::statementHeap();
  ItemExpr *hashExpr = new (stmtHeap) HashDistPartHash(expr);

  return hashExpr;
}
// Computes the hash of len bytes at data by calling ExHDPHash::hash()
// directly, passing flags through. NULL values and VARCHAR data types
// are not handled.
UInt32 TableHashPartitioningFunction::computeHashValue(char* data,
                                                       UInt32 flags,
                                                       Int32 len)
{
  UInt32 hashValue = ExHDPHash::hash(data, flags, len);
  return hashValue;
}
// Returns the ITM_HDPHASH node inside the stored partitioning
// expression, or NULL if no partitioning expression is stored. The node
// is child 0 of the expression, or child 0 of that child if the latter
// is an ITM_PROGDISTRIB node.
ItemExpr * TableHashPartitioningFunction::getHashingExpression() const
{
  ItemExpr* partExpr = getExpression();

  if (partExpr == NULL)
    return NULL;

  ItemExpr* hashExpr = NULL;

  hashExpr = partExpr->child(0);
  CMPASSERT(hashExpr);

  // step over an intermediate ITM_PROGDISTRIB node
  if (hashExpr->getOperatorType() == ITM_PROGDISTRIB)
    {
      hashExpr = hashExpr->child(0);
      CMPASSERT(hashExpr);
    }

  CMPASSERT(hashExpr->getOperatorType() == ITM_HDPHASH);

  return hashExpr;
}
// -----------------------------------------------------------------------
// HashDistPartitioningFunction Destructor; the body is empty.
// -----------------------------------------------------------------------
HashDistPartitioningFunction::~HashDistPartitioningFunction()
{
}
// -----------------------------------------------------------------------
// HashDistPartitioningFunction Safe down cast.
// -----------------------------------------------------------------------
// Returns this object typed as a HashDistPartitioningFunction. Callers in
// this file compare the result of this cast against NULL to find out
// whether some other partitioning function is a hash1 function.
const HashDistPartitioningFunction*
HashDistPartitioningFunction::castToHashDistPartitioningFunction() const
{
return this;
}
// Build a RequireHashDist requirement for this function, allocated on
// the statement heap.
PartitioningRequirement*
HashDistPartitioningFunction::makePartitioningRequirement()
{
  RequireHashDist *requirement =
    new (CmpCommon::statementHeap()) RequireHashDist(this);
  return requirement;
}
// Copy-construct a duplicate of this function on the statement heap.
PartitioningFunction*
HashDistPartitioningFunction::copy() const
{
  HashDistPartitioningFunction *duplicate =
    new (CmpCommon::statementHeap()) HashDistPartitioningFunction(*this);
  return duplicate;
}
// -----------------------------------------------------------------------
// Method for debugging.
// -----------------------------------------------------------------------
// Produce a one-line description of the form
//   "hash1 partitioned <scaled> (<original>) ways on (<key columns>)"
// where the key columns are unparsed in EXPLAIN format.
const NAString HashDistPartitioningFunction::getText() const
{
  NAString result("hash1 partitioned ");

  // Format the two partition counts. snprintf bounds the write to the
  // size of the local buffer, so the buffer cannot be overrun even if
  // the format string is extended later.
  char nparts[40];
  snprintf(nparts, sizeof(nparts), "%d (%d) ways on (",
           numberOfPartitions_, numberOfOrigHashPartitions_);
  result += nparts;

  getKeyColumnList().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  result += ")";
  return result;
}
// Debug print. Delegates to PartitioningFunction::print() with a fixed
// class-name title; the caller-supplied 'title' argument is not used.
void HashDistPartitioningFunction::print(FILE* ofd, const char* indent,
const char* title) const
{
PartitioningFunction::print(ofd, indent, "HashDistPartitioningFunction");
} // HashDistPartitioningFunction::print()
// -----------------------------------------------------------------------
// HashDistPartitioningFunction::createPartitioningFunctionForIndexDesc()
// -----------------------------------------------------------------------
// Build a new hash1 partitioning function for the given index. The new
// function has the same partition count as this one and a copy of this
// function's node map; its partitioning key is expressed in terms of the
// index's own column value ids.
PartitioningFunction*
HashDistPartitioningFunction::
createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const
{
  const NAFileSet * indexFileSet = idesc->getNAFileSet();
  const NAColumnArray & indexColumns = indexFileSet->getAllColumns();
  const NAColumnArray & partKeyCols = indexFileSet->getPartitioningKeyColumns();

  ValueIdSet partKeySet;
  ValueIdList partKeyList;

  const CollIndex numPartKeyCols = partKeyCols.entries();
  for (CollIndex keyPos = 0; keyPos < numPartKeyCols; keyPos++)
    {
      // Position of this partitioning key column within the index
      // (usually the same as keyPos).
      CollIndex posInIndex = indexColumns.index(partKeyCols[keyPos]);

      // Record the index column's value id both in the key set and, at
      // the matching position, in the ordered key list.
      ValueId colValueId = idesc->getIndexColumns()[posInIndex];
      partKeySet += colValueId;
      partKeyList.insertAt(keyPos, colValueId);
    }

  // Allocate the new HashDistPartitioningFunction on the index
  // descriptor's heap.
  HashDistPartitioningFunction *newPartFunc = new(idesc->wHeap())
    HashDistPartitioningFunction (partKeySet,
                                  partKeyList,
                                  getCountOfPartitions(),
                                  getNodeMap()->copy(idesc->wHeap()));

  // Construct the partitioning key predicates before handing it back.
  newPartFunc->createPartitioningKeyPredicates();

  return newPartFunc;
} // HashDistPartitioningFunction::createPartitioningFunctionForIndexDesc()
// -----------------------------------------------------------------------
// HashDistPartitioningFunction::comparePartFuncToFunc(): Compare this
// partitioning function to another hash dist function. To be 'SAME'
// must have the same number and order of partitioning key columns and
// have the same number of partitions (scaled and original).
// -----------------------------------------------------------------------
// Returns SAME only if the base-class comparison says SAME, the other
// function is also a hash1 function built on the same original number of
// hash partitions, and both have identical key columns (same count, same
// order, same original column types). Everything else is INCOMPATIBLE.
COMPARE_RESULT
HashDistPartitioningFunction::
comparePartFuncToFunc(const PartitioningFunction &other) const
{
  if (PartitioningFunction::comparePartFuncToFunc(other) != SAME)
    return INCOMPATIBLE;

  // After a SAME result the cast is expected to succeed; the NULL test
  // is kept purely as a safety net.
  const HashDistPartitioningFunction *oth =
    other.castToHashDistPartitioningFunction();
  if (oth == NULL)
    return INCOMPATIBLE;

  // Both must be based on the same physical partitioning.
  if (getCountOfOrigHashPartitions() != oth->getCountOfOrigHashPartitions())
    return INCOMPATIBLE;

  // The key columns must agree position by position.
  const CollIndex numKeyCols = keyColumnList_.entries();
  if (numKeyCols != oth->keyColumnList_.entries())
    return INCOMPATIBLE;

  for (CollIndex col = 0; col < numKeyCols; col++)
    {
      if (keyColumnList_[col] != oth->keyColumnList_[col] ||
          NOT (originalKeyColumnList_[col].getType() ==
               oth->originalKeyColumnList_[col].getType()))
        return INCOMPATIBLE;
    }

  return SAME;
} // HashDistPartitioningFunction::comparePartFuncToFunc()
// Scale this hash1 function to at most the suggested number of
// partitions. A suggestion of exactly 1 yields a brand new
// SinglePartitionPartitioningFunction; otherwise this object is modified
// in place and returned. On return the in/out argument holds the number
// of partitions actually chosen.
PartitioningFunction *
HashDistPartitioningFunction::
scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions,
                        PartitionGroupingDistEnum partGroupDist)
{
  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // Any expression generated earlier may no longer be correct once the
  // partition count changes, so discard it.
  storeExpression(NULL);

  // Arbitrary scaling down is allowed (the runtime handles the mapping
  // of physical partitions to logical partitions). Scaling up to a
  // greater number of partitions is not allowed, so cap the suggestion
  // at the current count.
  if (suggestedNewNumberOfPartitions > numberOfPartitions_)
    suggestedNewNumberOfPartitions = numberOfPartitions_;

  numberOfPartitions_ = suggestedNewNumberOfPartitions;

  return this;
} // HashDistPartitioningFunction::scaleNumberOfPartitions()
// -----------------------------------------------------------------------
// HashDistPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
// Decide whether this hash1 function is a grouping of 'other'. When
// maxPartsPerGroup is supplied it is first set to 1 and, on the paths
// that return TRUE, to ceil(other's partitions / this function's
// partitions).
NABoolean
HashDistPartitioningFunction::isAGroupingOf(const PartitioningFunction &other,
                                            Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  // Only another HashDistPartitioningFunction can be grouped by this one.
  const HashDistPartitioningFunction *oth =
    other.castToHashDistPartitioningFunction();
  if (oth == NULL)
    return FALSE;

  // The key columns, their order and their original types must match.
  const CollIndex numKeyCols = keyColumnList_.entries();
  if (numKeyCols != oth->keyColumnList_.entries())
    return FALSE;

  for (CollIndex col = 0; col < numKeyCols; col++)
    {
      if (keyColumnList_[col] != oth->keyColumnList_[col] ||
          NOT (originalKeyColumnList_[col].getType() ==
               oth->originalKeyColumnList_[col].getType()))
        return FALSE;
    }

  const Lng32 myParts      = getCountOfPartitions();
  const Lng32 myOrigParts  = getCountOfOrigHashPartitions();
  const Lng32 othParts     = oth->getCountOfPartitions();
  const Lng32 othOrigParts = oth->getCountOfOrigHashPartitions();

  // A function with more partitions than other cannot group it.
  //   Eg. this.numParts: 10  this.origNumParts: 20
  //       oth.numParts:   5  oth.origNumParts:  20
  if (myParts > othParts)
    return FALSE;

  // Both must be based on the same physical function.
  //   Eg. this.numParts: 10  this.origNumParts: 20
  //       oth.numParts:  10  oth.origNumParts:  30
  if (myOrigParts != othOrigParts)
    return FALSE;

  // From here on: myParts <= othParts AND myOrigParts == othOrigParts.
  //
  // It is a grouping if other has not been scaled (arbitrary scaling of
  // one function is allowed):
  //   Eg. this.numParts:  7  this.origNumParts: 20
  //       oth.numParts:  20  oth.origNumParts:  20
  // or if both have been scaled to the same number of partitions:
  //   Eg. this.numParts:  7  this.origNumParts: 20
  //       oth.numParts:   7  oth.origNumParts:  20
  if (othParts == othOrigParts || myParts == othParts)
    {
      if (maxPartsPerGroup != NULL)
        *maxPartsPerGroup = (othParts + myParts - 1) / myParts;
      return TRUE;
    }

  // WARNING (carried over from the original author): it is not clear
  // that the current code can ever reach this point, nor that the
  // grouping semantics implemented below are correct for it.
  //
  // Here both functions have been scaled (believed not to happen) and
  // they are scaled to different sizes.
  //   Eg. this.numParts:  5  this.origNumParts: 20
  //       oth.numParts:  10  oth.origNumParts:  20
  //
  //       this.numParts:  7  this.origNumParts: 20
  //       oth.numParts:  10  oth.origNumParts:  20
  //
  // Three things must then hold for this to be a grouping.

  // 1. This function's scaled partition count must evenly divide
  //    other's scaled partition count.
  if ((othParts % myParts) != 0)
    return FALSE;

  // 2. Other's scaled count must be a multiple of, or evenly divide,
  //    its original count.
  //   Eg. oth.numParts: 10  oth.origNumParts: 20
  //       oth.numParts: 40  oth.origNumParts: 20
  const Lng32 othLeftover = (othOrigParts >= othParts)
                              ? (othOrigParts % othParts)
                              : (othParts % othOrigParts);
  if (othLeftover)
    return FALSE;

  // 3. Likewise this function's scaled count must be a multiple of, or
  //    evenly divide, its original count.
  //   Eg. this.numParts:  5  this.origNumParts: 20
  //       this.numParts: 40  this.origNumParts: 20
  const Lng32 myLeftover = (myOrigParts >= myParts)
                             ? (myOrigParts % myParts)
                             : (myParts % myOrigParts);
  if (myLeftover)
    return FALSE;

  // All three hold: it is a grouping of other.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = (othParts + myParts - 1) / myParts;

  return TRUE;
} // HashDistPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
// - Create expressions for the constant values for the original and
// scaled number of partitions. Create the hash1 (hash distrib)
// function, PAGroup, and return the result.
// -----------------------------------------------------------------------
// Build the hash1 partitioning expression over the given key columns:
//   PAGroup(ProgDistrib(HashDistPartHash(keyCols), origNumParts),
//           scaledNumParts, origNumParts)
// where origNumParts and scaledNumParts are ConstValue nodes holding the
// original and the scaled number of hash partitions. All nodes are
// allocated on the statement heap.
ItemExpr *
HashDistPartitioningFunction::buildPartitioningExpression(
     const ValueIdList &keyCols) const
{
  CollHeap *heap = CmpCommon::statementHeap();

  NAType *numPartsType = new (heap) SQLInt(heap, FALSE,FALSE);

  // Create a ConstValue expression containing the original number of hash
  // partitions.
  Lng32 numParts = getCountOfOrigHashPartitions();
  NAString numPartsStr("origNumParts");
  ItemExpr *origNumParts =
    new (heap) ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);

  // Create a ConstValue expression containing the scaled number of hash
  // partitions.
  numParts = getCountOfPartitions();
  numPartsStr = "scaledNumParts";
  ItemExpr *scaledNumParts = new (heap)
    ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);

  // Create the hash distribution function over the original number of
  // partitions.
  ItemExpr *partFunc =
    new (heap)
      ProgDistrib(new (heap) HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST)), origNumParts);

  // Add a PAGroup expression and return the partitioning function
  partFunc = new (heap) PAGroup(partFunc, scaledNumParts, origNumParts);

  return partFunc;
}
// -----------------------------------------------------------------------
// HashDistPartitioningFunction::buildPartitioningSelectionExpr
// - build the expression used during partition selection
// -----------------------------------------------------------------------
// Build ProgDistrib(HashDistPartHash(keyCols), numParts) on the statement
// heap; this is the expression used during partition selection.
ItemExpr *
HashDistPartitioningFunction::buildPartitioningSelectionExpr(
     const ValueIdList &keyCols,
     ItemExpr *numParts) const
{
  CollHeap *heap = CmpCommon::statementHeap();

  // Hash of the key columns, combined into a single item list.
  HashDistPartHash *keyHash =
    new (heap) HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST));

  return new (heap) ProgDistrib(keyHash, numParts);
}
// -----------------------------------------------------------------------
// Hash2PartitioningFunction Destructor
// -----------------------------------------------------------------------
// The destructor body is intentionally empty; this class performs no
// cleanup of its own here.
Hash2PartitioningFunction::~Hash2PartitioningFunction() {}
// -----------------------------------------------------------------------
// Hash2PartitioningFunction Safe down cast.
// -----------------------------------------------------------------------
// Returns this object typed as a Hash2PartitioningFunction. Callers in
// this file compare the result of this cast against NULL to find out
// whether some other partitioning function is a hash2 function.
const Hash2PartitioningFunction*
Hash2PartitioningFunction::castToHash2PartitioningFunction() const
{
return this;
}
// Build a RequireHash2 requirement for this function, allocated on the
// statement heap.
PartitioningRequirement*
Hash2PartitioningFunction::makePartitioningRequirement()
{
  RequireHash2 *requirement =
    new (CmpCommon::statementHeap()) RequireHash2(this);
  return requirement;
}
// Copy-construct a duplicate of this function on the statement heap.
PartitioningFunction*
Hash2PartitioningFunction::copy() const
{
  Hash2PartitioningFunction *duplicate =
    new (CmpCommon::statementHeap()) Hash2PartitioningFunction(*this);
  return duplicate;
}
// -----------------------------------------------------------------------
// Method for debugging.
// -----------------------------------------------------------------------
// Produce a one-line description of the form
//   "hash2 partitioned <n> ways on (<key columns>)"
// where the key columns are unparsed in EXPLAIN format.
const NAString Hash2PartitioningFunction::getText() const
{
  NAString result("hash2 partitioned ");

  // Format the partition count. snprintf bounds the write to the size of
  // the local buffer, so the buffer cannot be overrun even if the format
  // string is extended later.
  char nparts[32];
  snprintf(nparts, sizeof(nparts), "%d ways on (", numberOfPartitions_);
  result += nparts;

  getKeyColumnList().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  result += ")";
  return result;
}
// Debug print. Delegates to PartitioningFunction::print() with a fixed
// class-name title; the caller-supplied 'title' argument is not used.
void Hash2PartitioningFunction::print(FILE* ofd, const char* indent,
const char* title) const
{
PartitioningFunction::print(ofd, indent, "Hash2PartitioningFunction");
} // Hash2PartitioningFunction::print()
// -----------------------------------------------------------------------
// Hash2PartitioningFunction::createPartitioningFunctionForIndexDesc()
// -----------------------------------------------------------------------
// Build a new hash2 partitioning function for the given index. The new
// function has the same partition count as this one, a copy of this
// function's node map and the same restricted begin/end partition
// numbers; its partitioning key is expressed in terms of the index's own
// column value ids.
PartitioningFunction*
Hash2PartitioningFunction::
createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const
{
  const NAFileSet * indexFileSet = idesc->getNAFileSet();
  const NAColumnArray & indexColumns = indexFileSet->getAllColumns();
  const NAColumnArray & partKeyCols = indexFileSet->getPartitioningKeyColumns();

  ValueIdSet partKeySet;
  ValueIdList partKeyList;

  const CollIndex numPartKeyCols = partKeyCols.entries();
  for (CollIndex keyPos = 0; keyPos < numPartKeyCols; keyPos++)
    {
      // Position of this partitioning key column within the index
      // (usually the same as keyPos).
      CollIndex posInIndex = indexColumns.index(partKeyCols[keyPos]);

      // Record the index column's value id both in the key set and, at
      // the matching position, in the ordered key list.
      ValueId colValueId = idesc->getIndexColumns()[posInIndex];
      partKeySet += colValueId;
      partKeyList.insertAt(keyPos, colValueId);
    }

  // Allocate the new Hash2PartitioningFunction on the index descriptor's
  // heap.
  Hash2PartitioningFunction *newPartFunc = new(idesc->wHeap())
    Hash2PartitioningFunction (partKeySet,
                               partKeyList,
                               getCountOfPartitions(),
                               getNodeMap()->copy(idesc->wHeap()));

  // Carry over the restricted partition number range.
  newPartFunc->setRestrictedBeginPartNumber(getRestrictedBeginPartNumber());
  newPartFunc->setRestrictedEndPartNumber(getRestrictedEndPartNumber());

  // Construct the partitioning key predicates before handing it back.
  newPartFunc->createPartitioningKeyPredicates();

  return newPartFunc;
} // Hash2PartitioningFunction::createPartitioningFunctionForIndexDesc()
// -----------------------------------------------------------------------
// Hash2PartitioningFunction::createPartSelectionExprFromSearchKey
// is called from PartitionAccess::preCodeGen() to create expressions
// for determining the beginning and ending partition numbers that
// partition access must deal with. Other partitioning schemes
// pass partition numbers, but Hash2 passes hash boundaries. Hash2
// passes hash boundaries instead to allow a single ESP to work with
// groupings of tables with different numbers of physical partitions.
// -----------------------------------------------------------------------
void Hash2PartitioningFunction::createPartSelectionExprFromSearchKey(
const ValueId beginPartSelId,
const ValueId endPartSelId,
ValueIdList &partSelectionValIds) const
{
ItemExpr *beginPartSelExpr = new (CmpCommon::statementHeap())
Hash2Distrib(beginPartSelId.getItemExpr(),
partitionSelectionExprInputs()[1].getItemExpr());
beginPartSelExpr->synthTypeAndValueId();
ItemExpr *endPartSelExpr = new (CmpCommon::statementHeap())
Hash2Distrib(endPartSelId.getItemExpr(),
partitionSelectionExprInputs()[1].getItemExpr());
endPartSelExpr->synthTypeAndValueId();
partSelectionValIds.insert(beginPartSelExpr->getValueId());
partSelectionValIds.insert(endPartSelExpr->getValueId());
}
// -----------------------------------------------------------------------
// Hash2PartitioningFunction::comparePartFuncToFunc(): Compare this
// partitioning function to another hash2 function. To be 'SAME'
// must have the same number and order of partitioning key columns and
// the same number of scaled partitions.
// -----------------------------------------------------------------------
// Returns SAME only if the base-class comparison says SAME, the other
// function is also a hash2 function with the same scaled partition count
// (and, when the OLD_HASH2_GROUPING default is ON, the same original
// partition count), and both have identical key columns (same count,
// same order, same original column types). Otherwise INCOMPATIBLE.
COMPARE_RESULT
Hash2PartitioningFunction::
comparePartFuncToFunc(const PartitioningFunction &other) const
{
  if (PartitioningFunction::comparePartFuncToFunc(other) != SAME)
    return INCOMPATIBLE;

  // After a SAME result the cast is expected to succeed; the NULL test
  // is kept purely as a safety net.
  const Hash2PartitioningFunction *oth =
    other.castToHash2PartitioningFunction();
  if (oth == NULL)
    return INCOMPATIBLE;

  // With OLD_HASH2_GROUPING turned on, the original number of partitions
  // must be equal as well. That CQD exists for test purposes and will
  // most likely be removed once the new hash2 grouping has been tested
  // extensively.
  if (CmpCommon::getDefault(OLD_HASH2_GROUPING) == DF_ON &&
      getCountOfOrigHashPartitions() != oth->getCountOfOrigHashPartitions())
    return INCOMPATIBLE;

  // Normal hash2 behavior: the scaled number of partitions must be equal.
  if (getCountOfPartitions() != oth->getCountOfPartitions())
    return INCOMPATIBLE;

  // The key columns must agree position by position.
  const CollIndex numKeyCols = keyColumnList_.entries();
  if (numKeyCols != oth->keyColumnList_.entries())
    return INCOMPATIBLE;

  for (CollIndex col = 0; col < numKeyCols; col++)
    {
      if (keyColumnList_[col] != oth->keyColumnList_[col] ||
          NOT (originalKeyColumnList_[col].getType() ==
               oth->originalKeyColumnList_[col].getType()))
        return INCOMPATIBLE;
    }

  return SAME;
} // Hash2PartitioningFunction::comparePartFuncToFunc()
// -----------------------------------------------------------------------
// Hash2PartitioningFunction::scaleNumberOfPartitions()
// -----------------------------------------------------------------------
// Scale this hash2 function down to the largest count that does not
// exceed the suggestion and evenly divides the current partition count.
// If that count is 1 a new SinglePartitionPartitioningFunction is
// returned; otherwise this object is updated in place (getting a fresh
// node map if the count changed) and returned. On return the in/out
// argument holds the count actually chosen.
PartitioningFunction *
Hash2PartitioningFunction::
scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions,
                        PartitionGroupingDistEnum partGroupDist)
{
  // Never scale up: cap the suggestion at the current partition count.
  if (suggestedNewNumberOfPartitions > numberOfPartitions_)
    suggestedNewNumberOfPartitions = numberOfPartitions_;

  // Step the suggestion down until it evenly divides the current count.
  CMPASSERT(suggestedNewNumberOfPartitions > 0);
  while ((numberOfPartitions_ % suggestedNewNumberOfPartitions) != 0)
    --suggestedNewNumberOfPartitions;

  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // Any expression generated earlier may no longer be correct once the
  // partition count changes, so discard it.
  storeExpression(NULL);

  // A changed partition count needs a node map of the new size.
  if ( suggestedNewNumberOfPartitions != getCountOfPartitions() )
    {
      NodeMap* resizedNodeMap =
        new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions);
      replaceNodeMap(resizedNodeMap);
    }

  numberOfPartitions_ = suggestedNewNumberOfPartitions;

  return this;
} // Hash2PartitioningFunction::scaleNumberOfPartitions()
// -----------------------------------------------------------------------
// Hash2PartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
// Decide whether this hash2 function is a grouping of 'other'.
//
// Returns TRUE when 'other' is also a hash2 function, this function's
// partition count evenly divides other's, and both have identical key
// columns (same count, same order, same original column types). When the
// OLD_HASH2_GROUPING default is ON, the two functions must additionally
// have the same original number of hash partitions.
//
// When maxPartsPerGroup is supplied it is first set to 1 and, if the
// answer is TRUE, to other's partition count divided by this function's.
NABoolean
Hash2PartitioningFunction::isAGroupingOf(const PartitioningFunction &other,
                                         Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  const Hash2PartitioningFunction *oth =
    other.castToHash2PartitioningFunction();

  // If other is not a Hash2PartitioningFunction, then it cannot
  // be a grouping of...
  if (oth == NULL)
    return FALSE;

  // This function may not have more partitions than other, and its
  // partition count must evenly divide other's.
  if (getCountOfPartitions() > oth->getCountOfPartitions() ||
      oth->getCountOfPartitions() % getCountOfPartitions() != 0)
    return FALSE;

  // For testing purposes, it was requested that the ability to
  // provide grouping similar to hash1 be provided. If the
  // OLD_HASH2_GROUPING CQD is true, then only allow grouping
  // when the number of original partitions is the same.
  //
  // Both sides of the comparison use the ORIGINAL partition count, in
  // line with Hash2PartitioningFunction::comparePartFuncToFunc().
  // Comparing this function's scaled count against other's original
  // count would reject nearly every scaled-down grouping.
  if (CmpCommon::getDefault(OLD_HASH2_GROUPING) == DF_ON &&
      getCountOfOrigHashPartitions() != oth->getCountOfOrigHashPartitions())
    return FALSE;

  if (keyColumnList_.entries() != oth->keyColumnList_.entries())
    return FALSE;

  // compare the key columns and their order
  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
    {
      if (keyColumnList_[i] != oth->keyColumnList_[i])
        return FALSE;

      if ( NOT (originalKeyColumnList_[i].getType() ==
                oth->originalKeyColumnList_[i].getType() )
         )
        return FALSE;
    }

  // This is a grouping of. Set the maxPartsPerGroup and return TRUE.
  // The division is exact because of the divisibility test above.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = oth->getCountOfPartitions() / getCountOfPartitions();

  return TRUE;
} // Hash2PartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
// Hash2PartitioningFunction::buildPartitioningExpression()
// - Create an expression for the Hash2 partitioning function.
// -----------------------------------------------------------------------
// Build the hash2 partitioning expression over the given key columns:
//   Hash2Distrib(HashDistPartHash(keyCols), scaledNumParts)
// where scaledNumParts is a ConstValue node holding the scaled number of
// partitions. All nodes are allocated on the statement heap.
ItemExpr *
Hash2PartitioningFunction::buildPartitioningExpression(const ValueIdList &keyCols) const
{
  CollHeap *heap = CmpCommon::statementHeap();

  NAType *numPartsType = new (heap) SQLInt(heap, FALSE,FALSE);

  // Build a ConstValue expression of the scaled number of partitions.
  Lng32 numParts = getCountOfPartitions();
  NAString numPartsStr("scaledNumParts");
  ItemExpr *scaledNumParts = new (heap)
    ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);

  // Create the hash2 partitioning function expression and return it.
  ItemExpr *partFunc = new (heap) Hash2Distrib(
       new (heap) HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST)), scaledNumParts
       );

  return partFunc;
}
// -----------------------------------------------------------------------
// Hash2PartitioningFunction::buildPartitioningSelectionExpr
// - build the expression used during partition selection
// -----------------------------------------------------------------------
// Build Hash2Distrib(HashDistPartHash(keyCols), numParts) on the
// statement heap; this is the expression used during partition selection.
ItemExpr *
Hash2PartitioningFunction::buildPartitioningSelectionExpr(
     const ValueIdList &keyCols,
     ItemExpr *numParts) const
{
  CollHeap *heap = CmpCommon::statementHeap();

  // Hash of the key columns, combined into a single item list.
  HashDistPartHash *keyHash =
    new (heap) HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST));

  return new (heap) Hash2Distrib(keyHash, numParts);
}
// A refactored method whose implementation was originally contained inside
// method HashPartitioningFunction::createPartitioningExpression().
// Wrap 'iv' in a Narrow node that converts it to type 'oType', using the
// shared data-conversion-error flag of this partitioning function. The
// flag host variable "_sys_repartConvErrorFlg" is created and stored on
// first use. Every node is allocated on the statement heap; the 'heap'
// argument is not used by this implementation.
ItemExpr*
PartitioningFunction::getCastedItemExpre(ItemExpr* iv, const NAType& oType, CollHeap* heap)
{
  CollHeap *stmtHeap = CmpCommon::statementHeap();

  // Fetch the conversion error flag, creating and remembering it if this
  // is the first request.
  ItemExpr *convErrorFlag = getConvErrorExpr();
  if (convErrorFlag == NULL)
    {
      convErrorFlag =
        new (stmtHeap) HostVar(
             "_sys_repartConvErrorFlg",
             new (stmtHeap) SQLInt(stmtHeap, TRUE,FALSE),
             TRUE);
      storeConvErrorExpr(convErrorFlag);
    }

  // A Narrow operator is always generated, even where it is not strictly
  // necessary: the input may be a VEGReference whose exact type cannot
  // be determined yet. Some of these could be eliminated in the
  // preCodeGen phase, but that is not done.
  //
  // Fix 10-040114-2431 (02/18/2004): the trailing TRUE makes the
  // nullability of the Narrow the same as its child's.
  Narrow *narrowNode = new (stmtHeap) Narrow(
       iv,
       convErrorFlag,
       &oType,
       ITM_NARROW,
       FALSE,
       TRUE);

  narrowNode->setMatchChildType(TRUE);

  return narrowNode;
}
// ***********************************************************************
// SkewedDataPartitioningFunction
// ***********************************************************************
// Construct a skewed-data partitioning function that wraps the given
// partitioning function for the unskewed rows.
//   partFuncForUnskewed - stored as partialPartFunc_; must report
//                         canHandleSkew()
//   sk                  - copied into skewProperty_; must not be "any skew"
//   heap                - passed on to the PartitioningFunction base class
// skewHashList_ starts out NULL. The constructor also creates a dummy
// partition input value and installs it as the partitioning key.
SkewedDataPartitioningFunction::SkewedDataPartitioningFunction(
PartitioningFunction* partFuncForUnskewed,
const skewProperty& sk,
NAMemory* heap
)
: PartitioningFunction(SKEWEDDATA_PARTITIONING_FUNCTION, NULL, heap),
partialPartFunc_(partFuncForUnskewed), skewProperty_(sk),
skewHashList_(NULL)
{
// Preconditions on the arguments (see the parameter notes above).
CMPASSERT(NOT sk.isAnySkew());
CMPASSERT(partFuncForUnskewed->canHandleSkew());
ValueIdList pivl;
ValueIdSet pivs;
// Create the partition input variable and partitioning key for this
// function. The main purpose of this is to produce a partitioning key.
// The skew buster partitioning function doesn't have a real
// partitioning key. So, one cannot really decide from a row in
// which partition it is (at least not for a skew element row).
// Because we need to have some partitioning key, we basically say
// "you have to know the partition number in order to compute the
// partition number". This is another way of saying that the
// partition number cannot be computed from the row. We achieve this
// by making the PIV the partitioning key.
// If needed, we can also make the PIV the partitioning expression. We
// cannot make a partitioning key expression with this method, however.
createPIV(pivl);
// Convert the PIV list into a set and use that set as the partitioning key.
pivs = pivl;
setPartKey(pivs);
}
// Create the single dummy partition input value (a host variable named
// "_sys_dummySkewBusterPartNo"), append its value id to partInputValues
// and store the resulting list as this function's partition input values.
void SkewedDataPartitioningFunction::createPIV(ValueIdList &partInputValues)
{
  CollHeap *heap = CmpCommon::statementHeap();

  // The one and only partition input value is a single integer.
  SQLInt *pivType = new (heap) SQLInt(heap, FALSE, FALSE);
  ItemExpr *partNoPIV = new (heap)
    HostVar("_sys_dummySkewBusterPartNo", pivType, TRUE);
  partNoPIV->synthTypeAndValueId();

  // That integer also serves as the partitioning key (see the
  // constructor), so hand its value id back to the caller.
  partInputValues.insert(partNoPIV->getValueId());

  // Store the partition input values.
  storePartitionInputValues(partInputValues);
}
// Forward the replacement of partition input values and partitioning key
// predicates to the wrapped partial partitioning function; nothing is
// changed on this object itself.
void SkewedDataPartitioningFunction::replacePivs(
const ValueIdList& newPivs,
const ValueIdSet& newPartKeyPreds)
{
partialPartFunc_ -> replacePivs(newPivs, newPartKeyPreds);
}
// Copy constructor. The base part is copied through the
// PartitioningFunction copy constructor; skewHashList_ is reset to NULL
// rather than copied; the partial partitioning function pointer is copied
// as-is (shallow), so the new object initially shares it with 'other'.
// NOTE(review): the initializer list order here differs from the order
// used in the other constructor; members are initialized in declaration
// order regardless -- confirm against the class declaration.
SkewedDataPartitioningFunction::SkewedDataPartitioningFunction(
const SkewedDataPartitioningFunction& other, NAMemory* heap)
: PartitioningFunction(other, heap),
skewHashList_(NULL), // recompute this
partialPartFunc_(other.partialPartFunc_), // may need a deep copy
skewProperty_(other.skewProperty_) // share the skew property
{
}
// The partition count of a skewed-data function is whatever the wrapped
// partial partitioning function reports.
Lng32 SkewedDataPartitioningFunction::getCountOfPartitions() const
{
return partialPartFunc_->getCountOfPartitions();
}
void SkewedDataPartitioningFunction::createPartitioningKeyPredicates()
{
// do nothing, there aren't any partitioning key preds for a skew
// partition
storePartitioningKeyPredicates(ValueIdSet());
partialPartFunc_ -> createPartitioningKeyPredicates();
}
// Build a RequireSkewed requirement for this function, allocated on the
// statement heap.
PartitioningRequirement*
SkewedDataPartitioningFunction::makePartitioningRequirement()
{
  RequireSkewed *requirement =
    new (CmpCommon::statementHeap()) RequireSkewed(this);
  return requirement;
}
// Duplicate this function on the statement heap. The copy constructor
// only copies the pointer to the partial partitioning function, so the
// duplicate is then given its own copy of that function.
PartitioningFunction*
SkewedDataPartitioningFunction::copy() const
{
  SkewedDataPartitioningFunction *duplicate =
    new (CmpCommon::statementHeap()) SkewedDataPartitioningFunction(*this);

  duplicate->partialPartFunc_ = partialPartFunc_->copy();

  return duplicate;
}
// -----------------------------------------------------------------------
// Method for debugging.
// -----------------------------------------------------------------------
// Describe this function for debugging/EXPLAIN. The text starts from the
// partial partitioning function's description. Unless the SKEW_EXPLAIN
// default is OFF, a skew-handling suffix ("-ud" or "-br") is inserted
// right after the hash keyword ("hash", "hash1" or "hash2") and the skew
// property's own text is appended.
const NAString SkewedDataPartitioningFunction::getText() const
{
  NAString text = partialPartFunc_->getText();

  // SKEW_EXPLAIN = off --> do not display any details on the
  // skew-processing method or the skew value list.
  if ( CmpCommon::getDefault(SKEW_EXPLAIN) == DF_OFF )
    return text;

  NAString hashKeyword, skewSuffix;

  // Keyword the partial function's text is expected to contain.
  switch (partialPartFunc_ -> getPartitioningFunctionType())
    {
    case HASH_PARTITIONING_FUNCTION:
      hashKeyword = "hash";
      break;
    case HASH_DIST_PARTITIONING_FUNCTION:
      hashKeyword = "hash1";
      break;
    case HASH2_PARTITIONING_FUNCTION:
      hashKeyword = "hash2";
      break;
    default:
      break;
    }

  // Suffix naming the way skewed values are handled.
  switch ( skewProperty_.getIndicator() )
    {
    case skewProperty::UNIFORM_DISTRIBUTE:
      skewSuffix = "-ud";
      break;
    case skewProperty::BROADCAST:
      skewSuffix = "-br";
      break;
    default:
      break;
    }

  // Splice the suffix in directly behind the keyword, if it was found.
  size_t keywordPos = text.index(hashKeyword);
  if ( keywordPos != NA_NPOS )
    text.insert(keywordPos + hashKeyword.length(), skewSuffix);

  text += skewProperty_.getText();

  return text;
}
// Debug print. Delegates to PartitioningFunction::print() with a fixed
// class-name title; the caller-supplied 'title' argument is not used.
void SkewedDataPartitioningFunction::print(FILE* ofd, const char* indent,
const char* title) const
{
PartitioningFunction::print(ofd, indent, "SkewedDataPartitioningFunction");
}
// -----------------------------------------------------------------------
// SkewedDataPartitioningFunction::comparePartFuncToFunc(): Compare this
// partitioning function to another skewed data function. To be 'SAME'
// the two partial (unskewed) partitioning functions must compare 'SAME'
// and the two skew properties must be equal.
// -----------------------------------------------------------------------
// Returns SAME only if 'other' is also a skewed-data function, the two
// partial partitioning functions compare SAME, and the skew properties
// are equal. Everything else is INCOMPATIBLE.
COMPARE_RESULT
SkewedDataPartitioningFunction::
comparePartFuncToFunc(const PartitioningFunction &other) const
{
  // The other side has to be a SkewedDataPartitioningFunction too.
  const SkewedDataPartitioningFunction *oth =
    other.castToSkewedDataPartitioningFunction();
  if (oth == NULL)
    return INCOMPATIBLE;

  // Compare the two partial partitioning functions first...
  if (partialPartFunc_->comparePartFuncToFunc(
           *(oth->getPartialPartitioningFunction())) != SAME)
    return INCOMPATIBLE;

  // ...and the skew properties last.
  return (skewProperty_ == oth->skewProperty_) ? SAME : INCOMPATIBLE;
} // SkewedDataPartitioningFunction::comparePartFuncToFunc()
// -----------------------------------------------------------------------
// SkewedDataPartitioningFunction::scaleNumberOfPartitions()
// -----------------------------------------------------------------------
// ::scaleNumberOfPartitions() is called in the following locations:
//
// OptPhysRelExpr.cpp
// 3536 NJ::genLeftChild()
// 4892 NJ:createContextForChild() (rightPartFunc->)
// 14166 and 14262 synthDP2PhysicalProperty()
//
// GP.cpp
// 955 GroupAttributes::recommendedOrderForNJProbing()
//
//As a result, the following version will not be called.
// Scale the wrapped partial partitioning function and adopt whatever
// function that call returns (it may be the same object or a new one).
// This skewed-data function itself is always the return value.
PartitioningFunction *
SkewedDataPartitioningFunction::
scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions,
                        PartitionGroupingDistEnum partGroupDist)
{
  // Assigning unconditionally is equivalent to replacing the pointer
  // only when it differs.
  partialPartFunc_ =
    partialPartFunc_->scaleNumberOfPartitions(
         suggestedNewNumberOfPartitions, partGroupDist);

  return this;
} // SkewedDataPartitioningFunction::scaleNumberOfPartitions()
// -----------------------------------------------------------------------
// SkewedDataPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
// A skewed-data partitioning function never reports itself as a grouping
// of another function: this always returns FALSE. Neither argument is
// examined, and maxPartsPerGroup is left untouched.
NABoolean
SkewedDataPartitioningFunction::isAGroupingOf(const PartitioningFunction &other,
Lng32* maxPartsPerGroup) const
{
return FALSE;
} // SkewedDataPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
// SkewedDataPartitioningFunction::copyAndRemap()
// -----------------------------------------------------------------------
PartitioningFunction*
SkewedDataPartitioningFunction::copyAndRemap
                      (ValueIdMap& map, NABoolean mapItUp) const
{
  // Start from a fresh copy of this function and view it as the
  // skewed-data flavor so that its partial function can be reached.
  PartitioningFunction *duplicate = copy();
  SkewedDataPartitioningFunction *remapped =
    (SkewedDataPartitioningFunction *)
      duplicate->castToSkewedDataPartitioningFunction();

  // Remapping is not expected for the BROADCAST skew indicator.
  CMPASSERT(skewProperty_.getIndicator() != skewProperty::BROADCAST);

  // Remap the wrapper first, then the wrapped partial function, each
  // using its counterpart in this object as the source.
  remapped->remapIt(this, map, mapItUp);
  remapped->partialPartFunc_->remapIt(partialPartFunc_, map, mapItUp);

  return remapped;
} // SkewedDataPartitioningFunction::copyAndRemap()
// Pre-code-generation step: runs the base-class preCodeGen() on this object
// and then forwards the same set of available values to the wrapped partial
// partitioning function.
void
SkewedDataPartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
PartitioningFunction::preCodeGen(availableValues);
partialPartFunc_->preCodeGen(availableValues);
}
ItemExpr* SkewedDataPartitioningFunction::createPartitioningExpression()
{
  // Only a hash2 partial partitioning function is supported here.
  CMPASSERT(partialPartFunc_ -> isAHash2PartitioningFunction() == TRUE);

  // createPartitioningExpressionImp() lives in
  // TableHashPartitioningFunction, the parent class of
  // Hash2PartitioningFunction, so view the partial function as that type.
  TableHashPartitioningFunction *hashPartFunc =
    (TableHashPartitioningFunction*)partialPartFunc_;

  ItemExpr *builtExpr =
    hashPartFunc->createPartitioningExpressionImp(TRUE /*do varchar cast*/);

  // Remember the expression on this object before handing it out.
  storeExpression(builtExpr);
  return builtExpr;
}
// ***********************************************************************
// RangePartitionBoundaries
// ***********************************************************************
// -----------------------------------------------------------------------
// Constructor for RangePartitionBoundaries
// -----------------------------------------------------------------------
RangePartitionBoundaries::RangePartitionBoundaries
                    (Lng32 numberOfPartitions,
                     Lng32 numberOfPartitioningKeyColumns, NAMemory *h)
     : partKeyColumnCount_(numberOfPartitioningKeyColumns),
       origPartKeyColumnCount_(numberOfPartitioningKeyColumns),
       partitionCount_(numberOfPartitions),
       origPartitionCount_(numberOfPartitions),
       boundaryStringsList_(h,numberOfPartitions+1),
       boundaryValuesList_(h,numberOfPartitions+1),
       boundaryValues_(h,numberOfPartitions+1),
       binaryBoundaryValues_(h,numberOfPartitions+1),
       encodedBoundaryKeyLength_(-1),
       setBinaryBoundaryFirstLastKey_(FALSE),
       setupForStatement_(FALSE),
       resetAfterStatement_(FALSE),
       heap_(h)
{
  // Callers MUST pass non-zero counts.
  //
  // n partitions need n+1 boundary slots. Slots 0 and n stand for the
  // first and the last key; reserve them with NULL placeholders in both
  // the unbound-value list and the boundary-string list.
  const Lng32 lastSlot = numberOfPartitions;

  boundaryValuesList_.insertAt(0, NULL);
  boundaryStringsList_.insertAt(0, NULL);

  boundaryValuesList_.insertAt(lastSlot, NULL);
  boundaryStringsList_.insertAt(lastSlot, NULL);
} // RangePartitionBoundaries::RangePartitionBoundaries()
// Destructor with an empty body: no explicit cleanup is performed here.
RangePartitionBoundaries::~RangePartitionBoundaries() {}
void RangePartitionBoundaries::defineUnboundBoundary
                      (Lng32 partitionNumber,
                       const ItemExpr * unboundBoundaryValue,
                       const char *encodedKeyValue)
{
  // Records the not-yet-bound boundary expression (and its encoded key)
  // that delimits partition <partitionNumber>.
  //
  // A 'true' boundary has to land between the pre-allocated first and
  // last entries, which leaves n insertion points for n partitions.
  CMPASSERT(partitionNumber > 0 AND
            partitionNumber <= partitionCount_ AND
            unboundBoundaryValue != NULL AND
            encodedKeyValue != NULL);

  boundaryValuesList_.insertAt(partitionNumber, unboundBoundaryValue);

  // A NULL encoded key was meant to occur only for SQL/MP tables.
  // NOTE(review): the assertion above already rejects a NULL encoded key,
  // so this guard looks redundant -- confirm which of the two is intended.
  if (encodedKeyValue == NULL)
    return;

  binaryBoundaryValues_.insertAt(partitionNumber, encodedKeyValue);
}
void RangePartitionBoundaries::bindAddBoundaryValue(Lng32 partitionNumber)
{
  // Turns the unbound boundary expression stored for <partitionNumber>
  // into a bound ItemExprList in boundaryValues_ and, if no text form is
  // stored yet, caches its SQL text in boundaryStringsList_.
  const ItemExpr *storedValue = boundaryValuesList_[partitionNumber];

  if (storedValue == NULL)
    {
      // nothing to bind for this slot; keep the two arrays aligned
      boundaryValues_.insertAt(partitionNumber, NULL);
      return;
    }

  // work on a statement-heap copy of the stored expression
  ItemExpr *boundCopy =
    ((ItemExpr *)storedValue)->copyTree(CmpCommon::statementHeap());

  // bind the boundary value
  boundCopy->synthTypeAndValueId();

  ItemExprList *boundList = new (CmpCommon::statementHeap())
    ItemExprList(boundCopy, CmpCommon::statementHeap());

  boundaryValues_.insertAt(partitionNumber, boundList);

  // a text version already exists for this slot: done
  if (boundaryStringsList_.used(partitionNumber))
    return;

  // use QUERY_FORMAT to obtain the full SQL text; the string copy is
  // allocated on heap_ rather than on the statement heap
  NAString sqlText;
  boundList->unparse(sqlText, OPTIMIZER_PHASE, QUERY_FORMAT);
  boundaryStringsList_.insertAt(partitionNumber,
                                new (heap_) NAString(sqlText));
}
// -----------------------------------------------------------------------
// RangePartitionBoundaries::defineBoundary()
// -----------------------------------------------------------------------
void RangePartitionBoundaries::defineBoundary
                      (Lng32 partitionNumber,
                       const ItemExprList* boundaryValue,
                       const char *encodedKeyValue)
{
  // Records the boundary value list (and its encoded key) that delimits
  // partition <partitionNumber>.
  //
  // A 'true' boundary has to land between the pre-allocated first and
  // last entries, which leaves n insertion points for n partitions.
  CMPASSERT(partitionNumber > 0 AND
            partitionNumber <= partitionCount_ AND
            boundaryValue != NULL AND
            encodedKeyValue != NULL);

  boundaryValues_.insertAt(partitionNumber, boundaryValue);

  // A NULL encoded key was meant to occur only for SQL/MP tables.
  // NOTE(review): the assertion above already rejects a NULL encoded key,
  // so this guard looks redundant -- confirm which of the two is intended.
  if (encodedKeyValue == NULL)
    return;

  binaryBoundaryValues_.insertAt(partitionNumber, encodedKeyValue);
} // RangePartitionBoundaries::defineBoundary()
// -----------------------------------------------------------------------
// RangePartitionBoundaries::checkConsistency()
// -----------------------------------------------------------------------
void RangePartitionBoundaries::checkConsistency(
const Lng32 numberOfPartitions) const
{
// If n = numberOfPartitions then entries 1 through n-1 have to be
// present. Each entry delimits the boundary between two partitions.
// Entries 0 and n contain the minimum and maximum permissible
// values, respectively. Entries 0 and n are initialized to NULL in
// the constructor and later added by method
// RangePartitioningFunction::completePartitionBoundaries()
CMPASSERT(numberOfPartitions == partitionCount_ AND
(Lng32)boundaryValues_.entries() == partitionCount_ + 1);
// check the actual boundaries after the key length of the encoded
// key and the key column value ids have been entered, but not before
if (encodedBoundaryKeyLength_ > 0)
{
CMPASSERT(boundaryValues_.entries() == binaryBoundaryValues_.entries());
for (CollIndex index = 1;
index < (CollIndex)numberOfPartitions;
index++)
{
// boundary values in SQL format and encoded key format must be
// filled in
CMPASSERT(boundaryValues_[index]);
CMPASSERT(binaryBoundaryValues_[index]);
// encoded key values must be in ascending order
// CMPASSERT(str_cmp(binaryBoundaryValues_[index-1],
// binaryBoundaryValues_[index],
// encodedBoundaryKeyLength_) < 0);
}
}
} // RangePartitionBoundaries::checkConsistency()
// -----------------------------------------------------------------------
// RangePartitionBoundaries::compareRangePartitionBoundaries()
// -----------------------------------------------------------------------
// Decides whether every inner partition boundary of "this" also occurs, in
// the same order, among the inner boundaries of "other".
//
// Parameters:
//   other            -- boundaries to search in.
//   groupingAllowed  -- when TRUE, boundaries of "other" that have no
//                       counterpart in "this" are skipped (several
//                       partitions of "other" fall into one partition of
//                       "this"); when FALSE the first mismatch ends the
//                       comparison with FALSE.
//   maxPartsPerGroup -- optional output. If non-NULL it is set to 1 on
//                       entry and raised to the largest number of "other"
//                       partitions counted for a single group. It is
//                       written even on paths that return FALSE.
//
// Returns TRUE only if the walk consumed all inner boundaries of "this";
// returns FALSE right away when "other" has fewer partitions than "this".
//
// NOTE(review): the boundary lists fetched from boundaryValues_ are
// dereferenced without a NULL check -- this assumes both objects have
// bound boundary values for all inner entries; confirm against callers.
NABoolean RangePartitionBoundaries::compareRangePartitionBoundaries(
const RangePartitionBoundaries& other,
NABoolean groupingAllowed,
Lng32* maxPartsPerGroup) const
{
NABoolean match = FALSE;
CollIndex i;
CollIndex thisNumPartKeyCols,otherNumPartKeyCols;
const ItemExprList *thisBoundaryValue;
const ItemExprList *otherBoundaryValue;
const ConstValue *thisConstValue;
const ConstValue *otherConstValue;
NABoolean thisIsNegated = FALSE;
NABoolean otherIsNegated = FALSE;
// number of "other" partitions accumulated in the group being scanned
Lng32 currentPartsPerGroup = 1;
Lng32 numOfPartsInLastGroup = 1;
if (maxPartsPerGroup != NULL)
*maxPartsPerGroup = 1; // start with the minimum
// Give up now if "other" has fewer partitions than "this"
if ( other.partitionCount_ < partitionCount_)
{
return FALSE;
}
// NOTE: We CANNOT compare the encoded values! Two values that are
// the SAME will compare as NOT THE SAME if the types of the
// underlying columns are different.
//
// So, we must compare the boundaryValues_ instead. The
// boundaryValues_ are an array of pointers to ItemExpr Lists
// which contain pointers to ConstValue nodes.
//
// We go through the array of ItemExpr Lists, and
// first compare the ValueId's of the ItemExpr Lists which
// contain pointers to the ItemExpr ConstValue nodes that represent
// the first key values. If these match, great -
// the first keys must be the same. But, they will only have the same
// ValueId if they are from the same table, or if the datatypes of
// the partitioning key columns are exactly the same.
// If they don't have the same ValueId, then we need to compare
// the original raw ASCII text as was present in the DDL.
// We go through the ConstValue nodes in ItemExpr lists one
// by one, and compare them. We use the "getConstStr" method
// of the ConstValue class to do this. This method gets the
// original raw text as typed in by the user, and also appends
// the character set type. This may be important if a user
// is joining two tables where the character set types of the
// partitioning key columns are different.
//
// Go through the boundaries of "this" and try to find them
// in "other" Don't bother with the first and last entry,
// they are always the same. When all is said and done, we
// must have found a match in other for every partition boundary
// from "this". The opposite assertion - that every partition
// boundary in "other" found a match in "this" - is not necessary
// if grouping is allowed.
CollIndex myix = 1;
CollIndex otherix = 1;
while (myix < (CollIndex) partitionCount_ AND
otherix < (CollIndex) other.partitionCount_)
{
match = FALSE;
thisBoundaryValue = boundaryValues_[myix];
otherBoundaryValue = other.boundaryValues_[otherix];
// Check for the simple case: are the ValueId's the same?
if (*thisBoundaryValue == *otherBoundaryValue)
{
match = TRUE;
}
else
{
// Traverse the ItemExprList and compare each ConstValue node
// in "this" to the one in "other". The first keys are the
// same only if they both have the same number of first keys,
// they are either both negated or neither are,
// and the string text of each ConstValue node is
// exactly the same.
thisNumPartKeyCols = thisBoundaryValue->entries();
otherNumPartKeyCols = otherBoundaryValue->entries();
if (thisNumPartKeyCols != otherNumPartKeyCols)
{
// If the two partitioning functions specified a different
// number of first key values, then they cannot possibly
// have equivalent partitioning schemes.
return FALSE;
}
else
{
// Hope for the best...
match = TRUE;
// the loop stops at the first key column that differs
for (i = 0;i < thisNumPartKeyCols AND match; i++)
{
thisIsNegated = FALSE;
otherIsNegated = FALSE;
thisConstValue =
((*thisBoundaryValue)[i])->castToConstValue(thisIsNegated);
otherConstValue =
((*otherBoundaryValue)[i])->castToConstValue(otherIsNegated);
// an entry that is not a ConstValue on either side counts as a mismatch
if ((thisIsNegated != otherIsNegated) OR
thisConstValue == NULL OR
otherConstValue == NULL OR
(thisConstValue->getConstStr(FALSE) !=
otherConstValue->getConstStr(FALSE)))
match = FALSE;
}
}
}
if (match)
{
// Found this boundary in "other". Advance to the next
// start key in both "this" and "other".
myix++;
otherix++;
// the group that just ended may be the largest one seen so far
if ((maxPartsPerGroup != NULL) AND
(currentPartsPerGroup > *maxPartsPerGroup))
{
*maxPartsPerGroup = currentPartsPerGroup;
}
currentPartsPerGroup = 1; // clear
}
else if (groupingAllowed)
{
// No match. Advance to the next start key in "other"
// to see if that will match.
otherix++;
currentPartsPerGroup++;
}
else
{
// These boundaries don't match, and since grouping is not allowed,
// we are out of luck. Give up now.
return FALSE;
}
} // end while more partitions
// Since we didn't look at the last group, we need to compute the
// number of partitions in it and see if it is the group with
// the largest number of partitions.
numOfPartsInLastGroup = (other.partitionCount_ - otherix) + 1;
if ((maxPartsPerGroup != NULL) AND
(numOfPartsInLastGroup > *maxPartsPerGroup))
{
*maxPartsPerGroup = numOfPartsInLastGroup;
}
if (myix == (CollIndex) partitionCount_)
{
// We have found all of our partition boundaries
// in the other range partition boundaries object.
return TRUE;
}
else
{
// Even with grouping, the partition boundaries don't match. Too bad.
return FALSE;
}
} // RangePartitionBoundaries::compareRangePartitionBoundaries()
//<pb>
//==============================================================================
// Merge two range partition boundaries to produce new range partition
// boundaries. Also produce a node map corresponding to the new range partition
// boundaries.
//
// Input:
// other -- other partition boundary map with which to merge.
// thisNodeMap -- node map associated with this RangePartitionBoundaries
// object.
//
// Output:
// resultNodeMap -- node map associated with resulting RangePartitionBoundaries
// object.
//
// Return:
// Pointer to merged partition boundaries.
//
//==============================================================================
// Merges the boundaries of "this" and "other" in ascending order of their
// encoded keys into a new RangePartitionBoundaries object allocated on the
// statement heap; boundaries that compare equal are emitted once.
// resultNodeMap receives one entry per emitted boundary except the final
// one; the entries are always copied from thisNodeMap.
// Returns NULL if either side has an encoded key length of 0 (no binary
// boundary values).
RangePartitionBoundaries*
RangePartitionBoundaries::merge(const RangePartitionBoundaries& other,
const NodeMap& thisNodeMap,
NodeMap& resultNodeMap) const
{
// need binary boundary values to do a merge
// NOTE(review): a length of -1 (the value set by the constructor) is not
// rejected here -- presumably callers only merge completed boundaries.
if (encodedBoundaryKeyLength_ == 0 OR
other.encodedBoundaryKeyLength_ == 0)
return NULL;
// left is "this", right is "other",
// merge boundaries 0...entries of "this" with boundaries 1...entries-1
// of "other" (don't use -infinity and +infinity of right)
CollIndex thisix = 0;
CollIndex otherix = 1;
CollIndex resultix = 0;
CollIndex maxthis = boundaryValues_.entries();
CollIndex maxother = other.boundaryValues_.entries()-1;
CollIndex maxboth = maxthis + maxother;
// the result starts out sized for one partition; its real partition count
// is set after the merge loop
RangePartitionBoundaries *result = new(CmpCommon::statementHeap())
RangePartitionBoundaries(1,partKeyColumnCount_,CmpCommon::statementHeap());
Lng32 compResult;
CMPASSERT(partKeyColumnCount_ == other.partKeyColumnCount_ AND
encodedBoundaryKeyLength_ == other.encodedBoundaryKeyLength_);
// continue merging boundaries while at least one index has not reached its
// ending value.
while (thisix < maxthis OR otherix < maxother)
{
// The final boundary has no corresponding node map entry.
// In other words, the number of boundaries is always one more than
// the number of node map entries.
if (maxboth - thisix - otherix > 1)
{
// Since node maps have one less entry than boundary maps, ensure
// that the node map's index does not fall off the end of the node
// map.
// NOTE(review): assumes thisNodeMap has at least one entry; if
// getNumEntries() is unsigned, zero entries would wrap around here.
CollIndex nodeMapIx = MINOF(thisix, thisNodeMap.getNumEntries() - 1);
// Store node map entry associated with current boundary.
const NodeMapEntry* entry = thisNodeMap.getNodeMapEntry(nodeMapIx);
resultNodeMap.setNodeMapEntry(resultix,
*entry,
CmpCommon::statementHeap());
}
// other index reached max, so comparison is automatically "less"
if (otherix >= maxother)
{
compResult = -1;
}
else
{
// this index reached max, so comparison is automatically "more"
if (thisix >= maxthis)
{
compResult = 1;
}
else
{
// neither index reached max, so do an actual comparison of all
// 'real' boundaries in the encoded representation (the first
// and last boundaries are always the same)
compResult = str_cmp(binaryBoundaryValues_[thisix],
other.binaryBoundaryValues_[otherix],
encodedBoundaryKeyLength_);
}
}
// take the smallest boundary value and insert it into result
if (compResult <= 0)
{
// "this" has the smaller (or equal) boundary value
result->boundaryValues_.insertAt(
resultix,
boundaryValues_[thisix]);
result->binaryBoundaryValues_.insertAt(
resultix,
binaryBoundaryValues_[thisix]);
thisix++;
// if the boundaries are identical, skip both of them
if (compResult == 0)
otherix++;
}
else
{
// "other" has the smaller boundary value
result->boundaryValues_.insertAt(
resultix,
other.boundaryValues_[otherix]);
result->binaryBoundaryValues_.insertAt(
resultix,
other.binaryBoundaryValues_[otherix]);
otherix++;
}
// Increment result index for next boundary.
resultix++;
}
// resultix boundaries were emitted, which delimit resultix - 1 partitions
result->encodedBoundaryKeyLength_ = encodedBoundaryKeyLength_;
result->partitionCount_ = resultix - 1;
return result;
} // RangePartitionBoundaries::merge
Lng32 RangePartitionBoundaries::getOptimizedNumberOfPartKeys()
{
  // What is passed in as the partitioning key is currently the clustering
  // key. Count how many first-key values the DDL actually specifies and
  // use the largest such count instead of all clustering key columns.
  // Example:
  //
  // create table t (a int, b int, c int, primary key (a,b desc,c)
  //                 partition (add first key (1), add first key (2,2));
  //
  // Here the partitioning key order is (a, b desc), not (a,b desc,c).
  // This optimization belongs in CATMAN; remove this method once it is
  // done there.
  CollIndex widestKeySpec = 0;
  const CollIndex endOfInnerBoundaries = (CollIndex) partitionCount_;

  for (CollIndex b = 1; b < endOfInnerBoundaries; b++)
    {
      if (NOT boundaryValues_[b])
        {
          // no boundary values specified in SQL syntax (e.g. when only
          // binary region boundaries are available): fall back to the
          // full key column count and stop looking
          widestKeySpec = partKeyColumnCount_;
          break;
        }

      // Does this boundary list more values than assumed so far?
      const CollIndex numKeySpecs = boundaryValues_[b]->entries();
      if (numKeySpecs > widestKeySpec)
        widestKeySpec = numKeySpecs;
    }

  return widestKeySpec;
}
// Reduces the number of partitions by keeping only a subset of the existing
// boundaries, compacting them in place in boundaryValues_ and
// binaryBoundaryValues_.
//
// Parameters:
//   suggestedNewNumberOfPartitions -- desired partition count; treated as an
//                                     upper limit (see H1 below).
//   nodeMap                        -- node map that is queried for active
//                                     partitions and multi-cluster groups.
//                                     NOTE(review): dereferenced without a
//                                     NULL check; assumed to be non-NULL.
//   partGroupDist                  -- selects the grouping distribution.
//
// Returns the resulting partition count. The object is left unchanged and
// the current count is returned when the suggestion is not smaller than the
// current count, when (active-partition grouping) the suggestion exceeds the
// number of active partitions, or when a group would span more than one
// cluster according to nodeMap->isMultiCluster().
Lng32 RangePartitionBoundaries::scaleNumberOfPartitions(
Lng32 suggestedNewNumberOfPartitions,
const NodeMap* nodeMap,
PartitionGroupingDistEnum partGroupDist)
{
// * * * * * * * * * * * * * I M P O R T A N T * * * * * * * * * * * * * *
// * *
// * THIS MEMBER FUNCTION USES THE SAME GROUPING ALGORITHM AS MEMBER *
// * FUNCTION NodeMap::deriveGrouping(). ANY CHANGES TO THIS MEMBER *
// * FUNCTION WOULD NECESSITATE CHANGES TO MEMBER FUNCTION *
// * NodeMap::deriveGrouping() AS WELL. *
// * *
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
// For now, the only way of scaling the number of partitions
// is by eliminating part of the boundaries (deleting all but
// every i-th boundary where i is an integer number).
// There are two problems with this limitation: the suggestion may
// ask for an increase in the number of partitions, or the suggestion
// may cause an imbalance because the suggested number times i is not
// equal to the existing number of partitions.
// Here are the heuristics that we are using until we get smarter:
//
// H1: Treat the suggested number as an upper limit of new partitions.
// Often, the suggestion is the number of CPUs available and plans
// with 13 partitions don't run very well on 12 CPUs.
//
// H2: Distribute the old partitions such that the number of old partitions
// for each new partition varies at most by one (a new partition has
// either o or o+1 old partitions in it).
//
// We have two ways of distributing the partitions: "uniformly" (off
// by at most one) distribute the number of physical partitions
// amongst the groups, or uniformly distribute the number of active
// partitions amongst the groups. The latter is preferable if there
// are many inactive partitions and the active partitions are not
// uniformly distributed amongst them. Actually, if the active
// partition estimate is correct, we cannot go wrong distributing
// the partitions based on the number of active partitions.
// In cases where this would combine partitions from two or more clusters
// it is better not to form such groups, as this would limit autonomy
// and also lead to more messages between clusters. In such cases, this
// method will return with the original partition boundary arrays.
// Make copies in case we need to undo.
ARRAY(const ItemExprList *) undoBoundaryValues(boundaryValues_);
ARRAY(const char *) undoBinaryBoundaryValues(binaryBoundaryValues_);
// As already said, we can't increase the number of partitions
if (suggestedNewNumberOfPartitions >= partitionCount_)
return partitionCount_;
CMPASSERT(suggestedNewNumberOfPartitions > 1); // sanity check
NABoolean useAPDistribution = FALSE;
// Determine partition grouping distribution algorithm to use
// (any value other than the three tested below leaves the flag at FALSE)
if (partGroupDist == DEFAULT_PARTITION_GROUPING)
{
if (CmpCommon::getDefault(BASE_NUM_PAS_ON_ACTIVE_PARTS) == DF_ON)
useAPDistribution = TRUE;
else
useAPDistribution = FALSE;
}
else if (partGroupDist == UNIFORM_PHYSICAL_PARTITION_GROUPING)
{
useAPDistribution = FALSE;
}
else if (partGroupDist == UNIFORM_ACTIVE_PARTITION_GROUPING)
{
useAPDistribution = TRUE;
}
// newix: next slot to fill in the compacted arrays (slot 0 stays as is)
// prevPartStart: old index at which the group being formed started
Lng32 newix = 1;
Lng32 prevPartStart = 0;
if (useAPDistribution)
{
// Uniform number of active partitions grouping
CostScalar activePartitions =
((NodeMap *)(nodeMap))->getNumActivePartitions();
Lng32 numActiveParts = (Lng32)activePartitions.getValue();
// cannot form more groups than there are active partitions
if (suggestedNewNumberOfPartitions > numActiveParts)
return partitionCount_;
Lng32 currentNumActiveParts = 0;
Lng32 numActivePartsPerGroup = numActiveParts / suggestedNewNumberOfPartitions;
Lng32 remainder = numActiveParts % suggestedNewNumberOfPartitions;
Lng32 oldix = 1;
Lng32 partNum = 0;
NodeMapEntry::PartitionState partState;
NABoolean isPartActive = FALSE;
// stop once all but the last new boundary have been produced; the last
// one is written after the if/else below
while ((partNum < partitionCount_) AND
(newix != suggestedNewNumberOfPartitions))
{
// Determine if the current partition is active
partState = nodeMap->getNodeMapEntry((CollIndex)partNum)->
getPartitionState();
isPartActive = ((partState == NodeMapEntry::ACTIVE) OR
(partState == NodeMapEntry::ACTIVE_NO_DATA));
if ((remainder != 0) AND
((newix - 1) + remainder == suggestedNewNumberOfPartitions))
{
// all the remaining groups get one extra active partition
numActivePartsPerGroup++;
remainder = 0; // so we won't do it again
}
if (isPartActive)
{
// one more active partition for this group
currentNumActiveParts++;
if (currentNumActiveParts == numActivePartsPerGroup)
{
// full group - time to end it, so check that partns on same cluster
if (nodeMap->isMultiCluster(prevPartStart, oldix, TRUE))
{
// More than one cluster in a group - undo.
boundaryValues_ = undoBoundaryValues;
binaryBoundaryValues_ = undoBinaryBoundaryValues;
return partitionCount_;
}
// Transfer the values over
boundaryValues_[newix] = boundaryValues_[oldix];
binaryBoundaryValues_[newix] = binaryBoundaryValues_[oldix];
// advance to prepare for next new partition boundary
newix++;
// reset number of active partitions in the current group
currentNumActiveParts = 0;
// remember where that previous group started
prevPartStart = oldix;
}
} // end if current partition is active
// advance to next old partition boundary
oldix++;
// advance to next partition
partNum++;
} // end while more partitions and more new boundaries to produce
}
else // Uniform number of physical partitions grouping
{
// Determine an integer value i such that partitionCount_ / i is
// approximately equal to suggestedNewNumberOfPartitions.
Lng32 numPartsPerGroup = partitionCount_ / suggestedNewNumberOfPartitions;
Lng32 remainder = partitionCount_ % suggestedNewNumberOfPartitions;
// If the remainder is not zero we will end up with an imbalanced new
// partitioning scheme. To reduce the imbalance, distribute the remainder
// partitions such that at most one of them goes into each new partition.
// The easiest way to do this is to give one extra old partition to the
// last <remainder> of the new partitions.
// Remember that entries 0 and partitionCount_ stand for the min and
// max key and should be preserved. For the entries in between, take
// only every <numPartsPerGroup> one and make a new, contiguous array.
Lng32 oldix = numPartsPerGroup;
while (oldix < partitionCount_)
{
if (nodeMap->isMultiCluster(prevPartStart, oldix, FALSE))
{
// More than one cluster in a group - undo.
boundaryValues_ = undoBoundaryValues;
binaryBoundaryValues_ = undoBinaryBoundaryValues;
return partitionCount_;
}
boundaryValues_[newix] = boundaryValues_[oldix];
binaryBoundaryValues_[newix] = binaryBoundaryValues_[oldix];
if (newix + remainder == suggestedNewNumberOfPartitions)
numPartsPerGroup++; // rest of the new partitions get one extra
prevPartStart = oldix;
oldix += numPartsPerGroup;
newix += 1;
}
} // end if uniform number of physical partitions grouping
// move the last entry for the max key
if (nodeMap->isMultiCluster(prevPartStart, partitionCount_,
useAPDistribution))
{
// More than one cluster in a group - undo.
boundaryValues_ = undoBoundaryValues;
binaryBoundaryValues_ = undoBinaryBoundaryValues;
return partitionCount_;
}
boundaryValues_[newix] = boundaryValues_[partitionCount_];
binaryBoundaryValues_[newix] = binaryBoundaryValues_[partitionCount_];
// Sanity check, we should now have exactly suggestedNewNumberOfPartitions+1
// new entries in the boundaryValues_ array.
CMPASSERT(newix == suggestedNewNumberOfPartitions);
// to be nice, make the rest of the array empty
for (newix = newix+1; newix <= partitionCount_; newix++)
{
boundaryValues_.remove(newix);
binaryBoundaryValues_.remove(newix);
}
// only now record the reduced partition count
partitionCount_ = suggestedNewNumberOfPartitions;
return partitionCount_;
}
// Thin wrapper around compareRangePartitionBoundaries() with grouping
// allowed. Returns that call's result; maxPartsPerGroup is passed through
// unchanged as its optional output parameter.
NABoolean RangePartitionBoundaries::isAGroupingOf(
const RangePartitionBoundaries &other,
Lng32* maxPartsPerGroup) const
{
// Call the method that compares range partition boundaries, with
// the optional parameter that indicates if grouping is OK set to TRUE.
return compareRangePartitionBoundaries(other,TRUE,maxPartsPerGroup);
} // RangePartitionBoundaries::isAGroupingOf()
// -----------------------------------------------------------------------
// RangePartitionBoundaries::getBoundaryValues()
// -----------------------------------------------------------------------
// Returns the boundary value list stored at position <index> of
// boundaryValues_. The index is asserted to lie in [0, partitionCount_].
const ItemExprList* RangePartitionBoundaries::getBoundaryValues(Lng32 index) const
{
// It is legal to index entries in the range [0, partitionCount_].
CMPASSERT( (index >= 0) AND (index <= partitionCount_) );
return boundaryValues_[index];
} // RangePartitionBoundaries::getBoundaryValues()
// -----------------------------------------------------------------------
// RangePartitionBoundaries::getBinaryBoundaryValue()
// -----------------------------------------------------------------------
// Returns the encoded (binary) boundary key stored at position <index> of
// binaryBoundaryValues_. The index is asserted to lie in
// [0, partitionCount_].
const char * RangePartitionBoundaries::getBinaryBoundaryValue(
Lng32 index) const
{
// It is legal to index entries in the range [0, partitionCount_].
CMPASSERT( (index >= 0) AND (index <= partitionCount_) );
return binaryBoundaryValues_[index];
} // RangePartitionBoundaries::getBinaryBoundaryValue()
// -----------------------------------------------------------------------
// RangePartitionBoundaries::completePartitionBoundaries()
// -----------------------------------------------------------------------
void RangePartitionBoundaries::completePartitionBoundaries(
const ValueIdList& partitioningKeyOrder,
Lng32 encodedBoundaryKeyLength)
{
// recognize from the fact that the encoded key length is
// not set yet that this method hasn't been called before
CMPASSERT(encodedBoundaryKeyLength_ == -1);
NABoolean ascending_key_column = TRUE;
ItemExprList *firstKey = new(CmpCommon::statementHeap())
ItemExprList(CmpCommon::statementHeap());
ItemExprList *lastKey = new(CmpCommon::statementHeap())
ItemExprList(CmpCommon::statementHeap());
// set some data members that could not be set before
partKeyColumnCount_ = partitioningKeyOrder.entries();
encodedBoundaryKeyLength_ = encodedBoundaryKeyLength;
Lng32 i;
// for each partitioning key column
for (i = 0; i < partKeyColumnCount_; i++)
{
// get the type of the partitioning key column
const NAType &prototypeType = partitioningKeyOrder[i].getType();
ItemExpr *minVal = new(CmpCommon::statementHeap())
SystemLiteral(&prototypeType,
TRUE /*want min*/,
TRUE /*allow NULL*/);
ItemExpr *maxVal = new(CmpCommon::statementHeap())
SystemLiteral(&prototypeType,
FALSE,
TRUE);
ItemExpr * orderExpr = partitioningKeyOrder[i].getItemExpr();
// add the min/max values to the first and last key
if (orderExpr->getOperatorType() != ITM_INVERSE)
{
// Ascending key column, create min value for first key and
// max value for last key
ascending_key_column = TRUE;
firstKey->insert(minVal);
lastKey->insert(maxVal);
}
else
{
// Descending key column, create max value for first key and
// min value for last key
ascending_key_column = FALSE;
firstKey->insert(maxVal);
lastKey->insert(minVal);
}
// Check if any first key values don't specify this partitioning
// key column. If so, add the MIN or the MAX value for that
// column as the missing first key value.
for (CollIndex j = 1; j < (CollIndex)partitionCount_; j++)
{
if (boundaryValues_[j]->entries() == (CollIndex)i)
{
// This first key does not have enough entries to have
// specified a value for this column. Add it. Add the
// MIN value if ascending, add the MAX value if descending.
// Need to cast away the const-ness to do this - sorry.
if (ascending_key_column)
((ItemExprList*)boundaryValues_[j])->insert(minVal);
else
((ItemExprList*)boundaryValues_[j])->insert(maxVal);
}
}
} // for each partitioning key column
// finally, add the created begin and end key as the first key and
// after the last start key
boundaryValues_[0] = firstKey;
boundaryValues_[partitionCount_] = lastKey;
// do we have any binary boundary values at all?
if (binaryBoundaryValues_.entries())
{
if(!setBinaryBoundaryFirstLastKey_)
{
// yes, make entries for binary encoded keys with all zeroes and
// all ones for the first and last key, respectively
char * binaryMin =
new(heap_) char[encodedBoundaryKeyLength_];
char * binaryMax =
new(heap_) char[encodedBoundaryKeyLength_];
str_pad(binaryMin,encodedBoundaryKeyLength_,'\0');
str_pad(binaryMax,encodedBoundaryKeyLength_,'\377');
binaryBoundaryValues_.insertAt(0,binaryMin);
binaryBoundaryValues_.insertAt(partitionCount_,binaryMax);
setBinaryBoundaryFirstLastKey_ = TRUE;
}
}
else
{
// indicate that there are no binary key values (happens for SQL/MP)
encodedBoundaryKeyLength_ = 0;
}
checkConsistency(partitionCount_);
} // RangePartitionBoundaries::completePartitionBoundaries()
// Forward declaration of a free function whose definition is not in this
// part of the file. setupForStatement() below passes it the text and length
// of a stored boundary string plus the statement heap and stores the
// returned ItemExpr as the unbound boundary value.
// NOTE(review): presumably this parses the SQL text of a boundary back into
// an expression tree -- confirm against the definition.
ItemExpr * getRangePartitionBoundaryValues
(const char * keyValueBuffer,
const Lng32 keyValueBufferSize,
NAMemory* heap,
CharInfo::CharSet strCharSet = CharInfo::UTF8
);
void RangePartitionBoundaries::setupForStatement(NABoolean useStringVersion)
{
  // Prepares the per-statement boundary values; does nothing if that has
  // already happened since the last reset.
  if (setupForStatement_)
    return;

  if (useStringVersion)
    {
      // Rebuild the unbound boundary expressions from their stored text.
      for (UInt32 slot = 0; slot < boundaryStringsList_.entries(); slot++)
        {
          ItemExpr *rebuilt = NULL;

          if (boundaryStringsList_.used(slot) && boundaryStringsList_[slot])
            {
              // Warning: use the statement heap so that the returned
              // ItemExpr is NOT allocated on heap_, which can be the
              // context heap. An expression on the context heap would not
              // be deleted (there is no way to delete a complicated
              // ItemExpr) during resetAfterStatement(), and the assertion
              //   CMPASSERT(sizeAfterLastStatement_ <= initialSize_ )
              // in RangePartitionBoundaries::resetAfterStatement() could
              // then fail.
              rebuilt = getRangePartitionBoundaryValues(
                             boundaryStringsList_[slot]->data(),
                             boundaryStringsList_[slot]->length(),
                             CmpCommon::statementHeap());
            }

          // slots without text get a NULL placeholder
          boundaryValuesList_.insertAt(slot, rebuilt);
        }
    }

  // Bind every unbound boundary value that is present.
  for (UInt32 slot = 0; slot < boundaryValuesList_.entries(); slot++)
    bindAddBoundaryValue(slot);

  setupForStatement_ = TRUE;
  resetAfterStatement_ = FALSE;
}
void RangePartitionBoundaries::resetAfterStatement()
{
if(resetAfterStatement_)
return;
boundaryValuesList_.clear();
boundaryValues_.clear();
partKeyColumnCount_ = origPartKeyColumnCount_;
partitionCount_ = origPartitionCount_;
setupForStatement_ = FALSE;
resetAfterStatement_ = TRUE;
}
// -----------------------------------------------------------------------
// Method for debugging.
//
// Writes three sections to ofd:
//   1. the title, followed by one entry per partition showing the
//      boundary's item expression list (or "is empty" if there is none);
//   2. one "binary boundary[n]: 0x..." line for every boundary that has
//      an encoded value stored in binaryBoundaryValues_;
//   3. the title again with "(in binary form)", followed by the bytes of
//      each encoded boundary, one hex number per byte.
//
// ofd    - stream to print to
// indent - current indentation (handed to BUMP_INDENT)
// title  - heading printed before sections 1 and 3
// -----------------------------------------------------------------------
void RangePartitionBoundaries::print(FILE* ofd, const char* indent,
                                     const char* title) const
{
  BUMP_INDENT(indent);
  char S[500];
  int index;

  // section 1: boundaries as item expression lists
  fprintf(ofd,"%s %s\n",NEW_INDENT,title);
  for (index = 0; index < partitionCount_; index++)
    {
      const ItemExprList* iel = boundaryValues_[index];
      snprintf(S, sizeof(S), "boundary[%d]: ", index);
      if (iel)
        iel->print(ofd, indent, S);
      else
        fprintf(ofd,"%s %s is empty\n",NEW_INDENT,S);
    }

  // section 2: encoded boundaries that are actually present
  for (int index2 = 0; index2 < partitionCount_; index2++)
    if (binaryBoundaryValues_.used(index2) &&
        binaryBoundaryValues_[index2])
      {
        const char *binaryVal = binaryBoundaryValues_[index2];
        fprintf(ofd, "binary boundary[%d]: 0x", // %#0*",
                index2);
        for (int b=0; b<encodedBoundaryKeyLength_; b++)
          fprintf(ofd, "%02hhx", binaryVal[b]);
        fprintf(ofd, "\n");
      }

  // section 3: raw dump of all encoded boundaries
  fprintf(ofd,"%s %s (in binary form)\n",NEW_INDENT,title);
  Lng32 keyLen = getEncodedBoundaryKeyLength();
  for (index = 0; index < partitionCount_; index++)
    {
      const char* bValues = getBinaryBoundaryValue(index);

      // Section 2 shows that a boundary may have no encoded value;
      // skip such entries instead of dereferencing a NULL pointer.
      if (bValues == NULL)
        continue;

      for (Int32 j = 0; j < keyLen; j++) {
        // Go through unsigned char so that bytes >= 0x80 are not sign
        // extended (a plain char cast to int would print as ffffffXX).
        fprintf(ofd,"%04x ", (unsigned int)(unsigned char)bValues[j]);
      }
    }
} // RangePartitionBoundaries::print()
// ***********************************************************************
// RangePartitioningFunction
// ***********************************************************************
// -----------------------------------------------------------------------
// Copy constructor.
// The three key column lists are copied from "other", the partition
// boundaries are duplicated into a new object allocated from "heap",
// and both per-statement state flags start out as FALSE.
// -----------------------------------------------------------------------
RangePartitioningFunction::RangePartitioningFunction(
     const RangePartitioningFunction& other,
     NAMemory* heap)
  : PartitioningFunction(other, heap),
    keyColumnList_(other.keyColumnList_),
    orderOfKeyValues_(other.orderOfKeyValues_),
    originalKeyColumnList_(other.originalKeyColumnList_),
    setupForStatement_(FALSE),
    resetAfterStatement_(FALSE)
{
  // this object gets its own copy of the boundaries
  const RangePartitionBoundaries &sourceBoundaries =
    *(other.partitionBoundaries_);

  partitionBoundaries_ =
    new(heap) RangePartitionBoundaries(sourceBoundaries, heap);
}
// Destructor; the body is empty, nothing is released explicitly here.
RangePartitioningFunction::~RangePartitioningFunction()
{
}
// Returns the number of partitions as reported by the partition
// boundaries object.
Lng32 RangePartitioningFunction::getCountOfPartitions() const
{
  const Lng32 numberOfPartitions =
    partitionBoundaries_->getCountOfPartitions();

  return numberOfPartitions;
}
// Type-safe downcast: for a range partitioning function this is simply
// the object itself.
const RangePartitioningFunction*
RangePartitioningFunction::castToRangePartitioningFunction() const
{
  return this;
}
// Builds a RequireRange partitioning requirement for this partitioning
// function. The requirement is allocated from the statement heap.
PartitioningRequirement*
RangePartitioningFunction::makePartitioningRequirement()
{
  PartitioningRequirement *rangeRequirement =
    new (CmpCommon::statementHeap()) RequireRange(this);

  return rangeRequirement;
}
// Returns a copy of this partitioning function, made with the copy
// constructor and allocated from the statement heap. Only the source
// object is passed to the copy constructor, so its heap parameter takes
// its default value.
PartitioningFunction*
RangePartitioningFunction::copy() const
{
  PartitioningFunction *duplicate =
    new(CmpCommon::statementHeap()) RangePartitioningFunction(*this);

  return duplicate;
}
// -----------------------------------------------------------------------
// RangePartitioningFunction::normalizePartitioningKeys()
// Rewrite the partitioning keys of the partitioning function in
// terms of the VEGReference for the VEG to which the partitioning
// key column belongs.
//
// The base class is normalized first, then the key column list and the
// list that describes the order of the key values.
// -----------------------------------------------------------------------
void RangePartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  PartitioningFunction::normalizePartitioningKeys(normWARef);

  keyColumnList_.normalizeNode(normWARef);
  orderOfKeyValues_.normalizeNode(normWARef);

  // originalKeyColumnList_ is deliberately left alone: replacing its
  // entries with VEGies could cause data type changes.
} // RangePartitioningFunction::normalizePartitioningKeys
// -----------------------------------------------------------------------
// RangePartitioningFunction::comparePartFuncToFunc()
// Returns SAME if this function is a grouping of "other" and both have
// the same number of partitions, INCOMPATIBLE otherwise.
//
// By the definition of a grouping, two partitioning functions must be
// the same if one is a grouping of the other and the partition counts
// match. Relying on isAGroupingOf() is not the most direct way to
// compare two range partitioning functions, but it avoids duplicating
// the comparison code.
// -----------------------------------------------------------------------
COMPARE_RESULT
RangePartitioningFunction::comparePartFuncToFunc
                             (const PartitioningFunction &other) const
{
  if (NOT isAGroupingOf(other))
    return INCOMPATIBLE;

  if (getCountOfPartitions() != other.getCountOfPartitions())
    return INCOMPATIBLE;

  return SAME;
} // RangePartitioningFunction::comparePartFuncToFunc()
// -----------------------------------------------------------------------
// RangePartitioningFunction::isAGroupingOf()
// Checks whether this function is a grouping of "other".
//
// other            - the partitioning function to compare against
// maxPartsPerGroup - passed through unchanged to the comparison of the
//                    partition boundaries
//
// Returns FALSE if "other" is not a range partitioning function or if
// the key columns differ in number, sequence or order; otherwise the
// answer is whatever the comparison of the partition boundaries yields.
// -----------------------------------------------------------------------
NABoolean RangePartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  // only another range partitioning function can be grouped into
  // a range partitioning function
  const RangePartitioningFunction *otherRange =
    other.castToRangePartitioningFunction();

  if (otherRange == NULL)
    return FALSE;

  // Both functions must be partitioned on the same columns, in the
  // same sequence and order (a prefix might work in some cases, but
  // that is not recognized here).
  const CollIndex numKeyValues = orderOfKeyValues_.entries();

  if (numKeyValues != otherRange->orderOfKeyValues_.entries())
    return FALSE;

  for (CollIndex pos = 0; pos < numKeyValues; pos++)
    {
      ValueId myKeyValue    = orderOfKeyValues_[pos];
      ValueId otherKeyValue = otherRange->orderOfKeyValues_[pos];

      if (myKeyValue != otherKeyValue)
        {
          // Different value ids are still acceptable when both are
          // inversion expressions on top of the same child.
          ItemExpr *myExpr = myKeyValue.getItemExpr();
          if (myExpr->getOperatorType() != ITM_INVERSE)
            return FALSE; // really different columns

          ItemExpr *otherExpr = otherKeyValue.getItemExpr();
          if (otherExpr->getOperatorType() != ITM_INVERSE)
            return FALSE; // really different columns

          if (myExpr->child(0) != otherExpr->child(0))
            return FALSE; // really different columns
        }
    }

  // -----------------------------------------------------------------
  // The keys match, so the partition boundaries decide.
  // -----------------------------------------------------------------
  return partitionBoundaries_->isAGroupingOf(
       *otherRange->partitionBoundaries_,
       maxPartsPerGroup);
}
// -----------------------------------------------------------------------
// RangePartitioningFunction::createPartitioningKeyPredicates()
// -----------------------------------------------------------------------
void RangePartitioningFunction::createPartitioningKeyPredicates()
{
if (NOT partKeyPredsCreated())
{
char fabricatedName[20]; // a name for the host variable
char* S = fabricatedName;
CollIndex nCols = keyColumnList_.entries();
ValueIdSet setOfKeyPredicates;
ValueIdList partInputValues(2*nCols+1);
ValueIdList loValues;
ValueIdList hiValues;
// -----------------------------------------------------------------
// create partition input variables: one low value for each range
// key column, one hi value for each range key column, and one
// inclusion indicator column
// -----------------------------------------------------------------
for (CollIndex beginEnd = 0; beginEnd < 2; beginEnd++)
for (CollIndex i = 0; i < nCols; i++)
{
ValueId vid = originalKeyColumnList_[i];
// -- Fabricate a name for the first host variable
if (beginEnd == 0)
sprintf(S,"_sys_HostVarLo%d",i);
else
sprintf(S,"_sys_HostVarHi%d",i);
// -- Build a host var node with the key column's type
HostVar *hv = new(CmpCommon::statementHeap())
HostVar(S, &(vid.getType()), TRUE);
hv->synthTypeAndValueId();
// insert the partition input variable
partInputValues.insert(hv->getValueId());
if (beginEnd == 0)
loValues.insert(hv->getValueId());
else
hiValues.insert(hv->getValueId());
}
// The inclusion indicator is a 4-byte integer, no NULLs allowed.
// At execution time it is set to a non-zero value if the end
// key is excluded and to a zero value if it is included.
ItemExpr *intervalExclusionIndicator =
new(CmpCommon::statementHeap()) HostVar(
"_sys_hostVarExclRange",
new(CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), TRUE,FALSE),
TRUE);
intervalExclusionIndicator->synthTypeAndValueId();
partInputValues.insert(intervalExclusionIndicator->getValueId());
// -----------------------------------------------------------------
// Create the semantic equivalent of multi-valued predicates, taking
// the order of keys into account (which can't be expressed in
// "normal" multi-valued predicates). Set the "special nulls"
// property for all comparison operators that involve nullable
// columns, since we do not want to filter out any NULL values.
// -----------------------------------------------------------------
// -----------------------------------------------------------------
// a GREATER_EQ multi-valued predicate for the begin key
// -----------------------------------------------------------------
CollIndex i = keyColumnList_.entries()-1;
CollHeap *h = CmpCommon::statementHeap();
// the comparison operator for the current column, taking the
// order of the column into account
OperatorTypeEnum compOp,extraOp;
ItemExpr *kc = keyColumnList_[i].getItemExpr();
NABoolean nullable =
keyColumnList_[i].getType().supportsSQLnullLogical();
// start with a greater or equal comparison for the last key
// column (may be the only key column), use less equal if the
// column is descending
if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
ITM_INVERSE)
{
compOp = ITM_LESS_EQ;
extraOp = ITM_GREATER_EQ;
}
else
{
compOp = ITM_GREATER_EQ;
extraOp = ITM_LESS_EQ;
}
ItemExpr * beginPred = new(h) BiRelat(compOp,
kc,
loValues[i].getItemExpr(),
nullable,
TRUE);
// There are now two end predicates generated. The tri-relational predicate
// is the one that enforces the inclusive comparison for the last partition,
// and only exclusive comparison for the other partitions. Thus this
// accurately models way partitions are setup.
// The extraEndPred has been added (HL 6/20/2001) to form a convenient key predicate
// for access by a B-Tree index. Although this is always inclusive, and thus
// will select extra data (one extra uec) for all partitions except the last,
// the extra data will be filtered out by the tri-relational operator.
ItemExpr * extraEndPred = new(h) BiRelat(extraOp,
kc,
hiValues[i].getItemExpr(),
nullable,
TRUE);
// for all other key columns, add (ai > bi) or (ai = bi) and (tfm)
while (i--)
{
if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
ITM_INVERSE)
{
compOp = ITM_LESS;
extraOp = ITM_GREATER;
}
else
{
compOp = ITM_GREATER;
extraOp = ITM_LESS;
}
kc = keyColumnList_[i].getItemExpr();
nullable =
keyColumnList_[i].getType().supportsSQLnullLogical();
beginPred = new(h) BiLogic(
ITM_OR,
new(h) BiRelat(compOp,
kc,
loValues[i].getItemExpr(),
nullable,
TRUE),
new(h) BiLogic(ITM_AND,
new(h) BiRelat(
ITM_EQUAL,
kc,
loValues[i].getItemExpr(),
nullable,
TRUE),
beginPred));
extraEndPred = new(h) BiLogic(
ITM_OR,
new(h) BiRelat(extraOp,
kc,
hiValues[i].getItemExpr(),
nullable,
TRUE),
new(h) BiLogic(ITM_AND,
new(h) BiRelat(
ITM_EQUAL,
kc,
hiValues[i].getItemExpr(),
nullable,
TRUE),
extraEndPred));
}
// -----------------------------------------------------------------
// a less or less-equal predicate for the end key
// -----------------------------------------------------------------
// reset to the end of the key column list
i = keyColumnList_.entries()-1;
// Start with a tri-relational comparison for the last key
// column (may be the only key column), it's a LESS comparison
// if the exclusion indicator is not zero, or LESS_EQUAL otherwise.
// Note: replace "less" with "greater" for descending columns.
if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
ITM_INVERSE)
compOp = ITM_GREATER_OR_GE;
else
compOp = ITM_LESS_OR_LE;
kc = keyColumnList_[i].getItemExpr();
nullable =
keyColumnList_[i].getType().supportsSQLnullLogical();
TriRelational * tr = new(h) TriRelational(
compOp,
kc,
hiValues[i].getItemExpr(),
new(h) BiRelat(ITM_NOT_EQUAL,
new(h) SystemLiteral(0),
intervalExclusionIndicator));
tr->setSpecialNulls(nullable);
ItemExpr *endPred = tr;
// for all other key columns, add (ai < bi) or (ai = bi) and (tfm)
while (i--)
{
if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
ITM_INVERSE)
compOp = ITM_GREATER;
else
compOp = ITM_LESS;
kc = keyColumnList_[i].getItemExpr();
nullable =
keyColumnList_[i].getType().supportsSQLnullLogical();
endPred = new(h) BiLogic(
ITM_OR,
new(h) BiRelat(compOp,
kc,
hiValues[i].getItemExpr(),
nullable),
new(h) BiLogic(ITM_AND,
new(h) BiRelat(
ITM_EQUAL,
kc,
hiValues[i].getItemExpr(),
nullable),
endPred));
}
endPred->synthTypeAndValueId();
beginPred->synthTypeAndValueId();
extraEndPred->synthTypeAndValueId();
setOfKeyPredicates += beginPred->getValueId();
setOfKeyPredicates += endPred->getValueId();
setOfKeyPredicates += extraEndPred->getValueId();
// -- Store the set of key predicates and the partition input values
// in the partitioning attributes.
storePartitioningKeyPredicates(setOfKeyPredicates);
storePartitionInputValues(partInputValues);
}
} // RangePartitioningFunction::createPartitioningKeyPredicates()
// -----------------------------------------------------------------------
// RangePartitioningFunction::replacePivs()
// Replaces the stored partition input values and the stored
// partitioning key predicates with the ones supplied by the caller.
//
// newPivs         - the new partition input values
// newPartKeyPreds - the new partitioning key predicates
// -----------------------------------------------------------------------
void RangePartitioningFunction::replacePivs(
     const ValueIdList& newPivs,
     const ValueIdSet& newPartKeyPreds)
{
  // the old values are simply overwritten, pivs first
  storePartitionInputValues(newPivs);
  storePartitioningKeyPredicates(newPartKeyPreds);

} // RangePartitioningFunction::replacePivs()
// -----------------------------------------------------------------------
// RangePartitioningFunction::scaleNumberOfPartitions()
// Changes the number of partitions of this function.
//
// suggestedNewNumberOfPartitions - in:  the desired number of partitions
//                                  out: the value returned by the
//                                       partition boundaries object
//                                       (unchanged if 1 was requested)
// partGroupDist                  - passed on to the partition boundaries
//
// Returns a new single partition partitioning function (statement heap)
// if exactly one partition was requested; otherwise this object, after
// its boundaries have been scaled.
// -----------------------------------------------------------------------
PartitioningFunction * RangePartitioningFunction::scaleNumberOfPartitions(
     Lng32 &suggestedNewNumberOfPartitions,
     PartitionGroupingDistEnum partGroupDist)
{
  if (suggestedNewNumberOfPartitions != 1)
    {
      // A partitioning expression that was generated earlier may no
      // longer be correct after scaling, so throw it away.
      storeExpression(NULL);

      // the boundaries object does the real work
      suggestedNewNumberOfPartitions =
        partitionBoundaries_->scaleNumberOfPartitions(
             suggestedNewNumberOfPartitions,
             getNodeMap(),
             partGroupDist);

      return this;
    }

  // a single partition does not need range boundaries at all
  return new(CmpCommon::statementHeap())
    SinglePartitionPartitioningFunction();
}
// -----------------------------------------------------------------------
// RangePartitioningFunction::remapIt()
// -----------------------------------------------------------------------
void RangePartitioningFunction::remapIt
(const PartitioningFunction* opf,
ValueIdMap& map, NABoolean mapItUp)
{
PartitioningFunction::remapIt(opf, map,mapItUp);
// If we have arrived here, the original partitioning function (*opf)
// MUST be a RangePartitioningFunction().
CMPASSERT(opf->castToRangePartitioningFunction());
// Clear because rewrite insists on it being so.
keyColumnList_.clear();
if (mapItUp)
{
map.rewriteValueIdListUp(keyColumnList_,
opf->castToRangePartitioningFunction()->
keyColumnList_);
}
else
{
map.rewriteValueIdListDown(opf->castToRangePartitioningFunction()->
keyColumnList_,
keyColumnList_);
}
// Clear because rewrite insists on it being so.
orderOfKeyValues_.clear();
if (mapItUp)
{
map.rewriteValueIdListUp(orderOfKeyValues_,
opf->castToRangePartitioningFunction()->
orderOfKeyValues_);
}
else
{
map.rewriteValueIdListDown(opf->castToRangePartitioningFunction()->
orderOfKeyValues_,
orderOfKeyValues_);
}
// do NOT map the originalKeyColumnList_, that's why it's called ORIGINAL
} // RangePartitioningFunction::remapIt()
// -----------------------------------------------------------------------
// RangePartitioningFunction::createPartitioningFunctionForIndexDesc()
// Builds a new range partitioning function whose key is expressed in
// terms of the index columns of the given index descriptor.
//
// The partition boundaries and the node map of this function are
// copied; all new objects are allocated from idesc->wHeap(). The
// partitioning key predicates of the new function are created before
// it is returned.
// -----------------------------------------------------------------------
PartitioningFunction*
RangePartitioningFunction::createPartitioningFunctionForIndexDesc
                                             (IndexDesc *idesc) const
{
  const NAFileSet * fileSet = idesc->getNAFileSet();
  const NAColumnArray & allColumns = fileSet->getAllColumns();
  const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns();

  ValueIdSet  partitioningKey;
  ValueIdList partitioningKeyList;
  ValueIdList orderOfPartKeyValues;

  // -----------------------------------------------------------------
  // Translate each partitioning key column into an index column.
  // -----------------------------------------------------------------
  for (CollIndex keyPos = 0; keyPos < partKeyColumns.entries(); keyPos++)
    {
      // position of this column within the index (usually == keyPos)
      Lng32 ixColNumber = allColumns.index(partKeyColumns[keyPos]);

      // the index column's value id becomes part of the partitioning key
      ValueId keyValueId = idesc->getIndexColumns()[ixColNumber];
      partitioningKey += keyValueId;
      partitioningKeyList.insertAt(keyPos, keyValueId);

      // The order list gets the column itself for an ascending column
      // and the inverse of the column for a descending one.
      ValueId orderValueId = keyValueId;
      if (NOT partKeyColumns.isAscending(keyPos))
        {
          InverseOrder *invExpr = new(idesc->wHeap())
            InverseOrder(keyValueId.getItemExpr());
          invExpr->synthTypeAndValueId();
          orderValueId = invExpr->getValueId();
        }
      orderOfPartKeyValues.insert(orderValueId);
    }

  // -----------------------------------------------------------------
  // Make a private copy of the range partition boundaries.
  // -----------------------------------------------------------------
  RangePartitionBoundaries *partBounds =
    new(idesc->wHeap()) RangePartitionBoundaries
                          (*getRangePartitionBoundaries(),
                           idesc->wHeap());

  // ---------------------------------------------------------------------
  // Determine the minimum number of partitioning keys based on the
  // start key values that are specified. Columns for which no explicit
  // start key values are specified need not be part of the part key.
  // Adjust our bookkeeping if necessary. Remove this logic if it can
  // be moved to CATMAN.
  // ---------------------------------------------------------------------
  const CollIndex numPartKeyCols = partBounds->getOptimizedNumberOfPartKeys();
  const NABoolean tooManyKeyCols =
    (partitioningKeyList.entries() > numPartKeyCols);

  // chop off the trailing columns in both lists
  while (partitioningKeyList.entries() > numPartKeyCols)
    {
      partitioningKeyList.removeAt(numPartKeyCols);
      orderOfPartKeyValues.removeAt(numPartKeyCols);
    }

  // if something was removed, the set must be rebuilt from the list
  if (tooManyKeyCols)
    {
      partitioningKey.clear();
      partitioningKey.insertList(partitioningKeyList);
    }

  // -----------------------------------------------------------------
  // Sum up the encoded key lengths of the boundary key columns. There
  // are no fillers between the encoded key values, so adding the
  // lengths of the individual columns is sufficient.
  // -----------------------------------------------------------------
  Lng32 encodedBoundaryKeyLength = 0;
  for (CollIndex col = 0; col < orderOfPartKeyValues.entries(); col++)
    {
      encodedBoundaryKeyLength +=
        partitioningKeyList[col].getType().getEncodedKeyLength();
    }

  // -----------------------------------------------------------------
  // Add the first and the last boundary (0 and numberOfPartitions)
  // at the ends that do not separate two partitions
  // -----------------------------------------------------------------
  partBounds->completePartitionBoundaries(orderOfPartKeyValues,
                                          encodedBoundaryKeyLength);

  // -----------------------------------------------------------------
  // Put everything together in a new RangePartitioningFunction.
  // -----------------------------------------------------------------
  RangePartitioningFunction *newPartFunc =
    new(idesc->wHeap()) RangePartitioningFunction
                          (partitioningKey,
                           partitioningKeyList,
                           orderOfPartKeyValues,
                           partBounds,
                           getNodeMap()->copy(idesc->wHeap()));

  newPartFunc->createPartitioningKeyPredicates();

  return newPartFunc;
} // RangePartitioningFunction::createPartitioningFunctionForIndexDesc()
// -----------------------------------------------------------------------
// RangePartitioningFunction::createPartitioningExpression()
// Returns the partitioning expression of this function, creating and
// storing it on the first call and reusing it afterwards.
//
// The expression is a RangeLookup over the concatenated key encodings
// of all key columns. Each key column is first narrowed to the type of
// its original key column and then encoded, in descending order where
// the key order list holds an inverse expression.
// -----------------------------------------------------------------------
ItemExpr* RangePartitioningFunction::createPartitioningExpression()
{
  // reuse an expression that was built earlier
  if (getExpression())
    return getExpression();

  // concatenation of the key encodings built so far
  ItemExpr *concatenatedKey = NULL;

  for (CollIndex col = 0; col < keyColumnList_.entries(); col++)
    {
      const NABoolean isDescending =
        (orderOfKeyValues_[col].getItemExpr()->getOperatorType() == ITM_INVERSE);

      // Narrow needs a conversion error flag; create and remember one
      // if none has been stored yet.
      ItemExpr *convErrorFlag = getConvErrorExpr();
      if (convErrorFlag == NULL)
        {
          convErrorFlag =
            new (CmpCommon::statementHeap()) HostVar(
                 "_sys_repartConvErrorFlg",
                 new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), TRUE,FALSE),
                 TRUE);
          storeConvErrorExpr(convErrorFlag);
        }

      // bring the key column to the exact type of the original column
      const NAType &originalType = originalKeyColumnList_[col].getType();
      ItemExpr *narrowedKey = new (CmpCommon::statementHeap()) Narrow(
           keyColumnList_[col].getItemExpr(),
           convErrorFlag,
           &originalType);

      // key encoding of this column (a character string)
      ItemExpr *encodedKey =
        new (CmpCommon::statementHeap()) CompEncode (narrowedKey,isDescending);

      // append it to the encodings of the preceding columns
      if (concatenatedKey != NULL)
        concatenatedKey =
          new (CmpCommon::statementHeap()) Concat(concatenatedKey,encodedKey);
      else
        concatenatedKey = encodedKey;
    }

  // the lookup function finds the partition number for the encoded key
  // in the split ranges provided by this partitioning function
  ItemExpr *lookupExpr = new (CmpCommon::statementHeap()) RangeLookup(
       concatenatedKey,this);

  lookupExpr->synthTypeAndValueId();
  storeExpression(lookupExpr);

  return lookupExpr;
}
// -----------------------------------------------------------------------
// RangePartitioningFunction::shouldUseSynchronousAccess()
// Decides whether the partitions should be accessed synchronously
// (one after the other) rather than asynchronously.
//
// rpp          - required physical properties (sort key, arrangement,
//                logical partitioning requirement, ...)
// inputLogProp - input logical properties, used for the cardinality
// ga           - group attributes (characteristic inputs, record length)
//
// Returns TRUE if synchronous access should be used.
// -----------------------------------------------------------------------
NABoolean RangePartitioningFunction::shouldUseSynchronousAccess(
     const ReqdPhysicalProperty* rpp,
     const EstLogPropSharedPtr& inputLogProp,
     GroupAttributes* ga) const
{
  // Work on the partitioning key without the columns that are covered
  // by constants or input values.
  ValueIdList partKey = getOrderOfKeyValues();
  partKey.removeCoveredExprs(ga->getCharacteristicInputs());

  const LogicalPartitioningRequirement *lpr =
    rpp->getLogicalPartRequirement();

  // No synchronous access if the user forces a PAPA node without
  // forcing the number of PAs.
  if ((lpr != NULL) AND lpr->getMustUsePapa() AND
      (lpr->getNumClientsReq() == ANY_NUMBER_OF_PARTITIONS))
    return FALSE;

  // Is the user trying to force synchronous access?
  const NABoolean synchronousAccessForced =
    ((CmpCommon::getDefault(ATTEMPT_ASYNCHRONOUS_ACCESS) == DF_OFF) OR
     ((lpr != NULL) AND
      (lpr->getNumClientsReq() != ANY_NUMBER_OF_PARTITIONS) AND
      (lpr->getNumClientsReq() < getCountOfPartitions())));

  NABoolean useSynchronousAccess = synchronousAccessForced;

  // Synchronous access is a good idea and is ok if there is a required
  // order and/or arrangement that came from an operator that is not in
  // DP2, the required order and/or arrangement columns are a leading
  // prefix of the partitioning key columns, and enough data is being
  // returned that the PA buffers will fill up, so that the PA operators
  // block until all data from all preceding partitions (in the required
  // order) has been returned. Whether the required order or arrangement
  // is a leading prefix of the clustering key columns is checked later
  // (by the satisfied method).
  // NOTE: synchronous access cannot be forced if there is a required
  // logical order and/or arrangement! The decision made here overrides
  // whatever the user tried to force.
  if (rpp->getLogicalOrderOrArrangementFlag())
    {
      if ((rpp->getSortKey() != NULL) AND
          (rpp->getArrangedCols() != NULL))
        {
          // both a required order and a required arrangement
          useSynchronousAccess =
            ((partKey.satisfiesReqdOrder(*rpp->getSortKey()) == SAME_ORDER) AND
             partKey.satisfiesReqdArrangement(*rpp->getArrangedCols()));
        }
      else if (rpp->getSortKey() != NULL)
        {
          // only a required order; the inverse order is acceptable as
          // well if ATTEMPT_REVERSE_SYNCHRONOUS_ORDER is ON
          const OrderComparison oc =
            partKey.satisfiesReqdOrder(*rpp->getSortKey());
          const NABoolean reverseOrderAllowed =
            (CmpCommon::getDefault(ATTEMPT_REVERSE_SYNCHRONOUS_ORDER) == DF_ON);

          useSynchronousAccess =
            ((oc == SAME_ORDER) OR
             (reverseOrderAllowed AND (oc == INVERSE_ORDER)));
        }
      else if (rpp->getArrangedCols() != NULL)
        {
          // only a required arrangement
          useSynchronousAccess =
            partKey.satisfiesReqdArrangement(*rpp->getArrangedCols())
              ? TRUE : FALSE;
        }
    }

  // The remaining checks can only turn an unforced "yes" into a "no".
  if (NOT useSynchronousAccess OR synchronousAccessForced)
    return useSynchronousAccess;

  // A dp2 sort order partitioning requirement means that a DP2 sort
  // order is wanted, and the main reason for requiring one is to avoid
  // having to access the data synchronously.
  if (rpp->getDp2SortOrderPartReq() != NULL)
    return FALSE;

  // There is a required order or arrangement that could potentially
  // need synchronous access. It is not forced, so find out whether it
  // is a good idea.

  // maximum number of PAs that can be allowed per process
  const Int32 maxPAsPerProcess =
    (Int32) CmpCommon::getDefaultNumeric(MAX_ACCESS_NODES_PER_ESP);
  const Lng32 numPartitions = getCountOfPartitions();

  NABoolean tooManyPAs = FALSE;

  if (numPartitions > maxPAsPerProcess)
    {
      // Fetch the logical partitioning requirement, if there is one,
      // and its partitioning function if it is fully specified.
      PartitioningRequirement *logPartReq = NULL;
      PartitioningFunction *requiredPartFunc = NULL;

      if (lpr != NULL)
        {
          logPartReq = lpr->getLogReq();
          if (logPartReq AND logPartReq->
              castToFullySpecifiedPartitioningRequirement())
            requiredPartFunc = logPartReq->
              castToFullySpecifiedPartitioningRequirement()->
              getPartitioningFunction();
        }

      // Could the maximum number of PAs allowed to a process be
      // exceeded? If so, synchronous access is definitely wanted when
      // there is an order to preserve.
      // The first check is the most restrictive one, it is for logical
      // subpartitioning: there it is possible that one process has to
      // access all the partitions, and nothing can be done about that
      // later.
      if (logPartReq != NULL AND
          requiredPartFunc AND
          requiredPartFunc->castToLogPhysPartitioningFunction() AND
          requiredPartFunc->castToLogPhysPartitioningFunction()->
            getLogPartType() ==
              LogPhysPartitioningFunction::LOGICAL_SUBPARTITIONING)
        tooManyPAs = TRUE;
      else if ((logPartReq != NULL) AND
               (numPartitions >
                (logPartReq->getCountOfPartitions() * maxPAsPerProcess)))
        tooManyPAs = TRUE;
      else if ((numPartitions >
                (rpp->getCountOfPipelines() * maxPAsPerProcess)))
        tooManyPAs = TRUE;
    }

  // with too many PAs the answer stays "yes"
  if (tooManyPAs)
    return useSynchronousAccess;

  // Look at the amount of data that will be returned. If it is less
  // than twice the amount of data that it takes to fill up all the PA
  // buffers, the majority of the data can be accessed asynchronously
  // before the buffers fill up, so synchronous access is assumed not to
  // be a good idea. Otherwise most of the data would be accessed after
  // the PA buffers have filled up and blocked, i.e. synchronously
  // anyway, and then it is better not to pay for all those PA buffers.

  // Size of the data returned: cardinality times the record length,
  // with 16 bytes per record added for overhead.
  CostScalar totalsize =
    (ga->outputLogProp(inputLogProp)->getResultCardinality() *
     (ga->getRecordLength() + 16.0));

  // Size of all the PA buffers.
  double buffersize = CmpCommon::getDefaultNumeric(GEN_PA_BUFFER_SIZE);

  // A PA buffer can not be larger than 56K. If the table has remote
  // partitions, then the actual limit is 31K, so we really should
  // check for remote partitions. That would require the index
  // descriptor, which we don't have.
  buffersize = MINOF (buffersize, 56000);

  // The number of PA output buffers is always at least one less than
  // the total number of PA buffers, as at least one is reserved for
  // input.
  double numOutputBuffers =
    MAXOF(CmpCommon::getDefaultNumeric(GEN_PA_NUM_BUFFERS) - 1,1);

  // Subtract 2000 bytes per buffer for headers.
  buffersize = (buffersize * numOutputBuffers) -
               (2000 * numOutputBuffers);

  if (totalsize < (numPartitions * 2.0 * buffersize))
    useSynchronousAccess = FALSE;

  return useSynchronousAccess;
}
// -----------------------------------------------------------------------
// RangePartitioningFunction::createSearchKey()
// Creates a search key on the partitioning key of the given index.
//
// indexDesc       - index whose partitioning key the search key is on
// availInputs     - available input values; the partition input values
//                   of this function are added to them
// additionalPreds - predicates used in addition to the partitioning
//                   key predicates of this function
//
// The search key is allocated from the statement heap.
// -----------------------------------------------------------------------
SearchKey *
RangePartitioningFunction::createSearchKey(const IndexDesc *indexDesc,
                                           ValueIdSet availInputs,
                                           ValueIdSet additionalPreds) const
{
  // no non-key columns are handed to the search key
  ValueIdSet emptyNonKeyColumns;

  // partitioning key predicates plus whatever the caller supplied
  ValueIdSet searchPreds(getPartitioningKeyPredicates());
  searchPreds += additionalPreds;

  // the partition input values count as available inputs, too
  availInputs += getPartitionInputValues();

  SearchKey *result =
    new (CmpCommon::statementHeap()) SearchKey(
         indexDesc->getPartitioningKey(),
         indexDesc->getOrderOfPartitioningKeyValues(),
         availInputs,
         TRUE,               // a partKey has no real scan direction
         searchPreds,
         this,
         emptyNonKeyColumns,
         indexDesc);

  return result;
}
/////////////////////////////////////////////////////////////
// Test condition C2.2 (co-partitioned), which is defined as:
//  1. All tables share the same number of partitioned key columns;
//  2. Corresponding partition key columns across all tables are
//     of same type;
//  3. All tables should be partitioned into the same number of ranges;
//  4. Corresponding partitions are bounded by identical upper and
//     lower boundaries and reside on the same DP2.
//
// Returns TRUE if "x" is a range partitioning function that passes
// the checks below (key count, partition count, compatible key column
// types, equal boundaries), FALSE otherwise.
/////////////////////////////////////////////////////////////
NABoolean
RangePartitioningFunction::partFuncAndFuncPushDownCompatible(
     const PartitioningFunction& x) const
{
  const RangePartitioningFunction* otherRange =
    x.castToRangePartitioningFunction();

  // only two range partitioning functions can be compatible
  if (otherRange == NULL)
    return FALSE;

  // the number of key columns must match
  const Lng32 myKeyCount    = getPartitioningKey().entries();
  const Lng32 otherKeyCount = otherRange->getPartitioningKey().entries();
  if (myKeyCount != otherKeyCount)
    return FALSE;

  // the number of partitions must match
  if (getCountOfPartitions() != otherRange->getCountOfPartitions())
    return FALSE;

  // corresponding key columns must be of compatible types
  for (Lng32 keyCol = 0; keyCol < myKeyCount; keyCol++)
    {
      const NAType &otherType =
        otherRange->getKeyColumnList()[keyCol].getType();

      if (NOT getKeyColumnList()[keyCol].getType().isCompatible(otherType))
        return FALSE;
    }

  // finally the boundaries themselves must compare equal
  if (NOT getRangePartitionBoundaries()->compareRangePartitionBoundaries(
           *(otherRange->getRangePartitionBoundaries())))
    return FALSE;

  return TRUE;
}
// -----------------------------------------------------------------------
// RangePartitioningFunction::preCodeGen()
// Rewrite the partitioning keys of the partitioning function that
// are expressed using VEGReferences in terms of the available values.
// -----------------------------------------------------------------------
void RangePartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
ValueIdSet noExternalInputs;
PartitioningFunction::preCodeGen(availableValues);
keyColumnList_.replaceVEGExpressions(availableValues, noExternalInputs,
FALSE, NULL, TRUE);
} // RangePartitioningFunction::preCodeGen()
void RangePartitioningFunction::setupForStatement()
{
if(setupForStatement_)
return;
PartitioningFunction::setupForStatement();
if(partitionBoundaries_)
partitionBoundaries_->setupForStatement(TRUE /* use string */);
setupForStatement_ = TRUE;
resetAfterStatement_ = FALSE;
}
void RangePartitioningFunction::resetAfterStatement()
{
if(resetAfterStatement_)
return;
PartitioningFunction::resetAfterStatement();
keyColumnList_.clear();
orderOfKeyValues_.clear();
originalKeyColumnList_.clear();
if(partitionBoundaries_)
partitionBoundaries_->resetAfterStatement();
setupForStatement_ = FALSE;
resetAfterStatement_ = TRUE;
}
// -----------------------------------------------------------------------
// Methods for debugging.
// -----------------------------------------------------------------------
// -----------------------------------------------------------------------
// RangePartitioningFunction::getText()
// Returns a textual description of this partitioning function:
//
//   range partitioned <n> ways on (<key order>) with boundaries(...)
//
// The "with boundaries" part is only produced if there is a partition
// boundaries object. Within it, the values of one boundary are
// separated by blanks and the boundaries by ';'. A constant value is
// shown as c(<text>), any other expression as nc(<text>). A boundary
// without a value list contributes no values.
// -----------------------------------------------------------------------
const NAString RangePartitioningFunction::getText() const
{
  NAString result("range partitioned ", CmpCommon::statementHeap());

  char nparts[20];
  snprintf(nparts, sizeof(nparts), "%d", getCountOfPartitions());

  result += nparts;
  result += " ways on (";

  orderOfKeyValues_.unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  result += ")";

  if ( partitionBoundaries_ ) {

    result += " with boundaries(";
    for (Int32 index = 0; index < partitionBoundaries_->getCountOfPartitions(); index++)
    {
      const ItemExprList* iel = partitionBoundaries_->getBoundaryValues(index);

      // A boundary may come without a value list (the print method of
      // the boundaries reports such an entry as "is empty"), so do not
      // dereference the list unless it exists.
      if ( iel != NULL ) {
        for (CollIndex j=0; j<iel->entries(); j++) {
          ItemExpr* ie = (*iel)[j];

          if ( ie->getOperatorType() == ITM_CONSTANT ) {
            ConstValue* cv = (ConstValue*)ie;
            result += "c(";
            result += cv->getTextForQuery(QUERY_FORMAT);
            result += ")";
          } else {
            result += "nc(";
            result += ie->getText();
            result += ")";
          }
          result += " ";
        }
      }

      // separator between boundaries, none after the last one
      if ( index < partitionBoundaries_->getCountOfPartitions()-1 )
        result += ";";
    }
    result += ")";

    /* enable this for debugging of binary key problems
    result += " binary (";
    Lng32 encodedBoundaryKeyLength = partitionBoundaries_->getEncodedBoundaryKeyLength();
    char hexDigits[4];
    for (Int32 index2 = 0; index2 < partitionBoundaries_->getCountOfPartitions(); index2++)
      {
        const char * binaryVal = partitionBoundaries_->getBinaryBoundaryValue(index2);
        if (binaryVal)
          {
            if (index2 > 0)
              result += ", ";
            result += "b(0x";
            for (int b=0; b<encodedBoundaryKeyLength; b++)
              {
                snprintf(hexDigits, sizeof(hexDigits), "%02hhx", binaryVal[b]);
                result += hexDigits;
              }
            result += ")";
          }
      }
    result += ")";
    end of code for binary keys */
  }

  return result;
}
// Debug print of this range partitioning function.
//
// ofd    - output stream to write to
// indent - indentation string forwarded to the base class and to the
//          boundaries object
// title  - forwarded to the boundaries object only; the base class is
//          always printed under the fixed title "RangePartitioningFunction"
//
// After the base class output, one "A " (ascending) or "D " (descending,
// i.e. the key expression is an ITM_INVERSE) marker is printed per key
// column, followed by the partition boundaries if there are any.
void RangePartitioningFunction::print(FILE* ofd, const char* indent,
                                      const char* title) const
{
  PartitioningFunction::print(ofd, indent, "RangePartitioningFunction");

  if (orderOfKeyValues_.entries() > 0)
    {
      fprintf(ofd,"Ascending/descending order :\n");
      for (CollIndex index = 0; index < orderOfKeyValues_.entries(); index++)
        {
          if (orderOfKeyValues_[index].getItemExpr()->getOperatorType()
              == ITM_INVERSE)
            fprintf(ofd,"D ");
          else
            fprintf(ofd,"A ");
          // line break after every index that is a multiple of 9 and
          // after the last key column
          if (((index/9)* 9 == index) OR (index == orderOfKeyValues_.entries()-1))
            fprintf(ofd,"\n");
        }
    }

  // The boundaries pointer is treated as optional elsewhere in this class
  // (getText() and the statement reset code both test it), so do not
  // dereference it unconditionally here.
  if (partitionBoundaries_)
    partitionBoundaries_->print(ofd, indent, title);
} // RangePartitioningFunction::print()
// Byte-wise range test on encoded keys of length keyLen.
// Returns TRUE if low <= key < high. When checkLast is TRUE the upper
// bound is inclusive, i.e. low <= key <= high is accepted as well.
NABoolean
compareEncodedKey(const char* low, const char* key, const char* high, Int32 keyLen, NABoolean checkLast)
{
  const Int32 lowVsKey  = memcmp(low, key, keyLen);
  const Int32 keyVsHigh = memcmp(key, high, keyLen);

  // key sorts before the lower bound: never in range
  if ( lowVsKey > 0 )
    return FALSE;

  // strictly below the upper bound: always in range
  if ( keyVsHigh < 0 )
    return TRUE;

  // only an exact match on the upper bound remains, and that is
  // acceptable solely when the caller asked for an inclusive check
  return (checkLast && keyVsHigh == 0);
}
// Range test on NUL-terminated keys using strverscmp() ordering; the
// length argument is ignored.
// Returns TRUE if low <= key < high. When checkLast is TRUE the upper
// bound is inclusive, i.e. low <= key <= high is accepted as well.
NABoolean
compareAsciiKey(const char* low, const char* key, const char* high, Int32, NABoolean checkLast)
{
  const Int32 lowVsKey  = strverscmp(low, key);
  const Int32 keyVsHigh = strverscmp(key, high);

  // key sorts before the lower bound: never in range
  if ( lowVsKey > 0 )
    return FALSE;

  // strictly below the upper bound: always in range
  if ( keyVsHigh < 0 )
    return TRUE;

  // only an exact match on the upper bound remains, and that is
  // acceptable solely when the caller asked for an inclusive check
  return (checkLast && keyVsHigh == 0);
}
// Scan the boundary pairs (boundary[i], boundary[i+1]) in ascending order
// and return the index i of the first pair for which compFunc reports
// that encodedKey lies inside the pair. For the last pair compFunc is
// asked to treat the upper bound as inclusive. Returns -1 if no pair
// matches. Note that no key-length validation is performed here; keyLen
// is simply handed to compFunc.
Int32 RangePartitionBoundaries::findBeginBoundary(char* encodedKey, Int32 keyLen,
                                                  compFuncPtrT compFunc) const
{
  // boundaries are stored in entries in the range [0, partitionCount_]
  Lng32 part = 0;
  while ( part <= partitionCount_-1 )
    {
      const char* lowerBound = getBinaryBoundaryValue(part);
      const char* upperBound = getBinaryBoundaryValue(part+1);
      const NABoolean isLastPart = (part == partitionCount_-1);

      if ( (*compFunc)(lowerBound, encodedKey, upperBound, keyLen, isLastPart) )
        return part;

      part++;
    }

  return -1;
}
// Scan the boundary pairs (boundary[i], boundary[i+1]) in descending
// order and return the index i of the first pair (i.e. the one with the
// largest i) for which compFunc reports that encodedKey lies inside the
// pair. For the last pair compFunc is asked to treat the upper bound as
// inclusive. Returns -1 if no pair matches. Note that no key-length
// validation is performed here; keyLen is simply handed to compFunc.
Int32 RangePartitionBoundaries::findEndBoundary(char* encodedKey, Int32 keyLen,
                                                compFuncPtrT compFunc) const
{
  // boundaries are stored in entries in the range [0, partitionCount_]
  Lng32 part = partitionCount_-1;
  while ( part >= 0 )
    {
      const char* lowerBound = getBinaryBoundaryValue(part);
      const char* upperBound = getBinaryBoundaryValue(part+1);
      const NABoolean isLastPart = (part == partitionCount_-1);

      if ( (*compFunc)(lowerBound, encodedKey, upperBound, keyLen, isLastPart) )
        return part;

      part--;
    }

  return -1;
}
// Compute how many partitions remain once the partitions in front of the
// search key's begin key and the partitions behind its end key have been
// discounted.
//
// The full partition count is returned unchanged when
//  - the table is a Hive table,
//  - a key could be encoded but no matching boundary pair was found, or
//  - the end key maps to a partition in front of the begin partition.
// A key that cannot be encoded simply does not narrow the range.
// Native HBase (cell or row format) tables are compared with
// compareAsciiKey, all other tables with compareEncodedKey.
Int32
RangePartitioningFunction::computeNumOfActivePartitions(SearchKey* skey, const TableDesc* tDesc) const
{
  const RangePartitionBoundaries* rangeBounds = getRangePartitionBoundaries();
  const Int32 totalParts = getCountOfPartitions();

  const NATable* table = tDesc->getNATable();
  if ( table->isHiveTable() )
    return totalParts;

  compFuncPtrT keyCompare = compareEncodedKey;
  if ( table->isHbaseCellTable() || table->isHbaseRowTable() )
    keyCompare = compareAsciiKey;

  char* keyBuf = NULL;
  Int32 keyLen = 0;
  Int32 activeParts = totalParts;
  Int32 firstPart = 0;   // 0 based index of the partition holding the begin key

  const ValueIdList& beginKey = skey->getBeginKeyValues();
  if ( beginKey.computeEncodedKey(tDesc, FALSE, keyBuf, keyLen) )
    {
      firstPart = rangeBounds->findBeginBoundary(keyBuf, keyLen, keyCompare);
      if ( firstPart < 0 )
        return totalParts;   // could not decide the begin partition

      // everything in front of the begin partition is inactive
      activeParts -= firstPart;
    }

  const ValueIdList& endKey = skey->getEndKeyValues();
  if ( endKey.computeEncodedKey(tDesc, TRUE, keyBuf, keyLen) )
    {
      // 0 based index of the partition holding the end key
      const Int32 lastPart = rangeBounds->findEndBoundary(keyBuf, keyLen, keyCompare);

      // either the end partition could not be decided, or it precedes
      // the begin partition
      if ( lastPart < 0 || lastPart < firstPart )
        return totalParts;

      // everything behind the end partition is inactive
      activeParts -= (getCountOfPartitions() - lastPart - 1);
    }

  return activeParts;
}
// ***********************************************************************
// LogPhysPartitioningFunction
// ***********************************************************************
// Construct a logphys partitioning function from a logical and a physical
// partitioning function. The "real" partitioning function starts out
// unset (NULL). The partitioning key of the new object is the union of
// the partitioning keys of the logical and the physical function.
LogPhysPartitioningFunction::LogPhysPartitioningFunction(
     PartitioningFunction * logPartFunc,
     PartitioningFunction * physPartFunc,
     logPartType logPartType,
     Lng32 numOfClients,
     NABoolean usePapa,
     NABoolean synchronousAccess,
     NAMemory* heap)
  : PartitioningFunction(LOGPHYS_PARTITIONING_FUNCTION,heap),
    logPartFunc_(logPartFunc),
    physPartFunc_(physPartFunc),
    realPartFunc_(NULL),
    logPartType_(logPartType),
    numOfClients_(numOfClients),
    usePapa_(usePapa),
    synchronousAccess_(synchronousAccess)
{
  // start from the logical key and add the physical key columns to it
  ValueIdSet combinedKey(logPartFunc->getPartitioningKey());
  combinedKey += physPartFunc->getPartitioningKey();

  setPartKey(combinedKey);
}
// Destructor. The body is intentionally empty: nothing is released here.
LogPhysPartitioningFunction::~LogPhysPartitioningFunction()
{}
// Type-safe downcast helper: this object is a LogPhysPartitioningFunction,
// so it returns a pointer to itself.
const LogPhysPartitioningFunction *
LogPhysPartitioningFunction::castToLogPhysPartitioningFunction() const
{
return this;
}
// The partition count of a logphys partitioning function is the partition
// count of its physical partitioning function; the logical function is
// not consulted here.
Lng32 LogPhysPartitioningFunction::getCountOfPartitions() const
{
return physPartFunc_->getCountOfPartitions();
}
// Produce a partitioning requirement by delegating to the physical
// partitioning function, which must be set (asserted).
PartitioningRequirement*
LogPhysPartitioningFunction::makePartitioningRequirement()
{
CMPASSERT(physPartFunc_);
return physPartFunc_ -> makePartitioningRequirement();
}
// -----------------------------------------------------------------------
// LogPhysPartitioningFunction::getNodeMap()
// Return the node map of the "real" partitioning function if one has
// been set; otherwise return the node map of the underlying physical
// partitioning function.
// -----------------------------------------------------------------------
const NodeMap*
LogPhysPartitioningFunction::getNodeMap() const
{
  if (realPartFunc_ != NULL)
    return realPartFunc_->getNodeMap();

  // no "real" partitioning function: fall back to the physical one
  return physPartFunc_->getNodeMap();
} // LogPhysPartitioningFunction::getNodeMap()
// Get any existing (logical or physical) node map, or synthesize one,
// that matches logPartFunc_'s partition count requirement.
//
// forESP - forwarded to NodeMap::synthesizeLogicalMap() when a new map
//          has to be synthesized
//
// Lookup order:
//  1. the logical partitioning function's node map, if its entry count
//     equals the logical partition count;
//  2. the physical partitioning function's node map, under the same
//     condition;
//  3. a logical map synthesized from the physical node map.
// If the logical function is a replicate-no-broadcast function, every
// partition of the chosen map is additionally marked ACTIVE.
NodeMap* LogPhysPartitioningFunction::getOrMakeSuitableNodeMap
(NABoolean forESP) const
{
  // we are the LogPhysPartitioningFunction of the synthesized physical
  // property of an Exchange's child (that executes in DP2).
  const ULng32 partCnt = (ULng32)logPartFunc_->getCountOfPartitions();

  NodeMap *nodemap = (NodeMap*)logPartFunc_->getNodeMap();
  if (!(nodemap && nodemap->getNumEntries() == partCnt)) {
    // The logical partitioning function has no suitable node map. Try to
    // find one in the physical partitioning function of the child.
    // (This can happen in "select * from t035t6;" and other queries against
    // vertically partitioned tables like t035t6, see regress/core/test035.)
    nodemap = (NodeMap*)physPartFunc_->getNodeMap();
    if (!(nodemap && nodemap->getNumEntries() == partCnt)) {
      // Only as a last resort do we synthesize a new node map. This branch
      // is also reached when the physical function has no node map at all;
      // synthesizeLogicalMap() is a member call, so fail with an assertion
      // instead of calling through a NULL pointer.
      CMPASSERT(nodemap != NULL);
      nodemap = nodemap->synthesizeLogicalMap(partCnt, forESP);
    }
  }

  // Guard the dereference: nothing visible here guarantees that the
  // synthesized map is non-NULL.
  if (nodemap &&
      logPartFunc_->castToReplicateNoBroadcastPartitioningFunction()) {
    for (CollIndex i = 0; i < partCnt; i++) {
      nodemap->setPartitionState(i, NodeMapEntry::ACTIVE);
    }
  }
  return nodemap;
}
// Return a duplicate of this object, copy-constructed from *this and
// allocated on the statement heap.
PartitioningFunction* LogPhysPartitioningFunction::copy() const
{
return new(CmpCommon::statementHeap()) LogPhysPartitioningFunction(*this);
}
// Tell whether partitioning key predicates can be produced for this
// logphys partitioning function.
//
// This is an exception to the rule: for a LogPhysPartitioningFunction
// we assume that the physical partitioning key predicates need not be
// enforced, since each DP2 partition has only its own data anyway.
// The answer is therefore TRUE when the logical partitioning function
// can produce such predicates, and also for the PA_PARTITION_GROUPING
// and PA_GROUPED_REPARTITIONING logical partitioning types.
NABoolean
LogPhysPartitioningFunction::canProducePartitioningKeyPredicates() const
{
  // the logical function is always asked first
  if (logPartFunc_->canProducePartitioningKeyPredicates())
    return TRUE;

  switch (logPartType_)
    {
    case PA_PARTITION_GROUPING:
    case PA_GROUPED_REPARTITIONING:
      return TRUE;

    default:
      return FALSE;
    }
}
// Adopt the partitioning key predicates and the partition input value
// layout of the logical partitioning function as our own.
// The logical function must be able to produce partitioning key
// predicates (asserted).
// NOTE(review): canProducePartitioningKeyPredicates() of this class also
// answers TRUE for the PA_PARTITION_GROUPING / PA_GROUPED_REPARTITIONING
// types regardless of the logical function, so this assertion is stricter
// than that predicate -- confirm that this is intended.
void LogPhysPartitioningFunction::createPartitioningKeyPredicates()
{
CMPASSERT(logPartFunc_->canProducePartitioningKeyPredicates());
// see comment in canProducePartitioningKeyPredicates() above
storePartitioningKeyPredicates(logPartFunc_->getPartitioningKeyPredicates());
storePartitionInputValues(logPartFunc_->getPartitionInputValuesLayout());
}
// -----------------------------------------------------------------------
// LogPhysPartitioningFunction::replacePivs()
// Store a new set of partition input values and partitioning key
// predicates in this object and forward the same replacement to the
// logical partitioning function. The physical partitioning function is
// not touched.
// -----------------------------------------------------------------------
void LogPhysPartitioningFunction::replacePivs(
const ValueIdList& newPivs,
const ValueIdSet& newPartKeyPreds)
{
// Overwrite the old pivs and part key preds with the new ones.
storePartitionInputValues(newPivs);
storePartitioningKeyPredicates(newPartKeyPreds);
// Also overwrite them in the logical partitioning function.
logPartFunc_->replacePivs(newPivs,newPartKeyPreds);
} // LogPhysPartitioningFunction::replacePivs()
// A logphys partitioning function has no partitioning expression: this
// method asserts unconditionally and returns NULL.
ItemExpr* LogPhysPartitioningFunction::createPartitioningExpression()
{
CMPASSERT(0); // should never reach here, it's not legal to do a
// repartitioning for a logphys part func
return NULL;
}
// Create (once) and return the "real" partitioning function, caching it
// in realPartFunc_. A later call returns the cached value if it is
// non-NULL.
//
// - PA_PARTITION_GROUPING / PA_GROUPED_REPARTITIONING: the real function
//   is the physical partitioning function itself.
// - LOGICAL_SUBPARTITIONING:
//     * both functions are range partitioned on the same ordered key:
//       the real function is a new RangePartitioningFunction built from
//       the merged boundaries and a merged node map (only if the merge
//       produced boundaries);
//     * otherwise, if the physical function is a single partition: the
//       real function is the logical function, which gets a new node map
//       consisting of one copy of physical node map entry 0 per logical
//       partition.
// - any other type: nothing is created.
//
// Returns NULL if no real partitioning function could be formed.
PartitioningFunction*
LogPhysPartitioningFunction::createRealPartitioningFunction()
{
if (realPartFunc_)
return realPartFunc_;
switch (logPartType_)
{
case PA_PARTITION_GROUPING:
case PA_GROUPED_REPARTITIONING:
// for partition grouping or repartitioning we never split any
// physical partitions and we are processing only physical
// partitions at this point (logical partitions come into the
// picture once we reach the DP2 exchange)
realPartFunc_ = physPartFunc_;
break;
case LOGICAL_SUBPARTITIONING:
{
// Merge the split ranges of logical and physical partitioning
// functions, so we can count the exact number of partitions.
const RangePartitioningFunction *lpf =
logPartFunc_->castToRangePartitioningFunction();
const RangePartitioningFunction *ppf =
physPartFunc_->castToRangePartitioningFunction();
// Log and phys partitioning functions must have the same key
// to be able to produce a common function
if (lpf AND ppf AND
lpf->getOrderOfKeyValues() == ppf->getOrderOfKeyValues())
{
// Make a new RangePartitioningFunction object with the combined
// split boundaries of the two. Also produce a corresponding
// node map for the new boundaries.
NodeMap *mergedMap = new (CmpCommon::statementHeap())
NodeMap(CmpCommon::statementHeap());
RangePartitionBoundaries *rb =
ppf->getRangePartitionBoundaries()->merge(
*lpf->getRangePartitionBoundaries(),
*ppf->getNodeMap(),
*mergedMap);
// return the merged partitioning function if we could merge;
// if merge() returned NULL, realPartFunc_ stays NULL
if (rb)
realPartFunc_ = new(CmpCommon::statementHeap())
RangePartitioningFunction(
ppf->getPartitioningKey(),
ppf->getKeyColumnList(),
ppf->getOrderOfKeyValues(),
rb,
mergedMap );
}
else
{
// Special case of single partition physical partitioning function.
if (physPartFunc_->castToSinglePartitionPartitioningFunction())
{
// realPartFunc_ now aliases logPartFunc_, so the replaceNodeMap()
// call below also changes the node map of the logical function.
realPartFunc_ = logPartFunc_;
// Create a new node map.
NodeMap *nodeMap = new(CmpCommon::statementHeap())
NodeMap(CmpCommon::statementHeap());
// Add a copy of the single physical node map entry for
// each partition in the logical node map.
// NOTE(review): entry is dereferenced below without a NULL check;
// presumably a single-partition function always has entry 0 -- confirm.
const NodeMapEntry *entry = physPartFunc_->getNodeMapEntry(0);
for (CollIndex nodeIdx = 0;
nodeIdx < (CollIndex) logPartFunc_->getCountOfPartitions();
nodeIdx++)
{
nodeMap->setNodeMapEntry(nodeIdx,
*entry,
CmpCommon::statementHeap());
}
realPartFunc_->replaceNodeMap(nodeMap);
}
}
}
break;
default:
break;
}
return realPartFunc_; // is NULL if we aren't smart enough to do this
}
// Compare this logphys partitioning function with another partitioning
// function. The result is SAME only if the base class comparison says
// SAME, all scalar attributes (logical partitioning type, number of
// clients, PAPA usage, synchronous access) agree, and both the logical
// and the physical partitioning functions compare SAME pairwise.
// Every other outcome is reported as INCOMPATIBLE.
COMPARE_RESULT
LogPhysPartitioningFunction::comparePartFuncToFunc
(const PartitioningFunction &other) const
{
  if (PartitioningFunction::comparePartFuncToFunc(other) != SAME)
    return INCOMPATIBLE;

  const LogPhysPartitioningFunction &rhs =
    (const LogPhysPartitioningFunction &) other;

  // cheap scalar attributes first
  const NABoolean sameAttributes =
    (getLogPartType() == rhs.getLogPartType()) &&
    (getNumOfClients() == rhs.getNumOfClients()) &&
    (getUsePapa() == rhs.getUsePapa()) &&
    (getSynchronousAccess() == rhs.getSynchronousAccess());
  if (!sameAttributes)
    return INCOMPATIBLE;

  // then the two nested partitioning functions, logical before physical
  const COMPARE_RESULT logResult =
    getLogPartitioningFunction()->comparePartFuncToFunc(
         *rhs.getLogPartitioningFunction());
  if (logResult != SAME)
    return INCOMPATIBLE;

  const COMPARE_RESULT physResult =
    getPhysPartitioningFunction()->comparePartFuncToFunc(
         *rhs.getPhysPartitioningFunction());
  if (physResult != SAME)
    return INCOMPATIBLE;

  return SAME;
}
// Tell whether this function is a grouping of "other".
// *maxPartsPerGroup (if supplied) is preset to 1; it may be overwritten
// by the "real" partitioning function if that one gets consulted.
NABoolean LogPhysPartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  // Trivial case: two functions that are equal.
  if (comparePartFuncToFunc(other) == SAME)
    return TRUE;

  // The only other way to check for a grouping is to use the "real"
  // partitioning function, provided it has been formed already.
  if (!realPartFunc_)
    return FALSE;

  return (realPartFunc_->isAGroupingOf(other,maxPartsPerGroup) ? TRUE : FALSE);
}
// -----------------------------------------------------------------------
// LogPhysPartitioningFunction::remapIt()
// Remap this object through the given value id map (via the base class)
// and replace both the logical and the physical partitioning function
// with remapped copies. The "real" partitioning function is not touched
// here.
// -----------------------------------------------------------------------
void LogPhysPartitioningFunction::remapIt
(const PartitioningFunction* opf,
ValueIdMap& map, NABoolean mapItUp)
{
// Currently, the only consumer of this method is
// MapValueIds::synthPhysicalProperty, when it executes in DP2
// Map myself
PartitioningFunction::remapIt(opf,map,mapItUp);
// Map the logical function
logPartFunc_ = logPartFunc_->copyAndRemap(map,mapItUp);
// Map the physical function
physPartFunc_ = physPartFunc_->copyAndRemap(map,mapItUp);
} // LogPhysPartitioningFunction::remapIt()
// LogPhysPartitioningFunction::canMaintainSortOrder()
// Can this logphys partitioning function maintain the sort order of an
// individual partition of the physical partitioning function? (A merge
// expression may be required to do so.)
//
// sortOrder - the sort order of an individual partition of the physical
//             partitioning function
//
// Decision, in this order:
//   - empty sort order                      -> TRUE
//   - single physical partition             -> TRUE
//   - asynchronous access                   -> TRUE (streams can be merged)
//   - synchronous access, not range         -> FALSE
//   - synchronous access, range partitioned -> TRUE only if the sort order
//     and the partitioning key order are compatible (see below)
//
NABoolean
LogPhysPartitioningFunction::canMaintainSortOrder(const ValueIdList& sortOrder)
const
{
  const CollIndex numSortKeys = sortOrder.entries();

  // No sort order can always be maintained.
  if (numSortKeys == 0)
    return TRUE;

  // A single partition can always maintain its order.
  if (physPartFunc_->castToSinglePartitionPartitioningFunction())
    return TRUE;

  // For now, only concerned with table/index partitioning functions
  // (Single, Range or TableHash).
  const RangePartitioningFunction *rangeFunc =
    physPartFunc_->castToRangePartitioningFunction();
  const TableHashPartitioningFunction *hashFunc =
    physPartFunc_->castToTableHashPartitioningFunction();
  CMPASSERT(rangeFunc || hashFunc);

  // Can always merge the streams, even if (in the case of non-decoupled
  // range) this results in serial access to the partitions.
  if (!getSynchronousAccess())
    return TRUE;

  // Hash (and any other non-range function) with synchronous access
  // cannot maintain order.
  if (!rangeFunc)
    return FALSE;

  // Range partitioning with synchronous access: the order can be
  // maintained if the partitioning key is a prefix of the sort key or
  // the sort key is a prefix of the partitioning key. Otherwise a merge
  // expression, and hence asynchronous access, would be needed.
  //
  // This logic may have to change when we become more sophisticated in
  // our sort keys and sort requirements. For example, if the sort req is
  // (a/2) and the sort key is a.
  const ValueIdList &partKeyCols = rangeFunc->getOrderOfKeyValues();

  if (partKeyCols.entries() > numSortKeys)
    {
      // more partitioning key columns than sort keys: the partitioning
      // key order has to satisfy the sort order
      if (partKeyCols.satisfiesReqdOrder(sortOrder) == SAME_ORDER)
        return TRUE;
      return FALSE;
    }

  // at most as many partitioning key columns as sort keys: the sort
  // order has to satisfy the partitioning key order
  const OrderComparison oc = sortOrder.satisfiesReqdOrder(partKeyCols);
  if (oc == SAME_ORDER)
    return TRUE;

  // an inverse order is acceptable only when explicitly enabled
  if (CmpCommon::getDefault(ATTEMPT_REVERSE_SYNCHRONOUS_ORDER) == DF_ON &&
      oc == INVERSE_ORDER)
    return TRUE;

  return FALSE;
} // LogPhysPartitioningFunction::canMaintainSortOrder()
// Build a textual description of this logphys partitioning function:
// the logical partitioning type, the PAPA/PA information (only when a
// PAPA is used) and the texts of the logical and physical partitioning
// functions.
const NAString LogPhysPartitioningFunction::getText() const
{
  // label for the logical partitioning type; "???" for unknown types
  const char* typeLabel = "???";
  switch (logPartType_)
    {
    case PA_PARTITION_GROUPING:
      typeLabel = "grouping";
      break;
    case LOGICAL_SUBPARTITIONING:
      typeLabel = "subpartitioning";
      break;
    case HORIZONTAL_PARTITION_SLICING:
      typeLabel = "horizontal slicing";
      break;
    case PA_GROUPED_REPARTITIONING:
      typeLabel = "repartitioning";
      break;
    default:
      break;
    }

  NAString text("logphys partitioned(", CmpCommon::statementHeap());
  text += typeLabel;

  if (usePapa_)
    {
      char clientCount[20];
      sprintf(clientCount,"%d",numOfClients_);
      text += ", PAPA with ";
      text += clientCount;
      text += " PA(s)";
    }

  text += ", log=";
  text += logPartFunc_->getText();
  text += ", phys=";
  text += physPartFunc_->getText();
  text += ")";

  return text;
}
// Build the text describing the logical side of this logphys
// partitioning function: the logical partitioning type (for grouping,
// also the logical and physical partition counts), the PAPA/PA
// information when a PAPA is used, and finally the text of the logical
// partitioning function.
const NAString LogPhysPartitioningFunction::getLogForSplitTop() const
{
  NAString result("", CmpCommon::statementHeap());

  switch (logPartType_)
    {
    case PA_PARTITION_GROUPING:
      // whenever the logical partitioning function is replicate no
      // broadcast it is not truly the case of grouping
      if (logPartFunc_->isAReplicateNoBroadcastPartitioningFunction())
        result += "no grouping, ";
      else
        {
          // Two 32-bit integers (up to 11 characters each, including a
          // sign) plus " to " and "," do not fit into 20 bytes in the
          // worst case, so use a larger buffer and a bounded print.
          char numParts[40];
          result += "grouped ";
          snprintf(numParts, sizeof(numParts), "%d to %d,",
                   logPartFunc_->getCountOfPartitions(),
                   physPartFunc_->getCountOfPartitions());
          result += numParts;
        }
      break;
    case LOGICAL_SUBPARTITIONING:
      result += "subpartitioned, ";
      break;
    case HORIZONTAL_PARTITION_SLICING:
      result += "horizontally sliced, ";
      break;
    case PA_GROUPED_REPARTITIONING:
      result += "repartitioned, ";
      break;
    default:
      result += "???, ";
      break;
    }

  if (usePapa_)
    {
      char numPAs[20];
      result += "PAPA with ";
      snprintf(numPAs, sizeof(numPAs), "%d", numOfClients_);
      result += numPAs;
      result += " PA(s)";
    }

  result += ", " + logPartFunc_->getText();
  return result;
}
// Text describing the physical side of this logphys partitioning
// function: simply the text of the physical partitioning function.
const NAString LogPhysPartitioningFunction::getPhysForSplitTop() const
{
return physPartFunc_->getText();
}
// Debug print: the base class part under the fixed title
// "LogPhysPartitioningFunction", followed by the logical and the physical
// partitioning function, each printed with its own fixed indent string
// ("logical: " / "physical: "). The title argument is not used here.
void LogPhysPartitioningFunction::print(
FILE* ofd,
const char* indent,
const char* title) const
{
PartitioningFunction::print(ofd,indent,"LogPhysPartitioningFunction");
logPartFunc_->print(ofd,"logical: ");
physPartFunc_->print(ofd,"physical: ");
}
// ***********************************************************************
// RoundRobinPartitioningFunction
// ***********************************************************************
// Create a RequireRoundRobin partitioning requirement for this function,
// allocated on the statement heap.
PartitioningRequirement*
RoundRobinPartitioningFunction::makePartitioningRequirement()
{
return new (CmpCommon::statementHeap())
RequireRoundRobin(this);
}
// Return a duplicate of this object, copy-constructed from *this and
// allocated on the statement heap.
PartitioningFunction*
RoundRobinPartitioningFunction::copy() const
{
return new(CmpCommon::statementHeap()) RoundRobinPartitioningFunction(*this);
}
// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::comparePartFuncToFunc(): Compare
// this partitioning function to another partitioning function. The
// result is SAME only if the base class comparison says SAME, the other
// function is a round robin function as well, and both are based on the
// same original (unscaled) number of round robin partitions. Every
// other outcome is reported as INCOMPATIBLE.
// -----------------------------------------------------------------------
COMPARE_RESULT
RoundRobinPartitioningFunction::
comparePartFuncToFunc(const PartitioningFunction &other) const
{
  if (PartitioningFunction::comparePartFuncToFunc(other) != SAME)
    return INCOMPATIBLE;

  // Since the base comparison said SAME the cast should always succeed;
  // the NULL test is kept as a safeguard.
  const RoundRobinPartitioningFunction *rhs =
    other.castToRoundRobinPartitioningFunction();

  // They must be based on the same physical partitioning.
  const NABoolean sameOrigin =
    (rhs != NULL) &&
    (getCountOfOrigRRPartitions() == rhs->getCountOfOrigRRPartitions());

  return (sameOrigin ? SAME : INCOMPATIBLE);
} // RoundRobinPartitioningFunction::comparePartFuncToFunc()
// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::createPartitioningKeyPredicates()
// -----------------------------------------------------------------------
void RoundRobinPartitioningFunction::createPartitioningKeyPredicates()
{
if (NOT partKeyPredsCreated())
{
// Create the partition input values.
//
ItemExpr *loPart = new (CmpCommon::statementHeap())
HostVar("_sys_HostVarLoRoundRobinPart",
new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE),
TRUE);
ItemExpr *hiPart = new (CmpCommon::statementHeap())
HostVar("_sys_HostVarHiRoundRobinPart",
new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE),
TRUE);
loPart->synthTypeAndValueId();
hiPart->synthTypeAndValueId();
ValueIdSet setOfKeyPredicates;
ValueIdList partInputValues;
// -----------------------------------------------------------------
// The partitioning key predicate is never used for a round
// robin partitioning so none is generated.
// -----------------------------------------------------------------
// the partition input values are two integer values: lo and hi part #
partInputValues.insert(loPart->getValueId());
partInputValues.insert(hiPart->getValueId());
// Store the empty set of key predicate. This will set the
// boolean 'partKeyPredsCreated' to TRUE.
//
storePartitioningKeyPredicates(setOfKeyPredicates);
// Store the partition input values.
//
storePartitionInputValues(partInputValues);
}
} // RoundRobinPartitioningFunction::createPartitioningKeyPredicates()
// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::replacePivs()
// Store a new set of partition input values and partitioning key
// predicates in this object.
// -----------------------------------------------------------------------
void RoundRobinPartitioningFunction::replacePivs(
const ValueIdList& newPivs,
const ValueIdSet& newPartKeyPreds)
{
// Overwrite the old pivs and part key preds with the new ones. (The
// partitioning expression is not changed by this method.)
storePartitionInputValues(newPivs);
storePartitioningKeyPredicates(newPartKeyPreds);
} // RoundRobinPartitioningFunction::replacePivs()
// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::createPartitioningFunctionForIndexDesc()
// Create a new round robin partitioning function for the given index
// descriptor. The new function has the same partition count as this one,
// a copy of this function's node map, and a partitioning key made of the
// index column value ids that correspond to the file set's partitioning
// key columns. Its partitioning key predicates are created before it is
// returned.
// -----------------------------------------------------------------------
PartitioningFunction*
RoundRobinPartitioningFunction::
createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const
{
  const NAFileSet * fileSet = idesc->getNAFileSet();
  const NAColumnArray & indexColumns = fileSet->getAllColumns();
  const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns();

  // Collect the value ids of the partitioning key columns.
  ValueIdSet partitioningKey;
  for (CollIndex keyPos = 0; keyPos < partKeyColumns.entries(); keyPos++)
    {
      // which column of the index is this (usually this will be == keyPos)
      const Lng32 ixColNumber = indexColumns.index(partKeyColumns[keyPos]);

      // add the value id of that index column to the partitioning key
      const ValueId keyValueId = idesc->getIndexColumns()[ixColNumber];
      partitioningKey += keyValueId;
    }

  // -----------------------------------------------------------------
  // Allocate a new RoundRobinPartitioningFunction.
  // -----------------------------------------------------------------
  RoundRobinPartitioningFunction *newPartFunc
    = new(idesc->wHeap()) RoundRobinPartitioningFunction
        (getCountOfPartitions(),
         partitioningKey,
         getNodeMap()->copy(idesc->wHeap()));

  newPartFunc->createPartitioningKeyPredicates();
  return newPartFunc;
} // RoundRobinPartitioningFunction::createPartitioningFunctionForIndexDesc()
// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::createPartitioningExpression()
// Build (once) and cache the partitioning expression
//
//   PAGroup(ProgDistrib(Cast(ShiftRight(Cast(<key>, SQLLargeInt), 32),
//                            SQLInt),
//                       origNumParts),
//           scaledNumParts, origNumParts)
//
// where <key> is the first entry of the partitioning key, origNumParts
// is getCountOfOrigRRPartitions() and scaledNumParts is
// getCountOfPartitions(). A later call returns the cached expression.
// -----------------------------------------------------------------------
ItemExpr* RoundRobinPartitioningFunction::createPartitioningExpression()
{
if (getExpression()) // already constructed?
return getExpression(); // reuse it!
CollHeap *heap = CmpCommon::statementHeap();
// The Partitioning Key (SYSKEY)
//
ValueIdList keyValue = getPartitioningKey();
// The type of the partitioning key for round robin is always
// SQLLargeInt (the type of SYSKEY)
//
NAType *desiredType = new (heap) SQLLargeInt(heap, TRUE, FALSE);
// The key is shifted right by 32 bits and then cast to a SQLInt.
// (createPartitionSelectionExpr() of this class describes the 32 bits
// that are dropped as 16 bits of padding plus 16 bits of random data
// that are not used to compute the partition number.)
// NOTE(review): only keyValue[0] is used; presumably the partitioning
// key always has exactly one entry (SYSKEY) -- confirm.
//
ItemExpr *partKey =
new (heap)
Cast(new (heap)
Shift(ITM_SHIFT_RIGHT,
new (heap)
Cast(keyValue[0].getItemExpr(),
desiredType),
new (heap)
ConstValue(32)),
new (heap) SQLInt(heap, FALSE,FALSE));
// The same type object is handed to both constants below.
NAType *numPartsType = new (heap) SQLInt(heap, FALSE,FALSE);
// Constant for the original (unscaled) number of partitions.
Lng32 numParts = getCountOfOrigRRPartitions();
char buffer[20];
sprintf(buffer, "%d", numParts);
NAString numPartsStr = buffer;
ItemExpr *origNumParts =
new (heap) ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);
// Constant for the scaled (current) number of partitions; numParts,
// buffer and numPartsStr are reused for it.
numParts = getCountOfPartitions();
sprintf(buffer, "%d", numParts);
numPartsStr = buffer;
ItemExpr *scaledNumParts =
new (heap) ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);
ItemExpr *partFunc =
new (heap)
PAGroup(new (heap)
ProgDistrib(partKey, origNumParts),
scaledNumParts,
origNumParts);
partFunc->synthTypeAndValueId();
// cache the expression for later calls
storeExpression(partFunc);
return partFunc;
} // RoundRobinPartitioningFunction::createPartitioningExpression()
// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::createPartitionSelectionExpr()
// Create the partition selection expression. 'Partition selection'
// means that an expression is used to determine the partition to
// access as opposed to using the File System to determine the range
// of partitions to access based on a set of partitioning key
// predicates. Partition selection is currently used for Hash Dist and
// Round Robin Partitioning. And the File System is used for Range
// Partitioning. If a partitioning selection expression is created,
// it is cached in the data member 'partitionSelectionExpr_' and the
// partition selection inputs are generated and stored in
// 'partitionSelectionExprInputs_'. This method is redefined for
// HashDistPartitioningFunction and RoundRobinPartitioningFunction.
//
// partSearchKey   - partition search key; consulted only when no
//                   partition assignment is to be done
// availableValues - values used to replace VEG expressions in the
//                   generated expression
//
// Returns the (cached) expression, or NULL if neither partition
// assignment is requested nor the search key is unique. Note that the
// two input host variables are recorded as selection inputs even when
// NULL is returned.
// -----------------------------------------------------------------------
ItemExpr *
RoundRobinPartitioningFunction::
createPartitionSelectionExpr(const SearchKey *partSearchKey,
                             const ValueIdSet &availableValues)
{
  // If it has already been created, return cached version.
  //
  if(partitionSelectionExpr())
    return partitionSelectionExpr();

  CollHeap *heap = CmpCommon::statementHeap();

  // Use a host var to provide access to numParts, this will be
  // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen()
  //
  ItemExpr *numParts = new (heap) HostVar("_sys_hostVarNumParts",
                                          // int not null
                                          new (heap) SQLInt(heap, FALSE, FALSE),
                                          // is system-supplied
                                          TRUE);
  numParts->synthTypeAndValueId();

  // Use a host var to provide access to partNum, this will be
  // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen()
  //
  ItemExpr *partNum = new (heap) HostVar("_sys_hostVarPartNo",
                                         // int not null
                                         new (heap) SQLInt(heap, FALSE, FALSE),
                                         // is system-supplied
                                         TRUE);
  partNum->synthTypeAndValueId();

  // Record these hostvars as the inputs to the partitionSelectionExpr.
  //
  CMPASSERT(partitionSelectionExprInputs().entries() == 0);
  partitionSelectionExprInputs().insert(partNum->getValueId());
  partitionSelectionExprInputs().insert(numParts->getValueId());

  if(assignPartition()) {
    // If partitionAssignment is to be done, the partition selection
    // expression is Modulus(Cast((partNum+1),SQLInt),numParts). The
    // cast (to SQLInt) is there because the type synthesized for
    // partNum+1 is a large int, which Modulus can't handle at the
    // moment. So, we just cast the sum to a SQLInt for now.
    // Probably the best thing to do here is to somehow force the type
    // synthesis for BiArith(ITM_PLUS) so that when we add two SQLInts
    // we get a result of SQLInt, rather than something bigger. This
    // doesn't follow SQL type synthesis rules, but that is OK, since
    // the expression is generated internally. $$ TBD $$

    // Construct and cache the partitionSelectionExpr for Round Robin
    // (partition assignment).
    //
    partitionSelectionExpr() =
      new (heap) Modulus(new (heap) Cast(new (heap)
                                         BiArith(ITM_PLUS, partNum,
                                                 new (heap) SystemLiteral(1)),
                                         new (heap) SQLInt(heap, TRUE,FALSE)),
                         numParts);
  } else if(partSearchKey->isUnique()) {
    // For now, only support a partition selection expression when the
    // partition search key is unique (identifies exactly one partition).
    //
    const ValueIdList &keyValues = partSearchKey->getBeginKeyValues();

    // The type of the partitioning key for round robin is always
    // SQLLargeInt (the type of SYSKEY)
    //
    NAType *desiredType = new (heap) SQLLargeInt(heap,TRUE, FALSE);

    // The partition selection expression is:
    //
    // ProgDistrib(Cast(ShiftRight(Cast(<keyvalue>, SQLLargeInt), 32), SQLInt),
    //             numParts)
    //
    // First, the <keyvalue> is cast to a SQLLargeInt to make it the
    // same type as the partitioning key (SYSKEY). This value is then
    // shifted right 32 bits to remove the 16 bits of padding
    // introduced for packing and 16 bits of random data used to make
    // sure the key is unique, but not used to compute the partition
    // number. This is then cast to a SQLInt to make it compatible
    // with ProgDistrib. Then ProgDistrib calculates the partNum.
    //
    // The shift amount must be 32, matching both the description above
    // and the shift used by createPartitioningExpression(); a different
    // amount would compute the partition from the wrong bits of the key.

    // Construct and cache the partitionSelectionExpr for Round Robin
    // (unique partition search key).
    //
    partitionSelectionExpr() =
      new (heap) ProgDistrib(new (heap)
                             Cast(new (heap)
                                  Shift(ITM_SHIFT_RIGHT,
                                        new (heap)
                                        Cast(keyValues[0].getItemExpr(),
                                             desiredType),
                                        new (heap)
                                        SystemLiteral(32)),
                                  new (heap) SQLInt(heap, FALSE,FALSE)),
                             numParts);
  } else {
    return NULL;
  }

  // Bind the expression.
  //
  partitionSelectionExpr()->synthTypeAndValueId();

  // PreCodeGen the expression (This maybe should go in
  // RoundRobinPartitioningFunction::preCodeGen(), but preCodeGen()
  // is typically called before this expression is generated.
  //
  partitionSelectionExpr()->replaceVEGExpressions(availableValues,
                                                  availableValues);

  return partitionSelectionExpr();
} // RoundRobinPartitioningFunction::createPartitionSelectionExpr()
// ::scaleNumberOfPartitions() is called in the following locations:
//
// OptPhysRelExpr.cpp
// 3536 NJ::genLeftChild()
// 4892 NJ:createContextForChild() (rightPartFunc->)
// 14166 and 14262 synthDP2PhysicalProperty()
//
// GP.cpp
// 955 GroupAttributes::recommendedOrderForNJProbing()
//
//As a result, the following version will not be called.
// Scale this round robin partitioning function to the suggested number
// of partitions.
//
// suggestedNewNumberOfPartitions - requested partition count; it is not
//                                  modified by this method
// partGroupDist                  - not used by this method
//
// A request for a single partition returns a new
// SinglePartitionPartitioningFunction and leaves this object unchanged.
// Otherwise this object is modified in place (cached expression
// discarded, node map replaced if the count changes, partition count
// updated) and returned.
PartitioningFunction *
RoundRobinPartitioningFunction::
scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions,
                        PartitionGroupingDistEnum partGroupDist)
{
  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // If an expression has been generated, then we want to discard it
  // because it may no longer be correct.
  storeExpression(NULL);

  // Allow arbitrary scaling of RoundRobinPartitioningFunctions.
  // (The runtime will handle the mapping of physical partitions
  // to logical partitions.)
  //
  const NABoolean countChanges =
    (suggestedNewNumberOfPartitions != getCountOfPartitions());
  if (countChanges)
    replaceNodeMap(new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions));

  partitionCount_ = suggestedNewNumberOfPartitions;
  return this;
} // RoundRobinPartitioningFunction::scaleNumberOfPartitions()
// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
NABoolean
RoundRobinPartitioningFunction::
isAGroupingOf(const PartitioningFunction &other,
              Lng32* maxPartsPerGroup) const
{
  // Default for the optional out-parameter; it is only overwritten
  // when a grouping relationship is established below.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  // Only another RoundRobinPartitioningFunction can be grouped by
  // this function.
  const RoundRobinPartitioningFunction *otherRR =
    other.castToRoundRobinPartitioningFunction();
  if (otherRR == NULL)
    return FALSE;

  // The key column (SYSKEY) must be the same on both sides.
  if (! (getPartitioningKey() == otherRR->getPartitioningKey()))
    return FALSE;

  // A function with more partitions than other cannot be a grouping
  // of it.
  //   Eg.  this.numParts: 10  this.origNumParts: 20
  //         oth.numParts:  5   oth.origNumParts: 20
  if (getCountOfPartitions() > otherRR->getCountOfPartitions())
    return FALSE;

  // Both functions must be based on the same physical function.
  //   Eg. (not a grouping)
  //        this.numParts: 10  this.origNumParts: 20
  //         oth.numParts: 10   oth.origNumParts: 30
  if (getCountOfOrigRRPartitions() != otherRR->getCountOfOrigRRPartitions())
    return FALSE;

  // From here on:
  //      getCountOfPartitions()       <= other's count of partitions
  //  AND getCountOfOrigRRPartitions() == other's original count
  //
  // The easy cases need no further checks:
  //  - other has not been scaled (arbitrary scaling of one function
  //    is allowed)
  //      Eg.  this.numParts:  7  this.origNumParts: 20
  //            oth.numParts: 20   oth.origNumParts: 20
  //  - both have been scaled to the same number of partitions
  //      Eg.  this.numParts:  7  this.origNumParts: 20
  //            oth.numParts:  7   oth.origNumParts: 20
  //
  // Only when both functions have been scaled, and to different
  // sizes, are the divisibility checks below required.
  if ((otherRR->getCountOfPartitions() !=
       otherRR->getCountOfOrigRRPartitions()) &&
      (getCountOfPartitions() != otherRR->getCountOfPartitions()))
    {
      // WARNING (carried over from the original author): it is not
      // clear that the current code can ever reach this point, nor
      // that the GROUPING semantics implemented here are correct for
      // these situations.
      //   Eg.  this.numParts:  5  this.origNumParts: 20
      //         oth.numParts: 10   oth.origNumParts: 20
      //
      // Three conditions must hold for a grouping.

      // (1) This scaled partition count evenly divides other's
      //     scaled partition count.
      if ((otherRR->getCountOfPartitions() % getCountOfPartitions()) != 0)
        return FALSE;

      // (2) Other's scaling is a multiple of, or evenly divides, the
      //     original number of partitions.
      //       Eg. oth.numParts: 10 or 40, oth.origNumParts: 20
      if ((otherRR->getCountOfOrigRRPartitions() >=
           otherRR->getCountOfPartitions())
            ? ((otherRR->getCountOfOrigRRPartitions() %
                otherRR->getCountOfPartitions()) != 0)
            : ((otherRR->getCountOfPartitions() %
                otherRR->getCountOfOrigRRPartitions()) != 0))
        return FALSE;

      // (3) This scaling is a multiple of, or evenly divides, the
      //     original number of partitions.
      //       Eg. this.numParts: 5 or 40, this.origNumParts: 20
      if ((getCountOfOrigRRPartitions() >= getCountOfPartitions())
            ? ((getCountOfOrigRRPartitions() %
                getCountOfPartitions()) != 0)
            : ((getCountOfPartitions() %
                getCountOfOrigRRPartitions()) != 0))
        return FALSE;
    }

  // It is a grouping of. Report the largest group size, i.e. other's
  // partition count divided by this count, rounded up.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup =
      ((otherRR->getCountOfPartitions() + getCountOfPartitions() - 1)
       / getCountOfPartitions());

  return TRUE;

} // RoundRobinPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::codeGen() is defined in
// generator/GenPartFunc.cpp
// -----------------------------------------------------------------------
// -----------------------------------------------------------------------
// Make a new partSearchKey that indicates that PA_PARTITION_GROUPING
// is being done. Note that a search key can not be generated which
// can group RR partitions. For RoundRobinPartitioning, a flag in the
// search key is used to indicate that PA_PARTITION_GROUPING is being
// done and the begin/end key values of the search key are set to the
// partition input values of the partitioning function.
// -----------------------------------------------------------------------
SearchKey *
RoundRobinPartitioningFunction::createSearchKey(const IndexDesc *indexDesc,
                                                ValueIdSet availInputs,
                                                ValueIdSet additionalPreds) const
{
  // Predicates handed to the search key: the partitioning key
  // predicates of this function plus whatever the caller supplied.
  ValueIdSet keyPreds(getPartitioningKeyPredicates());
  keyPreds += additionalPreds;

  // The partition input values are made available as inputs as well
  // (availInputs is a by-value copy, so the caller's set is untouched).
  availInputs += getPartitionInputValues();

  // No non-key columns are passed for this kind of search key.
  ValueIdSet noNonKeyColumns;

  // Use the special SearchKey constructor that builds a search key
  // for a RoundRobinPartitioningFunction (it receives 'this').
  return new (CmpCommon::statementHeap())
    SearchKey(indexDesc->getPartitioningKey(),
              indexDesc->getOrderOfPartitioningKeyValues(),
              availInputs,
              keyPreds,
              this,
              noNonKeyColumns,
              indexDesc);

} // RoundRobinPartitioningFunction::createSearchKey()
void RoundRobinPartitioningFunction::setupForStatement()
{
  // Do the work only if the setup flag is not already set; repeated
  // calls are no-ops until resetAfterStatement() clears the flag.
  if (!setupForStatement_)
    {
      PartitioningFunction::setupForStatement();

      // Record the new state: set up, and not (yet) reset.
      setupForStatement_ = TRUE;
      resetAfterStatement_ = FALSE;
    }
}
void RoundRobinPartitioningFunction::resetAfterStatement()
{
  // Do the work only if the reset flag is not already set; repeated
  // calls are no-ops until setupForStatement() clears the flag.
  if (!resetAfterStatement_)
    {
      PartitioningFunction::resetAfterStatement();

      // Undo any scaling applied to this function by restoring the
      // original round-robin partition count.
      partitionCount_ = numberOfOrigRRPartitions_;

      // Record the new state: reset, and no longer set up.
      setupForStatement_ = FALSE;
      resetAfterStatement_ = TRUE;
    }
}
const NAString RoundRobinPartitioningFunction::getText() const
{
  // Builds a description of the form
  //   "round robin partitioned <current> (<original>) ways on (<key>)"
  NAString result("round robin partitioned ", CmpCommon::statementHeap());

  // "<n> (<m>)" with two 32-bit values can need up to 26 bytes
  // (11 + 2 + 11 + 1 + NUL). Use a buffer that is large enough and a
  // bounded formatting call so a large count can never overrun it.
  char nparts[32];
  snprintf(nparts, sizeof(nparts), "%d (%d)",
           partitionCount_, numberOfOrigRRPartitions_);

  result += nparts;
  result += " ways on (";

  // Append the partitioning key in EXPLAIN format.
  getPartitioningKey().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  result += ")";

  return result;
}
void RoundRobinPartitioningFunction::print(FILE* ofd,
                                           const char* indent,
                                           const char* title) const
{
  // Debug print. The caller-supplied title is not used; the base
  // class print is always invoked with this class' name as the title.
  PartitioningFunction::print(ofd,
                              indent,
                              "RoundRobinPartitioningFunction");
}
// File-scope constant skew property, constructed with the ANY
// indicator and no (NULL) skew value list.
const skewProperty ANY_SKEW_PROPERTY(skewProperty::ANY, NULL);
NABoolean skewProperty::operator==(const skewProperty& other) const
{
  // The indicator and the broadcast-one-row flag must agree.
  if ( indicator_ != other.indicator_ ||
       broadcastOneRow_ != other.broadcastOneRow_ )
    return FALSE;

  // Sharing the very same skew value list (this includes both lists
  // being NULL) makes the two properties equal right away.
  // NOTE(review): the ESP count is not compared on this path --
  // confirm that this is intended.
  if ( skewValues_ == other.skewValues_ )
    return TRUE;

  // Two distinct lists must have equal contents.
  if ( skewValues_ != NULL && other.skewValues_ != NULL )
    {
      if ( NOT (*skewValues_ == *(other.skewValues_)) )
        return FALSE;
    }

  // Otherwise (equal contents, or exactly one list is NULL) the
  // number of ESPs decides.
  return ( numESPs_ == other.numESPs_ ) ? TRUE : FALSE;
}
const NAString skewProperty::getText(NABoolean abbre) const
{
  NAString text;

  // Abbreviated form: just a short tag for the two skew-handling
  // indicators, empty text for everything else.
  if ( abbre != FALSE )
    {
      if ( getIndicator() == skewProperty::UNIFORM_DISTRIBUTE )
        text += "-ud";
      else if ( getIndicator() == skewProperty::BROADCAST )
        text += "-br";
      return text;
    }

  // Long form. Pick the leading phrase first; indicators other than
  // the two below produce an empty text.
  const char* action = NULL;
  switch ( getIndicator() )
    {
    case skewProperty::UNIFORM_DISTRIBUTE:
      action = " - uniformly distribute ";
      break;
    case skewProperty::BROADCAST:
      action = " - broadcast ";
      break;
    default:
      return text;
    }

  // Optional "(via <n> ESP(s)) " part, only present when anti-skew
  // ESPs are in use.
  NAString viaESPs;
  if ( getAntiSkewESPs() > 0 )
    {
      char espCount[20];
      str_itoa(getAntiSkewESPs(), espCount);
      viaESPs += "(via ";
      viaESPs += espCount;
      viaESPs += " ESP(s)) ";
    }

  text += action;
  text += viaESPs;
  text += "skewed ";

  // List the skewed values, if there are any.
  if ( skewValues_ )
    text += skewValues_->getText();

  // Only the broadcast case mentions the one-row variant.
  if ( getIndicator() == skewProperty::BROADCAST && getBroadcastOneRow() )
    text += " and one row ";

  return text;
}
// Builds (once) and returns the list of hash values that correspond
// to the skewed values of this function's skew property. Returns NULL
// when the skew property carries no skew value list. The list is
// allocated on the statement heap and cached in skewHashList_.
Int64List*
SkewedDataPartitioningFunction::buildHashListForSkewedValues()
{
  // Return the cached list if it has already been built.
  if ( skewHashList_ )
    return skewHashList_;

  const SkewedValueList* svlist = getSkewProperty().getSkewValues();

  if ( svlist == NULL )
    return NULL;

  skewHashList_ = new (STMTHEAP) Int64List(STMTHEAP, svlist->entries());

  UInt32 hashval;

  if ( !svlist->needToComputeFinalHash() ) {

    // for TRUE MC-SB. hash value for the composite skew value is pre-computed.
    for (CollIndex i = 0; i < svlist->entries(); i++) {
      hashval = (UInt32)((*svlist)[i].getDblValue());
      skewHashList_ -> insertAt(i, hashval);
    }

  } else {

    const NAType* naType = svlist->getNAType();
    NABoolean useHash = naType->useHashRepresentation();

    // Scratch state handed to outputToBufferToComputeRTHash() and then
    // on to computeHashValue().
    // NOTE(review): the buffer is only 10 bytes; presumably large
    // enough for every type that takes the non-hash path -- confirm.
    char data[10];
    Int32 len = 0;
    UInt32 flags = ExHDPHash::NO_FLAGS;

    for (CollIndex i = 0; i < svlist->entries(); i++)
      {
        if ( (*svlist)[i].getValue().isNull() ) {
          // an untyped NULL constant. All null values hash to this constant.
          // Ref. module exp_function.cpp, method ExHDPHash::eval() and
          // ex_function_hash::eval().
          hashval = ExHDPHash::nullHashValue; //666654765;
        } else {
          if ( useHash ) {
            // The skew value for any character or exact numeric data type is
            // the hash itself. So here we just copy and cast it back to UInt32.
            hashval = (UInt32)((*svlist)[i].getDblValue());
          } else {
            // If we hit this branch, it means the boundary values are stored in
            // the skew list. We can assume these values can be safely cast
            // back (without losing precision). Otherwise, the storing-hash-
            // value method should be used.
            (*svlist)[i].outputToBufferToComputeRTHash(naType,data,len,flags);
            hashval = computeHashValue(data, flags, len);
          }
        }

        skewHashList_ -> insertAt(i, hashval);
      }
  }

  return skewHashList_;
}
//============================
// Destructor: the body is empty, no cleanup is performed here.
HivePartitioningFunction::~HivePartitioningFunction()
{
}
PartitioningRequirement*
HivePartitioningFunction::makePartitioningRequirement()
{
  // Wrap this partitioning function in a RequireHive requirement
  // allocated on the statement heap.
  PartitioningRequirement* hiveReq =
    new (CmpCommon::statementHeap()) RequireHive(this);

  return hiveReq;
}
PartitioningFunction*
HivePartitioningFunction::copy() const
{
  // Copy-construct a new HivePartitioningFunction from this one; the
  // copy is allocated on the statement heap, which is also passed to
  // the copy constructor.
  PartitioningFunction* clone =
    new (CmpCommon::statementHeap())
      HivePartitioningFunction(*this, CmpCommon::statementHeap());

  return clone;
}
// -----------------------------------------------------------------------
// HivePartitioningFunction::createPartitioningKeyPredicates()
// -----------------------------------------------------------------------
void HivePartitioningFunction::createPartitioningKeyPredicates()
{
  // Delegate to the common helper for BETWEEN-style partitioning key
  // predicates, supplying the two host variable names used for Hive
  // partitions (low and high, judging by the names).
  createBetweenPartitioningKeyPredicates(
       "_sys_HostVarLoHivePart",
       "_sys_HostVarHiHivePart");

} // HivePartitioningFunction::createPartitioningKeyPredicates()
// -----------------------------------------------------------------------
// HivePartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
// NOTE: this comment appears to be stale. It states that the base
// class' implementation is used, but HivePartitioningFunction does
// define its own isAGroupingOf() further down in this file.
//
// Right now we assume that the split function of a hash partitioning
// function is such that no two functions are a grouping of each other.
// The only exception is identity, of course. We use the base class'
// implementation.
//
// With more knowledge about the split function (e.g. by knowing it's
// a simple modulus), one could guarantee that a 4-way hash-partitioning
// scheme is actually a grouping of an 8-way scheme. This is not really
// necessary and therefore not done here.
ItemExpr *
HivePartitioningFunction::buildHashingExpressionForExpr(ItemExpr* expr) const
{
  // Wrap the given expression in a HiveHash node allocated on the
  // statement heap.
  ItemExpr* hiveHash = new (CmpCommon::statementHeap()) HiveHash(expr);

  return hiveHash;
}
UInt32 HivePartitioningFunction::computeHashValue(char* data,
                                                  UInt32 flags,
                                                  Int32 len)
{
  // need a Hive()
  //
  // The hash is computed by calling the implementation function
  // directly on the first 'len' bytes of 'data'; 'flags' is not used.
  // NULL values and VARCHAR data types are not handled.
  UInt32 hashValue = FastHash(data, len);

  return hashValue;
}
ItemExpr * HivePartitioningFunction::getHashingExpression() const
{
  // need a Hive()

  // Without a stored partitioning expression there is no hashing
  // expression to return.
  ItemExpr* partExpr = getExpression();
  if ( partExpr == NULL )
    return NULL;

  // The hashing expression is the first child of the partitioning
  // expression.
  // NOTE(review): the assertion checks for ITM_HASH, while
  // buildHashingExpressionForExpr() creates a HiveHash node -- confirm
  // that the operator types agree.
  ItemExpr* hashExpr = partExpr->child(0);
  CMPASSERT(hashExpr AND
            hashExpr->getOperatorType()== ITM_HASH);

  return hashExpr;
}
// -----------------------------------------------------------------------
// Method for debugging.
// -----------------------------------------------------------------------
const NAString HivePartitioningFunction::getText() const
{
  // Delegate to the shared text routine, passing "hive" as its
  // argument (presumably the scheme name shown in the text).
  const NAString text = getTextImp("hive");

  return text;
}
void HivePartitioningFunction::print(FILE* ofd,
                                     const char* indent,
                                     const char* title) const
{
  // Debug print. The caller-supplied title is not used; the base
  // class print is always invoked with this class' name as the title.
  PartitioningFunction::print(ofd,
                              indent,
                              "HivePartitioningFunction");

} // HivePartitioningFunction::print()
PartitioningFunction*
HivePartitioningFunction::
createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const
{
  const NAFileSet * indexFileSet = idesc->getNAFileSet();
  const NAColumnArray & indexCols = indexFileSet->getAllColumns();
  const NAColumnArray & partCols  = indexFileSet->getPartitioningKeyColumns();

  // The partitioning key of the index, both as a set and as a list
  // in partitioning key column order.
  ValueIdSet  partKeySet;
  ValueIdList partKeyList;

  for (CollIndex keyPos = 0; keyPos < partCols.entries(); keyPos++)
    {
      // Position of this partitioning key column among all columns of
      // the index (usually this will be == keyPos).
      CollIndex posInIndex = indexCols.index(partCols[keyPos]);

      // Value id of that index column; record it in both the set and
      // the list.
      ValueId keyCol = idesc->getIndexColumns()[posInIndex];
      partKeySet += keyCol;
      partKeyList.insertAt(keyPos, keyCol);
    }

  // -----------------------------------------------------------------
  // Allocate a new HivePartitioningFunction on the heap of the index
  // descriptor, with the same number of partitions as this function
  // and a copy of its node map.
  // -----------------------------------------------------------------
  HivePartitioningFunction *newPartFunc = new(idesc->wHeap())
    HivePartitioningFunction(partKeySet,
                             partKeyList,
                             getCountOfPartitions(),
                             getNodeMap()->copy(idesc->wHeap()));

  // -----------------------------------------------------------------
  // Construct the partitioning key predicates of the new function.
  // -----------------------------------------------------------------
  newPartFunc->createPartitioningKeyPredicates();

  return newPartFunc;

} // HivePartitioningFunction::createPartitioningFunctionForIndexDesc()
// Decides whether this function is a grouping of 'other'.
//
// Returns FALSE if the partitioning keys are incompatible or if
// neither partition count evenly divides the other; TRUE otherwise.
// If maxPartsPerGroup is not NULL it is set to 1 on failure and to the
// number of partitions of 'other' per partition of this function
// (rounded up, so never 0) on success.
NABoolean HivePartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  if ( comparePartKeyToKey(other) == INCOMPATIBLE )
    return FALSE;

  // assume we can repartition a hive table with <m> partitions to
  // <n> partitions. No question asked.
  // The only restriction: the smaller partition count must evenly
  // divide the larger one.
  if ( getCountOfPartitions() < other.getCountOfPartitions() &&
       other.getCountOfPartitions() % getCountOfPartitions() != 0 )
    return FALSE;

  if ( other.getCountOfPartitions() < getCountOfPartitions() &&
       getCountOfPartitions() % other.getCountOfPartitions() != 0 )
    return FALSE;

  // This is a grouping of. Set the maxPartsPerGroup and return TRUE.
  // Use a ceiling division (as the round-robin implementation does):
  // when this function has more partitions than other, a truncating
  // division would report a group size of 0, this reports 1. In all
  // other cases that reach this point the division is exact anyway.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup =
      ((other.getCountOfPartitions() + getCountOfPartitions() - 1)
       / getCountOfPartitions());

  return TRUE;
}
void
HivePartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  // Let the hash partitioning base class normalize its keys first ...
  HashPartitioningFunction::normalizePartitioningKeys(normWARef);

  // ... then normalize the key column list kept by this class.
  keyColumnList_.normalizeNode(normWARef);

  // The original key column list is deliberately left alone:
  // normalizing it could introduce VEGies, which could cause data
  // type changes.
}