| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| |
| #include <stdio.h> |
| #include <stdarg.h> |
| #include <limits.h> |
| #include <time.h> |
| #include <sys/time.h> |
| #include <sys/syscall.h> |
| #include <unistd.h> |
| #include <pthread.h> |
| #include "Platform.h" |
| #include "ExSMCommon.h" |
| #include "ExSMGlobals.h" |
| #include "ExSMEvent.h" |
| #include "ExSMTask.h" |
| #include "ExSMTrace.h" |
| #include "Ipc.h" |
| #include "Ex_esp_msg.h" |
| #include "ComQueue.h" |
| |
| //------------------------------------------------------------------------- |
| // Message text buffer for SeaMonster reader thread assertion failures |
| //------------------------------------------------------------------------- |
| __thread char ExSM_AssertBuf[32]; |
| |
| //------------------------------------------------------------------------- |
| // Compare two SeaMonster targets |
| //------------------------------------------------------------------------- |
| bool ExSM_TargetsEqual(const sm_target_t &a, const sm_target_t &b) |
| { |
| bool result = false; |
| if (a.node == b.node && a.pid == b.pid && a.tag == b.tag && a.id == b.id) |
| result = true; |
| return result; |
| } |
| |
| //------------------------------------------------------------------------- |
| // Return the thread identifier |
| //------------------------------------------------------------------------- |
| int ExSM_GetThreadID() |
| { |
| return (int) syscall(__NR_gettid); |
| } |
| |
| //------------------------------------------------------------------------- |
| // Report the largest chunk size we can transmit using SeaMonster. We |
| // call SeaMonster to retrieve the absolute maximum and then use 50% |
| // of that value as our effective maximum. |
| // ------------------------------------------------------------------------- |
| uint32_t EXSM_INTRA_MAX_CHUNK = 0; |
| uint32_t EXSM_INTER_MAX_CHUNK = 0; |
| uint32_t ExSM_GetMaxChunkSize(NABoolean intra) |
| { |
| if ( (intra & EXSM_INTRA_MAX_CHUNK == 0) || |
| (!intra & EXSM_INTER_MAX_CHUNK == 0) ) |
| { |
| int actualMax = 0; |
| int rc = 0; |
| |
| if (intra) |
| rc = SM_ctl(0, SM_GET_MAXINTRABUFFSIZE, &actualMax); |
| else |
| rc = SM_ctl(0, SM_GET_BUFFSIZE, &actualMax); |
| |
| exsm_assert_rc(rc, "SM_ctl"); |
| exsm_assert(actualMax > 0, |
| "SeaMonster buffer size must be greater than 0"); |
| |
| if (intra) |
| EXSM_INTRA_MAX_CHUNK = (uint32_t) (actualMax / 2); |
| else |
| EXSM_INTER_MAX_CHUNK = (uint32_t) (actualMax / 2); |
| |
| EXSM_TRACE(EXSM_TRACE_SEND | EXSM_TRACE_SM_CALLS, |
| "SM_ctl sm max %d sql max %u", (int) actualMax, |
| intra? (unsigned int) EXSM_INTRA_MAX_CHUNK : (unsigned int) EXSM_INTER_MAX_CHUNK); |
| } |
| |
| if (intra) |
| return EXSM_INTRA_MAX_CHUNK; |
| else |
| return EXSM_INTER_MAX_CHUNK; |
| } |
| |
| //------------------------------------------------------------------------- |
| // SM initialize |
| //------------------------------------------------------------------------- |
| int32_t ExSM_Initialize(ExSMGlobals *smGlobals, |
| ExExeStmtGlobals *stmtGlobals) |
| { |
| // Add an INIT event to the in-memory trace |
| ExSMEvent::add(ExSMEvent::Init); |
| |
| // Initialize SeaMonster |
| int32_t rc = SM_init(0, smGlobals->getSQNodeNum(), 0); |
| char buf[30]; |
| SM_strerror_r(rc, buf, sizeof(buf)); |
| EXSM_TRACE(EXSM_TRACE_INIT|EXSM_TRACE_SM_CALLS, "SM_init(%d) returned %s", |
| (int) smGlobals->getSQNodeNum(), buf); |
| if (rc != 0) |
| { |
| ExSMEvent::add(ExSMEvent::SMError, NULL, rc); |
| ExSMGlobals::addDiags("SM_init", rc, stmtGlobals); |
| } |
| else |
| { |
| // After SM is initialized but before the reader thread is |
| // created, register a special SM ID for executor internal |
| // communication |
| int64_t exeInternalSMID = smGlobals->getExeInternalSMID(); |
| int32_t rc2 = ExSM_Register(exeInternalSMID); |
| exsm_assert_rc(rc2, "ExSM_Register"); |
| } |
| |
| return rc; |
| } |
| |
| //------------------------------------------------------------------------- |
| // SM finalize |
| //------------------------------------------------------------------------- |
| int32_t ExSM_Finalize(ExSMGlobals *smGlobals) |
| { |
| int32_t rc = 0; |
| |
| // Cancel the special SM ID for executor internal communication |
| int64_t exeInternalSMID = smGlobals->getExeInternalSMID(); |
| ExSM_Cancel(exeInternalSMID); |
| |
| // Call SM_finalize |
| ExSMEvent::add(ExSMEvent::Finalize); |
| rc = SM_finalize(0); |
| char buf[30]; |
| SM_strerror_r(rc, buf, sizeof(buf)); |
| EXSM_TRACE(EXSM_TRACE_EXIT, "SM_finalize returned %s", buf); |
| |
| return rc; |
| } |
| |
| //------------------------------------------------------------------------- |
| // Send a SeaMonster message |
| //------------------------------------------------------------------------- |
| int32_t ExSM_Send(ExSMGlobals *smGlobals, // IN |
| const sm_target_t &target, // IN |
| const void *data, // IN |
| size_t numBytes, // IN |
| ExSM_MessageType msgType, // IN |
| bool isPrepostRequired, // IN |
| bool &messageWasSent, // OUT |
| int maxRetries, // IN (-1=try forever) |
| int64_t sendCount, // IN (for tracing only) |
| const void *firstChunk) // IN (useful for queued buffers) |
| { |
| // The sendCount value is provided by the caller and is used in |
| // tracing calls only. For example a connection object can pass in a |
| // value representing the count of sends so far on that connection. |
| |
| exsm_assert(smGlobals, "Invalid SM globals pointer"); |
| |
| Int32 errorRetrys = 0; |
| const Int32 NumErrorRetries = 10; |
| timespec retryintervals[NumErrorRetries] = { |
| { 0, 10*1000*1000 } // 10 ms |
| , { 0, 100*1000*1000 } // 100 ms |
| , { 1, 0 } // 1 second |
| , { 3, 0 } // 3 seconds |
| , { 6, 0 } // 6 seconds |
| , { 10, 0 } // 10 seconds |
| , { 15, 0 } // 15 seconds |
| , { 15, 0 } // 15 seconds |
| , { 15, 0 } // 15 seconds |
| , { 15, 0 } // 15 seconds |
| } ; |
| |
| int32_t rc = 0; |
| int32_t retcode = 0; |
| |
| const char *tagForTracing = (msgType == EXSM_MSG_REQUEST ? "DN" : |
| msgType == EXSM_MSG_REPLY ? "UP" : |
| msgType == EXSM_MSG_SHORT ? "SD" : |
| "??"); |
| |
| // The count variable is for tracing only and is based on a global |
| // message counter in SM globals |
| uint32_t count = |
| (msgType == EXSM_MSG_REQUEST ? (1 + smGlobals->getSendRequestCount()) : |
| msgType == EXSM_MSG_REPLY ? (1 + smGlobals->getSendReplyCount()) : |
| msgType == EXSM_MSG_SHORT ? (1 + smGlobals->getSendShortCount()) : |
| 0); |
| |
| bool done = false; |
| int numRetries = 0; |
| |
| // This function has the ability to retry SM_send repeatedly in a |
| // retry loop. We sleep after each attempt so the retry loop doesn't |
| // turn into a busy spin. The default sleep is currently 1000 |
| // nano000seconds = 1 millisecond. |
| UInt32 nanosecondsToSleep = 1000000; |
| |
| // Loop until one of the following is encountered |
| // * Successful send |
| // * Non-retryable error |
| // * Retry limit is reached |
| |
| while (!done) |
| { |
| EXSM_TRACE(EXSM_TRACE_SEND | EXSM_TRACE_SM_CALLS, |
| "SEND %d %s %d:%d:%" PRId64 ":%d:0x%c bytes %d " |
| "prepost %d count %d", |
| (int) count, tagForTracing, |
| (int) target.node, (int) target.pid, target.id, |
| (int) ExSMTag_GetTagWithoutQualifier(target.tag), |
| (char) ExSMTag_GetQualifierDisplay(target.tag), |
| (int) numBytes, (int) isPrepostRequired, sendCount+1); |
| |
| sm_chunk_t chunk; |
| char *originalBuf = NULL; |
| |
| memset(&chunk, 0, sizeof(sm_chunk_t)); |
| chunk.tgt = target; |
| chunk.buff = (char *) data; |
| chunk.size = numBytes; |
| if (isPrepostRequired) |
| chunk.flags |= SM_FLAG_SEND_TO_PREPOST; |
| |
| // firstChunk is only true for preposted buffer and null for short sends |
| // only for preposted buffers we need to set the chunk handle to the starting buffer address |
| // during the firstChunk is different from the chunk that we are sending |
| // Also add the firstChunk to in memeory trace only when sending second chunk and beyond. |
| if (firstChunk) |
| { |
| chunk.handle = (char *) firstChunk; |
| if (chunk.buff != chunk.handle) |
| originalBuf = (char *)firstChunk; |
| } |
| |
| // Add a SEND event to the in-memory trace. Do not add the event |
| // if this is a retry. |
| ExSMEvent *event = NULL; |
| if (numRetries == 0) |
| event = ExSMEvent::add(ExSMEvent::Send, &target, |
| 0, chunk.size, chunk.flags, (int64_t) chunk.buff, (int64_t)originalBuf); |
| |
| // SEND |
| rc = SM_put(0, 1, &chunk); |
| retcode = (rc == SM_ERR_INCHUNK ? chunk.errcode : rc); |
| |
| // Add retcode to the trace event (this does not generate a |
| // new event) |
| if (event) |
| event->setOptional1(retcode); |
| |
| if (retcode != 0) |
| { |
| char buf1[30]; |
| char buf2[30]; |
| SM_strerror_r(rc, buf1, sizeof(buf1)); |
| SM_strerror_r(chunk.errcode, buf2, sizeof(buf2)); |
| EXSM_TRACE(EXSM_TRACE_SEND, "SEND %d ERROR %s %s data %p", |
| (int) count, buf1, buf2, data); |
| } |
| |
| // Cases to consider |
| // |
| // (a) Successful send. Break from the while loop. |
| // |
| // (b) The buffer is queued. Treat this the same as successful |
| // send. Break from the while loop. |
| // |
| // (c) Return code is EAGAIN. Loop again unless the timeout has |
| // expired. |
| // |
| // (d) Return code is SM_ERR_REMOTE_TARGET. We get this error when the other end (target) |
| // cancelled the query id. In most cases this could happen in the middle of execution |
| // of a query and there is node down and the other end is ahead in processing the |
| // clean up and cancled the query id already but did not go away yet. |
| // There is no way to know if the other end cancelled because it is going away |
| // or it is a real error. So retry the send for a minute or so and if the other end |
| // went away it will get SM_ERR_NO_PEER eventually for which we just ignore the error |
| // in ESP and exit and in master it will be propagated to the user. |
| // |
| // (e) Other errors. Break from the while loop and return the |
| // error code. |
| |
| if (retcode == 0 || retcode == SM_ERR_QUEUED) |
| { |
| // Cases (a) and (b) |
| messageWasSent = true; |
| done = true; |
| } |
| else if (retcode == SM_ERR_EAGAIN) |
| { |
| // Case (c) |
| // Loop again if the timeout has not expired. |
| // Retry if maxRetries is less than zero (the caller wants to |
| // retry forever) or we have not yet reached maxRetries |
| if (maxRetries < 0 || numRetries < maxRetries) |
| { |
| timespec tspec = { 0, (long) nanosecondsToSleep }; |
| nanosleep(&tspec, NULL); |
| numRetries++; |
| } |
| else |
| { |
| done = true; |
| } |
| } |
| else if (retcode == SM_ERR_REMOTE_TARGET) |
| { |
| // Case (d) |
| // Loop till the number of error retries are not expired. |
| // The sleep for EGAIN is different than error retries for the node down scenarios |
| // as EAGAIN's should not wait that long to retry. |
| if (errorRetrys < NumErrorRetries) |
| nanosleep(&retryintervals[errorRetrys++], NULL); |
| else |
| done = true; |
| } |
| else |
| { |
| // Case (e) |
| done = true; |
| } |
| |
| // If we are about to exit the loop and an event was not generated |
| // in this iteration, generate an event now |
| if (done && event == NULL) |
| ExSMEvent::add(ExSMEvent::SendRetry, &target, retcode, numRetries); |
| |
| } // while (!done) |
| |
| if (messageWasSent) |
| { |
| if (msgType == EXSM_MSG_REQUEST) |
| smGlobals->incrSendRequestCount(); |
| else if (msgType == EXSM_MSG_REPLY) |
| smGlobals->incrSendReplyCount(); |
| else if (msgType == EXSM_MSG_SHORT) |
| smGlobals->incrSendShortCount(); |
| } |
| |
| return retcode; |
| } |
| |
| int32_t ExSM_Post(const sm_target_t &target, |
| size_t dataSize, |
| const void *data, |
| ExSMTask *smTask, |
| NABoolean isServer) // IN (for tracing only) |
| { |
| sm_chunk_t chunk; |
| memset(&chunk, 0, sizeof(sm_chunk_t)); |
| chunk.tgt = target; |
| chunk.size = dataSize; |
| chunk.buff = (char *)data; |
| chunk.handle = (char *)smTask; |
| chunk.flags |= SM_FLAG_PREPOST; |
| |
| int32_t rc = SM_put(0, 1, &chunk); |
| |
| int32_t result = (rc == SM_ERR_INCHUNK ? chunk.errcode : rc); |
| |
| // Add a POST event to the in-memory trace |
| ExSMEvent::add(ExSMEvent::Post, &target, |
| result, chunk.size, chunk.flags, (int64_t) chunk.buff); |
| |
| EXSM_TRACE(EXSM_TRACE_BUFFER | EXSM_TRACE_SM_CALLS, |
| "PREPOST %s %d:%d:%" PRId64 ":%d:0x%c %p %d rc %d %d", |
| (isServer ? "S" : "C"), |
| (int) target.node, (int) target.pid, target.id, |
| (int) ExSMTag_GetTagWithoutQualifier(target.tag), |
| (char) ExSMTag_GetQualifierDisplay(target.tag), |
| chunk.buff, (int) dataSize, |
| (int) rc, (int) chunk.errcode); |
| |
| return result; |
| } |
| |
| |
| //------------------------------------------------------------------------- |
| // Function to send a SeaMonster short message |
| //------------------------------------------------------------------------- |
| int32_t ExSM_SendShortMessage(ExSMGlobals *smGlobals, |
| const sm_target_t &target, |
| const void *data, |
| size_t bytesToSend) |
| { |
| bool messageWasSent = false; |
| int32_t result = |
| ExSM_Send(smGlobals, |
| target, |
| data, |
| bytesToSend, |
| EXSM_MSG_SHORT, |
| false, // isPrepostRequired |
| messageWasSent, // OUT |
| -1, // retry forever |
| 0, NULL); // send count (for tracing only) |
| return result; |
| } |
| |
| //------------------------------------------------------------------------- |
| // Wrapper function used by the reader thread to set the length of an |
| // IPC message buffer. Eliminates the need for the reader thread |
| // function to include or be aware of our IPC classes. |
| //------------------------------------------------------------------------- |
| void ExSM_SetMessageLength(void *buf, size_t len) |
| { |
| IpcMessageBuffer *msgBuf = (IpcMessageBuffer *) buf; |
| msgBuf->setMessageLength(len); |
| } |
| |
| //------------------------------------------------------------------------- |
| // Wrapper function used by the reader thread to allocate an |
| // IpcMessageBuffer. The size of the data region in the new buffer |
| // will be dataBytes. Eliminates the need for the reader thread |
| // function to include or be aware of our IPC classes. |
| // ------------------------------------------------------------------------- |
| void *ExSM_AllocateMessageBuffer(size_t dataBytes, |
| NAMemory *threadSafeHeap) |
| { |
| IpcMessageBuffer *msgBuf = |
| IpcMessageBuffer::allocate(dataBytes, |
| NULL, // IpcMessageStreamBase * |
| threadSafeHeap, // NAMemory * |
| 0); |
| return msgBuf; |
| } |
| |
| //------------------------------------------------------------------------- |
| // Function to return the SeaMonster prepost address of a message |
| // buffer. The prepost address is the first byte of data that gets |
| // transmitted. The IpcMessageBuffer header is not transmitted. |
| //------------------------------------------------------------------------- |
| void *ExSM_GetMessageBufferPrepostAddr(void *data) |
| { |
| void *result = NULL; |
| if (data) |
| { |
| IpcMessageBuffer *msgBuf = (IpcMessageBuffer *) data; |
| result = msgBuf->data(); |
| } |
| return result; |
| } |
| |
| //------------------------------------------------------------------------- |
| // Function to return the address of a message buffer given the |
| // SeaMonster prepost address. The prepost address immediately follows |
| // an IpcMessageBuffer instance. |
| //------------------------------------------------------------------------- |
| void *ExSM_GetMessageBufferAddr(void *prepostAddr) |
| { |
| void *result = NULL; |
| if (prepostAddr) |
| result = ((char *) prepostAddr - sizeof(IpcMessageBuffer)); |
| |
| return result; |
| } |
| |
| //------------------------------------------------------------------------- |
| // Function to return the IPC message object type of the first object |
| // that follows the header (the header is an InternalMsgHdrInfoStruct) |
| // in the IpcMessageBuffer pointed to by the data argument. |
| // |
| // If the object type cannot be determined "UNKNOWN" is returned. |
| //------------------------------------------------------------------------- |
| const char *ExSM_GetMessageBufferType(void *data, size_t dataBytes) |
| { |
| if (data && dataBytes >= sizeof(IpcMessageObj)) |
| { |
| IpcMessageObj *msgObj = (IpcMessageObj *) data; |
| IpcMessageObj *next = msgObj->getNextFromOffset(); |
| if (next && ((void *) next > data) && |
| (((char *) data + dataBytes) >= |
| ((char *) next + sizeof(IpcMessageObj)))) |
| { |
| Int32 type = next->getType(); |
| return getESPMessageObjTypeString((ESPMessageObjTypeEnum) type); |
| } |
| } |
| |
| return "UNKNOWN"; |
| } |
| |
| //------------------------------------------------------------------------- |
| // Function to map between virtual and real SQ node numbers |
| //------------------------------------------------------------------------- |
| int32_t ExSM_GetNodeID(int32_t nodeNum) |
| { |
| // Two cases to consider: |
| // |
| // (a) Running on a cluster, or running on a workstation with both |
| // Seaquest and SeaMonster using virtual nodes. This is default |
| // behavior. The function will return the value of nodeNum that was |
| // passed in. |
| // |
| // (b) This is a workstation, Seaquest is running with virtual |
| // nodes, but SeaMonster is not running with virtual nodes. From |
| // SeaMonster's point of view there is only one node and it does |
| // not matter how many virtual nodes we have. The function will |
| // always return 0. |
| // |
| // This case is detected when: |
| // SQ_VIRTUAL_NODES env var is not NULL |
| // SM_VIRTUALNODE env var is "0" |
| |
| static bool firstTime = true; |
| static bool smVirtualNodes = true; |
| |
| if (firstTime) |
| { |
| firstTime = false; |
| |
| if (getenv("SQ_VIRTUAL_NODES")) |
| { |
| // If we are running with SQ virtual nodes, but without |
| // SeaMonster virtual nodes, we need to set smVirtualNodes to |
| // false |
| char *sm_envvar = getenv("SM_VIRTUALNODE"); |
| if (sm_envvar && sm_envvar[0] == '0') |
| smVirtualNodes = false; |
| } |
| } |
| |
| if (smVirtualNodes) |
| return nodeNum; |
| else |
| return 0; // from SeaMonster's point of view it does not matter how |
| // many virtual nodes we have, we are on the same hw. |
| } |
| |
| //------------------------------------------------------------------------- |
| // We maintain a global list of completed send buffers |
| // * Each list node is an instance of struct ExSM_MessageList |
| // * The list is protected by a global mutex |
| // * The list nodes are allocated on a thread-safe heap |
| // * The list is only visible in this file. Global functions are |
| // provided to add and remove elements. |
| //------------------------------------------------------------------------- |
| struct ExSM_MessageList |
| { |
| IpcMessageBuffer *elem_; |
| ExSM_MessageList *next_; |
| }; |
| |
| // Global list pointer |
| static ExSM_MessageList *EXSM_COMPLETED_SENDS = NULL; |
| |
| // Global mutex |
| static pthread_mutex_t COMPLETED_SEND_MUTEX = PTHREAD_MUTEX_INITIALIZER; |
| |
| // Function to add a list element |
| void ExSM_AddCompletedSendBuffer(void *buf) |
| { |
| // Get a pointer to the thread-safe IPC heap |
| ExSMGlobals *glob = ExSMGlobals::GetExSMGlobals(); |
| NAMemory *heap = glob->getThreadSafeHeap(); |
| exsm_assert(heap, "Invalid heap pointer"); |
| |
| // Allocate a new list node |
| ExSM_MessageList *newElem = (ExSM_MessageList *) |
| heap->allocateMemory(sizeof(ExSM_MessageList)); |
| |
| // Lock the list |
| int rc = pthread_mutex_lock(&COMPLETED_SEND_MUTEX); |
| exsm_assert_rc(rc, "pthread_mutex_lock"); |
| |
| // Add the new node to the head of the list |
| newElem->elem_ = (IpcMessageBuffer *) buf; |
| newElem->next_ = EXSM_COMPLETED_SENDS; |
| EXSM_COMPLETED_SENDS = newElem; |
| |
| // Unlock the list |
| rc = pthread_mutex_unlock(&COMPLETED_SEND_MUTEX); |
| exsm_assert_rc(rc, "pthread_mutex_unlock"); |
| } |
| |
| // Function to remove a list element. The element is returned in the |
| // buf output argument. The return value will be true if the list was |
| // non-empty and an element did get removed. |
| bool ExSM_RemoveCompletedSendBuffer(void *&buf) |
| { |
| buf = NULL; |
| bool removed = false; |
| |
| if (EXSM_COMPLETED_SENDS) |
| { |
| // Lock the list |
| int rc = pthread_mutex_lock(&COMPLETED_SEND_MUTEX); |
| exsm_assert_rc(rc, "pthread_mutex_lock"); |
| |
| // Remove the head node |
| ExSM_MessageList *headNode = EXSM_COMPLETED_SENDS; |
| if (headNode) |
| EXSM_COMPLETED_SENDS = headNode->next_; |
| |
| // Unlock the list |
| rc = pthread_mutex_unlock(&COMPLETED_SEND_MUTEX); |
| exsm_assert_rc(rc, "pthread_mutex_unlock"); |
| |
| if (headNode) |
| { |
| // Keep a copy of the first element |
| buf = headNode->elem_; |
| |
| // Delete the old head node |
| ExSMGlobals *glob = ExSMGlobals::GetExSMGlobals(); |
| NAMemory *heap = glob->getThreadSafeHeap(); |
| exsm_assert(heap, "Invalid heap pointer"); |
| heap->deallocateMemory(headNode); |
| |
| removed = true; |
| } |
| } |
| |
| return removed; |
| } |
| |
| //------------------------------------------------------------------------- |
| // We maintain a global collection of active IDs and functions to |
| // register, cancel, and find an ID. Access to the collection is |
| // protected by a mutex. The register function is a no-op if the ID |
| // is already in the collection, and the cancel function is a no-op |
| // if the ID is not in the collection. |
| //------------------------------------------------------------------------- |
| HashQueue *EXSM_IDS = NULL; |
| pthread_mutex_t EXSM_IDS_MUTEX = PTHREAD_MUTEX_INITIALIZER; |
| |
| // Register an ID and add it to the global collection of query |
| // IDs. The function is a no-op if the ID is already in the |
| // collection. |
| // |
| // Returns 0 if no errors were encountered, otherwise a SeaMonster |
| // error code. |
| int32_t ExSM_Register(const int64_t &smQueryID) |
| { |
| int32_t smResult = 0; |
| int rc = 0; |
| |
| // Lock the collection |
| rc = pthread_mutex_lock(&EXSM_IDS_MUTEX); |
| exsm_assert_rc(rc, "pthread_mutex_lock"); |
| |
| // Create the collection if necessary |
| if (EXSM_IDS == NULL) |
| { |
| ExSMGlobals *glob = ExSMGlobals::GetExSMGlobals(); |
| NAMemory *heap = glob ? glob->getThreadSafeHeap() : NULL; |
| exsm_assert(heap, "Invalid heap pointer"); |
| |
| EXSM_IDS = new (heap) HashQueue(heap); |
| exsm_assert(EXSM_IDS, "Invalid hash queue pointer"); |
| } |
| |
| // Register the ID if it is not already in the collection |
| bool found = ExSM_FindID(smQueryID, false); |
| if (found) |
| { |
| EXSM_TRACE(EXSM_TRACE_SM_CALLS, |
| "REGISTER %" PRId64 " already active, entries %d\n", |
| smQueryID, (int) EXSM_IDS->numEntries()); |
| } |
| else |
| { |
| // Add a REGISTER event to the in-memory trace. The ID is passed |
| // in a dummy sm_target_t structure. |
| sm_target_t tgt; |
| memset(&tgt, 0, sizeof(tgt)); |
| tgt.id = smQueryID; |
| ExSMEvent::add(ExSMEvent::Register, &tgt); |
| |
| // Register the ID. Store the SM return value and return it to the |
| // caller. This function does not react to SM errors -- that is |
| // left to the caller. |
| smResult = SM_register(0, smQueryID); |
| EXSM_TRACE(EXSM_TRACE_CANCEL | EXSM_TRACE_SM_CALLS, |
| "SM_register %" PRId64 " rc %d entries %d", |
| smQueryID, (int) smResult, (int) EXSM_IDS->numEntries()); |
| |
| if (smResult == 0) |
| { |
| // Add the ID to the global collection of active IDs |
| EXSM_IDS->insert((char *) &smQueryID, sizeof(smQueryID), |
| (void *) smQueryID); |
| } |
| } |
| |
| // Unlock the collection |
| rc = pthread_mutex_unlock(&EXSM_IDS_MUTEX); |
| exsm_assert_rc(rc, "pthread_mutex_lock"); |
| |
| return smResult; |
| } |
| |
| // Cancel an ID and remove it from the global collection of query |
| // IDs. The function is a no-op if the ID is not already in the |
| // collection. |
| // |
| // Returns 0 if no errors were encountered, otherwise a SeaMonster |
| // error code. |
| int32_t ExSM_Cancel(const int64_t &smQueryID) |
| { |
| int32_t smResult = 0; |
| |
| if (EXSM_IDS) |
| { |
| int rc = 0; |
| |
| // Lock the collection |
| rc = pthread_mutex_lock(&EXSM_IDS_MUTEX); |
| exsm_assert_rc(rc, "pthread_mutex_lock"); |
| |
| // Cancel the ID if it is already in the collection |
| bool found = ExSM_FindID(smQueryID, false); |
| if (found) |
| { |
| // Remove the ID from the global collection of active IDs |
| EXSM_IDS->remove((char *) &smQueryID, sizeof(smQueryID), |
| (void *) smQueryID); |
| |
| // Add a CANCEL event to the in-memory trace. The ID is passed |
| // in a dummy sm_target_t structure. |
| sm_target_t tgt; |
| memset(&tgt, 0, sizeof(tgt)); |
| tgt.id = smQueryID; |
| ExSMEvent::add(ExSMEvent::Cancel, &tgt); |
| |
| // Cancel the ID. Store the SM return value and return it to the |
| // caller. This function does not react to SM errors -- that is |
| // left to the caller. |
| smResult = SM_cancel(0, smQueryID); |
| EXSM_TRACE(EXSM_TRACE_CANCEL | EXSM_TRACE_SM_CALLS, |
| "SM_cancel %" PRId64 " rc %d entries %d", |
| smQueryID, (int) smResult, (int) EXSM_IDS->numEntries()); |
| } |
| else |
| { |
| EXSM_TRACE(EXSM_TRACE_SM_CALLS, |
| "CANCEL %" PRId64 " is not active, entries %d\n", |
| smQueryID, (int) EXSM_IDS->numEntries()); |
| } |
| |
| // Unlock the collection |
| rc = pthread_mutex_unlock(&EXSM_IDS_MUTEX); |
| exsm_assert_rc(rc, "pthread_mutex_lock"); |
| } |
| |
| return smResult; |
| } |
| |
| // Find an ID in the global collection. If lock is true, access is |
| // protected by a mutex. |
| bool ExSM_FindID(const int64_t &smQueryID, bool lock) |
| { |
| bool result = false; |
| |
| if (EXSM_IDS) |
| { |
| int rc = 0; |
| |
| // Lock the collection if the caller requested locking |
| if (lock) |
| { |
| rc = pthread_mutex_lock(&EXSM_IDS_MUTEX); |
| exsm_assert_rc(rc, "pthread_mutex_lock"); |
| } |
| |
| // Walk the hash bucket looking for a matching ID |
| void *entry = NULL; |
| EXSM_IDS->position((char *) &smQueryID, sizeof(smQueryID)); |
| while ((entry = EXSM_IDS->getNext()) != NULL) |
| { |
| int64_t current = (int64_t) entry; |
| if (current == smQueryID) |
| { |
| result = true; |
| break; |
| } |
| } |
| |
| // Unlock the collection if the caller requested locking |
| if (lock) |
| { |
| rc = pthread_mutex_unlock(&EXSM_IDS_MUTEX); |
| exsm_assert_rc(rc, "pthread_mutex_lock"); |
| } |
| } |
| |
| return result; |
| } |