| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "optimizer/newPlanner.h" |
| |
| #include "access/aomd.h" |
| #include "access/fileam.h" |
| #include "access/filesplit.h" |
| #include "access/plugstorage.h" |
| #include "catalog/catquery.h" |
| #include "catalog/pg_exttable.h" |
| #include "catalog/pg_proc.h" |
| #include "cdb/cdbdatalocality.h" |
| #include "cdb/cdbfilesystemcredential.h" |
| #include "cdb/cdbhash.h" |
| #include "cdb/cdbvars.h" |
| #include "commands/tablecmds.h" |
| #include "magma/cwrapper/magma-client-c.h" |
| #include "miscadmin.h" |
| #include "optimizer/planmain.h" |
| #include "optimizer/var.h" |
| #include "parser/parsetree.h" |
| #include "storage/cwrapper/orc-format-c.h" |
| #include "univplan/cwrapper/univplan-c.h" |
| #include "utils/acl.h" |
| #include "utils/builtins.h" |
| #include "utils/guc.h" |
| #include "utils/hawq_funcoid_mapping.h" |
| #include "utils/hawq_type_mapping.h" |
| #include "utils/lsyscache.h" |
| #include "utils/uri.h" |
| |
/* Spellings compared (case-insensitively) against new_executor_mode. */
const char *new_executor_mode_on = "on";
const char *new_executor_mode_auto = "auto";
const char *new_executor_mode_off = "off";

/* Spellings compared (case-insensitively) against new_scheduler_mode. */
const char *new_scheduler_mode_on = "on";
const char *new_scheduler_mode_off = "off";

/* Spellings for new_executor_runtime_filter_mode (not used in this part of
 * the file; presumably compared the same way -- TODO confirm). */
const char *new_executor_runtime_filter_mode_local = "local";
const char *new_executor_runtime_filter_mode_global = "global";

/*
 * Current values of the new executor / new scheduler settings.  They are
 * only read in this file; presumably they are assigned by the configuration
 * machinery elsewhere -- TODO confirm.
 */
char *new_executor_mode;
char *new_scheduler_mode;
const char *new_executor_runtime_filter_mode;
char *new_executor_enable_partitioned_hashagg_mode;
char *new_executor_enable_partitioned_hashjoin_mode;
char *new_executor_enable_external_sort_mode;
int new_executor_partitioned_hash_recursive_depth_limit;
/* Upper bound checked by can_convert_common_plan() for TCP interconnect. */
int new_executor_ic_tcp_client_limit_per_query_per_segment;
| |
| int new_interconnect_type; |
| const char *show_new_interconnect_type() { |
| switch (new_interconnect_type) { |
| case INTERCONNECT_TYPE_UDP: |
| return "UDP"; |
| case INTERCONNECT_TYPE_TCP: |
| default: |
| return "TCP"; |
| } |
| } |
| |
| #define MAGMA_MAX_FILESPLIT_NUM 2048 |
| #define MAGMA_MAX_FILESPLIT_NAME 128 |
| |
| static void do_convert_plantree_to_common_plan(Plan *node, int32_t pid, |
| bool isLeft, bool isSubPlan, |
| List *splits, Relation rel, |
| bool insist, |
| CommonPlanContext *ctx); |
| static bool do_convert_targetlist_to_common_plan(Plan *node, |
| CommonPlanContext *ctx); |
| static bool do_convert_quallist_to_common_plan(Plan *node, |
| CommonPlanContext *ctx, |
| bool isInsist); |
| static bool do_convert_indexqual_to_common_plan(Plan *node, |
| CommonPlanContext *ctx, |
| bool isInsist); |
| static bool do_convert_initplan_to_common_plan(Plan *node, |
| CommonPlanContext *ctx); |
| static bool do_convert_hashExpr_to_common_plan(Motion *node, |
| CommonPlanContext *ctx); |
| static void do_convert_onetbl_to_common_plan(Oid relid, CommonPlanContext *ctx); |
| static void do_convert_magma_rangevseg_map_to_common_plan( |
| CommonPlanContext *ctx); |
| static void do_convert_rangetbl_to_common_plan(List *rtable, |
| CommonPlanContext *ctx); |
| static void do_convert_token_map_to_common_plan(CommonPlanContext *ctx); |
| static void do_convert_snapshot_to_common_plan(CommonPlanContext *ctx); |
| static void do_convert_splits_list_to_common_plan(List *splits, Oid relOid, |
| CommonPlanContext *ctx); |
| static void do_convert_splits_to_common_plan(Scan *scan, Oid relOid, |
| CommonPlanContext *ctx); |
| static bool do_convert_expr_to_common_plan(int32_t pid, Expr *expr, |
| CommonPlanContext *ctx); |
| static bool do_convert_limit_to_common_plan(Limit *node, |
| CommonPlanContext *ctx); |
| static bool do_convert_sort_limit_to_common_plan(Sort *node, |
| CommonPlanContext *ctx); |
| static bool do_convert_nestloop_joinqual_to_common_plan(NestLoop *node, |
| CommonPlanContext *ctx); |
| static bool do_convert_hashjoin_clause_to_common_plan(HashJoin *node, |
| CommonPlanContext *ctx); |
| static bool do_convert_mergejoin_clause_to_common_plan(MergeJoin *node, |
| CommonPlanContext *ctx); |
| static bool do_convert_result_qual_to_common_plan(Result *node, |
| CommonPlanContext *ctx); |
| static bool do_convert_subqueryscan_subplan_to_common_plan( |
| SubqueryScan *node, CommonPlanContext *ctx); |
| static Expr *parentExprSwitchTo(Expr *parent, CommonPlanContext *ctx); |
| static void setDummyTListRef(CommonPlanContext *ctx); |
| static void unsetDummyTListRef(CommonPlanContext *ctx); |
| static void getFmtName(char *fmtOptsJson, char **fmtName); |
| static bool checkSupportedTableFormat(Node *node, CommonPlanContext *ctx); |
| static void checkUnsupportedStmt(PlannedStmt *stmt, CommonPlanContext *ctx); |
| static void checkReadStatsOnlyForAgg(Agg *node, CommonPlanContext *ctx); |
| static bool checkSupportedSubLinkType(SubLinkType sublinkType); |
| static bool checkInsertSupportTable(PlannedStmt *stmt); |
| static bool checkIsPrepareQuery(QueryDesc *queryDesc); |
| |
| #define DIRECT_LEFT_CHILD_VAR 0 |
| #define INT64_MAX_LENGTH 20 |
| |
| static bool checkSupportedTableFormat(Node *node, CommonPlanContext *cxt) { |
| if (NULL == node) return false; |
| |
| switch (nodeTag(node)) { |
| case T_MagmaIndexScan: |
| case T_MagmaIndexOnlyScan: |
| case T_ExternalScan: { |
| ExternalScan *n = (ExternalScan *)node; |
| char fmtType = n->fmtType; |
| char *fmtName = NULL; |
| fmtName = getExtTblFormatterTypeInFmtOptsList(n->fmtOpts); |
| |
| if (fmtType == 'b') { |
| if (!pg_strncasecmp(fmtName, ORCTYPE, sizeof(ORCTYPE) - 1)) { |
| cxt->querySelect = true; |
| } |
| if (!pg_strncasecmp(fmtName, MAGMATYPE, sizeof(MAGMATYPE) - 1)) { |
| cxt->querySelect = true; |
| cxt->isMagma = true; |
| cxt->magmaRelIndex = n->scan.scanrelid; |
| } |
| } |
| |
| if (fmtName) pfree(fmtName); |
| break; |
| } |
| case T_AppendOnlyScan: { |
| AppendOnlyScan *n = (AppendOnlyScan *)node; |
| RangeTblEntry *rte = rt_fetch(n->scan.scanrelid, cxt->stmt->rtable); |
| if (RELSTORAGE_ORC == get_rel_relstorage(rte->relid)) { |
| cxt->querySelect = true; |
| break; |
| } else { |
| cxt->convertible = false; |
| return true; |
| } |
| } |
| |
| case T_ParquetScan: { |
| cxt->convertible = false; |
| return true; |
| } |
| default: |
| break; |
| } |
| |
| return plan_tree_walker(node, checkSupportedTableFormat, cxt); |
| } |
| |
| bool can_convert_common_plan(QueryDesc *queryDesc, CommonPlanContext *ctx) { |
| PlannedStmt *stmt = queryDesc ? queryDesc->plannedstmt : NULL; |
| // disable for cursor and bind message |
| if (!queryDesc || queryDesc->extended_query) return false; |
| |
| // Disable new executor when too many TCP connection. |
| // Here it considers only the TCP client number of the root plan, regardless |
| // of its subplan and initplan. And GATHER MOTION are also considered as fully |
| // connected layer, as that in REDISTRIBUTE and BROADCAST MOTION. |
| int ic_tcp_client_num_per_segment = queryDesc->planner_segments * |
| queryDesc->planner_segments * |
| stmt->nMotionNodes / slaveHostNumber; |
| if (ic_tcp_client_num_per_segment > |
| new_executor_ic_tcp_client_limit_per_query_per_segment && |
| new_interconnect_type == INTERCONNECT_TYPE_TCP) { |
| if (!queryDesc->newPlanForceAuto && |
| pg_strcasecmp(new_executor_mode, new_executor_mode_on) == 0) |
| elog( |
| ERROR, |
| "Exceeding new executor TCP client limit per query per segment, %d > " |
| "%d, please set new_executor=auto/off to fall back to old executor", |
| ic_tcp_client_num_per_segment, |
| new_executor_ic_tcp_client_limit_per_query_per_segment); |
| return false; |
| } |
| |
| Assert(stmt->planTree); |
| planner_init_common_plan_context(stmt, ctx); |
| |
| // Fix issue 817 |
| if (checkIsPrepareQuery(queryDesc)) goto end; |
| |
| stmt->planner_segments = queryDesc->planner_segments; |
| stmt->originNodeType = queryDesc->originNodeType; |
| convert_to_common_plan(stmt, ctx); |
| |
| if (!ctx->convertible) goto end; |
| |
| convert_querydesc_to_common_plan(queryDesc, ctx); |
| |
| if (!ctx->convertible) goto end; |
| |
| return true; |
| |
| end: |
| planner_destroy_common_plan_context(ctx, !queryDesc->newPlanForceAuto); |
| return false; |
| } |
| |
| void convert_extscan_to_common_plan(Plan *node, List *splits, Relation rel, |
| CommonPlanContext *ctx) { |
| // only covert the plan node of extscan |
| if (ctx->convertible) |
| do_convert_plantree_to_common_plan(node, -1, true, false, splits, rel, |
| false, ctx); |
| if (ctx->convertible) { |
| do_convert_onetbl_to_common_plan(rel->rd_id, ctx); |
| } |
| } |
| |
| void *convert_orcscan_qual_to_common_plan(Plan *node, CommonPlanContext *ctx) { |
| planner_init_common_plan_context(NULL, ctx); |
| univPlanSeqScanNewInstance(ctx->univplan, -1); |
| do_convert_quallist_to_common_plan(node, ctx, false); |
| return univPlanGetQualList(ctx->univplan); |
| } |
| |
| // cal the range num |
| void convert_rangenum_to_common_plan(PlannedStmt *stmt, |
| CommonPlanContext *ctx) { |
| ListCell *lc_split = NULL; |
| foreach (lc_split, stmt->scantable_splits) { |
| List *split = (List *)lfirst(lc_split); |
| ctx->rangeNum += list_length(split); |
| } |
| } |
| |
| void convert_to_common_plan(PlannedStmt *stmt, CommonPlanContext *ctx) { |
| checkUnsupportedStmt(stmt, ctx); |
| |
| if (ctx->convertible) |
| checkSupportedTableFormat((Node *)stmt->planTree, ctx); |
| |
| if (ctx->convertible) { |
| int32_t pid = -1; |
| if (stmt->commandType == CMD_INSERT) { |
| pid = univPlanInsertNewInstance(ctx->univplan, pid); |
| univPlanInsertSetRelId(ctx->univplan, |
| list_nth_int(stmt->resultRelations, 0)); |
| /* deal with magma insert info */ |
| /* 1. get the reloid */ |
| int32_t index = list_nth_int(stmt->resultRelations, 0); |
| RangeTblEntry *rte = (RangeTblEntry *)list_nth(stmt->rtable, index - 1); |
| Oid oid = rte->relid; |
| if (dataStoredInMagmaByOid(oid)) { |
| ctx->isMagma = true; |
| ctx->magmaRelIndex = index; |
| /* 2. get magma splits */ |
| List *magma_splits = |
| GetFileSplitsOfSegmentMagma(stmt->scantable_splits, oid); |
| /* 3. prepare hash info */ |
| int32_t nDistKeyIndex = 0; |
| int16_t *distKeyIndex = NULL; |
| fetchDistributionPolicy(oid, &nDistKeyIndex, &distKeyIndex); |
| /* the number of ranges is dynamic for magma table */ |
| int32_t nRanges = 0; |
| ListCell *lc_split = NULL; |
| foreach (lc_split, magma_splits) { |
| List *split = (List *)lfirst(lc_split); |
| nRanges += list_length(split); |
| } |
| uint32 range_to_rg_map[nRanges]; |
| List *rg = magma_build_range_to_rg_map(magma_splits, range_to_rg_map); |
| int nRg = list_length(rg); |
| uint16 *rgId = palloc0(sizeof(uint16) * nRg); |
| char **rgUrl = palloc0(sizeof(char *) * nRg); |
| magma_build_rg_to_url_map(magma_splits, rg, rgId, rgUrl); |
| univPlanInsertSetHasher(ctx->univplan, nDistKeyIndex, distKeyIndex, |
| nRanges, range_to_rg_map, nRg, rgId, rgUrl); |
| pfree(rgId); |
| pfree(rgUrl); |
| } |
| univPlanAddToPlanNode(ctx->univplan, true); |
| } |
| do_convert_plantree_to_common_plan(stmt->planTree, pid, true, false, NIL, |
| NULL, true, ctx); |
| do_convert_magma_rangevseg_map_to_common_plan(ctx); |
| } |
| |
| ListCell *lc; |
| foreach (lc, stmt->subplans) { |
| Plan *subplan = (Plan *)lfirst(lc); |
| if (ctx->convertible) |
| do_convert_plantree_to_common_plan(subplan, -1, true, true, NIL, NULL, |
| true, ctx); |
| } |
| if (ctx->convertible) do_convert_rangetbl_to_common_plan(stmt->rtable, ctx); |
| if (ctx->convertible && enable_secure_filesystem) |
| do_convert_token_map_to_common_plan(ctx); |
| if (ctx->convertible && ctx->isMagma) do_convert_snapshot_to_common_plan(ctx); |
| } |
| |
| void planner_init_common_plan_context(PlannedStmt *stmt, |
| CommonPlanContext *ctx) { |
| ctx->univplan = univPlanNewInstance(); |
| ctx->convertible = |
| pg_strcasecmp(new_executor_mode, new_executor_mode_off) != 0 ? true |
| : false; |
| ctx->enforceNewScheduler = |
| pg_strcasecmp(new_scheduler_mode, new_scheduler_mode_on) == 0 ? true |
| : false; |
| ctx->base.node = (Node *)stmt; |
| ctx->querySelect = false; |
| ctx->isMagma = false; |
| ctx->stmt = stmt; |
| ctx->setDummyTListRef = false; |
| ctx->scanReadStatsOnly = false; |
| ctx->parent = NULL; |
| ctx->exprBufStack = NIL; |
| ctx->rangeNum = 0; |
| } |
| |
| void planner_destroy_common_plan_context(CommonPlanContext *ctx, bool enforce) { |
| bool succeed = ctx->convertible; |
| univPlanFreeInstance(&ctx->univplan); |
| // only enforce query statement |
| if (enforce && ctx->querySelect && !succeed && |
| pg_strcasecmp(new_executor_mode, new_executor_mode_on) == 0) |
| elog(ERROR, |
| "New executor not supported yet, please set new_executor=auto/off to " |
| "fall back to old executor"); |
| } |
| |
| void get_all_stageno_from_plantree(Plan *node, int32_t *stageNo, |
| int32_t *stageNum, bool *isInitPlan) { |
| if (node == NULL) return; |
| |
| switch (nodeTag(node)) { |
| case T_Motion: { |
| Motion *m = (Motion *)node; |
| stageNo[*stageNum] = m->motionID; |
| (*stageNum)++; |
| break; |
| } |
| case T_SubqueryScan: { |
| SubqueryScan *subqueryscan = (SubqueryScan *)node; |
| get_all_stageno_from_plantree(subqueryscan->subplan, stageNo, stageNum, |
| isInitPlan); |
| } |
| } |
| if (node->initPlan) { |
| ListCell *lc; |
| foreach (lc, node->initPlan) { |
| SubPlan *initplan = (SubPlan *)lfirst(lc); |
| isInitPlan[initplan->plan_id - 1] = true; |
| } |
| } |
| get_all_stageno_from_plantree(node->lefttree, stageNo, stageNum, isInitPlan); |
| get_all_stageno_from_plantree(node->righttree, stageNo, stageNum, isInitPlan); |
| } |
| |
| void do_convert_plantree_to_common_plan(Plan *node, int32_t pid, bool isLeft, |
| bool isSubPlan, List *splits, |
| Relation rel, bool insist, |
| CommonPlanContext *ctx) { |
| if (node == NULL || !ctx->convertible) return; |
| int32_t uid; |
| |
| switch (nodeTag(node)) { |
| case T_Motion: { |
| Motion *m = (Motion *)node; |
| ConnectorType connType; |
| if (m->motionType == MOTIONTYPE_HASH) { |
| connType = UnivPlanShuffle; |
| } else if (m->motionType == MOTIONTYPE_FIXED) { |
| if (m->numOutputSegs == 0) |
| connType = UnivPlanBroadcast; |
| else |
| connType = UnivPlanConverge; |
| } else { |
| goto end; |
| } |
| uid = univPlanConnectorNewInstance(ctx->univplan, pid); |
| univPlanConnectorSetType(ctx->univplan, connType); |
| univPlanConnectorSetStageNo(ctx->univplan, m->motionID); |
| if (m->numSortCols > 0) { |
| int32_t *mappingSortFuncId = palloc(m->numSortCols * sizeof(int32_t)); |
| int32_t *colIdx = palloc(m->numSortCols * sizeof(int32_t)); |
| for (int i = 0; i < m->numSortCols; i++) { |
| mappingSortFuncId[i] = |
| HAWQ_FUNCOID_MAPPING(get_opcode(m->sortOperators[i])); |
| if (IS_HAWQ_MAPPING_FUNCID_INVALID(mappingSortFuncId[i])) goto end; |
| colIdx[i] = m->sortColIdx[i]; |
| } |
| univPlanConnectorSetColIdx(ctx->univplan, m->numSortCols, colIdx); |
| univPlanConnectorSetSortFuncId(ctx->univplan, m->numSortCols, |
| mappingSortFuncId); |
| pfree(mappingSortFuncId); |
| pfree(colIdx); |
| } |
| if (m->plan.directDispatch.isDirectDispatch) { |
| List *contentIds = m->plan.directDispatch.contentIds; |
| Assert(list_length(contentIds) == 1); |
| univPlanConnectorSetDirectDispatchId(ctx->univplan, |
| linitial_int(contentIds)); |
| } |
| if (connType == UnivPlanShuffle) { |
| if (!do_convert_hashExpr_to_common_plan(node, ctx)) goto end; |
| } |
| setDummyTListRef(ctx); |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| unsetDummyTListRef(ctx); |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| break; |
| } |
| case T_MagmaIndexScan: |
| case T_MagmaIndexOnlyScan: |
| case T_ExternalScan: { |
| ExternalScan *n = (ExternalScan *)node; |
| // currently we support orc and magma format |
| char fmtType = n->fmtType; |
| char *fmtName = NULL; |
| bool magmaTable = false; |
| fmtName = getExtTblFormatterTypeInFmtOptsList(n->fmtOpts); |
| // For orc and magma table have different infos in scan node |
| if (fmtName) { |
| if (!pg_strncasecmp(fmtName, MAGMATYPE, sizeof(MAGMATYPE) - 1)) { |
| magmaTable = true; |
| } |
| } |
| |
| if (fmtType != 'b' || |
| (pg_strncasecmp(fmtName, ORCTYPE, sizeof(ORCTYPE) - 1) && |
| pg_strncasecmp(fmtName, MAGMATYPE, sizeof(MAGMATYPE) - 1))) { |
| if (fmtName) pfree(fmtName); |
| goto end; |
| } |
| |
| if (fmtName) pfree(fmtName); |
| |
| ListCell *lc; |
| foreach (lc, n->uriList) { |
| char *url = (char *)strVal(lfirst(lc)); |
| Uri *uri = ParseExternalTableUri(url); |
| if (uri == NULL || |
| (uri->protocol != URI_HDFS && uri->protocol != URI_MAGMA)) { |
| goto end; |
| } |
| } |
| // calculate columns to read for seqscan |
| int32_t numColsToRead = 0; |
| Plan *plan = (Plan *)&((Scan *)node)->plan; |
| Oid relOid; |
| // scan magma table in old executor |
| if (magmaTable && rel != NULL) { |
| relOid = rel->rd_id; |
| } else { |
| // scan magma table in new executor |
| relOid = getrelid(((Scan *)node)->scanrelid, ctx->stmt->rtable); |
| } |
| int32_t numCols = get_relnatts(relOid); |
| bool *proj = (bool *)palloc0(sizeof(bool) * numCols); |
| GetNeededColumnsForScan((Node *)plan->targetlist, proj, numCols); |
| GetNeededColumnsForScan((Node *)plan->qual, proj, numCols); |
| |
| // if (magmaTable) { |
| // int32_t i = 0; |
| // for (; i < numCols; ++i) { |
| // if (proj[i]) break; |
| // } |
| // if (i == numCols) proj[0] = true; |
| // } |
| |
| for (int32_t i = 0; i < numCols; i++) { |
| if (proj[i]) numColsToRead++; |
| } |
| |
| int32_t *columnsToRead = palloc(numColsToRead * sizeof(int32_t)); |
| int32_t index = 0; |
| for (int32_t i = 0; i < numCols; i++) { |
| if (proj[i]) columnsToRead[index++] = i + 1; |
| } |
| // This branch deal with magma table |
| if (magmaTable) { |
| uid = univPlanExtScanNewInstance(ctx->univplan, pid); |
| if (node->type != T_ExternalScan) { |
| univPlanExtScanSetIndex(ctx->univplan, true); |
| switch (node->type) { |
| case T_MagmaIndexScan: |
| univPlanExtScanSetScanType(ctx->univplan, ExternalIndexScan); |
| break; |
| case T_MagmaIndexOnlyScan: |
| univPlanExtScanSetScanType(ctx->univplan, ExternalIndexOnlyScan); |
| break; |
| default: |
| elog(ERROR, "unknown external scan type."); |
| break; |
| } |
| univPlanExtScanDirection(ctx->univplan, |
| ((ExternalScan *)node)->indexorderdir); |
| univPlanExtScanSetIndexName(ctx->univplan, |
| ((ExternalScan *)node)->indexname); |
| if (!do_convert_indexqual_to_common_plan(node, ctx, insist)) goto end; |
| } else { |
| univPlanExtScanSetScanType(ctx->univplan, NormalExternalScan); |
| } |
| univPlanExtScanSetRelId(ctx->univplan, ((Scan *)node)->scanrelid); |
| univPlanExtScanSetReadStatsOnly(ctx->univplan, ctx->scanReadStatsOnly); |
| if (columnsToRead) |
| univPlanExtScanSetColumnsToRead(ctx->univplan, numColsToRead, |
| columnsToRead); |
| // TODO(xsheng) cannot convert some TARGETENTRY to univplan because |
| // some expression types are not supported by universal plan. |
| // e.g. (composite type field) |
| // update t_boxes set tp.len = (tp).len+1 where id = 2; |
| // currently we can support convert composite type(e.g. tp) but we |
| // cannot convert composite type field(e.g. tp.len) |
| // we won't use target list in the plan post-processing, comment it now |
| if (splits == NIL && ctx->stmt != NULL) { // do it for new executor |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| } |
| // if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, insist)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| if (splits != NULL && ctx->stmt == NULL) { |
| // old executor, only convert magma external scan plan |
| do_convert_splits_list_to_common_plan(splits, relOid, ctx); |
| } else if (splits == NIL && ctx->stmt != NULL) { |
| // new executor, convert whole plan |
| do_convert_splits_to_common_plan((Scan *)node, relOid, ctx); |
| } |
| } else if (!magmaTable) { // This branch deal with orc table |
| uid = univPlanSeqScanNewInstance(ctx->univplan, pid); |
| univPlanSeqScanSetRelId(ctx->univplan, ((Scan *)node)->scanrelid); |
| univPlanSeqScanSetReadStatsOnly(ctx->univplan, ctx->scanReadStatsOnly); |
| if (columnsToRead) |
| univPlanSeqScanSetColumnsToRead(ctx->univplan, numColsToRead, |
| columnsToRead); |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| do_convert_splits_to_common_plan((Scan *)node, relOid, ctx); |
| } else { |
| goto end; |
| } |
| break; |
| } |
| case T_AppendOnlyScan: { |
| int32_t numColsToRead = 0; |
| Plan *plan = (Plan *)&((Scan *)node)->plan; |
| Oid relOid = getrelid(((Scan *)node)->scanrelid, ctx->stmt->rtable); |
| int32_t numCols = get_relnatts(relOid); |
| bool *proj = (bool *)palloc0(sizeof(bool) * numCols); |
| GetNeededColumnsForScan((Node *)plan->targetlist, proj, numCols); |
| GetNeededColumnsForScan((Node *)plan->qual, proj, numCols); |
| |
| for (int32_t i = 0; i < numCols; i++) { |
| if (proj[i]) numColsToRead++; |
| } |
| |
| int32_t *columnsToRead = palloc(numColsToRead * sizeof(int32_t)); |
| int32_t index = 0; |
| for (int32_t i = 0; i < numCols; i++) { |
| if (proj[i]) columnsToRead[index++] = i + 1; |
| } |
| uid = univPlanSeqScanNewInstance(ctx->univplan, pid); |
| univPlanSeqScanSetRelId(ctx->univplan, ((Scan *)node)->scanrelid); |
| univPlanSeqScanSetReadStatsOnly(ctx->univplan, ctx->scanReadStatsOnly); |
| if (columnsToRead) |
| univPlanSeqScanSetColumnsToRead(ctx->univplan, numColsToRead, |
| columnsToRead); |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| do_convert_splits_to_common_plan((Scan *)node, relOid, ctx); |
| break; |
| } |
| case T_Agg: { |
| Agg *agg = (Agg *)node; |
| |
| uid = univPlanAggNewInstance(ctx->univplan, pid); |
| int64_t numCols = agg->numCols; |
| int32_t *grpColIdx = palloc(numCols * sizeof(int32_t)); |
| for (int i = 0; i < numCols; ++i) grpColIdx[i] = agg->grpColIdx[i]; |
| univPlanAggSetNumGroupsAndGroupColIndexes(ctx->univplan, agg->numGroups, |
| numCols, grpColIdx); |
| univPlanAggSetAggstrategy(ctx->univplan, agg->aggstrategy); |
| univPlanAggSetRollup(ctx->univplan, agg->numNullCols, agg->inputGrouping, |
| agg->grouping, agg->rollupGSTimes, |
| agg->inputHasGrouping, agg->lastAgg, agg->streaming); |
| pfree(grpColIdx); |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| if (!isSubPlan) checkReadStatsOnlyForAgg(agg, ctx); |
| break; |
| } |
| case T_Sort: { |
| Sort *sort = (Sort *)node; |
| uid = univPlanSortNewInstance(ctx->univplan, pid); |
| int32_t *mappingSortFuncId = palloc(sort->numCols * sizeof(int32_t)); |
| int32_t *colIdx = palloc(sort->numCols * sizeof(int32_t)); |
| for (int i = 0; i < sort->numCols; i++) { |
| mappingSortFuncId[i] = |
| HAWQ_FUNCOID_MAPPING(get_opcode(sort->sortOperators[i])); |
| if (IS_HAWQ_MAPPING_FUNCID_INVALID(mappingSortFuncId[i])) goto end; |
| colIdx[i] = sort->sortColIdx[i]; |
| } |
| univPlanSortSetColIdx(ctx->univplan, sort->numCols, colIdx); |
| univPlanSortSetSortFuncId(ctx->univplan, sort->numCols, |
| mappingSortFuncId); |
| univPlanSortSetNoDuplicates(ctx->univplan, sort->noduplicates); |
| pfree(mappingSortFuncId); |
| pfree(colIdx); |
| if (!do_convert_sort_limit_to_common_plan(sort, ctx)) goto end; |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| break; |
| } |
| case T_Limit: { |
| Limit *limit = (Limit *)node; |
| uid = univPlanLimitNewInstance(ctx->univplan, pid); |
| if (!do_convert_limit_to_common_plan(limit, ctx)) goto end; |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| break; |
| } |
| case T_Append: { |
| Append *append = (Append *)node; |
| if (append->isTarget || append->plan.qual) goto end; |
| uid = univPlanAppendNewInstance(ctx->univplan, pid); |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| break; |
| } |
| case T_NestLoop: { |
| if (pg_strcasecmp(enable_alpha_newqe_str, "OFF") == 0) goto end; |
| NestLoop *nl = (NestLoop *)node; |
| if (nl->outernotreferencedbyinner || nl->shared_outer || |
| nl->singleton_outer) |
| goto end; |
| uid = univPlanNestLoopNewInstance(ctx->univplan, pid); |
| if (!univPlanNestLoopSetType(ctx->univplan, |
| (UnivPlanCJoinType)nl->join.jointype)) |
| goto end; |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_nestloop_joinqual_to_common_plan(nl, ctx)) goto end; |
| break; |
| } |
| case T_HashJoin: { |
| if (pg_strcasecmp(enable_alpha_newqe_str, "OFF") == 0) goto end; |
| HashJoin *hj = (HashJoin *)node; |
| uid = univPlanHashJoinNewInstance(ctx->univplan, pid); |
| if (!univPlanHashJoinSetType(ctx->univplan, |
| (UnivPlanCJoinType)hj->join.jointype)) |
| goto end; |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_hashjoin_clause_to_common_plan(hj, ctx)) goto end; |
| break; |
| } |
| case T_Hash: { |
| uid = univPlanHashNewInstance(ctx->univplan, pid); |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| break; |
| } |
| case T_MergeJoin: { |
| MergeJoin *mj = (MergeJoin *)node; |
| uid = univPlanMergeJoinNewInstance(ctx->univplan, pid); |
| univPlanMergeJoinSetUniqueOuter(ctx->univplan, mj->unique_outer); |
| if (!univPlanMergeJoinSetType(ctx->univplan, |
| (UnivPlanCJoinType)mj->join.jointype)) |
| goto end; |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_mergejoin_clause_to_common_plan(mj, ctx)) goto end; |
| break; |
| } |
| case T_Material: { |
| Material *material = (Material *)node; |
| uid = univPlanMaterialNewInstance(ctx->univplan, pid); |
| if (!univPlanMaterialSetAttr( |
| ctx->univplan, (UnivPlanCShareType)material->share_type, |
| material->cdb_strict, material->share_id, material->driver_slice, |
| material->nsharer, material->nsharer_xslice)) |
| goto end; |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| break; |
| } |
| case T_ShareInputScan: { |
| ShareInputScan *shareInputScan = (ShareInputScan *)node; |
| uid = univPlanShareInputScanNewInstance(ctx->univplan, pid); |
| if (!univPlanShareInputScanSetAttr( |
| ctx->univplan, (UnivPlanCShareType)shareInputScan->share_type, |
| shareInputScan->share_id, shareInputScan->driver_slice)) |
| goto end; |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| break; |
| } |
| case T_Result: { |
| Result *result = (Result *)node; |
| if (result->hashFilter) goto end; |
| uid = univPlanResultNewInstance(ctx->univplan, pid); |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_result_qual_to_common_plan(result, ctx)) goto end; |
| break; |
| } |
| case T_SubqueryScan: { |
| SubqueryScan *subqueryscan = (SubqueryScan *)node; |
| uid = univPlanSubqueryScanNewInstance(ctx->univplan, pid); |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_subqueryscan_subplan_to_common_plan(subqueryscan, ctx)) |
| goto end; |
| break; |
| } |
| case T_Unique: { |
| Unique *uniq = (Unique *)node; |
| uid = univPlanUniqueNewInstance(ctx->univplan, pid); |
| int64_t numCols = uniq->numCols; |
| int32_t *uniqColIdx = palloc(numCols * sizeof(int32_t)); |
| for (int i = 0; i < numCols; ++i) uniqColIdx[i] = uniq->uniqColIdx[i]; |
| univPlanUniqueSetNumGroupsAndUniqColIdxs(ctx->univplan, uniq->numCols, |
| uniqColIdx); |
| pfree(uniqColIdx); |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| break; |
| } |
| case T_SetOp: { |
| SetOp *setop = (SetOp *)node; |
| uid = univPlanSetOpNewInstance(ctx->univplan, pid); |
| if (!univPlanSetOpSetAttr(ctx->univplan, setop->cmd, setop->numCols, |
| setop->dupColIdx, setop->flagColIdx)) |
| goto end; |
| if (!do_convert_targetlist_to_common_plan(node, ctx)) goto end; |
| if (!do_convert_quallist_to_common_plan(node, ctx, true)) goto end; |
| if (!do_convert_initplan_to_common_plan(node, ctx)) goto end; |
| break; |
| } |
| default: // plannode not supported yet |
| goto end; |
| } |
| |
| univPlanSetPlanNodeInfo(ctx->univplan, node->plan_rows, node->plan_width, |
| node->operatorMemKB); |
| univPlanAddToPlanNode(ctx->univplan, isLeft); |
| do_convert_plantree_to_common_plan(node->lefttree, uid, true, isSubPlan, |
| splits, rel, insist, ctx); |
| do_convert_plantree_to_common_plan(node->righttree, uid, false, isSubPlan, |
| splits, rel, insist, ctx); |
| if (node->type == T_Append) { |
| Append *append = (Append *)node; |
| ListCell *lc; |
| foreach (lc, append->appendplans) { |
| Plan *planNode = (Plan *)lfirst(lc); |
| do_convert_plantree_to_common_plan(planNode, uid, true, isSubPlan, splits, |
| rel, insist, ctx); |
| } |
| } |
| |
| return; |
| |
| end: |
| ctx->convertible = false; |
| elog(DEBUG1, "New Executor not support plan node of %s", nodeToString(node)); |
| return; |
| } |
| |
| bool do_convert_targetlist_to_common_plan(Plan *node, CommonPlanContext *ctx) { |
| ListCell *lc; |
| foreach (lc, node->targetlist) { |
| TargetEntry *te = (TargetEntry *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, (Expr *)te, ctx)) return false; |
| univPlanTargetListAddTargetEntry(ctx->univplan, te->resjunk); |
| } |
| return true; |
| } |
| |
| /* |
| * param isInsist indicates whether insist convert quallist if there are |
| * unsupported expr type. if true then return false for unsuppported type |
| * otherwise convert "all" quallists that supported |
| * return true if all the quallist node is converted |
| */ |
| bool do_convert_quallist_to_common_plan(Plan *node, CommonPlanContext *ctx, |
| bool isInsist) { |
| ListCell *lc; |
| foreach (lc, node->qual) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| bool convert_ret = do_convert_expr_to_common_plan(-1, expr, ctx); |
| if (!convert_ret && isInsist) |
| return false; |
| else if (!convert_ret && !isInsist) |
| continue; |
| else if (convert_ret) |
| univPlanQualListAddExpr(ctx->univplan); |
| } |
| return true; |
| } |
| |
| static bool do_convert_indexqual_to_common_plan(Plan *node, |
| CommonPlanContext *ctx, |
| bool isInsist) { |
| ListCell *lc; |
| foreach (lc, ((ExternalScan *)node)->indexqualorig) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| bool convert_ret = do_convert_expr_to_common_plan(-1, expr, ctx); |
| if (!convert_ret && isInsist) |
| return false; |
| else if (!convert_ret && !isInsist) |
| continue; |
| else if (convert_ret) |
| univPlanIndexQualListAddExpr(ctx->univplan); |
| } |
| return true; |
| } |
| |
| bool do_convert_initplan_to_common_plan(Plan *node, CommonPlanContext *ctx) { |
| ListCell *lc; |
| foreach (lc, node->initPlan) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| bool convert_ret = do_convert_expr_to_common_plan(-1, expr, ctx); |
| if (!convert_ret) return false; |
| univPlanInitplanAddExpr(ctx->univplan); |
| } |
| return true; |
| } |
| |
| bool do_convert_hashExpr_to_common_plan(Motion *node, CommonPlanContext *ctx) { |
| ListCell *lc; |
| foreach (lc, node->hashExpr) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, expr, ctx)) return false; |
| univPlanConnectorAddHashExpr(ctx->univplan); |
| } |
| return true; |
| } |
| |
| void do_convert_magma_rangevseg_map_to_common_plan(CommonPlanContext *ctx) { |
| int *map = NULL; |
| int nmap = 0; |
| get_magma_range_vseg_map(&map, &nmap, ctx->stmt->planner_segments); |
| univPlanSetRangeVsegMap(ctx->univplan, map, nmap); |
| } |
| |
| bool do_convert_limit_to_common_plan(Limit *node, CommonPlanContext *ctx) { |
| if (node->limitOffset) { |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, (Expr *)node->limitOffset, ctx)) |
| return false; |
| univPlanLimitAddLimitOffset(ctx->univplan); |
| } |
| if (node->limitCount) { |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, (Expr *)node->limitCount, ctx)) |
| return false; |
| univPlanLimitAddLimitCount(ctx->univplan); |
| } |
| return true; |
| } |
| |
| bool do_convert_sort_limit_to_common_plan(Sort *node, CommonPlanContext *ctx) { |
| if (node->limitOffset) { |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, (Expr *)node->limitOffset, ctx)) |
| return false; |
| univPlanSortAddLimitOffset(ctx->univplan); |
| } |
| if (node->limitCount) { |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, (Expr *)node->limitCount, ctx)) |
| return false; |
| univPlanSortAddLimitCount(ctx->univplan); |
| } |
| return true; |
| } |
| |
| bool do_convert_nestloop_joinqual_to_common_plan(NestLoop *node, |
| CommonPlanContext *ctx) { |
| ListCell *lc; |
| foreach (lc, node->join.joinqual) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, expr, ctx)) return false; |
| univPlanNestLoopAddJoinQual(ctx->univplan); |
| } |
| return true; |
| } |
| |
| bool do_convert_hashjoin_clause_to_common_plan(HashJoin *node, |
| CommonPlanContext *ctx) { |
| ListCell *lc; |
| |
| foreach (lc, node->join.joinqual) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, expr, ctx)) return false; |
| univPlanHashJoinAddJoinQual(ctx->univplan); |
| } |
| |
| foreach (lc, node->hashclauses) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, expr, ctx)) return false; |
| univPlanHashJoinAddHashClause(ctx->univplan); |
| } |
| |
| foreach (lc, node->hashqualclauses) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, expr, ctx)) return false; |
| univPlanHashJoinAddHashQualClause(ctx->univplan); |
| } |
| return true; |
| } |
| |
| bool do_convert_mergejoin_clause_to_common_plan(MergeJoin *node, |
| CommonPlanContext *ctx) { |
| ListCell *lc; |
| foreach (lc, node->join.joinqual) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, expr, ctx)) return false; |
| univPlanMergeJoinAddJoinQual(ctx->univplan); |
| } |
| foreach (lc, node->mergeclauses) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, expr, ctx)) return false; |
| univPlanMergeJoinAddMergeClause(ctx->univplan); |
| } |
| return true; |
| } |
| |
| bool do_convert_result_qual_to_common_plan(Result *node, |
| CommonPlanContext *ctx) { |
| ListCell *lc; |
| foreach (lc, (List *)node->resconstantqual) { |
| Expr *expr = (Expr *)lfirst(lc); |
| univPlanNewExpr(ctx->univplan); |
| if (!do_convert_expr_to_common_plan(-1, expr, ctx)) return false; |
| univPlanResultAddResConstantQual(ctx->univplan); |
| } |
| return true; |
| } |
| |
| bool do_convert_subqueryscan_subplan_to_common_plan(SubqueryScan *node, |
| CommonPlanContext *ctx) { |
| univPlanNewSubPlanNode(ctx->univplan); |
| do_convert_plantree_to_common_plan(node->subplan, -1, true, false, NIL, NULL, |
| true, ctx); |
| univPlanSubqueryScanAddSubPlan(ctx->univplan); |
| univPlanFreeSubPlanNode(ctx->univplan); |
| return ctx->convertible; |
| } |
| |
| void do_convert_splits_to_common_plan(Scan *scan, Oid relOid, |
| CommonPlanContext *ctx) { |
| ListCell *lc1; |
| foreach (lc1, ctx->stmt->scantable_splits) { |
| SegFileSplitMap map = (SegFileSplitMapNode *)lfirst(lc1); |
| if (map->relid == relOid) { |
| List *relSplits = map->splits; |
| ListCell *lc2; |
| foreach (lc2, relSplits) { |
| List *taskSplits = (List *)lfirst(lc2); |
| do_convert_splits_list_to_common_plan(taskSplits, relOid, ctx); |
| } |
| return; |
| } |
| } |
| } |
| |
| void do_convert_splits_list_to_common_plan(List *splits, Oid relOid, |
| CommonPlanContext *ctx) { |
| uint32_t fileSplitNum = (splits == NIL) ? 0 : splits->length; |
| elog(DEBUG1, "SplitNum: %u", fileSplitNum); |
| char relStorage = get_rel_relstorage(relOid); |
| if (relStorage == RELSTORAGE_ORC) { |
| if (splits == NIL) { |
| univPlanSeqScanAddTaskWithFileSplits(false, ctx->univplan, 0, NULL, NULL, |
| NULL, NULL, NULL, NULL); |
| return; |
| } |
| |
| int64_t *start = palloc0(fileSplitNum * sizeof(int64_t)); |
| int64_t *len = palloc0(fileSplitNum * sizeof(int64_t)); |
| int64_t *logicEof = palloc0(fileSplitNum * sizeof(int64_t)); |
| Relation rel = heap_open(relOid, NoLock); |
| int32 filePathMaxLen = AOSegmentFilePathNameLen(rel) + 1; |
| char **fileName = palloc0(fileSplitNum * sizeof(char *)); |
| int index = 0; |
| int dummySegno; |
| ListCell *lc = NULL; |
| foreach (lc, splits) { |
| FileSplitNode *filesplit = (FileSplitNode *)lfirst(lc); |
| start[index] = filesplit->offsets; |
| len[index] = filesplit->lengths; |
| logicEof[index] = filesplit->logiceof; |
| fileName[index] = palloc0(filePathMaxLen); |
| MakeAOSegmentFileName(rel, filesplit->segno, -1, &dummySegno, |
| fileName[index]); |
| ++index; |
| elog(DEBUG1, "segno: %d, offset %d, length %d, logic eof %d", |
| filesplit->segno, filesplit->offsets, filesplit->lengths, |
| filesplit->logiceof); |
| } |
| heap_close(rel, NoLock); |
| univPlanSeqScanAddTaskWithFileSplits(false, ctx->univplan, fileSplitNum, |
| (const char **)fileName, start, len, |
| logicEof, NULL, NULL); |
| |
| for (index = 0; index < fileSplitNum; ++index) |
| if (fileName[index]) pfree(fileName[index]); |
| pfree(fileName); |
| pfree(start); |
| pfree(len); |
| pfree(logicEof); |
| } else { |
| ExtTableEntry *extEntry = GetExtTableEntry(relOid); |
| Uri *uri = ParseExternalTableUri(strVal(linitial(extEntry->locations))); |
| |
| ListCell *lc = NULL; |
| int index = 0; |
| int fileNameLen; |
| if (is_hdfs_protocol(uri)) { |
| if (splits == NIL) { |
| univPlanSeqScanAddTaskWithFileSplits(false, ctx->univplan, 0, NULL, |
| NULL, NULL, NULL, NULL, NULL); |
| FreeExternalTableUri(uri); |
| return; |
| } |
| char **fileName = palloc0(fileSplitNum * sizeof(char *)); |
| int64_t *start = palloc0(fileSplitNum * sizeof(int64_t)); |
| int64_t *len = palloc0(fileSplitNum * sizeof(int64_t)); |
| |
| foreach (lc, splits) { |
| FileSplitNode *filesplit = (FileSplitNode *)lfirst(lc); |
| |
| start[index] = filesplit->offsets; |
| len[index] = filesplit->lengths; |
| |
| fileNameLen = |
| strlen(uri->hostname) + strlen(filesplit->ext_file_uri_string) + 15; |
| fileName[index] = palloc(sizeof(char) * fileNameLen); |
| sprintf(fileName[index], "hdfs://%s:%d%s", uri->hostname, uri->port, |
| filesplit->ext_file_uri_string); |
| |
| ++index; |
| elog(DEBUG1, "SplitInfo: %s, offset %d, length %d", |
| filesplit->ext_file_uri_string, filesplit->offsets, |
| filesplit->lengths); |
| } |
| |
| univPlanSeqScanAddTaskWithFileSplits(false, ctx->univplan, fileSplitNum, |
| (const char **)fileName, start, len, |
| NULL, NULL, NULL); |
| |
| for (index = 0; index < fileSplitNum; ++index) { |
| if (fileName[index]) { |
| pfree(fileName[index]); |
| } |
| } |
| pfree(fileName); |
| pfree(start); |
| pfree(len); |
| } else if (is_magma_protocol(uri)) { |
| if (splits == NIL) { |
| univPlanSeqScanAddTaskWithFileSplits(true, ctx->univplan, 0, NULL, NULL, |
| NULL, NULL, NULL, NULL); |
| FreeExternalTableUri(uri); |
| return; |
| } |
| if (fileSplitNum > MAGMA_MAX_FILESPLIT_NUM) { |
| elog(ERROR, "File split number for table is %u, which exceeds %u", |
| fileSplitNum, MAGMA_MAX_FILESPLIT_NUM); |
| } |
| |
| char *fileName[MAGMA_MAX_FILESPLIT_NUM]; |
| int64_t start[MAGMA_MAX_FILESPLIT_NUM]; |
| int64_t len[MAGMA_MAX_FILESPLIT_NUM]; |
| int32_t rangeId[MAGMA_MAX_FILESPLIT_NUM]; |
| int32_t replicaGroupId[MAGMA_MAX_FILESPLIT_NUM]; |
| |
| foreach (lc, splits) { |
| FileSplitNode *filesplit = (FileSplitNode *)lfirst(lc); |
| |
| start[index] = filesplit->offsets; |
| len[index] = filesplit->lengths; |
| rangeId[index] = filesplit->range_id; |
| replicaGroupId[index] = filesplit->replicaGroup_id; |
| fileName[index] = filesplit->ext_file_uri_string; |
| |
| ++index; |
| } |
| |
| univPlanSeqScanAddTaskWithFileSplits(true, ctx->univplan, fileSplitNum, |
| (const char **)fileName, start, len, |
| NULL, rangeId, replicaGroupId); |
| } else { |
| elog(ERROR, "external table with %s protocol is not supported", |
| uri->customprotocol); |
| } |
| |
| FreeExternalTableUri(uri); |
| } |
| } |
| |
| void do_convert_rangetbl_to_common_plan(List *rtable, CommonPlanContext *ctx) { |
| ListCell *lc; |
| foreach (lc, rtable) { |
| RangeTblEntry *rte = (RangeTblEntry *)lfirst(lc); |
| if (rte->rtekind != RTE_RELATION || rte->inh) { |
| univPlanRangeTblEntryAddDummy(ctx->univplan); |
| continue; |
| } |
| do_convert_onetbl_to_common_plan(rte->relid, ctx); |
| } |
| } |
| |
| void do_convert_onetbl_to_common_plan(Oid relid, CommonPlanContext *ctx) { |
| Relation rel = heap_open(relid, NoLock); |
| int attNum = 0; |
| char **columnName = NULL; |
| int32_t *columnDataType = NULL; |
| int64_t *columnDataTypeMod = NULL; |
| |
| if (RelationIsOrc(rel)) { |
| TupleDesc tableAttrs = rel->rd_att; |
| attNum = tableAttrs->natts; |
| columnName = palloc(attNum * sizeof(char *)); |
| columnDataType = palloc(attNum * sizeof(int32_t)); |
| columnDataTypeMod = palloc(attNum * sizeof(int64_t)); |
| for (int i = 0; i < attNum; ++i) { |
| Form_pg_attribute att = tableAttrs->attrs[i]; |
| columnName[i] = pstrdup(att->attname.data); |
| columnDataType[i] = map_hawq_type_to_common_plan(att->atttypid); |
| if (columnDataType[i] == INVALIDTYPEID) { |
| if (ctx->isMagma) { |
| columnDataType[i] = IOBASETYPEID; |
| } else { |
| ctx->convertible = false; |
| goto end; |
| } |
| } |
| columnDataTypeMod[i] = att->atttypmod; |
| } |
| FormatType fmttype = UnivPlanOrcFormat; |
| univPlanRangeTblEntryAddTable(ctx->univplan, relid, fmttype, "dummy", "{}", |
| attNum, (const char **)columnName, |
| columnDataType, columnDataTypeMod, NULL); |
| } else if (RelationIsExternal(rel)) { |
| TupleDesc tableAttrs = rel->rd_att; |
| attNum = tableAttrs->natts; |
| columnName = palloc(attNum * sizeof(char *)); |
| columnDataType = palloc(attNum * sizeof(int32_t)); |
| columnDataTypeMod = palloc(attNum * sizeof(int64_t)); |
| for (int i = 0; i < attNum; ++i) { |
| Form_pg_attribute att = tableAttrs->attrs[i]; |
| columnName[i] = pstrdup(att->attname.data); |
| columnDataType[i] = map_hawq_type_to_common_plan(att->atttypid); |
| if (columnDataType[i] == INVALIDTYPEID) { |
| if (ctx->isMagma) { |
| columnDataType[i] = IOBASETYPEID; |
| } else { |
| ctx->convertible = false; |
| goto end; |
| } |
| } |
| columnDataTypeMod[i] = att->atttypmod; |
| } |
| ExtTableEntry *extEntry = GetExtTableEntry(relid); |
| char *fmtOptsJson = NULL; |
| buildExternalTableFormatOptionStringInJson(extEntry->fmtopts, &fmtOptsJson); |
| |
| FormatType fmttype; |
| char *location = NULL; |
| char *fmtName = NULL; |
| char *targetName = NULL; |
| getFmtName(fmtOptsJson, &fmtName); |
| int16_t magmaType = -1; |
| if (pg_strncasecmp(fmtName, "\"magmatp\"", strlen("\"magmatp\"")) == 0) { |
| magmaType = 0; |
| } else if (pg_strncasecmp(fmtName, "\"magmaap\"", strlen("\"magmaap\"")) == |
| 0) { |
| magmaType = 1; |
| } |
| // indicate magma format table |
| if (magmaType >= 0) { |
| fmttype = UnivPlanMagmaFormat; |
| Oid namespaceOid = RelationGetNamespace(rel); |
| char *schema = getNamespaceNameByOid(namespaceOid); |
| Assert(schema != NULL); |
| char *table = RelationGetRelationName(rel); |
| int locationLen = sizeof("magma:///") - 1 + strlen(database) + |
| sizeof("/") + strlen(schema) + sizeof("/") + |
| strlen(table) + sizeof('\0'); |
| location = palloc(locationLen * sizeof(char)); |
| sprintf(location, "magma:///%s/%s/%s", database, schema, table); |
| |
| /* get the magma target str */ |
| int tarLen = strlen(database) + sizeof(".") + strlen(schema) + |
| sizeof(".") + strlen(table) + sizeof('\0'); |
| targetName = palloc(tarLen * sizeof(char)); |
| sprintf(targetName, "%s.%s.%s", database, schema, table); |
| |
| if (ctx->stmt != NULL) { |
| // for magma table, we should set magma_format_type, serialized_schema |
| // only for new executor |
| struct json_object *opt_json_object = json_tokener_parse(fmtOptsJson); |
| if (json_object_object_get(opt_json_object, "magma_format_type") == |
| NULL) { |
| char tmp[2]; |
| pg_itoa(magmaType, tmp); |
| json_object_object_add(opt_json_object, "magma_format_type", |
| json_object_new_string(tmp)); |
| } |
| char *serializeSchema = NULL; |
| int serializeSchemaLen = 0; |
| GetMagmaSchemaByRelid(ctx->stmt->scantable_splits, relid, |
| &serializeSchema, &serializeSchemaLen); |
| if (json_object_object_get(opt_json_object, "serialized_schema") == |
| NULL) { |
| json_object_object_add( |
| opt_json_object, "serialized_schema", |
| json_object_new_string_len(serializeSchema, serializeSchemaLen)); |
| } |
| if (opt_json_object != NULL) { |
| const char *str = json_object_to_json_string(opt_json_object); |
| fmtOptsJson = (char *)palloc0(strlen(str) + 1); |
| strcpy(fmtOptsJson, str); |
| json_object_put(opt_json_object); |
| } |
| } |
| } else if (pg_strncasecmp(fmtName, "\"orc\"", strlen("\"orc\"")) == 0) { |
| fmttype = UnivPlanOrcFormat; |
| location = pstrdup(strVal(linitial(extEntry->locations))); |
| } else if (pg_strncasecmp(fmtName, "\"csv\"", strlen("\"csv\"")) == 0 || |
| pg_strncasecmp(fmtName, "\"text\"", strlen("\"text\"")) == 0) { |
| univPlanRangeTblEntryAddDummy(ctx->univplan); |
| goto end; |
| } else { |
| elog(ERROR, "Cannot get external table format."); |
| } |
| |
| univPlanRangeTblEntryAddTable(ctx->univplan, relid, fmttype, location, |
| fmtOptsJson, attNum, |
| (const char **)columnName, columnDataType, |
| columnDataTypeMod, targetName); |
| |
| if (fmtOptsJson != NULL) pfree(fmtOptsJson); |
| if (fmtName != NULL) pfree(fmtName); |
| if (targetName != NULL) pfree(targetName); |
| pfree(location); |
| } else { |
| univPlanRangeTblEntryAddDummy(ctx->univplan); |
| heap_close(rel, NoLock); |
| return; |
| } |
| |
| end: |
| heap_close(rel, NoLock); |
| for (int i = 0; i < attNum; ++i) { |
| pfree(columnName[i]); |
| } |
| pfree(columnName); |
| pfree(columnDataType); |
| pfree(columnDataTypeMod); |
| } |
| |
| void do_convert_token_map_to_common_plan(CommonPlanContext *ctx) { |
| HASH_SEQ_STATUS status; |
| struct FileSystemCredential *entry; |
| StringInfoData buffer; |
| FileSystemCredentialC entryC; |
| MemSet(&entryC, 0, sizeof(FileSystemCredentialC)); |
| FileSystemCredentialCPtr entryCPtr = &entryC; |
| |
| HTAB *currentFilesystemCredentials; |
| MemoryContext currentFilesystemCredentialsMemoryContext; |
| |
| get_current_credential_cache_and_memcxt( |
| ¤tFilesystemCredentials, |
| ¤tFilesystemCredentialsMemoryContext); |
| |
| Insist(NULL != currentFilesystemCredentials); |
| Insist(NULL != currentFilesystemCredentialsMemoryContext); |
| |
| initStringInfo(&buffer); |
| hash_seq_init(&status, currentFilesystemCredentials); |
| |
| while (NULL != (entry = hash_seq_search(&status))) { |
| entryCPtr->credential = entry->credential; |
| entryCPtr->key.host = entry->key.host; |
| entryCPtr->key.port = entry->key.port; |
| entryCPtr->key.protocol = entry->key.protocol; |
| univPlanAddTokenEntry(ctx->univplan, entryCPtr); |
| } |
| return; |
| } |
| |
| // it's convertible and it's a magma scan |
| void do_convert_snapshot_to_common_plan(CommonPlanContext *ctx) { |
| // start transaction in magma for SELECT in new executor |
| if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT) { |
| PlugStorageBeginTransaction(NULL); |
| } |
| Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED); |
| int32_t size = 0; |
| char *snapshot = NULL; |
| MagmaClientC_SerializeSnapshot(PlugStorageGetTransactionSnapshot(), &snapshot, |
| &size); |
| if (snapshot && size != 0) { |
| univPlanAddSnapshot(ctx->univplan, snapshot, size); |
| } |
| free(snapshot); |
| } |
| /* |
| * return true if all the node expr is converted |
| */ |
| bool do_convert_expr_to_common_plan(int32_t pid, Expr *expr, |
| CommonPlanContext *ctx) { |
| int32_t mappingFuncId; |
| int32_t uid; |
| ListCell *lc; |
| Expr *old; |
| switch (expr->type) { |
| case T_TargetEntry: { |
| TargetEntry *te = (TargetEntry *)expr; |
| old = parentExprSwitchTo(expr, ctx); |
| if (!do_convert_expr_to_common_plan(pid, te->expr, ctx)) goto end; |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_RelabelType: { |
| RelabelType *te = (RelabelType *)expr; |
| old = parentExprSwitchTo(expr, ctx); |
| if (!do_convert_expr_to_common_plan(pid, te->arg, ctx)) goto end; |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_Var: { |
| Var *var = (Var *)expr; |
| // TODO(chiyang): support system attribute |
| if (var->varattno < 0 && |
| !(var->varattno == SelfItemPointerAttributeNumber || |
| var->varattno == GpSegmentIdAttributeNumber)) |
| goto end; |
| if (ctx->parent && ctx->parent->type == T_Aggref) { |
| Aggref *aggref = (Aggref *)ctx->parent; |
| univPlanAggrefAddProxyVar(ctx->univplan, pid, var->varattno, |
| HAWQ_FUNCOID_MAPPING(aggref->aggfnoid), |
| var->vartypmod, var->varnoold, |
| var->varoattno); |
| } else { |
| Oid varType = var->vartype; |
| if (checkUnsupportedDataType(varType, DateStyle)) goto end; |
| univPlanExprAddVar( |
| ctx->univplan, pid, |
| var->varno == DIRECT_LEFT_CHILD_VAR ? OUTER : var->varno, |
| var->varattno, map_hawq_type_to_common_plan(varType), |
| var->vartypmod, var->varnoold, var->varoattno); |
| } |
| break; |
| } |
| |
| case T_Const: { |
| Const *constval = (Const *)expr; |
| int32_t consttype = map_hawq_type_to_common_plan(constval->consttype); |
| if ((!constval->constisnull) && |
| (checkUnsupportedDataType(constval->consttype, DateStyle))) |
| goto end; |
| if (ctx->setDummyTListRef && ctx->parent && |
| ctx->parent->type == T_TargetEntry) { |
| univPlanExprAddVar(ctx->univplan, pid, OUTER, |
| ((TargetEntry *)ctx->parent)->resno, consttype, |
| constval->consttypmod, 0, 0); |
| } else { |
| Oid typoutput; |
| bool typIsVarlena; |
| getTypeOutputInfo(constval->consttype, &typoutput, &typIsVarlena); |
| char *extval = NULL; |
| if (!constval->constisnull) { |
| int savedDateStyle = DateStyle; |
| int savedDateOrder = DateOrder; |
| DateStyle = USE_ISO_DATES; |
| DateOrder = DATEORDER_MDY; |
| extval = OidOutputFunctionCall(typoutput, constval->constvalue); |
| DateStyle = savedDateStyle; |
| DateOrder = savedDateOrder; |
| if (constval->consttype == INTERVALOID) { |
| Interval *ival = (Interval *)DatumGetPointer(constval->constvalue); |
| extval = palloc(sizeof(char) * INT64_MAX_LENGTH * 2); |
| sprintf(extval, "%d:%d:%lld", ival->month, ival->day, ival->time); |
| } |
| } |
| univPlanExprAddConst(ctx->univplan, pid, consttype, |
| constval->constisnull, extval, |
| constval->consttypmod); |
| } |
| break; |
| } |
| |
| case T_OpExpr: { |
| OpExpr *opExpr = (OpExpr *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| mappingFuncId = HAWQ_FUNCOID_MAPPING(opExpr->opfuncid); |
| if (IS_HAWQ_MAPPING_FUNCID_INVALID(mappingFuncId)) goto end; |
| uid = univPlanExprAddOpExpr(ctx->univplan, pid, mappingFuncId); |
| foreach (lc, opExpr->args) { |
| if (!do_convert_expr_to_common_plan(uid, lfirst(lc), ctx)) goto end; |
| } |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_FuncExpr: { |
| FuncExpr *funcExpr = (FuncExpr *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| mappingFuncId = HAWQ_FUNCOID_MAPPING(funcExpr->funcid); |
| if (IS_HAWQ_MAPPING_FUNCID_INVALID(mappingFuncId)) goto end; |
| if (IS_HAWQ_MAPPING_DO_NOTHING(mappingFuncId)) { |
| if (funcExpr->args->length != 1) goto end; |
| foreach (lc, funcExpr->args) { |
| if (!do_convert_expr_to_common_plan(pid, lfirst(lc), ctx)) goto end; |
| } |
| } else { |
| uid = univPlanExprAddFuncExpr(ctx->univplan, pid, mappingFuncId); |
| foreach (lc, funcExpr->args) { |
| if (!do_convert_expr_to_common_plan(uid, lfirst(lc), ctx)) goto end; |
| } |
| } |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_Aggref: { |
| Aggref *aggref = (Aggref *)expr; |
| |
| // disable count distinct case |
| if (aggref->aggdistinct || aggref->aggorder) goto end; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| mappingFuncId = HAWQ_FUNCOID_MAPPING(aggref->aggfnoid); |
| if (IS_HAWQ_MAPPING_FUNCID_INVALID(mappingFuncId)) goto end; |
| switch (aggref->aggstage) { |
| case AGGSTAGE_NORMAL: |
| uid = univPlanAggrefAddOneStage(ctx->univplan, pid, mappingFuncId); |
| break; |
| case AGGSTAGE_PARTIAL: |
| uid = |
| univPlanAggrefAddPartialStage(ctx->univplan, pid, mappingFuncId); |
| break; |
| case AGGSTAGE_INTERMEDIATE: |
| uid = univPlanAggrefAddIntermediateStage(ctx->univplan, pid, |
| mappingFuncId); |
| break; |
| case AGGSTAGE_FINAL: |
| uid = univPlanAggrefAddFinalStage(ctx->univplan, pid, mappingFuncId); |
| break; |
| default: |
| goto end; |
| } |
| |
| foreach (lc, aggref->args) { |
| if (!do_convert_expr_to_common_plan(uid, lfirst(lc), ctx)) goto end; |
| } |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| case T_BoolExpr: { |
| BoolExpr *boolExpr = (BoolExpr *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| uid = univPlanExprAddBoolExpr(ctx->univplan, pid, |
| (UnivplanBoolExprType)boolExpr->boolop); |
| foreach (lc, boolExpr->args) { |
| if (!do_convert_expr_to_common_plan(uid, lfirst(lc), ctx)) goto end; |
| } |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| case T_NullTest: { |
| NullTest *nullTest = (NullTest *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| uid = univPlanExprAddNullTestExpr( |
| ctx->univplan, pid, (UnivplanNullTestType)nullTest->nulltesttype); |
| if (!do_convert_expr_to_common_plan(uid, nullTest->arg, ctx)) goto end; |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| case T_BooleanTest: { |
| BooleanTest *boolTest = (BooleanTest *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| uid = univPlanExprAddBoolTestExpr( |
| ctx->univplan, pid, (UnivplanBooleanTestType)boolTest->booltesttype); |
| if (!do_convert_expr_to_common_plan(uid, boolTest->arg, ctx)) goto end; |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_CaseExpr: { |
| CaseExpr *caseexpr = (CaseExpr *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| ctx->exprBufStack = lcons(caseexpr->arg, ctx->exprBufStack); |
| |
| int32_t casetype = map_hawq_type_to_common_plan(caseexpr->casetype); |
| if (checkUnsupportedDataType(caseexpr->casetype, DateStyle)) { |
| goto end; |
| } |
| uid = univPlanExprAddCaseExpr(ctx->univplan, pid, casetype); |
| foreach (lc, caseexpr->args) { |
| if (!do_convert_expr_to_common_plan(uid, lfirst(lc), ctx)) goto end; |
| } |
| |
| univPlanExprAddCaseExprDefresult(ctx->univplan, uid); |
| if (!do_convert_expr_to_common_plan(uid, caseexpr->defresult, ctx)) |
| goto end; |
| |
| parentExprSwitchTo(old, ctx); |
| ctx->exprBufStack = list_delete_first(ctx->exprBufStack); |
| break; |
| } |
| |
| case T_CaseWhen: { |
| CaseWhen *casewhen = (CaseWhen *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| uid = univPlanExprAddCaseWhen(ctx->univplan, pid); |
| |
| univPlanExprAddCaseWhenExpr(ctx->univplan, uid); |
| if (!do_convert_expr_to_common_plan(uid, casewhen->expr, ctx)) goto end; |
| |
| univPlanExprAddCaseWhenResult(ctx->univplan, uid); |
| if (!do_convert_expr_to_common_plan(uid, casewhen->result, ctx)) goto end; |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_CaseTestExpr: { |
| if (!do_convert_expr_to_common_plan(pid, linitial(ctx->exprBufStack), |
| ctx)) |
| goto end; |
| break; |
| } |
| |
| case T_Param: { |
| Param *param = (Param *)expr; |
| if (param->paramkind != PARAM_EXEC) goto end; |
| univPlanExprAddParam(ctx->univplan, pid, |
| (UnivplanParamKind)param->paramkind, param->paramid, |
| map_hawq_type_to_common_plan(param->paramtype), |
| param->paramtypmod); |
| break; |
| } |
| |
| case T_SubPlan: { |
| SubPlan *subplan = (SubPlan *)expr; |
| // TODO(chiyang): support ExecHashSubPlan |
| if (subplan->useHashTable) goto end; |
| if (!checkSupportedSubLinkType(subplan->subLinkType)) goto end; |
| uid = univPlanExprAddSubPlan( |
| ctx->univplan, pid, (UnivplanSubLinkType)subplan->subLinkType, |
| subplan->plan_id, subplan->qDispSliceId, |
| map_hawq_type_to_common_plan(subplan->firstColType), |
| subplan->firstColTypmod, subplan->useHashTable, subplan->is_initplan); |
| int num = 0; |
| if ((num = list_length(subplan->setParam)) > 0) { |
| int32_t *setParam = palloc(num * sizeof(int32_t)); |
| int idx = 0; |
| foreach (lc, subplan->setParam) |
| setParam[idx++] = lfirst_int(lc); |
| univPlanSubPlanAddSetParam(ctx->univplan, uid, num, setParam); |
| pfree(setParam); |
| } |
| if ((num = list_length(subplan->parParam)) > 0) { |
| int32_t *parParam = palloc(num * sizeof(int32_t)); |
| int idx = 0; |
| foreach (lc, subplan->parParam) |
| parParam[idx++] = lfirst_int(lc); |
| univPlanSubPlanAddParParam(ctx->univplan, uid, num, parParam); |
| pfree(parParam); |
| } |
| if ((num = list_length(subplan->paramIds)) > 0) { |
| int32_t *testexprParam = palloc(num * sizeof(int32_t)); |
| int idx = 0; |
| foreach (lc, subplan->paramIds) |
| testexprParam[idx++] = lfirst_int(lc); |
| univPlanSubPlanAddTestexprParam(ctx->univplan, uid, num, testexprParam); |
| pfree(testexprParam); |
| } |
| foreach (lc, subplan->args) { |
| if (!do_convert_expr_to_common_plan(uid, lfirst(lc), ctx)) goto end; |
| } |
| univPlanExprAddSubPlanTestexpr(ctx->univplan, uid); |
| if (subplan->testexpr && |
| !do_convert_expr_to_common_plan(uid, (Expr *)subplan->testexpr, ctx)) |
| goto end; |
| break; |
| } |
| |
| case T_ScalarArrayOpExpr: { |
| ScalarArrayOpExpr *scalarArrayOpExpr = (ScalarArrayOpExpr *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| mappingFuncId = HAWQ_FUNCOID_MAPPING(scalarArrayOpExpr->opfuncid); |
| if (IS_HAWQ_MAPPING_FUNCID_INVALID(mappingFuncId)) goto end; |
| uid = univPlanExprAddScalarArrayOpExpr(ctx->univplan, pid, mappingFuncId, |
| scalarArrayOpExpr->useOr); |
| foreach (lc, scalarArrayOpExpr->args) { |
| if (!do_convert_expr_to_common_plan(uid, lfirst(lc), ctx)) goto end; |
| } |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_CoalesceExpr: { |
| CoalesceExpr *coalesceExpr = (CoalesceExpr *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| int32_t coalesceType = |
| map_hawq_type_to_common_plan(coalesceExpr->coalescetype); |
| if (checkUnsupportedDataType(coalesceExpr->coalescetype, DateStyle)) { |
| goto end; |
| } |
| uid = univPlanExprAddCoalesceExpr(ctx->univplan, pid, coalesceType, |
| exprTypmod(coalesceExpr)); |
| foreach (lc, coalesceExpr->args) { |
| if (!do_convert_expr_to_common_plan(uid, lfirst(lc), ctx)) goto end; |
| } |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_NullIfExpr: { |
| NullIfExpr *nullIfExpr = (NullIfExpr *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| mappingFuncId = HAWQ_FUNCOID_MAPPING(nullIfExpr->opfuncid); |
| if (IS_HAWQ_MAPPING_FUNCID_INVALID(mappingFuncId)) goto end; |
| int32_t nullIfType = map_hawq_type_to_common_plan(exprType(nullIfExpr)); |
| if (checkUnsupportedDataType(exprType(nullIfExpr), DateStyle)) { |
| goto end; |
| } |
| uid = univPlanExprAddNullIfExpr(ctx->univplan, pid, mappingFuncId, |
| nullIfType, exprTypmod(nullIfExpr)); |
| foreach (lc, nullIfExpr->args) { |
| if (!do_convert_expr_to_common_plan(uid, lfirst(lc), ctx)) goto end; |
| } |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_DistinctExpr: { |
| DistinctExpr *distExpr = (DistinctExpr *)expr; |
| |
| old = parentExprSwitchTo(expr, ctx); |
| |
| mappingFuncId = HAWQ_FUNCOID_MAPPING(distExpr->opfuncid); |
| if (IS_HAWQ_MAPPING_FUNCID_INVALID(mappingFuncId)) goto end; |
| uid = univPlanExprAddDistinctExpr(ctx->univplan, pid, mappingFuncId); |
| foreach (lc, distExpr->args) { |
| if (!do_convert_expr_to_common_plan(uid, lfirst(lc), ctx)) goto end; |
| } |
| |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_Grouping: { |
| old = parentExprSwitchTo(expr, ctx); |
| uid = univPlanExprAddGrouping(ctx->univplan, pid); |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_GroupId: { |
| old = parentExprSwitchTo(expr, ctx); |
| uid = univPlanExprAddGroupId(ctx->univplan, pid); |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| case T_GroupingFunc: { |
| GroupingFunc *groupingFunc = (GroupId *)expr; |
| old = parentExprSwitchTo(expr, ctx); |
| int32_t *args = palloc(list_length(groupingFunc->args) * sizeof(int32_t)); |
| |
| ListCell *lc; |
| int idx = 0; |
| foreach (lc, groupingFunc->args) { |
| args[idx++] = (int)intVal(lfirst(lc)); |
| } |
| uid = univPlanExprAddGroupingFunc(ctx->univplan, pid, args, |
| list_length(groupingFunc->args), |
| groupingFunc->ngrpcols); |
| pfree(args); |
| parentExprSwitchTo(old, ctx); |
| break; |
| } |
| |
| default: |
| goto end; |
| } |
| |
| return true; |
| end: |
| elog(DEBUG1, "New Executor not support expression of %s", nodeToString(expr)); |
| return false; |
| } |
| |
| Expr *parentExprSwitchTo(Expr *parent, CommonPlanContext *ctx) { |
| Expr *old = ctx->parent; |
| ctx->parent = parent; |
| return old; |
| } |
| |
| void setDummyTListRef(CommonPlanContext *ctx) { ctx->setDummyTListRef = true; } |
| |
| void unsetDummyTListRef(CommonPlanContext *ctx) { |
| ctx->setDummyTListRef = false; |
| } |
| |
| void getFmtName(char *fmtOptsJson, char **fmtName) { |
| *fmtName = NULL; |
| struct json_object *jobj = json_tokener_parse(fmtOptsJson); |
| |
| json_object *returnObj; |
| if (jobj != NULL && |
| json_object_object_get_ex(jobj, "formatter", &returnObj)) { |
| if (returnObj != NULL) { |
| const char *str = json_object_to_json_string(returnObj); |
| *fmtName = (char *)palloc0(strlen(str) + 1); |
| strcpy(*fmtName, str); |
| } |
| json_object_put(jobj); |
| } |
| } |
| |
| void checkUnsupportedStmt(PlannedStmt *stmt, CommonPlanContext *ctx) { |
| if (stmt->commandType == CMD_UPDATE || stmt->commandType == CMD_DELETE) |
| goto end; |
| |
| if (stmt->commandType == CMD_INSERT && !checkInsertSupportTable(stmt)) |
| goto end; |
| |
| if (stmt->originNodeType == T_CopyStmt) goto end; |
| |
| // disable insert into for common plan currently |
| if (stmt->intoClause) goto end; |
| |
| return; |
| |
| end: |
| ctx->convertible = false; |
| } |
| |
| bool checkInsertSupportTable(PlannedStmt *stmt) { |
| // disable partitioned result target |
| if (stmt->result_partitions) return false; |
| if (list_length(stmt->resultRelations) > 1) return false; |
| int32_t index = list_nth_int(stmt->resultRelations, 0); |
| RangeTblEntry *rte = (RangeTblEntry *)list_nth(stmt->rtable, index - 1); |
| |
| // if (RELSTORAGE_ORC == get_rel_relstorage(rte->relid)) return true; |
| |
| Relation pgExtTableRel = heap_open(ExtTableRelationId, RowExclusiveLock); |
| cqContext cqc; |
| HeapTuple tuple = caql_getfirst(caql_addrel(cqclr(&cqc), pgExtTableRel), |
| cql("SELECT * FROM pg_exttable " |
| " WHERE reloid = :1 " |
| " FOR UPDATE ", |
| ObjectIdGetDatum(rte->relid))); |
| if (!HeapTupleIsValid(tuple)) goto end; |
| |
| bool isNull; |
| char fmtCode = |
| DatumGetChar(heap_getattr(tuple, Anum_pg_exttable_fmttype, |
| RelationGetDescr(pgExtTableRel), &isNull)); |
| if (!fmttype_is_custom(fmtCode)) goto end; |
| |
| Datum fmtOptDatum = heap_getattr(tuple, Anum_pg_exttable_fmtopts, |
| RelationGetDescr(pgExtTableRel), &isNull); |
| char *fmtOptString = |
| DatumGetCString(DirectFunctionCall1(textout, fmtOptDatum)); |
| char *fmtName = getExtTblFormatterTypeInFmtOptsStr(fmtOptString); |
| // format name must be orc/magmaap |
| const int FORMAT_MAGMAAP_LEN = 7; |
| const int FORMAT_ORC_LEN = 3; |
| bool isSupported = |
| fmtName && (!pg_strncasecmp(fmtName, "magmaap", FORMAT_MAGMAAP_LEN) || |
| !pg_strncasecmp(fmtName, "orc", FORMAT_ORC_LEN)); |
| if (!isSupported) goto end; |
| heap_close(pgExtTableRel, RowExclusiveLock); |
| return true; |
| |
| end: |
| heap_close(pgExtTableRel, RowExclusiveLock); |
| return false; |
| } |
| |
| void checkReadStatsOnlyForAgg(Agg *node, CommonPlanContext *ctx) { |
| ctx->scanReadStatsOnly = false; |
| |
| if (((Plan *)node)->lefttree->type == T_ExternalScan || |
| ((Plan *)node)->lefttree->type == T_Append || |
| ((Plan *)node)->lefttree->type == T_AppendOnlyScan) { |
| // not work for group by statements |
| if (node->numCols - node->numNullCols > 0) return; |
| |
| // not work for scan with filter |
| if (((Plan *)node)->lefttree->qual) return; |
| |
| // for append node |
| if (((Plan *)node)->lefttree->type == T_Append) { |
| Append *appendNode = (Append *)((Plan *)node)->lefttree; |
| ListCell *lc; |
| foreach (lc, appendNode->appendplans) { |
| Plan *appendPlan = (Plan *)lfirst(lc); |
| if (!appendPlan->type == T_ExternalScan) return; |
| if (appendPlan->qual) return; |
| ListCell *lstcell; |
| foreach (lstcell, appendPlan->targetlist) { |
| TargetEntry *te = (TargetEntry *)lfirst(lstcell); |
| if (te->expr->type != T_Var) return; |
| } |
| } |
| } |
| |
| bool readStatsOnly = false; |
| ListCell *lc1; |
| foreach (lc1, ((Plan *)node)->targetlist) { |
| TargetEntry *te = (TargetEntry *)lfirst(lc1); |
| assert(te->expr->type == T_Aggref); |
| Aggref *aggref = (Aggref *)te->expr; |
| assert(aggref->aggstage == AGGSTAGE_PARTIAL); |
| // filter functions that cannot use stats |
| // currently only avg(),sum(),count(),min(),max() could use stats |
| if (!((aggref->aggfnoid >= 2100 && aggref->aggfnoid <= 2147) || |
| (aggref->aggfnoid >= 2244 && aggref->aggfnoid <= 2245) || |
| (aggref->aggfnoid == 2803))) { |
| return; |
| } |
| // special case for count(*) |
| if (list_length(aggref->args) == 0) return; |
| ListCell *lc2; |
| foreach (lc2, aggref->args) { |
| Expr *expr = lfirst(lc2); |
| if (expr->type == T_Var) { |
| // disable read stats only for timestamp |
| if (aggref->aggfnoid != COUNT_ANY_OID) { |
| if (((Var *)expr)->vartype == HAWQ_TYPE_TIMESTAMP || |
| ((Var *)expr)->vartype == HAWQ_TYPE_TIMESTAMPTZ) |
| return; |
| } |
| readStatsOnly = true; |
| } else if (expr->type == T_RelabelType && |
| ((RelabelType *)expr)->arg->type == T_Var) { |
| // disable read stats only for timestamp |
| if (aggref->aggfnoid != COUNT_ANY_OID) { |
| if (((Var *)((RelabelType *)expr)->arg)->vartype == |
| HAWQ_TYPE_TIMESTAMP) |
| return; |
| } |
| readStatsOnly = true; |
| } else { |
| return; |
| } |
| } |
| } |
| ctx->scanReadStatsOnly = readStatsOnly; |
| } |
| } |
| |
| bool checkSupportedSubLinkType(SubLinkType sublinkType) { |
| switch (sublinkType) { |
| case EXISTS_SUBLINK: |
| case ALL_SUBLINK: |
| case ANY_SUBLINK: |
| case EXPR_SUBLINK: |
| case NOT_EXISTS_SUBLINK: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| bool checkIsPrepareQuery(QueryDesc *queryDesc) { |
| if (queryDesc->sourceText) { |
| if (pg_strncasecmp((queryDesc->sourceText), "PREPARE", 7) == 0) { |
| if (pg_strcasecmp(new_executor_mode, new_executor_mode_on) == 0) { |
| elog(ERROR, |
| "New executor not supported yet, please set new_executor=auto/off " |
| "to " |
| "fall back to old executor"); |
| } |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void convert_querydesc_to_common_plan(QueryDesc *queryDesc, |
| CommonPlanContext *ctx) { |
| if (queryDesc->params != NULL && queryDesc->params->numParams > 0) { |
| int savedDateStyle = DateStyle; |
| int savedDateOrder = DateOrder; |
| for (int32_t iParam = 0; iParam < queryDesc->params->numParams; ++iParam) { |
| ParamExternData *pxd = &queryDesc->params->params[iParam]; |
| Oid typoutput; |
| bool typIsVarlena; |
| getTypeOutputInfo(pxd->ptype, &typoutput, &typIsVarlena); |
| char *extval = NULL; |
| if (!pxd->isnull) { |
| switch (pxd->ptype) { |
| case BOOLOID: |
| case INT8OID: |
| case INT4OID: |
| case INT2OID: |
| case FLOAT8OID: |
| case FLOAT4OID: |
| case TIMEOID: |
| case TIMETZOID: |
| extval = OidOutputFunctionCall(typoutput, pxd->value); |
| break; |
| case DATEOID: |
| case TIMESTAMPOID: |
| case TIMESTAMPTZOID: |
| DateStyle = USE_ISO_DATES; |
| DateOrder = DATEORDER_MDY; |
| extval = OidOutputFunctionCall(typoutput, pxd->value); |
| DateStyle = savedDateStyle; |
| DateOrder = savedDateOrder; |
| break; |
| case INTERVALOID: { |
| Interval *ival = (Interval *)DatumGetPointer(pxd->value); |
| extval = palloc(sizeof(char) * INT64_MAX_LENGTH * 2); |
| sprintf(extval, "%d-%d-%lld", ival->month, ival->day, ival->time); |
| } break; |
| default: |
| if (pxd->value) |
| extval = OidOutputFunctionCall(typoutput, pxd->value); |
| } |
| } |
| univPlanAddParamInfo(ctx->univplan, |
| map_hawq_type_to_common_plan(pxd->ptype), |
| pxd->isnull, extval); |
| } |
| } |
| } |