| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * execMain.c |
| * top level executor interface routines |
| * |
| * INTERFACE ROUTINES |
| * ExecutorStart() |
| * ExecutorRun() |
| * ExecutorEnd() |
| * |
| * The old ExecutorMain() has been replaced by ExecutorStart(), |
| * ExecutorRun() and ExecutorEnd() |
| * |
| * These three procedures are the external interfaces to the executor. |
| * In each case, the query descriptor is required as an argument. |
| * |
| * ExecutorStart() must be called at the beginning of execution of any |
| * query plan and ExecutorEnd() should always be called at the end of |
| * execution of a plan. |
| * |
 *	ExecutorRun accepts direction and count arguments that specify whether
 *	the plan is to be executed forwards or backwards, and for how many tuples.
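 *
 *	A typical caller drives these routines in sequence, e.g. (illustrative
 *	sketch only; CreateQueryDesc() arguments elided, and count 0 means run
 *	to completion):
 *
 *		QueryDesc *qd = CreateQueryDesc(...);
 *		ExecutorStart(qd, 0);
 *		(void) ExecutorRun(qd, ForwardScanDirection, 0L);
 *		ExecutorEnd(qd);
 *		FreeQueryDesc(qd);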
| * |
| * Portions Copyright (c) 2005-2010, Greenplum inc |
| * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * |
| * IDENTIFICATION |
| * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.280.2.2 2007/02/02 00:07:27 tgl Exp $ |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| #include "fmgr.h" |
| #include "gpmon/gpmon.h" |
| |
| #include "access/heapam.h" |
| #include "access/aosegfiles.h" |
| #include "access/orcam.h" |
| #include "access/orcsegfiles.h" |
| #include "access/parquetsegfiles.h" |
| #include "access/appendonlywriter.h" |
| #include "access/fileam.h" |
| #include "access/filesplit.h" |
| #include "access/read_cache.h" |
| #include "access/reloptions.h" |
| #include "access/transam.h" |
| #include "access/xact.h" |
| #include "access/plugstorage.h" |
| #include "catalog/heap.h" |
| #include "catalog/namespace.h" |
| #include "catalog/toasting.h" |
| #include "catalog/aoseg.h" |
| #include "catalog/catalog.h" |
| #include "catalog/pg_attribute_encoding.h" |
| #include "catalog/pg_type.h" |
| #include "catalog/pg_exttable.h" |
| #include "cdb/cdbpartition.h" |
| #include "cdb/cdbmirroredfilesysobj.h" |
| #include "commands/dbcommands.h" |
| #include "commands/tablecmds.h" /* XXX: temp for get_parts() */ |
| #include "commands/tablespace.h" |
| #include "commands/trigger.h" |
| #include "executor/execDML.h" |
| #include "executor/execdebug.h" |
| #include "executor/instrument.h" |
| #include "executor/nodeSubplan.h" |
| #include "libpq/pqformat.h" |
| #include "miscadmin.h" |
| #include "nodes/makefuncs.h" /* temporary */ |
| #include "nodes/pg_list.h" |
| #include "optimizer/clauses.h" |
| #include "parser/parse_clause.h" |
| #include "parser/parse_expr.h" |
| #include "parser/parse_relation.h" |
| #include "parser/parsetree.h" |
| #include "storage/smgr.h" |
| #include "utils/acl.h" |
| #include "utils/builtins.h" |
| #include "utils/hsearch.h" |
| #include "utils/lsyscache.h" |
| #include "utils/memutils.h" |
| #include "utils/ps_status.h" |
| #include "utils/rel.h" |
| #include "utils/uri.h" |
| #include "utils/workfile_mgr.h" |
| |
| #include "catalog/pg_statistic.h" |
| #include "catalog/pg_class.h" |
| |
| #include "tcop/tcopprot.h" |
| |
| #include "cdb/cdbappendonlyam.h" |
| #include "cdb/cdbparquetam.h" |
| #include "cdb/cdbcat.h" |
| #include "cdb/cdbdisp.h" |
| #include "cdb/cdbdispatchresult.h" |
| #include "cdb/cdbdatalocality.h" |
| #include "cdb/dispatcher.h" |
| #include "cdb/dispatcher_new.h" |
| #include "cdb/cdbexplain.h" /* cdbexplain_sendExecStats() */ |
| #include "cdb/cdbplan.h" |
| #include "cdb/cdbsrlz.h" |
| #include "cdb/cdbsubplan.h" |
| #include "cdb/cdbvars.h" |
| #include "cdb/ml_ipc.h" |
| #include "cdb/cdbmotion.h" |
| #include "cdb/cdbgang.h" |
| #include "cdb/cdboidsync.h" |
| #include "cdb/cdbmirroredbufferpool.h" |
| #include "cdb/cdbpersistentstore.h" |
| #include "cdb/cdbpersistentfilesysobj.h" |
| #include "cdb/cdbllize.h" |
| #include "cdb/memquota.h" |
| #include "cdb/cdbsharedstorageop.h" |
| #include "cdb/cdbtargeteddispatch.h" |
| #include "cdb/cdbquerycontextdispatching.h" |
| #include "optimizer/prep.h" |
| #include "tcop/pquery.h" |
| |
| #include "resourcemanager/dynrm.h" |
| #include "utils/syscache.h" |
| |
| extern bool filesystem_support_truncate; |
| |
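/*
 * State for EvalPlanQual rechecking: under READ COMMITTED isolation, when an
 * UPDATE/DELETE finds that a target row was concurrently updated, the plan
 * quals are re-evaluated against the new row version using a separate
 * EState/PlanState tree per range-table index (rti).
 */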
| typedef struct evalPlanQual |
| { |
| Index rti; |
| EState *estate; |
| PlanState *planstate; |
| struct evalPlanQual *next; /* stack of active PlanQual plans */ |
| struct evalPlanQual *free; /* list of free PlanQual plans */ |
| } evalPlanQual; |
| |
| /* decls for local routines only used within this module */ |
| static void InitPlan(QueryDesc *queryDesc, int eflags); |
| static void initResultRelInfo(ResultRelInfo *resultRelInfo, |
| Index resultRelationIndex, |
| List *rangeTable, |
| CmdType operation, |
| bool doInstrument, |
| bool needLock); |
| static void ExecCheckPlanOutput(Relation resultRel, List *targetList); |
| static TupleTableSlot *ExecutePlan(EState *estate, PlanState *planstate, |
| CmdType operation, |
| long numberTuples, |
| ScanDirection direction, |
| DestReceiver *dest); |
| static void ExecSelect(TupleTableSlot *slot, |
| DestReceiver *dest, |
| EState *estate); |
| static TupleTableSlot *EvalPlanQualNext(EState *estate); |
| static void EndEvalPlanQual(EState *estate); |
| static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt); |
| static void EvalPlanQualStart(evalPlanQual *epq, EState *estate, |
| evalPlanQual *priorepq); |
| static void EvalPlanQualStop(evalPlanQual *epq); |
| static void OpenIntoRel(QueryDesc *queryDesc); |
| static void CloseIntoRel(QueryDesc *queryDesc); |
| static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); |
| static void intorel_receive(TupleTableSlot *slot, DestReceiver *self); |
| static void intorel_shutdown(DestReceiver *self); |
| static void intorel_destroy(DestReceiver *self); |
| static void ClearPartitionState(EState *estate); |
| static void findPartTableOidsToDispatch(Plan *plan, List **partTables, List **oidLists); |
| |
| typedef struct CopyDirectDispatchToSliceContext |
| { |
| plan_tree_base_prefix base; /* Required prefix for plan_tree_walker/mutator */ |
| EState *estate; /* EState instance */ |
| } CopyDirectDispatchToSliceContext; |
| |
| static bool CopyDirectDispatchFromPlanToSliceTableWalker( Node *node, CopyDirectDispatchToSliceContext *context); |
| |
| static void |
| CopyDirectDispatchToSlice( Plan *ddPlan, int sliceId, CopyDirectDispatchToSliceContext *context) |
| { |
| EState *estate = context->estate; |
| Slice *slice = (Slice *)list_nth(estate->es_sliceTable->slices, sliceId); |
| |
| Assert( ! slice->directDispatch.isDirectDispatch ); /* should not have been set by some other process */ |
| Assert(ddPlan != NULL); |
| |
| if ( ddPlan->directDispatch.isDirectDispatch) |
| { |
| slice->directDispatch.isDirectDispatch = true; |
| slice->directDispatch.contentIds = list_copy(ddPlan->directDispatch.contentIds); |
| } |
| } |
| |
| static bool |
| CopyDirectDispatchFromPlanToSliceTableWalker( Node *node, CopyDirectDispatchToSliceContext *context) |
| { |
| int sliceId = -1; |
| Plan *ddPlan = NULL; |
| |
| if (node == NULL) |
| return false; |
| |
| if (IsA(node, Motion)) |
| { |
| Motion *motion = (Motion *) node; |
| |
| ddPlan = (Plan*)node; |
| sliceId = motion->motionID; |
| } |
| |
| if (ddPlan != NULL) |
| { |
| CopyDirectDispatchToSlice(ddPlan, sliceId, context); |
| } |
| return plan_tree_walker(node, CopyDirectDispatchFromPlanToSliceTableWalker, context); |
| } |
| |
| static void |
| CopyDirectDispatchFromPlanToSliceTable(PlannedStmt *stmt, EState *estate) |
| { |
| CopyDirectDispatchToSliceContext context; |
| exec_init_plan_tree_base(&context.base, stmt); |
| context.estate = estate; |
| CopyDirectDispatchToSlice( stmt->planTree, 0, &context); |
| CopyDirectDispatchFromPlanToSliceTableWalker((Node *) stmt->planTree, &context); |
| } |
| |
| |
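/*
 * Walker context used when collecting dispatch metadata into a
 * QueryContextInfo (see SetupSegnoForErrorTable() below).
 */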
| typedef struct QueryCxtWalkerCxt { |
| plan_tree_base_prefix base; |
| QueryContextInfo *info; |
| } QueryCxtWalkerCxt; |
| |
| /** |
| * SetupSegnoForErrorTable |
| * travel the query plan to find out the external table scan, assign segfile for error table if exist. |
| */ |
| static bool |
| SetupSegnoForErrorTable(Node *node, QueryCxtWalkerCxt *cxt) |
| { |
| ExternalScan *scan = (ExternalScan *)node; |
| QueryContextInfo *info = cxt->info; |
| List *errSegnos; |
| bool reuse_segfilenum_in_same_xid = true; |
| |
| if (NULL == node) |
| return false; |
| |
| switch (nodeTag(node)) |
| { |
| case T_ExternalScan: |
| /* |
| * has no error table |
| */ |
| if (!OidIsValid(scan->fmterrtbl)) |
| return false; |
| |
| // hdfs protocol external table in text/csv format |
| if (hasErrTblInFmtOpts(scan->fmtOpts)) |
| return false; |
| |
| /* |
			 * check whether two external tables use the same error table in one statement
| */ |
| if (info->errTblOid) |
| { |
| ListCell *c; |
| Oid errtbloid; |
| foreach(c, info->errTblOid) |
| { |
| errtbloid = lfirst_oid(c); |
| if (errtbloid == scan->fmterrtbl) |
| { |
| reuse_segfilenum_in_same_xid = false; |
| break; |
| } |
| } |
| } |
| |
| /* |
| * Prepare error table for insert. |
| */ |
| Assert(!rel_is_partitioned(scan->fmterrtbl)); |
| errSegnos = SetSegnoForWrite(NIL, scan->fmterrtbl, GetQEGangNum(), false, reuse_segfilenum_in_same_xid, true); |
| scan->errAosegnos = errSegnos; |
| info->errTblOid = lcons_oid(scan->fmterrtbl, info->errTblOid); |
| |
| Relation errRel = heap_open(scan->fmterrtbl, RowExclusiveLock); |
| CreateAoSegFileForRelationOnMaster(errRel, errSegnos); |
| prepareDispatchedCatalogSingleRelation(info, scan->fmterrtbl, TRUE, errSegnos); |
| scan->err_aosegfileinfos = fetchSegFileInfos(scan->fmterrtbl, errSegnos); |
| |
| heap_close(errRel, RowExclusiveLock); |
| |
| return false; |
| default: |
| break; |
| |
| } |
| |
| return plan_tree_walker(node, SetupSegnoForErrorTable, cxt); |
| } |
| |
| /* |
| * findPartTableOidsToDispatch |
| * find all partitioned tables for which static selection has been done. |
| * "partTables" will have the oids of these tables |
| * "oidLists" will have the corresponding lists of selected child oids for each table |
| */ |
| static void |
| findPartTableOidsToDispatch(Plan *plan, List **partTables, List **oidLists) |
| { |
| Assert (NULL != plan); |
| Assert (NULL != partTables); |
| Assert (NIL == *partTables); |
| Assert (NULL != oidLists); |
| Assert (NIL == *oidLists); |
| |
| List *partitionSelectors = extract_nodes_plan(plan, T_PartitionSelector, true /*descendIntoSubqueries*/); |
| |
| // list of partitioned tables for which we have to ship metadata for all parts |
| List *fullScanTables = NIL; |
| |
| ListCell *psc = NULL; |
| foreach (psc, partitionSelectors) |
| { |
| PartitionSelector *ps = (PartitionSelector *) lfirst(psc); |
| bool allPartsSelected = list_member_oid(fullScanTables, ps->relid); |
| int idx = list_find_oid(*partTables, ps->relid); |
| if (0 <= idx) |
| { |
| // table has been encountered before and it did not need all parts |
| Assert(!allPartsSelected); |
| List *partOids = list_nth(*oidLists, idx); |
| if (ps->staticSelection) |
| { |
| // union the parts we needed before and the parts we need now |
| List *newList = list_union_oid(partOids, ps->staticPartOids); |
| list_nth_replace(*oidLists, idx, newList); |
| list_free(partOids); |
| partOids = newList; |
| } |
| else |
| { |
| // this instance of the table requires all parts - remove |
| // it from the output list and add it to the full scan list |
| *oidLists = list_delete_ptr(*oidLists, partOids); |
| *partTables = list_delete_oid(*partTables, ps->relid); |
| list_free(partOids); |
| |
| fullScanTables = lappend_oid(fullScanTables, ps->relid); |
| } |
| } |
| else if (!allPartsSelected) |
| { |
| // table has not been encountered before |
| if (ps->staticSelection) |
| { |
| // static selection, use list of selected parts + root oid |
| List *partOids = list_copy(ps->staticPartOids); |
| partOids = lappend_oid(partOids, ps->relid); |
| |
| *oidLists = lappend(*oidLists, partOids); |
| *partTables = lappend_oid(*partTables, ps->relid); |
| } |
| else |
| { |
| // table needs all parts |
| fullScanTables = lappend_oid(fullScanTables, ps->relid); |
| } |
| } |
| } |
| |
| list_free(partitionSelectors); |
| list_free(fullScanTables); |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * ExecutorStart |
| * |
| * This routine must be called at the beginning of any execution of any |
| * query plan |
| * |
 *	Takes a QueryDesc previously created by CreateQueryDesc (it's not really
 *	clear why we bother to separate the two functions, but...). The tupDesc
| * field of the QueryDesc is filled in to describe the tuples that will be |
| * returned, and the internal fields (estate and planstate) are set up. |
| * |
| * eflags contains flag bits as described in executor.h. |
| * |
| * NB: the CurrentMemoryContext when this is called will become the parent |
| * of the per-query context used for this Executor invocation. |
| * |
| * MPP: In here we take care of setting up all the necessary items that |
| * will be needed to service the query, such as setting up interconnect, |
| * and dispatching the query. Any other items in the future |
| * must be added here. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecutorStart(QueryDesc *queryDesc, int eflags) |
| { |
| EState *estate; |
| MemoryContext oldcontext; |
| GpExecIdentity exec_identity; |
| bool shouldDispatch; |
| |
| /* sanity checks: queryDesc must not be started already */ |
| Assert(queryDesc != NULL); |
| Assert(queryDesc->estate == NULL); |
| Assert(queryDesc->plannedstmt != NULL); |
| |
| PlannedStmt *plannedStmt = queryDesc->plannedstmt; |
| |
| if (NULL == plannedStmt->memoryAccount) |
| { |
| plannedStmt->memoryAccount = MemoryAccounting_CreateAccount(0, MEMORY_OWNER_TYPE_EXECUTOR); |
| } |
| |
| START_MEMORY_ACCOUNT(plannedStmt->memoryAccount); |
| { |
| Assert(queryDesc->plannedstmt->intoPolicy == NULL |
| || queryDesc->plannedstmt->intoPolicy->ptype == POLICYTYPE_PARTITIONED); |
| |
| if ( Gp_role == GP_ROLE_DISPATCH ) { |
| queryDesc->savedResource = GetActiveQueryResource(); |
| SetActiveQueryResource(queryDesc->resource); |
| } |
| |
| /** |
| * Perfmon related stuff. |
| */ |
| if (gp_enable_gpperfmon |
| && Gp_role == GP_ROLE_DISPATCH |
| && queryDesc->gpmon_pkt) |
| { |
| gpmon_qlog_query_start(queryDesc->gpmon_pkt); |
| } |
| |
| /** |
| * Distribute memory to operators. |
| */ |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| QueryResource *resource = GetActiveQueryResource(); |
| if (resource) { |
| queryDesc->plannedstmt->query_mem = resource->segment_memory_mb; |
| queryDesc->plannedstmt->query_mem *= 1024 * 1024; |
| |
| elog(DEBUG3, "Query requested %.0fKB memory %lf core", |
| (double) (1.0 * queryDesc->plannedstmt->query_mem / 1024.0), |
| resource->segment_vcore); |
| } |
| |
| /** |
| * There are some statements that do not go through the resource queue, |
| * so we cannot put in a strong assert here. Someday, we should fix |
| * resource queues. |
| */ |
| if (queryDesc->plannedstmt->query_mem > 0) { |
| |
| uint64 memAvailableBytes = 0; |
| |
				/* With resource manager in NONE mode, we assign memory quota for operators normally */
| if ( strcasecmp(rm_global_rm_type, HAWQDRM_CONFFILE_SVRTYPE_VAL_NONE) == 0 ) |
| { |
| memAvailableBytes = queryDesc->plannedstmt->query_mem; |
| } |
				/* With resource manager in YARN/MESOS mode, we assign memory quota for operators conservatively */
| else |
| { |
| memAvailableBytes = queryDesc->plannedstmt->query_mem * hawq_re_memory_quota_allocation_ratio; |
| } |
| |
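				/*
				 * Worked example (illustrative numbers only): with
				 * segment_memory_mb = 1024, query_mem is 1024 * 1024 * 1024
				 * bytes; under a YARN/MESOS allocation ratio of 0.8, the
				 * operators would share roughly 819 MB.
				 */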
| AssignOperatorMemoryKB(queryDesc->plannedstmt, memAvailableBytes); |
| } |
| } |
| |
| /* |
| * If the transaction is read-only, we need to check if any writes are |
| * planned to non-temporary tables. EXPLAIN is considered read-only. |
| */ |
| if ((XactReadOnly || Gp_role == GP_ROLE_DISPATCH) && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) |
| ExecCheckXactReadOnly(queryDesc->plannedstmt); |
| |
| /* |
| * Build EState, switch into per-query memory context for startup. |
| */ |
| estate = CreateExecutorState(); |
| queryDesc->estate = estate; |
| |
| oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); |
| |
| /** |
		 * Attach the plannedstmt from the queryDesc.
| */ |
| estate->es_plannedstmt = queryDesc->plannedstmt; |
| |
| /* |
| * Fill in parameters, if any, from queryDesc |
| */ |
| estate->es_param_list_info = queryDesc->params; |
| |
| if (queryDesc->plannedstmt->nCrossLevelParams > 0) |
| estate->es_param_exec_vals = (ParamExecData *) |
| palloc0(queryDesc->plannedstmt->nCrossLevelParams * sizeof(ParamExecData)); |
| |
| /* |
| * Copy other important information into the EState |
| */ |
| estate->es_snapshot = queryDesc->snapshot; |
| estate->es_crosscheck_snapshot = queryDesc->crosscheck_snapshot; |
| estate->es_instrument = queryDesc->doInstrument; |
| estate->showstatctx = queryDesc->showstatctx; |
| |
| /* |
		 * Shared input info is needed when running as GP_ROLE_EXECUTE or with a sequential plan.
| */ |
| estate->es_sharenode = (List **) palloc0(sizeof(List *)); |
| |
| /* |
| * Initialize the motion layer for this query. |
| * |
| * NOTE: need to be in estate->es_query_cxt before the call. |
| */ |
| initMotionLayerStructs((MotionLayerState **)&estate->motionlayer_context); |
| |
| /* Reset workfile disk full flag */ |
| WorkfileDiskspace_SetFull(false /* isFull */); |
| /* Initialize per-query resource (diskspace) tracking */ |
| WorkfileQueryspace_InitEntry(gp_session_id, gp_command_count); |
| |
| /* |
| * Handling of the Slice table depends on context. |
| */ |
| if (Gp_role == GP_ROLE_DISPATCH && queryDesc->plannedstmt->planTree->dispatch == DISPATCH_PARALLEL) |
| { |
| SetupDispatcherIdentity(queryDesc->plannedstmt->planner_segments); |
| /* |
| * If this is an extended query (normally cursor or bind/exec) - before |
| * starting the portal, we need to make sure that the shared snapshot is |
| * already set by a writer gang, or the cursor query readers will |
| * timeout waiting for one that may not exist (in some cases). Therefore |
| * we insert a small hack here and dispatch a SET query that will do it |
| * for us. (This is also done in performOpenCursor() for the simple |
| * query protocol). |
| * |
| * MPP-7504/MPP-7448: We also call this down inside the dispatcher after |
| * the pre-dispatch evaluator has run. |
| */ |
| if (queryDesc->extended_query) |
| { |
| /* verify_shared_snapshot_ready(); */ |
| } |
| |
| /* Set up blank slice table to be filled in during InitPlan. */ |
| InitSliceTable(estate, queryDesc->plannedstmt->nMotionNodes, queryDesc->plannedstmt->nInitPlans); |
| |
| /** |
| * Copy direct dispatch decisions out of the plan and into the slice table. Must be done after slice table is built. |
		 * Note that this needs to happen whether or not the plan contains direct dispatch
		 * decisions, because direct dispatch partially forgets some of the decisions it has taken.
| **/ |
| if (gp_enable_direct_dispatch) |
| { |
| CopyDirectDispatchFromPlanToSliceTable(queryDesc->plannedstmt, estate ); |
| } |
| |
| /* Pass EXPLAIN ANALYZE flag to qExecs. */ |
| estate->es_sliceTable->doInstrument = queryDesc->doInstrument; |
| |
| /* set our global sliceid variable for elog. */ |
| currentSliceId = LocallyExecutingSliceIndex(estate); |
| |
			/*
			 * Determine OIDs for the into relation, if any. They are
			 * pre-assigned here on the QD so that the same OIDs can be
			 * dispatched to, and used by, every QE.
			 */
| if (queryDesc->plannedstmt->intoClause != NULL) |
| { |
| IntoClause *intoClause = queryDesc->plannedstmt->intoClause; |
| Relation pg_class_desc; |
| Relation pg_type_desc; |
| Oid reltablespace; |
| |
| /* MPP-10329 - must always dispatch the tablespace */ |
| if (intoClause->tableSpaceName) |
| { |
| reltablespace = get_tablespace_oid(intoClause->tableSpaceName); |
| if (!OidIsValid(reltablespace)) |
| ereport(ERROR, |
| (errcode(ERRCODE_UNDEFINED_OBJECT), |
| errmsg("tablespace \"%s\" does not exist", |
| intoClause->tableSpaceName))); |
| } |
| else |
| { |
| reltablespace = GetDefaultTablespace(); |
| |
| /* Need the real tablespace id for dispatch */ |
| if (!OidIsValid(reltablespace)) |
| /*reltablespace = MyDatabaseTableSpace;*/ |
| reltablespace = get_database_dts(MyDatabaseId); |
| |
| intoClause->tableSpaceName = get_tablespace_name(reltablespace); |
| } |
| |
| pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock); |
| pg_type_desc = heap_open(TypeRelationId, RowExclusiveLock); |
| |
| intoClause->oidInfo.relOid = GetNewRelFileNode(reltablespace, false, pg_class_desc, false); |
| elog(DEBUG3, "ExecutorStart assigned new intoOidInfo.relOid = %d", |
| intoClause->oidInfo.relOid); |
| |
| intoClause->oidInfo.comptypeOid = GetNewRelFileNode(reltablespace, false, pg_type_desc, false); |
| intoClause->oidInfo.toastOid = GetNewRelFileNode(reltablespace, false, pg_class_desc, false); |
| intoClause->oidInfo.toastIndexOid = GetNewRelFileNode(reltablespace, false, pg_class_desc, false); |
| intoClause->oidInfo.toastComptypeOid = GetNewRelFileNode(reltablespace, false, pg_type_desc, false); |
| intoClause->oidInfo.aosegOid = GetNewRelFileNode(reltablespace, false, pg_class_desc, false); |
| intoClause->oidInfo.aosegIndexOid = GetNewRelFileNode(reltablespace, false, pg_class_desc, false); |
| intoClause->oidInfo.aosegComptypeOid = GetNewRelFileNode(reltablespace, false, pg_type_desc, false); |
| intoClause->oidInfo.aoblkdirOid = GetNewRelFileNode(reltablespace, false, pg_class_desc, false); |
| intoClause->oidInfo.aoblkdirIndexOid = GetNewRelFileNode(reltablespace, false, pg_class_desc, false); |
| intoClause->oidInfo.aoblkdirComptypeOid = GetNewRelFileNode(reltablespace, false, pg_type_desc, false); |
| |
| heap_close(pg_class_desc, RowExclusiveLock); |
| heap_close(pg_type_desc, RowExclusiveLock); |
| |
| } |
| } |
| else if (Gp_role == GP_ROLE_EXECUTE) |
| { |
| |
| /* qDisp should have sent us a slice table via MPPEXEC */ |
| if (queryDesc->plannedstmt->sliceTable != NULL) |
| { |
| SliceTable *sliceTable; |
| Slice *slice; |
| |
| sliceTable = (SliceTable *)queryDesc->plannedstmt->sliceTable; |
| Assert(IsA(sliceTable, SliceTable)); |
| slice = (Slice *)list_nth(sliceTable->slices, sliceTable->localSlice); |
| Assert(IsA(slice, Slice)); |
| |
| estate->es_sliceTable = sliceTable; |
| |
| estate->currentSliceIdInPlan = slice->rootIndex; |
| estate->currentExecutingSliceId = slice->rootIndex; |
| |
| /* set our global sliceid variable for elog. */ |
| currentSliceId = LocallyExecutingSliceIndex(estate); |
| |
| /* Should we collect statistics for EXPLAIN ANALYZE? */ |
| estate->es_instrument = sliceTable->doInstrument; |
| queryDesc->doInstrument = sliceTable->doInstrument; |
| } |
| |
| /* InitPlan() will acquire locks by walking the entire plan |
| * tree -- we'd like to avoid acquiring the locks until |
| * *after* we've set up the interconnect */ |
| if (queryDesc->plannedstmt->nMotionNodes > 0) |
| { |
| int i; |
| |
| PG_TRY(); |
| { |
| for (i=1; i <= queryDesc->plannedstmt->nMotionNodes; i++) |
| { |
| InitMotionLayerNode(estate->motionlayer_context, i); |
| } |
| |
| estate->es_interconnect_is_setup = true; |
| |
| Assert(!estate->interconnect_context); |
| SetupInterconnect(estate); |
| Assert(estate->interconnect_context); |
| } |
| PG_CATCH(); |
| { |
| mppExecutorCleanup(queryDesc); |
| if (GP_ROLE_DISPATCH == Gp_role) |
| { |
| SetActiveQueryResource(queryDesc->savedResource); |
| } |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| } |
| } |
| |
	/* If the interconnect has been set up, we need to catch any
	 * errors to shut it down -- so we have to wrap InitPlan in a PG_TRY() block. */
| PG_TRY(); |
| { |
| /* |
| * Initialize the plan state tree |
| */ |
| Assert(CurrentMemoryContext == estate->es_query_cxt); |
| InitPlan(queryDesc, eflags); |
| |
| Assert(queryDesc->planstate); |
| |
| if (Gp_role == GP_ROLE_DISPATCH && |
| queryDesc->plannedstmt->planTree->dispatch == DISPATCH_PARALLEL) |
| { |
| /* Assign gang descriptions to the root slices of the slice forest. */ |
| InitRootSlices(queryDesc, queryDesc->planner_segments); |
| |
| if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY)) |
| { |
| /* |
| * Since we intend to execute the plan, inventory the slice tree, |
| * allocate gangs, and associate them with slices. |
| * |
| * For now, always use segment 'gp_singleton_segindex' for |
| * singleton gangs. |
| * |
| * On return, gangs have been allocated and CDBProcess lists have |
				 * been filled in in the slice table.
| */ |
| AssignGangs(queryDesc, gp_singleton_segindex); |
| } |
| |
| } |
| |
| #ifdef USE_ASSERT_CHECKING |
| AssertSliceTableIsValid((struct SliceTable *) estate->es_sliceTable, queryDesc->plannedstmt); |
| #endif |
| |
| if (Debug_print_slice_table && Gp_role == GP_ROLE_DISPATCH) |
| elog_node_display(DEBUG3, "slice table", estate->es_sliceTable, true); |
| |
| /* |
| * If we're running as a QE and there's a slice table in our queryDesc, |
| * then we need to finish the EState setup we prepared for back in |
| * CdbExecQuery. |
| */ |
| if (Gp_role == GP_ROLE_EXECUTE && estate->es_sliceTable != NULL) |
| { |
| MotionState *motionstate = NULL; |
| |
| /* |
| * Note that, at this point on a QE, the estate is setup (based on the |
| * slice table transmitted from the QD via MPPEXEC) so that fields |
| * es_sliceTable, cur_root_idx and es_cur_slice_idx are correct for |
| * the QE. |
| * |
| * If responsible for a non-root slice, arrange to enter the plan at the |
| * slice's sending Motion node rather than at the top. |
| */ |
| if (LocallyExecutingSliceIndex(estate) != RootSliceIndex(estate)) |
| { |
| motionstate = getMotionState(queryDesc->planstate, LocallyExecutingSliceIndex(estate)); |
| Assert(motionstate != NULL && IsA(motionstate, MotionState)); |
| |
| /* Patch Motion node so it looks like a top node. */ |
| motionstate->ps.plan->nMotionNodes = estate->es_sliceTable->nMotions; |
| motionstate->ps.plan->nParamExec = estate->es_sliceTable->nInitPlans; |
| } |
| |
| if (Debug_print_slice_table) |
| elog_node_display(DEBUG3, "slice table", estate->es_sliceTable, true); |
| |
| if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) |
| elog(DEBUG1, "seg%d executing slice%d under root slice%d", |
| GetQEIndex(), |
| LocallyExecutingSliceIndex(estate), |
| RootSliceIndex(estate)); |
| } |
| |
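	/*
	 * For INSERT/UPDATE/DELETE on magma result relations, mark their
	 * read-cache hash entries for revision at commit, so cached results
	 * are not reused after the write.
	 */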
| if (Gp_role == GP_ROLE_DISPATCH && |
| (queryDesc->operation == CMD_INSERT || |
| queryDesc->operation == CMD_UPDATE || |
| queryDesc->operation == CMD_DELETE)) { |
| for (int32_t i = 0; i < estate->es_num_result_relations; ++i) |
| if (RelationIsMagmaTable2( |
| estate->es_result_relations[i].ri_RelationDesc->rd_id)) |
| ReadCacheHashEntryReviseOnCommit( |
| estate->es_result_relations[i].ri_RelationDesc->rd_id, |
| false); |
| } |
| |
| /* |
| * Are we going to dispatch this plan parallel? Only if we're running as |
| * a QD and the plan is a parallel plan. |
| */ |
| if (Gp_role == GP_ROLE_DISPATCH && |
| queryDesc->plannedstmt->planTree->dispatch == DISPATCH_PARALLEL && |
| !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) |
| { |
		shouldDispatch = !readCacheEnabled();
| } |
| else |
| { |
| shouldDispatch = false; |
| } |
| |
| /* |
| * if in dispatch mode, time to serialize plan and query |
| * trees, and fire off cdb_exec command to each of the qexecs |
| */ |
| if (shouldDispatch) |
| { |
| /* |
| * collect catalogs which should be dispatched to QE |
| */ |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| int i; |
| |
| List *result_segfileinfos = NIL; |
| PlannedStmt *plannedstmt = queryDesc->plannedstmt; |
| Assert(NULL == plannedstmt->contextdisp); |
| plannedstmt->contextdisp = CreateQueryContextInfo(); |
| |
| /* |
| * (GPSQL-872) Include all tuples from pg_aoseg_* |
| * catalog for relations that are used in FROM clause. |
| */ |
| if (plannedstmt->rtable) |
| { |
| /* |
| * find all partitioned tables for which static selection has been done. |
| * "partTables" will have the oids of these tables |
| * "oidsToDispatch" will have the corresponding lists of selected child oids for each table |
| */ |
| List *partTables = NIL; |
| List *oidsToDispatch = NIL; |
| findPartTableOidsToDispatch(plannedstmt->planTree, &partTables, &oidsToDispatch); |
| |
| ListCell *rtc; |
| int rti=0; |
| foreach(rtc, plannedstmt->rtable) |
| { |
| ++rti; /* List indices start with 1. */ |
| RangeTblEntry *rte = lfirst(rtc); |
| |
| if (rte->rtekind == RTE_RELATION) |
| { |
| ListCell *relc; |
| bool relForInsert = FALSE; |
| foreach(relc, plannedstmt->resultRelations) |
| { |
| int reli = lfirst_int(relc); |
| if (reli == rti) |
| { |
| relForInsert = TRUE; |
| break; |
| } |
| } |
| if (!relForInsert) |
| { |
| int idx = list_find_oid(partTables, rte->relid); |
| if (0 <= idx) |
| { |
| /* |
| * static selection performed on table - add only |
| * the oids of the selected parts |
| */ |
| List *oidList = list_nth(oidsToDispatch, idx); |
| ListCell *oidc = NULL; |
| foreach (oidc, oidList) |
| { |
| prepareDispatchedCatalogSingleRelation(plannedstmt->contextdisp, |
| lfirst_oid(oidc), FALSE /*forInsert*/, 0 /*segno*/); |
| } |
| } |
| else |
| { |
| prepareDispatchedCatalogRelation(plannedstmt->contextdisp, |
| rte->relid, FALSE, NULL, FALSE); |
| } |
| } |
| |
| } |
| } |
| |
| list_free(partTables); |
| list_free_deep(oidsToDispatch); |
| } |
| |
| for (i = 0; i < estate->es_num_result_relations; ++i) |
| { |
| ResultRelInfo * relinfo; |
| relinfo = &estate->es_result_relation_info[i]; |
| prepareDispatchedCatalogRelation(plannedstmt->contextdisp, |
| RelationGetRelid(relinfo->ri_RelationDesc), TRUE, estate->es_result_aosegnos, TRUE); |
| result_segfileinfos = GetResultRelSegFileInfos(RelationGetRelid(relinfo->ri_RelationDesc), |
| estate->es_result_aosegnos, result_segfileinfos); |
| if (dataStoredInHive(relinfo->ri_RelationDesc)) { |
| fetchUrlStoredInHive(RelationGetRelid(relinfo->ri_RelationDesc), &(plannedstmt->hiveUrl)); |
| } |
| |
| } |
| plannedstmt->result_segfileinfos = result_segfileinfos; |
| |
| if (plannedstmt->intoClause != NULL) |
| { |
| List *segment_segnos = SetSegnoForWrite(NIL, 0, GetQEGangNum(), true, true, false); |
| prepareDispatchedCatalogSingleRelation(plannedstmt->contextdisp, |
| plannedstmt->intoClause->oidInfo.relOid, TRUE, segment_segnos); |
| } |
| |
| |
| if (plannedstmt->rtable) |
| prepareDispatchedCatalog(plannedstmt->contextdisp, plannedstmt->rtable); |
| |
| if (plannedstmt->returningLists) |
| { |
| ListCell *lc; |
| foreach(lc, plannedstmt->returningLists) |
| { |
| List *targets = lfirst(lc); |
| |
| if (targets) |
| prepareDispatchedCatalogTargets(plannedstmt->contextdisp, targets); |
| } |
| } |
| |
| prepareDispatchedCatalogPlan(plannedstmt->contextdisp, plannedstmt->planTree); |
| |
| if (plannedstmt->subplans) |
| { |
| ListCell *lc; |
| foreach(lc, plannedstmt->subplans) |
| { |
| Plan *plantree = lfirst(lc); |
| if (plantree) |
| prepareDispatchedCatalogPlan(plannedstmt->contextdisp, plantree); |
| } |
| } |
| |
| /** |
			 * Walk the plan to set up error table segnos for external table scans.
| */ |
| QueryCxtWalkerCxt cxt; |
| cxt.base.node = (Node *)plannedstmt; |
| cxt.info = plannedstmt->contextdisp; |
| plan_tree_walker((Node *)plannedstmt->planTree, SetupSegnoForErrorTable, &cxt); |
| |
| FinalizeQueryContextInfo(plannedstmt->contextdisp); |
| } |
| |
| /* |
| * First, see whether we need to pre-execute any initPlan subplans. |
| */ |
| if (queryDesc->plannedstmt->planTree->nParamExec > 0) |
| { |
| preprocess_initplans(queryDesc); |
| |
| /* |
| * Copy the values of the preprocessed subplans to the |
| * external parameters. |
| */ |
| queryDesc->params = addRemoteExecParamsToParamList(queryDesc->plannedstmt, |
| queryDesc->params, |
| queryDesc->estate->es_param_exec_vals); |
| } |
| CommonPlanContext ctx; |
| bool newPlanner = can_convert_common_plan(queryDesc, &ctx); |
| estate->mainDispatchData = mainDispatchInit(queryDesc->resource); |
| estate->dispatch_data = NULL; |
| mainDispatchPrepare(estate->mainDispatchData, queryDesc, newPlanner); |
| mainDispatchRun(estate->mainDispatchData, &ctx, newPlanner); |
| |
| DropQueryContextInfo(queryDesc->plannedstmt->contextdisp); |
| queryDesc->plannedstmt->contextdisp = NULL; |
| } |
| |
| /* |
	 * Get executor identity (whom the executor serves). We can assume
	 * ForwardScanDirection for now, just for retrieving the identity.
| */ |
| if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY)) |
| exec_identity = getGpExecIdentity(queryDesc, ForwardScanDirection, estate); |
| else |
| exec_identity = GP_IGNORE; |
| |
| /* non-root on QE */ |
| if (exec_identity == GP_NON_ROOT_ON_QE) |
| { |
| |
| MotionState *motionState = getMotionState(queryDesc->planstate, LocallyExecutingSliceIndex(estate)); |
| |
| Assert(motionState); |
| |
| Assert(IsA(motionState->ps.plan, Motion)); |
| |
| /* update the connection information, if needed */ |
| if (((PlanState *) motionState)->plan->nMotionNodes > 0) |
| { |
| ExecUpdateTransportState((PlanState *)motionState, |
| estate->interconnect_context); |
| } |
| } |
| else if (exec_identity == GP_ROOT_SLICE) |
| { |
| /* Run a root slice. */ |
| // new plan will use new interconnect |
| if (queryDesc->planstate != NULL && |
| queryDesc->planstate->plan->nMotionNodes > 0 |
| && !estate->es_interconnect_is_setup |
| && !queryDesc->newPlan) |
| { |
| estate->es_interconnect_is_setup = true; |
| |
| Assert(!estate->interconnect_context); |
| SetupInterconnect(estate); |
| Assert(estate->interconnect_context); |
| } |
| if (estate->es_interconnect_is_setup && !queryDesc->newPlan) |
| { |
| ExecUpdateTransportState(queryDesc->planstate, |
| estate->interconnect_context); |
| } |
| } |
| else if (exec_identity != GP_IGNORE) |
| { |
| /* should never happen */ |
| Assert(!"unsupported parallel execution strategy"); |
| } |
| |
| if(estate->es_interconnect_is_setup) |
| Assert(estate->interconnect_context != NULL); |
| |
| } |
| |
| PG_CATCH(); |
| { |
| if (queryDesc->plannedstmt->contextdisp) |
| { |
| DropQueryContextInfo(queryDesc->plannedstmt->contextdisp); |
| queryDesc->plannedstmt->contextdisp = NULL; |
| } |
| mppExecutorCleanup(queryDesc); |
| if (GP_ROLE_DISPATCH == Gp_role) |
| { |
| SetActiveQueryResource(queryDesc->savedResource); |
| } |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| if (DEBUG1 >= log_min_messages) |
| { |
| char msec_str[32]; |
| switch (check_log_duration(msec_str, false)) |
| { |
| case 1: |
| case 2: |
| ereport(LOG, (errmsg("duration to ExecutorStart end: %s ms", msec_str))); |
| break; |
| } |
| } |
| if (GP_ROLE_DISPATCH == Gp_role) |
| { |
| SetActiveQueryResource(queryDesc->savedResource); |
| } |
| } |
| END_MEMORY_ACCOUNT(); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecutorRun |
| * |
| * This is the main routine of the executor module. It accepts |
| * the query descriptor from the traffic cop and executes the |
| * query plan. |
| * |
| * ExecutorStart must have been called already. |
| * |
| * If direction is NoMovementScanDirection then nothing is done |
| * except to start up/shut down the destination. Otherwise, |
| * we retrieve up to 'count' tuples in the specified direction. |
| * |
| * Note: count = 0 is interpreted as no portal limit, i.e., run to |
| * completion. |
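 *
 *		For example (illustrative usage), a portal fetching ten rows at a
 *		time would call ExecutorRun(queryDesc, ForwardScanDirection, 10L)
 *		repeatedly until fewer than ten tuples are returned.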
| * |
| * MPP: In here we must ensure to only run the plan and not call |
| * any setup/teardown items (unless in a CATCH block). |
| * |
| * ---------------------------------------------------------------- |
| */ |
| TupleTableSlot * |
| ExecutorRun(QueryDesc *queryDesc, |
| ScanDirection direction, long count) |
| { |
| EState *estate; |
| CmdType operation; |
| DestReceiver *dest; |
| bool sendTuples; |
| TupleTableSlot *result = NULL; |
| MemoryContext oldcontext; |
| /* |
| * NOTE: Any local vars that are set in the PG_TRY block and examined in the |
| * PG_CATCH block should be declared 'volatile'. (setjmp shenanigans) |
| */ |
| Slice *currentSlice; |
| GpExecIdentity exec_identity; |
| |
| /* sanity checks */ |
| Assert(queryDesc != NULL); |
| |
| estate = queryDesc->estate; |
| |
| Assert(estate != NULL); |
| |
| Assert(NULL != queryDesc->plannedstmt && NULL != queryDesc->plannedstmt->memoryAccount); |
| |
| START_MEMORY_ACCOUNT(queryDesc->plannedstmt->memoryAccount); |
| |
| if (Debug_print_execution_detail) { |
| instr_time time; |
| INSTR_TIME_SET_CURRENT(time); |
| elog(DEBUG1,"The time on entering ExecutorRun: %.3f ms", |
| 1000.0 * INSTR_TIME_GET_DOUBLE(time)); |
| } |
| |
| if (GP_ROLE_DISPATCH == Gp_role) |
| { |
| queryDesc->savedResource = GetActiveQueryResource(); |
| SetActiveQueryResource(queryDesc->resource); |
| } |
| /* |
| * Set dynamicTableScanInfo to the one in estate, and reset its value at |
| * the end of ExecutorRun(). This is to support two cases: |
| * |
	 * (1) For PL/pgSQL/SQL functions. There might be multiple DynamicTableScanInfos
| * involved, one for each statement in the function. We set the global variable |
| * dynamicTableScanInfo to the value for the running statement here, and reset |
| * its value at the end of ExecutorRun(). |
| * |
| * (2) For cursor queries. Each cursor query has its own set of DynamicTableScanInfos, |
| * and they could be called in different orders. |
| */ |
| DynamicTableScanInfo *origDynamicTableScanInfo = dynamicTableScanInfo; |
| dynamicTableScanInfo = estate->dynamicTableScanInfo; |
| |
| /* |
| * Switch into per-query memory context |
| */ |
| oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); |
| |
| /* |
| * CDB: Update global slice id for log messages. |
| */ |
| currentSlice = getCurrentSlice(estate, LocallyExecutingSliceIndex(estate)); |
| if (currentSlice) |
| { |
| if (Gp_role == GP_ROLE_EXECUTE || |
| sliceRunsOnQD(currentSlice)) |
| currentSliceId = currentSlice->sliceIndex; |
| } |
| |
| /* |
	 * extract information from the query descriptor.
| */ |
| operation = queryDesc->operation; |
| dest = queryDesc->dest; |
| |
| /* |
| * startup tuple receiver, if we will be emitting tuples |
| */ |
| estate->es_processed = 0; |
| estate->es_lastoid = InvalidOid; |
| |
| if (operation == CMD_UPDATE || operation == CMD_DELETE) |
| { |
| /* initialize execution status structure for delete, update */ |
| HASHCTL hashInfo; |
| int hashFlag; |
| MemSet(&hashInfo, 0, sizeof(hashInfo)); |
| hashInfo.keysize = sizeof(Oid); |
| hashInfo.entrysize = sizeof(ExternalInsertDescHashEntry); |
| hashInfo.hash = tag_hash; |
| hashFlag = (HASH_ELEM | HASH_FUNCTION); |
| if (operation == CMD_DELETE) |
| { |
| estate->es_ext_del_oid_desc = hash_create( |
| "Oid ExternalDeleteDesc Hash", 16, &hashInfo, hashFlag); |
| if (!estate->es_ext_del_oid_desc) |
| { |
| elog(ERROR, "failed to create hash for delete"); |
| } |
| } |
| else |
| { |
| estate->es_ext_upd_oid_desc = hash_create( |
| "Oid ExternalUpdateDesc Hash", 16, &hashInfo, hashFlag); |
| if (!estate->es_ext_upd_oid_desc) |
| { |
| elog(ERROR, "failed to create hash for update"); |
| } |
| } |
| } |
| sendTuples = (queryDesc->tupDesc != NULL && |
| (operation == CMD_SELECT || |
| queryDesc->plannedstmt->returningLists)); |
| |
| if (sendTuples) |
| (*dest->rStartup) (dest, operation, queryDesc->tupDesc); |
| |
| /* |
| * Need a try/catch block here so that if an ereport is called from |
| * within ExecutePlan, we can clean up by calling CdbCheckDispatchResult. |
| * This cleans up the asynchronous commands running through the threads launched from |
| * CdbDispatchCommand. |
| */ |
| PG_TRY(); |
| { |
| /* |
		 * Run the plan locally. There are three ways:
| * |
| * 1. Do nothing |
| * 2. Run a root slice |
| * 3. Run a non-root slice on a QE. |
| * |
| * Here we decide what is our identity -- root slice, non-root |
| * on QE or other (in which case we do nothing), and then run |
| * the plan if required. For more information see |
| * getGpExecIdentity() in execUtils. |
| */ |
| exec_identity = getGpExecIdentity(queryDesc, direction, estate); |
| |
| if (exec_identity == GP_IGNORE) |
| { |
| result = NULL; |
| } |
| else if (exec_identity == GP_NON_ROOT_ON_QE) |
| { |
| /* |
| * Run a non-root slice on a QE. |
| * |
| * Since the top Plan node is a (Sending) Motion, run the plan |
| * forward to completion. The plan won't return tuples locally |
| * (tuples go out over the interconnect), so the destination is |
| * uninteresting. The command type should be SELECT, however, to |
			 * avoid other sorts of DML processing.
| * |
| * This is the center of slice plan activity -- here we arrange to |
| * blunder into the middle of the plan rather than entering at the |
| * root. |
| */ |
| |
| MotionState *motionState = getMotionState(queryDesc->planstate, LocallyExecutingSliceIndex(estate)); |
| |
| Assert(motionState); |
| |
| estate->es_plannedstmt = queryDesc->plannedstmt; |
| result = ExecutePlan(estate, |
| (PlanState *) motionState, |
| CMD_SELECT, |
| 0, |
| ForwardScanDirection, |
| dest); |
| } else if (exec_identity == GP_ROOT_SLICE) { |
| /* |
| * Run a root slice |
| * It corresponds to the "normal" path through the executor |
| * in that we enter the plan at the top and count on the |
| * motion nodes at the fringe of the top slice to return |
| * without ever calling nodes below them. |
| */ |
| if (readCacheEnabled()) { |
| do { |
| (*dest->receiveSlot)(NULL, dest); |
| } while (!readCacheEof()); |
| result = NULL; |
| } else if (queryDesc->newPlan) { |
| exec_mpp_query_new(queryDesc->estate->mainDispatchData, |
| queryDesc->newPlan->str, |
| queryDesc->newPlan->len, currentSliceId, |
| false, dest, queryDesc->planstate); |
| result = NULL; |
| } else { |
| estate->es_plannedstmt = queryDesc->plannedstmt; |
| result = ExecutePlan(estate, queryDesc->planstate, |
| operation, count, direction, dest); |
| } |
| } else { |
| /* should never happen */ |
| Assert(!"undefined parallel execution strategy"); |
| } |
| |
| /* |
| * if result is null we got end-of-stream. We need to mark it |
| * since with a cursor end-of-stream will only be received with |
| * the fetch that returns the last tuple. ExecutorEnd needs to |
| * know if EOS was received in order to do the right cleanup. |
| */ |
| if(result == NULL) |
| estate->es_got_eos = true; |
| |
| dynamicTableScanInfo = origDynamicTableScanInfo; |
| } |
| PG_CATCH(); |
| { |
| dynamicTableScanInfo = origDynamicTableScanInfo; |
| |
| /* If EXPLAIN ANALYZE, let qExec try to return stats to qDisp. */ |
| if (estate->es_sliceTable && |
| estate->es_sliceTable->doInstrument && |
| Gp_role == GP_ROLE_EXECUTE) |
| { |
| PG_TRY(); |
| { |
| cdbexplain_sendExecStats(queryDesc); |
| } |
| PG_CATCH(); |
| { |
| /* Close down interconnect etc. */ |
| mppExecutorCleanup(queryDesc); |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| } |
| |
| /* Close down interconnect etc. */ |
| mppExecutorCleanup(queryDesc); |
| if (GP_ROLE_DISPATCH == Gp_role) |
| { |
| SetActiveQueryResource(queryDesc->savedResource); |
| } |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| /* |
| * shutdown tuple receiver, if we started it |
| */ |
| if (sendTuples) |
| (*dest->rShutdown) (dest); |
| |
| MemoryContextSwitchTo(oldcontext); |
| END_MEMORY_ACCOUNT(); |
| |
| if (Debug_print_execution_detail) { |
| instr_time time; |
| INSTR_TIME_SET_CURRENT(time); |
| elog(DEBUG1,"The time before quit ExecutorRun: %.3f ms", |
| 1000.0 * INSTR_TIME_GET_DOUBLE(time)); |
| } |
| |
| if (GP_ROLE_DISPATCH == Gp_role) |
| { |
| SetActiveQueryResource(queryDesc->savedResource); |
| } |
| |
| return result; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecutorEnd |
| * |
| * This routine must be called at the end of execution of any |
| * query plan |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecutorEnd(QueryDesc *queryDesc) |
| { |
| EState *estate; |
| MemoryContext oldcontext; |
| MemoryContext tmpcontext; |
| |
| /* sanity checks */ |
| Assert(queryDesc != NULL); |
| |
| estate = queryDesc->estate; |
| |
| Assert(estate != NULL); |
| |
| Assert(NULL != queryDesc->plannedstmt && NULL != queryDesc->plannedstmt->memoryAccount); |
| |
| START_MEMORY_ACCOUNT(queryDesc->plannedstmt->memoryAccount); |
| |
| if (DEBUG1 >= log_min_messages) |
| { |
| char msec_str[32]; |
| switch (check_log_duration(msec_str, false)) |
| { |
| case 1: |
| case 2: |
| ereport(LOG, (errmsg("duration to ExecutorEnd starting: %s ms", msec_str), |
| errOmitLocation(true))); |
| break; |
| } |
| } |
| |
| if (gp_partitioning_dynamic_selection_log && |
| estate->dynamicTableScanInfo != NULL && |
| estate->dynamicTableScanInfo->numScans > 0) |
| { |
| for (int scanNo = 0; scanNo < estate->dynamicTableScanInfo->numScans; scanNo++) |
| { |
| dumpDynamicTableScanPidIndex(scanNo); |
| } |
| } |
| |
| /* |
| * Switch into per-query memory context to run ExecEndPlan |
| */ |
| oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); |
| |
| /* |
| * If EXPLAIN ANALYZE, qExec returns stats to qDisp now. |
| */ |
| if (estate->es_sliceTable && |
| estate->es_sliceTable->doInstrument && |
| Gp_role == GP_ROLE_EXECUTE) |
| cdbexplain_sendExecStats(queryDesc); |
| |
| /* |
| * if needed, collect mpp dispatch results and tear down |
| * all mpp specific resources (interconnect, seq server). |
| */ |
| PG_TRY(); |
| { |
| mppExecutorFinishup(queryDesc); |
| } |
| PG_CATCH(); |
| { |
		/* Clean up the global resource reference for SPI/function resource inheritance. */
| if (Gp_role == GP_ROLE_DISPATCH) { |
			/* We need to free the resource in the old memory context. */
| tmpcontext = MemoryContextSwitchTo(oldcontext); |
| AutoFreeResource(queryDesc->resource); |
| queryDesc->resource = NULL; |
| oldcontext = MemoryContextSwitchTo(tmpcontext); |
| } |
| |
| /* |
| * we got an error. do all the necessary cleanup. |
| */ |
| mppExecutorCleanup(queryDesc); |
| |
| /* |
| * Remove our own query's motion layer. |
| */ |
| RemoveMotionLayer(estate->motionlayer_context, true); |
| |
| /* |
| * Release EState and per-query memory context. This should release |
| * everything the executor has allocated. |
| */ |
| FreeExecutorState(estate); |
| |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
	/* Clean up the global resource reference for SPI/function resource inheritance. */
| if ( Gp_role == GP_ROLE_DISPATCH ) { |
		/* We need to free the resource in the old memory context. */
| tmpcontext = MemoryContextSwitchTo(oldcontext); |
| AutoFreeResource(queryDesc->resource); |
| queryDesc->resource = NULL; |
| oldcontext = MemoryContextSwitchTo(tmpcontext); |
| } |
| |
	/*
	 * Clean up execution status structures for DELETE/UPDATE, if they were
	 * created in ExecutorRun. Guard against NULL: the hash tables exist
	 * only for DELETE/UPDATE statements.
	 */
	HASH_SEQ_STATUS status;

	if (estate->es_ext_del_oid_desc)
	{
		ExternalInsertDescHashEntry *extDelDescEntry;

		hash_seq_init(&status, estate->es_ext_del_oid_desc);
		while ((extDelDescEntry = (ExternalInsertDescHashEntry *) hash_seq_search(&status)) != NULL)
		{
			ExternalInsertDesc extDelDesc = extDelDescEntry->ext_ins_desc;
			InvokeMagmaEndDelete(extDelDesc->ext_ps_delete_funcs.enddeletes, extDelDesc);
		}
		hash_destroy(estate->es_ext_del_oid_desc);
	}

	if (estate->es_ext_upd_oid_desc)
	{
		ExternalInsertDescHashEntry *extUpdDescEntry;

		hash_seq_init(&status, estate->es_ext_upd_oid_desc);
		while ((extUpdDescEntry = (ExternalInsertDescHashEntry *) hash_seq_search(&status)) != NULL)
		{
			ExternalInsertDesc extUpdDesc = extUpdDescEntry->ext_ins_desc;
			elog(LOG, "exec update end update: %d", extUpdDescEntry->ext_ins_oid);
			estate->es_processed += InvokeMagmaEndUpdate(extUpdDesc->ext_ps_update_funcs.endupdates, extUpdDesc);
		}
		hash_destroy(estate->es_ext_upd_oid_desc);
	}
| |
| /* |
| * If normal termination, let each operator clean itself up. |
| * Otherwise don't risk it... an error might have left some |
| * structures in an inconsistent state. |
| */ |
| ExecEndPlan(queryDesc->planstate, estate); |
| |
| WorkfileQueryspace_ReleaseEntry(); |
| |
| /* |
| * Remove our own query's motion layer. |
| */ |
| RemoveMotionLayer(estate->motionlayer_context, true); |
| |
| /* |
| * Close the SELECT INTO relation if any |
| */ |
| if (estate->es_select_into) |
| CloseIntoRel(queryDesc); |
| |
| /* |
| * Must switch out of context before destroying it |
| */ |
| MemoryContextSwitchTo(oldcontext); |
| |
| queryDesc->es_processed = estate->es_processed; |
| queryDesc->es_lastoid = estate->es_lastoid; |
| |
| /* |
| * Release EState and per-query memory context. This should release |
| * everything the executor has allocated. |
| */ |
| FreeExecutorState(estate); |
| |
| /** |
| * Perfmon related stuff. |
| */ |
| if (gp_enable_gpperfmon |
| && Gp_role == GP_ROLE_DISPATCH |
| && queryDesc->gpmon_pkt) |
| { |
| gpmon_qlog_query_end(queryDesc->gpmon_pkt); |
| queryDesc->gpmon_pkt = NULL; |
| } |
| |
| /* Reset queryDesc fields that no longer point to anything */ |
| queryDesc->tupDesc = NULL; |
| queryDesc->estate = NULL; |
| queryDesc->planstate = NULL; |
| if (DEBUG1 >= log_min_messages) |
| { |
| char msec_str[32]; |
| switch (check_log_duration(msec_str, false)) |
| { |
| case 1: |
| case 2: |
| ereport(LOG, (errmsg("duration to ExecutorEnd end: %s ms", msec_str), |
| errOmitLocation(true))); |
| break; |
| } |
| } |
| END_MEMORY_ACCOUNT(); |
| |
| if (gp_dump_memory_usage) |
| { |
| MemoryAccounting_SaveToFile(currentSliceId); |
| } |
| ReportOOMConsumption(); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecutorRewind |
| * |
| * This routine may be called on an open queryDesc to rewind it |
| * to the start. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecutorRewind(QueryDesc *queryDesc) |
| { |
| EState *estate; |
| MemoryContext oldcontext; |
| |
| /* sanity checks */ |
| Assert(queryDesc != NULL); |
| |
| estate = queryDesc->estate; |
| |
| Assert(estate != NULL); |
| |
| Assert(NULL != queryDesc->plannedstmt && NULL != queryDesc->plannedstmt->memoryAccount); |
| |
| START_MEMORY_ACCOUNT(queryDesc->plannedstmt->memoryAccount); |
| |
| /* It's probably not sensible to rescan updating queries */ |
| Assert(queryDesc->operation == CMD_SELECT); |
| |
| /* |
| * Switch into per-query memory context |
| */ |
| oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); |
| |
| /* |
| * rescan plan |
| */ |
| ExecReScan(queryDesc->planstate, NULL); |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| END_MEMORY_ACCOUNT(); |
| } |
| |
| |
| |
| /* |
| * This function is used to check if the current statement will perform any writes. |
| * It is used to enforce: |
| * (1) read-only mode (both fts and transcation isolation level read only) |
| * as well as |
| * (2) to keep track of when a distributed transaction becomes |
| * "dirty" and will require 2pc. |
| Check that the query does not imply any writes to non-temp tables. |
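 *
 * Example (illustrative): an INSERT into a regular table errors out below
 * when XactReadOnly is set, while the same INSERT into a temporary table
 * merely marks the transaction as one that performs writes.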
| */ |
| static void |
| ExecCheckXactReadOnly(PlannedStmt *plannedstmt) |
| { |
| ListCell *l; |
| int rti; |
| bool changesTempTables = false; |
| |
| /* |
| * CREATE TABLE AS or SELECT INTO? |
| * |
| * XXX should we allow this if the destination is temp? |
| */ |
| if (plannedstmt->intoClause != NULL) |
| { |
| Assert(plannedstmt->intoClause->rel); |
| if (plannedstmt->intoClause->rel->istemp) |
| changesTempTables = true; |
| else |
| goto fail; |
| } |
| |
| /* Fail if write permissions are requested on any non-temp table */ |
| rti = 0; |
| foreach(l, plannedstmt->rtable) |
| { |
| RangeTblEntry *rte = lfirst(l); |
| |
| rti++; |
| |
| if (rte->rtekind == RTE_SUBQUERY) |
| { |
| continue; |
| } |
| |
| if (rte->rtekind != RTE_RELATION) |
| continue; |
| |
| if ((rte->requiredPerms & (~ACL_SELECT)) == 0) |
| continue; |
| |
| if (isTempNamespace(get_rel_namespace(rte->relid))) |
| { |
| changesTempTables = true; |
| continue; |
| } |
| |
		/* CDB: Allow SELECT FOR SHARE/UPDATE */
| if ((rte->requiredPerms & ~(ACL_SELECT | ACL_SELECT_FOR_UPDATE)) == 0) |
| { |
| ListCell *cell; |
| bool foundRTI = false; |
| |
| foreach(cell, plannedstmt->rowMarks) |
| { |
| RowMarkClause *rmc = lfirst(cell); |
| if( rmc->rti == rti ) |
| { |
| foundRTI = true; |
| break; |
| } |
| } |
| |
| if (foundRTI) |
| continue; |
| } |
| |
| goto fail; |
| } |
| if (changesTempTables) |
| ExecutorMarkTransactionDoesWrites(); |
| return; |
| |
| fail: |
| if (XactReadOnly) |
| ereport(ERROR, |
| (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), |
| errmsg("transaction is read-only"))); |
| else |
| ExecutorMarkTransactionDoesWrites(); |
| } |
| |
| /* |
| * BuildPartitionNodeFromRoot |
| * Build PartitionNode for the root partition of a given partition oid. |
| */ |
| static PartitionNode * |
| BuildPartitionNodeFromRoot(Oid relid) |
| { |
| PartitionNode *partitionNode = NULL; |
| |
| if (rel_is_child_partition(relid)) |
| { |
| relid = rel_partition_get_master(relid); |
| } |
| |
| partitionNode = RelationBuildPartitionDescByOid(relid, false /* inctemplate */); |
| |
| return partitionNode; |
| } |
| |
| /* |
| * createPartitionAccessMethods |
| * Create a PartitionAccessMethods object. |
| * |
| * Note that the memory context for the access method is not set at this point. It will |
| * be set during execution. |
| */ |
| static PartitionAccessMethods * |
| createPartitionAccessMethods(int numLevels) |
| { |
	PartitionAccessMethods *accessMethods = palloc(sizeof(PartitionAccessMethods));
| accessMethods->partLevels = numLevels; |
| accessMethods->amstate = palloc0(numLevels * sizeof(void *)); |
| accessMethods->part_cxt = NULL; |
| |
| return accessMethods; |
| } |
| |
| /* |
| * createPartitionState |
| * Create a PartitionState object. |
| * |
| * Note that the memory context for the access method is not set at this point. It will |
| * be set during execution. |
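 *
 * Usage sketch (illustrative; the argument choice is an assumption): when
 * estate->es_result_partitions is set, the executor builds this roughly as
 *
 *	 estate->es_partition_state =
 *		 createPartitionState(estate->es_result_partitions,
 *							  estate->es_num_result_relations);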
| */ |
| PartitionState * |
| createPartitionState(PartitionNode *partsAndRules, |
| int resultPartSize) |
| { |
| Assert(partsAndRules != NULL); |
| |
| PartitionState *partitionState = makeNode(PartitionState); |
| partitionState->accessMethods = createPartitionAccessMethods(num_partition_levels(partsAndRules)); |
| partitionState->max_partition_attr = max_partition_attr(partsAndRules); |
| partitionState->result_partition_array_size = resultPartSize; |
| |
| return partitionState; |
| } |
| |
| /* |
| * InitializeResultRelations |
| * Initialize result relation relevant information |
| * |
| * CDB: Note that we need this info even if we aren't the slice that will be doing |
 * the actual updating, since it's where we learn things such as whether the
 * rows need to contain OIDs.
| */ |
| static void |
| InitializeResultRelations(PlannedStmt *plannedstmt, EState *estate, CmdType operation, int eflags) |
| { |
| Assert(plannedstmt != NULL && estate != NULL); |
| |
| if (plannedstmt->resultRelations == NULL) |
| { |
| /* |
| * if no result relation, then set state appropriately |
| */ |
| estate->es_result_relations = NULL; |
| estate->es_num_result_relations = 0; |
| estate->es_result_relation_info = NULL; |
| estate->es_result_partitions = NULL; |
| estate->es_result_aosegnos = NIL; |
| |
| return; |
| } |
| |
| List *resultRelations = plannedstmt->resultRelations; |
| int numResultRelations = list_length(resultRelations); |
| ResultRelInfo *resultRelInfos; |
| |
| List *rangeTable = estate->es_range_table; |
| |
| if (numResultRelations > 1) |
| { |
| /* |
| * Multiple result relations (due to inheritance) |
| * stmt->resultRelations identifies them all |
| */ |
| ResultRelInfo *resultRelInfo; |
| |
| resultRelInfos = (ResultRelInfo *) |
| palloc(numResultRelations * sizeof(ResultRelInfo)); |
| resultRelInfo = resultRelInfos; |
| ListCell *lc = NULL; |
| foreach(lc, resultRelations) |
| { |
| initResultRelInfo(resultRelInfo, |
| lfirst_int(lc), |
| rangeTable, |
| operation, |
| estate->es_instrument, |
| (Gp_role != GP_ROLE_EXECUTE || Gp_is_writer)); |
| |
| resultRelInfo++; |
| } |
| } |
| else |
| { |
| /* |
		 * Single result relation identified by the first (and only) entry in plannedstmt->resultRelations
| */ |
| numResultRelations = 1; |
| resultRelInfos = (ResultRelInfo *) palloc(sizeof(ResultRelInfo)); |
| initResultRelInfo(resultRelInfos, |
| linitial_int(plannedstmt->resultRelations), |
| rangeTable, |
| operation, |
| estate->es_instrument, |
| (Gp_role != GP_ROLE_EXECUTE || Gp_is_writer)); |
| } |
| |
| /* |
| * On some occasions, when inserting data into a target relation, we need |
| * to pass specific information from the QD to the QEs. We do this |
| * information exchange here, via the parseTree. For now this is used for |
| * partitioned and append-only tables. |
| */ |
| |
| estate->es_result_relations = resultRelInfos; |
| estate->es_num_result_relations = numResultRelations; |
| /* Initialize to first or only result rel */ |
| estate->es_result_relation_info = resultRelInfos; |
| |
| if (Gp_role == GP_ROLE_EXECUTE) |
| { |
| estate->es_result_partitions = plannedstmt->result_partitions; |
| estate->es_result_aosegnos = plannedstmt->result_aosegnos; |
| estate->es_result_segfileinfos = plannedstmt->result_segfileinfos; |
| } |
| else |
| { |
| Oid relid = getrelid(linitial_int(plannedstmt->resultRelations), rangeTable); |
| estate->es_result_partitions = BuildPartitionNodeFromRoot(relid); |
| |
| if (operation == CMD_UPDATE || operation == CMD_DELETE) { |
| estate->es_result_aosegnos = NIL; |
| |
| if (get_rel_relstorage(relid) == RELSTORAGE_ORC && |
| !(eflags & EXEC_FLAG_EXPLAIN_ONLY) && rel_has_index(relid)) { |
| ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cannot UPDATE/DELETE on table \"%s\" because the table has indexes", |
| get_rel_name(relid)))); |
| } |
| |
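| /* For each non-partitioned magma/ORC result relation, record the relid |
| * together with a freshly assigned relfilenode (per orcSetNewRelfilenode) |
| * in relFileNodeInfo -- the list is consumed later, e.g. by |
| * orcBeginDelete/orcBeginUpdate -- and arrange for the stale AO hash |
| * entry to be dropped at commit. */ |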
| if (get_rel_relstorage(relid) == RELSTORAGE_ORC && |
| !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) { |
| ListCell *cell; |
| foreach (cell, plannedstmt->resultRelations) { |
| Oid myRelId = getrelid(lfirst_int(cell), rangeTable); |
| if (rel_is_partitioned(myRelId)) continue; |
| if (!GetFileSplitsOfSegmentMagma(plannedstmt->scantable_splits, myRelId)) continue; |
| estate->es_plannedstmt->relFileNodeInfo = lappend_oid( |
| estate->es_plannedstmt->relFileNodeInfo, myRelId); |
| estate->es_plannedstmt->relFileNodeInfo = |
| lappend_oid(estate->es_plannedstmt->relFileNodeInfo, |
| orcSetNewRelfilenode(myRelId)); |
| AORelRemoveHashEntryOnCommit(myRelId); |
| } |
| } |
| } else { |
| if (get_rel_relstorage(relid) == RELSTORAGE_ORC && |
| !(eflags & EXEC_FLAG_EXPLAIN_ONLY) && rel_has_index(relid)) { |
| ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cannot INSERT on table \"%s\" because the table has indexes", |
| get_rel_name(relid)))); |
| } |
| List *all_relids = NIL; |
| all_relids = lappend_oid(all_relids, relid); |
| if (rel_is_partitioned(relid)) |
| { |
| all_relids = list_concat(all_relids, all_partition_relids(estate->es_result_partitions)); |
| } |
| |
| estate->es_result_aosegnos = |
| assignPerRelSegno(all_relids, GetQEGangNum()); |
| |
| /* Set any QD resultrels segno, just in case. The QEs set |
| * their own in ExecInsert(). */ |
| int relno = 0; |
| ResultRelInfo *relinfo; |
| for (relno = 0; relno < numResultRelations; relno++) { |
| relinfo = &(resultRelInfos[relno]); |
| ResultRelInfoSetSegno(relinfo, estate->es_result_aosegnos); |
| } |
| |
| ListCell *cell; |
| foreach (cell, all_relids) { |
| Oid relid = lfirst_oid(cell); |
| if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY)) { |
| CreateAoSegFileOnMaster(relid, |
| estate->es_result_aosegnos); |
| } |
| } |
| } |
| } |
| |
| plannedstmt->result_partitions = estate->es_result_partitions; |
| plannedstmt->result_aosegnos = estate->es_result_aosegnos; |
| |
| estate->es_partition_state = NULL; |
| if (estate->es_result_partitions) |
| { |
| estate->es_partition_state = createPartitionState(estate->es_result_partitions, |
| estate->es_num_result_relations); |
| } |
| } |
| |
| /* |
| * InitializeQueryPartsMetadata |
| * Initialize partitioning metadata for all partitions involved in the query. |
| */ |
| static void |
| InitializeQueryPartsMetadata(PlannedStmt *plannedstmt, EState *estate) |
| { |
| Assert(plannedstmt != NULL && estate != NULL); |
| |
| if (plannedstmt->queryPartOids == NIL) |
| { |
| plannedstmt->queryPartsMetadata = NIL; |
| return; |
| } |
| |
| if (Gp_role != GP_ROLE_EXECUTE) |
| { |
| /* |
| * Non-QEs populate the partitioning metadata for all |
| * relevant partitions in the query. |
| */ |
| plannedstmt->queryPartsMetadata = NIL; |
| ListCell *lc = NULL; |
| foreach (lc, plannedstmt->queryPartOids) |
| { |
| Oid relid = (Oid)lfirst_oid(lc); |
| PartitionNode *partitionNode = BuildPartitionNodeFromRoot(relid); |
| Assert(partitionNode != NULL); |
| plannedstmt->queryPartsMetadata = |
| lappend(plannedstmt->queryPartsMetadata, partitionNode); |
| } |
| } |
| |
| /* Populate the partitioning metadata to EState */ |
| Assert(estate->dynamicTableScanInfo != NULL && |
| estate->dynamicTableScanInfo->memoryContext != NULL); |
| |
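| /* Copy the metadata into the dynamic table scan memory context so that it |
| * survives for the lifetime of the dynamic scans, not just this call. */ |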
| MemoryContext oldContext = MemoryContextSwitchTo(estate->dynamicTableScanInfo->memoryContext); |
| |
| ListCell *lc = NULL; |
| foreach(lc, plannedstmt->queryPartsMetadata) |
| { |
| PartitionNode *partsAndRules = (PartitionNode *)lfirst(lc); |
| |
| PartitionMetadata *metadata = palloc(sizeof(PartitionMetadata)); |
| metadata->partsAndRules = partsAndRules; |
| Assert(metadata->partsAndRules != NULL); |
| metadata->accessMethods = createPartitionAccessMethods(num_partition_levels(metadata->partsAndRules)); |
| estate->dynamicTableScanInfo->partsMetadata = |
| lappend(estate->dynamicTableScanInfo->partsMetadata, metadata); |
| } |
| |
| MemoryContextSwitchTo(oldContext); |
| } |
| |
| /* |
| * InitializePartsMetadata |
| * Initialize partitioning metadata for the given partitioned table oid |
| */ |
| List * |
| InitializePartsMetadata(Oid rootOid) |
| { |
| PartitionMetadata *metadata = palloc(sizeof(PartitionMetadata)); |
| metadata->partsAndRules = BuildPartitionNodeFromRoot(rootOid); |
| Assert(metadata->partsAndRules != NULL); |
| |
| metadata->accessMethods = createPartitionAccessMethods(num_partition_levels(metadata->partsAndRules)); |
| return list_make1(metadata); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * InitPlan |
| * |
| * Initializes the query plan: open files, allocate storage |
| * and start up the rule manager |
| * ---------------------------------------------------------------- |
| */ |
| static void |
| InitPlan(QueryDesc *queryDesc, int eflags) |
| { |
| CmdType operation = queryDesc->operation; |
| PlannedStmt *plannedstmt = queryDesc->plannedstmt; |
| Plan *plan = plannedstmt->planTree; |
| List *rangeTable = plannedstmt->rtable; |
| EState *estate = queryDesc->estate; |
| PlanState *planstate; |
| TupleDesc tupType; |
| ListCell *l; |
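| /* True when we are the dispatcher and the plan is marked for parallel |
| * dispatch to the segments. */ |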
| bool shouldDispatch = Gp_role == GP_ROLE_DISPATCH && plannedstmt->planTree->dispatch == DISPATCH_PARALLEL; |
| |
| Assert(plannedstmt->intoPolicy == NULL |
| || plannedstmt->intoPolicy->ptype == POLICYTYPE_PARTITIONED); |
| |
| if (DEBUG1 >= log_min_messages) |
| { |
| char msec_str[32]; |
| switch (check_log_duration(msec_str, false)) |
| { |
| case 1: |
| case 2: |
| ereport(LOG, (errmsg("duration to InitPlan start: %s ms", msec_str), |
| errOmitLocation(true))); |
| break; |
| default: |
| /* do nothing */ |
| break; |
| } |
| } |
| |
| /* |
| * Do permissions checks. It's sufficient to examine the query's top |
| * rangetable here --- subplan RTEs will be checked during |
| * ExecInitSubPlan(). |
| */ |
| if (Gp_role != GP_ROLE_EXECUTE) |
| { |
| ExecCheckRTPerms(plannedstmt->rtable); |
| } |
| |
| /* |
| * get information from query descriptor |
| */ |
| rangeTable = plannedstmt->rtable; |
| |
| /* |
| * initialize the node's execution state |
| */ |
| estate->es_range_table = rangeTable; |
| |
| /* |
| * if there is a result relation, initialize result relation stuff |
| * |
| * CDB: Note that we need this info even if we aren't the slice that will be doing |
| * the actual updating, since it's where we learn things such as whether the rows |
| * need to contain OIDs. |
| */ |
| InitializeResultRelations(plannedstmt, estate, operation, eflags); |
| |
| /* |
| * If there are partitions involved in the query, initialize partitioning metadata. |
| */ |
| InitializeQueryPartsMetadata(plannedstmt, estate); |
| |
| /* |
| * set the number of partition selectors for every dynamic scan id |
| */ |
| estate->dynamicTableScanInfo->numSelectorsPerScanId = plannedstmt->numSelectorsPerScanId; |
| |
| /* |
| * Detect whether we're doing SELECT INTO. If so, set the es_into_oids |
| * flag appropriately so that the plan tree will be initialized with the |
| * correct tuple descriptors. (Other SELECT INTO stuff comes later.) |
| */ |
| estate->es_select_into = false; |
| if (operation == CMD_SELECT && plannedstmt->intoClause != NULL) |
| { |
| estate->es_select_into = true; |
| estate->es_into_oids = interpretOidsOption(plannedstmt->intoClause->options); |
| } |
| |
| /* |
| * Have to lock relations selected FOR UPDATE/FOR SHARE before we |
| * initialize the plan tree, else we'd be doing a lock upgrade. While we |
| * are at it, build the ExecRowMark list. |
| */ |
| estate->es_rowMarks = NIL; |
| foreach(l, plannedstmt->rowMarks) |
| { |
| RowMarkClause *rc = (RowMarkClause *) lfirst(l); |
| Oid relid = getrelid(rc->rti, rangeTable); |
| Relation relation; |
| LOCKMODE lockmode; |
| bool lockUpgraded; |
| ExecRowMark *erm; |
| |
| /* CDB: On QD, lock whole table in S or X mode, if distributed. */ |
| lockmode = rc->forUpdate ? RowExclusiveLock : RowShareLock; |
| relation = CdbOpenRelation(relid, lockmode, rc->noWait, &lockUpgraded); |
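| /* If the lock was upgraded to a whole-table lock, per-row marking is |
| * unnecessary for this relation, so skip building an ExecRowMark. */ |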
| if (lockUpgraded) |
| { |
| heap_close(relation, NoLock); |
| continue; |
| } |
| |
| erm = (ExecRowMark *) palloc(sizeof(ExecRowMark)); |
| erm->relation = relation; |
| erm->rti = rc->rti; |
| erm->forUpdate = rc->forUpdate; |
| erm->noWait = rc->noWait; |
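| /* Name the junk attribute that carries this relation's ctid; the rti |
| * suffix keeps the name unique per row-marked relation. */ |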
| snprintf(erm->resname, sizeof(erm->resname), "ctid%u", rc->rti); |
| estate->es_rowMarks = lappend(estate->es_rowMarks, erm); |
| } |
| |
| /* |
| * Initialize the executor "tuple" table. We need slots for all the plan |
| * nodes, plus possibly output slots for the junkfilter(s). At this point |
| * we aren't sure if we need junkfilters, so just add slots for them |
| * unconditionally. Also, if it's not a SELECT, set up a slot for use for |
| * trigger output tuples. Also, one for RETURNING-list evaluation. |
| */ |
| { |
| /* Slots for the main plan tree */ |
| int nSlots = ExecCountSlotsNode(plannedstmt->planTree); |
| |
| /* |
| * Note that, here, PG loops over the subplans in PlannedStmt, counts |
| * the slots in each, and allocates the slots in the top-level state's |
| * tuple table. GP operates differently. It uses the subplan code in |
| * nodeSubplan.c to count slots for the subplan and to allocate them |
| * in the subplan's state. |
| * |
| * Since every slice does this, GP may over-allocate tuple slots; however, |
| * the cost is small (about 80 bytes per entry) and probably worth the |
| * simplicity of the approach. |
| */ |
| |
| /* Add slots for junkfilter(s) */ |
| if (plannedstmt->resultRelations != NIL) |
| nSlots += list_length(plannedstmt->resultRelations); |
| else |
| nSlots += 1; |
| if (operation != CMD_SELECT) |
| nSlots++; /* for es_trig_tuple_slot */ |
| if (plannedstmt->returningLists) |
| nSlots++; /* for RETURNING projection */ |
| |
| estate->es_tupleTable = ExecCreateTupleTable(nSlots); |
| |
| if (operation != CMD_SELECT) |
| estate->es_trig_tuple_slot = |
| ExecAllocTableSlot(estate->es_tupleTable); |
| } |
| |
| /* mark EvalPlanQual not active */ |
| estate->es_plannedstmt = plannedstmt; |
| estate->es_evalPlanQual = NULL; |
| estate->es_evTupleNull = NULL; |
| estate->es_evTuple = NULL; |
| estate->es_useEvalPlan = false; |
| |
| /* |
| * On the QD, we need to initialize the "database" variable before |
| * ExecInitNode(), so that we can access the database name correctly in |
| * places such as magma_beginscan() and magma_insertinit(). |
| * |
| * In contrast, on a QE we don't need to do this, since RebuildDatabase() |
| * ensures that the database is initialized. |
| * |
| * An example of the uninitialized database: |
| * prepare select_by_id(int) as select * from t where id = 1; |
| * execute select_by_id(1); <- uninitialized when we are in |
| * magma_beginscan() |
| */ |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| database = get_database_name(MyDatabaseId); |
| } |
| |
| /* |
| * Initialize the private state information for all the nodes in the query |
| * tree. This opens files, allocates storage and leaves us ready to start |
| * processing tuples. |
| */ |
| planstate = ExecInitNode(plannedstmt->planTree, estate, eflags); |
| |
| queryDesc->planstate = planstate; |
| |
| Assert(queryDesc->planstate); |
| |
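| /* Only the root slice needs the remaining output machinery (junk filters, |
| * RETURNING projections, SELECT INTO); other slices are done here. */ |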
| if (RootSliceIndex(estate) != LocallyExecutingSliceIndex(estate)) |
| return; |
| |
| /* |
| * Get the tuple descriptor describing the type of tuples to return. (this |
| * is especially important if we are creating a relation with "SELECT |
| * INTO") |
| */ |
| tupType = ExecGetResultType(planstate); |
| |
| /* |
| * Initialize the junk filter if needed. SELECT and INSERT queries need a |
| * filter if there are any junk attrs in the tlist. INSERT and SELECT |
| * INTO also need a filter if the plan may return raw disk tuples (else |
| * heap_insert will be scribbling on the source relation!). UPDATE and |
| * DELETE always need a filter, since there's always a junk 'ctid' |
| * attribute present --- no need to look first. |
| * |
| * This section of code is also a convenient place to verify that the |
| * output of an INSERT or UPDATE matches the target table(s). |
| */ |
| { |
| bool junk_filter_needed = false; |
| ListCell *tlist; |
| |
| switch (operation) |
| { |
| case CMD_SELECT: |
| case CMD_INSERT: |
| foreach(tlist, plan->targetlist) |
| { |
| TargetEntry *tle = (TargetEntry *) lfirst(tlist); |
| |
| if (tle->resjunk) |
| { |
| junk_filter_needed = true; |
| break; |
| } |
| } |
| if (!junk_filter_needed && |
| (operation == CMD_INSERT || estate->es_select_into) && |
| ExecMayReturnRawTuples(planstate)) |
| junk_filter_needed = true; |
| |
| break; |
| case CMD_UPDATE: |
| case CMD_DELETE: |
| junk_filter_needed = true; |
| break; |
| default: |
| break; |
| } |
| |
| if (junk_filter_needed) |
| { |
| /* |
| * If there are multiple result relations, each one needs its own |
| * junk filter. Note this is only possible for UPDATE/DELETE, so |
| * we can't be fooled by some needing a filter and some not. |
| */ |
| |
| if (list_length(plannedstmt->resultRelations) > 1) |
| { |
| List *appendplans; |
| int as_nplans; |
| ResultRelInfo *resultRelInfo; |
| ListCell *lc; |
| |
| /* Top plan had better be an Append here. */ |
| Assert(IsA(plannedstmt->planTree, Append)); |
| Assert(((Append *) plannedstmt->planTree)->isTarget); |
| Assert(IsA(planstate, AppendState)); |
| appendplans = ((Append *) plannedstmt->planTree)->appendplans; |
| as_nplans = list_length(appendplans); |
| Assert(as_nplans == estate->es_num_result_relations); |
| resultRelInfo = estate->es_result_relations; |
| foreach(lc, appendplans) |
| { |
| Plan *subplan = (Plan *)lfirst(lc); |
| JunkFilter *j; |
| |
| if (operation == CMD_UPDATE && PLANGEN_PLANNER == plannedstmt->planGen) |
| ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc, |
| subplan->targetlist); |
| |
| TupleDesc cleanTupType = ExecCleanTypeFromTL(subplan->targetlist, |
| resultRelInfo->ri_RelationDesc->rd_att->tdhasoid); |
| j = ExecInitJunkFilter(subplan->targetlist, |
| cleanTupType, |
| ExecAllocTableSlot(estate->es_tupleTable)); |
| resultRelInfo->ri_junkFilter = j; |
| resultRelInfo++; |
| } |
| |
| /* |
| * Set active junkfilter too; at this point ExecInitAppend has |
| * already selected an active result relation... |
| */ |
| estate->es_junkFilter = |
| estate->es_result_relation_info->ri_junkFilter; |
| } |
| else |
| { |
| |
| /* Normal case with just one JunkFilter */ |
| JunkFilter *j; |
| |
| if (PLANGEN_PLANNER == plannedstmt->planGen && (operation == CMD_INSERT || operation == CMD_UPDATE)) |
| ExecCheckPlanOutput(estate->es_result_relation_info->ri_RelationDesc, |
| planstate->plan->targetlist); |
| |
| TupleDesc cleanTupType = ExecCleanTypeFromTL(planstate->plan->targetlist, |
| tupType->tdhasoid); |
| |
| j = ExecInitJunkFilter(planstate->plan->targetlist, |
| cleanTupType, |
| ExecAllocTableSlot(estate->es_tupleTable)); |
| |
| estate->es_junkFilter = j; |
| if (estate->es_result_relation_info) |
| estate->es_result_relation_info->ri_junkFilter = j; |
| |
| /* For SELECT, want to return the cleaned tuple type */ |
| if (operation == CMD_SELECT) |
| tupType = j->jf_cleanTupType; |
| } |
| } |
| else |
| { |
| /* The planner requires that the top node of the target list have the |
| * same number of columns as the output relation. This is not a |
| * requirement of the Optimizer. */ |
| if (operation == CMD_INSERT |
| && plannedstmt->planGen == PLANGEN_PLANNER) |
| { |
| ExecCheckPlanOutput(estate->es_result_relation_info->ri_RelationDesc, |
| planstate->plan->targetlist); |
| } |
| |
| estate->es_junkFilter = NULL; |
| } |
| } |
| |
| /* |
| * Initialize RETURNING projections if needed. |
| */ |
| if (plannedstmt->returningLists) |
| { |
| TupleTableSlot *slot; |
| ExprContext *econtext; |
| ResultRelInfo *resultRelInfo; |
| |
| /* |
| * We set QueryDesc.tupDesc to be the RETURNING rowtype in this case. |
| * We assume all the sublists will generate the same output tupdesc. |
| */ |
| tupType = ExecTypeFromTL((List *) linitial(plannedstmt->returningLists), |
| false); |
| |
| /* Set up a slot for the output of the RETURNING projection(s) */ |
| slot = ExecAllocTableSlot(estate->es_tupleTable); |
| ExecSetSlotDescriptor(slot, tupType); |
| /* Need an econtext too */ |
| econtext = CreateExprContext(estate); |
| |
| /* |
| * Build a projection for each result rel. Note that any SubPlans in |
| * the RETURNING lists get attached to the topmost plan node. |
| */ |
| Assert(list_length(plannedstmt->returningLists) == estate->es_num_result_relations); |
| resultRelInfo = estate->es_result_relations; |
| foreach(l, plannedstmt->returningLists) |
| { |
| List *rlist = (List *) lfirst(l); |
| List *rliststate; |
| |
| rliststate = (List *) ExecInitExpr((Expr *) rlist, planstate); |
| resultRelInfo->ri_projectReturning = |
| ExecBuildProjectionInfo(rliststate, econtext, slot, |
| resultRelInfo->ri_RelationDesc->rd_att); |
| resultRelInfo++; |
| } |
| |
| /* |
| * Because we already ran ExecInitNode() for the top plan node, any |
| * subplans we just attached to it won't have been initialized; so we |
| * have to do it here. (Ugly, but the alternatives seem worse.) |
| */ |
| foreach(l, planstate->subPlan) |
| { |
| SubPlanState *sstate = (SubPlanState *) lfirst(l); |
| |
| Assert(IsA(sstate, SubPlanState)); |
| if (sstate->planstate == NULL) /* already inited? */ |
| ExecInitSubPlan(sstate, estate, eflags); |
| } |
| } |
| |
| queryDesc->tupDesc = tupType; |
| |
| /* |
| * If doing SELECT INTO, initialize the "into" relation. We must wait |
| * till now so we have the "clean" result tuple type to create the new |
| * table from. |
| * |
| * If EXPLAIN, skip creating the "into" relation. |
| */ |
| if (estate->es_select_into && !(eflags & EXEC_FLAG_EXPLAIN_ONLY) && |
| /* Only create the table if root slice */ |
| (Gp_role != GP_ROLE_EXECUTE || Gp_is_writer) ) |
| OpenIntoRel(queryDesc); |
| |
| |
| if (DEBUG1 >= log_min_messages) |
| { |
| char msec_str[32]; |
| switch (check_log_duration(msec_str, false)) |
| { |
| case 1: |
| case 2: |
| ereport(LOG, (errmsg("duration to InitPlan end: %s ms", msec_str), |
| errOmitLocation(true))); |
| break; |
| } |
| } |
| } |
| |
| /* |
| * Initialize ResultRelInfo data for one result relation |
| */ |
| static void |
| initResultRelInfo(ResultRelInfo *resultRelInfo, |
| Index resultRelationIndex, |
| List *rangeTable, |
| CmdType operation, |
| bool doInstrument, |
| bool needLock) |
| { |
| Oid resultRelationOid; |
| Relation resultRelationDesc; |
| LOCKMODE lockmode; |
| |
| resultRelationOid = getrelid(resultRelationIndex, rangeTable); |
| |
| /* |
| * MPP-2879: The QEs don't pass their MPPEXEC statements through the |
| * parser (where locks would ordinarily get acquired). So we need to |
| * take some care to pick the locks up here (otherwise we get some very |
| * strange interactions with QE-local operations (vacuum? utility mode?)). |
| * |
| * NOTE: There is a comment in lmgr.c which forbids use of |
| * heap_open/relation_open with "NoLock" followed by use of |
| * RelationOidLock/RelationLock with a stronger lock-mode: |
| * RelationOidLock/RelationLock expect a relation to already be |
| * locked. |
| * |
| * But we also need to serialize CMD_UPDATE and CMD_DELETE to preserve |
| * order on mirrors. |
| * |
| * So we're going to ignore the "NoLock" issue above. |
| */ |
| /* CDB: we must promote locks for UPDATE and DELETE operations. */ |
| lockmode = needLock ? RowExclusiveLock : NoLock; |
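| /* For magma-backed relations we assume concurrency control happens in the |
| * external storage layer, so a weaker lock suffices. */ |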
| if (dataStoredInMagmaByOid(resultRelationOid)) |
| lockmode = AccessShareLock; |
| if (operation == CMD_UPDATE || operation == CMD_DELETE) |
| { |
| resultRelationDesc = CdbOpenRelation(resultRelationOid, |
| lockmode, |
| false, /* noWait */ |
| NULL); /* lockUpgraded */ |
| } |
| else |
| { |
| resultRelationDesc = heap_open(resultRelationOid, lockmode); |
| } |
| |
| /* |
| * Check valid relkind ... parser and/or planner should have noticed this |
| * already, but let's make sure. |
| */ |
| if (!gp_upgrade_mode || Gp_role != GP_ROLE_DISPATCH) |
| switch (resultRelationDesc->rd_rel->relkind) |
| { |
| case RELKIND_RELATION: |
| /* OK */ |
| break; |
| case RELKIND_SEQUENCE: |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("cannot change sequence \"%s\"", |
| RelationGetRelationName(resultRelationDesc)))); |
| break; |
| case RELKIND_TOASTVALUE: |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("cannot change TOAST relation \"%s\"", |
| RelationGetRelationName(resultRelationDesc)))); |
| break; |
| case RELKIND_AOSEGMENTS: |
| /* Relax the constraint here to allow hawq register */ |
| if (!allowSystemTableModsDML && IsSystemRelation(resultRelationDesc)) { |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("cannot change AO segment listing relation \"%s\"", |
| RelationGetRelationName(resultRelationDesc)))); |
| } |
| break; |
| case RELKIND_AOBLOCKDIR: |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("cannot change AO block directory relation \"%s\"", |
| RelationGetRelationName(resultRelationDesc)))); |
| break; |
| case RELKIND_VIEW: |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("cannot change view \"%s\"", |
| RelationGetRelationName(resultRelationDesc)))); |
| break; |
| } |
| |
| /* OK, fill in the node */ |
| MemSet(resultRelInfo, 0, sizeof(ResultRelInfo)); |
| resultRelInfo->type = T_ResultRelInfo; |
| resultRelInfo->ri_RangeTableIndex = resultRelationIndex; |
| resultRelInfo->ri_RelationDesc = resultRelationDesc; |
| resultRelInfo->ri_NumIndices = 0; |
| resultRelInfo->ri_IndexRelationDescs = NULL; |
| resultRelInfo->ri_IndexRelationInfo = NULL; |
| |
| /* make a copy so as not to depend on relcache info not changing... */ |
| resultRelInfo->ri_TrigDesc = CopyTriggerDesc(resultRelationDesc->trigdesc); |
| if (resultRelInfo->ri_TrigDesc) |
| { |
| int n = resultRelInfo->ri_TrigDesc->numtriggers; |
| |
| resultRelInfo->ri_TrigFunctions = (FmgrInfo *) |
| palloc0(n * sizeof(FmgrInfo)); |
| if (doInstrument) |
| resultRelInfo->ri_TrigInstrument = InstrAlloc(n); |
| else |
| resultRelInfo->ri_TrigInstrument = NULL; |
| } |
| else |
| { |
| resultRelInfo->ri_TrigFunctions = NULL; |
| resultRelInfo->ri_TrigInstrument = NULL; |
| } |
| resultRelInfo->ri_ConstraintExprs = NULL; |
| resultRelInfo->ri_junkFilter = NULL; |
| resultRelInfo->ri_projectReturning = NULL; |
| resultRelInfo->ri_aoInsertDesc = NULL; |
| resultRelInfo->ri_extInsertDesc = NULL; |
| resultRelInfo->ri_aosegnos = NIL; |
| |
| /* |
| * If there are indices on the result relation, open them and save |
| * descriptors in the result relation info, so that we can add new index |
| * entries for the tuples we add/update. We need not do this for a |
| * DELETE, however, since deletion doesn't affect indexes. |
| */ |
| if (needLock) /* only needed by the root slice who will do the actual updating */ |
| if (resultRelationDesc->rd_rel->relhasindex && |
| operation != CMD_DELETE) |
| ExecOpenIndices(resultRelInfo); |
| } |
| |
| void |
| CreateAoSegFileOnMaster(Oid relid, List *mapping) |
| { |
| ListCell *relid_to_segno; |
| bool found = false; |
| |
| Relation rel = heap_open(relid, AccessShareLock); |
| |
| /* only relevant for AO relations */ |
| if(!RelationIsAo(rel)) |
| { |
| heap_close(rel, AccessShareLock); |
| return; |
| } |
| |
| Assert(mapping); |
| |
| /* lookup the segfile # to write into, according to my relid */ |
| Oid myrelid = RelationGetRelid(rel); |
| foreach(relid_to_segno, mapping) |
| { |
| SegfileMapNode *n = (SegfileMapNode *)lfirst(relid_to_segno); |
| |
| if(n->relid == myrelid) |
| { |
| Assert(n->segnos != NIL); |
| |
| /* |
| * In HAWQ, the master creates all segment files on behalf of the |
| * segments. |
| */ |
| if (Gp_role == GP_ROLE_DISPATCH) |
| CreateAoSegFileForRelationOnMaster(rel, n->segnos); |
| |
| found = true; |
| break; |
| } |
| } |
| |
| heap_close(rel, AccessShareLock); |
| |
| Assert(found); |
| } |
| |
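| /* Callbacks that abstract over the row-oriented AO and ORC segfile |
| * catalogs, letting CreateAoSegFileForRelationOnMasterInternal serve both |
| * storage formats (see CreateAoSegFileForRelationOnMaster below). */ |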
| typedef FileSegInfo *(*GetFileSegInfoCallback)(Relation rel, |
| AppendOnlyEntry *aoEntry, |
| Snapshot snapshot, int segNo); |
| typedef void (*InsertInitialSegnoEntryCallBack)(AppendOnlyEntry *aoEntry, |
| int segNo); |
| |
| static void CreateAoSegFileForRelationOnMasterInternal( |
| Relation rel, AppendOnlyEntry *aoEntry, List *segnos, |
| SharedStorageOpTasks *addTask, SharedStorageOpTasks *overwriteTask, |
| GetFileSegInfoCallback callback1, |
| InsertInitialSegnoEntryCallBack callback2) { |
| FileSegInfo *fsinfo; |
| ListCell *cell; |
| |
| Relation gp_relfile_node; |
| HeapTuple tuple; |
| |
| ItemPointerData persistentTid; |
| int64 persistentSerialNum; |
| |
| Assert(RelationIsAo(rel)); |
| |
| char *relname = RelationGetRelationName(rel); |
| |
| gp_relfile_node = heap_open(GpRelfileNodeRelationId, AccessShareLock); |
| |
| foreach (cell, segnos) { |
| int segno = lfirst_int(cell); |
| fsinfo = callback1(rel, aoEntry, SnapshotNow, segno); |
| |
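| /* No pg_aoseg entry yet: create the initial one. An existing entry with a |
| * nonzero EOF means the segfile already contains data, so there is nothing |
| * to create for it. */ |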
| if (NULL == fsinfo) { |
| callback2(aoEntry, segno); |
| } else if (fsinfo->eof != 0) { |
| pfree(fsinfo); |
| continue; |
| } |
| |
| if (fsinfo) { |
| pfree(fsinfo); |
| } |
| |
| tuple = |
| FetchGpRelfileNodeTuple(gp_relfile_node, rel->rd_node.relNode, segno, |
| &persistentTid, &persistentSerialNum); |
| |
| if (HeapTupleIsValid(tuple)) { |
| bool currentTspSupportTruncate = false; |
| |
| if (filesystem_support_truncate) |
| currentTspSupportTruncate = |
| TestCurrentTspSupportTruncate(rel->rd_node.spcNode); |
| |
| heap_freetuple(tuple); |
| |
| /* |
| * There is a record in the persistent table, so we assume the file |
| * exists on the filesystem, but there is no record in the pg_aoseg_xxx |
| * catalog. We should overwrite that file in case the file system does |
| * not support truncate. |
| */ |
| if (!currentTspSupportTruncate) |
| SharedStorageOpAddTask(relname, &rel->rd_node, segno, &persistentTid, |
| persistentSerialNum, overwriteTask); |
| |
| continue; |
| } |
| |
| SharedStorageOpPreAddTask(&rel->rd_node, segno, relname, &persistentTid, |
| &persistentSerialNum); |
| |
| SharedStorageOpAddTask(relname, &rel->rd_node, segno, &persistentTid, |
| persistentSerialNum, addTask); |
| } |
| |
| heap_close(gp_relfile_node, AccessShareLock); |
| } |
| |
| static void |
| CreateParquetSegFileForRelationOnMaster(Relation rel, |
| AppendOnlyEntry *aoEntry, List *segnos, SharedStorageOpTasks *addTasks, SharedStorageOpTasks *overwriteTask) |
| { |
| ParquetFileSegInfo * fsinfo; |
| ListCell *cell; |
| |
| Relation gp_relfile_node; |
| HeapTuple tuple; |
| |
| ItemPointerData persistentTid; |
| int64 persistentSerialNum; |
| |
| Assert(RelationIsParquet(rel)); |
| |
| char * relname = RelationGetRelationName(rel); |
| |
| gp_relfile_node = heap_open(GpRelfileNodeRelationId, AccessShareLock); |
| |
| foreach(cell, segnos) |
| { |
| int segno = lfirst_int(cell); |
| fsinfo = GetParquetFileSegInfo(rel, aoEntry, SnapshotNow, segno); |
| |
| if (NULL == fsinfo) |
| { |
| InsertInitialParquetSegnoEntry(aoEntry, segno); |
| } |
| else if (fsinfo->eof != 0) |
| { |
| pfree(fsinfo); |
| continue; |
| } |
| |
| if (fsinfo) |
| { |
| pfree(fsinfo); |
| } |
| |
| tuple = FetchGpRelfileNodeTuple( |
| gp_relfile_node, |
| rel->rd_node.relNode, |
| segno, |
| &persistentTid, |
| &persistentSerialNum); |
| |
| if (HeapTupleIsValid(tuple)) |
| { |
| bool currentTspSupportTruncate = false; |
| |
| if (filesystem_support_truncate) |
| currentTspSupportTruncate = TestCurrentTspSupportTruncate(rel->rd_node.spcNode); |
| |
| heap_freetuple(tuple); |
| |
| /* |
| * There is a record in the persistent table, so we assume the file exists |
| * on the filesystem, but there is no record in the pg_aoseg_xxx catalog. |
| * We should overwrite that file in case the file system does not support |
| * truncate. |
| */ |
| if (!currentTspSupportTruncate) |
| SharedStorageOpAddTask(relname, &rel->rd_node, segno, |
| &persistentTid, |
| persistentSerialNum, |
| overwriteTask); |
| continue; |
| } |
| |
| SharedStorageOpPreAddTask(&rel->rd_node, segno, relname, |
| &persistentTid, |
| &persistentSerialNum); |
| |
| SharedStorageOpAddTask(relname, &rel->rd_node, segno, |
| &persistentTid, |
| persistentSerialNum, |
| addTasks); |
| } |
| |
| heap_close(gp_relfile_node, AccessShareLock); |
| } |
| |
| static void CreateExternalSegFileForRelationOnMaster(Relation rel, List *segnos, |
| SharedStorageOpTasks *addTasks) |
| { |
| ListCell *cell; |
| |
| Assert(RelationIsExternal(rel)); |
| |
| char * relname = RelationGetRelationName(rel); |
| |
| foreach(cell, segnos) |
| { |
| int segno = lfirst_int(cell); |
| |
| Assert(NULL != addTasks); |
| Assert(addTasks->sizeTasks >= addTasks->numTasks); |
| |
| RelFileNode *n; |
| |
| if (addTasks->sizeTasks == addTasks->numTasks) |
| { |
| addTasks->tasks = repalloc(addTasks->tasks, |
| addTasks->sizeTasks * sizeof(SharedStorageOpTask) * 2); |
| addTasks->sizeTasks *= 2; |
| } |
| |
| n = &addTasks->tasks[addTasks->numTasks].node; |
| n->dbNode = rel->rd_node.dbNode; |
| n->relNode = rel->rd_node.relNode; |
| n->spcNode = rel->rd_node.spcNode; |
| |
| addTasks->tasks[addTasks->numTasks].segno = segno; |
| addTasks->tasks[addTasks->numTasks].relname = palloc(strlen(relname) + 1); |
| strcpy(addTasks->tasks[addTasks->numTasks].relname, relname); |
| |
| addTasks->numTasks++; |
| } |
| } |
| |
| void |
| CreateAoSegFileForRelationOnMaster(Relation rel, List *segnos) |
| { |
| SharedStorageOpTasks *addTasks = CreateSharedStorageOpTasks(); |
| SharedStorageOpTasks *overwriteTasks = CreateSharedStorageOpTasks(); |
| |
| |
| if(RelationIsAo(rel)) |
| { |
| AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel->rd_id, SnapshotNow); |
| |
| // Lock to avoid the hash table file count mismatch issue. |
| Relation aoSegRel = heap_open(aoEntry->segrelid, AccessExclusiveLock); |
| |
| if (RelationIsAoRows(rel)) |
| CreateAoSegFileForRelationOnMasterInternal( |
| rel, aoEntry, segnos, addTasks, overwriteTasks, |
| GetFileSegInfo, InsertInitialSegnoEntry); |
| else if (RelationIsOrc(rel)) |
| CreateAoSegFileForRelationOnMasterInternal( |
| rel, aoEntry, segnos, addTasks, overwriteTasks, |
| getOrcFileSegInfo, insertInitialOrcSegnoEntry); |
| else |
| CreateParquetSegFileForRelationOnMaster( |
| rel, aoEntry, segnos, addTasks, overwriteTasks); |
| |
| heap_close(aoSegRel, AccessExclusiveLock); |
| pfree(aoEntry); |
| |
| PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile); |
| PostPerformSharedStorageOpTasks(addTasks); |
| PerformSharedStorageOpTasks(overwriteTasks, Op_OverWriteSegFile); |
| } |
| |
| DropSharedStorageOpTasks(addTasks); |
| DropSharedStorageOpTasks(overwriteTasks); |
| } |
| |
| /* |
| * ResultRelInfoSetSegno |
| * |
| * Based on a list of relid->segno mappings, look up our own resultRelInfo |
| * relid in the mapping and find the segfile number that this result rel |
| * should use when inserting into an AO relation. For any non-AO relation |
| * this is irrelevant and we return early. |
| * |
| * Note that we rely on the fact that the caller has a well-constructed |
| * mapping and that it includes the relids of *any* AO relation that may |
| * insert data during this transaction. For non-partitioned tables the |
| * mapping list will have only one element - our table. For partitioned |
| * tables it may have multiple (depending on how many partitions are AO). |
| * |
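| * For example (hypothetical OIDs), an AO table with two partitions might |
| * arrive with a mapping like: |
| *   { {relid=16402, segnos=[3]}, {relid=16405, segnos=[3]} } |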
| */ |
| void |
| ResultRelInfoSetSegno(ResultRelInfo *resultRelInfo, List *mapping) |
| { |
| ListCell *relid_to_segno; |
| bool found = false; |
| |
| /* only relevant for AO relations */ |
| if(!relstorage_is_ao(RelinfoGetStorage(resultRelInfo))) |
| return; |
| |
| Assert(mapping); |
| Assert(resultRelInfo->ri_RelationDesc); |
| |
| /* lookup the segfile # to write into, according to my relid */ |
| |
| foreach(relid_to_segno, mapping) |
| { |
| SegfileMapNode *n = (SegfileMapNode *)lfirst(relid_to_segno); |
| Oid myrelid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
| if(n->relid == myrelid) |
| { |
| Assert(n->segnos != NIL); |
| resultRelInfo->ri_aosegnos = n->segnos; |
| found = true; |
| break; |
| } |
| } |
| |
| Assert(found); |
| } |
| |
| void |
| ResultRelInfoSetSegFileInfo(ResultRelInfo *resultRelInfo, List *mapping) |
| { |
| ListCell *relid_to_segfileinfo; |
| bool found = false; |
| Oid myrelid; |
| |
| /* |
| * Only relevant for AO relations; note that the storage check below is |
| * currently disabled, so the lookup runs for every result relation. |
| */ |
| // if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo))) |
| // { |
| // return; |
| // } |
| |
| Assert(mapping); |
| Assert(resultRelInfo->ri_RelationDesc); |
| |
| myrelid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
| |
| /* |
| * Lookup the segment file to write into, according to |
| * myrelid. |
| */ |
| foreach (relid_to_segfileinfo, mapping) |
| { |
| ResultRelSegFileInfoMapNode *n = (ResultRelSegFileInfoMapNode *)lfirst(relid_to_segfileinfo); |
| if (n->relid == myrelid) |
| { |
| Assert(n->segfileinfos != NIL); |
| resultRelInfo->ri_aosegfileinfos = n->segfileinfos; |
| found = true; |
| break; |
| } |
| } |
| |
| Assert(found); |
| } |
| |
| ResultRelSegFileInfo * |
| InitResultRelSegFileInfo(int segno, char storageChar, int numfiles) |
| { |
| ResultRelSegFileInfo *result = makeNode(ResultRelSegFileInfo); |
| result->segno = segno; |
| result->numfiles = numfiles; |
| Assert(result->numfiles > 0); |
| if (relstorage_is_ao(storageChar)) |
| { |
| Assert(result->numfiles == 1); |
| } |
| result->eof = palloc0(sizeof(int64) * result->numfiles); |
| result->uncompressed_eof = palloc0(sizeof(int64) * result->numfiles); |
| |
| return result; |
| } |
| |
| /* |
| * ExecContextForcesOids |
| * |
| * This is pretty grotty: when doing INSERT, UPDATE, or SELECT INTO, |
| * we need to ensure that result tuples have space for an OID iff they are |
| * going to be stored into a relation that has OIDs. In other contexts |
| * we are free to choose whether to leave space for OIDs in result tuples |
| * (we generally don't want to, but we do if a physical-tlist optimization |
| * is possible). This routine checks the plan context and returns TRUE if the |
| * choice is forced, FALSE if the choice is not forced. In the TRUE case, |
| * *hasoids is set to the required value. |
| * |
| * One reason this is ugly is that all plan nodes in the plan tree will emit |
| * tuples with space for an OID, though we really only need the topmost node |
| * to do so. However, node types like Sort don't project new tuples but just |
| * return their inputs, and in those cases the requirement propagates down |
| * to the input node. Eventually we might make this code smart enough to |
| * recognize how far down the requirement really goes, but for now we just |
| * make all plan nodes do the same thing if the top level forces the choice. |
| * |
| * We assume that estate->es_result_relation_info is already set up to |
| * describe the target relation. Note that in an UPDATE that spans an |
| * inheritance tree, some of the target relations may have OIDs and some not. |
| * We have to make the decisions on a per-relation basis as we initialize |
| * each of the child plans of the topmost Append plan. |
| * |
| * SELECT INTO is even uglier, because we don't have the INTO relation's |
| * descriptor available when this code runs; we have to look aside at a |
| * flag set by InitPlan(). |
| */ |
| bool |
| ExecContextForcesOids(PlanState *planstate, bool *hasoids) |
| { |
| if (planstate->state->es_select_into) |
| { |
| *hasoids = planstate->state->es_into_oids; |
| return true; |
| } |
| else |
| { |
| ResultRelInfo *ri = planstate->state->es_result_relation_info; |
| |
| if (ri != NULL) |
| { |
| Relation rel = ri->ri_RelationDesc; |
| |
| if (rel != NULL) |
| { |
| *hasoids = rel->rd_rel->relhasoids; |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| void |
| SendAOTupCounts(EState *estate) |
| { |
| /* |
| * If we're inserting into partitions, send tuple counts for |
| * AO tables back to the QD. |
| */ |
| if (Gp_role == GP_ROLE_EXECUTE && estate->es_result_partitions) |
| { |
| StringInfoData buf; |
| ResultRelInfo *resultRelInfo; |
| int aocount = 0; |
| int i; |
| |
| resultRelInfo = estate->es_result_relations; |
| for (i = 0; i < estate->es_num_result_relations; i++) |
| { |
| if (relstorage_is_ao(RelinfoGetStorage(resultRelInfo))) |
| aocount++; |
| |
| resultRelInfo++; |
| } |
| |
| |
| if (aocount) |
| { |
| if (Debug_appendonly_print_insert) |
| ereport(LOG,(errmsg("QE sending tuple counts of %d partitioned " |
| "AO relations... ", aocount))); |
| |
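| /* Message layout: 'o', int32 number of AO relations, then for each |
| * relation an int32 relid followed by an int64 tuple count. */ |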
| pq_beginmessage(&buf, 'o'); |
| pq_sendint(&buf, aocount, 4); |
| |
| resultRelInfo = estate->es_result_relations; |
| for (i = 0; i < estate->es_num_result_relations; i++) |
| { |
| if (relstorage_is_ao(RelinfoGetStorage(resultRelInfo))) |
| { |
| Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
| uint64 tupcount = resultRelInfo->ri_aoprocessed; |
| |
| pq_sendint(&buf, relid, 4); |
| pq_sendint64(&buf, tupcount); |
| |
| if (Debug_appendonly_print_insert) |
| ereport(LOG,(errmsg("sent tupcount " INT64_FORMAT " for " |
| "relation %d", tupcount, relid), |
| errOmitLocation(true))); |
| |
| } |
| resultRelInfo++; |
| } |
| pq_endmessage(&buf); |
| } |
| } |
| |
| } |
| /* ---------------------------------------------------------------- |
| * ExecEndPlan |
| * |
| * Cleans up the query plan -- closes files and frees up storage |
| * |
| * NOTE: we are no longer very worried about freeing storage per se |
| * in this code; FreeExecutorState should be guaranteed to release all |
| * memory that needs to be released. What we are worried about doing |
| * is closing relations and dropping buffer pins. Thus, for example, |
| * tuple tables must be cleared or dropped to ensure pins are released. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecEndPlan(PlanState *planstate, EState *estate) |
| { |
| ResultRelInfo *resultRelInfo; |
| int i; |
| ListCell *l; |
| |
| int aocount = 0; |
| |
| /* |
| * shut down any PlanQual processing we were doing |
| */ |
| if (estate->es_evalPlanQual != NULL) |
| EndEvalPlanQual(estate); |
| |
| if (planstate != NULL) |
| ExecEndNode(planstate); |
| |
| ExecDropTupleTable(estate->es_tupleTable, true); |
| estate->es_tupleTable = NULL; |
| |
| /* Report how many tuples we may have inserted into AO tables */ |
| SendAOTupCounts(estate); |
| |
| StringInfo buf = NULL; |
| |
| resultRelInfo = estate->es_result_relations; |
| for (i = 0; i < estate->es_num_result_relations; i++) |
| { |
| if (resultRelInfo->ri_aoInsertDesc || |
| resultRelInfo->ri_orcInsertDesc || |
| resultRelInfo->ri_parquetInsertDesc || |
| resultRelInfo->ri_orcDeleteDesc || |
| resultRelInfo->ri_orcUpdateDesc || |
| resultRelInfo->ri_insertSendBack) |
| ++aocount; |
| resultRelInfo++; |
| } |
| |
| if (Gp_role == GP_ROLE_EXECUTE && aocount > 0) |
| buf = PreSendbackChangedCatalog(aocount); |
| |
| /* |
| * close the result relation(s) if any, but hold locks until xact commit. |
| */ |
| resultRelInfo = estate->es_result_relations; |
| for (i = 0; i < estate->es_num_result_relations; i++) |
| { |
| QueryContextDispatchingSendBack sendback = NULL; |
| |
| /* end (flush) the INSERT operation in the access layer */ |
| if (resultRelInfo->ri_aoInsertDesc) |
| { |
| |
| sendback = CreateQueryContextDispatchingSendBack(1); |
| resultRelInfo->ri_aoInsertDesc->sendback = sendback; |
| sendback->relid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
| |
| appendonly_insert_finish(resultRelInfo->ri_aoInsertDesc); |
| } |
| |
| /* We also need to finish (flush) the parquet insert operation. */ |
| if (resultRelInfo->ri_parquetInsertDesc){ |
| |
| AssertImply(resultRelInfo->ri_insertSendBack, gp_parquet_insert_sort); |
| |
| if (NULL != resultRelInfo->ri_insertSendBack) |
| { |
| /* |
| * The Parquet part we just finished inserting into already |
| * has sendBack information. This means we're inserting into the |
| * part twice, which is not supported. Error out (GPSQL-2291) |
| */ |
| ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET), |
| errmsg("Cannot insert out-of-order tuples in parquet partitions"), |
| errhint("Sort the data on the partitioning key(s) before inserting"), |
| errOmitLocation(true))); |
| } |
| |
| sendback = CreateQueryContextDispatchingSendBack(1); |
| resultRelInfo->ri_parquetInsertDesc->sendback = sendback; |
| sendback->relid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
| |
| parquet_insert_finish(resultRelInfo->ri_parquetInsertDesc); |
| } |
| |
| if (resultRelInfo->ri_orcInsertDesc) { |
| sendback = CreateQueryContextDispatchingSendBack(1); |
| resultRelInfo->ri_orcInsertDesc->sendback = sendback; |
| sendback->relid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
| orcEndInsert(resultRelInfo->ri_orcInsertDesc); |
| } |
| |
| /* |
| * This can happen if we inserted into this parquet part and then |
| * closed it during insertion. The sendback information is saved in |
| * the resultRelInfo, since the ri_parquetInsertDesc has been freed |
| * (GPSQL-2291). |
| */ |
| if (NULL != resultRelInfo->ri_insertSendBack) |
| { |
| Assert(NULL == sendback); |
| sendback = resultRelInfo->ri_insertSendBack; |
| } |
| |
| if (resultRelInfo->ri_extInsertDesc) |
| { |
| ExternalInsertDesc extInsertDesc = resultRelInfo->ri_extInsertDesc; |
| |
| if (extInsertDesc->ext_formatter_type == ExternalTableType_Invalid) |
| { |
| elog(ERROR, "%s: invalid formatter type for external table", __func__); |
| } |
| else if (extInsertDesc->ext_formatter_type != ExternalTableType_PLUG) |
| { |
| external_insert_finish(extInsertDesc); |
| } |
| else |
| { |
| FmgrInfo *insertFinishFunc = |
| extInsertDesc->ext_ps_insert_funcs.insert_finish; |
| |
| if (insertFinishFunc) |
| { |
| InvokePlugStorageFormatInsertFinish(insertFinishFunc, |
| extInsertDesc); |
| } |
| else |
| { |
| elog(ERROR, "%s_insert_finish function was not found", |
| extInsertDesc->ext_formatter_name); |
| } |
| } |
| } |
| |
| // handle update/delete scenario |
| if (resultRelInfo->ri_orcDeleteDesc) { |
| sendback = CreateQueryContextDispatchingSendBack(1); |
| resultRelInfo->ri_orcDeleteDesc->sendback = sendback; |
| sendback->relid = |
| RelationGetRelid(resultRelInfo->ri_RelationDesc); |
| estate->es_processed += orcEndDelete(resultRelInfo->ri_orcDeleteDesc); |
| } |
| |
| if (resultRelInfo->ri_orcUpdateDesc) { |
| sendback = CreateQueryContextDispatchingSendBack(1); |
| resultRelInfo->ri_orcUpdateDesc->sendback = sendback; |
| sendback->relid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
| estate->es_processed += orcEndUpdate(resultRelInfo->ri_orcUpdateDesc); |
| } |
| |
| if (resultRelInfo->ri_resultSlot) |
| { |
| Assert(resultRelInfo->ri_resultSlot->tts_tupleDescriptor); |
| ReleaseTupleDesc(resultRelInfo->ri_resultSlot->tts_tupleDescriptor); |
| ExecClearTuple(resultRelInfo->ri_resultSlot); |
| } |
| |
| if (sendback && (relstorage_is_ao(RelinfoGetStorage(resultRelInfo))) |
| && Gp_role == GP_ROLE_EXECUTE && aocount > 0) |
| AddSendbackChangedCatalogContent(buf, sendback); |
| |
| DropQueryContextDispatchingSendBack(sendback); |
| |
| /* Close indices and then the relation itself */ |
| ExecCloseIndices(resultRelInfo); |
| heap_close(resultRelInfo->ri_RelationDesc, NoLock); |
| resultRelInfo++; |
| } |
| |
| if (Gp_role == GP_ROLE_EXECUTE && aocount > 0) |
| FinishSendbackChangedCatalog(buf); |
| |
| /* |
| * close any relations selected FOR UPDATE/FOR SHARE, again keeping locks |
| */ |
| foreach(l, estate->es_rowMarks) |
| { |
| ExecRowMark *erm = lfirst(l); |
| |
| heap_close(erm->relation, NoLock); |
| } |
| |
| /* |
| * Release partition-related resources (esp. TupleDesc ref counts). |
| */ |
| if ( estate->es_partition_state ) |
| ClearPartitionState(estate); |
| } |
| |
| /* |
| * Verify that the tuples to be produced by INSERT or UPDATE match the |
| * target relation's rowtype |
| * |
| * We do this to guard against stale plans. If plan invalidation is |
| * functioning properly then we should never get a failure here, but better |
| * safe than sorry. Note that this is called after we have obtained lock |
| * on the target rel, so the rowtype can't change underneath us. |
| * |
| * The plan output is represented by its targetlist, because that makes |
| * handling the dropped-column case easier. |
| */ |
| static void |
| ExecCheckPlanOutput(Relation resultRel, List *targetList) |
| { |
| TupleDesc resultDesc = RelationGetDescr(resultRel); |
| int attno = 0; |
| ListCell *lc; |
| |
| /* |
| * Don't do this during dispatch because the plan is not suitably |
| * structured to meet these tests. |
| */ |
| if (Gp_role == GP_ROLE_DISPATCH) |
| return; |
| |
| foreach(lc, targetList) |
| { |
| TargetEntry *tle = (TargetEntry *) lfirst(lc); |
| Form_pg_attribute attr; |
| |
| if (tle->resjunk) |
| continue; /* ignore junk tlist items */ |
| |
| if (attno >= resultDesc->natts) |
| ereport(ERROR, |
| (errcode(ERRCODE_DATATYPE_MISMATCH), |
| errmsg("table row type and query-specified row type do not match"), |
| errdetail("Query has too many columns."))); |
| attr = resultDesc->attrs[attno++]; |
| |
| if (!attr->attisdropped) |
| { |
| /* Normal case: demand type match */ |
| if (exprType((Node *) tle->expr) != attr->atttypid) |
| ereport(ERROR, |
| (errcode(ERRCODE_DATATYPE_MISMATCH), |
| errmsg("table row type and query-specified row type do not match"), |
| errdetail("Table has type %s at ordinal position %d, but query expects %s.", |
| format_type_be(attr->atttypid), |
| attno, |
| format_type_be(exprType((Node *) tle->expr))))); |
| } |
| else |
| { |
| /* |
| * For a dropped column, we can't check atttypid (it's likely 0). |
| * In any case the planner has most likely inserted an INT4 null. |
| * What we insist on is just *some* NULL constant. |
| */ |
| if (!IsA(tle->expr, Const) || |
| !((Const *) tle->expr)->constisnull) |
| ereport(ERROR, |
| (errcode(ERRCODE_DATATYPE_MISMATCH), |
| errmsg("table row type and query-specified row type do not match"), |
| errdetail("Query provides a value for a dropped column at ordinal position %d.", |
| attno))); |
| } |
| } |
| if (attno != resultDesc->natts) |
| ereport(ERROR, |
| (errcode(ERRCODE_DATATYPE_MISMATCH), |
| errmsg("table row type and query-specified row type do not match"), |
| errdetail("Query has too few columns."))); |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * ExecutePlan |
| * |
| * processes the query plan to retrieve 'numberTuples' tuples in the |
| * direction specified. |
| * |
| * Retrieves all tuples if numberTuples is 0 |
| * |
| * result is either a slot containing the last tuple in the case |
| * of a SELECT or NULL otherwise. |
| * |
| * Note: the ctid attribute is a 'junk' attribute that is removed before the |
| * user can see it |
| * ---------------------------------------------------------------- |
| */ |
| static TupleTableSlot * |
| ExecutePlan(EState *estate, |
| PlanState *planstate, |
| CmdType operation, |
| long numberTuples, |
| ScanDirection direction, |
| DestReceiver *dest) |
| { |
| JunkFilter *junkfilter; |
| TupleTableSlot *planSlot; |
| TupleTableSlot *slot; |
| ItemPointer tupleid = NULL; |
| Datum gp_segment_id = 0; /* initialize to keep the compiler quiet */ |
| ItemPointerData tuple_ctid; |
| long current_tuple_count; |
| TupleTableSlot *result; |
| |
| /* |
| * initialize local variables |
| */ |
| current_tuple_count = 0; |
| result = NULL; |
| |
| /* |
| * Set the direction. |
| */ |
| estate->es_direction = direction; |
| |
| /* |
| * Process BEFORE EACH STATEMENT triggers |
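| * (only on the QD or in a writer gang; read-only QE slices skip them) |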
| */ |
| if (Gp_role != GP_ROLE_EXECUTE || Gp_is_writer) |
| { |
| switch (operation) |
| { |
| case CMD_UPDATE: |
| ExecBSUpdateTriggers(estate, estate->es_result_relation_info); |
| break; |
| case CMD_DELETE: |
| ExecBSDeleteTriggers(estate, estate->es_result_relation_info); |
| break; |
| case CMD_INSERT: |
| ExecBSInsertTriggers(estate, estate->es_result_relation_info); |
| break; |
| default: |
| /* do nothing */ |
| break; |
| } |
| } |
| |
| /* Error out for unsupported updates */ |
| if (operation == CMD_UPDATE) |
| { |
| Assert(estate->es_result_relation_info->ri_RelationDesc); |
| Relation rel = estate->es_result_relation_info->ri_RelationDesc; |
| bool rel_is_aorows = RelationIsAoRows(rel); |
| bool rel_is_parquet = RelationIsParquet(rel); |
| |
| if (rel_is_aorows || rel_is_parquet) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("Append-only/Parquet tables are not updatable. Operation not permitted."), |
| errOmitLocation(true))); |
| } |
| } |
| |
| /* |
| * Make sure slice dependencies are met |
| */ |
| ExecSliceDependencyNode(planstate); |
| |
| /* |
| * Loop until we've processed the proper number of tuples from the plan. |
| */ |
| for (;;) |
| { |
| /* Reset the per-output-tuple exprcontext */ |
| ResetPerTupleExprContext(estate); |
| |
| /* |
| * Execute the plan and obtain a tuple |
| */ |
| lnext: ; |
| if (estate->es_useEvalPlan) |
| { |
| planSlot = EvalPlanQualNext(estate); |
| if (TupIsNull(planSlot)) |
| planSlot = ExecProcNode(planstate); |
| } |
| else |
| planSlot = ExecProcNode(planstate); |
| |
| /* |
| * if the tuple is null, then we assume there is nothing more to |
| * process so we just return null... |
| */ |
| if (TupIsNull(planSlot)) { |
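| /* Even though the plan produced no tuples, make sure magma-backed ORC |
| * result relations get their delete/update descriptors, so that |
| * orcEndDelete/orcEndUpdate can still run (and report row counts) at |
| * ExecEndPlan time. */ |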
| if (operation == CMD_DELETE || operation == CMD_UPDATE) { |
| for (int32_t i = 0; i < estate->es_num_result_relations; ++i) { |
| ResultRelInfo *resultRelInfo = estate->es_result_relations + i; |
| Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; |
| if (RelationIsOrc(resultRelationDesc)) { |
| if (resultRelInfo->ri_orcDeleteDesc == NULL && |
| operation == CMD_DELETE && |
| GetFileSplitsOfSegmentMagma( |
| estate->es_plannedstmt->scantable_splits, |
| resultRelationDesc->rd_id)) { |
| List *splits = GetFileSplitsOfSegment( |
| estate->es_plannedstmt->scantable_splits, |
| resultRelationDesc->rd_id, GetQEIndex()); |
| resultRelInfo->ri_orcDeleteDesc = orcBeginDelete( |
| resultRelationDesc, splits, |
| estate->es_plannedstmt->relFileNodeInfo, |
| false, |
| isDirectDispatch(estate->es_plannedstmt->planTree)); |
| } else if (resultRelInfo->ri_orcUpdateDesc == NULL && |
| operation == CMD_UPDATE && |
| GetFileSplitsOfSegmentMagma( |
| estate->es_plannedstmt->scantable_splits, |
| resultRelationDesc->rd_id)) { |
| List *splits = GetFileSplitsOfSegment( |
| estate->es_plannedstmt->scantable_splits, |
| resultRelationDesc->rd_id, GetQEIndex()); |
| resultRelInfo->ri_orcUpdateDesc = orcBeginUpdate( |
| resultRelationDesc, splits, |
| estate->es_plannedstmt->relFileNodeInfo, |
| false, |
| isDirectDispatch(estate->es_plannedstmt->planTree)); |
| } |
| } |
| } |
| } |
| result = NULL; |
| break; |
| } |
| |
| if (estate->es_plannedstmt->planGen == PLANGEN_PLANNER || |
| operation == CMD_SELECT) |
| { |
| |
| slot = planSlot; |
| |
| /* |
| * If we have a junk filter, then project a new tuple with the junk |
| * removed. |
| * |
| * Store this new "clean" tuple in the junkfilter's resultSlot. |
| * (Formerly, we stored it back over the "dirty" tuple, which is WRONG |
| * because that tuple slot has the wrong descriptor.) |
| * |
| * But first, extract all the junk information we need. |
| */ |
| if ((junkfilter = estate->es_junkFilter) != NULL) |
| { |
| Datum datum; |
| bool isNull; |
| |
| /* |
| * extract the 'ctid' junk attribute. |
| */ |
| if (operation == CMD_UPDATE || operation == CMD_DELETE) |
| { |
| if (!ExecGetJunkAttribute(junkfilter, |
| slot, |
| "ctid", |
| &datum, |
| &isNull)) |
| elog(ERROR, "could not find junk ctid column"); |
| |
| /* shouldn't ever get a null result... */ |
| if (isNull) |
| elog(ERROR, "ctid is NULL"); |
| |
| tupleid = (ItemPointer) DatumGetPointer(datum); |
| tuple_ctid = *tupleid; /* make sure we don't free the ctid!! */ |
| tupleid = &tuple_ctid; |
| |
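| /* A ctid only identifies a row within a single segment, so UPDATE and |
| * DELETE also carry a junk gp_segment_id attribute to pin down which |
| * segment the row lives on. */ |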
| if (!ExecGetJunkAttribute(junkfilter, |
| slot, |
| "gp_segment_id", |
| &datum, |
| &isNull)) |
| elog(ERROR, "could not find junk gp_segment_id column"); |
| |
| /* shouldn't ever get a null result... */ |
| if (isNull) |
| elog(ERROR, "gp_segment_id is NULL"); |
| |
| gp_segment_id = datum; |
| } |
| |
| /* |
| * Process any FOR UPDATE or FOR SHARE locking requested. |
| */ |
| else if (estate->es_rowMarks != NIL) |
| { |
| ListCell *l; |
| |
| lmark: ; |
| foreach(l, estate->es_rowMarks) |
| { |
| ExecRowMark *erm = lfirst(l); |
| HeapTupleData tuple; |
| Buffer buffer; |
| ItemPointerData update_ctid; |
| TransactionId update_xmax; |
| TupleTableSlot *newSlot; |
| LockTupleMode lockmode; |
| HTSU_Result test; |
| |
| /* CDB: CTIDs were not fetched for distributed relations. */ |
| Relation relation = erm->relation; |
| if (relation->rd_cdbpolicy && |
| relation->rd_cdbpolicy->ptype == POLICYTYPE_PARTITIONED) |
| continue; |
| |
| if (!ExecGetJunkAttribute(junkfilter, |
| slot, |
| erm->resname, |
| &datum, |
| &isNull)) |
| elog(ERROR, "could not find junk \"%s\" column", |
| erm->resname); |
| |
| /* shouldn't ever get a null result... */ |
| if (isNull) |
| elog(ERROR, "\"%s\" is NULL", erm->resname); |
| |
| tuple.t_self = *((ItemPointer) DatumGetPointer(datum)); |
| |
| if (erm->forUpdate && !dataStoredInMagmaByOid(erm->relation->rd_id)) |
| lockmode = LockTupleExclusive; |
| else |
| lockmode = LockTupleShared; |
| |
| test = heap_lock_tuple(erm->relation, &tuple, &buffer, |
| &update_ctid, &update_xmax, |
| estate->es_snapshot->curcid, |
| lockmode, |
| (erm->noWait ? LockTupleNoWait : LockTupleWait)); |
| ReleaseBuffer(buffer); |
| switch (test) |
| { |
| case HeapTupleSelfUpdated: |
| /* treat it as deleted; do not process */ |
| goto lnext; |
| |
| case HeapTupleMayBeUpdated: |
| break; |
| |
| case HeapTupleUpdated: |
| if (IsXactIsoLevelSerializable) |
| ereport(ERROR, |
| (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), |
| errmsg("could not serialize access due to concurrent update"))); |
| if (!ItemPointerEquals(&update_ctid, |
| &tuple.t_self)) |
| { |
| /* updated, so look at updated version */ |
| newSlot = EvalPlanQual(estate, |
| erm->rti, |
| &update_ctid, |
| update_xmax, |
| estate->es_snapshot->curcid); |
| if (!TupIsNull(newSlot)) |
| { |
| slot = planSlot = newSlot; |
| estate->es_useEvalPlan = true; |
| goto lmark; |
| } |
| } |
| |
| /* |
| * if tuple was deleted or PlanQual failed for |
| * updated tuple - we must not return this tuple! |
| */ |
| goto lnext; |
| |
| default: |
| elog(ERROR, "unrecognized heap_lock_tuple status: %u", |
| test); |
| return NULL; |
| } |
| } |
| } |
| |
| /* |
| * Create a new "clean" tuple with all junk attributes removed. We |
| * don't need to do this for DELETE, however (there will in fact |
| * be no non-junk attributes in a DELETE!) |
| */ |
| if (operation != CMD_DELETE) |
| slot = ExecFilterJunk(junkfilter, slot); |
| } |
| |
| if (operation != CMD_SELECT && Gp_role == GP_ROLE_EXECUTE && !Gp_is_writer) |
| { |
| elog(LOG,"INSERT/UPDATE/DELETE must be executed by a writer segworker group"); |
| Insist(false); |
| } |
| |
| /* |
| * Based on the operation, the tuple is either returned to the user |
| * (SELECT) or inserted, deleted, or updated. |
| */ |
| switch (operation) |
| { |
| case CMD_SELECT: |
| ExecSelect(slot, dest, estate); |
| result = slot; |
| break; |
| |
| case CMD_INSERT: |
| ExecInsert(slot, dest, estate, PLANGEN_PLANNER, false /* isUpdate */, false /* isInputSorted */); |
| result = NULL; |
| break; |
| |
| case CMD_DELETE: |
| ExecDelete(tupleid, gp_segment_id, planSlot, dest, estate, PLANGEN_PLANNER, false /* isUpdate */); |
| result = NULL; |
| break; |
| |
| case CMD_UPDATE: |
| ExecUpdate(slot, tupleid, gp_segment_id, planSlot, dest, estate); |
| result = NULL; |
| break; |
| |
| default: |
| elog(ERROR, "unrecognized operation code: %d", |
| (int) operation); |
| result = NULL; |
| break; |
| } |
| } |
| |
| /* |
		 * Check our tuple count. If we've processed the proper number,
		 * then quit; else loop again and process more tuples. Zero
		 * numberTuples means no limit.
| */ |
| current_tuple_count++; |
| if (numberTuples && numberTuples == current_tuple_count) |
| { |
| break; |
| } |
| } |
| |
| /* |
| * Process AFTER EACH STATEMENT triggers |
| */ |
| if (Gp_role != GP_ROLE_EXECUTE || Gp_is_writer) |
| { |
| switch (operation) |
| { |
| case CMD_UPDATE: |
| ExecASUpdateTriggers(estate, estate->es_result_relation_info); |
| break; |
| case CMD_DELETE: |
| ExecASDeleteTriggers(estate, estate->es_result_relation_info); |
| break; |
| case CMD_INSERT: |
| ExecASInsertTriggers(estate, estate->es_result_relation_info); |
| break; |
| default: |
| /* do nothing */ |
| break; |
| } |
| } |
| /* |
| * here, result is either a slot containing a tuple in the case of a |
| * SELECT or NULL otherwise. |
| */ |
| return result; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecSelect |
| * |
| * SELECTs are easy.. we just pass the tuple to the appropriate |
| * output function. |
| * ---------------------------------------------------------------- |
| */ |
| static void |
| ExecSelect(TupleTableSlot *slot, |
| DestReceiver *dest, |
| EState *estate) |
| { |
| (*dest->receiveSlot) (slot, dest); |
| IncrRetrieved(); |
| (estate->es_processed)++; |
| } |
| |
| /* |
| * ExecRelCheck --- check that tuple meets constraints for result relation |
| */ |
| static const char * |
| ExecRelCheck(ResultRelInfo *resultRelInfo, |
| TupleTableSlot *slot, EState *estate) |
| { |
| Relation rel = resultRelInfo->ri_RelationDesc; |
| int ncheck = rel->rd_att->constr->num_check; |
| ConstrCheck *check = rel->rd_att->constr->check; |
| ExprContext *econtext; |
| MemoryContext oldContext; |
| List *qual; |
| int i; |
| |
| /* |
| * If first time through for this result relation, build expression |
| * nodetrees for rel's constraint expressions. Keep them in the per-query |
| * memory context so they'll survive throughout the query. |
| */ |
| if (resultRelInfo->ri_ConstraintExprs == NULL) |
| { |
| oldContext = MemoryContextSwitchTo(estate->es_query_cxt); |
| resultRelInfo->ri_ConstraintExprs = |
| (List **) palloc(ncheck * sizeof(List *)); |
| for (i = 0; i < ncheck; i++) |
| { |
| /* ExecQual wants implicit-AND form */ |
| qual = make_ands_implicit(stringToNode(check[i].ccbin)); |
| resultRelInfo->ri_ConstraintExprs[i] = (List *) |
| ExecPrepareExpr((Expr *) qual, estate); |
| } |
| MemoryContextSwitchTo(oldContext); |
| } |
| |
| /* |
| * We will use the EState's per-tuple context for evaluating constraint |
| * expressions (creating it if it's not already there). |
| */ |
| econtext = GetPerTupleExprContext(estate); |
| |
| /* Arrange for econtext's scan tuple to be the tuple under test */ |
| econtext->ecxt_scantuple = slot; |
| |
| /* And evaluate the constraints */ |
| for (i = 0; i < ncheck; i++) |
| { |
| qual = resultRelInfo->ri_ConstraintExprs[i]; |
| |
| /* |
| * NOTE: SQL92 specifies that a NULL result from a constraint |
| * expression is not to be treated as a failure. Therefore, tell |
| * ExecQual to return TRUE for NULL. |
| */ |
| if (!ExecQual(qual, econtext, true)) |
| return check[i].ccname; |
| } |
| |
| /* NULL result means no error */ |
| return NULL; |
| } |
| |
| void |
| ExecConstraints(ResultRelInfo *resultRelInfo, |
| TupleTableSlot *slot, EState *estate) |
| { |
| Relation rel = resultRelInfo->ri_RelationDesc; |
| TupleConstr *constr = rel->rd_att->constr; |
| |
| Assert(constr); |
| |
| if (constr->has_not_null) |
| { |
| int natts = rel->rd_att->natts; |
| int attrChk; |
| |
| for (attrChk = 1; attrChk <= natts; attrChk++) |
| { |
| if (rel->rd_att->attrs[attrChk - 1]->attnotnull && |
| slot_attisnull(slot, attrChk)) |
| ereport(ERROR, |
| (errcode(ERRCODE_NOT_NULL_VIOLATION), |
| errmsg("null value in column \"%s\" violates not-null constraint", |
| NameStr(rel->rd_att->attrs[attrChk - 1]->attname)), |
| errOmitLocation(true))); |
| } |
| } |
| |
| if (constr->num_check > 0) |
| { |
| const char *failed; |
| |
| if ((failed = ExecRelCheck(resultRelInfo, slot, estate)) != NULL) |
| ereport(ERROR, |
| (errcode(ERRCODE_CHECK_VIOLATION), |
| errmsg("new row for relation \"%s\" violates check constraint \"%s\"", |
| RelationGetRelationName(rel), failed), |
| errOmitLocation(true))); |
| } |
| } |
| |
| /* |
| * Check a modified tuple to see if we want to process its updated version |
| * under READ COMMITTED rules. |
| * |
| * See backend/executor/README for some info about how this works. |
| * |
| * estate - executor state data |
| * rti - rangetable index of table containing tuple |
| * *tid - t_ctid from the outdated tuple (ie, next updated version) |
| * priorXmax - t_xmax from the outdated tuple |
| * curCid - command ID of current command of my transaction |
| * |
| * *tid is also an output parameter: it's modified to hold the TID of the |
| * latest version of the tuple (note this may be changed even on failure) |
| * |
| * Returns a slot containing the new candidate update/delete tuple, or |
| * NULL if we determine we shouldn't process the row. |
| */ |
| TupleTableSlot * |
| EvalPlanQual(EState *estate, Index rti, |
| ItemPointer tid, TransactionId priorXmax, CommandId curCid) |
| { |
| evalPlanQual *epq; |
| EState *epqstate; |
| Relation relation; |
| HeapTupleData tuple; |
| HeapTuple copyTuple = NULL; |
| bool endNode; |
| |
| Assert(rti != 0); |
| |
| /* |
| * find relation containing target tuple |
| */ |
| if (estate->es_result_relation_info != NULL && |
| estate->es_result_relation_info->ri_RangeTableIndex == rti) |
| relation = estate->es_result_relation_info->ri_RelationDesc; |
| else |
| { |
| ListCell *l; |
| |
| relation = NULL; |
| foreach(l, estate->es_rowMarks) |
| { |
| if (((ExecRowMark *) lfirst(l))->rti == rti) |
| { |
| relation = ((ExecRowMark *) lfirst(l))->relation; |
| break; |
| } |
| } |
| if (relation == NULL) |
| elog(ERROR, "could not find RowMark for RT index %u", rti); |
| } |
| |
| /* |
| * fetch tid tuple |
| * |
| * Loop here to deal with updated or busy tuples |
| */ |
| tuple.t_self = *tid; |
| for (;;) |
| { |
| Buffer buffer; |
| |
| if (heap_fetch(relation, SnapshotDirty, &tuple, &buffer, true, NULL)) |
| { |
| /* |
| * If xmin isn't what we're expecting, the slot must have been |
| * recycled and reused for an unrelated tuple. This implies that |
| * the latest version of the row was deleted, so we need do |
| * nothing. (Should be safe to examine xmin without getting |
| * buffer's content lock, since xmin never changes in an existing |
| * tuple.) |
| */ |
| if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data), |
| priorXmax)) |
| { |
| ReleaseBuffer(buffer); |
| return NULL; |
| } |
| |
| /* otherwise xmin should not be dirty... */ |
| if (TransactionIdIsValid(SnapshotDirty->xmin)) |
| elog(ERROR, "t_xmin is uncommitted in tuple to be updated"); |
| |
| /* |
| * If tuple is being updated by other transaction then we have to |
| * wait for its commit/abort. |
| */ |
| if (TransactionIdIsValid(SnapshotDirty->xmax)) |
| { |
| ReleaseBuffer(buffer); |
| XactLockTableWait(SnapshotDirty->xmax); |
| continue; /* loop back to repeat heap_fetch */ |
| } |
| |
| /* |
| * If tuple was inserted by our own transaction, we have to check |
| * cmin against curCid: cmin >= curCid means our command cannot |
| * see the tuple, so we should ignore it. Without this we are |
| * open to the "Halloween problem" of indefinitely re-updating the |
| * same tuple. (We need not check cmax because |
| * HeapTupleSatisfiesDirty will consider a tuple deleted by our |
| * transaction dead, regardless of cmax.) We just checked that |
| * priorXmax == xmin, so we can test that variable instead of |
| * doing HeapTupleHeaderGetXmin again. |
| */ |
| if (TransactionIdIsCurrentTransactionId(priorXmax) && |
| HeapTupleHeaderGetCmin(tuple.t_data) >= curCid) |
| { |
| ReleaseBuffer(buffer); |
| return NULL; |
| } |
| |
| /* |
| * We got tuple - now copy it for use by recheck query. |
| */ |
| copyTuple = heap_copytuple(&tuple); |
| ReleaseBuffer(buffer); |
| break; |
| } |
| |
| /* |
| * If the referenced slot was actually empty, the latest version of |
| * the row must have been deleted, so we need do nothing. |
| */ |
| if (tuple.t_data == NULL) |
| { |
| ReleaseBuffer(buffer); |
| return NULL; |
| } |
| |
| /* |
| * As above, if xmin isn't what we're expecting, do nothing. |
| */ |
| if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data), |
| priorXmax)) |
| { |
| ReleaseBuffer(buffer); |
| return NULL; |
| } |
| |
| /* |
| * If we get here, the tuple was found but failed SnapshotDirty. |
| * Assuming the xmin is either a committed xact or our own xact (as it |
| * certainly should be if we're trying to modify the tuple), this must |
| * mean that the row was updated or deleted by either a committed xact |
| * or our own xact. If it was deleted, we can ignore it; if it was |
| * updated then chain up to the next version and repeat the whole |
| * test. |
| * |
| * As above, it should be safe to examine xmax and t_ctid without the |
| * buffer content lock, because they can't be changing. |
| */ |
| if (ItemPointerEquals(&tuple.t_self, &tuple.t_data->t_ctid)) |
| { |
| /* deleted, so forget about it */ |
| ReleaseBuffer(buffer); |
| return NULL; |
| } |
| |
| /* updated, so look at the updated row */ |
| tuple.t_self = tuple.t_data->t_ctid; |
| /* updated row should have xmin matching this xmax */ |
| priorXmax = HeapTupleHeaderGetXmax(tuple.t_data); |
| ReleaseBuffer(buffer); |
| /* loop back to fetch next in chain */ |
| } |
| |
| /* |
	 * For UPDATE/DELETE we have to return the tid of the actual row we're
	 * executing PQ for.
| */ |
| *tid = tuple.t_self; |
| |
| /* |
| * Need to run a recheck subquery. Find or create a PQ stack entry. |
| */ |
| epq = estate->es_evalPlanQual; |
| endNode = true; |
| |
| if (epq != NULL && epq->rti == 0) |
| { |
| /* Top PQ stack entry is idle, so re-use it */ |
| Assert(!(estate->es_useEvalPlan) && epq->next == NULL); |
| epq->rti = rti; |
| endNode = false; |
| } |
| |
| /* |
	 * If this is a request for another RTE, Ra, then we have to check
	 * whether PlanQual was already requested for Ra. If so, Ra's row was
	 * updated again, and we must restart the old execution for Ra,
	 * discarding everything done since Ra was suspended.
| */ |
| if (epq != NULL && epq->rti != rti && |
| epq->estate->es_evTuple[rti - 1] != NULL) |
| { |
| do |
| { |
| evalPlanQual *oldepq; |
| |
| /* stop execution */ |
| EvalPlanQualStop(epq); |
| /* pop previous PlanQual from the stack */ |
| oldepq = epq->next; |
| Assert(oldepq && oldepq->rti != 0); |
| /* push current PQ to freePQ stack */ |
| oldepq->free = epq; |
| epq = oldepq; |
| estate->es_evalPlanQual = epq; |
| } while (epq->rti != rti); |
| } |
| |
| /* |
	 * If we are requested for another RTE then we have to suspend execution
	 * of the current PlanQual and start execution of a new one.
| */ |
| if (epq == NULL || epq->rti != rti) |
| { |
| /* try to reuse plan used previously */ |
| evalPlanQual *newepq = (epq != NULL) ? epq->free : NULL; |
| |
| if (newepq == NULL) /* first call or freePQ stack is empty */ |
| { |
| newepq = (evalPlanQual *) palloc0(sizeof(evalPlanQual)); |
| newepq->free = NULL; |
| newepq->estate = NULL; |
| newepq->planstate = NULL; |
| } |
| else |
| { |
| /* recycle previously used PlanQual */ |
| Assert(newepq->estate == NULL); |
| epq->free = NULL; |
| } |
| /* push current PQ to the stack */ |
| newepq->next = epq; |
| epq = newepq; |
| estate->es_evalPlanQual = epq; |
| epq->rti = rti; |
| endNode = false; |
| } |
| |
| Assert(epq->rti == rti); |
| |
| /* |
| * Ok - we're requested for the same RTE. Unfortunately we still have to |
| * end and restart execution of the plan, because ExecReScan wouldn't |
| * ensure that upper plan nodes would reset themselves. We could make |
| * that work if insertion of the target tuple were integrated with the |
| * Param mechanism somehow, so that the upper plan nodes know that their |
| * children's outputs have changed. |
| * |
| * Note that the stack of free evalPlanQual nodes is quite useless at the |
| * moment, since it only saves us from pallocing/releasing the |
| * evalPlanQual nodes themselves. But it will be useful once we implement |
| * ReScan instead of end/restart for re-using PlanQual nodes. |
| */ |
| if (endNode) |
| { |
| /* stop execution */ |
| EvalPlanQualStop(epq); |
| } |
| |
| /* |
| * Initialize new recheck query. |
| * |
| * Note: if we were re-using PlanQual plans via ExecReScan, we'd need to |
| * instead copy down changeable state from the top plan (including |
| * es_result_relation_info, es_junkFilter) and reset locally changeable |
| * state in the epq (including es_param_exec_vals, es_evTupleNull). |
| */ |
| EvalPlanQualStart(epq, estate, epq->next); |
| |
| /* |
	 * free old RTE's tuple, if any, and store target tuple where relation's
| * scan node will see it |
| */ |
| epqstate = epq->estate; |
| if (epqstate->es_evTuple[rti - 1] != NULL) |
| heap_freetuple(epqstate->es_evTuple[rti - 1]); |
| epqstate->es_evTuple[rti - 1] = copyTuple; |
| |
| return EvalPlanQualNext(estate); |
| } |
| |
| static TupleTableSlot * |
| EvalPlanQualNext(EState *estate) |
| { |
| evalPlanQual *epq = estate->es_evalPlanQual; |
| MemoryContext oldcontext; |
| TupleTableSlot *slot; |
| |
| Assert(epq->rti != 0); |
| |
| lpqnext:; |
| oldcontext = MemoryContextSwitchTo(epq->estate->es_query_cxt); |
| slot = ExecProcNode(epq->planstate); |
| MemoryContextSwitchTo(oldcontext); |
| |
| /* |
| * No more tuples for this PQ. Continue previous one. |
| */ |
| if (TupIsNull(slot)) |
| { |
| evalPlanQual *oldepq; |
| |
| /* stop execution */ |
| EvalPlanQualStop(epq); |
| /* pop old PQ from the stack */ |
| oldepq = epq->next; |
| if (oldepq == NULL) |
| { |
| /* this is the first (oldest) PQ - mark as free */ |
| epq->rti = 0; |
| estate->es_useEvalPlan = false; |
| /* and continue Query execution */ |
| return NULL; |
| } |
| Assert(oldepq->rti != 0); |
| /* push current PQ to freePQ stack */ |
| oldepq->free = epq; |
| epq = oldepq; |
| estate->es_evalPlanQual = epq; |
| goto lpqnext; |
| } |
| |
| return slot; |
| } |
| |
| static void |
| EndEvalPlanQual(EState *estate) |
| { |
| evalPlanQual *epq = estate->es_evalPlanQual; |
| |
	if (epq->rti == 0)			/* plans already shut down */
| { |
| Assert(epq->next == NULL); |
| return; |
| } |
| |
| for (;;) |
| { |
| evalPlanQual *oldepq; |
| |
| /* stop execution */ |
| EvalPlanQualStop(epq); |
| /* pop old PQ from the stack */ |
| oldepq = epq->next; |
| if (oldepq == NULL) |
| { |
| /* this is the first (oldest) PQ - mark as free */ |
| epq->rti = 0; |
| estate->es_useEvalPlan = false; |
| break; |
| } |
| Assert(oldepq->rti != 0); |
| /* push current PQ to freePQ stack */ |
| oldepq->free = epq; |
| epq = oldepq; |
| estate->es_evalPlanQual = epq; |
| } |
| } |
| |
| /* |
| * Start execution of one level of PlanQual. |
| * |
| * This is a cut-down version of ExecutorStart(): we copy some state from |
| * the top-level estate rather than initializing it fresh. |
| */ |
| static void |
| EvalPlanQualStart(evalPlanQual *epq, EState *estate, evalPlanQual *priorepq) |
| { |
| EState *epqstate; |
| int rtsize; |
| MemoryContext oldcontext; |
| |
| rtsize = list_length(estate->es_range_table); |
| |
| /* |
| * It's tempting to think about using CreateSubExecutorState here, but |
| * at present we can't because of memory leakage concerns ... |
| */ |
| epq->estate = epqstate = CreateExecutorState(); |
| |
| oldcontext = MemoryContextSwitchTo(epqstate->es_query_cxt); |
| |
| /* |
| * The epqstates share the top query's copy of unchanging state such as |
| * the snapshot, rangetable, result-rel info, and external Param info. |
| * They need their own copies of local state, including a tuple table, |
| * es_param_exec_vals, etc. |
| */ |
| epqstate->es_direction = ForwardScanDirection; |
| epqstate->es_snapshot = estate->es_snapshot; |
| epqstate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot; |
| epqstate->es_range_table = estate->es_range_table; |
| epqstate->es_result_relations = estate->es_result_relations; |
| epqstate->es_num_result_relations = estate->es_num_result_relations; |
| epqstate->es_result_relation_info = estate->es_result_relation_info; |
| epqstate->es_junkFilter = estate->es_junkFilter; |
| epqstate->es_into_relation_descriptor = estate->es_into_relation_descriptor; |
| epqstate->es_into_relation_is_bulkload = estate->es_into_relation_is_bulkload; |
| epqstate->es_into_relation_last_heap_tid = estate->es_into_relation_last_heap_tid; |
| epqstate->es_param_list_info = estate->es_param_list_info; |
| if (estate->es_plannedstmt->planTree->nParamExec > 0) |
| epqstate->es_param_exec_vals = (ParamExecData *) |
| palloc0(estate->es_plannedstmt->planTree->nParamExec * sizeof(ParamExecData)); |
| epqstate->es_rowMarks = estate->es_rowMarks; |
| epqstate->es_instrument = estate->es_instrument; |
| epqstate->es_select_into = estate->es_select_into; |
| epqstate->es_into_oids = estate->es_into_oids; |
| epqstate->es_plannedstmt = estate->es_plannedstmt; |
| |
| /* |
| * Each epqstate must have its own es_evTupleNull state, but all the stack |
| * entries share es_evTuple state. This allows sub-rechecks to inherit |
| * the value being examined by an outer recheck. |
| */ |
| epqstate->es_evTupleNull = (bool *) palloc0(rtsize * sizeof(bool)); |
| if (priorepq == NULL) |
| /* first PQ stack entry */ |
| epqstate->es_evTuple = (HeapTuple *) |
| palloc0(rtsize * sizeof(HeapTuple)); |
| else |
| /* later stack entries share the same storage */ |
| epqstate->es_evTuple = priorepq->estate->es_evTuple; |
| |
| /* |
| * Create sub-tuple-table; we needn't redo the CountSlots work though. |
| */ |
| epqstate->es_tupleTable = |
| ExecCreateTupleTable(estate->es_tupleTable->size); |
| |
| /* |
| * Initialize the private state information for all the nodes in the query |
| * tree. This opens files, allocates storage and leaves us ready to start |
| * processing tuples. |
| */ |
| epq->planstate = ExecInitNode(estate->es_plannedstmt->planTree, epqstate, 0); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* |
| * End execution of one level of PlanQual. |
| * |
| * This is a cut-down version of ExecutorEnd(); basically we want to do most |
| * of the normal cleanup, but *not* close result relations (which we are |
| * just sharing from the outer query). We do, however, have to close any |
| * trigger target relations that got opened, since those are not shared. |
| */ |
| static void |
| EvalPlanQualStop(evalPlanQual *epq) |
| { |
| EState *epqstate = epq->estate; |
| MemoryContext oldcontext; |
| |
| oldcontext = MemoryContextSwitchTo(epqstate->es_query_cxt); |
| |
| ExecEndNode(epq->planstate); |
| |
| ExecDropTupleTable(epqstate->es_tupleTable, true); |
| epqstate->es_tupleTable = NULL; |
| |
| if (epqstate->es_evTuple[epq->rti - 1] != NULL) |
| { |
| heap_freetuple(epqstate->es_evTuple[epq->rti - 1]); |
| epqstate->es_evTuple[epq->rti - 1] = NULL; |
| } |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| FreeExecutorState(epqstate); |
| |
| epq->estate = NULL; |
| epq->planstate = NULL; |
| } |
| |
| /* |
| * GetUpdatedTuple_Int |
| * |
 * This function extracts the interesting parts of EvalPlanQual (and
 * therefore duplicates some code; it should probably be cleaned up at some
 * point if possible). We use it instead of EvalPlanQual when we are
 * scanning a local relation and using heap_lock_tuple to lock entries for
 * update, but don't have access to the executor state or any RTEs.
 *
 * The inputs to this function are the old tuple's tid and the relation
 * it's in; the output is a copy of the latest updated version of the tuple.
| */ |
| HeapTuple |
| GetUpdatedTuple_Int(Relation relation, |
| ItemPointer tid, |
| TransactionId priorXmax, |
| CommandId curCid) |
| { |
| HeapTupleData tuple; |
| HeapTuple copyTuple = NULL; |
| |
| /* |
| * fetch tid tuple |
| * |
| * Loop here to deal with updated or busy tuples |
| */ |
| tuple.t_self = *tid; |
| for (;;) |
| { |
| Buffer buffer; |
| |
| if (heap_fetch(relation, SnapshotDirty, &tuple, &buffer, true, NULL)) |
| { |
| /* |
| * If xmin isn't what we're expecting, the slot must have been |
| * recycled and reused for an unrelated tuple. This implies that |
| * the latest version of the row was deleted, so we need do |
| * nothing. (Should be safe to examine xmin without getting |
| * buffer's content lock, since xmin never changes in an existing |
| * tuple.) |
| */ |
| if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data), |
| priorXmax)) |
| { |
| ReleaseBuffer(buffer); |
| return NULL; |
| } |
| |
| /* otherwise xmin should not be dirty... */ |
| if (TransactionIdIsValid(SnapshotDirty->xmin)) |
| elog(ERROR, "t_xmin is uncommitted in tuple to be updated"); |
| |
| /* |
| * If tuple is being updated by other transaction then we have to |
| * wait for its commit/abort. |
| */ |
| if (TransactionIdIsValid(SnapshotDirty->xmax)) |
| { |
| ReleaseBuffer(buffer); |
| XactLockTableWait(SnapshotDirty->xmax); |
| continue; /* loop back to repeat heap_fetch */ |
| } |
| |
| /* |
| * If tuple was inserted by our own transaction, we have to check |
| * cmin against curCid: cmin >= curCid means our command cannot |
| * see the tuple, so we should ignore it. Without this we are |
| * open to the "Halloween problem" of indefinitely re-updating the |
| * same tuple. (We need not check cmax because |
| * HeapTupleSatisfiesDirty will consider a tuple deleted by our |
| * transaction dead, regardless of cmax.) We just checked that |
| * priorXmax == xmin, so we can test that variable instead of |
| * doing HeapTupleHeaderGetXmin again. |
| */ |
| if (TransactionIdIsCurrentTransactionId(priorXmax) && |
| HeapTupleHeaderGetCmin(tuple.t_data) >= curCid) |
| { |
| ReleaseBuffer(buffer); |
| return NULL; |
| } |
| |
| /* |
| * We got tuple - now copy it for use by recheck query. |
| */ |
| copyTuple = heap_copytuple(&tuple); |
| ReleaseBuffer(buffer); |
| break; |
| } |
| |
| /* |
| * If the referenced slot was actually empty, the latest version of |
| * the row must have been deleted, so we need do nothing. |
| */ |
| if (tuple.t_data == NULL) |
| { |
| ReleaseBuffer(buffer); |
| return NULL; |
| } |
| |
| /* |
| * As above, if xmin isn't what we're expecting, do nothing. |
| */ |
| if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data), |
| priorXmax)) |
| { |
| ReleaseBuffer(buffer); |
| return NULL; |
| } |
| |
| /* |
| * If we get here, the tuple was found but failed SnapshotDirty. |
| * Assuming the xmin is either a committed xact or our own xact (as it |
| * certainly should be if we're trying to modify the tuple), this must |
| * mean that the row was updated or deleted by either a committed xact |
| * or our own xact. If it was deleted, we can ignore it; if it was |
| * updated then chain up to the next version and repeat the whole |
| * test. |
| * |
| * As above, it should be safe to examine xmax and t_ctid without the |
| * buffer content lock, because they can't be changing. |
| */ |
| if (ItemPointerEquals(&tuple.t_self, &tuple.t_data->t_ctid)) |
| { |
| /* deleted, so forget about it */ |
| ReleaseBuffer(buffer); |
| return NULL; |
| } |
| |
| /* updated, so look at the updated row */ |
| tuple.t_self = tuple.t_data->t_ctid; |
| /* updated row should have xmin matching this xmax */ |
| priorXmax = HeapTupleHeaderGetXmax(tuple.t_data); |
| ReleaseBuffer(buffer); |
| /* loop back to fetch next in chain */ |
| } |
| |
	/* note: unlike EvalPlanQual, we do not pass the final tid back here */
| |
| return copyTuple; |
| } |
| |
| /* |
| * Support for SELECT INTO (a/k/a CREATE TABLE AS) |
| * |
| * We implement SELECT INTO by diverting SELECT's normal output with |
| * a specialized DestReceiver type. |
| * |
| * TODO: remove some of the INTO-specific cruft from EState, and keep |
| * it in the DestReceiver instead. |
| */ |
| |
| typedef struct |
| { |
| DestReceiver pub; /* publicly-known function pointers */ |
| EState *estate; /* EState we are working with */ |
| AppendOnlyInsertDescData *ao_insertDesc; /* descriptor to AO tables */ |
| ParquetInsertDescData *parquet_insertDesc; /* descriptor to parquet tables */ |
	OrcInsertDescData *orc_insertDesc; /* descriptor to orc tables */
| ExternalInsertDescData *ext_insertDesc; /* descriptor to external tables */ |
| } DR_intorel; |
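
/*
 * Exactly one of the insert descriptors above is used for a given target
 * table, chosen by storage type in intorel_receive(): ao_insertDesc for
 * AO row tables, parquet_insertDesc for parquet, orc_insertDesc for ORC,
 * and ext_insertDesc for writable external tables (e.g. magma).
 */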
| |
| static Relation |
| CreateIntoRel(QueryDesc *queryDesc) |
| { |
| EState *estate = queryDesc->estate; |
| char *intoName; |
| IntoClause *intoClause; |
| List *segnos = NIL; |
| |
| Relation intoRelationDesc; |
| |
| char relkind = RELKIND_RELATION; |
| char relstorage; |
| Oid namespaceId; |
| Oid tablespaceId; |
| Datum reloptions; |
| StdRdOptions *stdRdOptions; |
| AclResult aclresult; |
| Oid intoRelationId; |
| TupleDesc tupdesc; |
| Oid intoOid; |
| Oid intoComptypeOid; |
| GpPolicy *targetPolicy; |
| int safefswritesize = gp_safefswritesize; |
| ItemPointerData persistentTid; |
| int64 persistentSerialNum; |
| |
| Assert(Gp_role != GP_ROLE_EXECUTE); |
| |
| targetPolicy = queryDesc->plannedstmt->intoPolicy; |
| intoClause = queryDesc->plannedstmt->intoClause; |
| /* |
| * Check consistency of arguments |
| */ |
| Insist(intoClause); |
| if (intoClause->onCommit != ONCOMMIT_NOOP && !intoClause->rel->istemp) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TABLE_DEFINITION), |
| errmsg("ON COMMIT can only be used on temporary tables"), |
| errOmitLocation(true))); |
| |
| /* MPP specific stuff */ |
| intoOid = intoClause->oidInfo.relOid; |
| intoComptypeOid = intoClause->oidInfo.comptypeOid; |
| |
| /* |
| * Find namespace to create in, check its permissions |
| */ |
| intoName = intoClause->rel->relname; |
| namespaceId = RangeVarGetCreationNamespace(intoClause->rel); |
| |
| aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), |
| ACL_CREATE); |
| if (aclresult != ACLCHECK_OK) |
| aclcheck_error(aclresult, ACL_KIND_NAMESPACE, |
| get_namespace_name(namespaceId)); |
| |
| /* |
| * Select tablespace to use. If not specified, use default_tablespace |
| * (which may in turn default to database's default). |
| */ |
| if (intoClause->tableSpaceName) |
| { |
| tablespaceId = get_tablespace_oid(intoClause->tableSpaceName); |
| if (!OidIsValid(tablespaceId)) |
| ereport(ERROR, |
| (errcode(ERRCODE_UNDEFINED_OBJECT), |
| errmsg("tablespace \"%s\" does not exist", |
| intoClause->tableSpaceName))); |
| } |
| else |
| { |
| tablespaceId = GetDefaultTablespace(); |
| |
| /* Need the real tablespace id for dispatch */ |
| if (!OidIsValid(tablespaceId)) |
| tablespaceId = MyDatabaseTableSpace; |
| |
| /* MPP-10329 - must dispatch tablespace */ |
| intoClause->tableSpaceName = get_tablespace_name(tablespaceId); |
| } |
| |
| /* Check permissions except when using the database's default space */ |
| if (tablespaceId != MyDatabaseTableSpace && |
| tablespaceId != get_database_dts(MyDatabaseId)) |
| { |
| AclResult aclresult; |
| |
| aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), |
| ACL_CREATE); |
| |
| if (aclresult != ACLCHECK_OK) |
| aclcheck_error(aclresult, ACL_KIND_TABLESPACE, |
| get_tablespace_name(tablespaceId)); |
| } |
| |
| |
| /* |
	 * Check options; report an error if the user wants to create a heap table.
| */ |
| { |
| bool has_option = false; |
| ListCell *cell = NULL; |
| |
| foreach(cell, intoClause->options) |
| { |
| DefElem *e = (DefElem *) lfirst(cell); |
| char *s = NULL; |
| |
| if (!IsA(e, DefElem)) |
| continue; |
| if (!e->arg || !IsA(e->arg, String)) |
| continue; |
| if (pg_strcasecmp(e->defname, "appendonly")) |
| continue; |
| |
| has_option = true; |
| s = strVal(e->arg); |
| if (pg_strcasecmp(s, "false") == 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED), |
| errmsg("gpsql does not support heap table, use append only table instead"), |
| errOmitLocation(true))); |
| } |
| |
| if (!has_option) |
| { |
| intoClause->options = lappend(intoClause->options, makeDefElem("appendonly", (Node *) makeString("true"))); |
| } |
| } |
| |
| /* Parse and validate any reloptions */ |
| reloptions = transformRelOptions((Datum) 0, intoClause->options, true, false); |
| /* get the relstorage (heap or AO tables) */ |
| stdRdOptions = (StdRdOptions*) heap_reloptions(relkind, reloptions, true); |
| heap_test_override_reloptions(relkind, stdRdOptions, &safefswritesize); |
| if(stdRdOptions->appendonly) |
| { |
| /* |
| relstorage = stdRdOptions->columnstore ? RELSTORAGE_AOCOLS : RELSTORAGE_AOROWS; |
| relstorage = stdRdOptions->parquetstore ? RELSTORAGE_PARQUET : RELSTORAGE_AOROWS; |
| */ |
| relstorage = stdRdOptions->columnstore; |
| } |
| else |
| { |
| relstorage = RELSTORAGE_HEAP; |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED), |
| errmsg("gpsql does not support heap table, use append only table instead"), |
| errOmitLocation(true))); |
| } |
| |
| /* have to copy the actual tupdesc to get rid of any constraints */ |
| tupdesc = CreateTupleDescCopy(queryDesc->tupDesc); |
| |
| /* Now we can actually create the new relation */ |
| intoRelationId = heap_create_with_catalog(intoName, |
| namespaceId, |
| tablespaceId, |
| intoOid, /* MPP */ |
| GetUserId(), |
| tupdesc, |
| /* relam */ InvalidOid, |
| relkind, |
| relstorage, |
| false, |
| true, |
| false, |
| 0, |
| intoClause->onCommit, |
| targetPolicy, /* MPP */ |
| reloptions, |
| allowSystemTableModsDDL, |
| &intoComptypeOid, /* MPP */ |
| &persistentTid, |
| &persistentSerialNum, |
| /* formattername */ NULL); |
| |
| FreeTupleDesc(tupdesc); |
| |
| /* |
| * Advance command counter so that the newly-created relation's catalog |
| * tuples will be visible to heap_open. |
| */ |
| CommandCounterIncrement(); |
| |
| /* |
| * If necessary, create a TOAST table for the new relation, or an Append |
| * Only segment table. Note that AlterTableCreateXXXTable ends with |
| * CommandCounterIncrement(), so that the new tables will be visible for |
| * insertion. |
| */ |
| AlterTableCreateToastTableWithOid(intoRelationId, |
| intoClause->oidInfo.toastOid, |
| intoClause->oidInfo.toastIndexOid, |
| &intoClause->oidInfo.toastComptypeOid, |
| false); |
| AlterTableCreateAoSegTableWithOid(intoRelationId, |
| intoClause->oidInfo.aosegOid, |
| intoClause->oidInfo.aosegIndexOid, |
| &intoClause->oidInfo.aosegComptypeOid, |
| false); |
| /* don't create AO block directory here, it'll be created when needed */ |
| |
| /* |
| * Advance command counter so that the newly-created relation's catalog |
| * tuples will be visible to heap_open. |
| */ |
| CommandCounterIncrement(); |
| |
| /* |
| * And open the constructed table for writing. |
| */ |
| intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock); |
| |
| /* |
| * Add column encoding entries based on the WITH clause. |
| * |
| * NOTE: we could also do this expansion during parse analysis, by |
| * expanding the IntoClause options field into some attr_encodings field |
| * (cf. CreateStmt and transformCreateStmt()). As it stands, there's no real |
| * benefit for doing that from a code complexity POV. In fact, it would mean |
| * more code. If, however, we supported column encoding syntax during CTAS, |
| * it would be a good time to relocate this code. |
| */ |
| AddDefaultRelationAttributeOptions(intoRelationDesc, |
| intoClause->options); |
| |
| /* |
| * create a list of segment file numbers for insert. |
| */ |
| segnos = SetSegnoForWrite(NIL, intoRelationId, GetQEGangNum(), true, true, false); |
| CreateAoSegFileForRelationOnMaster(intoRelationDesc, segnos); |
| queryDesc->plannedstmt->into_aosegnos = segnos; |
| |
| /** |
| * lock segment files |
| */ |
| /* |
| * currently, we disable vacuum, do not lock since lock table is too small. |
| */ |
| /*if (Gp_role == GP_ROLE_DISPATCH) |
| LockSegfilesOnMaster(intoRelationDesc, 1);*/ |
| |
| intoClause->oidInfo.relOid = intoRelationId; |
| estate->es_into_relation_descriptor = intoRelationDesc; |
| |
| return intoRelationDesc; |
| } |
| |
| static Relation |
| CreateIntoMagmaRel(QueryDesc *queryDesc) |
| { |
| EState *estate = queryDesc->estate; |
| |
| Relation intoRelationDesc; |
| Oid intoRelationId; |
| CreateExternalStmt *createExtStmt = makeNode(CreateExternalStmt); |
| |
| IntoClause *intoClause; |
| intoClause = queryDesc->plannedstmt->intoClause; |
| Insist(intoClause); |
| |
| createExtStmt->exttypedesc = makeNode(ExtTableTypeDesc); |
| ExtTableTypeDesc *desc = (ExtTableTypeDesc *)(createExtStmt->exttypedesc); |
| |
| desc->exttabletype = EXTTBL_TYPE_MAGMA; |
| desc->location_list = NIL; |
| int location_len = 0; |
| location_len = strlen(PROTOCOL_MAGMA) + /* magma:// */ |
| strlen(magma_nodes_url) + 1; /* magma_nodes_url + '\0' */ |
| |
| char *path = (char *)palloc(sizeof(char) * location_len); |
| |
| sprintf(path, "%s%s", PROTOCOL_MAGMA, magma_nodes_url); |
| |
| desc->location_list = list_make1((Node *) makeString(path)); |
| |
| desc->command_string = NULL; |
| desc->on_clause = NIL; |
| |
| /* have to copy the actual tupdesc to get rid of any constraints */ |
| TupleDesc tupdesc = CreateTupleDescCopy(queryDesc->tupDesc); |
| |
| createExtStmt->iswritable = TRUE; |
| createExtStmt->isexternal = FALSE; |
| createExtStmt->isweb = FALSE; |
| createExtStmt->base.relation = intoClause->rel; |
| createExtStmt->base.tableElts = BuildSchemaFromDesc(tupdesc); |
| createExtStmt->format = pstrdup(default_table_format); |
	createExtStmt->base.tablespacename = NULL;	/* tablespace is meaningless for external tables */
	createExtStmt->base.options = intoClause->options;
	createExtStmt->encoding = PG_SQL_ASCII;		/* default magma encoding */
	createExtStmt->sreh = NULL;
	createExtStmt->base.partitionBy = NIL;		/* cannot create a partitioned table via CREATE TABLE AS */
	createExtStmt->policy = queryDesc->plannedstmt->intoPolicy;
| FreeTupleDesc(tupdesc); |
| |
| /* |
	 * Check options; report an error if the user wants to create a heap or AO table.
| */ |
| ListCell *cell = NULL; |
| foreach(cell, createExtStmt->base.options) |
| { |
| DefElem *e = (DefElem *) lfirst(cell); |
| char *s = NULL; |
| |
| if (!IsA(e, DefElem)) |
| continue; |
| if (!e->arg || !IsA(e->arg, String)) |
| continue; |
		if (pg_strcasecmp(e->defname, "appendonly") == 0)
			ereport(ERROR,
					(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
					 errmsg("does not support heap or appendonly table when default_table_format is magma format"),
					 errOmitLocation(true)));
| } |
| |
| GpPolicy *targetPolicy; |
| targetPolicy = queryDesc->plannedstmt->intoPolicy; |
| /* |
	 * In the magma case, if no distribution columns are specified,
	 * choose the first column as the distribution key.
| */ |
	if (targetPolicy->nattrs == 0)
	{
		targetPolicy->nattrs = 1;
		targetPolicy->attrs[0] = 1;
		targetPolicy->bucketnum = GetRelOpt_bucket_num_fromOptions(
			createExtStmt->base.options, GetDefaultMagmaBucketNum());
	}

	/* actually create the magma table here */
| DefineExternalRelation(createExtStmt); |
| |
| intoRelationId = RangeVarGetRelid(createExtStmt->base.relation, true, false /*allowHcatalog*/); |
| |
| Assert(Gp_role != GP_ROLE_EXECUTE); |
| |
| /* |
| * Check consistency of arguments |
| */ |
| if (intoClause->onCommit != ONCOMMIT_NOOP && !intoClause->rel->istemp) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TABLE_DEFINITION), |
| errmsg("ON COMMIT can only be used on temporary tables"), |
| errOmitLocation(true))); |
| |
| /* |
| * Advance command counter so that the newly-created relation's catalog |
| * tuples will be visible to heap_open. |
| */ |
| CommandCounterIncrement(); |
| |
| /* |
| * And open the constructed table for writing. |
| */ |
| intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock); |
| |
| /* |
| * Add column encoding entries based on the WITH clause. |
| * |
| * NOTE: we could also do this expansion during parse analysis, by |
| * expanding the IntoClause options field into some attr_encodings field |
| * (cf. CreateStmt and transformCreateStmt()). As it stands, there's no real |
| * benefit for doing that from a code complexity POV. In fact, it would mean |
| * more code. If, however, we supported column encoding syntax during CTAS, |
| * it would be a good time to relocate this code. |
| */ |
| // options for external table is incompatible with internal table. |
| // AddDefaultRelationAttributeOptions(intoRelationDesc, |
| // intoClause->options); |
| |
| /* |
| * create a list of magma table ranges for insert. |
| */ |
| List * relOids = list_make1_oid(intoRelationId); |
| estate->es_plannedstmt->scantable_splits = list_concat( |
| estate->es_plannedstmt->scantable_splits, get_magma_scansplits(relOids)); |
| |
| /* |
| * list of segment file numbers, used for ao format table. |
| */ |
| queryDesc->plannedstmt->into_aosegnos = NIL; |
| |
| intoClause->oidInfo.relOid = intoRelationId; |
| estate->es_into_relation_descriptor = intoRelationDesc; |
| |
| return intoRelationDesc; |
| } |
| |
| /* |
| * OpenIntoRel --- actually create the SELECT INTO target relation |
| * |
| * This also replaces QueryDesc->dest with the special DestReceiver for |
| * SELECT INTO. We assume that the correct result tuple type has already |
| * been placed in queryDesc->tupDesc. |
| */ |
| static void |
| OpenIntoRel(QueryDesc *queryDesc) |
| { |
| EState *estate = queryDesc->estate; |
| Relation intoRelationDesc; |
| |
| IntoClause *intoClause; |
| |
| Oid intoRelationId; |
| DR_intorel *myState; |
| |
| if (Gp_role != GP_ROLE_EXECUTE) |
| { |
| if (strcmp("appendonly", default_table_format) == 0) |
| { |
| intoRelationDesc = CreateIntoRel(queryDesc); |
| } else if (strcmp(MAGMA_TP_FORMAT, default_table_format) == 0 |
| || strcmp(MAGMA_AP_FORMAT, default_table_format) == 0) |
| { |
| intoRelationDesc = CreateIntoMagmaRel(queryDesc); |
| } |
| } |
| else |
| { |
| intoClause = queryDesc->plannedstmt->intoClause; |
| intoRelationId = intoClause->oidInfo.relOid; |
| Assert(OidIsValid(intoRelationId)); |
| /* |
| * And open the constructed table for writing. |
| */ |
| intoRelationDesc = heap_open(intoRelationId, RowShareLock); |
| estate->es_into_relation_descriptor = intoRelationDesc; |
| } |
| |
| /* use_wal off requires rd_targblock be initially invalid */ |
| Assert(intoRelationDesc->rd_targblock == InvalidBlockNumber); |
| |
| Assert (!estate->es_into_relation_is_bulkload); |
| |
| /* |
| * Now replace the query's DestReceiver with one for SELECT INTO |
| */ |
| queryDesc->dest = CreateDestReceiver(DestIntoRel, NULL); |
| myState = (DR_intorel *) queryDesc->dest; |
| Assert(myState->pub.mydest == DestIntoRel); |
| myState->estate = estate; |
| myState->estate->into_aosegnos = queryDesc->plannedstmt->into_aosegnos; |
| } |
| |
| /* |
| * CloseIntoRel --- clean up SELECT INTO at ExecutorEnd time |
| */ |
| static void |
| CloseIntoRel(QueryDesc *queryDesc) |
| { |
| EState *estate = queryDesc->estate; |
| Relation rel = estate->es_into_relation_descriptor; |
| |
| /* Partition with SELECT INTO is not supported */ |
| Assert(!PointerIsValid(estate->es_result_partitions)); |
| |
| /* OpenIntoRel might never have gotten called */ |
| if (rel) |
| { |
| /* APPEND_ONLY is closed in the intorel_shutdown */ |
| if (!(RelationIsAo(rel) || RelationIsExternal(rel))) |
| { |
| Insist(!"hawq does not support heap table, use append only table instead"); |
| } |
| |
| /* close rel, but keep lock until commit */ |
| heap_close(rel, NoLock); |
| |
| rel = NULL; |
| } |
| } |
| |
| /* |
| * CreateIntoRelDestReceiver -- create a suitable DestReceiver object |
| * |
| * Since CreateDestReceiver doesn't accept the parameters we'd need, |
| * we just leave the private fields empty here. OpenIntoRel will |
| * fill them in. |
| */ |
| DestReceiver * |
| CreateIntoRelDestReceiver(void) |
| { |
| DR_intorel *self = (DR_intorel *) palloc(sizeof(DR_intorel)); |
| |
| self->pub.receiveSlot = intorel_receive; |
| self->pub.rStartup = intorel_startup; |
| self->pub.rShutdown = intorel_shutdown; |
| self->pub.rDestroy = intorel_destroy; |
| self->pub.mydest = DestIntoRel; |
| |
| self->estate = NULL; |
| self->ao_insertDesc = NULL; |
| self->parquet_insertDesc = NULL; |
| self->orc_insertDesc = NULL; |
| self->ext_insertDesc = NULL; |
| |
| return (DestReceiver *) self; |
| } |
| |
| /* |
| * intorel_startup --- executor startup |
| */ |
| static void |
| intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) |
| { |
| UnusedArg(self); |
| UnusedArg(operation); |
| UnusedArg(typeinfo); |
| |
| /* no-op */ |
| } |
| |
| /* |
| * intorel_receive --- receive one tuple |
| */ |
| static void |
| intorel_receive(TupleTableSlot *slot, DestReceiver *self) |
| { |
| DR_intorel *myState = (DR_intorel *) self; |
| EState *estate = myState->estate; |
| Relation into_rel = estate->es_into_relation_descriptor; |
| |
| Assert(estate->es_result_partitions == NULL); |
| |
| if (RelationIsAoRows(into_rel)) |
| { |
| MemTuple tuple = ExecCopySlotMemTuple(slot); |
| Oid tupleOid; |
| AOTupleId aoTupleId; |
| |
| if (myState->ao_insertDesc == NULL) |
| { |
| int segno = list_nth_int(estate->into_aosegnos, GetQEIndex()); |
| ResultRelSegFileInfo *segfileinfo = InitResultRelSegFileInfo(segno, RELSTORAGE_AOROWS, 1); |
| myState->ao_insertDesc = appendonly_insert_init(into_rel, |
| segfileinfo); |
| } |
| |
| appendonly_insert(myState->ao_insertDesc, tuple, &tupleOid, &aoTupleId); |
| pfree(tuple); |
| } |
| else if(RelationIsParquet(into_rel)) |
| { |
| if(myState->parquet_insertDesc == NULL) |
| { |
| int segno = list_nth_int(estate->into_aosegnos, GetQEIndex()); |
| ResultRelSegFileInfo *segfileinfo = InitResultRelSegFileInfo(segno, RELSTORAGE_PARQUET, 1); |
| myState->parquet_insertDesc = parquet_insert_init(into_rel, segfileinfo); |
| } |
| |
| parquet_insert(myState->parquet_insertDesc, slot); |
| } |
| else if (RelationIsOrc(into_rel)) |
| { |
| if(myState->orc_insertDesc == NULL) |
| { |
| int segno = list_nth_int(estate->into_aosegnos, GetQEIndex()); |
| ResultRelSegFileInfo *segfileinfo = InitResultRelSegFileInfo(segno, RELSTORAGE_ORC, 1); |
| myState->orc_insertDesc = orcBeginInsert(into_rel, segfileinfo); |
| } |
| orcInsert(myState->orc_insertDesc, slot); |
| } |
| else if(RelationIsExternal(into_rel)) |
| { |
| /* Writable external table */ |
| if (myState->ext_insertDesc == NULL) |
| { |
| /* Get pg_exttable information for the external table */ |
| ExtTableEntry *extEntry = |
| GetExtTableEntry(RelationGetRelid(into_rel)); |
| |
| /* Get formatter type and name for the external table */ |
| int formatterType = ExternalTableType_Invalid; |
| char *formatterName = NULL; |
| |
| getExternalTableTypeStr(extEntry->fmtcode, extEntry->fmtopts, |
| &formatterType, &formatterName); |
| |
| pfree(extEntry); |
| |
| if (formatterType == ExternalTableType_Invalid) |
| { |
| elog(ERROR, "invalid formatter type for external table: %s", __func__); |
| } |
| else if (formatterType != ExternalTableType_PLUG) |
| { |
| myState->ext_insertDesc = external_insert_init( |
| into_rel, 0, formatterType, formatterName, estate->es_plannedstmt); |
| } |
| else |
| { |
| Assert(formatterName && (strcmp(formatterName, MAGMA_AP_FORMAT) == 0 || |
| strcmp(formatterName, MAGMA_TP_FORMAT) == 0)); |
| |
| Oid procOid = LookupPlugStorageValidatorFunc(formatterName, |
| "insert_init"); |
| |
| if (OidIsValid(procOid)) |
| { |
| FmgrInfo insertInitFunc; |
| fmgr_info(procOid, &insertInitFunc); |
| |
| ResultRelSegFileInfo *segfileinfo = makeNode(ResultRelSegFileInfo); |
| segfileinfo->segno = 0; |
				/*
				 * This segfileinfo is only meaningful for AO and ORC
				 * inserts; in the magma case we simply use segno 0 rather
				 * than fetching the per-QE entry via
				 * ResultRelInfoSetSegFileInfo() and ri_aosegfileinfos.
				 */
| myState->ext_insertDesc = |
| InvokePlugStorageFormatInsertInit(&insertInitFunc, |
| into_rel, |
| formatterType, |
| formatterName, |
| estate->es_plannedstmt, |
| segfileinfo->segno, |
| PlugStorageGetTransactionSnapshot(NULL)); |
| } |
| else |
| { |
| elog(ERROR, "%s_insert_init function was not found", formatterName); |
| } |
| } |
| } |
| |
| ExternalInsertDesc extInsertDesc = myState->ext_insertDesc; |
| if (extInsertDesc->ext_formatter_type == ExternalTableType_Invalid) |
| { |
| elog(ERROR, "invalid formatter type for external table: %s", __func__); |
| } |
| else if (extInsertDesc->ext_formatter_type != ExternalTableType_PLUG) |
| { |
| external_insert(extInsertDesc, slot); |
| } |
| else |
| { |
| Assert(extInsertDesc->ext_formatter_name); |
| |
| /* Form virtual tuple */ |
| slot_getallattrs(slot); |
| |
| FmgrInfo *insertFunc = extInsertDesc->ext_ps_insert_funcs.insert; |
| |
| if (insertFunc) |
| { |
| InvokePlugStorageFormatInsert(insertFunc, |
| extInsertDesc, |
| slot); |
| } |
| else |
| { |
| elog(ERROR, "%s_insert function was not found", |
| extInsertDesc->ext_formatter_name); |
| } |
| } |
| } |
| else |
| { |
| /* gpsql do not support heap table */ |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED), |
| errmsg("gpsql does not support heap table, use append only table instead"))); |
| } |
| |
| /* We know this is a newly created relation, so there are no indexes */ |
| |
| IncrAppended(); |
| } |
| |
| /* |
| * intorel_shutdown --- executor end |
| */ |
| static void |
| intorel_shutdown(DestReceiver *self) |
| { |
| int aocount = 0; |
| |
| /* If target was append only, finalise */ |
| DR_intorel *myState = (DR_intorel *) self; |
| EState *estate = myState->estate; |
| Relation into_rel = estate->es_into_relation_descriptor; |
| |
| StringInfo buf = NULL; |
| QueryContextDispatchingSendBack sendback = NULL; |
| |
| if (RelationIsAoRows(into_rel) && myState->ao_insertDesc) |
| ++aocount; |
| else if(RelationIsParquet(into_rel) && myState->parquet_insertDesc) |
| ++aocount; |
| else if (RelationIsOrc(into_rel) && myState->orc_insertDesc) |
| ++aocount; |
| |
| if (Gp_role == GP_ROLE_EXECUTE && aocount > 0) |
| buf = PreSendbackChangedCatalog(aocount); |
| |
| if (RelationIsAoRows(into_rel) && myState->ao_insertDesc) |
| { |
| sendback = CreateQueryContextDispatchingSendBack(1); |
| myState->ao_insertDesc->sendback = sendback; |
| |
| sendback->relid = RelationGetRelid(myState->ao_insertDesc->aoi_rel); |
| |
| appendonly_insert_finish(myState->ao_insertDesc); |
| } |
| else if (RelationIsParquet(into_rel) && myState->parquet_insertDesc) |
| { |
| sendback = CreateQueryContextDispatchingSendBack(1); |
| myState->parquet_insertDesc->sendback = sendback; |
| |
| sendback->relid = RelationGetRelid(myState->parquet_insertDesc->parquet_rel); |
| |
| parquet_insert_finish(myState->parquet_insertDesc); |
| } |
| else if (RelationIsOrc(into_rel) && myState->orc_insertDesc) |
| { |
| sendback = CreateQueryContextDispatchingSendBack(1); |
| myState->orc_insertDesc->sendback = sendback; |
| sendback->relid = RelationGetRelid(myState->orc_insertDesc->rel); |
| orcEndInsert(myState->orc_insertDesc); |
| } |
| else if (RelationIsExternal(into_rel) && myState->ext_insertDesc) |
| { |
| ExternalInsertDesc extInsertDesc = myState->ext_insertDesc; |
| |
| if (extInsertDesc->ext_formatter_type == ExternalTableType_Invalid) |
| { |
| elog(ERROR, "invalid formatter type for external table: %s", __func__); |
| } |
| else if (extInsertDesc->ext_formatter_type != ExternalTableType_PLUG) |
| { |
| external_insert_finish(extInsertDesc); |
| } |
| else |
| { |
| FmgrInfo *insertFinishFunc = |
| extInsertDesc->ext_ps_insert_funcs.insert_finish; |
| |
| if (insertFinishFunc) |
| { |
| InvokePlugStorageFormatInsertFinish(insertFinishFunc, |
| extInsertDesc); |
| } |
| else |
| { |
| elog(ERROR, "%s_insert_finish function was not found", |
| extInsertDesc->ext_formatter_name); |
| } |
| } |
| } |
| |
| if (sendback && Gp_role == GP_ROLE_EXECUTE) |
| AddSendbackChangedCatalogContent(buf, sendback); |
| |
| DropQueryContextDispatchingSendBack(sendback); |
| |
| if (Gp_role == GP_ROLE_EXECUTE && aocount > 0) |
| FinishSendbackChangedCatalog(buf); |
| } |
| |
| /* |
| * intorel_destroy --- release DestReceiver object |
| */ |
| static void |
| intorel_destroy(DestReceiver *self) |
| { |
| pfree(self); |
| } |
| |
| /* |
| * Calculate the part to use for the given key, then find or calculate |
| * and cache required information about that part in the hash table |
| * anchored in estate. |
| * |
| * Return a pointer to the information, an entry in the table |
| * estate->es_result_relations. Note that the first entry in this |
| * table is for the partitioned table itself and that the entire table |
| * may be reallocated, changing the addresses of its entries. |
| * |
| * Thus, it is important to avoid holding long-lived pointers to |
| * table entries (such as the pointer returned from this function). |
| */ |
| static ResultRelInfo * |
| get_part(EState *estate, Datum *values, bool *isnull, TupleDesc tupdesc) |
| { |
| ResultRelInfo *resultRelInfo; |
| Oid targetid; |
| bool found; |
| ResultPartHashEntry *entry; |
| |
| /* add a short term memory context if one wasn't assigned already */ |
| Assert(estate->es_partition_state != NULL && |
| estate->es_partition_state->accessMethods != NULL); |
| if (!estate->es_partition_state->accessMethods->part_cxt) |
| estate->es_partition_state->accessMethods->part_cxt = |
| GetPerTupleExprContext(estate)->ecxt_per_tuple_memory; |
| |
| targetid = selectPartition(estate->es_result_partitions, values, |
| isnull, tupdesc, estate->es_partition_state->accessMethods); |
| |
| if (!OidIsValid(targetid)) |
| ereport(ERROR, |
| (errcode(ERRCODE_NO_PARTITION_FOR_PARTITIONING_KEY), |
| errmsg("no partition for partitioning key"), |
| errOmitLocation(true))); |
| |
| |
| Assert(estate->es_result_partitions && estate->es_result_partitions->part); |
| Assert(estate->es_result_relations && estate->es_result_relations->ri_RelationDesc); |
| Oid parent = estate->es_result_partitions->part->parrelid; |
| Oid result = RelationGetRelid(estate->es_result_relations->ri_RelationDesc); |
| /* |
	 * If we are inserting directly into a child partition, verify that the
	 * row actually belongs to that partition.
| */ |
| if (parent != result && targetid != result) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_NO_PARTITION_FOR_PARTITIONING_KEY), |
| errmsg("the data does not belong to partition: %s", |
| RelationGetRelationName(estate->es_result_relations->ri_RelationDesc)), |
| errOmitLocation(true))); |
| } |
| |
| if (estate->es_partition_state->result_partition_hash == NULL) |
| { |
| HASHCTL ctl; |
| long num_buckets; |
| |
| /* reasonable assumption? */ |
| num_buckets = |
| list_length(all_partition_relids(estate->es_result_partitions)); |
| num_buckets /= num_partition_levels(estate->es_result_partitions); |
| |
| ctl.keysize = sizeof(Oid); |
| ctl.entrysize = sizeof(*entry); |
| ctl.hash = oid_hash; |
| |
| estate->es_partition_state->result_partition_hash = |
| hash_create("Partition Result Relation Hash", |
| num_buckets, |
| &ctl, |
| HASH_ELEM | HASH_FUNCTION); |
| } |
| |
| entry = hash_search(estate->es_partition_state->result_partition_hash, |
| &targetid, |
| HASH_ENTER, |
| &found); |
| |
| if (found) |
| { |
| resultRelInfo = estate->es_result_relations; |
| resultRelInfo += entry->offset; |
| Assert(RelationGetRelid(resultRelInfo->ri_RelationDesc) == targetid); |
| } |
| else |
| { |
| int result_array_size = |
| estate->es_partition_state->result_partition_array_size; |
| RangeTblEntry *rte = makeNode(RangeTblEntry); |
| List *rangeTable; |
| int natts; |
| |
| if (estate->es_num_result_relations + 1 >= result_array_size) |
| { |
| int32 sz = result_array_size * 2; |
| |
| /* we shouldn't be able to overflow */ |
| Insist((int)sz > result_array_size); |
| |
| estate->es_result_relation_info = estate->es_result_relations = |
| (ResultRelInfo *)repalloc(estate->es_result_relations, |
| sz * sizeof(ResultRelInfo)); |
| estate->es_partition_state->result_partition_array_size = (int)sz; |
| } |
| |
| resultRelInfo = estate->es_result_relations; |
| natts = resultRelInfo->ri_RelationDesc->rd_att->natts; /* in base relation */ |
| resultRelInfo += estate->es_num_result_relations; |
| entry->offset = estate->es_num_result_relations; |
| |
| estate->es_num_result_relations++; |
| |
| rte->relid = targetid; /* all we need */ |
| rangeTable = list_make1(rte); |
| initResultRelInfo(resultRelInfo, 1, |
| rangeTable, |
| CMD_INSERT, |
| estate->es_instrument, |
| (Gp_role != GP_ROLE_EXECUTE || Gp_is_writer)); |
| |
| map_part_attrs(estate->es_result_relations->ri_RelationDesc, |
| resultRelInfo->ri_RelationDesc, |
| &(resultRelInfo->ri_partInsertMap), |
| TRUE); /* throw on error, so result not needed */ |
| |
| if (resultRelInfo->ri_partInsertMap) |
| resultRelInfo->ri_partSlot = |
| MakeSingleTupleTableSlot(resultRelInfo->ri_RelationDesc->rd_att); |
| } |
| return resultRelInfo; |
| } |
| |
| ResultRelInfo * |
| values_get_partition(Datum *values, bool *nulls, TupleDesc tupdesc, |
| EState *estate) |
| { |
| ResultRelInfo *relinfo; |
| |
| Assert(PointerIsValid(estate->es_result_partitions)); |
| |
| relinfo = get_part(estate, values, nulls, tupdesc); |
| |
| return relinfo; |
| } |
| |
| /* |
| * Find the partition we want and get the ResultRelInfo for the |
| * partition. |
| */ |
| ResultRelInfo * |
| slot_get_partition(TupleTableSlot *slot, EState *estate) |
| { |
| ResultRelInfo *resultRelInfo; |
| AttrNumber max_attr; |
| Datum *values; |
| bool *nulls; |
| |
| Assert(PointerIsValid(estate->es_result_partitions)); |
| |
| max_attr = estate->es_partition_state->max_partition_attr; |
| |
| slot_getsomeattrs(slot, max_attr); |
| values = slot_get_values(slot); |
| nulls = slot_get_isnull(slot); |
| |
| resultRelInfo = get_part(estate, values, nulls, slot->tts_tupleDescriptor); |
| |
| return resultRelInfo; |
| } |
| |
| /* Wrap an attribute map (presumably from base partitioned table to part |
| * as created by map_part_attrs in execMain.c) with an AttrMap. The new |
| * AttrMap will contain a copy of the argument map. The caller retains |
| * the responsibility to dispose of the argument map eventually. |
| * |
| * If the input AttrNumber vector is empty or null, it is taken as an |
| * identity map, i.e., a null AttrMap. |
| */ |
| AttrMap *makeAttrMap(int base_count, AttrNumber *base_map) |
| { |
| int i, n, p; |
| AttrMap *map; |
| |
| if ( base_count < 1 || base_map == NULL ) |
| return NULL; |
| |
| map = palloc0(sizeof(AttrMap) + base_count * sizeof(AttrNumber)); |
| |
| for ( i = n = p = 0; i <= base_count; i++ ) |
| { |
| map->attr_map[i] = base_map[i]; |
| |
| if ( map->attr_map[i] != 0 ) |
| { |
| if ( map->attr_map[i] > p ) p = map->attr_map[i]; |
| n++; |
| } |
| } |
| |
| map->live_count = n; |
| map->attr_max = p; |
| map->attr_count = base_count; |
| |
| return map; |
| } |
| |
| /* Invert a base-to-part attribute map to produce a part-to-base attribute map. |
 * The result attribute map will have an attribute count at least as large
 * as the largest part attribute in the base map; however, the argument
 * inv_count may be used to force a larger count.
| * |
| * The identity map, null, is handled specially. |
| */ |
| AttrMap *invertedAttrMap(AttrMap *base_map, int inv_count) |
| { |
| AttrMap *inv_map; |
| int i; |
| |
| if ( base_map == NULL ) |
| return NULL; |
| |
| if ( inv_count < base_map->attr_max ) |
| { |
| inv_count = base_map->attr_max; |
| } |
| |
| inv_map = palloc0(sizeof(AttrMap) + inv_count * sizeof(AttrNumber)); |
| |
| inv_map->live_count = base_map->live_count; |
| inv_map->attr_max = base_map->attr_count; |
| inv_map->attr_count = inv_count; |
| |
| for ( i = 1; i <= base_map->attr_count; i++ ) |
| { |
| AttrNumber inv = base_map->attr_map[i]; |
| |
| if ( inv == 0 ) continue; |
| inv_map->attr_map[inv] = i; |
| } |
| |
| return inv_map; |
| } |
| |
| static AttrMap *copyAttrMap(AttrMap *original) |
| { |
| AttrMap *copy; |
| size_t sz; |
| |
| if ( original == NULL ) return NULL; |
| |
	sz = sizeof(AttrMap) + original->attr_count * sizeof(AttrNumber);
| copy = palloc0(sz); |
| memcpy(copy, original, sz); |
| |
| return copy; |
| } |
| |
| /* Compose two attribute maps giving a map with the same effect as applying |
| * the first (input) map, then the second (output) map. |
| */ |
| AttrMap *compositeAttrMap(AttrMap *input_map, AttrMap *output_map) |
| { |
| AttrMap *composite_map; |
| int i; |
| |
| if ( input_map == NULL ) |
| return copyAttrMap(output_map); |
| if ( output_map == NULL ) |
| return copyAttrMap(input_map); |
| |
| composite_map = copyAttrMap(input_map); |
| composite_map->attr_max = output_map->attr_max; |
| |
	for ( i = 1; i <= input_map->attr_count; i++ )
| { |
| AttrNumber k = attrMap(output_map, attrMap(input_map, i)); |
| composite_map->attr_map[i] = k; |
| } |
| |
| return composite_map; |
| } |
| |
| /* Use the given attribute map to convert an attribute number in the |
| * base relation to an attribute number in the other relation. Forgive |
| * out-of-range attributes by mapping them to zero. Treat null as |
| * the identity map. |
| */ |
| AttrNumber attrMap(AttrMap *map, AttrNumber anum) |
| { |
| if ( map == NULL ) |
| return anum; |
| if ( 0 < anum && anum <= map->attr_count ) |
| return map->attr_map[anum]; |
| return 0; |
| } |
| |
| /* Apply attrMap over an integer list of attribute numbers. |
| */ |
| List *attrMapIntList(AttrMap *map, List *attrs) |
| { |
| ListCell *lc; |
| List *remapped_list = NIL; |
| |
| foreach (lc, attrs) |
| { |
| int anum = (int)attrMap(map, (AttrNumber)lfirst_int(lc)); |
| remapped_list = lappend_int(remapped_list, anum); |
| } |
| return remapped_list; |
| } |
| |
/* For attrMapExpr below.
 *
 * Mutate Var nodes in an expression using the given attribute map.
 * Insist that the Var nodes have varno == 1 and that the mapping
 * yields a live attribute number (non-zero).
 */
| static Node *apply_attrmap_mutator(Node *node, AttrMap *map) |
| { |
| if ( node == NULL ) |
| return NULL; |
| |
	if (IsA(node, Var))
	{
		AttrNumber anum = 0;
		Var *var = (Var *) node;
		Assert(var->varno == 1); /* in CHECK constraints */
| anum = attrMap(map, var->varattno); |
| |
| if ( anum == 0 ) |
| { |
| /* Should never happen, but best caught early. */ |
| elog(ERROR, "attribute map discrepancy"); |
| } |
| else if ( anum != var->varattno ) |
| { |
| var = copyObject(var); |
| var->varattno = anum; |
| } |
| return (Node *)var; |
| } |
| return expression_tree_mutator(node, apply_attrmap_mutator, (void *)map); |
| } |
| |
| /* Apply attrMap over the Var nodes in an expression. |
| */ |
| Node *attrMapExpr(AttrMap *map, Node *expr) |
| { |
| return apply_attrmap_mutator(expr, map); |
| } |
| |
| |
| /* Check compatibility of the attributes of the given partitioned |
| * table and part for purposes of INSERT (through the partitioned |
| * table) or EXCHANGE (of the part into the partitioned table). |
| * Don't use any partitioning catalogs, because this must run |
| * on the segment databases as well as on the entry database. |
| * |
| * If requested and needed, make a vector mapping the attribute |
| * numbers of the partitioned table to corresponding attribute |
| * numbers in the part. Represent the "unneeded" identity map |
| * as null. |
| * |
| * base -- the partitioned table |
| * part -- the part table |
| * map_ptr -- where to store a pointer to the result, or NULL |
| * throw -- whether to throw an error in case of incompatibility |
| * |
| * The implicit result is a vector one longer than the number |
| * of attributes (existing or not) in the base relation. |
| * It is returned through the map_ptr argument, if that argument |
| * is non-null. |
| * |
| * The explicit result indicates whether the part is compatible |
| * with the base relation. If the throw argument is true, however, |
| * an error is issued rather than returning false. |
| * |
| * Note that, in the map, element 0 is wasted and is always zero, |
| * so the vector is indexed by attribute number (origin 1). |
| * |
| * The i-th element of the map is the attribute number in |
| * the part relation that corresponds to attribute i of the |
| * base relation, or it is zero to indicate that attribute |
| * i of the base relation doesn't exist (has been dropped). |
| * |
| * This is a handy map for renumbering attributes for use with |
| * part relations that may have a different configuration of |
| * "holes" than the partitioned table in which they occur. |
| * |
| * Be sure to call this in the memory context in which the result |
| * vector ought to be stored. |
| * |
 * Though some error checking is done, it is not comprehensive.
 * If internal assumptions about possible tuple formats are
 * correct, errors should not occur.  Still, the downside would be
 * silently incorrect data, so we use errors (not assertions) for
 * these checks.
| * |
| * Checks include same number of non-dropped attributes in all |
| * parts of a partitioned table, non-dropped attributes in |
| * corresponding relative positions must match in name, type |
| * and alignment, attribute numbers must agree with their |
| * position in tuple, etc. |
| */ |
| bool |
| map_part_attrs(Relation base, Relation part, AttrMap **map_ptr, bool throw) |
| { |
| AttrNumber i = 1; |
| AttrNumber n = base->rd_att->natts; |
| FormData_pg_attribute *battr = NULL; |
| |
| AttrNumber j = 1; |
| AttrNumber m = part->rd_att->natts; |
| FormData_pg_attribute *pattr = NULL; |
| |
| AttrNumber *v = NULL; |
| |
| /* If we might want a map, allocate one. */ |
| if ( map_ptr != NULL ) |
| { |
| v = palloc0(sizeof(AttrNumber)*(n+1)); |
| *map_ptr = NULL; |
| } |
| |
| bool identical = TRUE; |
| bool compatible = TRUE; |
| |
| /* For each matched pair of attributes ... */ |
| while ( i <= n && j <= m ) |
| { |
| battr = base->rd_att->attrs[i-1]; |
| pattr = part->rd_att->attrs[j-1]; |
| |
| /* Skip dropped attributes. */ |
| |
| if ( battr->attisdropped ) |
| { |
| i++; |
| continue; |
| } |
| |
| if ( pattr->attisdropped ) |
| { |
| j++; |
| continue; |
| } |
| |
| /* Check attribute conformability requirements. */ |
| |
| /* -- Names must match. */ |
| if ( strncmp(NameStr(battr->attname), NameStr(pattr->attname), NAMEDATALEN) != 0 ) |
| { |
| if ( throw ) |
| ereport(ERROR, |
| (errcode(ERRCODE_SYNTAX_ERROR), |
| errmsg("relation \"%s\" must have the same " |
| "column names and column order as \"%s\"", |
| RelationGetRelationName(part), |
| RelationGetRelationName(base)), |
| errOmitLocation(true))); |
| compatible = FALSE; |
| break; |
| } |
| |
| /* -- Types must match. */ |
| if (battr->atttypid != pattr->atttypid) |
| { |
| if ( throw ) |
| ereport(ERROR, |
| (errcode(ERRCODE_SYNTAX_ERROR), |
| errmsg("type mismatch for attribute \"%s\"", |
| NameStr((battr->attname))), |
| errOmitLocation(true))); |
| compatible = FALSE; |
| break; |
| } |
| |
| /* -- Alignment should match, but check just to be safe. */ |
| if (battr->attalign != pattr->attalign ) |
| { |
| if ( throw ) |
| ereport(ERROR, |
| (errcode(ERRCODE_SYNTAX_ERROR), |
| errmsg("alignment mismatch for attribute \"%s\"", |
| NameStr((battr->attname))), |
| errOmitLocation(true))); |
| compatible = FALSE; |
| break; |
| } |
| |
| /* -- Attribute numbers must match position (+1) in tuple. |
| * This is a hard requirement so always throw. This could |
| * be an assertion, except that we want to fail even in a |
| * distribution build. |
| */ |
| if ( battr->attnum != i || pattr->attnum != j ) |
| elog(ERROR, |
| "attribute numbers out of order"); |
| |
| /* Note any attribute number difference. */ |
| if ( i != j ) |
| identical = FALSE; |
| |
| /* If we're building a map, update it. */ |
| if ( v != NULL ) |
| v[i] = j; |
| |
| i++; |
| j++; |
| } |
| |
| if ( compatible ) |
| { |
		/* Any excess attributes in the parent had better be marked dropped. */
| for ( ; i <= n; i++ ) |
| { |
| if ( !base->rd_att->attrs[i-1]->attisdropped ) |
| { |
| if ( throw ) |
| /* the partitioned table has more columns than the part */ |
| ereport(ERROR, |
| (errcode(ERRCODE_SYNTAX_ERROR), |
| errmsg("relation \"%s\" must have the same number columns as relation \"%s\"", |
| RelationGetRelationName(part), |
| RelationGetRelationName(base)), |
| errOmitLocation(true))); |
| compatible = FALSE; |
| } |
| } |
| |
		/* Any excess attributes in the part had better be marked dropped. */
| for ( ; j <= m; j++ ) |
| { |
| if ( !part->rd_att->attrs[j-1]->attisdropped ) |
| { |
| if ( throw ) |
| /* the partitioned table has fewer columns than the part */ |
| ereport(ERROR, |
| (errcode(ERRCODE_SYNTAX_ERROR), |
| errmsg("relation \"%s\" must have the same number columns as relation \"%s\"", |
| RelationGetRelationName(part), |
| RelationGetRelationName(base)), |
| errOmitLocation(true))); |
| compatible = FALSE; |
| } |
| } |
| } |
| |
	/* Tuple descriptors of different lengths cannot be identical. */
| if (n != m) |
| { |
| identical = FALSE; |
| } |
| |
| if ( !compatible ) |
| { |
| |
| Assert( !throw ); |
| if ( v != NULL ) |
| pfree(v); |
| return FALSE; |
| } |
| |
| /* If parent and part are the same, don't use a map */ |
| if ( identical && v != NULL ) |
| { |
| pfree(v); |
| v = NULL; |
| } |
| |
| if ( map_ptr != NULL && v != NULL ) |
| { |
| *map_ptr = makeAttrMap(n, v); |
| pfree(v); |
| } |
| return TRUE; |
| } |
| |
| /* |
| * Maps the tuple descriptor of a part to a target tuple descriptor. |
| * The mapping assume that the attributes in the target descriptor |
| * could be in any position in the part descriptor. |
| */ |
| void |
| map_part_attrs_from_targetdesc(TupleDesc target, TupleDesc part, AttrMap **map_ptr) |
| { |
| Assert(target); |
| Assert(part); |
	Assert(map_ptr != NULL && NULL == *map_ptr);
| |
| int ntarget = target->natts; |
| int npart = part->natts; |
| |
| FormData_pg_attribute *tattr = NULL; |
| FormData_pg_attribute *pattr = NULL; |
| |
| AttrNumber *mapper = palloc0(sizeof(AttrNumber)*(ntarget+1)); |
| |
| /* Map every attribute in the target to an attribute |
| * in the part tuple descriptor |
| */ |
	for (AttrNumber i = 0; i < ntarget; i++)
| { |
| tattr = target->attrs[i]; |
| mapper[i+1] = 0; |
| |
| if ( tattr->attisdropped ) |
| { |
| continue; |
| } |
| |
| /* Scan all attributes in the part tuple descriptor */ |
| for (AttrNumber j = 0; j < npart; j++) |
| { |
| pattr = part->attrs[j]; |
| if ( pattr->attisdropped ) |
| { |
| continue; |
| } |
| |
| if (strncmp(NameStr(tattr->attname), NameStr(pattr->attname), NAMEDATALEN) == 0 && |
| tattr->atttypid == pattr->atttypid && |
| tattr->attalign == pattr->attalign ) |
| { |
| mapper[i+1] = j+1; |
| /* Continue with next attribute in the target list */ |
| break; |
| } |
| } |
| } |
| |
| Assert ( map_ptr != NULL && mapper != NULL ); |
| |
| *map_ptr = makeAttrMap(ntarget, mapper); |
| pfree(mapper); |
| } |
| |
| |
| |
| /* |
| * Clear any partition state held in the argument EState node. This is |
| * called during ExecEndPlan and is not, itself, recursive. |
| * |
| * At present, the only required cleanup is to decrement reference counts |
| * in any tuple descriptors held in slots in the partition state. |
| */ |
| static void |
| ClearPartitionState(EState *estate) |
| { |
| PartitionState *pstate = estate->es_partition_state; |
| HASH_SEQ_STATUS hash_seq_status; |
| ResultPartHashEntry *entry; |
| |
| if ( pstate == NULL || pstate->result_partition_hash == NULL ) |
| return; |
| |
| /* Examine each hash table entry. */ |
| hash_freeze(pstate->result_partition_hash); |
| hash_seq_init(&hash_seq_status, pstate->result_partition_hash); |
| while ( (entry = hash_seq_search(&hash_seq_status)) ) |
| { |
| ResultPartHashEntry *part = (ResultPartHashEntry*)entry; |
| ResultRelInfo *info = &estate->es_result_relations[part->offset]; |
| if ( info->ri_partSlot ) |
| { |
| Assert( info->ri_partInsertMap ); /* paired with slot */ |
| if ( info->ri_partSlot->tts_tupleDescriptor ) |
| ReleaseTupleDesc(info->ri_partSlot->tts_tupleDescriptor); |
| ExecClearTuple(info->ri_partSlot); |
| } |
| } |
| /* No need for hash_seq_term() since we iterated to end. */ |
| } |
| |
| |