blob: 61163921f5f11ac0b63bdbe2abc139c2efa3f5cf [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
****************************************************************************
*
* File: PartInputDataDesc.cpp
* Description: Data structures related to generating partition input
* tuples for ESPs and partitioned access nodes.
*
* Created: 5/6/98
* Language: C++
*
*
*
*
****************************************************************************
*/
// -----------------------------------------------------------------------
#include "PartInputDataDesc.h"
#include "str.h"
#include "exp_expr.h"
#include "ExpSqlTupp.h"
#include "ExpAtp.h"
#include "ComPackDefs.h"
#include "BaseTypes.h"
// -----------------------------------------------------------------------
// Methods for class ExPartInputDataDesc
// -----------------------------------------------------------------------
ExPartInputDataDesc::ExPartInputDataDesc(ExPartitioningType partType,
ex_cri_desc *partInputCriDesc,
Lng32 partInputDataLength,
Lng32 numPartitions)
: NAVersionedObject(partType)
{
partType_ = partType;
partInputCriDesc_ = partInputCriDesc;
partInputDataLength_ = partInputDataLength;
numPartitions_ = numPartitions;
}
// -----------------------------------------------------------------------
// Methods for class ExHashPartInputData
// -----------------------------------------------------------------------
ExHashPartInputData::ExHashPartInputData(
ex_cri_desc *partInputCriDesc,
Lng32 numPartitions) : ExPartInputDataDesc(HASH_PARTITIONED,
partInputCriDesc,
2*sizeof(Lng32),
numPartitions)
{
}
void ExHashPartInputData::copyPartInputValue(Lng32 fromPartNum,
Lng32 toPartNum,
char *buffer,
Lng32 bufferLength)
{
//ex_assert(bufferLength == 2 * sizeof(long),
// "Hash part intput values are always two 4 byte integers");
str_cpy_all(buffer,(char *) &fromPartNum,sizeof(Lng32));
str_cpy_all(&buffer[sizeof(Lng32)],(char *) &toPartNum,sizeof(Lng32));
}
// -----------------------------------------------------------------------
// Methods for class ExRoundRobinPartInputData
// -----------------------------------------------------------------------
ExRoundRobinPartInputData::
ExRoundRobinPartInputData(ex_cri_desc *partInputCriDesc,
Lng32 numPartitions,
Lng32 numOrigRRPartitions)
: ExPartInputDataDesc(ROUNDROBIN_PARTITIONED,
partInputCriDesc,
2*sizeof(Lng32),
numPartitions),
numOrigRRPartitions_(numOrigRRPartitions)
{
}
void ExRoundRobinPartInputData::copyPartInputValue(Lng32 fromPartNum,
Lng32 toPartNum,
char *buffer,
Lng32 bufferLength)
{
Lng32 scaleFactor = numOrigRRPartitions_/ getNumPartitions();
Lng32 transPoint = numOrigRRPartitions_ % getNumPartitions();
Lng32 loPart;
Lng32 hiPart;
if(fromPartNum < transPoint) {
loPart = fromPartNum * (scaleFactor + 1);
} else {
loPart = (fromPartNum * scaleFactor) + transPoint;
}
if(toPartNum < transPoint) {
hiPart = (toPartNum * (scaleFactor + 1)) + scaleFactor;
} else {
hiPart = (toPartNum * scaleFactor) + scaleFactor + transPoint - 1;
}
str_cpy_all(buffer,(char *) &loPart,sizeof(Lng32));
str_cpy_all(&buffer[sizeof(Lng32)],(char *) &hiPart,sizeof(Lng32));
}
// -----------------------------------------------------------------------
// Methods for class ExRangePartInputData
// -----------------------------------------------------------------------
ExRangePartInputData::ExRangePartInputData(
ex_cri_desc *partInputCriDesc,
Lng32 partInputDataLength,
Lng32 partKeyLength,
Lng32 exclusionIndicatorOffset,
Lng32 numPartitions,
Space *space,
Lng32 useExpressions) : ExPartInputDataDesc(RANGE_PARTITIONED,
partInputCriDesc,
partInputDataLength,
numPartitions)
{
exclusionIndicatorOffset_ = exclusionIndicatorOffset;
exclusionIndicatorLength_ = sizeof(Lng32); // fixed for now
partKeyLength_ = partKeyLength;
alignedPartKeyLength_ = (partKeyLength+7)/8 * 8;
useExpressions_ = useExpressions;
partRangeExprAtp_ = -1; // may be set later
partRangeExprAtpIndex_ = -1; // may be set later
partRangeExprHasBeenEvaluated_ = NOT useExpressions;
if (numPartitions > 0)
{
// allocate space for (numPartitions+1) keys, since n partitions have
// n+1 boundaries, if one counts the ends
Lng32 totalPartRangesLength = alignedPartKeyLength_ * (numPartitions+1);
partRanges_ =
space->allocateAlignedSpace((size_t) totalPartRangesLength);
if (useExpressions_)
{
// Expressions are used at run time to compute the boundaries,
// the creator of this object will later set those expressions
// and (hopefully) set atp and atpindex values. Allocate an
// array of expression pointers, one pointer for each boundary.
partRangeExpressions_ = new(space) ExExprPtr [numPartitions + 1];
// just to be safe, initialize the array with NULLs
for (Lng32 i = 0; i < (numPartitions + 1); i++)
partRangeExpressions_[i] = (ExExprPtrPtr)NULL;
}
else
{
partRangeExpressions_ = (ExExprPtrPtr) NULL;
}
}
else
{
// otherwise, this is a fake partitioning data descriptor
partRanges_ = (NABasicPtr) NULL;
partRangeExpressions_ = (ExExprPtrPtr) NULL;
}
}
ExRangePartInputData::~ExRangePartInputData()
{
}
void ExRangePartInputData::setPartitionStartExpr(Lng32 partNo, ex_expr *expr)
{
//ex_assert(partNo >= 0 AND partNo <= getNumPartitions() AND useExpressions_,
// "Partition expr. number out of range or desc doesn't use exprs");
partRangeExpressions_[partNo] = expr;
}
void ExRangePartInputData::setPartitionStartValue(Lng32 partNo,
char *val)
{
//ex_assert(partNo >= 0 AND partNo <= getNumPartitions(),
// "Partition number out of range");
str_cpy_all(&((char*)partRanges_)[partNo * alignedPartKeyLength_],
val,
partKeyLength_);
}
void ExRangePartInputData::copyPartInputValue(Lng32 fromPartNum,
Lng32 toPartNum,
char *buffer,
Lng32 bufferLength)
{
//ex_assert(fromPartNum >= 0 AND
// fromPartNum <= getNumPartitions() AND
// toPartNum >= 0 AND
// toPartNum <= getNumPartitions() AND
// fromPartNum <= toPartNum AND
// bufferLength >= getPartInputDataLength(),
// "Partition number or buffer length out of range");
//ex_assert(2*partKeyLength_ <= getPartInputDataLength(),
// "Part. input data length must > 2 * key length");
// copy begin key (entry fromPartNum)
str_cpy_all(buffer,
&((char*)partRanges_)[fromPartNum * alignedPartKeyLength_],
partKeyLength_);
// copy end key (entry toPartNum + 1)
str_cpy_all(&buffer[partKeyLength_],
&((char*)partRanges_)[(toPartNum+1) * alignedPartKeyLength_],
partKeyLength_);
// indicate whether the end key is inclusive or exclusive
Lng32 exclusive = (toPartNum < getNumPartitions() - 1);
//ex_assert(exclusionIndicatorLength_ == sizeof(exclusive),
// "Exclusion indicator length must be 4");
str_cpy_all(&buffer[exclusionIndicatorOffset_],
(char *) &exclusive,
sizeof(exclusive));
}
Lng32 ExRangePartInputData::evalExpressions(Space * space,
CollHeap * exHeap,
ComDiagsArea **diags)
{
Lng32 result = 0; // 0 == success
// return if there is no work to do
if (getNumPartitions() == 0 OR partRangeExprHasBeenEvaluated_)
return result;
// Actually need to evaluate all the expressions and store their
// results in the partRanges_ byte array.
// prepare a work atp
atp_struct *workAtp = allocateAtp(getPartInputCriDesc(), space);
tupp_descriptor td;
workAtp->getTupp(partRangeExprAtpIndex_) = &td;
if (workAtp->getDiagsArea() != *diags)
workAtp->setDiagsArea(*diags);
// loop over all expressions, fixing them up and evaluating them
for (Lng32 i = 0; i <= getNumPartitions() AND result == 0; i++)
{
Lng32 offs = i * alignedPartKeyLength_;
workAtp->getTupp(partRangeExprAtpIndex_).setDataPointer
(&((char*)partRanges_)[offs]);
partRangeExpressions_[i]->fixup(0,ex_expr::PCODE_NONE,0,space,exHeap,FALSE,NULL);
if (partRangeExpressions_[i]->eval(workAtp,NULL) == ex_expr::EXPR_ERROR)
result = -1;
}
partRangeExprHasBeenEvaluated_ = (result == 0);
return result;
}
// -----------------------------------------------------------------------
// Methods for class ExHashDistPartInputData
// -----------------------------------------------------------------------
ExHashDistPartInputData::ExHashDistPartInputData(ex_cri_desc *partInputCriDesc,
Lng32 numPartitions,
Lng32 numOrigHashPartitions)
: ExPartInputDataDesc(HASH1_PARTITIONED,
partInputCriDesc,
2*sizeof(Lng32),
numPartitions),
numOrigHashPartitions_(numOrigHashPartitions)
{
}
void ExHashDistPartInputData::copyPartInputValue(Lng32 fromPartNum,
Lng32 toPartNum,
char *buffer,
Lng32 bufferLength)
{
// ex_assert(bufferLength == 2 * sizeof(long),
// "Hash part intput values are always two 4 byte integers");
Lng32 scaleFactor = numOrigHashPartitions_/ getNumPartitions();
Lng32 transPoint = numOrigHashPartitions_ % getNumPartitions();
Lng32 loPart;
Lng32 hiPart;
if(fromPartNum < transPoint) {
loPart = fromPartNum * (scaleFactor + 1);
} else {
loPart = (fromPartNum * scaleFactor) + transPoint;
}
if(toPartNum < transPoint) {
hiPart = (toPartNum * (scaleFactor + 1)) + scaleFactor;
} else {
hiPart = (toPartNum * scaleFactor) + scaleFactor + transPoint - 1;
}
// ex_assert(loPart >= 0 &&
// loPart <= hiPart &&
// hiPart < numOrigHashPartitions_,
// "Hash Dist: invalid input values");
str_cpy_all(buffer,(char *) &loPart,sizeof(Lng32));
str_cpy_all(&buffer[sizeof(Lng32)],(char *) &hiPart,sizeof(Lng32));
}
// -----------------------------------------------------------------------
// Methods for class ExHash2PartInputData
// -----------------------------------------------------------------------
ExHash2PartInputData::ExHash2PartInputData(ex_cri_desc *partInputCriDesc,
Lng32 numPartitions)
: ExPartInputDataDesc(HASH2_PARTITIONED,
partInputCriDesc,
2*sizeof(Lng32),
numPartitions)
{
}
void ExHash2PartInputData::copyPartInputValue(Lng32 fromPartNum,
Lng32 toPartNum,
char *buffer,
Lng32 bufferLength)
{
Int64 numPartitions = (Int64)getNumPartitions();
// For hash2, partition numbers are not passed within the partition input
// values. Instead, the hash boundaries are passed. This allows a single
// ESP to handle tables with different numbers of partitions.
//
// Because the integer math of the hash2 split function causes a rounding
// down, the integer math when determining a hash boundary must round up.
// This explains why " + numPartitions - 1" is seen in the numerator of
// the division below.
ULng32 loHash = (ULng32)((((Int64)fromPartNum << 32)
+ numPartitions - 1) / numPartitions);
// The hiHash value is one less than the hash boundary for the next
// partition number.
ULng32 hiHash = (ULng32)(((((Int64)(toPartNum + 1) << 32)
+ numPartitions - 1) / numPartitions) - 1);
str_cpy_all(buffer,(char *) &loHash,sizeof(Lng32));
str_cpy_all(&buffer[sizeof(Lng32)],(char *) &hiHash,sizeof(Lng32));
}
Long ExPartInputDataDesc::pack(void * space)
{
partInputCriDesc_.pack(space);
return NAVersionedObject::pack(space);
}
Long ExHashPartInputData::pack(void * space)
{
return ExPartInputDataDesc::pack(space);
}
Long ExRoundRobinPartInputData::pack(void * space)
{
return ExPartInputDataDesc::pack(space);
}
Long ExRangePartInputData::pack(void * space)
{
if(useExpressions_)
{
partRangeExpressions_.pack(space,getNumPartitions() + 1);
}
partRanges_.pack(space);
return ExPartInputDataDesc::pack(space);
}
Long ExHashDistPartInputData::pack(void * space)
{
return ExPartInputDataDesc::pack(space);
}
Long ExHash2PartInputData::pack(void * space)
{
return ExPartInputDataDesc::pack(space);
}
Lng32 ExPartInputDataDesc::unpack(void * base, void * reallocator)
{
if(partInputCriDesc_.unpack(base, reallocator)) return -1;
return NAVersionedObject::unpack(base, reallocator);
}
Lng32 ExHashPartInputData::unpack(void * base, void * reallocator)
{
return ExPartInputDataDesc::unpack(base, reallocator);
}
Lng32 ExRoundRobinPartInputData::unpack(void * base, void * reallocator)
{
return ExPartInputDataDesc::unpack(base, reallocator);
}
Lng32 ExRangePartInputData::unpack(void * base, void * reallocator)
{
if(useExpressions_)
{
if(partRangeExpressions_.unpack(base,getNumPartitions() + 1,reallocator)) return -1;
}
if(partRanges_.unpack(base)) return -1;
return ExPartInputDataDesc::unpack(base, reallocator);
}
Lng32 ExHashDistPartInputData::unpack(void * base, void * reallocator)
{
return ExPartInputDataDesc::unpack(base, reallocator);
}
Lng32 ExHash2PartInputData::unpack(void * base, void * reallocator)
{
return ExPartInputDataDesc::unpack(base, reallocator);
}
// -----------------------------------------------------------------------
// This method returns the virtual function table pointer for an object
// with the given class ID; used by NAVersionedObject::driveUnpack().
// -----------------------------------------------------------------------
char *ExPartInputDataDesc::findVTblPtr(short classID)
{
char *vtblPtr;
switch (classID)
{
case HASH_PARTITIONED:
GetVTblPtr(vtblPtr, ExHashPartInputData);
break;
case RANGE_PARTITIONED:
GetVTblPtr(vtblPtr, ExRangePartInputData);
break;
case ROUNDROBIN_PARTITIONED:
GetVTblPtr(vtblPtr, ExRoundRobinPartInputData);
break;
case HASH1_PARTITIONED:
GetVTblPtr(vtblPtr, ExHashDistPartInputData);
break;
case HASH2_PARTITIONED:
GetVTblPtr(vtblPtr, ExHash2PartInputData);
break;
default:
GetVTblPtr(vtblPtr, ExPartInputDataDesc);
break;
}
return vtblPtr;
}
void ExPartInputDataDesc::fixupVTblPtr()
{
char * to_vtbl_ptr = (char *) this;
char * from_vtbl_ptr;
switch (partType_)
{
case HASH_PARTITIONED:
{
ExHashPartInputData partInputDataDesc (NULL,1);
from_vtbl_ptr = (char *)&partInputDataDesc;
str_cpy_all(to_vtbl_ptr, from_vtbl_ptr, sizeof(char *));
}
break;
case RANGE_PARTITIONED:
{
ExRangePartInputData partInputDataDesc (NULL,0,0,0,0,NULL,0);
from_vtbl_ptr = (char *)&partInputDataDesc;
str_cpy_all(to_vtbl_ptr, from_vtbl_ptr, sizeof(char *));
}
break;
case ROUNDROBIN_PARTITIONED:
{
ExRoundRobinPartInputData partInputDataDesc (NULL,1,1);
from_vtbl_ptr = (char *)&partInputDataDesc;
str_cpy_all(to_vtbl_ptr, from_vtbl_ptr, sizeof(char *));
}
break;
case HASH1_PARTITIONED:
{
ExHashDistPartInputData partInputDataDesc (NULL,1,1);
from_vtbl_ptr = (char *)&partInputDataDesc;
str_cpy_all(to_vtbl_ptr, from_vtbl_ptr, sizeof(char *));
}
break;
case HASH2_PARTITIONED:
{
ExHash2PartInputData partInputDataDesc (NULL,1);
from_vtbl_ptr = (char *)&partInputDataDesc;
str_cpy_all(to_vtbl_ptr, from_vtbl_ptr, sizeof(char *));
}
break;
default:
{
// ex_assert(0,"Invalid partitioning type");
}
}
}