| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
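/*
 * executormgr.c
 *    Query executor management for the dispatcher: binds executors to
 *    dispatch tasks, sends queries over libpq connections, consumes their
 *    results, and caches idle segment connections in pools.
 */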
| #include "postgres.h" |
| |
| #include "cdb/dispatcher.h" |
| #include "cdb/dispatcher_new.h" |
| #include "cdb/executormgr.h" |
| #include "cdb/poolmgr.h" |
| |
| #include "cdb/cdbconn.h" /* SegmentDatabaseDescriptor */ |
| #include "cdb/cdbgang.h" /* Gang */ |
| |
| #include "catalog/pg_authid.h" /* TODO:BOOTSTRAP_USER_ID remove! */ |
| #include "cdb/cdbdisp.h" /* TODO: DispatchCommandQueryParms */ |
| #include "cdb/cdbdispatchresult.h" /* TODO: CdbDispatchResult & cdbdisp_makeDispatchResults */ |
| #include "cdb/cdbvars.h" /* TODO: gp_commond_count */ |
| #include "miscadmin.h" /* TODO: MyDatabaseId */ |
| #include "libpq/libpq-be.h" /* TODO: MyProcPort */ |
| #include "commands/dbcommands.h" /* TODO: get_database_dts */ |
| #include "utils/lsyscache.h" /* TODO: get_rolname */ |
| #include "utils/guc_tables.h" /* TODO: manipulate gucs */ |
| #include "portability/instr_time.h" /* Monitor the dispatcher performance */ |
| |
| #include "resourcemanager/dynrm.h" |
| |
/* Compile-time constants private to the executor manager. */
typedef enum ExecutorMgrConstant
{
    /* Size of the buffer that receives the error text from PQcancel(). */
    EXECUTORMGR_CANCEL_ERROR_BUFFER_SIZE = 256,
} ExecutorMgrConstant;
| |
/*
 * Life-cycle state of a QueryExecutor.  QES_UNINIT is the zero value, so a
 * zero-filled executor starts out uninitialized.
 */
typedef enum QueryExecutorState
{
    QES_UNINIT,         /* not bound to a task */
    QES_DISPATCHABLE,   /* bound, ready to receive a query */
    QES_RUNNING,        /* query has been dispatched */
    QES_STOP,           /* no more data will arrive */
} QueryExecutorState;
| |
/* Outcome classification of a QueryExecutor. */
typedef enum QueryExecutorHealth
{
    QEH_NA,         /* nothing dispatched yet */
    QEH_GOOD,       /* query dispatched successfully */
    QEH_CANCEL,     /* stopped without an error message being found */
    QEH_ERROR,      /* an error was collected for this executor */
} QueryExecutorHealth;
| |
| typedef struct QueryExecutor { |
| /* Executor State */ |
| bool takeovered; /* Someone outside the dispacher will release the executor. */ |
| SegmentDatabaseDescriptor *desc; |
| struct Gang *refGang; |
| QueryExecutorState state; |
| QueryExecutorHealth health; |
| |
| /* Error State */ |
| struct CdbDispatchResult *refResult; /* TODO: libpq is ugly, should invent a new connection manager. */ |
| |
| /* Reference to dispatch data. */ |
| struct DispatchSlice *refSlice; |
| struct DispatchTask *refTask; |
| |
| /* Serialized identity state */ |
| const char *identity_msg; |
| int identity_msg_len; |
| |
| instr_time time_dispatch_begin; |
| instr_time time_dispatch_end; |
| instr_time time_connect_begin; |
| instr_time time_connect_end; |
| instr_time time_consume_begin; |
| instr_time time_consume_end; |
| instr_time time_free_begin; |
| instr_time time_free_end; |
| } QueryExecutor; |
| |
| typedef struct ExecutorCache { |
| bool init; |
| MemoryContext ctx; |
| struct PoolMgrState *pool; |
| struct PoolMgrState *entrydb_pool; // pool for entry db connection |
| int cached_num; |
| int allocated_num; |
| int takeover_num; |
| } ExecutorCache; |
| |
| static ExecutorCache executor_cache; |
| #define GetSegmentHashKey(segment) (((segment)->master) ? "master" : (segment)->hostname) |
| |
| |
| static void executormgr_catch_error(QueryExecutor *executor); |
| static void executormgr_destory(SegmentDatabaseDescriptor *desc); |
| static struct CdbDispatchResult *executormgr_get_executor_result(QueryExecutor *executor); |
| |
| void |
| executormgr_setup_env(MemoryContext ctx) |
| { |
| MemoryContext old; |
| |
| if (executor_cache.init) |
| return; |
| |
| executor_cache.pool = poolmgr_create_pool(ctx, (PoolMgrCleanCallback) executormgr_destory); |
| executor_cache.entrydb_pool = poolmgr_create_pool(ctx, (PoolMgrCleanCallback) executormgr_destory); |
| executor_cache.ctx = ctx; |
| executor_cache.init = true; |
| |
| /* TODO: Setup dispatcher information. But should remove in the future. */ |
| old = MemoryContextSwitchTo(ctx); |
| MyProcPort->dboid = MyDatabaseId; |
| MyProcPort->dbdtsoid = get_database_dts(MyDatabaseId); |
| MyProcPort->bootstrap_user = get_rolname(BOOTSTRAP_SUPERUSERID); |
| MyProcPort->encoding = GetDatabaseEncoding(); |
| MemoryContextSwitchTo(old); |
| } |
| |
| void |
| executormgr_cleanup_env(void) |
| { |
| if (!executor_cache.init) |
| return; |
| |
| if (executor_cache.allocated_num != 0) |
| elog(WARNING, "%d segments was allocated but not returned during cleanup executor manager.", executor_cache.allocated_num); |
| if (executor_cache.takeover_num != 0) |
| elog(WARNING, "%d segments was takeovered but not returned during cleanup executor manager.", executor_cache.takeover_num); |
| |
| poolmgr_drop_pool(executor_cache.pool); |
| poolmgr_drop_pool(executor_cache.entrydb_pool); |
| } |
| |
| static void |
| executormgr_count_executors_num(SegmentDatabaseDescriptor *desc, int *count) |
| { |
| *count += 1; |
| } |
| |
| int |
| executormgr_get_cached_executor_num(void) |
| { |
| int ret = 0; |
| |
| Assert(executor_cache.init); |
| |
| poolmgr_iterate(executor_cache.pool, NULL, (PoolMgrIterateCallback) executormgr_count_executors_num, (PoolIterateArg) &ret); |
| return ret; |
| } |
| |
| int |
| executormgr_get_cached_executor_num_onentrydb(void) |
| { |
| int ret = 0; |
| |
| Assert(executor_cache.init); |
| |
| poolmgr_iterate(executor_cache.entrydb_pool, NULL, |
| (PoolMgrIterateCallback) executormgr_count_executors_num, |
| (PoolIterateArg) &ret); |
| return ret; |
| } |
| |
| QueryExecutor * |
| executormgr_create_executor(void) |
| { |
| QueryExecutor *executor = palloc0(sizeof(QueryExecutor)); |
| |
| INSTR_TIME_SET_ZERO(executor->time_dispatch_begin); |
| INSTR_TIME_SET_ZERO(executor->time_dispatch_end); |
| INSTR_TIME_SET_ZERO(executor->time_connect_begin); |
| INSTR_TIME_SET_ZERO(executor->time_connect_end); |
| INSTR_TIME_SET_ZERO(executor->time_consume_begin); |
| INSTR_TIME_SET_ZERO(executor->time_consume_end); |
| INSTR_TIME_SET_ZERO(executor->time_free_begin); |
| INSTR_TIME_SET_ZERO(executor->time_free_end); |
| |
| return executor; |
| } |
| |
| /* |
| * executormgr_bind_executor_task |
| * For each task in slice, allocate an executor and bind it to task. |
| */ |
| bool |
| executormgr_bind_executor_task(struct DispatchData *data, |
| QueryExecutor *executor, |
| SegmentDatabaseDescriptor *desc, |
| struct DispatchTask *task, |
| struct DispatchSlice *slice) |
| { |
| Assert(desc != NULL); |
| |
| executor->state = QES_UNINIT; |
| executor->desc = desc; |
| executor->state = QES_DISPATCHABLE; |
| executor->health = QEH_NA; |
| |
| /* setup the owner of the executor */ |
| executor->takeovered = false; |
| |
| /* setup payload */ |
| executor->refSlice = slice; |
| executor->refTask = task; |
| |
| /* TODO: set result slot */ |
| executor->refResult = cdbdisp_makeResult(dispatch_get_results(data), |
| executor->desc, |
| dispatch_get_task_identity(task)->slice_id); |
| |
| if (executor->refResult == NULL) |
| return false; |
| |
| /* Transfer any connection errors from segdbDesc. */ |
| executormgr_merge_error(executor); |
| |
| return true; |
| } |
| |
| void |
| executormgr_serialize_executor_state(struct DispatchData *data, |
| QueryExecutor *executor, |
| struct DispatchTask *task, |
| struct DispatchSlice *slice) |
| { |
| executor->identity_msg = SerializeProcessIdentity(dispatch_get_task_identity(task), &executor->identity_msg_len); |
| } |
| |
| void |
| executormgr_unbind_executor_task(struct DispatchData *data, |
| QueryExecutor *executor, |
| struct DispatchTask *task, |
| struct DispatchSlice *slice) |
| { |
| if (executor->state == QES_UNINIT) |
| return; |
| |
| /* Return executors */ |
| TIMING_BEGIN(executor->time_free_begin); |
| if (!executor->takeovered) |
| executormgr_free_executor(executor->desc); |
| executor->state = QES_UNINIT; |
| TIMING_END(executor->time_free_end); |
| } |
| |
| void |
| executormgr_get_statistics(QueryExecutor *executor, |
| instr_time *time_connect_begin, |
| instr_time *time_connect_end, |
| instr_time *time_dispatch_begin, |
| instr_time *time_dispatch_end, |
| instr_time *time_consume_begin, |
| instr_time *time_consume_end, |
| instr_time *time_free_begin, |
| instr_time *time_free_end) |
| { |
| INSTR_TIME_ASSIGN(*time_connect_begin, executor->time_connect_begin); |
| INSTR_TIME_ASSIGN(*time_connect_end, executor->time_connect_end); |
| INSTR_TIME_ASSIGN(*time_dispatch_begin, executor->time_dispatch_begin); |
| INSTR_TIME_ASSIGN(*time_dispatch_end, executor->time_dispatch_end); |
| INSTR_TIME_ASSIGN(*time_consume_begin, executor->time_consume_begin); |
| INSTR_TIME_ASSIGN(*time_consume_end, executor->time_consume_end); |
| INSTR_TIME_ASSIGN(*time_free_begin, executor->time_free_begin); |
| INSTR_TIME_ASSIGN(*time_free_end, executor->time_free_end); |
| } |
| |
| void |
| executormgr_get_executor_connection_info(QueryExecutor *executor, |
| char **address, int *port, int *myPort, int *pid) |
| { |
| if (address) |
| *address = pstrdup(executor->desc->segment->hostip); |
| |
| if (Gp_interconnect_type == INTERCONNECT_TYPE_UDP) |
| *port = (executor->desc->motionListener >> 16) & 0x0ffff; |
| else |
| *port = (executor->desc->motionListener & 0x0ffff); |
| |
| *myPort = (executor->desc->my_listener & 0x0ffff); |
| |
| *pid = executor->desc->backendPid; |
| } |
| |
| int |
| executormgr_get_executor_slice_id(QueryExecutor *executor) |
| { |
| return dispatch_get_task_identity(executor->refTask)->slice_id; |
| } |
| |
| /* |
| * executormgr_is_stop |
| */ |
| bool |
| executormgr_is_stop(QueryExecutor *executor) |
| { |
| return executor->state == QES_STOP; |
| } |
| |
| bool |
| executormgr_has_error(QueryExecutor *executor) |
| { |
| return executor->health == QEH_ERROR; |
| } |
| |
| static struct CdbDispatchResult * |
| executormgr_get_executor_result(QueryExecutor *executor) |
| { |
| return executor->refResult; |
| } |
| |
| int |
| executormgr_get_segment_ID(QueryExecutor *executor) |
| { |
| Segment *seg = executor->desc->segment; |
| |
| /* For segment only */ |
| if (seg->master || seg->standby) |
| return -1; |
| |
| return seg->ID; |
| } |
| |
| int |
| executormgr_get_fd(QueryExecutor *executor) |
| { |
| return PQsocket(executor->desc->conn); |
| } |
| |
| bool |
| executormgr_cancel(QueryExecutor *executor) |
| { |
| PGconn *conn = executor->desc->conn; |
| PGcancel *cn; |
| char errbuf[EXECUTORMGR_CANCEL_ERROR_BUFFER_SIZE]; |
| bool success; |
| |
| cn = PQgetCancel(conn); |
| MemSet(errbuf, 0, sizeof(errbuf)); |
| |
| success = (PQcancel(cn, errbuf, sizeof(errbuf)) != 0); |
| if(!success){ |
| write_log("executormgr_cancel cancel failed, %s.", errbuf); |
| } |
| |
| #if 0 |
| if (success) |
| { |
| executor->state = QES_STOP; |
| executor->health = QEH_CANCEL; |
| } |
| else |
| { |
| /* TODO: log error? how to deal with connection error. */ |
| executormgr_catch_error(executor); |
| } |
| #endif |
| |
| { |
| write_log("function executormgr_cancel calling executormgr_catch_error"); |
| executormgr_catch_error(executor); |
| } |
| PQfreeCancel(cn); |
| return success; |
| } |
| |
| static bool |
| executormgr_validate_conn(PGconn *conn) |
| { |
| if (conn == NULL) |
| return false; |
| if (!dispatch_validate_conn(conn->sock)) |
| { |
| printfPQExpBuffer(&conn->errorMessage, |
| libpq_gettext( |
| "server closed the connection unexpectedly\n" |
| "\tThis probably means the server terminated abnormally\n" |
| "\tbefore or while processing the request.\n")); |
| conn->status = CONNECTION_BAD; |
| closesocket(conn->sock); |
| conn->sock = -1; |
| return false; |
| } |
| return true; |
| } |
| |
| /* |
| * executormgr_is_dispatchable |
| * Return the true iff executor can receive query. |
| */ |
| static bool |
| executormgr_is_dispatchable(QueryExecutor *executor) |
| { |
| PGconn *conn = executor->desc->conn; |
| |
| Assert(executor->state == QES_DISPATCHABLE); |
| |
| if (PQisBusy(conn)) |
| { |
| /* TODO: dead code? */ |
| if (!executormgr_discard(executor) && PQisBusy(conn)) |
| return false; |
| } |
| |
| if (!executormgr_validate_conn(conn) || PQstatus(conn) == CONNECTION_BAD) |
| { |
| write_log("function %s meets error, connection is bad.", __func__); |
| executormgr_catch_error(executor); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /* |
| * executormgr_dispatch_and_run |
| * Dispatch data and run the query. |
| */ |
| bool |
| executormgr_dispatch_and_run(struct DispatchData *data, QueryExecutor *executor) |
| { |
| PGconn *conn = executor->desc->conn; |
| char *query = NULL; |
| int query_len; |
| DispatchCommandQueryParms *parms = dispatcher_get_QueryParms(data); |
| |
| if (!executormgr_is_dispatchable(executor)) |
| goto error; |
| |
| TIMING_BEGIN(executor->time_dispatch_begin); |
| query = PQbuildGpQueryString(parms->strCommand, parms->strCommandlen, |
| parms->serializedQuerytree, parms->serializedQuerytreelen, |
| parms->serializedPlantree, parms->serializedPlantreelen, |
| parms->serializedParams, parms->serializedParamslen, |
| parms->serializedSliceInfo, parms->serializedSliceInfolen, |
| NULL, 0, |
| executor->identity_msg, executor->identity_msg_len, |
| parms->serializedQueryResource, parms->serializedQueryResourcelen, |
| parms->serializedCommonPlan, parms->serializedCommonPlanLen, |
| 0, |
| gp_command_count, |
| executormgr_get_executor_slice_id(executor), |
| parms->rootIdx, |
| parms->seqServerHost, parms->seqServerHostlen, parms->seqServerPort, |
| parms->primary_gang_id, |
| GetCurrentStatementStartTimestamp(), |
| GetSessionUserId(), /* For external tools who want this info on segments. */ |
| IsAuthenticatedUserSuperUser(), |
| BOOTSTRAP_SUPERUSERID, |
| true, |
| BOOTSTRAP_SUPERUSERID, |
| &query_len); |
| |
| if (PQsendGpQuery_shared(conn, query, query_len) == 0) |
| goto error; |
| |
| if (Debug_print_execution_detail) { |
| instr_time time; |
| INSTR_TIME_SET_CURRENT(time); |
| write_log("The time after dispatching : %s to conn %s, isNonBlocking %d, number of chars waiting" |
| "in buffer %d", query, conn->gpqeid, PQisnonblocking(conn), conn->outCount); |
| write_log("Size to dispatch: %.3f KB", (double)query_len / 1024); |
| } |
| |
| TIMING_END(executor->time_dispatch_end); |
| free(query); |
| executor->state = QES_RUNNING; |
| executor->health = QEH_GOOD; |
| executor->refResult->hasDispatched = true; |
| |
| return true; |
| |
| error: |
| free(query); |
| executormgr_catch_error(executor); |
| return false; |
| } |
| |
| bool |
| executormgr_check_segment_status(QueryExecutor *executor) |
| { |
| /* |
| * Cancel the query if a segment is down. QEs could hang in interconnect |
| * until timeout when one segment is down. This will cause QD keep polling |
| * until QE timeout. |
| */ |
| int ID = executormgr_get_segment_ID(executor); |
| |
| if (ID >= 0 && IsSegmentDown(ID)) |
| { |
| cdbdisp_seterrcode(ERRCODE_GP_INTERCONNECTION_ERROR, -1, |
| executormgr_get_executor_result(executor)); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| void |
| executormgr_seterrcode_if_needed(QueryExecutor *executor) |
| { |
| struct CdbDispatchResult *dispatchResult; |
| |
| if (executor == NULL) |
| return; |
| |
| dispatchResult = executormgr_get_executor_result(executor); |
| if (dispatchResult->errcode == ERRCODE_SUCCESSFUL_COMPLETION || |
| dispatchResult->errcode == ERRCODE_INTERNAL_ERROR) |
| { |
| cdbdisp_seterrcode(ERRCODE_INTERNAL_ERROR, -1, dispatchResult); |
| } |
| } |
| |
| /* |
| * executormgr_consume |
| * If there are data available for executor, use this interface to consume data. |
| * Return false if there is an error. Need to check executor state if returns |
| * true. |
| */ |
| bool |
| executormgr_consume(QueryExecutor *executor) |
| { |
| PGconn *conn = executor->desc->conn; |
| int rc; |
| bool done = false; |
| |
| TIMING_BEGIN(executor->time_consume_begin); |
| CdbDispatchResult *resultSlot = executormgr_get_executor_result(executor); |
| |
| if ((rc = PQconsumeInput(conn)) == 0) |
| goto connection_error; |
| |
| while (!PQisBusy(conn)) |
| { |
| PGresult *result; |
| ExecStatusType status_type; |
| int result_index; |
| |
| if (PQstatus(conn) == CONNECTION_BAD) |
| goto connection_error; |
| result_index = cdbdisp_numPGresult(resultSlot); |
| result = PQgetResult(conn); |
| |
| /* Normal command finished */ |
| if (!result) |
| { |
| done = true; |
| break; |
| } |
| |
| /* Transfer the result to resultSlot, so we don't need to cleanup! */ |
| cdbdisp_appendResult(resultSlot, result); |
| |
| status_type = PQresultStatus(result); |
| if (status_type == PGRES_COMMAND_OK || |
| status_type == PGRES_TUPLES_OK || |
| status_type == PGRES_COPY_IN || |
| status_type == PGRES_COPY_OUT) |
| { |
| resultSlot->okindex = result_index; |
| if (result->numRejected > 0) |
| resultSlot->numrowsrejected += result->numRejected; |
| if (status_type == PGRES_COPY_IN || status_type == PGRES_COPY_OUT) |
| { |
| done = true; |
| break; |
| } |
| } |
| else |
| { |
| char *sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE); |
| int errcode = 0; |
| |
| /* TODO: error? */ |
| if (sqlstate && strlen(sqlstate) == 5) |
| errcode = cdbdisp_sqlstate_to_errcode(sqlstate); |
| cdbdisp_seterrcode(errcode, result_index, resultSlot); |
| goto connection_error; |
| } |
| } |
| |
| if (done) |
| executor->state = QES_STOP; |
| |
| TIMING_END(executor->time_consume_end); |
| return true; |
| |
| connection_error: |
| /* Let caller deal with connection error. */ |
| write_log("function %s meets error, connection is bad.", __func__); |
| executormgr_catch_error(executor); |
| return false; |
| } |
| |
| /* |
| * executormgr_discard |
| * Discard the useless results in a executor. |
| */ |
| bool |
| executormgr_discard(QueryExecutor *executor) |
| { |
| PGresult *result; |
| PGconn *conn = executor->desc->conn; |
| bool ret; |
| |
| result = PQgetResult(conn); |
| ret = result != NULL; |
| PQclear(result); |
| |
| return ret; |
| } |
| |
| /* |
| * executormgr_catch_error |
| * Currently, only connection error is an executor error. |
| */ |
| static void |
| executormgr_catch_error(QueryExecutor *executor) |
| { |
| PGconn *conn = executor->desc->conn; |
| char *msg; |
| int errCode = 0; |
| if (executor->refResult->errcode != 0) |
| errCode = executor->refResult->errcode; |
| |
| msg = PQerrorMessage(conn); |
| |
| /* In order to avoid executor errcode != 0, but executor errormsg is null */ |
| // if (msg && (strcmp("", msg) != 0) && (executor->refResult->errcode == 0)) { |
| if (msg && (strcmp("", msg) != 0)) { |
| if (executor->refResult->errcode == 0) |
| errCode = ERRCODE_GP_INTERCONNECTION_ERROR; |
| |
| PQExpBufferData selfDesc; |
| initPQExpBuffer(&selfDesc); |
| appendPQExpBuffer(&selfDesc, "(seg%d %s:%d)", |
| executor->desc->segment->segindex, |
| executor->desc->segment->hostname, |
| executor->desc->segment->port); |
| |
| if (!executor->refResult->error_message) { |
| cdbdisp_appendMessage( |
| executor->refResult, LOG, errCode, |
| "%s %s: %s", |
| (executor->state == QES_DISPATCHABLE ? |
| "Error dispatching to" : |
| (executor->state == QES_RUNNING ? |
| "Query Executor Error in" : "Error in ")), |
| (executor->desc->whoami && strcmp(executor->desc->whoami, "") != 0) ? |
| executor->desc->whoami : selfDesc.data, |
| msg ? msg : "unknown error"); |
| } |
| termPQExpBuffer(&selfDesc); |
| executor->health = QEH_ERROR; // there is one error collected |
| } |
| else |
| { |
| executor->health = QEH_CANCEL; // no error found, set as normal cancel |
| } |
| |
| PQfinish(conn); |
| executor->desc->conn = NULL; |
| executor->state = QES_STOP; |
| } |
| |
| void |
| executormgr_merge_error(QueryExecutor *executor) |
| { |
| if (executor->state == QES_UNINIT) |
| return; |
| if (executormgr_is_executor_error(executor)) |
| cdbdisp_mergeConnectionErrors(executor->refResult, executor->desc); |
| } |
| |
| void |
| executormgr_merge_error_for_dispatcher( |
| QueryExecutor *executor, int *errHostSize, |
| int *errNum, char ***errHostInfo) |
| { |
| if (executor->state == QES_UNINIT) |
| return; |
| int errCode = executor->refResult->errcode; |
| if (ERRCODE_GP_INTERCONNECTION_ERROR == errCode |
| || ERRCODE_ADMIN_SHUTDOWN == errCode) { |
| // add host to *errHostInfo |
| char *host = executor->desc->segment->hostname; |
| int hostNameLen = strlen(host); |
| (*errHostInfo)[*errNum] = (char*)palloc0(hostNameLen + 1); |
| strcpy((*errHostInfo)[*errNum], host); |
| (*errNum)++; |
| if(*errNum == *errHostSize) { |
| (*errHostSize) *= 2; |
| (*errHostInfo) = (char**) repalloc((*errHostInfo), |
| *errHostSize * sizeof(char *)); |
| } |
| } |
| if (executormgr_is_executor_error(executor)) { |
| cdbdisp_mergeConnectionErrors(executor->refResult, executor->desc); |
| } |
| } |
| |
| bool |
| executormgr_is_executor_error(QueryExecutor *executor) { |
| if (executor->desc->errcode || executor->desc->error_message.len) |
| return true; |
| else |
| return false; |
| } |
| |
| bool |
| executormgr_is_executor_valid(QueryExecutor *executor) { |
| return executor->desc != NULL; |
| } |
| |
| SegmentDatabaseDescriptor * |
| executormgr_takeover_segment_conns(QueryExecutor *executor) |
| { |
| executor->takeovered = true; |
| executor_cache.takeover_num++; |
| executor_cache.allocated_num--; |
| |
| return executor->desc; |
| } |
| |
| void |
| executormgr_free_takeovered_segment_conn(SegmentDatabaseDescriptor *desc) |
| { |
| executor_cache.takeover_num--; |
| executor_cache.allocated_num++; |
| executormgr_free_executor(desc); |
| } |
| |
| static SegmentDatabaseDescriptor * |
| executormgr_allocate_any_executor(bool is_writer, bool is_entrydb) |
| { |
| // get executor from pool and check whether the connection is valid, keep |
| // running until finding a valid one or the pool becomes NULL |
| struct PoolMgrState *executor_pool = |
| is_entrydb ? executor_cache.entrydb_pool : executor_cache.pool; |
| SegmentDatabaseDescriptor *desc = poolmgr_get_random_item(executor_pool); |
| while (desc != NULL && !executormgr_validate_conn(desc->conn)) { |
| desc = poolmgr_get_random_item(executor_pool); |
| } |
| return desc; |
| } |
| |
| static SegmentDatabaseDescriptor * |
| executormgr_allocate_executor_by_name(const char *name, bool is_writer) |
| { |
| // get executor from pool and check whether the connection is valid, keep |
| // running until finding a valid one or the pool becomes NULL |
| SegmentDatabaseDescriptor *desc = |
| poolmgr_get_item_by_name(executor_cache.pool, name); |
| while (desc != NULL && !executormgr_validate_conn(desc->conn)) { |
| desc = poolmgr_get_item_by_name(executor_cache.pool, name); |
| } |
| return desc; |
| } |
| |
| /* |
| * executormgr_allocate_executor |
| * Allocate an executor for specific slice/task. |
| */ |
| SegmentDatabaseDescriptor * |
| executormgr_allocate_executor(Segment *segment, bool is_writer, bool is_entrydb) |
| { |
| SegmentDatabaseDescriptor *ret; |
| |
| if (is_entrydb || (segment != NULL && segment->master)) |
| ret = executormgr_allocate_any_executor(is_writer, true); |
| else if (segment == NULL) |
| ret = executormgr_allocate_any_executor(is_writer, false); |
| else |
| ret = executormgr_allocate_executor_by_name(GetSegmentHashKey(segment), is_writer); |
| if (!ret) |
| return NULL; |
| |
| executor_cache.allocated_num++; |
| executor_cache.cached_num--; |
| return ret; |
| } |
| |
| /* |
| * executormgr_free_executor |
| * Free an executor. |
| */ |
| void |
| executormgr_free_executor(SegmentDatabaseDescriptor *desc) |
| { |
| executor_cache.allocated_num--; |
| if (!desc->conn) |
| { |
| /* executor has connection error, remove it. */ |
| executormgr_destory(desc); |
| return; |
| } |
| |
| desc->conn->asyncStatus = PGASYNC_IDLE; |
| // pqClearAsyncResult(desc->conn); |
| if (desc->segment->master) |
| poolmgr_put_item(executor_cache.entrydb_pool, GetSegmentHashKey(desc->segment), desc); |
| else |
| poolmgr_put_item(executor_cache.pool, GetSegmentHashKey(desc->segment), desc); |
| |
| executor_cache.cached_num++; |
| } |
| |
| bool |
| executormgr_add_address(SegmentDatabaseDescriptor *segdbDesc, bool skipHost, PQExpBuffer str) |
| { |
| Segment *segment = segdbDesc->segment; |
| |
| /* |
| * If init buffer failed, set segdbDesc.errcode, err_message, conn set null and return, |
| * the caller cdbgang.c will later check the status of segdbDesc and output to user. |
| */ |
| if(str->maxlen == 0) |
| { |
| segdbDesc->errcode = ERRCODE_OUT_OF_MEMORY; |
| appendPQExpBuffer(&segdbDesc->error_message, |
| "Master unable to connect, malloc memory structure failure"); |
| segdbDesc->conn = NULL; |
| return false; |
| } |
| |
| /* |
| * On the master, we must use UNIX domain sockets for security -- as it can |
| * be authenticated. See MPP-15802. |
| */ |
| if (!segment->master && !skipHost) |
| { |
| /* |
| * First we pick the cached hostip if we have it. |
| * |
| * If we don't have a cached hostip, we use the host->address, |
| * if we don't have that we fallback to host->hostname. |
| */ |
| if (segment->hostip != NULL) |
| { |
| appendPQExpBuffer(str, "hostaddr=%s ", segment->hostip); |
| } |
| else if (segment->hostname == NULL) |
| { |
| appendPQExpBufferStr(str, "host='' " ); |
| } |
| else if (isdigit(segment->hostname[0])) |
| { |
| appendPQExpBuffer(str, "hostaddr=%s ", segment->hostname); |
| } |
| else |
| { |
| appendPQExpBuffer(str, "host=%s ", segment->hostname); |
| } |
| } |
| |
| appendPQExpBuffer(str, "port=%u ", segment->port); |
| |
| /* |
| * XXX: PQconnectdb() doesn't handle embedded quotes (') but they can be |
| * in a valid database name. |
| */ |
| if (MyProcPort->database_name) |
| appendPQExpBuffer(str, "dbname='%s' ", MyProcPort->database_name); |
| |
| appendPQExpBuffer(str, "user='%s' ", MyProcPort->user_name); |
| |
| appendPQExpBuffer(str, "connect_timeout=%d ", gp_segment_connect_timeout); |
| |
| appendPQExpBuffer(str, "dboid=%u ", MyProcPort->dboid); |
| appendPQExpBuffer(str, "dbdtsoid=%u ", MyProcPort->dbdtsoid); |
| appendPQExpBuffer(str, "bootstrap_user=%s ", MyProcPort->bootstrap_user); |
| appendPQExpBuffer(str, "encoding=%d ", MyProcPort->encoding); |
| |
| return true; |
| } |
| |
| static bool |
| addOneOption(PQExpBufferData *buffer, struct config_generic * guc) |
| { |
| Assert(guc && (guc->flags & (GUC_GPDB_ADDOPT | GUC_NEW_DISP))); |
| switch (guc->vartype) |
| { |
| case PGC_BOOL: |
| { |
| struct config_bool *bguc = (struct config_bool *) guc; |
| |
| appendPQExpBuffer(buffer, " -c %s=%s", guc->name, |
| *(bguc->variable) ? "true" : "false" |
| ); |
| return true; |
| } |
| case PGC_INT: |
| { |
| struct config_int *iguc = (struct config_int *) guc; |
| |
| appendPQExpBuffer(buffer, " -c %s=%d", guc->name, *iguc->variable); |
| return true; |
| } |
| case PGC_REAL: |
| { |
| struct config_real *rguc = (struct config_real *) guc; |
| |
| appendPQExpBuffer(buffer, " -c %s=%f", guc->name, *rguc->variable); |
| return true; |
| } |
| case PGC_STRING: |
| { |
| struct config_string *sguc = (struct config_string *) guc; |
| const char *str = *sguc->variable; |
| unsigned int j, start, size; |
| char *temp, *new_temp; |
| |
| size = 256; |
| temp = malloc(size + 8); |
| if (temp == NULL) |
| return false; |
| |
| j = 0; |
| for (start = 0; start < strlen(str); ++start) |
| { |
| if (j == size) |
| { |
| size *= 2; |
| new_temp = realloc(temp, size + 8); |
| if (new_temp == NULL) |
| { |
| free(temp); |
| return false; |
| } |
| temp = new_temp; |
| } |
| |
| if (str[start] == ' ') |
| { |
| temp[j++] = '\\'; |
| temp[j++] = '\\'; |
| } else if (str[start] == '"' || str[start] == '\'') |
| temp[j++] = '\\'; |
| |
| temp[j++] = str[start]; |
| } |
| |
| temp[j] = '\0'; |
| appendPQExpBuffer(buffer, " -c %s=%s", guc->name, temp); |
| free(temp); |
| |
| return true; |
| } |
| } |
| |
| Assert(!"Invalid guc var type"); |
| return false; |
| } |
| |
| bool executormgr_addDispGuc(PQExpBuffer str, bool isSuperUser) { |
| struct config_generic **gucs = get_guc_variables(); |
| int ngucs = get_num_guc_variables(); |
| |
| appendPQExpBufferStr(str, "options='"); |
| |
| for (int i = 0; i < ngucs; ++i) { |
| struct config_generic *guc = gucs[i]; |
| |
| if ((guc->flags & GUC_NEW_DISP) && |
| (guc->context == PGC_USERSET || isSuperUser)) { |
| if (!addOneOption(str, guc)) return false; |
| } |
| } |
| appendPQExpBuffer(str, "' "); |
| |
| return true; |
| } |
| |
| bool |
| executormgr_add_guc(PQExpBuffer str, bool is_superuser) |
| { |
| struct config_generic **gucs = get_guc_variables(); |
| int ngucs = get_num_guc_variables(); |
| int i; |
| |
| appendPQExpBufferStr(str, "options='"); |
| |
| /* |
| * Transactions are tricky. |
| * Here is the copy and pasted code, and we know they are working. |
| * The problem, is that QE may ends up with different iso level, but |
| * postgres really does not have read uncommited and repeated read. |
| * (is this true?) and they are mapped. |
| * |
| * Put these two gucs in the generic framework works (pass make installcheck-good) |
| * if we make assign_defaultxactisolevel and assign_XactIsoLevel correct take |
| * string "readcommitted" etc. (space stripped). However, I do not |
| * want to change this piece of code unless I know it is broken. |
| */ |
| if (DefaultXactIsoLevel == XACT_SERIALIZABLE) |
| appendPQExpBuffer(str, " -c default_transaction_isolation=serializable"); |
| |
| if (XactIsoLevel == XACT_SERIALIZABLE) |
| appendPQExpBuffer(str, " -c transaction_isolation=serializable"); |
| |
| for (i = 0; i < ngucs; ++i) |
| { |
| struct config_generic *guc = gucs[i]; |
| |
| if ((guc->flags & GUC_GPDB_ADDOPT) && |
| (guc->context == PGC_USERSET || is_superuser)) |
| { |
| if (!addOneOption(str, guc)) |
| return false; |
| } |
| } |
| |
| /* Add the default database/user gucs */ |
| if (MyProcPort->override_options.len) |
| { |
| appendPQExpBuffer(str, " %s", MyProcPort->override_options.data); |
| } |
| |
| appendPQExpBuffer(str, "' "); |
| |
| return true; |
| } |
| |
| bool |
| executormgr_add_static_state(PQExpBuffer str, bool is_writer) |
| { |
| char *master_host; |
| int master_port; |
| |
| appendPQExpBuffer(str, "gpqeid=%d;", gp_session_id); |
| |
| #ifdef HAVE_INT64_TIMESTAMP |
| appendPQExpBuffer(str, INT64_FORMAT ";", PgStartTime); |
| #else |
| #ifndef _WIN32 |
| appendPQExpBuffer(str, "%.14a;", PgStartTime); |
| #else |
| appendPQExpBuffer(str, "%g;", PgStartTime); |
| #endif |
| #endif |
| |
| GetMasterAddress(&master_host, &master_port); |
| /* dispatcher host name for external table */ |
| appendPQExpBuffer(str, "%s;", master_host); |
| /* dispatcher host port for external table */ |
| appendPQExpBuffer(str, "%d;", master_port); |
| appendPQExpBuffer(str, "%s;", is_writer ? "true" : "false"); |
| |
| /* change last semicolon to space */ |
| Assert(str->data[str->len - 1] == ';'); |
| str->data[str->len - 1] = ' '; |
| |
| return true; |
| } |
| |
| SegmentDatabaseDescriptor * |
| executormgr_prepare_connect(Segment *segment, bool is_writer) |
| { |
| SegmentDatabaseDescriptor *desc; |
| Segment *long_lived_segment; |
| |
| /* Executor have to exist for a long period. */ |
| long_lived_segment = CopySegment(segment, executor_cache.ctx); |
| desc = MemoryContextAlloc(executor_cache.ctx, sizeof(*desc)); |
| cdbconn_initSegmentDescriptor(desc, long_lived_segment); |
| |
| executor_cache.allocated_num++; |
| return desc; |
| } |
| |
| bool |
| executormgr_connect(SegmentDatabaseDescriptor *desc, QueryExecutor *executor, |
| bool is_writer, bool is_superuser) |
| { |
| PQExpBufferData buffer; |
| |
| /* Build the connection string. */ |
| initPQExpBuffer(&buffer); |
| if (buffer.maxlen == 0) |
| goto error; |
| |
| if (!executormgr_add_address(desc, false, &buffer)) |
| goto error; |
| |
| if (!executormgr_add_guc(&buffer, is_superuser)) |
| goto error; |
| |
| if (!executormgr_add_static_state(&buffer, is_writer)) |
| goto error; |
| |
| TIMING_BEGIN(executor->time_connect_begin); |
| if (!cdbconn_doConnect(desc, buffer.data)) |
| goto error; |
| TIMING_END(executor->time_connect_end); |
| |
| free(buffer.data); |
| return true; |
| |
| error: |
| termPQExpBuffer(&buffer); |
| return false; |
| } |
| |
| static void |
| executormgr_destory(SegmentDatabaseDescriptor *desc) |
| { |
| PQfinish(desc->conn); |
| desc->conn = NULL; |
| cdbconn_termSegmentDescriptor(desc); |
| FreeSegment(desc->segment); |
| pfree(desc); |
| } |
| |
| bool executormgr_has_cached_executor() |
| { |
| return executor_cache.cached_num > 0; |
| } |
| |
| bool executormgr_clean_cached_executor_filter(PoolItem item) |
| { |
| SegmentDatabaseDescriptor *desc = (SegmentDatabaseDescriptor *)item; |
| return desc->conn->asyncStatus == PGASYNC_IDLE; |
| } |
| |
| void executormgr_clean_cached_executor() |
| { |
| /* go through each cached executor */ |
| int cleaned = 0; |
| if (!executor_cache.init) |
| { |
| return; |
| } |
| |
| cleaned = poolmgr_clean(executor_cache.pool, (PoolMgrIterateFilter) executormgr_clean_cached_executor_filter); |
| cleaned += poolmgr_clean(executor_cache.entrydb_pool, (PoolMgrIterateFilter) executormgr_clean_cached_executor_filter); |
| elog(DEBUG5, "cleaned %d idle executors", cleaned); |
| } |