blob: ed7f078d7c0a1ade510d1584ae90c461b85080f5 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* seqserver.c
* Process under QD postmaster used for doling out sequence values to
* QEs.
*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <time.h>
#include <fcntl.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <netinet/in.h>
#include "miscadmin.h"
#include "libpq/pqsignal.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbselect.h"
#include "commands/sequence.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "postmaster/seqserver.h"
#include "catalog/catalog.h"
#include "catalog/pg_database.h"
#include "catalog/pg_tablespace.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h" /* PostmasterIsAlive */
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/sinvaladt.h"
#include "tcop/tcopprot.h"
#include "utils/ps_status.h"
#include "storage/backendid.h"
#include "utils/syscache.h"
#include "tcop/tcopprot.h"
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>
#include <ws2tcpip.h>
#define SHUT_RDWR SD_BOTH
#define SHUT_RD SD_RECEIVE
#define SHUT_WR SD_SEND
#endif
/*
* backlog for listen() call:
*/
#define LISTEN_BACKLOG 64
#define getInt16(pNetData) ntohs(*((int16 *)(pNetData)))
#define getInt32(pNetData) ntohl(*((int32 *)(pNetData)))
#define getOid(pNetData)
/*=========================================================================
* FUNCTIONS PROTOTYPES
*/
#ifdef EXEC_BACKEND
static pid_t seqserver_forkexec(void);
#endif
NON_EXEC_STATIC void SeqServerMain(int argc, char *argv[]);
static void SeqServerLoop(void);
static bool processSequenceRequest( int sockfd );
static int listenerSetup(void);
extern bool FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace);
/*=========================================================================
* GLOBAL STATE VARIABLES
*/
/* Socket file descriptor for the listener. */
static int listenerFd;
/* our timeout value for select() and other socket operations. */
static struct timeval tval;
static bool am_seqserver = false;
uint8 *inputBuff;
/*=========================================================================
* VISIBLE FUNCTIONS
*/
int SeqServerShmemSize(void)
{
/* just our one int4 */
return sizeof(SeqServerControlBlock);
}
void SeqServerShmemInit(void)
{
bool found;
/* Create or attach to the SharedSnapshot shared structure */
seqServerCtl = (SeqServerControlBlock *) ShmemInitStruct("Sequence Server", SeqServerShmemSize(), &found);
}
/*
* Main entry point for seqserver controller process.
*
* This code is heavily based on pgarch.c, q.v.
*/
int
seqserver_start(void)
{
pid_t SeqServerPID;
#ifdef EXEC_BACKEND
switch ((SeqServerPID = seqserver_forkexec()))
#else
switch ((SeqServerPID = fork_process()))
#endif
{
case -1:
ereport(LOG,
(errmsg("could not fork seqserver process: %m")));
return 0;
#ifndef EXEC_BACKEND
case 0:
/* in postmaster child ... */
/* Close the postmaster's sockets */
ClosePostmasterPorts(false);
SeqServerMain(0, NULL);
break;
#endif
default:
return (int) SeqServerPID;
}
/* shouldn't get here */
return 0;
}
/*
* Used by clients to sequence server to send a Sequence Request
*/
void
sendSequenceRequest(int sockfd,
Relation seqrel,
int session_id,
int64 *plast,
int64 *pcached,
int64 *pincrement,
bool *poverflow)
{
int n;
int bytesWritten = 0;
int bytesRead = 0;
int saved_err;
mpp_fd_set rset, wset;
Oid dbid = seqrel->rd_node.dbNode;
Oid tablespaceid = seqrel->rd_node.spcNode;
Oid seq_oid = seqrel->rd_id;
bool isTemp = seqrel->rd_istemp;
/* request message */
NextValRequest request;
int reqlen = sizeof(request);
/* response message */
NextValResponse response;
int resplen = sizeof(response);
if (sockfd == -1)
elog(ERROR, "Lost Connection to seqserver");
/*
* send the sequence request to the sequence server
*/
memset(&request, 0, sizeof(request));
request.startCookie = SEQ_SERVER_REQUEST_START;
request.dbid = htonl(dbid);
request.tablespaceid = htonl(tablespaceid);
request.seq_oid = htonl(seq_oid);
request.isTemp = htonl(isTemp);
request.session_id = htonl(session_id);
request.endCookie = SEQ_SERVER_REQUEST_END;
/*
* write the request
*/
while (bytesWritten < reqlen)
{
CHECK_FOR_INTERRUPTS();
n = write(sockfd, ((char *)&request)+bytesWritten, reqlen - bytesWritten);
saved_err = errno;
if (n == 0)
{
elog(ERROR, "Error: seqserver socket closed.");
return;
}
if (n < 0)
{
if (saved_err != EINTR && saved_err != EWOULDBLOCK)
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Error: Could not write() message to seqserver: %s",
strerror(errno)),
errdetail("error during write() call (error:%d) to seqserver on sockfd: %d ",
errno, sockfd)));
}
if (saved_err == EWOULDBLOCK)
{
/* we shouldn't really get here since we are dealing with
* small messages, but once we've read a bit of data we
* need to finish out reading till we get the message (or error)
*/
do
{
struct timeval timeout = tval;
CHECK_FOR_INTERRUPTS();
MPP_FD_ZERO(&wset);
MPP_FD_SET(sockfd, &wset);
n = select(sockfd + 1, NULL, (fd_set *)&wset, NULL, &timeout);
if (n == 0 || (n < 0 && errno == EINTR))
continue;
else if (n < 0)
{
saved_err = errno;
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Error: Could not write() message to seqserver: %s",
strerror(errno)),
errdetail("error during write() call (error:%d) to seqserver on sockfd: %d ",
errno, sockfd)));
return;
}
}
while (n < 1);
}
/* else saved_err == EINTR */
continue;
}
bytesWritten += n;
}
/*
* read the response
*/
while (bytesRead < resplen)
{
CHECK_FOR_INTERRUPTS();
n = read(sockfd, ((char *)&response)+bytesRead, resplen - bytesRead);
saved_err = errno;
if (n == 0)
{
elog( ERROR, "seqserver socket is closed");
return;
}
if (n < 0)
{
if (saved_err != EINTR && saved_err != EWOULDBLOCK)
{
elog( ERROR, "Error: Could not read() message from seqserver."
"(error:%d). Closing connection.", saved_err);
}
if (saved_err == EWOULDBLOCK)
{
/* we shouldn't really get here since we are dealing with
* small messages, but once we've read a bit of data we
* need to finish out reading till we get the message (or error)
*/
do
{
struct timeval timeout = tval;
CHECK_FOR_INTERRUPTS();
MPP_FD_ZERO(&rset);
MPP_FD_SET(sockfd, &rset);
n = select(sockfd + 1, (fd_set *)&rset, NULL, NULL, &timeout);
if (n == 0 || (n < 0 && errno == EINTR))
continue;
else if (n < 0)
{
saved_err = errno;
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Error: Could not read() message from seqserver"),
errdetail("error during read() call (error:%d) from remote"
"sockfd = %d", errno, sockfd)));
return;
}
}
while (n < 1);
}
/* else saved_err == EINTR */
continue;
}
else
bytesRead += n;
}
if (response.poverflowed)
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("nextval: reached %s value of sequence \"%s\" (" INT64_FORMAT ")",
response.pincrement>0 ? "maximum":"minimum",
RelationGetRelationName(seqrel), response.plast)));
}
*plast = response.plast;
*pcached = response.pcached;
*pincrement = response.pincrement;
*poverflow = response.poverflowed;
}
/*=========================================================================
* HELPER FUNCTIONS
*/
#ifdef EXEC_BACKEND
/*
* seqserver_forkexec()
*
* Format up the arglist for the serqserver process, then fork and exec.
*/
static pid_t
seqserver_forkexec(void)
{
char *av[10];
int ac = 0;
av[ac++] = "postgres";
av[ac++] = "--forkseqserver";
av[ac++] = NULL; /* filled in by postmaster_forkexec */
av[ac] = NULL;
Assert(ac < lengthof(av));
return postmaster_forkexec(ac, av);
}
#endif /* EXEC_BACKEND */
static bool shutdown_requested=false;
static void
RequestShutdown(SIGNAL_ARGS)
{
shutdown_requested = true;
}
static char *knownDatabase = "template1";
/*
* AutoVacMain
*/
NON_EXEC_STATIC void
SeqServerMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
char *fullpath;
IsUnderPostmaster = true;
am_seqserver = true;
/* reset MyProcPid */
MyProcPid = getpid();
/* Lose the postmaster's on-exit routines */
on_exit_reset();
/* Identify myself via ps */
init_ps_display("seqserver process", "", "", "");
SetProcessingMode(InitProcessing);
/*
* Set up signal handlers. We operate on databases much like a regular
* backend, so we use the same signal handling. See equivalent code in
* tcop/postgres.c.
*
* Currently, we don't pay attention to postgresql.conf changes that
* happen during a single daemon iteration, so we can ignore SIGHUP.
*/
pqsignal(SIGHUP, SIG_IGN);
/*
* Presently, SIGINT will lead to autovacuum shutdown, because that's how
* we handle ereport(ERROR). It could be improved however.
*/
pqsignal(SIGINT, StatementCancelHandler);
pqsignal(SIGTERM, die);
pqsignal(SIGQUIT, quickdie); /* we don't do any seq-server specific cleanup, just use the standard. */
pqsignal(SIGALRM, handle_sig_alarm);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, RequestShutdown);
pqsignal(SIGFPE, FloatExceptionHandler);
pqsignal(SIGCHLD, SIG_DFL);
/* Early initialization */
BaseInit();
/* See InitPostgres()... */
InitProcess();
InitBufferPoolBackend();
InitXLOGAccess();
SetProcessingMode(NormalProcessing);
/*
* If an exception is encountered, processing resumes here.
*
* See notes in postgres.c about the design of this coding.
*/
if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{
/* Prevents interrupts while cleaning up */
HOLD_INTERRUPTS();
/* Report the error to the server log */
EmitErrorReport();
/*
* We can now go away. Note that because we'll call InitProcess, a
* callback will be registered to do ProcKill, which will clean up
* necessary state.
*/
proc_exit(0);
}
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
PG_SETMASK(&UnBlockSig);
/* set up a listener port and put it in shmem */
seqServerCtl->seqServerPort = listenerSetup();
/*
* setup scratch buffers
*/
inputBuff = palloc(sizeof(NextValRequest));
/*
* The following additional initialization allows us to call the persistent meta-data modules.
*/
/*
* Create a resource owner to keep track of our resources (currently only
* buffer pins).
*/
CurrentResourceOwner = ResourceOwnerCreate(NULL, "SeqServer");
/*
* In order to access the catalog, we need a database, and a
* tablespace; our access to the heap is going to be slightly
* limited, so we'll just use some defaults.
*/
MyDatabaseId = TemplateDbOid;
MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
if (!FindMyDatabase(knownDatabase, &MyDatabaseId, &MyDatabaseTableSpace))
ereport(FATAL, (errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database 'template1' does not exist")));
fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
SetDatabasePath(fullpath);
/*
* Finish filling in the PGPROC struct, and add it to the ProcArray. (We
* need to know MyDatabaseId before we can do this, since it's entered
* into the PGPROC struct.)
*
* Once I have done this, I am visible to other backends!
*/
InitProcessPhase2();
/*
* Initialize my entry in the shared-invalidation manager's array of
* per-backend data.
*
* Sets up MyBackendId, a unique backend identifier.
*/
MyBackendId = InvalidBackendId;
SharedInvalBackendInit(false);
if (MyBackendId > MaxBackends || MyBackendId <= 0)
elog(FATAL, "bad backend id: %d", MyBackendId);
/*
* bufmgr needs another initialization call too
*/
InitBufferPoolBackend();
/* heap access requires the rel-cache */
RelationCacheInitialize();
InitCatalogCache();
/*
* It's now possible to do real access to the system catalogs.
*
* Load relcache entries for the system catalogs. This must create at
* least the minimum set of "nailed-in" cache entries.
*/
RelationCacheInitializePhase2(); /* main loop */
SeqServerLoop();
ResourceOwnerRelease(CurrentResourceOwner,
RESOURCE_RELEASE_BEFORE_LOCKS,
false, true);
/* One iteration done, go away */
proc_exit(0);
}
static void
SeqServerLoop(void)
{
int n,
highsock = 0,
newsockfd;
mpp_fd_set rset,
rrset;
struct sockaddr_in addr;
socklen_t addrlen;
List *connectedSockets = NIL;
ListCell *cell;
tval.tv_sec = 3;
tval.tv_usec = 500000;
MPP_FD_ZERO(&rset);
MPP_FD_SET(listenerFd, &rset);
highsock = listenerFd + 1;
/* we'll handle many incoming sockets but keep the sockets in blocking
* mode since we are dealing with very small messages.
*/
while(true)
{
struct timeval timeout = tval;
CHECK_FOR_INTERRUPTS();
if (shutdown_requested)
break;
/* no need to live on if postmaster has died */
if (!PostmasterIsAlive(true))
exit(1);
memcpy(&rrset, &rset, sizeof(mpp_fd_set));
n = select(highsock + 1, (fd_set *)&rrset, NULL, NULL, &timeout);
if (n == 0 || (n < 0 && errno == EINTR))
{
/* intr or timeout: Have we been here too long ? */
continue;
}
if (n < 0)
{
/* this may be a little severe, but if we error on select()
* we'll just go ahead and blow up. This will result in the
* postmaster re-spawning a new process.
*/
elog(ERROR,"Error during select() call (error:%d).", errno);
break;
}
elog(DEBUG5, "seqserver select() returned %d ready sockets", n);
/* is it someone tickling our listener port? */
if (MPP_FD_ISSET(listenerFd, &rrset))
{
elog(DEBUG3, "seqserver: accepting an Incoming Connection ");
addrlen = sizeof(addr);
if ((newsockfd = accept(listenerFd, (struct sockaddr *) & addr, &addrlen)) < 0)
{
/*
* TODO: would be nice to read the errno and try and provide
* more useful info as to why this happened.
*/
elog(LOG, "seqserver: error during accept() call (error:%d)", errno);
}
/* make socket non-blocking BEFORE we connect. */
if (!pg_set_noblock(newsockfd))
{
/*
* TODO: would be nice to read the errno and try and provide
* more useful info as to why this happened.
*/
elog(DEBUG3, "seqserver: Could not set outbound socket to non-blocking mode"
"error during fcntl() call (error:%d) for sockfd: %d",
errno, newsockfd);
}
if (newsockfd > highsock)
highsock = newsockfd + 1;
MPP_FD_SET(newsockfd, &rset);
connectedSockets = lappend_int(connectedSockets, newsockfd);
}
/* loop through all of our established sockets */
cell = list_head(connectedSockets);
while (cell != NULL)
{
int fd = lfirst_int(cell);
/* get the next cell ready before we delete */
cell = lnext(cell);
if (MPP_FD_ISSET(fd, &rrset))
{
if (!processSequenceRequest(fd))
{
/* close it down */
MPP_FD_CLR( fd, &rset);
connectedSockets = list_delete_int(connectedSockets, fd);
shutdown(fd, SHUT_WR);
close(fd);
}
}
}
} /* end server loop */
return;
}
/*
* Used by sequenceServer to process incoming Sequence Requests
*/
static bool
processSequenceRequest(int sockfd )
{
NextValRequest nextValRequest;
NextValResponse response;
int saved_err;
mpp_fd_set rset, wset;
int64 plast;
int64 pcached;
int64 pincrement;
bool poverflow = false;
int n;
int bytesRead = 0;
int bytesWritten = 0;
int reqlen = sizeof(NextValRequest);
int outputBuffLength = sizeof(NextValResponse);
bool checkCookie = true;
/* Read in the incoming request message */
while (bytesRead < reqlen)
{
CHECK_FOR_INTERRUPTS();
n = read(sockfd, inputBuff + bytesRead, reqlen - bytesRead);
saved_err = errno;
if (n == 0)
{
elog(DEBUG3, "client socket is closed");
return false;
}
if (n < 0)
{
if (saved_err != EINTR && saved_err != EWOULDBLOCK)
{
elog(DEBUG3, "Error: Could not read() message from client."
"(error:%d). Closing connection.", saved_err);
return false;
}
if (saved_err == EWOULDBLOCK)
{
/* we shouldn't really get here since we are dealing with
* small messages, but once we've read a bit of data we
* need to finish out reading till we get the message (or error)
*/
do
{
struct timeval timeout = tval;
CHECK_FOR_INTERRUPTS();
MPP_FD_ZERO(&rset);
MPP_FD_SET(sockfd, &rset);
n = select(sockfd + 1, (fd_set *)&rset, NULL, NULL, &timeout);
if (n == 0 || (n < 0 && errno == EINTR))
continue;
else if (n < 0)
{
saved_err = errno;
elog(DEBUG3, "error reading incoming message from client. (errno: %d)",
saved_err);
return false;
}
}
while (n < 1);
}
/* else saved_err == EINTR */
continue;
}
bytesRead += n;
/*
* MPP-10189: validate input, if it looks like we got
* something invalid, either slam the connection shut, or
* should we look for a valid embedded message ?
*/
if (checkCookie && bytesRead > sizeof(uint32_t))
{
uint32_t c;
memcpy(&c, inputBuff, sizeof(uint32_t));
if (c != SEQ_SERVER_REQUEST_START)
{
elog(LOG, "bogus start cookie.");
return false; /* this will close the connection */
}
checkCookie = false;
}
}
/*
* MPP-10189: Verify end-cookie
*/
{
uint32_t c;
memcpy(&c, inputBuff + bytesRead - sizeof(uint32_t), sizeof(uint32_t));
if (c != SEQ_SERVER_REQUEST_END)
{
elog(LOG, "bogus end cookie.");
return false; /* this will close the connection */
}
}
/*
* process what we've read
*/
memcpy(&nextValRequest, inputBuff, sizeof(nextValRequest));
nextValRequest.dbid = ntohl(nextValRequest.dbid);
nextValRequest.tablespaceid = ntohl(nextValRequest.tablespaceid);
nextValRequest.seq_oid = ntohl(nextValRequest.seq_oid);
nextValRequest.isTemp = ntohl(nextValRequest.isTemp);
nextValRequest.session_id = ntohl(nextValRequest.session_id);
elog(DEBUG5, "Received nextval request for dbid: %ld tablespaceid: %ld seqoid: "
"%ld isTemp: %s session_id: %ld",
(long int)nextValRequest.dbid,
(long int)nextValRequest.tablespaceid,
(long int)nextValRequest.seq_oid,
nextValRequest.isTemp ? "true" : "false",
(long)nextValRequest.session_id);
/*
* Process request.
*
* MPP-10189: May throw an error, we need to catch it -- just slam
* our connection shut (caller will do that if we return false).
*/
PG_TRY();
{
cdb_sequence_nextval_server(nextValRequest.tablespaceid,
nextValRequest.dbid,
nextValRequest.seq_oid,
nextValRequest.isTemp,
&plast,
&pcached,
&pincrement,
&poverflow);
}
PG_CATCH();
{
if (!elog_demote(LOG))
{
elog(LOG, "unable to demote error");
PG_RE_THROW();
}
return false;
}
PG_END_TRY();
/*
* Create the response
*/
response.plast = plast;
response.pcached = pcached;
response.pincrement = pincrement;
response.poverflowed = poverflow;
/*
* Write the response
*/
while (bytesWritten < outputBuffLength )
{
CHECK_FOR_INTERRUPTS();
n = write(sockfd, ((uint8 *)&response)+bytesWritten, outputBuffLength - bytesWritten);
saved_err = errno;
if (n == 0)
{
elog(DEBUG3, "client socket is closed");
return false;
}
if (n < 0)
{
if (saved_err != EINTR && saved_err != EWOULDBLOCK)
{
elog(DEBUG3, "Error: Could not write() message from client."
"(error:%d). Closing connection.", sockfd);
return false;
}
if (saved_err == EWOULDBLOCK)
{
/*
* we shouldn't really get here since we are dealing with
* small messages, but once we've read a bit of data we
* need to finish out reading till we get the message (or error)
*/
do
{
struct timeval timeout = tval;
CHECK_FOR_INTERRUPTS();
MPP_FD_ZERO(&wset);
MPP_FD_SET(sockfd, &wset);
n = select(sockfd + 1, NULL, (fd_set *)&wset, NULL, &timeout);
if (n == 0 || (n < 0 && errno == EINTR))
continue;
else if (n < 0)
{
saved_err = errno;
elog(DEBUG3, "error writing incoming message from client. (errno: %d)", saved_err);
return false;
}
}
while (n < 1);
}
/* else saved_err == EINTR */
continue;
}
else
bytesWritten += n;
}
return true;
}
/* listenerSetup
*
* Sets up a listener on a system-picked port. Returns the port that the
* system port.
*/
static int
listenerSetup(void)
{
int listenerPort;
struct sockaddr_storage addr;
socklen_t addrlen;
struct addrinfo hints;
struct addrinfo *addrs, *rp;
int s;
char service[32];
/*
* we let the system pick the TCP port here so we don't have to
* manage port resources ourselves.
*/
snprintf(service,32,"%d",0);
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* TCP socket */
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0; /* Any protocol */
s = getaddrinfo(NULL, service, &hints, &addrs);
if (s != 0)
elog(ERROR,"getaddrinfo says %s",gai_strerror(s));
/*
* getaddrinfo() returns a list of address structures,
* one for each valid address and family we can use.
*
* Try each address until we successfully bind.
* If socket (or bind) fails, we (close the socket
* and) try the next address. This can happen if
* the system supports IPv6, but IPv6 is disabled from
* working, or if it supports IPv6 and IPv4 is disabled.
*/
/*
* If there is both an AF_INET6 and an AF_INET choice,
* we prefer the AF_INET6, because on UNIX it can receive either
* protocol, whereas AF_INET can only get IPv4. Otherwise we'd need
* to bind two sockets, one for each protocol.
*
* Why not just use AF_INET6 in the hints? That works perfect
* if we know this machine supports IPv6 and IPv6 is enabled,
* but we don't know that.
*/
#ifdef HAVE_IPV6
if (addrs->ai_family == AF_INET && addrs->ai_next != NULL && addrs->ai_next->ai_family == AF_INET6)
{
/*
* We got both an INET and INET6 possibility, but we want to prefer the INET6 one if it works.
* Reverse the order we got from getaddrinfo so that we try things in our preferred order.
* If we got more possibilities (other AFs??), I don't think we care about them, so don't
* worry if the list is more that two, we just rearrange the first two.
*/
struct addrinfo *temp = addrs->ai_next; /* second node */
addrs->ai_next = addrs->ai_next->ai_next; /* point old first node to third node if any */
temp->ai_next = addrs; /* point second node to first */
addrs = temp; /* start the list with the old second node */
}
#endif
for (rp = addrs; rp != NULL; rp = rp->ai_next)
{
/*
* getaddrinfo gives us all the parameters for the socket() call
* as well as the parameters for the bind() call.
*/
elog(DEBUG1,"seqserver listener socket %d %d %d",rp->ai_family, rp->ai_socktype, rp->ai_protocol);
listenerFd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (listenerFd == -1)
continue;
if (bind(listenerFd, rp->ai_addr, rp->ai_addrlen) == 0)
break; /* Success */
close(listenerFd);
}
if (rp == NULL)
{ /* No address succeeded */
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("SeqServer Error: Could not setup listener socket: %m"),
errdetail("error during bind() call (error:%d).", errno)));
}
if (listen(listenerFd, LISTEN_BACKLOG) < 0)
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("SeqServer Error: Could not setup listener socket: %s", strerror(errno)),
errdetail("error during listen() call (error:%d).", errno)));
}
addrlen = sizeof(addr);
if (getsockname(listenerFd, (struct sockaddr *) & addr, &addrlen) < 0)
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("SeqServer Error: Could not setup listener socket: %s", strerror(errno)),
errdetail("error during getsockname() call (error:%d).", errno)));
}
/* display which port was chosen by the system. */
if (addr.ss_family == AF_INET6)
listenerPort = ntohs(((struct sockaddr_in6*)&addr)->sin6_port);
else
listenerPort = ntohs(((struct sockaddr_in*)&addr)->sin_port);
elog(DEBUG1, "SeqServer listener on port: %d", listenerPort);
return listenerPort;
}