blob: 986c4582a8f5b9c91b17c0a32193b26d3c400d05 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* executormgr.c
*
*
*/
#include "postgres.h"
#include "cdb/dispatcher.h"
#include "cdb/dispatcher_new.h"
#include "cdb/executormgr.h"
#include "cdb/poolmgr.h"
#include "cdb/cdbconn.h" /* SegmentDatabaseDescriptor */
#include "cdb/cdbgang.h" /* Gang */
#include "catalog/pg_authid.h" /* TODO:BOOTSTRAP_USER_ID remove! */
#include "cdb/cdbdisp.h" /* TODO: DispatchCommandQueryParms */
#include "cdb/cdbdispatchresult.h" /* TODO: CdbDispatchResult & cdbdisp_makeDispatchResults */
#include "cdb/cdbvars.h" /* TODO: gp_commond_count */
#include "miscadmin.h" /* TODO: MyDatabaseId */
#include "libpq/libpq-be.h" /* TODO: MyProcPort */
#include "commands/dbcommands.h" /* TODO: get_database_dts */
#include "utils/lsyscache.h" /* TODO: get_rolname */
#include "utils/guc_tables.h" /* TODO: manipulate gucs */
#include "portability/instr_time.h" /* Monitor the dispatcher performance */
#include "resourcemanager/dynrm.h"
/*
 * ExecutorMgrConstant
 *		Numeric constants private to the executor manager.
 */
typedef enum ExecutorMgrConstant
{
    /* Size in bytes of the buffer handed to PQcancel() for its error text. */
    EXECUTORMGR_CANCEL_ERROR_BUFFER_SIZE = 256
} ExecutorMgrConstant;
/*
 * QueryExecutorState
 *		Life-cycle position of a QueryExecutor.
 */
typedef enum QueryExecutorState
{
    QES_UNINIT = 0,         /* not bound to a task (or already unbound) */
    QES_DISPATCHABLE = 1,   /* bound to a task; a query may be sent */
    QES_RUNNING = 2,        /* query has been dispatched */
    QES_STOP = 3            /* no more data will be consumed */
} QueryExecutorState;
/*
 * QueryExecutorHealth
 *		Outcome classification of a QueryExecutor.
 */
typedef enum QueryExecutorHealth
{
    QEH_NA = 0,         /* nothing known yet */
    QEH_GOOD = 1,       /* query was dispatched successfully */
    QEH_CANCEL = 2,     /* stopped without an error message on the connection */
    QEH_ERROR = 3       /* stopped with an error message collected */
} QueryExecutorHealth;
/*
 * QueryExecutor
 *		Dispatcher-side handle that ties one segment connection descriptor
 *		to one dispatch task, together with its state, result slot and
 *		per-phase timing information.
 */
typedef struct QueryExecutor {
/* Executor State */
bool takeovered; /* Someone outside the dispatcher will release the executor. */
SegmentDatabaseDescriptor *desc; /* Segment connection descriptor; desc->conn is set to NULL once an error has been caught. */
struct Gang *refGang; /* NOTE(review): not written by the code visible here - confirm usage. */
QueryExecutorState state; /* Life-cycle position, see QueryExecutorState. */
QueryExecutorHealth health; /* Outcome classification, see QueryExecutorHealth. */
/* Error State */
struct CdbDispatchResult *refResult; /* TODO: libpq is ugly, should invent a new connection manager. */
/* Reference to dispatch data. */
struct DispatchSlice *refSlice;
struct DispatchTask *refTask;
/* Serialized identity state, produced by SerializeProcessIdentity() and sent along with the query. */
const char *identity_msg;
int identity_msg_len;
/* Begin/end timestamps for each phase; copied out by executormgr_get_statistics(). */
instr_time time_dispatch_begin;
instr_time time_dispatch_end;
instr_time time_connect_begin;
instr_time time_connect_end;
instr_time time_consume_begin;
instr_time time_consume_end;
instr_time time_free_begin;
instr_time time_free_end;
} QueryExecutor;
/*
 * ExecutorCache
 *		Bookkeeping for the pools of reusable segment connection descriptors.
 */
typedef struct ExecutorCache {
bool init; /* Set to true by executormgr_setup_env(). */
MemoryContext ctx; /* Context used for the pools and for long-lived descriptors. */
struct PoolMgrState *pool; /* Pool for non-master segment connections. */
struct PoolMgrState *entrydb_pool; // pool for entry db connection
int cached_num; /* Incremented when a descriptor is put into a pool, decremented when one is handed out. */
int allocated_num; /* Descriptors currently handed out and not yet freed or taken over. */
int takeover_num; /* Descriptors currently owned by someone outside the dispatcher. */
} ExecutorCache;
/* File-scope executor cache; zero-initialized until executormgr_setup_env() runs. */
static ExecutorCache executor_cache;
/* Pool key of a segment: the literal "master" for the master, its hostname otherwise. */
#define GetSegmentHashKey(segment) (((segment)->master) ? "master" : (segment)->hostname)
/* Forward declarations of file-local helpers used before their definitions. */
static void executormgr_catch_error(QueryExecutor *executor);
static void executormgr_destory(SegmentDatabaseDescriptor *desc);
static struct CdbDispatchResult *executormgr_get_executor_result(QueryExecutor *executor);
/*
 * executormgr_setup_env
 *		One-time initialisation of the executor cache.
 *
 * Creates the segment and entry-db pools in ctx, remembers ctx for later
 * long-lived allocations and fills the dispatcher-related fields of
 * MyProcPort. Calls made after the cache is initialised return immediately.
 */
void
executormgr_setup_env(MemoryContext ctx)
{
    MemoryContext saved_ctx;

    if (executor_cache.init)
        return;

    executor_cache.pool =
        poolmgr_create_pool(ctx, (PoolMgrCleanCallback) executormgr_destory);
    executor_cache.entrydb_pool =
        poolmgr_create_pool(ctx, (PoolMgrCleanCallback) executormgr_destory);
    executor_cache.ctx = ctx;
    executor_cache.init = true;

    /* TODO: Setup dispatcher information. But should remove in the future. */
    saved_ctx = MemoryContextSwitchTo(ctx);

    MyProcPort->dboid = MyDatabaseId;
    MyProcPort->dbdtsoid = get_database_dts(MyDatabaseId);
    MyProcPort->bootstrap_user = get_rolname(BOOTSTRAP_SUPERUSERID);
    MyProcPort->encoding = GetDatabaseEncoding();

    MemoryContextSwitchTo(saved_ctx);
}
void
executormgr_cleanup_env(void)
{
if (!executor_cache.init)
return;
if (executor_cache.allocated_num != 0)
elog(WARNING, "%d segments was allocated but not returned during cleanup executor manager.", executor_cache.allocated_num);
if (executor_cache.takeover_num != 0)
elog(WARNING, "%d segments was takeovered but not returned during cleanup executor manager.", executor_cache.takeover_num);
poolmgr_drop_pool(executor_cache.pool);
poolmgr_drop_pool(executor_cache.entrydb_pool);
}
/*
 * executormgr_count_executors_num
 *		Pool iteration callback: bump *count once per visited descriptor.
 */
static void
executormgr_count_executors_num(SegmentDatabaseDescriptor *desc, int *count)
{
    (void) desc;            /* the descriptor itself is not inspected */
    *count = *count + 1;
}
/*
 * executormgr_get_cached_executor_num
 *		Count the descriptors currently held in the segment pool by
 *		iterating over it.
 */
int
executormgr_get_cached_executor_num(void)
{
    int         total = 0;

    Assert(executor_cache.init);

    poolmgr_iterate(executor_cache.pool,
                    NULL,
                    (PoolMgrIterateCallback) executormgr_count_executors_num,
                    (PoolIterateArg) &total);

    return total;
}
/*
 * executormgr_get_cached_executor_num_onentrydb
 *		Count the descriptors currently held in the entry-db pool by
 *		iterating over it.
 */
int
executormgr_get_cached_executor_num_onentrydb(void)
{
    int         total = 0;

    Assert(executor_cache.init);

    poolmgr_iterate(executor_cache.entrydb_pool,
                    NULL,
                    (PoolMgrIterateCallback) executormgr_count_executors_num,
                    (PoolIterateArg) &total);

    return total;
}
QueryExecutor *
executormgr_create_executor(void)
{
QueryExecutor *executor = palloc0(sizeof(QueryExecutor));
INSTR_TIME_SET_ZERO(executor->time_dispatch_begin);
INSTR_TIME_SET_ZERO(executor->time_dispatch_end);
INSTR_TIME_SET_ZERO(executor->time_connect_begin);
INSTR_TIME_SET_ZERO(executor->time_connect_end);
INSTR_TIME_SET_ZERO(executor->time_consume_begin);
INSTR_TIME_SET_ZERO(executor->time_consume_end);
INSTR_TIME_SET_ZERO(executor->time_free_begin);
INSTR_TIME_SET_ZERO(executor->time_free_end);
return executor;
}
/*
 * executormgr_bind_executor_task
 *		Attach a segment descriptor, task and slice to an executor and
 *		obtain a result slot for it.
 *
 * On return the executor is in QES_DISPATCHABLE state with health QEH_NA
 * and is owned by the dispatcher (takeovered == false). Returns false when
 * no result slot could be created, true otherwise.
 */
bool
executormgr_bind_executor_task(struct DispatchData *data,
                               QueryExecutor *executor,
                               SegmentDatabaseDescriptor *desc,
                               struct DispatchTask *task,
                               struct DispatchSlice *slice)
{
    Assert(desc != NULL);

    /* connection and state */
    executor->desc = desc;
    executor->state = QES_DISPATCHABLE;
    executor->health = QEH_NA;

    /* the dispatcher owns the executor until somebody takes it over */
    executor->takeovered = false;

    /* payload */
    executor->refSlice = slice;
    executor->refTask = task;

    /* TODO: set result slot */
    executor->refResult = cdbdisp_makeResult(dispatch_get_results(data),
                                             executor->desc,
                                             dispatch_get_task_identity(task)->slice_id);
    if (executor->refResult != NULL)
    {
        /* Transfer any connection errors from segdbDesc. */
        executormgr_merge_error(executor);
        return true;
    }

    return false;
}
/*
 * executormgr_serialize_executor_state
 *		Serialize the identity of task and store the message and its length
 *		in the executor. data and slice are not used.
 */
void
executormgr_serialize_executor_state(struct DispatchData *data,
                                     QueryExecutor *executor,
                                     struct DispatchTask *task,
                                     struct DispatchSlice *slice)
{
    int        *len_out = &executor->identity_msg_len;

    executor->identity_msg =
        SerializeProcessIdentity(dispatch_get_task_identity(task), len_out);
}
/*
 * executormgr_unbind_executor_task
 *		Release the executor's segment descriptor and mark it QES_UNINIT.
 *
 * Nothing happens for an executor that is already QES_UNINIT. A descriptor
 * that was taken over is not freed here; its new owner releases it.
 * data, task and slice are not used.
 */
void
executormgr_unbind_executor_task(struct DispatchData *data,
                                 QueryExecutor *executor,
                                 struct DispatchTask *task,
                                 struct DispatchSlice *slice)
{
    if (executor->state != QES_UNINIT)
    {
        /* Return executors */
        TIMING_BEGIN(executor->time_free_begin);

        if (executor->takeovered == false)
            executormgr_free_executor(executor->desc);
        executor->state = QES_UNINIT;

        TIMING_END(executor->time_free_end);
    }
}
/*
 * executormgr_get_statistics
 *		Copy the executor's per-phase begin/end timestamps into the
 *		caller-supplied locations. Every output pointer is written.
 */
void
executormgr_get_statistics(QueryExecutor *executor,
                           instr_time *time_connect_begin,
                           instr_time *time_connect_end,
                           instr_time *time_dispatch_begin,
                           instr_time *time_dispatch_end,
                           instr_time *time_consume_begin,
                           instr_time *time_consume_end,
                           instr_time *time_free_begin,
                           instr_time *time_free_end)
{
    /* connect phase */
    INSTR_TIME_ASSIGN(*time_connect_begin, executor->time_connect_begin);
    INSTR_TIME_ASSIGN(*time_connect_end, executor->time_connect_end);

    /* dispatch phase */
    INSTR_TIME_ASSIGN(*time_dispatch_begin, executor->time_dispatch_begin);
    INSTR_TIME_ASSIGN(*time_dispatch_end, executor->time_dispatch_end);

    /* consume phase */
    INSTR_TIME_ASSIGN(*time_consume_begin, executor->time_consume_begin);
    INSTR_TIME_ASSIGN(*time_consume_end, executor->time_consume_end);

    /* free phase */
    INSTR_TIME_ASSIGN(*time_free_begin, executor->time_free_begin);
    INSTR_TIME_ASSIGN(*time_free_end, executor->time_free_end);
}
/*
 * executormgr_get_executor_connection_info
 *		Report the connection endpoints of an executor.
 *
 * address is optional; when non-NULL it receives a pstrdup'd copy of the
 * segment's host IP. port, myPort and pid are always written. The remote
 * port is taken from the upper 16 bits of motionListener for UDP
 * interconnect and from the lower 16 bits otherwise.
 */
void
executormgr_get_executor_connection_info(QueryExecutor *executor,
                                         char **address, int *port, int *myPort, int *pid)
{
    SegmentDatabaseDescriptor *segdb = executor->desc;

    if (address != NULL)
    {
        *address = pstrdup(segdb->segment->hostip);
    }

    if (Gp_interconnect_type == INTERCONNECT_TYPE_UDP)
    {
        *port = (segdb->motionListener >> 16) & 0x0ffff;
    }
    else
    {
        *port = (segdb->motionListener & 0x0ffff);
    }

    *myPort = (segdb->my_listener & 0x0ffff);
    *pid = segdb->backendPid;
}
/*
 * executormgr_get_executor_slice_id
 *		Return the slice id recorded in the identity of the bound task.
 */
int
executormgr_get_executor_slice_id(QueryExecutor *executor)
{
    struct DispatchTask *bound_task = executor->refTask;

    return dispatch_get_task_identity(bound_task)->slice_id;
}
/*
 * executormgr_is_stop
 *		True iff the executor has reached the QES_STOP state.
 */
bool
executormgr_is_stop(QueryExecutor *executor)
{
    const QueryExecutorState current = executor->state;

    return current == QES_STOP;
}
/*
 * executormgr_has_error
 *		True iff the executor's health is QEH_ERROR.
 */
bool
executormgr_has_error(QueryExecutor *executor)
{
    const QueryExecutorHealth current = executor->health;

    return current == QEH_ERROR;
}
/*
 * executormgr_get_executor_result
 *		Accessor for the result slot attached to the executor.
 */
static struct CdbDispatchResult *
executormgr_get_executor_result(QueryExecutor *executor)
{
    struct CdbDispatchResult *slot = executor->refResult;

    return slot;
}
/*
 * executormgr_get_segment_ID
 *		Return the ID of the segment the executor is connected to, or -1
 *		when that segment is the master or the standby.
 */
int
executormgr_get_segment_ID(QueryExecutor *executor)
{
    Segment    *target = executor->desc->segment;

    /* For segment only */
    return (target->master || target->standby) ? -1 : target->ID;
}
/*
 * executormgr_get_fd
 *		Return the socket descriptor of the executor's libpq connection,
 *		as reported by PQsocket().
 */
int
executormgr_get_fd(QueryExecutor *executor)
{
    PGconn     *conn = executor->desc->conn;

    return PQsocket(conn);
}
/*
 * executormgr_cancel
 *		Ask the segment to cancel the running query.
 *
 * Returns whether PQcancel() reported success. Regardless of that outcome
 * executormgr_catch_error() is invoked afterwards, which closes the
 * connection and moves the executor to QES_STOP.
 */
bool
executormgr_cancel(QueryExecutor *executor)
{
    char        errbuf[EXECUTORMGR_CANCEL_ERROR_BUFFER_SIZE];
    PGcancel   *cancel_handle = PQgetCancel(executor->desc->conn);
    bool        success;

    MemSet(errbuf, 0, sizeof(errbuf));
    success = (PQcancel(cancel_handle, errbuf, sizeof(errbuf)) != 0);
    if (!success)
        write_log("executormgr_cancel cancel failed, %s.", errbuf);

    write_log("function executormgr_cancel calling executormgr_catch_error");
    executormgr_catch_error(executor);

    PQfreeCancel(cancel_handle);
    return success;
}
/*
 * executormgr_validate_conn
 *		Check that the socket behind a libpq connection is still usable.
 *
 * Returns false for a NULL connection. When dispatch_validate_conn()
 * rejects the socket, an error message is stored in the connection, its
 * status becomes CONNECTION_BAD, the socket is closed and false is
 * returned. Otherwise returns true.
 */
static bool
executormgr_validate_conn(PGconn *conn)
{
    if (conn == NULL)
        return false;

    if (dispatch_validate_conn(conn->sock))
        return true;

    /* The peer is gone: record why and invalidate the connection. */
    printfPQExpBuffer(&conn->errorMessage,
                      libpq_gettext(
                          "server closed the connection unexpectedly\n"
                          "\tThis probably means the server terminated abnormally\n"
                          "\tbefore or while processing the request.\n"));
    conn->status = CONNECTION_BAD;
    closesocket(conn->sock);
    conn->sock = -1;

    return false;
}
/*
 * executormgr_is_dispatchable
 *		Return true iff the executor can receive a query.
 *
 * A busy connection gets one attempt to discard a pending result; if
 * nothing was discarded and it is still busy the answer is false. A
 * connection that fails validation or is in CONNECTION_BAD status is
 * reported through executormgr_catch_error() and the answer is false.
 */
static bool
executormgr_is_dispatchable(QueryExecutor *executor)
{
    PGconn     *conn = executor->desc->conn;
    bool        conn_ok;

    Assert(executor->state == QES_DISPATCHABLE);

    /* TODO: dead code? */
    if (PQisBusy(conn) && !executormgr_discard(executor) && PQisBusy(conn))
        return false;

    conn_ok = executormgr_validate_conn(conn) && PQstatus(conn) != CONNECTION_BAD;
    if (conn_ok)
        return true;

    write_log("function %s meets error, connection is bad.", __func__);
    executormgr_catch_error(executor);
    return false;
}
/*
 * executormgr_dispatch_and_run
 *		Build the serialized query for this executor and send it to the
 *		segment.
 *
 * Returns true once the query has been handed to libpq; the executor is
 * then QES_RUNNING / QEH_GOOD and its result slot is flagged as dispatched.
 * Returns false when the executor is not dispatchable or sending fails; in
 * that case executormgr_catch_error() has been called for the executor.
 *
 * The query buffer returned by PQbuildGpQueryString() is released with
 * free() on every path.
 */
bool
executormgr_dispatch_and_run(struct DispatchData *data, QueryExecutor *executor)
{
    PGconn     *conn = executor->desc->conn;
    char       *query = NULL;
    int         query_len;
    DispatchCommandQueryParms *parms = dispatcher_get_QueryParms(data);

    if (!executormgr_is_dispatchable(executor))
        goto error;

    TIMING_BEGIN(executor->time_dispatch_begin);
    query = PQbuildGpQueryString(parms->strCommand, parms->strCommandlen,
                                 parms->serializedQuerytree, parms->serializedQuerytreelen,
                                 parms->serializedPlantree, parms->serializedPlantreelen,
                                 parms->serializedParams, parms->serializedParamslen,
                                 parms->serializedSliceInfo, parms->serializedSliceInfolen,
                                 NULL, 0,
                                 executor->identity_msg, executor->identity_msg_len,
                                 parms->serializedQueryResource, parms->serializedQueryResourcelen,
                                 parms->serializedCommonPlan, parms->serializedCommonPlanLen,
                                 0,
                                 gp_command_count,
                                 executormgr_get_executor_slice_id(executor),
                                 parms->rootIdx,
                                 parms->seqServerHost, parms->seqServerHostlen, parms->seqServerPort,
                                 parms->primary_gang_id,
                                 GetCurrentStatementStartTimestamp(),
                                 GetSessionUserId(), /* For external tools who want this info on segments. */
                                 IsAuthenticatedUserSuperUser(),
                                 BOOTSTRAP_SUPERUSERID,
                                 true,
                                 BOOTSTRAP_SUPERUSERID,
                                 &query_len);

    if (PQsendGpQuery_shared(conn, query, query_len) == 0)
        goto error;

    if (Debug_print_execution_detail)
    {
        /*
         * The two literals are concatenated by the compiler, so the first
         * one must end with a space to keep the words apart.
         */
        write_log("The time after dispatching : %s to conn %s, isNonBlocking %d, number of chars waiting "
                  "in buffer %d", query, conn->gpqeid, PQisnonblocking(conn), conn->outCount);
        write_log("Size to dispatch: %.3f KB", (double)query_len / 1024);
    }
    TIMING_END(executor->time_dispatch_end);

    free(query);
    executor->state = QES_RUNNING;
    executor->health = QEH_GOOD;
    executor->refResult->hasDispatched = true;
    return true;

error:
    /* query is still NULL when we bailed out before building it */
    free(query);
    executormgr_catch_error(executor);
    return false;
}
/*
 * executormgr_check_segment_status
 *		Return false, after recording ERRCODE_GP_INTERCONNECTION_ERROR in
 *		the executor's result, when the executor's segment is down;
 *		return true otherwise (including for master/standby executors).
 *
 * Cancel the query if a segment is down. QEs could hang in interconnect
 * until timeout when one segment is down. This will cause QD keep polling
 * until QE timeout.
 */
bool
executormgr_check_segment_status(QueryExecutor *executor)
{
    int         seg_id = executormgr_get_segment_ID(executor);

    if (seg_id < 0 || !IsSegmentDown(seg_id))
        return true;

    cdbdisp_seterrcode(ERRCODE_GP_INTERCONNECTION_ERROR, -1,
                       executormgr_get_executor_result(executor));
    return false;
}
/*
 * executormgr_seterrcode_if_needed
 *		Record ERRCODE_INTERNAL_ERROR in the executor's result when its
 *		current error code is either "successful completion" or already
 *		"internal error". Any other error code is left untouched.
 *		A NULL executor is ignored.
 */
void
executormgr_seterrcode_if_needed(QueryExecutor *executor)
{
    struct CdbDispatchResult *slot;

    if (executor == NULL)
        return;

    slot = executormgr_get_executor_result(executor);
    if (slot->errcode != ERRCODE_SUCCESSFUL_COMPLETION &&
        slot->errcode != ERRCODE_INTERNAL_ERROR)
        return;

    cdbdisp_seterrcode(ERRCODE_INTERNAL_ERROR, -1, slot);
}
/*
 * executormgr_consume
 *		Consume whatever input is available on the executor's connection.
 *
 * Every PGresult obtained is handed over to the executor's result slot,
 * so nothing needs to be cleared here. The executor moves to QES_STOP
 * when libpq reports the end of the command or when a COPY IN/OUT result
 * arrives.
 *
 * Returns false if there is an error (failed read, bad connection or a
 * non-OK result); executormgr_catch_error() has then been called. When
 * true is returned the caller still needs to check the executor state.
 */
bool
executormgr_consume(QueryExecutor *executor)
{
    PGconn     *conn = executor->desc->conn;
    bool        finished = false;

    TIMING_BEGIN(executor->time_consume_begin);
    CdbDispatchResult *slot = executormgr_get_executor_result(executor);

    if (PQconsumeInput(conn) == 0)
        goto connection_error;

    for (;;)
    {
        PGresult   *res;
        ExecStatusType status;
        int         slot_index;
        bool        is_copy;

        if (PQisBusy(conn))
            break;
        if (PQstatus(conn) == CONNECTION_BAD)
            goto connection_error;

        /* index the next result will occupy inside the slot */
        slot_index = cdbdisp_numPGresult(slot);
        res = PQgetResult(conn);

        /* Normal command finished */
        if (res == NULL)
        {
            finished = true;
            break;
        }

        /* Transfer the result to resultSlot, so we don't need to cleanup! */
        cdbdisp_appendResult(slot, res);

        status = PQresultStatus(res);
        is_copy = (status == PGRES_COPY_IN || status == PGRES_COPY_OUT);

        if (!is_copy &&
            status != PGRES_COMMAND_OK &&
            status != PGRES_TUPLES_OK)
        {
            char       *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
            int         errcode = 0;

            /* TODO: error? */
            if (sqlstate && strlen(sqlstate) == 5)
                errcode = cdbdisp_sqlstate_to_errcode(sqlstate);
            cdbdisp_seterrcode(errcode, slot_index, slot);
            goto connection_error;
        }

        slot->okindex = slot_index;
        if (res->numRejected > 0)
            slot->numrowsrejected += res->numRejected;

        if (is_copy)
        {
            finished = true;
            break;
        }
    }

    if (finished)
        executor->state = QES_STOP;

    TIMING_END(executor->time_consume_end);
    return true;

connection_error:
    /* Let caller deal with connection error. */
    write_log("function %s meets error, connection is bad.", __func__);
    executormgr_catch_error(executor);
    return false;
}
/*
 * executormgr_discard
 *		Fetch one pending result from the executor's connection and throw
 *		it away. Returns true when a result was actually present.
 */
bool
executormgr_discard(QueryExecutor *executor)
{
    PGresult   *pending = PQgetResult(executor->desc->conn);
    bool        had_result = (pending != NULL);

    PQclear(pending);
    return had_result;
}
/*
 * executormgr_catch_error
 *		Collect the connection's error state into the executor, then close
 *		the connection.
 *
 * Currently, only connection error is an executor error. When the
 * connection carries a non-empty error message the executor becomes
 * QEH_ERROR, and the message is appended to the result slot unless the
 * slot already holds one; the error code used is the slot's existing code
 * or, if that is zero, ERRCODE_GP_INTERCONNECTION_ERROR. Without a
 * message the executor becomes QEH_CANCEL. In both cases the connection
 * is finished, desc->conn is cleared and the state becomes QES_STOP.
 */
static void
executormgr_catch_error(QueryExecutor *executor)
{
    PGconn     *conn = executor->desc->conn;
    int         errCode = executor->refResult->errcode;
    char       *msg = PQerrorMessage(conn);

    /* In order to avoid executor errcode != 0, but executor errormsg is null */
    if (msg == NULL || msg[0] == '\0')
    {
        /* no error found, set as normal cancel */
        executor->health = QEH_CANCEL;
    }
    else
    {
        PQExpBufferData selfDesc;

        if (errCode == 0)
            errCode = ERRCODE_GP_INTERCONNECTION_ERROR;

        /* fallback description used when the descriptor has no name */
        initPQExpBuffer(&selfDesc);
        appendPQExpBuffer(&selfDesc, "(seg%d %s:%d)",
                          executor->desc->segment->segindex,
                          executor->desc->segment->hostname,
                          executor->desc->segment->port);

        if (executor->refResult->error_message == NULL)
        {
            const char *prefix;
            const char *who;

            if (executor->state == QES_DISPATCHABLE)
                prefix = "Error dispatching to";
            else if (executor->state == QES_RUNNING)
                prefix = "Query Executor Error in";
            else
                prefix = "Error in ";

            if (executor->desc->whoami && strcmp(executor->desc->whoami, "") != 0)
                who = executor->desc->whoami;
            else
                who = selfDesc.data;

            cdbdisp_appendMessage(executor->refResult, LOG, errCode,
                                  "%s %s: %s",
                                  prefix, who, msg);
        }

        termPQExpBuffer(&selfDesc);

        /* there is one error collected */
        executor->health = QEH_ERROR;
    }

    PQfinish(conn);
    executor->desc->conn = NULL;
    executor->state = QES_STOP;
}
/*
 * executormgr_merge_error
 *		Merge errors stored on the segment descriptor into the executor's
 *		result slot. Does nothing for a QES_UNINIT executor or when the
 *		descriptor carries no error.
 */
void
executormgr_merge_error(QueryExecutor *executor)
{
    if (executor->state != QES_UNINIT &&
        executormgr_is_executor_error(executor))
    {
        cdbdisp_mergeConnectionErrors(executor->refResult, executor->desc);
    }
}
/*
 * executormgr_merge_error_for_dispatcher
 *		Like executormgr_merge_error(), but additionally records the host
 *		name of executors whose result code is an interconnection error or
 *		an admin shutdown.
 *
 * errHostInfo points to a palloc'd array with *errHostSize slots of which
 * *errNum are used. A copy of the host name is stored in the next free
 * slot; when that fills the array, its capacity is doubled with repalloc.
 * Nothing is done for a QES_UNINIT executor.
 */
void
executormgr_merge_error_for_dispatcher(
    QueryExecutor *executor, int *errHostSize,
    int *errNum, char ***errHostInfo)
{
    int         errCode;

    if (executor->state == QES_UNINIT)
        return;

    errCode = executor->refResult->errcode;
    if (errCode == ERRCODE_GP_INTERCONNECTION_ERROR ||
        errCode == ERRCODE_ADMIN_SHUTDOWN)
    {
        /* add host to *errHostInfo */
        char       *host = executor->desc->segment->hostname;
        int         hostNameLen = strlen(host);
        char       *copy = (char *) palloc0(hostNameLen + 1);

        strcpy(copy, host);
        (*errHostInfo)[*errNum] = copy;
        *errNum += 1;

        /* array is full now: grow it before the next entry arrives */
        if (*errNum == *errHostSize)
        {
            *errHostSize *= 2;
            *errHostInfo = (char **) repalloc(*errHostInfo,
                                              *errHostSize * sizeof(char *));
        }
    }

    if (executormgr_is_executor_error(executor))
        cdbdisp_mergeConnectionErrors(executor->refResult, executor->desc);
}
/*
 * executormgr_is_executor_error
 *		True when the executor's segment descriptor has a non-zero error
 *		code or a non-empty error message.
 */
bool
executormgr_is_executor_error(QueryExecutor *executor)
{
    SegmentDatabaseDescriptor *segdb = executor->desc;

    if (segdb->errcode)
        return true;
    if (segdb->error_message.len)
        return true;
    return false;
}
/*
 * executormgr_is_executor_valid
 *		True when the executor has a segment descriptor attached.
 */
bool
executormgr_is_executor_valid(QueryExecutor *executor)
{
    SegmentDatabaseDescriptor *segdb = executor->desc;

    return segdb != NULL;
}
/*
 * executormgr_takeover_segment_conns
 *		Hand ownership of the executor's segment descriptor to the caller.
 *
 * The executor is flagged as taken over and the descriptor is moved from
 * the "allocated" to the "takeover" counter. Returns the descriptor.
 */
SegmentDatabaseDescriptor *
executormgr_takeover_segment_conns(QueryExecutor *executor)
{
    SegmentDatabaseDescriptor *handed_over;

    executor->takeovered = true;

    executor_cache.takeover_num += 1;
    executor_cache.allocated_num -= 1;

    handed_over = executor->desc;
    return handed_over;
}
/*
 * executormgr_free_takeovered_segment_conn
 *		Give back a descriptor that was previously taken over.
 *
 * The descriptor is moved back from the "takeover" to the "allocated"
 * counter and then released through executormgr_free_executor().
 */
void
executormgr_free_takeovered_segment_conn(SegmentDatabaseDescriptor *desc)
{
    executor_cache.takeover_num -= 1;
    executor_cache.allocated_num += 1;

    executormgr_free_executor(desc);
}
/*
 * executormgr_allocate_any_executor
 *		Take an arbitrary descriptor with a usable connection out of the
 *		entry-db pool (is_entrydb) or the segment pool.
 *
 * Descriptors are fetched from the pool until one passes
 * executormgr_validate_conn() or the pool has nothing left. A descriptor
 * that fails validation is destroyed and removed from the cached count
 * instead of being silently dropped, which would leak its connection,
 * segment copy and descriptor memory.
 *
 * Returns the descriptor, or NULL when no valid one is cached.
 * is_writer is not used.
 */
static SegmentDatabaseDescriptor *
executormgr_allocate_any_executor(bool is_writer, bool is_entrydb)
{
    struct PoolMgrState *executor_pool =
        is_entrydb ? executor_cache.entrydb_pool : executor_cache.pool;
    SegmentDatabaseDescriptor *desc = poolmgr_get_random_item(executor_pool);

    while (desc != NULL && !executormgr_validate_conn(desc->conn))
    {
        /* no longer cached and not usable: release it for good */
        executor_cache.cached_num--;
        executormgr_destory(desc);

        desc = poolmgr_get_random_item(executor_pool);
    }

    return desc;
}
/*
 * executormgr_allocate_executor_by_name
 *		Take a descriptor with a usable connection for the given pool key
 *		out of the segment pool.
 *
 * Descriptors are fetched for name until one passes
 * executormgr_validate_conn() or none is left. A descriptor that fails
 * validation is destroyed and removed from the cached count instead of
 * being silently dropped, which would leak its connection, segment copy
 * and descriptor memory.
 *
 * Returns the descriptor, or NULL when no valid one is cached for name.
 * is_writer is not used.
 */
static SegmentDatabaseDescriptor *
executormgr_allocate_executor_by_name(const char *name, bool is_writer)
{
    SegmentDatabaseDescriptor *desc =
        poolmgr_get_item_by_name(executor_cache.pool, name);

    while (desc != NULL && !executormgr_validate_conn(desc->conn))
    {
        /* no longer cached and not usable: release it for good */
        executor_cache.cached_num--;
        executormgr_destory(desc);

        desc = poolmgr_get_item_by_name(executor_cache.pool, name);
    }

    return desc;
}
/*
 * executormgr_allocate_executor
 *		Fetch a cached descriptor suitable for a specific slice/task.
 *
 * An entry-db request, or a request for the master segment, is served
 * from the entry-db pool. A request without a segment is served by any
 * descriptor of the segment pool; otherwise the lookup is done by the
 * segment's pool key. Returns NULL when nothing suitable is cached; on
 * success the allocated/cached counters are adjusted.
 */
SegmentDatabaseDescriptor *
executormgr_allocate_executor(Segment *segment, bool is_writer, bool is_entrydb)
{
    SegmentDatabaseDescriptor *picked;
    bool        from_entrydb = is_entrydb || (segment != NULL && segment->master);

    if (from_entrydb)
        picked = executormgr_allocate_any_executor(is_writer, true);
    else if (segment != NULL)
        picked = executormgr_allocate_executor_by_name(GetSegmentHashKey(segment), is_writer);
    else
        picked = executormgr_allocate_any_executor(is_writer, false);

    if (picked != NULL)
    {
        executor_cache.allocated_num += 1;
        executor_cache.cached_num -= 1;
    }

    return picked;
}
/*
 * executormgr_free_executor
 *		Release an allocated descriptor.
 *
 * A descriptor without a connection (it had a connection error) is
 * destroyed. Otherwise its connection is marked idle and the descriptor
 * is cached again: in the entry-db pool for the master, in the segment
 * pool for everything else.
 */
void
executormgr_free_executor(SegmentDatabaseDescriptor *desc)
{
    struct PoolMgrState *target_pool;

    executor_cache.allocated_num -= 1;

    if (desc->conn == NULL)
    {
        /* executor has connection error, remove it. */
        executormgr_destory(desc);
        return;
    }

    desc->conn->asyncStatus = PGASYNC_IDLE;
    // pqClearAsyncResult(desc->conn);

    target_pool = desc->segment->master ? executor_cache.entrydb_pool
                                        : executor_cache.pool;
    poolmgr_put_item(target_pool, GetSegmentHashKey(desc->segment), desc);

    executor_cache.cached_num += 1;
}
/*
 * executormgr_add_address
 *		Append the address, port, database, user and session related
 *		key/value pairs of a libpq connection string to str.
 *
 * segdbDesc	descriptor whose segment supplies host and port
 * skipHost		when true no host/hostaddr entry is written
 * str			connection string under construction
 *
 * Returns false when str could not be initialised (maxlen == 0); in that
 * case the descriptor gets ERRCODE_OUT_OF_MEMORY, an error message and a
 * NULL connection. Returns true otherwise.
 */
bool
executormgr_add_address(SegmentDatabaseDescriptor *segdbDesc, bool skipHost, PQExpBuffer str)
{
    Segment    *segment = segdbDesc->segment;

    /*
     * If init buffer failed, set segdbDesc.errcode, err_message, conn set null and return,
     * the caller cdbgang.c will later check the status of segdbDesc and output to user.
     */
    if (str->maxlen == 0)
    {
        segdbDesc->errcode = ERRCODE_OUT_OF_MEMORY;
        appendPQExpBuffer(&segdbDesc->error_message,
                          "Master unable to connect, malloc memory structure failure");
        segdbDesc->conn = NULL;
        return false;
    }

    /*
     * On the master, we must use UNIX domain sockets for security -- as it can
     * be authenticated. See MPP-15802.
     */
    if (!segment->master && !skipHost)
    {
        /*
         * First we pick the cached hostip if we have it.
         *
         * If we don't have a cached hostip, we use the host->address,
         * if we don't have that we fallback to host->hostname.
         */
        if (segment->hostip != NULL)
        {
            appendPQExpBuffer(str, "hostaddr=%s ", segment->hostip);
        }
        else if (segment->hostname == NULL)
        {
            appendPQExpBufferStr(str, "host='' " );
        }
        /*
         * <ctype.h> functions require a value representable as unsigned
         * char (or EOF); a plain char may be negative, so cast first.
         */
        else if (isdigit((unsigned char) segment->hostname[0]))
        {
            appendPQExpBuffer(str, "hostaddr=%s ", segment->hostname);
        }
        else
        {
            appendPQExpBuffer(str, "host=%s ", segment->hostname);
        }
    }

    appendPQExpBuffer(str, "port=%u ", segment->port);

    /*
     * XXX: PQconnectdb() doesn't handle embedded quotes (') but they can be
     * in a valid database name.
     */
    if (MyProcPort->database_name)
        appendPQExpBuffer(str, "dbname='%s' ", MyProcPort->database_name);

    appendPQExpBuffer(str, "user='%s' ", MyProcPort->user_name);
    appendPQExpBuffer(str, "connect_timeout=%d ", gp_segment_connect_timeout);
    appendPQExpBuffer(str, "dboid=%u ", MyProcPort->dboid);
    appendPQExpBuffer(str, "dbdtsoid=%u ", MyProcPort->dbdtsoid);
    appendPQExpBuffer(str, "bootstrap_user=%s ", MyProcPort->bootstrap_user);
    appendPQExpBuffer(str, "encoding=%d ", MyProcPort->encoding);

    return true;
}
/*
 * addOneOption
 *		Append one " -c name=value" entry for guc to buffer.
 *
 * Bool, int and real variables are printed directly. For string variables
 * each space is prefixed with two backslashes and each single or double
 * quote with one backslash before the value is appended.
 *
 * The escaped copy is sized for the worst case (three output bytes per
 * input byte plus the terminator) up front, so it can never be overrun;
 * growing it on an exact "index == size" match is not safe because one
 * input character can advance the index by up to three.
 *
 * A string variable whose value pointer is NULL contributes no option.
 *
 * Returns false when memory for the escaped string cannot be obtained or
 * the variable type is not one of the four handled here; true otherwise.
 */
static bool
addOneOption(PQExpBufferData *buffer, struct config_generic * guc)
{
    Assert(guc && (guc->flags & (GUC_GPDB_ADDOPT | GUC_NEW_DISP)));

    switch (guc->vartype)
    {
        case PGC_BOOL:
            {
                struct config_bool *bguc = (struct config_bool *) guc;

                appendPQExpBuffer(buffer, " -c %s=%s", guc->name,
                                  *(bguc->variable) ? "true" : "false"
                    );
                return true;
            }
        case PGC_INT:
            {
                struct config_int *iguc = (struct config_int *) guc;

                appendPQExpBuffer(buffer, " -c %s=%d", guc->name, *iguc->variable);
                return true;
            }
        case PGC_REAL:
            {
                struct config_real *rguc = (struct config_real *) guc;

                appendPQExpBuffer(buffer, " -c %s=%f", guc->name, *rguc->variable);
                return true;
            }
        case PGC_STRING:
            {
                struct config_string *sguc = (struct config_string *) guc;
                const char *str = *sguc->variable;
                size_t      len;
                size_t      in;
                size_t      out = 0;
                char       *escaped;

                /* nothing to forward for an unset string variable */
                if (str == NULL)
                    return true;

                len = strlen(str);

                /* guard the 3 * len + 1 computation against wrap-around */
                if (len > (((size_t) -1) - 1) / 3)
                    return false;

                escaped = malloc(len * 3 + 1);
                if (escaped == NULL)
                    return false;

                for (in = 0; in < len; ++in)
                {
                    const char  c = str[in];

                    if (c == ' ')
                    {
                        escaped[out++] = '\\';
                        escaped[out++] = '\\';
                    }
                    else if (c == '"' || c == '\'')
                    {
                        escaped[out++] = '\\';
                    }
                    escaped[out++] = c;
                }
                escaped[out] = '\0';

                appendPQExpBuffer(buffer, " -c %s=%s", guc->name, escaped);
                free(escaped);
                return true;
            }
    }

    Assert(!"Invalid guc var type");
    return false;
}
/*
 * executormgr_addDispGuc
 *		Append an options='...' entry to str containing every GUC flagged
 *		GUC_NEW_DISP that is either user-settable or, for a superuser, of
 *		any context.
 *
 * Returns false as soon as one option cannot be added (the entry is then
 * left unterminated), true otherwise.
 */
bool executormgr_addDispGuc(PQExpBuffer str, bool isSuperUser) {
    struct config_generic **all_gucs = get_guc_variables();
    int total = get_num_guc_variables();
    int idx = 0;

    appendPQExpBufferStr(str, "options='");

    while (idx < total) {
        struct config_generic *candidate = all_gucs[idx++];

        if (!(candidate->flags & GUC_NEW_DISP)) continue;
        if (candidate->context != PGC_USERSET && !isSuperUser) continue;

        if (!addOneOption(str, candidate)) return false;
    }

    appendPQExpBuffer(str, "' ");
    return true;
}
/*
 * executormgr_add_guc
 *		Append an options='...' entry to str containing the transaction
 *		isolation settings, every GUC flagged GUC_GPDB_ADDOPT that is
 *		user-settable (or any such GUC for a superuser), and finally the
 *		per-database/user override options of the current session.
 *
 * Returns false as soon as one option cannot be added (the entry is then
 * left unterminated), true otherwise.
 */
bool
executormgr_add_guc(PQExpBuffer str, bool is_superuser)
{
    struct config_generic **all_gucs = get_guc_variables();
    int         total = get_num_guc_variables();
    int         idx;

    appendPQExpBufferStr(str, "options='");

    /*
     * Transactions are tricky.
     *
     * The isolation-level handling below was copied from code that is
     * known to work. A QE may end up with a different isolation level,
     * but postgres does not really have read uncommitted and repeatable
     * read (is this true?); they are mapped.
     *
     * Routing these two gucs through the generic framework also works
     * (passes make installcheck-good) provided that
     * assign_defaultxactisolevel and assign_XactIsoLevel accept
     * space-stripped strings such as "readcommitted". This code is left
     * as it is until it is known to be broken.
     */
    if (DefaultXactIsoLevel == XACT_SERIALIZABLE)
        appendPQExpBuffer(str, " -c default_transaction_isolation=serializable");
    if (XactIsoLevel == XACT_SERIALIZABLE)
        appendPQExpBuffer(str, " -c transaction_isolation=serializable");

    for (idx = 0; idx < total; idx++)
    {
        struct config_generic *candidate = all_gucs[idx];
        bool        flagged = (candidate->flags & GUC_GPDB_ADDOPT) != 0;
        bool        permitted = (candidate->context == PGC_USERSET || is_superuser);

        if (flagged && permitted && !addOneOption(str, candidate))
            return false;
    }

    /* Add the default database/user gucs */
    if (MyProcPort->override_options.len)
        appendPQExpBuffer(str, " %s", MyProcPort->override_options.data);

    appendPQExpBuffer(str, "' ");
    return true;
}
/*
 * executormgr_add_static_state
 *		Append the gpqeid entry of the connection string to str.
 *
 * The entry is a semicolon-separated list: session id, postmaster start
 * time, master host, master port and the writer flag. The semicolon that
 * follows the last field is turned into a space. Always returns true.
 */
bool
executormgr_add_static_state(PQExpBuffer str, bool is_writer)
{
    char       *master_host;
    int         master_port;
    const char *writer_flag = is_writer ? "true" : "false";

    appendPQExpBuffer(str, "gpqeid=%d;", gp_session_id);

#ifdef HAVE_INT64_TIMESTAMP
    appendPQExpBuffer(str, INT64_FORMAT ";", PgStartTime);
#else
#ifndef _WIN32
    appendPQExpBuffer(str, "%.14a;", PgStartTime);
#else
    appendPQExpBuffer(str, "%g;", PgStartTime);
#endif
#endif

    GetMasterAddress(&master_host, &master_port);

    /* dispatcher host name for external table */
    appendPQExpBuffer(str, "%s;", master_host);
    /* dispatcher host port for external table */
    appendPQExpBuffer(str, "%d;", master_port);

    appendPQExpBuffer(str, "%s;", writer_flag);

    /* change last semicolon to space */
    Assert(str->data[str->len - 1] == ';');
    str->data[str->len - 1] = ' ';

    return true;
}
/*
 * executormgr_prepare_connect
 *		Create a new, not yet connected segment descriptor for segment.
 *
 * Both the descriptor and a private copy of the segment live in the
 * executor cache's memory context, because executors have to exist for a
 * long period. The descriptor is counted as allocated. is_writer is not
 * used.
 */
SegmentDatabaseDescriptor *
executormgr_prepare_connect(Segment *segment, bool is_writer)
{
    MemoryContext cache_ctx = executor_cache.ctx;
    Segment    *segment_copy = CopySegment(segment, cache_ctx);
    SegmentDatabaseDescriptor *fresh =
        MemoryContextAlloc(cache_ctx, sizeof(SegmentDatabaseDescriptor));

    cdbconn_initSegmentDescriptor(fresh, segment_copy);
    executor_cache.allocated_num += 1;

    return fresh;
}
/*
 * executormgr_connect
 *		Build the connection string for desc and open the connection.
 *
 * desc			descriptor to connect
 * executor		executor whose connect timing fields are updated
 * is_writer	forwarded into the gpqeid entry
 * is_superuser	controls which GUCs are forwarded
 *
 * Returns true when cdbconn_doConnect() succeeds, false when the
 * connection string cannot be built or the connect fails.
 *
 * The string buffer is always released through termPQExpBuffer(), the
 * release function that belongs to initPQExpBuffer(), rather than by
 * calling free() on its data pointer directly.
 */
bool
executormgr_connect(SegmentDatabaseDescriptor *desc, QueryExecutor *executor,
                    bool is_writer, bool is_superuser)
{
    PQExpBufferData buffer;
    bool        connected = false;

    /* Build the connection string. */
    initPQExpBuffer(&buffer);
    if (buffer.maxlen == 0)
        goto done;
    if (!executormgr_add_address(desc, false, &buffer))
        goto done;
    if (!executormgr_add_guc(&buffer, is_superuser))
        goto done;
    if (!executormgr_add_static_state(&buffer, is_writer))
        goto done;

    TIMING_BEGIN(executor->time_connect_begin);
    if (!cdbconn_doConnect(desc, buffer.data))
        goto done;
    TIMING_END(executor->time_connect_end);

    connected = true;

done:
    termPQExpBuffer(&buffer);
    return connected;
}
/*
 * executormgr_destory
 *		Dispose of a segment descriptor completely: close its connection,
 *		tear down the descriptor, free its segment copy and finally the
 *		descriptor memory itself. Also used as the pools' clean callback.
 */
static void
executormgr_destory(SegmentDatabaseDescriptor *desc)
{
    Segment    *owned_segment;

    /* connection first, so nothing refers to it afterwards */
    PQfinish(desc->conn);
    desc->conn = NULL;

    cdbconn_termSegmentDescriptor(desc);

    owned_segment = desc->segment;
    FreeSegment(owned_segment);

    pfree(desc);
}
bool executormgr_has_cached_executor()
{
return executor_cache.cached_num > 0;
}
/*
 * executormgr_clean_cached_executor_filter
 *		Pool clean filter: select descriptors whose connection is in the
 *		PGASYNC_IDLE state.
 */
bool executormgr_clean_cached_executor_filter(PoolItem item)
{
    SegmentDatabaseDescriptor *candidate = (SegmentDatabaseDescriptor *) item;
    PGconn *conn = candidate->conn;

    return conn->asyncStatus == PGASYNC_IDLE;
}
/*
 * executormgr_clean_cached_executor
 *		Remove idle descriptors from both pools.
 *
 * Does nothing before the executor cache has been initialised. The number
 * of descriptors removed is subtracted from the cached counter so that
 * executormgr_has_cached_executor() keeps reflecting what is still pooled,
 * and is reported at DEBUG5.
 */
void executormgr_clean_cached_executor()
{
    /* go through each cached executor */
    int cleaned = 0;

    if (!executor_cache.init)
    {
        return;
    }

    cleaned = poolmgr_clean(executor_cache.pool, (PoolMgrIterateFilter) executormgr_clean_cached_executor_filter);
    cleaned += poolmgr_clean(executor_cache.entrydb_pool, (PoolMgrIterateFilter) executormgr_clean_cached_executor_filter);

    /* cleaned descriptors are no longer cached */
    executor_cache.cached_num -= cleaned;

    elog(DEBUG5, "cleaned %d idle executors", cleaned);
}