blob: 4defd2d01fe56f8dd3dc311aa1a047f03eb90efc [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
* ic_common.c
* Interconnect code shared between the UDP and TCP IPC layers.
*
* Reviewers: jzhang, tkordas
*-------------------------------------------------------------------------
*/
#ifdef WIN32
/*
* Need this to get WSAPoll (poll). And it
* has to be set before any header from the Win32 API is loaded.
*/
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#include "postgres.h"
#include "nodes/execnodes.h" /* Slice, SliceTable */
#include "nodes/pg_list.h"
#include "nodes/print.h"
#include "utils/memutils.h"
#include "miscadmin.h"
#include "libpq/libpq-be.h"
#include "libpq/ip.h"
#include "utils/builtins.h"
#include "utils/debugbreak.h"
#include "cdb/cdbselect.h"
#include "cdb/tupchunklist.h"
#include "cdb/ml_ipc.h"
#include "cdb/cdbvars.h"
#include <fcntl.h>
#include <limits.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <netinet/in.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#include "port.h"
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#define SHUT_RDWR SD_BOTH
#define SHUT_RD SD_RECEIVE
#define SHUT_WR SD_SEND
/* If we have old platform sdk headers, WSAPoll() might not be there */
#ifndef POLLIN
/* Event flag definitions for WSAPoll(). */
#define POLLRDNORM 0x0100
#define POLLRDBAND 0x0200
#define POLLIN (POLLRDNORM | POLLRDBAND)
#define POLLPRI 0x0400
#define POLLWRNORM 0x0010
#define POLLOUT (POLLWRNORM)
#define POLLWRBAND 0x0020
#define POLLERR 0x0001
#define POLLHUP 0x0002
#define POLLNVAL 0x0004
typedef struct pollfd {
SOCKET fd;
SHORT events;
SHORT revents;
} WSAPOLLFD, *PWSAPOLLFD, FAR *LPWSAPOLLFD;
__control_entrypoint(DllExport)
WINSOCK_API_LINKAGE
int
WSAAPI
WSAPoll(
IN OUT LPWSAPOLLFD fdArray,
IN ULONG fds,
IN INT timeout
);
#endif
#define poll WSAPoll
/*
* Postgres normally uses its own custom select implementation
* on Windows, but it does not implement exceptfds, which
* we use here. So, undef this to use the normal Winsock version
* for now.
*/
#undef select
#endif
/*
#define AMS_VERBOSE_LOGGING
*/
/*=========================================================================
* STRUCTS
*/
/*=========================================================================
* GLOBAL STATE VARIABLES
*/
/*
 * Socket file descriptors for the interconnect listeners.
 *
 * InitMotionLayerIPC() initializes only the listener that matches
 * Gp_interconnect_type and leaves the other one untouched, while
 * CleanUpMotionLayerIPC() closes every listener descriptor that is >= 0.
 * Both therefore start at -1 ("not open"); a zero initializer would make the
 * first cleanup close descriptor 0.
 */
int			TCP_listenerFd = -1;
int			UDP_listenerFd = -1;

/* Socket file descriptor for the sequence server (-1 when not connected). */
int			savedSeqServerFd = -1;
/* Sequence server address; the host string is a strdup()'d copy. */
char	   *savedSeqServerHost = NULL;
uint16		savedSeqServerPort = 0;

/*
 * Outgoing port assignment
 *
 * To reserve a port number for outgoing connections, we open a dummy
 * listening socket. Nobody connects to this socket.
 */
int			portReservationFd = -1;
int			outgoingPort = 0;
/*=========================================================================
* FUNCTIONS PROTOTYPES
*/
/* Connects to the sequence server and stores the socket in savedSeqServerFd (defined below). */
static void setupSeqServerConnection(char *hostname, uint16 port);
/* Debug-only helpers, compiled only when AMS_VERBOSE_LOGGING is defined. */
#ifdef AMS_VERBOSE_LOGGING
static void dumpEntryConnections(int elevel, ChunkTransportStateEntry *pEntry);
/* NOTE(review): no definition of print_connection is visible in this source; confirm it exists. */
static void print_connection(ChunkTransportState *transportStates, int fd, const char *msg);
#endif
/*
 * logChunkParseDetails
 *		Emit two LOG lines describing the interconnect packet header found at
 *		conn->pBuff together with the connection's state, to help diagnose
 *		chunk-parsing failures.
 */
static void
logChunkParseDetails(MotionConn *conn)
{
	struct icpkthdr *hdr;

	Assert(conn != NULL);
	Assert(conn->pBuff != NULL);

	hdr = (struct icpkthdr *) conn->pBuff;

	/* Packet sequencing/flags, plus our own view of the connection. */
	elog(LOG, "Interconnect parse details: pkt->len %d pkt->seq %d pkt->flags 0x%x conn->active %d conn->stopRequest %d pkt->icId %d my_icId %d",
		 hdr->len,
		 hdr->seq,
		 hdr->flags,
		 conn->stillActive,
		 conn->stopRequested,
		 hdr->icId,
		 gp_interconnect_id);

	/* Who sent the packet and who it was addressed to. */
	elog(LOG, "Interconnect parse details continued: peer: srcpid %d dstpid %d recvslice %d sendslice %d srccontent %d dstcontent %d",
		 hdr->srcPid,
		 hdr->dstPid,
		 hdr->recvSliceIndex,
		 hdr->sendSliceIndex,
		 hdr->srcContentId,
		 hdr->dstContentId);
}
/*
 * RecvTupleChunk
 *		Split the message currently buffered on 'conn' into a linked list of
 *		TupleChunkListItems and return the head of that list (NULL if the
 *		message holds no chunks).
 *
 * For the TCP interconnect the packet is first read via readPacket() and
 * parsing starts PACKET_HEADER_SIZE bytes into conn->msgPos.  For the other
 * (UDP) path no read happens here; parsing starts sizeof(struct icpkthdr)
 * bytes into conn->msgPos.
 *
 * The items do not copy the data: each item's 'inplace' points into the
 * buffer at conn->msgPos, and chunk_length includes TUPLE_CHUNK_HEADER_SIZE.
 * Successive chunks are placed at TUPLE_CHUNK_ALIGN-aligned offsets.
 *
 * On return conn->recvBytes is reduced by conn->msgSize, conn->msgPos is
 * advanced past this message when residual bytes remain, and conn->msgSize
 * is reset to 0.  A malformed message raises ERROR after logging details.
 */
TupleChunkListItem
RecvTupleChunk(MotionConn *conn, bool inTeardown)
{
TupleChunkListItem tcItem;
TupleChunkListItem firstTcItem = NULL;
TupleChunkListItem lastTcItem = NULL;
uint32 tcSize;
int bytesProcessed = 0;
if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP)
{
/* read the packet in from the network. */
readPacket(conn, inTeardown);
/* go through and form us some TupleChunks. */
bytesProcessed = PACKET_HEADER_SIZE;
}
else
{
/* No read here: just skip the icpkthdr at the start of the message. */
/* go through and form us some TupleChunks. */
bytesProcessed = sizeof(struct icpkthdr);
}
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "recvtuple chunk recv bytes %d msgsize %d conn->pBuff %p conn->msgPos: %p",
conn->recvBytes, conn->msgSize, conn->pBuff, conn->msgPos);
#endif
/*
 * Walk chunk by chunk.  If alignment padding ever pushes bytesProcessed past
 * msgSize, the remaining-length check below goes negative and raises ERROR.
 */
while (bytesProcessed != conn->msgSize)
{
if (conn->msgSize - bytesProcessed < TUPLE_CHUNK_HEADER_SIZE)
{
logChunkParseDetails(conn);
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error parsing message: insufficient data received."),
errdetail("conn->msgSize %d bytesProcessed %d < chunk-header %d",
conn->msgSize, bytesProcessed, TUPLE_CHUNK_HEADER_SIZE)));
}
/* The chunk header begins with a uint16 payload length; tcSize adds the header itself. */
tcSize = TUPLE_CHUNK_HEADER_SIZE + (*(uint16 *) (conn->msgPos + bytesProcessed));
/* sanity check */
if (tcSize > Gp_max_packet_size)
{
/* see MPP-720: it is possible that our message got messed
* up by a cancellation ? */
ML_CHECK_FOR_INTERRUPTS(inTeardown);
/*
* MPP-4010: add some extra debugging.
*/
if (lastTcItem != NULL)
elog(LOG, "Interconnect error parsing message: last item length %d inplace %p", lastTcItem->chunk_length, lastTcItem->inplace);
else
elog(LOG, "Interconnect error parsing message: no last item");
logChunkParseDetails(conn);
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error parsing message"),
errdetail("tcSize %d > max %d header %d processed %d/%d from %p",
tcSize, Gp_max_packet_size,
TUPLE_CHUNK_HEADER_SIZE, bytesProcessed, conn->msgSize, conn->msgPos)));
}
/* we only check for interrupts here when we don't have a guaranteed full-message */
if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP)
{
if (tcSize >= conn->msgSize)
{
/* see MPP-720: it is possible that our message got
* messed up by a cancellation ? */
ML_CHECK_FOR_INTERRUPTS(inTeardown);
logChunkParseDetails(conn);
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect error parsing message"),
errdetail("tcSize %d >= conn->msgSize %d", tcSize, conn->msgSize)));
}
}
/* For the non-TCP path this is only asserted (debug builds), not checked at runtime. */
Assert(tcSize < conn->msgSize);
/*
* We store the data inplace, and handle any necessary copying later
* on
*/
tcItem = (TupleChunkListItem) palloc0(sizeof(TupleChunkListItemData));
tcItem->chunk_length = tcSize;
tcItem->inplace = (char *) (conn->msgPos + bytesProcessed);
/* Advance to the next chunk, which starts at a TUPLE_CHUNK_ALIGN boundary. */
bytesProcessed += TYPEALIGN(TUPLE_CHUNK_ALIGN,tcSize);
/* Append to the result list, preserving message order. */
if (firstTcItem == NULL)
{
firstTcItem = tcItem;
lastTcItem = tcItem;
}
else
{
lastTcItem->p_next = tcItem;
lastTcItem = tcItem;
}
}
/* Consume this message; any remaining received bytes belong to the next one. */
conn->recvBytes -= conn->msgSize;
if (conn->recvBytes != 0)
{
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG5, "residual message %d bytes", conn->recvBytes);
#endif
conn->msgPos += conn->msgSize;
}
conn->msgSize = 0;
return firstTcItem;
}
/*=========================================================================
* VISIBLE FUNCTIONS
*/
/* See ml_ipc.h */
void
InitMotionLayerIPC(void)
{
uint16 tcp_listener = 0;
uint16 udp_listener = 0;
/*activated = false;*/
savedSeqServerFd = -1;
if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP)
InitMotionTCP(&TCP_listenerFd, &tcp_listener);
else if (Gp_interconnect_type == INTERCONNECT_TYPE_UDP)
InitMotionUDP(&UDP_listenerFd, &udp_listener);
Gp_listener_port = (udp_listener<<16) | tcp_listener;
elog(DEBUG1, "Interconnect listening on tcp port %d udp port %d (0x%x)", tcp_listener, udp_listener, Gp_listener_port);
}
/*
 * CleanUpMotionLayerIPC (see ml_ipc.h)
 *
 * Run the interconnect-type specific cleanup, then close whichever of the
 * TCP listener, UDP listener and port-reservation sockets are open (>= 0)
 * and reset those globals, plus Gp_listener_port, to their "unset" values.
 */
void
CleanUpMotionLayerIPC(void)
{
	int		   *sockets[3];
	int			k;

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG3, "Cleaning Up Motion Layer IPC...");

	if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP)
		CleanupMotionTCP();
	else if (Gp_interconnect_type == INTERCONNECT_TYPE_UDP)
		CleanupMotionUDP();

	/* Close the listener sockets and the port reservation, in that order. */
	sockets[0] = &TCP_listenerFd;
	sockets[1] = &UDP_listenerFd;
	sockets[2] = &portReservationFd;

	for (k = 0; k < (int) (sizeof(sockets) / sizeof(sockets[0])); k++)
	{
		if (*sockets[k] >= 0)
			closesocket(*sockets[k]);
		*sockets[k] = -1;
	}

	Gp_listener_port = 0;
}
/*
 * SendTupleChunkToAMS (see ml_ipc.h)
 *
 * Send every item of the tcItem chain for motion node motNodeID, either to
 * all receivers (targetRoute == BROADCAST_SEGIDX, via doBroadcast) or to the
 * single connection at index targetRoute.  A point-to-point connection that
 * is no longer stillActive is skipped.
 *
 * Returns true unless some send reported that a connection went inactive
 * and, afterwards, no connection of this motion node is stillActive.
 */
bool
SendTupleChunkToAMS(MotionLayerState *mlStates,
					ChunkTransportState *transportStates,
					int16 motNodeID,
					int16 targetRoute,
					TupleChunkListItem tcItem)
{
	ChunkTransportStateEntry *pEntry = NULL;
	TupleChunkListItem item;
	MotionConn *conn;
	int			recount = 0;
	int			idx;

	if (!transportStates)
		elog(FATAL, "SendTupleChunkToAMS: no transport-states.");
	if (!transportStates->activated)
		elog(FATAL, "SendTupleChunkToAMS: transport states inactive");

	ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG3, "sendtuplechunktoams: calling get_transport_state"
		 "w/transportStates %p transportState->size %d motnodeid %d route %d",
		 transportStates, transportStates->size, motNodeID, targetRoute);
#endif

	getChunkTransportState(transportStates, motNodeID, &pEntry);

	/* tcItem may be a chain of chunks; each link has to go out. */
	item = tcItem;
	while (item != NULL)
	{
#ifdef AMS_VERBOSE_LOGGING
		elog(DEBUG5, "SendTupleChunkToAMS: chunk length %d", item->chunk_length);
#endif
		if (targetRoute == BROADCAST_SEGIDX)
			doBroadcast(mlStates, transportStates, pEntry, item, &recount);
		else
		{
			/* point-to-point: only talk to a receiver that still wants data */
			conn = pEntry->conns + targetRoute;
			if (conn->stillActive)
			{
				transportStates->SendChunk(mlStates, transportStates, pEntry, conn, item, motNodeID);
				if (!conn->stillActive)
					recount = 1;
			}
		}
		item = item->p_next;
	}

	if (recount == 0)
		return true;

	/* Something went inactive: we are done only if nobody is left. */
	for (idx = 0; idx < pEntry->numConns; idx++)
	{
		if (pEntry->conns[idx].stillActive)
			return true;
	}
	return false;
}
/*
 * getTransportDirectBuffer
 *		Expose the unused tail of a connection's transmit buffer so that a
 *		caller can write data into it directly.
 *
 * If the connection at targetRoute of motion node motNodeID is stillActive,
 * b->pri points just past the conn->msgSize bytes already in conn->pBuff and
 * b->prilen is the room left before Gp_max_packet_size.  Otherwise b->pri is
 * NULL and b->prilen is 0.  putTransportDirectBuffer() is the counterpart
 * that adds the written length to conn->msgSize.
 *
 * Missing/inactive transport states and broadcast routes are FATAL.  This
 * works a lot like SendTupleChunkToAMS().
 */
void
getTransportDirectBuffer(ChunkTransportState *transportStates,
						 int16 motNodeID,
						 int16 targetRoute,
						 struct directTransportBuffer *b)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn;

	if (!transportStates)
		elog(FATAL, "getTransportDirectBuffer: no transport states");
	else if (!transportStates->activated)
		elog(FATAL, "getTransportDirectBuffer: inactive transport states");
	else if (targetRoute == BROADCAST_SEGIDX)
		elog(FATAL, "getTransportDirectBuffer: can't direct-transport to broadcast");

	Assert(b != NULL);

	getChunkTransportState(transportStates, motNodeID, &pEntry);

	/* point-to-point only */
	conn = pEntry->conns + targetRoute;

	if (conn->stillActive)
	{
		b->pri = conn->pBuff + conn->msgSize;
		b->prilen = Gp_max_packet_size - conn->msgSize;
	}
	else
	{
		/* receiver lost interest: there is no buffer to hand out */
		b->pri = NULL;
		b->prilen = 0;
	}
}
/*
 * putTransportDirectBuffer
 *		Account for 'length' bytes that the caller wrote into the area
 *		obtained from getTransportDirectBuffer().
 *
 * If the connection at targetRoute of motion node motNodeID is stillActive,
 * conn->msgSize grows by 'length' and conn->tupleCount is incremented by
 * one; otherwise the connection is left untouched.
 *
 * Missing/inactive transport states and broadcast routes are FATAL.
 */
void
putTransportDirectBuffer(ChunkTransportState *transportStates,
						 int16 motNodeID,
						 int16 targetRoute, int length)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn;

	if (transportStates == NULL)
		elog(FATAL, "putTransportDirectBuffer: no transport states");
	else if (!transportStates->activated)
		elog(FATAL, "putTransportDirectBuffer: inactive transport states");
	else if (targetRoute == BROADCAST_SEGIDX)
		elog(FATAL, "putTransportDirectBuffer: can't direct-transport to broadcast");

	getChunkTransportState(transportStates, motNodeID, &pEntry);
	conn = &pEntry->conns[targetRoute];

	/* Only interested (still active) connections get their counters bumped. */
	if (!conn->stillActive)
		return;

	conn->msgSize += length;
	conn->tupleCount++;
}
/*
 * DeregisterReadInterest
 *		Called on receiving nodes when they believe that they're done with
 *		the sender at index srcRoute of motion node motNodeID.
 *
 * UDP: the connection is passed to markUDPConnInactive().
 * TCP: the write side of the connection's socket is shut down and the
 *		socket is removed from the entry's read set.
 *
 * FATAL if transportStates is NULL; a no-op if the transport states are not
 * activated.  'reason' is only used in log messages.
 */
void
DeregisterReadInterest(ChunkTransportState *transportStates,
					   int motNodeID,
					   int srcRoute,
					   const char *reason)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn;

	if (!transportStates)
	{
		elog(FATAL, "DeregisterReadInterest: no transport states");
	}

	if (!transportStates->activated)
		return;

	getChunkTransportState(transportStates, motNodeID, &pEntry);
	conn = pEntry->conns + srcRoute;

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
	{
		/*
		 * conn->cdbProc starts out NULL when the entry is created, so don't
		 * dereference it unconditionally just to produce a debug message.
		 */
		elog(DEBUG3, "Interconnect finished receiving "
			 "from seg%d slice%d %s pid=%d sockfd=%d; %s",
			 conn->remoteContentId,
			 pEntry->sendSlice->sliceIndex,
			 conn->remoteHostAndPort,
			 conn->cdbProc ? conn->cdbProc->pid : 0,
			 conn->sockfd,
			 reason);
	}

	if (Gp_interconnect_type == INTERCONNECT_TYPE_UDP)
	{
#ifdef AMS_VERBOSE_LOGGING
		elog(LOG, "deregisterReadInterest set stillactive = false for node %d route %d (%s)", motNodeID, srcRoute, reason);
#endif
		markUDPConnInactive(conn);
	}
	else
	{
		/*
		 * we also mark the connection as "done." The way synchronization
		 * works is strange. On QDs the "teardown" doesn't get called until
		 * all segments are finished, which means that we need some way for
		 * the QEs to know that Teardown should complete, otherwise we
		 * deadlock the entire query (QEs wait in their Teardown calls, while
		 * the QD waits for them to finish)
		 *
		 * A connection that never got a socket has sockfd == -1; there is
		 * nothing to shut down, and a negative descriptor must not be used
		 * as an fd-set index.
		 */
		if (conn->sockfd >= 0)
		{
			shutdown(conn->sockfd, SHUT_WR);
			MPP_FD_CLR(conn->sockfd, &pEntry->readSet);
		}
	}
}
/*
 * GetSeqServerFD
 *		Return the socket connected to the sequence server, establishing the
 *		connection on first use.
 *
 * The interconnect maintains this connection although it never uses the
 * socket itself; sequence.c uses it to obtain sequence values from the
 * seqserver, and TeardownSequenceServer() closes it.  Raises ERROR if no
 * sequence server host has been recorded by SetupSequenceServer().
 */
int
GetSeqServerFD(void)
{
	if (savedSeqServerHost == NULL)
		elog(ERROR, "Invalid Sequence Access. Sequence server info is invalid.");

	/* Already connected: reuse the existing socket. */
	if (savedSeqServerFd != -1)
		return savedSeqServerFd;

	setupSeqServerConnection(savedSeqServerHost, savedSeqServerPort);
	return savedSeqServerFd;
}
/*
 * SetupSequenceServer
 *		Remember the sequence server's host and port for a later
 *		GetSeqServerFD().  A NULL host is ignored.
 *
 * The host is kept as a malloc'd (strdup) copy rather than in a
 * MemoryContext, because MemoryContexts make error handling difficult here.
 * Raises ERROR if the copy cannot be allocated.
 */
void
SetupSequenceServer(const char *host, int port)
{
	if (host == NULL)
		return;

	/*
	 * See MPP-10162: certain PL/PGSQL functions may call us multiple times
	 * without an intervening Teardown.  Replace the stored copy unless the
	 * caller passed that very pointer back to us.
	 */
	if (savedSeqServerHost != host)
	{
		if (savedSeqServerHost != NULL)
		{
			free(savedSeqServerHost);
			savedSeqServerHost = NULL;
			savedSeqServerPort = 0;
		}
		savedSeqServerHost = strdup(host);
	}

	if (savedSeqServerHost == NULL)
		elog(ERROR, "SetupSequenceServer: memory allocation failed.");

	Assert(port != 0);
	savedSeqServerPort = port;
}
/*
 * TeardownSequenceServer
 *		Disconnect from the sequence server (if a connection was made) and
 *		forget the stored host and port.
 */
void
TeardownSequenceServer(void)
{
	int			fd = savedSeqServerFd;
	char	   *host = savedSeqServerHost;

	if (fd != -1)
	{
		if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
			elog(DEBUG3, "tearing down seqserver connection");

		shutdown(fd, SHUT_RDWR);
		closesocket(fd);
		savedSeqServerFd = -1;
	}

	if (host != NULL)
	{
		savedSeqServerHost = NULL;
		savedSeqServerPort = 0;
		free(host);
	}
}
/*
 * setupSeqServerConnection
 *		Connect to the sequence server at seqServerHost:seqServerPort and
 *		store the connected, non-blocking socket in savedSeqServerFd.
 *
 * seqServerHost is an IP address (IPv4 or IPv6), not a name, so name
 * resolution is disabled with AI_NUMERICHOST.  Only the first address
 * returned by pg_getaddrinfo_all() is tried.
 *
 * On any failure the address list is freed, a socket created here is
 * closed, savedSeqServerFd is left unchanged, and ERROR is raised.  errno
 * is preserved across that cleanup so %m reports the call that failed.
 */
static void
setupSeqServerConnection(char *seqServerHost, uint16 seqServerPort)
{
	int			ret;
	int			fd;
	int			save_errno;
	char		portNumberStr[32];
	struct addrinfo *addrs = NULL;
	struct addrinfo hint;

	/* Initialize hint structure */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_socktype = SOCK_STREAM;
	hint.ai_family = AF_UNSPEC;	/* Allow for IPv4 or IPv6 */
#ifdef AI_NUMERICSERV
	hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;	/* Never do name resolution */
#else
	hint.ai_flags = AI_NUMERICHOST;	/* Never do name resolution */
#endif

	snprintf(portNumberStr, sizeof(portNumberStr), "%d", seqServerPort);

	ret = pg_getaddrinfo_all(seqServerHost, portNumberStr, &hint, &addrs);
	if (ret || !addrs)
	{
		if (addrs)
			pg_freeaddrinfo_all(hint.ai_family, addrs);
		ereport(ERROR,
				(errmsg("could not translate host addr \"%s\", port \"%d\" to address: %s",
						seqServerHost, seqServerPort, gai_strerror(ret))));
	}

	fd = socket(addrs->ai_family, addrs->ai_socktype, addrs->ai_protocol);
	if (fd < 0)
	{
		/* don't leak the address list; keep errno for %m */
		save_errno = errno;
		pg_freeaddrinfo_all(hint.ai_family, addrs);
		errno = save_errno;
		elog(ERROR, "socket() call failed: %m");
	}

	if (connect(fd, addrs->ai_addr, addrs->ai_addrlen) < 0)
	{
		/*
		 * Close the unconnected socket instead of publishing it in
		 * savedSeqServerFd, where GetSeqServerFD() would hand it out later.
		 */
		save_errno = errno;
		pg_freeaddrinfo_all(hint.ai_family, addrs);
		closesocket(fd);
		errno = save_errno;
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect Error: Could not connect to seqserver."),
						errdetail("%m%s", "connect")));
	}

	pg_freeaddrinfo_all(hint.ai_family, addrs);

	/* The connect above is blocking; switch to non-blocking mode only now. */
	if (!pg_set_noblock(fd))
	{
		save_errno = errno;
		closesocket(fd);
		errno = save_errno;
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect Error: Could not set seqserver socket "
							   "to non-blocking mode."),
						errdetail("%m%s sockfd=%d", "fcntl", fd)));
	}

	savedSeqServerFd = fd;
}
/*
 * SetupInterconnect
 *		Create the interconnect state for this query in
 *		estate->interconnect_context.
 *
 * UDP and TCP delegate to their own setup routines.  For the NIL
 * interconnect a ChunkTransportState is palloc'd with CTS_INITIAL_SIZE
 * zeroed entries, not yet activated, and bound to the executor's slice
 * table and the locally executing slice.
 *
 * Any other interconnect type, an already-initialized context, or a missing
 * slice table is FATAL.
 */
void
SetupInterconnect(EState *estate)
{
	if (Gp_interconnect_type == INTERCONNECT_TYPE_UDP)
	{
		SetupUDPInterconnect(estate);
		return;
	}
	if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP)
	{
		SetupTCPInterconnect(estate);
		return;
	}

	if (Gp_interconnect_type != INTERCONNECT_TYPE_NIL)
		elog(FATAL, "SetupInterconnect: unknown interconnect-type");
	if (estate->interconnect_context)
		elog(FATAL, "SetupInterconnect: already initialized.");
	if (!estate->es_sliceTable)
		elog(FATAL, "SetupInterconnect: no slice table ?");

	/*
	 * Publish the (zeroed) context first, so a failure while filling it in
	 * still leaves something TeardownInterconnect() can free.
	 */
	estate->interconnect_context = palloc0(sizeof(ChunkTransportState));

	/* palloc0 zeroed everything; set the remaining state explicitly */
	estate->interconnect_context->size = CTS_INITIAL_SIZE;
	estate->interconnect_context->states = palloc0(CTS_INITIAL_SIZE * sizeof(ChunkTransportStateEntry));
	estate->interconnect_context->teardownActive = false;
	estate->interconnect_context->activated = false;
	estate->interconnect_context->incompleteConns = NIL;
	estate->interconnect_context->sliceTable = estate->es_sliceTable;
	estate->interconnect_context->sliceId = LocallyExecutingSliceIndex(estate);
}
/*
 * forceEosToPeers
 *		Send an end-of-stream chunk list (get_eos_tuplechunklist()) for
 *		motion node motNodeID through transportStates->SendEos, with
 *		transportStates->teardownActive set to true while it runs.
 *
 * Kept in its own stack frame so that TeardownInterconnect() does not have
 * to mark lots of locals volatile.  FATAL if transportStates is NULL.
 */
void
forceEosToPeers(MotionLayerState *mlStates,
				ChunkTransportState *transportStates,
				int motNodeID)
{
	if (transportStates == NULL)
		elog(FATAL, "no transport-states.");

	/* flag teardown for the duration of the EOS send only */
	transportStates->teardownActive = true;

	transportStates->SendEos(mlStates,
							 transportStates,
							 motNodeID,
							 get_eos_tuplechunklist());

	transportStates->teardownActive = false;
}
/*
 * TeardownInterconnect
 *		Clean up interconnect resources that were allocated during
 *		SetupInterconnect().
 *
 * This function should ALWAYS be called after SetupInterconnect() to avoid
 * leaking resources (like sockets), even if SetupInterconnect() did not
 * complete correctly.  UDP and TCP delegate to their own teardown routines;
 * for the NIL interconnect the state allocated by SetupInterconnect() is
 * freed.  Other interconnect types are ignored.
 */
void
TeardownInterconnect(ChunkTransportState *transportStates,
					 MotionLayerState *mlStates,
					 bool forceEOS)
{
	if (Gp_interconnect_type == INTERCONNECT_TYPE_UDP)
	{
		TeardownUDPInterconnect(transportStates, mlStates, forceEOS);
	}
	else if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP)
	{
		TeardownTCPInterconnect(transportStates, mlStates, forceEOS);
	}
	else if (Gp_interconnect_type == INTERCONNECT_TYPE_NIL)
	{
		/*
		 * Setup may have failed before allocating the state; pfree() must
		 * not be handed a NULL pointer.
		 */
		if (transportStates == NULL)
			return;

		if (transportStates->states != NULL)
			pfree(transportStates->states);
		pfree(transportStates);
	}
}
/*=========================================================================
* HELPER FUNCTIONS
*/
/*
 * createChunkTransportState
 *		Register and initialize the ChunkTransportStateEntry of the motion
 *		node whose sending slice is sendSlice.
 *
 * PARAMETERS
 *
 * transportStates - table receiving the entry.  Its 'states' array is
 *		enlarged with repalloc() when the motion node id exceeds its size;
 *		NOTE: that may move the array, so entry pointers obtained earlier
 *		from this table must not be used afterwards.
 *
 * sendSlice - sending slice; its sliceIndex (> 0) is the motion node id,
 *		and the entry lives at states[motNodeID - 1].
 *
 * recvSlice - receiving slice.
 *
 * numPrimaryConns - number of primary connections for this motion node.
 *		All are incoming if this is a receiving motion node.
 *		All are outgoing if this is a sending motion node.
 *
 * RETURNS
 * The new entry, with numPrimaryConns MotionConn structs allocated and reset
 * (state mcsNull, sockfd -1, not active, no cdbProc) and an empty read set.
 * If a ChunkTransportStateEntry is already registered for the motion node,
 * an ERROR is thrown.
 */
ChunkTransportStateEntry *
createChunkTransportState(ChunkTransportState *transportStates,
						  Slice *sendSlice,
						  Slice *recvSlice,
						  int numPrimaryConns)
{
	ChunkTransportStateEntry *pEntry;
	int			motNodeID;
	int			i;

	Assert(recvSlice &&
		   IsA(recvSlice, Slice) &&
		   recvSlice->sliceIndex >= 0);
	Assert(sendSlice &&
		   IsA(sendSlice, Slice) &&
		   sendSlice->sliceIndex > 0);

	motNodeID = sendSlice->sliceIndex;
	if (motNodeID > transportStates->size)
	{
		/* increase size of our table */
		ChunkTransportStateEntry *newTable;

		newTable = repalloc(transportStates->states, motNodeID * sizeof(ChunkTransportStateEntry));
		transportStates->states = newTable;
		/* zero-out the new piece at the end */
		MemSet(&transportStates->states[transportStates->size], 0, (motNodeID - transportStates->size) * sizeof(ChunkTransportStateEntry));
		transportStates->size = motNodeID;
	}

	pEntry = &transportStates->states[motNodeID - 1];

	if (pEntry->valid)
	{
		/* The existing entry may have no connections; don't index conns[0] blindly. */
		int			firstSock = (pEntry->conns != NULL && pEntry->numConns > 0) ?
			pEntry->conns[0].sockfd : -1;

		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect Error: A HTAB entry for motion node %d already exists.", motNodeID),
						errdetail("conns %p numConns %d first sock %d highreadsock %d", pEntry->conns, pEntry->numConns, firstSock, pEntry->highReadSock)));
	}

	pEntry->valid = true;

	pEntry->motNodeId = motNodeID;
	pEntry->numConns = numPrimaryConns;
	pEntry->numPrimaryConns = numPrimaryConns;
	pEntry->highReadSock = 0;
	pEntry->scanStart = 0;
	pEntry->sendSlice = sendSlice;
	pEntry->recvSlice = recvSlice;
	pEntry->outgoingPortRetryCount = 0;

	pEntry->conns = palloc0(pEntry->numConns * sizeof(pEntry->conns[0]));

	for (i = 0; i < pEntry->numConns; i++)
	{
		MotionConn *conn = &pEntry->conns[i];

		/* Initialize MotionConn entry. */
		conn->state = mcsNull;
		conn->sockfd = -1;
		conn->msgSize = 0;
		conn->tupleCount = 0;
		conn->stillActive = false;
		conn->stopRequested = false;
		conn->wakeup_ms = 0;
		conn->cdbProc = NULL;
	}

	MPP_FD_ZERO(&pEntry->readSet);

	return pEntry;
}
/*
 * removeChunkTransportState
 *		Mark the entry of motion node motNodeID as no longer valid.
 *
 * This should only be called after createChunkTransportState().  The entry
 * stays in transportStates->states; only its 'valid' flag is cleared.
 *
 * PARAMETERS
 *
 * motNodeID - motion node ID (1-based) of the entry to remove.
 *
 * RETURNS
 * The entry that was invalidated.  An ERROR is raised if motNodeID is out of
 * range or no valid entry is registered for it.
 */
ChunkTransportStateEntry *
removeChunkTransportState(ChunkTransportState *transportStates,
						  int16 motNodeID)
{
	ChunkTransportStateEntry *pEntry;

	/* ids are 1-based: reject 0 and negatives rather than read states[-1] */
	if (motNodeID < 1 || motNodeID > transportStates->size)
	{
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect Error: Unexpected Motion Node Id: %d. During remove. (size %d)",
							   motNodeID, transportStates->size)));
	}

	pEntry = &transportStates->states[motNodeID - 1];

	if (!pEntry->valid)
	{
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect Error: Unexpected Motion Node Id: %d. During remove. State not valid",
							   motNodeID)));
	}

	pEntry->valid = false;
	return pEntry;
}
#ifdef AMS_VERBOSE_LOGGING
/*
 * dumpEntryConnections
 *		Log, at level elevel, one line per connection of pEntry: either
 *		"not connected" (sockfd -1 and state mcsNull) or the peer kind
 *		("seg" for the first numPrimaryConns, "mir" after), content id, pid
 *		(0 when cdbProc is unset), socket and remote/local addresses.
 */
void
dumpEntryConnections(int elevel, ChunkTransportStateEntry *pEntry)
{
	int			idx;

	for (idx = 0; idx < pEntry->numConns; idx++)
	{
		MotionConn *c = pEntry->conns + idx;
		int			pid = (c->cdbProc != NULL) ? c->cdbProc->pid : 0;
		const char *kind = (idx < pEntry->numPrimaryConns) ? "seg" : "mir";

		if (c->state == mcsNull && c->sockfd == -1)
		{
			elog(elevel, "... motNodeId=%d conns[%d]: not connected",
				 pEntry->motNodeId, idx);
			continue;
		}

		elog(elevel, "... motNodeId=%d conns[%d]: "
			 "%s%d pid=%d sockfd=%d remote=%s local=%s",
			 pEntry->motNodeId, idx,
			 kind, c->remoteContentId,
			 pid,
			 c->sockfd,
			 c->remoteHostAndPort,
			 c->localHostAndPort);
	}
}
#endif
/*
 * adjustMasterRouting
 *		Point the listener address of recvSlice's primary processes at the
 *		master address established through the libpq connection.
 *
 * This guarantees that outgoing connections use an address that is
 * reachable even when segments cannot reach the master through the network
 * interface recorded in the catalog.  Only processes whose listenerAddr is
 * still NULL are updated; a remote_host of "[local]" is replaced by
 * "127.0.0.1".  The new strings are pstrdup()'d.
 */
void
adjustMasterRouting(Slice *recvSlice)
{
	ListCell   *cell = NULL;

	foreach(cell, recvSlice->primaryProcesses)
	{
		CdbProcess *proc = (CdbProcess *) lfirst(cell);
		const char *addr;

		/* keep addresses that were already filled in */
		if (proc->listenerAddr != NULL)
			continue;

		addr = MyProcPort->remote_host;
		if (strcmp(addr, "[local]") == 0)
			addr = "127.0.0.1";

		proc->listenerAddr = pstrdup(addr);
	}
}
/*
 * SendDummyPacket
 *		Best effort: send a small UDP datagram ("stop it") to this process's
 *		interconnect UDP listener port, taken from the high 16 bits of
 *		Gp_listener_port.
 *
 * The destination comes from pg_getaddrinfo_all() with a NULL host and no
 * AI_PASSIVE, i.e. a loopback address.  Presumably this wakes a thread
 * blocked receiving on that port -- TODO confirm against the UDP ic code.
 *
 * Failures are only logged (LOG); nothing is raised.  A sendto() failing
 * with EINTR/EAGAIN/EWOULDBLOCK is retried, up to SEND_ATTEMPTS tries.
 */
void
SendDummyPacket(void)
{
	const int	SEND_ATTEMPTS = 10;
	int			sockfd = -1;
	int			ret;
	int			attempt;
	struct addrinfo *addrs = NULL;
	struct addrinfo *rp = NULL;
	struct addrinfo hint;
	uint16		udp_listener;
	char		port_str[32] = {0};
	const char *dummy_pkt = "stop it";

	/* Get address info from the interconnect UDP listener port. */
	udp_listener = (Gp_listener_port >> 16) & 0x0ffff;
	snprintf(port_str, sizeof(port_str), "%d", udp_listener);

	MemSet(&hint, 0, sizeof(hint));
	hint.ai_socktype = SOCK_DGRAM;
	hint.ai_family = AF_UNSPEC;	/* Allow for IPv4 or IPv6 */
#ifdef AI_NUMERICSERV
	hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;	/* Never do name resolution */
#else
	hint.ai_flags = AI_NUMERICHOST;	/* Never do name resolution */
#endif

	ret = pg_getaddrinfo_all(NULL, port_str, &hint, &addrs);
	if (ret || !addrs)
	{
		/* getaddrinfo reports failure through its return code, not errno */
		elog(LOG, "Send dummy packet failed, pg_getaddrinfo_all(): %s",
			 ret ? gai_strerror(ret) : "no address returned");
		goto cleanup;
	}

	for (rp = addrs; rp != NULL; rp = rp->ai_next)
	{
		/* Create socket according to pg_getaddrinfo_all() */
		sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
		if (sockfd < 0)
			continue;

		if (pg_set_noblock(sockfd))
			break;

		/*
		 * Forget the descriptor once closed: if this was the last candidate,
		 * the cleanup code would otherwise close it a second time, possibly
		 * after it was reused elsewhere.
		 */
		closesocket(sockfd);
		sockfd = -1;
	}

	if (rp == NULL)
	{
		elog(LOG, "Send dummy packet failed, create socket failed: %s", strerror(errno));
		goto cleanup;
	}

	/* Send the dummy packet, retrying transient failures. */
	for (attempt = 0; attempt < SEND_ATTEMPTS; attempt++)
	{
		ret = sendto(sockfd, dummy_pkt, strlen(dummy_pkt), 0, rp->ai_addr, rp->ai_addrlen);
		if (ret >= 0)
			break;

		if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK)
		{
			elog(LOG, "Send dummy packet failed, sendto failed: %s", strerror(errno));
			goto cleanup;
		}
	}

	/* attempt only reaches SEND_ATTEMPTS if every try failed transiently */
	if (attempt >= SEND_ATTEMPTS)
		elog(LOG, "Send dummy packet failed, sendto failed: %s", strerror(errno));

cleanup:
	if (addrs)
		pg_freeaddrinfo_all(hint.ai_family, addrs);
	if (sockfd != -1)
		closesocket(sockfd);
}
/*
 * WaitInterconnectQuit
 *		Wait for the interconnect (ic) thread to quit.
 *
 * This only waits: resources owned by the ic thread are not cleaned up
 * here.  The work is delegated to WaitInterconnectQuitUDP().
 */
void
WaitInterconnectQuit(void)
{
	/* delegate to the UDP implementation */
	WaitInterconnectQuitUDP();
}