blob: cfbe07b5fb884567bd03d4f4ccc46e4513f6de77 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ex_ssmp_main.cpp
* Description: This is the main program for SSMP. SQL stats merge process.
*
* Created: 05/08/2006
* Language: C++
*
*****************************************************************************
*/
#include "Platform.h"
#ifdef _DEBUG
#include <fstream>
#include <iostream>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#endif
#include <errno.h>
#include "ExCextdecs.h"
#include "ex_ex.h"
#include "Ipc.h"
#include "Globals.h"
#include "SqlStats.h"
#include "memorymonitor.h"
#include "ssmpipc.h"
#include "rts_msg.h"
#include "PortProcessCalls.h"
#include "seabed/ms.h"
#include "seabed/fs.h"
extern void my_mpi_fclose();
#include "SCMVersHelp.h"
DEFINE_DOVERS(mxssmp)
void runServer(Int32 argc, char **argv);
void processAccumulatedStatsReq(SsmpNewIncomingConnectionStream *ssmpMsgStream, SsmpGlobals *ssmpGlobals);
Int32 main(Int32 argc, char **argv)
{
dovers(argc, argv);
msg_debug_hook("mxssmp", "mxssmp.hook");
try {
file_init(&argc, &argv);
file_mon_process_startup(true);
}
catch (SB_Fatal_Excep &e) {
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, e.what(), 0);
exit(1);
}
atexit(my_mpi_fclose);
// setup log4cxx, need to be done here so initLog4cxx can have access to
// process information since it is needed to compose the log name
// the log4cxx log name for this ssmp process will be
// based on this process' node number as suffix sscp_<nid>.log
QRLogger::instance().setModule(QRLogger::QRL_SSMP);
QRLogger::instance().initLog4cxx("log4cxx.trafodion.ssmp.config");
// Synchronize C and C++ output streams
ios::sync_with_stdio();
#ifdef _DEBUG
// Redirect stdout and stderr to files named in environment
// variables
const char *stdOutFile = getenv("SQ_SSCP_STDOUT");
const char *stdErrFile = getenv("SQ_SSCP_STDERR");
Int32 fdOut = -1;
Int32 fdErr = -1;
if (stdOutFile && stdOutFile[0])
{
fdOut = open(stdOutFile,
O_WRONLY | O_APPEND | O_CREAT | O_SYNC,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (fdOut >= 0)
{
fprintf(stderr, "[Redirecting MXUDR stdout to %s]\n", stdOutFile);
fflush(stderr);
dup2(fdOut, fileno(stdout));
}
else
{
fprintf(stderr, "*** WARNING: could not open %s for redirection: %s.\n",
stdOutFile, strerror(errno));
}
}
if (stdErrFile && stdErrFile[0])
{
fdErr = open(stdErrFile,
O_WRONLY | O_APPEND | O_CREAT | O_SYNC,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (fdErr >= 0)
{
fprintf(stderr, "[Redirecting MXUDR stderr to %s]\n", stdErrFile);
fflush(stderr);
dup2(fdErr, fileno(stderr));
}
else
{
fprintf(stderr, "*** WARNING: could not open %s for redirection: %s.\n",
stdErrFile, strerror(errno));
}
}
runServer(argc, argv);
if (fdOut >= 0)
{
close(fdOut);
}
if (fdErr >= 0)
{
close(fdErr);
}
#else
runServer(argc, argv);
#endif
return 0;
}
void runServer(Int32 argc, char **argv)
{
Int32 shmId;
StatsGlobals *statsGlobals = (StatsGlobals *)shareStatsSegment(shmId);
Int32 r = 0;
while (statsGlobals == NULL && ++r < 10)
{ // try 9 more times if the shared segement is not available
DELAY(100); // delay for 1 sec.
statsGlobals = (StatsGlobals *)shareStatsSegment(shmId);
}
if (statsGlobals == NULL)
{
char tmbuf[64];
time_t now;
struct tm *nowtm;
now = time(NULL);
nowtm = localtime(&now);
strftime(tmbuf, sizeof tmbuf, "%Y-%m-%d %H:%M:%S ", nowtm);
cout << tmbuf << "SSCP didn't create/initialize the RMS shared segment"
<< ", SSMP exited\n";
NAExit(0);
}
CliGlobals *cliGlobals = CliGlobals::createCliGlobals(FALSE);
statsGlobals->setSsmpPid(cliGlobals->myPin());
statsGlobals->setSsmpTimestamp(cliGlobals->myStartTime());
short error = statsGlobals->openStatsSemaphore(
statsGlobals->getSsmpProcSemId());
ex_assert(error == 0, "Error in opening the semaphore");
cliGlobals->setStatsGlobals(statsGlobals);
cliGlobals->setSharedMemId(shmId);
// Handle possibility that the previous instance of MXSSMP died
// while holding the stats semaphore. This code has been covered in
// a manual unit test, but it is not possible to cover this easily in
// an automated test.
if (statsGlobals->getSemPid() != -1)
{
NAProcessHandle prevSsmpPhandle((SB_Phandle_Type *)
statsGlobals->getSsmpProcHandle());
prevSsmpPhandle.decompose();
if (statsGlobals->getSemPid() == prevSsmpPhandle.getPin())
{
NAProcessHandle myPhandle;
myPhandle.getmine();
myPhandle.decompose();
int error =
statsGlobals->releaseAndGetStatsSemaphore(
statsGlobals->getSsmpProcSemId(),
(pid_t) myPhandle.getPin(),
(pid_t) prevSsmpPhandle.getPin());
ex_assert(error == 0,
"releaseAndGetStatsSemaphore() returned error");
statsGlobals->releaseStatsSemaphore(
statsGlobals->getSsmpProcSemId(),
(pid_t) myPhandle.getPin());
}
}
XPROCESSHANDLE_GETMINE_(statsGlobals->getSsmpProcHandle());
NAHeap *ssmpHeap = cliGlobals->getExecutorMemory();
IpcEnvironment *ssmpIpcEnv = new (ssmpHeap) IpcEnvironment(ssmpHeap,
cliGlobals->getEventConsumed(), FALSE, IPC_SQLSSMP_SERVER,
FALSE, TRUE);
SsmpGlobals *ssmpGlobals = new (ssmpHeap) SsmpGlobals(ssmpHeap, ssmpIpcEnv,
statsGlobals);
// Currently open $RECEIVE with 2048
SsmpGuaReceiveControlConnection *cc =
new (ssmpHeap) SsmpGuaReceiveControlConnection(ssmpIpcEnv,
ssmpGlobals,
2048);
ssmpIpcEnv->setControlConnection(cc);
while (TRUE)
{
/*
* Until ssmp starts receiving messages, disable this check.
* We need ssmp to wake up periodically to perform garbage collection.
*
// wait for the first open message to come in
while (cc->getConnection() == NULL)
cc->wait(IpcInfiniteTimeout);
// start the first receive operation
#ifdef _DEBUG_RTS
cerr << "No. of Requesters-1 " << cc->getNumRequestors() << " \n";
#endif
while (cc->getNumRequestors() > 0)
for (;;)
{
ssmpGlobals->work();
}
}
*/
// wait for system messages only until ssmp starts receiving msgs.
cc->wait(300);
// go do GC.
ssmpGlobals->work();
}
}