blob: 7e1a4b3cab579e9bbff985100dd3b6017aafd840 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ExUdrServer.cpp
* Description: Client-side process management for UDR servers
*
* Created: 08/16/2000
* Language: C++
*
*
*****************************************************************************
*/
#include "ex_stdh.h"
#include "ExUdrServer.h"
#include "ExUdrClientIpc.h"
#include "ExpError.h"
#include "ExCextdecs.h"
#include "ComRtUtils.h"
#include "PortProcessCalls.h"
#include "seabed/fs.h"
#include "seabed/ms.h"
#ifdef UDR_DEBUG
extern const char *GetWorkRetcodeString(ExWorkProcRetcode r);
/*
static const char *GetStatusString(ExUdrServer::ExUdrServerStatus s)
{
switch (s)
{
case ExUdrServer::EX_UDR_SUCCESS:
return "Success";
break;
case ExUdrServer::EX_UDR_WARNING:
return "Warning";
break;
case ExUdrServer::EX_UDR_ERROR:
return "Error";
break;
}
return "***UNKNOWN***";
} */
#define UdrDebug0(s) \
( UdrPrintf(traceFile_,(s)) )
#define UdrDebug1(s,a1) \
( UdrPrintf(traceFile_,(s),(a1)) )
#define UdrDebug2(s,a1,a2) \
( UdrPrintf(traceFile_,(s),(a1),(a2)) )
#define UdrDebug3(s,a1,a2,a3) \
( UdrPrintf(traceFile_,(s),(a1),(a2),(a3)) )
#define UdrDebug4(s,a1,a2,a3,a4) \
( UdrPrintf(traceFile_,(s),(a1),(a2),(a3),(a4)) )
#define UdrDebug5(s,a1,a2,a3,a4,a5) \
( UdrPrintf(traceFile_,(s),(a1),(a2),(a3),(a4),(a5)) )
#else
//
// Debug macros are no-ops in the release build
//
#define UdrDebug0(s)
#define UdrDebug1(s,a1)
#define UdrDebug2(s,a1,a2)
#define UdrDebug3(s,a1,a2,a3)
#define UdrDebug4(s,a1,a2,a3,a4)
#define UdrDebug5(s,a1,a2,a3,a4,a5)
#endif // UDR_DEBUG
//
// Helper functions allowing the ExUdrServer class and its callers to
// determine if a process ID is NULL and to nullify a process
// ID. Internally in our IpcProcessId objects we consider a valid
// process ID to be anything with a domain other than IPC_DOM_INVALID.
// The default IpcProcessId constructor sets the domain to
// IPC_DOM_INVALID so this constructor can be used to instantiate a
// NULL process ID.
//
NABoolean ProcessIdIsNull(const IpcProcessId &id)
{
NABoolean result = FALSE;
if (id.getDomain() == IPC_DOM_INVALID)
{
result = TRUE;
}
return result;
}
void InvalidateProcessId(IpcProcessId &id)
{
//
// The default IpcProcessId constructor is used to instantiate
// a process ID with domain IPC_DOM_INVALID.
//
IpcProcessId nullPid;
//
// Invoke the IpcProcessId assignment operator to invalidate
// id. Arguments to this operator are passed by reference, not on
// the stack.
//
id = nullPid;
}
// -----------------------------------------------------------------------
// ExUdrServer
// -----------------------------------------------------------------------
ExUdrServer::ExUdrServer(IpcEnvironment *env,
const Int32 &userId,
const char *options,
const char *optionDelimiters,
const char *userName,
const char *userPassword,
IpcServerClass *serverClass)
: state_(EX_UDR_NOT_STARTED),
ipcEnvironment_(env),
udrServerClass_(serverClass),
ipcServer_(NULL),
serverProcessId_(),
startAttempts_(0),
userId_(userId),
userName_(NULL),
userPassword_(NULL),
options_(NULL),
optionDelimiters_(NULL),
refCount_(0),
dedicated_(FALSE),
inUse_(FALSE),
inUseConns_(NULL),
freeConns_(NULL)
#ifdef UDR_DEBUG
, traceFile_(NULL)
#endif
{
ex_assert(options && optionDelimiters,
"No runtime options specified for UDR server startup");
CollHeap *h = myIpcHeap();
Int32 len = str_len(options);
options_ = new (h) char[len + 1];
str_cpy_all(options_, options, len + 1);
len = str_len(optionDelimiters);
optionDelimiters_ = new (h) char[len + 1];
str_cpy_all(optionDelimiters_, optionDelimiters, len + 1);
if (userName)
{
len = str_len(userName);
userName_ = new (h) char[len + 1];
str_cpy_all(userName_, userName, len + 1);
}
if (userPassword)
{
len = str_len(userPassword);
userPassword_ = new (h) char[len + 1];
str_cpy_all(userPassword_, userPassword, len + 1);
}
inUseConns_ = new (h) NAList<IpcConnection *>(h);
freeConns_ = new (h) NAList<IpcConnection *>(h);
}
ExUdrServer::~ExUdrServer()
{
UdrDebug1("[BEGIN ExUdrServer destructor] %p", this);
stop();
CollHeap *h = myIpcHeap();
NADELETEBASIC(options_, h);
NADELETEBASIC(optionDelimiters_, h);
NADELETEBASIC(userName_, h);
NADELETEBASIC(userPassword_, h);
UdrDebug1("[END ExUdrServer destructor] %p", this);
}
CollHeap *ExUdrServer::myIpcHeap() const
{
return myIpcEnv()->getHeap();
}
//
// Bring a UDR Server process to life if one hasn't been
// started already
//
ExUdrServer::ExUdrServerStatus ExUdrServer::start(ComDiagsArea **diags,
CollHeap *diagsHeap,
Int64 transId,
IpcProcessId &newId,
NABoolean usesTransactions)
{
#ifdef UDR_DEBUG
UdrDebug1("[BEGIN ExUdrServer::start()] %p", this);
UdrDebug1(" Startup options '%s'", options_);
UdrDebug1(" Startup option delimiters '%s'", optionDelimiters_);
if (diags && *diags)
{
Lng32 numDiags = (*diags)->getNumber();
UdrDebug1(" The diagnostics area initially contains %d entries",
numDiags);
}
else
{
UdrDebug0(" No diagnostics area exists yet");
}
#endif // UDR_DEBUG
// The newId and result variables hold our return values. We will
// assume failure initially and set the values to something else
// once we are sure we have succeeded.
InvalidateProcessId(newId);
ExUdrServerStatus result = EX_UDR_ERROR;
if (ready())
{
#ifdef UDR_DEBUG
char buf[300];
serverProcessId_.toAscii(buf, 300);
UdrDebug1(" A server is already running. Process ID %s", buf);
#endif // UDR_DEBUG
newId = serverProcessId_;
result = EX_UDR_SUCCESS;
}
else
{
stop();
UdrDebug0(" About to start the UDR server...");
// Notes on UDR Server startup
// - We are using a nowait depth of 2 for first connection. SPJs
// that cannot return RS use first connection for IPC.
// We use nowait depth of 3 for all other connections used by SPJs
// that can return RS. Look at getAnIpcConnection() for code details.
//
// - By specifying IPC_CPU_DONT_CARE as the CPU number in this
// call to allocateServerProcess() we get the default behavior
// from Guardian which is to start the process on the same CPU as
// the caller.
Lng32 nowaitDepth = 2;
#ifdef _DEBUG
char *e = getenv("UDR_NOWAIT_DEPTH");
if (e && e[0])
nowaitDepth = atol(e);
#endif
UdrDebug1(" Using a nowait depth of %d", nowaitDepth);
NABoolean waitedCreation = TRUE;
// co-locate the tdm_udrserv with the executor process (master or ESP)
// This is done for a couple of reasons: One is that since the ESPs
// are evenly balanced across the CPUs, this ensures an even distribution
// of the tdm_udrservs as well, probably better than a random distribution.
// The second reason is that for certain maintenance UDFs (only example
// so far is udf(event_log_reader())), we must ensure that we run one
// tdm_udrserv on each node of the cluster, and we do that by starting
// one ESP per node.
IpcCpuNum collocatedCPU =
myIpcEnv()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE).getCpuNum();
ipcServer_ =
udrServerClass_->allocateServerProcess(diags,
diagsHeap,
NULL,
collocatedCPU,
1, // espLevel (not relevant for UDR servers)
usesTransactions,
waitedCreation,
nowaitDepth);
#ifdef UDR_DEBUG
UdrDebug1(" allocateServerProcess() returned %p", ipcServer_);
if (diags && *diags)
{
Lng32 numDiags = (*diags)->getNumber();
UdrDebug1(" The diagnostics area contains %d entries",
numDiags);
}
else
{
UdrDebug0(" No diagnostics area exists");
}
#endif // UDR_DEBUG
startAttempts_++;
if (diags && *diags)
{
Lng32 sqlcode = ((*diags)->mainSQLCODE());
if (sqlcode < 0)
{
//
// An error occurrred
//
// $$$$
// Need to verify whether we can release the server class
// instance Looks like allocateServerProcess() can return
// non-NULL and also generate diagnostics. If that happens and
// we call ipcServer_->release() an assertion fails because
// ipcServer_ does not yet have a valid process handle. The
// assertion is in IpcProcessId::getPhandle().
//
UdrDebug0(" ***");
UdrDebug1(" *** WARNING: Errors occurred. Main SQLCODE is %d",
sqlcode);
UdrDebug0(" ***");
ipcServer_ = NULL;
}
}
if (ipcServer_ && ipcServer_->getControlConnection())
{
// We enter this block once a server process has successfully
// been started.
setState(EX_UDR_READY);
// Record the process ID
serverProcessId_ = ipcServer_->getControlConnection()->getOtherEnd();
#ifdef UDR_DEBUG
char buf[300];
serverProcessId_.toAscii(buf, 300);
UdrDebug1(" A new server was started. Process ID %s", buf);
#endif // UDR_DEBUG
// Set a flag in the control connection indicating whether or
// not to perform integrity checks on incoming buffers.
NABoolean trust = FALSE;
#ifdef UDR_DEBUG
if (getenv("UDR_TRUST_REPLIES"))
{
trust = TRUE;
}
#endif
ipcServer_->getControlConnection()->setTrustIncomingBuffers(trust);
// Send down any requested runtime options. Right now the only
// options we support are JVM startup options.
sendStartupOptions(diags, diagsHeap, transId);
} // if (ipcServer_ && ipcServer_->getControlConnection())
if (ready())
{
newId = serverProcessId_;
result = EX_UDR_SUCCESS;
}
else
{
UdrDebug0(" Unable to start the UDR server");
//
// If ready() is not TRUE and no diagnostics have been created yet,
// create the generic "Unable to receive reply from MXUDR" diagnostic
// here.
//
Lng32 numDiags = 0;
if (diags && *diags)
{
numDiags = (*diags)->getNumber();
}
if (numDiags == 0)
{
UdrDebug0(" ***");
UdrDebug0(" *** WARNING: Errors occurred but no diagnostics created");
UdrDebug0(" ***");
if (diags)
{
if (!(*diags))
{
*diags = ComDiagsArea::allocate(diagsHeap);
}
**diags << DgSqlCode(-EXE_UDR_REPLY_ERROR);
}
}
stop();
result = EX_UDR_ERROR;
} // if (!ready())
} // if (ready()) ... else ...
UdrDebug1("[END ExUdrServer::start()] %p", this);
return result;
}
void ExUdrServer::sendStartupOptions(ComDiagsArea **diags,
CollHeap *diagsHeap,
Int64 transId)
{
// Send down any requested runtime options. Right now the only
// options we support are JVM startup options. To do the work we
// will allocate a stream and a message on the IPC heap. The
// message gets cleaned up eventually when its reference count
// reaches zero. The stream takes care of its own cleanup by
// putting itself on the IPC environment's list of "completed"
// streams, and cleanup of that list is always guaranteed to be
// done at a safe time.
// This method should only be called after startup was successful
ex_assert(ipcServer_ && ipcServer_->getControlConnection(),
"Do not call this method without a running server");
UdrDebug0(" About to send startup options to the server");
NAMemory *ipcHeap = myIpcHeap();
// If the options_ is set to OFF or ANYTHING and there is
// no userName_ then we don't have anything to do here
if (userName_ == NULL &&
(str_cmp_ne(options_, "OFF") == 0 || str_cmp_ne(options_, "ANYTHING") == 0))
{
return ;
}
// Send the user name also in startup options as
// "-Dsqlmx.udr.username=userName_"
char *userNameOption = NULL;
Int32 userNameOptionLen = 0;
if (userName_)
{
const char *userNamePrefix = "-Dsqlmx.udr.username=";
Int32 userNamePrefixLen = str_len(userNamePrefix);
Int32 userNameLen = str_len(userName_);
userNameOption = new (ipcHeap) char[userNamePrefixLen + userNameLen + 1];
str_sprintf(userNameOption, "%s%s", userNamePrefix, userName_);
userNameOptionLen = str_len(userNameOption);
}
// Send the user password also in startup options as
// "-Dsqlmx.udr.password=userPassword_"
char *passwordOption = NULL;
Int32 passwordOptionLen = 0;
if (userPassword_)
{
const char *passwordPrefix = "-Dsqlmx.udr.password=";
Int32 passwordPrefixLen = str_len(passwordPrefix);
Int32 passwordLen = str_len(userPassword_);
passwordOption = new (ipcHeap) char[passwordPrefixLen + passwordLen + 1];
str_sprintf(passwordOption, "%s%s", passwordPrefix, userPassword_);
passwordOptionLen = str_len(passwordOption);
}
char *optionsToSend = NULL;
Int32 delimiterLen = 1;
Int32 tmpLen = 0;
if (str_cmp_ne(options_, "OFF") != 0 &&
str_cmp_ne(options_, "ANYTHING") != 0)
{
if (userName_)
{
Int32 optionsLen = str_len(options_);
Int32 len = optionsLen + delimiterLen + userNameOptionLen
+ delimiterLen + passwordOptionLen;
optionsToSend = new (ipcHeap) char[len + 1];
str_cpy_all(optionsToSend, options_, optionsLen);
optionsToSend[optionsLen] = optionDelimiters_[0];
// Copy the user name
str_cpy_all(optionsToSend + optionsLen + delimiterLen,
userNameOption,
userNameOptionLen);
tmpLen = optionsLen + delimiterLen + userNameOptionLen;
optionsToSend[tmpLen] = optionDelimiters_[0];
// Copy the user password
str_cpy_all(optionsToSend + tmpLen + delimiterLen,
passwordOption,
passwordOptionLen);
optionsToSend[len] = '\0';
}
else
{
Int32 optionsLen = str_len(options_);
optionsToSend = new (ipcHeap) char[optionsLen + 1];
str_cpy_all(optionsToSend, options_, optionsLen + 1);
}
}
else
{
if (userName_)
{
if (userPassword_)
{
Int32 len = userNameOptionLen + delimiterLen + passwordOptionLen;
optionsToSend = new (ipcHeap) char[len + 1];
// Copy the user name
str_cpy_all(optionsToSend, userNameOption, userNameOptionLen);
optionsToSend[userNameOptionLen] = ' ';
// Copy the user password
str_cpy_all(optionsToSend + userNameOptionLen + delimiterLen,
passwordOption, passwordOptionLen);
optionsToSend[len] = '\0';
}
else
{
optionsToSend = new (ipcHeap) char[userNameOptionLen + 1];
// Copy the user name
str_cpy_all(optionsToSend, userNameOption, userNameOptionLen + 1);
}
}
else
{
// No need to send any options. We never come here because this
// case is already checked above.
return;
}
}
NABoolean isTransactional = (transId == -1 ? FALSE : TRUE);
UdrClientControlStream *stream = new (ipcHeap)
UdrClientControlStream(myIpcEnv(),
NULL, // tcb
NULL, // stmt globals
TRUE, // keep diags for caller
isTransactional);
#ifdef UDR_DEBUG
if (traceFile_)
{
stream->setTraceFile(traceFile_);
}
#endif
UdrSessionMsg *msg = new (ipcHeap)
UdrSessionMsg(UdrSessionMsg::UDR_SESSION_TYPE_JAVA_OPTIONS,
0, ipcHeap);
msg->addString(optionsToSend);
msg->addString(optionDelimiters_);
IpcConnection *conn = ipcServer_->getControlConnection();
stream->addRecipient(conn);
*stream << *msg;
stream->send(TRUE, // TRUE indicates a waited send
transId);
msg->decrRefCount();
NADELETEBASIC(userNameOption, ipcHeap);
NADELETEBASIC(passwordOption, ipcHeap);
NADELETEBASIC(optionsToSend, ipcHeap);
//--------------------------------------------------------------------
// We just completed a waited send of the startup options. There
// are a couple of error checks we need to perform now.
// a) The stream may have encountered IPC errors and put itself into
// an error state.
// b) The IPC could have been successful but the server may have
// returned SQL diags in its reply. Our UDR-specific message
// stream subclass caches these diags for us.
//--------------------------------------------------------------------
// This boolean will track whether we need to add a generic
// "Unable to receive reply from MXUDR" condition to the diags
// area.
NABoolean addUdrCondition = FALSE;
// a) Did the stream put itself into an error state?
if (stream->getErrorInfo() != 0)
{
UdrDebug0(" The message stream encountered errors");
addUdrCondition = TRUE;
if (diags)
{
if (*diags == NULL)
{
*diags = ComDiagsArea::allocate(diagsHeap);
}
conn->populateDiagsArea(*diags, diagsHeap);
}
stop();
}
else
{
// b) Did the server return diags?
ComDiagsArea *diagsFromServer = stream->extractUdrDiags();
if (diagsFromServer)
{
UdrDebug0(" The server returned diagnostics in its reply");
if (diags)
{
addUdrCondition = TRUE;
if (*diags == NULL)
{
*diags = ComDiagsArea::allocate(diagsHeap);
}
(*diags)->mergeAfter(*diagsFromServer);
}
diagsFromServer->decrRefCount();
stop();
}
}
// If either a) or b) was true, we have diags to return but they
// may not be UDR-specific so we add a generic "Unable to
// receive reply from MXUDR" condition here.
if (diags && addUdrCondition)
{
if (*diags == NULL)
{
*diags = ComDiagsArea::allocate(diagsHeap);
}
**diags << DgSqlCode(-EXE_UDR_REPLY_ERROR);
}
}
//
// Bring down the server process. Note that this is not a forceful
// "kill" method. If the server is busy or hung then this call may not
// actually stop the process. Under normal circumstances the call to
// release() in this method will close the control connection to the
// server, the server will detect that its only client has gone away,
// and the server will exit.
//
ExUdrServer::ExUdrServerStatus ExUdrServer::stop()
{
UdrDebug1("[BEGIN ExUdrServer::stop()] %p", this);
// Release all connections that were opened for this
// Server instance. We destruct the connections that are not being
// used. The in-use connections will be freed in releaseConnection()
// when they are tried to use next time.
for ( ; freeConns_->entries(); )
{
IpcConnection *conn = freeConns_->at(0);
freeConns_->removeAt(0);
delete conn;
}
if (ipcServer_)
{
UdrDebug0(" About to release the server class instance");
//
// This call will remove the IpcServer instance from the server
// class and deallocate the IpcServer instance
//
ipcServer_->release();
ipcServer_ = NULL;
InvalidateProcessId(serverProcessId_);
}
setState(EX_UDR_NOT_STARTED);
UdrDebug1("[END ExUdrServer::stop()] %p", this);
return EX_UDR_SUCCESS;
}
ExUdrServer::ExUdrServerStatus ExUdrServer::kill(ComDiagsArea *diags)
{
UdrDebug1("[BEGIN ExUdrServer::kill()] %p", this);
short result = 0;
char asciiPhandle[300];
serverProcessId_.toAscii(asciiPhandle, 300);
UdrDebug1(" UDR Server process handle is %s", asciiPhandle);
if (!ProcessIdIsNull(serverProcessId_))
{
if (serverProcessId_.getDomain() == IPC_DOM_GUA_PHANDLE)
{
NAProcessHandle serverPhandle(
(SB_Phandle_Type *) &(serverProcessId_.getPhandle().phandle_));
Int32 guaRetcode = serverPhandle.decompose();
if (XZFIL_ERR_OK == guaRetcode)
msg_mon_stop_process_name(serverPhandle.getPhandleString());
UdrDebug1(" PROCESS_STOP_ returned %d", (Int32) result);
if (diags != NULL)
{
*diags << DgSqlCode(EXE_UDR_ATTEMPT_TO_KILL)
<< DgString0(asciiPhandle)
<< DgInt0((Int32) result);
}
}
else
{
UdrDebug0(" *** ERROR: UDR Server is not a Guardian process");
}
}
else
{
UdrDebug0(" Process handle is not valid");
}
UdrDebug1("[END ExUdrServer::kill()] %p", this);
return EX_UDR_SUCCESS;
}
IpcConnection *ExUdrServer::getUdrControlConnection() const
{
IpcConnection *result = NULL;
if (ready() && ipcServer_)
{
result = ipcServer_->getControlConnection();
if (result && result->getState() == IpcConnection::ERROR_STATE)
result = NULL;
}
return result;
}
// A free connection from freeConns_ will be moved into inUseConns_ list.
// If there is no available free connection, a new IPC Connection will
// be created.
IpcConnection *ExUdrServer::getAnIpcConnection() const
{
UdrDebug0("[BEGIN ExUdrServer::getAnIpcConnection()]");
IpcConnection *conn = NULL;
CollIndex numFreeConns = freeConns_->entries();
if (numFreeConns > 0)
{
// remove from freeList_ and add it in inUseList_
conn = freeConns_->at(numFreeConns - 1);
freeConns_->removeAt(numFreeConns - 1);
inUseConns_->insert(conn);
UdrDebug1(" An existing connection %p will be reused", conn);
}
else
{
Lng32 nowaitDepth = DEFAULT_NOWAIT_DEPTH;
#ifdef _DEBUG
char *e = getenv("UDR_NOWAIT_DEPTH");
if (e && e[0])
nowaitDepth = atol(e);
#endif
UdrDebug1(" Using a nowait depth of %d", nowaitDepth);
// create a new connection and add it in inUseList_
conn = serverProcessId_.createConnectionToServer(myIpcEnv(),
TRUE,
nowaitDepth);
// Set a flag in connection indicating whether or
// not to perform integrity checks on incoming buffers.
NABoolean trust = FALSE;
#ifdef UDR_DEBUG
if (getenv("UDR_TRUST_REPLIES"))
{
trust = TRUE;
}
#endif
conn->setTrustIncomingBuffers(trust);
UdrDebug1(" A new connection %p is created", conn);
inUseConns_->insert(conn);
}
UdrDebug0("[END ExUdrServer::getAnIpcConnection()]");
return conn;
}
// Releases a connection back to freeConns_ for later use.
//
// Note: An opened connection will never be closed unless there is a
// problem accessing the server. This might be okay in most cases.
// But it's waste of resources in an app where many CALL stmts are opened
// at one point and does not do much with them later.
// Executor can have a model where the number of free conns are limited and
// a conn will be closed when it is being released if the free conn
// limit is reached. There are several ways to set this limit. One way
// is by way of session defaults.
void ExUdrServer::releaseConnection(IpcConnection *conn)
{
if (conn == NULL)
return;
// Don't need to do anything for control connection since
// control connection is not added to these lists.
if (conn == getUdrControlConnection())
return;
if (! inUseConns_->remove(conn))
{
// If UDR Server dies, we will have NULL control connection by
// the time we come here. In that case, we don't need to assert
// because 'conn' might be control connection
if (getUdrControlConnection())
ex_assert(0, "A connection that is being released is not in use.");
}
// Conn will be deleted in the following cases
// 1. conn got error
// 2. UDR Server is in error state because some other conn got error
// 3. UDR Server is restarted after an error
if (conn->getState() == IpcConnection::ERROR_STATE ||
state_ == ExUdrServer::EX_UDR_BROKEN ||
! (conn->getOtherEnd() == serverProcessId_))
delete conn;
else
freeConns_->insert(conn);
return;
}
NABoolean ExUdrServer::isIOPending(IpcConnection *conn) const
{
NABoolean result = FALSE;
if (ipcServer_ && conn)
{
NABoolean ioPendingOnConnection =
conn->sendIOPending() || conn->receiveIOPending();
NABoolean anythingQueuedOnConnection =
(conn->numQueuedSendMessages() > 0)
|| (conn->numQueuedReceiveMessages() > 0);
if (ioPendingOnConnection || anythingQueuedOnConnection ||
conn->numReceiveCallbacksPending() > 0)
{
result = TRUE;
}
}
return result;
}
void ExUdrServer::completeUdrRequests(IpcConnection *conn,
NABoolean waitForAllIO) const
{
#ifdef UDR_DEBUG
NABoolean firstTime = TRUE;
#endif // UDR_DEBUG
NABoolean done = FALSE;
while (!done && isIOPending(conn))
{
#ifdef UDR_DEBUG
if (firstTime)
{
firstTime = FALSE;
UdrDebug0("***");
UdrDebug0("*** I/O is still pending on the UDR control connection");
}
UdrDebug0("*** Waiting for one UDR I/O to complete...");
#endif // UDR_DEBUG
// Wait on 'conn'
conn->wait(IpcInfiniteTimeout);
UdrDebug0("*** A UDR I/O has completed.");
if (!waitForAllIO)
{
done = TRUE;
}
}
#ifdef UDR_DEBUG
if (!firstTime)
{
UdrDebug0("***");
}
#endif // UDR_DEBUG
}
// Matchmaking logic to determine if this server has the requested
// attributes
NABoolean ExUdrServer::match(const Int32 &userId,
const char *options,
const char *optionDelimiters) const
{
// Two instances are considered a match if they have the same user
// identity and any of the following are true:
// - One or both instances have runtime options of "ANYTHING"
// - Both instances have runtime options of "OFF"
// - Both instances have matching runtime options and a matching
// option delimiter string
if (userId_ != userId)
{
return FALSE;
}
if (str_cmp_ne(options, "ANYTHING") == 0 ||
str_cmp_ne(options_, "ANYTHING") == 0)
{
return TRUE;
}
if (str_cmp_ne(options, "OFF") == 0 &&
str_cmp_ne(options_, "OFF") == 0)
{
return TRUE;
}
if (str_cmp_ne(options_, options) == 0 &&
str_cmp_ne(optionDelimiters_, optionDelimiters) == 0)
{
return TRUE;
}
return FALSE;
}
// -----------------------------------------------------------------------
// ExUdrServerManager
// -----------------------------------------------------------------------
ExUdrServerManager::ExUdrServerManager(IpcEnvironment* env,
ComUInt32 maxServersPerGroup)
: ipcEnvironment_(env),
maxServersPerGroup_(maxServersPerGroup),
serverPool_(myIpcHeap()),
okToRetainOneServer_(FALSE)
#ifdef UDR_DEBUG
, traceFile_(NULL)
#endif
{
//
// Create the UDR server class. This does not actually
// start any processes. Use allocateServerProcess() to
// do that.
//
// This object is copied into each ExUdrServer object.
// ExUdrServer->start() method calls allocateServerProcess()
// on this object to create a process.
//
udrServerClass_ = new (myIpcHeap())
IpcServerClass(ipcEnvironment_, IPC_SQLUDR_SERVER);
#ifdef UDR_DEBUG
if ((getenv("UDR_""DEBUG") != NULL) ||
(getenv("UDR_SERVER_MGR_DEBUG") != NULL))
{
traceFile_ = stdout;
}
#endif
}
ExUdrServerManager::~ExUdrServerManager()
{
//
// stop all the running ExUdrServer processes.
//
for (CollIndex i = 0; i < serverPool_.entries(); i++)
{
delete serverPool_[i];
}
if (udrServerClass_)
{
//
// class IpcServerClass does not have a destructor so to delete
// udrServerClass_ all we have to do is deallocate the memory from
// the IPC heap.
//
NADELETEBASIC(udrServerClass_, myIpcHeap());
}
}
//
// Returns an ExUdrServer with the requested attributes
//
// Creates a new ExUdrServer object if the number of existing servers
// with matching attributes is less than maximum number of servers
// allowed per group. Otherwise, we return one of the servers we
// already have with the lowest reference count.
//
// Successful completion of this method does not guarantee that the
// server process is actually started.
//
ExUdrServer* ExUdrServerManager::acquireUdrServer(const Int32 &userId,
const char *options,
const char *optionDelimiters,
const char *userName,
const char *userPassword,
NABoolean dedicated)
{
#ifdef UDR_DEBUG
if ((getenv("UDR_""DEBUG") != NULL) ||
(getenv("UDR_SERVER_MGR_DEBUG") != NULL))
{
traceFile_ = stdout;
}
else
{
traceFile_ = NULL;
}
#endif
UdrDebug0("[BEGIN ExUdrServerManager::acquireUdrServer()]");
UdrDebug1(" options: '%s'", options);
UdrDebug1(" delimiters: '%s'", optionDelimiters);
UdrDebug1(" Max servers per group: %u", getMaxServersPerGroup());
ExUdrServer *udrServer = NULL;
ComUInt32 lowestRefCnt = 0;
ComUInt32 numServersMatched = 0;
CollIndex entries = serverPool_.entries();
for (CollIndex i = 0; i < entries; i++)
{
ExUdrServer *s = serverPool_[i];
if (s->match(userId, options, optionDelimiters) &&
(!s->isDedicated()))
{
numServersMatched++;
if (udrServer == NULL || s->getRefCount() < lowestRefCnt)
{
udrServer = serverPool_[i];
lowestRefCnt = udrServer->getRefCount();
}
}
}
UdrDebug2(" Found %u matching server%s", numServersMatched,
numServersMatched == 1 ? "" : "s");
if (entries > 0 && numServersMatched == 0)
{
UdrDebug0(" A new group is being encountered");
UdrDebug0(" About to release idle servers...");
// We are seeing a server group for the first time, and it is not
// the only group we are currently managing. We will no longer
// treat this application as one that only requires a single UDR
// server.
okToRetainOneServer_ = FALSE;
// Now release all idle servers (those with a ref count of
// zero). The loop here traverses the list in reverse order.
CollIndex idx = entries;
while (idx--)
{
ExUdrServer *curr = serverPool_[idx];
if (curr->getRefCount() == 0)
{
delete curr;
serverPool_.removeAt(idx);
}
}
UdrDebug0(" Done releasing idle servers");
}
NABoolean reUseExistingServer = FALSE;
if (numServersMatched >= getMaxServersPerGroup())
{
//
// We hit our limit for servers in this group. We will return the
// one with the lowest reference count.
//
reUseExistingServer = TRUE;
}
// If the request is for a dedicated server and the decision is to
// reuse an existing server, make sure the selected server's reference
// count is zero, else do not reuse this server.
if(dedicated && reUseExistingServer &&
(udrServer->getRefCount() != 0))
{
reUseExistingServer = FALSE;
}
if (reUseExistingServer)
{
//
// We hit our limit for servers in this group. We will return the
// one with the lowest reference count.
//
udrServer->incrRefCount();
UdrDebug0(" No more servers can be started in this group");
UdrDebug1(" Server %p will be reused", udrServer);
}
else
{
// If the requested options were "ANYTHING" then we are actually
// going to start a server as if the user specified "OFF". So
// that future requests for "OFF" will match this new server we
// are creating, we will change "ANYTHING" to "OFF" here. Note
// that the "ANYTHING" and "OFF" strings were uppercased by the
// SQL compiler when the plan was generated.
const char *newOptions = options;
if (str_cmp_ne(options, "ANYTHING") == 0)
{
newOptions = "OFF";
}
udrServer = new (myIpcHeap()) ExUdrServer(ipcEnvironment_,
userId,
newOptions,
optionDelimiters,
userName,
userPassword,
udrServerClass_);
#ifdef UDR_DEBUG
if (traceFile_)
{
udrServer->setTraceFile(traceFile_);
}
#endif
UdrDebug1(" Created a new ExUdrServer instance %p", udrServer);
udrServer->setRefCount(1);
serverPool_.insert(udrServer);
}
UdrDebug2(" Returning ExUdrServer %p, ref count %u",
udrServer, udrServer->getRefCount());
UdrDebug0("[END ExUdrServerManager::acquireUdrServer()]");
return udrServer;
}
//
// A method to decrement the reference count on an ExUdrServer
// instance. Callers need to pass in a non-NULL argument because no
// checking is done here. When a reference count reaches zero we do
// one of two things:
//
// a) Retain the ExUdrServer instance as an idle server in our
// pool. This will be done as a performance optimization if the
// application has not yet made use of multiple UDR servers.
//
// b) Stop the server and remove it from the pool. We choose this
// option once we have detected that the application is using
// multiple UDR servers.
//
void ExUdrServerManager::releaseUdrServer(ExUdrServer *udrServer)
{
UdrDebug1("[BEGIN ExUdrServerManager::releaseUdrServer(%p)", udrServer);
NABoolean found = FALSE;
for (CollIndex i = 0; !found && i < serverPool_.entries(); i++)
{
if (udrServer == serverPool_[i])
{
found = TRUE;
udrServer->decrRefCount();
udrServer->setDedicated(FALSE);
UdrDebug2(" Found server %p. Ref count decremented to %u",
udrServer, udrServer->getRefCount());
if (udrServer->getRefCount() == 0)
{
if (okToRetainOneServer_ && serverPool_.entries() == 1)
{
// Do nothing. So far this application appears to only
// require a single UDR server so we will retain this one.
UdrDebug0(" There is one server in the pool, it will be retained");
}
else
{
delete udrServer;
serverPool_.removeAt(i);
}
}
} // if (udrServer == serverPool_[i])
} // for each server in the pool
UdrDebug1("[END ExUdrServerManager::releaseUdrServer(%p)]", udrServer);
}