blob: e668a5591af92c12704db93b94a4272b21507135 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: IpcSockets.cpp
* Description: OS related code for socket-based IPC (see Ipc.h)
*
* Created: 10/6/95
* Language: C++
*
*
*****************************************************************************
*/
// -----------------------------------------------------------------------
#include "Platform.h"
#include "copyright.h"
// At this point we don't really want to use sockets on NSK because
// of problems with C/C++ runtime. Set this define to comment out all
// offending code (and maybe a little more than that)
// Uncomment the next line to debug IPC problems (log of client's I/O)
// #define LOG_IPC
#ifdef LOG_IPC
void IpcSockLogTimestamp(Int32 fdesc); // see bottom of file
#endif
#ifndef DISABLE_SOCKET_IPC
// for sockaddr struct and flags passed to socket system calls
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
const Int32 SOCKET_ERROR = -1;
typedef size_t length_t;
#include <errno.h>
#include <sys/time.h>
#include <iostream>
#include <iomanip>
#endif /* DISABLE_SOCKET_IPC */
#include "Collections.h"
#include "Ipc.h"
#include "ComDiags.h"
#include "str.h"
// comment this out once NAHeap.h is in common
// #include "NAHeap.h"
// for now declare overloaded operator new here
void * operator new(size_t size, CollHeap* h);
#ifdef DISABLE_SOCKET_IPC
// -----------------------------------------------------------------------
// Stubs for configurations where socket IPC is disabled
// -----------------------------------------------------------------------
SockSocket::SockSocket(IpcEnvironment *env)
{
ABORT("Stub for SockSocket::SockSocket()");
}
SockSocket::~SockSocket()
{
ABORT("Stub for SockSocket::~SockSocket()");
}
SockIPAddress::SockIPAddress()
{
ABORT("Stub for SockIPAddress::SockIPAddress()");
}
SockConnection::SockConnection(IpcEnvironment *env,
const IpcProcessId &serviceProcId,
NABoolean thisIsTheControlConnection,
const char *eye) :
IpcConnection(env,serviceProcId,eye), sock_(env), ioq_(env->getHeap())
{
ABORT("Stub for SockConnection::SockConnection()");
}
// also need to stub out all virtual methods of SockConnection,
// constructor needs them
SockConnection::~SockConnection()
{
ABORT("Stub for SockConnection::~SockConnection()");
}
void SockConnection::send(IpcMessageBuffer *buffer)
{
ABORT("Stub for void SockConnection::send(IpcMessageBuffer *buffer)");
}
void SockConnection::receive(IpcMessageStreamBase *msg)
{
ABORT("Stub for void SockConnection::receive(IpcMessageStreamBase *msg)");
}
WaitReturnStatus SockConnection::wait(IpcTimeout timeout, UInt32 *eventConsumed, IpcAwaitiox *ipcAwaitiox)
{
ABORT("Stub for short SockConnection::wait(IpcTimeout timeout)");
return WAIT_INTERRUPT;
}
SockConnection * SockConnection::castToSockConnection()
{
ABORT("Stub for SockConnection *SockConnection::castToSockConnection()");
return NULL;
}
Int32 SockConnection::numQueuedSendMessages()
{
ABORT("Stub for int SockConnection::numQueuedSendMessages()");
return 0;
}
Int32 SockConnection::numQueuedReceiveMessages()
{
ABORT("Stub for int SockConnection::numQueuedReceiveMessages()");
return 0;
}
void SockConnection::populateDiagsArea(ComDiagsArea *&diags,
CollHeap *diagsHeap)
{
ABORT("Stub for void SockConnection::populateDiagsArea()");
}
SockControlConnection::SockControlConnection(IpcEnvironment *env, const char *eye) :
IpcControlConnection(IPC_DOM_INTERNET, eye), listnerSocket_(env)
{
ABORT("Stub for SockControlConnection::SockControlConnection()");
}
IpcConnection * SockControlConnection::getConnection() const
{
ABORT("Stub for SockControlConnection::getConnection()");
return NULL;
}
SockControlConnection * SockControlConnection::castToSockControlConnection()
{
ABORT("Stub for SockControlConnection::castToSockControlConnection()");
return NULL;
}
void SockControlConnection::acceptNewConnectionRequest(SockConnection *conn)
{
ABORT("Stub for SockControlConnection::acceptNewConnectionRequest()");
}
IpcConnection * IpcServerClass::createInternetProcess(
ComDiagsArea **diags,
CollHeap *diagsHeap,
const char *nodeName,
const char *className,
IpcCpuNum /*cpuNum (sorry, no support for SMPs yet)*/,
NABoolean /*usesTransactions (sorry, no transactions in cyberspace*/,
SockPortNumber defaultPortNumber)
{
ABORT("Stub for IpcServerClass::createInternetProcess()");
return NULL;
}
IpcConnection * IpcServerClass::forkProcess(ComDiagsArea **diags,
CollHeap *diagsHeap,
const char * /*nodeName*/,
const char *className,
IpcCpuNum /*cpuNum*/,
NABoolean /*usesTransactions*/)
{
ABORT("Stub for IpcServerClass::forkProcess()");
return NULL;
}
SockErrNo SockIPAddress::set(const char *hostName)
{
ABORT("Stub for SockIPAddress::set()");
return 0;
}
#else
// -----------------------------------------------------------------------
// The REAL methods
// -----------------------------------------------------------------------
// -----------------------------------------------------------------------
// Methods for class SockErrNo
// -----------------------------------------------------------------------
Int32 SockErrNo::setFromerrno()
{
#ifdef NA_ERRNO_AS_PROCEDURE
... insert code for GUARDIAN style errno here
#else
errno_ = errno;
#endif
return errno_;
}
// -----------------------------------------------------------------------
// Methods for class SockIPAddress
// -----------------------------------------------------------------------
SockIPAddress::SockIPAddress()
{
// IP address 0.0.0.0 is an invalid (wildcard) IP address
a_.ipAddress_[0] = 0;
a_.ipAddress_[1] = 0;
a_.ipAddress_[2] = 0;
a_.ipAddress_[3] = 0;
}
SockIPAddress::SockIPAddress(const struct sockaddr &sa)
{
a_.ipAddress_[0] = sa.sa_data[2];
a_.ipAddress_[1] = sa.sa_data[3];
a_.ipAddress_[2] = sa.sa_data[4];
a_.ipAddress_[3] = sa.sa_data[5];
}
SockErrNo SockIPAddress::set(const char *hostName)
{
SockErrNo result;
struct hostent *hostEnt = NULL;
// initialize the address to something
a_.ipAddress_[0] = 0;
a_.ipAddress_[1] = 0;
a_.ipAddress_[2] = 0;
a_.ipAddress_[3] = 0;
if (hostName != NULL)
hostEnt = gethostbyname(hostName);
else
{
char me[IpcNodeNameMaxLength];
if (gethostname(me,IpcNodeNameMaxLength) < 0)
result.setFromerrno();
else
hostEnt = gethostbyname(me);
}
// could use errno_h to get better error reporting
if (hostEnt == NULL)
result = ENOENT;
if (NOT result.hasError())
{
// hostEnt->h_addr_list points to an array of pointers to addresses,
// where each address is an array of chars (in network order)
char *addr = hostEnt->h_addr_list[0];
if (addr == NULL)
result = ENXIO; // addr pointer was bad
else
{
a_.ipAddress_[0] = addr[0];
a_.ipAddress_[1] = addr[1];
a_.ipAddress_[2] = addr[2];
a_.ipAddress_[3] = addr[3];
}
}
return result;
}
// -----------------------------------------------------------------------
// Methods for class SockService
// -----------------------------------------------------------------------
SockService::SockService(const char *serviceName,
SockPortNumber defaultPortNumber)
{
struct servent *servEnt;
servEnt = getservbyname(serviceName,NULL);
if (servEnt == NULL)
{
// Service not found, using default port number
assert(defaultPortNumber != NoSockPortNumber);
portNum_ = defaultPortNumber;
}
else
portNum_ = servEnt->s_port;
// don't delete servEnt, it is owned by the system
}
// -----------------------------------------------------------------------
// Methods for class SockSocket
// -----------------------------------------------------------------------
SockSocket::SockSocket(IpcEnvironment *env)
{
environment_ = env;
expectedBytes_ =
receivedBytesSoFar_ = 0;
partiallyReceivedBuffer_ = NULL;
// create an unbound socket for the TCP/IP protocol
fdesc_ = socket(AF_INET, // domain = IP
SOCK_STREAM, // type = stream
SockTCPProtocol); // protocol = TCP
if (fdesc_ < 0)
setFromerrno("creating an unbound socket with socket()");
}
SockSocket::SockSocket(SockFdesc fd, IpcEnvironment *env)
{
environment_ = env;
expectedBytes_ =
receivedBytesSoFar_ = 0;
partiallyReceivedBuffer_ = NULL;
fdesc_ = fd;
}
SockSocket::~SockSocket()
{
if (fdesc_ >= 0)
close(fdesc_);
}
SockPortNumber SockSocket::bindOrConnect(const SockIPAddress &ipAddr,
SockPortNumber port,
NABoolean bindOnly)
{
struct sockaddr targetSockAddr;
SockPortNumber result = port;
Int32 retcode;
lastError_.clear();
// doing everything byte by byte takes care of endianness problems
// (just in case you don't like this code)
targetSockAddr.sa_family = AF_INET;
targetSockAddr.sa_data[0] = ((UInt32)port) / 256;
targetSockAddr.sa_data[1] = ((UInt32)port) % 256;
targetSockAddr.sa_data[2] = ipAddr.a_.ipAddress_[0];
targetSockAddr.sa_data[3] = ipAddr.a_.ipAddress_[1];
targetSockAddr.sa_data[4] = ipAddr.a_.ipAddress_[2];
targetSockAddr.sa_data[5] = ipAddr.a_.ipAddress_[3];
for (Int32 i = 6; i < 14; i++)
targetSockAddr.sa_data[i] = 0;
// this code depends on the definition of sockaddr, make sure
// we don't do the wrong thing
assert(sizeof(targetSockAddr) == 16);
// if we want to accept incoming connections, then the bind system
// call is the right thing to do now, if we want to connect to a server
// then the connect system call is what we want
if (bindOnly)
{
// the bind call just binds the socket to the specified port,
// it doesn't wait for a connection to come in (see listen and accept)
// NOTE: we want to call the system calls bind and connect, not
// the member functions with the same names
// NOTE: the port number of the socket gets altered by the call
retcode = ::bind(fdesc_,&targetSockAddr,sizeof(targetSockAddr));
}
else
{
// the connect system call actually starts up a connection; once
// the call succeeds we can send and receive
// NOTE: the port number of the socket gets altered by the call
retcode = ::connect(fdesc_,&targetSockAddr,sizeof(targetSockAddr));
}
if (retcode == SOCKET_ERROR)
setFromerrno("::bind() or ::connect()");
// return the port that the socket is now bound to
socklen_t namelen = sizeof(targetSockAddr);
retcode = ::getsockname(fdesc_,&targetSockAddr,&namelen);
if (retcode == SOCKET_ERROR)
setFromerrno("::getsockname()");
return (unsigned char)targetSockAddr.sa_data[0] * 256 + (unsigned char)targetSockAddr.sa_data[1];
}
SockPortNumber SockSocket::listen(SockPortNumber port)
{
lastError_.clear();
// bind the socket to the given port number or, if NoSockPortNumber is
// specified, to any port
SockPortNumber result = bind(SockIPAddress(),port);
if (NOT lastError_.hasError())
{
// prepare the socket for ::accept() calls by calling ::listen()
if (::listen(fdesc_,5) < 0)
setFromerrno("::listen()");
}
else
{
result = NoSockPortNumber;
}
return result;
}
SockPortNumber SockSocket::receiveListnerPortNum()
{
// read the message "Listening to port xxxxxxxxxxx\n" from the
// socket to the newly created server and decode the xxxxxxxxxxx to
// ascii
// Listening to port xxxxxxxxxxx\n
// 012345678901234567890123456789
const Int32 numCharsInListnerMsg = 30;
const Int32 numCharsInPrefix = 18;
char listnerMsg[numCharsInListnerMsg];
// wait for the server to send the port number and read it into a buffer
Int32 receivedBytes = recv(fdesc_,listnerMsg,numCharsInListnerMsg,0);
// make sure we got the right message
short temp = str_cmp(listnerMsg,"Listening to port ",numCharsInPrefix);
assert(receivedBytes == numCharsInListnerMsg AND
temp == 0);
// decode the ascii number
SockPortNumber result = atoi(&listnerMsg[numCharsInPrefix]);
# ifdef LOG_IPC
IpcSockLogTimestamp(fdesc_);
cerr << "server listens to port " << result << endl;
# endif
return result;
}
void SockSocket::assignToStdInOut()
{
lastError_.clear();
// assign the socket to stdin
ABORT ("Not Implemented or Needed in current milestone");
// now set the new file descriptor number in the
// object to stdin (Note: server program MUST NOT use stdin
// for other purposes than IPC procedures)
fdesc_ = SockStdin;
# ifdef LOG_IPC
IpcSockLogTimestamp(SockStdin);
cerr << "Assigned stdin to client conn." << endl;
# endif
}
NABoolean SockSocket::send(IpcMessageBuffer *message,
IpcTimeout timeout)
{
IpcMessageObjSize bytesToSend = message->getMessageLength();
IpcMessageObjSize bytesSentSoFar = 0;
Lng32 bytesSentThisTime;
lastError_.clear();
// ---------------------------------------------------------------------
// If a timeout is specified, do a select first before receiving data
// NOTE: we wait <timeout> 10ms intervals each time we go through
// the loop, although we should wait only a total or <timeout> intervals
// (usually, once a message head is coming in, the tail is not far away)
// ---------------------------------------------------------------------
if (timeout != IpcInfiniteTimeout)
{
// two structs from sys/time.h and sys/types.h to describe the
// timeout in seconds/microseconds to wait and a bitmap of file
// descriptors
timeval timeOutVal;
fd_set readyDescriptors;
// make a bitmap with the bit for fdesc_ set to 1
FD_ZERO(&readyDescriptors);
FD_SET(fdesc_,&readyDescriptors);
// store the timeout value in timeOutVal
timeOutVal.tv_sec = timeout / 100;
timeOutVal.tv_usec = timeout % 100;
Int32 retcode =
select(FD_SETSIZE,NULL,&readyDescriptors,NULL,&timeOutVal);
if (retcode < 0)
{
// indicate an error by returning TRUE and setting the error status
setFromerrno("::select() for send");
return TRUE;
}
if (retcode == 0)
{
// the timeout expired, return without receiving something
return FALSE;
}
} // non-infinite timeout
// ---------------------------------------------------------------------
// call ::send until all of the data is sent (do it waited from here)
// ---------------------------------------------------------------------
while (bytesToSend > 0 AND NOT lastError_.hasError())
{
bytesSentThisTime = ::send(fdesc_,
message->data(bytesSentSoFar),
(Int32) bytesToSend,
0);
if (bytesSentThisTime <= 0)
{
setFromerrno("::send()");
}
else
{
// prepare for sending the next chunk or for leaving the loop
bytesToSend -= bytesSentThisTime;
bytesSentSoFar += (IpcMessageObjSize) bytesSentThisTime;
}
}
# ifdef LOG_IPC
IpcSockLogTimestamp(fdesc_);
IpcMessageObj *firstUserObj = (IpcMessageObj *)
(message->data(sizeof(InternalMsgHdrInfoStruct)));
cerr << "sent " << bytesSentSoFar << " bytes, objtype "
<< firstUserObj->getType() << endl;
# endif
// we did send it
return TRUE;
}
NABoolean SockSocket::receive(IpcMessageBuffer * &message,
IpcTimeout timeout)
{
// receive buffer for socket (may be stack variable or alloc. buffer)
IpcMessageBufferPtr sockReceiveBuffer;
// copy of the message header on the stack
char header[sizeof(InternalMsgHdrInfoStruct)];
// variable that remembers how many bytes came in the message
Lng32 receivedBytesThisTime;
// the actual message length (initialize to header length until we know it)
if (receivedBytesSoFar_ == 0)
{
// start over with a fresh receive operation
expectedBytes_ = sizeof(InternalMsgHdrInfoStruct);
}
// the user may specify a special receive buffer, but not multiple
// receive buffers for the same message
if (partiallyReceivedBuffer_ == NULL AND message != NULL)
partiallyReceivedBuffer_ = message;
assert(message == NULL OR partiallyReceivedBuffer_ == message);
// if no buffer is allocated by the caller, use the on-stack header
// for the initial chunk
if (partiallyReceivedBuffer_)
sockReceiveBuffer = partiallyReceivedBuffer_->data();
else
sockReceiveBuffer = header;
// each call clears the error info and a length is only returned when
// the message has arrived completely and the return value is TRUE
lastError_.clear();
// ---------------------------------------------------------------------
// Keep on receiving in a loop until we have read all of the message
// (and no more than the message).
// ---------------------------------------------------------------------
while (expectedBytes_ > receivedBytesSoFar_ AND NOT lastError_.hasError())
{
// ---------------------------------------------------------------------
// If a timeout is specified, do a select first before receiving data
// NOTE: we wait <timeout> 10ms intervals each time we go through
// the loop, although we should wait only a total or <timeout> intervals
// (usually, once a message head is coming in, the tail is not far away)
// ---------------------------------------------------------------------
if (timeout != IpcInfiniteTimeout)
{
// two structs from sys/time.h and sys/types.h to describe the
// timeout in seconds/microseconds to wait and a bitmap of file
// descriptors
timeval timeOutVal;
fd_set readyDescriptors;
// make a bitmap with the bit for fdesc_ set to 1
FD_ZERO(&readyDescriptors);
FD_SET(fdesc_,&readyDescriptors);
// store the timeout value in timeOutVal
timeOutVal.tv_sec = timeout / 100;
timeOutVal.tv_usec = timeout % 100;
timeval *tt = (timeval*)&timeOutVal;
Int32 retcode =
select(FD_SETSIZE,&readyDescriptors,NULL,NULL,tt);
if (retcode < 0)
{
// indicate an error by returning TRUE and length 0
setFromerrno("::select for receive");
// indicate the error by returning a zero-length message
if (partiallyReceivedBuffer_ != NULL)
{
partiallyReceivedBuffer_->setMessageLength(0);
message = partiallyReceivedBuffer_;
}
partiallyReceivedBuffer_ = NULL;
expectedBytes_ = 0;
receivedBytesSoFar_ = 0;
return TRUE;
}
if (retcode == 0)
{
// the timeout expired, return without receiving something
return FALSE;
}
} // non-infinite timeout
receivedBytesThisTime = recv(
fdesc_,
&sockReceiveBuffer[receivedBytesSoFar_],
(Int32) (expectedBytes_ - receivedBytesSoFar_),
0);
if (receivedBytesThisTime <= 0)
{
setFromerrno("::recv()");
}
else
{
// if we completely received the header then increase the expected
// message length to the actual value sent in the header
if (receivedBytesSoFar_ + receivedBytesThisTime ==
sizeof(InternalMsgHdrInfoStruct))
{
// a pointer to the message header
InternalMsgHdrInfoStruct *headerPtr =
(InternalMsgHdrInfoStruct *) sockReceiveBuffer;
#ifdef NA_LITTLE_ENDIAN
// need to turn around bytes on a little-endian machine,
// since the header info is always sent in big-endian format
headerPtr->turnByteOrder();
#endif
// read the total message length out of the received header
expectedBytes_ = headerPtr->totalLength_;
// allocate a message buffer of maxLength unless one was
// provided already by the caller (caller becomes new owner
// of the buffer)
if (sockReceiveBuffer == header)
{
CollHeap *heap =
(environment_ ? environment_->getHeap() : NULL);
// we used the provisional buffer for the receive,
// go allocate the real buffer with the correct length
partiallyReceivedBuffer_ = IpcMessageBuffer::allocate(
expectedBytes_,
NULL, // don't know the IpcMessageStream object here
heap,
0);
// the actual receive buffer starts a few bytes further up
sockReceiveBuffer = partiallyReceivedBuffer_->data();
// copy the received header into the allocated buffer
str_cpy_all(sockReceiveBuffer,
header,
sizeof(InternalMsgHdrInfoStruct));
}
else
{
// if a buffer was supplied then it better be big enough
assert(partiallyReceivedBuffer_->getBufferLength() >=
expectedBytes_);
}
} // message header received
// prepare for receiving the next chunk or for leaving the loop
receivedBytesSoFar_ += receivedBytesThisTime;
} // no receive error
} // while expectedBytes_ > receivedBytesSoFar_
// at this point we are done with a message or we got a bad error
if (lastError_.hasError())
{
// indicate the error by returning a zero-length message
if (partiallyReceivedBuffer_ != NULL)
partiallyReceivedBuffer_->setMessageLength(0);
}
else
{
// set message length in the buffer
partiallyReceivedBuffer_->setMessageLength(expectedBytes_);
}
# ifdef LOG_IPC
IpcSockLogTimestamp(fdesc_);
IpcMessageObj *firstUserObj = (IpcMessageObj *)
(partiallyReceivedBuffer_->data(sizeof(InternalMsgHdrInfoStruct)));
cerr << "received " << receivedBytesSoFar_ << " bytes, objtype "
<< firstUserObj->getType() << endl;
# endif
// indicate we are ready for the next message
expectedBytes_ = 0;
receivedBytesSoFar_ = 0;
message = partiallyReceivedBuffer_;
partiallyReceivedBuffer_ = NULL;
return TRUE;
}
NABoolean SockSocket::accept(SockFdesc &fdesc, IpcTimeout timeout)
{
// ---------------------------------------------------------------------
// If a timeout is specified, do a select first before receiving data
// NOTE: we wait <timeout> 10ms intervals each time we go through
// the loop, although we should wait only a total or <timeout> intervals
// (usually, once a message head is coming in, the tail is not far away)
// ---------------------------------------------------------------------
if (timeout != IpcInfiniteTimeout)
{
// two structs from sys/time.h and sys/types.h to describe the
// timeout in seconds/microseconds to wait and a bitmap of file
// descriptors
timeval timeOutVal;
fd_set readyDescriptors;
// make a bitmap with the bit for fdesc_ set to 1
FD_ZERO(&readyDescriptors);
FD_SET(fdesc_,&readyDescriptors);
// store the timeout value in timeOutVal
timeOutVal.tv_sec = timeout / 100;
timeOutVal.tv_usec = timeout % 100;
Int32 retcode =
select(FD_SETSIZE,&readyDescriptors,NULL,NULL,&timeOutVal);
if (retcode < 0)
{
// indicate an error by returning TRUE and setting the error
setFromerrno("::select for accept");
fdesc = InvalidFdesc;
return TRUE;
}
if (retcode == 0)
{
// the timeout expired, return without receiving something
return FALSE;
}
} // non-infinite timeout
struct sockaddr clientSockAddr;
socklen_t addrlen = sizeof(clientSockAddr);
// call the accept() system call, waiting until a client wants to connect
Int32 retcode = ::accept(fdesc_,&clientSockAddr,&addrlen);
if (retcode >= 0)
{
// for successful calls, accept() returns the file descriptor
// of a newly created socket that can be used to communicate to
// the client (the existing socket continues to be the lisner
// port, no real messages are ever sent on it)
fdesc = retcode;
# ifdef LOG_IPC
IpcSockLogTimestamp(fdesc);
SockIPAddress clientIPAddr(clientSockAddr);
SockPortNumber clientPort = clientSockAddr.sa_data[0] * 256 +
clientSockAddr.sa_data[1];
IpcProcessId clientPid(clientIPAddr,clientPort);
char clientPidAsString[200];
clientPid.toAscii(clientPidAsString,200);
cerr << "accepted connection from " << clientPidAsString << endl;
# endif
}
else
{
// a negative returncode indicates an error
fdesc = InvalidFdesc;
setFromerrno("::accept()");
}
// something happened, either an error or a new client request
return TRUE;
}
SockErrNo SockSocket::setFromerrno(const char *msg)
{
lastError_.setFromerrno();
# ifdef LOG_IPC
IpcSockLogTimestamp(fdesc_);
cerr << "socket error " << lastError_.geterrno() << ", msg: " << msg << endl;
# endif
return lastError_;
}
// -----------------------------------------------------------------------
// Methods for class SockConnection
// -----------------------------------------------------------------------
SockConnection::SockConnection(IpcEnvironment *env,
const IpcProcessId &serviceProcId,
NABoolean thisIsTheControlConnection,
const char *eye) :
IpcConnection(env,serviceProcId, eye), sock_(env),
isClient_(TRUE), lastReplyTag_(0), ioq_(env->getHeap())
{
port_ = sock_.connect(serviceProcId.getIPAddress(),
serviceProcId.getPortNumber());
if (thisIsTheControlConnection)
{
// The process id that was passed in specified a service for
// inetd. A new process got created and opened a new port on
// which it listens for new requests. Catch the message of
// the new process where it announces its listner port and
// set that listner port in the server process id. This makes
// the server process id unique and allows other processes to
// connect to the server by calling the same constructor, but
// with the thisIsTheControlConnection parameter set to FALSE.
setOtherEnd(IpcProcessId(serviceProcId.getIPAddress(),
sock_.receiveListnerPortNum()));
}
else
{
// we already got the server process id with the listner port
// set; no new process got created, use the original process id
}
if (NOT sock_.hasError())
{
// a connection was established to the specified port,
// this is the client side
setState(ESTABLISHED);
# ifdef LOG_IPC
IpcSockLogTimestamp(sock_.getFdesc());
char serverPidAsString[200];
serviceProcId.toAscii(serverPidAsString,200);
cerr << "connected to server " << serverPidAsString
<< " on port " << port_ << endl;
# endif
}
else
{
// it blew
setErrorInfo(sock_.getError().geterrno());
setState(ERROR_STATE);
}
}
SockConnection::SockConnection(
IpcEnvironment *env,
SockFdesc fdesc,
NABoolean isClient,
const char *eye) : IpcConnection(env,IpcProcessId(), eye),
sock_(fdesc,env), port_(NoSockPortNumber),
isClient_(isClient), lastReplyTag_(0),
ioq_(env->getHeap())
{
if (fdesc != InvalidFdesc)
{
if (isClient)
setState(ESTABLISHED); // client may send right now
else
setState(REPLY_PENDING); // sender needs to wait for client's message
}
}
SockConnection::~SockConnection()
{
// deallocate ioq entries
for (Int32 i = 0; i < (Int32) ioq_.entries(); i++)
{
getEnvironment()->getHeap()->deallocateMemory(ioq_[i]);
}
}
void SockConnection::send(IpcMessageBuffer *buffer)
{
socketIOQueueEntry *sendEntry = NULL;
short replyTag;
if (isClient_)
{
// if the client sends, add a new entry to the I/O queue since
// the send operation starts a new round trip
sendEntry = new(getEnvironment()->getHeap()) socketIOQueueEntry;
ioq_.insert(sendEntry);
// increment reply tag
// (wrap around at some arbitrary number that is large enough)
lastReplyTag_++;
if (lastReplyTag_ > 4711)
lastReplyTag_ = 0;
replyTag = lastReplyTag_;
sendEntry->msg_ = buffer->getMessageStream();
sendEntry->recvBuffer_ = NULL;
sendEntry->replyTag_ = lastReplyTag_;
sendEntry->receiving_ = FALSE;
}
else
{
// for the server, a send operation is actually a reply and it
// completes the operation; find the request with the correct
// reply tag
replyTag = buffer->getReplyTag();
// find the correct reply tag in the I/O queue
for (Int32 i = 0; i < (Int32) ioq_.entries() AND sendEntry == NULL; i++)
{
if (ioq_[i]->replyTag_ == replyTag)
sendEntry = ioq_[i];
}
assert(sendEntry AND sendEntry->recvBuffer_ == NULL AND
NOT sendEntry->receiving_);
}
// Put the reply tag into the message itself. A little note: there are
// three places where we store reply tags: in the I/O queue entry,
// in the IpcMessageBuffer header (the part that doesn't get sent in
// a message) and in the message header that does get sent. In NSK there
// is no need to send a reply tag, it is maintained by NSK, so we
// need only two places in NSK. We always keep the IpcMessageBuffer
// reply tag consistent with the one in the message header, and we
// use the reply tag to find the correct I/O queue slot when we
// receive in the client or when we send (reply) in the server. For
// a socket connection, a reply tag is generated when the client sends,
// it is then sent to the server and finally sent back to the client
// where it is used to match the reply with the original send.
((InternalMsgHdrInfoStruct *) buffer->data())->sockReplyTag_ = replyTag;
// set I/O queue entry
sendEntry->sent_ = FALSE;
sendEntry->sendBuffer_ = buffer;
// indicate that I/O operations are pending, connection is active
setState(SENDING);
// try to send this message or another, earlier, message
tryToSendMore();
// NOTE: send queue of the base class is not used
}
void SockConnection::receive(IpcMessageStreamBase *msg)
{
socketIOQueueEntry *receiveEntry = NULL;
if (isClient_)
{
// in the client, find the send I/O that was issued by this message
// stream
for (Int32 i = 0; i < (Int32)ioq_.entries() AND receiveEntry == NULL; i++)
{
if (ioq_[i]->msg_ == msg)
receiveEntry = ioq_[i];
}
// make sure there is an entry that has been sent already
assert(receiveEntry AND receiveEntry->sent_ AND
NOT receiveEntry->receiving_ AND
receiveEntry->sendBuffer_ == NULL);
}
else
{
// in the server, add a new entry to the I/O queue
receiveEntry = new(getEnvironment()->getHeap()) socketIOQueueEntry;
ioq_.insert(receiveEntry);
// set IO queue entry
receiveEntry->sent_ = FALSE;
receiveEntry->msg_ = msg;
receiveEntry->sendBuffer_ = NULL;
receiveEntry->recvBuffer_ = NULL;
}
// set rest of IO queue entry
receiveEntry->receiving_ = TRUE;
setState(RECEIVING);
// we are now at a state where wait() will be able to complete the
// receive I/O
}
WaitReturnStatus SockConnection::wait(IpcTimeout timeout, UInt32 *eventConsumed, IpcAwaitiox *ipcAwaitiox)
{
// try to send some more messages
tryToSendMore(); // $$$$ use of zero timeout is not ideal
// loop over the I/O queue and try to receive some messages and try
// to call callbacks
for (Int32 i = 0; i < (Int32)ioq_.entries(); i++)
{
socketIOQueueEntry *e = ioq_[i];
if (e->receiving_)
{
// this is an entry that is receiving, try to get a message
// buffer from the socket (but in the client this will not
// necessarily be the one that is associated with this IO
// queue entry)
IpcMessageBuffer *buffer = NULL;
NABoolean completed = sock_.receive(buffer,timeout);
if (sock_.hasError())
{
// it blew
setErrorInfo(sock_.getError().geterrno());
setState(ERROR_STATE);
ABORT("Can't handle this error via ComDiags");
}
if (completed)
{
// find the IO queue entry that is associated with the
// received buffer
// get the sent reply tag from the message header
short replyTag =
((InternalMsgHdrInfoStruct *) buffer->data())->sockReplyTag_;
if (isClient_)
{
// in the client, use the reply tag to find the associated
// entry
NABoolean found = FALSE;
for (Int32 j = 0; j < (Int32)ioq_.entries() AND NOT found; j++)
{
if (ioq_[j]->replyTag_ == replyTag)
{
e = ioq_[j];
found = TRUE;
}
}
assert(found AND e->sent_ AND
e->recvBuffer_ == NULL);
}
else
{
// in the server the buffer is matched with the current
// I/O queue entry; we know it is in the receiving state
assert(e->receiving_ AND
NOT e->sent_ AND
e->sendBuffer_ == NULL AND
e->recvBuffer_ == NULL);
// store the sent reply tag with the buffer and in the IOQ
// (IpcMessageStream will maintain it there and
// SocketConnection::send() will match in up later)
buffer->setReplyTag(replyTag);
e->replyTag_ = replyTag;
}
// e now has a buffer associated with it
e->recvBuffer_ = buffer;
}
// let the timeout expire only once
timeout = IpcImmediately;
}
// if a message was sent/received and the callback wasn't called yet
// then do this now
if (e->sent_ AND e->sendBuffer_)
{
// can call a send callback
// save the message buffer on the stack and reset the IO queue entry
IpcMessageBuffer *buffer = e->sendBuffer_;
if (isClient_)
{
// setting the buffer pointer back to NULL indicates that
// the callback has been called
e->sendBuffer_ = NULL;
}
else
{
// in the server, calling the send callback is the last
// thing to do in the request-reply cycle
ioq_.removeAt(i);
// if this was the last active I/O then set state to inactive
if (ioq_.entries() == 0)
setState(REPLY_PENDING);
}
# ifdef LOG_IPC
IpcSockLogTimestamp(sock_.getFdesc());
cerr << "calling send callback" << endl;
# endif
// finally, call the callback, which may have a lot of side-effects
buffer->callSendCallback(this);
buffer->decrRefCount();
// bail out, don't trust environment after calling a callback
return WAIT_OK;
}
else if (e->receiving_ AND e->recvBuffer_)
{
// can call a receive callback
// save the message buffer on the stack and reset the IO queue entry
IpcMessageBuffer *buffer = e->recvBuffer_;
if (isClient_)
{
// finished send/receive cycle if receive callback is called
// in the client
ioq_.removeAt(i);
// if this was the last active I/O then set state to inactive
if (ioq_.entries() == 0)
setState(ESTABLISHED);
}
else
{
// in the server we'll keep the I/O queue entry until the
// reply comes
e->recvBuffer_ = NULL;
e->receiving_ = FALSE;
}
# ifdef LOG_IPC
IpcSockLogTimestamp(sock_.getFdesc());
cerr << "calling receive callback" << endl;
# endif
// finally, call the callback, which may have a lot of side-effects
buffer->addCallback(e->msg_);
buffer->callReceiveCallback(this);
// bail out, don't trust environment after calling a callback
return WAIT_OK;
}
} // for each ioq_ entry
return WAIT_OK;
}
SockConnection * SockConnection::castToSockConnection()
{
return this;
}
Int32 SockConnection::numQueuedSendMessages()
{
// the current implementation assumes that no messages can
// be queued and that only one message can be sent nowait
assert(sendQueueEntries() == 0 OR
sendQueueEntries() == 1 AND sendIOPending());
return sendQueueEntries();
}
Int32 SockConnection::numQueuedReceiveMessages()
{
// the current implementation assumes that no messages can
// be queued and that only one message can be received nowait
assert(receiveQueueEntries() == 0 OR
receiveQueueEntries() == 1 AND receiveIOPending());
return receiveQueueEntries();
}
void SockConnection::populateDiagsArea(ComDiagsArea *&diags,
CollHeap *diagsHeap)
{
if (sock_.hasError())
{
IpcAllocateDiagsArea(diags,diagsHeap);
*diags << DgSqlCode(-2035) << DgInt0(sock_.getError().geterrno());
getEnvironment()->getMyOwnProcessId(IPC_DOM_INTERNET).
addProcIdToDiagsArea(*diags,0);
getOtherEnd().addProcIdToDiagsArea(*diags,1);
}
}
void SockConnection::setFdesc(SockFdesc fdesc, NABoolean isClient)
{
assert(getState() == INITIAL);
sock_.setFdesc(fdesc);
if (isClient)
setState(ESTABLISHED);
else
setState(REPLY_PENDING);
}
SockPortNumber SockConnection::connect(const SockIPAddress &ipAddr, SockPortNumber port)
{ SockPortNumber port_ = sock_.connect(ipAddr,port);
setOtherEnd(IpcProcessId(ipAddr,sock_.receiveListnerPortNum())); // (3/6/97)
if (NOT sock_.hasError())
{
// a connection was established to the specified port,
// this is the client side
setState(ESTABLISHED);
}
else
{
// it blew
setErrorInfo(sock_.getError().geterrno());
setState(ERROR_STATE);
};
return port_;
}
void SockConnection::tryToSendMore()
{
// while there are unsent entries left, try to send them if the socket
// is ready
for (Int32 i = 0; i < (Int32) ioq_.entries(); i++)
{
// can we send the message in this io queue entry?
if (NOT ioq_[i]->sent_ AND ioq_[i]->sendBuffer_)
{
// try to send the next message
if (sock_.send(ioq_[i]->sendBuffer_,IpcImmediately))
{
// message has been sent or an error has occurred during send
ioq_[i]->sent_ = TRUE;
if (sock_.hasError())
{
// it blew
setErrorInfo(sock_.getError().geterrno());
setState(ERROR_STATE);
return;
}
}
else
return; // socket is busy, don't try another one
}
}
}
// -----------------------------------------------------------------------
// Methods for class SockPairConnection
// -----------------------------------------------------------------------
SockPairConnection::SockPairConnection(
IpcEnvironment *env, const char *eye) :
SockConnection(env,InvalidFdesc,TRUE,eye)
{
ABORT ("Not Implemented milestone 1");
}
SockPairConnection::SockPairConnection(
IpcEnvironment *env, SockFdesc fd) : SockConnection(env,fd,FALSE)
{
otherEnd_ = NULL;
}
SockPairConnection::~SockPairConnection() {}
SockPairConnection *SockPairConnection::otherEnd()
{
// make sure to delete the pointer to the other end before returning
// it, since the other end of the connection will go to a different process
SockPairConnection *temp = otherEnd_;
otherEnd_ = NULL;
return temp;
}
void SockPairConnection::doConnectNow()
{port_ = connect(ipAddr_,port_);
}
// -----------------------------------------------------------------------
// Methods for class SockListnerPort
// -----------------------------------------------------------------------
SockListnerPort::SockListnerPort(IpcEnvironment *env,
SockFdesc fdesc,
SockControlConnection *cc,
const char *eye) :
SockConnection(env,fdesc,FALSE,eye)
{
cc_ = cc;
// we should always be listening to new requests, the state never changes
setState(RECEIVING);
}
void SockListnerPort::send(IpcMessageBuffer *)
{
ABORT("SockListnerPort::send()");
}
void SockListnerPort::receive(IpcMessageStreamBase *)
{
ABORT("SockListnerPort::receive()");
}
WaitReturnStatus SockListnerPort::wait(IpcTimeout timeout, UInt32 *eventConsumed, IpcAwaitiox *ipcAwaitiox)
{
// new file descriptor in case a connection request comes
SockFdesc newFdesc;
// call the accept system call to wait for client connection requests
if (socket().accept(newFdesc,timeout))
{
// It worked, we have a new client and a socket to it in newFdesc.
// Create a new IPC socket connection to the client.
SockConnection *newConn =
new(getEnvironment()->getHeap()) SockConnection(
getEnvironment(),newFdesc,FALSE);
// call the virtual method of the control connection that is
// supposed to know what to do with a new client connection
cc_->acceptNewConnectionRequest(newConn);
// the state remains "RECEIVING", listen for more connection requests
}
return WAIT_OK;
}
// -----------------------------------------------------------------------
// Methods for class SockControlConnection
// -----------------------------------------------------------------------
SockControlConnection::SockControlConnection(
IpcEnvironment *env, const char *eye)
: IpcControlConnection(IPC_DOM_INTERNET, eye),
listnerSocket_(env)
{
// This process was forked by the internet demon inetd or by the master ESP
// Stdin and stdout are one socket that is already connected to the client.
controlConnection_ = new(env->getHeap()) SockConnection(env,SockStdin,FALSE);
incrNumRequestors();
// bind the new listner socket to any port for communication with any process
// and listen on the socket
listnerPortNum_ = listnerSocket_.listen();
// create a connection object that will listen on the listner port
// and stay active forever
listnerPort_ = new(env->getHeap()) SockListnerPort(
env,listnerSocket_.getFdesc(),this);
// the listener port has some arbitrary port number; send the listener port
// number back to the client as a string "Listening to port xxxxxxxxxxx\n"
cout << "Listening to port " << setw(11) << listnerPortNum_ << "\n" << flush;
}
SockControlConnection::SockControlConnection(
IpcEnvironment *env, Int32 inheritedSocket, Int32 passedPort, const char *eye)
: IpcControlConnection(IPC_DOM_INTERNET),
listnerSocket_(env)
{
assert(0);
}
IpcConnection *SockControlConnection::getConnection() const
{
return controlConnection_;
}
SockControlConnection * SockControlConnection::castToSockControlConnection()
{
return this;
}
void SockControlConnection::acceptNewConnectionRequest(SockConnection *conn)
{
// the default implementation is to reject new connection requests
// should have a deleteMe() method for IpcConnection $$$$
conn->~SockConnection();
}
// -----------------------------------------------------------------------
// Some methods for class IpcServerClass
// -----------------------------------------------------------------------
IpcConnection * IpcServerClass::createInternetProcess(
ComDiagsArea **diags,
CollHeap *diagsHeap,
const char *nodeName,
const char *className,
IpcCpuNum /*cpuNum (sorry, no support for SMPs yet)*/,
NABoolean /*usesTransactions (sorry, no transactions in cyberspace*/,
SockPortNumber defaultPortNumber)
{
IpcNodeName theNode(IPC_DOM_INTERNET,nodeName);
// the inetd service identified by className_
SockService service(className,defaultPortNumber);
// Establish a connection to the service, which will cause
// a new server process to be spawned
IpcConnection *result = new(environment_->getHeap()) SockConnection(
environment_,
IpcProcessId(theNode.getIPAddress(),service.getPortNumber()),
TRUE);
// do an error check and set diags area if it's used
if (diags)
result->populateDiagsArea(*diags,diagsHeap);
return result;
}
IpcConnection * IpcServerClass::forkProcess(ComDiagsArea **diags,
CollHeap *diagsHeap,
const char * /*nodeName*/,
const char *className,
IpcCpuNum /*cpuNum*/,
NABoolean /*usesTransactions*/)
{
IpcConnection *result = NULL;
Int32 serverPid;
SockPairConnection *sockPairServerConn;
SockPairConnection *clientConn;
sockPairServerConn = new SockPairConnection(environment_);
clientConn = sockPairServerConn->otherEnd();
result = sockPairServerConn;
serverPid = fork();
if (serverPid < 0)
ABORT("Couldn't fork the server process");
if (serverPid != 0)
{
// I'm the master
// the connection to the client is not needed here
delete clientConn;
clientConn = NULL;
}
else
{
// delete (close) the connection to the server, I AM the server
delete result;
result = NULL;
// assign the connection to the client to stdin/stdout, this
// is the convention for a socket-based server to receive data
clientConn->assignToStdInOut();
// access the process' environment and propagate it
extern char **environ;
// the command line looks like "<className> -fork"
const char *argv[3];
argv[0] = (char *)className;
argv[1] = "-fork";
argv[2] = 0;
// add "-debug" to the command line arguments if an environment
// variable is set
if (getenv("DEBUG_SERVER"))
argv[1] = "-fork -debug";
if (getenv("DEBUG_FORK"))
NADebug();
// I'm the server, call exec() on the server program file
ABORT("Not implemented for milestone 1");
// won't ever come out of here
}
return result;
}
#ifdef LOG_IPC
void IpcSockLogTimestamp(Int32 fdesc)
{
cerr << getpid() << "(" << fdesc << ") ";
}
#endif
#endif /* DISABLE_SOCKET_IPC */