blob: e56c7614211dc304405b40e081cd488f40511e52 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// File: ssmpipc.cpp
// Description: Class declaration for SSCP IPC infrastructure
//
// Created: 5/02/2006
**********************************************************************/
#include "Platform.h"
#include "ex_stdh.h"
#include "ssmpipc.h"
#include "ComCextdecs.h"
#include <semaphore.h>
#include "nsk/nskport.h"
#include "zsysc.h"
#include "NAStdlib.h"
#include "Ex_esp_msg.h"
#include "ComQueue.h"
#include "ComRtUtils.h"
#include "ComSqlId.h"
#include "Globals.h"
#include "SqlStats.h"
#include "ex_stdh.h"
#include "ExStats.h"
#include "ComDiags.h"
#include "PortProcessCalls.h"
#include "Statement.h"
#include "ComSqlId.h"
ExSsmpManager::ExSsmpManager(IpcEnvironment *env)
: env_(env)
{
ssmpServerClass_ = new(env->getHeap()) IpcServerClass(
env_,
IPC_SQLSSMP_SERVER,
IPC_USE_PROCESS);
ssmps_ = new (env->getHeap()) HashQueue(env->getHeap(), 16);
deletedSsmps_ = new(env->getHeap()) NAList<IpcServer *>(env->getHeap());
}
ExSsmpManager::~ExSsmpManager()
{
cleanupDeletedSsmpServers();
NADELETE(deletedSsmps_, NAList, env_->getHeap());
ssmps_->position();
NADELETE(ssmps_, HashQueue, env_->getHeap());
if (ssmpServerClass_)
{
NADELETE(ssmpServerClass_, IpcServerClass, env_->getHeap());
}
}
IpcServer *ExSsmpManager::getSsmpServer(NAHeap *heap, char *nodeName, short cpuNum,
ComDiagsArea *&diagsArea)
{
char ssmpProcessName[50];
IpcServer *ssmpServer = NULL;
Int32 processNameLen = 0;
char *tmpProcessName;
tmpProcessName = ssmpServerClass_->getProcessName(nodeName,
(short) str_len(nodeName), cpuNum, ssmpProcessName);
ex_assert(tmpProcessName != NULL, "ProcessName can't be null");
processNameLen = str_len(tmpProcessName);
// Check if we already have this SSMP server
ssmps_->position(tmpProcessName, processNameLen);
ssmpServer = (IpcServer *) ssmps_->getNext();
while (ssmpServer != NULL)
{
if (str_cmp(ssmpServer->castToIpcGuardianServer()->getProcessName(),
tmpProcessName, processNameLen) == 0)
{
GuaConnectionToServer *cbGCTS = ssmpServer->getControlConnection()->castToGuaConnectionToServer();
// We need to keep 2 entries free - To send QueryFinishedMessage and to get the response for query started message
if (cbGCTS->numReceiveCallbacksPending()+2 >= cbGCTS->getNowaitDepth())
{
if (diagsArea == NULL)
diagsArea = ComDiagsArea::allocate(heap);
*diagsArea << DgSqlCode(-2026)
<< DgString0(tmpProcessName)
<< DgInt0(GetCliGlobals()->myCpu())
<< DgInt1(GetCliGlobals()->myPin());
return NULL;
}
return ssmpServer;
}
ssmpServer = (IpcServer *) ssmps_->getNext();
}
// We don't have this SSMP server, so we'll try to allocate one.
ssmpServer = ssmpServerClass_->allocateServerProcess(&diagsArea,
env_->getHeap(),
nodeName,
cpuNum,
IPC_PRIORITY_DONT_CARE,
FALSE);
if (ssmpServer != NULL && ssmpServer->castToIpcGuardianServer()->isReady())
{
tmpProcessName = (char *)ssmpServer->castToIpcGuardianServer()->getProcessName();
ssmps_->insert(tmpProcessName, str_len(tmpProcessName), ssmpServer);
}
else
{
if (ssmpServer != NULL)
{
ssmpServerClass_->freeServerProcess(ssmpServer);
ssmpServer = NULL;
}
}
return ssmpServer;
}
void ExSsmpManager::removeSsmpServer(char *nodeName, short cpuNum)
{
char ssmpProcessName[50];
IpcServer *ssmpServer = NULL;
Int32 processNameLen = 0;
char *tmpProcessName;
tmpProcessName = ssmpServerClass_->getProcessName(nodeName,
(short) str_len(nodeName), cpuNum, ssmpProcessName);
ex_assert(tmpProcessName != NULL, "ProcessName can't be null");
processNameLen = str_len(tmpProcessName);
ssmps_->position(tmpProcessName, processNameLen);
ssmpServer = (IpcServer *)ssmps_->getNext();
while (ssmpServer != NULL)
{
//Only remove the returned ssmpServer if its processName matches the processName
//we passed in.
if (str_cmp(ssmpServer->castToIpcGuardianServer()->getProcessName(),
tmpProcessName, processNameLen) == 0)
{
ssmps_->remove(ssmpServer);
deletedSsmps_->insert(ssmpServer);
break;
}
ssmpServer = (IpcServer *)ssmps_->getNext();
}
}
void ExSsmpManager::cleanupDeletedSsmpServers()
{
IpcServer *ssmp;
while (deletedSsmps_->getFirst(ssmp))
{
ssmpServerClass_->freeServerProcess(ssmp);
}
}
SsmpGlobals::SsmpGlobals(NAHeap *ssmpheap, IpcEnvironment *ipcEnv, StatsGlobals *statsGlobals)
: heap_(ssmpheap),
statsGlobals_(statsGlobals),
ipcEnv_(ipcEnv),
recipients_(ipcEnv_->getAllConnections(),ipcEnv_->getHeap()),
activeQueryMgr_(ipcEnv_, ssmpheap),
pendingQueryMgr_(this, ssmpheap)
{
sscpServerClass_ = NULL;
sscps_ = new (heap_) HashQueue(heap_, 16);
deletedSscps_ = new(heap_) NAList<IpcServer *>(heap_);
#ifdef _DEBUG_RTS
statsCollectionInterval_ = 5 * 60; //in seconds
#else
statsCollectionInterval_ = 30;// in seconds
#endif
char defineName[24+1];
int error;
short mergeInterval, statsTimeout, sqlSrcLen;
//Check to see if the user wants to use a different merge interval (default is 30 seocnds).
//Set this define as follows: ADD DEFINE =_MX_RTS_MERGE_INTERVAL, CLASS DEFAULTS, VOLUME $A.Bnnnnn
//where nnnnn is the interval in seconds.
char *ln_attrValue = getenv("_MX_RTS_MERGE_INTERVAL");
if (ln_attrValue)
{
mergeInterval = atoi(ln_attrValue);
statsCollectionInterval_ = (Int64) MAXOF (mergeInterval, 30);
}
statsTimeout_ = 300; // in Centi-seconds
//Set this define as follows: ADD DEFINE =_MX_RTS_MERGE_TIMEOUT, CLASS DEFAULTS, VOLUME $A.Bnnnnn
//where nnnnn is the max number of queries.
ln_attrValue = getenv("_MX_RTS_MERGE_TIMEOUT");
if (ln_attrValue)
{
statsTimeout = atoi(ln_attrValue);
if (statsTimeout > 1)
statsTimeout_ = statsTimeout;
else
statsTimeout_ = 300;
}
storeSqlSrcLen_ = RMS_STORE_SQL_SOURCE_LEN;
//Set this define as follows: ADD DEFINE =_MX_RTS_SQL_SOURCE_LEN, CLASS DEFAULTS, VOLUME $A.Bnnnnn
//where nnnnn is the length of the source string
ln_attrValue = getenv("_MX_RTS_SQL_SOURCE_LEN");
if (ln_attrValue)
{
sqlSrcLen = atoi(ln_attrValue);
if (sqlSrcLen < 0)
storeSqlSrcLen_ = 0;
else
storeSqlSrcLen_ = sqlSrcLen;
}
CliGlobals *cliGlobals = GetCliGlobals();
char programDir[100];
short processType;
char myNodeName[MAX_SEGMENT_NAME_LEN+1];
Lng32 myNodeNumber;
short myNodeNameLen = MAX_SEGMENT_NAME_LEN;
Int64 myStartTime;
short pri;
char myProcessName[PROCESSNAME_STRING_LEN];
error = statsGlobals_->openStatsSemaphore(semId_);
ex_assert(error == 0, "BINSEM_OPEN returned an error");
if (ComRtGetProgramInfo(programDir, 100, processType,
myCpu_, myPin_,
myNodeNumber, myNodeName, myNodeNameLen, myStartTime, myProcessName))
{
ex_assert(0,"Error in ComRtGetProgramInfo");
}
pri = 0;
error = statsGlobals_->getStatsSemaphore(semId_, myPin_);
NAProcessHandle phandle;
(void)phandle.getmine(statsGlobals->getSsmpProcHandle());
statsGlobals_->setSsmpPid(myPin_);
statsGlobals_->setSsmpPriority(pri);
statsGlobals_->setSsmpTimestamp(myStartTime);
statsGlobals_->setStoreSqlSrcLen(storeSqlSrcLen_);
statsGlobals_->setSsmpProcSemId(semId_);
cliGlobals->setSemId(semId_);
statsHeap_ = (NAHeap *)statsGlobals->getStatsHeap()->allocateHeapMemory(sizeof *statsHeap_);
statsHeap_ = new (statsHeap_, statsGlobals->getStatsHeap())
NAHeap("Process Stats Heap", statsGlobals->getStatsHeap(),
8192,
0);
statsGlobals_->setSscpOpens(0);
statsGlobals_->setSscpDeletedOpens(0);
statsGlobals_->releaseStatsSemaphore(semId_, myPin_);
deallocatedSscps_ = new (heap_) Queue(heap_);
doingGC_ = FALSE;
// Debug code to force merge if the =_MX_SSMP_FORCE_MERGE define
// is specified..
ln_attrValue = getenv("_MX_SSMP_FORCE_MERGE");
if (ln_attrValue)
forceMerge_ = TRUE;
else
forceMerge_ = FALSE;
pendingSscpMessages_ = new (heap_) Queue(heap_);
}
SsmpGlobals::~SsmpGlobals()
{
cleanupDeletedSscpServers();
NADELETE(deletedSscps_, NAList, heap_);
NADELETE(sscps_, HashQueue, heap_);
if (sscpServerClass_ != NULL)
{
NADELETE(sscpServerClass_, IpcServerClass, heap_);
}
sem_close((sem_t *)semId_);
}
ULng32 SsmpGlobals::allocateServers()
{
// Attempt connect to all SSCPs
if (sscpServerClass_ == NULL)
{
Int32 noOfNodes;
Int32 *cpuArray = NULL;
noOfNodes = ComRtGetCPUArray(cpuArray, heap_);
if (noOfNodes == 0)
return 0;
statsGlobals_->setNodesInCluster(noOfNodes);
sscpServerClass_ = new(heap_) IpcServerClass(ipcEnv_, IPC_SQLSSCP_SERVER, IPC_USE_PROCESS);
for (Int32 i = 0 ; i < noOfNodes ; i++)
{
allocateServer(NULL, 0, cpuArray[i]);
}
NADELETEBASIC(cpuArray, heap_);
}
else
{
ServerId *serverId;
IpcServer *server;
deallocatedSscps_->position();
while ((serverId = (ServerId *)deallocatedSscps_->getNext()) != NULL)
{
server = allocateServer(serverId->nodeName_, (short)str_len(serverId->nodeName_),
serverId->cpuNum_);
if (server != NULL)
{
deallocatedSscps_->remove(NULL);
statsGlobals_->setSscpDeletedOpens(getNumDeallocatedServers());
NADELETEBASIC(serverId, heap_);
}
}
}
return sscps_->entries();
}
IpcServer *SsmpGlobals::allocateServer(char *nodeName, short nodeNameLen, short cpuNum)
{
short len;
IpcServer *server;
const char *processName;
ComDiagsArea *diagsArea = NULL;
ServerId serverId;
// No first connection yet
if (sscpServerClass_ == NULL)
return NULL;
serverId.nodeName_[0] = '\0';
serverId.cpuNum_ = cpuNum;
IpcAllocateDiagsArea(diagsArea, heap_);
server = sscpServerClass_->allocateServerProcess(&diagsArea,
heap_,
serverId.nodeName_,
serverId.cpuNum_,
IPC_PRIORITY_DONT_CARE,
1, // espLevel
FALSE);
if (server != NULL && server->castToIpcGuardianServer()->isReady())
{
processName = server->castToIpcGuardianServer()->getProcessName();
sscps_->insert(processName, str_len(processName), server);
statsGlobals_->setSscpOpens(getNumAllocatedServers());
recipients_ += server->getControlConnection()->getId();
}
else
{
if (server != NULL)
{
sscpServerClass_->freeServerProcess(server);
server = NULL;
}
insertDeallocatedSscp(serverId.nodeName_, serverId.cpuNum_);
}
diagsArea->decrRefCount();
return server;
}
void SsmpGlobals::insertDeallocatedSscp(char *nodeName, short cpuNum )
{
ServerId *serverId;
short len = strlen(nodeName);
deallocatedSscps_->position();
while ((serverId = (ServerId *)deallocatedSscps_->getNext()) != NULL)
{
// ServerId already exists in deallocatedSscps_ list and hence don't add again
// But, do delete it if the cpu is Down
if ((str_cmp(serverId->nodeName_, nodeName, len) == 0) && serverId->cpuNum_ == cpuNum)
{
if (!ComRtGetCpuStatus(nodeName, cpuNum))
{
deallocatedSscps_->remove(NULL);
statsGlobals_->setSscpDeletedOpens(getNumDeallocatedServers());
NADELETEBASIC(serverId, heap_);
}
return;
}
}
// If the CPU is DOWN, then we don’t
// want to insert it into deallocateSscps_ until we
// get a CPUUP message later. Because there is no point
// trying to reopen the process until its CPU is up.
if (ComRtGetCpuStatus(nodeName, cpuNum))
{
ServerId *entry = new (heap_) ServerId;
str_cpy_all(entry->nodeName_, nodeName, len);
entry->nodeName_[len] = '\0';
entry->cpuNum_ = cpuNum;
deallocatedSscps_->insert(entry, sizeof(ServerId));
statsGlobals_->setSscpDeletedOpens(getNumDeallocatedServers());
}
}
ULng32 SsmpGlobals::deAllocateServer(char *nodeName, short nodeNameLen, short cpuNum )
{
char sscpProcessName[50];
ServerId serverId;
short len;
IpcServer *sscpServer;
if (sscpServerClass_ == NULL)
return 0;
serverId.nodeName_[0] = '\0';
serverId.cpuNum_ = cpuNum;
char *tmpProcessName;
tmpProcessName = sscpServerClass_->getProcessName(serverId.nodeName_,
(short) str_len(serverId.nodeName_), cpuNum, sscpProcessName);
ex_assert(tmpProcessName != NULL, "ProcessName can't be null");
sscps_->position(tmpProcessName, str_len(tmpProcessName));
while ((sscpServer = (IpcServer *)sscps_->getNext()) != NULL)
{
//Only remove the returned sscpServer if its processName matches the processName
//we passed in.
if (str_cmp(sscpServer->castToIpcGuardianServer()->getProcessName(),
tmpProcessName, str_len(tmpProcessName)) == 0)
break;
}
if (sscpServer != NULL)
{
// Remove the sscpServer if it is in the list, i.e., it has not already
// been removed.
sscps_->remove(sscpServer);
deletedSscps_->insert(sscpServer);
statsGlobals_->setSscpOpens(getNumAllocatedServers());
insertDeallocatedSscp(serverId.nodeName_, serverId.cpuNum_);
}
return sscps_->entries();
}
void SsmpGlobals::cleanupDeletedSscpServers()
{
NAList<IpcServer *> notReadyToCleanup(heap_);
IpcServer *sscp;
while (deletedSscps_->getFirst(sscp))
{
IpcConnection *conn = sscp->getControlConnection();
if (conn->numQueuedSendMessages() ||
conn->numQueuedReceiveMessages() ||
conn->numReceiveCallbacksPending() ||
conn->hasActiveIOs())
{
notReadyToCleanup.insert(sscp);
}
else
{
sscpServerClass_->freeServerProcess(sscp);
}
}
deletedSscps_->insert(notReadyToCleanup);
}
void SsmpGlobals::allocateServerOnNextRequest(char *nodeName,
short nodeNameLen,
short cpuNum)
{
char sscpProcessName[50];
ServerId serverId;
short len;
IpcServer *sscpServer;
if (sscpServerClass_ == NULL)
return;
serverId.nodeName_[0] = '\0';
serverId.cpuNum_ = cpuNum;
// Next, the code will do an integrity check, to make sure that
// this server isn't already allocated. If we fail the test,
// we will pretend that we got a NodeDown and NodeUp message and
// issue an EMS event.
char *tmpProcessName;
tmpProcessName = sscpServerClass_->getProcessName(serverId.nodeName_,
(short) str_len(serverId.nodeName_), cpuNum, sscpProcessName);
ex_assert(tmpProcessName != NULL, "ProcessName can't be null");
sscps_->position(tmpProcessName, str_len(tmpProcessName));
while ((sscpServer = (IpcServer *)sscps_->getNext()) != NULL)
{
if (str_cmp(sscpServer->castToIpcGuardianServer()->getProcessName(),
tmpProcessName, str_len(tmpProcessName)) == 0)
break;
}
if (sscpServer != NULL)
{
// We don't seem to have gotten a Node Down message.
// Pretend we got one and do the Node Down message processing.
// Note that deAllocateServer will execute insertDeallocatedSscp.
deAllocateServer(nodeName, nodeNameLen, cpuNum);
// Issue an EMS event to help track occurrences of this phenomenon
char msg[100];
str_sprintf(msg,"Node UP received before Node DOWN for nid: %d", cpuNum);
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, msg, 0);
}
else
{
// By placing the server into the deallocatedSscps_, we ensure that
// the next time we need to send a request to the server for stats,
// we will first allocate it.
insertDeallocatedSscp(serverId.nodeName_, serverId.cpuNum_);
}
return;
}
static Int64 GcInterval = -1;
static Int64 SikGcInterval = -1;
void SsmpGlobals::work()
{
getIpcEnv()->getAllConnections()->waitOnAll(getStatsMergeTimeout());
finishPendingSscpMessages();
// Cleanup IpcEnvironment
cleanupDeletedSscpServers();
getIpcEnv()->deleteCompletedMessages();
//Perform cancel escalation, as needed.
pendingQueryMgr_.killPendingCanceled();
StatsGlobals *statsGlobals = getStatsGlobals();
statsGlobals->cleanupDanglingSemaphore(TRUE);
// Every time we get here either because we timed out or because a request came in, check whether
// any GC needs to be done to deallocate space for the master executor for any final stats. If
// it has been at least 20 minutes since the last time we did GC, and it has been at least 15
// minutes since a collector has requested stats for a query, and the master stats for that
// query has its canBeGCed flag set to true, we'll clean it up.
Int64 currTime = NA_JulianTimestamp();
Int64 temp2 = (Int64)(currTime - statsGlobals->getLastGCTime());
if (GcInterval < 0)
{
// call getenv once per process
char *sct = getenv("RMS_GC_INTERVAL_SECONDS");
if (sct)
{
GcInterval = ((Int64) str_atoi(sct, str_len(sct))) * 1000 * 1000;
if (GcInterval < 10*1000*1000)
GcInterval = 10*1000*1000;
}
else
GcInterval = 10 * 60 * 1000 * 1000; // 10 minutes
}
if (SikGcInterval < 0)
{
// Note: If you change this logic see also the logic to update
// siKeyGCinterval_ in optimizer/opt.cpp.
// call getenv once per process
char *sct = getenv("RMS_SIK_GC_INTERVAL_SECONDS");
if (sct)
{
SikGcInterval = ((Int64) str_atoi(sct, str_len(sct))) * 1000 * 1000;
if (SikGcInterval < 10*1000*1000)
SikGcInterval = 10*1000*1000;
}
else
SikGcInterval = (Int64)24 * 60 * 60 * 1000 * 1000; // 24 hours
}
if ((temp2 > GcInterval) && !doingGC())
{
// It's been more than 20 minutes since we did a full GC. Do it again now
// and update the time of last GC.
statsGlobals->checkForDeadProcesses(myPin_);
setDoingGC(TRUE);
int error = statsGlobals->getStatsSemaphore(semId_, myPin_);
statsGlobals->doFullGC();
statsGlobals->setLastGCTime(NA_JulianTimestamp());
statsGlobals->cleanupOldSikeys(SikGcInterval);
setDoingGC(FALSE);
statsGlobals->releaseStatsSemaphore(semId_, myPin_);
}
}
void SsmpGlobals::addRecipients(SscpClientMsgStream *msgStream)
{
IpcServer *server;
sscps_->position();
while ((server = (IpcServer *)sscps_->getNext()) != NULL)
{
msgStream->addRecipient(server->getControlConnection());
msgStream->incNumOfClientRequestsSent();
}
}
void SsmpGlobals::finishPendingSscpMessages()
{
SscpClientMsgStream *sscpClientMsgStream;
if (pendingSscpMessages_->numEntries() == 0)
return;
pendingSscpMessages_->position();
Int64 currTimestamp = NA_JulianTimestamp();
while ((sscpClientMsgStream = (SscpClientMsgStream *)pendingSscpMessages_->getNext()) != NULL)
{
if ((currTimestamp - sscpClientMsgStream->getMergeStartTime()) >= (getStatsMergeTimeout() * 1000))
{
sscpClientMsgStream->sendMergedStats();
pendingSscpMessages_->remove(NULL);
}
}
}
void SsmpGlobals::removePendingSscpMessage(SscpClientMsgStream *sscpClientMsgStream)
{
SscpClientMsgStream *lcSscpClientMsgStream;
pendingSscpMessages_->position();
while ((lcSscpClientMsgStream = (SscpClientMsgStream *)pendingSscpMessages_->getNext()) != NULL)
{
if (lcSscpClientMsgStream == sscpClientMsgStream)
{
pendingSscpMessages_->remove(NULL);
break;
}
}
}
bool SsmpGlobals::getQidFromPid( Int32 pid, // IN
Int32 minimumAge, // IN
char *queryId, // OUT
Lng32 &queryIdLen // OUT
)
{
bool foundQid = false;
StatsGlobals *statsGlobals = getStatsGlobals();
short error =
statsGlobals->getStatsSemaphore(getSemId(), myPin());
SyncHashQueue *ssList = statsGlobals->getStmtStatsList();
ssList->position();
StmtStats *ss = (StmtStats *)ssList->getNext();
while (ss != NULL)
{
if (ss->getPid() == pid &&
ss->getMasterStats() &&
(ss->getMasterStats()->timeSinceBlocking(minimumAge) > 0))
{
bool finishedSearch = true;
ExMasterStats *m = ss->getMasterStats();
char *parentQid = m->getParentQid();
Lng32 parentQidLen = m->getParentQidLen();
if (parentQid != NULL)
{
// If this query has a parent in this cpu, will keep looking.
Int64 parentCpu = -1;
ComSqlId::getSqlQueryIdAttr(ComSqlId::SQLQUERYID_CPUNUM,
parentQid, parentQidLen, parentCpu, NULL);
if (parentCpu == statsGlobals->getCpu())
{
finishedSearch = false;
}
else
; // Even tho this query has a parent, it is not
// executing on the same node as this query's
// process, so following the semantics of
// cancel-by-pid, we will not attempt to
// cancel the parent.
}
if (finishedSearch)
{
queryIdLen = ss->getQueryIdLen();
str_cpy_all(queryId, ss->getQueryId(), queryIdLen);
foundQid = true;
break;
}
}
ss = (StmtStats *)ssList->getNext();
}
statsGlobals->releaseStatsSemaphore(getSemId(), myPin());
return foundQid;
}
bool SsmpGlobals::cancelQueryTree(char *queryId, Lng32 queryIdLen,
CancelQueryRequest *request,
ComDiagsArea **diags)
{
bool didCancel = false;
bool hasChildQid = false;
char childQid[ComSqlId::MAX_QUERY_ID_LEN + 1];
Lng32 childQidLen = 0;
StatsGlobals *statsGlobals = getStatsGlobals();
int error =
statsGlobals->getStatsSemaphore(getSemId(), myPin());
StmtStats *cqStmtStats = statsGlobals->getMasterStmtStats(
queryId, queryIdLen,
RtsQueryId::ANY_QUERY_);
ExMasterStats *m = cqStmtStats ? cqStmtStats->getMasterStats(): NULL;
if (m && m->getChildQid())
{
hasChildQid = true;
str_cpy_all(childQid, m->getChildQid(), m->getChildQidLen());
childQid[m->getChildQidLen()] = '\0';
childQidLen = m->getChildQidLen();
}
statsGlobals->releaseStatsSemaphore(getSemId(), myPin());
if (cqStmtStats == NULL)
{
; // race condition
return false;
}
if (hasChildQid)
{
if (request->getCancelLogging())
{
char thisQid[ComSqlId::MAX_QUERY_ID_LEN + 1];
str_cpy_all(thisQid, queryId, queryIdLen);
thisQid[queryIdLen] = '\0';
char msg[120 + // the constant text
ComSqlId::MAX_QUERY_ID_LEN +
ComSqlId::MAX_QUERY_ID_LEN ];
str_sprintf(msg,
"cancelQueryTree for %s , found a child qid %s",
thisQid, childQid);
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, msg, 0);
}
didCancel = cancelQueryTree(childQid,childQidLen, request, diags);
}
if (cancelQuery(queryId, queryIdLen, request, diags))
didCancel = true;
return didCancel;
}
bool SsmpGlobals::cancelQuery(char *queryId, Lng32 queryIdLen,
CancelQueryRequest *request,
ComDiagsArea **diags)
{
bool didAttemptCancel = false;
Int64 cancelStartTime = request->getCancelStartTime ();
Int32 ceFirstInterval = request->getFirstEscalationInterval();
Int32 ceSecondInterval = request->getSecondEscalationInterval();
NABoolean ceSaveabend = request->getCancelEscalationSaveabend();
bool cancelLogging = request->getCancelLogging();
short sqlErrorCode = 0;
const char *sqlErrorDesc = NULL;
StatsGlobals *statsGlobals = getStatsGlobals();
int error;
char tempQid[ComSqlId::MAX_QUERY_ID_LEN+1];
ActiveQueryEntry * aq = (queryId ? getActiveQueryMgr().getActiveQuery(
queryId, queryIdLen) : NULL);
if (aq == NULL)
{
error = statsGlobals->getStatsSemaphore(getSemId(), myPin());
StmtStats *cqStmtStats = statsGlobals->getMasterStmtStats(
queryId, queryIdLen,
RtsQueryId::ANY_QUERY_);
if (cqStmtStats == NULL)
sqlErrorCode = -EXE_CANCEL_QID_NOT_FOUND;
else
{
ExMasterStats * cMasterStats = cqStmtStats->getMasterStats();
if (cMasterStats)
{
Statement::State stmtState = (Statement::State)cMasterStats->getState();
if (stmtState != Statement::OPEN_ &&
stmtState != Statement::FETCH_ &&
stmtState != Statement::STMT_EXECUTE_)
{
sqlErrorCode = -EXE_CANCEL_NOT_POSSIBLE;
sqlErrorDesc = "The query is not in OPEN or FETCH or EXECUTE state";
}
else
{
sqlErrorCode = -EXE_CANCEL_NOT_POSSIBLE;
sqlErrorDesc = "The query is not registered with the cancel broker";
}
}
else
{
sqlErrorCode = -EXE_CANCEL_NOT_POSSIBLE;
sqlErrorDesc = "The query state is not known";
}
}
statsGlobals->releaseStatsSemaphore(getSemId(), myPin());
}
else
if (aq && (aq->getQueryStartTime() <= cancelStartTime))
{
didAttemptCancel = true;
// Make sure query is activated first. If it is already activated,
// some error conditions will be raised. Ignore these.
ComDiagsArea *unimportantDiags = NULL;
activateFromQid(queryId, queryIdLen,
ACTIVATE, unimportantDiags, cancelLogging);
if (unimportantDiags)
unimportantDiags->decrRefCount();
StatsGlobals *statsGlobals = getStatsGlobals();
int error = statsGlobals->getStatsSemaphore(getSemId(), myPin());
StmtStats *cqStmtStats = statsGlobals->getMasterStmtStats(
queryId, queryIdLen,
RtsQueryId::ANY_QUERY_);
if (cqStmtStats == NULL)
{
; // race condition - query is gone, but we haven't cleanup up
// active query entry yet.
}
else
{
ExMasterStats * cMasterStats = cqStmtStats->getMasterStats();
if (cMasterStats)
{
cMasterStats->setCanceledTime(JULIANTIMESTAMP());
cMasterStats->setCancelComment(request->getComment());
}
}
statsGlobals->releaseStatsSemaphore(getSemId(), myPin());
// Set up for escalation later.
if ((ceFirstInterval != 0) || (ceSecondInterval != 0))
{
getPendingQueryMgr().addPendingQuery(aq,
ceFirstInterval, ceSecondInterval, ceSaveabend, cancelLogging);
}
// This call makes the reply to the Query Started message.
getActiveQueryMgr().rmActiveQuery(
queryId, queryIdLen, getHeap(),
CB_CANCEL, cancelLogging);
}
else
{
sqlErrorCode = -EXE_CANCEL_NOT_POSSIBLE;
sqlErrorDesc = "You tried to cancel the subsequent execution of the query";
}
ComDiagsArea *lcDiags = NULL;
if (sqlErrorCode != 0)
{
str_cpy_all(tempQid, queryId, queryIdLen);
tempQid[queryIdLen] = '\0';
lcDiags = ComDiagsArea::allocate(getHeap());
if (sqlErrorDesc != NULL)
{
*lcDiags << DgSqlCode(sqlErrorCode)
<< DgString0(tempQid)
<< DgString1(sqlErrorDesc);
}
else
*lcDiags << DgSqlCode(sqlErrorCode)
<< DgString0(tempQid);
}
*diags = lcDiags;
return didAttemptCancel;
}
void SsmpGlobals::suspendOrActivate(
char *queryId, Lng32 qidLen, SuspendOrActivate sOrA,
bool suspendLogging)
{
allocateServers();
SscpClientMsgStream *sscpMsgStream = new (getHeap())
SscpClientMsgStream((NAHeap *)getHeap(), getIpcEnv(), this, NULL);
sscpMsgStream->setUsedToSendCbMsgs();
addRecipients(sscpMsgStream);
sscpMsgStream->clearAllObjects();
SuspendActivateServersRequest *requestForSscp = new (getHeap())
SuspendActivateServersRequest((RtsHandle) sscpMsgStream,
getHeap(),
(sOrA == SUSPEND), suspendLogging
);
*sscpMsgStream << *requestForSscp;
RtsQueryId *qidObjForSscp = new (getHeap())
RtsQueryId( getHeap(), queryId, qidLen);
*sscpMsgStream << *qidObjForSscp;
// Send the Message to all
// Do not do this? ssmpGlobals_->addPendingSscpMessage(sscpMsgStream);
sscpMsgStream->send(FALSE);
requestForSscp->decrRefCount();
qidObjForSscp->decrRefCount();
}
bool SsmpGlobals::activateFromQid(
char *qid, Lng32 qidLen,
SuspendOrActivate /*
sOrAOrC */,
ComDiagsArea *&diags,
bool suspendLogging)
{
bool doAttemptActivate = true;
// Find the query.
StatsGlobals *statsGlobals = getStatsGlobals();
int error = statsGlobals->getStatsSemaphore(getSemId(), myPin());
SyncHashQueue *stmtStatsList = statsGlobals->getStmtStatsList();
stmtStatsList->position(qid, qidLen);
StmtStats *kqStmtStats = NULL;
ExMasterStats *masterStats = NULL;
while (NULL != (kqStmtStats = (StmtStats *)stmtStatsList->getNext()))
{
if (str_cmp(kqStmtStats->getQueryId(), qid, qidLen) == 0)
{
// Can control only queries which are executing.
masterStats = kqStmtStats->getMasterStats();
if ( masterStats &&
(masterStats->getExeStartTime() != -1) &&
(masterStats->getExeEndTime() == -1) )
break;
}
}
if (!masterStats)
{
doAttemptActivate = false;
if (diags == NULL)
diags = ComDiagsArea::allocate(getHeap());
*diags << DgSqlCode(-EXE_RTS_QID_NOT_FOUND)
<< DgString0(qid);
}
else if (!masterStats->isQuerySuspended())
{
doAttemptActivate = false;
if (diags == NULL)
diags = ComDiagsArea::allocate(getHeap());
if (!masterStats->isReadyToSuspend())
*diags << DgSqlCode(-EXE_SUSPEND_QID_NOT_ACTIVE);
else
*diags << DgSqlCode(-EXE_SUSPEND_NOT_SUSPENDED);
}
if (doAttemptActivate)
{
suspendOrActivate(qid, qidLen, ACTIVATE, suspendLogging);
masterStats->setQuerySuspended(false);
}
statsGlobals->releaseStatsSemaphore(getSemId(), myPin());
if (doAttemptActivate && suspendLogging)
{
char msg[80 + // the constant text
ComSqlId::MAX_QUERY_ID_LEN];
str_sprintf(msg,
"MXSSMP has processed a request to reactivate query %s.",
qid);
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, msg, 0);
}
return doAttemptActivate;
}
void SsmpGuaReceiveControlConnection::actOnSystemMessage(
short messageNum,
IpcMessageBufferPtr sysMsg,
IpcMessageObjSize sysMsgLen,
short clientFileNumber,
const GuaProcessHandle &clientPhandle,
GuaConnectionToClient *connection)
{
switch (messageNum)
{
case ZSYS_VAL_SMSG_OPEN:
{
SsmpNewIncomingConnectionStream *newStream = new(getEnv()->getHeap())
SsmpNewIncomingConnectionStream((NAHeap *)getEnv()->getHeap(),
getEnv(),getSsmpGlobals());
ex_assert(connection != NULL,"Must create connection for open sys msg");
newStream->addRecipient(connection);
newStream->receive(FALSE);
}
initialized_ = TRUE;
break;
case ZSYS_VAL_SMSG_CLOSE:
ssmpGlobals_->getActiveQueryMgr().clientIsGone(clientPhandle,
clientFileNumber);
ssmpGlobals_->getPendingQueryMgr().clientIsGone(clientPhandle,
clientFileNumber);
break;
case ZSYS_VAL_SMSG_PROCDEATH:
{
zsys_ddl_smsg_procdeath_def *msg =
(zsys_ddl_smsg_procdeath_def *) sysMsg;
SB_Phandle_Type *phandle = (SB_Phandle_Type *)&msg->z_phandle;
Int32 cpu;
pid_t pid;
SB_Int64_Type seqNum = 0;
if (XZFIL_ERR_OK == XPROCESSHANDLE_DECOMPOSE_(
phandle, &cpu, &pid
, NULL // nodeNumber
, NULL // nodeName
, 0 // nodeNameLen input
, NULL // nodeNameLen output
, NULL // processName
, 0 // processNameLen input
, NULL // processNameLen output
, &seqNum
))
{
if (cpu == ssmpGlobals_->myCpu())
ssmpGlobals_->getStatsGlobals()->verifyAndCleanup(pid, seqNum);
}
}
break;
case ZSYS_VAL_SMSG_CPUDOWN:
{
zsys_ddl_smsg_cpudown_def *msg =
(zsys_ddl_smsg_cpudown_def *) sysMsg;
#ifdef _DEBUG
cout << "Cpu Down received for NULL " << msg->z_cpunumber << endl;
#endif
ssmpGlobals_->deAllocateServer(NULL, 0, msg->z_cpunumber);
}
break;
case ZSYS_VAL_SMSG_REMOTECPUDOWN:
{
zsys_ddl_smsg_remotecpudown_def *msg =
(zsys_ddl_smsg_remotecpudown_def *) sysMsg;
#ifdef _DEBUG
char la_nodename[msg->z_nodename_len + 1];
memcpy(la_nodename, msg->z_nodename, (size_t) msg->z_nodename_len);
la_nodename[msg->z_nodename_len] = '\0';
cout << "Remote CPU DOWN received for " << la_nodename << " " << msg->z_cpunumber << endl;
#endif
ssmpGlobals_->deAllocateServer(msg->z_nodename, msg->z_nodename_len, msg->z_cpunumber);
}
break;
case ZSYS_VAL_SMSG_CPUUP:
{
zsys_ddl_smsg_cpuup_def *msg =
(zsys_ddl_smsg_cpuup_def *) sysMsg;
#ifdef _DEBUG
cout << "CPU UP received for NULL " << msg->z_cpunumber << endl;
#endif
ssmpGlobals_->allocateServerOnNextRequest(NULL, 0, msg->z_cpunumber);
}
break;
case ZSYS_VAL_SMSG_REMOTECPUUP:
{
zsys_ddl_smsg_remotecpuup_def *msg =
(zsys_ddl_smsg_remotecpuup_def *) sysMsg;
#ifdef _DEBUG
char la_nodename[msg->z_nodename_len + 1];
memcpy( la_nodename, msg->z_nodename, (size_t) msg->z_nodename_len );
la_nodename[msg->z_nodename_len] = '\0';
cout << "Remote CPU UP received for " << la_nodename << " " << msg->z_cpunumber;
#endif
ssmpGlobals_->allocateServerOnNextRequest(msg->z_nodename,
msg->z_nodename_len, msg->z_cpunumber);
}
break;
case ZSYS_VAL_SMSG_NODEUP:
break;
case ZSYS_VAL_SMSG_NODEDOWN:
ssmpGlobals_->releaseOrphanEntries();
break;
case XZSYS_VAL_SMSG_SHUTDOWN:
NAExit(0);
break;
default:
// do nothing for all other kinds of system messages
break;
} // switch
}
SsmpNewIncomingConnectionStream::~SsmpNewIncomingConnectionStream()
{
if (sscpDiagsArea_)
{
sscpDiagsArea_->decrRefCount();
sscpDiagsArea_ = NULL;
}
}
void SsmpNewIncomingConnectionStream::actOnSend(IpcConnection *connection)
{
// check for OS errors
if (connection->getErrorInfo() == 0)
ssmpGlobals_->incSsmpReplyMsg(connection->getLastSentMsg()->getMessageLength());
}
void SsmpNewIncomingConnectionStream::actOnSendAllComplete()
{
// Wait for the next request for the same stream
clearAllObjects();
receive(FALSE);
}
void SsmpNewIncomingConnectionStream::actOnReceive(IpcConnection *connection)
{
// check for OS errors
if (connection->getErrorInfo() != 0)
return;
ssmpGlobals_->incSsmpReqMsg(connection->getLastReceivedMsg()->getMessageLength());
// take a look at the type of the first object in the message
switch(getNextObjType())
{
case RTS_MSG_STATS_REQ:
actOnStatsReq(connection);
break;
case RTS_MSG_CPU_STATS_REQ:
actOnCpuStatsReq(connection);
break;
case RTS_MSG_EXPLAIN_REQ:
actOnExplainReq(connection);
break;
case CANCEL_QUERY_STARTED_REQ:
actOnQueryStartedReq(connection);
break;
case CANCEL_QUERY_FINISHED_REQ:
actOnQueryFinishedReq(connection);
break;
case CANCEL_QUERY_REQ:
actOnCancelQueryReq(connection);
break;
case SUSPEND_QUERY_REQ:
actOnSuspendQueryReq(connection);
break;
case ACTIVATE_QUERY_REQ:
actOnActivateQueryReq(connection);
break;
case SECURITY_INVALID_KEY_REQ:
actOnSecInvalidKeyReq(connection);
break;
case LOB_LOCK_REQ:
actOnLobLockReq(connection);
break;
default:
ex_assert(FALSE,"Invalid request from client");
}
}
void SsmpNewIncomingConnectionStream::actOnReceiveAllComplete()
{
if (getState() == ERROR_STATE)
addToCompletedList();
}
void SsmpNewIncomingConnectionStream::actOnQueryStartedReq(IpcConnection *connection)
{
IpcMessageObjVersion msgVer;
msgVer = getNextObjVersion();
ex_assert(msgVer <= currRtsStatsReqVersionNumber, "Up-rev message received.");
QueryStarted *request = new (getHeap())
QueryStarted(INVALID_RTS_HANDLE, getHeap());
*this >> *request;
setHandle(request->getHandle());
if (moreObjects())
{
RtsMessageObjType objType =
(RtsMessageObjType) getNextObjType();
switch (objType)
{
case RTS_QUERY_ID:
{
RtsQueryId *queryId = new (getHeap())
RtsQueryId(getHeap());
// Get the query Id from IPC
*this >> *queryId;
clearAllObjects();
ssmpGlobals_->getActiveQueryMgr().addActiveQuery(
queryId->getQid(), queryId->getQueryIdLen(),
request->getStartTime(), request->getMasterPhandle(),
request->getExecutionCount(), this, connection);
request->decrRefCount();
queryId->decrRefCount();
}
break;
default:
ex_assert(0, "something besides an RTS_QUERY_ID followed QueryStarted");
break;
}
}
else
ex_assert(0, "expected an RTS_QUERY_ID following a QueryStarted");
// start another receive operation for the next request
receive(FALSE);
return;
}
void SsmpNewIncomingConnectionStream::actOnQueryFinishedReq(
IpcConnection *connection)
{
ex_assert(getNextObjVersion()
<= currRtsStatsReqVersionNumber, "Up-rev message received.");
QueryFinished *request = new (getHeap())
QueryFinished(INVALID_RTS_HANDLE, getHeap());
*this >> *request;
setHandle(request->getHandle());
request->decrRefCount();
if (moreObjects())
{
RtsMessageObjType objType =
(RtsMessageObjType) getNextObjType();
switch (objType)
{
case RTS_QUERY_ID:
{
RtsQueryId *queryId = new (getHeap())
RtsQueryId(getHeap());
// Get the query Id from IPC
*this >> *queryId;
clearAllObjects();
// This call makes the reply to the Query Started message.
ssmpGlobals_->getActiveQueryMgr().rmActiveQuery(queryId->getQid(),
queryId->getQueryIdLen(), getHeap(), CB_COMPLETE,
false /*no cancel logging */);
queryId->decrRefCount();
// Now, make a reply to the Query Finished message.
RmsGenericReply *qfReply = new (getHeap())
RmsGenericReply(getHeap());
*this << *qfReply;
qfReply->decrRefCount();
send(FALSE);
}
break;
default:
ex_assert(0, "something besides an RTS_QUERY_ID followed QueryFinished");
break;
}
}
else
ex_assert(0, "expected an RTS_QUERY_ID following a QueryFinished");
}
void SsmpNewIncomingConnectionStream::actOnCancelQueryReq(
IpcConnection *connection)
{
ex_assert(getNextObjVersion() <= currRtsStatsReqVersionNumber,
"Up-rev message received.");
CancelQueryRequest *request = new (getHeap())
CancelQueryRequest(INVALID_RTS_HANDLE, getHeap());
*this >> *request;
setHandle(request->getHandle());
Int32 minimumAge = request->getMinimumAge();
bool cancelByPid = request->getCancelByPid();
char queryId[200];
Lng32 queryIdLen = 0;
bool didAttemptCancel = false;
bool haveAQid = false;
if (cancelByPid)
{
haveAQid = ssmpGlobals_->getQidFromPid(request->getCancelPid(),
minimumAge, queryId, queryIdLen);
}
else
{
ex_assert(moreObjects(),
"expected an RTS_QUERY_ID following a CancelQuery");
RtsMessageObjType objType =
(RtsMessageObjType) getNextObjType();
ex_assert(RTS_QUERY_ID == objType,
"something besides an RTS_QUERY_ID followed CancelQuery");
RtsQueryId *msgQueryId = new (getHeap()) RtsQueryId(getHeap());
// Get the query Id from IPC
*this >> *msgQueryId;
ex_assert(msgQueryId->getQueryIdLen() <= sizeof(queryId),
"query id received is too long");
queryIdLen = msgQueryId->getQueryIdLen();
str_cpy_all(queryId, msgQueryId->getQid(), queryIdLen);
msgQueryId->decrRefCount();
haveAQid = true;
}
clearAllObjects();
ComDiagsArea *diags = NULL;
if (haveAQid)
didAttemptCancel = getSsmpGlobals()->cancelQueryTree(
queryId, queryIdLen, request,
&diags);
// Now, make a reply to the Cancel Query message.
RtsHandle rtsHandle = (RtsHandle) this;
ControlQueryReply *cqReply = new (getHeap())
ControlQueryReply(rtsHandle, getHeap(),
didAttemptCancel || cancelByPid);
*this << *cqReply;
if (!(didAttemptCancel ||cancelByPid))
{
if (diags == NULL)
{
diags = ComDiagsArea::allocate(getHeap());
*diags << DgSqlCode(-EXE_CANCEL_QID_NOT_FOUND);
}
*this << *diags;
}
send(FALSE);
cqReply->decrRefCount();
if (diags)
diags->decrRefCount();
request->decrRefCount();
}
void SsmpNewIncomingConnectionStream::actOnSuspendQueryReq(
IpcConnection *connection)
{
ex_assert(getNextObjVersion()
<= CurrSuspendQueryReplyVersionNumber, "Up-rev message received.");
SuspendQueryRequest *request = new (getHeap())
SuspendQueryRequest(INVALID_RTS_HANDLE, getHeap());
*this >> *request;
setHandle(request->getHandle());
if (moreObjects())
{
RtsMessageObjType objType =
(RtsMessageObjType) getNextObjType();
switch (objType)
{
case RTS_QUERY_ID:
{
bool doAttemptSuspend = true;
ComDiagsArea *diags = NULL;
RtsQueryId *queryId = new (getHeap())
RtsQueryId(getHeap());
// Get the query Id from IPC
*this >> *queryId;
clearAllObjects();
char *qid = queryId->getQid();
Lng32 qidLen = queryId->getQueryIdLen();
// Find the query.
StatsGlobals *statsGlobals = ssmpGlobals_->getStatsGlobals();
int error = statsGlobals->getStatsSemaphore(
ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
SyncHashQueue *stmtStatsList = statsGlobals->getStmtStatsList();
stmtStatsList->position(qid, qidLen);
StmtStats *kqStmtStats = NULL;
while (NULL != (kqStmtStats = (StmtStats *)stmtStatsList->getNext()))
{
if (str_cmp(kqStmtStats->getQueryId(), qid, qidLen) == 0)
{
// Can control only queries which have an ExMasterStats.
if (NULL != kqStmtStats->getMasterStats())
break;
}
}
ExMasterStats *masterStats = NULL;
if (kqStmtStats)
masterStats = kqStmtStats->getMasterStats();
if (masterStats)
{
if(!masterStats->isReadyToSuspend())
{
doAttemptSuspend = false;
diags = ComDiagsArea::allocate(getHeap());
*diags << DgSqlCode(-EXE_SUSPEND_QID_NOT_ACTIVE);
}
else if (!request->getIsForced())
{
// See if safe to suspend.
if (masterStats &&
masterStats->getSuspendMayHaveAuditPinned())
{
doAttemptSuspend = false;
diags = ComDiagsArea::allocate(getHeap());
*diags << DgSqlCode(-EXE_SUSPEND_AUDIT);
}
else if (masterStats &&
masterStats->getSuspendMayHoldLock())
{
doAttemptSuspend = false;
diags = ComDiagsArea::allocate(getHeap());
*diags << DgSqlCode(-EXE_SUSPEND_LOCKS);
}
}
if (doAttemptSuspend && masterStats->isQuerySuspended())
{
doAttemptSuspend = false;
diags = ComDiagsArea::allocate(getHeap());
*diags << DgSqlCode(-EXE_SUSPEND_ALREADY_SUSPENDED);
}
if (doAttemptSuspend)
{
// sanity checking here - may be better for MXSSMP to fail now
// than for MXSSCPs to fail later.
ExStatisticsArea *statsArea = kqStmtStats->getStatsArea();
ex_assert(statsArea,
"Eligible subject query has no ExStatisticsArea");
ExOperStats *rootStats = statsArea->getRootStats();
ex_assert(rootStats,
"Eligible subject query has no root ExOperStats");
ex_assert((rootStats->statType() == ExOperStats::ROOT_OPER_STATS) ||
(rootStats->statType() == ExOperStats::MEAS_STATS),
"Eligible subject query does not have correct stats.");
ssmpGlobals_->suspendOrActivate(qid, qidLen, SUSPEND,
request->getSuspendLogging());
masterStats->setQuerySuspended(true);
}
}
else
{
doAttemptSuspend = false;
diags = ComDiagsArea::allocate(getHeap());
*diags << DgSqlCode(-EXE_RTS_QID_NOT_FOUND)
<< DgString0(queryId->getQid());
}
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
if (doAttemptSuspend && request->getSuspendLogging())
{
char msg[80 + // the constant text
ComSqlId::MAX_QUERY_ID_LEN];
str_sprintf(msg,
"MXSSMP has processed a request to suspend query %s.",
qid);
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, msg, 0);
}
queryId->decrRefCount();
// Now, make a reply to the Suspend Query message.
RtsHandle rtsHandle = (RtsHandle) this;
ControlQueryReply *cqReply = new (getHeap())
ControlQueryReply(rtsHandle, getHeap(),
doAttemptSuspend);
*this << *cqReply;
cqReply->decrRefCount();
if (diags)
{
*this << *diags;
diags->decrRefCount();
}
send(FALSE);
}
break;
default:
ex_assert(0,
"something besides an RTS_QUERY_ID followed SuspendQueryRequest");
break;
}
}
else
ex_assert(0,
"expected an RTS_QUERY_ID following a SuspendQueryRequest");
request->decrRefCount();
}
void SsmpNewIncomingConnectionStream::actOnActivateQueryReq(
IpcConnection *connection)
{
ex_assert(getNextObjVersion()
<= CurrSuspendQueryReplyVersionNumber, "Up-rev message received.");
ActivateQueryRequest *request = new (getHeap())
ActivateQueryRequest(INVALID_RTS_HANDLE, getHeap());
*this >> *request;
setHandle(request->getHandle());
bool suspendLogging = request->getSuspendLogging();
request->decrRefCount();
if (moreObjects())
{
RtsMessageObjType objType =
(RtsMessageObjType) getNextObjType();
switch (objType)
{
case RTS_QUERY_ID:
{
ComDiagsArea *diags = NULL;
RtsQueryId *queryId = new (getHeap())
RtsQueryId(getHeap());
// Get the query Id from IPC
*this >> *queryId;
clearAllObjects();
char *qid = queryId->getQid();
Lng32 qidLen = queryId->getQueryIdLen();
bool didAttemptActivate =
ssmpGlobals_->activateFromQid(qid, qidLen, ACTIVATE,
diags, suspendLogging);
queryId->decrRefCount();
// Now, make a reply to the Activate Query message.
RtsHandle rtsHandle = (RtsHandle) this;
ControlQueryReply *cqReply = new (getHeap())
ControlQueryReply(rtsHandle, getHeap(),
didAttemptActivate);
*this << *cqReply;
cqReply->decrRefCount();
if (diags)
{
*this << *diags;
diags->decrRefCount();
}
send(FALSE);
}
break;
default:
ex_assert(0,
"something besides an RTS_QUERY_ID followed SuspendQueryRequest");
break;
}
}
else
ex_assert(0,
"expected an RTS_QUERY_ID following a SuspendQueryRequest");
}
void SsmpNewIncomingConnectionStream::actOnLobLockReq(
IpcConnection *connection)
{
IpcMessageObjVersion msgVer = getNextObjVersion();
StatsGlobals *statsGlobals;
NABoolean releasingLock = FALSE;
CliGlobals *cliGlobals = GetCliGlobals();
ex_assert(msgVer <= CurrLobLockVersionNumber,
"Up-rev message received.");
LobLockRequest *llReq= new (getHeap()) LobLockRequest(getHeap());
*this >> *llReq;
setHandle(llReq->getHandle());
ex_assert(!moreObjects(),"Unexpected objects following LobLockRequest");
clearAllObjects();
//check and set the lock in the local shared segment
statsGlobals = ssmpGlobals_->getStatsGlobals();
char *inLobLockId = NULL;
inLobLockId = llReq->getLobLockId();
if (inLobLockId[0] == '-') //we are releasing this lock. No need to check.
inLobLockId = NULL;
else
{
inLobLockId = &inLobLockId[1];
statsGlobals->checkLobLock(cliGlobals, inLobLockId);
}
if (inLobLockId)
{
//It's already set, don't propagate
if (sscpDiagsArea_== NULL)
sscpDiagsArea_ = ComDiagsArea::allocate(ssmpGlobals_->getHeap());
*sscpDiagsArea_<< DgSqlCode(-EXE_LOB_CONCURRENT_ACCESS_ERROR);
RmsGenericReply *reply = new(getHeap())
RmsGenericReply(getHeap());
*this << *reply;
*this << *sscpDiagsArea_;
this->clearSscpDiagsArea();
send(FALSE);
reply->decrRefCount();
}
else
{
// Forward request to all mxsscps.
ssmpGlobals_->allocateServers();
SscpClientMsgStream *sscpMsgStream = new (heap_)
SscpClientMsgStream(heap_, getIpcEnv(), ssmpGlobals_, this);
sscpMsgStream->setUsedToSendLLMsgs();
ssmpGlobals_->addRecipients(sscpMsgStream);
sscpMsgStream->clearAllObjects();
*sscpMsgStream << *llReq;
llReq->decrRefCount();
sscpMsgStream->send(FALSE);
}
// Reply to client when the msgs to mxsscp have all completed. The reply
// is made from the sscpMsgStream's callback.
}
void SsmpNewIncomingConnectionStream::actOnSecInvalidKeyReq(
IpcConnection *connection)
{
IpcMessageObjVersion msgVer = getNextObjVersion();
ex_assert(msgVer <= CurrSecurityInvalidKeyVersionNumber,
"Up-rev message received.");
SecInvalidKeyRequest *sikReq = new (getHeap())
SecInvalidKeyRequest(getHeap());
*this >> *sikReq;
setHandle(sikReq->getHandle());
ex_assert( !moreObjects(),
"Unexpected objects following SecInvalidKeyRequest");
clearAllObjects();
// Forward request to all mxsscps.
ssmpGlobals_->allocateServers();
SscpClientMsgStream *sscpMsgStream = new (heap_)
SscpClientMsgStream(heap_, getIpcEnv(), ssmpGlobals_, this);
sscpMsgStream->setUsedToSendSikMsgs();
ssmpGlobals_->addRecipients(sscpMsgStream);
sscpMsgStream->clearAllObjects();
*sscpMsgStream << *sikReq;
sikReq->decrRefCount();
sscpMsgStream->send(FALSE);
// Reply to client when the msgs to mxsscp have all completed. The reply
// is made from the sscpMsgStream's callback.
}
void SsmpNewIncomingConnectionStream::actOnStatsReq(IpcConnection *connection)
{
IpcMessageObjVersion msgVer;
StatsGlobals *statsGlobals;
int error;
char *qid;
pid_t pid = 0;
short cpu;
Int64 timeStamp;
Lng32 queryNumber;
RtsQueryId *rtsQueryId = NULL;
StmtStats *stmtStats = NULL;
msgVer = getNextObjVersion();
ex_assert(msgVer <= currRtsStatsReqVersionNumber, "Up-rev message received.");
RtsStatsReq *request = new (getHeap())
RtsStatsReq(INVALID_RTS_HANDLE, getHeap());
*this >> *request;
setHandle(request->getHandle());
setWmsProcess(request->getWmsProcess());
if (moreObjects())
{
RtsMessageObjType objType =
(RtsMessageObjType) getNextObjType();
switch (objType)
{
case RTS_QUERY_ID:
{
RtsQueryId *queryId = new (getHeap())
RtsQueryId(getHeap());
// Get the query Id from IPC
*this >> *queryId;
clearAllObjects();
statsGlobals = ssmpGlobals_->getStatsGlobals();
short reqType = queryId->getStatsReqType();
short subReqType = queryId->getSubReqType();
switch (reqType)
{
case SQLCLI_STATS_REQ_QID:
qid = queryId->getQid();
error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
stmtStats =
statsGlobals->getMasterStmtStats(qid, str_len(qid), queryId->getActiveQueryNum());
if (stmtStats != NULL)
stmtStats->setStmtStatsUsed(TRUE);
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
if (stmtStats != NULL)
{
getMergedStats(request, queryId, stmtStats,
reqType, queryId->getStatsMergeType());
}
break;
case SQLCLI_STATS_REQ_QID_DETAIL:
qid = queryId->getQid();
error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
stmtStats =
statsGlobals->getMasterStmtStats(qid, str_len(qid), 1);
if (stmtStats != NULL)
stmtStats->setStmtStatsUsed(TRUE);
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
if (stmtStats != NULL)
{
getMergedStats(request, queryId, stmtStats,
reqType, queryId->getStatsMergeType());
}
break;
case SQLCLI_STATS_REQ_PROCESS_INFO:
pid = queryId->getPid();
getProcessStats(reqType, subReqType, pid);
break;
case SQLCLI_STATS_REQ_PID:
error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
stmtStats = statsGlobals->getStmtStats(pid, queryId->getActiveQueryNum());
if (stmtStats != NULL)
stmtStats->setStmtStatsUsed(TRUE);
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
if (stmtStats != NULL)
{
if (stmtStats->isMaster())
{
getMergedStats(request, NULL, stmtStats,
reqType, queryId->getStatsMergeType());
}
else
{
RtsStatsReply *reply = new (getHeap())
RtsStatsReply(request->getHandle(), getHeap());
clearAllObjects();
setType(IPC_MSG_SSMP_REPLY);
setVersion(CurrSsmpReplyMessageVersion);
*this << *reply;
rtsQueryId = new (getHeap()) RtsQueryId(getHeap(),
stmtStats->getQueryId(), stmtStats->getQueryIdLen(),
// We need to use ANY_QUERY_ otherwise you might get QUERY_ID not found
// if the query gets finished before stats is merged
(UInt16)SQLCLI_SAME_STATS, RtsQueryId::ANY_QUERY_);
*this << *(rtsQueryId);
send(FALSE);
stmtStats->setStmtStatsUsed(FALSE);
reply->decrRefCount();
rtsQueryId->decrRefCount();
}
}
break;
case SQLCLI_STATS_REQ_QID_INTERNAL:
cpu = queryId->getCpu();
pid = queryId->getPid();
timeStamp = queryId->getTimeStamp();
queryNumber = queryId->getQueryNumber();
error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
stmtStats = statsGlobals->getStmtStats(cpu, pid, timeStamp, queryNumber);
if (stmtStats != NULL)
stmtStats->setStmtStatsUsed(TRUE);
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
if (stmtStats != NULL)
{
ex_assert(stmtStats->isMaster() == TRUE, "Should be Master here");
getMergedStats(request, NULL, stmtStats,
reqType, queryId->getStatsMergeType());
}
break;
case SQLCLI_STATS_REQ_CPU:
error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
stmtStats = statsGlobals->getStmtStats(queryId->getActiveQueryNum());
if (stmtStats != NULL)
stmtStats->setStmtStatsUsed(TRUE);
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
if (stmtStats != NULL)
{
ex_assert(stmtStats->isMaster() == TRUE, "Should be Master here");
getMergedStats(request, NULL, stmtStats,
reqType, queryId->getStatsMergeType());
}
break;
default:
break;
}
request->decrRefCount();
queryId->decrRefCount();
// If there is no stmt stats, reply back with empty stats
if (stmtStats == NULL && reqType != SQLCLI_STATS_REQ_PROCESS_INFO)
sendMergedStats(NULL, 0, reqType, NULL, FALSE);
}
break;
default:
break;
}
}
}
void SsmpNewIncomingConnectionStream::getProcessStats(short reqType,
short subReqType,
pid_t pid)
{
ExStatisticsArea *mergedStats=NULL;
ExProcessStats *exProcessStats=NULL;
ExProcessStats *tmpExProcessStats=NULL;
StatsGlobals *statsGlobals = ssmpGlobals_->getStatsGlobals();
exProcessStats = statsGlobals->getExProcessStats(pid);
if (exProcessStats != NULL)
{
mergedStats = new (getHeap()) ExStatisticsArea(getHeap(),
0,
ComTdb::OPERATOR_STATS,
ComTdb::OPERATOR_STATS);
mergedStats->setStatsEnabled(TRUE);
mergedStats->setSubReqType(subReqType);
tmpExProcessStats = new (mergedStats->getHeap())
ExProcessStats(mergedStats->getHeap());
tmpExProcessStats->copyContents(exProcessStats);
mergedStats->insert(tmpExProcessStats);
}
sendMergedStats(mergedStats, 0, reqType, NULL, FALSE);
}
void SsmpNewIncomingConnectionStream::getMergedStats(RtsStatsReq *request,
RtsQueryId *queryId,
StmtStats *stmtStats,
short reqType,
UInt16 statsMergeType)
{
Int64 currTime = NA_JulianTimestamp();
RtsQueryId *rtsQueryId = NULL;
ExStatisticsArea *mergedStats;
ExStatisticsArea *srcMergedStats;
// MergedStats_ in stmtStats can't be used directly in Ipc layer
// since the stats->entries_ will be traversed at the time of send
// This can cause corruption since hashQueue is not protected using semaphore
// So, use local stats area instead when the statsArea is returned within the
// RTS_MERGE_INTERVAL_TIMEOUT from SSMP itself. When the statsArea is merged from SSCPs
// set the mergedStats in stmtStats_ after it has been shipped out, to avoid other processes
// accessing the mergedStats_ while it is being traversed in Ipc layer
if (reqType == SQLCLI_STATS_REQ_QID && (ssmpGlobals_->getForceMerge() == FALSE)
&& (ssmpGlobals_->getNumDeallocatedServers() == 0)
&& (stmtStats->mergeReqd() == FALSE)
&& ((currTime - stmtStats->getLastMergedTime()) <
(ssmpGlobals_->getStatsCollectionInterval() * 1000000))
&& isWmsProcess())
{
if ((srcMergedStats = stmtStats->getMergedStats()) != NULL)
{
ComTdb::CollectStatsType tempStatsMergeType =
(statsMergeType == SQLCLI_SAME_STATS ?
srcMergedStats->getOrigCollectStatsType() :
(ComTdb::CollectStatsType)statsMergeType);
if (srcMergedStats->getCollectStatsType() == tempStatsMergeType)
{
mergedStats = new (getHeap()) ExStatisticsArea(getHeap(),
0,
tempStatsMergeType,
srcMergedStats->getOrigCollectStatsType());
mergedStats->setStatsEnabled(TRUE);
StatsGlobals *statsGlobals = ssmpGlobals_->getStatsGlobals();
int error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
mergedStats->merge(srcMergedStats, statsMergeType);
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
sendMergedStats(mergedStats, 0, reqType, stmtStats, FALSE);
return;
}
}
}
ssmpGlobals_->allocateServers();
// Give the received message to SscpClientMsgStream
SscpClientMsgStream *sscpMsgStream = new (heap_) SscpClientMsgStream(heap_, getIpcEnv(), ssmpGlobals_, this);
ssmpGlobals_->addRecipients(sscpMsgStream);
sscpMsgStream->clearAllObjects();
*sscpMsgStream << *request;
// If the incoming RtsQueryId (queryId) is null, construct the
// RtsQueryId from stmtStats
if (queryId == NULL)
{
rtsQueryId = new (getHeap()) RtsQueryId(getHeap(), stmtStats->getQueryId(),
stmtStats->getQueryIdLen(), statsMergeType,
RtsQueryId::ANY_QUERY_);
*sscpMsgStream << *rtsQueryId;
sscpMsgStream->setReqType(rtsQueryId->getStatsReqType());
sscpMsgStream->setDetailLevel(rtsQueryId->getDetailLevel());
}
else
{
*sscpMsgStream << *queryId ;
sscpMsgStream->setReqType(queryId->getStatsReqType());
sscpMsgStream->setDetailLevel(queryId->getDetailLevel());
}
sscpMsgStream->setStmtStats(stmtStats);
sscpMsgStream->setMergeStartTime(NA_JulianTimestamp());
// Send the Message to all
ssmpGlobals_->addPendingSscpMessage(sscpMsgStream);
sscpMsgStream->send(FALSE);
if (rtsQueryId != NULL)
rtsQueryId->decrRefCount();
return;
}
void SsmpNewIncomingConnectionStream::actOnCpuStatsReq(IpcConnection *connection)
{
IpcMessageObjVersion msgVer;
StmtStats *stmtStats;
StatsGlobals *statsGlobals;
ExStatisticsArea *stats;
ExStatisticsArea *cpuStats = NULL;
ExMasterStats *masterStats;
short currQueryNum = 0;
msgVer = getNextObjVersion();
if (msgVer > currRtsCpuStatsReqVersionNumber)
// Send Error
;
RtsCpuStatsReq *request = new (getHeap())
RtsCpuStatsReq(INVALID_RTS_HANDLE, getHeap());
*this >> *request;
setHandle(request->getHandle());
short reqType = request->getReqType();
short noOfQueries = request->getNoOfQueries();
Lng32 filter = request->getFilter();
short subReqType = request->getSubReqType();
clearAllObjects();
if (request->getCpu() != -1)
{
setType(IPC_MSG_SSMP_REPLY);
setVersion(CurrSsmpReplyMessageVersion);
RtsStatsReply *reply = new (getHeap())
RtsStatsReply(request->getHandle(), getHeap());
*this << *reply;
statsGlobals = ssmpGlobals_->getStatsGlobals();
switch (reqType)
{
case SQLCLI_STATS_REQ_RMS_INFO:
{
cpuStats = new (getHeap())
ExStatisticsArea(getHeap(), 0, ComTdb::RMS_INFO_STATS, ComTdb::RMS_INFO_STATS);
cpuStats->setSubReqType(subReqType);
cpuStats->setStatsEnabled(TRUE);
ExRMSStats *rmsStats = new (getHeap()) ExRMSStats(getHeap());
rmsStats->copyContents(statsGlobals->getRMSStats());
NAHeap *statsHeap = statsGlobals->getStatsHeap();
rmsStats->setGlobalStatsHeapAlloc(statsHeap->getTotalSize());
rmsStats->setGlobalStatsHeapUsed(statsHeap->getAllocSize());
rmsStats->setStatsHeapWaterMark(statsHeap->getHighWaterMark());
rmsStats->setNoOfStmtStats(statsGlobals->getStmtStatsList()->numEntries());
rmsStats->setSemPid(statsGlobals->getSemPid());
rmsStats->setSscpOpens(ssmpGlobals_->getNumAllocatedServers());
rmsStats->setSscpDeletedOpens(ssmpGlobals_->getNumDeallocatedServers());
rmsStats->setNumQueryInvKeys(statsGlobals->getRecentSikeys()->entries());
cpuStats->insert(rmsStats);
if (request->getNoOfQueries() == RtsCpuStatsReq::INIT_RMS_STATS_)
statsGlobals->getRMSStats()->reset();
break;
}
case SQLCLI_STATS_REQ_ET_OFFENDER:
{
cpuStats = new (getHeap())
ExStatisticsArea(getHeap(), 0, ComTdb::ET_OFFENDER_STATS,
ComTdb::ET_OFFENDER_STATS);
cpuStats->setStatsEnabled(TRUE);
int error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
SyncHashQueue *stmtStatsList = statsGlobals->getStmtStatsList();
stmtStatsList->position();
Int64 currTimestamp = NA_JulianTimestamp();
while ((stmtStats = (StmtStats *)stmtStatsList->getNext()) != NULL)
{
masterStats = stmtStats->getMasterStats();
if (masterStats != NULL)
cpuStats->appendCpuStats(masterStats, FALSE,
subReqType, filter, currTimestamp);
}
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
break;
}
case SQLCLI_STATS_REQ_MEM_OFFENDER:
{
cpuStats = new (getHeap())
ExStatisticsArea(getHeap(), 0, ComTdb::MEM_OFFENDER_STATS,
ComTdb::MEM_OFFENDER_STATS);
cpuStats->setStatsEnabled(TRUE);
cpuStats->setSubReqType(subReqType);
statsGlobals->getMemOffender(cpuStats, filter);
}
break;
case SQLCLI_STATS_REQ_CPU_OFFENDER:
{
short noOfQueries = request->getNoOfQueries();
int error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
SyncHashQueue * stmtStatsList = statsGlobals->getStmtStatsList();
stmtStatsList->position();
switch (noOfQueries)
{
case RtsCpuStatsReq::INIT_CPU_STATS_HISTORY_:
while ((stmtStats = (StmtStats *)stmtStatsList->getNext()) != NULL)
{
stats = stmtStats->getStatsArea();
if (stats != NULL)
stats->setCpuStatsHistory();
}
break;
default:
if ( noOfQueries == RtsCpuStatsReq::ALL_ACTIVE_QUERIES_)
noOfQueries = 32767;
cpuStats = new (getHeap())
ExStatisticsArea(getHeap(), 0, ComTdb::CPU_OFFENDER_STATS,
ComTdb::CPU_OFFENDER_STATS);
cpuStats->setStatsEnabled(TRUE);
while ((stmtStats = (StmtStats *)stmtStatsList->getNext()) != NULL
&& currQueryNum <= noOfQueries)
{
stats = stmtStats->getStatsArea();
if (stats != NULL)
{
if (cpuStats->appendCpuStats(stats))
currQueryNum++;
}
}
break;
}
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
break;
}
default:
break;
}
if (cpuStats != NULL && cpuStats->numEntries() > 0)
*this << *(cpuStats);
send(FALSE);
reply->decrRefCount();
if (cpuStats != NULL)
NADELETE(cpuStats, ExStatisticsArea, cpuStats->getHeap());
}
else
{
ssmpGlobals_->allocateServers();
// Give the received message to SscpClientMsgStream
SscpClientMsgStream *sscpMsgStream = new (heap_) SscpClientMsgStream(heap_, getIpcEnv(), ssmpGlobals_, this);
ssmpGlobals_->addRecipients(sscpMsgStream);
sscpMsgStream->clearAllObjects();
*sscpMsgStream << *request;
sscpMsgStream->setMergeStartTime(NA_JulianTimestamp());
sscpMsgStream->setReqType(request->getReqType());
sscpMsgStream->setSubReqType(request->getSubReqType());
ssmpGlobals_->addPendingSscpMessage(sscpMsgStream);
sscpMsgStream->send(FALSE);
}
request->decrRefCount();
}
void SsmpNewIncomingConnectionStream::actOnExplainReq(IpcConnection *connection)
{
IpcMessageObjVersion msgVer;
StmtStats *stmtStats;
StatsGlobals *statsGlobals;
RtsExplainFrag *explainFrag = NULL;
RtsExplainFrag *srcExplainFrag;
short currQueryNum = 0;
msgVer = getNextObjVersion();
if (msgVer > currRtsExplainReqVersionNumber)
// Send Error
;
RtsExplainReq *request = new (getHeap())
RtsExplainReq(INVALID_RTS_HANDLE, getHeap());
*this >> *request;
setHandle(request->getHandle());
clearAllObjects();
setType(RTS_MSG_EXPLAIN_REPLY);
setVersion(currRtsExplainReplyVersionNumber);
RtsExplainReply *reply = new (getHeap())
RtsExplainReply(request->getHandle(), getHeap());
*this << *reply;
statsGlobals = ssmpGlobals_->getStatsGlobals();
int error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
stmtStats = statsGlobals->getMasterStmtStats(request->getQid(), request->getQidLen(),
RtsQueryId::ANY_QUERY_);
if (stmtStats != NULL)
{
srcExplainFrag = stmtStats->getExplainInfo();
if (srcExplainFrag != NULL)
explainFrag = new (getHeap()) RtsExplainFrag(getHeap(), srcExplainFrag);
}
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
if (explainFrag)
*this << *(explainFrag);
send(FALSE);
reply->decrRefCount();
request->decrRefCount();
if (explainFrag)
explainFrag->decrRefCount();
}
void SsmpNewIncomingConnectionStream::sscpIpcError(IpcConnection *conn)
{
if (sscpDiagsArea_ == NULL)
sscpDiagsArea_ = ComDiagsArea::allocate(ssmpGlobals_->getHeap());
conn->populateDiagsArea(sscpDiagsArea_, ssmpGlobals_->getHeap());
}
void SscpClientMsgStream::actOnStatsReply(IpcConnection *connection)
{
IpcMessageObjVersion msgVer;
msgVer = getNextObjVersion();
int error;
if (msgVer > currRtsStatsReplyVersionNumber)
// Send Error
;
RtsStatsReply *reply = new (getHeap())
RtsStatsReply(INVALID_RTS_HANDLE, getHeap());
*this >> *reply;
incNumSqlProcs(reply->getNumSqlProcs());
incNumCpus(reply->getNumCpus());
while (moreObjects())
{
RtsMessageObjType objType =
(RtsMessageObjType) getNextObjType();
short reqType = getReqType();
switch (objType)
{
case IPC_SQL_STATS_AREA:
{
ExStatisticsArea *stats = new (getHeap())
ExStatisticsArea(getHeap());
// Get the query Id from IPC
*this >> *stats;
switch (reqType)
{
case SQLCLI_STATS_REQ_QID:
{
StatsGlobals *statsGlobals = ssmpGlobals_->getStatsGlobals();
error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
if (mergedStats_ == NULL)
{
if (isReplySent())
mergedStats_ = new (getHeap())
ExStatisticsArea(getHeap(), 0, stats->getCollectStatsType(),
stats->getOrigCollectStatsType());
else
mergedStats_ = new (getSsmpGlobals()->getStatsHeap())
ExStatisticsArea(getSsmpGlobals()->getStatsHeap(), 0, stats->getCollectStatsType(),
stats->getOrigCollectStatsType());
// Can we always assume that the stats is enabled ?????
// The stats should be enabled for the DISPLAY STATISTICS or SELECT ...TABLE(STATSISTICS...)
// to display something
mergedStats_->setStatsEnabled(TRUE);
}
mergedStats_->merge(stats);
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
break;
}
case SQLCLI_STATS_REQ_QID_DETAIL:
case SQLCLI_STATS_REQ_CPU_OFFENDER:
case SQLCLI_STATS_REQ_SE_OFFENDER:
case SQLCLI_STATS_REQ_ET_OFFENDER:
case SQLCLI_STATS_REQ_MEM_OFFENDER:
case SQLCLI_STATS_REQ_RMS_INFO:
{
if (mergedStats_ == NULL)
{
mergedStats_ = new (getHeap())
ExStatisticsArea(getHeap(), 0, stats->getCollectStatsType(),
stats->getOrigCollectStatsType());
mergedStats_->setStatsEnabled(TRUE);
mergedStats_->setSubReqType(subReqType_);
}
mergedStats_->setDetailLevel(getDetailLevel());
mergedStats_->appendCpuStats(stats, TRUE);
if (reqType == SQLCLI_STATS_REQ_QID_DETAIL && stats->getMasterStats() != NULL)
{
ExMasterStats *masterStats = new (getHeap()) ExMasterStats((NAHeap *)getHeap());
masterStats->copyContents(stats->getMasterStats());
mergedStats_->setMasterStats(masterStats);
}
break;
}
default:
break;
}
NADELETE(stats, ExStatisticsArea, getHeap());
}
break;
default:
break;
}
}
reply->decrRefCount();
#ifdef _DEBUG_RTS
cerr << "Ssmp Merged Stats " << mergedStats_ << " \n";
#endif
}
SscpClientMsgStream::~SscpClientMsgStream()
{
if (mergedStats_ != NULL)
{
NADELETE(mergedStats_, ExStatisticsArea, mergedStats_->getHeap());
}
}
// method called upon send complete
void SscpClientMsgStream::actOnSendAllComplete()
{
// once all sends have completed, initiate a nowait receive from all
// SSCPs that we sent this message to
clearAllObjects();
receive(FALSE);
}
void SscpClientMsgStream::actOnReceive(IpcConnection* connection)
{
IpcMessageObjType replyType;
if (connection->getErrorInfo() != 0)
{
if (ssmpStream_)
ssmpStream_->sscpIpcError(connection);
delinkConnection(connection);
return;
}
else
{
numOfClientRequestsSent_--;
// check for protocol errors
ex_assert(getType() == IPC_MSG_SSCP_REPLY AND
getVersion() == CurrSscpReplyMessageVersion AND
moreObjects(),
"Invalid message from client");
// take a look at the type of the first object in the message
replyType = getNextObjType();
switch(replyType)
{
case RTS_MSG_STATS_REPLY:
actOnStatsReply(connection);
break;
case CANCEL_QUERY_KILL_SERVERS_REPLY:
{
CancelQueryKillServersReply *reply = new (getHeap())
CancelQueryKillServersReply(INVALID_RTS_HANDLE, getHeap());
*this >> *reply;
reply->decrRefCount();
break;
}
case IPC_MSG_RMS_REPLY:
{
RmsGenericReply *reply = new (getHeap())
RmsGenericReply(getHeap());
*this >> *reply;
reply->decrRefCount();
break;
}
default:
ex_assert(FALSE,"Invalid reply from client");
}
}
}
void SscpClientMsgStream::actOnReceiveAllComplete()
{
// If we have received responses to all requests we sent out, we mark this stream as having
// completed its work. The IPCEnv will call the destructor at a time when it is safe to do so.
if (! isReplySent())
{
switch (completionProcessing_)
{
case STATS:
{
sendMergedStats();
break;
}
case CB:
{
// Control broker - cancel/suspend/activate. Nothing to do.
break;
}
case SIK:
{
replySik();
break;
}
case LL:
{
replyLL();
break;
}
default:
{
ex_assert(FALSE, "Unknown completionProcessing_ flag.");
break;
}
}
ssmpGlobals_->removePendingSscpMessage(this);
}
addToCompletedList();
}
void SscpClientMsgStream::replySik()
{
RmsGenericReply *reply = new(getHeap())
RmsGenericReply(getHeap());
*ssmpStream_ << *reply;
if (ssmpStream_->getSscpDiagsArea())
{
// Pass errors from communication w/ SSCPs back to the
// client.
*ssmpStream_ << *(ssmpStream_->getSscpDiagsArea());
ssmpStream_->clearSscpDiagsArea();
}
ssmpStream_->send(FALSE);
reply->decrRefCount();
}
void SscpClientMsgStream::replyLL()
{
RmsGenericReply *reply = new(getHeap())
RmsGenericReply(getHeap());
*ssmpStream_ << *reply;
if (ssmpStream_->getSscpDiagsArea())
{
// Pass errors from communication w/ SSCPs back to the
// client.
*ssmpStream_ << *(ssmpStream_->getSscpDiagsArea());
ssmpStream_->clearSscpDiagsArea();
}
ssmpStream_->send(FALSE);
reply->decrRefCount();
}
void SscpClientMsgStream::sendMergedStats()
{
StmtStats *stmtStats;
ExStatisticsArea *mergedStats;
int error;
stmtStats = getStmtStats();
short reqType = getReqType();
if (mergedStats_ == NULL && reqType == SQLCLI_STATS_REQ_QID &&
stmtStats != NULL && stmtStats->getMasterStats() != NULL)
{
StatsGlobals *statsGlobals = ssmpGlobals_->getStatsGlobals();
error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
mergedStats_ = new (ssmpGlobals_->getStatsHeap())
ExStatisticsArea(ssmpGlobals_->getStatsHeap(), 0,
stmtStats->getMasterStats()->getCollectStatsType());
// Can we always assume that the stats is enabled ?????
// The stats should be enabled for the DISPLAY STATISTICS or SELECT ...TABLE(STATSISTICS...)
// to display something
mergedStats_->setStatsEnabled(TRUE);
ExMasterStats *masterStats = new (ssmpGlobals_->getStatsHeap())
ExMasterStats(ssmpGlobals_->getStatsHeap());
masterStats->copyContents(stmtStats->getMasterStats());
mergedStats_->setMasterStats(masterStats);
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
}
ExMasterStats *masterStats;
if (mergedStats_ != NULL)
{
masterStats = mergedStats_->getMasterStats();
if (masterStats != NULL)
{
masterStats->setNumCpus(getNumCpus());
}
}
// Get a reference to mergedStats_ before setReplySent() sets it zero
mergedStats = mergedStats_;
setReplySent();
ssmpStream_->sendMergedStats(mergedStats, getNumOfErrorRequests()+getNumOfClientRequestsPending()+
getSsmpGlobals()->getNumDeallocatedServers(), reqType, stmtStats,
(reqType == SQLCLI_STATS_REQ_QID));
}
void SscpClientMsgStream::delinkConnection(IpcConnection *conn)
{
char nodeName[MAX_SEGMENT_NAME_LEN+1];
IpcCpuNum cpu;
numOfErrorRequests_++;
numOfClientRequestsSent_--;
conn->getOtherEnd().getNodeName().getNodeNameAsString(nodeName);
cpu = conn->getOtherEnd().getCpuNum();
#ifdef _DEBUG
cout << "delinking " << nodeName << " in CPU " << cpu << endl;
#endif
ssmpGlobals_->deAllocateServer(nodeName, (short)str_len(nodeName), (short)cpu);
}
void SsmpNewIncomingConnectionStream::sendMergedStats(ExStatisticsArea *mergedStats, short numErrors,
short reqType, StmtStats *stmtStats, NABoolean updateMergeStats)
{
StatsGlobals *statsGlobals;
int error;
RtsStatsReply *reply = new (getHeap())
RtsStatsReply(getHandle(), getHeap());
reply->setNumSscpErrors(numErrors);
setType(IPC_MSG_SSMP_REPLY);
setVersion(CurrSsmpReplyMessageVersion);
*this << *reply;
switch (reqType)
{
case SQLCLI_STATS_REQ_QID:
if (mergedStats != NULL)
*this << *(mergedStats);
send (FALSE);
statsGlobals = ssmpGlobals_->getStatsGlobals();
error = statsGlobals->getStatsSemaphore(ssmpGlobals_->getSemId(),
ssmpGlobals_->myPin());
if (stmtStats != NULL)
{
if (updateMergeStats)
stmtStats->setMergedStats(mergedStats);
stmtStats->setStmtStatsUsed(FALSE);
if (isWmsProcess() && stmtStats->canbeGCed())
statsGlobals->removeQuery(stmtStats->getPid(), stmtStats, TRUE);
}
statsGlobals->releaseStatsSemaphore(ssmpGlobals_->getSemId(), ssmpGlobals_->myPin());
break;
case SQLCLI_STATS_REQ_QID_DETAIL:
if (mergedStats != NULL)
*this << *(mergedStats);
send (FALSE);
if (stmtStats != NULL)
stmtStats->setStmtStatsUsed(FALSE);
break;
case SQLCLI_STATS_REQ_CPU_OFFENDER:
case SQLCLI_STATS_REQ_SE_OFFENDER:
case SQLCLI_STATS_REQ_ET_OFFENDER:
case SQLCLI_STATS_REQ_RMS_INFO:
case SQLCLI_STATS_REQ_MEM_OFFENDER:
case SQLCLI_STATS_REQ_PROCESS_INFO:
if (mergedStats != NULL)
*this << *(mergedStats);
send (FALSE);
break;
case SQLCLI_STATS_REQ_PID:
case SQLCLI_STATS_REQ_CPU:
case SQLCLI_STATS_REQ_QID_INTERNAL:
if (mergedStats != NULL &&
mergedStats->getMasterStats() != NULL)
*this << *(mergedStats);
send (FALSE);
if (stmtStats != NULL)
stmtStats->setStmtStatsUsed(FALSE);
break;
}
reply->decrRefCount();
if (!(updateMergeStats) && mergedStats != NULL)
{
NADELETE(mergedStats, ExStatisticsArea, mergedStats->getHeap());
}
}
void SsmpClientMsgStream::actOnSend(IpcConnection* connection)
{
if (connection->getErrorInfo() != 0)
stats_ = NULL;
}
void SsmpClientMsgStream::actOnSendAllComplete()
{
clearAllObjects();
receive(FALSE);
}
void SsmpClientMsgStream::actOnReceive(IpcConnection* connection)
{
replyRecvd_ = TRUE;
if (connection->getErrorInfo() != 0)
{
stats_ = NULL;
if (diagsForClient_)
connection->populateDiagsArea(diagsForClient_, getHeap());
delinkConnection(connection);
return;
}
// take a look at the type of the first object in the message
switch(getNextObjType())
{
case RTS_MSG_STATS_REPLY:
actOnStatsReply(connection);
break;
case RTS_MSG_EXPLAIN_REPLY:
actOnExplainReply(connection);
break;
case IPC_MSG_RMS_REPLY:
actOnGenericReply();
break;
default:
ex_assert(FALSE,"Invalid reply from first client message");
}
}
void SsmpClientMsgStream::actOnReceiveAllComplete()
{
// We mark this stream as having completed its work. The IPCEnv will
// call the destructor at a time when it is safe to do so.
addToCompletedList();
}
void SsmpClientMsgStream::actOnStatsReply(IpcConnection* connection)
{
IpcMessageObjVersion msgVer;
msgVer = getNextObjVersion();
if (msgVer > currRtsStatsReplyVersionNumber)
// Send Error
;
RtsStatsReply *reply = new (getHeap())
RtsStatsReply(INVALID_RTS_HANDLE, getHeap());
*this >> *reply;
numSscpReqFailed_ = reply->getNumSscpErrors();
if (moreObjects())
{
RtsMessageObjType objType =
(RtsMessageObjType) getNextObjType();
switch (objType)
{
case IPC_SQL_STATS_AREA:
{
stats_ = new (getHeap())
ExStatisticsArea(getHeap());
*this >> *stats_;
}
break;
case RTS_QUERY_ID:
{
rtsQueryId_ = new (getHeap())
RtsQueryId(getHeap());
*this >> *rtsQueryId_;
}
break;
default:
break;
}
}
else
stats_ = NULL;
reply->decrRefCount();
// we don't want to decrement the stats, since we want to pass it on
}
void SsmpClientMsgStream::actOnExplainReply(IpcConnection* connection)
{
IpcMessageObjVersion msgVer;
msgVer = getNextObjVersion();
if (msgVer > currRtsExplainReplyVersionNumber)
// Send Error
;
RtsExplainReply *reply = new (getHeap())
RtsExplainReply(INVALID_RTS_HANDLE, getHeap());
*this >> *reply;
numSscpReqFailed_ = 0;
if (moreObjects())
{
RtsMessageObjType objType =
(RtsMessageObjType) getNextObjType();
switch (objType)
{
case RTS_EXPLAIN_FRAG:
{
explainFrag_ = new (getHeap())
RtsExplainFrag(getHeap());
*this >> *explainFrag_;
}
break;
default:
break;
}
}
else
explainFrag_ = NULL;
reply->decrRefCount();
}
void SsmpClientMsgStream::delinkConnection(IpcConnection *conn)
{
char nodeName[MAX_SEGMENT_NAME_LEN+1];
IpcCpuNum cpu;
conn->getOtherEnd().getNodeName().getNodeNameAsString(nodeName);
cpu = conn->getOtherEnd().getCpuNum();
ssmpManager_->removeSsmpServer(nodeName, (short)cpu);
}
void SsmpClientMsgStream::actOnGenericReply()
{
RmsGenericReply *reply = new (getHeap()) RmsGenericReply(getHeap());
*this >> *reply;
while (moreObjects())
{
IpcMessageObjType objType = getNextObjType();
ex_assert(objType == IPC_SQL_DIAG_AREA,
"Unknown object returned from mxssmp.");
if (diagsForClient_)
*this >> *diagsForClient_;
}
reply->decrRefCount();
}