| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ***************************************************************************** |
| * |
| * File: IpcSockets.cpp |
| * Description: OS related code for socket-based IPC (see Ipc.h) |
| * |
| * Created: 10/6/95 |
| * Language: C++ |
| * |
| * |
| ***************************************************************************** |
| */ |
| |
| // ----------------------------------------------------------------------- |
| |
| #include "Platform.h" |
| #include "copyright.h" |
| |
| |
| // At this point we don't really want to use sockets on NSK because |
| // of problems with the C/C++ runtime. Define DISABLE_SOCKET_IPC to replace |
| // all offending code (and maybe a little more than that) with stubs that |
| // just call ABORT(). |
| |
| |
| |
| // Uncomment the next line to debug IPC problems (log of client's I/O) |
| // #define LOG_IPC |
| |
| #ifdef LOG_IPC |
| void IpcSockLogTimestamp(Int32 fdesc); // see bottom of file |
| #endif |
| |
| #ifndef DISABLE_SOCKET_IPC |
| // for sockaddr struct and flags passed to socket system calls |
| |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <netinet/in.h> |
| #include <netdb.h> |
| // Failure value that the results of ::bind(), ::connect() and |
| // ::getsockname() are compared against in this file. |
| const Int32 SOCKET_ERROR = -1; |
| // Alias for size_t (no use of it is visible in this part of the file). |
| typedef size_t length_t; |
| |
| #include <errno.h> |
| #include <sys/time.h> |
| |
| #include <iostream> |
| #include <iomanip> |
| #endif /* DISABLE_SOCKET_IPC */ |
| |
| #include "Collections.h" |
| #include "Ipc.h" |
| #include "ComDiags.h" |
| #include "str.h" |
| |
| // comment this out once NAHeap.h is in common |
| // #include "NAHeap.h" |
| // for now declare overloaded operator new here |
| void * operator new(size_t size, CollHeap* h); |
| |
| |
| #ifdef DISABLE_SOCKET_IPC |
| |
| // ----------------------------------------------------------------------- |
| // Stubs for configurations where socket IPC is disabled |
| // ----------------------------------------------------------------------- |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: no socket is created, |
| // the constructor only calls ABORT() with a diagnostic message. |
| SockSocket::SockSocket(IpcEnvironment *env) |
| { |
| ABORT("Stub for SockSocket::SockSocket()"); |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: there is no descriptor |
| // to close, the destructor only calls ABORT(). |
| SockSocket::~SockSocket() |
| { |
| ABORT("Stub for SockSocket::~SockSocket()"); |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: the address bytes are |
| // not initialized, the constructor only calls ABORT(). |
| SockIPAddress::SockIPAddress() |
| { |
| ABORT("Stub for SockIPAddress::SockIPAddress()"); |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined. The base class and the |
| // sock_ / ioq_ members are still constructed from the arguments (note that |
| // constructing sock_ runs the stubbed SockSocket constructor), then the |
| // body calls ABORT(). |
| SockConnection::SockConnection(IpcEnvironment *env, |
| const IpcProcessId &serviceProcId, |
| NABoolean thisIsTheControlConnection, |
| const char *eye) : |
| IpcConnection(env,serviceProcId,eye), sock_(env), ioq_(env->getHeap()) |
| { |
| ABORT("Stub for SockConnection::SockConnection()"); |
| } |
| |
| // also need to stub out all virtual methods of SockConnection, |
| // constructor needs them |
| // Stub destructor for builds with DISABLE_SOCKET_IPC defined; it frees |
| // nothing and only calls ABORT(). |
| SockConnection::~SockConnection() |
| { |
| ABORT("Stub for SockConnection::~SockConnection()"); |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: the buffer is neither |
| // queued nor sent, the method only calls ABORT(). |
| void SockConnection::send(IpcMessageBuffer *buffer) |
| { |
| ABORT("Stub for void SockConnection::send(IpcMessageBuffer *buffer)"); |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: no receive is set up |
| // for the message stream, the method only calls ABORT(). |
| void SockConnection::receive(IpcMessageStreamBase *msg) |
| { |
| ABORT("Stub for void SockConnection::receive(IpcMessageStreamBase *msg)"); |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: calls ABORT() and, |
| // should ABORT() return, reports WAIT_INTERRUPT. |
| // NOTE(review): the ABORT text still names an older signature |
| // ("short ... wait(IpcTimeout timeout)"). |
| WaitReturnStatus SockConnection::wait(IpcTimeout timeout, UInt32 *eventConsumed, IpcAwaitiox *ipcAwaitiox) |
| { |
| ABORT("Stub for short SockConnection::wait(IpcTimeout timeout)"); |
| return WAIT_INTERRUPT; |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: calls ABORT() and, |
| // should ABORT() return, yields NULL instead of this object. |
| SockConnection * SockConnection::castToSockConnection() |
| { |
| ABORT("Stub for SockConnection *SockConnection::castToSockConnection()"); |
| return NULL; |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: calls ABORT() and, |
| // should ABORT() return, reports zero queued send messages. |
| Int32 SockConnection::numQueuedSendMessages() |
| { |
| ABORT("Stub for int SockConnection::numQueuedSendMessages()"); |
| return 0; |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: calls ABORT() and, |
| // should ABORT() return, reports zero queued receive messages. |
| Int32 SockConnection::numQueuedReceiveMessages() |
| { |
| ABORT("Stub for int SockConnection::numQueuedReceiveMessages()"); |
| return 0; |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: the diagnostics area is |
| // left untouched, the method only calls ABORT(). |
| void SockConnection::populateDiagsArea(ComDiagsArea *&diags, |
| CollHeap *diagsHeap) |
| { |
| ABORT("Stub for void SockConnection::populateDiagsArea()"); |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined. The base class is |
| // constructed for the IPC_DOM_INTERNET domain and listnerSocket_ from env, |
| // then the body calls ABORT(). |
| SockControlConnection::SockControlConnection(IpcEnvironment *env, const char *eye) : |
| IpcControlConnection(IPC_DOM_INTERNET, eye), listnerSocket_(env) |
| { |
| ABORT("Stub for SockControlConnection::SockControlConnection()"); |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: calls ABORT() and, |
| // should ABORT() return, yields NULL (no connection). |
| IpcConnection * SockControlConnection::getConnection() const |
| { |
| ABORT("Stub for SockControlConnection::getConnection()"); |
| return NULL; |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: calls ABORT() and, |
| // should ABORT() return, yields NULL instead of this object. |
| SockControlConnection * SockControlConnection::castToSockControlConnection() |
| { |
| ABORT("Stub for SockControlConnection::castToSockControlConnection()"); |
| return NULL; |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: no connection request |
| // is accepted, the method only calls ABORT(). |
| void SockControlConnection::acceptNewConnectionRequest(SockConnection *conn) |
| { |
| ABORT("Stub for SockControlConnection::acceptNewConnectionRequest()"); |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: no server process is |
| // contacted or created; calls ABORT() and, should ABORT() return, yields |
| // NULL. The cpu-number and transaction parameters are unnamed (unused). |
| IpcConnection * IpcServerClass::createInternetProcess( |
| ComDiagsArea **diags, |
| CollHeap *diagsHeap, |
| const char *nodeName, |
| const char *className, |
| IpcCpuNum /*cpuNum (sorry, no support for SMPs yet)*/, |
| NABoolean /*usesTransactions (sorry, no transactions in cyberspace*/, |
| SockPortNumber defaultPortNumber) |
| { |
| ABORT("Stub for IpcServerClass::createInternetProcess()"); |
| return NULL; |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: no process is forked; |
| // calls ABORT() and, should ABORT() return, yields NULL. The node name, |
| // cpu-number and transaction parameters are unnamed (unused). |
| IpcConnection * IpcServerClass::forkProcess(ComDiagsArea **diags, |
| CollHeap *diagsHeap, |
| const char * /*nodeName*/, |
| const char *className, |
| IpcCpuNum /*cpuNum*/, |
| NABoolean /*usesTransactions*/) |
| { |
| ABORT("Stub for IpcServerClass::forkProcess()"); |
| return NULL; |
| } |
| |
| // Stub for builds with DISABLE_SOCKET_IPC defined: no host lookup is done; |
| // calls ABORT() and, should ABORT() return, yields a SockErrNo built from 0. |
| SockErrNo SockIPAddress::set(const char *hostName) |
| { |
| ABORT("Stub for SockIPAddress::set()"); |
| return 0; |
| } |
| |
| |
| #else |
| |
| // ----------------------------------------------------------------------- |
| // The REAL methods |
| // ----------------------------------------------------------------------- |
| |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class SockErrNo |
| // ----------------------------------------------------------------------- |
| // Record the current value of errno in this object and return the |
| // recorded value. |
| Int32 SockErrNo::setFromerrno() |
| { |
| #ifndef NA_ERRNO_AS_PROCEDURE |
|   // errno can be read directly |
|   errno_ = errno; |
| #else |
|   ... insert code for GUARDIAN style errno here |
| #endif |
|   return errno_; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class SockIPAddress |
| // ----------------------------------------------------------------------- |
| |
| // Default constructor: start out with the invalid (wildcard) IP address |
| // 0.0.0.0 by clearing all four address bytes. |
| SockIPAddress::SockIPAddress() |
| { |
|   for (Int32 octet = 0; octet < 4; octet++) |
|     a_.ipAddress_[octet] = 0; |
| } |
| |
| // Construct from a socket address: the four address bytes are taken from |
| // sa_data[2] .. sa_data[5] (SockSocket::bindOrConnect stores the port in |
| // sa_data[0] and sa_data[1]). |
| SockIPAddress::SockIPAddress(const struct sockaddr &sa) |
| { |
|   const Int32 firstAddrByte = 2; |
| |
|   for (Int32 octet = 0; octet < 4; octet++) |
|     a_.ipAddress_[octet] = sa.sa_data[firstAddrByte + octet]; |
| } |
| |
| // Resolve a host name to an IPv4 address and store it in this object. |
| // |
| // hostName  name to look up with gethostbyname(); NULL means "this host" |
| //           (the name is then obtained from gethostname()). |
| // |
| // Returns a SockErrNo: the errno of a failed gethostname(), ENOENT when |
| // the lookup yields no entry, ENXIO when the entry has no usable IPv4 |
| // address, and no error otherwise. On any failure the stored address |
| // stays 0.0.0.0. |
| SockErrNo SockIPAddress::set(const char *hostName) |
| { |
|   SockErrNo result; |
| |
|   struct hostent *hostEnt = NULL; |
| |
|   // initialize the address to something |
|   for (Int32 i = 0; i < 4; i++) |
|     a_.ipAddress_[i] = 0; |
| |
|   if (hostName != NULL) |
|     hostEnt = gethostbyname(hostName); |
|   else |
|     { |
|       char me[IpcNodeNameMaxLength]; |
| |
|       if (gethostname(me,IpcNodeNameMaxLength) < 0) |
|         result.setFromerrno(); |
|       else |
|         { |
|           // a truncated host name is not guaranteed to be NUL-terminated |
|           me[IpcNodeNameMaxLength - 1] = '\0'; |
|           hostEnt = gethostbyname(me); |
|         } |
|     } |
| |
|   // report a failed lookup, but keep the more specific errno of a |
|   // failed gethostname() call instead of overwriting it |
|   if (hostEnt == NULL AND NOT result.hasError()) |
|     result = ENOENT; |
| |
|   if (NOT result.hasError()) |
|     { |
|       // hostEnt->h_addr_list points to an array of pointers to addresses, |
|       // where each address is an array of chars (in network order) |
|       char *addr = NULL; |
| |
|       // only a 4-byte IPv4 address fits into a_.ipAddress_ |
|       if (hostEnt->h_addrtype == AF_INET AND |
|           hostEnt->h_length >= 4 AND |
|           hostEnt->h_addr_list != NULL) |
|         addr = hostEnt->h_addr_list[0]; |
| |
|       if (addr == NULL) |
|         result = ENXIO; // no usable address in the host entry |
|       else |
|         { |
|           for (Int32 i = 0; i < 4; i++) |
|             a_.ipAddress_[i] = addr[i]; |
|         } |
|     } |
| |
|   return result; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class SockService |
| // ----------------------------------------------------------------------- |
| |
| // Look up the port number of a named service. |
| // |
| // serviceName        name passed to getservbyname() (any protocol). |
| // defaultPortNumber  port used when the service is not found; must not |
| //                    be NoSockPortNumber in that case (asserted). |
| // |
| // portNum_ is kept in host byte order, which is what |
| // SockSocket::bindOrConnect() expects when it splits the port into bytes. |
| SockService::SockService(const char *serviceName, |
|                          SockPortNumber defaultPortNumber) |
| { |
|   struct servent *servEnt; |
| |
|   servEnt = getservbyname(serviceName,NULL); |
|   if (servEnt == NULL) |
|     { |
|       // Service not found, using default port number |
|       assert(defaultPortNumber != NoSockPortNumber); |
|       portNum_ = defaultPortNumber; |
|     } |
|   else |
|     { |
|       // s_port is delivered in network byte order, convert it to host |
|       // order before storing it |
|       portNum_ = ntohs((unsigned short) servEnt->s_port); |
|     } |
|   // don't delete servEnt, it is owned by the system |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class SockSocket |
| // ----------------------------------------------------------------------- |
| |
| // Create a new, unbound TCP/IP stream socket. If socket() fails, the |
| // errno value is recorded via setFromerrno() and fdesc_ stays negative. |
| SockSocket::SockSocket(IpcEnvironment *env) |
| { |
|   // no receive operation is in progress yet |
|   partiallyReceivedBuffer_ = NULL; |
|   receivedBytesSoFar_ = 0; |
|   expectedBytes_ = 0; |
|   environment_ = env; |
| |
|   // domain = IP, type = stream, protocol = TCP |
|   fdesc_ = socket(AF_INET, SOCK_STREAM, SockTCPProtocol); |
|   if (fdesc_ >= 0) |
|     return; |
| |
|   setFromerrno("creating an unbound socket with socket()"); |
| } |
| |
| // Wrap an already existing socket file descriptor; no system call is made. |
| SockSocket::SockSocket(SockFdesc fd, IpcEnvironment *env) |
| { |
|   fdesc_ = fd; |
|   environment_ = env; |
| |
|   // no receive operation is in progress yet |
|   partiallyReceivedBuffer_ = NULL; |
|   receivedBytesSoFar_ = 0; |
|   expectedBytes_ = 0; |
| } |
| |
| |
| // Close the file descriptor, provided it is a non-negative one. |
| SockSocket::~SockSocket() |
| { |
|   if (fdesc_ < 0) |
|     return; // nothing to close |
| |
|   close(fdesc_); |
| } |
| |
| // Bind this socket to a local address/port (bindOnly) or connect it to a |
| // remote address/port (NOT bindOnly). |
| // |
| // ipAddr    IPv4 address, copied byte by byte into the socket address. |
| // port      port number in host byte order. |
| // bindOnly  TRUE: call ::bind(), FALSE: call ::connect(). |
| // |
| // Returns the port read back from ::getsockname() after the call. Errors |
| // are reported through lastError_, which is cleared on entry; when both |
| // the bind/connect call and ::getsockname() fail, the first error is kept. |
| SockPortNumber SockSocket::bindOrConnect(const SockIPAddress &ipAddr, |
|                                          SockPortNumber port, |
|                                          NABoolean bindOnly) |
| { |
|   struct sockaddr targetSockAddr; |
|   Int32 retcode; |
| |
|   lastError_.clear(); |
| |
|   // this code depends on the definition of sockaddr, make sure |
|   // we don't do the wrong thing |
|   assert(sizeof(targetSockAddr) == 16); |
| |
|   // clear the complete structure first, so that no field (including any |
|   // platform-specific one in front of sa_data) is passed uninitialized |
|   char *rawSockAddr = (char *) &targetSockAddr; |
|   for (size_t b = 0; b < sizeof(targetSockAddr); b++) |
|     rawSockAddr[b] = 0; |
| |
|   // doing everything byte by byte takes care of endianness problems |
|   // (just in case you don't like this code) |
|   targetSockAddr.sa_family = AF_INET; |
|   targetSockAddr.sa_data[0] = ((UInt32)port) / 256; |
|   targetSockAddr.sa_data[1] = ((UInt32)port) % 256; |
|   for (Int32 i = 0; i < 4; i++) |
|     targetSockAddr.sa_data[2 + i] = ipAddr.a_.ipAddress_[i]; |
| |
|   // NOTE: we want to call the system calls bind and connect, not |
|   // the member functions with the same names |
|   if (bindOnly) |
|     { |
|       // the bind call just binds the socket to the specified port, |
|       // it doesn't wait for a connection to come in (see listen and accept) |
|       retcode = ::bind(fdesc_,&targetSockAddr,sizeof(targetSockAddr)); |
|     } |
|   else |
|     { |
|       // the connect system call actually starts up a connection; once |
|       // the call succeeds we can send and receive |
|       retcode = ::connect(fdesc_,&targetSockAddr,sizeof(targetSockAddr)); |
|     } |
| |
|   if (retcode == SOCKET_ERROR) |
|     setFromerrno("::bind() or ::connect()"); |
| |
|   // read back the port that the socket is now bound to |
|   socklen_t namelen = sizeof(targetSockAddr); |
|   retcode = ::getsockname(fdesc_,&targetSockAddr,&namelen); |
| |
|   // don't let a follow-on failure hide the original bind/connect error |
|   if (retcode == SOCKET_ERROR AND NOT lastError_.hasError()) |
|     setFromerrno("::getsockname()"); |
| |
|   return (unsigned char)targetSockAddr.sa_data[0] * 256 + |
|          (unsigned char)targetSockAddr.sa_data[1]; |
| } |
| |
| // Bind the socket to the given port (or to any port if NoSockPortNumber |
| // is passed) and prepare it for ::accept() calls. |
| // |
| // Returns the bound port on success. If either the bind or the ::listen() |
| // call fails, NoSockPortNumber is returned and lastError_ holds the error. |
| SockPortNumber SockSocket::listen(SockPortNumber port) |
| { |
|   // maximum number of pending connections requested from ::listen() |
|   const Int32 listenBacklog = 5; |
| |
|   lastError_.clear(); |
| |
|   // bind the socket to the wildcard address and the requested port |
|   const SockPortNumber boundPort = bind(SockIPAddress(),port); |
|   if (lastError_.hasError()) |
|     return NoSockPortNumber; |
| |
|   // prepare the socket for ::accept() calls by calling ::listen() |
|   if (::listen(fdesc_,listenBacklog) < 0) |
|     { |
|       setFromerrno("::listen()"); |
|       // a socket that is not listening has no usable listener port |
|       return NoSockPortNumber; |
|     } |
| |
|   return boundPort; |
| } |
| |
| // Read the 30-character message "Listening to port xxxxxxxxxxx\n" that a |
| // newly created server sends on this socket and decode the port number. |
| // |
| // Returns the decoded port. If the message cannot be read completely or |
| // does not start with the expected prefix, NoSockPortNumber is returned |
| // and lastError_ is set (an error that is already recorded, e.g. from a |
| // failed connect, is kept). lastError_ is NOT cleared on entry. |
| SockPortNumber SockSocket::receiveListnerPortNum() |
| { |
|   // Listening to port xxxxxxxxxxx\n |
|   // 012345678901234567890123456789 |
| |
|   const Int32 numCharsInListnerMsg = 30; |
|   const Int32 numCharsInPrefix = 18; |
| |
|   // one extra byte so that the text can be NUL-terminated for atoi() |
|   char listnerMsg[numCharsInListnerMsg + 1]; |
|   Int32 receivedBytes = 0; |
| |
|   // wait for the server to send the port number; a stream socket may |
|   // deliver the message in several pieces, so read until it is complete |
|   while (receivedBytes < numCharsInListnerMsg) |
|     { |
|       Int32 receivedThisTime = recv(fdesc_, |
|                                     &listnerMsg[receivedBytes], |
|                                     numCharsInListnerMsg - receivedBytes, |
|                                     0); |
|       if (receivedThisTime <= 0) |
|         { |
|           if (NOT lastError_.hasError()) |
|             { |
|               if (receivedThisTime < 0) |
|                 setFromerrno("::recv() of listner port message"); |
|               else |
|                 lastError_ = ECONNRESET; // peer closed the connection |
|             } |
|           return NoSockPortNumber; |
|         } |
|       receivedBytes += receivedThisTime; |
|     } |
|   listnerMsg[numCharsInListnerMsg] = '\0'; |
| |
|   // make sure we got the right message |
|   short temp = str_cmp(listnerMsg,"Listening to port ",numCharsInPrefix); |
|   assert(temp == 0); |
|   if (temp != 0) |
|     { |
|       // only reached when asserts are compiled out |
|       if (NOT lastError_.hasError()) |
|         lastError_ = EINVAL; |
|       return NoSockPortNumber; |
|     } |
| |
|   // decode the ascii number |
|   SockPortNumber result = atoi(&listnerMsg[numCharsInPrefix]); |
| |
| # ifdef LOG_IPC |
|   IpcSockLogTimestamp(fdesc_); |
|   cerr << "server listens to port " << result << endl; |
| # endif |
| |
|   return result; |
| } |
| |
| // Intended to make this socket the process's stdin. The actual assignment |
| // is not implemented: after clearing lastError_ the method calls ABORT(). |
| // The statements after ABORT() only take effect if ABORT() returns; they |
| // then merely change fdesc_ to SockStdin without touching any descriptor. |
| void SockSocket::assignToStdInOut() |
| { |
| lastError_.clear(); |
| |
| // assign the socket to stdin |
| ABORT ("Not Implemented or Needed in current milestone"); |
| |
| // now set the new file descriptor number in the |
| // object to stdin (Note: server program MUST NOT use stdin |
| // for other purposes than IPC procedures) |
| fdesc_ = SockStdin; |
| # ifdef LOG_IPC |
| IpcSockLogTimestamp(SockStdin); |
| cerr << "Assigned stdin to client conn." << endl; |
| # endif |
| } |
| |
| // Send a complete message buffer over the socket. |
| // |
| // message  buffer to send; getMessageLength() bytes are sent. |
| // timeout  IpcInfiniteTimeout, or the time (in 10 ms units) to wait for |
| //          the socket to become writable before the first ::send(). |
| // |
| // Returns FALSE if the timeout expired before anything was sent. Returns |
| // TRUE otherwise, i.e. when the message was sent or when an error |
| // occurred; callers must check lastError_ (cleared on entry) to tell the |
| // two apart. Once sending has started, the call blocks until all bytes |
| // are sent or an error occurs. |
| NABoolean SockSocket::send(IpcMessageBuffer *message, |
|                            IpcTimeout timeout) |
| { |
|   IpcMessageObjSize bytesToSend = message->getMessageLength(); |
|   IpcMessageObjSize bytesSentSoFar = 0; |
|   Lng32 bytesSentThisTime; |
| |
|   lastError_.clear(); |
| |
|   // --------------------------------------------------------------------- |
|   // If a timeout is specified, do a select first before sending data |
|   // --------------------------------------------------------------------- |
|   if (timeout != IpcInfiniteTimeout) |
|     { |
|       // two structs from sys/time.h and sys/types.h to describe the |
|       // timeout in seconds/microseconds to wait and a bitmap of file |
|       // descriptors |
|       timeval timeOutVal; |
|       fd_set readyDescriptors; |
| |
|       // make a bitmap with the bit for fdesc_ set to 1 |
|       FD_ZERO(&readyDescriptors); |
|       FD_SET(fdesc_,&readyDescriptors); |
| |
|       // timeout counts 10 ms intervals: 100 of them make a second and |
|       // each remaining interval is 10000 microseconds |
|       timeOutVal.tv_sec = timeout / 100; |
|       timeOutVal.tv_usec = (timeout % 100) * 10000; |
| |
|       Int32 retcode = |
|         select(FD_SETSIZE,NULL,&readyDescriptors,NULL,&timeOutVal); |
| |
|       if (retcode < 0) |
|         { |
|           // indicate an error by returning TRUE and setting the error status |
|           setFromerrno("::select() for send"); |
|           return TRUE; |
|         } |
| |
|       if (retcode == 0) |
|         { |
|           // the timeout expired, return without sending anything |
|           return FALSE; |
|         } |
|     } // non-infinite timeout |
| |
|   // --------------------------------------------------------------------- |
|   // call ::send until all of the data is sent (do it waited from here) |
|   // --------------------------------------------------------------------- |
|   while (bytesToSend > 0 AND NOT lastError_.hasError()) |
|     { |
|       bytesSentThisTime = ::send(fdesc_, |
|                                  message->data(bytesSentSoFar), |
|                                  (Int32) bytesToSend, |
|                                  0); |
|       if (bytesSentThisTime <= 0) |
|         { |
|           setFromerrno("::send()"); |
|           // make sure the loop ends even if errno was not set |
|           if (NOT lastError_.hasError()) |
|             lastError_ = EIO; |
|         } |
|       else |
|         { |
|           // prepare for sending the next chunk or for leaving the loop |
|           bytesToSend -= bytesSentThisTime; |
|           bytesSentSoFar += (IpcMessageObjSize) bytesSentThisTime; |
|         } |
|     } |
| |
| # ifdef LOG_IPC |
|   IpcSockLogTimestamp(fdesc_); |
|   IpcMessageObj *firstUserObj = (IpcMessageObj *) |
|     (message->data(sizeof(InternalMsgHdrInfoStruct))); |
|   cerr << "sent " << bytesSentSoFar << " bytes, objtype " |
|        << firstUserObj->getType() << endl; |
| # endif |
| |
|   // we did send it (or lastError_ describes why not) |
|   return TRUE; |
| } |
| |
| // Receive one complete message from the socket. |
| // |
| // message  in: NULL, or a caller-supplied buffer to receive into (the same |
| //          buffer must be passed on every call for one message); |
| //          out: the buffer holding the message when TRUE is returned (a |
| //          buffer is allocated here if none was supplied). |
| // timeout  IpcInfiniteTimeout, or the time (in 10 ms units) to wait for |
| //          data before each recv() call. |
| // |
| // Returns FALSE if the timeout expired before the message was complete; |
| // progress is kept in expectedBytes_, receivedBytesSoFar_ and |
| // partiallyReceivedBuffer_ so that a later call can continue. Returns |
| // TRUE when the message is complete or an error occurred; on error |
| // lastError_ (cleared on entry) is set and the message length of the |
| // returned buffer, if there is one, is 0. |
| // |
| // NOTE(review): while no buffer exists yet, header bytes are received into |
| // a stack array; a timeout in the middle of the header appears to lose the |
| // bytes received so far. Fixing this needs a buffer that outlives the call. |
| NABoolean SockSocket::receive(IpcMessageBuffer * &message, |
|                               IpcTimeout timeout) |
| { |
|   // receive buffer for socket (may be stack variable or alloc. buffer) |
|   IpcMessageBufferPtr sockReceiveBuffer; |
| |
|   // copy of the message header on the stack |
|   char header[sizeof(InternalMsgHdrInfoStruct)]; |
|   // variable that remembers how many bytes came in the message |
|   Lng32 receivedBytesThisTime; |
| |
|   // the actual message length (initialize to header length until we know it) |
|   if (receivedBytesSoFar_ == 0) |
|     { |
|       // start over with a fresh receive operation |
|       expectedBytes_ = sizeof(InternalMsgHdrInfoStruct); |
|     } |
| |
|   // the user may specify a special receive buffer, but not multiple |
|   // receive buffers for the same message |
|   if (partiallyReceivedBuffer_ == NULL AND message != NULL) |
|     partiallyReceivedBuffer_ = message; |
|   assert(message == NULL OR partiallyReceivedBuffer_ == message); |
| |
|   // if no buffer is allocated by the caller, use the on-stack header |
|   // for the initial chunk |
|   if (partiallyReceivedBuffer_) |
|     sockReceiveBuffer = partiallyReceivedBuffer_->data(); |
|   else |
|     sockReceiveBuffer = header; |
| |
|   // each call clears the error info and a length is only returned when |
|   // the message has arrived completely and the return value is TRUE |
|   lastError_.clear(); |
| |
|   // --------------------------------------------------------------------- |
|   // Keep on receiving in a loop until we have read all of the message |
|   // (and no more than the message). |
|   // --------------------------------------------------------------------- |
| |
|   while (expectedBytes_ > receivedBytesSoFar_ AND NOT lastError_.hasError()) |
|     { |
| |
|       // ----------------------------------------------------------------- |
|       // If a timeout is specified, do a select first before receiving data |
|       // NOTE: we wait <timeout> 10ms intervals each time we go through |
|       // the loop, although we should wait only a total of <timeout> |
|       // intervals (usually, once a message head is coming in, the tail is |
|       // not far away) |
|       // ----------------------------------------------------------------- |
|       if (timeout != IpcInfiniteTimeout) |
|         { |
|           // two structs from sys/time.h and sys/types.h to describe the |
|           // timeout in seconds/microseconds to wait and a bitmap of file |
|           // descriptors |
|           timeval timeOutVal; |
|           fd_set readyDescriptors; |
| |
|           // make a bitmap with the bit for fdesc_ set to 1 |
|           FD_ZERO(&readyDescriptors); |
|           FD_SET(fdesc_,&readyDescriptors); |
| |
|           // timeout counts 10 ms intervals: 100 of them make a second and |
|           // each remaining interval is 10000 microseconds |
|           timeOutVal.tv_sec = timeout / 100; |
|           timeOutVal.tv_usec = (timeout % 100) * 10000; |
| |
|           Int32 retcode = |
|             select(FD_SETSIZE,&readyDescriptors,NULL,NULL,&timeOutVal); |
| |
|           if (retcode < 0) |
|             { |
|               // indicate an error by returning TRUE and length 0 |
|               setFromerrno("::select for receive"); |
|               // indicate the error by returning a zero-length message |
|               if (partiallyReceivedBuffer_ != NULL) |
|                 { |
|                   partiallyReceivedBuffer_->setMessageLength(0); |
|                   message = partiallyReceivedBuffer_; |
|                 } |
|               partiallyReceivedBuffer_ = NULL; |
|               expectedBytes_ = 0; |
|               receivedBytesSoFar_ = 0; |
|               return TRUE; |
|             } |
| |
|           if (retcode == 0) |
|             { |
|               // the timeout expired, return without receiving something |
|               return FALSE; |
|             } |
|         } // non-infinite timeout |
| |
|       receivedBytesThisTime = recv( |
|            fdesc_, |
|            &sockReceiveBuffer[receivedBytesSoFar_], |
|            (Int32) (expectedBytes_ - receivedBytesSoFar_), |
|            0); |
| |
|       if (receivedBytesThisTime < 0) |
|         { |
|           setFromerrno("::recv()"); |
|         } |
|       else if (receivedBytesThisTime == 0) |
|         { |
|           // recv() returns 0 when the peer has closed the connection; errno |
|           // is not set in that case, so record an error explicitly (else |
|           // this loop would never end) |
|           lastError_ = ECONNRESET; |
|         } |
|       else |
|         { |
|           // if we completely received the header then increase the expected |
|           // message length to the actual value sent in the header |
|           if (receivedBytesSoFar_ + receivedBytesThisTime == |
|               sizeof(InternalMsgHdrInfoStruct)) |
|             { |
|               // a pointer to the message header |
|               InternalMsgHdrInfoStruct *headerPtr = |
|                 (InternalMsgHdrInfoStruct *) sockReceiveBuffer; |
| |
| #ifdef NA_LITTLE_ENDIAN |
|               // need to turn around bytes on a little-endian machine, |
|               // since the header info is always sent in big-endian format |
|               headerPtr->turnByteOrder(); |
| #endif |
| |
|               // read the total message length out of the received header |
|               expectedBytes_ = headerPtr->totalLength_; |
| |
|               if (expectedBytes_ < sizeof(InternalMsgHdrInfoStruct)) |
|                 { |
|                   // the length comes from the network; a message can't be |
|                   // shorter than its own header, so reject it before a |
|                   // buffer is sized from it |
|                   lastError_ = EINVAL; |
|                 } |
|               else if (sockReceiveBuffer == header) |
|                 { |
|                   // allocate a message buffer unless one was provided |
|                   // already by the caller (caller becomes new owner |
|                   // of the buffer) |
|                   CollHeap *heap = |
|                     (environment_ ? environment_->getHeap() : NULL); |
| |
|                   // we used the provisional buffer for the receive, |
|                   // go allocate the real buffer with the correct length |
|                   partiallyReceivedBuffer_ = IpcMessageBuffer::allocate( |
|                        expectedBytes_, |
|                        NULL, // don't know the IpcMessageStream object here |
|                        heap, |
|                        0); |
| |
|                   // the actual receive buffer starts a few bytes further up |
|                   sockReceiveBuffer = partiallyReceivedBuffer_->data(); |
| |
|                   // copy the received header into the allocated buffer |
|                   str_cpy_all(sockReceiveBuffer, |
|                               header, |
|                               sizeof(InternalMsgHdrInfoStruct)); |
|                 } |
|               else |
|                 { |
|                   // if a buffer was supplied then it better be big enough |
|                   assert(partiallyReceivedBuffer_->getBufferLength() >= |
|                          expectedBytes_); |
|                 } |
|             } // message header received |
| |
|           // prepare for receiving the next chunk or for leaving the loop |
|           receivedBytesSoFar_ += receivedBytesThisTime; |
|         } // no receive error |
|     } // while expectedBytes_ > receivedBytesSoFar_ |
| |
|   // at this point we are done with a message or we got a bad error |
|   if (lastError_.hasError()) |
|     { |
|       // indicate the error by returning a zero-length message |
|       if (partiallyReceivedBuffer_ != NULL) |
|         partiallyReceivedBuffer_->setMessageLength(0); |
|     } |
|   else |
|     { |
|       // set message length in the buffer |
|       partiallyReceivedBuffer_->setMessageLength(expectedBytes_); |
|     } |
| |
| # ifdef LOG_IPC |
|   IpcSockLogTimestamp(fdesc_); |
|   // there is no buffer to look at if the receive failed early |
|   if (partiallyReceivedBuffer_ != NULL) |
|     { |
|       IpcMessageObj *firstUserObj = (IpcMessageObj *) |
|         (partiallyReceivedBuffer_->data(sizeof(InternalMsgHdrInfoStruct))); |
|       cerr << "received " << receivedBytesSoFar_ << " bytes, objtype " |
|            << firstUserObj->getType() << endl; |
|     } |
| # endif |
| |
|   // indicate we are ready for the next message |
|   expectedBytes_ = 0; |
|   receivedBytesSoFar_ = 0; |
|   message = partiallyReceivedBuffer_; |
|   partiallyReceivedBuffer_ = NULL; |
| |
|   return TRUE; |
| } |
| |
| // Accept an incoming connection on this (listening) socket. |
| // |
| // fdesc    out: descriptor of the newly created socket for the client, or |
| //          InvalidFdesc if an error occurred. |
| // timeout  IpcInfiniteTimeout, or the time (in 10 ms units) to wait for |
| //          a connection request. |
| // |
| // Returns FALSE if the timeout expired without a request (fdesc is left |
| // unchanged). Returns TRUE if a connection was accepted or an error |
| // occurred; lastError_ (cleared on entry, like in send and receive) tells |
| // the two apart. |
| NABoolean SockSocket::accept(SockFdesc &fdesc, IpcTimeout timeout) |
| { |
|   // an error left over from an earlier call must not be mistaken for a |
|   // failure of this one |
|   lastError_.clear(); |
| |
|   // --------------------------------------------------------------------- |
|   // If a timeout is specified, do a select first before accepting |
|   // --------------------------------------------------------------------- |
|   if (timeout != IpcInfiniteTimeout) |
|     { |
|       // two structs from sys/time.h and sys/types.h to describe the |
|       // timeout in seconds/microseconds to wait and a bitmap of file |
|       // descriptors |
|       timeval timeOutVal; |
|       fd_set readyDescriptors; |
| |
|       // make a bitmap with the bit for fdesc_ set to 1 |
|       FD_ZERO(&readyDescriptors); |
|       FD_SET(fdesc_,&readyDescriptors); |
| |
|       // timeout counts 10 ms intervals: 100 of them make a second and |
|       // each remaining interval is 10000 microseconds |
|       timeOutVal.tv_sec = timeout / 100; |
|       timeOutVal.tv_usec = (timeout % 100) * 10000; |
| |
|       Int32 retcode = |
|         select(FD_SETSIZE,&readyDescriptors,NULL,NULL,&timeOutVal); |
| |
|       if (retcode < 0) |
|         { |
|           // indicate an error by returning TRUE and setting the error |
|           setFromerrno("::select for accept"); |
|           fdesc = InvalidFdesc; |
|           return TRUE; |
|         } |
| |
|       if (retcode == 0) |
|         { |
|           // the timeout expired, return without accepting anything |
|           return FALSE; |
|         } |
|     } // non-infinite timeout |
| |
|   struct sockaddr clientSockAddr; |
|   socklen_t addrlen = sizeof(clientSockAddr); |
| |
|   // call the accept() system call, waiting until a client wants to connect |
|   Int32 retcode = ::accept(fdesc_,&clientSockAddr,&addrlen); |
| |
|   if (retcode >= 0) |
|     { |
|       // for successful calls, accept() returns the file descriptor |
|       // of a newly created socket that can be used to communicate to |
|       // the client (the existing socket continues to be the listener |
|       // port, no real messages are ever sent on it) |
|       fdesc = retcode; |
| |
| # ifdef LOG_IPC |
|       IpcSockLogTimestamp(fdesc); |
| |
|       SockIPAddress clientIPAddr(clientSockAddr); |
|       // decode the port from unsigned bytes, as bindOrConnect() does |
|       SockPortNumber clientPort = |
|         (unsigned char)clientSockAddr.sa_data[0] * 256 + |
|         (unsigned char)clientSockAddr.sa_data[1]; |
|       IpcProcessId clientPid(clientIPAddr,clientPort); |
|       char clientPidAsString[200]; |
| |
|       clientPid.toAscii(clientPidAsString,200); |
|       cerr << "accepted connection from " << clientPidAsString << endl; |
| # endif |
| |
|     } |
|   else |
|     { |
|       // a negative returncode indicates an error |
|       fdesc = InvalidFdesc; |
|       setFromerrno("::accept()"); |
|     } |
| |
|   // something happened, either an error or a new client request |
|   return TRUE; |
| } |
| |
| // Record the current errno in lastError_ and return a copy of it. |
| // |
| // msg  text naming the failed operation; it is only used for the log line |
| //      written when LOG_IPC is defined. |
| SockErrNo SockSocket::setFromerrno(const char *msg) |
| { |
| // capture errno first, before the logging below makes further calls |
| lastError_.setFromerrno(); |
| |
| # ifdef LOG_IPC |
| IpcSockLogTimestamp(fdesc_); |
| cerr << "socket error " << lastError_.geterrno() << ", msg: " << msg << endl; |
| # endif |
| |
| return lastError_; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class SockConnection |
| // ----------------------------------------------------------------------- |
| |
| // Client-side constructor: connect to the server identified by |
| // serviceProcId. |
| // |
| // env                         IPC environment (its heap is used for ioq_). |
| // serviceProcId               IP address and port to connect to. |
| // thisIsTheControlConnection  TRUE if the port belongs to a service that |
| //                             starts a new server process; the new server |
| //                             then announces its own listener port, which |
| //                             is read here and stored via setOtherEnd(). |
| // eye                         passed through to the IpcConnection base. |
| // |
| // On success the state becomes ESTABLISHED, otherwise the socket's errno |
| // is stored with setErrorInfo() and the state becomes ERROR_STATE. |
| SockConnection::SockConnection(IpcEnvironment *env, |
|                                const IpcProcessId &serviceProcId, |
|                                NABoolean thisIsTheControlConnection, |
|                                const char *eye) : |
|      IpcConnection(env,serviceProcId, eye), sock_(env), |
|      isClient_(TRUE), lastReplyTag_(0), ioq_(env->getHeap()) |
| { |
|   port_ = sock_.connect(serviceProcId.getIPAddress(), |
|                         serviceProcId.getPortNumber()); |
| |
|   // Only try to read the listener port announcement if the connect |
|   // succeeded; there is nothing to receive on an unconnected socket. |
|   if (thisIsTheControlConnection AND NOT sock_.hasError()) |
|     { |
|       // The process id that was passed in specified a service for |
|       // inetd. A new process got created and opened a new port on |
|       // which it listens for new requests. Catch the message of |
|       // the new process where it announces its listener port and |
|       // set that listener port in the server process id. This makes |
|       // the server process id unique and allows other processes to |
|       // connect to the server by calling the same constructor, but |
|       // with the thisIsTheControlConnection parameter set to FALSE. |
|       setOtherEnd(IpcProcessId(serviceProcId.getIPAddress(), |
|                                sock_.receiveListnerPortNum())); |
|     } |
|   // else: either the connect failed, or we already got the server |
|   // process id with the listener port set (no new process got created, |
|   // the original process id is used) |
| |
|   if (NOT sock_.hasError()) |
|     { |
|       // a connection was established to the specified port, |
|       // this is the client side |
|       setState(ESTABLISHED); |
| |
| # ifdef LOG_IPC |
|       IpcSockLogTimestamp(sock_.getFdesc()); |
|       char serverPidAsString[200]; |
| |
|       serviceProcId.toAscii(serverPidAsString,200); |
|       cerr << "connected to server " << serverPidAsString |
|            << " on port " << port_ << endl; |
| # endif |
| |
|     } |
|   else |
|     { |
|       // it blew |
|       setErrorInfo(sock_.getError().geterrno()); |
|       setState(ERROR_STATE); |
|     } |
| } |
| |
| // Build a connection around an existing socket file descriptor. |
| // |
| // isClient  TRUE: this end sends first, the state becomes ESTABLISHED; |
| //           FALSE: this end first waits for the client's message, the |
| //           state becomes REPLY_PENDING. |
| // With fdesc == InvalidFdesc the state is not changed here. |
| SockConnection::SockConnection( |
|      IpcEnvironment *env, |
|      SockFdesc fdesc, |
|      NABoolean isClient, |
|      const char *eye) : IpcConnection(env,IpcProcessId(), eye), |
|                         sock_(fdesc,env), port_(NoSockPortNumber), |
|                         isClient_(isClient), lastReplyTag_(0), |
|                         ioq_(env->getHeap()) |
| { |
|   if (fdesc == InvalidFdesc) |
|     return; |
| |
|   // the client may send right now, the server needs to wait for the |
|   // client's message |
|   setState(isClient ? ESTABLISHED : REPLY_PENDING); |
| } |
| |
| // Give the memory of all remaining I/O queue entries back to the |
| // environment's heap. |
| SockConnection::~SockConnection() |
| { |
|   Int32 entryIx = 0; |
| |
|   while (entryIx < (Int32) ioq_.entries()) |
|     { |
|       getEnvironment()->getHeap()->deallocateMemory(ioq_[entryIx]); |
|       entryIx++; |
|     } |
| } |
| |
| // Queue a message buffer for sending and try to send queued messages. |
| // |
| // Client: a send starts a new round trip, so a new I/O queue entry with a |
| // freshly generated reply tag is created. |
| // Server: a send is the reply to an earlier request; the existing I/O |
| // queue entry with the buffer's reply tag is looked up (asserted to |
| // exist, with no receive buffer and not receiving). |
| // |
| // The reply tag is also written into the message header that goes over |
| // the wire, so that the client can match the reply with its request. |
| // The send queue of the base class is not used. |
| void SockConnection::send(IpcMessageBuffer *buffer) |
| { |
|   socketIOQueueEntry *entry = NULL; |
|   short tag; |
| |
|   if (NOT isClient_) |
|     { |
|       // server side: locate the request this reply belongs to |
|       tag = buffer->getReplyTag(); |
| |
|       Int32 ix = 0; |
|       while (ix < (Int32) ioq_.entries() AND entry == NULL) |
|         { |
|           if (ioq_[ix]->replyTag_ == tag) |
|             entry = ioq_[ix]; |
|           ix++; |
|         } |
|       assert(entry AND entry->recvBuffer_ == NULL AND |
|              NOT entry->receiving_); |
|     } |
|   else |
|     { |
|       // client side: new queue entry for the new round trip |
|       entry = new(getEnvironment()->getHeap()) socketIOQueueEntry; |
|       ioq_.insert(entry); |
| |
|       // next reply tag (wrap around at some arbitrary number that is |
|       // large enough) |
|       lastReplyTag_++; |
|       if (lastReplyTag_ > 4711) |
|         lastReplyTag_ = 0; |
|       tag = lastReplyTag_; |
| |
|       entry->msg_ = buffer->getMessageStream(); |
|       entry->recvBuffer_ = NULL; |
|       entry->replyTag_ = lastReplyTag_; |
|       entry->receiving_ = FALSE; |
|     } |
| |
|   // the reply tag travels in the message header that gets sent |
|   InternalMsgHdrInfoStruct *wireHeader = |
|     (InternalMsgHdrInfoStruct *) buffer->data(); |
|   wireHeader->sockReplyTag_ = tag; |
| |
|   // the entry now has a buffer waiting to be sent |
|   entry->sent_ = FALSE; |
|   entry->sendBuffer_ = buffer; |
| |
|   // indicate that I/O operations are pending, connection is active |
|   setState(SENDING); |
| |
|   // try to send this message or another, earlier, message |
|   tryToSendMore(); |
| } |
| |
| void SockConnection::receive(IpcMessageStreamBase *msg) |
| { |
| socketIOQueueEntry *receiveEntry = NULL; |
| |
| if (isClient_) |
| { |
| // in the client, find the send I/O that was issued by this message |
| // stream |
| for (Int32 i = 0; i < (Int32)ioq_.entries() AND receiveEntry == NULL; i++) |
| { |
| if (ioq_[i]->msg_ == msg) |
| receiveEntry = ioq_[i]; |
| } |
| // make sure there is an entry that has been sent already |
| assert(receiveEntry AND receiveEntry->sent_ AND |
| NOT receiveEntry->receiving_ AND |
| receiveEntry->sendBuffer_ == NULL); |
| } |
| else |
| { |
| // in the server, add a new entry to the I/O queue |
| receiveEntry = new(getEnvironment()->getHeap()) socketIOQueueEntry; |
| ioq_.insert(receiveEntry); |
| |
| // set IO queue entry |
| receiveEntry->sent_ = FALSE; |
| receiveEntry->msg_ = msg; |
| receiveEntry->sendBuffer_ = NULL; |
| receiveEntry->recvBuffer_ = NULL; |
| } |
| |
| // set rest of IO queue entry |
| receiveEntry->receiving_ = TRUE; |
| |
| setState(RECEIVING); |
| |
| // we are now at a state where wait() will be able to complete the |
| // receive I/O |
| } |
| |
| WaitReturnStatus SockConnection::wait(IpcTimeout timeout, UInt32 *eventConsumed, IpcAwaitiox *ipcAwaitiox) |
| { |
| // try to send some more messages |
| tryToSendMore(); // $$$$ use of zero timeout is not ideal |
| |
| // loop over the I/O queue and try to receive some messages and try |
| // to call callbacks |
| for (Int32 i = 0; i < (Int32)ioq_.entries(); i++) |
| { |
| socketIOQueueEntry *e = ioq_[i]; |
| |
| if (e->receiving_) |
| { |
| // this is an entry that is receiving, try to get a message |
| // buffer from the socket (but in the client this will not |
| // necessarily be the one that is associated with this IO |
| // queue entry) |
| IpcMessageBuffer *buffer = NULL; |
| NABoolean completed = sock_.receive(buffer,timeout); |
| |
| if (sock_.hasError()) |
| { |
| // it blew |
| setErrorInfo(sock_.getError().geterrno()); |
| setState(ERROR_STATE); |
| ABORT("Can't handle this error via ComDiags"); |
| } |
| |
| if (completed) |
| { |
| // find the IO queue entry that is associated with the |
| // received buffer |
| |
| // get the sent reply tag from the message header |
| short replyTag = |
| ((InternalMsgHdrInfoStruct *) buffer->data())->sockReplyTag_; |
| |
| if (isClient_) |
| { |
| // in the client, use the reply tag to find the associated |
| // entry |
| NABoolean found = FALSE; |
| for (Int32 j = 0; j < (Int32)ioq_.entries() AND NOT found; j++) |
| { |
| if (ioq_[j]->replyTag_ == replyTag) |
| { |
| e = ioq_[j]; |
| found = TRUE; |
| } |
| } |
| assert(found AND e->sent_ AND |
| e->recvBuffer_ == NULL); |
| } |
| else |
| { |
| // in the server the buffer is matched with the current |
| // I/O queue entry; we know it is in the receiving state |
| assert(e->receiving_ AND |
| NOT e->sent_ AND |
| e->sendBuffer_ == NULL AND |
| e->recvBuffer_ == NULL); |
| |
| // store the sent reply tag with the buffer and in the IOQ |
| // (IpcMessageStream will maintain it there and |
| // SocketConnection::send() will match in up later) |
| buffer->setReplyTag(replyTag); |
| e->replyTag_ = replyTag; |
| } |
| |
| // e now has a buffer associated with it |
| e->recvBuffer_ = buffer; |
| } |
| |
| // let the timeout expire only once |
| timeout = IpcImmediately; |
| } |
| |
| // if a message was sent/received and the callback wasn't called yet |
| // then do this now |
| if (e->sent_ AND e->sendBuffer_) |
| { |
| // can call a send callback |
| // save the message buffer on the stack and reset the IO queue entry |
| IpcMessageBuffer *buffer = e->sendBuffer_; |
| |
| if (isClient_) |
| { |
| // setting the buffer pointer back to NULL indicates that |
| // the callback has been called |
| e->sendBuffer_ = NULL; |
| } |
| else |
| { |
| // in the server, calling the send callback is the last |
| // thing to do in the request-reply cycle |
| ioq_.removeAt(i); |
| |
| // if this was the last active I/O then set state to inactive |
| if (ioq_.entries() == 0) |
| setState(REPLY_PENDING); |
| } |
| |
| # ifdef LOG_IPC |
| IpcSockLogTimestamp(sock_.getFdesc()); |
| cerr << "calling send callback" << endl; |
| # endif |
| |
| // finally, call the callback, which may have a lot of side-effects |
| buffer->callSendCallback(this); |
| buffer->decrRefCount(); |
| |
| // bail out, don't trust environment after calling a callback |
| return WAIT_OK; |
| } |
| else if (e->receiving_ AND e->recvBuffer_) |
| { |
| // can call a receive callback |
| // save the message buffer on the stack and reset the IO queue entry |
| IpcMessageBuffer *buffer = e->recvBuffer_; |
| |
| if (isClient_) |
| { |
| // finished send/receive cycle if receive callback is called |
| // in the client |
| ioq_.removeAt(i); |
| |
| // if this was the last active I/O then set state to inactive |
| if (ioq_.entries() == 0) |
| setState(ESTABLISHED); |
| } |
| else |
| { |
| // in the server we'll keep the I/O queue entry until the |
| // reply comes |
| e->recvBuffer_ = NULL; |
| e->receiving_ = FALSE; |
| } |
| |
| # ifdef LOG_IPC |
| IpcSockLogTimestamp(sock_.getFdesc()); |
| cerr << "calling receive callback" << endl; |
| # endif |
| |
| // finally, call the callback, which may have a lot of side-effects |
| buffer->addCallback(e->msg_); |
| buffer->callReceiveCallback(this); |
| |
| // bail out, don't trust environment after calling a callback |
| return WAIT_OK; |
| } |
| } // for each ioq_ entry |
| |
| return WAIT_OK; |
| } |
| |
| SockConnection * SockConnection::castToSockConnection() |
| { |
| return this; |
| } |
| |
| Int32 SockConnection::numQueuedSendMessages() |
| { |
| // the current implementation assumes that no messages can |
| // be queued and that only one message can be sent nowait |
| assert(sendQueueEntries() == 0 OR |
| sendQueueEntries() == 1 AND sendIOPending()); |
| return sendQueueEntries(); |
| } |
| |
| Int32 SockConnection::numQueuedReceiveMessages() |
| { |
| // the current implementation assumes that no messages can |
| // be queued and that only one message can be received nowait |
| assert(receiveQueueEntries() == 0 OR |
| receiveQueueEntries() == 1 AND receiveIOPending()); |
| return receiveQueueEntries(); |
| } |
| |
| void SockConnection::populateDiagsArea(ComDiagsArea *&diags, |
| CollHeap *diagsHeap) |
| { |
| if (sock_.hasError()) |
| { |
| IpcAllocateDiagsArea(diags,diagsHeap); |
| |
| *diags << DgSqlCode(-2035) << DgInt0(sock_.getError().geterrno()); |
| getEnvironment()->getMyOwnProcessId(IPC_DOM_INTERNET). |
| addProcIdToDiagsArea(*diags,0); |
| getOtherEnd().addProcIdToDiagsArea(*diags,1); |
| } |
| } |
| |
| void SockConnection::setFdesc(SockFdesc fdesc, NABoolean isClient) |
| { |
| assert(getState() == INITIAL); |
| sock_.setFdesc(fdesc); |
| if (isClient) |
| setState(ESTABLISHED); |
| else |
| setState(REPLY_PENDING); |
| } |
| |
| SockPortNumber SockConnection::connect(const SockIPAddress &ipAddr, SockPortNumber port) |
| { SockPortNumber port_ = sock_.connect(ipAddr,port); |
| setOtherEnd(IpcProcessId(ipAddr,sock_.receiveListnerPortNum())); // (3/6/97) |
| if (NOT sock_.hasError()) |
| { |
| // a connection was established to the specified port, |
| // this is the client side |
| setState(ESTABLISHED); |
| } |
| else |
| { |
| // it blew |
| setErrorInfo(sock_.getError().geterrno()); |
| setState(ERROR_STATE); |
| }; |
| return port_; |
| } |
| |
| void SockConnection::tryToSendMore() |
| { |
| // while there are unsent entries left, try to send them if the socket |
| // is ready |
| for (Int32 i = 0; i < (Int32) ioq_.entries(); i++) |
| { |
| // can we send the message in this io queue entry? |
| if (NOT ioq_[i]->sent_ AND ioq_[i]->sendBuffer_) |
| { |
| // try to send the next message |
| |
| if (sock_.send(ioq_[i]->sendBuffer_,IpcImmediately)) |
| { |
| // message has been sent or an error has occurred during send |
| ioq_[i]->sent_ = TRUE; |
| |
| if (sock_.hasError()) |
| { |
| // it blew |
| setErrorInfo(sock_.getError().geterrno()); |
| setState(ERROR_STATE); |
| return; |
| } |
| } |
| else |
| return; // socket is busy, don't try another one |
| } |
| } |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class SockPairConnection |
| // ----------------------------------------------------------------------- |
| |
| SockPairConnection::SockPairConnection( |
| IpcEnvironment *env, const char *eye) : |
| SockConnection(env,InvalidFdesc,TRUE,eye) |
| { |
| ABORT ("Not Implemented milestone 1"); |
| |
| } |
| |
| |
| SockPairConnection::SockPairConnection( |
| IpcEnvironment *env, SockFdesc fd) : SockConnection(env,fd,FALSE) |
| { |
| otherEnd_ = NULL; |
| } |
| |
| SockPairConnection::~SockPairConnection() {} |
| |
| SockPairConnection *SockPairConnection::otherEnd() |
| { |
| // make sure to delete the pointer to the other end before returning |
| // it, since the other end of the connection will go to a different process |
| SockPairConnection *temp = otherEnd_; |
| |
| otherEnd_ = NULL; |
| return temp; |
| } |
| |
| void SockPairConnection::doConnectNow() |
| {port_ = connect(ipAddr_,port_); |
| } |
| |
| |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class SockListnerPort |
| // ----------------------------------------------------------------------- |
| |
| SockListnerPort::SockListnerPort(IpcEnvironment *env, |
| SockFdesc fdesc, |
| SockControlConnection *cc, |
| const char *eye) : |
| SockConnection(env,fdesc,FALSE,eye) |
| { |
| cc_ = cc; |
| // we should always be listening to new requests, the state never changes |
| setState(RECEIVING); |
| } |
| |
| void SockListnerPort::send(IpcMessageBuffer *) |
| { |
| ABORT("SockListnerPort::send()"); |
| } |
| |
| void SockListnerPort::receive(IpcMessageStreamBase *) |
| { |
| ABORT("SockListnerPort::receive()"); |
| } |
| |
| WaitReturnStatus SockListnerPort::wait(IpcTimeout timeout, UInt32 *eventConsumed, IpcAwaitiox *ipcAwaitiox) |
| { |
| // new file descriptor in case a connection request comes |
| SockFdesc newFdesc; |
| |
| // call the accept system call to wait for client connection requests |
| if (socket().accept(newFdesc,timeout)) |
| { |
| // It worked, we have a new client and a socket to it in newFdesc. |
| // Create a new IPC socket connection to the client. |
| SockConnection *newConn = |
| new(getEnvironment()->getHeap()) SockConnection( |
| getEnvironment(),newFdesc,FALSE); |
| |
| // call the virtual method of the control connection that is |
| // supposed to know what to do with a new client connection |
| cc_->acceptNewConnectionRequest(newConn); |
| |
| // the state remains "RECEIVING", listen for more connection requests |
| } |
| return WAIT_OK; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class SockControlConnection |
| // ----------------------------------------------------------------------- |
| |
| SockControlConnection::SockControlConnection( |
| IpcEnvironment *env, const char *eye) |
| : IpcControlConnection(IPC_DOM_INTERNET, eye), |
| listnerSocket_(env) |
| { |
| // This process was forked by the internet demon inetd or by the master ESP |
| // Stdin and stdout are one socket that is already connected to the client. |
| controlConnection_ = new(env->getHeap()) SockConnection(env,SockStdin,FALSE); |
| incrNumRequestors(); |
| |
| // bind the new listner socket to any port for communication with any process |
| // and listen on the socket |
| listnerPortNum_ = listnerSocket_.listen(); |
| |
| // create a connection object that will listen on the listner port |
| // and stay active forever |
| listnerPort_ = new(env->getHeap()) SockListnerPort( |
| env,listnerSocket_.getFdesc(),this); |
| |
| // the listener port has some arbitrary port number; send the listener port |
| // number back to the client as a string "Listening to port xxxxxxxxxxx\n" |
| cout << "Listening to port " << setw(11) << listnerPortNum_ << "\n" << flush; |
| } |
| |
| SockControlConnection::SockControlConnection( |
| IpcEnvironment *env, Int32 inheritedSocket, Int32 passedPort, const char *eye) |
| : IpcControlConnection(IPC_DOM_INTERNET), |
| listnerSocket_(env) |
| { |
| assert(0); |
| } |
| |
| IpcConnection *SockControlConnection::getConnection() const |
| { |
| return controlConnection_; |
| } |
| |
| SockControlConnection * SockControlConnection::castToSockControlConnection() |
| { |
| return this; |
| } |
| |
| void SockControlConnection::acceptNewConnectionRequest(SockConnection *conn) |
| { |
| // the default implementation is to reject new connection requests |
| // should have a deleteMe() method for IpcConnection $$$$ |
| conn->~SockConnection(); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Some methods for class IpcServerClass |
| // ----------------------------------------------------------------------- |
| |
| IpcConnection * IpcServerClass::createInternetProcess( |
| ComDiagsArea **diags, |
| CollHeap *diagsHeap, |
| const char *nodeName, |
| const char *className, |
| IpcCpuNum /*cpuNum (sorry, no support for SMPs yet)*/, |
| NABoolean /*usesTransactions (sorry, no transactions in cyberspace*/, |
| SockPortNumber defaultPortNumber) |
| { |
| IpcNodeName theNode(IPC_DOM_INTERNET,nodeName); |
| |
| // the inetd service identified by className_ |
| SockService service(className,defaultPortNumber); |
| |
| // Establish a connection to the service, which will cause |
| // a new server process to be spawned |
| IpcConnection *result = new(environment_->getHeap()) SockConnection( |
| environment_, |
| IpcProcessId(theNode.getIPAddress(),service.getPortNumber()), |
| TRUE); |
| |
| // do an error check and set diags area if it's used |
| if (diags) |
| result->populateDiagsArea(*diags,diagsHeap); |
| |
| return result; |
| } |
| |
| |
| |
| IpcConnection * IpcServerClass::forkProcess(ComDiagsArea **diags, |
| CollHeap *diagsHeap, |
| const char * /*nodeName*/, |
| const char *className, |
| IpcCpuNum /*cpuNum*/, |
| NABoolean /*usesTransactions*/) |
| { |
| IpcConnection *result = NULL; |
| Int32 serverPid; |
| SockPairConnection *sockPairServerConn; |
| SockPairConnection *clientConn; |
| |
| sockPairServerConn = new SockPairConnection(environment_); |
| clientConn = sockPairServerConn->otherEnd(); |
| result = sockPairServerConn; |
| |
| serverPid = fork(); |
| |
| if (serverPid < 0) |
| ABORT("Couldn't fork the server process"); |
| |
| if (serverPid != 0) |
| { |
| // I'm the master |
| |
| // the connection to the client is not needed here |
| delete clientConn; |
| clientConn = NULL; |
| } |
| else |
| { |
| // delete (close) the connection to the server, I AM the server |
| delete result; |
| result = NULL; |
| |
| // assign the connection to the client to stdin/stdout, this |
| // is the convention for a socket-based server to receive data |
| clientConn->assignToStdInOut(); |
| |
| // access the process' environment and propagate it |
| extern char **environ; |
| // the command line looks like "<className> -fork" |
| const char *argv[3]; |
| |
| argv[0] = (char *)className; |
| argv[1] = "-fork"; |
| argv[2] = 0; |
| |
| // add "-debug" to the command line arguments if an environment |
| // variable is set |
| if (getenv("DEBUG_SERVER")) |
| argv[1] = "-fork -debug"; |
| if (getenv("DEBUG_FORK")) |
| NADebug(); |
| |
| // I'm the server, call exec() on the server program file |
| ABORT("Not implemented for milestone 1"); |
| |
| // won't ever come out of here |
| } |
| |
| return result; |
| } |
| |
| #ifdef LOG_IPC |
| void IpcSockLogTimestamp(Int32 fdesc) |
| { |
| cerr << getpid() << "(" << fdesc << ") "; |
| } |
| #endif |
| |
| #endif /* DISABLE_SOCKET_IPC */ |