blob: b53a40ebe8d4e524379fe70bb8f0e3427ffda761 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
#ifndef EXSM_GLOBALS_H
#define EXSM_GLOBALS_H
// The ExSMGLobals class by the SQL main thread and SM reader
// thread. If a function is guaranteed to only be called by the main
// thread, for example the ExSMGlobals constructor, that function can
// use SQL classes such as NAMemory, ComDiagsArea, etc. If a function
// can possibly be called by the reader thread it should hae no
// dependencies on SQL because our SQL code is generally not
// thread-safe.
#include <sys/types.h>
#include <unistd.h>
#include "ExSMCommon.h"
#include "ExSMTaskList.h"
#include "ExSMReadyList.h"
#include "ExSMTrace.h"
class ExSMGlobals;
class ComDiagsArea;
class NAMemory;
class ex_tcb;
class ExExeStmtGlobals;
class SMConnection;
class ExSMGlobals
{
public:
// Thread states used for initialization and exit handling
//
// The normal state progression for the main thread is:
// NOT_STARTED -> STARTED -> DONE
//
// The normal state progression for the reader thread is:
// NOT_STARTED -> STARTED -> PROCESSING_SHUTDOWN -> DONE
//
// Or if the reader thread encounters a NO SERVICE error from SM:
// NOT_STARTED -> STARTED -> TERMINATED_DUE_TO_ERROR
//
enum ThreadState
{
// Main thread and reader thread
NOT_STARTED = 0,
STARTED,
DONE,
// Reader thread only
PROCESSING_SHUTDOWN,
TERMINATED_DUE_TO_ERROR
};
// Constants used for SM message handling
enum MsgConstants
{
// How long to wait for reader thread to start - in seconds
READER_STARTUP_WAIT = 120,
// How long to wait for reader thread to shutdown - in seconds
READER_SHUTDOWN_WAIT = 120
};
// *** THIS FUNCTION DEPENDS ON SQL CODE AND SHOULD ONLY BE CALLED
// *** FROM THE MAIN SQL THREAD. DO NOT CALL THIS FUNCTION FROM THE
// *** SM READER THREAD.
ExSMGlobals(bool isMasterExecutor);
ExSMGlobals(); // Do not implement
virtual ~ExSMGlobals();
static ExSMGlobals *GetExSMGlobals() { return smGlobals_; }
static bool getSMInitialized() { return smInitialized_; }
const char *getThreadStateString(ThreadState ts) const;
// Static member functions to reserve SM IDs. One ID is reserved
// in each process for executor internal communication. For example,
// the SHUTDOWN message from the main thread to the reader thread in
// the same process.
static int64_t reserveSMID();
static int64_t getExeInternalSMID() { return exeInternalSMID_; }
// *** THIS FUNCTION DEPENDS ON SQL CODE AND SHOULD ONLY BE CALLED
// *** FROM THE MAIN SQL THREAD. DO NOT CALL THIS FUNCTION FROM THE
// *** SM READER THREAD.
static ExSMGlobals *InitSMGlobals(ExExeStmtGlobals *stmtGlob);
// *** THIS FUNCTION DEPENDS ON SQL CODE AND SHOULD ONLY BE CALLED
// *** FROM THE MAIN SQL THREAD. DO NOT CALL THIS FUNCTION FROM THE
// *** SM READER THREAD.
ExSMTask *addTask(const sm_target_t &tgt,
uint32_t queueSize,
int32_t *scheduledAddr,
NAMemory *heap,
ex_tcb *tcb,
SMConnection *smConnection_);
static void addDiags(const char *functionName,
int32_t rc,
ExExeStmtGlobals *stmtGlob);
void addReaderThreadError(ExExeStmtGlobals *stmtGlob);
void removeTask(ExSMTask *t);
bool isMasterExecutor() const { return master_; }
ExSMTaskList *getSMTaskList() { return &smTaskList_; }
ExSMReadyList *getReadyList() { return &readyList_; }
ThreadState getMainThreadState() { return mainThreadState_; }
void setMainThreadState(ThreadState s) { mainThreadState_ = s; }
ThreadState getReaderThreadState() const { return readerThreadState_; }
// Set the reader thread state. Optionally:
// * acquire and release the reader thread state lock
// * signal the reader thread state condition variable
void setReaderThreadState(ThreadState s,
bool doLocking = true,
bool doSignal = false);
pid_t getMainThreadPID() { return mainThreadPID_; }
int32_t getSQNodeNum() { return sqNodeNum_; }
pthread_t getMainThreadTID() { return mainThreadThreadId_; }
pthread_t getReaderThreadTID() { return readerThreadThreadId_; }
pthread_mutex_t *getReaderThreadStateLock()
{ return &readerThreadStateLock_; }
pthread_cond_t *getReaderThreadStateCond()
{ return &readerThreadStateCond_; }
uint32_t getSendRequestCount() const { return sendRequestCount_; }
uint32_t incrSendRequestCount() { return ++sendRequestCount_; }
uint32_t getSendReplyCount() const { return sendReplyCount_; }
uint32_t incrSendReplyCount() { return ++sendReplyCount_; }
uint32_t getSendShortCount() const { return sendShortCount_; }
uint32_t incrSendShortCount() { return ++sendShortCount_; }
uint32_t getRecvRequestCount() const { return recvRequestCount_; }
uint32_t incrRecvRequestCount() { return ++recvRequestCount_; }
uint32_t getRecvReplyCount() const { return recvReplyCount_; }
uint32_t incrRecvReplyCount() { return ++recvReplyCount_; }
uint32_t getRecvShortCount() const { return recvShortCount_; }
uint32_t incrRecvShortCount() { return ++recvShortCount_; }
bool getTraceEnabled() const { return traceEnabled_; }
void setTraceEnabled(bool b);
void setTraceLevel(uint32_t lvl) { traceLevel_ = lvl; }
uint32_t getTraceLevel() const { return traceLevel_; }
const char * getTraceFilePrefix() const { return traceFilePrefix_; }
void setTraceFilePrefix(const char *pref) { traceFilePrefix_ = pref; }
const char *getSessionID() const { return sessionID_; }
NAMemory *getThreadSafeHeap() { return threadSafeHeap_; }
// Counter for SM fixup replies. See comments below with the data
// member.
static uint32_t getFixupReplyCount() { return fixupReplyCount_; }
static void incrFixupReplyCount() { fixupReplyCount_++; }
static void initFixupReplyCount() { fixupReplyCount_ = 0; }
const Int32 getReaderThreadSmErrorNumber() { return readerThreadSmErrorNumber_; }
void setReaderThreadSmErrorNumber(Int32 error) { readerThreadSmErrorNumber_ = error; }
const char * getReaderThreadSmErrorFunction() { return readerThreadSmErrorFunction_; }
void setReaderThreadSmErrorFunction(const char* function)
{ strcpy(readerThreadSmErrorFunction_, function); }
void handleReaderThreadError(int32_t rc,
const char *function,
sm_handle_t dataHandle,
ExSMTaskList *smTaskList = NULL);
protected:
// Create a session ID string that becomes part of the output file
// name when tracing is enabled
static const char *createSessionID(ExExeStmtGlobals *stmtGlob);
static bool smInitialized_;
static ExSMGlobals *smGlobals_;
bool master_;
bool traceEnabled_;
uint32_t traceLevel_;
const char *traceFilePrefix_;
const char *sessionID_;
ExSMTaskList smTaskList_;
ExSMReadyList readyList_;
pid_t mainThreadPID_;
int32_t sqNodeNum_;
uint32_t sendRequestCount_;
uint32_t sendReplyCount_;
uint32_t sendShortCount_;
uint32_t recvRequestCount_;
uint32_t recvReplyCount_;
uint32_t recvShortCount_;
// A global increasing counter is used to store the next available
// SeaMonster ID. In the master executor every new query is given a new
// ID.
//
// We reserve one ID for communication from the main thread to
// the reader thread in the same process. The main thread calls SM
// to initialize this ID right after a successful call to
// SM_init. The reader thread cancels this ID after receiving a
// SHUTDOWN message from the main thread.
//
// The ID for executor internal communication will be 1
//
// The IDs assigned to queries will start at 2 and go up
static int64_t nextAvailableSMID_;
static int64_t exeInternalSMID_;
// Main thread: thread ID and state
pthread_t mainThreadThreadId_;
ThreadState mainThreadState_;
// Reader thread: thread ID and state
pthread_t readerThreadThreadId_;
ThreadState readerThreadState_;
Int32 readerThreadSmErrorNumber_;
char readerThreadSmErrorFunction_[32];
// A lock and condition variable for accessing reader thread state
pthread_mutex_t readerThreadStateLock_;
pthread_cond_t readerThreadStateCond_;
// The thread-safe heap gives the reader thread the ability to
// allocate IPC message buffers. Sometimes those buffers are
// deallocated by the main thread and access to the heap will be
// serialized by a mutex.
NAMemory *threadSafeHeap_;
// During download of a new query to ESPs if the query uses SM, we
// expect ESPs to reply two ways. They reply on the Seabed control
// connection and with a short message over SM. This global
// variable counts the SM replies.
//
// Note: we are assuming that in the master only one download is in
// progress at any given time.
static uint32_t fixupReplyCount_;
}; // class ExSMGlobals
#endif // EXSM_GLOBALS_H