blob: 809c0f199288d8fd4e12f324a7105ff93c82ca72 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: RuDupElimLogScanner.cpp
* Description: Implementation of class CRUDupElimLogScanner
*
*
* Created: 07/02/2000
* Language: C++
*
*
*
******************************************************************************
*/
#include "RuDupElimLogScanner.h"
#include "dmresultset.h"
#include "RuDeltaDef.h"
#include "RuDupElimGlobals.h"
//--------------------------------------------------------------------------//
// Constructor and destructor
//--------------------------------------------------------------------------//
CRUDupElimLogScanner::
CRUDupElimLogScanner(const CRUDupElimGlobals &globals,
CRUSQLDynamicStatementContainer &ctrlStmtContainer) :
inherited(
globals,
ctrlStmtContainer,
CRUDupElimConst::NUM_QUERY_STMTS),
// State variables
tupleDesc_(globals.GetNumCKColumns()),
ckStartColumn_(0),
pCurrentStmt_(NULL),
pResultSet_(NULL),
inputBufIndex_(0),
pCurrentRec_(NULL),
pPrevRec_(NULL),
pLowerBoundRec_(NULL),
isEntireDeltaScanned_(FALSE),
ckTag_(0),
// Statistics
statMap_(),
empCheckVector_()
{
for (Int32 i =0; i<IB_SIZE; i++)
{
inputBuf_[i] = NULL;
}
}
CRUDupElimLogScanner::~CRUDupElimLogScanner()
{
for (Int32 i=0; i<IB_SIZE; i++)
{
if (NULL != inputBuf_[i])
{
delete inputBuf_[i];
}
}
}
//--------------------------------------------------------------------------//
// CRUDupElimLogScanner::Reset()
//
// Reset the state variables before the next phase.
//--------------------------------------------------------------------------//
void CRUDupElimLogScanner::Reset()
{
RUASSERT(FALSE == isEntireDeltaScanned_);
// Cache the clustering key value to start from ...
pLowerBoundRec_ = pCurrentRec_;
pCurrentRec_ = NULL;
pPrevRec_ = NULL;
inputBufIndex_ = 0;
ckTag_ = 0;
}
//--------------------------------------------------------------------------//
// CRUDupElimLogScanner::StartScan()
//
// Start executing the query and allocate the resources
//
// SELECT <T_CK columns, control columns> FROM <T-IUD-log>
// WHERE
// (
// @EPOCH = {+/-} begin_epoch
// {AND (col1, col2, ...) >= (?, ?, ..) DIRECTEDBY (ASC/DESC, ...)}
// )
// UNION ALL
// SELECT <T_CK columns, control columns> FROM <T-IUD-log>
// WHERE
// (
// @EPOCH = {+/-} begin_epoch+1
// {AND (col1, col2, ...) >= (?, ?, ..) DIRECTEDBY (ASC/DESC, ...)}
// )
// ...
// UNION ALL
// SELECT <T_CK columns, control columns> FROM <T-IUD-log>
// WHERE
// (
// @EPOCH = {+/-} end_epoch
// {AND (col1, col2, ...) >= (?, ?, ..) DIRECTEDBY (ASC/DESC, ...)}
// )
//
// ORDER BY <T_CK columns>, @TS
// FOR BROWSE ACCESS;
//
//--------------------------------------------------------------------------//
void CRUDupElimLogScanner::StartScan(Int32 phase)
{
RUASSERT(NULL == pCurrentStmt_ && NULL == pResultSet_);
if (0 == phase)
{
pCurrentStmt_ = GetPreparedStatement(CRUDupElimConst::PHASE_0_QUERY);
}
else
{
RUASSERT(phase > 0);
// The same statement for each phase starting from 1
pCurrentStmt_ = GetPreparedStatement(CRUDupElimConst::PHASE_1_QUERY);
// Set the parameters for the lower bound ...
CopyLowerBoundParams();
}
// Start the execution ...
pCurrentStmt_->ExecuteQuery();
pResultSet_ = pCurrentStmt_->GetResultSet();
if (0 == phase)
{
// Allocate the buffer space and setup the output descriptors
SetupScan();
}
else
{
// Nothing. Both queries have the same result set's structure.
// Hence, we need not re-allocate the buffers.
}
}
//--------------------------------------------------------------------------//
// CRUDupElimLogScanner::Fetch()
//
// Retrieve the next row from the delta, if there is one,
// and fetch its data into the next position in the cyclic buffer.
//
// Update the record's tags (for future use by the resolvers)
// and the delta statistics.
//
//--------------------------------------------------------------------------//
void CRUDupElimLogScanner::Fetch()
{
// Start the cyclic shift ...
pPrevRec_ = pCurrentRec_;
if (TRUE == pResultSet_->Next())
{
// End the cyclic shift ...
pCurrentRec_ = inputBuf_[inputBufIndex_];
// ... and retrieve the new data
pCurrentRec_->Build(*pResultSet_, ckStartColumn_);
// Move the pointer inside the buffer
inputBufIndex_ = (inputBufIndex_+1) % IB_SIZE;
if (NULL == pPrevRec_
||
pCurrentRec_->GetCKTuple() != pPrevRec_->GetCKTuple())
{
// A new distinct CK value has been encountered
ckTag_++;
}
pCurrentRec_->SetCKTag(ckTag_);
UpdateStatistics();
}
else
{
// End of input reached !
isEntireDeltaScanned_ = TRUE;
pCurrentStmt_->Close();
}
}
//--------------------------------------------------------------------------//
// CRUDupElimLogScanner::EndScan()
//--------------------------------------------------------------------------//
void CRUDupElimLogScanner::EndScan()
{
RUASSERT(
NULL != pCurrentStmt_
&&
NULL != pResultSet_);
pCurrentStmt_->Close();
pCurrentStmt_ = NULL;
pResultSet_ = NULL;
}
//--------------------------------------------------------------------------//
// PRIVATE AREA
//--------------------------------------------------------------------------//
//--------------------------------------------------------------------------//
// CRUDupElimLogScanner::SetupScan()
//
// When the *first* scan is started, initialize the tuple descriptors
// and allocate the input buffer space
//
//--------------------------------------------------------------------------//
void CRUDupElimLogScanner::SetupScan()
{
const CRUDupElimGlobals &globals = GetDupElimGlobals();
Int32 i;
// Which index do the CK columns start from in the tuple?
// Count from 1 + control columns + syskey
ckStartColumn_ = globals.GetNumCtrlColumns()+2;
// Retrieve the tuple's descriptors ...
Lng32 len = tupleDesc_.GetLength();
for (i=0; i < len; i++)
{
Int32 colIndex = i + ckStartColumn_;
#pragma nowarn(1506) // warning elimination
tupleDesc_.GetItemDesc(i).Build(*pResultSet_, colIndex);
#pragma warn(1506) // warning elimination
}
// Allocate space for input buffer
Int32 updateBmpSize = globals.GetUpdateBmpSize();
for (i =0; i<IB_SIZE; i++)
{
inputBuf_[i] = new CRUIUDLogRecord(tupleDesc_, updateBmpSize);
}
}
//--------------------------------------------------------------------------//
// CRUDupElimLogScanner::UpdateStatistics()
//
// Update the statistics for each MV that will observe
// the current log record, i.e., each MV that has
//
// MV.EPOCH[T] >= this-record-epoch
//
// For this, iterate through all the epochs in the emptiness
// check vector, and pick the relevant structures in the hash.
//
//--------------------------------------------------------------------------//
void CRUDupElimLogScanner::UpdateStatistics()
{
CRUEmpCheckVector::Elem *pElem;
CRUEmpCheckVecIterator it(empCheckVector_);
for (;NULL != (pElem = it.GetCurrentElem()); it.Next())
{
TInt32 ep = pElem->epoch_;
if (pCurrentRec_->GetEpoch() < ep)
{
// This MV has observed me already
continue;
}
// The statistics structure in the hash
CRUDeltaStatistics &stat = statMap_[ep];
if (FALSE == pCurrentRec_->IsSingleRowOp())
{
if (FALSE == pCurrentRec_->IsBeginRange())
{
UpdateRangeStatistics(stat);
}
}
else
{
UpdateSingleRowStatistics(stat);
}
}
}
//--------------------------------------------------------------------------//
// CRUDupElimLogScanner::UpdateRangeStatistics()
//
// Update the statistics entry from a range record.
//
/// Summarize the number of ranges and the number of rows covered by them.
// If at least one range record without a range size estimate is
// encountered (i.e., @RANGE_SIZE = -1), then the whole statistics record
// will remain without the estimate (no matter what other records are
// encountered).
//
//--------------------------------------------------------------------------//
void CRUDupElimLogScanner::UpdateRangeStatistics(CRUDeltaStatistics &stat)
{
stat.IncrementCounter(stat.nRanges_);
if (CRUDeltaStatistics::RANGE_SIZE_UNKNOWN == stat.nRangeCoveredRows_)
{
// We have already seen one range without an estimate
return;
}
TInt32 rangeSize = pCurrentRec_->GetRangeSize();
if (CRUDeltaStatistics::RANGE_SIZE_UNKNOWN == rangeSize)
{
stat.nRangeCoveredRows_ = CRUDeltaStatistics::RANGE_SIZE_UNKNOWN;
}
else
{
stat.IncrementCounter(stat.nRangeCoveredRows_, rangeSize);
}
}
//--------------------------------------------------------------------------//
// CRUDupElimLogScanner::UpdateSingleRowStatistics()
//
// Update the statistics entry from a single-row record
//--------------------------------------------------------------------------//
void CRUDupElimLogScanner::UpdateSingleRowStatistics(CRUDeltaStatistics &stat)
{
RUASSERT (TRUE == GetDupElimGlobals().IsSingleRowResolv());
if (FALSE == pCurrentRec_->IsPartOfUpdate())
{
if (TRUE == pCurrentRec_->IsInsert())
{
stat.IncrementCounter(stat.nInsertedRows_);
}
else
{
stat.IncrementCounter(stat.nDeletedRows_);
}
}
else
{
stat.IncrementCounter(stat.nUpdatedRows_);
// Compute the superposition of update bitmaps
CRUUpdateBitmap bmp(*(pCurrentRec_->GetUpdateBitmap()));
if (NULL == stat.pUpdateBitmap_)
{
stat.pUpdateBitmap_ = new CRUUpdateBitmap(bmp);
}
else
{
*(stat.pUpdateBitmap_) |= bmp;
}
}
}
//--------------------------------------------------------------------------//
// CRUDupElimLogScanner::CopyLowerBoundParams()
//
// Set the lower-bound parameters to the phase 1 query statement.
// The lower bound is the maximum clustering key value reached by the
// previous phase.
//
// The query statement is composed of the variable number of blocks.
// The lower bound tuple values must be passed as parameters to every
// block of the query.
//
//--------------------------------------------------------------------------//
void CRUDupElimLogScanner::CopyLowerBoundParams()
{
RUASSERT(NULL != pLowerBoundRec_);
Int32 tupleDescLen = tupleDesc_.GetLength();
// Compute the number of the query's parameters
Lng32 nParams = pCurrentStmt_->GetParamCount();
// There is an integral number of blocks
RUASSERT(nParams > 0 && 0 == nParams % tupleDescLen);
Int32 nBlocks = nParams/tupleDescLen;
for (Int32 i=0; i<nBlocks; i++)
{
pLowerBoundRec_->
CopyCKTupleValuesToParams(*pCurrentStmt_, 1+i*tupleDescLen);
}
}