| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // |
| **********************************************************************/ |
| /* -*-C++-*- |
| ****************************************************************************** |
| * |
| * File: RuDupElimTaskExecutor.cpp |
| * Description: Implementation of class CRUDupElimTaskExecutor. |
| * |
| * |
| * Created: 05/25/2000 |
| * Language: C++ |
| * |
| * |
| * |
| ****************************************************************************** |
| */ |
| |
| #include "uofsIpcMessageTranslator.h" |
| |
| #include "RuDupElimTaskExecutor.h" |
| #include "RuDupElimTask.h" |
| #include "RuDupElimSQLComposer.h" |
| #include "RuDupElimLogScanner.h" |
| #include "RuDupElimSingleRowResolv.h" |
| #include "RuDupElimRangeResolv.h" |
| |
| #include "RuTbl.h" |
| #include "RuKeyColumn.h" |
| |
| #include "RuGlobals.h" |
| #include "RuOptions.h" |
| #include "RuJournal.h" |
| |
| //--------------------------------------------------------------------------// |
| // PUBLIC AREA |
| //--------------------------------------------------------------------------// |
| |
| //--------------------------------------------------------------------------// |
| // Constructor and destructor |
| //--------------------------------------------------------------------------// |
| |
| CRUDupElimTaskExecutor::CRUDupElimTaskExecutor(CRUTask *pParentTask) : |
| inherited(pParentTask), |
| globals_(), |
| ctrlStmtContainer_(CRUDupElimConst::NUM_CONTROL_STMTS), |
| txnTimeLimit_(0), |
| phase_(0), |
| // The units |
| pLogScanner_(NULL), |
| pSingleRowResolver_(NULL), |
| pRangeResolver_(NULL) |
| {} |
| |
| CRUDupElimTaskExecutor::~CRUDupElimTaskExecutor() |
| { |
| delete pLogScanner_; |
| |
| delete pSingleRowResolver_; |
| |
| delete pRangeResolver_; |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::GetStatisticsMap() |
| //--------------------------------------------------------------------------// |
| |
| const CRUDeltaStatisticsMap &CRUDupElimTaskExecutor:: |
| GetStatisticsMap() const |
| { |
| RUASSERT (NULL != pLogScanner_); |
| |
| return pLogScanner_->GetStatisticsMap(); |
| } |
| |
| //--------------------------------------------------------------------------// |
| // INITIALIZATION |
| //--------------------------------------------------------------------------// |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::Init() |
| // |
| // Duplicate Elimination always runs under a (number of) transactions, |
| // and demands the log to be audited. This is very important, because |
| // otherwise in the case of failure the log can get out of sync |
| // with the table. The cases that do not comply with this condition |
| // must be blocked by the catalog/pre-runtime check. |
| // |
| // At the Init stage, the following actions are performed: |
| // (1) The globals are initialized. |
| // (2) The execution units are created. |
| // (3) The SQL statements' text is generated, and the units are |
| // configured by it. |
| // (4) The transaction's timeout is computed. |
| // |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::Init() |
| { |
| inherited::Init(); |
| |
| CRUDupElimTask *pParentTask = (CRUDupElimTask*) GetParentTask(); |
| |
| RUASSERT(NULL != pParentTask); |
| |
| if (FALSE == pParentTask->NeedToExecuteDE()) |
| { |
| ResetHasWork(); // Nothing to do |
| return; |
| } |
| |
| InitGlobals(pParentTask); |
| |
| InitUnits(pParentTask); |
| |
| InitSQL(pParentTask); |
| |
| InitTxnTimeLimit(); |
| } |
| |
| //--------------------------------------------------------------------------// |
| // FINITE-STATE MACHINE |
| //--------------------------------------------------------------------------// |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::Work() |
| // |
| // The main FSM switch. |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::Work() |
| { |
| switch(GetState()) |
| { |
| case EX_START: |
| { |
| ResetDECompleteFlag(); // Local |
| break; |
| } |
| case EX_PERFORM_PHASE: |
| { |
| PerformPhase(); // Remote, can happen mutiple times |
| break; |
| } |
| case EX_EPILOGUE: // Local |
| { |
| FinalMetadataUpdate(); |
| break; |
| } |
| default: RUASSERT(FALSE); |
| }; |
| } |
| |
| //--------------------------------------------------------------------------// |
| // INTER-PROCESS COMMUNICATION |
| //--------------------------------------------------------------------------// |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::GetIpcBufferSize() |
| // |
| // Implementation of pure virtual. Estimate the IPC buffer size. The buffer |
| // is allocated *before* the Init() method is applied. |
| // |
| // The requestor/server communication model requires that the same buffer |
| // will serve for both request and reply. In most task executors, |
| // the request data is dominant by far, and the buffer size can be estimated |
| // by a (reasonable) constant. However, the DE task executor returns (in the |
| // reply) the delta statistics. The reply buffer size depends therefore |
| // on the number of statistics' blocks. |
| // |
| //--------------------------------------------------------------------------// |
| |
| Lng32 CRUDupElimTaskExecutor::GetIpcBufferSize() const |
| { |
| Lng32 requestSize = SIZE_OF_REQUEST_PACK_BUFFER; |
| |
| Lng32 replySize = |
| ((CRUDupElimTask*) GetParentTask())->GetDeltaStatisticsBufSize() |
| + |
| FIXED_PART_OF_REPLY_PACK_BUFFER; |
| |
| return (requestSize > replySize) ? requestSize : replySize; |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::LoadRequest() |
| // |
| // REMOTE after receiving MAIN. Read the DE globals and control statements, |
| // build the units and load their SQL text. |
| // |
| //--------------------------------------------------------------------------// |
| // LCOV_EXCL_START :cnu |
| void CRUDupElimTaskExecutor:: |
| LoadRequest(CUOFsIpcMessageTranslator &translator) |
| { |
| inherited::LoadRequest(translator); |
| |
| BOOL flag; |
| |
| translator.ReadBlock(&txnTimeLimit_, sizeof(TInt32)); |
| |
| globals_.LoadData(translator); |
| ctrlStmtContainer_.LoadData(translator); |
| |
| // Build the units and load their SQL text |
| RUASSERT(NULL == pLogScanner_); |
| pLogScanner_ = new CRUDupElimLogScanner(globals_, ctrlStmtContainer_); |
| |
| pLogScanner_->LoadRequest(translator); |
| |
| RUASSERT(NULL == pSingleRowResolver_); |
| translator.ReadBlock(&flag, sizeof(BOOL)); |
| if (TRUE == flag) |
| { |
| pSingleRowResolver_ = |
| new CRUDupElimSingleRowResolver(globals_, ctrlStmtContainer_); |
| pSingleRowResolver_->LoadRequest(translator); |
| } |
| |
| RUASSERT(NULL == pRangeResolver_); |
| translator.ReadBlock(&flag, sizeof(BOOL)); |
| if (TRUE == flag) |
| { |
| pRangeResolver_ = |
| new CRUDupElimRangeResolver(globals_, ctrlStmtContainer_); |
| pRangeResolver_->LoadRequest(translator); |
| } |
| } |
| // LCOV_EXCL_STOP |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::StoreReply() |
| // |
| // REMOTE before sending to MAIN, pack the statistics |
| //--------------------------------------------------------------------------// |
| // LCOV_EXCL_START :cnu |
| void CRUDupElimTaskExecutor:: |
| StoreReply(CUOFsIpcMessageTranslator &translator) |
| { |
| inherited::StoreReply(translator); |
| |
| BOOL flag; |
| |
| translator.WriteBlock(&phase_, sizeof(Int32)); |
| |
| pLogScanner_->StoreReply(translator); |
| |
| if (NULL != pSingleRowResolver_) |
| { |
| flag = TRUE; |
| translator.WriteBlock(&flag, sizeof(BOOL)); |
| pSingleRowResolver_->StoreReply(translator); |
| } |
| else |
| { |
| flag = FALSE; |
| translator.WriteBlock(&flag, sizeof(BOOL)); |
| } |
| |
| if (NULL != pRangeResolver_) |
| { |
| flag = TRUE; |
| translator.WriteBlock(&flag, sizeof(BOOL)); |
| pRangeResolver_->StoreReply(translator); |
| } |
| else |
| { |
| flag = FALSE; |
| translator.WriteBlock(&flag, sizeof(BOOL)); |
| } |
| } |
| // LCOV_EXCL_STOP |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::LoadReply() |
| // |
| // MAIN after receiving from REMOTE, unpack the statistics |
| //--------------------------------------------------------------------------// |
| // LCOV_EXCL_START :cnu |
| void CRUDupElimTaskExecutor:: |
| LoadReply(CUOFsIpcMessageTranslator &translator) |
| { |
| inherited::LoadReply(translator); |
| |
| BOOL flag; |
| |
| translator.ReadBlock(&phase_, sizeof(Int32)); |
| pLogScanner_->LoadReply(translator); |
| |
| translator.ReadBlock(&flag, sizeof(BOOL)); |
| if (TRUE == flag) |
| { |
| RUASSERT(NULL != pSingleRowResolver_); |
| pSingleRowResolver_->LoadReply(translator); |
| } |
| |
| translator.ReadBlock(&flag, sizeof(BOOL)); |
| if (TRUE == flag) |
| { |
| RUASSERT(NULL != pRangeResolver_); |
| pRangeResolver_->LoadReply(translator); |
| } |
| } |
| // LCOV_EXCL_STOP |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::StoreRequest() |
| // |
| // MAIN before sending to REMOTE, pack the DE globals and the SQL text. |
| // |
| //--------------------------------------------------------------------------// |
| // LCOV_EXCL_START :cnu |
| void CRUDupElimTaskExecutor:: |
| StoreRequest(CUOFsIpcMessageTranslator &translator) |
| { |
| inherited::StoreRequest(translator); |
| |
| BOOL flag; |
| |
| translator.WriteBlock(&txnTimeLimit_, sizeof(TInt32)); |
| |
| globals_.StoreData(translator); |
| ctrlStmtContainer_.StoreData(translator); |
| |
| pLogScanner_->StoreRequest(translator); |
| |
| if (NULL != pSingleRowResolver_) |
| { |
| flag = TRUE; |
| translator.WriteBlock(&flag, sizeof(BOOL)); |
| pSingleRowResolver_->StoreRequest(translator); |
| } |
| else |
| { |
| flag = FALSE; |
| translator.WriteBlock(&flag, sizeof(BOOL)); |
| } |
| |
| if (NULL != pRangeResolver_) |
| { |
| flag = TRUE; |
| translator.WriteBlock(&flag, sizeof(BOOL)); |
| pRangeResolver_->StoreRequest(translator); |
| } |
| else |
| { |
| flag = FALSE; |
| translator.WriteBlock(&flag, sizeof(BOOL)); |
| } |
| |
| translator.SetMessageType(CUOFsIpcMessageTranslator::RU_DE_EXECUTOR); |
| } |
| // LCOV_EXCL_STOP |
| |
| //--------------------------------------------------------------------------// |
| // PRIVATE AREA |
| //--------------------------------------------------------------------------// |
| |
| //--------------------------------------------------------------------------// |
| // INITIALIZATION |
| //--------------------------------------------------------------------------// |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::InitGlobals() |
| // |
| // Initialize the globals object (for the future usage by the units). |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::InitGlobals(CRUDupElimTask *pParentTask) |
| { |
| CRUTbl &tbl = pParentTask->GetTable(); |
| |
| BOOL isRangeResolv = pParentTask->IsRangeResolv(); |
| BOOL isSingleRowResolv = pParentTask->IsSingleRowResolv(); |
| |
| RUASSERT(TRUE == isRangeResolv || TRUE == isSingleRowResolv); |
| // Otherwise, the execution can be skipped |
| |
| // The @IGNORE and @UPDATE_BITMAP columns is extracted from the log |
| // only if there is single-row resolution. |
| |
| Lng32 nCtrlColumns = (TRUE == isSingleRowResolv) ? |
| CRUDupElimConst::NUM_IUD_LOG_CONTROL_COLS_EXTEND |
| : |
| CRUDupElimConst::NUM_IUD_LOG_CONTROL_COLS_BASIC; |
| |
| Lng32 updateBmpSize = (TRUE == isSingleRowResolv) ? |
| tbl.GetUpdateBitmapSize() : 0; |
| |
| globals_.Init( |
| isRangeResolv, |
| isSingleRowResolv, |
| updateBmpSize, |
| nCtrlColumns, |
| tbl.GetKeyColumnList().GetCount(), |
| tbl.GetLastDupElimEpoch(), |
| pParentTask->GetBeginEpoch(), |
| pParentTask->GetEndEpoch(), |
| tbl.GetRangeLogType(), |
| tbl.IsLastDEComplete(), |
| pParentTask->IsSkipCrossTypeResoultion() |
| ); |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::InitUnits() |
| // |
| // Create all the necessary units. |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::InitUnits(CRUDupElimTask *pParentTask) |
| { |
| CRUTbl &tbl = pParentTask->GetTable(); |
| CRUEmpCheckVector &vec = tbl.GetEmpCheckVector(); |
| TInt32 beginEpoch = globals_.GetBeginEpoch(); |
| |
| pLogScanner_ = new CRUDupElimLogScanner(globals_, ctrlStmtContainer_); |
| pLogScanner_->SetEmpCheckVector(vec); |
| |
| if (TRUE == globals_.IsRangeResolv()) |
| { |
| pRangeResolver_ = |
| new CRUDupElimRangeResolver(globals_, ctrlStmtContainer_); |
| } |
| |
| // Initialize the single-row resolver only if single-row resolution |
| // is enforced and there are single-row records in the log |
| if (TRUE == globals_.IsSingleRowResolv()) |
| { |
| pSingleRowResolver_ = |
| new CRUDupElimSingleRowResolver(globals_, ctrlStmtContainer_); |
| } |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::InitSQL() |
| // |
| // Generate the SQL text for the query (to compute the log's delta) |
| // and the IUD statements (to update the IUD and the range logs). |
| // Store the SQL in the task executor's units. |
| // |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::InitSQL(CRUDupElimTask *pParentTask) |
| { |
| Int32 i; |
| CRUDupElimSQLComposer comp(pParentTask, globals_); |
| |
| for (i=0; i<CRUDupElimConst::NUM_CONTROL_STMTS; i++) |
| { |
| comp.ComposeControlText(i); |
| #pragma nowarn(1506) // warning elimination |
| ctrlStmtContainer_.SetStatementText(i, comp.GetSQL()); |
| #pragma warn(1506) // warning elimination |
| } |
| |
| for (i=0; i<CRUDupElimConst::NUM_QUERY_STMTS; i++) |
| { |
| comp.ComposeQueryText(i); |
| #pragma nowarn(1506) // warning elimination |
| pLogScanner_->SetStatementText(i, comp.GetSQL()); |
| #pragma warn(1506) // warning elimination |
| } |
| |
| if (NULL != pSingleRowResolver_) |
| { |
| for (i=0; i<CRUDupElimConst::NUM_SINGLE_RESOLV_STMTS; i++) |
| { |
| comp.ComposeSingleRowResolvText(i); |
| #pragma nowarn(1506) // warning elimination |
| pSingleRowResolver_->SetStatementText(i, comp.GetSQL()); |
| #pragma warn(1506) // warning elimination |
| } |
| } |
| |
| if (NULL != pRangeResolver_) |
| { |
| for (i=0; i<CRUDupElimConst::NUM_RNG_RESOLV_STMTS; i++) |
| { |
| comp.ComposeRangeResolvText(i); |
| #pragma nowarn(1506) // warning elimination |
| pRangeResolver_->SetStatementText(i, comp.GetSQL()); |
| #pragma warn(1506) // warning elimination |
| } |
| } |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::InitTxnTimeLimit() |
| // |
| // Compute the maximum duration of a single DE transaction |
| // (1/8 of the system's AUTOABORT timeout value). |
| // |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::InitTxnTimeLimit() |
| { |
| // TEMPORARY CODE! MUST BE REPLACED WHEN THE AUTOABORT |
| // IS SUPPORTED IN UOFS! THIS IS THE DEFAULT AUTOABORT VALUE (2 HOURS). |
| txnTimeLimit_ = 2/*hours*/ * 3600/*seconds in hour*/ / 8/*fraction*/; |
| |
| #ifdef _DEBUG |
| CRUOptions &options = CRUGlobals::GetInstance()->GetOptions(); |
| if (NULL |
| != |
| options.FindDebugOption(CRUGlobals::SHRINK_DE_TXN_TIMELIMIT, "")) |
| { |
| // Override the default timeout value |
| txnTimeLimit_ = 0; |
| } |
| #endif |
| } |
| |
| //--------------------------------------------------------------------------// |
| // FINITE-STATE MACHINE |
| //--------------------------------------------------------------------------// |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::PerformPhase() |
| // |
| // Duplicate elimination is designed to be multi-phase. Each phase executes |
| // in a single transaction. |
| // |
| // A single phase of DE scans a subset of the delta, ordered by the |
| // clustering key value, and eliminates the duplicates within this subset. |
| // The next phase starts from the greatest CK value reached by the previous |
| // one. The current phase is finished (and committed) if the whole delta is |
| // scanned, or the transaction's running time exceeds a certain fraction |
| // of the AUTOABORT system limit. |
| // |
| // Typically, the DE's execution must not take more than one transaction. |
| // |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::PerformPhase() |
| { |
| RUASSERT(GetState() == EX_PERFORM_PHASE && FALSE == IsTransactionOpen()); |
| |
| if (phase_ <= 1) |
| { |
| PrepareSQL(); |
| } |
| |
| BeginTransaction(); |
| InitPhase(); |
| |
| do |
| { |
| // Read the next record from the delta |
| pLogScanner_->Fetch(); |
| |
| // Apply the resolvers |
| Resolve(); |
| } |
| while (FALSE == CanCompletePhase()); |
| |
| pLogScanner_->EndScan(); |
| CommitTransaction(); |
| |
| phase_++; |
| |
| #ifdef _DEBUG |
| char buf[10]; |
| sprintf(buf, "%d", phase_); |
| |
| // Simulate a breakdown upon phase completion |
| TESTPOINT2(CRUGlobals::TESTPOINT151, CDSString(buf)); |
| #endif |
| |
| if (TRUE == pLogScanner_->IsEntireDeltaScanned()) |
| { |
| SetState(EX_EPILOGUE); |
| } |
| else |
| { |
| SetState(EX_PERFORM_PHASE); |
| } |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::ResetDECompleteFlag() |
| // |
| // Set the metadata variable: T.IS_LAST_DE_COMPLETE <-- 'N' |
| // |
| // If Duplicate Elimination fails in the middle after committing a part |
| // of its transactions, its next invocation (that starts with |
| // T.IS_LAST_DE_COMPLETE = 'N') must take this fact into account. |
| // |
| // In this context, the range resolver cannot know for sure which ranges |
| // have been written to the range log in the "new" epochs (i.e., those |
| // greater than T.LAST_DE_EPOCH). See the code of CRURangeResolver for |
| // details/ |
| // |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::ResetDECompleteFlag() |
| { |
| RUASSERT(GetState() == EX_START && FALSE == IsTransactionOpen()); |
| |
| CRUDupElimTask *pTask = (CRUDupElimTask *)GetParentTask(); |
| RUASSERT (NULL != pTask); |
| CRUTbl &tbl = pTask->GetTable(); |
| |
| BeginTransaction(); |
| tbl.SetLastDEComplete(FALSE); |
| tbl.SaveMetadata(); |
| CommitTransaction(); |
| |
| SetState(EX_PERFORM_PHASE); |
| |
| TESTPOINT(CRUGlobals::TESTPOINT150); |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::FinalMetadataUpdate() |
| // |
| // Set the metadata variables: |
| // T.LAST_DE_EPOCH <-- The greatest scanned epoch |
| // T.IS_LAST_DE_COMPLETE <-- 'Y' |
| // |
| // T.LAST_DE_EPOCH is the last epoch in which all the clustering key values |
| // have been processed by the DE algorithm. |
| // |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::FinalMetadataUpdate() |
| { |
| RUASSERT( |
| GetState() == EX_EPILOGUE |
| && |
| FALSE == IsTransactionOpen() |
| ); |
| |
| CRUDupElimTask *pTask = (CRUDupElimTask *)GetParentTask(); |
| RUASSERT (NULL != pTask); |
| |
| CRUTbl &tbl = pTask->GetTable(); |
| |
| BeginTransaction(); |
| tbl.SetLastDEComplete(TRUE); |
| |
| // We only update the last DE epoch if there is a continuity in the DE. |
| if (pTask->GetBeginEpoch() <= globals_.GetLastDEEpoch() + 1) |
| { |
| tbl.SetLastDupElimEpoch(pTask->GetEndEpoch()); |
| } |
| |
| tbl.SaveMetadata(); |
| CommitTransaction(); |
| |
| TESTPOINT(CRUGlobals::TESTPOINT152); |
| |
| #ifdef _DEBUG |
| DumpPerformanceStatistics(); |
| #endif |
| |
| SetState(EX_COMPLETE); |
| } |
| |
| //--------------------------------------------------------------------------// |
| // EXECUTION CORE |
| //--------------------------------------------------------------------------// |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::Resolve() |
| // |
| // Resolve() applies the single-row and range resolvers, respectively, |
| // to perform the duplicate elimination decisions and the corresponding |
| // write operations to the IUD and the range logs. |
| // |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::Resolve() |
| { |
| if (NULL != pRangeResolver_) |
| { |
| pRangeResolver_->Resolve(*pLogScanner_); |
| } |
| |
| if (NULL != pSingleRowResolver_) |
| { |
| // The order is important! The range resolver must work first, |
| // to save work to the single-row resolver. |
| pSingleRowResolver_->Resolve(*pLogScanner_); |
| } |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::InitPhase() |
| // |
| // Reset the units' state (in the phase greater than 0), |
| // and start performing the query. |
| // |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::InitPhase() |
| { |
| // Start measuring the transaction's time |
| StartTimer(); |
| |
| if (phase_ > 0) |
| { |
| pLogScanner_->Reset(); |
| |
| if (NULL != pRangeResolver_) |
| { |
| pRangeResolver_->Reset(); |
| } |
| |
| if (NULL != pSingleRowResolver_) |
| { |
| pSingleRowResolver_->Reset(); |
| } |
| } |
| |
| // Start executing the query ... |
| pLogScanner_->StartScan(phase_); |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::PrepareSQL() |
| // |
| // Prepare the SQL statements required for the next phase. |
| // |
| // The log scanner will not compile the second query (the one with the |
| // lower bound) until it reaches the second phase. |
| // |
| // For the resolvers, however, compiling the IUD statemnts in the lazy |
| // fashion is not applicable. The statement compilation happens in a |
| // different transaction. However, due to the CLI limitations, switching |
| // to a different transaction in the middle of scan corrupts the scan's |
| // context. This is why all the IUD statements must be prepared in advance |
| // (before the first transaction). |
| // |
| // All the compilations are under the CONTROL TABLE * MDAM ON setting, |
| // which allows the MDAM optimization of both IUD and SELECT statements. |
| // |
| //--------------------------------------------------------------------------// |
| |
| void CRUDupElimTaskExecutor::PrepareSQL() |
| { |
| // CONTROL TABLE * MDAM 'ON' |
| CDMPreparedStatement *pStmt = ctrlStmtContainer_.GetPreparedStatement( |
| CRUDupElimConst::FORCE_MDAM_CT |
| ); |
| pStmt->ExecuteUpdate(TRUE/* special syntax*/); |
| pStmt->Close(); |
| |
| switch (phase_) |
| { |
| case 0: |
| { |
| pLogScanner_->PrepareStatement(CRUDupElimConst::PHASE_0_QUERY); |
| |
| if (NULL != pSingleRowResolver_) |
| { |
| pSingleRowResolver_->PrepareSQL(); |
| } |
| |
| if (NULL != pRangeResolver_) |
| { |
| pRangeResolver_->PrepareSQL(); |
| } |
| |
| break; |
| } |
| case 1: |
| { |
| pLogScanner_->PrepareStatement(CRUDupElimConst::PHASE_1_QUERY); |
| break; |
| } |
| |
| default: RUASSERT(FALSE); // Not supposed to be called |
| } |
| |
| // CONTROL TABLE * RESET |
| pStmt = ctrlStmtContainer_.GetPreparedStatement( |
| CRUDupElimConst::RESET_MDAM_CT |
| ); |
| pStmt->ExecuteUpdate(TRUE/* special syntax*/); |
| pStmt->Close(); |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::CanCompletePhase() |
| // |
| // The task executor decides whether to finish the current phase (and |
| // commit the transaction) in collaboration with the execution units. |
| // The phase is over when one of the two condition holds: |
| // (1) End of input - the entire delta is scanned (the common case). |
| // (2) Intermediate cut: |
| // (2.1) Both resolver agree to complete the phase. |
| // (2.2) The execution has taken enough time to justify the commit. |
| // |
| // Note that the order between the checks 2.1 and 2.2 is important, |
| // because the current time check is expensive. That's why it must not |
| // be done too often. |
| // |
| //--------------------------------------------------------------------------// |
| |
| BOOL CRUDupElimTaskExecutor::CanCompletePhase() |
| { |
| if (TRUE == pLogScanner_->IsEntireDeltaScanned()) |
| { |
| return TRUE; // Case #1 |
| } |
| |
| BOOL rangeResolverOK=TRUE, singleRowResolverOK=TRUE; |
| |
| if (NULL != pRangeResolver_) |
| { |
| rangeResolverOK = pRangeResolver_->CanCompletePhase(); |
| } |
| |
| if (NULL != pSingleRowResolver_) |
| { |
| singleRowResolverOK = pSingleRowResolver_->CanCompletePhase(); |
| |
| if (NULL != pRangeResolver_ |
| && |
| TRUE == pRangeResolver_->IsRangeCollectionEmpty()) |
| { |
| // If the range resolver's collection is empty, |
| // it actually does not care. The phase can be completed |
| // if the single-row resolver wants to. |
| rangeResolverOK = TRUE; |
| } |
| } |
| |
| if (FALSE == rangeResolverOK || FALSE == singleRowResolverOK) |
| { |
| return FALSE; // Condition #2.1 violated |
| } |
| |
| // It's okay with the resolvers. Measure the current timestamp. |
| EndTimer(); |
| |
| // Condition #2.2 |
| // The ">=" relation is very important for simulating short |
| // transactions (when the timer duration can take less than 1 second). |
| return (GetTimerDuration() >= txnTimeLimit_) ? TRUE : FALSE; |
| } |
| |
| //--------------------------------------------------------------------------// |
| // CRUDupElimTaskExecutor::DumpPerformanceStatistics() |
| // |
| // Print the number of IUD statements performed by the task. |
| //--------------------------------------------------------------------------// |
| |
| #ifdef _DEBUG |
| // LCOV_EXCL_START :dpm |
| void CRUDupElimTaskExecutor::DumpPerformanceStatistics() |
| { |
| CRUOptions &options = CRUGlobals::GetInstance()->GetOptions(); |
| if (NULL != options.FindDebugOption(CRUGlobals::DUMP_DE_STATISTICS, "")) |
| { |
| char txnStr[10]; |
| sprintf(txnStr, "%d", phase_); |
| |
| CDSString str( |
| "Task " + GetParentTask()->GetTaskName() + |
| " has been completed in "+CDSString(txnStr)+ |
| " transaction(s).\nIUD statements performed by the task:\n"); |
| |
| if (NULL != pSingleRowResolver_) |
| { |
| pSingleRowResolver_->DumpPerformanceStatistics(str); |
| } |
| |
| if (NULL != pRangeResolver_) |
| { |
| pRangeResolver_->DumpPerformanceStatistics(str); |
| } |
| |
| CRUJournal &journal = CRUGlobals::GetInstance()->GetJournal(); |
| journal.LogMessage(str); |
| } |
| } |
| // LCOV_EXCL_STOP |
| #endif |