blob: 28592c74fe9c3549c6ef8684450e3fec40ba6bf0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
* ic_tcp.c
* Interconnect code specific to TCP transport.
*
* Reviewers: jzhang, tkordas
*-------------------------------------------------------------------------
*/
#ifdef WIN32
/*
* Need this to get WSAPoll (poll). And it
* has to be set before any header from the Win32 API is loaded.
*/
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#include "postgres.h"
#include <pthread.h>
#include "nodes/execnodes.h" /* Slice, SliceTable */
#include "nodes/pg_list.h"
#include "nodes/print.h"
#include "utils/memutils.h"
#include "utils/hsearch.h"
#include "miscadmin.h"
#include "libpq/libpq-be.h"
#include "libpq/ip.h"
#include "utils/builtins.h"
#include "utils/debugbreak.h"
#include "cdb/cdbselect.h"
#include "cdb/tupchunklist.h"
#include "cdb/ml_ipc.h"
#include "cdb/cdbvars.h"
#include <fcntl.h>
#include <limits.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <netinet/in.h>
/*
* backlog for listen() call: it is important that this be something like a
* good match for the maximum number of QEs. Slow insert performance will
* result if it is too low.
*/
#define MAX_BIND_RETRIES 12
#define CONNECT_RETRY_MS 4000
#define CONNECT_AGGRESSIVERETRY_MS 500
/* listener backlog is calculated at listener-creation time */
int listenerBacklog = 128;
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#include "port.h"
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#define SHUT_RDWR SD_BOTH
#define SHUT_RD SD_RECEIVE
#define SHUT_WR SD_SEND
/* If we have old platform sdk headers, WSAPoll() might not be there */
#ifndef POLLIN
/* Event flag definitions for WSAPoll(). */
#define POLLRDNORM 0x0100
#define POLLRDBAND 0x0200
#define POLLIN (POLLRDNORM | POLLRDBAND)
#define POLLPRI 0x0400
#define POLLWRNORM 0x0010
#define POLLOUT (POLLWRNORM)
#define POLLWRBAND 0x0020
#define POLLERR 0x0001
#define POLLHUP 0x0002
#define POLLNVAL 0x0004
typedef struct pollfd {
SOCKET fd;
SHORT events;
SHORT revents;
} WSAPOLLFD, *PWSAPOLLFD, FAR *LPWSAPOLLFD;
__control_entrypoint(DllExport)
WINSOCK_API_LINKAGE
int
WSAAPI
WSAPoll(
IN OUT LPWSAPOLLFD fdArray,
IN ULONG fds,
IN INT timeout
);
#endif
#define poll WSAPoll
/*
* Postgres normally uses it's own custom select implementation
* on Windows, but they haven't implemented execeptfds, which
* we use here. So, undef this to use the normal Winsock version
* for now
*/
#undef select
#endif
/* our timeout value for select() and other socket operations. */
static struct timeval tval;
static inline MotionConn *
getMotionConn(ChunkTransportStateEntry *pEntry, int iConn)
{
Assert(pEntry);
Assert(pEntry->conns);
Assert(iConn < pEntry->numConns + pEntry->numPrimaryConns);
return pEntry->conns + iConn;
}
static ChunkTransportStateEntry *startOutgoingConnections(ChunkTransportState *transportStates,
Slice *sendSlice,
int *pOutgoingCount);
static void format_fd_set(StringInfo buf, int nfds, mpp_fd_set fds, char* pfx, char *sfx);
static char *format_sockaddr(struct sockaddr *sa, char* buf, int bufsize);
static void setupOutgoingConnection(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn);
static void updateOutgoingConnection(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn, int errnoSave);
static void sendRegisterMessage(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn * conn);
static bool readRegisterMessage(ChunkTransportState *transportStates,
MotionConn *conn);
static MotionConn *acceptIncomingConnection(void);
static void flushInterconnectListenerBacklog(void);
static void waitOnOutbound(ChunkTransportStateEntry *pEntry);
static TupleChunkListItem RecvTupleChunkFromAnyTCP(MotionLayerState *mlStates,
ChunkTransportState *transportStates,
int16 motNodeID,
int16 *srcRoute);
static TupleChunkListItem RecvTupleChunkFromTCP(ChunkTransportState *transportStates,
int16 motNodeID,
int16 srcRoute);
static void SendEosTCP(MotionLayerState *mlStates, ChunkTransportState *transportStates,
int motNodeID, TupleChunkListItem tcItem);
static bool SendChunkTCP(MotionLayerState *mlStates, ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn * conn, TupleChunkListItem tcItem, int16 motionId);
static bool flushBuffer(MotionLayerState *mlStates, ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn, int16 motionId);
static void doSendStopMessageTCP(ChunkTransportState *transportStates, int16 motNodeID);
/*
* setupTCPListeningSocket
*/
static void
setupTCPListeningSocket(int backlog, int *listenerSocketFd, uint16 *listenerPort)
{
int errnoSave;
int fd = -1;
const char *fun;
*listenerSocketFd = -1;
*listenerPort = 0;
struct sockaddr_storage addr;
socklen_t addrlen;
struct addrinfo hints;
struct addrinfo *addrs, *rp;
int s;
char service[32];
char myname[128];
char *localname = NULL;
/*
* we let the system pick the TCP port here so we don't have to
* manage port resources ourselves. So set the port to 0 (any port)
*/
snprintf(service,32,"%d",0);
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* TCP socket */
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0; /* Any protocol */
/* We use INADDR_ANY if we don't have a valid address for
* ourselves (e.g. QD local connections tend to be AF_UNIX, or
* on 127.0.0.1 -- so bind everything) */
if (Gp_role == GP_ROLE_DISPATCH ||
(MyProcPort->laddr.addr.ss_family != AF_INET &&
MyProcPort->laddr.addr.ss_family != AF_INET6))
localname = NULL; /* We will listen on all network adapters */
else
{
/*
* Restrict what IP address we will listen on to just the one
* that was used to create this QE session.
*/
getnameinfo((const struct sockaddr *)&(MyProcPort->laddr.addr), MyProcPort->laddr.salen,
myname, sizeof(myname),
NULL, 0, NI_NUMERICHOST);
hints.ai_flags |= AI_NUMERICHOST;
localname = myname;
elog(DEBUG2, "binding to %s only",localname);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
ereport(DEBUG4, (errmsg("binding listener %s", localname)));
}
s = getaddrinfo(localname, service, &hints, &addrs);
if (s != 0)
elog(ERROR,"getaddrinfo says %s",gai_strerror(s));
/*
* getaddrinfo() returns a list of address structures,
* one for each valid address and family we can use.
*
* Try each address until we successfully bind.
* If socket (or bind) fails, we (close the socket
* and) try the next address. This can happen if
* the system supports IPv6, but IPv6 is disabled from
* working, or if it supports IPv6 and IPv4 is disabled.
*/
/*
* If there is both an AF_INET6 and an AF_INET choice,
* we prefer the AF_INET6, because on UNIX it can receive either
* protocol, whereas AF_INET can only get IPv4. Otherwise we'd need
* to bind two sockets, one for each protocol.
*
* Why not just use AF_INET6 in the hints? That works perfect
* if we know this machine supports IPv6 and IPv6 is enabled,
* but we don't know that.
*/
#ifdef HAVE_IPV6
if (addrs->ai_family == AF_INET && addrs->ai_next != NULL && addrs->ai_next->ai_family == AF_INET6)
{
/*
* We got both an INET and INET6 possibility, but we want to prefer the INET6 one if it works.
* Reverse the order we got from getaddrinfo so that we try things in our preferred order.
* If we got more possibilities (other AFs??), I don't think we care about them, so don't
* worry if the list is more that two, we just rearrange the first two.
*/
struct addrinfo *temp = addrs->ai_next; /* second node */
addrs->ai_next = addrs->ai_next->ai_next; /* point old first node to third node if any */
temp->ai_next = addrs; /* point second node to first */
addrs = temp; /* start the list with the old second node */
}
#endif
for (rp = addrs; rp != NULL; rp = rp->ai_next)
{
/*
* getaddrinfo gives us all the parameters for the socket() call
* as well as the parameters for the bind() call.
*/
fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (fd == -1)
continue;
/*
* we let the system pick the TCP port here so we don't have to
* manage port resources ourselves.
*/
if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0)
break; /* Success */
close(fd);
fd = -1;
}
fun = "bind";
if (fd == -1)
goto error;
/* Make socket non-blocking. */
fun = "fcntl(O_NONBLOCK)";
if (!pg_set_noblock(fd))
goto error;
fun = "listen";
if (listen(fd, backlog) < 0)
goto error;
/* Get the listening socket's port number. */
fun = "getsockname";
addrlen = sizeof(addr);
if (getsockname(fd, (struct sockaddr *) &addr, &addrlen) < 0)
goto error;
/* Give results to caller. */
*listenerSocketFd = fd;
/* display which port was chosen by the system. */
if (addr.ss_family == AF_INET6)
*listenerPort = ntohs(((struct sockaddr_in6*)&addr)->sin6_port);
else
*listenerPort = ntohs(((struct sockaddr_in*)&addr)->sin_port);
return;
error:
errnoSave = errno;
if (fd >= 0)
closesocket(fd);
errno = errnoSave;
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect Error: Could not set up tcp listener socket."),
errdetail("%m%s", fun)));
} /* setupListeningSocket */
/*
* Initialize TCP specific comms.
*/
void
InitMotionTCP(int *listenerSocketFd, uint16 *listenerPort)
{
tval.tv_sec = 0;
tval.tv_usec = 500000;
#ifdef pg_on_solaris
listenerBacklog = Min(1024, listenerBacklog);
#endif
setupTCPListeningSocket(listenerBacklog, listenerSocketFd, listenerPort);
return;
}
/* cleanup any TCP-specific comms info */
void
CleanupMotionTCP(void)
{
/* nothing to do. */
return;
}
/* Function readPacket() is used to read in the next packet from the given
* MotionConn.
*
* This call blocks until the packet is read in, and is part of a
* global scheme where senders block until the entire message is sent, and
* receivers block until the entire message is read. Both use non-blocking
* socket calls so that we can handle any PG interrupts.
*
* Note, that for speed we want to read a message all in one go,
* header and all. A consequence is that we may read in part of the
* next message, which we've got to keep track of ... recvBytes holds
* the byte-count of the unprocessed messages.
*
* PARAMETERS
* conn - MotionConn to read the packet from.
*
*/
//static inline void
void
readPacket(MotionConn *conn, bool inTeardown)
{
int n,
bytesRead = conn->recvBytes;
bool gotHeader = false,
gotPacket = false;
mpp_fd_set rset;
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "readpacket: (fd %d) (max %d) outstanding bytes %d", conn->sockfd, Gp_max_packet_size, conn->recvBytes);
#endif
/* do we have a complete message waiting to be processed ? */
if (conn->recvBytes >= PACKET_HEADER_SIZE)
{
memcpy(&conn->msgSize, conn->msgPos, sizeof(uint32));
gotHeader = true;
if (conn->recvBytes >= conn->msgSize)
{
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "readpacket: returning previously read data (%d)", conn->recvBytes);
#endif
return;
}
}
/*
* partial message waiting in recv buffer! Move to head of buffer:
* eliminate the slack (which will always be at the beginning) in
* the buffer
*/
if (conn->recvBytes != 0)
memcpy(conn->pBuff, conn->msgPos, conn->recvBytes);
conn->msgPos = conn->pBuff;
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "readpacket: %s on previous call msgSize %d", gotHeader ? "got header" : "no header", conn->msgSize);
#endif
while (!gotPacket && bytesRead < Gp_max_packet_size)
{
/* see if user canceled and stuff like that */
ML_CHECK_FOR_INTERRUPTS(inTeardown);
/*
* we read at the end of the buffer, we've eliminated any
* slack above
*/
if ((n = recv(conn->sockfd, conn->pBuff + bytesRead,
Gp_max_packet_size - bytesRead, 0)) < 0)
{
if (errno == EINTR)
continue;
if (errno == EWOULDBLOCK)
{
do
{
struct timeval timeout = tval;
/* see if user canceled and stuff like that */
ML_CHECK_FOR_INTERRUPTS(inTeardown);
MPP_FD_ZERO(&rset);
MPP_FD_SET(conn->sockfd, &rset);
n = select(conn->sockfd + 1, (fd_set *)&rset, NULL, NULL, &timeout);
if (n == 0 || (n < 0 && errno == EINTR))
continue;
else if (n < 0)
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error reading an incoming packet."),
errdetail("%m%s from seg%d at %s",
"select",
conn->remoteContentId,
conn->remoteHostAndPort)));
}
}
while (n < 1);
}
else
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error reading an incoming packet."),
errdetail("%m%s from seg%d at %s",
"read",
conn->remoteContentId,
conn->remoteHostAndPort)));
}
}
else if (n == 0)
{
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "readpacket(); breaking in while (fd %d) recvBytes %d msgSize %d", conn->sockfd, conn->recvBytes, conn->msgSize);
print_connection(transportStates, conn->sockfd, "interconnect error on");
#endif
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error: connection closed prematurely."),
errdetail("from Remote Connection: contentId=%d at %s",
conn->remoteContentId, conn->remoteHostAndPort)));
break;
}
else
{
bytesRead += n;
if (!gotHeader && bytesRead >= PACKET_HEADER_SIZE)
{
/* got the header */
memcpy(&conn->msgSize, conn->msgPos, sizeof(uint32));
gotHeader = true;
}
conn->recvBytes = bytesRead;
if (gotHeader && bytesRead >= conn->msgSize)
gotPacket = true;
}
}
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "readpacket: got %d bytes", conn->recvBytes);
#endif
}
static void
flushIncomingData(int fd)
{
static char trash[8192];
int bytes;
/*
* If we're in TeardownInterconnect, we should only have to call recv() a
* couple of times to empty out our socket buffers
*/
do
{
bytes = recv(fd, trash, sizeof(trash), 0);
} while (bytes > 0);
}
/* Function startOutgoingConnections() is used to initially kick-off any outgoing
* connections for mySlice.
*
* This should not be called for root slices (i.e. QD ones) since they don't
* ever have outgoing connections.
*
* PARAMETERS
*
* sendSlice - Slice that this process is member of.
* pIncIdx - index in the parent slice list of myslice.
*
* RETURNS
* Initialized ChunkTransportState for the Sending Motion Node Id.
*/
ChunkTransportStateEntry *
startOutgoingConnections(ChunkTransportState *transportStates,
Slice *sendSlice,
int *pOutgoingCount)
{
ChunkTransportStateEntry *pEntry;
MotionConn *conn;
ListCell *cell;
Slice *recvSlice;
CdbProcess *cdbProc;
*pOutgoingCount = 0;
recvSlice = (Slice *) list_nth(transportStates->sliceTable->slices, sendSlice->parentIndex);
adjustMasterRouting(recvSlice);
if (gp_interconnect_aggressive_retry)
{
if ((list_length(recvSlice->children) * sendSlice->numGangMembersToBeActive) > listenerBacklog)
transportStates->aggressiveRetry = true;
}
else
transportStates->aggressiveRetry = false;
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG4, "Interconnect seg%d slice%d setting up sending motion node (aggressive retry is %s)",
GetQEIndex(), sendSlice->sliceIndex,
(transportStates->aggressiveRetry ? "active" : "inactive"));
pEntry = createChunkTransportState(transportStates,
sendSlice,
recvSlice,
list_length(recvSlice->primaryProcesses));
/*
* Setup a MotionConn entry for each of our outbound connections.
* Request a connection to each receiving backend's listening port.
* NB: Some mirrors could be down & have no CdbProcess entry.
*/
conn = pEntry->conns;
foreach(cell, recvSlice->primaryProcesses)
{
cdbProc = (CdbProcess *)lfirst(cell);
if (cdbProc)
{
conn->cdbProc = cdbProc;
conn->pBuff = palloc(Gp_max_packet_size);
conn->state = mcsSetupOutgoingConnection;
(*pOutgoingCount)++;
}
conn++;
}
return pEntry;
} /* startOutgoingConnections */
/*
* setupOutgoingConnection
*
* Called by SetupInterconnect when conn->state == mcsSetupOutgoingConnection.
*
* On return, state is:
* mcsSetupOutgoingConnection if failed and caller should retry.
* mcsConnecting if non-blocking connect() is pending. Caller should
* send registration message when socket becomes write-ready.
* mcsSendRegMsg or mcsStarted if connect() completed successfully.
*/
void
setupOutgoingConnection(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
CdbProcess *cdbProc = conn->cdbProc;
int sockopt;
int n;
int ret;
char portNumberStr[32];
char *service;
struct addrinfo *addrs = NULL;
struct addrinfo hint;
Assert(conn->cdbProc);
Assert(conn->state == mcsSetupOutgoingConnection);
conn->wakeup_ms = 0;
conn->remoteContentId = cdbProc->contentid;
/*
* record the destination IP addr and port for error messages.
* Since the IP addr might be IPv6, it might have ':' embedded, so in
* that case, put '[]' around it so we can see that the string is an
* IP and port (otherwise it might look just like an IP).
*
* Should we really put this info into these fields? Later we replace this with
* the source IP and port (after the bind call).
*/
if (strchr(cdbProc->listenerAddr,':')!=0)
snprintf(conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort),
"[%s]:%d", cdbProc->listenerAddr, cdbProc->listenerPort);
else
snprintf(conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort),
"%s:%d", cdbProc->listenerAddr, cdbProc->listenerPort);
/* Might be retrying due to connection failure etc. Close old socket. */
if (conn->sockfd >= 0)
{
closesocket(conn->sockfd);
conn->sockfd = -1;
}
/* Initialize hint structure */
MemSet(&hint, 0, sizeof(hint));
hint.ai_socktype = SOCK_STREAM;
hint.ai_family = AF_UNSPEC; /* Allow for IPv4 or IPv6 */
#ifdef AI_NUMERICSERV
hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; /* Never do name resolution */
#else
hint.ai_flags = AI_NUMERICHOST; /* Never do name resolution */
#endif
snprintf(portNumberStr, sizeof(portNumberStr), "%d", cdbProc->listenerPort);
service = portNumberStr;
ret = pg_getaddrinfo_all(cdbProc->listenerAddr, service, &hint, &addrs);
if (ret || !addrs)
{
if (addrs)
pg_freeaddrinfo_all(hint.ai_family, addrs);
ereport(ERROR,
(errmsg("could not translate host addr \"%s\", port \"%d\" to address: %s",
cdbProc->listenerAddr, cdbProc->listenerPort, gai_strerror(ret))));
return;
}
/* Since we aren't using name resolution, getaddrinfo will return only 1 entry */
/*
* Create a socket. getaddrinfo() returns the parameters needed by socket()
*/
conn->sockfd = socket(addrs->ai_family, addrs->ai_socktype, addrs->ai_protocol);
if (conn->sockfd < 0)
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error setting up outgoing "
"connection."),
errdetail("%m%s", "socket")));
/* make socket non-blocking BEFORE we connect. */
if (!pg_set_noblock(conn->sockfd))
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error setting up outgoing "
"connection."),
errdetail("%m%s", "fcntl(O_NONBLOCK)")));
}
/* Allow bind() to succeed even if the port is in TIME_WAIT state. */
sockopt = 1;
if (setsockopt(conn->sockfd, SOL_SOCKET, SO_REUSEADDR,
(void *)&sockopt, sizeof(sockopt)) < 0)
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error setting up outgoing "
"connection."),
errdetail("%m %s", "setsockopt(SO_REUSEADDR)")));
/*
* To help with fault tolerance, try to use the same LAN adapter as was used
* to connect the session to us.
*
* Bind that IP address to the socket. But, we must insure that the source
* address is the same address family as the destination address.
*
* The is especially important to check on the QD, since local
* sessions are often AF_UNIX, and even if TCP, they could be either v6 or v4.
*
*/
struct sockaddr_storage saddr;
int saddr_len = addrs->ai_addrlen;
memset(&saddr, 0, sizeof(saddr));
if (MyProcPort->laddr.addr.ss_family == addrs->ai_family)
{
/* We need to copy the address so we can clear the port */
memcpy(&saddr, &MyProcPort->laddr.addr, MyProcPort->laddr.salen);
if (saddr.ss_family == AF_INET)
((struct sockaddr_in *)&saddr)->sin_port = 0; /* pick any available outbound port */
else if (saddr.ss_family == AF_INET6)
((struct sockaddr_in6 *)&saddr)->sin6_port = 0;
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
{
char debugmsg[128];
inet_ntop(saddr.ss_family, (struct sockaddr *)&saddr, debugmsg, sizeof(debugmsg));
ereport(DEBUG4, (errmsg("bind outbound %s", debugmsg)));
}
}
else
{
/*
* bind to the "any ip address" "any port". Not really necessary to use bind()
* but this allows us to get the system assigned source IP and port without
* waiting until after the connect() call.
*/
saddr.ss_family = addrs->ai_family;
/* We could set the address to INADDR_ANY or INADDR_ANY6 but those are just zeros anyway */
}
if (bind(conn->sockfd, (struct sockaddr *)&saddr, saddr_len) < 0)
{
char debugmsg[128];
inet_ntop(saddr.ss_family, (struct sockaddr *)&saddr, debugmsg, sizeof(debugmsg));
/* Should never get EADDRINUSE because we know it's our port. */
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error setting up outgoing "
"connection."),
errdetail("Could not bind to local addr %s. %m",
debugmsg)));
}
/*
* Get the source IP address and Port
*/
format_sockaddr((struct sockaddr *) &saddr, conn->remoteHostAndPort,
sizeof(conn->remoteHostAndPort));
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
ereport(DEBUG1, (errmsg("Interconnect connecting to seg%d slice%d %s "
"pid=%d sockfd=%d",
conn->remoteContentId,
pEntry->recvSlice->sliceIndex,
conn->remoteHostAndPort,
conn->cdbProc->pid,
conn->sockfd)));
/*
* Initiate the connection.
*/
for (;;)
{ /* connect() EINTR retry loop */
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
n = connect(conn->sockfd, addrs->ai_addr, addrs->ai_addrlen);
/* Non-blocking socket never connects immediately, but check anyway. */
if (n == 0)
{
sendRegisterMessage(transportStates, pEntry, conn);
pg_freeaddrinfo_all(hint.ai_family, addrs);
return;
}
/* Retry if a signal was received. */
if (errno == EINTR)
continue;
/* Normal case: select() will tell us when connection is made. */
if (errno == EINPROGRESS ||
errno == EWOULDBLOCK)
{
conn->state = mcsConnecting;
pg_freeaddrinfo_all(hint.ai_family, addrs);
return;
}
pg_freeaddrinfo_all(hint.ai_family, addrs);
/* connect() failed. Log the error. Caller should retry. */
updateOutgoingConnection(transportStates, pEntry, conn, errno);
return;
} /* connect() EINTR retry loop */
} /* setupOutgoingConnection */
/*
* updateOutgoingConnection
*
* Called when connect() succeeds or fails.
*/
void
updateOutgoingConnection(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, int errnoSave)
{
socklen_t sizeoferrno = sizeof(errnoSave);
/* Get errno value indicating success or failure. */
if (errnoSave == -1 &&
getsockopt(conn->sockfd, SOL_SOCKET, SO_ERROR,
(void *)&errnoSave, &sizeoferrno))
{
/* getsockopt failed */
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect could not connect to seg%d %s",
conn->remoteContentId,
conn->remoteHostAndPort),
errdetail("%m%s sockfd=%d",
"getsockopt(SO_ERROR)",
conn->sockfd)));
}
switch (errnoSave)
{
/* Success! Advance to next state. */
case 0:
sendRegisterMessage(transportStates, pEntry, conn);
return;
default:
errno = errnoSave;
ereport(LOG, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect could not connect to seg%d %s "
"pid=%d; "
"will retry. %m%s",
conn->remoteContentId,
conn->remoteHostAndPort,
conn->cdbProc->pid,
"connect")));
break;
}
/* Tell caller to close the socket and try again. */
conn->state = mcsSetupOutgoingConnection;
} /* updateOutgoingConnection */
/* Function sendRegisterMessage() used to send a Register message to the
* remote destination on the other end of the provided conn.
*
* PARAMETERS
*
* pEntry - ChunkTransportState.
* conn - MotionConn to send message out on.
*
* Called by SetupInterconnect when conn->state == mcsSetupOutgoingConnection.
*
* On return, state is:
* mcsSendRegMsg if registration message has not been completely sent.
* Caller should retry when socket becomes write-ready.
* mcsStarted if registration message has been sent. Caller can start
* sending data.
*/
static void
sendRegisterMessage(ChunkTransportState *transportStates, ChunkTransportStateEntry * pEntry, MotionConn * conn)
{
int bytesToSend;
int bytesSent;
if (conn->state != mcsSendRegMsg)
{
RegisterMessage *regMsg = (RegisterMessage *)conn->pBuff;
struct sockaddr_storage localAddr;
socklen_t addrsize;
Assert(conn->cdbProc &&
conn->pBuff &&
sizeof(*regMsg) <= Gp_max_packet_size);
Assert(pEntry->recvSlice &&
pEntry->sendSlice &&
IsA(pEntry->recvSlice, Slice) &&
IsA(pEntry->sendSlice, Slice));
/* Save local host and port for log messages. */
addrsize = sizeof(localAddr);
if (getsockname(conn->sockfd, (struct sockaddr *)&localAddr, &addrsize))
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error after making connection."),
errdetail("%m%s sockfd=%d remote=%s",
"getsockname",
conn->sockfd,
conn->remoteHostAndPort)));
}
format_sockaddr((struct sockaddr *)&localAddr, conn->localHostAndPort,
sizeof(conn->localHostAndPort));
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE)
ereport(LOG, (errmsg("Interconnect sending registration message "
"to seg%d slice%d %s pid=%d "
"from seg%d slice%d %s sockfd=%d",
conn->remoteContentId,
pEntry->recvSlice->sliceIndex,
conn->remoteHostAndPort,
conn->cdbProc->pid,
GetQEIndex(),
pEntry->sendSlice->sliceIndex,
conn->localHostAndPort,
conn->sockfd)));
regMsg->msgBytes = sizeof(*regMsg);
regMsg->recvSliceIndex = pEntry->recvSlice->sliceIndex;
regMsg->sendSliceIndex = pEntry->sendSlice->sliceIndex;
regMsg->srcContentId = GetQEIndex();
regMsg->srcListenerPort = Gp_listener_port&0x0ffff;
regMsg->srcPid = MyProcPid;
regMsg->srcSessionId = gp_session_id;
regMsg->srcCommandCount = gp_interconnect_id;
conn->state = mcsSendRegMsg;
conn->msgPos = conn->pBuff;
conn->msgSize = sizeof(*regMsg);
}
/* Send as much as we can. */
for (;;)
{
bytesToSend = conn->pBuff + conn->msgSize - conn->msgPos;
bytesSent = send(conn->sockfd, conn->msgPos, bytesToSend, 0);
if (bytesSent == bytesToSend)
break;
else if (bytesSent >= 0)
conn->msgPos += bytesSent;
else if (errno == EWOULDBLOCK)
return; /* call me again to send the rest */
else if (errno == EINTR)
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
else
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error writing registration "
"message to seg%d at %s",
conn->remoteContentId,
conn->remoteHostAndPort),
errdetail("%m%s pid=%d sockfd=%d local=%s",
"write",
conn->cdbProc->pid,
conn->sockfd,
conn->localHostAndPort)));
}
}
/* Sent it all. */
conn->state = mcsStarted;
conn->msgPos = NULL;
conn->msgSize = PACKET_HEADER_SIZE;
conn->stillActive = true;
} /* sendRegisterMessage */
/* Function readRegisterMessage() reads a "Register" message off of the conn
* and places it in the right MotionLayerEntry conn slot based on the contents
* of the register message.
*
* PARAMETERS
*
* conn - MotionConn to read the register messagefrom.
*
* Returns true if message has been received; or false if caller must retry
* when socket becomes read-ready.
*/
static bool
readRegisterMessage(ChunkTransportState *transportStates,
MotionConn *conn)
{
int bytesToReceive;
int bytesReceived;
int iconn;
RegisterMessage *regMsg;
RegisterMessage msg;
MotionConn *newConn;
ChunkTransportStateEntry *pEntry = NULL;
CdbProcess *cdbproc;
/* Get ready to receive the Register message. */
if (conn->state != mcsRecvRegMsg)
{
conn->state = mcsRecvRegMsg;
conn->msgSize = sizeof(*regMsg);
conn->msgPos = conn->pBuff;
Assert(conn->pBuff &&
sizeof(*regMsg) <= Gp_max_packet_size);
}
/* Receive all that is available, up to the expected message size. */
for (;;)
{
bytesToReceive = conn->pBuff + conn->msgSize - conn->msgPos;
bytesReceived = recv(conn->sockfd, conn->msgPos, bytesToReceive, 0);
if (bytesReceived == bytesToReceive)
break;
else if (bytesReceived > 0)
conn->msgPos += bytesReceived;
else if (bytesReceived == 0)
{
elog(LOG, "Interconnect error reading register message from %s: connection closed",
conn->remoteHostAndPort);
/* maybe this peer is already retrying ? */
goto old_conn;
}
else if (errno == EWOULDBLOCK)
return false; /* call me again to receive the rest */
else if (errno == EINTR)
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
else
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error reading register message "
"from %s", conn->remoteHostAndPort),
errdetail("%m%s sockfd=%d local=%s",
"read",
conn->sockfd,
conn->localHostAndPort)));
}
}
/*
* Got the whole message. Convert fields to native byte order.
*/
regMsg = (RegisterMessage *)conn->pBuff;
msg.msgBytes = regMsg->msgBytes;
msg.recvSliceIndex = regMsg->recvSliceIndex;
msg.sendSliceIndex = regMsg->sendSliceIndex;
msg.srcContentId = regMsg->srcContentId;
msg.srcListenerPort = regMsg->srcListenerPort;
msg.srcPid = regMsg->srcPid;
msg.srcSessionId = regMsg->srcSessionId;
msg.srcCommandCount = regMsg->srcCommandCount;
/* Check for valid message format. */
if (msg.msgBytes != sizeof(*regMsg))
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error reading register message "
"from %s: format not recognized",
conn->remoteHostAndPort),
errdetail("msgBytes=%d expected=%d sockfd=%d local=%s",
msg.msgBytes, (int)sizeof(*regMsg),
conn->sockfd, conn->localHostAndPort)));
}
/* get rid of old connections first */
if (msg.srcSessionId != gp_session_id ||
msg.srcCommandCount < gp_interconnect_id)
{
/* This is an old connection, which can be safely
* ignored. We get this kind of stuff for cases in which
* one gang participating in the interconnect exited a
* query before calling SetupInterconnect(). Later queries
* wind up receiving their registration messages. */
elog(LOG, "Received invalid, old registration message: "
"will ignore ('expected:received' session %d:%d ic-id %d:%d)",
gp_session_id, msg.srcSessionId,
gp_interconnect_id, msg.srcCommandCount);
goto old_conn;
}
/* Verify that the message pertains to one of our receiving Motion nodes. */
if (msg.sendSliceIndex > 0 &&
msg.sendSliceIndex <= transportStates->size &&
msg.recvSliceIndex == transportStates->sliceId &&
msg.srcContentId >= -1)
{
/* this is a good connection */
}
else
{
/* something is wrong */
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error: Invalid registration "
"message received from %s.",
conn->remoteHostAndPort),
errdetail("sendSlice=%d recvSlice=%d srcContentId=%d "
"srcPid=%d srcListenerPort=%d "
"srcSessionId=%d srcCommandCount=%d motnode=%d",
msg.sendSliceIndex, msg.recvSliceIndex,
msg.srcContentId, msg.srcPid,
msg.srcListenerPort, msg.srcSessionId,
msg.srcCommandCount, msg.sendSliceIndex)));
}
/*
* Find state info for the specified Motion node. The sender's slice
* number equals the motion node id.
*/
getChunkTransportState(transportStates, msg.sendSliceIndex, &pEntry);
Assert(pEntry);
/*
* Find and verify the CdbProcess node for the sending process.
*/
if (list_length(pEntry->sendSlice->primaryProcesses) == 1)
iconn = 0;
else
iconn = msg.srcContentId;
cdbproc = (CdbProcess *)list_nth(pEntry->sendSlice->primaryProcesses, iconn);
if (msg.srcContentId != cdbproc->contentid ||
msg.srcListenerPort != cdbproc->listenerPort ||
msg.srcPid != cdbproc->pid)
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error: Invalid registration "
"message received from %s.",
conn->remoteHostAndPort),
errdetail("sendSlice=%d srcContentId=%d "
"srcPid=%d srcListenerPort=%d",
msg.sendSliceIndex, msg.srcContentId,
msg.srcPid, msg.srcListenerPort)));
}
/*
* Allocate MotionConn slot corresponding to sender's position in the
* sending slice's CdbProc list.
*/
newConn = getMotionConn(pEntry, iconn);
if (newConn->sockfd != -1 ||
newConn->state != mcsNull)
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error: Duplicate registration "
"message received from %s.",
conn->remoteHostAndPort),
errdetail("Already accepted registration from %s for "
"sendSlice=%d srcContentId=%d "
"srcPid=%d srcListenerPort=%d",
newConn->remoteHostAndPort,
msg.sendSliceIndex, msg.srcContentId,
msg.srcPid, msg.srcListenerPort)));
}
/* message looks good */
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE)
{
ereport(LOG, (errmsg("Interconnect seg%d slice%d sockfd=%d accepted "
"registration message from seg%d slice%d %s pid=%d",
GetQEIndex(),
msg.recvSliceIndex,
conn->sockfd,
msg.srcContentId,
msg.sendSliceIndex,
conn->remoteHostAndPort,
msg.srcPid)));
}
/* Copy caller's temporary MotionConn to its assigned slot. */
*newConn = *conn;
newConn->cdbProc = cdbproc;
newConn->remoteContentId = msg.srcContentId;
/*
* The caller's MotionConn object is no longer valid.
*/
MemSet(conn, 0, sizeof(*conn));
conn->state = mcsNull;
/*
* Prepare to begin reading tuples.
*/
newConn->state = mcsStarted;
newConn->msgPos = NULL;
newConn->msgSize = 0;
newConn->stillActive = true;
MPP_FD_SET(newConn->sockfd, &pEntry->readSet);
if (newConn->sockfd > pEntry->highReadSock)
pEntry->highReadSock = newConn->sockfd;
#ifdef AMS_VERBOSE_LOGGING
dumpEntryConnections(DEBUG4, pEntry);
#endif
/* we've completed registration of this connection */
return true;
old_conn:
shutdown(conn->sockfd, SHUT_RDWR);
closesocket(conn->sockfd);
conn->sockfd = -1;
pfree(conn->pBuff);
conn->pBuff = NULL;
/* this connection is done, but with sockfd == -1 isn't a
* "success" */
return true;
} /* readRegisterMessage */
/*
* acceptIncomingConnection
*
* accept() a connection request that is pending on the listening socket.
* Returns a newly palloc'ed MotionConn object; or NULL if the listening
* socket does not have any pending connection requests.
*/
MotionConn *
acceptIncomingConnection(void)
{
int newsockfd;
socklen_t addrsize;
MotionConn *conn;
struct sockaddr_storage remoteAddr;
struct sockaddr_storage localAddr;
/*
* Accept a connection.
*/
for (;;)
{ /* loop until success or EWOULDBLOCK */
MemSet(&remoteAddr, 0, sizeof(remoteAddr));
addrsize = sizeof(remoteAddr);
newsockfd = accept(TCP_listenerFd, (struct sockaddr *)&remoteAddr, &addrsize);
if (newsockfd >= 0)
break;
switch (errno)
{
case EINTR:
/* A signal arrived. Loop to retry the accept(). */
break;
case EWOULDBLOCK:
/* Connection request queue is empty. Normal return. */
return NULL;
case EBADF:
case EFAULT:
case EINVAL:
#ifndef _WIN32
case ENOTSOCK:
#endif
case EOPNOTSUPP:
/* Shouldn't get these errors unless there is a bug. */
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error on listener port %d",
Gp_listener_port),
errdetail("%m%s sockfd=%d",
"accept",
TCP_listenerFd)));
break; /* not reached */
case ENOMEM:
case ENFILE:
case EMFILE:
case ENOBUFS:
/* Out of resources. */
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error on listener port %d",
Gp_listener_port),
errdetail("%m%s sockfd=%d",
"accept",
TCP_listenerFd)));
break; /* not reached */
default:
/* Network problem, connection aborted, etc. Continue. */
ereport(LOG, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect connection request not "
"completed on listener port %d",
Gp_listener_port),
errdetail("%m%s sockfd=%d",
"accept",
TCP_listenerFd)));
} /* switch (errno) */
} /* loop until success or EWOULDBLOCK */
/*
* Create a MotionConn object to hold the connection state.
*/
conn = palloc0(sizeof(MotionConn));
conn->sockfd = newsockfd;
conn->pBuff = palloc(Gp_max_packet_size);
conn->msgSize = 0;
conn->recvBytes = 0;
conn->msgPos = 0;
conn->tupleCount = 0;
conn->stillActive = false;
conn->state = mcsAccepted;
conn->remoteContentId = -2;
/* Save remote and local host:port strings for error messages. */
format_sockaddr((struct sockaddr *)&remoteAddr, conn->remoteHostAndPort,
sizeof(conn->remoteHostAndPort));
addrsize = sizeof(localAddr);
if (getsockname(newsockfd, (struct sockaddr *)&localAddr, &addrsize))
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error after accepting connection."),
errdetail("%m%s sockfd=%d remote=%s",
"getsockname",
newsockfd,
conn->remoteHostAndPort)));
}
format_sockaddr((struct sockaddr *)&localAddr, conn->localHostAndPort,
sizeof(conn->localHostAndPort));
/* make socket non-blocking */
if (!pg_set_noblock(newsockfd))
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error after accepting connection."),
errdetail("%m%s sockfd=%d remote=%s local=%s",
"fcntl(O_NONBLOCK)",
newsockfd,
conn->remoteHostAndPort,
conn->localHostAndPort)));
}
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG4, "Interconnect got incoming connection "
"from remote=%s to local=%s sockfd=%d",
conn->remoteHostAndPort, conn->localHostAndPort, newsockfd);
return conn;
} /* acceptIncomingConnection */
/* See ml_ipc.h */
void
SetupTCPInterconnect(EState *estate)
{
int i, index, n;
ListCell *cell;
Slice *mySlice;
Slice *aSlice;
MotionConn *conn;
int incoming_count = 0;
int outgoing_count = 0;
int expectedTotalIncoming = 0;
int expectedTotalOutgoing = 0;
int iteration = 0;
GpMonotonicTime startTime;
StringInfoData logbuf;
uint64 elapsed_ms = 0;
/* we can have at most one of these. */
ChunkTransportStateEntry *sendingChunkTransportState = NULL;
if (estate->interconnect_context)
{
elog(FATAL, "SetupTCPInterconnect: already initialized.");
}
else if (!estate->es_sliceTable)
{
elog(FATAL, "SetupTCPInterconnect: no slice table ?");
}
estate->interconnect_context = palloc0(sizeof(ChunkTransportState));
/* initialize state variables */
Assert(estate->interconnect_context->size == 0);
estate->interconnect_context->size = CTS_INITIAL_SIZE;
estate->interconnect_context->states = palloc0(CTS_INITIAL_SIZE * sizeof(ChunkTransportStateEntry));
estate->interconnect_context->teardownActive = false;
estate->interconnect_context->activated = false;
estate->interconnect_context->incompleteConns = NIL;
estate->interconnect_context->sliceTable = NULL;
estate->interconnect_context->sliceId = -1;
estate->interconnect_context->sliceTable = estate->es_sliceTable;
estate->interconnect_context->sliceId = LocallyExecutingSliceIndex(estate);
estate->interconnect_context->RecvTupleChunkFrom = RecvTupleChunkFromTCP;
estate->interconnect_context->RecvTupleChunkFromAny = RecvTupleChunkFromAnyTCP;
estate->interconnect_context->SendEos = SendEosTCP;
estate->interconnect_context->SendChunk = SendChunkTCP;
estate->interconnect_context->doSendStopMessage = doSendStopMessageTCP;
mySlice = (Slice *) list_nth(estate->interconnect_context->sliceTable->slices, LocallyExecutingSliceIndex(estate));
Assert(estate->es_sliceTable &&
IsA(mySlice, Slice) &&
mySlice->sliceIndex == LocallyExecutingSliceIndex(estate));
gp_interconnect_id = estate->interconnect_context->sliceTable->ic_instance_id;
gp_set_monotonic_begin_time(&startTime);
/* Initiate outgoing connections. */
if (mySlice->parentIndex != -1)
sendingChunkTransportState = startOutgoingConnections(estate->interconnect_context, mySlice, &expectedTotalOutgoing);
/* now we'll do some setup for each of our Receiving Motion Nodes. */
foreach(cell, mySlice->children)
{
int totalNumProcs, activeNumProcs;
int childId = lfirst_int(cell);
ChunkTransportStateEntry *pEntry;
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "Setting up RECEIVING motion node %d", childId);
#endif
aSlice = (Slice *) list_nth(estate->interconnect_context->sliceTable->slices, childId);
/*
* If we're using directed-dispatch we have dummy
* primary-process entries, so we count the entries.
*/
activeNumProcs = 0;
totalNumProcs = list_length(aSlice->primaryProcesses);
for (i=0; i < totalNumProcs; i++)
{
CdbProcess *cdbProc;
cdbProc = list_nth(aSlice->primaryProcesses, i);
if (cdbProc)
activeNumProcs++;
}
pEntry = createChunkTransportState(estate->interconnect_context, aSlice, mySlice, totalNumProcs);
/* let cdbmotion now how many receivers to expect. */
setExpectedReceivers(estate->motionlayer_context, childId, activeNumProcs);
expectedTotalIncoming += activeNumProcs;
}
if (expectedTotalIncoming > listenerBacklog)
ereport(WARNING, (errmsg("SetupTCPInterconnect: too many expected incoming connections(%d), Interconnect setup might possibly fail", expectedTotalIncoming),
errhint("Try enlarging the gp_interconnect_tcp_listener_backlog GUC value and OS net.core.somaxconn parameter")));
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
ereport(DEBUG1, (errmsg("SetupInterconnect will activate "
"%d incoming, %d outgoing routes. "
"Listening on port=%d sockfd=%d.",
expectedTotalIncoming, expectedTotalOutgoing,
Gp_listener_port, TCP_listenerFd)));
/*
* Loop until all connections are completed or time limit is exceeded.
*/
while (outgoing_count < expectedTotalOutgoing ||
incoming_count < expectedTotalIncoming)
{ /* select() loop */
struct timeval timeout;
mpp_fd_set rset, wset, eset;
int highsock = -1;
uint64 timeout_ms = 20*60*1000;
int outgoing_fail_count = 0;
iteration++;
MPP_FD_ZERO(&rset);
MPP_FD_ZERO(&wset);
MPP_FD_ZERO(&eset);
/* Expecting any new inbound connections? */
if (incoming_count < expectedTotalIncoming)
{
if (TCP_listenerFd < 0)
{
elog(FATAL, "SetupTCPInterconnect: bad listener");
}
MPP_FD_SET(TCP_listenerFd, &rset);
highsock = TCP_listenerFd;
}
/* Inbound connections awaiting registration message */
foreach(cell, estate->interconnect_context->incompleteConns)
{
conn = (MotionConn *)lfirst(cell);
if (conn->state != mcsRecvRegMsg || conn->sockfd < 0)
{
elog(FATAL, "SetupTCPInterconnect: incomplete connection bad state or bad fd");
}
MPP_FD_SET(conn->sockfd, &rset);
highsock = Max(highsock, conn->sockfd);
}
/* Outgoing connections */
outgoing_count = 0;
n = sendingChunkTransportState ? sendingChunkTransportState->numConns : 0;
for (i = 0; i < n; i++)
{
index = i;
conn = &sendingChunkTransportState->conns[index];
/* Time to cancel incomplete connect() and retry? */
if (conn->state == mcsConnecting &&
conn->wakeup_ms > 0 &&
conn->wakeup_ms <= elapsed_ms + 20)
{
ereport(LOG, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect timeout: Connection "
"to seg%d %s from local port %s was not "
"complete after " UINT64_FORMAT
"ms " UINT64_FORMAT " elapsed. Will retry.",
conn->remoteContentId,
conn->remoteHostAndPort,
conn->localHostAndPort,
conn->wakeup_ms, (elapsed_ms + 20))));
conn->state = mcsSetupOutgoingConnection;
}
/* Time to connect? */
if (conn->state == mcsSetupOutgoingConnection &&
conn->wakeup_ms <= elapsed_ms + 20)
{
setupOutgoingConnection(estate->interconnect_context, sendingChunkTransportState, conn);
switch (conn->state)
{
case mcsSetupOutgoingConnection:
/* Retry failed connection after awhile. */
conn->wakeup_ms = (iteration - 1) * 1000 + elapsed_ms;
break;
case mcsConnecting:
/* Set time limit for connect() to complete. */
if (estate->interconnect_context->aggressiveRetry)
conn->wakeup_ms = CONNECT_AGGRESSIVERETRY_MS + elapsed_ms;
else
conn->wakeup_ms = CONNECT_RETRY_MS + elapsed_ms;
break;
default:
conn->wakeup_ms = 0;
break;
}
}
/* What events are we watching for? */
switch (conn->state)
{
case mcsNull:
break;
case mcsSetupOutgoingConnection:
outgoing_fail_count++;
break;
case mcsConnecting:
if (conn->sockfd < 0)
{
elog(FATAL, "SetupTCPInterconnect: bad fd, mcsConnecting");
}
MPP_FD_SET(conn->sockfd, &wset);
MPP_FD_SET(conn->sockfd, &eset);
highsock = Max(highsock, conn->sockfd);
break;
case mcsSendRegMsg:
if (conn->sockfd < 0)
{
elog(FATAL, "SetupTCPInterconnect: bad fd, mcsSendRegMsg");
}
MPP_FD_SET(conn->sockfd, &wset);
highsock = Max(highsock, conn->sockfd);
break;
case mcsStarted:
outgoing_count++;
break;
default:
elog(FATAL, "SetupTCPInterconnect: bad connection state");
}
if (conn->wakeup_ms > 0)
timeout_ms = Min(timeout_ms, conn->wakeup_ms - elapsed_ms);
} /* loop to set up outgoing connections */
/* Break out of select() loop if completed all connections. */
if (outgoing_count == expectedTotalOutgoing &&
incoming_count == expectedTotalIncoming)
break;
/*
* Been here long? Bail if gp_interconnect_setup_timeout exceeded.
*/
if (interconnect_setup_timeout > 0)
{
int to = interconnect_setup_timeout * 1000;
if (to <= elapsed_ms + 20)
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect timeout: Unable to "
"complete setup of all connections "
"within time limit."),
errdetail("Completed %d of %d incoming and "
"%d of %d outgoing connections. "
"gp_interconnect_setup_timeout = %d "
"seconds.",
incoming_count, expectedTotalIncoming,
outgoing_count, expectedTotalOutgoing,
interconnect_setup_timeout)
));
/* don't wait for more than 500ms */
timeout_ms = Min(500, Min(timeout_ms, to - elapsed_ms));
}
/*
* If no socket events to wait for, loop to retry after a pause.
*/
if (highsock < 0)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE &&
(timeout_ms > 0 || iteration > 2))
ereport(LOG, (errmsg("SetupInterconnect+" UINT64_FORMAT
"ms: pause " UINT64_FORMAT "ms "
"outgoing_fail=%d iteration=%d",
elapsed_ms, timeout_ms,
outgoing_fail_count, iteration)
));
/* Shouldn't be in this loop unless we have some work to do. */
if (outgoing_fail_count <= 0)
{
elog(FATAL, "SetupInterconnect: invalid outgoing count");
}
/* Wait until earliest wakeup time or overall timeout. */
if (timeout_ms > 0)
{
ML_CHECK_FOR_INTERRUPTS(estate->interconnect_context->teardownActive);
pg_usleep(timeout_ms * 1000);
ML_CHECK_FOR_INTERRUPTS(estate->interconnect_context->teardownActive);
}
/* Back to top of loop and look again. */
elapsed_ms = gp_get_elapsed_ms(&startTime);
continue;
}
/*
* Wait for socket events.
*
* In order to handle errors at intervals less than the full
* timeout length, we limit our select(2) wait to a maximum of
* 500ms.
*/
if (timeout_ms > 0)
{
timeout.tv_sec = timeout_ms / 1000; /* 0 */
timeout.tv_usec = (timeout_ms - (timeout.tv_sec * 1000)) * 1000;
Assert(timeout_ms == timeout.tv_sec*1000 + timeout.tv_usec/1000);
}
else
timeout.tv_sec = timeout.tv_usec = 0;
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
{
initStringInfo(&logbuf);
format_fd_set(&logbuf, highsock+1, rset, "r={", "} ");
format_fd_set(&logbuf, highsock+1, wset, "w={", "} ");
format_fd_set(&logbuf, highsock+1, eset, "e={", "}");
elapsed_ms = gp_get_elapsed_ms(&startTime);
ereport(DEBUG1, (errmsg("SetupInterconnect+" UINT64_FORMAT
"ms: select() "
"Interest: %s. timeout=" UINT64_FORMAT "ms "
"outgoing_fail=%d iteration=%d",
elapsed_ms, logbuf.data, timeout_ms,
outgoing_fail_count, iteration)));
pfree(logbuf.data);
MemSet(&logbuf, 0, sizeof(logbuf));
}
ML_CHECK_FOR_INTERRUPTS(estate->interconnect_context->teardownActive);
n = select(highsock + 1, (fd_set *)&rset, (fd_set *)&wset, (fd_set *)&eset, &timeout);
ML_CHECK_FOR_INTERRUPTS(estate->interconnect_context->teardownActive);
elapsed_ms = gp_get_elapsed_ms(&startTime);
/*
* Log the select() if requested.
*/
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG ||
n != expectedTotalIncoming + expectedTotalOutgoing)
{
int elevel = (n == expectedTotalIncoming + expectedTotalOutgoing)
? DEBUG1 : LOG;
int errnoSave = errno;
initStringInfo(&logbuf);
if (n > 0)
{
appendStringInfo(&logbuf, "result=%d Ready: ", n);
format_fd_set(&logbuf, highsock+1, rset, "r={", "} ");
format_fd_set(&logbuf, highsock+1, wset, "w={", "} ");
format_fd_set(&logbuf, highsock+1, eset, "e={", "}");
}
else
appendStringInfoString(&logbuf, n < 0 ? "error" : "timeout");
ereport(elevel, (errmsg("SetupInterconnect+" UINT64_FORMAT "ms: select() %s",
elapsed_ms, logbuf.data) ));
pfree(logbuf.data);
MemSet(&logbuf, 0, sizeof(logbuf));
errno = errnoSave;
}
}
if (n < 0)
{
if (errno == EINTR)
continue;
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error: %m%s", "select")));
}
/*
* check our connections that are accepted'd but no register
* message. we don't know which motion node these apply to until
* we actually receive the REGISTER message. this is why they are
* all in a single list.
*
* NOTE: we don't use foreach() here because we want to trim from the
* list as we go.
*
* We used to bail out of the while loop when incoming_count hit expectedTotalIncoming, but
* that causes problems if some connections are left over -- better to just process them
* here.
*/
cell = list_head(estate->interconnect_context->incompleteConns);
while (n > 0 && cell != NULL)
{
conn = (MotionConn *) lfirst(cell);
/*
* we'll get the next cell ready now in case we need to delete
* the cell that corresponds to our MotionConn
*/
cell = lnext(cell);
if (MPP_FD_ISSET(conn->sockfd, &rset))
{
n--;
if (readRegisterMessage(estate->interconnect_context, conn))
{
/* We're done with this connection (either it is
* bogus (and has been dropped), or we've added it to the appropriate
* hash table) */
estate->interconnect_context->incompleteConns = list_delete_ptr(estate->interconnect_context->incompleteConns, conn);
/* is the connection ready ? */
if (conn->sockfd != -1)
incoming_count++;
if (conn->pBuff)
pfree(conn->pBuff);
/* Free temporary MotionConn storage. */
pfree(conn);
}
}
}
/*
* Someone tickling our listener port? Accept pending connections.
*/
if (MPP_FD_ISSET(TCP_listenerFd, &rset))
{
n--;
while ((conn = acceptIncomingConnection()) != NULL)
{
/* get the connection read for a subsequent call to ReadRegisterMessage() */
conn->state = mcsRecvRegMsg;
conn->msgSize = sizeof(RegisterMessage);
conn->msgPos = conn->pBuff;
estate->interconnect_context->incompleteConns = lappend(estate->interconnect_context->incompleteConns, conn);
}
}
/*
* Check our outgoing connections.
*/
i = 0;
while (n > 0 &&
outgoing_count < expectedTotalOutgoing &&
i < sendingChunkTransportState->numConns)
{ /* loop to check outgoing connections */
conn = &sendingChunkTransportState->conns[i++];
switch (conn->state)
{
case mcsConnecting:
/* Has connect() succeeded or failed? */
if (MPP_FD_ISSET(conn->sockfd, &wset) ||
MPP_FD_ISSET(conn->sockfd, &eset))
{
n--;
updateOutgoingConnection(estate->interconnect_context, sendingChunkTransportState, conn, -1);
switch (conn->state)
{
case mcsSetupOutgoingConnection:
/* Failed. Wait awhile before retrying. */
conn->wakeup_ms = (iteration - 1) * 1000 + elapsed_ms;
break;
case mcsSendRegMsg:
/* Connected, but reg msg not fully sent. */
conn->wakeup_ms = 0;
break;
case mcsStarted:
/* Connected, sent reg msg, ready to rock. */
outgoing_count++;
break;
default:
elog(FATAL, "SetupInterconnect: bad outgoing state");
}
}
break;
case mcsSendRegMsg:
/* Ready to continue sending? */
if (MPP_FD_ISSET(conn->sockfd, &wset))
{
n--;
sendRegisterMessage(estate->interconnect_context, sendingChunkTransportState, conn);
if (conn->state == mcsStarted)
outgoing_count++;
}
break;
default:
break;
}
} /* loop to check outgoing connections */
/* By now we have dealt with all the events reported by select(). */
if (n != 0)
elog(FATAL, "SetupInterconnect: extra select events.");
} /* select() loop */
/*
* if everything really got setup properly then we shouldn't have
* any incomplete connections.
*
* XXX: In some cases (when the previous query got 'fast-track
* cancelled' because of an error during setup) we can wind up
* with connections here which ought to have been cleaned
* up. These connections should be closed out here. It would
* obviously be better if we could avoid these connections in the
* first place!
*/
if (list_length(estate->interconnect_context->incompleteConns) != 0)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG2, "Incomplete connections after known connections done, cleaning %d",
list_length(estate->interconnect_context->incompleteConns));
while ((cell = list_head(estate->interconnect_context->incompleteConns)) != NULL)
{
conn = (MotionConn *) lfirst(cell);
if (conn->sockfd != -1)
{
flushIncomingData(conn->sockfd);
shutdown(conn->sockfd, SHUT_WR);
closesocket(conn->sockfd);
conn->sockfd = -1;
}
estate->interconnect_context->incompleteConns = list_delete_ptr(estate->interconnect_context->incompleteConns, conn);
if (conn->pBuff)
pfree(conn->pBuff);
pfree(conn);
}
}
estate->interconnect_context->activated = true;
if (gp_log_interconnect >= GPVARS_VERBOSITY_TERSE)
{
elapsed_ms = gp_get_elapsed_ms(&startTime);
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE ||
elapsed_ms >= 0.1 * 1000 * interconnect_setup_timeout)
elog(LOG, "SetupInterconnect+" UINT64_FORMAT "ms: Activated %d incoming, "
"%d outgoing routes.",
elapsed_ms, incoming_count, outgoing_count);
}
} /* SetupInterconnect */
/* TeardownInterconnect() function is used to cleanup interconnect resources that
* were allocated during SetupInterconnect(). This function should ALWAYS be
* called after SetupInterconnect to avoid leaking resources (like sockets)
* even if SetupInterconnect did not complete correctly. As a result, this
* function must complete successfully even if SetupInterconnect didn't.
*
* SetupInterconnect() always gets called under the ExecutorState MemoryContext.
* This context is destroyed at the end of the query and all memory that gets
* allocated under it is free'd. We don't have have to worry about pfree() but
* we definitely have to worry about socket resources.
*
* If forceEOS is set, we force end-of-stream notifications out any send-nodes,
* and we wrap that send in a PG_TRY/CATCH block because the goal is to reduce
* confusion (and if we're being shutdown abnormally anyhow, let's not start
* adding errors!).
*/
void
TeardownTCPInterconnect(ChunkTransportState *transportStates,
MotionLayerState *mlStates,
bool forceEOS)
{
ListCell *cell;
ChunkTransportStateEntry *pEntry = NULL;
int i;
Slice *mySlice;
MotionConn *conn;
if (transportStates->sliceTable == NULL)
{
elog(LOG, "TeardownUDPInterconnect: missing slice table.");
return;
}
/* if we're already trying to clean up after an error -- don't
* allow signals to interrupt us */
if (forceEOS)
HOLD_INTERRUPTS();
mySlice = (Slice *) list_nth(transportStates->sliceTable->slices, transportStates->sliceId);
/* Log the start of TeardownInterconnect. */
if (gp_log_interconnect >= GPVARS_VERBOSITY_TERSE)
{
int elevel = 0;
if (forceEOS || !transportStates->activated)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elevel = LOG;
else
elevel = DEBUG1;
}
else if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elevel = DEBUG4;
if (elevel)
ereport(elevel, (errmsg("Interconnect seg%d slice%d cleanup state: "
"%s; setup was %s",
GetQEIndex(), mySlice->sliceIndex,
forceEOS ? "force" : "normal",
transportStates->activated ? "completed" : "exited")));
/* if setup did not complete, log the slicetable */
if (!transportStates->activated &&
gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog_node_display(DEBUG3, "local slice table", transportStates->sliceTable, true);
}
/*
* phase 1 mark all sockets (senders and receivers) with
* shutdown(2), start with incomplete connections (if any).
*/
/*
* The incompleteConns list is only used as a staging area for
* MotionConns during by SetupInterconnect(). So we only expect
* to have entries here if SetupInterconnect() did not finish
* correctly.
*
* NOTE: we don't use foreach() here because we want to trim from the
* list as we go.
*/
if (transportStates->incompleteConns &&
gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG2, "Found incomplete conn. length %d", list_length(transportStates->incompleteConns));
/* These are connected inbound peers that we haven't dealt with
* quite yet */
while ((cell = list_head(transportStates->incompleteConns)) != NULL)
{
MotionConn *conn = (MotionConn *) lfirst(cell);
/* they're incomplete, so just slam them shut. */
if (conn->sockfd != -1)
{
flushIncomingData(conn->sockfd);
shutdown(conn->sockfd, SHUT_WR);
closesocket(conn->sockfd);
conn->sockfd = -1;
}
/*
* The list operations are kind of confusing (see list.c), we
* could alternatively write the following line as:
*
* incompleteConns = list_delete_cell(incompleteConns, cell,
* NULL); or incompleteConns =
* list_delete_first(incompleteConns); or incompleteConns =
* list_delete_ptr(incompleteConns, conn)
*/
transportStates->incompleteConns = list_delete(transportStates->incompleteConns, conn);
}
list_free(transportStates->incompleteConns);
transportStates->incompleteConns = NIL;
/*
* Now "normal" connections which made it through our
* peer-registration step. With these we have to worry about
* "in-flight" data.
*/
if (mySlice->parentIndex != -1)
{
/* cleanup a Sending motion node. */
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG3, "Interconnect seg%d slice%d closing connections to slice%d",
GetQEIndex(), mySlice->sliceIndex, mySlice->parentIndex);
getChunkTransportState(transportStates, mySlice->sliceIndex, &pEntry);
if (forceEOS)
forceEosToPeers(mlStates, transportStates, mySlice->sliceIndex);
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->sockfd >= 0)
shutdown(conn->sockfd, SHUT_WR);
}
}
/*
* cleanup all of our Receiving Motion nodes, these get closed
* immediately (the receiver know for real if they want to shut
* down -- they aren't going to be processing any more data).
*/
foreach(cell, mySlice->children)
{
Slice *aSlice;
int childId = lfirst_int(cell);
aSlice = (Slice *) list_nth(transportStates->sliceTable->slices, childId);
getChunkTransportState(transportStates, aSlice->sliceIndex, &pEntry);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG3, "Interconnect closing connections from slice%d",
aSlice->sliceIndex);
/*
* receivers know that they no longer care about data from
* below ... so we can safely discard data queued in both
* directions
*/
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->sockfd >= 0)
{
flushIncomingData(conn->sockfd);
shutdown(conn->sockfd, SHUT_WR);
closesocket(conn->sockfd);
conn->sockfd = -1;
}
}
removeChunkTransportState(transportStates, aSlice->sliceIndex);
pfree(pEntry->conns);
}
/*
* phase 2: wait on all sockets for completion, when complete call
* close and free (if required)
*/
if (mySlice->parentIndex != -1)
{
/* cleanup a Sending motion node. */
getChunkTransportState(transportStates, mySlice->sliceIndex, &pEntry);
if (!forceEOS)
waitOnOutbound(pEntry);
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->sockfd >= 0)
{
closesocket(conn->sockfd);
conn->sockfd = -1;
}
}
pEntry = removeChunkTransportState(transportStates, mySlice->sliceIndex);
}
/*
* If there are clients waiting on our listener; we *must*
* disconnect them; otherwise we'll be out of sync with the client
* (we may accept them on a subsequent query!)
*/
if (TCP_listenerFd != -1)
flushInterconnectListenerBacklog();
transportStates->activated = false;
transportStates->sliceTable = NULL;
if (transportStates != NULL)
{
if (transportStates->states != NULL)
pfree(transportStates->states);
pfree(transportStates);
}
if (forceEOS)
RESUME_INTERRUPTS();
#ifdef AMS_VERBOSE_LOGGING
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG4, "TeardownInterconnect successful");
#endif
}
#ifdef AMS_VERBOSE_LOGGING
void
print_connection(ChunkTransportState *transportStates, int fd, const char *msg)
{
struct sockaddr_in local,
remote;
socklen_t len;
int errlevel = transportStates->teardownActive ? LOG : ERROR;
len = sizeof(remote);
if (getpeername(fd, (struct sockaddr *) & remote, &len) < 0)
{
elog(errlevel, "print_connection(%d, %s): can't get peername err: %m",
fd, msg);
}
len = sizeof(local);
if (getsockname(fd, (struct sockaddr *) & local, &len) < 0)
{
elog(errlevel, "print_connection(%d, %s): can't get localname err: %m",
fd, msg);
}
elog(DEBUG2, "%s: w/ports (%d/%d)",
msg, ntohs(local.sin_port), ntohs(remote.sin_port));
}
#endif
void
format_fd_set(StringInfo buf, int nfds, mpp_fd_set fds, char* pfx, char *sfx)
{
int i;
appendStringInfoString(buf, pfx);
for (i = 1; i < nfds; i++)
{
if (MPP_FD_ISSET(i, &fds))
appendStringInfo(buf, "%d,", i);
}
if (buf->len > 0 &&
buf->data[buf->len-1] == ',')
truncateStringInfo(buf, buf->len - 1);
appendStringInfoString(buf, sfx);
}
char *
format_sockaddr(struct sockaddr *sa, char* buf, int bufsize)
{
/* Save remote host:port string for error messages. */
if (sa->sa_family == AF_INET)
{
struct sockaddr_in * sin = (struct sockaddr_in *)sa;
uint32 saddr = ntohl(sin->sin_addr.s_addr);
snprintf(buf, bufsize, "%d.%d.%d.%d:%d",
(saddr >> 24)&0xff,
(saddr >> 16)&0xff,
(saddr >> 8)&0xff,
saddr&0xff,
ntohs(sin->sin_port));
}
#ifdef HAVE_IPV6
else if (sa->sa_family == AF_INET6)
{
char remote_port[32];
if (bufsize > 10)
{
buf[0] = '[';
/*
* inet_ntop isn't portable.
* //inet_ntop(AF_INET6, &sin6->sin6_addr, buf, bufsize - 8);
*
* postgres has a standard routine for converting addresses to printable format,
* which works for IPv6, IPv4, and Unix domain sockets. I've changed this
* routine to use that, but I think the entire format_sockaddr routine could
* be replaced with it.
*/
int ret = pg_getnameinfo_all((const struct sockaddr_storage *)sa, sizeof(struct sockaddr_storage),
buf+1, bufsize-10,
remote_port, sizeof(remote_port),
NI_NUMERICHOST | NI_NUMERICSERV);
if (ret != 0)
{
elog(LOG,"getnameinfo returned %d: %s, and says %s port %s",ret,gai_strerror(ret),buf,remote_port);
/*
* Fall back to using our internal inet_ntop routine, which really is for inet datatype
* This is because of a bug in solaris, where getnameinfo sometimes fails
* Once we find out why, we can remove this
*/
snprintf(remote_port,sizeof(remote_port),"%d",((struct sockaddr_in6 *)sa)->sin6_port);
/*
* This is nasty: our internal inet_net_ntop takes PGSQL_AF_INET6, not AF_INET6, which
* is very odd... They are NOT the same value (even though PGSQL_AF_INET == AF_INET
*/
#define PGSQL_AF_INET6 (AF_INET + 1)
inet_net_ntop(PGSQL_AF_INET6, sa, sizeof(struct sockaddr_in6), buf+1, bufsize-10);
elog(LOG,"Our alternative method says %s]:%s",buf,remote_port);
}
buf += strlen(buf);
strcat(buf,"]");
buf++;
}
snprintf(buf, 8, ":%s", remote_port);
}
#endif
else
snprintf(buf, bufsize, "?host?:?port?");
return buf;
} /* format_sockaddr */
static void
flushInterconnectListenerBacklog(void)
{
int pendingConn,
newfd,
i;
mpp_fd_set rset;
struct timeval timeout;
do
{
MPP_FD_ZERO(&rset);
MPP_FD_SET(TCP_listenerFd, &rset);
timeout.tv_sec = 0;
timeout.tv_usec = 0;
pendingConn = select(TCP_listenerFd + 1, (fd_set *)&rset, NULL, NULL, &timeout);
if (pendingConn > 0)
{
for (i = 0; i < pendingConn; i++)
{
struct sockaddr_storage remoteAddr;
struct sockaddr_storage localAddr;
char remoteHostAndPort[64];
char localHostAndPort[64];
socklen_t addrsize;
addrsize = sizeof(remoteAddr);
newfd = accept(TCP_listenerFd, (struct sockaddr *)&remoteAddr, &addrsize);
if (newfd < 0)
{
ereport(DEBUG3, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error while clearing incoming connections."),
errdetail("%m%s sockfd=%d", "accept", newfd)));
continue;
}
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE)
{
/* Get remote and local host:port strings for message. */
format_sockaddr((struct sockaddr *)&remoteAddr, remoteHostAndPort,
sizeof(remoteHostAndPort));
addrsize = sizeof(localAddr);
if (getsockname(newfd, (struct sockaddr *)&localAddr, &addrsize))
{
ereport(LOG, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error while clearing incoming connections."),
errdetail("%m%s sockfd=%d remote=%s",
"getsockname",
newfd,
remoteHostAndPort)));
}
else
{
format_sockaddr((struct sockaddr *)&localAddr, localHostAndPort,
sizeof(localHostAndPort));
ereport(DEBUG2, (errmsg("Interconnect clearing incoming connection "
"from remote=%s to local=%s. sockfd=%d.",
remoteHostAndPort, localHostAndPort,
newfd)));
}
}
/* make socket non-blocking */
if (!pg_set_noblock(newfd))
{
elog(LOG, "During incoming queue flush, could not set non-blocking.");
}
else
{
/* shutdown this socket */
flushIncomingData(newfd);
}
shutdown(newfd, SHUT_WR);
closesocket(newfd);
}
}
else if (pendingConn < 0 && errno != EINTR)
{
ereport(LOG, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error during listener cleanup."),
errdetail("%m%s sockfd=%d", "select", TCP_listenerFd)));
}
/*
* now we either loop through for another check (on EINTR or
* if we cleaned one client) or we're done
*/
}
while (pendingConn != 0);
}
/*
* Wait for our peer to close the socket (at which point our select(2)
* will tell us that the socket is ready to read, and the socket-read
* will only return 0.
*
* This works without the select, but burns tons of CPU doing nothing
* useful.
*
* ----
* The way it used to work, is we used CHECK_FOR_INTERRUPTS(), and
* wrapped it in PG_TRY: We *must* return locally; otherwise
* TeardownInterconnect() can't exit cleanly. So we wrap our
* cancel-detection checks for interrupts with a PG_TRY block.
*
* By swallowing the non-local return on cancel, we lose the "cancel"
* state (CHECK_FOR_INTERRUPTS() clears QueryCancelPending()). So we
* should just check QueryCancelPending here ... and avoid calling
* CHECK_FOR_INTERRUPTS().
*
* ----
*
* Now we just check explicitly for interrupts (which is, as far as I
* can tell, the only interrupt-driven state change we care
* about). This should give us notification of ProcDiePending and
* QueryCancelPending
*/
static void
waitOnOutbound(ChunkTransportStateEntry *pEntry)
{
MotionConn *conn;
struct timeval timeout;
mpp_fd_set waitset, curset;
int maxfd=-1;
int i, n, conn_count=0;
MPP_FD_ZERO(&waitset);
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->sockfd >= 0)
{
MPP_FD_SET(conn->sockfd, &waitset);
if (conn->sockfd > maxfd)
maxfd = conn->sockfd;
conn_count++;
}
}
for (;;)
{
int saved_err;
if (conn_count == 0)
return;
if (InterruptPending)
{
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG3, "waitOnOutbound(): interrupt pending fast-track");
#endif
return;
}
timeout.tv_sec = 0;
timeout.tv_usec = 500000;
memcpy(&curset, &waitset, sizeof(mpp_fd_set));
n = select(maxfd + 1, (fd_set *)&curset, NULL, NULL, &timeout);
if (n == 0 || (n < 0 && errno == EINTR))
{
continue;
}
else if (n < 0)
{
saved_err = errno;
if (InterruptPending)
return;
/*
* Something unexpected, but probably not horrible warn
* and return
*/
elog(LOG, "TeardownTCPInterconnect: waitOnOutbound select errno=%d", saved_err);
break;
}
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->sockfd >= 0 && MPP_FD_ISSET(conn->sockfd, &curset))
{
int count;
char buf;
/* ready to read. */
count = recv(conn->sockfd, &buf, sizeof(buf), 0);
if (count == 0) /* done ! */
{
MPP_FD_CLR(conn->sockfd, &waitset);
/* we may have finished */
conn_count--;
continue;
}
if (count > 0 || (count < 0 && errno == EAGAIN))
continue;
/* Some other kind of error happened */
if (errno == EINTR)
continue;
/*
* Something unexpected, but probably not horrible warn and
* return
*/
MPP_FD_CLR(conn->sockfd, &waitset);
/* we may have finished */
conn_count--;
elog(LOG, "TeardownTCPInterconnect: waitOnOutbound %m%s", "recv");
continue;
}
}
}
return;
}
static void
doSendStopMessageTCP(ChunkTransportState *transportStates, int16 motNodeID)
{
ChunkTransportStateEntry *pEntry = NULL;
MotionConn *conn;
int i;
char m = 'S';
ssize_t written;
getChunkTransportState(transportStates, motNodeID, &pEntry);
Assert(pEntry);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG3, "Interconnect needs no more input from slice%d; notifying senders to stop.",
motNodeID);
/*
* Note: we're only concerned with receivers here.
*/
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->sockfd >= 0 &&
MPP_FD_ISSET(conn->sockfd, &pEntry->readSet))
{
/* someone is trying to send stuff to us, let's stop 'em */
while ((written = send(conn->sockfd, &m, sizeof(m), 0)) < 0)
{
if (errno == EINTR)
{
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
continue;
}
else
break;
}
if (written != sizeof(m))
{
/*
* how can this happen ? the kernel buffer should be
* empty in the send direction
*/
elog(LOG, "SendStopMessage: failed on write. %m");
}
}
/* CRITICAL TO AVOID DEADLOCK */
DeregisterReadInterest(transportStates, motNodeID, i,
"no more input needed");
}
}
static TupleChunkListItem
RecvTupleChunkFromTCP(ChunkTransportState *transportStates,
int16 motNodeID,
int16 srcRoute)
{
ChunkTransportStateEntry *pEntry = NULL;
MotionConn *conn;
/* check em' */
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "RecvTupleChunkFrom(motNodID=%d, srcpIncIdx %d srcRoute=%d)", motNodeID, srcpInc, srcRoute);
#endif
getChunkTransportState(transportStates, motNodeID, &pEntry);
conn = pEntry->conns + srcRoute;
return RecvTupleChunk(conn, transportStates->teardownActive);
}
static TupleChunkListItem
RecvTupleChunkFromAnyTCP(MotionLayerState *mlStates,
ChunkTransportState *transportStates,
int16 motNodeID,
int16 *srcRoute)
{
ChunkTransportStateEntry *pEntry = NULL;
MotionNodeEntry *pMNEntry;
MotionConn *conn;
TupleChunkListItem tcItem;
mpp_fd_set rset;
int n,
i,
index;
bool skipSelect = false;
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "RecvTupleChunkFromAny(motNodeId=%d)", motNodeID);
#endif
getChunkTransportState(transportStates, motNodeID, &pEntry);
pMNEntry = getMotionNodeEntry(mlStates, motNodeID, "RecvTupleChunkFromAny");
do
{
struct timeval timeout = tval;
/* make sure we check for these. */
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
memcpy(&rset, &pEntry->readSet, sizeof(mpp_fd_set));
/*
* since we may have data in a local buffer, we may be able to
* short-circuit the select() call (and if we don't do this we may
* wait when we have data ready, since it has already been read)
*/
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->sockfd >= 0 &&
MPP_FD_ISSET(conn->sockfd, &rset) &&
conn->recvBytes != 0)
{
/* we have data on this socket, let's short-circuit our select */
MPP_FD_ZERO(&rset);
MPP_FD_SET(conn->sockfd, &rset);
skipSelect = true;
}
}
if (skipSelect)
break;
n = select(pEntry->highReadSock + 1, (fd_set *)&rset, NULL, NULL, &timeout);
pMNEntry->sel_rd_wait += (tval.tv_sec - timeout.tv_sec) * 1000000 + (tval.tv_usec - timeout.tv_usec);
if (n < 0)
{
if (errno == EINTR)
continue;
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error receiving an incoming packet."),
errdetail("%m%s", "select")));
}
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "RecvTupleChunkFromAny() select() returned %d ready sockets", n);
#endif
} while (n < 1);
/*
* We scan the file descriptors starting from where we left off in the
* last call (don't continually poll the first when others may be ready!).
*/
index = pEntry->scanStart;
for (i = 0; i < pEntry->numConns; i++, index++)
{
/*
* avoid division ? index = ((scanStart + i) % pEntry->numConns);
*/
if (index >= pEntry->numConns)
index = 0;
conn = pEntry->conns + index;
#ifdef AMS_VERBOSE_LOGGING
if (!conn->stillActive)
{
elog(LOG, "RecvTupleChunkFromAny: trying to read on inactive socket %d", conn->sockfd);
}
#endif
if (conn->sockfd >= 0 &&
MPP_FD_ISSET(conn->sockfd, &rset))
{
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "RecvTupleChunkFromAny() (fd %d) %d/%d", conn->sockfd, motNodeID, index);
#endif
tcItem = RecvTupleChunk(conn, transportStates->teardownActive);
*srcRoute = index;
/*
* advance start point (avoid doing division/modulus operation
* here)
*/
pEntry->scanStart = index + 1;
return tcItem;
}
}
/* we should never ever get here... */
elog(FATAL, "RecvTupleChunkFromAnyTCP: didn't receive, and didn't get cancelled");
return NULL; /* keep the compiler happy */
}
/* See ml_ipc.h */
static void
SendEosTCP(MotionLayerState *mlStates,
ChunkTransportState *transportStates,
int motNodeID,
TupleChunkListItem tcItem)
{
ChunkTransportStateEntry *pEntry = NULL;
MotionConn *conn;
int i;
if (!transportStates)
{
elog(FATAL, "SendEosTCP: missing interconnect context.");
}
else if (!transportStates->activated && !transportStates->teardownActive)
{
elog(FATAL, "SendEosTCP: context and teardown inactive.");
}
/* check em' */
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
getChunkTransportState(transportStates, motNodeID, &pEntry);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG3, "Interconnect seg%d slice%d sending end-of-stream to slice%d",
GetQEIndex(), motNodeID, pEntry->recvSlice->sliceIndex);
/* we want to add our tcItem onto each of the outgoing buffers --
* this is guaranteed to leave things in a state where a flush is
* *required*. */
doBroadcast(mlStates, transportStates, pEntry, tcItem, NULL);
/* now flush all of the buffers. */
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->sockfd >= 0 && conn->state == mcsStarted)
flushBuffer(mlStates, transportStates, pEntry, conn, motNodeID);
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "SendEosTCP() Leaving");
#endif
}
return;
}
static bool
flushBuffer(MotionLayerState *mlStates, ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn, int16 motionId)
{
char *sendptr;
MotionNodeEntry *pMNEntry;
int n,
sent = 0;
mpp_fd_set wset;
mpp_fd_set rset;
#ifdef AMS_VERBOSE_LOGGING
{
struct timeval snapTime;
gettimeofday(&snapTime, NULL);
elog(DEBUG5, "----sending chunk @%s.%d time is %d.%d",
__FILE__, __LINE__, (int)snapTime.tv_sec, (int)snapTime.tv_usec);
}
#endif
pMNEntry = getMotionNodeEntry(mlStates, motionId, "flushBuffer");
/* first set header length */
*(uint32 *) conn->pBuff = conn->msgSize;
/* now send message */
sendptr = (char *) conn->pBuff;
sent = 0;
do
{
struct timeval timeout;
/* check for stop message before sending anything */
timeout.tv_sec = 0;
timeout.tv_usec = 0;
MPP_FD_ZERO(&rset);
MPP_FD_SET(conn->sockfd, &rset);
/*
* since timeout = 0, select returns imediately and no time is wasted
* waiting trying to send data on the network
*/
n = select(conn->sockfd + 1, (fd_set *)&rset, NULL, NULL, &timeout);
/* handle errors at the write call, below */
if (n > 0 && MPP_FD_ISSET(conn->sockfd, &rset))
{
#ifdef AMS_VERBOSE_LOGGING
print_connection(transportStates, conn->sockfd, "stop from");
#endif
/* got a stop message */
conn->stillActive = false;
return false;
}
if ((n = send(conn->sockfd, sendptr + sent, conn->msgSize - sent, 0)) < 0)
{
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
if (errno == EINTR)
continue;
if (errno == EWOULDBLOCK)
{
do
{
timeout = tval;
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
MPP_FD_ZERO(&rset);
MPP_FD_ZERO(&wset);
MPP_FD_SET(conn->sockfd, &wset);
MPP_FD_SET(conn->sockfd, &rset);
n = select(conn->sockfd + 1, (fd_set *)&rset, (fd_set *)&wset, NULL, &timeout);
pMNEntry->sel_wr_wait += (tval.tv_sec - timeout.tv_sec)*1000000 +(tval.tv_usec - timeout.tv_usec);
if (n < 0)
{
if (errno == EINTR)
continue;
/*
* if we got an error in teardown, ignore it: treat it
* as a stop message
*/
if (transportStates->teardownActive)
{
#ifdef AMS_VERBOSE_LOGGING
print_connection(transportStates, conn->sockfd, "stop from");
#endif
conn->stillActive = false;
return false;
}
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error writing an outgoing packet: %m"),
errdetail("error during select() call (error:%d).\n"
"For Remote Connection: contentId=%d at %s",
errno, conn->remoteContentId,
conn->remoteHostAndPort)));
}
/*
* as a sender... if there is something to read...
* it must mean its a StopSendingMessage. we
* don't even bother to read it.
*/
if (MPP_FD_ISSET(conn->sockfd, &rset) || transportStates->teardownActive)
{
#ifdef AMS_VERBOSE_LOGGING
print_connection(transportStates, conn->sockfd, "stop from");
#endif
conn->stillActive = false;
return false;
}
} while (n < 1);
}
else
{
/*
* if we got an error in teardown, ignore it: treat it as a
* stop message
*/
if (transportStates->teardownActive)
{
#ifdef AMS_VERBOSE_LOGGING
print_connection(transportStates, conn->sockfd, "stop from");
#endif
conn->stillActive = false;
return false;
}
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error writing an outgoing packet"),
errdetail("error during send() call (error:%d).\n"
"For Remote Connection: contentId=%d at %s",
errno, conn->remoteContentId,
conn->remoteHostAndPort)));
}
}
else
{
sent += n;
}
} while (sent < conn->msgSize);
conn->tupleCount = 0;
conn->msgSize = PACKET_HEADER_SIZE;
return true;
}
/* The Function sendChunk() is used to send a tcItem to a single
* destination. Tuples often are *very small* we aggregate in our
* local buffer before sending into the kernel.
*
* PARAMETERS
* conn - MotionConn that the tcItem is to be sent to.
* tcItem - message to be sent.
* motionId - Node Motion Id.
*/
static bool
SendChunkTCP(MotionLayerState *mlStates, ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn * conn, TupleChunkListItem tcItem, int16 motionId)
{
int length=TYPEALIGN(TUPLE_CHUNK_ALIGN,tcItem->chunk_length);
Assert(conn->msgSize > 0);
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "sendChunk: msgSize %d this chunk length %d", conn->msgSize, tcItem->chunk_length);
#endif
if (conn->msgSize + length > Gp_max_packet_size)
{
if (!flushBuffer(mlStates, transportStates, pEntry, conn, motionId))
return false;
}
memcpy(conn->pBuff + conn->msgSize, tcItem->chunk_data, tcItem->chunk_length);
conn->msgSize += length;
conn->tupleCount++;
return true;
}