blob: 81b671bd6adde41f4e9a0b5c3ef83d0a49a7ce8d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* cdbconn.c
*
* SegmentDatabaseDescriptor methods
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "gp-libpq-fe.h"
#include "gp-libpq-int.h"
#include "miscadmin.h"
#include "utils/memutils.h"
#include "libpq/libpq-be.h"
extern int pq_flush(void);
extern int pq_putmessage(char msgtype, const char *s, size_t len);
#include "cdb/cdbconn.h" /* me */
#include "cdb/cdbutil.h" /* CdbComponentDatabaseInfo */
#include "cdb/cdbvars.h"
int gp_segment_connect_timeout = 180;
static void MPPnoticeReceiver(void * arg, const PGresult * res)
{
PQExpBufferData msgbuf;
PGMessageField *pfield;
int elevel = INFO;
char * sqlstate = "00000";
char * severity = "WARNING";
char * file = "";
char * line = NULL;
char * func = "";
char message[1024];
char * detail = NULL;
char * hint = NULL;
char * context = NULL;
SegmentDatabaseDescriptor *segdbDesc = (SegmentDatabaseDescriptor *) arg;
if (!res)
return;
strcpy(message,"missing error text");
for (pfield = res->errFields; pfield != NULL; pfield = pfield->next)
{
switch (pfield->code)
{
case PG_DIAG_SEVERITY:
severity = pfield->contents;
if (strcmp(pfield->contents,"WARNING")==0)
elevel = WARNING;
else if (strcmp(pfield->contents,"NOTICE")==0)
elevel = NOTICE;
else if (strcmp(pfield->contents,"DEBUG1")==0 ||
strcmp(pfield->contents,"DEBUG")==0)
elevel = DEBUG1;
else if (strcmp(pfield->contents,"DEBUG2")==0)
elevel = DEBUG2;
else if (strcmp(pfield->contents,"DEBUG3")==0)
elevel = DEBUG3;
else if (strcmp(pfield->contents,"DEBUG4")==0)
elevel = DEBUG4;
else if (strcmp(pfield->contents,"DEBUG5")==0)
elevel = DEBUG5;
else
elevel = INFO;
break;
case PG_DIAG_SQLSTATE:
sqlstate = pfield->contents;
break;
case PG_DIAG_MESSAGE_PRIMARY:
strncpy(message, pfield->contents, 800);
message[800] = '\0';
if (segdbDesc && segdbDesc->whoami && strlen(segdbDesc->whoami) < 200)
{
strcat(message," (");
strcat(message, segdbDesc->whoami);
strcat(message,")");
}
break;
case PG_DIAG_MESSAGE_DETAIL:
detail = pfield->contents;
break;
case PG_DIAG_MESSAGE_HINT:
hint = pfield->contents;
break;
case PG_DIAG_STATEMENT_POSITION:
case PG_DIAG_INTERNAL_POSITION:
case PG_DIAG_INTERNAL_QUERY:
break;
case PG_DIAG_CONTEXT:
context = pfield->contents;
break;
case PG_DIAG_SOURCE_FILE:
file = pfield->contents;
break;
case PG_DIAG_SOURCE_LINE:
line = pfield->contents;
break;
case PG_DIAG_SOURCE_FUNCTION:
func = pfield->contents;
break;
case PG_DIAG_GP_PROCESS_TAG:
break;
default:
break;
}
}
if (elevel < client_min_messages && elevel != INFO)
return;
/*
* We use PQExpBufferData instead of StringInfoData
* because the former uses malloc, the latter palloc.
* We are in a thread, and we CANNOT use palloc since it's not
* thread safe. We cannot call elog or ereport either for the
* same reason.
*/
initPQExpBuffer(&msgbuf);
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
{
/* New style with separate fields */
appendPQExpBufferChar(&msgbuf, PG_DIAG_SEVERITY);
appendBinaryPQExpBuffer(&msgbuf, severity, strlen(severity)+1);
appendPQExpBufferChar(&msgbuf, PG_DIAG_SQLSTATE);
appendBinaryPQExpBuffer(&msgbuf, sqlstate, strlen(sqlstate)+1);
/* M field is required per protocol, so always send something */
appendPQExpBufferChar(&msgbuf, PG_DIAG_MESSAGE_PRIMARY);
appendBinaryPQExpBuffer(&msgbuf, message , strlen(message) + 1);
if (detail)
{
appendPQExpBufferChar(&msgbuf, PG_DIAG_MESSAGE_DETAIL);
appendBinaryPQExpBuffer(&msgbuf, detail, strlen(detail)+1);
}
if (hint)
{
appendPQExpBufferChar(&msgbuf, PG_DIAG_MESSAGE_HINT);
appendBinaryPQExpBuffer(&msgbuf, hint, strlen(hint)+1);
}
if (context)
{
appendPQExpBufferChar(&msgbuf, PG_DIAG_CONTEXT);
appendBinaryPQExpBuffer(&msgbuf, context, strlen(context)+1);
}
/*
if (edata->cursorpos > 0)
{
snprintf(tbuf, sizeof(tbuf), "%d", edata->cursorpos);
appendPQExpBufferChar(&msgbuf, PG_DIAG_STATEMENT_POSITION);
appendBinaryPQExpBuffer(&msgbuf, tbuf);
}
if (edata->internalpos > 0)
{
snprintf(tbuf, sizeof(tbuf), "%d", edata->internalpos);
appendPQExpBufferChar(&msgbuf, PG_DIAG_INTERNAL_POSITION);
appendBinaryPQExpBuffer(&msgbuf, tbuf);
}
if (edata->internalquery)
{
appendPQExpBufferChar(&msgbuf, PG_DIAG_INTERNAL_QUERY);
appendBinaryPQExpBuffer(&msgbuf, edata->internalquery);
}
*/
if (file)
{
appendPQExpBufferChar(&msgbuf, PG_DIAG_SOURCE_FILE);
appendBinaryPQExpBuffer(&msgbuf, file, strlen(file)+1);
}
if (line)
{
appendPQExpBufferChar(&msgbuf, PG_DIAG_SOURCE_LINE);
appendBinaryPQExpBuffer(&msgbuf, line, strlen(line)+1);
}
if (func)
{
appendPQExpBufferChar(&msgbuf, PG_DIAG_SOURCE_FUNCTION);
appendBinaryPQExpBuffer(&msgbuf, func, strlen(func)+1);
}
}
else
{
appendPQExpBuffer(&msgbuf, "%s: ", severity);
appendBinaryPQExpBuffer(&msgbuf, message, strlen(message));
appendPQExpBufferChar(&msgbuf, '\n');
appendPQExpBufferChar(&msgbuf, '\0');
}
appendPQExpBufferChar(&msgbuf, '\0'); /* terminator */
pq_putmessage('N', msgbuf.data, msgbuf.len);
termPQExpBuffer(&msgbuf);
pq_flush();
}
/* Initialize a QE connection descriptor in storage provided by the caller. */
void
cdbconn_initSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc,
Segment *segment)
{
MemSet(segdbDesc, 0, sizeof(*segdbDesc));
/* Segment db info */
segdbDesc->segment = segment;
/* Connection info */
segdbDesc->conn = NULL;
segdbDesc->motionListener = 0;
segdbDesc->whoami = NULL;
/* Connection error info */
segdbDesc->errcode = 0;
initPQExpBuffer(&segdbDesc->error_message);
} /* cdbconn_initSegmentDescriptor */
/* Free all memory owned by this segment descriptor. */
void
cdbconn_termSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc)
{
/* Free the error message buffer. */
segdbDesc->errcode = 0;
termPQExpBuffer(&segdbDesc->error_message);
/* Free connection info. */
if (segdbDesc->whoami)
{
free(segdbDesc->whoami);
segdbDesc->whoami = NULL;
}
} /* cdbconn_termSegmentDescriptor */
/* Connect to a QE as a client via libpq. */
bool /* returns true if connected */
cdbconn_doConnect(SegmentDatabaseDescriptor *segdbDesc,
const char *connection_string)
{
/*
* Call libpq to connect
*/
segdbDesc->conn = PQconnectdb(connection_string);
/* Build whoami string to identify the QE for use in messages. */
if(!cdbconn_setSliceIndex(segdbDesc, -1))
{
if (!segdbDesc->errcode)
segdbDesc->errcode = ERRCODE_GP_INTERCONNECTION_ERROR;
/* Don't use elog, it's not thread-safe */
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("%s\n", segdbDesc->error_message.data);
PQfinish(segdbDesc->conn);
segdbDesc->conn = NULL;
}
/*
* Check for connection failure.
*/
if (PQstatus(segdbDesc->conn) == CONNECTION_BAD)
{
if (!segdbDesc->errcode)
segdbDesc->errcode = ERRCODE_GP_INTERCONNECTION_ERROR;
write_log("Master unable to connect to %s : %s\nConnection option: %s",
segdbDesc->whoami,
PQerrorMessage(segdbDesc->conn),
connection_string);
appendPQExpBuffer(&segdbDesc->error_message,
"Master unable to connect to %s: %s\n",
segdbDesc->whoami,
PQerrorMessage(segdbDesc->conn));
/* Don't use elog, it's not thread-safe */
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("%s\n", segdbDesc->error_message.data);
PQfinish(segdbDesc->conn);
segdbDesc->conn = NULL;
}
/*
* Successfully connected.
*/
else
{
PQsetNoticeReceiver(segdbDesc->conn, &MPPnoticeReceiver, segdbDesc);
/* Command the QE to initialize its motion layer.
* Wait for it to respond giving us the TCP port number
* where it listens for connections from the gang below.
*/
segdbDesc->motionListener = PQgetQEdetail(segdbDesc->conn, false);
segdbDesc->backendPid = PQbackendPID(segdbDesc->conn);
/* Don't use elog, it's not thread-safe */
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("Connected to %s motionListener=%d/%d with options %s\n",
segdbDesc->whoami,
(segdbDesc->motionListener & 0x0ffff),
((segdbDesc->motionListener>>16) & 0x0ffff),
connection_string);
}
return segdbDesc->conn != NULL;
} /* cdbconn_doConnect */
/* Build text to identify this QE in error messages. */
bool
cdbconn_setSliceIndex(SegmentDatabaseDescriptor *segdbDesc,
int sliceIndex)
{
Segment *segment = segdbDesc->segment;
PQExpBuffer scratchbuf = &segdbDesc->error_message;
int scratchoff = scratchbuf->len;
Assert(scratchbuf->len < 300000);
/* Format the connection info. */
appendPQExpBuffer(scratchbuf, "seg%d %s:%d",
segment->segindex, segment->hostname, segment->port);
/* If connected, format the QE's process id. */
if (segdbDesc->conn)
{
int pid = PQbackendPID(segdbDesc->conn);
if (pid)
appendPQExpBuffer(scratchbuf, " pid=%d", pid);
}
/* Store updated whoami text. */
if (segdbDesc->whoami != NULL)
free(segdbDesc->whoami);
segdbDesc->whoami = strdup(scratchbuf->data + scratchoff);
if(!segdbDesc->whoami)
{
appendPQExpBuffer(scratchbuf, " Error: Out of Memory");
return false;
}
/* Give back our scratch space at tail of error_message buffer. */
truncatePQExpBuffer(scratchbuf, scratchoff);
return true;
} /* cdbconn_setSliceIndex */