| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ***************************************************************************** |
| * |
| * File: spinfocallback.cpp |
| * Description: The SPInfo APIs provided for LmRoutines to call back |
| * SPInfo methods. |
| * |
| * Created: |
| * Language: C++ |
| * |
| ***************************************************************************** |
| */ |
| |
| #include "Platform.h" // 64-BIT |
| #include "spinfoCallback.h" |
| #include "spinfo.h" |
| #include "udrextrn.h" |
| #include "exp_expr.h" |
| #include "sql_buffer.h" |
| #include "ex_queue.h" |
| #include "udrutil.h" |
| #include "LmRoutineCppObj.h" |
| // include the file generated by this command and checked into git: |
| // javah -d $TRAF_HOME/../sql/langman org.trafodion.sql.udr.UDR |
| #include "org_trafodion_sql_udr_UDR.h" |
| |
| extern UdrGlobals *UDR_GLOBALS; |
| extern Int32 PerformWaitedReplyToClient(UdrGlobals *UdrGlob, |
| UdrServerDataStream &msgStream); |
| extern void doMessageBox(UdrGlobals *UdrGlob, Int32 trLevel, |
| NABoolean moduleType, const char *moduleName); |
| |
| extern NABoolean allocateReplyRow(UdrGlobals *UdrGlob, |
| SqlBuffer &replyBuffer, // [IN] A reply buffer |
| queue_index parentIndex, // [IN] Identifies the request queue entry |
| Int32 replyRowLen, // [IN] Length of reply row |
| char *&newReplyRow, // [OUT] The allocated reply row |
| ControlInfo *&newControlInfo, // [OUT] The allocated ControlInfo entry |
| ex_queue::up_status upStatus // [IN] Q_OK_MMORE, Q_NO_DATA, Q_SQLERROR |
| ); |
| extern NABoolean allocateEODRow(UdrGlobals *UdrGlob, |
| SqlBuffer &replyBuffer, |
| queue_index parentIndex); |
| |
| |
| Int32 sendReqBufferWaitedReply(UdrGlobals *udrGlobals, |
| SPInfo *sp, |
| Int32 tableIndex) |
| { |
| //get datastream |
| UdrServerDataStream *dataStream = sp->getDataStream(); |
| |
| // construct a reply with flags indicating more data required |
| // for this table index. |
| UdrDataBuffer *reply = new (*dataStream, sp->getReplyBufferSize()) |
| UdrDataBuffer(sp->getReplyBufferSize(), UdrDataBuffer::UDR_DATA_OUT, NULL); |
| |
| //Sql buffer is initilized when created on IPC stream. no need to init again |
| //because the buffer state will change from INUSE to EMPTY state. This will |
| //cause IPC to not even send the buffer as part of reply. |
| |
| //Indicate the table index so that client can send a buffer |
| //that is applicable to this table. |
| reply->setTableIndex(tableIndex); |
| |
| //indicate that server expects a buffer from client. |
| reply->setSendMoreData(TRUE); |
| |
| // set SPInfo state to INVOKED_GETROWS for error checking purposes. |
| // Also helps debugging. |
| if(sp->getSPInfoState() != SPInfo::INVOKED) |
| { |
| return SQLUDR_ERROR; |
| } |
| sp->setSPInfoState(SPInfo::INVOKED_GETROWS); |
| |
| //now wait for IO completion. Once IO completes |
| //necessary updates to spInfo are already performed. |
| Int32 result = PerformWaitedReplyToClient(udrGlobals, *dataStream); |
| |
| // reset SpInfo state back to INVOKED state. |
| if(sp->getSPInfoState() != SPInfo::INVOKED_GETROWS) |
| { |
| return SQLUDR_ERROR; |
| } |
| sp->setSPInfoState(SPInfo::INVOKED); |
| |
| return result; |
| } |
| |
| void SpInfoGetNextRow(char *rowData, |
| Int32 tableIndex, |
| SQLUDR_Q_STATE *queue_state |
| ) |
| { |
| |
| UdrGlobals *udrGlobals = UDR_GLOBALS; |
| |
| |
| const char *moduleName = "SPInfo::SpInfoGetNextRow"; |
| |
| doMessageBox(udrGlobals, |
| TRACE_SHOW_DIALOGS, |
| udrGlobals->showInvoke_, |
| moduleName); |
| |
| // Assumption here that there is always one SP when table mapping |
| // UDR is used. |
| SPInfo *sp = udrGlobals->getCurrSP(); |
| if(!sp) |
| { |
| *queue_state = SQLUDR_Q_CANCEL; |
| return; |
| } |
| |
| // Access SQL buffer that corresponds to table index |
| SqlBuffer *getSqlBuf = sp->getReqSqlBuffer(tableIndex); |
| if (getSqlBuf == NULL) |
| { |
| //Perform a waited reply to client requesting for more data, |
| Int32 status = sendReqBufferWaitedReply(udrGlobals, sp, tableIndex); |
| if(status == SQLUDR_ERROR) |
| { |
| *queue_state = SQLUDR_Q_CANCEL; |
| return; |
| } |
| |
| //expect sql buffer to be populated in spinfo now. So get it. |
| getSqlBuf = sp->getReqSqlBuffer(tableIndex); |
| |
| if (getSqlBuf == NULL) |
| { |
| //Something is wrong this time. |
| *queue_state = SQLUDR_Q_CANCEL; |
| return; |
| } |
| } |
| |
| down_state downState; |
| tupp requestRow; |
| NABoolean endOfData = getSqlBuf->moveOutSendOrReplyData( |
| TRUE, // [IN] sending? (vs. replying) |
| &downState, // [OUT] queue state |
| requestRow, // [OUT] new data tupp_descriptor |
| NULL, // [OUT] new ControlInfo area |
| NULL, // [OUT] new diags tupp_descriptor |
| NULL); // [OUT] new stats area |
| |
| if (endOfData) // indicates no more tupples in the buffer. |
| { |
| //Check if the client already indicated that the current buffer |
| //was the last buffer. If it is last buffer, return. |
| if(sp->isLastReqSqlBuffer(tableIndex)) |
| { |
| *queue_state = SQLUDR_Q_EOD; |
| return; |
| } |
| |
| //As a safety measure, memset the sql buffer since we expect new |
| //data in sql buffer after performing waited send below. |
| memset(getSqlBuf,'\0', sp->getRequestBufferSize()); |
| |
| //Perform a waited reply to client requesting for more data, |
| Int32 status = sendReqBufferWaitedReply(udrGlobals, sp, tableIndex); |
| if(status == SQLUDR_ERROR) |
| { |
| *queue_state = SQLUDR_Q_CANCEL; |
| return; |
| } |
| |
| // Now get the new buffer. |
| getSqlBuf = sp->getReqSqlBuffer(tableIndex); |
| if (getSqlBuf == NULL) |
| { |
| *queue_state = SQLUDR_Q_CANCEL; |
| return; |
| } |
| |
| //Extract the row again. |
| endOfData = getSqlBuf->moveOutSendOrReplyData( |
| TRUE, // [IN] sending? (vs. replying) |
| &downState, // [OUT] queue state |
| requestRow, // [OUT] new data tupp_descriptor |
| NULL, // [OUT] new ControlInfo area |
| NULL, // [OUT] new diags tupp_descriptor |
| NULL); |
| |
| if (endOfData) |
| { |
| |
| // It is possible for client to return a empty sql buffer and |
| // and indicate that it is the last buffer. In this case, return |
| // EOD. before that check for any last set of rows if any. |
| if(sp->isLastReqSqlBuffer(tableIndex)) |
| { |
| *queue_state = SQLUDR_Q_EOD; |
| return; |
| } |
| |
| //we just got the buffer, it cannot be empty unless |
| //it is last buffer indication. |
| *queue_state = SQLUDR_Q_CANCEL; |
| return; |
| } |
| } |
| |
| memset(rowData, '\0', sp->getInputRowLength(tableIndex)); |
| memcpy(rowData, requestRow.getDataPointer(), sp->getInputRowLength(tableIndex)); |
| |
| *queue_state = SQLUDR_Q_MORE; |
| return; |
| |
| } |
| |
| Int32 sendEmitWaitedReply(UdrGlobals *udrGlobals, |
| SPInfo *sp, |
| SqlBuffer *emitSqlBuffer, |
| NABoolean includesEOD) |
| { |
| Int32 result = 0; |
| const char *moduleName = "sendEmitWaitedReply"; |
| |
| NABoolean traceInvokeDataAreas = false; |
| if (udrGlobals->verbose_ && udrGlobals->showInvoke_ && |
| udrGlobals->traceLevel_ >= TRACE_DATA_AREAS) |
| traceInvokeDataAreas = true; |
| |
| if (traceInvokeDataAreas) |
| { |
| ServerDebug(""); |
| ServerDebug("[UdrServ (%s)] EMIT SQL Buffer", moduleName); |
| |
| displaySqlBuffer(emitSqlBuffer, sp->getReplyBufferSize()); |
| } |
| |
| //get datastream |
| UdrServerDataStream *dataStream = sp->getDataStream(); |
| |
| // construct a reply with flags indicating continue request |
| // needed. |
| UdrDataBuffer *reply = new (*dataStream, sp->getReplyBufferSize()) |
| UdrDataBuffer(sp->getReplyBufferSize(), UdrDataBuffer::UDR_DATA_OUT, NULL); |
| |
| // Before copying the sqlBuffer from current heap to the one in IPC stream, |
| // pack the buffer. |
| emitSqlBuffer->drivePack(); |
| |
| // memcpy the emitSqlBuffer into reply sqlBuffer that just got created |
| // inside UdrDataBuffer. We could avoid this memcpy. Not sure how? PV |
| memcpy(reply->getSqlBuffer(), emitSqlBuffer, sp->getReplyBufferSize()); |
| |
| if (includesEOD) |
| reply->setLastBuffer(TRUE); |
| |
| // indicate to client to send a continue request, we don't seem to |
| // need more input data right now |
| reply->setSendMoreData(FALSE); |
| |
| // set SPInfo state to INVOKED_EMITROWS for error checking purposes. |
| // Also helps debugging. |
| if(sp->getSPInfoState() != SPInfo::INVOKED) |
| { |
| return SQLUDR_ERROR; |
| } |
| sp->setSPInfoState(SPInfo::INVOKED_EMITROWS); |
| |
| if (includesEOD) |
| { |
| // return EOD to client and return without waiting |
| sendDataReply(udrGlobals, *dataStream, sp); |
| |
| sp->setSPInfoState(SPInfo::LOADED); |
| } |
| else |
| { |
| // Return data back to client and wait for a new message |
| // from the client. Restore context of this invocation |
| // once we have a new request. |
| result = PerformWaitedReplyToClient(udrGlobals, *dataStream); |
| |
| // reset SpInfo state back to INVOKED state. |
| if(sp->getSPInfoState() != SPInfo::INVOKED_EMITROWS) |
| { |
| return SQLUDR_ERROR; |
| } |
| sp->setSPInfoState(SPInfo::INVOKED); |
| } |
| |
| return result; |
| } |
| |
| |
| // This method is used for the older TMUDF C interface. It does not |
| // include an EOD entry in the SqlBuffer. We send a the buffer from |
| // the UDF without an EOD and ask for a continue message. The caller |
| // of the language manger "invokeRoutine()" method then sends the |
| // EOD in a separate reply. |
| Int32 SpInfoEmitRow (char *rowData, |
| Int32 tableIndex, |
| SQLUDR_Q_STATE *queue_state |
| ) |
| { |
| UdrGlobals *udrGlobals = UDR_GLOBALS; |
| |
| const char *moduleName = "SPInfo::SpInfoEmitRow"; |
| |
| doMessageBox(udrGlobals, |
| TRACE_SHOW_DIALOGS, |
| udrGlobals->showInvoke_, |
| moduleName); |
| |
| // Assumption here that there is always one SP when table mapping |
| // UDR is used. |
| SPInfo *sp = udrGlobals->getCurrSP(); |
| if(!sp) |
| { |
| *queue_state = SQLUDR_Q_CANCEL; |
| return SQLUDR_ERROR; |
| } |
| |
| |
| // Access emit SQL buffer that corresponds to table index |
| SqlBuffer *emitSqlBuffer = sp->getEmitSqlBuffer(tableIndex); |
| |
| // if SQLUDR_Q_EOD is received as the very first emit, there |
| // is nothing much to do. We could avoid allocating emitSqlbuffer. |
| if((emitSqlBuffer == NULL ) && (*queue_state == SQLUDR_Q_EOD)) |
| return SQLUDR_SUCCESS; |
| |
| // If SQLUDR_Q_EOD is received and emitSqlBuffer is partially filled, |
| // then send the emitSqlBUffer to client in awaited call. Note that |
| // there is no need to insert a Q_NO_DATA tupple in the buffer. |
| // Q_NO_DATA tupple is set in the final reply from workTM(). |
| if((emitSqlBuffer != NULL ) && (*queue_state == SQLUDR_Q_EOD) && |
| (emitSqlBuffer->getTotalTuppDescs() > 0)) |
| { |
| Int32 status = sendEmitWaitedReply(udrGlobals, sp, emitSqlBuffer, FALSE); |
| |
| if(status == SQLUDR_ERROR) |
| { |
| *queue_state = SQLUDR_Q_CANCEL; |
| return SQLUDR_ERROR; |
| } |
| |
| return SQLUDR_SUCCESS; |
| } |
| |
| if(*queue_state == SQLUDR_Q_EOD) |
| { |
| return SQLUDR_SUCCESS; |
| } |
| |
| // If we reach here means, atleast one row is available to be emitted. |
| if(emitSqlBuffer == NULL) |
| { |
| //allocate one and set it back in SPInfo. |
| NAHeap *udrHeap = udrGlobals->getUdrHeap(); |
| emitSqlBuffer = |
| (SqlBuffer*)udrHeap->allocateMemory(sp->getReplyBufferSize()); |
| |
| if (emitSqlBuffer == NULL) |
| { |
| *queue_state = SQLUDR_Q_CANCEL; |
| return SQLUDR_ERROR; |
| } |
| |
| emitSqlBuffer->driveInit(sp->getReplyBufferSize(), FALSE, SqlBuffer::NORMAL_); |
| emitSqlBuffer->bufferInUse(); |
| sp->setEmitSqlBuffer(emitSqlBuffer, tableIndex); |
| } |
| |
| //Allocate a row inside replyBuffer. |
| char *replyData = NULL; |
| ControlInfo *replyControlInfo = NULL; |
| NABoolean replyRowAllocated = |
| allocateReplyRow(udrGlobals, |
| *emitSqlBuffer, // [IN] A reply buffer |
| sp->getParentIndex(), // [IN] Identifies the request queue entry |
| sp->getReplyRowSize(),// [IN] Length of reply row |
| replyData, // [OUT] The allocated reply row |
| replyControlInfo, // [OUT] The allocated ControlInfo entry |
| ex_queue::Q_OK_MMORE // [IN] Q_OK_MMORE, Q_NO_DATA, Q_SQLERROR |
| ); |
| |
| if (!replyRowAllocated) |
| { |
| // Since buffer is full send this buffer off to client and then continue. |
| Int32 status = sendEmitWaitedReply(udrGlobals, sp, emitSqlBuffer, FALSE); |
| |
| if(status == SQLUDR_ERROR) |
| { |
| *queue_state = SQLUDR_Q_CANCEL; |
| return SQLUDR_ERROR; |
| } |
| |
| // Now that we got continue message back from client, lets continue |
| // filling up emitSqlbuffer again. |
| emitSqlBuffer->driveInit(sp->getReplyBufferSize(), FALSE, SqlBuffer::NORMAL_); |
| |
| replyRowAllocated = |
| allocateReplyRow(udrGlobals, |
| *emitSqlBuffer, // [IN] A reply buffer |
| sp->getParentIndex(), // [IN] Identifies the request queue entry |
| sp->getReplyRowSize(),// [IN] Length of reply row |
| replyData, // [OUT] The allocated reply row |
| replyControlInfo, // [OUT] The allocated ControlInfo entry |
| ex_queue::Q_OK_MMORE // [IN] Q_OK_MMORE, Q_NO_DATA, Q_SQLERROR |
| ); |
| |
| if(!replyRowAllocated) |
| { |
| *queue_state = SQLUDR_Q_CANCEL; |
| return SQLUDR_ERROR; |
| } |
| } |
| memcpy(replyData, rowData, sp->getReplyRowSize()); |
| |
| /* |
| This piece of code emits every token, for testing purposes. |
| // Since buffer is full send this buffer off to client and then continue. |
| int status = sendEmitWaitedReply(udrGlobals, sp, emitSqlBuffer); |
| |
| if(status == SQLUDR_ERROR) |
| { |
| *queue_state = SQLUDR_Q_CANCEL; |
| return SQLUDR_ERROR; |
| } |
| |
| // Now that we got continue message back from client, lets continue |
| // filling up emitSqlbuffer again. |
| emitSqlBuffer->driveInit(sp->getReplyBufferSize(), FALSE, SqlBuffer::NORMAL_); |
| */ |
| return SQLUDR_SUCCESS; |
| |
| } |
| |
| // This method is used for the TMUDF C++ and Java interfaces. It can also be |
| // used to send an EOD row, which is included in the last reply buffer of |
| // regular results. The EOD call is done by the caller of the UDF code, |
| // not the UDF code itself. |
| void SpInfoEmitRowCpp(char *rowData, |
| Int32 tableIndex, |
| SQLUDR_Q_STATE *queue_state |
| ) |
| { |
| UdrGlobals *udrGlobals = UDR_GLOBALS; |
| |
| const char *moduleName = "SPInfo::SpInfoEmitRowCpp"; |
| |
| doMessageBox(udrGlobals, |
| TRACE_SHOW_DIALOGS, |
| udrGlobals->showInvoke_, |
| moduleName); |
| |
| // Assumption here that there is always one SP when table mapping |
| // UDR is used. |
| SPInfo *sp = udrGlobals->getCurrSP(); |
| if(!sp) |
| throw tmudr::UDRException(38900, "Missing SPInfo for this UDF"); |
| |
| LmRoutine *routine = static_cast<LmRoutine *>(sp->getLMHandle()); |
| |
| // Access emit SQL buffer that corresponds to table index |
| SqlBuffer *emitSqlBuffer = sp->getEmitSqlBuffer(tableIndex); |
| |
| //Allocate a row inside replyBuffer. |
| char *replyData = NULL; |
| ControlInfo *replyControlInfo = NULL; |
| NABoolean replyRowAllocated = FALSE; |
| int numRetries = 0; |
| |
| while (!replyRowAllocated) |
| { |
| if(emitSqlBuffer == NULL) |
| { |
| //allocate one and set it back in SPInfo. |
| NAHeap *udrHeap = udrGlobals->getUdrHeap(); |
| emitSqlBuffer = |
| (SqlBuffer*)udrHeap->allocateMemory(sp->getReplyBufferSize()); |
| |
| if (emitSqlBuffer == NULL) |
| throw tmudr::UDRException(38900, "Unable to allocate an emitRow SqlBuffer"); |
| |
| emitSqlBuffer->driveInit(sp->getReplyBufferSize(), FALSE, SqlBuffer::NORMAL_); |
| emitSqlBuffer->bufferInUse(); |
| sp->setEmitSqlBuffer(emitSqlBuffer, tableIndex); |
| } |
| |
| if (*queue_state != SQLUDR_Q_EOD) |
| replyRowAllocated = allocateReplyRow( |
| udrGlobals, |
| *emitSqlBuffer, // [IN] A reply buffer |
| sp->getParentIndex(), // [IN] Identifies the request queue entry |
| sp->getReplyRowSize(),// [IN] Length of reply row |
| replyData, // [OUT] The allocated reply row |
| replyControlInfo, // [OUT] The allocated ControlInfo entry |
| ex_queue::Q_OK_MMORE);// [IN] Q_OK_MMORE, Q_NO_DATA, Q_SQLERROR |
| else |
| replyRowAllocated = allocateEODRow( |
| udrGlobals, |
| *emitSqlBuffer, |
| sp->getParentIndex()); |
| |
| if (!replyRowAllocated) |
| if (numRetries++ < 1) |
| { |
| // Since buffer is full send this buffer off to client and then continue. |
| Int32 status = sendEmitWaitedReply(udrGlobals, |
| sp, |
| emitSqlBuffer, |
| FALSE); |
| |
| if(status == SQLUDR_ERROR) |
| throw tmudr::UDRException(38900, "Error in sending result buffer from TMUDF"); |
| |
| // Now that we got continue message back from client, lets continue |
| // filling up emitSqlbuffer again. |
| emitSqlBuffer->driveInit(sp->getReplyBufferSize(), FALSE, SqlBuffer::NORMAL_); |
| } |
| else |
| throw tmudr::UDRException( |
| 38900, |
| "Unable to allocate %d bytes in result SqlBuffer of size %d", |
| (int) sp->getReplyBufferSize(), |
| (int) sp->getReplyBufferSize()); |
| } |
| |
| if (routine && |
| routine->getLanguage() == COM_LANGUAGE_CPP) |
| { |
| LmRoutineCppObj *cppRoutine = static_cast<LmRoutineCppObj *>(routine); |
| |
| if (cppRoutine->getInvocationInfo()->getDebugFlags() & tmudr::UDRInvocationInfo::VALIDATE_WALLS) |
| cppRoutine->validateWalls(); |
| } |
| |
| if (replyData) |
| memcpy(replyData, rowData, sp->getReplyRowSize()); |
| else if (*queue_state == SQLUDR_Q_EOD) |
| { |
| // if the UDR signals EOD then send the partially filled buffer up |
| Int32 status = sendEmitWaitedReply(udrGlobals, sp, emitSqlBuffer, TRUE); |
| |
| if(status == SQLUDR_ERROR) |
| throw tmudr::UDRException(38900, "Error in sending buffer with EOD from TMUDF"); |
| } |
| } |
| |
| // C native methods called from Java TMUDFs via JNI |
| extern "C" { |
| |
| JNIEXPORT void JNICALL Java_org_trafodion_sql_udr_UDR_SpInfoGetNextRowJava( |
| JNIEnv *jni, |
| jobject, |
| jbyteArray rowData, |
| jint tableIndex, |
| jobject queueState) |
| { |
| LmLanguageManagerJava *javaLm = UDR_GLOBALS->getJavaLM(); |
| SQLUDR_Q_STATE queueStateLocal = SQLUDR_Q_MORE; |
| jbyte *rowElems = jni->GetByteArrayElements(rowData, NULL); |
| |
| SpInfoGetNextRow(reinterpret_cast<char *>(rowElems), |
| (Int32) tableIndex, |
| &queueStateLocal); |
| // free the JNI row buffer and copy back the data read |
| jni->ReleaseByteArrayElements(rowData, rowElems, 0); |
| |
| // pass the queue state back to the Java caller |
| jni->SetIntField(queueState, |
| static_cast<jfieldID>(javaLm->getUdrQueueStateField()), |
| static_cast<jint>(queueStateLocal)); |
| } |
| |
| JNIEXPORT void JNICALL Java_org_trafodion_sql_udr_UDR_SpInfoEmitRowJava( |
| JNIEnv *jni, |
| jobject, |
| jbyteArray rowData, |
| jint tableIndex, |
| jobject queueState) |
| { |
| SQLUDR_Q_STATE queueStateLocal = SQLUDR_Q_MORE; |
| jbyte *rowElems = jni->GetByteArrayElements(rowData, NULL); |
| |
| SpInfoEmitRowCpp(reinterpret_cast<char *>(rowElems), |
| (Int32) tableIndex, |
| &queueStateLocal); |
| // free the JNI row buffer without copying back any data |
| jni->ReleaseByteArrayElements(rowData, rowElems, JNI_ABORT); |
| } |
| |
| } // end extern "C" |