| // ********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
// File:         ExCancel.cpp
// Description:  Method definitions for ExCancelTdb, ExCancelTcb and
//               CancelMsgStream (declared in ExCancel.h).
| // |
| // Created: Oct 15, 2009 |
| // ********************************************************************** |
| #include "Platform.h" |
| |
| #include "ex_stdh.h" |
| #include "ComTdb.h" |
| #include "ex_tcb.h" |
| #include "ExpError.h" |
| #include "ExCancel.h" |
| #include "SqlStats.h" |
| #include "Globals.h" |
| #include "Context.h" |
| #include "ex_exe_stmt_globals.h" |
| #include "ComCextdecs.h" |
| #include "seabed/ms.h" // msg_mon_get_process_info |
| |
| ///////////////////////////////////////////////////////////////////////// |
| // Methods for ExCancelTdb |
| ///////////////////////////////////////////////////////////////////////// |
| |
| ex_tcb * ExCancelTdb::build(ex_globals * glob) |
| { |
| ExMasterStmtGlobals * exe_glob = glob->castToExExeStmtGlobals()-> |
| castToExMasterStmtGlobals(); |
| |
| ex_assert(exe_glob,"Cancel operator must be in master."); |
| |
| ExCancelTcb *cancel_tcb = |
| new(exe_glob->getSpace()) ExCancelTcb(*this, exe_glob); |
| |
| ex_assert(cancel_tcb, "Error building ExCancelTcb."); |
| |
| // Add subtasks to the scheduler. |
| cancel_tcb->registerSubtasks(); |
| |
| return (cancel_tcb); |
| } |
| |
| |
| ///////////////////////////////////////////////////////////////////////// |
| // Methods for ExCancelTcb |
| ///////////////////////////////////////////////////////////////////////// |
| |
| ExCancelTcb::ExCancelTcb(const ExCancelTdb & cancel_tdb, ex_globals *glob) : |
| ex_tcb(cancel_tdb, 1, glob) |
| , step_(NOT_STARTED) |
| , ioSubtask_(NULL) |
| , cbServer_(NULL) |
| , cancelStream_(NULL) |
| , cpu_ (-1) |
| , pid_ (-1) |
| , retryQidNotActive_(false) |
| , retryCount_(0) |
| { |
| nodeName_[0] = '\0'; |
| |
| allocateParentQueues(qparent_, |
| FALSE ); // don't alloc pstate. |
| |
| retryQidNotActive_ = (getenv("SQLMX_REGRESS") != NULL); |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| // register tcb for work |
| |
| void ExCancelTcb::registerSubtasks() |
| { |
| ex_tcb::registerSubtasks(); |
| |
| // register a non-queue event for the IPC with the send top node |
| ioSubtask_ = |
| getGlobals()->getScheduler()->registerNonQueueSubtask(sWork,this); |
| } |
| |
| ExCancelTcb::~ExCancelTcb() |
| { |
| freeResources(); |
| } |
| |
| void ExCancelTcb::freeResources() |
| { |
| if (qparent_.up) |
| { |
| delete qparent_.up; |
| qparent_.up = NULL; |
| } |
| if (qparent_.down) |
| { |
| delete qparent_.down; |
| qparent_.down = NULL; |
| } |
| ex_assert(cancelStream_ == NULL, "freeResources called before step_ DONE."); |
| ex_assert(cbServer_ == NULL, "freeResources called before step_ DONE."); |
| } |
| |
| ExWorkProcRetcode ExCancelTcb::work() |
| { |
| |
| ExMasterStmtGlobals *masterGlobals = |
| getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals(); |
| |
| CliGlobals *cliGlobals = masterGlobals->getCliGlobals(); |
| |
| while ((qparent_.down->isEmpty() == FALSE) && |
| (qparent_.up->isFull() == FALSE)) |
| { |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| if (pentry_down->downState.request == ex_queue::GET_NOMORE) |
| step_ = DONE; |
| else |
| { |
| retryCount_ = 0; |
| // Priv checking is done during compilation. To support |
| // REVOKE, prevent a prepared CANCEL/SUSPEND/ACTIVATE |
| // that was compiled more than 1 second ago from executing |
| // by raising the 8734 error to force an AQR. |
| Int64 microSecondsSinceCompile = NA_JulianTimestamp() - |
| masterGlobals->getStatement()->getCompileEndTime(); |
| |
| if (microSecondsSinceCompile > 1000*1000) |
| { |
| |
| ComDiagsArea *diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| *diagsArea << DgSqlCode(-CLI_INVALID_QUERY_PRIVS); |
| reportError(diagsArea); |
| step_ = DONE; |
| break; |
| } |
| |
| // Figure out which MXSSMP broker to use. |
| if (cancelTdb().getAction() == ComTdbCancel::CancelByPname) |
| { |
| int nid = -1; |
| int rc = msg_mon_get_process_info(cancelTdb().getCancelPname(), |
| &nid, &pid_); |
| switch (rc) |
| { |
| case XZFIL_ERR_OK: |
| cpu_ = (short) nid; |
| break; |
| case XZFIL_ERR_NOTFOUND: |
| case XZFIL_ERR_BADNAME: |
| case XZFIL_ERR_NOSUCHDEV: |
| { |
| ComDiagsArea *diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| |
| *diagsArea << DgSqlCode(-EXE_CANCEL_PROCESS_NOT_FOUND); |
| *diagsArea << DgString0(cancelTdb().getCancelPname()); |
| reportError(diagsArea); |
| |
| step_ = DONE; |
| break; |
| } |
| default: |
| { |
| char buf[200]; |
| str_sprintf(buf, "Unexpected error %d returned from " |
| "msg_mon_get_process_info", rc); |
| ex_assert(0, buf); |
| } |
| } |
| if (step_ != NOT_STARTED) |
| break; |
| } |
| else if (cancelTdb().getAction() == ComTdbCancel::CancelByNidPid) |
| { |
| cpu_ = (short) cancelTdb().getCancelNid(); |
| pid_ = cancelTdb().getCancelPid(); |
| |
| // check that process exists, if not report error. |
| char processName[MS_MON_MAX_PROCESS_NAME]; |
| int rc = msg_mon_get_process_name(cpu_, pid_, processName); |
| if (XZFIL_ERR_OK == rc) |
| ; // good. nid & pid are valid. |
| else |
| { |
| if ((XZFIL_ERR_NOTFOUND != rc) && |
| (XZFIL_ERR_BADNAME != rc) && |
| (XZFIL_ERR_NOSUCHDEV != rc)) |
| { |
| // Log rc in case it needs investigation later. |
| char buf[200]; |
| str_sprintf(buf, "Unexpected error %d returned from " |
| "msg_mon_get_process_name", rc); |
| SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, |
| buf, 0); |
| } |
| char nid_pid_str[32]; |
| str_sprintf(nid_pid_str, "%d, %d", cpu_, pid_); |
| ComDiagsArea *diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| |
| *diagsArea << DgSqlCode(-EXE_CANCEL_PROCESS_NOT_FOUND); |
| *diagsArea << DgString0(nid_pid_str); |
| reportError(diagsArea); |
| |
| step_ = DONE; |
| break; |
| } |
| } |
| else |
| { |
| char * qid = cancelTdb().qid_; |
| Lng32 qid_len = str_len(qid); |
| |
| // This static method is defined in SqlStats.cpp. It side-effects |
| // the nodeName and cpu_ according to the input qid. |
| if (getMasterCpu( |
| qid, qid_len, nodeName_, sizeof(nodeName_) - 1, cpu_) == -1) |
| { |
| ComDiagsArea *diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| |
| *diagsArea << DgSqlCode(-EXE_RTS_INVALID_QID); |
| |
| reportError(diagsArea); |
| |
| step_ = DONE; |
| break; |
| } |
| } |
| |
| // Testpoints for hard to reproduce problems: |
| bool fakeError8028 = false; |
| fakeError8028 = (getenv("HP_FAKE_ERROR_8028") != NULL); |
| if ((cliGlobals->getCbServerClass() == NULL) || fakeError8028) |
| { |
| ComDiagsArea *diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| |
| *diagsArea << DgSqlCode(-EXE_CANCEL_PROCESS_NOT_FOUND); |
| *diagsArea << DgString0("$ZSM000"); |
| |
| reportError(diagsArea); |
| |
| step_ = DONE; |
| break; |
| } |
| |
| ComDiagsArea *diagsArea = NULL; |
| bool fakeError2024 = false; |
| fakeError2024 = (getenv("HP_FAKE_ERROR_2024") != NULL); |
| |
| if (fakeError2024) |
| { |
| cbServer_ = NULL; |
| diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| if (getenv("HP_FAKE_ERROR_8142")) |
| { |
| *diagsArea << DgSqlCode(-8142); |
| *diagsArea << DgString0(__FILE__); |
| *diagsArea << DgString1("cbServer_ is NULL"); |
| } |
| else |
| *diagsArea << DgSqlCode(-2024); |
| } |
| else |
| cbServer_ = cliGlobals->getCbServerClass()->allocateServerProcess( |
| &diagsArea, |
| cliGlobals->getEnvironment()->getHeap(), |
| nodeName_, |
| cpu_, |
| IPC_PRIORITY_DONT_CARE, |
| FALSE, // usesTransactions |
| TRUE, // waitedCreation |
| 2 // maxNowaitRequests -- cancel+(1 extra). |
| ); |
| |
| |
| if (cbServer_ == NULL || cbServer_->getControlConnection() == NULL) |
| { |
| ex_assert(diagsArea != NULL, |
| "allocateServerProcess failed, but no diags"); |
| |
| // look for SQLCode 2024 |
| // "*** ERROR[2024] Server Process $0~string0 |
| // is not running or could not be created. Operating System |
| // Error $1~int0 was returned." |
| // Remap to cancel-specfic error 8028. |
| if (diagsArea->contains(-2024) && |
| cancelTdb().actionIsCancel()) |
| { |
| diagsArea->deleteError(diagsArea->returnIndex(-2024)); |
| reportError(diagsArea, true, EXE_CANCEL_PROCESS_NOT_FOUND, |
| nodeName_, cpu_); |
| } |
| else |
| reportError(diagsArea); |
| |
| step_ = DONE; |
| break; |
| } |
| |
| // the reportError method was not called -- see break above. |
| if (diagsArea != NULL) |
| diagsArea->decrRefCount(); |
| |
| //Create the stream on the IpcHeap, since we don't dispose |
| // of it immediately. We just add it to the list of completed |
| // messages in the IpcEnv, and it is disposed of later. |
| |
| cancelStream_ = new (cliGlobals->getIpcHeap()) |
| CancelMsgStream(cliGlobals->getEnvironment(), this); |
| |
| cancelStream_->addRecipient(cbServer_->getControlConnection()); |
| |
| } |
| |
| step_ = SEND_MESSAGE; |
| |
| break; |
| } // end case NOT_STARTED |
| |
| #pragma warning (disable : 4291) |
| |
| case SEND_MESSAGE: |
| { |
| RtsHandle rtsHandle = (RtsHandle) this; |
| |
| if (cancelTdb().actionIsCancel()) |
| { |
| Int64 cancelStartTime = JULIANTIMESTAMP(); |
| |
| Lng32 firstEscalationInterval = cliGlobals->currContext()-> |
| getSessionDefaults()->getCancelEscalationInterval(); |
| |
| Lng32 secondEscalationInterval = cliGlobals->currContext()-> |
| getSessionDefaults()->getCancelEscalationMxosrvrInterval(); |
| |
| NABoolean cancelEscalationSaveabend = cliGlobals->currContext()-> |
| getSessionDefaults()->getCancelEscalationSaveabend(); |
| |
| bool cancelLogging = (TRUE == cliGlobals->currContext()-> |
| getSessionDefaults()->getCancelLogging()); |
| |
| CancelQueryRequest *cancelMsg = new (cliGlobals->getIpcHeap()) |
| CancelQueryRequest(rtsHandle, cliGlobals->getIpcHeap(), |
| cancelStartTime, |
| firstEscalationInterval, |
| secondEscalationInterval, |
| cancelEscalationSaveabend, |
| cancelTdb().getCommentText(), |
| str_len(cancelTdb().getCommentText()), |
| cancelLogging, |
| cancelTdb().action_ != ComTdbCancel::CancelByQid, |
| pid_, |
| cancelTdb().getCancelPidBlockThreshold()); |
| |
| #pragma warning (default : 4291) |
| |
| *cancelStream_ << *cancelMsg; |
| |
| cancelMsg->decrRefCount(); |
| } |
| else if (ComTdbCancel::Suspend == cancelTdb().action_) |
| { |
| |
| bool suspendLogging = (TRUE == cliGlobals->currContext()-> |
| getSessionDefaults()->getSuspendLogging()); |
| |
| #pragma warning (disable : 4291) |
| SuspendQueryRequest * suspendMsg = new (cliGlobals->getIpcHeap()) |
| SuspendQueryRequest(rtsHandle, cliGlobals->getIpcHeap(), |
| ComTdbCancel::Force == |
| cancelTdb().forced_, |
| suspendLogging); |
| #pragma warning (default : 4291) |
| |
| *cancelStream_ << *suspendMsg; |
| |
| suspendMsg->decrRefCount(); |
| } |
| else |
| { |
| ex_assert( |
| ComTdbCancel::Activate == cancelTdb().action_, |
| "invalid action for ExCancelTcb"); |
| |
| bool suspendLogging = (TRUE == cliGlobals->currContext()-> |
| getSessionDefaults()->getSuspendLogging()); |
| |
| #pragma warning (disable : 4291) |
| ActivateQueryRequest * activateMsg = new (cliGlobals->getIpcHeap()) |
| ActivateQueryRequest(rtsHandle, cliGlobals->getIpcHeap(), |
| suspendLogging); |
| #pragma warning (default : 4291) |
| |
| *cancelStream_ << *activateMsg; |
| |
| activateMsg->decrRefCount(); |
| } |
| |
| if ((cancelTdb().getAction() != ComTdbCancel::CancelByPname) && |
| (cancelTdb().getAction() != ComTdbCancel::CancelByNidPid)) |
| { |
| char * qid = cancelTdb().qid_; |
| Lng32 qid_len = str_len(qid); |
| |
| #pragma warning (disable : 4291) |
| RtsQueryId *rtsQueryId = new (cliGlobals->getIpcHeap()) |
| RtsQueryId( cliGlobals->getIpcHeap(), qid, qid_len); |
| #pragma warning (default : 4291) |
| |
| *cancelStream_ << *rtsQueryId; |
| rtsQueryId->decrRefCount(); |
| } |
| |
| // send a no-wait request to the cancel broker. |
| cancelStream_->send(FALSE); |
| |
| step_ = GET_REPLY; |
| // Come back when I/O completes. |
| return WORK_OK; |
| |
| break; |
| } // end case SEND_MESSAGE |
| |
| case GET_REPLY: |
| { |
| |
| // Handle general IPC error. |
| bool fakeError201 = false; |
| fakeError201 = (getenv("HP_FAKE_ERROR_201") != NULL); |
| if ((cbServer_->getControlConnection()->getErrorInfo() != 0) || |
| fakeError201) |
| { |
| ComDiagsArea *diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| |
| cbServer_->getControlConnection()-> |
| populateDiagsArea( diagsArea, getGlobals()->getDefaultHeap()); |
| |
| if (fakeError201) |
| { |
| *diagsArea << DgSqlCode(-2034) << DgInt0(201) |
| << DgString0("I say") << DgString1("control broker"); |
| } |
| |
| if (diagsArea->contains(-8921)) |
| { |
| // Should not get timeout error 8921. Get a core-file |
| // of the SSMP and this process too so that this can be |
| // debugged. |
| cbServer_->getControlConnection()-> |
| dumpAndStopOtherEnd(true, false); |
| genLinuxCorefile("Unexpected timeout error"); |
| } |
| |
| reportError(diagsArea); |
| |
| step_ = DONE; |
| break; |
| } |
| |
| // See if stream has the reply yet. |
| if (!cancelStream_->moreObjects()) |
| return WORK_OK; |
| |
| #pragma warning (disable : 4291) |
| |
| ControlQueryReply *reply = new (cliGlobals->getIpcHeap()) |
| ControlQueryReply(INVALID_RTS_HANDLE, cliGlobals->getIpcHeap()); |
| |
| #pragma warning (default : 4291) |
| |
| *cancelStream_ >> *reply; |
| |
| if (reply->didAttemptControl()) |
| { |
| // yeaah! |
| cancelStream_->clearAllObjects(); |
| } |
| else |
| { |
| if (cancelStream_->moreObjects() && |
| cancelStream_->getNextObjType() == IPC_SQL_DIAG_AREA) |
| { |
| ComDiagsArea *diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| |
| *cancelStream_ >> *diagsArea; |
| cancelStream_->clearAllObjects(); |
| |
| if ( retryQidNotActive_ && |
| (diagsArea->mainSQLCODE() == -EXE_SUSPEND_QID_NOT_ACTIVE) && |
| (++retryCount_ <= 60)) |
| { |
| SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, |
| "Retrying error 8672.", 0); |
| DELAY(500); |
| diagsArea->decrRefCount(); |
| step_ = SEND_MESSAGE; |
| break; |
| } |
| |
| reportError(diagsArea); |
| } |
| else |
| ex_assert(0, "Control failed, but no diagnostics."); |
| } |
| |
| step_ = DONE; |
| break; |
| } |
| case DONE: |
| { |
| if (cancelStream_) |
| { |
| cancelStream_->addToCompletedList(); |
| cancelStream_ = NULL; |
| } |
| if (cbServer_) |
| { |
| cbServer_->release(); |
| cbServer_ = NULL; |
| } |
| |
| ex_queue_entry * up_entry = qparent_.up->getTailEntry(); |
| up_entry->copyAtp(pentry_down); |
| up_entry->upState.parentIndex = pentry_down->downState.parentIndex; |
| up_entry->upState.downIndex = qparent_.down->getHeadIndex(); |
| up_entry->upState.setMatchNo(1); |
| up_entry->upState.status = ex_queue::Q_NO_DATA; |
| qparent_.up->insert(); |
| |
| qparent_.down->removeHead(); |
| |
| step_ = NOT_STARTED; |
| break; |
| } |
| default: |
| ex_assert( 0, "Unknown step_."); |
| } |
| } // while have a request and have room for a reply. |
| return WORK_OK; |
| } |
| |
| void ExCancelTcb::reportError(ComDiagsArea *da, bool addCondition, |
| Lng32 SQLCode, char *nodeName, short cpu) |
| { |
| ex_queue_entry *down_entry = qparent_.down->getHeadEntry(); |
| ex_queue_entry * up_entry = qparent_.up->getTailEntry(); |
| |
| ComDiagsArea *prevDiagsArea = down_entry->getDiagsArea(); |
| if (prevDiagsArea) |
| da->mergeAfter(*prevDiagsArea); |
| |
| if (addCondition) |
| { |
| ComDiagsArea *diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| |
| *diagsArea << DgSqlCode(-SQLCode); |
| |
| if (nodeName) |
| { |
| char processName[50]; |
| CliGlobals *cliGlobals = getGlobals()->castToExExeStmtGlobals()-> |
| castToExMasterStmtGlobals()->getCliGlobals(); |
| |
| cliGlobals->getCbServerClass()->getProcessName(nodeName, |
| (short)str_len(nodeName), cpu, processName); |
| *diagsArea << DgString0(processName); |
| } |
| |
| diagsArea->mergeAfter(*da); |
| up_entry->setDiagsArea(diagsArea); |
| da->decrRefCount(); |
| } |
| else |
| up_entry->setDiagsArea(da); |
| |
| up_entry->upState.status = ex_queue::Q_SQLERROR; |
| up_entry->upState.downIndex = qparent_.down->getHeadIndex(); |
| up_entry->upState.parentIndex = down_entry->downState.parentIndex; |
| up_entry->upState.setMatchNo(0); |
| qparent_.up->insert(); |
| return; |
| } |
| |
| |
| // ----------------------------------------------------------------------- |
| // Methods for CancelMsgStream. |
| // ----------------------------------------------------------------------- |
| |
| void CancelMsgStream::actOnSendAllComplete() |
| { |
| clearAllObjects(); |
| receive(FALSE); // FALSE means no-waited. |
| return; |
| } |
| |
| void CancelMsgStream::actOnReceiveAllComplete() |
| { |
| ExExeStmtGlobals *glob = cancelTcb_->getGlobals()-> |
| castToExExeStmtGlobals(); |
| cancelTcb_->tickleSchedulerWork(TRUE); |
| } |
| |