blob: 0413d53714e513249d7ba6fc81b47525dd6a715a [file] [log] [blame]
// **********************************************************************
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// @@@ END COPYRIGHT @@@
// **********************************************************************
// Include guard for this header.
// NOTE(review): the guard macro is named _QMS_H_ although the classes declared
// below are the Qmm ones. If another header uses the same macro, whichever of
// the two is included second would be skipped entirely -- confirm and rename if
// so. Names starting with an underscore followed by an uppercase letter are
// also reserved for the implementation.
#ifndef _QMS_H_
#define _QMS_H_
#include "QRIpc.h"
#include "QmsRequest.h"
#include "QRLogger.h"
#include "Collections.h"
#ifndef NA_LINUX
#include <zsysc.h>
#include "seabed/fs.h"
#include "seabed/ms.h"
#include "seabed/int/opts.h"
#include <sys/time.h>
#include "nsk/nskprocess.h"
extern "C" {
#include "cextdecs/cextdecs.h"
#include "zsysc.h"
// Classes defined in this file.
// Forward declarations. The classes below refer to one another by pointer
// before their full definitions appear (for example,
// QmmGuaReceiveControlConnection stores a Qmm*, and Qmm stores QmsStub** and
// QmpStub*).
class QmmException;
class QmmGuaReceiveControlConnection;
class Qmm;
class QRProcessStub;
class QmsStub;
class QmpStub;
class QmmMessageStream;
// This is an extension of the QR namespace.
// Adds to the QR namespace the sizing constants and command-line option enums
// used by the QMM classes declared in this header.
// NOTE(review): the braces delimiting the namespace and the enum bodies are not
// present in this view of the file, and no ListenOpt enumerators are visible --
// confirm against the complete source.
namespace QR
// Sizing constants. Qmm::getQmsStub() uses CPUS_PER_SEGMENT as the per-segment
// stride when indexing the QMS pool.
const short MAX_SEGMENTS = 32;
const short CPUS_PER_SEGMENT = 16;
const short SEGMENT_NAME_LEN = 8; // leading \, 5 for sys name, 2 for seg#
// Command-line option determines how we start QMP.
enum StartOpt
SPAWN, // use PROCESS_SPAWN_ directly
SERVER, // use allocateServerProcess()
NONE // don't start QMP; will be done separately
// Command-line option determines how we listen for incoming messages.
enum ListenOpt
// NOTE(review): this using-directive sits at header scope, so it takes effect
// in every translation unit that includes this header.
using namespace QR;
* Exception thrown for an error in QMM processing.
// Exception type for QMM errors; derives from QRException. The constructor
// takes a printf-style message template followed by a variable argument list,
// default-constructs the QRException base, and passes the template together
// with msgBuffer_ to qrBuildMessage().
// NOTE(review): qrBuildMessage and msgBuffer_ are declared outside this file --
// presumably qrBuildMessage formats the variable arguments into msgBuffer_;
// confirm in the QRException header.
class QmmException : public QRException
* Creates an exception with text consisting of the passed template filled in
* with the values of the other arguments.
* @param[in] msgTemplate Template for construction of the full message;
* contains printf-style placeholders for arguments,
* passed as part of a variable argument list.
* @param[in] ... Variable argument list, consisting of a value for each
* placeholder in the message template.
QmmException(const char *msgTemplate ...)
: QRException()
qrBuildMessage(msgTemplate, msgBuffer_);
virtual ~QmmException()
}; //QmmException
// Control connection used by QMM on the receiving side. Extends
// GuaReceiveControlConnection and carries a Qmm pointer (qmm_).
// NOTE(review): the constructor's initializer list ends with a trailing comma
// in this view, so the initialization of qmm_ from the qmm argument is
// presumably on a line that is not visible here -- confirm.
class QmmGuaReceiveControlConnection : public GuaReceiveControlConnection
// env is forwarded to the base class constructor.
QmmGuaReceiveControlConnection(IpcEnvironment* env, Qmm* qmm)
: GuaReceiveControlConnection(env),
virtual ~QmmGuaReceiveControlConnection()
// Handler for system messages; only declared here, defined elsewhere.
// NOTE(review): Qmm::handleClientExit() takes a process handle and a message
// number, so this is presumably where client-exit messages get routed to
// qmm_ -- confirm in the implementation file.
virtual void actOnSystemMessage(short messageNum,
IpcMessageBufferPtr sysMsg,
IpcMessageObjSize sysMsgLen,
short clientFileNumber,
const GuaProcessHandle& clientPhandle,
GuaConnectionToClient* connection);
// Qmm object associated with this connection.
Qmm* qmm_;
}; // QmmGuaReceiveControlConnection
// Central object of the QMM process. A single instance is obtained through
// getInstance(). It holds the pool of QMS stubs, the QMP stub, the IPC
// environment, the QMS server class, and a list of pending publish messages.
// NOTE(review): access specifiers and braces are not visible in this view of
// the file; the constructor is presumably non-public given the getInstance()
// accessor -- confirm.
class Qmm : public NABasicObject
// Returns the single Qmm instance, creating it on the first call. The heap
// argument is only used by that first call; once instance_ is set it is
// ignored.
// NOTE(review): no locking is visible around the lazy creation.
static Qmm* getInstance(CollHeap* heap = NULL)
if (!instance_)
instance_ = new Qmm(heap);
return instance_;
// Read-only access to the IPC environment pointer held by this object.
const IpcEnvironment* getEnvironment() const
return ipcEnv_;
void allocateQmsPool();
// Defaults are 3 retries and a delay of 20 (seconds, per the parameter name;
// confirm how the definition uses them).
void checkAndRetryQms(Int16 maxRetries = 3, Int16 delaySeconds = 20);
void allocateQms(); // for Windows testing
void startQmp(short cpu);
void startQms();
void executeMessageLoop();
//QRRequestResult handlePublishRequest(QRMessageRequest& request);
// Request handlers taking the message stream the request arrived on.
QRRequestResult handlePublishRequest(QRMessageStream* msgStream);
QRRequestResult handleAllocateRequest(QRMessageStream* msgStream);
* Processes a request originating from the message interface. Currently, the
* only supported request for QMM is a Publish message from QMP. A message
* object for the response is created and returned as the function value.
* @param request The message-based request.
* @return Message object to be returned as the response to this request.
//QRMessageObj* processRequestMessage(QRMessageRequest& request);
// NOTE(review): the description above documents a `request` parameter, which
// belongs to the commented-out overload; the live declaration takes msgStream.
QRMessageObj* processRequestMessage(QRMessageStream* msgStream);
void handleClientExit(const short* phandle, short messageNum);
// Simple setters for the two command-line driven options.
void setListenOpt(ListenOpt opt)
listenOpt_ = opt;
void setQmpStartOpt(StartOpt startOpt)
qmpStartOpt_ = startOpt;
// Looks up a stub in the flat qmsPool_ array. segNum is treated as 1-based
// (segNum-1 selects the row of CPUS_PER_SEGMENT entries); cpuNum is used as is.
// NOTE(review): neither argument is range-checked here, so an out-of-range
// value indexes outside the pool.
QmsStub* getQmsStub(Int32 segNum, short cpuNum)
return qmsPool_[(segNum-1) * CPUS_PER_SEGMENT + cpuNum];
void relayPendingPubsToQms();
Qmm(CollHeap* heap);
// Only qmsServerClass_ is deleted here; the existing @ZX note records that
// freeServerProcess still needs to be called for qmp and the qms's.
virtual ~Qmm() //@ZX -- call freeServerProcess for qmp, qms's
delete qmsServerClass_;
//delete qmsMsgStream_;
IpcTimeout getWaitTimeout();
// Singleton pointer returned by getInstance().
static Qmm* instance_;
// Array of QmsStub pointers indexed by getQmsStub().
QmsStub** qmsPool_;
short qmsCount_;
QmpStub* qmp_;
IpcEnvironment* ipcEnv_;
IpcServerClass* qmsServerClass_;
//QRMessageStream* qmsMsgStream_;
// Publish messages held for later; presumably drained by
// relayPendingPubsToQms() -- confirm in the implementation file.
NAList<QRXmlMessageObj*> pendingPubs_;
ListenOpt listenOpt_;
StartOpt qmpStartOpt_;
CollHeap* heap_;
//class HeadQmm : public Qmm
// Abstract base class for the process stubs declared below (QmsStub and
// QmpStub derive from it). It stores a process handle, a lockout end timestamp
// and a retry counter, and keeps a static list of stubs to restart. start() is
// pure virtual.
// NOTE(review): the process handle has two representations selected by
// NA_LINUX (SB_Phandle_Type versus an array of 10 shorts). The #else/#endif
// lines of those conditionals are not visible in this view of the file.
class QRProcessStub : public NABasicObject
static void checkRestarts();
// NOTE(review): returns the list by value, i.e. a const copy of restartList_,
// not a reference to it.
static const NAList<QRProcessStub*> getRestartList()
return restartList_;
QRProcessStub(CollHeap* heap);
virtual ~QRProcessStub()
#ifdef NA_LINUX
// NA_LINUX variant: handles are passed by value and compared/copied bytewise
// over sizeof(SB_Phandle_Type).
Int32 operator==(SB_Phandle_Type ph) const
return !memcmp((char*)&processHandle_, (char*)&ph, sizeof(SB_Phandle_Type));
SB_Phandle_Type getProcessHandle() const
return processHandle_;
void setProcessHandle(SB_Phandle_Type ph)
memcpy(&processHandle_, &ph, sizeof(SB_Phandle_Type));
// Other variant: the handle is an array of 10 shorts, and the literal 20 in the
// memcmp/memcpy calls below is its size in bytes when short is 2 bytes.
//int operator==(const zsys_ddl_phandle_def& ph) const
Int32 operator==(const short* ph) const
// @ZXros -- following won't compile now on Windows. Used in Qmm::handleClientExit() to see who died.
#ifdef NA_NSK
//return !strncmp(processHandle_.u_z_data.z_byte, ph.u_z_data.z_byte, 20);
return !memcmp((char*)processHandle_, (char*)ph, 20);
// NOTE(review): presumably the non-NA_NSK branch, in which case the comparison
// always reports "not equal" there -- confirm against the full file.
return 0;
const short* getProcessHandle() const
return &processHandle_[0];
//void setProcessHandle(const zsys_ddl_phandle_def& ph)
void setProcessHandle(const short* ph)
//processHandle_ = ph;
memcpy((char*)processHandle_, (char*)ph, 20);
//sizeof(zsys_ddl_phandle_def));// zsys_ddl_phandle_def_Size);
Int64 getLockoutEndTS() const
return lockoutEndTS_;
* Nulls out the process handle. A null process handle consists of all -1s.
void nullProcessHandle();
void scheduleRestart();
// Implemented by each concrete stub to start its process.
virtual NABoolean start() = 0;
// Shared list returned (by copy) from getRestartList().
static NAList<QRProcessStub*> restartList_;
void setLockout();
//zsys_ddl_phandle_def processHandle_;
#ifdef NA_LINUX
SB_Phandle_Type processHandle_;
short processHandle_[10];
// Timestamp returned by getLockoutEndTS(); presumably set by setLockout().
Int64 lockoutEndTS_;
Lng32 retryNumber_;
CollHeap* heap_;
// Stub for one QMS process, identified by segment number, segment name and cpu
// number. It tracks a Status value, the IpcServer for the process, its process
// name and a message stream, and shares one static IpcServerClass across all
// QmsStub objects.
// NOTE(review): the enumerators of Status, the access specifiers and the
// braces are not visible in this view of the file.
class QmsStub : public QRProcessStub
// Qmm class will call this from its ctor.
static void setQmsServerClass(IpcServerClass* serverClass)
qmsServerClass_ = serverClass;
enum Status
QmsStub(short segmentNumber, char* segmentName, short cpuNumber,
short segmentStatus, NABoolean cpuExists,
NABoolean unreachableCpu,
IpcEnvironment* ipcEnv, //QRMessageStream* qmsMsgStream,
CollHeap* heap);
// Releases the process name and the message stream.
// NOTE(review): qmsProcessName_ is a char* released with scalar delete; if it
// is allocated with new[] this needs to be delete[] -- confirm in the
// implementation file.
virtual ~QmsStub()
delete qmsProcessName_;
delete qmsMsgStream_;
// Trivial accessors and mutator.
Status getStatus() const
return status_;
short getCpuNumber() const
return cpuNumber_;
short getSegmentNumber() const
return segmentNumber_;
void setStatus(Status newStatus)
status_ = newStatus;
// Implements QRProcessStub::start() for QMS.
virtual NABoolean start();
virtual void disable(Status reason);
void publish(QRXmlMessageObj* xmlMsgObj);
// Copy constructor and assignment are declared without a body here --
// presumably private and left undefined to forbid copying; confirm.
QmsStub(const QmsStub&);
Int32 operator=(const QmsStub&);
// Set through setQmsServerClass().
static IpcServerClass* qmsServerClass_;
short segmentNumber_;
// One extra char beyond SEGMENT_NAME_LEN, presumably for a terminating NUL.
char segmentName_[SEGMENT_NAME_LEN + 1];
short cpuNumber_;
short segmentStatus_;
char* qmsProcessName_;
IpcServer* qmsServer_;
Status status_;
QRMessageStream* qmsMsgStream_;
// Stub for the QMP process. It keeps a reference to the IpcEnvironment, the
// StartOpt that selects how QMP is started, the cpu to start it on, the
// IpcServer for the process, and a debug flag.
// NOTE(review): the constructor's initializer list is cut off after the base
// class in this view of the file. ipcEnv_ is a reference member, so it has to
// be bound in that list.
class QmpStub : public QRProcessStub
QmpStub(IpcEnvironment& ipcEnv, StartOpt qmpStartOpt,
short cpu, CollHeap* heap)
: QRProcessStub(heap),
virtual ~QmpStub()
{ //@ZX
// Implements QRProcessStub::start() for QMP.
virtual NABoolean start();
// Two ways of starting QMP; presumably these correspond to the SPAWN and
// SERVER values of StartOpt -- confirm in the implementation file.
void spawnProcess(IpcEnvironment& ipcEnv, short cpu);
void allocateProcess(IpcEnvironment& ipcEnv, short cpu);
#ifdef NA_WINNT
void launchNSKLiteProcess(IpcEnvironment& ipcEnv, short p_pe);
// Copy constructor and assignment are declared without a body here --
// presumably private and left undefined to forbid copying; confirm.
QmpStub(const QmpStub&);
Int32 operator=(const QmpStub&);
static IpcServerClass* qmpServerClass_;
IpcServer* qmpServer_;
IpcEnvironment& ipcEnv_;
StartOpt qmpStartOpt_;
short cpu_;
NABoolean debugQmp_;
// Message stream used by QMM. Extends QRMessageStream, forwarding env, thisEnd,
// heap and msgType to the base constructor, and additionally carries a Qmm
// pointer (qmm_).
// NOTE(review): the parameter description below lists a `logger` argument that
// the constructor does not take, has no entry for the `qmm` argument, and the
// `thisEnd` entry is missing its closing parenthesis. The initializer list also
// ends with a trailing comma in this view, so the initialization of qmm_ is
// presumably on a line that is not visible here.
class QmmMessageStream : public QRMessageStream
* Creates a message stream used to convey Query Rewrite message objects.
* @param *env The IPC environment containing the stream.
* @param logger Logger to use.
* @param thisEnd Name of the program unit defining the stream (used only
* for logging.
* @param heap Heap used for dynamic allocation.
* @param msgType Type of messages carried by the stream.
QmmMessageStream(IpcEnvironment *env,
const NAString& thisEnd,
Qmm* qmm,
NAMemory* heap = NULL,
IpcMessageType msgType = UNSPECIFIED_QR_MESSAGE)
: QRMessageStream(env, thisEnd, heap, msgType),
* Callback function invoked after a message is sent through the stream.
* @param connection The IpcConnection through which the message has been
* sent.
//virtual void actOnSend(IpcConnection* connection);
* Callback function invoked after a message is received through the stream.
* @param connection The IpcConnection through which the message has been
* received.
// Receive callback; only declared here, defined elsewhere.
virtual void actOnReceive(IpcConnection* connection);
// NOTE(review): the body of this override is not visible in this view.
virtual void actOnSendAllComplete()
// Qmm object associated with this stream.
Qmm* qmm_;
}; // QmmMessageStream
#endif /* _QMS_H_ */