blob: 2b68f157ec2507fcfe6368005d2448866c1f7106 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
* ic_udp.c
* Interconnect code specific to UDP transport.
*-------------------------------------------------------------------------
*/
#ifdef WIN32
/*
* Need this to get WSAPoll (poll). And it
* has to be set before any header from the Win32 API is loaded.
*/
#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#include "postgres.h"
#include <pthread.h>
#include "access/transam.h"
#include "nodes/execnodes.h"
#include "nodes/pg_list.h"
#include "nodes/print.h"
#include "utils/memutils.h"
#include "utils/hsearch.h"
#include "miscadmin.h"
#include "libpq/libpq-be.h"
#include "libpq/ip.h"
#include "utils/atomic.h"
#include "utils/builtins.h"
#include "utils/debugbreak.h"
#include "utils/pg_crc.h"
#include "port/pg_crc32c.h"
#include "cdb/cdbselect.h"
#include "cdb/tupchunklist.h"
#include "cdb/ml_ipc.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbdisp.h"
#include "cdb/dispatcher.h"
#include "cdb/cdbicudpfaultinjection.h"
#include "portability/instr_time.h"
#include <fcntl.h>
#include <limits.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "pgtime.h"
#include <netinet/in.h>
#include "port.h"
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#define SHUT_RDWR SD_BOTH
#define SHUT_RD SD_RECEIVE
#define SHUT_WR SD_SEND
/* If we have old platform sdk headers, WSAPoll() might not be there */
#ifndef POLLIN
/* Event flag definitions for WSAPoll(). */
#define POLLRDNORM 0x0100
#define POLLRDBAND 0x0200
#define POLLIN (POLLRDNORM | POLLRDBAND)
#define POLLPRI 0x0400
#define POLLWRNORM 0x0010
#define POLLOUT (POLLWRNORM)
#define POLLWRBAND 0x0020
#define POLLERR 0x0001
#define POLLHUP 0x0002
#define POLLNVAL 0x0004
typedef struct pollfd {
SOCKET fd;
SHORT events;
SHORT revents;
} WSAPOLLFD, *PWSAPOLLFD, FAR *LPWSAPOLLFD;
__control_entrypoint(DllExport)
WINSOCK_API_LINKAGE
int
WSAAPI
WSAPoll(
IN OUT LPWSAPOLLFD fdArray,
IN ULONG fds,
IN INT timeout
);
#endif
#define poll WSAPoll
/*
* Postgres normally uses it's own custom select implementation
* on Windows, but they haven't implemented execeptfds, which
* we use here. So, undef this to use the normal Winsock version
* for now
*/
#undef select
#endif
#define MAX_TRY (11)
/*
 * Backoff table for retries: timeoutArray[i] is the timeout used on the
 * i-th try.  Note the table has MAX_TRY + 1 entries, so
 * timeoutArray[MAX_TRY] is a valid element and acts as the cap once the
 * retry count reaches MAX_TRY (see TIMEOUT below).
 * NOTE(review): units are not visible here — presumably milliseconds for
 * poll()-style waits; confirm at the call sites.
 */
int
timeoutArray[] =
{
	1,
	1,
	2,
	4,
	8,
	16,
	32,
	64,
	128,
	256,
	512,
	512 /* element MAX_TRY: cap reused for all tries >= MAX_TRY */
};
/* Timeout for a given try number, capped at timeoutArray[MAX_TRY]. */
#define TIMEOUT(try) ((try) < MAX_TRY ? (timeoutArray[(try)]) : (timeoutArray[MAX_TRY]))
/* 1/4 sec in msec */
#define RX_THREAD_POLL_TIMEOUT (250)
/*
* Flags definitions for flag-field of UDP-messages
*
* We use bit operations to test these, flags are powers of two only
*/
#define UDPIC_FLAGS_RECEIVER_TO_SENDER (1)
#define UDPIC_FLAGS_ACK (2)
#define UDPIC_FLAGS_STOP (4)
#define UDPIC_FLAGS_EOS (8)
#define UDPIC_FLAGS_NAK (16)
#define UDPIC_FLAGS_DISORDER (32)
#define UDPIC_FLAGS_DUPLICATE (64)
#define UDPIC_FLAGS_CAPACITY (128)
/*
* ConnHtabBin
*
* A connection hash table bin.
*
*/
typedef struct ConnHtabBin ConnHtabBin;
struct ConnHtabBin
{
MotionConn *conn;
struct ConnHtabBin *next;
};
/*
* ConnHashTable
*
* Connection hash table definition.
*
*/
typedef struct ConnHashTable ConnHashTable;
struct ConnHashTable
{
MemoryContext cxt;
ConnHtabBin **table;
int size;
};
/* TODO: Should figure out a way to set this hash table size. */
#define DEFAULT_CONN_HTAB_SIZE (Max((128*Gp_interconnect_hash_multiplier), 16))
#define CONN_HASH_VALUE(icpkt) ((uint32)((((icpkt)->srcPid ^ (icpkt)->dstPid)) + (icpkt)->dstContentId))
#define CONN_HASH_MATCH(a, b) (((a)->motNodeId == (b)->motNodeId && \
(a)->dstContentId == (b)->dstContentId && \
(a)->srcContentId == (b)->srcContentId && \
(a)->recvSliceIndex == (b)->recvSliceIndex && \
(a)->sendSliceIndex == (b)->sendSliceIndex && \
(a)->srcPid == (b)->srcPid && \
(a)->dstPid == (b)->dstPid && (a)->icId == (b)->icId))
/*
* Cursor IC table definition.
*
* For cursor case, there may be several concurrent interconnect
* instances on QD. The table is used to track the status of the
* instances, which is quite useful for "ACK the past and NAK the future" paradigm.
*
*/
#define CURSOR_IC_TABLE_SIZE (128)
/*
* CursorICHistoryEntry
*
* The definition of cursor IC history entry.
*/
typedef struct CursorICHistoryEntry CursorICHistoryEntry;
struct CursorICHistoryEntry
{
/* Interconnect instance id. */
uint32 icId;
/* Command id. */
uint32 cid;
/* Interconnect instance status.
* state 1 (value 1): interconnect is setup
* state 0 (value 0): interconnect was torn down.
*/
uint8 status;
/* Next entry. */
CursorICHistoryEntry *next;
};
/*
* CursorICHistoryTable
*
* Cursor IC history table. It is a small hash table.
*/
typedef struct CursorICHistoryTable CursorICHistoryTable;
struct CursorICHistoryTable
{
uint32 count;
CursorICHistoryEntry *table[CURSOR_IC_TABLE_SIZE];
};
/*
* Synchronization timeout values
*
* MAIN_THREAD_COND_TIMEOUT - 1/4 second in usec
*/
#define MAIN_THREAD_COND_TIMEOUT (250000)
/* Turn on/off UDP signal (on by default for Mac OS) */
/* #define IC_USE_PTHREAD_SYNCHRONIZATION */
/*
* Used for synchronization between main thread (receiver) and background thread.
*
*/
typedef struct ThreadWaitingState ThreadWaitingState;
struct ThreadWaitingState
{
bool waiting;
int waitingNode;
int waitingRoute;
int reachRoute;
/* main_thread_waiting_query is needed to disambiguate for cursors */
int waitingQuery;
};
/*
* ReceiveControlInfo
*
* The related control information for receiving data packets.
* Main thread (Receiver) and background thread use the information in
* this data structure to handle data packets.
*
*/
typedef struct ReceiveControlInfo ReceiveControlInfo;
struct ReceiveControlInfo
{
/* Main thread waiting state. */
ThreadWaitingState mainWaitingState;
/*
* Buffers used to assemble disorder messages at receiver side.
*/
icpkthdr *disorderBuffer;
/* The last interconnect instance id which is torn down. */
uint32 lastTornIcId;
/* Cursor history table. */
CursorICHistoryTable cursorHistoryTable;
/*
* Last distributed transaction id when SetupUDPInterconnect is called.
* Coupled with cursorHistoryTable, it is used to handle multiple concurrent cursor
* cases.
*/
DistributedTransactionId lastDXatId;
};
/*
* Main thread (Receiver) and background thread use the information in
* this data structure to handle data packets.
*/
static ReceiveControlInfo rx_control_info;
/*
* RxBufferPool
*
* Receive thread buffer pool definition. The implementation of
* receive side buffer pool is different from send side buffer pool.
* It is because receive side buffer pool needs a ring buffer to
* easily implement disorder message handling logic.
*/
typedef struct RxBufferPool RxBufferPool;
struct RxBufferPool
{
	/* The max number of buffers we can get from this pool. */
	int maxCount;
	/* The number of allocated buffers */
	int count;
	/* The list of free buffers.
	 * NOTE(review): typed char* rather than icpkthdr* — presumably an
	 * intrusive singly-linked list threaded through the free buffers
	 * themselves; confirm against putRxBufferToFreeList /
	 * getRxBufferFromFreeList. */
	char *freeList;
};
/*
* The buffer pool used for keeping data packets.
*
* maxCount is set to 1 to make sure there is always a buffer
* for picking packets from OS buffer.
*/
static RxBufferPool rx_buffer_pool = {1, 0, NULL};
/*
* SendBufferPool
*
* The send side buffer pool definition.
*
*/
typedef struct SendBufferPool SendBufferPool;
struct SendBufferPool
{
/* The maximal number of buffers sender can use. */
int maxCount;
/* The number of buffers sender already used. */
int count;
/* The free buffer list at the sender side. */
ICBufferList freeList;
};
/*
* The sender side buffer pool.
*/
static SendBufferPool snd_buffer_pool;
/*
* SendControlInfo
*
* The related control information for sending data packets and handing acks.
* Main thread use the information in this data structure to do ack handling
* and congestion control.
*
*/
typedef struct SendControlInfo SendControlInfo;
struct SendControlInfo
{
/* The buffer used for accepting acks */
icpkthdr *ackBuffer;
/* congestion window */
float cwnd;
/* minimal congestion control window */
float minCwnd;
/* slow start threshold */
float ssthresh;
};
/*
* Main thread use the information in this data structure to do ack handling
* and congestion control.
*/
static SendControlInfo snd_control_info;
#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
/*
* UDPSignal
* The udp interconnect specific implementation of timeout wait/signal mechanism.
* (Only used for MacOS to avoid the bug in MacOS 10.6.x: MPP-9910).
* More details are available in the functions to implement UDPSignal.
*/
typedef struct UDPSignal UDPSignal;
struct UDPSignal
{
/* We often use the address of a pthread condition variable as signal/condition id. */
void *sigId;
/* The udp socket fd to implement the mechanism. */
int fd;
/* The port. */
int32 port;
/* The UDP socket address family */
int txFamily;
/* Address info. */
struct sockaddr_storage peer;
socklen_t peer_len;
};
#endif
/*
* ICGlobalControlInfo
*
* Some shared control information that is used by main thread (senders, receivers, or both)
* and the background thread.
*
*/
typedef struct ICGlobalControlInfo ICGlobalControlInfo;
struct ICGlobalControlInfo
{
/* The background thread handle. */
pthread_t threadHandle;
/* flag showing whether the thread is created. */
bool threadCreated;
/* The lock protecting eno field. */
pthread_mutex_t errorLock;
int eno;
/* Keep the udp socket buffer size used. */
uint32 socketSendBufferSize;
uint32 socketRecvBufferSize;
uint64 lastExpirationCheckTime;
uint64 lastDeadlockCheckTime;
/* Used to decide whether to retransmit for capacity based FC. */
uint64 lastPacketSendTime;
/* MemoryContext for UDP interconnect. */
MemoryContext memContext;
/*
* Lock and condition variable for coordination between
* main thread and background thread. It protects the shared data
* between the two thread (the connHtab, rx buffer pool and the mainWaitingState etc.).
*/
pthread_mutex_t lock;
pthread_cond_t cond;
/* Am I a sender? */
bool isSender;
/* Global connection htab for both sending connections and
* receiving connections. Protected by the lock in this data structure.
*/
ConnHashTable connHtab;
/* The connection htab used to cache future packets. */
ConnHashTable startupCacheHtab;
/* Used by main thread to ask the background thread to exit. */
uint32 shutdown;
#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
UDPSignal usig;
#endif
};
/*
* Shared control information that is used by senders, receivers and background thread.
*/
static ICGlobalControlInfo ic_control_info;
/*
* Macro for unack queue ring, round trip time (RTT) and expiration period (RTO)
*
* UNACK_QUEUE_RING_SLOTS_NUM - the number of slots in the unack queue ring.
* this value should be greater than or equal to 2.
* TIMER_SPAN - timer period in us
* TIMER_CHECKING_PERIOD - timer checking period in us
* UNACK_QUEUE_RING_LENGTH - the whole time span of the unack queue ring
* DEFAULT_RTT - default rtt in us.
* MIN_RTT - min rtt in us
* MAX_RTT - max rtt in us
* RTT_SHIFT_COEFFICIENT - coefficient for RTT computation
*
* DEFAULT_DEV - default round trip standard deviation
* MAX_DEV - max dev
* DEV_SHIFT_COEFFICIENT - coefficient for DEV computation
*
* MAX_EXPIRATION_PERIOD - max expiration period in us
* MIN_EXPIRATION_PERIOD - min expiration period in us
* MAX_TIME_NO_TIMER_CHECKING - max time without checking timer
* DEADLOCK_CHECKING_TIME - deadlock checking time
*
* MAX_SEQS_IN_DISORDER_ACK - max number of sequences that can be transmitted in a
* disordered packet ack.
*
*
* Considerations on the settings of the values:
*
* TIMER_SPAN and UNACK_QUEUE_RING_SLOTS_NUM define the ring period.
* Currently, it is UNACK_QUEUE_RING_LENGTH (default 10 seconds).
*
* The definition of UNACK_QUEUE_RING_LENGTH is quite related to the size of
* sender side buffer and the size we may resend in a burst for an expiration event
* (which may overwhelm switch or OS if it is too large).
* Thus, we do not want to send too much data in a single expiration event. Here, a
* relatively large UNACK_QUEUE_RING_SLOTS_NUM value is used to avoid that.
*
* If the sender side buffer is X (MB), then on each slot,
* there are about X/UNACK_QUEUE_RING_SLOTS_NUM. Even we have a very large sender buffer,
* for example, 100MB, there is about 96M/2000 = 50K per slot.
* This is fine for the OS (with buffer 2M for each socket generally) and switch.
*
* Note that even when the buffers are not evenly distributed in the ring and there are some packet
* losses, the congestion control mechanism, the disorder and duplicate packet handling logic will
* make assure the number of outstanding buffers (in unack queues) not very large.
*
* MIN_RTT/MAX_RTT/DEFAULT_RTT/MIN_EXPIRATION_PERIOD/MAX_EXPIRATION_PERIOD gives some heuristic values about
* the computation of RTT and expiration period. RTT and expiration period (RTO) are not
* constant for various kinds of hardware and workloads. Thus, they are computed dynamically.
* But we also want to bound the values of RTT and MAX_EXPIRATION_PERIOD. It is
* because there are some faults that may make RTT a very abnormal value. Thus, RTT and
* expiration period are upper and lower bounded.
*
* MAX_SEQS_IN_DISORDER_ACK should be smaller than (MIN_PACKET_SIZE - sizeof(icpkthdr))/sizeof(uint32).
* It is due to the limitation of the ack receive buffer size.
*
*/
#define UNACK_QUEUE_RING_SLOTS_NUM (2000)
#define TIMER_SPAN (Gp_interconnect_timer_period * 1000) /* default: 5ms */
#define TIMER_CHECKING_PERIOD (Gp_interconnect_timer_checking_period) /* default: 20ms */
#define UNACK_QUEUE_RING_LENGTH (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN)
#define DEFAULT_RTT (Gp_interconnect_default_rtt * 1000) /* default: 20ms */
#define MIN_RTT (100) /* 0.1ms */
#define MAX_RTT (200 * 1000) /* 200ms */
#define RTT_SHIFT_COEFFICIENT (3) /* RTT_COEFFICIENT 1/8 (0.125) */
#define DEFAULT_DEV (0)
#define MIN_DEV MIN_RTT
#define MAX_DEV MAX_RTT
#define DEV_SHIFT_COEFFICIENT (2) /* DEV_COEFFICIENT 1/4 (0.25) */
#define MAX_EXPIRATION_PERIOD (1000 * 1000) /* 1s */
#define MIN_EXPIRATION_PERIOD (Gp_interconnect_min_rto * 1000) /* default: 20ms */
#define MAX_TIME_NO_TIMER_CHECKING (50 * 1000) /* 50ms */
#define DEADLOCK_CHECKING_TIME (512 * 1000) /* 512ms */
#define MAX_SEQS_IN_DISORDER_ACK (4)
/*
* UnackQueueRing
*
* An unacked queue ring is used to decide which packet is expired in constant time.
*
* Each slot of the ring represents a fixed time span, for example 1ms, and
* each slot has a associated buffer list/queue which contains the packets
* which will expire in the time span.
*
* If the current time pointer (time t) points to slot 1,
* then slot 2 represents the time span from t + 1ms to t + 2ms.
* When we check whether there are some packets expired, we start from the last
* current time recorded, and resend all the packets in the queue
* until we reach the slot that the updated current time points to.
*
*/
typedef struct UnackQueueRing UnackQueueRing;
struct UnackQueueRing
{
/* save the current time when we check the time wheel for expiration */
uint64 currentTime;
/* the slot index corresponding to current time */
int idx;
/* the number of outstanding packets in unack queue ring */
int numOutStanding;
/* the number of outstanding packets that use the
* shared bandwidth in the congestion window. */
int numSharedOutStanding;
/* time slots */
ICBufferList slots[UNACK_QUEUE_RING_SLOTS_NUM];
};
/*
* All connections in a process share this unack queue ring instance.
*/
static UnackQueueRing unack_queue_ring = {0, 0, 0};
/*
* AckSendParam
*
* The prarmeters for ack sending.
*/
typedef struct AckSendParam
{
/* header for the ack */
icpkthdr msg;
/* peer address for the ack */
struct sockaddr_storage peer;
socklen_t peer_len;
} AckSendParam;
/*
* ICStatistics
*
* A structure keeping various statistics about interconnect internal.
*
* Note that the statistics for ic is not accurate for multiple cursor case on QD.
*
* totalRecvQueueSize - receive queue size sum when main thread is trying to get a packet.
* recvQueueSizeCountingTime - counting times when computing totalRecvQueueSize.
* totalCapacity - the capacity sum when packets are tried to be sent.
* capacityCountingTime - counting times used to compute totalCapacity.
* totalBuffers - total buffers available when sending packets.
* bufferCountingTime - counting times when compute totalBuffers.
* retransmits - the number of packet retransmits.
* mismatchNum - the number of mismatched packets received.
* crcErrors - the number of crc errors.
* sndPktNum - the number of packets sent by sender.
* recvPktNum - the number of packets received by receiver.
* disorderedPktNum - disordered packet number.
* duplicatedPktNum - duplicate packet number.
* recvAckNum - the number of Acks received.
* statusQueryMsgNum - the number of status query messages sent.
*
*/
typedef struct ICStatistics
{
uint64 totalRecvQueueSize;
uint64 recvQueueSizeCountingTime;
uint64 totalCapacity;
uint64 capacityCountingTime;
uint64 totalBuffers;
uint64 bufferCountingTime;
int32 retransmits;
int32 startupCachedPktNum;
int32 mismatchNum;
int32 crcErrors;
int32 sndPktNum;
int32 recvPktNum;
int32 disorderedPktNum;
int32 duplicatedPktNum;
int32 recvAckNum;
int32 statusQueryMsgNum;
} ICStatistics;
/* Statistics for UDP interconnect. */
static ICStatistics ic_statistics;
/*=========================================================================
* STATIC FUNCTIONS declarations
*/
/* Cursor IC History table related functions. */
static void initCursorICHistoryTable(CursorICHistoryTable *t);
static void addCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint32 cid);
static void updateCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint8 status);
static CursorICHistoryEntry *getCursorIcEntry(CursorICHistoryTable *t, uint32 icId);
static void pruneCursorIcEntry(CursorICHistoryTable *t, uint32 icId);
static void purgeCursorIcEntry(CursorICHistoryTable *t);
static void resetMainThreadWaiting(ThreadWaitingState *state);
static void setMainThreadWaiting(ThreadWaitingState *state, int motNodeId, int route, int icId);
/* Background thread error handling functions. */
static void checkRxThreadError(void);
static void setRxThreadError(int eno);
static void resetRxThreadError(void);
static void getSockAddr(struct sockaddr_storage * peer, socklen_t * peer_len, const char * listenerAddr, int listenerPort);
static void setXmitSocketOptions(int txfd);
static uint32 setSocketBufferSize(int fd, int type, int expectedSize, int leastSize);
static void setupUDPListeningSocket(int *listenerSocketFd, uint16 *listenerPort, int *txFamily);
static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState *transportStates,
Slice *sendSlice,
int *pOutgoingCount);
static void setupOutgoingUDPConnection(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *conn);
static char *formatSockAddr(struct sockaddr *sa, char* buf, int bufsize);
/* Connection hash table functions. */
static bool initConnHashTable(ConnHashTable *ht, MemoryContext ctx);
static bool connAddHash(ConnHashTable *ht, MotionConn *conn);
static MotionConn *findConnByHeader(ConnHashTable *ht, icpkthdr *hdr);
static void destroyConnHashTable(ConnHashTable *ht);
static inline void sendAckWithParam(AckSendParam *param);
static void sendAck(MotionConn *conn, int32 flags, uint32 seq, uint32 extraSeq);
static void sendDisorderAck(MotionConn *conn, uint32 seq, uint32 extraSeq, uint32 lostPktCnt);
static void sendStatusQueryMessage(MotionConn *conn, int fd, uint32 seq);
static inline void sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen);
static void putRxBufferAndSendAck(MotionConn *conn, AckSendParam *param);
static inline void putRxBufferToFreeList(RxBufferPool *p, icpkthdr *buf);
static inline icpkthdr *getRxBufferFromFreeList(RxBufferPool *p);
static icpkthdr *getRxBuffer(RxBufferPool *p);
static void initRxBufferPool(RxBufferPool *p);
/* ICBufferList functions. */
static inline void icBufferListInitHeadLink(ICBufferLink *link);
static inline void icBufferListInit(ICBufferList *list, ICBufferListType type);
static inline bool icBufferListIsHead(ICBufferList *list, ICBufferLink *link);
static inline ICBufferLink *icBufferListFirst(ICBufferList *list);
static inline int icBufferListLength(ICBufferList *list);
static inline ICBuffer *icBufferListDelete(ICBufferList *list, ICBuffer *buf);
static inline ICBuffer *icBufferListPop(ICBufferList *list);
static void icBufferListFree(ICBufferList *list);
static inline ICBuffer *icBufferListAppend(ICBufferList *list, ICBuffer *buf);
static void icBufferListReturn(ICBufferList *list, bool inExpirationQueue);
static void SetupUDPInterconnect_Internal(EState *estate);
static inline TupleChunkListItem
RecvTupleChunkFromAnyUDP_Internal(MotionLayerState *mlStates,
ChunkTransportState *transportStates,
int16 motNodeID,
int16 *srcRoute);
static inline TupleChunkListItem
RecvTupleChunkFromUDP_Internal(ChunkTransportState *transportStates,
int16 motNodeID,
int16 srcRoute);
static void TeardownUDPInterconnect_Internal(ChunkTransportState *transportStates,
MotionLayerState *mlStates,
bool forceEOS);
static void freeDisorderedPackets(MotionConn *conn);
static void checkForCancelFromQD(ChunkTransportState *pTransportStates);
static void prepareRxConnForRead(MotionConn *conn);
static TupleChunkListItem RecvTupleChunkFromAnyUDP(MotionLayerState *mlStates,
ChunkTransportState *transportStates,
int16 motNodeID,
int16 *srcRoute);
static TupleChunkListItem RecvTupleChunkFromUDP(ChunkTransportState *transportStates,
int16 motNodeID,
int16 srcRoute);
static TupleChunkListItem
receiveChunksUDP(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
int16 motNodeID, int16 *srcRoute, MotionConn *conn, bool inTeardown);
static void SendEosUDP(MotionLayerState *mlStates, ChunkTransportState *transportStates,
int motNodeID, TupleChunkListItem tcItem);
static bool SendChunkUDP(MotionLayerState *mlStates, ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn * conn, TupleChunkListItem tcItem, int16 motionId);
static void doSendStopMessageUDP(ChunkTransportState *transportStates, int16 motNodeID);
static bool dispatcherAYT(void);
static void checkQDConnectionAlive(void);
static void *rxThreadFunc(void *arg);
static bool handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
static void inline handleAckedPacket(MotionConn *ackConn, ICBuffer *buf, uint64 now);
static bool handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry);
static void handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, int16 motionId);
static void handleDisorderPacket(MotionConn *conn, int pos, uint32 tailSeq, icpkthdr *pkt);
static bool handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, socklen_t *peerlen, AckSendParam *param);
static bool handleAckForDuplicatePkt(MotionConn *conn, icpkthdr *pkt);
static bool handleAckForDisorderPkt(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, icpkthdr *pkt);
static inline void prepareXmit(MotionConn *conn);
static inline void addCRC(icpkthdr *pkt);
static inline bool checkCRC(icpkthdr *pkt);
static void sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn);
static void sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, ICBuffer *buf, MotionConn * conn);
static inline uint64 computeExpirationPeriod(MotionConn *conn, uint32 retry);
static ICBuffer *getSndBuffer(MotionConn *conn);
static void initSndBufferPool();
static void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now);
static void initUnackQueueRing(UnackQueueRing *uqr);
static void checkExpiration(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now);
static void checkDeadlock(ChunkTransportStateEntry *pEntry, MotionConn *conn);
static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
static void cleanupStartupCache(void);
static void handleCachedPackets(void);
static uint64 getCurrentTime(void);
static void initMutex(pthread_mutex_t *mutex);
static bool waitOnCondition(int timeout_us, pthread_cond_t *cond, pthread_mutex_t *mutex);
static inline void logPkt(char *prefix, icpkthdr *pkt);
static void aggregateStatistics(ChunkTransportStateEntry *pEntry);
static inline bool pollAcks(ChunkTransportState *transportStates, int fd, int timeout);
#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
static int udpSignalTimeoutWait(UDPSignal *sig, pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *ts);
static bool udpSignalPoll(UDPSignal *sig, int timeout);
static bool udpSignalGet(UDPSignal *sig);
static void setupUDPSignal(UDPSignal *sig);
static void destroyUDPSignal(UDPSignal *sig);
static void udpSignal(UDPSignal *sig);
#endif
/* #define TRANSFER_PROTOCOL_STATS */
#ifdef TRANSFER_PROTOCOL_STATS
typedef enum TransProtoEvent TransProtoEvent;
enum TransProtoEvent {
TPE_DATA_PKT_SEND,
TPE_ACK_PKT_QUERY
};
typedef struct TransProtoStatEntry TransProtoStatEntry;
struct TransProtoStatEntry
{
TransProtoStatEntry *next;
/* Basic information */
uint32 time;
TransProtoEvent event;
int dstPid;
uint32 seq;
/* more attributes can be added on demand. */
/*
* float cwnd;
* int capacity;
*/
};
typedef struct TransProtoStats TransProtoStats;
struct TransProtoStats
{
pthread_mutex_t lock;
TransProtoStatEntry *head;
TransProtoStatEntry *tail;
uint64 count;
uint64 startTime;
};
static TransProtoStats trans_proto_stats = {PTHREAD_MUTEX_INITIALIZER, NULL, NULL, 0};
/*
* initTransProtoStats
* Initialize the transport protocol states data structures.
*/
static void
initTransProtoStats()
{
pthread_mutex_lock(&trans_proto_stats.lock);
while (trans_proto_stats.head) {
TransProtoStatEntry *cur = NULL;
cur = trans_proto_stats.head;
trans_proto_stats.head = trans_proto_stats.head->next;
free(cur);
trans_proto_stats.count--;
}
trans_proto_stats.head = NULL;
trans_proto_stats.tail = NULL;
trans_proto_stats.count = 0;
trans_proto_stats.startTime = getCurrentTime();
pthread_mutex_unlock(&trans_proto_stats.lock);
}
/*
 * updateStats
 * 		Append one protocol-event record (best effort) to the stats list.
 *
 * On OOM the sample is silently dropped.  The list lock protects the
 * append and the field initialization.
 */
static void
updateStats(TransProtoEvent event, MotionConn *conn, icpkthdr *pkt)
{
	TransProtoStatEntry *entry;

	entry = malloc(sizeof(TransProtoStatEntry));
	if (entry == NULL)
		return;					/* best effort: drop the sample */
	memset(entry, 0, sizeof(*entry));

	pthread_mutex_lock(&trans_proto_stats.lock);

	/* append at the tail of the singly linked list */
	if (trans_proto_stats.count == 0)
		trans_proto_stats.head = entry;
	else
		trans_proto_stats.tail->next = entry;
	trans_proto_stats.tail = entry;
	trans_proto_stats.count++;

	entry->time = getCurrentTime() - trans_proto_stats.startTime;
	entry->event = event;
	entry->dstPid = pkt->dstPid;
	entry->seq = pkt->seq;

	/* Other attributes can be added on demand
	 * entry->cwnd = snd_control_info.cwnd;
	 * entry->capacity = conn->capacity;
	 */

	pthread_mutex_unlock(&trans_proto_stats.lock);
}
static void
dumpTransProtoStats()
{
char tmpbuf[32];
snprintf(tmpbuf, 32, "%d." UINT64_FORMAT "txt", MyProcPid, getCurrentTime());
FILE *ofile = fopen(tmpbuf, "w+");
pthread_mutex_lock(&trans_proto_stats.lock);
while (trans_proto_stats.head) {
TransProtoStatEntry *cur = NULL;
cur = trans_proto_stats.head;
trans_proto_stats.head = trans_proto_stats.head->next;
fprintf(ofile, "time %d event %d seq %d destpid %d\n", cur->time, cur->event, cur->seq, cur->dstPid);
free(cur);
trans_proto_stats.count--;
}
trans_proto_stats.tail = NULL;
pthread_mutex_unlock(&trans_proto_stats.lock);
fclose(ofile);
}
#endif /* TRANSFER_PROTOCOL_STATS */
/*
 * initCursorICHistoryTable
 * 		Reset a cursor IC history table to the empty state.
 */
static void
initCursorICHistoryTable(CursorICHistoryTable *t)
{
	/* zero every bucket head, then the entry count */
	memset(t->table, 0, sizeof(t->table));
	t->count = 0;
}
/*
 * addCursorIcEntry
 * 		Add an entry to the cursor ic table.
 *
 * The new entry is allocated in the interconnect memory context and pushed
 * onto the head of its hash bucket, with status 1 (interconnect set up).
 */
static void
addCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint32 cid)
{
	CursorICHistoryEntry *entry;
	MemoryContext oldContext;
	uint32		slot = icId % CURSOR_IC_TABLE_SIZE;

	/* entry must live in the interconnect memory context */
	oldContext = MemoryContextSwitchTo(ic_control_info.memContext);
	entry = palloc0(sizeof(CursorICHistoryEntry));
	MemoryContextSwitchTo(oldContext);

	entry->icId = icId;
	entry->cid = cid;
	entry->status = 1;			/* 1: interconnect is set up */

	/* push onto the head of the bucket chain */
	entry->next = t->table[slot];
	t->table[slot] = entry;
	t->count++;

	elog(DEBUG2, "add icid %d cid %d status %d", entry->icId, entry->cid, entry->status);
}
/*
 * updateCursorIcEntry
 * 		Update the status of the cursor ic entry for a given interconnect
 * 		instance id.
 *
 * There are two states for an instance of interconnect.
 * 		state 1 (value 1): interconnect is setup
 * 		state 0 (value 0): interconnect was torn down.
 *
 * If no entry with the given id exists, this is a no-op.
 */
static void
updateCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint8 status)
{
	struct CursorICHistoryEntry *p;

	/*
	 * Use uint32 for the bucket index (matching addCursorIcEntry): a uint8
	 * would silently truncate the index if CURSOR_IC_TABLE_SIZE were ever
	 * raised above 256.
	 */
	uint32		index = icId % CURSOR_IC_TABLE_SIZE;

	for (p = t->table[index]; p; p = p->next)
	{
		if (p->icId == icId)
		{
			p->status = status;
			return;
		}
	}
	/* not found */
}
/*
 * getCursorIcEntry
 * 		Get the cursor entry for the given interconnect id.
 *
 * Returns NULL if no entry with that id exists.
 */
static CursorICHistoryEntry *
getCursorIcEntry(CursorICHistoryTable *t, uint32 icId)
{
	struct CursorICHistoryEntry *p;

	/*
	 * Use uint32 for the bucket index (matching addCursorIcEntry): a uint8
	 * would silently truncate the index if CURSOR_IC_TABLE_SIZE were ever
	 * raised above 256.
	 */
	uint32		index = icId % CURSOR_IC_TABLE_SIZE;

	for (p = t->table[index]; p; p = p->next)
	{
		if (p->icId == icId)
			return p;
	}

	/* not found */
	return NULL;
}
/*
 * pruneCursorIcEntry
 * 		Remove all entries older than the given interconnect id.
 *
 * Walks every bucket; entries with icId strictly less than the prune point
 * are unlinked from their chain and freed.
 */
static void
pruneCursorIcEntry(CursorICHistoryTable *t, uint32 icId)
{
	/*
	 * int, not uint8: a uint8 counter wraps at 256 and would loop forever
	 * if CURSOR_IC_TABLE_SIZE were ever raised to 256 or beyond.
	 */
	int			index;

	for (index = 0; index < CURSOR_IC_TABLE_SIZE; index++)
	{
		struct CursorICHistoryEntry *p,
				   *q;

		p = t->table[index];
		q = NULL;				/* q trails p: last entry we kept */
		while (p)
		{
			/* remove an entry if it is older than the prune-point */
			if (p->icId < icId)
			{
				struct CursorICHistoryEntry *trash;

				if (!q)
				{
					t->table[index] = p->next;
				}
				else
				{
					q->next = p->next;
				}
				trash = p;
				/* set up next loop */
				p = trash->next;
				pfree(trash);
				t->count--;
			}
			else
			{
				q = p;
				p = p->next;
			}
		}
	}
}
/*
 * purgeCursorIcEntry
 * 		Free every entry in the cursor ic history table.
 *
 * NOTE(review): t->count is not reset here (matching the original
 * behavior) — callers appear to re-init the table afterwards; confirm.
 */
static void
purgeCursorIcEntry(CursorICHistoryTable *t)
{
	/*
	 * int, not uint8: a uint8 counter wraps at 256 and would loop forever
	 * if CURSOR_IC_TABLE_SIZE were ever raised to 256 or beyond.
	 */
	int			index;

	for (index = 0; index < CURSOR_IC_TABLE_SIZE; index++)
	{
		struct CursorICHistoryEntry *trash;

		while (t->table[index])
		{
			trash = t->table[index];
			t->table[index] = trash->next;
			pfree(trash);
		}
	}
}
/*
 * resetMainThreadWaiting
 * 		Put the main-thread waiting state back to "not waiting".
 */
static void
resetMainThreadWaiting(ThreadWaitingState *state)
{
	state->waiting = false;

	/* no motion node or query is being waited on */
	state->waitingNode = -1;
	state->waitingQuery = -1;

	/* routes revert to the wildcard */
	state->reachRoute = ANY_ROUTE;
	state->waitingRoute = ANY_ROUTE;
}
/*
 * setMainThreadWaiting
 * 		Record that the main thread is waiting for a packet on the given
 * 		motion node / route for the given interconnect instance (icId).
 */
static void
setMainThreadWaiting(ThreadWaitingState *state, int motNodeId, int route, int icId)
{
	state->waiting = true;

	/* what we are waiting for */
	state->waitingNode = motNodeId;
	state->waitingQuery = icId;
	state->waitingRoute = route;

	/* nothing has arrived yet */
	state->reachRoute = ANY_ROUTE;
}
/*
 * checkRxThreadError
 * 		Check (in the main thread) whether the background receive thread
 * 		recorded an error, and report it if so.
 *
 * The saved errno is copied into the main thread's errno so that %m in
 * errdetail expands to the corresponding system error message.  The lock
 * is released before ereport(ERROR) since ereport does not return.
 *
 * Fixes the original errdetail("%m%s", "in receive background thread,"),
 * which rendered as "<strerror>in receive background thread," with no
 * separator and a stray trailing comma.
 */
static void
checkRxThreadError(void)
{
	pthread_mutex_lock(&ic_control_info.errorLock);
	if (ic_control_info.eno != 0)
	{
		errno = ic_control_info.eno;

		/* unlock before ereport(ERROR) longjmps out of this function */
		pthread_mutex_unlock(&ic_control_info.errorLock);

		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect encountered an error"),
						errdetail("%s: %m", "in receive background thread")));
	}
	pthread_mutex_unlock(&ic_control_info.errorLock);
}
/*
 * setRxThreadError
 * 		Set the error no in background thread.
 *
 * Record the error in background thread. Main thread checks the errors periodically.
 * If main thread will find it, main thread will handle it.
 *
 * NOTE: runs in the rx thread, so it uses write_log (elog is not
 * thread-safe).
 */
static void
setRxThreadError(int eno)
{
	pthread_mutex_lock(&ic_control_info.errorLock);
	/* always let main thread know the error that occurred first. */
	if (ic_control_info.eno == 0)
	{
		ic_control_info.eno = eno;
		write_log("Interconnect error: in background thread, set ic_control_info.eno to %d, rx_buffer_pool.count %d, rx_buffer_pool.maxCount %d", eno, rx_buffer_pool.count, rx_buffer_pool.maxCount);
	}
	pthread_mutex_unlock(&ic_control_info.errorLock);
}
/*
 * resetRxThreadError
 * 		Reset the error no.
 *
 * Clears any error previously recorded by the rx thread so a new
 * interconnect instance starts with a clean slate.
 */
static void
resetRxThreadError()
{
	pthread_mutex_lock(&ic_control_info.errorLock);
	ic_control_info.eno = 0;
	pthread_mutex_unlock(&ic_control_info.errorLock);
}
/*
 * setupUDPListeningSocket
 * 		Setup udp listening socket.
 *
 * On success, *listenerSocketFd is a bound, non-blocking datagram socket,
 * *listenerPort is the OS-assigned port (we request port 0), and *txFamily
 * is the address family that was actually bound (AF_INET or AF_INET6).
 *
 * Reports ERROR (closing any open socket) on failure.
 *
 * FIX: the original leaked the getaddrinfo() result list when no address
 * could be bound, because the error path longjmps out via ereport(ERROR)
 * without ever reaching freeaddrinfo().  We now free the list before
 * taking the error exit.
 */
static void
setupUDPListeningSocket(int *listenerSocketFd, uint16 *listenerPort, int *txFamily)
{
	int			errnoSave;
	int			fd = -1;
	const char *fun;

	/*
	 * At the moment, we don't know which of IPv6 or IPv4 is wanted, or even
	 * supported, so just ask getaddrinfo...
	 *
	 * Most implementations of getaddrinfo are smart enough to only return a
	 * particular address family if that family is both enabled, and at least
	 * one network adapter has an IP address of that family.
	 */
	struct addrinfo hints;
	struct addrinfo *addrs,
			   *rp;
	int			s;
	struct sockaddr_storage our_addr;
	socklen_t	our_addr_len;
	char		service[32];

	/* service "0": let the OS choose an ephemeral port */
	snprintf(service, 32, "%d", 0);
	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_family = AF_UNSPEC;	/* Allow IPv4 or IPv6 */
	hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
	hints.ai_flags = AI_PASSIVE;	/* For wildcard IP address */
	hints.ai_protocol = 0;			/* Any protocol */

#ifdef USE_ASSERT_CHECKING
	if (gp_udpic_network_disable_ipv6)
		hints.ai_family = AF_INET;
#endif
#ifdef __darwin__
	hints.ai_family = AF_INET; /* Due to a bug in OSX Leopard, disable IPv6 for UDP interconnect on all OSX platforms */
#endif

	fun = "getaddrinfo";
	s = getaddrinfo(NULL, service, &hints, &addrs);
	if (s != 0)
		elog(ERROR, "getaddrinfo says %s", gai_strerror(s));

	/*
	 * getaddrinfo() returns a list of address structures, one for each valid
	 * address and family we can use.  Try each address until we successfully
	 * bind; on socket()/bind() failure we close and try the next.  This can
	 * happen if the system supports IPv6 but IPv6 is disabled, or vice versa.
	 *
	 * If there is both an AF_INET6 and an AF_INET choice, we prefer AF_INET6,
	 * because on UNIX it can receive either protocol, whereas AF_INET can
	 * only get IPv4.  Otherwise we'd need to bind two sockets, one per
	 * protocol.  We can't just put AF_INET6 in the hints because we don't
	 * know whether this machine supports (and enables) IPv6.
	 */
#ifndef __darwin__
#ifdef HAVE_IPV6
	if (addrs->ai_family == AF_INET && addrs->ai_next != NULL && addrs->ai_next->ai_family == AF_INET6)
	{
		/*
		 * Both INET and INET6 are possible: swap the first two list nodes so
		 * the INET6 one is tried first.  Further nodes (other AFs??) are
		 * left where they are.
		 */
		struct addrinfo *temp = addrs->ai_next;		/* second node */

		addrs->ai_next = addrs->ai_next->ai_next;	/* point old first node to third node if any */
		temp->ai_next = addrs;						/* point second node to first */
		addrs = temp;								/* start the list with the old second node */
		elog(DEBUG1, "Have both IPv6 and IPv4 choices");
	}
#endif
#endif

	for (rp = addrs; rp != NULL; rp = rp->ai_next)
	{
		fun = "socket";

		/*
		 * getaddrinfo gives us all the parameters for the socket() call as
		 * well as the parameters for the bind() call.
		 */
		elog(DEBUG1, "receive socket ai_family %d ai_socktype %d ai_protocol %d", rp->ai_family, rp->ai_socktype, rp->ai_protocol);
		fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
		if (fd == -1)
			continue;
		elog(DEBUG1, "receive socket %d ai_family %d ai_socktype %d ai_protocol %d", fd, rp->ai_family, rp->ai_socktype, rp->ai_protocol);

		fun = "fcntl(O_NONBLOCK)";
		if (!pg_set_noblock(fd))
		{
			if (fd >= 0)
				closesocket(fd);
			continue;
		}

		fun = "bind";
		elog(DEBUG1, "bind addrlen %d fam %d", rp->ai_addrlen, rp->ai_addr->sa_family);
		if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0)
		{
			*txFamily = rp->ai_family;
			break;				/* Success */
		}

		if (fd >= 0)
			closesocket(fd);
	}

	if (rp == NULL)
	{
		/*
		 * No address succeeded.  Free the address list here: the error path
		 * below never returns (ereport(ERROR) longjmps), so this is our last
		 * chance to avoid leaking it.  Preserve errno across freeaddrinfo().
		 */
		errnoSave = errno;
		freeaddrinfo(addrs);
		errno = errnoSave;
		goto error;
	}

	freeaddrinfo(addrs);		/* No longer needed */

	/*
	 * Get our socket address (IP and Port), which we will save for others to
	 * connect to.
	 */
	MemSet(&our_addr, 0, sizeof(our_addr));
	our_addr_len = sizeof(our_addr);

	fun = "getsockname";
	if (getsockname(fd, (struct sockaddr *) &our_addr, &our_addr_len) < 0)
		goto error;

	Assert(our_addr.ss_family == AF_INET || our_addr.ss_family == AF_INET6);

	*listenerSocketFd = fd;

	if (our_addr.ss_family == AF_INET6)
		*listenerPort = ntohs(((struct sockaddr_in6 *) &our_addr)->sin6_port);
	else
		*listenerPort = ntohs(((struct sockaddr_in *) &our_addr)->sin_port);

	setXmitSocketOptions(fd);

	return;

error:
	errnoSave = errno;
	if (fd >= 0)
		closesocket(fd);
	errno = errnoSave;
	ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
			errmsg("Interconnect Error: Could not set up udp listener socket."),
			errdetail("%m%s", fun)));
	return;
}
/*
 * initMutex
 * 		Initialize a mutex with error-check semantics.
 *
 * PTHREAD_MUTEX_ERRORCHECK makes relock / unlock-by-non-owner mistakes
 * fail with an error code instead of deadlocking silently.
 */
static void
initMutex(pthread_mutex_t *mutex)
{
	pthread_mutexattr_t attrs;

	pthread_mutexattr_init(&attrs);
	pthread_mutexattr_settype(&attrs, PTHREAD_MUTEX_ERRORCHECK);
	pthread_mutex_init(mutex, &attrs);
}
/*
 * InitMotionUDP
 * 		Initialize UDP specific comms, and create rx-thread.
 *
 * Sets up the global ic control state, the listening socket (returned via
 * listenerSocketFd / listenerPort), the receive-side bookkeeping, and
 * finally spawns the background receive thread.  Initialization order
 * matters: everything the rx thread touches must be ready before
 * pthread_create() at the bottom.
 */
void
InitMotionUDP(int *listenerSocketFd, uint16 *listenerPort)
{
	int pthread_err;
	int txFamily = -1;
	/* attributes of the thread we're creating */
	pthread_attr_t t_atts;
	MemoryContext old;
	/* Initialize global ic control data. */
	ic_control_info.eno = 0;
	ic_control_info.isSender = false;
	/* defaults; setupUDPListeningSocket() overwrites with the real sizes */
	ic_control_info.socketSendBufferSize = 2 * 1024 * 1024;
	ic_control_info.socketRecvBufferSize = 2 * 1024 * 1024;
	ic_control_info.memContext = AllocSetContextCreate(TopMemoryContext,
	"UdpInterconnectMemContext",
	ALLOCSET_DEFAULT_MINSIZE,
	ALLOCSET_DEFAULT_INITSIZE,
	ALLOCSET_DEFAULT_MAXSIZE);
	initMutex(&ic_control_info.errorLock);
	initMutex(&ic_control_info.lock);
	pthread_cond_init(&ic_control_info.cond, NULL);
	ic_control_info.shutdown = 0;
	old = MemoryContextSwitchTo(ic_control_info.memContext);
	/*
	 * connHtab allocates from memContext (palloc errors out internally on
	 * OOM); startupCacheHtab uses malloc, so its result must be checked.
	 */
	initConnHashTable(&ic_control_info.connHtab, ic_control_info.memContext);
	if (!initConnHashTable(&ic_control_info.startupCacheHtab, NULL))
	ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY),
	errmsg("failed to initialize connection htab for startup cache")));
	/*
	 * setup listening socket.
	 */
	setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily);
#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
	setupUDPSignal(&ic_control_info.usig);
#endif
	/* Initialize receive control data. */
	resetMainThreadWaiting(&rx_control_info.mainWaitingState);
	/* allocate a buffer for sending disorder messages */
	rx_control_info.disorderBuffer = palloc0(MIN_PACKET_SIZE);
	rx_control_info.lastDXatId = InvalidTransactionId;
	rx_control_info.lastTornIcId = 0;
	initCursorICHistoryTable(&rx_control_info.cursorHistoryTable);
	initRxBufferPool(&rx_buffer_pool);
	/* Initialize send control data */
	snd_control_info.cwnd = 0;
	snd_control_info.minCwnd = 0;
	snd_control_info.ackBuffer = palloc0(MIN_PACKET_SIZE);
	MemoryContextSwitchTo(old);
#ifdef TRANSFER_PROTOCOL_STATS
	initMutex(&trans_proto_stats.lock);
#endif
	/* Start up our rx-thread */
	/* save ourselves some memory: the defaults for thread stack
	 * size are large (1M+) */
	pthread_attr_init(&t_atts);
#ifdef pg_on_solaris
	/* Solaris doesn't have PTHREAD_STACK_MIN ? */
	pthread_attr_setstacksize(&t_atts, (128*1024));
#else
	pthread_attr_setstacksize(&t_atts, Max(PTHREAD_STACK_MIN, (128*1024)));
#endif
	pthread_err = pthread_create(&ic_control_info.threadHandle, &t_atts, rxThreadFunc, NULL);
	pthread_attr_destroy(&t_atts);
	if (pthread_err != 0)
	{
		/* record failure so CleanupMotionUDP won't join a dead handle */
		ic_control_info.threadCreated = false;
		ereport(FATAL, (errcode(ERRCODE_INTERNAL_ERROR),
		errmsg("InitMotionLayerIPC: failed to create thread"),
		errdetail("pthread_create() failed with err %d", pthread_err)));
	}
	ic_control_info.threadCreated = true;
	return;
}
/*
 * CleanupMotionUDP
 * 		Clean up UDP specific stuff such as cursor ic hash table, thread etc.
 *
 * Order matters here: the rx thread is shut down and joined first, so the
 * remaining teardown can run without taking ic_control_info.lock.
 */
void
CleanupMotionUDP(void)
{
	elog(DEBUG2, "udp-ic: telling receiver thread to shutdown.");
	/*
	 * We should not hold any lock when we reach here even
	 * when we report FATAL errors. Just in case,
	 * We still release the locks here.
	 */
	pthread_mutex_unlock(&ic_control_info.errorLock);
	pthread_mutex_unlock(&ic_control_info.lock);
	/* Shutdown rx thread. */
	compare_and_swap_32(&ic_control_info.shutdown, 0, 1);
	if(ic_control_info.threadCreated)
	{
		pthread_join(ic_control_info.threadHandle, NULL);
	}
	elog(DEBUG2, "udp-ic: receiver thread shutdown.");
	purgeCursorIcEntry(&rx_control_info.cursorHistoryTable);
	destroyConnHashTable(&ic_control_info.connHtab);
	/* background thread exited, we can do the cleanup without locking. */
	cleanupStartupCache();
	destroyConnHashTable(&ic_control_info.startupCacheHtab);
	/* free the disorder buffer */
	pfree(rx_control_info.disorderBuffer);
	rx_control_info.disorderBuffer = NULL;
	/* free the buffer for acks */
	pfree(snd_control_info.ackBuffer);
	snd_control_info.ackBuffer = NULL;
	/* drops everything still allocated in the interconnect context */
	MemoryContextDelete(ic_control_info.memContext);
#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
	destroyUDPSignal(&ic_control_info.usig);
#endif
#ifdef USE_ASSERT_CHECKING
	/*
	 * Check malloc times, in Interconnect part, memory are carefully released in tear down
	 * code (even when error occurred). But if a FATAL error is reported, tear down
	 * code will not be executed. Thus, it is still possible the malloc times and free times
	 * do not match when we reach here. The process will die in this case, the mismatch does
	 * not introduce issues.
	 */
	if (icudp_malloc_times != 0)
	elog(LOG, "WARNING: malloc times and free times do not match.");
#endif
}
/*
 * waitOnCondition
 * 		Used by sender/receiver to wait some time.
 *
 * Returns true when the condition was signaled, false on timeout.
 *
 * MUST BE CALLED WITH *mutex* HELD!
 *
 * FIX: the old deadline computation added tv_usec and timeout_us and
 * handled at most one microsecond-field wraparound, so any timeout of one
 * second or more (or a sum >= 2s) produced an out-of-range tv_nsec /
 * bogus deadline.  We now normalize with 64-bit div/mod, which handles
 * any non-negative timeout.
 */
static bool
waitOnCondition(int timeout_us, pthread_cond_t *cond, pthread_mutex_t *mutex)
{
	struct timespec ts;
	int			wait;

	Assert(timeout_us >= 0);

	/*
	 * MPP-9910: pthread_cond_timedwait appears to be broken in OS-X 10.6.x
	 * "Snow Leopard".  Let's use a different timewait function that works
	 * better on OSX (and is simpler because it uses relative time).
	 */
#ifdef __darwin__
	/* relative timeout; keep tv_nsec within [0, 1e9) */
	ts.tv_sec = timeout_us / 1000000;
	ts.tv_nsec = (long) (timeout_us % 1000000) * 1000;
#else
	{
		struct timeval tv;
		long long	usec;

		gettimeofday(&tv, NULL);

		/* absolute deadline = now + timeout, normalized to sec/nsec */
		usec = (long long) tv.tv_usec + timeout_us;
		ts.tv_sec = tv.tv_sec + (time_t) (usec / 1000000);
		ts.tv_nsec = (long) (usec % 1000000) * 1000;
	}
#endif

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
	{
		elog(DEBUG5, "waiting (timed) on route %d %s", rx_control_info.mainWaitingState.waitingRoute,
		(rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE ? "(any route)" : ""));
	}

	/*
	 * interrupts may occur while we are waiting.  The interrupt handler only
	 * sets some flags; they are acted on when an interrupt-checking function
	 * is called.  Note that elog/ereport/write_log check for interrupts.
	 */
#if defined(__darwin__)
#if (defined(IC_USE_PTHREAD_SYNCHRONIZATION))
	wait = pthread_cond_timedwait_relative_np(cond, mutex, &ts);
#else
	wait = udpSignalTimeoutWait(&ic_control_info.usig, cond, mutex, &ts);
#endif
#else
	wait = pthread_cond_timedwait(cond, mutex, &ts);
#endif

	if (wait == ETIMEDOUT)
	{
		/* condition not met */
		return false;
	}

	/* we didn't time out, condition met! */
	return true;
}
/*
 * initConnHashTable
 * 		Initialize a connection hash table.
 *
 * Buckets are allocated from cxt when one is supplied; otherwise from the
 * C heap so the table can be used by the background thread.  Returns false
 * only on malloc failure (the palloc path errors out internally).
 */
static bool
initConnHashTable(ConnHashTable *ht, MemoryContext cxt)
{
	ht->cxt = cxt;
	ht->size = DEFAULT_CONN_HTAB_SIZE;

	if (ht->cxt)
	{
		/* palloc0 hands back an already NULL-initialized bucket array */
		ht->table = (struct ConnHtabBin **) palloc0(ht->size * sizeof(struct ConnHtabBin *));
	}
	else
	{
		/* calloc likewise zeroes the buckets for us */
		ht->table = (struct ConnHtabBin **) calloc(ht->size, sizeof(struct ConnHtabBin *));
		if (ht->table == NULL)
			return false;
	}

	return true;
}
/*
 * connAddHash
 * 		Add a connection to the hash table
 *
 * Note: we want to add a connection to the hashtable if it isn't
 * already there ... so we just have to check the pointer values -- no
 * need to use CONN_HASH_MATCH() at all!
 *
 * Returns false *only* on memory-allocation failure.
 */
static bool
connAddHash(ConnHashTable *ht, MotionConn *conn)
{
	uint32		bucket;
	struct ConnHtabBin *bin;
	struct ConnHtabBin *newbin;
	MemoryContext old = NULL;

	bucket = CONN_HASH_VALUE(&conn->conn_info) % ht->size;

	/* already present?  nothing to do then */
	for (bin = ht->table[bucket]; bin != NULL; bin = bin->next)
	{
		if (bin->conn == conn)
		{
			elog(DEBUG5, "connAddHash(): duplicate ?! node %d route %d", conn->conn_info.motNodeId, conn->route);
			return true;		/* false *only* indicates memory-alloc failure. */
		}
	}

	/* allocate a bin from the table's context, or the heap if it has none */
	if (ht->cxt)
	{
		old = MemoryContextSwitchTo(ht->cxt);
		newbin = (struct ConnHtabBin *) palloc0(sizeof(struct ConnHtabBin));
	}
	else
	{
		newbin = (struct ConnHtabBin *) malloc(sizeof(struct ConnHtabBin));
		if (newbin == NULL)
			return false;
	}

	/* push onto the head of the bucket chain */
	newbin->conn = conn;
	newbin->next = ht->table[bucket];
	ht->table[bucket] = newbin;

	if (ht->cxt)
		MemoryContextSwitchTo(old);

	return true;
}
/*
 * connDelHash
 * 		Delete a connection from the hash table
 *
 * Note: we want to remove a connection from the hashtable if it is
 * there ... so we just have to check the pointer values -- no need to
 * use CONN_HASH_MATCH() at all!  A connection that is not present is
 * silently ignored.
 */
static void
connDelHash(ConnHashTable *ht, MotionConn *conn)
{
	uint32		bucket = CONN_HASH_VALUE(&conn->conn_info) % ht->size;
	struct ConnHtabBin *cur = ht->table[bucket];
	struct ConnHtabBin *prev = NULL;

	/* walk the chain looking for this connection */
	while (cur != NULL && cur->conn != conn)
	{
		prev = cur;
		cur = cur->next;
	}

	/* not found? */
	if (cur == NULL)
		return;

	/* unlink the bin and release it with the matching allocator */
	if (prev == NULL)
		ht->table[bucket] = cur->next;
	else
		prev->next = cur->next;

	if (ht->cxt)
		pfree(cur);
	else
		free(cur);
}
/*
 * findConnByHeader
 * 		Find the corresponding connection given a pkt header information.
 *
 * With the new mirroring scheme, the interconnect is no longer involved:
 * we don't have to disambiguate anymore.
 *
 * NOTE: the icpkthdr field dstListenerPort is used for disambiguation.
 * on receivers it may not match the actual port (it may have an extra bit
 * set (1<<31)).
 *
 * Runs in the rx thread too, hence write_log instead of elog.
 */
static MotionConn *
findConnByHeader(ConnHashTable *ht, icpkthdr *hdr)
{
	struct ConnHtabBin *bin;
	uint32		hashcode = CONN_HASH_VALUE(hdr) % ht->size;

	for (bin = ht->table[hashcode]; bin != NULL; bin = bin->next)
	{
		MotionConn *candidate = bin->conn;

		if (!CONN_HASH_MATCH(&candidate->conn_info, hdr))
			continue;

		if (DEBUG5 >= log_min_messages)
			write_log("findConnByHeader: found. route %d state %d hashcode %d conn %p",
					  candidate->route, candidate->state, hashcode, candidate);
		return candidate;
	}

	if (DEBUG5 >= log_min_messages)
		write_log("findConnByHeader: not found! (hdr->srcPid %d "
				  "hdr->srcContentId %d hdr->dstContentId %d hdr->dstPid %d sess(%d:%d) cmd(%d:%d)) hashcode %d",
				  hdr->srcPid, hdr->srcContentId, hdr->dstContentId, hdr->dstPid, hdr->sessionId,
				  gp_session_id, hdr->icId, gp_interconnect_id, hashcode);

	return NULL;
}
/*
 * destroyConnHashTable
 * 		Release the connection hash table.
 *
 * Frees every bucket chain and then the bucket array itself, using the
 * allocator that created them (palloc context vs. C heap).
 */
static void
destroyConnHashTable(ConnHashTable *ht)
{
	int			i;

	for (i = 0; i < ht->size; i++)
	{
		/* pop-and-free until this bucket is empty */
		while (ht->table[i] != NULL)
		{
			struct ConnHtabBin *bin = ht->table[i];

			ht->table[i] = bin->next;
			if (ht->cxt)
				pfree(bin);
			else
				free(bin);
		}
	}

	if (ht->cxt)
		pfree(ht->table);
	else
		free(ht->table);
}
/*
 * sendControlMessage
 * 		Helper function to send a control message.
 *
 * It is different from sendOnce which retries on interrupts...
 * Here, we leave it to retransmit logic to handle these cases.
 *
 * Called from both main and rx threads, so failures are logged with
 * write_log rather than elog.
 */
static inline void
sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen)
{
	int n;
#ifdef USE_ASSERT_CHECKING
	/* test hook: randomly drop control messages to exercise retransmit */
	if (testmode_inject_fault(gp_udpic_dropacks_percent))
	{
#ifdef AMS_VERBOSE_LOGGING
		write_log("THROW CONTROL MESSAGE with seq %d extraSeq %d srcpid %d despid %d", pkt->seq, pkt->extraSeq, pkt->srcPid, pkt->dstPid);
#endif
		return;
	}
#endif
	/* Add CRC for the control message. */
	if (gp_interconnect_full_crc)
	addCRC(pkt);
	n = sendto(fd, (const char *)pkt, pkt->len, 0, addr, peerLen);
	/* No need to handle EAGAIN here: no-space just means that we
	 * dropped the packet: our ordinary retransmit mechanism will
	 * handle that case
	 */
	if (n < pkt->len)
	write_log("sendcontrolmessage: got error %d errno %d seq %d", n, errno, pkt->seq);
}
/*
 * setAckSendParam
 * 		Fill in a deferred-ack parameter block.
 *
 * The ack message is built from the connection's header with the ack
 * fields stamped in; the peer address is captured so the ack can be sent
 * later, after locks are released.
 */
static inline void
setAckSendParam(AckSendParam *param, MotionConn *conn, int32 flags, uint32 seq, uint32 extraSeq)
{
	/* clone the connection header, then overwrite the ack-specific fields */
	memcpy(&param->msg, (char *) &conn->conn_info, sizeof(icpkthdr));
	param->msg.len = sizeof(icpkthdr);
	param->msg.flags = flags;
	param->msg.seq = seq;
	param->msg.extraSeq = extraSeq;

	/* remember where to send it */
	param->peer = conn->peer;
	param->peer_len = conn->peer_len;
}
/*
 * sendAckWithParam
 * 		Send acknowledgment to sender.
 *
 * Thin wrapper that transmits an ack previously prepared by
 * setAckSendParam(), allowing the actual send to happen after
 * ic_control_info.lock is released.
 */
static inline void
sendAckWithParam(AckSendParam *param)
{
	sendControlMessage(&param->msg, UDP_listenerFd, (struct sockaddr *)&param->peer, param->peer_len);
}
/*
 * sendAck
 * 		Send acknowledgment to sender.
 *
 * Builds the ack from the connection's header and transmits it on the
 * listener socket immediately.
 */
static void
sendAck(MotionConn *conn, int32 flags, uint32 seq, uint32 extraSeq)
{
	icpkthdr	msg;

	/* clone the connection header and stamp in the ack information */
	memcpy(&msg, (char *) &conn->conn_info, sizeof(msg));
	msg.len = sizeof(icpkthdr);
	msg.flags = flags;
	msg.seq = seq;
	msg.extraSeq = extraSeq;

#ifdef AMS_VERBOSE_LOGGING
	write_log("sendack: flags 0x%x node %d route %d seq %d extraSeq %d",
			  msg.flags, msg.motNodeId, conn->route, msg.seq, msg.extraSeq);
#endif

	sendControlMessage(&msg, UDP_listenerFd, (struct sockaddr *) &conn->peer, conn->peer_len);
}
/*
 * sendDisorderAck
 * 		Send a disorder message to the sender.
 *
 * Whenever the receiver detects a disorder packet, it will assemble a disorder message
 * which contains the sequence numbers of the possibly lost packets.
 *
 * Uses the preallocated rx_control_info.disorderBuffer, whose payload
 * (one uint32 per lost packet) the caller has already filled in.
 */
static void
sendDisorderAck(MotionConn *conn, uint32 seq, uint32 extraSeq, uint32 lostPktCnt)
{
	icpkthdr   *pkt = rx_control_info.disorderBuffer;

	/* header = connection header + disorder flag + ack fields */
	memcpy(pkt, (char *) &conn->conn_info, sizeof(icpkthdr));
	pkt->flags |= UDPIC_FLAGS_DISORDER;
	pkt->seq = seq;
	pkt->extraSeq = extraSeq;
	pkt->len = lostPktCnt * sizeof(uint32) + sizeof(icpkthdr);

#ifdef AMS_VERBOSE_LOGGING
	if (!(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6))
	{
		write_log("UDP Interconnect bug (in sendDisorderAck): trying to send ack when we don't know where to send to %s", conn->remoteHostAndPort);
	}
#endif

	sendControlMessage(pkt, UDP_listenerFd, (struct sockaddr *) &conn->peer, conn->peer_len);
}
/*
 * sendStatusQueryMessage
 * 		Used by senders to send a status query message for a connection to receivers.
 *
 * When receivers get such a message, they will respond with
 * the connection status (consumed seq, received seq ...).
 */
static void
sendStatusQueryMessage(MotionConn *conn, int fd, uint32 seq)
{
	icpkthdr	msg;

	/* clone the connection header; only CAPACITY + seq are meaningful */
	memcpy(&msg, (char *) &conn->conn_info, sizeof(msg));
	msg.len = sizeof(msg);
	msg.flags = UDPIC_FLAGS_CAPACITY;
	msg.seq = seq;
	msg.extraSeq = 0;

#ifdef TRANSFER_PROTOCOL_STATS
	updateStats(TPE_ACK_PKT_QUERY, conn, &msg);
#endif

	sendControlMessage(&msg, fd, (struct sockaddr *) &conn->peer, conn->peer_len);
}
/*
 * putRxBufferAndSendAck
 * 		Return a buffer and send an acknowledgment.
 *
 * Pops the head packet off the connection's receive queue, returns its
 * buffer to the free pool, and (every other packet, or always when the
 * queue depth is 1) either sends an ack immediately or records it in
 * *param for the caller to send after releasing the lock.
 *
 * SHOULD BE CALLED WITH rx_control_info.lock *LOCKED*
 *
 * FIX: the old code read buf->seq (and logged through buf) BEFORE the
 * NULL check, so the FATAL report for a NULL buffer was unreachable —
 * we'd have crashed on the dereference first.  Check before touching it.
 */
static void
putRxBufferAndSendAck(MotionConn *conn, AckSendParam *param)
{
	icpkthdr   *buf = NULL;
	uint32		seq;

	buf = (icpkthdr *) conn->pkt_q[conn->pkt_q_head];

	/* validate before dereferencing */
	if (buf == NULL)
	{
		pthread_mutex_unlock(&ic_control_info.lock);
		elog(FATAL, "putRxBufferAndSendAck: buffer is NULL");
	}
	seq = buf->seq;

#ifdef AMS_VERBOSE_LOGGING
	elog(LOG, "putRxBufferAndSendAck conn %p pkt [seq %d] for node %d route %d, [head seq] %d queue size %d, queue head %d queue tail %d", conn, buf->seq, buf->motNodeId, conn->route, conn->conn_info.seq - conn->pkt_q_size, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
#endif

	/* pop the head slot of the circular queue */
	conn->pkt_q[conn->pkt_q_head] = NULL;
	conn->pBuff = NULL;
	conn->pkt_q_head = (conn->pkt_q_head + 1) % Gp_interconnect_queue_depth;
	conn->pkt_q_size--;

#ifdef AMS_VERBOSE_LOGGING
	elog(LOG, "putRxBufferAndSendAck conn %p pkt [seq %d] for node %d route %d, [head seq] %d queue size %d, queue head %d queue tail %d", conn, buf->seq, buf->motNodeId, conn->route, conn->conn_info.seq - conn->pkt_q_size, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
#endif

	putRxBufferToFreeList(&rx_buffer_pool, buf);
	conn->conn_info.extraSeq = seq;

	/* Send an Ack to the sender. */
	if ((seq % 2 == 0) || (Gp_interconnect_queue_depth == 1))
	{
		if (param != NULL)
		{
			/* defer the actual send until the caller drops the lock */
			setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, conn->conn_info.seq - 1, seq);
		}
		else
		{
			sendAck(conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, conn->conn_info.seq - 1, seq);
		}
	}
}
/*
 * MlPutRxBuffer
 *
 * The cdbmotion code has discarded our pointer to the motion-conn
 * structure, but has enough info to fully specify it.
 */
void
MlPutRxBuffer(ChunkTransportState *transportStates, int motNodeID, int route)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn = NULL;
	AckSendParam param;

	getChunkTransportState(transportStates, motNodeID, &pEntry);
	conn = pEntry->conns + route;

	memset(&param, 0, sizeof(AckSendParam));

	pthread_mutex_lock(&ic_control_info.lock);

	/* guard clause: releasing a NULL buffer is a fatal programming error */
	if (conn->pBuff == NULL)
	{
		pthread_mutex_unlock(&ic_control_info.lock);
		elog(FATAL, "Interconnect error: tried to release a NULL buffer");
	}
	putRxBufferAndSendAck(conn, &param);

	pthread_mutex_unlock(&ic_control_info.lock);

	/* real ack sending is after lock release to decrease the lock holding time. */
	if (param.msg.len != 0)
		sendAckWithParam(&param);
}
/*
 * initRxBufferPool
 * 		Initialize receive buffer pool.
 *
 * Starts with an empty free list and a cap of one buffer; the cap is
 * raised elsewhere as connections are established.
 */
static void
initRxBufferPool(RxBufferPool *p)
{
	p->freeList = NULL;
	p->count = 0;
	p->maxCount = 1;
}
/*
 * getRxBuffer
 * 		Get a receive buffer.
 *
 * SHOULD BE CALLED WITH rx_control_info.lock *LOCKED*
 *
 * Returns NULL when the pool is exhausted or malloc fails; the background
 * thread records malloc failures and the main thread reports them and
 * starts teardown.
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe. Developers should instead use something like:
 *
 * if (DEBUG3 >= log_min_messages)
 * write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 */
static icpkthdr *
getRxBuffer(RxBufferPool *p)
{
	icpkthdr   *ret;

#ifdef USE_ASSERT_CHECKING
	if (FINC_HAS_FAULT(FINC_RX_BUF_NULL) &&
		testmode_inject_fault(gp_udpic_fault_inject_percent))
		return NULL;
#endif

	/* fast path: reuse a buffer from the free list */
	if (p->freeList != NULL)
		return getRxBufferFromFreeList(p);

	/* free list empty: allocate a new buffer, unless we're over the cap */
	if (p->count > p->maxCount)
	{
		if (DEBUG3 >= log_min_messages)
			write_log("Interconnect ran out of rx-buffers count/max %d/%d", p->count, p->maxCount);
		return NULL;
	}

	/* malloc is used for thread safty. */
	ret = (icpkthdr *) malloc(Gp_max_packet_size);
	if (ret != NULL)
		p->count++;

	return ret;
}
/*
 * putRxBufferToFreeList
 * 		Return a receive buffer to free list
 *
 * SHOULD BE CALLED WITH rx_control_info.lock *LOCKED*
 *
 * The free list is intrusive: the first pointer-sized bytes of a free
 * buffer are reused to hold the "next" pointer, so no extra allocation
 * is needed.
 */
static inline void
putRxBufferToFreeList(RxBufferPool *p, icpkthdr *buf)
{
	/* push buf onto the head of the free list */
	*(char **)buf = p->freeList;
	p->freeList = (char *)buf;
}
/*
 * getRxBufferFromFreeList
 * 		Get a receive buffer from free list
 *
 * SHOULD BE CALLED WITH rx_control_info.lock *LOCKED*
 * Caller must ensure the free list is non-empty.
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe. Developers should instead use something like:
 *
 * if (DEBUG3 >= log_min_messages)
 * write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 */
static inline icpkthdr *
getRxBufferFromFreeList(RxBufferPool *p)
{
	/* pop the head; its first pointer-sized bytes hold the next pointer */
	icpkthdr   *head = (icpkthdr *) p->freeList;

	p->freeList = *(char **) p->freeList;
	return head;
}
/*
 * freeRxBuffer
 * 		Free a receive buffer.
 *
 * Releases the buffer back to the C heap (it was malloc'd) and shrinks
 * the pool's outstanding-buffer count.
 *
 * NOTE: This function MUST NOT contain elog or ereport statements.
 * elog is NOT thread-safe. Developers should instead use something like:
 *
 * if (DEBUG3 >= log_min_messages)
 * write_log("my brilliant log statement here.");
 *
 * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
 */
static inline void
freeRxBuffer(RxBufferPool *p, icpkthdr *buf)
{
	free(buf);
	p->count--;
}
/*
 * setSocketBufferSize
 * 		Set socket buffer size.
 *
 * Tries expectedSize first, then keeps halving until the OS accepts the
 * value; reports ERROR (closing fd) if we would fall below leastSize.
 * Returns the size that was actually accepted.
 */
static uint32
setSocketBufferSize(int fd, int type, int expectedSize, int leastSize)
{
	int			bufSize;
	int			errnoSave;
	socklen_t	skLen = 0;
	const char *fun;

	/* read back the OS default, mostly for the debug log */
	fun = "getsockopt";
	skLen = sizeof(bufSize);
	if (getsockopt(fd, SOL_SOCKET, type, (char *) &bufSize, &skLen) < 0)
		goto error;
	elog(DEBUG1, "UDP-IC: xmit default buffer size %d bytes", bufSize);

	/*
	 * We'll try the expected size first, and fall back to least size if that
	 * doesn't work.
	 */
	fun = "setsockopt";
	bufSize = expectedSize;
	while (setsockopt(fd, SOL_SOCKET, type, (const char *) &bufSize, skLen) < 0)
	{
		bufSize >>= 1;
		if (bufSize < leastSize)
			goto error;
	}

	elog(DEBUG1, "UDP-IC: xmit use buffer size %d bytes", bufSize);
	return bufSize;

error:
	errnoSave = errno;
	if (fd >= 0)
		closesocket(fd);
	errno = errnoSave;
	ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
			errmsg("Interconnect Error: Could not set up udp listener socket."),
			errdetail("%m%s", fun)));

	/* Make GCC not complain. */
	return 0;
}
/*
 * setXmitSocketOptions
 * 		Set transmit socket options.
 *
 * The Gp_udp_bufsize_k guc should be set carefully.
 *
 * If it is small, such as 128K, and send queue depth and receive queue
 * depth are large, then it is possible the OS cannot handle all of the
 * UDP packets GPDB delivers to it, introducing packet losses and
 * disordered packets.  In order to set Gp_udp_bufsize_k to a larger
 * value, the OS UDP buffer should be set to a large enough value.
 */
static void
setXmitSocketOptions(int txfd)
{
	/* guc value in KB, or a 2MB default when unset */
	uint32		bufSize = (Gp_udp_bufsize_k != 0 ? Gp_udp_bufsize_k * 1024 : 2048 * 1024);

	/* never go below 128KB in either direction */
	ic_control_info.socketRecvBufferSize = setSocketBufferSize(txfd, SO_RCVBUF, bufSize, 128 * 1024);
	ic_control_info.socketSendBufferSize = setSocketBufferSize(txfd, SO_SNDBUF, bufSize, 128 * 1024);
}
#ifdef USE_ASSERT_CHECKING
/*
 * icBufferListLog
 * 		Log the buffer list.
 *
 * Debug-only helper (thread-safe: uses write_log).  Walks at most
 * list->length nodes, stopping early if the chain wraps back to head.
 */
static void
icBufferListLog(ICBufferList *list)
{
	ICBufferLink *bufLink;
	int			remaining = list->length;
	int			idx = 0;

	write_log("Length %d, type %d headptr %p", list->length, list->type, &list->head);

	for (bufLink = list->head.next; bufLink != &list->head && remaining > 0; bufLink = bufLink->next)
	{
		ICBuffer   *buf = (list->type == ICBufferListType_Primary ? GET_ICBUFFER_FROM_PRIMARY(bufLink)
						   : GET_ICBUFFER_FROM_SECONDARY(bufLink));

		write_log("Node %d, linkptr %p", idx++, bufLink);
		logPkt("from list", buf->pkt);
		remaining--;
	}
}
/*
 * icBufferListCheck
 * 		Buffer list sanity check.
 *
 * Verifies the list is non-NULL, its length is non-negative and
 * consistent with the circular chain.  On any violation, logs the list,
 * sleeps two minutes (to give a chance to attach a debugger / collect
 * logs), then aborts the process.
 */
static void
icBufferListCheck(char * prefix, ICBufferList *list)
{
	if (list == NULL)
	{
		write_log("ICBufferList ERROR %s: NULL list", prefix);
		goto error;
	}
	if (list->length < 0)
	{
		write_log("ICBufferList ERROR %s: list length %d < 0 ", prefix, list->length);
		goto error;
	}
	/* an empty list must have head.prev == head.next == &head */
	if (list->length == 0 && (list->head.prev != list->head.next && list->head.prev != &list->head))
	{
		write_log("ICBufferList ERROR %s: length is 0, &list->head %p, prev %p, next %p", prefix, &list->head, list->head.prev, list->head.next);
		icBufferListLog(list);
		goto error;
	}
	/* following length links from head must land back on head */
	int len = list->length;
	ICBufferLink *link = list->head.next;
	while (len > 0)
	{
		link = link->next;
		len--;
	}
	if (link != &list->head)
	{
		write_log("ICBufferList ERROR: %s len %d", prefix, list->length);
		icBufferListLog(list);
		goto error;
	}
	return;
error:
	write_log("wait for 120s and then abort.");
	pg_usleep(120000000);
	abort();
}
#endif
/*
 * icBufferListInitHeadLink
 * 		Initialize the pointers in the head link to point to itself.
 *
 * The lists are circular and doubly linked; an empty list is the head
 * node linked to itself in both directions.
 */
static inline void
icBufferListInitHeadLink(ICBufferLink *link)
{
	link->next = link;
	link->prev = link;
}
/*
 * icBufferListInit
 * 		Initialize the buffer list with the given type.
 *
 * The type selects which embedded link (primary or secondary) of an
 * ICBuffer this list threads through.
 */
static inline void
icBufferListInit(ICBufferList *list, ICBufferListType type)
{
	list->length = 0;
	list->type = type;
	icBufferListInitHeadLink(&list->head);

#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListInit", list);
#endif
}
/*
 * icBufferListIsHead
 * 		Return whether the given link is the head link of the list.
 *
 * This function is often used as the end condition of an iteration of the list.
 */
static inline bool
icBufferListIsHead(ICBufferList *list, ICBufferLink *link)
{
#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListIsHead", list);
#endif
	return link == &list->head;
}
/*
 * icBufferListFirst
 * 		Return the first link after the head link.
 *
 * Note that the head link is a pseudo link used only to ease the
 * operations of the link list.  If the list is empty, this returns the
 * head link itself.
 */
static inline ICBufferLink *
icBufferListFirst(ICBufferList *list)
{
#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListFirst", list);
#endif
	return list->head.next;
}
/*
 * icBufferListLength
 * 		Get the list length (number of buffers, excluding the head link).
 */
static inline int
icBufferListLength(ICBufferList *list)
{
#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListLength", list);
#endif
	return list->length;
}
/*
 * icBufferListDelete
 * 		Remove a buffer from the buffer list and return the buffer.
 *
 * The caller guarantees buf is currently on this list; the link used is
 * the one matching the list's type.
 */
static inline ICBuffer *
icBufferListDelete(ICBufferList *list, ICBuffer *buf)
{
	ICBufferLink *bufLink;

#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListDelete", list);
#endif

	/* splice this buffer's link out of the circular chain */
	bufLink = (list->type == ICBufferListType_Primary ? &buf->primary : &buf->secondary);
	bufLink->prev->next = bufLink->next;
	bufLink->next->prev = bufLink->prev;
	list->length--;

	return buf;
}
/*
 * icBufferListPop
 * 		Remove and return the first buffer of the list, or NULL if empty.
 */
static inline ICBuffer *
icBufferListPop(ICBufferList *list)
{
	ICBufferLink *bufLink;
	ICBuffer   *buf;

#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListPop", list);
#endif

	if (list->length == 0)
		return NULL;

	bufLink = icBufferListFirst(list);
	buf = (list->type == ICBufferListType_Primary ? GET_ICBUFFER_FROM_PRIMARY(bufLink)
		   : GET_ICBUFFER_FROM_SECONDARY(bufLink));

	/* splice the first link out of the circular chain */
	bufLink->prev->next = bufLink->next;
	bufLink->next->prev = bufLink->prev;
	list->length--;

	return buf;
}
/*
 * icBufferListFree
 * 		Free all the buffers in the list (they were palloc'd).
 */
static void
icBufferListFree(ICBufferList *list)
{
	ICBuffer   *buf;

#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListFree", list);
#endif

	/* drain the list, releasing each buffer as it comes off */
	while ((buf = icBufferListPop(list)) != NULL)
		pfree(buf);
}
/*
 * icBufferListAppend
 * 		Append a buffer at the tail of a list and return it.
 */
static inline ICBuffer *
icBufferListAppend(ICBufferList *list, ICBuffer *buf)
{
	ICBufferLink *bufLink;

#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListAppend", list);
#endif

	bufLink = (list->type == ICBufferListType_Primary ? &buf->primary : &buf->secondary);

	/* insert just before the head link, i.e. at the tail of the list */
	bufLink->next = &list->head;
	bufLink->prev = list->head.prev;
	list->head.prev->next = bufLink;
	list->head.prev = bufLink;
	list->length++;

	return buf;
}
/*
* icBufferListReturn
* Return the buffers in the list to the free buffer list.
*
* If the buf is also in an expiration queue, we also need to remove it from the expiration queue.
*
*/
static void
icBufferListReturn(ICBufferList *list, bool inExpirationQueue)
{
#ifdef USE_ASSERT_CHECKING
	icBufferListCheck("icBufferListReturn", list);
#endif
	ICBuffer   *buf = NULL;

	/* drain the list, returning each buffer to the shared free pool */
	while ((buf = icBufferListPop(list)) != NULL )
	{
		if (inExpirationQueue) /* the buf is also in the expiration queue */
		{
			/* unlink from its expiration-ring slot (secondary link) */
			icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf);
			unack_queue_ring.numOutStanding--;

			/*
			 * Length is tested after the pop: while further buffers remain on
			 * this list, each extra one also counts against the shared
			 * outstanding total.  NOTE(review): presumably this mirrors the
			 * accounting done when the buffers were enqueued -- confirm
			 * against the enqueue path.
			 */
			if (icBufferListLength(list) >= 1)
				unack_queue_ring.numSharedOutStanding--;
		}
		icBufferListAppend(&snd_buffer_pool.freeList, buf);
	}
}
/*
* initUnackQueueRing
* Initialize an unack queue ring.
*
* Align current time to a slot boundary and set current slot index (time pointer) to 0.
*/
/*
 * Reset an unack queue ring: zero the counters, point the time pointer at
 * slot 0, align the ring's notion of "now" down to a TIMER_SPAN boundary,
 * and initialize every slot as an empty secondary list.
 */
static void
initUnackQueueRing(UnackQueueRing *uqr)
{
	int			slot;
	uint64		now = getCurrentTime();

	/* align current time to a slot boundary */
	uqr->currentTime = now - (now % TIMER_SPAN);
	uqr->idx = 0;
	uqr->numOutStanding = 0;
	uqr->numSharedOutStanding = 0;

	for (slot = 0; slot < UNACK_QUEUE_RING_SLOTS_NUM; slot++)
		icBufferListInit(&uqr->slots[slot], ICBufferListType_Secondary);
}
/*
* computeExpirationPeriod
* Compute expiration period according to the connection information.
*
* Considerations on expiration period computation:
*
* RTT is dynamically computed, and expiration period is based on RTT values.
* We cannot simply use RTT as the expiration value, since real workload does
* not always have a stable RTT. A small constant value is multiplied to the RTT value
* to make the resending logic insensitive to the frequent small changes of RTT.
*
*/
/*
 * Derive the retransmit expiration period for a connection from its
 * smoothed RTT and deviation, doubling per retry (at most 12 doublings)
 * and clamping to [MIN_EXPIRATION_PERIOD, MAX_EXPIRATION_PERIOD].
 */
static inline uint64
computeExpirationPeriod(MotionConn *conn, uint32 retry)
{
#ifdef USE_ASSERT_CHECKING
	/*
	 * In fault injection mode, we often use DEFAULT_RTT, because the
	 * intentional large percentage of packet/ack losses will make the
	 * measured RTT too large, which would lead to a slow retransmit speed.
	 * In a real hardware environment/workload, we do not expect such a
	 * packet loss pattern.
	 */
	if (udp_testmode)
		return DEFAULT_RTT;
#endif

	{
		uint32		backoff = (retry <= 12 ? retry : 12);
		uint64		period = (conn->rtt + (conn->dev << 2)) << backoff;

		return Max(MIN_EXPIRATION_PERIOD, Min(MAX_EXPIRATION_PERIOD, period));
	}
}
/*
* initSndBufferPool
* Initialize the send buffer pool.
*
* The initial maxCount is set to 1 for gp_interconnect_snd_queue_depth = 1 case,
* then there is at least an extra free buffer to send for that case.
*/
/*
 * Prepare the send buffer pool for use: empty free list, zero buffers
 * allocated so far.
 */
static void
initSndBufferPool(SendBufferPool *p)
{
	icBufferListInit(&p->freeList, ICBufferListType_Primary);
	p->count = 0;

	/*
	 * When gp_interconnect_snd_queue_depth is 1, start maxCount at 1 so
	 * there is always at least one extra free buffer available to send.
	 */
	if (Gp_interconnect_snd_queue_depth == 1)
		p->maxCount = 1;
	else
		p->maxCount = 0;
}
/*
* cleanSndBufferPool
* Clean the send buffer pool.
*/
/*
 * Tear down the send buffer pool: free every pooled buffer and reset the
 * allocation counters.
 */
static inline void
cleanSndBufferPool(SendBufferPool *p)
{
	icBufferListFree(&p->freeList);
	p->count = 0;
	p->maxCount = 0;
}
/*
* getSndBuffer
* Get a send buffer for a connection.
*
* Different flow control mechanisms use different buffer management policies.
* Capacity based flow control uses per-connection buffer policy and Loss based
* flow control uses shared buffer policy.
*
* Return NULL when no free buffer available.
*/
static ICBuffer *
getSndBuffer(MotionConn *conn)
{
	ICBuffer   *ret = NULL;

	/* running totals used later to derive average buffer statistics */
	ic_statistics.totalBuffers += (icBufferListLength(&snd_buffer_pool.freeList) + snd_buffer_pool.maxCount - snd_buffer_pool.count);
	ic_statistics.bufferCountingTime++;

	/* Capacity based flow control does not use shared buffers */
	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY)
	{
		/* per-connection policy: refuse once this connection's queues are full */
		Assert(icBufferListLength(&conn->unackQueue) + icBufferListLength(&conn->sndQueue) <= Gp_interconnect_snd_queue_depth);
		if (icBufferListLength(&conn->unackQueue) + icBufferListLength(&conn->sndQueue) >= Gp_interconnect_snd_queue_depth)
			return NULL;
	}

	if (icBufferListLength(&snd_buffer_pool.freeList) > 0)
	{
		/* fast path: reuse a buffer from the shared free list */
		return icBufferListPop(&snd_buffer_pool.freeList);
	}
	else
	{
		if (snd_buffer_pool.count < snd_buffer_pool.maxCount)
		{
			/*
			 * Pool not yet at capacity: allocate a fresh zeroed buffer large
			 * enough for the ICBuffer header plus a maximum-size packet.
			 */
			ret = (ICBuffer *) palloc0(Gp_max_packet_size + sizeof(ICBuffer));
			snd_buffer_pool.count++;
			ret->conn = NULL;
			ret->nRetry = 0;
			icBufferListInitHeadLink(&ret->primary);
			icBufferListInitHeadLink(&ret->secondary);
			ret->unackQueueRingSlot = 0;
		}
		else
		{
			/* pool exhausted: caller must wait for buffers to be returned */
			return NULL;
		}
	}
	return ret;
}
/*
* The udp interconnect specific implementation of timeout wait/signal mechanism.
* (Only for MacOS)
*
* The introduction of this is due to the bug in pthread_cond_wait/pthread_cond_timedwait_relative_np
* on MacOs. (MPP-9910).
*
* The implementation of the signal mechanism is based on UDP protocol. Waiting thread is polling on
* a UDP socket, and wakes up thread will send a signal id to the socket when the condition is met.
*
* Due to the reliability of UDP protocol (packet loss, duplicate, interrupted system calls, error
* return of system calls...), the waiting thread should:
*
* 1) check the condition again when it is woken up
* 2) when the timed wait returns with a timeout, it should check the condition again.
*
* It is not necessary to implement a retransmit/ack mechanism because the local socket is relatively
* reliable and the communication load is light.
*
* ##NOTE: This implementation is specific for UDP interconnect use, and it is not portable. Developers
* should pay attention when adding another condition variable.
*/
#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
/*
* udpSignalTimeoutWait
* Timeout wait implementation on Mac.
*
* Return 0 on success (condition met) and return ETIMEDOUT on timeout/other errors.
*
* Note that this implementation is UDP interconnect specific and not for general usage.
* It depends on the udp socket built in InitMotionUDP.
*
* Can only be used in Main thread.
*
*/
/*
 * Wait (with timeout) until udpSignal() delivers our condition id, in the
 * style of pthread_cond_timedwait: the mutex is released while blocked and
 * re-acquired before returning.
 *
 * Returns 0 when the matching signal arrived, ETIMEDOUT on timeout or any
 * other polling failure.  Only the tv_nsec part of "ts" is used; it is
 * converted to milliseconds for the poll-based wait.
 */
static int
udpSignalTimeoutWait(UDPSignal *sig, pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *ts)
{
	int			timeout;

	/*
	 * BUGFIX: this was previously declared "bool", which clamped ETIMEDOUT
	 * (a non-zero errno constant) to 1 before it was returned from this
	 * int-returning function.
	 */
	int			ret = ETIMEDOUT;

	Assert(sig != NULL && mutex != NULL && ts != NULL);

	timeout = ts->tv_nsec / 1000 / 1000;
	Assert(timeout >= 0);

	pthread_mutex_unlock(mutex);

	/* publish the condition identity the waker must echo back to us */
	sig->sigId = (void *) cond;

	if (udpSignalPoll(sig, timeout))
	{
		/* woken up: verify a matching signal datagram actually arrived */
		if (udpSignalGet(sig))
			ret = 0;
	}

	sig->sigId = NULL;
	pthread_mutex_lock(mutex);

	return ret;
}
/*
* udpSignal
*
* The udp interconnect specific implementation of pthread_cond_signal.
*
*/
/*
 * The udp interconnect specific implementation of pthread_cond_signal:
 * send the current signal id as a datagram to our own loopback socket so
 * the polling waiter wakes up.
 */
static void
udpSignal(UDPSignal *sig)
{
	int			n;
	char		buf[16];

#ifdef USE_ASSERT_CHECKING
	/* in test mode, optionally drop the signal to exercise the timeout path */
	int			percent = gp_udpic_dropacks_percent > 0 ? 1 : 0;

	if (testmode_inject_fault(percent))
	{
#ifdef AMS_VERBOSE_LOGGING
		write_log("THROW SIGNAL with value %p", sig->sigId);
#endif
		return;
	}
#endif

xmit_retry:
	/* the payload is simply the signal id (a pointer value) */
	*((void **) buf) = sig->sigId;
	n = sendto(sig->fd, buf, sizeof(sig->sigId), 0,
			   (struct sockaddr *) &sig->peer, sig->peer_len);
	if (n < 0)
	{
		if (errno == EINTR)
			goto xmit_retry;

		if (errno == EAGAIN)	/* no space ? not an error. */
			return;

		/*
		 * Error case, this may happen in both main thread and background
		 * thread, treat it like in background thread.  Eventually, the main
		 * thread will find this error.
		 */
		write_log("udpsignal failed fd %d: got error %d errno %d signal %p", sig->fd, n, errno, sig->sigId);
		setRxThreadError(errno);
		return;
		/* not reached */
	}

	/*
	 * BUGFIX: a partial send must be compared against the number of bytes
	 * actually transmitted, sizeof(sig->sigId) (pointer-sized).  The old
	 * check against sizeof(int) spuriously logged a failure on LP64
	 * platforms, where a pointer is wider than an int.
	 */
	if (n != sizeof(sig->sigId))
		write_log("udpsignal failed fd %d: got error %d errno %d signal %p", sig->fd, n, errno, sig->sigId);
}
/*
* setupUDPSignal
* Setup the socket needed by the signal.
*
* Can only be used in Main thread.
*/
/*
 * Create the loopback UDP socket that backs the signal mechanism and
 * record where signal datagrams must be sent (127.0.0.1 at the port the
 * kernel assigned to us).
 */
static void
setupUDPSignal(UDPSignal *sig)
{
	uint16		listenerPort = 0;

	Assert(sig != NULL);

	/* bind a local UDP socket; the kernel picks a free port */
	setupUDPListeningSocket(&sig->fd, &listenerPort, &sig->txFamily);
	sig->port = listenerPort;

	/* signals are always sent to ourselves over loopback */
	getSockAddr(&sig->peer, &sig->peer_len, "127.0.0.1", listenerPort);
	sig->sigId = NULL;

	Assert(sig->peer.ss_family == AF_INET || sig->peer.ss_family == AF_INET6);
}
/*
* destroyUDPSignal
* Destroy the signal.
*
* Can only be used in Main thread.
*/
/*
 * Dispose of the signal: close its loopback socket (if open) and reset
 * every field so a stale signal cannot be used by accident.
 */
static void
destroyUDPSignal(UDPSignal *sig)
{
	Assert(sig != NULL);

	if (sig->fd >= 0)
		closesocket(sig->fd);

	sig->sigId = NULL;
	sig->fd = -1;
	sig->port = 0;
}
/*
* udpSignalGet
* Try to get the signal from the socket.
*
* Note: Can only be called from main thread.
*/
static bool
udpSignalGet(UDPSignal *sig)
{
	int			n;

#define SIGNAL_BUFFER_SIZE 16
	struct sockaddr_storage peer;
	socklen_t	peerlen;
	char		buf[SIGNAL_BUFFER_SIZE];

	/* drain datagrams until we find our signal or the socket is empty */
	for (;;)
	{
		/* ready to read on our socket ? */
		peerlen = sizeof(peer);
		n = recvfrom(sig->fd, buf, SIGNAL_BUFFER_SIZE, 0, (struct sockaddr *) &peer, &peerlen);

		if (n < 0)
		{
			/* had nothing to read. */
			if (errno == EWOULDBLOCK)
				return false;

			/* interrupted system call: just retry the read */
			if (errno == EINTR)
				continue;

			elog(ERROR, "Interconnect error: getting signal from socket buffer failed.");

			/* not reached. */
			return false;
		}

		/* a datagram of the wrong size cannot be our signal id; drop it */
		if (n != sizeof(sig->sigId))
		{
			continue;
		}

		/* only accept the signal id we are currently waiting for */
		if (*((void **) buf) == sig->sigId)
		{
#ifdef AMS_VERBOSE_LOGGING
			write_log("Get signal %p", *((void **) buf));
#endif
			return true;
		}
	}

	/* not reached: the loop only exits via return */
	return false;
}
/*
* udpSignalPoll
* Timeout (in ms) polling of signal packets.
*
*
* Note: Can only be called from main thread.
*/
/*
 * Poll (timeout in milliseconds) for a signal datagram on the signal
 * socket.  Returns true when the socket became readable, false on timeout,
 * EINTR, or non-readable wakeups.
 *
 * Note: Can only be called from the main thread.
 */
static bool
udpSignalPoll(UDPSignal *sig, int timeout)
{
	struct pollfd nfd;
	int			n;

	nfd.fd = sig->fd;
	nfd.events = POLLIN;

	n = poll(&nfd, 1, timeout);
	if (n < 0)
	{
		/* treat an interrupted wait like a timeout; callers recheck anyway */
		if (errno == EINTR)
			return false;

		elog(ERROR, "Interconnect error: signal polling failed.");
		/* not reached */
	}

	/* timeout */
	if (n == 0)
	{
		return false;
	}

	/*
	 * BUGFIX: readiness is reported in revents, not events -- events is the
	 * request mask we set above and always contains POLLIN, so the old check
	 * returned true for any wakeup.  Testing revents also filters out
	 * error-only wakeups such as POLLERR/POLLHUP/POLLNVAL.
	 */
	if (n == 1 && (nfd.revents & POLLIN))
	{
		return true;
	}

	return false;
}
#endif
/*
* startOutgoingUDPConnections
* Used to initially kick-off any outgoing connections for mySlice.
*
* This should not be called for root slices (i.e. QD ones) since they don't
* ever have outgoing connections.
*
* PARAMETERS
*
* sendSlice - Slice that this process is a member of.
*
* RETURNS
* Initialized ChunkTransportState for the Sending Motion Node Id.
*/
ChunkTransportStateEntry *
startOutgoingUDPConnections(ChunkTransportState *transportStates,
							Slice *sendSlice,
							int *pOutgoingCount)
{
	ChunkTransportStateEntry *pEntry;
	MotionConn *conn;
	ListCell   *cell;
	Slice	   *recvSlice;
	CdbProcess *cdbProc;
	int			i;
	uint16		port = 0;

	*pOutgoingCount = 0;

	/* the receiving slice is always our parent in the slice tree */
	recvSlice = (Slice *) list_nth(transportStates->sliceTable->slices, sendSlice->parentIndex);

	/*
	 * Potentially introduce a Bug (MPP-17186).
	 * The workaround is to turn off log_hostname guc.
	 */
	adjustMasterRouting(recvSlice);

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG1, "Interconnect seg%d slice%d setting up sending motion node",
			 GetQEIndex(), sendSlice->sliceIndex);

	pEntry = createChunkTransportState(transportStates,
									   sendSlice,
									   recvSlice,
									   list_length(recvSlice->primaryProcesses));

	Assert(pEntry && pEntry->valid);

	/*
	 * Setup a MotionConn entry for each of our outbound connections.
	 * Request a connection to each receiving backend's listening port.
	 * NB: Some mirrors could be down & have no CdbProcess entry.
	 */
	conn = pEntry->conns;
	i = 0;
	foreach(cell, recvSlice->primaryProcesses)
	{
		cdbProc = (CdbProcess *) lfirst(cell);
		if (cdbProc)
		{
			conn->cdbProc = cdbProc;
			icBufferListInit(&conn->sndQueue, ICBufferListType_Primary);
			icBufferListInit(&conn->unackQueue, ICBufferListType_Primary);
			conn->capacity = Gp_interconnect_queue_depth;

			/* send buffer pool must be initialized before this. */
			snd_buffer_pool.maxCount += Gp_interconnect_snd_queue_depth;
			snd_control_info.cwnd += 1;
			conn->curBuff = getSndBuffer(conn);

			/* should have at least one buffer for each connection */
			Assert(conn->curBuff != NULL);

			/* start RTT estimation from the configured defaults */
			conn->rtt = DEFAULT_RTT;
			conn->dev = DEFAULT_DEV;
			conn->deadlockCheckBeginTime = 0;
			conn->tupleCount = 0;
			conn->msgSize = sizeof(conn->conn_info);
			conn->sentSeq = 0;
			conn->receivedAckSeq = 0;
			conn->consumedSeq = 0;
			conn->pBuff = (uint8 *) conn->curBuff->pkt;
			conn->state = mcsSetupOutgoingConnection;
			conn->route = i++;
			conn->waitEOS = false;
			(*pOutgoingCount)++;
		}

		conn++;
	}

	/* one sending socket is shared by all routes of this motion node */
	pEntry->txfd = -1;
	pEntry->txport = 0;
	setupUDPListeningSocket(&pEntry->txfd, &port, &pEntry->txfd_family);
	pEntry->txport = port;

	return pEntry;
}
/*
* getSockAddr
* Convert IP addr and port to sockaddr
*/
/*
 * getSockAddr
 *		Convert a numeric IP address string and port into a sockaddr,
 *		stored in *peer with its length in *peer_len.
 *
 * Name resolution is never performed (AI_NUMERICHOST); a parse failure
 * raises ERROR.
 */
static void
getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort)
{
	int			ret;
	char		portNumberStr[32];
	char	   *service;
	struct addrinfo *addrs = NULL;
	struct addrinfo hint;

	/*
	 * Get socketaddr to connect to.
	 */

	/* Initialize hint structure */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_socktype = SOCK_DGRAM;	/* UDP */
	hint.ai_family = AF_UNSPEC;		/* Allow for any family (v4, v6, even unix in the future) */

#ifdef AI_NUMERICSERV
	hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;	/* Never do name resolution */
#else
	hint.ai_flags = AI_NUMERICHOST; /* Never do name resolution */
#endif

	snprintf(portNumberStr, sizeof(portNumberStr), "%d", listenerPort);
	service = portNumberStr;

	ret = pg_getaddrinfo_all(listenerAddr, service, &hint, &addrs);
	if (ret || !addrs)
	{
		if (addrs)
			pg_freeaddrinfo_all(hint.ai_family, addrs);

		/*
		 * BUGFIX: the two string-literal pieces used to concatenate as
		 * "listeneraddress"; an explicit space now separates the words.
		 */
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect Error: Could not parse remote listener "
							   "address: '%s' port '%d': %s", listenerAddr, listenerPort, gai_strerror(ret)),
						errdetail("getaddrinfo() unable to parse address: '%s'",
								  listenerAddr)));
		return;
	}

	/* Since we aren't using name resolution, getaddrinfo will return only 1 entry */
	elog(DEBUG1, "GetSockAddr socket ai_family %d ai_socktype %d ai_protocol %d for %s ",
		 addrs->ai_family, addrs->ai_socktype, addrs->ai_protocol, listenerAddr);

	memset(peer, 0, sizeof(struct sockaddr_storage));
	memcpy(peer, addrs->ai_addr, addrs->ai_addrlen);
	*peer_len = addrs->ai_addrlen;

	pg_freeaddrinfo_all(addrs->ai_family, addrs);
}
/*
* setupOutgoingUDPConnection
* Setup outgoing UDP connection.
*/
void
setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
	CdbProcess *cdbProc = conn->cdbProc;

	Assert(conn->state == mcsSetupOutgoingConnection);
	Assert(conn->cdbProc);

	conn->wakeup_ms = 0;
	conn->remoteContentId = cdbProc->contentid;
	conn->stat_min_ack_time = ~((uint64) 0);

	/* Save the information for the error message if getaddrinfo fails */
	if (strchr(cdbProc->listenerAddr, ':') != 0)
		snprintf(conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort),
				 "[%s]:%d", cdbProc->listenerAddr, cdbProc->listenerPort);
	else
		snprintf(conn->remoteHostAndPort, sizeof(conn->remoteHostAndPort),
				 "%s:%d", cdbProc->listenerAddr, cdbProc->listenerPort);

	/*
	 * Get socketaddr to connect to.
	 */
	getSockAddr(&conn->peer, &conn->peer_len, cdbProc->listenerAddr, cdbProc->listenerPort);

	/* Save the destination IP address */
	formatSockAddr((struct sockaddr *) &conn->peer, conn->remoteHostAndPort,
				   sizeof(conn->remoteHostAndPort));

	Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6);

	{
#ifdef USE_ASSERT_CHECKING
		{
			/* sanity check: the sending socket's family matches what we recorded */
			struct sockaddr_storage source_addr;
			socklen_t	source_addr_len;

			MemSet(&source_addr, 0, sizeof(source_addr));
			source_addr_len = sizeof(source_addr);

			if (getsockname(pEntry->txfd, (struct sockaddr *) &source_addr, &source_addr_len) == -1)
			{
				ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
								errmsg("Interconnect Error: Could not get port from socket."),
								errdetail("%m")));
			}
			Assert(pEntry->txfd_family == source_addr.ss_family);
		}
#endif

		/*
		 * If the socket was created with a different address family than the place we
		 * are sending to, we might need to do something special.
		 */
		if (pEntry->txfd_family != conn->peer.ss_family)
		{
			/*
			 * If the socket was created AF_INET6, but the address we want to send to is IPv4 (AF_INET),
			 * we might need to change the address format. On Linux, it isn't necessary: glibc automatically
			 * handles this. But on MAC OSX and Solaris, we need to convert the IPv4 address to an
			 * V4-MAPPED address in AF_INET6 format.
			 */
			if (pEntry->txfd_family == AF_INET6)
			{
				struct sockaddr_storage temp;
				const struct sockaddr_in *in = (const struct sockaddr_in *) &conn->peer;
				struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp;

				memset(&temp, 0, sizeof(temp));

				elog(DEBUG1, "We are inet6, remote is inet. Converting to v4 mapped address.");

				/* Construct a V4-to-6 mapped address. */
				temp.ss_family = AF_INET6;
				in6_new->sin6_family = AF_INET6;
				in6_new->sin6_port = in->sin_port;
				in6_new->sin6_flowinfo = 0;

				memset(&in6_new->sin6_addr, '\0', sizeof(in6_new->sin6_addr));
				/* set the ::ffff: marker word of the mapped address */
				//in6_new->sin6_addr.s6_addr16[5] = 0xffff;
				((uint16 *) &in6_new->sin6_addr)[5] = 0xffff;
				/* and place the IPv4 address in the low 4 bytes */
				//in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr;
				memcpy(((char *) &in6_new->sin6_addr) + 12, &(in->sin_addr), 4);
				in6_new->sin6_scope_id = 0;

				/* copy it back */
				memcpy(&conn->peer, &temp, sizeof(struct sockaddr_in6));
				conn->peer_len = sizeof(struct sockaddr_in6);
			}
			else
			{
				/*
				 * If we get here, something is really wrong. We created the socket as IPv4-only (AF_INET),
				 * but the address we are trying to send to is IPv6. It's possible we could have a V4-mapped
				 * address that we could convert to an IPv4 address, but there is currently no code path where
				 * that could happen. So this must be an error.
				 */
				elog(ERROR, "Trying to use an IPv4 (AF_INET) socket to send to an IPv6 address");
			}
		}
	}

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		ereport(DEBUG1, (errmsg("Interconnect connecting to seg%d slice%d %s "
								"pid=%d sockfd=%d",
								conn->remoteContentId,
								pEntry->recvSlice->sliceIndex,
								conn->remoteHostAndPort,
								conn->cdbProc->pid,
								conn->sockfd)));

	/* send connection request */
	MemSet(&conn->conn_info, 0, sizeof(conn->conn_info));
	conn->conn_info.len = 0;
	conn->conn_info.flags = 0;
	conn->conn_info.motNodeId = pEntry->motNodeId;

	conn->conn_info.recvSliceIndex = pEntry->recvSlice->sliceIndex;
	conn->conn_info.sendSliceIndex = pEntry->sendSlice->sliceIndex;
	conn->conn_info.srcContentId = GetQEIndex();
	conn->conn_info.dstContentId = conn->cdbProc->contentid;

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		elog(DEBUG1, "setupOutgoingUDPConnection: node %d route %d srccontent %d dstcontent %d: %s",
			 pEntry->motNodeId, conn->route, GetQEIndex(), conn->cdbProc->contentid, conn->remoteHostAndPort);

	/* the high 16 bits of Gp_listener_port hold this process's UDP listener port */
	conn->conn_info.srcListenerPort = (Gp_listener_port >> 16) & 0x0ffff;
	conn->conn_info.srcPid = MyProcPid;
	conn->conn_info.dstPid = conn->cdbProc->pid;
	conn->conn_info.dstListenerPort = conn->cdbProc->listenerPort;

	conn->conn_info.sessionId = gp_session_id;
	conn->conn_info.icId = gp_interconnect_id;

	/* register so the background rx thread can match incoming acks to us */
	connAddHash(&ic_control_info.connHtab, conn);

	/*
	 * No need to get the connection lock here, since background rx thread will never access send connections.
	 */
	conn->msgPos = NULL;
	conn->msgSize = sizeof(conn->conn_info);
	conn->stillActive = true;
	conn->conn_info.seq = 1;

	Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6);
}	/* setupOutgoingUDPConnection */
/*
* checkForCancelFromQD
* Check for cancel from QD.
*
* Should be called only inside the dispatcher
*/
/*
 * Raise an interconnect error if the dispatcher has recorded an error for
 * this query.  Must only be invoked on the dispatcher (QD).
 */
static void
checkForCancelFromQD(ChunkTransportState *pTransportStates)
{
	Assert(Gp_role == GP_ROLE_DISPATCH);
	Assert(pTransportStates);
	Assert(pTransportStates->estate);

	/* a dispatcher error means we have lost contact with a peer */
	if (dispatcher_has_error(pTransportStates->estate->dispatch_data))
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg(CDB_MOTION_LOST_CONTACT_STRING)));
	/* not reached when an error was raised above */
}
/*
* handleCachedPackets
* Deal with cached packets.
*/
static void
handleCachedPackets(void)
{
	MotionConn *cachedConn = NULL;
	MotionConn *setupConn = NULL;
	ConnHtabBin *bin = NULL;
	icpkthdr   *pkt = NULL;
	AckSendParam param;
	int			i = 0;
	int			j = 0;

	/* walk every bucket of the startup-cache hash table */
	for (i = 0; i < ic_control_info.startupCacheHtab.size; i++)
	{
		bin = ic_control_info.startupCacheHtab.table[i];

		while (bin)
		{
			/* NOTE(review): the comma (not a semicolon) chains these two assignments */
			cachedConn = bin->conn,
			setupConn = NULL;

			/* re-deliver each packet cached on this placeholder connection */
			for (j = 0; j < cachedConn->pkt_q_size; j++)
			{
				pkt = (icpkthdr *) cachedConn->pkt_q[j];

				if (pkt == NULL)
					continue;

				/* the packet leaves the cache's buffer accounting either way */
				rx_buffer_pool.maxCount--;

				/* look up this pkt's connection in connHtab */
				setupConn = findConnByHeader(&ic_control_info.connHtab, pkt);

				if (setupConn == NULL)
				{
					/* mismatch! */
					putRxBufferToFreeList(&rx_buffer_pool, pkt);
					cachedConn->pkt_q[j] = NULL;
					continue;
				}

				memset(&param, 0, sizeof(param));

				if (!handleDataPacket(setupConn, pkt, &cachedConn->peer, &cachedConn->peer_len, &param))
				{
					/* no need to cache this packet */
					putRxBufferToFreeList(&rx_buffer_pool, pkt);
				}

				ic_statistics.recvPktNum++;

				/* handleDataPacket may have prepared an ack to send */
				if (param.msg.len != 0)
					sendAckWithParam(&param);

				cachedConn->pkt_q[j] = NULL;
			}

			/* advance before the current bin's connection is destroyed below */
			bin = bin->next;

			connDelHash(&ic_control_info.startupCacheHtab, cachedConn);

			/* MPP-19981
			 * free the cached connections; otherwise memory leak
			 * would be introduced.
			 */
			free(cachedConn->pkt_q);
			free(cachedConn);
		}
	}
}
/*
* SetupUDPInterconnect_Internal
* Internal function for setting up UDP interconnect.
*/
static void
SetupUDPInterconnect_Internal(EState *estate)
{
	int			i, n;
	ListCell   *cell;
	Slice	   *mySlice;
	Slice	   *aSlice;
	MotionConn *conn = NULL;
	int			incoming_count = 0;
	int			outgoing_count = 0;
	int			expectedTotalIncoming = 0;
	int			expectedTotalOutgoing = 0;
	ChunkTransportStateEntry *sendingChunkTransportState = NULL;

	/*
	 * Serialize setup against the background rx thread; on error the caller
	 * (SetupUDPInterconnect) releases this lock in its PG_CATCH block.
	 */
	pthread_mutex_lock(&ic_control_info.lock);

	gp_interconnect_id = estate->es_sliceTable->ic_instance_id;
	Assert(gp_interconnect_id > 0);

	estate->interconnect_context = palloc0(sizeof(ChunkTransportState));

	/* add back-pointer for dispatch check. */
	estate->interconnect_context->estate = estate;

	/* initialize state variables */
	Assert(estate->interconnect_context->size == 0);
	estate->interconnect_context->size = CTS_INITIAL_SIZE;
	estate->interconnect_context->states = palloc0(CTS_INITIAL_SIZE * sizeof(ChunkTransportStateEntry));

	estate->interconnect_context->teardownActive = false;
	estate->interconnect_context->activated = false;
	estate->interconnect_context->incompleteConns = NIL;
	estate->interconnect_context->sliceTable = NULL;
	estate->interconnect_context->sliceId = -1;

	estate->interconnect_context->sliceTable = estate->es_sliceTable;
	estate->interconnect_context->sliceId = LocallyExecutingSliceIndex(estate);

	/* install the UDP implementations of the transport entry points */
	estate->interconnect_context->RecvTupleChunkFrom = RecvTupleChunkFromUDP;
	estate->interconnect_context->RecvTupleChunkFromAny = RecvTupleChunkFromAnyUDP;
	estate->interconnect_context->SendEos = SendEosUDP;
	estate->interconnect_context->SendChunk = SendChunkUDP;
	estate->interconnect_context->doSendStopMessage = doSendStopMessageUDP;

	mySlice = (Slice *) list_nth(estate->interconnect_context->sliceTable->slices, LocallyExecutingSliceIndex(estate));

	Assert(mySlice &&
		   IsA(mySlice, Slice) &&
		   mySlice->sliceIndex == LocallyExecutingSliceIndex(estate));

#ifdef USE_ASSERT_CHECKING
	/* enable test mode whenever any fault-injection GUC is active */
	if (gp_udpic_dropseg != UNDEF_SEGMENT
		|| gp_udpic_dropacks_percent != 0
		|| gp_udpic_dropxmit_percent != 0
		|| gp_udpic_fault_inject_percent != 0)
		udp_testmode = true;
	else
		udp_testmode = false;
#endif

	if (Gp_role == GP_ROLE_DISPATCH)
	{
		DistributedTransactionId distTransId = 0;
		TransactionId localTransId = 0;
		TransactionId subtransId = 0;

		GetAllTransactionXids(&(distTransId),
							  &(localTransId),
							  &(subtransId));

		/*
		 * Prune only when we are not in the same transaction and there is a
		 * large number of entries in the table
		 */
		if (distTransId != rx_control_info.lastDXatId && rx_control_info.cursorHistoryTable.count > (2 * CURSOR_IC_TABLE_SIZE))
		{
			if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
				elog(DEBUG1, "prune cursor history table (count %d), icid %d", rx_control_info.cursorHistoryTable.count, gp_interconnect_id);
			pruneCursorIcEntry(&rx_control_info.cursorHistoryTable, gp_interconnect_id);
		}

		addCursorIcEntry(&rx_control_info.cursorHistoryTable, gp_interconnect_id, gp_command_count);

		/* save the latest transaction id. */
		rx_control_info.lastDXatId = distTransId;
	}

	/* now we'll do some setup for each of our Receiving Motion Nodes. */
	foreach(cell, mySlice->children)
	{
		int			numProcs;
		int			childId = lfirst_int(cell);
		ChunkTransportStateEntry *pEntry = NULL;
		int			numValidProcs = 0;

		aSlice = (Slice *) list_nth(estate->interconnect_context->sliceTable->slices, childId);
		numProcs = list_length(aSlice->primaryProcesses);

		if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
			elog(DEBUG1, "Setup recving connections: my slice %d, childId %d",
				 mySlice->sliceIndex, childId);

		pEntry = createChunkTransportState(estate->interconnect_context, aSlice, mySlice, numProcs);

		Assert(pEntry);
		Assert(pEntry->valid);

		for (i = 0; i < pEntry->numConns; i++)
		{
			conn = &pEntry->conns[i];
			conn->cdbProc = list_nth(aSlice->primaryProcesses, i);

			/* skip connections that have no CdbProcess (NULL entry) */
			if (conn->cdbProc)
			{
				numValidProcs++;

				/* update the max buffer count of our rx buffer pool. */
				rx_buffer_pool.maxCount += Gp_interconnect_queue_depth;

				/* rx_buffer_queue */
				conn->pkt_q_size = 0;
				conn->pkt_q_head = 0;
				conn->pkt_q_tail = 0;
				conn->pkt_q = (uint8 **) palloc0(Gp_interconnect_queue_depth * sizeof(uint8 *));

				/* connection header info (defining characteristics of this connection) */
				MemSet(&conn->conn_info, 0, sizeof(conn->conn_info));
				conn->route = i;

				conn->conn_info.seq = 1;
				conn->stillActive = true;
				incoming_count++;

				conn->conn_info.motNodeId = pEntry->motNodeId;
				conn->conn_info.recvSliceIndex = mySlice->sliceIndex;
				conn->conn_info.sendSliceIndex = aSlice->sliceIndex;

				conn->conn_info.srcContentId = conn->cdbProc->contentid;
				conn->conn_info.dstContentId = GetQEIndex();

				conn->conn_info.srcListenerPort = conn->cdbProc->listenerPort;
				conn->conn_info.srcPid = conn->cdbProc->pid;
				conn->conn_info.dstPid = MyProcPid;
				conn->conn_info.dstListenerPort = (Gp_listener_port >> 16) & 0x0ffff;
				conn->conn_info.sessionId = gp_session_id;
				conn->conn_info.icId = gp_interconnect_id;
				conn->conn_info.flags = UDPIC_FLAGS_RECEIVER_TO_SENDER;

				/* make the connection visible to the background rx thread */
				connAddHash(&ic_control_info.connHtab, conn);
			}
		}

		expectedTotalIncoming += numValidProcs;

		/* let cdbmotion know how many receivers to expect. */
		setExpectedReceivers(estate->motionlayer_context, childId, numValidProcs);
	}

	snd_control_info.cwnd = 0;
	snd_control_info.minCwnd = 0;
	snd_control_info.ssthresh = 0;

	/* Initiate outgoing connections. */
	if (mySlice->parentIndex != -1)
	{
		/* sender-side state: buffer pool, expiration ring, and timers */
		initSndBufferPool(&snd_buffer_pool);
		initUnackQueueRing(&unack_queue_ring);
		ic_control_info.isSender = true;
		ic_control_info.lastExpirationCheckTime = getCurrentTime();
		ic_control_info.lastPacketSendTime = ic_control_info.lastExpirationCheckTime;
		ic_control_info.lastDeadlockCheckTime = ic_control_info.lastExpirationCheckTime;

		sendingChunkTransportState = startOutgoingUDPConnections(estate->interconnect_context, mySlice, &expectedTotalOutgoing);
		n = sendingChunkTransportState->numConns;

		for (i = 0; i < n; i++)
		{						/* loop to set up outgoing connections */
			conn = &sendingChunkTransportState->conns[i];
			setupOutgoingUDPConnection(estate->interconnect_context, sendingChunkTransportState, conn);
			outgoing_count++;
			continue;
		}

		snd_control_info.minCwnd = snd_control_info.cwnd;
		snd_control_info.ssthresh = snd_buffer_pool.maxCount;

#ifdef TRANSFER_PROTOCOL_STATS
		initTransProtoStats();
#endif
	}
	else
	{
		/* top slice: receive only, no outgoing connections */
		ic_control_info.isSender = false;
		ic_control_info.lastExpirationCheckTime = 0;
	}

	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
		ereport(DEBUG1, (errmsg("SetupUDPInterconnect will activate "
								"%d incoming, %d outgoing routes for gp_interconnect_id %d. "
								"Listening on ports=%d/%d sockfd=%d.",
								expectedTotalIncoming, expectedTotalOutgoing, gp_interconnect_id,
								Gp_listener_port & 0x0ffff, (Gp_listener_port >> 16) & 0x0ffff, UDP_listenerFd)));

	/* If there are packets cached by background thread, add them to the connections. */
	if (gp_interconnect_cache_future_packets)
		handleCachedPackets();

	estate->interconnect_context->activated = true;

	pthread_mutex_unlock(&ic_control_info.lock);
}
/*
* SetupUDPInterconnect
* setup UDP interconnect.
*/
void
SetupUDPInterconnect(EState *estate)
{
	/* guard against double setup or a missing slice table */
	if (estate->interconnect_context)
	{
		elog(FATAL, "SetupUDPInterconnect: already initialized.");
	}
	else if (!estate->es_sliceTable)
	{
		elog(FATAL, "SetupUDPInterconnect: no slice table ?");
	}

	PG_TRY();
	{
		SetupUDPInterconnect_Internal(estate);

		/*
		 * Internal error if we locked the mutex but forgot to unlock it.
		 * NOTE(review): the unlock call is the Assert argument, so it only
		 * runs in assert-enabled builds, and it relies on unlocking an
		 * already-unlocked mutex returning non-zero -- that behavior is
		 * only guaranteed for error-checking mutexes; confirm how
		 * ic_control_info.lock is initialized.
		 */
		Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
	}
	PG_CATCH();
	{
		/* on error, make sure the setup lock is not left held */
		pthread_mutex_unlock(&ic_control_info.lock);
		PG_RE_THROW();
	}
	PG_END_TRY();
}
/*
* freeDisorderedPackets
* Put the disordered packets into free buffer list.
*/
/*
 * Return every out-of-order packet still held in the connection's packet
 * queue to the rx free-buffer pool.
 */
static void
freeDisorderedPackets(MotionConn *conn)
{
	int			slot;

	/* the queue may never have been allocated (e.g. setup failed early) */
	if (conn->pkt_q == NULL)
		return;

	for (slot = 0; slot < Gp_interconnect_queue_depth; slot++)
	{
		icpkthdr   *pkt = (icpkthdr *) conn->pkt_q[slot];

		if (pkt == NULL)
			continue;

		if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
			elog(DEBUG1, "CLEAR Out-of-order PKT: conn %p pkt [seq %d] for node %d route %d, [head seq] %d queue size %d, queue head %d queue tail %d", conn, pkt->seq, pkt->motNodeId, conn->route, conn->conn_info.seq - conn->pkt_q_size, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);

		/* return the buffer into the free list. */
		putRxBufferToFreeList(&rx_buffer_pool, pkt);
		conn->pkt_q[slot] = NULL;
	}
}
/*
* chunkTransportStateEntryInitialized
* Check whether the transport state entry is initialized.
*/
/*
 * Report whether the transport state entry for the given motion node has
 * been created and marked valid.  motNodeID is 1-based; states[] is
 * 0-based.
 */
static bool
chunkTransportStateEntryInitialized(ChunkTransportState *transportStates,
									int16 motNodeID)
{
	return motNodeID <= transportStates->size &&
		transportStates->states[motNodeID - 1].valid;
}
/*
* computeNetworkStatistics
* Compute the max/min/avg network statistics.
*/
/*
 * Fold one sample into running min/max/sum accumulators; the caller
 * divides the sum by the sample count afterwards to get the average.
 */
static inline void
computeNetworkStatistics(uint64 value, uint64 *min, uint64 *max, double *sum)
{
	if (*max <= value)
		*max = value;
	if (*min >= value)
		*min = value;
	*sum += value;
}
/*
* TeardownUDPInterconnect_Internal
* Helper function for TeardownUDPInterconnect.
*
* Developers should pay attention to:
*
* 1) Do not handle interrupts/throw errors in Teardown, otherwise, Teardown may be called twice.
* It will introduce an undefined behavior. And memory leaks will be introduced.
*
* 2) Be careful about adding elog/ereport/write_log in Teardown function,
* especially outside a HOLD_INTERRUPTS/RESUME_INTERRUPTS pair, since elog/ereport/write_log may
* handle interrupts.
*
*/
static void
TeardownUDPInterconnect_Internal(ChunkTransportState *transportStates,
MotionLayerState *mlStates,
bool forceEOS)
{
ChunkTransportStateEntry *pEntry = NULL;
int i;
Slice *mySlice;
MotionConn *conn;
uint64 maxRtt = 0;
double avgRtt = 0;
uint64 minRtt = ~((uint64)0);
uint64 maxDev = 0;
double avgDev = 0;
uint64 minDev = ~((uint64)0);
bool isReceiver = false;
if (transportStates == NULL || transportStates->sliceTable == NULL)
{
elog(LOG, "TeardownUDPInterconnect: missing slice table.");
return;
}
if (!transportStates->states)
{
elog(LOG, "TeardownUDPInterconnect: missing states.");
return;
}
mySlice = (Slice *) list_nth(transportStates->sliceTable->slices, transportStates->sliceId);
HOLD_INTERRUPTS();
/* Log the start of TeardownInterconnect. */
if (gp_log_interconnect >= GPVARS_VERBOSITY_TERSE)
{
int elevel = 0;
if (forceEOS || !transportStates->activated)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elevel = LOG;
else
elevel = DEBUG1;
}
else if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elevel = DEBUG4;
if (elevel)
ereport(elevel, (errmsg("Interconnect seg%d slice%d cleanup state: "
"%s; setup was %s",
GetQEIndex(), mySlice->sliceIndex,
forceEOS ? "force" : "normal",
transportStates->activated ? "completed" : "exited")));
/* if setup did not complete, log the slicetable */
if (!transportStates->activated &&
gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog_node_display(DEBUG3, "local slice table", transportStates->sliceTable, true);
}
/*
* add lock to protect the hash table, since background thread is still working.
*/
pthread_mutex_lock(&ic_control_info.lock);
if (gp_interconnect_cache_future_packets)
cleanupStartupCache();
/*
* Now "normal" connections which made it through our
* peer-registration step. With these we have to worry about
* "in-flight" data.
*/
if (mySlice->parentIndex != -1)
{
Slice *parentSlice;
parentSlice = (Slice *) list_nth(transportStates->sliceTable->slices, mySlice->parentIndex);
/* cleanup a Sending motion node. */
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "Interconnect seg%d slice%d closing connections to slice%d (%d peers)",
GetQEIndex(), mySlice->sliceIndex, mySlice->parentIndex,
list_length(parentSlice->primaryProcesses));
/*
* In the olden days, we required that the error case
* successfully transmit and end-of-stream message here. But
* the introduction of cdbdisp_check_estate_for_cancel()
* alleviates for the QD case, and the cross-connection of
* writer gangs in the dispatcher (propagation of cancel
* between them) fixes the I-S case.
*
* So the call to forceEosToPeers() is no longer required.
*/
if (chunkTransportStateEntryInitialized(transportStates, mySlice->sliceIndex))
{
/* now it is safe to remove. */
pEntry = removeChunkTransportState(transportStates, mySlice->sliceIndex);
if (pEntry->txfd >= 0)
closesocket(pEntry->txfd);
pEntry->txfd = -1;
pEntry->txfd_family = 0;
/* connection array allocation may fail in interconnect setup. */
if (pEntry->conns)
{
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->cdbProc == NULL)
continue;
/* compute some statistics */
computeNetworkStatistics(conn->rtt, &minRtt, &maxRtt, &avgRtt);
computeNetworkStatistics(conn->dev, &minDev, &maxDev, &avgDev);
icBufferListReturn(&conn->sndQueue, false);
icBufferListReturn(&conn->unackQueue, Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY ? false : true);
connDelHash(&ic_control_info.connHtab, conn);
}
avgRtt = avgRtt / pEntry->numConns;
avgDev = avgDev / pEntry->numConns;
/* free all send side buffers */
cleanSndBufferPool(&snd_buffer_pool);
}
}
#ifdef TRANSFER_PROTOCOL_STATS
dumpTransProtoStats();
#endif
}
/* Previously, there is a piece of code that deals with pending stops.
* Now it is delegated to background rx thread which will deal with any
* mismatched packets.
*/
/*
* cleanup all of our Receiving Motion nodes, these get closed
* immediately (the receiver know for real if they want to shut
* down -- they aren't going to be processing any more data).
*/
ListCell *cell;
foreach(cell, mySlice->children)
{
Slice *aSlice;
int childId = lfirst_int(cell);
aSlice = (Slice *) list_nth(transportStates->sliceTable->slices, childId);
/*
* First check whether the entry is initialized to avoid the potential
* errors thrown out from the removeChunkTransportState, which may
* introduce some memory leaks.
*/
if (chunkTransportStateEntryInitialized(transportStates, aSlice->sliceIndex))
{
/* remove it */
pEntry = removeChunkTransportState(transportStates, aSlice->sliceIndex);
Assert(pEntry);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "Interconnect closing connections from slice%d",
aSlice->sliceIndex);
isReceiver = true;
if (pEntry->conns)
{
/*
* receivers know that they no longer care about data from
* below ... so we can safely discard data queued in both
* directions
*/
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->cdbProc == NULL)
continue;
rx_buffer_pool.maxCount -= Gp_interconnect_queue_depth;
/* out of memory has occurred, break out */
if (!conn->pkt_q)
break;
connDelHash(&ic_control_info.connHtab, conn);
/* putRxBufferAndSendAck() dequeues messages and moves them to pBuff */
while (conn->pkt_q_size > 0)
{
putRxBufferAndSendAck(conn, NULL);
}
/* we also need to clear all the out-of-order packets */
freeDisorderedPackets(conn);
/* free up the packet queue */
pfree(conn->pkt_q);
conn->pkt_q = NULL;
}
pfree(pEntry->conns);
pEntry->conns = NULL;
}
}
}
/* now that we've moved active rx-buffers to the freelist, we can prune the freelist itself */
while (rx_buffer_pool.count > rx_buffer_pool.maxCount)
{
icpkthdr *buf = NULL;
/* If this happened, there is some memory leaks.. */
if (rx_buffer_pool.freeList == NULL)
{
pthread_mutex_unlock(&ic_control_info.lock);
elog(FATAL, "freelist NULL: count %d max %d buf %p", rx_buffer_pool.count, rx_buffer_pool.maxCount, rx_buffer_pool.freeList);
}
buf = getRxBufferFromFreeList(&rx_buffer_pool);
freeRxBuffer(&rx_buffer_pool, buf);
}
/*
* Update the history of interconnect instance id.
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
updateCursorIcEntry(&rx_control_info.cursorHistoryTable, transportStates->sliceTable->ic_instance_id, 0);
}
else if (Gp_role == GP_ROLE_EXECUTE)
{
rx_control_info.lastTornIcId = transportStates->sliceTable->ic_instance_id;
}
elog((gp_interconnect_log_stats ? LOG: DEBUG1), "Interconnect State: "
"isSender %d isReceiver %d "
"snd_queue_depth %d recv_queue_depth %d Gp_max_packet_size %d "
"UNACK_QUEUE_RING_SLOTS_NUM %d TIMER_SPAN %d DEFAULT_RTT %d "
"forceEOS %d, gp_interconnect_id %d ic_id_last_teardown %d "
"snd_buffer_pool.count %d snd_buffer_pool.maxCount %d snd_sock_bufsize %d recv_sock_bufsize %d "
"snd_pkt_count %d retransmits %d crc_errors %d"
" recv_pkt_count %d recv_ack_num %d"
" recv_queue_size_avg %f"
" capacity_avg %f"
" freebuf_avg %f "
"mismatch_pkt_num %d disordered_pkt_num %d duplicated_pkt_num %d"
" rtt/dev [" UINT64_FORMAT "/" UINT64_FORMAT ", %f/%f, " UINT64_FORMAT "/" UINT64_FORMAT "] "
" cwnd %f status_query_msg_num %d",
ic_control_info.isSender, isReceiver,
Gp_interconnect_snd_queue_depth, Gp_interconnect_queue_depth, Gp_max_packet_size,
UNACK_QUEUE_RING_SLOTS_NUM, TIMER_SPAN, DEFAULT_RTT,
forceEOS, transportStates->sliceTable->ic_instance_id, rx_control_info.lastTornIcId,
snd_buffer_pool.count, snd_buffer_pool.maxCount, ic_control_info.socketSendBufferSize, ic_control_info.socketRecvBufferSize,
ic_statistics.sndPktNum, ic_statistics.retransmits, ic_statistics.crcErrors,
ic_statistics.recvPktNum, ic_statistics.recvAckNum,
(double)((double)ic_statistics.totalRecvQueueSize)/((double)ic_statistics.recvQueueSizeCountingTime),
(double)((double)ic_statistics.totalCapacity)/((double)ic_statistics.capacityCountingTime),
(double)((double)ic_statistics.totalBuffers)/((double)ic_statistics.bufferCountingTime),
ic_statistics.mismatchNum, ic_statistics.disorderedPktNum, ic_statistics.duplicatedPktNum,
(minRtt == ~((uint64)0) ? 0 : minRtt), (minDev == ~((uint64)0) ? 0 : minDev), avgRtt, avgDev, maxRtt, maxDev,
snd_control_info.cwnd, ic_statistics.statusQueryMsgNum);
ic_control_info.isSender = false;
memset(&ic_statistics, 0, sizeof(ICStatistics));
pthread_mutex_unlock(&ic_control_info.lock);
/* reset the rx thread network error flag */
resetRxThreadError();
transportStates->activated = false;
transportStates->sliceTable = NULL;
if (transportStates != NULL)
{
if (transportStates->states != NULL)
{
pfree(transportStates->states);
transportStates->states = NULL;
}
pfree(transportStates);
}
if (gp_log_interconnect >= GPVARS_VERBOSITY_TERSE)
elog(DEBUG1, "TeardownUDPInterconnect successful");
RESUME_INTERRUPTS();
}
/*
* TeardownUDPInterconnect
* Tear down UDP interconnect.
*
* This function is called to release the resources used by interconnect.
*/
void
TeardownUDPInterconnect(ChunkTransportState *transportStates,
						MotionLayerState *mlStates,
						bool forceEOS)
{
	PG_TRY();
	{
		TeardownUDPInterconnect_Internal(transportStates, mlStates, forceEOS);

		/*
		 * Sanity check, effective in assert-enabled builds only: by the time
		 * the internal routine returns, neither mutex may still be held by
		 * this thread.  pthread_mutex_unlock() returns non-zero when the
		 * mutex is not locked by the caller, so a zero return would indicate
		 * a leaked lock.  NOTE(review): the unlock call is compiled out
		 * entirely when Asserts are disabled; that is the intent, since the
		 * mutexes are expected to be unlocked on the success path.
		 */
		Assert(pthread_mutex_unlock(&ic_control_info.errorLock) != 0);
		Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
	}
	PG_CATCH();
	{
		/*
		 * Error path: the internal routine may have errored out while
		 * holding either mutex; release both before re-throwing so the rx
		 * thread cannot deadlock against us.
		 */
		pthread_mutex_unlock(&ic_control_info.errorLock);
		pthread_mutex_unlock(&ic_control_info.lock);
		PG_RE_THROW();
	}
	PG_END_TRY();
}
/*
* prepareRxConnForRead
* Prepare the receive connection for reading.
*
* MUST BE CALLED WITH ic_control_info.lock LOCKED.
*/
static void
prepareRxConnForRead(MotionConn *conn)
{
	/* Trace the queue state before wiring up the head packet. */
	elog(DEBUG3, "In prepareRxConnForRead: conn %p, q_head %d q_tail %d q_size %d",
		 conn, conn->pkt_q_head, conn->pkt_q_tail, conn->pkt_q_size);

	Assert(conn->pkt_q[conn->pkt_q_head] != NULL);

	/*
	 * Point the connection's read cursor at the packet sitting at the head
	 * of the receive queue, and take the message length from its header.
	 */
	conn->pBuff = conn->pkt_q[conn->pkt_q_head];
	conn->msgPos = conn->pBuff;
	conn->recvBytes = conn->msgSize = ((icpkthdr *) conn->pBuff)->len;
}
/*
* receiveChunksUDP
* Receive chunks from the senders
*
* MUST BE CALLED WITH ic_control_info.lock LOCKED.
*/
static TupleChunkListItem
receiveChunksUDP(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
				 int16 motNodeID, int16 *srcRoute, MotionConn *conn, bool inTeardown)
{
	int			retries = 0;
	bool		directed = false;
	MotionConn *rxconn = NULL;
	TupleChunkListItem tcItem = NULL;

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG5, "receivechunksUDP: motnodeid %d", motNodeID);
#endif

	Assert(pTransportStates);
	Assert(pTransportStates->sliceTable);

	/*
	 * A non-NULL conn means a "directed" receive: wait for data on that one
	 * route only.  Otherwise wait for data arriving on any route of this
	 * motion node.
	 */
	if (conn != NULL)
	{
		directed = true;
		*srcRoute = conn->route;
		setMainThreadWaiting(&rx_control_info.mainWaitingState, motNodeID, conn->route,
							 pTransportStates->sliceTable->ic_instance_id);
	}
	else
	{
		/* non-directed receive */
		setMainThreadWaiting(&rx_control_info.mainWaitingState, motNodeID, ANY_ROUTE,
							 pTransportStates->sliceTable->ic_instance_id);
	}

	/* we didn't have any data, so we've got to read it from the network. */
	for (;;)
	{
		/* 1. Do we have data ready */
		if (rx_control_info.mainWaitingState.reachRoute != ANY_ROUTE)
		{
			/* rx thread recorded an arrival on some route; set up to read it. */
			rxconn = pEntry->conns + rx_control_info.mainWaitingState.reachRoute;

			prepareRxConnForRead(rxconn);

			elog(DEBUG2, "receiveChunksUDP: non-directed rx woke on route %d", rx_control_info.mainWaitingState.reachRoute);
			resetMainThreadWaiting(&rx_control_info.mainWaitingState);
		}

		aggregateStatistics(pEntry);

		if (rxconn != NULL)
		{
			/*
			 * Got a packet: drop ic_control_info.lock (held by our caller on
			 * entry) and hand the buffer to the chunk deserializer.  The
			 * lock release is this function's documented side effect.
			 */
			Assert(rxconn->pBuff);

			pthread_mutex_unlock(&ic_control_info.lock);

			elog(DEBUG2, "got data with length %d", rxconn->recvBytes);
			/* successfully read into this connection's buffer. */
			tcItem = RecvTupleChunk(rxconn, inTeardown);

			if (!directed)
				*srcRoute = rxconn->route;
			return tcItem;
		}

		retries++;

		/* 2. Wait for data to become ready */
		if (waitOnCondition(MAIN_THREAD_COND_TIMEOUT, &ic_control_info.cond, &ic_control_info.lock))
		{
			continue;			/* success ! */
		}

		/*
		 * Timed out: release the lock while polling for error/cancel
		 * conditions, then re-acquire before looping.
		 */
		pthread_mutex_unlock(&ic_control_info.lock);

		/* check the potential errors in rx thread. */
		checkRxThreadError();

		/* do not check interrupts when holding the lock */
		ML_CHECK_FOR_INTERRUPTS(inTeardown);

		/* check to see if the dispatcher should cancel */
		if (Gp_role == GP_ROLE_DISPATCH)
		{
			checkForCancelFromQD(pTransportStates);
		}

		/*
		 * NIC on master (and thus the QD connection) may become bad, check
		 * it.  Only every 64th timeout, to keep the check cheap.
		 */
		if ((retries & 0x3f) == 0)
			checkQDConnectionAlive();

		pthread_mutex_lock(&ic_control_info.lock);
	}							/* for (;;) */

	/* We either got data, or get cancelled. We never make it out to here. */
	return NULL;				/* make GCC behave */
}
/*
* RecvTupleChunkFromAnyUDP_Internal
* Receive tuple chunks from any route (connections)
*/
static inline TupleChunkListItem
RecvTupleChunkFromAnyUDP_Internal(MotionLayerState *mlStates,
								  ChunkTransportState *transportStates,
								  int16 motNodeID,
								  int16 *srcRoute)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn = NULL;
	int			i,
				index,
				activeCount = 0;
	TupleChunkListItem tcItem = NULL;
	bool		found = false;

	if (!transportStates)
	{
		elog(FATAL, "RecvTupleChunkFromAnyUDP: missing context");
	}
	else if (!transportStates->activated)
	{
		elog(FATAL, "RecvTupleChunkFromAnyUDP: interconnect context not active!");
	}

	getChunkTransportState(transportStates, motNodeID, &pEntry);

	/*
	 * Round-robin over the routes, starting just past the route we consumed
	 * last time (scanStart), so that no sender is starved.
	 */
	index = pEntry->scanStart;

	pthread_mutex_lock(&ic_control_info.lock);

	for (i = 0; i < pEntry->numConns; i++, index++)
	{
		/* wrap the scan index around the route array. */
		if (index >= pEntry->numConns)
			index = 0;

		conn = pEntry->conns + index;

		if (conn->stillActive)
			activeCount++;

		ic_statistics.totalRecvQueueSize += conn->pkt_q_size;
		ic_statistics.recvQueueSizeCountingTime++;

		/* take the first route that already has a packet queued. */
		if (conn->pkt_q_size > 0)
		{
			found = true;
			prepareRxConnForRead(conn);
			break;
		}
	}

	if (found)
	{
		pthread_mutex_unlock(&ic_control_info.lock);

		tcItem = RecvTupleChunk(conn, transportStates->teardownActive);
		*srcRoute = conn->route;
		/* resume the next scan after the route we just serviced. */
		pEntry->scanStart = index + 1;

		return tcItem;
	}

	/* no data pending in our queue */
#ifdef AMS_VERBOSE_LOGGING
	elog(LOG, "RecvTupleChunkFromAnyUDP(): activeCount is %d", activeCount);
#endif

	/* all routes have gone inactive: nothing more will ever arrive. */
	if (activeCount == 0)
	{
		pthread_mutex_unlock(&ic_control_info.lock);
		return NULL;
	}

	/* receiveChunksUDP() releases ic_control_info.lock as a side-effect */
	tcItem = receiveChunksUDP(transportStates, pEntry, motNodeID, srcRoute, NULL, transportStates->teardownActive);

	pEntry->scanStart = *srcRoute + 1;

	return tcItem;
}
/*
* RecvTupleChunkFromAnyUDP
* Receive tuple chunks from any route (connections)
*/
static TupleChunkListItem
RecvTupleChunkFromAnyUDP(MotionLayerState *mlStates,
						 ChunkTransportState *transportStates,
						 int16 motNodeID,
						 int16 *srcRoute)
{
	TupleChunkListItem icItem = NULL;

	PG_TRY();
	{
		icItem = RecvTupleChunkFromAnyUDP_Internal(mlStates, transportStates, motNodeID, srcRoute);

		/* error if mutex still held (debug build only) */
		Assert(pthread_mutex_unlock(&ic_control_info.errorLock) != 0);
		Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
	}
	PG_CATCH();
	{
		/*
		 * The internal routine may have errored out while holding either
		 * mutex; release both before re-throwing.
		 */
		pthread_mutex_unlock(&ic_control_info.errorLock);
		pthread_mutex_unlock(&ic_control_info.lock);
		PG_RE_THROW();
	}
	PG_END_TRY();

	return icItem;
}
/*
* RecvTupleChunkFromUDP_Internal
* Receive tuple chunks from a specific route (connection)
*/
static inline TupleChunkListItem
RecvTupleChunkFromUDP_Internal(ChunkTransportState *transportStates,
							   int16 motNodeID,
							   int16 srcRoute)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn = NULL;
	int16		route;			/* dummy out-param for the directed receive below */

	if (!transportStates)
	{
		elog(FATAL, "RecvTupleChunkFromUDP: missing context");
	}
	else if (!transportStates->activated)
	{
		elog(FATAL, "RecvTupleChunkFromUDP: interconnect context not active!");
	}

#ifdef AMS_VERBOSE_LOGGING
	elog(LOG, "RecvTupleChunkFromUDP().");
#endif

	/* check em' */
	ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG5, "RecvTupleChunkFromUDP(motNodID=%d, srcRoute=%d)", motNodeID, srcRoute);
#endif

	getChunkTransportState(transportStates, motNodeID, &pEntry);
	conn = pEntry->conns + srcRoute;

#ifdef AMS_VERBOSE_LOGGING
	if (!conn->stillActive)
	{
		elog(LOG, "RecvTupleChunkFromUDP(): connection inactive ?!");
	}
#endif

	pthread_mutex_lock(&ic_control_info.lock);

	/* an inactive connection will never deliver more data. */
	if (!conn->stillActive)
	{
		pthread_mutex_unlock(&ic_control_info.lock);
		return NULL;
	}

	ic_statistics.totalRecvQueueSize += conn->pkt_q_size;
	ic_statistics.recvQueueSizeCountingTime++;

	/* a packet already queued at the head can be consumed without waiting. */
	if (conn->pkt_q[conn->pkt_q_head] != NULL)
	{
		prepareRxConnForRead(conn);
		pthread_mutex_unlock(&ic_control_info.lock);

		TupleChunkListItem tcItem = NULL;

		tcItem = RecvTupleChunk(conn, transportStates->teardownActive);
		return tcItem;
	}

	/* no existing data, we've got to read a packet */
	/* receiveChunksUDP() releases ic_control_info.lock as a side-effect */
	TupleChunkListItem chunks = receiveChunksUDP(transportStates, pEntry, motNodeID, &route, conn, transportStates->teardownActive);

	return chunks;
}
/*
* RecvTupleChunkFromUDP
* Receive tuple chunks from a specific route (connection)
*/
static TupleChunkListItem
RecvTupleChunkFromUDP(ChunkTransportState *transportStates,
					  int16 motNodeID,
					  int16 srcRoute)
{
	TupleChunkListItem icItem = NULL;

	PG_TRY();
	{
		icItem = RecvTupleChunkFromUDP_Internal(transportStates, motNodeID, srcRoute);

		/* error if mutex still held (debug build only) */
		Assert(pthread_mutex_unlock(&ic_control_info.errorLock) != 0);
		Assert(pthread_mutex_unlock(&ic_control_info.lock) != 0);
	}
	PG_CATCH();
	{
		/*
		 * The internal routine may have errored out while holding either
		 * mutex; release both before re-throwing.
		 */
		pthread_mutex_unlock(&ic_control_info.errorLock);
		pthread_mutex_unlock(&ic_control_info.lock);
		PG_RE_THROW();
	}
	PG_END_TRY();

	return icItem;
}
/*
* markUDPConnInactive
* Mark the connection inactive.
*/
void
markUDPConnInactive(MotionConn *conn)
{
	/*
	 * Flip the flag under ic_control_info.lock so the state change is never
	 * observed half-done by code running under the same lock.
	 */
	pthread_mutex_lock(&ic_control_info.lock);
	conn->stillActive = false;
	pthread_mutex_unlock(&ic_control_info.lock);
}
/*
* aggregateStatistics
* aggregate statistics.
*/
static void
aggregateStatistics(ChunkTransportStateEntry *pEntry)
{
	int			i;

	/*
	 * Reset the per-entry aggregates, then fold the statistics of each
	 * connection back in, one connection at a time.
	 */
	pEntry->stat_total_ack_time = 0;
	pEntry->stat_count_acks = 0;
	pEntry->stat_max_ack_time = 0;
	pEntry->stat_min_ack_time = ~((uint64) 0);
	pEntry->stat_count_resent = 0;
	pEntry->stat_max_resent = 0;
	pEntry->stat_count_dropped = 0;

	for (i = 0; i < pEntry->numConns; i++)
	{
		MotionConn *mConn = pEntry->conns + i;

		pEntry->stat_total_ack_time += mConn->stat_total_ack_time;
		pEntry->stat_count_acks += mConn->stat_count_acks;
		pEntry->stat_max_ack_time = Max(pEntry->stat_max_ack_time, mConn->stat_max_ack_time);
		pEntry->stat_min_ack_time = Min(pEntry->stat_min_ack_time, mConn->stat_min_ack_time);
		pEntry->stat_count_resent += mConn->stat_count_resent;
		pEntry->stat_max_resent = Max(pEntry->stat_max_resent, mConn->stat_max_resent);
		pEntry->stat_count_dropped += mConn->stat_count_dropped;
	}
}
/*
* logPkt
* Log a packet.
*
*/
static inline void
logPkt(char *prefix, icpkthdr *pkt)
{
	/*
	 * Dump every header field of a packet to the log, tagged with the
	 * caller-supplied prefix and classified as ACK or DATA by the
	 * direction flag.  NOTE(review): the "dstDesContentId" and
	 * "dstListernerPort" labels in the output are misspelled; kept as-is
	 * because log-scraping tools may match on the exact strings.
	 */
	write_log("%s [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
			  "srcContentId %d dstDesContentId %d "
			  "srcPid %d dstPid %d "
			  "srcListenerPort %d dstListernerPort %d "
			  "sendSliceIndex %d recvSliceIndex %d "
			  "sessionId %d icId %d "
			  "flags %d ",
			  prefix, pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
			  pkt->seq, pkt->extraSeq, pkt->motNodeId, pkt->crc, pkt->len,
			  pkt->srcContentId, pkt->dstContentId,
			  pkt->srcPid, pkt->dstPid,
			  pkt->srcListenerPort, pkt->dstListenerPort,
			  pkt->sendSliceIndex, pkt->recvSliceIndex,
			  pkt->sessionId, pkt->icId,
			  pkt->flags);
}
/*
* handleAckedPacket
* Called by sender to process acked packet.
*
* Remove it from unack queue and unack queue ring, change the rtt ...
*/
/*
 * Process one acknowledged packet: remove it from the unack queue (and, for
 * loss-based flow control, from the unack queue ring), update the RTT/DEV
 * estimates and the congestion window, then return the buffer to the send
 * buffer pool free list.
 */
static void inline
handleAckedPacket(MotionConn *ackConn, ICBuffer *buf, uint64 now)
{
	uint64		ackTime = 0;

	/* remember whether this was the oldest unacknowledged packet. */
	bool		bufIsHead = (&buf->primary == icBufferListFirst(&ackConn->unackQueue));

	buf = icBufferListDelete(&ackConn->unackQueue, buf);

	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
	{
		buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf);
		unack_queue_ring.numOutStanding--;
		if (icBufferListLength(&ackConn->unackQueue) >= 1)
			unack_queue_ring.numSharedOutStanding--;

		ackTime = now - buf->sentTime;

		/*
		 * In udp_testmode, we do not change rtt dynamically due to the
		 * large number of packet losses introduced by fault injection code.
		 * This can decrease the testing time.
		 */
#ifdef USE_ASSERT_CHECKING
		if (!udp_testmode)
#endif
		{
			uint64		newRTT = 0;
			uint64		newDEV = 0;
			uint64		rttDiff = 0;

			/* only a first-try ack gives an unambiguous RTT sample. */
			if (buf->nRetry == 0)
			{
				/* newRTT = buf->conn->rtt * (1 - RTT_COEFFICIENT) + ackTime * RTT_COEFFICIENT; */
				newRTT = buf->conn->rtt - (buf->conn->rtt >> RTT_SHIFT_COEFFICIENT) + (ackTime >> RTT_SHIFT_COEFFICIENT);
				newRTT = Min(MAX_RTT, Max(newRTT, MIN_RTT));
				buf->conn->rtt = newRTT;

				/*
				 * newDEV = buf->conn->dev * (1 - DEV_COEFFICIENT) +
				 *          DEV_COEFFICIENT * |ackTime - newRTT|
				 *
				 * Compute |ackTime - newRTT| with an explicit unsigned
				 * comparison.  The previous code passed the uint64
				 * difference to abs(), which takes an int: when ackTime <
				 * newRTT the unsigned subtraction wrapped to a huge value
				 * that was then truncated, producing a garbage deviation.
				 */
				rttDiff = (ackTime > newRTT) ? (ackTime - newRTT) : (newRTT - ackTime);
				newDEV = buf->conn->dev - (buf->conn->dev >> DEV_SHIFT_COEFFICIENT) + (rttDiff >> DEV_SHIFT_COEFFICIENT);
				newDEV = Min(MAX_DEV, Max(newDEV, MIN_DEV));
				buf->conn->dev = newDEV;

				/*
				 * Adjust the congestion control window: slow start below
				 * ssthresh, additive increase above it.
				 */
				if (snd_control_info.cwnd < snd_control_info.ssthresh)
					snd_control_info.cwnd += 1;
				else
					snd_control_info.cwnd += 1 / snd_control_info.cwnd;
				snd_control_info.cwnd = Min(snd_control_info.cwnd, snd_buffer_pool.maxCount);
			}
		}
	}

	buf->conn->stat_total_ack_time += ackTime;
	buf->conn->stat_max_ack_time = Max(ackTime, buf->conn->stat_max_ack_time);
	buf->conn->stat_min_ack_time = Min(ackTime, buf->conn->stat_min_ack_time);

	/*
	 * only change receivedAckSeq when it is the smallest pkt we sent and
	 * have not received ack for it.
	 */
	if (bufIsHead)
		ackConn->receivedAckSeq = buf->pkt->seq;

	/* The first packet acts like a connect setup packet */
	if (buf->pkt->seq == 1)
		ackConn->state = mcsStarted;

	/* the buffer is done with; hand it back to the send pool. */
	icBufferListAppend(&snd_buffer_pool.freeList, buf);

#ifdef AMS_VERBOSE_LOGGING
	write_log("REMOVEPKT %d from unack queue for route %d (retry %d) sndbufmaxcount %d sndbufcount %d sndbuffreelistlen %d, sntSeq %d consumedSeq %d recvAckSeq %d capacity %d, sndQ %d, unackQ %d", buf->pkt->seq, ackConn->route, buf->nRetry, snd_buffer_pool.maxCount, snd_buffer_pool.count, icBufferListLength(&snd_buffer_pool.freeList), buf->conn->sentSeq, buf->conn->consumedSeq, buf->conn->receivedAckSeq, buf->conn->capacity, icBufferListLength(&buf->conn->sndQueue), icBufferListLength(&buf->conn->unackQueue));
	if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
	{
		icBufferListLog(&buf->conn->unackQueue);
		icBufferListLog(&buf->conn->sndQueue);
	}
#endif
}
/*
* handleAcks
* handle acks incoming from our upstream peers.
*
* if we receive a stop message, return true (caller will clean up).
*/
/*
 * Drain and process every ack currently readable on this entry's send
 * socket.  Returns true when at least one peer requested a stop (the caller
 * is then responsible for the stop handling); returns false otherwise.
 *
 * Returns only when the socket has nothing more to read (EWOULDBLOCK).
 */
static bool
handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry)
{
	bool		ret = false;
	MotionConn *ackConn = NULL;
	int			n;
	struct sockaddr_storage peer;
	socklen_t	peerlen;
	struct icpkthdr *pkt = snd_control_info.ackBuffer;
	bool		shouldSendBuffers = false;

	for (;;)
	{
		/* ready to read on our socket ? */
		peerlen = sizeof(peer);
		n = recvfrom(pEntry->txfd, (char *) pkt, MIN_PACKET_SIZE, 0,
					 (struct sockaddr *) &peer, &peerlen);

		if (n < 0)
		{
			if (errno == EWOULDBLOCK)	/* had nothing to read. */
			{
				aggregateStatistics(pEntry);
				return ret;
			}

			ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
			if (errno == EINTR)
				continue;

			ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
							errmsg("Interconnect error waiting for peer ack"),
							errdetail("During recvfrom() call.\n")));
			/* not reached */
		}
		else if (n < sizeof(struct icpkthdr))
		{
			/* runt datagram: cannot even hold a header; drop it. */
			continue;
		}
		else if (n != pkt->len)
		{
			/* truncated or padded datagram: drop it. */
			continue;
		}

		/*
		 * check the CRC of the payload.
		 */
		if (gp_interconnect_full_crc)
		{
			if (!checkCRC(pkt))
			{
				gp_atomic_add_32(&ic_statistics.crcErrors, 1);
				if (DEBUG2 >= log_min_messages)
					write_log("received network data error, dropping bad packet, user data unaffected.");
				continue;
			}
		}

#ifdef AMS_VERBOSE_LOGGING
		/*
		 * Fixed: this called the nonexistent log_pkt(); the helper defined
		 * in this file (and used elsewhere) is logPkt().
		 */
		logPkt("GOT ACK", pkt);
#endif

		/* read packet, is this the ack we want ?
		 *
		 * Here, using gp_interconnect_id is safe, since
		 * only senders get acks. QD (never be a sender) does not. QD may
		 * have several concurrent running interconnect
		 * instances.
		 */
		if (pkt->srcContentId == GetQEIndex() &&
			pkt->srcPid == MyProcPid &&
			pkt->srcListenerPort == ((Gp_listener_port >> 16) & 0x0ffff) &&
			pkt->sessionId == gp_session_id &&
			pkt->icId == gp_interconnect_id)
		{
			/*
			 * packet is for me. Note here we do not need to get a connection
			 * lock here, since background rx thread only read the hash
			 * table.
			 */
			ackConn = findConnByHeader(&ic_control_info.connHtab, pkt);

			if (ackConn == NULL)
			{
				elog(LOG, "Received ack for unknown connection (flags 0x%x)", pkt->flags);
				continue;
			}

			ackConn->stat_count_acks++;
			ic_statistics.recvAckNum++;

			uint64		now = getCurrentTime();

			/* any ack proves the peer is alive; reset deadlock detection. */
			ackConn->deadlockCheckBeginTime = now;

			/*
			 * We simply disregard pkt losses (NAK) due to process start race
			 * (that is, sender is started earlier than receiver. rx
			 * background thread may receive packets when connections are not
			 * created yet).
			 *
			 * Another option is to resend the packet immediately, but
			 * experiments do not show any benefits.
			 */
			if (pkt->flags & UDPIC_FLAGS_NAK)
				continue;

			/*
			 * Classify the ack by its flags.  The loop runs at most once to
			 * completion; it exists so the flag cases can break out early.
			 */
			while (true)
			{
				if (pkt->flags & UDPIC_FLAGS_CAPACITY)
				{
					/* receiver consumed packets: reclaim send capacity. */
					if (pkt->extraSeq > ackConn->consumedSeq)
					{
						ackConn->capacity += pkt->extraSeq - ackConn->consumedSeq;
						ackConn->consumedSeq = pkt->extraSeq;
						shouldSendBuffers = true;
					}
				}
				else if (pkt->flags & UDPIC_FLAGS_DUPLICATE)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("GOTDUPACK [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);

					shouldSendBuffers |= (handleAckForDuplicatePkt(ackConn, pkt));
					break;
				}
				else if (pkt->flags & UDPIC_FLAGS_DISORDER)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("GOTDISORDER [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);

					shouldSendBuffers |= (handleAckForDisorderPkt(transportStates, pEntry, ackConn, pkt));
					break;
				}

				/* stale ack for something already acknowledged: ignore. */
				if (pkt->seq <= ackConn->receivedAckSeq)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("ack with bad seq?! expected (%d, %d] got %d flags 0x%x, capacity %d consumedSeq %d", ackConn->receivedAckSeq, ackConn->sentSeq, pkt->seq, pkt->flags, ackConn->capacity, ackConn->consumedSeq);
					break;
				}

				/* haven't gotten a stop request, maybe this is one ? */
				if ((pkt->flags & UDPIC_FLAGS_STOP) && !ackConn->stopRequested && ackConn->stillActive)
				{
#ifdef AMS_VERBOSE_LOGGING
					elog(LOG, "got ack with stop; srcpid %d dstpid %d cmd %d flags 0x%x pktseq %d connseq %d", pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, pkt->seq, ackConn->conn_info.seq);
#endif
					ackConn->stopRequested = true;
					ackConn->conn_info.flags |= UDPIC_FLAGS_STOP;
					ret = true;
					/* continue to deal with acks */
				}

				/* deal with a regular ack. */
				if (pkt->flags & UDPIC_FLAGS_ACK)
				{
					ICBufferLink *link = NULL;
					ICBufferLink *next = NULL;
					ICBuffer   *buf = NULL;

#ifdef AMS_VERBOSE_LOGGING
					write_log("GOTACK [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq);
#endif
					/*
					 * The ack is cumulative: retire every unacked buffer
					 * whose sequence number is <= the acked one.
					 */
					link = icBufferListFirst(&ackConn->unackQueue);
					buf = GET_ICBUFFER_FROM_PRIMARY(link);

					while (!icBufferListIsHead(&ackConn->unackQueue, link) && buf->pkt->seq <= pkt->seq)
					{
						next = link->next;
						handleAckedPacket(ackConn, buf, now);
						shouldSendBuffers = true;
						link = next;
						buf = GET_ICBUFFER_FROM_PRIMARY(link);
					}
				}
				break;
			}

			/*
			 * When there is a capacity increase or some outstanding buffers
			 * removed from the unack queue ring, we should try to send
			 * buffers for the connection.  Even when stop is received, we
			 * still send here, since in STOP/EOS race case, we may have been
			 * in EOS sending logic and will not check stop message.
			 */
			if (shouldSendBuffers)
				sendBuffers(transportStates, pEntry, ackConn);
		}
		else if (DEBUG1 >= log_min_messages)
			write_log("handleAck: not the ack we're looking for (flags 0x%x)...mot(%d) content(%d:%d) srcpid(%d:%d) dstpid(%d) srcport(%d:%d) dstport(%d) sess(%d:%d) cmd(%d:%d)",
					  pkt->flags, pkt->motNodeId,
					  pkt->srcContentId, GetQEIndex(),
					  pkt->srcPid, MyProcPid,
					  pkt->dstPid,
					  pkt->srcListenerPort, ((Gp_listener_port >> 16) & 0x0ffff),
					  pkt->dstListenerPort,
					  pkt->sessionId, gp_session_id,
					  pkt->icId, gp_interconnect_id);
	}
}
/*
* addCRC
* add CRC field to the packet.
*/
static inline void
addCRC(icpkthdr *pkt)
{
	pg_crc32	crc;

	/*
	 * Compute CRC32C over the whole packet and store it in the header.
	 * The caller is expected to have zeroed pkt->crc beforehand (see
	 * prepareXmit), so the checksum is computed with the field at 0 —
	 * matching what checkCRC() recomputes on the receive side.
	 */
	INIT_CRC32C(crc);
	COMP_CRC32C(crc, pkt, pkt->len);
	FIN_CRC32C(crc);

	pkt->crc = crc;
}
/*
* checkCRC
* check the validity of the packet.
*/
static inline bool
checkCRC(icpkthdr *pkt)
{
	pg_crc32	wireCrc;
	pg_crc32	computedCrc;

	/*
	 * Pull the sender's checksum out of the header and zero the field,
	 * because the sender computed its CRC with the field set to 0.
	 */
	wireCrc = pkt->crc;
	pkt->crc = 0;

	INIT_CRC32C(computedCrc);
	COMP_CRC32C(computedCrc, pkt, pkt->len);
	FIN_CRC32C(computedCrc);

	/* true iff the packet arrived uncorrupted. */
	return wireCrc == computedCrc;
}
/*
* prepareXmit
* Prepare connection for transmit.
*/
static inline void
prepareXmit(MotionConn *conn)
{
	Assert(conn != NULL);

	conn->conn_info.len = conn->msgSize;
	conn->conn_info.crc = 0;

	/* Stamp the connection header onto the front of the outgoing buffer. */
	memcpy(conn->pBuff, &conn->conn_info, sizeof(conn->conn_info));

	/* Advance the sequence number for the next packet on this connection. */
	conn->conn_info.seq++;

	/* Checksum covers the header just written plus the payload. */
	if (gp_interconnect_full_crc)
		addCRC((icpkthdr *) conn->pBuff);
}
/*
* sendOnce
* Send a packet.
*/
static void
sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, ICBuffer *buf, MotionConn * conn)
{
	int32		n;

#ifdef USE_ASSERT_CHECKING
	/* fault injection: silently drop the packet some percentage of the time. */
	if (testmode_inject_fault(gp_udpic_dropxmit_percent))
	{
#ifdef AMS_VERBOSE_LOGGING
		write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid);
#endif
		return;
	}
#endif

	/* Retry the transmit for as long as we keep getting EINTR. */
	for (;;)
	{
		n = sendto(pEntry->txfd, buf->pkt, buf->pkt->len, 0,
				   (struct sockaddr *) &conn->peer, conn->peer_len);
		if (n >= 0)
			break;

		if (errno == EINTR)
			continue;

		if (errno == EAGAIN)	/* no space ? not an error. */
			return;

		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect error writing an outgoing packet: %m"),
						errdetail("error during sendto() call (error:%d).\n"
								  "For Remote Connection: contentId=%d at %s",
								  errno, conn->remoteContentId,
								  conn->remoteHostAndPort)));
		/* not reached */
	}

	/* short transmit is not fatal; log and let the retransmit logic recover. */
	if (n != buf->pkt->len)
	{
		if (DEBUG1 >= log_min_messages)
			write_log("Interconnect error writing an outgoing packet [seq %d]: short transmit (given %d sent %d) during sendto() call."
					  "For Remote Connection: contentId=%d at %s", buf->pkt->seq, buf->pkt->len, n,
					  conn->remoteContentId,
					  conn->remoteHostAndPort);
#ifdef AMS_VERBOSE_LOGGING
		logPkt("PKT DETAILS ", buf->pkt);
#endif
	}

	return;
}
/*
* handleStopMsgs
* handle stop messages.
*
*/
static void
handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, int16 motionId)
{
	int			i = 0;

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG3, "handleStopMsgs: node %d", motionId);
#endif

	while (i < pEntry->numConns)
	{
		MotionConn *conn = NULL;

		conn = pEntry->conns + i;

#ifdef AMS_VERBOSE_LOGGING
		elog(DEBUG3, "handleStopMsgs: node %d route %d %s %s", motionId, conn->route,
			 (conn->stillActive ? "active" : "NOT active"), (conn->stopRequested ? "stop requested" : ""));
		elog(DEBUG3, "handleStopMsgs: node %d route %d msgSize %d", motionId, conn->route, conn->msgSize);
#endif

		/*
		 * MPP-2427: we're guaranteed to have recently flushed, but this
		 * might not be empty (if we got a stop on a buffer that wasn't the
		 * one we were sending) ... empty it first so the outbound buffer is
		 * empty when we get here.
		 */
		if (conn->stillActive && conn->stopRequested)
		{
			/* mark buffer empty */
			conn->tupleCount = 0;
			conn->msgSize = sizeof(conn->conn_info);

			/* now send our stop-ack EOS */
			conn->conn_info.flags |= UDPIC_FLAGS_EOS;

			Assert(conn->curBuff != NULL);

			/* single 'S' payload byte marks the stop-ack EOS packet. */
			conn->pBuff[conn->msgSize] = 'S';
			conn->msgSize += 1;
			prepareXmit(conn);

			/* now ready to actually send */
			if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
				elog(DEBUG1, "handleStopMsgs: node %d route %d, seq %d", motionId, i, conn->conn_info.seq);

			/* place it into the send queue */
			icBufferListAppend(&conn->sndQueue, conn->curBuff);

			/* return all buffers */
			icBufferListReturn(&conn->sndQueue, false);
			icBufferListReturn(&conn->unackQueue, Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY ? false : true);

			/* this connection is finished: reset its outbound state. */
			conn->tupleCount = 0;
			conn->msgSize = sizeof(conn->conn_info);

			conn->state = mcsEosSent;
			conn->curBuff = NULL;
			conn->pBuff = NULL;
			conn->stillActive = false;
			conn->stopRequested = false;
		}

		i++;

		/*
		 * After the last connection, drain any pending acks; if new stop
		 * requests are discovered there, rescan from the first route.
		 */
		if (i == pEntry->numConns)
		{
			if (pollAcks(transportStates, pEntry->txfd, 0))
			{
				if (handleAcks(transportStates, pEntry))
				{
					/* more stops found, loop again. */
					i = 0;
					continue;
				}
			}
		}
	}
}
/*
* sendBuffers
* Called by sender to send the buffers in the send queue.
*
* Send the buffers in the send queue of the connection if there is capacity left
* and the congestion control condition is satisfied.
*
* Here, we make assure that a connection can have at least one outstanding buffer.
* This is very important for two reasons:
*
* 1) The handling logic of the ack of the outstanding buffer can always send a buffer
* in the send queue. Otherwise, there maybe a deadlock.
* 2) This makes assure that any connection can have a minimum bandwidth for data
* sending.
*
* After sending a buffer, the buffer will be placed into both the unack queue and
* the corresponding queue in the unack queue ring.
*/
static void
sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
	/* keep sending while the receiver has capacity and data is queued. */
	while (conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0)
	{
		ICBuffer   *buf = NULL;

		/*
		 * Loss-based flow control: once a connection already has an
		 * outstanding buffer, additional ("shared") outstanding buffers are
		 * limited by the congestion window.  The >0 check guarantees every
		 * connection can always keep at least one packet in flight.
		 */
		if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS && (icBufferListLength(&conn->unackQueue) > 0
																		 && unack_queue_ring.numSharedOutStanding >= (snd_control_info.cwnd - snd_control_info.minCwnd)))
			break;

		/* for connection setup, we only allow one outstanding packet. */
		if (conn->state == mcsSetupOutgoingConnection && icBufferListLength(&conn->unackQueue) >= 1)
			break;

		buf = icBufferListPop(&conn->sndQueue);

		uint64		now = getCurrentTime();

		/* reset per-transmission bookkeeping on the buffer. */
		buf->sentTime = now;
		buf->unackQueueRingSlot = -1;
		buf->nRetry = 0;
		buf->conn = conn;
		conn->capacity--;

		/* queue for retransmission tracking BEFORE transmitting (see below). */
		icBufferListAppend(&conn->unackQueue, buf);

		if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
		{
			unack_queue_ring.numOutStanding++;
			if (icBufferListLength(&conn->unackQueue) > 1)
				unack_queue_ring.numSharedOutStanding++;

			putIntoUnackQueueRing(&unack_queue_ring, buf, computeExpirationPeriod(buf->conn, buf->nRetry), now);
		}

		/*
		 * Note the place of sendOnce here.  If we send before appending it
		 * to the unack queue and putting it into unack queue ring, and there
		 * is a network error occurred in the sendOnce function, error
		 * message will be output.  In the time of error message output,
		 * interrupts is potentially checked, if there is a pending query
		 * cancel, it will lead to a dangled buffer (memory leak).
		 */
#ifdef TRANSFER_PROTOCOL_STATS
		updateStats(TPE_DATA_PKT_SEND, conn, buf->pkt);
#endif

		sendOnce(transportStates, pEntry, buf, conn);
		ic_statistics.sndPktNum++;

#ifdef AMS_VERBOSE_LOGGING
		logPkt("SEND PKT DETAIL", buf->pkt);
#endif

		buf->conn->sentSeq = buf->pkt->seq;
	}
}
/*
* handleDisorderPacket
* Called by rx thread to assemble and send a disorder message.
*
* In current implementation, we limit the number of lost packet sequence numbers
* in the disorder message by the MIN_PACKET_SIZE. There are two reasons here:
*
* 1) The maximal number of lost packet sequence numbers are actually bounded by the
* receive queue depth whose maximal value is very large. Since we share the packet
* receive and ack receive in the background thread, the size of disorder should be
* also limited by the max packet size.
* 2) We can use Gp_max_packet_size here to limit the number of lost packet sequence numbers.
* But considering we do not want to let senders send many packets when getting a lost
* message. Here we use MIN_PACKET_SIZE.
*
*
* the format of a disorder message:
* I) pkt header
* - seq -> packet sequence number that triggers the disorder message
* - extraSeq -> the largest seq of the received packets
* - flags -> UDPIC_FLAGS_DISORDER
* - len -> sizeof(icpkthdr) + sizeof(uint32) * (lost pkt count)
* II) content
* - an array of lost pkt sequence numbers (uint32)
*
*/
/*
 * Walk the receive ring from the queue tail up to 'pos' (the slot of the
 * out-of-order packet that triggered us), recording the sequence number of
 * every empty slot (a lost packet) into the shared disorder buffer, bounded
 * by MAX_SEQS_IN_DISORDER_ACK, then send the disorder ack.
 *
 * conn    - the receiving connection
 * pos     - queue position of the packet that triggered the disorder message
 * tailSeq - sequence number corresponding to the current queue tail slot
 * pkt     - the out-of-order packet just received
 */
static void
handleDisorderPacket(MotionConn *conn, int pos, uint32 tailSeq, icpkthdr *pkt)
{
	int			start = 0;
	uint32		lostPktCnt = 0;
	/* lost seqs are written just past the header of the shared disorder buffer */
	uint32	   *curSeq = (uint32 *) &rx_control_info.disorderBuffer[1];
	uint32		maxSeqs = MAX_SEQS_IN_DISORDER_ACK;

#ifdef AMS_VERBOSE_LOGGING
	write_log("PROCESS_DISORDER PKT BEGIN:");
#endif

	start = conn->pkt_q_tail;
	while (start != pos && lostPktCnt < maxSeqs)
	{
		if (conn->pkt_q[start] == NULL)
		{
			*curSeq = tailSeq;
			lostPktCnt++;
#ifdef AMS_VERBOSE_LOGGING
			/*
			 * Log the seq just recorded BEFORE advancing curSeq; the original
			 * code incremented first and logged the next (uninitialized) slot.
			 */
			write_log("PROCESS_DISORDER add seq [%d], lostPktCnt %d", *curSeq, lostPktCnt);
#endif
			curSeq++;
		}
		tailSeq++;
		start = (start + 1) % Gp_interconnect_queue_depth;
	}

#ifdef AMS_VERBOSE_LOGGING
	write_log("PROCESS_DISORDER PKT END:");
#endif

	/* when reaching here, cnt must not be 0 */
	sendDisorderAck(conn, pkt->seq, conn->conn_info.seq - 1, lostPktCnt);
}
/*
* handleAckForDisorderPkt
* Called by sender to deal with acks for disorder packet.
*/
static bool
handleAckForDisorderPkt(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, icpkthdr *pkt)
{
/* cursor into the unack queue */
ICBufferLink *link = NULL;
ICBuffer *buf = NULL;
ICBufferLink *next = NULL;
uint64 now = getCurrentTime();
/* cursor into the array of lost packet sequence numbers in the message body */
uint32 *curLostPktSeq = 0;
int lostPktCnt = 0;
/*
 * Dampening state: we act only on the second consecutive disorder message
 * carrying the same extraSeq, so a single transient reordering does not
 * trigger retransmission.  NOTE(review): these statics are shared by every
 * connection in this process -- presumably acceptable, verify.
 */
static uint32 times = 0;
static uint32 lastSeq = 0;
/* set to true when acked buffers freed capacity: caller should send more */
bool shouldSendBuffers = false;
if (pkt->extraSeq != lastSeq)
{
/* first disorder message for this extraSeq: remember it and wait */
lastSeq = pkt->extraSeq;
times = 0;
return false;
}
else
{
times++;
/* react exactly once, on the second consecutive message */
if (times != 2)
return false;
}
/* the lost sequence numbers follow the packet header (see format above) */
curLostPktSeq = (uint32 *) &pkt[1];
lostPktCnt = (pkt->len - sizeof(icpkthdr)) / sizeof(uint32);
/* Resend all the missed packets and remove received packets from queues
 */
link = icBufferListFirst(&conn->unackQueue);
buf = GET_ICBUFFER_FROM_PRIMARY(link);
#ifdef AMS_VERBOSE_LOGGING
write_log("DISORDER: pktlen %d cnt %d pktseq %d first loss %d buf %p", pkt->len, lostPktCnt, pkt->seq, *curLostPktSeq, buf);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
{
icBufferListLog(&conn->unackQueue);
icBufferListLog(&conn->sndQueue);
}
#endif
/*
 * iterate the unack queue
 */
while (!icBufferListIsHead(&conn->unackQueue, link) && buf->pkt->seq <= pkt->seq && lostPktCnt > 0)
{
#ifdef AMS_VERBOSE_LOGGING
write_log("DISORDER: bufseq %d curlostpkt %d cnt %d buf %p pkt->seq %d", buf->pkt->seq, *curLostPktSeq, lostPktCnt, buf, pkt->seq);
#endif
/* the packet that triggered the disorder message itself was received */
if (buf->pkt->seq == pkt->seq)
{
handleAckedPacket(conn, buf, now);
shouldSendBuffers = true;
break;
}
if (buf->pkt->seq == *curLostPktSeq)
{
/* this is a lost packet, retransmit */
buf->nRetry++;
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
{
/* re-slot in the expiration ring with a retry-dependent period */
buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf);
putIntoUnackQueueRing(&unack_queue_ring, buf,
computeExpirationPeriod(buf->conn, buf->nRetry), now);
}
#ifdef TRANSFER_PROTOCOL_STATS
updateStats(TPE_DATA_PKT_SEND, conn, buf->pkt);
#endif
sendOnce(transportStates, pEntry, buf, buf->conn);
#ifdef AMS_VERBOSE_LOGGING
write_log("RESEND a buffer for DISORDER: seq %d", buf->pkt->seq);
logPkt("DISORDER RESEND DETAIL ", buf->pkt);
#endif
ic_statistics.retransmits++;
curLostPktSeq++;
lostPktCnt--;
link = link->next;
buf = GET_ICBUFFER_FROM_PRIMARY(link);
}
else if (buf->pkt->seq < *curLostPktSeq)
{
/* remove packet already received. */
next = link->next;
handleAckedPacket(conn, buf, now);
shouldSendBuffers = true;
link = next;
buf = GET_ICBUFFER_FROM_PRIMARY(link);
}
else /* buf->pkt->seq > *curPktSeq */
{
/* this case is introduced when the disorder message tell
 * you a pkt is lost. But when we handle this message, a
 * message (for example, duplicate ack, or another disorder message)
 * arriving before this message already removed the pkt.
 */
curLostPktSeq++;
lostPktCnt--;
}
}
/* loss-based congestion response: halve ssthresh, shrink cwnd to ssthresh */
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
{
snd_control_info.ssthresh = Max(snd_control_info.cwnd/2, snd_control_info.minCwnd);
snd_control_info.cwnd = snd_control_info.ssthresh;
}
#ifdef AMS_VERBOSE_LOGGING
write_log("After DISORDER: sndQ %d unackQ %d", icBufferListLength(&conn->sndQueue), icBufferListLength(&conn->unackQueue));
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
{
icBufferListLog(&conn->unackQueue);
icBufferListLog(&conn->sndQueue);
}
#endif
return shouldSendBuffers;
}
/*
* handleAckForDuplicatePkt
* Called by sender to deal with acks for duplicate packet.
*
*/
static bool
handleAckForDuplicatePkt(MotionConn *conn, icpkthdr *pkt)
{
ICBufferLink *link = NULL;
ICBuffer *buf = NULL;
ICBufferLink *next = NULL;
uint64 now = getCurrentTime();
/* true when we acked (freed) any buffers: caller should try sending more */
bool shouldSendBuffers = false;
#ifdef AMS_VERBOSE_LOGGING
write_log("RESEND the unacked buffers in the queue due to %s", pkt->len == 0 ? "PROCESS_START_RACE" : "DISORDER");
#endif
/*
 * pkt->seq is the duplicated packet's seq; pkt->extraSeq is the largest
 * contiguously received seq -- the former must be strictly greater.
 */
if (pkt->seq <= pkt->extraSeq)
{
/* Indicate a bug here. */
write_log("ERROR: invalid duplicate message: seq %d extraSeq %d", pkt->seq, pkt->extraSeq);
return false;
}
link = icBufferListFirst(&conn->unackQueue);
buf = GET_ICBUFFER_FROM_PRIMARY(link);
/* deal with continuous pkts */
while (!icBufferListIsHead(&conn->unackQueue, link) && (buf->pkt->seq <= pkt->extraSeq))
{
/* everything up to extraSeq was received by the peer: ack it away */
next = link->next;
handleAckedPacket(conn, buf, now);
shouldSendBuffers = true;
link = next;
buf = GET_ICBUFFER_FROM_PRIMARY(link);
}
/* deal with the single duplicate packet */
while (!icBufferListIsHead(&conn->unackQueue, link) && buf->pkt->seq <= pkt->seq)
{
next = link->next;
/* only the exact duplicated seq is acked; others are left in the queue */
if (buf->pkt->seq == pkt->seq)
{
handleAckedPacket(conn, buf, now);
shouldSendBuffers = true;
break;
}
link = next;
buf = GET_ICBUFFER_FROM_PRIMARY(link);
}
return shouldSendBuffers;
}
/*
* checkNetworkTimeout
* check network timeout case.
*/
/*
 * checkNetworkTimeout
 *		Error out when a buffer has both been retried more than the minimum
 *		number of times AND been outstanding longer than the transmit timeout.
 *
 * Elapsed time since the first send is not sufficient on its own: the sender
 * process may have been descheduled or blocked by the OS for a long stretch,
 * during which only a few retries were actually attempted.  That is why the
 * GUC Gp_interconnect_min_retries_before_timeout is also consulted.
 */
static inline void
checkNetworkTimeout(ICBuffer *buf, uint64 now)
{
	bool		retriedEnough = buf->nRetry > Gp_interconnect_min_retries_before_timeout;
	bool		waitedTooLong = (now - buf->sentTime) > (Gp_interconnect_transmit_timeout * 1000 * 1000);

	if (retriedEnough && waitedTooLong)
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect encountered a network error, please check your network"),
						errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries in %d seconds", buf->pkt->seq, buf->conn->remoteHostAndPort, buf->pkt->dstPid, buf->pkt->dstContentId, buf->nRetry, Gp_interconnect_transmit_timeout)));
}
/*
* checkExpiration
* Check whether packets expire. If a packet expires, resend the packet,
* and adjust its position in the unack queue ring.
*
*/
static void
checkExpiration(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now)
{
/* check for expiration */
int count = 0;
int retransmits = 0;
/*
 * Walk the ring slot-by-slot while its clock lags 'now'; the walk is bounded
 * by the number of slots so a single call can never loop indefinitely.
 */
while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < UNACK_QUEUE_RING_SLOTS_NUM)
{
/* expired, need to resend them */
ICBuffer *curBuf = NULL;
while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL)
{
curBuf->nRetry++;
/* re-slot the buffer with a retry-dependent expiration period */
putIntoUnackQueueRing(
&unack_queue_ring,
curBuf,
computeExpirationPeriod(curBuf->conn, curBuf->nRetry), now);
#ifdef TRANSFER_PROTOCOL_STATS
updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt);
#endif
sendOnce(transportStates, pEntry, curBuf, curBuf->conn);
retransmits++;
ic_statistics.retransmits++;
curBuf->conn->stat_count_resent++;
curBuf->conn->stat_max_resent = Max(curBuf->conn->stat_max_resent, curBuf->conn->stat_count_resent);
/* errors out if the buffer exceeded both retry and time limits */
checkNetworkTimeout(curBuf, now);
#ifdef AMS_VERBOSE_LOGGING
write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d", curBuf->pkt->seq, curBuf->nRetry, curBuf->conn->rtt, curBuf->conn->route);
logPkt("RESEND PKT in checkExpiration", curBuf->pkt);
#endif
}
/* advance the ring clock by one timer span and move to the next slot */
unack_queue_ring.currentTime += TIMER_SPAN;
unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM);
}
/*
 * deal with case when there is a long time this function is not called.
 */
unack_queue_ring.currentTime = now - (now % TIMER_SPAN);
/* loss detected: halve ssthresh and collapse cwnd back to its minimum */
if (retransmits > 0 )
{
snd_control_info.ssthresh = Max(snd_control_info.cwnd/2, snd_control_info.minCwnd);
snd_control_info.cwnd = snd_control_info.minCwnd;
}
}
/*
* checkDeadlock
* Check whether deadlock occurs on a connection.
*
* What this function does is to send a status query message to rx thread when the connection has
* not received any acks for some time. This is to avoid potential deadlock when there are continuous
* ack losses. Packet resending logic does not help avoiding deadlock here since the packets in the unack
* queue may already have been removed when the sender knows that they have been already buffered in the
* receiver side queue.
*
* Some considerations on deadlock check time period:
*
* Potential deadlock occurs rarely. According to our experiments on various workloads
* and hardware. It occurred only when fault injection is enabled and a large number packets and
* acknowledgments are discarded. Thus, here we use a relatively large deadlock check period.
*
*/
static void
checkDeadlock(ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
uint64 deadlockCheckTime;
/*
 * Potential deadlock shape: nothing in flight (empty unack queue), no send
 * capacity left, yet data is still waiting in the send queue -- the
 * capacity-granting ack may have been lost.
 */
if (icBufferListLength(&conn->unackQueue) == 0 && conn->capacity == 0 && icBufferListLength(&conn->sndQueue) > 0)
{
/* we must have received some acks before deadlock occurs. */
Assert(conn->deadlockCheckBeginTime > 0);
#ifdef USE_ASSERT_CHECKING
/* use a much shorter period in test mode to exercise this path */
if (udp_testmode)
{
deadlockCheckTime = 100000;
}
else
#endif
{
deadlockCheckTime = DEADLOCK_CHECKING_TIME;
}
uint64 now = getCurrentTime();
/* request the capacity to avoid the deadlock case */
if (((now - ic_control_info.lastDeadlockCheckTime) > deadlockCheckTime) && ((now - conn->deadlockCheckBeginTime) > deadlockCheckTime))
{
sendStatusQueryMessage(conn, pEntry->txfd, conn->conn_info.seq - 1);
ic_control_info.lastDeadlockCheckTime = now;
ic_statistics.statusQueryMsgNum++;
/* check network error. */
if ((now - conn->deadlockCheckBeginTime) > (Gp_interconnect_transmit_timeout * 1000 * 1000))
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("Interconnect encountered a network error, please check your network"),
errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds", conn->remoteHostAndPort, conn->conn_info.dstPid, conn->conn_info.dstContentId, Gp_interconnect_transmit_timeout)));
}
}
}
}
/*
* pollAcks
* Timeout polling of acks
*/
/*
 * pollAcks
 *		Wait up to 'timeout' ms for the ack fd to become readable.
 *
 * Returns true iff an ack (possibly a stop message) is ready to be read.
 * Returns false on timeout or EINTR; raises ERROR on other poll() failures.
 */
static inline bool
pollAcks(ChunkTransportState *transportStates, int fd, int timeout)
{
	struct pollfd nfd;
	int			n;

	nfd.fd = fd;
	nfd.events = POLLIN;

	n = poll(&nfd, 1, timeout);
	if (n < 0)
	{
		ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
		if (errno == EINTR)
			return false;

		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect error waiting for peer ack"),
						errdetail("During poll() call.\n")));
		/* not reached */
	}

	if (n == 0)					/* timeout */
		return false;

	/*
	 * got an ack to handle (possibly a stop message)
	 *
	 * Bug fix: poll() reports readiness in 'revents'; 'events' is the request
	 * mask we set ourselves above, so testing it was always true.
	 */
	if (n == 1 && (nfd.revents & POLLIN))
		return true;

	return false;
}
/*
* updateRetransmitStatistics
* Update the retransmit statistics.
*/
/*
 * updateRetransmitStatistics
 *		Bump the global and per-connection retransmit counters, tracking the
 *		high-water mark of resent packets for this connection.
 */
static inline void
updateRetransmitStatistics(MotionConn *conn)
{
	ic_statistics.retransmits++;
	conn->stat_count_resent++;
	if (conn->stat_count_resent > conn->stat_max_resent)
		conn->stat_max_resent = conn->stat_count_resent;
}
/*
* checkExpirationCapacityFC
* Check expiration for capacity based flow control method.
*
*/
/*
 * checkExpirationCapacityFC
 *		Expiration check for the capacity-based flow control method: when no
 *		packet has been (re)sent for at least 'timeout' ms and unacked data is
 *		outstanding, retransmit the head of the unack queue.
 */
static void
checkExpirationCapacityFC(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, int timeout)
{
	uint64		now;
	ICBufferLink *headLink;
	ICBuffer   *headBuf;

	/* nothing outstanding: nothing can expire */
	if (icBufferListLength(&conn->unackQueue) == 0)
		return;

	now = getCurrentTime();

	/* timeout is in ms; timestamps are in us */
	if ((now - ic_control_info.lastPacketSendTime) < (timeout * 1000))
		return;

	headLink = icBufferListFirst(&conn->unackQueue);
	headBuf = GET_ICBUFFER_FROM_PRIMARY(headLink);

	sendOnce(transportStates, pEntry, headBuf, headBuf->conn);
	headBuf->nRetry++;
	ic_control_info.lastPacketSendTime = now;

	updateRetransmitStatistics(conn);
	/* errors out if this buffer exceeded retry/time limits */
	checkNetworkTimeout(headBuf, now);
}
/*
* checkExceptions
* Check exceptions including packet expiration, deadlock, bg thread error, NIC failure...
*/
/*
 * checkExceptions
 *		Periodic exception sweep for the send path: packet expiration,
 *		deadlock, background (rx) thread errors, interrupts, and a dead QD
 *		connection.  Cheaper checks run more often than expensive ones.
 */
static void
checkExceptions(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, int retry, int timeout)
{
	/* the two flow-control methods are mutually exclusive */
	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY)
	{
		checkExpirationCapacityFC(transportStates, pEntry, conn, timeout);
	}
	else if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
	{
		uint64		now = getCurrentTime();

		/* rate-limit the ring sweep to once per checking period */
		if (now - ic_control_info.lastExpirationCheckTime > TIMER_CHECKING_PERIOD)
		{
			checkExpiration(transportStates, pEntry, conn, now);
			ic_control_info.lastExpirationCheckTime = now;
		}
	}

	/* every 4th retry: deadlock, rx-thread errors, pending interrupts */
	if ((retry & 0x3) == 0)
	{
		checkDeadlock(pEntry, conn);
		checkRxThreadError();
		ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
	}

	/* NIC on master (and thus the QD connection) may become bad, check it. */
	if ((retry & 0x3f) == 0)
		checkQDConnectionAlive();
}
/*
* computeTimeout
* Compute timeout value in ms.
*/
/*
 * computeTimeout
 *		Compute the ack-poll timeout (in ms) for a connection.
 *
 * Returns 0 on the very first attempt (no retries yet anywhere) so the
 * caller polls without blocking; otherwise a fixed checking period for the
 * loss-based method, or a retry-count-derived timeout for the capacity-based
 * method.
 */
static inline int
computeTimeout(MotionConn *conn, int retry)
{
	ICBufferLink *headLink;
	ICBuffer   *headBuf;

	if (icBufferListLength(&conn->unackQueue) == 0)
		return TIMER_CHECKING_PERIOD;

	headLink = icBufferListFirst(&conn->unackQueue);
	headBuf = GET_ICBUFFER_FROM_PRIMARY(headLink);

	if (headBuf->nRetry == 0 && retry == 0)
		return 0;

	if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS)
		return TIMER_CHECKING_PERIOD;

	/* for capacity based flow control */
	return TIMEOUT(headBuf->nRetry);
}
/*
* SendChunkUDP
* is used to send a tcItem to a single destination. Tuples often are
* *very small* we aggregate in our local buffer before sending into the kernel.
*
* PARAMETERS
* conn - MotionConn that the tcItem is to be sent to.
* tcItem - message to be sent.
* motionId - Node Motion Id.
*/
static bool
SendChunkUDP(MotionLayerState *mlStates,
ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry,
MotionConn *conn,
TupleChunkListItem tcItem,
int16 motionId)
{
/* chunks are aligned when appended to the packet buffer */
int length=TYPEALIGN(TUPLE_CHUNK_ALIGN, tcItem->chunk_length);
int retry = 0;
bool doCheckExpiration = false;
bool gotStops = false;
Assert(conn->msgSize > 0);
#ifdef AMS_VERBOSE_LOGGING
elog(DEBUG3, "sendChunk: msgSize %d this chunk length %d conn seq %d", conn->msgSize, tcItem->chunk_length, conn->conn_info.seq);
#endif
/* fast path: the chunk fits into the current packet -- just aggregate */
if (conn->msgSize + length <= Gp_max_packet_size)
{
memcpy(conn->pBuff + conn->msgSize, tcItem->chunk_data, tcItem->chunk_length);
conn->msgSize += length;
conn->tupleCount++;
return true;
}
/* prepare this for transmit */
ic_statistics.totalCapacity += conn->capacity;
ic_statistics.capacityCountingTime++;
/* try to send it */
prepareXmit(conn);
icBufferListAppend(&conn->sndQueue, conn->curBuff);
sendBuffers(transportStates, pEntry, conn);
uint64 now = getCurrentTime();
/* force an expiration sweep if the timer has not run for too long */
if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_CAPACITY)
doCheckExpiration = false;
else
doCheckExpiration = (now - ic_control_info.lastExpirationCheckTime) > MAX_TIME_NO_TIMER_CHECKING ? true : false;
/* get a new buffer */
conn->curBuff = NULL;
conn->pBuff = NULL;
ic_control_info.lastPacketSendTime = 0;
conn->deadlockCheckBeginTime = now;
/*
 * Acquire a free send buffer, polling for acks while none is available.
 * The first iteration may be forced (doCheckExpiration) even if a buffer
 * could be obtained immediately.
 */
while (doCheckExpiration || (conn->curBuff = getSndBuffer(conn)) == NULL)
{
/* poll without blocking when we only entered to run the expiration check */
int timeout = (doCheckExpiration ? 0 : computeTimeout(conn, retry));
if (pollAcks(transportStates, pEntry->txfd, timeout))
{
if (handleAcks(transportStates, pEntry))
{
/* We make sure that we deal with the stop messages
 * only after we get a buffer. Otherwise, if the stop
 * message is not for this connection, this will lead
 * to an error for the following data sending of this
 * connection.
 */
gotStops = true;
}
}
checkExceptions(transportStates, pEntry, conn, ++retry, timeout);
doCheckExpiration = false;
}
conn->pBuff = (uint8 *) conn->curBuff->pkt;
if (gotStops)
{
/* handling stop message will make some connection not active anymore */
handleStopMsgs(transportStates, pEntry, motionId);
gotStops = false;
if (!conn->stillActive)
return true;
}
/* reinitialize connection */
conn->tupleCount = 0;
conn->msgSize = sizeof(conn->conn_info);
/* now we can copy the input to the new buffer */
memcpy(conn->pBuff + conn->msgSize, tcItem->chunk_data, tcItem->chunk_length);
conn->msgSize += length;
conn->tupleCount++;
return true;
}
/*
* SendEosUDP
* broadcast eos messages to receivers.
*
* See ml_ipc.h
*
*/
static void
SendEosUDP(MotionLayerState *mlStates,
ChunkTransportState *transportStates,
int motNodeID,
TupleChunkListItem tcItem)
{
ChunkTransportStateEntry *pEntry = NULL;
MotionConn *conn;
int i = 0;
int retry = 0;
int activeCount = 0;
int timeout = 0;
/* sanity checks: the interconnect context must exist and be usable */
if (!transportStates)
{
elog(FATAL, "SendEosUDP: missing interconnect context.");
}
else if (!transportStates->activated && !transportStates->teardownActive)
{
elog(FATAL, "SendEosUDP: context and teardown inactive.");
}
#ifdef AMS_VERBOSE_LOGGING
elog(LOG, "entering seneosudp");
#endif
/* check em' */
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
getChunkTransportState(transportStates, motNodeID, &pEntry);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "Interconnect seg%d slice%d sending end-of-stream to slice%d",
GetQEIndex(), motNodeID, pEntry->recvSlice->sliceIndex);
/* we want to add our tcItem onto each of the outgoing buffers --
 * this is guaranteed to leave things in a state where a flush is
 * *required*.
 */
doBroadcast(mlStates, transportStates, pEntry, tcItem, NULL);
pEntry->sendingEos = true;
uint64 now = getCurrentTime();
/* Phase 1: flush every active connection's buffer with the EOS flag set. */
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->stillActive)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "sent eos to route %d tuplecount %d seq %d flags 0x%x stillActive %s icId %d %d",
conn->route, conn->tupleCount, conn->conn_info.seq, conn->conn_info.flags, (conn->stillActive ? "true" : "false"), conn->conn_info.icId, conn->msgSize);
/* prepare this for transmit */
if (pEntry->sendingEos)
conn->conn_info.flags |= UDPIC_FLAGS_EOS;
prepareXmit(conn);
/* place it into the send queue */
icBufferListAppend(&conn->sndQueue, conn->curBuff);
sendBuffers(transportStates, pEntry, conn);
conn->tupleCount = 0;
conn->msgSize = sizeof(conn->conn_info);
conn->curBuff = NULL;
conn->deadlockCheckBeginTime = now;
activeCount++;
}
}
/*
 * Now waiting for acks from receivers.
 *
 * Note here waiting is done in a separate phase from the EOS sending phase
 * to make the processing faster when a lot of connections are slow and have
 * frequent packet losses. In fault injection tests, we found this.
 *
 */
while (activeCount > 0)
{
activeCount = 0;
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
if (conn->stillActive)
{
retry = 0;
ic_control_info.lastPacketSendTime = 0;
/* wait until this queue is emptied */
while (icBufferListLength(&conn->unackQueue) > 0 || icBufferListLength(&conn->sndQueue) > 0)
{
timeout = computeTimeout(conn, retry);
if (pollAcks(transportStates, pEntry->txfd, timeout))
handleAcks(transportStates, pEntry);
checkExceptions(transportStates, pEntry, conn, ++retry, timeout);
/* give up on this connection for now; outer loop will revisit it */
if (retry >= MAX_TRY)
break;
}
}
/* fully drained: the connection is done; otherwise keep it active */
if (icBufferListLength(&conn->unackQueue) == 0 && icBufferListLength(&conn->sndQueue) == 0)
{
conn->state = mcsEosSent;
conn->stillActive = false;
}
else
{
activeCount++;
}
}
}
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "SendEosUDP leaving, activeCount %d", activeCount);
if (Debug_print_execution_detail) {
instr_time time;
INSTR_TIME_SET_CURRENT(time);
elog(DEBUG1,"The time before quit SendEosUDP: %.3f ms, activeCount %d",
1000.0 * INSTR_TIME_GET_DOUBLE(time), activeCount);
}
}
/*
* doSendStopMessageUDP
* Send stop messages to all senders.
*/
static void
doSendStopMessageUDP(ChunkTransportState *transportStates, int16 motNodeID)
{
ChunkTransportStateEntry *pEntry = NULL;
MotionConn *conn = NULL;
int i;
if (!transportStates->activated)
return;
getChunkTransportState(transportStates, motNodeID, &pEntry);
Assert(pEntry);
/*
 * Note: we're only concerned with receivers here.
 */
/* serialize against the rx thread, which also touches these connections */
pthread_mutex_lock(&ic_control_info.lock);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "Interconnect needs no more input from slice%d; notifying senders to stop.",
motNodeID);
for (i = 0; i < pEntry->numConns; i++)
{
conn = pEntry->conns + i;
/* Note here, the stillActive flag of a connection may have been
 * set to false by markUDPConnInactive.
 */
if (conn->stillActive)
{
if (conn->conn_info.flags & UDPIC_FLAGS_EOS)
{
/* we have a queued packet that has EOS in it. We've acked it, so we're done */
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "do sendstop: already have queued EOS packet, we're done. node %d route %d", motNodeID, i);
conn->stillActive = false;
/* need to drop the queues in the teardown function. */
while (conn->pkt_q_size > 0)
{
putRxBufferAndSendAck(conn, NULL);
}
}
else
{
/* sender is still streaming: ask it to stop */
conn->stopRequested = true;
conn->conn_info.flags |= UDPIC_FLAGS_STOP;
/*
 * The peer addresses for incoming connections will not be set until
 * the first packet has arrived. However, when the lower slice does not have data to send,
 * the corresponding peer address for the incoming connection will never be set.
 * We will skip sending ACKs to those connections.
 */
if (conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6)
{
/* seq 0 means no packet has been received yet */
uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0;
sendAck(conn, UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, seq, seq);
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "sent stop message. node %d route %d seq %d", motNodeID, i, seq);
}
else
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "first packet did not arrive yet. don't sent stop message. node %d route %d", motNodeID, i);
}
}
}
}
pthread_mutex_unlock(&ic_control_info.lock);
}
/*
* formatSockAddr
* Format sockaddr.
*
* NOTE: Because this function can be called in a thread (rxThreadFunc),
* it must not use services such as elog, ereport, palloc/pfree and StringInfo.
* elog is NOT thread-safe. Developers should instead use something like:
*
* if (DEBUG3 >= log_min_messages)
* write_log("my brilliant log statement here.");
*/
char *
formatSockAddr(struct sockaddr *sa, char* buf, int bufsize)
{
/* Save remote host:port string for error messages. */
/* IPv4: format dotted-quad and port by hand */
if (sa->sa_family == AF_INET)
{
struct sockaddr_in * sin = (struct sockaddr_in *)sa;
uint32 saddr = ntohl(sin->sin_addr.s_addr);
snprintf(buf, bufsize, "%d.%d.%d.%d:%d",
(saddr >> 24)&0xff,
(saddr >> 16)&0xff,
(saddr >> 8)&0xff,
saddr&0xff,
ntohs(sin->sin_port));
}
#ifdef HAVE_IPV6
/* IPv6: "[addr]:port" via pg_getnameinfo_all, with a fallback formatter */
else if (sa->sa_family == AF_INET6)
{
char remote_port[32];
remote_port[0] = '\0';
buf[0] = '\0';
/* need room for at least "[", "]", ":port" -- skip formatting otherwise */
if (bufsize > 10)
{
buf[0] = '[';
buf[1] = '\0'; /* in case getnameinfo fails */
/*
 * inet_ntop isn't portable.
 * //inet_ntop(AF_INET6, &sin6->sin6_addr, buf, bufsize - 8);
 *
 * postgres has a standard routine for converting addresses to printable format,
 * which works for IPv6, IPv4, and Unix domain sockets. I've changed this
 * routine to use that, but I think the entire formatSockAddr routine could
 * be replaced with it.
 */
int ret = pg_getnameinfo_all((const struct sockaddr_storage *)sa, sizeof(struct sockaddr_in6),
buf+1, bufsize-10,
remote_port, sizeof(remote_port),
NI_NUMERICHOST | NI_NUMERICSERV);
if (ret != 0)
{
write_log("getnameinfo returned %d: %s, and says %s port %s",ret,gai_strerror(ret),buf,remote_port);
/*
 * Fall back to using our internal inet_ntop routine, which really is for inet datatype
 * This is because of a bug in solaris, where getnameinfo sometimes fails
 * Once we find out why, we can remove this
 */
snprintf(remote_port,sizeof(remote_port),"%d",((struct sockaddr_in6 *)sa)->sin6_port);
/*
 * This is nasty: our internal inet_net_ntop takes PGSQL_AF_INET6, not AF_INET6, which
 * is very odd... They are NOT the same value (even though PGSQL_AF_INET == AF_INET
 */
#define PGSQL_AF_INET6 (AF_INET + 1)
inet_net_ntop(PGSQL_AF_INET6, sa, sizeof(struct sockaddr_in6), buf+1, bufsize-10);
write_log("Our alternative method says %s]:%s",buf,remote_port);
}
/* append the closing bracket and advance past it */
buf += strlen(buf);
strcat(buf,"]");
buf++;
}
snprintf(buf, 8, ":%s", remote_port);
}
#endif
/* unknown address family: emit a placeholder */
else
snprintf(buf, bufsize, "?host?:?port?");
return buf;
} /* formatSockAddr */
/*
* checkQDConnectionAlive
* Check whether QD connection is still alive. If not, report error.
*/
/*
 * checkQDConnectionAlive
 *		Validate the client/master connection; raise an ERROR (worded for the
 *		current role) when it has gone away.
 */
static void
checkQDConnectionAlive(void)
{
	if (dispatch_validate_conn(MyProcPort->sock))
		return;

	if (Gp_role == GP_ROLE_EXECUTE)
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
						errmsg("Interconnect error segment lost contact with master (recv)")));

	ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
					errmsg("Interconnect error master lost contact with client (recv)")));
}
/*
* getCurrentTime
* get current time
*
*/
static uint64
getCurrentTime(void)
{
struct timeval newTime;
int status = 1;
uint64 t = 0;
#if HAVE_LIBRT
/* Use clock_gettime to return monotonic time value. */
struct timespec ts;
status = clock_gettime(CLOCK_MONOTONIC, &ts);
newTime.tv_sec = ts.tv_sec;
newTime.tv_usec = ts.tv_nsec / 1000;
#endif
if (status != 0)
gettimeofday(&newTime, NULL);
t = ((uint64)newTime.tv_sec) * USECS_PER_SECOND + newTime.tv_usec;
return t;
}
/*
* putIntoUnackQueueRing
* Put the buffer into the ring.
*
* expTime - expiration time from now
*
*/
static void
putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now)
{
/* offset (in us) from the ring's clock to the buffer's expiration moment */
uint64 diff = now + expTime - uqr->currentTime;
int idx = 0;
/* clamp to the ring's time span so the slot index stays in range */
if (diff >= UNACK_QUEUE_RING_LENGTH)
{
#ifdef AMS_VERBOSE_LOGGING
write_log("putIntoUnackQueueRing:""now " UINT64_FORMAT "expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " UINT64_FORMAT, now, expTime, diff, uqr->currentTime);
#endif
diff = UNACK_QUEUE_RING_LENGTH - 1;
}
/* never place a buffer in the slot currently being processed */
else if (diff < TIMER_SPAN)
{
diff = TIMER_SPAN;
}
idx = (uqr->idx + diff/TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM;
#ifdef AMS_VERBOSE_LOGGING
write_log("PUTTW: curtime " UINT64_FORMAT " now " UINT64_FORMAT " (diff " UINT64_FORMAT ") expTime " UINT64_FORMAT " previdx %d, nowidx %d, nextidx %d", uqr->currentTime, now, diff, expTime, buf->unackQueueRingSlot, uqr->idx, idx);
#endif
/* remember the slot so the buffer can be unlinked on ack (see disorder path) */
buf->unackQueueRingSlot = idx;
icBufferListAppend(&unack_queue_ring.slots[idx], buf);
}
/*
* handleDataPacket
* Handling the data packet.
*
*/
static bool
handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, socklen_t *peerlen, AckSendParam *param)
{
if ((pkt->len == sizeof(icpkthdr)) && (pkt->flags & UDPIC_FLAGS_CAPACITY))
{
if (DEBUG1 >= log_min_messages)
write_log("status queuy message received, seq %d, srcpid %d, dstpid %d, icid %d, sid %d", pkt->seq, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->sessionId);
#ifdef AMS_VERBOSE_LOGGING
logPkt("STATUS QUERY MESSAGE", pkt);
#endif
setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq);
return false;
}
/*
* when we're not doing a full-setup on every
* statement, we've got to update the peer info --
* full setups do this at setup-time.
*/
/*
* Note the change here, for process start race and disordered message,
* if we do not fill in peer address, then we may send some acks to unknown address.
* Thus, the following condition is used.
*
*/
if (pkt->seq <= Gp_interconnect_queue_depth)
{
/* fill in the peer. Need to cast away "volatile". ugly */
memset((void *)&conn->peer, 0, sizeof(conn->peer));
memcpy((void *)&conn->peer, peer, *peerlen);
conn->peer_len = *peerlen;
conn->conn_info.dstListenerPort = pkt->dstListenerPort;
if (DEBUG2 >= log_min_messages)
write_log("received the head packets when eliding setup, pkt seq %d", pkt->seq);
}
/* data packet */
if (pkt->flags & UDPIC_FLAGS_EOS)
{
if (DEBUG3 >= log_min_messages)
write_log("received packet with EOS motid %d route %d seq %d",
pkt->motNodeId, conn->route, pkt->seq);
}
/*
* if we got a stop, but didn't request a stop --
* ignore, this is a startup blip: we must have
* acked with a stop -- we don't want to do
* anything further with the stop-message if we
* didn't request a stop!
*
* this is especially important after
* eliding setup is enabled.
*/
if (!conn->stopRequested && (pkt->flags & UDPIC_FLAGS_STOP))
{
if (pkt->flags & UDPIC_FLAGS_EOS)
{
write_log("non-requested stop flag, EOS! seq %d, flags 0x%x", pkt->seq, pkt->flags);
}
return false;
}
if (conn->stopRequested && conn->stillActive)
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG && DEBUG5 >= log_min_messages)
write_log("rx_thread got packet on active connection marked stopRequested. "
"(flags 0x%x) node %d route %d pkt seq %d conn seq %d",
pkt->flags, pkt->motNodeId, conn->route, pkt->seq, conn->conn_info.seq);
/* can we update stillActive ? */
if (DEBUG2 >= log_min_messages)
if (!(pkt->flags & UDPIC_FLAGS_STOP) &&
!(pkt->flags & UDPIC_FLAGS_EOS))
write_log("stop requested but no stop flag on return packet ?!");
if (pkt->flags & UDPIC_FLAGS_EOS)
conn->conn_info.flags |= UDPIC_FLAGS_EOS;
if (conn->conn_info.seq < pkt->seq)
conn->conn_info.seq = pkt->seq; /* note here */
setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_STOP | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, pkt->seq, pkt->seq);
/* we only update stillActive if eos has been sent by peer. */
if (pkt->flags & UDPIC_FLAGS_EOS)
{
if (DEBUG2 >= log_min_messages)
write_log("stop requested and acknowledged by sending peer");
conn->stillActive = false;
}
return false;
}
/* dropped ack or timeout */
if (pkt->seq < conn->conn_info.seq)
{
ic_statistics.duplicatedPktNum++;
if (DEBUG3 >= log_min_messages)
write_log("dropped ack ? ignored data packet w/ cmd %d conn->cmd %d node %d route %d seq %d expected %d flags 0x%x",
pkt->icId, conn->conn_info.icId, pkt->motNodeId,
conn->route, pkt->seq, conn->conn_info.seq, pkt->flags);
setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq);
return false;
}
/* sequence number is correct */
if (!conn->stillActive)
{
/* peer may have dropped ack */
if (gp_log_interconnect >= GPVARS_VERBOSITY_VERBOSE &&
DEBUG1 >= log_min_messages)
write_log("received on inactive connection node %d route %d (seq %d pkt->seq %d)",
pkt->motNodeId, conn->route, conn->conn_info.seq, pkt->seq);
if (conn->conn_info.seq < pkt->seq)
conn->conn_info.seq = pkt->seq;
setAckSendParam(param, conn, UDPIC_FLAGS_ACK | UDPIC_FLAGS_STOP | UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, pkt->seq, pkt->seq);
return false;
}
/* headSeq is the seq for the head packet. */
uint32 headSeq = conn->conn_info.seq - conn->pkt_q_size;
if ((conn->pkt_q_size == Gp_interconnect_queue_depth) || (pkt->seq - headSeq >= Gp_interconnect_queue_depth))
{
/*
* Error case: NO RX SPACE or out of range pkt
* This indicates a bug.
*/
logPkt("Interconnect error: received a packet when the queue is full ", pkt);
ic_statistics.disorderedPktNum++;
conn->stat_count_dropped++;
return false;
}
/* put the packet at the his position */
bool toWakeup = false;
int pos = (pkt->seq - 1) % Gp_interconnect_queue_depth;
if (conn->pkt_q[pos] == NULL)
{
conn->pkt_q[pos] = (uint8 *)pkt;
if (pos == conn->pkt_q_head)
{
#ifdef AMS_VERBOSE_LOGGING
write_log("SAVE pkt at QUEUE HEAD [seq %d] for node %d route %d, queue head seq %d, queue size %d, queue head %d queue tail %d", pkt->seq, pkt->motNodeId, conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
#endif
toWakeup = true;
}
if (pos == conn->pkt_q_tail)
{
/* move the queue tail */
for(;conn->pkt_q[conn->pkt_q_tail] != NULL && conn->pkt_q_size < Gp_interconnect_queue_depth;)
{
conn->pkt_q_size++;
conn->pkt_q_tail = (conn->pkt_q_tail + 1) % Gp_interconnect_queue_depth;
conn->conn_info.seq++;
}
/* set the EOS flag */
if (((icpkthdr *)(conn->pkt_q[(conn->pkt_q_tail + Gp_interconnect_queue_depth - 1) % Gp_interconnect_queue_depth]))->flags & UDPIC_FLAGS_EOS)
{
conn->conn_info.flags |= UDPIC_FLAGS_EOS;
if (DEBUG1 >= log_min_messages)
write_log("RX_THREAD: the packet with EOS flag is available for access in the queue for route %d", conn->route);
}
/* ack data packet */
setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq);
#ifdef AMS_VERBOSE_LOGGING
write_log("SAVE conn %p pkt at QUEUE TAIL [seq %d] at pos [%d] for node %d route %d, [head seq] %d, queue size %d, queue head %d queue tail %d", conn, pkt->seq, pos, pkt->motNodeId, conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
#endif
}
else /* deal with out-of-order packet */
{
if (DEBUG1 >= log_min_messages)
write_log("SAVE conn %p OUT-OF-ORDER pkt [seq %d] at pos [%d] for node %d route %d, [head seq] %d, queue size %d, queue head %d queue tail %d", conn, pkt->seq, pos, pkt->motNodeId, conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
/* send an ack for out-of-order packet */
ic_statistics.disorderedPktNum++;
handleDisorderPacket(conn, pos, headSeq + conn->pkt_q_size, pkt);
}
}
else /* duplicate pkt */
{
if (DEBUG1 >= log_min_messages)
write_log("DUPLICATE pkt [seq %d], [head seq] %d, queue size %d, queue head %d queue tail %d", pkt->seq, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
setAckSendParam(param, conn, UDPIC_FLAGS_DUPLICATE | conn->conn_info.flags, pkt->seq, conn->conn_info.seq - 1);
ic_statistics.duplicatedPktNum++;
return false;
}
/* Was the main thread waiting for something ? */
if (rx_control_info.mainWaitingState.waiting &&
rx_control_info.mainWaitingState.waitingNode == pkt->motNodeId &&
rx_control_info.mainWaitingState.waitingQuery == pkt->icId && toWakeup)
{
if (rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE)
{
if (rx_control_info.mainWaitingState.reachRoute == ANY_ROUTE)
rx_control_info.mainWaitingState.reachRoute = conn->route;
}
else if (rx_control_info.mainWaitingState.waitingRoute == conn->route)
{
if (DEBUG2 >= log_min_messages)
write_log("rx thread: main_waiting waking it route %d", rx_control_info.mainWaitingState.waitingRoute);
rx_control_info.mainWaitingState.reachRoute = conn->route;
}
/* WAKE MAIN THREAD HERE */
#if defined(__darwin__) && !defined(IC_USE_PTHREAD_SYNCHRONIZATION)
udpSignal(&ic_control_info.usig);
#else
pthread_cond_signal(&ic_control_info.cond);
#endif
}
return true;
}
/*
* rxThreadFunc
* Main function of the receive background thread.
*
* NOTE: This function MUST NOT contain elog or ereport statements.
* elog is NOT thread-safe. Developers should instead use something like:
*
* if (DEBUG3 >= log_min_messages)
* write_log("my brilliant log statement here.");
*
* NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
*/
static void *
rxThreadFunc(void *arg)
{
	/* rx buffer currently owned by this loop; handed off (set to NULL) once queued */
	icpkthdr   *pkt = NULL;
	/* after a good recvfrom() keep reading until the socket would block */
	bool		skip_poll = false;

	/* Block signals in this thread; signal handling stays with the main thread. */
	gp_set_thread_sigmasks();

	for (;;)
	{
		struct pollfd nfd;
		int			n;

		/* check shutdown condition */
		if (compare_and_swap_32(&ic_control_info.shutdown, 1, 0))
		{
			if (DEBUG1 >= log_min_messages)
				write_log("udp-ic: rx-thread shutting down");
			break;
		}

		/* Try to get a buffer (buffer pool is shared, so take the lock) */
		if (pkt == NULL)
		{
			pthread_mutex_lock(&ic_control_info.lock);
			pkt = getRxBuffer(&rx_buffer_pool);
			pthread_mutex_unlock(&ic_control_info.lock);

			if (pkt == NULL)
			{
				/* no rx buffer available: surface the error to the main thread */
				setRxThreadError(ENOMEM);
				continue;
			}
		}

		if (!skip_poll)
		{
			/* Do we have inbound traffic to handle ? */
			nfd.fd = UDP_listenerFd;
			nfd.events = POLLIN;

			n = poll(&nfd, 1, RX_THREAD_POLL_TIMEOUT);
			if (n < 0)
			{
				if (errno == EINTR)
					continue;

				/*
				 * ERROR case: if simply break out the loop here, there will be a hung here,
				 * since main thread will never be waken up, and senders will not
				 * get responses anymore.
				 *
				 * Thus, we set an error flag, and let main thread to report an error.
				 */
				setRxThreadError(errno);
				continue;
			}

			if (n == 0)
				continue;
		}

		/*
		 * NOTE(review): this tests nfd.events (the requested mask, which is
		 * always POLLIN here) rather than nfd.revents (the events poll()
		 * actually returned) -- confirm whether revents was intended.  When
		 * skip_poll is true, n is not read (short-circuit), so it being
		 * uninitialized on that path is harmless.
		 */
		if (skip_poll || (n == 1 && (nfd.events & POLLIN)))
		{
			/* we've got something interesting to read */
			/* handle incoming */
			/* ready to read on our socket */
			MotionConn *conn = NULL;
			int			read_count = 0;

			struct sockaddr_storage peer;
			socklen_t	peerlen;

			peerlen = sizeof(peer);
			read_count = recvfrom(UDP_listenerFd, (char *) pkt, Gp_max_packet_size, 0,
								  (struct sockaddr *) &peer, &peerlen);

			if (DEBUG5 >= log_min_messages)
				write_log("received inbound len %d", read_count);

			if (read_count < 0)
			{
				/* recvfrom failed: fall back to poll() before retrying */
				skip_poll = false;

				if (errno == EWOULDBLOCK || errno == EINTR)
					continue;

				write_log("Interconnect error: recvfrom (%d)", errno);

				/*
				 * ERROR case: if simply break out the loop here, there will be a hung here,
				 * since main thread will never be waken up, and senders will not
				 * get responses anymore.
				 *
				 * Thus, we set an error flag, and let main thread to report an error.
				 */
				setRxThreadError(errno);
				continue;
			}

			/* a datagram smaller than the header cannot be a valid packet */
			if (read_count < sizeof(icpkthdr))
			{
				if (DEBUG1 >= log_min_messages)
					write_log("Interconnect error: short conn receive (%d)", read_count);
				continue;
			}

			/* when we get a "good" recvfrom() result, we can skip poll() until we get a bad one. */
			skip_poll = true;

			/* length must be >= 0 */
			if (pkt->len < 0)
			{
				if (DEBUG3 >= log_min_messages)
					write_log("received inbound with negative length");
				continue;
			}

			/* drop truncated/inflated datagrams: header length must match what we read */
			if (pkt->len != read_count)
			{
				if (DEBUG3 >= log_min_messages)
					write_log("received inbound packet [%d], short: read %d bytes, pkt->len %d", pkt->seq, read_count, pkt->len);
				continue;
			}

			/*
			 * check the CRC of the payload.
			 */
			if (gp_interconnect_full_crc)
			{
				if (!checkCRC(pkt))
				{
					gp_atomic_add_32(&ic_statistics.crcErrors, 1);
					if (DEBUG2 >= log_min_messages)
						write_log("received network data error, dropping bad packet, user data unaffected.");
					continue;
				}
			}

#ifdef AMS_VERBOSE_LOGGING
			logPkt("GOT MESSAGE", pkt);
#endif

			AckSendParam param;

			memset(&param, 0, sizeof(AckSendParam));

			/*
			 * Get the connection for the pkt.
			 *
			 * The connection hash table should be locked until
			 * finishing the processing of the packet to avoid
			 * the connection addition/removal from the hash table
			 * during the mean time.
			 */
			pthread_mutex_lock(&ic_control_info.lock);

			conn = findConnByHeader(&ic_control_info.connHtab, pkt);

			if (conn != NULL)
			{
				/* Handling a regular packet */
				/* handleDataPacket returning true means it kept the buffer */
				if (handleDataPacket(conn, pkt, &peer, &peerlen, &param))
					pkt = NULL;
				ic_statistics.recvPktNum++;
			}
			else
			{
				/*
				 * There may have two kinds of Mismatched packets:
				 * a) Past packets from previous command after I was torn down
				 * b) Future packets from current command before my connections are built.
				 *
				 * The handling logic is to "Ack the past and Nak the future".
				 */
				if ((pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER) == 0)
				{
					if (DEBUG1 >= log_min_messages)
						write_log("mismatched packet received, seq %d, srcpid %d, dstpid %d, icid %d, sid %d", pkt->seq, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->sessionId);

#ifdef AMS_VERBOSE_LOGGING
					logPkt("Got a Mismatched Packet", pkt);
#endif

					/* handleMismatch returning true means the packet was cached */
					if (handleMismatch(pkt, &peer, peerlen))
						pkt = NULL;
					ic_statistics.mismatchNum++;
				}
			}
			pthread_mutex_unlock(&ic_control_info.lock);

			/* real ack sending is after lock release to decrease the lock holding time. */
			if (param.msg.len != 0)
				sendAckWithParam(&param);
		}

		/* pthread_yield(); */
	}

	/* Before returning, we release the packet. */
	if (pkt)
	{
		pthread_mutex_lock(&ic_control_info.lock);
		freeRxBuffer(&rx_buffer_pool, pkt);
		pkt = NULL;
		pthread_mutex_unlock(&ic_control_info.lock);
	}

	/* nothing to return */
	return NULL;
}
/*
* handleMismatch
* If the mismatched packet is from an old connection, we may need to
* send an acknowledgment.
*
* We are called with the receiver-lock held, and we never release it.
*
* For QD:
* 1) Not in hashtable : NAK it/Do nothing
* Causes: a) Start race
* b) Before the entry for the ic instance is inserted, an error happened.
* c) From past transactions: should no happen.
* 2) Active in hashtable : NAK it/Do nothing
* Causes: a) Error reported after the entry is inserted, and connections are
* not inserted to the hashtable yet, and before teardown is called.
* 3) Inactive in hashtable: ACK it (with stop)
* Causes: a) Normal execution: after teardown is called on current command.
* b) Error case, 2a) after teardown is called.
* c) Normal execution: from past history transactions (should not happen).
*
* For QE:
* 1) pkt->id > Gp_interconnect_id : NAK it/Do nothing
* Causes: a) Start race
* b) Before Gp_interconnect_id is assigned to correct value, an error happened.
* 2) lastTornIcId < pkt->id == Gp_interconnect_id: NAK it/Do nothing
* Causes: a) Error reported after Gp_interconnect_id is set, and connections are
* not inserted to the hashtable yet, and before teardown is called.
* 3) lastTornIcId == pkt->id == Gp_interconnect_id: ACK it (with stop)
* Causes: a) Normal execution: after teardown is called on current command
* 4) pkt->id < Gp_interconnect_id: NAK it/Do nothing/ACK it.
* Causes: a) Should not happen.
*
*/
static bool
handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
{
	/* true when the packet was cached for later use (caller must not reuse the buffer) */
	bool		cached = false;

	/*
	 * we want to ack old packets; but *must* avoid acking connection requests:
	 *
	 * "ACK the past, NAK the future" explicit NAKs aren't necessary, we just don't
	 * want to ACK future packets, that confuses everyone.
	 *
	 * seq == 0 marks a connection-setup packet; packets from other sessions
	 * are ignored entirely.
	 */
	if (pkt->seq > 0 && pkt->sessionId == gp_session_id)
	{
		bool		need_ack = false;
		uint8		ack_flags = 0;

		/*
		 * The QD-backends can't use a counter, they've potentially got multiple instances (one for each active cursor)
		 */
		if (Gp_role == GP_ROLE_DISPATCH)
		{
			struct CursorICHistoryEntry *p;

			p = getCursorIcEntry(&rx_control_info.cursorHistoryTable, pkt->icId);

			if (p)
			{
				if (p->status == 0)
				{
					/* Torn down. Ack the past. */
					need_ack = true;
				}
				else			/* p->status == 1 */
				{
					/*
					 * Not torn down yet.
					 * It happens when an error (out-of-memory, network error...) occurred
					 * after the cursor entry is inserted into the table in interconnect setup process.
					 * The peer will be canceled.
					 */
					if (DEBUG1 >= log_min_messages)
						write_log("GOT A MISMATCH PACKET WITH ID %d HISTORY THINKS IT IS ACTIVE", pkt->icId);
					return cached;	/* ignore, no ack */
				}
			}
			else
			{
				if (DEBUG1 >= log_min_messages)
					write_log("GOT A MISMATCH PACKET WITH ID %d HISTORY HAS NO RECORD", pkt->icId);

				/*
				 * No record means that two possibilities.
				 * 1) It is from the future. It is due to startup race. We do not ack future packets
				 * 2) Before the entry for the ic instance is inserted, an error happened. We do not
				 *    ack for this case too. The peer will be canceled.
				 */
				ack_flags = UDPIC_FLAGS_NAK;
				need_ack = false;

				/* possibly a future packet: keep it so setup doesn't lose it */
				if (gp_interconnect_cache_future_packets)
				{
					cached = cacheFuturePacket(pkt, peer, peer_len);
				}
			}
		}
		/* The QEs get to use a simple counter. */
		else if (Gp_role == GP_ROLE_EXECUTE)
		{
			if (gp_interconnect_id >= pkt->icId)
			{
				/* packet from the current or a past command: ack by default */
				need_ack = true;

				/*
				 * We want to "ACK the past, but NAK the future."
				 *
				 * handleAck() will retransmit.
				 */
				if (pkt->seq >= 1 && pkt->icId > rx_control_info.lastTornIcId)
				{
					/* current command not torn down yet: NAK instead of ACK */
					ack_flags = UDPIC_FLAGS_NAK;
					need_ack = false;
				}
			}
			else				/* gp_interconnect_id < pkt->icId, from the future */
			{
				if (gp_interconnect_cache_future_packets)
				{
					cached = cacheFuturePacket(pkt, peer, peer_len);
				}
			}
		}

		if (need_ack)
		{
			/* build a throwaway connection just to carry the peer address and header */
			MotionConn	dummyconn;
			char		buf[128];	/* numeric IP addresses shouldn't exceed about 50 chars, but play it safe */

			memcpy(&dummyconn.conn_info, pkt, sizeof(icpkthdr));
			dummyconn.peer = *peer;
			dummyconn.peer_len = peer_len;

			dummyconn.conn_info.flags |= ack_flags;

			if (DEBUG1 >= log_min_messages)
				write_log("ACKING PACKET WITH FLAGS: pkt->seq %d 0x%x [pkt->icId %d last-teardown %d interconnect_id %d]",
						  pkt->seq, dummyconn.conn_info.flags, pkt->icId, rx_control_info.lastTornIcId, gp_interconnect_id);

			formatSockAddr((struct sockaddr *) &dummyconn.peer, buf, sizeof(buf));

			if (DEBUG1 >= log_min_messages)
				write_log("ACKING PACKET TO %s", buf);

			if ((ack_flags & UDPIC_FLAGS_NAK) == 0)
			{
				ack_flags |= UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_RECEIVER_TO_SENDER;
			}
			else
			{
				ack_flags |= UDPIC_FLAGS_RECEIVER_TO_SENDER;
			}

			/*
			 * There are two cases, we may need to send a response to sender here.
			 * One is start race and the other is receiver becomes idle.
			 *
			 * ack_flags here can take two possible values
			 * 1) UDPIC_FLAGS_NAK | UDPIC_FLAGS_RECEIVER_TO_SENDER (for start race)
			 * 2) UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_RECEIVER_TO_SENDER (for idle receiver)
			 *
			 * The final flags in the packet may take some extra bits such as
			 * 1) UDPIC_FLAGS_STOP
			 * 2) UDPIC_FLAGS_EOS
			 * 3) UDPIC_FLAGS_CAPACITY
			 * which are from original packet
			 */
			sendAck(&dummyconn, ack_flags | dummyconn.conn_info.flags, dummyconn.conn_info.seq, dummyconn.conn_info.seq);
		}
	}
	else
	{
		if (DEBUG1 >= log_min_messages)
			write_log("dropping packet from command-id %d seq %d (my cmd %d)", pkt->icId, pkt->seq, gp_interconnect_id);
	}

	return cached;
}
/*
* cacheFuturePacket
* Cache the future packets during the setupUdpInterconnect.
*
* Return true if packet is cached, otherwise false
*/
/*
 * cacheFuturePacket
 *		Cache a "future" packet (one for an interconnect instance that has
 *		not been set up yet) so it is not lost during the startup race.
 *
 * Caller holds ic_control_info.lock (called from the rx thread mismatch
 * path).  Returns true when the packet buffer was cached -- ownership of
 * pkt then passes to the startup cache -- otherwise false and the caller
 * keeps the buffer.
 */
static bool
cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
{
	MotionConn *conn = NULL;

	conn = findConnByHeader(&ic_control_info.startupCacheHtab, pkt);

	if (conn == NULL)
	{
		/* First packet for this (future) connection: build a cache entry. */
		conn = malloc(sizeof(MotionConn));
		if (conn == NULL)
		{
			/*
			 * Report ENOMEM explicitly: ISO C does not guarantee malloc sets
			 * errno, and this matches the getRxBuffer failure path in
			 * rxThreadFunc.
			 */
			setRxThreadError(ENOMEM);
			return false;
		}

		memset((void *) conn, 0, sizeof(MotionConn));
		memcpy(&conn->conn_info, pkt, sizeof(icpkthdr));
		conn->pkt_q_size = Gp_interconnect_queue_depth;
		conn->pkt_q = (uint8 **) malloc(Gp_interconnect_queue_depth * sizeof(uint8 *));

		if (conn->pkt_q == NULL)
		{
			/* malloc failed. */
			free(conn);
			setRxThreadError(ENOMEM);
			return false;
		}

		/* We only use the array to store cached packets. */
		memset(conn->pkt_q, 0, Gp_interconnect_queue_depth * sizeof(uint8 *));

		/* Put connection to the hashtable. */
		if (!connAddHash(&ic_control_info.startupCacheHtab, conn))
		{
			/*
			 * connAddHash fails only on its internal allocation failure;
			 * errno may be stale here, so report ENOMEM explicitly.
			 */
			free(conn->pkt_q);
			free(conn);
			setRxThreadError(ENOMEM);
			return false;
		}

		/* Setup the peer sock information. */
		memcpy(&conn->peer, peer, peer_len);
		conn->peer_len = peer_len;
	}

	/* Reject packets with invalid sequence numbers and packets which have been cached before. */
	if (pkt->seq > conn->pkt_q_size || pkt->seq == 0 || conn->pkt_q[pkt->seq - 1] != NULL)
		return false;

	/* Slot seq-1 holds the packet; cleanupStartupCache undoes the maxCount bump. */
	conn->pkt_q[pkt->seq - 1] = (uint8 *) pkt;
	rx_buffer_pool.maxCount++;
	ic_statistics.startupCachedPktNum++;

	return true;
}
/*
* cleanupStartupCache
* Clean the startup cache.
*/
static void
cleanupStartupCache()
{
	ConnHtabBin *bin = NULL;
	MotionConn *cachedConn = NULL;
	icpkthdr   *pkt = NULL;
	int			i = 0;
	int			j = 0;

	/* Walk every bucket of the startup-cache hash table. */
	for (i = 0; i < ic_control_info.startupCacheHtab.size; i++)
	{
		bin = ic_control_info.startupCacheHtab.table[i];

		while (bin)
		{
			cachedConn = bin->conn;

			/* Return every cached packet buffer to the rx free list. */
			for (j = 0; j < cachedConn->pkt_q_size; j++)
			{
				pkt = (icpkthdr *) cachedConn->pkt_q[j];

				if (pkt == NULL)
					continue;

				/* undo the maxCount bump made when the packet was cached */
				rx_buffer_pool.maxCount--;

				putRxBufferToFreeList(&rx_buffer_pool, pkt);
				cachedConn->pkt_q[j] = NULL;
			}

			/*
			 * Advance to the next bin BEFORE connDelHash removes the current
			 * connection (and its bin) from the table.
			 */
			bin = bin->next;
			connDelHash(&ic_control_info.startupCacheHtab, cachedConn);

			/* MPP-19981
			 * free the cached connections; otherwise memory leak
			 * would be introduced.
			 */
			free(cachedConn->pkt_q);
			free(cachedConn);
		}
	}
}
/* The following functions are facility methods for debugging.
* They are quite useful when there are a large number of connections.
* These functions can be called from gdb to output internal information to a file.
*/
/*
* dumpICBufferList_Internal
* Dump a buffer list.
*/
/*
 * dumpICBufferList_Internal
 *		Write a human-readable dump of every packet in a buffer list to
 *		an already-open file.  Traversal is bounded both by the list's
 *		recorded length and by reaching the sentinel head node.
 */
static void
dumpICBufferList_Internal(ICBufferList *list, FILE *ofile)
{
	ICBufferLink *link;
	int			remaining = list->length;
	int			idx = 0;

	fprintf(ofile, "List Length %d\n", remaining);

	for (link = list->head.next;
		 link != &list->head && remaining > 0;
		 link = link->next, remaining--)
	{
		ICBuffer   *buf;

		/* The containing ICBuffer is recovered via the link-type-specific macro. */
		if (list->type == ICBufferListType_Primary)
			buf = GET_ICBUFFER_FROM_PRIMARY(link);
		else
			buf = GET_ICBUFFER_FROM_SECONDARY(link);

		fprintf(ofile, "Node %d, linkptr %p ", idx, link);
		idx++;
		fprintf(ofile, "Packet Content [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
				"srcContentId %d dstDesContentId %d "
				"srcPid %d dstPid %d "
				"srcListenerPort %d dstListernerPort %d "
				"sendSliceIndex %d recvSliceIndex %d "
				"sessionId %d icId %d "
				"flags %d\n",
				buf->pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
				buf->pkt->seq, buf->pkt->extraSeq, buf->pkt->motNodeId, buf->pkt->crc, buf->pkt->len,
				buf->pkt->srcContentId, buf->pkt->dstContentId,
				buf->pkt->srcPid, buf->pkt->dstPid,
				buf->pkt->srcListenerPort, buf->pkt->dstListenerPort,
				buf->pkt->sendSliceIndex, buf->pkt->recvSliceIndex,
				buf->pkt->sessionId, buf->pkt->icId,
				buf->pkt->flags);
	}
}
/*
* dumpICBufferList
* Dump a buffer list.
*/
/*
 * dumpICBufferList
 *		Debug helper (callable from gdb): dump a buffer list to the file
 *		named fname.  Silently does nothing if the file cannot be opened.
 */
void
dumpICBufferList(ICBufferList *list, const char *fname)
{
	FILE	   *ofile = fopen(fname, "w+");

	/* fopen can fail (permissions, bad path); avoid UB on a NULL stream */
	if (ofile == NULL)
		return;

	dumpICBufferList_Internal(list, ofile);
	fclose(ofile);
}
/*
* dumpUnackQueueRing
* Dump an unack queue ring.
*/
/*
 * dumpUnackQueueRing
 *		Debug helper (callable from gdb): dump the unack queue ring to the
 *		file named fname.  Silently does nothing if the file cannot be opened.
 */
void
dumpUnackQueueRing(const char *fname)
{
	FILE	   *ofile = fopen(fname, "w+");
	int			i;

	/* fopen can fail (permissions, bad path); avoid UB on a NULL stream */
	if (ofile == NULL)
		return;

	fprintf(ofile, "UnackQueueRing: currentTime " UINT64_FORMAT ", idx %d numOutstanding %d numSharedOutstanding %d\n",
			unack_queue_ring.currentTime, unack_queue_ring.idx,
			unack_queue_ring.numOutStanding, unack_queue_ring.numSharedOutStanding);
	fprintf(ofile, "==================================\n");

	/* only dump slots that actually hold buffers */
	for (i = 0; i < UNACK_QUEUE_RING_SLOTS_NUM; i++)
	{
		if (icBufferListLength(&unack_queue_ring.slots[i]) > 0)
		{
			dumpICBufferList_Internal(&unack_queue_ring.slots[i], ofile);
		}
	}
	fclose(ofile);
}
/*
* dumpConnections
* Dump connections.
*/
/*
 * dumpConnections
 *		Debug helper (callable from gdb): dump every primary connection of a
 *		transport entry -- header state, plus the receive queue (receiver
 *		side) or the send/unack queues (sender side) -- to the file named
 *		fname.  Silently does nothing if the file cannot be opened.
 */
void
dumpConnections(ChunkTransportStateEntry *pEntry, const char *fname)
{
	int			i,
				j;
	MotionConn *conn;
	FILE	   *ofile = fopen(fname, "w+");

	/* fopen can fail (permissions, bad path); avoid UB on a NULL stream */
	if (ofile == NULL)
		return;

	fprintf(ofile, "Entry connections: conn num %d \n", pEntry->numPrimaryConns);
	fprintf(ofile, "==================================\n");

	for (i = 0; i < pEntry->numPrimaryConns; i++)
	{
		conn = &pEntry->conns[i];

		/* per-connection runtime state */
		fprintf(ofile, "conns[%d] motNodeId=%d: remoteContentId=%d pid=%d sockfd=%d remote=%s local=%s "
				"capacity=%d sentSeq=%d receivedAckSeq=%d consumedSeq=%d rtt=" UINT64_FORMAT
				" dev=" UINT64_FORMAT " deadlockCheckBeginTime=" UINT64_FORMAT " route=%d msgSize=%d msgPos=%p"
				" recvBytes=%d tupleCount=%d waitEOS=%d stillActive=%d stopRequested=%d "
				"state=%d\n",
				i, pEntry->motNodeId,
				conn->remoteContentId,
				conn->cdbProc ? conn->cdbProc->pid : 0,
				conn->sockfd,
				conn->remoteHostAndPort,
				conn->localHostAndPort,
				conn->capacity, conn->sentSeq, conn->receivedAckSeq, conn->consumedSeq,
				conn->rtt, conn->dev, conn->deadlockCheckBeginTime, conn->route, conn->msgSize, conn->msgPos,
				conn->recvBytes, conn->tupleCount, conn->waitEOS, conn->stillActive, conn->stopRequested,
				conn->state);

		/* the last packet header exchanged on this connection */
		fprintf(ofile, "conn_info [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
				"srcContentId %d dstDesContentId %d "
				"srcPid %d dstPid %d "
				"srcListenerPort %d dstListernerPort %d "
				"sendSliceIndex %d recvSliceIndex %d "
				"sessionId %d icId %d "
				"flags %d\n",
				conn->conn_info.flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
				conn->conn_info.seq, conn->conn_info.extraSeq, conn->conn_info.motNodeId, conn->conn_info.crc, conn->conn_info.len,
				conn->conn_info.srcContentId, conn->conn_info.dstContentId,
				conn->conn_info.srcPid, conn->conn_info.dstPid,
				conn->conn_info.srcListenerPort, conn->conn_info.dstListenerPort,
				conn->conn_info.sendSliceIndex, conn->conn_info.recvSliceIndex,
				conn->conn_info.sessionId, conn->conn_info.icId,
				conn->conn_info.flags);

		/* receiver side: dump each queued packet header */
		if (!ic_control_info.isSender)
		{
			fprintf(ofile, "pkt_q_size=%d pkt_q_head=%d pkt_q_tail=%d pkt_q=%p\n", conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail, conn->pkt_q);
			for (j = 0; j < Gp_interconnect_queue_depth; j++)
			{
				if (conn->pkt_q != NULL && conn->pkt_q[j] != NULL)
				{
					icpkthdr   *pkt = (icpkthdr *) conn->pkt_q[j];

					fprintf(ofile, "Packet (pos %d) Info [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
							"srcContentId %d dstDesContentId %d "
							"srcPid %d dstPid %d "
							"srcListenerPort %d dstListernerPort %d "
							"sendSliceIndex %d recvSliceIndex %d "
							"sessionId %d icId %d "
							"flags %d\n",
							j,
							pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
							pkt->seq, pkt->extraSeq, pkt->motNodeId, pkt->crc, pkt->len,
							pkt->srcContentId, pkt->dstContentId,
							pkt->srcPid, pkt->dstPid,
							pkt->srcListenerPort, pkt->dstListenerPort,
							pkt->sendSliceIndex, pkt->recvSliceIndex,
							pkt->sessionId, pkt->icId,
							pkt->flags);
				}
			}
		}
		/* sender side: dump the send and unack queues */
		if (ic_control_info.isSender)
		{
			fprintf(ofile, "sndQueue ");
			dumpICBufferList_Internal(&conn->sndQueue, ofile);
			fprintf(ofile, "unackQueue ");
			dumpICBufferList_Internal(&conn->unackQueue, ofile);
		}
		fprintf(ofile, "\n");
	}
	fclose(ofile);
}