| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ************************************************************************** |
| * |
| * File: PartFunc.cpp |
| * Description: Partitioning Function |
| * Created: 11/16/1994 |
| * Language: C++ |
| * |
| * |
| * |
| * |
| ************************************************************************** |
| */ |
| |
| // ----------------------------------------------------------------------- |
| |
| #include "PartReq.h" |
| #include "PhyProp.h" |
| #include "ItemColRef.h" |
| #include "ItemLog.h" |
| #include "ItemFunc.h" |
| #include "ItemOther.h" |
| #include "ItemArith.h" |
| #include "opt.h" |
| #include "str.h" |
| #include "NumericType.h" |
| #include "MiscType.h" |
| #include "NAFileSet.h" |
| #include "SearchKey.h" |
| #include "GroupAttr.h" |
| #include "Generator.h" |
| |
| |
// To test the hash2 partition number generated by a direct call to
// ExHDHash::hash() and compare the result, uncomment the
// following line.
| //#include "Generator.h" |
| |
| #include "exp_function.h" |
| |
| // *********************************************************************** |
| // A function having an external linkage to allow display() to |
| // be called on a tree object. This is a workaround for bugs/missing |
| // functionality in ObjectCenter that cause display() to become |
| // an undefined symbol. |
| // *********************************************************************** |
| |
// Debugger-friendly wrapper with external linkage: invokes display()
// on a PartitioningFunction reference (see workaround note above).
void displayPartitioningFunction(const PartitioningFunction& pf)
{
  pf.display();
}
| |
| void displayPartitioningFunction(const PartitioningFunction* pf) |
| { |
| if (pf) |
| pf->display(); |
| } |
| |
// Debugger-friendly wrapper with external linkage: invokes display()
// on a RangePartitionBoundaries reference.
void displayPartitionBoundaries(const RangePartitionBoundaries& pb)
{
  pb.display();
}
| |
| void displayPartitionBoundaries(const RangePartitionBoundaries* pb) |
| { |
| if (pb) |
| pb->display(); |
| } |
| |
| |
| // *********************************************************************** |
| // PartitioningFunction |
| // *********************************************************************** |
| |
| //------------------------------------------------------------------------ |
| // PartitioningFunction destructor. |
| //------------------------------------------------------------------------ |
| PartitioningFunction::~PartitioningFunction() |
| { |
| if (nodeMap_ != NULL) |
| { |
| delete nodeMap_; |
| } |
| |
| } //PartitioningFunction destructor |
| |
| // ----------------------------------------------------------------------- |
| // Methods for perform type-safe pointer casts. |
| // ----------------------------------------------------------------------- |
// Each castTo<X>PartitioningFunction() returns NULL here in the base
// class; the corresponding derived class overrides it to return 'this'.
// Together they provide a type-safe alternative to dynamic_cast for
// this class hierarchy.
const LogPhysPartitioningFunction*
PartitioningFunction::castToLogPhysPartitioningFunction() const
{ return NULL; }

const SinglePartitionPartitioningFunction*
PartitioningFunction::castToSinglePartitionPartitioningFunction() const
{ return NULL; }

const ReplicateViaBroadcastPartitioningFunction*
PartitioningFunction::castToReplicateViaBroadcastPartitioningFunction() const
{ return NULL; }

const ReplicateNoBroadcastPartitioningFunction*
PartitioningFunction::castToReplicateNoBroadcastPartitioningFunction() const
{ return NULL; }

const HashPartitioningFunction*
PartitioningFunction::castToHashPartitioningFunction() const
{ return NULL; }

const TableHashPartitioningFunction*
PartitioningFunction::castToTableHashPartitioningFunction() const
{ return NULL; }

const HashDistPartitioningFunction*
PartitioningFunction::castToHashDistPartitioningFunction() const
{ return NULL; }

const Hash2PartitioningFunction*
PartitioningFunction::castToHash2PartitioningFunction() const
{ return NULL; }

const RangePartitioningFunction*
PartitioningFunction::castToRangePartitioningFunction() const
{ return NULL; }

const RoundRobinPartitioningFunction*
PartitioningFunction::castToRoundRobinPartitioningFunction() const
{ return NULL; }
| |
| const SkewedDataPartitioningFunction* |
| PartitioningFunction::castToSkewedDataPartitioningFunction() const |
| { return NULL; }; |
| |
| const HivePartitioningFunction* |
| PartitioningFunction::castToHivePartitioningFunction() const |
| { return NULL; }; |
| |
| |
| |
| // --------------------------------------------------------------------- |
| // Method to test if the partitioning key contains any approximate |
| // numeric type columns. Necessary because in some cases certain |
| // parallel operations do not function properly if the partitioning |
| // key of the table contains approximate numeric columns. |
| // --------------------------------------------------------------------- |
| NABoolean PartitioningFunction::partKeyContainsFloatColumn() const |
| { |
| if (NOT getPartitioningKey().isEmpty()) |
| { |
| ValueIdSet partKeyColumns = getPartitioningKey(); |
| for (ValueId vid = partKeyColumns.init(); |
| partKeyColumns.next(vid); |
| partKeyColumns.advance(vid)) |
| { |
| const NAType& columnType = vid.getType(); |
| if ((columnType.getTypeQualifier() == NA_NUMERIC_TYPE) AND |
| NOT ((NumericType&)columnType).isExact()) |
| { |
| return TRUE; |
| } |
| } // end for |
| } // end if part key is not empty |
| return FALSE; |
| } // partKeyContainsFloatColumn() |
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::getNodeMap() |
| // Return base class node map by default. |
| // ----------------------------------------------------------------------- |
| const NodeMap* |
| PartitioningFunction::getNodeMap() const { return nodeMap_; } |
| |
| // use any existing nodemap from my req or my child (or synthesize one) that |
| // matches my partition count requirement |
| void |
| PartitioningFunction::useNodeMapFromReqOrChild(PartitioningRequirement *req, |
| PartitioningFunction *childPF, |
| NABoolean forESP) |
| { |
| ULng32 partCnt = (ULng32)getCountOfPartitions(); |
| NodeMap *myNodeMap = NULL; |
| if (req->isRequirementFullySpecified() && |
| (CmpCommon::getDefault(COMP_BOOL_87) != DF_ON) && getNodeMap() && |
| (getNodeMap()->getNumEntries() == partCnt)) { |
| // we are a copy of a partitioning function that realizes a fully specified |
| // partitioning requirement. So, do nothing. our nodemap is good enough. |
| return; |
| } else if ((CmpCommon::getDefault(COMP_BOOL_87) != DF_ON) && |
| (partCnt == (ULng32)childPF->getCountOfPartitions()) && |
| childPF->getNodeMap() && |
| (childPF->getNodeMap()->getNumEntries() == partCnt)) { |
| // we are a copy of a partitioning function that realizes a fuzzy |
| // partitioning requirement. childPF is the partitioning function of |
| // the synthesized physical property of the child of an Exchange. So, |
| // our child's partitioning function's nodemap is good enough. |
| myNodeMap = childPF->getNodeMap()->copy(); |
| } else { |
| // only as a last resort do we synthesize a new node map |
| |
| // Synthesize a nodemap based on the nodemap of the child and the |
| // desired number of ESPs. Using synthesizeLogicalMap() assumes |
| // that the lower and upper ESPs are associated via grouping. This |
| // assumption is not valid when considering the communication |
| // patterns between the upper and lower ESPs, but this assumption |
| // will lead to a reasonable nodemap for the upper ESPs. |
| myNodeMap = ((NodeMap*)childPF->getNodeMap()) |
| ->synthesizeLogicalMap(partCnt, forESP); |
| } |
| CMPASSERT(myNodeMap); |
| for(CollIndex i = 0; i < partCnt; i++) { |
| myNodeMap->setPartitionState(i, NodeMapEntry::ACTIVE); |
| } |
| CMPASSERT(myNodeMap->getNumActivePartitions() == partCnt); |
| replaceNodeMap(myNodeMap); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::replaceNodeMap() |
| // Replace node map with a newly specified node map. |
| // ----------------------------------------------------------------------- |
| void |
| PartitioningFunction::replaceNodeMap(NodeMap* nodeMap) |
| { |
| if (nodeMap_ != NULL) |
| { |
| #ifndef NDEBUG |
| //assertion for NATable caching. |
| //assert if this object is not on the system heap i.e. (collHeap()!=0) |
| //and if the heap is not the statementHeap. |
| //The assertion would happen if this object is on a NATable heap, |
| //since we should not be changing the partitionins function associated |
| //with a cached NATable object |
| if((collHeap()) && (collHeap() != CmpCommon::statementHeap())) |
| CMPASSERT(FALSE); |
| #endif //NDEBUG |
| //Fix for solution 10-040120-2524 |
| //Unconditionally deleting the nodeMap_ |
| //was causing problems as the object pointed |
| //to by nodeMap_ was being used by other guys also. |
| // |
| //delete the object if is on the system heap. |
| //If it is on the the statement heap then it will |
| //be delete when the statement heap is deleted. |
| if(!nodeMap_->collHeap())delete nodeMap_; |
| } |
| |
| nodeMap_ = nodeMap; |
| |
| } // PartitioningFunction::replaceNodeMap() |
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::copy() |
| // Virtual copy constructor returns a copy of myself. |
| // ----------------------------------------------------------------------- |
PartitioningFunction* PartitioningFunction::copy() const
{
  // illegal to call copy() of the base class; every concrete derived
  // class must override this virtual copy constructor
  CMPABORT;
  return NULL; // not reached; satisfies the compiler
}
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::normalizePartitioningKeys() |
| // Rewrite the partitioning keys of the partitioning function in |
| // terms of the VEGReference for the VEG to which the partitioning |
| // key column belongs. |
| // ----------------------------------------------------------------------- |
void PartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  // Rewrite key columns and key predicates in terms of VEGReferences.
  partitioningKeyColumns_.normalizeNode(normWARef);
  partitioningKeyPredicates_.normalizeNode(normWARef);

  // The expression trees are optional; normalize them only when present.
  if(partitioningExpression_)
    partitioningExpression_ =
      partitioningExpression_->normalizeNode(normWARef);

  if(partitionSelectionExpr_)
    partitionSelectionExpr_ =
      partitionSelectionExpr_->normalizeNode(normWARef);

} // PartitioningFunction::normalizePartitioningKeys
| |
| // -------------------------------------------------------------------- |
| // A method that is used by optimizer for comparing partitioning |
| // function with the random number partitioning function i.e. it only |
| // compares the partitioning func type and the number of partitions. |
| // It does not compare partitioning key. |
| // -------------------------------------------------------------------- |
| NABoolean |
| PartitioningFunction::isKnownReplicaPartFunc() const |
| { |
| return |
| ( |
| (isASkewedDataPartitioningFunction() && |
| ((SkewedDataPartitioningFunction*)this)->getSkewProperty().isBroadcasted()) |
| || |
| isAReplicateViaBroadcastPartitioningFunction() |
| || |
| isAReplicateNoBroadcastPartitioningFunction()); |
| } |
| |
| COMPARE_RESULT |
| PartitioningFunction::comparePartFuncsForUnion |
| (const PartitioningFunction &other) const |
| { |
| NABoolean myRepPartFunc = isKnownReplicaPartFunc(); |
| NABoolean otherRepPartFunc = other.isKnownReplicaPartFunc(); |
| |
| if ( myRepPartFunc || otherRepPartFunc ) return INCOMPATIBLE; |
| |
| if (getCountOfPartitions() == other.getCountOfPartitions()) |
| return SAME; |
| |
| return INCOMPATIBLE; |
| } // PartitioningFunction::comparePartFuncsForUnion |
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::comparePartFuncToFunc() |
| // Partitioning function comparison method for hash, round robin, replication, |
| // and single_partition partitioning functions i.e. everything except regular |
| // range partitioning functions. |
| // "Other" must be a partitioning function. |
| // ----------------------------------------------------------------------- |
| COMPARE_RESULT |
| PartitioningFunction::comparePartFuncToFunc |
| (const PartitioningFunction &other) const |
| { |
| if ((getPartitioningFunctionType() == other.getPartitioningFunctionType()) AND |
| (getCountOfPartitions() == other.getCountOfPartitions()) AND |
| (getPartitioningKey() == other.getPartitioningKey()) |
| ) |
| return SAME; |
| else |
| return INCOMPATIBLE; |
| } // PartitioningFunction::comparePartFuncToFunc |
| |
| static inline |
| PartitioningFunction* getPhysPartFunc(const PartitioningFunction* x) |
| { |
| PartitioningFunction* phys = 0; |
| if ( x -> isALogPhysPartitioningFunction() ) { |
| phys = x -> castToLogPhysPartitioningFunction() -> |
| getPhysPartitioningFunction(); |
| } |
| return phys; |
| } |
| |
| NABoolean PartitioningFunction::isAGroupingOf( |
| const PartitioningFunction &other, |
| Lng32* maxPartsPerGroup) const |
| { |
| if (maxPartsPerGroup != NULL) |
| *maxPartsPerGroup = 1; |
| |
| // By default we are not smart and take no risk. However, two |
| // partitioning functions that are equal are always a grouping of |
| // each other. |
| return (comparePartFuncToFunc(other) == SAME); |
| } |
| |
PartitioningRequirement* PartitioningFunction::makePartitioningRequirement()
{
  // Redefine PartitioningFunction::makePartitioningRequirement() --
  // calling the base class version is illegal.
  CMPABORT;
  return NULL; // not reached; satisfies the compiler
}
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::scaleNumberOfPartitions |
| // ----------------------------------------------------------------------- |
| PartitioningFunction * PartitioningFunction::scaleNumberOfPartitions( |
| Lng32 &suggestedNewNumberOfPartitions, |
| PartitionGroupingDistEnum partGroupDist) |
| { |
| // if we come here this means that the derived class did not |
| // override scaleNumberOfPartitions(). In other words, the derived |
| // class doesn't know how to change its number of partitions. |
| // Therefore, ignore the suggestion and return the current number |
| // of partitions, unless we scale down to 1. |
| if (suggestedNewNumberOfPartitions == 1) |
| return new(CmpCommon::statementHeap()) |
| SinglePartitionPartitioningFunction(); |
| |
| // If an expression has been generated, then we want to discard it |
| // because it may no longer be correct. |
| storeExpression(NULL); |
| |
| if ( suggestedNewNumberOfPartitions != getCountOfPartitions() ) { |
| NodeMap* newNodeMap = |
| new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions); |
| replaceNodeMap(newNodeMap); |
| } |
| |
| suggestedNewNumberOfPartitions = getCountOfPartitions(); |
| return this; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::copyAndRemap() |
| // ----------------------------------------------------------------------- |
| PartitioningFunction* |
| PartitioningFunction::copyAndRemap |
| (ValueIdMap& map, NABoolean mapItUp) const |
| { |
| if (getPartitioningKey().entries() == 0) |
| return (PartitioningFunction*)this; |
| PartitioningFunction* newPartFunc = copy(); // invoke virtual copy constructor |
| newPartFunc->remapIt(this, map, mapItUp); |
| |
| return newPartFunc; |
| } // PartitioningFunction::copyAndRemap() |
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::remapIt() |
| // ----------------------------------------------------------------------- |
// Rewrite this function's partitioning key columns/predicates and
// partition input values as images of those in 'opf' under 'map';
// 'mapItUp' selects the rewrite direction.
void PartitioningFunction::remapIt
 (const PartitioningFunction* opf,
  ValueIdMap& map, NABoolean mapItUp)
{
  // Clear because rewrite insists on it being so.
  partitioningKeyColumns_.clear();
  partitioningKeyPredicates_.clear();
  partitionInputValues_.clear();
  partitionInputValuesLayout_.clear();

  if (mapItUp)
    {
      map.rewriteValueIdSetUp(partitioningKeyColumns_,
                              opf->partitioningKeyColumns_);
      map.rewriteValueIdSetUp(partitioningKeyPredicates_,
                              opf->partitioningKeyPredicates_);
      // Note the pivs don't need mapping, they just need to be copied.
      map.rewriteValueIdSetUp(partitionInputValues_,
                              opf->partitionInputValues_);
      map.rewriteValueIdListUp(partitionInputValuesLayout_,
                               opf->partitionInputValuesLayout_);
    }
  else
    {
      map.rewriteValueIdSetDown(opf->partitioningKeyColumns_,
                                partitioningKeyColumns_);
      map.rewriteValueIdSetDown(opf->partitioningKeyPredicates_,
                                partitioningKeyPredicates_);
      // Note the pivs don't need mapping, they just need to be copied.
      map.rewriteValueIdSetDown(opf->partitionInputValues_,
                                partitionInputValues_);
      map.rewriteValueIdListDown(opf->partitionInputValuesLayout_,
                                 partitionInputValuesLayout_);
    }

  // The partitioning expression (if any) is rewritten in the opposite
  // direction of the key columns above.
  if (partitioningExpression_)
    partitioningExpression_ = partitioningExpression_->
      mapAndRewrite(map,NOT mapItUp).getItemExpr();

} // PartitioningFunction::remapIt()
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::shouldUseSynchronousAccess() |
| // ----------------------------------------------------------------------- |
| NABoolean PartitioningFunction::shouldUseSynchronousAccess( |
| const ReqdPhysicalProperty* rpp, |
| const EstLogPropSharedPtr& inputLogProp, |
| GroupAttributes* ga) const |
| { |
| // This default implementation handles all partitioning functions |
| // except range and single partition partitioning functions. |
| |
| // Synchronous access only makes sense for range partitioned tables. |
| // So, unless the user is trying to force synchronous access |
| // (by disabling asynchronous access), then return FALSE. |
| |
| NABoolean shouldUseSynchronousAccess = FALSE; |
| |
| // NOTE: we cannot force synchronous access for non-range partitioned |
| // tables if there is a required logical order and/or arrangement! |
| // Doing so could lead to incorrect results! So, if the user is trying to |
| // force synchronous access, then they are out of luck! |
| |
| if (NOT rpp->getLogicalOrderOrArrangementFlag()) |
| { |
| const LogicalPartitioningRequirement *lpr = |
| rpp->getLogicalPartRequirement(); |
| |
| // Don't do synchronous access if the user is forcing a PAPA node |
| // and is not forcing the number of PAs |
| if ((lpr != NULL) AND lpr->getMustUsePapa() AND |
| (lpr->getNumClientsReq() == ANY_NUMBER_OF_PARTITIONS)) |
| return FALSE; |
| |
| // Get the value from the defaults table that specifies whether |
| // asynchronous access is ok. If it is not, then we need to force |
| // synchronous access. Also force synchronous access if the user |
| // forced it via C.Q. shape. |
| |
| // See if the user is trying to force synchronous access. |
| if ((CmpCommon::getDefault(ATTEMPT_ASYNCHRONOUS_ACCESS) == DF_OFF) OR |
| ((lpr != NULL) AND |
| (lpr->getNumClientsReq() != ANY_NUMBER_OF_PARTITIONS) AND |
| (lpr->getNumClientsReq() < getCountOfPartitions()))) |
| { |
| shouldUseSynchronousAccess = TRUE; |
| } |
| |
| } |
| |
| return shouldUseSynchronousAccess; |
| } |
| |
| |
| // ----------------------------------------------------------------------- |
| // Virtual functions that must be redefined for derived classes. |
| // ----------------------------------------------------------------------- |
Lng32 PartitioningFunction::getCountOfPartitions() const
{
  // Redefine PartitioningFunction::getCountOfPartitions() -- calling
  // the base class version is illegal.
  CMPABORT;
  return 1; // not reached; satisfies the compiler
}

NABoolean PartitioningFunction::canProducePartitioningKeyPredicates() const
{
  return TRUE; // the most common case is TRUE
}

// The three accessors below are valid only after the partitioning key
// predicates have been created (partKeyPredsCreated_ is TRUE).
const ValueIdSet& PartitioningFunction::getPartitioningKeyPredicates() const
{
  CMPASSERT(partKeyPredsCreated_);
  return partitioningKeyPredicates_;
}

const ValueIdSet& PartitioningFunction::getPartitionInputValues() const
{
  CMPASSERT(partKeyPredsCreated_);
  return partitionInputValues_;
}

const ValueIdList& PartitioningFunction::getPartitionInputValuesLayout() const
{
  CMPASSERT(partKeyPredsCreated_);
  return partitionInputValuesLayout_;
}
| |
void PartitioningFunction::createPartitioningKeyPredicates()
{
  // Redefine PartitioningFunction::createPartitioningKeyPredicates() --
  // calling the base class version is illegal.
  CMPABORT;
}

void PartitioningFunction::replacePivs(
     const ValueIdList& newPivs,
     const ValueIdSet& newPartKeyPreds)
{
  // Redefine PartitioningFunction::replacePivs() -- calling the base
  // class version is illegal.
  CMPABORT;
}

PartitioningFunction*
PartitioningFunction::createPartitioningFunctionForIndexDesc
(IndexDesc *idesc) const
{
  // Redefine
  // PartitioningFunction::createPartitioningFunctionForIndexDesc() --
  // calling the base class version is illegal.
  CMPABORT;
  return NULL; // not reached; satisfies the compiler
}

ItemExpr* PartitioningFunction::createPartitioningExpression()
{
  // Redefine PartitioningFunction::createPartitioningExpression() --
  // calling the base class version is illegal.
  CMPABORT;
  return NULL; // not reached; satisfies the compiler
}

// Default behavior: the partition selection value ids are simply the
// begin and end partition ids, in that order.
void PartitioningFunction::createPartSelectionExprFromSearchKey(
     const ValueId beginPartSelId,
     const ValueId endPartSelId,
     ValueIdList &partSelectionValIds) const
{
  partSelectionValIds.insert(beginPartSelId);
  partSelectionValIds.insert(endPartSelId);
}
| |
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::preCodeGen() |
| // Rewrite the partitioning keys of the partitioning function that |
| // are expressed using VEGReferences in terms of the available values. |
| // ----------------------------------------------------------------------- |
void PartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
  // Rewrite VEGReferences in the key columns and the partitioning
  // expression in terms of the values available at this point in the
  // plan; no external inputs are provided to the rewrite.
  ValueIdSet noExternalInputs;
  partitioningKeyColumns_.replaceVEGExpressions
     (availableValues, noExternalInputs,
      FALSE, NULL, TRUE);
  if (partitioningExpression_)
    {
      partitioningExpression_ = partitioningExpression_->replaceVEGExpressions(
           availableValues,
           noExternalInputs,
           FALSE,
           NULL,
           TRUE); // deep copy because copy constructor made a shallow copy
      // Re-synthesize types/value ids for the rewritten expression tree.
      partitioningExpression_->synthTypeAndValueId(TRUE);
    }

} // PartitioningFunction::preCodeGen()
| |
const NAString PartitioningFunction::getText() const
{
  // Must be redefined by derived classes; the base class version
  // aborts.
  CMPABORT;
  return NAString("some type of partitioning function",
                  CmpCommon::statementHeap()); // not reached
}
| |
| void PartitioningFunction::setupForStatement() |
| { |
| if(setupForStatement_) |
| return; |
| |
| setupForStatement_ = TRUE; |
| resetAfterStatement_ = FALSE; |
| } |
| |
| void PartitioningFunction::resetAfterStatement() |
| { |
| if(resetAfterStatement_) |
| return; |
| |
| partitioningKeyColumns_.clear(); |
| partitioningKeyPredicates_.clear(); |
| partitionInputValues_.clear(); |
| partitionInputValuesLayout_.clear(); |
| partitionSelectionExprInputs_.clear(); |
| partKeyPredsCreated_=FALSE; |
| assignPartition_ = FALSE; |
| partitioningExpression_=NULL; |
| dataConversionErrorFlag_=NULL; |
| resetAfterStatement_ = TRUE; |
| setupForStatement_ = FALSE; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Method for debugging |
| // ----------------------------------------------------------------------- |
| void PartitioningFunction::print(FILE* ofd, const char* indent, |
| const char* title) const |
| { |
| BUMP_INDENT(indent); |
| |
| fprintf(ofd,"%s--Partitioning-Function----------------\n",NEW_INDENT); |
| fprintf(ofd,"%s%s (%d)\n", |
| NEW_INDENT, title, getPartitioningFunctionType()); |
| |
| fprintf(ofd,"%snumber of partitions (%d)\n", |
| NEW_INDENT, getCountOfPartitions()); |
| |
| if (NOT partitioningKeyColumns_.isEmpty()) |
| partitioningKeyColumns_.print(ofd, |
| NEW_INDENT, |
| "Partitioning Key Columns"); |
| if(partKeyPredsCreated()) |
| fprintf(ofd,"%sPartition Key Predicates Created\n",NEW_INDENT); |
| |
| if (NOT partitioningKeyPredicates_.isEmpty()) |
| partitioningKeyPredicates_.print(ofd, |
| NEW_INDENT, |
| "Partitioning Key Predicates"); |
| if (NOT partitionInputValues_.isEmpty()) |
| partitionInputValues_.print(ofd, NEW_INDENT, |
| "Partition Input Values"); |
| if (NOT partitionInputValuesLayout_.isEmpty()) |
| partitionInputValuesLayout_.print(ofd, NEW_INDENT, |
| "Partition Input Values Layout"); |
| |
| if (getPartitioningExpression()) { |
| NAString partExpr("Partitioning Expression\n", CmpCommon::statementHeap()); |
| |
| getPartitioningExpression()-> |
| unparse(partExpr, DEFAULT_PHASE, EXPLAIN_FORMAT); |
| |
| fprintf(ofd,partExpr); |
| } |
| |
| if(assignPartition()) |
| fprintf(ofd,"%sDoes Partition Assignment\n",NEW_INDENT); |
| |
| fprintf(ofd,"%s--Partitioning-Function----------------\n",NEW_INDENT); |
| |
| } // PartitioningFunction::print() |
| |
// Convenience wrapper: print to stdout with the default arguments.
void PartitioningFunction::display() const { print(); }
| |
| // *********************************************************************** |
| // SinglePartitionPartitioningFunction |
| // *********************************************************************** |
| |
// Destructor: nothing beyond the base class to release.
SinglePartitionPartitioningFunction::~SinglePartitionPartitioningFunction() {}

Lng32 SinglePartitionPartitioningFunction::getCountOfPartitions() const
{
  return EXACTLY_ONE_PARTITION;
}

// Build the matching partitioning requirement on the statement heap.
PartitioningRequirement*
SinglePartitionPartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
    RequireExactlyOnePartition(this);
}

// Type-safe downcast: this really is a single partition function.
const SinglePartitionPartitioningFunction*
SinglePartitionPartitioningFunction::castToSinglePartitionPartitioningFunction()
const
{
  return this;
}

// Virtual copy constructor; the copy lives on the statement heap.
PartitioningFunction*
SinglePartitionPartitioningFunction::copy() const
{
  return new (CmpCommon::statementHeap())
    SinglePartitionPartitioningFunction(*this, CmpCommon::statementHeap());
}
| |
| void SinglePartitionPartitioningFunction::createPartitioningKeyPredicates() |
| { |
| // do nothing, there aren't any partitioning key preds for a single |
| // partition |
| storePartitioningKeyPredicates(ValueIdSet()); |
| } |
| |
void SinglePartitionPartitioningFunction::replacePivs(
     const ValueIdList& newPivs,
     const ValueIdSet& newPartKeyPreds)
{
  // do nothing, there aren't any pivs for a single
  // partition
}
| |
| // ----------------------------------------------------------------------- |
| // SinglePartitionPartitioningFunction::createPartitioningExpression() |
| // ----------------------------------------------------------------------- |
| ItemExpr* |
| SinglePartitionPartitioningFunction::createPartitioningExpression() |
| { |
| if (getExpression()) // already constructed? |
| return getExpression(); // reuse it! |
| |
| // --------------------------------------------------------------------- |
| // Construct an expression of the form: ConstValue(0) |
| // Allocate ValueIds and type synthesize this partitioning function. |
| // --------------------------------------------------------------------- |
| ItemExpr * partFunc = new (CmpCommon::statementHeap()) |
| Cast(new (CmpCommon::statementHeap()) SystemLiteral(0), |
| new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE,FALSE)); |
| |
| partFunc->synthTypeAndValueId(); |
| storeExpression(partFunc); |
| return partFunc; |
| } |
| |
| NABoolean SinglePartitionPartitioningFunction::isAGroupingOf( |
| const PartitioningFunction &other, |
| Lng32* maxPartsPerGroup) const |
| { |
| if (maxPartsPerGroup != NULL) |
| *maxPartsPerGroup = other.getCountOfPartitions(); |
| |
| // A single partition is a grouping of about anything. There |
| // is only one exception: the replication partitioning "function". |
| // Combining the partitions of a replication partitioning |
| // function would get us multiple copies of the data which is |
| // different from the single partition. |
| return ((other.castToReplicateViaBroadcastPartitioningFunction() == NULL) AND |
| (other.castToReplicateNoBroadcastPartitioningFunction() == NULL)); |
| |
| } |
| |
| // ----------------------------------------------------------------------- |
| // SinglePartitionPartitioningFunction::shouldUseSynchronousAccess() |
| // ----------------------------------------------------------------------- |
NABoolean SinglePartitionPartitioningFunction::shouldUseSynchronousAccess(
     const ReqdPhysicalProperty* rpp,
     const EstLogPropSharedPtr& inputLogProp,
     GroupAttributes* ga) const
{
  // An unpartitioned table does not need to be accessed synchronously
  // (or asynchronously, for that matter). So, always return FALSE.

  return FALSE;
}
| |
| |
| // ----------------------------------------------------------------------- |
| // Push down checking: testing condition C1.2: all tables are not |
| // partitioned. |
| // ----------------------------------------------------------------------- |
| NABoolean |
| SinglePartitionPartitioningFunction::partFuncAndFuncPushDownCompatible( |
| const PartitioningFunction& other) const |
| { |
| if ( other.castToSinglePartitionPartitioningFunction() == 0 ) |
| return FALSE; |
| |
| return TRUE; |
| } |
| |
// Short human-readable description used in plan output.
const NAString SinglePartitionPartitioningFunction::getText() const
{
  return "exactly 1 partition";
}

void SinglePartitionPartitioningFunction::print(FILE* ofd, const char* indent,
                                                const char* title) const
{
  // Delegate to the base class, substituting the class name for the
  // caller-supplied title.
  PartitioningFunction::print(ofd, indent,
                              "SinglePartitionPartitioningFunction");
}
| |
| // *********************************************************************** |
| // ReplicateViaBroadcastPartitioningFunction |
| // *********************************************************************** |
| |
// Destructor: nothing beyond the base class to release.
ReplicateViaBroadcastPartitioningFunction::
~ReplicateViaBroadcastPartitioningFunction()
{}

// The partition count is the stored number of broadcast targets.
Lng32 ReplicateViaBroadcastPartitioningFunction::getCountOfPartitions() const
{ return numberOfPartitions_; }

// Type-safe downcast: this really is a replicate-via-broadcast function.
const ReplicateViaBroadcastPartitioningFunction*
ReplicateViaBroadcastPartitioningFunction::
castToReplicateViaBroadcastPartitioningFunction() const
{ return this; }
| |
| PartitioningFunction * |
| ReplicateViaBroadcastPartitioningFunction::scaleNumberOfPartitions( |
| Lng32 &suggestedNewNumberOfPartitions, |
| PartitionGroupingDistEnum partGroupDist) |
| { |
| if (suggestedNewNumberOfPartitions == 1) |
| return new(CmpCommon::statementHeap()) |
| SinglePartitionPartitioningFunction(); |
| |
| // If an expression has been generated, then we want to discard it |
| // because it may no longer be correct. |
| storeExpression(NULL); |
| |
| numberOfPartitions_ = suggestedNewNumberOfPartitions; |
| return this; |
| } |
| |
// Build the matching partitioning requirement on the statement heap.
PartitioningRequirement*
ReplicateViaBroadcastPartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
    RequireReplicateViaBroadcast(this);
}

// Virtual copy constructor; the copy lives on the statement heap.
PartitioningFunction*
ReplicateViaBroadcastPartitioningFunction::copy() const
{
  return new (CmpCommon::statementHeap())
    ReplicateViaBroadcastPartitioningFunction(*this);
}
| |
void ReplicateViaBroadcastPartitioningFunction::
createPartitioningKeyPredicates()
{
  // The partitioning key preds of a replication "function" are the
  // empty set: with no restricting predicate, each partition returns
  // all data. Store the empty set so partKeyPredsCreated() holds.
  storePartitioningKeyPredicates(ValueIdSet());
}
| |
void ReplicateViaBroadcastPartitioningFunction::
replacePivs(const ValueIdList& newPivs,
            const ValueIdSet& newPartKeyPreds)
{
  // Intentionally a no-op: the partition input values of a
  // replication "function" are the empty set, so there is nothing
  // to replace.
}
| |
| ItemExpr* |
| ReplicateViaBroadcastPartitioningFunction::createPartitioningExpression() |
| { |
| return NULL; |
| } |
| |
| NABoolean ReplicateViaBroadcastPartitioningFunction::isAGroupingOf( |
| const PartitioningFunction &other, |
| Lng32* maxPartsPerGroup) const |
| { |
| if (maxPartsPerGroup != NULL) |
| *maxPartsPerGroup = 1; |
| |
| // A replication partitioning function is never a grouping of any |
| // other partitioning function than itself. See the definition of a |
| // grouping: it is created by merging two partitions of "other" |
| // zero or more times. |
| return (comparePartFuncToFunc(other) == SAME); |
| } |
| |
| const NAString ReplicateViaBroadcastPartitioningFunction::getText() const |
| { |
| char ntimes[20]; |
| sprintf(ntimes,"%d",numberOfPartitions_); |
| return NAString("broadcast ") + ntimes + " times"; |
| } |
| |
| void ReplicateViaBroadcastPartitioningFunction:: |
| print(FILE* ofd, const char* indent, |
| const char* title) const |
| { |
| PartitioningFunction:: |
| print(ofd, indent, "ReplicateViaBroadcastPartitioningFunction"); |
| } |
| |
| // *********************************************************************** |
| // ReplicateNoBroadcastPartitioningFunction |
| // *********************************************************************** |
| |
| ReplicateNoBroadcastPartitioningFunction:: |
| ~ReplicateNoBroadcastPartitioningFunction() |
| {} |
| |
| Lng32 ReplicateNoBroadcastPartitioningFunction::getCountOfPartitions() const |
| { return numberOfPartitions_; } |
| |
| const ReplicateNoBroadcastPartitioningFunction* |
| ReplicateNoBroadcastPartitioningFunction:: |
| castToReplicateNoBroadcastPartitioningFunction() const |
| { return this; } |
| |
| PartitioningFunction * |
| ReplicateNoBroadcastPartitioningFunction::scaleNumberOfPartitions( |
| Lng32 &suggestedNewNumberOfPartitions, |
| PartitionGroupingDistEnum partGroupDist) |
| { |
| if (suggestedNewNumberOfPartitions == 1) |
| return new(CmpCommon::statementHeap()) |
| SinglePartitionPartitioningFunction(); |
| |
| // If an expression has been generated, then we want to discard it |
| // because it may no longer be correct. |
| storeExpression(NULL); |
| |
| numberOfPartitions_ = suggestedNewNumberOfPartitions; |
| return this; |
| } |
| |
| PartitioningRequirement* |
| ReplicateNoBroadcastPartitioningFunction::makePartitioningRequirement() |
| { |
| return new (CmpCommon::statementHeap()) |
| RequireReplicateNoBroadcast(this); |
| } |
| |
| PartitioningFunction* |
| ReplicateNoBroadcastPartitioningFunction::copy() const |
| { |
| return new (CmpCommon::statementHeap()) |
| ReplicateNoBroadcastPartitioningFunction(*this); |
| } |
| |
void ReplicateNoBroadcastPartitioningFunction::
createPartitioningKeyPredicates()
{
  // The partitioning key preds of a replication "function" are the
  // empty set: with no restricting predicate, each partition returns
  // all data. Store the empty set so partKeyPredsCreated() holds.
  storePartitioningKeyPredicates(ValueIdSet());
}
| |
void ReplicateNoBroadcastPartitioningFunction::
replacePivs(const ValueIdList& newPivs,
            const ValueIdSet& newPartKeyPreds)
{
  // Intentionally a no-op: the partition input values of a
  // replication "function" are the empty set, so there is nothing
  // to replace.
}
| |
| ItemExpr* |
| ReplicateNoBroadcastPartitioningFunction::createPartitioningExpression() |
| { |
| return NULL; |
| } |
| |
| NABoolean ReplicateNoBroadcastPartitioningFunction::isAGroupingOf( |
| const PartitioningFunction &other, |
| Lng32* maxPartsPerGroup) const |
| { |
| if (maxPartsPerGroup != NULL) |
| *maxPartsPerGroup = 1; |
| |
| // A replication partitioning function is never a grouping of any |
| // other partitioning function than itself. See the definition of a |
| // grouping: it is created by merging two partitions of "other" |
| // zero or more times. |
| return (comparePartFuncToFunc(other) == SAME); |
| } |
| |
| const NAString ReplicateNoBroadcastPartitioningFunction::getText() const |
| { |
| char ntimes[20]; |
| sprintf(ntimes,"%d",numberOfPartitions_); |
| return NAString("replicate no broadcast ") + ntimes + " times"; |
| } |
| |
| void ReplicateNoBroadcastPartitioningFunction:: |
| print(FILE* ofd, const char* indent, |
| const char* title) const |
| { |
| PartitioningFunction:: |
| print(ofd, indent, "ReplicateNoBroadcastPartitioningFunction"); |
| } |
| |
| // *********************************************************************** |
| // HashPartitioningFunction |
| // *********************************************************************** |
| |
| HashPartitioningFunction::~HashPartitioningFunction() {} |
| |
| Lng32 HashPartitioningFunction::getCountOfPartitions() const |
| { return numberOfHashPartitions_; } |
| |
| const HashPartitioningFunction* |
| HashPartitioningFunction::castToHashPartitioningFunction() const |
| { return this; } |
| |
| PartitioningRequirement* |
| HashPartitioningFunction::makePartitioningRequirement() |
| { |
| return new (CmpCommon::statementHeap()) |
| RequireHash(this); |
| } |
| |
| PartitioningFunction* |
| HashPartitioningFunction::copy() const |
| { |
| return new (CmpCommon::statementHeap()) |
| HashPartitioningFunction(*this, CmpCommon::statementHeap()); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // PartitioningFunction::createBetweenPartitioningKeyPredicates() |
| // ----------------------------------------------------------------------- |
| void PartitioningFunction::createBetweenPartitioningKeyPredicates( |
| const char * pivLoName, |
| const char * pivHiName, |
| ItemExpr * partNumExpr, |
| NABoolean useHash2Split) |
| { |
| if (NOT partKeyPredsCreated() || partNumExpr != NULL) |
| { |
| ItemExpr * rootPtr; |
| ItemExpr * loPart; |
| ItemExpr * hiPart; |
| ValueIdSet setOfPartKeyPredicates; |
| |
| // by default we use the partitioning function's expression |
| // to compute the partition number |
| if (partNumExpr == NULL) |
| partNumExpr = createPartitioningExpression(); |
| |
| // compute part input values if not already done so |
| if (partitionInputValues_.isEmpty()) |
| { |
| ValueIdList partInputValues; |
| |
| // must specify PIV names if they need to be created |
| CMPASSERT(pivLoName && pivHiName); |
| |
| // the partition input values are two integer values: lo and hi part # |
| loPart = new (CmpCommon::statementHeap()) |
| HostVar(pivLoName, |
| new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE,FALSE), |
| TRUE); |
| hiPart = new (CmpCommon::statementHeap()) |
| HostVar(pivHiName, |
| new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE,FALSE), |
| TRUE); |
| loPart->synthTypeAndValueId(); |
| hiPart->synthTypeAndValueId(); |
| partInputValues.insert(loPart->getValueId()); |
| partInputValues.insert(hiPart->getValueId()); |
| storePartitionInputValues(partInputValues); |
| } |
| else |
| { |
| loPart = getPartitionInputValuesLayout()[0].getItemExpr(); |
| hiPart = getPartitionInputValuesLayout()[1].getItemExpr(); |
| } |
| |
| // ----------------------------------------------------------------- |
| // The partitioning key predicate for a hash partitioning selects |
| // a range of hash partitions with the following predicate: |
| // partNumExpr >= :loPart AND partNumExpr < :hiPart |
| // where the hash function is the partitioning function generated |
| // by the createPartitioningExpression() method and the host variables |
| // are generated here as partition input values. |
| // ----------------------------------------------------------------- |
| |
| if (useHash2Split) |
| { |
| // For a HASH2 function, the PIVs are expressed as min and max |
| // hash values to be retrieved. These need to be converted to |
| // partition numbers when we use them here. Note that we need to |
| // use the original number of HASH2 partitions here. For example, |
| // if we have a table salted with 64 salt buckets and we use |
| // 8 ESPs (8 partitions), the predicate needs to select 8 different |
| // salt values (original partitions) for each ESP. |
| CMPASSERT(isAHash2PartitioningFunction()); |
| UInt32 numOfOrigPartns = castToHash2PartitioningFunction()-> |
| getCountOfOrigHashPartitions(); |
| char numPartsString[30]; |
| |
| snprintf(numPartsString,sizeof(numPartsString),"%d",numOfOrigPartns); |
| NAString numPartsLiteral(numPartsString); |
| ConstValue *numPartns = new (CmpCommon::statementHeap()) |
| ConstValue(new (CmpCommon::statementHeap()) |
| SQLInt(CmpCommon::statementHeap(), FALSE, |
| FALSE), |
| (void *) &numOfOrigPartns, |
| (Lng32) sizeof(numOfOrigPartns), |
| &numPartsLiteral, |
| CmpCommon::statementHeap()); |
| |
| loPart = new (CmpCommon::statementHeap()) |
| Hash2Distrib(loPart, numPartns); |
| hiPart = new (CmpCommon::statementHeap()) |
| Hash2Distrib(hiPart, numPartns); |
| } |
| |
| // lower bound |
| rootPtr = new (CmpCommon::statementHeap()) |
| BiRelat(ITM_GREATER_EQ, |
| partNumExpr, |
| loPart, |
| TRUE); |
| rootPtr->synthTypeAndValueId(); |
| setOfPartKeyPredicates += rootPtr->getValueId(); |
| |
| // upper bound |
| rootPtr = new (CmpCommon::statementHeap()) |
| BiRelat((useHash2Split ? ITM_LESS_EQ : ITM_LESS), |
| partNumExpr, |
| hiPart, |
| TRUE); |
| rootPtr->synthTypeAndValueId(); |
| setOfPartKeyPredicates += rootPtr->getValueId(); |
| |
| // Store the set of key predicates in the partitioning attributes. |
| storePartitioningKeyPredicates(setOfPartKeyPredicates); |
| } |
| } // PartitioningFunction::createBetweenPartitioningKeyPredicates() |
| |
| void HashPartitioningFunction::createPartitioningKeyPredicates() |
| { |
| createBetweenPartitioningKeyPredicates( |
| "_sys_HostVarLoHashPart", |
| "_sys_HostVarHiHashPart"); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // HashPartitioningFunction::replacePivs() |
| // ----------------------------------------------------------------------- |
| void HashPartitioningFunction::replacePivs( |
| const ValueIdList& newPivs, |
| const ValueIdSet& newPartKeyPreds) |
| { |
| // Overwrite the old pivs, part key preds, and part expr. with the new ones. |
| storePartitionInputValues(newPivs); |
| storePartitioningKeyPredicates(newPartKeyPreds); |
| } // HashPartitioningFunction::replacePivs() |
| |
| COMPARE_RESULT |
| HashPartitioningFunction::comparePartFuncToFunc |
| (const PartitioningFunction &other) const |
| { |
| COMPARE_RESULT c = PartitioningFunction::comparePartFuncToFunc(other); |
| |
| if (c != SAME) |
| return INCOMPATIBLE; |
| |
| return comparePartKeyToKey(other); |
| } |
| |
// Compare the partitioning keys of two hash partitioning functions.
// Returns SAME only when both functions have the same type, the same
// key columns in the same order, and matching original key column
// types (a type mismatch would make the hash values differ).
COMPARE_RESULT
HashPartitioningFunction::comparePartKeyToKey
  (const PartitioningFunction &other) const
{
  if (getPartitioningFunctionType() != other.getPartitioningFunctionType())
    return INCOMPATIBLE;

  // safe: same function type was just verified
  const HashPartitioningFunction &oth =
    (const HashPartitioningFunction &) other;

  if (keyColumnList_.entries() != oth.keyColumnList_.entries())
    return INCOMPATIBLE;

  // compare the key columns and their order
  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
    {
      if (keyColumnList_[i] != oth.keyColumnList_[i])
        return INCOMPATIBLE;

      // the ORIGINAL column types must agree as well, since the hash
      // is computed on values cast to the original types
      if ( NOT (originalKeyColumnList_[i].getType() ==
                oth.originalKeyColumnList_[i].getType() )
         )
        return INCOMPATIBLE;
    }

  return SAME;
}
| |
| // ----------------------------------------------------------------------- |
| // HashPartitioningFunction::isAGroupingOf() |
| // ----------------------------------------------------------------------- |
| // Right now we assume that the split function of a hash partitioning |
| // function is such that no two functions are a grouping of each other. |
| // The only exception is identity, of course. We use the base class' |
| // implementation. |
| // |
| // With more knowledge about the split function (e.g. by knowing it's |
| // a simple modulus), one could guarantee that a 4-way hash-partitioning |
| // scheme is actually a grouping of an 8-way scheme. This is not really |
| // necessary and therefore not done here. |
| |
| // ----------------------------------------------------------------------- |
| // HashPartitioningFunction::createPartitioningExpression() |
| // ----------------------------------------------------------------------- |
| ItemExpr* HashPartitioningFunction::createPartitioningExpression() |
| { |
| if (getExpression()) // already constructed? |
| return getExpression(); // reuse it! |
| |
| // --------------------------------------------------------------------- |
| // Construct an expression of the form: |
| // |
| // Modulus |
| // / \ |
| // / \ |
| // Hash <count of partitions> |
| // | |
| // (narrow(key1), narrow(key2), ..., narrow(keyN)) |
| // |
| // Allocate ValueIds and type synthesize this partitioning function. |
| // --------------------------------------------------------------------- |
| ValueIdList typedKeyCols; |
| |
| for (CollIndex i = 0; i < keyColumnList_.entries(); i++) |
| { |
| // cast the key column to the exact type of the original key column |
| const NAType &oType = originalKeyColumnList_[i].getType(); |
| |
| ItemExpr *c = getCastedItemExpre(keyColumnList_[i].getItemExpr(), oType, |
| CmpCommon::statementHeap()); |
| |
| c->synthTypeAndValueId(); |
| typedKeyCols.insert(c->getValueId()); |
| } |
| |
| ItemExpr * partFunc = |
| new (CmpCommon::statementHeap()) |
| Modulus( |
| buildHashingExpressionForExpr( |
| typedKeyCols.rebuildExprTree(ITM_ITEM_LIST) |
| ), |
| // Hash(typedKeyCols.rebuildExprTree(ITM_ITEM_LIST)), |
| new (CmpCommon::statementHeap()) |
| SystemLiteral(getCountOfPartitions())); |
| |
| // once we support late binding, and/or changing the number of target |
| // partitions at run time, the number of partitions will have |
| // to become some sort of an input value $$$$ |
| |
| partFunc->synthTypeAndValueId(); |
| storeExpression(partFunc); |
| return partFunc; |
| } // HashPartitioningFunction::createPartitioningExpression() |
| |
| ItemExpr * |
| HashPartitioningFunction::buildHashingExpressionForExpr(ItemExpr* expr) const |
| { |
| return new (CmpCommon::statementHeap()) Hash(expr); |
| } |
| |
// Compute the hash of a raw buffer by calling the runtime hash
// implementation directly. NULL values and VARCHAR data types are not
// handled by this path.
// NOTE(review): `flags` is accepted but unused here -- presumably kept
// for signature compatibility with sibling implementations; verify.
UInt32 HashPartitioningFunction::computeHashValue(char* data, UInt32 flags, Int32 len)
{
  // Directly call the implementation function to compute the hash. NULL
  // values and VARCHAR data types are not handled.
  return FastHash(data, len);
}
| |
// Return the Hash operator embedded in the cached partitioning
// expression (the child of the Modulus built by
// createPartitioningExpression()), or NULL if no expression has been
// generated yet.
ItemExpr * HashPartitioningFunction::getHashingExpression() const
{
  ItemExpr* hashExpr = NULL;
  ItemExpr* partExpr = getExpression();
  if ( partExpr ) {
    // partExpr is Modulus(Hash(...), numParts); child(0) is the hash
    hashExpr = partExpr->child(0);

    CMPASSERT(hashExpr AND
              hashExpr->getOperatorType()== ITM_HASH);
  }
  return hashExpr;
}
| |
| // ----------------------------------------------------------------------- |
| // HashPartitioningFunction::remapIt() |
| // ----------------------------------------------------------------------- |
| void HashPartitioningFunction::remapIt |
| (const PartitioningFunction* opf, |
| ValueIdMap& map, NABoolean mapItUp) |
| { |
| PartitioningFunction::remapIt(opf, map,mapItUp); |
| |
| // If we have arrived here, the original partitioning function (*opf) |
| // MUST be a HashPartitioningFunction(). |
| CMPASSERT(opf->castToHashPartitioningFunction()); |
| |
| // Clear because rewrite insists on it being so. |
| keyColumnList_.clear(); |
| |
| if (mapItUp) |
| { |
| map.rewriteValueIdListUp( |
| keyColumnList_, |
| opf->castToHashPartitioningFunction()->keyColumnList_); |
| } |
| else |
| { |
| map.rewriteValueIdListDown( |
| opf->castToHashPartitioningFunction()->keyColumnList_, |
| keyColumnList_); |
| } |
| |
| // do NOT map the originalKeyColumnList_, that's why it's called ORIGINAL |
| |
| } // HashPartitioningFunction::remapIt() |
| |
// Scale this hash partitioning function to the suggested number of
// partitions. Returns a (possibly different) partitioning function.
PartitioningFunction * HashPartitioningFunction::scaleNumberOfPartitions(
  Lng32 &suggestedNewNumberOfPartitions,
  PartitionGroupingDistEnum partGroupDist)
{
  // one partition degenerates to the single partition function
  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // If an expression has been generated, then we want to discard it
  // because it may no longer be correct.
  storeExpression(NULL);

  // the node map must match the partition count; build a fresh one
  // when the count actually changes
  if ( suggestedNewNumberOfPartitions != getCountOfPartitions() ) {
    NodeMap* newNodeMap =
      new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions);
    replaceNodeMap(newNodeMap);
  }

  numberOfHashPartitions_ = suggestedNewNumberOfPartitions;
  return this;
}
| |
| // ----------------------------------------------------------------------- |
| // HashPartitioningFunction::preCodeGen() |
| // Rewrite the partitioning keys of the partitioning function that |
| // are expressed using VEGReferences in terms of the available values. |
| // ----------------------------------------------------------------------- |
| void HashPartitioningFunction::preCodeGen(const ValueIdSet& availableValues) |
| { |
| ValueIdSet noExternalInputs; |
| PartitioningFunction::preCodeGen(availableValues); |
| keyColumnList_.replaceVEGExpressions(availableValues, noExternalInputs, |
| FALSE, NULL, TRUE); |
| } // HashPartitioningFunction::preCodeGen() |
| |
| // ----------------------------------------------------------------------- |
| // Method for debugging. |
| // ----------------------------------------------------------------------- |
| const NAString HashPartitioningFunction::getTextImp(const char* hashType) const |
| { |
| char nparts[20]; |
| NAString result(hashType, CmpCommon::statementHeap()); |
| result.append(" partitioned "); |
| |
| sprintf(nparts,"%d",numberOfHashPartitions_); |
| result += nparts; |
| result += " ways on ("; |
| //getPartitioningKey().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT); |
| getKeyColumnList().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT); |
| result += ")"; |
| if (result.contains("randomNum")) |
| result = NAString("round robin partitioned"); |
| return result; |
| } |
| |
| const NAString HashPartitioningFunction::getText() const |
| { |
| return getTextImp("hash"); |
| } |
| |
| void HashPartitioningFunction::print(FILE* ofd, const char* indent, |
| const char* title) const |
| { |
| PartitioningFunction::print(ofd, indent, "HashPartitioningFunction"); |
| } // HashPartitioningFunction::print() |
| |
// Return an expression casting an encoded skew value to oType.
// The encoded skew value is carried as a double; wrap it in a double
// precision constant and cast that constant to the original column
// type, allocating everything on the given heap.
static
ItemExpr*
getCastedSkewValueExpr(const EncodedValue& ev, const NAType& oType, CollHeap* heap)
{
  double x = ev.getDblValue();
  return new (heap) Cast(
            new (heap) ConstValue(
                 new (heap) SQLDoublePrecision(heap, FALSE /* no SQL NULL*/),
                 (char*)&x, sizeof(x)
                                 ),
            &oType
                        );
}
| |
| // *********************************************************************** |
| // TableHashPartitioningFunction |
| // - The externalized Hash Partitioning functions. |
| // *********************************************************************** |
| |
| // ----------------------------------------------------------------------- |
| // TableHashPartitioningFunction Destructor |
| // ----------------------------------------------------------------------- |
| TableHashPartitioningFunction::~TableHashPartitioningFunction() {} |
| |
| void TableHashPartitioningFunction::setupForStatement() |
| { |
| if(setupForStatement_) |
| return; |
| |
| PartitioningFunction::setupForStatement(); |
| |
| setupForStatement_ = TRUE; |
| resetAfterStatement_ = FALSE; |
| } |
| |
// Undo the per-statement state; idempotent (only the first call after
// a statement does any work). Clears the statement-scoped key column
// lists and restores the partition count to the original hash count.
void TableHashPartitioningFunction::resetAfterStatement()
{
  if(resetAfterStatement_)
    return;

  PartitioningFunction::resetAfterStatement();
  keyColumnList_.clear();
  originalKeyColumnList_.clear();
  // revert any scaling done during the statement
  numberOfPartitions_ = numberOfOrigHashPartitions_;

  setupForStatement_ = FALSE;
  resetAfterStatement_ = TRUE;
}
| |
| // ----------------------------------------------------------------------- |
| // TableHashPartitioningFunction Safe down cast. |
| // ----------------------------------------------------------------------- |
| const TableHashPartitioningFunction* |
| TableHashPartitioningFunction::castToTableHashPartitioningFunction() const |
| { |
| return this; |
| } |
| |
| Lng32 |
| TableHashPartitioningFunction::getCountOfPartitions() const |
| { |
| return numberOfPartitions_; |
| } |
| |
| PartitioningRequirement* |
| TableHashPartitioningFunction::makePartitioningRequirement() |
| { |
| // Redefine PartitioningFunction::makePartitioningRequirement() |
| CMPABORT; |
| return NULL; |
| } |
| |
| |
| // ----------------------------------------------------------------------- |
| // ----------------------------------------------------------------------- |
| void |
| TableHashPartitioningFunction::createPartitionSelectionExprInputs() |
| { |
| if (partitionSelectionExprInputs().entries() != 0) |
| return; |
| |
| CollHeap *heap = CmpCommon::statementHeap(); |
| |
| // Use a host var to provide access to numParts, this will be |
| // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen() |
| // |
| char hvFabricatedName[50]; |
| sprintf(hvFabricatedName, "_sys_hostVarNumParts_%p", this); |
| ItemExpr *numParts = new (heap) |
| HostVar(hvFabricatedName, |
| // int not null |
| new (heap) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE), |
| // is system-supplied |
| TRUE); |
| numParts->synthTypeAndValueId(); |
| |
| // Use a host var to provide access to partNum, this will be |
| // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen() |
| // |
| sprintf(hvFabricatedName, "_sys_hostVarPartNo_%p", this); |
| ItemExpr *partNum = new (heap) |
| HostVar(hvFabricatedName, |
| // int not null |
| new (heap) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE), |
| // is system-supplied |
| TRUE); |
| partNum->synthTypeAndValueId(); |
| |
| // Record these hostvars as the inputs to the partitionSelectionExpr. |
| // |
| partitionSelectionExprInputs().insert(partNum->getValueId()); |
| partitionSelectionExprInputs().insert(numParts->getValueId()); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // TableHashPartitioningFunction::normalizePartitioningKeys() |
| // Rewrite the partitioning keys of the partitioning function in |
| // terms of the VEGReference for the VEG to which the partitioning |
| // key column belongs. |
| // ----------------------------------------------------------------------- |
| void |
| TableHashPartitioningFunction::normalizePartitioningKeys(NormWA& normWARef) |
| { |
| PartitioningFunction::normalizePartitioningKeys(normWARef); |
| keyColumnList_.normalizeNode(normWARef); |
| |
| // don't normalize original key col list, avoid VEGies which could |
| // cause data type changes. |
| } |
| |
| // ----------------------------------------------------------------------- |
| // TableHashPartitioningFunction::createPartitioningKeyPredicates() |
| // Since hash dist partitioning can not create any Key Predicates, |
| // this method simply creates the partition input values (PIVs). |
| // ----------------------------------------------------------------------- |
| void TableHashPartitioningFunction::createPartitioningKeyPredicates() |
| { |
| if (NOT partKeyPredsCreated()) |
| createBetweenPartitioningKeyPredicates("_sys_HostVarLoHashPart", |
| "_sys_HostVarHiHashPart"); |
| |
| // Create the partition selection input values (needed by hash2). |
| createPartitionSelectionExprInputs(); |
| } // TableHashPartitioningFunction::createPartitioningKeyPredicates() |
| |
| // ----------------------------------------------------------------------- |
| // For salted tables, since we store the hash partition value, we |
| // can generate a range predicate for the _SALT_ column |
| // ----------------------------------------------------------------------- |
| void TableHashPartitioningFunction::createPartitioningKeyPredicatesForSaltedTable( |
| ValueId saltCol) |
| { |
| // For now we only allow this call after calling the regular |
| // createPartitioningKeyPredicates() method |
| CMPASSERT(partKeyPredsCreated()); |
| |
| // this allows us to specify NULL for the names here |
| createBetweenPartitioningKeyPredicates(NULL, |
| NULL, |
| saltCol.getItemExpr(), |
| isAHash2PartitioningFunction()); |
| |
| // and we don't need to call this |
| // createPartitionSelectionExprInputs(); |
| } // TableHashPartitioningFunction::createPartitioningKeyPredicates() |
| |
| // ----------------------------------------------------------------------- |
| // TableHashPartitioningFunction::replacePivs() |
| // ----------------------------------------------------------------------- |
| void TableHashPartitioningFunction::replacePivs( |
| const ValueIdList& newPivs, |
| const ValueIdSet& newPartKeyPreds) |
| { |
| // Overwrite the old pivs, part key preds, and part expr. with the new ones. |
| storePartitionInputValues(newPivs); |
| storePartitioningKeyPredicates(newPartKeyPreds); |
| } // TableHashPartitioningFunction::replacePivs() |
| |
| PartitioningFunction* |
| TableHashPartitioningFunction::createPartitioningFunctionForIndexDesc |
| (IndexDesc *idesc) const |
| { |
| // Redefine PartitioningFunction::createPartitioningFunctionForIndexDesc() |
| CMPABORT; |
| return NULL; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // TableHashPartitioningFunction::remapIt() |
| // ----------------------------------------------------------------------- |
| void |
| TableHashPartitioningFunction::remapIt(const PartitioningFunction* opf, |
| ValueIdMap& map, |
| NABoolean mapItUp) |
| { |
| PartitioningFunction::remapIt(opf, map,mapItUp); |
| |
| // If we have arrived here, the original partitioning function (*opf) |
| // MUST be a TableHashPartitioningFunction(). |
| // |
| const TableHashPartitioningFunction *oth = |
| opf->castToTableHashPartitioningFunction(); |
| |
| CMPASSERT(oth); |
| |
| // Clear because rewrite insists on it being so. |
| // |
| keyColumnList_.clear(); |
| |
| if (mapItUp) |
| map.rewriteValueIdListUp(keyColumnList_, oth->keyColumnList_); |
| else |
| map.rewriteValueIdListDown(oth->keyColumnList_, keyColumnList_); |
| |
| // do NOT map the originalKeyColumnList_, that's why it's called ORIGINAL |
| |
| } // TableHashPartitioningFunction::remapIt() |
| |
| // ----------------------------------------------------------------------- |
| // TableHashPartitioningFunction::preCodeGen() |
| // Rewrite the partitioning keys of the partitioning function that |
| // are expressed using VEGReferences in terms of the available values. |
| // ----------------------------------------------------------------------- |
| void |
| TableHashPartitioningFunction::preCodeGen(const ValueIdSet& availableValues) |
| { |
| ValueIdSet noExternalInputs; |
| |
| PartitioningFunction::preCodeGen(availableValues); |
| |
| keyColumnList_.replaceVEGExpressions(availableValues, noExternalInputs, |
| FALSE, NULL, TRUE); |
| |
| } // TableHashPartitioningFunction::preCodeGen() |
| |
| // ----------------------------------------------------------------------- |
| // TableHashPartitioningFunction::createPartitioningExpression() |
| // ----------------------------------------------------------------------- |
| ItemExpr* TableHashPartitioningFunction::createPartitioningExpression() |
| { |
| return TableHashPartitioningFunction::createPartitioningExpressionImp(FALSE); |
| } // TableHashPartitioningFunction::createPartitioningExpression() |
| |
// Build (and cache) the partitioning expression over the key columns.
// Each key column is wrapped in a Narrow to the original column type
// (optionally converted to an equivalent VARCHAR type when
// doVarCharCast is set, so trailing spaces are trimmed before
// hashing), then handed to buildPartitioningExpression().
ItemExpr*
TableHashPartitioningFunction::createPartitioningExpressionImp(NABoolean doVarCharCast)
{
  // already constructed with the same doVarCharCast argument?
  if (getExpression() && doVarCharCast == doVarCharCast_ )
    return getExpression(); // reuse it!

  doVarCharCast_ = doVarCharCast;

  ValueIdList typedKeyCols;

  const ValueIdList &keyColumnList = getKeyColumnList();
  const ValueIdList &originalKeyColumnList = getOriginalKeyColumnList();

  CollHeap *heap = CmpCommon::statementHeap();

  for (CollIndex i = 0; i < keyColumnList.entries(); i++)
    {
      const NAType &oType = originalKeyColumnList[i].getType();
      ItemExpr *c;
      // shared host variable recording conversion errors from all
      // Narrow operators generated below; created lazily once
      ItemExpr *dataConversionErrorFlag = getConvErrorExpr();
      if (dataConversionErrorFlag == 0)
        {
          dataConversionErrorFlag =
            new (heap) HostVar("_sys_repartConvErrorFlg",
                               new (heap) SQLInt(heap, TRUE,FALSE),
                               TRUE);
          storeConvErrorExpr(dataConversionErrorFlag);
        }

      // Note that we always generate a Narrow operator here, even
      // if it's not necessary. Because keyColumnList[i] may be a
      // VEGReference it may not be possible to determine its
      // exact type at this time. We could eliminate some cases of
      // Narrow in the preCodeGen phase but this is not done yet.

      NAType* finalTypePtr = oType.newCopy(heap);

      if ( oType.getTypeQualifier() == NA_CHARACTER_TYPE &&
           doVarCharCast == TRUE ) {

        ValueId vId = originalKeyColumnList[i];
        ValueIdSet vidSet;

        CharType *maxCharType = NULL;

        switch (vId.getItemExpr()->getOperatorType()) {
          case ITM_VEG_PREDICATE:
          case ITM_VEG_REFERENCE:
           {
             // Pick the widest base-column char type in the VEG so
             // that no member value is truncated by the varchar cast.
             vId.getItemExpr()->findAll(ITM_BASECOLUMN, vidSet, TRUE, TRUE);
             for (ValueId x=vidSet.init(); vidSet.next(x); vidSet.advance(x))
              {
                const CharType &ctype = (CharType&)(x.getType());

                if ( maxCharType == NULL OR
                     maxCharType->getNominalSize() < ctype.getNominalSize() OR
                     (maxCharType->getNominalSize() == ctype.getNominalSize() AND
                      maxCharType->getStrCharLimit() < ctype.getStrCharLimit())
                   )
                  maxCharType = &(CharType&)x.getType() ;
              }
             // NOTE(review): if the VEG contained no base columns,
             // maxCharType stays NULL and the next line dereferences
             // it -- presumably a VEG always has at least one base
             // column here; verify.
             finalTypePtr = maxCharType->equivalentVarCharType(heap);
           }

           break;

          case ITM_INDEXCOLUMN:
          case ITM_BASECOLUMN:
            // For skewbuster+OCR nested join, the partition key column
            // is not a VEG or VEGref since the equality predicates have
            // been pushed down to the inner table. Here we cast the
            // column type to equivalent VarCHAR, regardless of the length
            // of the other column in the join predicate. The optimization
            // works fine because we just want to trim the trailing
            // spaces and then hash the trimmed value.
            finalTypePtr = ((CharType&)(vId.getType())).equivalentVarCharType(heap);
            break;

          default:
            break;
        }
      }

      // leave the statement heap to delete the SQLVarChar object, if created
      c = new (heap) Narrow(keyColumnList[i].getItemExpr(),
                            dataConversionErrorFlag,
                            finalTypePtr,
                            ITM_NARROW,
                            FALSE /* reverseDataErrorConversionFlag */,
                            TRUE /* match child nullability */);

      c->synthTypeAndValueId();

      typedKeyCols.insert(c->getValueId());
    }

  ItemExpr * partFunc = buildPartitioningExpression(typedKeyCols);

#if 0
  //===================================================================
  // Test the classification expression
  //===================================================================
  ItemExpr* isSkewExpr = createClassificationExpressionForSkewedValues();

  if ( isSkewExpr ) {
    ValueIdList exprs;
    exprs.insert(isSkewExpr->getValueId());

    char resultBuf[2000];
    ex_expr::exp_return_type evalReturnCode = exprs.evalAtCompileTime
      (0, ExpTupleDesc::SQLARK_EXPLODED_FORMAT, resultBuf, 2000);
  }
#endif

  // once we support late binding, and/or changing the number of target
  // partitions at run time, the number of partitions will have
  // to become some sort of an input value $$$$
  partFunc->synthTypeAndValueId();
  storeExpression(partFunc);

#if 0 // test
  buildHashListForSkewedValues();
#endif

  return partFunc;


} // TableHashPartitioningFunction::createPartitioningExpressionImp()
| |
| // ----------------------------------------------------------------------- |
| // TableHashPartitioningFunction::createPartitionSelectionExpr() |
| // Create the partition selection expression. 'Partition selection' |
| // means that an expression is used to determine the partition to |
| // access as opposed to using the File System to determine the range |
| // of partitions to access based on a set of partitioning key |
| // predicates. Partition selection is currently used for Hash Dist and |
| // Round Robin Partitioning. And the File System is used for Range |
| // Partitioning. If a partitioning selection expression is created, |
| // it is cached in the data member 'partitionSelectionExpr_' and the |
| // partition selection inputs are generated and stored in |
| // 'partitionSelectionExprInputs_'. This method is redefined for |
| // TableHashPartitioningFunction and RoundRobinPartitioningFunction. |
| // ----------------------------------------------------------------------- |
| ItemExpr * |
| TableHashPartitioningFunction:: |
| createPartitionSelectionExpr(const SearchKey *partSearchKey, |
| const ValueIdSet &availableValues) |
| { |
| |
| // If it has already been created, return cached version. |
| // |
| if(partitionSelectionExpr()) |
| return partitionSelectionExpr(); |
| |
| // For now, only support a partition selection expression when the |
| // partition search key is unique (identifies exactly one partition). |
| // |
| if(!partSearchKey->isUnique()) { |
| return NULL; |
| } else { |
| |
| CollHeap *heap = CmpCommon::statementHeap(); |
| const ValueIdList &keyColumns = partSearchKey->getKeyColumns(); |
| const ValueIdList &keyValues = partSearchKey->getBeginKeyValues(); |
| ValueIdList newValues; |
| |
| createPartitionSelectionExprInputs(); |
| const ValueIdList &partSelExprInputs = partitionSelectionExprInputs(); |
| ItemExpr *numParts = partSelExprInputs[1].getItemExpr(); |
| |
| // Construct the list of key values. Make sure that they are cast |
| // to the exact type of the original key columns since the Hash |
| // function is sensitive to the type of its inputs. Must use the |
| // original key columns, since the keyColumnList may have been |
| // changed. |
| // |
| ValueIdList typedKeyCols; |
| const ValueIdList &originalKeyColumnList = getOriginalKeyColumnList(); |
| |
| for (CollIndex i = 0; i < keyColumns.entries(); i++) |
| { |
| // cast the key value to the exact type of the original key column |
| const NAType &oType = originalKeyColumnList[i].getType(); |
| |
| ItemExpr *c; |
| ItemExpr *dataConversionErrorFlag = getConvErrorExpr(); |
| if (dataConversionErrorFlag == 0) |
| { |
| dataConversionErrorFlag = |
| new (heap) HostVar("_sys_repartConvErrorFlg", |
| new (heap) SQLInt(heap, TRUE,FALSE), |
| TRUE); |
| storeConvErrorExpr(dataConversionErrorFlag); |
| } |
| |
| // Note that we always generate a Narrow operator here, even |
| // if it's not necessary. Because keyColumnList[i] may be a |
| // VEGReference it may not be possible to determine its |
| // exact type at this time. We could eliminate some cases of |
| // Narrow in the preCodeGen phase but this is not done yet. |
| // |
| NAType * finalTypePtr = oType.newCopy(heap); |
| c = new (heap) Narrow(keyValues[i].getItemExpr(), |
| dataConversionErrorFlag, |
| finalTypePtr); |
| c->synthTypeAndValueId(); |
| typedKeyCols.insert(c->getValueId()); |
| } |
| |
| // Construct and cache the partitionSelectionExpr for Table Hash. |
| // |
| partitionSelectionExpr() = |
| buildPartitioningSelectionExpr(typedKeyCols, numParts); |
| |
| // Bind the expression |
| // |
| partitionSelectionExpr()->synthTypeAndValueId(); |
| |
| // PreCodeGen the expression (This maybe should go in |
| // TableHashPartitioningFunction::preCodeGen(), but preCodeGen() is |
| // typically called before this expression is generated. |
| // |
| partitionSelectionExpr()->replaceVEGExpressions(availableValues, |
| availableValues); |
| |
| return partitionSelectionExpr(); |
| } |
| } // TableHashPartitioningFunction::createPartitionSelectionExpr() |
| |
| // ----------------------------------------------------------------------- |
| // Make a new partSearchKey that indicates that PA_PARTITION_GROUPING |
| // is being done. Note that a search key can not be generated which |
| // can group hashed partitions. For TableHashPartitioning, a flag in |
| // the search key is used to indicate that PA_PARTITION_GROUPING is |
| // being done and the begin/end key values of the search key are set |
| // to the partition input values of the partitioning function. |
| // ----------------------------------------------------------------------- |
| SearchKey * |
| TableHashPartitioningFunction::createSearchKey(const IndexDesc *indexDesc, |
| ValueIdSet availInputs, |
| ValueIdSet additionalPreds) const |
| { |
| ValueIdSet preds(getPartitioningKeyPredicates()); |
| ValueIdSet nonKeyColumnSet; // empty set |
| SearchKey *partSearchKey = NULL; |
| |
| availInputs += getPartitionInputValues(); |
| preds += additionalPreds; |
| |
| if (indexDesc->getPrimaryTableDesc()->getNATable()->isHbaseTable()) |
| { |
| // The HbaseAccess executor operator doesn't have a separate |
| // place to handle partitioning key predicates. Instead, we can |
| // only use them as regular key or executor predicates. |
| // For salted tables, we have a chance to read a range of salt |
| // values through a begin/end key. |
| partSearchKey = new (CmpCommon::statementHeap()) |
| SearchKey(indexDesc->getIndexKey(), |
| indexDesc->getOrderOfKeyValues(), |
| availInputs, |
| TRUE, |
| preds, |
| nonKeyColumnSet, |
| indexDesc); |
| } |
| else |
| { |
| // Call this special constructor that constructs a search key for a |
| // TableHashPartitioningFunction. |
| // |
| partSearchKey = new (CmpCommon::statementHeap()) |
| SearchKey(indexDesc->getPartitioningKey(), |
| indexDesc->getOrderOfPartitioningKeyValues(), |
| availInputs, |
| preds, |
| this, |
| nonKeyColumnSet, |
| indexDesc); |
| } |
| |
| return partSearchKey; |
| } // TableHashPartitioningFunction::createSearchKey() |
| |
| |
| ItemExpr * |
| TableHashPartitioningFunction::buildHashingExpressionForExpr(ItemExpr* expr) |
| const |
| { |
| CollHeap *heap = CmpCommon::statementHeap(); |
| return new (heap) HashDistPartHash(expr); |
| } |
| |
// Compute the hash value of a raw key buffer by calling the executor
// hash routine directly. Note: NULL values and VARCHAR data types are
// not handled by this direct path.
UInt32 TableHashPartitioningFunction::computeHashValue(char* data, UInt32 flags, Int32 len)
{
  // Directly call the implementation function to compute the hash. NULL
  // values and VARCHAR data types are not handled.
  return ExHDPHash::hash(data, flags, len);
}
| |
| ItemExpr * TableHashPartitioningFunction::getHashingExpression() const |
| { |
| ItemExpr* hashExpr = NULL; |
| ItemExpr* partExpr = getExpression(); |
| if ( partExpr ) { |
| hashExpr = partExpr->child(0); |
| |
| CMPASSERT(hashExpr); |
| |
| if ( hashExpr->getOperatorType() == ITM_PROGDISTRIB ) { |
| hashExpr = hashExpr->child(0); |
| CMPASSERT(hashExpr); |
| } |
| |
| CMPASSERT ( hashExpr->getOperatorType() == ITM_HDPHASH); |
| } |
| return hashExpr; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // HashDistPartitioningFunction Destructor |
| // ----------------------------------------------------------------------- |
| HashDistPartitioningFunction::~HashDistPartitioningFunction() {} |
| |
| // ----------------------------------------------------------------------- |
| // HashDistPartitioningFunction Safe down cast. |
| // ----------------------------------------------------------------------- |
const HashDistPartitioningFunction*
HashDistPartitioningFunction::castToHashDistPartitioningFunction() const
{
  // Overrides the base class version; this object really is a
  // HashDistPartitioningFunction, so the cast succeeds.
  return this;
}
| |
PartitioningRequirement*
HashDistPartitioningFunction::makePartitioningRequirement()
{
  // Wrap this function in a matching hash-dist requirement,
  // allocated on the statement heap.
  return new (CmpCommon::statementHeap())
    RequireHashDist(this);
}
| |
PartitioningFunction*
HashDistPartitioningFunction::copy() const
{
  // Copy-construct a clone on the statement heap.
  return new (CmpCommon::statementHeap()) HashDistPartitioningFunction(*this);
}
| |
| // ----------------------------------------------------------------------- |
| // Method for debugging. |
| // ----------------------------------------------------------------------- |
| const NAString HashDistPartitioningFunction::getText() const |
| { |
| NAString result("hash1 partitioned "); |
| |
| char nparts[40]; |
| sprintf(nparts,"%d (%d) ways on (", numberOfPartitions_, |
| numberOfOrigHashPartitions_); |
| result += nparts; |
| |
| getKeyColumnList().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT); |
| result += ")"; |
| |
| return result; |
| } |
| |
void HashDistPartitioningFunction::print(FILE* ofd, const char* indent,
                                         const char* title) const
{
  // Delegate to the base class, forcing the title to this class name.
  PartitioningFunction::print(ofd, indent, "HashDistPartitioningFunction");
} // HashDistPartitioningFunction::print()
| |
| // ----------------------------------------------------------------------- |
| // HashDistPartitioningFunction::createPartitioningFunctionForIndexDesc() |
| // ----------------------------------------------------------------------- |
| PartitioningFunction* |
| HashDistPartitioningFunction:: |
| createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const |
| { |
| const NAFileSet * fileSet = idesc->getNAFileSet(); |
| const NAColumnArray & allColumns = fileSet->getAllColumns(); |
| const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns(); |
| |
| CollIndex ixColNumber; |
| ValueId keyValueId; |
| ValueIdSet partitioningKey; |
| ValueIdList partitioningKeyList; |
| |
| for (CollIndex i = 0; i < partKeyColumns.entries(); i++) |
| { |
| // which column of the index is this (usually this will be == i) |
| ixColNumber = allColumns.index(partKeyColumns[i]); |
| |
| // insert the value id of the index column into the partitioning |
| // key column value id set |
| keyValueId = idesc->getIndexColumns()[ixColNumber]; |
| partitioningKey += keyValueId; |
| partitioningKeyList.insertAt(i,keyValueId); |
| } // end loop over partitioning key columns |
| |
| // ----------------------------------------------------------------- |
| // Allocate a new HashPartitioningFunction. |
| // ----------------------------------------------------------------- |
| HashDistPartitioningFunction *partFunc = new(idesc->wHeap()) |
| HashDistPartitioningFunction (partitioningKey, |
| partitioningKeyList, |
| getCountOfPartitions(), |
| getNodeMap()->copy(idesc->wHeap())); |
| |
| // ----------------------------------------------------------------- |
| // Construct the partitioning key predicates. |
| // ----------------------------------------------------------------- |
| partFunc->createPartitioningKeyPredicates(); |
| |
| return partFunc; |
| |
| } // HashDistPartitioningFunction::createPartitioningFunctionForIndexDesc() |
| |
| |
| // ----------------------------------------------------------------------- |
| // HashDistPartitioningFunction::comparePartFuncToFunc(): Compare this |
| // partitioning function to another hash dist function. To be 'SAME' |
| // must have the same number and order of partitioning key columns and |
| // have the same number of partitions (scaled and original). |
| // ----------------------------------------------------------------------- |
| COMPARE_RESULT |
| HashDistPartitioningFunction:: |
| comparePartFuncToFunc(const PartitioningFunction &other) const |
| { |
| COMPARE_RESULT c = PartitioningFunction::comparePartFuncToFunc(other); |
| |
| if (c != SAME) |
| return INCOMPATIBLE; |
| |
| const HashDistPartitioningFunction *oth = |
| other.castToHashDistPartitioningFunction(); |
| |
| // Since they compared 'SAME', oth should always exist, so this |
| // test is redundant. |
| // |
| if(!oth) |
| return INCOMPATIBLE; |
| |
| // They must be based on the same physical partitioning. |
| // |
| if (getCountOfOrigHashPartitions() != oth->getCountOfOrigHashPartitions()) |
| return INCOMPATIBLE; |
| |
| // Make sure that the keys are in the same order. |
| // |
| if (keyColumnList_.entries() != oth->keyColumnList_.entries()) |
| return INCOMPATIBLE; |
| |
| for (CollIndex i = 0; i < keyColumnList_.entries(); i++) |
| { |
| if (keyColumnList_[i] != oth->keyColumnList_[i]) |
| return INCOMPATIBLE; |
| |
| if ( NOT (originalKeyColumnList_[i].getType() == |
| oth->originalKeyColumnList_[i].getType() ) |
| ) |
| return INCOMPATIBLE; |
| } |
| |
| return SAME; |
| } // HashDistPartitioningFunction::comparePartFuncToFunc() |
| |
| |
| PartitioningFunction * |
| HashDistPartitioningFunction:: |
| scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions, |
| PartitionGroupingDistEnum partGroupDist) |
| { |
| if (suggestedNewNumberOfPartitions == 1) |
| return new(CmpCommon::statementHeap()) |
| SinglePartitionPartitioningFunction(); |
| |
| // If an expression has been generated, then we want to discard it |
| // because it may no longer be correct. |
| storeExpression(NULL); |
| |
| // Allow arbitrary scaling down of HashDistPartitioningFunction. |
| // (The runtime will handle the mapping of physical partitions |
| // to logical partitions.) |
| // Scaling up (to a greater number of partitions is not allowed for |
| // HashDistPartitioningFunction. |
| // |
| suggestedNewNumberOfPartitions = |
| (suggestedNewNumberOfPartitions > numberOfPartitions_) |
| ? numberOfPartitions_ |
| : suggestedNewNumberOfPartitions; |
| |
| numberOfPartitions_ = suggestedNewNumberOfPartitions; |
| |
| return this; |
| } // HashDistPartitioningFunction::scaleNumberOfPartitions() |
| |
| // ----------------------------------------------------------------------- |
| // HashDistPartitioningFunction::isAGroupingOf() |
| // ----------------------------------------------------------------------- |
NABoolean
HashDistPartitioningFunction::isAGroupingOf(const PartitioningFunction &other,
                                            Lng32* maxPartsPerGroup) const
{
  // Default: one partition per group unless a grouping is established.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  const HashDistPartitioningFunction *oth =
    other.castToHashDistPartitioningFunction();

  // If other is not a HashDistPartitioningFunction, then it cannot
  // be a grouping of...
  if (oth == NULL)
    return FALSE;

  if (keyColumnList_.entries() != oth->keyColumnList_.entries())
    return FALSE;

  // compare the key columns and their order
  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
    {
      if (keyColumnList_[i] != oth->keyColumnList_[i])
        return FALSE;
      if ( NOT (originalKeyColumnList_[i].getType() ==
                oth->originalKeyColumnList_[i].getType() )
         )
        return FALSE;
    }

  // If this function has more partitions than other,
  // then it cannot be a grouping of.
  // Eg. this.numParts: 10 this.origNumParts: 20
  //      oth.numParts:  5  oth.origNumParts: 20
  //
  // If the two functions are not based on the same physical function,
  // then it cannot be a grouping of.
  // Eg. this.numParts: 10 this.origNumParts: 20
  //      oth.numParts: 10  oth.origNumParts: 30
  //
  if((getCountOfPartitions() > oth->getCountOfPartitions()) ||
     (getCountOfOrigHashPartitions() != oth->getCountOfOrigHashPartitions())) {
    return FALSE;
  }

  // Here the following is known to be TRUE:
  //
  //   (getCountOfPartitions() <= oth->getCountOfPartitions()
  //
  // AND
  //
  //   (getCountOfOrigHashPartitions() == oth->getCountOfOrigHashPartitions())
  //
  // Eg. this.numParts: 10 this.origNumParts: 20
  //      oth.numParts: 10  oth.origNumParts: 20
  //
  //     this.numParts:  5 this.origNumParts: 20
  //      oth.numParts: 10  oth.origNumParts: 20

  // If other has not been scaled (allow arbitrary scaling of one function):
  // Eg. this.numParts:  7 this.origNumParts: 20
  //      oth.numParts: 20  oth.origNumParts: 20
  // OR
  // If they have both been scaled to the same number of partitions:
  // then it is a grouping of.
  // Eg. this.numParts:  7 this.origNumParts: 20
  //      oth.numParts:  7  oth.origNumParts: 20
  //
  if((oth->getCountOfPartitions() == oth->getCountOfOrigHashPartitions()) ||
     (getCountOfPartitions() == oth->getCountOfPartitions())) {

    // Group size: other's partitions distributed over this function's
    // partitions, rounded up.
    if (maxPartsPerGroup != NULL)
      *maxPartsPerGroup =
        ((oth->getCountOfPartitions() + getCountOfPartitions() - 1)
         / getCountOfPartitions());

    return TRUE;
  }

  // WARNING..... I am not sure if the current code can ever produce
  // a situation that would bring control to here.  Also, I am not
  // sure if the semantics of GROUPING implemented below are correct
  // for these situations.
  //
  // Here the following is known to be TRUE:
  //
  //   both functions have been scaled. (I DON'T THINK THIS CAN HAPPEN)
  //
  // AND
  //
  //   They are scaled to different sizes.
  //
  // Eg. this.numParts:  5 this.origNumParts: 20
  //      oth.numParts: 10  oth.origNumParts: 20
  //
  //     this.numParts:  7 this.origNumParts: 20
  //      oth.numParts: 10  oth.origNumParts: 20
  //

  // Under these conditions, three things must be true for it to be a
  // grouping of:
  //
  //  - the scaled number of partitions must evenly divide the scaled
  //    number of partitions of other
  //      Eg. this.numParts:  5 this.origNumParts: 20
  //           oth.numParts: 10  oth.origNumParts: 20
  //
  //  - the other scaling must be a multiple of or evenly divide the
  //    original number of partitions
  //      Eg. this.numParts:  5 this.origNumParts: 20
  //           oth.numParts: 10  oth.origNumParts: 20
  //
  //          this.numParts:  5 this.origNumParts: 20
  //           oth.numParts: 40  oth.origNumParts: 20
  //
  //  - this scaling must also be a multiple of or evenly divide the
  //    original number of partitions
  //      Eg. this.numParts:  5 this.origNumParts: 20
  //           oth.numParts: 10  oth.origNumParts: 20
  //
  //          this.numParts: 40 this.origNumParts: 20
  //           oth.numParts: 10  oth.origNumParts: 20
  //

  // If the scaled number of partitions evenly divides the scaled
  // number of partitions of other...
  //
  if((oth->getCountOfPartitions() % getCountOfPartitions()) != 0) {
    return FALSE;
  }

  // AND the other scaling is a multiple of or evenly divides the
  // original number of partitions...
  //
  if(oth->getCountOfOrigHashPartitions() >= oth->getCountOfPartitions()) {
    if(oth->getCountOfOrigHashPartitions() % oth->getCountOfPartitions()) {
      return FALSE;
    }
  } else {
    if(oth->getCountOfPartitions() % oth->getCountOfOrigHashPartitions()) {
      return FALSE;
    }
  }

  // AND this scaling is a multiple of or evenly divides the original
  // number of partitions ...
  //
  if(getCountOfOrigHashPartitions() >= getCountOfPartitions()) {
    if(getCountOfOrigHashPartitions() % getCountOfPartitions()) {
      return FALSE;
    }
  } else {
    if(getCountOfPartitions() % getCountOfOrigHashPartitions()) {
      return FALSE;
    }
  }

  // Group size: other's partitions per one of this function's
  // partitions, rounded up.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup =
      ((oth->getCountOfPartitions() + getCountOfPartitions() - 1)
       / getCountOfPartitions());

  // THEN it is a grouping of...
  //
  return TRUE;

} // HashDistPartitioningFunction::isAGroupingOf()
| |
| // ----------------------------------------------------------------------- |
| // - Create expressions for the constant values for the original and |
| // scale number of partitions. Create the hash1 (hash distrib) |
| // function, PAGroup, and return the result. |
| // ----------------------------------------------------------------------- |
| ItemExpr * |
| HashDistPartitioningFunction::buildPartitioningExpression( |
| const ValueIdList &keyCols) const |
| { |
| CollHeap *heap = CmpCommon::statementHeap(); |
| |
| NAType *numPartsType = new (heap) SQLInt(heap, FALSE,FALSE); |
| char buffer[20]; |
| |
| // Create a ConstValue expression containing the original number of hash |
| // partitions. |
| Lng32 numParts = getCountOfOrigHashPartitions(); |
| sprintf(buffer, "%d", numParts); |
| NAString numPartsStr("origNumParts"); |
| ItemExpr *origNumParts = |
| new (heap) ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr); |
| |
| // Create a ConstValue expression containing the scaled number of hash |
| // partitions. |
| numParts = getCountOfPartitions(); |
| sprintf(buffer, "%d", numParts); |
| numPartsStr = "scaledNumParts"; |
| ItemExpr *scaledNumParts = new (heap) |
| ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr); |
| |
| // Create the hash distribution function. |
| // partitions. |
| ItemExpr *partFunc = |
| new (heap) |
| ProgDistrib(new (heap) HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST)), origNumParts); |
| |
| // Add a PAGroup expression and return the partitioning function |
| partFunc = new (heap) PAGroup(partFunc, scaledNumParts, origNumParts); |
| return partFunc; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // HashDistPartitioningFunction::buildPartitioningSelectionExpr |
| // - build the expression used during partition selection |
| // ----------------------------------------------------------------------- |
| ItemExpr * |
| HashDistPartitioningFunction::buildPartitioningSelectionExpr( |
| const ValueIdList &keyCols, |
| ItemExpr *numParts) const |
| { |
| CollHeap *heap = CmpCommon::statementHeap(); |
| |
| ItemExpr *partFunc = |
| new (heap) |
| ProgDistrib(new (heap) |
| HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST)), |
| numParts); |
| |
| return partFunc; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Hash2PartitioningFunction Destructor |
| // ----------------------------------------------------------------------- |
| Hash2PartitioningFunction::~Hash2PartitioningFunction() {} |
| |
| // ----------------------------------------------------------------------- |
| // Hash2PartitioningFunction Safe down cast. |
| // ----------------------------------------------------------------------- |
const Hash2PartitioningFunction*
Hash2PartitioningFunction::castToHash2PartitioningFunction() const
{
  // Overrides the base class version; this object really is a
  // Hash2PartitioningFunction, so the cast succeeds.
  return this;
}
| |
PartitioningRequirement*
Hash2PartitioningFunction::makePartitioningRequirement()
{
  // Wrap this function in a matching hash2 requirement, allocated on
  // the statement heap.
  return new (CmpCommon::statementHeap())
    RequireHash2(this);
}
| |
PartitioningFunction*
Hash2PartitioningFunction::copy() const
{
  // Copy-construct a clone on the statement heap.
  return new (CmpCommon::statementHeap()) Hash2PartitioningFunction(*this);
}
| |
| // ----------------------------------------------------------------------- |
| // Method for debugging. |
| // ----------------------------------------------------------------------- |
| const NAString Hash2PartitioningFunction::getText() const |
| { |
| NAString result("hash2 partitioned "); |
| |
| char nparts[32]; |
| sprintf(nparts,"%d ways on (", numberOfPartitions_); |
| result += nparts; |
| |
| getKeyColumnList().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT); |
| result += ")"; |
| |
| return result; |
| } |
| |
void Hash2PartitioningFunction::print(FILE* ofd, const char* indent,
                                      const char* title) const
{
  // Delegate to the base class, forcing the title to this class name.
  PartitioningFunction::print(ofd, indent, "Hash2PartitioningFunction");
} // Hash2PartitioningFunction::print()
| |
| |
| // ----------------------------------------------------------------------- |
| // Hash2PartitioningFunction::createPartitioningFunctionForIndexDesc() |
| // ----------------------------------------------------------------------- |
| PartitioningFunction* |
| Hash2PartitioningFunction:: |
| createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const |
| { |
| const NAFileSet * fileSet = idesc->getNAFileSet(); |
| const NAColumnArray & allColumns = fileSet->getAllColumns(); |
| const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns(); |
| |
| CollIndex ixColNumber; |
| ValueId keyValueId; |
| ValueIdSet partitioningKey; |
| ValueIdList partitioningKeyList; |
| |
| for (CollIndex i = 0; i < partKeyColumns.entries(); i++) |
| { |
| // which column of the index is this (usually this will be == i) |
| ixColNumber = allColumns.index(partKeyColumns[i]); |
| |
| // insert the value id of the index column into the partitioning |
| // key column value id set |
| keyValueId = idesc->getIndexColumns()[ixColNumber]; |
| partitioningKey += keyValueId; |
| partitioningKeyList.insertAt(i,keyValueId); |
| } // end loop over partitioning key columns |
| |
| // ----------------------------------------------------------------- |
| // Allocate a new HashPartitioningFunction. |
| // ----------------------------------------------------------------- |
| Hash2PartitioningFunction *partFunc = new(idesc->wHeap()) |
| Hash2PartitioningFunction (partitioningKey, |
| partitioningKeyList, |
| getCountOfPartitions(), |
| getNodeMap()->copy(idesc->wHeap())); |
| |
| partFunc->setRestrictedBeginPartNumber(getRestrictedBeginPartNumber()); |
| partFunc->setRestrictedEndPartNumber(getRestrictedEndPartNumber()); |
| |
| // ----------------------------------------------------------------- |
| // Construct the partitioning key predicates. |
| // ----------------------------------------------------------------- |
| partFunc->createPartitioningKeyPredicates(); |
| |
| return partFunc; |
| |
| } // Hash2PartitioningFunction::createPartitioningFunctionForIndexDesc() |
| |
| // ----------------------------------------------------------------------- |
| // Hash2PartitioningFunction::createPartSelectionExprFromSearchKey |
| // is called from PartitionAccess::preCodeGen() to create expressions |
| // for determining the beginning and ending partition numbers that |
| // partition access must deal with. Other partitioning schemes |
| // pass partition numbers, but Hash2 passes hash boundaries. Hash2 |
| // passes hash boundaries instead to allow a single ESP to work with |
| // groupings of tables with different numbers of physical partitions. |
| // ----------------------------------------------------------------------- |
| void Hash2PartitioningFunction::createPartSelectionExprFromSearchKey( |
| const ValueId beginPartSelId, |
| const ValueId endPartSelId, |
| ValueIdList &partSelectionValIds) const |
| { |
| ItemExpr *beginPartSelExpr = new (CmpCommon::statementHeap()) |
| Hash2Distrib(beginPartSelId.getItemExpr(), |
| partitionSelectionExprInputs()[1].getItemExpr()); |
| beginPartSelExpr->synthTypeAndValueId(); |
| |
| ItemExpr *endPartSelExpr = new (CmpCommon::statementHeap()) |
| Hash2Distrib(endPartSelId.getItemExpr(), |
| partitionSelectionExprInputs()[1].getItemExpr()); |
| endPartSelExpr->synthTypeAndValueId(); |
| |
| partSelectionValIds.insert(beginPartSelExpr->getValueId()); |
| partSelectionValIds.insert(endPartSelExpr->getValueId()); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Hash2PartitioningFunction::comparePartFuncToFunc(): Compare this |
| // partitioning function to another hash2 function. To be 'SAME' |
| // must have the same number and order of partitioning key columns and |
| // the same number of scaled partitions. |
| // ----------------------------------------------------------------------- |
COMPARE_RESULT
Hash2PartitioningFunction::
comparePartFuncToFunc(const PartitioningFunction &other) const
{
  // The generic comparison must already consider the functions the same.
  COMPARE_RESULT c = PartitioningFunction::comparePartFuncToFunc(other);

  if (c != SAME)
    return INCOMPATIBLE;

  const Hash2PartitioningFunction *oth =
    other.castToHash2PartitioningFunction();

  // Since they compared 'SAME', oth should always exist, so this
  // test is redundant.
  //
  if(!oth)
    return INCOMPATIBLE;

  // If OLD_HASH2_GROUPING is turned on, then the original number of
  // partitions must be equal.  The OLD_HASH2_GROUPING CQD is for test
  // purposes and will most likely be removed after the new hash2
  // grouping has been tested extensively.
  if (CmpCommon::getDefault(OLD_HASH2_GROUPING) == DF_ON &&
      getCountOfOrigHashPartitions() != oth->getCountOfOrigHashPartitions())
    return INCOMPATIBLE;

  // The normal behavior of hash2 grouping is that the scaled
  // number of partitions must be equal.
  if (getCountOfPartitions() != oth->getCountOfPartitions())
    return INCOMPATIBLE;

  // Make sure that the keys are in the same order.
  //
  if (keyColumnList_.entries() != oth->keyColumnList_.entries())
    return INCOMPATIBLE;

  // The key columns must match pairwise, including the types of the
  // original key columns (the hash is type-sensitive).
  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
    {
      if (keyColumnList_[i] != oth->keyColumnList_[i])
        return INCOMPATIBLE;

      if ( NOT (originalKeyColumnList_[i].getType() ==
                oth->originalKeyColumnList_[i].getType() )
         )
        return INCOMPATIBLE;
    }

  return SAME;

} // Hash2PartitioningFunction::comparePartFuncToFunc()
| |
| // ----------------------------------------------------------------------- |
| // Hash2PartitioningFunction::scaleNumberOfPartitions() |
| // ----------------------------------------------------------------------- |
| PartitioningFunction * |
| Hash2PartitioningFunction:: |
| scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions, |
| PartitionGroupingDistEnum partGroupDist) |
| { |
| // Do not allow suggestedNewNumberOfPartitions greater than |
| // the number of partitions. |
| suggestedNewNumberOfPartitions = |
| (suggestedNewNumberOfPartitions > numberOfPartitions_) |
| ? numberOfPartitions_ |
| : suggestedNewNumberOfPartitions; |
| |
| // Only allow a suggestedNewNumberOfPartitions that evenly |
| // divides into the number of partitions. |
| CMPASSERT(suggestedNewNumberOfPartitions > 0); |
| while (numberOfPartitions_ % suggestedNewNumberOfPartitions != 0) |
| suggestedNewNumberOfPartitions--; |
| |
| if (suggestedNewNumberOfPartitions == 1) |
| return new(CmpCommon::statementHeap()) |
| SinglePartitionPartitioningFunction(); |
| |
| // If an expression has been generated, then we want to discard it |
| // because it may no longer be correct. |
| storeExpression(NULL); |
| |
| if ( suggestedNewNumberOfPartitions != getCountOfPartitions() ) { |
| NodeMap* newNodeMap = |
| new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions); |
| replaceNodeMap(newNodeMap); |
| } |
| |
| numberOfPartitions_ = suggestedNewNumberOfPartitions; |
| |
| return this; |
| |
| } // Hash2PartitioningFunction::scaleNumberOfPartitions() |
| |
| // ----------------------------------------------------------------------- |
| // Hash2PartitioningFunction::isAGroupingOf() |
| // ----------------------------------------------------------------------- |
NABoolean
Hash2PartitioningFunction::isAGroupingOf(const PartitioningFunction &other,
                                         Lng32* maxPartsPerGroup) const
{
  // Default: one partition per group unless a grouping is established.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  const Hash2PartitioningFunction *oth =
    other.castToHash2PartitioningFunction();

  // If other is not a Hash2PartitioningFunction, then it cannot
  // be a grouping of...
  if (oth == NULL)
    return FALSE;

  // A grouping requires that this function has no more partitions than
  // other, and that other's count is an exact multiple of this count.
  if (getCountOfPartitions() > oth->getCountOfPartitions() ||
      oth->getCountOfPartitions() % getCountOfPartitions() != 0)
    return FALSE;

  // For testing purposes, it was requested that the ability to
  // provide grouping similar to hash1 be provided.  If the
  // OLD_HASH2_GROUPING CQD is true, then only allow grouping
  // when the number of original partitions is the same.
  if (CmpCommon::getDefault(OLD_HASH2_GROUPING) == DF_ON &&
      getCountOfPartitions() != oth->getCountOfOrigHashPartitions())
    return FALSE;

  if (keyColumnList_.entries() != oth->keyColumnList_.entries())
    return FALSE;

  // compare the key columns and their order
  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
    {
      if (keyColumnList_[i] != oth->keyColumnList_[i])
        return FALSE;
      if ( NOT (originalKeyColumnList_[i].getType() ==
                oth->originalKeyColumnList_[i].getType() )
         )
        return FALSE;
    }

  // This is a grouping of.  Set the maxPartsPerGroup and return TRUE.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = oth->getCountOfPartitions() / getCountOfPartitions();

  return TRUE;

} // Hash2PartitioningFunction::isAGroupingOf()
| |
| // ----------------------------------------------------------------------- |
| // Hash2PartitioningFunction::buildPartitioningExpression() |
| // - Create an expression for the Hash2 partitioning function. |
| // ----------------------------------------------------------------------- |
| ItemExpr * |
| Hash2PartitioningFunction::buildPartitioningExpression(const ValueIdList &keyCols) const |
| { |
| CollHeap *heap = CmpCommon::statementHeap(); |
| NAType *numPartsType = new (heap) SQLInt(heap, FALSE,FALSE); |
| |
| // Build a ConstValue expression of the scaled number of partitions. |
| Lng32 numParts = getCountOfPartitions(); |
| char buffer[20]; |
| sprintf(buffer, "%d", numParts); |
| NAString numPartsStr("scaledNumParts"); |
| ItemExpr *scaledNumParts = new (heap) |
| ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr); |
| |
| // Create the hash2 partitioning function expression and return it. |
| ItemExpr *partFunc = new (heap) Hash2Distrib( |
| new (heap) HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST)), scaledNumParts |
| ); |
| |
| return partFunc; |
| } |
| |
| // ----------------------------------------------------------------------- |
// Hash2PartitioningFunction::buildPartitioningSelectionExpr
| // - build the expression used during partition selection |
| // ----------------------------------------------------------------------- |
| ItemExpr * |
| Hash2PartitioningFunction::buildPartitioningSelectionExpr( |
| const ValueIdList &keyCols, |
| ItemExpr *numParts) const |
| { |
| CollHeap *heap = CmpCommon::statementHeap(); |
| |
| ItemExpr *partFunc = new (heap) |
| Hash2Distrib(new (heap) |
| HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST)), |
| numParts); |
| |
| return partFunc; |
| } |
| |
// A refactored method whose implementation was originally contained inside
// method HashPartitioningFunction::createPartitioningExpression().
| ItemExpr* |
| PartitioningFunction::getCastedItemExpre(ItemExpr* iv, const NAType& oType, CollHeap* heap) |
| { |
| ItemExpr *dataConversionErrorFlag = getConvErrorExpr(); |
| if (dataConversionErrorFlag == 0) |
| { |
| dataConversionErrorFlag = |
| new (CmpCommon::statementHeap()) HostVar( |
| "_sys_repartConvErrorFlg", |
| new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), TRUE,FALSE), |
| TRUE); |
| storeConvErrorExpr(dataConversionErrorFlag); |
| } |
| |
| // Note that we always generate a Narrow operator here, even |
| // if it's not necessary. Because keyColumnList_[i] may be a |
| // VEGReference it may not be possible to determine its |
| // exact type at this time. We could eliminate some cases of |
| // Narrow in the preCodeGen phase but this is not done yet. |
| // Begin_Fix 10-040114-2431 |
| // 02/18/2004 |
| // changed statement below for above mentioned solution |
| ItemExpr* c = new (CmpCommon::statementHeap()) Narrow( |
| iv, |
| dataConversionErrorFlag, |
| &oType, |
| ITM_NARROW, |
| FALSE, |
| TRUE); // TRUE => make nullability of Narrow same as child's |
| // End_Fix 10-040114-2431 |
| |
| ((Narrow*)c)->setMatchChildType(TRUE); |
| return c; |
| } |
| |
| |
| // *********************************************************************** |
| // SkewedDataPartitioningFunction |
| // *********************************************************************** |
| |
// Constructor. Wraps a partitioning function that can handle skew
// (asserted below) with a concrete skew property (must not be "any
// skew"). The cached skew hash list starts out empty.
SkewedDataPartitioningFunction::SkewedDataPartitioningFunction(
   PartitioningFunction* partFuncForUnskewed,
   const skewProperty& sk,
   NAMemory* heap
   )
: PartitioningFunction(SKEWEDDATA_PARTITIONING_FUNCTION, NULL, heap),
  partialPartFunc_(partFuncForUnskewed), skewProperty_(sk),
  skewHashList_(NULL)
{
  CMPASSERT(NOT sk.isAnySkew());                 // need a concrete skew property
  CMPASSERT(partFuncForUnskewed->canHandleSkew());
  ValueIdList pivl;
  ValueIdSet pivs;

  // Create the partition input variable and partitioning key for this
  // function. The main purpose of this is to produce a partitioning key.
  // The skew buster partitioning function doesn't have a real
  // partitioning key. So, one cannot really decide from a row in
  // which partition it is (at least not for a skew element row).
  // Because we need to have some partitioning key, we basically say
  // "you have to know the partition number in order to compute the
  // partition number". This is another way of saying that the
  // partition number cannot be computed from the row. We achieve this
  // by making the PIV the partitioning key.
  // If needed, we can also make the PIV the partitioning expression. We
  // cannot make a partitioning key expression with this method, however.
  createPIV(pivl);
  pivs = pivl;
  setPartKey(pivs);
}
| |
| void SkewedDataPartitioningFunction::createPIV(ValueIdList &partInputValues) |
| { |
| // Create a single partition input value |
| // |
| ItemExpr *dummyPIV = new (CmpCommon::statementHeap()) |
| HostVar("_sys_dummySkewBusterPartNo", |
| new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE), |
| TRUE); |
| dummyPIV->synthTypeAndValueId(); |
| |
| // the partition input value is one integer, which is also used as |
| // the partitioning key |
| partInputValues.insert(dummyPIV->getValueId()); |
| |
| // Store the partition input values. |
| // |
| storePartitionInputValues(partInputValues); |
| } |
| |
// Replace the partition input values and partitioning key predicates by
// forwarding to the underlying (partial) partitioning function.
// NOTE(review): this object's own PIVs (stored in createPIV()) are not
// touched here — confirm that is intended.
void SkewedDataPartitioningFunction::replacePivs(
   const ValueIdList& newPivs,
   const ValueIdSet& newPartKeyPreds)
{
  partialPartFunc_ -> replacePivs(newPivs, newPartKeyPreds);
}
| |
// Copy constructor. The partial partitioning function is copied as a
// pointer only (copy() replaces it with a deep copy afterwards); the
// cached skewHashList_ is dropped so it gets recomputed on demand; the
// skew property is shared with the original.
SkewedDataPartitioningFunction::SkewedDataPartitioningFunction(
   const SkewedDataPartitioningFunction& other, NAMemory* heap)
: PartitioningFunction(other, heap),
  skewHashList_(NULL),                       // recompute this
  partialPartFunc_(other.partialPartFunc_),  // may need a deep copy
  skewProperty_(other.skewProperty_)         // share the skew property
{
}
| |
// The partition count is determined entirely by the underlying
// (partial) partitioning function.
Lng32 SkewedDataPartitioningFunction::getCountOfPartitions() const
{
  return partialPartFunc_->getCountOfPartitions();
}
| |
void SkewedDataPartitioningFunction::createPartitioningKeyPredicates()
{
  // do nothing, there aren't any partitioning key preds for a skew
  // partition; store an empty set and let the partial partitioning
  // function create its own.
  storePartitioningKeyPredicates(ValueIdSet());
  partialPartFunc_ -> createPartitioningKeyPredicates();
}
| |
// Create the partitioning requirement corresponding to this function:
// a RequireSkewed requirement built from this object.
PartitioningRequirement*
SkewedDataPartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap()) RequireSkewed(this);
}
| |
| PartitioningFunction* |
| SkewedDataPartitioningFunction::copy() const |
| { |
| SkewedDataPartitioningFunction *result; |
| |
| result = new (CmpCommon::statementHeap()) |
| SkewedDataPartitioningFunction(*this); |
| result->partialPartFunc_ = partialPartFunc_->copy(); |
| |
| return result; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Method for debugging. |
| // ----------------------------------------------------------------------- |
| const NAString SkewedDataPartitioningFunction::getText() const |
| { |
| NAString result = partialPartFunc_->getText(); |
| |
| // SKEW_EXPLAIN = off --> do no display any details on skew-processing |
| // method or skew value list. |
| if ( CmpCommon::getDefault(SKEW_EXPLAIN) == DF_OFF ) |
| return result; |
| |
| NAString pat, suffix; |
| |
| switch ( skewProperty_.getIndicator() ) { |
| case skewProperty::UNIFORM_DISTRIBUTE: |
| suffix = "-ud"; |
| break; |
| case skewProperty::BROADCAST: |
| suffix = "-br"; |
| break; |
| default: |
| break; |
| } |
| |
| switch (partialPartFunc_ -> getPartitioningFunctionType()) { |
| case HASH_PARTITIONING_FUNCTION: |
| pat = "hash"; |
| break; |
| case HASH_DIST_PARTITIONING_FUNCTION: |
| pat = "hash1"; |
| break; |
| case HASH2_PARTITIONING_FUNCTION: |
| pat = "hash2"; |
| break; |
| default: |
| break; |
| } |
| size_t loc = result.index(pat); |
| |
| if ( loc != NA_NPOS ) { |
| result.insert(loc+pat.length(), suffix); |
| } |
| |
| result += skewProperty_.getText(); |
| |
| return result; |
| } |
| |
// Debugging aid: print via the base class, forcing the title to this
// class's name (the caller-supplied title argument is ignored).
void SkewedDataPartitioningFunction::print(FILE* ofd, const char* indent,
                                           const char* title) const
{
  PartitioningFunction::print(ofd, indent, "SkewedDataPartitioningFunction");
}
| |
| // ----------------------------------------------------------------------- |
| // SkewedDataPartitioningFunction::comparePartFuncToFunc(): Compare this |
| // partitioning function to another hash2 function. To be 'SAME' |
| // must have the same number and order of partitioning key columns and |
| // the same number of scaled partitions. |
| // ----------------------------------------------------------------------- |
| COMPARE_RESULT |
| SkewedDataPartitioningFunction:: |
| comparePartFuncToFunc(const PartitioningFunction &other) const |
| { |
| // the other has to be a SkewedDataPartitioningFunction |
| const SkewedDataPartitioningFunction *oth = |
| other.castToSkewedDataPartitioningFunction(); |
| |
| if(!oth) return INCOMPATIBLE; |
| |
| // compare the two partial partfuncs. |
| COMPARE_RESULT c = partialPartFunc_->comparePartFuncToFunc( |
| *(oth->getPartialPartitioningFunction()) |
| ); |
| |
| if (c != SAME) return INCOMPATIBLE; |
| |
| // compare the skew property last. |
| if ( NOT (skewProperty_ == oth->skewProperty_) ) |
| return INCOMPATIBLE; |
| |
| |
| return SAME; |
| |
| } // SkewedDataPartitioningFunction::comparePartFuncToFunc() |
| |
| // ----------------------------------------------------------------------- |
| // SkewedDataPartitioningFunction::scaleNumberOfPartitions() |
| // ----------------------------------------------------------------------- |
| |
| |
// ::scaleNumberOfPartitions() is called in the following locations
| // |
| // OptPhysRelExpr.cpp |
| // 3536 NJ::genLeftChild() |
| // 4892 NJ:createContextForChild() (rightPartFunc->) |
| // 14166 and 14262 synthDP2PhysicalProperty() |
| // |
| // GP.cpp |
| // 955 GroupAttributes::recommendedOrderForNJProbing() |
| // |
| //As a result, the following version will not be called. |
| |
| PartitioningFunction * |
| SkewedDataPartitioningFunction:: |
| scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions, |
| PartitionGroupingDistEnum partGroupDist) |
| { |
| PartitioningFunction *mayBeNewPartfunc = |
| partialPartFunc_->scaleNumberOfPartitions( |
| suggestedNewNumberOfPartitions, partGroupDist); |
| |
| if ( mayBeNewPartfunc != partialPartFunc_ ) |
| partialPartFunc_ = mayBeNewPartfunc; |
| |
| return this; |
| |
| } // SkewedDataPartitioningFunction::scaleNumberOfPartitions() |
| |
| // ----------------------------------------------------------------------- |
| // SkewedDataPartitioningFunction::isAGroupingOf() |
| // ----------------------------------------------------------------------- |
// A SkewedDataPartitioningFunction is never a grouping of any other
// partitioning function.
// NOTE(review): unlike other implementations, *maxPartsPerGroup is not
// initialized here — confirm callers do not read it on a FALSE return.
NABoolean
SkewedDataPartitioningFunction::isAGroupingOf(const PartitioningFunction &other,
                                              Lng32* maxPartsPerGroup) const
{
  return FALSE;
} // SkewedDataPartitioningFunction::isAGroupingOf()
| |
| // ----------------------------------------------------------------------- |
| // SkewedDataPartitioningFunction::copyAndRemap() |
| // ----------------------------------------------------------------------- |
| PartitioningFunction* |
| SkewedDataPartitioningFunction::copyAndRemap |
| (ValueIdMap& map, NABoolean mapItUp) const |
| { |
| SkewedDataPartitioningFunction *newPartFunc = |
| (SkewedDataPartitioningFunction *) |
| copy()->castToSkewedDataPartitioningFunction(); |
| |
| CMPASSERT(skewProperty_.getIndicator() != skewProperty::BROADCAST); |
| |
| newPartFunc->remapIt(this, map, mapItUp); |
| newPartFunc->partialPartFunc_->remapIt(partialPartFunc_, map, mapItUp); |
| |
| return newPartFunc; |
| |
| } // SkewedDataPartitioningFunction::copyAndRemap() |
| |
// Pre-code-generation processing: run the base class logic, then give
// the underlying (partial) partitioning function the same chance.
void
SkewedDataPartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
  PartitioningFunction::preCodeGen(availableValues);
  partialPartFunc_->preCodeGen(availableValues);
}
| |
| ItemExpr* SkewedDataPartitioningFunction::createPartitioningExpression() |
| { |
| CMPASSERT(partialPartFunc_ -> isAHash2PartitioningFunction() == TRUE); |
| |
| // cast to TableHashPartitioningFunction class (the parent class |
| // of Hash2PartitioningFunction) which has the method |
| // createPartitioningExpressionImp() defined. |
| ItemExpr* partExpr = ((TableHashPartitioningFunction*)partialPartFunc_) |
| -> createPartitioningExpressionImp(TRUE /*do varchar cast*/); |
| |
| storeExpression(partExpr); |
| return partExpr; |
| } |
| |
| // *********************************************************************** |
| // RangePartitionBoundaries |
| // *********************************************************************** |
| |
| // ----------------------------------------------------------------------- |
| // Constructor for RangePartitionBoundaries |
| // ----------------------------------------------------------------------- |
// Construct boundaries for <numberOfPartitions> partitions over a key
// of <numberOfPartitioningKeyColumns> columns. The boundary arrays are
// sized for numberOfPartitions+1 entries: entry 0 and entry
// numberOfPartitions stand for the first and last key and are
// pre-inserted as NULL dummies here; entries 1..numberOfPartitions-1
// are the 'true' boundaries, filled in later (see defineBoundary() /
// defineUnboundBoundary()). encodedBoundaryKeyLength_ starts at -1
// until the encoded key length is known.
RangePartitionBoundaries::RangePartitionBoundaries
(Lng32 numberOfPartitions,
 Lng32 numberOfPartitioningKeyColumns, NAMemory *h)
  : partKeyColumnCount_(numberOfPartitioningKeyColumns),
    origPartKeyColumnCount_(numberOfPartitioningKeyColumns),
    partitionCount_(numberOfPartitions),
    origPartitionCount_(numberOfPartitions),
    boundaryStringsList_(h,numberOfPartitions+1),
    boundaryValuesList_(h,numberOfPartitions+1),
    boundaryValues_(h,numberOfPartitions+1),
    binaryBoundaryValues_(h,numberOfPartitions+1),
    encodedBoundaryKeyLength_(-1),
    setBinaryBoundaryFirstLastKey_(FALSE),
    setupForStatement_(FALSE),
    resetAfterStatement_(FALSE),
    heap_(h)
{
  // MUST be given non zero counts

  // make two dummy entries for the first and last key
  boundaryValuesList_.insertAt(0,NULL);
  boundaryValuesList_.insertAt(numberOfPartitions,NULL);

  boundaryStringsList_.insertAt(0, NULL);
  boundaryStringsList_.insertAt(numberOfPartitions,NULL);

} // RangePartitionBoundaries::RangePartitionBoundaries()
| |
// Destructor: nothing to release explicitly — the member arrays were
// allocated on the NAMemory heap passed to the constructor and are
// presumably reclaimed with that heap.
RangePartitionBoundaries::~RangePartitionBoundaries() {}
| |
// Record the (not yet bound) boundary value expression and its encoded
// key image delimiting partition <partitionNumber>. Valid positions are
// 1..partitionCount_, i.e. strictly between the pre-allocated
// first/last dummy entries.
void RangePartitionBoundaries::defineUnboundBoundary
(Lng32 partitionNumber,
 const ItemExpr * unboundBoundaryValue,
 const char *encodedKeyValue)
{
  // must insert a 'true' boundary between the first and last (pre-allocated)
  // entry, this gives us n insertion points for a table with n partitions,
  // new entry delimits partition <partitionNumber>
  CMPASSERT(partitionNumber > 0 AND
            partitionNumber <= partitionCount_ AND
            unboundBoundaryValue != NULL AND
            encodedKeyValue != NULL);
  boundaryValuesList_.insertAt(partitionNumber, unboundBoundaryValue);


  // encodedKeyValue should be NULL only for SQL/MP tables
  // NOTE(review): the CMPASSERT above already requires a non-NULL
  // encodedKeyValue, making this guard redundant unless asserts are
  // compiled out — confirm which behavior is intended.
  if (encodedKeyValue)
    binaryBoundaryValues_.insertAt(partitionNumber, encodedKeyValue);
}
| |
// Bind the unbound boundary expression recorded for partition
// <partitionNumber> (see defineUnboundBoundary()) and store the bound
// result in boundaryValues_. Also caches the boundary's SQL text in
// boundaryStringsList_ when not already present.
void RangePartitionBoundaries::bindAddBoundaryValue(Lng32 partitionNumber)
{

  // No unbound expression recorded for this position (e.g. the dummy
  // first/last entries): store a NULL placeholder and leave.
  if(!boundaryValuesList_[partitionNumber]){
    boundaryValues_.insertAt(partitionNumber, NULL);
    return;
  }

  //get unbound boundary value; copy the tree so the stored original
  //stays unbound
  ItemExpr * unboundBoundaryValue = ((ItemExpr *)boundaryValuesList_[partitionNumber])->
                                    copyTree(CmpCommon::statementHeap());
  //new (CmpCommon::statementHeap())
  //ItemExpr(*boundaryValuesList_[partitionNumber]);

  //bind the boundary value
  unboundBoundaryValue->synthTypeAndValueId();

  // wrap the bound expression into a list, one entry per key column value
  ItemExprList * boundBoundaryValue = new (CmpCommon::statementHeap())
                                      ItemExprList(unboundBoundaryValue,
                                                   CmpCommon::statementHeap());

  boundaryValues_.insertAt(partitionNumber, boundBoundaryValue);

  if (!boundaryStringsList_.used(partitionNumber)) {
    NAString result;

    // use QUERY_FORMAT to obtain the full SQL text.
    boundBoundaryValue->unparse(result, OPTIMIZER_PHASE, QUERY_FORMAT);

    boundaryStringsList_.insertAt(partitionNumber, new (heap_) NAString(result));

  }
}
| |
| // ----------------------------------------------------------------------- |
| // RangePartitionBoundaries::defineBoundary() |
| // ----------------------------------------------------------------------- |
| void RangePartitionBoundaries::defineBoundary |
| (Lng32 partitionNumber, |
| const ItemExprList* boundaryValue, |
| const char *encodedKeyValue) |
| { |
| // must insert a 'true' boundary between the first and last (pre-allocated) |
| // entry, this gives us n insertion points for a table with n partitions, |
| // new entry delimits partition <partitionNumber> |
| CMPASSERT(partitionNumber > 0 AND |
| partitionNumber <= partitionCount_ AND |
| boundaryValue != NULL AND |
| encodedKeyValue != NULL); |
| boundaryValues_.insertAt(partitionNumber, boundaryValue); |
| |
| // encodedKeyValue should be NULL only for SQL/MP tables |
| if (encodedKeyValue) |
| binaryBoundaryValues_.insertAt(partitionNumber, encodedKeyValue); |
| } // RangePartitionBoundaries::defineBoundary() |
| |
| // ----------------------------------------------------------------------- |
| // RangePartitionBoundaries::checkConsistency() |
| // ----------------------------------------------------------------------- |
// Verify (via CMPASSERT) that this boundaries object is internally
// consistent for <numberOfPartitions> partitions. Binary boundary
// values are only checked after encodedBoundaryKeyLength_ has been set
// (> 0).
void RangePartitionBoundaries::checkConsistency(
     const Lng32 numberOfPartitions) const
{
  // If n = numberOfPartitions then entries 1 through n-1 have to be
  // present. Each entry delimits the boundary between two partitions.
  // Entries 0 and n contain the minimum and maximum permissible
  // values, respectively. Entries 0 and n are initialized to NULL in
  // the constructor and later added by method
  // RangePartitioningFunction::completePartitionBoundaries()
  CMPASSERT(numberOfPartitions == partitionCount_ AND
            (Lng32)boundaryValues_.entries() == partitionCount_ + 1);

  // check the actual boundaries after the key length of the encoded
  // key and the key column value ids have been entered, but not before
  if (encodedBoundaryKeyLength_ > 0)
    {
      CMPASSERT(boundaryValues_.entries() == binaryBoundaryValues_.entries());

      for (CollIndex index = 1;
           index < (CollIndex)numberOfPartitions;
           index++)
        {
          // boundary values in SQL format and encoded key format must be
          // filled in
          CMPASSERT(boundaryValues_[index]);
          CMPASSERT(binaryBoundaryValues_[index]);

          // encoded key values must be in ascending order
          // (check currently disabled)
          // CMPASSERT(str_cmp(binaryBoundaryValues_[index-1],
          //                  binaryBoundaryValues_[index],
          //                  encodedBoundaryKeyLength_) < 0);
        }
    }
} // RangePartitionBoundaries::checkConsistency()
| |
| // ----------------------------------------------------------------------- |
| // RangePartitionBoundaries::compareRangePartitionBoundaries() |
| // ----------------------------------------------------------------------- |
// Return TRUE if every boundary of "this" appears, in order, among the
// boundaries of "other". With groupingAllowed, "other" may have extra
// boundaries in between (i.e. "this" is a grouping of "other"); in
// that case *maxPartsPerGroup (if requested) is set to the number of
// "other" partitions in the largest group formed. On a FALSE return,
// *maxPartsPerGroup holds the value accumulated so far (it starts at 1).
NABoolean RangePartitionBoundaries::compareRangePartitionBoundaries(
     const RangePartitionBoundaries& other,
     NABoolean groupingAllowed,
     Lng32* maxPartsPerGroup) const
{
  NABoolean match = FALSE;
  CollIndex i;
  CollIndex thisNumPartKeyCols,otherNumPartKeyCols;
  const ItemExprList *thisBoundaryValue;
  const ItemExprList *otherBoundaryValue;
  const ConstValue *thisConstValue;
  const ConstValue *otherConstValue;
  NABoolean thisIsNegated = FALSE;
  NABoolean otherIsNegated = FALSE;
  Lng32 currentPartsPerGroup = 1;   // # of "other" partitions in the group in progress
  Lng32 numOfPartsInLastGroup = 1;

  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1; // start with the minimum

  // Give up now if "other" has fewer partitions than "this"
  if ( other.partitionCount_ < partitionCount_)
    {
      return FALSE;
    }

  // NOTE: We CANNOT compare the encoded values! Two values that are
  // the SAME will compare as NOT THE SAME if the types of the
  // underlying columns are different.
  //
  // So, we must compare the boundaryValues_ instead. The
  // boundaryValues_ are an array of pointers to ItemExpr Lists
  // which contain pointers to ConstValue nodes.
  //
  // We go through the array of ItemExpr Lists, and
  // first compare the ValueId's of the ItemExpr Lists which
  // contain pointers to the ItemExpr ConstValue nodes that represent
  // the first key values. If these match, great -
  // the first keys must be the same. But, they will only have the same
  // ValueId if they are from the same table, or if the datatypes of
  // the partitioning key columns are exactly the same.
  // If they don't have the same ValueId, then we need to compare
  // the original raw ASCII text as was present in the DDL.
  // We go through the ConstValue nodes in ItemExpr lists one
  // by one, and compare them. We use the "getConstStr" method
  // of the ConstValue class to do this. This method gets the
  // original raw text as typed in by the user, and also appends
  // the character set type. This may be important if a user
  // is joining two tables where the character set types of the
  // partitioning key columns are different.
  //
  // Go through the boundaries of "this" and try to find them
  // in "other" Don't bother with the first and last entry,
  // they are always the same. When all is said and done, we
  // must have found a match in other for every partition boundary
  // from "this". The opposite assertion - that every partition
  // boundary in "other" found a match in "this" - is not necessary
  // if grouping is allowed.
  CollIndex myix = 1;
  CollIndex otherix = 1;
  while (myix < (CollIndex) partitionCount_ AND
         otherix < (CollIndex) other.partitionCount_)
    {
      match = FALSE;
      thisBoundaryValue = boundaryValues_[myix];
      otherBoundaryValue = other.boundaryValues_[otherix];

      // Check for the simple case: are the ValueId's the same?
      if (*thisBoundaryValue == *otherBoundaryValue)
        {
          match = TRUE;
        }
      else
        {
          // Traverse the ItemExprList and compare each ConstValue node
          // in "this" to the one in "other". The first keys are the
          // same only if they both have the same number of first keys,
          // they are either both negated or neither are,
          // and the string text of each ConstValue node is
          // exactly the same.
          thisNumPartKeyCols = thisBoundaryValue->entries();
          otherNumPartKeyCols = otherBoundaryValue->entries();
          if (thisNumPartKeyCols != otherNumPartKeyCols)
            {
              // If the two partitioning functions specified a different
              // number of first key values, then they cannot possibly
              // have equivalent partitioning schemes.
              return FALSE;
            }
          else
            {
              // Hope for the best...
              match = TRUE;
              for (i = 0;i < thisNumPartKeyCols AND match; i++)
                {
                  thisIsNegated = FALSE;
                  otherIsNegated = FALSE;
                  thisConstValue =
                    ((*thisBoundaryValue)[i])->castToConstValue(thisIsNegated);
                  otherConstValue =
                    ((*otherBoundaryValue)[i])->castToConstValue(otherIsNegated);
                  // Any non-ConstValue entry also counts as a mismatch.
                  if ((thisIsNegated != otherIsNegated) OR
                      thisConstValue == NULL OR
                      otherConstValue == NULL OR
                      (thisConstValue->getConstStr(FALSE) !=
                       otherConstValue->getConstStr(FALSE)))
                    match = FALSE;
                }
            }
        }

      if (match)
        {
          // Found this boundary in "other". Advance to the next
          // start key in both "this" and "other".
          myix++;
          otherix++;
          // The group just closed: record it if it is the largest so far.
          if ((maxPartsPerGroup != NULL) AND
              (currentPartsPerGroup > *maxPartsPerGroup))
            {
              *maxPartsPerGroup = currentPartsPerGroup;
            }
          currentPartsPerGroup = 1; // clear
        }
      else if (groupingAllowed)
        {
          // No match. Advance to the next start key in "other"
          // to see if that will match.
          otherix++;
          currentPartsPerGroup++;
        }
      else
        {
          // These boundaries don't match, and since grouping is not allowed,
          // we are out of luck. Give up now.
          return FALSE;
        }
    } // end while more partitions

  // Since we didn't look at the last group, we need to compute the
  // number of partitions in it and see if it is the group with
  // the largest number of partitions.
  numOfPartsInLastGroup = (other.partitionCount_ - otherix) + 1;
  if ((maxPartsPerGroup != NULL) AND
      (numOfPartsInLastGroup > *maxPartsPerGroup))
    {
      *maxPartsPerGroup = numOfPartsInLastGroup;
    }

  if (myix == (CollIndex) partitionCount_)
    {
      // We have found all of our partition boundaries
      // in the other range partition boundaries object.
      return TRUE;
    }
  else
    {
      // Even with grouping, the partition boundaries don't match. Too bad.
      return FALSE;
    }
} // RangePartitionBoundaries::compareRangePartitionBoundaries()
| |
| //<pb> |
| //============================================================================== |
| // Merge two range partition boundaries to produce new range partition |
| // boundaries. Also produce a node map corresponding to the new range partition |
| // boundaries. |
| // |
| // Input: |
| // other -- other partition boundary map with which to merge. |
| // thisNodeMap -- node map associated with this RangePartitionBoundaries |
| // object. |
| // |
| // Output: |
| // resultNodeMap -- node map associated with resulting RangepartitionBoundaries |
| // object. |
| // |
| // Return: |
| // Pointer to merged partition boundaries. |
| // |
| //============================================================================== |
// Merge the boundaries of "this" and "other" into a new
// RangePartitionBoundaries object (see the block comment above for the
// full contract). Duplicated boundaries are collapsed; a node map entry
// from thisNodeMap is copied into resultNodeMap for every resulting
// boundary except the last. Returns NULL if binary boundary values are
// unavailable.
RangePartitionBoundaries*
RangePartitionBoundaries::merge(const RangePartitionBoundaries& other,
                                const NodeMap& thisNodeMap,
                                NodeMap& resultNodeMap) const
{
  // need binary boundary values to do a merge
  // NOTE(review): encodedBoundaryKeyLength_ is initialized to -1 in the
  // constructor but this guard checks for 0 — confirm that callers
  // always set the length before merging.
  if (encodedBoundaryKeyLength_ == 0 OR
      other.encodedBoundaryKeyLength_ == 0)
    return NULL;

  // left is "this", right is "other",
  // merge boundaries 0...entries of "this" with boundaries 1...entries-1
  // of "other" (don't use -infinity and +infinity of right)
  CollIndex thisix = 0;
  CollIndex otherix = 1;
  CollIndex resultix = 0;
  CollIndex maxthis = boundaryValues_.entries();
  CollIndex maxother = other.boundaryValues_.entries()-1;
  CollIndex maxboth = maxthis + maxother;
  RangePartitionBoundaries *result = new(CmpCommon::statementHeap())
    RangePartitionBoundaries(1,partKeyColumnCount_,CmpCommon::statementHeap());
  Lng32 compResult;

  // the two sides must agree on key shape and encoded key length
  CMPASSERT(partKeyColumnCount_ == other.partKeyColumnCount_ AND
            encodedBoundaryKeyLength_ == other.encodedBoundaryKeyLength_);

  // continue merging boundaries while at least one index has not reached its
  // ending value.
  while (thisix < maxthis OR otherix < maxother)
    {

      // The final boundary has no corresponding node map entry.
      // In other words, the number of boundaries is always one more than
      // the number of node map entries.
      if (maxboth - thisix - otherix > 1)
        {

          // Since node maps have one less entry than boundary maps, ensure
          // that the node map's index does not fall off the end of the node
          // map.
          CollIndex nodeMapIx = MINOF(thisix, thisNodeMap.getNumEntries() - 1);

          // Store node map entry associated with current boundary.
          const NodeMapEntry* entry = thisNodeMap.getNodeMapEntry(nodeMapIx);
          resultNodeMap.setNodeMapEntry(resultix,
                                        *entry,
                                        CmpCommon::statementHeap());
        }

      // other index reached max, so comparison is automatically "less"
      if (otherix >= maxother)
        {
          compResult = -1;
        }
      else
        {

          // this index reached max, so comparison is automatically "more"
          if (thisix >= maxthis)
            {
              compResult = 1;
            }
          else
            {

              // neither index reached max, so do an actual comparison of all
              // 'real' boundaries in the encoded representation (the first
              // and last boundaries are always the same)
              compResult = str_cmp(binaryBoundaryValues_[thisix],
                                   other.binaryBoundaryValues_[otherix],
                                   encodedBoundaryKeyLength_);
            }

        }

      // take the smallest boundary value and insert it into result
      if (compResult <= 0)
        {
          // "this" has the smaller (or equal) boundary value
          result->boundaryValues_.insertAt(
               resultix,
               boundaryValues_[thisix]);
          result->binaryBoundaryValues_.insertAt(
               resultix,
               binaryBoundaryValues_[thisix]);
          thisix++;

          // if the boundaries are identical, skip both of them
          if (compResult == 0)
            otherix++;
        }
      else
        {
          // "other" has the smaller boundary value
          result->boundaryValues_.insertAt(
               resultix,
               other.boundaryValues_[otherix]);
          result->binaryBoundaryValues_.insertAt(
               resultix,
               other.binaryBoundaryValues_[otherix]);
          otherix++;
        }

      // Increment result index for next boundary.
      resultix++;

    }

  // resultix now counts the boundaries inserted; there is always one
  // more boundary than there are partitions.
  result->encodedBoundaryKeyLength_ = encodedBoundaryKeyLength_;
  result->partitionCount_ = resultix - 1;

  return result;
} // RangePartitionBoundaries::merge
| |
| Lng32 RangePartitionBoundaries::getOptimizedNumberOfPartKeys() |
| { |
| // Do an optimization of the partitioning key: what is passed in is |
| // currently the clustering key. Count the actual number of first |
| // keys that are specified in the DDL and only use the max. number |
| // of items instead of all clustering key columns. Example: |
| // |
| // create table t (a int, b int, c int, primary key (a,b desc,c) |
| // partition (add first key (1), add first key (2,2)); |
| // |
| // In this case, the partitioning key order is not (a,b desc,c), but |
| // (a, b desc). This optimization should be done in CATMAN. Remove |
| // this method once that is done. |
| // |
| CollIndex numPartKeyCols = 0; |
| for (CollIndex i = 1; i < (CollIndex) partitionCount_; i++) |
| { |
| // Are there more values listed than we assume as partitioning |
| // key columns? Adjust our assumption if needed. |
| |
| if ( boundaryValues_[i] ) { |
| CollIndex numKeySpecs = boundaryValues_[i]->entries(); |
| if (numKeySpecs > numPartKeyCols) |
| numPartKeyCols = numKeySpecs; |
| } else { |
| // no boundary values specified in SQL syntax |
| // (e.g. when we got binary region boundaries only) |
| numPartKeyCols = partKeyColumnCount_; |
| break; |
| } |
| } |
| |
| return numPartKeyCols; |
| } |
| |
// -----------------------------------------------------------------------
// RangePartitionBoundaries::scaleNumberOfPartitions()
//
// Reduce the number of range partitions by grouping contiguous ranges
// of the existing partitions into fewer, larger ranges. The boundary
// arrays are compressed in place. Returns the number of partitions we
// actually ended up with, which is the original partitionCount_ when
// scaling is not possible (an increase was suggested, there are fewer
// active partitions than suggested, or a group would span clusters).
//
// Parameters:
//   suggestedNewNumberOfPartitions - desired (upper limit) new count; must
//                                    be > 1 (asserted below)
//   nodeMap       - used to query per-partition active state and to check
//                   whether a prospective group spans multiple clusters
//   partGroupDist - selects grouping by physical partitions or by active
//                   partitions (DEFAULT defers to the
//                   BASE_NUM_PAS_ON_ACTIVE_PARTS setting)
// -----------------------------------------------------------------------
Lng32 RangePartitionBoundaries::scaleNumberOfPartitions(
     Lng32 suggestedNewNumberOfPartitions,
     const NodeMap* nodeMap,
     PartitionGroupingDistEnum partGroupDist)
{

  // * * * * * * * * * * * * * I M P O R T A N T  * * * * * * * * * * * * * *
  // *                                                                     *
  // *   THIS MEMBER FUNCTION USES THE SAME GROUPING ALGORITHM AS MEMBER   *
  // *   FUNCTION NodeMap::deriveGrouping().  ANY CHANGES TO THIS MEMBER   *
  // *   FUNCTION WOULD NECESSITATE CHANGES TO MEMBER FUNCTION             *
  // *   NodeMap::deriveGrouping() AS WELL.                                *
  // *                                                                     *
  // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

  // For now, the only way of scaling the number of partitions
  // is by eliminating part of the boundaries (deleting all but
  // every i-th boundary where i is an integer number).

  // There are two problems with this limitation: the suggestion may
  // ask for an increase in the number of partitions, or the suggestion
  // may cause an imbalance because the suggested number times i is not
  // equal to the existing number of partitions.
  // Here are the heuristics that we are using until we get smarter:
  //
  // H1: Treat the suggested number as an upper limit of new partitions.
  //     Often, the suggestion is the number of CPUs available and plans
  //     with 13 partitions don't run very well on 12 CPUs.
  //
  // H2: Distribute the old partitions such that the number of old partitions
  //     for each new partition varies at most by one (a new partition has
  //     either o or o+1 old partitions in it).
  //
  // We have two ways of distributing the partitions: "uniformly" (off
  // by at most one) distribute the number of physical partitions
  // amongst the groups, or uniformly distribute the number of active
  // partitions amongst the groups. The latter is preferable if there
  // are many inactive partitions and the active partitions are not
  // uniformly distributed amongst them. Actually, if the active
  // partition estimate is correct, we cannot go wrong distributing
  // the partitions based on the number of active partitions.

  // In cases where this would combine partitions from two or more clusters
  // it is better not to form such groups, as this would limit autonomy
  // and also lead to more messages between clusters. In such cases, this
  // method will return with the original partition boundary arrays.

  // Make copies in case we need to undo.
  // (Shallow array copies: the element pointers themselves are shared,
  // only the array contents are saved so the in-place compression below
  // can be rolled back.)

  ARRAY(const ItemExprList *) undoBoundaryValues(boundaryValues_);
  ARRAY(const char *) undoBinaryBoundaryValues(binaryBoundaryValues_);

  // As already said, we can't increase the number of partitions
  if (suggestedNewNumberOfPartitions >= partitionCount_)
    return partitionCount_;

  CMPASSERT(suggestedNewNumberOfPartitions > 1); // sanity check

  NABoolean useAPDistribution = FALSE;

  // Determine partition grouping distribution algorithm to use
  if (partGroupDist == DEFAULT_PARTITION_GROUPING)
  {
    if (CmpCommon::getDefault(BASE_NUM_PAS_ON_ACTIVE_PARTS) == DF_ON)
      useAPDistribution = TRUE;
    else
      useAPDistribution = FALSE;
  }
  else if (partGroupDist == UNIFORM_PHYSICAL_PARTITION_GROUPING)
  {
    useAPDistribution = FALSE;
  }
  else if (partGroupDist == UNIFORM_ACTIVE_PARTITION_GROUPING)
  {
    useAPDistribution = TRUE;
  }

  // newix: next slot to fill in the compressed boundary arrays;
  // prevPartStart: old index where the group being built started.
  Lng32 newix = 1;
  Lng32 prevPartStart = 0;

  if (useAPDistribution)
  {
    // Uniform number of active partitions grouping

    // NOTE: cast away const-ness; getNumActivePartitions() is not a
    // const member on NodeMap.
    CostScalar activePartitions =
      ((NodeMap *)(nodeMap))->getNumActivePartitions();
    Lng32 numActiveParts = (Lng32)activePartitions.getValue();

    // Cannot form more groups than there are active partitions.
    if (suggestedNewNumberOfPartitions > numActiveParts)
      return partitionCount_;

    Lng32 currentNumActiveParts = 0;
    Lng32 numActivePartsPerGroup = numActiveParts / suggestedNewNumberOfPartitions;
    Lng32 remainder = numActiveParts % suggestedNewNumberOfPartitions;
    Lng32 oldix = 1;
    Lng32 partNum = 0;
    NodeMapEntry::PartitionState partState;
    NABoolean isPartActive = FALSE;

    while ((partNum < partitionCount_) AND
           (newix != suggestedNewNumberOfPartitions))
    {
      // Determine if the current partition is active
      partState = nodeMap->getNodeMapEntry((CollIndex)partNum)->
                    getPartitionState();
      isPartActive = ((partState == NodeMapEntry::ACTIVE) OR
                      (partState == NodeMapEntry::ACTIVE_NO_DATA));

      if ((remainder != 0) AND
          ((newix - 1) + remainder == suggestedNewNumberOfPartitions))
      {
        // all the remaining groups get one extra active partition
        numActivePartsPerGroup++;
        remainder = 0; // so we won't do it again
      }

      if (isPartActive)
      {
        // one more active partition for this group
        currentNumActiveParts++;
        if (currentNumActiveParts == numActivePartsPerGroup)
        {
          // full group - time to end it, so check that partns on same cluster
          if (nodeMap->isMultiCluster(prevPartStart, oldix, TRUE))
          {
            // More than one cluster in a group - undo.
            boundaryValues_ = undoBoundaryValues;
            binaryBoundaryValues_ = undoBinaryBoundaryValues;
            return partitionCount_;
          }
          // Transfer the values over.
          // (Safe in place because newix <= oldix always holds.)
          boundaryValues_[newix] = boundaryValues_[oldix];
          binaryBoundaryValues_[newix] = binaryBoundaryValues_[oldix];
          // advance to prepare for next new partition boundary
          newix++;
          // reset number of active partitions in the current group
          currentNumActiveParts = 0;
          // remember where that previous group started
          prevPartStart = oldix;
        }
      } // end if current partition is active
      // advance to next old partition boundary
      oldix++;
      // advance to next partition
      partNum++;
    } // end while more partitions and more new boundaries to produce

  }
  else // Uniform number of physical partitions grouping
  {
    // Determine an integer value i such that partitionCount_ / i is
    // approximately equal to suggestedNewNumberOfPartitions.
    Lng32 numPartsPerGroup = partitionCount_ / suggestedNewNumberOfPartitions;
    Lng32 remainder = partitionCount_ % suggestedNewNumberOfPartitions;

    // If the remainder is not zero we will end up with an imbalanced new
    // partitioning scheme. To reduce the imbalance, distribute the remainder
    // partitions such that at most one of them goes into each new partition.
    // The easiest way to do this is to give one extra old partition to the
    // last <remainder> of the new partitions.

    // Remember that entries 0 and partitionCount_ stand for the min and
    // max key and should be preserved. For the entries in between, take
    // only every <numPartsPerGroup> one and make a new, contiguous array.
    Lng32 oldix = numPartsPerGroup;
    while (oldix < partitionCount_)
    {
      if (nodeMap->isMultiCluster(prevPartStart, oldix, FALSE))
      {
        // More than one cluster in a group - undo.
        boundaryValues_ = undoBoundaryValues;
        binaryBoundaryValues_ = undoBinaryBoundaryValues;
        return partitionCount_;
      }
      // Compress the surviving boundary into its new slot
      // (safe in place because newix <= oldix).
      boundaryValues_[newix] = boundaryValues_[oldix];
      binaryBoundaryValues_[newix] = binaryBoundaryValues_[oldix];

      if (newix + remainder == suggestedNewNumberOfPartitions)
        numPartsPerGroup++; // rest of the new partitions get one extra

      prevPartStart = oldix;
      oldix += numPartsPerGroup;
      newix += 1;
    }

  } // end if uniform number of physical partitions grouping

  // move the last entry for the max key
  if (nodeMap->isMultiCluster(prevPartStart, partitionCount_,
                              useAPDistribution))
  {
    // More than one cluster in a group - undo.
    boundaryValues_ = undoBoundaryValues;
    binaryBoundaryValues_ = undoBinaryBoundaryValues;
    return partitionCount_;
  }
  boundaryValues_[newix] = boundaryValues_[partitionCount_];
  binaryBoundaryValues_[newix] = binaryBoundaryValues_[partitionCount_];

  // Sanity check, we should now have exactly suggestedNewNumberOfPartitions+1
  // new entries in the boundaryValues_ array.
  CMPASSERT(newix == suggestedNewNumberOfPartitions);

  // to be nice, make the rest of the array empty
  for (newix = newix+1; newix <= partitionCount_; newix++)
  {
    boundaryValues_.remove(newix);
    binaryBoundaryValues_.remove(newix);
  }

  partitionCount_ = suggestedNewNumberOfPartitions;

  return partitionCount_;
}
| |
| NABoolean RangePartitionBoundaries::isAGroupingOf( |
| const RangePartitionBoundaries &other, |
| Lng32* maxPartsPerGroup) const |
| { |
| // Call the method that compares range partition boundaries, with |
| // the optional parameter that indicates if grouping is OK set to TRUE. |
| return compareRangePartitionBoundaries(other,TRUE,maxPartsPerGroup); |
| } // RangePartitionBoundaries::isAGroupingOf() |
| |
| // ----------------------------------------------------------------------- |
| // RangePartitionBoundaries::getBoundaryValues() |
| // ----------------------------------------------------------------------- |
| const ItemExprList* RangePartitionBoundaries::getBoundaryValues(Lng32 index) const |
| { |
| // It is legal to index entries in the range [0, partitionCount_]. |
| CMPASSERT( (index >= 0) AND (index <= partitionCount_) ); |
| return boundaryValues_[index]; |
| } // RangePartitionBoundaries::getBoundaryValues() |
| |
| // ----------------------------------------------------------------------- |
| // RangePartitionBoundaries::getBinaryBoundaryValue() |
| // ----------------------------------------------------------------------- |
| const char * RangePartitionBoundaries::getBinaryBoundaryValue( |
| Lng32 index) const |
| { |
| // It is legal to index entries in the range [0, partitionCount_]. |
| CMPASSERT( (index >= 0) AND (index <= partitionCount_) ); |
| return binaryBoundaryValues_[index]; |
| } // RangePartitionBoundaries::getBinaryBoundaryValue() |
| |
| // ----------------------------------------------------------------------- |
| // RangePartitionBoundaries::completePartitionBoundaries() |
| // ----------------------------------------------------------------------- |
// -----------------------------------------------------------------------
// RangePartitionBoundaries::completePartitionBoundaries()
//
// Finish construction of the boundary arrays:
//  - records the partitioning key column count and encoded key length,
//  - pads any first-key value list that specifies fewer columns than the
//    partitioning key with MIN (ascending) or MAX (descending) literals,
//  - installs an artificial first boundary (entry 0, the "min key") and
//    last boundary (entry partitionCount_, the "max key"),
//  - if binary boundary values exist, installs all-0x00 / all-0xFF
//    binary keys for the first/last boundary.
// May only be called once per object (asserted via the sentinel value
// of encodedBoundaryKeyLength_).
//
// Parameters:
//   partitioningKeyOrder     - partitioning key columns, possibly wrapped
//                              in ITM_INVERSE for descending columns
//   encodedBoundaryKeyLength - length in bytes of one encoded boundary key
// -----------------------------------------------------------------------
void RangePartitionBoundaries::completePartitionBoundaries(
     const ValueIdList& partitioningKeyOrder,
     Lng32 encodedBoundaryKeyLength)
{
  // recognize from the fact that the encoded key length is
  // not set yet that this method hasn't been called before
  CMPASSERT(encodedBoundaryKeyLength_ == -1);

  NABoolean ascending_key_column = TRUE;

  // the artificial begin (min) and end (max) keys, built up one
  // column at a time in the loop below
  ItemExprList *firstKey = new(CmpCommon::statementHeap())
                               ItemExprList(CmpCommon::statementHeap());
  ItemExprList *lastKey = new(CmpCommon::statementHeap())
                               ItemExprList(CmpCommon::statementHeap());

  // set some data members that could not be set before
  partKeyColumnCount_ = partitioningKeyOrder.entries();
  encodedBoundaryKeyLength_ = encodedBoundaryKeyLength;

  Lng32 i;

  // for each partitioning key column
  for (i = 0; i < partKeyColumnCount_; i++)
    {
      // get the type of the partitioning key column
      const NAType &prototypeType = partitioningKeyOrder[i].getType();

      // literals for the smallest and largest value of this column's type
      // (NULL is allowed as an extreme value)
      ItemExpr *minVal = new(CmpCommon::statementHeap())
	SystemLiteral(&prototypeType,
		      TRUE /*want min*/,
		      TRUE /*allow NULL*/);
      ItemExpr *maxVal = new(CmpCommon::statementHeap())
	SystemLiteral(&prototypeType,
		      FALSE,
		      TRUE);

      ItemExpr * orderExpr = partitioningKeyOrder[i].getItemExpr();

      // add the min/max values to the first and last key
      // (an ITM_INVERSE wrapper marks a descending key column)
      if (orderExpr->getOperatorType() != ITM_INVERSE)
	{
          // Ascending key column, create min value for first key and
          // max value for last key
          ascending_key_column = TRUE;
	  firstKey->insert(minVal);
	  lastKey->insert(maxVal);
	}
      else
	{
          // Descending key column, create max value for first key and
          // min value for last key
          ascending_key_column = FALSE;
	  firstKey->insert(maxVal);
	  lastKey->insert(minVal);
	}

      // Check if any first key values don't specify this partitioning
      // key column.  If so, add the MIN or the MAX value for that
      // column as the missing first key value.
      // NOTE: the very same minVal/maxVal ItemExpr objects may end up
      // shared by several boundary lists; they are never mutated here.
      for (CollIndex j = 1; j < (CollIndex)partitionCount_; j++)
      {
        if (boundaryValues_[j]->entries() == (CollIndex)i)
        {
          // This first key does not have enough entries to have
          // specified a value for this column. Add it. Add the
          // MIN value if ascending, add the MAX value if descending.
          // Need to cast away the const-ness to do this - sorry.
          if (ascending_key_column)
            ((ItemExprList*)boundaryValues_[j])->insert(minVal);
          else
            ((ItemExprList*)boundaryValues_[j])->insert(maxVal);
        }
      }
    } // for each partitioning key column

  // finally, add the created begin and end key as the first key and
  // after the last start key
  boundaryValues_[0] = firstKey;
  boundaryValues_[partitionCount_] = lastKey;

  // do we have any binary boundary values at all?
  if (binaryBoundaryValues_.entries())
    {
      if(!setBinaryBoundaryFirstLastKey_)
      {
        // yes, make entries for binary encoded keys with all zeroes and
        // all ones for the first and last key, respectively
        // ('\377' == 0xFF)
        char * binaryMin =
	  new(heap_) char[encodedBoundaryKeyLength_];
        char * binaryMax =
	  new(heap_) char[encodedBoundaryKeyLength_];
        str_pad(binaryMin,encodedBoundaryKeyLength_,'\0');
        str_pad(binaryMax,encodedBoundaryKeyLength_,'\377');
        binaryBoundaryValues_.insertAt(0,binaryMin);
        binaryBoundaryValues_.insertAt(partitionCount_,binaryMax);
        setBinaryBoundaryFirstLastKey_ = TRUE;
      }
    }
  else
    {
      // indicate that there are no binary key values (happens for SQL/MP)
      encodedBoundaryKeyLength_ = 0;
    }
  checkConsistency(partitionCount_);
} // RangePartitionBoundaries::completePartitionBoundaries()
| |
| |
| ItemExpr * getRangePartitionBoundaryValues |
| (const char * keyValueBuffer, |
| const Lng32 keyValueBufferSize, |
| NAMemory* heap, |
| CharInfo::CharSet strCharSet = CharInfo::UTF8 |
| ); |
| |
// -----------------------------------------------------------------------
// RangePartitionBoundaries::setupForStatement()
//
// Per-statement (re)initialization, idempotent via setupForStatement_.
// If useStringVersion is TRUE, first rebuild boundaryValuesList_ by
// parsing each stored boundary string into an ItemExpr tree; unused or
// NULL string slots produce NULL value entries. Then bind every entry
// of boundaryValuesList_ via bindAddBoundaryValue().
// -----------------------------------------------------------------------
void RangePartitionBoundaries::setupForStatement(NABoolean useStringVersion)
{
  // already done for this statement?
  if(setupForStatement_)
    return;

  if ( useStringVersion ) {

    for(UInt32 i=0; i < boundaryStringsList_.entries(); i++)
    {
      if (!boundaryStringsList_.used(i) || !boundaryStringsList_[i])
      {
        // no string stored for this slot: record an empty boundary
        boundaryValuesList_.insertAt(i,NULL);
      } else {
        boundaryValuesList_.insertAt(i,
           getRangePartitionBoundaryValues
              ( boundaryStringsList_[i]->data(),
                boundaryStringsList_[i]->length(),

// Warning: use stmt heap so that the returned itemExpr is NOT
// allocated on heap_, which can be the context heap.
// If the allocation is on the context heap, the memory
// will not be deleted (no way to delete a complicated itemExpr)
// during RangePartitionBoundaries::resetAfterStatement() call and
// we can fail the assertion test
// CMPASSERT(sizeAfterLastStatement_ <= initialSize_ )
// in RangePartitionBoundaries::resetAfterStatement()

                CmpCommon::statementHeap()
              ));
      }
    }
  }

  // bind each boundary value expression for this statement
  for(UInt32 i=0; i < boundaryValuesList_.entries(); i++)
  {
    bindAddBoundaryValue(i);
  }

  // mark setup done; the flags are mutually exclusive with
  // resetAfterStatement()
  setupForStatement_ = TRUE;
  resetAfterStatement_ = FALSE;
}
| |
| void RangePartitionBoundaries::resetAfterStatement() |
| { |
| if(resetAfterStatement_) |
| return; |
| |
| boundaryValuesList_.clear(); |
| boundaryValues_.clear(); |
| partKeyColumnCount_ = origPartKeyColumnCount_; |
| partitionCount_ = origPartitionCount_; |
| setupForStatement_ = FALSE; |
| resetAfterStatement_ = TRUE; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Method for debugging. |
| // ----------------------------------------------------------------------- |
| void RangePartitionBoundaries::print(FILE* ofd, const char* indent, |
| const char* title) const |
| { |
| BUMP_INDENT(indent); |
| char S[500]; |
| int index; |
| |
| fprintf(ofd,"%s %s\n",NEW_INDENT,title); |
| for (index = 0; index < partitionCount_; index++) |
| { |
| const ItemExprList* iel = boundaryValues_[index]; |
| snprintf(S, sizeof(S), "boundary[%d]: ", index); |
| if (iel) |
| iel->print(ofd, indent, S); |
| else |
| fprintf(ofd,"%s %s is empty\n",NEW_INDENT,S); |
| } |
| for (int index2 = 0; index2 < partitionCount_; index2++) |
| if (binaryBoundaryValues_.used(index2) && |
| binaryBoundaryValues_[index2]) |
| { |
| const char *binaryVal = binaryBoundaryValues_[index2]; |
| |
| fprintf(ofd, "binary boundary[%d]: 0x", // %#0*", |
| index2); |
| |
| for (int b=0; b<encodedBoundaryKeyLength_; b++) |
| fprintf(ofd, "%02hhx", binaryVal[b]); |
| fprintf(ofd, "\n"); |
| } |
| |
| fprintf(ofd,"%s %s (in binary form)\n",NEW_INDENT,title); |
| Lng32 keyLen = getEncodedBoundaryKeyLength(); |
| for (index = 0; index < partitionCount_; index++) |
| { |
| const char* bValues = getBinaryBoundaryValue(index); |
| for (Int32 j = 0; j < keyLen; j++) { |
| fprintf(ofd,"%04x ", (int)bValues[j]); |
| } |
| } |
| |
| } // RangePartitionBoundaries::print() |
| |
| // *********************************************************************** |
| // RangePartitioningFunction |
| // *********************************************************************** |
| |
// Copy constructor with an explicit heap. The key column lists are
// copied member-wise; the partition boundaries get a deep copy allocated
// on the given heap. The per-statement flags are reset to FALSE so the
// new object goes through its own setupForStatement()/reset cycle.
RangePartitioningFunction::RangePartitioningFunction(
     const RangePartitioningFunction& other,
     NAMemory* heap)
   : PartitioningFunction(other, heap),
     keyColumnList_(other.keyColumnList_),
     orderOfKeyValues_(other.orderOfKeyValues_),
     originalKeyColumnList_(other.originalKeyColumnList_),
     setupForStatement_(FALSE),
     resetAfterStatement_(FALSE)
{
  // deep-copy the boundaries onto the caller's heap
  partitionBoundaries_ = new(heap)
    RangePartitionBoundaries(*(other.partitionBoundaries_),heap);
}
| |
| RangePartitioningFunction::~RangePartitioningFunction() {} |
| |
// Return the number of partitions, as defined by the range boundaries.
Lng32 RangePartitioningFunction::getCountOfPartitions() const
{ return partitionBoundaries_->getCountOfPartitions(); }
| |
// Safe downcast: a RangePartitioningFunction returns itself (the base
// class presumably returns NULL, making this a type test -- see callers
// such as isAGroupingOf()).
const RangePartitioningFunction*
RangePartitioningFunction::castToRangePartitioningFunction() const
{ return this; }
| |
// Build a partitioning requirement (RequireRange) that demands exactly
// this partitioning function; allocated on the statement heap.
PartitioningRequirement*
RangePartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
                  RequireRange(this);
}
| |
// Virtual copy: clone this object onto the statement heap via the
// copy constructor.
PartitioningFunction*
RangePartitioningFunction::copy() const
{ return new(CmpCommon::statementHeap()) RangePartitioningFunction(*this); }
| |
| // ----------------------------------------------------------------------- |
| // RangePartitioningFunction::normalizePartitioningKeys() |
| // Rewrite the partitioning keys of the partitioning function in |
| // terms of the VEGReference for the VEG to which the partitioning |
| // key column belongs. |
| // ----------------------------------------------------------------------- |
void RangePartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  // normalize the state maintained by the base class first
  PartitioningFunction::normalizePartitioningKeys(normWARef);
  // then rewrite our own key column and ordering lists
  keyColumnList_.normalizeNode(normWARef);
  orderOfKeyValues_.normalizeNode(normWARef);
  // don't normalize original key col list, avoid VEGies which could
  // cause data type changes.
} // RangePartitioningFunction::normalizePartitioningKeys
| |
| // ----------------------------------------------------------------------- |
| // RangePartitioningFunction::comparePartFuncToFunc() |
| // ----------------------------------------------------------------------- |
| COMPARE_RESULT |
| RangePartitioningFunction::comparePartFuncToFunc |
| (const PartitioningFunction &other) const |
| { |
| // Make use of the implementation of isAGroupingOf. According to the |
| // definition of a grouping, two partitioning functions must be the |
| // same if one is a grouping of the other and if they have the same |
| // number of partitions. This is not a very straightforward way to |
| // compare two range partitioning functions but it avoids |
| // duplication of code. |
| |
| if (isAGroupingOf(other) AND |
| getCountOfPartitions() == other.getCountOfPartitions()) |
| return SAME; |
| else |
| return INCOMPATIBLE; |
| |
| } // RangePartitioningFunction::comparePartFuncToFunc() |
| |
| NABoolean RangePartitioningFunction::isAGroupingOf( |
| const PartitioningFunction &other, |
| Lng32* maxPartsPerGroup) const |
| { |
| const RangePartitioningFunction *oth = |
| other.castToRangePartitioningFunction(); |
| |
| // a range partitioning function can only be a grouping of |
| // another range partitioning function |
| if (oth == NULL) |
| return FALSE; |
| |
| // to be a grouping of the other function, this function has to be |
| // partitioned on the same columns in the same sequence and order |
| // (a prefix might work in some cases which we don't recognize |
| // at this point, sorry) |
| if (orderOfKeyValues_.entries() != oth->orderOfKeyValues_.entries()) |
| return FALSE; |
| |
| // compare the key columns and their order |
| for (CollIndex i = 0; i < orderOfKeyValues_.entries(); i++) |
| { |
| if (orderOfKeyValues_[i] != oth->orderOfKeyValues_[i]) |
| { |
| // value ids are different, but this may just be that we |
| // have different inversion expressions |
| if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() != |
| ITM_INVERSE OR |
| oth->orderOfKeyValues_[i].getItemExpr()->getOperatorType() != |
| ITM_INVERSE OR |
| orderOfKeyValues_[i].getItemExpr()->child(0) != |
| oth->orderOfKeyValues_[i].getItemExpr()->child(0)) |
| return FALSE; // really different columns |
| } |
| } |
| |
| // ----------------------------------------------------------------- |
| // Compare the partition boundaries |
| // ----------------------------------------------------------------- |
| return partitionBoundaries_->isAGroupingOf(*oth->partitionBoundaries_, |
| maxPartsPerGroup); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // RangePartitioningFunction::createPartitioningKeyPredicates() |
| // ----------------------------------------------------------------------- |
// -----------------------------------------------------------------------
// RangePartitioningFunction::createPartitioningKeyPredicates()
//
// Lazily build (once, guarded by partKeyPredsCreated()):
//  - the partition input values: one "lo" host variable and one "hi"
//    host variable per range key column, plus one 4-byte exclusion
//    indicator (non-zero => the end key is excluded),
//  - three key predicates over those host variables:
//      * beginPred:    multi-valued ">=" on the begin key (direction
//                      flipped per-column for descending keys),
//      * endPred:      multi-valued comparison on the end key using a
//                      TriRelational operator so the last partition can
//                      be inclusive while the others are exclusive,
//      * extraEndPred: an always-inclusive "<=" end-key predicate,
//                      convenient for B-Tree index access; rows it
//                      over-selects are filtered by endPred.
// The predicates and input values are stored in the partitioning
// attributes at the end.
// -----------------------------------------------------------------------
void RangePartitioningFunction::createPartitioningKeyPredicates()
{
  if (NOT partKeyPredsCreated())
    {
      char fabricatedName[20];     // a name for the host variable
      char* S = fabricatedName;

      CollIndex nCols = keyColumnList_.entries();
      ValueIdSet setOfKeyPredicates;
      ValueIdList partInputValues(2*nCols+1);
      ValueIdList loValues;
      ValueIdList hiValues;

      // -----------------------------------------------------------------
      // create partition input variables: one low value for each range
      // key column, one hi value for each range key column, and one
      // inclusion indicator column
      // -----------------------------------------------------------------

      // beginEnd == 0 produces the "lo" variables, == 1 the "hi" ones
      for (CollIndex beginEnd = 0; beginEnd < 2; beginEnd++)
	for (CollIndex i = 0; i < nCols; i++)
	  {
	    ValueId vid = originalKeyColumnList_[i];

	    // -- Fabricate a name for the first host variable
	    if (beginEnd == 0)
	      sprintf(S,"_sys_HostVarLo%d",i);
	    else
	      sprintf(S,"_sys_HostVarHi%d",i);

	    // -- Build a host var node with the key column's type
	    HostVar *hv = new(CmpCommon::statementHeap())
	      HostVar(S, &(vid.getType()), TRUE);
	    hv->synthTypeAndValueId();

	    // insert the partition input variable
	    partInputValues.insert(hv->getValueId());
	    if (beginEnd == 0)
	      loValues.insert(hv->getValueId());
	    else
	      hiValues.insert(hv->getValueId());
	  }

      // The inclusion indicator is a 4-byte integer, no NULLs allowed.
      // At execution time it is set to a non-zero value if the end
      // key is excluded and to a zero value if it is included.
      ItemExpr *intervalExclusionIndicator =
	new(CmpCommon::statementHeap()) HostVar(
	     "_sys_hostVarExclRange",
	     new(CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), TRUE,FALSE),
	     TRUE);
      intervalExclusionIndicator->synthTypeAndValueId();
      partInputValues.insert(intervalExclusionIndicator->getValueId());

      // -----------------------------------------------------------------
      // Create the semantic equivalent of multi-valued predicates, taking
      // the order of keys into account (which can't be expressed in
      // "normal" multi-valued predicates). Set the "special nulls"
      // property for all comparison operators that involve nullable
      // columns, since we do not want to filter out any NULL values.
      // -----------------------------------------------------------------

      // -----------------------------------------------------------------
      // a GREATER_EQ multi-valued predicate for the begin key
      // -----------------------------------------------------------------
      // The predicate trees are built inside-out: start with the last
      // key column and wrap each preceding column around the result.
      CollIndex i = keyColumnList_.entries()-1;
      CollHeap *h = CmpCommon::statementHeap();

      // the comparison operator for the current column, taking the
      // order of the column into account
      OperatorTypeEnum compOp,extraOp;
      ItemExpr *kc = keyColumnList_[i].getItemExpr();
      NABoolean nullable =
	keyColumnList_[i].getType().supportsSQLnullLogical();

      // start with a greater or equal comparison for the last key
      // column (may be the only key column), use less equal if the
      // column is descending
      if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
	  ITM_INVERSE)
	{
	  compOp = ITM_LESS_EQ;
	  extraOp = ITM_GREATER_EQ;
	}
      else
	{
	  compOp = ITM_GREATER_EQ;
	  extraOp = ITM_LESS_EQ;
	}

      ItemExpr * beginPred = new(h) BiRelat(compOp,
					    kc,
					    loValues[i].getItemExpr(),
					    nullable,
					    TRUE);


      // There are now two end predicates generated. The tri-relational predicate
      // is the one that enforces the inclusive comparison for the last partition,
      // and only exclusive comparison for the other partitions. Thus this
      // accurately models way partitions are setup.

      // The extraEndPred has been added (HL 6/20/2001) to form a convenient key predicate
      // for access by a B-Tree index. Although this is always inclusive, and thus
      // will select extra data (one extra uec) for all partitions except the last,
      // the extra data will be filtered out by the tri-relational operator.


      ItemExpr * extraEndPred = new(h) BiRelat(extraOp,
					       kc,
					       hiValues[i].getItemExpr(),
					       nullable,
					       TRUE);


      // for all other key columns, add (ai > bi) or (ai = bi) and (tfm)
      // (classic multi-valued-predicate expansion, walking from the
      // second-to-last key column back to the first)
      while (i--)
	{
	  if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
	      ITM_INVERSE)
	    {
	      compOp = ITM_LESS;
	      extraOp = ITM_GREATER;
	    }
	  else
	    {
	      compOp = ITM_GREATER;
	      extraOp = ITM_LESS;
	    }
	  kc = keyColumnList_[i].getItemExpr();
	  nullable =
	    keyColumnList_[i].getType().supportsSQLnullLogical();

	  // beginPred := (kc <op> lo[i]) OR (kc = lo[i] AND beginPred)
	  beginPred = new(h) BiLogic(
	       ITM_OR,
	       new(h) BiRelat(compOp,
			      kc,
			      loValues[i].getItemExpr(),
			      nullable,
			      TRUE),
	       new(h) BiLogic(ITM_AND,
			      new(h) BiRelat(
				   ITM_EQUAL,
				   kc,
				   loValues[i].getItemExpr(),
				   nullable,
				   TRUE),
			      beginPred));


	  // extraEndPred := (kc <op> hi[i]) OR (kc = hi[i] AND extraEndPred)
	  extraEndPred = new(h) BiLogic(
	       ITM_OR,
	       new(h) BiRelat(extraOp,
			      kc,
			      hiValues[i].getItemExpr(),
			      nullable,
			      TRUE),
	       new(h) BiLogic(ITM_AND,
			      new(h) BiRelat(
				   ITM_EQUAL,
				   kc,
				   hiValues[i].getItemExpr(),
				   nullable,
				   TRUE),
			      extraEndPred));
	}

      // -----------------------------------------------------------------
      // a less or less-equal predicate for the end key
      // -----------------------------------------------------------------

      // reset to the end of the key column list
      i = keyColumnList_.entries()-1;

      // Start with a tri-relational comparison for the last key
      // column (may be the only key column), it's a LESS comparison
      // if the exclusion indicator is not zero, or LESS_EQUAL otherwise.
      // Note: replace "less" with "greater" for descending columns.
      if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
	  ITM_INVERSE)
	compOp = ITM_GREATER_OR_GE;
      else
	compOp = ITM_LESS_OR_LE;

      kc = keyColumnList_[i].getItemExpr();
      nullable =
	keyColumnList_[i].getType().supportsSQLnullLogical();

      // third operand selects LESS vs. LESS_EQ at run time:
      // (0 <> exclusion indicator) => exclusive comparison
      TriRelational * tr = new(h) TriRelational(
	   compOp,
	   kc,
	   hiValues[i].getItemExpr(),
	   new(h) BiRelat(ITM_NOT_EQUAL,
			  new(h) SystemLiteral(0),
			  intervalExclusionIndicator));
      tr->setSpecialNulls(nullable);
      ItemExpr *endPred = tr;

      // for all other key columns, add (ai < bi) or (ai = bi) and (tfm)
      while (i--)
	{
	  if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
	      ITM_INVERSE)
	    compOp = ITM_GREATER;
	  else
	    compOp = ITM_LESS;

	  kc = keyColumnList_[i].getItemExpr();
	  nullable =
	    keyColumnList_[i].getType().supportsSQLnullLogical();

	  // endPred := (kc <op> hi[i]) OR (kc = hi[i] AND endPred)
	  endPred = new(h) BiLogic(
	       ITM_OR,
	       new(h) BiRelat(compOp,
			      kc,
			      hiValues[i].getItemExpr(),
			      nullable),
	       new(h) BiLogic(ITM_AND,
			      new(h) BiRelat(
				   ITM_EQUAL,
				   kc,
				   hiValues[i].getItemExpr(),
				   nullable),
			      endPred));
	}


      // give the finished predicate trees value ids
      endPred->synthTypeAndValueId();
      beginPred->synthTypeAndValueId();
      extraEndPred->synthTypeAndValueId();

      setOfKeyPredicates += beginPred->getValueId();
      setOfKeyPredicates += endPred->getValueId();
      setOfKeyPredicates += extraEndPred->getValueId();

      // -- Store the set of key predicates and the partition input values
      //    in the partitioning attributes.
      storePartitioningKeyPredicates(setOfKeyPredicates);
      storePartitionInputValues(partInputValues);
    }
} // RangePartitioningFunction::createPartitioningKeyPredicates()
| |
| // ----------------------------------------------------------------------- |
| // RangePartitioningFunction::replacePivs() |
| // ----------------------------------------------------------------------- |
// Replace the partition input values and the partitioning key
// predicates with the caller-supplied ones.
void RangePartitioningFunction::replacePivs(
  const ValueIdList& newPivs,
  const ValueIdSet& newPartKeyPreds)
{
  // Overwrite the old pivs, part key preds, and part expr. with the new ones.
  storePartitionInputValues(newPivs);
  storePartitioningKeyPredicates(newPartKeyPreds);
} // RangePartitioningFunction::replacePivs()
| |
| PartitioningFunction * RangePartitioningFunction::scaleNumberOfPartitions( |
| Lng32 &suggestedNewNumberOfPartitions, |
| PartitionGroupingDistEnum partGroupDist) |
| { |
| if (suggestedNewNumberOfPartitions == 1) |
| return new(CmpCommon::statementHeap()) |
| SinglePartitionPartitioningFunction(); |
| |
| // If an expression has been generated, then we want to discard it |
| // because it may no longer be correct. |
| storeExpression(NULL); |
| |
| // the power of delegation |
| suggestedNewNumberOfPartitions = |
| partitionBoundaries_->scaleNumberOfPartitions( |
| suggestedNewNumberOfPartitions, |
| getNodeMap(), |
| partGroupDist); |
| |
| return this; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // RangePartitioningFunction::remapIt() |
| // ----------------------------------------------------------------------- |
| void RangePartitioningFunction::remapIt |
| (const PartitioningFunction* opf, |
| ValueIdMap& map, NABoolean mapItUp) |
| { |
| PartitioningFunction::remapIt(opf, map,mapItUp); |
| |
| // If we have arrived here, the original partitioning function (*opf) |
| // MUST be a RangePartitioningFunction(). |
| CMPASSERT(opf->castToRangePartitioningFunction()); |
| |
| // Clear because rewrite insists on it being so. |
| keyColumnList_.clear(); |
| |
| if (mapItUp) |
| { |
| map.rewriteValueIdListUp(keyColumnList_, |
| opf->castToRangePartitioningFunction()-> |
| keyColumnList_); |
| } |
| else |
| { |
| map.rewriteValueIdListDown(opf->castToRangePartitioningFunction()-> |
| keyColumnList_, |
| keyColumnList_); |
| } |
| |
| // Clear because rewrite insists on it being so. |
| orderOfKeyValues_.clear(); |
| |
| if (mapItUp) |
| { |
| map.rewriteValueIdListUp(orderOfKeyValues_, |
| opf->castToRangePartitioningFunction()-> |
| orderOfKeyValues_); |
| } |
| else |
| { |
| map.rewriteValueIdListDown(opf->castToRangePartitioningFunction()-> |
| orderOfKeyValues_, |
| orderOfKeyValues_); |
| } |
| |
| // do NOT map the originalKeyColumnList_, that's why it's called ORIGINAL |
| |
| } // RangePartitioningFunction::remapIt() |
| |
| // ----------------------------------------------------------------------- |
| // RangePartitioningFunction::createPartitioningFunctionForIndexDesc() |
| // ----------------------------------------------------------------------- |
| |
// Build a new RangePartitioningFunction that partitions the given index
// the same way as this function: the partitioning key is re-expressed in
// terms of the index's own column value ids, while the range boundaries
// are copied from this function onto the index descriptor's heap.
PartitioningFunction*
RangePartitioningFunction::createPartitioningFunctionForIndexDesc
                                   (IndexDesc *idesc) const
{
  const NAFileSet * fileSet = idesc->getNAFileSet();
  const NAColumnArray & allColumns = fileSet->getAllColumns();
  const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns();

  Lng32 ixColNumber;                   // position of a part key column in the index
  ValueId keyValueId;                  // value id of the current index column
  ValueIdSet partitioningKey;          // key columns as a set
  ValueIdList partitioningKeyList;     // key columns in key order
  ValueIdList orderOfPartKeyValues;    // key columns, desc ones wrapped in InverseOrder
  RangePartitionBoundaries * partBounds;

  CollIndex i = 0;
  for (i = 0; i < partKeyColumns.entries(); i++)
    {
      // which column of the index is this (usually this will be == i)
      ixColNumber = allColumns.index(partKeyColumns[i]);

      // insert the value id of the index column into the partitioning
      // key column value id list
      keyValueId = idesc->getIndexColumns()[ixColNumber];
      partitioningKey += keyValueId;
      partitioningKeyList.insertAt(i,keyValueId);

      // insert the same value id into the order list, if the column
      // is in ascending order, otherwise insert the inverse of the
      // column
      if (partKeyColumns.isAscending(i))
        {
          orderOfPartKeyValues.insert(keyValueId);
        }
      else
        {
          InverseOrder *invExpr = new(idesc->wHeap())
            InverseOrder(keyValueId.getItemExpr());
          invExpr->synthTypeAndValueId();
          orderOfPartKeyValues.insert(invExpr->getValueId());
        }
    } // end loop over partitioning key columns

  // -----------------------------------------------------------------
  // Allocate new range partition boundaries (a copy of this function's
  // boundaries, on the index descriptor's heap).
  // -----------------------------------------------------------------
  partBounds = new(idesc->wHeap()) RangePartitionBoundaries
    (*getRangePartitionBoundaries(),
     idesc->wHeap());

  // ---------------------------------------------------------------------
  // Determine the minimum number of partitioning keys based on the
  // start key values that are specified. Columns for which no explicit
  // start key values are specified need not be part of the part key.
  // Adjust our bookkeeping if necessary. Remove this logic if it can
  // be moved to CATMAN.
  // ---------------------------------------------------------------------
  CollIndex numPartKeyCols = partBounds->getOptimizedNumberOfPartKeys();
  if (partitioningKeyList.entries() > numPartKeyCols)
    {
      // Trim the trailing, unneeded key columns (and their order-list
      // entries), then rebuild the key set from the shortened list.
      while (partitioningKeyList.entries() > numPartKeyCols)
        {
          partitioningKeyList.removeAt(numPartKeyCols);
          orderOfPartKeyValues.removeAt(numPartKeyCols);
        }
      partitioningKey.clear();
      partitioningKey.insertList(partitioningKeyList);
    }

  // -----------------------------------------------------------------
  // now compute the encoded key length of all the boundary key columns
  // (there are no fillers between the encoded key values, so it's ok
  // to just add the encoded lengths of the individual columns)
  // -----------------------------------------------------------------
  Lng32 encodedBoundaryKeyLength = 0;
  for (i = 0; i < orderOfPartKeyValues.entries(); i++)
    {
      encodedBoundaryKeyLength +=
        partitioningKeyList[i].getType().getEncodedKeyLength();
    }

  // -----------------------------------------------------------------
  // Add the first and the last boundary (0 and numberOfPartitions)
  // at the ends that do not separate two partitions
  // -----------------------------------------------------------------
  partBounds->completePartitionBoundaries(
       orderOfPartKeyValues,
       encodedBoundaryKeyLength);

  // -----------------------------------------------------------------
  // Allocate a new RangePartitioningFunction.
  // -----------------------------------------------------------------
  RangePartitioningFunction *partFunc
    = new(idesc->wHeap()) RangePartitioningFunction
        (partitioningKey,
         partitioningKeyList,
         orderOfPartKeyValues,
         partBounds,
         getNodeMap()->copy(idesc->wHeap()));
  partFunc->createPartitioningKeyPredicates();

  return partFunc;

} // RangePartitioningFunction::createPartitioningFunctionForIndexDesc()
| |
| // ----------------------------------------------------------------------- |
| // RangePartitioningFunction::createPartitioningExpression() |
| // ----------------------------------------------------------------------- |
// Return an ItemExpr that maps a row's partitioning key to its partition
// number: each key column is cast to its original type, key-encoded
// (respecting asc/desc order), the encodings are concatenated, and the
// result is looked up against this function's split ranges via
// RangeLookup. The expression is built once and cached.
ItemExpr* RangePartitioningFunction::createPartitioningExpression()
{
  if (getExpression())    // already constructed?
    return getExpression(); // reuse it!

  // create a lookup function that takes the partitioning key and looks
  // up its partition number in the array of split ranges provided by
  // this partitioning function.


  // first, encode all key values (for asc or desc order) and concatenate
  // the encoded strings
  ItemExpr *encKey = NULL;

  for (CollIndex i=0; i < keyColumnList_.entries(); i++)
    {
      // an InverseOrder wrapper in the order list marks a descending column
      NABoolean descOrder =
        (orderOfKeyValues_[i].getItemExpr()->getOperatorType() == ITM_INVERSE);
      // lazily create (and cache) a host variable through which Narrow
      // reports data conversion errors at run time
      ItemExpr *dataConversionErrorFlag = getConvErrorExpr();
      if (dataConversionErrorFlag == 0)
        {
          dataConversionErrorFlag =
            new (CmpCommon::statementHeap()) HostVar(
                 "_sys_repartConvErrorFlg",
                 new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), TRUE,FALSE),
                 TRUE);
          storeConvErrorExpr(dataConversionErrorFlag);
        }

      // cast the key column to the exact type of the original key column
      const NAType &oType = originalKeyColumnList_[i].getType();
      ItemExpr *c = new (CmpCommon::statementHeap()) Narrow(
           keyColumnList_[i].getItemExpr(),
           dataConversionErrorFlag,
           &oType);
      // form the key encoding of the key column (a character string)
      ItemExpr *e = new (CmpCommon::statementHeap()) CompEncode (c,descOrder);

      // concatenate the individual key encodings of the key columns
      if (encKey == NULL)
        encKey = e;
      else
        encKey = new (CmpCommon::statementHeap()) Concat(encKey,e);
    }

  // give the concatenated key encodings as an input to the range lookup function
  ItemExpr * partFunc = new (CmpCommon::statementHeap()) RangeLookup(
       encKey,this);

  partFunc->synthTypeAndValueId();
  storeExpression(partFunc);
  return partFunc;
}
| |
| // ----------------------------------------------------------------------- |
| // RangePartitioningFunction::shouldUseSynchronousAccess() |
| // ----------------------------------------------------------------------- |
// Decide whether this range-partitioned object should be accessed
// synchronously (partitions read one after another in key order) rather
// than asynchronously through parallel PA operators. Synchronous access
// can be forced (CQD off, or a logical partitioning requirement asking
// for fewer clients than partitions); otherwise it is chosen only when
// it preserves a required order/arrangement AND either the PA-per-process
// limit would be exceeded or enough data is returned that the PA buffers
// would fill up and serialize the access anyway.
NABoolean RangePartitioningFunction::shouldUseSynchronousAccess(
    const ReqdPhysicalProperty* rpp,
    const EstLogPropSharedPtr& inputLogProp,
    GroupAttributes* ga) const
{
  NABoolean shouldUseSynchronousAccess = FALSE;
  NABoolean synchronousAccessForced = FALSE;
  NABoolean tooManyPAs = FALSE;
  ValueIdList partKey = getOrderOfKeyValues();
  // Remove from the partitioning key any columns that are covered by
  // constants or input values.
  partKey.removeCoveredExprs(ga->getCharacteristicInputs());


  const LogicalPartitioningRequirement *lpr =
    rpp->getLogicalPartRequirement();

  // Don't do synchronous access if the user is forcing a PAPA node
  // and is not forcing the number of PAs
  if ((lpr != NULL) AND lpr->getMustUsePapa() AND
      (lpr->getNumClientsReq() == ANY_NUMBER_OF_PARTITIONS))
    return FALSE;

  // See if the user is trying to force synchronous access.
  if ((CmpCommon::getDefault(ATTEMPT_ASYNCHRONOUS_ACCESS) == DF_OFF) OR
      ((lpr != NULL) AND
       (lpr->getNumClientsReq() != ANY_NUMBER_OF_PARTITIONS) AND
       (lpr->getNumClientsReq() < getCountOfPartitions())))
    {
      shouldUseSynchronousAccess = TRUE;
      synchronousAccessForced = TRUE;
    }

  // Synchronous access is a good idea and is ok if there is a required order
  // and/or arrangement that came from an operator that is not in DP2,
  // and the required order and/or arrangement columns are a leading
  // prefix of the partitioning key columns, and enough data is being
  // returned that the PA buffers will fill up and so the PA operators
  // will block until all data from all preceding partitions (in the
  // required order) has been returned. Note that whether the required
  // order or arrangement is a leading prefix of the clustering key
  // columns will be checked later (by the satisfied method).
  // NOTE: we cannot force synchronous access if there is a required
  // logical order and/or arrangement! If the code below decides that
  // synchronous access is not possible, and the user is trying to
  // force synchronous access, then they are out of luck!

  if (rpp->getLogicalOrderOrArrangementFlag())
    {
      if ((rpp->getSortKey() != NULL) AND
          (rpp->getArrangedCols() != NULL))
        {
          // We have both a required order and an arrangement.
          if ((partKey.satisfiesReqdOrder(*rpp->getSortKey()) == SAME_ORDER) AND
              partKey.satisfiesReqdArrangement(*rpp->getArrangedCols()))
            shouldUseSynchronousAccess = TRUE;
          else
            shouldUseSynchronousAccess = FALSE;
        }
      else if (rpp->getSortKey() != NULL)
        {
          // Only a required order.
          OrderComparison oc = partKey.satisfiesReqdOrder(*rpp->getSortKey());
          NABoolean okToUseSerialOrder = FALSE;

          // Optionally accept the inverse order too (reverse serial scan).
          if (CmpCommon::getDefault(ATTEMPT_REVERSE_SYNCHRONOUS_ORDER) != DF_ON)
            {
              okToUseSerialOrder = (oc == SAME_ORDER);
            }
          else
            {
              okToUseSerialOrder = (oc == SAME_ORDER || oc == INVERSE_ORDER);
            }

          if (okToUseSerialOrder)
            shouldUseSynchronousAccess = TRUE;
          else
            shouldUseSynchronousAccess = FALSE;
        }
      else if (rpp->getArrangedCols() != NULL)
        {
          // Only a required arrangement.
          if (partKey.satisfiesReqdArrangement(*rpp->getArrangedCols()))
            shouldUseSynchronousAccess = TRUE;
          else
            shouldUseSynchronousAccess = FALSE;
        }
    }

  if (shouldUseSynchronousAccess AND NOT synchronousAccessForced)
    {
      // Don't do synchronous access if there is a dp2 sort order
      // partitioning requirement, as this means we want a DP2 sort
      // order, and the main reason for requiring a DP2 sort order
      // is to avoid having to access the data synchronously.
      if (rpp->getDp2SortOrderPartReq() != NULL)
        shouldUseSynchronousAccess = FALSE;
    }

  if (shouldUseSynchronousAccess AND NOT synchronousAccessForced)
    {
      // There was a required order or arrangement that could potentially
      // need us to access the data synchronously. If synch. access is not
      // forced, see if it is a good idea or not.

      // get the maximum number of PAs per process that can be allowed.
      Int32 maxPAsPerProcess =
        (Int32) CmpCommon::getDefaultNumeric(MAX_ACCESS_NODES_PER_ESP);

      if (getCountOfPartitions() > maxPAsPerProcess)
        {
          // Get the logical part requirement, if one exists.
          PartitioningRequirement *logPartReq = NULL;
          PartitioningFunction *requiredPartFunc = NULL;
          if (lpr != NULL)
            {
              logPartReq = lpr->getLogReq();
              if (logPartReq AND logPartReq->
                    castToFullySpecifiedPartitioningRequirement())
                requiredPartFunc = logPartReq->
                  castToFullySpecifiedPartitioningRequirement()->
                    getPartitioningFunction();
            }

          // Determine if we might exceed the maximum number of PAs allowed
          // to a process. If we will, then we definitely want to use
          // synchronous access if there is an order to preserve.
          // The first check is the most restrictive - this is for logical
          // subpartitioning. If we do logical subpartitioning, it is possible
          // one process will have to access all the partitions, and there
          // is nothing we can do about it later.
          if (logPartReq != NULL AND
              requiredPartFunc AND
              requiredPartFunc->castToLogPhysPartitioningFunction() AND
              requiredPartFunc->castToLogPhysPartitioningFunction()->
                getLogPartType() ==
                  LogPhysPartitioningFunction::LOGICAL_SUBPARTITIONING)
            tooManyPAs = TRUE;
          else if ((logPartReq != NULL) AND
                   (getCountOfPartitions() >
                    (logPartReq->getCountOfPartitions() * maxPAsPerProcess)))
            tooManyPAs = TRUE;
          else if ((getCountOfPartitions() >
                    (rpp->getCountOfPipelines() * maxPAsPerProcess)))
            tooManyPAs = TRUE;
        }

      if (NOT tooManyPAs)
        {
          // Now check to see how much data will be returned. If it is less
          // than twice the amount of data that it takes to fill up all
          // the PA buffers, than we will be able to access the majority
          // of the data asynchronously before the buffers fill up. So,
          // in this case assume that it is not a good idea to access the
          // data synchronously. Otherwise, synchronous access is a good
          // idea (if it passed all the other checks) because we will be
          // accessing the majority of the data after the PA buffers fill up
          // and block, and so we will end up accessing the data synchronously
          // anyway, so we would rather not pay for all those PA buffers.

          // Compute size of data returned in KB. Allocate 16 bytes per
          // record for overhead.
          CostScalar totalsize =
            (ga->outputLogProp(inputLogProp)->getResultCardinality() *
             (ga->getRecordLength() + 16.0));

          // Compute the size of all the PA buffers.
          double buffersize = CmpCommon::getDefaultNumeric(GEN_PA_BUFFER_SIZE);
          // A PA buffer can not be larger than 56K. If the table has remote
          // partitions, then the actual limit is 31K, so we really should
          // check for remote partitions. That would require the index
          // descriptor, which we don't have.
          buffersize = MINOF (buffersize, 56000);
          // number of PA output buffers is always at least one less than the
          // total number of pa buffers, as at least one is reserved for input.
          double numOutputBuffers =
            MAXOF(CmpCommon::getDefaultNumeric(GEN_PA_NUM_BUFFERS) - 1,1);
          // Subtract 2000 bytes per buffer for headers.
          buffersize = (buffersize * numOutputBuffers) -
                       (2000 * numOutputBuffers);

          if (totalsize < (getCountOfPartitions() * 2.0 * buffersize))
            shouldUseSynchronousAccess = FALSE;
        }
    } // end if order or arrangement that could be preserved with synch. access

  return shouldUseSynchronousAccess;
}
| |
| SearchKey * |
| RangePartitioningFunction::createSearchKey(const IndexDesc *indexDesc, |
| ValueIdSet availInputs, |
| ValueIdSet additionalPreds) const |
| { |
| ValueIdSet preds(getPartitioningKeyPredicates()); |
| |
| ValueIdSet nonKeyColumnSet; // empty set |
| |
| availInputs += getPartitionInputValues(); |
| preds += additionalPreds; |
| |
| // make a search key from all that |
| SearchKey *partSearchKey = new (CmpCommon::statementHeap()) |
| SearchKey( |
| indexDesc->getPartitioningKey(), |
| indexDesc->getOrderOfPartitioningKeyValues(), |
| availInputs, |
| TRUE, // there isn't really a scan direction for partKey |
| preds, |
| this, |
| nonKeyColumnSet, |
| indexDesc); |
| |
| return partSearchKey; |
| } |
| |
| ///////////////////////////////////////////////////////////// |
| // Test condition C2.2 (co-partitioned), which is defined as: |
| // 1. All tables share the same number of partitioned key columns; |
| // 2. Corresponding partition key columns across all tables are |
| // of same type; |
| // 3. All tables should be partitioned into the same number of ranges; |
| // 4. Corresponding partitions are bounded by identical upper and |
| // lower boundaries and reside on the same DP2. |
| ///////////////////////////////////////////////////////////// |
| NABoolean |
| RangePartitioningFunction::partFuncAndFuncPushDownCompatible( |
| const PartitioningFunction& x) const |
| { |
| const RangePartitioningFunction* other = |
| x.castToRangePartitioningFunction(); |
| |
| if ( other == NULL ) return FALSE; |
| |
| Lng32 thisKeyCount = getPartitioningKey().entries(); |
| Lng32 otherKeyCount = other->getPartitioningKey().entries(); |
| |
| // same key count |
| if (thisKeyCount != otherKeyCount) |
| return FALSE; |
| |
| Lng32 thisPartCount = getCountOfPartitions(); |
| Lng32 otherPartCount = other->getCountOfPartitions(); |
| |
| // same part count |
| if (thisPartCount != otherPartCount) |
| return FALSE; |
| |
| // same part column type |
| for (Lng32 index = 0; index < thisKeyCount; index++) |
| { |
| if ( NOT getKeyColumnList()[index].getType().isCompatible |
| ( |
| other->getKeyColumnList()[index].getType() |
| ) |
| ) |
| return FALSE; |
| } |
| |
| // same boundaries |
| if ( getRangePartitionBoundaries()-> |
| compareRangePartitionBoundaries |
| (*(other->getRangePartitionBoundaries())) == FALSE |
| ) |
| return FALSE; |
| |
| return TRUE; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // RangePartitioningFunction::preCodeGen() |
| // Rewrite the partitioning keys of the partitioning function that |
| // are expressed using VEGReferences in terms of the available values. |
| // ----------------------------------------------------------------------- |
void RangePartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
  // The key columns receive no external inputs while being rewritten.
  ValueIdSet noExternalInputs;
  // Let the base class rewrite its own state first.
  PartitioningFunction::preCodeGen(availableValues);
  // Rewrite VEGReferences in the key column list in terms of available
  // values. NOTE(review): the trailing (FALSE, NULL, TRUE) arguments
  // select a specific rewrite mode of replaceVEGExpressions -- confirm
  // their meaning against its declaration before changing them.
  keyColumnList_.replaceVEGExpressions(availableValues, noExternalInputs,
                                       FALSE, NULL, TRUE);
} // RangePartitioningFunction::preCodeGen()
| |
| void RangePartitioningFunction::setupForStatement() |
| { |
| if(setupForStatement_) |
| return; |
| |
| PartitioningFunction::setupForStatement(); |
| |
| if(partitionBoundaries_) |
| partitionBoundaries_->setupForStatement(TRUE /* use string */); |
| |
| setupForStatement_ = TRUE; |
| resetAfterStatement_ = FALSE; |
| } |
| |
| void RangePartitioningFunction::resetAfterStatement() |
| { |
| if(resetAfterStatement_) |
| return; |
| |
| PartitioningFunction::resetAfterStatement(); |
| keyColumnList_.clear(); |
| orderOfKeyValues_.clear(); |
| originalKeyColumnList_.clear(); |
| |
| if(partitionBoundaries_) |
| partitionBoundaries_->resetAfterStatement(); |
| |
| setupForStatement_ = FALSE; |
| resetAfterStatement_ = TRUE; |
| } |
| // ----------------------------------------------------------------------- |
| // Methods for debugging. |
| // ----------------------------------------------------------------------- |
| const NAString RangePartitioningFunction::getText() const |
| { |
| NAString result("range partitioned ", CmpCommon::statementHeap()); |
| char nparts[20]; |
| |
| sprintf(nparts,"%d",getCountOfPartitions()); |
| result += nparts; |
| result += " ways on ("; |
| orderOfKeyValues_.unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT); |
| result += ")"; |
| |
| if ( partitionBoundaries_ ) { |
| |
| result += " with boundaries("; |
| |
| for (Int32 index = 0; index < partitionBoundaries_->getCountOfPartitions(); index++) |
| { |
| const ItemExprList* iel = partitionBoundaries_->getBoundaryValues(index); |
| |
| for (Int32 j=0; j<iel->entries(); j++) { |
| |
| ItemExpr* ie = (*iel)[j]; |
| if ( ie->getOperatorType() == ITM_CONSTANT ) { |
| ConstValue* cv = (ConstValue*)ie; |
| |
| result += "c("; |
| result += cv->getTextForQuery(QUERY_FORMAT); |
| result += ")"; |
| } else { |
| result += "nc("; |
| result += ie->getText(); |
| result += ")"; |
| } |
| |
| result += " "; |
| } |
| |
| if ( index < partitionBoundaries_->getCountOfPartitions()-1 ) |
| result += ";"; |
| } |
| |
| result += ")"; |
| /* enable this for debugging of binary key problems |
| result += " binary ("; |
| Lng32 encodedBoundaryKeyLength = partitionBoundaries_->getEncodedBoundaryKeyLength(); |
| char hexDigits[4]; |
| |
| for (Int32 index2 = 0; index2 < partitionBoundaries_->getCountOfPartitions(); index2++) |
| { |
| const char * binaryVal = partitionBoundaries_->getBinaryBoundaryValue(index2); |
| |
| if (binaryVal) |
| { |
| if (index2 > 0) |
| result += ", "; |
| result += "b(0x"; |
| for (int b=0; b<encodedBoundaryKeyLength; b++) |
| { |
| snprintf(hexDigits, sizeof(hexDigits), "%02hhx", binaryVal[b]); |
| result += hexDigits; |
| } |
| result += ")"; |
| } |
| } |
| result += ")"; |
| end of code for binary keys */ |
| } |
| |
| return result; |
| } |
| |
| void RangePartitioningFunction::print(FILE* ofd, const char* indent, |
| const char* title) const |
| { |
| PartitioningFunction::print(ofd, indent, "RangePartitioningFunction"); |
| |
| if (orderOfKeyValues_.entries() > 0) |
| { |
| fprintf(ofd,"Ascending/descending order :\n"); |
| for (CollIndex index = 0; index < orderOfKeyValues_.entries(); index++) |
| { |
| if (orderOfKeyValues_[index].getItemExpr()->getOperatorType() |
| == ITM_INVERSE) |
| fprintf(ofd,"D "); |
| else |
| fprintf(ofd,"A "); |
| if (((index/9)* 9 == index) OR (index == orderOfKeyValues_.entries()-1)) |
| fprintf(ofd,"\n"); |
| } |
| } |
| |
| partitionBoundaries_->print(ofd, indent, title); |
| |
| } // RangePartitioningFunction::print() |
| |
| |
| NABoolean |
| compareEncodedKey(const char* low, const char* key, const char* high, Int32 keyLen, NABoolean checkLast) |
| { |
| Int32 cmpLow = memcmp(low, key, keyLen); |
| Int32 cmpHigh = memcmp(key, high, keyLen); |
| |
| if ( cmpLow <= 0 && cmpHigh < 0 ) |
| return TRUE; |
| |
| return (checkLast && cmpLow <= 0 && cmpHigh <= 0); |
| } |
| |
| NABoolean |
| compareAsciiKey(const char* low, const char* key, const char* high, Int32, NABoolean checkLast) |
| { |
| Int32 cmpLow = strverscmp(low, key); |
| Int32 cmpHigh = strverscmp(key, high); |
| |
| if ( cmpLow <= 0 && cmpHigh < 0 ) |
| return TRUE; |
| |
| return (checkLast && cmpLow <= 0 && cmpHigh <= 0); |
| } |
| |
| |
| // find a boundary pair [low, high) with smallest low value in which keys fall, and return the |
| // index of the boundary low. Return -1 otherwise, or the key lengths are different. |
| Int32 RangePartitionBoundaries::findBeginBoundary(char* encodedKey, Int32 keyLen, |
| compFuncPtrT compFunc) const |
| { |
| // boundaries are stored in entries in the range [0, partitionCount_] |
| for (Lng32 i=0; i<=partitionCount_-1; i++ ) { |
| |
| const char* low = getBinaryBoundaryValue(i); |
| const char* high = getBinaryBoundaryValue(i+1); |
| |
| // test if encodedKey is in [low, high) |
| if ( (*compFunc)(low, encodedKey, high, keyLen, i==partitionCount_-1) ) |
| return i; |
| } |
| |
| return -1; |
| } |
| |
| // find a boundary pair [low, high) with the largest low value in which keys fall, and return the |
| // index of the boundary low. Return -1 otherwise, or the key lengths are different. |
| Int32 RangePartitionBoundaries::findEndBoundary(char* encodedKey, Int32 keyLen, |
| compFuncPtrT compFunc) const |
| { |
| // boundaries are stored in entries in the range [0, partitionCount_] |
| for (Lng32 i=partitionCount_-1; i>= 0; i--) { |
| |
| const char* low = getBinaryBoundaryValue(i); |
| const char* high = getBinaryBoundaryValue(i+1); |
| |
| // test if encodedKey is in [low, high) |
| if ( (*compFunc)(low, encodedKey, high, keyLen, i==partitionCount_-1) ) |
| return i; |
| } |
| |
| return -1; |
| } |
| |
// Estimate how many partitions the given search key actually touches:
// encode the begin and end key values and locate their boundary ranges,
// then subtract the partitions before the begin range and after the end
// range. Falls back to the full partition count whenever a boundary
// cannot be determined.
Int32
RangePartitioningFunction::computeNumOfActivePartitions(SearchKey* skey, const TableDesc* tDesc) const
{
  const RangePartitionBoundaries* boundaries = getRangePartitionBoundaries();

  Int32 origPartitions = getCountOfPartitions();
  Int32 partitions = origPartitions;
  Int32 bIndex = 0;

  const NATable* naTable = tDesc->getNATable();

  // Hive tables get no boundary-based pruning here; report all partitions.
  if ( naTable->isHiveTable() )
    return origPartitions;

  // Native HBase tables compare boundary values as ASCII strings;
  // all others use the binary key encoding.
  NABoolean isNativeHbase = (naTable->isHbaseCellTable() || naTable->isHbaseRowTable());
  compFuncPtrT compFuncPtr = ( isNativeHbase ) ? compareAsciiKey: compareEncodedKey;

  char* buf = NULL;
  Int32 len = 0;

  const ValueIdList& beginKey = skey->getBeginKeyValues();

  if ( beginKey.computeEncodedKey(tDesc, FALSE, buf, len) ) {

    bIndex = boundaries->findBeginBoundary(buf, len, compFuncPtr);

    if ( bIndex < 0 )
      return origPartitions; // error in deciding the partition
    else
      partitions -= bIndex; // bIndex is 0 based.
  }

  const ValueIdList& endKey = skey->getEndKeyValues();

  // NOTE(review): buf/len are reused from the begin-key call; presumably
  // computeEncodedKey (re)allocates or overwrites them -- confirm.
  if ( endKey.computeEncodedKey(tDesc, TRUE, buf, len) ) {

    Int32 eIndex = boundaries->findEndBoundary(buf, len, compFuncPtr);

    if ( eIndex < 0 ) // error in deciding the partition.
      return origPartitions;

    if ( eIndex >= bIndex ) //eIndex is also 0 based
      partitions -= (getCountOfPartitions() - eIndex - 1);
    else
      return origPartitions; // end partition is preceding the start partition!
  }

  return partitions;
}
| |
| |
| |
| // *********************************************************************** |
| // LogPhysPartitioningFunction |
| // *********************************************************************** |
| |
// Construct a LogPhysPartitioningFunction from its logical and physical
// component functions. The "real" partitioning function starts out NULL
// and is synthesized later (see createRealPartitioningFunction()). The
// combined partitioning key is the union of the two components' keys.
LogPhysPartitioningFunction::LogPhysPartitioningFunction(
     PartitioningFunction * logPartFunc,
     PartitioningFunction * physPartFunc,
     logPartType logPartType,
     Lng32 numOfClients,
     NABoolean usePapa,
     NABoolean synchronousAccess,
     NAMemory* heap)
   : PartitioningFunction(LOGPHYS_PARTITIONING_FUNCTION,heap),
     logPartFunc_(logPartFunc),physPartFunc_(physPartFunc),
     realPartFunc_(NULL),logPartType_(logPartType),
     numOfClients_(numOfClients),usePapa_(usePapa),
     synchronousAccess_(synchronousAccess)
{
  // The partitioning key covers both the logical and the physical keys.
  ValueIdSet partKey(logPartFunc->getPartitioningKey());

  partKey += physPartFunc->getPartitioningKey();
  setPartKey(partKey);
}
| |
// Empty destructor: this object does not delete its component
// partitioning functions.
LogPhysPartitioningFunction::~LogPhysPartitioningFunction()
{}
| |
// Safe downcast: this object IS a LogPhysPartitioningFunction, so return
// itself (the base-class implementation returns NULL).
const LogPhysPartitioningFunction *
LogPhysPartitioningFunction::castToLogPhysPartitioningFunction() const
{
  return this;
}
| |
// The partition count of a logphys function is that of its physical
// component.
Lng32 LogPhysPartitioningFunction::getCountOfPartitions() const
{
  return physPartFunc_->getCountOfPartitions();
}
| |
// Delegate requirement creation to the physical partitioning function,
// which must exist.
PartitioningRequirement*
LogPhysPartitioningFunction::makePartitioningRequirement()
{
  CMPASSERT(physPartFunc_);
  return physPartFunc_ -> makePartitioningRequirement();
}
| |
| // ----------------------------------------------------------------------- |
| // LogPhysPartitioningFunction::getNodeMap() |
| // Return appropriate node map from underlying partitioning function. |
| // ----------------------------------------------------------------------- |
| const NodeMap* |
| LogPhysPartitioningFunction::getNodeMap() const |
| { |
| |
| // -------------------------------------------------------------------------- |
| // If no "real" partitioning function exists, extract node map from |
| // underlying physical partitioning function; otherwise extract node map from |
| // "real" partitioning function. |
| // -------------------------------------------------------------------------- |
| if (realPartFunc_ == NULL) |
| { |
| return physPartFunc_->getNodeMap(); |
| } |
| else |
| { |
| return realPartFunc_->getNodeMap(); |
| } |
| |
| } // PartitioningFunction::getNodeMap() |
| |
// get any existing (logical or physical) nodemap (or synthesize one) that
// matches logPartFunc_'s partition count requirement
NodeMap* LogPhysPartitioningFunction::getOrMakeSuitableNodeMap
                                                   (NABoolean forESP) const
{
  // we are the LogPhysPartitioningFunction of the synthesized physical
  // property of an Exchange's child (that executes in DP2).

  // if logical partn func's nodemap entry count == log partn func partn count
  // then return logical partn func's nodemap
  NodeMap *nodemap = (NodeMap*)logPartFunc_->getNodeMap();
  ULng32 partCnt=(ULng32)logPartFunc_->getCountOfPartitions();
  if (nodemap && nodemap->getNumEntries() == partCnt) {
    // the logical partitioning of the result already has a suitable nodemap.
    // fall thru and use it.
  }
  else {
    // logical partitioning function has no nodemap. try to find one in the
    // physical partitioning function of the child.

    // if phys partn func's nodemap entry count == log partn func partn count
    // then return phys partn func's nodemap.
    nodemap = (NodeMap*)physPartFunc_->getNodeMap();
    if (nodemap && nodemap->getNumEntries() == partCnt) {
      // this can happen in "select * from t035t6;" and other queries against
      // vertically partitioned tables like t035t6 (see regress/core/test035).
      // fall thru and use it.
    }
    else {
      // only as a last resort do we synthesize a new nodemap
      // NOTE(review): nodemap may be NULL here if the physical function
      // has no nodemap either; synthesizeLogicalMap would then be invoked
      // on a NULL pointer -- confirm this cannot happen in practice.
      nodemap = nodemap->synthesizeLogicalMap(partCnt, forESP);
    }
  }
  // for replicate-no-broadcast, every partition is considered active
  if (logPartFunc_->castToReplicateNoBroadcastPartitioningFunction()) {
    for(CollIndex i = 0; i < partCnt; i++) {
      nodemap->setPartitionState(i, NodeMapEntry::ACTIVE);
    }
  }
  return nodemap;
}
| |
| PartitioningFunction* LogPhysPartitioningFunction::copy() const |
| { |
| return new(CmpCommon::statementHeap()) LogPhysPartitioningFunction(*this); |
| } |
| |
| NABoolean |
| LogPhysPartitioningFunction::canProducePartitioningKeyPredicates() const |
| { |
| // this is an exception to the rule: for a LogPhysPartitioningFunction |
| // we assume that there is no need to enforce the physical partitioning |
| // key predicates, since each DP2 partition has only its own data |
| // anyway. Therefore, for the purpose of evaluating partitioning key |
| // predicates and partition input values, we deal with the logical |
| // partitioning function only, and only in the case where the PA |
| // node isn't doing the grouping. |
| |
| // we can do this if we can produce the "real" partitioning function |
| // and if the real one can produce part key preds |
| return (logPartFunc_->canProducePartitioningKeyPredicates() OR |
| logPartType_ == PA_PARTITION_GROUPING OR |
| logPartType_ == PA_GROUPED_REPARTITIONING); |
| } |
| |
// Adopt the logical partitioning function's key predicates and partition
// input values; the physical function's predicates are deliberately
// ignored (each DP2 partition only sees its own data).
void LogPhysPartitioningFunction::createPartitioningKeyPredicates()
{
  // the logical function must be able to supply the predicates
  CMPASSERT(logPartFunc_->canProducePartitioningKeyPredicates());

  // see comment in canProducePartitioningKeyPredicates() above
  storePartitioningKeyPredicates(logPartFunc_->getPartitioningKeyPredicates());
  storePartitionInputValues(logPartFunc_->getPartitionInputValuesLayout());
}
| |
| // ----------------------------------------------------------------------- |
| // LogPhysPartitioningFunction::replacePivs() |
| // ----------------------------------------------------------------------- |
// Replace the partition input values and partitioning key predicates of
// both this function and its logical component with the given ones.
void LogPhysPartitioningFunction::replacePivs(
       const ValueIdList& newPivs,
       const ValueIdSet& newPartKeyPreds)
{
  // Overwrite the old pivs, part key preds, and part expr. with the new ones.
  storePartitionInputValues(newPivs);
  storePartitioningKeyPredicates(newPartKeyPreds);
  // Also overwrite them in the logical partitioning function.
  logPartFunc_->replacePivs(newPivs,newPartKeyPreds);
} // LogPhysPartitioningFunction::replacePivs()
| |
// A logphys partitioning function can never be the target of a
// repartition, so no partitioning expression exists for it.
ItemExpr* LogPhysPartitioningFunction::createPartitioningExpression()
{
  CMPASSERT(0); // should never reach here, it's not legal to do a
                // repartitioning for a logphys part func
  return NULL;
}
| |
// Synthesize (and cache in realPartFunc_) the "real" partitioning
// function that combines the logical and physical components:
//  - for PA grouping/repartitioning, it is simply the physical function;
//  - for logical subpartitioning, it is a range function whose split
//    boundaries merge those of the two components (when both are range
//    functions over the same key), or the logical function itself when
//    the physical side is a single partition.
// Returns NULL when no combination could be synthesized.
PartitioningFunction*
LogPhysPartitioningFunction::createRealPartitioningFunction()
{
  // reuse a previously synthesized result
  if (realPartFunc_)
    return realPartFunc_;

  switch (logPartType_)
    {
    case PA_PARTITION_GROUPING:
    case PA_GROUPED_REPARTITIONING:
      // for partition grouping or repartitioning we never split any
      // physical partitions and we are processing only physical
      // partitions at this point (logical partitions come into the
      // picture once we reach the DP2 exchange)
      realPartFunc_ = physPartFunc_;
      break;

    case LOGICAL_SUBPARTITIONING:
      {
        // Merge the split ranges of logical and physical partitioning
        // functions, so we can count the exact number of partitions.

        const RangePartitioningFunction *lpf =
          logPartFunc_->castToRangePartitioningFunction();
        const RangePartitioningFunction *ppf =
          physPartFunc_->castToRangePartitioningFunction();

        // Log and phys partitioning functions must have the same key
        // to be able to produce a common function
        if (lpf AND ppf AND
            lpf->getOrderOfKeyValues() == ppf->getOrderOfKeyValues())
          {

            // Make a new RangePartitioningFunction object with the combined
            // split boundaries of the two. Also produce a corresponding
            // node map for the new boundaries.
            NodeMap *mergedMap = new (CmpCommon::statementHeap())
              NodeMap(CmpCommon::statementHeap());
            RangePartitionBoundaries *rb =
              ppf->getRangePartitionBoundaries()->merge(
                   *lpf->getRangePartitionBoundaries(),
                   *ppf->getNodeMap(),
                   *mergedMap);

            // return the merged partitioning function if we could merge
            // (rb is NULL when the boundaries could not be merged)
            if (rb)
              realPartFunc_ = new(CmpCommon::statementHeap())
                RangePartitioningFunction(
                     ppf->getPartitioningKey(),
                     ppf->getKeyColumnList(),
                     ppf->getOrderOfKeyValues(),
                     rb,
                     mergedMap );
          }
        else
          {
            // Special case of single partition physical partitioning function.
            if (physPartFunc_->castToSinglePartitionPartitioningFunction())
              {

                // the logical function alone describes the partitioning
                realPartFunc_ = logPartFunc_;

                // Create a new node map.
                NodeMap *nodeMap = new(CmpCommon::statementHeap())
                  NodeMap(CmpCommon::statementHeap());

                // Add a copy of the single physical node map entry for
                // each partition in the logical node map.
                const NodeMapEntry *entry = physPartFunc_->getNodeMapEntry(0);
                for (CollIndex nodeIdx = 0;
                     nodeIdx < (CollIndex) logPartFunc_->getCountOfPartitions();
                     nodeIdx++)
                  {
                    nodeMap->setNodeMapEntry(nodeIdx,
                                             *entry,
                                             CmpCommon::statementHeap());
                  }
                // NOTE(review): this mutates logPartFunc_'s node map,
                // since realPartFunc_ aliases it here -- confirm intended.
                realPartFunc_->replaceNodeMap(nodeMap);

              }
          }
      }
      break;

    default:
      break;
    }

  return realPartFunc_; // is NULL if we aren't smart enough to do this
}
| |
| COMPARE_RESULT |
| LogPhysPartitioningFunction::comparePartFuncToFunc |
| (const PartitioningFunction &other) const |
| { |
| COMPARE_RESULT c = PartitioningFunction::comparePartFuncToFunc(other); |
| |
| if (c != SAME) |
| return INCOMPATIBLE; |
| |
| const LogPhysPartitioningFunction &lppfOther = |
| (const LogPhysPartitioningFunction &) other; |
| |
| if (getLogPartType() != lppfOther.getLogPartType()) |
| return INCOMPATIBLE; |
| |
| if (getNumOfClients() != lppfOther.getNumOfClients()) |
| return INCOMPATIBLE; |
| |
| if (getUsePapa() != lppfOther.getUsePapa()) |
| return INCOMPATIBLE; |
| |
| if (getSynchronousAccess() != lppfOther.getSynchronousAccess()) |
| return INCOMPATIBLE; |
| |
| if (getLogPartitioningFunction()->comparePartFuncToFunc( |
| *lppfOther.getLogPartitioningFunction()) != SAME) |
| return INCOMPATIBLE; |
| |
| if (getPhysPartitioningFunction()->comparePartFuncToFunc( |
| *lppfOther.getPhysPartitioningFunction()) != SAME) |
| return INCOMPATIBLE; |
| |
| return SAME; |
| } |
| |
| NABoolean LogPhysPartitioningFunction::isAGroupingOf( |
| const PartitioningFunction &other, |
| Lng32* maxPartsPerGroup) const |
| { |
| if (maxPartsPerGroup != NULL) |
| *maxPartsPerGroup = 1; |
| |
| // Handle the trivial case of two functions that are equal. The only |
| // other way to check for a grouping is to actually form the "real" |
| // partitioning function and use it for the check. |
| return (comparePartFuncToFunc(other) == SAME OR |
| (realPartFunc_ AND |
| realPartFunc_->isAGroupingOf(other,maxPartsPerGroup))); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // LogPhysPartitioningFunction::remapIt() |
| // ----------------------------------------------------------------------- |
| void LogPhysPartitioningFunction::remapIt |
| (const PartitioningFunction* opf, |
| ValueIdMap& map, NABoolean mapItUp) |
| { |
| // Currently, the only consumer of this method is |
| // MapValueIds::synthPhysicalProperty, when it executes in DP2 |
| |
| // Map myself |
| PartitioningFunction::remapIt(opf,map,mapItUp); |
| // Map the logical function |
| logPartFunc_ = logPartFunc_->copyAndRemap(map,mapItUp); |
| // Map the physical function |
| physPartFunc_ = physPartFunc_->copyAndRemap(map,mapItUp); |
| } // LogPhysPartitioningFunction::remapIt() |
| |
| // LogPhysPartitioningFunction::canMaintainSortOrder() |
| // Can this logPhys partitioning function maintain the order of an |
| // individual partition of the physical partitioning function. In |
| // order to maintain the order, a merge expression may be required. |
| // |
NABoolean
LogPhysPartitioningFunction::canMaintainSortOrder(const ValueIdList& sortOrder)
const
{
  // Decide whether this logphys partitioning function can preserve
  // the given sort order of an individual physical partition.
  // The sortOrder is the sort order of an individual partition of the
  // physical partitioning function.
  //


  CollIndex numSortKeys = sortOrder.entries();

  // We can always maintain, no sort order.
  //
  if(numSortKeys == 0) {
    return TRUE;
  }

  // For now, only concerned with table/index partitioning functions.
  // (Single, Range or TableHash)
  //
  const SinglePartitionPartitioningFunction *sppf =
    physPartFunc_->castToSinglePartitionPartitioningFunction();

  // A single partition can always maintain its order.
  //
  if(sppf) {
    return TRUE;
  }

  const RangePartitioningFunction *rpf =
    physPartFunc_->castToRangePartitioningFunction();

  const TableHashPartitioningFunction *hdpf =
    physPartFunc_->castToTableHashPartitioningFunction();

  // only Single, Range and TableHash are expected here
  CMPASSERT(rpf || hdpf);

  if(!getSynchronousAccess()) {
    // Can always merge the streams, even if (in the case of
    // non-decoupled range) this results in serial access to the
    // partitions.
    //
    return TRUE;
  }

  if(rpf) {

    // For range partitioning, if the partitioning keys are a prefix
    // of the clustering key, then in order to maintain the sort order
    // the partitions must be accessed synchronously.
    //
    // If the partitioning keys are not a prefix of the clustering
    // key, then the partitions can be merged if the partitions are
    // accessed asynchronously. (This case is handled above)
    //

    // Determine if the sortkey is a prefix of the partitioning key or
    // if the partitioning key is a prefix of the sortkey.  If either
    // of these is true, the sort order can be maintained if the
    // partitions are accessed synchronously.
    //

    const ValueIdList &partKeyCols = rpf->getOrderOfKeyValues();

    // This logic may have to change when we become more sophisticated
    // in our sort keys and sort requirements.  For example, if the
    // sort req is (a/2) and the sort key is a
    //
    if (partKeyCols.entries() <= numSortKeys) {

      // Part key is no longer than the sort key: check whether the
      // sort order satisfies the part key order (same or, if the CQD
      // allows it, inverse direction).
      OrderComparison oc= sortOrder.satisfiesReqdOrder(partKeyCols);
      if(oc == SAME_ORDER ||
         (CmpCommon::getDefault(ATTEMPT_REVERSE_SYNCHRONOUS_ORDER) == DF_ON &&
          oc == INVERSE_ORDER)
        )

        {
          // Range, non-decoupled, sort order == partition order,
          // synchronous access: therefore can maintain order.
          //
          return TRUE;
        }
      else {
        // Not a prefix, so in order to maintain order, must be
        // accessing the partitions asynchronously so that a merge
        // expression can be used.
        //
        return FALSE;
      }
    } else {
      // Sort key is shorter: check the opposite direction (part key
      // order satisfying the sort key); only SAME_ORDER qualifies.
      if(partKeyCols.satisfiesReqdOrder(sortOrder) == SAME_ORDER) {
        // Range, non-decoupled, sort order == partition order,
        // synchronous access: therefore can maintain order.
        //
        return TRUE;
      } else {
        // Not a prefix, so in order to maintain order, must be
        // accessing the partitions asynchronously so that a merge
        // expression can be used.
        //
        return FALSE;
      }
    }
  } else {

    // For Hash, synchronous (and all other partitioning functions)
    // cannot maintain order.
    //
    return FALSE;
  }
} // LogPhysPartitioningFunction::canMaintainSortOrder()
| |
| |
| const NAString LogPhysPartitioningFunction::getText() const |
| { |
| NAString result("logphys partitioned(", CmpCommon::statementHeap()); |
| |
| switch (logPartType_) |
| { |
| case PA_PARTITION_GROUPING: |
| result += "grouping"; |
| break; |
| case LOGICAL_SUBPARTITIONING: |
| result += "subpartitioning"; |
| break; |
| case HORIZONTAL_PARTITION_SLICING: |
| result += "horizontal slicing"; |
| break; |
| case PA_GROUPED_REPARTITIONING: |
| result += "repartitioning"; |
| break; |
| default: |
| result += "???"; |
| break; |
| } |
| if (usePapa_) |
| { |
| char numPAs[20]; |
| |
| result += ", PAPA with "; |
| sprintf(numPAs,"%d",numOfClients_); |
| result += numPAs; |
| result += " PA(s)"; |
| } |
| |
| result += ", log=" + logPartFunc_->getText() + |
| ", phys=" + physPartFunc_->getText() + ")"; |
| return result; |
| } |
| |
| const NAString LogPhysPartitioningFunction::getLogForSplitTop() const |
| { |
| NAString result("", CmpCommon::statementHeap()); |
| |
| switch (logPartType_) |
| { |
| case PA_PARTITION_GROUPING: |
| //whenever the logical partitioning function is replicate no broadcast |
| //it is not truly the case of grouping |
| if (logPartFunc_->isAReplicateNoBroadcastPartitioningFunction()) |
| result += "no grouping, "; |
| else { |
| char numParts[20]; |
| result += "grouped "; |
| sprintf(numParts, "%d to %d,", logPartFunc_->getCountOfPartitions(), physPartFunc_->getCountOfPartitions()); |
| result += numParts; |
| } |
| break; |
| case LOGICAL_SUBPARTITIONING: |
| result += "subpartitioned, "; |
| break; |
| case HORIZONTAL_PARTITION_SLICING: |
| result += "horizontally sliced, "; |
| break; |
| case PA_GROUPED_REPARTITIONING: |
| result += "repartitioned, "; |
| break; |
| default: |
| result += "???, "; |
| break; |
| } |
| if (usePapa_) |
| { |
| char numPAs[20]; |
| |
| result += "PAPA with "; |
| sprintf(numPAs,"%d",numOfClients_); |
| result += numPAs; |
| result += " PA(s)"; |
| } |
| result += ", " + logPartFunc_->getText(); |
| |
| return result; |
| } |
| |
| const NAString LogPhysPartitioningFunction::getPhysForSplitTop() const |
| { |
| return physPartFunc_->getText(); |
| } |
| |
// Debug print: base class attributes under a fixed title (the
// passed-in title parameter is ignored), followed by the logical and
// physical component functions.
void LogPhysPartitioningFunction::print(
     FILE* ofd,
     const char* indent,
     const char* title) const
{
  PartitioningFunction::print(ofd,indent,"LogPhysPartitioningFunction");
  logPartFunc_->print(ofd,"logical: ");
  physPartFunc_->print(ofd,"physical: ");
}
| |
| // *********************************************************************** |
| // RoundRobinPartitioningFunction |
| // *********************************************************************** |
| |
| PartitioningRequirement* |
| RoundRobinPartitioningFunction::makePartitioningRequirement() |
| { |
| return new (CmpCommon::statementHeap()) |
| RequireRoundRobin(this); |
| } |
| |
| PartitioningFunction* |
| RoundRobinPartitioningFunction::copy() const |
| { |
| return new(CmpCommon::statementHeap()) RoundRobinPartitioningFunction(*this); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // RoundRobinPartitioningFunction::comparePartFuncToFunc(): Compare |
| // this partitioning function to another round robin function. To be |
| // 'SAME' must have the same number and order of partitioning key |
| // columns and have the same number of partitions (scaled and |
| // original). |
| // ----------------------------------------------------------------------- |
| COMPARE_RESULT |
| RoundRobinPartitioningFunction:: |
| comparePartFuncToFunc(const PartitioningFunction &other) const |
| { |
| COMPARE_RESULT c = PartitioningFunction::comparePartFuncToFunc(other); |
| |
| if (c != SAME) |
| return INCOMPATIBLE; |
| |
| const RoundRobinPartitioningFunction *oth = |
| other.castToRoundRobinPartitioningFunction(); |
| |
| // Since they compared 'SAME', oth should always exist, so this |
| // test is redundant. |
| // |
| if(!oth) |
| return INCOMPATIBLE; |
| |
| // They must be based on the same physical partitioning. |
| // |
| if (getCountOfOrigRRPartitions() != oth->getCountOfOrigRRPartitions()) |
| return INCOMPATIBLE; |
| |
| return SAME; |
| } // RoundRobinPartitioningFunction::comparePartFuncToFunc() |
| |
| // ----------------------------------------------------------------------- |
| // RoundRobinPartitioningFunction::createPartitioningKeyPredicates() |
| // ----------------------------------------------------------------------- |
| void RoundRobinPartitioningFunction::createPartitioningKeyPredicates() |
| { |
| if (NOT partKeyPredsCreated()) |
| { |
| // Create the partition input values. |
| // |
| ItemExpr *loPart = new (CmpCommon::statementHeap()) |
| HostVar("_sys_HostVarLoRoundRobinPart", |
| new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE), |
| TRUE); |
| |
| ItemExpr *hiPart = new (CmpCommon::statementHeap()) |
| HostVar("_sys_HostVarHiRoundRobinPart", |
| new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE), |
| TRUE); |
| |
| loPart->synthTypeAndValueId(); |
| hiPart->synthTypeAndValueId(); |
| |
| ValueIdSet setOfKeyPredicates; |
| ValueIdList partInputValues; |
| |
| // ----------------------------------------------------------------- |
| // The partitioning key predicate is never used for a round |
| // robin partitioning so none is generated. |
| // ----------------------------------------------------------------- |
| |
| // the partition input values are two integer values: lo and hi part # |
| partInputValues.insert(loPart->getValueId()); |
| partInputValues.insert(hiPart->getValueId()); |
| |
| // Store the empty set of key predicate. This will set the |
| // boolean 'partKeyPredsCreated' to TRUE. |
| // |
| storePartitioningKeyPredicates(setOfKeyPredicates); |
| |
| // Store the partition input values. |
| // |
| storePartitionInputValues(partInputValues); |
| } |
| } // RoundRobinPartitioningFunction::createPartitioningKeyPredicates() |
| |
| // ----------------------------------------------------------------------- |
| // RoundRobinPartitioningFunction::replacePivs() |
| // ----------------------------------------------------------------------- |
| void RoundRobinPartitioningFunction::replacePivs( |
| const ValueIdList& newPivs, |
| const ValueIdSet& newPartKeyPreds) |
| { |
| // Overwrite the old pivs, part key preds, and part expr. with the new ones. |
| storePartitionInputValues(newPivs); |
| storePartitioningKeyPredicates(newPartKeyPreds); |
| } // RoundRobinPartitioningFunction::replacePivs() |
| |
| // ----------------------------------------------------------------------- |
| // RoundRobinPartitioningFunction::createPartitioningFunctionForIndexDesc() |
| // ----------------------------------------------------------------------- |
| PartitioningFunction* |
| RoundRobinPartitioningFunction:: |
| createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const |
| { |
| const NAFileSet * fileSet = idesc->getNAFileSet(); |
| const NAColumnArray & allColumns = fileSet->getAllColumns(); |
| const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns(); |
| |
| Lng32 ixColNumber; |
| ValueId keyValueId; |
| ValueIdSet partitioningKey; |
| |
| for (CollIndex i = 0; i < partKeyColumns.entries(); i++) |
| { |
| // which column of the index is this (usually this will be == i) |
| ixColNumber = allColumns.index(partKeyColumns[i]); |
| |
| // insert the value id of the index column into the partitioning |
| // key column value id set |
| keyValueId = idesc->getIndexColumns()[ixColNumber]; |
| partitioningKey += keyValueId; |
| } // end loop over partitioning key columns |
| |
| // ----------------------------------------------------------------- |
| // Allocate a new RoundRobinPartitioningFunction. |
| // ----------------------------------------------------------------- |
| RoundRobinPartitioningFunction *partFunc |
| = new(idesc->wHeap()) RoundRobinPartitioningFunction |
| (getCountOfPartitions(), |
| partitioningKey, |
| getNodeMap()->copy(idesc->wHeap())); |
| partFunc->createPartitioningKeyPredicates(); |
| |
| return partFunc; |
| |
| } // RoundRobinPartitioningFunction::createPartitioningFunctionForIndexDesc() |
| |
| // ----------------------------------------------------------------------- |
| // RoundRobinPartitioningFunction::createPartitioningExpression() |
| // ----------------------------------------------------------------------- |
ItemExpr* RoundRobinPartitioningFunction::createPartitioningExpression()
{
  // Build (or return the cached) expression that maps a SYSKEY value
  // to a round robin partition number:
  //   PAGroup(ProgDistrib(shifted SYSKEY, origNumParts),
  //           scaledNumParts, origNumParts)
  if (getExpression())   // already constructed?
    return getExpression(); // reuse it!

  CollHeap *heap = CmpCommon::statementHeap();

  // The Partitioning Key (SYSKEY)
  //
  ValueIdList keyValue = getPartitioningKey();

  // The type of the partitioning key for round robin is always
  // SQLLargeInt (the type of SYSKEY)
  //
  NAType *desiredType = new (heap) SQLLargeInt(heap, TRUE, FALSE);

  // The layout of the SYSKEY is
  //
  // Cast the key to SQLLargeInt, shift right 32 bits, then cast the
  // result down to a (non-nullable) SQLInt.
  ItemExpr *partKey =
    new (heap)
      Cast(new (heap)
           Shift(ITM_SHIFT_RIGHT,
                 new (heap)
                 Cast(keyValue[0].getItemExpr(),
                      desiredType),
                 new (heap)
                 ConstValue(32)),
           new (heap) SQLInt(heap, FALSE,FALSE));

  NAType *numPartsType = new (heap) SQLInt(heap, FALSE,FALSE);

  // Literal for the original (unscaled) partition count.
  Lng32 numParts = getCountOfOrigRRPartitions();
  char buffer[20];
  sprintf(buffer, "%d", numParts);

  NAString numPartsStr = buffer;
  ItemExpr *origNumParts =
    new (heap) ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);

  // Literal for the current (possibly scaled) partition count.
  // NOTE(review): numParts is reused here -- assumes ConstValue copies
  // the value buffer rather than retaining the pointer; confirm.
  numParts = getCountOfPartitions();
  sprintf(buffer, "%d", numParts);

  numPartsStr = buffer;
  ItemExpr *scaledNumParts =
    new (heap) ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);

  ItemExpr *partFunc =
    new (heap)
      PAGroup(new (heap)
              ProgDistrib(partKey, origNumParts),
              scaledNumParts,
              origNumParts);

  // Bind the expression and cache it for later reuse.
  partFunc->synthTypeAndValueId();
  storeExpression(partFunc);
  return partFunc;
} // RoundRobinPartitioningFunction::createPartitioningExpression()
| |
| // ----------------------------------------------------------------------- |
| // RoundRobinPartitioningFunction::createPartitionSelectionExpr() |
| // Create the partition selection expression. 'Partition selection' |
| // means that an expression is used to determine the partition to |
| // access as opposed to using the File System to determine the range |
| // of partitions to access based on a set of partitioning key |
| // predicates. Partition selection is currently used for Hash Dist and |
| // Round Robin Partitioning. And the File System is used for Range |
| // Partitioning. If a partitioning selection expression is created, |
| // it is cached in the data member 'partitionSelectionExpr_' and the |
| // partition selection inputs are generated and stored in |
| // 'partitionSelectionExprInputs_'. This method is redefined for |
| // HashDistPartitioningFunction and RoundRobinPartitioningFunction. |
| // ----------------------------------------------------------------------- |
| ItemExpr * |
| RoundRobinPartitioningFunction:: |
| createPartitionSelectionExpr(const SearchKey *partSearchKey, |
| const ValueIdSet &availableValues) |
| { |
| // If it has already been created, return cached version. |
| // |
| if(partitionSelectionExpr()) |
| return partitionSelectionExpr(); |
| |
| CollHeap *heap = CmpCommon::statementHeap(); |
| |
| // Use a host var to provide access to numParts, this will be |
| // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen() |
| // |
| ItemExpr *numParts = new (heap) HostVar("_sys_hostVarNumParts", |
| // int not null |
| new (heap) SQLInt(heap, FALSE, FALSE), |
| // is system-supplied |
| TRUE); |
| numParts->synthTypeAndValueId(); |
| |
| // Use a host var to provide access to partNum, this will be |
| // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen() |
| // |
| ItemExpr *partNum = new (heap) HostVar("_sys_hostVarPartNo", |
| // int not null |
| new (heap) SQLInt(heap, FALSE, FALSE), |
| // is system-supplied |
| TRUE); |
| partNum->synthTypeAndValueId(); |
| |
| // Record these hostvars as the inputs to the partitionSelectionExpr. |
| // |
| CMPASSERT(partitionSelectionExprInputs().entries() == 0); |
| partitionSelectionExprInputs().insert(partNum->getValueId()); |
| partitionSelectionExprInputs().insert(numParts->getValueId()); |
| |
| if(assignPartition()) { |
| // If partitionAssignment is to be done, the partition selection |
| // expression is Modulus(Cast((partNum+1),SQLInt),numParts). The |
| // cast (to SQLInt) is there because the type synthesized for |
| // partNum+1 is a large int, which Modulus can't handle at the |
| // moment. So, we just cast the sum to a SQLInt for now. |
| // Probably the best thing to do here is to somehow force the type |
| // synthesis for BiArith(ITM_PLUS) so that when we add two SQLInts |
| // we get a result of SQLInt, rather than something bigger. This |
| // doesn't follow SQL type synthesis rules, but that is OK, since |
| // the expression is generated internally. $$ TBD $$ |
| |
| |
| // Construct and cache the partitionSelectionExpr for Round Robin |
| // (partition assignment). |
| // |
| partitionSelectionExpr() = |
| new (heap) Modulus(new (heap) Cast(new (heap) |
| BiArith(ITM_PLUS, partNum, |
| new (heap) SystemLiteral(1)), |
| new (heap) SQLInt(heap, TRUE,FALSE)), |
| numParts); |
| |
| // Bind the expression. |
| // |
| partitionSelectionExpr()->synthTypeAndValueId(); |
| |
| // PreCodeGen the expression (This maybe should go in |
| // RoundRobinPartitioningFunction::preCodeGen(), but preCodeGen() |
| // is typically called before this expression is generated. |
| // |
| partitionSelectionExpr()->replaceVEGExpressions(availableValues, |
| availableValues); |
| |
| return partitionSelectionExpr(); |
| |
| } else if(partSearchKey->isUnique()) { |
| |
| // For now, only support a partition selection expression when the |
| // partition search key is unique (identifies exactly one partition). |
| // |
| |
| const ValueIdList &keyValues = partSearchKey->getBeginKeyValues(); |
| |
| // The type of the partitioning key for round robin is always |
| // SQLLargeInt (the type of SYSKEY) |
| // |
| NAType *desiredType = new (heap) SQLLargeInt(heap,TRUE, FALSE); |
| |
| // The partition selection expression is: |
| // |
| // ProgDistrib(Cast(ShiftRight(Cast(<keyvalue>, SQLLargeInt), 32), SQLInt), |
| // numParts) |
| // |
| // First, the <keyvalue> is cast to a SQLLargeInt to make it the |
| // same type as the partitioning key (SYSKEY). This value is then |
| // shifted right 32 bits to remove the 16 bits of padding |
| // introduced for packing and 16 bits of random data used to make |
| // sure the key is unique, but not used to compute the partition |
| // number. This is then cast to a SQLInt to make it compatible |
| // with ProgDistrib. Then ProgDistrib calculates the partNum. |
| |
| // Construct and cache the partitionSelectionExpr for Round Robin |
| // (partition assignment). |
| // |
| partitionSelectionExpr() = |
| new (heap) ProgDistrib(new (heap) |
| Cast(new (heap) |
| Shift(ITM_SHIFT_RIGHT, |
| new (heap) |
| Cast(keyValues[0].getItemExpr(), |
| desiredType), |
| new (heap) |
| SystemLiteral(0x1000)), |
| new (heap) SQLInt(heap, FALSE,FALSE)), |
| numParts); |
| |
| // Bind the expression. |
| // |
| partitionSelectionExpr()->synthTypeAndValueId(); |
| |
| // PreCodeGen the expression (This maybe should go in |
| // RoundRobinPartitioningFunction::preCodeGen(), but preCodeGen() |
| // is typically called before this expression is generated. |
| // |
| partitionSelectionExpr()->replaceVEGExpressions(availableValues, |
| availableValues); |
| |
| return partitionSelectionExpr(); |
| |
| } else { |
| return NULL; |
| } |
| |
| } // RoundRobinPartitioningFunction::createPartitionSelectionExpr() |
| |
//::scaleNumberOfPartitions() is called in the following locations
| // |
| // OptPhysRelExpr.cpp |
| // 3536 NJ::genLeftChild() |
| // 4892 NJ:createContextForChild() (rightPartFunc->) |
| // 14166 and 14262 synthDP2PhysicalProperty() |
| // |
| // GP.cpp |
| // 955 GroupAttributes::recommendedOrderForNJProbing() |
| // |
| //As a result, the following version will not be called. |
| |
| PartitioningFunction * |
| RoundRobinPartitioningFunction:: |
| scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions, |
| PartitionGroupingDistEnum partGroupDist) |
| { |
| if (suggestedNewNumberOfPartitions == 1) |
| return new(CmpCommon::statementHeap()) |
| SinglePartitionPartitioningFunction(); |
| |
| // If an expression has been generated, then we want to discard it |
| // because it may no longer be correct. |
| storeExpression(NULL); |
| |
| // Allow arbitrary scaling of RoundRobinPartitioningFunctions. |
| // (The runtime will handle the mapping of physical partitions |
| // to logical partitions.) |
| // |
| |
| |
| if ( suggestedNewNumberOfPartitions != getCountOfPartitions() ) { |
| NodeMap* newNodeMap = |
| new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions); |
| replaceNodeMap(newNodeMap); |
| } |
| |
| partitionCount_ = suggestedNewNumberOfPartitions; |
| return this; |
| } // RoundRobinPartitioningFunction::scaleNumberOfPartitions() |
| |
| // ----------------------------------------------------------------------- |
| // RoundRobinPartitioningFunction::isAGroupingOf() |
| // ----------------------------------------------------------------------- |
| NABoolean |
| RoundRobinPartitioningFunction:: |
| isAGroupingOf(const PartitioningFunction &other, |
| Lng32* maxPartsPerGroup) const |
| { |
| if (maxPartsPerGroup != NULL) |
| *maxPartsPerGroup = 1; |
| |
| const RoundRobinPartitioningFunction *oth = |
| other.castToRoundRobinPartitioningFunction(); |
| |
| // If other is not a RoundRobinPartitioningFunction, then it cannot |
| // be a grouping of... |
| // |
| if(!oth) |
| return FALSE; |
| |
| // To be a grouping of, the key column (SYSKEY) must be the same. |
| // |
| if (! (getPartitioningKey() == oth->getPartitioningKey())) |
| return FALSE; |
| |
| |
| // If this function has more partitions than other, |
| // then it cannot be a grouping of. |
| // Eg. this.numParts: 10 this.origNumParts: 20 |
| // oth.numParts: 5 oth.origNumParts: 20 |
| // |
| // If the two functions are not based on the same physical function, |
| // then it cannot be a grouping of. |
| // Eg. this.numParts: 10 this.origNumParts: 20 |
| // oth.numParts: 10 oth.origNumParts: 30 |
| // |
| if((getCountOfPartitions() > oth->getCountOfPartitions()) || |
| (getCountOfOrigRRPartitions() != oth->getCountOfOrigRRPartitions())) { |
| return FALSE; |
| } |
| |
| // Here the following is known to be TRUE: |
| // |
| // (getCountOfPartitions() <= oth->getCountOfPartitions() |
| // |
| // AND |
| // |
| // (getCountOfOrigRRPartitions() == oth->getCountOfOrigRRPartitions()) |
| // |
| // Eg. this.numParts: 10 this.origNumParts: 20 |
| // oth.numParts: 10 oth.origNumParts: 20 |
| // |
| // this.numParts: 5 this.origNumParts: 20 |
| // oth.numParts: 10 oth.origNumParts: 20 |
| |
| // If other has not been scaled (allow arbitrary scaling of one function): |
| // Eg. this.numParts: 7 this.origNumParts: 20 |
| // oth.numParts: 20 oth.origNumParts: 20 |
| // OR |
| // If they have both been scaled to the same number of partitions: |
| // then it is a grouping of. |
| // Eg. this.numParts: 7 this.origNumParts: 20 |
| // oth.numParts: 7 oth.origNumParts: 20 |
| // |
| if((oth->getCountOfPartitions() == oth->getCountOfOrigRRPartitions()) || |
| (getCountOfPartitions() == oth->getCountOfPartitions())) { |
| |
| if (maxPartsPerGroup != NULL) |
| *maxPartsPerGroup = |
| ((oth->getCountOfPartitions() + getCountOfPartitions() - 1) |
| / getCountOfPartitions()); |
| |
| return TRUE; |
| } |
| |
| // WARNING..... I am not sure if the current code can ever produce |
| // a situation that would bring control to here. Also, I am not |
| // sure if the semantics of GROUPING implemented below are correct |
| // for these situations. |
| // |
| // Here the following is known to be TRUE: |
| // |
| // both functions have been scaled. (I DON'T THINK THIS CAN HAPPEN) |
| // |
| // AND |
| // |
| // They are scaled to different sizes. |
| // |
| // Eg. this.numParts: 5 this.origNumParts: 20 |
| // oth.numParts: 10 oth.origNumParts: 20 |
| // |
| // this.numParts: 7 this.origNumParts: 20 |
| // oth.numParts: 10 oth.origNumParts: 20 |
| // |
| |
| // Under these conditions, three things must be true for it to be a |
| // grouping of: |
| // |
| // - the scaled number of partitions must evenly divide the scaled |
| // number of partitions of other |
| // Eg. this.numParts: 5 this.origNumParts: 20 |
| // oth.numParts: 10 oth.origNumParts: 20 |
| // |
| // - the other scaling must be a multiple of or evenly divide the |
| // original number of partitions |
| // Eg. this.numParts: 5 this.origNumParts: 20 |
| // oth.numParts: 10 oth.origNumParts: 20 |
| // |
| // this.numParts: 5 this.origNumParts: 20 |
| // oth.numParts: 40 oth.origNumParts: 20 |
| // |
| // - this scaling must also be a multiple of or evenly divide the |
| // original number of partitions |
| // Eg. this.numParts: 5 this.origNumParts: 20 |
| // oth.numParts: 10 oth.origNumParts: 20 |
| // |
| // this.numParts: 40 this.origNumParts: 20 |
| // oth.numParts: 10 oth.origNumParts: 20 |
| // |
| |
| // If the scaled number of partitions evenly divides the scaled |
| // number of partitions of other... |
| // |
| if((oth->getCountOfPartitions() % getCountOfPartitions()) != 0) { |
| return FALSE; |
| } |
| |
| // AND the other scaling is a multiple of or evenly divides the |
| // original number of partitions... |
| // |
| if(oth->getCountOfOrigRRPartitions() >= oth->getCountOfPartitions()) { |
| if(oth->getCountOfOrigRRPartitions() % oth->getCountOfPartitions()) { |
| return FALSE; |
| } |
| } else { |
| if(oth->getCountOfPartitions() % oth->getCountOfOrigRRPartitions()) { |
| return FALSE; |
| } |
| } |
| |
| // AND this scaling is a multiple of or evenly divides the original |
| // number of partitions ... |
| // |
| if(getCountOfOrigRRPartitions() >= getCountOfPartitions()) { |
| if(getCountOfOrigRRPartitions() % getCountOfPartitions()) { |
| return FALSE; |
| } |
| } else { |
| if(getCountOfPartitions() % getCountOfOrigRRPartitions()) { |
| return FALSE; |
| } |
| } |
| |
| if (maxPartsPerGroup != NULL) |
| *maxPartsPerGroup = |
| ((oth->getCountOfPartitions() + getCountOfPartitions() - 1) |
| / getCountOfPartitions()); |
| |
| // THEN it is a grouping of... |
| // |
| return TRUE; |
| |
| } // RoundRobinPartitioningFunction::isAGroupingOf() |
| |
| // ----------------------------------------------------------------------- |
| // RoundRobinPartitioningFunction::codeGen() is defined in |
| // generator/GenPartFunc.cpp |
| // ----------------------------------------------------------------------- |
| |
| // ----------------------------------------------------------------------- |
| // Make a new partSearchKey that indicates that PA_PARTITION_GROUPING |
| // is being done. Note that a search key can not be generated which |
| // can group RR partitions. For RoundRobinPartitioning, a flag in the |
| // search key is used to indicate that PA_PARTITION_GROUPING is being |
| // done and the begin/end key values of the search key are set to the |
| // partition input values of the partitioning function. |
| // ----------------------------------------------------------------------- |
| SearchKey * |
| RoundRobinPartitioningFunction::createSearchKey(const IndexDesc *indexDesc, |
| ValueIdSet availInputs, |
| ValueIdSet additionalPreds) const |
| { |
| ValueIdSet preds(getPartitioningKeyPredicates()); |
| ValueIdSet nonKeyColumnSet; // empty set |
| |
| availInputs += getPartitionInputValues(); |
| preds += additionalPreds; |
| |
| // Call this special constructor that constructs a search key for a |
| // RoundRobinPartitioningFunction. |
| // |
| SearchKey *partSearchKey = new (CmpCommon::statementHeap()) |
| SearchKey(indexDesc->getPartitioningKey(), |
| indexDesc->getOrderOfPartitioningKeyValues(), |
| availInputs, |
| preds, |
| this, |
| nonKeyColumnSet, |
| indexDesc); |
| |
| return partSearchKey; |
| } // RoundRobinPartitioningFunction::createSearchKey() |
| |
| void RoundRobinPartitioningFunction::setupForStatement() |
| { |
| if(setupForStatement_) |
| return; |
| |
| PartitioningFunction::setupForStatement(); |
| |
| setupForStatement_ = TRUE; |
| resetAfterStatement_ = FALSE; |
| } |
| |
| void RoundRobinPartitioningFunction::resetAfterStatement() |
| { |
| if(resetAfterStatement_) |
| return; |
| |
| PartitioningFunction::resetAfterStatement(); |
| partitionCount_ = numberOfOrigRRPartitions_; |
| |
| setupForStatement_ = FALSE; |
| resetAfterStatement_ = TRUE; |
| } |
| |
| const NAString RoundRobinPartitioningFunction::getText() const |
| { |
| NAString result("round robin partitioned ", CmpCommon::statementHeap()); |
| char nparts[20]; |
| |
| sprintf(nparts,"%d (%d)", partitionCount_, numberOfOrigRRPartitions_); |
| result += nparts; |
| result += " ways on ("; |
| getPartitioningKey().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT); |
| result += ")"; |
| return result; |
| } |
| |
| void RoundRobinPartitioningFunction::print( |
| FILE* ofd, |
| const char* indent, |
| const char* title) const |
| { |
| PartitioningFunction::print(ofd,indent,"RoundRobinPartitioningFunction"); |
| } |
| |
| |
// Shared constant denoting "no particular skew handling": indicator ANY
// and no skew value list.
const skewProperty ANY_SKEW_PROPERTY(skewProperty::ANY, NULL);
| |
// Equality comparison for skew properties.  Compares the indicator, the
// broadcast-one-row flag, the skew value lists and (in some cases) the
// number of anti-skew ESPs.
NABoolean skewProperty::operator==(const skewProperty& other) const
{
  if ( indicator_ != other.indicator_ )
    return FALSE;

  if (broadcastOneRow_ != other.broadcastOneRow_)
    return FALSE;

  // Same list object (this also covers both pointers being NULL):
  // considered equal without comparing numESPs_.
  if ( skewValues_ == other.skewValues_ )
    return TRUE;

  // Both lists present but with different contents: not equal.
  if ( skewValues_ != NULL && other.skewValues_ != NULL &&
       NOT (*skewValues_ == *(other.skewValues_)) )
    return FALSE;

  // NOTE(review): when exactly one of the two skewValues_ pointers is
  // NULL, control falls through to the numESPs_ comparison below rather
  // than returning FALSE — confirm this asymmetric treatment is intended.
  if ( numESPs_ == other.numESPs_ )
    return TRUE;

  return FALSE;
}
| |
| const NAString skewProperty::getText(NABoolean abbre) const |
| { |
| NAString result; |
| char esps[20]; |
| NAString suffix; |
| |
| if ( abbre == FALSE && getAntiSkewESPs() > 0 ) { |
| suffix += "(via "; |
| str_itoa(getAntiSkewESPs(), esps); |
| suffix += esps; |
| suffix += " ESP(s)) "; |
| } |
| |
| switch ( getIndicator() ) |
| { |
| case skewProperty::UNIFORM_DISTRIBUTE: |
| |
| if ( abbre == FALSE ) { |
| result += " - uniformly distribute "; |
| result += suffix; |
| result += "skewed "; |
| |
| if ( skewValues_ ) |
| result += skewValues_->getText(); |
| } else { |
| result += "-ud"; |
| } |
| break; |
| |
| case skewProperty::BROADCAST: |
| |
| if ( abbre == FALSE ) { |
| result += " - broadcast "; |
| result += suffix; |
| result += "skewed "; |
| |
| if ( skewValues_ ) |
| result += skewValues_->getText(); |
| |
| if (getBroadcastOneRow()) |
| result += " and one row "; |
| } else { |
| result += "-br"; |
| } |
| break; |
| |
| default: |
| break; |
| } |
| |
| return result; |
| } |
| |
| Int64List* |
| SkewedDataPartitioningFunction::buildHashListForSkewedValues() |
| { |
| if ( skewHashList_ ) |
| return skewHashList_; |
| |
| const SkewedValueList* svlist = getSkewProperty().getSkewValues(); |
| |
| if ( svlist == NULL ) |
| return NULL; |
| |
| skewHashList_ = new (STMTHEAP) Int64List(STMTHEAP, svlist->entries()); |
| CollHeap *heap = CmpCommon::statementHeap(); |
| char data[10]; Int32 len = 0; |
| |
| ConstValue* cvExp = NULL; |
| ItemExpr* expForSkewedValue = NULL; |
| ValueIdList exprs; |
| UInt32 hashval; |
| |
| if ( !svlist->needToComputeFinalHash() ) { |
| |
| // for TRUE MC-SB. hash value for the composite skew value is pre-computed. |
| for (CollIndex i = 0; i < svlist->entries(); i++) { |
| hashval = (UInt32)((*svlist)[i].getDblValue()); |
| skewHashList_ -> insertAt(i, hashval); |
| } |
| |
| } else { |
| |
| const NAType* naType = svlist->getNAType(); |
| NABoolean useHash = naType->useHashRepresentation(); |
| UInt32 flags = ExHDPHash::NO_FLAGS; |
| |
| for (CollIndex i = 0; i < svlist->entries(); i++) |
| { |
| if ( (*svlist)[i].getValue().isNull() ) { |
| // an untyped NULL constant. All null values hash to this constant. |
| // Ref. module exp_function.cpp, method ExHDPHash::eval() and |
| // ex_function_hash::eval(). |
| hashval = ExHDPHash::nullHashValue; //666654765; |
| } else { |
| |
| if ( useHash ) { |
| // The skew value for any character or exact numeric data type is |
| // the hash itself. So here we just copy and cast it back to UInt32. |
| hashval = (UInt32)((*svlist)[i].getDblValue()); |
| } else { |
| |
| // If we hit this branch, it means the boundary values are stored in |
| // the skew list. We can assume these values can be safely casted |
| // back (without loosing precision). Otherwise, storing-hash- |
| // value method should be used. |
| (*svlist)[i].outputToBufferToComputeRTHash(naType,data,len,flags); |
| hashval = computeHashValue(data, flags, len); |
| } |
| } |
| skewHashList_ -> insertAt(i, hashval); |
| } |
| |
| } |
| |
| return skewHashList_; |
| } |
| |
| //============================ |
| |
// Out-of-line virtual destructor; nothing to release explicitly.
HivePartitioningFunction::~HivePartitioningFunction() {}
| |
| PartitioningRequirement* |
| HivePartitioningFunction::makePartitioningRequirement() |
| { |
| return new (CmpCommon::statementHeap()) |
| RequireHive(this); |
| } |
| |
| PartitioningFunction* |
| HivePartitioningFunction::copy() const |
| { |
| return new (CmpCommon::statementHeap()) |
| HivePartitioningFunction(*this, CmpCommon::statementHeap()); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // HivePartitioningFunction::createPartitioningKeyPredicates() |
| // ----------------------------------------------------------------------- |
| void HivePartitioningFunction::createPartitioningKeyPredicates() |
| { |
| createBetweenPartitioningKeyPredicates("_sys_HostVarLoHivePart", |
| "_sys_HostVarHiHivePart"); |
| |
| } // HivePartitioningFunction::createPartitioningKeyPredicates() |
| |
| |
| // ----------------------------------------------------------------------- |
| // HivePartitioningFunction::isAGroupingOf() |
| // ----------------------------------------------------------------------- |
| // Right now we assume that the split function of a hash partitioning |
| // function is such that no two functions are a grouping of each other. |
| // The only exception is identity, of course. We use the base class' |
| // implementation. |
| // |
| // With more knowledge about the split function (e.g. by knowing it's |
| // a simple modulus), one could guarantee that a 4-way hash-partitioning |
| // scheme is actually a grouping of an 8-way scheme. This is not really |
| // necessary and therefore not done here. |
| |
| |
| ItemExpr * |
| HivePartitioningFunction::buildHashingExpressionForExpr(ItemExpr* expr) const |
| { |
| return new (CmpCommon::statementHeap()) HiveHash(expr); |
| } |
| |
| UInt32 HivePartitioningFunction::computeHashValue(char* data, UInt32 flags, Int32 len) |
| { |
| // need a Hive() |
| // Directly call the implementation function to compute the hash. NULL |
| // values and VARCHAR data types are not handled. |
| return FastHash(data, len); |
| } |
| |
// Return the hashing expression, i.e. the first child of the partition
// expression, or NULL when no partition expression exists.
ItemExpr * HivePartitioningFunction::getHashingExpression() const
{
  // need a Hive()
  ItemExpr* hashExpr = NULL;
  ItemExpr* partExpr = getExpression();
  if ( partExpr ) {
    hashExpr = partExpr->child(0);

    // NOTE(review): buildHashingExpressionForExpr() above builds a
    // HiveHash, yet this assertion checks for ITM_HASH — verify that the
    // expected operator type is correct.
    CMPASSERT(hashExpr AND
              hashExpr->getOperatorType()== ITM_HASH);
  }
  return hashExpr;
}
| |
| |
| // ----------------------------------------------------------------------- |
| // Method for debugging. |
| // ----------------------------------------------------------------------- |
| const NAString HivePartitioningFunction::getText() const |
| { |
| return getTextImp("hive"); |
| } |
| |
| void HivePartitioningFunction::print(FILE* ofd, const char* indent, |
| const char* title) const |
| { |
| PartitioningFunction::print(ofd, indent, "HivePartitioningFunction"); |
| } // HivePartitioningFunction::print() |
| |
// Build an equivalent HivePartitioningFunction whose partitioning key is
// expressed in terms of the given index descriptor's columns.  The new
// function keeps this function's partition count and a copy of its node
// map, and has its partitioning key predicates constructed.
PartitioningFunction*
HivePartitioningFunction::
createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const
{
  const NAFileSet * fileSet = idesc->getNAFileSet();
  const NAColumnArray & allColumns = fileSet->getAllColumns();
  const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns();

  CollIndex ixColNumber;
  ValueId keyValueId;
  ValueIdSet partitioningKey;
  ValueIdList partitioningKeyList;

  for (CollIndex i = 0; i < partKeyColumns.entries(); i++)
    {
      // which column of the index is this (usually this will be == i)
      ixColNumber = allColumns.index(partKeyColumns[i]);

      // insert the value id of the index column into the partitioning
      // key column value id set
      keyValueId = idesc->getIndexColumns()[ixColNumber];
      partitioningKey += keyValueId;
      partitioningKeyList.insertAt(i,keyValueId);
    } // end loop over partitioning key columns

  // -----------------------------------------------------------------
  // Allocate a new HivePartitioningFunction.
  // -----------------------------------------------------------------
  HivePartitioningFunction *partFunc = new(idesc->wHeap())
    HivePartitioningFunction (partitioningKey,
                              partitioningKeyList,
                              getCountOfPartitions(),
                              getNodeMap()->copy(idesc->wHeap()));

  // -----------------------------------------------------------------
  // Construct the partitioning key predicates.
  // -----------------------------------------------------------------
  partFunc->createPartitioningKeyPredicates();

  return partFunc;

} // HivePartitioningFunction::createPartitioningFunctionForIndexDesc()
| |
// Decide whether this partitioning function is a grouping of 'other'.
// For Hive tables, regrouping from <m> to <n> partitions is accepted
// whenever one partition count evenly divides the other (and the
// partitioning keys are compatible).
NABoolean HivePartitioningFunction::isAGroupingOf(
                           const PartitioningFunction &other,
                           Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  if ( comparePartKeyToKey(other) == INCOMPATIBLE )
    return FALSE;

  // assume we can repartition a hive table with <m> partitions to
  // <n> partitions. No question asked.

  // Reject only when the larger count is not a multiple of the smaller.
  if ( getCountOfPartitions() < other.getCountOfPartitions() &&
       other.getCountOfPartitions() % getCountOfPartitions() != 0 )
    return FALSE;

  if ( other.getCountOfPartitions() < getCountOfPartitions() &&
       getCountOfPartitions() % other.getCountOfPartitions() != 0 )
    return FALSE;

  // This is a grouping of. Set the maxPartsPerGroup and return TRUE.
  // NOTE(review): when this function has MORE partitions than 'other',
  // the integer division below yields 0 — confirm that callers expect
  // that (or that this direction never reaches here).
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = other.getCountOfPartitions() / getCountOfPartitions();

  return TRUE;

}
| |
// Normalize the partitioning keys: delegate to the base class, then
// normalize the Hive-specific key column list as well.
void
HivePartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  HashPartitioningFunction::normalizePartitioningKeys(normWARef);
  keyColumnList_.normalizeNode(normWARef);

  // don't normalize original key col list, avoid VEGies which could
  // cause data type changes.
}
| |