blob: 149b99f0f989cee91811d4dc608d1a33abd0f86d [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: UdrStreams.cpp
* Description: Server-side message stream classes for UDRs
* Created: 06/10/2001
* Language: C++
*
*
*****************************************************************************
*/
#include "UdrStreams.h"
#include "udrglobals.h"
#include "udrdefs.h"
#include "udrutil.h"
#include "sql_buffer.h"
#include "UdrDebug.h"
#include "spinfo.h"
#include "udrdecs.h"
#include "rosetta/rosgen.h"
#include "nsk/nskprocess.h"
#include "zsysc.h"
extern void processAnInvokeMessage(UdrGlobals *UdrGlob,
UdrServerDataStream &msgStream,
UdrDataBuffer &request);
extern void processAnRSFetchMessage(UdrGlobals *udrGlob,
UdrServerDataStream &msgStream,
UdrHandle udrHandle,
RSHandle rsHandle,
UdrDataBuffer *request);
extern void processAnRSContinueMessage(UdrGlobals *udrGlob,
UdrServerDataStream &msgStream,
UdrRSContinueMsg &request);
void UdrServerDataStream::actOnReceive(IpcConnection *conn)
{
const char *moduleName = "UdrServerDataStream::actOnReceive()";
IpcMessageObjType t;
NABoolean somethingArrived = FALSE;
//
// A note about control flow in this method. It was originally
// written under the assumption that only one message arrives at a
// time. It would call getNextReceiveMsg() only once, then process
// incoming objects, then return. It turns out this creates a memory
// leak. Internally within the stream, the incoming message buffer
// never moved from the "receive" list to the "in use" list and
// never got cleaned up. Now this method calls getNextReceiveMsg()
// until it returns FALSE. The result is that after the incoming
// message has been processed, the next getNextReceiveMsg() call
// will move the request buffer to the "in use" list where it later
// gets cleaned up by a call to cleanupBuffers() or
// releaseBuffers().
//
while (getNextReceiveMsg(t))
{
somethingArrived = TRUE;
while (getNextObjType(t))
{
switch (t)
{
case UDR_MSG_DATA_HEADER:
{
// Do nothing for now except extract the object from the stream
UdrDataHeader *h = new (receiveMsgObj()) UdrDataHeader(this);
} // case UDR_MSG_DATA_HEADER
break;
case UDR_MSG_CONTINUE_REQUEST:
{
// Extract the object from the stream and call SPInfo's work()
UdrContinueMsg *m = new (receiveMsgObj()) UdrContinueMsg(this);
switch (spinfo_->getParamStyle())
{
case COM_STYLE_SQLROW_TM:
case COM_STYLE_CPP_OBJ:
case COM_STYLE_JAVA_OBJ:
spinfo_->workTM();
break;
default:
spinfo_->work();
break;
}
} // case UDR_MSG_CONTINUE_REQUEST
break;
case UDR_MSG_DATA_REQUEST:
{
UdrDataBuffer *request = new (receiveMsgObj()) UdrDataBuffer(this);
if (udrGlob_->verbose_ &&
udrGlob_->traceLevel_ >= TRACE_DETAILS &&
udrGlob_->showMain_)
{
ServerDebug( "");
ServerDebug("[UdrServ (%s)] Invoke Request has %ld tupps.",
moduleName,
request->getSqlBuffer()->getTotalTuppDescs());
ServerDebug("[UdrServ (%s)] Invoke Request SQL Buffer",
moduleName);
displaySqlBuffer(request->getSqlBuffer(),
(Lng32) request->getSqlBufferLength());
}
udrGlob_->numReqInvokeSP_++;
// Let the SPInfo process the request
spinfo_->setCurrentRequest(request);
spinfo_->work();
} // case UDR_MSG_DATA_REQUEST
break;
case UDR_MSG_TMUDF_DATA_HEADER:
{
// Extract the object. UDR Handle and RS Handle are
// interesting things in this message.
UdrTmudfDataHeader *h = new (receiveMsgObj()) UdrTmudfDataHeader(this);
UdrHandle udrHandle = h->getHandle();
NABoolean anyMore = getNextObjType(t);
UDR_ASSERT(anyMore, "DATA_REQUEST must follow TMUDF DATA_HEADER");
UDR_ASSERT(t == UDR_MSG_DATA_REQUEST,
"DATA_REQUEST must follow TMUDF DATA_HEADER");
UdrDataBuffer *request = new (receiveMsgObj())
UdrDataBuffer(this,
FALSE); //Avoid unpack.
// Let the SPInfo process the request
spinfo_->setCurrentRequest(request);
spinfo_->workTM();
}
break;
case UDR_MSG_RS_DATA_HEADER:
{
// Extract the object. UDR Handle and RS Handle are
// interesting things in this message.
UdrRSDataHeader *h = new (receiveMsgObj()) UdrRSDataHeader(this);
UdrHandle udrHandle = h->getHandle();
RSHandle rsHandle = h->getRSHandle();
NABoolean anyMore = getNextObjType(t);
UDR_ASSERT(anyMore, "DATA_REQUEST must follow RS_DATA_HEADER");
UDR_ASSERT(t == UDR_MSG_DATA_REQUEST,
"DATA_REQUEST must follow RS_DATA_HEADER");
UdrDataBuffer *request = new (receiveMsgObj()) UdrDataBuffer(this);
udrGlob_->numReqRSFetch_++;
processAnRSFetchMessage(udrGlob_, *this, udrHandle,
rsHandle, request);
// We need the request buffer in a state where the stream
// knows it can be freed. We call SqlBuffer::bufferFull() to
// accomplish this even though the method name is a bit
// misleading.
SqlBuffer *sqlBuf = request->getSqlBuffer();
UDR_ASSERT(sqlBuf,
"UDR request buffer is corrupt or contains no data");
sqlBuf->bufferFull();
}
break;
case UDR_MSG_RS_CONTINUE:
{
UdrRSContinueMsg *rsContinue =
new (receiveMsgObj()) UdrRSContinueMsg(this);
udrGlob_->numReqRSContinue_++;
processAnRSContinueMessage(udrGlob_, *this, *rsContinue);
}
break;
default:
{
UDR_ASSERT(FALSE,
"Unknown message type arrived on UDR data stream");
} // default
break;
} // switch (t)
} // while (getNextObjType(t))
} // while (getNextReceiveMsg(t))
// Make sure all reply buffers have been given to the connection. If
// the only objects that arrived during this callback were continue
// requests, then this action allows reply buffers associated with
// those continue requests to propagate out of the stream and onto
// the connection.
//
// If numOfInputBuffers() is > 0 then we do not need to do anything
// at this point. This callback will be invoked again when the
// incoming message is complete and ready to be processed.
Lng32 numInputBuffers = numOfInputBuffers();
if (somethingArrived && numInputBuffers == 0 &&
spinfo_->getCurrentRequest() == NULL)
{
responseDone();
if (udrGlob_->verbose_ &&
udrGlob_->traceLevel_ >= TRACE_IPMS &&
udrGlob_->showMain_)
{
ServerDebug("[UdrServ (%s)] All messages marked as replied", moduleName);
}
// Cleanup any unpacked message buffers containing only objects
// that are no longer in use, as determined by the virtual method
// IpcMessageObj::msgObjIsFree()
releaseBuffers(); // Do final garbage collection
}
} // UdrServerDataStream::actOnReceive()
void
UdrGuaControlConnection::actOnSystemMessage(
short messageNum,
IpcMessageBufferPtr sysMsg,
IpcMessageObjSize sysMsgLen,
short clientFileNumber,
const GuaProcessHandle &clientPhandle,
GuaConnectionToClient *connection)
{
const char * moduleName = "UdrGuaControlConnection::actOnSystemMessage";
switch (messageNum)
{
case ZSYS_VAL_SMSG_OPEN:
{
if (udrGlob_->verbose_ &&
udrGlob_->traceLevel_ >= TRACE_IPMS &&
udrGlob_->showMain_)
{
ServerDebug(
"[UdrServ (%s)] A new connection %p is opened to the Server.",
moduleName,
connection);
}
// Create a new message stream. Link the connection.
// Then call receive on it to bring it into receiving mode.
UdrServerControlStream *newControlStream =
new (udrGlob_->getIpcHeap())
UdrServerControlStream(udrGlob_->getIpcEnvironment(),
udrGlob_,
UDR_STREAM_SERVER_CONTROL,
UdrServerControlStreamVersionNumber);
newControlStream->addRecipient(connection);
newControlStream->receive(FALSE);
}
break;
default:
// do nothing for all other kinds of system messages
break;
} // switch
// The parent class already handles the job of closing all connections
// who lost their client process by failed processes, failed CPUs and
// failed systems or networks. Check here that we die if all our
// requestors go away, but don't die if the first system message is
// something other than an OPEN message.
if (getNumRequestors() == 0 AND initialized_)
{
NAExit(0);
}
else if (NOT initialized_ AND getNumRequestors() > 0)
{
// the first requestor came in
initialized_ = TRUE;
}
} // UdrGuaControlConnection::actOnSystemMessage()
void
UdrServerControlStream::actOnReceive(IpcConnection *conn)
{
const char *moduleName = "UdrServerControlStream::actOnReceive";
NABoolean doTrace = (udrGlob_->verbose_ &&
udrGlob_->traceLevel_ >= TRACE_IPMS &&
udrGlob_->showMain_) ? TRUE : FALSE;
if (doTrace)
{
ServerDebug("[UdrServ (%s)]", moduleName);
ServerDebug(" A new message arrived on connection %p.", conn);
}
// Create a new stream and give the message to that stream
UdrServerReplyStream *newReplyStream =
new (udrGlob_->getUdrHeap()) UdrServerReplyStream(
udrGlob_->getIpcEnvironment(),
udrGlob_,
UDR_STREAM_SERVER_REPLY,
UdrServerReplyStreamVersionNumber);
giveMessageTo(*newReplyStream, conn);
// Add the stream to Udr Globals list of streams
udrGlob_->addReplyStream(newReplyStream);
if (doTrace)
{
ServerDebug(" The new message is scheduled for processing.");
ServerDebug(" The connection %p will be placed on receive mode.",
conn);
ServerDebug("[UdrServ (%s)] Done", moduleName);
}
// We have retrieved the message from stream, make it ready for
// next message.
receive(FALSE);
} // UdrServerControlStream::actOnReceive()
void UdrServerDataStream::activateCurrentMsgTransaction()
{
udrGlob_->getControlConnection()->setUserTransReplyTag(getReplyTag());
return;
}
void UdrServerReplyStream::routeMessage(UdrServerDataStream &other)
{
// Store reply tag in data stream before routing message to it.
other.setReplyTag(getReplyTag());
giveReceiveMsgTo(other);
}
void UdrServerReplyStream::activateCurrentMsgTransaction()
{
udrGlob_->getControlConnection()->setUserTransReplyTag(getReplyTag());
return;
}