| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ***************************************************************************** |
| * |
| * File: ExUdrServer.cpp |
| * Description: Client-side process management for UDR servers |
| * |
| * Created: 08/16/2000 |
| * Language: C++ |
| * |
| * |
| ***************************************************************************** |
| */ |
| |
| #include "ex_stdh.h" |
| #include "ExUdrServer.h" |
| #include "ExUdrClientIpc.h" |
| #include "ExpError.h" |
| #include "ExCextdecs.h" |
| #include "ComRtUtils.h" |
| #include "PortProcessCalls.h" |
| |
| #include "seabed/fs.h" |
| #include "seabed/ms.h" |
| |
| #ifdef UDR_DEBUG |
| extern const char *GetWorkRetcodeString(ExWorkProcRetcode r); |
| /* |
| static const char *GetStatusString(ExUdrServer::ExUdrServerStatus s) |
| { |
| switch (s) |
| { |
| case ExUdrServer::EX_UDR_SUCCESS: |
| return "Success"; |
| break; |
| case ExUdrServer::EX_UDR_WARNING: |
| return "Warning"; |
| break; |
| case ExUdrServer::EX_UDR_ERROR: |
| return "Error"; |
| break; |
| } |
| return "***UNKNOWN***"; |
| } */ |
| |
| #define UdrDebug0(s) \ |
| ( UdrPrintf(traceFile_,(s)) ) |
| #define UdrDebug1(s,a1) \ |
| ( UdrPrintf(traceFile_,(s),(a1)) ) |
| #define UdrDebug2(s,a1,a2) \ |
| ( UdrPrintf(traceFile_,(s),(a1),(a2)) ) |
| #define UdrDebug3(s,a1,a2,a3) \ |
| ( UdrPrintf(traceFile_,(s),(a1),(a2),(a3)) ) |
| #define UdrDebug4(s,a1,a2,a3,a4) \ |
| ( UdrPrintf(traceFile_,(s),(a1),(a2),(a3),(a4)) ) |
| #define UdrDebug5(s,a1,a2,a3,a4,a5) \ |
| ( UdrPrintf(traceFile_,(s),(a1),(a2),(a3),(a4),(a5)) ) |
| |
| #else |
| // |
| // Debug macros are no-ops in the release build |
| // |
| #define UdrDebug0(s) |
| #define UdrDebug1(s,a1) |
| #define UdrDebug2(s,a1,a2) |
| #define UdrDebug3(s,a1,a2,a3) |
| #define UdrDebug4(s,a1,a2,a3,a4) |
| #define UdrDebug5(s,a1,a2,a3,a4,a5) |
| |
| #endif // UDR_DEBUG |
| |
| // |
| // Helper functions allowing the ExUdrServer class and its callers to |
| // determine if a process ID is NULL and to nullify a process |
| // ID. Internally in our IpcProcessId objects we consider a valid |
| // process ID to be anything with a domain other than IPC_DOM_INVALID. |
| // The default IpcProcessId constructor sets the domain to |
| // IPC_DOM_INVALID so this constructor can be used to instantiate a |
| // NULL process ID. |
| // |
| NABoolean ProcessIdIsNull(const IpcProcessId &id) |
| { |
| NABoolean result = FALSE; |
| if (id.getDomain() == IPC_DOM_INVALID) |
| { |
| result = TRUE; |
| } |
| return result; |
| } |
| void InvalidateProcessId(IpcProcessId &id) |
| { |
| // |
| // The default IpcProcessId constructor is used to instantiate |
| // a process ID with domain IPC_DOM_INVALID. |
| // |
| IpcProcessId nullPid; |
| // |
| // Invoke the IpcProcessId assignment operator to invalidate |
| // id. Arguments to this operator are passed by reference, not on |
| // the stack. |
| // |
| id = nullPid; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // ExUdrServer |
| // ----------------------------------------------------------------------- |
| ExUdrServer::ExUdrServer(IpcEnvironment *env, |
| const Int32 &userId, |
| const char *options, |
| const char *optionDelimiters, |
| const char *userName, |
| const char *userPassword, |
| IpcServerClass *serverClass) |
| : state_(EX_UDR_NOT_STARTED), |
| ipcEnvironment_(env), |
| udrServerClass_(serverClass), |
| ipcServer_(NULL), |
| serverProcessId_(), |
| startAttempts_(0), |
| userId_(userId), |
| userName_(NULL), |
| userPassword_(NULL), |
| options_(NULL), |
| optionDelimiters_(NULL), |
| refCount_(0), |
| dedicated_(FALSE), |
| inUse_(FALSE), |
| inUseConns_(NULL), |
| freeConns_(NULL) |
| #ifdef UDR_DEBUG |
| , traceFile_(NULL) |
| #endif |
| { |
| ex_assert(options && optionDelimiters, |
| "No runtime options specified for UDR server startup"); |
| |
| CollHeap *h = myIpcHeap(); |
| |
| Int32 len = str_len(options); |
| options_ = new (h) char[len + 1]; |
| str_cpy_all(options_, options, len + 1); |
| |
| len = str_len(optionDelimiters); |
| optionDelimiters_ = new (h) char[len + 1]; |
| str_cpy_all(optionDelimiters_, optionDelimiters, len + 1); |
| |
| if (userName) |
| { |
| len = str_len(userName); |
| userName_ = new (h) char[len + 1]; |
| str_cpy_all(userName_, userName, len + 1); |
| } |
| |
| if (userPassword) |
| { |
| len = str_len(userPassword); |
| userPassword_ = new (h) char[len + 1]; |
| str_cpy_all(userPassword_, userPassword, len + 1); |
| } |
| |
| inUseConns_ = new (h) NAList<IpcConnection *>(h); |
| freeConns_ = new (h) NAList<IpcConnection *>(h); |
| } |
| |
| ExUdrServer::~ExUdrServer() |
| { |
| UdrDebug1("[BEGIN ExUdrServer destructor] %p", this); |
| |
| stop(); |
| |
| CollHeap *h = myIpcHeap(); |
| NADELETEBASIC(options_, h); |
| NADELETEBASIC(optionDelimiters_, h); |
| NADELETEBASIC(userName_, h); |
| NADELETEBASIC(userPassword_, h); |
| |
| UdrDebug1("[END ExUdrServer destructor] %p", this); |
| } |
| |
| CollHeap *ExUdrServer::myIpcHeap() const |
| { |
| return myIpcEnv()->getHeap(); |
| } |
| |
| // |
| // Bring a UDR Server process to life if one hasn't been |
| // started already |
| // |
| ExUdrServer::ExUdrServerStatus ExUdrServer::start(ComDiagsArea **diags, |
| CollHeap *diagsHeap, |
| Int64 transId, |
| IpcProcessId &newId, |
| NABoolean usesTransactions) |
| { |
| #ifdef UDR_DEBUG |
| UdrDebug1("[BEGIN ExUdrServer::start()] %p", this); |
| UdrDebug1(" Startup options '%s'", options_); |
| UdrDebug1(" Startup option delimiters '%s'", optionDelimiters_); |
| if (diags && *diags) |
| { |
| Lng32 numDiags = (*diags)->getNumber(); |
| UdrDebug1(" The diagnostics area initially contains %d entries", |
| numDiags); |
| } |
| else |
| { |
| UdrDebug0(" No diagnostics area exists yet"); |
| } |
| #endif // UDR_DEBUG |
| |
| // The newId and result variables hold our return values. We will |
| // assume failure initially and set the values to something else |
| // once we are sure we have succeeded. |
| InvalidateProcessId(newId); |
| ExUdrServerStatus result = EX_UDR_ERROR; |
| |
| if (ready()) |
| { |
| #ifdef UDR_DEBUG |
| char buf[300]; |
| serverProcessId_.toAscii(buf, 300); |
| UdrDebug1(" A server is already running. Process ID %s", buf); |
| #endif // UDR_DEBUG |
| newId = serverProcessId_; |
| result = EX_UDR_SUCCESS; |
| } |
| else |
| { |
| stop(); |
| |
| UdrDebug0(" About to start the UDR server..."); |
| |
| // Notes on UDR Server startup |
| // - We are using a nowait depth of 2 for first connection. SPJs |
| // that cannot return RS use first connection for IPC. |
| // We use nowait depth of 3 for all other connections used by SPJs |
| // that can return RS. Look at getAnIpcConnection() for code details. |
| // |
| // - By specifying IPC_CPU_DONT_CARE as the CPU number in this |
| // call to allocateServerProcess() we get the default behavior |
| // from Guardian which is to start the process on the same CPU as |
| // the caller. |
| |
| Lng32 nowaitDepth = 2; |
| |
| #ifdef _DEBUG |
| char *e = getenv("UDR_NOWAIT_DEPTH"); |
| if (e && e[0]) |
| nowaitDepth = atol(e); |
| #endif |
| UdrDebug1(" Using a nowait depth of %d", nowaitDepth); |
| |
| NABoolean waitedCreation = TRUE; |
| // co-locate the tdm_udrserv with the executor process (master or ESP) |
| // This is done for a couple of reasons: One is that since the ESPs |
| // are evenly balanced across the CPUs, this ensures an even distribution |
| // of the tdm_udrservs as well, probably better than a random distribution. |
| // The second reason is that for certain maintenance UDFs (only example |
| // so far is udf(event_log_reader())), we must ensure that we run one |
| // tdm_udrserv on each node of the cluster, and we do that by starting |
| // one ESP per node. |
| IpcCpuNum collocatedCPU = |
| myIpcEnv()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE).getCpuNum(); |
| |
| ipcServer_ = |
| udrServerClass_->allocateServerProcess(diags, |
| diagsHeap, |
| NULL, |
| collocatedCPU, |
| 1, // espLevel (not relevant for UDR servers) |
| usesTransactions, |
| waitedCreation, |
| nowaitDepth); |
| |
| #ifdef UDR_DEBUG |
| UdrDebug1(" allocateServerProcess() returned %p", ipcServer_); |
| if (diags && *diags) |
| { |
| Lng32 numDiags = (*diags)->getNumber(); |
| UdrDebug1(" The diagnostics area contains %d entries", |
| numDiags); |
| } |
| else |
| { |
| UdrDebug0(" No diagnostics area exists"); |
| } |
| #endif // UDR_DEBUG |
| |
| startAttempts_++; |
| |
| if (diags && *diags) |
| { |
| Lng32 sqlcode = ((*diags)->mainSQLCODE()); |
| if (sqlcode < 0) |
| { |
| // |
| // An error occurrred |
| // |
| // $$$$ |
| // Need to verify whether we can release the server class |
| // instance Looks like allocateServerProcess() can return |
| // non-NULL and also generate diagnostics. If that happens and |
| // we call ipcServer_->release() an assertion fails because |
| // ipcServer_ does not yet have a valid process handle. The |
| // assertion is in IpcProcessId::getPhandle(). |
| // |
| UdrDebug0(" ***"); |
| UdrDebug1(" *** WARNING: Errors occurred. Main SQLCODE is %d", |
| sqlcode); |
| UdrDebug0(" ***"); |
| ipcServer_ = NULL; |
| } |
| } |
| |
| if (ipcServer_ && ipcServer_->getControlConnection()) |
| { |
| // We enter this block once a server process has successfully |
| // been started. |
| |
| setState(EX_UDR_READY); |
| |
| // Record the process ID |
| serverProcessId_ = ipcServer_->getControlConnection()->getOtherEnd(); |
| |
| #ifdef UDR_DEBUG |
| char buf[300]; |
| serverProcessId_.toAscii(buf, 300); |
| UdrDebug1(" A new server was started. Process ID %s", buf); |
| #endif // UDR_DEBUG |
| |
| // Set a flag in the control connection indicating whether or |
| // not to perform integrity checks on incoming buffers. |
| NABoolean trust = FALSE; |
| #ifdef UDR_DEBUG |
| if (getenv("UDR_TRUST_REPLIES")) |
| { |
| trust = TRUE; |
| } |
| #endif |
| ipcServer_->getControlConnection()->setTrustIncomingBuffers(trust); |
| |
| |
| // Send down any requested runtime options. Right now the only |
| // options we support are JVM startup options. |
| sendStartupOptions(diags, diagsHeap, transId); |
| |
| } // if (ipcServer_ && ipcServer_->getControlConnection()) |
| |
| if (ready()) |
| { |
| newId = serverProcessId_; |
| result = EX_UDR_SUCCESS; |
| } |
| else |
| { |
| UdrDebug0(" Unable to start the UDR server"); |
| |
| // |
| // If ready() is not TRUE and no diagnostics have been created yet, |
| // create the generic "Unable to receive reply from MXUDR" diagnostic |
| // here. |
| // |
| Lng32 numDiags = 0; |
| if (diags && *diags) |
| { |
| numDiags = (*diags)->getNumber(); |
| } |
| if (numDiags == 0) |
| { |
| UdrDebug0(" ***"); |
| UdrDebug0(" *** WARNING: Errors occurred but no diagnostics created"); |
| UdrDebug0(" ***"); |
| if (diags) |
| { |
| if (!(*diags)) |
| { |
| *diags = ComDiagsArea::allocate(diagsHeap); |
| } |
| **diags << DgSqlCode(-EXE_UDR_REPLY_ERROR); |
| } |
| } |
| |
| stop(); |
| result = EX_UDR_ERROR; |
| |
| } // if (!ready()) |
| |
| } // if (ready()) ... else ... |
| |
| UdrDebug1("[END ExUdrServer::start()] %p", this); |
| return result; |
| } |
| |
| void ExUdrServer::sendStartupOptions(ComDiagsArea **diags, |
| CollHeap *diagsHeap, |
| Int64 transId) |
| { |
| // Send down any requested runtime options. Right now the only |
| // options we support are JVM startup options. To do the work we |
| // will allocate a stream and a message on the IPC heap. The |
| // message gets cleaned up eventually when its reference count |
| // reaches zero. The stream takes care of its own cleanup by |
| // putting itself on the IPC environment's list of "completed" |
| // streams, and cleanup of that list is always guaranteed to be |
| // done at a safe time. |
| |
| // This method should only be called after startup was successful |
| ex_assert(ipcServer_ && ipcServer_->getControlConnection(), |
| "Do not call this method without a running server"); |
| |
| UdrDebug0(" About to send startup options to the server"); |
| NAMemory *ipcHeap = myIpcHeap(); |
| |
| // If the options_ is set to OFF or ANYTHING and there is |
| // no userName_ then we don't have anything to do here |
| if (userName_ == NULL && |
| (str_cmp_ne(options_, "OFF") == 0 || str_cmp_ne(options_, "ANYTHING") == 0)) |
| { |
| return ; |
| } |
| |
| // Send the user name also in startup options as |
| // "-Dsqlmx.udr.username=userName_" |
| char *userNameOption = NULL; |
| Int32 userNameOptionLen = 0; |
| |
| if (userName_) |
| { |
| const char *userNamePrefix = "-Dsqlmx.udr.username="; |
| Int32 userNamePrefixLen = str_len(userNamePrefix); |
| Int32 userNameLen = str_len(userName_); |
| userNameOption = new (ipcHeap) char[userNamePrefixLen + userNameLen + 1]; |
| str_sprintf(userNameOption, "%s%s", userNamePrefix, userName_); |
| userNameOptionLen = str_len(userNameOption); |
| } |
| |
| // Send the user password also in startup options as |
| // "-Dsqlmx.udr.password=userPassword_" |
| char *passwordOption = NULL; |
| Int32 passwordOptionLen = 0; |
| |
| if (userPassword_) |
| { |
| const char *passwordPrefix = "-Dsqlmx.udr.password="; |
| Int32 passwordPrefixLen = str_len(passwordPrefix); |
| Int32 passwordLen = str_len(userPassword_); |
| passwordOption = new (ipcHeap) char[passwordPrefixLen + passwordLen + 1]; |
| str_sprintf(passwordOption, "%s%s", passwordPrefix, userPassword_); |
| passwordOptionLen = str_len(passwordOption); |
| } |
| |
| char *optionsToSend = NULL; |
| Int32 delimiterLen = 1; |
| Int32 tmpLen = 0; |
| if (str_cmp_ne(options_, "OFF") != 0 && |
| str_cmp_ne(options_, "ANYTHING") != 0) |
| { |
| if (userName_) |
| { |
| Int32 optionsLen = str_len(options_); |
| Int32 len = optionsLen + delimiterLen + userNameOptionLen |
| + delimiterLen + passwordOptionLen; |
| |
| optionsToSend = new (ipcHeap) char[len + 1]; |
| str_cpy_all(optionsToSend, options_, optionsLen); |
| optionsToSend[optionsLen] = optionDelimiters_[0]; |
| |
| // Copy the user name |
| str_cpy_all(optionsToSend + optionsLen + delimiterLen, |
| userNameOption, |
| userNameOptionLen); |
| tmpLen = optionsLen + delimiterLen + userNameOptionLen; |
| optionsToSend[tmpLen] = optionDelimiters_[0]; |
| |
| // Copy the user password |
| str_cpy_all(optionsToSend + tmpLen + delimiterLen, |
| passwordOption, |
| passwordOptionLen); |
| optionsToSend[len] = '\0'; |
| } |
| else |
| { |
| Int32 optionsLen = str_len(options_); |
| optionsToSend = new (ipcHeap) char[optionsLen + 1]; |
| str_cpy_all(optionsToSend, options_, optionsLen + 1); |
| } |
| } |
| else |
| { |
| if (userName_) |
| { |
| if (userPassword_) |
| { |
| Int32 len = userNameOptionLen + delimiterLen + passwordOptionLen; |
| |
| optionsToSend = new (ipcHeap) char[len + 1]; |
| |
| // Copy the user name |
| str_cpy_all(optionsToSend, userNameOption, userNameOptionLen); |
| optionsToSend[userNameOptionLen] = ' '; |
| |
| // Copy the user password |
| str_cpy_all(optionsToSend + userNameOptionLen + delimiterLen, |
| passwordOption, passwordOptionLen); |
| optionsToSend[len] = '\0'; |
| } |
| else |
| { |
| optionsToSend = new (ipcHeap) char[userNameOptionLen + 1]; |
| |
| // Copy the user name |
| str_cpy_all(optionsToSend, userNameOption, userNameOptionLen + 1); |
| } |
| } |
| else |
| { |
| // No need to send any options. We never come here because this |
| // case is already checked above. |
| return; |
| } |
| } |
| |
| NABoolean isTransactional = (transId == -1 ? FALSE : TRUE); |
| |
| UdrClientControlStream *stream = new (ipcHeap) |
| UdrClientControlStream(myIpcEnv(), |
| NULL, // tcb |
| NULL, // stmt globals |
| TRUE, // keep diags for caller |
| isTransactional); |
| |
| #ifdef UDR_DEBUG |
| if (traceFile_) |
| { |
| stream->setTraceFile(traceFile_); |
| } |
| #endif |
| |
| UdrSessionMsg *msg = new (ipcHeap) |
| UdrSessionMsg(UdrSessionMsg::UDR_SESSION_TYPE_JAVA_OPTIONS, |
| 0, ipcHeap); |
| msg->addString(optionsToSend); |
| msg->addString(optionDelimiters_); |
| |
| IpcConnection *conn = ipcServer_->getControlConnection(); |
| stream->addRecipient(conn); |
| *stream << *msg; |
| stream->send(TRUE, // TRUE indicates a waited send |
| transId); |
| |
| msg->decrRefCount(); |
| NADELETEBASIC(userNameOption, ipcHeap); |
| NADELETEBASIC(passwordOption, ipcHeap); |
| NADELETEBASIC(optionsToSend, ipcHeap); |
| |
| //-------------------------------------------------------------------- |
| // We just completed a waited send of the startup options. There |
| // are a couple of error checks we need to perform now. |
| // a) The stream may have encountered IPC errors and put itself into |
| // an error state. |
| // b) The IPC could have been successful but the server may have |
| // returned SQL diags in its reply. Our UDR-specific message |
| // stream subclass caches these diags for us. |
| //-------------------------------------------------------------------- |
| |
| // This boolean will track whether we need to add a generic |
| // "Unable to receive reply from MXUDR" condition to the diags |
| // area. |
| NABoolean addUdrCondition = FALSE; |
| |
| // a) Did the stream put itself into an error state? |
| if (stream->getErrorInfo() != 0) |
| { |
| UdrDebug0(" The message stream encountered errors"); |
| addUdrCondition = TRUE; |
| if (diags) |
| { |
| if (*diags == NULL) |
| { |
| *diags = ComDiagsArea::allocate(diagsHeap); |
| } |
| conn->populateDiagsArea(*diags, diagsHeap); |
| } |
| stop(); |
| } |
| else |
| { |
| // b) Did the server return diags? |
| ComDiagsArea *diagsFromServer = stream->extractUdrDiags(); |
| if (diagsFromServer) |
| { |
| UdrDebug0(" The server returned diagnostics in its reply"); |
| if (diags) |
| { |
| addUdrCondition = TRUE; |
| if (*diags == NULL) |
| { |
| *diags = ComDiagsArea::allocate(diagsHeap); |
| } |
| (*diags)->mergeAfter(*diagsFromServer); |
| } |
| |
| diagsFromServer->decrRefCount(); |
| stop(); |
| } |
| } |
| |
| // If either a) or b) was true, we have diags to return but they |
| // may not be UDR-specific so we add a generic "Unable to |
| // receive reply from MXUDR" condition here. |
| if (diags && addUdrCondition) |
| { |
| if (*diags == NULL) |
| { |
| *diags = ComDiagsArea::allocate(diagsHeap); |
| } |
| **diags << DgSqlCode(-EXE_UDR_REPLY_ERROR); |
| } |
| |
| } |
| |
| // |
| // Bring down the server process. Note that this is not a forceful |
| // "kill" method. If the server is busy or hung then this call may not |
| // actually stop the process. Under normal circumstances the call to |
| // release() in this method will close the control connection to the |
| // server, the server will detect that its only client has gone away, |
| // and the server will exit. |
| // |
| ExUdrServer::ExUdrServerStatus ExUdrServer::stop() |
| { |
| UdrDebug1("[BEGIN ExUdrServer::stop()] %p", this); |
| |
| // Release all connections that were opened for this |
| // Server instance. We destruct the connections that are not being |
| // used. The in-use connections will be freed in releaseConnection() |
| // when they are tried to use next time. |
| for ( ; freeConns_->entries(); ) |
| { |
| IpcConnection *conn = freeConns_->at(0); |
| freeConns_->removeAt(0); |
| delete conn; |
| } |
| |
| if (ipcServer_) |
| { |
| UdrDebug0(" About to release the server class instance"); |
| // |
| // This call will remove the IpcServer instance from the server |
| // class and deallocate the IpcServer instance |
| // |
| ipcServer_->release(); |
| ipcServer_ = NULL; |
| InvalidateProcessId(serverProcessId_); |
| } |
| |
| setState(EX_UDR_NOT_STARTED); |
| |
| UdrDebug1("[END ExUdrServer::stop()] %p", this); |
| return EX_UDR_SUCCESS; |
| } |
| |
| ExUdrServer::ExUdrServerStatus ExUdrServer::kill(ComDiagsArea *diags) |
| { |
| UdrDebug1("[BEGIN ExUdrServer::kill()] %p", this); |
| short result = 0; |
| |
| char asciiPhandle[300]; |
| serverProcessId_.toAscii(asciiPhandle, 300); |
| UdrDebug1(" UDR Server process handle is %s", asciiPhandle); |
| |
| if (!ProcessIdIsNull(serverProcessId_)) |
| { |
| if (serverProcessId_.getDomain() == IPC_DOM_GUA_PHANDLE) |
| { |
| NAProcessHandle serverPhandle( |
| (SB_Phandle_Type *) &(serverProcessId_.getPhandle().phandle_)); |
| Int32 guaRetcode = serverPhandle.decompose(); |
| if (XZFIL_ERR_OK == guaRetcode) |
| msg_mon_stop_process_name(serverPhandle.getPhandleString()); |
| UdrDebug1(" PROCESS_STOP_ returned %d", (Int32) result); |
| if (diags != NULL) |
| { |
| *diags << DgSqlCode(EXE_UDR_ATTEMPT_TO_KILL) |
| << DgString0(asciiPhandle) |
| << DgInt0((Int32) result); |
| } |
| } |
| else |
| { |
| UdrDebug0(" *** ERROR: UDR Server is not a Guardian process"); |
| } |
| } |
| else |
| { |
| UdrDebug0(" Process handle is not valid"); |
| } |
| |
| UdrDebug1("[END ExUdrServer::kill()] %p", this); |
| return EX_UDR_SUCCESS; |
| } |
| |
| IpcConnection *ExUdrServer::getUdrControlConnection() const |
| { |
| IpcConnection *result = NULL; |
| if (ready() && ipcServer_) |
| { |
| result = ipcServer_->getControlConnection(); |
| |
| if (result && result->getState() == IpcConnection::ERROR_STATE) |
| result = NULL; |
| } |
| return result; |
| } |
| |
| // A free connection from freeConns_ will be moved into inUseConns_ list. |
| // If there is no available free connection, a new IPC Connection will |
| // be created. |
| IpcConnection *ExUdrServer::getAnIpcConnection() const |
| { |
| UdrDebug0("[BEGIN ExUdrServer::getAnIpcConnection()]"); |
| |
| IpcConnection *conn = NULL; |
| CollIndex numFreeConns = freeConns_->entries(); |
| |
| if (numFreeConns > 0) |
| { |
| // remove from freeList_ and add it in inUseList_ |
| conn = freeConns_->at(numFreeConns - 1); |
| freeConns_->removeAt(numFreeConns - 1); |
| inUseConns_->insert(conn); |
| |
| UdrDebug1(" An existing connection %p will be reused", conn); |
| } |
| else |
| { |
| Lng32 nowaitDepth = DEFAULT_NOWAIT_DEPTH; |
| |
| #ifdef _DEBUG |
| char *e = getenv("UDR_NOWAIT_DEPTH"); |
| if (e && e[0]) |
| nowaitDepth = atol(e); |
| #endif |
| UdrDebug1(" Using a nowait depth of %d", nowaitDepth); |
| |
| // create a new connection and add it in inUseList_ |
| conn = serverProcessId_.createConnectionToServer(myIpcEnv(), |
| TRUE, |
| nowaitDepth); |
| |
| // Set a flag in connection indicating whether or |
| // not to perform integrity checks on incoming buffers. |
| NABoolean trust = FALSE; |
| #ifdef UDR_DEBUG |
| if (getenv("UDR_TRUST_REPLIES")) |
| { |
| trust = TRUE; |
| } |
| #endif |
| |
| conn->setTrustIncomingBuffers(trust); |
| |
| |
| UdrDebug1(" A new connection %p is created", conn); |
| inUseConns_->insert(conn); |
| } |
| |
| UdrDebug0("[END ExUdrServer::getAnIpcConnection()]"); |
| |
| return conn; |
| } |
| |
| // Releases a connection back to freeConns_ for later use. |
| // |
| // Note: An opened connection will never be closed unless there is a |
| // problem accessing the server. This might be okay in most cases. |
| // But it's waste of resources in an app where many CALL stmts are opened |
| // at one point and does not do much with them later. |
| // Executor can have a model where the number of free conns are limited and |
| // a conn will be closed when it is being released if the free conn |
| // limit is reached. There are several ways to set this limit. One way |
| // is by way of session defaults. |
| void ExUdrServer::releaseConnection(IpcConnection *conn) |
| { |
| if (conn == NULL) |
| return; |
| |
| // Don't need to do anything for control connection since |
| // control connection is not added to these lists. |
| if (conn == getUdrControlConnection()) |
| return; |
| |
| if (! inUseConns_->remove(conn)) |
| { |
| // If UDR Server dies, we will have NULL control connection by |
| // the time we come here. In that case, we don't need to assert |
| // because 'conn' might be control connection |
| if (getUdrControlConnection()) |
| ex_assert(0, "A connection that is being released is not in use."); |
| } |
| |
| // Conn will be deleted in the following cases |
| // 1. conn got error |
| // 2. UDR Server is in error state because some other conn got error |
| // 3. UDR Server is restarted after an error |
| if (conn->getState() == IpcConnection::ERROR_STATE || |
| state_ == ExUdrServer::EX_UDR_BROKEN || |
| ! (conn->getOtherEnd() == serverProcessId_)) |
| delete conn; |
| else |
| freeConns_->insert(conn); |
| |
| return; |
| } |
| |
| NABoolean ExUdrServer::isIOPending(IpcConnection *conn) const |
| { |
| NABoolean result = FALSE; |
| if (ipcServer_ && conn) |
| { |
| NABoolean ioPendingOnConnection = |
| conn->sendIOPending() || conn->receiveIOPending(); |
| NABoolean anythingQueuedOnConnection = |
| (conn->numQueuedSendMessages() > 0) |
| || (conn->numQueuedReceiveMessages() > 0); |
| if (ioPendingOnConnection || anythingQueuedOnConnection || |
| conn->numReceiveCallbacksPending() > 0) |
| { |
| result = TRUE; |
| } |
| } |
| return result; |
| } |
| |
| void ExUdrServer::completeUdrRequests(IpcConnection *conn, |
| NABoolean waitForAllIO) const |
| { |
| #ifdef UDR_DEBUG |
| NABoolean firstTime = TRUE; |
| #endif // UDR_DEBUG |
| NABoolean done = FALSE; |
| |
| while (!done && isIOPending(conn)) |
| { |
| #ifdef UDR_DEBUG |
| if (firstTime) |
| { |
| firstTime = FALSE; |
| UdrDebug0("***"); |
| UdrDebug0("*** I/O is still pending on the UDR control connection"); |
| } |
| UdrDebug0("*** Waiting for one UDR I/O to complete..."); |
| #endif // UDR_DEBUG |
| |
| // Wait on 'conn' |
| conn->wait(IpcInfiniteTimeout); |
| UdrDebug0("*** A UDR I/O has completed."); |
| |
| if (!waitForAllIO) |
| { |
| done = TRUE; |
| } |
| } |
| |
| #ifdef UDR_DEBUG |
| if (!firstTime) |
| { |
| UdrDebug0("***"); |
| } |
| #endif // UDR_DEBUG |
| |
| } |
| |
| // Matchmaking logic to determine if this server has the requested |
| // attributes |
| NABoolean ExUdrServer::match(const Int32 &userId, |
| const char *options, |
| const char *optionDelimiters) const |
| { |
| // Two instances are considered a match if they have the same user |
| // identity and any of the following are true: |
| // - One or both instances have runtime options of "ANYTHING" |
| // - Both instances have runtime options of "OFF" |
| // - Both instances have matching runtime options and a matching |
| // option delimiter string |
| |
| if (userId_ != userId) |
| { |
| return FALSE; |
| } |
| |
| if (str_cmp_ne(options, "ANYTHING") == 0 || |
| str_cmp_ne(options_, "ANYTHING") == 0) |
| { |
| return TRUE; |
| } |
| |
| if (str_cmp_ne(options, "OFF") == 0 && |
| str_cmp_ne(options_, "OFF") == 0) |
| { |
| return TRUE; |
| } |
| |
| if (str_cmp_ne(options_, options) == 0 && |
| str_cmp_ne(optionDelimiters_, optionDelimiters) == 0) |
| { |
| return TRUE; |
| } |
| |
| return FALSE; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // ExUdrServerManager |
| // ----------------------------------------------------------------------- |
| ExUdrServerManager::ExUdrServerManager(IpcEnvironment* env, |
| ComUInt32 maxServersPerGroup) |
| : ipcEnvironment_(env), |
| maxServersPerGroup_(maxServersPerGroup), |
| serverPool_(myIpcHeap()), |
| okToRetainOneServer_(FALSE) |
| #ifdef UDR_DEBUG |
| , traceFile_(NULL) |
| #endif |
| { |
| // |
| // Create the UDR server class. This does not actually |
| // start any processes. Use allocateServerProcess() to |
| // do that. |
| // |
| // This object is copied into each ExUdrServer object. |
| // ExUdrServer->start() method calls allocateServerProcess() |
| // on this object to create a process. |
| // |
| udrServerClass_ = new (myIpcHeap()) |
| IpcServerClass(ipcEnvironment_, IPC_SQLUDR_SERVER); |
| |
| #ifdef UDR_DEBUG |
| if ((getenv("UDR_""DEBUG") != NULL) || |
| (getenv("UDR_SERVER_MGR_DEBUG") != NULL)) |
| { |
| traceFile_ = stdout; |
| } |
| #endif |
| } |
| |
| ExUdrServerManager::~ExUdrServerManager() |
| { |
| // |
| // stop all the running ExUdrServer processes. |
| // |
| for (CollIndex i = 0; i < serverPool_.entries(); i++) |
| { |
| delete serverPool_[i]; |
| } |
| |
| if (udrServerClass_) |
| { |
| // |
| // class IpcServerClass does not have a destructor so to delete |
| // udrServerClass_ all we have to do is deallocate the memory from |
| // the IPC heap. |
| // |
| NADELETEBASIC(udrServerClass_, myIpcHeap()); |
| } |
| } |
| |
| // |
| // Returns an ExUdrServer with the requested attributes |
| // |
| // Creates a new ExUdrServer object if the number of existing servers |
| // with matching attributes is less than maximum number of servers |
| // allowed per group. Otherwise, we return one of the servers we |
| // already have with the lowest reference count. |
| // |
| // Successful completion of this method does not guarantee that the |
| // server process is actually started. |
| // |
| ExUdrServer* ExUdrServerManager::acquireUdrServer(const Int32 &userId, |
| const char *options, |
| const char *optionDelimiters, |
| const char *userName, |
| const char *userPassword, |
| NABoolean dedicated) |
| { |
| #ifdef UDR_DEBUG |
| if ((getenv("UDR_""DEBUG") != NULL) || |
| (getenv("UDR_SERVER_MGR_DEBUG") != NULL)) |
| { |
| traceFile_ = stdout; |
| } |
| else |
| { |
| traceFile_ = NULL; |
| } |
| #endif |
| |
| UdrDebug0("[BEGIN ExUdrServerManager::acquireUdrServer()]"); |
| UdrDebug1(" options: '%s'", options); |
| UdrDebug1(" delimiters: '%s'", optionDelimiters); |
| UdrDebug1(" Max servers per group: %u", getMaxServersPerGroup()); |
| |
| ExUdrServer *udrServer = NULL; |
| ComUInt32 lowestRefCnt = 0; |
| ComUInt32 numServersMatched = 0; |
| CollIndex entries = serverPool_.entries(); |
| |
| for (CollIndex i = 0; i < entries; i++) |
| { |
| ExUdrServer *s = serverPool_[i]; |
| if (s->match(userId, options, optionDelimiters) && |
| (!s->isDedicated())) |
| { |
| numServersMatched++; |
| if (udrServer == NULL || s->getRefCount() < lowestRefCnt) |
| { |
| udrServer = serverPool_[i]; |
| lowestRefCnt = udrServer->getRefCount(); |
| } |
| } |
| } |
| |
| UdrDebug2(" Found %u matching server%s", numServersMatched, |
| numServersMatched == 1 ? "" : "s"); |
| |
| if (entries > 0 && numServersMatched == 0) |
| { |
| UdrDebug0(" A new group is being encountered"); |
| UdrDebug0(" About to release idle servers..."); |
| |
| // We are seeing a server group for the first time, and it is not |
| // the only group we are currently managing. We will no longer |
| // treat this application as one that only requires a single UDR |
| // server. |
| okToRetainOneServer_ = FALSE; |
| |
| // Now release all idle servers (those with a ref count of |
| // zero). The loop here traverses the list in reverse order. |
| CollIndex idx = entries; |
| while (idx--) |
| { |
| ExUdrServer *curr = serverPool_[idx]; |
| if (curr->getRefCount() == 0) |
| { |
| delete curr; |
| serverPool_.removeAt(idx); |
| } |
| } |
| UdrDebug0(" Done releasing idle servers"); |
| } |
| |
| NABoolean reUseExistingServer = FALSE; |
| |
| if (numServersMatched >= getMaxServersPerGroup()) |
| { |
| // |
| // We hit our limit for servers in this group. We will return the |
| // one with the lowest reference count. |
| // |
| reUseExistingServer = TRUE; |
| } |
| |
| // If the request is for a dedicated server and the decision is to |
| // reuse an existing server, make sure the selected server's reference |
| // count is zero, else do not reuse this server. |
| if(dedicated && reUseExistingServer && |
| (udrServer->getRefCount() != 0)) |
| { |
| reUseExistingServer = FALSE; |
| } |
| |
| |
| if (reUseExistingServer) |
| { |
| // |
| // We hit our limit for servers in this group. We will return the |
| // one with the lowest reference count. |
| // |
| udrServer->incrRefCount(); |
| UdrDebug0(" No more servers can be started in this group"); |
| UdrDebug1(" Server %p will be reused", udrServer); |
| } |
| else |
| { |
| // If the requested options were "ANYTHING" then we are actually |
| // going to start a server as if the user specified "OFF". So |
| // that future requests for "OFF" will match this new server we |
| // are creating, we will change "ANYTHING" to "OFF" here. Note |
| // that the "ANYTHING" and "OFF" strings were uppercased by the |
| // SQL compiler when the plan was generated. |
| const char *newOptions = options; |
| if (str_cmp_ne(options, "ANYTHING") == 0) |
| { |
| newOptions = "OFF"; |
| } |
| |
| udrServer = new (myIpcHeap()) ExUdrServer(ipcEnvironment_, |
| userId, |
| newOptions, |
| optionDelimiters, |
| userName, |
| userPassword, |
| udrServerClass_); |
| #ifdef UDR_DEBUG |
| if (traceFile_) |
| { |
| udrServer->setTraceFile(traceFile_); |
| } |
| #endif |
| |
| UdrDebug1(" Created a new ExUdrServer instance %p", udrServer); |
| |
| udrServer->setRefCount(1); |
| serverPool_.insert(udrServer); |
| } |
| |
| UdrDebug2(" Returning ExUdrServer %p, ref count %u", |
| udrServer, udrServer->getRefCount()); |
| UdrDebug0("[END ExUdrServerManager::acquireUdrServer()]"); |
| |
| return udrServer; |
| } |
| |
| // |
| // A method to decrement the reference count on an ExUdrServer |
| // instance. Callers need to pass in a non-NULL argument because no |
| // checking is done here. When a reference count reaches zero we do |
| // one of two things: |
| // |
| // a) Retain the ExUdrServer instance as an idle server in our |
| // pool. This will be done as a performance optimization if the |
| // application has not yet made use of multiple UDR servers. |
| // |
| // b) Stop the server and remove it from the pool. We choose this |
| // option once we have detected that the application is using |
| // multiple UDR servers. |
| // |
| void ExUdrServerManager::releaseUdrServer(ExUdrServer *udrServer) |
| { |
| UdrDebug1("[BEGIN ExUdrServerManager::releaseUdrServer(%p)", udrServer); |
| |
| NABoolean found = FALSE; |
| for (CollIndex i = 0; !found && i < serverPool_.entries(); i++) |
| { |
| if (udrServer == serverPool_[i]) |
| { |
| found = TRUE; |
| udrServer->decrRefCount(); |
| udrServer->setDedicated(FALSE); |
| |
| UdrDebug2(" Found server %p. Ref count decremented to %u", |
| udrServer, udrServer->getRefCount()); |
| |
| if (udrServer->getRefCount() == 0) |
| { |
| if (okToRetainOneServer_ && serverPool_.entries() == 1) |
| { |
| // Do nothing. So far this application appears to only |
| // require a single UDR server so we will retain this one. |
| UdrDebug0(" There is one server in the pool, it will be retained"); |
| } |
| else |
| { |
| delete udrServer; |
| serverPool_.removeAt(i); |
| } |
| } |
| |
| } // if (udrServer == serverPool_[i]) |
| } // for each server in the pool |
| |
| UdrDebug1("[END ExUdrServerManager::releaseUdrServer(%p)]", udrServer); |
| } |
| |