blob: 77c08ee6e29fdc77f1ac64a432c54999534c5666 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* ic_internal.h
* Motion IPC Layer.
*
* IDENTIFICATION
* contrib/interconnect/ic_internal.h
*
*-------------------------------------------------------------------------
*/
#ifndef INTER_CONNECT_INTERNAL_H
#define INTER_CONNECT_INTERNAL_H
#include "tcp/ic_tcp.h"
#include "udp/ic_udpifc.h"
#include "cdb/cdbinterconnect.h"
/*
 * CONTAINER_OF
 *		Given "ptr", a pointer to the field "member" embedded in a struct of
 *		type "type", return a pointer to that enclosing struct.
 *
 * The temporary pointer makes the compiler check that "ptr" is compatible
 * with the member's type. __typeof__ is used instead of typeof because the
 * plain keyword is not available in strict ISO C modes (e.g. -std=c11).
 * The ({ ... }) statement expression is a GCC/Clang extension.
 */
#define CONTAINER_OF(ptr, type, member) \
	({ \
		const __typeof__(((type *) 0)->member) *container_of_mptr_ = (ptr); \
		(type *) ((char *) container_of_mptr_ - offsetof(type, member)); \
	})
/*
 * MotionConnState
 *		State of a MotionConn, stored in MotionConn.state.
 *
 * The numeric values are written out explicitly; they are the same 0..7
 * that plain declaration order would assign.
 * NOTE(review): the transitions between these states are driven by the
 * TCP/UDP implementations, which are not part of this header.
 */
typedef enum MotionConnState
{
	mcsNull = 0,
	mcsAccepted = 1,
	mcsSetupOutgoingConnection = 2,
	mcsConnecting = 3,
	mcsRecvRegMsg = 4,
	mcsSendRegMsg = 5,
	mcsStarted = 6,
	mcsEosSent = 7
} MotionConnState;
/*
 * Structure used for keeping track of a point-to-point connection between
 * two Cdb entities (either QE or QD).
 *
 * This is the transport-independent part of a connection. MotionConnTCP and
 * MotionConnUDP each embed it as their member "mConn"; code holding a
 * MotionConn pointer recovers the transport-specific struct with
 * CONTAINER_OF.
 */
typedef struct MotionConn
{
/* socket file descriptor. */
int sockfd;
/* pointer to the data buffer. */
uint8 *pBuff;
/* size of the message in the buffer, if any. */
int32 msgSize;
/* position of the current message inside the buffer ("cursor" pointer). */
uint8 *msgPos;
/*
 * Number of received bytes. The receive queue can hold more than one
 * message (or message fragment) at once.
 */
int32 recvBytes;
/*
 * Tuple counter for this connection.
 * NOTE(review): whether it counts sent or received tuples is not visible
 * here; confirm against the callers.
 */
int tupleCount;
/*
 * false means the connection is no longer active because one of the
 * following happened:
 * 1) a stop message was received and handled;
 * 2) an EOS message was received or sent;
 * 3) a QueryFinishPending notification was received and handled.
 */
bool stillActive;
/*
 * Used by both the motion sender and the motion receiver.
 *
 * Sender: true means the receiver doesn't need any more tuples; the sender
 * is then also responsible for sending a stop message to its own senders.
 *
 * Receiver: true means a stop message has been sent to the senders. Since
 * the stop message might be lost, stopRequested is also used to tell the
 * sender, in the ack message, that no more data is needed.
 */
bool stopRequested;
/* current state of this connection. */
MotionConnState state;
/* NOTE(review): presumably describes the process at the remote end; confirm. */
struct CdbProcess *cdbProc;
/* content id of the remote side of the connection. */
int remoteContentId;
/*
 * Remote host and port as text. Numeric IP addresses should never be
 * longer than about 50 chars, but the buffer is larger to play it safe.
 */
char remoteHostAndPort[128];
/*
 * Used by the sender.
 *
 * The typmod of the last record type sent on this connection. If the
 * connection is used for broadcasting, this attribute is only checked and
 * updated on connection 0.
 */
int32 sent_record_typmod;
/*
 * Used by the receiver.
 *
 * All the remap information.
 */
TupleRemapper *remapper;
} MotionConn;
/*
 * Connection for the UDP (UDPIFC) interconnect.
 *
 * Extends MotionConn, which is embedded as the first member "mConn". A
 * MotionConn pointer obtained from a ChunkTransportStateEntry can be turned
 * back into this struct with CONTAINER_OF(conn, MotionConnUDP, mConn).
 */
typedef struct MotionConnUDP
{
struct MotionConn mConn;
/* send-side queue of packets waiting to be sent */
ICBufferList sndQueue;
/* NOTE(review): capacity of this connection's buffering; exact meaning not visible here. */
int capacity;
/* sequence number already sent */
uint32 sentSeq;
/* the ack of this seq means packets with this seq or smaller have been received */
uint32 receivedAckSeq;
/* packets with this seq or smaller seqs have been consumed */
uint32 consumedSeq;
/* NOTE(review): presumably round-trip-time estimate and its deviation; units not visible here. */
uint64 rtt;
uint64 dev;
/* NOTE(review): presumably the time deadlock checking started; confirm units. */
uint64 deadlockCheckBeginTime;
/* NOTE(review): buffer currently being worked on; confirm send vs. receive use. */
ICBuffer *curBuff;
/*
 * Send-side queue of unacknowledged packets. Because it is often accessed
 * together with the unack queue ring, it is protected by the unack queue
 * ring lock.
 */
ICBufferList unackQueue;
/* NOTE(review): route identifier of this connection; confirm meaning. */
uint16 route;
/* NOTE(review): presumably the packet header describing this connection. */
struct icpkthdr conn_info;
struct sockaddr_storage peer; /* peer address; allows for IPv4 or IPv6 */
socklen_t peer_len; /* the actual length of the address in peer */
/*
 * A queue of maximum length Gp_interconnect_queue_depth; pkt_q_head and
 * pkt_q_tail are positions within pkt_q.
 */
int pkt_q_capacity; /* max capacity of the queue */
int pkt_q_size; /* number of packets in the queue */
int pkt_q_head; /* position of the queue head */
int pkt_q_tail; /* position of the queue tail */
uint8 **pkt_q; /* the queued packets */
/* per-connection statistics: ack timing, resends and drops */
uint64 stat_total_ack_time;
uint64 stat_count_acks;
uint64 stat_max_ack_time;
uint64 stat_min_ack_time;
uint64 stat_count_resent;
uint64 stat_max_resent;
uint64 stat_count_dropped;
} MotionConnUDP;
/*
 * Connection for the TCP and proxy interconnects.
 *
 * Extends MotionConn, which is embedded as the first member "mConn". A
 * MotionConn pointer obtained from a ChunkTransportStateEntry can be turned
 * back into this struct with CONTAINER_OF(conn, MotionConnTCP, mConn).
 */
typedef struct MotionConnTCP
{
struct MotionConn mConn;
/* NOTE(review): a wake-up time in milliseconds; confirm semantics. */
uint64 wakeup_ms;
/* local host and port as text (counterpart of MotionConn.remoteHostAndPort) */
char localHostAndPort[128];
} MotionConnTCP;
/*
 * Used to organize all of the information for a given motion node.
 *
 * This is a base struct: ChunkTransportStateEntryTCP and
 * ChunkTransportStateEntryUDP embed it as their first member "entry", and
 * ChunkTransportState->states is an array of one of those larger structs.
 * Use getChunkTransportState/getChunkTransportStateNoValid to look entries up.
 */
typedef struct ChunkTransportStateEntry
{
/* id of the motion node this entry belongs to (stored at states[motNodeId - 1]) */
int motNodeId;
/* whether this entry is set up; getChunkTransportState rejects entries that are not */
bool valid;
/*
 * Connection array.
 *
 * MUST pay attention: use getMotionConn to get a MotionConn; never use
 * `->conns[index]`. MotionConn is the base structure of MotionConnTCP and
 * MotionConnUDP. After connection setup, `conns` points to an array of
 * MotionConnTCP/MotionConnUDP (see allocMotionConns), but the pointer is
 * still typed MotionConn *, so direct indexing would use the wrong element
 * size. Use CONTAINER_OF to get from a MotionConn to the real object.
 */
MotionConn *conns;
/* number of elements in conns */
int numConns;
/* NOTE(review): presumably the index in conns where the next scan starts; confirm. */
int scanStart;
/*
 * Used for receiving: the set of interesting MotionConns passed to select()
 * to see when data is ready to be read. Read interest is turned on when the
 * incoming connections are established, and turned off when an EOS (End of
 * Stream) message is read.
 */
mpp_fd_set readSet;
/* slice table entries */
struct ExecSlice *sendSlice;
struct ExecSlice *recvSlice;
} ChunkTransportStateEntry;
/*
 * Per-motion-node state for the TCP and proxy interconnects.
 * Extends ChunkTransportStateEntry, which is embedded as the first member
 * "entry".
 */
typedef struct ChunkTransportStateEntryTCP
{
ChunkTransportStateEntry entry;
/* highest file descriptor in entry.readSet. */
int highReadSock;
} ChunkTransportStateEntryTCP;
/*
 * Per-motion-node state for the UDP (UDPIFC) interconnect.
 * Extends ChunkTransportStateEntry, which is embedded as the first member
 * "entry".
 */
typedef struct ChunkTransportStateEntryUDP
{
ChunkTransportStateEntry entry;
/*
 * Setup info.
 * NOTE(review): presumably the transmit socket, its address family and its
 * port; confirm against the UDP implementation.
 */
int txfd;
int txfd_family;
unsigned short txport;
/* NOTE(review): presumably true while EOS is being sent for this motion; confirm. */
bool sendingEos;
/* Statistics info for this motion at the interconnect level */
uint64 stat_total_ack_time;
uint64 stat_count_acks;
uint64 stat_max_ack_time;
uint64 stat_min_ack_time;
uint64 stat_count_resent;
uint64 stat_max_resent;
uint64 stat_count_dropped;
} ChunkTransportStateEntryUDP;
/*
 * GetMotionConn
 *		Store into *(cconn) a pointer to the MotionConn base member ("mConn")
 *		of element "offset" of pEntry->conns, treating conns as an array of
 *		MotionConnType (MotionConnTCP or MotionConnUDP).
 *
 * Raises ERROR (ERRCODE_GP_INTERCONNECTION_ERROR) when offset is outside
 * [0, pEntry->numConns).
 *
 * The expansion is a single braced compound statement. It is not wrapped in
 * do/while because callers invoke it without a trailing semicolon. Every
 * argument is parenthesized. pEntry and offset may be evaluated more than
 * once, so do not pass expressions with side effects.
 */
#define GetMotionConn(pEntry, MotionConnType, offset, cconn) \
{ \
	Assert((pEntry) != NULL); \
	if ((offset) >= 0 && (offset) < (pEntry)->numConns) { \
		*(cconn) = &((MotionConnType *) (pEntry)->conns)[(offset)].mConn; \
	} else { \
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), \
				errmsg("Interconnect Error: Unexpected Motion conn offset: %ld (size %d). This means" \
						" a motion conn that wasn't setup is requesting interconnect" \
						" resources.", (long) (offset), (pEntry)->numConns))); \
	} \
}
/*
 * getMotionConn
 *		Store into *(cconn) a pointer to the MotionConn at index "offset" of
 *		pEntry->conns. The element type is chosen from the current
 *		interconnect: MotionConnTCP for TCP and proxy, MotionConnUDP for
 *		UDPIFC.
 *
 * Always use this instead of indexing pEntry->conns directly, because conns
 * really points to an array of the larger transport-specific structs.
 * Raises ERROR when offset is out of range (see GetMotionConn).
 */
#define getMotionConn(pEntry, offset, cconn) \
	do { \
		if (CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_TCP || \
			CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_PROXY) { \
			GetMotionConn(pEntry, MotionConnTCP, offset, cconn); \
		} else if (CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_UDPIFC) { \
			GetMotionConn(pEntry, MotionConnUDP, offset, cconn); \
		} else { \
			Assert(false); \
		} \
		/* parenthesized so that any pointer expression can be passed */ \
		Assert(*(cconn)); \
	} while (0)
/*
 * allocMotionConns
 *		Allocate pEntry->conns as a zeroed array of pEntry->numConns
 *		transport-specific connection structs: MotionConnUDP for UDPIFC,
 *		MotionConnTCP for TCP and proxy.
 *
 * Any other interconnect type trips an assertion. In builds without
 * assertions, conns is then left unchanged.
 */
#define allocMotionConns(pEntry) \
	do { \
		Assert((pEntry) != NULL); \
		if (CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_UDPIFC) \
		{ \
			(pEntry)->conns = palloc0((pEntry)->numConns * sizeof(MotionConnUDP)); \
		} \
		else if (CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_TCP || \
				 CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_PROXY) \
		{ \
			(pEntry)->conns = palloc0((pEntry)->numConns * sizeof(MotionConnTCP)); \
		} \
		else \
		{ \
			Assert(false); \
		} \
	} while (0)
/*
 * GetChunkTransportState
 *		Store into *(ppEntry) a pointer to the ChunkTransportStateEntry of
 *		motion node motNodeID. transportState->states is treated as an array
 *		of ChunkTransportStateEntryType (ChunkTransportStateEntryTCP or
 *		ChunkTransportStateEntryUDP), where node N lives at index N - 1.
 *
 * Always go through getChunkTransportState/getChunkTransportStateNoValid to
 * fill in a ChunkTransportStateEntry pointer. Do not index
 * ChunkTransportState->states directly, because it is an array of the
 * transport-specific entry structs, not of ChunkTransportStateEntry.
 *
 * Raises ERROR (ERRCODE_GP_INTERCONNECTION_ERROR) unless all of these hold:
 * 1 <= motNodeID <= transportState->size, the entry records motNodeID, and
 * the entry is valid. A NULL transportState is also reported through that
 * ERROR (with size 0) instead of being dereferenced, since Assert is a no-op
 * in production builds.
 *
 * The expansion is a single braced compound statement, so the wrapper
 * macros can invoke it without a trailing semicolon. Arguments may be
 * evaluated more than once.
 */
#define GetChunkTransportState(transportState, ChunkTransportStateEntryType, motNodeID, ppEntry) \
{ \
	Assert((transportState) != NULL); \
	if ((motNodeID) > 0 && \
		(transportState) != NULL && \
		(motNodeID) <= (transportState)->size && \
		((ChunkTransportStateEntryType *) (transportState)->states)[(motNodeID) - 1].entry.motNodeId == (motNodeID) && \
		((ChunkTransportStateEntryType *) (transportState)->states)[(motNodeID) - 1].entry.valid) \
	{ \
		*(ppEntry) = &((ChunkTransportStateEntryType *) (transportState)->states)[(motNodeID) - 1].entry; \
	} \
	else \
	{ \
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), \
				errmsg("Interconnect Error: Unexpected Motion Node Id: %d (size %d). This means" \
						" a motion node that wasn't setup is requesting interconnect" \
						" resources.", (motNodeID), \
						(transportState) != NULL ? (transportState)->size : 0))); \
		/* not reached */ \
	} \
}
/*
 * getChunkTransportState
 *		Look up the valid ChunkTransportStateEntry of motion node motNodeID
 *		and store a pointer to it in *(ppEntry). The entry struct type is
 *		picked from the current interconnect: ChunkTransportStateEntryUDP for
 *		UDPIFC, ChunkTransportStateEntryTCP for TCP and proxy.
 *
 * Raises ERROR for an unknown or invalid motion node (see
 * GetChunkTransportState).
 */
#define getChunkTransportState(transportState, motNodeID, ppEntry) \
	do { \
		if (CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_UDPIFC) \
		{ \
			GetChunkTransportState(transportState, ChunkTransportStateEntryUDP, motNodeID, ppEntry); \
		} \
		else if (CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_TCP || \
				 CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_PROXY) \
		{ \
			GetChunkTransportState(transportState, ChunkTransportStateEntryTCP, motNodeID, ppEntry); \
		} \
		else \
		{ \
			Assert(false); \
		} \
	} while (0)
/*
 * GetChunkTransportStateNoValid
 *		Like GetChunkTransportState, but only range-checks motNodeID. It does
 *		not require the entry to be valid or to record motNodeID, which makes
 *		it usable for entries that are not set up yet.
 *
 * Raises ERROR (ERRCODE_GP_INTERCONNECTION_ERROR) unless
 * 1 <= motNodeID <= transportState->size. A NULL transportState is also
 * reported through that ERROR (with size 0) instead of being dereferenced,
 * since Assert is a no-op in production builds.
 *
 * The expansion is a single braced compound statement, so the wrapper macro
 * can invoke it without a trailing semicolon. Arguments may be evaluated
 * more than once.
 */
#define GetChunkTransportStateNoValid(transportState, ChunkTransportStateEntryType, motNodeID, ppEntry) \
{ \
	Assert((transportState) != NULL); \
	if ((motNodeID) > 0 && \
		(transportState) != NULL && \
		(motNodeID) <= (transportState)->size) \
	{ \
		*(ppEntry) = &((ChunkTransportStateEntryType *) (transportState)->states)[(motNodeID) - 1].entry; \
	} \
	else \
	{ \
		ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), \
				errmsg("Interconnect Error: Unexpected Motion Node Id: %d (size %d). This means" \
						" a motion node that wasn't setup is requesting interconnect" \
						" resources.", (motNodeID), \
						(transportState) != NULL ? (transportState)->size : 0))); \
		/* not reached */ \
	} \
}
/*
 * getChunkTransportStateNoValid
 *		Look up the ChunkTransportStateEntry of motion node motNodeID without
 *		checking that it is valid, and store a pointer to it in *(ppEntry).
 *		The entry struct type is picked from the current interconnect:
 *		ChunkTransportStateEntryUDP for UDPIFC, ChunkTransportStateEntryTCP
 *		for TCP and proxy.
 *
 * Raises ERROR only when motNodeID is out of range (see
 * GetChunkTransportStateNoValid).
 */
#define getChunkTransportStateNoValid(transportState, motNodeID, ppEntry) \
	do { \
		if (CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_UDPIFC) \
		{ \
			GetChunkTransportStateNoValid(transportState, ChunkTransportStateEntryUDP, motNodeID, ppEntry); \
		} \
		else if (CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_TCP || \
				 CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_PROXY) \
		{ \
			GetChunkTransportStateNoValid(transportState, ChunkTransportStateEntryTCP, motNodeID, ppEntry); \
		} \
		else \
		{ \
			Assert(false); \
		} \
	} while (0)
#endif // INTER_CONNECT_INTERNAL_H