blob: 70f041ce12098ae909dd7766928d5860d52ec1a2 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: RuSimpleRefreshTaskExecutor.cpp
* Description: Implementation of class CRUSimpleRefreshTaskExecutor.
*
*
* Created: 08/14/2000
* Language: C++
*
*
*
******************************************************************************
*/
#include "RuSimpleRefreshTaskExecutor.h"
#include "RuSimpleRefreshSQLComposer.h"
#include "RuMV.h"
#include "RuTbl.h"
#include "RuGlobals.h"
#include "RuJournal.h"
#include "uofsIpcMessageTranslator.h"
//--------------------------------------------------------------------------//
// Constructor and destructor
//--------------------------------------------------------------------------//
CRUSimpleRefreshTaskExecutor::
CRUSimpleRefreshTaskExecutor(CRURefreshTask *pParentTask) :
inherited(pParentTask),
isSingleDeltaRefresh_(TRUE),
simpleRefreshTEDynamicContainer_(NUM_OF_SQL_STMT),
numOfPhases_(0)
{}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::StoreRequest()
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :cnu
void CRUSimpleRefreshTaskExecutor::
StoreRequest(CUOFsIpcMessageTranslator &translator)
{
inherited::StoreRequest(translator);
// Handle isSingleDeltaRefresh_ data member
translator.WriteBlock(&isSingleDeltaRefresh_,sizeof(BOOL));
// Handle refresh executor sql dynamic container
simpleRefreshTEDynamicContainer_.StoreData(translator);
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::StoreReply()
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :cnu
void CRUSimpleRefreshTaskExecutor::
StoreReply(CUOFsIpcMessageTranslator &translator)
{
inherited::StoreReply(translator);
// Handle numOfPhases_ data member
translator.WriteBlock(&numOfPhases_,sizeof(short));
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::LoadRequest()
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :cnu
void CRUSimpleRefreshTaskExecutor::
LoadRequest(CUOFsIpcMessageTranslator &translator)
{
inherited::LoadRequest(translator);
// Handle isSingleDeltaRefresh_ data member
translator.ReadBlock(&isSingleDeltaRefresh_,sizeof(BOOL));
// Handle refresh executor sql dynamic container
simpleRefreshTEDynamicContainer_.LoadData(translator);
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::LoadReply()
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :cnu
void CRUSimpleRefreshTaskExecutor::
LoadReply(CUOFsIpcMessageTranslator &translator)
{
inherited::LoadReply(translator);
// Handle numOfPhases_ data member
translator.ReadBlock(&numOfPhases_,sizeof(short));
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::Init()
//
// Initialize data members by analyzing the RefreshTask
//--------------------------------------------------------------------------//
void CRUSimpleRefreshTaskExecutor::Init()
{
inherited::Init();
RUASSERT(TRUE == HasWork());
if (FALSE == GetRefreshTask()->NeedToExecuteInternalRefresh())
{
// This is an incremental refresh that does not have any delta
// We do nothing here and the proper treatment is done in the
// derived classes
return;
}
CRURefreshTask *pRefreshTask = GetRefreshTask();
isSingleDeltaRefresh_ = pRefreshTask->IsSingleDeltaRefresh();
ComposeMySql();
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::ComposeMySql()
//--------------------------------------------------------------------------//
void CRUSimpleRefreshTaskExecutor::ComposeMySql()
{
CRUSimpleRefreshSQLComposer myComposer(GetRefreshTask());
BOOL isRecompute = IsRecompute();
if (FALSE == isRecompute)
{
// THE INTERNAL REFRESH (incremental) statement
myComposer.ComposeRefresh();
simpleRefreshTEDynamicContainer_.SetStatementText(
INTERNAL_REFRESH,myComposer.GetSQL());
}
// Always compose the INTERNAL REFRESH RECOMPUTE syntax
// (because we might revert to it during the MIN/MAX refresh)
if (0 != (GetRootMV().GetRefreshPatternMap() & CRUMV::NODELETE))
{
RUASSERT(TRUE == isRecompute);
myComposer.ComposeRecompute(CRUSimpleRefreshSQLComposer::REC_NODELETE);
}
else
{
// We must delete all data from the MV prior to the recomputation
myComposer.ComposeRecompute(CRUSimpleRefreshSQLComposer::REC_DELETE);
}
simpleRefreshTEDynamicContainer_.SetStatementText(
RECOMPUTE,myComposer.GetSQL());
// The LOCK TABLE ... statement
myComposer.ComposeLock(GetRootMVName(),TRUE);
simpleRefreshTEDynamicContainer_.SetStatementText(
RU_LOCK_TABLE,myComposer.GetSQL());
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::HandleSqlError()
//
// Overriding the defualt behavior
//
// If we receive MIN_MAX_RECOMPUTATION_NEEDED sql code,
// it means that we need to perform a full recomputation of this MV
// (an MV with min/max on non insert only tables may require this method).
//
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :rfi
void CRUSimpleRefreshTaskExecutor::HandleSqlError(CDSException &ex,
Lng32 errorCode,
const char *errorArgument)
{
if (ex.GetErrorCode(0) == MIN_MAX_RECOMPUTATION_NEEDED)
{
throw NeedRecomputeException();
}
inherited::HandleSqlError(ex,errorCode,errorArgument);
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::PrepareRecomputeMV()
//
// The base for recomputing an MV
//--------------------------------------------------------------------------//
CDMPreparedStatement *CRUSimpleRefreshTaskExecutor::PrepareRecomputeMV()
{
ApplyIRCompilerDefaults();
CDMPreparedStatement *pStat =
simpleRefreshTEDynamicContainer_.GetPreparedStatement(RECOMPUTE);
ResetIRCompilerDefaults();
ExecuteShowExplain();
return pStat;
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::ExecuteRecomputeMV()
//
// The base for recomputing an MV
//--------------------------------------------------------------------------//
void CRUSimpleRefreshTaskExecutor::ExecuteRecomputeMV(CDMPreparedStatement *pStat)
{
ExecuteStatement(*pStat,IDS_RU_IREFRESH_FAILED);
SetRecompute(TRUE);
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::PrepareSingleDeltaRefresh()
//
// Compile a single-delta INTERNAL REFRESH
// (Requires only a single phase).
//--------------------------------------------------------------------------//
CDMPreparedStatement *CRUSimpleRefreshTaskExecutor::PrepareSingleDeltaRefresh()
{
LOGTIME("Starting to compile single-delta refresh \n");
CDMPreparedStatement *pStat =
simpleRefreshTEDynamicContainer_.GetPreparedStatement(INTERNAL_REFRESH);
return pStat;
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::ExecuteSingleDeltaRefresh()
//
// Execute a single-delta INTERNAL REFRESH
// (Requires only a single phase).
//--------------------------------------------------------------------------//
void CRUSimpleRefreshTaskExecutor::ExecuteSingleDeltaRefresh(CDMPreparedStatement *pStat)
{
LOGTIME("Starting to execute single-delta refresh \n");
ExecuteStatement(*pStat,IDS_RU_IREFRESH_FAILED);
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::PrepareMultiDeltasRefresh()
//
// Compile all phases of multi-delta INTERNAL REFRESH.
//
//--------------------------------------------------------------------------//
CRUSimpleRefreshTaskExecutor::StmtList *CRUSimpleRefreshTaskExecutor::PrepareMultiDeltasRefresh()
{
// Compilation Stage
LOGTIME("Starting to compile multi-delta refresh \n");
CRUSimpleRefreshTaskExecutor::StmtList *stmts = new StmtList();
#ifdef NA_LINUX
Int32 nid = 0;
// get the node id of this process
if (XZFIL_ERR_OK != msg_mon_get_my_info(&nid, NULL, NULL, 0, NULL, NULL, NULL, NULL))
{
CRUException ex;
ex.SetError(IDS_RU_UNEXPECTED);
ex.AddArgument("Call to <msg_mon_get_my_info> to get node id failed.");
throw ex;
}
#endif
// Get the statement before compiling it.
CDMPreparedStatement *pStat = NULL;
BOOL needMorePhases = TRUE;
while (TRUE == needMorePhases)
{
needMorePhases = CompileMultiDeltasIRefresh(pStat);
stmts->AddTail(pStat);
numOfPhases_++;
#ifdef NA_LINUX
// check the RMS shared memory after every CHECK_RMS_EACH_N_STMT statements
if (numOfPhases_%CHECK_RMS_EACH_N_STMT == 0)
{
Int32 heapUsed=0;
Int32 heapTotal=0;
pStat->getRmsHeapStats(nid, heapUsed, heapTotal);
// round up the percentage number
float percentOfRMSHeapUsed = (heapUsed*100.0/heapTotal)+0.5;
// raise an error an bail out if we already used more than the allowed RMS shared memory
if (percentOfRMSHeapUsed > PERCENT_OF_RMS_SMEM_USED)
{
CRUException ex;
ex.SetError(IDS_RU_EXHAUSTED_RMS_SHARED_MEM);
ex.AddArgument((Int32)(percentOfRMSHeapUsed));
ex.AddArgument(numOfPhases_);
throw ex;
}
}
#endif
}
return stmts;
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::ExecuteMultiDeltasRefresh()
//
// Execute all phases of multi-delta INTERNAL REFRESH.
//
//--------------------------------------------------------------------------//
void CRUSimpleRefreshTaskExecutor::ExecuteMultiDeltasRefresh(StmtList *stmts)
{
// Execution stage
LOGTIME("Starting to execute multi-delta refresh \n");
CDMPreparedStatement *pStat = NULL;
DSListPosition pos = stmts->GetHeadPosition();
while (pos != NULL)
{
pStat = stmts->GetNext(pos);
ExecuteStatement(*pStat,IDS_RU_IREFRESH_FAILED);
}
// This is probably a memory leak, but this delete causes a crash
// in the destructor when deleting the statement container.
//delete stmts;
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::CompileMultiDeltasIRefresh()
//
// If we recieve MIN_MAX_RECOMPUTATION_NEEDED sql code ,it means that we
// need to make a full recomputation of this mv ( mv that has min/max on non
// insert only tables may require this method).
// Another case is MULTIDELTA_RECOMPUTATION_NEEDED, which means its a multi-
// delta MJV that is just too big, so instead of risking an optimizer crash,
// we just recompute (which will probably be faster than executing such a
// huge tree anyway).
//
// When more phases are needed we recieve a warning code after the
// compilation was done
//--------------------------------------------------------------------------//
BOOL CRUSimpleRefreshTaskExecutor::
CompileMultiDeltasIRefresh(CDMPreparedStatement *&pStat)
{
simpleRefreshTEDynamicContainer_.SetCompiledTimeParam
(INTERNAL_REFRESH,0,CRUSQLComposer::TInt32ToStr(numOfPhases_));
try {
// The FALSE is to avoid deleting used statements.
// They will be deleted later in the list.
pStat = simpleRefreshTEDynamicContainer_.
GetPreparedStatement(INTERNAL_REFRESH, FALSE);
}
// LCOV_EXCL_START :rfi
catch (CDSException &ex)
{
if (ex.GetErrorCode(0) == MIN_MAX_RECOMPUTATION_NEEDED ||
ex.GetErrorCode(0) == MULTIDELTA_RECOMPUTATION_NEEDED )
{
// In case we need a multi phase refresh and we have min-max
// columns we just do a recomputation instead
throw NeedRecomputeException();
}
ex.SetError(IDS_RU_IREFRESH_FAILED);
ex.AddArgument(simpleRefreshTEDynamicContainer_.
GetLastSQL(INTERNAL_REFRESH));
throw ex; // Re-throw
}
// LCOV_EXCL_STOP
if (TRUE == pStat->HasWarning() )
{
return HasWarningCode(pStat->GetWarningException(), MORE_PHASE_CODE);
}
else
{
return FALSE;
}
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::HasWarningCode()
//
//--------------------------------------------------------------------------//
BOOL CRUSimpleRefreshTaskExecutor::HasWarningCode(CDSException ex,Lng32 errorCode) const
{
for (Int32 i=0;i<ex.GetNumErrors();i++)
{
if (ex.GetErrorCode(i) == errorCode)
{
return TRUE;
}
}
return FALSE;
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::FinalMetadataUpdate()
//
// Refine the parent class's metadata update for a task
// that performed a recompute.
//
// (1) After a successful refresh, the MV's status
// must become INITIALIZED.
// (2) If the MV is ON REQUEST, update the value of
// MV.RECOMPUTE_EPOCH.
//
//--------------------------------------------------------------------------//
void CRUSimpleRefreshTaskExecutor::FinalMetadataUpdate()
{
inherited::FinalMetadataUpdate();
CRURefreshTask *pParentTask = GetRefreshTask();
if (FALSE == IsRecompute())
{
// The recompute can be either expected (by the task)
// or unexpected (the refresh started as incremented,
// but finished by recompute). In both cases, check
// the executor, rather than the parent task.
return;
}
CRUMV &rootMV = GetRootMV();
if (CDDObject::eINITIALIZED != rootMV.GetMVStatus())
{
RUASSERT(CDDObject::eNON_INITIALIZED == rootMV.GetMVStatus() ||
CDDObject::eUNAVAILABLE == rootMV.GetMVStatus());
rootMV.SetMVStatus(CDDObject::eINITIALIZED);
rootMV.SaveMetadata();
}
if (CDDObject::eON_REQUEST == rootMV.GetRefreshType())
{
CRUTbl *pTbl = rootMV.GetTblInterface();
RUASSERT(NULL != pTbl);
// Set MV.RECOMPUTE_EPOCH <-- MV.CURRENT_EPOCH
if (TRUE == rootMV.IsInvolvedTbl())
{
// If the mv is also an involved mv, the epoch was already
// incremented by inherited::FinalMetadataUpdate()
pTbl->SetRecomputeEpoch(pTbl->GetCurrentEpoch() - 1);
}
else
{
pTbl->SetRecomputeEpoch(pTbl->GetCurrentEpoch());
}
pTbl->SaveMetadata();
}
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::LogOpeningMessage()
//--------------------------------------------------------------------------//
void CRUSimpleRefreshTaskExecutor::LogOpeningMessage()
{
CDSString msg;
GenOpeningMessage(msg);
if (TRUE == IsRecompute())
{
msg += RefreshDiags[10];
}
msg += RefreshDiags[11] + CDSString("...\n");
CRUGlobals::GetInstance()->GetJournal().LogMessage(msg);
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::LockMV()
//
// Execute a lock table sql statement on the mv
// We don't need to implement here an unlock statement because except for
// unaudited mv the lock will be removed in the end of the transaction by
// itself
//--------------------------------------------------------------------------//
void CRUSimpleRefreshTaskExecutor::LockMV()
{
CDMPreparedStatement *pStat = simpleRefreshTEDynamicContainer_.
GetPreparedStatement(RU_LOCK_TABLE);
ExecuteStatement(*pStat,IDS_RU_LOCK_TABLE_FAILED);
}
//--------------------------------------------------------------------------//
// CRUSimpleRefreshTaskExecutor::LogClosureMessage()
//--------------------------------------------------------------------------//
void CRUSimpleRefreshTaskExecutor::LogClosureMessage()
{
CDSString msg;
GenClosureMessage(msg); // MV ... has been refreshed
if (TRUE == IsRecompute())
{
msg += RefreshDiags[10]; // (by recompute)
}
else
{
// Phases == 0 means MV was up to date.
if (GetNumOfPhases() > 0)
{
if (IsSingleDeltaRefresh())
{
msg += RefreshDiags[19]; // from a single delta
}
else
{
Lng32 deltas = GetRefreshTask()->GetDeltaDefList().GetCount();
// from ... deltas
msg += RefreshDiags[20];
msg += CRUSQLComposer::TInt32ToStr(deltas);
msg += RefreshDiags[21];
if (GetNumOfPhases() > 1)
{
// (in ... phases)
msg += RefreshDiags[12];
msg += CRUSQLComposer::TInt32ToStr(GetNumOfPhases());
msg += RefreshDiags[13];
}
}
}
}
if (TRUE == GetRefreshTask()->NeedToExecuteInternalRefresh())
{
// in a single transaction
msg += RefreshDiags[11];
}
msg += CDSString(".\n");
CRUGlobals::GetInstance()->GetJournal().LogMessage(msg);
}