blob: bf2d2275dcda6598443090f9295d210def81d835 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
**********************************************************************/
#ifndef _RU_TASK_EXECUTOR_H_
#define _RU_TASK_EXECUTOR_H_
/* -*-C++-*-
******************************************************************************
*
* File: RuTaskExecutor.h
* Description: Definition of class CRUTaskExecutor.
*
* Created: 8/23/1999
* Language: C++
*
*
******************************************************************************
*/
#include "refresh.h"
#include "dsplatform.h"
#include "dsstring.h"
#include "RuException.h"
#include "RuGlobals.h"
class CUOFsIpcMessageTranslator;
//--------------------------------------------------------------------------//
// CRUTaskExecutor
//
// A generic task executor abstract class.
//
// The classes that derive from this class will implement
// the concrete task functionality.
//
// The generic task executor works as a finite-state machine (FSM).
// Its execution starts from the EX_START state, and ends in the
// EX_COMPLETE state. The intermediate states are up to the
// derived classes. A single execution step (i.e., transition
// between states) is handled by the call to the Work() method,
// which should be provided by the derived classes.
//
// The Work() method may throw exceptions when errors happen. I.e.,
// it does not have to provide its own exception handlers. These
// exceptions will be handled at some upper level.
//
// A single task's step can be either executed under a single
// transaction, or under no transaction at all. The executor
// object will keep its transactional context from the previous
// step. The object will be able to switch back to its saved
// context (which is important because the utility handles
// multiple transactions), and also abort the current transaction
// in the case of failure. Task executors employ the UOFS transaction
// manager for supporting the nowaited transaction API.
//
// For most kinds of tasks, the major part of task execution
// can be transferred to the task process. In this case, there
// will be two copies of the executor object: one on side of the
// the main process (arkcmp), and the other on the side of the
// task process.
//
// Execution handlers on the both process's sides will exchange
// the (partially) serialized contexts of the executor objects.
// The communication is a two-way requester-server IPC. The
// serialization/de-serialization methods are:
// StoreRequest()/LoadRequest()/StoreReply()/LoadReply().
//
//--------------------------------------------------------------------------//
class CRUTask;
class CDMPreparedStatement;
class REFRESH_LIB_CLASS CRUTaskExecutor {
public:
// Common finite automata states
CRUTaskExecutor(CRUTask *pParentTask = NULL);
virtual ~CRUTaskExecutor();
//----------------------------------//
// Accessors
//----------------------------------//
public:
CRUTask *GetParentTask() const
{
return pParentTask_;
}
Lng32 GetState() const
{
return state_;
}
// Get the process ID that I associate with
Lng32 GetProcessId() const
{
return processId_;
}
// Do I want to execute at all ? (Be determined in the Init() function)
BOOL HasWork() const
{
return hasWork_;
}
// We time the execution duration in seconds
TInt32 GetTimerDuration() const
{
RUASSERT(startTimer_ <= endTimer_);
return (TInt32)((endTimer_ - startTimer_) / 1000000);
}
public:
CUOFsIpcMessageTranslator &GetTranslator()
{
RUASSERT (NULL != pIpcTranslator_);
return *pIpcTranslator_;
}
//-- FSM states common for all of the task executors
enum {
// Dummy common finite automata states
EX_START = 1,
EX_COMPLETE = 2,
// States that must execute locally are 100..199
MAIN_STATES_START = 100,
// States that can execute remotely are 200+
REMOTE_STATES_START = 200
};
// Can the next step of the task's execution
// be executed remotely (in a task process)?
BOOL IsNextStepRemotelyExecutable() const
{
return state_ >= REMOTE_STATES_START;
}
BOOL IsInTaskProcess() const
{
return NULL == pParentTask_;
}
//----------------------------------//
// Mutators
//----------------------------------//
public:
//-- Execution mechanism - delegated to the derived classes
// The main FSM switch
virtual void Work() = 0;
// Must be called before calling to Work()
// This function must not be overridden and should be only refined
virtual void Init() = 0;
public:
// These functions serialize/de-serialize the executor's context
// for the message communication with the remote server process
// Used in the main process side
inline virtual void StoreRequest(CUOFsIpcMessageTranslator &translator) = 0;
inline virtual void LoadReply(CUOFsIpcMessageTranslator &translator) = 0;
// Used in the remote process side
inline virtual void LoadRequest(CUOFsIpcMessageTranslator &translator) = 0;
inline virtual void StoreReply(CUOFsIpcMessageTranslator &translator) = 0;
public:
void SetState(Lng32 state)
{
state_ = state;
}
// If we want to skip execution this function is called
void ResetHasWork()
{
hasWork_ = FALSE;
SetState(EX_COMPLETE);
}
void SetProcessId(Lng32 pid)
{
processId_ = pid;
}
void StartTimer()
{
startTimer_ = CRUGlobals::GetCurrentTimestamp();
}
void EndTimer()
{
endTimer_ = CRUGlobals::GetCurrentTimestamp();
}
public:
// Initial IPC buffer allocation
void AllocateBuffer();
// Resize the IPC buffer by the constant factor (if it's not big enough)
void ReAllocateBuffer(Int32 factor);
void ExecuteStatement(
CDMPreparedStatement &stmt,
Lng32 errorCode,
const char *errorArgument = NULL,
BOOL needRowCount = FALSE,
BOOL isQuery = FALSE);
public:
//----------------------------------//
// Transactions handling
//----------------------------------//
virtual void BeginTransaction();
virtual void CommitTransaction();
// If the previous step of execution has did not
// complete a transaction - switch back to it
void SwitchTransContextBack();
// A part of error handling.
// If the executor that failed was in a middle
// of a transaction - try to abort this transaction
// (which might have been already aborted by TMF).
void RollbackTransaction();
// Leave the transaction and enter a transaction nil state
// (no current transaction)
void LeaveTransaction();
// Is there an open transaction
BOOL IsTransactionOpen();
// Get the transaction Id
// associated with the next step of the execution
Lng32 GetTransIdx() const
{
return transIdx_;
}
protected:
// A pure virtual.
// How much memory does the executor need for request serialization?
// Each executor will define its own value.
// If overflow happens, the buffer can be resized (up to the
// maximum message size).
virtual Lng32 GetIpcBufferSize() const = 0;
// ExecuteStatement() callee.
// Default behavior for dealing with execution error
virtual void HandleSqlError(CDSException &ex,
Lng32 errorCode,
const char *errorArgument = NULL);
void LoadData(CUOFsIpcMessageTranslator &translator);
void StoreData(CUOFsIpcMessageTranslator &translator);
private:
void SetTransIdx(Lng32 transIdx)
{
transIdx_ = transIdx;
}
void CreateBufferAndTranslator(Int32 bufsize);
private:
CRUTask *pParentTask_;
BOOL hasWork_;
// The IPC buffer and its manager
char *pIpcBuffer_;
CUOFsIpcMessageTranslator *pIpcTranslator_;
TInt64 startTimer_;
TInt64 endTimer_;
// Transaction currently associated with the task
Lng32 transIdx_;
// These are the only data members that travles between processes
// The current execution state
Lng32 state_;
// The process associated with the task
Lng32 processId_;
};
// LCOV_EXCL_START :cnu
void CRUTaskExecutor::StoreRequest(CUOFsIpcMessageTranslator &translator)
{
StoreData(translator);
}
void CRUTaskExecutor::LoadReply(CUOFsIpcMessageTranslator &translator)
{
LoadData(translator);
}
// Used in the remote process side
void CRUTaskExecutor::LoadRequest(CUOFsIpcMessageTranslator &translator)
{
LoadData(translator);
}
void CRUTaskExecutor::StoreReply(CUOFsIpcMessageTranslator &translator)
{
StoreData(translator);
}
// LCOV_EXCL_STOP
#endif