blob: d2d16d960e105f3d2eda25788e269cb2836db6aa [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: RuFlowController.cpp
* Description: Implementation of class CRUFlowController
*
*
* Created: 05/07/2000
* Language: C++
*
*
*
******************************************************************************
*/
#include "RuFlowController.h"
#include "RuGlobals.h"
#include "RuJournal.h"
#include "RuDependenceGraph.h"
#include "RuRefreshTask.h"
//--------------------------------------------------------------------------//
// Constructor
//--------------------------------------------------------------------------//
CRUFlowController::
CRUFlowController(CRUDependenceGraph &dg,
TInt32 maxParallelism) :
inherited(),
dg_(dg),
maxParallelism_(maxParallelism),
nRunningTasks_(0),
didTaskFailuresHappen_(FALSE)
{}
//--------------------------------------------------------------------------//
// CRUFlowController::HandleRequest()
//
// The main request switch
//--------------------------------------------------------------------------//
void CRUFlowController::HandleRequest(CRURuntimeControllerRqst *pRqst)
{
switch (pRqst->GetType())
{
case CRURuntimeControllerRqst::SCHEDULE:
{
HandleScheduleRqst();
break;
}
case CRURuntimeControllerRqst::FINISH_TASK:
{
HandleFinishTaskRqst(pRqst);
break;
}
default: RUASSERT(FALSE); // illegal request
}
}
//--------------------------------------------------------------------------//
// TASK SCHEDULING
//--------------------------------------------------------------------------//
//--------------------------------------------------------------------------//
// CRUFlowController::HandleScheduleRqst()
//
// Process the SCHEDULE request.
//
// The first request is originated by the utility,
// in order to ignite the execution.
//
// The following ones can be initiated either by the flow
// controller itself (upon task completion), or by the peer
// controller (when there is room for additional parallelism).
//
//--------------------------------------------------------------------------//
void CRUFlowController::HandleScheduleRqst()
{
CRUTask *pScheduledTask = NULL;
RUASSERT(nRunningTasks_ <= maxParallelism_);
if (nRunningTasks_ != maxParallelism_)
{
dg_.Schedule(); // Find a ready task to run
pScheduledTask = dg_.GetScheduledTask();
if (NULL != pScheduledTask)
{
// Initiate the scheduled task's execution ...
nRunningTasks_++;
RouteScheduledTask(pScheduledTask);
return;
}
}
// Cannot schedule a new task (for whatever reason).
// If some tasks are still being executed, await their completion.
if (nRunningTasks_ > 0)
{
GetPeerController()->PostRequest(
new CRURuntimeControllerRqst(CRURuntimeControllerRqst::AWAIT_EVENT)
);
return;
}
// All the tasks complete. Do not post new requests.
// Execution over!
RUASSERT(TRUE == dg_.AreAllTasksComplete());
}
//--------------------------------------------------------------------------//
// CRUFlowController::RouteScheduledTask()
//
// If the scheduled task is doomed, jump directly to the completion
// handler. Otherwise, submit it to the peer controller for execution.
//
//--------------------------------------------------------------------------//
void CRUFlowController::RouteScheduledTask(CRUTask *pScheduledTask)
{
CRURuntimeControllerRqst *pRqst = NULL;
if (0 != pScheduledTask->GetStatus())
{
// Something is wrong with this task, don't pass it to the peer
pRqst = new CRURuntimeControllerRqst(
CRURuntimeControllerRqst::FINISH_TASK,
pScheduledTask
);
this->PostRequest(pRqst);
}
else
{
// Post the new request for task execution to the peer
pRqst = new CRURuntimeControllerRqst(
CRURuntimeControllerRqst::START_TASK,
pScheduledTask
);
GetPeerController()->PostRequest(pRqst);
}
}
//--------------------------------------------------------------------------//
// TASK COMPLETION
//--------------------------------------------------------------------------//
//--------------------------------------------------------------------------//
// CRUFlowController::HandleFinishTaskRqst()
//
// Process the FINISH_TASK request.
//
// This request is generated by the peer controller upon task completion,
// or by myself if a doomed task is scheduled.
//
//--------------------------------------------------------------------------//
void CRUFlowController::HandleFinishTaskRqst(CRURuntimeControllerRqst *pRqst)
{
nRunningTasks_--;
CRUTask &task = pRqst->GetTask();
if (0 != task.GetStatus())
{
if (TRUE == NeedToReportTaskFailure(task))
{
// Raise the flag only if some failures were reported.
didTaskFailuresHappen_ = TRUE;
CRUJournal &journal = CRUGlobals::GetInstance()->GetJournal();
journal.LogError(task.GetErrorDesc());
}
// Propagate the error to the neighbors
NotifyTaskEnvironmentOnFailure(task);
}
// Remove the task from the graph, together with the adjacent edges
dg_.Reduce(&task);
// Initiate the next scheduling event to keep things going
this->PostRequest(
new CRURuntimeControllerRqst(CRURuntimeControllerRqst::SCHEDULE)
);
}
//--------------------------------------------------------------------------//
// CRUFlowController::NotifyTaskEnvironmentOnFailure()
//
// When the task completes the execution with failure
// (for whatever reason), it must notify all its successors
// and predecessors about this fact.
//
//--------------------------------------------------------------------------//
void CRUFlowController::NotifyTaskEnvironmentOnFailure(CRUTask &task)
{
RUASSERT (0 != task.GetStatus());
DSListPosition pos;
CRUTaskList &succList = task.GetTasksThatDependOnMe();
CRUTaskList &predList = task.GetTasksThatIDependOn();
// Notify the successors
pos = succList.GetHeadPosition();
while (NULL != pos)
{
CRUTask *pSuccTask = succList.GetNext(pos);
// No task can complete while the successor is being
RUASSERT(FALSE == pSuccTask->IsRunning());
pSuccTask->HandlePredecessorFailure(task);
}
// Notify the predecessors, if any
pos = predList.GetHeadPosition();
while (NULL != pos)
{
CRUTask *pPredTask = predList.GetNext(pos);
if (FALSE == pPredTask->IsRunning())
{
// Backward error propagation.
// The predecessor may be running
// (I received the error from the other predecessor)
pPredTask->HandleSuccessorFailure(task);
}
}
}
//--------------------------------------------------------------------------//
// CRUFlowController::NeedToReportTaskFailure()
//
// Report about two kinds of failed tasks:
// (1) Refresh tasks (which contain the failed MVs' names).
// (2) Other tasks that have caused errors by themselves (i.e., do not
// relay the other tasks' errors).
//
//--------------------------------------------------------------------------//
BOOL CRUFlowController::NeedToReportTaskFailure(CRUTask &task)
{
Lng32 status = task.GetStatus();
RUASSERT(0 != status);
if (CRUTask::REFRESH == task.GetType())
{
return TRUE;
}
status = -status; // SQL errors are negative
if (IDS_RU_TASK_PREDECESSOR_PROBLEM == status
||
IDS_RU_OBSOLETE_PROBLEM == status)
{
return FALSE;
}
return TRUE;
}