| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ***************************************************************************** |
| * |
| * File: ex_ssmp_main.cpp |
| * Description: This is the main program for SSMP. SQL stats merge process. |
| * |
| * Created: 05/08/2006 |
| * Language: C++ |
| * |
| ***************************************************************************** |
| */ |
| #include "Platform.h" |
| #ifdef _DEBUG |
| #include <fstream> |
| #include <iostream> |
| #include <fcntl.h> |
| #include <sys/types.h> |
| #include <sys/stat.h> |
| #endif |
| #include <errno.h> |
| #include "ExCextdecs.h" |
| #include "ex_ex.h" |
| #include "Ipc.h" |
| #include "Globals.h" |
| #include "SqlStats.h" |
| #include "memorymonitor.h" |
| #include "ssmpipc.h" |
| #include "rts_msg.h" |
| #include "PortProcessCalls.h" |
| #include "seabed/ms.h" |
| #include "seabed/fs.h" |
| extern void my_mpi_fclose(); |
| #include "SCMVersHelp.h" |
| DEFINE_DOVERS(mxssmp) |
| |
| void runServer(Int32 argc, char **argv); |
| |
| void processAccumulatedStatsReq(SsmpNewIncomingConnectionStream *ssmpMsgStream, SsmpGlobals *ssmpGlobals); |
| |
| Int32 main(Int32 argc, char **argv) |
| { |
| dovers(argc, argv); |
| msg_debug_hook("mxssmp", "mxssmp.hook"); |
| |
| try { |
| file_init(&argc, &argv); |
| file_mon_process_startup(true); |
| } |
| catch (SB_Fatal_Excep &e) { |
| SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, e.what(), 0); |
| exit(1); |
| } |
| |
| atexit(my_mpi_fclose); |
| // setup log4cxx, need to be done here so initLog4cxx can have access to |
| // process information since it is needed to compose the log name |
| // the log4cxx log name for this ssmp process will be |
| // based on this process' node number as suffix sscp_<nid>.log |
| QRLogger::instance().setModule(QRLogger::QRL_SSMP); |
| QRLogger::instance().initLog4cxx("log4cxx.trafodion.ssmp.config"); |
| |
| // Synchronize C and C++ output streams |
| ios::sync_with_stdio(); |
| #ifdef _DEBUG |
| // Redirect stdout and stderr to files named in environment |
| // variables |
| const char *stdOutFile = getenv("SQ_SSCP_STDOUT"); |
| const char *stdErrFile = getenv("SQ_SSCP_STDERR"); |
| Int32 fdOut = -1; |
| Int32 fdErr = -1; |
| |
| if (stdOutFile && stdOutFile[0]) |
| { |
| fdOut = open(stdOutFile, |
| O_WRONLY | O_APPEND | O_CREAT | O_SYNC, |
| S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); |
| if (fdOut >= 0) |
| { |
| fprintf(stderr, "[Redirecting MXUDR stdout to %s]\n", stdOutFile); |
| fflush(stderr); |
| dup2(fdOut, fileno(stdout)); |
| } |
| else |
| { |
| fprintf(stderr, "*** WARNING: could not open %s for redirection: %s.\n", |
| stdOutFile, strerror(errno)); |
| } |
| } |
| |
| if (stdErrFile && stdErrFile[0]) |
| { |
| fdErr = open(stdErrFile, |
| O_WRONLY | O_APPEND | O_CREAT | O_SYNC, |
| S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); |
| if (fdErr >= 0) |
| { |
| fprintf(stderr, "[Redirecting MXUDR stderr to %s]\n", stdErrFile); |
| fflush(stderr); |
| dup2(fdErr, fileno(stderr)); |
| } |
| else |
| { |
| fprintf(stderr, "*** WARNING: could not open %s for redirection: %s.\n", |
| stdErrFile, strerror(errno)); |
| } |
| } |
| |
| runServer(argc, argv); |
| |
| if (fdOut >= 0) |
| { |
| close(fdOut); |
| } |
| if (fdErr >= 0) |
| { |
| close(fdErr); |
| } |
| #else |
| runServer(argc, argv); |
| #endif |
| return 0; |
| } |
| |
| void runServer(Int32 argc, char **argv) |
| { |
| Int32 shmId; |
| StatsGlobals *statsGlobals = (StatsGlobals *)shareStatsSegment(shmId); |
| Int32 r = 0; |
| while (statsGlobals == NULL && ++r < 10) |
| { // try 9 more times if the shared segement is not available |
| DELAY(100); // delay for 1 sec. |
| statsGlobals = (StatsGlobals *)shareStatsSegment(shmId); |
| } |
| if (statsGlobals == NULL) |
| { |
| char tmbuf[64]; |
| time_t now; |
| struct tm *nowtm; |
| now = time(NULL); |
| nowtm = localtime(&now); |
| strftime(tmbuf, sizeof tmbuf, "%Y-%m-%d %H:%M:%S ", nowtm); |
| |
| cout << tmbuf << "SSCP didn't create/initialize the RMS shared segment" |
| << ", SSMP exited\n"; |
| NAExit(0); |
| } |
| |
| CliGlobals *cliGlobals = CliGlobals::createCliGlobals(FALSE); |
| statsGlobals->setSsmpPid(cliGlobals->myPin()); |
| statsGlobals->setSsmpTimestamp(cliGlobals->myStartTime()); |
| short error = statsGlobals->openStatsSemaphore( |
| statsGlobals->getSsmpProcSemId()); |
| ex_assert(error == 0, "Error in opening the semaphore"); |
| |
| cliGlobals->setStatsGlobals(statsGlobals); |
| cliGlobals->setSharedMemId(shmId); |
| |
| // Handle possibility that the previous instance of MXSSMP died |
| // while holding the stats semaphore. This code has been covered in |
| // a manual unit test, but it is not possible to cover this easily in |
| // an automated test. |
| if (statsGlobals->getSemPid() != -1) |
| { |
| NAProcessHandle prevSsmpPhandle((SB_Phandle_Type *) |
| statsGlobals->getSsmpProcHandle()); |
| prevSsmpPhandle.decompose(); |
| if (statsGlobals->getSemPid() == prevSsmpPhandle.getPin()) |
| { |
| NAProcessHandle myPhandle; |
| myPhandle.getmine(); |
| myPhandle.decompose(); |
| int error = |
| statsGlobals->releaseAndGetStatsSemaphore( |
| statsGlobals->getSsmpProcSemId(), |
| (pid_t) myPhandle.getPin(), |
| (pid_t) prevSsmpPhandle.getPin()); |
| ex_assert(error == 0, |
| "releaseAndGetStatsSemaphore() returned error"); |
| |
| statsGlobals->releaseStatsSemaphore( |
| statsGlobals->getSsmpProcSemId(), |
| (pid_t) myPhandle.getPin()); |
| } |
| } |
| |
| XPROCESSHANDLE_GETMINE_(statsGlobals->getSsmpProcHandle()); |
| |
| NAHeap *ssmpHeap = cliGlobals->getExecutorMemory(); |
| |
| IpcEnvironment *ssmpIpcEnv = new (ssmpHeap) IpcEnvironment(ssmpHeap, |
| cliGlobals->getEventConsumed(), FALSE, IPC_SQLSSMP_SERVER, |
| FALSE, TRUE); |
| |
| SsmpGlobals *ssmpGlobals = new (ssmpHeap) SsmpGlobals(ssmpHeap, ssmpIpcEnv, |
| statsGlobals); |
| |
| // Currently open $RECEIVE with 2048 |
| SsmpGuaReceiveControlConnection *cc = |
| new (ssmpHeap) SsmpGuaReceiveControlConnection(ssmpIpcEnv, |
| ssmpGlobals, |
| 2048); |
| |
| ssmpIpcEnv->setControlConnection(cc); |
| |
| while (TRUE) |
| { |
| |
| /* |
| * Until ssmp starts receiving messages, disable this check. |
| * We need ssmp to wake up periodically to perform garbage collection. |
| * |
| // wait for the first open message to come in |
| while (cc->getConnection() == NULL) |
| cc->wait(IpcInfiniteTimeout); |
| |
| // start the first receive operation |
| #ifdef _DEBUG_RTS |
| cerr << "No. of Requesters-1 " << cc->getNumRequestors() << " \n"; |
| #endif |
| while (cc->getNumRequestors() > 0) |
| for (;;) |
| { |
| ssmpGlobals->work(); |
| } |
| } |
| */ |
| // wait for system messages only until ssmp starts receiving msgs. |
| cc->wait(300); |
| // go do GC. |
| ssmpGlobals->work(); |
| } |
| } |
| |