/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
#ifndef CLUSTER_H
#define CLUSTER_H
/* -*-C++-*-
*****************************************************************************
*
* File: cluster.h
* Description: Cluster of a hash table for hash grby / hash join
* (a cluster is a collection of hash buckets that are
* stored in the same overflow file, if needed)
* Created: 8/14/96
* Language: C++
*
*
*
*
*****************************************************************************
*/
#include "hash_table.h"
#include "CommonStructs.h"
#include "ScratchSpace.h"
#include "ex_exe_stmt_globals.h"
#define END_OF_BUF_LIST ((ULng32) -1)
// To be used as the limits on input size estimates by HJ, HGB
#define MAX_INPUT_SIZE 0x200000000LL
#define MIN_INPUT_SIZE 100000LL
class Cluster;
class ExClusterStats;
class Bucket;
class IOTimer;
class ExBMOStats;
class MemoryMonitor;
#include "HashBufferHeader.h"
#include "ExpError.h"
#include "NAMemory.h"
// forward declarations
class ExScratchFileOptions;
class ClusterDB;
class HashBuffer;
class HashBufferSerial;
/////////////////////////////////////////////////////////////////////////////
// The HashBuffer is written to a temporary file
/////////////////////////////////////////////////////////////////////////////
class HashBuffer : public NABasicObject {
friend class HashBufferSerial;
public:
HashBuffer (Cluster * cluster);
// A constructor for cases when the HashBuffer is used without a
// Cluster object.
// Currently, this is only used by the UniqueHashJoin (ExUniqueHashJoinTcb).
//
HashBuffer (ULng32 bufferSize,
ULng32 rowSize,
NABoolean useVariableLength,
CollHeap *heap,
ClusterDB * clusterDb,
ExeErrorCode * rc);
~HashBuffer();
inline ULng32 getMaxRowLength() const;
inline HashBuffer * getPrev() const;
inline void setPrev(HashBuffer * prev);
inline HashBuffer * getNext() const;
inline void setNext(HashBuffer * next);
inline char * getDataPointer() const;
inline void deallocateData(CollHeap * heap) ;
inline ULng32 getRowCount() const;
inline HashBufferHeader * getHeader() const;
void init(Cluster * cluster);
inline char * getFirstRow() const;
// Does this buffer contain variable length or fixed length rows.
inline NABoolean isVariableLength() const;
inline NABoolean considerBufferDefrag() const;
// Cast this instance to a pointer to a HashBufferSerial in order to
// use the Serial Interface
inline HashBufferSerial *castToSerial() const;
inline ULng32 getFreeSpace() const
{
return freeSpace_;
}
private:
Cluster * cluster_; // the cluster to which the buffer belongs
ULng32 bufferSize_; // the size of the buffer
ULng32 maxRowLength_; // allocated size of a row
NABoolean isVariableLength_; // Are the rows in this buffer variable length?
NABoolean considerBufferDefrag_;
char * rows_; // points to row[0]
ULng32 maxNumFullRowsSer_; // the number of full rows that will fit in buffer
ULng32 freeSpace_; // amount of space left in buffer
char * currRow_; // used for scanning the buffer
char * nextAvailRow_; // pointer to next row to allocate
HashBuffer * next_; // pointer to next buffer in memory
HashBuffer * prev_; // pointer to previous buffer in memory
CollHeap * heap_ ; // used when there is no cluster_
union {
char * data_; // pointer to the buffer itself
HashBufferHeader * header_; // another interpretation of the first bytes
}; // in the buffer
};
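// Illustrative sketch (not part of this header): constructing a standalone
// HashBuffer without a Cluster object, the way ExUniqueHashJoinTcb does.
// The buffer size, row size, and 'heap' are hypothetical example values.
//
//   ExeErrorCode rc = (ExeErrorCode)0;        // assuming 0 means "no error"
//   HashBuffer *buf = new (heap) HashBuffer(56 * 1024, // bufferSize
//                                           200,       // rowSize
//                                           FALSE,     // fixed-length rows
//                                           heap,
//                                           NULL,      // no ClusterDB
//                                           &rc);
//   if (rc) { /* handle the allocation failure */ }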
/////////////////////////////////////////////////////////////////////////////
// inline functions of HashBuffer
/////////////////////////////////////////////////////////////////////////////
inline ULng32 HashBuffer::getMaxRowLength() const {
return maxRowLength_;
};
inline HashBuffer * HashBuffer::getPrev() const {
return prev_;
};
inline void HashBuffer::setPrev(HashBuffer * prev) {
prev_ = prev;
};
inline HashBuffer * HashBuffer::getNext() const {
return next_;
};
inline void HashBuffer::setNext(HashBuffer * next) {
next_ = next;
};
inline char * HashBuffer::getDataPointer() const {
return data_;
};
inline void HashBuffer::deallocateData(CollHeap * heap) {
if ( data_ ) heap->deallocateMemory(data_);
data_ = NULL;
};
// Access members of Header.
inline ULng32 HashBuffer::getRowCount() const {
return header_->getRowCount();
};
// Does this buffer contain variable length or fixed length rows.
inline NABoolean HashBuffer::isVariableLength() const {
return isVariableLength_;
};
inline NABoolean HashBuffer::considerBufferDefrag() const {
return considerBufferDefrag_;
};
inline char *HashBuffer::getFirstRow() const
{
return rows_;
};
inline HashBufferHeader * HashBuffer::getHeader() const{
return header_;
};
// Cast this instance to a pointer to a HashBufferSerial in order to
// use the Serial Interface
inline HashBufferSerial *HashBuffer::castToSerial() const {
return (HashBufferSerial *)this;
};
// HashBufferSerial - This class contains the methods for the Serial
// Interface to the HashBuffer class. Using this interface, the
// caller can access rows in the buffer one after the other in series,
// but cannot randomly access any given row based on a row index.
// Currently used by HashJoin and HashGroupBy.
//
class HashBufferSerial : public HashBuffer {
private:
// Constructors and Destructor are private - they are never used.
HashBufferSerial (Cluster * cluster);
// A constructor for cases when the HashBuffer is used without a
// Cluster object.
// Currently, this is only used by the UniqueHashJoin (ExUniqueHashJoinTcb).
//
HashBufferSerial (ULng32 bufferSize,
ULng32 rowSize,
NABoolean useVariableLength,
CollHeap *heap,
ExeErrorCode * rc);
~HashBufferSerial() {};
public:
// The serial interface to the HashBuffers. Rows can be accessed in
// order (series) from the start of the buffer to the end. Rows cannot be
// accessed randomly (via a row index or number)
inline void setRowCount(ULng32 cnt);
// Set rowcount to 0 and reset buffer to be empty.
inline void clearRowCount();
// Methods dealing with how many rows can fit in a buffer
// Get the estimated number of rows per buffer. It is an estimate
// since the rows can be variable length
inline ULng32 getMaxNumFullRows() const;
// Recalculate the number of rows that can be allocated from this buffer.
inline ULng32 getRemainingNumFullRows() const;
// Is this buffer full?
inline NABoolean notFull() const;
// Methods for accessing rows in series based on an internal cursor (currRow_)
inline void positionOnFirstRow();
// return the next row in the buffer
inline HashRow *getCurrentRowAndAdvance();
inline HashRow *getCurrentRow();
// Methods for accessing rows in series based on an external cursor (the previous row)
// Return a pointer to the first row,
inline HashRow *getFirstRow() const;
// Get the next row after 'currentRow' based on the length of currentRow
inline HashRow *getNextRow(HashRow *currentRow) const;
// Allocate a row of size maxRowLength_ from this HashBuffer
inline HashRow *getFreeRow();
inline HashRow *getFreeRow(ULng32 spaceNeeded);
// Resize the last row allocated from this HashBuffer
inline void resizeLastRow(ULng32 newSize, HashRow *dataPointer);
// Get row length. If varLen, get length from row. If fixed, based
// on maxRowLength_
inline ULng32 getRowLength(HashRow *row) const;
// Set the row length in the row. If row is fixed length, this method does nothing
inline void setRowLength(HashRow *row, UInt32 len);
void print() const;
void fixup();
};
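// Illustrative sketch: scanning all rows of a buffer through the Serial
// Interface with the internal cursor. 'buf' is assumed to be a valid
// HashBuffer filled elsewhere.
//
//   HashBufferSerial *sbuf = buf->castToSerial();
//   sbuf->positionOnFirstRow();
//   while (HashRow *row = sbuf->getCurrentRowAndAdvance()) {
//     ULng32 len = sbuf->getRowLength(row);   // data length, excl. HashRow header
//     // ... process 'row' ...
//   }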
// If this buffer has rows, but the internal data members indicate
// that it is empty, reset the data members to the correct
// positions.
inline void HashBufferSerial::fixup() {
// Fixup the internal data members if they are in an inconsistent state
if (getRowCount() > 0 && nextAvailRow_ == rows_) {
if (isVariableLength()) {
nextAvailRow_ = rows_;
for(ULng32 i = 0; i < getRowCount(); i++) {
HashRow *nextRow = (HashRow *)nextAvailRow_;
nextRow->checkEye();
nextAvailRow_ = ((char *)nextRow + nextRow->getRowLength() + sizeof(HashRow));
}
} else {
nextAvailRow_ = rows_ + (getRowCount() * maxRowLength_);
}
freeSpace_ = (data_ + bufferSize_) - nextAvailRow_;
}
}
inline void HashBufferSerial::setRowCount(ULng32 cnt) {
header_->setRowCount(cnt);
// Initialize nextAvailRow, fixup will set it to the right value based on cnt.
nextAvailRow_ = rows_;
// Fixup the internal data members after changing the row count
fixup();
};
// Set rowcount to 0 and reset buffer to be empty.
inline void HashBufferSerial::clearRowCount() {
header_->setRowCount(0);
freeSpace_ = bufferSize_ - ROUND8(sizeof(HashBufferHeader));
nextAvailRow_ = rows_;
};
// The serial interface to the HashBuffers. Rows can be accessed in
// order (series) from the start of the buffer to the end. Rows cannot be
// accessed randomly (via a row index or number)
// Methods dealing with how many rows can fit in a buffer
// Get the estimated number of rows per buffer. It is an estimate
// since the rows can be variable length
inline ULng32 HashBufferSerial::getMaxNumFullRows() const {
return maxNumFullRowsSer_;
};
// Recalculate the number of rows that can be allocated from this buffer.
inline ULng32 HashBufferSerial::getRemainingNumFullRows() const {
return (freeSpace_ / maxRowLength_);
};
// Is this buffer full?
inline NABoolean HashBufferSerial::notFull() const
{
return (freeSpace_ >= maxRowLength_);
};
// Methods for accessing rows in series based on an internal cursor (currRow_)
inline void HashBufferSerial::positionOnFirstRow()
{
currRow_ = (char *)getFirstRow();
};
// return a pointer to the current row and advance the current row cursor.
inline HashRow *HashBufferSerial::getCurrentRowAndAdvance()
{
// The case of an empty buffer is also handled below because at the first
// call for this buffer currRow_ == rows_ == nextAvailRow_
if(currRow_ >= nextAvailRow_) {
return NULL;
} else {
HashRow *nextRow = (HashRow *)currRow_;
currRow_ = (char *)getNextRow(nextRow);
if(nextRow) nextRow->checkEye();
return nextRow;
}
};
// return a pointer to the current row and do not advance the current row cursor.
inline HashRow *HashBufferSerial::getCurrentRow()
{
if(currRow_ >= nextAvailRow_)
{
return NULL;
}
else
{
return (HashRow *)currRow_;
}
};
// Methods for accessing rows in series based on an external cursor (the previous row)
// Return a pointer to the first row,
inline HashRow *HashBufferSerial::getFirstRow() const
{
return (HashRow *)rows_;
};
// Return a pointer to the next row.
// The value of currentRow must point to an existing row
// in the buffer
inline HashRow *HashBufferSerial::getNextRow(HashRow *currentRow) const
{
if(!currentRow) {
return NULL;
}
char *row = NULL;
if (isVariableLength()) {
row = ((char *)currentRow + currentRow->getRowLength() + sizeof(HashRow));
} else {
row = ((char *)currentRow + maxRowLength_);
}
if(row >= nextAvailRow_) {
return NULL;
} else {
((HashRow *)row)->checkEye();
return (HashRow *)row;
}
};
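// Illustrative sketch: the same scan driven by an external cursor, where the
// caller carries the previous row instead of relying on currRow_:
//
//   for (HashRow *row = sbuf->getFirstRow();
//        row != NULL;
//        row = sbuf->getNextRow(row)) {
//     // ... process 'row' ...
//   }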
// Allocate a row of size maxRowLength_ from this HashBuffer
inline HashRow *HashBufferSerial::getFreeRow() {
if (freeSpace_ >= maxRowLength_) {
// we have space, "allocate" a row
HashRow *nextRow = (HashRow *)nextAvailRow_;
nextAvailRow_ = nextAvailRow_ + maxRowLength_;
freeSpace_ -= maxRowLength_;
header_->incRowCount();
nextRow->setEye();
return nextRow;
} else {
return NULL;
}
};
// Allocate a row of a given length from this HashBuffer
inline HashRow *HashBufferSerial::getFreeRow(ULng32 spaceNeeded)
{
spaceNeeded = ROUND4(spaceNeeded);
if (freeSpace_ >= spaceNeeded)
{
// we have space, "allocate" a row
HashRow *nextRow = (HashRow *)nextAvailRow_;
nextAvailRow_ = nextAvailRow_ + spaceNeeded;
freeSpace_ -= spaceNeeded;
header_->incRowCount();
nextRow->setEye();
return nextRow;
}
else
{
return NULL;
}
};
// Resize the last row allocated from this HashBuffer
inline void HashBufferSerial::resizeLastRow(ULng32 newSize, HashRow *dataPointer)
{
dataPointer->checkEye();
if (isVariableLength()) {
UInt32 size = newSize + sizeof(HashRow);
UInt32 oldSize = nextAvailRow_ - (char *)dataPointer;
if (size <= maxRowLength_ && oldSize == maxRowLength_) {
nextAvailRow_ = ((char *)dataPointer) + size;
freeSpace_ = (data_ + bufferSize_) - nextAvailRow_;
} else {
// Trying to make the row larger. This is not expected; assert.
ex_assert( 0 , "Trying to resize a row to a larger size");
}
} else {
// Nothing to do for fixed length rows.
}
dataPointer->checkEye();
}
// Get row length. If varLen, get length from row. If fixed, based
// on maxRowLength_
inline ULng32 HashBufferSerial::getRowLength(HashRow *row) const {
row->checkEye();
if (isVariableLength()) {
ULng32 len = row->getRowLength();
ex_assert(len <= maxRowLength_ - sizeof(HashRow), "Bad rowlength in HashBuffer");
return len;
} else {
return maxRowLength_ - sizeof(HashRow);
}
}
// Set the row length in the row. If row is fixed length, this method does nothing
inline void HashBufferSerial::setRowLength(HashRow *row, UInt32 len) {
row->checkEye();
if (isVariableLength()) {
ex_assert(len <= maxRowLength_ - sizeof(HashRow), "Bad rowlength in HashBuffer");
row->setRowLength(len);
}
row->checkEye();
}
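// Illustrative sketch: the typical variable-length insert pattern. A slot of
// maxRowLength_ bytes is reserved, the actual (smaller) data length is
// recorded in the row, and the unused tail is returned to the buffer via
// resizeLastRow(). The value of 'actualLen' is hypothetical.
//
//   HashRow *row = sbuf->getFreeRow();        // reserve maxRowLength_ bytes
//   if (row) {
//     UInt32 actualLen = 120;                 // e.g., from the move expression
//     sbuf->setRowLength(row, actualLen);     // no-op for fixed-length rows
//     sbuf->resizeLastRow(actualLen, row);    // give back the unused tail
//   }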
/////////////////////////////////////////////////////////////////////////////
// A Hash Bucket. Several Buckets might share a Cluster
/////////////////////////////////////////////////////////////////////////////
class Bucket {
public:
Bucket();
~Bucket() {};
void init();
inline ULng32 getRowCount() const;
inline Cluster * getInnerCluster () const;
inline Cluster * getOuterCluster () const;
inline NABoolean insert (atp_struct * newEntry,
ex_expr * moveExpr,
SimpleHashValue hashValue,
ExeErrorCode * rc,
NABoolean skipMemoryCheck = FALSE);
inline void setInnerCluster (Cluster * innerCluster);
inline void setOuterCluster (Cluster * outerCluster);
private:
Cluster * innerCluster_; // corresponding inner Cluster
Cluster * outerCluster_; // corresponding outer Cluster
ULng32 rowCount_; // # of inner Cluster rows in this bucket
};
/////////////////////////////////////////////////////////////////////////////
// inline functions for Bucket
/////////////////////////////////////////////////////////////////////////////
inline ULng32 Bucket::getRowCount() const {
return rowCount_;
};
inline Cluster * Bucket::getInnerCluster () const {
return innerCluster_;
};
inline Cluster * Bucket::getOuterCluster () const {
return outerCluster_;
};
inline void Bucket::setInnerCluster (Cluster * innerCluster) {
innerCluster_ = innerCluster;
};
inline void Bucket::setOuterCluster (Cluster * outerCluster) {
outerCluster_ = outerCluster;
};
/////////////////////////////////////////////////////////////////////////////
// The ClusterDB (cluster description block) contains all information
// common to all clusters.
/////////////////////////////////////////////////////////////////////////////
class ClusterDB : public NABasicObject {
friend class Cluster;
friend class HashBuffer;
friend class ExSequenceTcb;
public:
enum HashOperator {
HASH_JOIN,
HYBRID_HASH_JOIN,
ORDERED_HASH_JOIN,
UNIQUE_HASH_JOIN,
HASH_GROUP_BY,
CROSS_PRODUCT,
SEQUENCE_OLAP // not a hash operator, but uses ClusterDB and Cluster
};
ClusterDB (HashOperator hashOperator,
ULng32 bufferSize,
atp_struct * workAtp,
Lng32 explainNodeId,
short hashTableRowAtpIndex1,
short hashTableRowAtpIndex2,
ex_expr * searchExpr,
Bucket * buckets,
ULng32 bucketCount,
ULng32 availableMemory,
MemoryMonitor * memMonitor,
short pressureThreshold,
ExExeStmtGlobals * stmtGlobals,
ExeErrorCode * rc,
NABoolean noOverFlow = FALSE,
NABoolean isPartialGroupBy = FALSE,
unsigned short minBuffersToFlush = 1,
ULng32 numInBatch = 0,
UInt16 forceOverflowEvery = 0,
UInt16 forceHashLoopAfterNumBuffers = 0,
UInt16 forceClusterSplitAfterMB = 0,
ExSubtask *ioEventHandler_ = NULL,
ex_tcb * callingTcb_ = NULL,
UInt16 scratchThresholdPct_ = 10,
NABoolean doLog = FALSE,
NABoolean bufferedWrites = FALSE,
NABoolean disableCmpHintsOverflow = TRUE,
ULng32 memoryQuotaMB = 0, // i.e. no limit
ULng32 minMemoryQuotaMB = 0, // don't go below this min
ULng32 minMemBeforePressureCheck = 0, // min before check
Float32 bmoCitizenshipFactor = 0,
Int32 pMemoryContingencyMB = 0,
Float32 estimateErrorPenalty = 0,
Float32 hashMemEstInKBPerNode = 0,
ULng32 initialHashTableSize = 0, // default not resizable
ExOperStats * hashOperStats = NULL
);
~ClusterDB();
inline Cluster * getClusterList() const;
inline void setClusterList(Cluster * clusterList);
inline Cluster * getClusterToFlush() const;
inline void setClusterToFlush(Cluster * cluster);
void setBMOMaxMemThresholdMB(UInt16 mm)
{ bmoMaxMemThresholdMB_ = mm; }
inline Cluster * getClusterToProbe() const;
inline void setClusterToProbe(Cluster * cluster);
inline Cluster * getClusterToRead() const;
inline void setClusterToRead(Cluster * cluster);
NABoolean setOuterClusterToRead(Cluster * cluster, ExeErrorCode * rc);
inline Cluster * getClusterReturnRightRows() const;
inline void setClusterReturnRightRows(Cluster * cluster);
inline NABoolean isHashLoop() const { return hashLoop_; }
inline Int64 getMemoryHWM() const;
inline ULng32 getBufferSize() const { return bufferSize_; }
void setScratchIOVectorSize(Int16 vectorSize)
{
scratchIOVectorSize_ = vectorSize;
}
Int32 getScratchIOVectorSize() { return scratchIOVectorSize_;}
void setScratchOverflowMode(ScratchOverflowMode ovMode)
{ overFlowMode_ = ovMode;}
ScratchOverflowMode getScratchOverflowMode(void)
{ return overFlowMode_;}
void chooseClusterToFlush(Cluster * lastResort);
HashScratchSpace *getScratch() const;
inline NABoolean sawPressure() const { return sawPressure_; }
inline Int64 totalPhase3TimeNoHL() { return totalPhase3TimeNoHL_; }
inline Int64 maxPhase3Time() { return maxPhase3Time_; }
inline Int64 minPhase3Time() { return minPhase3Time_; }
inline ULng32 numClustersNoHashLoop()
{ return numClustersNoHashLoop_; }
void updatePhase3Time(Int64 someClusterTime);
// Yield all the memory quota allocated
void yieldAllMemoryQuota();
void YieldQuota(UInt32 memNeeded); // internal routine
// Check if not enough free quota, else yield excess
NABoolean checkQuotaOrYield(UInt32 numFlushed, UInt32 maxSizeMB);
// Yield un-needed memory quota
void yieldUnusedMemoryQuota(Cluster * theOFList = NULL,
ULng32 extraBuffers = 1);
ULng32 memoryQuotaMB() { return memoryQuotaMB_ ; }
// each cluster overflows into another sequence; sometimes a cluster needs
// several sequences (after cluster-split, or for DISTINCT HGB)
UInt32 generateSequenceID() { return ++sequenceGenerator_; }
void getScratchErrorDetail(Lng32 &scratchError,
Lng32 &scratchSysError,
Lng32 &scratchSysErrorDetail,
char *errorMsg);
static ULng32 roundUpToPrime(ULng32 noOfClusters);
NABoolean enoughMemory(ULng32 size, NABoolean checkCompilerHints = FALSE);
private:
void updateMemoryStats();
void updateIOStats();
HashOperator hashOperator_; // indicates the type of the tcb
// using this clusterDB
NABoolean hashLoop_; // are we in a hash loop (PHASE_3)
NABoolean isPartialGroupBy_; // for hash grouping
NABoolean noOverFlow_; // if true, we do not overflow
// if set, all memory related info is
// ignored and we always try to allocate
// new buffers in main memory. If the
// allocation is not successful, we die
ULng32 bufferSize_; // global size of hash buffers
NAHeap * bufferHeap_; // keep the I/O buffers for all the
// clusters in a separate heap. This
// should help to reduce fragmentation.
atp_struct * workAtp_; // global work Atp
Lng32 explainNodeId_; // add to any EMS RT messages.
// the following three data members are only used for hash join and hash
// table materialize. They are required for sorted insert of a row into
// a hash chain. HGB does not need this, because every row (group) is unique
short hashTableRowAtpIndex1_; // index into atp for search and move expr
short hashTableRowAtpIndex2_;
ex_expr * searchExpr_; // to insert a row in the hash table
Bucket * buckets_; // global Buckets array
ULng32 bucketCount_; // total number of buckets
ULng32 availableMemory_; // for the HJ (in bytes)
ULng32 memoryUsed_; // main memory used for buffers and HT
// this is different from Cluster.totalClusterSize_. memoryUsed_ tells us how
// much main memory the operator uses right now. Cluster.totalClusterSize_
// tells us the size of a cluster!
ExOperStats * hashOperStats_; // stats for this hash operator
ULng32 totalIOCnt_; // total IO ( # buffers read or written )
MemoryMonitor * memMonitor_; // for dynamic memory management
short pressureThreshold_;
NABoolean sawPressure_; // set when we see pressure the
// first time
ExExeStmtGlobals * stmtGlobals_; // for dynamic memory allocation
ULng32 maxClusterSize_; // in bytes
Cluster * clusterToFlush_; // cluster which is to be flushed
Cluster * clusterToProbe_; // cluster which is to be probed
Cluster * clusterToRead_; // cluster which is to be read
Cluster * clusterReturnRightRows_; // cluster whose right rows are to be returned
Cluster * clusterList_; // list of all clusters
HashScratchSpace * tempFile_; // for overflow handling
ExSubtask *ioEventHandler_;
ex_tcb * callingTcb_;
UInt16 scratchThresholdPct_;
UInt32 sequenceGenerator_; // to generate unique IDs for clusters
HashBuffer * outerReadBuffers_ ; // buffers for reading an outer cluster
// for testing/regressions only !!
UInt16 forceOverflowEvery_; // deny memory after every so many requests
UInt16 forceOverflowCounter_; // dynamic counter to track when to deny
UInt16 forceHashLoopAfterNumBuffers_; // After #buffers were read from o/f
UInt16 forceClusterSplitAfterMB_; // split after cluster reached such size
NABoolean doLog_; // log to EMS major overflow events
NABoolean bufferedWrites_; // use buffered writes
NABoolean disableCmpHintsOverflow_;
ULng32 memoryQuotaMB_ ; // can't use more than that (0 = no limit)
ULng32 minMemoryQuotaMB_ ; // don't go below the minimum
ULng32 minMemBeforePressureCheck_; // no pressure/hints checks below that
Float32 bmoCitizenshipFactor_;
Int32 pMemoryContingencyMB_;
Float32 estimateErrorPenalty_;
Float32 hashMemEstInKBPerNode_;
Int64 totalPhase3TimeNoHL_;
Int64 maxPhase3Time_;
Int64 minPhase3Time_;
ULng32 numClustersNoHashLoop_;
ULng32 bmoMaxMemThresholdMB_;
NABoolean earlyOverflowStarted_; // if o/f based on compiler hints
// These fields are used to ensure that #buckets and #hash-table-entries
// have no common prime factors (to make even use of the hash table entries)
NABoolean evenFactor_; // # buckets is even (i.e., Hash-Join)
ULng32 primeFactor_; // # buckets (divided by buckets-per-cluster)
// used by HGB only - initial HT size to be resized up as needed
ULng32 initialHashTableSize_;
// The num of concurrent IOs is used as a minimum for the number of
// buffers to keep for a flushed cluster before it is flushed again
// (to improve IO of sequential buffers)
unsigned short minBuffersToFlush_;
unsigned short minNumWriteOuterBatch_;
unsigned short maxNumWriteOuterBatch_;
unsigned short numReadOuterBatch_;
Int32 scratchIOVectorSize_;
ScratchOverflowMode overFlowMode_;
ExBMOStats *bmoStats_;
};
/////////////////////////////////////////////////////////////////////////////
// inline functions of ClusterDB
/////////////////////////////////////////////////////////////////////////////
inline Cluster * ClusterDB::getClusterList() const {
return clusterList_;
};
inline void ClusterDB::setClusterList(Cluster * clusterList) {
clusterList_ = clusterList;
};
inline Cluster * ClusterDB::getClusterToFlush() const {
return clusterToFlush_;
};
inline Cluster * ClusterDB::getClusterToProbe() const {
return clusterToProbe_;
};
inline void ClusterDB::setClusterToProbe(Cluster * cluster) {
clusterToProbe_ = cluster;
};
inline Cluster * ClusterDB::getClusterToRead() const {
return clusterToRead_;
};
inline void ClusterDB::setClusterToRead(Cluster * cluster) {
clusterToRead_ = cluster;
};
inline Cluster * ClusterDB::getClusterReturnRightRows() const {
return clusterReturnRightRows_;
};
inline void ClusterDB::setClusterReturnRightRows(Cluster * cluster) {
clusterReturnRightRows_ = cluster;
};
inline HashScratchSpace * ClusterDB::getScratch() const {
return tempFile_;
};
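// Illustrative sketch: how a hash operator might pick and flush a cluster
// when memory runs short. flush() returns FALSE while scratch I/Os are still
// pending, so the work method returns to the scheduler and is redriven later.
// 'lastResort' is a hypothetical fallback cluster.
//
//   ExeErrorCode rc = (ExeErrorCode)0;        // assuming 0 means "no error"
//   if (!clusterDb->getClusterToFlush())
//     clusterDb->chooseClusterToFlush(lastResort);
//   Cluster *victim = clusterDb->getClusterToFlush();
//   if (victim && !victim->flush(&rc)) {
//     if (rc) { /* handle the scratch error */ }
//     return;                                 // I/O pending; redriven later
//   }
//   clusterDb->setClusterToFlush(NULL);       // reset once the flush is done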
/////////////////////////////////////////////////////////////////////////////
// ClusterBitMap is used for left joins with a hash loop or for right joins
/////////////////////////////////////////////////////////////////////////////
class ClusterBitMap : public NABasicObject {
public:
ClusterBitMap(ULng32 size, ExeErrorCode * rc);
~ClusterBitMap();
void setBit(ULng32 bitIndex);
NABoolean testBit(ULng32 bitIndex);
private:
ULng32 size_; // size of bitmap (# of bits)
char * bitMap_; // the bitmap itself
};
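// Illustrative sketch (an assumption, not the actual implementation): setBit
// and testBit presumably locate the byte holding the bit and mask within it:
//
//   void ClusterBitMap::setBit(ULng32 bitIndex) {
//     bitMap_[bitIndex >> 3] |= (char)(1 << (bitIndex & 7));
//   }
//   NABoolean ClusterBitMap::testBit(ULng32 bitIndex) {
//     return (bitMap_[bitIndex >> 3] & (1 << (bitIndex & 7))) != 0;
//   }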
/////////////////////////////////////////////////////////////////////////////
// a Cluster is the unit of overflow handling
/////////////////////////////////////////////////////////////////////////////
class Cluster : public NABasicObject {
friend class ClusterDB;
friend class HashBuffer;
friend class ExSequenceTcb;
public:
enum ClusterState {
FLUSHED, // cluster is on temporary file, max 1 buffer in memory
IN_MEMORY, // all buffers of the cluster are in main memory
CHAINED // cluster is IN_MEMORY and rows are chained into
// hash table
};
Cluster(ClusterState state,
ClusterDB * clusterDb,
Bucket * buckets,
ULng32 bucketCount, // for this cluster
ULng32 rowLength,
NABoolean useVariableLength,
NABoolean considerBufferDefrag,
short insertAtpIndex,
NABoolean isInner, // is this an inner Cluster?
NABoolean bitMapEnable, // Enable bitmap allocation for
// left outer join and semi join
//
// A bitmap is allocated for the
// outer cluster in Phase 3
// when the inner cluster
// cannot be read in memory
// in full.
Cluster * next, // next cluster in list
ExeErrorCode *rc,
HashBuffer * bufferPool = NULL); // an already existing buffer
~Cluster();
inline ClusterState getState() const;
inline ULng32 getRowsInBuffer() const;
inline ULng32 getRowCount() const;
inline ULng32 getReadCount() const;
inline void incReadCount();
inline NABoolean endOfCluster();
inline NABoolean hasBitMap();
void updateBitMap();
NABoolean testBitMap();
// (The three methods below are only for Non-Aggregate Hash-Groupby)
inline HashRow *getLastDataPointer() { return lastDataPointer_; }
inline NABoolean returnOverflowRows() { return returnOverflowRows_; }
inline void initializeReadPosition();
inline NABoolean bitMapEnable() const;
inline NABoolean isInner() const;
inline Cluster * getNext() const;
Cluster * getOuterCluster() const { return buckets_->getOuterCluster(); };
inline void setOuterCluster(Cluster * outerCluster);
HashTable * getHashTable() const { return hashTable_; };
inline HashBuffer * getFirstBuffer() const { return bufferPool_; };
inline void setNext(Cluster * next);
void positionOnFirstRowInBuffer()
{ // position on the first row in main memory
scanPosition_ = bufferPool_;
scanPosition_->castToSerial()->positionOnFirstRow();
};
// remove the hash table from the cluster
void removeHashTable();
// insert a new row into the cluster. If insert() was not successful, it
// returns FALSE; in this case we have to flush a cluster. The only reason
// for insert() not to be successful is that we could not allocate a
// new buffer for this cluster. In case of a partial group by, we do not
// flush a cluster; instead, we return the current row as a partial group.
NABoolean insert(atp_struct * newEntry,
ex_expr * moveExpr,
SimpleHashValue hashValue,
ExeErrorCode * rc,
NABoolean skipMemoryCheck = FALSE);
void positionOnFirstRowInFirstOuterBuffer()
{ // position on the first row in the first outer buffer in memory
for ( HashBuffer * tmpB = clusterDb_->outerReadBuffers_; tmpB ; tmpB = tmpB->getNext() )
tmpB->castToSerial()->fixup();
scanPosition_ = clusterDb_->outerReadBuffers_ ;
scanPosition_->castToSerial()->positionOnFirstRow();
};
// flush the Cluster. If the flushing is not complete yet (I/Os pending)
// flush() returns FALSE. Otherwise TRUE.
NABoolean flush(ExeErrorCode * rc);
// spill a cluster. Spill is the same as flush; the only difference is
// that a spilled cluster still has a hash table (this is only used for
// hash group by)
NABoolean spill(ExeErrorCode * rc,
NABoolean noAggregates = FALSE );
// read the Cluster. If the reading is not complete yet (I/Os pending)
// read() returns FALSE. Otherwise TRUE. If the Cluster is an inner
// Cluster read() reads as many buffers as possible. If it is an outer
// Cluster, read() reads just one buffer.
NABoolean read(ExeErrorCode * rc);
// create a hash table for this Cluster and chain all rows of the Cluster
// into this hash table.
NABoolean chain(NABoolean forceChaining, ExeErrorCode * rc);
// set up an outer Cluster for this inner Cluster (if required).
ExeErrorCode setUpOuter(ULng32 rowLength,
short atpIndex,
NABoolean bitMapEnable);
// initialize the global ClusterDB scratch file if needed, and set it for
// this cluster. Also create a new read handle.
NABoolean initScratch(ExeErrorCode * rc);
// reset the cluster pair (inner and outer) for the next iteration
// of a hash loop
void resetForHashLoop();
ULng32 numLoops()
{ return numLoops_; } // if hash loop - how many times so far (logging)
Int64 startTimePhase3() { return startTimePhase3_; };
void setStartTimePhase3(Int64 theTime) { startTimePhase3_ = theTime ; };
// set up a cluster stats entry in heap and return it
ExClusterStats* getStats(CollHeap* heap);
// release all buffers of the Cluster
void releaseAllHashBuffers();
// return next row from buffer
HashRow * advance();
HashRow * getCurrentRow();
// Report the size (in bytes) of this cluster
inline Int64 clusterSize() {return totalClusterSize_; } // total size (bytes)
// Internal utility: Calculate memory and create a hash table
HashTable * createHashTable(UInt32 memForHashTable,
ExeErrorCode * rc,
// skip mem check for HGB; it starts at min HT
NABoolean checkMemory = FALSE );
inline char * getDefragBuffer()
{
return defragBuffer_;
}
private:
// split a Cluster into two. Adjust buckets accordingly.
NABoolean checkAndSplit(ExeErrorCode * rc) ;
HashBuffer * lastSqueezedBuffer_ ; // last buffer "squeezed"
NABoolean dontSplit_; // don't split this cluster (e.g., it's skewed)
// Normally every cluster keeps its sequence ID in seqID_[0]. But for HJ,
// after every split the cluster gets a new sequence ID (like a stack:
// seqID_[1], and last seqID_[2]). HGB DISTINCT uses seqID_[1] when it needs
// to keep 2 sequences separately: returned and not-returned rows.
UInt32 seqID_[3]; // 3 = log( bucketsPerCluster , 2 ) + 1
UInt16 seqIDIndex_; // current seq-ID (between 0 and the top of the stack)
UInt16 maxSeqIDIndex_; // the max seq-ID, at the top of the stack
UInt16 batchCountDown_; // count buffers in a batch (for split only)
HashBuffer * keepRecentBuffer_; // don't flush most recent (not full) buffer
// return the memory size for building a hash table of given entry count
ULng32 getMemorySizeForHashTable(ULng32 entryCount);
ClusterState state_;
ClusterDB * clusterDb_;
Bucket * buckets_; // the buckets of this Cluster
ULng32 bucketCount_; // # of buckets of the Cluster
ULng32 rowLength_;
NABoolean useVariableLength_;
NABoolean considerBufferDefrag_;
short insertAtpIndex_;
NABoolean isInner_;
NABoolean bitMapEnable_;
HashScratchSpace * tempFile_;
ClusterPassBack * readHandle_;
Cluster * next_; // next Cluster in list
NABoolean ioPending_; // this Cluster has an I/O in flight
Int64 totalClusterSize_; // total size of this cluster (bytes)
ClusterBitMap * outerBitMap_;
HashTable * hashTable_;
ULng32 rowCount_; // # of rows stored in this cluster
ULng32 readCount_; // for inner: number of rows read in
// this loop of a hash loop
// (== rowCount_ if no hash loop)
// for outer: total number of rows
// read
ULng32 writeIOCnt_; // counters for overflow I/Os
ULng32 readIOCnt_;
HashBuffer * bufferPool_; // first buffer of a potential list
ULng32 numInMemBuffers_; // number of in-memory buffers for this cluster
HashBuffer * scanPosition_; // for scanning all buffers
// for concurrent writing of multiple buffers, need to know which is next
HashBuffer * nextBufferToFlush_;
// For OLAP - when reading buffers from overflow for "bounded following",
// where Cluster::read() may need to jump back to first buffer in the list.
HashBuffer * firstBufferInList_;
// Usually NULL; only OLAP sets this pointer at the last flush (for that
// partition) to avoid flushing the remaining buffers.
HashBuffer * afterLastBufferToFlush_;
// for concurrent reading of multiple buffers, need to know which is next
HashBuffer * nextBufferToRead_;
// Usually NULL; only OLAP with unbounded following that starts from a
// bounded following needs to set this pointer.
HashBuffer * afterLastBufferToRead_;
// keep hash-buffer location where the most recent row was inserted
// (Used for non-aggr hash groupby overflow, to return this row immediately)
HashRow *lastDataPointer_ ;
NABoolean returnOverflowRows_;
ULng32 numLoops_; // if hash loop - how many times so far (logging)
Int64 startTimePhase3_; // the time this cluster started phase 3 (logging)
NABoolean flushMe_ ; // make this cluster a candidate for a flush
// Internal methods
NABoolean setHashLoop(ExeErrorCode * rc);
NABoolean squeezeOutRowsOfOtherClusters(ExeErrorCode * rc);
inline NABoolean IONotReady(ExeErrorCode * rc,
UInt32 readSeqID = 0, // non-zero only for read
NABoolean checkAll = FALSE );
inline NABoolean IONotComplete(ExeErrorCode * rc)
{ return IONotReady(rc, 0, TRUE); };
NABoolean completeCurrentRead_ ; // synch buffers read so far
Int16 buffersRead_ ;
char * defragBuffer_;
};
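// Illustrative sketch: bringing a FLUSHED inner cluster back into memory for
// phase 3. read() returns FALSE while I/Os are still pending; chain() then
// builds the hash table over the in-memory buffers. The control flow is a
// plausible pattern, not lifted from the hash join TCB itself.
//
//   if (!inner->read(&rc))
//     return;                                 // I/O pending; redriven later
//   if (!inner->chain(FALSE /*forceChaining*/, &rc)) {
//     // could not chain (e.g., no memory for the hash table); the operator
//     // may force chaining or fall back to a hash loop
//   }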
/////////////////////////////////////////////////////////////////////////////
// inline functions of Cluster
/////////////////////////////////////////////////////////////////////////////
inline Cluster::ClusterState Cluster::getState() const {
return state_;
};
// get the number of rows of the Cluster which are in the buffer in main
// memory
inline ULng32 Cluster::getRowsInBuffer() const {
return bufferPool_->getRowCount();
};
// get the total number of rows in the Cluster
inline ULng32 Cluster::getRowCount() const {
return rowCount_;
};
inline ULng32 Cluster::getReadCount() const {
return readCount_;
};
inline void Cluster::incReadCount() {
readCount_++;
};
inline NABoolean Cluster::hasBitMap() {
return (outerBitMap_ != NULL);
};
inline void Cluster::initializeReadPosition() {
// start reading from the earlier written list of buffers (whose seq-ID
// was pushed an index up, because we always finish reading at index 0)
seqIDIndex_ = maxSeqIDIndex_ ; // start reading from the top of the stack
}
inline NABoolean Cluster::endOfCluster() {
// we always finish reading with the seq at index 0
if ( ! maxSeqIDIndex_ ) // there was no (cluster split or) HGB distinct o/f
return seqIDIndex_ == 0 && readHandle_->endOfSequence() ;
ex_assert( !isInner_ , "Can not handle a cluster that was split");
// below: only for HGB distinct
// Check returnOverflowRows_ because this variable indicates that some read
// has completed at sequence index 0. Otherwise the rest of the condition is
// also true before we start reading that sequence.
return returnOverflowRows_ &&
seqIDIndex_ == 0 && readHandle_->endOfSequence() ;
};
inline NABoolean Cluster::IONotReady(ExeErrorCode * rc,
UInt32 readSeqID,
NABoolean checkAll )
{
RESULT IOStatus = checkAll ? tempFile_->checkIOAll() : // all opens free ?
!readSeqID ? tempFile_->checkIOWrite() : // any open available for write ?
tempFile_->checkIORead(readHandle_, readSeqID ); // any open for read ?
// are all/any previous I/Os still in flight, so that we cannot issue a new I/O?
if ( IOStatus != IO_COMPLETE ) {
if ( IOStatus == OTHER_ERROR || IOStatus == SCRATCH_FAILURE ) {
// if it's the first read when there was no prior write -- that's OK
if ( readSeqID && readHandle_->endOfSequence() )
return FALSE; // the code after call to readThru would handle that case
*rc = EXE_SORT_ERROR; // it's an error
}
// else the I/O is not complete yet; go back to the scheduler
return TRUE;
}
return FALSE; // IO is ready
}
inline NABoolean Cluster::bitMapEnable() const {
return bitMapEnable_;
};
inline NABoolean Cluster::isInner() const {
return isInner_;
};
inline Cluster * Cluster::getNext() const {
return next_;
};
inline void Cluster::setOuterCluster(Cluster * outerCluster) {
// mark the given cluster as an outer for this (inner) cluster
if (outerCluster)
outerCluster->isInner_ = FALSE;
// set all references in the buckets
for (ULng32 i = 0; i < bucketCount_; i++)
buckets_[i].setOuterCluster(outerCluster);
};
inline void Cluster::setNext(Cluster * next) {
next_ = next;
};
// This ClusterDB method needs to follow the declaration of the class Cluster
inline void ClusterDB::setClusterToFlush(Cluster * cluster) {
clusterToFlush_ = cluster;
if ( ! cluster ) return; // just performed a reset
// start with the first (also most recent) buffer in chain
clusterToFlush_->nextBufferToFlush_ = clusterToFlush_->bufferPool_ ;
// when cluster is selected explicitly -- flush all the buffers (incl. last)
clusterToFlush_->keepRecentBuffer_ = NULL;
};
// Bucket::insert is just a wrapper over Cluster::insert that also
// updates the per-Bucket row counter.
// (This definition needs to follow the declaration of class Cluster.)
inline NABoolean Bucket::insert(atp_struct * newEntry,
ex_expr * moveExpr,
SimpleHashValue hashValue,
ExeErrorCode * rc,
NABoolean skipMemoryCheck) {
NABoolean isInserted = innerCluster_->insert(newEntry,
moveExpr,
hashValue,
rc,
skipMemoryCheck);
if (isInserted)
rowCount_++;
return isInserted;
};
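// Illustrative sketch: inserting a row through a Bucket and reacting to a
// failed insert by flushing a cluster. The bucket selection from the hash
// value is a plausible pattern, not lifted from the operators themselves.
//
//   Bucket *bucket = &buckets[hashValue % bucketCount];
//   if (!bucket->insert(workAtp, moveExpr, hashValue, &rc)) {
//     if (rc) { /* a real error, not just "out of buffers" */ }
//     else {
//       clusterDb->chooseClusterToFlush(bucket->getInnerCluster());
//       // ... flush the chosen cluster, then retry the insert ...
//     }
//   }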
class IOTimer
{
public:
IOTimer () ;
void resetTimer(); // re-zero the elapsed time (not needed on first use)
NABoolean startTimer(); // do nothing and return FALSE if already started
Int64 endTimer(); // Return elapsed time.
private:
NABoolean ioStarted_;
Int64 startTime_;
Int64 accumTime_;
};
inline IOTimer::IOTimer () :
ioStarted_(FALSE) ,
startTime_(0),
accumTime_(0)
{};
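// Illustrative sketch: timing overflow I/O with IOTimer. startTimer() does
// nothing and returns FALSE if the timer is already running; endTimer()
// returns the accumulated elapsed time.
//
//   IOTimer timer;
//   timer.resetTimer();                       // not needed on first use
//   timer.startTimer();
//   // ... issue and complete the scratch I/O ...
//   Int64 elapsed = timer.endTimer();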
#endif