blob: 0d90ddb6d81c15c0a30ca251c1e15dc05ccea86d [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: RuExecController.cpp
* Description: Implementation of class CRUExecController
*
*
* Created: 05/07/2000
* Language: C++
*
*
*
******************************************************************************
*/
#include "RuExecController.h"
#include "RuTaskExecutor.h"
#include "RuGlobals.h"
#include "RuOptions.h"
#include "uofsException.h"
// Definition of the static member taskServerName_, which the constructor
// hands to the task process pool. The value depends on the platform macro:
// "tdm_arkutp" under NA_LINUX, "mxutp" under NA_NSK.
// NOTE(review): if neither macro is defined, this file provides no
// definition of the member - confirm the build always sets one of them.
#ifdef NA_LINUX
const char * CRUExecController::taskServerName_ = "tdm_arkutp";
#endif
#ifdef NA_NSK
const char * CRUExecController::taskServerName_ = "mxutp";
#endif
//--------------------------------------------------------------------------//
// Constructor
//--------------------------------------------------------------------------//
// Builds the controller with an empty running-task list and a task process
// pool configured with taskServerName_. useParallelism is stored as-is and
// later decides whether a task step may be shipped to a task process
// (see HandleExecuteTaskStepRqst() and AllocateTaskProcess()).
CRUExecController::CRUExecController(BOOL useParallelism) :
inherited(),
// presumably the list only references the tasks and never deletes them
runningTaskList_(eItemsArentOwned),
processPool_(CRUExecController::taskServerName_,
CUOFsTaskProcess::MXUTP,
CUOFsTaskProcess::OSS,
TRUE), // Launch processes on other CPUs
useParallelism_(useParallelism)
{}
//--------------------------------------------------------------------------//
// CRUExecController::HandleRequest()
//
// The main request switch
//--------------------------------------------------------------------------//
// Dispatches the request to the handler that matches its type.
// Any exception escaping a handler is routed to HandleRequestFailure();
// a non-CDSException error is reported through a fresh CRUException.
void CRUExecController::HandleRequest(CRURuntimeControllerRqst *pRqst)
{
  try
  {
    const CRURuntimeControllerRqst::Type rqstType = pRqst->GetType();

    switch (rqstType)
    {
    case CRURuntimeControllerRqst::START_TASK:
      HandleStartTaskRqst(pRqst);
      break;

    case CRURuntimeControllerRqst::FINISH_TASK:
      HandleFinishTaskRqst(pRqst);
      break;

    case CRURuntimeControllerRqst::EXECUTE_TASK_STEP:
      HandleExecuteTaskStepRqst(pRqst);
      break;

    case CRURuntimeControllerRqst::AWAIT_EVENT:
      HandleAwaitEventRqst();
      break;

    default:
      // No other request type is legal for this controller
      RUASSERT(FALSE);
    }
  }
  catch (CDSException &ex)
  {
    HandleRequestFailure(pRqst, ex);
  }
  // LCOV_EXCL_START :rfi
  catch (...)
  {
    // The error carries no description of its own:
    // report it through a newly created exception object
    CRUException ex;
    HandleRequestFailure(pRqst, ex);
  }
  // LCOV_EXCL_STOP
}
//--------------------------------------------------------------------------//
// INDIVIDUAL REQUEST HANDLERS
//--------------------------------------------------------------------------//
//--------------------------------------------------------------------------//
// CRUExecController::HandleStartTaskRqst()
//
// Start the task's execution
// (1) Build the task's executor and integrate the task
// into the controller's data structures.
// (2) Ignite the task's execution by posting the first
// EXECUTE_TASK_STEP request to myself.
//
//--------------------------------------------------------------------------//
// Marks the task as running, registers it in the running-task list,
// builds its executor and queues the first EXECUTE_TASK_STEP request
// for this controller.
void CRUExecController::HandleStartTaskRqst(CRURuntimeControllerRqst *pRqst)
{
  CRUTask &startedTask = pRqst->GetTask();

  // Register the task with the controller before building the executor
  startedTask.SetRunning(TRUE);
  runningTaskList_.AddTail(&startedTask);

  startedTask.BuildExecutor();

  // Kick off the execution: the first step request goes to myself
  CRURuntimeControllerRqst *pFirstStepRqst =
    new CRURuntimeControllerRqst(
      CRURuntimeControllerRqst::EXECUTE_TASK_STEP,
      &startedTask
    );
  this->PostRequest(pFirstStepRqst);
}
//--------------------------------------------------------------------------//
// CRUExecController::HandleFinishTaskRqst()
//
// Finish the task's execution.
// (1) Remove the task from the controller's data structures.
// (2) Release the task's executor (which can occupy much memory).
// (3) Post the request to the flow controller to process the
// task's finish.
//
//--------------------------------------------------------------------------//
// Unregisters the finished task, frees the task process slot it may have
// used, drops its executor and notifies the peer controller.
void CRUExecController::HandleFinishTaskRqst(CRURuntimeControllerRqst *pRqst)
{
  CRUTask &finishedTask = pRqst->GetTask();
  finishedTask.SetRunning(FALSE);

  // The task must still be registered as running
  DSListPosition taskPos = runningTaskList_.FindTaskPos(finishedTask.GetId());
  RUASSERT(NULL != taskPos);
  runningTaskList_.RemoveAt(taskPos);

  // A remotely executed task gives its process back
  // (no-op for a task that ran in the main process)
  DeAllocateTaskProcess(finishedTask);

  finishedTask.DeleteExecutor();

  // Let the peer controller process the task's completion
  CRURuntimeControllerRqst *pFinishRqst =
    new CRURuntimeControllerRqst(
      CRURuntimeControllerRqst::FINISH_TASK,
      &finishedTask
    );
  GetPeerController()->PostRequest(pFinishRqst);
}
//--------------------------------------------------------------------------//
// CRUExecController::HandleExecuteTaskStepRqst()
//
// Switch between the local and remote execution scenarios.
//--------------------------------------------------------------------------//
// Chooses where the task's next step runs: in a task process when
// parallelism is enabled and the executor reports the step as remotely
// executable, in the main process otherwise.
void CRUExecController::HandleExecuteTaskStepRqst(CRURuntimeControllerRqst *pRqst)
{
  CRUTask &task = pRqst->GetTask();

  // The executor is consulted only when parallelism is enabled
  if (TRUE != useParallelism_
      ||
      TRUE != task.GetExecutor().IsNextStepRemotelyExecutable())
  {
    // Local execution of the next step
    HandleLocalTaskStepExecution(task);
    return;
  }

  // The next step goes to the task process
  HandleRemoteTaskStepExecution(task);
}
//--------------------------------------------------------------------------//
// CRUExecController::HandleAwaitEventRqst()
//
// No work to do so far. Wait until some I/O from a task process happens.
//
// When listening on the communication channel, ignore the transaction
// completion messages that TMF sends when the T-file is open.
//
//--------------------------------------------------------------------------//
void CRUExecController::HandleAwaitEventRqst()
{
Lng32 pid;
for (;;)
{
try
{
pid = processPool_.ReceiveFromAnyProcess(-1); // An indefinite wait
HandleReturnOfRemoteExecutor(pid);
return;
}
// LCOV_EXCL_START :rfi
catch (CUOFsTaskProcessPool::CUOFsUnknownProcessException &e)
{
// If it's the TMF completion message, then try to listen again
CUOFsTransManager &tm =
CRUGlobals::GetInstance()->GetTransactionManager();
if (tm.GetTMFFileNum() != e.GetFileNum())
{
throw e;
}
}
// LCOV_EXCL_STOP
}
}
//--------------------------------------------------------------------------//
// TASK STEP EXECUTION
//--------------------------------------------------------------------------//
//--------------------------------------------------------------------------//
// CRUExecController::HandleLocalTaskStepExecution()
//
// Execute the next step of execution in the main process.
//
// If the step fails, or the task's execution completes,
// post the FINISH_TASK request to myself.
// Otherwise, post the EXECUTE_TASK_STEP request to myself,
// in order to initiate the next step.
//--------------------------------------------------------------------------//
// Runs one step of the task's executor in the main process.
// When the executor is already in the EX_COMPLETE state no work is done
// and a FINISH_TASK request is queued; otherwise one step is executed and
// an EXECUTE_TASK_STEP request is queued for the following one.
// An exception thrown by the step leaves nothing queued here.
void CRUExecController::HandleLocalTaskStepExecution(CRUTask &task)
{
  CRUTaskExecutor &executor = task.GetExecutor();

  if (CRUTaskExecutor::EX_COMPLETE != executor.GetState())
  {
    // Return to the transaction that the previous step
    // may have left unfinished
    executor.SwitchTransContextBack();

    // Perform a single step of the task
    executor.Work();

    // Ask myself for the next step
    this->PostRequest(
      new CRURuntimeControllerRqst(
        CRURuntimeControllerRqst::EXECUTE_TASK_STEP,
        &task
      )
    );
  }
  else
  {
    // Nothing left to execute: ask myself to finish the task
    this->PostRequest(
      new CRURuntimeControllerRqst(
        CRURuntimeControllerRqst::FINISH_TASK,
        &task
      )
    );
  }
}
//--------------------------------------------------------------------------//
// CRUExecController::HandleRemoteTaskStepExecution()
//
// (1) Submit the task executor's context to the task process
// that has been associated with it.
// (2) Post a new scheduling request to the flow controller.
//
//--------------------------------------------------------------------------//
// Ships the task's next step to its task process and then asks the peer
// controller (SCHEDULE request) for more work.
void CRUExecController::HandleRemoteTaskStepExecution(CRUTask &task)
{
  CRUTaskExecutor &executor = task.GetExecutor();

  // Return to the transaction that the previous step
  // may have left unfinished
  executor.SwitchTransContextBack();

  // Pack the executor's context and send it to the task process
  ShipWorkToRemoteProcess(task);

  // Request a new task to perform
  CRURuntimeControllerRqst *pScheduleRqst =
    new CRURuntimeControllerRqst(CRURuntimeControllerRqst::SCHEDULE);
  GetPeerController()->PostRequest(pScheduleRqst);
}
//--------------------------------------------------------------------------//
// CRUExecController::HandleRequestFailure()
//--------------------------------------------------------------------------//
// Reacts to an exception raised while a request was being handled.
//   AWAIT_EVENT                    - stamp IDS_RU_AWAITIO_FAILURE and throw
//   FINISH_TASK                    - throw (re-queueing would loop forever)
//   START_TASK / EXECUTE_TASK_STEP - fail only the task involved
// Any other request type is an internal error.
void CRUExecController::
HandleRequestFailure(CRURuntimeControllerRqst *pRqst, CDSException &ex)
{
  const CRURuntimeControllerRqst::Type failedType = pRqst->GetType();

  if (CRURuntimeControllerRqst::AWAIT_EVENT == failedType)
  {
    // A failure while awaiting an event is severe:
    // the whole utility will fail.
    ex.SetError(IDS_RU_AWAITIO_FAILURE);
    throw ex;
  }

  if (CRURuntimeControllerRqst::FINISH_TASK == failedType)
  {
    // The error handler itself has failed (obviously, a software bug).
    // Queueing FINISH_TASK again would cause an endless loop,
    // so the whole utility will fail.
    throw ex;
  }

  if (CRURuntimeControllerRqst::START_TASK == failedType
      ||
      CRURuntimeControllerRqst::EXECUTE_TASK_STEP == failedType)
  {
    // The failure is confined to a single task
    HandleTaskFailure(pRqst->GetTask(), ex);
    return;
  }

  // No other request type can reach this point
  RUASSERT(FALSE);
}
//--------------------------------------------------------------------------//
// CRUExecController::HandleTaskFailure()
//
// When a task fails:
// (1) Copy the error message (described in the
// exception object) to the task object.
// (2) If the task was in the middle of transaction,
// abort this transaction.
// (3) Post a FINISH_TASK request to myself.
//
//--------------------------------------------------------------------------//
// Records the failure on the task, rolls back the executor's transaction
// and queues a FINISH_TASK request for this controller.
void CRUExecController::HandleTaskFailure(CRUTask &task, CDSException &ex)
{
  // Put the "task execution failed" error, with the task's name, on top
  ex.SetError(IDS_RU_TASK_EX_FAILED);
  ex.AddArgument(task.GetTaskName());

  // Copy the error information into the task's own descriptor
  CDSException &taskError = task.GetErrorDesc();
  taskError = ex;

  // Get rid of the transaction that the task may have left open
  task.GetExecutor().RollbackTransaction();

  // Ask myself to handle the task's completion
  CRURuntimeControllerRqst *pFinishRqst =
    new CRURuntimeControllerRqst(
      CRURuntimeControllerRqst::FINISH_TASK,
      &task
    );
  this->PostRequest(pFinishRqst);
}
//--------------------------------------------------------------------------//
// CRUExecController::ShipWorkToRemoteProcess()
//
// Serialize the task's context and send it to the process
// associated with this task.
//--------------------------------------------------------------------------//
// Sends the serialized executor to the task process associated with it.
// On the first shipment (process id still -1) a task process is
// associated with the task and the executor's buffer is allocated.
void CRUExecController::ShipWorkToRemoteProcess(CRUTask &task)
{
  CRUTaskExecutor &taskEx = task.GetExecutor();

  const bool isFirstShipment = (-1 == taskEx.GetProcessId());
  if (isFirstShipment)
  {
    // Bind the task to a task process (creating one if needed) ...
    AllocateTaskProcess(task);
    // ... and perform the initial buffer allocation
    taskEx.AllocateBuffer();
  }

  // Build the IPC message
  SerializeTaskExecutor(taskEx);

  // Hand the message over to the task's process
  processPool_[taskEx.GetProcessId()].Send(taskEx.GetTranslator());
}
//--------------------------------------------------------------------------//
// CRUExecController::SerializeTaskExecutor()
//
// Store the request in the UOFS buffer. If there is not
// enough memory - try to double the buffer until the
// UOFS message size limit is reached.
//
//--------------------------------------------------------------------------//
// Writes the executor's request into its translator buffer.
// On a buffer overflow the buffer is doubled and the write is retried.
// Once the buffer size has reached CUOFsIpcMessageTranslator::MaxMsgSize
// the overflow exception is stamped with IDS_RU_REMOTE_EX_IMPOSSIBLE
// (plus the parent task's name) and propagated to the caller.
void CRUExecController::SerializeTaskExecutor(CRUTaskExecutor &executor)
{
  for (;;)
  {
    // Fetched anew on every attempt - presumably because
    // ReAllocateBuffer() may replace the translator (confirm).
    CUOFsIpcMessageTranslator *pTranslator = &(executor.GetTranslator());

    try
    {
      // Try to serialize the executor until there is enough room
      pTranslator->StartWrite();
      executor.StoreRequest(*pTranslator);
      pTranslator->EndWrite();
      break;
    }
    // LCOV_EXCL_START :rfi
    catch (CUOFsBufferOverFlowException &ex)
    {
      if (pTranslator->GetBufferSize()
          >=
          CUOFsIpcMessageTranslator::MaxMsgSize)
      {
        // No more room: annotate the exception and propagate it.
        // A bare throw rethrows this very (already annotated) object,
        // avoiding a copy and any slicing of a derived type.
        ex.SetError(IDS_RU_REMOTE_EX_IMPOSSIBLE);
        ex.AddArgument(executor.GetParentTask()->GetTaskName());
        throw;
      }

      // The buffer can be resized up to the maximum
      executor.ReAllocateBuffer(2 /*factor*/);
    }
    // LCOV_EXCL_STOP
  }
}
//--------------------------------------------------------------------------//
// CRUExecController::HandleReturnOfRemoteExecutor()
//
// The message has been received from the task process
// that was associated with the executor.
// Handle the success and failure cases.
//
//--------------------------------------------------------------------------//
// Locates the running task served by process pid and routes the received
// message to the success or the failure handler.
void CRUExecController::HandleReturnOfRemoteExecutor(Lng32 pid)
{
  CRUTask *pOwnerTask = FindRunningTask(pid);
  RUASSERT(NULL != pOwnerTask);

  CUOFsIpcMessageTranslator &reply =
    pOwnerTask->GetExecutor().GetTranslator();

  // Success means: not a system message (the task process did not crash)
  // and not an application error (the task process caught no exception).
  // The message type is examined only for non-system messages.
  if (TRUE != reply.IsSystemMessage()
      &&
      CUOFsIpcMessageTranslator::APPLICATION_ERROR
      !=
      reply.GetMessageType())
  {
    HandleRemoteExecutorSuccess(pOwnerTask, reply);
    return;
  }

  HandleRemoteExecutorFailure(pOwnerTask, pid, reply);
}
//--------------------------------------------------------------------------//
// CRUExecController::HandleRemoteExecutorSuccess()
//--------------------------------------------------------------------------//
// Loads the reply into the task's executor and queues the next
// EXECUTE_TASK_STEP request for this controller.
void CRUExecController::
HandleRemoteExecutorSuccess(CRUTask *pTask,
                            CUOFsIpcMessageTranslator &translator)
{
  // Restore the executor's context from the reply message
  translator.StartRead();
  pTask->GetExecutor().LoadReply(translator);
  translator.EndRead();

  // Ask myself to continue running the task's executor
  this->PostRequest(
    new CRURuntimeControllerRqst(
      CRURuntimeControllerRqst::EXECUTE_TASK_STEP,
      pTask
    )
  );
}
//--------------------------------------------------------------------------//
// CRUExecController::HandleRemoteExecutorFailure()
//--------------------------------------------------------------------------//
// LCOV_EXCL_START :rfi
// Turns a failure reported by (or detected for) a task process into a
// task failure. For an application error the exception data is read from
// the message; for a system message the process pool handles it and no
// further detail is available.
void CRUExecController::
HandleRemoteExecutorFailure(CRUTask *pTask,
                            Lng32 pid,
                            CUOFsIpcMessageTranslator &translator)
{
  CRUException remoteError;

  if (TRUE != translator.IsSystemMessage())
  {
    // The task process is alive and has sent its exception:
    // read the exception object from the message
    translator.StartRead();
    remoteError.LoadData(translator);
    translator.EndRead();
  }
  else
  {
    // The task process has crashed.
    // No valuable information can be extracted.
    processPool_.HandleSystemMessage(translator, pid);
  }

  remoteError.SetError(IDS_RU_REMOTE_EXECUTION_FAILURE);
  HandleTaskFailure(*pTask, remoteError);
}
// LCOV_EXCL_STOP
//--------------------------------------------------------------------------//
// GENERAL-PURPOSE METHODS
//--------------------------------------------------------------------------//
//--------------------------------------------------------------------------//
// CRUExecController::FindRunningTask()
//
// Locate a remotely executed task by pid.
//--------------------------------------------------------------------------//
// Scans the running-task list for the task whose executor is associated
// with process pid.
//
// Returns the matching task, or NULL when no running task is associated
// with pid (including the case of an empty list). Returning NULL on a
// miss matters: the caller asserts on a non-NULL result, and handing back
// an unrelated task would attribute the process's message to it.
CRUTask *CRUExecController::FindRunningTask(Lng32 pid)
{
  DSListPosition pos = runningTaskList_.GetHeadPosition();

  while (NULL != pos)
  {
    CRUTask *pTask = runningTaskList_.GetNext(pos);
    if (pTask->GetExecutor().GetProcessId() == pid)
    {
      return pTask;
    }
  }

  // The list is exhausted without a match
  return NULL;
}
//--------------------------------------------------------------------------//
// CRUExecController::AllocateTaskProcess()
//
// If at least one step of the task will be executed
// in the task process, associate the task with the
// process that it will be executed in.
//
// If the task process does not exist yet - create it.
//
//--------------------------------------------------------------------------//
// Associates the task's executor with a task process and marks that
// process busy. An inactive process from the pool is preferred; a new one
// is initiated when the pool reports none (-1).
void CRUExecController::AllocateTaskProcess(CRUTask &task)
{
  // Task processes are used only when parallelism is enabled
  RUASSERT(TRUE == useParallelism_);

  Lng32 taskProcessId = processPool_.GetInactiveTaskProcessPid();
  if (-1 == taskProcessId)
  {
    // Every existing process is taken: start a new server
    taskProcessId = InitiateTaskProcess();
  }

  task.GetExecutor().SetProcessId(taskProcessId);
  processPool_[taskProcessId].SetBusy(TRUE);
}
//-------------------------------------------------------------------//
// CRUExecController::InitiateTaskProcess()
//
// Activate a new arkutp server and return its process id.
// The handshake protocol includes sending the serialized
// CRUGlobals object to the server process.
//
//-------------------------------------------------------------------//
// Launches a new task process, performs the initial handshake with it and
// returns its process id.
//
// The handshake message (type RU_GLOBALS) carries, in this order:
//   - the process id assigned to the new process,
//   - the length of the parent query id (0 when there is none),
//   - the parent query id itself, only when the length is positive,
//   - the utility's options.
//
// If receiving the reply fails, the process is shut down (without sending
// it a message) and the original exception is propagated unchanged.
Lng32 CRUExecController::InitiateTaskProcess()
{
  Lng32 pid = processPool_.LaunchTaskProcess();

  char buffer[CRUOptions::PACK_BUFFER_SIZE];
  CUOFsIpcMessageTranslator translator(buffer, CRUOptions::PACK_BUFFER_SIZE);

  char *parentQid = CRUGlobals::GetInstance()->getParentQid();
  short len = 0;
  if (parentQid != NULL)
  {
    len = (short)strlen(parentQid);
  }

  // Serialize the utility's command-line options ...
  translator.StartWrite();

  // Tell him his pid
  translator.WriteBlock(&pid, sizeof(Lng32));

  translator.WriteBlock(&len, sizeof(short));
  if (len > 0)
  {
    translator.WriteBlock(parentQid, len);
  }

  CRUGlobals::GetInstance()->GetOptions().StoreData(translator);

  translator.SetMessageType(CUOFsIpcMessageTranslator::RU_GLOBALS);
  translator.EndWrite();

  // Send them to the task process ...
  processPool_[pid].Send(translator);

  // And receive the reply
  try
  {
    processPool_[pid].Receive();
  }
  catch (CUOFsException &)
  {
    // The reply could not be received: give up on this process
    processPool_[pid].Shutdown(FALSE /* don't send a message */);

    // Rethrow the exception being handled itself. Throwing the caught
    // reference by value would copy it as a plain CUOFsException and
    // slice off the state of any derived exception type.
    throw;
  }

  // Verify that the answer was correct
  RUASSERT(
    translator.GetMessageType() == CUOFsIpcMessageTranslator::RU_GLOBALS
  );

  return pid;
}
//-------------------------------------------------------------------//
// CRUExecController::DeAllocateTaskProcess()
//
// De-associate the process from the task that it has been executing.
//-------------------------------------------------------------------//
// Marks the task's process as no longer busy. Nothing happens for a task
// whose executor has no process id (-1), i.e. one that was executed in
// the main process.
void CRUExecController::DeAllocateTaskProcess(CRUTask &task)
{
  const Lng32 taskProcessId = task.GetExecutor().GetProcessId();

  if (-1 != taskProcessId)
  {
    // Make the process available again
    processPool_[taskProcessId].SetBusy(FALSE);
  }
}
//-------------------------------------------------------------------//
// CRUExecController::ShutDownTaskProcesses()
//-------------------------------------------------------------------//
// Delegates to the process pool's ShutdownAllTaskProcesses(); no other
// controller state (e.g. the running-task list) is touched here.
void CRUExecController::ShutDownTaskProcesses()
{
processPool_.ShutdownAllTaskProcesses();
}