blob: 7171e9bc1e05a33086f33e81d5f27b52736cd87a [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// File: ssmpIpc.h
// Description: Class declaration for SSCP IPC infrastructure
//
// Created: 5/08/2006
**********************************************************************/
#ifndef _SSMPIPC_H_
#define _SSMPIPC_H_
#include "Platform.h"
#include "Ipc.h"
#include "Collections.h"
#include "rts_msg.h"
#include "ComQueue.h"
#include "SqlStats.h"
#include "CancelBroker.h"
class StatsGlobals ;
class StmtStats ;
class HashQueue ;
class SscpClientMsgStream ;
class SsmpNewIncomingConnectionStream;
class ActiveQueryMgr;
class PendingQueryMgr;
class ExStatisticsArea;
typedef struct ServerId
{
char nodeName_[MAX_SEGMENT_NAME_LEN+1];
short cpuNum_;
} ServerId;
/////////////////////////////////////////////////////////////
// class ExSsmpManager
/////////////////////////////////////////////////////////////
class ExSsmpManager
{
public:
ExSsmpManager(IpcEnvironment *env);
~ExSsmpManager();
IpcServer *getSsmpServer(NAHeap *heap, char *nodeName, short cpuNum, ComDiagsArea *&diagsArea);
IpcEnvironment *getIpcEnvironment() { return env_; }
void removeSsmpServer(char *nodeName, short cpuNum);
void cleanupDeletedSsmpServers();
IpcServerClass *getServerClass() { return ssmpServerClass_; }
private:
IpcEnvironment *env_;
IpcServerClass *ssmpServerClass_;
HashQueue *ssmps_;
NAList<IpcServer *> *deletedSsmps_; // list of ssmp servers to be deleted
}; // ExSsmpManager
enum SuspendOrActivate {
SUSPEND,
ACTIVATE };
/////////////////////////////////////////////////////////////
// class SsmpGlobals
/////////////////////////////////////////////////////////////
class SsmpGlobals
{
public:
SsmpGlobals(NAHeap *ssmpheap, IpcEnvironment *ipcEnv,
StatsGlobals *statsGlobals);
~SsmpGlobals();
NAHeap *getHeap() { return heap_; }
StatsGlobals *getStatsGlobals() { return statsGlobals_; }
void releaseOrphanEntries() {}
ULng32 allocateServers();
IpcServer *allocateServer(char *nodeName, short nodeNameLen, short cpuNum);
void allocateServerOnNextRequest(char *nodeName,
short nodeNameLen,
short cpuNum);
ULng32 deAllocateServer(char *nodeName, short nodeNameLen, short cpuNum );
void work();
Int64 getStatsCollectionInterval() { return statsCollectionInterval_; }
Int64 getStatsMergeTimeout() { return statsTimeout_; }
Long &getSemId() { return semId_; }
IpcEnvironment *getIpcEnv() { return ipcEnv_; }
IpcSetOfConnections getRecipients() { return recipients_; }
void addRecipients(SscpClientMsgStream *msgStream);
NAHeap *getStatsHeap() { return statsHeap_; }
Int32 myCpu() { return myCpu_; }
pid_t myPin() { return myPin_; }
NABoolean getForceMerge() { return forceMerge_; }
Lng32 getNumDeallocatedServers() { return deallocatedSscps_->numEntries(); }
inline NABoolean doingGC () { return doingGC_;}
inline void setDoingGC(NABoolean value) { doingGC_ = value; }
inline Lng32 getNumPendingSscpMessages() { return pendingSscpMessages_->numEntries(); }
inline void finishPendingSscpMessages();
inline void addPendingSscpMessage(SscpClientMsgStream *sscpClientMsgStream)
{
pendingSscpMessages_->insert(sscpClientMsgStream, sizeof(sscpClientMsgStream));
}
void removePendingSscpMessage(SscpClientMsgStream *sscpClientMsgStream);
inline short getStoreSqlLen() { return storeSqlSrcLen_; }
inline short getNumAllocatedServers() {return (short)sscps_->numEntries(); }
inline void incSsmpReqMsg(Int64 msgBytes) { statsGlobals_->incSsmpReqMsg(msgBytes); }
inline void incSsmpReplyMsg(Int64 msgBytes) { statsGlobals_->incSsmpReplyMsg(msgBytes); }
void insertDeallocatedSscp(char *nodeName, short cpuNum);
bool cancelQueryTree(char *queryId, Lng32 queryIdLen,
CancelQueryRequest *request,
ComDiagsArea **diags);
bool cancelQuery(char *queryId, Lng32 queryIdLen,
CancelQueryRequest *request,
ComDiagsArea **diags);
inline ActiveQueryMgr &getActiveQueryMgr() { return activeQueryMgr_; }
inline PendingQueryMgr &getPendingQueryMgr() { return pendingQueryMgr_; }
void cleanupDeletedSscpServers();
bool getQidFromPid( Int32 pid, // IN
Int32 minimumAge, // IN
char *queryId, // OUT
Lng32 &queryIdLen // OUT
);
bool activateFromQid(char *queryId, Lng32 qidLen,
SuspendOrActivate sOrA, // Param is placeholder.
// Someday may handle cancel.
ComDiagsArea *&diags,
bool suspendLogging);
void suspendOrActivate(char *queryId, Lng32 qidLen,
SuspendOrActivate sOrA, bool suspendLogging);
private:
NAHeap *heap_; // pointer to heap for process duration storage
StatsGlobals *statsGlobals_;
IpcEnvironment *ipcEnv_;
IpcServerClass *sscpServerClass_;
HashQueue *sscps_;
NAList<IpcServer *> *deletedSscps_; // list of sscp servers to be deleted
Int64 statsCollectionInterval_;
Int64 statsTimeout_;
IpcSetOfConnections recipients_;
Long semId_;
Int32 myCpu_;
pid_t myPin_;
NAHeap *statsHeap_; // Heap to store merged stats
Queue *deallocatedSscps_;
NABoolean forceMerge_;
NABoolean doingGC_;
Queue *pendingSscpMessages_;
short storeSqlSrcLen_;
ActiveQueryMgr activeQueryMgr_;
PendingQueryMgr pendingQueryMgr_;
}; // SsmpGlobals
class SsmpGuaReceiveControlConnection : public GuaReceiveControlConnection
{
public:
SsmpGuaReceiveControlConnection(
IpcEnvironment *env,
SsmpGlobals *ssmpGlobals,
short receiveDepth = 256) :
GuaReceiveControlConnection(env,
receiveDepth)
{ ssmpGlobals_ = ssmpGlobals; }
virtual void actOnSystemMessage(
short messageNum,
IpcMessageBufferPtr sysMsg,
IpcMessageObjSize sysMsgLen,
short clientFileNumber,
const GuaProcessHandle &clientPhandle,
GuaConnectionToClient *connection);
SsmpGlobals *getSsmpGlobals() { return ssmpGlobals_; }
private:
SsmpGlobals *ssmpGlobals_;
}; // SsmpGuaReceiveControlConnection
// -----------------------------------------------------------------------
// An object that holds a new connection, created by a Guardian open
// system message, until the first application message comes in
// -----------------------------------------------------------------------
class SsmpNewIncomingConnectionStream : public IpcMessageStream
{
public:
SsmpNewIncomingConnectionStream(NAHeap *heap, IpcEnvironment *ipcEnv,
SsmpGlobals *ssmpGlobals)
: IpcMessageStream(ipcEnv,
IPC_MSG_SSMP_REPLY,
CurrSsmpReplyMessageVersion,
#ifndef USE_SB_NEW_RI
RTS_STATS_MSG_BUF_SIZE,
#else
ipcEnv->getGuaMaxMsgIOSize(),
#endif
TRUE)
, sscpDiagsArea_(NULL)
{
ipcEnv_ = ipcEnv;
ssmpGlobals_ = ssmpGlobals;
heap_ = heap;
handle_ = INVALID_RTS_HANDLE;
wmsProcess_ = FALSE;
}
~SsmpNewIncomingConnectionStream();
virtual void actOnSend(IpcConnection *connection);
virtual void actOnSendAllComplete();
virtual void actOnReceive(IpcConnection *connection);
virtual void actOnReceiveAllComplete();
SsmpGlobals *getSsmpGlobals() { return ssmpGlobals_; }
NAHeap *getHeap() { return heap_; }
IpcEnvironment *getIpcEnv() { return ipcEnv_; }
void actOnStatsReq(IpcConnection *connection);
void actOnCpuStatsReq(IpcConnection *connection);
void actOnExplainReq(IpcConnection *connection);
void actOnQueryStartedReq(IpcConnection *connection);
void actOnQueryFinishedReq(IpcConnection *connection);
void actOnCancelQueryReq(IpcConnection *connection);
void actOnSuspendQueryReq(IpcConnection *connection);
void actOnActivateQueryReq(IpcConnection *connection);
void actOnSecInvalidKeyReq(IpcConnection *connection);
void actOnLobLockReq(IpcConnection *connection);
void getProcessStats(short reqType,
short subReqType,
pid_t pid);
void getMergedStats(RtsStatsReq *request,
RtsQueryId *queryId,
StmtStats *stmtStats,
short reqType,
UInt16 statsMergeType);
void sendMergedStats(ExStatisticsArea *mergedStats, short numErrors, short reqType,
StmtStats *stmtStats, NABoolean updateMergeStats);
void sscpIpcError(IpcConnection *conn);
ComDiagsArea *getSscpDiagsArea() { return sscpDiagsArea_; }
void clearSscpDiagsArea() { sscpDiagsArea_->decrRefCount();
sscpDiagsArea_ = NULL; }
inline RtsHandle getHandle() { return handle_; }
inline void setHandle(const RtsHandle h) { handle_ = h; }
inline NABoolean isWmsProcess() { return wmsProcess_; }
inline void setWmsProcess(NABoolean flag) { wmsProcess_ = flag; }
private:
NAHeap *heap_;
IpcEnvironment *ipcEnv_;
SsmpGlobals *ssmpGlobals_;
ComDiagsArea *sscpDiagsArea_;
RtsHandle handle_;
NABoolean wmsProcess_;
}; // SsmpNewIncomingConnectionStream
// -----------------------------------------------------------------------
// The message stream used by the send top node to exchange data with
// the send bottom node via an Ipc connection
// -----------------------------------------------------------------------
class SscpClientMsgStream : public IpcMessageStream
{
public:
// constructor
SscpClientMsgStream(NAHeap *heap, IpcEnvironment *ipcEnv, SsmpGlobals *ssmpGlobals,
SsmpNewIncomingConnectionStream *ssmpStream)
: IpcMessageStream(ipcEnv,
IPC_MSG_SSCP_REQUEST,
CurrSscpRequestMessageVersion,
#ifndef USE_SB_NEW_RI
RTS_STATS_MSG_BUF_SIZE,
#else
ipcEnv->getGuaMaxMsgIOSize(),
#endif
TRUE), // Share the objects,
heap_(heap)
{
ssmpGlobals_ = ssmpGlobals;
mergedStats_ = NULL;
mergeStartTime_ = 0;
numOfClientRequestsSent_ = 0;
numOfErrorRequests_ = 0;
replySent_ = FALSE;
ssmpStream_ = ssmpStream;
numSqlProcs_ = 0;
numCpus_ = 0;
stmtStats_ = NULL;
detailLevel_ = 0;
completionProcessing_ = STATS;
subReqType_ = -1;
}
~SscpClientMsgStream();
// method called upon send complete
virtual void actOnSendAllComplete();
// method called upon receive complete
virtual void actOnReceive(IpcConnection* connection);
virtual void actOnReceiveAllComplete();
void actOnStatsReply(IpcConnection *connection);
void delinkConnection(IpcConnection *conn);
NAHeap *getHeap()
{ return heap_; }
inline Int64 getMergeStartTime() { return mergeStartTime_; }
inline void setMergeStartTime(Int64 startTime) { mergeStartTime_ = startTime; }
ExStatisticsArea *getMergedStats() { return mergedStats_; }
void incNumOfClientRequestsSent() { numOfClientRequestsSent_++; }
Lng32 getNumOfClientRequestsPending() { return numOfClientRequestsSent_; }
Lng32 getNumOfErrorRequests() { return numOfErrorRequests_; }
SsmpGlobals *getSsmpGlobals() { return ssmpGlobals_; }
NABoolean isReplySent() { return replySent_; }
SsmpNewIncomingConnectionStream *getSsmpStream() {return ssmpStream_; }
void setReplySent()
{
replySent_ = TRUE;
mergedStats_ = NULL;
}
void sendMergedStats();
inline short getReqType() { return reqType_; }
inline void setReqType(short reqType) { reqType_ = reqType; }
inline void incNumSqlProcs(short i=1) { numSqlProcs_+= i; }
inline void incNumCpus(short i=1) { numCpus_ += i; }
inline short getNumSqlProcs() { return numSqlProcs_; }
inline short getNumCpus() { return numCpus_; }
inline StmtStats *getStmtStats() { return stmtStats_; }
inline void setStmtStats(StmtStats *stmtStats)
{ stmtStats_ = stmtStats; }
inline void setDetailLevel(short level)
{ detailLevel_ = level; }
inline short getDetailLevel() { return detailLevel_; }
inline void setUsedToSendCbMsgs() { completionProcessing_ = CB; }
inline void setUsedToSendSikMsgs() { completionProcessing_ = SIK; }
inline void setUsedToSendLLMsgs() { completionProcessing_ = LL; }
void replySik();
void replyLL();
inline short getSubReqType() { return subReqType_; }
inline void setSubReqType(short subReqType) { subReqType_ = subReqType; }
private:
NAHeap *heap_;
ExStatisticsArea *mergedStats_;
Int64 mergeStartTime_;
Lng32 numOfClientRequestsSent_;
SsmpGlobals *ssmpGlobals_;
Lng32 numOfErrorRequests_;
NABoolean replySent_;
SsmpNewIncomingConnectionStream *ssmpStream_;
short reqType_;
short numSqlProcs_;
short numCpus_;
StmtStats *stmtStats_;
enum { STATS, CB, SIK,LL} completionProcessing_;
short detailLevel_;
short subReqType_;
};
// -----------------------------------------------------------------------
// The message stream used by the collector in ExStatsTcb or ExExplainTcb
// via an Ipc connection
// -----------------------------------------------------------------------
class SsmpClientMsgStream : public IpcMessageStream
{
public:
// constructor
SsmpClientMsgStream(NAHeap *heap, ExSsmpManager *ssmpManager,
ComDiagsArea *diagsForClient = NULL)
: IpcMessageStream(ssmpManager->getIpcEnvironment(),
IPC_MSG_SSMP_REQUEST,
CurrSsmpRequestMessageVersion,
#ifndef USE_SB_NEW_RI
RTS_STATS_MSG_BUF_SIZE,
#else
ssmpManager->getIpcEnvironment()->getGuaMaxMsgIOSize(),
#endif
TRUE),
heap_(heap)
, ssmpManager_(ssmpManager)
, diagsForClient_(diagsForClient)
{
stats_ = NULL;
replyRecvd_ = FALSE;
rtsQueryId_ = NULL;
numSscpReqFailed_ = 0;
explainFrag_ = NULL;
}
// method called upon send complete
virtual void actOnSend(IpcConnection* connection);
virtual void actOnSendAllComplete();
// method called upon receive complete
virtual void actOnReceive(IpcConnection* connection);
virtual void actOnReceiveAllComplete();
virtual void delinkConnection(IpcConnection *);
void actOnStatsReply(IpcConnection* connection);
void actOnExplainReply(IpcConnection *connection);
void actOnGenericReply();
ExStatisticsArea *getStats() { return stats_; };
NAHeap *getHeap() { return heap_; }
NABoolean isReplyReceived() { return replyRecvd_; }
RtsQueryId *getRtsQueryId() { return rtsQueryId_; }
short getNumSscpReqFailed() { return numSscpReqFailed_; }
RtsExplainFrag *getExplainFrag() { return explainFrag_; }
private:
NAHeap *heap_;
ExStatisticsArea *stats_;
ExSsmpManager *ssmpManager_;
ComDiagsArea *diagsForClient_; // non-null if client wants ipc diags.
NABoolean replyRecvd_;
RtsQueryId *rtsQueryId_;
short numSscpReqFailed_;
RtsExplainFrag *explainFrag_;
};
#endif // _SSMPIPC_H_