blob: f7b8afb7e86afc6a92532c4d05736a9f32063ab8 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* service.c
* Shared code module that supports Greenplum system server processes
* under the postmaster that communicate with other Postgres processes
* with TCP/IP (same GP instance or different GP instance).
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <pthread.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <time.h>
#include <fcntl.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <netinet/in.h>
#include "miscadmin.h"
#include "libpq/pqsignal.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbselect.h"
#include "commands/sequence.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "postmaster/service.h"
#include "postmaster/walsendserver.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/pmsignal.h" /* PostmasterIsAlive */
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/ps_status.h"
#include "cdb/cdbutil.h"
#include "cdb/cdblink.h"
#include "access/xlog_internal.h"
#include "utils/memutils.h"
// TEMP BEGIN
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>
#include <ws2tcpip.h>
#define SHUT_RDWR SD_BOTH
#define SHUT_RD SD_RECEIVE
#define SHUT_WR SD_SEND
#endif
/*
* backlog for listen() call:
*/
#define LISTEN_BACKLOG 64
#define getInt16(pNetData) ntohs(*((int16 *)(pNetData)))
#define getInt32(pNetData) ntohl(*((int32 *)(pNetData)))
#define getOid(pNetData)
/*=========================================================================
* FUNCTIONS PROTOTYPES
*/
// UNDONE: Network types?
/*
 * Connection-handshake request: the message read by the service when a
 * request is processed with the "new connection" flag set.  It carries only
 * a fixed-length eye-catcher string.
 *
 * NOTE(review): every call visible in this file passes newConnection = false,
 * so the handshake looks disabled at present -- confirm before relying on it.
 */
typedef struct ServiceConnectionRequestMsg
{
char eyeCatcher[SERVER_EYE_CATCHER_LEN];
} ServiceConnectionRequestMsg;
/*
 * Connection-handshake response: the message written back to the peer by
 * ServiceNewConnectionMsg().  It carries a single status flag.
 */
typedef struct ServiceConnectionResponseMsg
{
bool ok;
} ServiceConnectionResponseMsg;
/*
 * Forward declarations of the file-local helpers defined further down.
 */
/* Client side: establish the connection and move bytes over the client socket. */
static bool ServiceDoConnect(ServiceConfig *serviceConfig, int listenerPort, ServiceClient *serviceClient, bool complain);
static bool ServiceClientRead(ServiceClient *serviceClient, void* response, int responseLen, struct timeval *timeout);
static bool ServiceClientPollRead(ServiceClient *serviceClient, void* response, int responseLen, bool *pollResponseReceived);
static bool ServiceClientWrite(ServiceClient *serviceClient, void* request, int requestLen);
/* Service process: SIGQUIT handler. */
static void ServiceQuickDie(SIGNAL_ARGS);
/* Service process: listener setup, main select() loop, and request handling. */
static void ServiceListenLoop(ServiceCtrl *serviceCtrl);
static bool ServiceProcessRequest(ServiceCtrl *serviceCtrl, int sockfd, uint8 *inputBuff, bool newConnection);
static bool ServiceNewConnectionMsg(ServiceCtrl *serviceCtrl, int sockfd, ServiceConnectionRequestMsg *newConnectionRequestMsg);
static int ServiceListenerSetup(ServiceCtrl *serviceCtrl);
/*
 * ServiceInit
 *
 * Attach a configuration to a service control block and mark the listener
 * as not yet created: both the listener descriptor and the listener port
 * are reset to -1.
 */
void
ServiceInit(ServiceConfig *serviceConfig, ServiceCtrl *serviceCtrl)
{
    Assert(serviceCtrl != NULL);
    Assert(serviceConfig != NULL);

    /* No listener exists until ServiceListenerSetup() runs. */
    serviceCtrl->listenerPort = -1;
    serviceCtrl->listenerFd = -1;

    serviceCtrl->serviceConfig = serviceConfig;
}
/*
 * Text of the most recent client-side error captured by the client helpers
 * in this file.  Starts out empty.
 */
static char ClientErrorString[200] = "";

/*
 * ServiceGetLastClientErrorString
 *
 * Return a pointer to the static buffer holding the last captured client
 * error message.  The buffer is owned by this module; later client errors
 * overwrite it.
 */
char *
ServiceGetLastClientErrorString(void)
{
    return &ClientErrorString[0];
}
/*
 * ServiceDoConnect
 *
 * Open a TCP connection to the service listening on the IPv4 loopback
 * address at listenerPort, then switch the connected socket to
 * non-blocking mode.
 *
 * serviceConfig  - used for the service title in error messages.
 * listenerPort   - port the service is listening on.
 * serviceClient  - receives the connected descriptor in sockfd.
 * complain       - when true, a failure is emitted to the log (demoted to
 *                  WARNING); when false it is only remembered.
 *
 * Returns true on success.  On failure the descriptor is closed,
 * serviceClient->sockfd is -1, the message text is saved for
 * ServiceGetLastClientErrorString(), and false is returned.  If the error
 * cannot be demoted to WARNING it is re-thrown to the caller.
 */
static bool
ServiceDoConnect(ServiceConfig *serviceConfig, int listenerPort, ServiceClient *serviceClient, bool complain)
{
    struct sockaddr_in addr;
    int         saved_err;
    char       *message;
    bool        result = false;
    DECLARE_SAVE_SUPPRESS_PANIC();

    PG_TRY();
    {
        SUPPRESS_PANIC();

        for (;;)
        {
            /*
             * Open a connection to the service.
             */
            serviceClient->sockfd = socket(AF_INET, SOCK_STREAM, 0);
            if (serviceClient->sockfd < 0)
            {
                saved_err = errno;
                serviceClient->sockfd = -1;
                ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                errmsg("Could not create socket for '%s': %s",
                                       serviceConfig->title,
                                       strerror(saved_err))));
            }

            /* Zero the whole address so padding bytes are well defined. */
            memset(&addr, 0, sizeof(addr));
            addr.sin_family = AF_INET;
            addr.sin_port = htons(listenerPort);
            addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

            if (connect(serviceClient->sockfd, (struct sockaddr *) &addr, sizeof(addr)) >= 0)
                break;          /* success. we're done here! */

            /* Capture errno before close() has a chance to change it. */
            saved_err = errno;
            close(serviceClient->sockfd);
            serviceClient->sockfd = -1;

            /* Interrupted by a signal: start over with a fresh socket. */
            if (saved_err == EINTR)
                continue;

            ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                            errmsg("Could not connect to '%s': %s",
                                   serviceConfig->title,
                                   strerror(saved_err))));
        }

        /* The connection is established; make the socket non-blocking. */
        if (!pg_set_noblock(serviceClient->sockfd))
        {
            saved_err = errno;
            close(serviceClient->sockfd);
            serviceClient->sockfd = -1;
            ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                            errmsg("Could not set '%s' socket to non-blocking mode: %s",
                                   serviceConfig->title,
                                   strerror(saved_err))));
        }

        result = true;
        RESTORE_PANIC();
    }
    PG_CATCH();
    {
        RESTORE_PANIC();

        /* Report the error to the server log */
        if (!elog_demote(WARNING))
        {
            elog(LOG,"unable to demote error");
            PG_RE_THROW();
        }

        /* Remember the text only when it fits; otherwise clear the buffer. */
        message = elog_message();
        if (message != NULL && strlen(message) + 1 < sizeof(ClientErrorString))
            strcpy(ClientErrorString, message);
        else
            strcpy(ClientErrorString, "");

        if (complain)
            EmitErrorReport();
        FlushErrorState();

        result = false;
    }
    PG_END_TRY();

    return result;
}
/*
 * ServiceGetClientTimeout
 *
 * Fill *timeout with the client I/O timeout for this service.  When the
 * configuration supplies a ServiceClientTimeout callback, that callback
 * fills in the value; otherwise a default of 0.25 second is used.
 */
void
ServiceGetClientTimeout(ServiceConfig *serviceConfig, struct timeval *timeout)
{
    Assert(serviceConfig != NULL);

    if (serviceConfig->ServiceClientTimeout != NULL)
    {
        serviceConfig->ServiceClientTimeout(timeout);
        return;
    }

    /* No callback configured: fall back to a quarter of a second. */
    timeout->tv_usec = 250000;
    timeout->tv_sec = 0;
}
/*
 * ServiceClientConnect
 *
 * Used by clients to connect to a service.
 *
 * If serviceClient already holds a descriptor (sockfd != -1) nothing is done
 * and true is returned.  Otherwise the configuration is recorded in the
 * client and the connection is attempted via ServiceDoConnect(), whose
 * result is returned.
 *
 * UNDONE: the connection handshake (ServiceConnectionRequestMsg /
 * ServiceConnectionResponseMsg exchange and result check) is not performed.
 */
bool
ServiceClientConnect(ServiceConfig *serviceConfig, int listenerPort, ServiceClient *serviceClient, bool complain)
{
    Assert(serviceConfig != NULL);
    Assert(serviceClient != NULL);

    /* We are already connected. */
    if (serviceClient->sockfd != -1)
        return true;

    serviceClient->serviceConfig = serviceConfig;

    return ServiceDoConnect(serviceConfig, listenerPort, serviceClient, complain);
}
/*
 * ServiceClientSendRequest
 *
 * Used by clients to send a request to a service.
 *
 * serviceClient - a client previously set up by ServiceClientConnect().
 * request       - the request bytes to send.
 * requestLen    - must equal the requestLen declared in the service
 *                 configuration.
 *
 * Returns true when the whole request was written.  On failure the error is
 * demoted to WARNING and logged, its text is saved for
 * ServiceGetLastClientErrorString(), and false is returned.  If the error
 * cannot be demoted it is re-thrown to the caller.
 */
bool
ServiceClientSendRequest(ServiceClient *serviceClient, void* request, int requestLen)
{
    ServiceConfig *serviceConfig;
    char       *message;
    bool        result = false;
    DECLARE_SAVE_SUPPRESS_PANIC();

    Assert(serviceClient != NULL);
    Assert(request != NULL);

    PG_TRY();
    {
        SUPPRESS_PANIC();

        serviceConfig = serviceClient->serviceConfig;
        if (serviceConfig == NULL)
        {
            /*
             * No configuration means the client was never connected, so
             * there is no service title to report; do not dereference it.
             */
            ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                            errmsg("Not connected to service")));
        }

        if (requestLen != serviceConfig->requestLen)
        {
            ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                            errmsg("Expecting request length %d and actual length is %d for '%s'",
                                   serviceConfig->requestLen, requestLen,
                                   serviceConfig->title)));
        }

        result = ServiceClientWrite(serviceClient, request, requestLen);
        RESTORE_PANIC();
    }
    PG_CATCH();
    {
        RESTORE_PANIC();

        /* Report the error to the server log */
        if (!elog_demote(WARNING))
        {
            elog(LOG,"unable to demote error");
            PG_RE_THROW();
        }

        /* Remember the text only when it fits; otherwise clear the buffer. */
        message = elog_message();
        if (message != NULL && strlen(message) + 1 < sizeof(ClientErrorString))
            strcpy(ClientErrorString, message);
        else
            strcpy(ClientErrorString, "");

        EmitErrorReport();
        FlushErrorState();

        result = false;
    }
    PG_END_TRY();

    return result;
}
/*
 * ServiceClientWrite
 *
 * Write exactly requestLen bytes of 'request' to the client's non-blocking
 * socket.  When the socket cannot accept data right away, wait with select()
 * using the service's client timeout until it becomes writable, then retry.
 *
 * Returns true when everything was written.  On failure (no connection,
 * peer closed, write/select error, timeout) the error is demoted to WARNING
 * and logged, its text is saved for ServiceGetLastClientErrorString(), and
 * false is returned.  If the error cannot be demoted it is re-thrown.
 */
static bool
ServiceClientWrite(ServiceClient *serviceClient, void* request, int requestLen)
{
    ServiceConfig *serviceConfig;
    int         n;
    int         bytesWritten = 0;
    int         saved_err;
    char       *message;
    bool        result = false;
    mpp_fd_set  wset;

    /*
     * Use local variable since select modifies
     * the timeout parameter with remaining time.
     */
    struct timeval rundownTimeout;
    DECLARE_SAVE_SUPPRESS_PANIC();

    Assert(serviceClient != NULL);
    serviceConfig = (ServiceConfig*)serviceClient->serviceConfig;
    Assert(serviceConfig != NULL);
    ServiceGetClientTimeout(serviceConfig, &rundownTimeout);
    Assert(request != NULL);

    PG_TRY();
    {
        SUPPRESS_PANIC();

        if (serviceClient->sockfd == -1)
            ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                            errmsg("Lost connection to '%s'",
                                   serviceConfig->title)));

        /*
         * write the request
         */
        while (bytesWritten < requestLen)
        {
            n = write(serviceClient->sockfd,
                      ((char *)request) + bytesWritten,
                      requestLen - bytesWritten);
            saved_err = errno;

            if (n == 0)
            {
                ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                errmsg("Connection to '%s' is closed",
                                       serviceConfig->title)));
            }

            if (n < 0)
            {
                /*
                 * A non-blocking socket may report either EWOULDBLOCK or
                 * EAGAIN when it cannot make progress; treat both alike.
                 */
                bool        wouldBlock = (saved_err == EWOULDBLOCK || saved_err == EAGAIN);

                if (saved_err != EINTR && !wouldBlock)
                {
                    ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                    errmsg("Write error to '%s': %s",
                                           serviceConfig->title,
                                           strerror(saved_err))));
                }

                if (wouldBlock)
                {
                    /*
                     * We shouldn't really get here since we are dealing with
                     * small messages, but once part of the message has been
                     * written we must finish it (or fail).  Wait until the
                     * socket is writable or the timeout runs out.
                     */
                    do
                    {
                        MPP_FD_ZERO(&wset);
                        MPP_FD_SET(serviceClient->sockfd, &wset);
                        n = select(serviceClient->sockfd + 1, NULL, (fd_set *)&wset, NULL, &rundownTimeout);
                        if (n == 0)
                        {
                            /* Report the full configured timeout, not what is left. */
                            struct timeval wholeTimeout;

                            ServiceGetClientTimeout(serviceConfig, &wholeTimeout);
                            ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                            errmsg("Write to '%s' timed out after %d.%03d seconds",
                                                   serviceConfig->title,
                                                   (int)wholeTimeout.tv_sec,
                                                   (int)wholeTimeout.tv_usec / 1000)));
                        }
                        else if (n < 0 && errno == EINTR)
                            continue;
                        else if (n < 0)
                        {
                            saved_err = errno;
                            ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                            errmsg("Write error to '%s': %s",
                                                   serviceConfig->title,
                                                   strerror(saved_err))));
                        }
                    }
                    while (n < 1);
                }

                /* Interrupted, or writable again: retry the write. */
                continue;
            }

            bytesWritten += n;
        }

        result = true;
        RESTORE_PANIC();
    }
    PG_CATCH();
    {
        RESTORE_PANIC();

        /* Report the error to the server log */
        if (!elog_demote(WARNING))
        {
            elog(LOG,"unable to demote error");
            PG_RE_THROW();
        }

        /* Remember the text only when it fits; otherwise clear the buffer. */
        message = elog_message();
        if (message != NULL && strlen(message) + 1 < sizeof(ClientErrorString))
            strcpy(ClientErrorString, message);
        else
            strcpy(ClientErrorString, "");

        EmitErrorReport();
        FlushErrorState();

        result = false;
    }
    PG_END_TRY();

    return result;
}
/*
 * ServiceClientReceiveResponse
 *
 * Used by clients to receive a response from a service.  responseLen is
 * expected (asserted) to equal the responseLen declared in the service
 * configuration.  The read itself, including the timeout handling, is done
 * by ServiceClientRead(); its result is returned unchanged.
 */
bool
ServiceClientReceiveResponse(ServiceClient *serviceClient, void* response, int responseLen, struct timeval *timeout)
{
    Assert(serviceClient != NULL);
    Assert(serviceClient->serviceConfig != NULL);
    Assert(response != NULL);
    Assert(responseLen == serviceClient->serviceConfig->responseLen);

    return ServiceClientRead(serviceClient, response, responseLen, timeout);
}
/*
 * ServiceClientRead
 *
 * Read exactly responseLen bytes from the client's non-blocking socket into
 * 'response'.  When no data is available, wait with select() until the
 * socket becomes readable.
 *
 * timeout - maximum wait; NULL means wait indefinitely.  The caller's value
 *           is copied so that select() cannot modify it.
 *
 * Returns true when the whole response was read.  On failure (no connection,
 * peer closed, read/select error, timeout) the error is demoted to WARNING
 * and logged, its text is saved for ServiceGetLastClientErrorString(), and
 * false is returned.  If the error cannot be demoted it is re-thrown.
 */
static bool
ServiceClientRead(ServiceClient *serviceClient, void* response, int responseLen, struct timeval *timeout)
{
    ServiceConfig *serviceConfig;
    int         n;
    int         bytesRead = 0;
    int         saved_err;
    char       *message;
    bool        result = false;
    mpp_fd_set  rset;

    /*
     * Use local variable since select modifies
     * the timeout parameter with remaining time.
     */
    struct timeval rundownTimeout = {0,0};
    DECLARE_SAVE_SUPPRESS_PANIC();

    Assert(serviceClient != NULL);
    serviceConfig = serviceClient->serviceConfig;
    Assert(serviceConfig != NULL);
    Assert(response != NULL);

    if (timeout != NULL)
        rundownTimeout = *timeout;

    PG_TRY();
    {
        SUPPRESS_PANIC();

        /* Same guard as the write path: fail clearly when not connected. */
        if (serviceClient->sockfd == -1)
            ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                            errmsg("Lost connection to '%s'",
                                   serviceConfig->title)));

        /*
         * read the response
         */
        while (bytesRead < responseLen)
        {
            n = read(serviceClient->sockfd,
                     ((char *)response) + bytesRead,
                     responseLen - bytesRead);
            saved_err = errno;

            if (n == 0)
            {
                ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                errmsg("Connection to '%s' is closed (%d)",
                                       serviceConfig->title, serviceClient->sockfd)));
            }

            if (n < 0)
            {
                /*
                 * A non-blocking socket may report either EWOULDBLOCK or
                 * EAGAIN when no data is available; treat both alike.
                 */
                bool        wouldBlock = (saved_err == EWOULDBLOCK || saved_err == EAGAIN);

                if (saved_err != EINTR && !wouldBlock)
                {
                    ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                    errmsg("Read error from '%s': %s (%d)",
                                           serviceConfig->title,
                                           strerror(saved_err), serviceClient->sockfd)));
                }

                if (wouldBlock)
                {
                    /*
                     * we shouldn't really get here since we are dealing with
                     * small messages, but once we've read a bit of data we
                     * need to finish out reading till we get the message (or error)
                     */
                    do
                    {
                        MPP_FD_ZERO(&rset);
                        MPP_FD_SET(serviceClient->sockfd, &rset);
                        n = select(serviceClient->sockfd + 1, (fd_set *)&rset, NULL, NULL, (timeout == NULL ? NULL : &rundownTimeout));
                        if (n == 0)
                        {
                            if (timeout != NULL)
                            {
                                /* Report the caller's timeout, not what is left of it. */
                                ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                                errmsg("Read from '%s' timed out after %d.%03d seconds",
                                                       serviceConfig->title,
                                                       (int)timeout->tv_sec,
                                                       (int)timeout->tv_usec / 1000)));
                            }
                        }
                        else if (n < 0 && errno == EINTR)
                            continue;
                        else if (n < 0)
                        {
                            saved_err = errno;
                            ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                            errmsg("Read error from '%s': %s (%d)",
                                                   serviceConfig->title,
                                                   strerror(saved_err), serviceClient->sockfd)));
                        }
                    }
                    while (n < 1);
                }

                /* Interrupted, or readable again: retry the read. */
                continue;
            }

            bytesRead += n;
        }

        result = true;
        RESTORE_PANIC();
    }
    PG_CATCH();
    {
        RESTORE_PANIC();

        /* Report the error to the server log */
        if (!elog_demote(WARNING))
        {
            elog(LOG,"unable to demote error");
            PG_RE_THROW();
        }

        /* Remember the text only when it fits; otherwise clear the buffer. */
        message = elog_message();
        if (message != NULL && strlen(message) + 1 < sizeof(ClientErrorString))
            strcpy(ClientErrorString, message);
        else
            strcpy(ClientErrorString, "");

        EmitErrorReport();
        FlushErrorState();

        result = false;
    }
    PG_END_TRY();

    return result;
}
/*
 * ServiceClientPollResponse
 *
 * Used by clients to check, without waiting, whether a response from the
 * service has arrived.  responseLen is expected (asserted) to equal the
 * responseLen declared in the service configuration.  The work is done by
 * ServiceClientPollRead(); its result is returned unchanged and
 * *pollResponseReceived reports whether a response was read.
 */
bool
ServiceClientPollResponse(ServiceClient *serviceClient, void* response, int responseLen, bool *pollResponseReceived)
{
    Assert(serviceClient != NULL);
    Assert(serviceClient->serviceConfig != NULL);
    Assert(response != NULL);
    Assert(responseLen == serviceClient->serviceConfig->responseLen);

    return ServiceClientPollRead(serviceClient, response, responseLen, pollResponseReceived);
}
/*
 * ServiceClientPollRead
 *
 * Make a single non-waiting attempt to read a complete response of
 * responseLen bytes from the client's socket.
 *
 * On return:
 *   true,  *pollResponseReceived == true   - a full response was read.
 *   true,  *pollResponseReceived == false  - nothing is available yet.
 *   false, *pollResponseReceived == false  - the peer closed the connection,
 *          the read failed, or fewer than responseLen bytes arrived.  The
 *          error is demoted to WARNING and logged and its text is saved for
 *          ServiceGetLastClientErrorString().
 *
 * If the error cannot be demoted it is re-thrown to the caller.
 */
static bool
ServiceClientPollRead(ServiceClient *serviceClient, void* response, int responseLen, bool *pollResponseReceived)
{
    ServiceConfig *serviceConfig;
    int         n;
    int         saved_err;
    char       *message;
    bool        result = false;
    DECLARE_SAVE_SUPPRESS_PANIC();

    Assert(serviceClient != NULL);
    serviceConfig = serviceClient->serviceConfig;
    Assert(serviceConfig != NULL);
    Assert(response != NULL);
    Assert(pollResponseReceived != NULL);

    /* Give the out-parameter a defined value on every path, errors included. */
    *pollResponseReceived = false;

    PG_TRY();
    {
        SUPPRESS_PANIC();

        /*
         * Attempt to read the response
         */
        while (true)
        {
            n = read(serviceClient->sockfd,
                     ((char *)response),
                     responseLen);
            saved_err = errno;

            if (n == 0)
            {
                ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                errmsg("Connection to '%s' is closed",
                                       serviceConfig->title)));
            }
            else if (n < 0)
            {
                /* Nothing to read yet: report "no response" without error. */
                if (saved_err == EWOULDBLOCK || saved_err == EAGAIN)
                {
                    *pollResponseReceived = false;
                    break;
                }

                if (saved_err == EINTR)
                    continue;

                ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                errmsg("Read error from '%s': %s",
                                       serviceConfig->title,
                                       strerror(saved_err))));
            }

            /*
             * A short read is treated as an error.  ereport(ERROR) does not
             * return, so control never continues past it; in particular we
             * must not "return" from inside PG_TRY.
             */
            if (n != responseLen)
            {
                ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                errmsg("Expecting message length %d and actual read length was %d from '%s'",
                                       responseLen, n,
                                       serviceConfig->title)));
            }

            *pollResponseReceived = true;
            break;
        }

        result = true;
        RESTORE_PANIC();
    }
    PG_CATCH();
    {
        RESTORE_PANIC();

        /* Report the error to the server log */
        if (!elog_demote(WARNING))
        {
            elog(LOG,"unable to demote error");
            PG_RE_THROW();
        }

        /* Remember the text only when it fits; otherwise clear the buffer. */
        message = elog_message();
        if (message != NULL && strlen(message) + 1 < sizeof(ClientErrorString))
            strcpy(ClientErrorString, message);
        else
            strcpy(ClientErrorString, "");

        EmitErrorReport();
        FlushErrorState();

        result = false;
    }
    PG_END_TRY();

    return result;
}
#ifdef EXEC_BACKEND
/*
 * ServiceForkExec()
 *
 * Format up the arglist for the service process, then fork and exec.
 *
 * The argument vector is "postgres", "--fork<forkTitle>", and one empty
 * slot that postmaster_forkexec() fills in.  Returns whatever
 * postmaster_forkexec() returns, or -1 (after logging) when the fork
 * parameter does not fit in its buffer.
 */
pid_t
ServiceForkExec(ServiceConfig *serviceConfig)
{
#define MAX_FORK_PARAMETER_LEN 25
    char       *av[10];
    int         ac = 0;
    char        forkParameter[MAX_FORK_PARAMETER_LEN];
    int         len;

    /*
     * Bounded formatting: an over-long forkTitle must not overrun the
     * buffer, and the check has to work in non-assert builds as well.
     */
    len = snprintf(forkParameter, sizeof(forkParameter), "--fork%s", serviceConfig->forkTitle);
    if (len <= 0 || len >= MAX_FORK_PARAMETER_LEN)
    {
        ereport(LOG,
                (errmsg("fork parameter for service does not fit in %d bytes",
                        MAX_FORK_PARAMETER_LEN)));
        return -1;
    }

    av[ac++] = "postgres";
    av[ac++] = forkParameter;
    av[ac++] = NULL;            /* filled in by postmaster_forkexec */
    av[ac] = NULL;

    Assert(ac < lengthof(av));

    return postmaster_forkexec(ac, av);
}
#endif /* EXEC_BACKEND */
/*
 * ServiceQuickDie() occurs when signalled SIGQUIT by the postmaster.
 *
 * Some backend has bought the farm, so we need to stop what we're doing
 * and exit.  Signals are blocked first, a LOG message is written, and the
 * rest is delegated to quickdie().
 */
static void
ServiceQuickDie(SIGNAL_ARGS)
{
    PG_SETMASK(&BlockSig);

    ereport(LOG,
            (errmsg("terminating immediately")));

    /* DO ANY SPECIAL SERVICE QUICKDIE HANDLING HERE */

    /*
     * NOTE: see MPP-9518, MPP-9396, we need to make sure the atexit()
     * hooks get cleaned up before calling exit(). quickdie() knows how
     * to do that.
     */
    quickdie(PASS_SIGNAL_ARGS);

    /* not reached */
}
/*
 * ServiceDie() is installed for SIGTERM, the shutdown signal from the
 * postmaster.  It writes a LOG message and hands the signal on to die().
 */
static void
ServiceDie(SIGNAL_ARGS)
{
    ereport(LOG,
            (errmsg("terminating")));

    die(postgres_signal_arg);

    /* not reached */
}
/*
 * HandleCrash() is installed for SIGBUS, SIGILL and SIGSEGV (where defined).
 * It forwards to the standard crash handler, identifying this process as
 * "a service process".
 */
static void
HandleCrash(SIGNAL_ARGS)
{
    StandardHandlerForSigillSigsegvSigbus_OnMainThread("a service process",
                                                       PASS_SIGNAL_ARGS);
}
/*
 * ServiceMain
 *
 * Common service main: the entry point executed by a service process.
 *
 * Sequence: mark the process as a postmaster child, drop the postmaster's
 * on-exit callbacks, set the ps title, run the optional early-init hook,
 * install signal handlers, run BaseInit() and the optional Postgres-init
 * hook, arm the error-recovery jump point, create the listener socket, run
 * the optional ServiceInit hook with the chosen port, and enter the listen
 * loop.  The process leaves through proc_exit().
 */
void
ServiceMain(ServiceCtrl *serviceCtrl)
{
    ServiceConfig *config;
    sigjmp_buf  errorJmpBuf;

    Assert(serviceCtrl != NULL);
    config = (ServiceConfig*)serviceCtrl->serviceConfig;
    Assert(config != NULL);

    IsUnderPostmaster = true;

    /* reset MyProcPid */
    MyProcPid = getpid();

    /* Lose the postmaster's on-exit routines */
    on_exit_reset();

    /* Identify myself via ps */
    init_ps_display(config->psTitle, "", "", "");

    /* Without an early-init hook, just enter InitProcessing mode. */
    if (config->ServiceEarlyInit == NULL)
        SetProcessingMode(InitProcessing);
    else
        config->ServiceEarlyInit();

    /*
     * Set up signal handlers, following the pattern of a regular backend
     * (see the equivalent code in tcop/postgres.c).
     *
     * SIGHUP and SIGPIPE are ignored, SIGINT goes to the statement-cancel
     * handler, SIGTERM and SIGQUIT go to the service's own handlers, and
     * SIGUSR2 goes to the shutdown-request handler supplied by the
     * configuration.
     */
    pqsignal(SIGHUP, SIG_IGN);
    pqsignal(SIGINT, StatementCancelHandler);
    pqsignal(SIGTERM, ServiceDie);
    pqsignal(SIGQUIT, ServiceQuickDie);
    pqsignal(SIGALRM, handle_sig_alarm);
    pqsignal(SIGPIPE, SIG_IGN);
    pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    /* We don't listen for async notifies */
    pqsignal(SIGUSR2, config->ServiceRequestShutdown);
    pqsignal(SIGFPE, FloatExceptionHandler);
    pqsignal(SIGCHLD, SIG_DFL);

    /* Crash signals, where the platform defines them. */
#ifdef SIGBUS
    pqsignal(SIGBUS, HandleCrash);
#endif
#ifdef SIGILL
    pqsignal(SIGILL, HandleCrash);
#endif
#ifdef SIGSEGV
    pqsignal(SIGSEGV, HandleCrash);
#endif

    /* Early initialization */
    BaseInit();

    if (config->ServicePostgresInit != NULL)
        config->ServicePostgresInit();

    SetProcessingMode(NormalProcessing);

    /*
     * If an exception is encountered, processing resumes here.
     *
     * See notes in postgres.c about the design of this coding.
     */
    if (sigsetjmp(errorJmpBuf, 1) != 0)
    {
        /* Prevents interrupts while cleaning up */
        HOLD_INTERRUPTS();

        /* Report the error to the server log */
        EmitErrorReport();

        /* Nothing to recover: an error ends the service process. */
        proc_exit(0);
    }

    /* We can now handle ereport(ERROR) */
    PG_exception_stack = &errorJmpBuf;

    PG_SETMASK(&UnBlockSig);

    /* Create the listener and remember the port the system picked. */
    serviceCtrl->listenerPort = ServiceListenerSetup(serviceCtrl);

    if (config->ServiceInit != NULL)
        config->ServiceInit(serviceCtrl->listenerPort);

    /* listen loop */
    ServiceListenLoop(serviceCtrl);

    proc_exit(0);
}
/*
 * ServiceListenLoop
 *
 * Main loop of a service process.  Waits with select() (one-second timeout)
 * on the listener socket and on every accepted client socket, accepts new
 * connections, and hands readable sockets to ServiceProcessRequest().  A
 * socket whose request processing returns false is shut down and closed.
 *
 * Each iteration also checks for interrupts, for a requested shutdown
 * (which ends the loop after calling the optional ServiceShutdown hook),
 * and for postmaster death.  The function does not return: it ends in
 * proc_exit().
 */
static void
ServiceListenLoop(ServiceCtrl *serviceCtrl)
{
    ServiceConfig *serviceConfig = (ServiceConfig*)serviceCtrl->serviceConfig;
    uint8      *inputBuff;
    int         n;
    int         highsock;       /* highest descriptor placed in rset so far */
    int         newsockfd;
    int         saved_err;
    mpp_fd_set  rset;           /* master set of descriptors to watch */
    mpp_fd_set  rrset;          /* per-iteration copy handed to select() */
    struct sockaddr_storage addr;   /* listener may be IPv4 or IPv6 */
    socklen_t   addrlen;
    List       *connectedSockets = NIL;
    ListCell   *cell;

    Assert(TopMemoryContext != NULL);
    MemoryContextSwitchTo(TopMemoryContext);
    Assert(CurrentMemoryContext == TopMemoryContext);

    /*
     * Setup scratch buffer.
     */
    inputBuff = palloc(serviceConfig->requestLen);

    MPP_FD_ZERO(&rset);
    MPP_FD_SET(serviceCtrl->listenerFd, &rset);
    highsock = serviceCtrl->listenerFd;

    /*
     * We handle many incoming sockets.  Accepted sockets are switched to
     * non-blocking mode; reads that would block are bounded by the client
     * timeout inside ServiceProcessRequest().
     */
    while (true)
    {
        /*
         * Use local variable since select modifies
         * the timeout parameter with remaining time.
         */
        struct timeval shutdownTimeout = {1,0};     /* 1 second. */

        CHECK_FOR_INTERRUPTS();

        if (serviceConfig->ServiceShutdownRequested())
        {
            if (serviceConfig->ServiceShutdown != NULL)
            {
                serviceConfig->ServiceShutdown();
            }
            break;
        }

        /* no need to live on if postmaster has died */
        if (!PostmasterIsAlive(true))
        {
            if (serviceConfig->ServicePostmasterDied != NULL)
            {
                serviceConfig->ServicePostmasterDied();
            }
            else
            {
                ereport(LOG,
                        (errmsg("exiting because postmaster has died")));
                proc_exit(1);
            }
        }

        memcpy(&rrset, &rset, sizeof(mpp_fd_set));

        n = select(highsock + 1, (fd_set *)&rrset, NULL, NULL, &shutdownTimeout);
        if (n == 0 || (n < 0 && errno == EINTR))
        {
            /* intr or timeout: go around and re-check shutdown conditions */
            continue;
        }

        if (n < 0)
        {
            /*
             * this may be a little severe, but if we error on select()
             * we'll just go ahead and blow up.  This will result in the
             * postmaster re-spawning a new process.
             */
            saved_err = errno;
            ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                            errmsg("'%s': error during select() call (error:%d).",
                                   serviceConfig->title, saved_err)));
        }

        /* is it someone tickling our listener port? */
        if (MPP_FD_ISSET(serviceCtrl->listenerFd, &rrset))
        {
            addrlen = sizeof(addr);
            newsockfd = accept(serviceCtrl->listenerFd,
                               (struct sockaddr *) &addr,
                               &addrlen);
            if (newsockfd < 0)
            {
                /*
                 * Nothing was accepted, so there is no descriptor to
                 * register, read from, or close.  Report and carry on with
                 * the established connections.
                 */
                saved_err = errno;
                ereport(NOTICE, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                 errmsg("'%s': error from client connection: %s",
                                        serviceConfig->title,
                                        strerror(saved_err))));
            }
            else
            {
                /* Best effort: a failure here is reported but not fatal. */
                if (!pg_set_noblock(newsockfd))
                {
                    saved_err = errno;
                    ereport(NOTICE, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                     errmsg("'%s': could not set outbound socket to non-blocking mode: %s",
                                            serviceConfig->title,
                                            strerror(saved_err))));
                }

                if (newsockfd > highsock)
                    highsock = newsockfd;

                MPP_FD_SET(newsockfd, &rset);

                /*
                 * Read the first message on the new connection.
                 */
                /* UNDONE: temporarily turn off new connection flag... */
                if (!ServiceProcessRequest(serviceCtrl, newsockfd, inputBuff, false))
                {
                    /* close it down */
                    MPP_FD_CLR(newsockfd, &rset);
                    shutdown(newsockfd, SHUT_WR);
                    close(newsockfd);
                }
                else
                {
                    connectedSockets = lappend_int(connectedSockets, newsockfd);
                }
            }
        }

        /* loop through all of our established sockets */
        cell = list_head(connectedSockets);
        while (cell != NULL)
        {
            int         fd = lfirst_int(cell);

            /* get the next cell ready before we delete */
            cell = lnext(cell);

            if (MPP_FD_ISSET(fd, &rrset))
            {
                if (!ServiceProcessRequest(serviceCtrl, fd, inputBuff, false))
                {
                    /* close it down */
                    MPP_FD_CLR(fd, &rset);
                    connectedSockets = list_delete_int(connectedSockets, fd);
                    shutdown(fd, SHUT_WR);
                    close(fd);
                }
            }
        }
    }

    ereport(LOG,
            (errmsg("normal shutdown")));

    proc_exit(0);
}
/*
 * ServiceProcessRequest
 *
 * Used by ServiceListenLoop to process an incoming request.
 *
 * Reads one complete message from sockfd and dispatches it.  With
 * newConnection set, a ServiceConnectionRequestMsg is read and passed to
 * ServiceNewConnectionMsg(); otherwise serviceConfig->requestLen bytes are
 * read into inputBuff and passed to the service's ServiceRequest callback.
 *
 * Returns the handler's result.  Returns false, without calling a handler,
 * when the peer closed the connection, a read or select() error occurred,
 * the read timed out, or a shutdown was requested while waiting.  The
 * caller closes the socket on false.
 */
static bool
ServiceProcessRequest(ServiceCtrl *serviceCtrl, int sockfd, uint8 *inputBuff, bool newConnection)
{
    ServiceConfig *serviceConfig = (ServiceConfig*)serviceCtrl->serviceConfig;
    ServiceConnectionRequestMsg newConnectionRequestMsg;
    int         saved_err;
    mpp_fd_set  rset;

    /*
     * Use local variable since select modifies
     * the timeout parameter with remaining time.
     */
    struct timeval rundownTimeout;
    int         n;
    int         bytesRead = 0;
    uint8      *request = NULL;
    int         reqlen;
    bool        successful;

    ServiceGetClientTimeout(serviceConfig, &rundownTimeout);

    /* Pick the destination buffer and length for this kind of message. */
    if (newConnection)
    {
        request = (uint8*)&newConnectionRequestMsg;
        reqlen = sizeof(newConnectionRequestMsg);
    }
    else
    {
        request = inputBuff;
        reqlen = serviceConfig->requestLen;
    }

    /*
     * Read in the incoming request message.
     */
    while (bytesRead < reqlen)
    {
        CHECK_FOR_INTERRUPTS();

        n = read(sockfd, request + bytesRead, reqlen - bytesRead);
        saved_err = errno;

        if (n == 0)
        {
            /* The client closed its end of the socket. */
            return false;
        }

        if (n < 0)
        {
            /*
             * A non-blocking socket may report either EWOULDBLOCK or EAGAIN
             * when no data is available; treat both alike.
             */
            bool        wouldBlock = (saved_err == EWOULDBLOCK || saved_err == EAGAIN);

            if (saved_err != EINTR && !wouldBlock)
            {
                ereport(NOTICE, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                 errmsg("'%s': read error from backend process: %s",
                                        serviceConfig->title,
                                        strerror(saved_err))));
                return false;
            }

            if (wouldBlock)
            {
                /*
                 * we shouldn't really get here since we are dealing with
                 * small messages, but once we've read a bit of data we
                 * need to finish out reading till we get the message (or error)
                 */
                do
                {
                    CHECK_FOR_INTERRUPTS();

                    if (serviceConfig->ServiceShutdownRequested())
                        return false;

                    MPP_FD_ZERO(&rset);
                    MPP_FD_SET(sockfd, &rset);
                    n = select(sockfd + 1, (fd_set *)&rset, NULL, NULL, &rundownTimeout);
                    if (n == 0)
                    {
                        /* Report the full configured timeout, not what is left. */
                        struct timeval wholeTimeout;

                        ServiceGetClientTimeout(serviceConfig, &wholeTimeout);
                        ereport(NOTICE, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                         errmsg("'%s': read from backend process timed out after %d.%03d seconds",
                                                serviceConfig->title,
                                                (int)wholeTimeout.tv_sec,
                                                (int)wholeTimeout.tv_usec / 1000)));
                        return false;
                    }

                    if (n < 0 && errno == EINTR)
                        continue;
                    else if (n < 0)
                    {
                        saved_err = errno;
                        ereport(NOTICE, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                         errmsg("'%s': read error from backend process: %s",
                                                serviceConfig->title,
                                                strerror(saved_err))));
                        return false;
                    }
                }
                while (n < 1);
            }

            /* Interrupted, or readable again: retry the read. */
            continue;
        }

        bytesRead += n;
    }

    if (newConnection)
        successful = ServiceNewConnectionMsg(serviceCtrl, sockfd, &newConnectionRequestMsg);
    else
        successful = serviceConfig->ServiceRequest(serviceCtrl, sockfd, request);

    return successful;
}
/*
 * ServiceProcessRespond
 *
 * Write exactly responseLen bytes of 'response' to sockfd.  When the socket
 * cannot accept data right away, wait with select() using the service's
 * client timeout until it becomes writable, then retry.
 *
 * Returns true when the whole response was written.  Returns false, after
 * reporting a NOTICE, when the connection is closed, a write or select()
 * error occurs, or the wait times out.
 */
bool
ServiceProcessRespond(ServiceCtrl *serviceCtrl, int sockfd, uint8 *response, int responseLen)
{
    ServiceConfig *serviceConfig = (ServiceConfig*)serviceCtrl->serviceConfig;
    int         n;
    int         saved_err;
    mpp_fd_set  wset;
    int         bytesWritten = 0;

    /*
     * Use local variable since select modifies
     * the timeout parameter with remaining time.
     */
    struct timeval rundownTimeout;

    ServiceGetClientTimeout(serviceConfig, &rundownTimeout);

    /*
     * Write the response
     */
    while (bytesWritten < responseLen)
    {
        CHECK_FOR_INTERRUPTS();

        n = write(sockfd,
                  response + bytesWritten,
                  responseLen - bytesWritten);
        saved_err = errno;

        if (n == 0)
        {
            ereport(NOTICE, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                             errmsg("'%s': connection to backend process is closed",
                                    serviceConfig->title)));
            return false;
        }

        if (n < 0)
        {
            /*
             * A non-blocking socket may report either EWOULDBLOCK or EAGAIN
             * when it cannot make progress; treat both alike.
             */
            bool        wouldBlock = (saved_err == EWOULDBLOCK || saved_err == EAGAIN);

            if (saved_err != EINTR && !wouldBlock)
            {
                ereport(NOTICE, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                 errmsg("'%s': write error to backend process: %s",
                                        serviceConfig->title,
                                        strerror(saved_err))));
                return false;
            }

            if (wouldBlock)
            {
                /*
                 * We shouldn't really get here since we are dealing with
                 * small messages, but once part of the response has been
                 * written we must finish it (or fail).  Wait until the
                 * socket is writable or the timeout runs out.
                 */
                do
                {
                    CHECK_FOR_INTERRUPTS();

                    MPP_FD_ZERO(&wset);
                    MPP_FD_SET(sockfd, &wset);
                    n = select(sockfd + 1, NULL, (fd_set *)&wset, NULL, &rundownTimeout);
                    if (n == 0)
                    {
                        /* Report the full configured timeout, not what is left. */
                        struct timeval wholeTimeout;

                        ServiceGetClientTimeout(serviceConfig, &wholeTimeout);
                        ereport(NOTICE, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                         errmsg("'%s': write to backend process timed out after %d.%03d seconds",
                                                serviceConfig->title,
                                                (int)wholeTimeout.tv_sec,
                                                (int)wholeTimeout.tv_usec / 1000)));
                        return false;
                    }

                    if (n < 0 && errno == EINTR)
                        continue;
                    else if (n < 0)
                    {
                        saved_err = errno;
                        ereport(NOTICE, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
                                         errmsg("'%s': write error to backend process: %s",
                                                serviceConfig->title,
                                                strerror(saved_err))));
                        return false;
                    }
                }
                while (n < 1);
            }

            /* Interrupted, or writable again: retry the write. */
            continue;
        }

        bytesWritten += n;
    }

    return true;
}
/*
 * ServiceNewConnectionMsg
 *
 * Answer a connection-handshake request by writing a
 * ServiceConnectionResponseMsg back on sockfd.  Returns the result of
 * ServiceProcessRespond().
 *
 * UNDONE: the fields of the request are not checked yet, so every request
 * is answered with ok = true.
 * NOTE(review): confirm that "accept" is the intended reply while the
 * validation is missing.
 */
static bool
ServiceNewConnectionMsg(ServiceCtrl *serviceCtrl, int sockfd, ServiceConnectionRequestMsg *newConnectionRequestMsg)
{
    ServiceConnectionResponseMsg connectionResponseMsg;

    /*
     * Fully initialize the message before it goes on the wire: zero it
     * (including any padding) so no stack contents are sent, then set the
     * status.
     */
    memset(&connectionResponseMsg, 0, sizeof(connectionResponseMsg));
    connectionResponseMsg.ok = true;

    return ServiceProcessRespond(serviceCtrl, sockfd,
                                 (uint8*)&connectionResponseMsg,
                                 sizeof(connectionResponseMsg));
}
/*
* Sets up a listener on a system-picked port. Returns the port that the
* system port.
*/
static int
ServiceListenerSetup(ServiceCtrl *serviceCtrl)
{
ServiceConfig *serviceConfig = (ServiceConfig*)serviceCtrl->serviceConfig;
int listenerPort;
struct sockaddr_storage addr;
socklen_t addrlen;
struct addrinfo hints;
struct addrinfo *addrs, *rp;
int s;
char service[32];
/*
* we let the system pick the TCP port here so we don't have to
* manage port resources ourselves.
*/
snprintf(service,32,"%d",0);
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* TCP socket */
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0; /* Any protocol */
s = getaddrinfo(NULL, service, &hints, &addrs);
if (s != 0)
elog(ERROR,"getaddrinfo says %s",gai_strerror(s));
/*
* getaddrinfo() returns a list of address structures,
* one for each valid address and family we can use.
*
* Try each address until we successfully bind.
* If socket (or bind) fails, we (close the socket
* and) try the next address. This can happen if
* the system supports IPv6, but IPv6 is disabled from
* working, or if it supports IPv6 and IPv4 is disabled.
*/
/*
* If there is both an AF_INET6 and an AF_INET choice,
* we prefer the AF_INET6, because on UNIX it can receive either
* protocol, whereas AF_INET can only get IPv4. Otherwise we'd need
* to bind two sockets, one for each protocol.
*
* Why not just use AF_INET6 in the hints? That works perfect
* if we know this machine supports IPv6 and IPv6 is enabled,
* but we don't know that.
*/
#ifdef HAVE_IPV6
if (addrs->ai_family == AF_INET && addrs->ai_next != NULL && addrs->ai_next->ai_family == AF_INET6)
{
/*
* We got both an INET and INET6 possibility, but we want to prefer the INET6 one if it works.
* Reverse the order we got from getaddrinfo so that we try things in our preferred order.
* If we got more possibilities (other AFs??), I don't think we care about them, so don't
* worry if the list is more that two, we just rearrange the first two.
*/
struct addrinfo *temp = addrs->ai_next; /* second node */
addrs->ai_next = addrs->ai_next->ai_next; /* point old first node to third node if any */
temp->ai_next = addrs; /* point second node to first */
addrs = temp; /* start the list with the old second node */
}
#endif
for (rp = addrs; rp != NULL; rp = rp->ai_next)
{
/*
* getaddrinfo gives us all the parameters for the socket() call
* as well as the parameters for the bind() call.
*/
serviceCtrl->listenerFd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (serviceCtrl->listenerFd == -1)
continue;
if (bind(serviceCtrl->listenerFd, rp->ai_addr, rp->ai_addrlen) == 0)
break; /* Success */
close(serviceCtrl->listenerFd);
}
if (rp == NULL)
{ /* No address succeeded */
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("SeqServer Error: Could not setup listener socket: %m"),
errdetail("error during bind() call (error:%d).", errno)));
}
if (listen(serviceCtrl->listenerFd, LISTEN_BACKLOG) < 0)
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("SeqServer Error: Could not setup listener socket: %s", strerror(errno)),
errdetail("error during listen() call (error:%d).", errno)));
}
addrlen = sizeof(addr);
if (getsockname(serviceCtrl->listenerFd, (struct sockaddr *) & addr, &addrlen) < 0)
{
/*
* TODO: would be nice to read the errno and try and provide
* more useful info as to why this happened.
*/
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("'%s' could not setup listener socket: %s",
serviceConfig->title,
strerror(errno)),
errdetail("error during getsockname() call (error:%d).", errno)));
}
/* display which port was chosen by the system. */
if (addr.ss_family == AF_INET6)
listenerPort = ntohs(((struct sockaddr_in6*)&addr)->sin6_port);
else
listenerPort = ntohs(((struct sockaddr_in*)&addr)->sin_port);
elog(DEBUG5, "'%s' listener on port: %d", serviceConfig->title, listenerPort);
return listenerPort;
}