blob: 343d8d2466e76420642c23f3394926f238498508 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: RuRefreshTaskExecutor.cpp
* Description: Implementation of class CRURefreshTaskExecutor.
*
*
* Created: 04/06/2000
* Language: C++
*
*
*
******************************************************************************
*/
#include "dsstringlist.h"
#include "uofsIpcMessageTranslator.h"
#include "uofsSystem.h"
#include "RuRefreshTaskExecutor.h"
#include "RuRefreshTask.h"
#include "RuRefreshSQLComposer.h"
#include "RuMV.h"
#include "RuTbl.h"
#include "RuDeltaDef.h"
#include "RuForceOptions.h"
#include "RuJournal.h"
#include "RuKeyColumn.h"
#include "dmconnection.h"
//--------------------------------------------------------------------------//
// Constructor
//--------------------------------------------------------------------------//
CRURefreshTaskExecutor::CRURefreshTaskExecutor(CRUTask *pParentTask) :
inherited(pParentTask),
rootMVName_(""),
rootMVSchema_(""),
rootMVCatalog_(""),
rootMVUID_(0),
rootMVType_(CDDObject::eRECOMPUTE),
isRecompute_(FALSE),
numOfStmtInContainer_(0),
pRefreshTEDynamicContainer_(NULL),
forceFlags_(NO_FORCE),
tableLockProtocol_(NULL)
{}
CRURefreshTaskExecutor::~CRURefreshTaskExecutor()
{
delete pRefreshTEDynamicContainer_;
if (NULL != tableLockProtocol_)
delete tableLockProtocol_;
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::StoreRequest()
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :cnu
void CRURefreshTaskExecutor::
StoreRequest(CUOFsIpcMessageTranslator &translator)
{
inherited::StoreRequest(translator);
translator.WriteBlock(&isRecompute_,sizeof(BOOL));
Int32 stringSize = rootMVName_.GetLength() + 1;
translator.WriteBlock(&stringSize, sizeof(Int32));
#pragma nowarn(1506) // warning elimination
translator.WriteBlock(rootMVName_.c_string(), stringSize);
#pragma warn(1506) // warning elimination
stringSize = rootMVSchema_.GetLength() + 1;
translator.WriteBlock(&stringSize, sizeof(Int32));
#pragma nowarn(1506) // warning elimination
translator.WriteBlock(rootMVSchema_.c_string(), stringSize);
#pragma warn(1506) // warning elimination
stringSize = rootMVCatalog_.GetLength() + 1;
translator.WriteBlock(&stringSize, sizeof(Int32));
#pragma nowarn(1506) // warning elimination
translator.WriteBlock(rootMVCatalog_.c_string(), stringSize);
#pragma warn(1506) // warning elimination
translator.WriteBlock(&rootMVUID_, sizeof(TInt64));
translator.WriteBlock(&rootMVType_, sizeof(Int32));
translator.WriteBlock(&forceFlags_, sizeof(TInt32));
if (NO_FORCE != forceFlags_)
{
translator.WriteBlock(&numOfStmtInContainer_, sizeof(TInt32));
// Handle refresh executor sql dynamic container
pRefreshTEDynamicContainer_->StoreData(translator);
}
// Handle lock table dynamic container
short lockTable;
if (NULL != tableLockProtocol_ )
{
lockTable = 1;
translator.WriteBlock(&lockTable, sizeof(short));
tableLockProtocol_->StoreData(translator);
}
else
{
lockTable = 0;
translator.WriteBlock(&lockTable, sizeof(short));
}
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::StoreReply()
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :cnu
void CRURefreshTaskExecutor::
StoreReply(CUOFsIpcMessageTranslator &translator)
{
inherited::StoreReply(translator);
translator.WriteBlock(&isRecompute_,sizeof(BOOL));
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::LoadRequest()
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :cnu
void CRURefreshTaskExecutor::
LoadRequest(CUOFsIpcMessageTranslator &translator)
{
inherited::LoadRequest(translator);
translator.ReadBlock(&isRecompute_,sizeof(BOOL));
Int32 stringSize;
char buffer[CUOFsIpcMessageTranslator::MaxMsgSize];
translator.ReadBlock(&stringSize, sizeof(Int32));
#pragma nowarn(1506) // warning elimination
translator.ReadBlock(buffer, stringSize);
#pragma warn(1506) // warning elimination
rootMVName_ = CDSString(buffer);
translator.ReadBlock(&stringSize, sizeof(Int32));
#pragma nowarn(1506) // warning elimination
translator.ReadBlock(buffer, stringSize);
#pragma warn(1506) // warning elimination
rootMVSchema_ = CDSString(buffer);
translator.ReadBlock(&stringSize, sizeof(Int32));
#pragma nowarn(1506) // warning elimination
translator.ReadBlock(buffer, stringSize);
#pragma warn(1506) // warning elimination
rootMVCatalog_ = CDSString(buffer);
translator.ReadBlock(&rootMVUID_, sizeof(TInt64));
translator.ReadBlock(&rootMVType_, sizeof(Int32));
translator.ReadBlock(&forceFlags_, sizeof(TInt32));
if (NO_FORCE != forceFlags_)
{
translator.ReadBlock(&numOfStmtInContainer_, sizeof(TInt32));
pRefreshTEDynamicContainer_ =
#pragma nowarn(1506) // warning elimination
new CRUSQLDynamicStatementContainer(numOfStmtInContainer_);
#pragma warn(1506) // warning elimination
// Handle refresh executor sql dynamic container
pRefreshTEDynamicContainer_->LoadData(translator);
}
// Handle lock table dynamic container
short lockTable;
translator.ReadBlock(&lockTable, sizeof(short));
if (1 == lockTable)
{
tableLockProtocol_ = new CRUTableLockProtocol();
tableLockProtocol_->LoadData(translator);
}
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::LoadReply()
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :cnu
void CRURefreshTaskExecutor::
LoadReply(CUOFsIpcMessageTranslator &translator)
{
inherited::LoadReply(translator);
translator.ReadBlock(&isRecompute_,sizeof(BOOL));
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::Init()
//
// Task executor initializiation. It may be that the executor
// does not have any work to do and will skip execution
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::Init()
{
inherited::Init();
CRUMV &rootMV = GetRefreshTask()->GetRootMV();
SetRecompute(rootMV.WillBeRecomputed());
rootMVName_ = rootMV.GetFullName();
rootMVSchema_ = rootMV.GetCatName() + "." + rootMV.GetSchName();
rootMVCatalog_ = rootMV.GetCatName();
rootMVType_ = rootMV.GetRefreshType();
rootMVUID_ = rootMV.GetUID();
// Runtime consistency check.
// Check requirement that the regular table's deltas
// are empty (if they must be).
CheckSingleDeltaRestriction();
ComposeMySql();
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ComposeMySql()
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::ComposeMySql()
{
CRURefreshSQLComposer myComposer(GetRefreshTask());
CRUMV &rootMV = GetRefreshTask()->GetRootMV();
numOfStmtInContainer_ = rootMV.GetTablesUsedByMe().GetCount() + FIRST_TBL_STAT;
pRefreshTEDynamicContainer_ =
#pragma nowarn(1506) // warning elimination
new CRUSQLDynamicStatementContainer(numOfStmtInContainer_);
#pragma warn(1506) // warning elimination
ComposeForceStatements();
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ComposeForceStatements()
//
// The Refresh SQL container contains ONLY Control SQL statement.
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::ComposeForceStatements()
{
CRUMV &rootMV = GetRefreshTask()->GetRootMV();
const CRUMVForceOptions* pForceOption = rootMV.GetMVForceOption();
CRURefreshSQLComposer myComposer(GetRefreshTask());
// Deal with explain data
// LCOV_EXCL_START :rfi
if (NULL != pForceOption &&
CRUForceOptions::EXPLAIN_ON == pForceOption->GetExplainOption())
{
forceFlags_ = forceFlags_ | FORCE_SHOW_EXPLAIN;
myComposer.ComposeShowExplain();
pRefreshTEDynamicContainer_->SetStatementText(SHOW_EXPLAIN, myComposer.GetSQL());
}
// LCOV_EXCL_STOP
ComposeCQSStatements();
ComposeMDAMStatements();
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ComposeMDAMStatements()
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::ComposeMDAMStatements()
{
BOOL hasActiveCQS = FALSE;
CRURefreshSQLComposer myComposer(GetRefreshTask());
CRUMV &rootMV = GetRefreshTask()->GetRootMV();
const CRUMVForceOptions* pForceOption = rootMV.GetMVForceOption();
// Even if there are no force options, we still force mdam for tables
// with more than two columns.
ComposeControlTableStmtForUsedTables();
// MV base tables force options
// LCOV_EXCL_START :rfi
if (NULL != pForceOption &&
CRUForceOptions::MDAM_NO_FORCE != pForceOption->GetMDAMoption())
{
// Compose CONTROL TABLE mv_name MDAM option
myComposer.ComposeCntrlTableMDAMText(pForceOption->GetMDAMoption(),
&rootMV.GetFullName());
pRefreshTEDynamicContainer_->SetStatementText(MV_MDAM_STAT, myComposer.GetSQL());
forceFlags_ |= FORCE_MV_MDAM;
}
// Compose reset MDAMs
if (forceFlags_ & FORCE_MV_MDAM || forceFlags_ & FORCE_TABLE_MDAM)
{
// Compose CONTROL TABLE * MDAM 'RESET'
myComposer.ComposeResetCntrlTableMDAMText();
pRefreshTEDynamicContainer_->SetStatementText(RESET_MDAM_STAT, myComposer.GetSQL());
}
// LCOV_EXCL_STOP
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ComposeCQSStatements()
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::ComposeCQSStatements()
{
BOOL hasActiveCQS = FALSE;
CRURefreshSQLComposer myComposer(GetRefreshTask());
CRUMV &rootMV = GetRefreshTask()->GetRootMV();
const CRUMVForceOptions* pForceOption = rootMV.GetMVForceOption();
if (NULL == pForceOption)
{
return;
}
// MV force options
// LCOV_EXCL_START :rfi
if ((CRUForceOptions::GB_NO_FORCE != pForceOption->GetGroupByoption() ||
CRUForceOptions::JOIN_NO_FORCE != pForceOption->GetJoinoption()))
{
// Compose Control query shape for internal refresh stmt
myComposer.ComposeControlQueryShape();
pRefreshTEDynamicContainer_->SetStatementText(CQS_STAT, myComposer.GetSQL());
forceFlags_ = forceFlags_ | FORCE_CQS;
hasActiveCQS = TRUE;
}
// Deal with a specific user CQS
if (FALSE == pForceOption->GetCQSStatment().IsEmpty())
{
pRefreshTEDynamicContainer_->SetStatementText
(USER_CQS_FORCE, pForceOption->GetCQSStatment());
forceFlags_ = forceFlags_ | FORCE_USER_CQS;
hasActiveCQS = TRUE;
}
if (TRUE == hasActiveCQS)
{
myComposer.ComposeControlQueryShapeCut();
pRefreshTEDynamicContainer_->SetStatementText(CQS_CUT_STAT, myComposer.GetSQL());
}
// LCOV_EXCL_STOP
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ComposeControlTableStmtForUsedTables()
//
// Compose the Control MDAM option for all base tables for forced tables
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :rfi
void CRURefreshTaskExecutor::ComposeControlTableStmtForUsedTables()
{
CRUMV &rootMV = GetRefreshTask()->GetRootMV();
const CRUMVForceOptions* pForceOption = rootMV.GetMVForceOption();
CRUForceOptions::MdamOptions mdamOpt = CRUForceOptions::MDAM_NO_FORCE;
Int32 stmtIndex = FIRST_TBL_STAT;
if (NULL != pForceOption)
{
VerifyForceTblListCorrectness();
if (pForceOption->IsTableStarUsed())
{
mdamOpt = pForceOption->GetTableStarOption();
}
}
CRUTblList &tblList = rootMV.GetTablesUsedByMe();
DSListPosition pos = tblList.GetHeadPosition();
while (NULL != pos)
{
CRUTbl *pTbl = tblList.GetNext(pos);
RUASSERT(NULL != pTbl);
ComposeControlTableStmtForUsedTable(*pTbl,stmtIndex,mdamOpt);
}
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ComposeControlTableStmtForUsedTables()
//
// Compose the Control MDAM option for a single table for forced tables
//
// For table with more then two columns and non empty range log the default
// is to force MDAM
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::ComposeControlTableStmtForUsedTable(CRUTbl &tbl,
Int32 &stmtIndex,
CRUForceOptions::MdamOptions mdamOpt)
{
CRURefreshSQLComposer myComposer(GetRefreshTask());
if (CRUForceOptions::MDAM_NO_FORCE == mdamOpt)
{
CRUTableForceOptions *forceOpt = GetForceOptionForTable(tbl.GetFullName());
if (NULL == forceOpt)
{
mdamOpt = GetDefaultMdamOptForTable(tbl);
}
else
{
mdamOpt = forceOpt->GetMdamOptions();
}
}
if (CRUForceOptions::MDAM_NO_FORCE != mdamOpt)
{
// Compose CONTROL TABLE table_name MDAM option
myComposer.ComposeCntrlTableMDAMText(mdamOpt, &(tbl.GetFullName()));
pRefreshTEDynamicContainer_->SetStatementText
#pragma nowarn(1506) // warning elimination
(stmtIndex++, myComposer.GetSQL());
#pragma warn(1506) // warning elimination
forceFlags_ |= FORCE_TABLE_MDAM;
}
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::GetDefaultMdamOptForTable()
//
// If the table has more then two key columns and the range log is not empty
// then mdam must be forced on the table unless the user forced otherwise
//--------------------------------------------------------------------------//
CRUForceOptions::MdamOptions CRURefreshTaskExecutor::GetDefaultMdamOptForTable(CRUTbl &tbl)
{
Int32 numTableKeyCol = tbl.GetKeyColumnList().GetCount();
if (numTableKeyCol < 2)
{
return CRUForceOptions::MDAM_NO_FORCE;
}
if (FALSE == GetRefreshTask()->IsSingleDeltaRefresh())
{
return CRUForceOptions::MDAM_NO_FORCE;
}
const CRUDeltaDef *pDdef = GetRootMV().GetDeltaDefByUid(tbl.GetUID());
if (NULL != pDdef && TRUE == pDdef->isRangeLogNonEmpty_)
{
return CRUForceOptions::MDAM_ON;
}
return CRUForceOptions::MDAM_NO_FORCE;
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::GetForceOptionForTable()
//--------------------------------------------------------------------------//
CRUTableForceOptions* CRURefreshTaskExecutor::GetForceOptionForTable(const CDSString &name)
{
CRUMV &rootMV = GetRefreshTask()->GetRootMV();
const CRUMVForceOptions* pForceOption = rootMV.GetMVForceOption();
if (NULL == pForceOption)
{
return NULL;
}
const CRUTableForceOptionsList &tblList = pForceOption->GetTableForceList();
DSListPosition pos = tblList.GetHeadPosition();
while (NULL != pos)
{
CRUTableForceOptions* forceOpt = tblList.GetNext(pos);
if (name == forceOpt->GetFullName())
{
return forceOpt;
}
}
return NULL;
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::VerifyForceTblListCorrectness()
//
// Check that the user did not refered to a un-existing base table
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::VerifyForceTblListCorrectness()
{
const CRUMVForceOptions &forceOption =
*GetRefreshTask()->GetRootMV().GetMVForceOption();
const CRUTableForceOptionsList &tblList = forceOption.GetTableForceList();
DSListPosition pos = tblList.GetHeadPosition();
while (NULL != pos)
{
CRUTableForceOptions* forceOpt = tblList.GetNext(pos);
CRUTbl *pTbl = GetRefreshTask()->GetRootMV().GetUsedTableByName(forceOpt->GetFullName());
if(NULL == pTbl)
{
CRUException ex;
ex.SetError(IDS_RU_FORCE_FILE_TABLE_NOT_EXISTS);
ex.AddArgument(GetRefreshTask()->GetRootMV().GetFullName());
ex.AddArgument(forceOpt->GetFullName());
throw ex;
}
}
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::FinalMetadataUpdate()
//
// After the refresh is complete, update the metadata
// of the refreshed MV(s) in the SMD and UMD tables.
//
// (1) If the root MV is an ON REQUEST MV, promote its
// epoch vector with regards to the underlying tables.
// (2) For every MV handled by the task:
// (2.1) Update the MV's REFRESHED_AT timestamp.
// (2.2) Update the MV's statistics (different for
// the root and non-root MV's).
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::FinalMetadataUpdate()
{
CRURefreshTask *pParentTask = GetRefreshTask();
RUASSERT (NULL != pParentTask);
TInt32 currDuration = 0;
TInt64 ts = CalculateTimeStamp();
if (TRUE == pParentTask->NeedToExecuteInternalRefresh())
{
currDuration = GetTimerDuration();
}
CRUMV *pPrevMV, *pCurrMV = NULL;
CRUMVList &mvList = pParentTask->GetMVList();
DSListPosition pos = mvList.GetHeadPosition();
while (NULL != pos)
{
pPrevMV = pCurrMV;
pCurrMV = mvList.GetNext(pos);
// Update the MV's REFRESHED_AT timestamp
pCurrMV->SetTimestamp(ts);
// publish the refresh event to the PUBLISH table
NABoolean isRecomputePublish = (IsRecompute() == TRUE);
pCurrMV->PublishMVRefresh(isRecomputePublish);
if (CDDObject::eON_STATEMENT != pCurrMV->GetRefreshType())
{
if (NULL == pPrevMV)
{
// This is the Root MV (the first in the list)
if (CDDObject::eON_REQUEST == pCurrMV->GetRefreshType())
{
// For every table T used by MV,
// set MV.EPOCH[T]<--T.CURRENT_EPOCH.
UpdateRootMVCurrentEpochVector();
}
UpdateRootMVStatistics(currDuration);
}
else
{
UpdateNonRootMVStatistics(pCurrMV, pPrevMV, currDuration);
}
}
pCurrMV->SaveMetadata();
}
if (CDDObject::eON_STATEMENT != GetRootMV().GetRefreshType())
{
IncrementTopMvCurrentEpoch();
}
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::IncrementTblCurrentEpoch()
//
//
// If the top MV in the task (i.e., the one that no other
// MV in the task uses it) is also an involved table (i.e.,
// there is another MV in the other task that uses it), then:
// (1) Increment the MV's current epoch.
// (2) Update the table's timestamp (for the further propagation).
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::IncrementTopMvCurrentEpoch()
{
CRUMV &mv = GetRefreshTask()->GetTopMV();
if (TRUE == mv.IsInvolvedTbl())
{
CRUTbl *pTbl = mv.GetTblInterface();
pTbl->IncrementCurrentEpoch(TRUE /* through DDOL */);
pTbl->SaveMetadata();
pTbl->SetTimestamp(mv.GetTimestamp());
}
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::UpdateRootMVCurrentEpochVector()
//
// For every table T used by the task's root MV,
// set MV.EPOCH[T] <-- T.CURRENT_EPOCH
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::UpdateRootMVCurrentEpochVector()
{
CRUMV &rootMV = GetRootMV();
CRUTblList &tblList = rootMV.GetTablesUsedByMe();
DSListPosition pos = tblList.GetHeadPosition();
while (NULL != pos)
{
CRUTbl *pTbl = tblList.GetNext(pos);
rootMV.SetEpoch(pTbl->GetUID(), pTbl->GetCurrentEpoch());
}
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::UpdateRootMVStatistics()
//
// Update the statistics for the root MV's refresh duration
// and (optionally) the delta size.
//
// In the case the MV was recomputed, or refreshed incrementally
// with more than one delta, only the total duration is updated.
// In the case a single-delta refresh happened, both the duration
// *with regards to this delta*, and the delta size are updated.
//
// For each type of update, the new value of the heuristic
// is computed as a weighted average between the old heuristic
// and the new measurement.
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::UpdateRootMVStatistics(TInt32 currDuration)
{
CRURefreshTask &task = *GetRefreshTask();
if (FALSE == task.NeedToExecuteInternalRefresh())
{
return;
}
CRUDeltaDefList &deltaDefList = task.GetDeltaDefList();
TInt32 prevDuration, nextDuration;
CRUMV &rootMV = GetRootMV();
if (TRUE == IsRecompute())
{
prevDuration = rootMV.GetRecomputeDurationEstimate();
nextDuration = EvaluateHeuristic(currDuration, prevDuration);
rootMV.SetRecomputeDurationEstimate(nextDuration);
return;
}
switch(deltaDefList.GetCount())
{
case 0:
{
// This cannot happen
// (NeedToExecuteInternalRefresh() must have returned FALSE).
RUASSERT(FALSE);
}
case 1:
{
CRUDeltaDef *pDdef = deltaDefList.GetAt(0);
// A special case: update the refresh duration,
// and (optionally) the delta size.
UpdateSingleDeltaStatistics(pDdef, currDuration);
break;
}
case 2:
{
prevDuration = rootMV.GetRefreshDurationEstimateWith_2_Deltas();
nextDuration = EvaluateHeuristic(currDuration, prevDuration);
rootMV.SetRefreshDurationEstimateWith_2_Deltas(nextDuration);
break;
}
case 3:
{
prevDuration = rootMV.GetRefreshDurationEstimateWith_3_Deltas();
nextDuration = EvaluateHeuristic(currDuration, prevDuration);
rootMV.SetRefreshDurationEstimateWith_3_Deltas(nextDuration);
break;
}
default:
{
prevDuration = rootMV.GetRefreshDurationEstimateWith_N_Deltas();
nextDuration = EvaluateHeuristic(currDuration, prevDuration);
rootMV.SetRefreshDurationEstimateWith_N_Deltas(nextDuration);
break;
}
}
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::UpdateSingleDeltaStatistics()
//
// Update the heuristics for refresh duration for the *specific*
// table's delta and the delta size (if Duplicate Elimination
// was done before, and hence, the delta size is known).
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::
UpdateSingleDeltaStatistics(CRUDeltaDef *pDdef, TInt32 currDuration)
{
CRUMV &rootMV = GetRootMV();
TInt32 prevDuration, nextDuration;
prevDuration = rootMV.GetRefreshDurationEstimate(pDdef->tblUid_);
nextDuration = EvaluateHeuristic(currDuration, prevDuration);
rootMV.SetRefreshDurationEstimate(pDdef->tblUid_, nextDuration);
// Appling the delta size computation heuristic
// If there are no statistics, 0 is the default for the delta size
TInt32 nextDelta = 0;
if (NULL != pDdef->pStat_)
{
// There are statistics, hence the delta size is known
TInt32 currDelta = pDdef->pStat_->GetDeltaSize();
TInt32 prevDelta = rootMV.GetDeltaSizeEstimate(pDdef->tblUid_);
nextDelta = EvaluateHeuristic(currDelta, prevDelta);
}
rootMV.SetDeltaSizeEstimate(pDdef->tblUid_, nextDelta);
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::UpdateNonRootMVStatistics()
//
// A non-root MV receives its delta pipelined from the MV
// previous MV in the list. Hence, the statistic is with regards
// to the previous MV's (single) delta.
//
// There is no estimate on the delta size that is pipelined
// between the MVs, hence the heuristic is set to 0.
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::
UpdateNonRootMVStatistics(CRUMV *pCurrMV,
CRUMV *pPrevMV,
TInt32 currDuration)
{
if (FALSE == GetRefreshTask()->NeedToExecuteInternalRefresh())
{
return;
}
TInt64 tblUid = pPrevMV->GetUID();
TInt32 prevDuration =
pCurrMV->GetRefreshDurationEstimate(tblUid);
TInt32 nextDuration =
EvaluateHeuristic(currDuration, prevDuration);
pCurrMV->SetRefreshDurationEstimate(tblUid, prevDuration);
pCurrMV->SetDeltaSizeEstimate(tblUid, 0);
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::EvaluateHeuristic()
//
// The heuristic is an exponential backoff function that gives a
// part of weight to the old value and the new measurement:
//
// new = a*old + b*measure, where a+b=1
//
//--------------------------------------------------------------------------//
TInt32 CRURefreshTaskExecutor::EvaluateHeuristic(TInt32 curr, TInt32 prev)
{
const double prevFactor = 0.8;
const double currFactor = 0.2;
if ( 0 == prev )
{
return curr;
}
return static_cast<TInt32> ( prevFactor*prev + currFactor*curr );
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::CalculateTimeStamp()
//
// All the MVs that the task executor takes care of
// will receive the same timestamp. This must be the timestamp
// of the database cut that all the tables under the MV represent.
//
// The cases are as follows:
// (1) All the root MV's expanded tree is being refreshed.
// (1.1) The root MV is based only on regular tables/ON STATEMENT MVs
// (i.e., fully synchronized tables). If there is more than
// one such table, they will belong to the same equivalence
// set, and will have the same timestamp. Hence, the MV will
// inherit their timestamp.
// (1.2) The MV is also based on involved MVs. All the MVs used by
// it inherit inductively the equivalence set's common timestamp.
// Hence, the MV will inherit the same timestamp.
// (2) The root MV is based on non-involved MVs.
// (2.1) If there is more than one such used MV - inconsistency.
// (2.2) Otherwise, the root MV will inherit the external MV's
// timestamp. The timestamp will be smaller that all of the
// other used table's timestamps (current time measured in this
// invocation of the utility).
//
// In all the cases, the MV's timestamp is the minimum between the
// used objects' timestamps.
//
//--------------------------------------------------------------------------//
TInt64 CRURefreshTaskExecutor::CalculateTimeStamp()
{
CRUTblList &tblList = GetRootMV().GetTablesUsedByMe();
DSListPosition pos = tblList.GetHeadPosition();
CRUTbl *pTbl = tblList.GetNext(pos);
TInt64 minTS = pTbl->GetTimestamp();
while (NULL != pos)
{
pTbl = tblList.GetNext(pos);
TInt64 nextTS = pTbl->GetTimestamp();
minTS = (nextTS < minTS) ? nextTS : minTS;
}
if (0 == minTS)
{
// If the timestamp has not been executed until now,
// this is time to compute it.
// RUASSERT(CDDObject::eRECOMPUTE == GetRootMV().GetRefreshType());
minTS = CRUGlobals::GetCurrentTimestamp();
}
return minTS;
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::GenOpeningMessage()
//
// CALLED BY: LogOpeningMessage() method
//
// Generate the message about successful refresh.
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::GenOpeningMessage(CDSString &msg)
{
CRURefreshTask &task = *GetRefreshTask();
if (task.GetMVList().GetCount() > 1)
{
msg += RefreshDiags[2];
task.DumpNamesOfAllMVs(msg);
msg += RefreshDiags[4];
}
else
{
msg += RefreshDiags[3];
task.DumpNamesOfAllMVs(msg);
msg += RefreshDiags[5];
}
}
//--------------------------------------------------------------------------//
// CRURefreshTask::GenClosureMessage()
//
// CALLED BY: LogOpeningMessage() method
//
// Generate the message about successful refresh.
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::GenClosureMessage(CDSString &msg)
{
CRURefreshTask &task = *GetRefreshTask();
if (task.GetMVList().GetCount() > 1)
{
if (FALSE == GetRefreshTask()->NeedToExecuteInternalRefresh())
{
msg += RefreshDiags[2];
task.DumpNamesOfAllMVs(msg);
msg += RefreshDiags[6];
}
else
{
msg += RefreshDiags[2];
task.DumpNamesOfAllMVs(msg);
msg += RefreshDiags[7];
}
}
else
{
if (FALSE == GetRefreshTask()->NeedToExecuteInternalRefresh())
{
msg += RefreshDiags[3];
task.DumpNamesOfAllMVs(msg);
msg += RefreshDiags[8];
}
else
{
msg += RefreshDiags[3];
task.DumpNamesOfAllMVs(msg);
msg += RefreshDiags[9];
}
}
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::CheckSingleDeltaRestriction()
//
// Init() callee
//
// If the task must perform a runtime consistency check,
// this means that the MV which is based on a single MV
// and one or more tables is refreshed standalone. In this
// context, the deltas of all the fully-synchronized used
// objects (not ON REQUEST MVs) must be empty.
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::CheckSingleDeltaRestriction()
{
CRUMV &rootMV = GetRootMV();
CRUTblList &tblList = rootMV.GetFullySyncTablesUsedByMe();
DSListPosition pos = tblList.GetHeadPosition();
while (NULL != pos)
{
CRUTbl *pTbl = tblList.GetNext(pos);
RUASSERT (TRUE == pTbl->IsFullySynchronized());
if (TRUE == pTbl->IsEmptyDeltaNeeded()
&&
TRUE == rootMV.IsDeltaNonEmpty(*pTbl))
{
CRUException errDesc;
errDesc.SetError(IDS_RU_INCONSISTENT_JOIN);
errDesc.AddArgument(rootMVName_);
throw errDesc;
}
}
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ApplyIRCompilerDefaults()
//
// Apply the force options for the internal refresh statement
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::ApplyIRCompilerDefaults()
{
// Always turn on the explain generation for IR commands
SetExplainGeneration();
SetAllowOfflineAccess();
if (NO_FORCE == forceFlags_)
{
return;
}
// LCOV_EXCL_START :rfi
RUASSERT(NULL != pRefreshTEDynamicContainer_);
CRUGlobals::GetInstance()->GetJournal().LogMessage(
"USING THE FOLLOWING CONTROL STATMENTS :\n");
if (FORCE_CQS & forceFlags_)
{
ApplyCQSForInternalRefresh();
}
if (FORCE_USER_CQS & forceFlags_)
{
ApplyUserCQSForInternalRefresh();
}
if (FORCE_MV_MDAM & forceFlags_)
{
CDMPreparedStatement *pStat = pRefreshTEDynamicContainer_->
GetPreparedStatement(MV_MDAM_STAT);
ExecuteStatement(*pStat, MV_MDAM_STAT);
CRUGlobals::GetInstance()->GetJournal().LogMessage(
pRefreshTEDynamicContainer_->GetLastSQL(MV_MDAM_STAT));
}
if (FORCE_TABLE_MDAM & forceFlags_)
{
for (Int32 i=FIRST_TBL_STAT;i<pRefreshTEDynamicContainer_->GetNumOfStmt();i++)
{
CDMPreparedStatement *pStat = pRefreshTEDynamicContainer_->
#pragma nowarn(1506) // warning elimination
GetPreparedStatement(i);
#pragma warn(1506) // warning elimination
if (NULL == pStat)
{
// No more control statements for table
break;
}
ExecuteStatement(*pStat, i);
CRUGlobals::GetInstance()->GetJournal().LogMessage(
#pragma nowarn(1506) // warning elimination
pRefreshTEDynamicContainer_->GetLastSQL(i));
#pragma warn(1506) // warning elimination
}
}
// LCOV_EXCL_STOP
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::SetExplainGeneration()
//
// Execute Control query default GENERATE_EXPLAIN 'ON'
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::SetExplainGeneration()
{
CDSString stmt("control query default GENERATE_EXPLAIN 'ON'");
CDMConnection conn;
conn.SetAllowSpecialSyntax(TRUE);
CDMStatement *pStmt = conn.CreateStatement();
pStmt->ExecuteUpdate(stmt, TRUE);
conn.DeleteStatement(pStmt);
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::SetAllowOfflineAccess()
//
// Execute Control query default CAT_PERMIT_OFFLINE_ACCESS 'ON'
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::SetAllowOfflineAccess()
{
CDSString stmt("control query default CAT_PERMIT_OFFLINE_ACCESS 'ON'");
CDMConnection conn;
conn.SetAllowSpecialSyntax(TRUE);
CDMStatement *pStmt = conn.CreateStatement();
pStmt->ExecuteUpdate(stmt, TRUE);
conn.DeleteStatement(pStmt);
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ResetIRCompilerDefaults()
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::ResetIRCompilerDefaults()
{
// LCOV_EXCL_START :rfi
if ((FORCE_CQS & forceFlags_) || (FORCE_USER_CQS & forceFlags_))
{
CDMPreparedStatement *pStat = pRefreshTEDynamicContainer_->
GetPreparedStatement(CQS_CUT_STAT);
ExecuteStatement(*pStat, CQS_CUT_STAT);
}
if (forceFlags_ & FORCE_MV_MDAM ||
forceFlags_ & FORCE_TABLE_MDAM)
{
CDMPreparedStatement *pStat =
pRefreshTEDynamicContainer_->GetPreparedStatement(RESET_MDAM_STAT);
ExecuteStatement(*pStat,RESET_MDAM_STAT);
}
// LCOV_EXCL_STOP
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ApplyCQSForInternalRefresh()
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::ApplyCQSForInternalRefresh()
{
CDMPreparedStatement *pStat = pRefreshTEDynamicContainer_->
GetPreparedStatement(CQS_STAT);
ExecuteStatement(*pStat, CQS_STAT);
CRUGlobals::GetInstance()->GetJournal().LogMessage(
pRefreshTEDynamicContainer_->GetLastSQL(CQS_STAT));
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ApplyUserCQSForInternalRefresh()
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::ApplyUserCQSForInternalRefresh()
{
CDMPreparedStatement *pStat = pRefreshTEDynamicContainer_->
GetPreparedStatement(USER_CQS_FORCE);
ExecuteStatement(*pStat, USER_CQS_FORCE);
CRUGlobals::GetInstance()->GetJournal().LogMessage(
pRefreshTEDynamicContainer_->GetLastSQL(USER_CQS_FORCE));
}
//--------------------------------------------------------------------------//
// CRURefreshTaskExecutor::ExecuteShowExplain()
//
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::ExecuteShowExplain()
{
if (FALSE == (FORCE_SHOW_EXPLAIN & forceFlags_))
{
return;
}
CDMPreparedStatement *pStat = pRefreshTEDynamicContainer_->
GetPreparedStatement(SHOW_EXPLAIN);
ExecuteStatement(*pStat, SHOW_EXPLAIN);
CRUGlobals::GetInstance()->GetJournal().LogMessage(
pRefreshTEDynamicContainer_->GetLastSQL(SHOW_EXPLAIN));
}
//--------------------------------------------------------------------------//
// Called from the PROLOGUE state for remotely executed tasks.
// Returns FALSE if the main process lost its locks.
//--------------------------------------------------------------------------//
NABoolean CRURefreshTaskExecutor::StartTableLockProtocol()
{
RUASSERT(NULL != tableLockProtocol_);
if ( TRUE == tableLockProtocol_->IsEnsureUsedTableLockContinuityNeeded() &&
TRUE == IsInTaskProcess())
{
if (FALSE == tableLockProtocol_->EnsureUsedTableLockContinuity() )
{
// The main process lost his locks , so it must
// have fallen. We must exit with out doing anything
#ifdef _DEBUG
cout << "The main process lost his locks , so it must have died.\n";
#endif
return FALSE;
}
}
return TRUE;
}
//--------------------------------------------------------------------------//
// Called from the REMOTE_END state for remotely executed tasks.
//--------------------------------------------------------------------------//
void CRURefreshTaskExecutor::EndTableLockProtocol()
{
RUASSERT(NULL != tableLockProtocol_);
if ( TRUE == tableLockProtocol_->IsEnsureUsedTableLockContinuityNeeded() &&
TRUE == IsInTaskProcess())
{
tableLockProtocol_->UnLockUsedTablePartitions();
}
}
/*
******************************************************************************
*** Class CRUTableLockProtocol
******************************************************************************
*/
//--------------------------------------------------------------------------//
// CRUTableLockProtocol::Init()
//
//--------------------------------------------------------------------------//
void CRUTableLockProtocol::Init(CRUTblList &tblList,
CRUDeltaDefList &DeltaDefList)
{
CRUDeltaDef *pDdef = NULL;
CRUTbl *curTable = NULL;
// For each table in the used table list, find the
// corresponding delta def.
DSListPosition tPos = tblList.GetHeadPosition();
while (tPos != NULL)
{
curTable = tblList.GetNext(tPos);
DSListPosition dPos = DeltaDefList.GetHeadPosition();
while (dPos != NULL)
{
pDdef = DeltaDefList.GetNext(dPos);
// Match each table to its delta def by table UID.
if (curTable->GetUID() == pDdef->tblUid_)
{
// We have a match!
// We need this lock only if the range log has any data in it.
if (TRUE == pDdef->isRangeLogNonEmpty_)
{
if (LocksList_ == NULL)
{
// This is the first match we have found - alloc the list.
LocksList_ = new LocksList();
NothingToLock_ = FALSE;
}
// Alloc a new lock object, init it and add it to the list.
CRUSingleTableLockProtocol *thisLock = new CRUSingleTableLockProtocol();
thisLock->Init(curTable, pDdef->toEpoch_);
LocksList_->AddTail(thisLock);
}
break; // Done with this table.
} // if Matching table UID
} // Loop on DeltaDefList
} // Loop on tblList
}
//--------------------------------------------------------------------------//
// CRUTableLockProtocol::StoreData()
//
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :cnu
void CRUTableLockProtocol::StoreData(CUOFsIpcMessageTranslator &translator)
{
// Store the NothingToLock_ flag
translator.WriteBlock(&NothingToLock_, sizeof(BOOL));
if (NothingToLock_)
return;
// Store the number of elements in the lock list
Lng32 count = LocksList_->GetCount();
translator.WriteBlock(&count, sizeof(Lng32));
// Store the lock elements.
CRUSingleTableLockProtocol *thisLock = NULL;
DSListPosition pos = LocksList_->GetHeadPosition();
while (pos != NULL)
{
thisLock = LocksList_->GetNext(pos);
thisLock->StoreData(translator);
}
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRUTableLockProtocol::LoadData()
//
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :cnu
void CRUTableLockProtocol::LoadData(CUOFsIpcMessageTranslator &translator)
{
// Load the NothingToLock_ flag
translator.ReadBlock(&NothingToLock_, sizeof(BOOL));
if (NothingToLock_)
return;
// Load the number of elements in the lock list
Lng32 count;
translator.ReadBlock(&count, sizeof(Lng32));
// Load the lock elements.
LocksList_ = new LocksList();
for (Lng32 i=0; i<count; i++)
{
// Alloc a new lock object, init it and add it to the list.
CRUSingleTableLockProtocol *thisLock = new CRUSingleTableLockProtocol();
thisLock->LoadData(translator);
LocksList_->AddTail(thisLock);
}
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRUTableLockProtocol::IsEnsureUsedTableLockContinuityNeeded()
//
//--------------------------------------------------------------------------//
BOOL CRUTableLockProtocol::IsEnsureUsedTableLockContinuityNeeded() const
{
if (NothingToLock_)
return FALSE;
// Iterate on the lock elements.
CRUSingleTableLockProtocol *thisLock = NULL;
DSListPosition pos = LocksList_->GetHeadPosition();
while (pos != NULL)
{
thisLock = LocksList_->GetNext(pos);
if (thisLock->IsEnsureUsedTableLockContinuityNeeded())
return TRUE;
}
return FALSE;
}
//--------------------------------------------------------------------------//
// CRUTableLockProtocol::EnsureUsedTableLockContinuity()
//
// called by work() in the remote process
//
// The function returns TRUE is all is well.
// It returns FALSE if it detected new rows in the log file, which means
// the main process has died.
//--------------------------------------------------------------------------//
BOOL CRUTableLockProtocol::EnsureUsedTableLockContinuity()
{
if (NothingToLock_)
return TRUE;
// Iterate on the lock elements.
CRUSingleTableLockProtocol *thisLock = NULL;
DSListPosition pos = LocksList_->GetHeadPosition();
while (pos != NULL)
{
thisLock = LocksList_->GetNext(pos);
if (thisLock->EnsureUsedTableLockContinuity() == FALSE)
return FALSE;
}
return TRUE;
}
//--------------------------------------------------------------------------//
// CRUTableLockProtocol::UnLockUsedTablePartitions()
//
//--------------------------------------------------------------------------//
void CRUTableLockProtocol::UnLockUsedTablePartitions()
{
if (NothingToLock_)
return;
// Iterate on the lock elements.
CRUSingleTableLockProtocol *thisLock = NULL;
DSListPosition pos = LocksList_->GetHeadPosition();
while (pos != NULL)
{
thisLock = LocksList_->GetNext(pos);
thisLock->UnLockUsedTablePartitions();
}
}
/*
******************************************************************************
*** Class CRUSingleTableLockProtocol
******************************************************************************
*/
CRUSingleTableLockProtocol::CRUSingleTableLockProtocol() :
pUsedTblFileNameList_(NULL),
pPartitionFileList_(NULL),
pEmpCheck_(NULL),
endEpoch_(0)
{
}
CRUSingleTableLockProtocol::~CRUSingleTableLockProtocol()
{
delete pUsedTblFileNameList_;
delete pPartitionFileList_;
delete pEmpCheck_;
}
//--------------------------------------------------------------------------//
// CRUSingleTableLockProtocol::Init()
//
//--------------------------------------------------------------------------//
void CRUSingleTableLockProtocol::Init(CRUTbl *pTbl,
TInt32 EndEpoch)
{
endEpoch_ = EndEpoch;
pUsedTblFileNameList_ = new CDSStringList();
CopyPartitionFileNames(pTbl);
BuildEmpCheck(pTbl);
}
//--------------------------------------------------------------------------//
// CRUSingleTableLockProtocol::CopyPartitionFileNames()
//
//--------------------------------------------------------------------------//
void CRUSingleTableLockProtocol::CopyPartitionFileNames(CRUTbl *pTbl)
{
pTbl->BuildPartitionFileNamesList();
const CDSStringList &sourceList = pTbl->GetPartitionFileNamesList();
DSListPosition pos = sourceList.GetHeadPosition();
while (NULL != pos)
{
CDSString *pName = new CDSString(*(sourceList.GetNext(pos)));
pUsedTblFileNameList_->AddTail(pName);
}
}
//--------------------------------------------------------------------------//
// CRUSingleTableLockProtocol::EnsureUsedTableLockContinuity()
//
// This function preforms the actions that are needed to ensure the lock
// continuity on the used table.The reason for this actions are explained
// in this class description.
//
// The function returns TRUE is all is well.
// It returns FALSE if it detected new rows in the log file, which means
// the main process has died.
//--------------------------------------------------------------------------//
BOOL CRUSingleTableLockProtocol::EnsureUsedTableLockContinuity()
{
LockUsedTablePartitions();
if ( TRUE == AnyNewRecordsSinceEpochIncrement() )
{
UnLockUsedTablePartitions();
return FALSE;
}
return TRUE;
}
//--------------------------------------------------------------------------//
// CRUSingleTableLockProtocol::BuildEmpCheck()
//--------------------------------------------------------------------------//
void CRUSingleTableLockProtocol::BuildEmpCheck(CRUTbl *pTbl)
{
CRUEmpCheckVector empVector;
empVector.AddEpochForCheck(endEpoch_ + 1);
empVector.Build();
pEmpCheck_ = new CRUEmpCheck(empVector);
pEmpCheck_->ComposeSQL(*pTbl);
}
//--------------------------------------------------------------------------//
// CRUSingleTableLockProtocol::AnyNewRecordsSinceEpochIncrement()
//--------------------------------------------------------------------------//
BOOL CRUSingleTableLockProtocol::AnyNewRecordsSinceEpochIncrement()
{
pEmpCheck_->PerformCheck();
BOOL ret = pEmpCheck_->GetVector().IsDeltaNonEmpty(endEpoch_ + 1);
return ret;
}
//--------------------------------------------------------------------------//
// CRUSingleTableLockProtocol::LockUsedTablePartitions()
//--------------------------------------------------------------------------//
void CRUSingleTableLockProtocol::LockUsedTablePartitions()
{
pPartitionFileList_ = new CUOFsFileList();
DSListPosition pos = pUsedTblFileNameList_->GetHeadPosition();
while (NULL != pos)
{
CDSString &pName = *pUsedTblFileNameList_->GetNext(pos);
CUOFsFile *file = new CUOFsFile();
file->FileOpen(pName,CUOFsFile::eProtected,CUOFsFile::eReadOnly);
pPartitionFileList_->AddTail(file);
}
}
//--------------------------------------------------------------------------//
// CRUSingleTableLockProtocol::UnLockUsedTablePartitions()
//--------------------------------------------------------------------------//
void CRUSingleTableLockProtocol::UnLockUsedTablePartitions()
{
DSListPosition pos = pPartitionFileList_->GetHeadPosition();
while (NULL != pos)
{
CUOFsFile *file = pPartitionFileList_->GetNext(pos);
file->FileClose();
}
}
//--------------------------------------------------------------------------//
//--------------------------------------------------------------------------//
void CRUSingleTableLockProtocol::StoreData(CUOFsIpcMessageTranslator &translator)
{
StoreDataFileNameList(translator);
pEmpCheck_->StoreData(translator);
translator.WriteBlock(&endEpoch_, sizeof(TInt32));
}
//--------------------------------------------------------------------------//
//--------------------------------------------------------------------------//
void CRUSingleTableLockProtocol::LoadData(CUOFsIpcMessageTranslator &translator)
{
LoadDataFileNameList(translator);
pEmpCheck_ = new CRUEmpCheck();
pEmpCheck_->LoadData(translator);
translator.ReadBlock(&endEpoch_,sizeof(TInt32));
}
//--------------------------------------------------------------------------//
// CRUSingleTableLockProtocol::StoreDataFileNameList()
//--------------------------------------------------------------------------//
void CRUSingleTableLockProtocol::StoreDataFileNameList(CUOFsIpcMessageTranslator &translator)
{
Int32 size = pUsedTblFileNameList_->GetCount();
translator.WriteBlock(&size,sizeof(Int32));
DSListPosition pos = pUsedTblFileNameList_->GetHeadPosition();
while (NULL != pos)
{
Int32 stringSize;
CDSString &str = *(pUsedTblFileNameList_->GetNext(pos));
stringSize = str.GetLength() + 1;
translator.WriteBlock(&stringSize, sizeof(Int32));
#pragma nowarn(1506) // warning elimination
translator.WriteBlock(str.c_string(), stringSize);
#pragma warn(1506) // warning elimination
}
}
//--------------------------------------------------------------------------//
// CRUSingleTableLockProtocol::LoadDataFileNameList()
//--------------------------------------------------------------------------//
void CRUSingleTableLockProtocol::
LoadDataFileNameList(CUOFsIpcMessageTranslator &translator)
{
Int32 size;
translator.ReadBlock(&size,sizeof(Int32));
pUsedTblFileNameList_ = new CDSStringList();
for (Int32 i=0;i<size;i++)
{
Int32 stringSize;
const Int32 bufSize = CUOFsSystem::IpcMaxGuardianPathNameLength;
char buf[bufSize];
translator.ReadBlock(&stringSize, sizeof(Int32));
RUASSERT(bufSize > stringSize);
#pragma nowarn(1506) // warning elimination
translator.ReadBlock(buf, stringSize);
#pragma warn(1506) // warning elimination
CDSString *objName = new CDSString(buf);
pUsedTblFileNameList_->AddTail(objName);
}
}