blob: 5d557c3fd2f13b7c6916e0234f914b28393206fa [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: SortTopN.cpp
*
* Description: This file contains the implementation of all member functions
* of the class TopN.
*
* 1. Sort would initially maintain Top N array of elements to being with.
* 2. Read records into TopN array.
* 3. Once TopN array is full, heapify the array into max heap. Top node in
* the heap is always the highest node.
* 4. Subsequent record read either gets discarded( if greater than top node)
* or replace top node( if lesser then top node) . if replaced top node,
* re-balance the heap.
* 5. Repeat steps 4 until last record is read.
* 6. sort the final heap using heap sort.
*******************************************************************************/
#include <iostream>
#include <fstream>
#ifndef DEBUG
#undef NDEBUG
#define NDEBUG
#endif
#include "ex_stdh.h"
#include "SortTopN.h"
#include "ScratchSpace.h"
#include "logmxevent.h"
#include "SortUtil.h"
#include "ex_ex.h"
#include "ExStats.h"
//------------------------------------------------------------------------
// Class Constructor.
//------------------------------------------------------------------------
SortTopN::SortTopN(ULng32 runsize, ULng32 sortmaxmem, ULng32 recsize,
NABoolean doNotallocRec, ULng32 keysize,
SortScratchSpace* scratch, NABoolean iterSort,
CollHeap* heap, SortError* sorterror, Lng32 explainNodeId, ExBMOStats *bmoStats, SortUtil* sortutil):
SortAlgo(runsize, recsize, doNotallocRec, keysize, scratch, explainNodeId, bmoStats),
loopIndex_(0), heap_(heap), sortError_(sorterror),
sortUtil_(sortutil)
{
//runsize is TopN size. Fixed.
allocRunSize_ = runsize;
//Actual run size after all elements read.
runSize_ = 0;
isHeapified_ = FALSE;
topNKeys_ = (RecKeyBuffer *) new (heap_) BYTE[sizeof(RecKeyBuffer) * allocRunSize_];
// Below asserts useful in debug mode.
ex_assert(topNKeys_ != NULL, "Sort: Initial topNKeys_ allocation failed");
recNum_ = 0;
if (bmoStats_)
bmoStats_->updateBMOHeapUsage((NAHeap *)heap_);
}
SortTopN::~SortTopN(void)
{
if (topNKeys_ != NULL)
{
//No need to release the tupps here
//since these tupps are consumed by
//parent operators and released.
//for (int i = 0; i < runSize_; i++)
//topNKeys_[i].rec_->releaseTupp();
NADELETEBASIC(topNKeys_, heap_);
topNKeys_ = NULL;
}
if (bmoStats_)
bmoStats_->updateBMOHeapUsage((NAHeap *)heap_);
}
Lng32 SortTopN::sortSend(void *rec, ULng32 len, void* tupp)
{
//if heap not built means, TopN array has more slots
//available to fill.
if(! isHeapified_)
{
ex_assert(loopIndex_ >= 0, "TopN::sortSend: loopIndex_ is < 0");
ex_assert(loopIndex_ < allocRunSize_, "TopN::sortSend: loopIndex_ > allocRunSize_");
ex_assert(rec != NULL, "TopN::sortSend: rec is NULL");
Record * newRec = new (heap_)Record(rec, len, tupp, heap_, sortError_);
topNKeys_[loopIndex_].key_ = newRec->extractKey(keySize_,
sortUtil_->config()->numberOfBytesForRecordSize());
topNKeys_[loopIndex_].rec_ = newRec;
if (++loopIndex_ == allocRunSize_)
{
//Reaching here means, we have filled up the array.
//Now heapify the array to start accepting/eliminating new elements from now on.
//Note lookIndex_ contains the current number of filled elements.
buildHeap();
}
if (bmoStats_)
bmoStats_->updateBMOHeapUsage((NAHeap *)heap_);
return SORT_SUCCESS;
}
//Reaching here means, heap is already built.
//Check if the new rec belongs to this heap by comparing the
//new rec key with the root node of the heap ( root node is always the greatest).
insertRec(rec, len, tupp);
return SORT_SUCCESS;
}
void SortTopN::buildHeap()
{
if(!isHeapified_)
{
//loopIndex_ is now <= TopN
runSize_ = loopIndex_;
satisfyHeap();
isHeapified_ = TRUE;
}
}
//Satisfy Heap makes sure the heap is balanced maxHeap.
//Note this does not mean heap is sorted. It just makes sure
//the higher level nodes are greater than lower level nodes.
//Topmost node or root will be the highest.
void SortTopN::satisfyHeap()
{
//nothing to do if zero or one record.
if(runSize_ <= 1)
return;
for (int i = (runSize_/2 ); i >= 0; i--)
siftDown(topNKeys_, i, runSize_-1);
}
void SortTopN::insertRec(void *rec, ULng32 len, void* tupp)
{
ex_assert(isHeapified_, "TopN::insertRec: not heapified");
int root = 0; //Always, root node is the zero'th element in array.
Record * newrec = new (heap_)Record(rec, len, tupp, heap_, sortError_);
insertRecKey_.key_ = newrec->extractKey(keySize_,
sortUtil_->config()->numberOfBytesForRecordSize());
insertRecKey_.rec_ = newrec;
if (compare(topNKeys_[root].key_ ,insertRecKey_.key_) == KEY1_IS_GREATER)
{
swap( &topNKeys_[root],&insertRecKey_);
//After swap, satisfy or rebalance the heap.
satisfyHeap();
}
//Once swapped, or not swapped, delete the unneeded record.
//This step is very important to discard the unwanted record.
//Note releaseTupp() also deletes the tupp allocated in sql
//buffer. This makes space for new records read in.
insertRecKey_.rec_->releaseTupp();
NADELETEBASIC(insertRecKey_.rec_, heap_);
}
Lng32 SortTopN::sortSendEnd()
{
Lng32 retcode = SORT_SUCCESS;
ex_assert(loopIndex_ >= 0, "TopN::sortSendEnd: loopIndex_ is < 0");
buildHeap();
sortHeap();
return retcode;
}
//----------------------------------------------------------------------
// Name : sortHeap
//
//
// Description : The heap is already balanced. This step sorts the heap.
//----------------------------------------------------------------------
void SortTopN::sortHeap()
{
//nothing to do if zero or one record.
if(runSize_ <= 1)
return;
for (int i = runSize_-1; i >= 1; i--)
{
swap(&topNKeys_[0],&topNKeys_[i]);
siftDown(topNKeys_, 0, i-1);
}
}
Lng32 SortTopN::sortReceive(void *rec, ULng32& len)
{
//This method applicable to overflow records only
return SORT_FAILURE;
}
Lng32 SortTopN::sortReceive(void*& rec, ULng32& len, void*& tupp)
{
if (recNum_ < runSize_)
{
topNKeys_[recNum_].rec_->getRecordTupp(rec, recSize_, tupp);
len = recSize_;
recNum_++;
}
else
{
len = 0;
}
return SORT_SUCCESS;
}
//----------------------------------------------------------------------
// Name : siftDown
//
// Parameters : ..
//
// Description : Given the root node,rebalances the heap.Child nodes are less
// value than parent nodes. Top most node or root contains
// highest value.
//
//
//----------------------------------------------------------------------
void SortTopN::siftDown(RecKeyBuffer keysToSort[], Int64 root, Int64 bottom)
{
Int64 done, maxChild;
done = 0;
while ((root*2 <= bottom) && (!done))
{
if (root*2 == bottom)
maxChild = root * 2;
else if (compare(keysToSort[root * 2].key_ ,
keysToSort[root * 2 + 1].key_) >= KEY1_IS_GREATER)
maxChild = root * 2;
else
maxChild = root * 2 + 1;
if (compare(keysToSort[root].key_ ,keysToSort[maxChild].key_) <=KEY1_IS_SMALLER)
{
swap( &keysToSort[root],&keysToSort[maxChild]);
root = maxChild;
}
else
done = 1;
}
}
//----------------------------------------------------------------------
// Name : swap
//
// Parameters : ..
//
// Description : Swaps two elements from the QuickSort workspace. May
// consider making this inline rather than a seperate
// procedure call for performance reasons.
//
// Recompareturn Value :
// SORT_SUCCESS if everything goes on well.
// SORT_FAILURE if any error encounterd.
//
//----------------------------------------------------------------------
NABoolean SortTopN::swap(RecKeyBuffer* recKeyOne, RecKeyBuffer* recKeyTwo)
{
char* tempKey;
Record* tempRec;
tempKey = recKeyOne->key_;
tempRec = recKeyOne->rec_;
recKeyOne->key_ = recKeyTwo->key_;
recKeyOne->rec_ = recKeyTwo->rec_;
recKeyTwo->key_ = tempKey;
recKeyTwo->rec_ = tempRec;
return SORT_SUCCESS;
}
UInt32 SortTopN::getOverheadPerRecord(void)
{
return (sizeof(RecKeyBuffer) + sizeof(Record));
}