| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * dispatcher.c |
| * Dispatcher is a coordinator between the optimizer, workermgr, transaction |
| * manager, and so on. The dispatcher is the logical part that deals with the |
| * plan; the workermgr is the physical part, but the workermgr should stay |
| * general enough to serve other modules too. |
| * |
| * The dispatcher computes the information that the QEs need and delivers it |
| * to the workermgr. |
| * |
| * TODO: QueryContext's layering is not clear. ALTER uses it outside of the |
| * dispatcher while queryDesc uses it inside. It has to be redesigned. |
| */ |
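| |
| /* |
| * Typical call sequence (an illustrative sketch distilled from |
| * dispatch_statement() below; error handling is elided, so treat this as |
| * a guide rather than a contract): |
| * |
| * data = initialize_dispatch_data(resource, false); |
| * prepare_dispatch_query_desc(data, queryDesc); // or a statement variant |
| * PG_TRY(); |
| * { |
| * dispatch_run(data); |
| * dispatch_wait(data); |
| * dispatch_cleanup(data); |
| * } |
| * PG_CATCH(); |
| * { |
| * dispatch_catch_error(data); // caller must exit via PG_RE_THROW() |
| * PG_RE_THROW(); |
| * } |
| * PG_END_TRY(); |
| */ |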
| |
| #include "postgres.h" |
| #include <pthread.h> |
| #include <limits.h> |
| |
| #include "cdb/dispatcher.h" |
| #include "cdb/dispatcher_mgt.h" |
| #include "cdb/workermgr.h" |
| #include "cdb/executormgr.h" |
| #include "postmaster/identity.h" |
| #include "cdb/cdbdisp.h" /* TODO: should remove */ |
| #include "cdb/cdbgang.h" /* SliceTable & Gang */ |
| #include "cdb/cdbvars.h" /* gp_max_plan_size */ |
| #include "executor/executor.h" /* RootSliceIndex */ |
| #include "cdb/cdbrelsize.h" /* clear_relsize_cache */ |
| #include "utils/memutils.h" /* GetMemoryChunkContext */ |
| #include "cdb/cdbsrlz.h" /* serializeNode */ |
| #include "utils/datum.h" /* datumGetSize */ |
| #include "utils/faultinjector.h" |
| #include "utils/lsyscache.h" /* get_typlenbyval */ |
| #include "miscadmin.h" /* CHECK_FOR_INTERRUPTS */ |
| #include "tcop/tcopprot.h" /* ResetUsage */ |
| #include "portability/instr_time.h" /* Monitor the dispatcher performance */ |
| #include "cdb/cdbdispatchresult.h" /* cdbdisp_makeDispatchResults */ |
| #include "lib/stringinfo.h" /* StringInfoData */ |
| #include "tcop/pquery.h" /* PortalGetResource */ |
| |
| #include "resourcemanager/communication/rmcomm_QD2RM.h" |
| #include "storage/ipc.h" |
| |
| /* Defines and structures */ |
| typedef struct DispatchTask |
| { |
| ProcessIdentity id; |
| /* |
| * The executor to which this task is assigned. |
| * NULL means match any segment except the entry db! |
| */ |
| Segment *segment; |
| |
| bool entrydb; /* whether this task should be dispatched to the entrydb */ |
| } DispatchTask; |
| |
| typedef struct DispatchSlice |
| { |
| int tasks_num; |
| DispatchTask *tasks; |
| } DispatchSlice; |
| |
| typedef struct DispatchMetaData { |
| /* Shared information */ |
| int all_slices_num; /* EXPLAIN ANALYZE needs the total slice count to store stats */ |
| int used_slices_num; |
| DispatchSlice *slices; |
| } DispatchMetaData; |
| |
| /* |
| * DispatchState |
| * State changes and checks should be done in a disciplined way. The |
| * expected flow is INIT -> RUN -> (OK or ERROR) -> DONE. |
| */ |
| typedef enum DispatchState { |
| DS_INIT, /* Not running */ |
| DS_RUN, /* Running */ |
| DS_OK, /* Finished without error */ |
| DS_ERROR, /* Encountered an error while running */ |
| DS_DONE, /* Cleaned up and errors consumed */ |
| } DispatchState; |
| |
| #define dispatcher_set_state_init(data) ((data)->state = DS_INIT) |
| #define dispatcher_set_state_run(data) ((data)->state = DS_RUN) |
| #define dispatcher_set_state_ok(data) ((data)->state = DS_OK) |
| #define dispatcher_set_state_error(data) ((data)->state = DS_ERROR) |
| #define dispatcher_set_state_done(data) ((data)->state = DS_DONE) |
| #define dispatcher_is_state_init(data) ((data)->state == DS_INIT) |
| #define dispatcher_is_state_run(data) ((data)->state == DS_RUN) |
| #define dispatcher_is_state_ok(data) ((data)->state == DS_OK) |
| #define dispatcher_is_state_error(data) ((data)->state == DS_ERROR) |
| #define dispatcher_is_state_done(data) ((data)->state == DS_DONE) |
| |
| #define ERROR_HOST_INIT_SIZE 5 |
| |
| typedef struct DispatchData |
| { |
| DispatchState state; |
| |
| /* Original Information */ |
| struct QueryDesc *queryDesc; |
| int localSliceId; |
| |
| /* Payload */ |
| DispatchCommandQueryParms *pQueryParms; |
| struct SliceTable *sliceTable; |
| |
| /* Resource that we can use. */ |
| bool resource_is_mine; |
| QueryResource *resource; |
| bool dispatch_to_all_cached_executors; /* sync all of executors */ |
| |
| /* Readonly for worker manager */ |
| DispatchMetaData job; |
| |
| /* Communication between dispatcher and worker manager. */ |
| struct QueryExecutorTeam *query_executor_team; |
| struct CdbDispatchResults *results; /* TODO: libpq is ugly, should invent a new connection manager. */ |
| |
| /* Maintained by worker manager. */ |
| void *worker_mgr_state; |
| |
| /* Statistics */ |
| int segment_num; /* the number of segments for this query */ |
| int segment_num_on_entrydb; |
| int num_of_cached_executors; |
| int num_of_new_connected_executors; |
| |
| /* |
| * Start time of the dispatcher; subtracting it from the finish time of |
| * the last dispatched executor yields the total time. |
| */ |
| instr_time time_begin; |
| instr_time time_total; /* time_last_dispatch_end - time_begin */ |
| instr_time time_connect; /* time_last_connect_end - time_first_connect_begin */ |
| instr_time time_dispatch; /* time_last_dispatch_end - time_first_dispatch_begin */ |
| |
| /* Performance Counter: dispatcher side */ |
| instr_time time_first_connect_begin; |
| instr_time time_last_connect_end; |
| instr_time time_first_dispatch_begin; |
| instr_time time_last_dispatch_end; |
| |
| /* Performance Counter: executor side */ |
| instr_time time_max_dispatched; |
| instr_time time_min_dispatched; |
| instr_time time_total_dispatched; |
| |
| instr_time time_max_consumed; |
| instr_time time_min_consumed; |
| instr_time time_total_consumed; |
| |
| instr_time time_max_free; |
| instr_time time_min_free; |
| instr_time time_total_free; |
| int num_of_dispatched; |
| } DispatchData; |
| |
| /* The following two structures are used to generalize statement dispatch */ |
| typedef enum DispatchStatementType { |
| DST_STRING, |
| DST_NODE, |
| } DispatchStatementType; |
| |
| typedef struct DispatchStatement { |
| DispatchStatementType type; |
| |
| /* String type (changing this to a union would be better) */ |
| const char *string; |
| const char *serializeQuerytree; |
| int serializeLenQuerytree; |
| |
| /* Node type */ |
| struct Node *node; |
| struct QueryContextInfo *ctxt; |
| |
| /* Control tag */ |
| bool dispatch_to_all_cached_executor; |
| } DispatchStatement; |
| |
| /* Global states: DO NOT add global state unless you have to. */ |
| static MemoryContext DispatchDataContext; |
| static int DispatchInitCount = -1; |
| |
| |
| /* Static functions */ |
| static DispatchCommandQueryParms *dispatcher_fill_query_param(const char *strCommand, |
| char *serializeQuerytree, |
| int serializeLenQuerytree, |
| char *serializePlantree, |
| int serializeLenPlantree, |
| char *serializeParams, |
| int serializeLenParams, |
| char *serializeSliceInfo, |
| int serializeLenSliceInfo, |
| int rootIdx, |
| QueryResource *resource); |
| static void dispatcher_split_logical_tasks_for_query_desc(DispatchData *data); |
| static void dispatcher_split_logical_tasks_for_statement_or_node(DispatchData *data); |
| static bool dispatcher_init_works(DispatchData *data); |
| static bool dispatcher_compute_threads_num(DispatchData *data); |
| static bool dispatch_collect_executors_error(DispatchData * data); |
| static void dispatch_throw_error(DispatchData *data); |
| static void dispatch_init_env(void); |
| static int dispatcher_get_total_query_executors_num(DispatchData *data); |
| |
| |
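| /* |
| * remove_subquery_in_RTEs |
| * Walk a node tree (in practice the range table list) and replace each |
| * subquery RTE's subquery with an empty Query node; see the MPP-20785 |
| * note in prepare_dispatch_query_desc(). |
| */ |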
| static void |
| remove_subquery_in_RTEs(Node *node) |
| { |
| if (node == NULL) |
| { |
| return; |
| } |
| |
| if (IsA(node, RangeTblEntry)) |
| { |
| RangeTblEntry *rte = (RangeTblEntry *)node; |
| if (RTE_SUBQUERY == rte->rtekind && NULL != rte->subquery) |
| { |
| /* |
| * replace subquery with a dummy subquery |
| */ |
| rte->subquery = makeNode(Query); |
| } |
| |
| return; |
| } |
| |
| if (IsA(node, List)) |
| { |
| List *list = (List *) node; |
| ListCell *lc = NULL; |
| foreach(lc, list) |
| { |
| remove_subquery_in_RTEs((Node *) lfirst(lc)); |
| } |
| } |
| } |
| |
| /* TODO: should be removed */ |
| typedef struct { |
| int sliceIndex; |
| int children; |
| Slice *slice; |
| } sliceVec; |
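| /* |
| * compare_slice_order |
| * qsort comparator for sliceVec entries: writer slices sort first (they |
| * set the shared snapshot), then slices with fewer dependent children |
| * sort earlier. |
| */ |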
| static int |
| compare_slice_order(const void *aa, const void *bb) |
| { |
| sliceVec *a = (sliceVec *)aa; |
| sliceVec *b = (sliceVec *)bb; |
| |
| if (a->slice == NULL) |
| return 1; |
| if (b->slice == NULL) |
| return -1; |
| |
| /* sort the writer gang slice first, because it sets the shared snapshot */ |
| if (a->slice->is_writer && !b->slice->is_writer) |
| return -1; |
| else if (b->slice->is_writer && !a->slice->is_writer) |
| return 1; |
| |
| if (a->children == b->children) |
| return 0; |
| else if (a->children > b->children) |
| return 1; |
| else |
| return -1; |
| } |
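| /* |
| * Bitmap helpers used to count each slice's transitive children. For |
| * example, mark_bit(bits, 10) sets bit 2 of byte 1, since 10 >> 3 == 1 |
| * and 1 << (10 & 7) == 0x04. |
| */ |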
| static void |
| mark_bit(char *bits, int nth) |
| { |
| int nthbyte = nth >> 3; |
| char nthbit = 1 << (nth & 7); |
| bits[nthbyte] |= nthbit; |
| } |
| static void |
| or_bits(char* dest, char* src, int n) |
| { |
| int i; |
| |
| for(i=0; i<n; i++) |
| dest[i] |= src[i]; |
| } |
| |
| static int |
| count_bits(char* bits, int nbyte) |
| { |
| int i; |
| int nbit = 0; |
| int bitcount[] = { |
| 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4 |
| }; |
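| /* |
| * The table counts bits per nibble; e.g. for bits[i] == 0xB3, |
| * bitcount[0x3] + bitcount[0xB] == 2 + 3 == 5 set bits. |
| */ |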
| |
| for(i=0; i<nbyte; i++) |
| { |
| nbit += bitcount[bits[i] & 0x0F]; |
| nbit += bitcount[(bits[i] >> 4) & 0x0F]; |
| } |
| |
| return nbit; |
| } |
| |
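| /* |
| * markbit_dep_children |
| * Recursively set a bit for every slice in the subtree under sliceIdx and |
| * record each slice's dependent-child count in sliceVec. Returns the |
| * count for sliceIdx. |
| */ |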
| static int markbit_dep_children(SliceTable *sliceTable, int sliceIdx, sliceVec *sliceVec, int bitmasklen, char* bits) |
| { |
| ListCell *sublist; |
| Slice *slice = (Slice *) list_nth(sliceTable->slices, sliceIdx); |
| |
| foreach(sublist, slice->children) |
| { |
| int childIndex = lfirst_int(sublist); |
| char *newbits = palloc0(bitmasklen); |
| |
| markbit_dep_children(sliceTable, childIndex, sliceVec, bitmasklen, newbits); |
| or_bits(bits, newbits, bitmasklen); |
| mark_bit(bits, childIndex); |
| pfree(newbits); |
| } |
| |
| sliceVec[sliceIdx].sliceIndex = sliceIdx; |
| sliceVec[sliceIdx].children = count_bits(bits, bitmasklen); |
| sliceVec[sliceIdx].slice = slice; |
| |
| return sliceVec[sliceIdx].children; |
| } |
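| /* |
| * count_dependent_children |
| * Count the slices transitively dependent on sliceIndex, filling in the |
| * corresponding sliceVector entries as a side effect. |
| */ |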
| static int |
| count_dependent_children(SliceTable * sliceTable, int sliceIndex, sliceVec *sliceVector, int len) |
| { |
| int ret = 0; |
| int bitmasklen = (len+7) >> 3; |
| char *bitmask = palloc0(bitmasklen); |
| |
| ret = markbit_dep_children(sliceTable, sliceIndex, sliceVector, bitmasklen, bitmask); |
| pfree(bitmask); |
| |
| return ret; |
| } |
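| /* |
| * fillSliceVector |
| * Fill sliceVector with every slice reachable from rootIdx and sort it |
| * into dispatch order. Returns the slice count including the top slice. |
| */ |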
| static int |
| fillSliceVector(SliceTable *sliceTbl, int rootIdx, sliceVec *sliceVector, int sliceLim) |
| { |
| int top_count; |
| |
| /* the count doesn't include the top slice, so add 1 */ |
| top_count = 1 + count_dependent_children(sliceTbl, rootIdx, sliceVector, sliceLim); |
| |
| qsort(sliceVector, sliceLim, sizeof(sliceVec), compare_slice_order); |
| |
| return top_count; |
| } |
| |
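| /* |
| * aggregateQueryResource |
| * For each segment, record how many segments share its host |
| * (segment_vcore_agg) and the smallest segment index on that host |
| * (segment_vcore_writer). |
| */ |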
| static void |
| aggregateQueryResource(QueryResource *queryRes) |
| { |
| MemoryContext old; |
| old = MemoryContextSwitchTo(DispatchDataContext); |
| |
| if (queryRes && |
| queryRes->segments && |
| (list_length(queryRes->segments) > 0)) |
| { |
| ListCell *lc1 = NULL; |
| ListCell *lc2 = NULL; |
| Segment *seg1 = NULL; |
| Segment *seg2 = NULL; |
| int nseg = 0; |
| int i = 0; |
| |
| queryRes->numSegments = list_length(queryRes->segments); |
| queryRes->segment_vcore_agg = NULL; |
| queryRes->segment_vcore_agg = palloc0(sizeof(int) * queryRes->numSegments); |
| queryRes->segment_vcore_writer = palloc0(sizeof(int) * queryRes->numSegments); |
| |
| foreach (lc1, queryRes->segments) |
| { |
| int j = 0; |
| nseg = 0; |
| seg1 = (Segment *) lfirst(lc1); |
| queryRes->segment_vcore_writer[i] = i; |
| foreach (lc2, queryRes->segments) |
| { |
| seg2 = (Segment *) lfirst(lc2); |
| if (strcmp(seg1->hostname, seg2->hostname) == 0) |
| { |
| nseg++; |
| if (j < queryRes->segment_vcore_writer[i]) |
| { |
| queryRes->segment_vcore_writer[i] = j; |
| } |
| } |
| j++; |
| } |
| |
| queryRes->segment_vcore_agg[i++] = nseg; |
| } |
| } |
| |
| MemoryContextSwitchTo(old); |
| } |
| |
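| /* |
| * dispatch_init_env |
| * Create the dispatcher memory context on first use; when a new dispatch |
| * starts with no other dispatcher active (DispatchInitCount rises to 0), |
| * reset the context and its children instead. |
| */ |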
| static void |
| dispatch_init_env(void) |
| { |
| ++DispatchInitCount; |
| if (DispatchDataContext) { |
| if (DispatchInitCount == 0) |
| MemoryContextResetAndDeleteChildren(DispatchDataContext); |
| return; |
| } |
| DispatchDataContext = AllocSetContextCreate(TopMemoryContext, |
| "Dispatch Data Context", |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE); |
| |
| executormgr_setup_env(TopMemoryContext); |
| } |
| |
| |
| /* Implementation TODO: the resource parameter needs to be reworked */ |
| DispatchData * |
| initialize_dispatch_data(QueryResource *resource, bool dispatch_to_all_cached_executors) |
| { |
| DispatchData *dispatch_data; |
| MemoryContext old; |
| |
| dispatch_init_env(); |
| |
| old = MemoryContextSwitchTo(DispatchDataContext); |
| dispatch_data = palloc0(sizeof(DispatchData)); |
| MemoryContextSwitchTo(old); |
| dispatcher_set_state_init(dispatch_data); |
| |
| dispatch_data->dispatch_to_all_cached_executors = dispatch_to_all_cached_executors; |
| |
| if (dispatch_to_all_cached_executors) |
| { |
| /* |
| * If a statement has to be dispatched to all cached |
| * executors, the resource is not used. |
| */ |
| Assert(resource == NULL); |
| dispatch_data->segment_num = executormgr_get_cached_executor_num(); |
| dispatch_data->segment_num_on_entrydb = executormgr_get_cached_executor_num_onentrydb(); |
| } |
| else |
| { |
| Assert(resource); |
| |
| dispatch_data->resource_is_mine = false; |
| dispatch_data->resource = resource; |
| dispatch_data->segment_num = list_length(resource->segments); |
| } |
| |
| INSTR_TIME_SET_ZERO(dispatch_data->time_max_dispatched); |
| INSTR_TIME_SET_ZERO(dispatch_data->time_min_dispatched); |
| INSTR_TIME_SET_ZERO(dispatch_data->time_total_dispatched); |
| INSTR_TIME_SET_ZERO(dispatch_data->time_max_consumed); |
| INSTR_TIME_SET_ZERO(dispatch_data->time_min_consumed); |
| INSTR_TIME_SET_ZERO(dispatch_data->time_total_consumed); |
| INSTR_TIME_SET_ZERO(dispatch_data->time_max_free); |
| INSTR_TIME_SET_ZERO(dispatch_data->time_min_free); |
| INSTR_TIME_SET_ZERO(dispatch_data->time_total_free); |
| return dispatch_data; |
| } |
| |
| /* |
| * prepare_dispatch_query_desc |
| * Serialize the plan and parameters and generate the logical splits of |
| * the plan. |
| */ |
| void |
| prepare_dispatch_query_desc(DispatchData *data, |
| struct QueryDesc *queryDesc) |
| { |
| char *splan, |
| *sparams; |
| |
| int splan_len, |
| splan_len_uncompressed, |
| sparams_len; |
| |
| SliceTable *sliceTbl; |
| int rootIdx; |
| PlannedStmt *stmt; |
| bool is_SRI; |
| |
| data->queryDesc = queryDesc; |
| |
| Assert(queryDesc != NULL && queryDesc->estate != NULL); |
| |
| /* |
| * Later we'll need to operate with the slice table provided via the |
| * EState structure in the argument QueryDesc. Cache this information |
| * locally and assert our expectations about it. |
| */ |
| sliceTbl = queryDesc->estate->es_sliceTable; |
| rootIdx = RootSliceIndex(queryDesc->estate); |
| |
| Assert(sliceTbl != NULL); |
| Assert(rootIdx == 0 || |
| (rootIdx > sliceTbl->nMotions && rootIdx <= sliceTbl->nMotions + sliceTbl->nInitPlans)); |
| |
| /* |
| * Keep old value so we can restore it. We use this field as a parameter. |
| */ |
| data->localSliceId = sliceTbl->localSlice; |
| |
| /* |
| * This function is called only for planned statements. |
| */ |
| stmt = queryDesc->plannedstmt; |
| Assert(stmt); |
| |
| |
| /* |
| * Let's evaluate STABLE functions now, so we get consistent values on the QEs |
| * |
| * Also, if this is a single-row INSERT statement, let's evaluate |
| * nextval() and currval() now, so that we get the QD's values, and a |
| * consistent value for everyone |
| * |
| */ |
| |
| is_SRI = false; |
| |
| if (queryDesc->operation == CMD_INSERT) |
| { |
| Assert(stmt->commandType == CMD_INSERT); |
| |
| /* We might look for a constant input relation (instead of SRI), but I'm |
| * afraid that wouldn't scale. |
| */ |
| is_SRI = IsA(stmt->planTree, Result) && stmt->planTree->lefttree == NULL; |
| } |
| |
| if (!is_SRI) |
| clear_relsize_cache(); |
| |
| if (queryDesc->operation == CMD_INSERT || |
| queryDesc->operation == CMD_SELECT || |
| queryDesc->operation == CMD_UPDATE || |
| queryDesc->operation == CMD_DELETE) |
| { |
| |
| MemoryContext oldContext; |
| |
| oldContext = CurrentMemoryContext; |
| if ( stmt->qdContext ) /* Temporary! See comment in PlannedStmt. */ |
| { |
| oldContext = MemoryContextSwitchTo(stmt->qdContext); |
| } |
| else /* MPP-8382: memory context of plan tree should not change */ |
| { |
| MemoryContext mc = GetMemoryChunkContext(stmt->planTree); |
| oldContext = MemoryContextSwitchTo(mc); |
| } |
| |
| stmt->planTree = (Plan *) exec_make_plan_constant(stmt, is_SRI); |
| |
| MemoryContextSwitchTo(oldContext); |
| } |
| |
| /* |
| * MPP-20785: |
| * Remove the subquery field from RTEs since it is not needed during query |
| * execution. This is an optimization to reduce the size of the serialized |
| * plan before dispatching. |
| */ |
| remove_subquery_in_RTEs((Node *) (queryDesc->plannedstmt->rtable)); |
| |
| /* |
| * Serialize the plan tree. Note that we're called for a single |
| * slice tree (corresponding to an initPlan or the main plan), so the |
| * parameters are fixed and we can include them in the prefix. |
| */ |
| splan = serializeNode((Node *) queryDesc->plannedstmt, &splan_len, &splan_len_uncompressed); |
| |
| /* compute the total uncompressed size of the query plan for all slices */ |
| int num_slices = queryDesc->plannedstmt->planTree->nMotionNodes + 1; |
| uint64 plan_size_in_kb = ((uint64)splan_len_uncompressed * (uint64)num_slices) / (uint64)1024; |
| |
| elog(DEBUG1, "Query plan size to dispatch: " UINT64_FORMAT "KB", plan_size_in_kb); |
| |
| if (0 < gp_max_plan_size && plan_size_in_kb > gp_max_plan_size) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), |
| (errmsg("Query plan size limit exceeded, current size: " UINT64_FORMAT "KB, max allowed size: %dKB", plan_size_in_kb, gp_max_plan_size), |
| errhint("Size controlled by gp_max_plan_size")))); |
| } |
| |
| Assert(splan != NULL && splan_len > 0 && splan_len_uncompressed > 0); |
| |
| if (queryDesc->params != NULL && queryDesc->params->numParams > 0) |
| { |
| ParamListInfoData *pli; |
| ParamExternData *pxd; |
| StringInfoData parambuf; |
| Size length; |
| int plioff; |
| int32 iparam; |
| |
| /* Allocate buffer for params */ |
| initStringInfo(¶mbuf); |
| |
| /* Copy ParamListInfoData header and ParamExternData array */ |
| pli = queryDesc->params; |
| length = (char *)&pli->params[pli->numParams] - (char *)pli; |
| plioff = parambuf.len; |
| Assert(plioff == MAXALIGN(plioff)); |
| appendBinaryStringInfo(¶mbuf, pli, length); |
| |
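| /* |
| * Buffer layout as assembled here: the ParamListInfoData header plus the |
| * ParamExternData array, followed by an (int32 param index, datum bytes) |
| * pair for each non-null pass-by-reference parameter; that parameter's |
| * pxd->value in the copied array is overwritten with the datum's length. |
| */ |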
| /* Copy pass-by-reference param values. */ |
| for (iparam = 0; iparam < queryDesc->params->numParams; iparam++) |
| { |
| int16 typlen; |
| bool typbyval; |
| |
| /* Recompute pli each time in case parambuf.data is repalloc'ed */ |
| pli = (ParamListInfoData *)(parambuf.data + plioff); |
| pxd = &pli->params[iparam]; |
| |
| /* Does pxd->value contain the value itself, or a pointer? */ |
| get_typlenbyval(pxd->ptype, &typlen, &typbyval); |
| if (!typbyval) |
| { |
| char *s = DatumGetPointer(pxd->value); |
| |
| if (pxd->isnull || |
| !PointerIsValid(s)) |
| { |
| pxd->isnull = true; |
| pxd->value = 0; |
| } |
| else |
| { |
| length = datumGetSize(pxd->value, typbyval, typlen); |
| |
| /* MPP-1637: we *must* set this before we |
| * append. Appending may realloc, which will |
| * invalidate our pxd ptr. (obviously we could |
| * append first if we recalculate pxd from the new |
| * base address) */ |
| pxd->value = Int32GetDatum(length); |
| |
| appendBinaryStringInfo(¶mbuf, &iparam, sizeof(iparam)); |
| appendBinaryStringInfo(¶mbuf, s, length); |
| } |
| } |
| } |
| sparams = parambuf.data; |
| sparams_len = parambuf.len; |
| } |
| else |
| { |
| sparams = NULL; |
| sparams_len = 0; |
| } |
| |
| data->pQueryParms = dispatcher_fill_query_param(queryDesc->sourceText, |
| NULL, 0, |
| splan, splan_len, |
| sparams, sparams_len, |
| NULL, 0, |
| rootIdx, |
| queryDesc->resource); |
| |
| Assert(sliceTbl); |
| Assert(sliceTbl->slices != NIL); |
| |
| data->sliceTable = sliceTbl; |
| |
| dispatcher_split_logical_tasks_for_query_desc(data); |
| } |
| |
| static void |
| prepare_dispatch_statement_string(DispatchData *data, |
| const char *statement, |
| const char *serializeQuerytree, |
| int serializeLenQuerytree) |
| { |
| data->pQueryParms = dispatcher_fill_query_param(statement, |
| (char *) serializeQuerytree, serializeLenQuerytree, |
| NULL, 0, |
| NULL, 0, |
| NULL, 0, |
| 0, |
| data->resource); |
| dispatcher_split_logical_tasks_for_statement_or_node(data); |
| } |
| |
| static void |
| prepare_dispatch_statement_node(struct DispatchData *data, |
| struct Node *node, |
| struct QueryContextInfo *ctxt) |
| { |
| char *serializedQuerytree; |
| int serializedQuerytree_len; |
| Query *q = makeNode(Query); |
| |
| /* Construct a utility query node. */ |
| q->commandType = CMD_UTILITY; |
| q->utilityStmt = node; |
| q->contextdisp = ctxt; |
| q->querySource = QSRC_ORIGINAL; |
| /* |
| * We must set q->canSetTag = true. False would be used to hide a command |
| * introduced by rule expansion which is not allowed to return its |
| * completion status in the command tag (PQcmdStatus/PQcmdTuples). For |
| * example, if the original unexpanded command was SELECT, the status |
| * should come back as "SELECT n" and should not reflect other commands |
| * inserted by rewrite rules. True means we want the status. |
| */ |
| q->canSetTag = true; |
| serializedQuerytree = serializeNode((Node *) q, &serializedQuerytree_len, NULL /*uncompressed_size*/); |
| |
| data->pQueryParms = dispatcher_fill_query_param(debug_query_string, |
| serializedQuerytree, serializedQuerytree_len, |
| NULL, 0, |
| NULL, 0, |
| NULL, 0, |
| 0, |
| data->resource); |
| |
| dispatcher_split_logical_tasks_for_statement_or_node(data); |
| } |
| |
| /* |
| * dispatcher_split_logical_tasks_for_query_desc |
| * Split the plan into per-slice dispatch tasks so it can execute in |
| * parallel. |
| */ |
| static void |
| dispatcher_split_logical_tasks_for_query_desc(DispatchData *data) |
| { |
| MemoryContext old; |
| old = MemoryContextSwitchTo(DispatchDataContext); |
| |
| sliceVec *sliceVector = NULL; |
| int i; |
| int slice_num; |
| |
| /* |
| * Dispatch order is top to bottom; this reduces the useless packets |
| * sent from scan nodes. |
| */ |
| sliceVector = palloc0(list_length(data->sliceTable->slices) * sizeof(*sliceVector)); |
| slice_num = fillSliceVector(data->sliceTable, |
| data->pQueryParms->rootIdx, |
| sliceVector, |
| list_length(data->sliceTable->slices)); |
| |
| data->job.used_slices_num = slice_num; |
| data->job.all_slices_num = list_length(data->sliceTable->slices); |
| |
| data->job.slices = palloc0(data->job.used_slices_num * sizeof(DispatchSlice)); |
| for (i = 0; i < data->job.used_slices_num; i++) |
| { |
| data->job.slices[i].tasks_num = list_length(data->resource->segments); |
| data->job.slices[i].tasks = palloc0(data->job.slices[i].tasks_num * sizeof(DispatchTask)); |
| } |
| |
| for (i = 0; i < slice_num; i++) |
| { |
| Slice *slice = sliceVector[i].slice; |
| bool is_writer = false; |
| |
| is_writer = (i == 0 && !data->queryDesc->extended_query); |
| /* Not all slices need to be dispatched, e.g. the root slice! */ |
| if (slice->gangType == GANGTYPE_UNALLOCATED) |
| { |
| data->job.used_slices_num--; |
| continue; |
| } |
| |
| /* Direct dispatch may not be useful in HAWQ 2.0. */ |
| if (slice->directDispatch.isDirectDispatch) |
| { |
| DispatchTask *task = &data->job.slices[i].tasks[0]; |
| |
| data->job.slices[i].tasks_num = 1; |
| |
| task->id.id_in_slice = linitial_int(slice->directDispatch.contentIds); |
| task->id.slice_id = slice->sliceIndex; |
| task->id.gang_member_num = list_length(data->resource->segments); |
| task->id.command_count = gp_command_count; |
| task->id.is_writer = is_writer; |
| task->segment = list_nth(data->resource->segments, task->id.id_in_slice); |
| task->id.init = true; |
| |
| } |
| else if (slice->gangSize == 1) |
| { |
| DispatchTask *task = &data->job.slices[i].tasks[0]; |
| |
| /* Handle the single-executor case. Should be merged with direct dispatch! */ |
| data->job.slices[i].tasks_num = 1; |
| |
| /* TODO: I really want to remove the MASTER_CONTENT_ID. */ |
| if (slice->gangType == GANGTYPE_ENTRYDB_READER) |
| { |
| task->id.id_in_slice = MASTER_CONTENT_ID; |
| task->segment = data->resource->master; |
| } |
| else |
| { |
| task->id.id_in_slice = 0; |
| task->segment = linitial(data->resource->segments); |
| } |
| |
| task->id.slice_id = slice->sliceIndex; |
| task->id.gang_member_num = list_length(data->resource->segments); |
| task->id.command_count = gp_command_count; |
| task->id.is_writer = is_writer; |
| task->id.init = true; |
| } |
| else |
| { |
| int j; |
| |
| /* Assign ids starting from 0. */ |
| for (j = 0; j < data->job.slices[i].tasks_num; j++) |
| { |
| DispatchTask *task = &data->job.slices[i].tasks[j]; |
| |
| task->id.id_in_slice = j; |
| task->id.slice_id = slice->sliceIndex; |
| task->id.gang_member_num = list_length(data->resource->segments); |
| task->id.command_count = gp_command_count; |
| task->id.is_writer = is_writer; |
| task->segment = list_nth(data->resource->segments, task->id.id_in_slice); |
| task->id.init = true; |
| } |
| } |
| } |
| |
| pfree(sliceVector); |
| |
| MemoryContextSwitchTo(old); |
| } |
| |
| /* |
| * dispatcher_split_logical_tasks_for_statement_or_node |
| * Build the task layout for a statement or utility node: one single-task |
| * slice per entrydb executor plus one slice holding a task per segment. |
| */ |
| static void |
| dispatcher_split_logical_tasks_for_statement_or_node(DispatchData *data) |
| { |
| MemoryContext old; |
| int i; |
| int segment_num, segment_num_entrydb; |
| |
| segment_num = data->segment_num; |
| segment_num_entrydb = data->segment_num_on_entrydb; |
| |
| if ((segment_num == 0) && (segment_num_entrydb == 0)) |
| { |
| return; |
| } |
| |
| old = MemoryContextSwitchTo(DispatchDataContext); |
| |
| data->job.used_slices_num = segment_num_entrydb + 1; |
| data->job.all_slices_num = segment_num_entrydb + 1; |
| data->job.slices = palloc0(data->job.used_slices_num * sizeof(DispatchSlice)); |
| for (i = 0; i < segment_num_entrydb; i++) |
| { |
| /* TODO: where should I store the runtime context ?? */ |
| data->job.slices[i].tasks_num = 1; |
| data->job.slices[i].tasks = palloc0( |
| data->job.slices[i].tasks_num * sizeof(DispatchTask)); |
| DispatchTask *task = &data->job.slices[i].tasks[0]; |
| task->id.slice_id = i; |
| task->id.id_in_slice = MASTER_CONTENT_ID; |
| task->id.gang_member_num = 1; |
| task->id.command_count = gp_command_count; |
| task->id.is_writer = true; |
| /* NULL means ANY segment */ |
| task->segment = NULL; // ?? How to get the master resource |
| task->entrydb = true; |
| task->id.init = true; |
| } |
| for (i = segment_num_entrydb; i < data->job.used_slices_num; i++) |
| { |
| /* TODO: where should I store the runtime context ?? */ |
| data->job.slices[i].tasks_num = segment_num; |
| data->job.slices[i].tasks = palloc0(data->job.slices[i].tasks_num * sizeof(DispatchTask)); |
| } |
| |
| /* Set task ids for the segments. We have only one slice! */ |
| for (i = 0; i < data->job.slices[segment_num_entrydb].tasks_num; i++) |
| { |
| DispatchTask *task = &data->job.slices[segment_num_entrydb].tasks[i]; |
| task->id.slice_id = segment_num_entrydb; |
| task->id.id_in_slice = i; |
| task->id.gang_member_num = segment_num; |
| task->id.command_count = gp_command_count; |
| task->id.is_writer = true; |
| /* NULL means ANY segment */ |
| task->segment = data->dispatch_to_all_cached_executors ? NULL : list_nth(data->resource->segments, i); |
| task->id.init = true; |
| } |
| data->pQueryParms->primary_gang_id = -1; |
| |
| MemoryContextSwitchTo(old); |
| } |
| |
| /* |
| * dispatcher_bind_executor |
| * Bind the dispatcher's smallest unit of dispatch data (a task) to an |
| * executor. |
| */ |
| static bool |
| dispatcher_bind_executor(DispatchData *data) |
| { |
| int i, j; |
| struct QueryExecutor *executor; |
| QueryExecutorIterator iterator; |
| List *concurrent_connect_executors = NIL; |
| ListCell *lc; |
| bool ret = true; |
| bool bindExecutorError = false; |
| |
| dispmgt_init_query_executor_iterator(data->query_executor_team, &iterator); |
| for (i = 0; i < data->job.used_slices_num; i++) |
| { |
| DispatchSlice *slice = &data->job.slices[i]; |
| |
| if (bindExecutorError) |
| break; |
| |
| for (j = 0; j < slice->tasks_num; j++) |
| { |
| DispatchTask *task = &slice->tasks[j]; |
| struct SegmentDatabaseDescriptor *desc; |
| |
| executor = dispmgt_get_query_executor_iterator(&iterator, false); |
| desc = executormgr_allocate_executor(task->segment, task->id.is_writer, task->entrydb); |
| |
| if (!desc) |
| { |
| // This may be a problem for tasks assigned to the entrydb by |
| // dispatch_statement_node and dispatch_statement_string. |
| if (task->segment != NULL) |
| { |
| struct ConcurrentConnectExecutorInfo *info; |
| |
| info = dispmgt_build_preconnect_info(task->segment, |
| task->id.is_writer, executor, |
| data, slice, task); |
| concurrent_connect_executors = lappend(concurrent_connect_executors, |
| info); |
| data->num_of_new_connected_executors++; |
| if (!executormgr_bind_executor_task(data, executor, info->desc, task, slice)) |
| { |
| bindExecutorError = true; |
| break; |
| } |
| } |
| continue; |
| } |
| |
| #ifdef FAULT_INJECTOR |
| FaultInjector_InjectFaultIfSet( |
| ConnectionFailAfterGangCreation, |
| DDLNotSpecified, |
| "", // databaseName |
| ""); // tableName |
| #endif |
| |
| if (!executormgr_bind_executor_task(data, executor, desc, task, slice)) { |
| bindExecutorError = true; |
| break; |
| } |
| data->num_of_cached_executors++; |
| } |
| } |
| |
| if (data->dispatch_to_all_cached_executors) |
| elog(DEBUG1, "dispatch to all cached executors: expected: %d actual: %d new connected(should be 0): %d", |
| dispatcher_get_total_query_executors_num(data), |
| data->num_of_cached_executors, |
| data->num_of_new_connected_executors); |
| |
| if (concurrent_connect_executors == NIL) |
| return true; |
| |
| if (!bindExecutorError) { |
| ret = dispmgt_concurrent_connect(concurrent_connect_executors, |
| gp_connections_per_thread); |
| } else { |
| ret = false; |
| } |
| |
| foreach(lc, concurrent_connect_executors) |
| dispmgt_free_preconnect_info(lfirst(lc)); |
| list_free(concurrent_connect_executors); |
| |
| return ret; |
| } |
| |
| /* |
| * dispatcher_serialize_state |
| * Serialize every executor's state, patch the executors' real listener |
| * addresses into the slice table for the interconnect, then serialize the |
| * slice table itself. |
| */ |
| static void |
| dispatcher_serialize_state(DispatchData *data) |
| { |
| int i, |
| j; |
| struct QueryExecutor *executor; |
| QueryExecutorIterator iterator; |
| char *sliceInfo; |
| int sliceInfoLen; |
| SliceTable *sliceTable = data->queryDesc ? data->queryDesc->estate->es_sliceTable : NULL; |
| |
| /* First, serialize every executor's state. */ |
| dispmgt_init_query_executor_iterator(data->query_executor_team, &iterator); |
| for (i = 0; i < data->job.used_slices_num; i++) |
| { |
| DispatchSlice *slice = &data->job.slices[i]; |
| |
| for (j = 0; j < slice->tasks_num; j++) |
| { |
| DispatchTask *task = &slice->tasks[j]; |
| |
| executor = dispmgt_get_query_executor_iterator(&iterator, true); |
| if (executor == NULL) |
| continue; |
| executormgr_serialize_executor_state(data, executor, task, slice); |
| |
| if (sliceTable) |
| { |
| Slice *plan_slice = list_nth(sliceTable->slices, task->id.slice_id); |
| CdbProcess *proc = makeNode(CdbProcess);//list_nth(plan_slice->primaryProcesses, get_task_id_in_slice(slice, task)); |
| |
| /* Record the real executors' information for the interconnect. */ |
| executormgr_get_executor_connection_info(executor, &proc->listenerAddr, &proc->listenerPort, &proc->pid); |
| proc->contentid = task->id.id_in_slice; |
| plan_slice->primaryProcesses = lappend(plan_slice->primaryProcesses, proc); |
| } |
| } |
| } |
| |
| /* Ok, we can serialize the global state. */ |
| if (sliceTable) |
| { |
| sliceInfo = serializeNode((Node *) data->queryDesc->estate->es_sliceTable, &sliceInfoLen, NULL /*uncompressed_size*/); |
| data->pQueryParms->serializedSliceInfo = sliceInfo; |
| data->pQueryParms->serializedSliceInfolen = sliceInfoLen; |
| } |
| } |
| |
| /* |
| * dispatcher_serialize_query_resource |
| */ |
| static void |
| dispatcher_serialize_query_resource(DispatchData *data) |
| { |
| char *serializedQueryResource = NULL; |
| int serializedQueryResourcelen = 0; |
| QueryResource * queryResource = data->resource; |
| |
| if (queryResource) |
| { |
| aggregateQueryResource(queryResource); |
| serializedQueryResource = serializeNode(( Node *) queryResource, &serializedQueryResourcelen, NULL); |
| data->pQueryParms->serializedQueryResource = serializedQueryResource; |
| data->pQueryParms->serializedQueryResourcelen = serializedQueryResourcelen; |
| |
| if (queryResource->segment_vcore_agg) |
| { |
| pfree(queryResource->segment_vcore_agg); |
| } |
| if (queryResource->segment_vcore_writer) |
| { |
| pfree(queryResource->segment_vcore_writer); |
| } |
| } |
| } |
| |
| /* |
| * dispatcher_unbind_executor |
| */ |
| static void |
| dispatcher_unbind_executor(DispatchData *data) |
| { |
| int i, |
| j; |
| struct QueryExecutor *executor; |
| QueryExecutorIterator iterator; |
| |
| dispmgt_init_query_executor_iterator(data->query_executor_team, &iterator); |
| for (i = 0; i < data->job.used_slices_num; i++) |
| { |
| DispatchSlice *slice = &data->job.slices[i]; |
| |
| for (j = 0; j < slice->tasks_num; j++) |
| { |
| DispatchTask *task = &slice->tasks[j]; |
| |
| executor = dispmgt_get_query_executor_iterator(&iterator, true); |
| if (executor == NULL) |
| continue; |
| executormgr_unbind_executor_task(data, executor, task, slice); |
| } |
| } |
| } |
| |
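| /* |
| * dispatcher_update_statistics_per_executor |
| * Fold one executor's connect/dispatch/consume/free timings into the |
| * dispatcher-wide first/last, min/max, and total counters. |
| */ |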
| static void |
| dispatcher_update_statistics_per_executor(DispatchData *data, struct QueryExecutor *executor) |
| { |
| instr_time time_connect_begin; |
| instr_time time_connect_end; |
| instr_time time_connect_diff; |
| instr_time time_dispatch_begin; |
| instr_time time_dispatch_end; |
| instr_time time_dispatch_diff; |
| instr_time time_consume_begin; |
| instr_time time_consume_end; |
| instr_time time_consume_diff; |
| instr_time time_free_begin; |
| instr_time time_free_end; |
| instr_time time_free_diff; |
| bool first_time = (data->num_of_dispatched == 0); |
| |
| executormgr_get_statistics(executor, &time_connect_begin, &time_connect_end, |
| &time_dispatch_begin, &time_dispatch_end, |
| &time_consume_begin, &time_consume_end, |
| &time_free_begin, &time_free_end); |
| INSTR_TIME_SET_ZERO(time_connect_diff); |
| INSTR_TIME_SET_ZERO(time_dispatch_diff); |
| INSTR_TIME_SET_ZERO(time_consume_diff); |
| INSTR_TIME_SET_ZERO(time_free_diff); |
| INSTR_TIME_ACCUM_DIFF(time_connect_diff, time_connect_end, time_connect_begin); |
| INSTR_TIME_ACCUM_DIFF(time_dispatch_diff, time_dispatch_end, time_dispatch_begin); |
| INSTR_TIME_ACCUM_DIFF(time_consume_diff, time_consume_end, time_consume_begin); |
| INSTR_TIME_ACCUM_DIFF(time_free_diff, time_free_end, time_free_begin); |
| |
| data->num_of_dispatched++; |
| if (first_time) |
| { |
| /* dispatcher side */ |
| INSTR_TIME_ASSIGN(data->time_first_connect_begin, time_connect_begin); |
| INSTR_TIME_ASSIGN(data->time_last_connect_end, time_connect_end); |
| INSTR_TIME_ASSIGN(data->time_first_dispatch_begin, time_dispatch_begin); |
| INSTR_TIME_ASSIGN(data->time_last_dispatch_end, time_dispatch_end); |
| |
| /* executor side */ |
| INSTR_TIME_ACCUM_DIFF(data->time_total_dispatched, time_dispatch_end, time_dispatch_begin); |
| INSTR_TIME_ACCUM_DIFF(data->time_max_dispatched, time_dispatch_end, time_dispatch_begin); |
| INSTR_TIME_ACCUM_DIFF(data->time_min_dispatched, time_dispatch_end, time_dispatch_begin); |
| INSTR_TIME_ACCUM_DIFF(data->time_total_consumed, time_consume_end, time_consume_begin); |
| INSTR_TIME_ACCUM_DIFF(data->time_max_consumed, time_consume_end, time_consume_begin); |
| INSTR_TIME_ACCUM_DIFF(data->time_min_consumed, time_consume_end, time_consume_begin); |
| INSTR_TIME_ACCUM_DIFF(data->time_total_free, time_free_end, time_free_begin); |
| INSTR_TIME_ACCUM_DIFF(data->time_max_free, time_free_end, time_free_begin); |
| INSTR_TIME_ACCUM_DIFF(data->time_min_free, time_free_end, time_free_begin); |
| return; |
| } |
| |
| /* dispatch side */ |
| if (data->num_of_new_connected_executors > 0) |
| { |
| if (INSTR_TIME_GREATER_THAN(data->time_first_connect_begin, time_connect_begin)) |
| INSTR_TIME_ASSIGN(data->time_first_connect_begin, time_connect_begin); |
| if (INSTR_TIME_LESS_THAN(data->time_last_connect_end, time_connect_end)) |
| INSTR_TIME_ASSIGN(data->time_last_connect_end, time_connect_end); |
| } |
| if (INSTR_TIME_GREATER_THAN(data->time_first_dispatch_begin, time_dispatch_begin)) |
| INSTR_TIME_ASSIGN(data->time_first_dispatch_begin, time_dispatch_begin); |
| if (INSTR_TIME_LESS_THAN(data->time_last_dispatch_end, time_dispatch_end)) |
| INSTR_TIME_ASSIGN(data->time_last_dispatch_end, time_dispatch_end); |
| |
| /* executor side */ |
| if (INSTR_TIME_LESS_THAN(data->time_max_dispatched, time_dispatch_diff)) |
| INSTR_TIME_ASSIGN(data->time_max_dispatched, time_dispatch_diff); |
| if (INSTR_TIME_GREATER_THAN(data->time_min_dispatched, time_dispatch_diff)) |
| INSTR_TIME_ASSIGN(data->time_min_dispatched, time_dispatch_diff); |
| if (INSTR_TIME_LESS_THAN(data->time_max_consumed, time_consume_diff)) |
| INSTR_TIME_ASSIGN(data->time_max_consumed, time_consume_diff); |
| if (INSTR_TIME_GREATER_THAN(data->time_min_consumed, time_consume_diff)) |
| INSTR_TIME_ASSIGN(data->time_min_consumed, time_consume_diff); |
| if (INSTR_TIME_LESS_THAN(data->time_max_free, time_free_diff)) |
| INSTR_TIME_ASSIGN(data->time_max_free, time_free_diff); |
| if (INSTR_TIME_GREATER_THAN(data->time_min_free, time_free_diff)) |
| INSTR_TIME_ASSIGN(data->time_min_free, time_free_diff); |
| |
| |
| INSTR_TIME_ADD(data->time_total_dispatched, time_dispatch_diff); |
| INSTR_TIME_ADD(data->time_total_consumed, time_consume_diff); |
| INSTR_TIME_ADD(data->time_total_free, time_free_diff); |
| } |
| |
| static void |
| dispatcher_update_statistics(DispatchData *data) |
| { |
| int i, |
| j; |
| struct QueryExecutor *executor; |
| QueryExecutorIterator iterator; |
| |
| dispmgt_init_query_executor_iterator(data->query_executor_team, &iterator); |
| for (i = 0; i < data->job.used_slices_num; i++) |
| { |
| DispatchSlice *slice = &data->job.slices[i]; |
| |
| for (j = 0; j < slice->tasks_num; j++) |
| { |
| executor = dispmgt_get_query_executor_iterator(&iterator, true); |
| if (executor == NULL) |
| continue; |
| dispatcher_update_statistics_per_executor(data, executor); |
| } |
| } |
| |
| INSTR_TIME_ACCUM_DIFF(data->time_total, data->time_last_dispatch_end, data->time_begin); |
| INSTR_TIME_ACCUM_DIFF(data->time_connect, data->time_last_connect_end, data->time_first_connect_begin); |
| INSTR_TIME_ACCUM_DIFF(data->time_dispatch, data->time_last_dispatch_end, data->time_first_dispatch_begin); |
| } |
| |
| /* |
| * dispatch_run |
| * Create the corresponding dispatch data structures. The caller must have |
| * split the plan. This function computes the necessary number of threads |
| * and dispatches the plan to each executor. |
| */ |
| void |
| dispatch_run(DispatchData *data) |
| { |
| if (Debug_print_execution_detail) { |
| instr_time time; |
| INSTR_TIME_SET_CURRENT(time); |
| elog(DEBUG1,"The time on entering dispatch_run: %.3f ms", |
| 1000.0 * INSTR_TIME_GET_DOUBLE(time)); |
| } |
| |
| if ((data->segment_num == 0) && (data->segment_num_on_entrydb == 0)) |
| { |
| data->state = DS_DONE; |
| return; |
| } |
| |
| dispatcher_init_works(data); |
| INSTR_TIME_SET_CURRENT(data->time_begin); |
| if (!dispatcher_bind_executor(data)) |
| goto error; |
| |
| #ifdef FAULT_INJECTOR |
| FaultInjector_InjectFaultIfSet( |
| FailQeAfterConnection, |
| DDLNotSpecified, |
| "", // databaseName |
| ""); // tableName |
| #endif |
| |
| /* |
| * Only after we have the executors can we serialize the state; before |
| * that we don't know the executors' listening addresses. |
| */ |
| CHECK_FOR_INTERRUPTS(); |
| |
| dispatcher_serialize_state(data); |
| dispatcher_serialize_query_resource(data); |
| dispatcher_set_state_run(data); |
| dispmgt_dispatch_and_run(data->worker_mgr_state, data->query_executor_team); |
| |
| if (Debug_print_execution_detail) { |
| instr_time time; |
| INSTR_TIME_SET_CURRENT(time); |
| elog(DEBUG1,"The time after dispatching all the results: %.3f ms", |
| 1000.0 * INSTR_TIME_GET_DOUBLE(time)); |
| } |
| return; |
| |
| error: |
| dispatcher_set_state_error(data); |
| elog(ERROR, "dispatcher encounter error"); |
| return; |
| } |
| |
| /* |
| * dispatch_wait |
| * Wait for the workermgr to return and set up any error information |
| * returned from the executors. |
| */ |
| void |
| dispatch_wait(DispatchData *data) |
| { |
| if (Debug_print_execution_detail) { |
| instr_time time; |
| INSTR_TIME_SET_CURRENT(time); |
| elog(DEBUG1,"The time on entering dispatch_wait: %.3f ms", |
| 1000.0 * INSTR_TIME_GET_DOUBLE(time)); |
| } |
| if (dispatcher_is_state_ok(data) || |
| dispatcher_is_state_error(data) || |
| dispatcher_is_state_done(data)) |
| return; |
| |
| workermgr_wait_job(data->worker_mgr_state); |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* Check executors state before return. */ |
| if (dispatch_collect_executors_error(data)) |
| dispatcher_set_state_error(data); |
| else |
| dispatcher_set_state_ok(data); |
| |
| if (Debug_print_execution_detail) { |
| instr_time time; |
| INSTR_TIME_SET_CURRENT(time); |
| elog(DEBUG1,"The time after collecting all the executor results: %.3f ms", |
| 1000.0 * INSTR_TIME_GET_DOUBLE(time)); |
| } |
| } |
| |
| /* |
| * free_dispatch_data |
| * Free the resources allocated for the DispatchData. |
| */ |
| void free_dispatch_data(struct DispatchData *data) { |
| if (data->pQueryParms) { |
| pfree(data->pQueryParms); |
| data->pQueryParms = NULL; |
| } |
| if (data->job.slices) { |
| for (int i = 0; i < data->job.used_slices_num; ++i) { |
| if (data->job.slices[i].tasks) { |
| pfree(data->job.slices[i].tasks); |
| data->job.slices[i].tasks = NULL; |
| } |
| } |
| pfree(data->job.slices); |
| data->job.slices = NULL; |
| } |
| if (data->query_executor_team) { |
| QueryExecutorGroup *qeGrp = data->query_executor_team->query_executor_groups; |
| if (qeGrp) { |
| if (qeGrp->fds) { |
| pfree(qeGrp->fds); |
| qeGrp->fds = NULL; |
| } |
| for (int j = 0; j < qeGrp->query_executor_num; ++j) { |
| pfree(qeGrp->query_executors[j]); |
| qeGrp->query_executors[j] = NULL; |
| } |
| } |
| pfree(data->query_executor_team); |
| data->query_executor_team = NULL; |
| } |
| pfree(data); |
| } |
| |
| /* |
| * dispatch_end_env |
| * accompanied with dispatch_init_env |
| */ |
| void dispatch_end_env(struct DispatchData *data) { |
| Assert(data != NULL); |
| |
| --DispatchInitCount; |
| |
| cdbdisp_destroyDispatchResults(data->results); |
| data->results = NULL; |
| dispatcher_set_state_done(data); |
| |
| PG_TRY(); |
| { |
| dispatcher_unbind_executor(data); |
| if (data->resource_is_mine) |
| { |
| FreeResource(data->resource); |
| data->resource_is_mine = false; |
| } |
| } |
| PG_CATCH(); |
| { |
| free_dispatch_data(data); |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| free_dispatch_data(data); |
| } |
| |
| /* |
| * dispatch_cleanup |
| * Clean up the workermgr. Report an error if something happened on the |
| * executors. |
| */ |
| void |
| dispatch_cleanup(DispatchData *data) |
| { |
| /* to avoid duplicate dispatch cleanup */ |
| if (data == NULL) return; |
| if (dispatcher_is_state_done(data)) return; |
| |
| /* |
| * We should not get here when the dispatcher itself hits an exception, |
| * but the executors may have run into trouble. |
| */ |
| if (dispatcher_is_state_error(data)) |
| { |
| /* We cannot unbind executors until we retrieve the error message! */ |
| if (!proc_exit_inprogress) { |
| dispatch_throw_error(data); |
| Assert(false); |
| return; /* should not hit */ |
| } |
| |
| } |
| |
| dispatch_end_env(data); |
| } |
| |
| /* |
| * dispatch_catch_error |
| * Cancel the workermgr's work and report an error originating from either |
| * the dispatcher or the executors. |
| */ |
| void |
| dispatch_catch_error(DispatchData *data) |
| { |
| int qderrcode = elog_geterrcode(); |
| bool useQeError = false; |
| |
| /* The init state means no threads exist and the data is not valid yet! */ |
| if (!dispatcher_is_state_init(data)) |
| { |
| workermgr_cancel_job(data->worker_mgr_state); |
| |
| /* Have to figure out exception source */ |
| dispatch_collect_executors_error(data); |
| } |
| |
| /* |
| * When a QE stops executing a command due to an error, as a |
| * consequence there can be a cascade of interconnect errors |
| * (usually "sender closed connection prematurely") thrown in |
| * downstream processes (QEs and QD). So if we are handling |
| * an interconnect error, and a QE hit a more interesting error, |
| * we'll let the QE's error report take precedence. |
| */ |
| if (qderrcode == ERRCODE_GP_INTERCONNECTION_ERROR) |
| { |
| bool qd_lost_flag = false; |
| char *qderrtext = elog_message(); |
| |
| if (qderrtext && strcmp(qderrtext, CDB_MOTION_LOST_CONTACT_STRING) == 0) |
| qd_lost_flag = true; |
| |
| if (data->results && data->results->errcode) |
| { |
| if (qd_lost_flag && data->results->errcode == ERRCODE_GP_INTERCONNECTION_ERROR) |
| useQeError = true; |
| else if (data->results->errcode != ERRCODE_GP_INTERCONNECTION_ERROR) |
| useQeError = true; |
| } |
| } |
| |
| if (useQeError) |
| { |
| /* |
| * Throw the QE's error, catch it, and fall thru to return |
| * normally so caller can finish cleaning up. Afterwards |
| * caller must exit via PG_RE_THROW(). |
| */ |
| PG_TRY(); |
| { |
| dispatch_throw_error(data); |
| } |
| PG_CATCH(); |
| {} /* nop; fall thru */ |
| PG_END_TRY(); |
| } |
| |
| dispatch_end_env(data); |
| } |
| |
| void |
| cleanup_dispatch_data(DispatchData *data) |
| { |
| if (data->queryDesc) |
| data->queryDesc->estate->es_sliceTable->localSlice = data->localSliceId; |
| } |
| |
| static void |
| dispatch_statement(DispatchStatement *stmt, QueryResource *resource, DispatchDataResult *result) |
| { |
| volatile DispatchData *data; |
| |
| data = initialize_dispatch_data(resource, stmt->dispatch_to_all_cached_executor); |
| if (stmt->type == DST_STRING) |
| prepare_dispatch_statement_string((DispatchData *) data, |
| stmt->string, |
| stmt->serializeQuerytree, |
| stmt->serializeLenQuerytree); |
| else |
| prepare_dispatch_statement_node((DispatchData *) data, |
| stmt->node, |
| stmt->ctxt); |
| |
| PG_TRY(); |
| { |
| dispatch_run((DispatchData *) data); |
| dispatch_wait((DispatchData *) data); |
| if (dispatcher_is_state_ok(data) && result) |
| { |
| int segment_num = data->segment_num + data->segment_num_on_entrydb; |
| |
| if (segment_num > 0) |
| { |
| initStringInfo(&result->errbuf); |
| |
| result->result = cdbdisp_returnResults(segment_num, data->results, &result->errbuf, &result->numresults); |
| /* cdbdisp_returnResults frees the pointer! */ |
| data->results = NULL; |
| |
| /* Take over the list of segment connections. */ |
| result->segment_conns = dispmgt_takeover_segment_conns(data->query_executor_team); |
| } |
| } |
| dispatch_cleanup((DispatchData *) data); |
| } |
| PG_CATCH(); |
| { |
| dispatch_wait((DispatchData *) data); |
| dispatch_cleanup((DispatchData *) data); |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| } |
| |
| void |
| dispatch_statement_string(const char *string, |
| const char *serializeQuerytree, |
| int serializeLenQuerytree, |
| QueryResource *resource, |
| DispatchDataResult *result, |
| bool sync_on_all_executors) |
| { |
| DispatchStatement stmt; |
| |
| MemSet(&stmt, 0, sizeof(stmt)); |
| stmt.type = DST_STRING; |
| stmt.string = string; |
| stmt.serializeQuerytree = serializeQuerytree; |
| stmt.serializeLenQuerytree = serializeLenQuerytree; |
| stmt.dispatch_to_all_cached_executor = sync_on_all_executors; |
| |
| dispatch_statement(&stmt, resource, result); |
| } |
| |
| /* |
| * dispatch_statement_node |
| */ |
| void |
| dispatch_statement_node(struct Node *node, |
| struct QueryContextInfo *ctxt, |
| QueryResource *resource, |
| DispatchDataResult *result) |
| { |
| DispatchStatement stmt; |
| |
| Assert(resource); |
| MemSet(&stmt, 0, sizeof(stmt)); |
| stmt.type = DST_NODE; |
| stmt.node = node; |
| stmt.ctxt = ctxt; |
| stmt.dispatch_to_all_cached_executor = false; |
| |
| dispatch_statement(&stmt, resource, result); |
| } |
| |
| void |
| dispatch_free_result(DispatchDataResult *result) |
| { |
| if (!result) |
| return; |
| |
| if(result->errbuf.data) |
| { |
| pfree(result->errbuf.data); |
| result->errbuf.data = NULL; |
| } |
| |
| if (result->result) |
| { |
| for (int i = 0; i <= result->numresults; i++){ |
| PQclear(result->result[i]); |
| result->result[i] = NULL; |
| } |
| free(result->result); |
| } |
| |
| dispmgt_free_takeoved_segment_conns(result->segment_conns); |
| } |
| |
| struct CdbDispatchResults * |
| dispatch_get_results(struct DispatchData *data) |
| { |
| return data->results; |
| } |
| |
| int |
| dispatch_get_segment_num(struct DispatchData *data) |
| { |
| return list_length(data->resource->segments); |
| } |
| |
| /* |
| * dispatcher_fill_query_param |
| * Assemble the serialized command, query tree, plan tree, parameters, and |
| * slice info into a DispatchCommandQueryParms payload for the executors. |
| */ |
| static DispatchCommandQueryParms * |
| dispatcher_fill_query_param(const char *strCommand, |
| char *serializeQuerytree, |
| int serializeLenQuerytree, |
| char *serializePlantree, |
| int serializeLenPlantree, |
| char *serializeParams, |
| int serializeLenParams, |
| char *serializeSliceInfo, |
| int serializeLenSliceInfo, |
| int rootIdx, |
| QueryResource *resource) |
| { |
| DispatchCommandQueryParms *queryParms; |
| Segment *master = NULL; |
| MemoryContext old; |
| |
| if (resource) |
| { |
| master = resource->master; |
| } |
| |
| old = MemoryContextSwitchTo(DispatchDataContext); |
| queryParms = palloc0(sizeof(*queryParms)); |
| MemoryContextSwitchTo(old); |
| |
| queryParms->strCommand = strCommand; |
| queryParms->strCommandlen = strCommand ? strlen(strCommand) + 1 : 0; |
| queryParms->serializedQuerytree = serializeQuerytree; |
| queryParms->serializedQuerytreelen = serializeLenQuerytree; |
| queryParms->serializedPlantree = serializePlantree; |
| queryParms->serializedPlantreelen = serializeLenPlantree; |
| queryParms->serializedParams = serializeParams; |
| queryParms->serializedParamslen = serializeLenParams; |
| queryParms->serializedSliceInfo = serializeSliceInfo; |
| queryParms->serializedSliceInfolen= serializeLenSliceInfo; |
| queryParms->rootIdx = rootIdx; |
| |
| /* sequence server info */ |
| if (master) |
| { |
| queryParms->seqServerHost = pstrdup(master->hostip); |
| queryParms->seqServerHostlen = strlen(master->hostip) + 1; |
| } |
| queryParms->seqServerPort = seqServerCtl->seqServerPort; |
| |
| queryParms->primary_gang_id = 0; /* We are relying on the slice table to provide gang ids */ |
| |
| /* |
| * In HAWQ, there is no distributed transaction. |
| */ |
| queryParms->serializedDtxContextInfo = NULL; |
| queryParms->serializedDtxContextInfolen = 0; |
| |
| return queryParms; |
| } |
| |
| /* |
| * dispatcher_init_works |
| * Create all of the data structures the worker manager needs. |
| */ |
| static bool |
| dispatcher_init_works(DispatchData *data) |
| { |
| MemoryContext old; |
| |
| old = MemoryContextSwitchTo(DispatchDataContext); |
| /* Compute the number of threads and build the executor team. */ |
| dispatcher_compute_threads_num(data); |
| /* Create the threads' data structure. */ |
| data->worker_mgr_state = workermgr_create_workermgr_state(dispmgt_get_group_num(data->query_executor_team)); |
| MemoryContextSwitchTo(old); |
| |
| return true; |
| } |
| |
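| /* |
| * dispatcher_compute_threads_num |
| * Decide the number of worker threads (roughly one per |
| * gp_connections_per_thread executors), allocate the dispatch results, |
| * and build the query executor team. |
| */ |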
| static bool |
| dispatcher_compute_threads_num(DispatchData *data) |
| { |
| int threads_num = 1; |
| int query_executors_num; |
| int executors_num_per_thread = gp_connections_per_thread; |
| |
| query_executors_num = dispatcher_get_total_query_executors_num(data); |
| data->results = cdbdisp_makeDispatchResults(query_executors_num, data->job.all_slices_num, false); |
| |
| /* TODO: decide the number of groups (threads). */ |
| if (query_executors_num == 0) |
| threads_num = 1; |
| if (executors_num_per_thread > query_executors_num) |
| threads_num = 1; |
| else |
| threads_num = (query_executors_num / executors_num_per_thread) + ((query_executors_num % executors_num_per_thread == 0) ? 0 : 1); |
| |
| data->query_executor_team = dispmgt_create_dispmgt_state(data, |
| threads_num, |
| query_executors_num, |
| executors_num_per_thread); |
| |
| return true; |
| } |
| |
| DispatchCommandQueryParms * |
| dispatcher_get_QueryParms(DispatchData *data) |
| { |
| return data->pQueryParms; |
| } |
| |
| static int |
| dispatcher_get_total_query_executors_num(DispatchData *data) |
| { |
| int num = 0; |
| int i; |
| |
| for (i = 0; i < data->job.used_slices_num; i++) |
| num += data->job.slices[i].tasks_num; |
| |
| return num; |
| } |
| |
| ProcessIdentity * |
| dispatch_get_task_identity(DispatchTask *task) |
| { |
| return &task->id; |
| } |
| |
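| /* |
| * dispatch_collect_executors_error |
| * Merge every executor's error into the dispatch results and report the |
| * failing hosts to the resource manager. Returns true if any executor |
| * hit an error. |
| */ |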
| static bool |
| dispatch_collect_executors_error(DispatchData *data) |
| { |
| MemoryContext old; |
| old = MemoryContextSwitchTo(DispatchDataContext); |
| |
| QueryExecutorIterator iterator; |
| struct QueryExecutor *executor; |
| bool ret = false; |
| |
| // the list of hosts whose executors reported errors |
| int errHostNum = 0; |
| int errHostSize = ERROR_HOST_INIT_SIZE; |
| char **errHostName = (char**) palloc0(errHostSize * sizeof(char *)); |
| |
| dispmgt_init_query_executor_iterator(data->query_executor_team, &iterator); |
| while ((executor = dispmgt_get_query_executor_iterator(&iterator, true)) != NULL) |
| { |
| executormgr_merge_error_for_dispatcher(executor, &errHostSize, |
| &errHostNum, &errHostName); |
| } |
| |
| if (dispatcher_has_error(data)) { |
| if (errHostNum > 0) |
| sendFailedNodeToResourceManager(errHostNum, errHostName); |
| ret = true; |
| } |
| for (int i = 0; i < errHostNum; i++) { |
| pfree(errHostName[i]); |
| } |
| pfree(errHostName); |
| |
| MemoryContextSwitchTo(old); |
| |
| return ret; |
| } |
| |
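| /* |
| * dispatch_throw_error |
| * Format the error messages collected from the primary gang and re-throw |
| * them via ereport(ERROR). |
| */ |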
| static void |
| dispatch_throw_error(DispatchData *data) |
| { |
| StringInfoData buf; |
| int errorcode = data->results->errcode; |
| |
| /* Format error messages from the primary gang. */ |
| initStringInfo(&buf); |
| cdbdisp_dumpDispatchResults(data->results, &buf, false); |
| |
| /* Too bad, our gang got an error. */ |
| PG_TRY(); |
| { |
| ereport(ERROR, (errcode(errorcode), |
| errOmitLocation(true), |
| errmsg("%s", buf.data))); |
| } |
| PG_CATCH(); |
| { |
| pfree(buf.data); |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| } |
| |
| bool |
| dispatcher_has_error(DispatchData *data) |
| { |
| return data->results && data->results->errcode; |
| } |
| |
| void |
| dispatcher_print_statistics(StringInfo buf, DispatchData *data) |
| { |
| if (!data) |
| return; |
| dispatcher_update_statistics(data); |
| if (data->num_of_dispatched == 0) |
| return; |
| appendStringInfo(buf, "Dispatcher statistics:\n"); |
| appendStringInfo(buf, |
| " executors used(total/cached/new connection): (%d/%d/%d);", |
| data->num_of_dispatched, data->num_of_cached_executors, |
| data->num_of_new_connected_executors); |
| appendStringInfo(buf, |
| " dispatcher time(total/connection/dispatch data): (%.3f ms/%.3f ms/%.3f ms).\n", |
| INSTR_TIME_GET_MILLISEC(data->time_total), |
| INSTR_TIME_GET_MILLISEC(data->time_connect), |
| INSTR_TIME_GET_MILLISEC(data->time_dispatch)); |
| appendStringInfo(buf, |
| " dispatch data time(max/min/avg): (%.3f ms/%.3f ms/%.3f ms);", |
| INSTR_TIME_GET_MILLISEC(data->time_max_dispatched), |
| INSTR_TIME_GET_MILLISEC(data->time_min_dispatched), |
| INSTR_TIME_GET_MILLISEC(data->time_total_dispatched) |
| / data->num_of_dispatched); |
| appendStringInfo(buf, |
| " consume executor data time(max/min/avg): (%.3f ms/%.3f ms/%.3f ms);", |
| INSTR_TIME_GET_MILLISEC(data->time_max_consumed), |
| INSTR_TIME_GET_MILLISEC(data->time_min_consumed), |
| INSTR_TIME_GET_MILLISEC(data->time_total_consumed) |
| / data->num_of_dispatched); |
| appendStringInfo(buf, |
| " free executor time(max/min/avg): (%.3f ms/%.3f ms/%.3f ms).\n", |
| INSTR_TIME_GET_MILLISEC(data->time_max_free), |
| INSTR_TIME_GET_MILLISEC(data->time_min_free), |
| INSTR_TIME_GET_MILLISEC(data->time_total_free) / data->num_of_dispatched); |
| } |
| |
| |
| |
| /* |
| * Check the connection from the dispatcher to verify that it is still there. |
| * Return true if the dispatcher connection is still alive. |
| */ |
| bool dispatch_validate_conn(pgsocket sock) |
| { |
| ssize_t ret; |
| char buf; |
| |
| if (sock < 0) |
| return false; |
| |
| #ifndef WIN32 |
| ret = recv(sock, &buf, 1, MSG_PEEK|MSG_DONTWAIT); |
| #else |
| ret = recv(sock, &buf, 1, MSG_PEEK|MSG_PARTIAL); |
| #endif |
| |
| if (ret == 0) /* socket has been closed. EOF */ |
| return false; |
| |
| if (ret > 0) /* data waiting on socket */ |
| { |
| if (buf == 'E') /* waiting data indicates error */ |
| return false; |
| else |
| return true; |
| } |
| if (ret == -1) /* error, or would block */ |
| { |
| if (errno == EAGAIN || errno == EINPROGRESS || errno == EWOULDBLOCK) |
| return true; /* connection intact, no data available */ |
| else |
| return false; |
| } |
| /* not reached */ |
| |
| return true; |
| } |