blob: d629eb096985d043a6030f55c8c4fa88c5aab6a1 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* ic_common.c
* Interconnect code shared between UDP, and TCP IPC Layers.
*
* IDENTIFICATION
* contrib/interconnect/ic_common.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "ic_common.h"
#include "ic_modules.h"
#include "common/ip.h"
#include "nodes/execnodes.h" /* ExecSlice, SliceTable */
#include "miscadmin.h"
#include "libpq/libpq-be.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "cdb/ml_ipc.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbdisp.h"
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <netinet/in.h>
/*
 * Head of the doubly linked list of currently allocated interconnect handles.
 * allocate_interconnect_handle() pushes new handles at the head and
 * destroy_interconnect_handle() unlinks them again.
 */
static interconnect_handle_t * open_interconnect_handles;
/*
 * Set to true once interconnect_abort_callback() has been registered through
 * RegisterResourceReleaseCallback(), so that it is registered only once.
 */
static bool interconnect_resowner_callback_registered;
/*=========================================================================
* VISIBLE FUNCTIONS
*/
/*
 * SendTupleChunkToAMS
 *		Send a chain of tuple chunks for a motion node (see also ml_ipc.h).
 *
 * transportStates - interconnect state; must be non-NULL and activated,
 *					 otherwise a FATAL error is raised.
 * motNodeID	   - motion node whose transport entry is used.
 * targetRoute	   - BROADCAST_SEGIDX to hand every chunk to doBroadcast(),
 *					 otherwise the index of the single connection to send to;
 *					 an index outside [0, numConns) raises FATAL.
 * tcItem		   - head of the chunk chain (linked through p_next); every
 *					 item in the chain is processed.
 *
 * Returns true when nothing requested a re-count of the active connections.
 * If a re-count was requested (a point-to-point connection stopped being
 * active during the send, or doBroadcast() set the flag), returns whether at
 * least one connection of the entry is still active.
 */
bool
SendTupleChunkToAMS(ChunkTransportState * transportStates,
					int16 motNodeID,
					int16 targetRoute,
					TupleChunkListItem tcItem)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn = NULL;
	TupleChunkListItem currItem;
	int			needRecount = 0;
	int			route;

	if (!transportStates)
		elog(FATAL, "SendTupleChunkToAMS: no transport-states.");
	if (!transportStates->activated)
		elog(FATAL, "SendTupleChunkToAMS: transport states inactive");

	/* give pending interrupts a chance before doing any work */
	ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG3, "sendtuplechunktoams: calling get_transport_state"
		 "w/transportStates %p transportState->size %d motnodeid %d route %d",
		 transportStates, transportStates->size, motNodeID, targetRoute);
#endif

	getChunkTransportState(transportStates, motNodeID, &pEntry);

	/* tcItem may be a whole chain of chunks; walk it and send each one */
	currItem = tcItem;
	while (currItem != NULL)
	{
#ifdef AMS_VERBOSE_LOGGING
		elog(DEBUG5, "SendTupleChunkToAMS: chunk length %d", currItem->chunk_length);
#endif

		if (targetRoute == BROADCAST_SEGIDX)
		{
			doBroadcast(transportStates, pEntry, currItem, &needRecount);
		}
		else
		{
			if (targetRoute < 0 || targetRoute >= pEntry->numConns)
			{
				elog(FATAL, "SendTupleChunkToAMS: targetRoute is %d, must be between 0 and %d .",
					 targetRoute, pEntry->numConns);
			}

			/* point-to-point message to a single connection */
			getMotionConn(pEntry, targetRoute, &conn);

			/* skip connections that are no longer interested */
			if (conn->stillActive)
			{
				CurrentMotionIPCLayer->SendChunk(transportStates, pEntry, conn, currItem, motNodeID);

				/* the send itself may have deactivated the connection */
				if (!conn->stillActive)
					needRecount = 1;
			}
		}

		currItem = currItem->p_next;
	}

	if (needRecount == 0)
		return true;

	/* re-count: we are only done when no connection is active any more */
	for (route = 0; route < pEntry->numConns; route++)
	{
		getMotionConn(pEntry, route, &conn);
		if (conn->stillActive)
			return true;
	}

	return false;
}
/*
 * GetTransportDirectBuffer
 *		Fetch a direct pointer into the transmit buffer of one connection,
 *		together with the number of bytes that may be written there.
 *
 * The lookup works much like SendTupleChunkToAMS(), but broadcast is not
 * supported: passing BROADCAST_SEGIDX, a NULL transportStates or an inactive
 * transportStates raises FATAL.
 *
 * On return b->pri points just past the data already queued in the
 * connection's buffer (pBuff + msgSize) and b->prilen is what is left up to
 * Gp_max_packet_size.  If the connection is no longer active, b->pri is NULL
 * and b->prilen is 0.
 */
void
GetTransportDirectBuffer(ChunkTransportState * transportStates,
						 int16 motNodeID,
						 int16 targetRoute,
						 struct directTransportBuffer *b)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn = NULL;

	if (!transportStates)
		elog(FATAL, "GetTransportDirectBuffer: no transport states");
	if (!transportStates->activated)
		elog(FATAL, "GetTransportDirectBuffer: inactive transport states");
	if (targetRoute == BROADCAST_SEGIDX)
		elog(FATAL, "GetTransportDirectBuffer: can't direct-transport to broadcast");

	Assert(b != NULL);

	getChunkTransportState(transportStates, motNodeID, &pEntry);

	/* point-to-point only: pick the single target connection */
	getMotionConn(pEntry, targetRoute, &conn);

	if (conn->stillActive)
	{
		/* hand out the unused tail of the connection's buffer */
		b->pri = conn->pBuff + conn->msgSize;
		b->prilen = Gp_max_packet_size - conn->msgSize;
	}
	else
	{
		/* receiver is not interested any more: no buffer available */
		b->pri = NULL;
		b->prilen = 0;
	}
}
/*
 * PutTransportDirectBuffer
 *		Account for data that was written directly into a connection's
 *		transmit buffer.
 *
 * Counterpart of GetTransportDirectBuffer(): for an active connection the
 * queued message size grows by 'length' bytes and the tuple count is bumped
 * by one.  Nothing is changed for a connection that is no longer active.
 *
 * Broadcast is not supported: passing BROADCAST_SEGIDX, a NULL
 * transportStates or an inactive transportStates raises FATAL.
 */
void
PutTransportDirectBuffer(ChunkTransportState * transportStates,
						 int16 motNodeID,
						 int16 targetRoute, int length)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn = NULL;

	if (!transportStates)
		elog(FATAL, "PutTransportDirectBuffer: no transport states");
	if (!transportStates->activated)
		elog(FATAL, "PutTransportDirectBuffer: inactive transport states");
	if (targetRoute == BROADCAST_SEGIDX)
		elog(FATAL, "PutTransportDirectBuffer: can't direct-transport to broadcast");

	getChunkTransportState(transportStates, motNodeID, &pEntry);

	/* point-to-point only: pick the single target connection */
	getMotionConn(pEntry, targetRoute, &conn);

	/* nothing to record for a connection that lost interest */
	if (!conn->stillActive)
		return;

	conn->msgSize += length;
	conn->tupleCount++;
}
/*=========================================================================
* HELPER FUNCTIONS
*/
/*
 * createChunkTransportState
 *		Set up the transport-state entry for one motion node.
 *
 * The motion node ID is taken from sendSlice->sliceIndex.  The table in
 * transportStates->states is enlarged (and the new tail zeroed) when it has
 * fewer than motNodeID entries.
 *
 * PARAMETERS
 *
 *	 transportStates - interconnect state owning the entry table.
 *	 sendSlice		 - sending slice; its sliceIndex must be > 0.
 *	 recvSlice		 - receiving slice; its sliceIndex must be >= 0.
 *	 numConns		 - number of connections to allocate for the entry.
 *	 chunk_trans_state_entry_size - size in bytes of one table entry, used to
 *					   compute the table size when growing it.
 *
 * RETURNS
 *	 The entry, marked valid, with all of its connections initialized to an
 *	 idle state.  An ERROR is raised if the entry for this motion node is
 *	 already valid.
 */
ChunkTransportStateEntry *
createChunkTransportState(ChunkTransportState * transportStates,
						  ExecSlice * sendSlice,
						  ExecSlice * recvSlice,
						  int numConns,
						  size_t chunk_trans_state_entry_size)
{
	ChunkTransportStateEntry *pEntry = NULL;
	int			motNodeID;
	int			connIdx;

	Assert(recvSlice->sliceIndex >= 0);
	Assert(sendSlice->sliceIndex > 0);

	motNodeID = sendSlice->sliceIndex;

	if (motNodeID > transportStates->size)
	{
		/* grow the table so that it holds motNodeID entries */
		size_t		oldBytes = transportStates->size * chunk_trans_state_entry_size;
		size_t		addedBytes = (motNodeID - transportStates->size) * chunk_trans_state_entry_size;

		transportStates->states = repalloc(transportStates->states,
										   motNodeID * chunk_trans_state_entry_size);

		/* the freshly added entries start out all-zero (i.e. not valid) */
		MemSet(((uint8 *) transportStates->states) + oldBytes, 0, addedBytes);
		transportStates->size = motNodeID;
	}

	getChunkTransportStateNoValid(transportStates, motNodeID, &pEntry);

	if (pEntry->valid)
	{
		MotionConn *conn = NULL;

		/* report the first connection's socket to help diagnose the clash */
		getMotionConn(pEntry, 0, &conn);
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("interconnect error: A HTAB entry for motion node %d already exists",
						motNodeID),
				 errdetail("conns %p numConns %d first sock %d",
						   pEntry->conns, pEntry->numConns,
						   conn != NULL ? conn->sockfd : -2)));
	}

	pEntry->valid = true;
	pEntry->motNodeId = motNodeID;
	pEntry->numConns = numConns;
	pEntry->scanStart = 0;
	pEntry->sendSlice = sendSlice;
	pEntry->recvSlice = recvSlice;

	allocMotionConns(pEntry);

	/* put every connection of the entry into its initial, idle state */
	for (connIdx = 0; connIdx < pEntry->numConns; connIdx++)
	{
		MotionConn *conn = NULL;

		getMotionConn(pEntry, connIdx, &conn);

		conn->state = mcsNull;
		conn->sockfd = -1;
		conn->msgSize = 0;
		conn->tupleCount = 0;
		conn->stillActive = false;
		conn->stopRequested = false;
		conn->cdbProc = NULL;
		conn->remapper = NULL;
		conn->sent_record_typmod = 0;
	}

	return pEntry;
}
/*
 * removeChunkTransportState
 *		Invalidate the transport-state entry of a motion node.
 *
 * This should only be called after createChunkTransportState().
 *
 * PARAMETERS
 *
 *	 transportStates - interconnect state owning the entry table.
 *	 motNodeID		 - motion node ID whose entry is to be removed.
 *
 * RETURNS
 *	 The entry that was invalidated; its 'valid' flag is cleared and its
 *	 readSet is zeroed.  An ERROR is raised when motNodeID is larger than the
 *	 table size or when the entry is not currently valid.
 */
ChunkTransportStateEntry *
removeChunkTransportState(ChunkTransportState * transportStates,
						  int16 motNodeID)
{
	ChunkTransportStateEntry *pEntry = NULL;

	if (motNodeID > transportStates->size)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("interconnect error: Unexpected Motion Node Id: %d",
						motNodeID),
				 errdetail("During remove. (size %d)", transportStates->size)));
	}

	getChunkTransportStateNoValid(transportStates, motNodeID, &pEntry);

	if (!pEntry->valid)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 errmsg("interconnect error: Unexpected Motion Node Id: %d",
						motNodeID),
				 errdetail("During remove. State not valid")));
	}

	/* only reached for a valid entry: ereport(ERROR) does not return */
	pEntry->valid = false;

	MPP_FD_ZERO(&pEntry->readSet);

	return pEntry;
}
/*
 * checkForCancelFromQD
 *		Raise an interconnect ERROR if the dispatcher reports a cancel.
 *
 * Should be called only inside the dispatcher (Gp_role == GP_ROLE_DISPATCH);
 * pTransportStates and its estate must be set.
 */
void
checkForCancelFromQD(ChunkTransportState * pTransportStates)
{
	Assert(Gp_role == GP_ROLE_DISPATCH);
	Assert(pTransportStates);
	Assert(pTransportStates->estate);

	/* nothing to do unless the dispatcher state signals a cancel */
	if (!cdbdisp_checkForCancel(pTransportStates->estate->dispatcherState))
		return;

	ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
					errmsg(CDB_MOTION_LOST_CONTACT_STRING)));
	/* not reached */
}
/*
 * format_sockaddr
 *		Render a socket address as "host:port" into a caller-supplied buffer.
 *
 * The address is resolved numerically (NI_NUMERICHOST | NI_NUMERICSERV).
 * IPv6 addresses are written as "[host]:port" when HAVE_IPV6 is defined.
 * If the address cannot be converted, "?host?:?port?" is written instead.
 * At most 'len' bytes are written to 'buf'; 'buf' is returned.
 *
 * This function must be kept threadsafe: elog/ereport/palloc etc. are not
 * allowed within this function.
 */
char *
format_sockaddr(struct sockaddr_storage *sa, char *buf, size_t len)
{
	char		host[NI_MAXHOST];
	char		port[NI_MAXSERV];
	int			rc;

	rc = pg_getnameinfo_all(sa, sizeof(struct sockaddr_storage),
							host, sizeof(host),
							port, sizeof(port),
							NI_NUMERICHOST | NI_NUMERICSERV);

	if (rc != 0)
	{
		/* conversion failed: emit a recognizable placeholder */
		snprintf(buf, len, "?host?:?port?");
		return buf;
	}

#ifdef HAVE_IPV6
	/* bracket IPv6 literals so the port separator stays unambiguous */
	if (sa->ss_family == AF_INET6)
		snprintf(buf, len, "[%s]:%s", host, port);
	else
#endif
		snprintf(buf, len, "%s:%s", host, port);

	return buf;
}
/*
 * destroy_interconnect_handle
 *		Unlink an interconnect handle from the list of open handles and
 *		free it.
 *
 * When the last open handle is gone, InterconnectContext is reset.
 */
void
destroy_interconnect_handle(interconnect_handle_t * h)
{
	interconnect_handle_t *before = h->prev;
	interconnect_handle_t *after = h->next;

	h->interconnect_context = NULL;

	/* splice the handle out of the doubly linked list before freeing it */
	if (before != NULL)
		before->next = after;
	else
		open_interconnect_handles = after;

	if (after != NULL)
		after->prev = before;

	pfree(h);

	/* no handle left: release everything allocated in the context */
	if (open_interconnect_handles == NULL)
		MemoryContextReset(InterconnectContext);
}
/*
 * cleanup_interconnect_handle
 *		Release one interconnect handle.
 *
 * A handle without an interconnect context is simply destroyed.  Otherwise
 * the handle's teardown callback is invoked with the context and 'true' as
 * its second argument.
 */
static void
cleanup_interconnect_handle(interconnect_handle_t * h)
{
	if (h->interconnect_context != NULL)
		h->teardown_cb(h->interconnect_context, true);
	else
		destroy_interconnect_handle(h);
}
/*
 * interconnect_abort_callback
 *		Resource-release callback that cleans up interconnect handles owned
 *		by CurrentResourceOwner.
 *
 * Only acts in the RESOURCE_RELEASE_AFTER_LOCKS phase.  Every open handle
 * whose owner is CurrentResourceOwner is cleaned up; on commit a WARNING
 * about the leaked reference is emitted first.  isTopLevel and arg are
 * unused.
 */
static void
interconnect_abort_callback(ResourceReleasePhase phase,
							bool isCommit,
							bool isTopLevel,
							void *arg)
{
	interconnect_handle_t *curr;
	interconnect_handle_t *next;

	if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
		return;

	for (curr = open_interconnect_handles; curr != NULL; curr = next)
	{
		/* remember the successor first: cleanup may free 'curr' */
		next = curr->next;

		if (curr->owner != CurrentResourceOwner)
			continue;

		if (isCommit)
			elog(WARNING, "interconnect reference leak: %p still referenced", curr);

		cleanup_interconnect_handle(curr);
	}
}
/*
 * allocate_interconnect_handle
 *		Create a new interconnect handle and put it at the head of the list
 *		of open handles.
 *
 * InterconnectContext is created under TopMemoryContext on first use and the
 * zero-initialized handle is allocated in it.  The handle records 'callback'
 * as its teardown callback and CurrentResourceOwner as its owner.  The
 * resource-release callback is registered the first time a handle is
 * allocated.
 */
interconnect_handle_t *
allocate_interconnect_handle(TeardownInterconnectCallBack callback)
{
	interconnect_handle_t *handle;

	if (InterconnectContext == NULL)
	{
		InterconnectContext = AllocSetContextCreate(TopMemoryContext,
													"Interconnect Context",
													ALLOCSET_DEFAULT_MINSIZE,
													ALLOCSET_DEFAULT_INITSIZE,
													ALLOCSET_DEFAULT_MAXSIZE);
	}

	handle = MemoryContextAllocZero(InterconnectContext, sizeof(interconnect_handle_t));
	handle->teardown_cb = callback;
	handle->owner = CurrentResourceOwner;

	/* push onto the front of the doubly linked list */
	handle->next = open_interconnect_handles;
	handle->prev = NULL;
	if (open_interconnect_handles != NULL)
		open_interconnect_handles->prev = handle;
	open_interconnect_handles = handle;

	/* make sure leaked handles get cleaned up at resource release */
	if (!interconnect_resowner_callback_registered)
	{
		RegisterResourceReleaseCallback(interconnect_abort_callback, NULL);
		interconnect_resowner_callback_registered = true;
	}

	return handle;
}
/*
 * find_interconnect_handle
 *		Look up the open handle that refers to the given interconnect
 *		context.
 *
 * Returns the first handle in the list of open handles whose
 * interconnect_context equals icContext, or NULL if there is none.
 */
interconnect_handle_t *
find_interconnect_handle(ChunkTransportState * icContext)
{
	interconnect_handle_t *cursor;

	for (cursor = open_interconnect_handles; cursor != NULL; cursor = cursor->next)
	{
		if (cursor->interconnect_context == icContext)
			return cursor;
	}

	return NULL;
}
/*
 * GetMotionConnTupleRemapper
 *		Return the tuple remapper stored on the connection identified by
 *		motNodeID and targetRoute.
 */
TupleRemapper *
GetMotionConnTupleRemapper(ChunkTransportState * transportStates,
						   int16 motNodeID,
						   int16 targetRoute)
{
	MotionConn *conn = NULL;
	ChunkTransportStateEntry *pEntry = NULL;

	/* locate the motion node's entry, then the connection for the route */
	getChunkTransportState(transportStates, motNodeID, &pEntry);
	getMotionConn(pEntry, targetRoute, &conn);

	Assert(conn);

	return conn->remapper;
}
/*
 * GetMotionSentRecordTypmod
 *		Return a pointer to the sent_record_typmod field of the connection
 *		identified by motNodeID and targetRoute.
 *
 * For BROADCAST_SEGIDX the first connection (index 0) of the entry is used.
 *
 * The connection is located with getMotionConn(), exactly like every other
 * accessor in this file, rather than by indexing pEntry->conns directly.
 * NOTE(review): the connection array is allocated by allocMotionConns() and
 * its element layout appears to be interconnect-type specific, in which case
 * raw indexing would use the wrong stride -- confirm against the definition
 * of getMotionConn().
 */
int32 *
GetMotionSentRecordTypmod(ChunkTransportState * transportStates,
						  int16 motNodeID,
						  int16 targetRoute)
{
	ChunkTransportStateEntry *pEntry = NULL;
	MotionConn *conn = NULL;
	int			route;

	getChunkTransportState(transportStates, motNodeID, &pEntry);

	/* broadcast shares the value kept on the first connection */
	route = (targetRoute == BROADCAST_SEGIDX) ? 0 : targetRoute;

	getMotionConn(pEntry, route, &conn);
	Assert(conn);

	return &conn->sent_record_typmod;
}
/*
 * DirectPutRxBufferTCP
 *		Intentionally a no-op: the tcp/proxy implementations have nothing to
 *		do here.  All arguments are ignored.
 */
void
DirectPutRxBufferTCP(ChunkTransportState *transportStates, int motNodeID, int route)
{
	/* nothing to do for tcp/proxy */
}
/*
 * GetActiveMotionConnsTCP
 *		Always returns 0: the tcp/proxy implementations do nothing here.
 */
uint32
GetActiveMotionConnsTCP(void)
{
	const uint32 activeConns = 0;

	return activeConns;
}